Commit 37de5cbc17dd0d64d7d0a3e7c18cc7bde787f55d

Authored by Andrew Shvayka
Committed by GitHub
2 parents b42b99da 7d14d46e

Merge pull request #1191 from ymucahit/zookeeper-reconnect-bug-fix

Zookeeper reconnect error
@@ -30,6 +30,7 @@ import org.apache.curator.framework.state.ConnectionStateListener; @@ -30,6 +30,7 @@ import org.apache.curator.framework.state.ConnectionStateListener;
30 import org.apache.curator.retry.RetryForever; 30 import org.apache.curator.retry.RetryForever;
31 import org.apache.curator.utils.CloseableUtils; 31 import org.apache.curator.utils.CloseableUtils;
32 import org.apache.zookeeper.CreateMode; 32 import org.apache.zookeeper.CreateMode;
  33 +import org.apache.zookeeper.KeeperException;
33 import org.springframework.beans.factory.annotation.Autowired; 34 import org.springframework.beans.factory.annotation.Autowired;
34 import org.springframework.beans.factory.annotation.Value; 35 import org.springframework.beans.factory.annotation.Value;
35 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; 36 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -51,6 +52,8 @@ import java.util.List; @@ -51,6 +52,8 @@ import java.util.List;
51 import java.util.NoSuchElementException; 52 import java.util.NoSuchElementException;
52 import java.util.stream.Collectors; 53 import java.util.stream.Collectors;
53 54
  55 +import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED;
  56 +
54 /** 57 /**
55 * @author Andrew Shvayka 58 * @author Andrew Shvayka
56 */ 59 */
@@ -128,19 +131,42 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @@ -128,19 +131,42 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
128 } 131 }
129 132
130 @Override 133 @Override
131 - public void publishCurrentServer() { 134 + public synchronized void publishCurrentServer() {
  135 + ServerInstance self = this.serverInstance.getSelf();
  136 + if (currentServerExists()) {
  137 + log.info("[{}:{}] ZK node for current instance already exists, NOT created new one: {}", self.getHost(), self.getPort(), nodePath);
  138 + } else {
  139 + try {
  140 + log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());
  141 + nodePath = client.create()
  142 + .creatingParentsIfNeeded()
  143 + .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
  144 + log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
  145 + client.getConnectionStateListenable().addListener(checkReconnect(self));
  146 + } catch (Exception e) {
  147 + log.error("Failed to create ZK node", e);
  148 + throw new RuntimeException(e);
  149 + }
  150 + }
  151 + }
  152 +
  153 + private boolean currentServerExists() {
  154 + if (nodePath == null) {
  155 + return false;
  156 + }
132 try { 157 try {
133 ServerInstance self = this.serverInstance.getSelf(); 158 ServerInstance self = this.serverInstance.getSelf();
134 - log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());  
135 - nodePath = client.create()  
136 - .creatingParentsIfNeeded()  
137 - .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));  
138 - log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);  
139 - client.getConnectionStateListenable().addListener(checkReconnect(self)); 159 + ServerAddress registeredServerAdress = null;
  160 + registeredServerAdress = SerializationUtils.deserialize(client.getData().forPath(nodePath));
  161 + if (self.getServerAddress() != null && self.getServerAddress().equals(registeredServerAdress)) {
  162 + return true;
  163 + }
  164 + } catch (KeeperException.NoNodeException e) {
  165 + log.info("ZK node does not exist: {}", nodePath);
140 } catch (Exception e) { 166 } catch (Exception e) {
141 - log.error("Failed to create ZK node", e);  
142 - throw new RuntimeException(e); 167 + log.error("Couldn't check if ZK node exists", e);
143 } 168 }
  169 + return false;
144 } 170 }
145 171
146 private ConnectionStateListener checkReconnect(ServerInstance self) { 172 private ConnectionStateListener checkReconnect(ServerInstance self) {
@@ -218,6 +244,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @@ -218,6 +244,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
218 log.debug("Ignoring {} due to empty child's data", pathChildrenCacheEvent); 244 log.debug("Ignoring {} due to empty child's data", pathChildrenCacheEvent);
219 return; 245 return;
220 } else if (nodePath != null && nodePath.equals(data.getPath())) { 246 } else if (nodePath != null && nodePath.equals(data.getPath())) {
  247 + if (pathChildrenCacheEvent.getType() == CHILD_REMOVED) {
  248 + log.info("ZK node for current instance is somehow deleted.");
  249 + publishCurrentServer();
  250 + }
221 log.debug("Ignoring event about current server {}", pathChildrenCacheEvent); 251 log.debug("Ignoring event about current server {}", pathChildrenCacheEvent);
222 return; 252 return;
223 } 253 }