Commit 6106f4c0bf60ab70ad96c72830ea3714089e36b0

Authored by Andrew Shvayka
1 parent 6c2a7d6f

Routing RPC calls through rule engine now

... ... @@ -69,6 +69,18 @@
69 69 "configuration": {
70 70 "jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);"
71 71 }
  72 + },
  73 + {
  74 + "additionalInfo": {
  75 + "layoutX": 825,
  76 + "layoutY": 468
  77 + },
  78 + "type": "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode",
  79 + "name": "Test",
  80 + "debugMode": true,
  81 + "configuration": {
  82 + "timeoutInSeconds": 60
  83 + }
72 84 }
73 85 ],
74 86 "connections": [
... ... @@ -91,6 +103,11 @@
91 103 "fromIndex": 2,
92 104 "toIndex": 3,
93 105 "type": "RPC Request"
  106 + },
  107 + {
  108 + "fromIndex": 2,
  109 + "toIndex": 5,
  110 + "type": "RPC Request to Device"
94 111 }
95 112 ],
96 113 "ruleChainConnections": null
... ...
... ... @@ -225,9 +225,12 @@ class DefaultTbContext implements TbContext {
225 225
226 226 @Override
227 227 public void sendRpcRequest(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
228   - ToDeviceRpcRequest request = new ToDeviceRpcRequest(UUIDs.timeBased(), nodeCtx.getTenantId(), src.getDeviceId(),
229   - src.isOneway(), System.currentTimeMillis() + src.getTimeout(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()));
  228 + ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), nodeCtx.getTenantId(), src.getDeviceId(),
  229 + src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()));
230 230 mainCtx.getDeviceRpcService().processRpcRequestToDevice(request, response -> {
  231 + if (src.isRestApiCall()) {
  232 + mainCtx.getDeviceRpcService().processRestAPIRpcResponseFromRuleEngine(response);
  233 + }
231 234 consumer.accept(RuleEngineDeviceRpcResponse.builder()
232 235 .deviceId(src.getDeviceId())
233 236 .requestId(src.getRequestId())
... ...
... ... @@ -128,7 +128,7 @@ public class RpcController extends BaseController {
128 128 timeout,
129 129 body
130 130 );
131   - deviceRpcService.processRpcRequestToDevice(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse));
  131 + deviceRpcService.processRestAPIRpcRequestToRuleEngine(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse));
132 132 }
133 133
134 134 @Override
... ...
... ... @@ -15,6 +15,10 @@
15 15 */
16 16 package org.thingsboard.server.service.rpc;
17 17
  18 +import com.datastax.driver.core.utils.UUIDs;
  19 +import com.fasterxml.jackson.core.JsonProcessingException;
  20 +import com.fasterxml.jackson.databind.ObjectMapper;
  21 +import com.fasterxml.jackson.databind.node.ObjectNode;
18 22 import com.google.protobuf.InvalidProtocolBufferException;
19 23 import lombok.extern.slf4j.Slf4j;
20 24 import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -22,22 +26,23 @@ import org.springframework.stereotype.Service;
22 26 import org.thingsboard.rule.engine.api.RpcError;
23 27 import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg;
24 28 import org.thingsboard.server.actors.service.ActorService;
  29 +import org.thingsboard.server.common.data.DataConstants;
25 30 import org.thingsboard.server.common.data.id.DeviceId;
26 31 import org.thingsboard.server.common.data.id.TenantId;
  32 +import org.thingsboard.server.common.msg.TbMsg;
  33 +import org.thingsboard.server.common.msg.TbMsgDataType;
  34 +import org.thingsboard.server.common.msg.TbMsgMetaData;
27 35 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
28 36 import org.thingsboard.server.common.msg.cluster.ServerAddress;
29 37 import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
30 38 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
31   -import org.thingsboard.server.dao.audit.AuditLogService;
  39 +import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
32 40 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
33 41 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
34 42 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
35   -import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
36   -import org.thingsboard.server.service.telemetry.sub.Subscription;
37 43
38 44 import javax.annotation.PostConstruct;
39 45 import javax.annotation.PreDestroy;
40   -import java.util.Optional;
41 46 import java.util.UUID;
42 47 import java.util.concurrent.ConcurrentHashMap;
43 48 import java.util.concurrent.ConcurrentMap;
... ... @@ -53,6 +58,8 @@ import java.util.function.Consumer;
53 58 @Slf4j
54 59 public class DefaultDeviceRpcService implements DeviceRpcService {
55 60
  61 + private static final ObjectMapper json = new ObjectMapper();
  62 +
56 63 @Autowired
57 64 private ClusterRoutingService routingService;
58 65
... ... @@ -64,7 +71,8 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
64 71
65 72 private ScheduledExecutorService rpcCallBackExecutor;
66 73
67   - private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> localRpcRequests = new ConcurrentHashMap<>();
  74 + private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> localToRuleEngineRpcRequests = new ConcurrentHashMap<>();
  75 + private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> localToDeviceRpcRequests = new ConcurrentHashMap<>();
68 76
69 77 @PostConstruct
70 78 public void initExecutor() {
... ... @@ -79,28 +87,40 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
79 87 }
80 88
81 89 @Override
  90 + public void processRestAPIRpcRequestToRuleEngine(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
  91 + log.trace("[{}] Processing local rpc call to rule engine [{}]", request.getTenantId(), request.getDeviceId());
  92 + UUID requestId = request.getId();
  93 + localToRuleEngineRpcRequests.put(requestId, responseConsumer);
  94 + sendRpcRequestToRuleEngine(request);
  95 + scheduleTimeout(request, requestId, localToRuleEngineRpcRequests);
  96 + }
  97 +
  98 + @Override
  99 + public void processRestAPIRpcResponseFromRuleEngine(FromDeviceRpcResponse response) {
  100 + UUID requestId = response.getId();
  101 + Consumer<FromDeviceRpcResponse> consumer = localToRuleEngineRpcRequests.remove(requestId);
  102 + if (consumer != null) {
  103 + consumer.accept(response);
  104 + } else {
  105 + log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
  106 + }
  107 + }
  108 +
  109 + @Override
82 110 public void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
83   - log.trace("[{}] Processing local rpc call for device [{}]", request.getTenantId(), request.getDeviceId());
84   - sendRpcRequest(request);
  111 + log.trace("[{}] Processing local rpc call to device [{}]", request.getTenantId(), request.getDeviceId());
85 112 UUID requestId = request.getId();
86   - localRpcRequests.put(requestId, responseConsumer);
87   - long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis());
88   - log.error("[{}] processing the request: [{}]", this.hashCode(), requestId);
89   - rpcCallBackExecutor.schedule(() -> {
90   - log.error("[{}] timeout the request: [{}]", this.hashCode(), requestId);
91   - Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
92   - if (consumer != null) {
93   - consumer.accept(new FromDeviceRpcResponse(requestId, null, null, RpcError.TIMEOUT));
94   - }
95   - }, timeout, TimeUnit.MILLISECONDS);
  113 + localToDeviceRpcRequests.put(requestId, responseConsumer);
  114 + sendRpcRequestToDevice(request);
  115 + scheduleTimeout(request, requestId, localToDeviceRpcRequests);
96 116 }
97 117
98 118 @Override
99 119 public void processRpcResponseFromDevice(FromDeviceRpcResponse response) {
100   - log.error("[{}] response to request: [{}]", this.hashCode(), response.getId());
  120 + log.trace("[{}] response to request: [{}]", this.hashCode(), response.getId());
101 121 if (routingService.getCurrentServer().equals(response.getServerAddress())) {
102 122 UUID requestId = response.getId();
103   - Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
  123 + Consumer<FromDeviceRpcResponse> consumer = localToDeviceRpcRequests.remove(requestId);
104 124 if (consumer != null) {
105 125 consumer.accept(response);
106 126 } else {
... ... @@ -140,7 +160,27 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
140 160 forward(deviceId, rpcMsg);
141 161 }
142 162
143   - private void sendRpcRequest(ToDeviceRpcRequest msg) {
  163 + private void sendRpcRequestToRuleEngine(ToDeviceRpcRequest msg) {
  164 + ObjectNode entityNode = json.createObjectNode();
  165 + TbMsgMetaData metaData = new TbMsgMetaData();
  166 + metaData.putValue("requestUUID", msg.getId().toString());
  167 + metaData.putValue("expirationTime", Long.toString(msg.getExpirationTime()));
  168 + metaData.putValue("oneway", Boolean.toString(msg.isOneway()));
  169 +
  170 + entityNode.put("method", msg.getBody().getMethod());
  171 + entityNode.put("params", msg.getBody().getParams());
  172 +
  173 + try {
  174 + TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), metaData, TbMsgDataType.JSON
  175 + , json.writeValueAsString(entityNode)
  176 + , null, null, 0L);
  177 + actorService.onMsg(new ServiceToRuleEngineMsg(msg.getTenantId(), tbMsg));
  178 + } catch (JsonProcessingException e) {
  179 + throw new RuntimeException(e);
  180 + }
  181 + }
  182 +
  183 + private void sendRpcRequestToDevice(ToDeviceRpcRequest msg) {
144 184 ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(routingService.getCurrentServer(), msg);
145 185 log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg);
146 186 forward(msg.getDeviceId(), rpcMsg);
... ... @@ -149,4 +189,18 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
149 189 private <T extends ToDeviceActorNotificationMsg> void forward(DeviceId deviceId, T msg) {
150 190 actorService.onMsg(new SendToClusterMsg(deviceId, msg));
151 191 }
  192 +
  193 + private void scheduleTimeout(ToDeviceRpcRequest request, UUID requestId, ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> requestsMap) {
  194 + long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis());
  195 + log.trace("[{}] processing the request: [{}]", this.hashCode(), requestId);
  196 + rpcCallBackExecutor.schedule(() -> {
  197 + log.trace("[{}] timeout the request: [{}]", this.hashCode(), requestId);
  198 + Consumer<FromDeviceRpcResponse> consumer = requestsMap.remove(requestId);
  199 + if (consumer != null) {
  200 + consumer.accept(new FromDeviceRpcResponse(requestId, null, null, RpcError.TIMEOUT));
  201 + }
  202 + }, timeout, TimeUnit.MILLISECONDS);
  203 + }
  204 +
  205 +
152 206 }
... ...
... ... @@ -27,6 +27,10 @@ import java.util.function.Consumer;
27 27 */
28 28 public interface DeviceRpcService {
29 29
  30 + void processRestAPIRpcRequestToRuleEngine(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
  31 +
  32 + void processRestAPIRpcResponseFromRuleEngine(FromDeviceRpcResponse response);
  33 +
30 34 void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
31 35
32 36 void processRpcResponseFromDevice(FromDeviceRpcResponse response);
... ...
... ... @@ -58,4 +58,6 @@ public class DataConstants {
58 58 public static final String ATTRIBUTES_UPDATED = "ATTRIBUTES_UPDATED";
59 59 public static final String ATTRIBUTES_DELETED = "ATTRIBUTES_DELETED";
60 60
  61 + public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE";
  62 +
61 63 }
... ...
... ... @@ -19,6 +19,8 @@ import lombok.Builder;
19 19 import lombok.Data;
20 20 import org.thingsboard.server.common.data.id.DeviceId;
21 21
  22 +import java.util.UUID;
  23 +
22 24 /**
23 25 * Created by ashvayka on 02.04.18.
24 26 */
... ... @@ -28,9 +30,11 @@ public final class RuleEngineDeviceRpcRequest {
28 30
29 31 private final DeviceId deviceId;
30 32 private final int requestId;
  33 + private final UUID requestUUID;
31 34 private final boolean oneway;
32 35 private final String method;
33 36 private final String body;
34   - private final long timeout;
  37 + private final long expirationTime;
  38 + private final boolean restApiCall;
35 39
36 40 }
... ...
... ... @@ -28,7 +28,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
28 28 type = ComponentType.FILTER,
29 29 name = "message type switch",
30 30 configClazz = EmptyNodeConfiguration.class,
31   - relationTypes = {"Post attributes", "Post telemetry", "RPC Request", "Activity Event", "Inactivity Event",
  31 + relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "Activity Event", "Inactivity Event",
32 32 "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned",
33 33 "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Other"},
34 34 nodeDescription = "Route incoming messages by Message Type",
... ... @@ -52,7 +52,7 @@ public class TbMsgTypeSwitchNode implements TbNode {
52 52 } else if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) {
53 53 relationType = "Post telemetry";
54 54 } else if (msg.getType().equals(SessionMsgType.TO_SERVER_RPC_REQUEST.name())) {
55   - relationType = "RPC Request";
  55 + relationType = "RPC Request from Device";
56 56 } else if (msg.getType().equals(DataConstants.ACTIVITY_EVENT)) {
57 57 relationType = "Activity Event";
58 58 } else if (msg.getType().equals(DataConstants.INACTIVITY_EVENT)) {
... ... @@ -75,6 +75,8 @@ public class TbMsgTypeSwitchNode implements TbNode {
75 75 relationType = "Attributes Updated";
76 76 } else if (msg.getType().equals(DataConstants.ATTRIBUTES_DELETED)) {
77 77 relationType = "Attributes Deleted";
  78 + } else if (msg.getType().equals(DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE)) {
  79 + relationType = "RPC Request to Device";
78 80 } else {
79 81 relationType = "Other";
80 82 }
... ...
... ... @@ -33,7 +33,7 @@ import org.thingsboard.server.common.msg.TbMsg;
33 33 type = ComponentType.ACTION,
34 34 name = "rpc call reply",
35 35 configClazz = TbSendRpcReplyNodeConfiguration.class,
36   - nodeDescription = "Sends one-way RPC call to device",
  36 + nodeDescription = "Sends reply to RPC call from device",
37 37 nodeDetails = "Expects messages with any message type. Will forward message body to the device.",
38 38 uiResources = {"static/rulenode/rulenode-core-config.js"},
39 39 configDirective = "tbActionNodeRpcReplyConfig",
... ...
... ... @@ -15,10 +15,12 @@
15 15 */
16 16 package org.thingsboard.rule.engine.rpc;
17 17
  18 +import com.datastax.driver.core.utils.UUIDs;
18 19 import com.google.gson.Gson;
19 20 import com.google.gson.JsonObject;
20 21 import com.google.gson.JsonParser;
21 22 import lombok.extern.slf4j.Slf4j;
  23 +import org.springframework.util.StringUtils;
22 24 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
23 25 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
24 26 import org.thingsboard.rule.engine.api.RuleNode;
... ... @@ -27,12 +29,14 @@ import org.thingsboard.rule.engine.api.TbNode;
27 29 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
28 30 import org.thingsboard.rule.engine.api.TbNodeException;
29 31 import org.thingsboard.rule.engine.api.TbRelationTypes;
  32 +import org.thingsboard.server.common.data.DataConstants;
30 33 import org.thingsboard.server.common.data.EntityType;
31 34 import org.thingsboard.server.common.data.id.DeviceId;
32 35 import org.thingsboard.server.common.data.plugin.ComponentType;
33 36 import org.thingsboard.server.common.msg.TbMsg;
34 37
35 38 import java.util.Random;
  39 +import java.util.UUID;
36 40 import java.util.concurrent.TimeUnit;
37 41
38 42 @Slf4j
... ... @@ -40,8 +44,9 @@ import java.util.concurrent.TimeUnit;
40 44 type = ComponentType.ACTION,
41 45 name = "rpc call request",
42 46 configClazz = TbSendRpcRequestNodeConfiguration.class,
43   - nodeDescription = "Sends two-way RPC call to device",
44   - nodeDetails = "Expects messages with \"method\" and \"params\". Will forward response from device to next nodes.",
  47 + nodeDescription = "Sends RPC call to device",
  48 + nodeDetails = "Expects messages with \"method\" and \"params\". Will forward response from device to next nodes." +
  49 + "If the RPC call request is originated by REST API call from user, will forward the response to user immediately.",
45 50 uiResources = {"static/rulenode/rulenode-core-config.js"},
46 51 configDirective = "tbActionNodeRpcRequestConfig",
47 52 icon = "call_made"
... ... @@ -61,7 +66,7 @@ public class TbSendRPCRequestNode implements TbNode {
61 66 @Override
62 67 public void onMsg(TbContext ctx, TbMsg msg) {
63 68 JsonObject json = jsonParser.parse(msg.getData()).getAsJsonObject();
64   -
  69 + String tmp;
65 70 if (msg.getOriginator().getEntityType() != EntityType.DEVICE) {
66 71 ctx.tellFailure(msg, new RuntimeException("Message originator is not a device entity!"));
67 72 } else if (!json.has("method")) {
... ... @@ -70,17 +75,31 @@ public class TbSendRPCRequestNode implements TbNode {
70 75 ctx.tellFailure(msg, new RuntimeException("Params are not present in the message!"));
71 76 } else {
72 77 int requestId = json.has("requestId") ? json.get("requestId").getAsInt() : random.nextInt();
  78 + boolean restApiCall = msg.getType().equals(DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE);
  79 +
  80 + tmp = msg.getMetaData().getValue("oneway");
  81 + boolean oneway = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp);
  82 +
  83 + tmp = msg.getMetaData().getValue("requestUUID");
  84 + UUID requestUUID = !StringUtils.isEmpty(tmp) ? UUID.fromString(tmp) : UUIDs.timeBased();
  85 +
  86 + tmp = msg.getMetaData().getValue("expirationTime");
  87 + long expirationTime = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : (System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds()));
  88 +
73 89 RuleEngineDeviceRpcRequest request = RuleEngineDeviceRpcRequest.builder()
  90 + .oneway(oneway)
74 91 .method(json.get("method").getAsString())
75 92 .body(gson.toJson(json.get("params")))
76 93 .deviceId(new DeviceId(msg.getOriginator().getId()))
77 94 .requestId(requestId)
78   - .timeout(TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds()))
  95 + .requestUUID(requestUUID)
  96 + .expirationTime(expirationTime)
  97 + .restApiCall(restApiCall)
79 98 .build();
80 99
81 100 ctx.getRpcService().sendRpcRequest(request, ruleEngineDeviceRpcResponse -> {
82 101 if (!ruleEngineDeviceRpcResponse.getError().isPresent()) {
83   - TbMsg next = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().get());
  102 + TbMsg next = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
84 103 ctx.tellNext(next, TbRelationTypes.SUCCESS);
85 104 } else {
86 105 TbMsg next = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
... ...