...
|
...
|
@@ -48,6 +48,7 @@ public class TbKafkaRequestTemplate<Request, Response> { |
48
|
48
|
private final TBKafkaProducerTemplate<Request> requestTemplate;
|
49
|
49
|
private final TBKafkaConsumerTemplate<Response> responseTemplate;
|
50
|
50
|
private final ConcurrentMap<UUID, ResponseMetaData<Response>> pendingRequests;
|
|
51
|
+ private final boolean internalExecutor;
|
51
|
52
|
private final ExecutorService executor;
|
52
|
53
|
private final long maxRequestTimeout;
|
53
|
54
|
private final long maxPendingRequests;
|
...
|
...
|
@@ -69,8 +70,10 @@ public class TbKafkaRequestTemplate<Request, Response> { |
69
|
70
|
this.maxPendingRequests = maxPendingRequests;
|
70
|
71
|
this.pollInterval = pollInterval;
|
71
|
72
|
if (executor != null) {
|
|
73
|
+ internalExecutor = false;
|
72
|
74
|
this.executor = executor;
|
73
|
75
|
} else {
|
|
76
|
+ internalExecutor = true;
|
74
|
77
|
this.executor = Executors.newSingleThreadExecutor();
|
75
|
78
|
}
|
76
|
79
|
}
|
...
|
...
|
@@ -126,6 +129,9 @@ public class TbKafkaRequestTemplate<Request, Response> { |
126
|
129
|
|
127
|
130
|
public void stop() {
|
128
|
131
|
stopped = true;
|
|
132
|
+ if (internalExecutor) {
|
|
133
|
+ executor.shutdownNow();
|
|
134
|
+ }
|
129
|
135
|
}
|
130
|
136
|
|
131
|
137
|
public ListenableFuture<Response> post(String key, Request request) {
|
...
|
...
|
|