Commit 3d3cb1f16cde9a074675a03aad7f4d52afdca842

Authored by Andrew Shvayka
Committed by GitHub
2 parents 66d4c3fd f1355172

Merge pull request #4770 from ShvaykaD/fix/coap-transport/obesrve-sessions

CoAP: Updated observe sessions closing logic
@@ -65,6 +65,7 @@ import java.util.concurrent.ConcurrentHashMap; @@ -65,6 +65,7 @@ import java.util.concurrent.ConcurrentHashMap;
65 import java.util.concurrent.ConcurrentMap; 65 import java.util.concurrent.ConcurrentMap;
66 import java.util.concurrent.TimeUnit; 66 import java.util.concurrent.TimeUnit;
67 import java.util.concurrent.atomic.AtomicInteger; 67 import java.util.concurrent.atomic.AtomicInteger;
  68 +import java.util.stream.Collectors;
68 69
69 @Slf4j 70 @Slf4j
70 public class CoapTransportResource extends AbstractCoapTransportResource { 71 public class CoapTransportResource extends AbstractCoapTransportResource {
@@ -76,9 +77,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -76,9 +77,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
76 private static final int REQUEST_ID_POSITION_CERTIFICATE_REQUEST = 4; 77 private static final int REQUEST_ID_POSITION_CERTIFICATE_REQUEST = 4;
77 private static final String DTLS_SESSION_ID_KEY = "DTLS_SESSION_ID"; 78 private static final String DTLS_SESSION_ID_KEY = "DTLS_SESSION_ID";
78 79
79 - private final ConcurrentMap<String, TransportProtos.SessionInfoProto> tokenToSessionInfoMap = new ConcurrentHashMap<>();  
80 - private final ConcurrentMap<String, AtomicInteger> tokenToObserveNotificationSeqMap = new ConcurrentHashMap<>();  
81 - private final ConcurrentMap<TransportProtos.SessionInfoProto, ObserveRelation> sessionInfoToObserveRelationMap = new ConcurrentHashMap<>(); 80 + private final ConcurrentMap<String, CoapObserveSessionInfo> tokenToCoapSessionInfoMap = new ConcurrentHashMap<>();
  81 + private final ConcurrentMap<CoapObserveSessionInfo, ObserveRelation> sessionInfoToObserveRelationMap = new ConcurrentHashMap<>();
82 private final Set<UUID> rpcSubscriptions = ConcurrentHashMap.newKeySet(); 82 private final Set<UUID> rpcSubscriptions = ConcurrentHashMap.newKeySet();
83 private final Set<UUID> attributeSubscriptions = ConcurrentHashMap.newKeySet(); 83 private final Set<UUID> attributeSubscriptions = ConcurrentHashMap.newKeySet();
84 84
@@ -94,7 +94,11 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -94,7 +94,11 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
94 this.timeout = coapServerService.getTimeout(); 94 this.timeout = coapServerService.getTimeout();
95 this.sessionReportTimeout = ctx.getSessionReportTimeout(); 95 this.sessionReportTimeout = ctx.getSessionReportTimeout();
96 ctx.getScheduler().scheduleAtFixedRate(() -> { 96 ctx.getScheduler().scheduleAtFixedRate(() -> {
97 - Set<TransportProtos.SessionInfoProto> observeSessions = sessionInfoToObserveRelationMap.keySet(); 97 + Set<CoapObserveSessionInfo> coapObserveSessionInfos = sessionInfoToObserveRelationMap.keySet();
  98 + Set<TransportProtos.SessionInfoProto> observeSessions = coapObserveSessionInfos
  99 + .stream()
  100 + .map(CoapObserveSessionInfo::getSessionInfoProto)
  101 + .collect(Collectors.toSet());
98 observeSessions.forEach(this::reportActivity); 102 observeSessions.forEach(this::reportActivity);
99 }, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); 103 }, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
100 } 104 }
@@ -112,17 +116,17 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -112,17 +116,17 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
112 relation.setEstablished(); 116 relation.setEstablished();
113 addObserveRelation(relation); 117 addObserveRelation(relation);
114 } 118 }
115 - AtomicInteger notificationCounter = tokenToObserveNotificationSeqMap.computeIfAbsent(token, s -> new AtomicInteger(0));  
116 - response.getOptions().setObserve(notificationCounter.getAndIncrement()); 119 + AtomicInteger observeNotificationCounter = tokenToCoapSessionInfoMap.get(token).getObserveNotificationCounter();
  120 + response.getOptions().setObserve(observeNotificationCounter.getAndIncrement());
117 } // ObserveLayer takes care of the else case 121 } // ObserveLayer takes care of the else case
118 } 122 }
119 123
120 - public void clearAndNotifyObserveRelation(ObserveRelation relation, CoAP.ResponseCode code) { 124 + private void clearAndNotifyObserveRelation(ObserveRelation relation, CoAP.ResponseCode code) {
121 relation.cancel(); 125 relation.cancel();
122 relation.getExchange().sendResponse(new Response(code)); 126 relation.getExchange().sendResponse(new Response(code));
123 } 127 }
124 128
125 - public Map<TransportProtos.SessionInfoProto, ObserveRelation> getSessionInfoToObserveRelationMap() { 129 + private Map<CoapObserveSessionInfo, ObserveRelation> getCoapSessionInfoToObserveRelationMap() {
126 return sessionInfoToObserveRelationMap; 130 return sessionInfoToObserveRelationMap;
127 } 131 }
128 132
@@ -278,8 +282,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -278,8 +282,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
278 new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); 282 new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
279 break; 283 break;
280 case SUBSCRIBE_ATTRIBUTES_REQUEST: 284 case SUBSCRIBE_ATTRIBUTES_REQUEST:
281 - TransportProtos.SessionInfoProto currentAttrSession = tokenToSessionInfoMap.get(getTokenFromRequest(request));  
282 - if (currentAttrSession == null) { 285 + CoapObserveSessionInfo currentCoapObserveAttrSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request));
  286 + if (currentCoapObserveAttrSessionInfo == null) {
283 attributeSubscriptions.add(sessionId); 287 attributeSubscriptions.add(sessionId);
284 registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, 288 registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor,
285 transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); 289 transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request));
@@ -291,20 +295,20 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -291,20 +295,20 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
291 } 295 }
292 break; 296 break;
293 case UNSUBSCRIBE_ATTRIBUTES_REQUEST: 297 case UNSUBSCRIBE_ATTRIBUTES_REQUEST:
294 - TransportProtos.SessionInfoProto attrSession = lookupAsyncSessionInfo(getTokenFromRequest(request));  
295 - if (attrSession != null) { 298 + CoapObserveSessionInfo coapObserveAttrSessionInfo = lookupAsyncSessionInfo(getTokenFromRequest(request));
  299 + if (coapObserveAttrSessionInfo != null) {
  300 + TransportProtos.SessionInfoProto attrSession = coapObserveAttrSessionInfo.getSessionInfoProto();
296 UUID attrSessionId = toSessionId(attrSession); 301 UUID attrSessionId = toSessionId(attrSession);
297 attributeSubscriptions.remove(attrSessionId); 302 attributeSubscriptions.remove(attrSessionId);
298 - sessionInfoToObserveRelationMap.remove(attrSession);  
299 transportService.process(attrSession, 303 transportService.process(attrSession,
300 TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), 304 TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(),
301 - new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));  
302 - closeAndDeregister(sessionInfo); 305 + new CoapNoOpCallback(exchange));
303 } 306 }
  307 + closeAndDeregister(sessionInfo);
304 break; 308 break;
305 case SUBSCRIBE_RPC_COMMANDS_REQUEST: 309 case SUBSCRIBE_RPC_COMMANDS_REQUEST:
306 - TransportProtos.SessionInfoProto currentRpcSession = tokenToSessionInfoMap.get(getTokenFromRequest(request));  
307 - if (currentRpcSession == null) { 310 + CoapObserveSessionInfo currentCoapObserveRpcSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request));
  311 + if (currentCoapObserveRpcSessionInfo == null) {
308 rpcSubscriptions.add(sessionId); 312 rpcSubscriptions.add(sessionId);
309 registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, 313 registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor,
310 transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); 314 transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request));
@@ -315,16 +319,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -315,16 +319,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
315 } 319 }
316 break; 320 break;
317 case UNSUBSCRIBE_RPC_COMMANDS_REQUEST: 321 case UNSUBSCRIBE_RPC_COMMANDS_REQUEST:
318 - TransportProtos.SessionInfoProto rpcSession = lookupAsyncSessionInfo(getTokenFromRequest(request));  
319 - if (rpcSession != null) { 322 + CoapObserveSessionInfo coapObserveRpcSessionInfo = lookupAsyncSessionInfo(getTokenFromRequest(request));
  323 + if (coapObserveRpcSessionInfo != null) {
  324 + TransportProtos.SessionInfoProto rpcSession = coapObserveRpcSessionInfo.getSessionInfoProto();
320 UUID rpcSessionId = toSessionId(rpcSession); 325 UUID rpcSessionId = toSessionId(rpcSession);
321 rpcSubscriptions.remove(rpcSessionId); 326 rpcSubscriptions.remove(rpcSessionId);
322 - sessionInfoToObserveRelationMap.remove(rpcSession);  
323 transportService.process(rpcSession, 327 transportService.process(rpcSession,
324 TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), 328 TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(),
325 new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); 329 new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
326 - closeAndDeregister(sessionInfo);  
327 } 330 }
  331 + closeAndDeregister(sessionInfo);
328 break; 332 break;
329 case TO_DEVICE_RPC_RESPONSE: 333 case TO_DEVICE_RPC_RESPONSE:
330 transportService.process(sessionInfo, 334 transportService.process(sessionInfo,
@@ -356,13 +360,12 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -356,13 +360,12 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
356 return new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB()); 360 return new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB());
357 } 361 }
358 362
359 - private TransportProtos.SessionInfoProto lookupAsyncSessionInfo(String token) {  
360 - tokenToObserveNotificationSeqMap.remove(token);  
361 - return tokenToSessionInfoMap.remove(token); 363 + private CoapObserveSessionInfo lookupAsyncSessionInfo(String token) {
  364 + return tokenToCoapSessionInfoMap.remove(token);
362 } 365 }
363 366
364 private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) { 367 private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) {
365 - tokenToSessionInfoMap.putIfAbsent(token, sessionInfo); 368 + tokenToCoapSessionInfoMap.putIfAbsent(token, new CoapObserveSessionInfo(sessionInfo));
366 transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo)); 369 transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo));
367 transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); 370 transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
368 } 371 }
@@ -477,45 +480,36 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -477,45 +480,36 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
477 } 480 }
478 481
479 @Override 482 @Override
480 - public void onAttributeUpdate(TransportProtos.AttributeUpdateNotificationMsg msg) { 483 + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg msg) {
  484 + log.trace("[{}] Received attributes update notification to device", sessionId);
481 try { 485 try {
482 exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg)); 486 exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg));
483 } catch (AdaptorException e) { 487 } catch (AdaptorException e) {
484 log.trace("Failed to reply due to error", e); 488 log.trace("Failed to reply due to error", e);
485 - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); 489 + closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
  490 + closeAndDeregister();
486 } 491 }
487 } 492 }
488 493
489 @Override 494 @Override
490 public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) { 495 public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
491 log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage()); 496 log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
492 - Map<TransportProtos.SessionInfoProto, ObserveRelation> sessionToObserveRelationMap = coapTransportResource.getSessionInfoToObserveRelationMap();  
493 - if (coapTransportResource.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) {  
494 - Set<TransportProtos.SessionInfoProto> observeSessions = sessionToObserveRelationMap.keySet();  
495 - Optional<TransportProtos.SessionInfoProto> observeSessionToClose = observeSessions.stream().filter(sessionInfoProto -> {  
496 - UUID observeSessionId = new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB());  
497 - return observeSessionId.equals(sessionId);  
498 - }).findFirst();  
499 - if (observeSessionToClose.isPresent()) {  
500 - TransportProtos.SessionInfoProto sessionInfoProto = observeSessionToClose.get();  
501 - ObserveRelation observeRelation = sessionToObserveRelationMap.get(sessionInfoProto);  
502 - coapTransportResource.clearAndNotifyObserveRelation(observeRelation, CoAP.ResponseCode.SERVICE_UNAVAILABLE);  
503 - }  
504 - } 497 + closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.SERVICE_UNAVAILABLE);
  498 + closeAndDeregister();
505 } 499 }
506 500
507 @Override 501 @Override
508 - public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg msg) {  
509 - boolean successful; 502 + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) {
  503 + log.trace("[{}] Received RPC command to device", sessionId);
  504 + boolean successful = true;
510 try { 505 try {
511 exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder)); 506 exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder));
512 - successful = true;  
513 } catch (AdaptorException e) { 507 } catch (AdaptorException e) {
514 log.trace("Failed to reply due to error", e); 508 log.trace("Failed to reply due to error", e);
515 - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); 509 + closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
516 successful = false; 510 successful = false;
517 - }  
518 - if (msg.getPersisted()) { 511 + } finally {
  512 + if (msg.getPersisted()) {
519 RpcStatus status; 513 RpcStatus status;
520 if (!successful) { 514 if (!successful) {
521 status = RpcStatus.FAILED; 515 status = RpcStatus.FAILED;
@@ -531,6 +525,10 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -531,6 +525,10 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
531 .setStatus(status.name()) 525 .setStatus(status.name())
532 .build(); 526 .build();
533 coapTransportResource.transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY); 527 coapTransportResource.transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY);
  528 + }
  529 + if (!successful) {
  530 + closeAndDeregister();
  531 + }
534 } 532 }
535 } 533 }
536 534
@@ -547,6 +545,30 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -547,6 +545,30 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
547 private boolean isConRequest() { 545 private boolean isConRequest() {
548 return exchange.advanced().getRequest().isConfirmable(); 546 return exchange.advanced().getRequest().isConfirmable();
549 } 547 }
  548 +
  549 + private void closeObserveRelationAndNotify(UUID sessionId, CoAP.ResponseCode responseCode) {
  550 + Map<CoapObserveSessionInfo, ObserveRelation> sessionToObserveRelationMap = coapTransportResource.getCoapSessionInfoToObserveRelationMap();
  551 + if (coapTransportResource.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) {
  552 + Optional<CoapObserveSessionInfo> observeSessionToClose = sessionToObserveRelationMap.keySet().stream().filter(coapObserveSessionInfo -> {
  553 + TransportProtos.SessionInfoProto sessionToDelete = coapObserveSessionInfo.getSessionInfoProto();
  554 + UUID observeSessionId = new UUID(sessionToDelete.getSessionIdMSB(), sessionToDelete.getSessionIdLSB());
  555 + return observeSessionId.equals(sessionId);
  556 + }).findFirst();
  557 + if (observeSessionToClose.isPresent()) {
  558 + CoapObserveSessionInfo coapObserveSessionInfo = observeSessionToClose.get();
  559 + ObserveRelation observeRelation = sessionToObserveRelationMap.get(coapObserveSessionInfo);
  560 + coapTransportResource.clearAndNotifyObserveRelation(observeRelation, responseCode);
  561 + }
  562 + }
  563 + }
  564 +
  565 + private void closeAndDeregister() {
  566 + Request request = exchange.advanced().getRequest();
  567 + String token = coapTransportResource.getTokenFromRequest(request);
  568 + CoapObserveSessionInfo deleted = coapTransportResource.lookupAsyncSessionInfo(token);
  569 + coapTransportResource.closeAndDeregister(deleted.getSessionInfoProto());
  570 + }
  571 +
550 } 572 }
551 573
552 public class CoapResourceObserver implements ResourceObserver { 574 public class CoapResourceObserver implements ResourceObserver {
@@ -571,7 +593,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -571,7 +593,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
571 public void addedObserveRelation(ObserveRelation relation) { 593 public void addedObserveRelation(ObserveRelation relation) {
572 Request request = relation.getExchange().getRequest(); 594 Request request = relation.getExchange().getRequest();
573 String token = getTokenFromRequest(request); 595 String token = getTokenFromRequest(request);
574 - sessionInfoToObserveRelationMap.putIfAbsent(tokenToSessionInfoMap.get(token), relation); 596 + sessionInfoToObserveRelationMap.putIfAbsent(tokenToCoapSessionInfoMap.get(token), relation);
575 log.trace("Added Observe relation for token: {}", token); 597 log.trace("Added Observe relation for token: {}", token);
576 } 598 }
577 599
@@ -579,8 +601,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -579,8 +601,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
579 public void removedObserveRelation(ObserveRelation relation) { 601 public void removedObserveRelation(ObserveRelation relation) {
580 Request request = relation.getExchange().getRequest(); 602 Request request = relation.getExchange().getRequest();
581 String token = getTokenFromRequest(request); 603 String token = getTokenFromRequest(request);
582 - TransportProtos.SessionInfoProto session = tokenToSessionInfoMap.get(token);  
583 - sessionInfoToObserveRelationMap.remove(session); 604 + sessionInfoToObserveRelationMap.remove(tokenToCoapSessionInfoMap.get(token));
584 log.trace("Relation removed for token: {}", token); 605 log.trace("Relation removed for token: {}", token);
585 } 606 }
586 } 607 }
@@ -591,7 +612,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -591,7 +612,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
591 transportService.deregisterSession(session); 612 transportService.deregisterSession(session);
592 rpcSubscriptions.remove(sessionId); 613 rpcSubscriptions.remove(sessionId);
593 attributeSubscriptions.remove(sessionId); 614 attributeSubscriptions.remove(sessionId);
594 - sessionInfoToObserveRelationMap.remove(session);  
595 } 615 }
596 616
597 private TransportConfigurationContainer getTransportConfigurationContainer(DeviceProfile deviceProfile) throws AdaptorException { 617 private TransportConfigurationContainer getTransportConfigurationContainer(DeviceProfile deviceProfile) throws AdaptorException {
@@ -657,4 +677,17 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -657,4 +677,17 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
657 this.jsonPayload = jsonPayload; 677 this.jsonPayload = jsonPayload;
658 } 678 }
659 } 679 }
  680 +
  681 + @Data
  682 + private static class CoapObserveSessionInfo {
  683 +
  684 + private final TransportProtos.SessionInfoProto sessionInfoProto;
  685 + private final AtomicInteger observeNotificationCounter;
  686 +
  687 + private CoapObserveSessionInfo(TransportProtos.SessionInfoProto sessionInfoProto) {
  688 + this.sessionInfoProto = sessionInfoProto;
  689 + this.observeNotificationCounter = new AtomicInteger(0);
  690 + }
  691 + }
  692 +
660 } 693 }
@@ -393,7 +393,8 @@ public class DeviceApiController implements TbTransportService { @@ -393,7 +393,8 @@ public class DeviceApiController implements TbTransportService {
393 } 393 }
394 394
395 @Override 395 @Override
396 - public void onAttributeUpdate(AttributeUpdateNotificationMsg msg) { 396 + public void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg msg) {
  397 + log.trace("[{}] Received attributes update notification to device", sessionId);
397 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK)); 398 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
398 } 399 }
399 400
@@ -404,7 +405,8 @@ public class DeviceApiController implements TbTransportService { @@ -404,7 +405,8 @@ public class DeviceApiController implements TbTransportService {
404 } 405 }
405 406
406 @Override 407 @Override
407 - public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg msg) { 408 + public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg msg) {
  409 + log.trace("[{}] Received RPC command to device", sessionId);
408 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK)); 410 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));
409 if (msg.getPersisted()) { 411 if (msg.getPersisted()) {
410 RpcStatus status; 412 RpcStatus status;
@@ -56,9 +56,10 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s @@ -56,9 +56,10 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s
56 } 56 }
57 57
58 @Override 58 @Override
59 - public void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification) { 59 + public void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg attributeUpdateNotification) {
  60 + log.trace("[{}] Received attributes update notification to device", sessionId);
60 this.attributesService.onAttributesUpdate(attributeUpdateNotification, this.sessionInfo); 61 this.attributesService.onAttributesUpdate(attributeUpdateNotification, this.sessionInfo);
61 - } 62 + }
62 63
63 @Override 64 @Override
64 public void onRemoteSessionCloseCommand(UUID sessionId, SessionCloseNotificationProto sessionCloseNotification) { 65 public void onRemoteSessionCloseCommand(UUID sessionId, SessionCloseNotificationProto sessionCloseNotification) {
@@ -81,7 +82,8 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s @@ -81,7 +82,8 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s
81 } 82 }
82 83
83 @Override 84 @Override
84 - public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) { 85 + public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) {
  86 + log.trace("[{}] Received RPC command to device", sessionId);
85 this.rpcHandler.onToDeviceRpcRequest(toDeviceRequest, this.sessionInfo); 87 this.rpcHandler.onToDeviceRpcRequest(toDeviceRequest, this.sessionInfo);
86 if (toDeviceRequest.getPersisted()) { 88 if (toDeviceRequest.getPersisted()) {
87 RpcStatus status; 89 RpcStatus status;
@@ -797,7 +797,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -797,7 +797,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
797 } 797 }
798 798
799 @Override 799 @Override
800 - public void onAttributeUpdate(TransportProtos.AttributeUpdateNotificationMsg notification) { 800 + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
  801 + log.trace("[{}] Received attributes update notification to device", sessionId);
801 try { 802 try {
802 deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, notification).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); 803 deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, notification).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
803 } catch (Exception e) { 804 } catch (Exception e) {
@@ -812,7 +813,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -812,7 +813,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
812 } 813 }
813 814
814 @Override 815 @Override
815 - public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { 816 + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
816 log.trace("[{}] Received RPC command to device", sessionId); 817 log.trace("[{}] Received RPC command to device", sessionId);
817 try { 818 try {
818 deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest) 819 deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest)
@@ -85,7 +85,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple @@ -85,7 +85,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
85 } 85 }
86 86
87 @Override 87 @Override
88 - public void onAttributeUpdate(TransportProtos.AttributeUpdateNotificationMsg notification) { 88 + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
  89 + log.trace("[{}] Received attributes update notification to device", sessionId);
89 try { 90 try {
90 parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::writeAndFlush); 91 parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::writeAndFlush);
91 } catch (Exception e) { 92 } catch (Exception e) {
@@ -94,7 +95,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple @@ -94,7 +95,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
94 } 95 }
95 96
96 @Override 97 @Override
97 - public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg request) { 98 + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg request) {
  99 + log.trace("[{}] Received RPC command to device", sessionId);
98 try { 100 try {
99 parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent( 101 parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(
100 payload -> { 102 payload -> {
@@ -129,7 +129,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S @@ -129,7 +129,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
129 } 129 }
130 130
131 @Override 131 @Override
132 - public void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification) { 132 + public void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg attributeUpdateNotification) {
  133 + log.trace("[{}] Received attributes update notification to device", sessionId);
133 snmpTransportContext.getSnmpTransportService().onAttributeUpdate(this, attributeUpdateNotification); 134 snmpTransportContext.getSnmpTransportService().onAttributeUpdate(this, attributeUpdateNotification);
134 } 135 }
135 136
@@ -139,7 +140,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S @@ -139,7 +140,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
139 } 140 }
140 141
141 @Override 142 @Override
142 - public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) { 143 + public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) {
  144 + log.trace("[{}] Received RPC command to device", sessionId);
143 snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest); 145 snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest);
144 if (toDeviceRequest.getPersisted()) { 146 if (toDeviceRequest.getPersisted()) {
145 RpcStatus status; 147 RpcStatus status;
@@ -36,11 +36,11 @@ public interface SessionMsgListener { @@ -36,11 +36,11 @@ public interface SessionMsgListener {
36 36
37 void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse); 37 void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse);
38 38
39 - void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification); 39 + void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg attributeUpdateNotification);
40 40
41 void onRemoteSessionCloseCommand(UUID sessionId, SessionCloseNotificationProto sessionCloseNotification); 41 void onRemoteSessionCloseCommand(UUID sessionId, SessionCloseNotificationProto sessionCloseNotification);
42 42
43 - void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest); 43 + void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest);
44 44
45 void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse); 45 void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse);
46 46
@@ -20,8 +20,6 @@ import org.thingsboard.server.common.data.DeviceTransportType; @@ -20,8 +20,6 @@ import org.thingsboard.server.common.data.DeviceTransportType;
20 import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; 20 import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
21 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; 21 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
22 import org.thingsboard.server.common.transport.service.SessionMetaData; 22 import org.thingsboard.server.common.transport.service.SessionMetaData;
23 -import org.thingsboard.server.gen.transport.TransportProtos;  
24 -import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;  
25 import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; 23 import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
26 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; 24 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
27 import org.thingsboard.server.gen.transport.TransportProtos.GetDeviceCredentialsRequestMsg; 25 import org.thingsboard.server.gen.transport.TransportProtos.GetDeviceCredentialsRequestMsg;
@@ -30,9 +28,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.GetDeviceRequestMsg; @@ -30,9 +28,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.GetDeviceRequestMsg;
30 import org.thingsboard.server.gen.transport.TransportProtos.GetDeviceResponseMsg; 28 import org.thingsboard.server.gen.transport.TransportProtos.GetDeviceResponseMsg;
31 import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg; 29 import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg;
32 import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg; 30 import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg;
  31 +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
33 import org.thingsboard.server.gen.transport.TransportProtos.GetOtaPackageRequestMsg; 32 import org.thingsboard.server.gen.transport.TransportProtos.GetOtaPackageRequestMsg;
34 import org.thingsboard.server.gen.transport.TransportProtos.GetOtaPackageResponseMsg; 33 import org.thingsboard.server.gen.transport.TransportProtos.GetOtaPackageResponseMsg;
35 -import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;  
36 import org.thingsboard.server.gen.transport.TransportProtos.GetResourceRequestMsg; 34 import org.thingsboard.server.gen.transport.TransportProtos.GetResourceRequestMsg;
37 import org.thingsboard.server.gen.transport.TransportProtos.GetResourceResponseMsg; 35 import org.thingsboard.server.gen.transport.TransportProtos.GetResourceResponseMsg;
38 import org.thingsboard.server.gen.transport.TransportProtos.GetSnmpDevicesRequestMsg; 36 import org.thingsboard.server.gen.transport.TransportProtos.GetSnmpDevicesRequestMsg;
@@ -48,10 +46,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; @@ -48,10 +46,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
48 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; 46 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
49 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; 47 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
50 import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto; 48 import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto;
  49 +import org.thingsboard.server.gen.transport.TransportProtos.ToDevicePersistedRpcResponseMsg;
51 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; 50 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
52 import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg; 51 import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
  52 +import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
53 import org.thingsboard.server.gen.transport.TransportProtos.ValidateBasicMqttCredRequestMsg; 53 import org.thingsboard.server.gen.transport.TransportProtos.ValidateBasicMqttCredRequestMsg;
54 -import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;  
55 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceLwM2MCredentialsRequestMsg; 54 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceLwM2MCredentialsRequestMsg;
56 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; 55 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
57 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; 56 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
@@ -110,7 +109,7 @@ public interface TransportService { @@ -110,7 +109,7 @@ public interface TransportService {
110 109
111 void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback); 110 void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
112 111
113 - void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDevicePersistedRpcResponseMsg msg, TransportServiceCallback<Void> callback); 112 + void process(SessionInfoProto sessionInfo, ToDevicePersistedRpcResponseMsg msg, TransportServiceCallback<Void> callback);
114 113
115 void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback); 114 void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);
116 115
@@ -752,7 +752,7 @@ public class DefaultTransportService implements TransportService { @@ -752,7 +752,7 @@ public class DefaultTransportService implements TransportService {
752 listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse()); 752 listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
753 } 753 }
754 if (toSessionMsg.hasAttributeUpdateNotification()) { 754 if (toSessionMsg.hasAttributeUpdateNotification()) {
755 - listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification()); 755 + listener.onAttributeUpdate(sessionId, toSessionMsg.getAttributeUpdateNotification());
756 } 756 }
757 if (toSessionMsg.hasSessionCloseNotification()) { 757 if (toSessionMsg.hasSessionCloseNotification()) {
758 listener.onRemoteSessionCloseCommand(sessionId, toSessionMsg.getSessionCloseNotification()); 758 listener.onRemoteSessionCloseCommand(sessionId, toSessionMsg.getSessionCloseNotification());
@@ -761,7 +761,7 @@ public class DefaultTransportService implements TransportService { @@ -761,7 +761,7 @@ public class DefaultTransportService implements TransportService {
761 listener.onToTransportUpdateCredentials(toSessionMsg.getToTransportUpdateCredentialsNotification()); 761 listener.onToTransportUpdateCredentials(toSessionMsg.getToTransportUpdateCredentialsNotification());
762 } 762 }
763 if (toSessionMsg.hasToDeviceRequest()) { 763 if (toSessionMsg.hasToDeviceRequest()) {
764 - listener.onToDeviceRpcRequest(toSessionMsg.getToDeviceRequest()); 764 + listener.onToDeviceRpcRequest(sessionId, toSessionMsg.getToDeviceRequest());
765 } 765 }
766 if (toSessionMsg.hasToServerResponse()) { 766 if (toSessionMsg.hasToServerResponse()) {
767 String requestId = sessionId + "-" + toSessionMsg.getToServerResponse().getRequestId(); 767 String requestId = sessionId + "-" + toSessionMsg.getToServerResponse().getRequestId();