Showing
4 changed files
with
47 additions
and
9 deletions
... | ... | @@ -19,7 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode; |
19 | 19 | import lombok.RequiredArgsConstructor; |
20 | 20 | import lombok.extern.slf4j.Slf4j; |
21 | 21 | import org.springframework.stereotype.Service; |
22 | -import org.thingsboard.server.common.data.audit.ActionType; | |
22 | +import org.thingsboard.common.util.JacksonUtil; | |
23 | 23 | import org.thingsboard.server.common.data.id.DeviceId; |
24 | 24 | import org.thingsboard.server.common.data.id.RpcId; |
25 | 25 | import org.thingsboard.server.common.data.id.TenantId; |
... | ... | @@ -27,9 +27,11 @@ import org.thingsboard.server.common.data.page.PageData; |
27 | 27 | import org.thingsboard.server.common.data.page.PageLink; |
28 | 28 | import org.thingsboard.server.common.data.rpc.Rpc; |
29 | 29 | import org.thingsboard.server.common.data.rpc.RpcStatus; |
30 | +import org.thingsboard.server.common.msg.TbMsg; | |
31 | +import org.thingsboard.server.common.msg.TbMsgMetaData; | |
30 | 32 | import org.thingsboard.server.dao.rpc.RpcService; |
31 | 33 | import org.thingsboard.server.queue.util.TbCoreComponent; |
32 | -import org.thingsboard.server.service.action.RuleEngineEntityActionService; | |
34 | +import org.thingsboard.server.service.queue.TbClusterService; | |
33 | 35 | |
34 | 36 | @TbCoreComponent |
35 | 37 | @Service |
... | ... | @@ -37,11 +39,11 @@ import org.thingsboard.server.service.action.RuleEngineEntityActionService; |
37 | 39 | @Slf4j |
38 | 40 | public class TbRpcService { |
39 | 41 | private final RpcService rpcService; |
40 | - private final RuleEngineEntityActionService ruleEngineEntityActionService; | |
42 | + private final TbClusterService tbClusterService; | |
41 | 43 | |
42 | 44 | public void save(TenantId tenantId, Rpc rpc) { |
43 | 45 | Rpc saved = rpcService.save(tenantId, rpc); |
44 | - ruleEngineEntityActionService.pushEntityActionToRuleEngine(saved.getId(), saved, tenantId, null, rpc.getId() == null ? ActionType.ADDED : ActionType.UPDATED, null); | |
46 | + pushRpcMsgToRuleEngine(tenantId, saved); | |
45 | 47 | } |
46 | 48 | |
47 | 49 | public void save(TenantId tenantId, RpcId rpcId, RpcStatus newStatus, JsonNode response) { |
... | ... | @@ -52,12 +54,17 @@ public class TbRpcService { |
52 | 54 | foundRpc.setResponse(response); |
53 | 55 | } |
54 | 56 | Rpc saved = rpcService.save(tenantId, foundRpc); |
55 | - ruleEngineEntityActionService.pushEntityActionToRuleEngine(saved.getId(), saved, tenantId, null, ActionType.UPDATED, null); | |
57 | + pushRpcMsgToRuleEngine(tenantId, saved); | |
56 | 58 | } else { |
57 | 59 | log.warn("[{}] Failed to update RPC status because RPC was already deleted", rpcId); |
58 | 60 | } |
59 | 61 | } |
60 | 62 | |
63 | + private void pushRpcMsgToRuleEngine(TenantId tenantId, Rpc rpc) { | |
64 | + TbMsg msg = TbMsg.newMsg("RPC_" + rpc.getStatus().name(), rpc.getDeviceId(), TbMsgMetaData.EMPTY, JacksonUtil.toString(rpc)); | |
65 | + tbClusterService.pushMsgToRuleEngine(tenantId, rpc.getId(), msg, null); | |
66 | + } | |
67 | + | |
61 | 68 | public Rpc findRpcById(TenantId tenantId, RpcId rpcId) { |
62 | 69 | return rpcService.findById(tenantId, rpcId); |
63 | 70 | } | ... | ... |
... | ... | @@ -76,6 +76,13 @@ public class DataConstants { |
76 | 76 | |
77 | 77 | public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE"; |
78 | 78 | |
79 | + public static final String RPC_QUEUED = "RPC_QUEUED"; | |
80 | + public static final String RPC_SENT = "RPC_SENT"; | |
81 | + public static final String RPC_DELIVERED = "RPC_DELIVERED"; | |
82 | + public static final String RPC_SUCCESSFUL = "RPC_SUCCESSFUL"; | |
83 | + public static final String RPC_TIMEOUT = "RPC_TIMEOUT"; | |
84 | + public static final String RPC_FAILED = "RPC_FAILED"; | |
85 | + | |
79 | 86 | public static final String DEFAULT_SECRET_KEY = ""; |
80 | 87 | public static final String SECRET_KEY_FIELD_NAME = "secretKey"; |
81 | 88 | public static final String DURATION_MS_FIELD_NAME = "durationMs"; | ... | ... |
... | ... | @@ -33,8 +33,8 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; |
33 | 33 | type = ComponentType.FILTER, |
34 | 34 | name = "message type switch", |
35 | 35 | configClazz = EmptyNodeConfiguration.class, |
36 | - relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "Activity Event", "Inactivity Event", | |
37 | - "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned", | |
36 | + relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Sent", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed", | |
37 | + "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned", | |
38 | 38 | "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant", |
39 | 39 | "Timeseries Updated", "Timeseries Deleted"}, |
40 | 40 | nodeDescription = "Route incoming messages by Message Type", |
... | ... | @@ -95,6 +95,18 @@ public class TbMsgTypeSwitchNode implements TbNode { |
95 | 95 | relationType = "Timeseries Updated"; |
96 | 96 | } else if (msg.getType().equals(DataConstants.TIMESERIES_DELETED)) { |
97 | 97 | relationType = "Timeseries Deleted"; |
98 | + } else if (msg.getType().equals(DataConstants.RPC_QUEUED)) { | |
99 | + relationType = "RPC Queued"; | |
100 | + } else if (msg.getType().equals(DataConstants.RPC_SENT)) { | |
101 | + relationType = "RPC Sent"; | |
102 | + } else if (msg.getType().equals(DataConstants.RPC_DELIVERED)) { | |
103 | + relationType = "RPC Delivered"; | |
104 | + } else if (msg.getType().equals(DataConstants.RPC_SUCCESSFUL)) { | |
105 | + relationType = "RPC Successful"; | |
106 | + } else if (msg.getType().equals(DataConstants.RPC_TIMEOUT)) { | |
107 | + relationType = "RPC Timeout"; | |
108 | + } else if (msg.getType().equals(DataConstants.RPC_FAILED)) { | |
109 | + relationType = "RPC Failed"; | |
98 | 110 | } else { |
99 | 111 | relationType = "Other"; |
100 | 112 | } | ... | ... |
... | ... | @@ -352,7 +352,13 @@ export enum MessageType { |
352 | 352 | ATTRIBUTES_UPDATED = 'ATTRIBUTES_UPDATED', |
353 | 353 | ATTRIBUTES_DELETED = 'ATTRIBUTES_DELETED', |
354 | 354 | TIMESERIES_UPDATED = 'TIMESERIES_UPDATED', |
355 | - TIMESERIES_DELETED = 'TIMESERIES_DELETED' | |
355 | + TIMESERIES_DELETED = 'TIMESERIES_DELETED', | |
356 | + RPC_QUEUED = 'RPC_QUEUED', | |
357 | + RPC_SENT = 'RPC_SENT', | |
358 | + RPC_DELIVERED = 'RPC_SENT', | |
359 | + RPC_SUCCESSFUL = 'RPC_DELIVERED', | |
360 | + RPC_TIMEOUT = 'RPC_TIMEOUT', | |
361 | + RPC_FAILED = 'RPC_FAILED' | |
356 | 362 | } |
357 | 363 | |
358 | 364 | export const messageTypeNames = new Map<MessageType, string>( |
... | ... | @@ -373,7 +379,13 @@ export const messageTypeNames = new Map<MessageType, string>( |
373 | 379 | [MessageType.ATTRIBUTES_UPDATED, 'Attributes Updated'], |
374 | 380 | [MessageType.ATTRIBUTES_DELETED, 'Attributes Deleted'], |
375 | 381 | [MessageType.TIMESERIES_UPDATED, 'Timeseries Updated'], |
376 | - [MessageType.TIMESERIES_DELETED, 'Timeseries Deleted'] | |
382 | + [MessageType.TIMESERIES_DELETED, 'Timeseries Deleted'], | |
383 | + [MessageType.RPC_QUEUED, 'RPC Queued'], | |
384 | + [MessageType.RPC_SENT, 'RPC Sent'], | |
385 | + [MessageType.RPC_DELIVERED, 'RPC Delivered'], | |
386 | + [MessageType.RPC_SUCCESSFUL, 'RPC Successful'], | |
387 | + [MessageType.RPC_TIMEOUT, 'RPC Timeout'], | |
388 | + [MessageType.RPC_FAILED, 'RPC Failed'] | |
377 | 389 | ] |
378 | 390 | ); |
379 | 391 | ... | ... |