Commit 34aa44d2cb067c645c5b1411637012de3e76379b

Authored by Andrii Shvaika
Committed by Andrew Shvayka
1 parent 6fe0aee7

Added deduplication of transport sessions for CoAP. Fix response codes

Showing 23 changed files with 1005 additions and 426 deletions
... ... @@ -109,7 +109,7 @@ public abstract class AbstractCoapAttributesUpdatesIntegrationTest extends Abstr
109 109 protected void validateCurrentStateAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException {
110 110 assertNotNull(callback.getPayloadBytes());
111 111 assertNotNull(callback.getObserve());
112   - assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE);
  112 + assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode());
113 113 assertEquals(0, callback.getObserve().intValue());
114 114 String response = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8);
115 115 assertEquals(JacksonUtil.toJsonNode(POST_ATTRIBUTES_PAYLOAD_ON_CURRENT_STATE_NOTIFICATION), JacksonUtil.toJsonNode(response));
... ... @@ -118,7 +118,7 @@ public abstract class AbstractCoapAttributesUpdatesIntegrationTest extends Abstr
118 118 protected void validateEmptyCurrentStateAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException {
119 119 assertNotNull(callback.getPayloadBytes());
120 120 assertNotNull(callback.getObserve());
121   - assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE);
  121 + assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode());
122 122 assertEquals(0, callback.getObserve().intValue());
123 123 String response = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8);
124 124 assertEquals("{}", response);
... ... @@ -127,7 +127,7 @@ public abstract class AbstractCoapAttributesUpdatesIntegrationTest extends Abstr
127 127 protected void validateUpdateAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException {
128 128 assertNotNull(callback.getPayloadBytes());
129 129 assertNotNull(callback.getObserve());
130   - assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE);
  130 + assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode());
131 131 assertEquals(1, callback.getObserve().intValue());
132 132 String response = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8);
133 133 assertEquals(JacksonUtil.toJsonNode(POST_ATTRIBUTES_PAYLOAD), JacksonUtil.toJsonNode(response));
... ... @@ -136,7 +136,7 @@ public abstract class AbstractCoapAttributesUpdatesIntegrationTest extends Abstr
136 136 protected void validateDeleteAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException {
137 137 assertNotNull(callback.getPayloadBytes());
138 138 assertNotNull(callback.getObserve());
139   - assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE);
  139 + assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode());
140 140 assertEquals(2, callback.getObserve().intValue());
141 141 String response = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8);
142 142 assertEquals(JacksonUtil.toJsonNode(RESPONSE_ATTRIBUTES_PAYLOAD_DELETED), JacksonUtil.toJsonNode(response));
... ...
... ... @@ -62,7 +62,7 @@ public abstract class AbstractCoapAttributesUpdatesProtoIntegrationTest extends
62 62 protected void validateCurrentStateAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException {
63 63 assertNotNull(callback.getPayloadBytes());
64 64 assertNotNull(callback.getObserve());
65   - assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE);
  65 + assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode());
66 66 assertEquals(0, callback.getObserve().intValue());
67 67 TransportProtos.AttributeUpdateNotificationMsg.Builder expectedCurrentStateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder();
68 68 TransportProtos.TsKvProto tsKvProtoAttribute1 = getTsKvProto("attribute1", "value", TransportProtos.KeyValueType.STRING_V);
... ... @@ -90,14 +90,14 @@ public abstract class AbstractCoapAttributesUpdatesProtoIntegrationTest extends
90 90 protected void validateEmptyCurrentStateAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException {
91 91 assertNull(callback.getPayloadBytes());
92 92 assertNotNull(callback.getObserve());
93   - assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE);
  93 + assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode());
94 94 assertEquals(0, callback.getObserve().intValue());
95 95 }
96 96
97 97 protected void validateUpdateAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException {
98 98 assertNotNull(callback.getPayloadBytes());
99 99 assertNotNull(callback.getObserve());
100   - assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE);
  100 + assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode());
101 101 assertEquals(1, callback.getObserve().intValue());
102 102 TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder();
103 103 List<TransportProtos.TsKvProto> tsKvProtoList = getTsKvProtoList();
... ... @@ -117,7 +117,7 @@ public abstract class AbstractCoapAttributesUpdatesProtoIntegrationTest extends
117 117 protected void validateDeleteAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException {
118 118 assertNotNull(callback.getPayloadBytes());
119 119 assertNotNull(callback.getObserve());
120   - assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE);
  120 + assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode());
121 121 assertEquals(2, callback.getObserve().intValue());
122 122 TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder();
123 123 attributeUpdateNotificationMsgBuilder.addSharedDeleted("attribute5");
... ...
... ... @@ -196,7 +196,7 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC
196 196 assertTrue(StringUtils.isEmpty(result));
197 197 assertNotNull(callback.getPayloadBytes());
198 198 assertNotNull(callback.getObserve());
199   - assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE);
  199 + assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode());
200 200 assertEquals(1, callback.getObserve().intValue());
201 201 }
202 202
... ... @@ -204,7 +204,7 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC
204 204 assertEquals(expectedResult, actualResult);
205 205 assertNotNull(callback.getPayloadBytes());
206 206 assertNotNull(callback.getObserve());
207   - assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE);
  207 + assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode());
208 208 assertEquals(expectedObserveNumber, callback.getObserve().intValue());
209 209 }
210 210
... ...
... ... @@ -111,8 +111,7 @@ public class TbCoapDtlsCertificateVerifier implements NewAdvancedCertificateVeri
111 111 if (msg != null && strCert.equals(msg.getCredentials())) {
112 112 DeviceProfile deviceProfile = msg.getDeviceProfile();
113 113 if (msg.hasDeviceInfo() && deviceProfile != null) {
114   - TransportProtos.SessionInfoProto sessionInfoProto = SessionInfoCreator.create(msg, serviceInfoProvider.getServiceId(), UUID.randomUUID());
115   - tbCoapDtlsSessionInMemoryStorage.put(session.getSessionIdentifier().toString(), new TbCoapDtlsSessionInfo(sessionInfoProto, deviceProfile));
  114 + tbCoapDtlsSessionInMemoryStorage.put(session.getSessionIdentifier().toString(), new TbCoapDtlsSessionInfo(msg, deviceProfile));
116 115 }
117 116 break;
118 117 }
... ...
... ... @@ -17,18 +17,19 @@ package org.thingsboard.server.coapserver;
17 17
18 18 import lombok.Data;
19 19 import org.thingsboard.server.common.data.DeviceProfile;
  20 +import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
20 21 import org.thingsboard.server.gen.transport.TransportProtos;
21 22
22 23 @Data
23 24 public class TbCoapDtlsSessionInfo {
24 25
25   - private TransportProtos.SessionInfoProto sessionInfoProto;
  26 + private ValidateDeviceCredentialsResponse msg;
26 27 private DeviceProfile deviceProfile;
27 28 private long lastActivityTime;
28 29
29 30
30   - public TbCoapDtlsSessionInfo(TransportProtos.SessionInfoProto sessionInfoProto, DeviceProfile deviceProfile) {
31   - this.sessionInfoProto = sessionInfoProto;
  31 + public TbCoapDtlsSessionInfo(ValidateDeviceCredentialsResponse msg, DeviceProfile deviceProfile) {
  32 + this.msg = msg;
32 33 this.deviceProfile = deviceProfile;
33 34 this.lastActivityTime = System.currentTimeMillis();
34 35 }
... ...
... ... @@ -72,18 +72,4 @@ public abstract class AbstractCoapTransportResource extends CoapResource {
72 72 .build(), TransportServiceCallback.EMPTY);
73 73 }
74 74
75   - protected void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
76   - transportService.reportActivity(sessionInfo);
77   - }
78   -
79   - protected static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) {
80   - return TransportProtos.SessionEventMsg.newBuilder()
81   - .setSessionType(TransportProtos.SessionType.ASYNC)
82   - .setEvent(event).build();
83   - }
84   -
85   - protected int getNextMsgId() {
86   - return ThreadLocalRandom.current().nextInt(NONE, MAX_MID + 1);
87   - }
88   -
89 75 }
... ...
... ... @@ -25,6 +25,7 @@ import org.thingsboard.server.common.transport.TransportContext;
25 25 import org.thingsboard.server.gen.transport.TransportProtos;
26 26 import org.thingsboard.server.transport.coap.adaptors.JsonCoapAdaptor;
27 27 import org.thingsboard.server.transport.coap.adaptors.ProtoCoapAdaptor;
  28 +import org.thingsboard.server.transport.coap.client.CoapClientContext;
28 29 import org.thingsboard.server.transport.coap.efento.adaptor.EfentoCoapAdaptor;
29 30
30 31 import java.util.concurrent.ConcurrentHashMap;
... ... @@ -52,6 +53,9 @@ public class CoapTransportContext extends TransportContext {
52 53 @Autowired
53 54 private EfentoCoapAdaptor efentoCoapAdaptor;
54 55
  56 + @Autowired
  57 + private CoapClientContext clientContext;
  58 +
55 59 private final ConcurrentMap<Integer, TransportProtos.ToDeviceRpcRequestMsg> rpcAwaitingAck = new ConcurrentHashMap<>();
56 60
57 61 }
... ...
... ... @@ -16,10 +16,6 @@
16 16 package org.thingsboard.server.transport.coap;
17 17
18 18 import com.google.gson.JsonParseException;
19   -import com.google.protobuf.Descriptors;
20   -import com.google.protobuf.DynamicMessage;
21   -import lombok.Data;
22   -import lombok.RequiredArgsConstructor;
23 19 import lombok.extern.slf4j.Slf4j;
24 20 import org.eclipse.californium.core.coap.CoAP;
25 21 import org.eclipse.californium.core.coap.Request;
... ... @@ -29,7 +25,6 @@ import org.eclipse.californium.core.observe.ObserveRelation;
29 25 import org.eclipse.californium.core.server.resources.CoapExchange;
30 26 import org.eclipse.californium.core.server.resources.Resource;
31 27 import org.eclipse.californium.core.server.resources.ResourceObserver;
32   -import org.springframework.util.CollectionUtils;
33 28 import org.thingsboard.server.coapserver.CoapServerService;
34 29 import org.thingsboard.server.coapserver.TbCoapDtlsSessionInfo;
35 30 import org.thingsboard.server.common.data.DataConstants;
... ... @@ -37,38 +32,30 @@ import org.thingsboard.server.common.data.DeviceProfile;
37 32 import org.thingsboard.server.common.data.DeviceTransportType;
38 33 import org.thingsboard.server.common.data.StringUtils;
39 34 import org.thingsboard.server.common.data.TransportPayloadType;
40   -import org.thingsboard.server.common.data.device.profile.CoapDeviceProfileTransportConfiguration;
41   -import org.thingsboard.server.common.data.device.profile.CoapDeviceTypeConfiguration;
42   -import org.thingsboard.server.common.data.device.profile.DefaultCoapDeviceTypeConfiguration;
43   -import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileTransportConfiguration;
44   -import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
45   -import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadConfiguration;
46   -import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
47   -import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
48 35 import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
49 36 import org.thingsboard.server.common.msg.session.FeatureType;
50 37 import org.thingsboard.server.common.msg.session.SessionMsgType;
51   -import org.thingsboard.server.common.transport.SessionMsgListener;
52 38 import org.thingsboard.server.common.transport.TransportServiceCallback;
53 39 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
54 40 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
  41 +import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
55 42 import org.thingsboard.server.gen.transport.TransportProtos;
56   -import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
57 43 import org.thingsboard.server.transport.coap.callback.CoapDeviceAuthCallback;
58 44 import org.thingsboard.server.transport.coap.callback.CoapNoOpCallback;
59 45 import org.thingsboard.server.transport.coap.callback.CoapOkCallback;
  46 +import org.thingsboard.server.transport.coap.callback.GetAttributesSyncSessionCallback;
  47 +import org.thingsboard.server.transport.coap.callback.ToServerRpcSyncSessionCallback;
  48 +import org.thingsboard.server.transport.coap.client.CoapClientContext;
  49 +import org.thingsboard.server.transport.coap.client.TbCoapClientState;
60 50
61 51 import java.util.List;
62   -import java.util.Map;
63 52 import java.util.Optional;
64 53 import java.util.Random;
65   -import java.util.Set;
66 54 import java.util.UUID;
67 55 import java.util.concurrent.ConcurrentHashMap;
68 56 import java.util.concurrent.ConcurrentMap;
69 57 import java.util.concurrent.TimeUnit;
70 58 import java.util.concurrent.atomic.AtomicInteger;
71   -import java.util.stream.Collectors;
72 59
73 60 @Slf4j
74 61 public class CoapTransportResource extends AbstractCoapTransportResource {
... ... @@ -80,13 +67,11 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
80 67 private static final int REQUEST_ID_POSITION_CERTIFICATE_REQUEST = 4;
81 68 private static final String DTLS_SESSION_ID_KEY = "DTLS_SESSION_ID";
82 69
83   - private final ConcurrentMap<String, CoapObserveSessionInfo> tokenToCoapSessionInfoMap = new ConcurrentHashMap<>();
84   - private final ConcurrentMap<CoapObserveSessionInfo, ObserveRelation> sessionInfoToObserveRelationMap = new ConcurrentHashMap<>();
85   - private final Set<UUID> rpcSubscriptions = ConcurrentHashMap.newKeySet();
86   - private final Set<UUID> attributeSubscriptions = ConcurrentHashMap.newKeySet();
  70 + private final ConcurrentMap<TbCoapClientState, ObserveRelation> sessionInfoToObserveRelationMap = new ConcurrentHashMap<>();
87 71
88 72 private final ConcurrentMap<String, TbCoapDtlsSessionInfo> dtlsSessionIdMap;
89 73 private final long timeout;
  74 + private final CoapClientContext clients;
90 75
91 76 public CoapTransportResource(CoapTransportContext ctx, CoapServerService coapServerService, String name) {
92 77 super(ctx, name);
... ... @@ -94,17 +79,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
94 79 this.addObserver(new CoapResourceObserver());
95 80 this.dtlsSessionIdMap = coapServerService.getDtlsSessionsMap();
96 81 this.timeout = coapServerService.getTimeout();
  82 + this.clients = ctx.getClientContext();
97 83 long sessionReportTimeout = ctx.getSessionReportTimeout();
98   - ctx.getScheduler().scheduleAtFixedRate(() -> {
99   - Set<CoapObserveSessionInfo> coapObserveSessionInfos = sessionInfoToObserveRelationMap.keySet();
100   - Set<TransportProtos.SessionInfoProto> observeSessions = coapObserveSessionInfos
101   - .stream()
102   - .map(CoapObserveSessionInfo::getSessionInfoProto)
103   - .collect(Collectors.toSet());
104   - observeSessions.forEach(this::reportActivity);
105   - }, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
  84 + ctx.getScheduler().scheduleAtFixedRate(clients::reportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
106 85 }
107 86
  87 + /*
  88 + * Overwritten method from CoapResource to be able to manage our own observe notification counters.
  89 + */
108 90 @Override
109 91 public void checkObserveRelation(Exchange exchange, Response response) {
110 92 String token = getTokenFromRequest(exchange.getRequest());
... ... @@ -117,20 +99,15 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
117 99 relation.setEstablished();
118 100 addObserveRelation(relation);
119 101 }
120   - AtomicInteger observeNotificationCounter = tokenToCoapSessionInfoMap.get(token).getObserveNotificationCounter();
121   - response.getOptions().setObserve(observeNotificationCounter.getAndIncrement());
  102 + AtomicInteger state = clients.getNotificationCounterByToken(token);
  103 + if (state != null) {
  104 + response.getOptions().setObserve(state.getAndIncrement());
  105 + } else {
  106 + response.getOptions().removeObserve();
  107 + }
122 108 } // ObserveLayer takes care of the else case
123 109 }
124 110
125   - private void clearAndNotifyObserveRelation(ObserveRelation relation, CoAP.ResponseCode code) {
126   - relation.cancel();
127   - relation.getExchange().sendResponse(new Response(code));
128   - }
129   -
130   - private Map<CoapObserveSessionInfo, ObserveRelation> getCoapSessionInfoToObserveRelationMap() {
131   - return sessionInfoToObserveRelationMap;
132   - }
133   -
134 111 @Override
135 112 protected void processHandleGet(CoapExchange exchange) {
136 113 Optional<FeatureType> featureType = getFeatureType(exchange.advanced().getRequest());
... ... @@ -232,7 +209,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
232 209 return dtlsSessionInfo;
233 210 });
234 211 if (tbCoapDtlsSessionInfo != null) {
235   - processRequest(exchange, type, request, tbCoapDtlsSessionInfo.getSessionInfoProto(), tbCoapDtlsSessionInfo.getDeviceProfile());
  212 + processRequest(exchange, type, request, tbCoapDtlsSessionInfo.getMsg(), tbCoapDtlsSessionInfo.getDeviceProfile());
236 213 } else {
237 214 processAccessTokenRequest(exchange, type, request);
238 215 }
... ... @@ -248,129 +225,133 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
248 225 return;
249 226 }
250 227 transportService.process(DeviceTransportType.COAP, TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder().setToken(credentials.get().getCredentialsId()).build(),
251   - new CoapDeviceAuthCallback(transportContext, exchange, (sessionInfo, deviceProfile) -> {
252   - processRequest(exchange, type, request, sessionInfo, deviceProfile);
  228 + new CoapDeviceAuthCallback(exchange, (deviceCredentials, deviceProfile) -> {
  229 + processRequest(exchange, type, request, deviceCredentials, deviceProfile);
253 230 }));
254 231 }
255 232
256   - private void processRequest(CoapExchange exchange, SessionMsgType type, Request request, TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {
257   - UUID sessionId = toSessionId(sessionInfo);
  233 + private void processRequest(CoapExchange exchange, SessionMsgType type, Request request, ValidateDeviceCredentialsResponse deviceCredentials, DeviceProfile deviceProfile) {
  234 + TbCoapClientState clientState = null;
258 235 try {
259   - TransportConfigurationContainer transportConfigurationContainer = getTransportConfigurationContainer(deviceProfile);
260   - CoapTransportAdaptor coapTransportAdaptor = getCoapTransportAdaptor(transportConfigurationContainer.isJsonPayload());
  236 + clientState = clients.getOrCreateClient(type, deviceCredentials, deviceProfile);
  237 + clientState.updateLastUplinkTime();
261 238 switch (type) {
262 239 case POST_ATTRIBUTES_REQUEST:
263   - transportService.process(sessionInfo,
264   - coapTransportAdaptor.convertToPostAttributes(sessionId, request,
265   - transportConfigurationContainer.getAttributesMsgDescriptor()),
266   - new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
267   - reportSubscriptionInfo(sessionInfo, attributeSubscriptions.contains(sessionId), rpcSubscriptions.contains(sessionId));
  240 + handlePostAttributesRequest(clientState, exchange, request);
268 241 break;
269 242 case POST_TELEMETRY_REQUEST:
270   - transportService.process(sessionInfo,
271   - coapTransportAdaptor.convertToPostTelemetry(sessionId, request,
272   - transportConfigurationContainer.getTelemetryMsgDescriptor()),
273   - new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
274   - reportSubscriptionInfo(sessionInfo, attributeSubscriptions.contains(sessionId), rpcSubscriptions.contains(sessionId));
  243 + handlePostTelemetryRequest(clientState, exchange, request);
275 244 break;
276 245 case CLAIM_REQUEST:
277   - transportService.process(sessionInfo,
278   - coapTransportAdaptor.convertToClaimDevice(sessionId, request, sessionInfo),
279   - new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
  246 + handleClaimRequest(clientState, exchange, request);
280 247 break;
281 248 case SUBSCRIBE_ATTRIBUTES_REQUEST:
282   - CoapObserveSessionInfo currentCoapObserveAttrSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request));
283   - if (currentCoapObserveAttrSessionInfo == null) {
284   - attributeSubscriptions.add(sessionId);
285   - registerAsyncCoapSession(exchange, coapTransportAdaptor, transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(),
286   - sessionInfo, getTokenFromRequest(request));
287   - transportService.process(sessionInfo,
288   - TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), new CoapNoOpCallback(exchange));
289   - transportService.process(sessionInfo,
290   - TransportProtos.GetAttributeRequestMsg.newBuilder().setOnlyShared(true).build(),
291   - new CoapNoOpCallback(exchange));
292   - }
  249 + handleAttributeSubscribeRequest(clientState, exchange, request);
293 250 break;
294 251 case UNSUBSCRIBE_ATTRIBUTES_REQUEST:
295   - CoapObserveSessionInfo coapObserveAttrSessionInfo = lookupAsyncSessionInfo(getTokenFromRequest(request));
296   - if (coapObserveAttrSessionInfo != null) {
297   - TransportProtos.SessionInfoProto attrSession = coapObserveAttrSessionInfo.getSessionInfoProto();
298   - UUID attrSessionId = toSessionId(attrSession);
299   - attributeSubscriptions.remove(attrSessionId);
300   - transportService.process(attrSession,
301   - TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(),
302   - new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
303   - }
304   - closeAndDeregister(sessionInfo);
  252 + handleAttributeUnsubscribeRequest(clientState, exchange, request);
305 253 break;
306 254 case SUBSCRIBE_RPC_COMMANDS_REQUEST:
307   - CoapObserveSessionInfo currentCoapObserveRpcSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request));
308   - if (currentCoapObserveRpcSessionInfo == null) {
309   - rpcSubscriptions.add(sessionId);
310   - registerAsyncCoapSession(exchange, coapTransportAdaptor, transportConfigurationContainer.getRpcRequestDynamicMessageBuilder()
311   - , sessionInfo, getTokenFromRequest(request));
312   - transportService.process(sessionInfo,
313   - TransportProtos.SubscribeToRPCMsg.getDefaultInstance(),
314   - new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)
315   - );
316   - }
  255 + handleRpcSubscribeRequest(clientState, exchange, request);
317 256 break;
318 257 case UNSUBSCRIBE_RPC_COMMANDS_REQUEST:
319   - CoapObserveSessionInfo coapObserveRpcSessionInfo = lookupAsyncSessionInfo(getTokenFromRequest(request));
320   - if (coapObserveRpcSessionInfo != null) {
321   - TransportProtos.SessionInfoProto rpcSession = coapObserveRpcSessionInfo.getSessionInfoProto();
322   - UUID rpcSessionId = toSessionId(rpcSession);
323   - rpcSubscriptions.remove(rpcSessionId);
324   - transportService.process(rpcSession,
325   - TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(),
326   - new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
327   - }
328   - closeAndDeregister(sessionInfo);
  258 + handleRpcUnsubscribeRequest(clientState, exchange, request);
329 259 break;
330 260 case TO_DEVICE_RPC_RESPONSE:
331   - transportService.process(sessionInfo,
332   - coapTransportAdaptor.convertToDeviceRpcResponse(sessionId, request, transportConfigurationContainer.getRpcResponseMsgDescriptor()),
333   - new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
  261 + handleToDeviceRpcResponse(clientState, exchange, request);
334 262 break;
335 263 case TO_SERVER_RPC_REQUEST:
336   - transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor,
337   - transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), sessionInfo), timeout);
338   - transportService.process(sessionInfo,
339   - coapTransportAdaptor.convertToServerRpcRequest(sessionId, request),
340   - new CoapNoOpCallback(exchange));
  264 + handleToServerRpcRequest(clientState, exchange, request);
341 265 break;
342 266 case GET_ATTRIBUTES_REQUEST:
343   - transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor,
344   - transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), sessionInfo), timeout);
345   - transportService.process(sessionInfo,
346   - coapTransportAdaptor.convertToGetAttributes(sessionId, request),
347   - new CoapNoOpCallback(exchange));
  267 + handleGetAttributesRequest(clientState, exchange, request);
348 268 break;
349 269 }
350 270 } catch (AdaptorException e) {
351   - log.trace("[{}] Failed to decode message: ", sessionId, e);
  271 + if (clientState != null) {
  272 + log.trace("[{}] Failed to decode message: ", clientState.getDeviceId(), e);
  273 + }
352 274 exchange.respond(CoAP.ResponseCode.BAD_REQUEST);
353 275 }
354 276 }
355 277
356   - private UUID toSessionId(TransportProtos.SessionInfoProto sessionInfoProto) {
357   - return new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB());
  278 + private void handlePostAttributesRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) throws AdaptorException {
  279 + TransportProtos.SessionInfoProto sessionInfo = clients.getNewSyncSession(clientState);
  280 + UUID sessionId = toSessionId(sessionInfo);
  281 + transportService.process(sessionInfo, clientState.getAdaptor().convertToPostAttributes(sessionId, request,
  282 + clientState.getConfiguration().getAttributesMsgDescriptor()),
  283 + new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
  284 + }
  285 +
  286 + private void handlePostTelemetryRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) throws AdaptorException {
  287 + TransportProtos.SessionInfoProto sessionInfo = clients.getNewSyncSession(clientState);
  288 + UUID sessionId = toSessionId(sessionInfo);
  289 + transportService.process(sessionInfo, clientState.getAdaptor().convertToPostTelemetry(sessionId, request,
  290 + clientState.getConfiguration().getTelemetryMsgDescriptor()),
  291 + new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
  292 + }
  293 +
  294 + private void handleClaimRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) throws AdaptorException {
  295 + TransportProtos.SessionInfoProto sessionInfo = clients.getNewSyncSession(clientState);
  296 + UUID sessionId = toSessionId(sessionInfo);
  297 + transportService.process(sessionInfo,
  298 + clientState.getAdaptor().convertToClaimDevice(sessionId, request, sessionInfo),
  299 + new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
  300 + }
  301 +
  302 + private void handleAttributeSubscribeRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) {
  303 + String attrSubToken = getTokenFromRequest(request);
  304 + if (!clients.registerAttributeObservation(clientState, attrSubToken, exchange)) {
  305 + log.warn("[{}] Received duplicate attribute subscribe request for token: {}", clientState.getDeviceId(), attrSubToken);
  306 + }
  307 + }
  308 +
  309 + private void handleAttributeUnsubscribeRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) {
  310 + clients.deregisterAttributeObservation(clientState, getTokenFromRequest(request), exchange);
358 311 }
359 312
360   - private CoapObserveSessionInfo lookupAsyncSessionInfo(String token) {
361   - return tokenToCoapSessionInfoMap.remove(token);
  313 + private void handleRpcUnsubscribeRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) {
  314 + clients.deregisterRpcObservation(clientState, getTokenFromRequest(request), exchange);
362 315 }
363 316
364   - private void registerAsyncCoapSession(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor,
365   - DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo, String token) {
366   - tokenToCoapSessionInfoMap.putIfAbsent(token, new CoapObserveSessionInfo(sessionInfo));
367   - transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo));
368   - transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
  317 + private void handleToDeviceRpcResponse(TbCoapClientState clientState, CoapExchange exchange, Request request) throws AdaptorException {
  318 + TransportProtos.SessionInfoProto session = clientState.getSession();
  319 + if (session == null) {
  320 + session = clients.getNewSyncSession(clientState);
  321 + }
  322 + UUID sessionId = toSessionId(session);
  323 + transportService.process(session,
  324 + clientState.getAdaptor().convertToDeviceRpcResponse(sessionId, request, clientState.getConfiguration().getRpcResponseMsgDescriptor()),
  325 + new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
369 326 }
370 327
371   - private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor,
372   - DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) {
373   - return new CoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo);
  328 + private void handleRpcSubscribeRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) {
  329 + String rpcSubToken = getTokenFromRequest(request);
  330 + if (!clients.registerRpcObservation(clientState, rpcSubToken, exchange)) {
  331 + log.warn("[{}] Received duplicate rpc subscribe request.", rpcSubToken);
  332 + }
  333 + }
  334 +
  335 + private void handleGetAttributesRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) throws AdaptorException {
  336 + TransportProtos.SessionInfoProto sessionInfo = clients.getNewSyncSession(clientState);
  337 + UUID sessionId = toSessionId(sessionInfo);
  338 + transportService.registerSyncSession(sessionInfo, new GetAttributesSyncSessionCallback(clientState, exchange, request), timeout);
  339 + transportService.process(sessionInfo,
  340 + clientState.getAdaptor().convertToGetAttributes(sessionId, request),
  341 + new CoapNoOpCallback(exchange));
  342 + }
  343 +
  344 + private void handleToServerRpcRequest(TbCoapClientState clientState, CoapExchange exchange, Request request) throws AdaptorException {
  345 + TransportProtos.SessionInfoProto sessionInfo = clients.getNewSyncSession(clientState);
  346 + UUID sessionId = toSessionId(sessionInfo);
  347 + transportService.registerSyncSession(sessionInfo, new ToServerRpcSyncSessionCallback(clientState, exchange, request), timeout);
  348 + transportService.process(sessionInfo,
  349 + clientState.getAdaptor().convertToServerRpcRequest(sessionId, request),
  350 + new CoapNoOpCallback(exchange));
  351 + }
  352 +
  353 + private UUID toSessionId(TransportProtos.SessionInfoProto sessionInfoProto) {
  354 + return new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB());
374 355 }
375 356
376 357 private String getTokenFromRequest(Request request) {
... ... @@ -452,119 +433,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
452 433 }
453 434 }
454 435
455   - @RequiredArgsConstructor
456   - private class CoapSessionListener implements SessionMsgListener {
457   -
458   - private final CoapExchange exchange;
459   - private final CoapTransportAdaptor coapTransportAdaptor;
460   - private final DynamicMessage.Builder rpcRequestDynamicMessageBuilder;
461   - private final TransportProtos.SessionInfoProto sessionInfo;
462   -
463   - @Override
464   - public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg msg) {
465   - try {
466   - exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg));
467   - } catch (AdaptorException e) {
468   - log.trace("Failed to reply due to error", e);
469   - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
470   - }
471   - }
472   -
473   - @Override
474   - public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg msg) {
475   - log.trace("[{}] Received attributes update notification to device", sessionId);
476   - try {
477   - exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg));
478   - } catch (AdaptorException e) {
479   - log.trace("Failed to reply due to error", e);
480   - closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
481   - closeAndDeregister();
482   - }
483   - }
484   -
485   - @Override
486   - public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
487   - log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
488   - closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.SERVICE_UNAVAILABLE);
489   - closeAndDeregister();
490   - }
491   -
492   - @Override
493   - public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) {
494   - log.trace("[{}] Received RPC command to device", sessionId);
495   - boolean sent = false;
496   - try {
497   - Response response = coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder);
498   - int requestId = getNextMsgId();
499   - response.setMID(requestId);
500   -
501   - if (msg.getPersisted() && isConRequest()) {
502   - transportContext.getRpcAwaitingAck().put(requestId, msg);
503   - transportContext.getScheduler().schedule(() -> {
504   - TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = transportContext.getRpcAwaitingAck().remove(requestId);
505   - if (awaitingAckMsg != null) {
506   - transportService.process(sessionInfo, msg, true, TransportServiceCallback.EMPTY);
507   - }
508   - }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
509   - response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> {
510   - TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
511   - if (rpcRequestMsg != null) {
512   - transportService.process(sessionInfo, rpcRequestMsg, false, TransportServiceCallback.EMPTY);
513   - }
514   - }));
515   - }
516   - exchange.respond(response);
517   - sent = true;
518   - } catch (AdaptorException e) {
519   - log.trace("Failed to reply due to error", e);
520   - closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
521   - closeAndDeregister();
522   - } finally {
523   - if (msg.getPersisted() && !isConRequest()) {
524   - transportService.process(sessionInfo, msg, sent, TransportServiceCallback.EMPTY);
525   - }
526   - }
527   - }
528   -
529   - @Override
530   - public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg msg) {
531   - try {
532   - exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg));
533   - } catch (AdaptorException e) {
534   - log.trace("Failed to reply due to error", e);
535   - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
536   - }
537   - }
538   -
539   - private boolean isConRequest() {
540   - return exchange.advanced().getRequest().isConfirmable();
541   - }
542   -
543   - private void closeObserveRelationAndNotify(UUID sessionId, CoAP.ResponseCode responseCode) {
544   - Map<CoapObserveSessionInfo, ObserveRelation> sessionToObserveRelationMap = CoapTransportResource.this.getCoapSessionInfoToObserveRelationMap();
545   - if (CoapTransportResource.this.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) {
546   - Optional<CoapObserveSessionInfo> observeSessionToClose = sessionToObserveRelationMap.keySet().stream().filter(coapObserveSessionInfo -> {
547   - TransportProtos.SessionInfoProto sessionToDelete = coapObserveSessionInfo.getSessionInfoProto();
548   - UUID observeSessionId = new UUID(sessionToDelete.getSessionIdMSB(), sessionToDelete.getSessionIdLSB());
549   - return observeSessionId.equals(sessionId);
550   - }).findFirst();
551   - if (observeSessionToClose.isPresent()) {
552   - CoapObserveSessionInfo coapObserveSessionInfo = observeSessionToClose.get();
553   - ObserveRelation observeRelation = sessionToObserveRelationMap.get(coapObserveSessionInfo);
554   - CoapTransportResource.this.clearAndNotifyObserveRelation(observeRelation, responseCode);
555   - }
556   - }
557   - }
558   -
559   - private void closeAndDeregister() {
560   - Request request = exchange.advanced().getRequest();
561   - String token = CoapTransportResource.this.getTokenFromRequest(request);
562   - CoapObserveSessionInfo deleted = CoapTransportResource.this.lookupAsyncSessionInfo(token);
563   - CoapTransportResource.this.closeAndDeregister(deleted.getSessionInfoProto());
564   - }
565   -
566   - }
567   -
568 436 public class CoapResourceObserver implements ResourceObserver {
569 437
570 438 @Override
... ... @@ -587,7 +455,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
587 455 public void addedObserveRelation(ObserveRelation relation) {
588 456 Request request = relation.getExchange().getRequest();
589 457 String token = getTokenFromRequest(request);
590   - sessionInfoToObserveRelationMap.putIfAbsent(tokenToCoapSessionInfoMap.get(token), relation);
  458 + clients.registerObserveRelation(token, relation);
591 459 log.trace("Added Observe relation for token: {}", token);
592 460 }
593 461
... ... @@ -595,93 +463,10 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
595 463 public void removedObserveRelation(ObserveRelation relation) {
596 464 Request request = relation.getExchange().getRequest();
597 465 String token = getTokenFromRequest(request);
598   - sessionInfoToObserveRelationMap.remove(tokenToCoapSessionInfoMap.get(token));
  466 + clients.deregisterObserveRelation(token);
599 467 log.trace("Relation removed for token: {}", token);
600 468 }
601 469 }
602 470
603   - private void closeAndDeregister(TransportProtos.SessionInfoProto session) {
604   - UUID sessionId = toSessionId(session);
605   - transportService.process(session, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
606   - transportService.deregisterSession(session);
607   - rpcSubscriptions.remove(sessionId);
608   - attributeSubscriptions.remove(sessionId);
609   - }
610   -
611   - private TransportConfigurationContainer getTransportConfigurationContainer(DeviceProfile deviceProfile) throws AdaptorException {
612   - DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
613   - if (transportConfiguration instanceof DefaultDeviceProfileTransportConfiguration) {
614   - return new TransportConfigurationContainer(true);
615   - } else if (transportConfiguration instanceof CoapDeviceProfileTransportConfiguration) {
616   - CoapDeviceProfileTransportConfiguration coapDeviceProfileTransportConfiguration =
617   - (CoapDeviceProfileTransportConfiguration) transportConfiguration;
618   - CoapDeviceTypeConfiguration coapDeviceTypeConfiguration =
619   - coapDeviceProfileTransportConfiguration.getCoapDeviceTypeConfiguration();
620   - if (coapDeviceTypeConfiguration instanceof DefaultCoapDeviceTypeConfiguration) {
621   - DefaultCoapDeviceTypeConfiguration defaultCoapDeviceTypeConfiguration =
622   - (DefaultCoapDeviceTypeConfiguration) coapDeviceTypeConfiguration;
623   - TransportPayloadTypeConfiguration transportPayloadTypeConfiguration =
624   - defaultCoapDeviceTypeConfiguration.getTransportPayloadTypeConfiguration();
625   - if (transportPayloadTypeConfiguration instanceof JsonTransportPayloadConfiguration) {
626   - return new TransportConfigurationContainer(true);
627   - } else {
628   - ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration =
629   - (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
630   - String deviceTelemetryProtoSchema = protoTransportPayloadConfiguration.getDeviceTelemetryProtoSchema();
631   - String deviceAttributesProtoSchema = protoTransportPayloadConfiguration.getDeviceAttributesProtoSchema();
632   - String deviceRpcRequestProtoSchema = protoTransportPayloadConfiguration.getDeviceRpcRequestProtoSchema();
633   - String deviceRpcResponseProtoSchema = protoTransportPayloadConfiguration.getDeviceRpcResponseProtoSchema();
634   - return new TransportConfigurationContainer(false,
635   - protoTransportPayloadConfiguration.getTelemetryDynamicMessageDescriptor(deviceTelemetryProtoSchema),
636   - protoTransportPayloadConfiguration.getAttributesDynamicMessageDescriptor(deviceAttributesProtoSchema),
637   - protoTransportPayloadConfiguration.getRpcResponseDynamicMessageDescriptor(deviceRpcResponseProtoSchema),
638   - protoTransportPayloadConfiguration.getRpcRequestDynamicMessageBuilder(deviceRpcRequestProtoSchema)
639   - );
640   - }
641   - } else {
642   - throw new AdaptorException("Invalid CoapDeviceTypeConfiguration type: " + coapDeviceTypeConfiguration.getClass().getSimpleName() + "!");
643   - }
644   - } else {
645   - throw new AdaptorException("Invalid DeviceProfileTransportConfiguration type" + transportConfiguration.getClass().getSimpleName() + "!");
646   - }
647   - }
648   -
649   - private CoapTransportAdaptor getCoapTransportAdaptor(boolean jsonPayloadType) {
650   - return jsonPayloadType ? transportContext.getJsonCoapAdaptor() : transportContext.getProtoCoapAdaptor();
651   - }
652   -
653   - @Data
654   - private static class TransportConfigurationContainer {
655   -
656   - private boolean jsonPayload;
657   - private Descriptors.Descriptor telemetryMsgDescriptor;
658   - private Descriptors.Descriptor attributesMsgDescriptor;
659   - private Descriptors.Descriptor rpcResponseMsgDescriptor;
660   - private DynamicMessage.Builder rpcRequestDynamicMessageBuilder;
661   -
662   - public TransportConfigurationContainer(boolean jsonPayload, Descriptors.Descriptor telemetryMsgDescriptor, Descriptors.Descriptor attributesMsgDescriptor, Descriptors.Descriptor rpcResponseMsgDescriptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) {
663   - this.jsonPayload = jsonPayload;
664   - this.telemetryMsgDescriptor = telemetryMsgDescriptor;
665   - this.attributesMsgDescriptor = attributesMsgDescriptor;
666   - this.rpcResponseMsgDescriptor = rpcResponseMsgDescriptor;
667   - this.rpcRequestDynamicMessageBuilder = rpcRequestDynamicMessageBuilder;
668   - }
669   -
670   - public TransportConfigurationContainer(boolean jsonPayload) {
671   - this.jsonPayload = jsonPayload;
672   - }
673   - }
674   -
675   - @Data
676   - private static class CoapObserveSessionInfo {
677   -
678   - private final TransportProtos.SessionInfoProto sessionInfoProto;
679   - private final AtomicInteger observeNotificationCounter;
680   -
681   - private CoapObserveSessionInfo(TransportProtos.SessionInfoProto sessionInfoProto) {
682   - this.sessionInfoProto = sessionInfoProto;
683   - this.observeNotificationCounter = new AtomicInteger(0);
684   - }
685   - }
686 471
687 472 }
... ...
... ... @@ -24,9 +24,13 @@ import org.eclipse.californium.core.server.resources.CoapExchange;
24 24 import org.eclipse.californium.core.server.resources.Resource;
25 25 import org.thingsboard.server.common.data.DeviceTransportType;
26 26 import org.thingsboard.server.common.data.StringUtils;
  27 +import org.thingsboard.server.common.data.id.DeviceId;
  28 +import org.thingsboard.server.common.data.id.TenantId;
27 29 import org.thingsboard.server.common.data.ota.OtaPackageType;
28 30 import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
29 31 import org.thingsboard.server.common.transport.TransportServiceCallback;
  32 +import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
  33 +import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
30 34 import org.thingsboard.server.gen.transport.TransportProtos;
31 35 import org.thingsboard.server.transport.coap.callback.CoapDeviceAuthCallback;
32 36
... ... @@ -68,19 +72,21 @@ public class OtaPackageTransportResource extends AbstractCoapTransportResource {
68 72 return;
69 73 }
70 74 transportService.process(DeviceTransportType.COAP, TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder().setToken(credentials.get().getCredentialsId()).build(),
71   - new CoapDeviceAuthCallback(transportContext, exchange, (sessionInfo, deviceProfile) -> {
72   - getOtaPackageCallback(sessionInfo, exchange, otaPackageType);
  75 + new CoapDeviceAuthCallback(exchange, (msg, deviceProfile) -> {
  76 + getOtaPackageCallback(msg, exchange, otaPackageType);
73 77 }));
74 78 }
75 79
76   - private void getOtaPackageCallback(TransportProtos.SessionInfoProto sessionInfo, CoapExchange exchange, OtaPackageType firmwareType) {
  80 + private void getOtaPackageCallback(ValidateDeviceCredentialsResponse msg, CoapExchange exchange, OtaPackageType firmwareType) {
  81 + TenantId tenantId = msg.getDeviceInfo().getTenantId();
  82 + DeviceId deviceId = msg.getDeviceInfo().getDeviceId();
77 83 TransportProtos.GetOtaPackageRequestMsg requestMsg = TransportProtos.GetOtaPackageRequestMsg.newBuilder()
78   - .setTenantIdMSB(sessionInfo.getTenantIdMSB())
79   - .setTenantIdLSB(sessionInfo.getTenantIdLSB())
80   - .setDeviceIdMSB(sessionInfo.getDeviceIdMSB())
81   - .setDeviceIdLSB(sessionInfo.getDeviceIdLSB())
  84 + .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
  85 + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
  86 + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
  87 + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
82 88 .setType(firmwareType.name()).build();
83   - transportContext.getTransportService().process(sessionInfo, requestMsg, new OtaPackageCallback(exchange));
  89 + transportContext.getTransportService().process(SessionInfoCreator.create(msg, transportContext, UUID.randomUUID()), requestMsg, new OtaPackageCallback(exchange));
84 90 }
85 91
86 92 private Optional<DeviceTokenCredentials> decodeCredentials(Request request) {
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.coap;
  17 +
  18 +import com.google.protobuf.Descriptors;
  19 +import com.google.protobuf.DynamicMessage;
  20 +import lombok.Data;
  21 +
  22 +@Data
  23 +public class TransportConfigurationContainer {
  24 +
  25 + private boolean jsonPayload;
  26 + private Descriptors.Descriptor telemetryMsgDescriptor;
  27 + private Descriptors.Descriptor attributesMsgDescriptor;
  28 + private Descriptors.Descriptor rpcResponseMsgDescriptor;
  29 + private DynamicMessage.Builder rpcRequestDynamicMessageBuilder;
  30 +
  31 + public TransportConfigurationContainer(boolean jsonPayload, Descriptors.Descriptor telemetryMsgDescriptor, Descriptors.Descriptor attributesMsgDescriptor, Descriptors.Descriptor rpcResponseMsgDescriptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) {
  32 + this.jsonPayload = jsonPayload;
  33 + this.telemetryMsgDescriptor = telemetryMsgDescriptor;
  34 + this.attributesMsgDescriptor = attributesMsgDescriptor;
  35 + this.rpcResponseMsgDescriptor = rpcResponseMsgDescriptor;
  36 + this.rpcRequestDynamicMessageBuilder = rpcRequestDynamicMessageBuilder;
  37 + }
  38 +
  39 + public TransportConfigurationContainer(boolean jsonPayload) {
  40 + this.jsonPayload = jsonPayload;
  41 + }
  42 +}
... ...
... ... @@ -107,7 +107,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
107 107 Response response = new Response(CoAP.ResponseCode.CONTENT);
108 108 JsonElement result = JsonConverter.toJson(msg);
109 109 response.setPayload(result.toString());
110   - response.setAcknowledged(isConfirmable);
  110 + response.setConfirmable(isConfirmable);
111 111 return response;
112 112 }
113 113
... ... @@ -125,8 +125,8 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
125 125 public Response convertToPublish(boolean isConfirmable, TransportProtos.GetAttributeResponseMsg msg) throws AdaptorException {
126 126 if (msg.getSharedStateMsg()) {
127 127 if (StringUtils.isEmpty(msg.getError())) {
128   - Response response = new Response(CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE);
129   - response.setAcknowledged(isConfirmable);
  128 + Response response = new Response(CoAP.ResponseCode.CONTENT);
  129 + response.setConfirmable(isConfirmable);
130 130 TransportProtos.AttributeUpdateNotificationMsg notificationMsg = TransportProtos.AttributeUpdateNotificationMsg.newBuilder().addAllSharedUpdated(msg.getSharedAttributeListList()).build();
131 131 JsonObject result = JsonConverter.toJson(notificationMsg);
132 132 response.setPayload(result.toString());
... ... @@ -139,7 +139,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
139 139 return new Response(CoAP.ResponseCode.NOT_FOUND);
140 140 } else {
141 141 Response response = new Response(CoAP.ResponseCode.CONTENT);
142   - response.setAcknowledged(isConfirmable);
  142 + response.setConfirmable(isConfirmable);
143 143 JsonObject result = JsonConverter.toJson(msg);
144 144 response.setPayload(result.toString());
145 145 return response;
... ... @@ -148,7 +148,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
148 148 }
149 149
150 150 private Response getObserveNotification(boolean confirmable, JsonElement json) {
151   - Response response = new Response(CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE);
  151 + Response response = new Response(CoAP.ResponseCode.CONTENT);
152 152 response.setPayload(json.toString());
153 153 response.setConfirmable(confirmable);
154 154 return response;
... ...
... ... @@ -130,8 +130,8 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
130 130 public Response convertToPublish(boolean isConfirmable, TransportProtos.GetAttributeResponseMsg msg) throws AdaptorException {
131 131 if (msg.getSharedStateMsg()) {
132 132 if (StringUtils.isEmpty(msg.getError())) {
133   - Response response = new Response(CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE);
134   - response.setAcknowledged(isConfirmable);
  133 + Response response = new Response(CoAP.ResponseCode.CONTENT);
  134 + response.setConfirmable(isConfirmable);
135 135 TransportProtos.AttributeUpdateNotificationMsg notificationMsg = TransportProtos.AttributeUpdateNotificationMsg.newBuilder().addAllSharedUpdated(msg.getSharedAttributeListList()).build();
136 136 response.setPayload(notificationMsg.toByteArray());
137 137 return response;
... ... @@ -143,7 +143,7 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
143 143 return new Response(CoAP.ResponseCode.NOT_FOUND);
144 144 } else {
145 145 Response response = new Response(CoAP.ResponseCode.CONTENT);
146   - response.setAcknowledged(isConfirmable);
  146 + response.setConfirmable(isConfirmable);
147 147 response.setPayload(msg.toByteArray());
148 148 return response;
149 149 }
... ... @@ -151,9 +151,9 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
151 151 }
152 152
153 153 private Response getObserveNotification(boolean confirmable, byte[] notification) {
154   - Response response = new Response(CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE);
  154 + Response response = new Response(CoAP.ResponseCode.CONTENT);
155 155 response.setPayload(notification);
156   - response.setAcknowledged(confirmable);
  156 + response.setConfirmable(confirmable);
157 157 return response;
158 158 }
159 159
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.coap.callback;
  17 +
  18 +import lombok.RequiredArgsConstructor;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.eclipse.californium.core.coap.Request;
  21 +import org.eclipse.californium.core.server.resources.CoapExchange;
  22 +import org.thingsboard.server.common.transport.SessionMsgListener;
  23 +import org.thingsboard.server.gen.transport.TransportProtos;
  24 +import org.thingsboard.server.transport.coap.client.TbCoapClientState;
  25 +import org.thingsboard.server.transport.coap.client.TbCoapObservationState;
  26 +
  27 +import java.util.UUID;
  28 +
  29 +@RequiredArgsConstructor
  30 +@Slf4j
  31 +public abstract class AbstractSyncSessionCallback implements SessionMsgListener {
  32 +
  33 + protected final TbCoapClientState state;
  34 + protected final CoapExchange exchange;
  35 + protected final Request request;
  36 +
  37 + @Override
  38 + public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg getAttributesResponse) {
  39 + logUnsupportedCommandMessage(getAttributesResponse);
  40 + }
  41 +
  42 + @Override
  43 + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg attributeUpdateNotification) {
  44 + logUnsupportedCommandMessage(attributeUpdateNotification);
  45 + }
  46 +
  47 + @Override
  48 + public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
  49 +
  50 + }
  51 +
  52 + @Override
  53 + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg toDeviceRequest) {
  54 + logUnsupportedCommandMessage(toDeviceRequest);
  55 + }
  56 +
  57 + @Override
  58 + public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) {
  59 + logUnsupportedCommandMessage(toServerResponse);
  60 + }
  61 +
  62 + private void logUnsupportedCommandMessage(Object update) {
  63 + log.trace("[{}] Ignore unsupported update: {}", state.getDeviceId(), update);
  64 + }
  65 +
  66 + public static boolean isConRequest(TbCoapObservationState state) {
  67 + if (state != null) {
  68 + return state.getExchange().advanced().getRequest().isConfirmable();
  69 + } else {
  70 + return false;
  71 + }
  72 + }
  73 +
  74 +}
... ...
... ... @@ -31,12 +31,10 @@ import java.util.function.BiConsumer;
31 31
32 32 @Slf4j
33 33 public class CoapDeviceAuthCallback implements TransportServiceCallback<ValidateDeviceCredentialsResponse> {
34   - private final TransportContext transportContext;
35 34 private final CoapExchange exchange;
36   - private final BiConsumer<TransportProtos.SessionInfoProto, DeviceProfile> onSuccess;
  35 + private final BiConsumer<ValidateDeviceCredentialsResponse, DeviceProfile> onSuccess;
37 36
38   - public CoapDeviceAuthCallback(TransportContext transportContext, CoapExchange exchange, BiConsumer<TransportProtos.SessionInfoProto, DeviceProfile> onSuccess) {
39   - this.transportContext = transportContext;
  37 + public CoapDeviceAuthCallback(CoapExchange exchange, BiConsumer<ValidateDeviceCredentialsResponse, DeviceProfile> onSuccess) {
40 38 this.exchange = exchange;
41 39 this.onSuccess = onSuccess;
42 40 }
... ... @@ -45,8 +43,7 @@ public class CoapDeviceAuthCallback implements TransportServiceCallback<Validate
45 43 public void onSuccess(ValidateDeviceCredentialsResponse msg) {
46 44 DeviceProfile deviceProfile = msg.getDeviceProfile();
47 45 if (msg.hasDeviceInfo() && deviceProfile != null) {
48   - TransportProtos.SessionInfoProto sessionInfoProto = SessionInfoCreator.create(msg, transportContext, UUID.randomUUID());
49   - onSuccess.accept(sessionInfoProto, deviceProfile);
  46 + onSuccess.accept(msg, deviceProfile);
50 47 } else {
51 48 exchange.respond(CoAP.ResponseCode.UNAUTHORIZED);
52 49 }
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.coap.callback;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.eclipse.californium.core.coap.CoAP;
  20 +import org.eclipse.californium.core.coap.Request;
  21 +import org.eclipse.californium.core.coap.Response;
  22 +import org.eclipse.californium.core.server.resources.CoapExchange;
  23 +import org.thingsboard.server.common.transport.adaptor.AdaptorException;
  24 +import org.thingsboard.server.gen.transport.TransportProtos;
  25 +import org.thingsboard.server.transport.coap.client.TbCoapClientState;
  26 +import org.thingsboard.server.transport.coap.client.TbCoapObservationState;
  27 +
  28 +@Slf4j
  29 +public class GetAttributesSyncSessionCallback extends AbstractSyncSessionCallback {
  30 +
  31 + public GetAttributesSyncSessionCallback(TbCoapClientState state, CoapExchange exchange, Request request) {
  32 + super(state, exchange, request);
  33 + }
  34 +
  35 + @Override
  36 + public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg msg) {
  37 + try {
  38 + exchange.respond(state.getAdaptor().convertToPublish(AbstractSyncSessionCallback.isConRequest(state.getAttrs()), msg));
  39 + } catch (AdaptorException e) {
  40 + log.trace("[{}] Failed to reply due to error", state.getDeviceId(), e);
  41 + exchange.respond(new Response(CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
  42 + }
  43 + }
  44 +
  45 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.coap.callback;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.eclipse.californium.core.coap.CoAP;
  20 +import org.eclipse.californium.core.coap.Request;
  21 +import org.eclipse.californium.core.server.resources.CoapExchange;
  22 +import org.thingsboard.server.common.transport.adaptor.AdaptorException;
  23 +import org.thingsboard.server.gen.transport.TransportProtos;
  24 +import org.thingsboard.server.transport.coap.client.TbCoapClientState;
  25 +
  26 +import java.util.UUID;
  27 +
  28 +@Slf4j
  29 +public class ToServerRpcSyncSessionCallback extends AbstractSyncSessionCallback {
  30 +
  31 + public ToServerRpcSyncSessionCallback(TbCoapClientState state, CoapExchange exchange, Request request) {
  32 + super(state, exchange, request);
  33 + }
  34 +
  35 + @Override
  36 + public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) {
  37 + try {
  38 + exchange.respond(state.getAdaptor().convertToPublish(isConRequest(state.getRpc()), toServerResponse));
  39 + } catch (AdaptorException e) {
  40 + log.trace("Failed to reply due to error", e);
  41 + exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
  42 + }
  43 + }
  44 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.coap.client;
  17 +
  18 +import org.eclipse.californium.core.observe.ObserveRelation;
  19 +import org.eclipse.californium.core.server.resources.CoapExchange;
  20 +import org.thingsboard.server.common.data.DeviceProfile;
  21 +import org.thingsboard.server.common.msg.session.SessionMsgType;
  22 +import org.thingsboard.server.common.transport.adaptor.AdaptorException;
  23 +import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
  24 +import org.thingsboard.server.gen.transport.TransportProtos;
  25 +
  26 +import java.util.concurrent.atomic.AtomicInteger;
  27 +
  28 +public interface CoapClientContext {
  29 +
  30 + boolean registerAttributeObservation(TbCoapClientState clientState, String token, CoapExchange exchange);
  31 +
  32 + boolean registerRpcObservation(TbCoapClientState clientState, String token, CoapExchange exchange);
  33 +
  34 + void onUplink(TransportProtos.SessionInfoProto sessionInfo);
  35 +
  36 + AtomicInteger getNotificationCounterByToken(String token);
  37 +
  38 + TbCoapClientState getOrCreateClient(SessionMsgType type, ValidateDeviceCredentialsResponse deviceCredentials, DeviceProfile deviceProfile) throws AdaptorException;
  39 +
  40 + TransportProtos.SessionInfoProto getNewSyncSession(TbCoapClientState clientState);
  41 +
  42 + void deregisterAttributeObservation(TbCoapClientState clientState, String token, CoapExchange exchange);
  43 +
  44 + void deregisterRpcObservation(TbCoapClientState clientState, String token, CoapExchange exchange);
  45 +
  46 + void reportActivity();
  47 +
  48 + void registerObserveRelation(String token, ObserveRelation relation);
  49 +
  50 + void deregisterObserveRelation(String token);
  51 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.coap.client;
  17 +
  18 +import lombok.RequiredArgsConstructor;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.eclipse.californium.core.coap.CoAP;
  21 +import org.eclipse.californium.core.coap.Response;
  22 +import org.eclipse.californium.core.observe.ObserveRelation;
  23 +import org.eclipse.californium.core.server.resources.CoapExchange;
  24 +import org.springframework.stereotype.Service;
  25 +import org.thingsboard.server.coapserver.TbCoapServerComponent;
  26 +import org.thingsboard.server.common.data.DeviceProfile;
  27 +import org.thingsboard.server.common.data.device.profile.CoapDeviceProfileTransportConfiguration;
  28 +import org.thingsboard.server.common.data.device.profile.CoapDeviceTypeConfiguration;
  29 +import org.thingsboard.server.common.data.device.profile.DefaultCoapDeviceTypeConfiguration;
  30 +import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileTransportConfiguration;
  31 +import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
  32 +import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadConfiguration;
  33 +import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
  34 +import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
  35 +import org.thingsboard.server.common.data.id.DeviceId;
  36 +import org.thingsboard.server.common.msg.session.FeatureType;
  37 +import org.thingsboard.server.common.msg.session.SessionMsgType;
  38 +import org.thingsboard.server.common.transport.SessionMsgListener;
  39 +import org.thingsboard.server.common.transport.TransportService;
  40 +import org.thingsboard.server.common.transport.TransportServiceCallback;
  41 +import org.thingsboard.server.common.transport.adaptor.AdaptorException;
  42 +import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
  43 +import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
  44 +import org.thingsboard.server.gen.transport.TransportProtos;
  45 +import org.thingsboard.server.transport.coap.CoapTransportContext;
  46 +import org.thingsboard.server.transport.coap.TbCoapMessageObserver;
  47 +import org.thingsboard.server.transport.coap.TransportConfigurationContainer;
  48 +import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
  49 +import org.thingsboard.server.transport.coap.callback.AbstractSyncSessionCallback;
  50 +import org.thingsboard.server.transport.coap.callback.CoapNoOpCallback;
  51 +import org.thingsboard.server.transport.coap.callback.CoapOkCallback;
  52 +
  53 +import java.util.UUID;
  54 +import java.util.concurrent.ConcurrentHashMap;
  55 +import java.util.concurrent.ConcurrentMap;
  56 +import java.util.concurrent.ThreadLocalRandom;
  57 +import java.util.concurrent.TimeUnit;
  58 +import java.util.concurrent.atomic.AtomicInteger;
  59 +
  60 +import static org.eclipse.californium.core.coap.Message.MAX_MID;
  61 +import static org.eclipse.californium.core.coap.Message.NONE;
  62 +
  63 +@Slf4j
  64 +@Service
  65 +@RequiredArgsConstructor
  66 +@TbCoapServerComponent
  67 +public class DefaultCoapClientContext implements CoapClientContext {
  68 +
  69 + private final CoapTransportContext transportContext;
  70 + private final TransportService transportService;
  71 + private final ConcurrentMap<DeviceId, TbCoapClientState> clients = new ConcurrentHashMap<>();
  72 + private final ConcurrentMap<String, TbCoapClientState> clientsByToken = new ConcurrentHashMap<>();
  73 +
  74 + @Override
  75 + public boolean registerAttributeObservation(TbCoapClientState clientState, String token, CoapExchange exchange) {
  76 + return registerFeatureObservation(clientState, token, exchange, FeatureType.ATTRIBUTES);
  77 + }
  78 +
  79 + @Override
  80 + public boolean registerRpcObservation(TbCoapClientState clientState, String token, CoapExchange exchange) {
  81 + return registerFeatureObservation(clientState, token, exchange, FeatureType.RPC);
  82 + }
  83 +
  84 + @Override
  85 + public void onUplink(TransportProtos.SessionInfoProto sessionInfo) {
  86 + getClientState(toDeviceId(sessionInfo)).updateLastUplinkTime();
  87 + }
  88 +
  89 + @Override
  90 + public AtomicInteger getNotificationCounterByToken(String token) {
  91 + TbCoapClientState state = clientsByToken.get(token);
  92 + if (state == null) {
  93 + log.trace("Failed to find state using token: {}", token);
  94 + return null;
  95 + }
  96 + if (state.getAttrs() != null && state.getAttrs().getToken().equals(token)) {
  97 + return state.getAttrs().getObserveCounter();
  98 + } else {
  99 + log.trace("Failed to find attr subscription using token: {}", token);
  100 + }
  101 + if (state.getRpc() != null && state.getRpc().getToken().equals(token)) {
  102 + return state.getRpc().getObserveCounter();
  103 + } else {
  104 + log.trace("Failed to find rpc subscription using token: {}", token);
  105 + }
  106 + return null;
  107 + }
  108 +
  109 + @Override
  110 + public void registerObserveRelation(String token, ObserveRelation relation) {
  111 + TbCoapClientState state = clientsByToken.get(token);
  112 + if (state == null) {
  113 + log.trace("Failed to find state using token: {}", token);
  114 + return;
  115 + }
  116 + if (state.getAttrs() != null && state.getAttrs().getToken().equals(token)) {
  117 + state.getAttrs().setObserveRelation(relation);
  118 + } else {
  119 + log.trace("Failed to find attr subscription using token: {}", token);
  120 + }
  121 + if (state.getRpc() != null && state.getRpc().getToken().equals(token)) {
  122 + state.getRpc().setObserveRelation(relation);
  123 + } else {
  124 + log.trace("Failed to find rpc subscription using token: {}", token);
  125 + }
  126 + }
  127 +
  128 + @Override
  129 + public void deregisterObserveRelation(String token) {
  130 + TbCoapClientState state = clientsByToken.remove(token);
  131 + if (state == null) {
  132 + log.trace("Failed to find state using token: {}", token);
  133 + return;
  134 + }
  135 + if (state.getAttrs() != null && state.getAttrs().getToken().equals(token)) {
  136 + cancelAttributeSubscription(state);
  137 + } else {
  138 + log.trace("Failed to find attr subscription using token: {}", token);
  139 + }
  140 + if (state.getRpc() != null && state.getRpc().getToken().equals(token)) {
  141 + cancelRpcSubscription(state);
  142 + } else {
  143 + log.trace("Failed to find rpc subscription using token: {}", token);
  144 + }
  145 + }
  146 +
  147 + @Override
  148 + public void reportActivity() {
  149 + for (TbCoapClientState state : clients.values()) {
  150 + if (state.getSession() != null) {
  151 + transportService.reportActivity(state.getSession());
  152 + }
  153 + }
  154 + }
  155 +
  156 + private boolean registerFeatureObservation(TbCoapClientState state, String token, CoapExchange exchange, FeatureType featureType) {
  157 + state.lock();
  158 + try {
  159 + boolean newObservation;
  160 + if (FeatureType.ATTRIBUTES.equals(featureType)) {
  161 + if (state.getAttrs() == null) {
  162 + newObservation = true;
  163 + state.setAttrs(new TbCoapObservationState(exchange, token));
  164 + } else {
  165 + newObservation = !state.getAttrs().getToken().equals(token);
  166 + if (newObservation) {
  167 + TbCoapObservationState old = state.getAttrs();
  168 + state.setAttrs(new TbCoapObservationState(exchange, token));
  169 + old.getExchange().respond(CoAP.ResponseCode.DELETED);
  170 + }
  171 + }
  172 + } else {
  173 + if (state.getRpc() == null) {
  174 + newObservation = true;
  175 + state.setRpc(new TbCoapObservationState(exchange, token));
  176 + } else {
  177 + newObservation = !state.getRpc().getToken().equals(token);
  178 + if (newObservation) {
  179 + TbCoapObservationState old = state.getRpc();
  180 + state.setRpc(new TbCoapObservationState(exchange, token));
  181 + old.getExchange().respond(CoAP.ResponseCode.DELETED);
  182 + }
  183 + }
  184 + }
  185 + if (newObservation) {
  186 + clientsByToken.put(token, state);
  187 + if (state.getSession() == null) {
  188 + TransportProtos.SessionInfoProto session = SessionInfoCreator.create(state.getCredentials(), transportContext, UUID.randomUUID());
  189 + state.setSession(session);
  190 + transportService.registerAsyncSession(session, new CoapSessionListener(state));
  191 + transportService.process(session, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
  192 + }
  193 + if (FeatureType.ATTRIBUTES.equals(featureType)) {
  194 + transportService.process(state.getSession(),
  195 + TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), new CoapNoOpCallback(exchange));
  196 + transportService.process(state.getSession(),
  197 + TransportProtos.GetAttributeRequestMsg.newBuilder().setOnlyShared(true).build(),
  198 + new CoapNoOpCallback(exchange));
  199 + } else {
  200 + transportService.process(state.getSession(),
  201 + TransportProtos.SubscribeToRPCMsg.getDefaultInstance(),
  202 + new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)
  203 + );
  204 + }
  205 + }
  206 + return newObservation;
  207 + } finally {
  208 + state.unlock();
  209 + }
  210 + }
  211 +
  212 + @Override
  213 + public void deregisterAttributeObservation(TbCoapClientState state, String token, CoapExchange exchange) {
  214 + state.lock();
  215 + try {
  216 + clientsByToken.remove(token);
  217 + if (state.getSession() == null) {
  218 + log.trace("[{}] Failed to delete attribute observation: {}. Session is not present.", state.getDeviceId(), token);
  219 + return;
  220 + }
  221 + if (state.getAttrs() == null) {
  222 + log.trace("[{}] Failed to delete attribute observation: {}. It is not registered.", state.getDeviceId(), token);
  223 + return;
  224 + }
  225 + if (!state.getAttrs().getToken().equals(token)) {
  226 + log.trace("[{}] Failed to delete attribute observation: {}. Token mismatch.", state.getDeviceId(), token);
  227 + return;
  228 + }
  229 + cancelAttributeSubscription(state);
  230 + } finally {
  231 + state.unlock();
  232 + }
  233 + }
  234 +
  235 + @Override
  236 + public void deregisterRpcObservation(TbCoapClientState state, String token, CoapExchange exchange) {
  237 + state.lock();
  238 + try {
  239 + clientsByToken.remove(token);
  240 + if (state.getSession() == null) {
  241 + log.trace("[{}] Failed to delete rpc observation: {}. Session is not present.", state.getDeviceId(), token);
  242 + return;
  243 + }
  244 + if (state.getRpc() == null) {
  245 + log.trace("[{}] Failed to delete rpc observation: {}. It is not registered.", state.getDeviceId(), token);
  246 + return;
  247 + }
  248 + if (!state.getRpc().getToken().equals(token)) {
  249 + log.trace("[{}] Failed to delete rpc observation: {}. Token mismatch.", state.getDeviceId(), token);
  250 + return;
  251 + }
  252 + cancelRpcSubscription(state);
  253 + } finally {
  254 + state.unlock();
  255 + }
  256 + }
  257 +
  258 + @Override
  259 + public TbCoapClientState getOrCreateClient(SessionMsgType type, ValidateDeviceCredentialsResponse deviceCredentials, DeviceProfile deviceProfile) throws AdaptorException {
  260 + DeviceId deviceId = deviceCredentials.getDeviceInfo().getDeviceId();
  261 + TbCoapClientState state = getClientState(deviceId);
  262 + state.lock();
  263 + try {
  264 + if (state.getConfiguration() == null || state.getAdaptor() == null) {
  265 + state.setConfiguration(getTransportConfigurationContainer(deviceProfile));
  266 + state.setAdaptor(getCoapTransportAdaptor(state.getConfiguration().isJsonPayload()));
  267 + }
  268 + if (state.getCredentials() == null) {
  269 + state.setCredentials(deviceCredentials);
  270 + }
  271 + } finally {
  272 + state.unlock();
  273 + }
  274 + return state;
  275 + }
  276 +
  277 + @Override
  278 + public TransportProtos.SessionInfoProto getNewSyncSession(TbCoapClientState state) {
  279 + return SessionInfoCreator.create(state.getCredentials(), transportContext, UUID.randomUUID());
  280 + }
  281 +
  282 + private TbCoapClientState getClientState(DeviceId deviceId) {
  283 + return clients.computeIfAbsent(deviceId, TbCoapClientState::new);
  284 + }
  285 +
  286 + private static DeviceId toDeviceId(TransportProtos.SessionInfoProto s) {
  287 + return new DeviceId(new UUID(s.getDeviceIdMSB(), s.getDeviceIdLSB()));
  288 + }
  289 +
  290 + private static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) {
  291 + return TransportProtos.SessionEventMsg.newBuilder()
  292 + .setSessionType(TransportProtos.SessionType.ASYNC)
  293 + .setEvent(event).build();
  294 + }
  295 +
  296 + private TransportConfigurationContainer getTransportConfigurationContainer(DeviceProfile deviceProfile) throws AdaptorException {
  297 + DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
  298 + if (transportConfiguration instanceof DefaultDeviceProfileTransportConfiguration) {
  299 + return new TransportConfigurationContainer(true);
  300 + } else if (transportConfiguration instanceof CoapDeviceProfileTransportConfiguration) {
  301 + CoapDeviceProfileTransportConfiguration coapDeviceProfileTransportConfiguration =
  302 + (CoapDeviceProfileTransportConfiguration) transportConfiguration;
  303 + CoapDeviceTypeConfiguration coapDeviceTypeConfiguration =
  304 + coapDeviceProfileTransportConfiguration.getCoapDeviceTypeConfiguration();
  305 + if (coapDeviceTypeConfiguration instanceof DefaultCoapDeviceTypeConfiguration) {
  306 + DefaultCoapDeviceTypeConfiguration defaultCoapDeviceTypeConfiguration =
  307 + (DefaultCoapDeviceTypeConfiguration) coapDeviceTypeConfiguration;
  308 + TransportPayloadTypeConfiguration transportPayloadTypeConfiguration =
  309 + defaultCoapDeviceTypeConfiguration.getTransportPayloadTypeConfiguration();
  310 + if (transportPayloadTypeConfiguration instanceof JsonTransportPayloadConfiguration) {
  311 + return new TransportConfigurationContainer(true);
  312 + } else {
  313 + ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration =
  314 + (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
  315 + String deviceTelemetryProtoSchema = protoTransportPayloadConfiguration.getDeviceTelemetryProtoSchema();
  316 + String deviceAttributesProtoSchema = protoTransportPayloadConfiguration.getDeviceAttributesProtoSchema();
  317 + String deviceRpcRequestProtoSchema = protoTransportPayloadConfiguration.getDeviceRpcRequestProtoSchema();
  318 + String deviceRpcResponseProtoSchema = protoTransportPayloadConfiguration.getDeviceRpcResponseProtoSchema();
  319 + return new TransportConfigurationContainer(false,
  320 + protoTransportPayloadConfiguration.getTelemetryDynamicMessageDescriptor(deviceTelemetryProtoSchema),
  321 + protoTransportPayloadConfiguration.getAttributesDynamicMessageDescriptor(deviceAttributesProtoSchema),
  322 + protoTransportPayloadConfiguration.getRpcResponseDynamicMessageDescriptor(deviceRpcResponseProtoSchema),
  323 + protoTransportPayloadConfiguration.getRpcRequestDynamicMessageBuilder(deviceRpcRequestProtoSchema)
  324 + );
  325 + }
  326 + } else {
  327 + throw new AdaptorException("Invalid CoapDeviceTypeConfiguration type: " + coapDeviceTypeConfiguration.getClass().getSimpleName() + "!");
  328 + }
  329 + } else {
  330 + throw new AdaptorException("Invalid DeviceProfileTransportConfiguration type" + transportConfiguration.getClass().getSimpleName() + "!");
  331 + }
  332 + }
  333 +
  334 + private CoapTransportAdaptor getCoapTransportAdaptor(boolean jsonPayloadType) {
  335 + return jsonPayloadType ? transportContext.getJsonCoapAdaptor() : transportContext.getProtoCoapAdaptor();
  336 + }
  337 +
  338 + @RequiredArgsConstructor
  339 + private class CoapSessionListener implements SessionMsgListener {
  340 +
  341 + private final TbCoapClientState state;
  342 +
  343 + @Override
  344 + public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg msg) {
  345 + TbCoapObservationState attrs = state.getAttrs();
  346 + if (attrs != null) {
  347 + try {
  348 + boolean conRequest = AbstractSyncSessionCallback.isConRequest(state.getAttrs());
  349 + Response response = state.getAdaptor().convertToPublish(conRequest, msg);
  350 + attrs.getExchange().respond(response);
  351 + } catch (AdaptorException e) {
  352 + log.trace("Failed to reply due to error", e);
  353 + cancelObserveRelation(attrs);
  354 + cancelAttributeSubscription(state);
  355 + }
  356 + } else {
  357 + log.debug("[{}] Get Attrs exchange is empty", state.getDeviceId());
  358 + }
  359 + }
  360 +
  361 + @Override
  362 + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg msg) {
  363 + log.trace("[{}] Received attributes update notification to device", sessionId);
  364 + TbCoapObservationState attrs = state.getAttrs();
  365 + if (attrs != null) {
  366 + try {
  367 + boolean conRequest = AbstractSyncSessionCallback.isConRequest(state.getAttrs());
  368 + Response response = state.getAdaptor().convertToPublish(conRequest, msg);
  369 + attrs.getExchange().respond(response);
  370 + } catch (AdaptorException e) {
  371 + log.trace("[{}] Failed to reply due to error", state.getDeviceId(), e);
  372 + cancelObserveRelation(attrs);
  373 + cancelAttributeSubscription(state);
  374 + }
  375 + } else {
  376 + log.debug("[{}] Get Attrs exchange is empty", state.getDeviceId());
  377 + }
  378 + }
  379 +
  380 + @Override
  381 + public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
  382 + log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
  383 + cancelRpcSubscription(state);
  384 + cancelAttributeSubscription(state);
  385 + }
  386 +
  387 + @Override
  388 + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) {
  389 + log.trace("[{}] Received RPC command to device", sessionId);
  390 + boolean sent = false;
  391 + boolean conRequest = AbstractSyncSessionCallback.isConRequest(state.getRpc());
  392 + try {
  393 + Response response = state.getAdaptor().convertToPublish(conRequest, msg, state.getConfiguration().getRpcRequestDynamicMessageBuilder());
  394 + int requestId = getNextMsgId();
  395 + response.setMID(requestId);
  396 + if (msg.getPersisted() && conRequest) {
  397 + transportContext.getRpcAwaitingAck().put(requestId, msg);
  398 + transportContext.getScheduler().schedule(() -> {
  399 + TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = transportContext.getRpcAwaitingAck().remove(requestId);
  400 + if (awaitingAckMsg != null) {
  401 + transportService.process(state.getSession(), msg, true, TransportServiceCallback.EMPTY);
  402 + }
  403 + }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
  404 + response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> {
  405 + TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
  406 + if (rpcRequestMsg != null) {
  407 + transportService.process(state.getSession(), rpcRequestMsg, false, TransportServiceCallback.EMPTY);
  408 + }
  409 + }));
  410 + }
  411 + state.getRpc().getExchange().respond(response);
  412 + sent = true;
  413 + } catch (AdaptorException e) {
  414 + log.trace("Failed to reply due to error", e);
  415 + cancelObserveRelation(state.getRpc());
  416 + cancelRpcSubscription(state);
  417 + } finally {
  418 + if (msg.getPersisted() && !conRequest) {
  419 + transportService.process(state.getSession(), msg, sent, TransportServiceCallback.EMPTY);
  420 + }
  421 + }
  422 + }
  423 +
  424 + @Override
  425 + public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg msg) {
  426 +
  427 + }
  428 +
  429 + private void cancelObserveRelation(TbCoapObservationState attrs) {
  430 + if (attrs.getObserveRelation() != null) {
  431 + attrs.getObserveRelation().cancel();
  432 + }
  433 + }
  434 + }
  435 +
  436 + protected int getNextMsgId() {
  437 + return ThreadLocalRandom.current().nextInt(NONE, MAX_MID + 1);
  438 + }
  439 +
  440 + private void cancelRpcSubscription(TbCoapClientState state) {
  441 + if (state.getRpc() != null) {
  442 + clientsByToken.remove(state.getRpc().getToken());
  443 + CoapExchange exchange = state.getAttrs().getExchange();
  444 + state.setRpc(null);
  445 + transportService.process(state.getSession(),
  446 + TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(),
  447 + new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
  448 + if (state.getAttrs() == null) {
  449 + transportService.process(state.getSession(), getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
  450 + transportService.deregisterSession(state.getSession());
  451 + state.setSession(null);
  452 + }
  453 + }
  454 + }
  455 +
  456 + private void cancelAttributeSubscription(TbCoapClientState state) {
  457 + if (state.getAttrs() != null) {
  458 + clientsByToken.remove(state.getAttrs().getToken());
  459 + CoapExchange exchange = state.getAttrs().getExchange();
  460 + state.setAttrs(null);
  461 + transportService.process(state.getSession(),
  462 + TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(),
  463 + new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
  464 + if (state.getRpc() == null) {
  465 + transportService.process(state.getSession(), getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
  466 + transportService.deregisterSession(state.getSession());
  467 + state.setSession(null);
  468 + }
  469 + }
  470 + }
  471 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.coap.client;
  17 +
  18 +import lombok.Data;
  19 +import lombok.Getter;
  20 +import lombok.Setter;
  21 +import org.eclipse.californium.core.network.Exchange;
  22 +import org.thingsboard.server.common.data.id.DeviceId;
  23 +import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
  24 +import org.thingsboard.server.gen.transport.TransportProtos;
  25 +import org.thingsboard.server.transport.coap.TransportConfigurationContainer;
  26 +import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
  27 +
  28 +import java.util.HashMap;
  29 +import java.util.List;
  30 +import java.util.Map;
  31 +import java.util.concurrent.Future;
  32 +import java.util.concurrent.atomic.AtomicInteger;
  33 +import java.util.concurrent.locks.Lock;
  34 +import java.util.concurrent.locks.ReentrantLock;
  35 +import java.util.stream.Collectors;
  36 +import java.util.stream.Stream;
  37 +
  38 +@Data
  39 +public class TbCoapClientState {
  40 +
  41 + private final DeviceId deviceId;
  42 + private final Lock lock;
  43 +
  44 + private volatile TransportConfigurationContainer configuration;
  45 + private volatile CoapTransportAdaptor adaptor;
  46 + private volatile ValidateDeviceCredentialsResponse credentials;
  47 +
  48 + private volatile TransportProtos.SessionInfoProto session;
  49 +
  50 + private volatile TbCoapObservationState attrs;
  51 + private volatile TbCoapObservationState rpc;
  52 +
  53 + @Getter
  54 + @Setter
  55 + private boolean asleep;
  56 + @Getter
  57 + private long lastUplinkTime;
  58 + @Getter
  59 + @Setter
  60 + private Future<Void> sleepTask;
  61 +
  62 + private boolean firstEdrxDownlink = true;
  63 +
  64 + public TbCoapClientState(DeviceId deviceId) {
  65 + this.deviceId = deviceId;
  66 + this.lock = new ReentrantLock();
  67 + }
  68 +
  69 + public void lock() {
  70 + lock.lock();
  71 + }
  72 +
  73 + public void unlock() {
  74 + lock.unlock();
  75 + }
  76 +
  77 + public long updateLastUplinkTime(){
  78 + this.lastUplinkTime = System.currentTimeMillis();
  79 + this.firstEdrxDownlink = true;
  80 + return lastUplinkTime;
  81 + }
  82 +
  83 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.coap.client;
  17 +
  18 +import lombok.Data;
  19 +import lombok.RequiredArgsConstructor;
  20 +import org.eclipse.californium.core.observe.ObserveRelation;
  21 +import org.eclipse.californium.core.server.resources.CoapExchange;
  22 +
  23 +import java.util.concurrent.atomic.AtomicInteger;
  24 +
  25 +@Data
  26 +@RequiredArgsConstructor
  27 +public class TbCoapObservationState {
  28 +
  29 + private final CoapExchange exchange;
  30 + private final String token;
  31 + private final AtomicInteger observeCounter = new AtomicInteger(0);
  32 + private volatile ObserveRelation observeRelation;
  33 +
  34 +}
... ...
... ... @@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.device.profile.CoapDeviceProfileTransp
31 31 import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
32 32 import org.thingsboard.server.common.data.device.profile.EfentoCoapDeviceTypeConfiguration;
33 33 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
  34 +import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
34 35 import org.thingsboard.server.gen.transport.TransportProtos;
35 36 import org.thingsboard.server.gen.transport.coap.MeasurementTypeProtos;
36 37 import org.thingsboard.server.gen.transport.coap.MeasurementsProtos;
... ... @@ -81,7 +82,8 @@ public class CoapEfentoTransportResource extends AbstractCoapTransportResource {
81 82 log.trace("Successfully parsed Efento ProtoMeasurements: [{}]", protoMeasurements.getCloudToken());
82 83 String token = protoMeasurements.getCloudToken();
83 84 transportService.process(DeviceTransportType.COAP, TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder().setToken(token).build(),
84   - new CoapDeviceAuthCallback(transportContext, exchange, (sessionInfo, deviceProfile) -> {
  85 + new CoapDeviceAuthCallback(exchange, (msg, deviceProfile) -> {
  86 + TransportProtos.SessionInfoProto sessionInfo = SessionInfoCreator.create(msg, transportContext, UUID.randomUUID());
85 87 UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
86 88 try {
87 89 validateEfentoTransportConfiguration(deviceProfile);
... ...
... ... @@ -43,50 +43,4 @@ public abstract class AbstractLwM2mTransportResource extends LwM2mCoapResource {
43 43
44 44 protected abstract void processHandlePost(CoapExchange exchange);
45 45
46   - public static class CoapOkCallback implements TransportServiceCallback<Void> {
47   - private final CoapExchange exchange;
48   - private final CoAP.ResponseCode onSuccessResponse;
49   - private final CoAP.ResponseCode onFailureResponse;
50   -
51   - public CoapOkCallback(CoapExchange exchange, CoAP.ResponseCode onSuccessResponse, CoAP.ResponseCode onFailureResponse) {
52   - this.exchange = exchange;
53   - this.onSuccessResponse = onSuccessResponse;
54   - this.onFailureResponse = onFailureResponse;
55   - }
56   -
57   - @Override
58   - public void onSuccess(Void msg) {
59   - Response response = new Response(onSuccessResponse);
60   - response.setAcknowledged(isConRequest());
61   - exchange.respond(response);
62   - }
63   -
64   - @Override
65   - public void onError(Throwable e) {
66   - exchange.respond(onFailureResponse);
67   - }
68   -
69   - private boolean isConRequest() {
70   - return exchange.advanced().getRequest().isConfirmable();
71   - }
72   - }
73   -
74   - public static class CoapNoOpCallback implements TransportServiceCallback<Void> {
75   - private final CoapExchange exchange;
76   -
77   - CoapNoOpCallback(CoapExchange exchange) {
78   - this.exchange = exchange;
79   - }
80   -
81   - @Override
82   - public void onSuccess(Void msg) {
83   - }
84   -
85   - @Override
86   - public void onError(Throwable e) {
87   - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
88   - }
89   - }
90   -
91   -
92 46 }
... ...
... ... @@ -161,6 +161,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
161 161 //TODO: check that the client supports FW and SW by checking the supported objects in the model.
162 162 List<String> attributesToFetch = new ArrayList<>();
163 163 LwM2MClientFwOtaInfo fwInfo = getOrInitFwInfo(client);
  164 +
164 165 if (fwInfo.isSupported()) {
165 166 attributesToFetch.add(FIRMWARE_TITLE);
166 167 attributesToFetch.add(FIRMWARE_VERSION);
... ...