Commit 2ea2b57f3c4debfbb1cff09dec4c3ec25b3b1d39
Committed by
Andrew Shvayka
1 parent
b63be05f
Top 5 rule nodes statistics
(cherry picked from commit 445350b1)
Showing
11 changed files
with
247 additions
and
11 deletions
@@ -105,6 +105,7 @@ class DefaultTbContext implements TbContext { | @@ -105,6 +105,7 @@ class DefaultTbContext implements TbContext { | ||
105 | if (nodeCtx.getSelf().isDebugMode()) { | 105 | if (nodeCtx.getSelf().isDebugMode()) { |
106 | relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th)); | 106 | relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th)); |
107 | } | 107 | } |
108 | + msg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId()); | ||
108 | nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null)); | 109 | nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null)); |
109 | } | 110 | } |
110 | 111 | ||
@@ -214,6 +215,7 @@ class DefaultTbContext implements TbContext { | @@ -214,6 +215,7 @@ class DefaultTbContext implements TbContext { | ||
214 | if (nodeCtx.getSelf().isDebugMode()) { | 215 | if (nodeCtx.getSelf().isDebugMode()) { |
215 | mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "ACK", null); | 216 | mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "ACK", null); |
216 | } | 217 | } |
218 | + tbMsg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId()); | ||
217 | tbMsg.getCallback().onSuccess(); | 219 | tbMsg.getCallback().onSuccess(); |
218 | } | 220 | } |
219 | 221 |
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
@@ -103,7 +103,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod | @@ -103,7 +103,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod | ||
103 | } | 103 | } |
104 | 104 | ||
105 | void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception { | 105 | void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception { |
106 | - msg.getMsg().getCallback().visit(info); | 106 | + msg.getMsg().getCallback().onProcessingStart(info); |
107 | checkActive(msg.getMsg()); | 107 | checkActive(msg.getMsg()); |
108 | if (ruleNode.isDebugMode()) { | 108 | if (ruleNode.isDebugMode()) { |
109 | systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType()); | 109 | systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType()); |
@@ -165,7 +165,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< | @@ -165,7 +165,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< | ||
165 | submitStrategy.init(msgs); | 165 | submitStrategy.init(msgs); |
166 | 166 | ||
167 | while (!stopped) { | 167 | while (!stopped) { |
168 | - TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(submitStrategy); | 168 | + TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(configuration.getName(), submitStrategy); |
169 | submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> { | 169 | submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> { |
170 | log.trace("[{}] Creating callback for message: {}", id, msg.getValue()); | 170 | log.trace("[{}] Creating callback for message: {}", id, msg.getValue()); |
171 | ToRuleEngineMsg toRuleEngineMsg = msg.getValue(); | 171 | ToRuleEngineMsg toRuleEngineMsg = msg.getValue(); |
@@ -194,6 +194,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< | @@ -194,6 +194,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< | ||
194 | if (!ctx.getFailedMap().isEmpty()) { | 194 | if (!ctx.getFailedMap().isEmpty()) { |
195 | printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed"); | 195 | printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed"); |
196 | } | 196 | } |
197 | + ctx.printProfilerStats(); | ||
198 | + | ||
197 | TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result); | 199 | TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result); |
198 | if (statsEnabled) { | 200 | if (statsEnabled) { |
199 | stats.log(result, decision.isCommit()); | 201 | stats.log(result, decision.isCommit()); |
@@ -49,8 +49,14 @@ public class TbMsgPackCallback implements TbMsgCallback { | @@ -49,8 +49,14 @@ public class TbMsgPackCallback implements TbMsgCallback { | ||
49 | } | 49 | } |
50 | 50 | ||
51 | @Override | 51 | @Override |
52 | - public void visit(RuleNodeInfo ruleNodeInfo) { | ||
53 | - log.trace("[{}] ON PROCESS: {}", id, ruleNodeInfo); | ||
54 | - ctx.visit(id, ruleNodeInfo); | 52 | + public void onProcessingStart(RuleNodeInfo ruleNodeInfo) { |
53 | + log.trace("[{}] ON PROCESSING START: {}", id, ruleNodeInfo); | ||
54 | + ctx.onProcessingStart(id, ruleNodeInfo); | ||
55 | + } | ||
56 | + | ||
57 | + @Override | ||
58 | + public void onProcessingEnd(RuleNodeId ruleNodeId) { | ||
59 | + log.trace("[{}] ON PROCESSING END: {}", id, ruleNodeId); | ||
60 | + ctx.onProcessingEnd(id, ruleNodeId); | ||
55 | } | 61 | } |
56 | } | 62 | } |
@@ -16,6 +16,7 @@ | @@ -16,6 +16,7 @@ | ||
16 | package org.thingsboard.server.service.queue; | 16 | package org.thingsboard.server.service.queue; |
17 | 17 | ||
18 | import lombok.Getter; | 18 | import lombok.Getter; |
19 | +import lombok.extern.slf4j.Slf4j; | ||
19 | import org.thingsboard.server.common.data.id.RuleNodeId; | 20 | import org.thingsboard.server.common.data.id.RuleNodeId; |
20 | import org.thingsboard.server.common.data.id.TenantId; | 21 | import org.thingsboard.server.common.data.id.TenantId; |
21 | import org.thingsboard.server.common.msg.queue.RuleEngineException; | 22 | import org.thingsboard.server.common.msg.queue.RuleEngineException; |
@@ -24,6 +25,8 @@ import org.thingsboard.server.gen.transport.TransportProtos; | @@ -24,6 +25,8 @@ import org.thingsboard.server.gen.transport.TransportProtos; | ||
24 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; | 25 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
25 | import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy; | 26 | import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy; |
26 | 27 | ||
28 | +import java.util.Comparator; | ||
29 | +import java.util.Map; | ||
27 | import java.util.UUID; | 30 | import java.util.UUID; |
28 | import java.util.concurrent.ConcurrentHashMap; | 31 | import java.util.concurrent.ConcurrentHashMap; |
29 | import java.util.concurrent.ConcurrentMap; | 32 | import java.util.concurrent.ConcurrentMap; |
@@ -31,9 +34,13 @@ import java.util.concurrent.CountDownLatch; | @@ -31,9 +34,13 @@ import java.util.concurrent.CountDownLatch; | ||
31 | import java.util.concurrent.TimeUnit; | 34 | import java.util.concurrent.TimeUnit; |
32 | import java.util.concurrent.atomic.AtomicInteger; | 35 | import java.util.concurrent.atomic.AtomicInteger; |
33 | 36 | ||
37 | +@Slf4j | ||
34 | public class TbMsgPackProcessingContext { | 38 | public class TbMsgPackProcessingContext { |
35 | 39 | ||
40 | + private final String queueName; | ||
36 | private final TbRuleEngineSubmitStrategy submitStrategy; | 41 | private final TbRuleEngineSubmitStrategy submitStrategy; |
42 | + @Getter | ||
43 | + private final boolean profilerEnabled; | ||
37 | private final AtomicInteger pendingCount; | 44 | private final AtomicInteger pendingCount; |
38 | private final CountDownLatch processingTimeoutLatch = new CountDownLatch(1); | 45 | private final CountDownLatch processingTimeoutLatch = new CountDownLatch(1); |
39 | @Getter | 46 | @Getter |
@@ -47,14 +54,20 @@ public class TbMsgPackProcessingContext { | @@ -47,14 +54,20 @@ public class TbMsgPackProcessingContext { | ||
47 | 54 | ||
48 | private final ConcurrentMap<UUID, RuleNodeInfo> lastRuleNodeMap = new ConcurrentHashMap<>(); | 55 | private final ConcurrentMap<UUID, RuleNodeInfo> lastRuleNodeMap = new ConcurrentHashMap<>(); |
49 | 56 | ||
50 | - public TbMsgPackProcessingContext(TbRuleEngineSubmitStrategy submitStrategy) { | 57 | + public TbMsgPackProcessingContext(String queueName, TbRuleEngineSubmitStrategy submitStrategy) { |
58 | + this.queueName = queueName; | ||
51 | this.submitStrategy = submitStrategy; | 59 | this.submitStrategy = submitStrategy; |
60 | + this.profilerEnabled = log.isDebugEnabled(); | ||
52 | this.pendingMap = submitStrategy.getPendingMap(); | 61 | this.pendingMap = submitStrategy.getPendingMap(); |
53 | this.pendingCount = new AtomicInteger(pendingMap.size()); | 62 | this.pendingCount = new AtomicInteger(pendingMap.size()); |
54 | } | 63 | } |
55 | 64 | ||
56 | public boolean await(long packProcessingTimeout, TimeUnit milliseconds) throws InterruptedException { | 65 | public boolean await(long packProcessingTimeout, TimeUnit milliseconds) throws InterruptedException { |
57 | - return processingTimeoutLatch.await(packProcessingTimeout, milliseconds); | 66 | + boolean success = processingTimeoutLatch.await(packProcessingTimeout, milliseconds); |
67 | + if (!success && profilerEnabled) { | ||
68 | + msgProfilerMap.values().forEach(TbMsgProfilerInfo::onTimeout); | ||
69 | + } | ||
70 | + return success; | ||
58 | } | 71 | } |
59 | 72 | ||
60 | public void onSuccess(UUID id) { | 73 | public void onSuccess(UUID id) { |
@@ -85,12 +98,53 @@ public class TbMsgPackProcessingContext { | @@ -85,12 +98,53 @@ public class TbMsgPackProcessingContext { | ||
85 | } | 98 | } |
86 | } | 99 | } |
87 | 100 | ||
88 | - public void visit(UUID id, RuleNodeInfo ruleNodeInfo) { | 101 | + private final ConcurrentHashMap<UUID, TbMsgProfilerInfo> msgProfilerMap = new ConcurrentHashMap<>(); |
102 | + private final ConcurrentHashMap<UUID, TbRuleNodeProfilerInfo> ruleNodeProfilerMap = new ConcurrentHashMap<>(); | ||
103 | + | ||
104 | + public void onProcessingStart(UUID id, RuleNodeInfo ruleNodeInfo) { | ||
89 | lastRuleNodeMap.put(id, ruleNodeInfo); | 105 | lastRuleNodeMap.put(id, ruleNodeInfo); |
106 | + if (profilerEnabled) { | ||
107 | + msgProfilerMap.computeIfAbsent(id, TbMsgProfilerInfo::new).onStart(ruleNodeInfo.getRuleNodeId()); | ||
108 | + ruleNodeProfilerMap.putIfAbsent(ruleNodeInfo.getRuleNodeId().getId(), new TbRuleNodeProfilerInfo(ruleNodeInfo)); | ||
109 | + } | ||
110 | + } | ||
111 | + | ||
112 | + public void onProcessingEnd(UUID id, RuleNodeId ruleNodeId) { | ||
113 | + if (profilerEnabled) { | ||
114 | + long processingTime = msgProfilerMap.computeIfAbsent(id, TbMsgProfilerInfo::new).onEnd(ruleNodeId); | ||
115 | + if (processingTime > 0) { | ||
116 | + ruleNodeProfilerMap.computeIfAbsent(ruleNodeId.getId(), TbRuleNodeProfilerInfo::new).record(processingTime); | ||
117 | + } | ||
118 | + } | ||
119 | + } | ||
120 | + | ||
121 | + public void onTimeout(TbMsgProfilerInfo profilerInfo) { | ||
122 | + Map.Entry<UUID, Long> ruleNodeInfo = profilerInfo.onTimeout(); | ||
123 | + if (ruleNodeInfo != null) { | ||
124 | + ruleNodeProfilerMap.computeIfAbsent(ruleNodeInfo.getKey(), TbRuleNodeProfilerInfo::new).record(ruleNodeInfo.getValue()); | ||
125 | + } | ||
90 | } | 126 | } |
91 | 127 | ||
92 | public RuleNodeInfo getLastVisitedRuleNode(UUID id) { | 128 | public RuleNodeInfo getLastVisitedRuleNode(UUID id) { |
93 | return lastRuleNodeMap.get(id); | 129 | return lastRuleNodeMap.get(id); |
94 | } | 130 | } |
95 | 131 | ||
132 | + public void printProfilerStats() { | ||
133 | + if (profilerEnabled) { | ||
134 | + log.debug("Top Rule Nodes by max execution time:"); | ||
135 | + ruleNodeProfilerMap.values().stream() | ||
136 | + .sorted(Comparator.comparingLong(TbRuleNodeProfilerInfo::getMaxExecutionTime).reversed()).limit(5) | ||
137 | + .forEach(info -> log.debug("[{}][{}] max execution time: {}. {}", queueName, info.getRuleNodeId(), info.getMaxExecutionTime(), info.getLabel())); | ||
138 | + | ||
139 | + log.info("Top Rule Nodes by avg execution time:"); | ||
140 | + ruleNodeProfilerMap.values().stream() | ||
141 | + .sorted(Comparator.comparingDouble(TbRuleNodeProfilerInfo::getAvgExecutionTime).reversed()).limit(5) | ||
142 | + .forEach(info -> log.info("[{}][{}] avg execution time: {}. {}", queueName, info.getRuleNodeId(), info.getAvgExecutionTime(), info.getLabel())); | ||
143 | + | ||
144 | + log.info("Top Rule Nodes by execution count:"); | ||
145 | + ruleNodeProfilerMap.values().stream() | ||
146 | + .sorted(Comparator.comparingInt(TbRuleNodeProfilerInfo::getExecutionCount).reversed()).limit(5) | ||
147 | + .forEach(info -> log.info("[{}][{}] execution count: {}. {}", queueName, info.getRuleNodeId(), info.getExecutionCount(), info.getLabel())); | ||
148 | + } | ||
149 | + } | ||
96 | } | 150 | } |
1 | +/** | ||
2 | + * Copyright © 2016-2020 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.service.queue; | ||
17 | + | ||
18 | +import lombok.extern.slf4j.Slf4j; | ||
19 | +import org.thingsboard.server.common.data.id.RuleNodeId; | ||
20 | +import org.thingsboard.server.common.msg.queue.RuleNodeInfo; | ||
21 | + | ||
22 | +import java.util.AbstractMap; | ||
23 | +import java.util.Map; | ||
24 | +import java.util.UUID; | ||
25 | +import java.util.concurrent.atomic.AtomicLong; | ||
26 | +import java.util.concurrent.locks.Lock; | ||
27 | +import java.util.concurrent.locks.ReentrantLock; | ||
28 | + | ||
29 | +@Slf4j | ||
30 | +public class TbMsgProfilerInfo { | ||
31 | + private final UUID msgId; | ||
32 | + private AtomicLong totalProcessingTime = new AtomicLong(); | ||
33 | + private Lock stateLock = new ReentrantLock(); | ||
34 | + private RuleNodeId currentRuleNodeId; | ||
35 | + private long stateChangeTime; | ||
36 | + | ||
37 | + public TbMsgProfilerInfo(UUID msgId) { | ||
38 | + this.msgId = msgId; | ||
39 | + } | ||
40 | + | ||
41 | + public void onStart(RuleNodeId ruleNodeId) { | ||
42 | + long currentTime = System.currentTimeMillis(); | ||
43 | + stateLock.lock(); | ||
44 | + try { | ||
45 | + currentRuleNodeId = ruleNodeId; | ||
46 | + stateChangeTime = currentTime; | ||
47 | + } finally { | ||
48 | + stateLock.unlock(); | ||
49 | + } | ||
50 | + } | ||
51 | + | ||
52 | + public long onEnd(RuleNodeId ruleNodeId) { | ||
53 | + long currentTime = System.currentTimeMillis(); | ||
54 | + stateLock.lock(); | ||
55 | + try { | ||
56 | + if (ruleNodeId.equals(currentRuleNodeId)) { | ||
57 | + long processingTime = currentTime - stateChangeTime; | ||
58 | + stateChangeTime = currentTime; | ||
59 | + totalProcessingTime.addAndGet(processingTime); | ||
60 | + currentRuleNodeId = null; | ||
61 | + return processingTime; | ||
62 | + } else { | ||
63 | + log.trace("[{}] Invalid sequence of rule node processing detected. Expected [{}] but was [{}]", msgId, currentRuleNodeId, ruleNodeId); | ||
64 | + return 0; | ||
65 | + } | ||
66 | + } finally { | ||
67 | + stateLock.unlock(); | ||
68 | + } | ||
69 | + } | ||
70 | + | ||
71 | + public Map.Entry<UUID, Long> onTimeout() { | ||
72 | + long currentTime = System.currentTimeMillis(); | ||
73 | + stateLock.lock(); | ||
74 | + try { | ||
75 | + if (currentRuleNodeId != null && stateChangeTime > 0) { | ||
76 | + long timeoutTime = currentTime - stateChangeTime; | ||
77 | + totalProcessingTime.addAndGet(timeoutTime); | ||
78 | + return new AbstractMap.SimpleEntry<>(currentRuleNodeId.getId(), timeoutTime); | ||
79 | + } | ||
80 | + } finally { | ||
81 | + stateLock.unlock(); | ||
82 | + } | ||
83 | + return null; | ||
84 | + } | ||
85 | +} |
application/src/main/java/org/thingsboard/server/service/queue/TbRuleNodeProfilerInfo.java
0 → 100644
1 | +/** | ||
2 | + * Copyright © 2016-2020 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.service.queue; | ||
17 | + | ||
18 | +import lombok.Getter; | ||
19 | +import org.thingsboard.server.common.msg.queue.RuleNodeInfo; | ||
20 | + | ||
21 | +import java.util.UUID; | ||
22 | +import java.util.concurrent.atomic.AtomicInteger; | ||
23 | +import java.util.concurrent.atomic.AtomicLong; | ||
24 | + | ||
25 | +public class TbRuleNodeProfilerInfo { | ||
26 | + @Getter | ||
27 | + private final UUID ruleNodeId; | ||
28 | + @Getter | ||
29 | + private final String label; | ||
30 | + private AtomicInteger executionCount = new AtomicInteger(0); | ||
31 | + private AtomicLong executionTime = new AtomicLong(0); | ||
32 | + private AtomicLong maxExecutionTime = new AtomicLong(0); | ||
33 | + | ||
34 | + public TbRuleNodeProfilerInfo(RuleNodeInfo ruleNodeInfo) { | ||
35 | + this.ruleNodeId = ruleNodeInfo.getRuleNodeId().getId(); | ||
36 | + this.label = ruleNodeInfo.toString(); | ||
37 | + } | ||
38 | + | ||
39 | + public TbRuleNodeProfilerInfo(UUID ruleNodeId) { | ||
40 | + this.ruleNodeId = ruleNodeId; | ||
41 | + this.label = ""; | ||
42 | + } | ||
43 | + | ||
44 | + public void record(long processingTime) { | ||
45 | + executionCount.incrementAndGet(); | ||
46 | + executionTime.addAndGet(processingTime); | ||
47 | + while (true) { | ||
48 | + long value = maxExecutionTime.get(); | ||
49 | + if (value >= processingTime) { | ||
50 | + break; | ||
51 | + } | ||
52 | + if (maxExecutionTime.compareAndSet(value, processingTime)) { | ||
53 | + break; | ||
54 | + } | ||
55 | + } | ||
56 | + } | ||
57 | + | ||
58 | + int getExecutionCount() { | ||
59 | + return executionCount.get(); | ||
60 | + } | ||
61 | + | ||
62 | + long getMaxExecutionTime() { | ||
63 | + return maxExecutionTime.get(); | ||
64 | + } | ||
65 | + | ||
66 | + double getAvgExecutionTime() { | ||
67 | + double executionCnt = (double) executionCount.get(); | ||
68 | + if (executionCnt > 0) { | ||
69 | + return executionTime.get() / executionCnt; | ||
70 | + } else { | ||
71 | + return 0.0; | ||
72 | + } | ||
73 | + } | ||
74 | + | ||
75 | +} |
@@ -68,18 +68,20 @@ public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS | @@ -68,18 +68,20 @@ public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS | ||
68 | int listSize = orderedMsgList.size(); | 68 | int listSize = orderedMsgList.size(); |
69 | int startIdx = Math.min(packIdx.get() * batchSize, listSize); | 69 | int startIdx = Math.min(packIdx.get() * batchSize, listSize); |
70 | int endIdx = Math.min(startIdx + batchSize, listSize); | 70 | int endIdx = Math.min(startIdx + batchSize, listSize); |
71 | + Map<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> tmpPack; | ||
71 | synchronized (pendingPack) { | 72 | synchronized (pendingPack) { |
72 | pendingPack.clear(); | 73 | pendingPack.clear(); |
73 | for (int i = startIdx; i < endIdx; i++) { | 74 | for (int i = startIdx; i < endIdx; i++) { |
74 | IdMsgPair pair = orderedMsgList.get(i); | 75 | IdMsgPair pair = orderedMsgList.get(i); |
75 | pendingPack.put(pair.uuid, pair.msg); | 76 | pendingPack.put(pair.uuid, pair.msg); |
76 | } | 77 | } |
78 | + tmpPack = new LinkedHashMap<>(pendingPack); | ||
77 | } | 79 | } |
78 | int submitSize = pendingPack.size(); | 80 | int submitSize = pendingPack.size(); |
79 | if (log.isDebugEnabled() && submitSize > 0) { | 81 | if (log.isDebugEnabled() && submitSize > 0) { |
80 | log.debug("[{}] submitting [{}] messages to rule engine", queueName, submitSize); | 82 | log.debug("[{}] submitting [{}] messages to rule engine", queueName, submitSize); |
81 | } | 83 | } |
82 | - pendingPack.forEach(msgConsumer); | 84 | + tmpPack.forEach(msgConsumer); |
83 | } | 85 | } |
84 | 86 | ||
85 | } | 87 | } |
@@ -51,7 +51,7 @@ public class TbMsgPackProcessingContextTest { | @@ -51,7 +51,7 @@ public class TbMsgPackProcessingContextTest { | ||
51 | messages.put(UUID.randomUUID(), new TbProtoQueueMsg<>(UUID.randomUUID(), null)); | 51 | messages.put(UUID.randomUUID(), new TbProtoQueueMsg<>(UUID.randomUUID(), null)); |
52 | } | 52 | } |
53 | when(strategyMock.getPendingMap()).thenReturn(messages); | 53 | when(strategyMock.getPendingMap()).thenReturn(messages); |
54 | - TbMsgPackProcessingContext context = new TbMsgPackProcessingContext(strategyMock); | 54 | + TbMsgPackProcessingContext context = new TbMsgPackProcessingContext("Main", strategyMock); |
55 | for (UUID uuid : messages.keySet()) { | 55 | for (UUID uuid : messages.keySet()) { |
56 | for (int i = 0; i < parallelCount; i++) { | 56 | for (int i = 0; i < parallelCount; i++) { |
57 | executorService.submit(() -> context.onSuccess(uuid)); | 57 | executorService.submit(() -> context.onSuccess(uuid)); |
@@ -15,12 +15,16 @@ | @@ -15,12 +15,16 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.server.common.msg.queue; | 16 | package org.thingsboard.server.common.msg.queue; |
17 | 17 | ||
18 | +import lombok.Getter; | ||
18 | import org.thingsboard.server.common.data.id.RuleNodeId; | 19 | import org.thingsboard.server.common.data.id.RuleNodeId; |
19 | 20 | ||
20 | public class RuleNodeInfo { | 21 | public class RuleNodeInfo { |
21 | private final String label; | 22 | private final String label; |
23 | + @Getter | ||
24 | + private final RuleNodeId ruleNodeId; | ||
22 | 25 | ||
23 | public RuleNodeInfo(RuleNodeId id, String ruleChainName, String ruleNodeName) { | 26 | public RuleNodeInfo(RuleNodeId id, String ruleChainName, String ruleNodeName) { |
27 | + this.ruleNodeId = id; | ||
24 | this.label = "[RuleChain: " + ruleChainName + "|RuleNode: " + ruleNodeName + "(" + id + ")]"; | 28 | this.label = "[RuleChain: " + ruleChainName + "|RuleNode: " + ruleNodeName + "(" + id + ")]"; |
25 | } | 29 | } |
26 | 30 |
@@ -15,6 +15,8 @@ | @@ -15,6 +15,8 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.server.common.msg.queue; | 16 | package org.thingsboard.server.common.msg.queue; |
17 | 17 | ||
18 | +import org.thingsboard.server.common.data.id.RuleNodeId; | ||
19 | + | ||
18 | public interface TbMsgCallback { | 20 | public interface TbMsgCallback { |
19 | 21 | ||
20 | TbMsgCallback EMPTY = new TbMsgCallback() { | 22 | TbMsgCallback EMPTY = new TbMsgCallback() { |
@@ -34,7 +36,11 @@ public interface TbMsgCallback { | @@ -34,7 +36,11 @@ public interface TbMsgCallback { | ||
34 | 36 | ||
35 | void onFailure(RuleEngineException e); | 37 | void onFailure(RuleEngineException e); |
36 | 38 | ||
37 | - default void visit(RuleNodeInfo ruleNodeInfo) { | 39 | + default void onProcessingStart(RuleNodeInfo ruleNodeInfo) { |
40 | + } | ||
41 | + | ||
42 | + default void onProcessingEnd(RuleNodeId ruleNodeId) { | ||
38 | } | 43 | } |
39 | 44 | ||
45 | + | ||
40 | } | 46 | } |