Commit 53eb09bcb939ae28cbefefbbab5cb2dfe3d05434

Authored by Andrii Shvaika
1 parent d965df8c

Improvements to tests due to new architecture

@@ -249,7 +249,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -249,7 +249,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
249 if (relationsCount == 0) { 249 if (relationsCount == 0) {
250 log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId()); 250 log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
251 if (relationTypes.contains(TbRelationTypes.FAILURE)) { 251 if (relationTypes.contains(TbRelationTypes.FAILURE)) {
252 - RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId); 252 + RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
253 if (ruleNodeCtx != null) { 253 if (ruleNodeCtx != null) {
254 msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf())); 254 msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
255 } else { 255 } else {
@@ -327,6 +327,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -327,6 +327,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
327 private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) { 327 private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) {
328 if (nodeCtx != null) { 328 if (nodeCtx != null) {
329 nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg, fromRelationType), self); 329 nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg, fromRelationType), self);
  330 + } else {
  331 + log.error("[{}][{}] RuleNodeCtx is empty", entityId, ruleChainName);
  332 + msg.getCallback().onFailure(new RuleEngineException("Rule Node CTX is empty"));
330 } 333 }
331 } 334 }
332 335
@@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.id.DeviceId; @@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
35 import org.thingsboard.server.common.data.id.RuleChainId; 35 import org.thingsboard.server.common.data.id.RuleChainId;
36 import org.thingsboard.server.common.data.id.TenantId; 36 import org.thingsboard.server.common.data.id.TenantId;
37 import org.thingsboard.server.common.data.rule.RuleChain; 37 import org.thingsboard.server.common.data.rule.RuleChain;
  38 +import org.thingsboard.server.common.msg.MsgType;
38 import org.thingsboard.server.common.msg.TbActorMsg; 39 import org.thingsboard.server.common.msg.TbActorMsg;
39 import org.thingsboard.server.common.msg.TbMsg; 40 import org.thingsboard.server.common.msg.TbMsg;
40 import org.thingsboard.server.common.msg.aware.DeviceAwareMsg; 41 import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
@@ -73,24 +74,30 @@ public class TenantActor extends RuleChainManagerActor { @@ -73,24 +74,30 @@ public class TenantActor extends RuleChainManagerActor {
73 log.info("[{}] Starting tenant actor.", tenantId); 74 log.info("[{}] Starting tenant actor.", tenantId);
74 try { 75 try {
75 Tenant tenant = systemContext.getTenantService().findTenantById(tenantId); 76 Tenant tenant = systemContext.getTenantService().findTenantById(tenantId);
76 - // This Service may be started for specific tenant only.  
77 - Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant(); 77 + if (tenant == null) {
  78 + cantFindTenant = true;
  79 + log.info("[{}] Started tenant actor for missing tenant.", tenantId);
  80 + } else {
  81 + // This Service may be started for specific tenant only.
  82 + Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
78 83
79 - isRuleEngineForCurrentTenant = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);  
80 - isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE); 84 + isRuleEngineForCurrentTenant = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
  85 + isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
81 86
82 - if (isRuleEngineForCurrentTenant) {  
83 - try {  
84 - if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenant.isIsolatedTbRuleEngine())) {  
85 - initRuleChains();  
86 - } else {  
87 - isRuleEngineForCurrentTenant = false; 87 + if (isRuleEngineForCurrentTenant) {
  88 + try {
  89 + if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenant.isIsolatedTbRuleEngine())) {
  90 + log.info("[{}] Going to init rule chains", tenantId);
  91 + initRuleChains();
  92 + } else {
  93 + isRuleEngineForCurrentTenant = false;
  94 + }
  95 + } catch (Exception e) {
  96 + cantFindTenant = true;
88 } 97 }
89 - } catch (Exception e) {  
90 - cantFindTenant = true;  
91 } 98 }
  99 + log.info("[{}] Tenant actor started.", tenantId);
92 } 100 }
93 - log.info("[{}] Tenant actor started.", tenantId);  
94 } catch (Exception e) { 101 } catch (Exception e) {
95 log.warn("[{}] Unknown failure", tenantId, e); 102 log.warn("[{}] Unknown failure", tenantId, e);
96 } 103 }
@@ -104,7 +111,12 @@ public class TenantActor extends RuleChainManagerActor { @@ -104,7 +111,12 @@ public class TenantActor extends RuleChainManagerActor {
104 @Override 111 @Override
105 protected boolean process(TbActorMsg msg) { 112 protected boolean process(TbActorMsg msg) {
106 if (cantFindTenant) { 113 if (cantFindTenant) {
107 - log.info("Missing Tenant msg: {}", msg); 114 + log.info("[{}] Processing missing Tenant msg: {}", tenantId, msg);
  115 + if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) {
  116 + QueueToRuleEngineMsg queueMsg = (QueueToRuleEngineMsg) msg;
  117 + queueMsg.getTbMsg().getCallback().onSuccess();
  118 + }
  119 + return true;
108 } 120 }
109 switch (msg.getMsgType()) { 121 switch (msg.getMsgType()) {
110 case PARTITION_CHANGE_MSG: 122 case PARTITION_CHANGE_MSG:
@@ -344,7 +344,7 @@ public class TelemetryController extends BaseController { @@ -344,7 +344,7 @@ public class TelemetryController extends BaseController {
344 return deleteAttributes(entityId, scope, keysStr); 344 return deleteAttributes(entityId, scope, keysStr);
345 } 345 }
346 346
347 - private DeferredResult<ResponseEntity> deleteAttributes(EntityId entityIdStr, String scope, String keysStr) throws ThingsboardException { 347 + private DeferredResult<ResponseEntity> deleteAttributes(EntityId entityIdSrc, String scope, String keysStr) throws ThingsboardException {
348 List<String> keys = toKeysList(keysStr); 348 List<String> keys = toKeysList(keysStr);
349 if (keys.isEmpty()) { 349 if (keys.isEmpty()) {
350 return getImmediateDeferredResult("Empty keys: " + keysStr, HttpStatus.BAD_REQUEST); 350 return getImmediateDeferredResult("Empty keys: " + keysStr, HttpStatus.BAD_REQUEST);
@@ -354,13 +354,13 @@ public class TelemetryController extends BaseController { @@ -354,13 +354,13 @@ public class TelemetryController extends BaseController {
354 if (DataConstants.SERVER_SCOPE.equals(scope) || 354 if (DataConstants.SERVER_SCOPE.equals(scope) ||
355 DataConstants.SHARED_SCOPE.equals(scope) || 355 DataConstants.SHARED_SCOPE.equals(scope) ||
356 DataConstants.CLIENT_SCOPE.equals(scope)) { 356 DataConstants.CLIENT_SCOPE.equals(scope)) {
357 - return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.WRITE_ATTRIBUTES, entityIdStr, (result, tenantId, entityId) -> { 357 + return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.WRITE_ATTRIBUTES, entityIdSrc, (result, tenantId, entityId) -> {
358 ListenableFuture<List<Void>> future = attributesService.removeAll(user.getTenantId(), entityId, scope, keys); 358 ListenableFuture<List<Void>> future = attributesService.removeAll(user.getTenantId(), entityId, scope, keys);
359 Futures.addCallback(future, new FutureCallback<List<Void>>() { 359 Futures.addCallback(future, new FutureCallback<List<Void>>() {
360 @Override 360 @Override
361 public void onSuccess(@Nullable List<Void> tmp) { 361 public void onSuccess(@Nullable List<Void> tmp) {
362 logAttributesDeleted(user, entityId, scope, keys, null); 362 logAttributesDeleted(user, entityId, scope, keys, null);
363 - if (entityId.getEntityType() == EntityType.DEVICE) { 363 + if (entityIdSrc.getEntityType().equals(EntityType.DEVICE)) {
364 DeviceId deviceId = new DeviceId(entityId.getId()); 364 DeviceId deviceId = new DeviceId(entityId.getId());
365 Set<AttributeKey> keysToNotify = new HashSet<>(); 365 Set<AttributeKey> keysToNotify = new HashSet<>();
366 keys.forEach(key -> keysToNotify.add(new AttributeKey(scope, key))); 366 keys.forEach(key -> keysToNotify.add(new AttributeKey(scope, key)));
@@ -397,7 +397,7 @@ public class TelemetryController extends BaseController { @@ -397,7 +397,7 @@ public class TelemetryController extends BaseController {
397 @Override 397 @Override
398 public void onSuccess(@Nullable Void tmp) { 398 public void onSuccess(@Nullable Void tmp) {
399 logAttributesUpdated(user, entityId, scope, attributes, null); 399 logAttributesUpdated(user, entityId, scope, attributes, null);
400 - if (entityId.getEntityType() == EntityType.DEVICE) { 400 + if (entityIdSrc.getEntityType().equals(EntityType.DEVICE)) {
401 DeviceId deviceId = new DeviceId(entityId.getId()); 401 DeviceId deviceId = new DeviceId(entityId.getId());
402 tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate( 402 tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(
403 user.getTenantId(), deviceId, scope, attributes), null); 403 user.getTenantId(), deviceId, scope, attributes), null);
@@ -87,6 +87,7 @@ public class DefaultTbClusterService implements TbClusterService { @@ -87,6 +87,7 @@ public class DefaultTbClusterService implements TbClusterService {
87 @Override 87 @Override
88 public void pushMsgToCore(ToDeviceActorNotificationMsg msg, TbQueueCallback callback) { 88 public void pushMsgToCore(ToDeviceActorNotificationMsg msg, TbQueueCallback callback) {
89 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, msg.getTenantId(), msg.getDeviceId()); 89 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, msg.getTenantId(), msg.getDeviceId());
  90 + log.trace("PUSHING msg: {} to:{}", msg, tpi);
90 byte[] msgBytes = encodingService.encode(msg); 91 byte[] msgBytes = encodingService.encode(msg);
91 ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorNotificationMsg(ByteString.copyFrom(msgBytes)).build(); 92 ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorNotificationMsg(ByteString.copyFrom(msgBytes)).build();
92 producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msg.getDeviceId().getId(), toCoreMsg), callback); 93 producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msg.getDeviceId().getId(), toCoreMsg), callback);
@@ -96,6 +97,7 @@ public class DefaultTbClusterService implements TbClusterService { @@ -96,6 +97,7 @@ public class DefaultTbClusterService implements TbClusterService {
96 @Override 97 @Override
97 public void pushNotificationToCore(String serviceId, FromDeviceRpcResponse response, TbQueueCallback callback) { 98 public void pushNotificationToCore(String serviceId, FromDeviceRpcResponse response, TbQueueCallback callback) {
98 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceId); 99 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceId);
  100 + log.trace("PUSHING msg: {} to:{}", response, tpi);
99 FromDeviceRPCResponseProto.Builder builder = FromDeviceRPCResponseProto.newBuilder() 101 FromDeviceRPCResponseProto.Builder builder = FromDeviceRPCResponseProto.newBuilder()
100 .setRequestIdMSB(response.getId().getMostSignificantBits()) 102 .setRequestIdMSB(response.getId().getMostSignificantBits())
101 .setRequestIdLSB(response.getId().getLeastSignificantBits()) 103 .setRequestIdLSB(response.getId().getLeastSignificantBits())
@@ -108,6 +110,7 @@ public class DefaultTbClusterService implements TbClusterService { @@ -108,6 +110,7 @@ public class DefaultTbClusterService implements TbClusterService {
108 110
109 @Override 111 @Override
110 public void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback) { 112 public void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback) {
  113 + log.trace("PUSHING msg: {} to:{}", msg, tpi);
111 producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback); 114 producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback);
112 toRuleEngineMsgs.incrementAndGet(); 115 toRuleEngineMsgs.incrementAndGet();
113 } 116 }
@@ -123,6 +126,7 @@ public class DefaultTbClusterService implements TbClusterService { @@ -123,6 +126,7 @@ public class DefaultTbClusterService implements TbClusterService {
123 } 126 }
124 } 127 }
125 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId); 128 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
  129 + log.trace("PUSHING msg: {} to:{}", tbMsg, tpi);
126 ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder() 130 ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
127 .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) 131 .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
128 .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) 132 .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
@@ -134,6 +138,7 @@ public class DefaultTbClusterService implements TbClusterService { @@ -134,6 +138,7 @@ public class DefaultTbClusterService implements TbClusterService {
134 @Override 138 @Override
135 public void pushNotificationToRuleEngine(String serviceId, FromDeviceRpcResponse response, TbQueueCallback callback) { 139 public void pushNotificationToRuleEngine(String serviceId, FromDeviceRpcResponse response, TbQueueCallback callback) {
136 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId); 140 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId);
  141 + log.trace("PUSHING msg: {} to:{}", response, tpi);
137 FromDeviceRPCResponseProto.Builder builder = FromDeviceRPCResponseProto.newBuilder() 142 FromDeviceRPCResponseProto.Builder builder = FromDeviceRPCResponseProto.newBuilder()
138 .setRequestIdMSB(response.getId().getMostSignificantBits()) 143 .setRequestIdMSB(response.getId().getMostSignificantBits())
139 .setRequestIdLSB(response.getId().getLeastSignificantBits()) 144 .setRequestIdLSB(response.getId().getLeastSignificantBits())
@@ -147,6 +152,7 @@ public class DefaultTbClusterService implements TbClusterService { @@ -147,6 +152,7 @@ public class DefaultTbClusterService implements TbClusterService {
147 @Override 152 @Override
148 public void pushNotificationToTransport(String serviceId, ToTransportMsg response, TbQueueCallback callback) { 153 public void pushNotificationToTransport(String serviceId, ToTransportMsg response, TbQueueCallback callback) {
149 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceId); 154 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceId);
  155 + log.trace("PUSHING msg: {} to:{}", response, tpi);
150 producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), response), callback); 156 producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), response), callback);
151 toTransportNfs.incrementAndGet(); 157 toTransportNfs.incrementAndGet();
152 } 158 }
@@ -174,7 +174,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< @@ -174,7 +174,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
174 })); 174 }));
175 175
176 boolean timeout = false; 176 boolean timeout = false;
177 - if (!ctx.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) { 177 + if (!ctx.await(configuration.getPackProcessingTimeout(), TimeUnit.MILLISECONDS)) {
178 timeout = true; 178 timeout = true;
179 } 179 }
180 180
@@ -17,6 +17,8 @@ package org.thingsboard.server.service.queue.processing; @@ -17,6 +17,8 @@ package org.thingsboard.server.service.queue.processing;
17 17
18 import lombok.extern.slf4j.Slf4j; 18 import lombok.extern.slf4j.Slf4j;
19 import org.springframework.stereotype.Component; 19 import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.msg.TbMsg;
  21 +import org.thingsboard.server.common.msg.queue.TbMsgCallback;
20 import org.thingsboard.server.gen.transport.TransportProtos; 22 import org.thingsboard.server.gen.transport.TransportProtos;
21 import org.thingsboard.server.queue.common.TbProtoQueueMsg; 23 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
22 import org.thingsboard.server.queue.settings.TbRuleEngineQueueAckStrategyConfiguration; 24 import org.thingsboard.server.queue.settings.TbRuleEngineQueueAckStrategyConfiguration;
@@ -32,7 +34,7 @@ public class TbRuleEngineProcessingStrategyFactory { @@ -32,7 +34,7 @@ public class TbRuleEngineProcessingStrategyFactory {
32 34
33 public TbRuleEngineProcessingStrategy newInstance(String name, TbRuleEngineQueueAckStrategyConfiguration configuration) { 35 public TbRuleEngineProcessingStrategy newInstance(String name, TbRuleEngineQueueAckStrategyConfiguration configuration) {
34 switch (configuration.getType()) { 36 switch (configuration.getType()) {
35 - case "SKIP_ALL": 37 + case "SKIP_ALL_FAILURES":
36 return new SkipStrategy(name); 38 return new SkipStrategy(name);
37 case "RETRY_ALL": 39 case "RETRY_ALL":
38 return new RetryStrategy(name, true, true, true, configuration); 40 return new RetryStrategy(name, true, true, true, configuration);
@@ -98,7 +100,7 @@ public class TbRuleEngineProcessingStrategyFactory { @@ -98,7 +100,7 @@ public class TbRuleEngineProcessingStrategyFactory {
98 } 100 }
99 log.info("[{}] Going to reprocess {} messages", queueName, toReprocess.size()); 101 log.info("[{}] Going to reprocess {} messages", queueName, toReprocess.size());
100 if (log.isTraceEnabled()) { 102 if (log.isTraceEnabled()) {
101 - toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, msg.getValue())); 103 + toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
102 } 104 }
103 if (pauseBetweenRetries > 0) { 105 if (pauseBetweenRetries > 0) {
104 try { 106 try {
@@ -123,7 +125,15 @@ public class TbRuleEngineProcessingStrategyFactory { @@ -123,7 +125,15 @@ public class TbRuleEngineProcessingStrategyFactory {
123 125
124 @Override 126 @Override
125 public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) { 127 public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) {
126 - log.info("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); 128 + if (!result.isSuccess()) {
  129 + log.info("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size());
  130 + }
  131 + if (log.isTraceEnabled()) {
  132 + result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
  133 + }
  134 + if (log.isTraceEnabled()) {
  135 + result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
  136 + }
127 return new TbRuleEngineProcessingDecision(true, null); 137 return new TbRuleEngineProcessingDecision(true, null);
128 } 138 }
129 } 139 }
@@ -106,7 +106,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { @@ -106,7 +106,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
106 106
107 @Override 107 @Override
108 public void processRpcResponseFromRuleEngine(FromDeviceRpcResponse response) { 108 public void processRpcResponseFromRuleEngine(FromDeviceRpcResponse response) {
109 - log.trace("[{}] Received response to server-side RPC request from rule engine: [{}]", response.getId()); 109 + log.trace("[{}] Received response to server-side RPC request from rule engine: [{}]", response.getId(), response);
110 UUID requestId = response.getId(); 110 UUID requestId = response.getId();
111 Consumer<FromDeviceRpcResponse> consumer = localToRuleEngineRpcRequests.remove(requestId); 111 Consumer<FromDeviceRpcResponse> consumer = localToRuleEngineRpcRequests.remove(requestId);
112 if (consumer != null) { 112 if (consumer != null) {
@@ -177,9 +177,9 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { @@ -177,9 +177,9 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
177 177
178 private void scheduleToRuleEngineTimeout(ToDeviceRpcRequest request, UUID requestId) { 178 private void scheduleToRuleEngineTimeout(ToDeviceRpcRequest request, UUID requestId) {
179 long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); 179 long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis());
180 - log.trace("[{}] processing to rule engine request: [{}]", this.hashCode(), requestId); 180 + log.trace("[{}] processing to rule engine request.", requestId);
181 scheduler.schedule(() -> { 181 scheduler.schedule(() -> {
182 - log.trace("[{}] timeout for to rule engine request: [{}]", this.hashCode(), requestId); 182 + log.trace("[{}] timeout for processing to rule engine request.", requestId);
183 Consumer<FromDeviceRpcResponse> consumer = localToRuleEngineRpcRequests.remove(requestId); 183 Consumer<FromDeviceRpcResponse> consumer = localToRuleEngineRpcRequests.remove(requestId);
184 if (consumer != null) { 184 if (consumer != null) {
185 consumer.accept(new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT)); 185 consumer.accept(new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT));
@@ -189,9 +189,9 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { @@ -189,9 +189,9 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
189 189
190 private void scheduleToDeviceTimeout(ToDeviceRpcRequest request, UUID requestId) { 190 private void scheduleToDeviceTimeout(ToDeviceRpcRequest request, UUID requestId) {
191 long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); 191 long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis());
192 - log.trace("[{}] processing to device request: [{}]", this.hashCode(), requestId); 192 + log.trace("[{}] processing to device request.", requestId);
193 scheduler.schedule(() -> { 193 scheduler.schedule(() -> {
194 - log.trace("[{}] timeout for to device request: [{}]", this.hashCode(), requestId); 194 + log.trace("[{}] timeout for to device request.", requestId);
195 localToDeviceRpcRequests.remove(requestId); 195 localToDeviceRpcRequests.remove(requestId);
196 }, timeout, TimeUnit.MILLISECONDS); 196 }, timeout, TimeUnit.MILLISECONDS);
197 } 197 }
@@ -622,7 +622,7 @@ queue: @@ -622,7 +622,7 @@ queue:
622 # For BATCH only 622 # For BATCH only
623 batch-size: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_BATCH_SIZE:1000}" # Maximum number of messages in batch 623 batch-size: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_BATCH_SIZE:1000}" # Maximum number of messages in batch
624 processing-strategy: 624 processing-strategy:
625 - type: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT 625 + type: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_TYPE:SKIP_ALL_FAILURES}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
626 # For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT 626 # For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
627 retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited 627 retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
628 failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; 628 failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
@@ -37,6 +37,7 @@ import org.thingsboard.server.service.security.AccessValidator; @@ -37,6 +37,7 @@ import org.thingsboard.server.service.security.AccessValidator;
37 import java.util.Arrays; 37 import java.util.Arrays;
38 import java.util.concurrent.CountDownLatch; 38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.TimeUnit; 39 import java.util.concurrent.TimeUnit;
  40 +import java.util.concurrent.atomic.AtomicInteger;
40 41
41 import static org.junit.Assert.assertEquals; 42 import static org.junit.Assert.assertEquals;
42 import static org.junit.Assert.assertNotNull; 43 import static org.junit.Assert.assertNotNull;
@@ -55,6 +56,8 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -55,6 +56,8 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
55 private User tenantAdmin; 56 private User tenantAdmin;
56 private Long asyncContextTimeoutToUseRpcPlugin; 57 private Long asyncContextTimeoutToUseRpcPlugin;
57 58
  59 + private static final AtomicInteger atomicInteger = new AtomicInteger(2);
  60 +
58 61
59 @Before 62 @Before
60 public void beforeTest() throws Exception { 63 public void beforeTest() throws Exception {
@@ -70,7 +73,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -70,7 +73,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
70 tenantAdmin = new User(); 73 tenantAdmin = new User();
71 tenantAdmin.setAuthority(Authority.TENANT_ADMIN); 74 tenantAdmin.setAuthority(Authority.TENANT_ADMIN);
72 tenantAdmin.setTenantId(savedTenant.getId()); 75 tenantAdmin.setTenantId(savedTenant.getId());
73 - tenantAdmin.setEmail("tenant2@thingsboard.org"); 76 + tenantAdmin.setEmail("tenant" + atomicInteger.getAndIncrement() + "@thingsboard.org");
74 tenantAdmin.setFirstName("Joe"); 77 tenantAdmin.setFirstName("Joe");
75 tenantAdmin.setLastName("Downs"); 78 tenantAdmin.setLastName("Downs");
76 79
@@ -130,7 +133,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -130,7 +133,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
130 String accessToken = deviceCredentials.getCredentialsId(); 133 String accessToken = deviceCredentials.getCredentialsId();
131 assertNotNull(accessToken); 134 assertNotNull(accessToken);
132 135
133 - String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1},\"timeout\": 6000}"; 136 + String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"24\",\"value\": 1},\"timeout\": 6000}";
134 String deviceId = savedDevice.getId().getId().toString(); 137 String deviceId = savedDevice.getId().getId().toString();
135 138
136 doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(), 139 doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(),
@@ -139,7 +142,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -139,7 +142,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
139 142
140 @Test 143 @Test
141 public void testServerMqttOneWayRpcDeviceDoesNotExist() throws Exception { 144 public void testServerMqttOneWayRpcDeviceDoesNotExist() throws Exception {
142 - String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; 145 + String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"25\",\"value\": 1}}";
143 String nonExistentDeviceId = UUIDs.timeBased().toString(); 146 String nonExistentDeviceId = UUIDs.timeBased().toString();
144 147
145 String result = doPostAsync("/api/plugins/rpc/oneway/" + nonExistentDeviceId, setGpioRequest, String.class, 148 String result = doPostAsync("/api/plugins/rpc/oneway/" + nonExistentDeviceId, setGpioRequest, String.class,
@@ -169,7 +172,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -169,7 +172,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
169 172
170 Thread.sleep(2000); 173 Thread.sleep(2000);
171 174
172 - String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; 175 + String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}";
173 String deviceId = savedDevice.getId().getId().toString(); 176 String deviceId = savedDevice.getId().getId().toString();
174 177
175 String result = doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); 178 String result = doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk());
@@ -187,7 +190,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -187,7 +190,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
187 String accessToken = deviceCredentials.getCredentialsId(); 190 String accessToken = deviceCredentials.getCredentialsId();
188 assertNotNull(accessToken); 191 assertNotNull(accessToken);
189 192
190 - String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1},\"timeout\": 6000}"; 193 + String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"27\",\"value\": 1},\"timeout\": 6000}";
191 String deviceId = savedDevice.getId().getId().toString(); 194 String deviceId = savedDevice.getId().getId().toString();
192 195
193 doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(), 196 doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(),
@@ -196,7 +199,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -196,7 +199,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
196 199
197 @Test 200 @Test
198 public void testServerMqttTwoWayRpcDeviceDoesNotExist() throws Exception { 201 public void testServerMqttTwoWayRpcDeviceDoesNotExist() throws Exception {
199 - String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; 202 + String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"28\",\"value\": 1}}";
200 String nonExistentDeviceId = UUIDs.timeBased().toString(); 203 String nonExistentDeviceId = UUIDs.timeBased().toString();
201 204
202 String result = doPostAsync("/api/plugins/rpc/twoway/" + nonExistentDeviceId, setGpioRequest, String.class, 205 String result = doPostAsync("/api/plugins/rpc/twoway/" + nonExistentDeviceId, setGpioRequest, String.class,
@@ -107,13 +107,13 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr @@ -107,13 +107,13 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr
107 CountDownLatch latch = new CountDownLatch(1); 107 CountDownLatch latch = new CountDownLatch(1);
108 TestMqttCallback callback = new TestMqttCallback(client, latch); 108 TestMqttCallback callback = new TestMqttCallback(client, latch);
109 client.setCallback(callback); 109 client.setCallback(callback);
110 - client.connect(options).waitForCompletion(3000); 110 + client.connect(options).waitForCompletion(5000);
111 client.subscribe("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE.value()); 111 client.subscribe("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE.value());
112 String payload = "{\"key\":\"value\"}"; 112 String payload = "{\"key\":\"value\"}";
113 // TODO 3.1: we need to acknowledge subscription only after it is processed by device actor and not when the message is pushed to queue. 113 // TODO 3.1: we need to acknowledge subscription only after it is processed by device actor and not when the message is pushed to queue.
114 // MqttClient -> SUB REQUEST -> Transport -> Kafka -> Device Actor (subscribed) 114 // MqttClient -> SUB REQUEST -> Transport -> Kafka -> Device Actor (subscribed)
115 // MqttClient <- SUB_ACK <- Transport 115 // MqttClient <- SUB_ACK <- Transport
116 - Thread.sleep(1000); 116 + Thread.sleep(5000);
117 doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk()); 117 doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk());
118 latch.await(10, TimeUnit.SECONDS); 118 latch.await(10, TimeUnit.SECONDS);
119 assertEquals(payload, callback.getPayload()); 119 assertEquals(payload, callback.getPayload());
@@ -152,7 +152,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule @@ -152,7 +152,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
152 QueueToRuleEngineMsg qMsg = new QueueToRuleEngineMsg(savedTenant.getId(), tbMsg, null, null); 152 QueueToRuleEngineMsg qMsg = new QueueToRuleEngineMsg(savedTenant.getId(), tbMsg, null, null);
153 // Pushing Message to the system 153 // Pushing Message to the system
154 actorSystem.tell(qMsg, ActorRef.noSender()); 154 actorSystem.tell(qMsg, ActorRef.noSender());
155 - Mockito.verify(tbMsgCallback, Mockito.timeout(3000)).onSuccess(); 155 + Mockito.verify(tbMsgCallback, Mockito.timeout(10000)).onSuccess();
156 156
157 TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000); 157 TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
158 List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList()); 158 List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
@@ -265,7 +265,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule @@ -265,7 +265,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
265 // Pushing Message to the system 265 // Pushing Message to the system
266 actorSystem.tell(qMsg, ActorRef.noSender()); 266 actorSystem.tell(qMsg, ActorRef.noSender());
267 267
268 - Mockito.verify(tbMsgCallback, Mockito.timeout(3000)).onSuccess(); 268 + Mockito.verify(tbMsgCallback, Mockito.timeout(10000)).onSuccess();
269 269
270 TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000); 270 TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000);
271 List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList()); 271 List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
@@ -7,7 +7,7 @@ @@ -7,7 +7,7 @@
7 </encoder> 7 </encoder>
8 </appender> 8 </appender>
9 9
10 - <logger name="org.thingsboard.server" level="WARN"/> 10 + <logger name="org.thingsboard.server" level="TRACE"/>
11 <logger name="org.springframework" level="WARN"/> 11 <logger name="org.springframework" level="WARN"/>
12 <logger name="org.springframework.boot.test" level="WARN"/> 12 <logger name="org.springframework.boot.test" level="WARN"/>
13 <logger name="org.apache.cassandra" level="WARN"/> 13 <logger name="org.apache.cassandra" level="WARN"/>
@@ -50,10 +50,10 @@ public final class InMemoryStorage { @@ -50,10 +50,10 @@ public final class InMemoryStorage {
50 return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingQueue<>()).add(msg); 50 return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingQueue<>()).add(msg);
51 } 51 }
52 52
53 - public <T extends TbQueueMsg> List<T> get(String topic, long durationInMillis) throws InterruptedException { 53 + public <T extends TbQueueMsg> List<T> get(String topic) throws InterruptedException {
54 if (storage.containsKey(topic)) { 54 if (storage.containsKey(topic)) {
55 List<T> entities; 55 List<T> entities;
56 - T first = (T) storage.get(topic).poll(durationInMillis, TimeUnit.MILLISECONDS); 56 + T first = (T) storage.get(topic).poll();
57 if (first != null) { 57 if (first != null) {
58 entities = new ArrayList<>(); 58 entities = new ArrayList<>();
59 entities.add(first); 59 entities.add(first);
@@ -68,7 +68,7 @@ public class InMemoryTbQueueConsumer<T extends TbQueueMsg> implements TbQueueCon @@ -68,7 +68,7 @@ public class InMemoryTbQueueConsumer<T extends TbQueueMsg> implements TbQueueCon
68 .stream() 68 .stream()
69 .map(tpi -> { 69 .map(tpi -> {
70 try { 70 try {
71 - return storage.get(tpi.getFullTopicName(), durationInMillis); 71 + return storage.get(tpi.getFullTopicName());
72 } catch (InterruptedException e) { 72 } catch (InterruptedException e) {
73 if (!stopped) { 73 if (!stopped) {
74 log.error("Queue was interrupted.", e); 74 log.error("Queue was interrupted.", e);
@@ -24,7 +24,7 @@ public class TbRuleEngineQueueConfiguration { @@ -24,7 +24,7 @@ public class TbRuleEngineQueueConfiguration {
24 private String topic; 24 private String topic;
25 private int pollInterval; 25 private int pollInterval;
26 private int partitions; 26 private int partitions;
27 - private String packProcessingTimeout; 27 + private long packProcessingTimeout;
28 private TbRuleEngineQueueSubmitStrategyConfiguration submitStrategy; 28 private TbRuleEngineQueueSubmitStrategyConfiguration submitStrategy;
29 private TbRuleEngineQueueAckStrategyConfiguration processingStrategy; 29 private TbRuleEngineQueueAckStrategyConfiguration processingStrategy;
30 30
@@ -35,4 +35,12 @@ service.type=monolith @@ -35,4 +35,12 @@ service.type=monolith
35 #spring.datasource.password=postgres 35 #spring.datasource.password=postgres
36 #spring.datasource.url=jdbc:postgresql://localhost:5432/sqltest 36 #spring.datasource.url=jdbc:postgresql://localhost:5432/sqltest
37 #spring.datasource.driverClassName=org.postgresql.Driver 37 #spring.datasource.driverClassName=org.postgresql.Driver
38 -#spring.datasource.hikari.maximumPoolSize = 50  
  38 +#spring.datasource.hikari.maximumPoolSize = 50
  39 +
  40 +queue.rule-engine.queues[0].name=Main
  41 +queue.rule-engine.queues[0].topic=tb_rule_engine.main
  42 +queue.rule-engine.queues[0].poll-interval=25
  43 +queue.rule-engine.queues[0].partitions=3
  44 +queue.rule-engine.queues[0].pack-processing-timeout=3000
  45 +queue.rule-engine.queues[0].processing-strategy.type=SKIP_ALL_FAILURES
  46 +queue.rule-engine.queues[0].submit-strategy.type=BURST