Commit efad71b5ac3161a04ac4f867d064437d514bafdb

Authored by Igor Kulikov
1 parent 75742b28

Fix TbKafkaNode. Configure directly serializer class for key/values instead of string class name.

... ... @@ -22,6 +22,10 @@ import org.apache.kafka.clients.CommonClientConfigs;
22 22 import org.apache.kafka.clients.admin.AdminClientConfig;
23 23 import org.apache.kafka.clients.consumer.ConsumerConfig;
24 24 import org.apache.kafka.clients.producer.ProducerConfig;
  25 +import org.apache.kafka.common.serialization.ByteArrayDeserializer;
  26 +import org.apache.kafka.common.serialization.ByteArraySerializer;
  27 +import org.apache.kafka.common.serialization.StringDeserializer;
  28 +import org.apache.kafka.common.serialization.StringSerializer;
25 29 import org.springframework.beans.factory.annotation.Value;
26 30 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
27 31 import org.springframework.boot.context.properties.ConfigurationProperties;
... ... @@ -107,8 +111,8 @@ public class TbKafkaSettings {
107 111 props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
108 112 props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
109 113
110   - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
111   - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  114 + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  115 + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
112 116 return props;
113 117 }
114 118
... ... @@ -120,8 +124,8 @@ public class TbKafkaSettings {
120 124 props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
121 125 props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
122 126 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
123   - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
124   - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
  127 + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  128 + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
125 129 return props;
126 130 }
127 131
... ...
... ... @@ -6,6 +6,8 @@ sql.ts_inserts_fixed_thread_pool_size=10
6 6
7 7 spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation=true
8 8 spring.jpa.properties.hibernate.order_by.default_null_ordering=last
  9 +spring.jpa.properties.hibernate.jdbc.log.warnings=false
  10 +
9 11 spring.jpa.show-sql=false
10 12 spring.jpa.hibernate.ddl-auto=none
11 13 spring.jpa.database-platform=org.hibernate.dialect.HSQLDialect
... ...
... ... @@ -7,6 +7,8 @@ sql.ts_key_value_partitioning=MONTHS
7 7 #
8 8 spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation=true
9 9 spring.jpa.properties.hibernate.order_by.default_null_ordering=last
  10 +spring.jpa.properties.hibernate.jdbc.log.warnings=false
  11 +
10 12 spring.jpa.show-sql=false
11 13 spring.jpa.hibernate.ddl-auto=validate
12 14 spring.jpa.database-platform=org.hibernate.dialect.HSQLDialect
... ... @@ -49,4 +51,4 @@ queue.rule-engine.queues[0].pack-processing-timeout=3000
49 51 queue.rule-engine.queues[0].processing-strategy.type=SKIP_ALL_FAILURES
50 52 queue.rule-engine.queues[0].submit-strategy.type=BURST
51 53
52   -sql.log_entity_queries=true
\ No newline at end of file
  54 +sql.log_entity_queries=true
... ...
... ... @@ -17,11 +17,13 @@ package org.thingsboard.rule.engine.kafka;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.apache.commons.lang3.BooleanUtils;
  20 +import org.apache.commons.lang3.StringUtils;
20 21 import org.apache.kafka.clients.producer.KafkaProducer;
21 22 import org.apache.kafka.clients.producer.Producer;
22 23 import org.apache.kafka.clients.producer.ProducerConfig;
23 24 import org.apache.kafka.clients.producer.ProducerRecord;
24 25 import org.apache.kafka.clients.producer.RecordMetadata;
  26 +import org.apache.kafka.common.serialization.StringSerializer;
25 27 import org.apache.kafka.common.header.Headers;
26 28 import org.apache.kafka.common.header.internals.RecordHeader;
27 29 import org.apache.kafka.common.header.internals.RecordHeaders;
... ... @@ -73,8 +75,8 @@ public class TbKafkaNode implements TbNode {
73 75 Properties properties = new Properties();
74 76 properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId());
75 77 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
76   - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer());
77   - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
  78 + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getKafkaSerializerClass(config.getValueSerializer()));
  79 + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, getKafkaSerializerClass(config.getKeySerializer()));
78 80 properties.put(ProducerConfig.ACKS_CONFIG, config.getAcks());
79 81 properties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries());
80 82 properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize());
... ... @@ -92,6 +94,19 @@ public class TbKafkaNode implements TbNode {
92 94 }
93 95 }
94 96
  97 + private Class<?> getKafkaSerializerClass(String serializerClassName) {
  98 + Class<?> serializerClass = null;
  99 + if (!StringUtils.isEmpty(serializerClassName)) {
  100 + try {
  101 + serializerClass = Class.forName(serializerClassName);
  102 + } catch (ClassNotFoundException e) {}
  103 + }
  104 + if (serializerClass == null) {
  105 + serializerClass = StringSerializer.class;
  106 + }
  107 + return serializerClass;
  108 + }
  109 +
95 110 @Override
96 111 public void onMsg(TbContext ctx, TbMsg msg) {
97 112 String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);
... ...