Commit 9e27d453941f9036045c5223c69f91ce85251f27

Authored by Igor Kulikov
2 parents ad479028 8cba4400

Merge branch 'feature/entity-data-query' of github.com:thingsboard/thingsboard i…

…nto feature/entity-data-query
@@ -124,7 +124,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer @@ -124,7 +124,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
124 124
125 @Override 125 @Override
126 public void addSubscription(TbSubscription subscription, TbCallback callback) { 126 public void addSubscription(TbSubscription subscription, TbCallback callback) {
127 - log.trace("[{}][{}][{}] Registering remote subscription for entity [{}]", 127 + log.trace("[{}][{}][{}] Registering subscription for entity [{}]",
128 subscription.getServiceId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()); 128 subscription.getServiceId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId());
129 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId()); 129 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId());
130 if (currentPartitions.contains(tpi)) { 130 if (currentPartitions.contains(tpi)) {
@@ -145,6 +145,9 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -145,6 +145,9 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
145 if (wsCallBackExecutor != null) { 145 if (wsCallBackExecutor != null) {
146 wsCallBackExecutor.shutdownNow(); 146 wsCallBackExecutor.shutdownNow();
147 } 147 }
  148 + if (scheduler != null) {
  149 + scheduler.shutdownNow();
  150 + }
148 } 151 }
149 152
150 @Override 153 @Override
@@ -230,11 +233,12 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -230,11 +233,12 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
230 private void refreshDynamicQuery(TenantId tenantId, CustomerId customerId, TbEntityDataSubCtx finalCtx) { 233 private void refreshDynamicQuery(TenantId tenantId, CustomerId customerId, TbEntityDataSubCtx finalCtx) {
231 try { 234 try {
232 long start = System.currentTimeMillis(); 235 long start = System.currentTimeMillis();
233 - Collection<Integer> oldSubIds = finalCtx.update(entityService.findEntityDataByQuery(tenantId, customerId, finalCtx.getQuery())); 236 + TbEntityDataSubCtx.TbEntityDataSubCtxUpdateResult result = finalCtx.update(entityService.findEntityDataByQuery(tenantId, customerId, finalCtx.getQuery()));
234 long end = System.currentTimeMillis(); 237 long end = System.currentTimeMillis();
235 dynamicQueryInvocationCnt.incrementAndGet(); 238 dynamicQueryInvocationCnt.incrementAndGet();
236 dynamicQueryTimeSpent.addAndGet(end - start); 239 dynamicQueryTimeSpent.addAndGet(end - start);
237 - oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId)); 240 + result.getSubsToCancel().forEach(subId -> localSubscriptionService.cancelSubscription(finalCtx.getSessionId(), subId));
  241 + result.getSubsToAdd().forEach(localSubscriptionService::addSubscription);
238 } catch (Exception e) { 242 } catch (Exception e) {
239 log.warn("[{}][{}] Failed to refresh query", finalCtx.getSessionId(), finalCtx.getCmdId(), e); 243 log.warn("[{}][{}] Failed to refresh query", finalCtx.getSessionId(), finalCtx.getCmdId(), e);
240 } 244 }
@@ -340,21 +344,6 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -340,21 +344,6 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
340 }, wsCallBackExecutor); 344 }, wsCallBackExecutor);
341 } 345 }
342 346
343 - private List<ReadTsKvQuery> getReadTsKvQueries(GetTsCmd cmd) {  
344 - List<ReadTsKvQuery> finalTsKvQueryList;  
345 - List<ReadTsKvQuery> queries = cmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(),  
346 - getLimit(cmd.getLimit()), cmd.getAgg())).collect(Collectors.toList());  
347 - if (cmd.isFetchLatestPreviousPoint()) {  
348 - finalTsKvQueryList = new ArrayList<>(queries);  
349 - finalTsKvQueryList.addAll(cmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(  
350 - key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365), cmd.getStartTs(), cmd.getInterval(), 1, cmd.getAgg()  
351 - )).collect(Collectors.toList()));  
352 - } else {  
353 - finalTsKvQueryList = queries;  
354 - }  
355 - return finalTsKvQueryList;  
356 - }  
357 -  
358 private void handleLatestCmd(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) { 347 private void handleLatestCmd(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) {
359 log.trace("[{}][{}] Going to process latest command: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd); 348 log.trace("[{}][{}] Going to process latest command: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd);
360 //Fetch the latest values for telemetry keys (in case they are not copied from NoSQL to SQL DB in hybrid mode. 349 //Fetch the latest values for telemetry keys (in case they are not copied from NoSQL to SQL DB in hybrid mode.
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.service.subscription; 16 package org.thingsboard.server.service.subscription;
17 17
  18 +import lombok.AllArgsConstructor;
18 import lombok.Data; 19 import lombok.Data;
19 import lombok.extern.slf4j.Slf4j; 20 import lombok.extern.slf4j.Slf4j;
20 import org.thingsboard.server.common.data.id.CustomerId; 21 import org.thingsboard.server.common.data.id.CustomerId;
@@ -62,7 +63,6 @@ public class TbEntityDataSubCtx { @@ -62,7 +63,6 @@ public class TbEntityDataSubCtx {
62 private TimeSeriesCmd tsCmd; 63 private TimeSeriesCmd tsCmd;
63 private PageData<EntityData> data; 64 private PageData<EntityData> data;
64 private boolean initialDataSent; 65 private boolean initialDataSent;
65 - private List<TbSubscription> tbSubs;  
66 private Map<Integer, EntityId> subToEntityIdMap; 66 private Map<Integer, EntityId> subToEntityIdMap;
67 private volatile ScheduledFuture<?> refreshTask; 67 private volatile ScheduledFuture<?> refreshTask;
68 private TimeSeriesCmd curTsCmd; 68 private TimeSeriesCmd curTsCmd;
@@ -93,10 +93,10 @@ public class TbEntityDataSubCtx { @@ -93,10 +93,10 @@ public class TbEntityDataSubCtx {
93 93
94 public List<TbSubscription> createSubscriptions(List<EntityKey> keys, boolean resultToLatestValues) { 94 public List<TbSubscription> createSubscriptions(List<EntityKey> keys, boolean resultToLatestValues) {
95 this.subToEntityIdMap = new HashMap<>(); 95 this.subToEntityIdMap = new HashMap<>();
96 - tbSubs = new ArrayList<>(); 96 + List<TbSubscription> tbSubs = new ArrayList<>();
97 Map<EntityKeyType, List<EntityKey>> keysByType = getEntityKeyByTypeMap(keys); 97 Map<EntityKeyType, List<EntityKey>> keysByType = getEntityKeyByTypeMap(keys);
98 for (EntityData entityData : data.getData()) { 98 for (EntityData entityData : data.getData()) {
99 - addSubscription(entityData, keysByType, resultToLatestValues); 99 + tbSubs.addAll(addSubscriptions(entityData, keysByType, resultToLatestValues));
100 } 100 }
101 return tbSubs; 101 return tbSubs;
102 } 102 }
@@ -107,33 +107,35 @@ public class TbEntityDataSubCtx { @@ -107,33 +107,35 @@ public class TbEntityDataSubCtx {
107 return keysByType; 107 return keysByType;
108 } 108 }
109 109
110 - private void addSubscription(EntityData entityData, Map<EntityKeyType, List<EntityKey>> keysByType, boolean resultToLatestValues) { 110 + private List<TbSubscription> addSubscriptions(EntityData entityData, Map<EntityKeyType, List<EntityKey>> keysByType, boolean resultToLatestValues) {
  111 + List<TbSubscription> subscriptionList = new ArrayList<>();
111 keysByType.forEach((keysType, keysList) -> { 112 keysByType.forEach((keysType, keysList) -> {
112 int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet(); 113 int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet();
113 subToEntityIdMap.put(subIdx, entityData.getEntityId()); 114 subToEntityIdMap.put(subIdx, entityData.getEntityId());
114 switch (keysType) { 115 switch (keysType) {
115 case TIME_SERIES: 116 case TIME_SERIES:
116 - tbSubs.add(createTsSub(entityData, subIdx, keysList, resultToLatestValues)); 117 + subscriptionList.add(createTsSub(entityData, subIdx, keysList, resultToLatestValues));
117 break; 118 break;
118 case CLIENT_ATTRIBUTE: 119 case CLIENT_ATTRIBUTE:
119 - tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.CLIENT_SCOPE, keysList)); 120 + subscriptionList.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.CLIENT_SCOPE, keysList));
120 break; 121 break;
121 case SHARED_ATTRIBUTE: 122 case SHARED_ATTRIBUTE:
122 - tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SHARED_SCOPE, keysList)); 123 + subscriptionList.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SHARED_SCOPE, keysList));
123 break; 124 break;
124 case SERVER_ATTRIBUTE: 125 case SERVER_ATTRIBUTE:
125 - tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SERVER_SCOPE, keysList)); 126 + subscriptionList.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SERVER_SCOPE, keysList));
126 break; 127 break;
127 case ATTRIBUTE: 128 case ATTRIBUTE:
128 - tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.ANY_SCOPE, keysList)); 129 + subscriptionList.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.ANY_SCOPE, keysList));
129 break; 130 break;
130 } 131 }
131 }); 132 });
  133 + return subscriptionList;
132 } 134 }
133 135
134 private TbSubscription createAttrSub(EntityData entityData, int subIdx, EntityKeyType keysType, TbAttributeSubscriptionScope scope, List<EntityKey> subKeys) { 136 private TbSubscription createAttrSub(EntityData entityData, int subIdx, EntityKeyType keysType, TbAttributeSubscriptionScope scope, List<EntityKey> subKeys) {
135 Map<String, Long> keyStates = buildKeyStats(entityData, keysType, subKeys); 137 Map<String, Long> keyStates = buildKeyStats(entityData, keysType, subKeys);
136 - log.trace("[{}][{}][{}] Creating attributes subscription with keys: {}", serviceId, cmdId, subIdx, keyStates); 138 + log.trace("[{}][{}][{}] Creating attributes subscription for [{}] with keys: {}", serviceId, cmdId, subIdx, entityData.getEntityId(), keyStates);
137 return TbAttributeSubscription.builder() 139 return TbAttributeSubscription.builder()
138 .serviceId(serviceId) 140 .serviceId(serviceId)
139 .sessionId(sessionRef.getSessionId()) 141 .sessionId(sessionRef.getSessionId())
@@ -156,7 +158,7 @@ public class TbEntityDataSubCtx { @@ -156,7 +158,7 @@ public class TbEntityDataSubCtx {
156 keyStates.put(k, ts); 158 keyStates.put(k, ts);
157 }); 159 });
158 } 160 }
159 - log.trace("[{}][{}][{}] Creating time-series subscription with keys: {}", serviceId, cmdId, subIdx, keyStates); 161 + log.trace("[{}][{}][{}] Creating time-series subscription for [{}] with keys: {}", serviceId, cmdId, subIdx, entityData.getEntityId(), keyStates);
160 return TbTimeseriesSubscription.builder() 162 return TbTimeseriesSubscription.builder()
161 .serviceId(serviceId) 163 .serviceId(serviceId)
162 .sessionId(sessionRef.getSessionId()) 164 .sessionId(sessionRef.getSessionId())
@@ -304,7 +306,7 @@ public class TbEntityDataSubCtx { @@ -304,7 +306,7 @@ public class TbEntityDataSubCtx {
304 } 306 }
305 } 307 }
306 308
307 - public Collection<Integer> update(PageData<EntityData> newData) { 309 + public TbEntityDataSubCtxUpdateResult update(PageData<EntityData> newData) {
308 Map<EntityId, EntityData> oldDataMap; 310 Map<EntityId, EntityData> oldDataMap;
309 if (data != null && !data.getData().isEmpty()) { 311 if (data != null && !data.getData().isEmpty()) {
310 oldDataMap = data.getData().stream().collect(Collectors.toMap(EntityData::getEntityId, Function.identity())); 312 oldDataMap = data.getData().stream().collect(Collectors.toMap(EntityData::getEntityId, Function.identity()));
@@ -314,20 +316,21 @@ public class TbEntityDataSubCtx { @@ -314,20 +316,21 @@ public class TbEntityDataSubCtx {
314 Map<EntityId, EntityData> newDataMap = newData.getData().stream().collect(Collectors.toMap(EntityData::getEntityId, Function.identity())); 316 Map<EntityId, EntityData> newDataMap = newData.getData().stream().collect(Collectors.toMap(EntityData::getEntityId, Function.identity()));
315 if (oldDataMap.size() == newDataMap.size() && oldDataMap.keySet().equals(newDataMap.keySet())) { 317 if (oldDataMap.size() == newDataMap.size() && oldDataMap.keySet().equals(newDataMap.keySet())) {
316 log.trace("[{}][{}] No updates to entity data found", sessionRef.getSessionId(), cmdId); 318 log.trace("[{}][{}] No updates to entity data found", sessionRef.getSessionId(), cmdId);
317 - return Collections.emptyList(); 319 + return TbEntityDataSubCtxUpdateResult.EMPTY;
318 } else { 320 } else {
319 this.data = newData; 321 this.data = newData;
320 - List<Integer> subIdsToRemove = new ArrayList<>(); 322 + List<Integer> subIdsToCancel = new ArrayList<>();
  323 + List<TbSubscription> subsToAdd = new ArrayList<>();
321 Set<EntityId> currentSubs = new HashSet<>(); 324 Set<EntityId> currentSubs = new HashSet<>();
322 subToEntityIdMap.forEach((subId, entityId) -> { 325 subToEntityIdMap.forEach((subId, entityId) -> {
323 if (!newDataMap.containsKey(entityId)) { 326 if (!newDataMap.containsKey(entityId)) {
324 - subIdsToRemove.add(subId); 327 + subIdsToCancel.add(subId);
325 } else { 328 } else {
326 currentSubs.add(entityId); 329 currentSubs.add(entityId);
327 } 330 }
328 }); 331 });
329 - log.trace("[{}][{}] Subscriptions that are invalid: {}", sessionRef.getSessionId(), cmdId, subIdsToRemove);  
330 - subIdsToRemove.forEach(subToEntityIdMap::remove); 332 + log.trace("[{}][{}] Subscriptions that are invalid: {}", sessionRef.getSessionId(), cmdId, subIdsToCancel);
  333 + subIdsToCancel.forEach(subToEntityIdMap::remove);
331 List<EntityData> newSubsList = newDataMap.entrySet().stream().filter(entry -> !currentSubs.contains(entry.getKey())).map(Map.Entry::getValue).collect(Collectors.toList()); 334 List<EntityData> newSubsList = newDataMap.entrySet().stream().filter(entry -> !currentSubs.contains(entry.getKey())).map(Map.Entry::getValue).collect(Collectors.toList());
332 if (!newSubsList.isEmpty()) { 335 if (!newSubsList.isEmpty()) {
333 boolean resultToLatestValues; 336 boolean resultToLatestValues;
@@ -346,13 +349,13 @@ public class TbEntityDataSubCtx { @@ -346,13 +349,13 @@ public class TbEntityDataSubCtx {
346 newSubsList.forEach( 349 newSubsList.forEach(
347 entity -> { 350 entity -> {
348 log.trace("[{}][{}] Found new subscription for entity: {}", sessionRef.getSessionId(), cmdId, entity.getEntityId()); 351 log.trace("[{}][{}] Found new subscription for entity: {}", sessionRef.getSessionId(), cmdId, entity.getEntityId());
349 - addSubscription(entity, keysByType, resultToLatestValues); 352 + subsToAdd.addAll(addSubscriptions(entity, keysByType, resultToLatestValues));
350 } 353 }
351 ); 354 );
352 } 355 }
353 } 356 }
354 wsService.sendWsMsg(sessionRef.getSessionId(), new EntityDataUpdate(cmdId, data, null)); 357 wsService.sendWsMsg(sessionRef.getSessionId(), new EntityDataUpdate(cmdId, data, null));
355 - return subIdsToRemove; 358 + return new TbEntityDataSubCtxUpdateResult(subIdsToCancel, subsToAdd);
356 } 359 }
357 } 360 }
358 361
@@ -360,4 +363,14 @@ public class TbEntityDataSubCtx { @@ -360,4 +363,14 @@ public class TbEntityDataSubCtx {
360 curTsCmd = cmd.getTsCmd(); 363 curTsCmd = cmd.getTsCmd();
361 latestValueCmd = cmd.getLatestCmd(); 364 latestValueCmd = cmd.getLatestCmd();
362 } 365 }
  366 +
  367 + @Data
  368 + @AllArgsConstructor
  369 + public static class TbEntityDataSubCtxUpdateResult {
  370 +
  371 + private static TbEntityDataSubCtxUpdateResult EMPTY = new TbEntityDataSubCtxUpdateResult(Collections.emptyList(), Collections.emptyList());
  372 +
  373 + private List<Integer> subsToCancel;
  374 + private List<TbSubscription> subsToAdd;
  375 + }
363 } 376 }