Commit b6ca497098b6e0c4f72a15f1e16fac8b5919475d

Authored by Andrew Shvayka
1 parent d342e453

Rate Limiter for Kafka Consumer

  1 +--
  2 +-- Copyright © 2016-2018 The Thingsboard Authors
  3 +--
  4 +-- Licensed under the Apache License, Version 2.0 (the "License");
  5 +-- you may not use this file except in compliance with the License.
  6 +-- You may obtain a copy of the License at
  7 +--
  8 +-- http://www.apache.org/licenses/LICENSE-2.0
  9 +--
  10 +-- Unless required by applicable law or agreed to in writing, software
  11 +-- distributed under the License is distributed on an "AS IS" BASIS,
  12 +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 +-- See the License for the specific language governing permissions and
  14 +-- limitations under the License.
  15 +--
  16 +
  17 +ALTER TABLE component_descriptor ADD UNIQUE (clazz);
... ...
... ... @@ -101,6 +101,10 @@ public class ThingsboardInstallService {
101 101 log.info("Upgrading ThingsBoard from version 2.1.1 to 2.1.2 ...");
102 102
103 103 databaseUpgradeService.upgradeDatabase("2.1.1");
  104 + case "2.1.3":
  105 + log.info("Upgrading ThingsBoard from version 2.1.3 to 2.2.0 ...");
  106 +
  107 + databaseUpgradeService.upgradeDatabase("2.1.3");
104 108
105 109 log.info("Updating system data...");
106 110
... ...
... ... @@ -97,6 +97,12 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
97 97 log.trace("Can't initialize component {}, due to {}", def.getBeanClassName(), e.getMessage(), e);
98 98 cause = e;
99 99 retryCount++;
  100 + try {
  101 + Thread.sleep(1000);
  102 + } catch (InterruptedException e1) {
  103 + Thread.currentThread().interrupt();
  104 + throw new RuntimeException(e1);
  105 + }
100 106 }
101 107 }
102 108 if (cause != null && retryCount == MAX_OPTIMISITC_RETRIES) {
... ...
... ... @@ -251,7 +251,8 @@ public class CassandraDatabaseUpgradeService implements DatabaseUpgradeService {
251 251 log.info("Entity views restored.");
252 252
253 253 break;
254   -
  254 + case "2.1.3":
  255 + break;
255 256 default:
256 257 throw new RuntimeException("Unable to upgrade Cassandra database, unsupported fromVersion: " + fromVersion);
257 258 }
... ...
... ... @@ -149,7 +149,14 @@ public class SqlDatabaseUpgradeService implements DatabaseUpgradeService {
149 149 log.info("Entity views restored.");
150 150 }
151 151 break;
152   -
  152 + case "2.1.3":
  153 + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
  154 + log.info("Updating schema ...");
  155 + schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.2.0", SCHEMA_UPDATE_SQL);
  156 + loadSql(schemaUpdateFile, conn);
  157 + log.info("Schema updated.");
  158 + }
  159 + break;
153 160 default:
154 161 throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion);
155 162 }
... ...
... ... @@ -75,7 +75,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
75 75 private int autoCommitInterval;
76 76
77 77 @Value("${transport.remote.rule_engine.poll_records_pack_size}")
78   - private long pollRecordsPackSize;
  78 + private int pollRecordsPackSize;
79 79 @Value("${transport.remote.rule_engine.max_poll_records_per_second}")
80 80 private long pollRecordsPerSecond;
81 81 @Value("${transport.remote.rule_engine.max_poll_records_per_minute}")
... ...
... ... @@ -395,7 +395,7 @@ transport:
395 395 auto_commit_interval: "${TB_RULE_ENGINE_AUTO_COMMIT_INTERVAL_MS:100}"
396 396 poll_records_pack_size: "${TB_RULE_ENGINE_MAX_POLL_RECORDS:1000}"
397 397 max_poll_records_per_second: "${TB_RULE_ENGINE_MAX_POLL_RECORDS_PER_SECOND:10000}"
398   - max_poll_records_per_minute: "${TB_RULE_ENGINE_MAX_POLL_RECORDS_PER_SECOND:120000}"
  398 + max_poll_records_per_minute: "${TB_RULE_ENGINE_MAX_POLL_RECORDS_PER_MINUTE:120000}"
399 399 notifications:
400 400 topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
401 401 sessions:
... ...
... ... @@ -47,7 +47,7 @@ public class TBKafkaConsumerTemplate<T> {
47 47 TbKafkaRequestIdExtractor<T> requestIdExtractor,
48 48 String clientId, String groupId, String topic,
49 49 boolean autoCommit, int autoCommitIntervalMs,
50   - long maxPollRecords) {
  50 + int maxPollRecords) {
51 51 Properties props = settings.toProps();
52 52 props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
53 53 if (groupId != null) {
... ... @@ -57,7 +57,9 @@ public class TBKafkaConsumerTemplate<T> {
57 57 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
58 58 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
59 59 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
60   - props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
  60 + if (maxPollRecords > 0) {
  61 + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
  62 + }
61 63 this.consumer = new KafkaConsumer<>(props);
62 64 this.decoder = decoder;
63 65 this.requestIdExtractor = requestIdExtractor;
... ...
... ... @@ -34,6 +34,7 @@ import javax.persistence.Entity;
34 34 import javax.persistence.EnumType;
35 35 import javax.persistence.Enumerated;
36 36 import javax.persistence.Table;
  37 +import javax.persistence.UniqueConstraint;
37 38
38 39 @Data
39 40 @EqualsAndHashCode(callSuper = true)
... ... @@ -53,7 +54,7 @@ public class ComponentDescriptorEntity extends BaseSqlEntity<ComponentDescriptor
53 54 @Column(name = ModelConstants.COMPONENT_DESCRIPTOR_NAME_PROPERTY)
54 55 private String name;
55 56
56   - @Column(name = ModelConstants.COMPONENT_DESCRIPTOR_CLASS_PROPERTY)
  57 + @Column(name = ModelConstants.COMPONENT_DESCRIPTOR_CLASS_PROPERTY, unique = true)
57 58 private String clazz;
58 59
59 60 @Type(type = "json")
... ...
... ... @@ -78,7 +78,7 @@ CREATE TABLE IF NOT EXISTS attribute_kv (
78 78 CREATE TABLE IF NOT EXISTS component_descriptor (
79 79 id varchar(31) NOT NULL CONSTRAINT component_descriptor_pkey PRIMARY KEY,
80 80 actions varchar(255),
81   - clazz varchar,
  81 + clazz varchar UNIQUE,
82 82 configuration_descriptor varchar,
83 83 name varchar(255),
84 84 scope varchar(255),
... ...