Commit 515dc983d3deff4e320eebd6219b6d378d04d24c

Authored by ShvaykaD
Committed by GitHub
1 parent 72ef0ede

Improvements/kafka rule node (#2505)

* added metadata key-values as kafka headers

* added default charset to configuration

* fix typo
@@ -16,16 +16,21 @@ @@ -16,16 +16,21 @@
16 package org.thingsboard.rule.engine.kafka; 16 package org.thingsboard.rule.engine.kafka;
17 17
18 import lombok.extern.slf4j.Slf4j; 18 import lombok.extern.slf4j.Slf4j;
  19 +import org.apache.commons.lang3.BooleanUtils;
19 import org.apache.kafka.clients.producer.*; 20 import org.apache.kafka.clients.producer.*;
  21 +import org.apache.kafka.common.header.Headers;
  22 +import org.apache.kafka.common.header.internals.RecordHeader;
  23 +import org.apache.kafka.common.header.internals.RecordHeaders;
20 import org.thingsboard.rule.engine.api.util.TbNodeUtils; 24 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
21 import org.thingsboard.rule.engine.api.*; 25 import org.thingsboard.rule.engine.api.*;
22 import org.thingsboard.server.common.data.plugin.ComponentType; 26 import org.thingsboard.server.common.data.plugin.ComponentType;
23 import org.thingsboard.server.common.msg.TbMsg; 27 import org.thingsboard.server.common.msg.TbMsg;
24 import org.thingsboard.server.common.msg.TbMsgMetaData; 28 import org.thingsboard.server.common.msg.TbMsgMetaData;
25 29
  30 +import java.nio.charset.Charset;
  31 +import java.nio.charset.StandardCharsets;
26 import java.util.Properties; 32 import java.util.Properties;
27 import java.util.concurrent.ExecutionException; 33 import java.util.concurrent.ExecutionException;
28 -import java.util.concurrent.atomic.AtomicInteger;  
29 34
30 @Slf4j 35 @Slf4j
31 @RuleNode( 36 @RuleNode(
@@ -46,8 +51,11 @@ public class TbKafkaNode implements TbNode { @@ -46,8 +51,11 @@ public class TbKafkaNode implements TbNode {
46 private static final String PARTITION = "partition"; 51 private static final String PARTITION = "partition";
47 private static final String TOPIC = "topic"; 52 private static final String TOPIC = "topic";
48 private static final String ERROR = "error"; 53 private static final String ERROR = "error";
  54 + public static final String TB_MSG_MD_PREFIX = "tb_msg_md_";
49 55
50 private TbKafkaNodeConfiguration config; 56 private TbKafkaNodeConfiguration config;
  57 + private boolean addMetadataKeyValuesAsKafkaHeaders;
  58 + private Charset toBytesCharset;
51 59
52 private Producer<?, String> producer; 60 private Producer<?, String> producer;
53 61
@@ -66,8 +74,10 @@ public class TbKafkaNode implements TbNode { @@ -66,8 +74,10 @@ public class TbKafkaNode implements TbNode {
66 properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.getBufferMemory()); 74 properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.getBufferMemory());
67 if (config.getOtherProperties() != null) { 75 if (config.getOtherProperties() != null) {
68 config.getOtherProperties() 76 config.getOtherProperties()
69 - .forEach((k,v) -> properties.put(k, v)); 77 + .forEach(properties::put);
70 } 78 }
  79 + addMetadataKeyValuesAsKafkaHeaders = BooleanUtils.toBooleanDefaultIfNull(config.isAddMetadataKeyValuesAsKafkaHeaders(), false);
  80 + toBytesCharset = config.getKafkaHeadersCharset() != null ? Charset.forName(config.getKafkaHeadersCharset()) : StandardCharsets.UTF_8;
71 try { 81 try {
72 this.producer = new KafkaProducer<>(properties); 82 this.producer = new KafkaProducer<>(properties);
73 } catch (Exception e) { 83 } catch (Exception e) {
@@ -79,16 +89,16 @@ public class TbKafkaNode implements TbNode { @@ -79,16 +89,16 @@ public class TbKafkaNode implements TbNode {
79 public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { 89 public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
80 String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg.getMetaData()); 90 String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg.getMetaData());
81 try { 91 try {
82 - producer.send(new ProducerRecord<>(topic, msg.getData()),  
83 - (metadata, e) -> {  
84 - if (metadata != null) {  
85 - TbMsg next = processResponse(ctx, msg, metadata);  
86 - ctx.tellNext(next, TbRelationTypes.SUCCESS);  
87 - } else {  
88 - TbMsg next = processException(ctx, msg, e);  
89 - ctx.tellFailure(next, e);  
90 - }  
91 - }); 92 + if (!addMetadataKeyValuesAsKafkaHeaders) {
  93 + producer.send(new ProducerRecord<>(topic, msg.getData()),
  94 + (metadata, e) -> processRecord(ctx, msg, metadata, e));
  95 + } else {
  96 + Headers headers = new RecordHeaders();
  97 + msg.getMetaData().values().forEach((key, value) -> headers.add(new RecordHeader(TB_MSG_MD_PREFIX + key, value.getBytes(toBytesCharset))));
  98 + producer.send(new ProducerRecord<>(topic, null, null, null, msg.getData(), headers),
  99 + (metadata, e) -> processRecord(ctx, msg, metadata, e));
  100 + }
  101 +
92 } catch (Exception e) { 102 } catch (Exception e) {
93 ctx.tellFailure(msg, e); 103 ctx.tellFailure(msg, e);
94 } 104 }
@@ -105,6 +115,16 @@ public class TbKafkaNode implements TbNode { @@ -105,6 +115,16 @@ public class TbKafkaNode implements TbNode {
105 } 115 }
106 } 116 }
107 117
  118 + private void processRecord(TbContext ctx, TbMsg msg, RecordMetadata metadata, Exception e) {
  119 + if (metadata != null) {
  120 + TbMsg next = processResponse(ctx, msg, metadata);
  121 + ctx.tellNext(next, TbRelationTypes.SUCCESS);
  122 + } else {
  123 + TbMsg next = processException(ctx, msg, e);
  124 + ctx.tellFailure(next, e);
  125 + }
  126 + }
  127 +
108 private TbMsg processResponse(TbContext ctx, TbMsg origMsg, RecordMetadata recordMetadata) { 128 private TbMsg processResponse(TbContext ctx, TbMsg origMsg, RecordMetadata recordMetadata) {
109 TbMsgMetaData metaData = origMsg.getMetaData().copy(); 129 TbMsgMetaData metaData = origMsg.getMetaData().copy();
110 metaData.putValue(OFFSET, String.valueOf(recordMetadata.offset())); 130 metaData.putValue(OFFSET, String.valueOf(recordMetadata.offset()));
@@ -36,6 +36,9 @@ public class TbKafkaNodeConfiguration implements NodeConfiguration<TbKafkaNodeCo @@ -36,6 +36,9 @@ public class TbKafkaNodeConfiguration implements NodeConfiguration<TbKafkaNodeCo
36 private String valueSerializer; 36 private String valueSerializer;
37 private Map<String, String> otherProperties; 37 private Map<String, String> otherProperties;
38 38
  39 + private boolean addMetadataKeyValuesAsKafkaHeaders;
  40 + private String kafkaHeadersCharset;
  41 +
39 @Override 42 @Override
40 public TbKafkaNodeConfiguration defaultConfiguration() { 43 public TbKafkaNodeConfiguration defaultConfiguration() {
41 TbKafkaNodeConfiguration configuration = new TbKafkaNodeConfiguration(); 44 TbKafkaNodeConfiguration configuration = new TbKafkaNodeConfiguration();
@@ -49,6 +52,8 @@ public class TbKafkaNodeConfiguration implements NodeConfiguration<TbKafkaNodeCo @@ -49,6 +52,8 @@ public class TbKafkaNodeConfiguration implements NodeConfiguration<TbKafkaNodeCo
49 configuration.setKeySerializer(StringSerializer.class.getName()); 52 configuration.setKeySerializer(StringSerializer.class.getName());
50 configuration.setValueSerializer(StringSerializer.class.getName()); 53 configuration.setValueSerializer(StringSerializer.class.getName());
51 configuration.setOtherProperties(Collections.emptyMap()); 54 configuration.setOtherProperties(Collections.emptyMap());
  55 + configuration.setAddMetadataKeyValuesAsKafkaHeaders(false);
  56 + configuration.setKafkaHeadersCharset("UTF-8");
52 return configuration; 57 return configuration;
53 } 58 }
54 } 59 }