|
@@ -23,16 +23,29 @@ import java.util.Collections; |
|
@@ -23,16 +23,29 @@ import java.util.Collections; |
23
|
import java.util.List;
|
23
|
import java.util.List;
|
24
|
import java.util.concurrent.BlockingQueue;
|
24
|
import java.util.concurrent.BlockingQueue;
|
25
|
import java.util.concurrent.ConcurrentHashMap;
|
25
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
26
|
+import java.util.concurrent.Executors;
|
26
|
import java.util.concurrent.LinkedBlockingQueue;
|
27
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
28
|
+import java.util.concurrent.ScheduledExecutorService;
|
27
|
import java.util.concurrent.TimeUnit;
|
29
|
import java.util.concurrent.TimeUnit;
|
28
|
|
30
|
|
29
|
@Slf4j
|
31
|
@Slf4j
|
30
|
public final class InMemoryStorage {
|
32
|
public final class InMemoryStorage {
|
31
|
private static InMemoryStorage instance;
|
33
|
private static InMemoryStorage instance;
|
32
|
private final ConcurrentHashMap<String, BlockingQueue<TbQueueMsg>> storage;
|
34
|
private final ConcurrentHashMap<String, BlockingQueue<TbQueueMsg>> storage;
|
|
|
35
|
+ private static ScheduledExecutorService statExecutor;
|
33
|
|
36
|
|
34
|
private InMemoryStorage() {
|
37
|
private InMemoryStorage() {
|
35
|
storage = new ConcurrentHashMap<>();
|
38
|
storage = new ConcurrentHashMap<>();
|
|
|
39
|
+ statExecutor = Executors.newSingleThreadScheduledExecutor();
|
|
|
40
|
+ statExecutor.scheduleAtFixedRate(this::printStats, 30, 30, TimeUnit.SECONDS);
|
|
|
41
|
+ }
|
|
|
42
|
+
|
|
|
43
|
+ private void printStats() {
|
|
|
44
|
+ storage.forEach((topic, queue) -> {
|
|
|
45
|
+ if (queue.size() > 0) {
|
|
|
46
|
+ log.debug("Topic: [{}], Queue size: [{}]", topic, queue.size());
|
|
|
47
|
+ }
|
|
|
48
|
+ });
|
36
|
}
|
49
|
}
|
37
|
|
50
|
|
38
|
public static InMemoryStorage getInstance() {
|
51
|
public static InMemoryStorage getInstance() {
|
|
@@ -77,4 +90,9 @@ public final class InMemoryStorage { |
|
@@ -77,4 +90,9 @@ public final class InMemoryStorage { |
77
|
storage.clear();
|
90
|
storage.clear();
|
78
|
}
|
91
|
}
|
79
|
|
92
|
|
|
|
93
|
+ public void destroy() {
|
|
|
94
|
+ if (statExecutor != null) {
|
|
|
95
|
+ statExecutor.shutdownNow();
|
|
|
96
|
+ }
|
|
|
97
|
+ }
|
80
|
} |
98
|
} |