Commit ca193239bab7ba4331a61bd9e061f0f8998bc42c

Authored by Andrii Shvaika
1 parent ec4e2c03

RE Submit Strategies

Showing 23 changed files with 814 additions and 119 deletions
... ... @@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.Event;
44 44 import org.thingsboard.server.common.data.id.EntityId;
45 45 import org.thingsboard.server.common.data.id.TenantId;
46 46 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
  47 +import org.thingsboard.server.common.msg.TbActorMsg;
47 48 import org.thingsboard.server.common.msg.TbMsg;
48 49 import org.thingsboard.server.common.msg.queue.ServiceType;
49 50 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
... ... @@ -334,7 +335,6 @@ public class ActorSystemContext {
334 335 @Setter
335 336 private ActorSystem actorSystem;
336 337
337   - @Getter
338 338 @Setter
339 339 private ActorRef appActor;
340 340
... ... @@ -361,6 +361,8 @@ public class ActorSystemContext {
361 361 config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load());
362 362 }
363 363
  364 +
  365 +
364 366 public Scheduler getScheduler() {
365 367 return actorSystem.scheduler();
366 368 }
... ... @@ -535,4 +537,7 @@ public class ActorSystemContext {
535 537 return Exception.class.isInstance(error) ? (Exception) error : new Exception(error);
536 538 }
537 539
  540 + public void tell(TbActorMsg tbActorMsg, ActorRef sender) {
  541 + appActor.tell(tbActorMsg, sender);
  542 + }
538 543 }
... ...
... ... @@ -137,7 +137,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
137 137 Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray());
138 138 if (actorMsg.isPresent()) {
139 139 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
140   - actorContext.getAppActor().tell(actorMsg.get(), ActorRef.noSender());
  140 + actorContext.tell(actorMsg.get(), ActorRef.noSender());
141 141 }
142 142 callback.onSuccess();
143 143 }
... ... @@ -194,7 +194,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
194 194 Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getComponentLifecycleMsg().toByteArray());
195 195 if (actorMsg.isPresent()) {
196 196 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
197   - actorContext.getAppActor().tell(actorMsg.get(), ActorRef.noSender());
  197 + actorContext.tell(actorMsg.get(), ActorRef.noSender());
198 198 }
199 199 callback.onSuccess();
200 200 }
... ... @@ -259,7 +259,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
259 259 if (statsEnabled) {
260 260 stats.log(toDeviceActorMsg);
261 261 }
262   - actorContext.getAppActor().tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback), ActorRef.noSender());
  262 + actorContext.tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback), ActorRef.noSender());
263 263 }
264 264
265 265 private void throwNotHandled(Object msg, TbCallback callback) {
... ...
... ... @@ -16,7 +16,6 @@
16 16 package org.thingsboard.server.service.queue;
17 17
18 18 import akka.actor.ActorRef;
19   -import com.google.protobuf.ByteString;
20 19 import com.google.protobuf.ProtocolStringList;
21 20 import lombok.extern.slf4j.Slf4j;
22 21 import org.springframework.beans.factory.annotation.Value;
... ... @@ -28,11 +27,11 @@ import org.thingsboard.server.common.msg.TbActorMsg;
28 27 import org.thingsboard.server.common.msg.TbMsg;
29 28 import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
30 29 import org.thingsboard.server.common.msg.queue.RuleEngineException;
31   -import org.thingsboard.server.common.msg.queue.RuleNodeException;
32 30 import org.thingsboard.server.common.msg.queue.ServiceQueue;
33 31 import org.thingsboard.server.common.msg.queue.ServiceType;
34 32 import org.thingsboard.server.common.msg.queue.TbCallback;
35 33 import org.thingsboard.server.common.msg.queue.TbMsgCallback;
  34 +import org.thingsboard.server.gen.transport.TransportProtos;
36 35 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
37 36 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
38 37 import org.thingsboard.server.queue.TbQueueConsumer;
... ... @@ -48,9 +47,12 @@ import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingDec
48 47 import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
49 48 import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategy;
50 49 import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory;
  50 +import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy;
  51 +import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory;
51 52 import org.thingsboard.server.service.stats.RuleEngineStatisticsService;
52 53
53 54 import javax.annotation.PostConstruct;
  55 +import javax.annotation.PreDestroy;
54 56 import java.util.Collections;
55 57 import java.util.HashSet;
56 58 import java.util.List;
... ... @@ -59,8 +61,10 @@ import java.util.Set;
59 61 import java.util.UUID;
60 62 import java.util.concurrent.ConcurrentHashMap;
61 63 import java.util.concurrent.ConcurrentMap;
62   -import java.util.concurrent.CountDownLatch;
  64 +import java.util.concurrent.ExecutorService;
  65 +import java.util.concurrent.Executors;
63 66 import java.util.concurrent.TimeUnit;
  67 +import java.util.function.BiConsumer;
64 68 import java.util.function.Function;
65 69 import java.util.stream.Collectors;
66 70
... ... @@ -76,22 +80,27 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
76 80 @Value("${queue.rule-engine.stats.enabled:true}")
77 81 private boolean statsEnabled;
78 82
79   - private final TbRuleEngineProcessingStrategyFactory factory;
  83 + private final TbRuleEngineSubmitStrategyFactory submitStrategyFactory;
  84 + private final TbRuleEngineProcessingStrategyFactory processingStrategyFactory;
80 85 private final TbRuleEngineQueueFactory tbRuleEngineQueueFactory;
81 86 private final TbQueueRuleEngineSettings ruleEngineSettings;
82 87 private final RuleEngineStatisticsService statisticsService;
83 88 private final ConcurrentMap<String, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> consumers = new ConcurrentHashMap<>();
84 89 private final ConcurrentMap<String, TbRuleEngineQueueConfiguration> consumerConfigurations = new ConcurrentHashMap<>();
85 90 private final ConcurrentMap<String, TbRuleEngineConsumerStats> consumerStats = new ConcurrentHashMap<>();
  91 + private ExecutorService submitExecutor;
86 92
87   - public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory factory, TbQueueRuleEngineSettings ruleEngineSettings,
  93 + public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory processingStrategyFactory,
  94 + TbRuleEngineSubmitStrategyFactory submitStrategyFactory,
  95 + TbQueueRuleEngineSettings ruleEngineSettings,
88 96 TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService,
89 97 ActorSystemContext actorContext, DataDecodingEncodingService encodingService) {
90 98 super(actorContext, encodingService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer());
91 99 this.statisticsService = statisticsService;
92 100 this.ruleEngineSettings = ruleEngineSettings;
93 101 this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory;
94   - this.factory = factory;
  102 + this.submitStrategyFactory = submitStrategyFactory;
  103 + this.processingStrategyFactory = processingStrategyFactory;
95 104 }
96 105
97 106 @PostConstruct
... ... @@ -102,6 +111,14 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
102 111 consumers.computeIfAbsent(configuration.getName(), queueName -> tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration));
103 112 consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName()));
104 113 }
  114 + submitExecutor = Executors.newSingleThreadExecutor();
  115 + }
  116 +
  117 + @PreDestroy
  118 + public void stop() {
  119 + if (submitExecutor != null) {
  120 + submitExecutor.shutdownNow();
  121 + }
105 122 }
106 123
107 124 @Override
... ... @@ -131,27 +148,18 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
131 148 if (msgs.isEmpty()) {
132 149 continue;
133 150 }
134   - TbRuleEngineProcessingStrategy strategy = factory.newInstance(configuration.getName(), configuration.getAckStrategy());
135   - TbRuleEngineProcessingDecision decision = null;
136   - boolean firstAttempt = true;
137   - while (!stopped && (firstAttempt || !decision.isCommit())) {
138   - ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> allMap;
139   - if (firstAttempt) {
140   - allMap = msgs.stream().collect(
141   - Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
142   - firstAttempt = false;
143   - } else {
144   - allMap = decision.getReprocessMap();
145   - }
146   - ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> successMap = new ConcurrentHashMap<>();
147   - ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> failedMap = new ConcurrentHashMap<>();
148   - ConcurrentMap<TenantId, RuleEngineException> exceptionsMap = new ConcurrentHashMap<>();
149   - CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
150   - allMap.forEach((id, msg) -> {
151   - log.trace("[{}] Creating main callback for message: {}", id, msg.getValue());
  151 + TbRuleEngineSubmitStrategy submitStrategy = submitStrategyFactory.newInstance(configuration.getName(), configuration.getSubmitStrategy());
  152 + TbRuleEngineProcessingStrategy ackStrategy = processingStrategyFactory.newInstance(configuration.getName(), configuration.getProcessingStrategy());
  153 +
  154 + submitStrategy.init(msgs);
  155 +
  156 + while (!stopped) {
  157 + ProcessingAttemptContext ctx = new ProcessingAttemptContext(submitStrategy);
  158 + submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> {
  159 + log.trace("[{}] Creating callback for message: {}", id, msg.getValue());
152 160 ToRuleEngineMsg toRuleEngineMsg = msg.getValue();
153 161 TenantId tenantId = new TenantId(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB()));
154   - TbMsgCallback callback = new TbMsgPackCallback<>(id, tenantId, processingTimeoutLatch, allMap, successMap, failedMap, exceptionsMap);
  162 + TbMsgCallback callback = new TbMsgPackCallback(id, tenantId, ctx);
155 163 try {
156 164 if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) {
157 165 forwardToRuleEngineActor(tenantId, toRuleEngineMsg, callback);
... ... @@ -161,17 +169,24 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
161 169 } catch (Exception e) {
162 170 callback.onFailure(new RuleEngineException(e.getMessage()));
163 171 }
164   - });
  172 + }));
165 173
166 174 boolean timeout = false;
167   - if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
  175 + if (!ctx.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
168 176 timeout = true;
169 177 }
170   - TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(timeout, allMap, successMap, failedMap, exceptionsMap);
171   - decision = strategy.analyze(result);
  178 +
  179 + TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(timeout, ctx);
  180 + TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);
172 181 if (statsEnabled) {
173 182 stats.log(result, decision.isCommit());
174 183 }
  184 + if (decision.isCommit()) {
  185 + submitStrategy.stop();
  186 + break;
  187 + } else {
  188 + submitStrategy.update(decision.getReprocessMap());
  189 + }
175 190 }
176 191 consumer.commit();
177 192 } catch (Exception e) {
... ... @@ -211,7 +226,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
211 226 Optional<TbActorMsg> actorMsg = encodingService.decode(nfMsg.getComponentLifecycleMsg().toByteArray());
212 227 if (actorMsg.isPresent()) {
213 228 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
214   - actorContext.getAppActor().tell(actorMsg.get(), ActorRef.noSender());
  229 + actorContext.tell(actorMsg.get(), ActorRef.noSender());
215 230 }
216 231 callback.onSuccess();
217 232 } else {
... ... @@ -232,7 +247,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
232 247 }
233 248 }
234 249 msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage());
235   - actorContext.getAppActor().tell(msg, ActorRef.noSender());
  250 + actorContext.tell(msg, ActorRef.noSender());
236 251 }
237 252
238 253 @Scheduled(fixedDelayString = "${queue.rule-engine.stats.print-interval-ms}")
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.queue;
  17 +
  18 +import lombok.Getter;
  19 +import org.thingsboard.server.common.data.id.TenantId;
  20 +import org.thingsboard.server.common.msg.queue.RuleEngineException;
  21 +import org.thingsboard.server.gen.transport.TransportProtos;
  22 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  23 +import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy;
  24 +
  25 +import java.util.UUID;
  26 +import java.util.concurrent.ConcurrentHashMap;
  27 +import java.util.concurrent.ConcurrentMap;
  28 +import java.util.concurrent.CountDownLatch;
  29 +import java.util.concurrent.TimeUnit;
  30 +
  31 +public class ProcessingAttemptContext {
  32 +
  33 + private final TbRuleEngineSubmitStrategy submitStrategy;
  34 +
  35 + private final CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
  36 + @Getter
  37 + private final ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> pendingMap;
  38 + @Getter
  39 + private final ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> successMap = new ConcurrentHashMap<>();
  40 + @Getter
  41 + private final ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> failedMap = new ConcurrentHashMap<>();
  42 + @Getter
  43 + private final ConcurrentMap<TenantId, RuleEngineException> exceptionsMap = new ConcurrentHashMap<>();
  44 +
  45 + public ProcessingAttemptContext(TbRuleEngineSubmitStrategy submitStrategy) {
  46 + this.submitStrategy = submitStrategy;
  47 + this.pendingMap = submitStrategy.getPendingMap();
  48 + }
  49 +
  50 + public boolean await(long packProcessingTimeout, TimeUnit milliseconds) throws InterruptedException {
  51 + return processingTimeoutLatch.await(packProcessingTimeout, milliseconds);
  52 + }
  53 +
  54 + public void onSuccess(UUID id) {
  55 + TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg> msg;
  56 + boolean empty = false;
  57 + synchronized (pendingMap) {
  58 + msg = pendingMap.remove(id);
  59 + if (msg != null) {
  60 + empty = pendingMap.isEmpty();
  61 + }
  62 + }
  63 + if (msg != null) {
  64 + successMap.put(id, msg);
  65 + }
  66 + submitStrategy.onSuccess(id);
  67 + if (empty) {
  68 + processingTimeoutLatch.countDown();
  69 + }
  70 + }
  71 +
  72 + public void onFailure(TenantId tenantId, UUID id, RuleEngineException e) {
  73 + TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg> msg;
  74 + boolean empty = false;
  75 + synchronized (pendingMap) {
  76 + msg = pendingMap.remove(id);
  77 + if (msg != null) {
  78 + empty = pendingMap.isEmpty();
  79 + }
  80 + }
  81 + if (msg != null) {
  82 + failedMap.put(id, msg);
  83 + exceptionsMap.putIfAbsent(tenantId, e);
  84 + }
  85 + if (empty) {
  86 + processingTimeoutLatch.countDown();
  87 + }
  88 + }
  89 +}
... ...
... ... @@ -27,52 +27,26 @@ import java.util.concurrent.ConcurrentMap;
27 27 import java.util.concurrent.CountDownLatch;
28 28
29 29 @Slf4j
30   -public class TbMsgPackCallback<T> implements TbMsgCallback {
31   - private final CountDownLatch processingTimeoutLatch;
32   - private final ConcurrentMap<UUID, T> ackMap;
33   - private final ConcurrentMap<UUID, T> successMap;
34   - private final ConcurrentMap<UUID, T> failedMap;
  30 +public class TbMsgPackCallback implements TbMsgCallback {
35 31 private final UUID id;
36 32 private final TenantId tenantId;
37   - private final ConcurrentMap<TenantId, RuleEngineException> firstExceptions;
  33 + private final ProcessingAttemptContext ctx;
38 34
39   - public TbMsgPackCallback(UUID id, TenantId tenantId,
40   - CountDownLatch processingTimeoutLatch,
41   - ConcurrentMap<UUID, T> ackMap,
42   - ConcurrentMap<UUID, T> successMap,
43   - ConcurrentMap<UUID, T> failedMap,
44   - ConcurrentMap<TenantId, RuleEngineException> firstExceptions) {
  35 + public TbMsgPackCallback(UUID id, TenantId tenantId, ProcessingAttemptContext ctx) {
45 36 this.id = id;
46 37 this.tenantId = tenantId;
47   - this.processingTimeoutLatch = processingTimeoutLatch;
48   - this.ackMap = ackMap;
49   - this.successMap = successMap;
50   - this.failedMap = failedMap;
51   - this.firstExceptions = firstExceptions;
  38 + this.ctx = ctx;
52 39 }
53 40
54 41 @Override
55 42 public void onSuccess() {
56 43 log.trace("[{}] ON SUCCESS", id);
57   - T msg = ackMap.remove(id);
58   - if (msg != null) {
59   - successMap.put(id, msg);
60   - }
61   - if (msg != null && ackMap.isEmpty()) {
62   - processingTimeoutLatch.countDown();
63   - }
  44 + ctx.onSuccess(id);
64 45 }
65 46
66 47 @Override
67 48 public void onFailure(RuleEngineException e) {
68 49 log.trace("[{}] ON FAILURE", id, e);
69   - T msg = ackMap.remove(id);
70   - if (msg != null) {
71   - failedMap.put(id, msg);
72   - firstExceptions.putIfAbsent(tenantId, e);
73   - }
74   - if (ackMap.isEmpty()) {
75   - processingTimeoutLatch.countDown();
76   - }
  50 + ctx.onFailure(tenantId, id, e);
77 51 }
78 52 }
... ...
... ... @@ -19,7 +19,6 @@ import lombok.Data;
19 19 import lombok.extern.slf4j.Slf4j;
20 20 import org.thingsboard.server.common.data.id.TenantId;
21 21 import org.thingsboard.server.common.msg.queue.RuleEngineException;
22   -import org.thingsboard.server.common.msg.queue.RuleNodeException;
23 22 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
24 23 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
25 24 import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
... ... @@ -77,7 +76,7 @@ public class TbRuleEngineConsumerStats {
77 76 public void log(TbRuleEngineProcessingResult msg, boolean finalIterationForPack) {
78 77 int success = msg.getSuccessMap().size();
79 78 int pending = msg.getPendingMap().size();
80   - int failed = msg.getFailureMap().size();
  79 + int failed = msg.getFailedMap().size();
81 80 totalMsgCounter.addAndGet(success + pending + failed);
82 81 successMsgCounter.addAndGet(success);
83 82 msg.getSuccessMap().values().forEach(m -> getTenantStats(m).logSuccess());
... ... @@ -89,7 +88,7 @@ public class TbRuleEngineConsumerStats {
89 88 msg.getPendingMap().values().forEach(m -> getTenantStats(m).logTimeout());
90 89 }
91 90 if (failed > 0) {
92   - msg.getFailureMap().values().forEach(m -> getTenantStats(m).logFailed());
  91 + msg.getFailedMap().values().forEach(m -> getTenantStats(m).logFailed());
93 92 }
94 93 failedIterationsCounter.incrementAndGet();
95 94 } else {
... ... @@ -103,7 +102,7 @@ public class TbRuleEngineConsumerStats {
103 102 msg.getPendingMap().values().forEach(m -> getTenantStats(m).logTmpTimeout());
104 103 }
105 104 if (failed > 0) {
106   - msg.getFailureMap().values().forEach(m -> getTenantStats(m).logTmpFailed());
  105 + msg.getFailedMap().values().forEach(m -> getTenantStats(m).logTmpFailed());
107 106 }
108 107 }
109 108 msg.getExceptionsMap().forEach(tenantExceptions::putIfAbsent);
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.queue.processing;
  17 +
  18 +import org.thingsboard.server.gen.transport.TransportProtos;
  19 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  20 +
  21 +import java.util.ArrayList;
  22 +import java.util.List;
  23 +import java.util.UUID;
  24 +import java.util.concurrent.ConcurrentMap;
  25 +import java.util.stream.Collectors;
  26 +
  27 +public abstract class AbstractTbRuleEngineSubmitStrategy implements TbRuleEngineSubmitStrategy {
  28 +
  29 + protected final String queueName;
  30 + protected List<IdMsgPair> orderedMsgList;
  31 + private volatile boolean stopped;
  32 +
  33 + public AbstractTbRuleEngineSubmitStrategy(String queueName) {
  34 + this.queueName = queueName;
  35 + }
  36 +
  37 + protected abstract void doOnSuccess(UUID id);
  38 +
  39 + @Override
  40 + public void init(List<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgs) {
  41 + orderedMsgList = msgs.stream().map(msg -> new IdMsgPair(UUID.randomUUID(), msg)).collect(Collectors.toList());
  42 + }
  43 +
  44 + @Override
  45 + public ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> getPendingMap() {
  46 + return orderedMsgList.stream().collect(Collectors.toConcurrentMap(pair -> pair.uuid, pair -> pair.msg));
  47 + }
  48 +
  49 + @Override
  50 + public void update(ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> reprocessMap) {
  51 + List<IdMsgPair> newOrderedMsgList = new ArrayList<>(reprocessMap.size());
  52 + for (IdMsgPair pair : orderedMsgList) {
  53 + if (reprocessMap.containsKey(pair.uuid)) {
  54 + newOrderedMsgList.add(pair);
  55 + }
  56 + }
  57 + orderedMsgList = newOrderedMsgList;
  58 + }
  59 +
  60 + @Override
  61 + public void onSuccess(UUID id) {
  62 + if (!stopped) {
  63 + doOnSuccess(id);
  64 + }
  65 + }
  66 +
  67 + @Override
  68 + public void stop() {
  69 + stopped = true;
  70 + }
  71 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.queue.processing;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.thingsboard.server.gen.transport.TransportProtos;
  20 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  21 +
  22 +import java.util.LinkedHashMap;
  23 +import java.util.Map;
  24 +import java.util.UUID;
  25 +import java.util.concurrent.ConcurrentMap;
  26 +import java.util.concurrent.ExecutorService;
  27 +import java.util.concurrent.atomic.AtomicInteger;
  28 +import java.util.function.BiConsumer;
  29 +
  30 +@Slf4j
  31 +public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy {
  32 +
  33 + private final int batchSize;
  34 + private final AtomicInteger packIdx = new AtomicInteger(0);
  35 + private final Map<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> pendingPack = new LinkedHashMap<>();
  36 + private volatile BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer;
  37 +
  38 + public BatchTbRuleEngineSubmitStrategy(String queueName, int batchSize) {
  39 + super(queueName);
  40 + this.batchSize = batchSize;
  41 + }
  42 +
  43 + @Override
  44 + public void submitAttempt(BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer) {
  45 + this.msgConsumer = msgConsumer;
  46 + submitNext();
  47 + }
  48 +
  49 + @Override
  50 + public void update(ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> reprocessMap) {
  51 + super.update(reprocessMap);
  52 + packIdx.set(0);
  53 + }
  54 +
  55 + @Override
  56 + protected void doOnSuccess(UUID id) {
  57 + boolean endOfPendingPack;
  58 + synchronized (pendingPack) {
  59 + TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg> msg = pendingPack.remove(id);
  60 + endOfPendingPack = msg != null && pendingPack.isEmpty();
  61 + }
  62 + if (endOfPendingPack) {
  63 + packIdx.incrementAndGet();
  64 + submitNext();
  65 + }
  66 + }
  67 +
  68 + private void submitNext() {
  69 + int listSize = orderedMsgList.size();
  70 + int startIdx = Math.min(packIdx.get() * batchSize, listSize);
  71 + int endIdx = Math.min(startIdx + batchSize, listSize);
  72 + synchronized (pendingPack) {
  73 + pendingPack.clear();
  74 + for (int i = startIdx; i < endIdx; i++) {
  75 + IdMsgPair pair = orderedMsgList.get(i);
  76 + pendingPack.put(pair.uuid, pair.msg);
  77 + }
  78 + }
  79 + int submitSize = pendingPack.size();
  80 + if (log.isInfoEnabled() && submitSize > 0) {
  81 + log.info("[{}] submitting [{}] messages to rule engine", queueName, submitSize);
  82 + }
  83 + pendingPack.forEach(msgConsumer);
  84 + }
  85 +
  86 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.queue.processing;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.thingsboard.server.gen.transport.TransportProtos;
  20 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  21 +
  22 +import java.util.ArrayList;
  23 +import java.util.List;
  24 +import java.util.UUID;
  25 +import java.util.concurrent.ConcurrentMap;
  26 +import java.util.concurrent.ExecutorService;
  27 +import java.util.function.BiConsumer;
  28 +import java.util.function.Function;
  29 +import java.util.stream.Collectors;
  30 +
  31 +@Slf4j
  32 +public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy {
  33 +
  34 + public BurstTbRuleEngineSubmitStrategy(String queueName) {
  35 + super(queueName);
  36 + }
  37 +
  38 + @Override
  39 + public void submitAttempt(BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer) {
  40 + if (log.isInfoEnabled()) {
  41 + log.info("[{}] submitting [{}] messages to rule engine", queueName, orderedMsgList.size());
  42 + }
  43 + orderedMsgList.forEach(pair -> msgConsumer.accept(pair.uuid, pair.msg));
  44 + }
  45 +
  46 + @Override
  47 + protected void doOnSuccess(UUID id) {
  48 +
  49 + }
  50 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.queue.processing;
  17 +
  18 +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
  19 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  20 +
  21 +import java.util.UUID;
  22 +
  23 +public class IdMsgPair {
  24 + final UUID uuid;
  25 + final TbProtoQueueMsg<ToRuleEngineMsg> msg;
  26 +
  27 + public IdMsgPair(UUID uuid, TbProtoQueueMsg<ToRuleEngineMsg> msg) {
  28 + this.uuid = uuid;
  29 + this.msg = msg;
  30 + }
  31 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.queue.processing;
  17 +
  18 +import com.google.protobuf.InvalidProtocolBufferException;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.thingsboard.server.common.data.id.EntityId;
  21 +import org.thingsboard.server.common.data.id.EntityIdFactory;
  22 +import org.thingsboard.server.common.msg.TbMsg;
  23 +import org.thingsboard.server.common.msg.gen.MsgProtos;
  24 +import org.thingsboard.server.common.msg.queue.TbMsgCallback;
  25 +import org.thingsboard.server.gen.transport.TransportProtos;
  26 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  27 +
  28 +import java.util.ArrayList;
  29 +import java.util.LinkedList;
  30 +import java.util.List;
  31 +import java.util.Queue;
  32 +import java.util.UUID;
  33 +import java.util.concurrent.ConcurrentHashMap;
  34 +import java.util.concurrent.ConcurrentMap;
  35 +import java.util.concurrent.atomic.AtomicInteger;
  36 +import java.util.function.BiConsumer;
  37 +import java.util.stream.Collectors;
  38 +
  39 +@Slf4j
  40 +public abstract class SequentialByEntityIdTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy {
  41 +
  42 + private volatile BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer;
  43 + private volatile ConcurrentMap<UUID, EntityId> msgToEntityIdMap = new ConcurrentHashMap<>();
  44 + private volatile ConcurrentMap<EntityId, Queue<IdMsgPair>> entityIdToListMap = new ConcurrentHashMap<>();
  45 +
  46 + public SequentialByEntityIdTbRuleEngineSubmitStrategy(String queueName) {
  47 + super(queueName);
  48 + }
  49 +
  50 + @Override
  51 + public void init(List<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgs) {
  52 + super.init(msgs);
  53 + initMaps();
  54 + }
  55 +
  56 + @Override
  57 + public void submitAttempt(BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer) {
  58 + this.msgConsumer = msgConsumer;
  59 + entityIdToListMap.forEach((entityId, queue) -> {
  60 + IdMsgPair msg = queue.peek();
  61 + if (msg != null) {
  62 + msgConsumer.accept(msg.uuid, msg.msg);
  63 + }
  64 + });
  65 + }
  66 +
  67 + @Override
  68 + public void update(ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> reprocessMap) {
  69 + super.update(reprocessMap);
  70 + initMaps();
  71 + }
  72 +
  73 + @Override
  74 + protected void doOnSuccess(UUID id) {
  75 + EntityId entityId = msgToEntityIdMap.get(id);
  76 + if (entityId != null) {
  77 + Queue<IdMsgPair> queue = entityIdToListMap.get(entityId);
  78 + if (queue != null) {
  79 + IdMsgPair next = null;
  80 + synchronized (queue) {
  81 + IdMsgPair expected = queue.peek();
  82 + if (expected != null && expected.uuid.equals(id)) {
  83 + queue.poll();
  84 + next = queue.peek();
  85 + }
  86 + }
  87 + if (next != null) {
  88 + msgConsumer.accept(next.uuid, next.msg);
  89 + }
  90 + }
  91 + }
  92 + }
  93 +
  94 + private void initMaps() {
  95 + msgToEntityIdMap.clear();
  96 + entityIdToListMap.clear();
  97 + for (IdMsgPair pair : orderedMsgList) {
  98 + EntityId entityId = getEntityId(pair.msg.getValue());
  99 + if (entityId != null) {
  100 + msgToEntityIdMap.put(pair.uuid, entityId);
  101 + entityIdToListMap.computeIfAbsent(entityId, id -> new LinkedList<>()).add(pair);
  102 + }
  103 + }
  104 + }
  105 +
  106 + protected abstract EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg);
  107 +
  108 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.queue.processing;
  17 +
  18 +import com.google.protobuf.InvalidProtocolBufferException;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.thingsboard.server.common.data.id.EntityId;
  21 +import org.thingsboard.server.common.data.id.EntityIdFactory;
  22 +import org.thingsboard.server.common.msg.gen.MsgProtos;
  23 +import org.thingsboard.server.gen.transport.TransportProtos;
  24 +
  25 +import java.util.UUID;
  26 +
  27 +@Slf4j
  28 +public class SequentialByOriginatorIdTbRuleEngineSubmitStrategy extends SequentialByEntityIdTbRuleEngineSubmitStrategy {
  29 +
  30 + public SequentialByOriginatorIdTbRuleEngineSubmitStrategy(String queueName) {
  31 + super(queueName);
  32 + }
  33 +
  34 + @Override
  35 + protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) {
  36 + try {
  37 + MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(msg.getTbMsg());
  38 + return EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
  39 + } catch (InvalidProtocolBufferException e) {
  40 + log.warn("[{}] Failed to parse TbMsg: {}", queueName, msg);
  41 + return null;
  42 + }
  43 + }
  44 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.queue.processing;
  17 +
  18 +import org.thingsboard.server.common.data.id.EntityId;
  19 +import org.thingsboard.server.common.data.id.TenantId;
  20 +import org.thingsboard.server.gen.transport.TransportProtos;
  21 +
  22 +import java.util.UUID;
  23 +
  24 +public class SequentialByTenantIdTbRuleEngineSubmitStrategy extends SequentialByEntityIdTbRuleEngineSubmitStrategy {
  25 +
  26 + public SequentialByTenantIdTbRuleEngineSubmitStrategy(String queueName) {
  27 + super(queueName);
  28 + }
  29 +
  30 + @Override
  31 + protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) {
  32 + return new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB()));
  33 +
  34 + }
  35 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.queue.processing;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.thingsboard.server.gen.transport.TransportProtos;
  20 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  21 +
  22 +import java.util.LinkedHashMap;
  23 +import java.util.Map;
  24 +import java.util.UUID;
  25 +import java.util.concurrent.ConcurrentMap;
  26 +import java.util.concurrent.atomic.AtomicInteger;
  27 +import java.util.function.BiConsumer;
  28 +
  29 +@Slf4j
  30 +public class SequentialTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy {
  31 +
  32 + private final AtomicInteger msgIdx = new AtomicInteger(0);
  33 + private volatile BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer;
  34 + private volatile UUID expectedMsgId;
  35 +
  36 + public SequentialTbRuleEngineSubmitStrategy(String queueName) {
  37 + super(queueName);
  38 + }
  39 +
  40 + @Override
  41 + public void submitAttempt(BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer) {
  42 + this.msgConsumer = msgConsumer;
  43 + msgIdx.set(0);
  44 + submitNext();
  45 + }
  46 +
  47 + @Override
  48 + public void update(ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> reprocessMap) {
  49 + super.update(reprocessMap);
  50 + }
  51 +
  52 + @Override
  53 + protected void doOnSuccess(UUID id) {
  54 + if (expectedMsgId.equals(id)) {
  55 + msgIdx.incrementAndGet();
  56 + submitNext();
  57 + }
  58 + }
  59 +
  60 + private void submitNext() {
  61 + int listSize = orderedMsgList.size();
  62 + int idx = msgIdx.get();
  63 + if (idx < listSize) {
  64 + IdMsgPair pair = orderedMsgList.get(idx);
  65 + expectedMsgId = pair.uuid;
  66 + if (log.isInfoEnabled()) {
  67 + log.info("[{}] submitting [{}] message to rule engine", queueName, pair.msg);
  68 + }
  69 + msgConsumer.accept(pair.uuid, pair.msg);
  70 + }
  71 + }
  72 +
  73 +}
... ...
... ... @@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.id.TenantId;
20 20 import org.thingsboard.server.common.msg.queue.RuleEngineException;
21 21 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
22 22 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  23 +import org.thingsboard.server.service.queue.ProcessingAttemptContext;
23 24
24 25 import java.util.UUID;
25 26 import java.util.concurrent.ConcurrentMap;
... ... @@ -31,24 +32,27 @@ public class TbRuleEngineProcessingResult {
31 32 @Getter
32 33 private final boolean timeout;
33 34 @Getter
34   - private final ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> pendingMap;
35   - @Getter
36   - private final ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> successMap;
37   - @Getter
38   - private final ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> failureMap;
39   - @Getter
40   - private final ConcurrentMap<TenantId, RuleEngineException> exceptionsMap;
  35 + private final ProcessingAttemptContext ctx;
41 36
42   - public TbRuleEngineProcessingResult(boolean timeout,
43   - ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> pendingMap,
44   - ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> successMap,
45   - ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> failureMap,
46   - ConcurrentMap<TenantId, RuleEngineException> exceptionsMap) {
  37 + public TbRuleEngineProcessingResult(boolean timeout, ProcessingAttemptContext ctx) {
47 38 this.timeout = timeout;
48   - this.pendingMap = pendingMap;
49   - this.successMap = successMap;
50   - this.failureMap = failureMap;
51   - this.exceptionsMap = exceptionsMap;
52   - this.success = !timeout && pendingMap.isEmpty() && failureMap.isEmpty();
  39 + this.ctx = ctx;
  40 + this.success = !timeout && ctx.getPendingMap().isEmpty() && ctx.getFailedMap().isEmpty();
  41 + }
  42 +
  43 + public ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> getPendingMap() {
  44 + return ctx.getPendingMap();
  45 + }
  46 +
  47 + public ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> getSuccessMap() {
  48 + return ctx.getSuccessMap();
  49 + }
  50 +
  51 + public ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> getFailedMap() {
  52 + return ctx.getFailedMap();
  53 + }
  54 +
  55 + public ConcurrentMap<TenantId, RuleEngineException> getExceptionsMap() {
  56 + return ctx.getExceptionsMap();
53 57 }
54 58 }
... ...
... ... @@ -16,12 +16,10 @@
16 16 package org.thingsboard.server.service.queue.processing;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19   -import org.springframework.beans.factory.annotation.Value;
20 19 import org.springframework.stereotype.Component;
21 20 import org.thingsboard.server.gen.transport.TransportProtos;
22 21 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
23 22 import org.thingsboard.server.queue.settings.TbRuleEngineQueueAckStrategyConfiguration;
24   -import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
25 23
26 24 import java.util.UUID;
27 25 import java.util.concurrent.ConcurrentHashMap;
... ... @@ -77,10 +75,10 @@ public class TbRuleEngineProcessingStrategyFactory {
77 75 return new TbRuleEngineProcessingDecision(true, null);
78 76 } else {
79 77 if (retryCount == 0) {
80   - initialTotalCount = result.getPendingMap().size() + result.getFailureMap().size() + result.getSuccessMap().size();
  78 + initialTotalCount = result.getPendingMap().size() + result.getFailedMap().size() + result.getSuccessMap().size();
81 79 }
82 80 retryCount++;
83   - double failedCount = result.getFailureMap().size() + result.getPendingMap().size();
  81 + double failedCount = result.getFailedMap().size() + result.getPendingMap().size();
84 82 if (maxRetries > 0 && retryCount > maxRetries) {
85 83 log.info("[{}] Skip reprocess of the rule engine pack due to max retries", queueName);
86 84 return new TbRuleEngineProcessingDecision(true, null);
... ... @@ -90,7 +88,7 @@ public class TbRuleEngineProcessingStrategyFactory {
90 88 } else {
91 89 ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> toReprocess = new ConcurrentHashMap<>(initialTotalCount);
92 90 if (retryFailed) {
93   - result.getFailureMap().forEach(toReprocess::put);
  91 + result.getFailedMap().forEach(toReprocess::put);
94 92 }
95 93 if (retryTimeout) {
96 94 result.getPendingMap().forEach(toReprocess::put);
... ... @@ -125,7 +123,7 @@ public class TbRuleEngineProcessingStrategyFactory {
125 123
126 124 @Override
127 125 public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) {
128   - log.info("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailureMap().size(), result.getPendingMap().size());
  126 + log.info("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size());
129 127 return new TbRuleEngineProcessingDecision(true, null);
130 128 }
131 129 }
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.queue.processing;
  17 +
  18 +import org.thingsboard.server.gen.transport.TransportProtos;
  19 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  20 +
  21 +import java.util.List;
  22 +import java.util.UUID;
  23 +import java.util.concurrent.ConcurrentMap;
  24 +import java.util.function.BiConsumer;
  25 +
  26 +public interface TbRuleEngineSubmitStrategy {
  27 +
  28 + void init(List<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgs);
  29 +
  30 + ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> getPendingMap();
  31 +
  32 + void submitAttempt(BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer);
  33 +
  34 + void update(ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> reprocessMap);
  35 +
  36 + void onSuccess(UUID id);
  37 +
  38 + void stop();
  39 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.queue.processing;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.queue.settings.TbRuleEngineQueueSubmitStrategyConfiguration;
  21 +
  22 +@Component
  23 +@Slf4j
  24 +public class TbRuleEngineSubmitStrategyFactory {
  25 +
  26 + public TbRuleEngineSubmitStrategy newInstance(String name, TbRuleEngineQueueSubmitStrategyConfiguration configuration) {
  27 + switch (configuration.getType()) {
  28 + case "BURST":
  29 + return new BurstTbRuleEngineSubmitStrategy(name);
  30 + case "BATCH":
  31 + return new BatchTbRuleEngineSubmitStrategy(name, configuration.getBatchSize());
  32 + case "SEQUENTIAL_WITHIN_ORIGINATOR":
  33 + return new SequentialByOriginatorIdTbRuleEngineSubmitStrategy(name);
  34 + case "SEQUENTIAL_WITHIN_TENANT":
  35 + return new SequentialByTenantIdTbRuleEngineSubmitStrategy(name);
  36 + case "SEQUENTIAL":
  37 + return new SequentialTbRuleEngineSubmitStrategy(name);
  38 + default:
  39 + throw new RuntimeException("TbRuleEngineProcessingStrategy with type " + configuration.getType() + " is not supported!");
  40 + }
  41 + }
  42 +
  43 +}
... ...
... ... @@ -123,7 +123,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
123 123 log.trace("[{}][{}] Processing local rpc call to device actor [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
124 124 UUID requestId = request.getId();
125 125 localToDeviceRpcRequests.put(requestId, rpcMsg);
126   - actorContext.getAppActor().tell(rpcMsg, ActorRef.noSender());
  126 + actorContext.tell(rpcMsg, ActorRef.noSender());
127 127 scheduleToDeviceTimeout(request, requestId);
128 128 }
129 129
... ...
... ... @@ -578,27 +578,35 @@ queue:
578 578 print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}"
579 579 queues: # TODO 2.5: specify correct ENV variable names.
580 580 - name: "Main"
581   - topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine.main}"
582   - poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
583   - partitions: "${TB_QUEUE_RULE_ENGINE_PARTITIONS:10}"
584   - pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
585   - ack-strategy:
586   - type: "${TB_QUEUE_RULE_ENGINE_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
  581 + topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb.rule-engine.main}"
  582 + poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}"
  583 + partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}"
  584 + pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:60000}"
  585 + submit-strategy:
  586 + type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_WITHIN_ORIGINATOR, SEQUENTIAL_WITHIN_TENANT, SEQUENTIAL
  587 + # For BATCH only
  588 + batch-size: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_BATCH_SIZE:1000}" # Maximum number of messages in batch
  589 + processing-strategy:
  590 + type: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
587 591 # For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
588   - retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
589   - failure-percentage: "${TB_QUEUE_RULE_ENGINE_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
590   - pause-between-retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries;
591   - - name: "${TB_QUEUE_RULE_ENGINE_HP_QUEUE_NAME:HighPriority}"
592   - topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine.hp}"
593   - poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
594   - partitions: "${TB_QUEUE_RULE_ENGINE_PARTITIONS:3}"
595   - pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
596   - ack-strategy:
597   - type: "${TB_QUEUE_RULE_ENGINE_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
  592 + retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
  593 + failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
  594 + pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries;
  595 + - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}"
  596 + topic: "${TB_QUEUE_RE_HP_TOPIC:tb.rule-engine.hp}"
  597 + poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}"
  598 + partitions: "${TB_QUEUE_RE_HP_PARTITIONS:3}"
  599 + pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:60000}"
  600 + submit-strategy:
  601 + type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_WITHIN_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_WITHIN_ORIGINATOR, SEQUENTIAL_WITHIN_TENANT, SEQUENTIAL
  602 + # For BATCH only
  603 + batch-size: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_BATCH_SIZE:100}" # Maximum number of messages in batch
  604 + processing-strategy:
  605 + type: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
598 606 # For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
599   - retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited
600   - failure-percentage: "${TB_QUEUE_RULE_ENGINE_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
601   - pause-between-retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRY_PAUSE:1}"# Time in seconds to wait in consumer thread before retries;
  607 + retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited
  608 + failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
  609 + pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
602 610 transport:
603 611 # For high priority notifications that require minimum latency and processing time
604 612 notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
... ...
... ... @@ -20,13 +20,9 @@ import lombok.Data;
20 20 @Data
21 21 public class TbRuleEngineQueueAckStrategyConfiguration {
22 22
23   -// @Value("${type}")
24 23 private String type;
25   -// @Value("${retries:3}")
26 24 private int retries;
27   -// @Value("${failure_percentage:0}")
28 25 private double failurePercentage;
29   -// @Value("${pause_between_retries:3}")
30 26 private long pauseBetweenRetries;
31 27
32 28 }
... ...
... ... @@ -25,6 +25,7 @@ public class TbRuleEngineQueueConfiguration {
25 25 private int pollInterval;
26 26 private int partitions;
27 27 private String packProcessingTimeout;
28   - private TbRuleEngineQueueAckStrategyConfiguration ackStrategy;
  28 + private TbRuleEngineQueueSubmitStrategyConfiguration submitStrategy;
  29 + private TbRuleEngineQueueAckStrategyConfiguration processingStrategy;
29 30
30 31 }
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.settings;
  17 +
  18 +import lombok.Data;
  19 +
  20 +@Data
  21 +public class TbRuleEngineQueueSubmitStrategyConfiguration {
  22 +
  23 + private String type;
  24 + private int batchSize;
  25 +
  26 +}
... ...