Commit 18662201f14934b62d6388c873370dbc63db34cf
1 parent
34c149de
Improved shutdown for BufferedRateExecutor
Showing
1 changed file
with
11 additions
and
6 deletions
1 | /** | 1 | /** |
2 | * Copyright © 2016-2018 The Thingsboard Authors | 2 | * Copyright © 2016-2018 The Thingsboard Authors |
3 | - * | 3 | + * <p> |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); | 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. | 5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at | 6 | * You may obtain a copy of the License at |
7 | - * | ||
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | - * | 7 | + * <p> |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * <p> | ||
10 | * Unless required by applicable law or agreed to in writing, software | 10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, | 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
@@ -26,6 +26,7 @@ import java.util.UUID; | @@ -26,6 +26,7 @@ import java.util.UUID; | ||
26 | import java.util.concurrent.BlockingQueue; | 26 | import java.util.concurrent.BlockingQueue; |
27 | import java.util.concurrent.ExecutorService; | 27 | import java.util.concurrent.ExecutorService; |
28 | import java.util.concurrent.Executors; | 28 | import java.util.concurrent.Executors; |
29 | +import java.util.concurrent.Future; | ||
29 | import java.util.concurrent.LinkedBlockingDeque; | 30 | import java.util.concurrent.LinkedBlockingDeque; |
30 | import java.util.concurrent.ScheduledExecutorService; | 31 | import java.util.concurrent.ScheduledExecutorService; |
31 | import java.util.concurrent.TimeUnit; | 32 | import java.util.concurrent.TimeUnit; |
@@ -45,6 +46,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend | @@ -45,6 +46,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend | ||
45 | private final ExecutorService callbackExecutor; | 46 | private final ExecutorService callbackExecutor; |
46 | private final ScheduledExecutorService timeoutExecutor; | 47 | private final ScheduledExecutorService timeoutExecutor; |
47 | private final int concurrencyLimit; | 48 | private final int concurrencyLimit; |
49 | + private volatile boolean stopped; | ||
48 | 50 | ||
49 | protected final AtomicInteger concurrencyLevel = new AtomicInteger(); | 51 | protected final AtomicInteger concurrencyLevel = new AtomicInteger(); |
50 | protected final AtomicInteger totalAdded = new AtomicInteger(); | 52 | protected final AtomicInteger totalAdded = new AtomicInteger(); |
@@ -106,7 +108,10 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend | @@ -106,7 +108,10 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend | ||
106 | AsyncTaskContext<T, V> taskCtx = null; | 108 | AsyncTaskContext<T, V> taskCtx = null; |
107 | try { | 109 | try { |
108 | if (curLvl <= concurrencyLimit) { | 110 | if (curLvl <= concurrencyLimit) { |
109 | - taskCtx = queue.take(); | 111 | + taskCtx = queue.poll(1, TimeUnit.SECONDS); |
112 | + if (taskCtx == null) { | ||
113 | + continue; | ||
114 | + } | ||
110 | final AsyncTaskContext<T, V> finalTaskCtx = taskCtx; | 115 | final AsyncTaskContext<T, V> finalTaskCtx = taskCtx; |
111 | logTask("Processing", finalTaskCtx); | 116 | logTask("Processing", finalTaskCtx); |
112 | concurrencyLevel.incrementAndGet(); | 117 | concurrencyLevel.incrementAndGet(); |
@@ -167,7 +172,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend | @@ -167,7 +172,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend | ||
167 | } | 172 | } |
168 | } | 173 | } |
169 | 174 | ||
170 | - protected int getQueueSize(){ | 175 | + protected int getQueueSize() { |
171 | return queue.size(); | 176 | return queue.size(); |
172 | } | 177 | } |
173 | } | 178 | } |