Commit 02585823fcf3e311998ef701eb88204f86dae2c7

Authored by Volodymyr Babak
1 parent 7644aa43

Properly hanlde gRPC session timeout

@@ -19,7 +19,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; @@ -19,7 +19,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
19 import com.google.common.io.Resources; 19 import com.google.common.io.Resources;
20 import com.google.common.util.concurrent.FutureCallback; 20 import com.google.common.util.concurrent.FutureCallback;
21 import io.grpc.Server; 21 import io.grpc.Server;
22 -import io.grpc.ServerBuilder; 22 +import io.grpc.netty.NettyServerBuilder;
23 import io.grpc.stub.StreamObserver; 23 import io.grpc.stub.StreamObserver;
24 import lombok.extern.slf4j.Slf4j; 24 import lombok.extern.slf4j.Slf4j;
25 import org.springframework.beans.factory.annotation.Autowired; 25 import org.springframework.beans.factory.annotation.Autowired;
@@ -49,6 +49,7 @@ import java.util.Map; @@ -49,6 +49,7 @@ import java.util.Map;
49 import java.util.concurrent.ConcurrentHashMap; 49 import java.util.concurrent.ConcurrentHashMap;
50 import java.util.concurrent.ExecutorService; 50 import java.util.concurrent.ExecutorService;
51 import java.util.concurrent.Executors; 51 import java.util.concurrent.Executors;
  52 +import java.util.concurrent.TimeUnit;
52 53
53 @Service 54 @Service
54 @Slf4j 55 @Slf4j
@@ -68,6 +69,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i @@ -68,6 +69,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
68 private String privateKeyResource; 69 private String privateKeyResource;
69 @Value("${edges.state.persistToTelemetry:false}") 70 @Value("${edges.state.persistToTelemetry:false}")
70 private boolean persistToTelemetry; 71 private boolean persistToTelemetry;
  72 + @Value("${edges.rpc.client_max_keep_alive_time_sec}")
  73 + private int clientMaxKeepAliveTimeSec;
71 74
72 @Autowired 75 @Autowired
73 private EdgeContextComponent ctx; 76 private EdgeContextComponent ctx;
@@ -82,7 +85,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i @@ -82,7 +85,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
82 @PostConstruct 85 @PostConstruct
83 public void init() { 86 public void init() {
84 log.info("Initializing Edge RPC service!"); 87 log.info("Initializing Edge RPC service!");
85 - ServerBuilder builder = ServerBuilder.forPort(rpcPort).addService(this); 88 + NettyServerBuilder builder = NettyServerBuilder.forPort(rpcPort)
  89 + .permitKeepAliveTime(clientMaxKeepAliveTimeSec, TimeUnit.SECONDS)
  90 + .addService(this);
86 if (sslEnabled) { 91 if (sslEnabled) {
87 try { 92 try {
88 File certFile = new File(Resources.getResource(certFileResource).toURI()); 93 File certFile = new File(Resources.getResource(certFileResource).toURI());
@@ -590,6 +590,7 @@ edges: @@ -590,6 +590,7 @@ edges:
590 rpc: 590 rpc:
591 enabled: "${EDGES_RPC_ENABLED:false}" 591 enabled: "${EDGES_RPC_ENABLED:false}"
592 port: "${EDGES_RPC_PORT:7070}" 592 port: "${EDGES_RPC_PORT:7070}"
  593 + client_max_keep_alive_time_sec: "${EDGES_RPC_CLIENT_MAX_KEEP_ALIVE_TIME_SEC:300}"
593 ssl: 594 ssl:
594 # Enable/disable SSL support 595 # Enable/disable SSL support
595 enabled: "${EDGES_RPC_SSL_ENABLED:false}" 596 enabled: "${EDGES_RPC_SSL_ENABLED:false}"
@@ -55,6 +55,8 @@ public class EdgeGrpcClient implements EdgeRpcClient { @@ -55,6 +55,8 @@ public class EdgeGrpcClient implements EdgeRpcClient {
55 private int rpcPort; 55 private int rpcPort;
56 @Value("${cloud.rpc.timeout}") 56 @Value("${cloud.rpc.timeout}")
57 private int timeoutSecs; 57 private int timeoutSecs;
  58 + @Value("${cloud.rpc.keep_alive_time_sec}")
  59 + private int keepAliveTimeSec;
58 @Value("${cloud.rpc.ssl.enabled}") 60 @Value("${cloud.rpc.ssl.enabled}")
59 private boolean sslEnabled; 61 private boolean sslEnabled;
60 @Value("${cloud.rpc.ssl.cert}") 62 @Value("${cloud.rpc.ssl.cert}")
@@ -73,7 +75,9 @@ public class EdgeGrpcClient implements EdgeRpcClient { @@ -73,7 +75,9 @@ public class EdgeGrpcClient implements EdgeRpcClient {
73 Consumer<EdgeConfiguration> onEdgeUpdate, 75 Consumer<EdgeConfiguration> onEdgeUpdate,
74 Consumer<DownlinkMsg> onDownlink, 76 Consumer<DownlinkMsg> onDownlink,
75 Consumer<Exception> onError) { 77 Consumer<Exception> onError) {
76 - NettyChannelBuilder builder = NettyChannelBuilder.forAddress(rpcHost, rpcPort).usePlaintext(); 78 + NettyChannelBuilder builder = NettyChannelBuilder.forAddress(rpcHost, rpcPort)
  79 + .keepAliveTime(keepAliveTimeSec, TimeUnit.SECONDS)
  80 + .usePlaintext();
77 if (sslEnabled) { 81 if (sslEnabled) {
78 try { 82 try {
79 builder.sslContext(GrpcSslContexts.forClient().trustManager(new File(Resources.getResource(certResource).toURI())).build()); 83 builder.sslContext(GrpcSslContexts.forClient().trustManager(new File(Resources.getResource(certResource).toURI())).build());