Commit e2f26051979d9f63dd921fe6deb4223331bf134d

Authored by Igor Kulikov
1 parent 266ab233

Fix fork join pool - use current classloader for thread context classloader instead of system

@@ -62,9 +62,6 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro @@ -62,9 +62,6 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
62 } 62 }
63 this.settings = settings; 63 this.settings = settings;
64 64
65 - // Ugly workaround to fix org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class  
66 - // details: https://stackoverflow.com/questions/57574901/kafka-java-client-classloader-doesnt-find-sasl-scram-login-class  
67 - Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());  
68 this.producer = new KafkaProducer<>(props); 65 this.producer = new KafkaProducer<>(props);
69 this.defaultTopic = defaultTopic; 66 this.defaultTopic = defaultTopic;
70 this.admin = admin; 67 this.admin = admin;
@@ -34,8 +34,8 @@ public class ThingsBoardForkJoinWorkerThreadFactory implements ForkJoinPool.Fork @@ -34,8 +34,8 @@ public class ThingsBoardForkJoinWorkerThreadFactory implements ForkJoinPool.Fork
34 @Override 34 @Override
35 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { 35 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
36 ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); 36 ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
  37 + thread.setContextClassLoader(this.getClass().getClassLoader());
37 thread.setName(namePrefix +"-"+thread.getPoolIndex()+"-"+threadNumber.getAndIncrement()); 38 thread.setName(namePrefix +"-"+thread.getPoolIndex()+"-"+threadNumber.getAndIncrement());
38 return thread; 39 return thread;
39 } 40 }
40 -  
41 } 41 }
@@ -75,8 +75,8 @@ public class TbKafkaNode implements TbNode { @@ -75,8 +75,8 @@ public class TbKafkaNode implements TbNode {
75 Properties properties = new Properties(); 75 Properties properties = new Properties();
76 properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId()); 76 properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId());
77 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); 77 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
78 - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getKafkaSerializerClass(config.getValueSerializer()));  
79 - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, getKafkaSerializerClass(config.getKeySerializer())); 78 + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer());
  79 + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
80 properties.put(ProducerConfig.ACKS_CONFIG, config.getAcks()); 80 properties.put(ProducerConfig.ACKS_CONFIG, config.getAcks());
81 properties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries()); 81 properties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries());
82 properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize()); 82 properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize());
@@ -88,28 +88,12 @@ public class TbKafkaNode implements TbNode { @@ -88,28 +88,12 @@ public class TbKafkaNode implements TbNode {
88 addMetadataKeyValuesAsKafkaHeaders = BooleanUtils.toBooleanDefaultIfNull(config.isAddMetadataKeyValuesAsKafkaHeaders(), false); 88 addMetadataKeyValuesAsKafkaHeaders = BooleanUtils.toBooleanDefaultIfNull(config.isAddMetadataKeyValuesAsKafkaHeaders(), false);
89 toBytesCharset = config.getKafkaHeadersCharset() != null ? Charset.forName(config.getKafkaHeadersCharset()) : StandardCharsets.UTF_8; 89 toBytesCharset = config.getKafkaHeadersCharset() != null ? Charset.forName(config.getKafkaHeadersCharset()) : StandardCharsets.UTF_8;
90 try { 90 try {
91 - // Ugly workaround to fix org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class  
92 - // details: https://stackoverflow.com/questions/57574901/kafka-java-client-classloader-doesnt-find-sasl-scram-login-class  
93 - Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());  
94 this.producer = new KafkaProducer<>(properties); 91 this.producer = new KafkaProducer<>(properties);
95 } catch (Exception e) { 92 } catch (Exception e) {
96 throw new TbNodeException(e); 93 throw new TbNodeException(e);
97 } 94 }
98 } 95 }
99 96
100 - private Class<?> getKafkaSerializerClass(String serializerClassName) {  
101 - Class<?> serializerClass = null;  
102 - if (!StringUtils.isEmpty(serializerClassName)) {  
103 - try {  
104 - serializerClass = Class.forName(serializerClassName);  
105 - } catch (ClassNotFoundException e) {}  
106 - }  
107 - if (serializerClass == null) {  
108 - serializerClass = StringSerializer.class;  
109 - }  
110 - return serializerClass;  
111 - }  
112 -  
113 @Override 97 @Override
114 public void onMsg(TbContext ctx, TbMsg msg) { 98 public void onMsg(TbContext ctx, TbMsg msg) {
115 String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg); 99 String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);