Commit c6cf5c43adc7c0d94374ad6ef7ba6cc7cc8b0fb0

Authored by Andrii Shvaika
2 parents d866fc15 3d54384b

Merge remote-tracking branch 'origin/master' into develop/3.0

@@ -46,7 +46,7 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe @@ -46,7 +46,7 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
46 46
47 private static final String CREATE_PARTITION_TS_KV_TABLE = "create_partition_ts_kv_table()"; 47 private static final String CREATE_PARTITION_TS_KV_TABLE = "create_partition_ts_kv_table()";
48 private static final String CREATE_NEW_TS_KV_LATEST_TABLE = "create_new_ts_kv_latest_table()"; 48 private static final String CREATE_NEW_TS_KV_LATEST_TABLE = "create_new_ts_kv_latest_table()";
49 - private static final String CREATE_PARTITIONS = "create_partitions()"; 49 + private static final String CREATE_PARTITIONS = "create_partitions(IN partition_type varchar)";
50 private static final String CREATE_TS_KV_DICTIONARY_TABLE = "create_ts_kv_dictionary_table()"; 50 private static final String CREATE_TS_KV_DICTIONARY_TABLE = "create_ts_kv_dictionary_table()";
51 private static final String INSERT_INTO_DICTIONARY = "insert_into_dictionary()"; 51 private static final String INSERT_INTO_DICTIONARY = "insert_into_dictionary()";
52 private static final String INSERT_INTO_TS_KV = "insert_into_ts_kv()"; 52 private static final String INSERT_INTO_TS_KV = "insert_into_ts_kv()";
@@ -108,7 +108,6 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe @@ -108,7 +108,6 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
108 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV); 108 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV);
109 executeQuery(conn, DROP_PROCEDURE_CREATE_NEW_TS_KV_LATEST_TABLE); 109 executeQuery(conn, DROP_PROCEDURE_CREATE_NEW_TS_KV_LATEST_TABLE);
110 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST); 110 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST);
111 - executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST);  
112 executeQuery(conn, DROP_FUNCTION_GET_PARTITION_DATA); 111 executeQuery(conn, DROP_FUNCTION_GET_PARTITION_DATA);
113 112
114 executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN IF NOT EXISTS json_v json;"); 113 executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN IF NOT EXISTS json_v json;");
@@ -81,14 +81,24 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon @@ -81,14 +81,24 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
81 81
82 @Override 82 @Override
83 public void subscribe() { 83 public void subscribe() {
84 - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));  
85 - subscribed = false; 84 + consumerLock.lock();
  85 + try {
  86 + partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
  87 + subscribed = false;
  88 + } finally {
  89 + consumerLock.unlock();
  90 + }
86 } 91 }
87 92
88 @Override 93 @Override
89 public void subscribe(Set<TopicPartitionInfo> partitions) { 94 public void subscribe(Set<TopicPartitionInfo> partitions) {
90 - this.partitions = partitions;  
91 - subscribed = false; 95 + consumerLock.lock();
  96 + try {
  97 + this.partitions = partitions;
  98 + subscribed = false;
  99 + } finally {
  100 + consumerLock.unlock();
  101 + }
92 } 102 }
93 103
94 @Override 104 @Override
@@ -100,13 +110,11 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon @@ -100,13 +110,11 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
100 log.debug("Failed to await subscription", e); 110 log.debug("Failed to await subscription", e);
101 } 111 }
102 } else { 112 } else {
  113 + consumerLock.lock();
103 try { 114 try {
104 - consumerLock.lock();  
105 -  
106 if (!subscribed) { 115 if (!subscribed) {
107 List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); 116 List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
108 topicNames.forEach(admin::createTopicIfNotExists); 117 topicNames.forEach(admin::createTopicIfNotExists);
109 - consumer.unsubscribe();  
110 consumer.subscribe(topicNames); 118 consumer.subscribe(topicNames);
111 subscribed = true; 119 subscribed = true;
112 } 120 }
@@ -132,8 +140,8 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon @@ -132,8 +140,8 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
132 140
133 @Override 141 @Override
134 public void commit() { 142 public void commit() {
  143 + consumerLock.lock();
135 try { 144 try {
136 - consumerLock.lock();  
137 consumer.commitAsync(); 145 consumer.commitAsync();
138 } finally { 146 } finally {
139 consumerLock.unlock(); 147 consumerLock.unlock();
@@ -142,8 +150,8 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon @@ -142,8 +150,8 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
142 150
143 @Override 151 @Override
144 public void unsubscribe() { 152 public void unsubscribe() {
  153 + consumerLock.lock();
145 try { 154 try {
146 - consumerLock.lock();  
147 if (consumer != null) { 155 if (consumer != null) {
148 consumer.unsubscribe(); 156 consumer.unsubscribe();
149 consumer.close(); 157 consumer.close();
@@ -28,7 +28,7 @@ services: @@ -28,7 +28,7 @@ services:
28 ZOO_SERVERS: server.1=zookeeper:2888:3888;zookeeper:2181 28 ZOO_SERVERS: server.1=zookeeper:2888:3888;zookeeper:2181
29 kafka: 29 kafka:
30 restart: always 30 restart: always
31 - image: "wurstmeister/kafka:2.12-2.2.1" 31 + image: "wurstmeister/kafka:2.12-2.3.0"
32 ports: 32 ports:
33 - "9092:9092" 33 - "9092:9092"
34 env_file: 34 env_file: