Commit 3a0673c29aad9e793c7cee0c63bd7b2345da0c0a

Authored by Igor Kulikov
2 parents 1fd284f3 3f9f6efc

Merge with develop/2.5.5

... ... @@ -247,4 +247,4 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
247 247 log.info("Failed to load PostgreSQL upgrade functions due to: {}", e.getMessage());
248 248 }
249 249 }
250   -}
\ No newline at end of file
  250 +}
... ...
... ... @@ -209,4 +209,4 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
209 209 log.info("Failed to load PostgreSQL upgrade functions due to: {}", e.getMessage());
210 210 }
211 211 }
212   -}
\ No newline at end of file
  212 +}
... ...
... ... @@ -39,8 +39,7 @@ public abstract class AbstractCleanUpService {
39 39 protected String dbPassword;
40 40
41 41 protected long executeQuery(Connection conn, String query) throws SQLException {
42   - try (Statement statement = conn.createStatement()) {
43   - ResultSet resultSet = statement.executeQuery(query);
  42 + try (Statement statement = conn.createStatement(); ResultSet resultSet = statement.executeQuery(query)) {
44 43 if (log.isDebugEnabled()) {
45 44 getWarnings(statement);
46 45 }
... ...
... ... @@ -33,4 +33,4 @@ public class TimescaleTimeseriesCleanUpService extends AbstractTimeseriesCleanUp
33 33 long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID + "'," + systemTtl + ", 0);");
34 34 log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved);
35 35 }
36   -}
\ No newline at end of file
  36 +}
... ...
... ... @@ -646,11 +646,11 @@ queue:
646 646 security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
647 647 other:
648 648 topic-properties:
649   - rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
650   - core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
651   - transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
652   - notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
653   - js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
  649 + rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
  650 + core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
  651 + transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
  652 + notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
  653 + js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}"
654 654 aws_sqs:
655 655 use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
656 656 access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
... ...
... ... @@ -37,6 +37,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
37 37 private final AdminClient client;
38 38 private final Map<String, String> topicConfigs;
39 39 private final Set<String> topics = ConcurrentHashMap.newKeySet();
  40 + private final int numPartitions;
40 41
41 42 private final short replicationFactor;
42 43
... ... @@ -50,6 +51,13 @@ public class TbKafkaAdmin implements TbQueueAdmin {
50 51 log.error("Failed to get all topics.", e);
51 52 }
52 53
  54 + String numPartitionsStr = topicConfigs.get("partitions");
  55 + if (numPartitionsStr != null) {
  56 + numPartitions = Integer.parseInt(numPartitionsStr);
  57 + topicConfigs.remove("partitions");
  58 + } else {
  59 + numPartitions = 1;
  60 + }
53 61 replicationFactor = settings.getReplicationFactor();
54 62 }
55 63
... ... @@ -59,7 +67,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
59 67 return;
60 68 }
61 69 try {
62   - NewTopic newTopic = new NewTopic(topic, 1, replicationFactor).configs(topicConfigs);
  70 + NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor).configs(topicConfigs);
63 71 createTopic(newTopic).values().get(topic).get();
64 72 topics.add(topic);
65 73 } catch (ExecutionException ee) {
... ...
1 1 TB_QUEUE_TYPE=kafka
2 2 TB_KAFKA_SERVERS=kafka:9092
  3 +TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100
... ...
... ... @@ -25,7 +25,7 @@ kafka:
25 25 # Kafka Bootstrap Servers
26 26 servers: "localhost:9092"
27 27 replication_factor: "1"
28   - topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600"
  28 + topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100"
29 29 use_confluent_cloud: false
30 30 confluent:
31 31 sasl:
... ...
... ... @@ -34,7 +34,7 @@ function KafkaProducer() {
34 34 this.send = async (responseTopic, scriptId, rawResponse, headers) => {
35 35
36 36 if (!topics.includes(responseTopic)) {
37   - let createResponseTopicResult = await createTopic(responseTopic);
  37 + let createResponseTopicResult = await createTopic(responseTopic, 1);
38 38 topics.push(responseTopic);
39 39 if (createResponseTopicResult) {
40 40 logger.info('Created new topic: %s', requestTopic);
... ... @@ -88,7 +88,18 @@ function KafkaProducer() {
88 88 kafkaAdmin = kafkaClient.admin();
89 89 await kafkaAdmin.connect();
90 90
91   - let createRequestTopicResult = await createTopic(requestTopic);
  91 + let partitions = 1;
  92 +
  93 + for (let i = 0; i < configEntries.length; i++) {
  94 + let param = configEntries[i];
  95 + if (param.name === 'partitions') {
  96 + partitions = param.value;
  97 + configEntries.splice(i, 1);
  98 + break;
  99 + }
  100 + }
  101 +
  102 + let createRequestTopicResult = await createTopic(requestTopic, partitions);
92 103
93 104 if (createRequestTopicResult) {
94 105 logger.info('Created new topic: %s', requestTopic);
... ... @@ -121,10 +132,11 @@ function KafkaProducer() {
121 132 }
122 133 })();
123 134
124   -function createTopic(topic) {
  135 +function createTopic(topic, partitions) {
125 136 return kafkaAdmin.createTopics({
126 137 topics: [{
127 138 topic: topic,
  139 + numPartitions: partitions,
128 140 replicationFactor: replicationFactor,
129 141 configEntries: configEntries
130 142 }]
... ...
... ... @@ -77,11 +77,11 @@ queue:
77 77 security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
78 78 other:
79 79 topic-properties:
80   - rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
81   - core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
82   - transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
83   - notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
84   - js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
  80 + rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
  81 + core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
  82 + transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
  83 + notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
  84 + js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}"
85 85 aws_sqs:
86 86 use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
87 87 access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
... ...
... ... @@ -70,11 +70,11 @@ queue:
70 70 security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
71 71 other:
72 72 topic-properties:
73   - rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
74   - core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
75   - transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
76   - notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
77   - js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
  73 + rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
  74 + core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
  75 + transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
  76 + notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
  77 + js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}"
78 78 aws_sqs:
79 79 use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
80 80 access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
... ...
... ... @@ -98,11 +98,11 @@ queue:
98 98 security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
99 99 other:
100 100 topic-properties:
101   - rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
102   - core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
103   - transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
104   - notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
105   - js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
  101 + rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
  102 + core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
  103 + transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
  104 + notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
  105 + js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}"
106 106 aws_sqs:
107 107 use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
108 108 access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
... ...