Commit a87a3d684e4082b14c17595792a25e558f922dd7

Authored by Andrii Shvaika
1 parent 9f2bae98

Improvement to handling of subscription updates

@@ -21,12 +21,6 @@ import org.springframework.context.annotation.Lazy; @@ -21,12 +21,6 @@ import org.springframework.context.annotation.Lazy;
21 import org.springframework.context.event.EventListener; 21 import org.springframework.context.event.EventListener;
22 import org.springframework.stereotype.Service; 22 import org.springframework.stereotype.Service;
23 import org.thingsboard.common.util.ThingsBoardThreadFactory; 23 import org.thingsboard.common.util.ThingsBoardThreadFactory;
24 -import org.thingsboard.server.common.data.EntityType;  
25 -import org.thingsboard.server.common.data.EntityView;  
26 -import org.thingsboard.server.common.data.id.EntityId;  
27 -import org.thingsboard.server.common.data.id.EntityViewId;  
28 -import org.thingsboard.server.common.data.id.TenantId;  
29 -import org.thingsboard.server.dao.entityview.EntityViewService;  
30 import org.thingsboard.server.gen.transport.TransportProtos; 24 import org.thingsboard.server.gen.transport.TransportProtos;
31 import org.thingsboard.server.queue.discovery.ClusterTopologyChangeEvent; 25 import org.thingsboard.server.queue.discovery.ClusterTopologyChangeEvent;
32 import org.thingsboard.server.queue.discovery.PartitionChangeEvent; 26 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
@@ -36,7 +30,6 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @@ -36,7 +30,6 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
36 import org.thingsboard.server.common.msg.queue.TbCallback; 30 import org.thingsboard.server.common.msg.queue.TbCallback;
37 import org.thingsboard.server.queue.util.TbCoreComponent; 31 import org.thingsboard.server.queue.util.TbCoreComponent;
38 import org.thingsboard.server.service.queue.TbClusterService; 32 import org.thingsboard.server.service.queue.TbClusterService;
39 -import org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService;  
40 import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate; 33 import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate;
41 import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate; 34 import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
42 35
@@ -49,7 +42,6 @@ import java.util.Set; @@ -49,7 +42,6 @@ import java.util.Set;
49 import java.util.concurrent.ConcurrentHashMap; 42 import java.util.concurrent.ConcurrentHashMap;
50 import java.util.concurrent.ExecutorService; 43 import java.util.concurrent.ExecutorService;
51 import java.util.concurrent.Executors; 44 import java.util.concurrent.Executors;
52 -import java.util.stream.Collectors;  
53 45
54 @Slf4j 46 @Slf4j
55 @TbCoreComponent 47 @TbCoreComponent
@@ -60,9 +52,6 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @@ -60,9 +52,6 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
60 private final Map<String, Map<Integer, TbSubscription>> subscriptionsBySessionId = new ConcurrentHashMap<>(); 52 private final Map<String, Map<Integer, TbSubscription>> subscriptionsBySessionId = new ConcurrentHashMap<>();
61 53
62 @Autowired 54 @Autowired
63 - private EntityViewService entityViewService;  
64 -  
65 - @Autowired  
66 private PartitionService partitionService; 55 private PartitionService partitionService;
67 56
68 @Autowired 57 @Autowired
@@ -72,17 +61,17 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @@ -72,17 +61,17 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
72 @Lazy 61 @Lazy
73 private SubscriptionManagerService subscriptionManagerService; 62 private SubscriptionManagerService subscriptionManagerService;
74 63
75 - private ExecutorService wsCallBackExecutor; 64 + private ExecutorService subscriptionUpdateExecutor;
76 65
77 @PostConstruct 66 @PostConstruct
78 public void initExecutor() { 67 public void initExecutor() {
79 - wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ws-sub-callback")); 68 + subscriptionUpdateExecutor = Executors.newWorkStealingPool(20);
80 } 69 }
81 70
82 @PreDestroy 71 @PreDestroy
83 public void shutdownExecutor() { 72 public void shutdownExecutor() {
84 - if (wsCallBackExecutor != null) {  
85 - wsCallBackExecutor.shutdownNow(); 73 + if (subscriptionUpdateExecutor != null) {
  74 + subscriptionUpdateExecutor.shutdownNow();
86 } 75 }
87 } 76 }
88 77
@@ -148,7 +137,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @@ -148,7 +137,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
148 update.getLatestValues().forEach((key, value) -> attrSub.getKeyStates().put(key, value)); 137 update.getLatestValues().forEach((key, value) -> attrSub.getKeyStates().put(key, value));
149 break; 138 break;
150 } 139 }
151 - subscription.getUpdateConsumer().accept(sessionId, update); 140 + subscriptionUpdateExecutor.submit(() -> subscription.getUpdateConsumer().accept(sessionId, update));
152 } 141 }
153 callback.onSuccess(); 142 callback.onSuccess();
154 } 143 }
@@ -158,7 +147,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @@ -158,7 +147,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
158 TbSubscription subscription = subscriptionsBySessionId 147 TbSubscription subscription = subscriptionsBySessionId
159 .getOrDefault(sessionId, Collections.emptyMap()).get(update.getSubscriptionId()); 148 .getOrDefault(sessionId, Collections.emptyMap()).get(update.getSubscriptionId());
160 if (subscription != null && subscription.getType() == TbSubscriptionType.ALARMS) { 149 if (subscription != null && subscription.getType() == TbSubscriptionType.ALARMS) {
161 - subscription.getUpdateConsumer().accept(sessionId, update); 150 + subscriptionUpdateExecutor.submit(() -> subscription.getUpdateConsumer().accept(sessionId, update));
162 } 151 }
163 callback.onSuccess(); 152 callback.onSuccess();
164 } 153 }
@@ -941,8 +941,8 @@ @@ -941,8 +941,8 @@
941 "provision-strategy": "Provision strategy", 941 "provision-strategy": "Provision strategy",
942 "provision-strategy-required": "Provision strategy is required.", 942 "provision-strategy-required": "Provision strategy is required.",
943 "provision-strategy-disabled": "Disabled", 943 "provision-strategy-disabled": "Disabled",
944 - "provision-strategy-created-new": "Allow create new devices",  
945 - "provision-strategy-check-pre-provisioned": "Check pre provisioned devices", 944 + "provision-strategy-created-new": "Allow to create new devices",
  945 + "provision-strategy-check-pre-provisioned": "Check for pre-provisioned devices",
946 "provision-device-key": "Provision device key", 946 "provision-device-key": "Provision device key",
947 "provision-device-key-required": "Provision device key is required.", 947 "provision-device-key-required": "Provision device key is required.",
948 "provision-device-secret": "Provision device secret", 948 "provision-device-secret": "Provision device secret",