Showing 7 changed files with 37 additions and 14 deletions
... | ... | @@ -292,28 +292,38 @@ public class DefaultDeviceStateService implements DeviceStateService { |
292 | 292 | } |
293 | 293 | } |
294 | 294 | |
295 | + volatile Set<TopicPartitionInfo> pendingPartitions; | |
296 | + | |
295 | 297 | @Override |
296 | 298 | public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) { |
297 | 299 | if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) { |
298 | 300 | synchronized (this) { |
301 | + pendingPartitions = partitionChangeEvent.getPartitions(); | |
299 | 302 | if (!clusterUpdatePending) { |
300 | 303 | clusterUpdatePending = true; |
301 | 304 | queueExecutor.submit(() -> { |
302 | 305 | clusterUpdatePending = false; |
303 | - initStateFromDB(partitionChangeEvent.getPartitions()); | |
306 | + initStateFromDB(); | |
304 | 307 | }); |
305 | 308 | } |
306 | 309 | } |
307 | 310 | } |
308 | 311 | } |
309 | 312 | |
310 | - private void initStateFromDB(Set<TopicPartitionInfo> partitions) { | |
313 | + private void initStateFromDB() { | |
311 | 314 | try { |
312 | - Set<TopicPartitionInfo> addedPartitions = new HashSet<>(partitions); | |
315 | + log.info("CURRENT PARTITIONS: {}", partitionedDevices.keySet()); | |
316 | + log.info("NEW PARTITIONS: {}", pendingPartitions); | |
317 | + | |
318 | + Set<TopicPartitionInfo> addedPartitions = new HashSet<>(pendingPartitions); | |
313 | 319 | addedPartitions.removeAll(partitionedDevices.keySet()); |
314 | 320 | |
321 | + log.info("ADDED PARTITIONS: {}", addedPartitions); | |
322 | + | |
315 | 323 | Set<TopicPartitionInfo> removedPartitions = new HashSet<>(partitionedDevices.keySet()); |
316 | - removedPartitions.removeAll(partitions); | |
324 | + removedPartitions.removeAll(pendingPartitions); | |
325 | + | |
326 | + log.info("REMOVED PARTITIONS: {}", removedPartitions); | |
317 | 327 | |
318 | 328 | // We no longer manage the current partitions of these devices; |
319 | 329 | removedPartitions.forEach(partition -> { | ... | ... |
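
The `pendingPartitions` field coalesces bursts of partition-change events: every event overwrites the volatile snapshot, but only one recompute task is queued while `clusterUpdatePending` is set, so the task that eventually runs reads the newest partition set. A minimal sketch of this coalescing pattern, with illustrative names rather than the actual ThingsBoard classes:

```java
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Minimal sketch of the "latest value wins" coalescing used above.
// PartitionCoalescer and recompute() are illustrative names, not ThingsBoard API.
class PartitionCoalescer<T> {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile Set<T> pending;          // always holds the newest snapshot
    private volatile boolean updatePending;   // true while a recompute task is queued

    void onPartitionsChanged(Set<T> partitions) {
        synchronized (this) {
            pending = partitions;             // overwrite any older, unprocessed snapshot
            if (!updatePending) {
                updatePending = true;
                executor.submit(() -> {
                    updatePending = false;
                    recompute(pending);       // reads the latest snapshot at run time
                });
            }
        }
    }

    void recompute(Set<T> partitions) {
        // diff against current state, subscribe/unsubscribe, reload from DB, etc.
    }
}
```

The trade-off is that intermediate snapshots are skipped, which is exactly the intent here: only the latest assignment matters.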
... | ... | @@ -668,7 +668,7 @@ queue: |
668 | 668 | topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" |
669 | 669 | poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" |
670 | 670 | partitions: "${TB_QUEUE_CORE_PARTITIONS:10}" |
671 | - pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:60000}" | |
671 | + pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:2000}" | |
672 | 672 | stats: |
673 | 673 | enabled: "${TB_QUEUE_CORE_STATS_ENABLED:true}" |
674 | 674 | print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:60000}" |
... | ... | @@ -690,7 +690,7 @@ queue: |
690 | 690 | rule-engine: |
691 | 691 | topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" |
692 | 692 | poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" |
693 | - pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}" | |
693 | + pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:2000}" | |
694 | 694 | stats: |
695 | 695 | enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:true}" |
696 | 696 | print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:60000}" |
... | ... | @@ -699,7 +699,7 @@ queue: |
699 | 699 | topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb_rule_engine.main}" |
700 | 700 | poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}" |
701 | 701 | partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}" |
702 | - pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:5000}" | |
702 | + pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:2000}" | |
703 | 703 | submit-strategy: |
704 | 704 | type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL |
705 | 705 | # For BATCH only |
... | ... | @@ -714,7 +714,7 @@ queue: |
714 | 714 | topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}" |
715 | 715 | poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}" |
716 | 716 | partitions: "${TB_QUEUE_RE_HP_PARTITIONS:10}" |
717 | - pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:60000}" | |
717 | + pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:2000}" | |
718 | 718 | submit-strategy: |
719 | 719 | type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL |
720 | 720 | # For BATCH only |
... | ... | @@ -729,7 +729,7 @@ queue: |
729 | 729 | topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}" |
730 | 730 | poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}" |
731 | 731 | partitions: "${TB_QUEUE_RE_SQ_PARTITIONS:10}" |
732 | - pack-processing-timeout: "${TB_QUEUE_RE_SQ_PACK_PROCESSING_TIMEOUT_MS:60000}" | |
732 | + pack-processing-timeout: "${TB_QUEUE_RE_SQ_PACK_PROCESSING_TIMEOUT_MS:2000}" | |
733 | 733 | submit-strategy: |
734 | 734 | type: "${TB_QUEUE_RE_SQ_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_BY_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL |
735 | 735 | # For BATCH only | ... | ... |
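
All pack-processing timeouts drop from 60000 ms (5000 ms for the main rule-engine queue) to 2000 ms, so a consumer that cannot finish a pack, for example after its partitions are revoked, stalls for at most two seconds instead of a minute. A hedged sketch of how such a timeout is typically enforced, not the actual ThingsBoard consumer loop:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch (not the actual ThingsBoard consumer loop): wait up to the
// configured pack-processing timeout for every message in the pack to be
// acknowledged, then move on and deal with whatever did not complete in time.
class PackProcessor {
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    boolean processPack(List<Runnable> pack, long packProcessingTimeoutMs) throws InterruptedException {
        CountDownLatch acks = new CountDownLatch(pack.size());
        for (Runnable msg : pack) {
            workers.submit(() -> {
                try {
                    msg.run();            // handle one message
                } finally {
                    acks.countDown();     // acknowledge, success or failure
                }
            });
        }
        // true if the whole pack was acknowledged within the timeout
        return acks.await(packProcessingTimeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        PackProcessor p = new PackProcessor();
        boolean ok = p.processPack(List.of(() -> {}, () -> {}), 2000);
        System.out.println("pack fully acknowledged: " + ok);
        p.workers.shutdown();
    }
}
```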
... | ... | @@ -101,7 +101,7 @@ public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> extends Abstract |
101 | 101 | @Override |
102 | 102 | protected void doSubscribe(List<String> topicNames) { |
103 | 103 | createReceivers(); |
104 | - messagesPerQueue = receivers.size() / partitions.size(); | |
104 | + messagesPerQueue = receivers.size() / Math.max(partitions.size(), 1); | |
105 | 105 | } |
106 | 106 | |
107 | 107 | @Override | ... | ... |
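
`Math.max(partitions.size(), 1)` guards the per-queue message budget against an empty partition assignment, which becomes possible now that a node can be left with zero partitions after a rebalance; the same guard is applied to the Pub/Sub template further below. A tiny illustration of the failure it prevents:

```java
public class DivideGuardExample {
    public static void main(String[] args) {
        int receivers = 10;
        int partitions = 0;                       // possible after a rebalance leaves this node with nothing
        // int perQueue = receivers / partitions; // would throw ArithmeticException: / by zero
        int perQueue = receivers / Math.max(partitions, 1);
        System.out.println(perQueue);             // prints 10, the subscribe path stays alive
    }
}
```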
... | ... | @@ -36,6 +36,7 @@ import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
36 | 36 | import javax.annotation.PostConstruct; |
37 | 37 | import java.nio.charset.StandardCharsets; |
38 | 38 | import java.util.ArrayList; |
39 | +import java.util.Collections; | |
39 | 40 | import java.util.Comparator; |
40 | 41 | import java.util.HashMap; |
41 | 42 | import java.util.HashSet; |
... | ... | @@ -148,6 +149,14 @@ public class HashPartitionService implements PartitionService { |
148 | 149 | } |
149 | 150 | } |
150 | 151 | }); |
152 | + | |
153 | + oldPartitions.forEach((serviceQueueKey, partitions) -> { | |
154 | + if (!myPartitions.containsKey(serviceQueueKey)) { | |
155 | + log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", serviceQueueKey); | |
156 | + applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceQueueKey, Collections.emptySet())); | |
157 | + } | |
158 | + }); | |
159 | + | |
151 | 160 | myPartitions.forEach((serviceQueueKey, partitions) -> { |
152 | 161 | if (!partitions.equals(oldPartitions.get(serviceQueueKey))) { |
153 | 162 | log.info("[{}] NEW PARTITIONS: {}", serviceQueueKey, partitions); | ... | ... |
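
The new loop publishes a `PartitionChangeEvent` with an empty partition set for every `ServiceQueueKey` that appeared in the old assignment but is missing from the new one, so listeners can unsubscribe and release per-partition state instead of silently keeping stale partitions. A generic sketch of this "notify on emptied keys" step, with placeholder types rather than the actual ThingsBoard classes:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Sketch of the diff-and-publish step added above. Key and partition types and
// the publish callback are placeholders, not ServiceQueueKey/TopicPartitionInfo.
final class PartitionDiff {
    static <K, P> void publishChanges(Map<K, Set<P>> oldPartitions,
                                      Map<K, Set<P>> newPartitions,
                                      BiConsumer<K, Set<P>> publish) {
        // Keys that lost every partition still get an event, with an empty set,
        // so consumers can unsubscribe and drop per-partition state.
        oldPartitions.forEach((key, partitions) -> {
            if (!newPartitions.containsKey(key)) {
                publish.accept(key, Collections.emptySet());
            }
        });
        // Keys whose assignment actually changed get the new set.
        newPartitions.forEach((key, partitions) -> {
            if (!partitions.equals(oldPartitions.get(key))) {
                publish.accept(key, partitions);
            }
        });
    }
}
```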
... | ... | @@ -71,8 +71,12 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue |
71 | 71 | |
72 | 72 | @Override |
73 | 73 | protected void doSubscribe(List<String> topicNames) { |
74 | - topicNames.forEach(admin::createTopicIfNotExists); | |
75 | - consumer.subscribe(topicNames); | |
74 | + if (!topicNames.isEmpty()) { | |
75 | + topicNames.forEach(admin::createTopicIfNotExists); | |
76 | + consumer.subscribe(topicNames); | |
77 | + } else { | |
78 | + consumer.unsubscribe(); | |
79 | + } | |
76 | 80 | } |
77 | 81 | |
78 | 82 | @Override | ... | ... |
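
With empty partition-change events now reaching the consumer, an empty topic list means "drop the subscription", not "subscribe to nothing". A minimal, self-contained sketch of the same branch against the plain kafka-clients API (`subscribe(Collection)`, `unsubscribe()`, `poll(Duration)`); broker address, group id and topic names are illustrative:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch of the subscribe/unsubscribe branch above using plain kafka-clients.
public class ConditionalSubscribeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "tb-core-example");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Would be empty after this node loses all of its partitions.
            List<String> topicNames = List.of("tb_core.0", "tb_core.1");
            if (!topicNames.isEmpty()) {
                consumer.subscribe(topicNames);   // (re)subscribe to the assigned topics
            } else {
                consumer.unsubscribe();           // drop the old subscription entirely
            }
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(25));
            System.out.println("polled " + records.count() + " records");
        }
    }
}
```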
... | ... | @@ -106,7 +106,7 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> extends AbstractPara |
106 | 106 | subscriptionNames = new LinkedHashSet<>(topicNames); |
107 | 107 | subscriptionNames.forEach(admin::createTopicIfNotExists); |
108 | 108 | initNewExecutor(subscriptionNames.size() + 1); |
109 | - messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size(); | |
109 | + messagesPerTopic = pubSubSettings.getMaxMessages() / Math.max(subscriptionNames.size(), 1); | |
110 | 110 | } |
111 | 111 | |
112 | 112 | @Override | ... | ... |
... | ... | @@ -294,7 +294,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
294 | 294 | break; |
295 | 295 | } |
296 | 296 | } catch (Exception e) { |
297 | - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); | |
297 | + log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e); | |
298 | 298 | grantedQoSList.add(FAILURE.value()); |
299 | 299 | } |
300 | 300 | } | ... | ... |
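
Passing the caught exception as the last argument lets SLF4J print the stack trace: a trailing `Throwable` with no matching `{}` placeholder is treated as the exception to log rather than a formatting argument. A small example with illustrative values:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// A trailing Throwable without a "{}" placeholder is logged with its stack trace.
public class Slf4jThrowableExample {
    private static final Logger log = LoggerFactory.getLogger(Slf4jThrowableExample.class);

    public static void main(String[] args) {
        String sessionId = "session-1";              // illustrative values
        String topic = "v1/devices/me/telemetry";
        int reqQoS = 1;
        try {
            throw new IllegalStateException("boom");
        } catch (Exception e) {
            log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);    // message only
            log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e); // message plus stack trace
        }
    }
}
```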