Commit 030d80d05a27345db4eef03f3ff51dea45c56d4a

Authored by Viacheslav Klimov
Committed by Andrew Shvayka
1 parent 0ff4e6ac

CustomerId to TbMsg and SessionInfo

Showing 17 changed files with 68 additions and 44 deletions
@@ -41,6 +41,7 @@ import org.thingsboard.server.common.data.EntityType; @@ -41,6 +41,7 @@ import org.thingsboard.server.common.data.EntityType;
41 import org.thingsboard.server.common.data.TenantProfile; 41 import org.thingsboard.server.common.data.TenantProfile;
42 import org.thingsboard.server.common.data.alarm.Alarm; 42 import org.thingsboard.server.common.data.alarm.Alarm;
43 import org.thingsboard.server.common.data.asset.Asset; 43 import org.thingsboard.server.common.data.asset.Asset;
  44 +import org.thingsboard.server.common.data.id.CustomerId;
44 import org.thingsboard.server.common.data.id.DeviceId; 45 import org.thingsboard.server.common.data.id.DeviceId;
45 import org.thingsboard.server.common.data.id.EdgeId; 46 import org.thingsboard.server.common.data.id.EdgeId;
46 import org.thingsboard.server.common.data.id.EntityId; 47 import org.thingsboard.server.common.data.id.EntityId;
@@ -269,7 +270,12 @@ class DefaultTbContext implements TbContext { @@ -269,7 +270,12 @@ class DefaultTbContext implements TbContext {
269 270
270 @Override 271 @Override
271 public TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) { 272 public TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) {
272 - return TbMsg.newMsg(queueName, type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); 273 + return newMsg(queueName, type, originator, null, metaData, data);
  274 + }
  275 +
  276 + @Override
  277 + public TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
  278 + return TbMsg.newMsg(queueName, type, originator, customerId, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
273 } 279 }
274 280
275 @Override 281 @Override
@@ -919,7 +919,7 @@ public abstract class BaseController { @@ -919,7 +919,7 @@ public abstract class BaseController {
919 entityNode.put("endTs", extractParameter(Long.class, 2, additionalInfo)); 919 entityNode.put("endTs", extractParameter(Long.class, 2, additionalInfo));
920 } 920 }
921 } 921 }
922 - TbMsg tbMsg = TbMsg.newMsg(msgType, entityId, metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode)); 922 + TbMsg tbMsg = TbMsg.newMsg(msgType, entityId, customerId, metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode));
923 TenantId tenantId = user.getTenantId(); 923 TenantId tenantId = user.getTenantId();
924 if (tenantId.isNullUid()) { 924 if (tenantId.isNullUid()) {
925 if (entity instanceof HasTenantId) { 925 if (entity instanceof HasTenantId) {
@@ -230,7 +230,7 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService { @@ -230,7 +230,7 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService {
230 private void pushProvisionEventToRuleEngine(ProvisionRequest request, Device device, String type) { 230 private void pushProvisionEventToRuleEngine(ProvisionRequest request, Device device, String type) {
231 try { 231 try {
232 JsonNode entityNode = JacksonUtil.valueToTree(request); 232 JsonNode entityNode = JacksonUtil.valueToTree(request);
233 - TbMsg msg = TbMsg.newMsg(type, device.getId(), createTbMsgMetaData(device), JacksonUtil.toString(entityNode)); 233 + TbMsg msg = TbMsg.newMsg(type, device.getId(), device.getCustomerId(), createTbMsgMetaData(device), JacksonUtil.toString(entityNode));
234 sendToRuleEngine(device.getTenantId(), msg, null); 234 sendToRuleEngine(device.getTenantId(), msg, null);
235 } catch (IllegalArgumentException e) { 235 } catch (IllegalArgumentException e) {
236 log.warn("[{}] Failed to push device action to rule engine: {}", device.getId(), type, e); 236 log.warn("[{}] Failed to push device action to rule engine: {}", device.getId(), type, e);
@@ -240,7 +240,7 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService { @@ -240,7 +240,7 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService {
240 private void pushDeviceCreatedEventToRuleEngine(Device device) { 240 private void pushDeviceCreatedEventToRuleEngine(Device device) {
241 try { 241 try {
242 ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(device); 242 ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(device);
243 - TbMsg msg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, device.getId(), createTbMsgMetaData(device), JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode)); 243 + TbMsg msg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, device.getId(), device.getCustomerId(), createTbMsgMetaData(device), JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode));
244 sendToRuleEngine(device.getTenantId(), msg, null); 244 sendToRuleEngine(device.getTenantId(), msg, null);
245 } catch (JsonProcessingException | IllegalArgumentException e) { 245 } catch (JsonProcessingException | IllegalArgumentException e) {
246 log.warn("[{}] Failed to push device action to rule engine: {}", device.getId(), DataConstants.ENTITY_CREATED, e); 246 log.warn("[{}] Failed to push device action to rule engine: {}", device.getId(), DataConstants.ENTITY_CREATED, e);
@@ -22,7 +22,6 @@ import org.thingsboard.server.common.data.DeviceProfile; @@ -22,7 +22,6 @@ import org.thingsboard.server.common.data.DeviceProfile;
22 import org.thingsboard.server.common.data.TbResource; 22 import org.thingsboard.server.common.data.TbResource;
23 import org.thingsboard.server.common.data.Tenant; 23 import org.thingsboard.server.common.data.Tenant;
24 import org.thingsboard.server.common.data.TenantProfile; 24 import org.thingsboard.server.common.data.TenantProfile;
25 -import org.thingsboard.server.common.data.id.CustomerId;  
26 import org.thingsboard.server.common.data.id.EdgeId; 25 import org.thingsboard.server.common.data.id.EdgeId;
27 import org.thingsboard.server.common.data.id.EntityId; 26 import org.thingsboard.server.common.data.id.EntityId;
28 import org.thingsboard.server.common.data.id.TenantId; 27 import org.thingsboard.server.common.data.id.TenantId;
@@ -412,8 +412,8 @@ public class DefaultTransportApiService implements TransportApiService { @@ -412,8 +412,8 @@ public class DefaultTransportApiService implements TransportApiService {
412 return DeviceInfoProto.newBuilder() 412 return DeviceInfoProto.newBuilder()
413 .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits()) 413 .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())
414 .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits()) 414 .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits())
415 - .setCustomerIdMSB(device.getCustomerId().getId().getMostSignificantBits())  
416 - .setCustomerIdLSB(device.getCustomerId().getId().getLeastSignificantBits()) 415 + .setCustomerIdMSB(Optional.ofNullable(device.getCustomerId()).map(customerId -> customerId.getId().getMostSignificantBits()).orElse(0L))
  416 + .setCustomerIdLSB(Optional.ofNullable(device.getCustomerId()).map(customerId -> customerId.getId().getLeastSignificantBits()).orElse(0L))
417 .setDeviceIdMSB(device.getId().getId().getMostSignificantBits()) 417 .setDeviceIdMSB(device.getId().getId().getMostSignificantBits())
418 .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits()) 418 .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits())
419 .setDeviceName(device.getName()) 419 .setDeviceName(device.getName())
@@ -19,10 +19,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore; @@ -19,10 +19,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
19 import com.google.protobuf.ByteString; 19 import com.google.protobuf.ByteString;
20 import com.google.protobuf.InvalidProtocolBufferException; 20 import com.google.protobuf.InvalidProtocolBufferException;
21 import lombok.AccessLevel; 21 import lombok.AccessLevel;
22 -import lombok.Builder;  
23 import lombok.Data; 22 import lombok.Data;
24 import lombok.Getter; 23 import lombok.Getter;
25 import lombok.extern.slf4j.Slf4j; 24 import lombok.extern.slf4j.Slf4j;
  25 +import org.thingsboard.server.common.data.EntityType;
26 import org.thingsboard.server.common.data.id.CustomerId; 26 import org.thingsboard.server.common.data.id.CustomerId;
27 import org.thingsboard.server.common.data.id.EntityId; 27 import org.thingsboard.server.common.data.id.EntityId;
28 import org.thingsboard.server.common.data.id.EntityIdFactory; 28 import org.thingsboard.server.common.data.id.EntityIdFactory;
@@ -67,8 +67,7 @@ public final class TbMsg implements Serializable { @@ -67,8 +67,7 @@ public final class TbMsg implements Serializable {
67 transient private final TbMsgCallback callback; 67 transient private final TbMsgCallback callback;
68 68
69 public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { 69 public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
70 - return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null,  
71 - metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, 0, TbMsgCallback.EMPTY); 70 + return newMsg(queueName, type, originator, null, metaData, data, ruleChainId, ruleNodeId);
72 } 71 }
73 72
74 public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { 73 public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
@@ -77,13 +76,21 @@ public final class TbMsg implements Serializable { @@ -77,13 +76,21 @@ public final class TbMsg implements Serializable {
77 } 76 }
78 77
79 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) { 78 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
80 - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY); 79 + return newMsg(type, originator, null, metaData, data);
  80 + }
  81 +
  82 + public static TbMsg newMsg(String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
  83 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY);
81 } 84 }
82 85
83 // REALLY NEW MSG 86 // REALLY NEW MSG
84 87
85 public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) { 88 public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) {
86 - return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY); 89 + return newMsg(queueName, type, originator, null, metaData, data);
  90 + }
  91 +
  92 + public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
  93 + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY);
87 } 94 }
88 95
89 public static TbMsg newMsg(String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data) { 96 public static TbMsg newMsg(String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data) {
@@ -91,7 +98,7 @@ public final class TbMsg implements Serializable { @@ -91,7 +98,7 @@ public final class TbMsg implements Serializable {
91 } 98 }
92 99
93 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data) { 100 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data) {
94 - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, metaData.copy(), dataType, data, null, null, 0, TbMsgCallback.EMPTY); 101 + return newMsg(type, originator, null, metaData, dataType, data);
95 } 102 }
96 103
97 // For Tests only 104 // For Tests only
@@ -145,7 +152,15 @@ public final class TbMsg implements Serializable { @@ -145,7 +152,15 @@ public final class TbMsg implements Serializable {
145 } 152 }
146 this.type = type; 153 this.type = type;
147 this.originator = originator; 154 this.originator = originator;
148 - this.customerId = (customerId == null || customerId.isNullUid()) ? null : customerId; 155 + if (customerId == null || customerId.isNullUid()) {
  156 + if (originator.getEntityType() == EntityType.CUSTOMER) {
  157 + this.customerId = (CustomerId) originator;
  158 + } else {
  159 + this.customerId = null;
  160 + }
  161 + } else {
  162 + this.customerId = customerId;
  163 + }
149 this.metaData = metaData; 164 this.metaData = metaData;
150 this.dataType = dataType; 165 this.dataType = dataType;
151 this.data = data; 166 this.data = data;
@@ -141,6 +141,8 @@ public class LwM2mTransportContextServer extends TransportContext { @@ -141,6 +141,8 @@ public class LwM2mTransportContextServer extends TransportContext {
141 .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB()) 141 .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB())
142 .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB()) 142 .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
143 .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB()) 143 .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
  144 + .setCustomerIdMSB(msg.getDeviceInfo().getCustomerIdMSB())
  145 + .setCustomerIdLSB(msg.getDeviceInfo().getCustomerIdLSB())
144 .setDeviceName(msg.getDeviceInfo().getDeviceName()) 146 .setDeviceName(msg.getDeviceInfo().getDeviceName())
145 .setDeviceType(msg.getDeviceInfo().getDeviceType()) 147 .setDeviceType(msg.getDeviceInfo().getDeviceType())
146 .setDeviceProfileIdLSB(msg.getDeviceInfo().getDeviceProfileIdLSB()) 148 .setDeviceProfileIdLSB(msg.getDeviceInfo().getDeviceProfileIdLSB())
@@ -1177,6 +1177,8 @@ public class LwM2mTransportServiceImpl implements LwM2mTransportService { @@ -1177,6 +1177,8 @@ public class LwM2mTransportServiceImpl implements LwM2mTransportService {
1177 .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB()) 1177 .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB())
1178 .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB()) 1178 .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
1179 .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB()) 1179 .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
  1180 + .setCustomerIdMSB(msg.getDeviceInfo().getCustomerIdMSB())
  1181 + .setCustomerIdLSB(msg.getDeviceInfo().getCustomerIdLSB())
1180 .setDeviceName(msg.getDeviceInfo().getDeviceName()) 1182 .setDeviceName(msg.getDeviceInfo().getDeviceName())
1181 .setDeviceType(msg.getDeviceInfo().getDeviceType()) 1183 .setDeviceType(msg.getDeviceInfo().getDeviceType())
1182 .setDeviceProfileIdLSB(msg.getDeviceInfo().getDeviceProfileIdLSB()) 1184 .setDeviceProfileIdLSB(msg.getDeviceInfo().getDeviceProfileIdLSB())
@@ -45,6 +45,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple @@ -45,6 +45,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
45 .setDeviceIdLSB(deviceInfo.getDeviceId().getId().getLeastSignificantBits()) 45 .setDeviceIdLSB(deviceInfo.getDeviceId().getId().getLeastSignificantBits())
46 .setTenantIdMSB(deviceInfo.getTenantId().getId().getMostSignificantBits()) 46 .setTenantIdMSB(deviceInfo.getTenantId().getId().getMostSignificantBits())
47 .setTenantIdLSB(deviceInfo.getTenantId().getId().getLeastSignificantBits()) 47 .setTenantIdLSB(deviceInfo.getTenantId().getId().getLeastSignificantBits())
  48 + .setCustomerIdMSB(deviceInfo.getCustomerId().getId().getMostSignificantBits())
  49 + .setCustomerIdLSB(deviceInfo.getCustomerId().getId().getLeastSignificantBits())
48 .setDeviceName(deviceInfo.getDeviceName()) 50 .setDeviceName(deviceInfo.getDeviceName())
49 .setDeviceType(deviceInfo.getDeviceType()) 51 .setDeviceType(deviceInfo.getDeviceType())
50 .setGwSessionIdMSB(parent.getSessionId().getMostSignificantBits()) 52 .setGwSessionIdMSB(parent.getSessionId().getMostSignificantBits())
@@ -16,16 +16,15 @@ @@ -16,16 +16,15 @@
16 package org.thingsboard.rule.engine.api; 16 package org.thingsboard.rule.engine.api;
17 17
18 import io.netty.channel.EventLoopGroup; 18 import io.netty.channel.EventLoopGroup;
19 -import org.springframework.data.redis.core.RedisTemplate;  
20 import org.thingsboard.common.util.ListeningExecutor; 19 import org.thingsboard.common.util.ListeningExecutor;
21 import org.thingsboard.rule.engine.api.sms.SmsSenderFactory; 20 import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
22 -import org.thingsboard.server.common.data.ApiUsageRecordKey;  
23 import org.thingsboard.server.common.data.Customer; 21 import org.thingsboard.server.common.data.Customer;
24 import org.thingsboard.server.common.data.Device; 22 import org.thingsboard.server.common.data.Device;
25 import org.thingsboard.server.common.data.DeviceProfile; 23 import org.thingsboard.server.common.data.DeviceProfile;
26 import org.thingsboard.server.common.data.TenantProfile; 24 import org.thingsboard.server.common.data.TenantProfile;
27 import org.thingsboard.server.common.data.alarm.Alarm; 25 import org.thingsboard.server.common.data.alarm.Alarm;
28 import org.thingsboard.server.common.data.asset.Asset; 26 import org.thingsboard.server.common.data.asset.Asset;
  27 +import org.thingsboard.server.common.data.id.CustomerId;
29 import org.thingsboard.server.common.data.id.DeviceId; 28 import org.thingsboard.server.common.data.id.DeviceId;
30 import org.thingsboard.server.common.data.id.EdgeId; 29 import org.thingsboard.server.common.data.id.EdgeId;
31 import org.thingsboard.server.common.data.id.EntityId; 30 import org.thingsboard.server.common.data.id.EntityId;
@@ -144,6 +143,8 @@ public interface TbContext { @@ -144,6 +143,8 @@ public interface TbContext {
144 143
145 TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data); 144 TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data);
146 145
  146 + TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data);
  147 +
147 TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data); 148 TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data);
148 149
149 TbMsg customerCreatedMsg(Customer customer, RuleNodeId ruleNodeId); 150 TbMsg customerCreatedMsg(Customer customer, RuleNodeId ruleNodeId);
@@ -136,7 +136,7 @@ public class TbCopyAttributesToEntityViewNode implements TbNode { @@ -136,7 +136,7 @@ public class TbCopyAttributesToEntityViewNode implements TbNode {
136 } 136 }
137 137
138 private void transformAndTellNext(TbContext ctx, TbMsg msg, EntityView entityView) { 138 private void transformAndTellNext(TbContext ctx, TbMsg msg, EntityView entityView) {
139 - ctx.enqueueForTellNext(ctx.newMsg(msg.getQueueName(), msg.getType(), entityView.getId(), msg.getMetaData(), msg.getData()), SUCCESS); 139 + ctx.enqueueForTellNext(ctx.newMsg(msg.getQueueName(), msg.getType(), entityView.getId(), msg.getCustomerId(), msg.getMetaData(), msg.getData()), SUCCESS);
140 } 140 }
141 141
142 private boolean attributeContainsInEntityView(String scope, String attrKey, EntityView entityView) { 142 private boolean attributeContainsInEntityView(String scope, String attrKey, EntityView entityView) {
@@ -63,7 +63,7 @@ public class TbMsgCountNode implements TbNode { @@ -63,7 +63,7 @@ public class TbMsgCountNode implements TbNode {
63 TbMsgCountNodeConfiguration config = TbNodeUtils.convert(configuration, TbMsgCountNodeConfiguration.class); 63 TbMsgCountNodeConfiguration config = TbNodeUtils.convert(configuration, TbMsgCountNodeConfiguration.class);
64 this.delay = TimeUnit.SECONDS.toMillis(config.getInterval()); 64 this.delay = TimeUnit.SECONDS.toMillis(config.getInterval());
65 this.telemetryPrefix = config.getTelemetryPrefix(); 65 this.telemetryPrefix = config.getTelemetryPrefix();
66 - scheduleTickMsg(ctx); 66 + scheduleTickMsg(ctx, null);
67 67
68 } 68 }
69 69
@@ -78,23 +78,23 @@ public class TbMsgCountNode implements TbNode { @@ -78,23 +78,23 @@ public class TbMsgCountNode implements TbNode {
78 TbMsgMetaData metaData = new TbMsgMetaData(); 78 TbMsgMetaData metaData = new TbMsgMetaData();
79 metaData.putValue("delta", Long.toString(System.currentTimeMillis() - lastScheduledTs + delay)); 79 metaData.putValue("delta", Long.toString(System.currentTimeMillis() - lastScheduledTs + delay));
80 80
81 - TbMsg tbMsg = TbMsg.newMsg(msg.getQueueName(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), metaData, gson.toJson(telemetryJson)); 81 + TbMsg tbMsg = TbMsg.newMsg(msg.getQueueName(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), msg.getCustomerId(), metaData, gson.toJson(telemetryJson));
82 ctx.enqueueForTellNext(tbMsg, SUCCESS); 82 ctx.enqueueForTellNext(tbMsg, SUCCESS);
83 - scheduleTickMsg(ctx); 83 + scheduleTickMsg(ctx, tbMsg);
84 } else { 84 } else {
85 messagesProcessed.incrementAndGet(); 85 messagesProcessed.incrementAndGet();
86 ctx.ack(msg); 86 ctx.ack(msg);
87 } 87 }
88 } 88 }
89 89
90 - private void scheduleTickMsg(TbContext ctx) { 90 + private void scheduleTickMsg(TbContext ctx, TbMsg msg) {
91 long curTs = System.currentTimeMillis(); 91 long curTs = System.currentTimeMillis();
92 if (lastScheduledTs == 0L) { 92 if (lastScheduledTs == 0L) {
93 lastScheduledTs = curTs; 93 lastScheduledTs = curTs;
94 } 94 }
95 lastScheduledTs = lastScheduledTs + delay; 95 lastScheduledTs = lastScheduledTs + delay;
96 long curDelay = Math.max(0L, (lastScheduledTs - curTs)); 96 long curDelay = Math.max(0L, (lastScheduledTs - curTs));
97 - TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_COUNT_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), ""); 97 + TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_COUNT_NODE_MSG, ctx.getSelfId(), msg != null ? msg.getCustomerId() : null, new TbMsgMetaData(), "");
98 nextTickId = tickMsg.getId(); 98 nextTickId = tickMsg.getId();
99 ctx.tellSelf(tickMsg, curDelay); 99 ctx.tellSelf(tickMsg, curDelay);
100 } 100 }
@@ -100,7 +100,7 @@ public class TbMsgGeneratorNode implements TbNode { @@ -100,7 +100,7 @@ public class TbMsgGeneratorNode implements TbNode {
100 @Override 100 @Override
101 public void onMsg(TbContext ctx, TbMsg msg) { 101 public void onMsg(TbContext ctx, TbMsg msg) {
102 if (initialized && msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) { 102 if (initialized && msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) {
103 - withCallback(generate(ctx), 103 + withCallback(generate(ctx, msg),
104 m -> { 104 m -> {
105 if (initialized && (config.getMsgCount() == TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT || currentMsgCount < config.getMsgCount())) { 105 if (initialized && (config.getMsgCount() == TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT || currentMsgCount < config.getMsgCount())) {
106 ctx.enqueueForTellNext(m, SUCCESS); 106 ctx.enqueueForTellNext(m, SUCCESS);
@@ -130,16 +130,16 @@ public class TbMsgGeneratorNode implements TbNode { @@ -130,16 +130,16 @@ public class TbMsgGeneratorNode implements TbNode {
130 ctx.tellSelf(tickMsg, curDelay); 130 ctx.tellSelf(tickMsg, curDelay);
131 } 131 }
132 132
133 - private ListenableFuture<TbMsg> generate(TbContext ctx) { 133 + private ListenableFuture<TbMsg> generate(TbContext ctx, TbMsg msg) {
134 return ctx.getJsExecutor().executeAsync(() -> { 134 return ctx.getJsExecutor().executeAsync(() -> {
135 if (prevMsg == null) { 135 if (prevMsg == null) {
136 - prevMsg = ctx.newMsg(ServiceQueue.MAIN, "", originatorId, new TbMsgMetaData(), "{}"); 136 + prevMsg = ctx.newMsg(ServiceQueue.MAIN, "", originatorId, msg.getCustomerId(), new TbMsgMetaData(), "{}");
137 } 137 }
138 if (initialized) { 138 if (initialized) {
139 ctx.logJsEvalRequest(); 139 ctx.logJsEvalRequest();
140 TbMsg generated = jsEngine.executeGenerate(prevMsg); 140 TbMsg generated = jsEngine.executeGenerate(prevMsg);
141 ctx.logJsEvalResponse(); 141 ctx.logJsEvalResponse();
142 - prevMsg = ctx.newMsg(ServiceQueue.MAIN, generated.getType(), originatorId, generated.getMetaData(), generated.getData()); 142 + prevMsg = ctx.newMsg(ServiceQueue.MAIN, generated.getType(), originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData());
143 } 143 }
144 return prevMsg; 144 return prevMsg;
145 }); 145 });
@@ -70,7 +70,7 @@ public class TbMsgDelayNode implements TbNode { @@ -70,7 +70,7 @@ public class TbMsgDelayNode implements TbNode {
70 } else { 70 } else {
71 if (pendingMsgs.size() < config.getMaxPendingMsgs()) { 71 if (pendingMsgs.size() < config.getMaxPendingMsgs()) {
72 pendingMsgs.put(msg.getId(), msg); 72 pendingMsgs.put(msg.getId(), msg);
73 - TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_DELAY_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), msg.getId().toString()); 73 + TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_DELAY_NODE_MSG, ctx.getSelfId(), msg.getCustomerId(), new TbMsgMetaData(), msg.getId().toString());
74 ctx.tellSelf(tickMsg, getDelay(msg)); 74 ctx.tellSelf(tickMsg, getDelay(msg));
75 ctx.ack(msg); 75 ctx.ack(msg);
76 } else { 76 } else {
@@ -74,15 +74,15 @@ class AlarmState { @@ -74,15 +74,15 @@ class AlarmState {
74 lastMsgMetaData = msg.getMetaData(); 74 lastMsgMetaData = msg.getMetaData();
75 lastMsgQueueName = msg.getQueueName(); 75 lastMsgQueueName = msg.getQueueName();
76 this.dataSnapshot = data; 76 this.dataSnapshot = data;
77 - return createOrClearAlarms(ctx, data, update, AlarmRuleState::eval); 77 + return createOrClearAlarms(ctx, msg, data, update, AlarmRuleState::eval);
78 } 78 }
79 79
80 public boolean process(TbContext ctx, long ts) throws ExecutionException, InterruptedException { 80 public boolean process(TbContext ctx, long ts) throws ExecutionException, InterruptedException {
81 initCurrentAlarm(ctx); 81 initCurrentAlarm(ctx);
82 - return createOrClearAlarms(ctx, ts, null, AlarmRuleState::eval); 82 + return createOrClearAlarms(ctx, null, ts, null, AlarmRuleState::eval);
83 } 83 }
84 84
85 - public <T> boolean createOrClearAlarms(TbContext ctx, T data, SnapshotUpdate update, BiFunction<AlarmRuleState, T, AlarmEvalResult> evalFunction) { 85 + public <T> boolean createOrClearAlarms(TbContext ctx, TbMsg msg, T data, SnapshotUpdate update, BiFunction<AlarmRuleState, T, AlarmEvalResult> evalFunction) {
86 boolean stateUpdate = false; 86 boolean stateUpdate = false;
87 AlarmRuleState resultState = null; 87 AlarmRuleState resultState = null;
88 log.debug("[{}] processing update: {}", alarmDefinition.getId(), data); 88 log.debug("[{}] processing update: {}", alarmDefinition.getId(), data);
@@ -103,7 +103,7 @@ class AlarmState { @@ -103,7 +103,7 @@ class AlarmState {
103 if (resultState != null) { 103 if (resultState != null) {
104 TbAlarmResult result = calculateAlarmResult(ctx, resultState); 104 TbAlarmResult result = calculateAlarmResult(ctx, resultState);
105 if (result != null) { 105 if (result != null) {
106 - pushMsg(ctx, result, resultState); 106 + pushMsg(ctx, msg, result, resultState);
107 } 107 }
108 stateUpdate = clearAlarmState(stateUpdate, clearState); 108 stateUpdate = clearAlarmState(stateUpdate, clearState);
109 } else if (currentAlarm != null && clearState != null) { 109 } else if (currentAlarm != null && clearState != null) {
@@ -122,7 +122,7 @@ class AlarmState { @@ -122,7 +122,7 @@ class AlarmState {
122 ); 122 );
123 DonAsynchron.withCallback(alarmClearOperationResult, 123 DonAsynchron.withCallback(alarmClearOperationResult,
124 result -> { 124 result -> {
125 - pushMsg(ctx, new TbAlarmResult(false, false, true, result.getAlarm()), clearState); 125 + pushMsg(ctx, msg, new TbAlarmResult(false, false, true, result.getAlarm()), clearState);
126 }, 126 },
127 throwable -> { 127 throwable -> {
128 throw new RuntimeException(throwable); 128 throw new RuntimeException(throwable);
@@ -165,7 +165,7 @@ class AlarmState { @@ -165,7 +165,7 @@ class AlarmState {
165 } 165 }
166 } 166 }
167 167
168 - public void pushMsg(TbContext ctx, TbAlarmResult alarmResult, AlarmRuleState ruleState) { 168 + public void pushMsg(TbContext ctx, TbMsg msg, TbAlarmResult alarmResult, AlarmRuleState ruleState) {
169 JsonNode jsonNodes = JacksonUtil.valueToTree(alarmResult.getAlarm()); 169 JsonNode jsonNodes = JacksonUtil.valueToTree(alarmResult.getAlarm());
170 String data = jsonNodes.toString(); 170 String data = jsonNodes.toString();
171 TbMsgMetaData metaData = lastMsgMetaData != null ? lastMsgMetaData.copy() : new TbMsgMetaData(); 171 TbMsgMetaData metaData = lastMsgMetaData != null ? lastMsgMetaData.copy() : new TbMsgMetaData();
@@ -185,7 +185,8 @@ class AlarmState { @@ -185,7 +185,8 @@ class AlarmState {
185 metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString()); 185 metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString());
186 } 186 }
187 setAlarmConditionMetadata(ruleState, metaData); 187 setAlarmConditionMetadata(ruleState, metaData);
188 - TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN, "ALARM", originator, metaData, data); 188 + TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN, "ALARM",
  189 + originator, msg != null ? msg.getCustomerId() : null, metaData, data);
189 ctx.tellNext(newMsg, relationType); 190 ctx.tellNext(newMsg, relationType);
190 } 191 }
191 192
@@ -74,7 +74,7 @@ public class TbDeviceProfileNode implements TbNode { @@ -74,7 +74,7 @@ public class TbDeviceProfileNode implements TbNode {
74 this.config = TbNodeUtils.convert(configuration, TbDeviceProfileNodeConfiguration.class); 74 this.config = TbNodeUtils.convert(configuration, TbDeviceProfileNodeConfiguration.class);
75 this.cache = ctx.getDeviceProfileCache(); 75 this.cache = ctx.getDeviceProfileCache();
76 this.ctx = ctx; 76 this.ctx = ctx;
77 - scheduleAlarmHarvesting(ctx); 77 + scheduleAlarmHarvesting(ctx, null);
78 ctx.addDeviceProfileListeners(this::onProfileUpdate, this::onDeviceUpdate); 78 ctx.addDeviceProfileListeners(this::onProfileUpdate, this::onDeviceUpdate);
79 if (config.isFetchAlarmRulesStateOnStart()) { 79 if (config.isFetchAlarmRulesStateOnStart()) {
80 log.info("[{}] Fetching alarm rule state", ctx.getSelfId()); 80 log.info("[{}] Fetching alarm rule state", ctx.getSelfId());
@@ -108,7 +108,7 @@ public class TbDeviceProfileNode implements TbNode { @@ -108,7 +108,7 @@ public class TbDeviceProfileNode implements TbNode {
108 public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { 108 public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
109 EntityType originatorType = msg.getOriginator().getEntityType(); 109 EntityType originatorType = msg.getOriginator().getEntityType();
110 if (msg.getType().equals(PERIODIC_MSG_TYPE)) { 110 if (msg.getType().equals(PERIODIC_MSG_TYPE)) {
111 - scheduleAlarmHarvesting(ctx); 111 + scheduleAlarmHarvesting(ctx, msg);
112 harvestAlarms(ctx, System.currentTimeMillis()); 112 harvestAlarms(ctx, System.currentTimeMillis());
113 } else if (msg.getType().equals(PROFILE_UPDATE_MSG_TYPE)) { 113 } else if (msg.getType().equals(PROFILE_UPDATE_MSG_TYPE)) {
114 updateProfile(ctx, new DeviceProfileId(UUID.fromString(msg.getData()))); 114 updateProfile(ctx, new DeviceProfileId(UUID.fromString(msg.getData())));
@@ -168,8 +168,8 @@ public class TbDeviceProfileNode implements TbNode { @@ -168,8 +168,8 @@ public class TbDeviceProfileNode implements TbNode {
168 return deviceState; 168 return deviceState;
169 } 169 }
170 170
171 - protected void scheduleAlarmHarvesting(TbContext ctx) {  
172 - TbMsg periodicCheck = TbMsg.newMsg(PERIODIC_MSG_TYPE, ctx.getTenantId(), TbMsgMetaData.EMPTY, "{}"); 171 + protected void scheduleAlarmHarvesting(TbContext ctx, TbMsg msg) {
  172 + TbMsg periodicCheck = TbMsg.newMsg(PERIODIC_MSG_TYPE, ctx.getTenantId(), msg != null ? msg.getCustomerId() : null, TbMsgMetaData.EMPTY, "{}");
173 ctx.tellSelf(periodicCheck, TimeUnit.MINUTES.toMillis(1)); 173 ctx.tellSelf(periodicCheck, TimeUnit.MINUTES.toMillis(1));
174 } 174 }
175 175
@@ -16,10 +16,6 @@ @@ -16,10 +16,6 @@
16 package org.thingsboard.rule.engine.rpc; 16 package org.thingsboard.rule.engine.rpc;
17 17
18 import com.datastax.oss.driver.api.core.uuid.Uuids; 18 import com.datastax.oss.driver.api.core.uuid.Uuids;
19 -import com.fasterxml.jackson.databind.ObjectMapper;  
20 -import com.google.common.util.concurrent.FutureCallback;  
21 -import com.google.common.util.concurrent.Futures;  
22 -import com.google.common.util.concurrent.ListenableFuture;  
23 import com.google.gson.Gson; 19 import com.google.gson.Gson;
24 import com.google.gson.JsonElement; 20 import com.google.gson.JsonElement;
25 import com.google.gson.JsonObject; 21 import com.google.gson.JsonObject;
@@ -116,10 +112,10 @@ public class TbSendRPCRequestNode implements TbNode { @@ -116,10 +112,10 @@ public class TbSendRPCRequestNode implements TbNode {
116 112
117 ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { 113 ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> {
118 if (!ruleEngineDeviceRpcResponse.getError().isPresent()) { 114 if (!ruleEngineDeviceRpcResponse.getError().isPresent()) {
119 - TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); 115 + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
120 ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS); 116 ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS);
121 } else { 117 } else {
122 - TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); 118 + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
123 ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name())); 119 ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name()));
124 } 120 }
125 }); 121 });