Commit 7d14d46edf693a25a933cc57fe1829bbce090730

Authored by Yusuf Mücahit Çetinkaya
1 parent 3aa3a56b

check and recover currentServer zNode on zk.

@@ -29,6 +29,7 @@ import org.apache.curator.framework.state.ConnectionStateListener; @@ -29,6 +29,7 @@ import org.apache.curator.framework.state.ConnectionStateListener;
29 import org.apache.curator.retry.RetryForever; 29 import org.apache.curator.retry.RetryForever;
30 import org.apache.curator.utils.CloseableUtils; 30 import org.apache.curator.utils.CloseableUtils;
31 import org.apache.zookeeper.CreateMode; 31 import org.apache.zookeeper.CreateMode;
  32 +import org.apache.zookeeper.KeeperException;
32 import org.springframework.beans.factory.annotation.Autowired; 33 import org.springframework.beans.factory.annotation.Autowired;
33 import org.springframework.beans.factory.annotation.Value; 34 import org.springframework.beans.factory.annotation.Value;
34 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; 35 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -49,6 +50,8 @@ import java.util.NoSuchElementException; @@ -49,6 +50,8 @@ import java.util.NoSuchElementException;
49 import java.util.concurrent.CopyOnWriteArrayList; 50 import java.util.concurrent.CopyOnWriteArrayList;
50 import java.util.stream.Collectors; 51 import java.util.stream.Collectors;
51 52
  53 +import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED;
  54 +
52 /** 55 /**
53 * @author Andrew Shvayka 56 * @author Andrew Shvayka
54 */ 57 */
@@ -121,19 +124,42 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @@ -121,19 +124,42 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
121 } 124 }
122 125
123 @Override 126 @Override
124 - public void publishCurrentServer() { 127 + public synchronized void publishCurrentServer() {
  128 + ServerInstance self = this.serverInstance.getSelf();
  129 + if (currentServerExists()) {
  130 + log.info("[{}:{}] ZK node for current instance already exists, NOT created new one: {}", self.getHost(), self.getPort(), nodePath);
  131 + } else {
  132 + try {
  133 + log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());
  134 + nodePath = client.create()
  135 + .creatingParentsIfNeeded()
  136 + .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
  137 + log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
  138 + client.getConnectionStateListenable().addListener(checkReconnect(self));
  139 + } catch (Exception e) {
  140 + log.error("Failed to create ZK node", e);
  141 + throw new RuntimeException(e);
  142 + }
  143 + }
  144 + }
  145 +
  146 + private boolean currentServerExists() {
  147 + if (nodePath == null) {
  148 + return false;
  149 + }
125 try { 150 try {
126 ServerInstance self = this.serverInstance.getSelf(); 151 ServerInstance self = this.serverInstance.getSelf();
127 - log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());  
128 - nodePath = client.create()  
129 - .creatingParentsIfNeeded()  
130 - .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));  
131 - log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);  
132 - client.getConnectionStateListenable().addListener(checkReconnect(self)); 152 + ServerAddress registeredServerAdress = null;
  153 + registeredServerAdress = SerializationUtils.deserialize(client.getData().forPath(nodePath));
  154 + if (self.getServerAddress() != null && self.getServerAddress().equals(registeredServerAdress)) {
  155 + return true;
  156 + }
  157 + } catch (KeeperException.NoNodeException e) {
  158 + log.info("ZK node does not exist: {}", nodePath);
133 } catch (Exception e) { 159 } catch (Exception e) {
134 - log.error("Failed to create ZK node", e);  
135 - throw new RuntimeException(e); 160 + log.error("Couldn't check if ZK node exists", e);
136 } 161 }
  162 + return false;
137 } 163 }
138 164
139 private ConnectionStateListener checkReconnect(ServerInstance self) { 165 private ConnectionStateListener checkReconnect(ServerInstance self) {
@@ -221,6 +247,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @@ -221,6 +247,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
221 log.debug("Ignoring {} due to empty child's data", pathChildrenCacheEvent); 247 log.debug("Ignoring {} due to empty child's data", pathChildrenCacheEvent);
222 return; 248 return;
223 } else if (nodePath != null && nodePath.equals(data.getPath())) { 249 } else if (nodePath != null && nodePath.equals(data.getPath())) {
  250 + if (pathChildrenCacheEvent.getType() == CHILD_REMOVED) {
  251 + log.info("ZK node for current instance is somehow deleted.");
  252 + publishCurrentServer();
  253 + }
224 log.debug("Ignoring event about current server {}", pathChildrenCacheEvent); 254 log.debug("Ignoring event about current server {}", pathChildrenCacheEvent);
225 return; 255 return;
226 } 256 }