Commit 291634c10fb62fe00843fbfeaff0c6e216bf7ad8

Authored by Andrew Shvayka
1 parent 95865693

Fix for notification to the devices when shared attribute is saved via rule engine

@@ -24,12 +24,15 @@ import org.springframework.beans.factory.annotation.Autowired; @@ -24,12 +24,15 @@ import org.springframework.beans.factory.annotation.Autowired;
24 import org.springframework.context.annotation.Lazy; 24 import org.springframework.context.annotation.Lazy;
25 import org.springframework.stereotype.Service; 25 import org.springframework.stereotype.Service;
26 import org.springframework.util.StringUtils; 26 import org.springframework.util.StringUtils;
  27 +import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
27 import org.thingsboard.rule.engine.api.util.DonAsynchron; 28 import org.thingsboard.rule.engine.api.util.DonAsynchron;
  29 +import org.thingsboard.server.actors.service.ActorService;
28 import org.thingsboard.server.common.data.DataConstants; 30 import org.thingsboard.server.common.data.DataConstants;
29 import org.thingsboard.server.common.data.EntityType; 31 import org.thingsboard.server.common.data.EntityType;
30 import org.thingsboard.server.common.data.id.DeviceId; 32 import org.thingsboard.server.common.data.id.DeviceId;
31 import org.thingsboard.server.common.data.id.EntityId; 33 import org.thingsboard.server.common.data.id.EntityId;
32 import org.thingsboard.server.common.data.id.EntityIdFactory; 34 import org.thingsboard.server.common.data.id.EntityIdFactory;
  35 +import org.thingsboard.server.common.data.id.TenantId;
33 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 36 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
34 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; 37 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
35 import org.thingsboard.server.common.data.kv.BaseTsKvQuery; 38 import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
@@ -42,6 +45,7 @@ import org.thingsboard.server.common.data.kv.LongDataEntry; @@ -42,6 +45,7 @@ import org.thingsboard.server.common.data.kv.LongDataEntry;
42 import org.thingsboard.server.common.data.kv.StringDataEntry; 45 import org.thingsboard.server.common.data.kv.StringDataEntry;
43 import org.thingsboard.server.common.data.kv.TsKvEntry; 46 import org.thingsboard.server.common.data.kv.TsKvEntry;
44 import org.thingsboard.server.common.data.kv.TsKvQuery; 47 import org.thingsboard.server.common.data.kv.TsKvQuery;
  48 +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
45 import org.thingsboard.server.common.msg.cluster.ServerAddress; 49 import org.thingsboard.server.common.msg.cluster.ServerAddress;
46 import org.thingsboard.server.dao.attributes.AttributesService; 50 import org.thingsboard.server.dao.attributes.AttributesService;
47 import org.thingsboard.server.dao.timeseries.TimeseriesService; 51 import org.thingsboard.server.dao.timeseries.TimeseriesService;
@@ -101,6 +105,10 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @@ -101,6 +105,10 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
101 @Lazy 105 @Lazy
102 private DeviceStateService stateService; 106 private DeviceStateService stateService;
103 107
  108 + @Autowired
  109 + @Lazy
  110 + private ActorService actorService;
  111 +
104 private ExecutorService tsCallBackExecutor; 112 private ExecutorService tsCallBackExecutor;
105 private ExecutorService wsCallBackExecutor; 113 private ExecutorService wsCallBackExecutor;
106 114
@@ -204,6 +212,13 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @@ -204,6 +212,13 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
204 } 212 }
205 213
206 @Override 214 @Override
  215 + public void onSharedAttributesUpdate(TenantId tenantId, DeviceId deviceId, Set<AttributeKvEntry> attributes) {
  216 + DeviceAttributesEventNotificationMsg notificationMsg = DeviceAttributesEventNotificationMsg.onUpdate(tenantId,
  217 + deviceId, DataConstants.SHARED_SCOPE, new ArrayList<>(attributes));
  218 + actorService.onMsg(new SendToClusterMsg(deviceId, notificationMsg));
  219 + }
  220 +
  221 + @Override
207 public void onNewRemoteSubscription(ServerAddress serverAddress, byte[] data) { 222 public void onNewRemoteSubscription(ServerAddress serverAddress, byte[] data) {
208 ClusterAPIProtos.SubscriptionProto proto; 223 ClusterAPIProtos.SubscriptionProto proto;
209 try { 224 try {
@@ -16,11 +16,14 @@ @@ -16,11 +16,14 @@
16 package org.thingsboard.rule.engine.api; 16 package org.thingsboard.rule.engine.api;
17 17
18 import com.google.common.util.concurrent.FutureCallback; 18 import com.google.common.util.concurrent.FutureCallback;
  19 +import org.thingsboard.server.common.data.id.DeviceId;
19 import org.thingsboard.server.common.data.id.EntityId; 20 import org.thingsboard.server.common.data.id.EntityId;
  21 +import org.thingsboard.server.common.data.id.TenantId;
20 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 22 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
21 import org.thingsboard.server.common.data.kv.TsKvEntry; 23 import org.thingsboard.server.common.data.kv.TsKvEntry;
22 24
23 import java.util.List; 25 import java.util.List;
  26 +import java.util.Set;
24 27
25 /** 28 /**
26 * Created by ashvayka on 02.04.18. 29 * Created by ashvayka on 02.04.18.
@@ -41,4 +44,6 @@ public interface RuleEngineTelemetryService { @@ -41,4 +44,6 @@ public interface RuleEngineTelemetryService {
41 44
42 void saveAttrAndNotify(EntityId entityId, String scope, String key, boolean value, FutureCallback<Void> callback); 45 void saveAttrAndNotify(EntityId entityId, String scope, String key, boolean value, FutureCallback<Void> callback);
43 46
  47 + void onSharedAttributesUpdate(TenantId tenantId, DeviceId deviceId, Set<AttributeKvEntry> attributes);
  48 +
44 } 49 }
@@ -17,12 +17,15 @@ package org.thingsboard.rule.engine.telemetry; @@ -17,12 +17,15 @@ package org.thingsboard.rule.engine.telemetry;
17 17
18 import com.google.gson.JsonParser; 18 import com.google.gson.JsonParser;
19 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
20 -import org.thingsboard.rule.engine.api.util.TbNodeUtils;  
21 import org.thingsboard.rule.engine.api.RuleNode; 20 import org.thingsboard.rule.engine.api.RuleNode;
22 import org.thingsboard.rule.engine.api.TbContext; 21 import org.thingsboard.rule.engine.api.TbContext;
23 import org.thingsboard.rule.engine.api.TbNode; 22 import org.thingsboard.rule.engine.api.TbNode;
24 import org.thingsboard.rule.engine.api.TbNodeConfiguration; 23 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
25 import org.thingsboard.rule.engine.api.TbNodeException; 24 import org.thingsboard.rule.engine.api.TbNodeException;
  25 +import org.thingsboard.rule.engine.api.util.TbNodeUtils;
  26 +import org.thingsboard.server.common.data.DataConstants;
  27 +import org.thingsboard.server.common.data.EntityType;
  28 +import org.thingsboard.server.common.data.id.DeviceId;
26 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 29 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
27 import org.thingsboard.server.common.data.plugin.ComponentType; 30 import org.thingsboard.server.common.data.plugin.ComponentType;
28 import org.thingsboard.server.common.msg.TbMsg; 31 import org.thingsboard.server.common.msg.TbMsg;
@@ -62,6 +65,9 @@ public class TbMsgAttributesNode implements TbNode { @@ -62,6 +65,9 @@ public class TbMsgAttributesNode implements TbNode {
62 String src = msg.getData(); 65 String src = msg.getData();
63 Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src)).getAttributes(); 66 Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src)).getAttributes();
64 ctx.getTelemetryService().saveAndNotify(msg.getOriginator(), config.getScope(), new ArrayList<>(attributes), new TelemetryNodeCallback(ctx, msg)); 67 ctx.getTelemetryService().saveAndNotify(msg.getOriginator(), config.getScope(), new ArrayList<>(attributes), new TelemetryNodeCallback(ctx, msg));
  68 + if (msg.getOriginator().getEntityType() == EntityType.DEVICE && DataConstants.SHARED_SCOPE.equals(config.getScope())) {
  69 + ctx.getTelemetryService().onSharedAttributesUpdate(ctx.getTenantId(), new DeviceId(msg.getOriginator().getId()), attributes);
  70 + }
65 } 71 }
66 72
67 @Override 73 @Override