Commit 8cfe4ef44f356d9e920189664df3abb9e89b708c

Authored by Igor Kulikov
2 parents 2fc25ad3 2728c7cd

Merge branch 'develop/3.0' of github.com:thingsboard/thingsboard into develop/3.0

@@ -17,6 +17,7 @@ package org.thingsboard.server.service.cluster.routing; @@ -17,6 +17,7 @@ package org.thingsboard.server.service.cluster.routing;
17 17
18 import com.datastax.driver.core.utils.UUIDs; 18 import com.datastax.driver.core.utils.UUIDs;
19 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
  20 +import org.junit.Assert;
20 import org.junit.Before; 21 import org.junit.Before;
21 import org.junit.Test; 22 import org.junit.Test;
22 import org.junit.runner.RunWith; 23 import org.junit.runner.RunWith;
@@ -31,6 +32,8 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; @@ -31,6 +32,8 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
31 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; 32 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
32 import org.thingsboard.server.gen.transport.TransportProtos; 33 import org.thingsboard.server.gen.transport.TransportProtos;
33 import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; 34 import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
  35 +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
  36 +import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
34 37
35 import java.util.ArrayList; 38 import java.util.ArrayList;
36 import java.util.Collections; 39 import java.util.Collections;
@@ -41,6 +44,7 @@ import java.util.Map; @@ -41,6 +44,7 @@ import java.util.Map;
41 import java.util.stream.Collectors; 44 import java.util.stream.Collectors;
42 45
43 import static org.mockito.Mockito.mock; 46 import static org.mockito.Mockito.mock;
  47 +import static org.mockito.Mockito.when;
44 48
45 @Slf4j 49 @Slf4j
46 @RunWith(MockitoJUnitRunner.class) 50 @RunWith(MockitoJUnitRunner.class)
@@ -52,6 +56,7 @@ public class ConsistentHashParitionServiceTest { @@ -52,6 +56,7 @@ public class ConsistentHashParitionServiceTest {
52 private TbServiceInfoProvider discoveryService; 56 private TbServiceInfoProvider discoveryService;
53 private TenantRoutingInfoService routingInfoService; 57 private TenantRoutingInfoService routingInfoService;
54 private ApplicationEventPublisher applicationEventPublisher; 58 private ApplicationEventPublisher applicationEventPublisher;
  59 + private TbQueueRuleEngineSettings ruleEngineSettings;
55 60
56 private String hashFunctionName = "murmur3_128"; 61 private String hashFunctionName = "murmur3_128";
57 private Integer virtualNodesSize = 16; 62 private Integer virtualNodesSize = 16;
@@ -62,12 +67,15 @@ public class ConsistentHashParitionServiceTest { @@ -62,12 +67,15 @@ public class ConsistentHashParitionServiceTest {
62 discoveryService = mock(TbServiceInfoProvider.class); 67 discoveryService = mock(TbServiceInfoProvider.class);
63 applicationEventPublisher = mock(ApplicationEventPublisher.class); 68 applicationEventPublisher = mock(ApplicationEventPublisher.class);
64 routingInfoService = mock(TenantRoutingInfoService.class); 69 routingInfoService = mock(TenantRoutingInfoService.class);
65 - clusterRoutingService = new ConsistentHashPartitionService(discoveryService, routingInfoService, applicationEventPublisher); 70 + ruleEngineSettings = mock(TbQueueRuleEngineSettings.class);
  71 + clusterRoutingService = new ConsistentHashPartitionService(discoveryService,
  72 + routingInfoService,
  73 + applicationEventPublisher,
  74 + ruleEngineSettings
  75 + );
  76 + when(ruleEngineSettings.getQueues()).thenReturn(Collections.emptyList());
66 ReflectionTestUtils.setField(clusterRoutingService, "coreTopic", "tb.core"); 77 ReflectionTestUtils.setField(clusterRoutingService, "coreTopic", "tb.core");
67 ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 3); 78 ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 3);
68 - ReflectionTestUtils.setField(clusterRoutingService, "ruleEngineTopic", "tb.rule-engine");  
69 - ReflectionTestUtils.setField(clusterRoutingService, "ruleEnginePartitions", 100);  
70 -  
71 ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName); 79 ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName);
72 ReflectionTestUtils.setField(clusterRoutingService, "virtualNodesSize", virtualNodesSize); 80 ReflectionTestUtils.setField(clusterRoutingService, "virtualNodesSize", virtualNodesSize);
73 TransportProtos.ServiceInfo currentServer = TransportProtos.ServiceInfo.newBuilder() 81 TransportProtos.ServiceInfo currentServer = TransportProtos.ServiceInfo.newBuilder()
@@ -107,8 +115,9 @@ public class ConsistentHashParitionServiceTest { @@ -107,8 +115,9 @@ public class ConsistentHashParitionServiceTest {
107 List<Map.Entry<Integer, Integer>> data = map.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getValue)).collect(Collectors.toList()); 115 List<Map.Entry<Integer, Integer>> data = map.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getValue)).collect(Collectors.toList());
108 long end = System.currentTimeMillis(); 116 long end = System.currentTimeMillis();
109 double diff = (data.get(data.size() - 1).getValue() - data.get(0).getValue()); 117 double diff = (data.get(data.size() - 1).getValue() - data.get(0).getValue());
110 - System.out.println("Size: " + virtualNodesSize + " Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", (diff / ITERATIONS) * 100.0) + "%)");  
111 - 118 + double diffPercent = (diff / ITERATIONS) * 100.0;
  119 + System.out.println("Size: " + virtualNodesSize + " Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", diffPercent) + "%)");
  120 + Assert.assertTrue(diffPercent < 0.5);
112 for (Map.Entry<Integer, Integer> entry : data) { 121 for (Map.Entry<Integer, Integer> entry : data) {
113 System.out.println(entry.getKey() + ": " + entry.getValue()); 122 System.out.println(entry.getKey() + ": " + entry.getValue());
114 } 123 }
@@ -30,6 +30,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; @@ -30,6 +30,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
30 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; 30 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
31 import org.thingsboard.server.gen.transport.TransportProtos; 31 import org.thingsboard.server.gen.transport.TransportProtos;
32 import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo; 32 import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo;
  33 +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
33 34
34 import javax.annotation.PostConstruct; 35 import javax.annotation.PostConstruct;
35 import java.nio.charset.StandardCharsets; 36 import java.nio.charset.StandardCharsets;
@@ -61,6 +62,7 @@ public class ConsistentHashPartitionService implements PartitionService { @@ -61,6 +62,7 @@ public class ConsistentHashPartitionService implements PartitionService {
61 private final ApplicationEventPublisher applicationEventPublisher; 62 private final ApplicationEventPublisher applicationEventPublisher;
62 private final TbServiceInfoProvider serviceInfoProvider; 63 private final TbServiceInfoProvider serviceInfoProvider;
63 private final TenantRoutingInfoService tenantRoutingInfoService; 64 private final TenantRoutingInfoService tenantRoutingInfoService;
  65 + private final TbQueueRuleEngineSettings tbQueueRuleEngineSettings;
64 private final ConcurrentMap<ServiceQueue, String> partitionTopics = new ConcurrentHashMap<>(); 66 private final ConcurrentMap<ServiceQueue, String> partitionTopics = new ConcurrentHashMap<>();
65 private final ConcurrentMap<ServiceQueue, Integer> partitionSizes = new ConcurrentHashMap<>(); 67 private final ConcurrentMap<ServiceQueue, Integer> partitionSizes = new ConcurrentHashMap<>();
66 private final ConcurrentMap<TenantId, TenantRoutingInfo> tenantRoutingInfoMap = new ConcurrentHashMap<>(); 68 private final ConcurrentMap<TenantId, TenantRoutingInfo> tenantRoutingInfoMap = new ConcurrentHashMap<>();
@@ -74,10 +76,14 @@ public class ConsistentHashPartitionService implements PartitionService { @@ -74,10 +76,14 @@ public class ConsistentHashPartitionService implements PartitionService {
74 76
75 private HashFunction hashFunction; 77 private HashFunction hashFunction;
76 78
77 - public ConsistentHashPartitionService(TbServiceInfoProvider serviceInfoProvider, TenantRoutingInfoService tenantRoutingInfoService, ApplicationEventPublisher applicationEventPublisher) { 79 + public ConsistentHashPartitionService(TbServiceInfoProvider serviceInfoProvider,
  80 + TenantRoutingInfoService tenantRoutingInfoService,
  81 + ApplicationEventPublisher applicationEventPublisher,
  82 + TbQueueRuleEngineSettings tbQueueRuleEngineSettings) {
78 this.serviceInfoProvider = serviceInfoProvider; 83 this.serviceInfoProvider = serviceInfoProvider;
79 this.tenantRoutingInfoService = tenantRoutingInfoService; 84 this.tenantRoutingInfoService = tenantRoutingInfoService;
80 this.applicationEventPublisher = applicationEventPublisher; 85 this.applicationEventPublisher = applicationEventPublisher;
  86 + this.tbQueueRuleEngineSettings = tbQueueRuleEngineSettings;
81 } 87 }
82 88
83 @PostConstruct 89 @PostConstruct
@@ -85,6 +91,10 @@ public class ConsistentHashPartitionService implements PartitionService { @@ -85,6 +91,10 @@ public class ConsistentHashPartitionService implements PartitionService {
85 this.hashFunction = forName(hashFunctionName); 91 this.hashFunction = forName(hashFunctionName);
86 partitionSizes.put(new ServiceQueue(ServiceType.TB_CORE), corePartitions); 92 partitionSizes.put(new ServiceQueue(ServiceType.TB_CORE), corePartitions);
87 partitionTopics.put(new ServiceQueue(ServiceType.TB_CORE), coreTopic); 93 partitionTopics.put(new ServiceQueue(ServiceType.TB_CORE), coreTopic);
  94 + tbQueueRuleEngineSettings.getQueues().forEach(queueConfiguration -> {
  95 + partitionTopics.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), queueConfiguration.getTopic());
  96 + partitionSizes.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), queueConfiguration.getPartitions());
  97 + });
88 } 98 }
89 99
90 @Override 100 @Override
@@ -70,6 +70,8 @@ services: @@ -70,6 +70,8 @@ services:
70 - kafka 70 - kafka
71 - redis 71 - redis
72 - tb-js-executor 72 - tb-js-executor
  73 + - tb-rule-engine1
  74 + - tb-rule-engine2
73 tb-core2: 75 tb-core2:
74 restart: always 76 restart: always
75 image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}" 77 image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}"
@@ -92,6 +94,8 @@ services: @@ -92,6 +94,8 @@ services:
92 - kafka 94 - kafka
93 - redis 95 - redis
94 - tb-js-executor 96 - tb-js-executor
  97 + - tb-rule-engine1
  98 + - tb-rule-engine2
95 tb-rule-engine1: 99 tb-rule-engine1:
96 restart: always 100 restart: always
97 image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}" 101 image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}"