Commit 1b905ef7270ca73e328e7d7ed936c489a0c69d3a

Authored by YevhenBondarenko
Committed by Andrew Shvayka
1 parent 4de258f2

refactored

@@ -165,7 +165,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi @@ -165,7 +165,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
165 consumerBuilder.settings(kafkaSettings); 165 consumerBuilder.settings(kafkaSettings);
166 consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName()); 166 consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName());
167 consumerBuilder.clientId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId()); 167 consumerBuilder.clientId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
168 - consumerBuilder.groupId("monolith-rule-engine-notifications-consumer"); 168 + consumerBuilder.groupId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
169 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); 169 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
170 consumerBuilder.admin(notificationAdmin); 170 consumerBuilder.admin(notificationAdmin);
171 return consumerBuilder.build(); 171 return consumerBuilder.build();
@@ -189,7 +189,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi @@ -189,7 +189,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
189 consumerBuilder.settings(kafkaSettings); 189 consumerBuilder.settings(kafkaSettings);
190 consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName()); 190 consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName());
191 consumerBuilder.clientId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId()); 191 consumerBuilder.clientId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId());
192 - consumerBuilder.groupId("monolith-core-notifications-consumer"); 192 + consumerBuilder.groupId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId());
193 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); 193 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
194 consumerBuilder.admin(notificationAdmin); 194 consumerBuilder.admin(notificationAdmin);
195 return consumerBuilder.build(); 195 return consumerBuilder.build();
@@ -230,7 +230,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi @@ -230,7 +230,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
230 responseBuilder.settings(kafkaSettings); 230 responseBuilder.settings(kafkaSettings);
231 responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId()); 231 responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
232 responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId()); 232 responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
233 - responseBuilder.groupId("rule-engine-node"); 233 + responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
234 responseBuilder.decoder(msg -> { 234 responseBuilder.decoder(msg -> {
235 JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); 235 JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
236 JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); 236 JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
@@ -159,7 +159,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { @@ -159,7 +159,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
159 consumerBuilder.settings(kafkaSettings); 159 consumerBuilder.settings(kafkaSettings);
160 consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName()); 160 consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName());
161 consumerBuilder.clientId("tb-core-notifications-consumer-" + serviceInfoProvider.getServiceId()); 161 consumerBuilder.clientId("tb-core-notifications-consumer-" + serviceInfoProvider.getServiceId());
162 - consumerBuilder.groupId("tb-core-notifications-node"); 162 + consumerBuilder.groupId("tb-core-notifications-node-" + serviceInfoProvider.getServiceId());
163 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); 163 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
164 consumerBuilder.admin(notificationAdmin); 164 consumerBuilder.admin(notificationAdmin);
165 return consumerBuilder.build(); 165 return consumerBuilder.build();
@@ -200,7 +200,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { @@ -200,7 +200,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
200 responseBuilder.settings(kafkaSettings); 200 responseBuilder.settings(kafkaSettings);
201 responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId()); 201 responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
202 responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId()); 202 responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
203 - responseBuilder.groupId("rule-engine-node"); 203 + responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
204 responseBuilder.decoder(msg -> { 204 responseBuilder.decoder(msg -> {
205 JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); 205 JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
206 JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); 206 JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
@@ -154,7 +154,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { @@ -154,7 +154,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
154 consumerBuilder.settings(kafkaSettings); 154 consumerBuilder.settings(kafkaSettings);
155 consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName()); 155 consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName());
156 consumerBuilder.clientId("tb-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId()); 156 consumerBuilder.clientId("tb-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
157 - consumerBuilder.groupId("tb-rule-engine-notifications-node"); 157 + consumerBuilder.groupId("tb-rule-engine-notifications-node-" + serviceInfoProvider.getServiceId());
158 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); 158 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
159 consumerBuilder.admin(notificationAdmin); 159 consumerBuilder.admin(notificationAdmin);
160 return consumerBuilder.build(); 160 return consumerBuilder.build();
@@ -173,7 +173,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { @@ -173,7 +173,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
173 responseBuilder.settings(kafkaSettings); 173 responseBuilder.settings(kafkaSettings);
174 responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId()); 174 responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
175 responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId()); 175 responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
176 - responseBuilder.groupId("rule-engine-node"); 176 + responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
177 responseBuilder.decoder(msg -> { 177 responseBuilder.decoder(msg -> {
178 JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); 178 JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
179 JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); 179 JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
@@ -92,7 +92,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { @@ -92,7 +92,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
92 responseBuilder.settings(kafkaSettings); 92 responseBuilder.settings(kafkaSettings);
93 responseBuilder.topic(transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId()); 93 responseBuilder.topic(transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId());
94 responseBuilder.clientId("transport-api-response-" + serviceInfoProvider.getServiceId()); 94 responseBuilder.clientId("transport-api-response-" + serviceInfoProvider.getServiceId());
95 - responseBuilder.groupId("transport-node"); 95 + responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId());
96 responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); 96 responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
97 responseBuilder.admin(transportApiAdmin); 97 responseBuilder.admin(transportApiAdmin);
98 98
@@ -133,7 +133,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { @@ -133,7 +133,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
133 responseBuilder.settings(kafkaSettings); 133 responseBuilder.settings(kafkaSettings);
134 responseBuilder.topic(transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId()); 134 responseBuilder.topic(transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId());
135 responseBuilder.clientId("transport-api-notifications-" + serviceInfoProvider.getServiceId()); 135 responseBuilder.clientId("transport-api-notifications-" + serviceInfoProvider.getServiceId());
136 - responseBuilder.groupId("transport-node"); 136 + responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId());
137 responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); 137 responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
138 responseBuilder.admin(notificationAdmin); 138 responseBuilder.admin(notificationAdmin);
139 return responseBuilder.build(); 139 return responseBuilder.build();