Showing
1 changed file
with
7 additions
and
12 deletions
... | ... | @@ -184,12 +184,9 @@ public class DefaultTbClusterService implements TbClusterService { |
184 | 184 | byte[] msgBytes = encodingService.encode(msg); |
185 | 185 | TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer(); |
186 | 186 | Set<String> tbRuleEngineServices = new HashSet<>(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE)); |
187 | - boolean toCore = msg.getEntityId().getEntityType().equals(EntityType.TENANT) || | |
188 | - msg.getEntityId().getEntityType().equals(EntityType.EDGE); | |
189 | 187 | |
190 | - boolean toRuleEngine = !msg.getEntityId().getEntityType().equals(EntityType.EDGE); | |
191 | - | |
192 | - if (toCore) { | |
188 | + if (msg.getEntityId().getEntityType().equals(EntityType.TENANT) || | |
189 | + msg.getEntityId().getEntityType().equals(EntityType.EDGE)) { | |
193 | 190 | TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer(); |
194 | 191 | Set<String> tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE); |
195 | 192 | for (String serviceId : tbCoreServices) { |
... | ... | @@ -201,13 +198,11 @@ public class DefaultTbClusterService implements TbClusterService { |
201 | 198 | // No need to push notifications twice |
202 | 199 | tbRuleEngineServices.removeAll(tbCoreServices); |
203 | 200 | } |
204 | - if (toRuleEngine) { | |
205 | - for (String serviceId : tbRuleEngineServices) { | |
206 | - TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId); | |
207 | - ToRuleEngineNotificationMsg toRuleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setComponentLifecycleMsg(ByteString.copyFrom(msgBytes)).build(); | |
208 | - toRuleEngineProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toRuleEngineMsg), null); | |
209 | - toRuleEngineNfs.incrementAndGet(); | |
210 | - } | |
201 | + for (String serviceId : tbRuleEngineServices) { | |
202 | + TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId); | |
203 | + ToRuleEngineNotificationMsg toRuleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setComponentLifecycleMsg(ByteString.copyFrom(msgBytes)).build(); | |
204 | + toRuleEngineProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toRuleEngineMsg), null); | |
205 | + toRuleEngineNfs.incrementAndGet(); | |
211 | 206 | } |
212 | 207 | } |
213 | 208 | ... | ... |