Commit 68c0cd62919bcdc6c7c3b181cdf94ad64c5653cd

Authored by Igor Kulikov
1 parent f9088dac

Improve kafka topics management. Improve black box tests.

... ... @@ -112,7 +112,6 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
112 112 public void init() {
113 113 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<ToTransportMsg> notificationsProducerBuilder = TBKafkaProducerTemplate.builder();
114 114 notificationsProducerBuilder.settings(kafkaSettings);
115   - notificationsProducerBuilder.defaultTopic(notificationsTopic);
116 115 notificationsProducerBuilder.encoder(new ToTransportMsgEncoder());
117 116
118 117 notificationsProducer = notificationsProducerBuilder.build();
... ...
... ... @@ -43,8 +43,6 @@ public class RemoteTransportApiService {
43 43
44 44 @Value("${transport.remote.transport_api.requests_topic}")
45 45 private String transportApiRequestsTopic;
46   - @Value("${transport.remote.transport_api.responses_topic}")
47   - private String transportApiResponsesTopic;
48 46 @Value("${transport.remote.transport_api.max_pending_requests}")
49 47 private int maxPendingRequests;
50 48 @Value("${transport.remote.transport_api.request_timeout}")
... ... @@ -73,7 +71,6 @@ public class RemoteTransportApiService {
73 71
74 72 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder();
75 73 responseBuilder.settings(kafkaSettings);
76   - responseBuilder.defaultTopic(transportApiResponsesTopic);
77 74 responseBuilder.encoder(new TransportApiResponseEncoder());
78 75
79 76 TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaConsumerTemplate.builder();
... ...
... ... @@ -55,6 +55,8 @@ rpc:
55 55
56 56 # Clustering properties related to consistent-hashing. See architecture docs for more details.
57 57 cluster:
  58 + # Unique id for this node (autogenerated if empty)
  59 + node_id: "${CLUSTER_NODE_ID:}"
58 60 # Name of hash function used for consistent hash ring.
59 61 hash_function_name: "${CLUSTER_HASH_FUNCTION_NAME:murmur3_128}"
60 62 # Amount of virtual nodes in consistent hash ring.
... ... @@ -392,7 +394,6 @@ transport:
392 394 remote:
393 395 transport_api:
394 396 requests_topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
395   - responses_topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
396 397 max_pending_requests: "${TB_TRANSPORT_MAX_PENDING_REQUESTS:10000}"
397 398 request_timeout: "${TB_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
398 399 request_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
... ...
... ... @@ -15,15 +15,17 @@
15 15 */
16 16 package org.thingsboard.server.kafka;
17 17
18   -import org.apache.kafka.clients.admin.AdminClient;
19   -import org.apache.kafka.clients.admin.CreateTopicsResult;
20   -import org.apache.kafka.clients.admin.NewTopic;
  18 +import org.apache.kafka.clients.admin.*;
21 19 import org.apache.kafka.clients.consumer.ConsumerRecords;
22 20 import org.apache.kafka.clients.consumer.KafkaConsumer;
  21 +import org.apache.kafka.common.KafkaFuture;
23 22
24 23 import java.time.Duration;
25 24 import java.util.Collections;
26 25 import java.util.Properties;
  26 +import java.util.concurrent.ExecutionException;
  27 +import java.util.concurrent.TimeUnit;
  28 +import java.util.concurrent.TimeoutException;
27 29
28 30 /**
29 31 * Created by ashvayka on 24.09.18.
... ... @@ -36,7 +38,32 @@ public class TBKafkaAdmin {
36 38 client = AdminClient.create(settings.toProps());
37 39 }
38 40
  41 + public void waitForTopic(String topic, long timeout, TimeUnit timeoutUnit) throws InterruptedException, TimeoutException {
  42 + synchronized (this) {
  43 + long timeoutExpiredMs = System.currentTimeMillis() + timeoutUnit.toMillis(timeout);
  44 + while (!topicExists(topic)) {
  45 + long waitMs = timeoutExpiredMs - System.currentTimeMillis();
  46 + if (waitMs <= 0) {
  47 + throw new TimeoutException("Timeout occurred while waiting for topic [" + topic + "] to be available!");
  48 + } else {
  49 + wait(1000);
  50 + }
  51 + }
  52 + }
  53 + }
  54 +
39 55 public CreateTopicsResult createTopic(NewTopic topic){
40 56 return client.createTopics(Collections.singletonList(topic));
41 57 }
  58 +
  59 + private boolean topicExists(String topic) throws InterruptedException {
  60 + KafkaFuture<TopicDescription> topicDescriptionFuture = client.describeTopics(Collections.singleton(topic)).values().get(topic);
  61 + try {
  62 + topicDescriptionFuture.get();
  63 + return true;
  64 + } catch (ExecutionException e) {
  65 + return false;
  66 + }
  67 + }
  68 +
42 69 }
... ...
... ... @@ -20,14 +20,17 @@ import lombok.Getter;
20 20 import lombok.extern.slf4j.Slf4j;
21 21 import org.apache.kafka.clients.admin.CreateTopicsResult;
22 22 import org.apache.kafka.clients.admin.NewTopic;
  23 +import org.apache.kafka.clients.admin.TopicDescription;
23 24 import org.apache.kafka.clients.producer.Callback;
24 25 import org.apache.kafka.clients.producer.KafkaProducer;
25 26 import org.apache.kafka.clients.producer.ProducerConfig;
26 27 import org.apache.kafka.clients.producer.ProducerRecord;
27 28 import org.apache.kafka.clients.producer.RecordMetadata;
  29 +import org.apache.kafka.common.KafkaFuture;
28 30 import org.apache.kafka.common.PartitionInfo;
29 31 import org.apache.kafka.common.errors.TopicExistsException;
30 32 import org.apache.kafka.common.header.Header;
  33 +import org.springframework.util.StringUtils;
31 34
32 35 import java.util.List;
33 36 import java.util.Properties;
... ... @@ -35,6 +38,7 @@ import java.util.UUID;
35 38 import java.util.concurrent.ConcurrentHashMap;
36 39 import java.util.concurrent.ConcurrentMap;
37 40 import java.util.concurrent.Future;
  41 +import java.util.concurrent.TimeUnit;
38 42
39 43 /**
40 44 * Created by ashvayka on 24.09.18.
... ... @@ -71,21 +75,19 @@ public class TBKafkaProducerTemplate<T> {
71 75 }
72 76
73 77 public void init() {
74   - try {
75   - TBKafkaAdmin admin = new TBKafkaAdmin(this.settings);
76   - CreateTopicsResult result = admin.createTopic(new NewTopic(defaultTopic, 100, (short) 1));
77   - result.all().get();
78   - } catch (Exception e) {
79   - if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) {
80   - log.trace("[{}] Topic already exists.", defaultTopic);
81   - } else {
82   - log.info("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e);
  78 + this.partitionInfoMap = new ConcurrentHashMap<>();
  79 + if (!StringUtils.isEmpty(defaultTopic)) {
  80 + try {
  81 + TBKafkaAdmin admin = new TBKafkaAdmin(this.settings);
  82 + admin.waitForTopic(defaultTopic, 30, TimeUnit.SECONDS);
  83 + log.info("[{}] Topic exists.", defaultTopic);
  84 + } catch (Exception e) {
  85 + log.info("[{}] Failed to wait for topic: {}", defaultTopic, e.getMessage(), e);
83 86 throw new RuntimeException(e);
84 87 }
  88 + //Maybe this should not be cached, but we don't plan to change size of partitions
  89 + this.partitionInfoMap.putIfAbsent(defaultTopic, producer.partitionsFor(defaultTopic));
85 90 }
86   - //Maybe this should not be cached, but we don't plan to change size of partitions
87   - this.partitionInfoMap = new ConcurrentHashMap<>();
88   - this.partitionInfoMap.putIfAbsent(defaultTopic, producer.partitionsFor(defaultTopic));
89 91 }
90 92
91 93 T enrich(T value, String responseTopic, UUID requestId) {
... ... @@ -105,7 +107,11 @@ public class TBKafkaProducerTemplate<T> {
105 107 }
106 108
107 109 public Future<RecordMetadata> send(String key, T value, Long timestamp, Iterable<Header> headers, Callback callback) {
108   - return send(this.defaultTopic, key, value, timestamp, headers, callback);
  110 + if (!StringUtils.isEmpty(this.defaultTopic)) {
  111 + return send(this.defaultTopic, key, value, timestamp, headers, callback);
  112 + } else {
  113 + throw new RuntimeException("Failed to send message! Default topic is not specified!");
  114 + }
109 115 }
110 116
111 117 public Future<RecordMetadata> send(String topic, String key, T value, Iterable<Header> headers, Callback callback) {
... ...
... ... @@ -56,6 +56,7 @@ services:
56 56 max-file: "30"
57 57 environment:
58 58 TB_HOST: tb1
  59 + CLUSTER_NODE_ID: tb1
59 60 env_file:
60 61 - tb-node.env
61 62 volumes:
... ... @@ -77,6 +78,7 @@ services:
77 78 max-file: "30"
78 79 environment:
79 80 TB_HOST: tb2
  81 + CLUSTER_NODE_ID: tb2
80 82 env_file:
81 83 - tb-node.env
82 84 volumes:
... ... @@ -93,6 +95,7 @@ services:
93 95 - "1883"
94 96 environment:
95 97 TB_HOST: tb-mqtt-transport1
  98 + CLUSTER_NODE_ID: tb-mqtt-transport1
96 99 env_file:
97 100 - tb-mqtt-transport.env
98 101 volumes:
... ... @@ -107,6 +110,7 @@ services:
107 110 - "1883"
108 111 environment:
109 112 TB_HOST: tb-mqtt-transport2
  113 + CLUSTER_NODE_ID: tb-mqtt-transport2
110 114 env_file:
111 115 - tb-mqtt-transport.env
112 116 volumes:
... ... @@ -121,6 +125,7 @@ services:
121 125 - "8081"
122 126 environment:
123 127 TB_HOST: tb-http-transport1
  128 + CLUSTER_NODE_ID: tb-http-transport1
124 129 env_file:
125 130 - tb-http-transport.env
126 131 volumes:
... ... @@ -135,6 +140,7 @@ services:
135 140 - "8081"
136 141 environment:
137 142 TB_HOST: tb-http-transport2
  143 + CLUSTER_NODE_ID: tb-http-transport2
138 144 env_file:
139 145 - tb-http-transport.env
140 146 volumes:
... ... @@ -149,6 +155,7 @@ services:
149 155 - "5683:5683/udp"
150 156 environment:
151 157 TB_HOST: tb-coap-transport
  158 + CLUSTER_NODE_ID: tb-coap-transport
152 159 env_file:
153 160 - tb-coap-transport.env
154 161 volumes:
... ...
... ... @@ -4,7 +4,7 @@ KAFKA_LISTENERS=INSIDE://:9093,OUTSIDE://:9092
4 4 KAFKA_ADVERTISED_LISTENERS=INSIDE://:9093,OUTSIDE://kafka:9092
5 5 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
6 6 KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
7   -KAFKA_CREATE_TOPICS=js.eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.transport.api.requests:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.rule-engine:30:1
  7 +KAFKA_CREATE_TOPICS=js.eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.transport.api.requests:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.rule-engine:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600
8 8 KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
9 9 KAFKA_LOG_RETENTION_BYTES=1073741824
10 10 KAFKA_LOG_SEGMENT_BYTES=268435456
... ...
... ... @@ -8,3 +8,5 @@ JS_EVALUATOR=remote
8 8 TRANSPORT_TYPE=remote
9 9 CACHE_TYPE=redis
10 10 REDIS_HOST=redis
  11 +
  12 +HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE=false
... ...
... ... @@ -66,7 +66,7 @@ public class MqttClientTest extends AbstractContainerTest {
66 66
67 67 WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
68 68 MqttClient mqttClient = getMqttClient(deviceCredentials, null);
69   - mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload().toString().getBytes()));
  69 + mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload().toString().getBytes())).get();
70 70 WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
71 71 log.info("Received telemetry: {}", actualLatestTelemetry);
72 72 wsClient.closeBlocking();
... ... @@ -93,7 +93,7 @@ public class MqttClientTest extends AbstractContainerTest {
93 93
94 94 WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
95 95 MqttClient mqttClient = getMqttClient(deviceCredentials, null);
96   - mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload(ts).toString().getBytes()));
  96 + mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload(ts).toString().getBytes())).get();
97 97 WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
98 98 log.info("Received telemetry: {}", actualLatestTelemetry);
99 99 wsClient.closeBlocking();
... ... @@ -123,7 +123,7 @@ public class MqttClientTest extends AbstractContainerTest {
123 123 clientAttributes.addProperty("attr2", true);
124 124 clientAttributes.addProperty("attr3", 42.0);
125 125 clientAttributes.addProperty("attr4", 73);
126   - mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes()));
  126 + mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes())).get();
127 127 WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
128 128 log.info("Received telemetry: {}", actualLatestTelemetry);
129 129 wsClient.closeBlocking();
... ... @@ -146,6 +146,7 @@ public class MqttClientTest extends AbstractContainerTest {
146 146 Device device = createDevice("mqtt_");
147 147 DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
148 148
  149 + WsClient wsClient = subscribeToWebSocket(device.getId(), "CLIENT_SCOPE", CmdsType.ATTR_SUB_CMDS);
149 150 MqttMessageListener listener = new MqttMessageListener();
150 151 MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
151 152
... ... @@ -153,7 +154,17 @@ public class MqttClientTest extends AbstractContainerTest {
153 154 JsonObject clientAttributes = new JsonObject();
154 155 String clientAttributeValue = RandomStringUtils.randomAlphanumeric(8);
155 156 clientAttributes.addProperty("clientAttr", clientAttributeValue);
156   - mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes()));
  157 + mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes())).get();
  158 +
  159 + WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
  160 + log.info("Received ws telemetry: {}", actualLatestTelemetry);
  161 + wsClient.closeBlocking();
  162 +
  163 + Assert.assertEquals(1, actualLatestTelemetry.getData().size());
  164 + Assert.assertEquals(Sets.newHashSet("clientAttr"),
  165 + actualLatestTelemetry.getLatestValues().keySet());
  166 +
  167 + Assert.assertTrue(verify(actualLatestTelemetry, "clientAttr", clientAttributeValue));
157 168
158 169 // Add a new shared attribute
159 170 JsonObject sharedAttributes = new JsonObject();
... ... @@ -166,12 +177,16 @@ public class MqttClientTest extends AbstractContainerTest {
166 177 Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
167 178
168 179 // Subscribe to attributes response
169   - mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE);
  180 + mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE).get();
  181 +
  182 + // Wait until subscription is processed
  183 + TimeUnit.SECONDS.sleep(3);
  184 +
170 185 // Request attributes
171 186 JsonObject request = new JsonObject();
172 187 request.addProperty("clientKeys", "clientAttr");
173 188 request.addProperty("sharedKeys", "sharedAttr");
174   - mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes()));
  189 + mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes())).get();
175 190 MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS);
176 191 AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class);
177 192 log.info("Received telemetry: {}", attributes);
... ... @@ -193,7 +208,10 @@ public class MqttClientTest extends AbstractContainerTest {
193 208
194 209 MqttMessageListener listener = new MqttMessageListener();
195 210 MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
196   - mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE);
  211 + mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get();
  212 +
  213 + // Wait until subscription is processed
  214 + TimeUnit.SECONDS.sleep(3);
197 215
198 216 String sharedAttributeName = "sharedAttr";
199 217
... ... @@ -236,7 +254,10 @@ public class MqttClientTest extends AbstractContainerTest {
236 254
237 255 MqttMessageListener listener = new MqttMessageListener();
238 256 MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
239   - mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE);
  257 + mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get();
  258 +
  259 + // Wait until subscription is processed
  260 + TimeUnit.SECONDS.sleep(3);
240 261
241 262 // Send an RPC from the server
242 263 JsonObject serverRpcPayload = new JsonObject();
... ... @@ -263,7 +284,7 @@ public class MqttClientTest extends AbstractContainerTest {
263 284 JsonObject clientResponse = new JsonObject();
264 285 clientResponse.addProperty("response", "someResponse");
265 286 // Send a response to the server's RPC request
266   - mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes()));
  287 + mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get();
267 288
268 289 ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS);
269 290 Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful());
... ... @@ -280,7 +301,7 @@ public class MqttClientTest extends AbstractContainerTest {
280 301
281 302 MqttMessageListener listener = new MqttMessageListener();
282 303 MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
283   - mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE);
  304 + mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get();
284 305
285 306 // Get the default rule chain id to make it root again after test finished
286 307 RuleChainId defaultRuleChainId = getDefaultRuleChainId();
... ... @@ -294,7 +315,7 @@ public class MqttClientTest extends AbstractContainerTest {
294 315 clientRequest.addProperty("method", "getResponse");
295 316 clientRequest.addProperty("params", true);
296 317 Integer requestId = 42;
297   - mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes()));
  318 + mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes())).get();
298 319
299 320 // Check the response from the server
300 321 TimeUnit.SECONDS.sleep(1);
... ...
... ... @@ -17,7 +17,12 @@
17 17 spring.main.web-environment: false
18 18 spring.main.web-application-type: none
19 19
20   -# MQTT server parameters
  20 +# Clustering properties
  21 +cluster:
  22 + # Unique id for this node (autogenerated if empty)
  23 + node_id: "${CLUSTER_NODE_ID:}"
  24 +
  25 +# COAP server parameters
21 26 transport:
22 27 coap:
23 28 bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"
... ...
... ... @@ -20,6 +20,11 @@ server:
20 20 # Server bind port
21 21 port: "${HTTP_BIND_PORT:8081}"
22 22
  23 +# Clustering properties
  24 +cluster:
  25 + # Unique id for this node (autogenerated if empty)
  26 + node_id: "${CLUSTER_NODE_ID:}"
  27 +
23 28 # HTTP server parameters
24 29 transport:
25 30 http:
... ...
... ... @@ -17,6 +17,11 @@
17 17 spring.main.web-environment: false
18 18 spring.main.web-application-type: none
19 19
  20 +# Clustering properties
  21 +cluster:
  22 + # Unique id for this node (autogenerated if empty)
  23 + node_id: "${CLUSTER_NODE_ID:}"
  24 +
20 25 # MQTT server parameters
21 26 transport:
22 27 mqtt:
... ...