Commit d44adcfe7585b800a8585abae2281c861b668b0a

Authored by YevhenBondarenko
Committed by Andrew Shvayka
1 parent 9093206b

added removing rpc request from pending map after rpc deleting

@@ -95,6 +95,7 @@ public class AppActor extends ContextAwareActor { @@ -95,6 +95,7 @@ public class AppActor extends ContextAwareActor {
95 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: 95 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
96 case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: 96 case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
97 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: 97 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
  98 + case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:
98 onToDeviceActorMsg((TenantAwareMsg) msg, true); 99 onToDeviceActorMsg((TenantAwareMsg) msg, true);
99 break; 100 break;
100 case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG: 101 case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
@@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.id.TenantId; @@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.id.TenantId;
28 import org.thingsboard.server.common.msg.TbActorMsg; 28 import org.thingsboard.server.common.msg.TbActorMsg;
29 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; 29 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
30 import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; 30 import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg;
  31 +import org.thingsboard.server.service.rpc.RemoveRpcActorMsg;
31 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; 32 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
32 import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; 33 import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
33 34
@@ -84,6 +85,9 @@ public class DeviceActor extends ContextAwareActor { @@ -84,6 +85,9 @@ public class DeviceActor extends ContextAwareActor {
84 case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG: 85 case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:
85 processor.processEdgeUpdate((DeviceEdgeUpdateMsg) msg); 86 processor.processEdgeUpdate((DeviceEdgeUpdateMsg) msg);
86 break; 87 break;
  88 + case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:
  89 + processor.processRemoveRpc(ctx, (RemoveRpcActorMsg) msg);
  90 + break;
87 default: 91 default:
88 return false; 92 return false;
89 } 93 }
@@ -88,6 +88,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceAct @@ -88,6 +88,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceAct
88 import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; 88 import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
89 import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; 89 import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
90 import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; 90 import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg;
  91 +import org.thingsboard.server.service.rpc.RemoveRpcActorMsg;
91 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; 92 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
92 import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; 93 import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
93 94
@@ -263,6 +264,21 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -263,6 +264,21 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
263 } 264 }
264 } 265 }
265 266
  267 + void processRemoveRpc(TbActorCtx context, RemoveRpcActorMsg msg) {
  268 + log.debug("[{}] Processing remove rpc command", msg.getRequestId());
  269 + Integer requestId = null;
  270 + for (Map.Entry<Integer, ToDeviceRpcRequestMetadata> entry : toDeviceRpcPendingMap.entrySet()) {
  271 + if (entry.getValue().getMsg().getMsg().getId().equals(msg.getRequestId())) {
  272 + requestId = entry.getKey();
  273 + break;
  274 + }
  275 + }
  276 +
  277 + if (requestId != null) {
  278 + toDeviceRpcPendingMap.remove(requestId);
  279 + }
  280 + }
  281 +
266 private void registerPendingRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) { 282 private void registerPendingRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) {
267 toDeviceRpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent)); 283 toDeviceRpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent));
268 DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg(rpcRequest.getRequestId(), timeout); 284 DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg(rpcRequest.getRequestId(), timeout);
@@ -165,6 +165,7 @@ public class TenantActor extends RuleChainManagerActor { @@ -165,6 +165,7 @@ public class TenantActor extends RuleChainManagerActor {
165 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: 165 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
166 case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: 166 case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
167 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: 167 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
  168 + case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:
168 onToDeviceActorMsg((DeviceAwareMsg) msg, true); 169 onToDeviceActorMsg((DeviceAwareMsg) msg, true);
169 break; 170 break;
170 case RULE_CHAIN_TO_RULE_CHAIN_MSG: 171 case RULE_CHAIN_TO_RULE_CHAIN_MSG:
@@ -57,7 +57,7 @@ import java.util.UUID; @@ -57,7 +57,7 @@ import java.util.UUID;
57 public abstract class AbstractRpcController extends BaseController { 57 public abstract class AbstractRpcController extends BaseController {
58 58
59 @Autowired 59 @Autowired
60 - private TbCoreDeviceRpcService deviceRpcService; 60 + protected TbCoreDeviceRpcService deviceRpcService;
61 61
62 @Autowired 62 @Autowired
63 private AccessValidator accessValidator; 63 private AccessValidator accessValidator;
@@ -27,6 +27,7 @@ import org.springframework.web.bind.annotation.RequestParam; @@ -27,6 +27,7 @@ import org.springframework.web.bind.annotation.RequestParam;
27 import org.springframework.web.bind.annotation.ResponseBody; 27 import org.springframework.web.bind.annotation.ResponseBody;
28 import org.springframework.web.bind.annotation.RestController; 28 import org.springframework.web.bind.annotation.RestController;
29 import org.springframework.web.context.request.async.DeferredResult; 29 import org.springframework.web.context.request.async.DeferredResult;
  30 +import org.thingsboard.common.util.JacksonUtil;
30 import org.thingsboard.server.common.data.exception.ThingsboardException; 31 import org.thingsboard.server.common.data.exception.ThingsboardException;
31 import org.thingsboard.server.common.data.id.DeviceId; 32 import org.thingsboard.server.common.data.id.DeviceId;
32 import org.thingsboard.server.common.data.id.RpcId; 33 import org.thingsboard.server.common.data.id.RpcId;
@@ -35,11 +36,16 @@ import org.thingsboard.server.common.data.page.PageData; @@ -35,11 +36,16 @@ import org.thingsboard.server.common.data.page.PageData;
35 import org.thingsboard.server.common.data.page.PageLink; 36 import org.thingsboard.server.common.data.page.PageLink;
36 import org.thingsboard.server.common.data.rpc.Rpc; 37 import org.thingsboard.server.common.data.rpc.Rpc;
37 import org.thingsboard.server.common.data.rpc.RpcStatus; 38 import org.thingsboard.server.common.data.rpc.RpcStatus;
  39 +import org.thingsboard.server.common.msg.TbMsg;
  40 +import org.thingsboard.server.common.msg.TbMsgMetaData;
38 import org.thingsboard.server.queue.util.TbCoreComponent; 41 import org.thingsboard.server.queue.util.TbCoreComponent;
  42 +import org.thingsboard.server.service.rpc.RemoveRpcActorMsg;
39 import org.thingsboard.server.service.security.permission.Operation; 43 import org.thingsboard.server.service.security.permission.Operation;
40 44
41 import java.util.UUID; 45 import java.util.UUID;
42 46
  47 +import static org.thingsboard.server.common.data.DataConstants.RPC_DELETED;
  48 +
43 @RestController 49 @RestController
44 @TbCoreComponent 50 @TbCoreComponent
45 @RequestMapping(TbUrlConstants.RPC_V2_URL_PREFIX) 51 @RequestMapping(TbUrlConstants.RPC_V2_URL_PREFIX)
@@ -100,7 +106,21 @@ public class RpcV2Controller extends AbstractRpcController { @@ -100,7 +106,21 @@ public class RpcV2Controller extends AbstractRpcController {
100 public void deleteResource(@PathVariable("rpcId") String strRpc) throws ThingsboardException { 106 public void deleteResource(@PathVariable("rpcId") String strRpc) throws ThingsboardException {
101 checkParameter("RpcId", strRpc); 107 checkParameter("RpcId", strRpc);
102 try { 108 try {
103 - rpcService.deleteRpc(getTenantId(), new RpcId(UUID.fromString(strRpc))); 109 + RpcId rpcId = new RpcId(UUID.fromString(strRpc));
  110 + Rpc rpc = checkRpcId(rpcId, Operation.DELETE);
  111 +
  112 + if (rpc != null) {
  113 + if (rpc.getStatus().equals(RpcStatus.QUEUED)) {
  114 + RemoveRpcActorMsg removeMsg = new RemoveRpcActorMsg(getTenantId(), rpc.getDeviceId(), rpc.getUuidId());
  115 + log.trace("[{}] Forwarding msg {} to queue actor!", rpc.getDeviceId(), rpc);
  116 + tbClusterService.pushMsgToCore(removeMsg, null);
  117 + }
  118 +
  119 + rpcService.deleteRpc(getTenantId(), rpcId);
  120 +
  121 + TbMsg msg = TbMsg.newMsg(RPC_DELETED, rpc.getDeviceId(), TbMsgMetaData.EMPTY, JacksonUtil.toString(rpc));
  122 + tbClusterService.pushMsgToRuleEngine(getTenantId(), rpc.getDeviceId(), msg, null);
  123 + }
104 } catch (Exception e) { 124 } catch (Exception e) {
105 throw handleException(e); 125 throw handleException(e);
106 } 126 }
@@ -139,6 +139,12 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { @@ -139,6 +139,12 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
139 } 139 }
140 } 140 }
141 141
  142 + @Override
  143 + public void processRemoveRpc(RemoveRpcActorMsg removeRpcMsg) {
  144 + log.trace("[{}][{}] Processing remove RPC [{}]", removeRpcMsg.getTenantId(), removeRpcMsg.getRequestId(), removeRpcMsg.getDeviceId());
  145 + actorContext.tellWithHighPriority(removeRpcMsg);
  146 + }
  147 +
142 private void sendRpcResponseToTbRuleEngine(String originServiceId, FromDeviceRpcResponse response) { 148 private void sendRpcResponseToTbRuleEngine(String originServiceId, FromDeviceRpcResponse response) {
143 if (serviceId.equals(originServiceId)) { 149 if (serviceId.equals(originServiceId)) {
144 if (tbRuleEngineRpcService.isPresent()) { 150 if (tbRuleEngineRpcService.isPresent()) {
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.rpc;
  17 +
  18 +import lombok.Getter;
  19 +import lombok.RequiredArgsConstructor;
  20 +import lombok.ToString;
  21 +import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg;
  22 +import org.thingsboard.server.common.data.id.DeviceId;
  23 +import org.thingsboard.server.common.data.id.TenantId;
  24 +import org.thingsboard.server.common.msg.MsgType;
  25 +
  26 +import java.util.UUID;
  27 +
  28 +@ToString
  29 +@RequiredArgsConstructor
  30 +public class RemoveRpcActorMsg implements ToDeviceActorNotificationMsg {
  31 +
  32 + @Getter
  33 + private final TenantId tenantId;
  34 + @Getter
  35 + private final DeviceId deviceId;
  36 +
  37 + @Getter
  38 + private final UUID requestId;
  39 +
  40 + @Override
  41 + public MsgType getMsgType() {
  42 + return MsgType.REMOVE_RPC_TO_DEVICE_ACTOR_MSG;
  43 + }
  44 +}
@@ -56,4 +56,6 @@ public interface TbCoreDeviceRpcService { @@ -56,4 +56,6 @@ public interface TbCoreDeviceRpcService {
56 */ 56 */
57 void processRpcResponseFromDeviceActor(FromDeviceRpcResponse response); 57 void processRpcResponseFromDeviceActor(FromDeviceRpcResponse response);
58 58
  59 + void processRemoveRpc(RemoveRpcActorMsg removeRpcMsg);
  60 +
59 } 61 }
@@ -89,6 +89,7 @@ public class DataConstants { @@ -89,6 +89,7 @@ public class DataConstants {
89 public static final String RPC_SUCCESSFUL = "RPC_SUCCESSFUL"; 89 public static final String RPC_SUCCESSFUL = "RPC_SUCCESSFUL";
90 public static final String RPC_TIMEOUT = "RPC_TIMEOUT"; 90 public static final String RPC_TIMEOUT = "RPC_TIMEOUT";
91 public static final String RPC_FAILED = "RPC_FAILED"; 91 public static final String RPC_FAILED = "RPC_FAILED";
  92 + public static final String RPC_DELETED = "RPC_DELETED";
92 93
93 public static final String DEFAULT_SECRET_KEY = ""; 94 public static final String DEFAULT_SECRET_KEY = "";
94 public static final String SECRET_KEY_FIELD_NAME = "secretKey"; 95 public static final String SECRET_KEY_FIELD_NAME = "secretKey";
@@ -92,6 +92,8 @@ public enum MsgType { @@ -92,6 +92,8 @@ public enum MsgType {
92 92
93 DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG, 93 DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG,
94 94
  95 + REMOVE_RPC_TO_DEVICE_ACTOR_MSG,
  96 +
95 /** 97 /**
96 * Message that is sent from the Device Actor to Rule Engine. Requires acknowledgement 98 * Message that is sent from the Device Actor to Rule Engine. Requires acknowledgement
97 */ 99 */
@@ -33,7 +33,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; @@ -33,7 +33,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
33 type = ComponentType.FILTER, 33 type = ComponentType.FILTER,
34 name = "message type switch", 34 name = "message type switch",
35 configClazz = EmptyNodeConfiguration.class, 35 configClazz = EmptyNodeConfiguration.class,
36 - relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed", 36 + relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed", "RPC Deleted",
37 "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned", 37 "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned",
38 "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant", 38 "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant",
39 "Timeseries Updated", "Timeseries Deleted"}, 39 "Timeseries Updated", "Timeseries Deleted"},
@@ -105,6 +105,8 @@ public class TbMsgTypeSwitchNode implements TbNode { @@ -105,6 +105,8 @@ public class TbMsgTypeSwitchNode implements TbNode {
105 relationType = "RPC Timeout"; 105 relationType = "RPC Timeout";
106 } else if (msg.getType().equals(DataConstants.RPC_FAILED)) { 106 } else if (msg.getType().equals(DataConstants.RPC_FAILED)) {
107 relationType = "RPC Failed"; 107 relationType = "RPC Failed";
  108 + } else if (msg.getType().equals(DataConstants.RPC_DELETED)) {
  109 + relationType = "RPC Deleted";
108 } else { 110 } else {
109 relationType = "Other"; 111 relationType = "Other";
110 } 112 }
@@ -118,7 +118,7 @@ public class TbSendRPCRequestNode implements TbNode { @@ -118,7 +118,7 @@ public class TbSendRPCRequestNode implements TbNode {
118 .build(); 118 .build();
119 119
120 ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { 120 ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> {
121 - if (!ruleEngineDeviceRpcResponse.getError().isPresent()) { 121 + if (ruleEngineDeviceRpcResponse.getError().isEmpty()) {
122 TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); 122 TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
123 ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS); 123 ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS);
124 } else { 124 } else {