Commit 06c4f7a39617d97c78f65b19b60d7692f79a626c

Authored by YevhenBondarenko
2 parents 51e7549b 2c31de4a

Merge branch 'feature/lwm2m-refactoring-downlink' of https://github.com/thingsbo…

…ard/thingsboard into feature/lwm2m-refactoring-downlink

# Conflicts:
#	common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MReadCallback.java
... ... @@ -18,11 +18,11 @@ package org.thingsboard.server.transport.lwm2m.server.client;
18 18 import org.eclipse.leshan.server.registration.Registration;
19 19 import org.thingsboard.server.common.data.DeviceProfile;
20 20 import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration;
21   -import org.thingsboard.server.common.data.id.TenantId;
22 21 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
23 22 import org.thingsboard.server.gen.transport.TransportProtos;
24 23
25 24 import java.util.Collection;
  25 +import java.util.Optional;
26 26 import java.util.Set;
27 27 import java.util.UUID;
28 28
... ... @@ -34,7 +34,7 @@ public interface LwM2mClientContext {
34 34
35 35 LwM2mClient getClientBySessionInfo(TransportProtos.SessionInfoProto sessionInfo);
36 36
37   - void register(LwM2mClient lwM2MClient, Registration registration) throws LwM2MClientStateException;
  37 + Optional<TransportProtos.SessionInfoProto> register(LwM2mClient lwM2MClient, Registration registration) throws LwM2MClientStateException;
38 38
39 39 void updateRegistration(LwM2mClient client, Registration registration) throws LwM2MClientStateException;
40 40
... ...
... ... @@ -60,12 +60,14 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
60 60 }
61 61
62 62 @Override
63   - public void register(LwM2mClient lwM2MClient, Registration registration) throws LwM2MClientStateException {
  63 + public Optional<TransportProtos.SessionInfoProto> register(LwM2mClient lwM2MClient, Registration registration) throws LwM2MClientStateException {
  64 + TransportProtos.SessionInfoProto oldSession = null;
64 65 lwM2MClient.lock();
65 66 try {
66 67 if (LwM2MClientState.UNREGISTERED.equals(lwM2MClient.getState())) {
67 68 throw new LwM2MClientStateException(lwM2MClient.getState(), "Client is in invalid state.");
68 69 }
  70 + oldSession = lwM2MClient.getSession();
69 71 TbLwM2MSecurityInfo securityInfo = securityStore.getTbLwM2MSecurityInfoByEndpoint(lwM2MClient.getEndpoint());
70 72 if (securityInfo.getSecurityMode() != null) {
71 73 if (securityInfo.getDeviceProfile() != null) {
... ... @@ -89,6 +91,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
89 91 } finally {
90 92 lwM2MClient.unlock();
91 93 }
  94 + return Optional.ofNullable(oldSession);
92 95 }
93 96
94 97 @Override
... ...
... ... @@ -15,12 +15,13 @@
15 15 */
16 16 package org.thingsboard.server.transport.lwm2m.server.downlink;
17 17
18   -import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
  18 +import lombok.extern.slf4j.Slf4j;
19 19 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
  20 +import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
20 21
21 22 import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_INFO;
22   -import static org.thingsboard.server.transport.lwm2m.server.LwM2mOperationType.OBSERVE_CANCEL_ALL;
23 23
  24 +@Slf4j
24 25 public class TbLwM2MCancelAllObserveCallback extends AbstractTbLwM2MRequestCallback<TbLwM2MCancelAllRequest, Integer> {
25 26
26 27 public TbLwM2MCancelAllObserveCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client) {
... ... @@ -29,7 +30,8 @@ public class TbLwM2MCancelAllObserveCallback extends AbstractTbLwM2MRequestCallb
29 30
30 31 @Override
31 32 public void onSuccess(TbLwM2MCancelAllRequest request, Integer canceledSubscriptionsCount) {
32   - String observeCancelMsg = String.format("%s: type operation %s paths: count: %d", LOG_LWM2M_INFO, OBSERVE_CANCEL_ALL.name(), canceledSubscriptionsCount);
  33 + log.trace("[{}] Cancel of all observations was successful: {}", client.getEndpoint(), canceledSubscriptionsCount);
  34 + handler.logToTelemetry(client, String.format("[%s]: Cancel of all observations was successful. Result: [%s]", LOG_LWM2M_INFO, canceledSubscriptionsCount));
33 35 }
34 36
35 37 }
... ...
... ... @@ -15,12 +15,14 @@
15 15 */
16 16 package org.thingsboard.server.transport.lwm2m.server.downlink;
17 17
  18 +import lombok.extern.slf4j.Slf4j;
18 19 import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
19 20 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
20 21
21 22 import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_INFO;
22 23 import static org.thingsboard.server.transport.lwm2m.server.LwM2mOperationType.OBSERVE_CANCEL;
23 24
  25 +@Slf4j
24 26 public class TbLwM2MCancelObserveCallback extends AbstractTbLwM2MRequestCallback<TbLwM2MCancelObserveRequest, Integer> {
25 27
26 28 private final String versionedId;
... ... @@ -32,7 +34,8 @@ public class TbLwM2MCancelObserveCallback extends AbstractTbLwM2MRequestCallback
32 34
33 35 @Override
34 36 public void onSuccess(TbLwM2MCancelObserveRequest request, Integer canceledSubscriptionsCount) {
35   - String observeCancelMsg = String.format("%s: type operation %s paths: %s count: %d", LOG_LWM2M_INFO, OBSERVE_CANCEL.name(), versionedId, canceledSubscriptionsCount);
  37 + log.trace("[{}] Cancel observation of [{}] successful: {}", client.getEndpoint(), versionedId, canceledSubscriptionsCount);
  38 + handler.logToTelemetry(client, String.format("[%s]: Cancel Observe for [%s] successful. Result: [%s]", LOG_LWM2M_INFO, versionedId, canceledSubscriptionsCount));
36 39 }
37 40
38 41 }
... ...
... ... @@ -20,18 +20,10 @@ import org.eclipse.leshan.core.response.ExecuteResponse;
20 20 import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
21 21 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
22 22
23   -public class TbLwM2MExecuteCallback extends AbstractTbLwM2MRequestCallback<ExecuteRequest, ExecuteResponse> {
24   -
25   - private final String targetId;
  23 +public class TbLwM2MExecuteCallback extends TbLwM2MTargetedCallback<ExecuteRequest, ExecuteResponse> {
26 24
27 25 public TbLwM2MExecuteCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String targetId) {
28   - super(handler, client);
29   - this.targetId = targetId;
30   - }
31   -
32   - @Override
33   - public void onSuccess(ExecuteRequest request, ExecuteResponse response) {
34   - //TODO: separate callback wrapper for the RPC calls.
  26 + super(handler, client, targetId);
35 27 }
36 28
37 29 }
... ...
... ... @@ -17,14 +17,10 @@ package org.thingsboard.server.transport.lwm2m.server.downlink;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.eclipse.leshan.core.request.ObserveRequest;
20   -import org.eclipse.leshan.core.request.ReadRequest;
21 20 import org.eclipse.leshan.core.response.ObserveResponse;
22   -import org.eclipse.leshan.core.response.ReadResponse;
23 21 import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
24 22 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
25 23
26   -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_INFO;
27   -
28 24 @Slf4j
29 25 public class TbLwM2MObserveCallback extends TbLwM2MTargetedCallback<ObserveRequest, ObserveResponse> {
30 26
... ... @@ -35,6 +31,6 @@ public class TbLwM2MObserveCallback extends TbLwM2MTargetedCallback<ObserveReque
35 31 @Override
36 32 public void onSuccess(ObserveRequest request, ObserveResponse response) {
37 33 super.onSuccess(request, response);
38   - handler.onUpdateValueAfterReadResponse(client.getRegistration(), targetId, response, null);
  34 + handler.onUpdateValueAfterReadResponse(client.getRegistration(), versionedId, response, null);
39 35 }
40 36 }
... ...
... ... @@ -31,7 +31,7 @@ public class TbLwM2MReadCallback extends TbLwM2MTargetedCallback<ReadRequest, Re
31 31 @Override
32 32 public void onSuccess(ReadRequest request, ReadResponse response) {
33 33 super.onSuccess(request, response);
34   - handler.onUpdateValueAfterReadResponse(client.getRegistration(), targetId, response, null);
  34 + handler.onUpdateValueAfterReadResponse(client.getRegistration(), versionedId, response, null);
35 35 }
36 36
37 37 }
... ...
... ... @@ -16,8 +16,6 @@
16 16 package org.thingsboard.server.transport.lwm2m.server.downlink;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19   -import org.eclipse.leshan.core.request.ReadRequest;
20   -import org.eclipse.leshan.core.response.ReadResponse;
21 19 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
22 20 import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
23 21
... ... @@ -26,19 +24,19 @@ import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.L
26 24 @Slf4j
27 25 public abstract class TbLwM2MTargetedCallback<R, T> extends AbstractTbLwM2MRequestCallback<R, T> {
28 26
29   - protected final String targetId;
  27 + protected final String versionedId;
30 28
31   - public TbLwM2MTargetedCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String targetId) {
  29 + public TbLwM2MTargetedCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String versionedId) {
32 30 super(handler, client);
33   - this.targetId = targetId;
  31 + this.versionedId = versionedId;
34 32 }
35 33
36 34 @Override
37 35 public void onSuccess(R request, T response) {
38 36 //TODO convert camelCase to "camel case" using .split("(?<!(^|[A-Z]))(?=[A-Z])|(?<!^)(?=[A-Z][a-z])")
39 37 String requestName = request.getClass().getSimpleName();
40   - log.trace("[{}] {} [{}] successful: {}", client.getEndpoint(), requestName, targetId, response);
41   - handler.logToTelemetry(client, String.format("[%s]: %s [%s] successful. Result: [%s]", LOG_LWM2M_INFO, requestName, targetId, response));
  38 + log.trace("[{}] {} [{}] successful: {}", client.getEndpoint(), requestName, versionedId, response);
  39 + handler.logToTelemetry(client, String.format("[%s]: %s [%s] successful. Result: [%s]", LOG_LWM2M_INFO, requestName, versionedId, response));
42 40 }
43 41
44 42 }
... ...
... ... @@ -17,21 +17,13 @@ package org.thingsboard.server.transport.lwm2m.server.downlink;
17 17
18 18 import org.eclipse.leshan.core.request.WriteAttributesRequest;
19 19 import org.eclipse.leshan.core.response.WriteAttributesResponse;
20   -import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
21 20 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
  21 +import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
22 22
23   -public class TbLwM2MWriteAttributesCallback extends AbstractTbLwM2MRequestCallback<WriteAttributesRequest, WriteAttributesResponse> {
24   -
25   - private final String targetId;
  23 +public class TbLwM2MWriteAttributesCallback extends TbLwM2MTargetedCallback<WriteAttributesRequest, WriteAttributesResponse> {
26 24
27 25 public TbLwM2MWriteAttributesCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String targetId) {
28   - super(handler, client);
29   - this.targetId = targetId;
30   - }
31   -
32   - @Override
33   - public void onSuccess(WriteAttributesRequest request, WriteAttributesResponse response) {
34   - //TODO: separate callback wrapper for the RPC calls.
  26 + super(handler, client, targetId);
35 27 }
36 28
37 29 }
... ...
... ... @@ -15,25 +15,21 @@
15 15 */
16 16 package org.thingsboard.server.transport.lwm2m.server.downlink;
17 17
18   -import lombok.Setter;
19 18 import org.eclipse.leshan.core.request.WriteRequest;
20 19 import org.eclipse.leshan.core.response.WriteResponse;
21   -import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
22 20 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
  21 +import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
23 22
24   -public class TbLwM2MWriteResponseCallback extends AbstractTbLwM2MRequestCallback<WriteRequest, WriteResponse> {
25   -
26   - private final String targetId;
  23 +public class TbLwM2MWriteResponseCallback extends TbLwM2MTargetedCallback<WriteRequest, WriteResponse> {
27 24
28 25 public TbLwM2MWriteResponseCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String targetId) {
29   - super(handler, client);
30   - this.targetId = targetId;
  26 + super(handler, client, targetId);
31 27 }
32 28
33 29 @Override
34 30 public void onSuccess(WriteRequest request, WriteResponse response) {
35   - handler.onWriteResponseOk(client, targetId, request);
36   - //TODO: separate callback wrapper for the RPC calls.
  31 + super.onSuccess(request, response);
  32 + handler.onWriteResponseOk(client, versionedId, request);
37 33 }
38 34
39 35 }
... ...
... ... @@ -201,7 +201,11 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler {
201 201 try {
202 202 log.warn("[{}] [{{}] Client: create after Registration", registration.getEndpoint(), registration.getId());
203 203 if (lwM2MClient != null) {
204   - this.clientContext.register(lwM2MClient, registration);
  204 + Optional<SessionInfoProto> oldSessionInfo = this.clientContext.register(lwM2MClient, registration);
  205 + if (oldSessionInfo.isPresent()) {
  206 + log.info("[{}] Closing old session: {}", registration.getEndpoint(), new UUID(oldSessionInfo.get().getSessionIdMSB(), oldSessionInfo.get().getSessionIdLSB()));
  207 + closeSession(oldSessionInfo.get());
  208 + }
205 209 this.logToTelemetry(lwM2MClient, LOG_LWM2M_INFO + ": Client registered with registration id: " + registration.getId());
206 210 SessionInfoProto sessionInfo = lwM2MClient.getSession();
207 211 transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, rpcHandler, sessionInfo));
... ... @@ -278,8 +282,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler {
278 282 clientContext.unregister(client, registration);
279 283 SessionInfoProto sessionInfo = client.getSession();
280 284 if (sessionInfo != null) {
281   - transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.CLOSED), null);
282   - transportService.deregisterSession(sessionInfo);
  285 + closeSession(sessionInfo);
283 286 sessionStore.remove(registration.getEndpoint());
284 287 log.info("Client close session: [{}] unReg [{}] name [{}] profile ", registration.getId(), registration.getEndpoint(), sessionInfo.getDeviceType());
285 288 } else {
... ... @@ -294,6 +297,11 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler {
294 297 });
295 298 }
296 299
  300 + public void closeSession(SessionInfoProto sessionInfo) {
  301 + transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.CLOSED), null);
  302 + transportService.deregisterSession(sessionInfo);
  303 + }
  304 +
297 305 @Override
298 306 public void onSleepingDev(Registration registration) {
299 307 log.info("[{}] [{}] Received endpoint Sleeping version event", registration.getId(), registration.getEndpoint());
... ... @@ -447,8 +455,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler {
447 455 */
448 456 @Override
449 457 public void doDisconnect(SessionInfoProto sessionInfo) {
450   - transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.CLOSED), null);
451   - transportService.deregisterSession(sessionInfo);
  458 + closeSession(sessionInfo);
452 459 }
453 460
454 461 /**
... ... @@ -613,7 +620,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler {
613 620 * #3 If fr_update -> UpdateFirmware
614 621 * #4 updateAttrTelemetry
615 622 *
616   - * @param lwM2MClient - Registration LwM2M Client
  623 + * @param lwM2MClient - Registration LwM2M Client
617 624 * @param lwM2mResource - LwM2mSingleResource response.getContent()
618 625 * @param path - resource
619 626 */
... ... @@ -811,13 +818,6 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler {
811 818 return lwm2mResourceValue;
812 819 }
813 820
814   - /**
815   - * Update resource (attribute) value on thingsboard after update value in client
816   - *
817   - * @param registration -
818   - * @param path -
819   - * @param request -
820   - */
821 821 @Override
822 822 public void onWriteResponseOk(LwM2mClient client, String path, WriteRequest request) {
823 823 if (request.getNode() instanceof LwM2mResource) {
... ... @@ -982,7 +982,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler {
982 982 * Get path to resource from profile equal keyName
983 983 *
984 984 * @param sessionInfo -
985   - * @param keyName -
  985 + * @param keyName -
986 986 * @return -
987 987 */
988 988 @Override
... ...