Commit 2a2441b248616cbc405f3fdc0530d6a8a6e148a1

Authored by YevhenBondarenko
1 parent 5d6ec0dd

used timeout from yml

@@ -199,7 +199,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -199,7 +199,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
199 log.debug("[{}][{}] device is related to edge [{}]. Saving RPC request to edge queue", tenantId, deviceId, edgeId.getId()); 199 log.debug("[{}][{}] device is related to edge [{}]. Saving RPC request to edge queue", tenantId, deviceId, edgeId.getId());
200 saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId()); 200 saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId());
201 sent = true; 201 sent = true;
202 - } else if (!rpcSequenceEnabled || toDeviceRpcPendingMap.isEmpty()) { 202 + } else if (isSendNewRpcAvailable()) {
203 sent = rpcSubscriptions.size() > 0; 203 sent = rpcSubscriptions.size() > 0;
204 Set<UUID> syncSessionSet = new HashSet<>(); 204 Set<UUID> syncSessionSet = new HashSet<>();
205 rpcSubscriptions.forEach((key, value) -> { 205 rpcSubscriptions.forEach((key, value) -> {
@@ -231,6 +231,18 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -231,6 +231,18 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
231 } 231 }
232 } 232 }
233 233
  234 + private boolean isSendNewRpcAvailable() {
  235 + if (rpcSequenceEnabled) {
  236 + for (ToDeviceRpcRequestMetadata rpc : toDeviceRpcPendingMap.values()) {
  237 + if (!rpc.isDelivered()) {
  238 + return false;
  239 + }
  240 + }
  241 + }
  242 +
  243 + return true;
  244 + }
  245 +
234 private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) { 246 private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) {
235 Rpc rpc = new Rpc(new RpcId(request.getId())); 247 Rpc rpc = new Rpc(new RpcId(request.getId()));
236 rpc.setCreatedTime(System.currentTimeMillis()); 248 rpc.setCreatedTime(System.currentTimeMillis());
@@ -347,7 +359,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -347,7 +359,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
347 .setMethodName(body.getMethod()) 359 .setMethodName(body.getMethod())
348 .setParams(body.getParams()) 360 .setParams(body.getParams())
349 .setExpirationTime(request.getExpirationTime()) 361 .setExpirationTime(request.getExpirationTime())
350 - .setTimeout(request.getTimeout())  
351 .setRequestIdMSB(request.getId().getMostSignificantBits()) 362 .setRequestIdMSB(request.getId().getMostSignificantBits())
352 .setRequestIdLSB(request.getId().getLeastSignificantBits()) 363 .setRequestIdLSB(request.getId().getLeastSignificantBits())
353 .setOneway(request.isOneway()) 364 .setOneway(request.isOneway())
@@ -563,7 +574,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -563,7 +574,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
563 systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, response); 574 systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, response);
564 } 575 }
565 } finally { 576 } finally {
566 - if (!requestMd.isDelivered() && hasError) { 577 + if (hasError) {
567 sendNextPendingRequest(context); 578 sendNextPendingRequest(context);
568 } 579 }
569 } 580 }
@@ -88,7 +88,6 @@ public abstract class AbstractRpcController extends BaseController { @@ -88,7 +88,6 @@ public abstract class AbstractRpcController extends BaseController {
88 deviceId, 88 deviceId,
89 oneWay, 89 oneWay,
90 expTime, 90 expTime,
91 - timeout,  
92 body, 91 body,
93 persisted, 92 persisted,
94 additionalInfo 93 additionalInfo
@@ -101,7 +101,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi @@ -101,7 +101,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
101 @Override 101 @Override
102 public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) { 102 public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
103 ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(), 103 ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(),
104 - src.isOneway(), src.getExpirationTime(), src.getTimeout(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getAdditionalInfo()); 104 + src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getAdditionalInfo());
105 forwardRpcRequestToDeviceActor(request, response -> { 105 forwardRpcRequestToDeviceActor(request, response -> {
106 if (src.isRestApiCall()) { 106 if (src.isRestApiCall()) {
107 sendRpcResponseToTbCore(src.getOriginServiceId(), response); 107 sendRpcResponseToTbCore(src.getOriginServiceId(), response);
@@ -334,7 +334,6 @@ message ToDeviceRpcRequestMsg { @@ -334,7 +334,6 @@ message ToDeviceRpcRequestMsg {
334 int64 requestIdLSB = 6; 334 int64 requestIdLSB = 6;
335 bool oneway = 7; 335 bool oneway = 7;
336 bool persisted = 8; 336 bool persisted = 8;
337 - int64 timeout = 9;  
338 } 337 }
339 338
340 message ToDeviceRpcResponseMsg { 339 message ToDeviceRpcResponseMsg {
@@ -87,9 +87,11 @@ public class DataConstants { @@ -87,9 +87,11 @@ public class DataConstants {
87 public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE"; 87 public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE";
88 88
89 public static final String RPC_QUEUED = "RPC_QUEUED"; 89 public static final String RPC_QUEUED = "RPC_QUEUED";
  90 + public static final String RPC_SENT = "RPC_SENT";
90 public static final String RPC_DELIVERED = "RPC_DELIVERED"; 91 public static final String RPC_DELIVERED = "RPC_DELIVERED";
91 public static final String RPC_SUCCESSFUL = "RPC_SUCCESSFUL"; 92 public static final String RPC_SUCCESSFUL = "RPC_SUCCESSFUL";
92 public static final String RPC_TIMEOUT = "RPC_TIMEOUT"; 93 public static final String RPC_TIMEOUT = "RPC_TIMEOUT";
  94 + public static final String RPC_EXPIRED = "RPC_EXPIRED";
93 public static final String RPC_FAILED = "RPC_FAILED"; 95 public static final String RPC_FAILED = "RPC_FAILED";
94 public static final String RPC_DELETED = "RPC_DELETED"; 96 public static final String RPC_DELETED = "RPC_DELETED";
95 97
@@ -34,7 +34,6 @@ public class ToDeviceRpcRequest implements Serializable { @@ -34,7 +34,6 @@ public class ToDeviceRpcRequest implements Serializable {
34 private final DeviceId deviceId; 34 private final DeviceId deviceId;
35 private final boolean oneway; 35 private final boolean oneway;
36 private final long expirationTime; 36 private final long expirationTime;
37 - private final long timeout;  
38 private final ToDeviceRpcRequestBody body; 37 private final ToDeviceRpcRequestBody body;
39 private final boolean persisted; 38 private final boolean persisted;
40 @JsonIgnore 39 @JsonIgnore
@@ -193,29 +193,7 @@ public class DefaultCoapClientContext implements CoapClientContext { @@ -193,29 +193,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
193 client.lock(); 193 client.lock();
194 try { 194 try {
195 long uplinkTime = client.updateLastUplinkTime(uplinkTs); 195 long uplinkTime = client.updateLastUplinkTime(uplinkTs);
196 - long timeout;  
197 - if (PowerMode.PSM.equals(powerMode)) {  
198 - Long psmActivityTimer = client.getPsmActivityTimer();  
199 - if (psmActivityTimer == null && profileSettings != null) {  
200 - psmActivityTimer = profileSettings.getPsmActivityTimer();  
201 -  
202 - }  
203 - if (psmActivityTimer == null || psmActivityTimer == 0L) {  
204 - psmActivityTimer = config.getPsmActivityTimer();  
205 - }  
206 -  
207 - timeout = psmActivityTimer;  
208 - } else {  
209 - Long pagingTransmissionWindow = client.getPagingTransmissionWindow();  
210 - if (pagingTransmissionWindow == null && profileSettings != null) {  
211 - pagingTransmissionWindow = profileSettings.getPagingTransmissionWindow();  
212 -  
213 - }  
214 - if (pagingTransmissionWindow == null || pagingTransmissionWindow == 0L) {  
215 - pagingTransmissionWindow = config.getPagingTransmissionWindow();  
216 - }  
217 - timeout = pagingTransmissionWindow;  
218 - } 196 + long timeout = getTimeout(client, powerMode, profileSettings);
219 Future<Void> sleepTask = client.getSleepTask(); 197 Future<Void> sleepTask = client.getSleepTask();
220 if (sleepTask != null) { 198 if (sleepTask != null) {
221 sleepTask.cancel(false); 199 sleepTask.cancel(false);
@@ -235,6 +213,33 @@ public class DefaultCoapClientContext implements CoapClientContext { @@ -235,6 +213,33 @@ public class DefaultCoapClientContext implements CoapClientContext {
235 } 213 }
236 } 214 }
237 215
  216 + private long getTimeout(TbCoapClientState client, PowerMode powerMode, PowerSavingConfiguration profileSettings) {
  217 + long timeout;
  218 + if (PowerMode.PSM.equals(powerMode)) {
  219 + Long psmActivityTimer = client.getPsmActivityTimer();
  220 + if (psmActivityTimer == null && profileSettings != null) {
  221 + psmActivityTimer = profileSettings.getPsmActivityTimer();
  222 +
  223 + }
  224 + if (psmActivityTimer == null || psmActivityTimer == 0L) {
  225 + psmActivityTimer = config.getPsmActivityTimer();
  226 + }
  227 +
  228 + timeout = psmActivityTimer;
  229 + } else {
  230 + Long pagingTransmissionWindow = client.getPagingTransmissionWindow();
  231 + if (pagingTransmissionWindow == null && profileSettings != null) {
  232 + pagingTransmissionWindow = profileSettings.getPagingTransmissionWindow();
  233 +
  234 + }
  235 + if (pagingTransmissionWindow == null || pagingTransmissionWindow == 0L) {
  236 + pagingTransmissionWindow = config.getPagingTransmissionWindow();
  237 + }
  238 + timeout = pagingTransmissionWindow;
  239 + }
  240 + return timeout;
  241 + }
  242 +
238 private boolean registerFeatureObservation(TbCoapClientState state, String token, CoapExchange exchange, FeatureType featureType) { 243 private boolean registerFeatureObservation(TbCoapClientState state, String token, CoapExchange exchange, FeatureType featureType) {
239 state.lock(); 244 state.lock();
240 try { 245 try {
@@ -526,13 +531,25 @@ public class DefaultCoapClientContext implements CoapClientContext { @@ -526,13 +531,25 @@ public class DefaultCoapClientContext implements CoapClientContext {
526 int requestId = getNextMsgId(); 531 int requestId = getNextMsgId();
527 response.setMID(requestId); 532 response.setMID(requestId);
528 if (conRequest) { 533 if (conRequest) {
  534 + PowerMode powerMode = state.getPowerMode();
  535 + PowerSavingConfiguration profileSettings = null;
  536 + if (powerMode == null) {
  537 + var clientProfile = getProfile(state.getProfileId());
  538 + if (clientProfile.isPresent()) {
  539 + profileSettings = clientProfile.get().getClientSettings();
  540 + if (profileSettings != null) {
  541 + powerMode = profileSettings.getPowerMode();
  542 + }
  543 + }
  544 + }
  545 +
529 transportContext.getRpcAwaitingAck().put(requestId, msg); 546 transportContext.getRpcAwaitingAck().put(requestId, msg);
530 transportContext.getScheduler().schedule(() -> { 547 transportContext.getScheduler().schedule(() -> {
531 TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(requestId); 548 TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(requestId);
532 if (rpcRequestMsg != null) { 549 if (rpcRequestMsg != null) {
533 transportService.process(state.getSession(), msg, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY); 550 transportService.process(state.getSession(), msg, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
534 } 551 }
535 - }, Math.max(0, Math.min(msg.getTimeout(), msg.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS); 552 + }, Math.min(getTimeout(state, powerMode, profileSettings), msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
536 553
537 response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> { 554 response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> {
538 TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id); 555 TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
@@ -68,4 +68,7 @@ public class MqttTransportContext extends TransportContext { @@ -68,4 +68,7 @@ public class MqttTransportContext extends TransportContext {
68 @Value("${transport.mqtt.msg_queue_size_per_device_limit:100}") 68 @Value("${transport.mqtt.msg_queue_size_per_device_limit:100}")
69 private int messageQueueSizePerDeviceLimit; 69 private int messageQueueSizePerDeviceLimit;
70 70
  71 + @Getter
  72 + @Value("${transport.mqtt.timeout:10000}")
  73 + private long timeout;
71 } 74 }
@@ -857,7 +857,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -857,7 +857,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
857 if (msg != null) { 857 if (msg != null) {
858 transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY); 858 transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
859 } 859 }
860 - }, Math.max(0, Math.min(rpcRequest.getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS); 860 + }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
861 } 861 }
862 var cf = publish(payload, deviceSessionCtx); 862 var cf = publish(payload, deviceSessionCtx);
863 cf.addListener(result -> { 863 cf.addListener(result -> {
@@ -39,7 +39,6 @@ public final class RuleEngineDeviceRpcRequest { @@ -39,7 +39,6 @@ public final class RuleEngineDeviceRpcRequest {
39 private final String method; 39 private final String method;
40 private final String body; 40 private final String body;
41 private final long expirationTime; 41 private final long expirationTime;
42 - private final long timeout;  
43 private final boolean restApiCall; 42 private final boolean restApiCall;
44 private final String additionalInfo; 43 private final String additionalInfo;
45 44
@@ -33,7 +33,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; @@ -33,7 +33,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
33 type = ComponentType.FILTER, 33 type = ComponentType.FILTER,
34 name = "message type switch", 34 name = "message type switch",
35 configClazz = EmptyNodeConfiguration.class, 35 configClazz = EmptyNodeConfiguration.class,
36 - relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed", "RPC Deleted", 36 + relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Sent", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Expired", "RPC Failed", "RPC Deleted",
37 "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned", 37 "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned",
38 "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant", 38 "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant",
39 "Timeseries Updated", "Timeseries Deleted"}, 39 "Timeseries Updated", "Timeseries Deleted"},
@@ -97,12 +97,16 @@ public class TbMsgTypeSwitchNode implements TbNode { @@ -97,12 +97,16 @@ public class TbMsgTypeSwitchNode implements TbNode {
97 relationType = "Timeseries Deleted"; 97 relationType = "Timeseries Deleted";
98 } else if (msg.getType().equals(DataConstants.RPC_QUEUED)) { 98 } else if (msg.getType().equals(DataConstants.RPC_QUEUED)) {
99 relationType = "RPC Queued"; 99 relationType = "RPC Queued";
  100 + } else if (msg.getType().equals(DataConstants.RPC_SENT)) {
  101 + relationType = "RPC Sent";
100 } else if (msg.getType().equals(DataConstants.RPC_DELIVERED)) { 102 } else if (msg.getType().equals(DataConstants.RPC_DELIVERED)) {
101 relationType = "RPC Delivered"; 103 relationType = "RPC Delivered";
102 } else if (msg.getType().equals(DataConstants.RPC_SUCCESSFUL)) { 104 } else if (msg.getType().equals(DataConstants.RPC_SUCCESSFUL)) {
103 relationType = "RPC Successful"; 105 relationType = "RPC Successful";
104 } else if (msg.getType().equals(DataConstants.RPC_TIMEOUT)) { 106 } else if (msg.getType().equals(DataConstants.RPC_TIMEOUT)) {
105 relationType = "RPC Timeout"; 107 relationType = "RPC Timeout";
  108 + } else if (msg.getType().equals(DataConstants.RPC_EXPIRED)) {
  109 + relationType = "RPC Expired";
106 } else if (msg.getType().equals(DataConstants.RPC_FAILED)) { 110 } else if (msg.getType().equals(DataConstants.RPC_FAILED)) {
107 relationType = "RPC Failed"; 111 relationType = "RPC Failed";
108 } else if (msg.getType().equals(DataConstants.RPC_DELETED)) { 112 } else if (msg.getType().equals(DataConstants.RPC_DELETED)) {
@@ -92,9 +92,6 @@ public class TbSendRPCRequestNode implements TbNode { @@ -92,9 +92,6 @@ public class TbSendRPCRequestNode implements TbNode {
92 tmp = msg.getMetaData().getValue(DataConstants.EXPIRATION_TIME); 92 tmp = msg.getMetaData().getValue(DataConstants.EXPIRATION_TIME);
93 long expirationTime = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : (System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds())); 93 long expirationTime = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : (System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds()));
94 94
95 - tmp = msg.getMetaData().getValue(DataConstants.TIMEOUT);  
96 - long timeout = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds());  
97 -  
98 String params; 95 String params;
99 JsonElement paramsEl = json.get("params"); 96 JsonElement paramsEl = json.get("params");
100 if (paramsEl.isJsonPrimitive()) { 97 if (paramsEl.isJsonPrimitive()) {
@@ -115,7 +112,6 @@ public class TbSendRPCRequestNode implements TbNode { @@ -115,7 +112,6 @@ public class TbSendRPCRequestNode implements TbNode {
115 .requestUUID(requestUUID) 112 .requestUUID(requestUUID)
116 .originServiceId(originServiceId) 113 .originServiceId(originServiceId)
117 .expirationTime(expirationTime) 114 .expirationTime(expirationTime)
118 - .timeout(timeout)  
119 .restApiCall(restApiCall) 115 .restApiCall(restApiCall)
120 .persisted(persisted) 116 .persisted(persisted)
121 .additionalInfo(additionalInfo) 117 .additionalInfo(additionalInfo)