Commit ba507e99100db6ab503c6b08871a0e7c7712e898
Merge branch 'master' of https://github.com/thingsboard/thingsboard into feature/power-mode
Showing
57 changed files
with
1204 additions
and
89 deletions
... | ... | @@ -197,3 +197,17 @@ $$; |
197 | 197 | ALTER TABLE api_usage_state |
198 | 198 | ADD COLUMN IF NOT EXISTS alarm_exec VARCHAR(32); |
199 | 199 | UPDATE api_usage_state SET alarm_exec = 'ENABLED' WHERE alarm_exec IS NULL; |
200 | + | |
201 | +CREATE TABLE IF NOT EXISTS rpc ( | |
202 | + id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY, | |
203 | + created_time bigint NOT NULL, | |
204 | + tenant_id uuid NOT NULL, | |
205 | + device_id uuid NOT NULL, | |
206 | + expiration_time bigint NOT NULL, | |
207 | + request varchar(10000000) NOT NULL, | |
208 | + response varchar(10000000), | |
209 | + status varchar(255) NOT NULL | |
210 | +); | |
211 | + | |
212 | +CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id); | |
213 | + | ... | ... |
... | ... | @@ -65,6 +65,7 @@ import org.thingsboard.server.dao.relation.RelationService; |
65 | 65 | import org.thingsboard.server.dao.resource.ResourceService; |
66 | 66 | import org.thingsboard.server.dao.rule.RuleChainService; |
67 | 67 | import org.thingsboard.server.dao.rule.RuleNodeStateService; |
68 | +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; | |
68 | 69 | import org.thingsboard.server.dao.tenant.TenantProfileService; |
69 | 70 | import org.thingsboard.server.dao.tenant.TenantService; |
70 | 71 | import org.thingsboard.server.dao.timeseries.TimeseriesService; |
... | ... | @@ -80,9 +81,9 @@ import org.thingsboard.server.service.executors.ExternalCallExecutorService; |
80 | 81 | import org.thingsboard.server.service.executors.SharedEventLoopGroupService; |
81 | 82 | import org.thingsboard.server.service.mail.MailExecutorService; |
82 | 83 | import org.thingsboard.server.service.profile.TbDeviceProfileCache; |
83 | -import org.thingsboard.server.dao.tenant.TbTenantProfileCache; | |
84 | 84 | import org.thingsboard.server.service.queue.TbClusterService; |
85 | 85 | import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; |
86 | +import org.thingsboard.server.service.rpc.TbRpcService; | |
86 | 87 | import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; |
87 | 88 | import org.thingsboard.server.service.script.JsExecutorService; |
88 | 89 | import org.thingsboard.server.service.script.JsInvokeService; |
... | ... | @@ -303,23 +304,33 @@ public class ActorSystemContext { |
303 | 304 | |
304 | 305 | @Lazy |
305 | 306 | @Autowired(required = false) |
306 | - @Getter private EdgeService edgeService; | |
307 | + @Getter | |
308 | + private EdgeService edgeService; | |
307 | 309 | |
308 | 310 | @Lazy |
309 | 311 | @Autowired(required = false) |
310 | - @Getter private EdgeEventService edgeEventService; | |
312 | + @Getter | |
313 | + private EdgeEventService edgeEventService; | |
311 | 314 | |
312 | 315 | @Lazy |
313 | 316 | @Autowired(required = false) |
314 | - @Getter private EdgeRpcService edgeRpcService; | |
317 | + @Getter | |
318 | + private EdgeRpcService edgeRpcService; | |
315 | 319 | |
316 | 320 | @Lazy |
317 | 321 | @Autowired(required = false) |
318 | - @Getter private ResourceService resourceService; | |
322 | + @Getter | |
323 | + private ResourceService resourceService; | |
319 | 324 | |
320 | 325 | @Lazy |
321 | 326 | @Autowired(required = false) |
322 | - @Getter private OtaPackageService otaPackageService; | |
327 | + @Getter | |
328 | + private OtaPackageService otaPackageService; | |
329 | + | |
330 | + @Lazy | |
331 | + @Autowired(required = false) | |
332 | + @Getter | |
333 | + private TbRpcService tbRpcService; | |
323 | 334 | |
324 | 335 | @Value("${actors.session.max_concurrent_sessions_per_device:1}") |
325 | 336 | @Getter | ... | ... |
... | ... | @@ -46,7 +46,7 @@ public class DeviceActor extends ContextAwareActor { |
46 | 46 | super.init(ctx); |
47 | 47 | log.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId); |
48 | 48 | try { |
49 | - processor.initSessionTimeout(ctx); | |
49 | + processor.init(ctx); | |
50 | 50 | log.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId); |
51 | 51 | } catch (Exception e) { |
52 | 52 | log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.deviceId, e); | ... | ... |
... | ... | @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.MoreExecutors; |
23 | 23 | import com.google.protobuf.InvalidProtocolBufferException; |
24 | 24 | import lombok.extern.slf4j.Slf4j; |
25 | 25 | import org.apache.commons.collections.CollectionUtils; |
26 | +import org.thingsboard.common.util.JacksonUtil; | |
26 | 27 | import org.thingsboard.rule.engine.api.RpcError; |
27 | 28 | import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; |
28 | 29 | import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg; |
... | ... | @@ -38,12 +39,17 @@ import org.thingsboard.server.common.data.edge.EdgeEventActionType; |
38 | 39 | import org.thingsboard.server.common.data.edge.EdgeEventType; |
39 | 40 | import org.thingsboard.server.common.data.id.DeviceId; |
40 | 41 | import org.thingsboard.server.common.data.id.EdgeId; |
42 | +import org.thingsboard.server.common.data.id.RpcId; | |
41 | 43 | import org.thingsboard.server.common.data.id.TenantId; |
42 | 44 | import org.thingsboard.server.common.data.kv.AttributeKey; |
43 | 45 | import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
44 | 46 | import org.thingsboard.server.common.data.kv.KvEntry; |
47 | +import org.thingsboard.server.common.data.page.PageData; | |
48 | +import org.thingsboard.server.common.data.page.PageLink; | |
45 | 49 | import org.thingsboard.server.common.data.relation.EntityRelation; |
46 | 50 | import org.thingsboard.server.common.data.relation.RelationTypeGroup; |
51 | +import org.thingsboard.server.common.data.rpc.Rpc; | |
52 | +import org.thingsboard.server.common.data.rpc.RpcStatus; | |
47 | 53 | import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; |
48 | 54 | import org.thingsboard.server.common.data.security.DeviceCredentials; |
49 | 55 | import org.thingsboard.server.common.data.security.DeviceCredentialsType; |
... | ... | @@ -52,8 +58,8 @@ import org.thingsboard.server.common.msg.TbMsgMetaData; |
52 | 58 | import org.thingsboard.server.common.msg.queue.TbCallback; |
53 | 59 | import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; |
54 | 60 | import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; |
55 | -import org.thingsboard.server.gen.transport.TransportProtos; | |
56 | 61 | import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; |
62 | +import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; | |
57 | 63 | import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry; |
58 | 64 | import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; |
59 | 65 | import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; |
... | ... | @@ -68,10 +74,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType; |
68 | 74 | import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; |
69 | 75 | import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; |
70 | 76 | import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto; |
77 | +import org.thingsboard.server.gen.transport.TransportProtos.ToDevicePersistedRpcResponseMsg; | |
71 | 78 | import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg; |
72 | 79 | import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; |
73 | 80 | import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg; |
74 | 81 | import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; |
82 | +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto; | |
75 | 83 | import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; |
76 | 84 | import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; |
77 | 85 | import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; |
... | ... | @@ -162,20 +170,19 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
162 | 170 | |
163 | 171 | void processRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg) { |
164 | 172 | ToDeviceRpcRequest request = msg.getMsg(); |
165 | - ToDeviceRpcRequestBody body = request.getBody(); | |
166 | - ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder() | |
167 | - .setRequestId(rpcSeq++) | |
168 | - .setMethodName(body.getMethod()) | |
169 | - .setParams(body.getParams()) | |
170 | - .setExpirationTime(request.getExpirationTime()) | |
171 | - .setRequestIdMSB(request.getId().getMostSignificantBits()) | |
172 | - .setRequestIdLSB(request.getId().getLeastSignificantBits()) | |
173 | - .build(); | |
173 | + ToDeviceRpcRequestMsg rpcRequest = creteToDeviceRpcRequestMsg(request); | |
174 | 174 | |
175 | 175 | long timeout = request.getExpirationTime() - System.currentTimeMillis(); |
176 | + boolean persisted = request.isPersisted(); | |
177 | + | |
176 | 178 | if (timeout <= 0) { |
177 | 179 | log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime()); |
180 | + if (persisted) { | |
181 | + createRpc(request, RpcStatus.TIMEOUT); | |
182 | + } | |
178 | 183 | return; |
184 | + } else if (persisted) { | |
185 | + createRpc(request, RpcStatus.QUEUED); | |
179 | 186 | } |
180 | 187 | |
181 | 188 | boolean sent; |
... | ... | @@ -192,10 +199,16 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
192 | 199 | syncSessionSet.add(key); |
193 | 200 | } |
194 | 201 | }); |
195 | - log.trace("46) Rpc syncSessionSet [{}] subscription after sent [{}]",syncSessionSet, rpcSubscriptions); | |
202 | + log.trace("46) Rpc syncSessionSet [{}] subscription after sent [{}]", syncSessionSet, rpcSubscriptions); | |
196 | 203 | syncSessionSet.forEach(rpcSubscriptions::remove); |
197 | 204 | } |
198 | 205 | |
206 | + if (persisted && !(sent || request.isOneway())) { | |
207 | + ObjectNode response = JacksonUtil.newObjectNode(); | |
208 | + response.put("rpcId", request.getId().toString()); | |
209 | + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), JacksonUtil.toString(response), null)); | |
210 | + } | |
211 | + | |
199 | 212 | if (request.isOneway() && sent) { |
200 | 213 | log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId()); |
201 | 214 | systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null)); |
... | ... | @@ -209,6 +222,32 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
209 | 222 | } |
210 | 223 | } |
211 | 224 | |
225 | + private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) { | |
226 | + Rpc rpc = new Rpc(new RpcId(request.getId())); | |
227 | + rpc.setCreatedTime(System.currentTimeMillis()); | |
228 | + rpc.setTenantId(tenantId); | |
229 | + rpc.setDeviceId(deviceId); | |
230 | + rpc.setExpirationTime(request.getExpirationTime()); | |
231 | + rpc.setRequest(JacksonUtil.valueToTree(request)); | |
232 | + rpc.setStatus(status); | |
233 | + systemContext.getTbRpcService().save(tenantId, rpc); | |
234 | + return systemContext.getTbRpcService().save(tenantId, rpc); | |
235 | + } | |
236 | + | |
237 | + private ToDeviceRpcRequestMsg creteToDeviceRpcRequestMsg(ToDeviceRpcRequest request) { | |
238 | + ToDeviceRpcRequestBody body = request.getBody(); | |
239 | + return ToDeviceRpcRequestMsg.newBuilder() | |
240 | + .setRequestId(rpcSeq++) | |
241 | + .setMethodName(body.getMethod()) | |
242 | + .setParams(body.getParams()) | |
243 | + .setExpirationTime(request.getExpirationTime()) | |
244 | + .setRequestIdMSB(request.getId().getMostSignificantBits()) | |
245 | + .setRequestIdLSB(request.getId().getLeastSignificantBits()) | |
246 | + .setOneway(request.isOneway()) | |
247 | + .setPersisted(request.isPersisted()) | |
248 | + .build(); | |
249 | + } | |
250 | + | |
212 | 251 | void processRpcResponsesFromEdge(TbActorCtx context, FromDeviceRpcResponseActorMsg responseMsg) { |
213 | 252 | log.debug("[{}] Processing rpc command response from edge session", deviceId); |
214 | 253 | ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); |
... | ... | @@ -230,6 +269,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
230 | 269 | ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId()); |
231 | 270 | if (requestMd != null) { |
232 | 271 | log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId()); |
272 | + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.TIMEOUT, null); | |
233 | 273 | systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), |
234 | 274 | null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); |
235 | 275 | } |
... | ... | @@ -271,7 +311,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
271 | 311 | .setExpirationTime(request.getExpirationTime()) |
272 | 312 | .setRequestIdMSB(request.getId().getMostSignificantBits()) |
273 | 313 | .setRequestIdLSB(request.getId().getLeastSignificantBits()) |
314 | + .setOneway(request.isOneway()) | |
315 | + .setPersisted(request.isPersisted()) | |
274 | 316 | .build(); |
317 | + | |
275 | 318 | sendToTransport(rpcRequest, sessionId, nodeId); |
276 | 319 | }; |
277 | 320 | } |
... | ... | @@ -279,31 +322,39 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
279 | 322 | void process(TbActorCtx context, TransportToDeviceActorMsgWrapper wrapper) { |
280 | 323 | TransportToDeviceActorMsg msg = wrapper.getMsg(); |
281 | 324 | TbCallback callback = wrapper.getCallback(); |
325 | + var sessionInfo = msg.getSessionInfo(); | |
326 | + | |
282 | 327 | if (msg.hasSessionEvent()) { |
283 | - processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent()); | |
328 | + processSessionStateMsgs(sessionInfo, msg.getSessionEvent()); | |
284 | 329 | } |
285 | 330 | if (msg.hasSubscribeToAttributes()) { |
286 | - processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToAttributes()); | |
331 | + processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToAttributes()); | |
287 | 332 | } |
288 | 333 | if (msg.hasSubscribeToRPC()) { |
289 | - processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToRPC()); | |
334 | + processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToRPC()); | |
335 | + } | |
336 | + if (msg.hasSendPendingRPC()) { | |
337 | + sendPendingRequests(context, getSessionId(sessionInfo), sessionInfo); | |
290 | 338 | } |
291 | 339 | if (msg.hasGetAttributes()) { |
292 | - handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes()); | |
340 | + handleGetAttributesRequest(context, sessionInfo, msg.getGetAttributes()); | |
293 | 341 | } |
294 | 342 | if (msg.hasToDeviceRPCCallResponse()) { |
295 | - processRpcResponses(context, msg.getSessionInfo(), msg.getToDeviceRPCCallResponse()); | |
343 | + processRpcResponses(context, sessionInfo, msg.getToDeviceRPCCallResponse()); | |
296 | 344 | } |
297 | 345 | if (msg.hasSubscriptionInfo()) { |
298 | - handleSessionActivity(context, msg.getSessionInfo(), msg.getSubscriptionInfo()); | |
346 | + handleSessionActivity(context, sessionInfo, msg.getSubscriptionInfo()); | |
299 | 347 | } |
300 | 348 | if (msg.hasClaimDevice()) { |
301 | - handleClaimDeviceMsg(context, msg.getSessionInfo(), msg.getClaimDevice()); | |
349 | + handleClaimDeviceMsg(context, sessionInfo, msg.getClaimDevice()); | |
350 | + } | |
351 | + if (msg.hasPersistedRpcResponseMsg()) { | |
352 | + processPersistedRpcResponses(context, sessionInfo, msg.getPersistedRpcResponseMsg()); | |
302 | 353 | } |
303 | 354 | callback.onSuccess(); |
304 | 355 | } |
305 | 356 | |
306 | - private void handleClaimDeviceMsg(TbActorCtx context, SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg) { | |
357 | + private void handleClaimDeviceMsg(TbActorCtx context, SessionInfoProto sessionInfo, ClaimDeviceMsg msg) { | |
307 | 358 | DeviceId deviceId = new DeviceId(new UUID(msg.getDeviceIdMSB(), msg.getDeviceIdLSB())); |
308 | 359 | systemContext.getClaimDevicesService().registerClaimingInfo(tenantId, deviceId, msg.getSecretKey(), msg.getDurationMs()); |
309 | 360 | } |
... | ... | @@ -442,11 +493,22 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
442 | 493 | if (success) { |
443 | 494 | systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), |
444 | 495 | responseMsg.getPayload(), null)); |
496 | + if (requestMd.getMsg().getMsg().isPersisted()) { | |
497 | + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.SUCCESSFUL, JacksonUtil.toJsonNode(responseMsg.getPayload())); | |
498 | + } | |
445 | 499 | } else { |
446 | 500 | log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId()); |
501 | + if (requestMd.getMsg().getMsg().isPersisted()) { | |
502 | + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.FAILED, JacksonUtil.toJsonNode(responseMsg.getPayload())); | |
503 | + } | |
447 | 504 | } |
448 | 505 | } |
449 | 506 | |
507 | + private void processPersistedRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDevicePersistedRpcResponseMsg responseMsg) { | |
508 | + UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB()); | |
509 | + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), RpcStatus.valueOf(responseMsg.getStatus()), null); | |
510 | + } | |
511 | + | |
450 | 512 | private void processSubscriptionCommands(TbActorCtx context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) { |
451 | 513 | UUID sessionId = getSessionId(sessionInfo); |
452 | 514 | if (subscribeCmd.getUnsubscribe()) { |
... | ... | @@ -565,7 +627,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
565 | 627 | |
566 | 628 | void notifyTransportAboutProfileUpdate(UUID sessionId, SessionInfoMetaData sessionMd, DeviceCredentials deviceCredentials) { |
567 | 629 | log.info("2) LwM2Mtype: "); |
568 | - TransportProtos.ToTransportUpdateCredentialsProto.Builder notification = TransportProtos.ToTransportUpdateCredentialsProto.newBuilder(); | |
630 | + ToTransportUpdateCredentialsProto.Builder notification = ToTransportUpdateCredentialsProto.newBuilder(); | |
569 | 631 | notification.addCredentialsId(deviceCredentials.getCredentialsId()); |
570 | 632 | notification.addCredentialsValue(deviceCredentials.getCredentialsValue()); |
571 | 633 | ToTransportMsg msg = ToTransportMsg.newBuilder() |
... | ... | @@ -640,7 +702,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
640 | 702 | ListenableFuture<EdgeEvent> future = systemContext.getEdgeEventService().saveAsync(edgeEvent); |
641 | 703 | Futures.addCallback(future, new FutureCallback<EdgeEvent>() { |
642 | 704 | @Override |
643 | - public void onSuccess( EdgeEvent result) { | |
705 | + public void onSuccess(EdgeEvent result) { | |
644 | 706 | systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId); |
645 | 707 | } |
646 | 708 | |
... | ... | @@ -756,8 +818,26 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
756 | 818 | .addAllSessions(sessionsList).build().toByteArray()); |
757 | 819 | } |
758 | 820 | |
759 | - void initSessionTimeout(TbActorCtx ctx) { | |
821 | + void init(TbActorCtx ctx) { | |
760 | 822 | schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout()); |
823 | + PageLink pageLink = new PageLink(1024); | |
824 | + PageData<Rpc> pageData; | |
825 | + do { | |
826 | + pageData = systemContext.getTbRpcService().findAllByDeviceIdAndStatus(tenantId, deviceId, RpcStatus.QUEUED, pageLink); | |
827 | + pageData.getData().forEach(rpc -> { | |
828 | + ToDeviceRpcRequest msg = JacksonUtil.convertValue(rpc.getRequest(), ToDeviceRpcRequest.class); | |
829 | + long timeout = rpc.getExpirationTime() - System.currentTimeMillis(); | |
830 | + if (timeout <= 0) { | |
831 | + rpc.setStatus(RpcStatus.TIMEOUT); | |
832 | + systemContext.getTbRpcService().save(tenantId, rpc); | |
833 | + } else { | |
834 | + registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, creteToDeviceRpcRequestMsg(msg), timeout); | |
835 | + } | |
836 | + }); | |
837 | + if (pageData.hasNext()) { | |
838 | + pageLink = pageLink.nextPageLink(); | |
839 | + } | |
840 | + } while (pageData.hasNext()); | |
761 | 841 | } |
762 | 842 | |
763 | 843 | void checkSessionsTimeout() { | ... | ... |
... | ... | @@ -69,6 +69,7 @@ import org.thingsboard.server.common.data.id.EntityId; |
69 | 69 | import org.thingsboard.server.common.data.id.EntityIdFactory; |
70 | 70 | import org.thingsboard.server.common.data.id.EntityViewId; |
71 | 71 | import org.thingsboard.server.common.data.id.OtaPackageId; |
72 | +import org.thingsboard.server.common.data.id.RpcId; | |
72 | 73 | import org.thingsboard.server.common.data.id.TbResourceId; |
73 | 74 | import org.thingsboard.server.common.data.id.RuleChainId; |
74 | 75 | import org.thingsboard.server.common.data.id.RuleNodeId; |
... | ... | @@ -83,6 +84,7 @@ import org.thingsboard.server.common.data.page.TimePageLink; |
83 | 84 | import org.thingsboard.server.common.data.plugin.ComponentDescriptor; |
84 | 85 | import org.thingsboard.server.common.data.plugin.ComponentType; |
85 | 86 | import org.thingsboard.server.common.data.relation.EntityRelation; |
87 | +import org.thingsboard.server.common.data.rpc.Rpc; | |
86 | 88 | import org.thingsboard.server.common.data.rule.RuleChain; |
87 | 89 | import org.thingsboard.server.common.data.rule.RuleChainType; |
88 | 90 | import org.thingsboard.server.common.data.rule.RuleNode; |
... | ... | @@ -106,6 +108,7 @@ import org.thingsboard.server.dao.model.ModelConstants; |
106 | 108 | import org.thingsboard.server.dao.oauth2.OAuth2ConfigTemplateService; |
107 | 109 | import org.thingsboard.server.dao.oauth2.OAuth2Service; |
108 | 110 | import org.thingsboard.server.dao.relation.RelationService; |
111 | +import org.thingsboard.server.dao.rpc.RpcService; | |
109 | 112 | import org.thingsboard.server.dao.rule.RuleChainService; |
110 | 113 | import org.thingsboard.server.dao.tenant.TbTenantProfileCache; |
111 | 114 | import org.thingsboard.server.dao.tenant.TenantProfileService; |
... | ... | @@ -246,6 +249,9 @@ public abstract class BaseController { |
246 | 249 | protected OtaPackageStateService otaPackageStateService; |
247 | 250 | |
248 | 251 | @Autowired |
252 | + protected RpcService rpcService; | |
253 | + | |
254 | + @Autowired | |
249 | 255 | protected TbQueueProducerProvider producerProvider; |
250 | 256 | |
251 | 257 | @Autowired |
... | ... | @@ -786,6 +792,18 @@ public abstract class BaseController { |
786 | 792 | } |
787 | 793 | } |
788 | 794 | |
795 | + Rpc checkRpcId(RpcId rpcId, Operation operation) throws ThingsboardException { | |
796 | + try { | |
797 | + validateId(rpcId, "Incorrect rpcId " + rpcId); | |
798 | + Rpc rpc = rpcService.findById(getCurrentUser().getTenantId(), rpcId); | |
799 | + checkNotNull(rpc); | |
800 | + accessControlService.checkPermission(getCurrentUser(), Resource.RPC, operation, rpcId, rpc); | |
801 | + return rpc; | |
802 | + } catch (Exception e) { | |
803 | + throw handleException(e, false); | |
804 | + } | |
805 | + } | |
806 | + | |
789 | 807 | @SuppressWarnings("unchecked") |
790 | 808 | protected <I extends EntityId> I emptyId(EntityType entityType) { |
791 | 809 | return (I) EntityIdFactory.getByTypeAndUuid(entityType, ModelConstants.NULL_UUID); | ... | ... |
... | ... | @@ -29,6 +29,7 @@ import org.springframework.web.bind.annotation.PathVariable; |
29 | 29 | import org.springframework.web.bind.annotation.RequestBody; |
30 | 30 | import org.springframework.web.bind.annotation.RequestMapping; |
31 | 31 | import org.springframework.web.bind.annotation.RequestMethod; |
32 | +import org.springframework.web.bind.annotation.RequestParam; | |
32 | 33 | import org.springframework.web.bind.annotation.ResponseBody; |
33 | 34 | import org.springframework.web.bind.annotation.RestController; |
34 | 35 | import org.springframework.web.context.request.async.DeferredResult; |
... | ... | @@ -38,8 +39,13 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; |
38 | 39 | import org.thingsboard.server.common.data.exception.ThingsboardException; |
39 | 40 | import org.thingsboard.server.common.data.id.DeviceId; |
40 | 41 | import org.thingsboard.server.common.data.id.EntityId; |
42 | +import org.thingsboard.server.common.data.id.RpcId; | |
41 | 43 | import org.thingsboard.server.common.data.id.TenantId; |
42 | 44 | import org.thingsboard.server.common.data.id.UUIDBased; |
45 | +import org.thingsboard.server.common.data.page.PageData; | |
46 | +import org.thingsboard.server.common.data.page.PageLink; | |
47 | +import org.thingsboard.server.common.data.rpc.Rpc; | |
48 | +import org.thingsboard.server.common.data.rpc.RpcStatus; | |
43 | 49 | import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; |
44 | 50 | import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; |
45 | 51 | import org.thingsboard.server.queue.util.TbCoreComponent; |
... | ... | @@ -93,6 +99,52 @@ public class RpcController extends BaseController { |
93 | 99 | return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody); |
94 | 100 | } |
95 | 101 | |
102 | + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") | |
103 | + @RequestMapping(value = "/persisted/{rpcId}", method = RequestMethod.GET) | |
104 | + @ResponseBody | |
105 | + public Rpc getPersistedRpc(@PathVariable("rpcId") String strRpc) throws ThingsboardException { | |
106 | + checkParameter("RpcId", strRpc); | |
107 | + try { | |
108 | + RpcId rpcId = new RpcId(UUID.fromString(strRpc)); | |
109 | + return checkRpcId(rpcId, Operation.READ); | |
110 | + } catch (Exception e) { | |
111 | + throw handleException(e); | |
112 | + } | |
113 | + } | |
114 | + | |
115 | + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") | |
116 | + @RequestMapping(value = "/persisted/{deviceId}", method = RequestMethod.GET) | |
117 | + @ResponseBody | |
118 | + public PageData<Rpc> getPersistedRpcByDevice(@PathVariable("deviceId") String strDeviceId, | |
119 | + @RequestParam int pageSize, | |
120 | + @RequestParam int page, | |
121 | + @RequestParam RpcStatus rpcStatus, | |
122 | + @RequestParam(required = false) String textSearch, | |
123 | + @RequestParam(required = false) String sortProperty, | |
124 | + @RequestParam(required = false) String sortOrder) throws ThingsboardException { | |
125 | + checkParameter("DeviceId", strDeviceId); | |
126 | + try { | |
127 | + TenantId tenantId = getCurrentUser().getTenantId(); | |
128 | + PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder); | |
129 | + DeviceId deviceId = new DeviceId(UUID.fromString(strDeviceId)); | |
130 | + return checkNotNull(rpcService.findAllByDeviceIdAndStatus(tenantId, deviceId, rpcStatus, pageLink)); | |
131 | + } catch (Exception e) { | |
132 | + throw handleException(e); | |
133 | + } | |
134 | + } | |
135 | + | |
136 | + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") | |
137 | + @RequestMapping(value = "/persisted/{rpcId}", method = RequestMethod.DELETE) | |
138 | + @ResponseBody | |
139 | + public void deleteResource(@PathVariable("rpcId") String strRpc) throws ThingsboardException { | |
140 | + checkParameter("RpcId", strRpc); | |
141 | + try { | |
142 | + rpcService.deleteRpc(getTenantId(), new RpcId(UUID.fromString(strRpc))); | |
143 | + } catch (Exception e) { | |
144 | + throw handleException(e); | |
145 | + } | |
146 | + } | |
147 | + | |
96 | 148 | private DeferredResult<ResponseEntity> handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody) throws ThingsboardException { |
97 | 149 | try { |
98 | 150 | JsonNode rpcRequestBody = jsonMapper.readTree(requestBody); |
... | ... | @@ -103,6 +155,7 @@ public class RpcController extends BaseController { |
103 | 155 | long timeout = rpcRequestBody.has("timeout") ? rpcRequestBody.get("timeout").asLong() : defaultTimeout; |
104 | 156 | long expTime = System.currentTimeMillis() + Math.max(minTimeout, timeout); |
105 | 157 | UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID(); |
158 | + boolean persisted = rpcRequestBody.has("persisted") && rpcRequestBody.get("persisted").asBoolean(); | |
106 | 159 | accessValidator.validate(currentUser, Operation.RPC_CALL, deviceId, new HttpValidationCallback(response, new FutureCallback<DeferredResult<ResponseEntity>>() { |
107 | 160 | @Override |
108 | 161 | public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) { |
... | ... | @@ -111,7 +164,8 @@ public class RpcController extends BaseController { |
111 | 164 | deviceId, |
112 | 165 | oneWay, |
113 | 166 | expTime, |
114 | - body | |
167 | + body, | |
168 | + persisted | |
115 | 169 | ); |
116 | 170 | deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse), currentUser); |
117 | 171 | } | ... | ... |
... | ... | @@ -157,6 +157,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { |
157 | 157 | metaData.putValue("originServiceId", serviceId); |
158 | 158 | metaData.putValue("expirationTime", Long.toString(msg.getExpirationTime())); |
159 | 159 | metaData.putValue("oneway", Boolean.toString(msg.isOneway())); |
160 | + metaData.putValue("persisted", Boolean.toString(msg.isPersisted())); | |
160 | 161 | |
161 | 162 | Device device = deviceService.findDeviceById(msg.getTenantId(), msg.getDeviceId()); |
162 | 163 | if (device != null) { | ... | ... |
... | ... | @@ -100,7 +100,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi |
100 | 100 | @Override |
101 | 101 | public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) { |
102 | 102 | ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(), |
103 | - src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody())); | |
103 | + src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted()); | |
104 | 104 | forwardRpcRequestToDeviceActor(request, response -> { |
105 | 105 | if (src.isRestApiCall()) { |
106 | 106 | sendRpcResponseToTbCore(src.getOriginServiceId(), response); | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.service.rpc; | |
17 | + | |
18 | +import com.fasterxml.jackson.databind.JsonNode; | |
19 | +import lombok.RequiredArgsConstructor; | |
20 | +import lombok.extern.slf4j.Slf4j; | |
21 | +import org.springframework.stereotype.Service; | |
22 | +import org.thingsboard.common.util.JacksonUtil; | |
23 | +import org.thingsboard.server.common.data.id.DeviceId; | |
24 | +import org.thingsboard.server.common.data.id.RpcId; | |
25 | +import org.thingsboard.server.common.data.id.TenantId; | |
26 | +import org.thingsboard.server.common.data.page.PageData; | |
27 | +import org.thingsboard.server.common.data.page.PageLink; | |
28 | +import org.thingsboard.server.common.data.rpc.Rpc; | |
29 | +import org.thingsboard.server.common.data.rpc.RpcStatus; | |
30 | +import org.thingsboard.server.common.msg.TbMsg; | |
31 | +import org.thingsboard.server.common.msg.TbMsgMetaData; | |
32 | +import org.thingsboard.server.dao.rpc.RpcService; | |
33 | +import org.thingsboard.server.queue.util.TbCoreComponent; | |
34 | +import org.thingsboard.server.service.queue.TbClusterService; | |
35 | + | |
36 | +@TbCoreComponent | |
37 | +@Service | |
38 | +@RequiredArgsConstructor | |
39 | +@Slf4j | |
40 | +public class TbRpcService { | |
41 | + private final RpcService rpcService; | |
42 | + private final TbClusterService tbClusterService; | |
43 | + | |
44 | + public Rpc save(TenantId tenantId, Rpc rpc) { | |
45 | + Rpc saved = rpcService.save(rpc); | |
46 | + pushRpcMsgToRuleEngine(tenantId, saved); | |
47 | + return saved; | |
48 | + } | |
49 | + | |
50 | + public void save(TenantId tenantId, RpcId rpcId, RpcStatus newStatus, JsonNode response) { | |
51 | + Rpc foundRpc = rpcService.findById(tenantId, rpcId); | |
52 | + if (foundRpc != null) { | |
53 | + foundRpc.setStatus(newStatus); | |
54 | + if (response != null) { | |
55 | + foundRpc.setResponse(response); | |
56 | + } | |
57 | + Rpc saved = rpcService.save(foundRpc); | |
58 | + pushRpcMsgToRuleEngine(tenantId, saved); | |
59 | + } else { | |
60 | + log.warn("[{}] Failed to update RPC status because RPC was already deleted", rpcId); | |
61 | + } | |
62 | + } | |
63 | + | |
64 | + private void pushRpcMsgToRuleEngine(TenantId tenantId, Rpc rpc) { | |
65 | + TbMsg msg = TbMsg.newMsg("RPC_" + rpc.getStatus().name(), rpc.getDeviceId(), TbMsgMetaData.EMPTY, JacksonUtil.toString(rpc)); | |
66 | + tbClusterService.pushMsgToRuleEngine(tenantId, rpc.getId(), msg, null); | |
67 | + } | |
68 | + | |
69 | + public Rpc findRpcById(TenantId tenantId, RpcId rpcId) { | |
70 | + return rpcService.findById(tenantId, rpcId); | |
71 | + } | |
72 | + | |
73 | + public PageData<Rpc> findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) { | |
74 | + return rpcService.findAllByDeviceIdAndStatus(tenantId, deviceId, rpcStatus, pageLink); | |
75 | + } | |
76 | + | |
77 | +} | ... | ... |
... | ... | @@ -47,11 +47,13 @@ import org.thingsboard.server.common.data.id.EntityId; |
47 | 47 | import org.thingsboard.server.common.data.id.EntityIdFactory; |
48 | 48 | import org.thingsboard.server.common.data.id.EntityViewId; |
49 | 49 | import org.thingsboard.server.common.data.id.OtaPackageId; |
50 | +import org.thingsboard.server.common.data.id.RpcId; | |
50 | 51 | import org.thingsboard.server.common.data.id.RuleChainId; |
51 | 52 | import org.thingsboard.server.common.data.id.RuleNodeId; |
52 | 53 | import org.thingsboard.server.common.data.id.TbResourceId; |
53 | 54 | import org.thingsboard.server.common.data.id.TenantId; |
54 | 55 | import org.thingsboard.server.common.data.id.UserId; |
56 | +import org.thingsboard.server.common.data.rpc.Rpc; | |
55 | 57 | import org.thingsboard.server.common.data.rule.RuleChain; |
56 | 58 | import org.thingsboard.server.common.data.rule.RuleNode; |
57 | 59 | import org.thingsboard.server.controller.HttpValidationCallback; |
... | ... | @@ -65,6 +67,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService; |
65 | 67 | import org.thingsboard.server.dao.exception.IncorrectParameterException; |
66 | 68 | import org.thingsboard.server.dao.ota.OtaPackageService; |
67 | 69 | import org.thingsboard.server.dao.resource.ResourceService; |
70 | +import org.thingsboard.server.dao.rpc.RpcService; | |
68 | 71 | import org.thingsboard.server.dao.rule.RuleChainService; |
69 | 72 | import org.thingsboard.server.dao.tenant.TenantService; |
70 | 73 | import org.thingsboard.server.dao.usagerecord.ApiUsageStateService; |
... | ... | @@ -137,6 +140,9 @@ public class AccessValidator { |
137 | 140 | @Autowired |
138 | 141 | protected OtaPackageService otaPackageService; |
139 | 142 | |
143 | + @Autowired | |
144 | + protected RpcService rpcService; | |
145 | + | |
140 | 146 | private ExecutorService executor; |
141 | 147 | |
142 | 148 | @PostConstruct |
... | ... | @@ -235,6 +241,9 @@ public class AccessValidator { |
235 | 241 | case OTA_PACKAGE: |
236 | 242 | validateOtaPackage(currentUser, operation, entityId, callback); |
237 | 243 | return; |
244 | + case RPC: | |
245 | + validateRpc(currentUser, operation, entityId, callback); | |
246 | + return; | |
238 | 247 | default: |
239 | 248 | //TODO: add support of other entities |
240 | 249 | throw new IllegalStateException("Not Implemented!"); |
... | ... | @@ -261,6 +270,22 @@ public class AccessValidator { |
261 | 270 | } |
262 | 271 | } |
263 | 272 | |
273 | + private void validateRpc(final SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) { | |
274 | + ListenableFuture<Rpc> rpcFurure = rpcService.findRpcByIdAsync(currentUser.getTenantId(), new RpcId(entityId.getId())); | |
275 | + Futures.addCallback(rpcFurure, getCallback(callback, rpc -> { | |
276 | + if (rpc == null) { | |
277 | + return ValidationResult.entityNotFound("Rpc with requested id wasn't found!"); | |
278 | + } else { | |
279 | + try { | |
280 | + accessControlService.checkPermission(currentUser, Resource.RPC, operation, entityId, rpc); | |
281 | + } catch (ThingsboardException e) { | |
282 | + return ValidationResult.accessDenied(e.getMessage()); | |
283 | + } | |
284 | + return ValidationResult.ok(rpc); | |
285 | + } | |
286 | + }), executor); | |
287 | + } | |
288 | + | |
264 | 289 | private void validateDeviceProfile(final SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) { |
265 | 290 | if (currentUser.isSystemAdmin()) { |
266 | 291 | callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION)); | ... | ... |
... | ... | @@ -41,6 +41,7 @@ public class CustomerUserPermissions extends AbstractPermissions { |
41 | 41 | put(Resource.WIDGETS_BUNDLE, widgetsPermissionChecker); |
42 | 42 | put(Resource.WIDGET_TYPE, widgetsPermissionChecker); |
43 | 43 | put(Resource.EDGE, customerEntityPermissionChecker); |
44 | + put(Resource.RPC, rpcPermissionChecker); | |
44 | 45 | } |
45 | 46 | |
46 | 47 | private static final PermissionChecker customerEntityPermissionChecker = |
... | ... | @@ -138,4 +139,22 @@ public class CustomerUserPermissions extends AbstractPermissions { |
138 | 139 | } |
139 | 140 | |
140 | 141 | }; |
142 | + | |
143 | + private static final PermissionChecker rpcPermissionChecker = new PermissionChecker.GenericPermissionChecker(Operation.READ) { | |
144 | + | |
145 | + @Override | |
146 | + @SuppressWarnings("unchecked") | |
147 | + public boolean hasPermission(SecurityUser user, Operation operation, EntityId entityId, HasTenantId entity) { | |
148 | + if (!super.hasPermission(user, operation, entityId, entity)) { | |
149 | + return false; | |
150 | + } | |
151 | + if (entity.getTenantId() == null || entity.getTenantId().isNullUid()) { | |
152 | + return true; | |
153 | + } | |
154 | + if (!user.getTenantId().equals(entity.getTenantId())) { | |
155 | + return false; | |
156 | + } | |
157 | + return true; | |
158 | + } | |
159 | + }; | |
141 | 160 | } | ... | ... |
... | ... | @@ -39,7 +39,8 @@ public enum Resource { |
39 | 39 | API_USAGE_STATE(EntityType.API_USAGE_STATE), |
40 | 40 | TB_RESOURCE(EntityType.TB_RESOURCE), |
41 | 41 | OTA_PACKAGE(EntityType.OTA_PACKAGE), |
42 | - EDGE(EntityType.EDGE); | |
42 | + EDGE(EntityType.EDGE), | |
43 | + RPC(EntityType.RPC); | |
43 | 44 | |
44 | 45 | private final EntityType entityType; |
45 | 46 | ... | ... |
... | ... | @@ -44,6 +44,7 @@ public class TenantAdminPermissions extends AbstractPermissions { |
44 | 44 | put(Resource.TB_RESOURCE, tbResourcePermissionChecker); |
45 | 45 | put(Resource.OTA_PACKAGE, tenantEntityPermissionChecker); |
46 | 46 | put(Resource.EDGE, tenantEntityPermissionChecker); |
47 | + put(Resource.RPC, tenantEntityPermissionChecker); | |
47 | 48 | } |
48 | 49 | |
49 | 50 | public static final PermissionChecker tenantEntityPermissionChecker = new PermissionChecker() { | ... | ... |
... | ... | @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.Futures; |
22 | 22 | import com.google.common.util.concurrent.ListenableFuture; |
23 | 23 | import com.google.common.util.concurrent.MoreExecutors; |
24 | 24 | import com.google.protobuf.ByteString; |
25 | +import lombok.RequiredArgsConstructor; | |
25 | 26 | import lombok.extern.slf4j.Slf4j; |
26 | 27 | import org.springframework.stereotype.Service; |
27 | 28 | import org.springframework.util.StringUtils; |
... | ... | @@ -41,13 +42,13 @@ import org.thingsboard.server.common.data.TenantProfile; |
41 | 42 | import org.thingsboard.server.common.data.device.credentials.BasicMqttCredentials; |
42 | 43 | import org.thingsboard.server.common.data.device.credentials.ProvisionDeviceCredentialsData; |
43 | 44 | import org.thingsboard.server.common.data.device.profile.ProvisionDeviceProfileCredentials; |
44 | -import org.thingsboard.server.common.data.ota.OtaPackageType; | |
45 | -import org.thingsboard.server.common.data.ota.OtaPackageUtil; | |
46 | 45 | import org.thingsboard.server.common.data.id.CustomerId; |
47 | 46 | import org.thingsboard.server.common.data.id.DeviceId; |
48 | 47 | import org.thingsboard.server.common.data.id.DeviceProfileId; |
49 | 48 | import org.thingsboard.server.common.data.id.OtaPackageId; |
50 | 49 | import org.thingsboard.server.common.data.id.TenantId; |
50 | +import org.thingsboard.server.common.data.ota.OtaPackageType; | |
51 | +import org.thingsboard.server.common.data.ota.OtaPackageUtil; | |
51 | 52 | import org.thingsboard.server.common.data.page.PageData; |
52 | 53 | import org.thingsboard.server.common.data.page.PageLink; |
53 | 54 | import org.thingsboard.server.common.data.relation.EntityRelation; |
... | ... | @@ -108,6 +109,7 @@ import java.util.stream.Collectors; |
108 | 109 | @Slf4j |
109 | 110 | @Service |
110 | 111 | @TbCoreComponent |
112 | +@RequiredArgsConstructor | |
111 | 113 | public class DefaultTransportApiService implements TransportApiService { |
112 | 114 | |
113 | 115 | private static final ObjectMapper mapper = new ObjectMapper(); |
... | ... | @@ -129,28 +131,6 @@ public class DefaultTransportApiService implements TransportApiService { |
129 | 131 | |
130 | 132 | private final ConcurrentMap<String, ReentrantLock> deviceCreationLocks = new ConcurrentHashMap<>(); |
131 | 133 | |
132 | - public DefaultTransportApiService(TbDeviceProfileCache deviceProfileCache, | |
133 | - TbTenantProfileCache tenantProfileCache, TbApiUsageStateService apiUsageStateService, DeviceService deviceService, | |
134 | - RelationService relationService, DeviceCredentialsService deviceCredentialsService, | |
135 | - DeviceStateService deviceStateService, DbCallbackExecutorService dbCallbackExecutorService, | |
136 | - TbClusterService tbClusterService, DataDecodingEncodingService dataDecodingEncodingService, | |
137 | - DeviceProvisionService deviceProvisionService, TbResourceService resourceService, OtaPackageService otaPackageService, OtaPackageDataCache otaPackageDataCache) { | |
138 | - this.deviceProfileCache = deviceProfileCache; | |
139 | - this.tenantProfileCache = tenantProfileCache; | |
140 | - this.apiUsageStateService = apiUsageStateService; | |
141 | - this.deviceService = deviceService; | |
142 | - this.relationService = relationService; | |
143 | - this.deviceCredentialsService = deviceCredentialsService; | |
144 | - this.deviceStateService = deviceStateService; | |
145 | - this.dbCallbackExecutorService = dbCallbackExecutorService; | |
146 | - this.tbClusterService = tbClusterService; | |
147 | - this.dataDecodingEncodingService = dataDecodingEncodingService; | |
148 | - this.deviceProvisionService = deviceProvisionService; | |
149 | - this.resourceService = resourceService; | |
150 | - this.otaPackageService = otaPackageService; | |
151 | - this.otaPackageDataCache = otaPackageDataCache; | |
152 | - } | |
153 | - | |
154 | 134 | @Override |
155 | 135 | public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProtoQueueMsg<TransportApiRequestMsg> tbProtoQueueMsg) { |
156 | 136 | TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue(); | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.service.ttl.rpc; | |
17 | + | |
18 | +import lombok.RequiredArgsConstructor; | |
19 | +import lombok.extern.slf4j.Slf4j; | |
20 | +import org.springframework.beans.factory.annotation.Value; | |
21 | +import org.springframework.scheduling.annotation.Scheduled; | |
22 | +import org.springframework.stereotype.Service; | |
23 | +import org.thingsboard.server.common.data.id.TenantId; | |
24 | +import org.thingsboard.server.common.data.page.PageData; | |
25 | +import org.thingsboard.server.common.data.page.PageLink; | |
26 | +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; | |
27 | +import org.thingsboard.server.common.msg.queue.ServiceType; | |
28 | +import org.thingsboard.server.dao.rpc.RpcDao; | |
29 | +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; | |
30 | +import org.thingsboard.server.dao.tenant.TenantDao; | |
31 | +import org.thingsboard.server.queue.discovery.PartitionService; | |
32 | +import org.thingsboard.server.queue.util.TbCoreComponent; | |
33 | + | |
34 | +import java.util.Date; | |
35 | +import java.util.Optional; | |
36 | +import java.util.concurrent.TimeUnit; | |
37 | + | |
38 | +@TbCoreComponent | |
39 | +@Service | |
40 | +@Slf4j | |
41 | +@RequiredArgsConstructor | |
42 | +public class RpcCleanUpService { | |
43 | + @Value("${sql.ttl.rpc.enabled}") | |
44 | + private boolean ttlTaskExecutionEnabled; | |
45 | + | |
46 | + private final TenantDao tenantDao; | |
47 | + private final PartitionService partitionService; | |
48 | + private final TbTenantProfileCache tenantProfileCache; | |
49 | + private final RpcDao rpcDao; | |
50 | + | |
51 | + @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.rpc.checking_interval})}", fixedDelayString = "${sql.ttl.rpc.checking_interval}") | |
52 | + public void cleanUp() { | |
53 | + if (ttlTaskExecutionEnabled) { | |
54 | + PageLink tenantsBatchRequest = new PageLink(10_000, 0); | |
55 | + PageData<TenantId> tenantsIds; | |
56 | + do { | |
57 | + tenantsIds = tenantDao.findTenantsIds(tenantsBatchRequest); | |
58 | + for (TenantId tenantId : tenantsIds.getData()) { | |
59 | + if (!partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) { | |
60 | + continue; | |
61 | + } | |
62 | + | |
63 | + Optional<DefaultTenantProfileConfiguration> tenantProfileConfiguration = tenantProfileCache.get(tenantId).getProfileConfiguration(); | |
64 | + if (tenantProfileConfiguration.isEmpty() || tenantProfileConfiguration.get().getRpcTtlDays() == 0) { | |
65 | + continue; | |
66 | + } | |
67 | + | |
68 | + long ttl = TimeUnit.DAYS.toMillis(tenantProfileConfiguration.get().getRpcTtlDays()); | |
69 | + long expirationTime = System.currentTimeMillis() - ttl; | |
70 | + | |
71 | + long totalRemoved = rpcDao.deleteOutdatedRpcByTenantId(tenantId, expirationTime); | |
72 | + | |
73 | + if (totalRemoved > 0) { | |
74 | + log.info("Removed {} outdated rpc(s) for tenant {} older than {}", totalRemoved, tenantId, new Date(expirationTime)); | |
75 | + } | |
76 | + } | |
77 | + | |
78 | + tenantsBatchRequest = tenantsBatchRequest.nextPageLink(); | |
79 | + } while (tenantsIds.hasNext()); | |
80 | + } | |
81 | + } | |
82 | + | |
83 | +} | ... | ... |
... | ... | @@ -276,6 +276,9 @@ sql: |
276 | 276 | alarms: |
277 | 277 | checking_interval: "${SQL_ALARMS_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours |
278 | 278 | removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:3000}" # To delete outdated alarms not all at once but in batches |
279 | + rpc: | |
280 | + enabled: "${SQL_TTL_RPC_ENABLED:true}" | |
281 | + checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours | |
279 | 282 | |
280 | 283 | # Actor system parameters |
281 | 284 | actors: | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.rpc; | |
17 | + | |
18 | +import com.google.common.util.concurrent.ListenableFuture; | |
19 | +import org.thingsboard.server.common.data.id.DeviceId; | |
20 | +import org.thingsboard.server.common.data.id.RpcId; | |
21 | +import org.thingsboard.server.common.data.id.TenantId; | |
22 | +import org.thingsboard.server.common.data.page.PageData; | |
23 | +import org.thingsboard.server.common.data.page.PageLink; | |
24 | +import org.thingsboard.server.common.data.rpc.Rpc; | |
25 | +import org.thingsboard.server.common.data.rpc.RpcStatus; | |
26 | + | |
27 | +public interface RpcService { | |
28 | + Rpc save(Rpc rpc); | |
29 | + | |
30 | + void deleteRpc(TenantId tenantId, RpcId id); | |
31 | + | |
32 | + void deleteAllRpcByTenantId(TenantId tenantId); | |
33 | + | |
34 | + Rpc findById(TenantId tenantId, RpcId id); | |
35 | + | |
36 | + ListenableFuture<Rpc> findRpcByIdAsync(TenantId tenantId, RpcId id); | |
37 | + | |
38 | + PageData<Rpc> findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink); | |
39 | +} | ... | ... |
... | ... | @@ -76,6 +76,12 @@ public class DataConstants { |
76 | 76 | |
77 | 77 | public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE"; |
78 | 78 | |
79 | + public static final String RPC_QUEUED = "RPC_QUEUED"; | |
80 | + public static final String RPC_DELIVERED = "RPC_DELIVERED"; | |
81 | + public static final String RPC_SUCCESSFUL = "RPC_SUCCESSFUL"; | |
82 | + public static final String RPC_TIMEOUT = "RPC_TIMEOUT"; | |
83 | + public static final String RPC_FAILED = "RPC_FAILED"; | |
84 | + | |
79 | 85 | public static final String DEFAULT_SECRET_KEY = ""; |
80 | 86 | public static final String SECRET_KEY_FIELD_NAME = "secretKey"; |
81 | 87 | public static final String DURATION_MS_FIELD_NAME = "durationMs"; | ... | ... |
... | ... | @@ -19,5 +19,5 @@ package org.thingsboard.server.common.data; |
19 | 19 | * @author Andrew Shvayka |
20 | 20 | */ |
21 | 21 | public enum EntityType { |
22 | - TENANT, CUSTOMER, USER, DASHBOARD, ASSET, DEVICE, ALARM, RULE_CHAIN, RULE_NODE, ENTITY_VIEW, WIDGETS_BUNDLE, WIDGET_TYPE, TENANT_PROFILE, DEVICE_PROFILE, API_USAGE_STATE, TB_RESOURCE, OTA_PACKAGE, EDGE; | |
22 | + TENANT, CUSTOMER, USER, DASHBOARD, ASSET, DEVICE, ALARM, RULE_CHAIN, RULE_NODE, ENTITY_VIEW, WIDGETS_BUNDLE, WIDGET_TYPE, TENANT_PROFILE, DEVICE_PROFILE, API_USAGE_STATE, TB_RESOURCE, OTA_PACKAGE, EDGE, RPC; | |
23 | 23 | } | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.data.id; | |
17 | + | |
18 | +import com.fasterxml.jackson.annotation.JsonCreator; | |
19 | +import com.fasterxml.jackson.annotation.JsonIgnore; | |
20 | +import com.fasterxml.jackson.annotation.JsonProperty; | |
21 | +import org.thingsboard.server.common.data.EntityType; | |
22 | + | |
23 | +import java.util.UUID; | |
24 | + | |
25 | +public final class RpcId extends UUIDBased implements EntityId { | |
26 | + | |
27 | + private static final long serialVersionUID = 1L; | |
28 | + | |
29 | + @JsonCreator | |
30 | + public RpcId(@JsonProperty("id") UUID id) { | |
31 | + super(id); | |
32 | + } | |
33 | + | |
34 | + @JsonIgnore | |
35 | + @Override | |
36 | + public EntityType getEntityType() { | |
37 | + return EntityType.RPC; | |
38 | + } | |
39 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.data.rpc; | |
17 | + | |
18 | +import com.fasterxml.jackson.databind.JsonNode; | |
19 | +import lombok.Data; | |
20 | +import lombok.EqualsAndHashCode; | |
21 | +import org.thingsboard.server.common.data.BaseData; | |
22 | +import org.thingsboard.server.common.data.HasTenantId; | |
23 | +import org.thingsboard.server.common.data.id.DeviceId; | |
24 | +import org.thingsboard.server.common.data.id.RpcId; | |
25 | +import org.thingsboard.server.common.data.id.TenantId; | |
26 | + | |
27 | +@Data | |
28 | +@EqualsAndHashCode(callSuper = true) | |
29 | +public class Rpc extends BaseData<RpcId> implements HasTenantId { | |
30 | + private TenantId tenantId; | |
31 | + private DeviceId deviceId; | |
32 | + private long expirationTime; | |
33 | + private JsonNode request; | |
34 | + private JsonNode response; | |
35 | + private RpcStatus status; | |
36 | + | |
37 | + public Rpc() { | |
38 | + super(); | |
39 | + } | |
40 | + | |
41 | + public Rpc(RpcId id) { | |
42 | + super(id); | |
43 | + } | |
44 | + | |
45 | + public Rpc(Rpc rpc) { | |
46 | + super(rpc); | |
47 | + this.tenantId = rpc.getTenantId(); | |
48 | + this.deviceId = rpc.getDeviceId(); | |
49 | + this.expirationTime = rpc.getExpirationTime(); | |
50 | + this.request = rpc.getRequest(); | |
51 | + this.response = rpc.getResponse(); | |
52 | + this.status = rpc.getStatus(); | |
53 | + } | |
54 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.data.rpc; | |
17 | + | |
18 | +public enum RpcStatus { | |
19 | + QUEUED, DELIVERED, SUCCESSFUL, TIMEOUT, FAILED | |
20 | +} | ... | ... |
... | ... | @@ -318,6 +318,9 @@ message SubscribeToRPCMsg { |
318 | 318 | SessionType sessionType = 2; |
319 | 319 | } |
320 | 320 | |
321 | +message SendPendingRPCMsg { | |
322 | +} | |
323 | + | |
321 | 324 | message ToDeviceRpcRequestMsg { |
322 | 325 | int32 requestId = 1; |
323 | 326 | string methodName = 2; |
... | ... | @@ -325,6 +328,8 @@ message ToDeviceRpcRequestMsg { |
325 | 328 | int64 expirationTime = 4; |
326 | 329 | int64 requestIdMSB = 5; |
327 | 330 | int64 requestIdLSB = 6; |
331 | + bool oneway = 7; | |
332 | + bool persisted = 8; | |
328 | 333 | } |
329 | 334 | |
330 | 335 | message ToDeviceRpcResponseMsg { |
... | ... | @@ -332,6 +337,13 @@ message ToDeviceRpcResponseMsg { |
332 | 337 | string payload = 2; |
333 | 338 | } |
334 | 339 | |
340 | +message ToDevicePersistedRpcResponseMsg { | |
341 | + int32 requestId = 1; | |
342 | + int64 requestIdMSB = 2; | |
343 | + int64 requestIdLSB = 3; | |
344 | + string status = 4; | |
345 | +} | |
346 | + | |
335 | 347 | message ToServerRpcRequestMsg { |
336 | 348 | int32 requestId = 1; |
337 | 349 | string methodName = 2; |
... | ... | @@ -435,6 +447,8 @@ message TransportToDeviceActorMsg { |
435 | 447 | SubscriptionInfoProto subscriptionInfo = 7; |
436 | 448 | ClaimDeviceMsg claimDevice = 8; |
437 | 449 | ProvisionDeviceRequestMsg provisionDevice = 9; |
450 | + ToDevicePersistedRpcResponseMsg persistedRpcResponseMsg = 10; | |
451 | + SendPendingRPCMsg sendPendingRPC = 11; | |
438 | 452 | } |
439 | 453 | |
440 | 454 | message TransportToRuleEngineMsg { | ... | ... |
common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
... | ... | @@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportC |
44 | 44 | import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadConfiguration; |
45 | 45 | import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration; |
46 | 46 | import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration; |
47 | +import org.thingsboard.server.common.data.rpc.RpcStatus; | |
47 | 48 | import org.thingsboard.server.common.data.security.DeviceTokenCredentials; |
48 | 49 | import org.thingsboard.server.common.msg.session.FeatureType; |
49 | 50 | import org.thingsboard.server.common.msg.session.SessionMsgType; |
... | ... | @@ -332,14 +333,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
332 | 333 | break; |
333 | 334 | case TO_SERVER_RPC_REQUEST: |
334 | 335 | transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, |
335 | - transportConfigurationContainer.getRpcRequestDynamicMessageBuilder()), timeout); | |
336 | + transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), sessionInfo), timeout); | |
336 | 337 | transportService.process(sessionInfo, |
337 | 338 | coapTransportAdaptor.convertToServerRpcRequest(sessionId, request), |
338 | 339 | new CoapNoOpCallback(exchange)); |
339 | 340 | break; |
340 | 341 | case GET_ATTRIBUTES_REQUEST: |
341 | 342 | transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, |
342 | - transportConfigurationContainer.getRpcRequestDynamicMessageBuilder()), timeout); | |
343 | + transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), sessionInfo), timeout); | |
343 | 344 | transportService.process(sessionInfo, |
344 | 345 | coapTransportAdaptor.convertToGetAttributes(sessionId, request), |
345 | 346 | new CoapNoOpCallback(exchange)); |
... | ... | @@ -362,12 +363,12 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
362 | 363 | |
363 | 364 | private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) { |
364 | 365 | tokenToSessionInfoMap.putIfAbsent(token, sessionInfo); |
365 | - transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder)); | |
366 | + transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo)); | |
366 | 367 | transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); |
367 | 368 | } |
368 | 369 | |
369 | - private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) { | |
370 | - return new CoapSessionListener(this, exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder); | |
370 | + private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) { | |
371 | + return new CoapSessionListener(this, exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo); | |
371 | 372 | } |
372 | 373 | |
373 | 374 | private String getTokenFromRequest(Request request) { |
... | ... | @@ -455,12 +456,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
455 | 456 | private final CoapExchange exchange; |
456 | 457 | private final CoapTransportAdaptor coapTransportAdaptor; |
457 | 458 | private final DynamicMessage.Builder rpcRequestDynamicMessageBuilder; |
459 | + private final TransportProtos.SessionInfoProto sessionInfo; | |
458 | 460 | |
459 | - CoapSessionListener(CoapTransportResource coapTransportResource, CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) { | |
461 | + CoapSessionListener(CoapTransportResource coapTransportResource, CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) { | |
460 | 462 | this.coapTransportResource = coapTransportResource; |
461 | 463 | this.exchange = exchange; |
462 | 464 | this.coapTransportAdaptor = coapTransportAdaptor; |
463 | 465 | this.rpcRequestDynamicMessageBuilder = rpcRequestDynamicMessageBuilder; |
466 | + this.sessionInfo = sessionInfo; | |
464 | 467 | } |
465 | 468 | |
466 | 469 | @Override |
... | ... | @@ -503,11 +506,31 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
503 | 506 | |
504 | 507 | @Override |
505 | 508 | public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg msg) { |
509 | + boolean successful; | |
506 | 510 | try { |
507 | 511 | exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder)); |
512 | + successful = true; | |
508 | 513 | } catch (AdaptorException e) { |
509 | 514 | log.trace("Failed to reply due to error", e); |
510 | 515 | exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); |
516 | + successful = false; | |
517 | + } | |
518 | + if (msg.getPersisted()) { | |
519 | + RpcStatus status; | |
520 | + if (!successful) { | |
521 | + status = RpcStatus.FAILED; | |
522 | + } else if (msg.getOneway()) { | |
523 | + status = RpcStatus.SUCCESSFUL; | |
524 | + } else { | |
525 | + status = RpcStatus.DELIVERED; | |
526 | + } | |
527 | + TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder() | |
528 | + .setRequestId(msg.getRequestId()) | |
529 | + .setRequestIdLSB(msg.getRequestIdLSB()) | |
530 | + .setRequestIdMSB(msg.getRequestIdMSB()) | |
531 | + .setStatus(status.name()) | |
532 | + .build(); | |
533 | + coapTransportResource.transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY); | |
511 | 534 | } |
512 | 535 | } |
513 | 536 | ... | ... |
... | ... | @@ -17,6 +17,7 @@ package org.thingsboard.server.transport.http; |
17 | 17 | |
18 | 18 | import com.google.gson.JsonObject; |
19 | 19 | import com.google.gson.JsonParser; |
20 | +import lombok.RequiredArgsConstructor; | |
20 | 21 | import lombok.extern.slf4j.Slf4j; |
21 | 22 | import org.springframework.beans.factory.annotation.Autowired; |
22 | 23 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
... | ... | @@ -34,9 +35,10 @@ import org.springframework.web.bind.annotation.RequestParam; |
34 | 35 | import org.springframework.web.bind.annotation.RestController; |
35 | 36 | import org.springframework.web.context.request.async.DeferredResult; |
36 | 37 | import org.thingsboard.server.common.data.DeviceTransportType; |
37 | -import org.thingsboard.server.common.data.ota.OtaPackageType; | |
38 | 38 | import org.thingsboard.server.common.data.TbTransportService; |
39 | 39 | import org.thingsboard.server.common.data.id.DeviceId; |
40 | +import org.thingsboard.server.common.data.ota.OtaPackageType; | |
41 | +import org.thingsboard.server.common.data.rpc.RpcStatus; | |
40 | 42 | import org.thingsboard.server.common.transport.SessionMsgListener; |
41 | 43 | import org.thingsboard.server.common.transport.TransportContext; |
42 | 44 | import org.thingsboard.server.common.transport.TransportService; |
... | ... | @@ -95,7 +97,9 @@ public class DeviceApiController implements TbTransportService { |
95 | 97 | request.addAllSharedAttributeNames(sharedKeySet); |
96 | 98 | } |
97 | 99 | TransportService transportService = transportContext.getTransportService(); |
98 | - transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout()); | |
100 | + transportService.registerSyncSession(sessionInfo, | |
101 | + new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo), | |
102 | + transportContext.getDefaultTimeout()); | |
99 | 103 | transportService.process(sessionInfo, request.build(), new SessionCloseOnErrorCallback(transportService, sessionInfo)); |
100 | 104 | })); |
101 | 105 | return responseWriter; |
... | ... | @@ -151,7 +155,8 @@ public class DeviceApiController implements TbTransportService { |
151 | 155 | transportContext.getTransportService().process(DeviceTransportType.DEFAULT, ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(), |
152 | 156 | new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { |
153 | 157 | TransportService transportService = transportContext.getTransportService(); |
154 | - transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), | |
158 | + transportService.registerSyncSession(sessionInfo, | |
159 | + new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo), | |
155 | 160 | timeout == 0 ? transportContext.getDefaultTimeout() : timeout); |
156 | 161 | transportService.process(sessionInfo, SubscribeToRPCMsg.getDefaultInstance(), |
157 | 162 | new SessionCloseOnErrorCallback(transportService, sessionInfo)); |
... | ... | @@ -181,7 +186,9 @@ public class DeviceApiController implements TbTransportService { |
181 | 186 | new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { |
182 | 187 | JsonObject request = new JsonParser().parse(json).getAsJsonObject(); |
183 | 188 | TransportService transportService = transportContext.getTransportService(); |
184 | - transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout()); | |
189 | + transportService.registerSyncSession(sessionInfo, | |
190 | + new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo), | |
191 | + transportContext.getDefaultTimeout()); | |
185 | 192 | transportService.process(sessionInfo, ToServerRpcRequestMsg.newBuilder().setRequestId(0) |
186 | 193 | .setMethodName(request.get("method").getAsString()) |
187 | 194 | .setParams(request.get("params").toString()).build(), |
... | ... | @@ -198,7 +205,8 @@ public class DeviceApiController implements TbTransportService { |
198 | 205 | transportContext.getTransportService().process(DeviceTransportType.DEFAULT, ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(), |
199 | 206 | new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { |
200 | 207 | TransportService transportService = transportContext.getTransportService(); |
201 | - transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), | |
208 | + transportService.registerSyncSession(sessionInfo, | |
209 | + new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo), | |
202 | 210 | timeout == 0 ? transportContext.getDefaultTimeout() : timeout); |
203 | 211 | transportService.process(sessionInfo, SubscribeToAttributeUpdatesMsg.getDefaultInstance(), |
204 | 212 | new SessionCloseOnErrorCallback(transportService, sessionInfo)); |
... | ... | @@ -372,13 +380,12 @@ public class DeviceApiController implements TbTransportService { |
372 | 380 | } |
373 | 381 | } |
374 | 382 | |
383 | + @RequiredArgsConstructor | |
375 | 384 | private static class HttpSessionListener implements SessionMsgListener { |
376 | 385 | |
377 | 386 | private final DeferredResult<ResponseEntity> responseWriter; |
378 | - | |
379 | - HttpSessionListener(DeferredResult<ResponseEntity> responseWriter) { | |
380 | - this.responseWriter = responseWriter; | |
381 | - } | |
387 | + private final TransportService transportService; | |
388 | + private final SessionInfoProto sessionInfo; | |
382 | 389 | |
383 | 390 | @Override |
384 | 391 | public void onGetAttributesResponse(GetAttributeResponseMsg msg) { |
... | ... | @@ -399,6 +406,21 @@ public class DeviceApiController implements TbTransportService { |
399 | 406 | @Override |
400 | 407 | public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg msg) { |
401 | 408 | responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK)); |
409 | + if (msg.getPersisted()) { | |
410 | + RpcStatus status; | |
411 | + if (msg.getOneway()) { | |
412 | + status = RpcStatus.SUCCESSFUL; | |
413 | + } else { | |
414 | + status = RpcStatus.DELIVERED; | |
415 | + } | |
416 | + TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder() | |
417 | + .setRequestId(msg.getRequestId()) | |
418 | + .setRequestIdLSB(msg.getRequestIdLSB()) | |
419 | + .setRequestIdMSB(msg.getRequestIdMSB()) | |
420 | + .setStatus(status.name()) | |
421 | + .build(); | |
422 | + transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY); | |
423 | + } | |
402 | 424 | } |
403 | 425 | |
404 | 426 | @Override | ... | ... |
... | ... | @@ -155,7 +155,7 @@ public class LwM2MBootstrapSecurityStore implements BootstrapSecurityStore { |
155 | 155 | LwM2MServerBootstrap profileLwm2mServer = JacksonUtil.fromString(JacksonUtil.toString(bootstrapObject.getLwm2mServer()), LwM2MServerBootstrap.class); |
156 | 156 | UUID sessionUUiD = UUID.randomUUID(); |
157 | 157 | TransportProtos.SessionInfoProto sessionInfo = helper.getValidateSessionInfo(store.getMsg(), sessionUUiD.getMostSignificantBits(), sessionUUiD.getLeastSignificantBits()); |
158 | - context.getTransportService().registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(null, null, null, sessionInfo)); | |
158 | + context.getTransportService().registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(null, null, null, sessionInfo, context.getTransportService())); | |
159 | 159 | if (this.getValidatedSecurityMode(lwM2MBootstrapConfig.bootstrapServer, profileServerBootstrap, lwM2MBootstrapConfig.lwm2mServer, profileLwm2mServer)) { |
160 | 160 | lwM2MBootstrapConfig.bootstrapServer = new LwM2MServerBootstrap(lwM2MBootstrapConfig.bootstrapServer, profileServerBootstrap); |
161 | 161 | lwM2MBootstrapConfig.lwm2mServer = new LwM2MServerBootstrap(lwM2MBootstrapConfig.lwm2mServer, profileLwm2mServer); | ... | ... |
... | ... | @@ -23,7 +23,10 @@ import org.jetbrains.annotations.NotNull; |
23 | 23 | import org.thingsboard.server.common.data.Device; |
24 | 24 | import org.thingsboard.server.common.data.DeviceProfile; |
25 | 25 | import org.thingsboard.server.common.data.ResourceType; |
26 | +import org.thingsboard.server.common.data.rpc.RpcStatus; | |
26 | 27 | import org.thingsboard.server.common.transport.SessionMsgListener; |
28 | +import org.thingsboard.server.common.transport.TransportService; | |
29 | +import org.thingsboard.server.common.transport.TransportServiceCallback; | |
27 | 30 | import org.thingsboard.server.gen.transport.TransportProtos; |
28 | 31 | import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; |
29 | 32 | import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; |
... | ... | @@ -45,6 +48,7 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s |
45 | 48 | private final LwM2MAttributesService attributesService; |
46 | 49 | private final LwM2MRpcRequestHandler rpcHandler; |
47 | 50 | private final TransportProtos.SessionInfoProto sessionInfo; |
51 | + private final TransportService transportService; | |
48 | 52 | |
49 | 53 | @Override |
50 | 54 | public void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse) { |
... | ... | @@ -78,7 +82,22 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s |
78 | 82 | |
79 | 83 | @Override |
80 | 84 | public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) { |
81 | - this.rpcHandler.onToDeviceRpcRequest(toDeviceRequest,this.sessionInfo); | |
85 | + this.rpcHandler.onToDeviceRpcRequest(toDeviceRequest, this.sessionInfo); | |
86 | + if (toDeviceRequest.getPersisted()) { | |
87 | + RpcStatus status; | |
88 | + if (toDeviceRequest.getOneway()) { | |
89 | + status = RpcStatus.SUCCESSFUL; | |
90 | + } else { | |
91 | + status = RpcStatus.DELIVERED; | |
92 | + } | |
93 | + TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder() | |
94 | + .setRequestId(toDeviceRequest.getRequestId()) | |
95 | + .setRequestIdLSB(toDeviceRequest.getRequestIdLSB()) | |
96 | + .setRequestIdMSB(toDeviceRequest.getRequestIdMSB()) | |
97 | + .setStatus(status.name()) | |
98 | + .build(); | |
99 | + transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY); | |
100 | + } | |
82 | 101 | } |
83 | 102 | |
84 | 103 | @Override | ... | ... |
... | ... | @@ -212,7 +212,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl |
212 | 212 | } |
213 | 213 | logService.log(lwM2MClient, LOG_LWM2M_INFO + ": Client registered with registration id: " + registration.getId()); |
214 | 214 | SessionInfoProto sessionInfo = lwM2MClient.getSession(); |
215 | - transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo)); | |
215 | + transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo, transportService)); | |
216 | 216 | log.warn("40) sessionId [{}] Registering rpc subscription after Registration client", new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB())); |
217 | 217 | TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder() |
218 | 218 | .setSessionInfo(sessionInfo) |
... | ... | @@ -888,7 +888,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl |
888 | 888 | */ |
889 | 889 | private void reportActivityAndRegister(SessionInfoProto sessionInfo) { |
890 | 890 | if (sessionInfo != null && transportService.reportActivity(sessionInfo) == null) { |
891 | - transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo)); | |
891 | + transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo, transportService)); | |
892 | 892 | this.reportActivitySubscription(sessionInfo); |
893 | 893 | } |
894 | 894 | } | ... | ... |
... | ... | @@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt; |
17 | 17 | |
18 | 18 | import com.fasterxml.jackson.databind.JsonNode; |
19 | 19 | import com.google.gson.JsonParseException; |
20 | +import io.netty.channel.ChannelFuture; | |
20 | 21 | import io.netty.channel.ChannelHandlerContext; |
21 | 22 | import io.netty.channel.ChannelInboundHandlerAdapter; |
22 | 23 | import io.netty.handler.codec.mqtt.MqttConnAckMessage; |
... | ... | @@ -47,8 +48,9 @@ import org.thingsboard.server.common.data.DeviceProfile; |
47 | 48 | import org.thingsboard.server.common.data.DeviceTransportType; |
48 | 49 | import org.thingsboard.server.common.data.TransportPayloadType; |
49 | 50 | import org.thingsboard.server.common.data.device.profile.MqttTopics; |
50 | -import org.thingsboard.server.common.data.ota.OtaPackageType; | |
51 | 51 | import org.thingsboard.server.common.data.id.OtaPackageId; |
52 | +import org.thingsboard.server.common.data.ota.OtaPackageType; | |
53 | +import org.thingsboard.server.common.data.rpc.RpcStatus; | |
52 | 54 | import org.thingsboard.server.common.msg.EncryptionUtil; |
53 | 55 | import org.thingsboard.server.common.msg.tools.TbRateLimitsException; |
54 | 56 | import org.thingsboard.server.common.transport.SessionMsgListener; |
... | ... | @@ -813,7 +815,31 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
813 | 815 | public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { |
814 | 816 | log.trace("[{}] Received RPC command to device", sessionId); |
815 | 817 | try { |
816 | - deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); | |
818 | + deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest) | |
819 | + .ifPresent(payload -> { | |
820 | + ChannelFuture channelFuture = deviceSessionCtx.getChannel().writeAndFlush(payload); | |
821 | + if (rpcRequest.getPersisted()) { | |
822 | + channelFuture.addListener(future -> { | |
823 | + RpcStatus status; | |
824 | + Throwable t = future.cause(); | |
825 | + if (t != null) { | |
826 | + log.error("Failed delivering RPC command to device!", t); | |
827 | + status = RpcStatus.FAILED; | |
828 | + } else if (rpcRequest.getOneway()) { | |
829 | + status = RpcStatus.SUCCESSFUL; | |
830 | + } else { | |
831 | + status = RpcStatus.DELIVERED; | |
832 | + } | |
833 | + TransportProtos.ToDevicePersistedRpcResponseMsg msg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder() | |
834 | + .setRequestId(rpcRequest.getRequestId()) | |
835 | + .setRequestIdLSB(rpcRequest.getRequestIdLSB()) | |
836 | + .setRequestIdMSB(rpcRequest.getRequestIdMSB()) | |
837 | + .setStatus(status.name()) | |
838 | + .build(); | |
839 | + transportService.process(deviceSessionCtx.getSessionInfo(), msg, TransportServiceCallback.EMPTY); | |
840 | + }); | |
841 | + } | |
842 | + }); | |
817 | 843 | } catch (Exception e) { |
818 | 844 | log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e); |
819 | 845 | } | ... | ... |
... | ... | @@ -15,9 +15,13 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.transport.mqtt.session; |
17 | 17 | |
18 | +import io.netty.channel.ChannelFuture; | |
18 | 19 | import lombok.extern.slf4j.Slf4j; |
19 | 20 | import org.thingsboard.server.common.data.DeviceProfile; |
21 | +import org.thingsboard.server.common.data.rpc.RpcStatus; | |
20 | 22 | import org.thingsboard.server.common.transport.SessionMsgListener; |
23 | +import org.thingsboard.server.common.transport.TransportService; | |
24 | +import org.thingsboard.server.common.transport.TransportServiceCallback; | |
21 | 25 | import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; |
22 | 26 | import org.thingsboard.server.gen.transport.TransportProtos; |
23 | 27 | import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; |
... | ... | @@ -32,9 +36,11 @@ import java.util.concurrent.ConcurrentMap; |
32 | 36 | public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener { |
33 | 37 | |
34 | 38 | private final GatewaySessionHandler parent; |
39 | + private final TransportService transportService; | |
35 | 40 | |
36 | 41 | public GatewayDeviceSessionCtx(GatewaySessionHandler parent, TransportDeviceInfo deviceInfo, |
37 | - DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) { | |
42 | + DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap, | |
43 | + TransportService transportService) { | |
38 | 44 | super(UUID.randomUUID(), mqttQoSMap); |
39 | 45 | this.parent = parent; |
40 | 46 | setSessionInfo(SessionInfoProto.newBuilder() |
... | ... | @@ -56,6 +62,7 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple |
56 | 62 | .build()); |
57 | 63 | setDeviceInfo(deviceInfo); |
58 | 64 | setDeviceProfile(deviceProfile); |
65 | + this.transportService = transportService; | |
59 | 66 | } |
60 | 67 | |
61 | 68 | @Override |
... | ... | @@ -89,7 +96,32 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple |
89 | 96 | @Override |
90 | 97 | public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg request) { |
91 | 98 | try { |
92 | - parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(parent::writeAndFlush); | |
99 | + parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent( | |
100 | + payload -> { | |
101 | + ChannelFuture channelFuture = parent.writeAndFlush(payload); | |
102 | + if (request.getPersisted()) { | |
103 | + channelFuture.addListener(future -> { | |
104 | + RpcStatus status; | |
105 | + Throwable t = future.cause(); | |
106 | + if (t != null) { | |
107 | + log.error("Failed delivering RPC command to device!", t); | |
108 | + status = RpcStatus.FAILED; | |
109 | + } else if (request.getOneway()) { | |
110 | + status = RpcStatus.SUCCESSFUL; | |
111 | + } else { | |
112 | + status = RpcStatus.DELIVERED; | |
113 | + } | |
114 | + TransportProtos.ToDevicePersistedRpcResponseMsg msg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder() | |
115 | + .setRequestId(request.getRequestId()) | |
116 | + .setRequestIdLSB(request.getRequestIdLSB()) | |
117 | + .setRequestIdMSB(request.getRequestIdMSB()) | |
118 | + .setStatus(status.name()) | |
119 | + .build(); | |
120 | + transportService.process(getSessionInfo(), msg, TransportServiceCallback.EMPTY); | |
121 | + }); | |
122 | + } | |
123 | + } | |
124 | + ); | |
93 | 125 | } catch (Exception e) { |
94 | 126 | log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); |
95 | 127 | } | ... | ... |
... | ... | @@ -28,6 +28,7 @@ import com.google.gson.JsonSyntaxException; |
28 | 28 | import com.google.protobuf.InvalidProtocolBufferException; |
29 | 29 | import com.google.protobuf.ProtocolStringList; |
30 | 30 | import io.netty.buffer.ByteBuf; |
31 | +import io.netty.channel.ChannelFuture; | |
31 | 32 | import io.netty.channel.ChannelHandlerContext; |
32 | 33 | import io.netty.handler.codec.mqtt.MqttMessage; |
33 | 34 | import io.netty.handler.codec.mqtt.MqttPublishMessage; |
... | ... | @@ -188,8 +189,8 @@ public class GatewaySessionHandler { |
188 | 189 | } |
189 | 190 | } |
190 | 191 | |
191 | - void writeAndFlush(MqttMessage mqttMessage) { | |
192 | - channel.writeAndFlush(mqttMessage); | |
192 | + ChannelFuture writeAndFlush(MqttMessage mqttMessage) { | |
193 | + return channel.writeAndFlush(mqttMessage); | |
193 | 194 | } |
194 | 195 | |
195 | 196 | int nextMsgId() { |
... | ... | @@ -251,7 +252,7 @@ public class GatewaySessionHandler { |
251 | 252 | new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse>() { |
252 | 253 | @Override |
253 | 254 | public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) { |
254 | - GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap); | |
255 | + GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); | |
255 | 256 | if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { |
256 | 257 | log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); |
257 | 258 | SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); | ... | ... |
... | ... | @@ -26,7 +26,9 @@ import org.thingsboard.server.common.data.DeviceProfile; |
26 | 26 | import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration; |
27 | 27 | import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration; |
28 | 28 | import org.thingsboard.server.common.data.id.DeviceId; |
29 | +import org.thingsboard.server.common.data.rpc.RpcStatus; | |
29 | 30 | import org.thingsboard.server.common.transport.SessionMsgListener; |
31 | +import org.thingsboard.server.common.transport.TransportServiceCallback; | |
30 | 32 | import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; |
31 | 33 | import org.thingsboard.server.gen.transport.TransportProtos; |
32 | 34 | import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; |
... | ... | @@ -139,6 +141,21 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S |
139 | 141 | @Override |
140 | 142 | public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) { |
141 | 143 | snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest); |
144 | + if (toDeviceRequest.getPersisted()) { | |
145 | + RpcStatus status; | |
146 | + if (toDeviceRequest.getOneway()) { | |
147 | + status = RpcStatus.SUCCESSFUL; | |
148 | + } else { | |
149 | + status = RpcStatus.DELIVERED; | |
150 | + } | |
151 | + TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder() | |
152 | + .setRequestId(toDeviceRequest.getRequestId()) | |
153 | + .setRequestIdLSB(toDeviceRequest.getRequestIdLSB()) | |
154 | + .setRequestIdMSB(toDeviceRequest.getRequestIdMSB()) | |
155 | + .setStatus(status.name()) | |
156 | + .build(); | |
157 | + snmpTransportContext.getTransportService().process(getSessionInfo(), responseMsg, TransportServiceCallback.EMPTY); | |
158 | + } | |
142 | 159 | } |
143 | 160 | |
144 | 161 | @Override | ... | ... |
... | ... | @@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.DeviceTransportType; |
20 | 20 | import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; |
21 | 21 | import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; |
22 | 22 | import org.thingsboard.server.common.transport.service.SessionMetaData; |
23 | +import org.thingsboard.server.gen.transport.TransportProtos; | |
23 | 24 | import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; |
24 | 25 | import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; |
25 | 26 | import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; |
... | ... | @@ -109,6 +110,8 @@ public interface TransportService { |
109 | 110 | |
110 | 111 | void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback); |
111 | 112 | |
113 | + void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDevicePersistedRpcResponseMsg msg, TransportServiceCallback<Void> callback); | |
114 | + | |
112 | 115 | void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback); |
113 | 116 | |
114 | 117 | void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback); | ... | ... |
... | ... | @@ -557,6 +557,15 @@ public class DefaultTransportService implements TransportService { |
557 | 557 | } |
558 | 558 | } |
559 | 559 | |
560 | + @Override | |
561 | + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDevicePersistedRpcResponseMsg msg, TransportServiceCallback<Void> callback) { | |
562 | + if (checkLimits(sessionInfo, msg, callback)) { | |
563 | + reportActivityInternal(sessionInfo); | |
564 | + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPersistedRpcResponseMsg(msg).build(), | |
565 | + new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback)); | |
566 | + } | |
567 | + } | |
568 | + | |
560 | 569 | private void processTimeout(String requestId) { |
561 | 570 | RpcRequestMetadata data = toServerRpcPendingMap.remove(requestId); |
562 | 571 | if (data != null) { | ... | ... |
... | ... | @@ -507,6 +507,17 @@ public class ModelConstants { |
507 | 507 | public static final String OTA_PACKAGE_ADDITIONAL_INFO_COLUMN = ADDITIONAL_INFO_PROPERTY; |
508 | 508 | |
509 | 509 | /** |
510 | + * Persisted RPC constants. | |
511 | + */ | |
512 | + public static final String RPC_TABLE_NAME = "rpc"; | |
513 | + public static final String RPC_TENANT_ID_COLUMN = TENANT_ID_COLUMN; | |
514 | + public static final String RPC_DEVICE_ID = "device_id"; | |
515 | + public static final String RPC_EXPIRATION_TIME = "expiration_time"; | |
516 | + public static final String RPC_REQUEST = "request"; | |
517 | + public static final String RPC_RESPONSE = "response"; | |
518 | + public static final String RPC_STATUS = "status"; | |
519 | + | |
520 | + /** | |
510 | 521 | * Edge constants. |
511 | 522 | */ |
512 | 523 | public static final String EDGE_COLUMN_FAMILY_NAME = "edge"; | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.model.sql; | |
17 | + | |
18 | +import com.fasterxml.jackson.databind.JsonNode; | |
19 | +import lombok.Data; | |
20 | +import lombok.EqualsAndHashCode; | |
21 | +import org.hibernate.annotations.Type; | |
22 | +import org.hibernate.annotations.TypeDef; | |
23 | +import org.thingsboard.server.common.data.id.DeviceId; | |
24 | +import org.thingsboard.server.common.data.id.RpcId; | |
25 | +import org.thingsboard.server.common.data.id.TenantId; | |
26 | +import org.thingsboard.server.common.data.rpc.Rpc; | |
27 | +import org.thingsboard.server.common.data.rpc.RpcStatus; | |
28 | +import org.thingsboard.server.dao.model.BaseEntity; | |
29 | +import org.thingsboard.server.dao.model.BaseSqlEntity; | |
30 | +import org.thingsboard.server.dao.util.mapping.JsonStringType; | |
31 | + | |
32 | +import javax.persistence.Column; | |
33 | +import javax.persistence.Entity; | |
34 | +import javax.persistence.EnumType; | |
35 | +import javax.persistence.Enumerated; | |
36 | +import javax.persistence.Table; | |
37 | +import java.util.UUID; | |
38 | + | |
39 | +import static org.thingsboard.server.dao.model.ModelConstants.RPC_DEVICE_ID; | |
40 | +import static org.thingsboard.server.dao.model.ModelConstants.RPC_EXPIRATION_TIME; | |
41 | +import static org.thingsboard.server.dao.model.ModelConstants.RPC_REQUEST; | |
42 | +import static org.thingsboard.server.dao.model.ModelConstants.RPC_RESPONSE; | |
43 | +import static org.thingsboard.server.dao.model.ModelConstants.RPC_STATUS; | |
44 | +import static org.thingsboard.server.dao.model.ModelConstants.RPC_TABLE_NAME; | |
45 | +import static org.thingsboard.server.dao.model.ModelConstants.RPC_TENANT_ID_COLUMN; | |
46 | + | |
47 | +@Data | |
48 | +@EqualsAndHashCode(callSuper = true) | |
49 | +@Entity | |
50 | +@TypeDef(name = "json", typeClass = JsonStringType.class) | |
51 | +@Table(name = RPC_TABLE_NAME) | |
52 | +public class RpcEntity extends BaseSqlEntity<Rpc> implements BaseEntity<Rpc> { | |
53 | + | |
54 | + @Column(name = RPC_TENANT_ID_COLUMN) | |
55 | + private UUID tenantId; | |
56 | + | |
57 | + @Column(name = RPC_DEVICE_ID) | |
58 | + private UUID deviceId; | |
59 | + | |
60 | + @Column(name = RPC_EXPIRATION_TIME) | |
61 | + private long expirationTime; | |
62 | + | |
63 | + @Type(type = "json") | |
64 | + @Column(name = RPC_REQUEST) | |
65 | + private JsonNode request; | |
66 | + | |
67 | + @Type(type = "json") | |
68 | + @Column(name = RPC_RESPONSE) | |
69 | + private JsonNode response; | |
70 | + | |
71 | + @Enumerated(EnumType.STRING) | |
72 | + @Column(name = RPC_STATUS) | |
73 | + private RpcStatus status; | |
74 | + | |
75 | + public RpcEntity() { | |
76 | + super(); | |
77 | + } | |
78 | + | |
79 | + public RpcEntity(Rpc rpc) { | |
80 | + this.setUuid(rpc.getUuidId()); | |
81 | + this.createdTime = rpc.getCreatedTime(); | |
82 | + this.tenantId = rpc.getTenantId().getId(); | |
83 | + this.deviceId = rpc.getDeviceId().getId(); | |
84 | + this.expirationTime = rpc.getExpirationTime(); | |
85 | + this.request = rpc.getRequest(); | |
86 | + this.response = rpc.getResponse(); | |
87 | + this.status = rpc.getStatus(); | |
88 | + } | |
89 | + | |
90 | + @Override | |
91 | + public Rpc toData() { | |
92 | + Rpc rpc = new Rpc(new RpcId(id)); | |
93 | + rpc.setCreatedTime(createdTime); | |
94 | + rpc.setTenantId(new TenantId(tenantId)); | |
95 | + rpc.setDeviceId(new DeviceId(deviceId)); | |
96 | + rpc.setExpirationTime(expirationTime); | |
97 | + rpc.setRequest(request); | |
98 | + rpc.setResponse(response); | |
99 | + rpc.setStatus(status); | |
100 | + return rpc; | |
101 | + } | |
102 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.rpc; | |
17 | + | |
18 | +import com.google.common.util.concurrent.ListenableFuture; | |
19 | +import lombok.RequiredArgsConstructor; | |
20 | +import lombok.extern.slf4j.Slf4j; | |
21 | +import org.springframework.stereotype.Service; | |
22 | +import org.thingsboard.server.common.data.id.DeviceId; | |
23 | +import org.thingsboard.server.common.data.id.RpcId; | |
24 | +import org.thingsboard.server.common.data.id.TenantId; | |
25 | +import org.thingsboard.server.common.data.page.PageData; | |
26 | +import org.thingsboard.server.common.data.page.PageLink; | |
27 | +import org.thingsboard.server.common.data.rpc.Rpc; | |
28 | +import org.thingsboard.server.common.data.rpc.RpcStatus; | |
29 | +import org.thingsboard.server.dao.service.PaginatedRemover; | |
30 | + | |
31 | +import static org.thingsboard.server.dao.service.Validator.validateId; | |
32 | +import static org.thingsboard.server.dao.service.Validator.validatePageLink; | |
33 | + | |
34 | +@Service | |
35 | +@Slf4j | |
36 | +@RequiredArgsConstructor | |
37 | +public class BaseRpcService implements RpcService { | |
38 | + public static final String INCORRECT_TENANT_ID = "Incorrect tenantId "; | |
39 | + public static final String INCORRECT_RPC_ID = "Incorrect rpcId "; | |
40 | + | |
41 | + private final RpcDao rpcDao; | |
42 | + | |
43 | + @Override | |
44 | + public Rpc save(Rpc rpc) { | |
45 | + log.trace("Executing save, [{}]", rpc); | |
46 | + return rpcDao.save(rpc.getTenantId(), rpc); | |
47 | + } | |
48 | + | |
49 | + @Override | |
50 | + public void deleteRpc(TenantId tenantId, RpcId rpcId) { | |
51 | + log.trace("Executing deleteRpc, tenantId [{}], rpcId [{}]", tenantId, rpcId); | |
52 | + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); | |
53 | + validateId(rpcId, INCORRECT_RPC_ID + rpcId); | |
54 | + rpcDao.removeById(tenantId, rpcId.getId()); | |
55 | + } | |
56 | + | |
57 | + @Override | |
58 | + public void deleteAllRpcByTenantId(TenantId tenantId) { | |
59 | + log.trace("Executing deleteAllRpcByTenantId, tenantId [{}]", tenantId); | |
60 | + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); | |
61 | + tenantRpcRemover.removeEntities(tenantId, tenantId); | |
62 | + } | |
63 | + | |
64 | + @Override | |
65 | + public Rpc findById(TenantId tenantId, RpcId rpcId) { | |
66 | + log.trace("Executing findById, tenantId [{}], rpcId [{}]", tenantId, rpcId); | |
67 | + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); | |
68 | + validateId(rpcId, INCORRECT_RPC_ID + rpcId); | |
69 | + return rpcDao.findById(tenantId, rpcId.getId()); | |
70 | + } | |
71 | + | |
72 | + @Override | |
73 | + public ListenableFuture<Rpc> findRpcByIdAsync(TenantId tenantId, RpcId rpcId) { | |
74 | + log.trace("Executing findRpcByIdAsync, tenantId [{}], rpcId: [{}]", tenantId, rpcId); | |
75 | + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); | |
76 | + validateId(rpcId, INCORRECT_RPC_ID + rpcId); | |
77 | + return rpcDao.findByIdAsync(tenantId, rpcId.getId()); | |
78 | + } | |
79 | + | |
80 | + @Override | |
81 | + public PageData<Rpc> findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) { | |
82 | + log.trace("Executing findAllByDeviceIdAndStatus, tenantId [{}], deviceId [{}], rpcStatus [{}], pageLink [{}]", tenantId, deviceId, rpcStatus, pageLink); | |
83 | + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); | |
84 | + validatePageLink(pageLink); | |
85 | + return rpcDao.findAllByDeviceId(tenantId, deviceId, rpcStatus, pageLink); | |
86 | + } | |
87 | + | |
88 | + private PaginatedRemover<TenantId, Rpc> tenantRpcRemover = | |
89 | + new PaginatedRemover<>() { | |
90 | + @Override | |
91 | + protected PageData<Rpc> findEntities(TenantId tenantId, TenantId id, PageLink pageLink) { | |
92 | + return rpcDao.findAllRpcByTenantId(id, pageLink); | |
93 | + } | |
94 | + | |
95 | + @Override | |
96 | + protected void removeEntity(TenantId tenantId, Rpc entity) { | |
97 | + deleteRpc(tenantId, entity.getId()); | |
98 | + } | |
99 | + }; | |
100 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.rpc; | |
17 | + | |
18 | +import org.thingsboard.server.common.data.id.DeviceId; | |
19 | +import org.thingsboard.server.common.data.id.TenantId; | |
20 | +import org.thingsboard.server.common.data.page.PageData; | |
21 | +import org.thingsboard.server.common.data.page.PageLink; | |
22 | +import org.thingsboard.server.common.data.rpc.Rpc; | |
23 | +import org.thingsboard.server.common.data.rpc.RpcStatus; | |
24 | +import org.thingsboard.server.dao.Dao; | |
25 | + | |
26 | +public interface RpcDao extends Dao<Rpc> { | |
27 | + PageData<Rpc> findAllByDeviceId(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink); | |
28 | + | |
29 | + PageData<Rpc> findAllRpcByTenantId(TenantId tenantId, PageLink pageLink); | |
30 | + | |
31 | + Long deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime); | |
32 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.sql.rpc; | |
17 | + | |
18 | +import lombok.AllArgsConstructor; | |
19 | +import lombok.extern.slf4j.Slf4j; | |
20 | +import org.springframework.data.repository.CrudRepository; | |
21 | +import org.springframework.stereotype.Component; | |
22 | +import org.thingsboard.server.common.data.id.DeviceId; | |
23 | +import org.thingsboard.server.common.data.id.TenantId; | |
24 | +import org.thingsboard.server.common.data.page.PageData; | |
25 | +import org.thingsboard.server.common.data.page.PageLink; | |
26 | +import org.thingsboard.server.common.data.rpc.Rpc; | |
27 | +import org.thingsboard.server.common.data.rpc.RpcStatus; | |
28 | +import org.thingsboard.server.dao.DaoUtil; | |
29 | +import org.thingsboard.server.dao.model.sql.RpcEntity; | |
30 | +import org.thingsboard.server.dao.rpc.RpcDao; | |
31 | +import org.thingsboard.server.dao.sql.JpaAbstractDao; | |
32 | + | |
33 | +import java.util.UUID; | |
34 | + | |
35 | +@Slf4j | |
36 | +@Component | |
37 | +@AllArgsConstructor | |
38 | +public class JpaRpcDao extends JpaAbstractDao<RpcEntity, Rpc> implements RpcDao { | |
39 | + | |
40 | + private final RpcRepository rpcRepository; | |
41 | + | |
42 | + @Override | |
43 | + protected Class<RpcEntity> getEntityClass() { | |
44 | + return RpcEntity.class; | |
45 | + } | |
46 | + | |
47 | + @Override | |
48 | + protected CrudRepository<RpcEntity, UUID> getCrudRepository() { | |
49 | + return rpcRepository; | |
50 | + } | |
51 | + | |
52 | + @Override | |
53 | + public PageData<Rpc> findAllByDeviceId(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) { | |
54 | + return DaoUtil.toPageData(rpcRepository.findAllByTenantIdAndDeviceIdAndStatus(tenantId.getId(), deviceId.getId(), rpcStatus, DaoUtil.toPageable(pageLink))); | |
55 | + } | |
56 | + | |
57 | + @Override | |
58 | + public PageData<Rpc> findAllRpcByTenantId(TenantId tenantId, PageLink pageLink) { | |
59 | + return DaoUtil.toPageData(rpcRepository.findAllByTenantId(tenantId.getId(), DaoUtil.toPageable(pageLink))); | |
60 | + } | |
61 | + | |
62 | + @Override | |
63 | + public Long deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime) { | |
64 | + return rpcRepository.deleteOutdatedRpcByTenantId(tenantId.getId(), expirationTime); | |
65 | + } | |
66 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.sql.rpc; | |
17 | + | |
18 | +import org.springframework.data.domain.Page; | |
19 | +import org.springframework.data.domain.Pageable; | |
20 | +import org.springframework.data.jpa.repository.Query; | |
21 | +import org.springframework.data.repository.CrudRepository; | |
22 | +import org.springframework.data.repository.query.Param; | |
23 | +import org.thingsboard.server.common.data.rpc.RpcStatus; | |
24 | +import org.thingsboard.server.dao.model.sql.RpcEntity; | |
25 | + | |
26 | +import java.util.UUID; | |
27 | + | |
28 | +public interface RpcRepository extends CrudRepository<RpcEntity, UUID> { | |
29 | + Page<RpcEntity> findAllByTenantIdAndDeviceIdAndStatus(UUID tenantId, UUID deviceId, RpcStatus status, Pageable pageable); | |
30 | + | |
31 | + Page<RpcEntity> findAllByTenantId(UUID tenantId, Pageable pageable); | |
32 | + | |
33 | + @Query(value = "WITH deleted AS (DELETE FROM rpc WHERE (tenant_id = :tenantId AND created_time < :expirationTime) IS TRUE RETURNING *) SELECT count(*) FROM deleted", | |
34 | + nativeQuery = true) | |
35 | + Long deleteOutdatedRpcByTenantId(@Param("tenantId") UUID tenantId, @Param("expirationTime") Long expirationTime); | |
36 | +} | ... | ... |
... | ... | @@ -37,6 +37,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService; |
37 | 37 | import org.thingsboard.server.dao.exception.DataValidationException; |
38 | 38 | import org.thingsboard.server.dao.ota.OtaPackageService; |
39 | 39 | import org.thingsboard.server.dao.resource.ResourceService; |
40 | +import org.thingsboard.server.dao.rpc.RpcService; | |
40 | 41 | import org.thingsboard.server.dao.rule.RuleChainService; |
41 | 42 | import org.thingsboard.server.dao.service.DataValidator; |
42 | 43 | import org.thingsboard.server.dao.service.PaginatedRemover; |
... | ... | @@ -96,6 +97,9 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe |
96 | 97 | @Autowired |
97 | 98 | private OtaPackageService otaPackageService; |
98 | 99 | |
100 | + @Autowired | |
101 | + private RpcService rpcService; | |
102 | + | |
99 | 103 | @Override |
100 | 104 | public Tenant findTenantById(TenantId tenantId) { |
101 | 105 | log.trace("Executing findTenantById [{}]", tenantId); |
... | ... | @@ -151,6 +155,7 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe |
151 | 155 | apiUsageStateService.deleteApiUsageStateByTenantId(tenantId); |
152 | 156 | resourceService.deleteResourcesByTenantId(tenantId); |
153 | 157 | otaPackageService.deleteOtaPackagesByTenantId(tenantId); |
158 | + rpcService.deleteAllRpcByTenantId(tenantId); | |
154 | 159 | tenantDao.removeById(tenantId, tenantId.getId()); |
155 | 160 | deleteEntityRelations(tenantId, tenantId); |
156 | 161 | } | ... | ... |
... | ... | @@ -567,3 +567,14 @@ CREATE TABLE IF NOT EXISTS edge_event ( |
567 | 567 | tenant_id uuid, |
568 | 568 | ts bigint NOT NULL |
569 | 569 | ); |
570 | + | |
571 | +CREATE TABLE IF NOT EXISTS rpc ( | |
572 | + id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY, | |
573 | + created_time bigint NOT NULL, | |
574 | + tenant_id uuid NOT NULL, | |
575 | + device_id uuid NOT NULL, | |
576 | + expiration_time bigint NOT NULL, | |
577 | + request varchar(10000000) NOT NULL, | |
578 | + response varchar(10000000), | |
579 | + status varchar(255) NOT NULL | |
580 | +); | ... | ... |
... | ... | @@ -46,3 +46,4 @@ CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribu |
46 | 46 | |
47 | 47 | CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time); |
48 | 48 | |
49 | +CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id); | ... | ... |
... | ... | @@ -605,6 +605,17 @@ CREATE TABLE IF NOT EXISTS edge_event ( |
605 | 605 | ts bigint NOT NULL |
606 | 606 | ); |
607 | 607 | |
608 | +CREATE TABLE IF NOT EXISTS rpc ( | |
609 | + id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY, | |
610 | + created_time bigint NOT NULL, | |
611 | + tenant_id uuid NOT NULL, | |
612 | + device_id uuid NOT NULL, | |
613 | + expiration_time bigint NOT NULL, | |
614 | + request varchar(10000000) NOT NULL, | |
615 | + response varchar(10000000), | |
616 | + status varchar(255) NOT NULL | |
617 | +); | |
618 | + | |
608 | 619 | CREATE OR REPLACE PROCEDURE cleanup_events_by_ttl(IN ttl bigint, IN debug_ttl bigint, INOUT deleted bigint) |
609 | 620 | LANGUAGE plpgsql AS |
610 | 621 | $$ | ... | ... |
... | ... | @@ -35,6 +35,7 @@ public final class RuleEngineDeviceRpcRequest { |
35 | 35 | private final UUID requestUUID; |
36 | 36 | private final String originServiceId; |
37 | 37 | private final boolean oneway; |
38 | + private final boolean persisted; | |
38 | 39 | private final String method; |
39 | 40 | private final String body; |
40 | 41 | private final long expirationTime; | ... | ... |
... | ... | @@ -33,8 +33,8 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; |
33 | 33 | type = ComponentType.FILTER, |
34 | 34 | name = "message type switch", |
35 | 35 | configClazz = EmptyNodeConfiguration.class, |
36 | - relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "Activity Event", "Inactivity Event", | |
37 | - "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned", | |
36 | + relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed", | |
37 | + "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned", | |
38 | 38 | "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant", |
39 | 39 | "Timeseries Updated", "Timeseries Deleted"}, |
40 | 40 | nodeDescription = "Route incoming messages by Message Type", |
... | ... | @@ -95,6 +95,16 @@ public class TbMsgTypeSwitchNode implements TbNode { |
95 | 95 | relationType = "Timeseries Updated"; |
96 | 96 | } else if (msg.getType().equals(DataConstants.TIMESERIES_DELETED)) { |
97 | 97 | relationType = "Timeseries Deleted"; |
98 | + } else if (msg.getType().equals(DataConstants.RPC_QUEUED)) { | |
99 | + relationType = "RPC Queued"; | |
100 | + } else if (msg.getType().equals(DataConstants.RPC_DELIVERED)) { | |
101 | + relationType = "RPC Delivered"; | |
102 | + } else if (msg.getType().equals(DataConstants.RPC_SUCCESSFUL)) { | |
103 | + relationType = "RPC Successful"; | |
104 | + } else if (msg.getType().equals(DataConstants.RPC_TIMEOUT)) { | |
105 | + relationType = "RPC Timeout"; | |
106 | + } else if (msg.getType().equals(DataConstants.RPC_FAILED)) { | |
107 | + relationType = "RPC Failed"; | |
98 | 108 | } else { |
99 | 109 | relationType = "Other"; |
100 | 110 | } | ... | ... |
... | ... | @@ -81,6 +81,9 @@ public class TbSendRPCRequestNode implements TbNode { |
81 | 81 | tmp = msg.getMetaData().getValue("oneway"); |
82 | 82 | boolean oneway = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp); |
83 | 83 | |
84 | + tmp = msg.getMetaData().getValue("persisted"); | |
85 | + boolean persisted = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp); | |
86 | + | |
84 | 87 | tmp = msg.getMetaData().getValue("requestUUID"); |
85 | 88 | UUID requestUUID = !StringUtils.isEmpty(tmp) ? UUID.fromString(tmp) : Uuids.timeBased(); |
86 | 89 | tmp = msg.getMetaData().getValue("originServiceId"); |
... | ... | @@ -108,6 +111,7 @@ public class TbSendRPCRequestNode implements TbNode { |
108 | 111 | .originServiceId(originServiceId) |
109 | 112 | .expirationTime(expirationTime) |
110 | 113 | .restApiCall(restApiCall) |
114 | + .persisted(persisted) | |
111 | 115 | .build(); |
112 | 116 | |
113 | 117 | ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { | ... | ... |
... | ... | @@ -197,6 +197,18 @@ |
197 | 197 | </mat-error> |
198 | 198 | </mat-form-field> |
199 | 199 | <mat-form-field class="mat-block"> |
200 | + <mat-label translate>tenant-profile.rpc-ttl-days</mat-label> | |
201 | + <input matInput required min="0" step="1" | |
202 | + formControlName="rpcTtlDays" | |
203 | + type="number"> | |
204 | + <mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('rpcTtlDays').hasError('required')"> | |
205 | + {{ 'tenant-profile.rpc-ttl-days-required' | translate}} | |
206 | + </mat-error> | |
207 | + <mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('rpcTtlDays').hasError('min')"> | |
208 | + {{ 'tenant-profile.rpc-ttl-days-days-range' | translate}} | |
209 | + </mat-error> | |
210 | + </mat-form-field> | |
211 | + <mat-form-field class="mat-block"> | |
200 | 212 | <mat-label translate>tenant-profile.max-rule-node-executions-per-message</mat-label> |
201 | 213 | <input matInput required min="0" step="1" |
202 | 214 | formControlName="maxRuleNodeExecutionsPerMessage" | ... | ... |
... | ... | @@ -77,7 +77,8 @@ export class DefaultTenantProfileConfigurationComponent implements ControlValueA |
77 | 77 | maxSms: [null, [Validators.required, Validators.min(0)]], |
78 | 78 | maxCreatedAlarms: [null, [Validators.required, Validators.min(0)]], |
79 | 79 | defaultStorageTtlDays: [null, [Validators.required, Validators.min(0)]], |
80 | - alarmsTtlDays: [null, [Validators.required, Validators.min(0)]] | |
80 | + alarmsTtlDays: [null, [Validators.required, Validators.min(0)]], | |
81 | + rpcTtlDays: [null, [Validators.required, Validators.min(0)]] | |
81 | 82 | }); |
82 | 83 | this.defaultTenantProfileConfigurationFormGroup.valueChanges.subscribe(() => { |
83 | 84 | this.updateModel(); | ... | ... |
... | ... | @@ -352,7 +352,12 @@ export enum MessageType { |
352 | 352 | ATTRIBUTES_UPDATED = 'ATTRIBUTES_UPDATED', |
353 | 353 | ATTRIBUTES_DELETED = 'ATTRIBUTES_DELETED', |
354 | 354 | TIMESERIES_UPDATED = 'TIMESERIES_UPDATED', |
355 | - TIMESERIES_DELETED = 'TIMESERIES_DELETED' | |
355 | + TIMESERIES_DELETED = 'TIMESERIES_DELETED', | |
356 | + RPC_QUEUED = 'RPC_QUEUED', | |
357 | + RPC_DELIVERED = 'RPC_DELIVERED', | |
358 | + RPC_SUCCESSFUL = 'RPC_SUCCESSFUL', | |
359 | + RPC_TIMEOUT = 'RPC_TIMEOUT', | |
360 | + RPC_FAILED = 'RPC_FAILED' | |
356 | 361 | } |
357 | 362 | |
358 | 363 | export const messageTypeNames = new Map<MessageType, string>( |
... | ... | @@ -373,7 +378,12 @@ export const messageTypeNames = new Map<MessageType, string>( |
373 | 378 | [MessageType.ATTRIBUTES_UPDATED, 'Attributes Updated'], |
374 | 379 | [MessageType.ATTRIBUTES_DELETED, 'Attributes Deleted'], |
375 | 380 | [MessageType.TIMESERIES_UPDATED, 'Timeseries Updated'], |
376 | - [MessageType.TIMESERIES_DELETED, 'Timeseries Deleted'] | |
381 | + [MessageType.TIMESERIES_DELETED, 'Timeseries Deleted'], | |
382 | + [MessageType.RPC_QUEUED, 'RPC Queued'], | |
383 | + [MessageType.RPC_DELIVERED, 'RPC Delivered'], | |
384 | + [MessageType.RPC_SUCCESSFUL, 'RPC Successful'], | |
385 | + [MessageType.RPC_TIMEOUT, 'RPC Timeout'], | |
386 | + [MessageType.RPC_FAILED, 'RPC Failed'] | |
377 | 387 | ] |
378 | 388 | ); |
379 | 389 | ... | ... |
... | ... | @@ -53,6 +53,7 @@ export interface DefaultTenantProfileConfiguration { |
53 | 53 | |
54 | 54 | defaultStorageTtlDays: number; |
55 | 55 | alarmsTtlDays: number; |
56 | + rpcTtlDays: number; | |
56 | 57 | } |
57 | 58 | |
58 | 59 | export type TenantProfileConfigurations = DefaultTenantProfileConfiguration; |
... | ... | @@ -85,7 +86,8 @@ export function createTenantProfileConfiguration(type: TenantProfileType): Tenan |
85 | 86 | maxSms: 0, |
86 | 87 | maxCreatedAlarms: 0, |
87 | 88 | defaultStorageTtlDays: 0, |
88 | - alarmsTtlDays: 0 | |
89 | + alarmsTtlDays: 0, | |
90 | + rpcTtlDays: 0 | |
89 | 91 | }; |
90 | 92 | configuration = {...defaultConfiguration, type: TenantProfileType.DEFAULT}; |
91 | 93 | break; | ... | ... |
... | ... | @@ -2638,6 +2638,9 @@ |
2638 | 2638 | "alarms-ttl-days": "Alarms TTL days (0 - unlimited)", |
2639 | 2639 | "alarms-ttl-days-required": "Alarms TTL days required", |
2640 | 2640 | "alarms-ttl-days-days-range": "Alarms TTL days can't be negative", |
2641 | + "rpc-ttl-days": "RPC TTL days (0 - unlimited)", | |
2642 | + "rpc-ttl-days-required": "RPC TTL days required", | |
2643 | + "rpc-ttl-days-days-range": "RPC TTL days can't be negative", | |
2641 | 2644 | "max-rule-node-executions-per-message": "Maximum number of rule node executions per message (0 - unlimited)", |
2642 | 2645 | "max-rule-node-executions-per-message-required": "Maximum number of rule node executions per message is required.", |
2643 | 2646 | "max-rule-node-executions-per-message-range": "Maximum number of rule node executions per message can't be negative", | ... | ... |