Commit 34da0fe811f51e2a1ca22fe51b4044ab729b3a67

Authored by Igor Kulikov
2 parents 7f53d531 7e66fd26

Merge branch 'master' of github.com:thingsboard/thingsboard into develop/3.0

@@ -583,6 +583,9 @@ queue: @@ -583,6 +583,9 @@ queue:
583 linger.ms: "${TB_KAFKA_LINGER_MS:1}" 583 linger.ms: "${TB_KAFKA_LINGER_MS:1}"
584 buffer.memory: "${TB_BUFFER_MEMORY:33554432}" 584 buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
585 replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" 585 replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}"
  586 + max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}"
  587 + max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}"
  588 + fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
586 topic-properties: 589 topic-properties:
587 rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" 590 rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
588 core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" 591 core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
@@ -54,6 +54,9 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue @@ -54,6 +54,9 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
54 if (groupId != null) { 54 if (groupId != null) {
55 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 55 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
56 } 56 }
  57 + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, settings.getMaxPollRecords());
  58 + props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, settings.getMaxPartitionFetchBytes());
  59 + props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, settings.getFetchMaxBytes());
57 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); 60 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
58 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs); 61 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
59 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 62 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
@@ -55,6 +55,18 @@ public class TbKafkaSettings { @@ -55,6 +55,18 @@ public class TbKafkaSettings {
55 @Getter 55 @Getter
56 private short replicationFactor; 56 private short replicationFactor;
57 57
  58 + @Value("${queue.kafka.max_poll_records:8192}")
  59 + @Getter
  60 + private int maxPollRecords;
  61 +
  62 + @Value("${queue.kafka.max_partition_fetch_bytes:16777216}")
  63 + @Getter
  64 + private int maxPartitionFetchBytes;
  65 +
  66 + @Value("${queue.kafka.fetch_max_bytes:134217728}")
  67 + @Getter
  68 + private int fetchMaxBytes;
  69 +
58 @Value("${kafka.other:#{null}}") 70 @Value("${kafka.other:#{null}}")
59 private List<TbKafkaProperty> other; 71 private List<TbKafkaProperty> other;
60 72
@@ -21,7 +21,6 @@ import io.netty.handler.ssl.SslContext; @@ -21,7 +21,6 @@ import io.netty.handler.ssl.SslContext;
21 import io.netty.handler.ssl.SslContextBuilder; 21 import io.netty.handler.ssl.SslContextBuilder;
22 import lombok.Data; 22 import lombok.Data;
23 import lombok.extern.slf4j.Slf4j; 23 import lombok.extern.slf4j.Slf4j;
24 -import org.thingsboard.mqtt.MqttClientConfig;  
25 import org.apache.commons.codec.binary.Base64; 24 import org.apache.commons.codec.binary.Base64;
26 import org.bouncycastle.jce.provider.BouncyCastleProvider; 25 import org.bouncycastle.jce.provider.BouncyCastleProvider;
27 import org.bouncycastle.openssl.PEMDecryptorProvider; 26 import org.bouncycastle.openssl.PEMDecryptorProvider;
@@ -30,15 +29,26 @@ import org.bouncycastle.openssl.PEMKeyPair; @@ -30,15 +29,26 @@ import org.bouncycastle.openssl.PEMKeyPair;
30 import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; 29 import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
31 import org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder; 30 import org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder;
32 import org.springframework.util.StringUtils; 31 import org.springframework.util.StringUtils;
  32 +import org.thingsboard.mqtt.MqttClientConfig;
33 33
  34 +import javax.crypto.Cipher;
  35 +import javax.crypto.EncryptedPrivateKeyInfo;
  36 +import javax.crypto.SecretKeyFactory;
  37 +import javax.crypto.spec.PBEKeySpec;
34 import javax.net.ssl.KeyManagerFactory; 38 import javax.net.ssl.KeyManagerFactory;
35 import javax.net.ssl.TrustManagerFactory; 39 import javax.net.ssl.TrustManagerFactory;
36 import java.io.ByteArrayInputStream; 40 import java.io.ByteArrayInputStream;
37 -import java.security.*; 41 +import java.security.AlgorithmParameters;
  42 +import java.security.Key;
  43 +import java.security.KeyFactory;
  44 +import java.security.KeyPair;
  45 +import java.security.KeyStore;
  46 +import java.security.PrivateKey;
  47 +import java.security.Security;
38 import java.security.cert.Certificate; 48 import java.security.cert.Certificate;
39 import java.security.cert.CertificateFactory; 49 import java.security.cert.CertificateFactory;
40 import java.security.cert.X509Certificate; 50 import java.security.cert.X509Certificate;
41 -import java.security.interfaces.RSAPrivateKey; 51 +import java.security.spec.KeySpec;
42 import java.security.spec.PKCS8EncodedKeySpec; 52 import java.security.spec.PKCS8EncodedKeySpec;
43 import java.util.Optional; 53 import java.util.Optional;
44 54
@@ -138,16 +148,36 @@ public class CertPemClientCredentials implements MqttClientCredentials { @@ -138,16 +148,36 @@ public class CertPemClientCredentials implements MqttClientCredentials {
138 } 148 }
139 149
140 private PrivateKey readPrivateKeyFile(String fileContent) throws Exception { 150 private PrivateKey readPrivateKeyFile(String fileContent) throws Exception {
141 - RSAPrivateKey privateKey = null; 151 + PrivateKey privateKey = null;
142 if (fileContent != null && !fileContent.isEmpty()) { 152 if (fileContent != null && !fileContent.isEmpty()) {
143 fileContent = fileContent.replaceAll(".*BEGIN.*PRIVATE KEY.*", "") 153 fileContent = fileContent.replaceAll(".*BEGIN.*PRIVATE KEY.*", "")
144 .replaceAll(".*END.*PRIVATE KEY.*", "") 154 .replaceAll(".*END.*PRIVATE KEY.*", "")
145 .replaceAll("\\s", ""); 155 .replaceAll("\\s", "");
146 byte[] decoded = Base64.decodeBase64(fileContent); 156 byte[] decoded = Base64.decodeBase64(fileContent);
147 KeyFactory keyFactory = KeyFactory.getInstance("RSA"); 157 KeyFactory keyFactory = KeyFactory.getInstance("RSA");
148 - privateKey = (RSAPrivateKey) keyFactory.generatePrivate(new PKCS8EncodedKeySpec(decoded)); 158 + KeySpec keySpec = getKeySpec(decoded);
  159 + privateKey = keyFactory.generatePrivate(keySpec);
149 } 160 }
150 return privateKey; 161 return privateKey;
151 } 162 }
152 163
  164 + private KeySpec getKeySpec(byte[] encodedKey) throws Exception {
  165 + KeySpec keySpec;
  166 + if (password == null) {
  167 + keySpec = new PKCS8EncodedKeySpec(encodedKey);
  168 + } else {
  169 + PBEKeySpec pbeKeySpec = new PBEKeySpec(password.toCharArray());
  170 +
  171 + EncryptedPrivateKeyInfo privateKeyInfo = new EncryptedPrivateKeyInfo(encodedKey);
  172 + String algorithmName = privateKeyInfo.getAlgName();
  173 + Cipher cipher = Cipher.getInstance(algorithmName);
  174 + SecretKeyFactory secretKeyFactory = SecretKeyFactory.getInstance(algorithmName);
  175 +
  176 + Key pbeKey = secretKeyFactory.generateSecret(pbeKeySpec);
  177 + AlgorithmParameters algParams = privateKeyInfo.getAlgParameters();
  178 + cipher.init(Cipher.DECRYPT_MODE, pbeKey, algParams);
  179 + keySpec = privateKeyInfo.getKeySpec(cipher);
  180 + }
  181 + return keySpec;
  182 + }
153 } 183 }