Commit 05e2981b364546693245bccea37b0b37194d7822
Merge branch 'develop/2.2' of github.com:thingsboard/thingsboard into develop/2.2
Showing
2 changed files
with
7 additions
and
0 deletions
... | ... | @@ -96,6 +96,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { |
96 | 96 | builder.maxRequestTimeout(maxRequestsTimeout); |
97 | 97 | builder.pollInterval(responsePollDuration); |
98 | 98 | kafkaTemplate = builder.build(); |
99 | + kafkaTemplate.init(); | |
99 | 100 | } |
100 | 101 | |
101 | 102 | @PreDestroy | ... | ... |
... | ... | @@ -48,6 +48,7 @@ public class TbKafkaRequestTemplate<Request, Response> { |
48 | 48 | private final TBKafkaProducerTemplate<Request> requestTemplate; |
49 | 49 | private final TBKafkaConsumerTemplate<Response> responseTemplate; |
50 | 50 | private final ConcurrentMap<UUID, ResponseMetaData<Response>> pendingRequests; |
51 | + private final boolean internalExecutor; | |
51 | 52 | private final ExecutorService executor; |
52 | 53 | private final long maxRequestTimeout; |
53 | 54 | private final long maxPendingRequests; |
... | ... | @@ -69,8 +70,10 @@ public class TbKafkaRequestTemplate<Request, Response> { |
69 | 70 | this.maxPendingRequests = maxPendingRequests; |
70 | 71 | this.pollInterval = pollInterval; |
71 | 72 | if (executor != null) { |
73 | + internalExecutor = false; | |
72 | 74 | this.executor = executor; |
73 | 75 | } else { |
76 | + internalExecutor = true; | |
74 | 77 | this.executor = Executors.newSingleThreadExecutor(); |
75 | 78 | } |
76 | 79 | } |
... | ... | @@ -126,6 +129,9 @@ public class TbKafkaRequestTemplate<Request, Response> { |
126 | 129 | |
127 | 130 | public void stop() { |
128 | 131 | stopped = true; |
132 | + if (internalExecutor) { | |
133 | + executor.shutdownNow(); | |
134 | + } | |
129 | 135 | } |
130 | 136 | |
131 | 137 | public ListenableFuture<Response> post(String key, Request request) { | ... | ... |