Commit ed85d3de3e27a84bcc145f441f1b9c219585846d

Authored by Volodymyr Babak
1 parent e1a6c1c4

Refactored RPC request

Showing 19 changed files with 242 additions and 107 deletions
... ... @@ -291,6 +291,9 @@ public class ActorSystemContext {
291 291 @Getter
292 292 private long statisticsPersistFrequency;
293 293
  294 + @Value("${edges.rpc.enabled}")
  295 + @Getter
  296 + private boolean edgesRpcEnabled;
294 297
295 298 @Scheduled(fixedDelayString = "${actors.statistics.js_print_interval_ms}")
296 299 public void printStats() {
... ...
... ... @@ -87,7 +87,9 @@ public class AppActor extends ContextAwareActor {
87 87 case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
88 88 case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
89 89 case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
  90 + case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:
90 91 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
  92 + case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
91 93 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
92 94 onToDeviceActorMsg((TenantAwareMsg) msg, true);
93 95 break;
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.actors.device;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
  20 +import org.thingsboard.rule.engine.api.msg.DeviceEdgeUpdateMsg;
20 21 import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
21 22 import org.thingsboard.server.actors.ActorSystemContext;
22 23 import org.thingsboard.server.actors.TbActorCtx;
... ... @@ -26,6 +27,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
26 27 import org.thingsboard.server.common.data.id.TenantId;
27 28 import org.thingsboard.server.common.msg.TbActorMsg;
28 29 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
  30 +import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg;
29 31 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
30 32 import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
31 33
... ... @@ -70,12 +72,18 @@ public class DeviceActor extends ContextAwareActor {
70 72 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
71 73 processor.processRpcRequest(ctx, (ToDeviceRpcRequestActorMsg) msg);
72 74 break;
  75 + case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
  76 + processor.processRpcResponsesFromEdge(ctx, (FromDeviceRpcResponseActorMsg) msg);
  77 + break;
73 78 case DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG:
74 79 processor.processServerSideRpcTimeout(ctx, (DeviceActorServerSideRpcTimeoutMsg) msg);
75 80 break;
76 81 case SESSION_TIMEOUT_MSG:
77 82 processor.checkSessionsTimeout();
78 83 break;
  84 + case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:
  85 + processor.processEdgeUpdate((DeviceEdgeUpdateMsg) msg);
  86 + break;
79 87 default:
80 88 return false;
81 89 }
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.actors.device;
17 17
  18 +import com.fasterxml.jackson.databind.node.ObjectNode;
18 19 import com.google.common.util.concurrent.FutureCallback;
19 20 import com.google.common.util.concurrent.Futures;
20 21 import com.google.common.util.concurrent.ListenableFuture;
... ... @@ -24,17 +25,24 @@ import lombok.extern.slf4j.Slf4j;
24 25 import org.apache.commons.collections.CollectionUtils;
25 26 import org.thingsboard.rule.engine.api.RpcError;
26 27 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
  28 +import org.thingsboard.rule.engine.api.msg.DeviceEdgeUpdateMsg;
27 29 import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
28 30 import org.thingsboard.server.actors.ActorSystemContext;
29 31 import org.thingsboard.server.actors.TbActorCtx;
30 32 import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
31 33 import org.thingsboard.server.common.data.DataConstants;
32 34 import org.thingsboard.server.common.data.Device;
  35 +import org.thingsboard.server.common.data.edge.EdgeEvent;
  36 +import org.thingsboard.server.common.data.edge.EdgeEventActionType;
  37 +import org.thingsboard.server.common.data.edge.EdgeEventType;
33 38 import org.thingsboard.server.common.data.id.DeviceId;
  39 +import org.thingsboard.server.common.data.id.EdgeId;
34 40 import org.thingsboard.server.common.data.id.TenantId;
35 41 import org.thingsboard.server.common.data.kv.AttributeKey;
36 42 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
37 43 import org.thingsboard.server.common.data.kv.KvEntry;
  44 +import org.thingsboard.server.common.data.relation.EntityRelation;
  45 +import org.thingsboard.server.common.data.relation.RelationTypeGroup;
38 46 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
39 47 import org.thingsboard.server.common.msg.TbMsgMetaData;
40 48 import org.thingsboard.server.common.msg.queue.TbCallback;
... ... @@ -63,6 +71,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
63 71 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
64 72 import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
65 73 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
  74 +import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg;
66 75 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
67 76 import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
68 77
... ... @@ -98,6 +107,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
98 107 private String deviceName;
99 108 private String deviceType;
100 109 private TbMsgMetaData defaultMetaData;
  110 + private EdgeId edgeId;
101 111
102 112 DeviceActorMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) {
103 113 super(systemContext);
... ... @@ -120,12 +130,27 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
120 130 this.defaultMetaData = new TbMsgMetaData();
121 131 this.defaultMetaData.putValue("deviceName", deviceName);
122 132 this.defaultMetaData.putValue("deviceType", deviceType);
  133 + if (systemContext.isEdgesRpcEnabled()) {
  134 + this.edgeId = findRelatedEdgeId();
  135 + }
123 136 return true;
124 137 } else {
125 138 return false;
126 139 }
127 140 }
128 141
  142 + private EdgeId findRelatedEdgeId() {
  143 + List<EntityRelation> result =
  144 + systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.EDGE_TYPE, RelationTypeGroup.COMMON);
  145 + if (result != null && result.size() > 0) {
  146 + EntityRelation relationToEdge = result.get(0);
  147 + if (relationToEdge.getFrom() != null && relationToEdge.getFrom().getId() != null) {
  148 + return new EdgeId(relationToEdge.getFrom().getId());
  149 + }
  150 + }
  151 + return null;
  152 + }
  153 +
129 154 void processRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg) {
130 155 ToDeviceRpcRequest request = msg.getMsg();
131 156 ToDeviceRpcRequestBody body = request.getBody();
... ... @@ -138,15 +163,21 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
138 163 return;
139 164 }
140 165
141   - boolean sent = rpcSubscriptions.size() > 0;
142   - Set<UUID> syncSessionSet = new HashSet<>();
143   - rpcSubscriptions.forEach((key, value) -> {
144   - sendToTransport(rpcRequest, key, value.getNodeId());
145   - if (SessionType.SYNC == value.getType()) {
146   - syncSessionSet.add(key);
147   - }
148   - });
149   - syncSessionSet.forEach(rpcSubscriptions::remove);
  166 + boolean sent;
  167 + if (systemContext.isEdgesRpcEnabled() && edgeId != null) {
  168 + saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId());
  169 + sent = true;
  170 + } else {
  171 + sent = rpcSubscriptions.size() > 0;
  172 + Set<UUID> syncSessionSet = new HashSet<>();
  173 + rpcSubscriptions.forEach((key, value) -> {
  174 + sendToTransport(rpcRequest, key, value.getNodeId());
  175 + if (SessionType.SYNC == value.getType()) {
  176 + syncSessionSet.add(key);
  177 + }
  178 + });
  179 + syncSessionSet.forEach(rpcSubscriptions::remove);
  180 + }
150 181
151 182 if (request.isOneway() && sent) {
152 183 log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
... ... @@ -161,6 +192,17 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
161 192 }
162 193 }
163 194
  195 + void processRpcResponsesFromEdge(TbActorCtx context, FromDeviceRpcResponseActorMsg responseMsg) {
  196 + log.debug("[{}] Processing rpc command response from edge session", deviceId);
  197 + ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
  198 + boolean success = requestMd != null;
  199 + if (success) {
  200 + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(responseMsg.getMsg());
  201 + } else {
  202 + log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
  203 + }
  204 + }
  205 +
164 206 private void registerPendingRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) {
165 207 toDeviceRpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent));
166 208 DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg(rpcRequest.getRequestId(), timeout);
... ... @@ -473,6 +515,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
473 515 this.defaultMetaData.putValue("deviceType", deviceType);
474 516 }
475 517
  518 + void processEdgeUpdate(DeviceEdgeUpdateMsg msg) {
  519 + this.edgeId = msg.getEdgeId();
  520 + }
  521 +
476 522 private void sendToTransport(GetAttributeResponseMsg responseMsg, SessionInfoProto sessionInfo) {
477 523 ToTransportMsg msg = ToTransportMsg.newBuilder()
478 524 .setSessionIdMSB(sessionInfo.getSessionIdMSB())
... ... @@ -505,6 +551,25 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
505 551 systemContext.getTbCoreToTransportService().process(nodeId, msg);
506 552 }
507 553
  554 + private void saveRpcRequestToEdgeQueue(ToDeviceRpcRequest msg, Integer requestId) {
  555 + EdgeEvent edgeEvent = new EdgeEvent();
  556 + edgeEvent.setTenantId(tenantId);
  557 + edgeEvent.setAction(EdgeEventActionType.RPC_CALL);
  558 + edgeEvent.setEntityId(deviceId.getId());
  559 + edgeEvent.setType(EdgeEventType.DEVICE);
  560 +
  561 + ObjectNode body = mapper.createObjectNode();
  562 + body.put("requestId", requestId);
  563 + body.put("requestUUID", msg.getId().toString());
  564 + body.put("oneway", msg.isOneway());
  565 + body.put("expirationTime", msg.getExpirationTime());
  566 + body.put("method", msg.getBody().getMethod());
  567 + body.put("params", msg.getBody().getParams());
  568 + edgeEvent.setBody(body);
  569 +
  570 + edgeEvent.setEdgeId(edgeId);
  571 + systemContext.getEdgeEventService().saveAsync(edgeEvent);
  572 + }
508 573
509 574 private List<TsKvProto> toTsKvProtos(@Nullable List<AttributeKvEntry> result) {
510 575 List<TsKvProto> clientAttributes;
... ...
... ... @@ -148,7 +148,9 @@ public class TenantActor extends RuleChainManagerActor {
148 148 case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
149 149 case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
150 150 case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
  151 + case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:
151 152 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
  153 + case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
152 154 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
153 155 onToDeviceActorMsg((DeviceAwareMsg) msg, true);
154 156 break;
... ...
... ... @@ -223,7 +223,7 @@ public abstract class BaseController {
223 223
224 224 @Value("${edges.rpc.enabled}")
225 225 @Getter
226   - private boolean edgesSupportEnabled;
  226 + private boolean edgesRpcEnabled;
227 227
228 228 @ExceptionHandler(ThingsboardException.class)
229 229 public void handleThingsboardException(ThingsboardException ex, HttpServletResponse response) {
... ... @@ -761,7 +761,7 @@ public abstract class BaseController {
761 761 }
762 762
763 763 protected void sendNotificationMsgToEdgeService(TenantId tenantId, EdgeId edgeId, CustomerId customerId, EdgeEventActionType action) {
764   - if (!edgesSupportEnabled) {
  764 + if (!edgesRpcEnabled) {
765 765 return;
766 766 }
767 767 try {
... ... @@ -772,7 +772,7 @@ public abstract class BaseController {
772 772 }
773 773
774 774 protected void sendNotificationMsgToEdgeService(TenantId tenantId, EntityId entityId, CustomerId customerId, EdgeEventActionType action) {
775   - if (!edgesSupportEnabled) {
  775 + if (!edgesRpcEnabled) {
776 776 return;
777 777 }
778 778 EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(entityId.getEntityType());
... ... @@ -786,7 +786,7 @@ public abstract class BaseController {
786 786 }
787 787
788 788 protected void sendNotificationMsgToEdgeService(TenantId tenantId, EntityRelation relation, EdgeEventActionType action) {
789   - if (!edgesSupportEnabled) {
  789 + if (!edgesRpcEnabled) {
790 790 return;
791 791 }
792 792 try {
... ... @@ -804,7 +804,7 @@ public abstract class BaseController {
804 804 }
805 805
806 806 protected void sendNotificationMsgToEdgeService(TenantId tenantId, EdgeId edgeId, EntityId entityId, EdgeEventActionType action) {
807   - if (!edgesSupportEnabled) {
  807 + if (!edgesRpcEnabled) {
808 808 return;
809 809 }
810 810 EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(entityId.getEntityType());
... ...
... ... @@ -32,6 +32,7 @@ import org.springframework.web.bind.annotation.ResponseStatus;
32 32 import org.springframework.web.bind.annotation.RestController;
33 33 import org.springframework.web.context.request.async.DeferredResult;
34 34 import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg;
  35 +import org.thingsboard.rule.engine.api.msg.DeviceEdgeUpdateMsg;
35 36 import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
36 37 import org.thingsboard.server.common.data.ClaimRequest;
37 38 import org.thingsboard.server.common.data.Customer;
... ... @@ -575,6 +576,9 @@ public class DeviceController extends BaseController {
575 576
576 577 Device savedDevice = checkNotNull(deviceService.assignDeviceToEdge(getCurrentUser().getTenantId(), deviceId, edgeId));
577 578
  579 + tbClusterService.pushMsgToCore(new DeviceEdgeUpdateMsg(savedDevice.getTenantId(),
  580 + savedDevice.getId(), edgeId), null);
  581 +
578 582 logEntityAction(deviceId, savedDevice,
579 583 savedDevice.getCustomerId(),
580 584 ActionType.ASSIGNED_TO_EDGE, null, strDeviceId, strEdgeId, edge.getName());
... ... @@ -606,6 +610,9 @@ public class DeviceController extends BaseController {
606 610
607 611 Device savedDevice = checkNotNull(deviceService.unassignDeviceFromEdge(getCurrentUser().getTenantId(), deviceId, edgeId));
608 612
  613 + tbClusterService.pushMsgToCore(new DeviceEdgeUpdateMsg(savedDevice.getTenantId(),
  614 + savedDevice.getId(), null), null);
  615 +
609 616 logEntityAction(deviceId, device,
610 617 device.getCustomerId(),
611 618 ActionType.UNASSIGNED_FROM_EDGE, null, strDeviceId, strEdgeId, edge.getName());
... ...
... ... @@ -416,7 +416,7 @@ public class EdgeController extends BaseController {
416 416 public void syncEdge(@RequestBody EdgeId edgeId) throws ThingsboardException {
417 417 try {
418 418 edgeId = checkNotNull(edgeId);
419   - if (isEdgesSupportEnabled()) {
  419 + if (isEdgesRpcEnabled()) {
420 420 EdgeGrpcSession session = edgeGrpcService.getEdgeGrpcSessionById(edgeId);
421 421 Edge edge = session.getEdge();
422 422 syncEdgeService.sync(edge);
... ...
... ... @@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.kv.LongDataEntry;
36 36 import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc;
37 37 import org.thingsboard.server.gen.edge.RequestMsg;
38 38 import org.thingsboard.server.gen.edge.ResponseMsg;
  39 +import org.thingsboard.server.queue.util.TbCoreComponent;
39 40 import org.thingsboard.server.service.edge.EdgeContextComponent;
40 41 import org.thingsboard.server.service.state.DefaultDeviceStateService;
41 42 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
... ... @@ -55,6 +56,7 @@ import java.util.concurrent.TimeUnit;
55 56 @Service
56 57 @Slf4j
57 58 @ConditionalOnProperty(prefix = "edges.rpc", value = "enabled", havingValue = "true")
  59 +@TbCoreComponent
58 60 public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService {
59 61
60 62 private final Map<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>();
... ... @@ -176,13 +178,15 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
176 178 log.trace("No sessions available, sleep for the next run");
177 179 try {
178 180 Thread.sleep(1000);
179   - } catch (InterruptedException ignore) {}
  181 + } catch (InterruptedException ignore) {
  182 + }
180 183 }
181 184 } catch (Exception e) {
182 185 log.warn("Failed to process messages handling!", e);
183 186 try {
184 187 Thread.sleep(1000);
185   - } catch (InterruptedException ignore) {}
  188 + } catch (InterruptedException ignore) {
  189 + }
186 190 }
187 191 }
188 192 });
... ...
... ... @@ -377,7 +377,7 @@ public final class EdgeGrpcSession implements Closeable {
377 377 private DownlinkMsg processRpcCallMsg(EdgeEvent edgeEvent) {
378 378 log.trace("Executing processRpcCall, edgeEvent [{}]", edgeEvent);
379 379 DeviceRpcCallMsg deviceRpcCallMsg =
380   - ctx.getDeviceMsgConstructor().constructDeviceRpcCallMsg(edgeEvent.getBody());
  380 + ctx.getDeviceMsgConstructor().constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody());
381 381 return DownlinkMsg.newBuilder()
382 382 .addAllDeviceRpcCallMsg(Collections.singletonList(deviceRpcCallMsg))
383 383 .build();
... ... @@ -398,7 +398,6 @@ public final class EdgeGrpcSession implements Closeable {
398 398 return downlinkMsg;
399 399 }
400 400
401   -
402 401 private ListenableFuture<Long> getQueueStartTs() {
403 402 ListenableFuture<Optional<AttributeKvEntry>> future =
404 403 ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, QUEUE_START_TS_ATTR_KEY);
... ...
... ... @@ -18,7 +18,6 @@ package org.thingsboard.server.service.edge.rpc.constructor;
18 18 import com.fasterxml.jackson.databind.JsonNode;
19 19 import com.fasterxml.jackson.databind.ObjectMapper;
20 20 import org.springframework.stereotype.Component;
21   -import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
22 21 import org.thingsboard.server.common.data.Device;
23 22 import org.thingsboard.server.common.data.id.CustomerId;
24 23 import org.thingsboard.server.common.data.id.DeviceId;
... ... @@ -31,6 +30,8 @@ import org.thingsboard.server.gen.edge.RpcRequestMsg;
31 30 import org.thingsboard.server.gen.edge.UpdateMsgType;
32 31 import org.thingsboard.server.queue.util.TbCoreComponent;
33 32
  33 +import java.util.UUID;
  34 +
34 35 @Component
35 36 @TbCoreComponent
36 37 public class DeviceMsgConstructor {
... ... @@ -78,22 +79,26 @@ public class DeviceMsgConstructor {
78 79 .setIdLSB(deviceId.getId().getLeastSignificantBits()).build();
79 80 }
80 81
81   - public DeviceRpcCallMsg constructDeviceRpcCallMsg(JsonNode body) {
82   - RuleEngineDeviceRpcRequest request = mapper.convertValue(body, RuleEngineDeviceRpcRequest.class);
  82 + public DeviceRpcCallMsg constructDeviceRpcCallMsg(UUID deviceId, JsonNode body) {
  83 + int requestId = body.get("requestId").asInt();
  84 + boolean oneway = body.get("oneway").asBoolean();
  85 + UUID requestUUID = UUID.fromString(body.get("requestUUID").asText());
  86 + long expirationTime = body.get("expirationTime").asLong();
  87 + String method = body.get("method").asText();
  88 + String params = body.get("params").asText();
  89 +
83 90 RpcRequestMsg.Builder requestBuilder = RpcRequestMsg.newBuilder();
84   - requestBuilder.setMethod(request.getMethod());
85   - requestBuilder.setParams(request.getBody());
  91 + requestBuilder.setMethod(method);
  92 + requestBuilder.setParams(params);
86 93 DeviceRpcCallMsg.Builder builder = DeviceRpcCallMsg.newBuilder()
87   - .setDeviceIdMSB(request.getDeviceId().getId().getMostSignificantBits())
88   - .setDeviceIdLSB(request.getDeviceId().getId().getLeastSignificantBits())
89   - .setRequestIdMSB(request.getRequestUUID().getMostSignificantBits())
90   - .setRequestIdLSB(request.getRequestUUID().getLeastSignificantBits())
91   - .setExpirationTime(request.getExpirationTime())
92   - .setOneway(request.isOneway())
  94 + .setDeviceIdMSB(deviceId.getMostSignificantBits())
  95 + .setDeviceIdLSB(deviceId.getLeastSignificantBits())
  96 + .setRequestUuidMSB(requestUUID.getMostSignificantBits())
  97 + .setRequestUuidLSB(requestUUID.getLeastSignificantBits())
  98 + .setRequestId(requestId)
  99 + .setExpirationTime(expirationTime)
  100 + .setOneway(oneway)
93 101 .setRequestMsg(requestBuilder.build());
94   - if (request.getOriginServiceId() != null) {
95   - builder.setOriginServiceId(request.getOriginServiceId());
96   - }
97 102 return builder.build();
98 103 }
99 104 }
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -50,6 +50,7 @@ import org.thingsboard.server.queue.TbQueueCallback;
50 50 import org.thingsboard.server.queue.TbQueueMsgMetadata;
51 51 import org.thingsboard.server.queue.util.TbCoreComponent;
52 52 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
  53 +import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg;
53 54
54 55 import java.util.UUID;
55 56 import java.util.concurrent.locks.ReentrantLock;
... ... @@ -223,12 +224,14 @@ public class DeviceProcessor extends BaseProcessor {
223 224 public ListenableFuture<Void> processDeviceRpcCallResponseMsg(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) {
224 225 log.trace("[{}] processDeviceRpcCallResponseMsg [{}]", tenantId, deviceRpcCallMsg);
225 226 SettableFuture<Void> futureToSet = SettableFuture.create();
226   - UUID uuid = new UUID(deviceRpcCallMsg.getRequestIdMSB(), deviceRpcCallMsg.getRequestIdLSB());
  227 + UUID requestUuid = new UUID(deviceRpcCallMsg.getRequestUuidMSB(), deviceRpcCallMsg.getRequestUuidLSB());
  228 + DeviceId deviceId = new DeviceId(new UUID(deviceRpcCallMsg.getDeviceIdMSB(), deviceRpcCallMsg.getDeviceIdLSB()));
  229 +
227 230 FromDeviceRpcResponse response;
228 231 if (!StringUtils.isEmpty(deviceRpcCallMsg.getResponseMsg().getError())) {
229   - response = new FromDeviceRpcResponse(uuid, null, RpcError.valueOf(deviceRpcCallMsg.getResponseMsg().getError()));
  232 + response = new FromDeviceRpcResponse(requestUuid, null, RpcError.valueOf(deviceRpcCallMsg.getResponseMsg().getError()));
230 233 } else {
231   - response = new FromDeviceRpcResponse(uuid, deviceRpcCallMsg.getResponseMsg().getResponse(), null);
  234 + response = new FromDeviceRpcResponse(requestUuid, deviceRpcCallMsg.getResponseMsg().getResponse(), null);
232 235 }
233 236 TbQueueCallback callback = new TbQueueCallback() {
234 237 @Override
... ... @@ -242,7 +245,11 @@ public class DeviceProcessor extends BaseProcessor {
242 245 futureToSet.setException(t);
243 246 }
244 247 };
245   - tbClusterService.pushNotificationToCore(deviceRpcCallMsg.getOriginServiceId(), response, callback);
  248 + FromDeviceRpcResponseActorMsg msg =
  249 + new FromDeviceRpcResponseActorMsg(deviceRpcCallMsg.getRequestId(),
  250 + tenantId,
  251 + deviceId, response);
  252 + tbClusterService.pushMsgToCore(msg, callback);
246 253 return futureToSet;
247 254 }
248 255
... ...
... ... @@ -20,6 +20,7 @@ import lombok.RequiredArgsConstructor;
20 20 import lombok.ToString;
21 21 import org.thingsboard.rule.engine.api.RpcError;
22 22
  23 +import java.io.Serializable;
23 24 import java.util.Optional;
24 25 import java.util.UUID;
25 26
... ... @@ -28,7 +29,7 @@ import java.util.UUID;
28 29 */
29 30 @RequiredArgsConstructor
30 31 @ToString
31   -public class FromDeviceRpcResponse {
  32 +public class FromDeviceRpcResponse implements Serializable {
32 33 @Getter
33 34 private final UUID id;
34 35 private final String response;
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.rpc;
  17 +
  18 +import lombok.Getter;
  19 +import lombok.RequiredArgsConstructor;
  20 +import lombok.ToString;
  21 +import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg;
  22 +import org.thingsboard.server.common.data.id.DeviceId;
  23 +import org.thingsboard.server.common.data.id.TenantId;
  24 +import org.thingsboard.server.common.msg.MsgType;
  25 +
  26 +@ToString
  27 +@RequiredArgsConstructor
  28 +public class FromDeviceRpcResponseActorMsg implements ToDeviceActorNotificationMsg {
  29 +
  30 + @Getter
  31 + private final Integer requestId;
  32 + @Getter
  33 + private final TenantId tenantId;
  34 + @Getter
  35 + private final DeviceId deviceId;
  36 +
  37 + @Getter
  38 + private final FromDeviceRpcResponse msg;
  39 +
  40 + @Override
  41 + public MsgType getMsgType() {
  42 + return MsgType.DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG;
  43 + }
  44 +}
... ...
... ... @@ -1281,7 +1281,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
1281 1281 deviceRpcCallResponseBuilder.setDeviceIdMSB(device.getUuidId().getMostSignificantBits());
1282 1282 deviceRpcCallResponseBuilder.setDeviceIdLSB(device.getUuidId().getLeastSignificantBits());
1283 1283 deviceRpcCallResponseBuilder.setOneway(true);
1284   - deviceRpcCallResponseBuilder.setOriginServiceId("originServiceId");
  1284 + deviceRpcCallResponseBuilder.setRequestId(0);
1285 1285 deviceRpcCallResponseBuilder.setExpirationTime(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10));
1286 1286 RpcResponseMsg.Builder responseBuilder =
1287 1287 RpcResponseMsg.newBuilder().setResponse("{}");
... ...
... ... @@ -336,11 +336,11 @@ message DeviceCredentialsRequestMsg {
336 336 message DeviceRpcCallMsg {
337 337 int64 deviceIdMSB = 1;
338 338 int64 deviceIdLSB = 2;
339   - int64 requestIdMSB = 3;
340   - int64 requestIdLSB = 4;
341   - int64 expirationTime = 5;
342   - bool oneway = 6;
343   - string originServiceId = 7;
  339 + int64 requestUuidMSB = 3;
  340 + int64 requestUuidLSB = 4;
  341 + int32 requestId = 5;
  342 + int64 expirationTime = 6;
  343 + bool oneway = 7;
344 344 RpcRequestMsg requestMsg = 8;
345 345 RpcResponseMsg responseMsg = 9;
346 346 }
... ...
... ... @@ -82,8 +82,12 @@ public enum MsgType {
82 82
83 83 DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG,
84 84
  85 + DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG,
  86 +
85 87 DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG,
86 88
  89 + DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG,
  90 +
87 91 SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG,
88 92
89 93 DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG,
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.api.msg;
  17 +
  18 +import lombok.AllArgsConstructor;
  19 +import lombok.Data;
  20 +import org.thingsboard.server.common.data.id.DeviceId;
  21 +import org.thingsboard.server.common.data.id.EdgeId;
  22 +import org.thingsboard.server.common.data.id.TenantId;
  23 +import org.thingsboard.server.common.msg.MsgType;
  24 +
  25 +@Data
  26 +@AllArgsConstructor
  27 +public class DeviceEdgeUpdateMsg implements ToDeviceActorNotificationMsg {
  28 +
  29 + private final TenantId tenantId;
  30 + private final DeviceId deviceId;
  31 + private final EdgeId edgeId;
  32 +
  33 + @Override
  34 + public MsgType getMsgType() {
  35 + return MsgType.DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG;
  36 + }
  37 +}
... ...
... ... @@ -16,10 +16,6 @@
16 16 package org.thingsboard.rule.engine.rpc;
17 17
18 18 import com.datastax.driver.core.utils.UUIDs;
19   -import com.fasterxml.jackson.databind.ObjectMapper;
20   -import com.google.common.util.concurrent.FutureCallback;
21   -import com.google.common.util.concurrent.Futures;
22   -import com.google.common.util.concurrent.ListenableFuture;
23 19 import com.google.gson.Gson;
24 20 import com.google.gson.JsonElement;
25 21 import com.google.gson.JsonObject;
... ... @@ -36,18 +32,10 @@ import org.thingsboard.rule.engine.api.TbRelationTypes;
36 32 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
37 33 import org.thingsboard.server.common.data.DataConstants;
38 34 import org.thingsboard.server.common.data.EntityType;
39   -import org.thingsboard.server.common.data.edge.EdgeEvent;
40   -import org.thingsboard.server.common.data.edge.EdgeEventActionType;
41   -import org.thingsboard.server.common.data.edge.EdgeEventType;
42 35 import org.thingsboard.server.common.data.id.DeviceId;
43   -import org.thingsboard.server.common.data.id.EdgeId;
44 36 import org.thingsboard.server.common.data.plugin.ComponentType;
45   -import org.thingsboard.server.common.data.relation.EntityRelation;
46   -import org.thingsboard.server.common.data.relation.RelationTypeGroup;
47 37 import org.thingsboard.server.common.msg.TbMsg;
48 38
49   -import javax.annotation.Nullable;
50   -import java.util.List;
51 39 import java.util.Random;
52 40 import java.util.UUID;
53 41 import java.util.concurrent.TimeUnit;
... ... @@ -66,7 +54,6 @@ import java.util.concurrent.TimeUnit;
66 54 )
67 55 public class TbSendRPCRequestNode implements TbNode {
68 56
69   - private static final ObjectMapper json = new ObjectMapper();
70 57 private Random random = new Random();
71 58 private Gson gson = new Gson();
72 59 private JsonParser jsonParser = new JsonParser();
... ... @@ -123,59 +110,19 @@ public class TbSendRPCRequestNode implements TbNode {
123 110 .restApiCall(restApiCall)
124 111 .build();
125 112
126   - EdgeId edgeId = findRelatedEdgeId(ctx, msg);
127   - if (edgeId != null) {
128   - sendRpcRequestToEdgeDevice(ctx, msg, edgeId, request);
129   - } else {
130   - ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> {
131   - if (!ruleEngineDeviceRpcResponse.getError().isPresent()) {
132   - TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
133   - ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS);
134   - } else {
135   - TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
136   - ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name()));
137   - }
138   - });
139   - }
  113 + ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> {
  114 + if (!ruleEngineDeviceRpcResponse.getError().isPresent()) {
  115 + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
  116 + ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS);
  117 + } else {
  118 + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
  119 + ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name()));
  120 + }
  121 + });
140 122 ctx.ack(msg);
141 123 }
142 124 }
143 125
144   - private EdgeId findRelatedEdgeId(TbContext ctx, TbMsg msg) {
145   - List<EntityRelation> result =
146   - ctx.getRelationService().findByToAndType(ctx.getTenantId(), msg.getOriginator(), EntityRelation.EDGE_TYPE, RelationTypeGroup.COMMON);
147   - if (result != null && result.size() > 0) {
148   - EntityRelation relationToEdge = result.get(0);
149   - if (relationToEdge.getFrom() != null && relationToEdge.getFrom().getId() != null) {
150   - return new EdgeId(relationToEdge.getFrom().getId());
151   - }
152   - }
153   - return null;
154   - }
155   -
156   - private void sendRpcRequestToEdgeDevice(TbContext ctx, TbMsg msg, EdgeId edgeId, RuleEngineDeviceRpcRequest request) {
157   - EdgeEvent edgeEvent = new EdgeEvent();
158   - edgeEvent.setTenantId(ctx.getTenantId());
159   - edgeEvent.setAction(EdgeEventActionType.RPC_CALL);
160   - edgeEvent.setEntityId(request.getDeviceId().getId());
161   - edgeEvent.setType(EdgeEventType.DEVICE);
162   - edgeEvent.setBody(json.valueToTree(request));
163   - edgeEvent.setEdgeId(edgeId);
164   - ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent);
165   - Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() {
166   - @Override
167   - public void onSuccess(@Nullable EdgeEvent event) {
168   - ctx.tellSuccess(msg);
169   - }
170   -
171   - @Override
172   - public void onFailure(Throwable th) {
173   - log.error("Could not save edge event", th);
174   - ctx.tellFailure(msg, th);
175   - }
176   - }, ctx.getDbCallbackExecutor());
177   - }
178   -
179 126 @Override
180 127 public void destroy() {
181 128 }
... ...