Commit 0206a8ab39ee7b54071b65d4088f177dd9d122bd

Authored by Igor Kulikov
2 parents 9f0ad608 65c644b8

Merge branch 'develop/2.5.2'

Showing 28 changed files with 213 additions and 62 deletions
... ... @@ -20,6 +20,7 @@ import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
20 20 import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
21 21 import org.thingsboard.server.actors.ActorSystemContext;
22 22 import org.thingsboard.server.actors.TbActorCtx;
  23 +import org.thingsboard.server.actors.TbActorException;
23 24 import org.thingsboard.server.actors.service.ContextAwareActor;
24 25 import org.thingsboard.server.common.data.id.DeviceId;
25 26 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -39,7 +40,7 @@ public class DeviceActor extends ContextAwareActor {
39 40 }
40 41
41 42 @Override
42   - public void init(TbActorCtx ctx) {
  43 + public void init(TbActorCtx ctx) throws TbActorException {
43 44 super.init(ctx);
44 45 log.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId);
45 46 try {
... ... @@ -47,6 +48,7 @@ public class DeviceActor extends ContextAwareActor {
47 48 log.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId);
48 49 } catch (Exception e) {
49 50 log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.deviceId, e);
  51 + throw new TbActorException("Failed to initialize device actor", e);
50 52 }
51 53 }
52 54
... ...
... ... @@ -221,8 +221,8 @@ class DefaultTbContext implements TbContext {
221 221 }
222 222
223 223 @Override
224   - public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
225   - return TbMsg.newMsg(type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
  224 + public TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) {
  225 + return TbMsg.newMsg(queueName, type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
226 226 }
227 227
228 228 @Override
... ...
... ... @@ -244,7 +244,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
244 244 try {
245 245 checkActive(msg);
246 246 EntityId entityId = msg.getOriginator();
247   - TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
  247 + TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
248 248 List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
249 249 .filter(r -> contains(relationTypes, r.getType()))
250 250 .collect(Collectors.toList());
... ...
... ... @@ -17,7 +17,9 @@ package org.thingsboard.server.actors.service;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.thingsboard.server.actors.ActorSystemContext;
  20 +import org.thingsboard.server.actors.TbActor;
20 21 import org.thingsboard.server.actors.TbActorCtx;
  22 +import org.thingsboard.server.actors.TbActorException;
21 23 import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
22 24 import org.thingsboard.server.actors.stats.StatsPersistMsg;
23 25 import org.thingsboard.server.common.data.id.EntityId;
... ... @@ -48,13 +50,13 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
48 50 abstract protected P createProcessor(TbActorCtx ctx);
49 51
50 52 @Override
51   - public void init(TbActorCtx ctx) {
  53 + public void init(TbActorCtx ctx) throws TbActorException {
52 54 super.init(ctx);
53 55 this.processor = createProcessor(ctx);
54 56 initProcessor(ctx);
55 57 }
56 58
57   - protected void initProcessor(TbActorCtx ctx) {
  59 + protected void initProcessor(TbActorCtx ctx) throws TbActorException {
58 60 try {
59 61 log.debug("[{}][{}][{}] Starting processor.", tenantId, id, id.getEntityType());
60 62 processor.start(ctx);
... ... @@ -63,10 +65,10 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
63 65 scheduleStatsPersistTick();
64 66 }
65 67 } catch (Exception e) {
66   - log.warn("[{}][{}] Failed to start {} processor.", tenantId, id, id.getEntityType());
67   - log.warn("Error:", e);
  68 + log.debug("[{}][{}] Failed to start {} processor.", tenantId, id, id.getEntityType(), e);
68 69 logAndPersist("OnStart", e, true);
69 70 logLifecycleEvent(ComponentLifecycleEvent.STARTED, e);
  71 + throw new TbActorException("Failed to init actor", e);
70 72 }
71 73 }
72 74
... ... @@ -158,11 +160,11 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
158 160 errorsOccurred++;
159 161 String componentName = processor != null ? processor.getComponentName() : "Unknown";
160 162 if (critical) {
161   - log.warn("[{}][{}][{}] Failed to process method: {}", id, tenantId, componentName, method);
162   - log.warn("Critical Error: ", e);
163   - } else {
164 163 log.debug("[{}][{}][{}] Failed to process method: {}", id, tenantId, componentName, method);
165   - log.debug("Debug Error: ", e);
  164 + log.debug("Critical Error: ", e);
  165 + } else {
  166 + log.trace("[{}][{}][{}] Failed to process method: {}", id, tenantId, componentName, method);
  167 + log.trace("Debug Error: ", e);
166 168 }
167 169 long ts = System.currentTimeMillis();
168 170 if (ts - lastPersistedErrorTs > getErrorPersistFrequency()) {
... ...
... ... @@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j;
19 19 import org.thingsboard.server.actors.ActorSystemContext;
20 20 import org.thingsboard.server.actors.TbActor;
21 21 import org.thingsboard.server.actors.TbActorCtx;
  22 +import org.thingsboard.server.actors.TbActorException;
22 23 import org.thingsboard.server.actors.TbActorId;
23 24 import org.thingsboard.server.actors.TbActorNotRegisteredException;
24 25 import org.thingsboard.server.actors.TbActorRef;
... ... @@ -62,7 +63,7 @@ public class TenantActor extends RuleChainManagerActor {
62 63 boolean cantFindTenant = false;
63 64
64 65 @Override
65   - public void init(TbActorCtx ctx) {
  66 + public void init(TbActorCtx ctx) throws TbActorException {
66 67 super.init(ctx);
67 68 log.info("[{}] Starting tenant actor.", tenantId);
68 69 try {
... ... @@ -93,6 +94,8 @@ public class TenantActor extends RuleChainManagerActor {
93 94 }
94 95 } catch (Exception e) {
95 96 log.warn("[{}] Unknown failure", tenantId, e);
  97 +// TODO: throw this in 3.1?
  98 +// throw new TbActorException("Failed to init actor", e);
96 99 }
97 100 }
98 101
... ...
... ... @@ -160,7 +160,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
160 160 }
161 161 });
162 162 if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
163   - ctx.getAckMap().forEach((id, msg) -> log.warn("[{}] Timeout to process message: {}", id, msg.getValue()));
  163 + ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue()));
164 164 ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue()));
165 165 }
166 166 mainConsumer.commit();
... ...
... ... @@ -166,7 +166,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
166 166 TbMsgCallback callback = new TbMsgPackCallback(id, tenantId, ctx);
167 167 try {
168 168 if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) {
169   - forwardToRuleEngineActor(tenantId, toRuleEngineMsg, callback);
  169 + forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback);
170 170 } else {
171 171 callback.onSuccess();
172 172 }
... ... @@ -180,7 +180,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
180 180 timeout = true;
181 181 }
182 182
183   - TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(timeout, ctx);
  183 + TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getName(), timeout, ctx);
184 184 TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);
185 185 if (statsEnabled) {
186 186 stats.log(result, decision.isCommit());
... ... @@ -246,8 +246,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
246 246 }
247 247 }
248 248
249   - private void forwardToRuleEngineActor(TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
250   - TbMsg tbMsg = TbMsg.fromBytes(toRuleEngineMsg.getTbMsg().toByteArray(), callback);
  249 + private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
  250 + TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback);
251 251 QueueToRuleEngineMsg msg;
252 252 ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList();
253 253 Set<String> relationTypes = null;
... ...
... ... @@ -28,13 +28,16 @@ import java.util.concurrent.ConcurrentMap;
28 28 public class TbRuleEngineProcessingResult {
29 29
30 30 @Getter
  31 + private final String queueName;
  32 + @Getter
31 33 private final boolean success;
32 34 @Getter
33 35 private final boolean timeout;
34 36 @Getter
35 37 private final TbMsgPackProcessingContext ctx;
36 38
37   - public TbRuleEngineProcessingResult(boolean timeout, TbMsgPackProcessingContext ctx) {
  39 + public TbRuleEngineProcessingResult(String queueName, boolean timeout, TbMsgPackProcessingContext ctx) {
  40 + this.queueName = queueName;
38 41 this.timeout = timeout;
39 42 this.ctx = ctx;
40 43 this.success = !timeout && ctx.getPendingMap().isEmpty() && ctx.getFailedMap().isEmpty();
... ...
... ... @@ -100,7 +100,7 @@ public class TbRuleEngineProcessingStrategyFactory {
100 100 }
101 101 log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size());
102 102 if (log.isTraceEnabled()) {
103   - toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
  103 + toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
104 104 }
105 105 if (pauseBetweenRetries > 0) {
106 106 try {
... ... @@ -129,10 +129,10 @@ public class TbRuleEngineProcessingStrategyFactory {
129 129 log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size());
130 130 }
131 131 if (log.isTraceEnabled()) {
132   - result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
  132 + result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
133 133 }
134 134 if (log.isTraceEnabled()) {
135   - result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
  135 + result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
136 136 }
137 137 return new TbRuleEngineProcessingDecision(true, null);
138 138 }
... ...
... ... @@ -126,7 +126,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
126 126 scriptIdToNameMap.put(scriptId, functionName);
127 127 return scriptId;
128 128 } catch (Exception e) {
129   - log.warn("Failed to compile JS script: {}", e.getMessage(), e);
  129 + log.debug("Failed to compile JS script: {}", e.getMessage(), e);
130 130 throw new ExecutionException(e);
131 131 }
132 132 });
... ...
... ... @@ -23,7 +23,7 @@ public abstract class AbstractTbActor implements TbActor {
23 23 protected TbActorCtx ctx;
24 24
25 25 @Override
26   - public void init(TbActorCtx ctx) {
  26 + public void init(TbActorCtx ctx) throws TbActorException {
27 27 this.ctx = ctx;
28 28 }
29 29
... ...
... ... @@ -23,14 +23,14 @@ public interface TbActor {
23 23
24 24 TbActorRef getActorRef();
25 25
26   - default void init(TbActorCtx ctx) {
  26 + default void init(TbActorCtx ctx) throws TbActorException {
27 27 }
28 28
29   - default void destroy() {
  29 + default void destroy() throws TbActorException {
30 30 }
31 31
32 32 default InitFailureStrategy onInitFailure(int attempt, Throwable t) {
33   - return InitFailureStrategy.retryWithDelay(5000);
  33 + return InitFailureStrategy.retryWithDelay(5000 * attempt);
34 34 }
35 35
36 36 default ProcessFailureStrategy onProcessFailure(Throwable t) {
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.actors;
  17 +
  18 +public class TbActorException extends Exception {
  19 +
  20 + public TbActorException(String message, Throwable cause) {
  21 + super(message, cause);
  22 + }
  23 +}
... ...
... ... @@ -72,10 +72,12 @@ public final class TbActorMailbox implements TbActorCtx {
72 72 log.info("[{}] Failed to init actor, attempt {}, going to stop attempts.", selfId, attempt, t);
73 73 system.stop(selfId);
74 74 } else if (strategy.getRetryDelay() > 0) {
75   - log.info("[{}] Failed to init actor, attempt {}, going to retry in attempts in {}ms", selfId, attempt, strategy.getRetryDelay(), t);
  75 + log.info("[{}] Failed to init actor, attempt {}, going to retry in attempts in {}ms", selfId, attempt, strategy.getRetryDelay());
  76 + log.debug("[{}] Error", selfId, t);
76 77 system.getScheduler().schedule(() -> dispatcher.getExecutor().execute(() -> tryInit(attemptIdx)), strategy.getRetryDelay(), TimeUnit.MILLISECONDS);
77 78 } else {
78   - log.info("[{}] Failed to init actor, attempt {}, going to retry immediately", selfId, attempt, t);
  79 + log.info("[{}] Failed to init actor, attempt {}, going to retry immediately", selfId, attempt);
  80 + log.debug("[{}] Error", selfId, t);
79 81 dispatcher.getExecutor().execute(() -> tryInit(attemptIdx));
80 82 }
81 83 }
... ...
... ... @@ -31,7 +31,7 @@ public class TbEntityActorId implements TbActorId {
31 31
32 32 @Override
33 33 public String toString() {
34   - return entityId.toString();
  34 + return entityId.getEntityType() + "|" + entityId.getId();
35 35 }
36 36
37 37 @Override
... ...
... ... @@ -148,6 +148,25 @@ public class ActorSystemTest {
148 148 Assert.assertEquals(2, testCtx.getInvocationCount().get());
149 149 }
150 150
  151 + @Test
  152 + public void testFailedInit() throws InterruptedException {
  153 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
  154 + ActorTestCtx testCtx1 = getActorTestCtx(1);
  155 + ActorTestCtx testCtx2 = getActorTestCtx(1);
  156 +
  157 + TbActorRef actorId1 = actorSystem.createRootActor(ROOT_DISPATCHER, new FailedToInitActor.FailedToInitActorCreator(
  158 + new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx1, 1, 3000));
  159 + TbActorRef actorId2 = actorSystem.createRootActor(ROOT_DISPATCHER, new FailedToInitActor.FailedToInitActorCreator(
  160 + new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx2, 2, 1));
  161 +
  162 + actorId1.tell(new IntTbActorMsg(42));
  163 + actorId2.tell(new IntTbActorMsg(42));
  164 +
  165 + Assert.assertFalse(testCtx1.getLatch().await(2, TimeUnit.SECONDS));
  166 + Assert.assertTrue(testCtx2.getLatch().await(1, TimeUnit.SECONDS));
  167 + Assert.assertTrue(testCtx1.getLatch().await(3, TimeUnit.SECONDS));
  168 + }
  169 +
151 170
152 171 public void testActorsAndMessages(int actorsCount, int msgNumber, int times) throws InterruptedException {
153 172 Random random = new Random();
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.actors;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +
  20 +@Slf4j
  21 +public class FailedToInitActor extends TestRootActor {
  22 +
  23 + int retryAttempts;
  24 + int retryDelay;
  25 + int attempts = 0;
  26 +
  27 + public FailedToInitActor(TbActorId actorId, ActorTestCtx testCtx, int retryAttempts, int retryDelay) {
  28 + super(actorId, testCtx);
  29 + this.retryAttempts = retryAttempts;
  30 + this.retryDelay = retryDelay;
  31 + }
  32 +
  33 + @Override
  34 + public void init(TbActorCtx ctx) throws TbActorException {
  35 + if (attempts < retryAttempts) {
  36 + attempts++;
  37 + throw new TbActorException("Test attempt", new RuntimeException());
  38 + } else {
  39 + super.init(ctx);
  40 + }
  41 + }
  42 +
  43 + @Override
  44 + public InitFailureStrategy onInitFailure(int attempt, Throwable t) {
  45 + return InitFailureStrategy.retryWithDelay(retryDelay);
  46 + }
  47 +
  48 + public static class FailedToInitActorCreator implements TbActorCreator {
  49 +
  50 + private final TbActorId actorId;
  51 + private final ActorTestCtx testCtx;
  52 + private final int retryAttempts;
  53 + private final int retryDelay;
  54 +
  55 + public FailedToInitActorCreator(TbActorId actorId, ActorTestCtx testCtx, int retryAttempts, int retryDelay) {
  56 + this.actorId = actorId;
  57 + this.testCtx = testCtx;
  58 + this.retryAttempts = retryAttempts;
  59 + this.retryDelay = retryDelay;
  60 + }
  61 +
  62 + @Override
  63 + public TbActorId createActorId() {
  64 + return actorId;
  65 + }
  66 +
  67 + @Override
  68 + public TbActor createActor() {
  69 + return new FailedToInitActor(actorId, testCtx, retryAttempts, retryDelay);
  70 + }
  71 + }
  72 +}
... ...
... ... @@ -25,7 +25,7 @@ public class SlowInitActor extends TestRootActor {
25 25 }
26 26
27 27 @Override
28   - public void init(TbActorCtx ctx) {
  28 + public void init(TbActorCtx ctx) throws TbActorException {
29 29 try {
30 30 Thread.sleep(500);
31 31 } catch (InterruptedException e) {
... ...
... ... @@ -37,7 +37,7 @@ public class TestRootActor extends AbstractTbActor {
37 37 }
38 38
39 39 @Override
40   - public void init(TbActorCtx ctx) {
  40 + public void init(TbActorCtx ctx) throws TbActorException {
41 41 super.init(ctx);
42 42 initialized = true;
43 43 }
... ...
... ... @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
25 25 import org.thingsboard.server.common.data.id.RuleChainId;
26 26 import org.thingsboard.server.common.data.id.RuleNodeId;
27 27 import org.thingsboard.server.common.msg.gen.MsgProtos;
  28 +import org.thingsboard.server.common.msg.queue.ServiceQueue;
28 29 import org.thingsboard.server.common.msg.queue.TbMsgCallback;
29 30
30 31 import java.io.IOException;
... ... @@ -39,6 +40,7 @@ import java.util.UUID;
39 40 @Slf4j
40 41 public final class TbMsg implements Serializable {
41 42
  43 + private final String queueName;
42 44 private final UUID id;
43 45 private final long ts;
44 46 private final String type;
... ... @@ -51,39 +53,44 @@ public final class TbMsg implements Serializable {
51 53 //This field is not serialized because we use queues and there is no need to do it
52 54 transient private final TbMsgCallback callback;
53 55
  56 + public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
  57 + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
  58 + }
  59 +
54 60 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
55   - return new TbMsg(UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, TbMsgCallback.EMPTY);
  61 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, TbMsgCallback.EMPTY);
56 62 }
57 63
58   - public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
59   - return new TbMsg(UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
  64 + public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) {
  65 + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, TbMsgCallback.EMPTY);
60 66 }
61 67
62 68 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data) {
63   - return new TbMsg(UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, null, null, TbMsgCallback.EMPTY);
  69 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, null, null, TbMsgCallback.EMPTY);
64 70 }
65 71
66 72 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
67   - return new TbMsg(UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
  73 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
68 74 }
69 75
70 76 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data, TbMsgCallback callback) {
71   - return new TbMsg(UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, callback);
  77 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, callback);
72 78 }
73 79
74 80 public static TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
75   - return new TbMsg(origMsg.getId(), origMsg.getTs(), type, originator, metaData.copy(), origMsg.getDataType(),
  81 + return new TbMsg(origMsg.getQueueName(), origMsg.getId(), origMsg.getTs(), type, originator, metaData.copy(), origMsg.getDataType(),
76 82 data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), origMsg.getCallback());
77 83 }
78 84
79 85 public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
80   - return new TbMsg(UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(),
  86 + return new TbMsg(tbMsg.getQueueName(), UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(),
81 87 tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
82 88 }
83 89
84   - private TbMsg(UUID id, long ts, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
  90 + private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
85 91 RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgCallback callback) {
86 92 this.id = id;
  93 + this.queueName = queueName;
87 94 if (ts > 0) {
88 95 this.ts = ts;
89 96 } else {
... ... @@ -136,7 +143,7 @@ public final class TbMsg implements Serializable {
136 143 return builder.build().toByteArray();
137 144 }
138 145
139   - public static TbMsg fromBytes(byte[] data, TbMsgCallback callback) {
  146 + public static TbMsg fromBytes(String queueName, byte[] data, TbMsgCallback callback) {
140 147 try {
141 148 MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(data);
142 149 TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
... ... @@ -150,18 +157,18 @@ public final class TbMsg implements Serializable {
150 157 ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB()));
151 158 }
152 159 TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
153   - return new TbMsg(UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, callback);
  160 + return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, callback);
154 161 } catch (InvalidProtocolBufferException e) {
155 162 throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
156 163 }
157 164 }
158 165
159 166 public TbMsg copyWithRuleChainId(RuleChainId ruleChainId) {
160   - return new TbMsg(this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, null, callback);
  167 + return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, null, callback);
161 168 }
162 169
163 170 public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
164   - return new TbMsg(this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, callback);
  171 + return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, callback);
165 172 }
166 173
167 174 public TbMsgCallback getCallback() {
... ... @@ -172,4 +179,8 @@ public final class TbMsg implements Serializable {
172 179 return TbMsgCallback.EMPTY;
173 180 }
174 181 }
  182 +
  183 + public String getQueueName() {
  184 + return queueName != null ? queueName : ServiceQueue.MAIN;
  185 + }
175 186 }
... ...
... ... @@ -34,7 +34,7 @@ public class ServiceQueue {
34 34
35 35 public ServiceQueue(ServiceType type, String queue) {
36 36 this.type = type;
37   - this.queue = queue;
  37 + this.queue = queue != null ? queue : MAIN;
38 38 }
39 39
40 40 public ServiceType getType() {
... ...
... ... @@ -521,11 +521,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
521 521 .setDeviceName(msg.getDeviceInfo().getDeviceName())
522 522 .setDeviceType(msg.getDeviceInfo().getDeviceType())
523 523 .build();
524   - transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), null);
525   - transportService.registerAsyncSession(sessionInfo, this);
526   - checkGatewaySession();
527   - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
528   - log.info("[{}] Client connected!", sessionId);
  524 + transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() {
  525 + @Override
  526 + public void onSuccess(Void msg) {
  527 + transportService.registerAsyncSession(sessionInfo, MqttTransportHandler.this);
  528 + checkGatewaySession();
  529 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
  530 + log.info("[{}] Client connected!", sessionId);
  531 + }
  532 +
  533 + @Override
  534 + public void onError(Throwable e) {
  535 + log.warn("[{}] Failed to submit session event", sessionId, e);
  536 + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
  537 + ctx.close();
  538 + }
  539 + });
529 540 }
530 541 }
531 542
... ...
... ... @@ -130,7 +130,7 @@ public interface TbContext {
130 130
131 131 void ack(TbMsg tbMsg);
132 132
133   - TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data);
  133 + TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data);
134 134
135 135 TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data);
136 136
... ...
... ... @@ -136,7 +136,7 @@ public class TbCopyAttributesToEntityViewNode implements TbNode {
136 136 }
137 137
138 138 private void transformAndTellNext(TbContext ctx, TbMsg msg, EntityView entityView) {
139   - ctx.enqueueForTellNext(ctx.newMsg(msg.getType(), entityView.getId(), msg.getMetaData(), msg.getData()), SUCCESS);
  139 + ctx.enqueueForTellNext(ctx.newMsg(msg.getQueueName(), msg.getType(), entityView.getId(), msg.getMetaData(), msg.getData()), SUCCESS);
140 140 }
141 141
142 142 private boolean attributeContainsInEntityView(String scope, String attrKey, EntityView entityView) {
... ...
... ... @@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
24 24 import org.thingsboard.server.common.msg.TbMsg;
25 25 import org.thingsboard.server.common.msg.TbMsgDataType;
26 26 import org.thingsboard.server.common.msg.TbMsgMetaData;
  27 +import org.thingsboard.server.common.msg.queue.ServiceQueue;
27 28 import org.thingsboard.server.common.msg.session.SessionMsgType;
28 29
29 30 import java.util.UUID;
... ... @@ -74,7 +75,7 @@ public class TbMsgCountNode implements TbNode {
74 75 TbMsgMetaData metaData = new TbMsgMetaData();
75 76 metaData.putValue("delta", Long.toString(System.currentTimeMillis() - lastScheduledTs + delay));
76 77
77   - TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), metaData, gson.toJson(telemetryJson));
  78 + TbMsg tbMsg = TbMsg.newMsg(msg.getQueueName(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), metaData, gson.toJson(telemetryJson));
78 79 ctx.enqueueForTellNext(tbMsg, SUCCESS);
79 80 scheduleTickMsg(ctx);
80 81 } else {
... ... @@ -90,7 +91,7 @@ public class TbMsgCountNode implements TbNode {
90 91 }
91 92 lastScheduledTs = lastScheduledTs + delay;
92 93 long curDelay = Math.max(0L, (lastScheduledTs - curTs));
93   - TbMsg tickMsg = ctx.newMsg(TB_MSG_COUNT_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
  94 + TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_COUNT_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
94 95 nextTickId = tickMsg.getId();
95 96 ctx.tellSelf(tickMsg, curDelay);
96 97 }
... ...
... ... @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
26 26 import org.thingsboard.server.common.msg.TbMsg;
27 27 import org.thingsboard.server.common.msg.TbMsgMetaData;
28 28 import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
  29 +import org.thingsboard.server.common.msg.queue.ServiceQueue;
29 30
30 31 import java.util.UUID;
31 32 import java.util.concurrent.TimeUnit;
... ... @@ -118,7 +119,7 @@ public class TbMsgGeneratorNode implements TbNode {
118 119 }
119 120 lastScheduledTs = lastScheduledTs + delay;
120 121 long curDelay = Math.max(0L, (lastScheduledTs - curTs));
121   - TbMsg tickMsg = ctx.newMsg(TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
  122 + TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
122 123 nextTickId = tickMsg.getId();
123 124 ctx.tellSelf(tickMsg, curDelay);
124 125 }
... ... @@ -126,13 +127,13 @@ public class TbMsgGeneratorNode implements TbNode {
126 127 private ListenableFuture<TbMsg> generate(TbContext ctx) {
127 128 return ctx.getJsExecutor().executeAsync(() -> {
128 129 if (prevMsg == null) {
129   - prevMsg = ctx.newMsg("", originatorId, new TbMsgMetaData(), "{}");
  130 + prevMsg = ctx.newMsg(ServiceQueue.MAIN, "", originatorId, new TbMsgMetaData(), "{}");
130 131 }
131 132 if (initialized) {
132 133 ctx.logJsEvalRequest();
133 134 TbMsg generated = jsEngine.executeGenerate(prevMsg);
134 135 ctx.logJsEvalResponse();
135   - prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData());
  136 + prevMsg = ctx.newMsg(ServiceQueue.MAIN, generated.getType(), originatorId, generated.getMetaData(), generated.getData());
136 137 }
137 138 return prevMsg;
138 139 });
... ...
... ... @@ -26,6 +26,7 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils;
26 26 import org.thingsboard.server.common.data.plugin.ComponentType;
27 27 import org.thingsboard.server.common.msg.TbMsg;
28 28 import org.thingsboard.server.common.msg.TbMsgMetaData;
  29 +import org.thingsboard.server.common.msg.queue.ServiceQueue;
29 30
30 31 import java.util.HashMap;
31 32 import java.util.Map;
... ... @@ -70,7 +71,7 @@ public class TbMsgDelayNode implements TbNode {
70 71 } else {
71 72 if (pendingMsgs.size() < config.getMaxPendingMsgs()) {
72 73 pendingMsgs.put(msg.getId(), msg);
73   - TbMsg tickMsg = ctx.newMsg(TB_MSG_DELAY_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), msg.getId().toString());
  74 + TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_DELAY_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), msg.getId().toString());
74 75 ctx.tellSelf(tickMsg, getDelay(msg));
75 76 ctx.ack(msg);
76 77 } else {
... ...
... ... @@ -112,11 +112,11 @@ public class TbSendRPCRequestNode implements TbNode {
112 112
113 113 ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> {
114 114 if (!ruleEngineDeviceRpcResponse.getError().isPresent()) {
115   - TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
  115 + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
116 116 ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS);
117 117 } else {
118   - TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
119   - ctx.enqueueForTellFailure(next, ruleEngineDeviceRpcResponse.getError().get().name());
  118 + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
  119 + ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name()));
120 120 }
121 121 });
122 122 ctx.ack(msg);
... ...