Commit 5613baf066dc56ee85709c10d757ce8799c4f425

Authored by Volodymyr Babak
1 parent 78a7e676

Added correct onComplete in case onError

... ... @@ -40,6 +40,7 @@ import org.thingsboard.server.gen.edge.UplinkResponseMsg;
40 40 import javax.net.ssl.SSLException;
41 41 import java.io.File;
42 42 import java.net.URISyntaxException;
  43 +import java.util.concurrent.ExecutionException;
43 44 import java.util.concurrent.TimeUnit;
44 45 import java.util.concurrent.locks.ReentrantLock;
45 46 import java.util.function.Consumer;
... ... @@ -107,7 +108,7 @@ public class EdgeGrpcClient implements EdgeRpcClient {
107 108 } else {
108 109 log.error("[{}] Failed to establish the connection! Code: {}. Error message: {}.", edgeKey, connectResponseMsg.getResponseCode(), connectResponseMsg.getErrorMsg());
109 110 try {
110   - EdgeGrpcClient.this.disconnect();
  111 + EdgeGrpcClient.this.disconnect(true);
111 112 } catch (InterruptedException e) {
112 113 log.error("[{}] Got interruption during disconnect!", edgeKey, e);
113 114 }
... ... @@ -136,7 +137,12 @@ public class EdgeGrpcClient implements EdgeRpcClient {
136 137 }
137 138
138 139 @Override
139   - public void disconnect() throws InterruptedException {
  140 + public void disconnect(boolean onError) throws InterruptedException {
  141 + if (!onError) {
  142 + try {
  143 + inputStream.onCompleted();
  144 + } catch (Exception ignored) {}
  145 + }
140 146 if (channel != null) {
141 147 channel.shutdown().awaitTermination(timeoutSecs, TimeUnit.SECONDS);
142 148 }
... ...
... ... @@ -32,7 +32,7 @@ public interface EdgeRpcClient {
32 32 Consumer<DownlinkMsg> onDownlink,
33 33 Consumer<Exception> onError);
34 34
35   - void disconnect() throws InterruptedException;
  35 + void disconnect(boolean onError) throws InterruptedException;
36 36
37 37 void sendSyncRequestMsg();
38 38
... ...