Commit def5fe4c5c68a41dea8d03ade2b2274d2e0736b0

Authored by Andrii Shvaika
2 parents dc7c96c4 06c4f7a3

Merge branch 'feature/lwm2m-refactoring-downlink' of https://github.com/YevhenBo…

…ndarenko/thingsboard into feature/lwm2m-refactoring-downlink
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m.server.downlink;
  17 +
  18 +import lombok.RequiredArgsConstructor;
  19 +
  20 +import java.util.concurrent.CountDownLatch;
  21 +
  22 +@RequiredArgsConstructor
  23 +public class TbLwM2MLatchCallback<R, T> implements DownlinkRequestCallback<R, T> {
  24 +
  25 + private final CountDownLatch countDownLatch;
  26 + private final DownlinkRequestCallback<R, T> callback;
  27 +
  28 + @Override
  29 + public void onSuccess(R request, T response) {
  30 + callback.onSuccess(request, response);
  31 + countDownLatch.countDown();
  32 + }
  33 +
  34 + @Override
  35 + public void onValidationError(String params, String msg) {
  36 + callback.onValidationError(params, msg);
  37 + countDownLatch.countDown();
  38 + }
  39 +
  40 + @Override
  41 + public void onError(String params, Exception e) {
  42 + callback.onError(params, e);
  43 + countDownLatch.countDown();
  44 + }
  45 +}
@@ -18,8 +18,8 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; @@ -18,8 +18,8 @@ package org.thingsboard.server.transport.lwm2m.server.downlink;
18 import lombok.extern.slf4j.Slf4j; 18 import lombok.extern.slf4j.Slf4j;
19 import org.eclipse.leshan.core.request.ReadRequest; 19 import org.eclipse.leshan.core.request.ReadRequest;
20 import org.eclipse.leshan.core.response.ReadResponse; 20 import org.eclipse.leshan.core.response.ReadResponse;
21 -import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;  
22 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; 21 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
  22 +import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
23 23
24 @Slf4j 24 @Slf4j
25 public class TbLwM2MReadCallback extends TbLwM2MTargetedCallback<ReadRequest, ReadResponse> { 25 public class TbLwM2MReadCallback extends TbLwM2MTargetedCallback<ReadRequest, ReadResponse> {
@@ -28,7 +28,10 @@ import org.eclipse.leshan.core.node.LwM2mObjectInstance; @@ -28,7 +28,10 @@ import org.eclipse.leshan.core.node.LwM2mObjectInstance;
28 import org.eclipse.leshan.core.node.LwM2mPath; 28 import org.eclipse.leshan.core.node.LwM2mPath;
29 import org.eclipse.leshan.core.node.LwM2mResource; 29 import org.eclipse.leshan.core.node.LwM2mResource;
30 import org.eclipse.leshan.core.observation.Observation; 30 import org.eclipse.leshan.core.observation.Observation;
  31 +import org.eclipse.leshan.core.request.ObserveRequest;
  32 +import org.eclipse.leshan.core.request.ReadRequest;
31 import org.eclipse.leshan.core.request.WriteRequest; 33 import org.eclipse.leshan.core.request.WriteRequest;
  34 +import org.eclipse.leshan.core.response.ObserveResponse;
32 import org.eclipse.leshan.core.response.ReadResponse; 35 import org.eclipse.leshan.core.response.ReadResponse;
33 import org.eclipse.leshan.server.registration.Registration; 36 import org.eclipse.leshan.server.registration.Registration;
34 import org.springframework.context.annotation.Lazy; 37 import org.springframework.context.annotation.Lazy;
@@ -68,7 +71,9 @@ import org.thingsboard.server.transport.lwm2m.server.client.LwM2mFwSwUpdate; @@ -68,7 +71,9 @@ import org.thingsboard.server.transport.lwm2m.server.client.LwM2mFwSwUpdate;
68 import org.thingsboard.server.transport.lwm2m.server.client.ParametersAnalyzeResult; 71 import org.thingsboard.server.transport.lwm2m.server.client.ParametersAnalyzeResult;
69 import org.thingsboard.server.transport.lwm2m.server.client.ResourceValue; 72 import org.thingsboard.server.transport.lwm2m.server.client.ResourceValue;
70 import org.thingsboard.server.transport.lwm2m.server.client.ResultsAddKeyValueProto; 73 import org.thingsboard.server.transport.lwm2m.server.client.ResultsAddKeyValueProto;
  74 +import org.thingsboard.server.transport.lwm2m.server.downlink.DownlinkRequestCallback;
71 import org.thingsboard.server.transport.lwm2m.server.downlink.LwM2mDownlinkMsgHandler; 75 import org.thingsboard.server.transport.lwm2m.server.downlink.LwM2mDownlinkMsgHandler;
  76 +import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MLatchCallback;
72 import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MCancelObserveCallback; 77 import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MCancelObserveCallback;
73 import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MCancelObserveRequest; 78 import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MCancelObserveRequest;
74 import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MDiscoverCallback; 79 import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MDiscoverCallback;
@@ -97,6 +102,7 @@ import java.util.Random; @@ -97,6 +102,7 @@ import java.util.Random;
97 import java.util.Set; 102 import java.util.Set;
98 import java.util.UUID; 103 import java.util.UUID;
99 import java.util.concurrent.ConcurrentHashMap; 104 import java.util.concurrent.ConcurrentHashMap;
  105 +import java.util.concurrent.CountDownLatch;
100 import java.util.concurrent.ExecutorService; 106 import java.util.concurrent.ExecutorService;
101 import java.util.concurrent.TimeUnit; 107 import java.util.concurrent.TimeUnit;
102 import java.util.stream.Collectors; 108 import java.util.stream.Collectors;
@@ -451,16 +457,29 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { @@ -451,16 +457,29 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler {
451 Set<String> targetIds = new HashSet<>(profile.getObserveAttr().getAttribute()); 457 Set<String> targetIds = new HashSet<>(profile.getObserveAttr().getAttribute());
452 targetIds.addAll(profile.getObserveAttr().getTelemetry()); 458 targetIds.addAll(profile.getObserveAttr().getTelemetry());
453 targetIds = targetIds.stream().filter(target -> isSupportedTargetId(supportedObjects, target)).collect(Collectors.toSet()); 459 targetIds = targetIds.stream().filter(target -> isSupportedTargetId(supportedObjects, target)).collect(Collectors.toSet());
454 - lwM2MClient.getPendingReadRequests().addAll(targetIds);  
455 - targetIds.forEach(versionedId -> sendReadRequest(lwM2MClient, versionedId)); 460 +
  461 + CountDownLatch latch = new CountDownLatch(targetIds.size());
  462 + targetIds.forEach(versionedId -> sendReadRequest(lwM2MClient, versionedId,
  463 + new TbLwM2MLatchCallback<>(latch, new TbLwM2MReadCallback(this, lwM2MClient, versionedId))));
  464 + try {
  465 + latch.await();
  466 + } catch (InterruptedException e) {
  467 + log.error("Failed to await Read requests!");
  468 + }
456 } 469 }
457 470
458 private void sendObserveRequests(LwM2mClient lwM2MClient, Lwm2mDeviceProfileTransportConfiguration profile, Set<String> supportedObjects) { 471 private void sendObserveRequests(LwM2mClient lwM2MClient, Lwm2mDeviceProfileTransportConfiguration profile, Set<String> supportedObjects) {
459 Set<String> targetIds = profile.getObserveAttr().getObserve(); 472 Set<String> targetIds = profile.getObserveAttr().getObserve();
460 targetIds = targetIds.stream().filter(target -> isSupportedTargetId(supportedObjects, target)).collect(Collectors.toSet()); 473 targetIds = targetIds.stream().filter(target -> isSupportedTargetId(supportedObjects, target)).collect(Collectors.toSet());
461 -// TODO: why do we need to put observe into pending read requests?  
462 -// lwM2MClient.getPendingReadRequests().addAll(targetIds);  
463 - targetIds.forEach(targetId -> sendObserveRequest(lwM2MClient, targetId)); 474 +
  475 + CountDownLatch latch = new CountDownLatch(targetIds.size());
  476 + targetIds.forEach(targetId -> sendObserveRequest(lwM2MClient, targetId,
  477 + new TbLwM2MLatchCallback<>(latch, new TbLwM2MObserveCallback(this, lwM2MClient, targetId))));
  478 + try {
  479 + latch.await();
  480 + } catch (InterruptedException e) {
  481 + log.error("Failed to await Observe requests!");
  482 + }
464 } 483 }
465 484
466 private void sendWriteAttributeRequests(LwM2mClient lwM2MClient, Lwm2mDeviceProfileTransportConfiguration profile, Set<String> supportedObjects) { 485 private void sendWriteAttributeRequests(LwM2mClient lwM2MClient, Lwm2mDeviceProfileTransportConfiguration profile, Set<String> supportedObjects) {
@@ -485,13 +504,21 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { @@ -485,13 +504,21 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler {
485 } 504 }
486 505
487 private void sendReadRequest(LwM2mClient lwM2MClient, String versionedId) { 506 private void sendReadRequest(LwM2mClient lwM2MClient, String versionedId) {
  507 + sendReadRequest(lwM2MClient, versionedId, new TbLwM2MReadCallback(this, lwM2MClient, versionedId));
  508 + }
  509 +
  510 + private void sendReadRequest(LwM2mClient lwM2MClient, String versionedId, DownlinkRequestCallback<ReadRequest, ReadResponse> callback) {
488 TbLwM2MReadRequest request = TbLwM2MReadRequest.builder().versionedId(versionedId).timeout(this.config.getTimeout()).build(); 511 TbLwM2MReadRequest request = TbLwM2MReadRequest.builder().versionedId(versionedId).timeout(this.config.getTimeout()).build();
489 - defaultLwM2MDownlinkMsgHandler.sendReadRequest(lwM2MClient, request, new TbLwM2MReadCallback(this, lwM2MClient, versionedId)); 512 + defaultLwM2MDownlinkMsgHandler.sendReadRequest(lwM2MClient, request, callback);
490 } 513 }
491 514
492 private void sendObserveRequest(LwM2mClient lwM2MClient, String versionedId) { 515 private void sendObserveRequest(LwM2mClient lwM2MClient, String versionedId) {
  516 + sendObserveRequest(lwM2MClient, versionedId, new TbLwM2MObserveCallback(this, lwM2MClient, versionedId));
  517 + }
  518 +
  519 + private void sendObserveRequest(LwM2mClient lwM2MClient, String versionedId, DownlinkRequestCallback<ObserveRequest, ObserveResponse> callback) {
493 TbLwM2MObserveRequest request = TbLwM2MObserveRequest.builder().versionedId(versionedId).timeout(this.config.getTimeout()).build(); 520 TbLwM2MObserveRequest request = TbLwM2MObserveRequest.builder().versionedId(versionedId).timeout(this.config.getTimeout()).build();
494 - defaultLwM2MDownlinkMsgHandler.sendObserveRequest(lwM2MClient, request, new TbLwM2MObserveCallback(this, lwM2MClient, versionedId)); 521 + defaultLwM2MDownlinkMsgHandler.sendObserveRequest(lwM2MClient, request, callback);
495 } 522 }
496 523
497 private void sendWriteAttributesRequest(LwM2mClient lwM2MClient, String targetId, ObjectAttributes params) { 524 private void sendWriteAttributesRequest(LwM2mClient lwM2MClient, String targetId, ObjectAttributes params) {