Commit a26e68038ccfb9b0c60e45e9f399e90c2bb477e4

Authored by Andrew Shvayka
Committed by GitHub
2 parents c500be4e 4bf8ef18

Merge pull request #4815 from thingsboard/lwm2m-client-store

Lwm2m Client Store for Redis to survive server restarts.
Showing 28 changed files with 751 additions and 285 deletions
@@ -87,7 +87,7 @@ public class LwM2mTransportUtil { @@ -87,7 +87,7 @@ public class LwM2mTransportUtil {
87 87
88 public static final String LWM2M_VERSION_DEFAULT = "1.0"; 88 public static final String LWM2M_VERSION_DEFAULT = "1.0";
89 89
90 - public static final String LOG_LWM2M_TELEMETRY = "logLwm2m"; 90 + public static final String LOG_LWM2M_TELEMETRY = "transportLog";
91 public static final String LOG_LWM2M_INFO = "info"; 91 public static final String LOG_LWM2M_INFO = "info";
92 public static final String LOG_LWM2M_ERROR = "error"; 92 public static final String LOG_LWM2M_ERROR = "error";
93 public static final String LOG_LWM2M_WARN = "warn"; 93 public static final String LOG_LWM2M_WARN = "warn";
@@ -169,19 +169,6 @@ public class LwM2mTransportUtil { @@ -169,19 +169,6 @@ public class LwM2mTransportUtil {
169 return lwM2mOtaConvert; 169 return lwM2mOtaConvert;
170 } 170 }
171 171
172 - public static LwM2mNode getLvM2mNodeToObject(LwM2mNode content) {  
173 - if (content instanceof LwM2mObject) {  
174 - return (LwM2mObject) content;  
175 - } else if (content instanceof LwM2mObjectInstance) {  
176 - return (LwM2mObjectInstance) content;  
177 - } else if (content instanceof LwM2mSingleResource) {  
178 - return (LwM2mSingleResource) content;  
179 - } else if (content instanceof LwM2mMultipleResource) {  
180 - return (LwM2mMultipleResource) content;  
181 - }  
182 - return null;  
183 - }  
184 -  
185 public static Lwm2mDeviceProfileTransportConfiguration toLwM2MClientProfile(DeviceProfile deviceProfile) { 172 public static Lwm2mDeviceProfileTransportConfiguration toLwM2MClientProfile(DeviceProfile deviceProfile) {
186 DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); 173 DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
187 if (transportConfiguration.getType().equals(DeviceTransportType.LWM2M)) { 174 if (transportConfiguration.getType().equals(DeviceTransportType.LWM2M)) {
@@ -196,62 +183,6 @@ public class LwM2mTransportUtil { @@ -196,62 +183,6 @@ public class LwM2mTransportUtil {
196 return toLwM2MClientProfile(deviceProfile).getBootstrap(); 183 return toLwM2MClientProfile(deviceProfile).getBootstrap();
197 } 184 }
198 185
199 - public static JsonObject validateJson(String jsonStr) {  
200 - JsonObject object = null;  
201 - if (jsonStr != null && !jsonStr.isEmpty()) {  
202 - String jsonValidFlesh = jsonStr.replaceAll("\\\\", "");  
203 - jsonValidFlesh = jsonValidFlesh.replaceAll("\n", "");  
204 - jsonValidFlesh = jsonValidFlesh.replaceAll("\t", "");  
205 - jsonValidFlesh = jsonValidFlesh.replaceAll(" ", "");  
206 - String jsonValid = (jsonValidFlesh.charAt(0) == '"' && jsonValidFlesh.charAt(jsonValidFlesh.length() - 1) == '"') ? jsonValidFlesh.substring(1, jsonValidFlesh.length() - 1) : jsonValidFlesh;  
207 - try {  
208 - object = new JsonParser().parse(jsonValid).getAsJsonObject();  
209 - } catch (JsonSyntaxException e) {  
210 - log.error("[{}] Fail validateJson [{}]", jsonStr, e.getMessage());  
211 - }  
212 - }  
213 - return object;  
214 - }  
215 -  
216 - @SuppressWarnings("unchecked")  
217 - public static <T> Optional<T> decode(byte[] byteArray) {  
218 - try {  
219 - FSTConfiguration config = FSTConfiguration.createDefaultConfiguration();  
220 - T msg = (T) config.asObject(byteArray);  
221 - return Optional.ofNullable(msg);  
222 - } catch (IllegalArgumentException e) {  
223 - log.error("Error during deserialization message, [{}]", e.getMessage());  
224 - return Optional.empty();  
225 - }  
226 - }  
227 -  
228 - public static String splitCamelCaseString(String s) {  
229 - LinkedList<String> linkedListOut = new LinkedList<>();  
230 - LinkedList<String> linkedList = new LinkedList<String>((Arrays.asList(s.split(" "))));  
231 - linkedList.forEach(str -> {  
232 - String strOut = str.replaceAll("\\W", "").replaceAll("_", "").toUpperCase();  
233 - if (strOut.length() > 1) linkedListOut.add(strOut.charAt(0) + strOut.substring(1).toLowerCase());  
234 - else linkedListOut.add(strOut);  
235 - });  
236 - linkedListOut.set(0, (linkedListOut.get(0).substring(0, 1).toLowerCase() + linkedListOut.get(0).substring(1)));  
237 - return StringUtils.join(linkedListOut, "");  
238 - }  
239 -  
240 - public static <T> TransportServiceCallback<Void> getAckCallback(LwM2mClient lwM2MClient,  
241 - int requestId, String typeTopic) {  
242 - return new TransportServiceCallback<Void>() {  
243 - @Override  
244 - public void onSuccess(Void dummy) {  
245 - log.trace("[{}] [{}] - requestId [{}] - EndPoint , Access AckCallback", typeTopic, requestId, lwM2MClient.getEndpoint());  
246 - }  
247 -  
248 - @Override  
249 - public void onError(Throwable e) {  
250 - log.trace("[{}] Failed to publish msg", e.toString());  
251 - }  
252 - };  
253 - }  
254 -  
255 public static String fromVersionedIdToObjectId(String pathIdVer) { 186 public static String fromVersionedIdToObjectId(String pathIdVer) {
256 try { 187 try {
257 if (pathIdVer == null) { 188 if (pathIdVer == null) {
@@ -181,6 +181,7 @@ public class DefaultLwM2MAttributesService implements LwM2MAttributesService { @@ -181,6 +181,7 @@ public class DefaultLwM2MAttributesService implements LwM2MAttributesService {
181 } 181 }
182 } 182 }
183 }); 183 });
  184 + clientContext.update(lwM2MClient);
184 // #2.1 185 // #2.1
185 lwM2MClient.getSharedAttributes().forEach((pathIdVer, tsKvProto) -> { 186 lwM2MClient.getSharedAttributes().forEach((pathIdVer, tsKvProto) -> {
186 this.pushUpdateToClientIfNeeded(lwM2MClient, this.getResourceValueFormatKv(lwM2MClient, pathIdVer), 187 this.pushUpdateToClientIfNeeded(lwM2MClient, this.getResourceValueFormatKv(lwM2MClient, pathIdVer),
@@ -29,7 +29,6 @@ import org.eclipse.leshan.core.node.codec.LwM2mValueConverter; @@ -29,7 +29,6 @@ import org.eclipse.leshan.core.node.codec.LwM2mValueConverter;
29 import org.eclipse.leshan.core.request.ContentFormat; 29 import org.eclipse.leshan.core.request.ContentFormat;
30 import org.eclipse.leshan.server.model.LwM2mModelProvider; 30 import org.eclipse.leshan.server.model.LwM2mModelProvider;
31 import org.eclipse.leshan.server.registration.Registration; 31 import org.eclipse.leshan.server.registration.Registration;
32 -import org.eclipse.leshan.server.security.SecurityInfo;  
33 import org.thingsboard.server.common.data.Device; 32 import org.thingsboard.server.common.data.Device;
34 import org.thingsboard.server.common.data.DeviceProfile; 33 import org.thingsboard.server.common.data.DeviceProfile;
35 import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration; 34 import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration;
@@ -38,16 +37,16 @@ import org.thingsboard.server.common.data.id.TenantId; @@ -38,16 +37,16 @@ import org.thingsboard.server.common.data.id.TenantId;
38 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; 37 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
39 import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; 38 import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
40 import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; 39 import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
41 -import org.thingsboard.server.transport.lwm2m.server.LwM2mQueuedRequest;  
42 40
  41 +import java.io.IOException;
  42 +import java.io.ObjectInputStream;
  43 +import java.io.Serializable;
43 import java.util.Collection; 44 import java.util.Collection;
44 import java.util.Map; 45 import java.util.Map;
45 import java.util.Optional; 46 import java.util.Optional;
46 -import java.util.Queue;  
47 import java.util.Set; 47 import java.util.Set;
48 import java.util.UUID; 48 import java.util.UUID;
49 import java.util.concurrent.ConcurrentHashMap; 49 import java.util.concurrent.ConcurrentHashMap;
50 -import java.util.concurrent.ConcurrentLinkedQueue;  
51 import java.util.concurrent.locks.Lock; 50 import java.util.concurrent.locks.Lock;
52 import java.util.concurrent.locks.ReentrantLock; 51 import java.util.concurrent.locks.ReentrantLock;
53 import java.util.stream.Collectors; 52 import java.util.stream.Collectors;
@@ -60,48 +59,38 @@ import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.f @@ -60,48 +59,38 @@ import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.f
60 import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.getVerFromPathIdVerOrId; 59 import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.getVerFromPathIdVerOrId;
61 60
62 @Slf4j 61 @Slf4j
63 -public class LwM2mClient implements Cloneable { 62 +public class LwM2mClient implements Serializable {
  63 +
  64 + private static final long serialVersionUID = 8793482946289222623L;
64 65
65 private final String nodeId; 66 private final String nodeId;
66 @Getter 67 @Getter
67 private final String endpoint; 68 private final String endpoint;
68 - private final Lock lock;  
69 - @Getter  
70 - @Setter  
71 - private LwM2MClientState state; 69 +
  70 + private transient Lock lock;
  71 +
72 @Getter 72 @Getter
73 private final Map<String, ResourceValue> resources; 73 private final Map<String, ResourceValue> resources;
74 @Getter 74 @Getter
75 private final Map<String, TsKvProto> sharedAttributes; 75 private final Map<String, TsKvProto> sharedAttributes;
76 - @Getter  
77 - private final Queue<LwM2mQueuedRequest> queuedRequests;  
78 76
79 @Getter 77 @Getter
80 - private String deviceName;  
81 - @Getter  
82 - private String deviceProfileName;  
83 -  
84 - @Getter  
85 - private PowerMode powerMode;  
86 -  
87 - @Getter  
88 - private String identity;  
89 - @Getter  
90 - private SecurityInfo securityInfo;  
91 - @Getter  
92 private TenantId tenantId; 78 private TenantId tenantId;
93 @Getter 79 @Getter
  80 + private UUID profileId;
  81 + @Getter
94 private UUID deviceId; 82 private UUID deviceId;
95 @Getter 83 @Getter
  84 + @Setter
  85 + private LwM2MClientState state;
  86 + @Getter
96 private SessionInfoProto session; 87 private SessionInfoProto session;
97 @Getter 88 @Getter
98 - private UUID profileId; 89 + private PowerMode powerMode;
99 @Getter 90 @Getter
100 @Setter 91 @Setter
101 private Registration registration; 92 private Registration registration;
102 93
103 - private ValidateDeviceCredentialsResponse credentials;  
104 -  
105 public Object clone() throws CloneNotSupportedException { 94 public Object clone() throws CloneNotSupportedException {
106 return super.clone(); 95 return super.clone();
107 } 96 }
@@ -109,23 +98,17 @@ public class LwM2mClient implements Cloneable { @@ -109,23 +98,17 @@ public class LwM2mClient implements Cloneable {
109 public LwM2mClient(String nodeId, String endpoint) { 98 public LwM2mClient(String nodeId, String endpoint) {
110 this.nodeId = nodeId; 99 this.nodeId = nodeId;
111 this.endpoint = endpoint; 100 this.endpoint = endpoint;
112 - this.lock = new ReentrantLock();  
113 this.sharedAttributes = new ConcurrentHashMap<>(); 101 this.sharedAttributes = new ConcurrentHashMap<>();
114 this.resources = new ConcurrentHashMap<>(); 102 this.resources = new ConcurrentHashMap<>();
115 - this.queuedRequests = new ConcurrentLinkedQueue<>();  
116 this.state = LwM2MClientState.CREATED; 103 this.state = LwM2MClientState.CREATED;
  104 + this.lock = new ReentrantLock();
117 } 105 }
118 106
119 - public void init(String identity, SecurityInfo securityInfo, ValidateDeviceCredentialsResponse credentials, UUID sessionId) {  
120 - this.identity = identity;  
121 - this.securityInfo = securityInfo;  
122 - this.credentials = credentials; 107 + public void init(ValidateDeviceCredentialsResponse credentials, UUID sessionId) {
123 this.session = createSession(nodeId, sessionId, credentials); 108 this.session = createSession(nodeId, sessionId, credentials);
124 this.tenantId = new TenantId(new UUID(session.getTenantIdMSB(), session.getTenantIdLSB())); 109 this.tenantId = new TenantId(new UUID(session.getTenantIdMSB(), session.getTenantIdLSB()));
125 this.deviceId = new UUID(session.getDeviceIdMSB(), session.getDeviceIdLSB()); 110 this.deviceId = new UUID(session.getDeviceIdMSB(), session.getDeviceIdLSB());
126 this.profileId = new UUID(session.getDeviceProfileIdMSB(), session.getDeviceProfileIdLSB()); 111 this.profileId = new UUID(session.getDeviceProfileIdMSB(), session.getDeviceProfileIdLSB());
127 - this.deviceName = session.getDeviceName();  
128 - this.deviceProfileName = session.getDeviceType();  
129 this.powerMode = credentials.getDeviceInfo().getPowerMode(); 112 this.powerMode = credentials.getDeviceInfo().getPowerMode();
130 } 113 }
131 114
@@ -140,10 +123,9 @@ public class LwM2mClient implements Cloneable { @@ -140,10 +123,9 @@ public class LwM2mClient implements Cloneable {
140 public void onDeviceUpdate(Device device, Optional<DeviceProfile> deviceProfileOpt) { 123 public void onDeviceUpdate(Device device, Optional<DeviceProfile> deviceProfileOpt) {
141 SessionInfoProto.Builder builder = SessionInfoProto.newBuilder().mergeFrom(session); 124 SessionInfoProto.Builder builder = SessionInfoProto.newBuilder().mergeFrom(session);
142 this.deviceId = device.getUuidId(); 125 this.deviceId = device.getUuidId();
143 - this.deviceName = device.getName();  
144 builder.setDeviceIdMSB(deviceId.getMostSignificantBits()); 126 builder.setDeviceIdMSB(deviceId.getMostSignificantBits());
145 builder.setDeviceIdLSB(deviceId.getLeastSignificantBits()); 127 builder.setDeviceIdLSB(deviceId.getLeastSignificantBits());
146 - builder.setDeviceName(deviceName); 128 + builder.setDeviceName(device.getName());
147 deviceProfileOpt.ifPresent(deviceProfile -> updateSession(deviceProfile, builder)); 129 deviceProfileOpt.ifPresent(deviceProfile -> updateSession(deviceProfile, builder));
148 this.session = builder.build(); 130 this.session = builder.build();
149 this.powerMode = ((Lwm2mDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration()).getPowerMode(); 131 this.powerMode = ((Lwm2mDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration()).getPowerMode();
@@ -156,13 +138,22 @@ public class LwM2mClient implements Cloneable { @@ -156,13 +138,22 @@ public class LwM2mClient implements Cloneable {
156 } 138 }
157 139
158 private void updateSession(DeviceProfile deviceProfile, SessionInfoProto.Builder builder) { 140 private void updateSession(DeviceProfile deviceProfile, SessionInfoProto.Builder builder) {
159 - this.deviceProfileName = deviceProfile.getName();  
160 this.profileId = deviceProfile.getUuidId(); 141 this.profileId = deviceProfile.getUuidId();
161 builder.setDeviceProfileIdMSB(profileId.getMostSignificantBits()); 142 builder.setDeviceProfileIdMSB(profileId.getMostSignificantBits());
162 builder.setDeviceProfileIdLSB(profileId.getLeastSignificantBits()); 143 builder.setDeviceProfileIdLSB(profileId.getLeastSignificantBits());
163 - builder.setDeviceType(this.deviceProfileName); 144 + builder.setDeviceType(deviceProfile.getName());
164 } 145 }
165 146
  147 + public void refreshSessionId(String nodeId) {
  148 + UUID newId = UUID.randomUUID();
  149 + SessionInfoProto.Builder builder = SessionInfoProto.newBuilder().mergeFrom(session);
  150 + builder.setNodeId(nodeId);
  151 + builder.setSessionIdMSB(newId.getMostSignificantBits());
  152 + builder.setSessionIdLSB(newId.getLeastSignificantBits());
  153 + this.session = builder.build();
  154 + }
  155 +
  156 +
166 private SessionInfoProto createSession(String nodeId, UUID sessionId, ValidateDeviceCredentialsResponse msg) { 157 private SessionInfoProto createSession(String nodeId, UUID sessionId, ValidateDeviceCredentialsResponse msg) {
167 return SessionInfoProto.newBuilder() 158 return SessionInfoProto.newBuilder()
168 .setNodeId(nodeId) 159 .setNodeId(nodeId)
@@ -364,5 +355,10 @@ public class LwM2mClient implements Cloneable { @@ -364,5 +355,10 @@ public class LwM2mClient implements Cloneable {
364 } 355 }
365 } 356 }
366 357
  358 + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
  359 + in.defaultReadObject();
  360 + this.lock = new ReentrantLock();
  361 + }
  362 +
367 } 363 }
368 364
@@ -28,8 +28,6 @@ import java.util.UUID; @@ -28,8 +28,6 @@ import java.util.UUID;
28 28
29 public interface LwM2mClientContext { 29 public interface LwM2mClientContext {
30 30
31 - LwM2mClient getClientByRegistrationId(String registrationId);  
32 -  
33 LwM2mClient getClientByEndpoint(String endpoint); 31 LwM2mClient getClientByEndpoint(String endpoint);
34 32
35 LwM2mClient getClientBySessionInfo(TransportProtos.SessionInfoProto sessionInfo); 33 LwM2mClient getClientBySessionInfo(TransportProtos.SessionInfoProto sessionInfo);
@@ -53,10 +51,9 @@ public interface LwM2mClientContext { @@ -53,10 +51,9 @@ public interface LwM2mClientContext {
53 51
54 LwM2mClient getClientByDeviceId(UUID deviceId); 52 LwM2mClient getClientByDeviceId(UUID deviceId);
55 53
56 - String getObjectIdByKeyNameFromProfile(TransportProtos.SessionInfoProto sessionInfo, String keyName);  
57 -  
58 String getObjectIdByKeyNameFromProfile(LwM2mClient lwM2mClient, String keyName); 54 String getObjectIdByKeyNameFromProfile(LwM2mClient lwM2mClient, String keyName);
59 55
60 void registerClient(Registration registration, ValidateDeviceCredentialsResponse credentials); 56 void registerClient(Registration registration, ValidateDeviceCredentialsResponse credentials);
61 57
  58 + void update(LwM2mClient lwM2MClient);
62 } 59 }
@@ -24,6 +24,9 @@ import org.eclipse.leshan.server.registration.Registration; @@ -24,6 +24,9 @@ import org.eclipse.leshan.server.registration.Registration;
24 import org.springframework.stereotype.Service; 24 import org.springframework.stereotype.Service;
25 import org.thingsboard.server.common.data.DeviceProfile; 25 import org.thingsboard.server.common.data.DeviceProfile;
26 import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration; 26 import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration;
  27 +import org.thingsboard.server.common.data.id.DeviceProfileId;
  28 +import org.thingsboard.server.common.transport.TransportDeviceProfileCache;
  29 +import org.thingsboard.server.common.transport.TransportService;
27 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; 30 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
28 import org.thingsboard.server.gen.transport.TransportProtos; 31 import org.thingsboard.server.gen.transport.TransportProtos;
29 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; 32 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
@@ -31,6 +34,8 @@ import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; @@ -31,6 +34,8 @@ import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
31 import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo; 34 import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo;
32 import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext; 35 import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext;
33 import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil; 36 import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil;
  37 +import org.thingsboard.server.transport.lwm2m.server.session.LwM2MSessionManager;
  38 +import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MClientStore;
34 import org.thingsboard.server.transport.lwm2m.server.store.TbMainSecurityStore; 39 import org.thingsboard.server.transport.lwm2m.server.store.TbMainSecurityStore;
35 40
36 import java.util.Arrays; 41 import java.util.Arrays;
@@ -56,25 +61,50 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { @@ -56,25 +61,50 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
56 private final LwM2mTransportContext context; 61 private final LwM2mTransportContext context;
57 private final LwM2MTransportServerConfig config; 62 private final LwM2MTransportServerConfig config;
58 private final TbMainSecurityStore securityStore; 63 private final TbMainSecurityStore securityStore;
  64 + private final TbLwM2MClientStore clientStore;
  65 + private final LwM2MSessionManager sessionManager;
  66 + private final TransportDeviceProfileCache deviceProfileCache;
59 private final Map<String, LwM2mClient> lwM2mClientsByEndpoint = new ConcurrentHashMap<>(); 67 private final Map<String, LwM2mClient> lwM2mClientsByEndpoint = new ConcurrentHashMap<>();
60 private final Map<String, LwM2mClient> lwM2mClientsByRegistrationId = new ConcurrentHashMap<>(); 68 private final Map<String, LwM2mClient> lwM2mClientsByRegistrationId = new ConcurrentHashMap<>();
61 private final Map<UUID, Lwm2mDeviceProfileTransportConfiguration> profiles = new ConcurrentHashMap<>(); 69 private final Map<UUID, Lwm2mDeviceProfileTransportConfiguration> profiles = new ConcurrentHashMap<>();
62 70
63 @Override 71 @Override
64 public LwM2mClient getClientByEndpoint(String endpoint) { 72 public LwM2mClient getClientByEndpoint(String endpoint) {
65 - return lwM2mClientsByEndpoint.computeIfAbsent(endpoint, ep -> new LwM2mClient(context.getNodeId(), ep)); 73 + return lwM2mClientsByEndpoint.computeIfAbsent(endpoint, ep -> {
  74 + LwM2mClient client = clientStore.get(ep);
  75 + String nodeId = context.getNodeId();
  76 + if (client == null) {
  77 + log.info("[{}] initialized new client.", endpoint);
  78 + client = new LwM2mClient(nodeId, ep);
  79 + } else {
  80 + log.debug("[{}] fetched client from store: {}", endpoint, client);
  81 + boolean updated = false;
  82 + if (client.getRegistration() != null) {
  83 + lwM2mClientsByRegistrationId.put(client.getRegistration().getId(), client);
  84 + }
  85 + if (client.getSession() != null) {
  86 + client.refreshSessionId(nodeId);
  87 + sessionManager.register(client.getSession());
  88 + updated = true;
  89 + }
  90 + if (updated) {
  91 + clientStore.put(client);
  92 + }
  93 + }
  94 + return client;
  95 + });
66 } 96 }
67 97
68 @Override 98 @Override
69 - public Optional<TransportProtos.SessionInfoProto> register(LwM2mClient lwM2MClient, Registration registration) throws LwM2MClientStateException { 99 + public Optional<TransportProtos.SessionInfoProto> register(LwM2mClient client, Registration registration) throws LwM2MClientStateException {
70 TransportProtos.SessionInfoProto oldSession = null; 100 TransportProtos.SessionInfoProto oldSession = null;
71 - lwM2MClient.lock(); 101 + client.lock();
72 try { 102 try {
73 - if (LwM2MClientState.UNREGISTERED.equals(lwM2MClient.getState())) {  
74 - throw new LwM2MClientStateException(lwM2MClient.getState(), "Client is in invalid state."); 103 + if (LwM2MClientState.UNREGISTERED.equals(client.getState())) {
  104 + throw new LwM2MClientStateException(client.getState(), "Client is in invalid state.");
75 } 105 }
76 - oldSession = lwM2MClient.getSession();  
77 - TbLwM2MSecurityInfo securityInfo = securityStore.getTbLwM2MSecurityInfoByEndpoint(lwM2MClient.getEndpoint()); 106 + oldSession = client.getSession();
  107 + TbLwM2MSecurityInfo securityInfo = securityStore.getTbLwM2MSecurityInfoByEndpoint(client.getEndpoint());
78 if (securityInfo.getSecurityMode() != null) { 108 if (securityInfo.getSecurityMode() != null) {
79 if (SecurityMode.X509.equals(securityInfo.getSecurityMode())) { 109 if (SecurityMode.X509.equals(securityInfo.getSecurityMode())) {
80 securityStore.registerX509(registration.getEndpoint(), registration.getId()); 110 securityStore.registerX509(registration.getEndpoint(), registration.getId());
@@ -82,54 +112,57 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { @@ -82,54 +112,57 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
82 if (securityInfo.getDeviceProfile() != null) { 112 if (securityInfo.getDeviceProfile() != null) {
83 profileUpdate(securityInfo.getDeviceProfile()); 113 profileUpdate(securityInfo.getDeviceProfile());
84 if (securityInfo.getSecurityInfo() != null) { 114 if (securityInfo.getSecurityInfo() != null) {
85 - lwM2MClient.init(securityInfo.getSecurityInfo().getIdentity(), securityInfo.getSecurityInfo(), securityInfo.getMsg(), UUID.randomUUID()); 115 + client.init(securityInfo.getMsg(), UUID.randomUUID());
86 } else if (NO_SEC.equals(securityInfo.getSecurityMode())) { 116 } else if (NO_SEC.equals(securityInfo.getSecurityMode())) {
87 - lwM2MClient.init(null, null, securityInfo.getMsg(), UUID.randomUUID()); 117 + client.init(securityInfo.getMsg(), UUID.randomUUID());
88 } else { 118 } else {
89 - throw new RuntimeException(String.format("Registration failed: device %s not found.", lwM2MClient.getEndpoint())); 119 + throw new RuntimeException(String.format("Registration failed: device %s not found.", client.getEndpoint()));
90 } 120 }
91 } else { 121 } else {
92 - throw new RuntimeException(String.format("Registration failed: device %s not found.", lwM2MClient.getEndpoint())); 122 + throw new RuntimeException(String.format("Registration failed: device %s not found.", client.getEndpoint()));
93 } 123 }
94 } else { 124 } else {
95 - throw new RuntimeException(String.format("Registration failed: FORBIDDEN, endpointId: %s", lwM2MClient.getEndpoint())); 125 + throw new RuntimeException(String.format("Registration failed: FORBIDDEN, endpointId: %s", client.getEndpoint()));
96 } 126 }
97 - lwM2MClient.setRegistration(registration);  
98 - this.lwM2mClientsByRegistrationId.put(registration.getId(), lwM2MClient);  
99 - lwM2MClient.setState(LwM2MClientState.REGISTERED); 127 + client.setRegistration(registration);
  128 + this.lwM2mClientsByRegistrationId.put(registration.getId(), client);
  129 + client.setState(LwM2MClientState.REGISTERED);
  130 + clientStore.put(client);
100 } finally { 131 } finally {
101 - lwM2MClient.unlock(); 132 + client.unlock();
102 } 133 }
103 return Optional.ofNullable(oldSession); 134 return Optional.ofNullable(oldSession);
104 } 135 }
105 136
106 @Override 137 @Override
107 - public void updateRegistration(LwM2mClient lwM2MClient, Registration registration) throws LwM2MClientStateException {  
108 - lwM2MClient.lock(); 138 + public void updateRegistration(LwM2mClient client, Registration registration) throws LwM2MClientStateException {
  139 + client.lock();
109 try { 140 try {
110 - if (!LwM2MClientState.REGISTERED.equals(lwM2MClient.getState())) {  
111 - throw new LwM2MClientStateException(lwM2MClient.getState(), "Client is in invalid state."); 141 + if (!LwM2MClientState.REGISTERED.equals(client.getState())) {
  142 + throw new LwM2MClientStateException(client.getState(), "Client is in invalid state.");
112 } 143 }
113 - lwM2MClient.setRegistration(registration); 144 + client.setRegistration(registration);
  145 + clientStore.put(client);
114 } finally { 146 } finally {
115 - lwM2MClient.unlock(); 147 + client.unlock();
116 } 148 }
117 } 149 }
118 150
119 @Override 151 @Override
120 - public void unregister(LwM2mClient lwM2MClient, Registration registration) throws LwM2MClientStateException {  
121 - lwM2MClient.lock(); 152 + public void unregister(LwM2mClient client, Registration registration) throws LwM2MClientStateException {
  153 + client.lock();
122 try { 154 try {
123 - if (!LwM2MClientState.REGISTERED.equals(lwM2MClient.getState())) {  
124 - throw new LwM2MClientStateException(lwM2MClient.getState(), "Client is in invalid state."); 155 + if (!LwM2MClientState.REGISTERED.equals(client.getState())) {
  156 + throw new LwM2MClientStateException(client.getState(), "Client is in invalid state.");
125 } 157 }
126 lwM2mClientsByRegistrationId.remove(registration.getId()); 158 lwM2mClientsByRegistrationId.remove(registration.getId());
127 - Registration currentRegistration = lwM2MClient.getRegistration(); 159 + Registration currentRegistration = client.getRegistration();
128 if (currentRegistration.getId().equals(registration.getId())) { 160 if (currentRegistration.getId().equals(registration.getId())) {
129 - lwM2MClient.setState(LwM2MClientState.UNREGISTERED);  
130 - lwM2mClientsByEndpoint.remove(lwM2MClient.getEndpoint());  
131 - this.securityStore.remove(lwM2MClient.getEndpoint(), registration.getId());  
132 - UUID profileId = lwM2MClient.getProfileId(); 161 + client.setState(LwM2MClientState.UNREGISTERED);
  162 + lwM2mClientsByEndpoint.remove(client.getEndpoint());
  163 + this.securityStore.remove(client.getEndpoint(), registration.getId());
  164 + clientStore.remove(client.getEndpoint());
  165 + UUID profileId = client.getProfileId();
133 if (profileId != null) { 166 if (profileId != null) {
134 Optional<LwM2mClient> otherClients = lwM2mClientsByRegistrationId.values().stream().filter(e -> e.getProfileId().equals(profileId)).findFirst(); 167 Optional<LwM2mClient> otherClients = lwM2mClientsByRegistrationId.values().stream().filter(e -> e.getProfileId().equals(profileId)).findFirst();
135 if (otherClients.isEmpty()) { 168 if (otherClients.isEmpty()) {
@@ -137,24 +170,19 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { @@ -137,24 +170,19 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
137 } 170 }
138 } 171 }
139 } else { 172 } else {
140 - throw new LwM2MClientStateException(lwM2MClient.getState(), "Client has different registration."); 173 + throw new LwM2MClientStateException(client.getState(), "Client has different registration.");
141 } 174 }
142 } finally { 175 } finally {
143 - lwM2MClient.unlock(); 176 + client.unlock();
144 } 177 }
145 } 178 }
146 179
147 @Override 180 @Override
148 - public LwM2mClient getClientByRegistrationId(String registrationId) {  
149 - return lwM2mClientsByRegistrationId.get(registrationId);  
150 - }  
151 -  
152 - @Override  
153 public LwM2mClient getClientBySessionInfo(TransportProtos.SessionInfoProto sessionInfo) { 181 public LwM2mClient getClientBySessionInfo(TransportProtos.SessionInfoProto sessionInfo) {
154 LwM2mClient lwM2mClient = null; 182 LwM2mClient lwM2mClient = null;
  183 + UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
155 Predicate<LwM2mClient> isClientFilter = c -> 184 Predicate<LwM2mClient> isClientFilter = c ->
156 - (new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()))  
157 - .equals((new UUID(c.getSession().getSessionIdMSB(), c.getSession().getSessionIdLSB()))); 185 + sessionId.equals((new UUID(c.getSession().getSessionIdMSB(), c.getSession().getSessionIdLSB())));
158 if (this.lwM2mClientsByEndpoint.size() > 0) { 186 if (this.lwM2mClientsByEndpoint.size() > 0) {
159 lwM2mClient = this.lwM2mClientsByEndpoint.values().stream().filter(isClientFilter).findAny().orElse(null); 187 lwM2mClient = this.lwM2mClientsByEndpoint.values().stream().filter(isClientFilter).findAny().orElse(null);
160 } 188 }
@@ -162,31 +190,17 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { @@ -162,31 +190,17 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
162 lwM2mClient = this.lwM2mClientsByRegistrationId.values().stream().filter(isClientFilter).findAny().orElse(null); 190 lwM2mClient = this.lwM2mClientsByRegistrationId.values().stream().filter(isClientFilter).findAny().orElse(null);
163 } 191 }
164 if (lwM2mClient == null) { 192 if (lwM2mClient == null) {
165 - log.warn("Device TimeOut? lwM2mClient is null.");  
166 - log.warn("SessionInfo input [{}], lwM2mClientsByEndpoint size: [{}] lwM2mClientsByRegistrationId: [{}]", sessionInfo, lwM2mClientsByEndpoint.values(), lwM2mClientsByRegistrationId.values());  
167 - log.error("", new RuntimeException()); 193 + log.error("[{}] Failed to lookup client by session id.", sessionId);
168 } 194 }
169 return lwM2mClient; 195 return lwM2mClient;
170 } 196 }
171 197
172 - /**  
173 - * Get path to resource from profile equal keyName  
174 - *  
175 - * @param sessionInfo -  
176 - * @param keyName -  
177 - * @return -  
178 - */  
179 - @Override  
180 - public String getObjectIdByKeyNameFromProfile(TransportProtos.SessionInfoProto sessionInfo, String keyName) {  
181 - return getObjectIdByKeyNameFromProfile(getClientBySessionInfo(sessionInfo), keyName);  
182 - }  
183 -  
184 @Override 198 @Override
185 - public String getObjectIdByKeyNameFromProfile(LwM2mClient lwM2mClient, String keyName) {  
186 - Lwm2mDeviceProfileTransportConfiguration profile = getProfile(lwM2mClient.getProfileId()); 199 + public String getObjectIdByKeyNameFromProfile(LwM2mClient client, String keyName) {
  200 + Lwm2mDeviceProfileTransportConfiguration profile = getProfile(client.getProfileId());
187 201
188 return profile.getObserveAttr().getKeyName().entrySet().stream() 202 return profile.getObserveAttr().getKeyName().entrySet().stream()
189 - .filter(e -> e.getValue().equals(keyName) && validateResourceInModel(lwM2mClient, e.getKey(), false)).findFirst().orElseThrow( 203 + .filter(e -> e.getValue().equals(keyName) && validateResourceInModel(client, e.getKey(), false)).findFirst().orElseThrow(
190 () -> new IllegalArgumentException(keyName + " is not configured in the device profile!") 204 () -> new IllegalArgumentException(keyName + " is not configured in the device profile!")
191 ).getKey(); 205 ).getKey();
192 } 206 }
@@ -198,31 +212,55 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { @@ -198,31 +212,55 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
198 @Override 212 @Override
199 public void registerClient(Registration registration, ValidateDeviceCredentialsResponse credentials) { 213 public void registerClient(Registration registration, ValidateDeviceCredentialsResponse credentials) {
200 LwM2mClient client = getClientByEndpoint(registration.getEndpoint()); 214 LwM2mClient client = getClientByEndpoint(registration.getEndpoint());
201 - client.init(null, null, credentials, UUID.randomUUID()); 215 + client.init(credentials, UUID.randomUUID());
202 lwM2mClientsByRegistrationId.put(registration.getId(), client); 216 lwM2mClientsByRegistrationId.put(registration.getId(), client);
203 profileUpdate(credentials.getDeviceProfile()); 217 profileUpdate(credentials.getDeviceProfile());
204 } 218 }
205 219
206 @Override 220 @Override
  221 + public void update(LwM2mClient client) {
  222 + client.lock();
  223 + try {
  224 + clientStore.put(client);
  225 + } finally {
  226 + client.unlock();
  227 + }
  228 + }
  229 +
  230 + @Override
207 public Collection<LwM2mClient> getLwM2mClients() { 231 public Collection<LwM2mClient> getLwM2mClients() {
208 return lwM2mClientsByEndpoint.values(); 232 return lwM2mClientsByEndpoint.values();
209 } 233 }
210 234
211 @Override 235 @Override
212 public Lwm2mDeviceProfileTransportConfiguration getProfile(UUID profileId) { 236 public Lwm2mDeviceProfileTransportConfiguration getProfile(UUID profileId) {
213 - return profiles.get(profileId); 237 + return doGetAndCache(profileId);
214 } 238 }
215 239
216 @Override 240 @Override
217 public Lwm2mDeviceProfileTransportConfiguration getProfile(Registration registration) { 241 public Lwm2mDeviceProfileTransportConfiguration getProfile(Registration registration) {
218 - return profiles.get(getClientByEndpoint(registration.getEndpoint()).getProfileId()); 242 + UUID profileId = getClientByEndpoint(registration.getEndpoint()).getProfileId();
  243 + Lwm2mDeviceProfileTransportConfiguration result = doGetAndCache(profileId);
  244 + if (result == null) {
  245 + log.debug("[{}] Fetching profile [{}]", registration.getEndpoint(), profileId);
  246 + DeviceProfile deviceProfile = deviceProfileCache.get(new DeviceProfileId(profileId));
  247 + if (deviceProfile != null) {
  248 + profileUpdate(deviceProfile);
  249 + result = doGetAndCache(profileId);
  250 + }
  251 + }
  252 + return result;
  253 + }
  254 +
  255 + private Lwm2mDeviceProfileTransportConfiguration doGetAndCache(UUID profileId) {
  256 + return profiles.get(profileId);
219 } 257 }
220 258
221 @Override 259 @Override
222 public Lwm2mDeviceProfileTransportConfiguration profileUpdate(DeviceProfile deviceProfile) { 260 public Lwm2mDeviceProfileTransportConfiguration profileUpdate(DeviceProfile deviceProfile) {
223 - Lwm2mDeviceProfileTransportConfiguration lwM2MClientProfile = LwM2mTransportUtil.toLwM2MClientProfile(deviceProfile);  
224 - profiles.put(deviceProfile.getUuidId(), lwM2MClientProfile);  
225 - return lwM2MClientProfile; 261 + Lwm2mDeviceProfileTransportConfiguration clientProfile = LwM2mTransportUtil.toLwM2MClientProfile(deviceProfile);
  262 + profiles.put(deviceProfile.getUuidId(), clientProfile);
  263 + return clientProfile;
226 } 264 }
227 265
228 @Override 266 @Override
@@ -18,14 +18,46 @@ package org.thingsboard.server.transport.lwm2m.server.client; @@ -18,14 +18,46 @@ package org.thingsboard.server.transport.lwm2m.server.client;
18 import lombok.Data; 18 import lombok.Data;
19 import org.eclipse.leshan.core.model.ResourceModel; 19 import org.eclipse.leshan.core.model.ResourceModel;
20 import org.eclipse.leshan.core.node.LwM2mResource; 20 import org.eclipse.leshan.core.node.LwM2mResource;
  21 +import org.eclipse.leshan.core.node.LwM2mResourceInstance;
  22 +
  23 +import java.io.Serializable;
21 24
22 @Data 25 @Data
23 -public class ResourceValue {  
24 - private LwM2mResource lwM2mResource;  
25 - private ResourceModel resourceModel; 26 +public class ResourceValue implements Serializable {
  27 +
  28 + private static final long serialVersionUID = -228268906779089402L;
  29 +
  30 + private TbLwM2MResource lwM2mResource;
  31 + private TbResourceModel resourceModel;
26 32
27 public ResourceValue(LwM2mResource lwM2mResource, ResourceModel resourceModel) { 33 public ResourceValue(LwM2mResource lwM2mResource, ResourceModel resourceModel) {
28 - this.lwM2mResource = lwM2mResource;  
29 - this.resourceModel = resourceModel; 34 + this.lwM2mResource = toTbLwM2MResource(lwM2mResource);
  35 + this.resourceModel = toTbResourceModel(resourceModel);
  36 + }
  37 +
  38 + public void setLwM2mResource(LwM2mResource lwM2mResource) {
  39 + this.lwM2mResource = toTbLwM2MResource(lwM2mResource);
  40 + }
  41 +
  42 + public void setResourceModel(ResourceModel resourceModel) {
  43 + this.resourceModel = toTbResourceModel(resourceModel);
  44 + }
  45 +
  46 + private static TbLwM2MResource toTbLwM2MResource(LwM2mResource lwM2mResource) {
  47 + if (lwM2mResource.isMultiInstances()) {
  48 + TbLwM2MResourceInstance[] instances = (TbLwM2MResourceInstance[]) lwM2mResource.getInstances().values().stream().map(ResourceValue::toTbLwM2MResourceInstance).toArray();
  49 + return new TbLwM2MMultipleResource(lwM2mResource.getId(), lwM2mResource.getType(), instances);
  50 + } else {
  51 + return new TbLwM2MSingleResource(lwM2mResource.getId(), lwM2mResource.getValue(), lwM2mResource.getType());
  52 + }
  53 + }
  54 +
  55 + private static TbLwM2MResourceInstance toTbLwM2MResourceInstance(LwM2mResourceInstance instance) {
  56 + return new TbLwM2MResourceInstance(instance.getId(), instance.getValue(), instance.getType());
  57 + }
  58 +
  59 + private static TbResourceModel toTbResourceModel(ResourceModel resourceModel) {
  60 + return new TbResourceModel(resourceModel.id, resourceModel.name, resourceModel.operations, resourceModel.multiple,
  61 + resourceModel.mandatory, resourceModel.type, resourceModel.rangeEnumeration, resourceModel.units, resourceModel.description);
30 } 62 }
31 } 63 }
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m.server.client;
  17 +
  18 +import org.eclipse.leshan.core.model.ResourceModel;
  19 +import org.eclipse.leshan.core.node.LwM2mMultipleResource;
  20 +import org.eclipse.leshan.core.node.LwM2mResourceInstance;
  21 +
  22 +import java.io.Serializable;
  23 +import java.util.Collection;
  24 +
  25 +public class TbLwM2MMultipleResource extends LwM2mMultipleResource implements TbLwM2MResource, Serializable {
  26 +
  27 + private static final long serialVersionUID = 4658477128628087186L;
  28 +
  29 + public TbLwM2MMultipleResource(int id, ResourceModel.Type type, TbLwM2MResourceInstance... instances) {
  30 + super(id, type, instances);
  31 + }
  32 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m.server.client;
  17 +
  18 +import org.eclipse.leshan.core.node.LwM2mResource;
  19 +
  20 +public interface TbLwM2MResource extends LwM2mResource {
  21 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m.server.client;
  17 +
  18 +import org.eclipse.leshan.core.model.ResourceModel;
  19 +import org.eclipse.leshan.core.node.LwM2mResourceInstance;
  20 +
  21 +import java.io.Serializable;
  22 +
  23 +public class TbLwM2MResourceInstance extends LwM2mResourceInstance implements Serializable {
  24 +
  25 + private static final long serialVersionUID = -8322290426892538345L;
  26 +
  27 + protected TbLwM2MResourceInstance(int id, Object value, ResourceModel.Type type) {
  28 + super(id, value, type);
  29 + }
  30 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m.server.client;
  17 +
  18 +import org.eclipse.leshan.core.model.ResourceModel;
  19 +import org.eclipse.leshan.core.node.LwM2mSingleResource;
  20 +
  21 +import java.io.Serializable;
  22 +
  23 +public class TbLwM2MSingleResource extends LwM2mSingleResource implements TbLwM2MResource, Serializable {
  24 +
  25 + private static final long serialVersionUID = -878078368245340809L;
  26 +
  27 + public TbLwM2MSingleResource(int id, Object value, ResourceModel.Type type) {
  28 + super(id, value, type);
  29 + }
  30 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m.server.client;
  17 +
  18 +import org.eclipse.leshan.core.model.ResourceModel;
  19 +
  20 +import java.io.Serializable;
  21 +
  22 +public class TbResourceModel extends ResourceModel implements Serializable {
  23 +
  24 + private static final long serialVersionUID = -2082846558899793932L;
  25 +
  26 + public TbResourceModel(Integer id, String name, Operations operations, Boolean multiple, Boolean mandatory, Type type, String rangeEnumeration, String units, String description) {
  27 + super(id, name, operations, multiple, mandatory, type, rangeEnumeration, units, description);
  28 + }
  29 +}
@@ -31,18 +31,8 @@ import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.L @@ -31,18 +31,8 @@ import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.L
31 @RequiredArgsConstructor 31 @RequiredArgsConstructor
32 public class DefaultLwM2MTelemetryLogService implements LwM2MTelemetryLogService { 32 public class DefaultLwM2MTelemetryLogService implements LwM2MTelemetryLogService {
33 33
34 - private final LwM2mClientContext clientContext;  
35 private final LwM2mTransportServerHelper helper; 34 private final LwM2mTransportServerHelper helper;
36 35
37 - /**  
38 - * @param logMsg - text msg  
39 - * @param registrationId - Id of Registration LwM2M Client  
40 - */  
41 - @Override  
42 - public void log(String registrationId, String logMsg) {  
43 - log(clientContext.getClientByRegistrationId(registrationId), logMsg);  
44 - }  
45 -  
46 @Override 36 @Override
47 public void log(LwM2mClient client, String logMsg) { 37 public void log(LwM2mClient client, String logMsg) {
48 if (logMsg != null && client != null && client.getSession() != null) { 38 if (logMsg != null && client != null && client.getSession() != null) {
@@ -21,6 +21,4 @@ public interface LwM2MTelemetryLogService { @@ -21,6 +21,4 @@ public interface LwM2MTelemetryLogService {
21 21
22 void log(LwM2mClient client, String msg); 22 void log(LwM2mClient client, String msg);
23 23
24 - void log(String registrationId, String msg);  
25 -  
26 } 24 }
@@ -52,6 +52,7 @@ import org.thingsboard.server.transport.lwm2m.server.ota.firmware.FirmwareUpdate @@ -52,6 +52,7 @@ import org.thingsboard.server.transport.lwm2m.server.ota.firmware.FirmwareUpdate
52 import org.thingsboard.server.transport.lwm2m.server.ota.software.LwM2MSoftwareUpdateStrategy; 52 import org.thingsboard.server.transport.lwm2m.server.ota.software.LwM2MSoftwareUpdateStrategy;
53 import org.thingsboard.server.transport.lwm2m.server.ota.software.SoftwareUpdateResult; 53 import org.thingsboard.server.transport.lwm2m.server.ota.software.SoftwareUpdateResult;
54 import org.thingsboard.server.transport.lwm2m.server.ota.software.SoftwareUpdateState; 54 import org.thingsboard.server.transport.lwm2m.server.ota.software.SoftwareUpdateState;
  55 +import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MClientOtaInfoStore;
55 import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; 56 import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
56 57
57 import javax.annotation.PostConstruct; 58 import javax.annotation.PostConstruct;
@@ -123,6 +124,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl @@ -123,6 +124,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
123 private final OtaPackageDataCache otaPackageDataCache; 124 private final OtaPackageDataCache otaPackageDataCache;
124 private final LwM2MTelemetryLogService logService; 125 private final LwM2MTelemetryLogService logService;
125 private final LwM2mTransportServerHelper helper; 126 private final LwM2mTransportServerHelper helper;
  127 + private final TbLwM2MClientOtaInfoStore otaInfoStore;
126 128
127 @Autowired 129 @Autowired
128 @Lazy 130 @Lazy
@@ -174,6 +176,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl @@ -174,6 +176,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
174 }, throwable -> { 176 }, throwable -> {
175 if (fwInfo.isSupported()) { 177 if (fwInfo.isSupported()) {
176 fwInfo.setTargetFetchFailure(true); 178 fwInfo.setTargetFetchFailure(true);
  179 + update(fwInfo);
177 } 180 }
178 }, executor); 181 }, executor);
179 } 182 }
@@ -191,6 +194,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl @@ -191,6 +194,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
191 public void onTargetFirmwareUpdate(LwM2mClient client, String newFirmwareTitle, String newFirmwareVersion, Optional<String> newFirmwareUrl) { 194 public void onTargetFirmwareUpdate(LwM2mClient client, String newFirmwareTitle, String newFirmwareVersion, Optional<String> newFirmwareUrl) {
192 LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client); 195 LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client);
193 fwInfo.updateTarget(newFirmwareTitle, newFirmwareVersion, newFirmwareUrl); 196 fwInfo.updateTarget(newFirmwareTitle, newFirmwareVersion, newFirmwareUrl);
  197 + update(fwInfo);
194 startFirmwareUpdateIfNeeded(client, fwInfo); 198 startFirmwareUpdateIfNeeded(client, fwInfo);
195 } 199 }
196 200
@@ -202,7 +206,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl @@ -202,7 +206,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
202 } 206 }
203 207
204 @Override 208 @Override
205 - public void onCurrentFirmwareStrategyUpdate(LwM2mClient client, OtherConfiguration configuration) { 209 + public void onFirmwareStrategyUpdate(LwM2mClient client, OtherConfiguration configuration) {
206 log.debug("[{}] Current fw strategy: {}", client.getEndpoint(), configuration.getFwUpdateStrategy()); 210 log.debug("[{}] Current fw strategy: {}", client.getEndpoint(), configuration.getFwUpdateStrategy());
207 LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client); 211 LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client);
208 fwInfo.setFwStrategy(LwM2MFirmwareUpdateStrategy.fromStrategyFwByCode(configuration.getFwUpdateStrategy())); 212 fwInfo.setFwStrategy(LwM2MFirmwareUpdateStrategy.fromStrategyFwByCode(configuration.getFwUpdateStrategy()));
@@ -242,9 +246,10 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl @@ -242,9 +246,10 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
242 executeFwUpdate(client); 246 executeFwUpdate(client);
243 } 247 }
244 fwInfo.setUpdateState(state); 248 fwInfo.setUpdateState(state);
245 - Optional<OtaPackageUpdateStatus> status = this.toOtaPackageUpdateStatus(state); 249 + Optional<OtaPackageUpdateStatus> status = toOtaPackageUpdateStatus(state);
246 status.ifPresent(otaStatus -> sendStateUpdateToTelemetry(client, fwInfo, 250 status.ifPresent(otaStatus -> sendStateUpdateToTelemetry(client, fwInfo,
247 otaStatus, "Firmware Update State: " + state.name())); 251 otaStatus, "Firmware Update State: " + state.name()));
  252 + update(fwInfo);
248 } 253 }
249 254
250 @Override 255 @Override
@@ -252,15 +257,16 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl @@ -252,15 +257,16 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
252 log.debug("[{}] Current fw result: {}", client.getEndpoint(), code); 257 log.debug("[{}] Current fw result: {}", client.getEndpoint(), code);
253 LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client); 258 LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client);
254 FirmwareUpdateResult result = FirmwareUpdateResult.fromUpdateResultFwByCode(code.intValue()); 259 FirmwareUpdateResult result = FirmwareUpdateResult.fromUpdateResultFwByCode(code.intValue());
255 - Optional<OtaPackageUpdateStatus> status = this.toOtaPackageUpdateStatus(result); 260 + Optional<OtaPackageUpdateStatus> status = toOtaPackageUpdateStatus(result);
256 status.ifPresent(otaStatus -> sendStateUpdateToTelemetry(client, fwInfo, 261 status.ifPresent(otaStatus -> sendStateUpdateToTelemetry(client, fwInfo,
257 otaStatus, "Firmware Update Result: " + result.name())); 262 otaStatus, "Firmware Update Result: " + result.name()));
258 if (result.isAgain() && fwInfo.getRetryAttempts() <= 2) { 263 if (result.isAgain() && fwInfo.getRetryAttempts() <= 2) {
259 fwInfo.setRetryAttempts(fwInfo.getRetryAttempts() + 1); 264 fwInfo.setRetryAttempts(fwInfo.getRetryAttempts() + 1);
260 startFirmwareUpdateIfNeeded(client, fwInfo); 265 startFirmwareUpdateIfNeeded(client, fwInfo);
261 } else { 266 } else {
262 - fwInfo.setUpdateResult(result); 267 + fwInfo.update(result);
263 } 268 }
  269 + update(fwInfo);
264 } 270 }
265 271
266 @Override 272 @Override
@@ -378,23 +384,38 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl @@ -378,23 +384,38 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
378 } 384 }
379 385
380 public LwM2MClientOtaInfo getOrInitFwInfo(LwM2mClient client) { 386 public LwM2MClientOtaInfo getOrInitFwInfo(LwM2mClient client) {
381 - //TODO: fetch state from the cache or DB.  
382 return this.fwStates.computeIfAbsent(client.getEndpoint(), endpoint -> { 387 return this.fwStates.computeIfAbsent(client.getEndpoint(), endpoint -> {
383 - var profile = clientContext.getProfile(client.getProfileId());  
384 - return new LwM2MClientOtaInfo(endpoint, OtaPackageType.FIRMWARE, profile.getClientLwM2mSettings().getFwUpdateStrategy(),  
385 - profile.getClientLwM2mSettings().getFwUpdateResource()); 388 + LwM2MClientOtaInfo info = otaInfoStore.get(OtaPackageType.FIRMWARE, endpoint);
  389 + if (info == null) {
  390 + var profile = clientContext.getProfile(client.getProfileId());
  391 + info = new LwM2MClientOtaInfo(endpoint, OtaPackageType.FIRMWARE,
  392 + LwM2MFirmwareUpdateStrategy.fromStrategyFwByCode(profile.getClientLwM2mSettings().getFwUpdateStrategy()),
  393 + profile.getClientLwM2mSettings().getFwUpdateResource());
  394 + update(info);
  395 + }
  396 + return info;
386 }); 397 });
387 } 398 }
388 399
389 private LwM2MClientOtaInfo getOrInitSwInfo(LwM2mClient client) { 400 private LwM2MClientOtaInfo getOrInitSwInfo(LwM2mClient client) {
390 - //TODO: fetch state from the cache or DB.  
391 - return swStates.computeIfAbsent(client.getEndpoint(), endpoint -> {  
392 - var profile = clientContext.getProfile(client.getProfileId());  
393 - return new LwM2MClientOtaInfo(endpoint, OtaPackageType.SOFTWARE, profile.getClientLwM2mSettings().getSwUpdateStrategy(), profile.getClientLwM2mSettings().getSwUpdateResource()); 401 + return this.fwStates.computeIfAbsent(client.getEndpoint(), endpoint -> {
  402 + LwM2MClientOtaInfo info = otaInfoStore.get(OtaPackageType.SOFTWARE, endpoint);
  403 + if (info == null) {
  404 + var profile = clientContext.getProfile(client.getProfileId());
  405 + info = new LwM2MClientOtaInfo(endpoint, OtaPackageType.SOFTWARE,
  406 + LwM2MSoftwareUpdateStrategy.fromStrategySwByCode(profile.getClientLwM2mSettings().getFwUpdateStrategy()),
  407 + profile.getClientLwM2mSettings().getSwUpdateResource());
  408 + update(info);
  409 + }
  410 + return info;
394 }); 411 });
395 412
396 } 413 }
397 414
  415 + private void update(LwM2MClientOtaInfo info) {
  416 + otaInfoStore.put(info);
  417 + }
  418 +
398 private void sendStateUpdateToTelemetry(LwM2mClient client, LwM2MClientOtaInfo fwInfo, OtaPackageUpdateStatus status, String log) { 419 private void sendStateUpdateToTelemetry(LwM2mClient client, LwM2MClientOtaInfo fwInfo, OtaPackageUpdateStatus status, String log) {
399 List<TransportProtos.KeyValueProto> result = new ArrayList<>(); 420 List<TransportProtos.KeyValueProto> result = new ArrayList<>();
400 TransportProtos.KeyValueProto.Builder kvProto = TransportProtos.KeyValueProto.newBuilder().setKey(getAttributeKey(fwInfo.getType(), STATE)); 421 TransportProtos.KeyValueProto.Builder kvProto = TransportProtos.KeyValueProto.newBuilder().setKey(getAttributeKey(fwInfo.getType(), STATE));
@@ -15,7 +15,9 @@ @@ -15,7 +15,9 @@
15 */ 15 */
16 package org.thingsboard.server.transport.lwm2m.server.ota; 16 package org.thingsboard.server.transport.lwm2m.server.ota;
17 17
  18 +import com.fasterxml.jackson.annotation.JsonIgnore;
18 import lombok.Data; 19 import lombok.Data;
  20 +import lombok.NoArgsConstructor;
19 import org.thingsboard.server.common.data.StringUtils; 21 import org.thingsboard.server.common.data.StringUtils;
20 import org.thingsboard.server.common.data.ota.OtaPackageType; 22 import org.thingsboard.server.common.data.ota.OtaPackageType;
21 import org.thingsboard.server.transport.lwm2m.server.ota.firmware.LwM2MFirmwareUpdateStrategy; 23 import org.thingsboard.server.transport.lwm2m.server.ota.firmware.LwM2MFirmwareUpdateStrategy;
@@ -26,10 +28,11 @@ import org.thingsboard.server.transport.lwm2m.server.ota.software.LwM2MSoftwareU @@ -26,10 +28,11 @@ import org.thingsboard.server.transport.lwm2m.server.ota.software.LwM2MSoftwareU
26 import java.util.Optional; 28 import java.util.Optional;
27 29
28 @Data 30 @Data
  31 +@NoArgsConstructor
29 public class LwM2MClientOtaInfo { 32 public class LwM2MClientOtaInfo {
30 33
31 - private final String endpoint;  
32 - private final OtaPackageType type; 34 + private String endpoint;
  35 + private OtaPackageType type;
33 36
34 private String baseUrl; 37 private String baseUrl;
35 38
@@ -53,10 +56,17 @@ public class LwM2MClientOtaInfo { @@ -53,10 +56,17 @@ public class LwM2MClientOtaInfo {
53 private String failedPackageId; 56 private String failedPackageId;
54 private int retryAttempts; 57 private int retryAttempts;
55 58
56 - public LwM2MClientOtaInfo(String endpoint, OtaPackageType type, Integer strategyCode, String baseUrl) { 59 + public LwM2MClientOtaInfo(String endpoint, OtaPackageType type, LwM2MFirmwareUpdateStrategy fwStrategy, String baseUrl) {
57 this.endpoint = endpoint; 60 this.endpoint = endpoint;
58 this.type = type; 61 this.type = type;
59 - this.fwStrategy = strategyCode != null ? LwM2MFirmwareUpdateStrategy.fromStrategyFwByCode(strategyCode) : LwM2MFirmwareUpdateStrategy.OBJ_5_BINARY; 62 + this.fwStrategy = fwStrategy;
  63 + this.baseUrl = baseUrl;
  64 + }
  65 +
  66 + public LwM2MClientOtaInfo(String endpoint, OtaPackageType type, LwM2MSoftwareUpdateStrategy swStrategy, String baseUrl) {
  67 + this.endpoint = endpoint;
  68 + this.type = type;
  69 + this.swStrategy = swStrategy;
60 this.baseUrl = baseUrl; 70 this.baseUrl = baseUrl;
61 } 71 }
62 72
@@ -66,6 +76,7 @@ public class LwM2MClientOtaInfo { @@ -66,6 +76,7 @@ public class LwM2MClientOtaInfo {
66 this.targetUrl = newFirmwareUrl.orElse(null); 76 this.targetUrl = newFirmwareUrl.orElse(null);
67 } 77 }
68 78
  79 + @JsonIgnore
69 public boolean isUpdateRequired() { 80 public boolean isUpdateRequired() {
70 if (StringUtils.isEmpty(targetName) || StringUtils.isEmpty(targetVersion) || !isSupported()) { 81 if (StringUtils.isEmpty(targetName) || StringUtils.isEmpty(targetVersion) || !isSupported()) {
71 return false; 82 return false;
@@ -86,11 +97,12 @@ public class LwM2MClientOtaInfo { @@ -86,11 +97,12 @@ public class LwM2MClientOtaInfo {
86 } 97 }
87 } 98 }
88 99
  100 + @JsonIgnore
89 public boolean isSupported() { 101 public boolean isSupported() {
90 return StringUtils.isNotEmpty(currentName) || StringUtils.isNotEmpty(currentVersion5) || StringUtils.isNotEmpty(currentVersion3); 102 return StringUtils.isNotEmpty(currentName) || StringUtils.isNotEmpty(currentVersion5) || StringUtils.isNotEmpty(currentVersion3);
91 } 103 }
92 104
93 - public void setUpdateResult(FirmwareUpdateResult updateResult) { 105 + public void update(FirmwareUpdateResult updateResult) {
94 this.updateResult = updateResult; 106 this.updateResult = updateResult;
95 switch (updateResult) { 107 switch (updateResult) {
96 case INITIAL: 108 case INITIAL:
@@ -32,7 +32,7 @@ public interface LwM2MOtaUpdateService { @@ -32,7 +32,7 @@ public interface LwM2MOtaUpdateService {
32 32
33 void onCurrentFirmwareNameUpdate(LwM2mClient client, String name); 33 void onCurrentFirmwareNameUpdate(LwM2mClient client, String name);
34 34
35 - void onCurrentFirmwareStrategyUpdate(LwM2mClient client, OtherConfiguration configuration); 35 + void onFirmwareStrategyUpdate(LwM2mClient client, OtherConfiguration configuration);
36 36
37 void onCurrentSoftwareStrategyUpdate(LwM2mClient client, OtherConfiguration configuration); 37 void onCurrentSoftwareStrategyUpdate(LwM2mClient client, OtherConfiguration configuration);
38 38
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m.server.session;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.context.annotation.Lazy;
  20 +import org.springframework.stereotype.Service;
  21 +import org.thingsboard.server.common.transport.TransportService;
  22 +import org.thingsboard.server.common.transport.service.DefaultTransportService;
  23 +import org.thingsboard.server.gen.transport.TransportProtos;
  24 +import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
  25 +import org.thingsboard.server.transport.lwm2m.server.LwM2mSessionMsgListener;
  26 +import org.thingsboard.server.transport.lwm2m.server.attributes.LwM2MAttributesService;
  27 +import org.thingsboard.server.transport.lwm2m.server.rpc.LwM2MRpcRequestHandler;
  28 +import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
  29 +
  30 +@Slf4j
  31 +@Service
  32 +@TbLwM2mTransportComponent
  33 +public class DefaultLwM2MSessionManager implements LwM2MSessionManager {
  34 +
  35 + private final TransportService transportService;
  36 + private final LwM2MAttributesService attributesService;
  37 + private final LwM2MRpcRequestHandler rpcHandler;
  38 + private final LwM2mUplinkMsgHandler uplinkHandler;
  39 +
  40 + public DefaultLwM2MSessionManager(TransportService transportService,
  41 + @Lazy LwM2MAttributesService attributesService,
  42 + @Lazy LwM2MRpcRequestHandler rpcHandler,
  43 + @Lazy LwM2mUplinkMsgHandler uplinkHandler) {
  44 + this.transportService = transportService;
  45 + this.attributesService = attributesService;
  46 + this.rpcHandler = rpcHandler;
  47 + this.uplinkHandler = uplinkHandler;
  48 + }
  49 +
  50 + @Override
  51 + public void register(TransportProtos.SessionInfoProto sessionInfo) {
  52 + transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(uplinkHandler, attributesService, rpcHandler, sessionInfo, transportService));
  53 + TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder()
  54 + .setSessionInfo(sessionInfo)
  55 + .setSessionEvent(DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN))
  56 + .setSubscribeToAttributes(TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setSessionType(TransportProtos.SessionType.ASYNC).build())
  57 + .setSubscribeToRPC(TransportProtos.SubscribeToRPCMsg.newBuilder().setSessionType(TransportProtos.SessionType.ASYNC).build())
  58 + .build();
  59 + transportService.process(msg, null);
  60 + }
  61 +
  62 + @Override
  63 + public void deregister(TransportProtos.SessionInfoProto sessionInfo) {
  64 + transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
  65 + transportService.deregisterSession(sessionInfo);
  66 + }
  67 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m.server.session;
  17 +
  18 +import org.thingsboard.server.gen.transport.TransportProtos;
  19 +
  20 +public interface LwM2MSessionManager {
  21 +
  22 + void register(TransportProtos.SessionInfoProto sessionInfo);
  23 +
  24 + void deregister(TransportProtos.SessionInfoProto sessionInfo);
  25 +
  26 +
  27 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m.server.store;
  17 +
  18 +import org.thingsboard.server.common.data.ota.OtaPackageType;
  19 +import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MClientOtaInfo;
  20 +
  21 +public class TbDummyLwM2MClientOtaInfoStore implements TbLwM2MClientOtaInfoStore {
  22 +
  23 + @Override
  24 + public LwM2MClientOtaInfo get(OtaPackageType type, String endpoint) {
  25 + return null;
  26 + }
  27 +
  28 + @Override
  29 + public void put(LwM2MClientOtaInfo info) {
  30 +
  31 + }
  32 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m.server.store;
  17 +
  18 +import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
  19 +
  20 +public class TbDummyLwM2MClientStore implements TbLwM2MClientStore {
  21 + @Override
  22 + public LwM2mClient get(String endpoint) {
  23 + return null;
  24 + }
  25 +
  26 + @Override
  27 + public void put(LwM2mClient client) {
  28 +
  29 + }
  30 +
  31 + @Override
  32 + public void remove(String endpoint) {
  33 +
  34 + }
  35 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m.server.store;
  17 +
  18 +import org.thingsboard.server.common.data.ota.OtaPackageType;
  19 +import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MClientOtaInfo;
  20 +
  21 +public interface TbLwM2MClientOtaInfoStore {
  22 +
  23 + LwM2MClientOtaInfo get(OtaPackageType type, String endpoint);
  24 +
  25 + void put(LwM2MClientOtaInfo info);
  26 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m.server.store;
  17 +
  18 +import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
  19 +
  20 +public interface TbLwM2MClientStore {
  21 +
  22 + LwM2mClient get(String endpoint);
  23 +
  24 + void put(LwM2mClient client);
  25 +
  26 + void remove(String endpoint);
  27 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m.server.store;
  17 +
  18 +import org.eclipse.leshan.server.security.NonUniqueSecurityInfoException;
  19 +import org.eclipse.leshan.server.security.SecurityInfo;
  20 +import org.nustaq.serialization.FSTConfiguration;
  21 +import org.springframework.data.redis.connection.RedisConnectionFactory;
  22 +import org.springframework.integration.redis.util.RedisLockRegistry;
  23 +import org.thingsboard.common.util.JacksonUtil;
  24 +import org.thingsboard.server.common.data.ota.OtaPackageType;
  25 +import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo;
  26 +import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MClientOtaInfo;
  27 +
  28 +import java.util.concurrent.locks.Lock;
  29 +
  30 +public class TbLwM2mRedisClientOtaInfoStore implements TbLwM2MClientOtaInfoStore {
  31 + private static final String OTA_EP = "OTA#EP#";
  32 +
  33 + private final RedisConnectionFactory connectionFactory;
  34 +
  35 + public TbLwM2mRedisClientOtaInfoStore(RedisConnectionFactory connectionFactory) {
  36 + this.connectionFactory = connectionFactory;
  37 + }
  38 +
  39 + @Override
  40 + public LwM2MClientOtaInfo get(OtaPackageType type, String endpoint) {
  41 + try (var connection = connectionFactory.getConnection()) {
  42 + byte[] data = connection.get((OTA_EP + type + endpoint).getBytes());
  43 + return JacksonUtil.fromBytes(data, LwM2MClientOtaInfo.class);
  44 + }
  45 + }
  46 +
  47 + @Override
  48 + public void put(LwM2MClientOtaInfo info) {
  49 + try (var connection = connectionFactory.getConnection()) {
  50 + connection.set((OTA_EP + info.getType() + info.getEndpoint()).getBytes(), JacksonUtil.toString(info).getBytes());
  51 + }
  52 + }
  53 +
  54 +}
@@ -20,6 +20,7 @@ import org.eclipse.leshan.server.californium.registration.InMemoryRegistrationSt @@ -20,6 +20,7 @@ import org.eclipse.leshan.server.californium.registration.InMemoryRegistrationSt
20 import org.springframework.beans.factory.annotation.Autowired; 20 import org.springframework.beans.factory.annotation.Autowired;
21 import org.springframework.beans.factory.annotation.Value; 21 import org.springframework.beans.factory.annotation.Value;
22 import org.springframework.context.annotation.Bean; 22 import org.springframework.context.annotation.Bean;
  23 +import org.springframework.data.redis.connection.RedisConnectionFactory;
23 import org.springframework.stereotype.Component; 24 import org.springframework.stereotype.Component;
24 import org.thingsboard.server.cache.TBRedisCacheConfiguration; 25 import org.thingsboard.server.cache.TBRedisCacheConfiguration;
25 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; 26 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
@@ -46,20 +47,37 @@ public class TbLwM2mStoreFactory { @@ -46,20 +47,37 @@ public class TbLwM2mStoreFactory {
46 47
47 @Bean 48 @Bean
48 private CaliforniumRegistrationStore registrationStore() { 49 private CaliforniumRegistrationStore registrationStore() {
49 - return redisConfiguration.isPresent() && useRedis ?  
50 - new TbLwM2mRedisRegistrationStore(redisConfiguration.get().redisConnectionFactory()) : new InMemoryRegistrationStore(config.getCleanPeriodInSec()); 50 + return isRedis() ?
  51 + new TbLwM2mRedisRegistrationStore(getConnectionFactory()) : new InMemoryRegistrationStore(config.getCleanPeriodInSec());
51 } 52 }
52 53
53 @Bean 54 @Bean
54 private TbMainSecurityStore securityStore() { 55 private TbMainSecurityStore securityStore() {
55 - return new TbLwM2mSecurityStore(redisConfiguration.isPresent() && useRedis ?  
56 - new TbLwM2mRedisSecurityStore(redisConfiguration.get().redisConnectionFactory()) : new TbInMemorySecurityStore(), validator); 56 + return new TbLwM2mSecurityStore(isRedis() ?
  57 + new TbLwM2mRedisSecurityStore(getConnectionFactory()) : new TbInMemorySecurityStore(), validator);
  58 + }
  59 +
  60 + @Bean
  61 + private TbLwM2MClientStore clientStore() {
  62 + return isRedis() ? new TbRedisLwM2MClientStore(getConnectionFactory()) : new TbDummyLwM2MClientStore();
  63 + }
  64 +
  65 + @Bean
  66 + private TbLwM2MClientOtaInfoStore otaStore() {
  67 + return isRedis() ? new TbLwM2mRedisClientOtaInfoStore(getConnectionFactory()) : new TbDummyLwM2MClientOtaInfoStore();
57 } 68 }
58 69
59 @Bean 70 @Bean
60 private TbLwM2MDtlsSessionStore sessionStore() { 71 private TbLwM2MDtlsSessionStore sessionStore() {
61 - return redisConfiguration.isPresent() && useRedis ?  
62 - new TbLwM2MDtlsSessionRedisStore(redisConfiguration.get().redisConnectionFactory()) : new TbL2M2MDtlsSessionInMemoryStore(); 72 + return isRedis() ? new TbLwM2MDtlsSessionRedisStore(getConnectionFactory()) : new TbL2M2MDtlsSessionInMemoryStore();
  73 + }
  74 +
  75 + private RedisConnectionFactory getConnectionFactory() {
  76 + return redisConfiguration.get().redisConnectionFactory();
  77 + }
  78 +
  79 + private boolean isRedis() {
  80 + return redisConfiguration.isPresent() && useRedis;
63 } 81 }
64 82
65 } 83 }
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m.server.store;
  17 +
  18 +import org.nustaq.serialization.FSTConfiguration;
  19 +import org.springframework.data.redis.connection.RedisConnectionFactory;
  20 +import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
  21 +
  22 +public class TbRedisLwM2MClientStore implements TbLwM2MClientStore {
  23 +
  24 + private static final String CLIENT_EP = "CLIENT#EP#";
  25 + private final RedisConnectionFactory connectionFactory;
  26 + private final FSTConfiguration serializer;
  27 +
  28 + public TbRedisLwM2MClientStore(RedisConnectionFactory redisConnectionFactory) {
  29 + this.connectionFactory = redisConnectionFactory;
  30 + this.serializer = FSTConfiguration.createDefaultConfiguration();
  31 + }
  32 +
  33 + @Override
  34 + public LwM2mClient get(String endpoint) {
  35 + try (var connection = connectionFactory.getConnection()) {
  36 + byte[] data = connection.get(getKey(endpoint));
  37 + if (data == null) {
  38 + return null;
  39 + } else {
  40 + return (LwM2mClient) serializer.asObject(data);
  41 + }
  42 + }
  43 + }
  44 +
  45 + @Override
  46 + public void put(LwM2mClient client) {
  47 + byte[] clientSerialized = serializer.asByteArray(client);
  48 + try (var connection = connectionFactory.getConnection()) {
  49 + connection.getSet(getKey(client.getEndpoint()), clientSerialized);
  50 + }
  51 + }
  52 +
  53 + @Override
  54 + public void remove(String endpoint) {
  55 + try (var connection = connectionFactory.getConnection()) {
  56 + connection.del(getKey(endpoint));
  57 + }
  58 + }
  59 +
  60 + private byte[] getKey(String endpoint) {
  61 + return (CLIENT_EP + endpoint).getBytes();
  62 + }
  63 +}
@@ -48,15 +48,11 @@ import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTrans @@ -48,15 +48,11 @@ import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTrans
48 import org.thingsboard.server.common.data.ota.OtaPackageUtil; 48 import org.thingsboard.server.common.data.ota.OtaPackageUtil;
49 import org.thingsboard.server.common.transport.TransportService; 49 import org.thingsboard.server.common.transport.TransportService;
50 import org.thingsboard.server.common.transport.TransportServiceCallback; 50 import org.thingsboard.server.common.transport.TransportServiceCallback;
51 -import org.thingsboard.server.common.transport.service.DefaultTransportService;  
52 import org.thingsboard.server.gen.transport.TransportProtos; 51 import org.thingsboard.server.gen.transport.TransportProtos;
53 -import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;  
54 import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; 52 import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
55 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; 53 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
56 import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; 54 import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
57 import org.thingsboard.server.transport.lwm2m.server.LwM2mOtaConvert; 55 import org.thingsboard.server.transport.lwm2m.server.LwM2mOtaConvert;
58 -import org.thingsboard.server.transport.lwm2m.server.LwM2mQueuedRequest;  
59 -import org.thingsboard.server.transport.lwm2m.server.LwM2mSessionMsgListener;  
60 import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext; 56 import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext;
61 import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportServerHelper; 57 import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportServerHelper;
62 import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil; 58 import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil;
@@ -84,6 +80,7 @@ import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttrib @@ -84,6 +80,7 @@ import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttrib
84 import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; 80 import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService;
85 import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MOtaUpdateService; 81 import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MOtaUpdateService;
86 import org.thingsboard.server.transport.lwm2m.server.rpc.LwM2MRpcRequestHandler; 82 import org.thingsboard.server.transport.lwm2m.server.rpc.LwM2MRpcRequestHandler;
  83 +import org.thingsboard.server.transport.lwm2m.server.session.LwM2MSessionManager;
87 import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MDtlsSessionStore; 84 import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MDtlsSessionStore;
88 import org.thingsboard.server.transport.lwm2m.utils.LwM2mValueConverterImpl; 85 import org.thingsboard.server.transport.lwm2m.utils.LwM2mValueConverterImpl;
89 86
@@ -129,6 +126,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -129,6 +126,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
129 private final TransportService transportService; 126 private final TransportService transportService;
130 private final LwM2mTransportContext context; 127 private final LwM2mTransportContext context;
131 private final LwM2MAttributesService attributesService; 128 private final LwM2MAttributesService attributesService;
  129 + private final LwM2MSessionManager sessionManager;
132 private final LwM2MOtaUpdateService otaService; 130 private final LwM2MOtaUpdateService otaService;
133 private final LwM2MTransportServerConfig config; 131 private final LwM2MTransportServerConfig config;
134 private final LwM2MTelemetryLogService logService; 132 private final LwM2MTelemetryLogService logService;
@@ -143,12 +141,14 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -143,12 +141,14 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
143 LwM2mTransportServerHelper helper, 141 LwM2mTransportServerHelper helper,
144 LwM2mClientContext clientContext, 142 LwM2mClientContext clientContext,
145 LwM2MTelemetryLogService logService, 143 LwM2MTelemetryLogService logService,
  144 + LwM2MSessionManager sessionManager,
146 @Lazy LwM2MOtaUpdateService otaService, 145 @Lazy LwM2MOtaUpdateService otaService,
147 @Lazy LwM2MAttributesService attributesService, 146 @Lazy LwM2MAttributesService attributesService,
148 @Lazy LwM2MRpcRequestHandler rpcHandler, 147 @Lazy LwM2MRpcRequestHandler rpcHandler,
149 @Lazy LwM2mDownlinkMsgHandler defaultLwM2MDownlinkMsgHandler, 148 @Lazy LwM2mDownlinkMsgHandler defaultLwM2MDownlinkMsgHandler,
150 LwM2mTransportContext context, TbLwM2MDtlsSessionStore sessionStore) { 149 LwM2mTransportContext context, TbLwM2MDtlsSessionStore sessionStore) {
151 this.transportService = transportService; 150 this.transportService = transportService;
  151 + this.sessionManager = sessionManager;
152 this.attributesService = attributesService; 152 this.attributesService = attributesService;
153 this.otaService = otaService; 153 this.otaService = otaService;
154 this.config = config; 154 this.config = config;
@@ -205,18 +205,10 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -205,18 +205,10 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
205 Optional<SessionInfoProto> oldSessionInfo = this.clientContext.register(lwM2MClient, registration); 205 Optional<SessionInfoProto> oldSessionInfo = this.clientContext.register(lwM2MClient, registration);
206 if (oldSessionInfo.isPresent()) { 206 if (oldSessionInfo.isPresent()) {
207 log.info("[{}] Closing old session: {}", registration.getEndpoint(), new UUID(oldSessionInfo.get().getSessionIdMSB(), oldSessionInfo.get().getSessionIdLSB())); 207 log.info("[{}] Closing old session: {}", registration.getEndpoint(), new UUID(oldSessionInfo.get().getSessionIdMSB(), oldSessionInfo.get().getSessionIdLSB()));
208 - closeSession(oldSessionInfo.get()); 208 + sessionManager.deregister(oldSessionInfo.get());
209 } 209 }
210 logService.log(lwM2MClient, LOG_LWM2M_INFO + ": Client registered with registration id: " + registration.getId()); 210 logService.log(lwM2MClient, LOG_LWM2M_INFO + ": Client registered with registration id: " + registration.getId());
211 - SessionInfoProto sessionInfo = lwM2MClient.getSession();  
212 - transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo, transportService));  
213 - TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder()  
214 - .setSessionInfo(sessionInfo)  
215 - .setSessionEvent(DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN))  
216 - .setSubscribeToAttributes(TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setSessionType(TransportProtos.SessionType.ASYNC).build())  
217 - .setSubscribeToRPC(TransportProtos.SubscribeToRPCMsg.newBuilder().setSessionType(TransportProtos.SessionType.ASYNC).build())  
218 - .build();  
219 - transportService.process(msg, null); 211 + sessionManager.register(lwM2MClient.getSession());
220 this.initClientTelemetry(lwM2MClient); 212 this.initClientTelemetry(lwM2MClient);
221 this.initAttributes(lwM2MClient); 213 this.initAttributes(lwM2MClient);
222 otaService.init(lwM2MClient); 214 otaService.init(lwM2MClient);
@@ -247,14 +239,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -247,14 +239,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
247 log.warn("[{}] [{{}] Client: update after Registration", registration.getEndpoint(), registration.getId()); 239 log.warn("[{}] [{{}] Client: update after Registration", registration.getEndpoint(), registration.getId());
248 logService.log(lwM2MClient, String.format("[%s][%s] Updated registration.", registration.getId(), registration.getSocketAddress())); 240 logService.log(lwM2MClient, String.format("[%s][%s] Updated registration.", registration.getId(), registration.getSocketAddress()));
249 clientContext.updateRegistration(lwM2MClient, registration); 241 clientContext.updateRegistration(lwM2MClient, registration);
250 - TransportProtos.SessionInfoProto sessionInfo = lwM2MClient.getSession();  
251 - this.reportActivityAndRegister(sessionInfo);  
252 - if (registration.usesQueueMode()) {  
253 - LwM2mQueuedRequest request;  
254 - while ((request = lwM2MClient.getQueuedRequests().poll()) != null) {  
255 - request.send();  
256 - }  
257 - } 242 + this.reportActivityAndRegister(lwM2MClient.getSession());
258 } catch (LwM2MClientStateException stateException) { 243 } catch (LwM2MClientStateException stateException) {
259 if (LwM2MClientState.REGISTERED.equals(stateException.getState())) { 244 if (LwM2MClientState.REGISTERED.equals(stateException.getState())) {
260 log.info("[{}] update registration failed because client has different registration id: [{}] {}.", registration.getEndpoint(), stateException.getState(), stateException.getMessage()); 245 log.info("[{}] update registration failed because client has different registration id: [{}] {}.", registration.getEndpoint(), stateException.getState(), stateException.getMessage());
@@ -280,7 +265,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -280,7 +265,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
280 clientContext.unregister(client, registration); 265 clientContext.unregister(client, registration);
281 SessionInfoProto sessionInfo = client.getSession(); 266 SessionInfoProto sessionInfo = client.getSession();
282 if (sessionInfo != null) { 267 if (sessionInfo != null) {
283 - closeSession(sessionInfo); 268 + sessionManager.deregister(sessionInfo);
284 sessionStore.remove(registration.getEndpoint()); 269 sessionStore.remove(registration.getEndpoint());
285 log.info("Client close session: [{}] unReg [{}] name [{}] profile ", registration.getId(), registration.getEndpoint(), sessionInfo.getDeviceType()); 270 log.info("Client close session: [{}] unReg [{}] name [{}] profile ", registration.getId(), registration.getEndpoint(), sessionInfo.getDeviceType());
286 } else { 271 } else {
@@ -295,11 +280,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -295,11 +280,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
295 }); 280 });
296 } 281 }
297 282
298 - public void closeSession(SessionInfoProto sessionInfo) {  
299 - transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.CLOSED), null);  
300 - transportService.deregisterSession(sessionInfo);  
301 - }  
302 -  
303 @Override 283 @Override
304 public void onSleepingDev(Registration registration) { 284 public void onSleepingDev(Registration registration) {
305 log.info("[{}] [{}] Received endpoint Sleeping version event", registration.getId(), registration.getEndpoint()); 285 log.info("[{}] [{}] Received endpoint Sleeping version event", registration.getId(), registration.getEndpoint());
@@ -307,19 +287,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -307,19 +287,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
307 //TODO: associate endpointId with device information. 287 //TODO: associate endpointId with device information.
308 } 288 }
309 289
310 -// /**  
311 -// * Cancel observation for All objects for this registration  
312 -// */  
313 -// @Override  
314 -// public void setCancelObservationsAll(Registration registration) {  
315 -// if (registration != null) {  
316 -// LwM2mClient client = clientContext.getClientByEndpoint(registration.getEndpoint());  
317 -// if (client != null && client.getRegistration() != null && client.getRegistration().getId().equals(registration.getId())) {  
318 -// defaultLwM2MDownlinkMsgHandler.sendCancelAllRequest(client, TbLwM2MCancelAllRequest.builder().build(), new TbLwM2MCancelAllObserveRequestCallback(this, client));  
319 -// }  
320 -// }  
321 -// }  
322 -  
323 /** 290 /**
324 * Sending observe value to thingsboard from ObservationListener.onResponse: object, instance, SingleResource or MultipleResource 291 * Sending observe value to thingsboard from ObservationListener.onResponse: object, instance, SingleResource or MultipleResource
325 * 292 *
@@ -344,6 +311,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -344,6 +311,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
344 this.updateResourcesValue(lwM2MClient, lwM2mResource, path); 311 this.updateResourcesValue(lwM2MClient, lwM2mResource, path);
345 } 312 }
346 } 313 }
  314 + clientContext.update(lwM2MClient);
347 } 315 }
348 } 316 }
349 317
@@ -391,16 +359,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -391,16 +359,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
391 } 359 }
392 360
393 /** 361 /**
394 - * Deregister session in transport  
395 - *  
396 - * @param sessionInfo - lwm2m client  
397 - */  
398 - @Override  
399 - public void doDisconnect(SessionInfoProto sessionInfo) {  
400 - closeSession(sessionInfo);  
401 - }  
402 -  
403 - /**  
404 * Those methods are called by the protocol stage thread pool, this means that execution MUST be done in a short delay, 362 * Those methods are called by the protocol stage thread pool, this means that execution MUST be done in a short delay,
405 * * if you need to do long time processing use a dedicated thread pool. 363 * * if you need to do long time processing use a dedicated thread pool.
406 * 364 *
@@ -494,14 +452,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -494,14 +452,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
494 attributesMap.forEach((targetId, params) -> sendWriteAttributesRequest(lwM2MClient, targetId, params)); 452 attributesMap.forEach((targetId, params) -> sendWriteAttributesRequest(lwM2MClient, targetId, params));
495 } 453 }
496 454
497 - private void sendDiscoverRequests(LwM2mClient lwM2MClient, Lwm2mDeviceProfileTransportConfiguration profile, Set<String> supportedObjects) {  
498 - Set<String> targetIds = profile.getObserveAttr().getAttributeLwm2m().keySet();  
499 - targetIds = targetIds.stream().filter(target -> isSupportedTargetId(supportedObjects, target)).collect(Collectors.toSet());  
500 -// TODO: why do we need to put observe into pending read requests?  
501 -// lwM2MClient.getPendingReadRequests().addAll(targetIds);  
502 - targetIds.forEach(targetId -> sendDiscoverRequest(lwM2MClient, targetId));  
503 - }  
504 -  
505 private void sendDiscoverRequest(LwM2mClient lwM2MClient, String targetId) { 455 private void sendDiscoverRequest(LwM2mClient lwM2MClient, String targetId) {
506 TbLwM2MDiscoverRequest request = TbLwM2MDiscoverRequest.builder().versionedId(targetId).timeout(this.config.getTimeout()).build(); 456 TbLwM2MDiscoverRequest request = TbLwM2MDiscoverRequest.builder().versionedId(targetId).timeout(this.config.getTimeout()).build();
507 defaultLwM2MDownlinkMsgHandler.sendDiscoverRequest(lwM2MClient, request, new TbLwM2MDiscoverCallback(logService, lwM2MClient, targetId)); 457 defaultLwM2MDownlinkMsgHandler.sendDiscoverRequest(lwM2MClient, request, new TbLwM2MDiscoverCallback(logService, lwM2MClient, targetId));
@@ -652,7 +602,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -652,7 +602,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
652 List<TransportProtos.KeyValueProto> resultAttributes = new ArrayList<>(); 602 List<TransportProtos.KeyValueProto> resultAttributes = new ArrayList<>();
653 profile.getObserveAttr().getAttribute().forEach(pathIdVer -> { 603 profile.getObserveAttr().getAttribute().forEach(pathIdVer -> {
654 if (path.contains(pathIdVer)) { 604 if (path.contains(pathIdVer)) {
655 - TransportProtos.KeyValueProto kvAttr = this.getKvToThingsboard(pathIdVer, registration); 605 + TransportProtos.KeyValueProto kvAttr = this.getKvToThingsBoard(pathIdVer, registration);
656 if (kvAttr != null) { 606 if (kvAttr != null) {
657 resultAttributes.add(kvAttr); 607 resultAttributes.add(kvAttr);
658 } 608 }
@@ -661,7 +611,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -661,7 +611,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
661 List<TransportProtos.KeyValueProto> resultTelemetries = new ArrayList<>(); 611 List<TransportProtos.KeyValueProto> resultTelemetries = new ArrayList<>();
662 profile.getObserveAttr().getTelemetry().forEach(pathIdVer -> { 612 profile.getObserveAttr().getTelemetry().forEach(pathIdVer -> {
663 if (path.contains(pathIdVer)) { 613 if (path.contains(pathIdVer)) {
664 - TransportProtos.KeyValueProto kvAttr = this.getKvToThingsboard(pathIdVer, registration); 614 + TransportProtos.KeyValueProto kvAttr = this.getKvToThingsBoard(pathIdVer, registration);
665 if (kvAttr != null) { 615 if (kvAttr != null) {
666 resultTelemetries.add(kvAttr); 616 resultTelemetries.add(kvAttr);
667 } 617 }
@@ -678,7 +628,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -678,7 +628,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
678 return null; 628 return null;
679 } 629 }
680 630
681 - private TransportProtos.KeyValueProto getKvToThingsboard(String pathIdVer, Registration registration) { 631 + private TransportProtos.KeyValueProto getKvToThingsBoard(String pathIdVer, Registration registration) {
682 LwM2mClient lwM2MClient = this.clientContext.getClientByEndpoint(registration.getEndpoint()); 632 LwM2mClient lwM2MClient = this.clientContext.getClientByEndpoint(registration.getEndpoint());
683 Map<String, String> names = clientContext.getProfile(lwM2MClient.getProfileId()).getObserveAttr().getKeyName(); 633 Map<String, String> names = clientContext.getProfile(lwM2MClient.getProfileId()).getObserveAttr().getKeyName();
684 if (names != null && names.containsKey(pathIdVer)) { 634 if (names != null && names.containsKey(pathIdVer)) {
@@ -725,10 +675,12 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -725,10 +675,12 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
725 public void onWriteResponseOk(LwM2mClient client, String path, WriteRequest request) { 675 public void onWriteResponseOk(LwM2mClient client, String path, WriteRequest request) {
726 if (request.getNode() instanceof LwM2mResource) { 676 if (request.getNode() instanceof LwM2mResource) {
727 this.updateResourcesValue(client, ((LwM2mResource) request.getNode()), path); 677 this.updateResourcesValue(client, ((LwM2mResource) request.getNode()), path);
  678 + clientContext.update(client);
728 } else if (request.getNode() instanceof LwM2mObjectInstance) { 679 } else if (request.getNode() instanceof LwM2mObjectInstance) {
729 ((LwM2mObjectInstance) request.getNode()).getResources().forEach((resId, resource) -> { 680 ((LwM2mObjectInstance) request.getNode()).getResources().forEach((resId, resource) -> {
730 this.updateResourcesValue(client, resource, path + "/" + resId); 681 this.updateResourcesValue(client, resource, path + "/" + resId);
731 }); 682 });
  683 + clientContext.update(client);
732 } 684 }
733 } 685 }
734 686
@@ -803,7 +755,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -803,7 +755,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
803 if (!newLwM2mSettings.getFwUpdateStrategy().equals(oldLwM2mSettings.getFwUpdateStrategy()) 755 if (!newLwM2mSettings.getFwUpdateStrategy().equals(oldLwM2mSettings.getFwUpdateStrategy())
804 || (StringUtils.isNotEmpty(newLwM2mSettings.getFwUpdateResource()) && 756 || (StringUtils.isNotEmpty(newLwM2mSettings.getFwUpdateResource()) &&
805 !newLwM2mSettings.getFwUpdateResource().equals(oldLwM2mSettings.getFwUpdateResource()))) { 757 !newLwM2mSettings.getFwUpdateResource().equals(oldLwM2mSettings.getFwUpdateResource()))) {
806 - clients.forEach(lwM2MClient -> otaService.onCurrentFirmwareStrategyUpdate(lwM2MClient, newLwM2mSettings)); 758 + clients.forEach(lwM2MClient -> otaService.onFirmwareStrategyUpdate(lwM2MClient, newLwM2mSettings));
807 } 759 }
808 760
809 if (!newLwM2mSettings.getSwUpdateStrategy().equals(oldLwM2mSettings.getSwUpdateStrategy()) 761 if (!newLwM2mSettings.getSwUpdateStrategy().equals(oldLwM2mSettings.getSwUpdateStrategy())
@@ -908,7 +860,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl @@ -908,7 +860,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
908 */ 860 */
909 private void reportActivityAndRegister(SessionInfoProto sessionInfo) { 861 private void reportActivityAndRegister(SessionInfoProto sessionInfo) {
910 if (sessionInfo != null && transportService.reportActivity(sessionInfo) == null) { 862 if (sessionInfo != null && transportService.reportActivity(sessionInfo) == null) {
911 - transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo, transportService)); 863 + sessionManager.register(sessionInfo);
912 this.reportActivitySubscription(sessionInfo); 864 this.reportActivitySubscription(sessionInfo);
913 } 865 }
914 } 866 }
@@ -48,8 +48,6 @@ public interface LwM2mUplinkMsgHandler { @@ -48,8 +48,6 @@ public interface LwM2mUplinkMsgHandler {
48 48
49 void onResourceDelete(Optional<TransportProtos.ResourceDeleteMsg> resourceDeleteMsgOpt); 49 void onResourceDelete(Optional<TransportProtos.ResourceDeleteMsg> resourceDeleteMsgOpt);
50 50
51 - void doDisconnect(TransportProtos.SessionInfoProto sessionInfo);  
52 -  
53 void onAwakeDev(Registration registration); 51 void onAwakeDev(Registration registration);
54 52
55 void onWriteResponseOk(LwM2mClient client, String path, WriteRequest request); 53 void onWriteResponseOk(LwM2mClient client, String path, WriteRequest request);
@@ -47,7 +47,7 @@ public class JacksonUtil { @@ -47,7 +47,7 @@ public class JacksonUtil {
47 throw new IllegalArgumentException("The given object value: " 47 throw new IllegalArgumentException("The given object value: "
48 + fromValue + " cannot be converted to " + toValueTypeRef, e); 48 + fromValue + " cannot be converted to " + toValueTypeRef, e);
49 } 49 }
50 - } 50 + }
51 51
52 public static <T> T fromString(String string, Class<T> clazz) { 52 public static <T> T fromString(String string, Class<T> clazz) {
53 try { 53 try {
@@ -67,6 +67,15 @@ public class JacksonUtil { @@ -67,6 +67,15 @@ public class JacksonUtil {
67 } 67 }
68 } 68 }
69 69
  70 + public static <T> T fromBytes(byte[] bytes, Class<T> clazz) {
  71 + try {
  72 + return bytes != null ? OBJECT_MAPPER.readValue(bytes, clazz) : null;
  73 + } catch (IOException e) {
  74 + throw new IllegalArgumentException("The given string value: "
  75 + + Arrays.toString(bytes) + " cannot be transformed to Json object", e);
  76 + }
  77 + }
  78 +
70 public static JsonNode fromBytes(byte[] bytes) { 79 public static JsonNode fromBytes(byte[] bytes) {
71 try { 80 try {
72 return OBJECT_MAPPER.readTree(bytes); 81 return OBJECT_MAPPER.readTree(bytes);
@@ -96,7 +105,7 @@ public class JacksonUtil { @@ -96,7 +105,7 @@ public class JacksonUtil {
96 } 105 }
97 } 106 }
98 107
99 - public static ObjectNode newObjectNode(){ 108 + public static ObjectNode newObjectNode() {
100 return OBJECT_MAPPER.createObjectNode(); 109 return OBJECT_MAPPER.createObjectNode();
101 } 110 }
102 111