Commit 0723988061a74ddf06db72db7903bfaecfa51ab4

Authored by Andrew Shvayka
1 parent 8523ebda

Fixed Shutdown behaviour

@@ -90,7 +90,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService { @@ -90,7 +90,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
90 90
91 @Override 91 @Override
92 public void processRestAPIRpcRequestToRuleEngine(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) { 92 public void processRestAPIRpcRequestToRuleEngine(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
93 - log.trace("[{}] Processing local rpc call to rule engine [{}]", request.getTenantId(), request.getDeviceId()); 93 + log.trace("[{}][{}] Processing local rpc call to rule engine [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
94 UUID requestId = request.getId(); 94 UUID requestId = request.getId();
95 localToRuleEngineRpcRequests.put(requestId, responseConsumer); 95 localToRuleEngineRpcRequests.put(requestId, responseConsumer);
96 sendRpcRequestToRuleEngine(request); 96 sendRpcRequestToRuleEngine(request);
@@ -110,7 +110,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService { @@ -110,7 +110,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
110 110
111 @Override 111 @Override
112 public void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) { 112 public void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
113 - log.trace("[{}] Processing local rpc call to device [{}]", request.getTenantId(), request.getDeviceId()); 113 + log.trace("[{}][{}] Processing local rpc call to device [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
114 UUID requestId = request.getId(); 114 UUID requestId = request.getId();
115 localToDeviceRpcRequests.put(requestId, responseConsumer); 115 localToDeviceRpcRequests.put(requestId, responseConsumer);
116 sendRpcRequestToDevice(request); 116 sendRpcRequestToDevice(request);
@@ -119,7 +119,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService { @@ -119,7 +119,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
119 119
120 @Override 120 @Override
121 public void processRpcResponseFromDevice(FromDeviceRpcResponse response) { 121 public void processRpcResponseFromDevice(FromDeviceRpcResponse response) {
122 - log.trace("[{}] response to request: [{}]", this.hashCode(), response.getId()); 122 + log.trace("response to request: [{}]", response.getId());
123 if (routingService.getCurrentServer().equals(response.getServerAddress())) { 123 if (routingService.getCurrentServer().equals(response.getServerAddress())) {
124 UUID requestId = response.getId(); 124 UUID requestId = response.getId();
125 Consumer<FromDeviceRpcResponse> consumer = localToDeviceRpcRequests.remove(requestId); 125 Consumer<FromDeviceRpcResponse> consumer = localToDeviceRpcRequests.remove(requestId);
@@ -26,7 +26,6 @@ import java.util.UUID; @@ -26,7 +26,6 @@ import java.util.UUID;
26 import java.util.concurrent.BlockingQueue; 26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ExecutorService; 27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors; 28 import java.util.concurrent.Executors;
29 -import java.util.concurrent.Future;  
30 import java.util.concurrent.LinkedBlockingDeque; 29 import java.util.concurrent.LinkedBlockingDeque;
31 import java.util.concurrent.ScheduledExecutorService; 30 import java.util.concurrent.ScheduledExecutorService;
32 import java.util.concurrent.TimeUnit; 31 import java.util.concurrent.TimeUnit;
@@ -46,7 +45,6 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend @@ -46,7 +45,6 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
46 private final ExecutorService callbackExecutor; 45 private final ExecutorService callbackExecutor;
47 private final ScheduledExecutorService timeoutExecutor; 46 private final ScheduledExecutorService timeoutExecutor;
48 private final int concurrencyLimit; 47 private final int concurrencyLimit;
49 - private volatile boolean stopped;  
50 48
51 protected final AtomicInteger concurrencyLevel = new AtomicInteger(); 49 protected final AtomicInteger concurrencyLevel = new AtomicInteger();
52 protected final AtomicInteger totalAdded = new AtomicInteger(); 50 protected final AtomicInteger totalAdded = new AtomicInteger();
@@ -108,10 +106,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend @@ -108,10 +106,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
108 AsyncTaskContext<T, V> taskCtx = null; 106 AsyncTaskContext<T, V> taskCtx = null;
109 try { 107 try {
110 if (curLvl <= concurrencyLimit) { 108 if (curLvl <= concurrencyLimit) {
111 - taskCtx = queue.poll(1, TimeUnit.SECONDS);  
112 - if (taskCtx == null) {  
113 - continue;  
114 - } 109 + taskCtx = queue.take();
115 final AsyncTaskContext<T, V> finalTaskCtx = taskCtx; 110 final AsyncTaskContext<T, V> finalTaskCtx = taskCtx;
116 logTask("Processing", finalTaskCtx); 111 logTask("Processing", finalTaskCtx);
117 concurrencyLevel.incrementAndGet(); 112 concurrencyLevel.incrementAndGet();
@@ -151,6 +146,8 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend @@ -151,6 +146,8 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
151 } else { 146 } else {
152 Thread.sleep(pollMs); 147 Thread.sleep(pollMs);
153 } 148 }
  149 + } catch (InterruptedException e) {
  150 + break;
154 } catch (Throwable e) { 151 } catch (Throwable e) {
155 if (taskCtx != null) { 152 if (taskCtx != null) {
156 log.debug("[{}] Failed to execute task: {}", taskCtx.getId(), taskCtx, e); 153 log.debug("[{}] Failed to execute task: {}", taskCtx.getId(), taskCtx, e);