Commit cb3cb2cd883d50c765e236ab9f3c3c0b80252f30

Authored by Igor Kulikov
2 parents 9ee06e0e baaa26ff
... ... @@ -78,6 +78,43 @@
78 78 <artifactId>mockito-all</artifactId>
79 79 <scope>test</scope>
80 80 </dependency>
81   -
82 81 </dependencies>
  82 +
  83 +
  84 + <build>
  85 + <plugins>
  86 + <plugin>
  87 + <groupId>org.apache.maven.plugins</groupId>
  88 + <artifactId>maven-shade-plugin</artifactId>
  89 + <executions>
  90 + <execution>
  91 + <phase>package</phase>
  92 + <goals>
  93 + <goal>shade</goal>
  94 + </goals>
  95 + <configuration>
  96 + <filters>
  97 + <filter>
  98 + <artifact>*:*</artifact>
  99 + <excludes>
  100 + <exclude>META-INF/*.SF</exclude>
  101 + <exclude>META-INF/*.DSA</exclude>
  102 + <exclude>META-INF/*.RSA</exclude>
  103 + </excludes>
  104 + </filter>
  105 + </filters>
  106 + <transformers>
  107 + <transformer
  108 + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  109 + <manifestEntries>
  110 + <Main-Class>org.thingsboard.client.tools.MqttStressTestTool</Main-Class>
  111 + </manifestEntries>
  112 + </transformer>
  113 + </transformers>
  114 + </configuration>
  115 + </execution>
  116 + </executions>
  117 + </plugin>
  118 + </plugins>
  119 + </build>
83 120 </project>
... ...
... ... @@ -40,10 +40,10 @@ public class MqttStressTestClient {
40 40 this.client = new MqttAsyncClient(brokerUri, clientId, persistence);
41 41 }
42 42
43   - public void connect() throws MqttException {
  43 + public IMqttToken connect() throws MqttException {
44 44 MqttConnectOptions options = new MqttConnectOptions();
45 45 options.setUserName(deviceToken);
46   - client.connect(options, null, new IMqttActionListener() {
  46 + return client.connect(options, null, new IMqttActionListener() {
47 47 @Override
48 48 public void onSuccess(IMqttToken iMqttToken) {
49 49 log.info("OnSuccess");
... ... @@ -60,6 +60,22 @@ public class MqttStressTestClient {
60 60 client.disconnect();
61 61 }
62 62
  63 +
  64 +
  65 + public void warmUp(byte[] data) throws MqttException {
  66 + MqttMessage msg = new MqttMessage(data);
  67 + client.publish("v1/devices/me/telemetry", msg, null, new IMqttActionListener() {
  68 + @Override
  69 + public void onSuccess(IMqttToken asyncActionToken) {
  70 + }
  71 +
  72 + @Override
  73 + public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
  74 + }
  75 + }).waitForCompletion();
  76 + }
  77 +
  78 +
63 79 public void publishTelemetry(byte[] data) throws MqttException {
64 80 long sendTime = System.currentTimeMillis();
65 81 MqttMessage msg = new MqttMessage(data);
... ... @@ -67,14 +83,12 @@ public class MqttStressTestClient {
67 83 @Override
68 84 public void onSuccess(IMqttToken asyncActionToken) {
69 85 long ackTime = System.currentTimeMillis();
70   -// log.info("Delivery time: {}", ackTime - sendTime);
71 86 results.onResult(true, ackTime - sendTime);
72 87 }
73 88
74 89 @Override
75 90 public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
76 91 long failTime = System.currentTimeMillis();
77   -// log.info("Failure time: {}", failTime - sendTime);
78 92 results.onResult(false, failTime - sendTime);
79 93 }
80 94 });
... ...
1   -package org.thingsboard.client.tools; /**
  1 +/**
2 2 * Copyright © 2016 The Thingsboard Authors
3 3 *
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
... ... @@ -13,14 +13,32 @@ package org.thingsboard.client.tools; /**
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
  16 +package org.thingsboard.client.tools; /**
  17 + * Copyright © 2016 The Thingsboard Authors
  18 + * <p>
  19 + * Licensed under the Apache License, Version 2.0 (the "License");
  20 + * you may not use this file except in compliance with the License.
  21 + * You may obtain a copy of the License at
  22 + * <p>
  23 + * http://www.apache.org/licenses/LICENSE-2.0
  24 + * <p>
  25 + * Unless required by applicable law or agreed to in writing, software
  26 + * distributed under the License is distributed on an "AS IS" BASIS,
  27 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  28 + * See the License for the specific language governing permissions and
  29 + * limitations under the License.
  30 + */
  31 +
16 32 import lombok.extern.slf4j.Slf4j;
  33 +import org.eclipse.paho.client.mqttv3.IMqttToken;
17 34 import org.thingsboard.server.common.data.Device;
18 35 import org.thingsboard.server.common.data.security.DeviceCredentials;
19 36
20 37 import java.nio.charset.StandardCharsets;
21 38 import java.util.ArrayList;
22 39 import java.util.List;
23   -import java.util.concurrent.TimeUnit;
  40 +import java.util.UUID;
  41 +import java.util.concurrent.*;
24 42 import java.util.concurrent.atomic.AtomicLong;
25 43
26 44 /**
... ... @@ -29,60 +47,83 @@ import java.util.concurrent.atomic.AtomicLong;
29 47 @Slf4j
30 48 public class MqttStressTestTool {
31 49
32   - private static final long TEST_DURATION = TimeUnit.MINUTES.toMillis(1);
33   - private static final long TEST_ITERATION = TimeUnit.MILLISECONDS.toMillis(100);
34   - private static final long TEST_SUB_ITERATION = TimeUnit.MILLISECONDS.toMillis(2);
35   - private static final int DEVICE_COUNT = 100;
36   - private static final String BASE_URL = "http://localhost:8080";
37   - private static final String[] MQTT_URLS = {"tcp://localhost:1883"};
38   -// private static final String[] MQTT_URLS = {"tcp://localhost:1883", "tcp://localhost:1884", "tcp://localhost:1885"};
39   - private static final String USERNAME = "tenant@thingsboard.org";
40   - private static final String PASSWORD = "tenant";
  50 + public static void main(String[] args) throws Exception {
  51 + TestParams params = new TestParams();
  52 + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
41 53
42 54
43   - public static void main(String[] args) throws Exception {
  55 + if (params.getDuration() % params.getIterationInterval() != 0) {
  56 + throw new IllegalArgumentException("Test Duration % Iteration Interval != 0");
  57 + }
  58 +
  59 + if ((params.getIterationInterval() * 1000) % params.getDeviceCount() != 0) {
  60 + throw new IllegalArgumentException("Iteration Interval % Device Count != 0");
  61 + }
  62 +
44 63 ResultAccumulator results = new ResultAccumulator();
45 64
46 65 AtomicLong value = new AtomicLong(Long.MAX_VALUE);
47 66 log.info("value: {} ", value.incrementAndGet());
48 67
49   - RestClient restClient = new RestClient(BASE_URL);
50   - restClient.login(USERNAME, PASSWORD);
  68 + RestClient restClient = new RestClient(params.getRestApiUrl());
  69 + restClient.login(params.getUsername(), params.getPassword());
51 70
52 71 List<MqttStressTestClient> clients = new ArrayList<>();
53   - for (int i = 0; i < DEVICE_COUNT; i++) {
54   - Device device = restClient.createDevice("Device " + i);
  72 + List<IMqttToken> connectTokens = new ArrayList<>();
  73 + for (int i = 0; i < params.getDeviceCount(); i++) {
  74 + Device device = restClient.createDevice("Device " + UUID.randomUUID());
55 75 DeviceCredentials credentials = restClient.getCredentials(device.getId());
56   - String mqttURL = MQTT_URLS[i % MQTT_URLS.length];
  76 + String[] mqttUrls = params.getMqttUrls();
  77 + String mqttURL = mqttUrls[i % mqttUrls.length];
57 78 MqttStressTestClient client = new MqttStressTestClient(results, mqttURL, credentials.getCredentialsId());
58   - client.connect();
  79 + connectTokens.add(client.connect());
59 80 clients.add(client);
60 81 }
61   - Thread.sleep(1000);
62 82
  83 + for (IMqttToken tokens : connectTokens) {
  84 + tokens.waitForCompletion();
  85 + }
63 86
64 87 byte[] data = "{\"longKey\":73}".getBytes(StandardCharsets.UTF_8);
65   - long startTime = System.currentTimeMillis();
66   - int iterationsCount = (int) (TEST_DURATION / TEST_ITERATION);
67   - int subIterationsCount = (int) (TEST_ITERATION / TEST_SUB_ITERATION);
68   - if (clients.size() % subIterationsCount != 0) {
69   - throw new IllegalArgumentException("Invalid parameter exception!");
  88 +
  89 + for (MqttStressTestClient client : clients) {
  90 + client.warmUp(data);
70 91 }
  92 +
  93 + Thread.sleep(1000);
  94 +
  95 + long startTime = System.currentTimeMillis();
  96 + int iterationsCount = (int) (params.getDuration() / params.getIterationInterval());
  97 + int subIterationMicroSeconds = (int) ((params.getIterationInterval() * 1000) / params.getDeviceCount());
  98 +
  99 + List<ScheduledFuture<Void>> iterationFutures = new ArrayList<>();
71 100 for (int i = 0; i < iterationsCount; i++) {
72   - for (int j = 0; j < subIterationsCount; j++) {
73   - int packSize = clients.size() / subIterationsCount;
74   - for (int k = 0; k < packSize; k++) {
75   - int clientIndex = packSize * j + k;
76   - clients.get(clientIndex).publishTelemetry(data);
  101 + long delay = i * params.getIterationInterval();
  102 + iterationFutures.add(scheduler.schedule((Callable<Void>) () -> {
  103 + long sleepMicroSeconds = 0L;
  104 + for (MqttStressTestClient client : clients) {
  105 + client.publishTelemetry(data);
  106 + sleepMicroSeconds += subIterationMicroSeconds;
  107 + if (sleepMicroSeconds > 1000) {
  108 + Thread.sleep(sleepMicroSeconds / 1000);
  109 + sleepMicroSeconds = sleepMicroSeconds % 1000;
  110 + }
77 111 }
78   - Thread.sleep(TEST_SUB_ITERATION);
79   - }
  112 + return null;
  113 + }, delay, TimeUnit.MILLISECONDS));
80 114 }
  115 +
  116 + for (ScheduledFuture<Void> future : iterationFutures) {
  117 + future.get();
  118 + }
  119 +
81 120 Thread.sleep(1000);
  121 +
82 122 for (MqttStressTestClient client : clients) {
83 123 client.disconnect();
84 124 }
85 125 log.info("Results: {} took {}ms", results, System.currentTimeMillis() - startTime);
  126 + scheduler.shutdownNow();
86 127 }
87 128
88 129 }
... ...
... ... @@ -73,7 +73,7 @@ public class ResultAccumulator {
73 73
74 74 @Override
75 75 public String toString() {
76   - return "org.thingsboard.client.tools.ResultAccumulator{" +
  76 + return "Result {" +
77 77 "successCount=" + getSuccessCount() +
78 78 ", errorCount=" + getErrorCount() +
79 79 ", totalTime=" + getTimeSpent() +
... ...
  1 +/**
  2 + * Copyright © 2016 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.client.tools;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +
  20 +import java.io.FileInputStream;
  21 +import java.io.IOException;
  22 +import java.util.Properties;
  23 +import java.util.concurrent.TimeUnit;
  24 +
  25 +@Slf4j
  26 +public class TestParams {
  27 + static final String TEST_PROPERTIES = "test.properties";
  28 + static final long DEFAULT_TEST_DURATION = TimeUnit.MINUTES.toMillis(1);
  29 + static final long DEFAULT_TEST_INTERVAL = TimeUnit.MILLISECONDS.toMillis(100);
  30 + static final int DEFAULT_DEVICE_COUNT = 100;
  31 + static final String DEFAULT_REST_URL = "http://localhost:8080";
  32 + static final String DEFAULT_MQTT_URLS = "tcp://localhost:1883";
  33 + static final String DEFAULT_USERNAME = "tenant@thingsboard.org";
  34 + static final String DEFAULT_PASSWORD = "tenant";
  35 +
  36 + private Properties params = new Properties();
  37 +
  38 + public TestParams() throws IOException {
  39 + try {
  40 + params.load(new FileInputStream(TEST_PROPERTIES));
  41 + } catch (Exception e) {
  42 + log.warn("Failed to read " + TEST_PROPERTIES);
  43 + }
  44 + }
  45 +
  46 + public long getDuration() {
  47 + return Long.valueOf(params.getProperty("durationMs", Long.toString(DEFAULT_TEST_DURATION)));
  48 + }
  49 +
  50 + public long getIterationInterval() {
  51 + return Long.valueOf(params.getProperty("iterationIntervalMs", Long.toString(DEFAULT_TEST_INTERVAL)));
  52 + }
  53 +
  54 + public int getDeviceCount() {
  55 + return Integer.valueOf(params.getProperty("deviceCount", Integer.toString(DEFAULT_DEVICE_COUNT)));
  56 + }
  57 +
  58 + public String getRestApiUrl() {
  59 + return params.getProperty("restUrl", DEFAULT_REST_URL);
  60 + }
  61 +
  62 + public String[] getMqttUrls() {
  63 + return params.getProperty("mqttUrls", DEFAULT_MQTT_URLS).split(",");
  64 + }
  65 +
  66 + public String getUsername() {
  67 + return params.getProperty("username", DEFAULT_USERNAME);
  68 + }
  69 +
  70 + public String getPassword() {
  71 + return params.getProperty("password", DEFAULT_PASSWORD);
  72 + }
  73 +}
... ...
  1 +<?xml version="1.0" encoding="UTF-8" ?>
  2 +<!--
  3 +
  4 + Copyright © 2016 The Thingsboard Authors
  5 +
  6 + Licensed under the Apache License, Version 2.0 (the "License");
  7 + you may not use this file except in compliance with the License.
  8 + You may obtain a copy of the License at
  9 +
  10 + http://www.apache.org/licenses/LICENSE-2.0
  11 +
  12 + Unless required by applicable law or agreed to in writing, software
  13 + distributed under the License is distributed on an "AS IS" BASIS,
  14 + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15 + See the License for the specific language governing permissions and
  16 + limitations under the License.
  17 +
  18 +-->
  19 +<!DOCTYPE configuration>
  20 +<configuration>
  21 +
  22 + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
  23 + <encoder>
  24 + <pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
  25 + </encoder>
  26 + </appender>
  27 +
  28 + <logger name="org.thingsboard" level="INFO" />
  29 +
  30 + <root level="INFO">
  31 + <appender-ref ref="STDOUT"/>
  32 + </root>
  33 +
  34 +</configuration>
\ No newline at end of file
... ...
  1 +deviceCount=1000
  2 +durationMs=5000
  3 +iterationIntervalMs=250
... ...