Commit e04221b6e2f3adaceec15d5fbbb27f50e4563f79

Authored by 芯火源
1 parent 6688e6c4

fix(DEFECT-1234): 给网关子设备下发RPC指令

1、给子设备下发RPC指令时,自动转向网关
2、RPC记录挂载到网关子设备
... ... @@ -58,6 +58,7 @@ import org.thingsboard.server.common.data.rpc.RpcStatus;
58 58 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
59 59 import org.thingsboard.server.common.data.security.DeviceCredentials;
60 60 import org.thingsboard.server.common.data.security.DeviceCredentialsType;
  61 +import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
61 62 import org.thingsboard.server.common.data.yunteng.constant.ModelConstants;
62 63 import org.thingsboard.server.common.data.yunteng.enums.CmdTypeEnum;
63 64 import org.thingsboard.server.common.msg.TbActorMsg;
... ... @@ -242,20 +243,26 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
242 243 }
243 244
244 245 private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) {
  246 + //Thingskit function
  247 + JsonNode old = JacksonUtil.toJsonNode(request.getAdditionalInfo());
  248 + ObjectNode additional = (old == null || old.isEmpty()) ?mapper.createObjectNode():(ObjectNode)old;
  249 + if(!additional.has(ModelConstants.TablePropertyMapping.COMMAND_TYPE)){
  250 + additional.put(ModelConstants.TablePropertyMapping.COMMAND_TYPE, CmdTypeEnum.DIY.ordinal());
  251 + }
  252 + DeviceId saveDeviceId = deviceId;
  253 + if(additional.has(FastIotConstants.Rpc.TARGET_ID)){
  254 + saveDeviceId = new DeviceId(UUID.fromString(additional.get(FastIotConstants.Rpc.TARGET_ID).asText()));
  255 + }
  256 +
245 257 Rpc rpc = new Rpc(new RpcId(request.getId()));
246 258 rpc.setCreatedTime(System.currentTimeMillis());
247 259 rpc.setTenantId(tenantId);
248   - rpc.setDeviceId(deviceId);
  260 + rpc.setDeviceId(saveDeviceId);
249 261 rpc.setExpirationTime(request.getExpirationTime());
250 262 rpc.setRequest(JacksonUtil.valueToTree(request));
251 263 rpc.setStatus(status);
252 264 rpc.setAdditionalInfo(JacksonUtil.toJsonNode(request.getAdditionalInfo()));
253   - //Thingskit function
254   - JsonNode old = JacksonUtil.toJsonNode(request.getAdditionalInfo());
255   - ObjectNode additional = (old == null || old.isEmpty()) ?mapper.createObjectNode():(ObjectNode)old;
256   - if(!additional.has(ModelConstants.TablePropertyMapping.COMMAND_TYPE)){
257   - additional.put(ModelConstants.TablePropertyMapping.COMMAND_TYPE, CmdTypeEnum.DIY.ordinal());
258   - }
  265 +
259 266 rpc.setAdditionalInfo(additional);
260 267
261 268 return systemContext.getTbRpcService().save(tenantId, rpc);
... ...
... ... @@ -16,7 +16,9 @@
16 16 package org.thingsboard.server.controller;
17 17
18 18 import com.fasterxml.jackson.databind.JsonNode;
  19 +import com.fasterxml.jackson.databind.node.ObjectNode;
19 20 import com.google.common.util.concurrent.FutureCallback;
  21 +import com.google.gson.JsonObject;
20 22 import lombok.extern.slf4j.Slf4j;
21 23 import org.springframework.beans.factory.annotation.Autowired;
22 24 import org.springframework.beans.factory.annotation.Value;
... ... @@ -35,8 +37,14 @@ import org.thingsboard.server.common.data.id.TenantId;
35 37 import org.thingsboard.server.common.data.id.UUIDBased;
36 38 import org.thingsboard.server.common.data.rpc.RpcError;
37 39 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
  40 +import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
  41 +import org.thingsboard.server.common.data.yunteng.constant.ModelConstants;
  42 +import org.thingsboard.server.common.data.yunteng.dto.DeviceDTO;
  43 +import org.thingsboard.server.common.data.yunteng.enums.CmdTypeEnum;
  44 +import org.thingsboard.server.common.data.yunteng.enums.DeviceTypeEnum;
38 45 import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
39 46 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
  47 +import org.thingsboard.server.dao.yunteng.service.TkDeviceService;
40 48 import org.thingsboard.server.queue.util.TbCoreComponent;
41 49 import org.thingsboard.server.service.rpc.LocalRequestMetaData;
42 50 import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
... ... @@ -62,6 +70,10 @@ public abstract class AbstractRpcController extends BaseController {
62 70 @Autowired
63 71 protected AccessValidator accessValidator;
64 72
  73 +
  74 + @Autowired
  75 + protected TkDeviceService tkDeviceService;
  76 +
65 77 @Value("${server.rest.server_side_rpc.min_timeout:5000}")
66 78 protected long minTimeout;
67 79
... ... @@ -79,14 +91,28 @@ public abstract class AbstractRpcController extends BaseController {
79 91 long expTime = rpcRequestBody.has(DataConstants.EXPIRATION_TIME) ? rpcRequestBody.get(DataConstants.EXPIRATION_TIME).asLong() : System.currentTimeMillis() + Math.max(minTimeout, timeout);
80 92 UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID();
81 93 boolean persisted = rpcRequestBody.has(DataConstants.PERSISTENT) && rpcRequestBody.get(DataConstants.PERSISTENT).asBoolean();
82   - String additionalInfo = JacksonUtil.toString(rpcRequestBody.get(DataConstants.ADDITIONAL_INFO));
  94 +
  95 +
  96 + //Thingskit function
  97 + DeviceDTO tkDevice = tkDeviceService.findDeviceInfoByTbDeviceId(currentUser.getCurrentTenantId(), deviceId.getId().toString());
  98 + ObjectNode additional = (ObjectNode) rpcRequestBody.get(DataConstants.ADDITIONAL_INFO);
  99 + if(additional == null){
  100 + additional = JacksonUtil.newObjectNode();
  101 + }
  102 + if(!additional.has(FastIotConstants.Rpc.TARGET_ID)){
  103 + additional.put(FastIotConstants.Rpc.TARGET_ID, deviceId.getId().toString());
  104 + }
  105 + DeviceId realDevice = DeviceTypeEnum.SENSOR == tkDevice.getDeviceType()?new DeviceId(UUID.fromString(tkDevice.getGatewayId())):deviceId;
  106 + String additionalInfo = JacksonUtil.toString(additional);
  107 +
83 108 Integer retries = rpcRequestBody.has(DataConstants.RETRIES) ? rpcRequestBody.get(DataConstants.RETRIES).asInt() : null;
84   - accessValidator.validate(currentUser, Operation.RPC_CALL, deviceId, new HttpValidationCallback(response, new FutureCallback<>() {
  109 + accessValidator.validate(currentUser, Operation.RPC_CALL, realDevice, new HttpValidationCallback(response, new FutureCallback<>() {
85 110 @Override
86 111 public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
  112 +
87 113 ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(rpcRequestUUID,
88 114 tenantId,
89   - deviceId,
  115 + realDevice,
90 116 oneWay,
91 117 expTime,
92 118 body,
... ...
... ... @@ -167,5 +167,7 @@ public interface FastIotConstants {
167 167 public static String METHOD_NAME = "method";
168 168 /**RPC参数*/
169 169 public static String PARAMS_NAME = "params";
  170 + /**实控设备*/
  171 + public static String TARGET_ID = "target";
170 172 }
171 173 }
... ...