Commit 0ff4e6ac0d86857c5f2492ace8bbdac9976ad31c

Authored by Andrii Shvaika
Committed by Andrew Shvayka
1 parent 3db33855

Improvements to the TbMsg to support CustomerId

@@ -926,7 +926,7 @@ public abstract class BaseController { @@ -926,7 +926,7 @@ public abstract class BaseController {
926 tenantId = ((HasTenantId) entity).getTenantId(); 926 tenantId = ((HasTenantId) entity).getTenantId();
927 } 927 }
928 } 928 }
929 - tbClusterService.pushMsgToRuleEngine(tenantId, customerId, entityId, tbMsg, null); 929 + tbClusterService.pushMsgToRuleEngine(tenantId, entityId, tbMsg, null);
930 } catch (Exception e) { 930 } catch (Exception e) {
931 log.warn("[{}] Failed to push entity action to rule engine: {}", entityId, actionType, e); 931 log.warn("[{}] Failed to push entity action to rule engine: {}", entityId, actionType, e);
932 } 932 }
@@ -645,8 +645,8 @@ public class DeviceController extends BaseController { @@ -645,8 +645,8 @@ public class DeviceController extends BaseController {
645 private void pushAssignedFromNotification(Tenant currentTenant, TenantId newTenantId, Device assignedDevice) { 645 private void pushAssignedFromNotification(Tenant currentTenant, TenantId newTenantId, Device assignedDevice) {
646 String data = entityToStr(assignedDevice); 646 String data = entityToStr(assignedDevice);
647 if (data != null) { 647 if (data != null) {
648 - TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_ASSIGNED_FROM_TENANT, assignedDevice.getId(), getMetaDataForAssignedFrom(currentTenant), TbMsgDataType.JSON, data);  
649 - tbClusterService.pushMsgToRuleEngine(newTenantId, assignedDevice.getCustomerId(), assignedDevice.getId(), tbMsg, null); 648 + TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_ASSIGNED_FROM_TENANT, assignedDevice.getId(), assignedDevice.getCustomerId(), getMetaDataForAssignedFrom(currentTenant), TbMsgDataType.JSON, data);
  649 + tbClusterService.pushMsgToRuleEngine(newTenantId, assignedDevice.getId(), tbMsg, null);
650 } 650 }
651 } 651 }
652 652
@@ -231,9 +231,9 @@ public class DeviceProcessor extends BaseProcessor { @@ -231,9 +231,9 @@ public class DeviceProcessor extends BaseProcessor {
231 try { 231 try {
232 DeviceId deviceId = device.getId(); 232 DeviceId deviceId = device.getId();
233 ObjectNode entityNode = mapper.valueToTree(device); 233 ObjectNode entityNode = mapper.valueToTree(device);
234 - TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, 234 + TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, device.getCustomerId(),
235 getActionTbMsgMetaData(edge, device.getCustomerId()), TbMsgDataType.JSON, mapper.writeValueAsString(entityNode)); 235 getActionTbMsgMetaData(edge, device.getCustomerId()), TbMsgDataType.JSON, mapper.writeValueAsString(entityNode));
236 - tbClusterService.pushMsgToRuleEngine(tenantId, device.getCustomerId(), deviceId, tbMsg, new TbQueueCallback() { 236 + tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, new TbQueueCallback() {
237 @Override 237 @Override
238 public void onSuccess(TbQueueMsgMetadata metadata) { 238 public void onSuccess(TbQueueMsgMetadata metadata) {
239 log.debug("Successfully send ENTITY_CREATED EVENT to rule engine [{}]", device); 239 log.debug("Successfully send ENTITY_CREATED EVENT to rule engine [{}]", device);
@@ -156,8 +156,8 @@ public class TelemetryProcessor extends BaseProcessor { @@ -156,8 +156,8 @@ public class TelemetryProcessor extends BaseProcessor {
156 Pair<String, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); 156 Pair<String, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
157 String queueName = defaultQueueAndRuleChain.getKey(); 157 String queueName = defaultQueueAndRuleChain.getKey();
158 RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue(); 158 RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue();
159 - TbMsg tbMsg = TbMsg.newMsg(queueName, SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, metaData, gson.toJson(json), ruleChainId, null);  
160 - tbClusterService.pushMsgToRuleEngine(tenantId, customerId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { 159 + TbMsg tbMsg = TbMsg.newMsg(queueName, SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), ruleChainId, null);
  160 + tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
161 @Override 161 @Override
162 public void onSuccess(TbQueueMsgMetadata metadata) { 162 public void onSuccess(TbQueueMsgMetadata metadata) {
163 futureToSet.set(null); 163 futureToSet.set(null);
@@ -179,8 +179,8 @@ public class TelemetryProcessor extends BaseProcessor { @@ -179,8 +179,8 @@ public class TelemetryProcessor extends BaseProcessor {
179 Pair<String, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); 179 Pair<String, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
180 String queueName = defaultQueueAndRuleChain.getKey(); 180 String queueName = defaultQueueAndRuleChain.getKey();
181 RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue(); 181 RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue();
182 - TbMsg tbMsg = TbMsg.newMsg(queueName, SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, metaData, gson.toJson(json), ruleChainId, null);  
183 - tbClusterService.pushMsgToRuleEngine(tenantId, customerId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { 182 + TbMsg tbMsg = TbMsg.newMsg(queueName, SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), ruleChainId, null);
  183 + tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
184 @Override 184 @Override
185 public void onSuccess(TbQueueMsgMetadata metadata) { 185 public void onSuccess(TbQueueMsgMetadata metadata) {
186 futureToSet.set(null); 186 futureToSet.set(null);
@@ -206,8 +206,8 @@ public class TelemetryProcessor extends BaseProcessor { @@ -206,8 +206,8 @@ public class TelemetryProcessor extends BaseProcessor {
206 Pair<String, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); 206 Pair<String, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
207 String queueName = defaultQueueAndRuleChain.getKey(); 207 String queueName = defaultQueueAndRuleChain.getKey();
208 RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue(); 208 RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue();
209 - TbMsg tbMsg = TbMsg.newMsg(queueName, DataConstants.ATTRIBUTES_UPDATED, entityId, metaData, gson.toJson(json), ruleChainId, null);  
210 - tbClusterService.pushMsgToRuleEngine(tenantId, customerId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { 209 + TbMsg tbMsg = TbMsg.newMsg(queueName, DataConstants.ATTRIBUTES_UPDATED, entityId, customerId, metaData, gson.toJson(json), ruleChainId, null);
  210 + tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
211 @Override 211 @Override
212 public void onSuccess(TbQueueMsgMetadata metadata) { 212 public void onSuccess(TbQueueMsgMetadata metadata) {
213 futureToSet.set(null); 213 futureToSet.set(null);
@@ -134,12 +134,7 @@ public class DefaultTbClusterService implements TbClusterService { @@ -134,12 +134,7 @@ public class DefaultTbClusterService implements TbClusterService {
134 } 134 }
135 135
136 @Override 136 @Override
137 - public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg msg, TbQueueCallback callback) {  
138 - pushMsgToRuleEngine(tenantId, null, entityId, msg, callback);  
139 - }  
140 -  
141 - @Override  
142 - public void pushMsgToRuleEngine(TenantId tenantId, CustomerId customerId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) { 137 + public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
143 if (tenantId.isNullUid()) { 138 if (tenantId.isNullUid()) {
144 if (entityId.getEntityType().equals(EntityType.TENANT)) { 139 if (entityId.getEntityType().equals(EntityType.TENANT)) {
145 tenantId = new TenantId(entityId.getId()); 140 tenantId = new TenantId(entityId.getId());
@@ -154,7 +149,6 @@ public class DefaultTbClusterService implements TbClusterService { @@ -154,7 +149,6 @@ public class DefaultTbClusterService implements TbClusterService {
154 tbMsg = transformMsg(tbMsg, deviceProfileCache.get(tenantId, new DeviceProfileId(entityId.getId()))); 149 tbMsg = transformMsg(tbMsg, deviceProfileCache.get(tenantId, new DeviceProfileId(entityId.getId())));
155 } 150 }
156 } 151 }
157 - tbMsg.setCustomerId(customerId);  
158 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, entityId); 152 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, entityId);
159 log.trace("PUSHING msg: {} to:{}", tbMsg, tpi); 153 log.trace("PUSHING msg: {} to:{}", tbMsg, tpi);
160 ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder() 154 ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
@@ -51,8 +51,6 @@ public interface TbClusterService { @@ -51,8 +51,6 @@ public interface TbClusterService {
51 51
52 void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg msg, TbQueueCallback callback); 52 void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg msg, TbQueueCallback callback);
53 53
54 - void pushMsgToRuleEngine(TenantId tenantId, CustomerId customerId, EntityId entityId, TbMsg msg, TbQueueCallback callback);  
55 -  
56 void pushNotificationToRuleEngine(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback); 54 void pushNotificationToRuleEngine(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback);
57 55
58 void pushNotificationToTransport(String targetServiceId, ToTransportMsg response, TbQueueCallback callback); 56 void pushNotificationToTransport(String targetServiceId, ToTransportMsg response, TbQueueCallback callback);
@@ -168,8 +168,8 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { @@ -168,8 +168,8 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
168 entityNode.put("params", msg.getBody().getParams()); 168 entityNode.put("params", msg.getBody().getParams());
169 169
170 try { 170 try {
171 - TbMsg tbMsg = TbMsg.newMsg(DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode));  
172 - clusterService.pushMsgToRuleEngine(msg.getTenantId(), currentUser.getCustomerId(), msg.getDeviceId(), tbMsg, null); 171 + TbMsg tbMsg = TbMsg.newMsg(DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), currentUser.getCustomerId(), metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode));
  172 + clusterService.pushMsgToRuleEngine(msg.getTenantId(), msg.getDeviceId(), tbMsg, null);
173 } catch (JsonProcessingException e) { 173 } catch (JsonProcessingException e) {
174 throw new RuntimeException(e); 174 throw new RuntimeException(e);
175 } 175 }
@@ -508,8 +508,8 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit @@ -508,8 +508,8 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
508 if(!persistToTelemetry){ 508 if(!persistToTelemetry){
509 md.putValue(DataConstants.SCOPE, SERVER_SCOPE); 509 md.putValue(DataConstants.SCOPE, SERVER_SCOPE);
510 } 510 }
511 - TbMsg tbMsg = TbMsg.newMsg(msgType, stateData.getDeviceId(), md, TbMsgDataType.JSON, data);  
512 - clusterService.pushMsgToRuleEngine(stateData.getTenantId(), stateData.getCustomerId(), stateData.getDeviceId(), tbMsg, null); 511 + TbMsg tbMsg = TbMsg.newMsg(msgType, stateData.getDeviceId(), stateData.getCustomerId(), md, TbMsgDataType.JSON, data);
  512 + clusterService.pushMsgToRuleEngine(stateData.getTenantId(), stateData.getDeviceId(), tbMsg, null);
513 } catch (Exception e) { 513 } catch (Exception e) {
514 log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e); 514 log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e);
515 } 515 }
@@ -273,8 +273,8 @@ public class DefaultTransportApiService implements TransportApiService { @@ -273,8 +273,8 @@ public class DefaultTransportApiService implements TransportApiService {
273 273
274 DeviceId deviceId = device.getId(); 274 DeviceId deviceId = device.getId();
275 ObjectNode entityNode = mapper.valueToTree(device); 275 ObjectNode entityNode = mapper.valueToTree(device);
276 - TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, metaData, TbMsgDataType.JSON, mapper.writeValueAsString(entityNode));  
277 - tbClusterService.pushMsgToRuleEngine(tenantId, customerId, deviceId, tbMsg, null); 276 + TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, customerId, metaData, TbMsgDataType.JSON, mapper.writeValueAsString(entityNode));
  277 + tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, null);
278 } 278 }
279 GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder() 279 GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder()
280 .setDeviceInfo(getDeviceInfoProto(device)); 280 .setDeviceInfo(getDeviceInfoProto(device));
@@ -48,7 +48,7 @@ public final class TbMsg implements Serializable { @@ -48,7 +48,7 @@ public final class TbMsg implements Serializable {
48 private final long ts; 48 private final long ts;
49 private final String type; 49 private final String type;
50 private final EntityId originator; 50 private final EntityId originator;
51 - private CustomerId customerId; 51 + private final CustomerId customerId;
52 private final TbMsgMetaData metaData; 52 private final TbMsgMetaData metaData;
53 private final TbMsgDataType dataType; 53 private final TbMsgDataType dataType;
54 private final String data; 54 private final String data;
@@ -57,6 +57,7 @@ public final class TbMsg implements Serializable { @@ -57,6 +57,7 @@ public final class TbMsg implements Serializable {
57 @Getter(value = AccessLevel.NONE) 57 @Getter(value = AccessLevel.NONE)
58 private final AtomicInteger ruleNodeExecCounter; 58 private final AtomicInteger ruleNodeExecCounter;
59 59
  60 +
60 public int getAndIncrementRuleNodeCounter() { 61 public int getAndIncrementRuleNodeCounter() {
61 return ruleNodeExecCounter.getAndIncrement(); 62 return ruleNodeExecCounter.getAndIncrement();
62 } 63 }
@@ -66,60 +67,74 @@ public final class TbMsg implements Serializable { @@ -66,60 +67,74 @@ public final class TbMsg implements Serializable {
66 transient private final TbMsgCallback callback; 67 transient private final TbMsgCallback callback;
67 68
68 public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { 69 public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
69 - return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, 70 + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null,
  71 + metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, 0, TbMsgCallback.EMPTY);
  72 + }
  73 +
  74 + public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
  75 + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
70 metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, 0, TbMsgCallback.EMPTY); 76 metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, 0, TbMsgCallback.EMPTY);
71 } 77 }
72 78
73 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) { 79 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
74 - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY); 80 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY);
75 } 81 }
76 82
77 // REALLY NEW MSG 83 // REALLY NEW MSG
78 84
79 public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) { 85 public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) {
80 - return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY); 86 + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY);
  87 + }
  88 +
  89 + public static TbMsg newMsg(String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data) {
  90 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, metaData.copy(), dataType, data, null, null, 0, TbMsgCallback.EMPTY);
81 } 91 }
82 92
83 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data) { 93 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data) {
84 - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, null, null, 0, TbMsgCallback.EMPTY); 94 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, metaData.copy(), dataType, data, null, null, 0, TbMsgCallback.EMPTY);
85 } 95 }
86 96
87 // For Tests only 97 // For Tests only
88 98
89 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { 99 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
90 - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, 0, TbMsgCallback.EMPTY); 100 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, 0, TbMsgCallback.EMPTY);
91 } 101 }
92 102
93 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data, TbMsgCallback callback) { 103 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data, TbMsgCallback callback) {
94 - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, callback); 104 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, callback);
95 } 105 }
96 106
97 public static TbMsg transformMsg(TbMsg tbMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) { 107 public static TbMsg transformMsg(TbMsg tbMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
98 - return new TbMsg(tbMsg.getQueueName(), tbMsg.getId(), tbMsg.getTs(), type, originator, metaData.copy(), tbMsg.getDataType(),  
99 - data, tbMsg.getRuleChainId(), tbMsg.getRuleNodeId(), tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback()); 108 + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, type, originator, tbMsg.customerId, metaData.copy(), tbMsg.dataType,
  109 + data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ruleNodeExecCounter.get(), tbMsg.callback);
  110 + }
  111 +
  112 + public static TbMsg transformMsg(TbMsg tbMsg, CustomerId customerId) {
  113 + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, customerId, tbMsg.metaData, tbMsg.dataType,
  114 + tbMsg.data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback());
100 } 115 }
101 116
102 public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId) { 117 public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId) {
103 - return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.metaData, tbMsg.dataType, 118 + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
104 tbMsg.data, ruleChainId, null, tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback()); 119 tbMsg.data, ruleChainId, null, tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback());
105 } 120 }
106 121
107 public static TbMsg transformMsg(TbMsg tbMsg, String queueName) { 122 public static TbMsg transformMsg(TbMsg tbMsg, String queueName) {
108 - return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.metaData, tbMsg.dataType, 123 + return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
109 tbMsg.data, tbMsg.getRuleChainId(), null, tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback()); 124 tbMsg.data, tbMsg.getRuleChainId(), null, tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback());
110 } 125 }
111 126
112 public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId, String queueName) { 127 public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId, String queueName) {
113 - return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.metaData, tbMsg.dataType, 128 + return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
114 tbMsg.data, ruleChainId, null, tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback()); 129 tbMsg.data, ruleChainId, null, tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback());
115 } 130 }
116 131
117 public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { 132 public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
118 - return new TbMsg(tbMsg.getQueueName(), UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), 133 + return new TbMsg(tbMsg.getQueueName(), UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.customerId, tbMsg.getMetaData().copy(),
119 tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, tbMsg.ruleNodeExecCounter.get(), TbMsgCallback.EMPTY); 134 tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, tbMsg.ruleNodeExecCounter.get(), TbMsgCallback.EMPTY);
120 } 135 }
121 136
122 - private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, 137 + private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
123 RuleChainId ruleChainId, RuleNodeId ruleNodeId, int ruleNodeExecCounter, TbMsgCallback callback) { 138 RuleChainId ruleChainId, RuleNodeId ruleNodeId, int ruleNodeExecCounter, TbMsgCallback callback) {
124 this.id = id; 139 this.id = id;
125 this.queueName = queueName != null ? queueName : ServiceQueue.MAIN; 140 this.queueName = queueName != null ? queueName : ServiceQueue.MAIN;
@@ -130,6 +145,7 @@ public final class TbMsg implements Serializable { @@ -130,6 +145,7 @@ public final class TbMsg implements Serializable {
130 } 145 }
131 this.type = type; 146 this.type = type;
132 this.originator = originator; 147 this.originator = originator;
  148 + this.customerId = (customerId == null || customerId.isNullUid()) ? null : customerId;
133 this.metaData = metaData; 149 this.metaData = metaData;
134 this.dataType = dataType; 150 this.dataType = dataType;
135 this.data = data; 151 this.data = data;
@@ -156,6 +172,11 @@ public final class TbMsg implements Serializable { @@ -156,6 +172,11 @@ public final class TbMsg implements Serializable {
156 builder.setEntityIdMSB(msg.getOriginator().getId().getMostSignificantBits()); 172 builder.setEntityIdMSB(msg.getOriginator().getId().getMostSignificantBits());
157 builder.setEntityIdLSB(msg.getOriginator().getId().getLeastSignificantBits()); 173 builder.setEntityIdLSB(msg.getOriginator().getId().getLeastSignificantBits());
158 174
  175 + if (msg.getCustomerId() != null) {
  176 + builder.setCustomerIdMSB(msg.getCustomerId().getId().getMostSignificantBits());
  177 + builder.setCustomerIdLSB(msg.getCustomerId().getId().getLeastSignificantBits());
  178 + }
  179 +
159 if (msg.getRuleChainId() != null) { 180 if (msg.getRuleChainId() != null) {
160 builder.setRuleChainIdMSB(msg.getRuleChainId().getId().getMostSignificantBits()); 181 builder.setRuleChainIdMSB(msg.getRuleChainId().getId().getMostSignificantBits());
161 builder.setRuleChainIdLSB(msg.getRuleChainId().getId().getLeastSignificantBits()); 182 builder.setRuleChainIdLSB(msg.getRuleChainId().getId().getLeastSignificantBits());
@@ -181,16 +202,21 @@ public final class TbMsg implements Serializable { @@ -181,16 +202,21 @@ public final class TbMsg implements Serializable {
181 MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(data); 202 MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(data);
182 TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap()); 203 TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
183 EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); 204 EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
  205 + CustomerId customerId = null;
184 RuleChainId ruleChainId = null; 206 RuleChainId ruleChainId = null;
185 RuleNodeId ruleNodeId = null; 207 RuleNodeId ruleNodeId = null;
  208 + if (proto.getCustomerIdMSB() != 0L && proto.getCustomerIdLSB() != 0L) {
  209 + customerId = new CustomerId(new UUID(proto.getCustomerIdMSB(), proto.getCustomerIdLSB()));
  210 + }
186 if (proto.getRuleChainIdMSB() != 0L && proto.getRuleChainIdLSB() != 0L) { 211 if (proto.getRuleChainIdMSB() != 0L && proto.getRuleChainIdLSB() != 0L) {
187 ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB())); 212 ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB()));
188 } 213 }
189 if (proto.getRuleNodeIdMSB() != 0L && proto.getRuleNodeIdLSB() != 0L) { 214 if (proto.getRuleNodeIdMSB() != 0L && proto.getRuleNodeIdLSB() != 0L) {
190 ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB())); 215 ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB()));
191 } 216 }
  217 +
192 TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()]; 218 TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
193 - return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, proto.getRuleNodeExecCounter(), callback); 219 + return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, customerId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, proto.getRuleNodeExecCounter(), callback);
194 } catch (InvalidProtocolBufferException e) { 220 } catch (InvalidProtocolBufferException e) {
195 throw new IllegalStateException("Could not parse protobuf for TbMsg", e); 221 throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
196 } 222 }
@@ -201,11 +227,11 @@ public final class TbMsg implements Serializable { @@ -201,11 +227,11 @@ public final class TbMsg implements Serializable {
201 } 227 }
202 228
203 public TbMsg copyWithRuleChainId(RuleChainId ruleChainId, UUID msgId) { 229 public TbMsg copyWithRuleChainId(RuleChainId ruleChainId, UUID msgId) {
204 - return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, null, this.ruleNodeExecCounter.get(), callback); 230 + return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.customerId, this.metaData, this.dataType, this.data, ruleChainId, null, this.ruleNodeExecCounter.get(), callback);
205 } 231 }
206 232
207 public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId, UUID msgId) { 233 public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId, UUID msgId) {
208 - return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ruleNodeExecCounter.get(), callback); 234 + return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.customerId, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ruleNodeExecCounter.get(), callback);
209 } 235 }
210 236
211 public TbMsgCallback getCallback() { 237 public TbMsgCallback getCallback() {
@@ -46,4 +46,7 @@ message TbMsgProto { @@ -46,4 +46,7 @@ message TbMsgProto {
46 46
47 int64 ts = 15; 47 int64 ts = 15;
48 int32 ruleNodeExecCounter = 16; 48 int32 ruleNodeExecCounter = 16;
  49 +
  50 + int64 customerIdMSB = 17;
  51 + int64 customerIdLSB = 18;
49 } 52 }
@@ -409,15 +409,15 @@ public class DefaultTransportService implements TransportService { @@ -409,15 +409,15 @@ public class DefaultTransportService implements TransportService {
409 reportActivityInternal(sessionInfo); 409 reportActivityInternal(sessionInfo);
410 TenantId tenantId = getTenantId(sessionInfo); 410 TenantId tenantId = getTenantId(sessionInfo);
411 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); 411 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
412 - MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), new ApiStatsProxyCallback<>(tenantId, getCustomerId(sessionInfo), dataPoints, callback)); 412 + CustomerId customerId = getCustomerId(sessionInfo);
  413 + MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), new ApiStatsProxyCallback<>(tenantId, customerId, dataPoints, callback));
413 for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { 414 for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
414 TbMsgMetaData metaData = new TbMsgMetaData(); 415 TbMsgMetaData metaData = new TbMsgMetaData();
415 metaData.putValue("deviceName", sessionInfo.getDeviceName()); 416 metaData.putValue("deviceName", sessionInfo.getDeviceName());
416 metaData.putValue("deviceType", sessionInfo.getDeviceType()); 417 metaData.putValue("deviceType", sessionInfo.getDeviceType());
417 metaData.putValue("ts", tsKv.getTs() + ""); 418 metaData.putValue("ts", tsKv.getTs() + "");
418 JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList()); 419 JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList());
419 - sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, SessionMsgType.POST_TELEMETRY_REQUEST, packCallback);  
420 - 420 + sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_TELEMETRY_REQUEST, packCallback);
421 } 421 }
422 } 422 }
423 } 423 }
@@ -433,8 +433,9 @@ public class DefaultTransportService implements TransportService { @@ -433,8 +433,9 @@ public class DefaultTransportService implements TransportService {
433 metaData.putValue("deviceName", sessionInfo.getDeviceName()); 433 metaData.putValue("deviceName", sessionInfo.getDeviceName());
434 metaData.putValue("deviceType", sessionInfo.getDeviceType()); 434 metaData.putValue("deviceType", sessionInfo.getDeviceType());
435 metaData.putValue("notifyDevice", "false"); 435 metaData.putValue("notifyDevice", "false");
436 - sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST,  
437 - new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, getCustomerId(sessionInfo), msg.getKvList().size(), callback))); 436 + CustomerId customerId = getCustomerId(sessionInfo);
  437 + sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST,
  438 + new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback)));
438 } 439 }
439 } 440 }
440 441
@@ -515,7 +516,7 @@ public class DefaultTransportService implements TransportService { @@ -515,7 +516,7 @@ public class DefaultTransportService implements TransportService {
515 metaData.putValue("requestId", Integer.toString(msg.getRequestId())); 516 metaData.putValue("requestId", Integer.toString(msg.getRequestId()));
516 metaData.putValue("serviceId", serviceInfoProvider.getServiceId()); 517 metaData.putValue("serviceId", serviceInfoProvider.getServiceId());
517 metaData.putValue("sessionId", sessionId.toString()); 518 metaData.putValue("sessionId", sessionId.toString());
518 - sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, 519 + sendToRuleEngine(tenantId, deviceId, getCustomerId(sessionInfo), sessionInfo, json, metaData,
519 SessionMsgType.TO_SERVER_RPC_REQUEST, new TransportTbQueueCallback(callback)); 520 SessionMsgType.TO_SERVER_RPC_REQUEST, new TransportTbQueueCallback(callback));
520 String requestId = sessionId + "-" + msg.getRequestId(); 521 String requestId = sessionId + "-" + msg.getRequestId();
521 toServerRpcPendingMap.put(requestId, new RpcRequestMetadata(sessionId, msg.getRequestId())); 522 toServerRpcPendingMap.put(requestId, new RpcRequestMetadata(sessionId, msg.getRequestId()));
@@ -848,7 +849,7 @@ public class DefaultTransportService implements TransportService { @@ -848,7 +849,7 @@ public class DefaultTransportService implements TransportService {
848 ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback); 849 ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback);
849 } 850 }
850 851
851 - private void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json, 852 + private void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, CustomerId customerId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json,
852 TbMsgMetaData metaData, SessionMsgType sessionMsgType, TbQueueCallback callback) { 853 TbMsgMetaData metaData, SessionMsgType sessionMsgType, TbQueueCallback callback) {
853 DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB())); 854 DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB()));
854 DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId); 855 DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId);
@@ -865,7 +866,7 @@ public class DefaultTransportService implements TransportService { @@ -865,7 +866,7 @@ public class DefaultTransportService implements TransportService {
865 queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN; 866 queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN;
866 } 867 }
867 868
868 - TbMsg tbMsg = TbMsg.newMsg(queueName, sessionMsgType.name(), deviceId, metaData, gson.toJson(json), ruleChainId, null); 869 + TbMsg tbMsg = TbMsg.newMsg(queueName, sessionMsgType.name(), deviceId, customerId, metaData, gson.toJson(json), ruleChainId, null);
869 sendToRuleEngine(tenantId, tbMsg, callback); 870 sendToRuleEngine(tenantId, tbMsg, callback);
870 } 871 }
871 872