Commit 522a31c098d6e518f984a3dc5d70f1a215be4937

Authored by Andrew Shvayka
Committed by GitHub
2 parents 42dc8053 0745ee3d

Merge pull request #5451 from dmytro-landiak/fix-tb-rule-engine-queue-factory-topic

[3.3.2] Fixed passed topic name for consumer constructor in rule engine queue
@@ -866,10 +866,15 @@ queue: @@ -866,10 +866,15 @@ queue:
866 sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}" 866 sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}"
867 sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}" 867 sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
868 security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" 868 security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
  869 + # Key-value properties for Kafka consumer per specific topic, e.g. tb_ota_package is a topic name for ota, tb_rule_engine.sq is a topic name for default SequentialByOriginator queue.
  870 + # Check TB_QUEUE_CORE_OTA_TOPIC and TB_QUEUE_RE_SQ_TOPIC params
869 consumer-properties-per-topic: 871 consumer-properties-per-topic:
870 tb_ota_package: 872 tb_ota_package:
871 - key: max.poll.records 873 - key: max.poll.records
872 - value: 10 874 + value: "${TB_QUEUE_KAFKA_OTA_MAX_POLL_RECORDS:10}"
  875 +# tb_rule_engine.sq:
  876 +# - key: max.poll.records
  877 +# value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}"
873 other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside 878 other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
874 - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms 879 - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
875 value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) 880 value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
@@ -117,7 +117,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng @@ -117,7 +117,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
117 117
118 @Override 118 @Override
119 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { 119 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
120 - return new TbAwsSqsConsumerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic(), 120 + return new TbAwsSqsConsumerTemplate<>(ruleEngineAdmin, sqsSettings, configuration.getTopic(),
121 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); 121 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
122 } 122 }
123 123
@@ -109,7 +109,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory @@ -109,7 +109,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
109 109
110 @Override 110 @Override
111 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { 111 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
112 - return new TbAwsSqsConsumerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic(), 112 + return new TbAwsSqsConsumerTemplate<>(ruleEngineAdmin, sqsSettings, configuration.getTopic(),
113 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); 113 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
114 } 114 }
115 115
@@ -92,7 +92,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE @@ -92,7 +92,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
92 92
93 @Override 93 @Override
94 public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { 94 public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
95 - return new InMemoryTbQueueConsumer<>(ruleEngineSettings.getTopic()); 95 + return new InMemoryTbQueueConsumer<>(configuration.getTopic());
96 } 96 }
97 97
98 @Override 98 @Override
@@ -160,7 +160,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi @@ -160,7 +160,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
160 String queueName = configuration.getName(); 160 String queueName = configuration.getName();
161 TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder(); 161 TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
162 consumerBuilder.settings(kafkaSettings); 162 consumerBuilder.settings(kafkaSettings);
163 - consumerBuilder.topic(ruleEngineSettings.getTopic()); 163 + consumerBuilder.topic(configuration.getTopic());
164 consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); 164 consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
165 consumerBuilder.groupId("re-" + queueName + "-consumer"); 165 consumerBuilder.groupId("re-" + queueName + "-consumer");
166 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); 166 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
@@ -159,7 +159,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { @@ -159,7 +159,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
159 String queueName = configuration.getName(); 159 String queueName = configuration.getName();
160 TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder(); 160 TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
161 consumerBuilder.settings(kafkaSettings); 161 consumerBuilder.settings(kafkaSettings);
162 - consumerBuilder.topic(ruleEngineSettings.getTopic()); 162 + consumerBuilder.topic(configuration.getTopic());
163 consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); 163 consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
164 consumerBuilder.groupId("re-" + queueName + "-consumer"); 164 consumerBuilder.groupId("re-" + queueName + "-consumer");
165 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); 165 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
@@ -127,7 +127,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng @@ -127,7 +127,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
127 127
128 @Override 128 @Override
129 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { 129 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
130 - return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic(), 130 + return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, configuration.getTopic(),
131 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); 131 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
132 } 132 }
133 133
@@ -114,7 +114,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory @@ -114,7 +114,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
114 114
115 @Override 115 @Override
116 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { 116 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
117 - return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic(), 117 + return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, configuration.getTopic(),
118 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); 118 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
119 } 119 }
120 120
@@ -125,7 +125,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE @@ -125,7 +125,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
125 125
126 @Override 126 @Override
127 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { 127 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
128 - return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic(), 128 + return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, configuration.getTopic(),
129 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); 129 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
130 } 130 }
131 131
@@ -112,7 +112,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor @@ -112,7 +112,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor
112 112
113 @Override 113 @Override
114 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { 114 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
115 - return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic(), 115 + return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, configuration.getTopic(),
116 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); 116 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
117 } 117 }
118 118
@@ -124,7 +124,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul @@ -124,7 +124,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
124 124
125 @Override 125 @Override
126 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { 126 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
127 - return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic(), 127 + return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, configuration.getTopic(),
128 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); 128 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
129 } 129 }
130 130
@@ -112,7 +112,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact @@ -112,7 +112,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
112 112
113 @Override 113 @Override
114 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { 114 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
115 - return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic(), 115 + return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, configuration.getTopic(),
116 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); 116 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
117 } 117 }
118 118