Commit 9e8489628a6fbea4ee454a9c2d59f5a59266ef01

Authored by Andrii Shvaika
1 parent 808707af

Latest Attributes Subscription Implementation

... ... @@ -84,42 +84,52 @@ public class TbEntityDataSubCtx {
84 84 public List<TbSubscription> createSubscriptions(List<EntityKey> keys) {
85 85 this.subToEntityIdMap = new HashMap<>();
86 86 tbSubs = new ArrayList<>();
87   - List<EntityKey> attrSubKeys = new ArrayList<>();
88   - List<EntityKey> tsSubKeys = new ArrayList<>();
89   - for (EntityKey key : keys) {
90   - switch (key.getType()) {
91   - case TIME_SERIES:
92   - tsSubKeys.add(key);
93   - break;
94   - case ATTRIBUTE:
95   - case CLIENT_ATTRIBUTE:
96   - case SHARED_ATTRIBUTE:
97   - case SERVER_ATTRIBUTE:
98   - attrSubKeys.add(key);
99   - }
100   - }
  87 + Map<EntityKeyType, List<EntityKey>> keysByType = new HashMap<>();
  88 + keys.forEach(key -> keysByType.computeIfAbsent(key.getType(), k -> new ArrayList<>()).add(key));
101 89 for (EntityData entityData : data.getData()) {
102   - if (!tsSubKeys.isEmpty()) {
103   - tbSubs.add(createTsSub(entityData, tsSubKeys));
104   - }
  90 + keysByType.forEach((keysType, keysList) -> {
  91 + int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet();
  92 + subToEntityIdMap.put(subIdx, entityData.getEntityId());
  93 + switch (keysType) {
  94 + case TIME_SERIES:
  95 + tbSubs.add(createTsSub(entityData, subIdx, keysList));
  96 + break;
  97 + case CLIENT_ATTRIBUTE:
  98 + tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.CLIENT_SCOPE, keysList));
  99 + break;
  100 + case SHARED_ATTRIBUTE:
  101 + tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SHARED_SCOPE, keysList));
  102 + break;
  103 + case SERVER_ATTRIBUTE:
  104 + tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SERVER_SCOPE, keysList));
  105 + break;
  106 + case ATTRIBUTE:
  107 + tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.ANY_SCOPE, keysList));
  108 + break;
  109 + }
  110 + });
105 111 }
106 112 return tbSubs;
107 113 }
108 114
109   - private TbSubscription createTsSub(EntityData entityData, List<EntityKey> tsSubKeys) {
110   - int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet();
111   - subToEntityIdMap.put(subIdx, entityData.getEntityId());
112   - Map<String, Long> keyStates = new HashMap<>();
113   - tsSubKeys.forEach(key -> keyStates.put(key.getKey(), 0L));
114   - if (entityData.getLatest() != null) {
115   - Map<String, TsValue> currentValues = entityData.getLatest().get(EntityKeyType.TIME_SERIES);
116   - if (currentValues != null) {
117   - currentValues.forEach((k, v) -> {
118   - log.trace("[{}][{}] Updating key: {} with ts: {}", serviceId, cmdId, k, v.getTs());
119   - keyStates.put(k, v.getTs());
120   - });
121   - }
122   - }
  115 + private TbSubscription createAttrSub(EntityData entityData, int subIdx, EntityKeyType keysType, TbAttributeSubscriptionScope scope, List<EntityKey> subKeys) {
  116 + Map<String, Long> keyStates = buildKeyStats(entityData, keysType, subKeys);
  117 + log.trace("[{}][{}][{}] Creating attributes subscription with keys: {}", serviceId, cmdId, subIdx, keyStates);
  118 + return TbAttributeSubscription.builder()
  119 + .serviceId(serviceId)
  120 + .sessionId(sessionRef.getSessionId())
  121 + .subscriptionId(subIdx)
  122 + .tenantId(sessionRef.getSecurityCtx().getTenantId())
  123 + .entityId(entityData.getEntityId())
  124 + .updateConsumer((s, subscriptionUpdate) -> sendWsMsg(s, subscriptionUpdate, keysType))
  125 + .allKeys(false)
  126 + .keyStates(keyStates)
  127 + .scope(scope)
  128 + .build();
  129 + }
  130 +
  131 + private TbSubscription createTsSub(EntityData entityData, int subIdx, List<EntityKey> subKeys) {
  132 + Map<String, Long> keyStates = buildKeyStats(entityData, EntityKeyType.TIME_SERIES, subKeys);
123 133 if (entityData.getTimeseries() != null) {
124 134 entityData.getTimeseries().forEach((k, v) -> {
125 135 long ts = Arrays.stream(v).map(TsValue::getTs).max(Long::compareTo).orElse(0L);
... ... @@ -127,8 +137,7 @@ public class TbEntityDataSubCtx {
127 137 keyStates.put(k, ts);
128 138 });
129 139 }
130   -
131   - log.trace("[{}][{}][{}] Creating subscription with keys: {}", serviceId, cmdId, subIdx, keyStates);
  140 + log.trace("[{}][{}][{}] Creating time-series subscription with keys: {}", serviceId, cmdId, subIdx, keyStates);
132 141 return TbTimeseriesSubscription.builder()
133 142 .serviceId(serviceId)
134 143 .sessionId(sessionRef.getSessionId())
... ... @@ -137,14 +146,33 @@ public class TbEntityDataSubCtx {
137 146 .entityId(entityData.getEntityId())
138 147 .updateConsumer(this::sendTsWsMsg)
139 148 .allKeys(false)
140   - .keyStates(keyStates).build();
  149 + .keyStates(keyStates)
  150 + .build();
141 151 }
142 152
  153 + private Map<String, Long> buildKeyStats(EntityData entityData, EntityKeyType keysType, List<EntityKey> subKeys) {
  154 + Map<String, Long> keyStates = new HashMap<>();
  155 + subKeys.forEach(key -> keyStates.put(key.getKey(), 0L));
  156 + if (entityData.getLatest() != null) {
  157 + Map<String, TsValue> currentValues = entityData.getLatest().get(keysType);
  158 + if (currentValues != null) {
  159 + currentValues.forEach((k, v) -> {
  160 + log.trace("[{}][{}] Updating key: {} with ts: {}", serviceId, cmdId, k, v.getTs());
  161 + keyStates.put(k, v.getTs());
  162 + });
  163 + }
  164 + }
  165 + return keyStates;
  166 + }
143 167
144 168 private void sendTsWsMsg(String sessionId, SubscriptionUpdate subscriptionUpdate) {
  169 + sendWsMsg(sessionId, subscriptionUpdate, EntityKeyType.TIME_SERIES);
  170 + }
  171 +
  172 + private void sendWsMsg(String sessionId, SubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) {
145 173 EntityId entityId = subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId());
146 174 if (entityId != null) {
147   - log.trace("[{}][{}][{}] Received subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), subscriptionUpdate);
  175 + log.trace("[{}][{}][{}][{}] Received subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate);
148 176 Map<String, TsValue> latestUpdate = new HashMap<>();
149 177 subscriptionUpdate.getData().forEach((k, v) -> {
150 178 Object[] data = (Object[]) v.get(0);
... ... @@ -152,7 +180,7 @@ public class TbEntityDataSubCtx {
152 180 });
153 181 EntityData entityData = getDataForEntity(entityId);
154 182 if (entityData != null && entityData.getLatest() != null) {
155   - Map<String, TsValue> latestCtxValues = entityData.getLatest().get(EntityKeyType.TIME_SERIES);
  183 + Map<String, TsValue> latestCtxValues = entityData.getLatest().get(keyType);
156 184 log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues);
157 185 if (latestCtxValues != null) {
158 186 latestCtxValues.forEach((k, v) -> {
... ... @@ -170,12 +198,12 @@ public class TbEntityDataSubCtx {
170 198 }
171 199 }
172 200 if (!latestUpdate.isEmpty()) {
173   - Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(EntityKeyType.TIME_SERIES, latestUpdate);
  201 + Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(keyType, latestUpdate);
174 202 entityData = new EntityData(entityId, latestMap, null);
175 203 wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData)));
176 204 }
177 205 } else {
178   - log.trace("[{}][{}][{}] Received stale subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), subscriptionUpdate);
  206 + log.trace("[{}][{}][{}][{}] Received stale subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate);
179 207 }
180 208 }
181 209
... ...
... ... @@ -28,6 +28,8 @@ import org.thingsboard.server.common.data.Device;
28 28 import org.thingsboard.server.common.data.Tenant;
29 29 import org.thingsboard.server.common.data.User;
30 30 import org.thingsboard.server.common.data.kv.Aggregation;
  31 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  32 +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
31 33 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
32 34 import org.thingsboard.server.common.data.kv.LongDataEntry;
33 35 import org.thingsboard.server.common.data.kv.TsKvEntry;
... ... @@ -41,6 +43,7 @@ import org.thingsboard.server.common.data.query.EntityKeyType;
41 43 import org.thingsboard.server.common.data.query.TsValue;
42 44 import org.thingsboard.server.common.data.security.Authority;
43 45 import org.thingsboard.server.dao.timeseries.TimeseriesService;
  46 +import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope;
44 47 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
45 48 import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
46 49 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
... ... @@ -48,6 +51,7 @@ import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
48 51 import org.thingsboard.server.service.telemetry.cmd.v2.EntityHistoryCmd;
49 52 import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd;
50 53
  54 +import java.util.ArrayList;
51 55 import java.util.Arrays;
52 56 import java.util.Collections;
53 57 import java.util.List;
... ... @@ -139,7 +143,7 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
139 143 List<TsKvEntry> tsData = Arrays.asList(dataPoint1, dataPoint2, dataPoint3);
140 144
141 145 sendTelemetry(device, tsData);
142   - Thread.sleep(1000);
  146 + Thread.sleep(100);
143 147
144 148 wsClient.send(mapper.writeValueAsString(wrapper));
145 149 msg = wsClient.waitForReply();
... ... @@ -156,24 +160,8 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
156 160 Assert.assertEquals(new TsValue(dataPoint3.getTs(), dataPoint3.getValueAsString()), tsArray[2]);
157 161 }
158 162
159   - private void sendTelemetry(Device device, List<TsKvEntry> tsData) throws InterruptedException {
160   - CountDownLatch latch = new CountDownLatch(1);
161   - tsService.saveAndNotify(device.getTenantId(), device.getId(), tsData, 0, new FutureCallback<Void>() {
162   - @Override
163   - public void onSuccess(@Nullable Void result) {
164   - latch.countDown();
165   - }
166   -
167   - @Override
168   - public void onFailure(Throwable t) {
169   - latch.countDown();
170   - }
171   - });
172   - latch.await(3, TimeUnit.SECONDS);
173   - }
174   -
175 163 @Test
176   - public void testEntityDataLatestWsCmd() throws Exception {
  164 + public void testEntityDataLatestTsWsCmd() throws Exception {
177 165 Device device = new Device();
178 166 device.setName("Device");
179 167 device.setType("default");
... ... @@ -210,6 +198,8 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
210 198 List<TsKvEntry> tsData = Arrays.asList(dataPoint1);
211 199 sendTelemetry(device, tsData);
212 200
  201 + Thread.sleep(100);
  202 +
213 203 cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
214 204 wrapper = new TelemetryPluginCmdsWrapper();
215 205 wrapper.setEntityDataCmds(Collections.singletonList(cmd));
... ... @@ -258,4 +248,267 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
258 248 Assert.assertNull(msg);
259 249 }
260 250
  251 + @Test
  252 + public void testEntityDataLatestAttrWsCmd() throws Exception {
  253 + Device device = new Device();
  254 + device.setName("Device");
  255 + device.setType("default");
  256 + device.setLabel("testLabel" + (int) (Math.random() * 1000));
  257 + device = doPost("/api/device", device, Device.class);
  258 +
  259 + long now = System.currentTimeMillis();
  260 +
  261 + DeviceTypeFilter dtf = new DeviceTypeFilter();
  262 + dtf.setDeviceNameFilter("D");
  263 + dtf.setDeviceType("default");
  264 + EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null),
  265 + Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
  266 +
  267 + LatestValueCmd latestCmd = new LatestValueCmd();
  268 + latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, "serverAttributeKey")));
  269 + EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
  270 +
  271 + TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
  272 + wrapper.setEntityDataCmds(Collections.singletonList(cmd));
  273 +
  274 + wsClient.send(mapper.writeValueAsString(wrapper));
  275 + String msg = wsClient.waitForReply();
  276 + EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class);
  277 + Assert.assertEquals(1, update.getCmdId());
  278 + PageData<EntityData> pageData = update.getData();
  279 + Assert.assertNotNull(pageData);
  280 + Assert.assertEquals(1, pageData.getData().size());
  281 + Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId());
  282 + Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey"));
  283 + Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey").getTs());
  284 + Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey").getValue());
  285 +
  286 + AttributeKvEntry dataPoint1 = new BaseAttributeKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("serverAttributeKey", 42L));
  287 + List<AttributeKvEntry> tsData = Arrays.asList(dataPoint1);
  288 + sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, tsData);
  289 +
  290 + Thread.sleep(100);
  291 +
  292 + cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
  293 + wrapper = new TelemetryPluginCmdsWrapper();
  294 + wrapper.setEntityDataCmds(Collections.singletonList(cmd));
  295 +
  296 + wsClient.send(mapper.writeValueAsString(wrapper));
  297 + msg = wsClient.waitForReply();
  298 + update = mapper.readValue(msg, EntityDataUpdate.class);
  299 +
  300 + Assert.assertEquals(1, update.getCmdId());
  301 +
  302 + pageData = update.getData();
  303 + Assert.assertNotNull(pageData);
  304 + Assert.assertEquals(1, pageData.getData().size());
  305 + Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId());
  306 + Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE));
  307 + TsValue tsValue = pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey");
  308 + Assert.assertEquals(new TsValue(dataPoint1.getLastUpdateTs(), dataPoint1.getValueAsString()), tsValue);
  309 +
  310 + now = System.currentTimeMillis();
  311 + AttributeKvEntry dataPoint2 = new BaseAttributeKvEntry(now, new LongDataEntry("serverAttributeKey", 52L));
  312 +
  313 + wsClient.registerWaitForUpdate();
  314 + sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Arrays.asList(dataPoint2));
  315 + msg = wsClient.waitForUpdate();
  316 +
  317 + update = mapper.readValue(msg, EntityDataUpdate.class);
  318 + Assert.assertEquals(1, update.getCmdId());
  319 + List<EntityData> eData = update.getUpdate();
  320 + Assert.assertNotNull(eData);
  321 + Assert.assertEquals(1, eData.size());
  322 + Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
  323 + Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE));
  324 + tsValue = eData.get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey");
  325 + Assert.assertEquals(new TsValue(dataPoint2.getLastUpdateTs(), dataPoint2.getValueAsString()), tsValue);
  326 +
  327 + //Sending update from the past, while latest value has new timestamp;
  328 + wsClient.registerWaitForUpdate();
  329 + sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Arrays.asList(dataPoint1));
  330 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  331 + Assert.assertNull(msg);
  332 +
  333 + //Sending duplicate update again
  334 + wsClient.registerWaitForUpdate();
  335 + sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Arrays.asList(dataPoint2));
  336 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  337 + Assert.assertNull(msg);
  338 + }
  339 +
  340 + @Test
  341 + public void testEntityDataLatestAttrTypesWsCmd() throws Exception {
  342 + Device device = new Device();
  343 + device.setName("Device");
  344 + device.setType("default");
  345 + device.setLabel("testLabel" + (int) (Math.random() * 1000));
  346 + device = doPost("/api/device", device, Device.class);
  347 +
  348 + long now = System.currentTimeMillis();
  349 +
  350 + DeviceTypeFilter dtf = new DeviceTypeFilter();
  351 + dtf.setDeviceNameFilter("D");
  352 + dtf.setDeviceType("default");
  353 + EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null),
  354 + Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
  355 +
  356 + LatestValueCmd latestCmd = new LatestValueCmd();
  357 + List<EntityKey> keys = new ArrayList<>();
  358 + keys.add(new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, "serverAttributeKey"));
  359 + keys.add(new EntityKey(EntityKeyType.CLIENT_ATTRIBUTE, "clientAttributeKey"));
  360 + keys.add(new EntityKey(EntityKeyType.SHARED_ATTRIBUTE, "sharedAttributeKey"));
  361 + keys.add(new EntityKey(EntityKeyType.ATTRIBUTE, "anyAttributeKey"));
  362 + latestCmd.setKeys(keys);
  363 + EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
  364 +
  365 + TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
  366 + wrapper.setEntityDataCmds(Collections.singletonList(cmd));
  367 +
  368 + wsClient.send(mapper.writeValueAsString(wrapper));
  369 + String msg = wsClient.waitForReply();
  370 + EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class);
  371 + Assert.assertEquals(1, update.getCmdId());
  372 + PageData<EntityData> pageData = update.getData();
  373 + Assert.assertNotNull(pageData);
  374 + Assert.assertEquals(1, pageData.getData().size());
  375 + Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId());
  376 + Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey"));
  377 + Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey").getTs());
  378 + Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey").getValue());
  379 + Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.CLIENT_ATTRIBUTE).get("clientAttributeKey"));
  380 + Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.CLIENT_ATTRIBUTE).get("clientAttributeKey").getTs());
  381 + Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.CLIENT_ATTRIBUTE).get("clientAttributeKey").getValue());
  382 + Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.SHARED_ATTRIBUTE).get("sharedAttributeKey"));
  383 + Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.SHARED_ATTRIBUTE).get("sharedAttributeKey").getTs());
  384 + Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.SHARED_ATTRIBUTE).get("sharedAttributeKey").getValue());
  385 + Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.ATTRIBUTE).get("anyAttributeKey"));
  386 + Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.ATTRIBUTE).get("anyAttributeKey").getTs());
  387 + Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.ATTRIBUTE).get("anyAttributeKey").getValue());
  388 +
  389 +
  390 + wsClient.registerWaitForUpdate();
  391 + AttributeKvEntry dataPoint1 = new BaseAttributeKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("serverAttributeKey", 42L));
  392 + List<AttributeKvEntry> tsData = Arrays.asList(dataPoint1);
  393 + sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, tsData);
  394 +
  395 + Thread.sleep(100);
  396 +
  397 + cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
  398 + wrapper = new TelemetryPluginCmdsWrapper();
  399 + wrapper.setEntityDataCmds(Collections.singletonList(cmd));
  400 +
  401 + msg = wsClient.waitForUpdate();
  402 +
  403 + update = mapper.readValue(msg, EntityDataUpdate.class);
  404 + Assert.assertEquals(1, update.getCmdId());
  405 + List<EntityData> eData = update.getUpdate();
  406 + Assert.assertNotNull(eData);
  407 + Assert.assertEquals(1, eData.size());
  408 + Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
  409 + Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE));
  410 + TsValue attrValue = eData.get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey");
  411 + Assert.assertEquals(new TsValue(dataPoint1.getLastUpdateTs(), dataPoint1.getValueAsString()), attrValue);
  412 +
  413 + //Sending update from the past, while latest value has new timestamp;
  414 + wsClient.registerWaitForUpdate();
  415 + sendAttributes(device, TbAttributeSubscriptionScope.SHARED_SCOPE, Arrays.asList(dataPoint1));
  416 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  417 + Assert.assertNull(msg);
  418 +
  419 + //Sending duplicate update again
  420 + wsClient.registerWaitForUpdate();
  421 + sendAttributes(device, TbAttributeSubscriptionScope.CLIENT_SCOPE, Arrays.asList(dataPoint1));
  422 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  423 + Assert.assertNull(msg);
  424 +
  425 + //Sending update from the past, while latest value has new timestamp;
  426 + wsClient.registerWaitForUpdate();
  427 + AttributeKvEntry dataPoint2 = new BaseAttributeKvEntry(now, new LongDataEntry("sharedAttributeKey", 42L));
  428 + sendAttributes(device, TbAttributeSubscriptionScope.SHARED_SCOPE, Arrays.asList(dataPoint2));
  429 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  430 + update = mapper.readValue(msg, EntityDataUpdate.class);
  431 + Assert.assertEquals(1, update.getCmdId());
  432 + eData = update.getUpdate();
  433 + Assert.assertNotNull(eData);
  434 + Assert.assertEquals(1, eData.size());
  435 + Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
  436 + Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.SHARED_ATTRIBUTE));
  437 + attrValue = eData.get(0).getLatest().get(EntityKeyType.SHARED_ATTRIBUTE).get("sharedAttributeKey");
  438 + Assert.assertEquals(new TsValue(dataPoint2.getLastUpdateTs(), dataPoint2.getValueAsString()), attrValue);
  439 +
  440 + wsClient.registerWaitForUpdate();
  441 + AttributeKvEntry dataPoint3 = new BaseAttributeKvEntry(now, new LongDataEntry("clientAttributeKey", 42L));
  442 + sendAttributes(device, TbAttributeSubscriptionScope.CLIENT_SCOPE, Arrays.asList(dataPoint3));
  443 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  444 + update = mapper.readValue(msg, EntityDataUpdate.class);
  445 + Assert.assertEquals(1, update.getCmdId());
  446 + eData = update.getUpdate();
  447 + Assert.assertNotNull(eData);
  448 + Assert.assertEquals(1, eData.size());
  449 + Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
  450 + Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.CLIENT_ATTRIBUTE));
  451 + attrValue = eData.get(0).getLatest().get(EntityKeyType.CLIENT_ATTRIBUTE).get("clientAttributeKey");
  452 + Assert.assertEquals(new TsValue(dataPoint3.getLastUpdateTs(), dataPoint3.getValueAsString()), attrValue);
  453 +
  454 + wsClient.registerWaitForUpdate();
  455 + AttributeKvEntry dataPoint4 = new BaseAttributeKvEntry(now, new LongDataEntry("anyAttributeKey", 42L));
  456 + sendAttributes(device, TbAttributeSubscriptionScope.CLIENT_SCOPE, Arrays.asList(dataPoint4));
  457 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  458 + update = mapper.readValue(msg, EntityDataUpdate.class);
  459 + Assert.assertEquals(1, update.getCmdId());
  460 + eData = update.getUpdate();
  461 + Assert.assertNotNull(eData);
  462 + Assert.assertEquals(1, eData.size());
  463 + Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
  464 + Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.ATTRIBUTE));
  465 + attrValue = eData.get(0).getLatest().get(EntityKeyType.ATTRIBUTE).get("anyAttributeKey");
  466 + Assert.assertEquals(new TsValue(dataPoint4.getLastUpdateTs(), dataPoint4.getValueAsString()), attrValue);
  467 +
  468 + wsClient.registerWaitForUpdate();
  469 + AttributeKvEntry dataPoint5 = new BaseAttributeKvEntry(now, new LongDataEntry("anyAttributeKey", 43L));
  470 + sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Arrays.asList(dataPoint5));
  471 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  472 + update = mapper.readValue(msg, EntityDataUpdate.class);
  473 + Assert.assertEquals(1, update.getCmdId());
  474 + eData = update.getUpdate();
  475 + Assert.assertNotNull(eData);
  476 + Assert.assertEquals(1, eData.size());
  477 + Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
  478 + Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.ATTRIBUTE));
  479 + attrValue = eData.get(0).getLatest().get(EntityKeyType.ATTRIBUTE).get("anyAttributeKey");
  480 + Assert.assertEquals(new TsValue(dataPoint5.getLastUpdateTs(), dataPoint5.getValueAsString()), attrValue);
  481 + }
  482 +
  483 + private void sendTelemetry(Device device, List<TsKvEntry> tsData) throws InterruptedException {
  484 + CountDownLatch latch = new CountDownLatch(1);
  485 + tsService.saveAndNotify(device.getTenantId(), device.getId(), tsData, 0, new FutureCallback<Void>() {
  486 + @Override
  487 + public void onSuccess(@Nullable Void result) {
  488 + latch.countDown();
  489 + }
  490 +
  491 + @Override
  492 + public void onFailure(Throwable t) {
  493 + latch.countDown();
  494 + }
  495 + });
  496 + latch.await(3, TimeUnit.SECONDS);
  497 + }
  498 +
  499 + private void sendAttributes(Device device, TbAttributeSubscriptionScope scope, List<AttributeKvEntry> attrData) throws InterruptedException {
  500 + CountDownLatch latch = new CountDownLatch(1);
  501 + tsService.saveAndNotify(device.getTenantId(), device.getId(), scope.name(), attrData, new FutureCallback<Void>() {
  502 + @Override
  503 + public void onSuccess(@Nullable Void result) {
  504 + latch.countDown();
  505 + }
  506 +
  507 + @Override
  508 + public void onFailure(Throwable t) {
  509 + latch.countDown();
  510 + }
  511 + });
  512 + latch.await(3, TimeUnit.SECONDS);
  513 + }
261 514 }
... ...
... ... @@ -136,7 +136,7 @@ public class EntityKeyMapping {
136 136 } else {
137 137 scope = DataConstants.SERVER_SCOPE;
138 138 }
139   - query = String.format("%s AND %s.attribute_type=%s", query, alias, scope);
  139 + query = String.format("%s AND %s.attribute_type='%s'", query, alias, scope);
140 140 }
141 141 return query;
142 142 }
... ...