Commit 1e1b58f072372334b1c947c33f8c5ff0a63e29b1

Authored by Andrii Shvaika
1 parent db00c2ee

TMP commit

... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -84,7 +84,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
84 84
85 85 private static final int DEFAULT_LIMIT = 100;
86 86 private final Set<TopicPartitionInfo> currentPartitions = ConcurrentHashMap.newKeySet();
87   - private final Map<String, Map<Integer, TbSubscription>> subscriptionsBySessionId = new ConcurrentHashMap<>();
  87 + private final Map<String, Map<Integer, TbEntityDataSubCtx>> subscriptionsBySessionId = new ConcurrentHashMap<>();
88 88
89 89 @Autowired
90 90 private TelemetryWebSocketService wsService;
... ... @@ -155,39 +155,87 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
155 155
156 156 @Override
157 157 public void handleCmd(TelemetryWebSocketSessionRef session, EntityDataCmd cmd) {
  158 + TbEntityDataSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId());
  159 + if (ctx != null) {
  160 + log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd);
  161 + //TODO: cleanup old subscription;
  162 + } else {
  163 + log.debug("[{}][{}] Creating new subscription using: {}", session.getSessionId(), cmd.getCmdId(), cmd);
  164 + ctx = createSubCtx(session, cmd);
  165 + }
  166 + if (cmd.getQuery() != null) {
  167 + if (ctx.getQuery() == null) {
  168 + log.debug("[{}][{}] Initializing data using query: {}", session.getSessionId(), cmd.getCmdId(), cmd.getQuery());
  169 + } else {
  170 + log.debug("[{}][{}] Updating data using query: {}", session.getSessionId(), cmd.getCmdId(), cmd.getQuery());
  171 + }
  172 + ctx.setQuery(cmd.getQuery());
  173 + TenantId tenantId = ctx.getTenantId();
  174 + CustomerId customerId = ctx.getCustomerId();
  175 + EntityDataQuery query = ctx.getQuery();
  176 + //Step 1. Update existing query with the contents of LatestValueCmd
  177 + if (cmd.getLatestCmd() != null) {
  178 + cmd.getLatestCmd().getKeys().forEach(key -> {
  179 + if (!query.getLatestValues().contains(key)) {
  180 + query.getLatestValues().add(key);
  181 + }
  182 + });
  183 + }
  184 + PageData<EntityData> data = entityService.findEntityDataByQuery(tenantId, customerId, ctx.getQuery());
  185 + ctx.setData(data);
  186 + }
  187 + ListenableFuture<TbEntityDataSubCtx> historyFuture;
158 188 if (cmd.getHistoryCmd() != null) {
159   - handleHistoryCmd(session, cmd.getCmdId(), cmd.getQuery(), cmd.getHistoryCmd());
160   - } else if (cmd.getLatestCmd() != null) {
161   - handleLatestCmd(session, cmd.getCmdId(), cmd.getQuery(), cmd.getLatestCmd());
  189 + historyFuture = handleHistoryCmd(ctx, cmd.getHistoryCmd());
162 190 } else {
163   - handleTimeseriesCmd(session, cmd.getCmdId(), cmd.getQuery(), cmd.getTsCmd());
  191 + historyFuture = Futures.immediateFuture(ctx);
  192 + }
  193 + if (cmd.getLatestCmd() != null) {
  194 + Futures.addCallback(historyFuture, new FutureCallback<TbEntityDataSubCtx>() {
  195 + @Override
  196 + public void onSuccess(@Nullable TbEntityDataSubCtx theCtx) {
  197 + handleLatestCmd(theCtx, cmd.getLatestCmd());
  198 + }
  199 +
  200 + @Override
  201 + public void onFailure(Throwable t) {
  202 + log.warn("[{}][{}] Failed to process command", session.getSessionId(), cmd.getCmdId());
  203 + }
  204 + }, wsCallBackExecutor);
  205 + } else if (cmd.getTsCmd() != null) {
  206 + handleTimeseriesCmd(ctx, cmd.getTsCmd());
164 207 }
165 208 }
166 209
167   - private void handleTimeseriesCmd(TelemetryWebSocketSessionRef session, int cmdId, EntityDataQuery query, TimeSeriesCmd tsCmd) {
  210 + private TbEntityDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, EntityDataCmd cmd) {
  211 + Map<Integer, TbEntityDataSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>());
  212 + TbEntityDataSubCtx ctx = new TbEntityDataSubCtx(sessionRef, cmd.getCmdId());
  213 + ctx.setQuery(cmd.getQuery());
  214 + sessionSubs.put(cmd.getCmdId(), ctx);
  215 + return ctx;
168 216 }
169 217
170   - private void handleLatestCmd(TelemetryWebSocketSessionRef session, int cmdId, EntityDataQuery query, LatestValueCmd latestCmd) {
171   - TenantId tenantId = session.getSecurityCtx().getTenantId();
172   - CustomerId customerId = session.getSecurityCtx().getCustomerId();
173   - //Step 1. Update existing query with the contents of LatestValueCmd
174   - latestCmd.getKeys().forEach(key -> {
175   - if (!query.getLatestValues().contains(key)) {
176   - query.getLatestValues().add(key);
177   - }
178   - });
  218 + private TbEntityDataSubCtx getSubCtx(String sessionId, int cmdId) {
  219 + Map<Integer, TbEntityDataSubCtx> sessionSubs = subscriptionsBySessionId.get(sessionId);
  220 + if (sessionSubs != null) {
  221 + return sessionSubs.get(cmdId);
  222 + } else {
  223 + return null;
  224 + }
  225 + }
179 226
180   - //Step 2. Fetch the initial data
181   - PageData<EntityData> data = entityService.findEntityDataByQuery(tenantId, customerId, query);
  227 + private void handleTimeseriesCmd(TbEntityDataSubCtx ctx, TimeSeriesCmd tsCmd) {
  228 + }
182 229
183   - //Step 3. Fetch the latest values for telemetry keys (in case they are not copied from NoSQL to SQL DB in hybrid mode.
  230 + private void handleLatestCmd(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) {
  231 + //Fetch the latest values for telemetry keys (in case they are not copied from NoSQL to SQL DB in hybrid mode.
184 232 if (!tsInSqlDB) {
185 233 List<String> allTsKeys = latestCmd.getKeys().stream()
186 234 .filter(key -> key.getType().equals(EntityKeyType.TIME_SERIES))
187 235 .map(EntityKey::getKey).collect(Collectors.toList());
188 236
189 237 Map<EntityData, ListenableFuture<Map<String, TsValue>>> missingTelemetryFurutes = new HashMap<>();
190   - for (EntityData entityData : data.getData()) {
  238 + for (EntityData entityData : ctx.getData().getData()) {
191 239 Map<EntityKeyType, Map<String, TsValue>> latestEntityData = entityData.getLatest();
192 240 Map<String, TsValue> tsEntityData = latestEntityData.get(EntityKeyType.TIME_SERIES);
193 241 Set<String> missingTsKeys = new LinkedHashSet<>(allTsKeys);
... ... @@ -198,7 +246,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
198 246 latestEntityData.put(EntityKeyType.TIME_SERIES, tsEntityData);
199 247 }
200 248
201   - ListenableFuture<List<TsKvEntry>> missingTsData = tsService.findLatest(tenantId, entityData.getEntityId(), missingTsKeys);
  249 + ListenableFuture<List<TsKvEntry>> missingTsData = tsService.findLatest(ctx.getTenantId(), entityData.getEntityId(), missingTsKeys);
202 250 missingTelemetryFurutes.put(entityData, Futures.transform(missingTsData, this::toTsValue, MoreExecutors.directExecutor()));
203 251 }
204 252 Futures.addCallback(Futures.allAsList(missingTelemetryFurutes.values()), new FutureCallback<List<Map<String, TsValue>>>() {
... ... @@ -208,24 +256,31 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
208 256 try {
209 257 key.getLatest().get(EntityKeyType.TIME_SERIES).putAll(value.get());
210 258 } catch (InterruptedException | ExecutionException e) {
211   - log.warn("[{}][{}] Failed to lookup latest telemetry: {}:{}", session.getSessionId(), cmdId, key.getEntityId(), allTsKeys, e);
  259 + log.warn("[{}][{}] Failed to lookup latest telemetry: {}:{}", ctx.getSessionId(), ctx.getCmdId(), key.getEntityId(), allTsKeys, e);
212 260 }
213 261 });
214   - EntityDataUpdate update = new EntityDataUpdate(cmdId, data, null);
215   - wsService.sendWsMsg(session.getSessionId(), update);
  262 + EntityDataUpdate update;
  263 + if (!ctx.isInitialDataSent()) {
  264 + update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null);
  265 + } else {
  266 + update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData());
  267 + }
  268 + wsService.sendWsMsg(ctx.getSessionId(), update);
216 269 //TODO: create context for this (session, cmdId) that contains query, latestCmd and update. Subscribe + periodic updates.
217 270 }
218 271
219 272 @Override
220 273 public void onFailure(Throwable t) {
221   - log.warn("[{}][{}] Failed to process websocket command: {}:{}", session.getSessionId(), cmdId, query, latestCmd, t);
222   - wsService.sendWsMsg(session.getSessionId(),
223   - new EntityDataUpdate(cmdId, SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to process websocket command!"));
  274 + log.warn("[{}][{}] Failed to process websocket command: {}:{}", ctx.getSessionId(), ctx.getCmdId(), ctx.getQuery(), latestCmd, t);
  275 + wsService.sendWsMsg(ctx.getSessionId(),
  276 + new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to process websocket command!"));
224 277 }
225 278 }, wsCallBackExecutor);
226 279 } else {
227   - EntityDataUpdate update = new EntityDataUpdate(cmdId, data, null);
228   - wsService.sendWsMsg(session.getSessionId(), update);
  280 + if (!ctx.isInitialDataSent()) {
  281 + EntityDataUpdate update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null);
  282 + wsService.sendWsMsg(ctx.getSessionId(), update);
  283 + }
229 284 //TODO: create context for this (session, cmdId) that contains query, latestCmd and update. Subscribe + periodic updates.
230 285 }
231 286 }
... ... @@ -234,17 +289,14 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
234 289 return data.stream().collect(Collectors.toMap(TsKvEntry::getKey, value -> new TsValue(value.getTs(), value.getValueAsString())));
235 290 }
236 291
237   - private void handleHistoryCmd(TelemetryWebSocketSessionRef session, int cmdId, EntityDataQuery query, EntityHistoryCmd historyCmd) {
238   - TenantId tenantId = session.getSecurityCtx().getTenantId();
239   - CustomerId customerId = session.getSecurityCtx().getCustomerId();
240   - PageData<EntityData> data = entityService.findEntityDataByQuery(tenantId, customerId, query);
  292 + private ListenableFuture<TbEntityDataSubCtx> handleHistoryCmd(TbEntityDataSubCtx ctx, EntityHistoryCmd historyCmd) {
241 293 List<ReadTsKvQuery> tsKvQueryList = historyCmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(
242 294 key, historyCmd.getStartTs(), historyCmd.getEndTs(), historyCmd.getInterval(), getLimit(historyCmd.getLimit()), historyCmd.getAgg()
243 295 )).collect(Collectors.toList());
244 296 Map<EntityData, ListenableFuture<List<TsKvEntry>>> fetchResultMap = new HashMap<>();
245   - data.getData().forEach(entityData -> fetchResultMap.put(entityData,
246   - tsService.findAll(tenantId, entityData.getEntityId(), tsKvQueryList)));
247   - Futures.allAsList(fetchResultMap.values()).addListener(() -> {
  297 + ctx.getData().getData().forEach(entityData -> fetchResultMap.put(entityData,
  298 + tsService.findAll(ctx.getTenantId(), entityData.getEntityId(), tsKvQueryList)));
  299 + return Futures.transform(Futures.allAsList(fetchResultMap.values()), f -> {
248 300 fetchResultMap.forEach((entityData, future) -> {
249 301 Map<String, List<TsValue>> keyData = new LinkedHashMap<>();
250 302 historyCmd.getKeys().forEach(key -> keyData.put(key, new ArrayList<>()));
... ... @@ -255,20 +307,21 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
255 307 }
256 308 keyData.forEach((k, v) -> entityData.getTimeseries().put(k, v.toArray(new TsValue[v.size()])));
257 309 } catch (InterruptedException | ExecutionException e) {
258   - log.warn("[{}][{}][{}] Failed to fetch historical data", session.getSessionId(), cmdId, entityData.getEntityId(), e);
259   - wsService.sendWsMsg(session.getSessionId(),
260   - new EntityDataUpdate(cmdId, SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!"));
  310 + log.warn("[{}][{}][{}] Failed to fetch historical data", ctx.getSessionId(), ctx.getCmdId(), entityData.getEntityId(), e);
  311 + wsService.sendWsMsg(ctx.getSessionId(),
  312 + new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!"));
261 313 }
262 314 });
263   - EntityDataUpdate update = new EntityDataUpdate(cmdId, data, null);
264   - wsService.sendWsMsg(session.getSessionId(), update);
  315 + EntityDataUpdate update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null);
  316 + wsService.sendWsMsg(ctx.getSessionId(), update);
  317 + ctx.setInitialDataSent(true);
  318 + return ctx;
265 319 }, wsCallBackExecutor);
266 320 }
267 321
268   -
269 322 @Override
270   - public void cancelSubscription(String sessionId, EntityDataUnsubscribeCmd subscriptionId) {
271   -
  323 + public void cancelSubscription(String sessionId, EntityDataUnsubscribeCmd cmd) {
  324 + TbEntityDataSubCtx ctx = getSubCtx(sessionId, cmd.getCmdId());
272 325 }
273 326
274 327 // //TODO 3.1: replace null callbacks with callbacks from websocket service.
... ... @@ -377,11 +430,6 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
377 430 .keyStates(keyStates).build();
378 431 }
379 432
380   - private void registerSubscription(TbSubscription subscription) {
381   - Map<Integer, TbSubscription> sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>());
382   - sessionSubscriptions.put(subscription.getSubscriptionId(), subscription);
383   - }
384   -
385 433 private int getLimit(int limit) {
386 434 return limit == 0 ? DEFAULT_LIMIT : limit;
387 435 }
... ...
  1 +package org.thingsboard.server.service.subscription;
  2 +
  3 +import lombok.Data;
  4 +import org.thingsboard.server.common.data.id.CustomerId;
  5 +import org.thingsboard.server.common.data.id.TenantId;
  6 +import org.thingsboard.server.common.data.page.PageData;
  7 +import org.thingsboard.server.common.data.query.EntityData;
  8 +import org.thingsboard.server.common.data.query.EntityDataQuery;
  9 +import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
  10 +import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd;
  11 +import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd;
  12 +
  13 +@Data
  14 +public class TbEntityDataSubCtx {
  15 +
  16 + private final TelemetryWebSocketSessionRef sessionRef;
  17 + private final int cmdId;
  18 + private EntityDataQuery query;
  19 + private LatestValueCmd latestCmd;
  20 + private TimeSeriesCmd tsCmd;
  21 + private PageData<EntityData> data;
  22 + private boolean initialDataSent;
  23 +
  24 + public TbEntityDataSubCtx(TelemetryWebSocketSessionRef sessionRef, int cmdId) {
  25 + this.sessionRef = sessionRef;
  26 + this.cmdId = cmdId;
  27 + }
  28 +
  29 + public String getSessionId() {
  30 + return sessionRef.getSessionId();
  31 + }
  32 +
  33 + public TenantId getTenantId() {
  34 + return sessionRef.getSecurityCtx().getTenantId();
  35 + }
  36 +
  37 + public CustomerId getCustomerId() {
  38 + return sessionRef.getSecurityCtx().getCustomerId();
  39 + }
  40 +
  41 +
  42 + public void setData(PageData<EntityData> data) {
  43 + this.data = data;
  44 + }
  45 +
  46 +}
... ...