Commit befba2bb0f0d95f4cc08387a09add1cd7b0403fd

Authored by Andrii Shvaika
1 parent e6e9be18

WS API Improvements

@@ -171,11 +171,10 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -171,11 +171,10 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
171 TbEntityDataSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId()); 171 TbEntityDataSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId());
172 if (ctx != null) { 172 if (ctx != null) {
173 log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd); 173 log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd);
174 - if (cmd.getLatestCmd() != null || cmd.getTsCmd() != null) { 174 + if (cmd.getLatestCmd() != null || cmd.getTsCmd() != null || cmd.getHistoryCmd() != null) {
175 Collection<Integer> oldSubIds = ctx.clearSubscriptions(); 175 Collection<Integer> oldSubIds = ctx.clearSubscriptions();
176 oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId)); 176 oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId));
177 } 177 }
178 - //TODO: cleanup old subscription;  
179 } else { 178 } else {
180 log.debug("[{}][{}] Creating new subscription using: {}", session.getSessionId(), cmd.getCmdId(), cmd); 179 log.debug("[{}][{}] Creating new subscription using: {}", session.getSessionId(), cmd.getCmdId(), cmd);
181 ctx = createSubCtx(session, cmd); 180 ctx = createSubCtx(session, cmd);
@@ -37,10 +37,11 @@ import java.util.ArrayList; @@ -37,10 +37,11 @@ import java.util.ArrayList;
37 import java.util.Arrays; 37 import java.util.Arrays;
38 import java.util.Collection; 38 import java.util.Collection;
39 import java.util.Collections; 39 import java.util.Collections;
  40 +import java.util.Comparator;
40 import java.util.HashMap; 41 import java.util.HashMap;
41 import java.util.List; 42 import java.util.List;
42 import java.util.Map; 43 import java.util.Map;
43 -import java.util.function.BiConsumer; 44 +import java.util.Optional;
44 45
45 @Slf4j 46 @Slf4j
46 @Data 47 @Data
@@ -145,12 +146,7 @@ public class TbEntityDataSubCtx { @@ -145,12 +146,7 @@ public class TbEntityDataSubCtx {
145 .subscriptionId(subIdx) 146 .subscriptionId(subIdx)
146 .tenantId(sessionRef.getSecurityCtx().getTenantId()) 147 .tenantId(sessionRef.getSecurityCtx().getTenantId())
147 .entityId(entityData.getEntityId()) 148 .entityId(entityData.getEntityId())
148 - .updateConsumer(new BiConsumer<String, SubscriptionUpdate>() {  
149 - @Override  
150 - public void accept(String sessionId, SubscriptionUpdate subscriptionUpdate) {  
151 - sendWsMsg(sessionId, subscriptionUpdate, EntityKeyType.TIME_SERIES, resultToLatestValues);  
152 - }  
153 - }) 149 + .updateConsumer((sessionId, subscriptionUpdate) -> sendWsMsg(sessionId, subscriptionUpdate, EntityKeyType.TIME_SERIES, resultToLatestValues))
154 .allKeys(false) 150 .allKeys(false)
155 .keyStates(keyStates) 151 .keyStates(keyStates)
156 .build(); 152 .build();
@@ -179,47 +175,90 @@ public class TbEntityDataSubCtx { @@ -179,47 +175,90 @@ public class TbEntityDataSubCtx {
179 EntityId entityId = subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId()); 175 EntityId entityId = subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId());
180 if (entityId != null) { 176 if (entityId != null) {
181 log.trace("[{}][{}][{}][{}] Received subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate); 177 log.trace("[{}][{}][{}][{}] Received subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate);
182 - Map<String, TsValue> latestUpdate = new HashMap<>();  
183 - subscriptionUpdate.getData().forEach((k, v) -> {  
184 - Object[] data = (Object[]) v.get(0);  
185 - latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1]));  
186 - });  
187 - EntityData entityData = getDataForEntity(entityId);  
188 - if (entityData != null && entityData.getLatest() != null) {  
189 - Map<String, TsValue> latestCtxValues = entityData.getLatest().get(keyType);  
190 - log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues);  
191 - if (latestCtxValues != null) {  
192 - latestCtxValues.forEach((k, v) -> {  
193 - TsValue update = latestUpdate.get(k);  
194 - if (update != null) { 178 + if (resultToLatestValues) {
  179 + sendLatestWsMsg(entityId, sessionId, subscriptionUpdate, keyType);
  180 + } else {
  181 + sendTsWsMsg(entityId, sessionId, subscriptionUpdate, keyType);
  182 + }
  183 + } else {
  184 + log.trace("[{}][{}][{}][{}] Received stale subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate);
  185 + }
  186 + }
  187 +
  188 + private void sendLatestWsMsg(EntityId entityId, String sessionId, SubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) {
  189 + Map<String, TsValue> latestUpdate = new HashMap<>();
  190 + subscriptionUpdate.getData().forEach((k, v) -> {
  191 + Object[] data = (Object[]) v.get(0);
  192 + latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1]));
  193 + });
  194 + EntityData entityData = getDataForEntity(entityId);
  195 + if (entityData != null && entityData.getLatest() != null) {
  196 + Map<String, TsValue> latestCtxValues = entityData.getLatest().get(keyType);
  197 + log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues);
  198 + if (latestCtxValues != null) {
  199 + latestCtxValues.forEach((k, v) -> {
  200 + TsValue update = latestUpdate.get(k);
  201 + if (update != null) {
  202 + if (update.getTs() < v.getTs()) {
  203 + log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
  204 + latestUpdate.remove(k);
  205 + } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) {
  206 + log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
  207 + latestUpdate.remove(k);
  208 + }
  209 + }
  210 + });
  211 + //Setting new values
  212 + latestUpdate.forEach(latestCtxValues::put);
  213 + }
  214 + }
  215 + if (!latestUpdate.isEmpty()) {
  216 + Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(keyType, latestUpdate);
  217 + entityData = new EntityData(entityId, latestMap, null);
  218 + wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData)));
  219 + }
  220 + }
  221 +
  222 + private void sendTsWsMsg(EntityId entityId, String sessionId, SubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) {
  223 + Map<String, List<TsValue>> tsUpdate = new HashMap<>();
  224 + subscriptionUpdate.getData().forEach((k, v) -> {
  225 + Object[] data = (Object[]) v.get(0);
  226 + tsUpdate.computeIfAbsent(k, key -> new ArrayList<>()).add(new TsValue((Long) data[0], (String) data[1]));
  227 + });
  228 + EntityData entityData = getDataForEntity(entityId);
  229 + if (entityData != null && entityData.getLatest() != null) {
  230 + Map<String, TsValue> latestCtxValues = entityData.getLatest().get(keyType);
  231 + log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues);
  232 + if (latestCtxValues != null) {
  233 + latestCtxValues.forEach((k, v) -> {
  234 + List<TsValue> updateList = tsUpdate.get(k);
  235 + if (updateList != null) {
  236 + for (TsValue update : new ArrayList<>(updateList)) {
195 if (update.getTs() < v.getTs()) { 237 if (update.getTs() < v.getTs()) {
196 log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); 238 log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
197 - latestUpdate.remove(k); 239 + updateList.remove(update);
198 } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) { 240 } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) {
199 log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); 241 log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
200 - latestUpdate.remove(k); 242 + updateList.remove(update);
  243 + }
  244 + if (updateList.isEmpty()) {
  245 + tsUpdate.remove(k);
201 } 246 }
202 } 247 }
203 - });  
204 - //Setting new values  
205 - latestUpdate.forEach(latestCtxValues::put);  
206 - }  
207 - }  
208 - if (!latestUpdate.isEmpty()) {  
209 - if (resultToLatestValues) {  
210 - Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(keyType, latestUpdate);  
211 - entityData = new EntityData(entityId, latestMap, null);  
212 - } else {  
213 - Map<String, TsValue[]> tsMap = new HashMap<>();  
214 - latestUpdate.forEach((key, tsValue) -> {  
215 - tsMap.put(key, new TsValue[]{tsValue});  
216 - });  
217 - entityData = new EntityData(entityId, null, tsMap);  
218 - }  
219 - wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData))); 248 + }
  249 + });
  250 + //Setting new values
  251 + tsUpdate.forEach((k, v) -> {
  252 + Optional<TsValue> maxValue = v.stream().max(Comparator.comparingLong(TsValue::getTs));
  253 + maxValue.ifPresent(max -> latestCtxValues.put(k, max));
  254 + });
220 } 255 }
221 - } else {  
222 - log.trace("[{}][{}][{}][{}] Received stale subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate); 256 + }
  257 + if (!tsUpdate.isEmpty()) {
  258 + Map<String, TsValue[]> tsMap = new HashMap<>();
  259 + tsUpdate.forEach((key, tsValue) -> tsMap.put(key, tsValue.toArray(new TsValue[tsValue.size()])));
  260 + entityData = new EntityData(entityId, null, tsMap);
  261 + wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData)));
223 } 262 }
224 } 263 }
225 264
@@ -21,6 +21,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; @@ -21,6 +21,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
21 import org.junit.After; 21 import org.junit.After;
22 import org.junit.Assert; 22 import org.junit.Assert;
23 import org.junit.Before; 23 import org.junit.Before;
  24 +import org.junit.Ignore;
24 import org.junit.Test; 25 import org.junit.Test;
25 import org.springframework.beans.factory.annotation.Autowired; 26 import org.springframework.beans.factory.annotation.Autowired;
26 import org.thingsboard.server.common.data.Device; 27 import org.thingsboard.server.common.data.Device;
@@ -113,7 +114,9 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { @@ -113,7 +114,9 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
113 DeviceTypeFilter dtf = new DeviceTypeFilter(); 114 DeviceTypeFilter dtf = new DeviceTypeFilter();
114 dtf.setDeviceNameFilter("D"); 115 dtf.setDeviceNameFilter("D");
115 dtf.setDeviceType("default"); 116 dtf.setDeviceType("default");
116 - EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null), Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); 117 + EntityDataQuery edq = new EntityDataQuery(dtf,
  118 + new EntityDataPageLink(1, 0, null, null),
  119 + Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
117 120
118 EntityHistoryCmd historyCmd = new EntityHistoryCmd(); 121 EntityHistoryCmd historyCmd = new EntityHistoryCmd();
119 historyCmd.setKeys(Arrays.asList("temperature")); 122 historyCmd.setKeys(Arrays.asList("temperature"));
@@ -148,11 +151,11 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { @@ -148,11 +151,11 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
148 msg = wsClient.waitForReply(); 151 msg = wsClient.waitForReply();
149 update = mapper.readValue(msg, EntityDataUpdate.class); 152 update = mapper.readValue(msg, EntityDataUpdate.class);
150 Assert.assertEquals(1, update.getCmdId()); 153 Assert.assertEquals(1, update.getCmdId());
151 - pageData = update.getData();  
152 - Assert.assertNotNull(pageData);  
153 - Assert.assertEquals(1, pageData.getData().size());  
154 - Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId());  
155 - TsValue[] tsArray = pageData.getData().get(0).getTimeseries().get("temperature"); 154 + List<EntityData> dataList = update.getUpdate();
  155 + Assert.assertNotNull(dataList);
  156 + Assert.assertEquals(1, dataList.size());
  157 + Assert.assertEquals(device.getId(), dataList.get(0).getEntityId());
  158 + TsValue[] tsArray = dataList.get(0).getTimeseries().get("temperature");
156 Assert.assertEquals(3, tsArray.length); 159 Assert.assertEquals(3, tsArray.length);
157 Assert.assertEquals(new TsValue(dataPoint1.getTs(), dataPoint1.getValueAsString()), tsArray[0]); 160 Assert.assertEquals(new TsValue(dataPoint1.getTs(), dataPoint1.getValueAsString()), tsArray[0]);
158 Assert.assertEquals(new TsValue(dataPoint2.getTs(), dataPoint2.getValueAsString()), tsArray[1]); 161 Assert.assertEquals(new TsValue(dataPoint2.getTs(), dataPoint2.getValueAsString()), tsArray[1]);
@@ -223,8 +226,8 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { @@ -223,8 +226,8 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
223 226
224 now = System.currentTimeMillis(); 227 now = System.currentTimeMillis();
225 TsKvEntry dataPoint4 = new BasicTsKvEntry(now, new LongDataEntry("temperature", 45L)); 228 TsKvEntry dataPoint4 = new BasicTsKvEntry(now, new LongDataEntry("temperature", 45L));
226 -  
227 wsClient.registerWaitForUpdate(); 229 wsClient.registerWaitForUpdate();
  230 + Thread.sleep(100);
228 sendTelemetry(device, Arrays.asList(dataPoint4)); 231 sendTelemetry(device, Arrays.asList(dataPoint4));
229 msg = wsClient.waitForUpdate(); 232 msg = wsClient.waitForUpdate();
230 233
@@ -26,9 +26,9 @@ import java.util.Arrays; @@ -26,9 +26,9 @@ import java.util.Arrays;
26 26
27 @RunWith(ClasspathSuite.class) 27 @RunWith(ClasspathSuite.class)
28 @ClasspathSuite.ClassnameFilters({ 28 @ClasspathSuite.ClassnameFilters({
29 - "org.thingsboard.server.controller.sql.WebsocketApiSqlTest", 29 +// "org.thingsboard.server.controller.sql.WebsocketApiSqlTest",
30 // "org.thingsboard.server.controller.sql.EntityQueryControllerSqlTest", 30 // "org.thingsboard.server.controller.sql.EntityQueryControllerSqlTest",
31 -// "org.thingsboard.server.controller.sql.*Test", 31 + "org.thingsboard.server.controller.sql.*Test",
32 }) 32 })
33 public class ControllerSqlTestSuite { 33 public class ControllerSqlTestSuite {
34 34
@@ -7,7 +7,7 @@ @@ -7,7 +7,7 @@
7 </encoder> 7 </encoder>
8 </appender> 8 </appender>
9 9
10 - <logger name="org.thingsboard.server.service.subscription" level="TRACE"/> 10 +<!-- <logger name="org.thingsboard.server.service.subscription" level="TRACE"/>-->
11 <logger name="org.thingsboard.server.controller.TbTestWebSocketClient" level="INFO"/> 11 <logger name="org.thingsboard.server.controller.TbTestWebSocketClient" level="INFO"/>
12 <logger name="org.thingsboard.server" level="WARN"/> 12 <logger name="org.thingsboard.server" level="WARN"/>
13 <logger name="org.springframework" level="WARN"/> 13 <logger name="org.springframework" level="WARN"/>