Commit eb47eb3c4f41feb42bc95c186404c8e2dae80cbe

Authored by Andrew Shvayka
1 parent 8fd37663

Fixed concurrency issues on high load

@@ -29,6 +29,7 @@ import org.thingsboard.server.kafka.*; @@ -29,6 +29,7 @@ import org.thingsboard.server.kafka.*;
29 import javax.annotation.PostConstruct; 29 import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy; 30 import javax.annotation.PreDestroy;
31 import java.util.concurrent.ExecutorService; 31 import java.util.concurrent.ExecutorService;
  32 +import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.SynchronousQueue; 33 import java.util.concurrent.SynchronousQueue;
33 import java.util.concurrent.ThreadPoolExecutor; 34 import java.util.concurrent.ThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit; 35 import java.util.concurrent.TimeUnit;
@@ -67,7 +68,7 @@ public class RemoteTransportApiService { @@ -67,7 +68,7 @@ public class RemoteTransportApiService {
67 68
68 @PostConstruct 69 @PostConstruct
69 public void init() { 70 public void init() {
70 - this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); 71 + this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
71 72
72 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder(); 73 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder();
73 responseBuilder.settings(kafkaSettings); 74 responseBuilder.settings(kafkaSettings);
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ConcurrentMap; 32 import java.util.concurrent.ConcurrentMap;
33 import java.util.concurrent.ExecutorService; 33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors; 34 import java.util.concurrent.Executors;
  35 +import java.util.concurrent.LinkedBlockingQueue;
35 import java.util.concurrent.ScheduledExecutorService; 36 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.SynchronousQueue; 37 import java.util.concurrent.SynchronousQueue;
37 import java.util.concurrent.ThreadPoolExecutor; 38 import java.util.concurrent.ThreadPoolExecutor;
@@ -277,7 +278,7 @@ public abstract class AbstractTransportService implements TransportService { @@ -277,7 +278,7 @@ public abstract class AbstractTransportService implements TransportService {
277 new TbRateLimits(perDevicesLimitsConf); 278 new TbRateLimits(perDevicesLimitsConf);
278 } 279 }
279 this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(); 280 this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor();
280 - this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); 281 + this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
281 this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, sessionReportTimeout, sessionReportTimeout, TimeUnit.MILLISECONDS); 282 this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, sessionReportTimeout, sessionReportTimeout, TimeUnit.MILLISECONDS);
282 } 283 }
283 284
@@ -31,7 +31,9 @@ import java.util.concurrent.ConcurrentMap; @@ -31,7 +31,9 @@ import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.ExecutorService; 31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors; 32 import java.util.concurrent.Executors;
33 import java.util.concurrent.LinkedBlockingDeque; 33 import java.util.concurrent.LinkedBlockingDeque;
  34 +import java.util.concurrent.LinkedBlockingQueue;
34 import java.util.concurrent.ScheduledExecutorService; 35 import java.util.concurrent.ScheduledExecutorService;
  36 +import java.util.concurrent.ThreadPoolExecutor;
35 import java.util.concurrent.TimeUnit; 37 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.TimeoutException; 38 import java.util.concurrent.TimeoutException;
37 import java.util.concurrent.atomic.AtomicInteger; 39 import java.util.concurrent.atomic.AtomicInteger;
@@ -70,7 +72,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend @@ -70,7 +72,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
70 this.concurrencyLimit = concurrencyLimit; 72 this.concurrencyLimit = concurrencyLimit;
71 this.queue = new LinkedBlockingDeque<>(queueLimit); 73 this.queue = new LinkedBlockingDeque<>(queueLimit);
72 this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads); 74 this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads);
73 - this.callbackExecutor = Executors.newFixedThreadPool(callbackThreads); 75 + this.callbackExecutor = new ThreadPoolExecutor(callbackThreads, 50, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
74 this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); 76 this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
75 this.perTenantLimitsEnabled = perTenantLimitsEnabled; 77 this.perTenantLimitsEnabled = perTenantLimitsEnabled;
76 this.perTenantLimitsConfiguration = perTenantLimitsConfiguration; 78 this.perTenantLimitsConfiguration = perTenantLimitsConfiguration;