Commit f9e06b63a6e23c3c9974915a3e3a23e56423a468

Authored by xp.Huang
2 parents 40685e07 7ab1f566

Merge branch '20230615' into 'master_dev'

refactor: RPC命令下发兼容,定时任务和接口

See merge request yunteng/thingskit!198
@@ -71,8 +71,6 @@ public abstract class AbstractRpcController extends BaseController { @@ -71,8 +71,6 @@ public abstract class AbstractRpcController extends BaseController {
71 protected AccessValidator accessValidator; 71 protected AccessValidator accessValidator;
72 72
73 73
74 - @Autowired  
75 - protected TkDeviceService tkDeviceService;  
76 74
77 @Value("${server.rest.server_side_rpc.min_timeout:5000}") 75 @Value("${server.rest.server_side_rpc.min_timeout:5000}")
78 protected long minTimeout; 76 protected long minTimeout;
@@ -83,42 +81,24 @@ public abstract class AbstractRpcController extends BaseController { @@ -83,42 +81,24 @@ public abstract class AbstractRpcController extends BaseController {
83 protected DeferredResult<ResponseEntity> handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody, HttpStatus timeoutStatus, HttpStatus noActiveConnectionStatus) throws ThingsboardException { 81 protected DeferredResult<ResponseEntity> handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody, HttpStatus timeoutStatus, HttpStatus noActiveConnectionStatus) throws ThingsboardException {
84 try { 82 try {
85 JsonNode rpcRequestBody = JacksonUtil.toJsonNode(requestBody); 83 JsonNode rpcRequestBody = JacksonUtil.toJsonNode(requestBody);
  84 + ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(rpcRequestBody.get("method").asText(), JacksonUtil.toString(rpcRequestBody.get("params")));
86 SecurityUser currentUser = getCurrentUser(); 85 SecurityUser currentUser = getCurrentUser();
87 -  
88 - //Thingskit function  
89 - DeviceDTO targetDevice = tkDeviceService.findDeviceInfoByTbDeviceId(currentUser.getCurrentTenantId(), deviceId.getId().toString());  
90 - ObjectNode additional = (ObjectNode) rpcRequestBody.get(DataConstants.ADDITIONAL_INFO);  
91 - if(additional == null){  
92 - additional = JacksonUtil.newObjectNode();  
93 - }  
94 - if(!additional.has(FastIotConstants.Rpc.TARGET_ID)){  
95 - additional.put(FastIotConstants.Rpc.TARGET_ID, deviceId.getId().toString());  
96 - }  
97 - String additionalInfo = JacksonUtil.toString(additional);  
98 - String methodName = rpcRequestBody.get(FastIotConstants.Rpc.METHOD_NAME).asText();  
99 - JsonNode params = rpcRequestBody.get(FastIotConstants.Rpc.PARAMS_NAME);  
100 - if(DeviceTypeEnum.SENSOR == targetDevice.getDeviceType() && params!=null && !params.isTextual()){  
101 - ObjectNode methodParams = (ObjectNode) rpcRequestBody.get(FastIotConstants.Rpc.PARAMS_NAME);  
102 - methodParams.put(FastIotConstants.Rpc.TARGET_NAME,targetDevice.getName());  
103 - }  
104 - DeviceId realDevice = DeviceTypeEnum.SENSOR == targetDevice.getDeviceType()?new DeviceId(UUID.fromString(targetDevice.getGatewayId())):deviceId;  
105 -  
106 - ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(methodName, JacksonUtil.toString(params));  
107 TenantId tenantId = currentUser.getTenantId(); 86 TenantId tenantId = currentUser.getTenantId();
108 final DeferredResult<ResponseEntity> response = new DeferredResult<>(); 87 final DeferredResult<ResponseEntity> response = new DeferredResult<>();
109 long timeout = rpcRequestBody.has(DataConstants.TIMEOUT) ? rpcRequestBody.get(DataConstants.TIMEOUT).asLong() : defaultTimeout; 88 long timeout = rpcRequestBody.has(DataConstants.TIMEOUT) ? rpcRequestBody.get(DataConstants.TIMEOUT).asLong() : defaultTimeout;
110 long expTime = rpcRequestBody.has(DataConstants.EXPIRATION_TIME) ? rpcRequestBody.get(DataConstants.EXPIRATION_TIME).asLong() : System.currentTimeMillis() + Math.max(minTimeout, timeout); 89 long expTime = rpcRequestBody.has(DataConstants.EXPIRATION_TIME) ? rpcRequestBody.get(DataConstants.EXPIRATION_TIME).asLong() : System.currentTimeMillis() + Math.max(minTimeout, timeout);
111 UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID(); 90 UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID();
112 boolean persisted = rpcRequestBody.has(DataConstants.PERSISTENT) && rpcRequestBody.get(DataConstants.PERSISTENT).asBoolean(); 91 boolean persisted = rpcRequestBody.has(DataConstants.PERSISTENT) && rpcRequestBody.get(DataConstants.PERSISTENT).asBoolean();
113 - 92 + String additionalInfo = JacksonUtil.toString(rpcRequestBody.get(DataConstants.ADDITIONAL_INFO));
114 Integer retries = rpcRequestBody.has(DataConstants.RETRIES) ? rpcRequestBody.get(DataConstants.RETRIES).asInt() : null; 93 Integer retries = rpcRequestBody.has(DataConstants.RETRIES) ? rpcRequestBody.get(DataConstants.RETRIES).asInt() : null;
115 - accessValidator.validate(currentUser, Operation.RPC_CALL, realDevice, new HttpValidationCallback(response, new FutureCallback<>() { 94 +
  95 + accessValidator.validate(currentUser, Operation.RPC_CALL, deviceId, new HttpValidationCallback(response, new FutureCallback<>() {
116 @Override 96 @Override
117 public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) { 97 public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
118 98
119 ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(rpcRequestUUID, 99 ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(rpcRequestUUID,
120 tenantId, 100 tenantId,
121 - realDevice, 101 + deviceId,
122 oneWay, 102 oneWay,
123 expTime, 103 expTime,
124 body, 104 body,
@@ -21,19 +21,27 @@ import com.fasterxml.jackson.databind.node.ObjectNode; @@ -21,19 +21,27 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
21 import lombok.extern.slf4j.Slf4j; 21 import lombok.extern.slf4j.Slf4j;
22 import org.springframework.beans.factory.annotation.Autowired; 22 import org.springframework.beans.factory.annotation.Autowired;
23 import org.springframework.stereotype.Service; 23 import org.springframework.stereotype.Service;
  24 +import org.thingsboard.common.util.JacksonUtil;
24 import org.thingsboard.common.util.ThingsBoardThreadFactory; 25 import org.thingsboard.common.util.ThingsBoardThreadFactory;
25 import org.thingsboard.server.actors.ActorSystemContext; 26 import org.thingsboard.server.actors.ActorSystemContext;
26 import org.thingsboard.server.cluster.TbClusterService; 27 import org.thingsboard.server.cluster.TbClusterService;
27 import org.thingsboard.server.common.data.DataConstants; 28 import org.thingsboard.server.common.data.DataConstants;
28 import org.thingsboard.server.common.data.Device; 29 import org.thingsboard.server.common.data.Device;
  30 +import org.thingsboard.server.common.data.StringUtils;
29 import org.thingsboard.server.common.data.User; 31 import org.thingsboard.server.common.data.User;
  32 +import org.thingsboard.server.common.data.id.DeviceId;
30 import org.thingsboard.server.common.data.rpc.RpcError; 33 import org.thingsboard.server.common.data.rpc.RpcError;
  34 +import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
  35 +import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
  36 +import org.thingsboard.server.common.data.yunteng.dto.DeviceDTO;
  37 +import org.thingsboard.server.common.data.yunteng.enums.DeviceTypeEnum;
31 import org.thingsboard.server.common.msg.TbMsg; 38 import org.thingsboard.server.common.msg.TbMsg;
32 import org.thingsboard.server.common.msg.TbMsgDataType; 39 import org.thingsboard.server.common.msg.TbMsgDataType;
33 import org.thingsboard.server.common.msg.TbMsgMetaData; 40 import org.thingsboard.server.common.msg.TbMsgMetaData;
34 import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; 41 import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
35 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; 42 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
36 import org.thingsboard.server.dao.device.DeviceService; 43 import org.thingsboard.server.dao.device.DeviceService;
  44 +import org.thingsboard.server.dao.yunteng.service.TkDeviceService;
37 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; 45 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
38 import org.thingsboard.server.queue.util.TbCoreComponent; 46 import org.thingsboard.server.queue.util.TbCoreComponent;
39 import org.thingsboard.server.service.security.model.SecurityUser; 47 import org.thingsboard.server.service.security.model.SecurityUser;
@@ -71,6 +79,8 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { @@ -71,6 +79,8 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
71 private ScheduledExecutorService scheduler; 79 private ScheduledExecutorService scheduler;
72 private String serviceId; 80 private String serviceId;
73 81
  82 + @Autowired
  83 + protected TkDeviceService tkDeviceService;
74 public DefaultTbCoreDeviceRpcService(DeviceService deviceService, TbClusterService clusterService, TbServiceInfoProvider serviceInfoProvider, 84 public DefaultTbCoreDeviceRpcService(DeviceService deviceService, TbClusterService clusterService, TbServiceInfoProvider serviceInfoProvider,
75 ActorSystemContext actorContext) { 85 ActorSystemContext actorContext) {
76 this.deviceService = deviceService; 86 this.deviceService = deviceService;
@@ -100,6 +110,28 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { @@ -100,6 +110,28 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
100 @Override 110 @Override
101 public void processRestApiRpcRequest(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer, SecurityUser currentUser) { 111 public void processRestApiRpcRequest(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer, SecurityUser currentUser) {
102 log.trace("[{}][{}] Processing REST API call to rule engine [{}]", request.getTenantId(), request.getId(), request.getDeviceId()); 112 log.trace("[{}][{}] Processing REST API call to rule engine [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
  113 +
  114 + //Thingskit function 修改RPC三要素:设备ID、参数、附加信息
  115 + DeviceId targetId = request.getDeviceId();
  116 + DeviceDTO targetDevice = tkDeviceService.findDeviceInfoByTbDeviceId(currentUser.getCurrentTenantId(), targetId.getId().toString());
  117 + DeviceId netEnableId = DeviceTypeEnum.SENSOR == targetDevice.getDeviceType()?new DeviceId(UUID.fromString(targetDevice.getGatewayId())):targetId;
  118 + ToDeviceRpcRequestBody body = request.getBody();
  119 + String paramStr = body.getParams();
  120 + if(DeviceTypeEnum.SENSOR == targetDevice.getDeviceType() && paramStr.contains("}")){
  121 + ObjectNode methodParams = (ObjectNode) JacksonUtil.toJsonNode(paramStr);
  122 + methodParams.put(FastIotConstants.Rpc.TARGET_NAME,targetDevice.getName());
  123 + body = new ToDeviceRpcRequestBody(body.getMethod(),JacksonUtil.toString(methodParams));
  124 + }
  125 +
  126 + ObjectNode additional = JacksonUtil.newObjectNode();
  127 + if(StringUtils.isNotBlank(request.getAdditionalInfo())){
  128 + additional = (ObjectNode) JacksonUtil.toJsonNode(request.getAdditionalInfo());
  129 + }
  130 + if(!additional.has(FastIotConstants.Rpc.TARGET_ID)){
  131 + additional.put(FastIotConstants.Rpc.TARGET_ID, targetId.getId().toString());
  132 + }
  133 + request = new ToDeviceRpcRequest(request.getId(),request.getTenantId(),netEnableId,request.isOneway(),request.getExpirationTime(),body,request.isPersisted(),request.getRetries(),JacksonUtil.toString(additional));
  134 +
103 UUID requestId = request.getId(); 135 UUID requestId = request.getId();
104 localToRuleEngineRpcRequests.put(requestId, responseConsumer); 136 localToRuleEngineRpcRequests.put(requestId, responseConsumer);
105 sendRpcRequestToRuleEngine(request, currentUser); 137 sendRpcRequestToRuleEngine(request, currentUser);