Commit 65032092e17a96f3a096359cab930ec41799bf02

Authored by Andrii Shvaika
1 parent cf2a7762

Persistent RPC calls review

@@ -206,13 +206,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -206,13 +206,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
206 syncSessionSet.forEach(rpcSubscriptions::remove); 206 syncSessionSet.forEach(rpcSubscriptions::remove);
207 } 207 }
208 208
209 - if (persisted && !(sent || request.isOneway())) { 209 + if (persisted) {
210 ObjectNode response = JacksonUtil.newObjectNode(); 210 ObjectNode response = JacksonUtil.newObjectNode();
211 response.put("rpcId", request.getId().toString()); 211 response.put("rpcId", request.getId().toString());
212 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), JacksonUtil.toString(response), null)); 212 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), JacksonUtil.toString(response), null));
213 } 213 }
214 214
215 - if (request.isOneway() && sent) { 215 + if (!persisted && request.isOneway() && sent) {
216 log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId()); 216 log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
217 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null)); 217 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null));
218 } else { 218 } else {
@@ -298,7 +298,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -298,7 +298,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
298 toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds)); 298 toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds));
299 } 299 }
300 300
301 - sentOneWayIds.forEach(toDeviceRpcPendingMap::remove); 301 + sentOneWayIds.stream().filter(id -> !toDeviceRpcPendingMap.get(id).getMsg().getMsg().isPersisted()).forEach(toDeviceRpcPendingMap::remove);
302 } 302 }
303 303
304 private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(TbActorCtx context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) { 304 private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(TbActorCtx context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) {
@@ -503,9 +503,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -503,9 +503,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
503 } 503 }
504 } else { 504 } else {
505 log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId()); 505 log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
506 - if (requestMd.getMsg().getMsg().isPersisted()) {  
507 - systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.FAILED, JacksonUtil.toJsonNode(responseMsg.getPayload()));  
508 - }  
509 } 506 }
510 } 507 }
511 508
@@ -34,6 +34,7 @@ import org.springframework.web.bind.annotation.ResponseBody; @@ -34,6 +34,7 @@ import org.springframework.web.bind.annotation.ResponseBody;
34 import org.springframework.web.bind.annotation.RestController; 34 import org.springframework.web.bind.annotation.RestController;
35 import org.springframework.web.context.request.async.DeferredResult; 35 import org.springframework.web.context.request.async.DeferredResult;
36 import org.thingsboard.rule.engine.api.RpcError; 36 import org.thingsboard.rule.engine.api.RpcError;
  37 +import org.thingsboard.server.common.data.DataConstants;
37 import org.thingsboard.server.common.data.audit.ActionType; 38 import org.thingsboard.server.common.data.audit.ActionType;
38 import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; 39 import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
39 import org.thingsboard.server.common.data.exception.ThingsboardException; 40 import org.thingsboard.server.common.data.exception.ThingsboardException;
@@ -100,7 +101,7 @@ public class RpcController extends BaseController { @@ -100,7 +101,7 @@ public class RpcController extends BaseController {
100 } 101 }
101 102
102 @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") 103 @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
103 - @RequestMapping(value = "/persisted/{rpcId}", method = RequestMethod.GET) 104 + @RequestMapping(value = "/persistent/{rpcId}", method = RequestMethod.GET)
104 @ResponseBody 105 @ResponseBody
105 public Rpc getPersistedRpc(@PathVariable("rpcId") String strRpc) throws ThingsboardException { 106 public Rpc getPersistedRpc(@PathVariable("rpcId") String strRpc) throws ThingsboardException {
106 checkParameter("RpcId", strRpc); 107 checkParameter("RpcId", strRpc);
@@ -113,7 +114,7 @@ public class RpcController extends BaseController { @@ -113,7 +114,7 @@ public class RpcController extends BaseController {
113 } 114 }
114 115
115 @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") 116 @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
116 - @RequestMapping(value = "/persisted/{deviceId}", method = RequestMethod.GET) 117 + @RequestMapping(value = "/persistent/device/{deviceId}", method = RequestMethod.GET)
117 @ResponseBody 118 @ResponseBody
118 public PageData<Rpc> getPersistedRpcByDevice(@PathVariable("deviceId") String strDeviceId, 119 public PageData<Rpc> getPersistedRpcByDevice(@PathVariable("deviceId") String strDeviceId,
119 @RequestParam int pageSize, 120 @RequestParam int pageSize,
@@ -134,7 +135,7 @@ public class RpcController extends BaseController { @@ -134,7 +135,7 @@ public class RpcController extends BaseController {
134 } 135 }
135 136
136 @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") 137 @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
137 - @RequestMapping(value = "/persisted/{rpcId}", method = RequestMethod.DELETE) 138 + @RequestMapping(value = "/persistent/{rpcId}", method = RequestMethod.DELETE)
138 @ResponseBody 139 @ResponseBody
139 public void deleteResource(@PathVariable("rpcId") String strRpc) throws ThingsboardException { 140 public void deleteResource(@PathVariable("rpcId") String strRpc) throws ThingsboardException {
140 checkParameter("RpcId", strRpc); 141 checkParameter("RpcId", strRpc);
@@ -155,7 +156,7 @@ public class RpcController extends BaseController { @@ -155,7 +156,7 @@ public class RpcController extends BaseController {
155 long timeout = rpcRequestBody.has("timeout") ? rpcRequestBody.get("timeout").asLong() : defaultTimeout; 156 long timeout = rpcRequestBody.has("timeout") ? rpcRequestBody.get("timeout").asLong() : defaultTimeout;
156 long expTime = System.currentTimeMillis() + Math.max(minTimeout, timeout); 157 long expTime = System.currentTimeMillis() + Math.max(minTimeout, timeout);
157 UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID(); 158 UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID();
158 - boolean persisted = rpcRequestBody.has("persisted") && rpcRequestBody.get("persisted").asBoolean(); 159 + boolean persisted = rpcRequestBody.has(DataConstants.PERSISTENT) && rpcRequestBody.get(DataConstants.PERSISTENT).asBoolean();
159 accessValidator.validate(currentUser, Operation.RPC_CALL, deviceId, new HttpValidationCallback(response, new FutureCallback<DeferredResult<ResponseEntity>>() { 160 accessValidator.validate(currentUser, Operation.RPC_CALL, deviceId, new HttpValidationCallback(response, new FutureCallback<DeferredResult<ResponseEntity>>() {
160 @Override 161 @Override
161 public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) { 162 public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
@@ -157,7 +157,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { @@ -157,7 +157,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
157 metaData.putValue("originServiceId", serviceId); 157 metaData.putValue("originServiceId", serviceId);
158 metaData.putValue("expirationTime", Long.toString(msg.getExpirationTime())); 158 metaData.putValue("expirationTime", Long.toString(msg.getExpirationTime()));
159 metaData.putValue("oneway", Boolean.toString(msg.isOneway())); 159 metaData.putValue("oneway", Boolean.toString(msg.isOneway()));
160 - metaData.putValue("persisted", Boolean.toString(msg.isPersisted())); 160 + metaData.putValue(DataConstants.PERSISTENT, Boolean.toString(msg.isPersisted()));
161 161
162 Device device = deviceService.findDeviceById(msg.getTenantId(), msg.getDeviceId()); 162 Device device = deviceService.findDeviceById(msg.getTenantId(), msg.getDeviceId());
163 if (device != null) { 163 if (device != null) {
@@ -35,6 +35,7 @@ public class DataConstants { @@ -35,6 +35,7 @@ public class DataConstants {
35 public static final String IS_CLEARED_ALARM = "isClearedAlarm"; 35 public static final String IS_CLEARED_ALARM = "isClearedAlarm";
36 public static final String ALARM_CONDITION_REPEATS = "alarmConditionRepeats"; 36 public static final String ALARM_CONDITION_REPEATS = "alarmConditionRepeats";
37 public static final String ALARM_CONDITION_DURATION = "alarmConditionDuration"; 37 public static final String ALARM_CONDITION_DURATION = "alarmConditionDuration";
  38 + public static final String PERSISTENT = "persistent";
38 39
39 public static final String[] allScopes() { 40 public static final String[] allScopes() {
40 return new String[]{CLIENT_SCOPE, SHARED_SCOPE, SERVER_SCOPE}; 41 return new String[]{CLIENT_SCOPE, SHARED_SCOPE, SERVER_SCOPE};
@@ -81,7 +81,7 @@ public class TbSendRPCRequestNode implements TbNode { @@ -81,7 +81,7 @@ public class TbSendRPCRequestNode implements TbNode {
81 tmp = msg.getMetaData().getValue("oneway"); 81 tmp = msg.getMetaData().getValue("oneway");
82 boolean oneway = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp); 82 boolean oneway = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp);
83 83
84 - tmp = msg.getMetaData().getValue("persisted"); 84 + tmp = msg.getMetaData().getValue(DataConstants.PERSISTENT);
85 boolean persisted = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp); 85 boolean persisted = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp);
86 86
87 tmp = msg.getMetaData().getValue("requestUUID"); 87 tmp = msg.getMetaData().getValue("requestUUID");