Commit e41d69b1a8512e2c555fbc1a13a9d2300c8a0916

Authored by YevhenBondarenko
Committed by Andrew Shvayka
1 parent 6824e1c3

shared Attributes update improvements (If the update of the shared attributes is…

… originated by the user's REST API call - we push it to the device, if the update arrives from the device, we will not push it back to the device by default.)
@@ -216,6 +216,11 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer @@ -216,6 +216,11 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
216 216
217 @Override 217 @Override
218 public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback) { 218 public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback) {
  219 + onAttributesUpdate(tenantId, entityId, scope, attributes, callback, true);
  220 + }
  221 +
  222 + @Override
  223 + public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback, boolean notifyDevice) {
219 onLocalSubUpdate(entityId, 224 onLocalSubUpdate(entityId,
220 s -> { 225 s -> {
221 if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) { 226 if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) {
@@ -244,7 +249,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer @@ -244,7 +249,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
244 deviceStateService.onDeviceInactivityTimeoutUpdate(new DeviceId(entityId.getId()), attribute.getLongValue().orElse(0L)); 249 deviceStateService.onDeviceInactivityTimeoutUpdate(new DeviceId(entityId.getId()), attribute.getLongValue().orElse(0L));
245 } 250 }
246 } 251 }
247 - } else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope)) { 252 + } else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) {
248 clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, 253 clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId,
249 new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes)) 254 new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes))
250 , null); 255 , null);
@@ -35,4 +35,6 @@ public interface SubscriptionManagerService extends ApplicationListener<Partitio @@ -35,4 +35,6 @@ public interface SubscriptionManagerService extends ApplicationListener<Partitio
35 35
36 void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback); 36 void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback);
37 37
  38 + void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback, boolean notifyDevice);
  39 +
38 } 40 }
@@ -128,9 +128,14 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @@ -128,9 +128,14 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
128 128
129 @Override 129 @Override
130 public void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback) { 130 public void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback) {
  131 + saveAndNotify(tenantId, entityId, scope, attributes, callback, true);
  132 + }
  133 +
  134 + @Override
  135 + public void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback, boolean notifyDevice) {
131 ListenableFuture<List<Void>> saveFuture = attrService.save(tenantId, entityId, scope, attributes); 136 ListenableFuture<List<Void>> saveFuture = attrService.save(tenantId, entityId, scope, attributes);
132 addMainCallback(saveFuture, callback); 137 addMainCallback(saveFuture, callback);
133 - addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes)); 138 + addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes, notifyDevice));
134 } 139 }
135 140
136 @Override 141 @Override
@@ -157,11 +162,11 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @@ -157,11 +162,11 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
157 , System.currentTimeMillis())), callback); 162 , System.currentTimeMillis())), callback);
158 } 163 }
159 164
160 - private void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) { 165 + private void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice) {
161 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); 166 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
162 if (currentPartitions.contains(tpi)) { 167 if (currentPartitions.contains(tpi)) {
163 if (subscriptionManagerService.isPresent()) { 168 if (subscriptionManagerService.isPresent()) {
164 - subscriptionManagerService.get().onAttributesUpdate(tenantId, entityId, scope, attributes, TbCallback.EMPTY); 169 + subscriptionManagerService.get().onAttributesUpdate(tenantId, entityId, scope, attributes, TbCallback.EMPTY, notifyDevice);
165 } else { 170 } else {
166 log.warn("Possible misconfiguration because subscriptionManagerService is null!"); 171 log.warn("Possible misconfiguration because subscriptionManagerService is null!");
167 } 172 }
@@ -298,6 +298,7 @@ public class DefaultTransportService implements TransportService { @@ -298,6 +298,7 @@ public class DefaultTransportService implements TransportService {
298 TbMsgMetaData metaData = new TbMsgMetaData(); 298 TbMsgMetaData metaData = new TbMsgMetaData();
299 metaData.putValue("deviceName", sessionInfo.getDeviceName()); 299 metaData.putValue("deviceName", sessionInfo.getDeviceName());
300 metaData.putValue("deviceType", sessionInfo.getDeviceType()); 300 metaData.putValue("deviceType", sessionInfo.getDeviceType());
  301 + metaData.putValue("notifyDevice", "false");
301 TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, metaData, gson.toJson(json)); 302 TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, metaData, gson.toJson(json));
302 sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback)); 303 sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback));
303 } 304 }
@@ -36,6 +36,8 @@ public interface RuleEngineTelemetryService { @@ -36,6 +36,8 @@ public interface RuleEngineTelemetryService {
36 36
37 void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback); 37 void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
38 38
  39 + void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback, boolean notifyDevice);
  40 +
39 void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback); 41 void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback);
40 42
41 void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, String value, FutureCallback<Void> callback); 43 void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, String value, FutureCallback<Void> callback);
@@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.telemetry; @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.telemetry;
17 17
18 import com.google.gson.JsonParser; 18 import com.google.gson.JsonParser;
19 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
  20 +import org.apache.commons.lang3.StringUtils;
20 import org.thingsboard.rule.engine.api.RuleNode; 21 import org.thingsboard.rule.engine.api.RuleNode;
21 import org.thingsboard.rule.engine.api.TbContext; 22 import org.thingsboard.rule.engine.api.TbContext;
22 import org.thingsboard.rule.engine.api.TbNode; 23 import org.thingsboard.rule.engine.api.TbNode;
@@ -63,7 +64,14 @@ public class TbMsgAttributesNode implements TbNode { @@ -63,7 +64,14 @@ public class TbMsgAttributesNode implements TbNode {
63 } 64 }
64 String src = msg.getData(); 65 String src = msg.getData();
65 Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src)); 66 Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src));
66 - ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), config.getScope(), new ArrayList<>(attributes), new TelemetryNodeCallback(ctx, msg)); 67 + String notifyDeviceStr = msg.getMetaData().getValue("notifyDevice");
  68 + ctx.getTelemetryService().saveAndNotify(
  69 + ctx.getTenantId(),
  70 + msg.getOriginator(),
  71 + config.getScope(),
  72 + new ArrayList<>(attributes),
  73 + new TelemetryNodeCallback(ctx, msg),
  74 + StringUtils.isEmpty(notifyDeviceStr) || !notifyDeviceStr.equals("false"));
67 } 75 }
68 76
69 @Override 77 @Override