Commit 8022cfb15537e658bb77cb6e1ab6dc93d89283e2

Authored by nickAS21
Committed by Andrew Shvayka
1 parent ade09738

Lwm2m: back: fix bug delay operation

... ... @@ -96,7 +96,7 @@ public class LwM2MTransportRequest {
96 96 this.converter = LwM2mValueConverterImpl.getInstance();
97 97 executorResponse = Executors.newFixedThreadPool(this.context.getCtxServer().getRequestPoolSize(),
98 98 new NamedThreadFactory(String.format("LwM2M %s channel response", RESPONSE_CHANNEL)));
99   - executorResponseError = Executors.newFixedThreadPool(this.context.getCtxServer().getRequestErrorPoolSize(),
  99 + executorResponseError = Executors.newFixedThreadPool(this.context.getCtxServer().getRequestErrorPoolSize(),
100 100 new NamedThreadFactory(String.format("LwM2M %s channel response Error", RESPONSE_CHANNEL)));
101 101 }
102 102
... ... @@ -112,16 +112,15 @@ public class LwM2MTransportRequest {
112 112 /**
113 113 * Device management and service enablement, including Read, Write, Execute, Discover, Create, Delete and Write-Attributes
114 114 *
115   - * @param lwServer
116   - * @param registration
117   - * @param target
118   - * @param typeOper
119   - * @param contentFormatParam
120   - * @param lwM2MClient
121   - * @param observation
  115 + * @param lwServer -
  116 + * @param registration -
  117 + * @param target -
  118 + * @param typeOper -
  119 + * @param contentFormatParam -
  120 + * @param observation -
122 121 */
123   - public void sendAllRequest(LeshanServer lwServer, Registration registration, String target, String typeOper, String contentFormatParam,
124   - LwM2MClient lwM2MClient, Observation observation, Object params, long timeoutInMs, boolean isDelayedUpdate) {
  122 + public void sendAllRequest(LeshanServer lwServer, Registration registration, String target, String typeOper,
  123 + String contentFormatParam, Observation observation, Object params, long timeoutInMs) {
125 124 LwM2mPath resultIds = new LwM2mPath(target);
126 125 if (registration != null && resultIds.getObjectId() >= 0) {
127 126 DownlinkRequest request = null;
... ... @@ -221,13 +220,7 @@ public class LwM2MTransportRequest {
221 220 }
222 221
223 222 if (request != null) {
224   - this.sendRequest(lwServer, registration, request, lwM2MClient, timeoutInMs, isDelayedUpdate);
225   - } else if (isDelayedUpdate) {
226   - String msg = String.format(LOG_LW2M_ERROR + ": sendRequest: Resource path - %s msg No SendRequest to Client", target);
227   - service.sentLogsToThingsboard(msg, registration);
228   - log.error("[{}] - [{}] No SendRequest", target);
229   -// this.handleResponseError(registration, target, lwM2MClient, true);
230   -
  223 + this.sendRequest(lwServer, registration, request, timeoutInMs);
231 224 }
232 225 }
233 226 }
... ... @@ -237,45 +230,37 @@ public class LwM2MTransportRequest {
237 230 * @param lwServer -
238 231 * @param registration -
239 232 * @param request -
240   - * @param lwM2MClient -
241 233 * @param timeoutInMs -
242 234 */
243 235
244   - private void sendRequest(LeshanServer lwServer, Registration registration, DownlinkRequest request, LwM2MClient lwM2MClient, long timeoutInMs, boolean isDelayedUpdate) {
  236 + private void sendRequest(LeshanServer lwServer, Registration registration, DownlinkRequest request, long timeoutInMs) {
  237 + LwM2MClient lwM2MClient = this.service.lwM2mInMemorySecurityStore.getLwM2MClientWithReg(registration, null);
245 238 lwServer.send(registration, request, timeoutInMs, (ResponseCallback<?>) response -> {
  239 + if (!lwM2MClient.isInit()) lwM2MClient.initValue(this.service, request.getPath().toString());
246 240 if (isSuccess(((Response) response.getCoapResponse()).getCode())) {
247   - this.handleResponse(registration, request.getPath().toString(), response, request, lwM2MClient, isDelayedUpdate);
  241 + this.handleResponse(registration, request.getPath().toString(), response, request);
248 242 if (request instanceof WriteRequest && ((WriteRequest) request).isReplaceRequest()) {
249   - String delayedUpdateStr = "";
250   - if (isDelayedUpdate) {
251   - delayedUpdateStr = " (delayedUpdate) ";
252   - }
253   - String msg = String.format(LOG_LW2M_INFO + ": sendRequest Replace%s: CoapCde - %s Lwm2m code - %d name - %s Resource path - %s value - %s SendRequest to Client",
254   - delayedUpdateStr, ((Response) response.getCoapResponse()).getCode(), response.getCode().getCode(), response.getCode().getName(), request.getPath().toString(),
  243 + String msg = String.format(LOG_LW2M_INFO + ": sendRequest Replace: CoapCde - %s Lwm2m code - %d name - %s Resource path - %s value - %s SendRequest to Client",
  244 + ((Response) response.getCoapResponse()).getCode(), response.getCode().getCode(), response.getCode().getName(), request.getPath().toString(),
255 245 ((LwM2mSingleResource) ((WriteRequest) request).getNode()).getValue().toString());
256 246 service.sentLogsToThingsboard(msg, registration);
257   - log.info("[{}] - [{}] [{}] [{}] Update SendRequest[{}]", ((Response) response.getCoapResponse()).getCode(), response.getCode(), request.getPath().toString(),
258   - ((LwM2mSingleResource) ((WriteRequest) request).getNode()).getValue(), delayedUpdateStr);
  247 + log.info("[{}] [{}] - [{}] [{}] Update SendRequest[{}]", registration.getEndpoint(), ((Response) response.getCoapResponse()).getCode(), response.getCode(), request.getPath().toString(),
  248 + ((LwM2mSingleResource) ((WriteRequest) request).getNode()).getValue());
259 249 }
260 250 } else {
261 251 String msg = String.format(LOG_LW2M_ERROR + ": sendRequest: CoapCode - %s Lwm2m code - %d name - %s Resource path - %s SendRequest to Client",
262 252 ((Response) response.getCoapResponse()).getCode(), response.getCode().getCode(), response.getCode().getName(), request.getPath().toString());
263 253 service.sentLogsToThingsboard(msg, registration);
264 254 log.error("[{}] - [{}] [{}] error SendRequest", ((Response) response.getCoapResponse()).getCode(), response.getCode(), request.getPath().toString());
265   -// if (request instanceof WriteRequest && ((WriteRequest) request).isReplaceRequest() && isDelayedUpdate) {
266   -// this.handleResponseError(registration, request.getPath().toString(), lwM2MClient, isDelayedUpdate);
267   -// }
268 255 }
269   -
270 256 }, e -> {
  257 + if (!lwM2MClient.isInit()) lwM2MClient.initValue(this.service, request.getPath().toString());
271 258 String msg = String.format(LOG_LW2M_ERROR + ": sendRequest: Resource path - %s msg error - %s SendRequest to Client",
272 259 request.getPath().toString(), e.toString());
273 260 service.sentLogsToThingsboard(msg, registration);
274 261 log.error("[{}] - [{}] error SendRequest", request.getPath().toString(), e.toString());
275   -// if (request instanceof WriteRequest && ((WriteRequest) request).isReplaceRequest() && isDelayedUpdate) {
276   -// this.handleResponseError(registration, request.getPath().toString(), lwM2MClient, isDelayedUpdate);
277   -// }
278 262 });
  263 +
279 264 }
280 265
281 266 private WriteRequest getWriteRequestSingleResource(ContentFormat contentFormat, Integer objectId, Integer instanceId, Integer resourceId, Object value, ResourceModel.Type type, Registration registration) {
... ... @@ -308,34 +293,23 @@ public class LwM2MTransportRequest {
308 293 }
309 294 }
310 295
311   - private void handleResponse(Registration registration, final String path, LwM2mResponse response, DownlinkRequest request, LwM2MClient lwM2MClient, boolean isDelayedUpdate) {
  296 + private void handleResponse(Registration registration, final String path, LwM2mResponse response, DownlinkRequest request) {
312 297 executorResponse.submit(() -> {
313 298 try {
314   - sendResponse(registration, path, response, request, lwM2MClient, isDelayedUpdate);
  299 + sendResponse(registration, path, response, request);
315 300 } catch (Exception e) {
316 301 log.error("[{}] endpoint [{}] path [{}] Exception Unable to after send response.", registration.getEndpoint(), path, e);
317 302 }
318 303 });
319 304 }
320   -//
321   -// private void handleResponseError(Registration registration, final String path, LwM2MClient lwM2MClient, boolean isDelayedUpdate) {
322   -// executorResponseError.submit(() -> {
323   -// try {
324   -// if (isDelayedUpdate) lwM2MClient.onSuccessOrErrorDelayedRequests(path);
325   -// } catch (RuntimeException t) {
326   -// log.error("[{}] endpoint [{}] path [{}] RuntimeException Unable to after send response.", registration.getEndpoint(), path, t);
327   -// }
328   -// });
329   -// }
330 305
331 306 /**
332 307 * processing a response from a client
333 308 * @param registration -
334 309 * @param path -
335 310 * @param response -
336   - * @param lwM2MClient -
337 311 */
338   - private void sendResponse(Registration registration, String path, LwM2mResponse response, DownlinkRequest request, LwM2MClient lwM2MClient, boolean isDelayedUpdate) {
  312 + private void sendResponse(Registration registration, String path, LwM2mResponse response, DownlinkRequest request) {
339 313 if (response instanceof ObserveResponse || response instanceof ReadResponse) {
340 314 service.onObservationResponse(registration, path, (ReadResponse) response);
341 315 } else if (response instanceof CancelObservationResponse) {
... ... @@ -350,7 +324,7 @@ public class LwM2MTransportRequest {
350 324 log.info("[{}] Path [{}] WriteAttributesResponse 8_Send", path, response);
351 325 } else if (response instanceof WriteResponse) {
352 326 log.info("[{}] Path [{}] WriteAttributesResponse 9_Send", path, response);
353   - service.onWriteResponseOk(registration, path, (WriteRequest) request, isDelayedUpdate);
  327 + service.onWriteResponseOk(registration, path, (WriteRequest) request);
354 328 }
355 329 }
356 330 }
... ...
... ... @@ -121,6 +121,7 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
121 121 this.context.getScheduler().scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) context.getCtxServer().getSessionReportTimeout()), context.getCtxServer().getSessionReportTimeout(), TimeUnit.MILLISECONDS);
122 122 this.executorRegistered = Executors.newFixedThreadPool(this.context.getCtxServer().getRegisteredPoolSize(),
123 123 new NamedThreadFactory(String.format("LwM2M %s channel registered", SERVICE_CHANNEL)));
  124 +// this.executorRegistered = Executors.newWorkStealingPool(this.context.getCtxServer().getRegisteredPoolSize());
124 125 this.executorUpdateRegistered = Executors.newFixedThreadPool(this.context.getCtxServer().getUpdateRegisteredPoolSize(),
125 126 new NamedThreadFactory(String.format("LwM2M %s channel update registered", SERVICE_CHANNEL)));
126 127 this.executorUnRegistered = Executors.newFixedThreadPool(this.context.getCtxServer().getUnRegisteredPoolSize(),
... ... @@ -147,14 +148,10 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
147 148 executorRegistered.submit(() -> {
148 149 try {
149 150 log.warn("[{}] [{{}] Client: create after Registration", registration.getEndpoint(), registration.getId());
150   - LwM2MClient lwM2MClient = lwM2mInMemorySecurityStore.updateInSessionsLwM2MClient(lwServer, registration);
  151 + LwM2MClient lwM2MClient = this.lwM2mInMemorySecurityStore.updateInSessionsLwM2MClient(lwServer, registration);
151 152 if (lwM2MClient != null) {
152 153 lwM2MClient.setLwM2MTransportServiceImpl(this);
153   - lwM2MClient.setSessionUuid(UUID.randomUUID());
154 154 this.sentLogsToThingsboard(LOG_LW2M_INFO + ": Client Registered", registration);
155   - LwM2MClientProfile lwM2MClientProfile = lwM2mInMemorySecurityStore.getProfile(registration.getId());
156   - this.putDelayedUpdateResourcesThingsboard(lwM2MClient);
157   - this.setLwM2mFromClientValue(lwServer, registration, lwM2MClient, lwM2MClientProfile);
158 155 SessionInfoProto sessionInfo = this.getValidateSessionInfo(registration);
159 156 if (sessionInfo != null) {
160 157 lwM2MClient.setDeviceUuid(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
... ... @@ -165,9 +162,8 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
165 162 transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), null);
166 163 transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
167 164 this.sentLogsToThingsboard(LOG_LW2M_INFO + ": Client create after Registration", registration);
168   -// if (LwM2MTransportHandler.getClientUpdateValueAfterConnect(lwM2MClientProfile)) {
169   -// this.putDelayedUpdateResourcesThingsboard(lwM2MClient);
170   -// }
  165 + this.initLwM2mFromClientValue(lwServer, registration, lwM2MClient);
  166 +
171 167 } else {
172 168 log.error("Client: [{}] onRegistered [{}] name [{}] sessionInfo ", registration.getId(), registration.getEndpoint(), null);
173 169 }
... ... @@ -301,8 +297,7 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
301 297 if (!path.isEmpty() && (this.validatePathInAttrProfile(profile, path) || this.validatePathInTelemetryProfile(profile, path))) {
302 298 if (resourceModel != null && resourceModel.operations.isWritable()) {
303 299 lwM2MTransportRequest.sendAllRequest(lwM2MClient.getLwServer(), lwM2MClient.getRegistration(), path, POST_TYPE_OPER_WRITE_REPLACE,
304   - ContentFormat.TLV.getName(), lwM2MClient, null, value, this.context.getCtxServer().getTimeout(),
305   - false);
  300 + ContentFormat.TLV.getName(), null, value, this.context.getCtxServer().getTimeout());
306 301 } else {
307 302 log.error("Resource path - [{}] value - [{}] is not Writable and cannot be updated", path, value);
308 303 String logMsg = String.format(LOG_LW2M_ERROR + ": attributeUpdate: Resource path - %s value - %s is not Writable and cannot be updated", path, value);
... ... @@ -356,8 +351,7 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
356 351 @Override
357 352 public void doTrigger(LeshanServer lwServer, Registration registration, String path) {
358 353 lwM2MTransportRequest.sendAllRequest(lwServer, registration, path, POST_TYPE_OPER_EXECUTE,
359   - ContentFormat.TLV.getName(), null, null, null, this.context.getCtxServer().getTimeout(),
360   - false);
  354 + ContentFormat.TLV.getName(), null, null, this.context.getCtxServer().getTimeout());
361 355 }
362 356
363 357 /**
... ... @@ -461,23 +455,26 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
461 455 * @param registration - Registration LwM2M Client
462 456 * @param lwM2MClient - object with All parameters off client
463 457 */
464   - private void setLwM2mFromClientValue(LeshanServer lwServer, Registration registration, LwM2MClient lwM2MClient, LwM2MClientProfile lwM2MClientProfile) {
  458 + private void initLwM2mFromClientValue(LeshanServer lwServer, Registration registration, LwM2MClient lwM2MClient) {
  459 + LwM2MClientProfile lwM2MClientProfile = lwM2mInMemorySecurityStore.getProfile(registration.getId());
465 460 Set<String> clientObjects = this.getAllOjectsInClient(registration);
466   - if (clientObjects != null) {
  461 + if (clientObjects != null && !LwM2MTransportHandler.getClientOnlyObserveAfterConnect(lwM2MClientProfile)) {
467 462 // #2
468   - if (!LwM2MTransportHandler.getClientOnlyObserveAfterConnect(lwM2MClientProfile) && !LwM2MTransportHandler.getClientUpdateValueAfterConnect(lwM2MClientProfile)) {
469   - this.onSentReadAttrTelemetryToClient(lwServer, registration);
  463 + if (!LwM2MTransportHandler.getClientUpdateValueAfterConnect(lwM2MClientProfile)) {
  464 + this.initReadAttrTelemetryObserveToClient(lwServer, registration, lwM2MClient, GET_TYPE_OPER_READ);
  465 +
470 466 }
471 467 // #3
472 468 else {
  469 + lwM2MClient.getPendingRequests().addAll(clientObjects);
473 470 clientObjects.forEach(path -> {
474 471 lwM2MTransportRequest.sendAllRequest(lwServer, registration, path, GET_TYPE_OPER_READ, ContentFormat.TLV.getName(),
475   - lwM2MClient, null, null, this.context.getCtxServer().getTimeout(), false);
  472 + null, null, this.context.getCtxServer().getTimeout());
476 473 });
477 474 }
478 475 }
479 476 // #1
480   - this.onSentObserveToClient(lwServer, registration);
  477 + this.initReadAttrTelemetryObserveToClient(lwServer, registration, lwM2MClient, GET_TYPE_OPER_OBSERVE);
481 478 }
482 479
483 480 /**
... ... @@ -519,7 +516,6 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
519 516 private void updateResourcesValue(Registration registration, LwM2mResource lwM2mResource, String path) {
520 517 LwM2MClient lwM2MClient = lwM2mInMemorySecurityStore.getLwM2MClientWithReg(registration, null);
521 518 lwM2MClient.updateResourceValue(path, lwM2mResource);
522   - log.warn("upDateResize: [{}] [{}]", lwM2MClient.getEndPoint(), path);
523 519 Set<String> paths = new HashSet<>();
524 520 paths.add(path);
525 521 this.updateAttrTelemetry(registration, false, paths);
... ... @@ -582,30 +578,45 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
582 578 }
583 579
584 580 /**
585   - * Start observe
  581 + * Start observe/read: Attr/Telemetry
586 582 * #1 - Analyze:
587   - * #1.1 path in observe profile == client resource
588   - *
589   - * @param lwServer - LeshanServer
590   - * @param registration - Registration LwM2M Client
  583 + * #1.1 path in resource profile == client resource
  584 + * @param lwServer -
  585 + * @param registration -
591 586 */
592   - private void onSentObserveToClient(LeshanServer lwServer, Registration registration) {
593   - if (lwServer.getObservationService().getObservations(registration).size() > 0) {
594   - this.setCancelObservations(lwServer, registration);
595   - }
596   - LwM2MClientProfile lwM2MClientProfile = lwM2mInMemorySecurityStore.getProfile(registration.getId());
597   - Set<String> clientInstances = this.getAllInstancesInClient(registration);
598   - lwM2MClientProfile.getPostObserveProfile().forEach(p -> {
599   - // #1.1
600   - String target = p.getAsString().toString();
601   - String[] resPath = target.split("/");
602   - String instance = "/" + resPath[1] + "/" + resPath[2];
603   - if (clientInstances.contains(instance)) {
604   - lwM2MTransportRequest.sendAllRequest(lwServer, registration, target, GET_TYPE_OPER_OBSERVE,
605   - null, null, null, null, this.context.getCtxServer().getTimeout(),
606   - false);
  587 + private void initReadAttrTelemetryObserveToClient(LeshanServer lwServer, Registration registration, LwM2MClient lwM2MClient, String typeOper) {
  588 + try {
  589 + LwM2MClientProfile lwM2MClientProfile = lwM2mInMemorySecurityStore.getProfile(registration.getId());
  590 + Set<String> clientInstances = this.getAllInstancesInClient(registration);
  591 + Set<String> result;
  592 + if (GET_TYPE_OPER_READ.equals(typeOper)) {
  593 + result = new ObjectMapper().readValue(lwM2MClientProfile.getPostAttributeProfile().getAsJsonArray().toString().getBytes(), Set.class);
  594 + result.addAll(new ObjectMapper().readValue(lwM2MClientProfile.getPostTelemetryProfile().getAsJsonArray().toString().getBytes(), Set.class));
607 595 }
608   - });
  596 + else {
  597 + result = new ObjectMapper().readValue(lwM2MClientProfile.getPostObserveProfile().getAsJsonArray().toString().getBytes(), Set.class);
  598 + }
  599 + Set<String> pathSent = ConcurrentHashMap.newKeySet();
  600 + result.forEach(p -> {
  601 + // #1.1
  602 + String target = p;
  603 + String[] resPath = target.split("/");
  604 + String instance = "/" + resPath[1] + "/" + resPath[2];
  605 + if (clientInstances.contains(instance)) {
  606 + pathSent.add(target);
  607 + }
  608 + });
  609 + lwM2MClient.getPendingRequests().addAll(pathSent);
  610 + pathSent.forEach(target -> {
  611 + lwM2MTransportRequest.sendAllRequest(lwServer, registration, target, typeOper, ContentFormat.TLV.getName(),
  612 + null, null, this.context.getCtxServer().getTimeout());
  613 + });
  614 + if (GET_TYPE_OPER_OBSERVE.equals(typeOper)) {
  615 + lwM2MClient.initValue(this, null);
  616 + }
  617 + } catch (IOException e) {
  618 + e.printStackTrace();
  619 + }
609 620 }
610 621
611 622 /**
... ... @@ -657,28 +668,6 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
657 668 return (clientInstances.size() > 0) ? clientInstances : null;
658 669 }
659 670
660   - private void onSentReadAttrTelemetryToClient(LeshanServer lwServer, Registration registration) {
661   - LwM2MClientProfile lwM2MClientProfile = lwM2mInMemorySecurityStore.getProfile(registration.getId());
662   - Set<String> clientInstances = this.getAllInstancesInClient(registration);
663   - Set<String> attr = ConcurrentHashMap.newKeySet();
664   - try {
665   - attr = new ObjectMapper().readValue(lwM2MClientProfile.getPostAttributeProfile().getAsJsonArray().toString().getBytes(), Set.class);
666   - attr.addAll(new ObjectMapper().readValue(lwM2MClientProfile.getPostTelemetryProfile().getAsJsonArray().toString().getBytes(), Set.class));
667   - } catch (IOException e) {
668   - e.printStackTrace();
669   - }
670   - attr.forEach(p -> {
671   - // #1.1
672   - String target = p;
673   - String[] resPath = target.split("/");
674   - String instance = "/" + resPath[1] + "/" + resPath[2];
675   - if (clientInstances.contains(instance)) {
676   - lwM2MTransportRequest.sendAllRequest(lwServer, registration, target, GET_TYPE_OPER_READ, ContentFormat.TLV.getName(),
677   - null, null, null, this.context.getCtxServer().getTimeout(), false);
678   - }
679   - });
680   - }
681   -
682 671 /**
683 672 * get AttrName/TelemetryName with value from Client
684 673 *
... ... @@ -767,7 +756,7 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
767 756 * @param path -
768 757 * @param request -
769 758 */
770   - public void onWriteResponseOk(Registration registration, String path, WriteRequest request, boolean isDelayedUpdate) {
  759 + public void onWriteResponseOk(Registration registration, String path, WriteRequest request) {
771 760 this.updateResourcesValue(registration, ((LwM2mResource) request.getNode()), path);
772 761 }
773 762
... ... @@ -841,8 +830,7 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
841 830 LwM2MClient lwM2MClient = lwM2mInMemorySecurityStore.getLwM2MClientWithReg(null, registrationId);
842 831 LeshanServer lwServer = lwM2MClient.getLwServer();
843 832 Registration registration = lwM2mInMemorySecurityStore.getByRegistration(registrationId);
844   - log.warn("[{}] # 4.1", registration.getEndpoint());
845   - this.updateResourceValueObserve(lwServer, registration, sentAttrToThingsboard.getPathPostParametersAdd(), GET_TYPE_OPER_READ);
  833 + this.readResourceValueObserve(lwServer, registration, sentAttrToThingsboard.getPathPostParametersAdd(), GET_TYPE_OPER_READ);
846 834 // sent attr/telemetry to tingsboard for new path
847 835 this.updateAttrTelemetry(registration, false, sentAttrToThingsboard.getPathPostParametersAdd());
848 836 });
... ... @@ -870,8 +858,7 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
870 858 LwM2MClient lwM2MClient = lwM2mInMemorySecurityStore.getLwM2MClient(null, registrationId);
871 859 LeshanServer lwServer = lwM2MClient.getLwServer();
872 860 Registration registration = lwM2mInMemorySecurityStore.getByRegistration(registrationId);
873   - log.warn("[{}] # 5.1", registration.getEndpoint());
874   - this.updateResourceValueObserve(lwServer, registration, postObserveAnalyzer.getPathPostParametersAdd(), GET_TYPE_OPER_OBSERVE);
  861 + this.readResourceValueObserve(lwServer, registration, postObserveAnalyzer.getPathPostParametersAdd(), GET_TYPE_OPER_OBSERVE);
875 862 // 5.3 del
876 863 // sent Request cancel observe to Client
877 864 this.cancelObserveIsValue(lwServer, registration, postObserveAnalyzer.getPathPostParametersDel());
... ... @@ -914,18 +901,16 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
914 901 * @param registration - Registration LwM2M Client
915 902 * @param targets - path Resources == [ "/2/0/0", "/2/0/1"]
916 903 */
917   - private void updateResourceValueObserve(LeshanServer lwServer, Registration registration, Set<String> targets, String typeOper) {
  904 + private void readResourceValueObserve(LeshanServer lwServer, Registration registration, Set<String> targets, String typeOper) {
918 905 targets.forEach(target -> {
919 906 LwM2mPath pathIds = new LwM2mPath(target);
920 907 if (pathIds.isResource()) {
921 908 if (GET_TYPE_OPER_READ.equals(typeOper)) {
922 909 lwM2MTransportRequest.sendAllRequest(lwServer, registration, target, typeOper,
923   - ContentFormat.TLV.getName(), null, null, null, this.context.getCtxServer().getTimeout(),
924   - false);
  910 + ContentFormat.TLV.getName(), null, null, this.context.getCtxServer().getTimeout());
925 911 } else if (GET_TYPE_OPER_OBSERVE.equals(typeOper)) {
926 912 lwM2MTransportRequest.sendAllRequest(lwServer, registration, target, typeOper,
927   - null, null, null, null, this.context.getCtxServer().getTimeout(),
928   - false);
  913 + null, null, null, this.context.getCtxServer().getTimeout());
929 914 }
930 915 }
931 916 });
... ... @@ -952,10 +937,11 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
952 937 }
953 938
954 939 private void putDelayedUpdateResourcesClient(LwM2MClient lwM2MClient, Object valueOld, Object valueNew, String path) {
955   - if (valueNew != null && !valueNew.toString().equals(valueOld.toString())) {
  940 + if (valueNew != null && (valueOld == null || !valueNew.toString().equals(valueOld.toString()))) {
956 941 lwM2MTransportRequest.sendAllRequest(lwM2MClient.getLwServer(), lwM2MClient.getRegistration(), path, POST_TYPE_OPER_WRITE_REPLACE,
957   - ContentFormat.TLV.getName(), lwM2MClient, null, valueNew, this.context.getCtxServer().getTimeout(),
958   - true);
  942 + ContentFormat.TLV.getName(), null, valueNew, this.context.getCtxServer().getTimeout());
  943 + } else {
  944 + log.error("05 delayError");
959 945 }
960 946 }
961 947
... ... @@ -1007,22 +993,26 @@ public class LwM2MTransportServiceImpl implements LwM2MTransportService {
1007 993 * @param sessionInfo -
1008 994 */
1009 995 public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg attributesResponse, TransportProtos.SessionInfoProto sessionInfo) {
1010   - LwM2MClient lwM2MClient = lwM2mInMemorySecurityStore.getLwM2MClient(sessionInfo);
1011   - attributesResponse.getSharedAttributeListList().forEach(attr -> {
1012   - String path = this.getPathAttributeUpdate(sessionInfo, attr.getKv().getKey());
1013   - // #1.1
1014   - if (lwM2MClient.getDelayedRequests().containsKey(path) && attr.getTs() > lwM2MClient.getDelayedRequests().get(path).getTs()) {
1015   - lwM2MClient.getDelayedRequests().put(path, attr);
1016   - } else {
1017   - lwM2MClient.getDelayedRequests().put(path, attr);
1018   - }
1019   - });
1020   - // #2.1
1021   - lwM2MClient.getDelayedRequests().forEach((k, v) -> {
1022   - ArrayList<TransportProtos.KeyValueProto> listV = new ArrayList<>();
1023   - listV.add(v.getKv());
1024   - this.putDelayedUpdateResourcesClient(lwM2MClient, this.getResourceValueToString(lwM2MClient, k), getJsonObject(listV).get(v.getKv().getKey()), k);
1025   - });
  996 + try {
  997 + LwM2MClient lwM2MClient = lwM2mInMemorySecurityStore.getLwM2MClient(sessionInfo);
  998 + attributesResponse.getSharedAttributeListList().forEach(attr -> {
  999 + String path = this.getPathAttributeUpdate(sessionInfo, attr.getKv().getKey());
  1000 + // #1.1
  1001 + if (lwM2MClient.getDelayedRequests().containsKey(path) && attr.getTs() > lwM2MClient.getDelayedRequests().get(path).getTs()) {
  1002 + lwM2MClient.getDelayedRequests().put(path, attr);
  1003 + } else {
  1004 + lwM2MClient.getDelayedRequests().put(path, attr);
  1005 + }
  1006 + });
  1007 + // #2.1
  1008 + lwM2MClient.getDelayedRequests().forEach((k, v) -> {
  1009 + ArrayList<TransportProtos.KeyValueProto> listV = new ArrayList<>();
  1010 + listV.add(v.getKv());
  1011 + this.putDelayedUpdateResourcesClient(lwM2MClient, this.getResourceValueToString(lwM2MClient, k), getJsonObject(listV).get(v.getKv().getKey()), k);
  1012 + });
  1013 + } catch (Exception e) {
  1014 + log.error(String.valueOf(e));
  1015 + }
1026 1016 }
1027 1017
1028 1018 /**
... ...
... ... @@ -108,7 +108,7 @@ public class LwM2mServerListener {
108 108
109 109 @Override
110 110 public void newObservation(Observation observation, Registration registration) {
111   - log.info("Received newObservation from [{}] endpoint [{}] ", observation.getPath(), registration.getEndpoint());
  111 +// log.info("Received newObservation from [{}] endpoint [{}] ", observation.getPath(), registration.getEndpoint());
112 112 }
113 113 };
114 114
... ...
... ... @@ -28,6 +28,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCreden
28 28 import org.thingsboard.server.transport.lwm2m.server.LwM2MTransportServiceImpl;
29 29 import org.thingsboard.server.transport.lwm2m.utils.LwM2mValueConverterImpl;
30 30
  31 +import java.util.ArrayList;
31 32 import java.util.Map;
32 33 import java.util.UUID;
33 34 import java.util.concurrent.ConcurrentHashMap;
... ... @@ -49,73 +50,30 @@ public class LwM2MClient implements Cloneable {
49 50 private ValidateDeviceCredentialsResponseMsg credentialsResponse;
50 51 private Map<String, String> attributes;
51 52 private Map<String, ResourceValue> resources;
52   - // private Set<String> pendingRequests;
53 53 private Map<String, TransportProtos.TsKvProto> delayedRequests;
54   -// private Set<Integer> delayedRequestsId;
55   -// private Map<String, LwM2mResponse> responses;
  54 + private ArrayList<String> pendingRequests;
  55 + private boolean init;
56 56 private final LwM2mValueConverterImpl converter;
57 57
58 58 public Object clone() throws CloneNotSupportedException {
59 59 return super.clone();
60 60 }
61 61
62   - public LwM2MClient(String endPoint, String identity, SecurityInfo securityInfo, ValidateDeviceCredentialsResponseMsg credentialsResponse, UUID profileUuid) {
  62 + public LwM2MClient(String endPoint, String identity, SecurityInfo securityInfo, ValidateDeviceCredentialsResponseMsg credentialsResponse, UUID profileUuid, UUID sessionUuid) {
63 63 this.endPoint = endPoint;
64 64 this.identity = identity;
65 65 this.securityInfo = securityInfo;
66 66 this.credentialsResponse = credentialsResponse;
67 67 this.attributes = new ConcurrentHashMap<>();
68   -// this.pendingRequests = ConcurrentHashMap.newKeySet();
69 68 this.delayedRequests = new ConcurrentHashMap<>();
  69 + this.pendingRequests = new ArrayList<>();
70 70 this.resources = new ConcurrentHashMap<>();
71   -// this.delayedRequestsId = ConcurrentHashMap.newKeySet();
72 71 this.profileUuid = profileUuid;
73   - /**
74   - * Key <objectId>, response<Value -> instance -> resources: value...>
75   - */
76   -// this.responses = new ConcurrentHashMap<>();
  72 + this.sessionUuid = sessionUuid;
77 73 this.converter = LwM2mValueConverterImpl.getInstance();
  74 + this.init = false;
78 75 }
79 76
80   -// /**
81   -// * Fill with data -> Model client
82   -// *
83   -// * @param path -
84   -// * @param response -
85   -// */
86   -// public void onSuccessHandler(String path, LwM2mResponse response) {
87   -// this.responses.put(path, response);
88   -// this.pendingRequests.remove(path);
89   -// if (this.pendingRequests.size() == 0) {
90   -// this.initValue();
91   -// this.lwM2MTransportServiceImpl.putDelayedUpdateResourcesThingsboard(this);
92   -// }
93   -// }
94   -//
95   -// private void initValue() {
96   -// this.responses.forEach((key, lwM2mResponse) -> {
97   -// LwM2mPath pathIds = new LwM2mPath(key);
98   -// if (pathIds.isObjectInstance()) {
99   -// ((LwM2mObjectInstance) ((ReadResponse) lwM2mResponse).getContent()).getResources().forEach((k, v) -> {
100   -// String pathRez = pathIds.toString() + "/" + k;
101   -// this.updateResourceValue(pathRez, v);
102   -// });
103   -// }
104   -// else if (pathIds.isResource()) {
105   -// this.updateResourceValue(pathIds.toString(), ((LwM2mResource) ((ReadResponse) lwM2mResponse).getContent()));
106   -// }
107   -// });
108   -// if (this.responses.size() == 0) this.responses = new ConcurrentHashMap<>();
109   -// }
110   -
111   -// public void updateObjectInstanceResourceValue(String pathInst, LwM2mObjectInstance instance) {
112   -// LwM2mPath pathIds = new LwM2mPath(pathInst);
113   -// instance.getResources().forEach((k, v) -> {
114   -// String pathRez = pathIds.toString() + "/" + k;
115   -// this.updateResourceValue(pathRez, v);
116   -// });
117   -// }
118   -
119 77 public void updateResourceValue(String pathRez, LwM2mResource rez) {
120 78 if (rez instanceof LwM2mMultipleResource) {
121 79 this.resources.put(pathRez, new ResourceValue(rez.getValues(), null, true));
... ... @@ -124,17 +82,18 @@ public class LwM2MClient implements Cloneable {
124 82 }
125 83 }
126 84
127   -// /**
128   -// * if path != null
129   -// *
130   -// * @param path
131   -// */
132   -// public void onSuccessOrErrorDelayedRequests(String path) {
133   -// if (path != null) this.delayedRequests.remove(path);
134   -// if (this.delayedRequests.size() == 0 && this.getDelayedRequestsId().size() == 0) {
135   -// this.lwM2MTransportServiceImpl.updatesAndSentModelParameter(this);
136   -// }
137   -// }
  85 + public void initValue(LwM2MTransportServiceImpl lwM2MTransportService, String path) {
  86 + if (path != null) {
  87 + this.pendingRequests.remove(path);
  88 + }
  89 + if (this.pendingRequests.size() == 0) {
  90 + this.init = true;
  91 + lwM2MTransportService.putDelayedUpdateResourcesThingsboard(this);
  92 + }
  93 + }
138 94
  95 + public LwM2MClient copy() {
  96 + return new LwM2MClient(this.endPoint, this.identity, this.securityInfo, this.credentialsResponse, this.profileUuid, this.sessionUuid);
  97 + }
139 98 }
140 99
... ...
... ... @@ -165,7 +165,7 @@ public class LwM2mInMemorySecurityStore extends InMemorySecurityStore {
165 165 if (this.sessions.get(registration.getEndpoint()) == null) {
166 166 this.addLwM2MClientToSession(registration.getEndpoint());
167 167 }
168   - LwM2MClient lwM2MClient = this.sessions.get(registration.getEndpoint());
  168 + LwM2MClient lwM2MClient = this.sessions.get(registration.getEndpoint()).copy();
169 169 lwM2MClient.setLwServer(lwServer);
170 170 lwM2MClient.setRegistration(registration);
171 171 lwM2MClient.setAttributes(registration.getAdditionalRegistrationAttributes());
... ... @@ -203,9 +203,9 @@ public class LwM2mInMemorySecurityStore extends InMemorySecurityStore {
203 203 UUID profileUuid = (store.getDeviceProfile() != null && addUpdateProfileParameters(store.getDeviceProfile())) ? store.getDeviceProfile().getUuidId() : null;
204 204 if (store.getSecurityInfo() != null && profileUuid != null) {
205 205 String endpoint = store.getSecurityInfo().getEndpoint();
206   - sessions.put(endpoint, new LwM2MClient(endpoint, store.getSecurityInfo().getIdentity(), store.getSecurityInfo(), store.getMsg(), profileUuid));
  206 + sessions.put(endpoint, new LwM2MClient(endpoint, store.getSecurityInfo().getIdentity(), store.getSecurityInfo(), store.getMsg(), profileUuid, UUID.randomUUID()));
207 207 } else if (store.getSecurityMode() == NO_SEC.code && profileUuid != null) {
208   - sessions.put(identity, new LwM2MClient(identity, null, null, store.getMsg(), profileUuid));
  208 + sessions.put(identity, new LwM2MClient(identity, null, null, store.getMsg(), profileUuid, UUID.randomUUID()));
209 209 } else {
210 210 log.error("Registration failed: FORBIDDEN/profileUuid/device [{}] , endpointId: [{}]", profileUuid, identity);
211 211 /**
... ...