Commit 5c70739e053fa6b04c21e660cdd76306f987e5bb

Authored by Igor Kulikov
Committed by GitHub
2 parents 20540b1c d0a0e48d

Merge pull request #1724 from elbstack/1686-No-response-to-client-side-RPC-via-CoAP

1686 no response to client side rpc via co ap
@@ -192,6 +192,7 @@ public class CoapTransportResource extends CoapResource { @@ -192,6 +192,7 @@ public class CoapTransportResource extends CoapResource {
192 new CoapOkCallback(exchange)); 192 new CoapOkCallback(exchange));
193 break; 193 break;
194 case TO_SERVER_RPC_REQUEST: 194 case TO_SERVER_RPC_REQUEST:
  195 + transportService.registerSyncSession(sessionInfo, new CoapSessionListener(sessionId, exchange), transportContext.getTimeout());
195 transportService.process(sessionInfo, 196 transportService.process(sessionInfo,
196 transportContext.getAdaptor().convertToServerRpcRequest(sessionId, request), 197 transportContext.getAdaptor().convertToServerRpcRequest(sessionId, request),
197 new CoapNoOpCallback(exchange)); 198 new CoapNoOpCallback(exchange));
@@ -178,15 +178,24 @@ public abstract class AbstractTransportService implements TransportService { @@ -178,15 +178,24 @@ public abstract class AbstractTransportService implements TransportService {
178 178
179 @Override 179 @Override
180 public void registerSyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) { 180 public void registerSyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) {
181 - sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener));  
182 - schedulerExecutor.schedule(() -> { 181 + SessionMetaData currentSession = new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener);
  182 + sessions.putIfAbsent(toId(sessionInfo), currentSession);
  183 +
  184 + ScheduledFuture executorFuture = schedulerExecutor.schedule(() -> {
183 listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance()); 185 listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
184 deregisterSession(sessionInfo); 186 deregisterSession(sessionInfo);
185 }, timeout, TimeUnit.MILLISECONDS); 187 }, timeout, TimeUnit.MILLISECONDS);
  188 +
  189 + currentSession.setScheduledFuture(executorFuture);
186 } 190 }
187 191
188 @Override 192 @Override
189 public void deregisterSession(TransportProtos.SessionInfoProto sessionInfo) { 193 public void deregisterSession(TransportProtos.SessionInfoProto sessionInfo) {
  194 + SessionMetaData currentSession = sessions.get(toId(sessionInfo));
  195 + if (currentSession.hasScheduledFuture()) {
  196 + log.debug("Stopping scheduler to avoid resending response if request has been ack.");
  197 + currentSession.getScheduledFuture().cancel(false);
  198 + }
190 sessions.remove(toId(sessionInfo)); 199 sessions.remove(toId(sessionInfo));
191 } 200 }
192 201
@@ -19,6 +19,8 @@ import lombok.Data; @@ -19,6 +19,8 @@ import lombok.Data;
19 import org.thingsboard.server.common.transport.SessionMsgListener; 19 import org.thingsboard.server.common.transport.SessionMsgListener;
20 import org.thingsboard.server.gen.transport.TransportProtos; 20 import org.thingsboard.server.gen.transport.TransportProtos;
21 21
  22 +import java.util.concurrent.ScheduledFuture;
  23 +
22 /** 24 /**
23 * Created by ashvayka on 15.10.18. 25 * Created by ashvayka on 15.10.18.
24 */ 26 */
@@ -29,19 +31,33 @@ class SessionMetaData { @@ -29,19 +31,33 @@ class SessionMetaData {
29 private final TransportProtos.SessionType sessionType; 31 private final TransportProtos.SessionType sessionType;
30 private final SessionMsgListener listener; 32 private final SessionMsgListener listener;
31 33
  34 + private ScheduledFuture scheduledFuture;
  35 +
32 private volatile long lastActivityTime; 36 private volatile long lastActivityTime;
33 private volatile boolean subscribedToAttributes; 37 private volatile boolean subscribedToAttributes;
34 private volatile boolean subscribedToRPC; 38 private volatile boolean subscribedToRPC;
35 39
36 - SessionMetaData(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener) { 40 + SessionMetaData(
  41 + TransportProtos.SessionInfoProto sessionInfo,
  42 + TransportProtos.SessionType sessionType,
  43 + SessionMsgListener listener
  44 + ) {
37 this.sessionInfo = sessionInfo; 45 this.sessionInfo = sessionInfo;
38 this.sessionType = sessionType; 46 this.sessionType = sessionType;
39 this.listener = listener; 47 this.listener = listener;
40 this.lastActivityTime = System.currentTimeMillis(); 48 this.lastActivityTime = System.currentTimeMillis();
  49 + this.scheduledFuture = null;
41 } 50 }
42 51
43 void updateLastActivityTime() { 52 void updateLastActivityTime() {
44 this.lastActivityTime = System.currentTimeMillis(); 53 this.lastActivityTime = System.currentTimeMillis();
45 } 54 }
46 55
  56 + void setScheduledFuture(ScheduledFuture scheduledFuture) { this.scheduledFuture = scheduledFuture; }
  57 +
  58 + public ScheduledFuture getScheduledFuture() {
  59 + return scheduledFuture;
  60 + }
  61 +
  62 + public boolean hasScheduledFuture() { return null != this.scheduledFuture; }
47 } 63 }