Commit 445350b1f800b90193f50ee38556e5187acff9e7

Authored by Andrii Shvaika
1 parent 57076e64

Top 5 rule nodes statistics

... ... @@ -105,6 +105,7 @@ class DefaultTbContext implements TbContext {
105 105 if (nodeCtx.getSelf().isDebugMode()) {
106 106 relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
107 107 }
  108 + msg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId());
108 109 nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));
109 110 }
110 111
... ... @@ -203,6 +204,7 @@ class DefaultTbContext implements TbContext {
203 204 if (nodeCtx.getSelf().isDebugMode()) {
204 205 mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "ACK", null);
205 206 }
  207 + tbMsg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId());
206 208 tbMsg.getCallback().onSuccess();
207 209 }
208 210
... ...
... ... @@ -103,7 +103,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
103 103 }
104 104
105 105 void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
106   - msg.getMsg().getCallback().visit(info);
  106 + msg.getMsg().getCallback().onProcessingStart(info);
107 107 checkActive(msg.getMsg());
108 108 if (ruleNode.isDebugMode()) {
109 109 systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
... ...
... ... @@ -144,7 +144,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
144 144 submitStrategy.init(msgs);
145 145
146 146 while (!stopped) {
147   - TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(submitStrategy);
  147 + TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(configuration.getName(), submitStrategy);
148 148 submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> {
149 149 log.trace("[{}] Creating callback for message: {}", id, msg.getValue());
150 150 ToRuleEngineMsg toRuleEngineMsg = msg.getValue();
... ... @@ -175,6 +175,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
175 175 if (!ctx.getFailedMap().isEmpty()) {
176 176 printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed");
177 177 }
  178 + ctx.printProfilerStats();
  179 +
178 180 TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);
179 181 if (statsEnabled) {
180 182 stats.log(result, decision.isCommit());
... ...
... ... @@ -67,8 +67,14 @@ public class TbMsgPackCallback implements TbMsgCallback {
67 67 }
68 68
69 69 @Override
70   - public void visit(RuleNodeInfo ruleNodeInfo) {
71   - log.trace("[{}] ON PROCESS: {}", id, ruleNodeInfo);
72   - ctx.visit(id, ruleNodeInfo);
  70 + public void onProcessingStart(RuleNodeInfo ruleNodeInfo) {
  71 + log.trace("[{}] ON PROCESSING START: {}", id, ruleNodeInfo);
  72 + ctx.onProcessingStart(id, ruleNodeInfo);
  73 + }
  74 +
  75 + @Override
  76 + public void onProcessingEnd(RuleNodeId ruleNodeId) {
  77 + log.trace("[{}] ON PROCESSING END: {}", id, ruleNodeId);
  78 + ctx.onProcessingEnd(id, ruleNodeId);
73 79 }
74 80 }
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.service.queue;
17 17
18 18 import lombok.Getter;
  19 +import lombok.extern.slf4j.Slf4j;
19 20 import org.thingsboard.server.common.data.id.RuleNodeId;
20 21 import org.thingsboard.server.common.data.id.TenantId;
21 22 import org.thingsboard.server.common.msg.queue.RuleEngineException;
... ... @@ -24,6 +25,8 @@ import org.thingsboard.server.gen.transport.TransportProtos;
24 25 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
25 26 import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy;
26 27
  28 +import java.util.Comparator;
  29 +import java.util.Map;
27 30 import java.util.UUID;
28 31 import java.util.concurrent.ConcurrentHashMap;
29 32 import java.util.concurrent.ConcurrentMap;
... ... @@ -31,9 +34,13 @@ import java.util.concurrent.CountDownLatch;
31 34 import java.util.concurrent.TimeUnit;
32 35 import java.util.concurrent.atomic.AtomicInteger;
33 36
  37 +@Slf4j
34 38 public class TbMsgPackProcessingContext {
35 39
  40 + private final String queueName;
36 41 private final TbRuleEngineSubmitStrategy submitStrategy;
  42 + @Getter
  43 + private final boolean profilerEnabled;
37 44 private final AtomicInteger pendingCount;
38 45 private final CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
39 46 @Getter
... ... @@ -47,14 +54,20 @@ public class TbMsgPackProcessingContext {
47 54
48 55 private final ConcurrentMap<UUID, RuleNodeInfo> lastRuleNodeMap = new ConcurrentHashMap<>();
49 56
50   - public TbMsgPackProcessingContext(TbRuleEngineSubmitStrategy submitStrategy) {
  57 + public TbMsgPackProcessingContext(String queueName, TbRuleEngineSubmitStrategy submitStrategy) {
  58 + this.queueName = queueName;
51 59 this.submitStrategy = submitStrategy;
  60 + this.profilerEnabled = log.isDebugEnabled();
52 61 this.pendingMap = submitStrategy.getPendingMap();
53 62 this.pendingCount = new AtomicInteger(pendingMap.size());
54 63 }
55 64
56 65 public boolean await(long packProcessingTimeout, TimeUnit milliseconds) throws InterruptedException {
57   - return processingTimeoutLatch.await(packProcessingTimeout, milliseconds);
  66 + boolean success = processingTimeoutLatch.await(packProcessingTimeout, milliseconds);
  67 + if (!success && profilerEnabled) {
  68 + msgProfilerMap.values().forEach(TbMsgProfilerInfo::onTimeout);
  69 + }
  70 + return success;
58 71 }
59 72
60 73 public void onSuccess(UUID id) {
... ... @@ -85,12 +98,53 @@ public class TbMsgPackProcessingContext {
85 98 }
86 99 }
87 100
88   - public void visit(UUID id, RuleNodeInfo ruleNodeInfo) {
  101 + private final ConcurrentHashMap<UUID, TbMsgProfilerInfo> msgProfilerMap = new ConcurrentHashMap<>();
  102 + private final ConcurrentHashMap<UUID, TbRuleNodeProfilerInfo> ruleNodeProfilerMap = new ConcurrentHashMap<>();
  103 +
  104 + public void onProcessingStart(UUID id, RuleNodeInfo ruleNodeInfo) {
89 105 lastRuleNodeMap.put(id, ruleNodeInfo);
  106 + if (profilerEnabled) {
  107 + msgProfilerMap.computeIfAbsent(id, TbMsgProfilerInfo::new).onStart(ruleNodeInfo.getRuleNodeId());
  108 + ruleNodeProfilerMap.putIfAbsent(ruleNodeInfo.getRuleNodeId().getId(), new TbRuleNodeProfilerInfo(ruleNodeInfo));
  109 + }
  110 + }
  111 +
  112 + public void onProcessingEnd(UUID id, RuleNodeId ruleNodeId) {
  113 + if (profilerEnabled) {
  114 + long processingTime = msgProfilerMap.computeIfAbsent(id, TbMsgProfilerInfo::new).onEnd(ruleNodeId);
  115 + if (processingTime > 0) {
  116 + ruleNodeProfilerMap.computeIfAbsent(ruleNodeId.getId(), TbRuleNodeProfilerInfo::new).record(processingTime);
  117 + }
  118 + }
  119 + }
  120 +
  121 + public void onTimeout(TbMsgProfilerInfo profilerInfo) {
  122 + Map.Entry<UUID, Long> ruleNodeInfo = profilerInfo.onTimeout();
  123 + if (ruleNodeInfo != null) {
  124 + ruleNodeProfilerMap.computeIfAbsent(ruleNodeInfo.getKey(), TbRuleNodeProfilerInfo::new).record(ruleNodeInfo.getValue());
  125 + }
90 126 }
91 127
92 128 public RuleNodeInfo getLastVisitedRuleNode(UUID id) {
93 129 return lastRuleNodeMap.get(id);
94 130 }
95 131
  132 + public void printProfilerStats() {
  133 + if (profilerEnabled) {
  134 + log.debug("Top Rule Nodes by max execution time:");
  135 + ruleNodeProfilerMap.values().stream()
  136 + .sorted(Comparator.comparingLong(TbRuleNodeProfilerInfo::getMaxExecutionTime).reversed()).limit(5)
  137 + .forEach(info -> log.debug("[{}][{}] max execution time: {}. {}", queueName, info.getRuleNodeId(), info.getMaxExecutionTime(), info.getLabel()));
  138 +
  139 + log.info("Top Rule Nodes by avg execution time:");
  140 + ruleNodeProfilerMap.values().stream()
  141 + .sorted(Comparator.comparingDouble(TbRuleNodeProfilerInfo::getAvgExecutionTime).reversed()).limit(5)
  142 + .forEach(info -> log.info("[{}][{}] avg execution time: {}. {}", queueName, info.getRuleNodeId(), info.getAvgExecutionTime(), info.getLabel()));
  143 +
  144 + log.info("Top Rule Nodes by execution count:");
  145 + ruleNodeProfilerMap.values().stream()
  146 + .sorted(Comparator.comparingInt(TbRuleNodeProfilerInfo::getExecutionCount).reversed()).limit(5)
  147 + .forEach(info -> log.info("[{}][{}] execution count: {}. {}", queueName, info.getRuleNodeId(), info.getExecutionCount(), info.getLabel()));
  148 + }
  149 + }
96 150 }
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.queue;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.thingsboard.server.common.data.id.RuleNodeId;
  20 +import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
  21 +
  22 +import java.util.AbstractMap;
  23 +import java.util.Map;
  24 +import java.util.UUID;
  25 +import java.util.concurrent.atomic.AtomicLong;
  26 +import java.util.concurrent.locks.Lock;
  27 +import java.util.concurrent.locks.ReentrantLock;
  28 +
  29 +@Slf4j
  30 +public class TbMsgProfilerInfo {
  31 + private final UUID msgId;
  32 + private AtomicLong totalProcessingTime = new AtomicLong();
  33 + private Lock stateLock = new ReentrantLock();
  34 + private RuleNodeId currentRuleNodeId;
  35 + private long stateChangeTime;
  36 +
  37 + public TbMsgProfilerInfo(UUID msgId) {
  38 + this.msgId = msgId;
  39 + }
  40 +
  41 + public void onStart(RuleNodeId ruleNodeId) {
  42 + long currentTime = System.currentTimeMillis();
  43 + stateLock.lock();
  44 + try {
  45 + currentRuleNodeId = ruleNodeId;
  46 + stateChangeTime = currentTime;
  47 + } finally {
  48 + stateLock.unlock();
  49 + }
  50 + }
  51 +
  52 + public long onEnd(RuleNodeId ruleNodeId) {
  53 + long currentTime = System.currentTimeMillis();
  54 + stateLock.lock();
  55 + try {
  56 + if (ruleNodeId.equals(currentRuleNodeId)) {
  57 + long processingTime = currentTime - stateChangeTime;
  58 + stateChangeTime = currentTime;
  59 + totalProcessingTime.addAndGet(processingTime);
  60 + currentRuleNodeId = null;
  61 + return processingTime;
  62 + } else {
  63 + log.trace("[{}] Invalid sequence of rule node processing detected. Expected [{}] but was [{}]", msgId, currentRuleNodeId, ruleNodeId);
  64 + return 0;
  65 + }
  66 + } finally {
  67 + stateLock.unlock();
  68 + }
  69 + }
  70 +
  71 + public Map.Entry<UUID, Long> onTimeout() {
  72 + long currentTime = System.currentTimeMillis();
  73 + stateLock.lock();
  74 + try {
  75 + if (currentRuleNodeId != null && stateChangeTime > 0) {
  76 + long timeoutTime = currentTime - stateChangeTime;
  77 + totalProcessingTime.addAndGet(timeoutTime);
  78 + return new AbstractMap.SimpleEntry<>(currentRuleNodeId.getId(), timeoutTime);
  79 + }
  80 + } finally {
  81 + stateLock.unlock();
  82 + }
  83 + return null;
  84 + }
  85 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.queue;
  17 +
  18 +import lombok.Getter;
  19 +import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
  20 +
  21 +import java.util.UUID;
  22 +import java.util.concurrent.atomic.AtomicInteger;
  23 +import java.util.concurrent.atomic.AtomicLong;
  24 +
  25 +public class TbRuleNodeProfilerInfo {
  26 + @Getter
  27 + private final UUID ruleNodeId;
  28 + @Getter
  29 + private final String label;
  30 + private AtomicInteger executionCount = new AtomicInteger(0);
  31 + private AtomicLong executionTime = new AtomicLong(0);
  32 + private AtomicLong maxExecutionTime = new AtomicLong(0);
  33 +
  34 + public TbRuleNodeProfilerInfo(RuleNodeInfo ruleNodeInfo) {
  35 + this.ruleNodeId = ruleNodeInfo.getRuleNodeId().getId();
  36 + this.label = ruleNodeInfo.toString();
  37 + }
  38 +
  39 + public TbRuleNodeProfilerInfo(UUID ruleNodeId) {
  40 + this.ruleNodeId = ruleNodeId;
  41 + this.label = "";
  42 + }
  43 +
  44 + public void record(long processingTime) {
  45 + executionCount.incrementAndGet();
  46 + executionTime.addAndGet(processingTime);
  47 + while (true) {
  48 + long value = maxExecutionTime.get();
  49 + if (value >= processingTime) {
  50 + break;
  51 + }
  52 + if (maxExecutionTime.compareAndSet(value, processingTime)) {
  53 + break;
  54 + }
  55 + }
  56 + }
  57 +
  58 + int getExecutionCount() {
  59 + return executionCount.get();
  60 + }
  61 +
  62 + long getMaxExecutionTime() {
  63 + return maxExecutionTime.get();
  64 + }
  65 +
  66 + double getAvgExecutionTime() {
  67 + double executionCnt = (double) executionCount.get();
  68 + if (executionCnt > 0) {
  69 + return executionTime.get() / executionCnt;
  70 + } else {
  71 + return 0.0;
  72 + }
  73 + }
  74 +
  75 +}
\ No newline at end of file
... ...
... ... @@ -68,18 +68,20 @@ public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS
68 68 int listSize = orderedMsgList.size();
69 69 int startIdx = Math.min(packIdx.get() * batchSize, listSize);
70 70 int endIdx = Math.min(startIdx + batchSize, listSize);
  71 + Map<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> tmpPack;
71 72 synchronized (pendingPack) {
72 73 pendingPack.clear();
73 74 for (int i = startIdx; i < endIdx; i++) {
74 75 IdMsgPair pair = orderedMsgList.get(i);
75 76 pendingPack.put(pair.uuid, pair.msg);
76 77 }
  78 + tmpPack = new LinkedHashMap<>(pendingPack);
77 79 }
78 80 int submitSize = pendingPack.size();
79 81 if (log.isDebugEnabled() && submitSize > 0) {
80 82 log.debug("[{}] submitting [{}] messages to rule engine", queueName, submitSize);
81 83 }
82   - pendingPack.forEach(msgConsumer);
  84 + tmpPack.forEach(msgConsumer);
83 85 }
84 86
85 87 }
... ...
... ... @@ -51,7 +51,7 @@ public class TbMsgPackProcessingContextTest {
51 51 messages.put(UUID.randomUUID(), new TbProtoQueueMsg<>(UUID.randomUUID(), null));
52 52 }
53 53 when(strategyMock.getPendingMap()).thenReturn(messages);
54   - TbMsgPackProcessingContext context = new TbMsgPackProcessingContext(strategyMock);
  54 + TbMsgPackProcessingContext context = new TbMsgPackProcessingContext("Main", strategyMock);
55 55 for (UUID uuid : messages.keySet()) {
56 56 for (int i = 0; i < parallelCount; i++) {
57 57 executorService.submit(() -> context.onSuccess(uuid));
... ...
... ... @@ -15,12 +15,16 @@
15 15 */
16 16 package org.thingsboard.server.common.msg.queue;
17 17
  18 +import lombok.Getter;
18 19 import org.thingsboard.server.common.data.id.RuleNodeId;
19 20
20 21 public class RuleNodeInfo {
21 22 private final String label;
  23 + @Getter
  24 + private final RuleNodeId ruleNodeId;
22 25
23 26 public RuleNodeInfo(RuleNodeId id, String ruleChainName, String ruleNodeName) {
  27 + this.ruleNodeId = id;
24 28 this.label = "[RuleChain: " + ruleChainName + "|RuleNode: " + ruleNodeName + "(" + id + ")]";
25 29 }
26 30
... ...
... ... @@ -15,6 +15,8 @@
15 15 */
16 16 package org.thingsboard.server.common.msg.queue;
17 17
  18 +import org.thingsboard.server.common.data.id.RuleNodeId;
  19 +
18 20 public interface TbMsgCallback {
19 21
20 22 TbMsgCallback EMPTY = new TbMsgCallback() {
... ... @@ -34,7 +36,11 @@ public interface TbMsgCallback {
34 36
35 37 void onFailure(RuleEngineException e);
36 38
37   - default void visit(RuleNodeInfo ruleNodeInfo) {
  39 + default void onProcessingStart(RuleNodeInfo ruleNodeInfo) {
  40 + }
  41 +
  42 + default void onProcessingEnd(RuleNodeId ruleNodeId) {
38 43 }
39 44
  45 +
40 46 }
... ...