Commit 10dd5c352f107cda96d2b85691724372fffdded1

Authored by Andrii Shvaika
1 parent e8e2f211

Max entities for data subscription over WS

@@ -128,6 +128,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -128,6 +128,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
128 private long dynamicPageLinkRefreshInterval; 128 private long dynamicPageLinkRefreshInterval;
129 @Value("${server.ws.dynamic_page_link.refresh_pool_size:1}") 129 @Value("${server.ws.dynamic_page_link.refresh_pool_size:1}")
130 private int dynamicPageLinkRefreshPoolSize; 130 private int dynamicPageLinkRefreshPoolSize;
  131 + @Value("${server.ws.max_entities_per_data_subscription:1000}")
  132 + private int maxEntitiesPerDataSubscription;
131 @Value("${server.ws.max_entities_per_alarm_subscription:1000}") 133 @Value("${server.ws.max_entities_per_alarm_subscription:1000}")
132 private int maxEntitiesPerAlarmSubscription; 134 private int maxEntitiesPerAlarmSubscription;
133 135
@@ -220,7 +222,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -220,7 +222,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
220 } else if (cmd.getTsCmd() != null) { 222 } else if (cmd.getTsCmd() != null) {
221 handleTimeSeriesCmd(theCtx, cmd.getTsCmd()); 223 handleTimeSeriesCmd(theCtx, cmd.getTsCmd());
222 } else if (!theCtx.isInitialDataSent()) { 224 } else if (!theCtx.isInitialDataSent()) {
223 - EntityDataUpdate update = new EntityDataUpdate(theCtx.getCmdId(), theCtx.getData(), null); 225 + EntityDataUpdate update = new EntityDataUpdate(theCtx.getCmdId(), theCtx.getData(), null, theCtx.getMaxEntitiesPerDataSubscription());
224 wsService.sendWsMsg(theCtx.getSessionId(), update); 226 wsService.sendWsMsg(theCtx.getSessionId(), update);
225 theCtx.setInitialDataSent(true); 227 theCtx.setInitialDataSent(true);
226 } 228 }
@@ -298,7 +300,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -298,7 +300,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
298 300
299 private TbEntityDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, EntityDataCmd cmd) { 301 private TbEntityDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, EntityDataCmd cmd) {
300 Map<Integer, TbAbstractDataSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>()); 302 Map<Integer, TbAbstractDataSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>());
301 - TbEntityDataSubCtx ctx = new TbEntityDataSubCtx(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, sessionRef, cmd.getCmdId()); 303 + TbEntityDataSubCtx ctx = new TbEntityDataSubCtx(serviceId, wsService, entityService, localSubscriptionService,
  304 + attributesService, stats, sessionRef, cmd.getCmdId(), maxEntitiesPerDataSubscription);
302 ctx.setAndResolveQuery(cmd.getQuery()); 305 ctx.setAndResolveQuery(cmd.getQuery());
303 sessionSubs.put(cmd.getCmdId(), ctx); 306 sessionSubs.put(cmd.getCmdId(), ctx);
304 return ctx; 307 return ctx;
@@ -306,7 +309,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -306,7 +309,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
306 309
307 private TbAlarmDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, AlarmDataCmd cmd) { 310 private TbAlarmDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, AlarmDataCmd cmd) {
308 Map<Integer, TbAbstractDataSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>()); 311 Map<Integer, TbAbstractDataSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>());
309 - TbAlarmDataSubCtx ctx = new TbAlarmDataSubCtx(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, alarmService, sessionRef, cmd.getCmdId(), maxEntitiesPerAlarmSubscription); 312 + TbAlarmDataSubCtx ctx = new TbAlarmDataSubCtx(serviceId, wsService, entityService, localSubscriptionService,
  313 + attributesService, stats, alarmService, sessionRef, cmd.getCmdId(), maxEntitiesPerAlarmSubscription);
310 ctx.setAndResolveQuery(cmd.getQuery()); 314 ctx.setAndResolveQuery(cmd.getQuery());
311 sessionSubs.put(cmd.getCmdId(), ctx); 315 sessionSubs.put(cmd.getCmdId(), ctx);
312 return ctx; 316 return ctx;
@@ -372,10 +376,10 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -372,10 +376,10 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
372 }); 376 });
373 EntityDataUpdate update; 377 EntityDataUpdate update;
374 if (!ctx.isInitialDataSent()) { 378 if (!ctx.isInitialDataSent()) {
375 - update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null); 379 + update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription());
376 ctx.setInitialDataSent(true); 380 ctx.setInitialDataSent(true);
377 } else { 381 } else {
378 - update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData()); 382 + update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData(), ctx.getMaxEntitiesPerDataSubscription());
379 } 383 }
380 wsService.sendWsMsg(ctx.getSessionId(), update); 384 wsService.sendWsMsg(ctx.getSessionId(), update);
381 if (subscribe) { 385 if (subscribe) {
@@ -422,10 +426,10 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -422,10 +426,10 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
422 }); 426 });
423 EntityDataUpdate update; 427 EntityDataUpdate update;
424 if (!ctx.isInitialDataSent()) { 428 if (!ctx.isInitialDataSent()) {
425 - update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null); 429 + update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription());
426 ctx.setInitialDataSent(true); 430 ctx.setInitialDataSent(true);
427 } else { 431 } else {
428 - update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData()); 432 + update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData(), ctx.getMaxEntitiesPerDataSubscription());
429 } 433 }
430 wsService.sendWsMsg(ctx.getSessionId(), update); 434 wsService.sendWsMsg(ctx.getSessionId(), update);
431 ctx.createSubscriptions(latestCmd.getKeys(), true); 435 ctx.createSubscriptions(latestCmd.getKeys(), true);
@@ -440,7 +444,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -440,7 +444,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
440 }, wsCallBackExecutor); 444 }, wsCallBackExecutor);
441 } else { 445 } else {
442 if (!ctx.isInitialDataSent()) { 446 if (!ctx.isInitialDataSent()) {
443 - EntityDataUpdate update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null); 447 + EntityDataUpdate update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription());
444 wsService.sendWsMsg(ctx.getSessionId(), update); 448 wsService.sendWsMsg(ctx.getSessionId(), update);
445 ctx.setInitialDataSent(true); 449 ctx.setInitialDataSent(true);
446 } 450 }
@@ -57,6 +57,7 @@ import java.util.List; @@ -57,6 +57,7 @@ import java.util.List;
57 import java.util.Map; 57 import java.util.Map;
58 import java.util.Optional; 58 import java.util.Optional;
59 import java.util.Set; 59 import java.util.Set;
  60 +import java.util.concurrent.ConcurrentHashMap;
60 import java.util.concurrent.ExecutionException; 61 import java.util.concurrent.ExecutionException;
61 import java.util.concurrent.ScheduledFuture; 62 import java.util.concurrent.ScheduledFuture;
62 import java.util.function.Function; 63 import java.util.function.Function;
@@ -98,9 +99,9 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends @@ -98,9 +99,9 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
98 this.stats = stats; 99 this.stats = stats;
99 this.sessionRef = sessionRef; 100 this.sessionRef = sessionRef;
100 this.cmdId = cmdId; 101 this.cmdId = cmdId;
101 - this.subToEntityIdMap = new HashMap<>();  
102 - this.subToDynamicValueKeySet = new HashSet<>();  
103 - this.dynamicValues = new HashMap<>(); 102 + this.subToEntityIdMap = new ConcurrentHashMap<>();
  103 + this.subToDynamicValueKeySet = ConcurrentHashMap.newKeySet();
  104 + this.dynamicValues = new ConcurrentHashMap<>();
104 } 105 }
105 106
106 public void setAndResolveQuery(T query) { 107 public void setAndResolveQuery(T query) {
@@ -15,13 +15,10 @@ @@ -15,13 +15,10 @@
15 */ 15 */
16 package org.thingsboard.server.service.subscription; 16 package org.thingsboard.server.service.subscription;
17 17
18 -import lombok.AllArgsConstructor;  
19 -import lombok.Data;  
20 import lombok.Getter; 18 import lombok.Getter;
21 import lombok.Setter; 19 import lombok.Setter;
22 import lombok.extern.slf4j.Slf4j; 20 import lombok.extern.slf4j.Slf4j;
23 import org.thingsboard.server.common.data.id.EntityId; 21 import org.thingsboard.server.common.data.id.EntityId;
24 -import org.thingsboard.server.common.data.page.PageData;  
25 import org.thingsboard.server.common.data.query.EntityData; 22 import org.thingsboard.server.common.data.query.EntityData;
26 import org.thingsboard.server.common.data.query.EntityDataQuery; 23 import org.thingsboard.server.common.data.query.EntityDataQuery;
27 import org.thingsboard.server.common.data.query.EntityKey; 24 import org.thingsboard.server.common.data.query.EntityKey;
@@ -38,8 +35,6 @@ import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd; @@ -38,8 +35,6 @@ import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd;
38 import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate; 35 import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
39 36
40 import java.util.ArrayList; 37 import java.util.ArrayList;
41 -import java.util.Arrays;  
42 -import java.util.Collection;  
43 import java.util.Collections; 38 import java.util.Collections;
44 import java.util.Comparator; 39 import java.util.Comparator;
45 import java.util.HashMap; 40 import java.util.HashMap;
@@ -48,8 +43,6 @@ import java.util.List; @@ -48,8 +43,6 @@ import java.util.List;
48 import java.util.Map; 43 import java.util.Map;
49 import java.util.Optional; 44 import java.util.Optional;
50 import java.util.Set; 45 import java.util.Set;
51 -import java.util.concurrent.ScheduledFuture;  
52 -import java.util.function.Function;  
53 import java.util.stream.Collectors; 46 import java.util.stream.Collectors;
54 47
55 @Slf4j 48 @Slf4j
@@ -63,11 +56,14 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> { @@ -63,11 +56,14 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
63 private boolean initialDataSent; 56 private boolean initialDataSent;
64 private TimeSeriesCmd curTsCmd; 57 private TimeSeriesCmd curTsCmd;
65 private LatestValueCmd latestValueCmd; 58 private LatestValueCmd latestValueCmd;
  59 + @Getter
  60 + private final int maxEntitiesPerDataSubscription;
66 61
67 public TbEntityDataSubCtx(String serviceId, TelemetryWebSocketService wsService, EntityService entityService, 62 public TbEntityDataSubCtx(String serviceId, TelemetryWebSocketService wsService, EntityService entityService,
68 TbLocalSubscriptionService localSubscriptionService, AttributesService attributesService, 63 TbLocalSubscriptionService localSubscriptionService, AttributesService attributesService,
69 - SubscriptionServiceStatistics stats, TelemetryWebSocketSessionRef sessionRef, int cmdId) { 64 + SubscriptionServiceStatistics stats, TelemetryWebSocketSessionRef sessionRef, int cmdId, int maxEntitiesPerDataSubscription) {
70 super(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, sessionRef, cmdId); 65 super(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, sessionRef, cmdId);
  66 + this.maxEntitiesPerDataSubscription = maxEntitiesPerDataSubscription;
71 } 67 }
72 68
73 @Override 69 @Override
@@ -120,7 +116,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> { @@ -120,7 +116,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
120 if (!latestUpdate.isEmpty()) { 116 if (!latestUpdate.isEmpty()) {
121 Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(keyType, latestUpdate); 117 Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(keyType, latestUpdate);
122 entityData = new EntityData(entityId, latestMap, null); 118 entityData = new EntityData(entityId, latestMap, null);
123 - wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData))); 119 + wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData), maxEntitiesPerDataSubscription));
124 } 120 }
125 } 121 }
126 122
@@ -163,7 +159,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> { @@ -163,7 +159,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
163 Map<String, TsValue[]> tsMap = new HashMap<>(); 159 Map<String, TsValue[]> tsMap = new HashMap<>();
164 tsUpdate.forEach((key, tsValue) -> tsMap.put(key, tsValue.toArray(new TsValue[tsValue.size()]))); 160 tsUpdate.forEach((key, tsValue) -> tsMap.put(key, tsValue.toArray(new TsValue[tsValue.size()])));
165 entityData = new EntityData(entityId, null, tsMap); 161 entityData = new EntityData(entityId, null, tsMap);
166 - wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData))); 162 + wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData), maxEntitiesPerDataSubscription));
167 } 163 }
168 } 164 }
169 165
@@ -207,7 +203,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> { @@ -207,7 +203,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
207 ); 203 );
208 } 204 }
209 } 205 }
210 - wsService.sendWsMsg(sessionRef.getSessionId(), new EntityDataUpdate(cmdId, data, null)); 206 + wsService.sendWsMsg(sessionRef.getSessionId(), new EntityDataUpdate(cmdId, data, null, maxEntitiesPerDataSubscription));
211 subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId)); 207 subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId));
212 subsToAdd.forEach(localSubscriptionService::addSubscription); 208 subsToAdd.forEach(localSubscriptionService::addSubscription);
213 } 209 }
@@ -18,6 +18,7 @@ package org.thingsboard.server.service.telemetry.cmd.v2; @@ -18,6 +18,7 @@ package org.thingsboard.server.service.telemetry.cmd.v2;
18 import com.fasterxml.jackson.annotation.JsonCreator; 18 import com.fasterxml.jackson.annotation.JsonCreator;
19 import com.fasterxml.jackson.annotation.JsonIgnoreProperties; 19 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
20 import com.fasterxml.jackson.annotation.JsonProperty; 20 import com.fasterxml.jackson.annotation.JsonProperty;
  21 +import lombok.Getter;
21 import org.thingsboard.server.common.data.page.PageData; 22 import org.thingsboard.server.common.data.page.PageData;
22 import org.thingsboard.server.common.data.query.EntityData; 23 import org.thingsboard.server.common.data.query.EntityData;
23 import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode; 24 import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
@@ -27,8 +28,12 @@ import java.util.List; @@ -27,8 +28,12 @@ import java.util.List;
27 28
28 public class EntityDataUpdate extends DataUpdate<EntityData> { 29 public class EntityDataUpdate extends DataUpdate<EntityData> {
29 30
30 - public EntityDataUpdate(int cmdId, PageData<EntityData> data, List<EntityData> update) { 31 + @Getter
  32 + private long allowedEntities;
  33 +
  34 + public EntityDataUpdate(int cmdId, PageData<EntityData> data, List<EntityData> update, long allowedEntities) {
31 super(cmdId, data, update, SubscriptionErrorCode.NO_ERROR.getCode(), null); 35 super(cmdId, data, update, SubscriptionErrorCode.NO_ERROR.getCode(), null);
  36 + this.allowedEntities = allowedEntities;
32 } 37 }
33 38
34 public EntityDataUpdate(int cmdId, int errorCode, String errorMsg) { 39 public EntityDataUpdate(int cmdId, int errorCode, String errorMsg) {
@@ -50,7 +50,8 @@ server: @@ -50,7 +50,8 @@ server:
50 refresh_interval: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_INTERVAL_SEC:60}" 50 refresh_interval: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_INTERVAL_SEC:60}"
51 refresh_pool_size: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_POOL_SIZE:1}" 51 refresh_pool_size: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_POOL_SIZE:1}"
52 max_per_user: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_MAX_PER_USER:10}" 52 max_per_user: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_MAX_PER_USER:10}"
53 - max_entities_per_alarm_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_ALARM_SUBSCRIPTION:1000}" 53 + max_entities_per_data_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_DATA_SUBSCRIPTION:10000}"
  54 + max_entities_per_alarm_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_ALARM_SUBSCRIPTION:10000}"
54 rest: 55 rest:
55 limits: 56 limits:
56 tenant: 57 tenant: