Commit cc1887fc8dac658535b2c6c2858f210b7cc3e936

Authored by Andrii Shvaika
1 parent 9e27d453

Added WS notification on deleted attribute

... ... @@ -355,10 +355,9 @@ public class TelemetryController extends BaseController {
355 355 DataConstants.SHARED_SCOPE.equals(scope) ||
356 356 DataConstants.CLIENT_SCOPE.equals(scope)) {
357 357 return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.WRITE_ATTRIBUTES, entityIdSrc, (result, tenantId, entityId) -> {
358   - ListenableFuture<List<Void>> future = attributesService.removeAll(user.getTenantId(), entityId, scope, keys);
359   - Futures.addCallback(future, new FutureCallback<List<Void>>() {
  358 + tsSubService.deleteAndNotify(tenantId, entityId, scope, keys, new FutureCallback<Void>() {
360 359 @Override
361   - public void onSuccess(@Nullable List<Void> tmp) {
  360 + public void onSuccess(@Nullable Void tmp) {
362 361 logAttributesDeleted(user, entityId, scope, keys, null);
363 362 if (entityIdSrc.getEntityType().equals(EntityType.DEVICE)) {
364 363 DeviceId deviceId = new DeviceId(entityId.getId());
... ... @@ -375,7 +374,7 @@ public class TelemetryController extends BaseController {
375 374 logAttributesDeleted(user, entityId, scope, keys, t);
376 375 result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
377 376 }
378   - }, executor);
  377 + });
379 378 });
380 379 } else {
381 380 return getImmediateDeferredResult("Invalid attribute scope: " + scope, HttpStatus.BAD_REQUEST);
... ...
... ... @@ -31,6 +31,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCRespons
31 31 import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto;
32 32 import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto;
33 33 import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto;
  34 +import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto;
34 35 import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto;
35 36 import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto;
36 37 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
... ... @@ -54,6 +55,7 @@ import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWra
54 55
55 56 import javax.annotation.PostConstruct;
56 57 import javax.annotation.PreDestroy;
  58 +import java.util.ArrayList;
57 59 import java.util.List;
58 60 import java.util.Optional;
59 61 import java.util.UUID;
... ... @@ -259,6 +261,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
259 261 new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
260 262 TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
261 263 proto.getScope(), TbSubscriptionUtils.toAttributeKvList(proto.getDataList()), callback);
  264 + } else if (msg.hasAttrDelete()) {
  265 + TbAttributeDeleteProto proto = msg.getAttrDelete();
  266 + subscriptionManagerService.onAttributesDelete(
  267 + new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
  268 + TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
  269 + proto.getScope(), proto.getKeysList(), callback);
262 270 } else {
263 271 throwNotHandled(msg, callback);
264 272 }
... ...
... ... @@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
32 32 import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
33 33 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
34 34 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
  35 +import org.thingsboard.server.common.data.kv.StringDataEntry;
35 36 import org.thingsboard.server.common.data.kv.TsKvEntry;
36 37 import org.thingsboard.server.common.msg.queue.ServiceType;
37 38 import org.thingsboard.server.common.msg.queue.TbCallback;
... ... @@ -253,6 +254,32 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
253 254 callback.onSuccess();
254 255 }
255 256
  257 + @Override
  258 + public void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback callback) {
  259 + onLocalSubUpdate(entityId,
  260 + s -> {
  261 + if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) {
  262 + return (TbAttributeSubscription) s;
  263 + } else {
  264 + return null;
  265 + }
  266 + },
  267 + s -> (TbAttributeSubscriptionScope.ANY_SCOPE.equals(s.getScope()) || scope.equals(s.getScope().name())),
  268 + s -> {
  269 + List<TsKvEntry> subscriptionUpdate = null;
  270 + for (String key : keys) {
  271 + if (s.isAllKeys() || s.getKeyStates().containsKey(key)) {
  272 + if (subscriptionUpdate == null) {
  273 + subscriptionUpdate = new ArrayList<>();
  274 + }
  275 + subscriptionUpdate.add(new BasicTsKvEntry(0, new StringDataEntry(key, null)));
  276 + }
  277 + }
  278 + return subscriptionUpdate;
  279 + });
  280 + callback.onSuccess();
  281 + }
  282 +
256 283 private <T extends TbSubscription> void onLocalSubUpdate(EntityId entityId,
257 284 Function<TbSubscription, T> castFunction,
258 285 Predicate<T> filterFunction,
... ...
... ... @@ -35,4 +35,5 @@ public interface SubscriptionManagerService extends ApplicationListener<Partitio
35 35
36 36 void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback);
37 37
  38 + void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback empty);
38 39 }
... ...
... ... @@ -218,12 +218,17 @@ public class TbEntityDataSubCtx {
218 218 latestCtxValues.forEach((k, v) -> {
219 219 TsValue update = latestUpdate.get(k);
220 220 if (update != null) {
221   - if (update.getTs() < v.getTs()) {
222   - log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
223   - latestUpdate.remove(k);
224   - } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) {
225   - log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
226   - latestUpdate.remove(k);
  221 + //Ignore notifications about deleted keys
  222 + if (!(update.getTs() == 0 && (update.getValue() == null || update.getValue().isEmpty()))) {
  223 + if (update.getTs() < v.getTs()) {
  224 + log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
  225 + latestUpdate.remove(k);
  226 + } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) {
  227 + log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
  228 + latestUpdate.remove(k);
  229 + }
  230 + } else {
  231 + log.trace("[{}][{}][{}] Received deleted notification for: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k);
227 232 }
228 233 }
229 234 });
... ...
... ... @@ -34,6 +34,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
34 34 import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto;
35 35 import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeSubscriptionProto;
36 36 import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto;
  37 +import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto;
37 38 import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto;
38 39 import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionKetStateProto;
39 40 import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionProto;
... ... @@ -182,6 +183,22 @@ public class TbSubscriptionUtils {
182 183 return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
183 184 }
184 185
  186 + public static ToCoreMsg toAttributesDeleteProto(TenantId tenantId, EntityId entityId, String scope, List<String> keys) {
  187 + TbAttributeDeleteProto.Builder builder = TbAttributeDeleteProto.newBuilder();
  188 + builder.setEntityType(entityId.getEntityType().name());
  189 + builder.setEntityIdMSB(entityId.getId().getMostSignificantBits());
  190 + builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
  191 + builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
  192 + builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
  193 + builder.setScope(scope);
  194 + builder.addAllKeys(keys);
  195 +
  196 + SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder();
  197 + msgBuilder.setAttrDelete(builder);
  198 + return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
  199 + }
  200 +
  201 +
185 202 private static TsKvProto.Builder toKeyValueProto(long ts, KvEntry attr) {
186 203 KeyValueProto.Builder dataBuilder = KeyValueProto.newBuilder();
187 204 dataBuilder.setKey(attr.getKey());
... ...
... ... @@ -134,6 +134,13 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
134 134 }
135 135
136 136 @Override
  137 + public void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback) {
  138 + ListenableFuture<List<Void>> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys);
  139 + addMainCallback(deleteFuture, callback);
  140 + addWsCallback(deleteFuture, success -> onAttributesDelete(tenantId, entityId, scope, keys));
  141 + }
  142 +
  143 + @Override
137 144 public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback) {
138 145 saveAndNotify(tenantId, entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value)
139 146 , System.currentTimeMillis())), callback);
... ... @@ -171,6 +178,20 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
171 178 }
172 179 }
173 180
  181 + private void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys) {
  182 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
  183 + if (currentPartitions.contains(tpi)) {
  184 + if (subscriptionManagerService.isPresent()) {
  185 + subscriptionManagerService.get().onAttributesDelete(tenantId, entityId, scope, keys, TbCallback.EMPTY);
  186 + } else {
  187 + log.warn("Possible misconfiguration because subscriptionManagerService is null!");
  188 + }
  189 + } else {
  190 + TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toAttributesDeleteProto(tenantId, entityId, scope, keys);
  191 + clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null);
  192 + }
  193 + }
  194 +
174 195 private void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts) {
175 196 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
176 197 if (currentPartitions.contains(tpi)) {
... ...
... ... @@ -30,6 +30,8 @@
30 30 <!-- <logger name="org.thingsboard.server.service.queue" level="TRACE" />-->
31 31 <!-- <logger name="org.thingsboard.server.service.transport" level="TRACE" />-->
32 32
  33 +<!-- <logger name="org.thingsboard.server.service.subscription" level="TRACE"/>-->
  34 +<!-- <logger name="org.thingsboard.server.service.telemetry" level="TRACE"/>-->
33 35 <logger name="com.microsoft.azure.servicebus.primitives.CoreMessageReceiver" level="OFF" />
34 36
35 37 <root level="INFO">
... ...
... ... @@ -295,6 +295,16 @@ message TbAttributeUpdateProto {
295 295 repeated TsKvProto data = 7;
296 296 }
297 297
  298 +message TbAttributeDeleteProto {
  299 + string entityType = 1;
  300 + int64 entityIdMSB = 2;
  301 + int64 entityIdLSB = 3;
  302 + int64 tenantIdMSB = 4;
  303 + int64 tenantIdLSB = 5;
  304 + string scope = 6;
  305 + repeated string keys = 7;
  306 +}
  307 +
298 308 message TbTimeSeriesUpdateProto {
299 309 string entityType = 1;
300 310 int64 entityIdMSB = 2;
... ... @@ -340,6 +350,7 @@ message SubscriptionMgrMsgProto {
340 350 TbSubscriptionCloseProto subClose = 3;
341 351 TbTimeSeriesUpdateProto tsUpdate = 4;
342 352 TbAttributeUpdateProto attrUpdate = 5;
  353 + TbAttributeDeleteProto attrDelete = 6;
343 354 }
344 355
345 356 message LocalSubscriptionServiceMsgProto {
... ...
... ... @@ -398,7 +398,7 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
398 398 //TODO: fetch last level only.
399 399 //TODO: fetch distinct records.
400 400 String lvlFilter = getLvlFilter(entityFilter.getMaxLevel());
401   - String selectFields = "SELECT tenant_id, customer_id, id, type, name, label FROM " + entityType.name() + " WHERE id in ( SELECT entity_id";
  401 + String selectFields = "SELECT tenant_id, customer_id, id, created_time, type, name, label FROM " + entityType.name() + " WHERE id in ( SELECT entity_id";
402 402 String from = getQueryTemplate(entityFilter.getDirection());
403 403 String whereFilter = " WHERE re.relation_type = :where_relation_type AND re.to_type = :where_entity_type";
404 404
... ...
... ... @@ -24,7 +24,7 @@ import java.util.Arrays;
24 24
25 25 @RunWith(ClasspathSuite.class)
26 26 @ClassnameFilters({
27   - "org.thingsboard.server.dao.service.sql.*SqlTest"
  27 + "org.thingsboard.server.dao.service.sql.EntityServiceSqlTest"
28 28 })
29 29 public class SqlDaoServiceTestSuite {
30 30
... ...
... ... @@ -61,6 +61,7 @@ import org.thingsboard.server.dao.attributes.AttributesService;
61 61 import java.util.ArrayList;
62 62 import java.util.Arrays;
63 63 import java.util.Collections;
  64 +import java.util.Comparator;
64 65 import java.util.List;
65 66 import java.util.concurrent.ExecutionException;
66 67 import java.util.stream.Collectors;
... ... @@ -490,13 +491,16 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest {
490 491
491 492 List<EntityId> loadedIds = loadedEntities.stream().map(EntityData::getEntityId).collect(Collectors.toList());
492 493 List<EntityId> deviceIds = devices.stream().map(Device::getId).collect(Collectors.toList());
493   -
  494 + deviceIds.sort(Comparator.comparing(EntityId::getId));
  495 + loadedIds.sort(Comparator.comparing(EntityId::getId));
494 496 Assert.assertEquals(deviceIds, loadedIds);
495 497
496 498 List<String> loadedNames = loadedEntities.stream().map(entityData ->
497 499 entityData.getLatest().get(EntityKeyType.ENTITY_FIELD).get("name").getValue()).collect(Collectors.toList());
498 500 List<String> deviceNames = devices.stream().map(Device::getName).collect(Collectors.toList());
499 501
  502 + Collections.sort(loadedNames);
  503 + Collections.sort(deviceNames);
500 504 Assert.assertEquals(deviceNames, loadedNames);
501 505
502 506 sortOrder = new EntityDataSortOrder(
... ... @@ -560,8 +564,11 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest {
560 564 loadedEntities.addAll(data.getData());
561 565 }
562 566 Assert.assertEquals(67, loadedEntities.size());
563   - List<String> loadedTemperatures = loadedEntities.stream().map(entityData ->
564   - entityData.getLatest().get(EntityKeyType.ATTRIBUTE).get("temperature").getValue()).collect(Collectors.toList());
  567 + List<String> loadedTemperatures = new ArrayList<>();
  568 + for (Device device : devices) {
  569 + loadedTemperatures.add(loadedEntities.stream().filter(entityData -> entityData.getEntityId().equals(device.getId())).findFirst().orElse(null)
  570 + .getLatest().get(EntityKeyType.ATTRIBUTE).get("temperature").getValue());
  571 + }
565 572 List<String> deviceTemperatures = temperatures.stream().map(aLong -> Long.toString(aLong)).collect(Collectors.toList());
566 573 Assert.assertEquals(deviceTemperatures, loadedTemperatures);
567 574
... ...
... ... @@ -44,4 +44,7 @@ public interface RuleEngineTelemetryService {
44 44
45 45 void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, boolean value, FutureCallback<Void> callback);
46 46
  47 + void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback);
  48 +
  49 +
47 50 }
... ...