Commit 808707afa4db07ad39c679e3e3727154ac71c4fb

Authored by Andrii Shvaika
1 parent 51ce039a

Implementation and tests for Latest Subscription

... ... @@ -66,6 +66,7 @@ import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
66 66 import javax.annotation.PostConstruct;
67 67 import javax.annotation.PreDestroy;
68 68 import java.util.ArrayList;
  69 +import java.util.Collection;
69 70 import java.util.HashMap;
70 71 import java.util.LinkedHashMap;
71 72 import java.util.LinkedHashSet;
... ... @@ -169,7 +170,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
169 170 if (ctx != null) {
170 171 log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd);
171 172 if (cmd.getLatestCmd() != null || cmd.getTsCmd() != null) {
172   - ctx.clearSubscriptions();
  173 + Collection<Integer> oldSubIds = ctx.clearSubscriptions();
  174 + oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId));
173 175 }
174 176 //TODO: cleanup old subscription;
175 177 } else {
... ... @@ -195,10 +197,16 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
195 197 });
196 198 }
197 199 PageData<EntityData> data = entityService.findEntityDataByQuery(tenantId, customerId, ctx.getQuery());
  200 + if (log.isTraceEnabled()) {
  201 + data.getData().forEach(ed -> {
  202 + log.trace("[{}][{}] EntityData: {}", session.getSessionId(), cmd.getCmdId(), ed);
  203 + });
  204 + }
198 205 ctx.setData(data);
199 206 }
200 207 ListenableFuture<TbEntityDataSubCtx> historyFuture;
201 208 if (cmd.getHistoryCmd() != null) {
  209 + log.trace("[{}][{}] Going to process history command: {}", session.getSessionId(), cmd.getCmdId(), cmd.getHistoryCmd());
202 210 historyFuture = handleHistoryCmd(ctx, cmd.getHistoryCmd());
203 211 } else {
204 212 historyFuture = Futures.immediateFuture(ctx);
... ... @@ -241,8 +249,10 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
241 249 }
242 250
243 251 private void handleLatestCmd(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) {
  252 + log.trace("[{}][{}] Going to process latest command: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd);
244 253 //Fetch the latest values for telemetry keys (in case they are not copied from NoSQL to SQL DB in hybrid mode.
245 254 if (!tsInSqlDB) {
  255 + log.trace("[{}][{}] Going to fetch missing latest values: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd);
246 256 List<String> allTsKeys = latestCmd.getKeys().stream()
247 257 .filter(key -> key.getType().equals(EntityKeyType.TIME_SERIES))
248 258 .map(EntityKey::getKey).collect(Collectors.toList());
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.service.subscription;
17 17
18 18 import lombok.Data;
  19 +import lombok.extern.slf4j.Slf4j;
19 20 import org.thingsboard.server.common.data.id.CustomerId;
20 21 import org.thingsboard.server.common.data.id.EntityId;
21 22 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -34,12 +35,13 @@ import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
34 35
35 36 import java.util.ArrayList;
36 37 import java.util.Arrays;
  38 +import java.util.Collection;
37 39 import java.util.Collections;
38 40 import java.util.HashMap;
39 41 import java.util.List;
40 42 import java.util.Map;
41   -import java.util.stream.Collectors;
42 43
  44 +@Slf4j
43 45 @Data
44 46 public class TbEntityDataSubCtx {
45 47
... ... @@ -54,7 +56,6 @@ public class TbEntityDataSubCtx {
54 56 private PageData<EntityData> data;
55 57 private boolean initialDataSent;
56 58 private List<TbSubscription> tbSubs;
57   - private int internalSubIdx;
58 59 private Map<Integer, EntityId> subToEntityIdMap;
59 60
60 61 public TbEntityDataSubCtx(String serviceId, TelemetryWebSocketService wsService, TelemetryWebSocketSessionRef sessionRef, int cmdId) {
... ... @@ -82,7 +83,6 @@ public class TbEntityDataSubCtx {
82 83
83 84 public List<TbSubscription> createSubscriptions(List<EntityKey> keys) {
84 85 this.subToEntityIdMap = new HashMap<>();
85   - this.internalSubIdx = cmdId * MAX_SUBS_PER_CMD;
86 86 tbSubs = new ArrayList<>();
87 87 List<EntityKey> attrSubKeys = new ArrayList<>();
88 88 List<EntityKey> tsSubKeys = new ArrayList<>();
... ... @@ -107,20 +107,28 @@ public class TbEntityDataSubCtx {
107 107 }
108 108
109 109 private TbSubscription createTsSub(EntityData entityData, List<EntityKey> tsSubKeys) {
110   - int subIdx = internalSubIdx++;
  110 + int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet();
111 111 subToEntityIdMap.put(subIdx, entityData.getEntityId());
112 112 Map<String, Long> keyStates = new HashMap<>();
113 113 tsSubKeys.forEach(key -> keyStates.put(key.getKey(), 0L));
114 114 if (entityData.getLatest() != null) {
115 115 Map<String, TsValue> currentValues = entityData.getLatest().get(EntityKeyType.TIME_SERIES);
116 116 if (currentValues != null) {
117   - currentValues.forEach((k, v) -> keyStates.put(k, v.getTs()));
  117 + currentValues.forEach((k, v) -> {
  118 + log.trace("[{}][{}] Updating key: {} with ts: {}", serviceId, cmdId, k, v.getTs());
  119 + keyStates.put(k, v.getTs());
  120 + });
118 121 }
119 122 }
120 123 if (entityData.getTimeseries() != null) {
121   - entityData.getTimeseries().forEach((k, v) -> keyStates.put(k, Arrays.stream(v).map(TsValue::getTs).max(Long::compareTo).orElse(0L)));
  124 + entityData.getTimeseries().forEach((k, v) -> {
  125 + long ts = Arrays.stream(v).map(TsValue::getTs).max(Long::compareTo).orElse(0L);
  126 + log.trace("[{}][{}] Updating key: {} with ts: {}", serviceId, cmdId, k, ts);
  127 + keyStates.put(k, ts);
  128 + });
122 129 }
123 130
  131 + log.trace("[{}][{}][{}] Creating subscription with keys: {}", serviceId, cmdId, subIdx, keyStates);
124 132 return TbTimeseriesSubscription.builder()
125 133 .serviceId(serviceId)
126 134 .sessionId(sessionRef.getSessionId())
... ... @@ -136,19 +144,48 @@ public class TbEntityDataSubCtx {
136 144 private void sendTsWsMsg(String sessionId, SubscriptionUpdate subscriptionUpdate) {
137 145 EntityId entityId = subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId());
138 146 if (entityId != null) {
139   - Map<String, TsValue> latest = new HashMap<>();
  147 + log.trace("[{}][{}][{}] Received subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), subscriptionUpdate);
  148 + Map<String, TsValue> latestUpdate = new HashMap<>();
140 149 subscriptionUpdate.getData().forEach((k, v) -> {
141 150 Object[] data = (Object[]) v.get(0);
142   - latest.put(k, new TsValue((Long) data[0], (String) data[1]));
  151 + latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1]));
143 152 });
144   - Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(EntityKeyType.TIME_SERIES, latest);
145   - EntityData entityData = new EntityData(entityId, latestMap, null);
146   - wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData)));
  153 + EntityData entityData = getDataForEntity(entityId);
  154 + if (entityData != null && entityData.getLatest() != null) {
  155 + Map<String, TsValue> latestCtxValues = entityData.getLatest().get(EntityKeyType.TIME_SERIES);
  156 + log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues);
  157 + if (latestCtxValues != null) {
  158 + latestCtxValues.forEach((k, v) -> {
  159 + TsValue update = latestUpdate.get(k);
  160 + if (update.getTs() < v.getTs()) {
  161 + log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
  162 + latestUpdate.remove(k);
  163 + } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) {
  164 + log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
  165 + latestUpdate.remove(k);
  166 + }
  167 + });
  168 + //Setting new values
  169 + latestUpdate.forEach(latestCtxValues::put);
  170 + }
  171 + }
  172 + if (!latestUpdate.isEmpty()) {
  173 + Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(EntityKeyType.TIME_SERIES, latestUpdate);
  174 + entityData = new EntityData(entityId, latestMap, null);
  175 + wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData)));
  176 + }
  177 + } else {
  178 + log.trace("[{}][{}][{}] Received stale subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), subscriptionUpdate);
147 179 }
  180 + }
148 181
  182 + private EntityData getDataForEntity(EntityId entityId) {
  183 + return data.getData().stream().filter(item -> item.getEntityId().equals(entityId)).findFirst().orElse(null);
149 184 }
150 185
151   - public void clearSubscriptions() {
  186 + public Collection<Integer> clearSubscriptions() {
  187 + List<Integer> oldSubIds = new ArrayList<>(subToEntityIdMap.keySet());
152 188 subToEntityIdMap.clear();
  189 + return oldSubIds;
153 190 }
154 191 }
... ...
... ... @@ -20,6 +20,7 @@ import org.thingsboard.server.service.security.model.SecurityUser;
20 20
21 21 import java.net.InetSocketAddress;
22 22 import java.util.Objects;
  23 +import java.util.concurrent.atomic.AtomicInteger;
23 24
24 25 /**
25 26 * Created by ashvayka on 27.03.18.
... ... @@ -36,12 +37,15 @@ public class TelemetryWebSocketSessionRef {
36 37 private final InetSocketAddress localAddress;
37 38 @Getter
38 39 private final InetSocketAddress remoteAddress;
  40 + @Getter
  41 + private final AtomicInteger sessionSubIdSeq;
39 42
40 43 public TelemetryWebSocketSessionRef(String sessionId, SecurityUser securityCtx, InetSocketAddress localAddress, InetSocketAddress remoteAddress) {
41 44 this.sessionId = sessionId;
42 45 this.securityCtx = securityCtx;
43 46 this.localAddress = localAddress;
44 47 this.remoteAddress = remoteAddress;
  48 + this.sessionSubIdSeq = new AtomicInteger();
45 49 }
46 50
47 51 @Override
... ...
... ... @@ -173,7 +173,6 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
173 173 }
174 174
175 175 @Test
176   - @Ignore
177 176 public void testEntityDataLatestWsCmd() throws Exception {
178 177 Device device = new Device();
179 178 device.setName("Device");
... ... @@ -212,8 +211,6 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
212 211 sendTelemetry(device, tsData);
213 212
214 213 cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
215   -
216   -
217 214 wrapper = new TelemetryPluginCmdsWrapper();
218 215 wrapper.setEntityDataCmds(Collections.singletonList(cmd));
219 216
... ... @@ -231,11 +228,12 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
231 228 TsValue tsValue = pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature");
232 229 Assert.assertEquals(new TsValue(dataPoint1.getTs(), dataPoint1.getValueAsString()), tsValue);
233 230
234   - log.error("GOING TO LISTEN FOR UPDATES");
235   - msg = wsClient.waitForUpdate();
236 231 now = System.currentTimeMillis();
237 232 TsKvEntry dataPoint2 = new BasicTsKvEntry(now, new LongDataEntry("temperature", 52L));
  233 +
  234 + wsClient.registerWaitForUpdate();
238 235 sendTelemetry(device, Arrays.asList(dataPoint2));
  236 + msg = wsClient.waitForUpdate();
239 237
240 238 update = mapper.readValue(msg, EntityDataUpdate.class);
241 239 Assert.assertEquals(1, update.getCmdId());
... ... @@ -247,6 +245,17 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
247 245 tsValue = eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature");
248 246 Assert.assertEquals(new TsValue(dataPoint2.getTs(), dataPoint2.getValueAsString()), tsValue);
249 247
  248 + //Sending update from the past, while latest value has new timestamp;
  249 + wsClient.registerWaitForUpdate();
  250 + sendTelemetry(device, Arrays.asList(dataPoint1));
  251 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  252 + Assert.assertNull(msg);
  253 +
  254 + //Sending duplicate update again
  255 + wsClient.registerWaitForUpdate();
  256 + sendTelemetry(device, Arrays.asList(dataPoint2));
  257 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  258 + Assert.assertNull(msg);
250 259 }
251 260
252 261 }
... ...
... ... @@ -27,9 +27,7 @@ import java.util.concurrent.TimeUnit;
27 27 @Slf4j
28 28 public class TbTestWebSocketClient extends WebSocketClient {
29 29
30   - private volatile String lastReply;
31   - private volatile String lastUpdate;
32   - private volatile boolean replyReceived;
  30 + private volatile String lastMsg;
33 31 private CountDownLatch reply;
34 32 private CountDownLatch update;
35 33
... ... @@ -44,23 +42,13 @@ public class TbTestWebSocketClient extends WebSocketClient {
44 42
45 43 @Override
46 44 public void onMessage(String s) {
47   - log.error("RECEIVED: {}", s);
48   - synchronized (this) {
49   - if (!replyReceived) {
50   - replyReceived = true;
51   - lastReply = s;
52   - log.error("LAST REPLY: {}", s);
53   - if (reply != null) {
54   - reply.countDown();
55   - }
56   - } else {
57   - lastUpdate = s;
58   - log.error("LAST UPDATE: {}", s);
59   - if (update == null) {
60   - update = new CountDownLatch(1);
61   - }
62   - update.countDown();
63   - }
  45 + log.info("RECEIVED: {}", s);
  46 + lastMsg = s;
  47 + if (reply != null) {
  48 + reply.countDown();
  49 + }
  50 + if (update != null) {
  51 + update.countDown();
64 52 }
65 53 }
66 54
... ... @@ -74,25 +62,28 @@ public class TbTestWebSocketClient extends WebSocketClient {
74 62
75 63 }
76 64
  65 + public void registerWaitForUpdate() {
  66 + lastMsg = null;
  67 + update = new CountDownLatch(1);
  68 + }
  69 +
77 70 @Override
78 71 public void send(String text) throws NotYetConnectedException {
79   - synchronized (this) {
80   - reply = new CountDownLatch(1);
81   - replyReceived = false;
82   - }
  72 + reply = new CountDownLatch(1);
83 73 super.send(text);
84 74 }
85 75
86 76 public String waitForUpdate() {
87   - synchronized (this) {
88   - update = new CountDownLatch(1);
89   - }
  77 + return waitForUpdate(TimeUnit.SECONDS.toMillis(3));
  78 + }
  79 +
  80 + public String waitForUpdate(long ms) {
90 81 try {
91   - update.await(3, TimeUnit.SECONDS);
  82 + update.await(ms, TimeUnit.MILLISECONDS);
92 83 } catch (InterruptedException e) {
93 84 log.warn("Failed to await reply", e);
94 85 }
95   - return lastUpdate;
  86 + return lastMsg;
96 87 }
97 88
98 89 public String waitForReply() {
... ... @@ -101,6 +92,6 @@ public class TbTestWebSocketClient extends WebSocketClient {
101 92 } catch (InterruptedException e) {
102 93 log.warn("Failed to await reply", e);
103 94 }
104   - return lastReply;
  95 + return lastMsg;
105 96 }
106 97 }
... ...
... ... @@ -7,6 +7,8 @@
7 7 </encoder>
8 8 </appender>
9 9
  10 +<!-- <logger name="org.thingsboard.server.service.subscription" level="TRACE"/>-->
  11 + <logger name="org.thingsboard.server.controller.TbTestWebSocketClient" level="INFO"/>
10 12 <logger name="org.thingsboard.server" level="WARN"/>
11 13 <logger name="org.springframework" level="WARN"/>
12 14 <logger name="org.springframework.boot.test" level="WARN"/>
... ...
... ... @@ -17,9 +17,11 @@ package org.thingsboard.server.common.data.query;
17 17
18 18 import com.fasterxml.jackson.annotation.JsonIgnore;
19 19 import lombok.Getter;
  20 +import lombok.ToString;
20 21
21 22 import java.util.List;
22 23
  24 +@ToString
23 25 public class EntityDataQuery extends EntityCountQuery {
24 26
25 27 @Getter
... ...