Commit c4be98576a91211532ddb6710849a17f17661374

Authored by Igor Kulikov
1 parent 5206a0e4

Improve zookeeper client reconnect logic. UI: Flot widget - Fixed incorrect indi…

…vidual tooltip content.
... ... @@ -52,6 +52,8 @@ import javax.annotation.PostConstruct;
52 52 import javax.annotation.PreDestroy;
53 53 import java.util.List;
54 54 import java.util.NoSuchElementException;
  55 +import java.util.concurrent.ExecutorService;
  56 +import java.util.concurrent.Executors;
55 57 import java.util.stream.Collectors;
56 58
57 59 import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED;
... ... @@ -96,11 +98,13 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
96 98 @Lazy
97 99 private ClusterRoutingService routingService;
98 100
  101 + private ExecutorService reconnectExecutorService;
  102 +
99 103 private CuratorFramework client;
100 104 private PathChildrenCache cache;
101 105 private String nodePath;
102 106
103   - private volatile boolean stopped = false;
  107 + private volatile boolean stopped = true;
104 108
105 109 @PostConstruct
106 110 public void init() {
... ... @@ -110,9 +114,15 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
110 114 Assert.notNull(zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms"));
111 115 Assert.notNull(zkSessionTimeout, MiscUtils.missingProperty("zk.session_timeout_ms"));
112 116
  117 + reconnectExecutorService = Executors.newSingleThreadExecutor();
  118 +
113 119 log.info("Initializing discovery service using ZK connect string: {}", zkUrl);
114 120
115 121 zkNodesDir = zkDir + "/nodes";
  122 + initZkClient();
  123 + }
  124 +
  125 + private void initZkClient() {
116 126 try {
117 127 client = CuratorFrameworkFactory.newClient(zkUrl, zkSessionTimeout, zkConnectionTimeout, new RetryForever(zkRetryInterval));
118 128 client.start();
... ... @@ -120,6 +130,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
120 130 cache = new PathChildrenCache(client, zkNodesDir, true);
121 131 cache.getListenable().addListener(this);
122 132 cache.start();
  133 + stopped = false;
  134 + log.info("ZK client connected");
123 135 } catch (Exception e) {
124 136 log.error("Failed to connect to ZK: {}", e.getMessage(), e);
125 137 CloseableUtils.closeQuietly(cache);
... ... @@ -128,12 +140,20 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
128 140 }
129 141 }
130 142
131   - @PreDestroy
132   - public void destroy() {
  143 + private void destroyZkClient() {
133 144 stopped = true;
134   - unpublishCurrentServer();
  145 + try {
  146 + unpublishCurrentServer();
  147 + } catch (Exception e) {}
135 148 CloseableUtils.closeQuietly(cache);
136 149 CloseableUtils.closeQuietly(client);
  150 + log.info("ZK client disconnected");
  151 + }
  152 +
  153 + @PreDestroy
  154 + public void destroy() {
  155 + destroyZkClient();
  156 + reconnectExecutorService.shutdownNow();
137 157 log.info("Stopped discovery service");
138 158 }
139 159
... ... @@ -180,20 +200,21 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
180 200 return (client, newState) -> {
181 201 log.info("[{}:{}] ZK state changed: {}", self.getHost(), self.getPort(), newState);
182 202 if (newState == ConnectionState.LOST) {
183   - reconnect();
  203 + reconnectExecutorService.submit(this::reconnect);
184 204 }
185 205 };
186 206 }
187 207
188   - private boolean reconnectInProgress = false;
  208 + private volatile boolean reconnectInProgress = false;
189 209
190 210 private synchronized void reconnect() {
191 211 if (!reconnectInProgress) {
192 212 reconnectInProgress = true;
193 213 try {
194   - client.blockUntilConnected();
  214 + destroyZkClient();
  215 + initZkClient();
195 216 publishCurrentServer();
196   - } catch (InterruptedException e) {
  217 + } catch (Exception e) {
197 218 log.error("Failed to reconnect to ZK: {}", e.getMessage(), e);
198 219 } finally {
199 220 reconnectInProgress = false;
... ...
... ... @@ -50,7 +50,7 @@
50 50 <commons-validator.version>1.5.0</commons-validator.version>
51 51 <commons-io.version>2.5</commons-io.version>
52 52 <commons-csv.version>1.4</commons-csv.version>
53   - <jackson.version>2.9.7</jackson.version>
  53 + <jackson.version>2.9.8</jackson.version>
54 54 <json-schema-validator.version>2.2.6</json-schema-validator.version>
55 55 <scala.version>2.11</scala.version>
56 56 <akka.version>2.4.2</akka.version>
... ... @@ -59,7 +59,7 @@
59 59 <velocity.version>1.7</velocity.version>
60 60 <velocity-tools.version>2.0</velocity-tools.version>
61 61 <mail.version>1.4.3</mail.version>
62   - <curator.version>4.0.1</curator.version>
  62 + <curator.version>4.2.0</curator.version>
63 63 <protobuf.version>3.6.1</protobuf.version>
64 64 <grpc.version>1.16.1</grpc.version>
65 65 <lombok.version>1.16.18</lombok.version>
... ...
... ... @@ -137,9 +137,11 @@ export default class TbFlot {
137 137 });
138 138 content += dateDiv.prop('outerHTML');
139 139 if (tbFlot.ctx.tooltipIndividual) {
140   - var seriesHoverInfo = hoverInfo.seriesHover[seriesIndex];
141   - if (seriesHoverInfo) {
142   - content += seriesInfoDivFromInfo(seriesHoverInfo, seriesIndex);
  140 + var found = hoverInfo.seriesHover.filter((seriesHover) => {
  141 + return seriesHover.index === seriesIndex;
  142 + });
  143 + if (found && found.length) {
  144 + content += seriesInfoDivFromInfo(found[0], seriesIndex);
143 145 }
144 146 } else {
145 147 var seriesDiv = $('<div></div>');
... ... @@ -161,7 +163,7 @@ export default class TbFlot {
161 163 if (i == hoverInfo.seriesHover.length) {
162 164 break;
163 165 }
164   - seriesHoverInfo = hoverInfo.seriesHover[i];
  166 + var seriesHoverInfo = hoverInfo.seriesHover[i];
165 167 columnContent += seriesInfoDivFromInfo(seriesHoverInfo, seriesIndex);
166 168 }
167 169 columnDiv.html(columnContent);
... ...