Commit 6c618a7db35ecc1a757f09869fcb16465f7de465

Authored by Andrew Shvayka
1 parent 474dc167

Fixed issue with Zookeeper reconnect

@@ -24,6 +24,8 @@ import org.apache.curator.framework.recipes.cache.ChildData; @@ -24,6 +24,8 @@ import org.apache.curator.framework.recipes.cache.ChildData;
24 import org.apache.curator.framework.recipes.cache.PathChildrenCache; 24 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
25 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; 25 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
26 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; 26 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
  27 +import org.apache.curator.framework.state.ConnectionState;
  28 +import org.apache.curator.framework.state.ConnectionStateListener;
27 import org.apache.curator.retry.RetryForever; 29 import org.apache.curator.retry.RetryForever;
28 import org.apache.curator.utils.CloseableUtils; 30 import org.apache.curator.utils.CloseableUtils;
29 import org.apache.zookeeper.CreateMode; 31 import org.apache.zookeeper.CreateMode;
@@ -127,12 +129,38 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @@ -127,12 +129,38 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
127 .creatingParentsIfNeeded() 129 .creatingParentsIfNeeded()
128 .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress())); 130 .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
129 log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath); 131 log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
  132 + client.getConnectionStateListenable().addListener(checkReconnect(self));
130 } catch (Exception e) { 133 } catch (Exception e) {
131 log.error("Failed to create ZK node", e); 134 log.error("Failed to create ZK node", e);
132 throw new RuntimeException(e); 135 throw new RuntimeException(e);
133 } 136 }
134 } 137 }
135 138
  139 + private ConnectionStateListener checkReconnect(ServerInstance self) {
  140 + return (client, newState) -> {
  141 + log.info("[{}:{}] ZK state changed: {}", self.getHost(), self.getPort(), newState);
  142 + if (newState == ConnectionState.LOST) {
  143 + reconnect();
  144 + }
  145 + };
  146 + }
  147 +
  148 + private boolean reconnectInProgress = false;
  149 +
  150 + private synchronized void reconnect() {
  151 + if (!reconnectInProgress) {
  152 + reconnectInProgress = true;
  153 + try {
  154 + client.blockUntilConnected();
  155 + publishCurrentServer();
  156 + } catch (InterruptedException e) {
  157 + log.error("Failed to reconnect to ZK: {}", e.getMessage(), e);
  158 + } finally {
  159 + reconnectInProgress = false;
  160 + }
  161 + }
  162 + }
  163 +
136 @Override 164 @Override
137 public void unpublishCurrentServer() { 165 public void unpublishCurrentServer() {
138 try { 166 try {
@@ -156,7 +184,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @@ -156,7 +184,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
156 .filter(cd -> !cd.getPath().equals(nodePath)) 184 .filter(cd -> !cd.getPath().equals(nodePath))
157 .map(cd -> { 185 .map(cd -> {
158 try { 186 try {
159 - return new ServerInstance( (ServerAddress) SerializationUtils.deserialize(cd.getData())); 187 + return new ServerInstance((ServerAddress) SerializationUtils.deserialize(cd.getData()));
160 } catch (NoSuchElementException e) { 188 } catch (NoSuchElementException e) {
161 log.error("Failed to decode ZK node", e); 189 log.error("Failed to decode ZK node", e);
162 throw new RuntimeException(e); 190 throw new RuntimeException(e);
@@ -198,7 +226,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @@ -198,7 +226,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
198 } 226 }
199 ServerInstance instance; 227 ServerInstance instance;
200 try { 228 try {
201 - ServerAddress serverAddress = SerializationUtils.deserialize(data.getData()); 229 + ServerAddress serverAddress = SerializationUtils.deserialize(data.getData());
202 instance = new ServerInstance(serverAddress); 230 instance = new ServerInstance(serverAddress);
203 } catch (SerializationException e) { 231 } catch (SerializationException e) {
204 log.error("Failed to decode server instance for node {}", data.getPath(), e); 232 log.error("Failed to decode server instance for node {}", data.getPath(), e);