Commit 03fbc4665439e2212ffeae769bf76dc65097e16e

Authored by Igor Kulikov
Committed by GitHub
2 parents 8ec23f22 e5ad5e0b

Merge pull request #3383 from YevhenBondarenko/queue/improvements

in memory queue logs
@@ -29,6 +29,8 @@ @@ -29,6 +29,8 @@
29 29
30 <!-- <logger name="org.thingsboard.server.service.queue" level="TRACE" />--> 30 <!-- <logger name="org.thingsboard.server.service.queue" level="TRACE" />-->
31 <!-- <logger name="org.thingsboard.server.service.transport" level="TRACE" />--> 31 <!-- <logger name="org.thingsboard.server.service.transport" level="TRACE" />-->
  32 +<!-- <logger name="org.thingsboard.server.queue.memory.InMemoryStorage" level="DEBUG" />-->
  33 +
32 34
33 <logger name="com.microsoft.azure.servicebus.primitives.CoreMessageReceiver" level="OFF" /> 35 <logger name="com.microsoft.azure.servicebus.primitives.CoreMessageReceiver" level="OFF" />
34 36
@@ -594,6 +594,10 @@ swagger: @@ -594,6 +594,10 @@ swagger:
594 594
595 queue: 595 queue:
596 type: "${TB_QUEUE_TYPE:in-memory}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) 596 type: "${TB_QUEUE_TYPE:in-memory}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
  597 + in_memory:
  598 + stats:
  599 + # For debug lvl
  600 + print-interval-ms: "${TB_QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
597 kafka: 601 kafka:
598 bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" 602 bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
599 acks: "${TB_KAFKA_ACKS:all}" 603 acks: "${TB_KAFKA_ACKS:all}"
@@ -23,27 +23,21 @@ import java.util.Collections; @@ -23,27 +23,21 @@ import java.util.Collections;
23 import java.util.List; 23 import java.util.List;
24 import java.util.concurrent.BlockingQueue; 24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap; 25 import java.util.concurrent.ConcurrentHashMap;
26 -import java.util.concurrent.Executors;  
27 import java.util.concurrent.LinkedBlockingQueue; 26 import java.util.concurrent.LinkedBlockingQueue;
28 -import java.util.concurrent.ScheduledExecutorService;  
29 -import java.util.concurrent.TimeUnit;  
30 27
31 @Slf4j 28 @Slf4j
32 public final class InMemoryStorage { 29 public final class InMemoryStorage {
33 private static InMemoryStorage instance; 30 private static InMemoryStorage instance;
34 private final ConcurrentHashMap<String, BlockingQueue<TbQueueMsg>> storage; 31 private final ConcurrentHashMap<String, BlockingQueue<TbQueueMsg>> storage;
35 - private static ScheduledExecutorService statExecutor;  
36 32
37 private InMemoryStorage() { 33 private InMemoryStorage() {
38 storage = new ConcurrentHashMap<>(); 34 storage = new ConcurrentHashMap<>();
39 - statExecutor = Executors.newSingleThreadScheduledExecutor();  
40 - statExecutor.scheduleAtFixedRate(this::printStats, 60, 60, TimeUnit.SECONDS);  
41 } 35 }
42 36
43 - private void printStats() { 37 + public void printStats() {
44 storage.forEach((topic, queue) -> { 38 storage.forEach((topic, queue) -> {
45 if (queue.size() > 0) { 39 if (queue.size() > 0) {
46 - log.debug("Topic: [{}], Queue size: [{}]", topic, queue.size()); 40 + log.debug("[{}] Queue Size [{}]", topic, queue.size());
47 } 41 }
48 }); 42 });
49 } 43 }
@@ -90,9 +84,4 @@ public final class InMemoryStorage { @@ -90,9 +84,4 @@ public final class InMemoryStorage {
90 storage.clear(); 84 storage.clear();
91 } 85 }
92 86
93 - public void destroy() {  
94 - if (statExecutor != null) {  
95 - statExecutor.shutdownNow();  
96 - }  
97 - }  
98 } 87 }
@@ -53,6 +53,6 @@ public class InMemoryTbQueueProducer<T extends TbQueueMsg> implements TbQueuePro @@ -53,6 +53,6 @@ public class InMemoryTbQueueProducer<T extends TbQueueMsg> implements TbQueuePro
53 53
54 @Override 54 @Override
55 public void stop() { 55 public void stop() {
56 - storage.destroy(); 56 +
57 } 57 }
58 } 58 }
@@ -17,6 +17,7 @@ package org.thingsboard.server.queue.provider; @@ -17,6 +17,7 @@ package org.thingsboard.server.queue.provider;
17 17
18 import lombok.extern.slf4j.Slf4j; 18 import lombok.extern.slf4j.Slf4j;
19 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; 19 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  20 +import org.springframework.scheduling.annotation.Scheduled;
20 import org.springframework.stereotype.Component; 21 import org.springframework.stereotype.Component;
21 import org.thingsboard.server.common.msg.queue.ServiceType; 22 import org.thingsboard.server.common.msg.queue.ServiceType;
22 import org.thingsboard.server.gen.js.JsInvokeProtos; 23 import org.thingsboard.server.gen.js.JsInvokeProtos;
@@ -28,6 +29,7 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; @@ -28,6 +29,7 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
28 import org.thingsboard.server.queue.common.TbProtoQueueMsg; 29 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
29 import org.thingsboard.server.queue.discovery.PartitionService; 30 import org.thingsboard.server.queue.discovery.PartitionService;
30 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; 31 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
  32 +import org.thingsboard.server.queue.memory.InMemoryStorage;
31 import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer; 33 import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
32 import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; 34 import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
33 import org.thingsboard.server.queue.settings.TbQueueCoreSettings; 35 import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
@@ -47,6 +49,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE @@ -47,6 +49,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
47 private final TbQueueRuleEngineSettings ruleEngineSettings; 49 private final TbQueueRuleEngineSettings ruleEngineSettings;
48 private final TbQueueTransportApiSettings transportApiSettings; 50 private final TbQueueTransportApiSettings transportApiSettings;
49 private final TbQueueTransportNotificationSettings transportNotificationSettings; 51 private final TbQueueTransportNotificationSettings transportNotificationSettings;
  52 + private final InMemoryStorage storage;
50 53
51 public InMemoryMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, 54 public InMemoryMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings,
52 TbQueueRuleEngineSettings ruleEngineSettings, 55 TbQueueRuleEngineSettings ruleEngineSettings,
@@ -59,6 +62,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE @@ -59,6 +62,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
59 this.ruleEngineSettings = ruleEngineSettings; 62 this.ruleEngineSettings = ruleEngineSettings;
60 this.transportApiSettings = transportApiSettings; 63 this.transportApiSettings = transportApiSettings;
61 this.transportNotificationSettings = transportNotificationSettings; 64 this.transportNotificationSettings = transportNotificationSettings;
  65 + this.storage = InMemoryStorage.getInstance();
62 } 66 }
63 67
64 @Override 68 @Override
@@ -120,4 +124,9 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE @@ -120,4 +124,9 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
120 public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { 124 public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
121 return null; 125 return null;
122 } 126 }
  127 +
  128 + @Scheduled(fixedRateString = "${queue.in_memory.stats.print-interval-ms:60000}")
  129 + private void printInMemoryStats() {
  130 + storage.printStats();
  131 + }
123 } 132 }