Commit de88c843b65df14a6bb275b8db40f3a754f7a522

Authored by Igor Kulikov
2 parents dfaa6ab4 9e848962

Merge branch 'feature/entity-data-query' of github.com:thingsboard/thingsboard i…

…nto feature/entity-data-query
... ... @@ -66,6 +66,7 @@ import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
66 66 import javax.annotation.PostConstruct;
67 67 import javax.annotation.PreDestroy;
68 68 import java.util.ArrayList;
  69 +import java.util.Collection;
69 70 import java.util.HashMap;
70 71 import java.util.LinkedHashMap;
71 72 import java.util.LinkedHashSet;
... ... @@ -169,7 +170,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
169 170 if (ctx != null) {
170 171 log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd);
171 172 if (cmd.getLatestCmd() != null || cmd.getTsCmd() != null) {
172   - ctx.clearSubscriptions();
  173 + Collection<Integer> oldSubIds = ctx.clearSubscriptions();
  174 + oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId));
173 175 }
174 176 //TODO: cleanup old subscription;
175 177 } else {
... ... @@ -195,10 +197,16 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
195 197 });
196 198 }
197 199 PageData<EntityData> data = entityService.findEntityDataByQuery(tenantId, customerId, ctx.getQuery());
  200 + if (log.isTraceEnabled()) {
  201 + data.getData().forEach(ed -> {
  202 + log.trace("[{}][{}] EntityData: {}", session.getSessionId(), cmd.getCmdId(), ed);
  203 + });
  204 + }
198 205 ctx.setData(data);
199 206 }
200 207 ListenableFuture<TbEntityDataSubCtx> historyFuture;
201 208 if (cmd.getHistoryCmd() != null) {
  209 + log.trace("[{}][{}] Going to process history command: {}", session.getSessionId(), cmd.getCmdId(), cmd.getHistoryCmd());
202 210 historyFuture = handleHistoryCmd(ctx, cmd.getHistoryCmd());
203 211 } else {
204 212 historyFuture = Futures.immediateFuture(ctx);
... ... @@ -241,8 +249,10 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
241 249 }
242 250
243 251 private void handleLatestCmd(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) {
  252 + log.trace("[{}][{}] Going to process latest command: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd);
244 253 //Fetch the latest values for telemetry keys (in case they are not copied from NoSQL to SQL DB in hybrid mode.
245 254 if (!tsInSqlDB) {
  255 + log.trace("[{}][{}] Going to fetch missing latest values: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd);
246 256 List<String> allTsKeys = latestCmd.getKeys().stream()
247 257 .filter(key -> key.getType().equals(EntityKeyType.TIME_SERIES))
248 258 .map(EntityKey::getKey).collect(Collectors.toList());
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.service.subscription;
17 17
18 18 import lombok.Data;
  19 +import lombok.extern.slf4j.Slf4j;
19 20 import org.thingsboard.server.common.data.id.CustomerId;
20 21 import org.thingsboard.server.common.data.id.EntityId;
21 22 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -34,12 +35,13 @@ import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
34 35
35 36 import java.util.ArrayList;
36 37 import java.util.Arrays;
  38 +import java.util.Collection;
37 39 import java.util.Collections;
38 40 import java.util.HashMap;
39 41 import java.util.List;
40 42 import java.util.Map;
41   -import java.util.stream.Collectors;
42 43
  44 +@Slf4j
43 45 @Data
44 46 public class TbEntityDataSubCtx {
45 47
... ... @@ -54,7 +56,6 @@ public class TbEntityDataSubCtx {
54 56 private PageData<EntityData> data;
55 57 private boolean initialDataSent;
56 58 private List<TbSubscription> tbSubs;
57   - private int internalSubIdx;
58 59 private Map<Integer, EntityId> subToEntityIdMap;
59 60
60 61 public TbEntityDataSubCtx(String serviceId, TelemetryWebSocketService wsService, TelemetryWebSocketSessionRef sessionRef, int cmdId) {
... ... @@ -82,45 +83,61 @@ public class TbEntityDataSubCtx {
82 83
83 84 public List<TbSubscription> createSubscriptions(List<EntityKey> keys) {
84 85 this.subToEntityIdMap = new HashMap<>();
85   - this.internalSubIdx = cmdId * MAX_SUBS_PER_CMD;
86 86 tbSubs = new ArrayList<>();
87   - List<EntityKey> attrSubKeys = new ArrayList<>();
88   - List<EntityKey> tsSubKeys = new ArrayList<>();
89   - for (EntityKey key : keys) {
90   - switch (key.getType()) {
91   - case TIME_SERIES:
92   - tsSubKeys.add(key);
93   - break;
94   - case ATTRIBUTE:
95   - case CLIENT_ATTRIBUTE:
96   - case SHARED_ATTRIBUTE:
97   - case SERVER_ATTRIBUTE:
98   - attrSubKeys.add(key);
99   - }
100   - }
  87 + Map<EntityKeyType, List<EntityKey>> keysByType = new HashMap<>();
  88 + keys.forEach(key -> keysByType.computeIfAbsent(key.getType(), k -> new ArrayList<>()).add(key));
101 89 for (EntityData entityData : data.getData()) {
102   - if (!tsSubKeys.isEmpty()) {
103   - tbSubs.add(createTsSub(entityData, tsSubKeys));
104   - }
  90 + keysByType.forEach((keysType, keysList) -> {
  91 + int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet();
  92 + subToEntityIdMap.put(subIdx, entityData.getEntityId());
  93 + switch (keysType) {
  94 + case TIME_SERIES:
  95 + tbSubs.add(createTsSub(entityData, subIdx, keysList));
  96 + break;
  97 + case CLIENT_ATTRIBUTE:
  98 + tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.CLIENT_SCOPE, keysList));
  99 + break;
  100 + case SHARED_ATTRIBUTE:
  101 + tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SHARED_SCOPE, keysList));
  102 + break;
  103 + case SERVER_ATTRIBUTE:
  104 + tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SERVER_SCOPE, keysList));
  105 + break;
  106 + case ATTRIBUTE:
  107 + tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.ANY_SCOPE, keysList));
  108 + break;
  109 + }
  110 + });
105 111 }
106 112 return tbSubs;
107 113 }
108 114
109   - private TbSubscription createTsSub(EntityData entityData, List<EntityKey> tsSubKeys) {
110   - int subIdx = internalSubIdx++;
111   - subToEntityIdMap.put(subIdx, entityData.getEntityId());
112   - Map<String, Long> keyStates = new HashMap<>();
113   - tsSubKeys.forEach(key -> keyStates.put(key.getKey(), 0L));
114   - if (entityData.getLatest() != null) {
115   - Map<String, TsValue> currentValues = entityData.getLatest().get(EntityKeyType.TIME_SERIES);
116   - if (currentValues != null) {
117   - currentValues.forEach((k, v) -> keyStates.put(k, v.getTs()));
118   - }
119   - }
  115 + private TbSubscription createAttrSub(EntityData entityData, int subIdx, EntityKeyType keysType, TbAttributeSubscriptionScope scope, List<EntityKey> subKeys) {
  116 + Map<String, Long> keyStates = buildKeyStats(entityData, keysType, subKeys);
  117 + log.trace("[{}][{}][{}] Creating attributes subscription with keys: {}", serviceId, cmdId, subIdx, keyStates);
  118 + return TbAttributeSubscription.builder()
  119 + .serviceId(serviceId)
  120 + .sessionId(sessionRef.getSessionId())
  121 + .subscriptionId(subIdx)
  122 + .tenantId(sessionRef.getSecurityCtx().getTenantId())
  123 + .entityId(entityData.getEntityId())
  124 + .updateConsumer((s, subscriptionUpdate) -> sendWsMsg(s, subscriptionUpdate, keysType))
  125 + .allKeys(false)
  126 + .keyStates(keyStates)
  127 + .scope(scope)
  128 + .build();
  129 + }
  130 +
  131 + private TbSubscription createTsSub(EntityData entityData, int subIdx, List<EntityKey> subKeys) {
  132 + Map<String, Long> keyStates = buildKeyStats(entityData, EntityKeyType.TIME_SERIES, subKeys);
120 133 if (entityData.getTimeseries() != null) {
121   - entityData.getTimeseries().forEach((k, v) -> keyStates.put(k, Arrays.stream(v).map(TsValue::getTs).max(Long::compareTo).orElse(0L)));
  134 + entityData.getTimeseries().forEach((k, v) -> {
  135 + long ts = Arrays.stream(v).map(TsValue::getTs).max(Long::compareTo).orElse(0L);
  136 + log.trace("[{}][{}] Updating key: {} with ts: {}", serviceId, cmdId, k, ts);
  137 + keyStates.put(k, ts);
  138 + });
122 139 }
123   -
  140 + log.trace("[{}][{}][{}] Creating time-series subscription with keys: {}", serviceId, cmdId, subIdx, keyStates);
124 141 return TbTimeseriesSubscription.builder()
125 142 .serviceId(serviceId)
126 143 .sessionId(sessionRef.getSessionId())
... ... @@ -129,26 +146,74 @@ public class TbEntityDataSubCtx {
129 146 .entityId(entityData.getEntityId())
130 147 .updateConsumer(this::sendTsWsMsg)
131 148 .allKeys(false)
132   - .keyStates(keyStates).build();
  149 + .keyStates(keyStates)
  150 + .build();
133 151 }
134 152
  153 + private Map<String, Long> buildKeyStats(EntityData entityData, EntityKeyType keysType, List<EntityKey> subKeys) {
  154 + Map<String, Long> keyStates = new HashMap<>();
  155 + subKeys.forEach(key -> keyStates.put(key.getKey(), 0L));
  156 + if (entityData.getLatest() != null) {
  157 + Map<String, TsValue> currentValues = entityData.getLatest().get(keysType);
  158 + if (currentValues != null) {
  159 + currentValues.forEach((k, v) -> {
  160 + log.trace("[{}][{}] Updating key: {} with ts: {}", serviceId, cmdId, k, v.getTs());
  161 + keyStates.put(k, v.getTs());
  162 + });
  163 + }
  164 + }
  165 + return keyStates;
  166 + }
135 167
136 168 private void sendTsWsMsg(String sessionId, SubscriptionUpdate subscriptionUpdate) {
  169 + sendWsMsg(sessionId, subscriptionUpdate, EntityKeyType.TIME_SERIES);
  170 + }
  171 +
  172 + private void sendWsMsg(String sessionId, SubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) {
137 173 EntityId entityId = subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId());
138 174 if (entityId != null) {
139   - Map<String, TsValue> latest = new HashMap<>();
  175 + log.trace("[{}][{}][{}][{}] Received subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate);
  176 + Map<String, TsValue> latestUpdate = new HashMap<>();
140 177 subscriptionUpdate.getData().forEach((k, v) -> {
141 178 Object[] data = (Object[]) v.get(0);
142   - latest.put(k, new TsValue((Long) data[0], (String) data[1]));
  179 + latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1]));
143 180 });
144   - Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(EntityKeyType.TIME_SERIES, latest);
145   - EntityData entityData = new EntityData(entityId, latestMap, null);
146   - wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData)));
  181 + EntityData entityData = getDataForEntity(entityId);
  182 + if (entityData != null && entityData.getLatest() != null) {
  183 + Map<String, TsValue> latestCtxValues = entityData.getLatest().get(keyType);
  184 + log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues);
  185 + if (latestCtxValues != null) {
  186 + latestCtxValues.forEach((k, v) -> {
  187 + TsValue update = latestUpdate.get(k);
  188 + if (update.getTs() < v.getTs()) {
  189 + log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
  190 + latestUpdate.remove(k);
  191 + } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) {
  192 + log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
  193 + latestUpdate.remove(k);
  194 + }
  195 + });
  196 + //Setting new values
  197 + latestUpdate.forEach(latestCtxValues::put);
  198 + }
  199 + }
  200 + if (!latestUpdate.isEmpty()) {
  201 + Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(keyType, latestUpdate);
  202 + entityData = new EntityData(entityId, latestMap, null);
  203 + wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData)));
  204 + }
  205 + } else {
  206 + log.trace("[{}][{}][{}][{}] Received stale subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate);
147 207 }
  208 + }
148 209
  210 + private EntityData getDataForEntity(EntityId entityId) {
  211 + return data.getData().stream().filter(item -> item.getEntityId().equals(entityId)).findFirst().orElse(null);
149 212 }
150 213
151   - public void clearSubscriptions() {
  214 + public Collection<Integer> clearSubscriptions() {
  215 + List<Integer> oldSubIds = new ArrayList<>(subToEntityIdMap.keySet());
152 216 subToEntityIdMap.clear();
  217 + return oldSubIds;
153 218 }
154 219 }
... ...
... ... @@ -20,6 +20,7 @@ import org.thingsboard.server.service.security.model.SecurityUser;
20 20
21 21 import java.net.InetSocketAddress;
22 22 import java.util.Objects;
  23 +import java.util.concurrent.atomic.AtomicInteger;
23 24
24 25 /**
25 26 * Created by ashvayka on 27.03.18.
... ... @@ -36,12 +37,15 @@ public class TelemetryWebSocketSessionRef {
36 37 private final InetSocketAddress localAddress;
37 38 @Getter
38 39 private final InetSocketAddress remoteAddress;
  40 + @Getter
  41 + private final AtomicInteger sessionSubIdSeq;
39 42
40 43 public TelemetryWebSocketSessionRef(String sessionId, SecurityUser securityCtx, InetSocketAddress localAddress, InetSocketAddress remoteAddress) {
41 44 this.sessionId = sessionId;
42 45 this.securityCtx = securityCtx;
43 46 this.localAddress = localAddress;
44 47 this.remoteAddress = remoteAddress;
  48 + this.sessionSubIdSeq = new AtomicInteger();
45 49 }
46 50
47 51 @Override
... ...
... ... @@ -28,6 +28,8 @@ import org.thingsboard.server.common.data.Device;
28 28 import org.thingsboard.server.common.data.Tenant;
29 29 import org.thingsboard.server.common.data.User;
30 30 import org.thingsboard.server.common.data.kv.Aggregation;
  31 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  32 +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
31 33 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
32 34 import org.thingsboard.server.common.data.kv.LongDataEntry;
33 35 import org.thingsboard.server.common.data.kv.TsKvEntry;
... ... @@ -41,6 +43,7 @@ import org.thingsboard.server.common.data.query.EntityKeyType;
41 43 import org.thingsboard.server.common.data.query.TsValue;
42 44 import org.thingsboard.server.common.data.security.Authority;
43 45 import org.thingsboard.server.dao.timeseries.TimeseriesService;
  46 +import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope;
44 47 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
45 48 import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
46 49 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
... ... @@ -48,6 +51,7 @@ import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
48 51 import org.thingsboard.server.service.telemetry.cmd.v2.EntityHistoryCmd;
49 52 import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd;
50 53
  54 +import java.util.ArrayList;
51 55 import java.util.Arrays;
52 56 import java.util.Collections;
53 57 import java.util.List;
... ... @@ -139,7 +143,7 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
139 143 List<TsKvEntry> tsData = Arrays.asList(dataPoint1, dataPoint2, dataPoint3);
140 144
141 145 sendTelemetry(device, tsData);
142   - Thread.sleep(1000);
  146 + Thread.sleep(100);
143 147
144 148 wsClient.send(mapper.writeValueAsString(wrapper));
145 149 msg = wsClient.waitForReply();
... ... @@ -156,25 +160,8 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
156 160 Assert.assertEquals(new TsValue(dataPoint3.getTs(), dataPoint3.getValueAsString()), tsArray[2]);
157 161 }
158 162
159   - private void sendTelemetry(Device device, List<TsKvEntry> tsData) throws InterruptedException {
160   - CountDownLatch latch = new CountDownLatch(1);
161   - tsService.saveAndNotify(device.getTenantId(), device.getId(), tsData, 0, new FutureCallback<Void>() {
162   - @Override
163   - public void onSuccess(@Nullable Void result) {
164   - latch.countDown();
165   - }
166   -
167   - @Override
168   - public void onFailure(Throwable t) {
169   - latch.countDown();
170   - }
171   - });
172   - latch.await(3, TimeUnit.SECONDS);
173   - }
174   -
175 163 @Test
176   - @Ignore
177   - public void testEntityDataLatestWsCmd() throws Exception {
  164 + public void testEntityDataLatestTsWsCmd() throws Exception {
178 165 Device device = new Device();
179 166 device.setName("Device");
180 167 device.setType("default");
... ... @@ -211,9 +198,9 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
211 198 List<TsKvEntry> tsData = Arrays.asList(dataPoint1);
212 199 sendTelemetry(device, tsData);
213 200
214   - cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
215   -
  201 + Thread.sleep(100);
216 202
  203 + cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
217 204 wrapper = new TelemetryPluginCmdsWrapper();
218 205 wrapper.setEntityDataCmds(Collections.singletonList(cmd));
219 206
... ... @@ -231,11 +218,12 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
231 218 TsValue tsValue = pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature");
232 219 Assert.assertEquals(new TsValue(dataPoint1.getTs(), dataPoint1.getValueAsString()), tsValue);
233 220
234   - log.error("GOING TO LISTEN FOR UPDATES");
235   - msg = wsClient.waitForUpdate();
236 221 now = System.currentTimeMillis();
237 222 TsKvEntry dataPoint2 = new BasicTsKvEntry(now, new LongDataEntry("temperature", 52L));
  223 +
  224 + wsClient.registerWaitForUpdate();
238 225 sendTelemetry(device, Arrays.asList(dataPoint2));
  226 + msg = wsClient.waitForUpdate();
239 227
240 228 update = mapper.readValue(msg, EntityDataUpdate.class);
241 229 Assert.assertEquals(1, update.getCmdId());
... ... @@ -247,6 +235,280 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
247 235 tsValue = eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature");
248 236 Assert.assertEquals(new TsValue(dataPoint2.getTs(), dataPoint2.getValueAsString()), tsValue);
249 237
  238 + //Sending update from the past, while latest value has new timestamp;
  239 + wsClient.registerWaitForUpdate();
  240 + sendTelemetry(device, Arrays.asList(dataPoint1));
  241 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  242 + Assert.assertNull(msg);
  243 +
  244 + //Sending duplicate update again
  245 + wsClient.registerWaitForUpdate();
  246 + sendTelemetry(device, Arrays.asList(dataPoint2));
  247 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  248 + Assert.assertNull(msg);
  249 + }
  250 +
  251 + @Test
  252 + public void testEntityDataLatestAttrWsCmd() throws Exception {
  253 + Device device = new Device();
  254 + device.setName("Device");
  255 + device.setType("default");
  256 + device.setLabel("testLabel" + (int) (Math.random() * 1000));
  257 + device = doPost("/api/device", device, Device.class);
  258 +
  259 + long now = System.currentTimeMillis();
  260 +
  261 + DeviceTypeFilter dtf = new DeviceTypeFilter();
  262 + dtf.setDeviceNameFilter("D");
  263 + dtf.setDeviceType("default");
  264 + EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null),
  265 + Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
  266 +
  267 + LatestValueCmd latestCmd = new LatestValueCmd();
  268 + latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, "serverAttributeKey")));
  269 + EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
  270 +
  271 + TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
  272 + wrapper.setEntityDataCmds(Collections.singletonList(cmd));
  273 +
  274 + wsClient.send(mapper.writeValueAsString(wrapper));
  275 + String msg = wsClient.waitForReply();
  276 + EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class);
  277 + Assert.assertEquals(1, update.getCmdId());
  278 + PageData<EntityData> pageData = update.getData();
  279 + Assert.assertNotNull(pageData);
  280 + Assert.assertEquals(1, pageData.getData().size());
  281 + Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId());
  282 + Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey"));
  283 + Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey").getTs());
  284 + Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey").getValue());
  285 +
  286 + AttributeKvEntry dataPoint1 = new BaseAttributeKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("serverAttributeKey", 42L));
  287 + List<AttributeKvEntry> tsData = Arrays.asList(dataPoint1);
  288 + sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, tsData);
  289 +
  290 + Thread.sleep(100);
  291 +
  292 + cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
  293 + wrapper = new TelemetryPluginCmdsWrapper();
  294 + wrapper.setEntityDataCmds(Collections.singletonList(cmd));
  295 +
  296 + wsClient.send(mapper.writeValueAsString(wrapper));
  297 + msg = wsClient.waitForReply();
  298 + update = mapper.readValue(msg, EntityDataUpdate.class);
  299 +
  300 + Assert.assertEquals(1, update.getCmdId());
  301 +
  302 + pageData = update.getData();
  303 + Assert.assertNotNull(pageData);
  304 + Assert.assertEquals(1, pageData.getData().size());
  305 + Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId());
  306 + Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE));
  307 + TsValue tsValue = pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey");
  308 + Assert.assertEquals(new TsValue(dataPoint1.getLastUpdateTs(), dataPoint1.getValueAsString()), tsValue);
  309 +
  310 + now = System.currentTimeMillis();
  311 + AttributeKvEntry dataPoint2 = new BaseAttributeKvEntry(now, new LongDataEntry("serverAttributeKey", 52L));
  312 +
  313 + wsClient.registerWaitForUpdate();
  314 + sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Arrays.asList(dataPoint2));
  315 + msg = wsClient.waitForUpdate();
  316 +
  317 + update = mapper.readValue(msg, EntityDataUpdate.class);
  318 + Assert.assertEquals(1, update.getCmdId());
  319 + List<EntityData> eData = update.getUpdate();
  320 + Assert.assertNotNull(eData);
  321 + Assert.assertEquals(1, eData.size());
  322 + Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
  323 + Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE));
  324 + tsValue = eData.get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey");
  325 + Assert.assertEquals(new TsValue(dataPoint2.getLastUpdateTs(), dataPoint2.getValueAsString()), tsValue);
  326 +
  327 + //Sending update from the past, while latest value has new timestamp;
  328 + wsClient.registerWaitForUpdate();
  329 + sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Arrays.asList(dataPoint1));
  330 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  331 + Assert.assertNull(msg);
  332 +
  333 + //Sending duplicate update again
  334 + wsClient.registerWaitForUpdate();
  335 + sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Arrays.asList(dataPoint2));
  336 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  337 + Assert.assertNull(msg);
  338 + }
  339 +
  340 + @Test
  341 + public void testEntityDataLatestAttrTypesWsCmd() throws Exception {
  342 + Device device = new Device();
  343 + device.setName("Device");
  344 + device.setType("default");
  345 + device.setLabel("testLabel" + (int) (Math.random() * 1000));
  346 + device = doPost("/api/device", device, Device.class);
  347 +
  348 + long now = System.currentTimeMillis();
  349 +
  350 + DeviceTypeFilter dtf = new DeviceTypeFilter();
  351 + dtf.setDeviceNameFilter("D");
  352 + dtf.setDeviceType("default");
  353 + EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null),
  354 + Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
  355 +
  356 + LatestValueCmd latestCmd = new LatestValueCmd();
  357 + List<EntityKey> keys = new ArrayList<>();
  358 + keys.add(new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, "serverAttributeKey"));
  359 + keys.add(new EntityKey(EntityKeyType.CLIENT_ATTRIBUTE, "clientAttributeKey"));
  360 + keys.add(new EntityKey(EntityKeyType.SHARED_ATTRIBUTE, "sharedAttributeKey"));
  361 + keys.add(new EntityKey(EntityKeyType.ATTRIBUTE, "anyAttributeKey"));
  362 + latestCmd.setKeys(keys);
  363 + EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
  364 +
  365 + TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
  366 + wrapper.setEntityDataCmds(Collections.singletonList(cmd));
  367 +
  368 + wsClient.send(mapper.writeValueAsString(wrapper));
  369 + String msg = wsClient.waitForReply();
  370 + EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class);
  371 + Assert.assertEquals(1, update.getCmdId());
  372 + PageData<EntityData> pageData = update.getData();
  373 + Assert.assertNotNull(pageData);
  374 + Assert.assertEquals(1, pageData.getData().size());
  375 + Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId());
  376 + Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey"));
  377 + Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey").getTs());
  378 + Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey").getValue());
  379 + Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.CLIENT_ATTRIBUTE).get("clientAttributeKey"));
  380 + Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.CLIENT_ATTRIBUTE).get("clientAttributeKey").getTs());
  381 + Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.CLIENT_ATTRIBUTE).get("clientAttributeKey").getValue());
  382 + Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.SHARED_ATTRIBUTE).get("sharedAttributeKey"));
  383 + Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.SHARED_ATTRIBUTE).get("sharedAttributeKey").getTs());
  384 + Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.SHARED_ATTRIBUTE).get("sharedAttributeKey").getValue());
  385 + Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.ATTRIBUTE).get("anyAttributeKey"));
  386 + Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.ATTRIBUTE).get("anyAttributeKey").getTs());
  387 + Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.ATTRIBUTE).get("anyAttributeKey").getValue());
  388 +
  389 +
  390 + wsClient.registerWaitForUpdate();
  391 + AttributeKvEntry dataPoint1 = new BaseAttributeKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("serverAttributeKey", 42L));
  392 + List<AttributeKvEntry> tsData = Arrays.asList(dataPoint1);
  393 + sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, tsData);
  394 +
  395 + Thread.sleep(100);
  396 +
  397 + cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
  398 + wrapper = new TelemetryPluginCmdsWrapper();
  399 + wrapper.setEntityDataCmds(Collections.singletonList(cmd));
  400 +
  401 + msg = wsClient.waitForUpdate();
  402 +
  403 + update = mapper.readValue(msg, EntityDataUpdate.class);
  404 + Assert.assertEquals(1, update.getCmdId());
  405 + List<EntityData> eData = update.getUpdate();
  406 + Assert.assertNotNull(eData);
  407 + Assert.assertEquals(1, eData.size());
  408 + Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
  409 + Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE));
  410 + TsValue attrValue = eData.get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey");
  411 + Assert.assertEquals(new TsValue(dataPoint1.getLastUpdateTs(), dataPoint1.getValueAsString()), attrValue);
  412 +
  413 + //Sending update from the past, while latest value has new timestamp;
  414 + wsClient.registerWaitForUpdate();
  415 + sendAttributes(device, TbAttributeSubscriptionScope.SHARED_SCOPE, Arrays.asList(dataPoint1));
  416 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  417 + Assert.assertNull(msg);
  418 +
  419 + //Sending duplicate update again
  420 + wsClient.registerWaitForUpdate();
  421 + sendAttributes(device, TbAttributeSubscriptionScope.CLIENT_SCOPE, Arrays.asList(dataPoint1));
  422 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  423 + Assert.assertNull(msg);
  424 +
  425 + //Sending update from the past, while latest value has new timestamp;
  426 + wsClient.registerWaitForUpdate();
  427 + AttributeKvEntry dataPoint2 = new BaseAttributeKvEntry(now, new LongDataEntry("sharedAttributeKey", 42L));
  428 + sendAttributes(device, TbAttributeSubscriptionScope.SHARED_SCOPE, Arrays.asList(dataPoint2));
  429 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  430 + update = mapper.readValue(msg, EntityDataUpdate.class);
  431 + Assert.assertEquals(1, update.getCmdId());
  432 + eData = update.getUpdate();
  433 + Assert.assertNotNull(eData);
  434 + Assert.assertEquals(1, eData.size());
  435 + Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
  436 + Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.SHARED_ATTRIBUTE));
  437 + attrValue = eData.get(0).getLatest().get(EntityKeyType.SHARED_ATTRIBUTE).get("sharedAttributeKey");
  438 + Assert.assertEquals(new TsValue(dataPoint2.getLastUpdateTs(), dataPoint2.getValueAsString()), attrValue);
  439 +
  440 + wsClient.registerWaitForUpdate();
  441 + AttributeKvEntry dataPoint3 = new BaseAttributeKvEntry(now, new LongDataEntry("clientAttributeKey", 42L));
  442 + sendAttributes(device, TbAttributeSubscriptionScope.CLIENT_SCOPE, Arrays.asList(dataPoint3));
  443 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  444 + update = mapper.readValue(msg, EntityDataUpdate.class);
  445 + Assert.assertEquals(1, update.getCmdId());
  446 + eData = update.getUpdate();
  447 + Assert.assertNotNull(eData);
  448 + Assert.assertEquals(1, eData.size());
  449 + Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
  450 + Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.CLIENT_ATTRIBUTE));
  451 + attrValue = eData.get(0).getLatest().get(EntityKeyType.CLIENT_ATTRIBUTE).get("clientAttributeKey");
  452 + Assert.assertEquals(new TsValue(dataPoint3.getLastUpdateTs(), dataPoint3.getValueAsString()), attrValue);
  453 +
  454 + wsClient.registerWaitForUpdate();
  455 + AttributeKvEntry dataPoint4 = new BaseAttributeKvEntry(now, new LongDataEntry("anyAttributeKey", 42L));
  456 + sendAttributes(device, TbAttributeSubscriptionScope.CLIENT_SCOPE, Arrays.asList(dataPoint4));
  457 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  458 + update = mapper.readValue(msg, EntityDataUpdate.class);
  459 + Assert.assertEquals(1, update.getCmdId());
  460 + eData = update.getUpdate();
  461 + Assert.assertNotNull(eData);
  462 + Assert.assertEquals(1, eData.size());
  463 + Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
  464 + Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.ATTRIBUTE));
  465 + attrValue = eData.get(0).getLatest().get(EntityKeyType.ATTRIBUTE).get("anyAttributeKey");
  466 + Assert.assertEquals(new TsValue(dataPoint4.getLastUpdateTs(), dataPoint4.getValueAsString()), attrValue);
  467 +
  468 + wsClient.registerWaitForUpdate();
  469 + AttributeKvEntry dataPoint5 = new BaseAttributeKvEntry(now, new LongDataEntry("anyAttributeKey", 43L));
  470 + sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Arrays.asList(dataPoint5));
  471 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  472 + update = mapper.readValue(msg, EntityDataUpdate.class);
  473 + Assert.assertEquals(1, update.getCmdId());
  474 + eData = update.getUpdate();
  475 + Assert.assertNotNull(eData);
  476 + Assert.assertEquals(1, eData.size());
  477 + Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
  478 + Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.ATTRIBUTE));
  479 + attrValue = eData.get(0).getLatest().get(EntityKeyType.ATTRIBUTE).get("anyAttributeKey");
  480 + Assert.assertEquals(new TsValue(dataPoint5.getLastUpdateTs(), dataPoint5.getValueAsString()), attrValue);
  481 + }
  482 +
  483 + private void sendTelemetry(Device device, List<TsKvEntry> tsData) throws InterruptedException {
  484 + CountDownLatch latch = new CountDownLatch(1);
  485 + tsService.saveAndNotify(device.getTenantId(), device.getId(), tsData, 0, new FutureCallback<Void>() {
  486 + @Override
  487 + public void onSuccess(@Nullable Void result) {
  488 + latch.countDown();
  489 + }
  490 +
  491 + @Override
  492 + public void onFailure(Throwable t) {
  493 + latch.countDown();
  494 + }
  495 + });
  496 + latch.await(3, TimeUnit.SECONDS);
250 497 }
251 498
  499 + private void sendAttributes(Device device, TbAttributeSubscriptionScope scope, List<AttributeKvEntry> attrData) throws InterruptedException {
  500 + CountDownLatch latch = new CountDownLatch(1);
  501 + tsService.saveAndNotify(device.getTenantId(), device.getId(), scope.name(), attrData, new FutureCallback<Void>() {
  502 + @Override
  503 + public void onSuccess(@Nullable Void result) {
  504 + latch.countDown();
  505 + }
  506 +
  507 + @Override
  508 + public void onFailure(Throwable t) {
  509 + latch.countDown();
  510 + }
  511 + });
  512 + latch.await(3, TimeUnit.SECONDS);
  513 + }
252 514 }
... ...
... ... @@ -27,9 +27,7 @@ import java.util.concurrent.TimeUnit;
27 27 @Slf4j
28 28 public class TbTestWebSocketClient extends WebSocketClient {
29 29
30   - private volatile String lastReply;
31   - private volatile String lastUpdate;
32   - private volatile boolean replyReceived;
  30 + private volatile String lastMsg;
33 31 private CountDownLatch reply;
34 32 private CountDownLatch update;
35 33
... ... @@ -44,23 +42,13 @@ public class TbTestWebSocketClient extends WebSocketClient {
44 42
45 43 @Override
46 44 public void onMessage(String s) {
47   - log.error("RECEIVED: {}", s);
48   - synchronized (this) {
49   - if (!replyReceived) {
50   - replyReceived = true;
51   - lastReply = s;
52   - log.error("LAST REPLY: {}", s);
53   - if (reply != null) {
54   - reply.countDown();
55   - }
56   - } else {
57   - lastUpdate = s;
58   - log.error("LAST UPDATE: {}", s);
59   - if (update == null) {
60   - update = new CountDownLatch(1);
61   - }
62   - update.countDown();
63   - }
  45 + log.info("RECEIVED: {}", s);
  46 + lastMsg = s;
  47 + if (reply != null) {
  48 + reply.countDown();
  49 + }
  50 + if (update != null) {
  51 + update.countDown();
64 52 }
65 53 }
66 54
... ... @@ -74,25 +62,28 @@ public class TbTestWebSocketClient extends WebSocketClient {
74 62
75 63 }
76 64
  65 + public void registerWaitForUpdate() {
  66 + lastMsg = null;
  67 + update = new CountDownLatch(1);
  68 + }
  69 +
77 70 @Override
78 71 public void send(String text) throws NotYetConnectedException {
79   - synchronized (this) {
80   - reply = new CountDownLatch(1);
81   - replyReceived = false;
82   - }
  72 + reply = new CountDownLatch(1);
83 73 super.send(text);
84 74 }
85 75
86 76 public String waitForUpdate() {
87   - synchronized (this) {
88   - update = new CountDownLatch(1);
89   - }
  77 + return waitForUpdate(TimeUnit.SECONDS.toMillis(3));
  78 + }
  79 +
  80 + public String waitForUpdate(long ms) {
90 81 try {
91   - update.await(3, TimeUnit.SECONDS);
  82 + update.await(ms, TimeUnit.MILLISECONDS);
92 83 } catch (InterruptedException e) {
93 84 log.warn("Failed to await reply", e);
94 85 }
95   - return lastUpdate;
  86 + return lastMsg;
96 87 }
97 88
98 89 public String waitForReply() {
... ... @@ -101,6 +92,6 @@ public class TbTestWebSocketClient extends WebSocketClient {
101 92 } catch (InterruptedException e) {
102 93 log.warn("Failed to await reply", e);
103 94 }
104   - return lastReply;
  95 + return lastMsg;
105 96 }
106 97 }
... ...
... ... @@ -7,6 +7,8 @@
7 7 </encoder>
8 8 </appender>
9 9
  10 +<!-- <logger name="org.thingsboard.server.service.subscription" level="TRACE"/>-->
  11 + <logger name="org.thingsboard.server.controller.TbTestWebSocketClient" level="INFO"/>
10 12 <logger name="org.thingsboard.server" level="WARN"/>
11 13 <logger name="org.springframework" level="WARN"/>
12 14 <logger name="org.springframework.boot.test" level="WARN"/>
... ...
... ... @@ -17,9 +17,11 @@ package org.thingsboard.server.common.data.query;
17 17
18 18 import com.fasterxml.jackson.annotation.JsonIgnore;
19 19 import lombok.Getter;
  20 +import lombok.ToString;
20 21
21 22 import java.util.List;
22 23
  24 +@ToString
23 25 public class EntityDataQuery extends EntityCountQuery {
24 26
25 27 @Getter
... ...
... ... @@ -136,7 +136,7 @@ public class EntityKeyMapping {
136 136 } else {
137 137 scope = DataConstants.SERVER_SCOPE;
138 138 }
139   - query = String.format("%s AND %s.attribute_type=%s", query, alias, scope);
  139 + query = String.format("%s AND %s.attribute_type='%s'", query, alias, scope);
140 140 }
141 141 return query;
142 142 }
... ...