Commit d3e16ad65009f7c80869dae628bf3aba47685071

Authored by ShvaykaD
1 parent e3c523cc

updated observe sessions closing actions

@@ -64,6 +64,7 @@ import java.util.concurrent.ConcurrentHashMap; @@ -64,6 +64,7 @@ import java.util.concurrent.ConcurrentHashMap;
64 import java.util.concurrent.ConcurrentMap; 64 import java.util.concurrent.ConcurrentMap;
65 import java.util.concurrent.TimeUnit; 65 import java.util.concurrent.TimeUnit;
66 import java.util.concurrent.atomic.AtomicInteger; 66 import java.util.concurrent.atomic.AtomicInteger;
  67 +import java.util.stream.Collectors;
67 68
68 @Slf4j 69 @Slf4j
69 public class CoapTransportResource extends AbstractCoapTransportResource { 70 public class CoapTransportResource extends AbstractCoapTransportResource {
@@ -75,9 +76,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -75,9 +76,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
75 private static final int REQUEST_ID_POSITION_CERTIFICATE_REQUEST = 4; 76 private static final int REQUEST_ID_POSITION_CERTIFICATE_REQUEST = 4;
76 private static final String DTLS_SESSION_ID_KEY = "DTLS_SESSION_ID"; 77 private static final String DTLS_SESSION_ID_KEY = "DTLS_SESSION_ID";
77 78
78 - private final ConcurrentMap<String, TransportProtos.SessionInfoProto> tokenToSessionInfoMap = new ConcurrentHashMap<>();  
79 - private final ConcurrentMap<String, AtomicInteger> tokenToObserveNotificationSeqMap = new ConcurrentHashMap<>();  
80 - private final ConcurrentMap<TransportProtos.SessionInfoProto, ObserveRelation> sessionInfoToObserveRelationMap = new ConcurrentHashMap<>(); 79 + private final ConcurrentMap<String, CoapObserveSessionInfo> tokenToCoapSessionInfoMap = new ConcurrentHashMap<>();
  80 + private final ConcurrentMap<CoapObserveSessionInfo, ObserveRelation> sessionInfoToObserveRelationMap = new ConcurrentHashMap<>();
81 private final Set<UUID> rpcSubscriptions = ConcurrentHashMap.newKeySet(); 81 private final Set<UUID> rpcSubscriptions = ConcurrentHashMap.newKeySet();
82 private final Set<UUID> attributeSubscriptions = ConcurrentHashMap.newKeySet(); 82 private final Set<UUID> attributeSubscriptions = ConcurrentHashMap.newKeySet();
83 83
@@ -93,7 +93,11 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -93,7 +93,11 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
93 this.timeout = coapServerService.getTimeout(); 93 this.timeout = coapServerService.getTimeout();
94 this.sessionReportTimeout = ctx.getSessionReportTimeout(); 94 this.sessionReportTimeout = ctx.getSessionReportTimeout();
95 ctx.getScheduler().scheduleAtFixedRate(() -> { 95 ctx.getScheduler().scheduleAtFixedRate(() -> {
96 - Set<TransportProtos.SessionInfoProto> observeSessions = sessionInfoToObserveRelationMap.keySet(); 96 + Set<CoapObserveSessionInfo> coapObserveSessionInfos = sessionInfoToObserveRelationMap.keySet();
  97 + Set<TransportProtos.SessionInfoProto> observeSessions = coapObserveSessionInfos
  98 + .stream()
  99 + .map(CoapObserveSessionInfo::getSessionInfoProto)
  100 + .collect(Collectors.toSet());
97 observeSessions.forEach(this::reportActivity); 101 observeSessions.forEach(this::reportActivity);
98 }, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); 102 }, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
99 } 103 }
@@ -111,17 +115,17 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -111,17 +115,17 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
111 relation.setEstablished(); 115 relation.setEstablished();
112 addObserveRelation(relation); 116 addObserveRelation(relation);
113 } 117 }
114 - AtomicInteger notificationCounter = tokenToObserveNotificationSeqMap.computeIfAbsent(token, s -> new AtomicInteger(0));  
115 - response.getOptions().setObserve(notificationCounter.getAndIncrement()); 118 + AtomicInteger observeNotificationCounter = tokenToCoapSessionInfoMap.get(token).getObserveNotificationCounter();
  119 + response.getOptions().setObserve(observeNotificationCounter.getAndIncrement());
116 } // ObserveLayer takes care of the else case 120 } // ObserveLayer takes care of the else case
117 } 121 }
118 122
119 - public void clearAndNotifyObserveRelation(ObserveRelation relation, CoAP.ResponseCode code) { 123 + private void clearAndNotifyObserveRelation(ObserveRelation relation, CoAP.ResponseCode code) {
120 relation.cancel(); 124 relation.cancel();
121 relation.getExchange().sendResponse(new Response(code)); 125 relation.getExchange().sendResponse(new Response(code));
122 } 126 }
123 127
124 - public Map<TransportProtos.SessionInfoProto, ObserveRelation> getSessionInfoToObserveRelationMap() { 128 + private Map<CoapObserveSessionInfo, ObserveRelation> getCoapSessionInfoToObserveRelationMap() {
125 return sessionInfoToObserveRelationMap; 129 return sessionInfoToObserveRelationMap;
126 } 130 }
127 131
@@ -277,8 +281,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -277,8 +281,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
277 new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); 281 new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
278 break; 282 break;
279 case SUBSCRIBE_ATTRIBUTES_REQUEST: 283 case SUBSCRIBE_ATTRIBUTES_REQUEST:
280 - TransportProtos.SessionInfoProto currentAttrSession = tokenToSessionInfoMap.get(getTokenFromRequest(request));  
281 - if (currentAttrSession == null) { 284 + CoapObserveSessionInfo currentCoapObserveAttrSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request));
  285 + if (currentCoapObserveAttrSessionInfo == null) {
282 attributeSubscriptions.add(sessionId); 286 attributeSubscriptions.add(sessionId);
283 registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, 287 registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor,
284 transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); 288 transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request));
@@ -290,20 +294,20 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -290,20 +294,20 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
290 } 294 }
291 break; 295 break;
292 case UNSUBSCRIBE_ATTRIBUTES_REQUEST: 296 case UNSUBSCRIBE_ATTRIBUTES_REQUEST:
293 - TransportProtos.SessionInfoProto attrSession = lookupAsyncSessionInfo(getTokenFromRequest(request));  
294 - if (attrSession != null) { 297 + CoapObserveSessionInfo coapObserveAttrSessionInfo = lookupAsyncSessionInfo(getTokenFromRequest(request));
  298 + if (coapObserveAttrSessionInfo != null) {
  299 + TransportProtos.SessionInfoProto attrSession = coapObserveAttrSessionInfo.getSessionInfoProto();
295 UUID attrSessionId = toSessionId(attrSession); 300 UUID attrSessionId = toSessionId(attrSession);
296 attributeSubscriptions.remove(attrSessionId); 301 attributeSubscriptions.remove(attrSessionId);
297 - sessionInfoToObserveRelationMap.remove(attrSession);  
298 transportService.process(attrSession, 302 transportService.process(attrSession,
299 TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), 303 TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(),
300 - new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));  
301 - closeAndDeregister(sessionInfo); 304 + new CoapNoOpCallback(exchange));
302 } 305 }
  306 + closeAndDeregister(sessionInfo);
303 break; 307 break;
304 case SUBSCRIBE_RPC_COMMANDS_REQUEST: 308 case SUBSCRIBE_RPC_COMMANDS_REQUEST:
305 - TransportProtos.SessionInfoProto currentRpcSession = tokenToSessionInfoMap.get(getTokenFromRequest(request));  
306 - if (currentRpcSession == null) { 309 + CoapObserveSessionInfo currentCoapObserveRpcSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request));
  310 + if (currentCoapObserveRpcSessionInfo == null) {
307 rpcSubscriptions.add(sessionId); 311 rpcSubscriptions.add(sessionId);
308 registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, 312 registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor,
309 transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); 313 transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request));
@@ -314,16 +318,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -314,16 +318,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
314 } 318 }
315 break; 319 break;
316 case UNSUBSCRIBE_RPC_COMMANDS_REQUEST: 320 case UNSUBSCRIBE_RPC_COMMANDS_REQUEST:
317 - TransportProtos.SessionInfoProto rpcSession = lookupAsyncSessionInfo(getTokenFromRequest(request));  
318 - if (rpcSession != null) { 321 + CoapObserveSessionInfo coapObserveRpcSessionInfo = lookupAsyncSessionInfo(getTokenFromRequest(request));
  322 + if (coapObserveRpcSessionInfo != null) {
  323 + TransportProtos.SessionInfoProto rpcSession = coapObserveRpcSessionInfo.getSessionInfoProto();
319 UUID rpcSessionId = toSessionId(rpcSession); 324 UUID rpcSessionId = toSessionId(rpcSession);
320 rpcSubscriptions.remove(rpcSessionId); 325 rpcSubscriptions.remove(rpcSessionId);
321 - sessionInfoToObserveRelationMap.remove(rpcSession);  
322 transportService.process(rpcSession, 326 transportService.process(rpcSession,
323 TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), 327 TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(),
324 new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); 328 new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
325 - closeAndDeregister(sessionInfo);  
326 } 329 }
  330 + closeAndDeregister(sessionInfo);
327 break; 331 break;
328 case TO_DEVICE_RPC_RESPONSE: 332 case TO_DEVICE_RPC_RESPONSE:
329 transportService.process(sessionInfo, 333 transportService.process(sessionInfo,
@@ -355,13 +359,12 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -355,13 +359,12 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
355 return new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB()); 359 return new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB());
356 } 360 }
357 361
358 - private TransportProtos.SessionInfoProto lookupAsyncSessionInfo(String token) {  
359 - tokenToObserveNotificationSeqMap.remove(token);  
360 - return tokenToSessionInfoMap.remove(token); 362 + private CoapObserveSessionInfo lookupAsyncSessionInfo(String token) {
  363 + return tokenToCoapSessionInfoMap.remove(token);
361 } 364 }
362 365
363 private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) { 366 private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) {
364 - tokenToSessionInfoMap.putIfAbsent(token, sessionInfo); 367 + tokenToCoapSessionInfoMap.putIfAbsent(token, new CoapObserveSessionInfo(sessionInfo));
365 transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder)); 368 transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder));
366 transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); 369 transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
367 } 370 }
@@ -474,40 +477,33 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -474,40 +477,33 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
474 } 477 }
475 478
476 @Override 479 @Override
477 - public void onAttributeUpdate(TransportProtos.AttributeUpdateNotificationMsg msg) { 480 + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg msg) {
  481 + log.trace("[{}] Received attributes update notification to device", sessionId);
478 try { 482 try {
479 exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg)); 483 exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg));
480 } catch (AdaptorException e) { 484 } catch (AdaptorException e) {
481 log.trace("Failed to reply due to error", e); 485 log.trace("Failed to reply due to error", e);
482 - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); 486 + closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
  487 + closeAndDeregister();
483 } 488 }
484 } 489 }
485 490
486 @Override 491 @Override
487 public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) { 492 public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
488 log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage()); 493 log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
489 - Map<TransportProtos.SessionInfoProto, ObserveRelation> sessionToObserveRelationMap = coapTransportResource.getSessionInfoToObserveRelationMap();  
490 - if (coapTransportResource.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) {  
491 - Set<TransportProtos.SessionInfoProto> observeSessions = sessionToObserveRelationMap.keySet();  
492 - Optional<TransportProtos.SessionInfoProto> observeSessionToClose = observeSessions.stream().filter(sessionInfoProto -> {  
493 - UUID observeSessionId = new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB());  
494 - return observeSessionId.equals(sessionId);  
495 - }).findFirst();  
496 - if (observeSessionToClose.isPresent()) {  
497 - TransportProtos.SessionInfoProto sessionInfoProto = observeSessionToClose.get();  
498 - ObserveRelation observeRelation = sessionToObserveRelationMap.get(sessionInfoProto);  
499 - coapTransportResource.clearAndNotifyObserveRelation(observeRelation, CoAP.ResponseCode.SERVICE_UNAVAILABLE);  
500 - }  
501 - } 494 + closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.SERVICE_UNAVAILABLE);
  495 + closeAndDeregister();
502 } 496 }
503 497
504 @Override 498 @Override
505 - public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg msg) { 499 + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) {
  500 + log.trace("[{}] Received RPC command to device", sessionId);
506 try { 501 try {
507 exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder)); 502 exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder));
508 } catch (AdaptorException e) { 503 } catch (AdaptorException e) {
509 log.trace("Failed to reply due to error", e); 504 log.trace("Failed to reply due to error", e);
510 - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); 505 + closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
  506 + closeAndDeregister();
511 } 507 }
512 } 508 }
513 509
@@ -524,6 +520,30 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -524,6 +520,30 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
524 private boolean isConRequest() { 520 private boolean isConRequest() {
525 return exchange.advanced().getRequest().isConfirmable(); 521 return exchange.advanced().getRequest().isConfirmable();
526 } 522 }
  523 +
  524 + private void closeObserveRelationAndNotify(UUID sessionId, CoAP.ResponseCode responseCode) {
  525 + Map<CoapObserveSessionInfo, ObserveRelation> sessionToObserveRelationMap = coapTransportResource.getCoapSessionInfoToObserveRelationMap();
  526 + if (coapTransportResource.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) {
  527 + Optional<CoapObserveSessionInfo> observeSessionToClose = sessionToObserveRelationMap.keySet().stream().filter(coapObserveSessionInfo -> {
  528 + TransportProtos.SessionInfoProto sessionToDelete = coapObserveSessionInfo.getSessionInfoProto();
  529 + UUID observeSessionId = new UUID(sessionToDelete.getSessionIdMSB(), sessionToDelete.getSessionIdLSB());
  530 + return observeSessionId.equals(sessionId);
  531 + }).findFirst();
  532 + if (observeSessionToClose.isPresent()) {
  533 + CoapObserveSessionInfo coapObserveSessionInfo = observeSessionToClose.get();
  534 + ObserveRelation observeRelation = sessionToObserveRelationMap.get(coapObserveSessionInfo);
  535 + coapTransportResource.clearAndNotifyObserveRelation(observeRelation, responseCode);
  536 + }
  537 + }
  538 + }
  539 +
  540 + private void closeAndDeregister() {
  541 + Request request = exchange.advanced().getRequest();
  542 + String token = coapTransportResource.getTokenFromRequest(request);
  543 + CoapObserveSessionInfo deleted = coapTransportResource.lookupAsyncSessionInfo(token);
  544 + coapTransportResource.closeAndDeregister(deleted.getSessionInfoProto());
  545 + }
  546 +
527 } 547 }
528 548
529 public class CoapResourceObserver implements ResourceObserver { 549 public class CoapResourceObserver implements ResourceObserver {
@@ -548,7 +568,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -548,7 +568,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
548 public void addedObserveRelation(ObserveRelation relation) { 568 public void addedObserveRelation(ObserveRelation relation) {
549 Request request = relation.getExchange().getRequest(); 569 Request request = relation.getExchange().getRequest();
550 String token = getTokenFromRequest(request); 570 String token = getTokenFromRequest(request);
551 - sessionInfoToObserveRelationMap.putIfAbsent(tokenToSessionInfoMap.get(token), relation); 571 + sessionInfoToObserveRelationMap.putIfAbsent(tokenToCoapSessionInfoMap.get(token), relation);
552 log.trace("Added Observe relation for token: {}", token); 572 log.trace("Added Observe relation for token: {}", token);
553 } 573 }
554 574
@@ -556,8 +576,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -556,8 +576,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
556 public void removedObserveRelation(ObserveRelation relation) { 576 public void removedObserveRelation(ObserveRelation relation) {
557 Request request = relation.getExchange().getRequest(); 577 Request request = relation.getExchange().getRequest();
558 String token = getTokenFromRequest(request); 578 String token = getTokenFromRequest(request);
559 - TransportProtos.SessionInfoProto session = tokenToSessionInfoMap.get(token);  
560 - sessionInfoToObserveRelationMap.remove(session); 579 + sessionInfoToObserveRelationMap.remove(tokenToCoapSessionInfoMap.get(token));
561 log.trace("Relation removed for token: {}", token); 580 log.trace("Relation removed for token: {}", token);
562 } 581 }
563 } 582 }
@@ -568,7 +587,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -568,7 +587,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
568 transportService.deregisterSession(session); 587 transportService.deregisterSession(session);
569 rpcSubscriptions.remove(sessionId); 588 rpcSubscriptions.remove(sessionId);
570 attributeSubscriptions.remove(sessionId); 589 attributeSubscriptions.remove(sessionId);
571 - sessionInfoToObserveRelationMap.remove(session);  
572 } 590 }
573 591
574 private TransportConfigurationContainer getTransportConfigurationContainer(DeviceProfile deviceProfile) throws AdaptorException { 592 private TransportConfigurationContainer getTransportConfigurationContainer(DeviceProfile deviceProfile) throws AdaptorException {
@@ -634,4 +652,17 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -634,4 +652,17 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
634 this.jsonPayload = jsonPayload; 652 this.jsonPayload = jsonPayload;
635 } 653 }
636 } 654 }
  655 +
  656 + @Data
  657 + private static class CoapObserveSessionInfo {
  658 +
  659 + private final TransportProtos.SessionInfoProto sessionInfoProto;
  660 + private final AtomicInteger observeNotificationCounter;
  661 +
  662 + private CoapObserveSessionInfo(TransportProtos.SessionInfoProto sessionInfoProto) {
  663 + this.sessionInfoProto = sessionInfoProto;
  664 + this.observeNotificationCounter = new AtomicInteger(0);
  665 + }
  666 + }
  667 +
637 } 668 }
@@ -386,7 +386,8 @@ public class DeviceApiController implements TbTransportService { @@ -386,7 +386,8 @@ public class DeviceApiController implements TbTransportService {
386 } 386 }
387 387
388 @Override 388 @Override
389 - public void onAttributeUpdate(AttributeUpdateNotificationMsg msg) { 389 + public void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg msg) {
  390 + log.trace("[{}] Received attributes update notification to device", sessionId);
390 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK)); 391 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
391 } 392 }
392 393
@@ -397,7 +398,8 @@ public class DeviceApiController implements TbTransportService { @@ -397,7 +398,8 @@ public class DeviceApiController implements TbTransportService {
397 } 398 }
398 399
399 @Override 400 @Override
400 - public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg msg) { 401 + public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg msg) {
  402 + log.trace("[{}] Received RPC command to device", sessionId);
401 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK)); 403 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));
402 } 404 }
403 405
@@ -52,9 +52,10 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s @@ -52,9 +52,10 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s
52 } 52 }
53 53
54 @Override 54 @Override
55 - public void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification) { 55 + public void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg attributeUpdateNotification) {
  56 + log.trace("[{}] Received attributes update notification to device", sessionId);
56 this.attributesService.onAttributesUpdate(attributeUpdateNotification, this.sessionInfo); 57 this.attributesService.onAttributesUpdate(attributeUpdateNotification, this.sessionInfo);
57 - } 58 + }
58 59
59 @Override 60 @Override
60 public void onRemoteSessionCloseCommand(UUID sessionId, SessionCloseNotificationProto sessionCloseNotification) { 61 public void onRemoteSessionCloseCommand(UUID sessionId, SessionCloseNotificationProto sessionCloseNotification) {
@@ -77,8 +78,9 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s @@ -77,8 +78,9 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s
77 } 78 }
78 79
79 @Override 80 @Override
80 - public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) {  
81 - this.rpcHandler.onToDeviceRpcRequest(toDeviceRequest,this.sessionInfo); 81 + public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) {
  82 + log.trace("[{}] Received RPC command to device", sessionId);
  83 + this.rpcHandler.onToDeviceRpcRequest(toDeviceRequest, this.sessionInfo);
82 } 84 }
83 85
84 @Override 86 @Override
@@ -795,7 +795,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -795,7 +795,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
795 } 795 }
796 796
797 @Override 797 @Override
798 - public void onAttributeUpdate(TransportProtos.AttributeUpdateNotificationMsg notification) { 798 + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
  799 + log.trace("[{}] Received attributes update notification to device", sessionId);
799 try { 800 try {
800 deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, notification).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); 801 deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, notification).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
801 } catch (Exception e) { 802 } catch (Exception e) {
@@ -810,7 +811,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -810,7 +811,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
810 } 811 }
811 812
812 @Override 813 @Override
813 - public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { 814 + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
814 log.trace("[{}] Received RPC command to device", sessionId); 815 log.trace("[{}] Received RPC command to device", sessionId);
815 try { 816 try {
816 deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); 817 deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
@@ -78,7 +78,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple @@ -78,7 +78,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
78 } 78 }
79 79
80 @Override 80 @Override
81 - public void onAttributeUpdate(TransportProtos.AttributeUpdateNotificationMsg notification) { 81 + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
  82 + log.trace("[{}] Received attributes update notification to device", sessionId);
82 try { 83 try {
83 parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::writeAndFlush); 84 parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::writeAndFlush);
84 } catch (Exception e) { 85 } catch (Exception e) {
@@ -87,7 +88,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple @@ -87,7 +88,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
87 } 88 }
88 89
89 @Override 90 @Override
90 - public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg request) { 91 + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg request) {
  92 + log.trace("[{}] Received RPC command to device", sessionId);
91 try { 93 try {
92 parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(parent::writeAndFlush); 94 parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(parent::writeAndFlush);
93 } catch (Exception e) { 95 } catch (Exception e) {
@@ -127,7 +127,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S @@ -127,7 +127,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
127 } 127 }
128 128
129 @Override 129 @Override
130 - public void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification) { 130 + public void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg attributeUpdateNotification) {
  131 + log.trace("[{}] Received attributes update notification to device", sessionId);
131 snmpTransportContext.getSnmpTransportService().onAttributeUpdate(this, attributeUpdateNotification); 132 snmpTransportContext.getSnmpTransportService().onAttributeUpdate(this, attributeUpdateNotification);
132 } 133 }
133 134
@@ -137,7 +138,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S @@ -137,7 +138,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
137 } 138 }
138 139
139 @Override 140 @Override
140 - public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) { 141 + public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) {
  142 + log.trace("[{}] Received RPC command to device", sessionId);
141 snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest); 143 snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest);
142 } 144 }
143 145
@@ -36,11 +36,11 @@ public interface SessionMsgListener { @@ -36,11 +36,11 @@ public interface SessionMsgListener {
36 36
37 void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse); 37 void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse);
38 38
39 - void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification); 39 + void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg attributeUpdateNotification);
40 40
41 void onRemoteSessionCloseCommand(UUID sessionId, SessionCloseNotificationProto sessionCloseNotification); 41 void onRemoteSessionCloseCommand(UUID sessionId, SessionCloseNotificationProto sessionCloseNotification);
42 42
43 - void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest); 43 + void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest);
44 44
45 void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse); 45 void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse);
46 46
@@ -743,7 +743,7 @@ public class DefaultTransportService implements TransportService { @@ -743,7 +743,7 @@ public class DefaultTransportService implements TransportService {
743 listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse()); 743 listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
744 } 744 }
745 if (toSessionMsg.hasAttributeUpdateNotification()) { 745 if (toSessionMsg.hasAttributeUpdateNotification()) {
746 - listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification()); 746 + listener.onAttributeUpdate(sessionId, toSessionMsg.getAttributeUpdateNotification());
747 } 747 }
748 if (toSessionMsg.hasSessionCloseNotification()) { 748 if (toSessionMsg.hasSessionCloseNotification()) {
749 listener.onRemoteSessionCloseCommand(sessionId, toSessionMsg.getSessionCloseNotification()); 749 listener.onRemoteSessionCloseCommand(sessionId, toSessionMsg.getSessionCloseNotification());
@@ -752,7 +752,7 @@ public class DefaultTransportService implements TransportService { @@ -752,7 +752,7 @@ public class DefaultTransportService implements TransportService {
752 listener.onToTransportUpdateCredentials(toSessionMsg.getToTransportUpdateCredentialsNotification()); 752 listener.onToTransportUpdateCredentials(toSessionMsg.getToTransportUpdateCredentialsNotification());
753 } 753 }
754 if (toSessionMsg.hasToDeviceRequest()) { 754 if (toSessionMsg.hasToDeviceRequest()) {
755 - listener.onToDeviceRpcRequest(toSessionMsg.getToDeviceRequest()); 755 + listener.onToDeviceRpcRequest(sessionId, toSessionMsg.getToDeviceRequest());
756 } 756 }
757 if (toSessionMsg.hasToServerResponse()) { 757 if (toSessionMsg.hasToServerResponse()) {
758 String requestId = sessionId + "-" + toSessionMsg.getToServerResponse().getRequestId(); 758 String requestId = sessionId + "-" + toSessionMsg.getToServerResponse().getRequestId();