Commit 786439f90ac1ca514af11dabb3995227db5f7b65

Authored by Igor Kulikov
2 parents fa588d89 a91d2335

Merge branch 'master' of github.com:thingsboard/thingsboard

Showing 14 changed files with 355 additions and 130 deletions
@@ -233,7 +233,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -233,7 +233,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
233 rpc.setExpirationTime(request.getExpirationTime()); 233 rpc.setExpirationTime(request.getExpirationTime());
234 rpc.setRequest(JacksonUtil.valueToTree(request)); 234 rpc.setRequest(JacksonUtil.valueToTree(request));
235 rpc.setStatus(status); 235 rpc.setStatus(status);
236 - systemContext.getTbRpcService().save(tenantId, rpc);  
237 return systemContext.getTbRpcService().save(tenantId, rpc); 236 return systemContext.getTbRpcService().save(tenantId, rpc);
238 } 237 }
239 238
@@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.TransportPayloadType; @@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.TransportPayloadType;
31 import org.thingsboard.server.common.msg.session.FeatureType; 31 import org.thingsboard.server.common.msg.session.FeatureType;
32 import org.thingsboard.server.transport.coap.AbstractCoapIntegrationTest; 32 import org.thingsboard.server.transport.coap.AbstractCoapIntegrationTest;
33 33
  34 +import java.nio.charset.StandardCharsets;
34 import java.util.concurrent.CountDownLatch; 35 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit; 36 import java.util.concurrent.TimeUnit;
36 37
@@ -15,11 +15,14 @@ @@ -15,11 +15,14 @@
15 */ 15 */
16 package org.thingsboard.server.transport.coap; 16 package org.thingsboard.server.transport.coap;
17 17
  18 +import lombok.SneakyThrows;
18 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
19 import org.eclipse.californium.core.CoapResource; 20 import org.eclipse.californium.core.CoapResource;
20 import org.eclipse.californium.core.coap.CoAP; 21 import org.eclipse.californium.core.coap.CoAP;
  22 +import org.eclipse.californium.core.coap.MessageObserver;
21 import org.eclipse.californium.core.coap.Response; 23 import org.eclipse.californium.core.coap.Response;
22 import org.eclipse.californium.core.server.resources.CoapExchange; 24 import org.eclipse.californium.core.server.resources.CoapExchange;
  25 +import org.eclipse.californium.elements.EndpointContext;
23 import org.thingsboard.server.common.data.DeviceProfile; 26 import org.thingsboard.server.common.data.DeviceProfile;
24 import org.thingsboard.server.common.transport.TransportContext; 27 import org.thingsboard.server.common.transport.TransportContext;
25 import org.thingsboard.server.common.transport.TransportService; 28 import org.thingsboard.server.common.transport.TransportService;
@@ -29,8 +32,12 @@ import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsRes @@ -29,8 +32,12 @@ import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsRes
29 import org.thingsboard.server.gen.transport.TransportProtos; 32 import org.thingsboard.server.gen.transport.TransportProtos;
30 33
31 import java.util.UUID; 34 import java.util.UUID;
  35 +import java.util.concurrent.ThreadLocalRandom;
32 import java.util.function.BiConsumer; 36 import java.util.function.BiConsumer;
33 37
  38 +import static org.eclipse.californium.core.coap.Message.MAX_MID;
  39 +import static org.eclipse.californium.core.coap.Message.NONE;
  40 +
34 @Slf4j 41 @Slf4j
35 public abstract class AbstractCoapTransportResource extends CoapResource { 42 public abstract class AbstractCoapTransportResource extends CoapResource {
36 43
@@ -75,77 +82,8 @@ public abstract class AbstractCoapTransportResource extends CoapResource { @@ -75,77 +82,8 @@ public abstract class AbstractCoapTransportResource extends CoapResource {
75 .setEvent(event).build(); 82 .setEvent(event).build();
76 } 83 }
77 84
78 - public static class CoapDeviceAuthCallback implements TransportServiceCallback<ValidateDeviceCredentialsResponse> {  
79 - private final TransportContext transportContext;  
80 - private final CoapExchange exchange;  
81 - private final BiConsumer<TransportProtos.SessionInfoProto, DeviceProfile> onSuccess;  
82 -  
83 - public CoapDeviceAuthCallback(TransportContext transportContext, CoapExchange exchange, BiConsumer<TransportProtos.SessionInfoProto, DeviceProfile> onSuccess) {  
84 - this.transportContext = transportContext;  
85 - this.exchange = exchange;  
86 - this.onSuccess = onSuccess;  
87 - }  
88 -  
89 - @Override  
90 - public void onSuccess(ValidateDeviceCredentialsResponse msg) {  
91 - DeviceProfile deviceProfile = msg.getDeviceProfile();  
92 - if (msg.hasDeviceInfo() && deviceProfile != null) {  
93 - TransportProtos.SessionInfoProto sessionInfoProto = SessionInfoCreator.create(msg, transportContext, UUID.randomUUID());  
94 - onSuccess.accept(sessionInfoProto, deviceProfile);  
95 - } else {  
96 - exchange.respond(CoAP.ResponseCode.UNAUTHORIZED);  
97 - }  
98 - }  
99 -  
100 - @Override  
101 - public void onError(Throwable e) {  
102 - log.warn("Failed to process request", e);  
103 - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);  
104 - }  
105 - }  
106 -  
107 - public static class CoapOkCallback implements TransportServiceCallback<Void> {  
108 - private final CoapExchange exchange;  
109 - private final CoAP.ResponseCode onSuccessResponse;  
110 - private final CoAP.ResponseCode onFailureResponse;  
111 -  
112 - public CoapOkCallback(CoapExchange exchange, CoAP.ResponseCode onSuccessResponse, CoAP.ResponseCode onFailureResponse) {  
113 - this.exchange = exchange;  
114 - this.onSuccessResponse = onSuccessResponse;  
115 - this.onFailureResponse = onFailureResponse;  
116 - }  
117 -  
118 - @Override  
119 - public void onSuccess(Void msg) {  
120 - Response response = new Response(onSuccessResponse);  
121 - response.setAcknowledged(isConRequest());  
122 - exchange.respond(response);  
123 - }  
124 -  
125 - @Override  
126 - public void onError(Throwable e) {  
127 - exchange.respond(onFailureResponse);  
128 - }  
129 -  
130 - private boolean isConRequest() {  
131 - return exchange.advanced().getRequest().isConfirmable();  
132 - } 85 + protected int getNextMsgId() {
  86 + return ThreadLocalRandom.current().nextInt(NONE, MAX_MID + 1);
133 } 87 }
134 88
135 - public static class CoapNoOpCallback implements TransportServiceCallback<Void> {  
136 - private final CoapExchange exchange;  
137 -  
138 - CoapNoOpCallback(CoapExchange exchange) {  
139 - this.exchange = exchange;  
140 - }  
141 -  
142 - @Override  
143 - public void onSuccess(Void msg) {  
144 - }  
145 -  
146 - @Override  
147 - public void onError(Throwable e) {  
148 - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);  
149 - }  
150 - }  
151 } 89 }
@@ -22,10 +22,14 @@ import org.springframework.beans.factory.annotation.Value; @@ -22,10 +22,14 @@ import org.springframework.beans.factory.annotation.Value;
22 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; 22 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
23 import org.springframework.stereotype.Component; 23 import org.springframework.stereotype.Component;
24 import org.thingsboard.server.common.transport.TransportContext; 24 import org.thingsboard.server.common.transport.TransportContext;
  25 +import org.thingsboard.server.gen.transport.TransportProtos;
25 import org.thingsboard.server.transport.coap.adaptors.JsonCoapAdaptor; 26 import org.thingsboard.server.transport.coap.adaptors.JsonCoapAdaptor;
26 import org.thingsboard.server.transport.coap.adaptors.ProtoCoapAdaptor; 27 import org.thingsboard.server.transport.coap.adaptors.ProtoCoapAdaptor;
27 import org.thingsboard.server.transport.coap.efento.adaptor.EfentoCoapAdaptor; 28 import org.thingsboard.server.transport.coap.efento.adaptor.EfentoCoapAdaptor;
28 29
  30 +import java.util.concurrent.ConcurrentHashMap;
  31 +import java.util.concurrent.ConcurrentMap;
  32 +
29 33
30 /** 34 /**
31 * Created by ashvayka on 18.10.18. 35 * Created by ashvayka on 18.10.18.
@@ -33,22 +37,21 @@ import org.thingsboard.server.transport.coap.efento.adaptor.EfentoCoapAdaptor; @@ -33,22 +37,21 @@ import org.thingsboard.server.transport.coap.efento.adaptor.EfentoCoapAdaptor;
33 @Slf4j 37 @Slf4j
34 @ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true' && '${transport.coap.enabled}'=='true')") 38 @ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true' && '${transport.coap.enabled}'=='true')")
35 @Component 39 @Component
  40 +@Getter
36 public class CoapTransportContext extends TransportContext { 41 public class CoapTransportContext extends TransportContext {
37 42
38 - @Getter  
39 @Value("${transport.sessions.report_timeout}") 43 @Value("${transport.sessions.report_timeout}")
40 private long sessionReportTimeout; 44 private long sessionReportTimeout;
41 45
42 - @Getter  
43 @Autowired 46 @Autowired
44 private JsonCoapAdaptor jsonCoapAdaptor; 47 private JsonCoapAdaptor jsonCoapAdaptor;
45 48
46 - @Getter  
47 @Autowired 49 @Autowired
48 private ProtoCoapAdaptor protoCoapAdaptor; 50 private ProtoCoapAdaptor protoCoapAdaptor;
49 51
50 - @Getter  
51 @Autowired 52 @Autowired
52 private EfentoCoapAdaptor efentoCoapAdaptor; 53 private EfentoCoapAdaptor efentoCoapAdaptor;
53 54
  55 + private final ConcurrentMap<Integer, TransportProtos.ToDeviceRpcRequestMsg> rpcAwaitingAck = new ConcurrentHashMap<>();
  56 +
54 } 57 }
@@ -19,6 +19,7 @@ import com.google.gson.JsonParseException; @@ -19,6 +19,7 @@ import com.google.gson.JsonParseException;
19 import com.google.protobuf.Descriptors; 19 import com.google.protobuf.Descriptors;
20 import com.google.protobuf.DynamicMessage; 20 import com.google.protobuf.DynamicMessage;
21 import lombok.Data; 21 import lombok.Data;
  22 +import lombok.RequiredArgsConstructor;
22 import lombok.extern.slf4j.Slf4j; 23 import lombok.extern.slf4j.Slf4j;
23 import org.eclipse.californium.core.coap.CoAP; 24 import org.eclipse.californium.core.coap.CoAP;
24 import org.eclipse.californium.core.coap.Request; 25 import org.eclipse.californium.core.coap.Request;
@@ -53,6 +54,9 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException; @@ -53,6 +54,9 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException;
53 import org.thingsboard.server.common.transport.adaptor.JsonConverter; 54 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
54 import org.thingsboard.server.gen.transport.TransportProtos; 55 import org.thingsboard.server.gen.transport.TransportProtos;
55 import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; 56 import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
  57 +import org.thingsboard.server.transport.coap.callback.CoapDeviceAuthCallback;
  58 +import org.thingsboard.server.transport.coap.callback.CoapNoOpCallback;
  59 +import org.thingsboard.server.transport.coap.callback.CoapOkCallback;
56 60
57 import java.util.List; 61 import java.util.List;
58 import java.util.Map; 62 import java.util.Map;
@@ -81,9 +85,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -81,9 +85,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
81 private final Set<UUID> rpcSubscriptions = ConcurrentHashMap.newKeySet(); 85 private final Set<UUID> rpcSubscriptions = ConcurrentHashMap.newKeySet();
82 private final Set<UUID> attributeSubscriptions = ConcurrentHashMap.newKeySet(); 86 private final Set<UUID> attributeSubscriptions = ConcurrentHashMap.newKeySet();
83 87
84 - private ConcurrentMap<String, TbCoapDtlsSessionInfo> dtlsSessionIdMap;  
85 - private long timeout;  
86 - private long sessionReportTimeout; 88 + private final ConcurrentMap<String, TbCoapDtlsSessionInfo> dtlsSessionIdMap;
  89 + private final long timeout;
87 90
88 public CoapTransportResource(CoapTransportContext ctx, CoapServerService coapServerService, String name) { 91 public CoapTransportResource(CoapTransportContext ctx, CoapServerService coapServerService, String name) {
89 super(ctx, name); 92 super(ctx, name);
@@ -91,7 +94,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -91,7 +94,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
91 this.addObserver(new CoapResourceObserver()); 94 this.addObserver(new CoapResourceObserver());
92 this.dtlsSessionIdMap = coapServerService.getDtlsSessionsMap(); 95 this.dtlsSessionIdMap = coapServerService.getDtlsSessionsMap();
93 this.timeout = coapServerService.getTimeout(); 96 this.timeout = coapServerService.getTimeout();
94 - this.sessionReportTimeout = ctx.getSessionReportTimeout(); 97 + long sessionReportTimeout = ctx.getSessionReportTimeout();
95 ctx.getScheduler().scheduleAtFixedRate(() -> { 98 ctx.getScheduler().scheduleAtFixedRate(() -> {
96 Set<CoapObserveSessionInfo> coapObserveSessionInfos = sessionInfoToObserveRelationMap.keySet(); 99 Set<CoapObserveSessionInfo> coapObserveSessionInfos = sessionInfoToObserveRelationMap.keySet();
97 Set<TransportProtos.SessionInfoProto> observeSessions = coapObserveSessionInfos 100 Set<TransportProtos.SessionInfoProto> observeSessions = coapObserveSessionInfos
@@ -110,7 +113,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -110,7 +113,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
110 return; // because request did not try to establish a relation 113 return; // because request did not try to establish a relation
111 } 114 }
112 if (CoAP.ResponseCode.isSuccess(response.getCode())) { 115 if (CoAP.ResponseCode.isSuccess(response.getCode())) {
113 -  
114 if (!relation.isEstablished()) { 116 if (!relation.isEstablished()) {
115 relation.setEstablished(); 117 relation.setEstablished();
116 addObserveRelation(relation); 118 addObserveRelation(relation);
@@ -280,8 +282,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -280,8 +282,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
280 CoapObserveSessionInfo currentCoapObserveAttrSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request)); 282 CoapObserveSessionInfo currentCoapObserveAttrSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request));
281 if (currentCoapObserveAttrSessionInfo == null) { 283 if (currentCoapObserveAttrSessionInfo == null) {
282 attributeSubscriptions.add(sessionId); 284 attributeSubscriptions.add(sessionId);
283 - registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor,  
284 - transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); 285 + registerAsyncCoapSession(exchange, coapTransportAdaptor, transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(),
  286 + sessionInfo, getTokenFromRequest(request));
285 transportService.process(sessionInfo, 287 transportService.process(sessionInfo,
286 TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), new CoapNoOpCallback(exchange)); 288 TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), new CoapNoOpCallback(exchange));
287 transportService.process(sessionInfo, 289 transportService.process(sessionInfo,
@@ -305,11 +307,11 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -305,11 +307,11 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
305 CoapObserveSessionInfo currentCoapObserveRpcSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request)); 307 CoapObserveSessionInfo currentCoapObserveRpcSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request));
306 if (currentCoapObserveRpcSessionInfo == null) { 308 if (currentCoapObserveRpcSessionInfo == null) {
307 rpcSubscriptions.add(sessionId); 309 rpcSubscriptions.add(sessionId);
308 - registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor,  
309 - transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); 310 + registerAsyncCoapSession(exchange, coapTransportAdaptor, transportConfigurationContainer.getRpcRequestDynamicMessageBuilder()
  311 + , sessionInfo, getTokenFromRequest(request));
310 transportService.process(sessionInfo, 312 transportService.process(sessionInfo,
311 TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), 313 TransportProtos.SubscribeToRPCMsg.getDefaultInstance(),
312 - new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR) 314 + new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)
313 ); 315 );
314 } 316 }
315 break; 317 break;
@@ -359,14 +361,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -359,14 +361,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
359 return tokenToCoapSessionInfoMap.remove(token); 361 return tokenToCoapSessionInfoMap.remove(token);
360 } 362 }
361 363
362 - private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) { 364 + private void registerAsyncCoapSession(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor,
  365 + DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo, String token) {
363 tokenToCoapSessionInfoMap.putIfAbsent(token, new CoapObserveSessionInfo(sessionInfo)); 366 tokenToCoapSessionInfoMap.putIfAbsent(token, new CoapObserveSessionInfo(sessionInfo));
364 transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo)); 367 transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo));
365 transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); 368 transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
366 } 369 }
367 370
368 - private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) {  
369 - return new CoapSessionListener(this, exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo); 371 + private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor,
  372 + DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) {
  373 + return new CoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo);
370 } 374 }
371 375
372 private String getTokenFromRequest(Request request) { 376 private String getTokenFromRequest(Request request) {
@@ -448,22 +452,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -448,22 +452,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
448 } 452 }
449 } 453 }
450 454
451 - private static class CoapSessionListener implements SessionMsgListener { 455 + @RequiredArgsConstructor
  456 + private class CoapSessionListener implements SessionMsgListener {
452 457
453 - private final CoapTransportResource coapTransportResource;  
454 private final CoapExchange exchange; 458 private final CoapExchange exchange;
455 private final CoapTransportAdaptor coapTransportAdaptor; 459 private final CoapTransportAdaptor coapTransportAdaptor;
456 private final DynamicMessage.Builder rpcRequestDynamicMessageBuilder; 460 private final DynamicMessage.Builder rpcRequestDynamicMessageBuilder;
457 private final TransportProtos.SessionInfoProto sessionInfo; 461 private final TransportProtos.SessionInfoProto sessionInfo;
458 462
459 - CoapSessionListener(CoapTransportResource coapTransportResource, CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) {  
460 - this.coapTransportResource = coapTransportResource;  
461 - this.exchange = exchange;  
462 - this.coapTransportAdaptor = coapTransportAdaptor;  
463 - this.rpcRequestDynamicMessageBuilder = rpcRequestDynamicMessageBuilder;  
464 - this.sessionInfo = sessionInfo;  
465 - }  
466 -  
467 @Override 463 @Override
468 public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg msg) { 464 public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg msg) {
469 try { 465 try {
@@ -496,18 +492,30 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -496,18 +492,30 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
496 @Override 492 @Override
497 public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) { 493 public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) {
498 log.trace("[{}] Received RPC command to device", sessionId); 494 log.trace("[{}] Received RPC command to device", sessionId);
499 - boolean successful = true;  
500 try { 495 try {
501 - exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder)); 496 + Response response = coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder);
  497 + int requestId = getNextMsgId();
  498 + response.setMID(requestId);
  499 + if (msg.getPersisted()) {
  500 + transportContext.getRpcAwaitingAck().put(requestId, msg);
  501 + transportContext.getScheduler().schedule(() -> {
  502 + TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = transportContext.getRpcAwaitingAck().remove(requestId);
  503 + if (awaitingAckMsg != null) {
  504 + transportService.process(sessionInfo, msg, true, TransportServiceCallback.EMPTY);
  505 + }
  506 + }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
  507 + }
  508 + response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> {
  509 + TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
  510 + if (rpcRequestMsg != null) {
  511 + transportService.process(sessionInfo, rpcRequestMsg, false, TransportServiceCallback.EMPTY);
  512 + }
  513 + }));
  514 + exchange.respond(response);
502 } catch (AdaptorException e) { 515 } catch (AdaptorException e) {
503 log.trace("Failed to reply due to error", e); 516 log.trace("Failed to reply due to error", e);
504 closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR); 517 closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
505 - successful = false;  
506 - } finally {  
507 - coapTransportResource.transportService.process(sessionInfo, msg, !successful, TransportServiceCallback.EMPTY);  
508 - if (!successful) {  
509 - closeAndDeregister();  
510 - } 518 + closeAndDeregister();
511 } 519 }
512 } 520 }
513 521
@@ -526,8 +534,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -526,8 +534,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
526 } 534 }
527 535
528 private void closeObserveRelationAndNotify(UUID sessionId, CoAP.ResponseCode responseCode) { 536 private void closeObserveRelationAndNotify(UUID sessionId, CoAP.ResponseCode responseCode) {
529 - Map<CoapObserveSessionInfo, ObserveRelation> sessionToObserveRelationMap = coapTransportResource.getCoapSessionInfoToObserveRelationMap();  
530 - if (coapTransportResource.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) { 537 + Map<CoapObserveSessionInfo, ObserveRelation> sessionToObserveRelationMap = CoapTransportResource.this.getCoapSessionInfoToObserveRelationMap();
  538 + if (CoapTransportResource.this.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) {
531 Optional<CoapObserveSessionInfo> observeSessionToClose = sessionToObserveRelationMap.keySet().stream().filter(coapObserveSessionInfo -> { 539 Optional<CoapObserveSessionInfo> observeSessionToClose = sessionToObserveRelationMap.keySet().stream().filter(coapObserveSessionInfo -> {
532 TransportProtos.SessionInfoProto sessionToDelete = coapObserveSessionInfo.getSessionInfoProto(); 540 TransportProtos.SessionInfoProto sessionToDelete = coapObserveSessionInfo.getSessionInfoProto();
533 UUID observeSessionId = new UUID(sessionToDelete.getSessionIdMSB(), sessionToDelete.getSessionIdLSB()); 541 UUID observeSessionId = new UUID(sessionToDelete.getSessionIdMSB(), sessionToDelete.getSessionIdLSB());
@@ -536,16 +544,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -536,16 +544,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
536 if (observeSessionToClose.isPresent()) { 544 if (observeSessionToClose.isPresent()) {
537 CoapObserveSessionInfo coapObserveSessionInfo = observeSessionToClose.get(); 545 CoapObserveSessionInfo coapObserveSessionInfo = observeSessionToClose.get();
538 ObserveRelation observeRelation = sessionToObserveRelationMap.get(coapObserveSessionInfo); 546 ObserveRelation observeRelation = sessionToObserveRelationMap.get(coapObserveSessionInfo);
539 - coapTransportResource.clearAndNotifyObserveRelation(observeRelation, responseCode); 547 + CoapTransportResource.this.clearAndNotifyObserveRelation(observeRelation, responseCode);
540 } 548 }
541 } 549 }
542 } 550 }
543 551
544 private void closeAndDeregister() { 552 private void closeAndDeregister() {
545 Request request = exchange.advanced().getRequest(); 553 Request request = exchange.advanced().getRequest();
546 - String token = coapTransportResource.getTokenFromRequest(request);  
547 - CoapObserveSessionInfo deleted = coapTransportResource.lookupAsyncSessionInfo(token);  
548 - coapTransportResource.closeAndDeregister(deleted.getSessionInfoProto()); 554 + String token = CoapTransportResource.this.getTokenFromRequest(request);
  555 + CoapObserveSessionInfo deleted = CoapTransportResource.this.lookupAsyncSessionInfo(token);
  556 + CoapTransportResource.this.closeAndDeregister(deleted.getSessionInfoProto());
549 } 557 }
550 558
551 } 559 }
@@ -20,22 +20,19 @@ import org.eclipse.californium.core.coap.CoAP; @@ -20,22 +20,19 @@ import org.eclipse.californium.core.coap.CoAP;
20 import org.eclipse.californium.core.coap.Request; 20 import org.eclipse.californium.core.coap.Request;
21 import org.eclipse.californium.core.coap.Response; 21 import org.eclipse.californium.core.coap.Response;
22 import org.eclipse.californium.core.network.Exchange; 22 import org.eclipse.californium.core.network.Exchange;
23 -import org.eclipse.californium.core.observe.ObserveRelation;  
24 import org.eclipse.californium.core.server.resources.CoapExchange; 23 import org.eclipse.californium.core.server.resources.CoapExchange;
25 import org.eclipse.californium.core.server.resources.Resource; 24 import org.eclipse.californium.core.server.resources.Resource;
26 -import org.eclipse.californium.core.server.resources.ResourceObserver;  
27 -import org.thingsboard.common.util.ThingsBoardExecutors;  
28 import org.thingsboard.server.common.data.DeviceTransportType; 25 import org.thingsboard.server.common.data.DeviceTransportType;
29 import org.thingsboard.server.common.data.StringUtils; 26 import org.thingsboard.server.common.data.StringUtils;
30 import org.thingsboard.server.common.data.ota.OtaPackageType; 27 import org.thingsboard.server.common.data.ota.OtaPackageType;
31 import org.thingsboard.server.common.data.security.DeviceTokenCredentials; 28 import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
32 import org.thingsboard.server.common.transport.TransportServiceCallback; 29 import org.thingsboard.server.common.transport.TransportServiceCallback;
33 import org.thingsboard.server.gen.transport.TransportProtos; 30 import org.thingsboard.server.gen.transport.TransportProtos;
  31 +import org.thingsboard.server.transport.coap.callback.CoapDeviceAuthCallback;
34 32
35 import java.util.List; 33 import java.util.List;
36 import java.util.Optional; 34 import java.util.Optional;
37 import java.util.UUID; 35 import java.util.UUID;
38 -import java.util.concurrent.ExecutorService;  
39 36
40 @Slf4j 37 @Slf4j
41 public class OtaPackageTransportResource extends AbstractCoapTransportResource { 38 public class OtaPackageTransportResource extends AbstractCoapTransportResource {
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.coap;
  17 +
  18 +import lombok.RequiredArgsConstructor;
  19 +import org.eclipse.californium.core.coap.MessageObserver;
  20 +import org.eclipse.californium.core.coap.Response;
  21 +import org.eclipse.californium.elements.EndpointContext;
  22 +
  23 +import java.util.function.Consumer;
  24 +
  25 +@RequiredArgsConstructor
  26 +public class TbCoapMessageObserver implements MessageObserver {
  27 +
  28 + private final int msgId;
  29 + private final Consumer<Integer> onAcknowledge;
  30 +
  31 + @Override
  32 + public void onRetransmission() {
  33 +
  34 + }
  35 +
  36 + @Override
  37 + public void onResponse(Response response) {
  38 +
  39 + }
  40 +
  41 + @Override
  42 + public void onAcknowledgement() {
  43 + onAcknowledge.accept(msgId);
  44 + }
  45 +
  46 + @Override
  47 + public void onReject() {
  48 +
  49 + }
  50 +
  51 + @Override
  52 + public void onTimeout() {
  53 +
  54 + }
  55 +
  56 + @Override
  57 + public void onCancel() {
  58 +
  59 + }
  60 +
  61 + @Override
  62 + public void onReadyToSend() {
  63 +
  64 + }
  65 +
  66 + @Override
  67 + public void onConnecting() {
  68 +
  69 + }
  70 +
  71 + @Override
  72 + public void onDtlsRetransmission(int flight) {
  73 +
  74 + }
  75 +
  76 + @Override
  77 + public void onSent(boolean retransmission) {
  78 +
  79 + }
  80 +
  81 + @Override
  82 + public void onSendError(Throwable error) {
  83 +
  84 + }
  85 +
  86 + @Override
  87 + public void onContextEstablished(EndpointContext endpointContext) {
  88 +
  89 + }
  90 +
  91 + @Override
  92 + public void onComplete() {
  93 +
  94 + }
  95 +}
@@ -150,7 +150,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor { @@ -150,7 +150,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
150 private Response getObserveNotification(boolean confirmable, JsonElement json) { 150 private Response getObserveNotification(boolean confirmable, JsonElement json) {
151 Response response = new Response(CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE); 151 Response response = new Response(CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE);
152 response.setPayload(json.toString()); 152 response.setPayload(json.toString());
153 - response.setAcknowledged(confirmable); 153 + response.setConfirmable(confirmable);
154 return response; 154 return response;
155 } 155 }
156 156
@@ -122,7 +122,7 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor { @@ -122,7 +122,7 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
122 @Override 122 @Override
123 public Response convertToPublish(boolean isConfirmable, TransportProtos.ToServerRpcResponseMsg msg) throws AdaptorException { 123 public Response convertToPublish(boolean isConfirmable, TransportProtos.ToServerRpcResponseMsg msg) throws AdaptorException {
124 Response response = new Response(CoAP.ResponseCode.CONTENT); 124 Response response = new Response(CoAP.ResponseCode.CONTENT);
125 - response.setAcknowledged(isConfirmable); 125 + response.setConfirmable(isConfirmable);
126 response.setPayload(msg.toByteArray()); 126 response.setPayload(msg.toByteArray());
127 return response; 127 return response;
128 } 128 }
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.coap.callback;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.eclipse.californium.core.coap.CoAP;
  20 +import org.eclipse.californium.core.server.resources.CoapExchange;
  21 +import org.thingsboard.server.common.data.DeviceProfile;
  22 +import org.thingsboard.server.common.transport.TransportContext;
  23 +import org.thingsboard.server.common.transport.TransportServiceCallback;
  24 +import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
  25 +import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
  26 +import org.thingsboard.server.gen.transport.TransportProtos;
  27 +import org.thingsboard.server.transport.coap.AbstractCoapTransportResource;
  28 +
  29 +import java.util.UUID;
  30 +import java.util.function.BiConsumer;
  31 +
  32 +@Slf4j
  33 +public class CoapDeviceAuthCallback implements TransportServiceCallback<ValidateDeviceCredentialsResponse> {
  34 + private final TransportContext transportContext;
  35 + private final CoapExchange exchange;
  36 + private final BiConsumer<TransportProtos.SessionInfoProto, DeviceProfile> onSuccess;
  37 +
  38 + public CoapDeviceAuthCallback(TransportContext transportContext, CoapExchange exchange, BiConsumer<TransportProtos.SessionInfoProto, DeviceProfile> onSuccess) {
  39 + this.transportContext = transportContext;
  40 + this.exchange = exchange;
  41 + this.onSuccess = onSuccess;
  42 + }
  43 +
  44 + @Override
  45 + public void onSuccess(ValidateDeviceCredentialsResponse msg) {
  46 + DeviceProfile deviceProfile = msg.getDeviceProfile();
  47 + if (msg.hasDeviceInfo() && deviceProfile != null) {
  48 + TransportProtos.SessionInfoProto sessionInfoProto = SessionInfoCreator.create(msg, transportContext, UUID.randomUUID());
  49 + onSuccess.accept(sessionInfoProto, deviceProfile);
  50 + } else {
  51 + exchange.respond(CoAP.ResponseCode.UNAUTHORIZED);
  52 + }
  53 + }
  54 +
  55 + @Override
  56 + public void onError(Throwable e) {
  57 + log.warn("Failed to process request", e);
  58 + exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
  59 + }
  60 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.coap.callback;
  17 +
  18 +import org.eclipse.californium.core.coap.CoAP;
  19 +import org.eclipse.californium.core.server.resources.CoapExchange;
  20 +import org.thingsboard.server.common.transport.TransportServiceCallback;
  21 +
  22 +public class CoapNoOpCallback implements TransportServiceCallback<Void> {
  23 + private final CoapExchange exchange;
  24 +
  25 + public CoapNoOpCallback(CoapExchange exchange) {
  26 + this.exchange = exchange;
  27 + }
  28 +
  29 + @Override
  30 + public void onSuccess(Void msg) {
  31 + }
  32 +
  33 + @Override
  34 + public void onError(Throwable e) {
  35 + exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
  36 + }
  37 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.coap.callback;
  17 +
  18 +import org.eclipse.californium.core.coap.CoAP;
  19 +import org.eclipse.californium.core.coap.Response;
  20 +import org.eclipse.californium.core.server.resources.CoapExchange;
  21 +import org.thingsboard.server.common.transport.TransportServiceCallback;
  22 +
  23 +public class CoapOkCallback implements TransportServiceCallback<Void> {
  24 +
  25 + protected final CoapExchange exchange;
  26 + protected final CoAP.ResponseCode onSuccessResponse;
  27 + protected final CoAP.ResponseCode onFailureResponse;
  28 +
  29 + public CoapOkCallback(CoapExchange exchange, CoAP.ResponseCode onSuccessResponse, CoAP.ResponseCode onFailureResponse) {
  30 + this.exchange = exchange;
  31 + this.onSuccessResponse = onSuccessResponse;
  32 + this.onFailureResponse = onFailureResponse;
  33 + }
  34 +
  35 + @Override
  36 + public void onSuccess(Void msg) {
  37 + Response response = new Response(onSuccessResponse);
  38 + response.setConfirmable(isConRequest());
  39 + exchange.respond(response);
  40 + }
  41 +
  42 + @Override
  43 + public void onError(Throwable e) {
  44 + exchange.respond(onFailureResponse);
  45 + }
  46 +
  47 + protected boolean isConRequest() {
  48 + return exchange.advanced().getRequest().isConfirmable();
  49 + }
  50 +}
@@ -35,6 +35,8 @@ import org.thingsboard.server.gen.transport.TransportProtos; @@ -35,6 +35,8 @@ import org.thingsboard.server.gen.transport.TransportProtos;
35 import org.thingsboard.server.gen.transport.coap.MeasurementTypeProtos; 35 import org.thingsboard.server.gen.transport.coap.MeasurementTypeProtos;
36 import org.thingsboard.server.gen.transport.coap.MeasurementsProtos; 36 import org.thingsboard.server.gen.transport.coap.MeasurementsProtos;
37 import org.thingsboard.server.transport.coap.AbstractCoapTransportResource; 37 import org.thingsboard.server.transport.coap.AbstractCoapTransportResource;
  38 +import org.thingsboard.server.transport.coap.callback.CoapDeviceAuthCallback;
  39 +import org.thingsboard.server.transport.coap.callback.CoapOkCallback;
38 import org.thingsboard.server.transport.coap.CoapTransportContext; 40 import org.thingsboard.server.transport.coap.CoapTransportContext;
39 import org.thingsboard.server.transport.coap.efento.utils.CoapEfentoUtils; 41 import org.thingsboard.server.transport.coap.efento.utils.CoapEfentoUtils;
40 42
@@ -17,7 +17,6 @@ package org.thingsboard.server.transport.mqtt; @@ -17,7 +17,6 @@ package org.thingsboard.server.transport.mqtt;
17 17
18 import com.fasterxml.jackson.databind.JsonNode; 18 import com.fasterxml.jackson.databind.JsonNode;
19 import com.google.gson.JsonParseException; 19 import com.google.gson.JsonParseException;
20 -import io.netty.channel.ChannelFuture;  
21 import io.netty.channel.ChannelHandlerContext; 20 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelInboundHandlerAdapter; 21 import io.netty.channel.ChannelInboundHandlerAdapter;
23 import io.netty.handler.codec.mqtt.MqttConnAckMessage; 22 import io.netty.handler.codec.mqtt.MqttConnAckMessage;
@@ -40,6 +39,7 @@ import io.netty.util.CharsetUtil; @@ -40,6 +39,7 @@ import io.netty.util.CharsetUtil;
40 import io.netty.util.ReferenceCountUtil; 39 import io.netty.util.ReferenceCountUtil;
41 import io.netty.util.concurrent.Future; 40 import io.netty.util.concurrent.Future;
42 import io.netty.util.concurrent.GenericFutureListener; 41 import io.netty.util.concurrent.GenericFutureListener;
  42 +import lombok.Data;
43 import lombok.extern.slf4j.Slf4j; 43 import lombok.extern.slf4j.Slf4j;
44 import org.apache.commons.lang3.StringUtils; 44 import org.apache.commons.lang3.StringUtils;
45 import org.thingsboard.server.common.data.DataConstants; 45 import org.thingsboard.server.common.data.DataConstants;
@@ -129,6 +129,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -129,6 +129,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
129 129
130 private final ConcurrentHashMap<String, String> otaPackSessions; 130 private final ConcurrentHashMap<String, String> otaPackSessions;
131 private final ConcurrentHashMap<String, Integer> chunkSizes; 131 private final ConcurrentHashMap<String, Integer> chunkSizes;
  132 + private final ConcurrentMap<Integer, TransportProtos.ToDeviceRpcRequestMsg> rpcAwaitingAck;
132 133
133 MqttTransportHandler(MqttTransportContext context, SslHandler sslHandler) { 134 MqttTransportHandler(MqttTransportContext context, SslHandler sslHandler) {
134 this.sessionId = UUID.randomUUID(); 135 this.sessionId = UUID.randomUUID();
@@ -140,6 +141,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -140,6 +141,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
140 this.deviceSessionCtx = new DeviceSessionCtx(sessionId, mqttQoSMap, context); 141 this.deviceSessionCtx = new DeviceSessionCtx(sessionId, mqttQoSMap, context);
141 this.otaPackSessions = new ConcurrentHashMap<>(); 142 this.otaPackSessions = new ConcurrentHashMap<>();
142 this.chunkSizes = new ConcurrentHashMap<>(); 143 this.chunkSizes = new ConcurrentHashMap<>();
  144 + this.rpcAwaitingAck = new ConcurrentHashMap<>();
143 } 145 }
144 146
145 @Override 147 @Override
@@ -243,6 +245,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -243,6 +245,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
243 processDisconnect(ctx); 245 processDisconnect(ctx);
244 } 246 }
245 break; 247 break;
  248 + case PUBACK:
  249 + int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId();
  250 + TransportProtos.ToDeviceRpcRequestMsg rpcRequest = rpcAwaitingAck.remove(msgId);
  251 + if (rpcRequest != null) {
  252 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, false, TransportServiceCallback.EMPTY);
  253 + }
  254 + break;
246 default: 255 default:
247 break; 256 break;
248 } 257 }
@@ -815,16 +824,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -815,16 +824,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
815 public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { 824 public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
816 log.trace("[{}] Received RPC command to device", sessionId); 825 log.trace("[{}] Received RPC command to device", sessionId);
817 try { 826 try {
818 - deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest)  
819 - .ifPresent(payload -> {  
820 - ChannelFuture channelFuture = deviceSessionCtx.getChannel().writeAndFlush(payload);  
821 - if (rpcRequest.getPersisted()) {  
822 - channelFuture.addListener(future ->  
823 - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest,  
824 - future.cause() != null, TransportServiceCallback.EMPTY)  
825 - );  
826 - }  
827 - }); 827 + deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> {
  828 + RequestInfo requestInfo = publish(payload, deviceSessionCtx);
  829 + int msgId = requestInfo.getMsgId();
  830 +
  831 + if (isAckExpected(payload)) {
  832 + if (rpcRequest.getPersisted()) {
  833 + rpcAwaitingAck.put(msgId, rpcRequest);
  834 + context.getScheduler().schedule(() -> {
  835 + TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = rpcAwaitingAck.remove(msgId);
  836 + if (awaitingAckMsg != null) {
  837 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, true, TransportServiceCallback.EMPTY);
  838 + }
  839 + }, Math.max(0, rpcRequest.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
  840 + }
  841 + }
  842 + });
828 } catch (Exception e) { 843 } catch (Exception e) {
829 log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e); 844 log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e);
830 } 845 }
@@ -840,6 +855,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -840,6 +855,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
840 } 855 }
841 } 856 }
842 857
  858 + private RequestInfo publish(MqttMessage message, DeviceSessionCtx deviceSessionCtx) {
  859 + deviceSessionCtx.getChannel().writeAndFlush(message);
  860 +
  861 + int msgId = ((MqttPublishMessage) message).variableHeader().packetId();
  862 + RequestInfo requestInfo = new RequestInfo(msgId, System.currentTimeMillis(), deviceSessionCtx.getSessionInfo());
  863 +
  864 + return requestInfo;
  865 + }
  866 +
  867 + private boolean isAckExpected(MqttMessage message) {
  868 + return message.fixedHeader().qosLevel().value() > 0;
  869 + }
  870 +
843 @Override 871 @Override
844 public void onDeviceProfileUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) { 872 public void onDeviceProfileUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {
845 deviceSessionCtx.onDeviceProfileUpdate(sessionInfo, deviceProfile); 873 deviceSessionCtx.onDeviceProfileUpdate(sessionInfo, deviceProfile);
@@ -849,4 +877,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -849,4 +877,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
849 public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) { 877 public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
850 deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); 878 deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
851 } 879 }
  880 +
  881 + @Data
  882 + public static class RequestInfo {
  883 + private final int msgId;
  884 + private final long requestTime;
  885 + private final TransportProtos.SessionInfoProto sessionInfo;
  886 + }
852 } 887 }