...
|
...
|
@@ -19,17 +19,16 @@ import com.google.common.util.concurrent.FutureCallback; |
19
|
19
|
import com.google.common.util.concurrent.Futures;
|
20
|
20
|
import com.google.common.util.concurrent.ListenableFuture;
|
21
|
21
|
import com.google.common.util.concurrent.MoreExecutors;
|
|
22
|
+import lombok.Getter;
|
22
|
23
|
import lombok.extern.slf4j.Slf4j;
|
23
|
24
|
import org.checkerframework.checker.nullness.qual.Nullable;
|
24
|
25
|
import org.springframework.beans.factory.annotation.Autowired;
|
25
|
26
|
import org.springframework.beans.factory.annotation.Value;
|
26
|
27
|
import org.springframework.context.annotation.Lazy;
|
27
|
|
-import org.springframework.context.event.EventListener;
|
|
28
|
+import org.springframework.scheduling.annotation.Scheduled;
|
28
|
29
|
import org.springframework.stereotype.Service;
|
29
|
30
|
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
30
|
|
-import org.thingsboard.server.common.data.EntityView;
|
31
|
31
|
import org.thingsboard.server.common.data.id.CustomerId;
|
32
|
|
-import org.thingsboard.server.common.data.id.EntityViewId;
|
33
|
32
|
import org.thingsboard.server.common.data.id.TenantId;
|
34
|
33
|
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
|
35
|
34
|
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
...
|
...
|
@@ -40,19 +39,12 @@ import org.thingsboard.server.common.data.query.EntityDataQuery; |
40
|
39
|
import org.thingsboard.server.common.data.query.EntityKey;
|
41
|
40
|
import org.thingsboard.server.common.data.query.EntityKeyType;
|
42
|
41
|
import org.thingsboard.server.common.data.query.TsValue;
|
43
|
|
-import org.thingsboard.server.common.msg.queue.ServiceType;
|
44
|
|
-import org.thingsboard.server.common.msg.queue.TbCallback;
|
45
|
|
-import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
46
|
42
|
import org.thingsboard.server.dao.entity.EntityService;
|
47
|
43
|
import org.thingsboard.server.dao.entityview.EntityViewService;
|
48
|
44
|
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
49
|
|
-import org.thingsboard.server.queue.discovery.ClusterTopologyChangeEvent;
|
50
|
|
-import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
|
51
|
|
-import org.thingsboard.server.queue.discovery.PartitionService;
|
52
|
45
|
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
53
|
46
|
import org.thingsboard.server.queue.util.TbCoreComponent;
|
54
|
|
-import org.thingsboard.server.service.queue.TbClusterService;
|
55
|
|
-import org.thingsboard.server.service.security.permission.Operation;
|
|
47
|
+import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
56
|
48
|
import org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService;
|
57
|
49
|
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
|
58
|
50
|
import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
|
...
|
...
|
@@ -60,15 +52,18 @@ import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd; |
60
|
52
|
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUnsubscribeCmd;
|
61
|
53
|
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
|
62
|
54
|
import org.thingsboard.server.service.telemetry.cmd.v2.EntityHistoryCmd;
|
|
55
|
+import org.thingsboard.server.service.telemetry.cmd.v2.GetTsCmd;
|
63
|
56
|
import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd;
|
64
|
57
|
import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd;
|
65
|
58
|
import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
|
66
|
|
-import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
|
67
|
59
|
|
68
|
60
|
import javax.annotation.PostConstruct;
|
69
|
61
|
import javax.annotation.PreDestroy;
|
70
|
62
|
import java.util.ArrayList;
|
|
63
|
+import java.util.Arrays;
|
71
|
64
|
import java.util.Collection;
|
|
65
|
+import java.util.Collections;
|
|
66
|
+import java.util.Comparator;
|
72
|
67
|
import java.util.HashMap;
|
73
|
68
|
import java.util.LinkedHashMap;
|
74
|
69
|
import java.util.LinkedHashSet;
|
...
|
...
|
@@ -79,6 +74,12 @@ import java.util.concurrent.ConcurrentHashMap; |
79
|
74
|
import java.util.concurrent.ExecutionException;
|
80
|
75
|
import java.util.concurrent.ExecutorService;
|
81
|
76
|
import java.util.concurrent.Executors;
|
|
77
|
+import java.util.concurrent.ScheduledExecutorService;
|
|
78
|
+import java.util.concurrent.ScheduledFuture;
|
|
79
|
+import java.util.concurrent.ThreadFactory;
|
|
80
|
+import java.util.concurrent.TimeUnit;
|
|
81
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
82
|
+import java.util.concurrent.atomic.AtomicLong;
|
82
|
83
|
import java.util.stream.Collectors;
|
83
|
84
|
|
84
|
85
|
@Slf4j
|
...
|
...
|
@@ -87,29 +88,15 @@ import java.util.stream.Collectors; |
87
|
88
|
public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubscriptionService {
|
88
|
89
|
|
89
|
90
|
private static final int DEFAULT_LIMIT = 100;
|
90
|
|
- private final Set<TopicPartitionInfo> currentPartitions = ConcurrentHashMap.newKeySet();
|
91
|
91
|
private final Map<String, Map<Integer, TbEntityDataSubCtx>> subscriptionsBySessionId = new ConcurrentHashMap<>();
|
92
|
92
|
|
93
|
93
|
@Autowired
|
94
|
94
|
private TelemetryWebSocketService wsService;
|
95
|
95
|
|
96
|
96
|
@Autowired
|
97
|
|
- private EntityViewService entityViewService;
|
98
|
|
-
|
99
|
|
- @Autowired
|
100
|
97
|
private EntityService entityService;
|
101
|
98
|
|
102
|
99
|
@Autowired
|
103
|
|
- private PartitionService partitionService;
|
104
|
|
-
|
105
|
|
- @Autowired
|
106
|
|
- private TbClusterService clusterService;
|
107
|
|
-
|
108
|
|
- @Autowired
|
109
|
|
- @Lazy
|
110
|
|
- private SubscriptionManagerService subscriptionManagerService;
|
111
|
|
-
|
112
|
|
- @Autowired
|
113
|
100
|
@Lazy
|
114
|
101
|
private TbLocalSubscriptionService localSubscriptionService;
|
115
|
102
|
|
...
|
...
|
@@ -119,18 +106,38 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc |
119
|
106
|
@Autowired
|
120
|
107
|
private TbServiceInfoProvider serviceInfoProvider;
|
121
|
108
|
|
|
109
|
+ @Autowired
|
|
110
|
+ @Getter
|
|
111
|
+ private DbCallbackExecutorService dbCallbackExecutor;
|
|
112
|
+
|
|
113
|
+ private ScheduledExecutorService scheduler;
|
|
114
|
+
|
122
|
115
|
@Value("${database.ts.type}")
|
123
|
116
|
private String databaseTsType;
|
|
117
|
+ @Value("${server.ws.dynamic_page_link_refresh_interval:6}")
|
|
118
|
+ private long dynamicPageLinkRefreshInterval;
|
|
119
|
+ @Value("${server.ws.dynamic_page_link_refresh_pool_size:1}")
|
|
120
|
+ private int dynamicPageLinkRefreshPoolSize;
|
124
|
121
|
|
125
|
122
|
private ExecutorService wsCallBackExecutor;
|
126
|
123
|
private boolean tsInSqlDB;
|
127
|
124
|
private String serviceId;
|
|
125
|
+ private AtomicInteger regularQueryInvocationCnt = new AtomicInteger();
|
|
126
|
+ private AtomicInteger dynamicQueryInvocationCnt = new AtomicInteger();
|
|
127
|
+ private AtomicLong regularQueryTimeSpent = new AtomicLong();
|
|
128
|
+ private AtomicLong dynamicQueryTimeSpent = new AtomicLong();
|
128
|
129
|
|
129
|
130
|
@PostConstruct
|
130
|
131
|
public void initExecutor() {
|
131
|
132
|
serviceId = serviceInfoProvider.getServiceId();
|
132
|
133
|
wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ws-entity-sub-callback"));
|
133
|
134
|
tsInSqlDB = databaseTsType.equalsIgnoreCase("sql") || databaseTsType.equalsIgnoreCase("timescale");
|
|
135
|
+ ThreadFactory tbThreadFactory = ThingsBoardThreadFactory.forName("ws-entity-sub-scheduler");
|
|
136
|
+ if (dynamicPageLinkRefreshPoolSize == 1) {
|
|
137
|
+ scheduler = Executors.newSingleThreadScheduledExecutor(tbThreadFactory);
|
|
138
|
+ } else {
|
|
139
|
+ scheduler = Executors.newScheduledThreadPool(dynamicPageLinkRefreshPoolSize, tbThreadFactory);
|
|
140
|
+ }
|
134
|
141
|
}
|
135
|
142
|
|
136
|
143
|
@PreDestroy
|
...
|
...
|
@@ -141,44 +148,18 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc |
141
|
148
|
}
|
142
|
149
|
|
143
|
150
|
@Override
|
144
|
|
- @EventListener(PartitionChangeEvent.class)
|
145
|
|
- public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
|
146
|
|
- if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) {
|
147
|
|
- currentPartitions.clear();
|
148
|
|
- currentPartitions.addAll(partitionChangeEvent.getPartitions());
|
149
|
|
- }
|
150
|
|
- }
|
151
|
|
-
|
152
|
|
- @Override
|
153
|
|
- @EventListener(ClusterTopologyChangeEvent.class)
|
154
|
|
- public void onApplicationEvent(ClusterTopologyChangeEvent event) {
|
155
|
|
- if (event.getServiceQueueKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals(key.getServiceType()))) {
|
156
|
|
- /*
|
157
|
|
- * If the cluster topology has changed, we need to push all current subscriptions to SubscriptionManagerService again.
|
158
|
|
- * Otherwise, the SubscriptionManagerService may "forget" those subscriptions in case of restart.
|
159
|
|
- * Although this is resource consuming operation, it is cheaper than sending ping/pong commands periodically
|
160
|
|
- * It is also cheaper then caching the subscriptions by entity id and then lookup of those caches every time we have new telemetry in SubscriptionManagerService.
|
161
|
|
- * Even if we cache locally the list of active subscriptions by entity id, it is still time consuming operation to get them from cache
|
162
|
|
- * Since number of subscriptions is usually much less then number of devices that are pushing data.
|
163
|
|
-// */
|
164
|
|
-// subscriptionsBySessionId.values().forEach(map -> map.values()
|
165
|
|
-// .forEach(sub -> pushSubscriptionToManagerService(sub, false)));
|
166
|
|
- }
|
167
|
|
- }
|
168
|
|
-
|
169
|
|
- @Override
|
170
|
151
|
public void handleCmd(TelemetryWebSocketSessionRef session, EntityDataCmd cmd) {
|
171
|
152
|
TbEntityDataSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId());
|
172
|
153
|
if (ctx != null) {
|
173
|
154
|
log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd);
|
174
|
155
|
if (cmd.getLatestCmd() != null || cmd.getTsCmd() != null || cmd.getHistoryCmd() != null) {
|
175
|
|
- Collection<Integer> oldSubIds = ctx.clearSubscriptions();
|
176
|
|
- oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId));
|
|
156
|
+ clearSubs(ctx);
|
177
|
157
|
}
|
178
|
158
|
} else {
|
179
|
159
|
log.debug("[{}][{}] Creating new subscription using: {}", session.getSessionId(), cmd.getCmdId(), cmd);
|
180
|
160
|
ctx = createSubCtx(session, cmd);
|
181
|
161
|
}
|
|
162
|
+ ctx.setCurrentCmd(cmd);
|
182
|
163
|
if (cmd.getQuery() != null) {
|
183
|
164
|
if (ctx.getQuery() == null) {
|
184
|
165
|
log.debug("[{}][{}] Initializing data using query: {}", session.getSessionId(), cmd.getCmdId(), cmd.getQuery());
|
...
|
...
|
@@ -197,13 +178,26 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc |
197
|
178
|
}
|
198
|
179
|
});
|
199
|
180
|
}
|
|
181
|
+ long start = System.currentTimeMillis();
|
200
|
182
|
PageData<EntityData> data = entityService.findEntityDataByQuery(tenantId, customerId, ctx.getQuery());
|
|
183
|
+ long end = System.currentTimeMillis();
|
|
184
|
+ regularQueryInvocationCnt.incrementAndGet();
|
|
185
|
+ regularQueryTimeSpent.addAndGet(end - start);
|
|
186
|
+
|
201
|
187
|
if (log.isTraceEnabled()) {
|
202
|
188
|
data.getData().forEach(ed -> {
|
203
|
189
|
log.trace("[{}][{}] EntityData: {}", session.getSessionId(), cmd.getCmdId(), ed);
|
204
|
190
|
});
|
205
|
191
|
}
|
206
|
192
|
ctx.setData(data);
|
|
193
|
+ ctx.cancelRefreshTask();
|
|
194
|
+ if (ctx.getQuery().getPageLink().isDynamic()) {
|
|
195
|
+ TbEntityDataSubCtx finalCtx = ctx;
|
|
196
|
+ ScheduledFuture<?> task = scheduler.scheduleWithFixedDelay(
|
|
197
|
+ () -> refreshDynamicQuery(tenantId, customerId, finalCtx),
|
|
198
|
+ dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS);
|
|
199
|
+ finalCtx.setRefreshTask(task);
|
|
200
|
+ }
|
207
|
201
|
}
|
208
|
202
|
ListenableFuture<TbEntityDataSubCtx> historyFuture;
|
209
|
203
|
if (cmd.getHistoryCmd() != null) {
|
...
|
...
|
@@ -233,6 +227,35 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc |
233
|
227
|
}, wsCallBackExecutor);
|
234
|
228
|
}
|
235
|
229
|
|
|
230
|
+ private void refreshDynamicQuery(TenantId tenantId, CustomerId customerId, TbEntityDataSubCtx finalCtx) {
|
|
231
|
+ try {
|
|
232
|
+ long start = System.currentTimeMillis();
|
|
233
|
+ Collection<Integer> oldSubIds = finalCtx.update(entityService.findEntityDataByQuery(tenantId, customerId, finalCtx.getQuery()));
|
|
234
|
+ long end = System.currentTimeMillis();
|
|
235
|
+ dynamicQueryInvocationCnt.incrementAndGet();
|
|
236
|
+ dynamicQueryTimeSpent.addAndGet(end - start);
|
|
237
|
+ oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId));
|
|
238
|
+ } catch (Exception e) {
|
|
239
|
+ log.warn("[{}][{}] Failed to refresh query", finalCtx.getSessionId(), finalCtx.getCmdId(), e);
|
|
240
|
+ }
|
|
241
|
+ }
|
|
242
|
+
|
|
243
|
+ @Scheduled(fixedDelayString = "${server.ws.dynamic_page_link_stats:10000}")
|
|
244
|
+ public void printStats() {
|
|
245
|
+ int regularQueryInvocationCntValue = regularQueryInvocationCnt.getAndSet(0);
|
|
246
|
+ long regularQueryInvocationTimeValue = regularQueryTimeSpent.getAndSet(0);
|
|
247
|
+ int dynamicQueryInvocationCntValue = dynamicQueryInvocationCnt.getAndSet(0);
|
|
248
|
+ long dynamicQueryInvocationTimeValue = dynamicQueryTimeSpent.getAndSet(0);
|
|
249
|
+ long dynamicQueryCnt = subscriptionsBySessionId.values().stream().map(Map::values).count();
|
|
250
|
+ log.info("Stats: regularQueryInvocationCnt = [{}], regularQueryInvocationTime = [{}], dynamicQueryCnt = [{}] dynamicQueryInvocationCnt = [{}], dynamicQueryInvocationTime = [{}]",
|
|
251
|
+ regularQueryInvocationCntValue, regularQueryInvocationTimeValue, dynamicQueryCnt, dynamicQueryInvocationCntValue, dynamicQueryInvocationTimeValue);
|
|
252
|
+ }
|
|
253
|
+
|
|
254
|
+ private void clearSubs(TbEntityDataSubCtx ctx) {
|
|
255
|
+ Collection<Integer> oldSubIds = ctx.clearSubscriptions();
|
|
256
|
+ oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId));
|
|
257
|
+ }
|
|
258
|
+
|
236
|
259
|
private TbEntityDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, EntityDataCmd cmd) {
|
237
|
260
|
Map<Integer, TbEntityDataSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>());
|
238
|
261
|
TbEntityDataSubCtx ctx = new TbEntityDataSubCtx(serviceId, wsService, sessionRef, cmd.getCmdId());
|
...
|
...
|
@@ -250,49 +273,86 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc |
250
|
273
|
}
|
251
|
274
|
}
|
252
|
275
|
|
253
|
|
- private void handleTimeSeriesCmd(TbEntityDataSubCtx ctx, TimeSeriesCmd cmd) {
|
254
|
|
- List<String> keys = cmd.getKeys();
|
|
276
|
+ private ListenableFuture<TbEntityDataSubCtx> handleTimeSeriesCmd(TbEntityDataSubCtx ctx, TimeSeriesCmd cmd) {
|
255
|
277
|
log.debug("[{}][{}] Fetching time-series data for last {} ms for keys: ({})", ctx.getSessionId(), ctx.getCmdId(), cmd.getTimeWindow(), cmd.getKeys());
|
256
|
|
- long startTs = cmd.getStartTs();
|
257
|
|
- long endTs = cmd.getStartTs() + cmd.getTimeWindow();
|
258
|
|
-
|
259
|
|
- Map<EntityData, ListenableFuture<Map<String, List<TsValue>>>> tsFutures = new HashMap<>();
|
260
|
|
- for (EntityData entityData : ctx.getData().getData()) {
|
261
|
|
- List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, cmd.getInterval(),
|
262
|
|
- getLimit(cmd.getLimit()), DefaultTelemetryWebSocketService.getAggregation(cmd.getAgg()))).collect(Collectors.toList());
|
263
|
|
- ListenableFuture<List<TsKvEntry>> tsDataFutures = tsService.findAll(ctx.getTenantId(), entityData.getEntityId(), queries);
|
264
|
|
- tsFutures.put(entityData, Futures.transform(tsDataFutures, this::toTsValues, MoreExecutors.directExecutor()));
|
|
278
|
+ return handleGetTsCmd(ctx, cmd, true);
|
|
279
|
+ }
|
|
280
|
+
|
|
281
|
+
|
|
282
|
+ private ListenableFuture<TbEntityDataSubCtx> handleHistoryCmd(TbEntityDataSubCtx ctx, EntityHistoryCmd cmd) {
|
|
283
|
+ log.debug("[{}][{}] Fetching history data for start {} and end {} ms for keys: ({})", ctx.getSessionId(), ctx.getCmdId(), cmd.getStartTs(), cmd.getEndTs(), cmd.getKeys());
|
|
284
|
+ return handleGetTsCmd(ctx, cmd, false);
|
|
285
|
+ }
|
|
286
|
+
|
|
287
|
+ private ListenableFuture<TbEntityDataSubCtx> handleGetTsCmd(TbEntityDataSubCtx ctx, GetTsCmd cmd, boolean subscribe) {
|
|
288
|
+ List<String> keys = cmd.getKeys();
|
|
289
|
+ List<ReadTsKvQuery> finalTsKvQueryList;
|
|
290
|
+ List<ReadTsKvQuery> tsKvQueryList = cmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(
|
|
291
|
+ key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), cmd.getAgg()
|
|
292
|
+ )).collect(Collectors.toList());
|
|
293
|
+ if (cmd.isFetchLatestPreviousPoint()) {
|
|
294
|
+ finalTsKvQueryList = new ArrayList<>(tsKvQueryList);
|
|
295
|
+ tsKvQueryList.addAll(cmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(
|
|
296
|
+ key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365), cmd.getStartTs(), cmd.getInterval(), 1, cmd.getAgg()
|
|
297
|
+ )).collect(Collectors.toList()));
|
|
298
|
+ } else {
|
|
299
|
+ finalTsKvQueryList = tsKvQueryList;
|
265
|
300
|
}
|
266
|
|
- Futures.addCallback(Futures.allAsList(tsFutures.values()), new FutureCallback<List<Map<String, List<TsValue>>>>() {
|
267
|
|
- @Override
|
268
|
|
- public void onSuccess(@Nullable List<Map<String, List<TsValue>>> result) {
|
269
|
|
- tsFutures.forEach((key, value) -> {
|
270
|
|
- try {
|
271
|
|
- value.get().forEach((k, v) -> key.getTimeseries().put(k, v.toArray(new TsValue[v.size()])));
|
272
|
|
- } catch (InterruptedException | ExecutionException e) {
|
273
|
|
- log.warn("[{}][{}] Failed to lookup time-series data: {}:{}", ctx.getSessionId(), ctx.getCmdId(), key.getEntityId(), keys, e);
|
|
301
|
+ Map<EntityData, ListenableFuture<List<TsKvEntry>>> fetchResultMap = new HashMap<>();
|
|
302
|
+ ctx.getData().getData().forEach(entityData -> fetchResultMap.put(entityData,
|
|
303
|
+ tsService.findAll(ctx.getTenantId(), entityData.getEntityId(), finalTsKvQueryList)));
|
|
304
|
+ return Futures.transform(Futures.allAsList(fetchResultMap.values()), f -> {
|
|
305
|
+ fetchResultMap.forEach((entityData, future) -> {
|
|
306
|
+ Map<String, List<TsValue>> keyData = new LinkedHashMap<>();
|
|
307
|
+ cmd.getKeys().forEach(key -> keyData.put(key, new ArrayList<>()));
|
|
308
|
+ try {
|
|
309
|
+ List<TsKvEntry> entityTsData = future.get();
|
|
310
|
+ if (entityTsData != null) {
|
|
311
|
+ entityTsData.forEach(entry -> keyData.get(entry.getKey()).add(new TsValue(entry.getTs(), entry.getValueAsString())));
|
274
|
312
|
}
|
275
|
|
- });
|
276
|
|
- EntityDataUpdate update;
|
277
|
|
- if (!ctx.isInitialDataSent()) {
|
278
|
|
- update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null);
|
279
|
|
- ctx.setInitialDataSent(true);
|
280
|
|
- } else {
|
281
|
|
- update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData());
|
|
313
|
+ keyData.forEach((k, v) -> entityData.getTimeseries().put(k, v.toArray(new TsValue[v.size()])));
|
|
314
|
+ if (cmd.isFetchLatestPreviousPoint()) {
|
|
315
|
+ entityData.getTimeseries().values().forEach(dataArray -> {
|
|
316
|
+ Arrays.sort(dataArray, (o1, o2) -> Long.compare(o2.getTs(), o1.getTs()));
|
|
317
|
+ });
|
|
318
|
+ }
|
|
319
|
+ } catch (InterruptedException | ExecutionException e) {
|
|
320
|
+ log.warn("[{}][{}][{}] Failed to fetch historical data", ctx.getSessionId(), ctx.getCmdId(), entityData.getEntityId(), e);
|
|
321
|
+ wsService.sendWsMsg(ctx.getSessionId(),
|
|
322
|
+ new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!"));
|
282
|
323
|
}
|
283
|
|
- wsService.sendWsMsg(ctx.getSessionId(), update);
|
284
|
|
- createSubscriptions(ctx, keys.stream().map(key -> new EntityKey(EntityKeyType.TIME_SERIES, key)).collect(Collectors.toList()), false);
|
|
324
|
+ });
|
|
325
|
+ EntityDataUpdate update;
|
|
326
|
+ if (!ctx.isInitialDataSent()) {
|
|
327
|
+ update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null);
|
|
328
|
+ ctx.setInitialDataSent(true);
|
|
329
|
+ } else {
|
|
330
|
+ update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData());
|
285
|
331
|
}
|
286
|
|
-
|
287
|
|
- @Override
|
288
|
|
- public void onFailure(Throwable t) {
|
289
|
|
- log.warn("[{}][{}] Failed to process websocket command: {}:{}", ctx.getSessionId(), ctx.getCmdId(), ctx.getQuery(), cmd, t);
|
290
|
|
- wsService.sendWsMsg(ctx.getSessionId(),
|
291
|
|
- new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to process websocket command!"));
|
|
332
|
+ wsService.sendWsMsg(ctx.getSessionId(), update);
|
|
333
|
+ if (subscribe) {
|
|
334
|
+ createSubscriptions(ctx, keys.stream().map(key -> new EntityKey(EntityKeyType.TIME_SERIES, key)).collect(Collectors.toList()), false);
|
292
|
335
|
}
|
|
336
|
+ ctx.getData().getData().forEach(ed -> ed.getTimeseries().clear());
|
|
337
|
+ return ctx;
|
293
|
338
|
}, wsCallBackExecutor);
|
294
|
339
|
}
|
295
|
340
|
|
|
341
|
+ private List<ReadTsKvQuery> getReadTsKvQueries(GetTsCmd cmd) {
|
|
342
|
+ List<ReadTsKvQuery> finalTsKvQueryList;
|
|
343
|
+ List<ReadTsKvQuery> queries = cmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(),
|
|
344
|
+ getLimit(cmd.getLimit()), cmd.getAgg())).collect(Collectors.toList());
|
|
345
|
+ if (cmd.isFetchLatestPreviousPoint()) {
|
|
346
|
+ finalTsKvQueryList = new ArrayList<>(queries);
|
|
347
|
+ finalTsKvQueryList.addAll(cmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(
|
|
348
|
+ key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365), cmd.getStartTs(), cmd.getInterval(), 1, cmd.getAgg()
|
|
349
|
+ )).collect(Collectors.toList()));
|
|
350
|
+ } else {
|
|
351
|
+ finalTsKvQueryList = queries;
|
|
352
|
+ }
|
|
353
|
+ return finalTsKvQueryList;
|
|
354
|
+ }
|
|
355
|
+
|
296
|
356
|
private void handleLatestCmd(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) {
|
297
|
357
|
log.trace("[{}][{}] Going to process latest command: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd);
|
298
|
358
|
//Fetch the latest values for telemetry keys (in case they are not copied from NoSQL to SQL DB in hybrid mode.
|
...
|
...
|
@@ -377,150 +437,24 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc |
377
|
437
|
return results;
|
378
|
438
|
}
|
379
|
439
|
|
380
|
|
- private ListenableFuture<TbEntityDataSubCtx> handleHistoryCmd(TbEntityDataSubCtx ctx, EntityHistoryCmd historyCmd) {
|
381
|
|
- List<ReadTsKvQuery> tsKvQueryList = historyCmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(
|
382
|
|
- key, historyCmd.getStartTs(), historyCmd.getEndTs(), historyCmd.getInterval(), getLimit(historyCmd.getLimit()), historyCmd.getAgg()
|
383
|
|
- )).collect(Collectors.toList());
|
384
|
|
- Map<EntityData, ListenableFuture<List<TsKvEntry>>> fetchResultMap = new HashMap<>();
|
385
|
|
- ctx.getData().getData().forEach(entityData -> fetchResultMap.put(entityData,
|
386
|
|
- tsService.findAll(ctx.getTenantId(), entityData.getEntityId(), tsKvQueryList)));
|
387
|
|
- return Futures.transform(Futures.allAsList(fetchResultMap.values()), f -> {
|
388
|
|
- fetchResultMap.forEach((entityData, future) -> {
|
389
|
|
- Map<String, List<TsValue>> keyData = new LinkedHashMap<>();
|
390
|
|
- historyCmd.getKeys().forEach(key -> keyData.put(key, new ArrayList<>()));
|
391
|
|
- try {
|
392
|
|
- List<TsKvEntry> entityTsData = future.get();
|
393
|
|
- if (entityTsData != null) {
|
394
|
|
- entityTsData.forEach(entry -> keyData.get(entry.getKey()).add(new TsValue(entry.getTs(), entry.getValueAsString())));
|
395
|
|
- }
|
396
|
|
- keyData.forEach((k, v) -> entityData.getTimeseries().put(k, v.toArray(new TsValue[v.size()])));
|
397
|
|
- } catch (InterruptedException | ExecutionException e) {
|
398
|
|
- log.warn("[{}][{}][{}] Failed to fetch historical data", ctx.getSessionId(), ctx.getCmdId(), entityData.getEntityId(), e);
|
399
|
|
- wsService.sendWsMsg(ctx.getSessionId(),
|
400
|
|
- new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!"));
|
401
|
|
- }
|
402
|
|
- });
|
403
|
|
- EntityDataUpdate update;
|
404
|
|
- if (!ctx.isInitialDataSent()) {
|
405
|
|
- update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null);
|
406
|
|
- ctx.setInitialDataSent(true);
|
407
|
|
- } else {
|
408
|
|
- update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData());
|
409
|
|
- }
|
410
|
|
- wsService.sendWsMsg(ctx.getSessionId(), update);
|
411
|
|
- return ctx;
|
412
|
|
- }, wsCallBackExecutor);
|
413
|
|
- }
|
414
|
|
-
|
415
|
440
|
@Override
|
416
|
441
|
public void cancelSubscription(String sessionId, EntityDataUnsubscribeCmd cmd) {
|
417
|
|
- TbEntityDataSubCtx ctx = getSubCtx(sessionId, cmd.getCmdId());
|
|
442
|
+ cleanupAndCancel(getSubCtx(sessionId, cmd.getCmdId()));
|
418
|
443
|
}
|
419
|
444
|
|
420
|
|
-// //TODO 3.1: replace null callbacks with callbacks from websocket service.
|
421
|
|
-// @Override
|
422
|
|
-// public void addSubscription(TbSubscription subscription) {
|
423
|
|
-// EntityId entityId = subscription.getEntityId();
|
424
|
|
-// // Telemetry subscription on Entity Views are handled differently, because we need to allow only certain keys and time ranges;
|
425
|
|
-// if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW) && TbSubscriptionType.TIMESERIES.equals(subscription.getType())) {
|
426
|
|
-// subscription = resolveEntityViewSubscription((TbTimeseriesSubscription) subscription);
|
427
|
|
-// }
|
428
|
|
-// pushSubscriptionToManagerService(subscription, true);
|
429
|
|
-// registerSubscription(subscription);
|
430
|
|
-// }
|
431
|
|
-
|
432
|
|
-// private void pushSubscriptionToManagerService(TbSubscription subscription, boolean pushToLocalService) {
|
433
|
|
-// TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId());
|
434
|
|
-// if (currentPartitions.contains(tpi)) {
|
435
|
|
-// // Subscription is managed on the same server;
|
436
|
|
-// if (pushToLocalService) {
|
437
|
|
-// subscriptionManagerService.addSubscription(subscription, TbCallback.EMPTY);
|
438
|
|
-// }
|
439
|
|
-// } else {
|
440
|
|
-// // Push to the queue;
|
441
|
|
-// TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toNewSubscriptionProto(subscription);
|
442
|
|
-// clusterService.pushMsgToCore(tpi, subscription.getEntityId().getId(), toCoreMsg, null);
|
443
|
|
-// }
|
444
|
|
-// }
|
445
|
|
-
|
446
|
|
- @Override
|
447
|
|
- public void onSubscriptionUpdate(String sessionId, SubscriptionUpdate update, TbCallback callback) {
|
448
|
|
-// TbSubscription subscription = subscriptionsBySessionId
|
449
|
|
-// .getOrDefault(sessionId, Collections.emptyMap()).get(update.getSubscriptionId());
|
450
|
|
-// if (subscription != null) {
|
451
|
|
-// switch (subscription.getType()) {
|
452
|
|
-// case TIMESERIES:
|
453
|
|
-// TbTimeseriesSubscription tsSub = (TbTimeseriesSubscription) subscription;
|
454
|
|
-// update.getLatestValues().forEach((key, value) -> tsSub.getKeyStates().put(key, value));
|
455
|
|
-// break;
|
456
|
|
-// case ATTRIBUTES:
|
457
|
|
-// TbAttributeSubscription attrSub = (TbAttributeSubscription) subscription;
|
458
|
|
-// update.getLatestValues().forEach((key, value) -> attrSub.getKeyStates().put(key, value));
|
459
|
|
-// break;
|
460
|
|
-// }
|
461
|
|
-// wsService.sendWsMsg(sessionId, update);
|
462
|
|
-// }
|
463
|
|
-// callback.onSuccess();
|
|
445
|
+ private void cleanupAndCancel(TbEntityDataSubCtx ctx) {
|
|
446
|
+ if (ctx != null) {
|
|
447
|
+ ctx.cancelRefreshTask();
|
|
448
|
+ clearSubs(ctx);
|
|
449
|
+ }
|
464
|
450
|
}
|
465
|
451
|
|
466
|
|
-// @Override
|
467
|
|
-// public void cancelSubscription(String sessionId, int subscriptionId) {
|
468
|
|
-// log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId);
|
469
|
|
-// Map<Integer, TbSubscription> sessionSubscriptions = subscriptionsBySessionId.get(sessionId);
|
470
|
|
-// if (sessionSubscriptions != null) {
|
471
|
|
-// TbSubscription subscription = sessionSubscriptions.remove(subscriptionId);
|
472
|
|
-// if (subscription != null) {
|
473
|
|
-// if (sessionSubscriptions.isEmpty()) {
|
474
|
|
-// subscriptionsBySessionId.remove(sessionId);
|
475
|
|
-// }
|
476
|
|
-// TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId());
|
477
|
|
-// if (currentPartitions.contains(tpi)) {
|
478
|
|
-// // Subscription is managed on the same server;
|
479
|
|
-// subscriptionManagerService.cancelSubscription(sessionId, subscriptionId, TbCallback.EMPTY);
|
480
|
|
-// } else {
|
481
|
|
-// // Push to the queue;
|
482
|
|
-// TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toCloseSubscriptionProto(subscription);
|
483
|
|
-// clusterService.pushMsgToCore(tpi, subscription.getEntityId().getId(), toCoreMsg, null);
|
484
|
|
-// }
|
485
|
|
-// } else {
|
486
|
|
-// log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId);
|
487
|
|
-// }
|
488
|
|
-// } else {
|
489
|
|
-// log.debug("[{}] No session subscriptions found!", sessionId);
|
490
|
|
-// }
|
491
|
|
-// }
|
492
|
|
-
|
493
|
452
|
@Override
|
494
|
453
|
public void cancelAllSessionSubscriptions(String sessionId) {
|
495
|
|
-// Map<Integer, TbSubscription> subscriptions = subscriptionsBySessionId.get(sessionId);
|
496
|
|
-// if (subscriptions != null) {
|
497
|
|
-// Set<Integer> toRemove = new HashSet<>(subscriptions.keySet());
|
498
|
|
-// toRemove.forEach(id -> cancelSubscription(sessionId, id));
|
499
|
|
-// }
|
500
|
|
- }
|
501
|
|
-
|
502
|
|
- private TbSubscription resolveEntityViewSubscription(TbTimeseriesSubscription subscription) {
|
503
|
|
- EntityView entityView = entityViewService.findEntityViewById(TenantId.SYS_TENANT_ID, new EntityViewId(subscription.getEntityId().getId()));
|
504
|
|
-
|
505
|
|
- Map<String, Long> keyStates;
|
506
|
|
- if (subscription.isAllKeys()) {
|
507
|
|
- keyStates = entityView.getKeys().getTimeseries().stream().collect(Collectors.toMap(k -> k, k -> 0L));
|
508
|
|
- } else {
|
509
|
|
- keyStates = subscription.getKeyStates().entrySet()
|
510
|
|
- .stream().filter(entry -> entityView.getKeys().getTimeseries().contains(entry.getKey()))
|
511
|
|
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
|
454
|
+ Map<Integer, TbEntityDataSubCtx> sessionSubs = subscriptionsBySessionId.remove(sessionId);
|
|
455
|
+ if (sessionSubs != null) {
|
|
456
|
+ sessionSubs.values().forEach(this::cleanupAndCancel);
|
512
|
457
|
}
|
513
|
|
-
|
514
|
|
- return TbTimeseriesSubscription.builder()
|
515
|
|
- .serviceId(subscription.getServiceId())
|
516
|
|
- .sessionId(subscription.getSessionId())
|
517
|
|
- .subscriptionId(subscription.getSubscriptionId())
|
518
|
|
- .tenantId(subscription.getTenantId())
|
519
|
|
- .entityId(entityView.getEntityId())
|
520
|
|
- .startTime(entityView.getStartTimeMs())
|
521
|
|
- .endTime(entityView.getEndTimeMs())
|
522
|
|
- .allKeys(false)
|
523
|
|
- .keyStates(keyStates).build();
|
524
|
458
|
}
|
525
|
459
|
|
526
|
460
|
private int getLimit(int limit) {
|
...
|
...
|
|