Commit 6a706b32da934eac55d288aacba1e3a231f458b2

Authored by Sergey Matvienko
1 parent 3f97bb68

executors: shutdownNow added for tb-rule-engine-consumer-repartition executor on @PreDestroy

@@ -103,8 +103,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< @@ -103,8 +103,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
103 private final ConcurrentMap<String, TbRuleEngineQueueConfiguration> consumerConfigurations = new ConcurrentHashMap<>(); 103 private final ConcurrentMap<String, TbRuleEngineQueueConfiguration> consumerConfigurations = new ConcurrentHashMap<>();
104 private final ConcurrentMap<String, TbRuleEngineConsumerStats> consumerStats = new ConcurrentHashMap<>(); 104 private final ConcurrentMap<String, TbRuleEngineConsumerStats> consumerStats = new ConcurrentHashMap<>();
105 private final ConcurrentMap<String, TbTopicWithConsumerPerPartition> topicsConsumerPerPartition = new ConcurrentHashMap<>(); 105 private final ConcurrentMap<String, TbTopicWithConsumerPerPartition> topicsConsumerPerPartition = new ConcurrentHashMap<>();
106 - final ExecutorService submitExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-service-submit-executor"));  
107 - final ScheduledExecutorService repartitionExecutor = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-repartition-executor")); 106 + final ExecutorService submitExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-submit"));
  107 + final ScheduledExecutorService repartitionExecutor = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-repartition"));
108 108
109 public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory processingStrategyFactory, 109 public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory processingStrategyFactory,
110 TbRuleEngineSubmitStrategyFactory submitStrategyFactory, 110 TbRuleEngineSubmitStrategyFactory submitStrategyFactory,
@@ -146,6 +146,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< @@ -146,6 +146,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
146 public void stop() { 146 public void stop() {
147 super.destroy(); 147 super.destroy();
148 submitExecutor.shutdownNow(); 148 submitExecutor.shutdownNow();
  149 + repartitionExecutor.shutdownNow();
149 ruleEngineSettings.getQueues().forEach(config -> consumerConfigurations.put(config.getName(), config)); 150 ruleEngineSettings.getQueues().forEach(config -> consumerConfigurations.put(config.getName(), config));
150 } 151 }
151 152