Commit c8d8321f749889bf807d53949824de8df25081d2

Authored by VoBa
Committed by GitHub
1 parent 3cc9ad50

Use msg queue instead of default (#4116)

* Remove device from cache in case null value cached in the distributed redis

* Handle case when device was removed from db but message in the queue exists

* Code review chagnes

* Added usage statistics configuration to yml file

* Use msg queue instead of default

* Make private

* Make private
@@ -763,8 +763,8 @@ public class DefaultTransportService implements TransportService { @@ -763,8 +763,8 @@ public class DefaultTransportService implements TransportService {
763 wrappedCallback); 763 wrappedCallback);
764 } 764 }
765 765
766 - protected void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {  
767 - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tbMsg.getOriginator()); 766 + private void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
  767 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, tbMsg.getOriginator());
768 if (log.isTraceEnabled()) { 768 if (log.isTraceEnabled()) {
769 log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg); 769 log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg);
770 } 770 }
@@ -776,7 +776,7 @@ public class DefaultTransportService implements TransportService { @@ -776,7 +776,7 @@ public class DefaultTransportService implements TransportService {
776 ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback); 776 ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback);
777 } 777 }
778 778
779 - protected void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json, 779 + private void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json,
780 TbMsgMetaData metaData, SessionMsgType sessionMsgType, TbQueueCallback callback) { 780 TbMsgMetaData metaData, SessionMsgType sessionMsgType, TbQueueCallback callback) {
781 DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB())); 781 DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB()));
782 DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId); 782 DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId);