Commit e276c9a936c7f0e9d16b7145ebf3367c6bd06c52

Authored by Andrii Shvaika
1 parent 46fec515

Initial WebSocker API

@@ -49,6 +49,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService; @@ -49,6 +49,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
49 import org.thingsboard.server.queue.discovery.ClusterTopologyChangeEvent; 49 import org.thingsboard.server.queue.discovery.ClusterTopologyChangeEvent;
50 import org.thingsboard.server.queue.discovery.PartitionChangeEvent; 50 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
51 import org.thingsboard.server.queue.discovery.PartitionService; 51 import org.thingsboard.server.queue.discovery.PartitionService;
  52 +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
52 import org.thingsboard.server.queue.util.TbCoreComponent; 53 import org.thingsboard.server.queue.util.TbCoreComponent;
53 import org.thingsboard.server.service.queue.TbClusterService; 54 import org.thingsboard.server.service.queue.TbClusterService;
54 import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; 55 import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
@@ -106,18 +107,28 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -106,18 +107,28 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
106 private SubscriptionManagerService subscriptionManagerService; 107 private SubscriptionManagerService subscriptionManagerService;
107 108
108 @Autowired 109 @Autowired
  110 + @Lazy
  111 + private TbLocalSubscriptionService localSubscriptionService;
  112 +
  113 + @Autowired
109 private TimeseriesService tsService; 114 private TimeseriesService tsService;
110 115
  116 + @Autowired
  117 + private TbServiceInfoProvider serviceInfoProvider;
  118 +
111 @Value("${database.ts.type}") 119 @Value("${database.ts.type}")
112 private String databaseTsType; 120 private String databaseTsType;
113 121
114 private ExecutorService wsCallBackExecutor; 122 private ExecutorService wsCallBackExecutor;
115 private boolean tsInSqlDB; 123 private boolean tsInSqlDB;
  124 + private String serviceId;
116 125
117 @PostConstruct 126 @PostConstruct
118 public void initExecutor() { 127 public void initExecutor() {
  128 + serviceId = serviceInfoProvider.getServiceId();
119 wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ws-entity-sub-callback")); 129 wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ws-entity-sub-callback"));
120 tsInSqlDB = databaseTsType.equalsIgnoreCase("sql") || databaseTsType.equalsIgnoreCase("timescale"); 130 tsInSqlDB = databaseTsType.equalsIgnoreCase("sql") || databaseTsType.equalsIgnoreCase("timescale");
  131 +
121 } 132 }
122 133
123 @PreDestroy 134 @PreDestroy
@@ -158,6 +169,9 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -158,6 +169,9 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
158 TbEntityDataSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId()); 169 TbEntityDataSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId());
159 if (ctx != null) { 170 if (ctx != null) {
160 log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd); 171 log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd);
  172 + if (cmd.getLatestCmd() != null || cmd.getTsCmd() != null) {
  173 + ctx.clearSubscriptions();
  174 + }
161 //TODO: cleanup old subscription; 175 //TODO: cleanup old subscription;
162 } else { 176 } else {
163 log.debug("[{}][{}] Creating new subscription using: {}", session.getSessionId(), cmd.getCmdId(), cmd); 177 log.debug("[{}][{}] Creating new subscription using: {}", session.getSessionId(), cmd.getCmdId(), cmd);
@@ -209,7 +223,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -209,7 +223,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
209 223
210 private TbEntityDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, EntityDataCmd cmd) { 224 private TbEntityDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, EntityDataCmd cmd) {
211 Map<Integer, TbEntityDataSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>()); 225 Map<Integer, TbEntityDataSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>());
212 - TbEntityDataSubCtx ctx = new TbEntityDataSubCtx(sessionRef, cmd.getCmdId()); 226 + TbEntityDataSubCtx ctx = new TbEntityDataSubCtx(serviceId, wsService, sessionRef, cmd.getCmdId());
213 ctx.setQuery(cmd.getQuery()); 227 ctx.setQuery(cmd.getQuery());
214 sessionSubs.put(cmd.getCmdId(), ctx); 228 sessionSubs.put(cmd.getCmdId(), ctx);
215 return ctx; 229 return ctx;
@@ -266,7 +280,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -266,7 +280,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
266 update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData()); 280 update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData());
267 } 281 }
268 wsService.sendWsMsg(ctx.getSessionId(), update); 282 wsService.sendWsMsg(ctx.getSessionId(), update);
269 - //TODO: create context for this (session, cmdId) that contains query, latestCmd and update. Subscribe + periodic updates. 283 + createLatestSubscriptions(ctx, latestCmd);
270 } 284 }
271 285
272 @Override 286 @Override
@@ -281,10 +295,16 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -281,10 +295,16 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
281 EntityDataUpdate update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null); 295 EntityDataUpdate update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null);
282 wsService.sendWsMsg(ctx.getSessionId(), update); 296 wsService.sendWsMsg(ctx.getSessionId(), update);
283 } 297 }
284 - //TODO: create context for this (session, cmdId) that contains query, latestCmd and update. Subscribe + periodic updates. 298 + createLatestSubscriptions(ctx, latestCmd);
285 } 299 }
286 } 300 }
287 301
  302 + private void createLatestSubscriptions(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) {
  303 + //TODO: create context for this (session, cmdId) that contains query, latestCmd and update. Subscribe + periodic updates.
  304 + List<TbSubscription> tbSubs = ctx.createSubscriptions(latestCmd.getKeys());
  305 + tbSubs.forEach(sub -> localSubscriptionService.addSubscription(sub));
  306 + }
  307 +
288 private Map<String, TsValue> toTsValue(List<TsKvEntry> data) { 308 private Map<String, TsValue> toTsValue(List<TsKvEntry> data) {
289 return data.stream().collect(Collectors.toMap(TsKvEntry::getKey, value -> new TsValue(value.getTs(), value.getValueAsString()))); 309 return data.stream().collect(Collectors.toMap(TsKvEntry::getKey, value -> new TsValue(value.getTs(), value.getValueAsString())));
290 } 310 }
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -59,9 +59,6 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @@ -59,9 +59,6 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
59 private final Map<String, Map<Integer, TbSubscription>> subscriptionsBySessionId = new ConcurrentHashMap<>(); 59 private final Map<String, Map<Integer, TbSubscription>> subscriptionsBySessionId = new ConcurrentHashMap<>();
60 60
61 @Autowired 61 @Autowired
62 - private TelemetryWebSocketService wsService;  
63 -  
64 - @Autowired  
65 private EntityViewService entityViewService; 62 private EntityViewService entityViewService;
66 63
67 @Autowired 64 @Autowired
@@ -155,7 +152,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @@ -155,7 +152,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
155 update.getLatestValues().forEach((key, value) -> attrSub.getKeyStates().put(key, value)); 152 update.getLatestValues().forEach((key, value) -> attrSub.getKeyStates().put(key, value));
156 break; 153 break;
157 } 154 }
158 - wsService.sendWsMsg(sessionId, update); 155 + subscription.getUpdateConsumer().accept(sessionId, update);
159 } 156 }
160 callback.onSuccess(); 157 callback.onSuccess();
161 } 158 }
@@ -20,8 +20,10 @@ import lombok.Data; @@ -20,8 +20,10 @@ import lombok.Data;
20 import lombok.Getter; 20 import lombok.Getter;
21 import org.thingsboard.server.common.data.id.EntityId; 21 import org.thingsboard.server.common.data.id.EntityId;
22 import org.thingsboard.server.common.data.id.TenantId; 22 import org.thingsboard.server.common.data.id.TenantId;
  23 +import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
23 24
24 import java.util.Map; 25 import java.util.Map;
  26 +import java.util.function.BiConsumer;
25 27
26 public class TbAttributeSubscription extends TbSubscription { 28 public class TbAttributeSubscription extends TbSubscription {
27 29
@@ -31,8 +33,9 @@ public class TbAttributeSubscription extends TbSubscription { @@ -31,8 +33,9 @@ public class TbAttributeSubscription extends TbSubscription {
31 33
32 @Builder 34 @Builder
33 public TbAttributeSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId, 35 public TbAttributeSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId,
  36 + BiConsumer<String, SubscriptionUpdate> updateConsumer,
34 boolean allKeys, Map<String, Long> keyStates, TbAttributeSubscriptionScope scope) { 37 boolean allKeys, Map<String, Long> keyStates, TbAttributeSubscriptionScope scope) {
35 - super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.ATTRIBUTES); 38 + super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.ATTRIBUTES, updateConsumer);
36 this.allKeys = allKeys; 39 this.allKeys = allKeys;
37 this.keyStates = keyStates; 40 this.keyStates = keyStates;
38 this.scope = scope; 41 this.scope = scope;
@@ -2,17 +2,35 @@ package org.thingsboard.server.service.subscription; @@ -2,17 +2,35 @@ package org.thingsboard.server.service.subscription;
2 2
3 import lombok.Data; 3 import lombok.Data;
4 import org.thingsboard.server.common.data.id.CustomerId; 4 import org.thingsboard.server.common.data.id.CustomerId;
  5 +import org.thingsboard.server.common.data.id.EntityId;
5 import org.thingsboard.server.common.data.id.TenantId; 6 import org.thingsboard.server.common.data.id.TenantId;
6 import org.thingsboard.server.common.data.page.PageData; 7 import org.thingsboard.server.common.data.page.PageData;
7 import org.thingsboard.server.common.data.query.EntityData; 8 import org.thingsboard.server.common.data.query.EntityData;
8 import org.thingsboard.server.common.data.query.EntityDataQuery; 9 import org.thingsboard.server.common.data.query.EntityDataQuery;
  10 +import org.thingsboard.server.common.data.query.EntityKey;
  11 +import org.thingsboard.server.common.data.query.EntityKeyType;
  12 +import org.thingsboard.server.common.data.query.TsValue;
  13 +import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
9 import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; 14 import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
  15 +import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
10 import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd; 16 import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd;
11 import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd; 17 import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd;
  18 +import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
  19 +
  20 +import java.util.ArrayList;
  21 +import java.util.Arrays;
  22 +import java.util.Collections;
  23 +import java.util.HashMap;
  24 +import java.util.List;
  25 +import java.util.Map;
  26 +import java.util.stream.Collectors;
12 27
13 @Data 28 @Data
14 public class TbEntityDataSubCtx { 29 public class TbEntityDataSubCtx {
15 30
  31 + public static final int MAX_SUBS_PER_CMD = 1024 * 8;
  32 + private final String serviceId;
  33 + private final TelemetryWebSocketService wsService;
16 private final TelemetryWebSocketSessionRef sessionRef; 34 private final TelemetryWebSocketSessionRef sessionRef;
17 private final int cmdId; 35 private final int cmdId;
18 private EntityDataQuery query; 36 private EntityDataQuery query;
@@ -20,8 +38,13 @@ public class TbEntityDataSubCtx { @@ -20,8 +38,13 @@ public class TbEntityDataSubCtx {
20 private TimeSeriesCmd tsCmd; 38 private TimeSeriesCmd tsCmd;
21 private PageData<EntityData> data; 39 private PageData<EntityData> data;
22 private boolean initialDataSent; 40 private boolean initialDataSent;
  41 + private List<TbSubscription> tbSubs;
  42 + private int internalSubIdx;
  43 + private Map<Integer, EntityId> subToEntityIdMap;
23 44
24 - public TbEntityDataSubCtx(TelemetryWebSocketSessionRef sessionRef, int cmdId) { 45 + public TbEntityDataSubCtx(String serviceId, TelemetryWebSocketService wsService, TelemetryWebSocketSessionRef sessionRef, int cmdId) {
  46 + this.serviceId = serviceId;
  47 + this.wsService = wsService;
25 this.sessionRef = sessionRef; 48 this.sessionRef = sessionRef;
26 this.cmdId = cmdId; 49 this.cmdId = cmdId;
27 } 50 }
@@ -38,9 +61,79 @@ public class TbEntityDataSubCtx { @@ -38,9 +61,79 @@ public class TbEntityDataSubCtx {
38 return sessionRef.getSecurityCtx().getCustomerId(); 61 return sessionRef.getSecurityCtx().getCustomerId();
39 } 62 }
40 63
41 -  
42 public void setData(PageData<EntityData> data) { 64 public void setData(PageData<EntityData> data) {
43 this.data = data; 65 this.data = data;
44 } 66 }
45 67
  68 + public List<TbSubscription> createSubscriptions(List<EntityKey> keys) {
  69 + this.subToEntityIdMap = new HashMap<>();
  70 + this.internalSubIdx = cmdId * MAX_SUBS_PER_CMD;
  71 + tbSubs = new ArrayList<>();
  72 + List<EntityKey> attrSubKeys = new ArrayList<>();
  73 + List<EntityKey> tsSubKeys = new ArrayList<>();
  74 + for (EntityKey key : keys) {
  75 + switch (key.getType()) {
  76 + case TIME_SERIES:
  77 + tsSubKeys.add(key);
  78 + break;
  79 + case ATTRIBUTE:
  80 + case CLIENT_ATTRIBUTE:
  81 + case SHARED_ATTRIBUTE:
  82 + case SERVER_ATTRIBUTE:
  83 + attrSubKeys.add(key);
  84 + }
  85 + }
  86 + for (EntityData entityData : data.getData()) {
  87 + if (!tsSubKeys.isEmpty()) {
  88 + tbSubs.add(createTsSub(entityData, tsSubKeys));
  89 + }
  90 + }
  91 + return tbSubs;
  92 + }
  93 +
  94 + private TbSubscription createTsSub(EntityData entityData, List<EntityKey> tsSubKeys) {
  95 + int subIdx = internalSubIdx++;
  96 + subToEntityIdMap.put(subIdx, entityData.getEntityId());
  97 + Map<String, Long> keyStates = new HashMap<>();
  98 + tsSubKeys.forEach(key -> keyStates.put(key.getKey(), 0L));
  99 + if (entityData.getLatest() != null) {
  100 + Map<String, TsValue> currentValues = entityData.getLatest().get(EntityKeyType.TIME_SERIES);
  101 + if (currentValues != null) {
  102 + currentValues.forEach((k, v) -> keyStates.put(k, v.getTs()));
  103 + }
  104 + }
  105 + if (entityData.getTimeseries() != null) {
  106 + entityData.getTimeseries().forEach((k, v) -> keyStates.put(k, Arrays.stream(v).map(TsValue::getTs).max(Long::compareTo).orElse(0L)));
  107 + }
  108 +
  109 + return TbTimeseriesSubscription.builder()
  110 + .serviceId(serviceId)
  111 + .sessionId(sessionRef.getSessionId())
  112 + .subscriptionId(subIdx)
  113 + .tenantId(sessionRef.getSecurityCtx().getTenantId())
  114 + .entityId(entityData.getEntityId())
  115 + .updateConsumer(this::sendTsWsMsg)
  116 + .allKeys(false)
  117 + .keyStates(keyStates).build();
  118 + }
  119 +
  120 +
  121 + private void sendTsWsMsg(String sessionId, SubscriptionUpdate subscriptionUpdate) {
  122 + EntityId entityId = subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId());
  123 + if (entityId != null) {
  124 + Map<String, TsValue> latest = new HashMap<>();
  125 + subscriptionUpdate.getData().forEach((k, v) -> {
  126 + Object[] data = (Object[]) v.get(0);
  127 + latest.put(k, new TsValue((Long) data[0], (String) data[1]));
  128 + });
  129 + Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(EntityKeyType.TIME_SERIES, latest);
  130 + EntityData entityData = new EntityData(entityId, latestMap, null);
  131 + wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData)));
  132 + }
  133 +
  134 + }
  135 +
  136 + public void clearSubscriptions() {
  137 + subToEntityIdMap.clear();
  138 + }
46 } 139 }
@@ -19,8 +19,10 @@ import lombok.AllArgsConstructor; @@ -19,8 +19,10 @@ import lombok.AllArgsConstructor;
19 import lombok.Data; 19 import lombok.Data;
20 import org.thingsboard.server.common.data.id.EntityId; 20 import org.thingsboard.server.common.data.id.EntityId;
21 import org.thingsboard.server.common.data.id.TenantId; 21 import org.thingsboard.server.common.data.id.TenantId;
  22 +import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
22 23
23 import java.util.Objects; 24 import java.util.Objects;
  25 +import java.util.function.BiConsumer;
24 26
25 @Data 27 @Data
26 @AllArgsConstructor 28 @AllArgsConstructor
@@ -32,6 +34,7 @@ public abstract class TbSubscription { @@ -32,6 +34,7 @@ public abstract class TbSubscription {
32 private final TenantId tenantId; 34 private final TenantId tenantId;
33 private final EntityId entityId; 35 private final EntityId entityId;
34 private final TbSubscriptionType type; 36 private final TbSubscriptionType type;
  37 + private final BiConsumer<String, SubscriptionUpdate> updateConsumer;
35 38
36 @Override 39 @Override
37 public boolean equals(Object o) { 40 public boolean equals(Object o) {
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,20 +19,27 @@ import lombok.Builder; @@ -19,20 +19,27 @@ import lombok.Builder;
19 import lombok.Getter; 19 import lombok.Getter;
20 import org.thingsboard.server.common.data.id.EntityId; 20 import org.thingsboard.server.common.data.id.EntityId;
21 import org.thingsboard.server.common.data.id.TenantId; 21 import org.thingsboard.server.common.data.id.TenantId;
  22 +import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
22 23
23 import java.util.Map; 24 import java.util.Map;
  25 +import java.util.function.BiConsumer;
24 26
25 public class TbTimeseriesSubscription extends TbSubscription { 27 public class TbTimeseriesSubscription extends TbSubscription {
26 28
27 - @Getter private final boolean allKeys;  
28 - @Getter private final Map<String, Long> keyStates;  
29 - @Getter private final long startTime;  
30 - @Getter private final long endTime; 29 + @Getter
  30 + private final boolean allKeys;
  31 + @Getter
  32 + private final Map<String, Long> keyStates;
  33 + @Getter
  34 + private final long startTime;
  35 + @Getter
  36 + private final long endTime;
31 37
32 @Builder 38 @Builder
33 public TbTimeseriesSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId, 39 public TbTimeseriesSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId,
  40 + BiConsumer<String, SubscriptionUpdate> updateConsumer,
34 boolean allKeys, Map<String, Long> keyStates, long startTime, long endTime) { 41 boolean allKeys, Map<String, Long> keyStates, long startTime, long endTime) {
35 - super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.TIMESERIES); 42 + super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.TIMESERIES, updateConsumer);
36 this.allKeys = allKeys; 43 this.allKeys = allKeys;
37 this.keyStates = keyStates; 44 this.keyStates = keyStates;
38 this.startTime = startTime; 45 this.startTime = startTime;
@@ -86,6 +86,7 @@ import java.util.concurrent.ConcurrentHashMap; @@ -86,6 +86,7 @@ import java.util.concurrent.ConcurrentHashMap;
86 import java.util.concurrent.ConcurrentMap; 86 import java.util.concurrent.ConcurrentMap;
87 import java.util.concurrent.ExecutorService; 87 import java.util.concurrent.ExecutorService;
88 import java.util.concurrent.Executors; 88 import java.util.concurrent.Executors;
  89 +import java.util.function.BiConsumer;
89 import java.util.function.Consumer; 90 import java.util.function.Consumer;
90 import java.util.stream.Collectors; 91 import java.util.stream.Collectors;
91 92
@@ -129,6 +130,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -129,6 +130,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
129 @Autowired 130 @Autowired
130 private TbServiceInfoProvider serviceInfoProvider; 131 private TbServiceInfoProvider serviceInfoProvider;
131 132
  133 +
132 @Value("${server.ws.limits.max_subscriptions_per_tenant:0}") 134 @Value("${server.ws.limits.max_subscriptions_per_tenant:0}")
133 private int maxSubscriptionsPerTenant; 135 private int maxSubscriptionsPerTenant;
134 @Value("${server.ws.limits.max_subscriptions_per_customer:0}") 136 @Value("${server.ws.limits.max_subscriptions_per_customer:0}")
@@ -398,7 +400,9 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -398,7 +400,9 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
398 .entityId(entityId) 400 .entityId(entityId)
399 .allKeys(false) 401 .allKeys(false)
400 .keyStates(subState) 402 .keyStates(subState)
401 - .scope(scope).build(); 403 + .scope(scope)
  404 + .updateConsumer(DefaultTelemetryWebSocketService.this::sendWsMsg)
  405 + .build();
402 oldSubService.addSubscription(sub); 406 oldSubService.addSubscription(sub);
403 } 407 }
404 408
@@ -495,6 +499,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -495,6 +499,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
495 .entityId(entityId) 499 .entityId(entityId)
496 .allKeys(true) 500 .allKeys(true)
497 .keyStates(subState) 501 .keyStates(subState)
  502 + .updateConsumer(DefaultTelemetryWebSocketService.this::sendWsMsg)
498 .scope(scope).build(); 503 .scope(scope).build();
499 oldSubService.addSubscription(sub); 504 oldSubService.addSubscription(sub);
500 } 505 }
@@ -575,6 +580,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -575,6 +580,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
575 .subscriptionId(cmd.getCmdId()) 580 .subscriptionId(cmd.getCmdId())
576 .tenantId(sessionRef.getSecurityCtx().getTenantId()) 581 .tenantId(sessionRef.getSecurityCtx().getTenantId())
577 .entityId(entityId) 582 .entityId(entityId)
  583 + .updateConsumer(DefaultTelemetryWebSocketService.this::sendWsMsg)
578 .allKeys(true) 584 .allKeys(true)
579 .keyStates(subState).build(); 585 .keyStates(subState).build();
580 oldSubService.addSubscription(sub); 586 oldSubService.addSubscription(sub);
@@ -612,6 +618,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -612,6 +618,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
612 .subscriptionId(cmd.getCmdId()) 618 .subscriptionId(cmd.getCmdId())
613 .tenantId(sessionRef.getSecurityCtx().getTenantId()) 619 .tenantId(sessionRef.getSecurityCtx().getTenantId())
614 .entityId(entityId) 620 .entityId(entityId)
  621 + .updateConsumer(DefaultTelemetryWebSocketService.this::sendWsMsg)
615 .allKeys(false) 622 .allKeys(false)
616 .keyStates(subState).build(); 623 .keyStates(subState).build();
617 oldSubService.addSubscription(sub); 624 oldSubService.addSubscription(sub);
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,6 +15,9 @@ @@ -15,6 +15,9 @@
15 */ 15 */
16 package org.thingsboard.server.controller; 16 package org.thingsboard.server.controller;
17 17
  18 +import com.google.common.util.concurrent.FutureCallback;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.checkerframework.checker.nullness.qual.Nullable;
18 import org.junit.After; 21 import org.junit.After;
19 import org.junit.Assert; 22 import org.junit.Assert;
20 import org.junit.Before; 23 import org.junit.Before;
@@ -38,6 +41,7 @@ import org.thingsboard.server.common.data.query.EntityKeyType; @@ -38,6 +41,7 @@ import org.thingsboard.server.common.data.query.EntityKeyType;
38 import org.thingsboard.server.common.data.query.TsValue; 41 import org.thingsboard.server.common.data.query.TsValue;
39 import org.thingsboard.server.common.data.security.Authority; 42 import org.thingsboard.server.common.data.security.Authority;
40 import org.thingsboard.server.dao.timeseries.TimeseriesService; 43 import org.thingsboard.server.dao.timeseries.TimeseriesService;
  44 +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
41 import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper; 45 import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
42 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd; 46 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
43 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; 47 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
@@ -46,10 +50,13 @@ import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd; @@ -46,10 +50,13 @@ import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd;
46 50
47 import java.util.Arrays; 51 import java.util.Arrays;
48 import java.util.Collections; 52 import java.util.Collections;
  53 +import java.util.List;
  54 +import java.util.concurrent.CountDownLatch;
49 import java.util.concurrent.TimeUnit; 55 import java.util.concurrent.TimeUnit;
50 56
51 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; 57 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
52 58
  59 +@Slf4j
53 public class BaseWebsocketApiTest extends AbstractWebsocketTest { 60 public class BaseWebsocketApiTest extends AbstractWebsocketTest {
54 61
55 private Tenant savedTenant; 62 private Tenant savedTenant;
@@ -57,7 +64,7 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { @@ -57,7 +64,7 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
57 private TbTestWebSocketClient wsClient; 64 private TbTestWebSocketClient wsClient;
58 65
59 @Autowired 66 @Autowired
60 - private TimeseriesService tsService; 67 + private TelemetrySubscriptionService tsService;
61 68
62 @Before 69 @Before
63 public void beforeTest() throws Exception { 70 public void beforeTest() throws Exception {
@@ -129,7 +136,10 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { @@ -129,7 +136,10 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
129 TsKvEntry dataPoint1 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("temperature", 42L)); 136 TsKvEntry dataPoint1 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("temperature", 42L));
130 TsKvEntry dataPoint2 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(2), new LongDataEntry("temperature", 42L)); 137 TsKvEntry dataPoint2 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(2), new LongDataEntry("temperature", 42L));
131 TsKvEntry dataPoint3 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(3), new LongDataEntry("temperature", 42L)); 138 TsKvEntry dataPoint3 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(3), new LongDataEntry("temperature", 42L));
132 - tsService.save(device.getTenantId(), device.getId(), Arrays.asList(dataPoint1, dataPoint2, dataPoint3), 0).get(); 139 + List<TsKvEntry> tsData = Arrays.asList(dataPoint1, dataPoint2, dataPoint3);
  140 +
  141 + sendTelemetry(device, tsData);
  142 + Thread.sleep(1000);
133 143
134 wsClient.send(mapper.writeValueAsString(wrapper)); 144 wsClient.send(mapper.writeValueAsString(wrapper));
135 msg = wsClient.waitForReply(); 145 msg = wsClient.waitForReply();
@@ -146,6 +156,22 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { @@ -146,6 +156,22 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
146 Assert.assertEquals(new TsValue(dataPoint3.getTs(), dataPoint3.getValueAsString()), tsArray[2]); 156 Assert.assertEquals(new TsValue(dataPoint3.getTs(), dataPoint3.getValueAsString()), tsArray[2]);
147 } 157 }
148 158
  159 + private void sendTelemetry(Device device, List<TsKvEntry> tsData) throws InterruptedException {
  160 + CountDownLatch latch = new CountDownLatch(1);
  161 + tsService.saveAndNotify(device.getTenantId(), device.getId(), tsData, 0, new FutureCallback<Void>() {
  162 + @Override
  163 + public void onSuccess(@Nullable Void result) {
  164 + latch.countDown();
  165 + }
  166 +
  167 + @Override
  168 + public void onFailure(Throwable t) {
  169 + latch.countDown();
  170 + }
  171 + });
  172 + latch.await(3, TimeUnit.SECONDS);
  173 + }
  174 +
149 @Test 175 @Test
150 @Ignore 176 @Ignore
151 public void testEntityDataLatestWsCmd() throws Exception { 177 public void testEntityDataLatestWsCmd() throws Exception {
@@ -177,12 +203,15 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { @@ -177,12 +203,15 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
177 Assert.assertNotNull(pageData); 203 Assert.assertNotNull(pageData);
178 Assert.assertEquals(1, pageData.getData().size()); 204 Assert.assertEquals(1, pageData.getData().size());
179 Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId()); 205 Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId());
180 - Assert.assertNull(pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature")); 206 + Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature"));
  207 + Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature").getTs());
  208 + Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature").getValue());
181 209
182 TsKvEntry dataPoint1 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("temperature", 42L)); 210 TsKvEntry dataPoint1 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("temperature", 42L));
183 - tsService.save(device.getTenantId(), device.getId(), Arrays.asList(dataPoint1), 0).get(); 211 + List<TsKvEntry> tsData = Arrays.asList(dataPoint1);
  212 + sendTelemetry(device, tsData);
184 213
185 - cmd = new EntityDataCmd(2, edq, null, latestCmd, null); 214 + cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
186 215
187 wrapper = new TelemetryPluginCmdsWrapper(); 216 wrapper = new TelemetryPluginCmdsWrapper();
188 wrapper.setEntityDataCmds(Collections.singletonList(cmd)); 217 wrapper.setEntityDataCmds(Collections.singletonList(cmd));
@@ -190,7 +219,7 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { @@ -190,7 +219,7 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
190 wsClient.send(mapper.writeValueAsString(wrapper)); 219 wsClient.send(mapper.writeValueAsString(wrapper));
191 msg = wsClient.waitForReply(); 220 msg = wsClient.waitForReply();
192 update = mapper.readValue(msg, EntityDataUpdate.class); 221 update = mapper.readValue(msg, EntityDataUpdate.class);
193 - Assert.assertEquals(2, update.getCmdId()); 222 + Assert.assertEquals(1, update.getCmdId());
194 pageData = update.getData(); 223 pageData = update.getData();
195 Assert.assertNotNull(pageData); 224 Assert.assertNotNull(pageData);
196 Assert.assertEquals(1, pageData.getData().size()); 225 Assert.assertEquals(1, pageData.getData().size());
@@ -198,6 +227,22 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { @@ -198,6 +227,22 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
198 Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES)); 227 Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES));
199 TsValue tsValue = pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature"); 228 TsValue tsValue = pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature");
200 Assert.assertEquals(new TsValue(dataPoint1.getTs(), dataPoint1.getValueAsString()), tsValue); 229 Assert.assertEquals(new TsValue(dataPoint1.getTs(), dataPoint1.getValueAsString()), tsValue);
  230 +
  231 + log.error("GOING TO LISTEN FOR UPDATES");
  232 + msg = wsClient.waitForUpdate();
  233 + now = System.currentTimeMillis();
  234 + TsKvEntry dataPoint2 = new BasicTsKvEntry(now, new LongDataEntry("temperature", 52L));
  235 + sendTelemetry(device, Arrays.asList(dataPoint2));
  236 +
  237 + update = mapper.readValue(msg, EntityDataUpdate.class);
  238 + Assert.assertEquals(1, update.getCmdId());
  239 + List<EntityData> eData = update.getUpdate();
  240 + Assert.assertNotNull(eData);
  241 + Assert.assertEquals(1, eData.size());
  242 + Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
  243 + Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES));
  244 + tsValue = eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature");
  245 + Assert.assertEquals(new TsValue(dataPoint2.getTs(), dataPoint2.getValueAsString()), tsValue);
201 } 246 }
202 247
203 } 248 }
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,9 +27,11 @@ import java.util.concurrent.TimeUnit; @@ -27,9 +27,11 @@ import java.util.concurrent.TimeUnit;
27 @Slf4j 27 @Slf4j
28 public class TbTestWebSocketClient extends WebSocketClient { 28 public class TbTestWebSocketClient extends WebSocketClient {
29 29
30 - private volatile String lastMsg; 30 + private volatile String lastReply;
  31 + private volatile String lastUpdate;
31 private volatile boolean replyReceived; 32 private volatile boolean replyReceived;
32 private CountDownLatch reply; 33 private CountDownLatch reply;
  34 + private CountDownLatch update;
33 35
34 public TbTestWebSocketClient(URI serverUri) { 36 public TbTestWebSocketClient(URI serverUri) {
35 super(serverUri); 37 super(serverUri);
@@ -42,11 +44,22 @@ public class TbTestWebSocketClient extends WebSocketClient { @@ -42,11 +44,22 @@ public class TbTestWebSocketClient extends WebSocketClient {
42 44
43 @Override 45 @Override
44 public void onMessage(String s) { 46 public void onMessage(String s) {
45 - if (!replyReceived) {  
46 - replyReceived = true;  
47 - lastMsg = s;  
48 - if (reply != null) {  
49 - reply.countDown(); 47 + log.error("RECEIVED: {}", s);
  48 + synchronized (this) {
  49 + if (!replyReceived) {
  50 + replyReceived = true;
  51 + lastReply = s;
  52 + log.error("LAST REPLY: {}", s);
  53 + if (reply != null) {
  54 + reply.countDown();
  55 + }
  56 + } else {
  57 + lastUpdate = s;
  58 + log.error("LAST UPDATE: {}", s);
  59 + if (update == null) {
  60 + update = new CountDownLatch(1);
  61 + }
  62 + update.countDown();
50 } 63 }
51 } 64 }
52 } 65 }
@@ -63,17 +76,31 @@ public class TbTestWebSocketClient extends WebSocketClient { @@ -63,17 +76,31 @@ public class TbTestWebSocketClient extends WebSocketClient {
63 76
64 @Override 77 @Override
65 public void send(String text) throws NotYetConnectedException { 78 public void send(String text) throws NotYetConnectedException {
66 - reply = new CountDownLatch(1);  
67 - replyReceived = false; 79 + synchronized (this) {
  80 + reply = new CountDownLatch(1);
  81 + replyReceived = false;
  82 + }
68 super.send(text); 83 super.send(text);
69 } 84 }
70 85
  86 + public String waitForUpdate() {
  87 + synchronized (this) {
  88 + update = new CountDownLatch(1);
  89 + }
  90 + try {
  91 + update.await(3, TimeUnit.SECONDS);
  92 + } catch (InterruptedException e) {
  93 + log.warn("Failed to await reply", e);
  94 + }
  95 + return lastUpdate;
  96 + }
  97 +
71 public String waitForReply() { 98 public String waitForReply() {
72 try { 99 try {
73 reply.await(3, TimeUnit.SECONDS); 100 reply.await(3, TimeUnit.SECONDS);
74 } catch (InterruptedException e) { 101 } catch (InterruptedException e) {
75 log.warn("Failed to await reply", e); 102 log.warn("Failed to await reply", e);
76 } 103 }
77 - return lastMsg; 104 + return lastReply;
78 } 105 }
79 } 106 }
@@ -121,7 +121,7 @@ public class EntityKeyMapping { @@ -121,7 +121,7 @@ public class EntityKeyMapping {
121 String join = hasFilter() ? "left join" : "left outer join"; 121 String join = hasFilter() ? "left join" : "left outer join";
122 ctx.addStringParameter(alias + "_key_id", entityKey.getKey()); 122 ctx.addStringParameter(alias + "_key_id", entityKey.getKey());
123 if (entityKey.getType().equals(EntityKeyType.TIME_SERIES)) { 123 if (entityKey.getType().equals(EntityKeyType.TIME_SERIES)) {
124 - return String.format("%s ts_kv_latest %s ON %s.entity_id=to_uuid(entities.id) AND %s.key = (select key_id from ts_kv_dictionary where key = :%s_key_id)", 124 + return String.format("%s ts_kv_latest %s ON %s.entity_id=entities.id AND %s.key = (select key_id from ts_kv_dictionary where key = :%s_key_id)",
125 join, alias, alias, alias, alias); 125 join, alias, alias, alias, alias);
126 } else { 126 } else {
127 String query = String.format("%s attribute_kv %s ON %s.entity_id=entities.id AND %s.entity_type=%s AND %s.attribute_key=:%s_key_id", 127 String query = String.format("%s attribute_kv %s ON %s.entity_id=entities.id AND %s.entity_type=%s AND %s.attribute_key=:%s_key_id",