Commit f6b00b35db3b14d40c99f55adcb0eebc8d4289d6

Authored by Igor Kulikov
1 parent 3235e570

Improve docker compose. Improve TB shutdown logic.

@@ -21,6 +21,7 @@ import org.apache.commons.lang3.SerializationException; @@ -21,6 +21,7 @@ import org.apache.commons.lang3.SerializationException;
21 import org.apache.commons.lang3.SerializationUtils; 21 import org.apache.commons.lang3.SerializationUtils;
22 import org.apache.curator.framework.CuratorFramework; 22 import org.apache.curator.framework.CuratorFramework;
23 import org.apache.curator.framework.CuratorFrameworkFactory; 23 import org.apache.curator.framework.CuratorFrameworkFactory;
  24 +import org.apache.curator.framework.imps.CuratorFrameworkState;
24 import org.apache.curator.framework.recipes.cache.ChildData; 25 import org.apache.curator.framework.recipes.cache.ChildData;
25 import org.apache.curator.framework.recipes.cache.PathChildrenCache; 26 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
26 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; 27 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@@ -98,6 +99,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @@ -98,6 +99,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
98 private PathChildrenCache cache; 99 private PathChildrenCache cache;
99 private String nodePath; 100 private String nodePath;
100 101
  102 + private volatile boolean stopped = false;
  103 +
101 @PostConstruct 104 @PostConstruct
102 public void init() { 105 public void init() {
103 log.info("Initializing..."); 106 log.info("Initializing...");
@@ -118,6 +121,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @@ -118,6 +121,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
118 cache.start(); 121 cache.start();
119 } catch (Exception e) { 122 } catch (Exception e) {
120 log.error("Failed to connect to ZK: {}", e.getMessage(), e); 123 log.error("Failed to connect to ZK: {}", e.getMessage(), e);
  124 + CloseableUtils.closeQuietly(cache);
121 CloseableUtils.closeQuietly(client); 125 CloseableUtils.closeQuietly(client);
122 throw new RuntimeException(e); 126 throw new RuntimeException(e);
123 } 127 }
@@ -125,7 +129,9 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @@ -125,7 +129,9 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
125 129
126 @PreDestroy 130 @PreDestroy
127 public void destroy() { 131 public void destroy() {
  132 + stopped = true;
128 unpublishCurrentServer(); 133 unpublishCurrentServer();
  134 + CloseableUtils.closeQuietly(cache);
129 CloseableUtils.closeQuietly(client); 135 CloseableUtils.closeQuietly(client);
130 log.info("Stopped discovery service"); 136 log.info("Stopped discovery service");
131 } 137 }
@@ -228,6 +234,14 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @@ -228,6 +234,14 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
228 234
229 @Override 235 @Override
230 public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { 236 public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
  237 + if (stopped) {
  238 + log.debug("Ignoring application ready event. Service is stopped.");
  239 + return;
  240 + }
  241 + if (client.getState() != CuratorFrameworkState.STARTED) {
  242 + log.debug("Ignoring application ready event, ZK client is not started, ZK client state [{}]", client.getState());
  243 + return;
  244 + }
231 publishCurrentServer(); 245 publishCurrentServer();
232 getOtherServers().forEach( 246 getOtherServers().forEach(
233 server -> log.info("Found active server: [{}:{}]", server.getHost(), server.getPort()) 247 server -> log.info("Found active server: [{}:{}]", server.getHost(), server.getPort())
@@ -236,6 +250,14 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @@ -236,6 +250,14 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
236 250
237 @Override 251 @Override
238 public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { 252 public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
  253 + if (stopped) {
  254 + log.debug("Ignoring {}. Service is stopped.", pathChildrenCacheEvent);
  255 + return;
  256 + }
  257 + if (client.getState() != CuratorFrameworkState.STARTED) {
  258 + log.debug("Ignoring {}, ZK client is not started, ZK client state [{}]", pathChildrenCacheEvent, client.getState());
  259 + return;
  260 + }
239 ChildData data = pathChildrenCacheEvent.getData(); 261 ChildData data = pathChildrenCacheEvent.getData();
240 if (data == null) { 262 if (data == null) {
241 log.debug("Ignoring {} due to empty child data", pathChildrenCacheEvent); 263 log.debug("Ignoring {} due to empty child data", pathChildrenCacheEvent);
@@ -18,6 +18,7 @@ package org.thingsboard.server.kafka; @@ -18,6 +18,7 @@ package org.thingsboard.server.kafka;
18 import lombok.Builder; 18 import lombok.Builder;
19 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
20 import org.apache.kafka.clients.consumer.ConsumerRecords; 20 import org.apache.kafka.clients.consumer.ConsumerRecords;
  21 +import org.apache.kafka.common.errors.InterruptException;
21 import org.apache.kafka.common.header.Header; 22 import org.apache.kafka.common.header.Header;
22 import org.apache.kafka.common.header.internals.RecordHeader; 23 import org.apache.kafka.common.header.internals.RecordHeader;
23 24
@@ -127,6 +128,10 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT @@ -127,6 +128,10 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
127 log.warn("[{}] Failed to process the request: {}", requestId, request, e); 128 log.warn("[{}] Failed to process the request: {}", requestId, request, e);
128 } 129 }
129 }); 130 });
  131 + } catch (InterruptException ie) {
  132 + if (!stopped) {
  133 + log.warn("Fetching data from kafka was interrupted.", ie);
  134 + }
130 } catch (Throwable e) { 135 } catch (Throwable e) {
131 log.warn("Failed to obtain messages from queue.", e); 136 log.warn("Failed to obtain messages from queue.", e);
132 try { 137 try {
@@ -64,6 +64,7 @@ services: @@ -64,6 +64,7 @@ services:
64 depends_on: 64 depends_on:
65 - kafka 65 - kafka
66 - redis 66 - redis
  67 + - tb-js-executor
67 tb2: 68 tb2:
68 restart: always 69 restart: always
69 image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}" 70 image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}"
@@ -84,6 +85,7 @@ services: @@ -84,6 +85,7 @@ services:
84 depends_on: 85 depends_on:
85 - kafka 86 - kafka
86 - redis 87 - redis
  88 + - tb-js-executor
87 tb-mqtt-transport1: 89 tb-mqtt-transport1:
88 restart: always 90 restart: always
89 image: "${DOCKER_REPO}/${MQTT_TRANSPORT_DOCKER_NAME}:${TB_VERSION}" 91 image: "${DOCKER_REPO}/${MQTT_TRANSPORT_DOCKER_NAME}:${TB_VERSION}"