Commit 4240ac10f28f3405f46ec9ada0baaf8bf6783e54

Authored by Volodymyr Babak
2 parents 7b4ebc75 294b0e9e

Merge remote-tracking branch 'upstream/master'

... ... @@ -193,8 +193,8 @@
193 193 <artifactId>logback-classic</artifactId>
194 194 </dependency>
195 195 <dependency>
196   - <groupId>javax.mail</groupId>
197   - <artifactId>mail</artifactId>
  196 + <groupId>com.sun.mail</groupId>
  197 + <artifactId>javax.mail</artifactId>
198 198 </dependency>
199 199 <dependency>
200 200 <groupId>org.apache.curator</groupId>
... ...
... ... @@ -5,7 +5,7 @@
5 5 "94715984-ae74-76e4-20b7-2f956b01ed80": {
6 6 "isSystemType": true,
7 7 "bundleAlias": "entity_admin_widgets",
8   - "typeAlias": "device_admin_table2",
  8 + "typeAlias": "device_admin_table",
9 9 "type": "latest",
10 10 "title": "New widget",
11 11 "sizeX": 24,
... ... @@ -1271,4 +1271,4 @@
1271 1271 }
1272 1272 },
1273 1273 "name": "Gateways"
1274   -}
\ No newline at end of file
  1274 +}
... ...
... ... @@ -6,7 +6,7 @@
6 6 },
7 7 "widgetTypes": [
8 8 {
9   - "alias": "device_admin_table2",
  9 + "alias": "device_admin_table",
10 10 "name": "Device admin table",
11 11 "descriptor": {
12 12 "type": "latest",
... ... @@ -22,7 +22,7 @@
22 22 }
23 23 },
24 24 {
25   - "alias": "device_admin_table",
  25 + "alias": "asset_admin_table",
26 26 "name": "Asset admin table",
27 27 "descriptor": {
28 28 "type": "latest",
... ...
... ... @@ -338,7 +338,7 @@ public class AuthController extends BaseController {
338 338
339 339 @RequestMapping(value = "/noauth/oauth2Clients", method = RequestMethod.POST)
340 340 @ResponseBody
341   - public List<OAuth2ClientInfo> getOath2Clients() throws ThingsboardException {
  341 + public List<OAuth2ClientInfo> getOAuth2Clients() throws ThingsboardException {
342 342 try {
343 343 return oauth2Service.getOAuth2Clients();
344 344 } catch (Exception e) {
... ...
... ... @@ -23,6 +23,7 @@ import org.springframework.stereotype.Service;
23 23 import org.thingsboard.rule.engine.api.RpcError;
24 24 import org.thingsboard.server.actors.ActorSystemContext;
25 25 import org.thingsboard.server.common.data.id.TenantId;
  26 +import org.thingsboard.server.common.msg.MsgType;
26 27 import org.thingsboard.server.common.msg.TbActorMsg;
27 28 import org.thingsboard.server.common.msg.queue.ServiceType;
28 29 import org.thingsboard.server.common.msg.queue.TbCallback;
... ... @@ -45,6 +46,7 @@ import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
45 46 import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
46 47 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
47 48 import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
  49 +import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
48 50 import org.thingsboard.server.service.state.DeviceStateService;
49 51 import org.thingsboard.server.service.subscription.SubscriptionManagerService;
50 52 import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
... ... @@ -100,7 +102,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
100 102 }
101 103
102 104 @PreDestroy
103   - public void destroy(){
  105 + public void destroy() {
104 106 super.destroy();
105 107 }
106 108
... ... @@ -143,8 +145,13 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
143 145 } else if (toCoreMsg.getToDeviceActorNotificationMsg() != null && !toCoreMsg.getToDeviceActorNotificationMsg().isEmpty()) {
144 146 Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray());
145 147 if (actorMsg.isPresent()) {
146   - log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
147   - actorContext.tell(actorMsg.get(), ActorRef.noSender());
  148 + TbActorMsg tbActorMsg = actorMsg.get();
  149 + if (tbActorMsg.getMsgType().equals(MsgType.DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG)) {
  150 + tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) tbActorMsg);
  151 + } else {
  152 + log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
  153 + actorContext.tell(actorMsg.get(), ActorRef.noSender());
  154 + }
148 155 }
149 156 callback.onSuccess();
150 157 }
... ...
... ... @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
21 21 import org.springframework.beans.factory.annotation.Value;
22 22 import org.springframework.scheduling.annotation.Scheduled;
23 23 import org.springframework.stereotype.Service;
  24 +import org.thingsboard.rule.engine.api.RpcError;
24 25 import org.thingsboard.server.actors.ActorSystemContext;
25 26 import org.thingsboard.server.common.data.id.TenantId;
26 27 import org.thingsboard.server.common.msg.TbActorMsg;
... ... @@ -31,6 +32,7 @@ import org.thingsboard.server.common.msg.queue.ServiceQueue;
31 32 import org.thingsboard.server.common.msg.queue.ServiceType;
32 33 import org.thingsboard.server.common.msg.queue.TbCallback;
33 34 import org.thingsboard.server.common.msg.queue.TbMsgCallback;
  35 +import org.thingsboard.server.gen.transport.TransportProtos;
34 36 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
35 37 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
36 38 import org.thingsboard.server.queue.TbQueueConsumer;
... ... @@ -48,6 +50,9 @@ import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStr
48 50 import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory;
49 51 import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy;
50 52 import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory;
  53 +import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
  54 +import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
  55 +import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
51 56 import org.thingsboard.server.service.stats.RuleEngineStatisticsService;
52 57
53 58 import javax.annotation.PostConstruct;
... ... @@ -81,6 +86,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
81 86 private final TbRuleEngineQueueFactory tbRuleEngineQueueFactory;
82 87 private final TbQueueRuleEngineSettings ruleEngineSettings;
83 88 private final RuleEngineStatisticsService statisticsService;
  89 + private final TbRuleEngineDeviceRpcService tbDeviceRpcService;
84 90 private final ConcurrentMap<String, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> consumers = new ConcurrentHashMap<>();
85 91 private final ConcurrentMap<String, TbRuleEngineQueueConfiguration> consumerConfigurations = new ConcurrentHashMap<>();
86 92 private final ConcurrentMap<String, TbRuleEngineConsumerStats> consumerStats = new ConcurrentHashMap<>();
... ... @@ -90,13 +96,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
90 96 TbRuleEngineSubmitStrategyFactory submitStrategyFactory,
91 97 TbQueueRuleEngineSettings ruleEngineSettings,
92 98 TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService,
93   - ActorSystemContext actorContext, DataDecodingEncodingService encodingService) {
  99 + ActorSystemContext actorContext, DataDecodingEncodingService encodingService,
  100 + TbRuleEngineDeviceRpcService tbDeviceRpcService) {
94 101 super(actorContext, encodingService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer());
95 102 this.statisticsService = statisticsService;
96 103 this.ruleEngineSettings = ruleEngineSettings;
97 104 this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory;
98 105 this.submitStrategyFactory = submitStrategyFactory;
99 106 this.processingStrategyFactory = processingStrategyFactory;
  107 + this.tbDeviceRpcService = tbDeviceRpcService;
100 108 }
101 109
102 110 @PostConstruct
... ... @@ -227,7 +235,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
227 235 actorContext.tell(actorMsg.get(), ActorRef.noSender());
228 236 }
229 237 callback.onSuccess();
  238 + } else if (nfMsg.hasFromDeviceRpcResponse()) {
  239 + TransportProtos.FromDeviceRPCResponseProto proto = nfMsg.getFromDeviceRpcResponse();
  240 + RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
  241 + FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB())
  242 + , proto.getResponse(), error);
  243 + tbDeviceRpcService.processRpcResponseFromDevice(response);
  244 + callback.onSuccess();
230 245 } else {
  246 + log.trace("Received notification with missing handler");
231 247 callback.onSuccess();
232 248 }
233 249 }
... ...
... ... @@ -621,8 +621,7 @@ queue:
621 621 notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
622 622 js-executor: "${TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
623 623 partitions:
624   - hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
625   - virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
  624 + hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256
626 625 transport_api:
627 626 requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
628 627 responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
... ... @@ -638,7 +637,7 @@ queue:
638 637 pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:60000}"
639 638 stats:
640 639 enabled: "${TB_QUEUE_CORE_STATS_ENABLED:true}"
641   - print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:10000}"
  640 + print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:60000}"
642 641 js:
643 642 # JS Eval request topic
644 643 request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}"
... ... @@ -658,7 +657,7 @@ queue:
658 657 pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
659 658 stats:
660 659 enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:true}"
661   - print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}"
  660 + print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:60000}"
662 661 queues:
663 662 - name: "${TB_QUEUE_RE_MAIN_QUEUE_NAME:Main}"
664 663 topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb_rule_engine.main}"
... ...
application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java renamed from application/src/test/java/org/thingsboard/server/service/cluster/routing/ConsistentHashParitionServiceTest.java
... ... @@ -26,14 +26,14 @@ import org.springframework.context.ApplicationEventPublisher;
26 26 import org.springframework.test.util.ReflectionTestUtils;
27 27 import org.thingsboard.server.common.data.id.DeviceId;
28 28 import org.thingsboard.server.common.data.id.TenantId;
29   -import org.thingsboard.server.queue.discovery.ConsistentHashPartitionService;
  29 +import org.thingsboard.server.common.msg.queue.ServiceQueue;
  30 +import org.thingsboard.server.queue.discovery.HashPartitionService;
30 31 import org.thingsboard.server.common.msg.queue.ServiceType;
31 32 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
32 33 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
33 34 import org.thingsboard.server.gen.transport.TransportProtos;
34 35 import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
35 36 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
36   -import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
37 37
38 38 import java.util.ArrayList;
39 39 import java.util.Collections;
... ... @@ -48,18 +48,18 @@ import static org.mockito.Mockito.when;
48 48
49 49 @Slf4j
50 50 @RunWith(MockitoJUnitRunner.class)
51   -public class ConsistentHashParitionServiceTest {
  51 +public class HashPartitionServiceTest {
52 52
53 53 public static final int ITERATIONS = 1000000;
54   - private ConsistentHashPartitionService clusterRoutingService;
  54 + public static final int SERVER_COUNT = 3;
  55 + private HashPartitionService clusterRoutingService;
55 56
56 57 private TbServiceInfoProvider discoveryService;
57 58 private TenantRoutingInfoService routingInfoService;
58 59 private ApplicationEventPublisher applicationEventPublisher;
59 60 private TbQueueRuleEngineSettings ruleEngineSettings;
60 61
61   - private String hashFunctionName = "murmur3_128";
62   - private Integer virtualNodesSize = 16;
  62 + private String hashFunctionName = "sha256";
63 63
64 64
65 65 @Before
... ... @@ -68,25 +68,28 @@ public class ConsistentHashParitionServiceTest {
68 68 applicationEventPublisher = mock(ApplicationEventPublisher.class);
69 69 routingInfoService = mock(TenantRoutingInfoService.class);
70 70 ruleEngineSettings = mock(TbQueueRuleEngineSettings.class);
71   - clusterRoutingService = new ConsistentHashPartitionService(discoveryService,
  71 + clusterRoutingService = new HashPartitionService(discoveryService,
72 72 routingInfoService,
73 73 applicationEventPublisher,
74 74 ruleEngineSettings
75 75 );
76 76 when(ruleEngineSettings.getQueues()).thenReturn(Collections.emptyList());
77 77 ReflectionTestUtils.setField(clusterRoutingService, "coreTopic", "tb.core");
78   - ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 3);
  78 + ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 10);
79 79 ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName);
80   - ReflectionTestUtils.setField(clusterRoutingService, "virtualNodesSize", virtualNodesSize);
81 80 TransportProtos.ServiceInfo currentServer = TransportProtos.ServiceInfo.newBuilder()
82   - .setServiceId("100.96.1.1")
  81 + .setServiceId("tb-core-0")
  82 + .setTenantIdMSB(TenantId.NULL_UUID.getMostSignificantBits())
  83 + .setTenantIdLSB(TenantId.NULL_UUID.getLeastSignificantBits())
83 84 .addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name()))
84 85 .build();
85 86 // when(discoveryService.getServiceInfo()).thenReturn(currentServer);
86 87 List<TransportProtos.ServiceInfo> otherServers = new ArrayList<>();
87   - for (int i = 1; i < 30; i++) {
  88 + for (int i = 1; i < SERVER_COUNT; i++) {
88 89 otherServers.add(TransportProtos.ServiceInfo.newBuilder()
89   - .setServiceId("100.96." + i * 2 + "." + i)
  90 + .setServiceId("tb-rule-" + i)
  91 + .setTenantIdMSB(TenantId.NULL_UUID.getMostSignificantBits())
  92 + .setTenantIdLSB(TenantId.NULL_UUID.getLeastSignificantBits())
90 93 .addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name()))
91 94 .build());
92 95 }
... ... @@ -116,12 +119,11 @@ public class ConsistentHashParitionServiceTest {
116 119 long end = System.currentTimeMillis();
117 120 double diff = (data.get(data.size() - 1).getValue() - data.get(0).getValue());
118 121 double diffPercent = (diff / ITERATIONS) * 100.0;
119   - System.out.println("Size: " + virtualNodesSize + " Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", diffPercent) + "%)");
  122 + System.out.println("Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", diffPercent) + "%)");
120 123 Assert.assertTrue(diffPercent < 0.5);
121 124 for (Map.Entry<Integer, Integer> entry : data) {
122 125 System.out.println(entry.getKey() + ": " + entry.getValue());
123 126 }
124   -
125 127 }
126 128
127 129 }
... ...
common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java renamed from common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java
... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.queue.discovery;
18 18 import com.google.common.hash.HashCode;
19 19 import com.google.common.hash.HashFunction;
20 20 import com.google.common.hash.Hashing;
  21 +import lombok.Getter;
21 22 import lombok.extern.slf4j.Slf4j;
22 23 import org.springframework.beans.factory.annotation.Value;
23 24 import org.springframework.context.ApplicationEventPublisher;
... ... @@ -35,6 +36,7 @@ import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
35 36 import javax.annotation.PostConstruct;
36 37 import java.nio.charset.StandardCharsets;
37 38 import java.util.ArrayList;
  39 +import java.util.Comparator;
38 40 import java.util.HashMap;
39 41 import java.util.HashSet;
40 42 import java.util.List;
... ... @@ -48,7 +50,7 @@ import java.util.stream.Collectors;
48 50
49 51 @Service
50 52 @Slf4j
51   -public class ConsistentHashPartitionService implements PartitionService {
  53 +public class HashPartitionService implements PartitionService {
52 54
53 55 @Value("${queue.core.topic}")
54 56 private String coreTopic;
... ... @@ -56,8 +58,6 @@ public class ConsistentHashPartitionService implements PartitionService {
56 58 private Integer corePartitions;
57 59 @Value("${queue.partitions.hash_function_name:murmur3_128}")
58 60 private String hashFunctionName;
59   - @Value("${queue.partitions.virtual_nodes_size:16}")
60   - private Integer virtualNodesSize;
61 61
62 62 private final ApplicationEventPublisher applicationEventPublisher;
63 63 private final TbServiceInfoProvider serviceInfoProvider;
... ... @@ -76,10 +76,10 @@ public class ConsistentHashPartitionService implements PartitionService {
76 76
77 77 private HashFunction hashFunction;
78 78
79   - public ConsistentHashPartitionService(TbServiceInfoProvider serviceInfoProvider,
80   - TenantRoutingInfoService tenantRoutingInfoService,
81   - ApplicationEventPublisher applicationEventPublisher,
82   - TbQueueRuleEngineSettings tbQueueRuleEngineSettings) {
  79 + public HashPartitionService(TbServiceInfoProvider serviceInfoProvider,
  80 + TenantRoutingInfoService tenantRoutingInfoService,
  81 + ApplicationEventPublisher applicationEventPublisher,
  82 + TbQueueRuleEngineSettings tbQueueRuleEngineSettings) {
83 83 this.serviceInfoProvider = serviceInfoProvider;
84 84 this.tenantRoutingInfoService = tenantRoutingInfoService;
85 85 this.applicationEventPublisher = applicationEventPublisher;
... ... @@ -128,20 +128,22 @@ public class ConsistentHashPartitionService implements PartitionService {
128 128 public void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
129 129 logServiceInfo(currentService);
130 130 otherServices.forEach(this::logServiceInfo);
131   - Map<ServiceQueueKey, ConsistentHashCircle<ServiceInfo>> circles = new HashMap<>();
132   - addNode(circles, currentService);
  131 + Map<ServiceQueueKey, List<ServiceInfo>> queueServicesMap = new HashMap<>();
  132 + addNode(queueServicesMap, currentService);
133 133 for (ServiceInfo other : otherServices) {
134   - addNode(circles, other);
  134 + addNode(queueServicesMap, other);
135 135 }
  136 + queueServicesMap.values().forEach(list -> list.sort((a, b) -> a.getServiceId().compareTo(b.getServiceId())));
  137 +
136 138 ConcurrentMap<ServiceQueueKey, List<Integer>> oldPartitions = myPartitions;
137 139 TenantId myIsolatedOrSystemTenantId = getSystemOrIsolatedTenantId(currentService);
138 140 myPartitions = new ConcurrentHashMap<>();
139   - partitionSizes.forEach((type, size) -> {
140   - ServiceQueueKey myServiceQueueKey = new ServiceQueueKey(type, myIsolatedOrSystemTenantId);
  141 + partitionSizes.forEach((serviceQueue, size) -> {
  142 + ServiceQueueKey myServiceQueueKey = new ServiceQueueKey(serviceQueue, myIsolatedOrSystemTenantId);
141 143 for (int i = 0; i < size; i++) {
142   - ServiceInfo serviceInfo = resolveByPartitionIdx(circles.get(myServiceQueueKey), i);
  144 + ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(myServiceQueueKey), i);
143 145 if (currentService.equals(serviceInfo)) {
144   - ServiceQueueKey serviceQueueKey = new ServiceQueueKey(type, getSystemOrIsolatedTenantId(serviceInfo));
  146 + ServiceQueueKey serviceQueueKey = new ServiceQueueKey(serviceQueue, getSystemOrIsolatedTenantId(serviceInfo));
145 147 myPartitions.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(i);
146 148 }
147 149 }
... ... @@ -293,7 +295,7 @@ public class ConsistentHashPartitionService implements PartitionService {
293 295 return new TenantId(new UUID(serviceInfo.getTenantIdMSB(), serviceInfo.getTenantIdLSB()));
294 296 }
295 297
296   - private void addNode(Map<ServiceQueueKey, ConsistentHashCircle<ServiceInfo>> circles, ServiceInfo instance) {
  298 + private void addNode(Map<ServiceQueueKey, List<ServiceInfo>> queueServiceList, ServiceInfo instance) {
297 299 TenantId tenantId = getSystemOrIsolatedTenantId(instance);
298 300 for (String serviceTypeStr : instance.getServiceTypesList()) {
299 301 ServiceType serviceType = ServiceType.valueOf(serviceTypeStr.toUpperCase());
... ... @@ -302,34 +304,20 @@ public class ConsistentHashPartitionService implements PartitionService {
302 304 ServiceQueueKey serviceQueueKey = new ServiceQueueKey(new ServiceQueue(serviceType, queue.getName()), tenantId);
303 305 partitionSizes.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queue.getName()), queue.getPartitions());
304 306 partitionTopics.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queue.getName()), queue.getTopic());
305   - for (int i = 0; i < virtualNodesSize; i++) {
306   - circles.computeIfAbsent(serviceQueueKey, key -> new ConsistentHashCircle<>()).put(hash(instance, i).asLong(), instance);
307   - }
  307 + queueServiceList.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(instance);
308 308 }
309 309 } else {
310 310 ServiceQueueKey serviceQueueKey = new ServiceQueueKey(new ServiceQueue(serviceType), tenantId);
311   - for (int i = 0; i < virtualNodesSize; i++) {
312   - circles.computeIfAbsent(serviceQueueKey, key -> new ConsistentHashCircle<>()).put(hash(instance, i).asLong(), instance);
313   - }
  311 + queueServiceList.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(instance);
314 312 }
315 313 }
316 314 }
317 315
318   - private ServiceInfo resolveByPartitionIdx(ConsistentHashCircle<ServiceInfo> circle, Integer partitionIdx) {
319   - if (circle == null || circle.isEmpty()) {
  316 + private ServiceInfo resolveByPartitionIdx(List<ServiceInfo> servers, Integer partitionIdx) {
  317 + if (servers == null || servers.isEmpty()) {
320 318 return null;
321 319 }
322   - Long hash = hashFunction.newHasher().putInt(partitionIdx).hash().asLong();
323   - if (!circle.containsKey(hash)) {
324   - ConcurrentNavigableMap<Long, ServiceInfo> tailMap = circle.tailMap(hash);
325   - hash = tailMap.isEmpty() ?
326   - circle.firstKey() : tailMap.firstKey();
327   - }
328   - return circle.get(hash);
329   - }
330   -
331   - private HashCode hash(ServiceInfo instance, int i) {
332   - return hashFunction.newHasher().putString(instance.getServiceId(), StandardCharsets.UTF_8).putInt(i).hash();
  320 + return servers.get(partitionIdx % servers.size());
333 321 }
334 322
335 323 public static HashFunction forName(String name) {
... ... @@ -338,12 +326,11 @@ public class ConsistentHashPartitionService implements PartitionService {
338 326 return Hashing.murmur3_32();
339 327 case "murmur3_128":
340 328 return Hashing.murmur3_128();
341   - case "crc32":
342   - return Hashing.crc32();
343   - case "md5":
344   - return Hashing.md5();
  329 + case "sha256":
  330 + return Hashing.sha256();
345 331 default:
346 332 throw new IllegalArgumentException("Can't find hash function with name " + name);
347 333 }
348 334 }
  335 +
349 336 }
... ...
... ... @@ -37,6 +37,7 @@ import org.thingsboard.server.gen.transport.TransportProtos;
37 37 import java.lang.reflect.Field;
38 38 import java.util.List;
39 39 import java.util.Optional;
  40 +import java.util.Set;
40 41 import java.util.UUID;
41 42 import java.util.concurrent.ConcurrentHashMap;
42 43 import java.util.concurrent.ConcurrentMap;
... ... @@ -55,6 +56,8 @@ public class CoapTransportResource extends CoapResource {
55 56 private final Field observerField;
56 57 private final long timeout;
57 58 private final ConcurrentMap<String, TransportProtos.SessionInfoProto> tokenToSessionIdMap = new ConcurrentHashMap<>();
  59 + private final Set<UUID> rpcSubscriptions = ConcurrentHashMap.newKeySet();
  60 + private final Set<UUID> attributeSubscriptions = ConcurrentHashMap.newKeySet();
58 61
59 62 public CoapTransportResource(CoapTransportContext context, String name) {
60 63 super(name);
... ... @@ -149,11 +152,13 @@ public class CoapTransportResource extends CoapResource {
149 152 transportService.process(sessionInfo,
150 153 transportContext.getAdaptor().convertToPostAttributes(sessionId, request),
151 154 new CoapOkCallback(exchange));
  155 + reportActivity(sessionId, sessionInfo);
152 156 break;
153 157 case POST_TELEMETRY_REQUEST:
154 158 transportService.process(sessionInfo,
155 159 transportContext.getAdaptor().convertToPostTelemetry(sessionId, request),
156 160 new CoapOkCallback(exchange));
  161 + reportActivity(sessionId, sessionInfo);
157 162 break;
158 163 case CLAIM_REQUEST:
159 164 transportService.process(sessionInfo,
... ... @@ -161,6 +166,7 @@ public class CoapTransportResource extends CoapResource {
161 166 new CoapOkCallback(exchange));
162 167 break;
163 168 case SUBSCRIBE_ATTRIBUTES_REQUEST:
  169 + attributeSubscriptions.add(sessionId);
164 170 advanced.setObserver(new CoapExchangeObserverProxy((ExchangeObserver) observerField.get(advanced),
165 171 registerAsyncCoapSession(exchange, request, sessionInfo, sessionId)));
166 172 transportService.process(sessionInfo,
... ... @@ -168,6 +174,7 @@ public class CoapTransportResource extends CoapResource {
168 174 new CoapNoOpCallback(exchange));
169 175 break;
170 176 case UNSUBSCRIBE_ATTRIBUTES_REQUEST:
  177 + attributeSubscriptions.remove(sessionId);
171 178 TransportProtos.SessionInfoProto attrSession = lookupAsyncSessionInfo(request);
172 179 if (attrSession != null) {
173 180 transportService.process(attrSession,
... ... @@ -177,6 +184,7 @@ public class CoapTransportResource extends CoapResource {
177 184 }
178 185 break;
179 186 case SUBSCRIBE_RPC_COMMANDS_REQUEST:
  187 + rpcSubscriptions.add(sessionId);
180 188 advanced.setObserver(new CoapExchangeObserverProxy((ExchangeObserver) observerField.get(advanced),
181 189 registerAsyncCoapSession(exchange, request, sessionInfo, sessionId)));
182 190 transportService.process(sessionInfo,
... ... @@ -184,13 +192,13 @@ public class CoapTransportResource extends CoapResource {
184 192 new CoapNoOpCallback(exchange));
185 193 break;
186 194 case UNSUBSCRIBE_RPC_COMMANDS_REQUEST:
  195 + rpcSubscriptions.remove(sessionId);
187 196 TransportProtos.SessionInfoProto rpcSession = lookupAsyncSessionInfo(request);
188 197 if (rpcSession != null) {
189 198 transportService.process(rpcSession,
190 199 TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(),
191 200 new CoapOkCallback(exchange));
192   - transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
193   - transportService.deregisterSession(rpcSession);
  201 + closeAndDeregister(sessionInfo);
194 202 }
195 203 break;
196 204 case TO_DEVICE_RPC_RESPONSE:
... ... @@ -221,6 +229,14 @@ public class CoapTransportResource extends CoapResource {
221 229 }));
222 230 }
223 231
  232 + private void reportActivity(UUID sessionId, TransportProtos.SessionInfoProto sessionInfo) {
  233 + transportContext.getTransportService().process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
  234 + .setAttributeSubscription(attributeSubscriptions.contains(sessionId))
  235 + .setRpcSubscription(rpcSubscriptions.contains(sessionId))
  236 + .setLastActivityTime(System.currentTimeMillis())
  237 + .build(), TransportServiceCallback.EMPTY);
  238 + }
  239 +
224 240 private TransportProtos.SessionInfoProto lookupAsyncSessionInfo(Request request) {
225 241 String token = request.getSource().getHostAddress() + ":" + request.getSourcePort() + ":" + request.getTokenString();
226 242 return tokenToSessionIdMap.remove(token);
... ... @@ -438,6 +454,9 @@ public class CoapTransportResource extends CoapResource {
438 454 private void closeAndDeregister(TransportProtos.SessionInfoProto session) {
439 455 transportService.process(session, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
440 456 transportService.deregisterSession(session);
  457 + UUID sessionId = new UUID(session.getSessionIdMSB(), session.getSessionIdLSB());
  458 + rpcSubscriptions.remove(sessionId);
  459 + attributeSubscriptions.remove(sessionId);
441 460 }
442 461
443 462 }
... ...
... ... @@ -36,6 +36,7 @@ import org.thingsboard.server.common.transport.TransportContext;
36 36 import org.thingsboard.server.common.transport.TransportService;
37 37 import org.thingsboard.server.common.transport.TransportServiceCallback;
38 38 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
  39 +import org.thingsboard.server.gen.transport.TransportProtos;
39 40 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
40 41 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
41 42 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
... ... @@ -102,6 +103,7 @@ public class DeviceApiController {
102 103 TransportService transportService = transportContext.getTransportService();
103 104 transportService.process(sessionInfo, JsonConverter.convertToAttributesProto(new JsonParser().parse(json)),
104 105 new HttpOkCallback(responseWriter));
  106 + reportActivity(sessionInfo);
105 107 }));
106 108 return responseWriter;
107 109 }
... ... @@ -115,6 +117,7 @@ public class DeviceApiController {
115 117 TransportService transportService = transportContext.getTransportService();
116 118 transportService.process(sessionInfo, JsonConverter.convertToTelemetryProto(new JsonParser().parse(json)),
117 119 new HttpOkCallback(responseWriter));
  120 + reportActivity(sessionInfo);
118 121 }));
119 122 return responseWriter;
120 123 }
... ... @@ -274,7 +277,6 @@ public class DeviceApiController {
274 277 }
275 278 }
276 279
277   -
278 280 private static class HttpSessionListener implements SessionMsgListener {
279 281
280 282 private final DeferredResult<ResponseEntity> responseWriter;
... ... @@ -308,4 +310,13 @@ public class DeviceApiController {
308 310 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
309 311 }
310 312 }
  313 +
  314 + private void reportActivity(SessionInfoProto sessionInfo) {
  315 + transportContext.getTransportService().process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
  316 + .setAttributeSubscription(false)
  317 + .setRpcSubscription(false)
  318 + .setLastActivityTime(System.currentTimeMillis())
  319 + .build(), TransportServiceCallback.EMPTY);
  320 + }
  321 +
311 322 }
... ...
... ... @@ -20,7 +20,20 @@ package org.thingsboard.server.common.transport;
20 20 */
21 21 public interface TransportServiceCallback<T> {
22 22
  23 + TransportServiceCallback<Void> EMPTY = new TransportServiceCallback<Void>() {
  24 + @Override
  25 + public void onSuccess(Void msg) {
  26 +
  27 + }
  28 +
  29 + @Override
  30 + public void onError(Throwable e) {
  31 +
  32 + }
  33 + };
  34 +
23 35 void onSuccess(T msg);
  36 +
24 37 void onError(Throwable e);
25 38
26 39 }
... ...
... ... @@ -30,10 +30,10 @@
30 30 <properties>
31 31 <main.dir>${basedir}</main.dir>
32 32 <pkg.user>thingsboard</pkg.user>
33   - <spring-boot.version>2.2.4.RELEASE</spring-boot.version>
  33 + <spring-boot.version>2.2.6.RELEASE</spring-boot.version>
34 34 <spring-oauth2.version>2.1.2.RELEASE</spring-oauth2.version>
35   - <spring.version>5.2.2.RELEASE</spring.version>
36   - <spring-security.version>5.2.2.RELEASE</spring-security.version>
  35 + <spring.version>5.2.6.RELEASE</spring.version>
  36 + <spring-security.version>5.2.3.RELEASE</spring-security.version>
37 37 <spring-data-redis.version>2.2.4.RELEASE</spring-data-redis.version>
38 38 <jedis.version>3.1.0</jedis.version>
39 39 <jjwt.version>0.7.0</jjwt.version>
... ... @@ -62,14 +62,14 @@
62 62 <gson.version>2.6.2</gson.version>
63 63 <velocity.version>1.7</velocity.version>
64 64 <velocity-tools.version>2.0</velocity-tools.version>
65   - <mail.version>1.4.3</mail.version>
  65 + <mail.version>1.6.2</mail.version>
66 66 <curator.version>4.2.0</curator.version>
67 67 <zookeeper.version>3.5.5</zookeeper.version>
68 68 <protobuf.version>3.11.4</protobuf.version>
69 69 <grpc.version>1.22.1</grpc.version>
70 70 <lombok.version>1.16.18</lombok.version>
71 71 <paho.client.version>1.1.0</paho.client.version>
72   - <netty.version>4.1.45.Final</netty.version>
  72 + <netty.version>4.1.49.Final</netty.version>
73 73 <os-maven-plugin.version>1.5.0</os-maven-plugin.version>
74 74 <rabbitmq.version>4.8.0</rabbitmq.version>
75 75 <surfire.version>2.19.1</surfire.version>
... ... @@ -467,12 +467,12 @@
467 467 <dependency>
468 468 <groupId>org.springframework.security</groupId>
469 469 <artifactId>spring-security-oauth2-client</artifactId>
470   - <version>${spring.version}</version>
  470 + <version>${spring-security.version}</version>
471 471 </dependency>
472 472 <dependency>
473 473 <groupId>org.springframework.security</groupId>
474 474 <artifactId>spring-security-oauth2-jose</artifactId>
475   - <version>${spring.version}</version>
  475 + <version>${spring-security.version}</version>
476 476 </dependency>
477 477 <dependency>
478 478 <groupId>org.springframework.boot</groupId>
... ... @@ -587,8 +587,8 @@
587 587 <version>${rabbitmq.version}</version>
588 588 </dependency>
589 589 <dependency>
590   - <groupId>javax.mail</groupId>
591   - <artifactId>mail</artifactId>
  590 + <groupId>com.sun.mail</groupId>
  591 + <artifactId>javax.mail</artifactId>
592 592 <version>${mail.version}</version>
593 593 </dependency>
594 594 <dependency>
... ... @@ -606,6 +606,12 @@
606 606 <groupId>org.apache.zookeeper</groupId>
607 607 <artifactId>zookeeper</artifactId>
608 608 <version>${zookeeper.version}</version>
  609 + <exclusions>
  610 + <exclusion>
  611 + <groupId>log4j</groupId>
  612 + <artifactId>log4j</artifactId>
  613 + </exclusion>
  614 + </exclusions>
609 615 </dependency>
610 616 <dependency>
611 617 <groupId>com.jayway.jsonpath</groupId>
... ... @@ -688,6 +694,12 @@
688 694 <groupId>com.github.fge</groupId>
689 695 <artifactId>json-schema-validator</artifactId>
690 696 <version>${json-schema-validator.version}</version>
  697 + <exclusions>
  698 + <exclusion>
  699 + <groupId>javax.mail</groupId>
  700 + <artifactId>mailapi</artifactId>
  701 + </exclusion>
  702 + </exclusions>
691 703 </dependency>
692 704 <dependency>
693 705 <groupId>com.typesafe.akka</groupId>
... ... @@ -924,12 +936,6 @@
924 936 <groupId>com.microsoft.azure</groupId>
925 937 <artifactId>azure-servicebus</artifactId>
926 938 <version>${azure-servicebus.version}</version>
927   - <exclusions>
928   - <exclusion>
929   - <groupId>com.microsoft.azure</groupId>
930   - <artifactId>adal4j</artifactId>
931   - </exclusion>
932   - </exclusions>
933 939 </dependency>
934 940 <dependency>
935 941 <groupId>org.passay</groupId>
... ...
... ... @@ -93,5 +93,10 @@
93 93 <artifactId>spring-data-redis</artifactId>
94 94 <scope>provided</scope>
95 95 </dependency>
  96 + <dependency>
  97 + <groupId>com.sun.mail</groupId>
  98 + <artifactId>javax.mail</artifactId>
  99 + <scope>provided</scope>
  100 + </dependency>
96 101 </dependencies>
97 102 </project>
... ...