Commit 85fcfef8a5e45b88f359fe3242dbfc2b242123ec

Authored by Volodymyr Babak
1 parent c526d13e

Added support for RPC call for edge devices

@@ -255,12 +255,15 @@ public class ActorSystemContext { @@ -255,12 +255,15 @@ public class ActorSystemContext {
255 @Getter 255 @Getter
256 private TbCoreDeviceRpcService tbCoreDeviceRpcService; 256 private TbCoreDeviceRpcService tbCoreDeviceRpcService;
257 257
  258 + @Lazy
258 @Autowired(required = false) 259 @Autowired(required = false)
259 @Getter private EdgeService edgeService; 260 @Getter private EdgeService edgeService;
260 261
  262 + @Lazy
261 @Autowired(required = false) 263 @Autowired(required = false)
262 @Getter private EdgeEventService edgeEventService; 264 @Getter private EdgeEventService edgeEventService;
263 265
  266 + @Lazy
264 @Autowired(required = false) 267 @Autowired(required = false)
265 @Getter private EdgeRpcService edgeRpcService; 268 @Getter private EdgeRpcService edgeRpcService;
266 269
@@ -78,6 +78,7 @@ import org.thingsboard.server.gen.edge.CustomerUpdateMsg; @@ -78,6 +78,7 @@ import org.thingsboard.server.gen.edge.CustomerUpdateMsg;
78 import org.thingsboard.server.gen.edge.DashboardUpdateMsg; 78 import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
79 import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg; 79 import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg;
80 import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg; 80 import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg;
  81 +import org.thingsboard.server.gen.edge.DeviceRpcCallMsg;
81 import org.thingsboard.server.gen.edge.DeviceUpdateMsg; 82 import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
82 import org.thingsboard.server.gen.edge.DownlinkMsg; 83 import org.thingsboard.server.gen.edge.DownlinkMsg;
83 import org.thingsboard.server.gen.edge.DownlinkResponseMsg; 84 import org.thingsboard.server.gen.edge.DownlinkResponseMsg;
@@ -333,6 +334,9 @@ public final class EdgeGrpcSession implements Closeable { @@ -333,6 +334,9 @@ public final class EdgeGrpcSession implements Closeable {
333 case ENTITY_EXISTS_REQUEST: 334 case ENTITY_EXISTS_REQUEST:
334 downlinkMsg = processEntityExistsRequestMessage(edgeEvent); 335 downlinkMsg = processEntityExistsRequestMessage(edgeEvent);
335 break; 336 break;
  337 + case RPC_CALL:
  338 + downlinkMsg = processRpcCallMsg(edgeEvent);
  339 + break;
336 } 340 }
337 if (downlinkMsg != null) { 341 if (downlinkMsg != null) {
338 result.add(downlinkMsg); 342 result.add(downlinkMsg);
@@ -358,6 +362,15 @@ public final class EdgeGrpcSession implements Closeable { @@ -358,6 +362,15 @@ public final class EdgeGrpcSession implements Closeable {
358 return downlinkMsg; 362 return downlinkMsg;
359 } 363 }
360 364
  365 + private DownlinkMsg processRpcCallMsg(EdgeEvent edgeEvent) {
  366 + log.trace("Executing processRpcCall, edgeEvent [{}]", edgeEvent);
  367 + DeviceRpcCallMsg deviceRpcCallMsg =
  368 + ctx.getDeviceMsgConstructor().constructDeviceRpcCallMsg(edgeEvent.getEntityBody());
  369 + return DownlinkMsg.newBuilder()
  370 + .addAllDeviceRpcCallMsg(Collections.singletonList(deviceRpcCallMsg))
  371 + .build();
  372 + }
  373 +
361 private DownlinkMsg processCredentialsRequestMessage(EdgeEvent edgeEvent) { 374 private DownlinkMsg processCredentialsRequestMessage(EdgeEvent edgeEvent) {
362 DownlinkMsg downlinkMsg = null; 375 DownlinkMsg downlinkMsg = null;
363 if (EdgeEventType.DEVICE.equals(edgeEvent.getEdgeEventType())) { 376 if (EdgeEventType.DEVICE.equals(edgeEvent.getEdgeEventType())) {
@@ -883,6 +896,11 @@ public final class EdgeGrpcSession implements Closeable { @@ -883,6 +896,11 @@ public final class EdgeGrpcSession implements Closeable {
883 result.add(ctx.getSyncEdgeService().processDeviceCredentialsRequestMsg(edge, deviceCredentialsRequestMsg)); 896 result.add(ctx.getSyncEdgeService().processDeviceCredentialsRequestMsg(edge, deviceCredentialsRequestMsg));
884 } 897 }
885 } 898 }
  899 + if (uplinkMsg.getDeviceRpcCallMsgList() != null && !uplinkMsg.getDeviceRpcCallMsgList().isEmpty()) {
  900 + for (DeviceRpcCallMsg deviceRpcCallMsg: uplinkMsg.getDeviceRpcCallMsgList()) {
  901 + result.add(ctx.getDeviceProcessor().processDeviceRpcCallResponseMsg(edge.getTenantId(), deviceRpcCallMsg));
  902 + }
  903 + }
886 } catch (Exception e) { 904 } catch (Exception e) {
887 log.error("Can't process uplink msg [{}]", uplinkMsg, e); 905 log.error("Can't process uplink msg [{}]", uplinkMsg, e);
888 } 906 }
@@ -15,21 +15,27 @@ @@ -15,21 +15,27 @@
15 */ 15 */
16 package org.thingsboard.server.service.edge.rpc.constructor; 16 package org.thingsboard.server.service.edge.rpc.constructor;
17 17
  18 +import com.fasterxml.jackson.databind.JsonNode;
  19 +import com.fasterxml.jackson.databind.ObjectMapper;
18 import lombok.extern.slf4j.Slf4j; 20 import lombok.extern.slf4j.Slf4j;
19 import org.springframework.stereotype.Component; 21 import org.springframework.stereotype.Component;
  22 +import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
20 import org.thingsboard.server.common.data.Device; 23 import org.thingsboard.server.common.data.Device;
21 import org.thingsboard.server.common.data.id.CustomerId; 24 import org.thingsboard.server.common.data.id.CustomerId;
22 import org.thingsboard.server.common.data.id.DeviceId; 25 import org.thingsboard.server.common.data.id.DeviceId;
23 -import org.thingsboard.server.common.data.id.EntityId;  
24 import org.thingsboard.server.common.data.security.DeviceCredentials; 26 import org.thingsboard.server.common.data.security.DeviceCredentials;
25 import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg; 27 import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg;
  28 +import org.thingsboard.server.gen.edge.DeviceRpcCallMsg;
26 import org.thingsboard.server.gen.edge.DeviceUpdateMsg; 29 import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
  30 +import org.thingsboard.server.gen.edge.RpcRequestMsg;
27 import org.thingsboard.server.gen.edge.UpdateMsgType; 31 import org.thingsboard.server.gen.edge.UpdateMsgType;
28 32
29 @Component 33 @Component
30 @Slf4j 34 @Slf4j
31 public class DeviceMsgConstructor { 35 public class DeviceMsgConstructor {
32 36
  37 + protected static final ObjectMapper mapper = new ObjectMapper();
  38 +
33 public DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device, CustomerId customerId) { 39 public DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device, CustomerId customerId) {
34 DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder() 40 DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder()
35 .setMsgType(msgType) 41 .setMsgType(msgType)
@@ -67,4 +73,21 @@ public class DeviceMsgConstructor { @@ -67,4 +73,21 @@ public class DeviceMsgConstructor {
67 .setIdMSB(deviceId.getId().getMostSignificantBits()) 73 .setIdMSB(deviceId.getId().getMostSignificantBits())
68 .setIdLSB(deviceId.getId().getLeastSignificantBits()).build(); 74 .setIdLSB(deviceId.getId().getLeastSignificantBits()).build();
69 } 75 }
  76 +
  77 + public DeviceRpcCallMsg constructDeviceRpcCallMsg(JsonNode body) {
  78 + RuleEngineDeviceRpcRequest request = mapper.convertValue(body, RuleEngineDeviceRpcRequest.class);
  79 + RpcRequestMsg.Builder requestBuilder = RpcRequestMsg.newBuilder();
  80 + requestBuilder.setMethod(request.getMethod());
  81 + requestBuilder.setParams(request.getBody());
  82 + DeviceRpcCallMsg.Builder builder = DeviceRpcCallMsg.newBuilder()
  83 + .setDeviceIdMSB(request.getDeviceId().getId().getMostSignificantBits())
  84 + .setDeviceIdLSB(request.getDeviceId().getId().getLeastSignificantBits())
  85 + .setRequestIdMSB(request.getRequestUUID().getMostSignificantBits())
  86 + .setRequestIdLSB(request.getRequestUUID().getLeastSignificantBits())
  87 + .setExpirationTime(request.getExpirationTime())
  88 + .setOriginServiceId(request.getOriginServiceId())
  89 + .setOneway(request.isOneway())
  90 + .setRequestMsg(requestBuilder.build());
  91 + return builder.build();
  92 + }
70 } 93 }
@@ -39,6 +39,7 @@ import org.thingsboard.server.dao.relation.RelationService; @@ -39,6 +39,7 @@ import org.thingsboard.server.dao.relation.RelationService;
39 import org.thingsboard.server.dao.user.UserService; 39 import org.thingsboard.server.dao.user.UserService;
40 import org.thingsboard.server.service.executors.DbCallbackExecutorService; 40 import org.thingsboard.server.service.executors.DbCallbackExecutorService;
41 import org.thingsboard.server.service.queue.TbClusterService; 41 import org.thingsboard.server.service.queue.TbClusterService;
  42 +import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
42 import org.thingsboard.server.service.state.DeviceStateService; 43 import org.thingsboard.server.service.state.DeviceStateService;
43 44
44 @Slf4j 45 @Slf4j
@@ -47,6 +48,9 @@ public abstract class BaseProcessor { @@ -47,6 +48,9 @@ public abstract class BaseProcessor {
47 protected static final ObjectMapper mapper = new ObjectMapper(); 48 protected static final ObjectMapper mapper = new ObjectMapper();
48 49
49 @Autowired 50 @Autowired
  51 + protected TbRuleEngineDeviceRpcService tbDeviceRpcService;
  52 +
  53 + @Autowired
50 protected AlarmService alarmService; 54 protected AlarmService alarmService;
51 55
52 @Autowired 56 @Autowired
@@ -21,7 +21,9 @@ import com.google.common.util.concurrent.Futures; @@ -21,7 +21,9 @@ import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture; 21 import com.google.common.util.concurrent.ListenableFuture;
22 import lombok.extern.slf4j.Slf4j; 22 import lombok.extern.slf4j.Slf4j;
23 import org.apache.commons.lang.RandomStringUtils; 23 import org.apache.commons.lang.RandomStringUtils;
  24 +import org.apache.commons.lang.StringUtils;
24 import org.springframework.stereotype.Component; 25 import org.springframework.stereotype.Component;
  26 +import org.thingsboard.rule.engine.api.RpcError;
25 import org.thingsboard.server.common.data.DataConstants; 27 import org.thingsboard.server.common.data.DataConstants;
26 import org.thingsboard.server.common.data.Device; 28 import org.thingsboard.server.common.data.Device;
27 import org.thingsboard.server.common.data.audit.ActionType; 29 import org.thingsboard.server.common.data.audit.ActionType;
@@ -40,9 +42,11 @@ import org.thingsboard.server.common.msg.TbMsg; @@ -40,9 +42,11 @@ import org.thingsboard.server.common.msg.TbMsg;
40 import org.thingsboard.server.common.msg.TbMsgDataType; 42 import org.thingsboard.server.common.msg.TbMsgDataType;
41 import org.thingsboard.server.common.msg.TbMsgMetaData; 43 import org.thingsboard.server.common.msg.TbMsgMetaData;
42 import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg; 44 import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg;
  45 +import org.thingsboard.server.gen.edge.DeviceRpcCallMsg;
43 import org.thingsboard.server.gen.edge.DeviceUpdateMsg; 46 import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
44 import org.thingsboard.server.queue.TbQueueCallback; 47 import org.thingsboard.server.queue.TbQueueCallback;
45 import org.thingsboard.server.queue.TbQueueMsgMetadata; 48 import org.thingsboard.server.queue.TbQueueMsgMetadata;
  49 +import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
46 50
47 import java.util.UUID; 51 import java.util.UUID;
48 import java.util.concurrent.locks.ReentrantLock; 52 import java.util.concurrent.locks.ReentrantLock;
@@ -213,4 +217,17 @@ public class DeviceProcessor extends BaseProcessor { @@ -213,4 +217,17 @@ public class DeviceProcessor extends BaseProcessor {
213 metaData.putValue("edgeName", edge.getName()); 217 metaData.putValue("edgeName", edge.getName());
214 return metaData; 218 return metaData;
215 } 219 }
  220 +
  221 + public ListenableFuture<Void> processDeviceRpcCallResponseMsg(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) {
  222 + UUID uuid = new UUID(deviceRpcCallMsg.getRequestIdMSB(), deviceRpcCallMsg.getRequestIdLSB());
  223 + FromDeviceRpcResponse response;
  224 + if (!StringUtils.isEmpty(deviceRpcCallMsg.getResponseMsg().getError())) {
  225 + response = new FromDeviceRpcResponse(uuid, null, RpcError.valueOf(deviceRpcCallMsg.getResponseMsg().getError()));
  226 + } else {
  227 + response = new FromDeviceRpcResponse(uuid, deviceRpcCallMsg.getResponseMsg().getResponse(), null);
  228 + }
  229 + tbDeviceRpcService.sendRpcResponseToTbCore(deviceRpcCallMsg.getOriginServiceId(), response);
  230 + return Futures.immediateFuture(null);
  231 + }
  232 +
216 } 233 }
@@ -151,7 +151,8 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi @@ -151,7 +151,8 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
151 } 151 }
152 } 152 }
153 153
154 - private void sendRpcResponseToTbCore(String originServiceId, FromDeviceRpcResponse response) { 154 + @Override
  155 + public void sendRpcResponseToTbCore(String originServiceId, FromDeviceRpcResponse response) {
155 if (serviceId.equals(originServiceId)) { 156 if (serviceId.equals(originServiceId)) {
156 if (tbCoreRpcService.isPresent()) { 157 if (tbCoreRpcService.isPresent()) {
157 tbCoreRpcService.get().processRpcResponseFromRuleEngine(response); 158 tbCoreRpcService.get().processRpcResponseFromRuleEngine(response);
@@ -29,4 +29,7 @@ public interface TbRuleEngineDeviceRpcService extends RuleEngineRpcService { @@ -29,4 +29,7 @@ public interface TbRuleEngineDeviceRpcService extends RuleEngineRpcService {
29 */ 29 */
30 void processRpcResponseFromDevice(FromDeviceRpcResponse response); 30 void processRpcResponseFromDevice(FromDeviceRpcResponse response);
31 31
  32 +
  33 + void sendRpcResponseToTbCore(String originServiceId, FromDeviceRpcResponse response);
  34 +
32 } 35 }
@@ -588,7 +588,7 @@ transport: @@ -588,7 +588,7 @@ transport:
588 # Edges parameters 588 # Edges parameters
589 edges: 589 edges:
590 rpc: 590 rpc:
591 - enabled: "${EDGES_RPC_ENABLED:true}" 591 + enabled: "${EDGES_RPC_ENABLED:false}"
592 port: "${EDGES_RPC_PORT:7070}" 592 port: "${EDGES_RPC_PORT:7070}"
593 ssl: 593 ssl:
594 # Enable/disable SSL support 594 # Enable/disable SSL support
@@ -322,6 +322,28 @@ message DeviceCredentialsRequestMsg { @@ -322,6 +322,28 @@ message DeviceCredentialsRequestMsg {
322 int64 deviceIdLSB = 2; 322 int64 deviceIdLSB = 2;
323 } 323 }
324 324
  325 +message DeviceRpcCallMsg {
  326 + int64 deviceIdMSB = 1;
  327 + int64 deviceIdLSB = 2;
  328 + int64 requestIdMSB = 3;
  329 + int64 requestIdLSB = 4;
  330 + int64 expirationTime = 5;
  331 + bool oneway = 6;
  332 + string originServiceId = 7;
  333 + RpcRequestMsg requestMsg = 8;
  334 + RpcResponseMsg responseMsg = 9;
  335 +}
  336 +
  337 +message RpcRequestMsg {
  338 + string method = 1;
  339 + string params = 2;
  340 +}
  341 +
  342 +message RpcResponseMsg {
  343 + string response = 1;
  344 + string error = 2;
  345 +}
  346 +
325 enum EdgeEntityType { 347 enum EdgeEntityType {
326 DEVICE = 0; 348 DEVICE = 0;
327 ASSET = 1; 349 ASSET = 1;
@@ -343,6 +365,7 @@ message UplinkMsg { @@ -343,6 +365,7 @@ message UplinkMsg {
343 repeated RelationRequestMsg relationRequestMsg = 9; 365 repeated RelationRequestMsg relationRequestMsg = 9;
344 repeated UserCredentialsRequestMsg userCredentialsRequestMsg = 10; 366 repeated UserCredentialsRequestMsg userCredentialsRequestMsg = 10;
345 repeated DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 11; 367 repeated DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 11;
  368 + repeated DeviceRpcCallMsg deviceRpcCallMsg = 12;
346 } 369 }
347 370
348 message UplinkResponseMsg { 371 message UplinkResponseMsg {
@@ -374,6 +397,6 @@ message DownlinkMsg { @@ -374,6 +397,6 @@ message DownlinkMsg {
374 repeated WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = 16; 397 repeated WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = 16;
375 repeated WidgetTypeUpdateMsg widgetTypeUpdateMsg = 17; 398 repeated WidgetTypeUpdateMsg widgetTypeUpdateMsg = 17;
376 repeated AdminSettingsUpdateMsg adminSettingsUpdateMsg = 18; 399 repeated AdminSettingsUpdateMsg adminSettingsUpdateMsg = 18;
377 - 400 + repeated DeviceRpcCallMsg deviceRpcCallMsg = 19;
378 } 401 }
379 402
@@ -425,38 +425,43 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic @@ -425,38 +425,43 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
425 425
426 @Override 426 @Override
427 public ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId) { 427 public ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId) {
428 - switch (entityId.getEntityType()) {  
429 - case DEVICE:  
430 - case ASSET:  
431 - case ENTITY_VIEW:  
432 - ListenableFuture<List<EntityRelation>> originatorEdgeRelationsFuture =  
433 - relationService.findByToAndTypeAsync(tenantId, entityId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);  
434 - return Futures.transform(originatorEdgeRelationsFuture, originatorEdgeRelations -> {  
435 - if (originatorEdgeRelations != null && originatorEdgeRelations.size() > 0 &&  
436 - originatorEdgeRelations.get(0).getFrom() != null) {  
437 - return Collections.singletonList(new EdgeId(originatorEdgeRelations.get(0).getFrom().getId())); 428 + if (EntityType.TENANT.equals(entityId.getEntityType())) {
  429 + TextPageData<Edge> edgesByTenantId = findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));
  430 + return Futures.immediateFuture(edgesByTenantId.getData().stream().map(IdBased::getId).collect(Collectors.toList()));
  431 + } else {
  432 + switch (entityId.getEntityType()) {
  433 + case DEVICE:
  434 + case ASSET:
  435 + case ENTITY_VIEW:
  436 + ListenableFuture<List<EntityRelation>> originatorEdgeRelationsFuture =
  437 + relationService.findByToAndTypeAsync(tenantId, entityId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
  438 + return Futures.transform(originatorEdgeRelationsFuture, originatorEdgeRelations -> {
  439 + if (originatorEdgeRelations != null && originatorEdgeRelations.size() > 0 &&
  440 + originatorEdgeRelations.get(0).getFrom() != null) {
  441 + return Collections.singletonList(new EdgeId(originatorEdgeRelations.get(0).getFrom().getId()));
  442 + } else {
  443 + return Collections.emptyList();
  444 + }
  445 + }, MoreExecutors.directExecutor());
  446 + case DASHBOARD:
  447 + return convertToEdgeIds(findEdgesByTenantIdAndDashboardId(tenantId, new DashboardId(entityId.getId())));
  448 + case RULE_CHAIN:
  449 + return convertToEdgeIds(findEdgesByTenantIdAndRuleChainId(tenantId, new RuleChainId(entityId.getId())));
  450 + case USER:
  451 + User userById = userService.findUserById(tenantId, new UserId(entityId.getId()));
  452 + if (userById == null) {
  453 + return Futures.immediateFuture(Collections.emptyList());
  454 + }
  455 + TextPageData<Edge> edges;
  456 + if (userById.getCustomerId() == null || userById.getCustomerId().isNullUid()) {
  457 + edges = findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));
438 } else { 458 } else {
439 - return Collections.emptyList(); 459 + edges = findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), new TextPageLink(Integer.MAX_VALUE));
440 } 460 }
441 - }, MoreExecutors.directExecutor());  
442 - case DASHBOARD:  
443 - return convertToEdgeIds(findEdgesByTenantIdAndDashboardId(tenantId, new DashboardId(entityId.getId())));  
444 - case RULE_CHAIN:  
445 - return convertToEdgeIds(findEdgesByTenantIdAndRuleChainId(tenantId, new RuleChainId(entityId.getId())));  
446 - case USER:  
447 - User userById = userService.findUserById(tenantId, new UserId(entityId.getId()));  
448 - if (userById == null) { 461 + return convertToEdgeIds(Futures.immediateFuture(edges.getData()));
  462 + default:
449 return Futures.immediateFuture(Collections.emptyList()); 463 return Futures.immediateFuture(Collections.emptyList());
450 - }  
451 - TextPageData<Edge> edges;  
452 - if (userById.getCustomerId() == null || userById.getCustomerId().isNullUid()) {  
453 - edges = findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));  
454 - } else {  
455 - edges = findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), new TextPageLink(Integer.MAX_VALUE));  
456 - }  
457 - return convertToEdgeIds(Futures.immediateFuture(edges.getData()));  
458 - default:  
459 - return Futures.immediateFuture(Collections.emptyList()); 464 + }
460 } 465 }
461 } 466 }
462 467
@@ -33,29 +33,20 @@ import org.thingsboard.server.common.data.DataConstants; @@ -33,29 +33,20 @@ import org.thingsboard.server.common.data.DataConstants;
33 import org.thingsboard.server.common.data.EdgeUtils; 33 import org.thingsboard.server.common.data.EdgeUtils;
34 import org.thingsboard.server.common.data.EntityType; 34 import org.thingsboard.server.common.data.EntityType;
35 import org.thingsboard.server.common.data.audit.ActionType; 35 import org.thingsboard.server.common.data.audit.ActionType;
36 -import org.thingsboard.server.common.data.edge.Edge;  
37 import org.thingsboard.server.common.data.edge.EdgeEvent; 36 import org.thingsboard.server.common.data.edge.EdgeEvent;
38 import org.thingsboard.server.common.data.edge.EdgeEventType; 37 import org.thingsboard.server.common.data.edge.EdgeEventType;
39 import org.thingsboard.server.common.data.id.EdgeId; 38 import org.thingsboard.server.common.data.id.EdgeId;
40 -import org.thingsboard.server.common.data.id.EntityId;  
41 -import org.thingsboard.server.common.data.id.IdBased;  
42 import org.thingsboard.server.common.data.id.TenantId; 39 import org.thingsboard.server.common.data.id.TenantId;
43 -import org.thingsboard.server.common.data.page.TextPageData;  
44 -import org.thingsboard.server.common.data.page.TextPageLink;  
45 import org.thingsboard.server.common.data.plugin.ComponentType; 40 import org.thingsboard.server.common.data.plugin.ComponentType;
46 -import org.thingsboard.server.common.data.relation.EntityRelation;  
47 -import org.thingsboard.server.common.data.relation.RelationTypeGroup;  
48 import org.thingsboard.server.common.data.rule.RuleChainType; 41 import org.thingsboard.server.common.data.rule.RuleChainType;
49 import org.thingsboard.server.common.msg.TbMsg; 42 import org.thingsboard.server.common.msg.TbMsg;
50 import org.thingsboard.server.common.msg.session.SessionMsgType; 43 import org.thingsboard.server.common.msg.session.SessionMsgType;
51 44
52 import javax.annotation.Nullable; 45 import javax.annotation.Nullable;
53 -import java.util.ArrayList;  
54 import java.util.HashMap; 46 import java.util.HashMap;
55 import java.util.List; 47 import java.util.List;
56 import java.util.Map; 48 import java.util.Map;
57 import java.util.UUID; 49 import java.util.UUID;
58 -import java.util.stream.Collectors;  
59 50
60 import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; 51 import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
61 52
@@ -86,51 +77,12 @@ public class TbMsgPushToEdgeNode implements TbNode { @@ -86,51 +77,12 @@ public class TbMsgPushToEdgeNode implements TbNode {
86 public void onMsg(TbContext ctx, TbMsg msg) { 77 public void onMsg(TbContext ctx, TbMsg msg) {
87 if (DataConstants.EDGE_MSG_SOURCE.equalsIgnoreCase(msg.getMetaData().getValue(DataConstants.MSG_SOURCE_KEY))) { 78 if (DataConstants.EDGE_MSG_SOURCE.equalsIgnoreCase(msg.getMetaData().getValue(DataConstants.MSG_SOURCE_KEY))) {
88 log.debug("Ignoring msg from the cloud, msg [{}]", msg); 79 log.debug("Ignoring msg from the cloud, msg [{}]", msg);
  80 + ctx.ack(msg);
89 return; 81 return;
90 } 82 }
91 if (isSupportedOriginator(msg.getOriginator().getEntityType())) { 83 if (isSupportedOriginator(msg.getOriginator().getEntityType())) {
92 if (isSupportedMsgType(msg.getType())) { 84 if (isSupportedMsgType(msg.getType())) {
93 - ListenableFuture<List<EdgeId>> getEdgeIdsFuture = getEdgeIdsByOriginatorId(ctx, ctx.getTenantId(), msg.getOriginator());  
94 - Futures.addCallback(getEdgeIdsFuture, new FutureCallback<List<EdgeId>>() {  
95 - @Override  
96 - public void onSuccess(@Nullable List<EdgeId> edgeIds) {  
97 - if (edgeIds != null && !edgeIds.isEmpty()) {  
98 - for (EdgeId edgeId : edgeIds) {  
99 - try {  
100 - EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx);  
101 - if (edgeEvent == null) {  
102 - log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType());  
103 - ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'"));  
104 - } else {  
105 - edgeEvent.setEdgeId(edgeId);  
106 - ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent);  
107 - Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() {  
108 - @Override  
109 - public void onSuccess(@Nullable EdgeEvent event) {  
110 - ctx.tellNext(msg, SUCCESS);  
111 - }  
112 -  
113 - @Override  
114 - public void onFailure(Throwable th) {  
115 - log.error("Could not save edge event", th);  
116 - ctx.tellFailure(msg, th);  
117 - }  
118 - }, ctx.getDbCallbackExecutor());  
119 - }  
120 - } catch (JsonProcessingException e) {  
121 - log.error("Failed to build edge event", e);  
122 - ctx.tellFailure(msg, e);  
123 - }  
124 - }  
125 - }  
126 - }  
127 -  
128 - @Override  
129 - public void onFailure(Throwable t) {  
130 - ctx.tellFailure(msg, t);  
131 - }  
132 -  
133 - }, ctx.getDbCallbackExecutor()); 85 + processMsg(ctx, msg);
134 } else { 86 } else {
135 log.debug("Unsupported msg type {}", msg.getType()); 87 log.debug("Unsupported msg type {}", msg.getType());
136 ctx.tellFailure(msg, new RuntimeException("Unsupported msg type '" + msg.getType() + "'")); 88 ctx.tellFailure(msg, new RuntimeException("Unsupported msg type '" + msg.getType() + "'"));
@@ -141,6 +93,50 @@ public class TbMsgPushToEdgeNode implements TbNode { @@ -141,6 +93,50 @@ public class TbMsgPushToEdgeNode implements TbNode {
141 } 93 }
142 } 94 }
143 95
  96 + private void processMsg(TbContext ctx, TbMsg msg) {
  97 + ListenableFuture<List<EdgeId>> getEdgeIdsFuture = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator());
  98 + Futures.addCallback(getEdgeIdsFuture, new FutureCallback<List<EdgeId>>() {
  99 + @Override
  100 + public void onSuccess(@Nullable List<EdgeId> edgeIds) {
  101 + if (edgeIds != null && !edgeIds.isEmpty()) {
  102 + for (EdgeId edgeId : edgeIds) {
  103 + try {
  104 + EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx);
  105 + if (edgeEvent == null) {
  106 + log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType());
  107 + ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'"));
  108 + } else {
  109 + edgeEvent.setEdgeId(edgeId);
  110 + ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent);
  111 + Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() {
  112 + @Override
  113 + public void onSuccess(@Nullable EdgeEvent event) {
  114 + ctx.tellNext(msg, SUCCESS);
  115 + }
  116 +
  117 + @Override
  118 + public void onFailure(Throwable th) {
  119 + log.error("Could not save edge event", th);
  120 + ctx.tellFailure(msg, th);
  121 + }
  122 + }, ctx.getDbCallbackExecutor());
  123 + }
  124 + } catch (JsonProcessingException e) {
  125 + log.error("Failed to build edge event", e);
  126 + ctx.tellFailure(msg, e);
  127 + }
  128 + }
  129 + }
  130 + }
  131 +
  132 + @Override
  133 + public void onFailure(Throwable t) {
  134 + ctx.tellFailure(msg, t);
  135 + }
  136 +
  137 + }, ctx.getDbCallbackExecutor());
  138 + }
  139 +
144 private EdgeEvent buildEdgeEvent(TbMsg msg, TbContext ctx) throws JsonProcessingException { 140 private EdgeEvent buildEdgeEvent(TbMsg msg, TbContext ctx) throws JsonProcessingException {
145 if (DataConstants.ALARM.equals(msg.getType())) { 141 if (DataConstants.ALARM.equals(msg.getType())) {
146 return buildEdgeEvent(ctx.getTenantId(), ActionType.ADDED, getUUIDFromMsgData(msg), EdgeEventType.ALARM, null); 142 return buildEdgeEvent(ctx.getTenantId(), ActionType.ADDED, getUUIDFromMsgData(msg), EdgeEventType.ALARM, null);
@@ -227,15 +223,6 @@ public class TbMsgPushToEdgeNode implements TbNode { @@ -227,15 +223,6 @@ public class TbMsgPushToEdgeNode implements TbNode {
227 || DataConstants.ALARM.equals(msgType); 223 || DataConstants.ALARM.equals(msgType);
228 } 224 }
229 225
230 - private ListenableFuture<List<EdgeId>> getEdgeIdsByOriginatorId(TbContext ctx, TenantId tenantId, EntityId originatorId) {  
231 - if (EntityType.TENANT.equals(originatorId.getEntityType())) {  
232 - TextPageData<Edge> edgesByTenantId = ctx.getEdgeService().findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));  
233 - return Futures.immediateFuture(edgesByTenantId.getData().stream().map(IdBased::getId).collect(Collectors.toList()));  
234 - } else {  
235 - return ctx.getEdgeService().findRelatedEdgeIdsByEntityId(tenantId, originatorId);  
236 - }  
237 - }  
238 -  
239 @Override 226 @Override
240 public void destroy() { 227 public void destroy() {
241 } 228 }
@@ -16,13 +16,16 @@ @@ -16,13 +16,16 @@
16 package org.thingsboard.rule.engine.rpc; 16 package org.thingsboard.rule.engine.rpc;
17 17
18 import com.datastax.driver.core.utils.UUIDs; 18 import com.datastax.driver.core.utils.UUIDs;
  19 +import com.fasterxml.jackson.databind.ObjectMapper;
  20 +import com.google.common.util.concurrent.FutureCallback;
  21 +import com.google.common.util.concurrent.Futures;
  22 +import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.gson.Gson; 23 import com.google.gson.Gson;
20 import com.google.gson.JsonElement; 24 import com.google.gson.JsonElement;
21 import com.google.gson.JsonObject; 25 import com.google.gson.JsonObject;
22 import com.google.gson.JsonParser; 26 import com.google.gson.JsonParser;
23 import lombok.extern.slf4j.Slf4j; 27 import lombok.extern.slf4j.Slf4j;
24 import org.springframework.util.StringUtils; 28 import org.springframework.util.StringUtils;
25 -import org.thingsboard.rule.engine.api.util.TbNodeUtils;  
26 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest; 29 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
27 import org.thingsboard.rule.engine.api.RuleNode; 30 import org.thingsboard.rule.engine.api.RuleNode;
28 import org.thingsboard.rule.engine.api.TbContext; 31 import org.thingsboard.rule.engine.api.TbContext;
@@ -30,13 +33,21 @@ import org.thingsboard.rule.engine.api.TbNode; @@ -30,13 +33,21 @@ import org.thingsboard.rule.engine.api.TbNode;
30 import org.thingsboard.rule.engine.api.TbNodeConfiguration; 33 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
31 import org.thingsboard.rule.engine.api.TbNodeException; 34 import org.thingsboard.rule.engine.api.TbNodeException;
32 import org.thingsboard.rule.engine.api.TbRelationTypes; 35 import org.thingsboard.rule.engine.api.TbRelationTypes;
  36 +import org.thingsboard.rule.engine.api.util.TbNodeUtils;
33 import org.thingsboard.server.common.data.DataConstants; 37 import org.thingsboard.server.common.data.DataConstants;
34 import org.thingsboard.server.common.data.EntityType; 38 import org.thingsboard.server.common.data.EntityType;
  39 +import org.thingsboard.server.common.data.audit.ActionType;
  40 +import org.thingsboard.server.common.data.edge.EdgeEvent;
  41 +import org.thingsboard.server.common.data.edge.EdgeEventType;
35 import org.thingsboard.server.common.data.id.DeviceId; 42 import org.thingsboard.server.common.data.id.DeviceId;
  43 +import org.thingsboard.server.common.data.id.EdgeId;
36 import org.thingsboard.server.common.data.plugin.ComponentType; 44 import org.thingsboard.server.common.data.plugin.ComponentType;
37 -import org.thingsboard.server.common.data.rule.RuleChainType; 45 +import org.thingsboard.server.common.data.relation.EntityRelation;
  46 +import org.thingsboard.server.common.data.relation.RelationTypeGroup;
38 import org.thingsboard.server.common.msg.TbMsg; 47 import org.thingsboard.server.common.msg.TbMsg;
39 48
  49 +import javax.annotation.Nullable;
  50 +import java.util.List;
40 import java.util.Random; 51 import java.util.Random;
41 import java.util.UUID; 52 import java.util.UUID;
42 import java.util.concurrent.TimeUnit; 53 import java.util.concurrent.TimeUnit;
@@ -55,6 +66,7 @@ import java.util.concurrent.TimeUnit; @@ -55,6 +66,7 @@ import java.util.concurrent.TimeUnit;
55 ) 66 )
56 public class TbSendRPCRequestNode implements TbNode { 67 public class TbSendRPCRequestNode implements TbNode {
57 68
  69 + private static final ObjectMapper json = new ObjectMapper();
58 private Random random = new Random(); 70 private Random random = new Random();
59 private Gson gson = new Gson(); 71 private Gson gson = new Gson();
60 private JsonParser jsonParser = new JsonParser(); 72 private JsonParser jsonParser = new JsonParser();
@@ -111,19 +123,57 @@ public class TbSendRPCRequestNode implements TbNode { @@ -111,19 +123,57 @@ public class TbSendRPCRequestNode implements TbNode {
111 .restApiCall(restApiCall) 123 .restApiCall(restApiCall)
112 .build(); 124 .build();
113 125
114 - ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> {  
115 - if (!ruleEngineDeviceRpcResponse.getError().isPresent()) {  
116 - TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));  
117 - ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS);  
118 - } else {  
119 - TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));  
120 - ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name()));  
121 - }  
122 - }); 126 + EdgeId edgeId = findRelatedEdgeId(ctx, msg);
  127 + if (edgeId != null) {
  128 + sendRpcRequestToEdgeDevice(ctx, msg, edgeId, request);
  129 + } else {
  130 + ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> {
  131 + if (!ruleEngineDeviceRpcResponse.getError().isPresent()) {
  132 + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
  133 + ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS);
  134 + } else {
  135 + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
  136 + ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name()));
  137 + }
  138 + });
  139 + }
123 ctx.ack(msg); 140 ctx.ack(msg);
124 } 141 }
125 } 142 }
126 143
  144 + private EdgeId findRelatedEdgeId(TbContext ctx, TbMsg msg) {
  145 + List<EntityRelation> result =
  146 + ctx.getRelationService().findByToAndType(ctx.getTenantId(), msg.getOriginator(), EntityRelation.EDGE_TYPE, RelationTypeGroup.COMMON);
  147 + if (result != null && result.size() > 0) {
  148 + return new EdgeId(result.get(0).getFrom().getId());
  149 + } else {
  150 + return null;
  151 + }
  152 + }
  153 +
  154 + private void sendRpcRequestToEdgeDevice(TbContext ctx, TbMsg msg, EdgeId edgeId, RuleEngineDeviceRpcRequest request) {
  155 + EdgeEvent edgeEvent = new EdgeEvent();
  156 + edgeEvent.setTenantId(ctx.getTenantId());
  157 + edgeEvent.setEdgeEventAction(ActionType.RPC_CALL.name());
  158 + edgeEvent.setEntityId(request.getDeviceId().getId());
  159 + edgeEvent.setEdgeEventType(EdgeEventType.DEVICE);
  160 + edgeEvent.setEntityBody(json.valueToTree(request));
  161 + edgeEvent.setEdgeId(edgeId);
  162 + ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent);
  163 + Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() {
  164 + @Override
  165 + public void onSuccess(@Nullable EdgeEvent event) {
  166 + ctx.tellSuccess(msg);
  167 + }
  168 +
  169 + @Override
  170 + public void onFailure(Throwable th) {
  171 + log.error("Could not save edge event", th);
  172 + ctx.tellFailure(msg, th);
  173 + }
  174 + }, ctx.getDbCallbackExecutor());
  175 + }
  176 +
127 @Override 177 @Override
128 public void destroy() { 178 public void destroy() {
129 } 179 }