Commit 489c67778a4009964b0686c834d22ba591e2a588

Authored by YevhenBondarenko
1 parent 08e517a6

implemented persisted RPC

Showing 45 changed files with 940 additions and 60 deletions
@@ -135,3 +135,15 @@ $$; @@ -135,3 +135,15 @@ $$;
135 ALTER TABLE api_usage_state 135 ALTER TABLE api_usage_state
136 ADD COLUMN IF NOT EXISTS alarm_exec VARCHAR(32); 136 ADD COLUMN IF NOT EXISTS alarm_exec VARCHAR(32);
137 UPDATE api_usage_state SET alarm_exec = 'ENABLED' WHERE alarm_exec IS NULL; 137 UPDATE api_usage_state SET alarm_exec = 'ENABLED' WHERE alarm_exec IS NULL;
  138 +
  139 +CREATE TABLE IF NOT EXISTS rpc (
  140 + id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY,
  141 + created_time bigint NOT NULL,
  142 + tenant_id uuid NOT NULL,
  143 + device_id uuid NOT NULL,
  144 + expiration_time bigint NOT NULL,
  145 + request varchar(10000000) NOT NULL,
  146 + response varchar(10000000),
  147 + status varchar(255) NOT NULL
  148 +);
  149 +
@@ -65,6 +65,7 @@ import org.thingsboard.server.dao.relation.RelationService; @@ -65,6 +65,7 @@ import org.thingsboard.server.dao.relation.RelationService;
65 import org.thingsboard.server.dao.resource.ResourceService; 65 import org.thingsboard.server.dao.resource.ResourceService;
66 import org.thingsboard.server.dao.rule.RuleChainService; 66 import org.thingsboard.server.dao.rule.RuleChainService;
67 import org.thingsboard.server.dao.rule.RuleNodeStateService; 67 import org.thingsboard.server.dao.rule.RuleNodeStateService;
  68 +import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
68 import org.thingsboard.server.dao.tenant.TenantProfileService; 69 import org.thingsboard.server.dao.tenant.TenantProfileService;
69 import org.thingsboard.server.dao.tenant.TenantService; 70 import org.thingsboard.server.dao.tenant.TenantService;
70 import org.thingsboard.server.dao.timeseries.TimeseriesService; 71 import org.thingsboard.server.dao.timeseries.TimeseriesService;
@@ -80,9 +81,9 @@ import org.thingsboard.server.service.executors.ExternalCallExecutorService; @@ -80,9 +81,9 @@ import org.thingsboard.server.service.executors.ExternalCallExecutorService;
80 import org.thingsboard.server.service.executors.SharedEventLoopGroupService; 81 import org.thingsboard.server.service.executors.SharedEventLoopGroupService;
81 import org.thingsboard.server.service.mail.MailExecutorService; 82 import org.thingsboard.server.service.mail.MailExecutorService;
82 import org.thingsboard.server.service.profile.TbDeviceProfileCache; 83 import org.thingsboard.server.service.profile.TbDeviceProfileCache;
83 -import org.thingsboard.server.dao.tenant.TbTenantProfileCache;  
84 import org.thingsboard.server.service.queue.TbClusterService; 84 import org.thingsboard.server.service.queue.TbClusterService;
85 import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; 85 import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
  86 +import org.thingsboard.server.service.rpc.TbRpcService;
86 import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; 87 import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
87 import org.thingsboard.server.service.script.JsExecutorService; 88 import org.thingsboard.server.service.script.JsExecutorService;
88 import org.thingsboard.server.service.script.JsInvokeService; 89 import org.thingsboard.server.service.script.JsInvokeService;
@@ -303,23 +304,33 @@ public class ActorSystemContext { @@ -303,23 +304,33 @@ public class ActorSystemContext {
303 304
304 @Lazy 305 @Lazy
305 @Autowired(required = false) 306 @Autowired(required = false)
306 - @Getter private EdgeService edgeService; 307 + @Getter
  308 + private EdgeService edgeService;
307 309
308 @Lazy 310 @Lazy
309 @Autowired(required = false) 311 @Autowired(required = false)
310 - @Getter private EdgeEventService edgeEventService; 312 + @Getter
  313 + private EdgeEventService edgeEventService;
311 314
312 @Lazy 315 @Lazy
313 @Autowired(required = false) 316 @Autowired(required = false)
314 - @Getter private EdgeRpcService edgeRpcService; 317 + @Getter
  318 + private EdgeRpcService edgeRpcService;
315 319
316 @Lazy 320 @Lazy
317 @Autowired(required = false) 321 @Autowired(required = false)
318 - @Getter private ResourceService resourceService; 322 + @Getter
  323 + private ResourceService resourceService;
319 324
320 @Lazy 325 @Lazy
321 @Autowired(required = false) 326 @Autowired(required = false)
322 - @Getter private OtaPackageService otaPackageService; 327 + @Getter
  328 + private OtaPackageService otaPackageService;
  329 +
  330 + @Lazy
  331 + @Autowired(required = false)
  332 + @Getter
  333 + private TbRpcService tbRpcService;
323 334
324 @Value("${actors.session.max_concurrent_sessions_per_device:1}") 335 @Value("${actors.session.max_concurrent_sessions_per_device:1}")
325 @Getter 336 @Getter
@@ -46,7 +46,7 @@ public class DeviceActor extends ContextAwareActor { @@ -46,7 +46,7 @@ public class DeviceActor extends ContextAwareActor {
46 super.init(ctx); 46 super.init(ctx);
47 log.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId); 47 log.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId);
48 try { 48 try {
49 - processor.initSessionTimeout(ctx); 49 + processor.init(ctx);
50 log.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId); 50 log.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId);
51 } catch (Exception e) { 51 } catch (Exception e) {
52 log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.deviceId, e); 52 log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.deviceId, e);
@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.MoreExecutors; @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.MoreExecutors;
23 import com.google.protobuf.InvalidProtocolBufferException; 23 import com.google.protobuf.InvalidProtocolBufferException;
24 import lombok.extern.slf4j.Slf4j; 24 import lombok.extern.slf4j.Slf4j;
25 import org.apache.commons.collections.CollectionUtils; 25 import org.apache.commons.collections.CollectionUtils;
  26 +import org.thingsboard.common.util.JacksonUtil;
26 import org.thingsboard.rule.engine.api.RpcError; 27 import org.thingsboard.rule.engine.api.RpcError;
27 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; 28 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
28 import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg; 29 import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg;
@@ -38,12 +39,17 @@ import org.thingsboard.server.common.data.edge.EdgeEventActionType; @@ -38,12 +39,17 @@ import org.thingsboard.server.common.data.edge.EdgeEventActionType;
38 import org.thingsboard.server.common.data.edge.EdgeEventType; 39 import org.thingsboard.server.common.data.edge.EdgeEventType;
39 import org.thingsboard.server.common.data.id.DeviceId; 40 import org.thingsboard.server.common.data.id.DeviceId;
40 import org.thingsboard.server.common.data.id.EdgeId; 41 import org.thingsboard.server.common.data.id.EdgeId;
  42 +import org.thingsboard.server.common.data.id.RpcId;
41 import org.thingsboard.server.common.data.id.TenantId; 43 import org.thingsboard.server.common.data.id.TenantId;
42 import org.thingsboard.server.common.data.kv.AttributeKey; 44 import org.thingsboard.server.common.data.kv.AttributeKey;
43 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 45 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
44 import org.thingsboard.server.common.data.kv.KvEntry; 46 import org.thingsboard.server.common.data.kv.KvEntry;
  47 +import org.thingsboard.server.common.data.page.PageData;
  48 +import org.thingsboard.server.common.data.page.PageLink;
45 import org.thingsboard.server.common.data.relation.EntityRelation; 49 import org.thingsboard.server.common.data.relation.EntityRelation;
46 import org.thingsboard.server.common.data.relation.RelationTypeGroup; 50 import org.thingsboard.server.common.data.relation.RelationTypeGroup;
  51 +import org.thingsboard.server.common.data.rpc.Rpc;
  52 +import org.thingsboard.server.common.data.rpc.RpcStatus;
47 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; 53 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
48 import org.thingsboard.server.common.data.security.DeviceCredentials; 54 import org.thingsboard.server.common.data.security.DeviceCredentials;
49 import org.thingsboard.server.common.data.security.DeviceCredentialsType; 55 import org.thingsboard.server.common.data.security.DeviceCredentialsType;
@@ -52,8 +58,8 @@ import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -52,8 +58,8 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
52 import org.thingsboard.server.common.msg.queue.TbCallback; 58 import org.thingsboard.server.common.msg.queue.TbCallback;
53 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; 59 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
54 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; 60 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
55 -import org.thingsboard.server.gen.transport.TransportProtos;  
56 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; 61 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
  62 +import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
57 import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry; 63 import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
58 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; 64 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
59 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; 65 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
@@ -68,10 +74,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType; @@ -68,10 +74,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
68 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; 74 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
69 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; 75 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
70 import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto; 76 import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto;
  77 +import org.thingsboard.server.gen.transport.TransportProtos.ToDevicePersistedRpcResponseMsg;
71 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg; 78 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
72 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; 79 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
73 import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg; 80 import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
74 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; 81 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
  82 +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto;
75 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; 83 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
76 import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; 84 import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
77 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; 85 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
@@ -162,19 +170,23 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -162,19 +170,23 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
162 170
163 void processRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg) { 171 void processRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg) {
164 ToDeviceRpcRequest request = msg.getMsg(); 172 ToDeviceRpcRequest request = msg.getMsg();
165 - ToDeviceRpcRequestBody body = request.getBody();  
166 - ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder()  
167 - .setRequestId(rpcSeq++)  
168 - .setMethodName(body.getMethod())  
169 - .setParams(body.getParams())  
170 - .setExpirationTime(request.getExpirationTime())  
171 - .setRequestIdMSB(request.getId().getMostSignificantBits())  
172 - .setRequestIdLSB(request.getId().getLeastSignificantBits())  
173 - .build(); 173 + ToDeviceRpcRequestMsg rpcRequest = creteToDeviceRpcRequestMsg(request);
174 174
175 long timeout = request.getExpirationTime() - System.currentTimeMillis(); 175 long timeout = request.getExpirationTime() - System.currentTimeMillis();
  176 + boolean persisted = request.isPersisted();
176 if (timeout <= 0) { 177 if (timeout <= 0) {
177 log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime()); 178 log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime());
  179 +
  180 + if (persisted) {
  181 + Rpc rpc = new Rpc(new RpcId(request.getId()));
  182 + rpc.setCreatedTime(System.currentTimeMillis());
  183 + rpc.setTenantId(tenantId);
  184 + rpc.setDeviceId(deviceId);
  185 + rpc.setExpirationTime(request.getExpirationTime());
  186 + rpc.setRequest(JacksonUtil.valueToTree(request));
  187 + rpc.setStatus(RpcStatus.TIMEOUT);
  188 + systemContext.getTbRpcService().save(tenantId, rpc);
  189 + }
178 return; 190 return;
179 } 191 }
180 192
@@ -195,6 +207,22 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -195,6 +207,22 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
195 syncSessionSet.forEach(rpcSubscriptions::remove); 207 syncSessionSet.forEach(rpcSubscriptions::remove);
196 } 208 }
197 209
  210 + if (persisted) {
  211 + Rpc rpc = new Rpc(new RpcId(request.getId()));
  212 + rpc.setCreatedTime(System.currentTimeMillis());
  213 + rpc.setTenantId(tenantId);
  214 + rpc.setDeviceId(deviceId);
  215 + rpc.setExpirationTime(request.getExpirationTime());
  216 + rpc.setRequest(JacksonUtil.valueToTree(request));
  217 + rpc.setStatus(sent ? RpcStatus.SENT : RpcStatus.QUEUED);
  218 + systemContext.getTbRpcService().save(tenantId, rpc);
  219 + if (!(sent || request.isOneway())) {
  220 + ObjectNode response = JacksonUtil.newObjectNode();
  221 + response.put("rpcId", request.getId().toString());
  222 + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), JacksonUtil.toString(response), null));
  223 + }
  224 + }
  225 +
198 if (request.isOneway() && sent) { 226 if (request.isOneway() && sent) {
199 log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId()); 227 log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
200 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null)); 228 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null));
@@ -208,6 +236,20 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -208,6 +236,20 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
208 } 236 }
209 } 237 }
210 238
  239 + private ToDeviceRpcRequestMsg creteToDeviceRpcRequestMsg(ToDeviceRpcRequest request) {
  240 + ToDeviceRpcRequestBody body = request.getBody();
  241 + return ToDeviceRpcRequestMsg.newBuilder()
  242 + .setRequestId(rpcSeq++)
  243 + .setMethodName(body.getMethod())
  244 + .setParams(body.getParams())
  245 + .setExpirationTime(request.getExpirationTime())
  246 + .setRequestIdMSB(request.getId().getMostSignificantBits())
  247 + .setRequestIdLSB(request.getId().getLeastSignificantBits())
  248 + .setOneway(request.isOneway())
  249 + .setPersisted(request.isPersisted())
  250 + .build();
  251 + }
  252 +
211 void processRpcResponsesFromEdge(TbActorCtx context, FromDeviceRpcResponseActorMsg responseMsg) { 253 void processRpcResponsesFromEdge(TbActorCtx context, FromDeviceRpcResponseActorMsg responseMsg) {
212 log.debug("[{}] Processing rpc command response from edge session", deviceId); 254 log.debug("[{}] Processing rpc command response from edge session", deviceId);
213 ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); 255 ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
@@ -229,6 +271,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -229,6 +271,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
229 ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId()); 271 ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
230 if (requestMd != null) { 272 if (requestMd != null) {
231 log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId()); 273 log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
  274 + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.TIMEOUT, null);
232 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), 275 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
233 null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); 276 null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
234 } 277 }
@@ -270,7 +313,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -270,7 +313,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
270 .setExpirationTime(request.getExpirationTime()) 313 .setExpirationTime(request.getExpirationTime())
271 .setRequestIdMSB(request.getId().getMostSignificantBits()) 314 .setRequestIdMSB(request.getId().getMostSignificantBits())
272 .setRequestIdLSB(request.getId().getLeastSignificantBits()) 315 .setRequestIdLSB(request.getId().getLeastSignificantBits())
  316 + .setOneway(request.isOneway())
  317 + .setPersisted(request.isPersisted())
273 .build(); 318 .build();
  319 +
  320 + if (request.isPersisted()) {
  321 + systemContext.getTbRpcService().save(tenantId, new RpcId(request.getId()), RpcStatus.SENT, null);
  322 + }
274 sendToTransport(rpcRequest, sessionId, nodeId); 323 sendToTransport(rpcRequest, sessionId, nodeId);
275 }; 324 };
276 } 325 }
@@ -299,10 +348,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -299,10 +348,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
299 if (msg.hasClaimDevice()) { 348 if (msg.hasClaimDevice()) {
300 handleClaimDeviceMsg(context, msg.getSessionInfo(), msg.getClaimDevice()); 349 handleClaimDeviceMsg(context, msg.getSessionInfo(), msg.getClaimDevice());
301 } 350 }
  351 + if (msg.hasPersistedRpcResponseMsg()) {
  352 + processPersistedRpcResponses(context, msg.getSessionInfo(), msg.getPersistedRpcResponseMsg());
  353 + }
302 callback.onSuccess(); 354 callback.onSuccess();
303 } 355 }
304 356
305 - private void handleClaimDeviceMsg(TbActorCtx context, SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg) { 357 + private void handleClaimDeviceMsg(TbActorCtx context, SessionInfoProto sessionInfo, ClaimDeviceMsg msg) {
306 DeviceId deviceId = new DeviceId(new UUID(msg.getDeviceIdMSB(), msg.getDeviceIdLSB())); 358 DeviceId deviceId = new DeviceId(new UUID(msg.getDeviceIdMSB(), msg.getDeviceIdLSB()));
307 systemContext.getClaimDevicesService().registerClaimingInfo(tenantId, deviceId, msg.getSecretKey(), msg.getDurationMs()); 359 systemContext.getClaimDevicesService().registerClaimingInfo(tenantId, deviceId, msg.getSecretKey(), msg.getDurationMs());
308 } 360 }
@@ -441,11 +493,22 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -441,11 +493,22 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
441 if (success) { 493 if (success) {
442 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), 494 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
443 responseMsg.getPayload(), null)); 495 responseMsg.getPayload(), null));
  496 + if (requestMd.getMsg().getMsg().isPersisted()) {
  497 + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.SUCCESSFUL, JacksonUtil.toJsonNode(responseMsg.getPayload()));
  498 + }
444 } else { 499 } else {
445 log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId()); 500 log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
  501 + if (requestMd.getMsg().getMsg().isPersisted()) {
  502 + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.FAILED, JacksonUtil.toJsonNode(responseMsg.getPayload()));
  503 + }
446 } 504 }
447 } 505 }
448 506
  507 + private void processPersistedRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDevicePersistedRpcResponseMsg responseMsg) {
  508 + UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB());
  509 + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), RpcStatus.valueOf(responseMsg.getStatus()), null);
  510 + }
  511 +
449 private void processSubscriptionCommands(TbActorCtx context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) { 512 private void processSubscriptionCommands(TbActorCtx context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
450 UUID sessionId = getSessionId(sessionInfo); 513 UUID sessionId = getSessionId(sessionInfo);
451 if (subscribeCmd.getUnsubscribe()) { 514 if (subscribeCmd.getUnsubscribe()) {
@@ -564,7 +627,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -564,7 +627,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
564 627
565 void notifyTransportAboutProfileUpdate(UUID sessionId, SessionInfoMetaData sessionMd, DeviceCredentials deviceCredentials) { 628 void notifyTransportAboutProfileUpdate(UUID sessionId, SessionInfoMetaData sessionMd, DeviceCredentials deviceCredentials) {
566 log.info("2) LwM2Mtype: "); 629 log.info("2) LwM2Mtype: ");
567 - TransportProtos.ToTransportUpdateCredentialsProto.Builder notification = TransportProtos.ToTransportUpdateCredentialsProto.newBuilder(); 630 + ToTransportUpdateCredentialsProto.Builder notification = ToTransportUpdateCredentialsProto.newBuilder();
568 notification.addCredentialsId(deviceCredentials.getCredentialsId()); 631 notification.addCredentialsId(deviceCredentials.getCredentialsId());
569 notification.addCredentialsValue(deviceCredentials.getCredentialsValue()); 632 notification.addCredentialsValue(deviceCredentials.getCredentialsValue());
570 ToTransportMsg msg = ToTransportMsg.newBuilder() 633 ToTransportMsg msg = ToTransportMsg.newBuilder()
@@ -639,7 +702,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -639,7 +702,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
639 ListenableFuture<EdgeEvent> future = systemContext.getEdgeEventService().saveAsync(edgeEvent); 702 ListenableFuture<EdgeEvent> future = systemContext.getEdgeEventService().saveAsync(edgeEvent);
640 Futures.addCallback(future, new FutureCallback<EdgeEvent>() { 703 Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
641 @Override 704 @Override
642 - public void onSuccess( EdgeEvent result) { 705 + public void onSuccess(EdgeEvent result) {
643 systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId); 706 systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId);
644 } 707 }
645 708
@@ -755,8 +818,26 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -755,8 +818,26 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
755 .addAllSessions(sessionsList).build().toByteArray()); 818 .addAllSessions(sessionsList).build().toByteArray());
756 } 819 }
757 820
758 - void initSessionTimeout(TbActorCtx ctx) { 821 + void init(TbActorCtx ctx) {
759 schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout()); 822 schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout());
  823 + PageLink pageLink = new PageLink(10);
  824 + PageData<Rpc> pageData;
  825 + do {
  826 + pageData = systemContext.getTbRpcService().findAllByDeviceIdAndStatus(deviceId, RpcStatus.QUEUED, pageLink);
  827 + pageData.getData().forEach(rpc -> {
  828 + ToDeviceRpcRequest msg = JacksonUtil.convertValue(rpc.getRequest(), ToDeviceRpcRequest.class);
  829 + long timeout = rpc.getExpirationTime() - System.currentTimeMillis();
  830 + if (timeout <= 0) {
  831 + rpc.setStatus(RpcStatus.TIMEOUT);
  832 + systemContext.getTbRpcService().save(tenantId, rpc);
  833 + } else {
  834 + registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, creteToDeviceRpcRequestMsg(msg), timeout);
  835 + }
  836 + });
  837 + if (pageData.hasNext()) {
  838 + pageLink = pageLink.nextPageLink();
  839 + }
  840 + } while (pageData.hasNext());
760 } 841 }
761 842
762 void checkSessionsTimeout() { 843 void checkSessionsTimeout() {
@@ -69,6 +69,7 @@ import org.thingsboard.server.common.data.id.EntityId; @@ -69,6 +69,7 @@ import org.thingsboard.server.common.data.id.EntityId;
69 import org.thingsboard.server.common.data.id.EntityIdFactory; 69 import org.thingsboard.server.common.data.id.EntityIdFactory;
70 import org.thingsboard.server.common.data.id.EntityViewId; 70 import org.thingsboard.server.common.data.id.EntityViewId;
71 import org.thingsboard.server.common.data.id.OtaPackageId; 71 import org.thingsboard.server.common.data.id.OtaPackageId;
  72 +import org.thingsboard.server.common.data.id.RpcId;
72 import org.thingsboard.server.common.data.id.TbResourceId; 73 import org.thingsboard.server.common.data.id.TbResourceId;
73 import org.thingsboard.server.common.data.id.RuleChainId; 74 import org.thingsboard.server.common.data.id.RuleChainId;
74 import org.thingsboard.server.common.data.id.RuleNodeId; 75 import org.thingsboard.server.common.data.id.RuleNodeId;
@@ -83,6 +84,7 @@ import org.thingsboard.server.common.data.page.TimePageLink; @@ -83,6 +84,7 @@ import org.thingsboard.server.common.data.page.TimePageLink;
83 import org.thingsboard.server.common.data.plugin.ComponentDescriptor; 84 import org.thingsboard.server.common.data.plugin.ComponentDescriptor;
84 import org.thingsboard.server.common.data.plugin.ComponentType; 85 import org.thingsboard.server.common.data.plugin.ComponentType;
85 import org.thingsboard.server.common.data.relation.EntityRelation; 86 import org.thingsboard.server.common.data.relation.EntityRelation;
  87 +import org.thingsboard.server.common.data.rpc.Rpc;
86 import org.thingsboard.server.common.data.rule.RuleChain; 88 import org.thingsboard.server.common.data.rule.RuleChain;
87 import org.thingsboard.server.common.data.rule.RuleChainType; 89 import org.thingsboard.server.common.data.rule.RuleChainType;
88 import org.thingsboard.server.common.data.rule.RuleNode; 90 import org.thingsboard.server.common.data.rule.RuleNode;
@@ -106,6 +108,7 @@ import org.thingsboard.server.dao.model.ModelConstants; @@ -106,6 +108,7 @@ import org.thingsboard.server.dao.model.ModelConstants;
106 import org.thingsboard.server.dao.oauth2.OAuth2ConfigTemplateService; 108 import org.thingsboard.server.dao.oauth2.OAuth2ConfigTemplateService;
107 import org.thingsboard.server.dao.oauth2.OAuth2Service; 109 import org.thingsboard.server.dao.oauth2.OAuth2Service;
108 import org.thingsboard.server.dao.relation.RelationService; 110 import org.thingsboard.server.dao.relation.RelationService;
  111 +import org.thingsboard.server.dao.rpc.RpcService;
109 import org.thingsboard.server.dao.rule.RuleChainService; 112 import org.thingsboard.server.dao.rule.RuleChainService;
110 import org.thingsboard.server.dao.tenant.TbTenantProfileCache; 113 import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
111 import org.thingsboard.server.dao.tenant.TenantProfileService; 114 import org.thingsboard.server.dao.tenant.TenantProfileService;
@@ -246,6 +249,9 @@ public abstract class BaseController { @@ -246,6 +249,9 @@ public abstract class BaseController {
246 protected OtaPackageStateService otaPackageStateService; 249 protected OtaPackageStateService otaPackageStateService;
247 250
248 @Autowired 251 @Autowired
  252 + protected RpcService rpcService;
  253 +
  254 + @Autowired
249 protected TbQueueProducerProvider producerProvider; 255 protected TbQueueProducerProvider producerProvider;
250 256
251 @Autowired 257 @Autowired
@@ -786,6 +792,18 @@ public abstract class BaseController { @@ -786,6 +792,18 @@ public abstract class BaseController {
786 } 792 }
787 } 793 }
788 794
  795 + Rpc checkRpcId(RpcId rpcId, Operation operation) throws ThingsboardException {
  796 + try {
  797 + validateId(rpcId, "Incorrect rpcId " + rpcId);
  798 + Rpc rpc = rpcService.findById(getCurrentUser().getTenantId(), rpcId);
  799 + checkNotNull(rpc);
  800 + accessControlService.checkPermission(getCurrentUser(), Resource.RPC, operation, rpcId, rpc);
  801 + return rpc;
  802 + } catch (Exception e) {
  803 + throw handleException(e, false);
  804 + }
  805 + }
  806 +
789 @SuppressWarnings("unchecked") 807 @SuppressWarnings("unchecked")
790 protected <I extends EntityId> I emptyId(EntityType entityType) { 808 protected <I extends EntityId> I emptyId(EntityType entityType) {
791 return (I) EntityIdFactory.getByTypeAndUuid(entityType, ModelConstants.NULL_UUID); 809 return (I) EntityIdFactory.getByTypeAndUuid(entityType, ModelConstants.NULL_UUID);
@@ -38,8 +38,10 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; @@ -38,8 +38,10 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
38 import org.thingsboard.server.common.data.exception.ThingsboardException; 38 import org.thingsboard.server.common.data.exception.ThingsboardException;
39 import org.thingsboard.server.common.data.id.DeviceId; 39 import org.thingsboard.server.common.data.id.DeviceId;
40 import org.thingsboard.server.common.data.id.EntityId; 40 import org.thingsboard.server.common.data.id.EntityId;
  41 +import org.thingsboard.server.common.data.id.RpcId;
41 import org.thingsboard.server.common.data.id.TenantId; 42 import org.thingsboard.server.common.data.id.TenantId;
42 import org.thingsboard.server.common.data.id.UUIDBased; 43 import org.thingsboard.server.common.data.id.UUIDBased;
  44 +import org.thingsboard.server.common.data.rpc.Rpc;
43 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; 45 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
44 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; 46 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
45 import org.thingsboard.server.queue.util.TbCoreComponent; 47 import org.thingsboard.server.queue.util.TbCoreComponent;
@@ -93,6 +95,19 @@ public class RpcController extends BaseController { @@ -93,6 +95,19 @@ public class RpcController extends BaseController {
93 return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody); 95 return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody);
94 } 96 }
95 97
  98 + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
  99 + @RequestMapping(value = "/persisted/{rpcId}", method = RequestMethod.GET)
  100 + @ResponseBody
  101 + public Rpc getPersistedRpc(@PathVariable("rpcId") String strRpc) throws ThingsboardException {
  102 + checkParameter("RpcId", strRpc);
  103 + try {
  104 + RpcId rpcId = new RpcId(UUID.fromString(strRpc));
  105 + return checkRpcId(rpcId, Operation.READ);
  106 + } catch (Exception e) {
  107 + throw handleException(e);
  108 + }
  109 + }
  110 +
96 private DeferredResult<ResponseEntity> handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody) throws ThingsboardException { 111 private DeferredResult<ResponseEntity> handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody) throws ThingsboardException {
97 try { 112 try {
98 JsonNode rpcRequestBody = jsonMapper.readTree(requestBody); 113 JsonNode rpcRequestBody = jsonMapper.readTree(requestBody);
@@ -103,6 +118,7 @@ public class RpcController extends BaseController { @@ -103,6 +118,7 @@ public class RpcController extends BaseController {
103 long timeout = rpcRequestBody.has("timeout") ? rpcRequestBody.get("timeout").asLong() : defaultTimeout; 118 long timeout = rpcRequestBody.has("timeout") ? rpcRequestBody.get("timeout").asLong() : defaultTimeout;
104 long expTime = System.currentTimeMillis() + Math.max(minTimeout, timeout); 119 long expTime = System.currentTimeMillis() + Math.max(minTimeout, timeout);
105 UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID(); 120 UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID();
  121 + boolean persisted = rpcRequestBody.has("persisted") && rpcRequestBody.get("persisted").asBoolean();
106 accessValidator.validate(currentUser, Operation.RPC_CALL, deviceId, new HttpValidationCallback(response, new FutureCallback<DeferredResult<ResponseEntity>>() { 122 accessValidator.validate(currentUser, Operation.RPC_CALL, deviceId, new HttpValidationCallback(response, new FutureCallback<DeferredResult<ResponseEntity>>() {
107 @Override 123 @Override
108 public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) { 124 public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
@@ -111,7 +127,8 @@ public class RpcController extends BaseController { @@ -111,7 +127,8 @@ public class RpcController extends BaseController {
111 deviceId, 127 deviceId,
112 oneWay, 128 oneWay,
113 expTime, 129 expTime,
114 - body 130 + body,
  131 + persisted
115 ); 132 );
116 deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse), currentUser); 133 deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse), currentUser);
117 } 134 }
@@ -54,7 +54,7 @@ public class RuleEngineEntityActionService { @@ -54,7 +54,7 @@ public class RuleEngineEntityActionService {
54 54
55 private static final ObjectMapper json = new ObjectMapper(); 55 private static final ObjectMapper json = new ObjectMapper();
56 56
57 - public void pushEntityActionToRuleEngine(EntityId entityId, HasName entity, TenantId tenantId, CustomerId customerId, 57 + public void pushEntityActionToRuleEngine(EntityId entityId, Object entity, TenantId tenantId, CustomerId customerId,
58 ActionType actionType, User user, Object... additionalInfo) { 58 ActionType actionType, User user, Object... additionalInfo) {
59 String msgType = null; 59 String msgType = null;
60 switch (actionType) { 60 switch (actionType) {
@@ -157,6 +157,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { @@ -157,6 +157,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
157 metaData.putValue("originServiceId", serviceId); 157 metaData.putValue("originServiceId", serviceId);
158 metaData.putValue("expirationTime", Long.toString(msg.getExpirationTime())); 158 metaData.putValue("expirationTime", Long.toString(msg.getExpirationTime()));
159 metaData.putValue("oneway", Boolean.toString(msg.isOneway())); 159 metaData.putValue("oneway", Boolean.toString(msg.isOneway()));
  160 + metaData.putValue("persisted", Boolean.toString(msg.isPersisted()));
160 161
161 Device device = deviceService.findDeviceById(msg.getTenantId(), msg.getDeviceId()); 162 Device device = deviceService.findDeviceById(msg.getTenantId(), msg.getDeviceId());
162 if (device != null) { 163 if (device != null) {
@@ -100,7 +100,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi @@ -100,7 +100,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
100 @Override 100 @Override
101 public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) { 101 public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
102 ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(), 102 ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(),
103 - src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody())); 103 + src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted());
104 forwardRpcRequestToDeviceActor(request, response -> { 104 forwardRpcRequestToDeviceActor(request, response -> {
105 if (src.isRestApiCall()) { 105 if (src.isRestApiCall()) {
106 sendRpcResponseToTbCore(src.getOriginServiceId(), response); 106 sendRpcResponseToTbCore(src.getOriginServiceId(), response);
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.rpc;
  17 +
  18 +import com.fasterxml.jackson.databind.JsonNode;
  19 +import lombok.RequiredArgsConstructor;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +import org.springframework.stereotype.Service;
  22 +import org.thingsboard.server.common.data.audit.ActionType;
  23 +import org.thingsboard.server.common.data.id.DeviceId;
  24 +import org.thingsboard.server.common.data.id.RpcId;
  25 +import org.thingsboard.server.common.data.id.TenantId;
  26 +import org.thingsboard.server.common.data.page.PageData;
  27 +import org.thingsboard.server.common.data.page.PageLink;
  28 +import org.thingsboard.server.common.data.rpc.Rpc;
  29 +import org.thingsboard.server.common.data.rpc.RpcStatus;
  30 +import org.thingsboard.server.dao.rpc.RpcService;
  31 +import org.thingsboard.server.queue.util.TbCoreComponent;
  32 +import org.thingsboard.server.service.action.RuleEngineEntityActionService;
  33 +
  34 +@TbCoreComponent
  35 +@Service
  36 +@RequiredArgsConstructor
  37 +@Slf4j
  38 +public class TbRpcService {
  39 + private final RpcService rpcService;
  40 + private final RuleEngineEntityActionService ruleEngineEntityActionService;
  41 +
  42 + public void save(TenantId tenantId, Rpc rpc) {
  43 + Rpc saved = rpcService.save(tenantId, rpc);
  44 + ruleEngineEntityActionService.pushEntityActionToRuleEngine(saved.getId(), saved, tenantId, null, rpc.getId() == null ? ActionType.ADDED : ActionType.UPDATED, null);
  45 + }
  46 +
  47 + public void save(TenantId tenantId, RpcId rpcId, RpcStatus newStatus, JsonNode response) {
  48 + Rpc foundRpc = rpcService.findById(tenantId, rpcId);
  49 + if (foundRpc != null) {
  50 + foundRpc.setStatus(newStatus);
  51 + if (response != null) {
  52 + foundRpc.setResponse(response);
  53 + }
  54 + Rpc saved = rpcService.save(tenantId, foundRpc);
  55 + ruleEngineEntityActionService.pushEntityActionToRuleEngine(saved.getId(), saved, tenantId, null, ActionType.UPDATED, null);
  56 + } else {
  57 + log.warn("[{}] Failed to update RPC status because RPC was already deleted", rpcId);
  58 + }
  59 + }
  60 +
  61 + public Rpc findRpcById(TenantId tenantId, RpcId rpcId) {
  62 + return rpcService.findById(tenantId, rpcId);
  63 + }
  64 +
  65 + public PageData<Rpc> findAllByDeviceIdAndStatus(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) {
  66 + return rpcService.findAllByDeviceIdAndStatus(deviceId, rpcStatus, pageLink);
  67 + }
  68 +
  69 +}
@@ -47,11 +47,13 @@ import org.thingsboard.server.common.data.id.EntityId; @@ -47,11 +47,13 @@ import org.thingsboard.server.common.data.id.EntityId;
47 import org.thingsboard.server.common.data.id.EntityIdFactory; 47 import org.thingsboard.server.common.data.id.EntityIdFactory;
48 import org.thingsboard.server.common.data.id.EntityViewId; 48 import org.thingsboard.server.common.data.id.EntityViewId;
49 import org.thingsboard.server.common.data.id.OtaPackageId; 49 import org.thingsboard.server.common.data.id.OtaPackageId;
  50 +import org.thingsboard.server.common.data.id.RpcId;
50 import org.thingsboard.server.common.data.id.RuleChainId; 51 import org.thingsboard.server.common.data.id.RuleChainId;
51 import org.thingsboard.server.common.data.id.RuleNodeId; 52 import org.thingsboard.server.common.data.id.RuleNodeId;
52 import org.thingsboard.server.common.data.id.TbResourceId; 53 import org.thingsboard.server.common.data.id.TbResourceId;
53 import org.thingsboard.server.common.data.id.TenantId; 54 import org.thingsboard.server.common.data.id.TenantId;
54 import org.thingsboard.server.common.data.id.UserId; 55 import org.thingsboard.server.common.data.id.UserId;
  56 +import org.thingsboard.server.common.data.rpc.Rpc;
55 import org.thingsboard.server.common.data.rule.RuleChain; 57 import org.thingsboard.server.common.data.rule.RuleChain;
56 import org.thingsboard.server.common.data.rule.RuleNode; 58 import org.thingsboard.server.common.data.rule.RuleNode;
57 import org.thingsboard.server.controller.HttpValidationCallback; 59 import org.thingsboard.server.controller.HttpValidationCallback;
@@ -65,6 +67,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService; @@ -65,6 +67,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService;
65 import org.thingsboard.server.dao.exception.IncorrectParameterException; 67 import org.thingsboard.server.dao.exception.IncorrectParameterException;
66 import org.thingsboard.server.dao.ota.OtaPackageService; 68 import org.thingsboard.server.dao.ota.OtaPackageService;
67 import org.thingsboard.server.dao.resource.ResourceService; 69 import org.thingsboard.server.dao.resource.ResourceService;
  70 +import org.thingsboard.server.dao.rpc.RpcService;
68 import org.thingsboard.server.dao.rule.RuleChainService; 71 import org.thingsboard.server.dao.rule.RuleChainService;
69 import org.thingsboard.server.dao.tenant.TenantService; 72 import org.thingsboard.server.dao.tenant.TenantService;
70 import org.thingsboard.server.dao.usagerecord.ApiUsageStateService; 73 import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
@@ -137,6 +140,9 @@ public class AccessValidator { @@ -137,6 +140,9 @@ public class AccessValidator {
137 @Autowired 140 @Autowired
138 protected OtaPackageService otaPackageService; 141 protected OtaPackageService otaPackageService;
139 142
  143 + @Autowired
  144 + protected RpcService rpcService;
  145 +
140 private ExecutorService executor; 146 private ExecutorService executor;
141 147
142 @PostConstruct 148 @PostConstruct
@@ -235,6 +241,9 @@ public class AccessValidator { @@ -235,6 +241,9 @@ public class AccessValidator {
235 case OTA_PACKAGE: 241 case OTA_PACKAGE:
236 validateOtaPackage(currentUser, operation, entityId, callback); 242 validateOtaPackage(currentUser, operation, entityId, callback);
237 return; 243 return;
  244 + case RPC:
  245 + validateRpc(currentUser, operation, entityId, callback);
  246 + return;
238 default: 247 default:
239 //TODO: add support of other entities 248 //TODO: add support of other entities
240 throw new IllegalStateException("Not Implemented!"); 249 throw new IllegalStateException("Not Implemented!");
@@ -261,6 +270,22 @@ public class AccessValidator { @@ -261,6 +270,22 @@ public class AccessValidator {
261 } 270 }
262 } 271 }
263 272
  273 + private void validateRpc(final SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) {
  274 + ListenableFuture<Rpc> rpcFurure = rpcService.findRpcByIdAsync(currentUser.getTenantId(), new RpcId(entityId.getId()));
  275 + Futures.addCallback(rpcFurure, getCallback(callback, rpc -> {
  276 + if (rpc == null) {
  277 + return ValidationResult.entityNotFound("Rpc with requested id wasn't found!");
  278 + } else {
  279 + try {
  280 + accessControlService.checkPermission(currentUser, Resource.RPC, operation, entityId, rpc);
  281 + } catch (ThingsboardException e) {
  282 + return ValidationResult.accessDenied(e.getMessage());
  283 + }
  284 + return ValidationResult.ok(rpc);
  285 + }
  286 + }), executor);
  287 + }
  288 +
264 private void validateDeviceProfile(final SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) { 289 private void validateDeviceProfile(final SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) {
265 if (currentUser.isSystemAdmin()) { 290 if (currentUser.isSystemAdmin()) {
266 callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION)); 291 callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
@@ -41,6 +41,7 @@ public class CustomerUserPermissions extends AbstractPermissions { @@ -41,6 +41,7 @@ public class CustomerUserPermissions extends AbstractPermissions {
41 put(Resource.WIDGETS_BUNDLE, widgetsPermissionChecker); 41 put(Resource.WIDGETS_BUNDLE, widgetsPermissionChecker);
42 put(Resource.WIDGET_TYPE, widgetsPermissionChecker); 42 put(Resource.WIDGET_TYPE, widgetsPermissionChecker);
43 put(Resource.EDGE, customerEntityPermissionChecker); 43 put(Resource.EDGE, customerEntityPermissionChecker);
  44 + put(Resource.RPC, rpcPermissionChecker);
44 } 45 }
45 46
46 private static final PermissionChecker customerEntityPermissionChecker = 47 private static final PermissionChecker customerEntityPermissionChecker =
@@ -138,4 +139,22 @@ public class CustomerUserPermissions extends AbstractPermissions { @@ -138,4 +139,22 @@ public class CustomerUserPermissions extends AbstractPermissions {
138 } 139 }
139 140
140 }; 141 };
  142 +
  143 + private static final PermissionChecker rpcPermissionChecker = new PermissionChecker.GenericPermissionChecker(Operation.READ) {
  144 +
  145 + @Override
  146 + @SuppressWarnings("unchecked")
  147 + public boolean hasPermission(SecurityUser user, Operation operation, EntityId entityId, HasTenantId entity) {
  148 + if (!super.hasPermission(user, operation, entityId, entity)) {
  149 + return false;
  150 + }
  151 + if (entity.getTenantId() == null || entity.getTenantId().isNullUid()) {
  152 + return true;
  153 + }
  154 + if (!user.getTenantId().equals(entity.getTenantId())) {
  155 + return false;
  156 + }
  157 + return true;
  158 + }
  159 + };
141 } 160 }
@@ -39,7 +39,8 @@ public enum Resource { @@ -39,7 +39,8 @@ public enum Resource {
39 API_USAGE_STATE(EntityType.API_USAGE_STATE), 39 API_USAGE_STATE(EntityType.API_USAGE_STATE),
40 TB_RESOURCE(EntityType.TB_RESOURCE), 40 TB_RESOURCE(EntityType.TB_RESOURCE),
41 OTA_PACKAGE(EntityType.OTA_PACKAGE), 41 OTA_PACKAGE(EntityType.OTA_PACKAGE),
42 - EDGE(EntityType.EDGE); 42 + EDGE(EntityType.EDGE),
  43 + RPC(EntityType.RPC);
43 44
44 private final EntityType entityType; 45 private final EntityType entityType;
45 46
@@ -44,6 +44,7 @@ public class TenantAdminPermissions extends AbstractPermissions { @@ -44,6 +44,7 @@ public class TenantAdminPermissions extends AbstractPermissions {
44 put(Resource.TB_RESOURCE, tbResourcePermissionChecker); 44 put(Resource.TB_RESOURCE, tbResourcePermissionChecker);
45 put(Resource.OTA_PACKAGE, tenantEntityPermissionChecker); 45 put(Resource.OTA_PACKAGE, tenantEntityPermissionChecker);
46 put(Resource.EDGE, tenantEntityPermissionChecker); 46 put(Resource.EDGE, tenantEntityPermissionChecker);
  47 + put(Resource.RPC, tenantEntityPermissionChecker);
47 } 48 }
48 49
49 public static final PermissionChecker tenantEntityPermissionChecker = new PermissionChecker() { 50 public static final PermissionChecker tenantEntityPermissionChecker = new PermissionChecker() {
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.rpc;
  17 +
  18 +import com.google.common.util.concurrent.ListenableFuture;
  19 +import org.thingsboard.server.common.data.id.DeviceId;
  20 +import org.thingsboard.server.common.data.id.RpcId;
  21 +import org.thingsboard.server.common.data.id.TenantId;
  22 +import org.thingsboard.server.common.data.page.PageData;
  23 +import org.thingsboard.server.common.data.page.PageLink;
  24 +import org.thingsboard.server.common.data.rpc.Rpc;
  25 +import org.thingsboard.server.common.data.rpc.RpcStatus;
  26 +
  27 +public interface RpcService {
  28 + Rpc save(TenantId tenantId, Rpc rpc);
  29 +
  30 + void remove(TenantId tenantId, RpcId id);
  31 +
  32 + Rpc findById(TenantId tenantId, RpcId id);
  33 +
  34 + ListenableFuture<Rpc> findRpcByIdAsync(TenantId tenantId, RpcId id);
  35 +
  36 + PageData<Rpc> findAllByDeviceIdAndStatus(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink);
  37 +}
@@ -19,5 +19,5 @@ package org.thingsboard.server.common.data; @@ -19,5 +19,5 @@ package org.thingsboard.server.common.data;
19 * @author Andrew Shvayka 19 * @author Andrew Shvayka
20 */ 20 */
21 public enum EntityType { 21 public enum EntityType {
22 - TENANT, CUSTOMER, USER, DASHBOARD, ASSET, DEVICE, ALARM, RULE_CHAIN, RULE_NODE, ENTITY_VIEW, WIDGETS_BUNDLE, WIDGET_TYPE, TENANT_PROFILE, DEVICE_PROFILE, API_USAGE_STATE, TB_RESOURCE, OTA_PACKAGE, EDGE; 22 + TENANT, CUSTOMER, USER, DASHBOARD, ASSET, DEVICE, ALARM, RULE_CHAIN, RULE_NODE, ENTITY_VIEW, WIDGETS_BUNDLE, WIDGET_TYPE, TENANT_PROFILE, DEVICE_PROFILE, API_USAGE_STATE, TB_RESOURCE, OTA_PACKAGE, EDGE, RPC;
23 } 23 }
@@ -75,6 +75,8 @@ public class EntityIdFactory { @@ -75,6 +75,8 @@ public class EntityIdFactory {
75 return new OtaPackageId(uuid); 75 return new OtaPackageId(uuid);
76 case EDGE: 76 case EDGE:
77 return new EdgeId(uuid); 77 return new EdgeId(uuid);
  78 + case RPC:
  79 + return new RpcId(uuid);
78 } 80 }
79 throw new IllegalArgumentException("EntityType " + type + " is not supported!"); 81 throw new IllegalArgumentException("EntityType " + type + " is not supported!");
80 } 82 }
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.data.id;
  17 +
  18 +import com.fasterxml.jackson.annotation.JsonCreator;
  19 +import com.fasterxml.jackson.annotation.JsonIgnore;
  20 +import com.fasterxml.jackson.annotation.JsonProperty;
  21 +import org.thingsboard.server.common.data.EntityType;
  22 +
  23 +import java.util.UUID;
  24 +
  25 +public final class RpcId extends UUIDBased implements EntityId {
  26 +
  27 + private static final long serialVersionUID = 1L;
  28 +
  29 + @JsonCreator
  30 + public RpcId(@JsonProperty("id") UUID id) {
  31 + super(id);
  32 + }
  33 +
  34 + @JsonIgnore
  35 + @Override
  36 + public EntityType getEntityType() {
  37 + return EntityType.RPC;
  38 + }
  39 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.data.rpc;
  17 +
  18 +import com.fasterxml.jackson.databind.JsonNode;
  19 +import lombok.Data;
  20 +import lombok.EqualsAndHashCode;
  21 +import org.thingsboard.server.common.data.BaseData;
  22 +import org.thingsboard.server.common.data.HasTenantId;
  23 +import org.thingsboard.server.common.data.id.DeviceId;
  24 +import org.thingsboard.server.common.data.id.RpcId;
  25 +import org.thingsboard.server.common.data.id.TenantId;
  26 +
  27 +@Data
  28 +@EqualsAndHashCode(callSuper = true)
  29 +public class Rpc extends BaseData<RpcId> implements HasTenantId {
  30 + private TenantId tenantId;
  31 + private DeviceId deviceId;
  32 + private long expirationTime;
  33 + private JsonNode request;
  34 + private JsonNode response;
  35 + private RpcStatus status;
  36 +
  37 + public Rpc() {
  38 + super();
  39 + }
  40 +
  41 + public Rpc(RpcId id) {
  42 + super(id);
  43 + }
  44 +
  45 + public Rpc(Rpc rpc) {
  46 + super(rpc);
  47 + this.tenantId = rpc.getTenantId();
  48 + this.deviceId = rpc.getDeviceId();
  49 + this.expirationTime = rpc.getExpirationTime();
  50 + this.request = rpc.getRequest();
  51 + this.response = rpc.getResponse();
  52 + this.status = rpc.getStatus();
  53 + }
  54 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.data.rpc;
  17 +
  18 +public enum RpcStatus {
  19 + QUEUED, SENT, DELIVERED, SUCCESSFUL, TIMEOUT, FAILED
  20 +}
@@ -34,5 +34,6 @@ public class ToDeviceRpcRequest implements Serializable { @@ -34,5 +34,6 @@ public class ToDeviceRpcRequest implements Serializable {
34 private final boolean oneway; 34 private final boolean oneway;
35 private final long expirationTime; 35 private final long expirationTime;
36 private final ToDeviceRpcRequestBody body; 36 private final ToDeviceRpcRequestBody body;
  37 + private final boolean persisted;
37 } 38 }
38 39
@@ -323,6 +323,8 @@ message ToDeviceRpcRequestMsg { @@ -323,6 +323,8 @@ message ToDeviceRpcRequestMsg {
323 int64 expirationTime = 4; 323 int64 expirationTime = 4;
324 int64 requestIdMSB = 5; 324 int64 requestIdMSB = 5;
325 int64 requestIdLSB = 6; 325 int64 requestIdLSB = 6;
  326 + bool oneway = 7;
  327 + bool persisted = 8;
326 } 328 }
327 329
328 message ToDeviceRpcResponseMsg { 330 message ToDeviceRpcResponseMsg {
@@ -330,6 +332,13 @@ message ToDeviceRpcResponseMsg { @@ -330,6 +332,13 @@ message ToDeviceRpcResponseMsg {
330 string payload = 2; 332 string payload = 2;
331 } 333 }
332 334
  335 +message ToDevicePersistedRpcResponseMsg {
  336 + int32 requestId = 1;
  337 + int64 requestIdMSB = 2;
  338 + int64 requestIdLSB = 3;
  339 + string status = 4;
  340 +}
  341 +
333 message ToServerRpcRequestMsg { 342 message ToServerRpcRequestMsg {
334 int32 requestId = 1; 343 int32 requestId = 1;
335 string methodName = 2; 344 string methodName = 2;
@@ -433,6 +442,7 @@ message TransportToDeviceActorMsg { @@ -433,6 +442,7 @@ message TransportToDeviceActorMsg {
433 SubscriptionInfoProto subscriptionInfo = 7; 442 SubscriptionInfoProto subscriptionInfo = 7;
434 ClaimDeviceMsg claimDevice = 8; 443 ClaimDeviceMsg claimDevice = 8;
435 ProvisionDeviceRequestMsg provisionDevice = 9; 444 ProvisionDeviceRequestMsg provisionDevice = 9;
  445 + ToDevicePersistedRpcResponseMsg persistedRpcResponseMsg = 10;
436 } 446 }
437 447
438 message TransportToRuleEngineMsg { 448 message TransportToRuleEngineMsg {
@@ -45,6 +45,7 @@ import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadCon @@ -45,6 +45,7 @@ import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadCon
45 import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration; 45 import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
46 import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration; 46 import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
47 import org.thingsboard.server.common.data.ota.OtaPackageType; 47 import org.thingsboard.server.common.data.ota.OtaPackageType;
  48 +import org.thingsboard.server.common.data.rpc.RpcStatus;
48 import org.thingsboard.server.common.data.security.DeviceTokenCredentials; 49 import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
49 import org.thingsboard.server.common.msg.session.FeatureType; 50 import org.thingsboard.server.common.msg.session.FeatureType;
50 import org.thingsboard.server.common.msg.session.SessionMsgType; 51 import org.thingsboard.server.common.msg.session.SessionMsgType;
@@ -337,14 +338,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -337,14 +338,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
337 break; 338 break;
338 case TO_SERVER_RPC_REQUEST: 339 case TO_SERVER_RPC_REQUEST:
339 transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, 340 transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor,
340 - transportConfigurationContainer.getRpcRequestDynamicMessageBuilder()), timeout); 341 + transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), sessionInfo), timeout);
341 transportService.process(sessionInfo, 342 transportService.process(sessionInfo,
342 coapTransportAdaptor.convertToServerRpcRequest(sessionId, request), 343 coapTransportAdaptor.convertToServerRpcRequest(sessionId, request),
343 new CoapNoOpCallback(exchange)); 344 new CoapNoOpCallback(exchange));
344 break; 345 break;
345 case GET_ATTRIBUTES_REQUEST: 346 case GET_ATTRIBUTES_REQUEST:
346 transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, 347 transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor,
347 - transportConfigurationContainer.getRpcRequestDynamicMessageBuilder()), timeout); 348 + transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), sessionInfo), timeout);
348 transportService.process(sessionInfo, 349 transportService.process(sessionInfo,
349 coapTransportAdaptor.convertToGetAttributes(sessionId, request), 350 coapTransportAdaptor.convertToGetAttributes(sessionId, request),
350 new CoapNoOpCallback(exchange)); 351 new CoapNoOpCallback(exchange));
@@ -383,12 +384,12 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -383,12 +384,12 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
383 384
384 private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) { 385 private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) {
385 tokenToSessionInfoMap.putIfAbsent(token, sessionInfo); 386 tokenToSessionInfoMap.putIfAbsent(token, sessionInfo);
386 - transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder)); 387 + transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo));
387 transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); 388 transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
388 } 389 }
389 390
390 - private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) {  
391 - return new CoapSessionListener(this, exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder); 391 + private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) {
  392 + return new CoapSessionListener(this, exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo);
392 } 393 }
393 394
394 private String getTokenFromRequest(Request request) { 395 private String getTokenFromRequest(Request request) {
@@ -510,12 +511,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -510,12 +511,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
510 private final CoapExchange exchange; 511 private final CoapExchange exchange;
511 private final CoapTransportAdaptor coapTransportAdaptor; 512 private final CoapTransportAdaptor coapTransportAdaptor;
512 private final DynamicMessage.Builder rpcRequestDynamicMessageBuilder; 513 private final DynamicMessage.Builder rpcRequestDynamicMessageBuilder;
  514 + private final TransportProtos.SessionInfoProto sessionInfo;
513 515
514 - CoapSessionListener(CoapTransportResource coapTransportResource, CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) { 516 + CoapSessionListener(CoapTransportResource coapTransportResource, CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) {
515 this.coapTransportResource = coapTransportResource; 517 this.coapTransportResource = coapTransportResource;
516 this.exchange = exchange; 518 this.exchange = exchange;
517 this.coapTransportAdaptor = coapTransportAdaptor; 519 this.coapTransportAdaptor = coapTransportAdaptor;
518 this.rpcRequestDynamicMessageBuilder = rpcRequestDynamicMessageBuilder; 520 this.rpcRequestDynamicMessageBuilder = rpcRequestDynamicMessageBuilder;
  521 + this.sessionInfo = sessionInfo;
519 } 522 }
520 523
521 @Override 524 @Override
@@ -558,11 +561,31 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @@ -558,11 +561,31 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
558 561
559 @Override 562 @Override
560 public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg msg) { 563 public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg msg) {
  564 + boolean successful;
561 try { 565 try {
562 exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder)); 566 exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder));
  567 + successful = true;
563 } catch (AdaptorException e) { 568 } catch (AdaptorException e) {
564 log.trace("Failed to reply due to error", e); 569 log.trace("Failed to reply due to error", e);
565 exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); 570 exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
  571 + successful = false;
  572 + }
  573 + if (msg.getPersisted()) {
  574 + RpcStatus status;
  575 + if (successful) {
  576 + status = RpcStatus.FAILED;
  577 + } else if (msg.getOneway()) {
  578 + status = RpcStatus.SUCCESSFUL;
  579 + } else {
  580 + status = RpcStatus.DELIVERED;
  581 + }
  582 + TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
  583 + .setRequestId(msg.getRequestId())
  584 + .setRequestIdLSB(msg.getRequestIdLSB())
  585 + .setRequestIdMSB(msg.getRequestIdMSB())
  586 + .setStatus(status.name())
  587 + .build();
  588 + coapTransportResource.transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY);
566 } 589 }
567 } 590 }
568 591
@@ -17,6 +17,7 @@ package org.thingsboard.server.transport.http; @@ -17,6 +17,7 @@ package org.thingsboard.server.transport.http;
17 17
18 import com.google.gson.JsonObject; 18 import com.google.gson.JsonObject;
19 import com.google.gson.JsonParser; 19 import com.google.gson.JsonParser;
  20 +import lombok.RequiredArgsConstructor;
20 import lombok.extern.slf4j.Slf4j; 21 import lombok.extern.slf4j.Slf4j;
21 import org.springframework.beans.factory.annotation.Autowired; 22 import org.springframework.beans.factory.annotation.Autowired;
22 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; 23 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
@@ -34,9 +35,10 @@ import org.springframework.web.bind.annotation.RequestParam; @@ -34,9 +35,10 @@ import org.springframework.web.bind.annotation.RequestParam;
34 import org.springframework.web.bind.annotation.RestController; 35 import org.springframework.web.bind.annotation.RestController;
35 import org.springframework.web.context.request.async.DeferredResult; 36 import org.springframework.web.context.request.async.DeferredResult;
36 import org.thingsboard.server.common.data.DeviceTransportType; 37 import org.thingsboard.server.common.data.DeviceTransportType;
37 -import org.thingsboard.server.common.data.ota.OtaPackageType;  
38 import org.thingsboard.server.common.data.TbTransportService; 38 import org.thingsboard.server.common.data.TbTransportService;
39 import org.thingsboard.server.common.data.id.DeviceId; 39 import org.thingsboard.server.common.data.id.DeviceId;
  40 +import org.thingsboard.server.common.data.ota.OtaPackageType;
  41 +import org.thingsboard.server.common.data.rpc.RpcStatus;
40 import org.thingsboard.server.common.transport.SessionMsgListener; 42 import org.thingsboard.server.common.transport.SessionMsgListener;
41 import org.thingsboard.server.common.transport.TransportContext; 43 import org.thingsboard.server.common.transport.TransportContext;
42 import org.thingsboard.server.common.transport.TransportService; 44 import org.thingsboard.server.common.transport.TransportService;
@@ -95,7 +97,9 @@ public class DeviceApiController implements TbTransportService { @@ -95,7 +97,9 @@ public class DeviceApiController implements TbTransportService {
95 request.addAllSharedAttributeNames(sharedKeySet); 97 request.addAllSharedAttributeNames(sharedKeySet);
96 } 98 }
97 TransportService transportService = transportContext.getTransportService(); 99 TransportService transportService = transportContext.getTransportService();
98 - transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout()); 100 + transportService.registerSyncSession(sessionInfo,
  101 + new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo),
  102 + transportContext.getDefaultTimeout());
99 transportService.process(sessionInfo, request.build(), new SessionCloseOnErrorCallback(transportService, sessionInfo)); 103 transportService.process(sessionInfo, request.build(), new SessionCloseOnErrorCallback(transportService, sessionInfo));
100 })); 104 }));
101 return responseWriter; 105 return responseWriter;
@@ -151,7 +155,8 @@ public class DeviceApiController implements TbTransportService { @@ -151,7 +155,8 @@ public class DeviceApiController implements TbTransportService {
151 transportContext.getTransportService().process(DeviceTransportType.DEFAULT, ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(), 155 transportContext.getTransportService().process(DeviceTransportType.DEFAULT, ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
152 new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { 156 new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
153 TransportService transportService = transportContext.getTransportService(); 157 TransportService transportService = transportContext.getTransportService();
154 - transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), 158 + transportService.registerSyncSession(sessionInfo,
  159 + new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo),
155 timeout == 0 ? transportContext.getDefaultTimeout() : timeout); 160 timeout == 0 ? transportContext.getDefaultTimeout() : timeout);
156 transportService.process(sessionInfo, SubscribeToRPCMsg.getDefaultInstance(), 161 transportService.process(sessionInfo, SubscribeToRPCMsg.getDefaultInstance(),
157 new SessionCloseOnErrorCallback(transportService, sessionInfo)); 162 new SessionCloseOnErrorCallback(transportService, sessionInfo));
@@ -181,7 +186,9 @@ public class DeviceApiController implements TbTransportService { @@ -181,7 +186,9 @@ public class DeviceApiController implements TbTransportService {
181 new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { 186 new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
182 JsonObject request = new JsonParser().parse(json).getAsJsonObject(); 187 JsonObject request = new JsonParser().parse(json).getAsJsonObject();
183 TransportService transportService = transportContext.getTransportService(); 188 TransportService transportService = transportContext.getTransportService();
184 - transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout()); 189 + transportService.registerSyncSession(sessionInfo,
  190 + new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo),
  191 + transportContext.getDefaultTimeout());
185 transportService.process(sessionInfo, ToServerRpcRequestMsg.newBuilder().setRequestId(0) 192 transportService.process(sessionInfo, ToServerRpcRequestMsg.newBuilder().setRequestId(0)
186 .setMethodName(request.get("method").getAsString()) 193 .setMethodName(request.get("method").getAsString())
187 .setParams(request.get("params").toString()).build(), 194 .setParams(request.get("params").toString()).build(),
@@ -198,7 +205,8 @@ public class DeviceApiController implements TbTransportService { @@ -198,7 +205,8 @@ public class DeviceApiController implements TbTransportService {
198 transportContext.getTransportService().process(DeviceTransportType.DEFAULT, ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(), 205 transportContext.getTransportService().process(DeviceTransportType.DEFAULT, ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
199 new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { 206 new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
200 TransportService transportService = transportContext.getTransportService(); 207 TransportService transportService = transportContext.getTransportService();
201 - transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), 208 + transportService.registerSyncSession(sessionInfo,
  209 + new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo),
202 timeout == 0 ? transportContext.getDefaultTimeout() : timeout); 210 timeout == 0 ? transportContext.getDefaultTimeout() : timeout);
203 transportService.process(sessionInfo, SubscribeToAttributeUpdatesMsg.getDefaultInstance(), 211 transportService.process(sessionInfo, SubscribeToAttributeUpdatesMsg.getDefaultInstance(),
204 new SessionCloseOnErrorCallback(transportService, sessionInfo)); 212 new SessionCloseOnErrorCallback(transportService, sessionInfo));
@@ -372,13 +380,12 @@ public class DeviceApiController implements TbTransportService { @@ -372,13 +380,12 @@ public class DeviceApiController implements TbTransportService {
372 } 380 }
373 } 381 }
374 382
  383 + @RequiredArgsConstructor
375 private static class HttpSessionListener implements SessionMsgListener { 384 private static class HttpSessionListener implements SessionMsgListener {
376 385
377 private final DeferredResult<ResponseEntity> responseWriter; 386 private final DeferredResult<ResponseEntity> responseWriter;
378 -  
379 - HttpSessionListener(DeferredResult<ResponseEntity> responseWriter) {  
380 - this.responseWriter = responseWriter;  
381 - } 387 + private final TransportService transportService;
  388 + private final SessionInfoProto sessionInfo;
382 389
383 @Override 390 @Override
384 public void onGetAttributesResponse(GetAttributeResponseMsg msg) { 391 public void onGetAttributesResponse(GetAttributeResponseMsg msg) {
@@ -399,6 +406,21 @@ public class DeviceApiController implements TbTransportService { @@ -399,6 +406,21 @@ public class DeviceApiController implements TbTransportService {
399 @Override 406 @Override
400 public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg msg) { 407 public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg msg) {
401 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK)); 408 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));
  409 + if (msg.getPersisted()) {
  410 + RpcStatus status;
  411 + if (msg.getOneway()) {
  412 + status = RpcStatus.SUCCESSFUL;
  413 + } else {
  414 + status = RpcStatus.DELIVERED;
  415 + }
  416 + TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
  417 + .setRequestId(msg.getRequestId())
  418 + .setRequestIdLSB(msg.getRequestIdLSB())
  419 + .setRequestIdMSB(msg.getRequestIdMSB())
  420 + .setStatus(status.name())
  421 + .build();
  422 + transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY);
  423 + }
402 } 424 }
403 425
404 @Override 426 @Override
@@ -161,7 +161,7 @@ public class LwM2MBootstrapSecurityStore implements BootstrapSecurityStore { @@ -161,7 +161,7 @@ public class LwM2MBootstrapSecurityStore implements BootstrapSecurityStore {
161 LwM2MServerBootstrap profileLwm2mServer = mapper.readValue(bootstrapObject.get(LWM2M_SERVER).toString(), LwM2MServerBootstrap.class); 161 LwM2MServerBootstrap profileLwm2mServer = mapper.readValue(bootstrapObject.get(LWM2M_SERVER).toString(), LwM2MServerBootstrap.class);
162 UUID sessionUUiD = UUID.randomUUID(); 162 UUID sessionUUiD = UUID.randomUUID();
163 TransportProtos.SessionInfoProto sessionInfo = helper.getValidateSessionInfo(store.getMsg(), sessionUUiD.getMostSignificantBits(), sessionUUiD.getLeastSignificantBits()); 163 TransportProtos.SessionInfoProto sessionInfo = helper.getValidateSessionInfo(store.getMsg(), sessionUUiD.getMostSignificantBits(), sessionUUiD.getLeastSignificantBits());
164 - context.getTransportService().registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(null, sessionInfo)); 164 + context.getTransportService().registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(null, sessionInfo, context.getTransportService()));
165 if (this.getValidatedSecurityMode(lwM2MBootstrapConfig.bootstrapServer, profileServerBootstrap, lwM2MBootstrapConfig.lwm2mServer, profileLwm2mServer)) { 165 if (this.getValidatedSecurityMode(lwM2MBootstrapConfig.bootstrapServer, profileServerBootstrap, lwM2MBootstrapConfig.lwm2mServer, profileLwm2mServer)) {
166 lwM2MBootstrapConfig.bootstrapServer = new LwM2MServerBootstrap(lwM2MBootstrapConfig.bootstrapServer, profileServerBootstrap); 166 lwM2MBootstrapConfig.bootstrapServer = new LwM2MServerBootstrap(lwM2MBootstrapConfig.bootstrapServer, profileServerBootstrap);
167 lwM2MBootstrapConfig.lwm2mServer = new LwM2MServerBootstrap(lwM2MBootstrapConfig.lwm2mServer, profileLwm2mServer); 167 lwM2MBootstrapConfig.lwm2mServer = new LwM2MServerBootstrap(lwM2MBootstrapConfig.lwm2mServer, profileLwm2mServer);
@@ -188,7 +188,7 @@ public class DefaultLwM2MTransportMsgHandler implements LwM2mTransportMsgHandler @@ -188,7 +188,7 @@ public class DefaultLwM2MTransportMsgHandler implements LwM2mTransportMsgHandler
188 if (lwM2MClient != null) { 188 if (lwM2MClient != null) {
189 SessionInfoProto sessionInfo = this.getSessionInfoOrCloseSession(lwM2MClient); 189 SessionInfoProto sessionInfo = this.getSessionInfoOrCloseSession(lwM2MClient);
190 if (sessionInfo != null) { 190 if (sessionInfo != null) {
191 - transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, sessionInfo)); 191 + transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, sessionInfo, transportService));
192 TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder() 192 TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder()
193 .setSessionInfo(sessionInfo) 193 .setSessionInfo(sessionInfo)
194 .setSessionEvent(DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN)) 194 .setSessionEvent(DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN))
@@ -1320,7 +1320,7 @@ public class DefaultLwM2MTransportMsgHandler implements LwM2mTransportMsgHandler @@ -1320,7 +1320,7 @@ public class DefaultLwM2MTransportMsgHandler implements LwM2mTransportMsgHandler
1320 */ 1320 */
1321 private void reportActivityAndRegister(SessionInfoProto sessionInfo) { 1321 private void reportActivityAndRegister(SessionInfoProto sessionInfo) {
1322 if (sessionInfo != null && transportService.reportActivity(sessionInfo) == null) { 1322 if (sessionInfo != null && transportService.reportActivity(sessionInfo) == null) {
1323 - transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, sessionInfo)); 1323 + transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, sessionInfo, transportService));
1324 this.reportActivitySubscription(sessionInfo); 1324 this.reportActivitySubscription(sessionInfo);
1325 } 1325 }
1326 } 1326 }
@@ -17,12 +17,16 @@ package org.thingsboard.server.transport.lwm2m.server; @@ -17,12 +17,16 @@ package org.thingsboard.server.transport.lwm2m.server;
17 17
18 import io.netty.util.concurrent.Future; 18 import io.netty.util.concurrent.Future;
19 import io.netty.util.concurrent.GenericFutureListener; 19 import io.netty.util.concurrent.GenericFutureListener;
  20 +import lombok.RequiredArgsConstructor;
20 import lombok.extern.slf4j.Slf4j; 21 import lombok.extern.slf4j.Slf4j;
21 import org.jetbrains.annotations.NotNull; 22 import org.jetbrains.annotations.NotNull;
22 import org.thingsboard.server.common.data.Device; 23 import org.thingsboard.server.common.data.Device;
23 import org.thingsboard.server.common.data.DeviceProfile; 24 import org.thingsboard.server.common.data.DeviceProfile;
24 import org.thingsboard.server.common.data.ResourceType; 25 import org.thingsboard.server.common.data.ResourceType;
  26 +import org.thingsboard.server.common.data.rpc.RpcStatus;
25 import org.thingsboard.server.common.transport.SessionMsgListener; 27 import org.thingsboard.server.common.transport.SessionMsgListener;
  28 +import org.thingsboard.server.common.transport.TransportService;
  29 +import org.thingsboard.server.common.transport.TransportServiceCallback;
26 import org.thingsboard.server.gen.transport.TransportProtos; 30 import org.thingsboard.server.gen.transport.TransportProtos;
27 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; 31 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
28 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; 32 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
@@ -35,14 +39,11 @@ import java.util.Optional; @@ -35,14 +39,11 @@ import java.util.Optional;
35 import java.util.UUID; 39 import java.util.UUID;
36 40
37 @Slf4j 41 @Slf4j
  42 +@RequiredArgsConstructor
38 public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? super Void>>, SessionMsgListener { 43 public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? super Void>>, SessionMsgListener {
39 - private DefaultLwM2MTransportMsgHandler handler;  
40 - private TransportProtos.SessionInfoProto sessionInfo;  
41 -  
42 - public LwM2mSessionMsgListener(DefaultLwM2MTransportMsgHandler handler, TransportProtos.SessionInfoProto sessionInfo) {  
43 - this.handler = handler;  
44 - this.sessionInfo = sessionInfo;  
45 - } 44 + private final DefaultLwM2MTransportMsgHandler handler;
  45 + private final TransportProtos.SessionInfoProto sessionInfo;
  46 + private final TransportService transportService;
46 47
47 @Override 48 @Override
48 public void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse) { 49 public void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse) {
@@ -52,7 +53,7 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s @@ -52,7 +53,7 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s
52 @Override 53 @Override
53 public void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification) { 54 public void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification) {
54 this.handler.onAttributeUpdate(attributeUpdateNotification, this.sessionInfo); 55 this.handler.onAttributeUpdate(attributeUpdateNotification, this.sessionInfo);
55 - } 56 + }
56 57
57 @Override 58 @Override
58 public void onRemoteSessionCloseCommand(UUID sessionId, SessionCloseNotificationProto sessionCloseNotification) { 59 public void onRemoteSessionCloseCommand(UUID sessionId, SessionCloseNotificationProto sessionCloseNotification) {
@@ -76,7 +77,22 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s @@ -76,7 +77,22 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s
76 77
77 @Override 78 @Override
78 public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) { 79 public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) {
79 - this.handler.onToDeviceRpcRequest(toDeviceRequest,this.sessionInfo); 80 + this.handler.onToDeviceRpcRequest(toDeviceRequest, this.sessionInfo);
  81 + if (toDeviceRequest.getPersisted()) {
  82 + RpcStatus status;
  83 + if (toDeviceRequest.getOneway()) {
  84 + status = RpcStatus.SUCCESSFUL;
  85 + } else {
  86 + status = RpcStatus.DELIVERED;
  87 + }
  88 + TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
  89 + .setRequestId(toDeviceRequest.getRequestId())
  90 + .setRequestIdLSB(toDeviceRequest.getRequestIdLSB())
  91 + .setRequestIdMSB(toDeviceRequest.getRequestIdMSB())
  92 + .setStatus(status.name())
  93 + .build();
  94 + transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY);
  95 + }
80 } 96 }
81 97
82 @Override 98 @Override
@@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt; @@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt;
17 17
18 import com.fasterxml.jackson.databind.JsonNode; 18 import com.fasterxml.jackson.databind.JsonNode;
19 import com.google.gson.JsonParseException; 19 import com.google.gson.JsonParseException;
  20 +import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelHandlerContext; 21 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelInboundHandlerAdapter; 22 import io.netty.channel.ChannelInboundHandlerAdapter;
22 import io.netty.handler.codec.mqtt.MqttConnAckMessage; 23 import io.netty.handler.codec.mqtt.MqttConnAckMessage;
@@ -47,8 +48,9 @@ import org.thingsboard.server.common.data.DeviceProfile; @@ -47,8 +48,9 @@ import org.thingsboard.server.common.data.DeviceProfile;
47 import org.thingsboard.server.common.data.DeviceTransportType; 48 import org.thingsboard.server.common.data.DeviceTransportType;
48 import org.thingsboard.server.common.data.TransportPayloadType; 49 import org.thingsboard.server.common.data.TransportPayloadType;
49 import org.thingsboard.server.common.data.device.profile.MqttTopics; 50 import org.thingsboard.server.common.data.device.profile.MqttTopics;
50 -import org.thingsboard.server.common.data.ota.OtaPackageType;  
51 import org.thingsboard.server.common.data.id.OtaPackageId; 51 import org.thingsboard.server.common.data.id.OtaPackageId;
  52 +import org.thingsboard.server.common.data.ota.OtaPackageType;
  53 +import org.thingsboard.server.common.data.rpc.RpcStatus;
52 import org.thingsboard.server.common.msg.EncryptionUtil; 54 import org.thingsboard.server.common.msg.EncryptionUtil;
53 import org.thingsboard.server.common.msg.tools.TbRateLimitsException; 55 import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
54 import org.thingsboard.server.common.transport.SessionMsgListener; 56 import org.thingsboard.server.common.transport.SessionMsgListener;
@@ -813,7 +815,31 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -813,7 +815,31 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
813 public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { 815 public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
814 log.trace("[{}] Received RPC command to device", sessionId); 816 log.trace("[{}] Received RPC command to device", sessionId);
815 try { 817 try {
816 - deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); 818 + deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest)
  819 + .ifPresent(payload -> {
  820 + ChannelFuture channelFuture = deviceSessionCtx.getChannel().writeAndFlush(payload);
  821 + if (rpcRequest.getPersisted()) {
  822 + channelFuture.addListener(future -> {
  823 + RpcStatus status;
  824 + Throwable t = future.cause();
  825 + if (t != null) {
  826 + log.error("Failed delivering RPC command to device!", t);
  827 + status = RpcStatus.FAILED;
  828 + } else if (rpcRequest.getOneway()) {
  829 + status = RpcStatus.SUCCESSFUL;
  830 + } else {
  831 + status = RpcStatus.DELIVERED;
  832 + }
  833 + TransportProtos.ToDevicePersistedRpcResponseMsg msg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
  834 + .setRequestId(rpcRequest.getRequestId())
  835 + .setRequestIdLSB(rpcRequest.getRequestIdLSB())
  836 + .setRequestIdMSB(rpcRequest.getRequestIdMSB())
  837 + .setStatus(status.name())
  838 + .build();
  839 + transportService.process(deviceSessionCtx.getSessionInfo(), msg, TransportServiceCallback.EMPTY);
  840 + });
  841 + }
  842 + });
817 } catch (Exception e) { 843 } catch (Exception e) {
818 log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e); 844 log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e);
819 } 845 }
@@ -15,9 +15,13 @@ @@ -15,9 +15,13 @@
15 */ 15 */
16 package org.thingsboard.server.transport.mqtt.session; 16 package org.thingsboard.server.transport.mqtt.session;
17 17
  18 +import io.netty.channel.ChannelFuture;
18 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
19 import org.thingsboard.server.common.data.DeviceProfile; 20 import org.thingsboard.server.common.data.DeviceProfile;
  21 +import org.thingsboard.server.common.data.rpc.RpcStatus;
20 import org.thingsboard.server.common.transport.SessionMsgListener; 22 import org.thingsboard.server.common.transport.SessionMsgListener;
  23 +import org.thingsboard.server.common.transport.TransportService;
  24 +import org.thingsboard.server.common.transport.TransportServiceCallback;
21 import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; 25 import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
22 import org.thingsboard.server.gen.transport.TransportProtos; 26 import org.thingsboard.server.gen.transport.TransportProtos;
23 import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; 27 import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
@@ -32,9 +36,11 @@ import java.util.concurrent.ConcurrentMap; @@ -32,9 +36,11 @@ import java.util.concurrent.ConcurrentMap;
32 public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener { 36 public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener {
33 37
34 private final GatewaySessionHandler parent; 38 private final GatewaySessionHandler parent;
  39 + private final TransportService transportService;
35 40
36 public GatewayDeviceSessionCtx(GatewaySessionHandler parent, TransportDeviceInfo deviceInfo, 41 public GatewayDeviceSessionCtx(GatewaySessionHandler parent, TransportDeviceInfo deviceInfo,
37 - DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) { 42 + DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap,
  43 + TransportService transportService) {
38 super(UUID.randomUUID(), mqttQoSMap); 44 super(UUID.randomUUID(), mqttQoSMap);
39 this.parent = parent; 45 this.parent = parent;
40 setSessionInfo(SessionInfoProto.newBuilder() 46 setSessionInfo(SessionInfoProto.newBuilder()
@@ -56,6 +62,7 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple @@ -56,6 +62,7 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
56 .build()); 62 .build());
57 setDeviceInfo(deviceInfo); 63 setDeviceInfo(deviceInfo);
58 setDeviceProfile(deviceProfile); 64 setDeviceProfile(deviceProfile);
  65 + this.transportService = transportService;
59 } 66 }
60 67
61 @Override 68 @Override
@@ -89,7 +96,32 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple @@ -89,7 +96,32 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
89 @Override 96 @Override
90 public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg request) { 97 public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg request) {
91 try { 98 try {
92 - parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(parent::writeAndFlush); 99 + parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(
  100 + payload -> {
  101 + ChannelFuture channelFuture = parent.writeAndFlush(payload);
  102 + if (request.getPersisted()) {
  103 + channelFuture.addListener(future -> {
  104 + RpcStatus status;
  105 + Throwable t = future.cause();
  106 + if (t != null) {
  107 + log.error("Failed delivering RPC command to device!", t);
  108 + status = RpcStatus.FAILED;
  109 + } else if (request.getOneway()) {
  110 + status = RpcStatus.SUCCESSFUL;
  111 + } else {
  112 + status = RpcStatus.DELIVERED;
  113 + }
  114 + TransportProtos.ToDevicePersistedRpcResponseMsg msg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
  115 + .setRequestId(request.getRequestId())
  116 + .setRequestIdLSB(request.getRequestIdLSB())
  117 + .setRequestIdMSB(request.getRequestIdMSB())
  118 + .setStatus(status.name())
  119 + .build();
  120 + transportService.process(getSessionInfo(), msg, TransportServiceCallback.EMPTY);
  121 + });
  122 + }
  123 + }
  124 + );
93 } catch (Exception e) { 125 } catch (Exception e) {
94 log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); 126 log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
95 } 127 }
@@ -28,6 +28,7 @@ import com.google.gson.JsonSyntaxException; @@ -28,6 +28,7 @@ import com.google.gson.JsonSyntaxException;
28 import com.google.protobuf.InvalidProtocolBufferException; 28 import com.google.protobuf.InvalidProtocolBufferException;
29 import com.google.protobuf.ProtocolStringList; 29 import com.google.protobuf.ProtocolStringList;
30 import io.netty.buffer.ByteBuf; 30 import io.netty.buffer.ByteBuf;
  31 +import io.netty.channel.ChannelFuture;
31 import io.netty.channel.ChannelHandlerContext; 32 import io.netty.channel.ChannelHandlerContext;
32 import io.netty.handler.codec.mqtt.MqttMessage; 33 import io.netty.handler.codec.mqtt.MqttMessage;
33 import io.netty.handler.codec.mqtt.MqttPublishMessage; 34 import io.netty.handler.codec.mqtt.MqttPublishMessage;
@@ -188,8 +189,8 @@ public class GatewaySessionHandler { @@ -188,8 +189,8 @@ public class GatewaySessionHandler {
188 } 189 }
189 } 190 }
190 191
191 - void writeAndFlush(MqttMessage mqttMessage) {  
192 - channel.writeAndFlush(mqttMessage); 192 + ChannelFuture writeAndFlush(MqttMessage mqttMessage) {
  193 + return channel.writeAndFlush(mqttMessage);
193 } 194 }
194 195
195 int nextMsgId() { 196 int nextMsgId() {
@@ -251,7 +252,7 @@ public class GatewaySessionHandler { @@ -251,7 +252,7 @@ public class GatewaySessionHandler {
251 new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse>() { 252 new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse>() {
252 @Override 253 @Override
253 public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) { 254 public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
254 - GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap); 255 + GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
255 if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { 256 if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
256 log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); 257 log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
257 SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); 258 SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
@@ -26,7 +26,9 @@ import org.thingsboard.server.common.data.DeviceProfile; @@ -26,7 +26,9 @@ import org.thingsboard.server.common.data.DeviceProfile;
26 import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration; 26 import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration;
27 import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration; 27 import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration;
28 import org.thingsboard.server.common.data.id.DeviceId; 28 import org.thingsboard.server.common.data.id.DeviceId;
  29 +import org.thingsboard.server.common.data.rpc.RpcStatus;
29 import org.thingsboard.server.common.transport.SessionMsgListener; 30 import org.thingsboard.server.common.transport.SessionMsgListener;
  31 +import org.thingsboard.server.common.transport.TransportServiceCallback;
30 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; 32 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
31 import org.thingsboard.server.gen.transport.TransportProtos; 33 import org.thingsboard.server.gen.transport.TransportProtos;
32 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; 34 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
@@ -139,6 +141,21 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S @@ -139,6 +141,21 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
139 @Override 141 @Override
140 public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) { 142 public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) {
141 snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest); 143 snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest);
  144 + if (toDeviceRequest.getPersisted()) {
  145 + RpcStatus status;
  146 + if (toDeviceRequest.getOneway()) {
  147 + status = RpcStatus.SUCCESSFUL;
  148 + } else {
  149 + status = RpcStatus.DELIVERED;
  150 + }
  151 + TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
  152 + .setRequestId(toDeviceRequest.getRequestId())
  153 + .setRequestIdLSB(toDeviceRequest.getRequestIdLSB())
  154 + .setRequestIdMSB(toDeviceRequest.getRequestIdMSB())
  155 + .setStatus(status.name())
  156 + .build();
  157 + snmpTransportContext.getTransportService().process(getSessionInfo(), responseMsg, TransportServiceCallback.EMPTY);
  158 + }
142 } 159 }
143 160
144 @Override 161 @Override
@@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.DeviceTransportType; @@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.DeviceTransportType;
20 import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; 20 import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
21 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; 21 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
22 import org.thingsboard.server.common.transport.service.SessionMetaData; 22 import org.thingsboard.server.common.transport.service.SessionMetaData;
  23 +import org.thingsboard.server.gen.transport.TransportProtos;
23 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; 24 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
24 import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; 25 import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
25 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; 26 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
@@ -109,6 +110,8 @@ public interface TransportService { @@ -109,6 +110,8 @@ public interface TransportService {
109 110
110 void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback); 111 void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
111 112
  113 + void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDevicePersistedRpcResponseMsg msg, TransportServiceCallback<Void> callback);
  114 +
112 void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback); 115 void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);
113 116
114 void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback); 117 void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback);
@@ -557,6 +557,15 @@ public class DefaultTransportService implements TransportService { @@ -557,6 +557,15 @@ public class DefaultTransportService implements TransportService {
557 } 557 }
558 } 558 }
559 559
  560 + @Override
  561 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDevicePersistedRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
  562 + if (checkLimits(sessionInfo, msg, callback)) {
  563 + reportActivityInternal(sessionInfo);
  564 + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPersistedRpcResponseMsg(msg).build(),
  565 + new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback));
  566 + }
  567 + }
  568 +
560 private void processTimeout(String requestId) { 569 private void processTimeout(String requestId) {
561 RpcRequestMetadata data = toServerRpcPendingMap.remove(requestId); 570 RpcRequestMetadata data = toServerRpcPendingMap.remove(requestId);
562 if (data != null) { 571 if (data != null) {
@@ -497,6 +497,17 @@ public class ModelConstants { @@ -497,6 +497,17 @@ public class ModelConstants {
497 public static final String OTA_PACKAGE_ADDITIONAL_INFO_COLUMN = ADDITIONAL_INFO_PROPERTY; 497 public static final String OTA_PACKAGE_ADDITIONAL_INFO_COLUMN = ADDITIONAL_INFO_PROPERTY;
498 498
499 /** 499 /**
  500 + * Persisted RPC constants.
  501 + */
  502 + public static final String RPC_TABLE_NAME = "rpc";
  503 + public static final String RPC_TENANT_ID_COLUMN = TENANT_ID_COLUMN;
  504 + public static final String RPC_DEVICE_ID = "device_id";
  505 + public static final String RPC_EXPIRATION_TIME = "expiration_time";
  506 + public static final String RPC_REQUEST = "request";
  507 + public static final String RPC_RESPONSE = "response";
  508 + public static final String RPC_STATUS = "status";
  509 +
  510 + /**
500 * Edge constants. 511 * Edge constants.
501 */ 512 */
502 public static final String EDGE_COLUMN_FAMILY_NAME = "edge"; 513 public static final String EDGE_COLUMN_FAMILY_NAME = "edge";
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.model.sql;
  17 +
  18 +import com.fasterxml.jackson.databind.JsonNode;
  19 +import lombok.Data;
  20 +import lombok.EqualsAndHashCode;
  21 +import org.hibernate.annotations.Type;
  22 +import org.hibernate.annotations.TypeDef;
  23 +import org.thingsboard.server.common.data.id.DeviceId;
  24 +import org.thingsboard.server.common.data.id.RpcId;
  25 +import org.thingsboard.server.common.data.id.TenantId;
  26 +import org.thingsboard.server.common.data.rpc.Rpc;
  27 +import org.thingsboard.server.common.data.rpc.RpcStatus;
  28 +import org.thingsboard.server.dao.model.BaseEntity;
  29 +import org.thingsboard.server.dao.model.BaseSqlEntity;
  30 +import org.thingsboard.server.dao.util.mapping.JsonStringType;
  31 +
  32 +import javax.persistence.Column;
  33 +import javax.persistence.Entity;
  34 +import javax.persistence.EnumType;
  35 +import javax.persistence.Enumerated;
  36 +import javax.persistence.Table;
  37 +import java.util.UUID;
  38 +
  39 +import static org.thingsboard.server.dao.model.ModelConstants.RPC_DEVICE_ID;
  40 +import static org.thingsboard.server.dao.model.ModelConstants.RPC_EXPIRATION_TIME;
  41 +import static org.thingsboard.server.dao.model.ModelConstants.RPC_REQUEST;
  42 +import static org.thingsboard.server.dao.model.ModelConstants.RPC_RESPONSE;
  43 +import static org.thingsboard.server.dao.model.ModelConstants.RPC_STATUS;
  44 +import static org.thingsboard.server.dao.model.ModelConstants.RPC_TABLE_NAME;
  45 +import static org.thingsboard.server.dao.model.ModelConstants.RPC_TENANT_ID_COLUMN;
  46 +
  47 +@Data
  48 +@EqualsAndHashCode(callSuper = true)
  49 +@Entity
  50 +@TypeDef(name = "json", typeClass = JsonStringType.class)
  51 +@Table(name = RPC_TABLE_NAME)
  52 +public class RpcEntity extends BaseSqlEntity<Rpc> implements BaseEntity<Rpc> {
  53 +
  54 + @Column(name = RPC_TENANT_ID_COLUMN)
  55 + private UUID tenantId;
  56 +
  57 + @Column(name = RPC_DEVICE_ID)
  58 + private UUID deviceId;
  59 +
  60 + @Column(name = RPC_EXPIRATION_TIME)
  61 + private long expirationTime;
  62 +
  63 + @Type(type = "json")
  64 + @Column(name = RPC_REQUEST)
  65 + private JsonNode request;
  66 +
  67 + @Type(type = "json")
  68 + @Column(name = RPC_RESPONSE)
  69 + private JsonNode response;
  70 +
  71 + @Enumerated(EnumType.STRING)
  72 + @Column(name = RPC_STATUS)
  73 + private RpcStatus status;
  74 +
  75 + public RpcEntity() {
  76 + super();
  77 + }
  78 +
  79 + public RpcEntity(Rpc rpc) {
  80 + this.setUuid(rpc.getUuidId());
  81 + this.createdTime = rpc.getCreatedTime();
  82 + this.tenantId = rpc.getTenantId().getId();
  83 + this.deviceId = rpc.getDeviceId().getId();
  84 + this.expirationTime = rpc.getExpirationTime();
  85 + this.request = rpc.getRequest();
  86 + this.response = rpc.getResponse();
  87 + this.status = rpc.getStatus();
  88 + }
  89 +
  90 + @Override
  91 + public Rpc toData() {
  92 + Rpc rpc = new Rpc(new RpcId(id));
  93 + rpc.setCreatedTime(createdTime);
  94 + rpc.setTenantId(new TenantId(tenantId));
  95 + rpc.setDeviceId(new DeviceId(deviceId));
  96 + rpc.setExpirationTime(expirationTime);
  97 + rpc.setRequest(request);
  98 + rpc.setResponse(response);
  99 + rpc.setStatus(status);
  100 + return rpc;
  101 + }
  102 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.rpc;
  17 +
  18 +import com.google.common.util.concurrent.ListenableFuture;
  19 +import lombok.RequiredArgsConstructor;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +import org.springframework.stereotype.Service;
  22 +import org.thingsboard.server.common.data.id.DeviceId;
  23 +import org.thingsboard.server.common.data.id.RpcId;
  24 +import org.thingsboard.server.common.data.id.TenantId;
  25 +import org.thingsboard.server.common.data.page.PageData;
  26 +import org.thingsboard.server.common.data.page.PageLink;
  27 +import org.thingsboard.server.common.data.rpc.Rpc;
  28 +import org.thingsboard.server.common.data.rpc.RpcStatus;
  29 +
  30 +@Service
  31 +@Slf4j
  32 +@RequiredArgsConstructor
  33 +public class BaseRpcService implements RpcService {
  34 + private final RpcDao rpcDao;
  35 +
  36 + @Override
  37 + public Rpc save(TenantId tenantId, Rpc rpc) {
  38 + return rpcDao.save(tenantId, rpc);
  39 + }
  40 +
  41 + @Override
  42 + public void remove(TenantId tenantId, RpcId id) {
  43 + rpcDao.removeById(tenantId, id.getId());
  44 + }
  45 +
  46 + @Override
  47 + public Rpc findById(TenantId tenantId, RpcId id) {
  48 + return rpcDao.findById(tenantId, id.getId());
  49 + }
  50 +
  51 + @Override
  52 + public ListenableFuture<Rpc> findRpcByIdAsync(TenantId tenantId, RpcId id) {
  53 + return rpcDao.findByIdAsync(tenantId, id.getId());
  54 + }
  55 +
  56 + @Override
  57 + public PageData<Rpc> findAllByDeviceIdAndStatus(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) {
  58 + return rpcDao.findAllByDeviceId(deviceId, rpcStatus, pageLink);
  59 + }
  60 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.rpc;
  17 +
  18 +import org.thingsboard.server.common.data.id.DeviceId;
  19 +import org.thingsboard.server.common.data.page.PageData;
  20 +import org.thingsboard.server.common.data.page.PageLink;
  21 +import org.thingsboard.server.common.data.rpc.Rpc;
  22 +import org.thingsboard.server.common.data.rpc.RpcStatus;
  23 +import org.thingsboard.server.dao.Dao;
  24 +
  25 +public interface RpcDao extends Dao<Rpc> {
  26 + PageData<Rpc> findAllByDeviceId(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink);
  27 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.sql.rpc;
  17 +
  18 +import lombok.AllArgsConstructor;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.springframework.data.repository.CrudRepository;
  21 +import org.springframework.stereotype.Component;
  22 +import org.thingsboard.server.common.data.id.DeviceId;
  23 +import org.thingsboard.server.common.data.page.PageData;
  24 +import org.thingsboard.server.common.data.page.PageLink;
  25 +import org.thingsboard.server.common.data.rpc.Rpc;
  26 +import org.thingsboard.server.common.data.rpc.RpcStatus;
  27 +import org.thingsboard.server.dao.DaoUtil;
  28 +import org.thingsboard.server.dao.model.sql.RpcEntity;
  29 +import org.thingsboard.server.dao.rpc.RpcDao;
  30 +import org.thingsboard.server.dao.sql.JpaAbstractDao;
  31 +
  32 +import java.util.UUID;
  33 +
  34 +@Slf4j
  35 +@Component
  36 +@AllArgsConstructor
  37 +public class JpaRpcDao extends JpaAbstractDao<RpcEntity, Rpc> implements RpcDao {
  38 +
  39 + private final RpcRepository rpcRepository;
  40 +
  41 + @Override
  42 + protected Class<RpcEntity> getEntityClass() {
  43 + return RpcEntity.class;
  44 + }
  45 +
  46 + @Override
  47 + protected CrudRepository<RpcEntity, UUID> getCrudRepository() {
  48 + return rpcRepository;
  49 + }
  50 +
  51 +
  52 + @Override
  53 + public PageData<Rpc> findAllByDeviceId(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) {
  54 + return DaoUtil.toPageData(rpcRepository.findAllByDeviceIdAndStatus(deviceId.getId(), rpcStatus, DaoUtil.toPageable(pageLink)));
  55 + }
  56 +}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.sql.rpc;
  17 +
  18 +import org.springframework.data.domain.Page;
  19 +import org.springframework.data.domain.Pageable;
  20 +import org.springframework.data.repository.CrudRepository;
  21 +import org.thingsboard.server.common.data.rpc.RpcStatus;
  22 +import org.thingsboard.server.dao.model.sql.RpcEntity;
  23 +
  24 +import java.util.UUID;
  25 +
  26 +public interface RpcRepository extends CrudRepository<RpcEntity, UUID> {
  27 + Page<RpcEntity> findAllByDeviceIdAndStatus(UUID deviceId, RpcStatus status, Pageable pageable);
  28 +}
@@ -503,3 +503,14 @@ CREATE TABLE IF NOT EXISTS edge_event ( @@ -503,3 +503,14 @@ CREATE TABLE IF NOT EXISTS edge_event (
503 tenant_id uuid, 503 tenant_id uuid,
504 ts bigint NOT NULL 504 ts bigint NOT NULL
505 ); 505 );
  506 +
  507 +CREATE TABLE IF NOT EXISTS rpc (
  508 + id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY,
  509 + created_time bigint NOT NULL,
  510 + tenant_id uuid NOT NULL,
  511 + device_id uuid NOT NULL,
  512 + expiration_time bigint NOT NULL,
  513 + request varchar(10000000) NOT NULL,
  514 + response varchar(10000000),
  515 + status varchar(255) NOT NULL
  516 +);
@@ -541,6 +541,17 @@ CREATE TABLE IF NOT EXISTS edge_event ( @@ -541,6 +541,17 @@ CREATE TABLE IF NOT EXISTS edge_event (
541 ts bigint NOT NULL 541 ts bigint NOT NULL
542 ); 542 );
543 543
  544 +CREATE TABLE IF NOT EXISTS rpc (
  545 + id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY,
  546 + created_time bigint NOT NULL,
  547 + tenant_id uuid NOT NULL,
  548 + device_id uuid NOT NULL,
  549 + expiration_time bigint NOT NULL,
  550 + request varchar(10000000) NOT NULL,
  551 + response varchar(10000000),
  552 + status varchar(255) NOT NULL
  553 +);
  554 +
544 CREATE OR REPLACE PROCEDURE cleanup_events_by_ttl(IN ttl bigint, IN debug_ttl bigint, INOUT deleted bigint) 555 CREATE OR REPLACE PROCEDURE cleanup_events_by_ttl(IN ttl bigint, IN debug_ttl bigint, INOUT deleted bigint)
545 LANGUAGE plpgsql AS 556 LANGUAGE plpgsql AS
546 $$ 557 $$
@@ -32,4 +32,5 @@ DROP TABLE IF EXISTS resource; @@ -32,4 +32,5 @@ DROP TABLE IF EXISTS resource;
32 DROP TABLE IF EXISTS ota_package; 32 DROP TABLE IF EXISTS ota_package;
33 DROP TABLE IF EXISTS edge; 33 DROP TABLE IF EXISTS edge;
34 DROP TABLE IF EXISTS edge_event; 34 DROP TABLE IF EXISTS edge_event;
  35 +DROP TABLE IF EXISTS rpc;
35 DROP FUNCTION IF EXISTS to_uuid; 36 DROP FUNCTION IF EXISTS to_uuid;
@@ -33,3 +33,4 @@ DROP TABLE IF EXISTS resource; @@ -33,3 +33,4 @@ DROP TABLE IF EXISTS resource;
33 DROP TABLE IF EXISTS firmware; 33 DROP TABLE IF EXISTS firmware;
34 DROP TABLE IF EXISTS edge; 34 DROP TABLE IF EXISTS edge;
35 DROP TABLE IF EXISTS edge_event; 35 DROP TABLE IF EXISTS edge_event;
  36 +DROP TABLE IF EXISTS rpc;
@@ -35,6 +35,7 @@ public final class RuleEngineDeviceRpcRequest { @@ -35,6 +35,7 @@ public final class RuleEngineDeviceRpcRequest {
35 private final UUID requestUUID; 35 private final UUID requestUUID;
36 private final String originServiceId; 36 private final String originServiceId;
37 private final boolean oneway; 37 private final boolean oneway;
  38 + private final boolean persisted;
38 private final String method; 39 private final String method;
39 private final String body; 40 private final String body;
40 private final long expirationTime; 41 private final long expirationTime;
@@ -81,6 +81,9 @@ public class TbSendRPCRequestNode implements TbNode { @@ -81,6 +81,9 @@ public class TbSendRPCRequestNode implements TbNode {
81 tmp = msg.getMetaData().getValue("oneway"); 81 tmp = msg.getMetaData().getValue("oneway");
82 boolean oneway = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp); 82 boolean oneway = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp);
83 83
  84 + tmp = msg.getMetaData().getValue("persisted");
  85 + boolean persisted = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp);
  86 +
84 tmp = msg.getMetaData().getValue("requestUUID"); 87 tmp = msg.getMetaData().getValue("requestUUID");
85 UUID requestUUID = !StringUtils.isEmpty(tmp) ? UUID.fromString(tmp) : Uuids.timeBased(); 88 UUID requestUUID = !StringUtils.isEmpty(tmp) ? UUID.fromString(tmp) : Uuids.timeBased();
86 tmp = msg.getMetaData().getValue("originServiceId"); 89 tmp = msg.getMetaData().getValue("originServiceId");
@@ -108,6 +111,7 @@ public class TbSendRPCRequestNode implements TbNode { @@ -108,6 +111,7 @@ public class TbSendRPCRequestNode implements TbNode {
108 .originServiceId(originServiceId) 111 .originServiceId(originServiceId)
109 .expirationTime(expirationTime) 112 .expirationTime(expirationTime)
110 .restApiCall(restApiCall) 113 .restApiCall(restApiCall)
  114 + .persisted(persisted)
111 .build(); 115 .build();
112 116
113 ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { 117 ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> {