Commit f0f08fd0c90c03dbcb88cbcd0089afff610fe0a7

Authored by nickAS21
1 parent dea35dc1

LWM2M: fix bug RPC device of line

... ... @@ -192,6 +192,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
192 192 syncSessionSet.add(key);
193 193 }
194 194 });
  195 + log.warn("46) Rpc syncSessionSet [{}] subscription after sent [{}]",syncSessionSet, rpcSubscriptions);
195 196 syncSessionSet.forEach(rpcSubscriptions::remove);
196 197 }
197 198
... ... @@ -240,6 +241,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
240 241 log.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
241 242 if (sessionType == SessionType.SYNC) {
242 243 log.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
  244 + log.warn("47) Rpc sessionId [{}] Cleanup sync rpc session subscription before [{}]",sessionId, rpcSubscriptions);
243 245 rpcSubscriptions.remove(sessionId);
244 246 }
245 247 } else {
... ... @@ -454,7 +456,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
454 456 } else {
455 457 SessionInfoMetaData sessionMD = sessions.get(sessionId);
456 458 if (sessionMD == null) {
457   - sessionMD = new SessionInfoMetaData(new SessionInfo(SessionType.SYNC, sessionInfo.getNodeId()));
  459 + sessionMD = new SessionInfoMetaData(new SessionInfo(subscribeCmd.getSessionType(), sessionInfo.getNodeId()));
458 460 }
459 461 sessionMD.setSubscribedToAttributes(true);
460 462 log.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
... ... @@ -471,15 +473,17 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
471 473 UUID sessionId = getSessionId(sessionInfo);
472 474 if (subscribeCmd.getUnsubscribe()) {
473 475 log.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
  476 + log.warn("48) Rpc sessionId [{}] Canceling rpc subscription for session subscription before [{}]",sessionId, rpcSubscriptions);
474 477 rpcSubscriptions.remove(sessionId);
475 478 } else {
476 479 SessionInfoMetaData sessionMD = sessions.get(sessionId);
477 480 if (sessionMD == null) {
478   - sessionMD = new SessionInfoMetaData(new SessionInfo(SessionType.SYNC, sessionInfo.getNodeId()));
  481 + sessionMD = new SessionInfoMetaData(new SessionInfo(subscribeCmd.getSessionType(), sessionInfo.getNodeId()));
479 482 }
480 483 sessionMD.setSubscribedToRPC(true);
481 484 log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
482 485 rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo());
  486 + log.warn("45) sessionId [{}] Registering rpc subscription for session [{}]",sessionId, rpcSubscriptions);
483 487 sendPendingRequests(context, sessionId, sessionInfo);
484 488 dumpSessions();
485 489 }
... ... @@ -509,6 +513,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
509 513 log.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
510 514 sessions.remove(sessionId);
511 515 attributeSubscriptions.remove(sessionId);
  516 + log.warn("49) Rpc sessionId [{}] Canceling subscriptions for closed session subscription before [{}]",sessionId, rpcSubscriptions);
512 517 rpcSubscriptions.remove(sessionId);
513 518 if (sessions.isEmpty()) {
514 519 reportSessionClose();
... ... @@ -764,6 +769,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
764 769 Map<UUID, SessionInfoMetaData> sessionsToRemove = sessions.entrySet().stream().filter(kv -> kv.getValue().getLastActivityTime() < expTime).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
765 770 sessionsToRemove.forEach((sessionId, sessionMD) -> {
766 771 sessions.remove(sessionId);
  772 + log.warn("50) Rpc sessionId [{}] checkSessionsTimeout subscription before [{}]",sessionId, rpcSubscriptions);
767 773 rpcSubscriptions.remove(sessionId);
768 774 attributeSubscriptions.remove(sessionId);
769 775 notifyTransportAboutClosedSession(sessionId, sessionMD, "session timeout!");
... ...
... ... @@ -310,10 +310,12 @@ message SessionCloseNotificationProto {
310 310
311 311 message SubscribeToAttributeUpdatesMsg {
312 312 bool unsubscribe = 1;
  313 + SessionType sessionType = 2;
313 314 }
314 315
315 316 message SubscribeToRPCMsg {
316 317 bool unsubscribe = 1;
  318 + SessionType sessionType = 2;
317 319 }
318 320
319 321 message ToDeviceRpcRequestMsg {
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m.server;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.eclipse.californium.core.coap.CoAP;
  20 +import org.eclipse.californium.core.coap.Response;
  21 +import org.eclipse.californium.core.server.resources.CoapExchange;
  22 +import org.eclipse.leshan.core.californium.LwM2mCoapResource;
  23 +import org.thingsboard.server.common.transport.TransportServiceCallback;
  24 +
  25 +@Slf4j
  26 +public abstract class AbstractLwm2mTransportResource extends LwM2mCoapResource {
  27 + protected final LwM2mTransportMsgHandler handler;
  28 +
  29 + public AbstractLwm2mTransportResource(LwM2mTransportMsgHandler handler, String name) {
  30 + super(name);
  31 + this.handler = handler;
  32 + }
  33 +
  34 + @Override
  35 + public void handleGET(CoapExchange exchange) {
  36 + processHandleGet(exchange);
  37 + }
  38 +
  39 + @Override
  40 + public void handlePOST(CoapExchange exchange) {
  41 + processHandlePost(exchange);
  42 + }
  43 +
  44 + protected abstract void processHandleGet(CoapExchange exchange);
  45 +
  46 + protected abstract void processHandlePost(CoapExchange exchange);
  47 +
  48 + public static class CoapOkCallback implements TransportServiceCallback<Void> {
  49 + private final CoapExchange exchange;
  50 + private final CoAP.ResponseCode onSuccessResponse;
  51 + private final CoAP.ResponseCode onFailureResponse;
  52 +
  53 + public CoapOkCallback(CoapExchange exchange, CoAP.ResponseCode onSuccessResponse, CoAP.ResponseCode onFailureResponse) {
  54 + this.exchange = exchange;
  55 + this.onSuccessResponse = onSuccessResponse;
  56 + this.onFailureResponse = onFailureResponse;
  57 + }
  58 +
  59 + @Override
  60 + public void onSuccess(Void msg) {
  61 + Response response = new Response(onSuccessResponse);
  62 + response.setAcknowledged(isConRequest());
  63 + exchange.respond(response);
  64 + }
  65 +
  66 + @Override
  67 + public void onError(Throwable e) {
  68 + exchange.respond(onFailureResponse);
  69 + }
  70 +
  71 + private boolean isConRequest() {
  72 + return exchange.advanced().getRequest().isConfirmable();
  73 + }
  74 + }
  75 +
  76 + public static class CoapNoOpCallback implements TransportServiceCallback<Void> {
  77 + private final CoapExchange exchange;
  78 +
  79 + CoapNoOpCallback(CoapExchange exchange) {
  80 + this.exchange = exchange;
  81 + }
  82 +
  83 + @Override
  84 + public void onSuccess(Void msg) {
  85 + }
  86 +
  87 + @Override
  88 + public void onError(Throwable e) {
  89 + exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
  90 + }
  91 + }
  92 +}
... ...
... ... @@ -42,10 +42,10 @@ import org.thingsboard.common.util.ThingsBoardExecutors;
42 42 import org.thingsboard.server.cache.ota.OtaPackageDataCache;
43 43 import org.thingsboard.server.common.data.Device;
44 44 import org.thingsboard.server.common.data.DeviceProfile;
  45 +import org.thingsboard.server.common.data.id.OtaPackageId;
45 46 import org.thingsboard.server.common.data.ota.OtaPackageKey;
46 47 import org.thingsboard.server.common.data.ota.OtaPackageType;
47 48 import org.thingsboard.server.common.data.ota.OtaPackageUtil;
48   -import org.thingsboard.server.common.data.id.OtaPackageId;
49 49 import org.thingsboard.server.common.transport.TransportService;
50 50 import org.thingsboard.server.common.transport.TransportServiceCallback;
51 51 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
... ... @@ -87,9 +87,9 @@ import java.util.stream.Collectors;
87 87
88 88 import static org.eclipse.californium.core.coap.CoAP.ResponseCode.BAD_REQUEST;
89 89 import static org.eclipse.leshan.core.attributes.Attribute.OBJECT_VERSION;
  90 +import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_PATH;
90 91 import static org.thingsboard.server.common.data.ota.OtaPackageUpdateStatus.DOWNLOADED;
91 92 import static org.thingsboard.server.common.data.ota.OtaPackageUpdateStatus.UPDATING;
92   -import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_PATH;
93 93 import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportServerHelper.getValueFromKvProto;
94 94 import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.CLIENT_NOT_AUTHORIZED;
95 95 import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.DEVICE_ATTRIBUTES_REQUEST;
... ... @@ -189,13 +189,15 @@ public class DefaultLwM2MTransportMsgHandler implements LwM2mTransportMsgHandler
189 189 SessionInfoProto sessionInfo = this.getSessionInfoOrCloseSession(lwM2MClient);
190 190 if (sessionInfo != null) {
191 191 transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, sessionInfo));
192   - TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder()
  192 + log.warn("40) sessionId [{}] Registering rpc subscription after Registration client",new UUID (sessionInfo.getSessionIdMSB(),sessionInfo.getSessionIdLSB()));
  193 + transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
193 194 .setSessionInfo(sessionInfo)
194 195 .setSessionEvent(DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN))
195   - .setSubscribeToAttributes(TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build())
196   - .setSubscribeToRPC(TransportProtos.SubscribeToRPCMsg.newBuilder().build())
197   - .build();
198   - transportService.process(msg, null);
  196 + .setSubscribeToAttributes(TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder()
  197 + .setSessionType(TransportProtos.SessionType.ASYNC).build())
  198 + .setSubscribeToRPC(TransportProtos.SubscribeToRPCMsg.newBuilder()
  199 + .setSessionType(TransportProtos.SessionType.ASYNC).build())
  200 + .build(), null);
199 201 this.getInfoFirmwareUpdate(lwM2MClient, null);
200 202 this.getInfoSoftwareUpdate(lwM2MClient, null);
201 203 this.initLwM2mFromClientValue(registration, lwM2MClient);
... ... @@ -250,7 +252,7 @@ public class DefaultLwM2MTransportMsgHandler implements LwM2mTransportMsgHandler
250 252 unRegistrationExecutor.submit(() -> {
251 253 try {
252 254 this.sendLogsToThingsboard(LOG_LW2M_INFO + ": Client unRegistration", registration.getId());
253   - this.closeClientSession(registration);
  255 +// this.closeClientSession(registration);
254 256 } catch (Throwable t) {
255 257 log.error("[{}] endpoint [{}] error Unable un registration.", registration.getEndpoint(), t);
256 258 this.sendLogsToThingsboard(LOG_LW2M_ERROR + String.format(": Client Unable un Registration, %s", t.getMessage()), registration.getId());
... ... @@ -262,10 +264,11 @@ public class DefaultLwM2MTransportMsgHandler implements LwM2mTransportMsgHandler
262 264 SessionInfoProto sessionInfo = this.getSessionInfoOrCloseSession(registration);
263 265 if (sessionInfo != null) {
264 266 transportService.deregisterSession(sessionInfo);
  267 + // TO DO may problem, better by registrationId
265 268 sessionStore.remove(registration.getEndpoint());
266 269 this.doCloseSession(sessionInfo);
267 270 clientContext.removeClientByRegistrationId(registration.getId());
268   - log.info("Client close session: [{}] unReg [{}] name [{}] profile ", registration.getId(), registration.getEndpoint(), sessionInfo.getDeviceType());
  271 + log.warn("52) Client close session [{}]: [{}] unReg [{}] name [{}] profile ", new UUID (sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()), registration.getId(), registration.getEndpoint(), sessionInfo.getDeviceType());
269 272 } else {
270 273 log.error("Client close session: [{}] unReg [{}] name [{}] sessionInfo ", registration.getId(), registration.getEndpoint(), null);
271 274 }
... ... @@ -446,10 +449,10 @@ public class DefaultLwM2MTransportMsgHandler implements LwM2mTransportMsgHandler
446 449 public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg toDeviceRpcRequestMsg, SessionInfoProto sessionInfo) {
447 450 // #1
448 451 this.checkRpcRequestTimeout();
449   - log.warn ("4) toDeviceRpcRequestMsg: [{}], sessionUUID: [{}]", toDeviceRpcRequestMsg, new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()));
  452 + UUID requestUUID = new UUID(toDeviceRpcRequestMsg.getRequestIdMSB(), toDeviceRpcRequestMsg.getRequestIdLSB());
  453 + log.warn ("4) toDeviceRpcRequestMsg: [{}], sessionUUID: [{}]", toDeviceRpcRequestMsg, requestUUID);
450 454 String bodyParams = StringUtils.trimToNull(toDeviceRpcRequestMsg.getParams()) != null ? toDeviceRpcRequestMsg.getParams() : "null";
451 455 LwM2mTypeOper lwM2mTypeOper = setValidTypeOper(toDeviceRpcRequestMsg.getMethodName());
452   - UUID requestUUID = new UUID(toDeviceRpcRequestMsg.getRequestIdMSB(), toDeviceRpcRequestMsg.getRequestIdLSB());
453 456 if (!this.rpcSubscriptions.containsKey(requestUUID)) {
454 457 this.rpcSubscriptions.put(requestUUID, toDeviceRpcRequestMsg.getExpirationTime());
455 458 Lwm2mClientRpcRequest lwm2mClientRpcRequest = null;
... ... @@ -479,8 +482,14 @@ public class DefaultLwM2MTransportMsgHandler implements LwM2mTransportMsgHandler
479 482 }
480 483
481 484 private void checkRpcRequestTimeout() {
482   - Set<UUID> rpcSubscriptionsToRemove = rpcSubscriptions.entrySet().stream().filter(kv -> System.currentTimeMillis() > kv.getValue()).map(Map.Entry::getKey).collect(Collectors.toSet());
483   - rpcSubscriptionsToRemove.forEach(rpcSubscriptions::remove);
  485 + log.warn ("4.1) before rpcSubscriptions.size(): [{}]", rpcSubscriptions.size());
  486 + if (rpcSubscriptions.size() > 0) {
  487 + Set<UUID> rpcSubscriptionsToRemove = rpcSubscriptions.entrySet().stream().filter(kv -> System.currentTimeMillis() > kv.getValue()).map(Map.Entry::getKey).collect(Collectors.toSet());
  488 + log.warn ("4.2) System.currentTimeMillis(): [{}]", System.currentTimeMillis());
  489 + log.warn ("4.3) rpcSubscriptionsToRemove: [{}]", rpcSubscriptionsToRemove);
  490 + rpcSubscriptionsToRemove.forEach(rpcSubscriptions::remove);
  491 + }
  492 + log.warn ("4.4) after rpcSubscriptions.size(): [{}]", rpcSubscriptions.size());
484 493 }
485 494
486 495 public void sentRpcResponse(Lwm2mClientRpcRequest rpcRequest, String requestCode, String msg, String typeMsg) {
... ... @@ -1372,7 +1381,8 @@ public class DefaultLwM2MTransportMsgHandler implements LwM2mTransportMsgHandler
1372 1381 lwM2MClient.getFwUpdate().setRpcRequest(rpcRequest);
1373 1382 lwM2MClient.getFwUpdate().setCurrentVersion(response.getVersion());
1374 1383 lwM2MClient.getFwUpdate().setCurrentTitle(response.getTitle());
1375   - lwM2MClient.getFwUpdate().setCurrentId(new OtaPackageId(new UUID(response.getOtaPackageIdMSB(), response.getOtaPackageIdLSB())).getId());
  1384 + log.warn("11) OtaPackageIdMSB: [{}] OtaPackageIdLSB: [{}]", response.getOtaPackageIdMSB(), response.getOtaPackageIdLSB());
  1385 + lwM2MClient.getFwUpdate().setCurrentId(new UUID(response.getOtaPackageIdMSB(), response.getOtaPackageIdLSB()));
1376 1386 if (rpcRequest == null) {
1377 1387 lwM2MClient.getFwUpdate().sendReadObserveInfo(lwM2mTransportRequest);
1378 1388 }
... ...
... ... @@ -28,7 +28,6 @@ import org.eclipse.leshan.server.californium.registration.CaliforniumRegistratio
28 28 import org.eclipse.leshan.server.model.LwM2mModelProvider;
29 29 import org.eclipse.leshan.server.security.EditableSecurityStore;
30 30 import org.springframework.stereotype.Component;
31   -import org.thingsboard.common.util.ThingsBoardThreadFactory;
32 31 import org.thingsboard.server.common.data.StringUtils;
33 32 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
34 33 import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
... ... @@ -58,14 +57,14 @@ import java.security.spec.InvalidParameterSpecException;
58 57 import java.security.spec.KeySpec;
59 58 import java.security.spec.PKCS8EncodedKeySpec;
60 59 import java.util.Arrays;
61   -import java.util.concurrent.Executors;
62   -import java.util.concurrent.ScheduledExecutorService;
63 60
64 61 import static org.eclipse.californium.scandium.dtls.cipher.CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256;
65 62 import static org.eclipse.californium.scandium.dtls.cipher.CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8;
66 63 import static org.eclipse.californium.scandium.dtls.cipher.CipherSuite.TLS_PSK_WITH_AES_128_CBC_SHA256;
67 64 import static org.eclipse.californium.scandium.dtls.cipher.CipherSuite.TLS_PSK_WITH_AES_128_CCM_8;
68 65 import static org.thingsboard.server.transport.lwm2m.server.LwM2mNetworkConfig.getCoapConfig;
  66 +import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_COAP_RESOURCE;
  67 +import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.SW_COAP_RESOURCE;
69 68
70 69 @Slf4j
71 70 @Component
... ... @@ -81,7 +80,7 @@ public class DefaultLwM2mTransportService implements LwM2MTransportService {
81 80 private final LwM2mTransportContext context;
82 81 private final LwM2MTransportServerConfig config;
83 82 private final LwM2mTransportServerHelper helper;
84   - private final LwM2mTransportMsgHandler handler;
  83 + private final DefaultLwM2MTransportMsgHandler handler;
85 84 private final CaliforniumRegistrationStore registrationStore;
86 85 private final EditableSecurityStore securityStore;
87 86 private final LwM2mClientContext lwM2mClientContext;
... ... @@ -96,6 +95,19 @@ public class DefaultLwM2mTransportService implements LwM2MTransportService {
96 95 new LWM2MGenerationPSkRPkECC();
97 96 }
98 97 this.server = getLhServer();
  98 + /**
  99 + * Add a resource to the server.
  100 + * CoapResource ->
  101 + * path = FW_PACKAGE or SW_PACKAGE
  102 + * nameFile = "BC68JAR01A09_TO_BC68JAR01A10.bin"
  103 + * "coap://host:port/{path}/{token}/{nameFile}"
  104 + */
  105 +
  106 +
  107 + LwM2mTransportCoapResource fwCoapResource = new LwM2mTransportCoapResource(handler, FW_COAP_RESOURCE);
  108 + LwM2mTransportCoapResource swCoapResource = new LwM2mTransportCoapResource(handler, SW_COAP_RESOURCE);
  109 + this.server.coap().getServer().add(fwCoapResource);
  110 + this.server.coap().getServer().add(swCoapResource);
99 111 this.startLhServer();
100 112 this.context.setServer(server);
101 113 }
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m.server;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.eclipse.californium.core.coap.CoAP;
  20 +import org.eclipse.californium.core.coap.Request;
  21 +import org.eclipse.californium.core.coap.Response;
  22 +import org.eclipse.californium.core.network.Exchange;
  23 +import org.eclipse.californium.core.observe.ObserveRelation;
  24 +import org.eclipse.californium.core.server.resources.CoapExchange;
  25 +import org.eclipse.californium.core.server.resources.Resource;
  26 +import org.eclipse.californium.core.server.resources.ResourceObserver;
  27 +
  28 +import java.util.UUID;
  29 +import java.util.concurrent.ConcurrentHashMap;
  30 +import java.util.concurrent.ConcurrentMap;
  31 +import java.util.concurrent.atomic.AtomicInteger;
  32 +
  33 +@Slf4j
  34 +public class LwM2mTransportCoapResource extends AbstractLwm2mTransportResource {
  35 + private final ConcurrentMap<String, ObserveRelation> tokenToObserveRelationMap = new ConcurrentHashMap<>();
  36 + private final ConcurrentMap<String, AtomicInteger> tokenToObserveNotificationSeqMap = new ConcurrentHashMap<>();
  37 +
  38 + public LwM2mTransportCoapResource(LwM2mTransportMsgHandler handler, String name) {
  39 + super(handler, name);
  40 + this.setObservable(true); // enable observing
  41 + this.addObserver(new CoapResourceObserver());
  42 + }
  43 +
  44 +
  45 + @Override
  46 + public void checkObserveRelation(Exchange exchange, Response response) {
  47 + String token = getTokenFromRequest(exchange.getRequest());
  48 + final ObserveRelation relation = exchange.getRelation();
  49 + if (relation == null || relation.isCanceled()) {
  50 + return; // because request did not try to establish a relation
  51 + }
  52 + if (CoAP.ResponseCode.isSuccess(response.getCode())) {
  53 +
  54 + if (!relation.isEstablished()) {
  55 + relation.setEstablished();
  56 + addObserveRelation(relation);
  57 + }
  58 + AtomicInteger notificationCounter = tokenToObserveNotificationSeqMap.computeIfAbsent(token, s -> new AtomicInteger(0));
  59 + response.getOptions().setObserve(notificationCounter.getAndIncrement());
  60 + } // ObserveLayer takes care of the else case
  61 + }
  62 +
  63 +
  64 + @Override
  65 + protected void processHandleGet(CoapExchange exchange) {
  66 + log.warn("1) processHandleGet [{}]", exchange);
  67 +// exchange.respond(CoAP.ResponseCode.BAD_REQUEST);
  68 +// int ver = 10;
  69 + int ver = 9;
  70 + UUID currentId;
  71 + if (ver == 10) {
  72 + long mSB = 4951557297924280811L;
  73 + long lSb = -8451242882176289074L;
  74 + currentId = new UUID(mSB, lSb);
  75 + } else {
  76 + long mSB = 9085827945869414891L;
  77 + long lSb = -9086716326447629319L;
  78 + currentId = new UUID(mSB, lSb);
  79 + }
  80 + String resource = exchange.getRequestOptions().getUriPath().get(0);
  81 + String token = exchange.getRequestOptions().getUriPath().get(1);
  82 + exchange.respond(CoAP.ResponseCode.CONTENT, this.getFwData(currentId));
  83 +
  84 + }
  85 +
  86 + @Override
  87 + protected void processHandlePost(CoapExchange exchange) {
  88 + log.warn("2) processHandleGet [{}]", exchange);
  89 + }
  90 +
  91 + /**
  92 + * Override the default behavior so that requests to sub resources (typically /{name}/{token}) are handled by
  93 + * /name resource.
  94 + */
  95 + @Override
  96 + public Resource getChild(String name) {
  97 + return this;
  98 + }
  99 +
  100 +
  101 + private String getTokenFromRequest(Request request) {
  102 + return (request.getSourceContext() != null ? request.getSourceContext().getPeerAddress().getAddress().getHostAddress() : "null")
  103 + + ":" + (request.getSourceContext() != null ? request.getSourceContext().getPeerAddress().getPort() : -1) + ":" + request.getTokenString();
  104 + }
  105 +
  106 + public class CoapResourceObserver implements ResourceObserver {
  107 +
  108 + @Override
  109 + public void changedName(String old) {
  110 +
  111 + }
  112 +
  113 + @Override
  114 + public void changedPath(String old) {
  115 +
  116 + }
  117 +
  118 + @Override
  119 + public void addedChild(Resource child) {
  120 +
  121 + }
  122 +
  123 + @Override
  124 + public void removedChild(Resource child) {
  125 +
  126 + }
  127 +
  128 + @Override
  129 + public void addedObserveRelation(ObserveRelation relation) {
  130 +
  131 + }
  132 +
  133 + @Override
  134 + public void removedObserveRelation(ObserveRelation relation) {
  135 +
  136 + }
  137 + }
  138 +
  139 + private byte[] getFwData(UUID currentId) {
  140 + int chunkSize = 0;
  141 + int chunk = 0;
  142 + return ((DefaultLwM2MTransportMsgHandler) handler).otaPackageDataCache.get(currentId.toString(), chunkSize, chunk);
  143 + }
  144 +
  145 +}
... ...
... ... @@ -43,11 +43,11 @@ import org.eclipse.leshan.server.registration.Registration;
43 43 import org.nustaq.serialization.FSTConfiguration;
44 44 import org.thingsboard.server.common.data.DeviceProfile;
45 45 import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration;
  46 +import org.thingsboard.server.common.data.id.TenantId;
46 47 import org.thingsboard.server.common.data.ota.OtaPackageKey;
47 48 import org.thingsboard.server.common.data.ota.OtaPackageType;
48 49 import org.thingsboard.server.common.data.ota.OtaPackageUpdateStatus;
49 50 import org.thingsboard.server.common.data.ota.OtaPackageUtil;
50   -import org.thingsboard.server.common.data.id.TenantId;
51 51 import org.thingsboard.server.common.transport.TransportServiceCallback;
52 52 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
53 53 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientProfile;
... ... @@ -77,14 +77,14 @@ import static org.eclipse.leshan.core.model.ResourceModel.Type.OBJLNK;
77 77 import static org.eclipse.leshan.core.model.ResourceModel.Type.OPAQUE;
78 78 import static org.eclipse.leshan.core.model.ResourceModel.Type.STRING;
79 79 import static org.eclipse.leshan.core.model.ResourceModel.Type.TIME;
  80 +import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_KEY;
  81 +import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_PATH;
80 82 import static org.thingsboard.server.common.data.ota.OtaPackageUpdateStatus.DOWNLOADED;
81 83 import static org.thingsboard.server.common.data.ota.OtaPackageUpdateStatus.DOWNLOADING;
82 84 import static org.thingsboard.server.common.data.ota.OtaPackageUpdateStatus.FAILED;
83 85 import static org.thingsboard.server.common.data.ota.OtaPackageUpdateStatus.UPDATED;
84 86 import static org.thingsboard.server.common.data.ota.OtaPackageUpdateStatus.UPDATING;
85 87 import static org.thingsboard.server.common.data.ota.OtaPackageUpdateStatus.VERIFIED;
86   -import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_KEY;
87   -import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_PATH;
88 88
89 89 @Slf4j
90 90 public class LwM2mTransportUtil {
... ... @@ -140,6 +140,7 @@ public class LwM2mTransportUtil {
140 140 public static final String METHOD_KEY = "methodName";
141 141
142 142 // Firmware
  143 + public static final String FW_COAP_RESOURCE = "fw";
143 144 public static final String FW_UPDATE = "Firmware update";
144 145 public static final Integer FW_ID = 5;
145 146 // Package W
... ... @@ -156,6 +157,7 @@ public class LwM2mTransportUtil {
156 157 public static final String FW_UPDATE_ID = "/5/0/2";
157 158
158 159 // Software
  160 + public static final String SW_COAP_RESOURCE = "sw";
159 161 public static final String SW_UPDATE = "Software update";
160 162 public static final Integer SW_ID = 9;
161 163 // Package W
... ...
... ... @@ -37,6 +37,7 @@ import java.util.Optional;
37 37 import java.util.Set;
38 38 import java.util.UUID;
39 39 import java.util.concurrent.ConcurrentHashMap;
  40 +import java.util.function.Predicate;
40 41
41 42 import static org.eclipse.leshan.core.SecurityMode.NO_SEC;
42 43 import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.convertPathFromObjectIdToIdVer;
... ... @@ -68,12 +69,15 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
68 69
69 70 @Override
70 71 public LwM2mClient getOrRegister(Registration registration) {
  72 + LwM2mClient client = null;
71 73 if (registration == null) {
72   - return null;
  74 + return client;
  75 + }
  76 + if (lwM2mClientsByRegistrationId.size()>0) {
  77 + client = this.lwM2mClientsByRegistrationId.get(registration.getId());
73 78 }
74   - LwM2mClient client = lwM2mClientsByRegistrationId.get(registration.getId());
75 79 if (client == null) {
76   - client = lwM2mClientsByEndpoint.get(registration.getEndpoint());
  80 + client = this.lwM2mClientsByEndpoint.get(registration.getEndpoint());
77 81 if (client == null) {
78 82 client = registerOrUpdate(registration);
79 83 }
... ... @@ -83,12 +87,29 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
83 87
84 88 @Override
85 89 public LwM2mClient getClient(TransportProtos.SessionInfoProto sessionInfo) {
86   - LwM2mClient lwM2mClient = lwM2mClientsByEndpoint.values().stream().filter(c ->
  90 + LwM2mClient lwM2mClient = null;
  91 + Predicate<LwM2mClient> isClientFilter = c ->
87 92 (new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()))
88   - .equals((new UUID(c.getSession().getSessionIdMSB(), c.getSession().getSessionIdLSB())))
89   -
90   - ).findAny().get();
91   - if (lwM2mClient == null) {
  93 + .equals((new UUID(c.getSession().getSessionIdMSB(), c.getSession().getSessionIdLSB())));
  94 +// if (this.lwM2mClientsByEndpoint.size()>0) {
  95 +// lwM2mClient = this.lwM2mClientsByEndpoint.values().stream().filter(c ->
  96 +// (new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()))
  97 +// .equals((new UUID(c.getSession().getSessionIdMSB(), c.getSession().getSessionIdLSB())))
  98 +// ).findAny().get();
  99 +// }
  100 + if (this.lwM2mClientsByEndpoint.size()>0) {
  101 + lwM2mClient = this.lwM2mClientsByEndpoint.values().stream().filter(isClientFilter).findAny().get();
  102 + }
  103 +// if (lwM2mClient == null && this.lwM2mClientsByRegistrationId.size() > 0) {
  104 +// lwM2mClient = this.lwM2mClientsByRegistrationId.values().stream().filter(c ->
  105 +// (new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()))
  106 +// .equals((new UUID(c.getSession().getSessionIdMSB(), c.getSession().getSessionIdLSB())))
  107 +// ).findAny().get();
  108 +// }
  109 + if (lwM2mClient == null && this.lwM2mClientsByRegistrationId.size() > 0) {
  110 + lwM2mClient = this.lwM2mClientsByRegistrationId.values().stream().filter(isClientFilter).findAny().get();
  111 + }
  112 + if (lwM2mClient == null){
92 113 log.warn("Device TimeOut? lwM2mClient is null.");
93 114 log.warn("SessionInfo input [{}], lwM2mClientsByEndpoint size: [{}]", sessionInfo, lwM2mClientsByEndpoint.values().size());
94 115 log.error("", new RuntimeException());
... ...
... ... @@ -259,8 +259,10 @@ public class GatewaySessionHandler {
259 259 transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
260 260 .setSessionInfo(deviceSessionInfo)
261 261 .setSessionEvent(DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN))
262   - .setSubscribeToAttributes(TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build())
263   - .setSubscribeToRPC(TransportProtos.SubscribeToRPCMsg.newBuilder().build())
  262 + .setSubscribeToAttributes(TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder()
  263 + .setSessionType(TransportProtos.SessionType.ASYNC).build())
  264 + .setSubscribeToRPC(TransportProtos.SubscribeToRPCMsg.newBuilder()
  265 + .setSessionType(TransportProtos.SessionType.ASYNC).build())
264 266 .build(), null);
265 267 }
266 268 futureToSet.set(devices.get(deviceName));
... ...
... ... @@ -708,6 +708,7 @@ public class DefaultTransportService implements TransportService {
708 708 log.debug("Stopping scheduler to avoid resending response if request has been ack.");
709 709 currentSession.getScheduledFuture().cancel(false);
710 710 }
  711 + log.warn("54) session [{}] deregisterSession Stopping scheduler to avoid resending response if request has been ack.", toSessionId(sessionInfo));
711 712 sessions.remove(toSessionId(sessionInfo));
712 713 }
713 714
... ...