Commit 0d7bf40a97332876303583a976d281ca4017ca4e

Authored by Igor Kulikov
1 parent 9ab8b6ff

Add clientId parameter to all Kafka producers.

@@ -77,6 +77,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { @@ -77,6 +77,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
77 public void init() { 77 public void init() {
78 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<JsInvokeProtos.RemoteJsRequest> requestBuilder = TBKafkaProducerTemplate.builder(); 78 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<JsInvokeProtos.RemoteJsRequest> requestBuilder = TBKafkaProducerTemplate.builder();
79 requestBuilder.settings(kafkaSettings); 79 requestBuilder.settings(kafkaSettings);
  80 + requestBuilder.clientId("producer-js-invoke-" + nodeIdProvider.getNodeId());
80 requestBuilder.defaultTopic(requestTopic); 81 requestBuilder.defaultTopic(requestTopic);
81 requestBuilder.encoder(new RemoteJsRequestEncoder()); 82 requestBuilder.encoder(new RemoteJsRequestEncoder());
82 requestBuilder.enricher((request, responseTopic, requestId) -> { 83 requestBuilder.enricher((request, responseTopic, requestId) -> {
@@ -112,6 +112,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ @@ -112,6 +112,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
112 public void init() { 112 public void init() {
113 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<ToTransportMsg> notificationsProducerBuilder = TBKafkaProducerTemplate.builder(); 113 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<ToTransportMsg> notificationsProducerBuilder = TBKafkaProducerTemplate.builder();
114 notificationsProducerBuilder.settings(kafkaSettings); 114 notificationsProducerBuilder.settings(kafkaSettings);
  115 + notificationsProducerBuilder.clientId("producer-transport-notification-" + nodeIdProvider.getNodeId());
115 notificationsProducerBuilder.encoder(new ToTransportMsgEncoder()); 116 notificationsProducerBuilder.encoder(new ToTransportMsgEncoder());
116 117
117 notificationsProducer = notificationsProducerBuilder.build(); 118 notificationsProducer = notificationsProducerBuilder.build();
@@ -68,6 +68,7 @@ public class RemoteTransportApiService { @@ -68,6 +68,7 @@ public class RemoteTransportApiService {
68 68
69 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder(); 69 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder();
70 responseBuilder.settings(kafkaSettings); 70 responseBuilder.settings(kafkaSettings);
  71 + responseBuilder.clientId("producer-transport-api-response-" + nodeIdProvider.getNodeId());
71 responseBuilder.encoder(new TransportApiResponseEncoder()); 72 responseBuilder.encoder(new TransportApiResponseEncoder());
72 73
73 TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaConsumerTemplate.builder(); 74 TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaConsumerTemplate.builder();
@@ -62,10 +62,13 @@ public class TBKafkaProducerTemplate<T> { @@ -62,10 +62,13 @@ public class TBKafkaProducerTemplate<T> {
62 62
63 @Builder 63 @Builder
64 private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaEncoder<T> encoder, TbKafkaEnricher<T> enricher, 64 private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaEncoder<T> encoder, TbKafkaEnricher<T> enricher,
65 - TbKafkaPartitioner<T> partitioner, String defaultTopic) { 65 + TbKafkaPartitioner<T> partitioner, String defaultTopic, String clientId) {
66 Properties props = settings.toProps(); 66 Properties props = settings.toProps();
67 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 67 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
68 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); 68 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
  69 + if (!StringUtils.isEmpty(clientId)) {
  70 + props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
  71 + }
69 this.settings = settings; 72 this.settings = settings;
70 this.producer = new KafkaProducer<>(props); 73 this.producer = new KafkaProducer<>(props);
71 this.encoder = encoder; 74 this.encoder = encoder;
@@ -104,6 +104,7 @@ public class RemoteTransportService extends AbstractTransportService { @@ -104,6 +104,7 @@ public class RemoteTransportService extends AbstractTransportService {
104 104
105 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaProducerTemplate.builder(); 105 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaProducerTemplate.builder();
106 requestBuilder.settings(kafkaSettings); 106 requestBuilder.settings(kafkaSettings);
  107 + requestBuilder.clientId("producer-transport-api-request-" + nodeIdProvider.getNodeId());
107 requestBuilder.defaultTopic(transportApiRequestsTopic); 108 requestBuilder.defaultTopic(transportApiRequestsTopic);
108 requestBuilder.encoder(new TransportApiRequestEncoder()); 109 requestBuilder.encoder(new TransportApiRequestEncoder());
109 110
@@ -128,6 +129,7 @@ public class RemoteTransportService extends AbstractTransportService { @@ -128,6 +129,7 @@ public class RemoteTransportService extends AbstractTransportService {
128 129
129 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<ToRuleEngineMsg> ruleEngineProducerBuilder = TBKafkaProducerTemplate.builder(); 130 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<ToRuleEngineMsg> ruleEngineProducerBuilder = TBKafkaProducerTemplate.builder();
130 ruleEngineProducerBuilder.settings(kafkaSettings); 131 ruleEngineProducerBuilder.settings(kafkaSettings);
  132 + ruleEngineProducerBuilder.clientId("producer-rule-engine-request-" + nodeIdProvider.getNodeId());
131 ruleEngineProducerBuilder.defaultTopic(ruleEngineTopic); 133 ruleEngineProducerBuilder.defaultTopic(ruleEngineTopic);
132 ruleEngineProducerBuilder.encoder(new ToRuleEngineMsgEncoder()); 134 ruleEngineProducerBuilder.encoder(new ToRuleEngineMsgEncoder());
133 ruleEngineProducer = ruleEngineProducerBuilder.build(); 135 ruleEngineProducer = ruleEngineProducerBuilder.build();
@@ -25,6 +25,7 @@ import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -25,6 +25,7 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
25 25
26 import java.util.Properties; 26 import java.util.Properties;
27 import java.util.concurrent.ExecutionException; 27 import java.util.concurrent.ExecutionException;
  28 +import java.util.concurrent.atomic.AtomicInteger;
28 29
29 @Slf4j 30 @Slf4j
30 @RuleNode( 31 @RuleNode(
@@ -54,6 +55,7 @@ public class TbKafkaNode implements TbNode { @@ -54,6 +55,7 @@ public class TbKafkaNode implements TbNode {
54 public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { 55 public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
55 this.config = TbNodeUtils.convert(configuration, TbKafkaNodeConfiguration.class); 56 this.config = TbNodeUtils.convert(configuration, TbKafkaNodeConfiguration.class);
56 Properties properties = new Properties(); 57 Properties properties = new Properties();
  58 + properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getNodeId());
57 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); 59 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
58 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer()); 60 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer());
59 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer()); 61 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());