Commit 8ebaa22ae2b69a250bb874d130c23dc42066b6ef

Authored by Igor Kulikov
Committed by GitHub
2 parents e33709d8 17ee07e0

Merge pull request #3381 from YevhenBondarenko/queue/improvements

added logs for in memory queue
... ... @@ -23,16 +23,29 @@ import java.util.Collections;
23 23 import java.util.List;
24 24 import java.util.concurrent.BlockingQueue;
25 25 import java.util.concurrent.ConcurrentHashMap;
  26 +import java.util.concurrent.Executors;
26 27 import java.util.concurrent.LinkedBlockingQueue;
  28 +import java.util.concurrent.ScheduledExecutorService;
27 29 import java.util.concurrent.TimeUnit;
28 30
29 31 @Slf4j
30 32 public final class InMemoryStorage {
31 33 private static InMemoryStorage instance;
32 34 private final ConcurrentHashMap<String, BlockingQueue<TbQueueMsg>> storage;
  35 + private static ScheduledExecutorService statExecutor;
33 36
34 37 private InMemoryStorage() {
35 38 storage = new ConcurrentHashMap<>();
  39 + statExecutor = Executors.newSingleThreadScheduledExecutor();
  40 + statExecutor.scheduleAtFixedRate(this::printStats, 60, 60, TimeUnit.SECONDS);
  41 + }
  42 +
  43 + private void printStats() {
  44 + storage.forEach((topic, queue) -> {
  45 + if (queue.size() > 0) {
  46 + log.debug("Topic: [{}], Queue size: [{}]", topic, queue.size());
  47 + }
  48 + });
36 49 }
37 50
38 51 public static InMemoryStorage getInstance() {
... ... @@ -77,4 +90,9 @@ public final class InMemoryStorage {
77 90 storage.clear();
78 91 }
79 92
  93 + public void destroy() {
  94 + if (statExecutor != null) {
  95 + statExecutor.shutdownNow();
  96 + }
  97 + }
80 98 }
... ...
... ... @@ -53,6 +53,6 @@ public class InMemoryTbQueueProducer<T extends TbQueueMsg> implements TbQueuePro
53 53
54 54 @Override
55 55 public void stop() {
56   -
  56 + storage.destroy();
57 57 }
58 58 }
... ...