Commit 2553cf6b6fcfb1ccfa8f8d0a5cfbfe1466990f8c

Authored by Yevhen Bondarenko
Committed by GitHub
1 parent c4269023

created Aws Sqs Queue (#2534)

* created Aws Sqs Queue

* improvement AwsSqs providers

* revert package-lock.json

* Aws sqs improvements

* Aws sqs improvements

* Aws sqs improvements

* Aws sqs improvements

* aws improvements

* aws improvements

* aws improvements

* added visibility timeout to aws queue
Showing 25 changed files with 1074 additions and 10 deletions
@@ -88,7 +88,6 @@ public class RpcController extends BaseController { @@ -88,7 +88,6 @@ public class RpcController extends BaseController {
88 return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody); 88 return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody);
89 } 89 }
90 90
91 -  
92 private DeferredResult<ResponseEntity> handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody) throws ThingsboardException { 91 private DeferredResult<ResponseEntity> handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody) throws ThingsboardException {
93 try { 92 try {
94 JsonNode rpcRequestBody = jsonMapper.readTree(requestBody); 93 JsonNode rpcRequestBody = jsonMapper.readTree(requestBody);
@@ -136,6 +136,15 @@ public abstract class AbstractConsumerService<T extends com.google.protobuf.Gene @@ -136,6 +136,15 @@ public abstract class AbstractConsumerService<T extends com.google.protobuf.Gene
136 @PreDestroy 136 @PreDestroy
137 public void destroy() { 137 public void destroy() {
138 stopped = true; 138 stopped = true;
  139 +
  140 + if (mainConsumer != null) {
  141 + mainConsumer.unsubscribe();
  142 + }
  143 +
  144 + if (nfConsumer != null) {
  145 + nfConsumer.unsubscribe();
  146 + }
  147 +
139 if (mainConsumerExecutor != null) { 148 if (mainConsumerExecutor != null) {
140 mainConsumerExecutor.shutdownNow(); 149 mainConsumerExecutor.shutdownNow();
141 } 150 }
@@ -517,7 +517,7 @@ swagger: @@ -517,7 +517,7 @@ swagger:
517 version: "${SWAGGER_VERSION:2.0}" 517 version: "${SWAGGER_VERSION:2.0}"
518 518
519 queue: 519 queue:
520 - type: "${TB_QUEUE_TYPE:in-memory}" # kafka or in-memory 520 + type: "${TB_QUEUE_TYPE:in-memory}" # kafka or in-memory or aws-sqs
521 kafka: 521 kafka:
522 bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" 522 bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
523 acks: "${TB_KAFKA_ACKS:all}" 523 acks: "${TB_KAFKA_ACKS:all}"
@@ -525,6 +525,12 @@ queue: @@ -525,6 +525,12 @@ queue:
525 batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" 525 batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
526 linger.ms: "${TB_KAFKA_LINGER_MS:1}" 526 linger.ms: "${TB_KAFKA_LINGER_MS:1}"
527 buffer.memory: "${TB_BUFFER_MEMORY:33554432}" 527 buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
  528 + aws_sqs:
  529 + access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
  530 + secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}"
  531 + region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
  532 + threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}"
  533 + visibility_timeout: "${TB_QUEUE_AWS_SQS_VISIBILITY_TIMEOUT:30}" #in seconds
528 partitions: 534 partitions:
529 hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" 535 hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
530 virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" 536 virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
@@ -53,6 +53,11 @@ @@ -53,6 +53,11 @@
53 <artifactId>kafka-clients</artifactId> 53 <artifactId>kafka-clients</artifactId>
54 </dependency> 54 </dependency>
55 <dependency> 55 <dependency>
  56 + <groupId>com.amazonaws</groupId>
  57 + <artifactId>aws-java-sdk-sqs</artifactId>
  58 + </dependency>
  59 +
  60 + <dependency>
56 <groupId>org.springframework</groupId> 61 <groupId>org.springframework</groupId>
57 <artifactId>spring-context-support</artifactId> 62 <artifactId>spring-context-support</artifactId>
58 </dependency> 63 </dependency>
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue;
  17 +
  18 +import com.google.protobuf.InvalidProtocolBufferException;
  19 +import org.thingsboard.server.queue.TbQueueMsg;
  20 +
  21 +public interface TbQueueMsgDecoder<T extends TbQueueMsg> {
  22 +
  23 + T decode(TbQueueMsg msg) throws InvalidProtocolBufferException;
  24 +}
@@ -25,4 +25,5 @@ public interface TbQueueProducer<T extends TbQueueMsg> { @@ -25,4 +25,5 @@ public interface TbQueueProducer<T extends TbQueueMsg> {
25 25
26 void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback); 26 void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback);
27 27
  28 + void stop();
28 } 29 }
@@ -92,6 +92,8 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response @@ -92,6 +92,8 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
92 List<Response> responses = responseTemplate.poll(pollInterval); 92 List<Response> responses = responseTemplate.poll(pollInterval);
93 if (responses.size() > 0) { 93 if (responses.size() > 0) {
94 log.trace("Polling responses completed, consumer records count [{}]", responses.size()); 94 log.trace("Polling responses completed, consumer records count [{}]", responses.size());
  95 + } else {
  96 + continue;
95 } 97 }
96 responses.forEach(response -> { 98 responses.forEach(response -> {
97 log.trace("Received response to Kafka Template request: {}", response); 99 log.trace("Received response to Kafka Template request: {}", response);
@@ -145,6 +147,15 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response @@ -145,6 +147,15 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
145 @Override 147 @Override
146 public void stop() { 148 public void stop() {
147 stopped = true; 149 stopped = true;
  150 +
  151 + if (responseTemplate != null) {
  152 + responseTemplate.unsubscribe();
  153 + }
  154 +
  155 + if (requestTemplate != null) {
  156 + requestTemplate.stop();
  157 + }
  158 +
148 if (internalExecutor) { 159 if (internalExecutor) {
149 executor.shutdownNow(); 160 executor.shutdownNow();
150 } 161 }
@@ -18,12 +18,12 @@ package org.thingsboard.server.queue.common; @@ -18,12 +18,12 @@ package org.thingsboard.server.queue.common;
18 import lombok.Builder; 18 import lombok.Builder;
19 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
20 import org.apache.kafka.common.errors.InterruptException; 20 import org.apache.kafka.common.errors.InterruptException;
  21 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
21 import org.thingsboard.server.queue.TbQueueConsumer; 22 import org.thingsboard.server.queue.TbQueueConsumer;
22 import org.thingsboard.server.queue.TbQueueHandler; 23 import org.thingsboard.server.queue.TbQueueHandler;
23 import org.thingsboard.server.queue.TbQueueMsg; 24 import org.thingsboard.server.queue.TbQueueMsg;
24 import org.thingsboard.server.queue.TbQueueProducer; 25 import org.thingsboard.server.queue.TbQueueProducer;
25 import org.thingsboard.server.queue.TbQueueResponseTemplate; 26 import org.thingsboard.server.queue.TbQueueResponseTemplate;
26 -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;  
27 27
28 import java.util.List; 28 import java.util.List;
29 import java.util.UUID; 29 import java.util.UUID;
@@ -87,6 +87,10 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response @@ -87,6 +87,10 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
87 } 87 }
88 List<Request> requests = requestTemplate.poll(pollInterval); 88 List<Request> requests = requestTemplate.poll(pollInterval);
89 89
  90 + if (requests.isEmpty()) {
  91 + continue;
  92 + }
  93 +
90 requests.forEach(request -> { 94 requests.forEach(request -> {
91 long currentTime = System.currentTimeMillis(); 95 long currentTime = System.currentTimeMillis();
92 long requestTime = bytesToLong(request.getHeaders().get(REQUEST_TIME)); 96 long requestTime = bytesToLong(request.getHeaders().get(REQUEST_TIME));
@@ -147,6 +151,12 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response @@ -147,6 +151,12 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
147 151
148 public void stop() { 152 public void stop() {
149 stopped = true; 153 stopped = true;
  154 + if (requestTemplate != null) {
  155 + requestTemplate.unsubscribe();
  156 + }
  157 + if (responseTemplate != null) {
  158 + responseTemplate.stop();
  159 + }
150 if (timeoutExecutor != null) { 160 if (timeoutExecutor != null) {
151 timeoutExecutor.shutdownNow(); 161 timeoutExecutor.shutdownNow();
152 } 162 }
@@ -22,9 +22,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -22,9 +22,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
22 import org.apache.kafka.clients.consumer.ConsumerRecord; 22 import org.apache.kafka.clients.consumer.ConsumerRecord;
23 import org.apache.kafka.clients.consumer.ConsumerRecords; 23 import org.apache.kafka.clients.consumer.ConsumerRecords;
24 import org.apache.kafka.clients.consumer.KafkaConsumer; 24 import org.apache.kafka.clients.consumer.KafkaConsumer;
  25 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
25 import org.thingsboard.server.queue.TbQueueConsumer; 26 import org.thingsboard.server.queue.TbQueueConsumer;
26 import org.thingsboard.server.queue.TbQueueMsg; 27 import org.thingsboard.server.queue.TbQueueMsg;
27 -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;  
28 28
29 import java.io.IOException; 29 import java.io.IOException;
30 import java.time.Duration; 30 import java.time.Duration;
@@ -96,7 +96,7 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon @@ -96,7 +96,7 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
96 } else { 96 } else {
97 if (!subscribed) { 97 if (!subscribed) {
98 List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); 98 List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
99 - topicNames.forEach(this::createTopicIfNotExists); 99 + topicNames.forEach(admin::createTopicIfNotExists);
100 consumer.subscribe(topicNames); 100 consumer.subscribe(topicNames);
101 subscribed = true; 101 subscribed = true;
102 } 102 }
@@ -130,7 +130,4 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon @@ -130,7 +130,4 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
130 return decoder.decode(new KafkaTbQueueMsg(record)); 130 return decoder.decode(new KafkaTbQueueMsg(record));
131 } 131 }
132 132
133 - private void createTopicIfNotExists(String topic) {  
134 - admin.createTopicIfNotExists(topic);  
135 - }  
136 } 133 }
@@ -84,4 +84,9 @@ public class TBKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro @@ -84,4 +84,9 @@ public class TBKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
84 } 84 }
85 }); 85 });
86 } 86 }
  87 +
  88 + @Override
  89 + public void stop() {
  90 +
  91 + }
87 } 92 }
@@ -50,4 +50,9 @@ public class InMemoryTbQueueProducer<T extends TbQueueMsg> implements TbQueuePro @@ -50,4 +50,9 @@ public class InMemoryTbQueueProducer<T extends TbQueueMsg> implements TbQueuePro
50 } 50 }
51 } 51 }
52 } 52 }
  53 +
  54 + @Override
  55 + public void stop() {
  56 +
  57 + }
53 } 58 }
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.provider;
  17 +
  18 +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.msg.queue.ServiceType;
  21 +import org.thingsboard.server.gen.transport.TransportProtos;
  22 +import org.thingsboard.server.queue.TbQueueAdmin;
  23 +import org.thingsboard.server.queue.TbQueueConsumer;
  24 +import org.thingsboard.server.queue.TbQueueCoreSettings;
  25 +import org.thingsboard.server.queue.TbQueueProducer;
  26 +import org.thingsboard.server.queue.TbQueueRuleEngineSettings;
  27 +import org.thingsboard.server.queue.TbQueueTransportApiSettings;
  28 +import org.thingsboard.server.queue.TbQueueTransportNotificationSettings;
  29 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  30 +import org.thingsboard.server.queue.discovery.PartitionService;
  31 +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
  32 +import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
  33 +import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
  34 +import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate;
  35 +import org.thingsboard.server.queue.sqs.TbAwsSqsSettings;
  36 +
  37 +@Component
  38 +@ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='monolith'")
  39 +public class AwsSqsMonolithQueueProvider implements TbCoreQueueProvider, TbRuleEngineQueueProvider {
  40 +
  41 + private final PartitionService partitionService;
  42 + private final TbQueueCoreSettings coreSettings;
  43 + private final TbServiceInfoProvider serviceInfoProvider;
  44 + private final TbQueueRuleEngineSettings ruleEngineSettings;
  45 + private final TbQueueTransportApiSettings transportApiSettings;
  46 + private final TbQueueTransportNotificationSettings transportNotificationSettings;
  47 + private final TbAwsSqsSettings sqsSettings;
  48 + private final TbQueueAdmin admin;
  49 +
  50 + public AwsSqsMonolithQueueProvider(PartitionService partitionService, TbQueueCoreSettings coreSettings,
  51 + TbQueueRuleEngineSettings ruleEngineSettings,
  52 + TbServiceInfoProvider serviceInfoProvider,
  53 + TbQueueTransportApiSettings transportApiSettings,
  54 + TbQueueTransportNotificationSettings transportNotificationSettings,
  55 + TbAwsSqsSettings sqsSettings) {
  56 + this.partitionService = partitionService;
  57 + this.coreSettings = coreSettings;
  58 + this.serviceInfoProvider = serviceInfoProvider;
  59 + this.ruleEngineSettings = ruleEngineSettings;
  60 + this.transportApiSettings = transportApiSettings;
  61 + this.transportNotificationSettings = transportNotificationSettings;
  62 + this.sqsSettings = sqsSettings;
  63 + admin = new TbAwsSqsAdmin(sqsSettings);
  64 + }
  65 +
  66 + @Override
  67 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> getTransportNotificationsMsgProducer() {
  68 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportNotificationSettings.getNotificationsTopic());
  69 + }
  70 +
  71 + @Override
  72 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> getRuleEngineMsgProducer() {
  73 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic());
  74 + }
  75 +
  76 + @Override
  77 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> getRuleEngineNotificationsMsgProducer() {
  78 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic());
  79 + }
  80 +
  81 + @Override
  82 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> getTbCoreMsgProducer() {
  83 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic());
  84 + }
  85 +
  86 + @Override
  87 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> getTbCoreNotificationsMsgProducer() {
  88 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic());
  89 + }
  90 +
  91 + @Override
  92 + public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> getToRuleEngineMsgConsumer() {
  93 + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic(),
  94 + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
  95 + }
  96 +
  97 + @Override
  98 + public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> getToRuleEngineNotificationsMsgConsumer() {
  99 + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings,
  100 + partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
  101 + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
  102 + }
  103 +
  104 + @Override
  105 + public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> getToCoreMsgConsumer() {
  106 + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, coreSettings.getTopic(),
  107 + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
  108 + }
  109 +
  110 + @Override
  111 + public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> getToCoreNotificationsMsgConsumer() {
  112 + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings,
  113 + partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(),
  114 + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
  115 + }
  116 +
  117 + @Override
  118 + public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> getTransportApiRequestConsumer() {
  119 + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic(),
  120 + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
  121 + }
  122 +
  123 + @Override
  124 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> getTransportApiResponseProducer() {
  125 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getResponsesTopic());
  126 + }
  127 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.provider;
  17 +
  18 +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.msg.queue.ServiceType;
  21 +import org.thingsboard.server.gen.transport.TransportProtos;
  22 +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
  23 +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
  24 +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
  25 +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
  26 +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
  27 +import org.thingsboard.server.queue.TbQueueAdmin;
  28 +import org.thingsboard.server.queue.TbQueueConsumer;
  29 +import org.thingsboard.server.queue.TbQueueCoreSettings;
  30 +import org.thingsboard.server.queue.TbQueueProducer;
  31 +import org.thingsboard.server.queue.TbQueueRuleEngineSettings;
  32 +import org.thingsboard.server.queue.TbQueueTransportApiSettings;
  33 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  34 +import org.thingsboard.server.queue.discovery.PartitionService;
  35 +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
  36 +import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
  37 +import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
  38 +import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate;
  39 +import org.thingsboard.server.queue.sqs.TbAwsSqsSettings;
  40 +
  41 +@Component
  42 +@ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='tb-core'")
  43 +public class AwsSqsTbCoreQueueProvider implements TbCoreQueueProvider {
  44 +
  45 + private final TbAwsSqsSettings sqsSettings;
  46 + private final TbQueueRuleEngineSettings ruleEngineSettings;
  47 + private final TbQueueCoreSettings coreSettings;
  48 + private final TbQueueTransportApiSettings transportApiSettings;
  49 + private final PartitionService partitionService;
  50 + private final TbServiceInfoProvider serviceInfoProvider;
  51 +
  52 +
  53 + private final TbQueueAdmin admin;
  54 +
  55 + public AwsSqsTbCoreQueueProvider(TbAwsSqsSettings sqsSettings,
  56 + TbQueueCoreSettings coreSettings,
  57 + TbQueueTransportApiSettings transportApiSettings,
  58 + TbQueueRuleEngineSettings ruleEngineSettings,
  59 + PartitionService partitionService,
  60 + TbServiceInfoProvider serviceInfoProvider) {
  61 + this.sqsSettings = sqsSettings;
  62 + this.coreSettings = coreSettings;
  63 + this.transportApiSettings = transportApiSettings;
  64 + this.ruleEngineSettings = ruleEngineSettings;
  65 + this.partitionService = partitionService;
  66 + this.serviceInfoProvider = serviceInfoProvider;
  67 + this.admin = new TbAwsSqsAdmin(sqsSettings);
  68 + }
  69 +
  70 + @Override
  71 + public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsMsgProducer() {
  72 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic());
  73 + }
  74 +
  75 + @Override
  76 + public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> getRuleEngineMsgProducer() {
  77 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic());
  78 + }
  79 +
  80 + @Override
  81 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> getRuleEngineNotificationsMsgProducer() {
  82 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic());
  83 + }
  84 +
  85 + @Override
  86 + public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> getTbCoreMsgProducer() {
  87 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic());
  88 + }
  89 +
  90 + @Override
  91 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> getTbCoreNotificationsMsgProducer() {
  92 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic());
  93 + }
  94 +
  95 + @Override
  96 + public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> getToCoreMsgConsumer() {
  97 + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, coreSettings.getTopic(),
  98 + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
  99 + }
  100 +
  101 + @Override
  102 + public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> getToCoreNotificationsMsgConsumer() {
  103 + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings,
  104 + partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(),
  105 + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
  106 + }
  107 +
  108 + @Override
  109 + public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> getTransportApiRequestConsumer() {
  110 + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
  111 + }
  112 +
  113 + @Override
  114 + public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> getTransportApiResponseProducer() {
  115 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic());
  116 + }
  117 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.provider;
  17 +
  18 +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.msg.queue.ServiceType;
  21 +import org.thingsboard.server.gen.transport.TransportProtos;
  22 +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
  23 +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
  24 +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
  25 +import org.thingsboard.server.queue.TbQueueAdmin;
  26 +import org.thingsboard.server.queue.TbQueueConsumer;
  27 +import org.thingsboard.server.queue.TbQueueCoreSettings;
  28 +import org.thingsboard.server.queue.TbQueueProducer;
  29 +import org.thingsboard.server.queue.TbQueueRuleEngineSettings;
  30 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  31 +import org.thingsboard.server.queue.discovery.PartitionService;
  32 +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
  33 +import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
  34 +import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
  35 +import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate;
  36 +import org.thingsboard.server.queue.sqs.TbAwsSqsSettings;
  37 +
  38 +@Component
  39 +@ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='tb-rule-engine'")
  40 +public class AwsSqsTbRuleEngineQueueProvider implements TbRuleEngineQueueProvider {
  41 +
  42 + private final PartitionService partitionService;
  43 + private final TbQueueCoreSettings coreSettings;
  44 + private final TbServiceInfoProvider serviceInfoProvider;
  45 + private final TbQueueRuleEngineSettings ruleEngineSettings;
  46 + private final TbAwsSqsSettings sqsSettings;
  47 + private final TbQueueAdmin admin;
  48 +
  49 + public AwsSqsTbRuleEngineQueueProvider(PartitionService partitionService, TbQueueCoreSettings coreSettings,
  50 + TbQueueRuleEngineSettings ruleEngineSettings,
  51 + TbServiceInfoProvider serviceInfoProvider,
  52 + TbAwsSqsSettings sqsSettings) {
  53 + this.partitionService = partitionService;
  54 + this.coreSettings = coreSettings;
  55 + this.serviceInfoProvider = serviceInfoProvider;
  56 + this.ruleEngineSettings = ruleEngineSettings;
  57 + this.sqsSettings = sqsSettings;
  58 + admin = new TbAwsSqsAdmin(sqsSettings);
  59 + }
  60 +
  61 + @Override
  62 + public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsMsgProducer() {
  63 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic());
  64 + }
  65 +
  66 + @Override
  67 + public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> getRuleEngineMsgProducer() {
  68 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic());
  69 + }
  70 +
  71 + @Override
  72 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> getRuleEngineNotificationsMsgProducer() {
  73 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic());
  74 + }
  75 +
  76 + @Override
  77 + public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> getTbCoreMsgProducer() {
  78 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic());
  79 + }
  80 +
  81 + @Override
  82 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> getTbCoreNotificationsMsgProducer() {
  83 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic());
  84 + }
  85 +
  86 + @Override
  87 + public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> getToRuleEngineMsgConsumer() {
  88 + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic(), msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
  89 + }
  90 +
  91 + @Override
  92 + public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> getToRuleEngineNotificationsMsgConsumer() {
  93 + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings,
  94 + partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
  95 + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
  96 + }
  97 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.provider;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  20 +import org.springframework.stereotype.Component;
  21 +import org.thingsboard.server.gen.transport.TransportProtos;
  22 +import org.thingsboard.server.queue.TbQueueAdmin;
  23 +import org.thingsboard.server.queue.TbQueueConsumer;
  24 +import org.thingsboard.server.queue.TbQueueProducer;
  25 +import org.thingsboard.server.queue.TbQueueRequestTemplate;
  26 +import org.thingsboard.server.queue.TbQueueTransportApiSettings;
  27 +import org.thingsboard.server.queue.TbQueueTransportNotificationSettings;
  28 +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
  29 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  30 +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
  31 +import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
  32 +import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
  33 +import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate;
  34 +import org.thingsboard.server.queue.sqs.TbAwsSqsSettings;
  35 +
  36 +@Component
  37 +@ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')")
  38 +@Slf4j
  39 +public class AwsSqsTransportQueueProvider implements TbTransportQueueProvider {
  40 + private final TbQueueTransportApiSettings transportApiSettings;
  41 + private final TbQueueTransportNotificationSettings transportNotificationSettings;
  42 + private final TbAwsSqsSettings sqsSettings;
  43 + private final TbQueueAdmin admin;
  44 + private final TbServiceInfoProvider serviceInfoProvider;
  45 +
  46 + public AwsSqsTransportQueueProvider(TbQueueTransportApiSettings transportApiSettings,
  47 + TbQueueTransportNotificationSettings transportNotificationSettings,
  48 + TbAwsSqsSettings sqsSettings,
  49 + TbServiceInfoProvider serviceInfoProvider) {
  50 + this.transportApiSettings = transportApiSettings;
  51 + this.transportNotificationSettings = transportNotificationSettings;
  52 + this.sqsSettings = sqsSettings;
  53 + admin = new TbAwsSqsAdmin(sqsSettings);
  54 + this.serviceInfoProvider = serviceInfoProvider;
  55 + }
  56 +
  57 + @Override
  58 + public TbQueueRequestTemplate<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> getTransportApiRequestTemplate() {
  59 + TbAwsSqsProducerTemplate<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> producerTemplate =
  60 + new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic());
  61 +
  62 + TbAwsSqsConsumerTemplate<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> consumerTemplate =
  63 + new TbAwsSqsConsumerTemplate<>(admin, sqsSettings,
  64 + transportApiSettings.getResponsesTopic() + "_" + serviceInfoProvider.getServiceId(),
  65 + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
  66 +
  67 + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
  68 + <TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
  69 + templateBuilder.queueAdmin(admin);
  70 + templateBuilder.requestTemplate(producerTemplate);
  71 + templateBuilder.responseTemplate(consumerTemplate);
  72 + templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests());
  73 + templateBuilder.maxRequestTimeout(transportApiSettings.getMaxRequestsTimeout());
  74 + templateBuilder.pollInterval(transportApiSettings.getResponsePollInterval());
  75 + return templateBuilder.build();
  76 + }
  77 +
  78 + @Override
  79 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> getRuleEngineMsgProducer() {
  80 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic());
  81 + }
  82 +
  83 + @Override
  84 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> getTbCoreMsgProducer() {
  85 + return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic());
  86 + }
  87 +
  88 + @Override
  89 + public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> getTransportNotificationsConsumer() {
  90 + return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportNotificationSettings.getNotificationsTopic() + "_" + serviceInfoProvider.getServiceId(),
  91 + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
  92 + }
  93 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.sqs;
  17 +
  18 +import com.amazonaws.http.SdkHttpMetadata;
  19 +import lombok.AllArgsConstructor;
  20 +import lombok.Data;
  21 +import org.thingsboard.server.queue.TbQueueMsgMetadata;
  22 +
  23 +@Data
  24 +@AllArgsConstructor
  25 +public class AwsSqsTbQueueMsgMetadata implements TbQueueMsgMetadata {
  26 +
  27 + private final SdkHttpMetadata metadata;
  28 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.sqs;
  17 +
  18 +import com.amazonaws.auth.AWSCredentials;
  19 +import com.amazonaws.auth.AWSStaticCredentialsProvider;
  20 +import com.amazonaws.auth.BasicAWSCredentials;
  21 +import com.amazonaws.services.sqs.AmazonSQS;
  22 +import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
  23 +import com.amazonaws.services.sqs.model.CreateQueueRequest;
  24 +import com.amazonaws.services.sqs.model.QueueAttributeName;
  25 +import org.thingsboard.server.queue.TbQueueAdmin;
  26 +
  27 +import java.util.HashMap;
  28 +import java.util.Map;
  29 +
  30 +public class TbAwsSqsAdmin implements TbQueueAdmin {
  31 +
  32 + private final TbAwsSqsSettings sqsSettings;
  33 + private final Map<String, String> attributes = new HashMap<>();
  34 + private final AWSStaticCredentialsProvider credProvider;
  35 +
  36 + public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings) {
  37 + this.sqsSettings = sqsSettings;
  38 +
  39 + AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey());
  40 + this.credProvider = new AWSStaticCredentialsProvider(awsCredentials);
  41 +
  42 + attributes.put("FifoQueue", "true");
  43 + attributes.put("ContentBasedDeduplication", "true");
  44 + attributes.put(QueueAttributeName.VisibilityTimeout.toString(), sqsSettings.getVisibilityTimeout());
  45 + }
  46 +
  47 + @Override
  48 + public void createTopicIfNotExists(String topic) {
  49 + AmazonSQS sqsClient = AmazonSQSClientBuilder.standard()
  50 + .withCredentials(credProvider)
  51 + .withRegion(sqsSettings.getRegion())
  52 + .build();
  53 +
  54 + final CreateQueueRequest createQueueRequest =
  55 + new CreateQueueRequest(topic.replaceAll("\\.", "_") + ".fifo")
  56 + .withAttributes(attributes);
  57 + try {
  58 + sqsClient.createQueue(createQueueRequest);
  59 + } finally {
  60 + if (sqsClient != null) {
  61 + sqsClient.shutdown();
  62 + }
  63 + }
  64 + }
  65 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.sqs;
  17 +
  18 +import com.amazonaws.auth.AWSCredentials;
  19 +import com.amazonaws.auth.AWSStaticCredentialsProvider;
  20 +import com.amazonaws.auth.BasicAWSCredentials;
  21 +import com.amazonaws.services.sqs.AmazonSQS;
  22 +import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
  23 +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
  24 +import com.amazonaws.services.sqs.model.Message;
  25 +import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
  26 +import com.google.common.reflect.TypeToken;
  27 +import com.google.common.util.concurrent.Futures;
  28 +import com.google.common.util.concurrent.ListenableFuture;
  29 +import com.google.common.util.concurrent.ListeningExecutorService;
  30 +import com.google.common.util.concurrent.MoreExecutors;
  31 +import com.google.gson.Gson;
  32 +import com.google.protobuf.InvalidProtocolBufferException;
  33 +import lombok.Data;
  34 +import lombok.extern.slf4j.Slf4j;
  35 +import org.springframework.util.CollectionUtils;
  36 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
  37 +import org.thingsboard.server.queue.TbQueueAdmin;
  38 +import org.thingsboard.server.queue.TbQueueConsumer;
  39 +import org.thingsboard.server.queue.TbQueueMsg;
  40 +import org.thingsboard.server.queue.TbQueueMsgDecoder;
  41 +import org.thingsboard.server.queue.TbQueueMsgHeaders;
  42 +import org.thingsboard.server.queue.common.DefaultTbQueueMsgHeaders;
  43 +
  44 +import java.io.IOException;
  45 +import java.util.ArrayList;
  46 +import java.util.Collections;
  47 +import java.util.List;
  48 +import java.util.Map;
  49 +import java.util.Objects;
  50 +import java.util.Set;
  51 +import java.util.concurrent.CopyOnWriteArrayList;
  52 +import java.util.concurrent.ExecutionException;
  53 +import java.util.concurrent.Executors;
  54 +import java.util.concurrent.TimeUnit;
  55 +import java.util.stream.Collectors;
  56 +import java.util.stream.Stream;
  57 +
  58 +@Slf4j
  59 +public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> {
  60 +
  61 + private static final int MAX_NUM_MSGS = 10;
  62 +
  63 + private final Gson gson = new Gson();
  64 + private final TbQueueAdmin admin;
  65 + private final AmazonSQS sqsClient;
  66 + private final String topic;
  67 + private final TbQueueMsgDecoder<T> decoder;
  68 + private final TbAwsSqsSettings sqsSettings;
  69 +
  70 + private final List<AwsSqsMsgWrapper> pendingMessages = new CopyOnWriteArrayList<>();
  71 + private volatile Set<String> queueUrls;
  72 + private volatile Set<TopicPartitionInfo> partitions;
  73 + private ListeningExecutorService consumerExecutor;
  74 + private volatile boolean subscribed;
  75 + private volatile boolean stopped = false;
  76 +
  77 + public TbAwsSqsConsumerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String topic, TbQueueMsgDecoder<T> decoder) {
  78 + this.admin = admin;
  79 + this.decoder = decoder;
  80 + this.topic = topic;
  81 + this.sqsSettings = sqsSettings;
  82 +
  83 + AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey());
  84 + AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials);
  85 +
  86 + this.sqsClient = AmazonSQSClientBuilder.standard()
  87 + .withCredentials(credProvider)
  88 + .withRegion(sqsSettings.getRegion())
  89 + .build();
  90 + }
  91 +
  92 + @Override
  93 + public String getTopic() {
  94 + return topic;
  95 + }
  96 +
  97 + @Override
  98 + public void subscribe() {
  99 + partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
  100 + subscribed = false;
  101 + }
  102 +
  103 + @Override
  104 + public void subscribe(Set<TopicPartitionInfo> partitions) {
  105 + this.partitions = partitions;
  106 + subscribed = false;
  107 + }
  108 +
  109 + @Override
  110 + public void unsubscribe() {
  111 + stopped = true;
  112 +
  113 + if (sqsClient != null) {
  114 + sqsClient.shutdown();
  115 + }
  116 + if (consumerExecutor != null) {
  117 + consumerExecutor.shutdownNow();
  118 + }
  119 + }
  120 +
  121 + @Override
  122 + public List<T> poll(long durationInMillis) {
  123 + if (!subscribed && partitions == null) {
  124 + try {
  125 + Thread.sleep(durationInMillis);
  126 + } catch (InterruptedException e) {
  127 + log.debug("Failed to await subscription", e);
  128 + }
  129 + } else {
  130 + if (!subscribed) {
  131 + List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
  132 + queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet());
  133 + consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1));
  134 + subscribed = true;
  135 + }
  136 +
  137 + if (!pendingMessages.isEmpty()) {
  138 + log.warn("Present {} non committed messages.", pendingMessages.size());
  139 + return Collections.emptyList();
  140 + }
  141 +
  142 + List<ListenableFuture<List<Message>>> futureList = queueUrls
  143 + .stream()
  144 + .map(url -> poll(url, (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis)))
  145 + .collect(Collectors.toList());
  146 + ListenableFuture<List<List<Message>>> futureResult = Futures.allAsList(futureList);
  147 + try {
  148 + return futureResult.get().stream()
  149 + .flatMap(List::stream)
  150 + .map(msg -> {
  151 + try {
  152 + return decode(msg);
  153 + } catch (IOException e) {
  154 + log.error("Failed to decode message: [{}]", msg);
  155 + return null;
  156 + }
  157 + }).filter(Objects::nonNull)
  158 + .collect(Collectors.toList());
  159 + } catch (InterruptedException | ExecutionException e) {
  160 + if (stopped) {
  161 + log.info("[{}] Aws SQS consumer is stopped.", topic);
  162 + } else {
  163 + log.error("Failed to pool messages.", e);
  164 + }
  165 + }
  166 + }
  167 + return Collections.emptyList();
  168 + }
  169 +
  170 + private ListenableFuture<List<Message>> poll(String url, int waitTimeSeconds) {
  171 + List<ListenableFuture<List<Message>>> result = new ArrayList<>();
  172 +
  173 + for (int i = 0; i < sqsSettings.getThreadsPerTopic(); i++) {
  174 + result.add(consumerExecutor.submit(() -> {
  175 + ReceiveMessageRequest request = new ReceiveMessageRequest();
  176 + request
  177 + .withWaitTimeSeconds(waitTimeSeconds)
  178 + .withMessageAttributeNames("headers")
  179 + .withQueueUrl(url)
  180 + .withMaxNumberOfMessages(MAX_NUM_MSGS);
  181 + return sqsClient.receiveMessage(request).getMessages();
  182 + }));
  183 + }
  184 + return Futures.transform(Futures.allAsList(result), list -> {
  185 + if (!CollectionUtils.isEmpty(list)) {
  186 + return list.stream()
  187 + .flatMap(messageList -> {
  188 + if (!messageList.isEmpty()) {
  189 + this.pendingMessages.add(new AwsSqsMsgWrapper(url, messageList));
  190 + return messageList.stream();
  191 + }
  192 + return Stream.empty();
  193 + })
  194 + .collect(Collectors.toList());
  195 + }
  196 + return Collections.emptyList();
  197 + }, consumerExecutor);
  198 + }
  199 +
  200 + @Override
  201 + public void commit() {
  202 + pendingMessages.forEach(msg ->
  203 + consumerExecutor.submit(() -> {
  204 + List<DeleteMessageBatchRequestEntry> entries = msg.getMessages()
  205 + .stream()
  206 + .map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle()))
  207 + .collect(Collectors.toList());
  208 + sqsClient.deleteMessageBatch(msg.getUrl(), entries);
  209 + }));
  210 +
  211 + pendingMessages.clear();
  212 + }
  213 +
  214 + public T decode(Message message) throws InvalidProtocolBufferException {
  215 + TbAwsSqsMsg msg = gson.fromJson(message.getBody(), TbAwsSqsMsg.class);
  216 + TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders();
  217 + Map<String, byte[]> headerMap = gson.fromJson(message.getMessageAttributes().get("headers").getStringValue(), new TypeToken<Map<String, byte[]>>() {
  218 + }.getType());
  219 + headerMap.forEach(headers::put);
  220 + msg.setHeaders(headers);
  221 + return decoder.decode(msg);
  222 + }
  223 +
  224 + @Data
  225 + private static class AwsSqsMsgWrapper {
  226 + private final String url;
  227 + private final List<Message> messages;
  228 +
  229 + public AwsSqsMsgWrapper(String url, List<Message> messages) {
  230 + this.url = url;
  231 + this.messages = messages;
  232 + }
  233 + }
  234 +
  235 + private String getQueueUrl(String topic) {
  236 + admin.createTopicIfNotExists(topic);
  237 + return sqsClient.getQueueUrl(topic.replaceAll("\\.", "_") + ".fifo").getQueueUrl();
  238 + }
  239 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.sqs;
  17 +
  18 +import com.google.gson.annotations.Expose;
  19 +import lombok.Data;
  20 +import org.thingsboard.server.queue.TbQueueMsg;
  21 +import org.thingsboard.server.queue.TbQueueMsgHeaders;
  22 +
  23 +import java.util.UUID;
  24 +
  25 +@Data
  26 +public class TbAwsSqsMsg implements TbQueueMsg {
  27 + private final UUID key;
  28 + private final byte[] data;
  29 +
  30 + public TbAwsSqsMsg(UUID key, byte[] data) {
  31 + this.key = key;
  32 + this.data = data;
  33 + }
  34 +
  35 + @Expose(serialize = false, deserialize = false)
  36 + private TbQueueMsgHeaders headers;
  37 +
  38 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.sqs;
  17 +
  18 +import com.amazonaws.auth.AWSCredentials;
  19 +import com.amazonaws.auth.AWSStaticCredentialsProvider;
  20 +import com.amazonaws.auth.BasicAWSCredentials;
  21 +import com.amazonaws.services.sqs.AmazonSQS;
  22 +import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
  23 +import com.amazonaws.services.sqs.model.MessageAttributeValue;
  24 +import com.amazonaws.services.sqs.model.SendMessageRequest;
  25 +import com.amazonaws.services.sqs.model.SendMessageResult;
  26 +import com.google.common.util.concurrent.FutureCallback;
  27 +import com.google.common.util.concurrent.Futures;
  28 +import com.google.common.util.concurrent.ListenableFuture;
  29 +import com.google.common.util.concurrent.ListeningExecutorService;
  30 +import com.google.common.util.concurrent.MoreExecutors;
  31 +import com.google.gson.Gson;
  32 +import lombok.extern.slf4j.Slf4j;
  33 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
  34 +import org.thingsboard.server.queue.TbQueueAdmin;
  35 +import org.thingsboard.server.queue.TbQueueCallback;
  36 +import org.thingsboard.server.queue.TbQueueMsg;
  37 +import org.thingsboard.server.queue.TbQueueProducer;
  38 +
  39 +import java.util.HashMap;
  40 +import java.util.Map;
  41 +import java.util.concurrent.ConcurrentHashMap;
  42 +import java.util.concurrent.Executors;
  43 +
  44 +@Slf4j
  45 +public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> {
  46 + private final String defaultTopic;
  47 + private final AmazonSQS sqsClient;
  48 + private final Gson gson = new Gson();
  49 + private final Map<String, String> queueUrlMap = new ConcurrentHashMap<>();
  50 + private final TbQueueAdmin admin;
  51 + private ListeningExecutorService producerExecutor;
  52 +
  53 + public TbAwsSqsProducerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String defaultTopic) {
  54 + this.admin = admin;
  55 + this.defaultTopic = defaultTopic;
  56 +
  57 + AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey());
  58 + AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials);
  59 +
  60 + this.sqsClient = AmazonSQSClientBuilder.standard()
  61 + .withCredentials(credProvider)
  62 + .withRegion(sqsSettings.getRegion())
  63 + .build();
  64 +
  65 + producerExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
  66 + }
  67 +
  68 + @Override
  69 + public void init() {
  70 +
  71 + }
  72 +
  73 + @Override
  74 + public String getDefaultTopic() {
  75 + return defaultTopic;
  76 + }
  77 +
  78 + @Override
  79 + public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
  80 + SendMessageRequest sendMsgRequest = new SendMessageRequest();
  81 + sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName()));
  82 + sendMsgRequest.withMessageBody(gson.toJson(new TbAwsSqsMsg(msg.getKey(), msg.getData())));
  83 +
  84 + Map<String, MessageAttributeValue> attributes = new HashMap<>();
  85 +
  86 + attributes.put("headers", new MessageAttributeValue()
  87 + .withStringValue(gson.toJson(msg.getHeaders().getData()))
  88 + .withDataType("String"));
  89 +
  90 + sendMsgRequest.withMessageAttributes(attributes);
  91 + sendMsgRequest.withMessageGroupId(msg.getKey().toString());
  92 + ListenableFuture<SendMessageResult> future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest));
  93 +
  94 + Futures.addCallback(future, new FutureCallback<SendMessageResult>() {
  95 + @Override
  96 + public void onSuccess(SendMessageResult result) {
  97 + if (callback != null) {
  98 + callback.onSuccess(new AwsSqsTbQueueMsgMetadata(result.getSdkHttpMetadata()));
  99 + }
  100 + }
  101 +
  102 + @Override
  103 + public void onFailure(Throwable t) {
  104 + if (callback != null) {
  105 + callback.onFailure(t);
  106 + }
  107 + }
  108 + });
  109 + }
  110 +
  111 + @Override
  112 + public void stop() {
  113 + if (producerExecutor != null) {
  114 + producerExecutor.shutdownNow();
  115 + }
  116 + if (sqsClient != null) {
  117 + sqsClient.shutdown();
  118 + }
  119 + }
  120 +
  121 + private String getQueueUrl(String topic) {
  122 + return queueUrlMap.computeIfAbsent(topic, k -> {
  123 + admin.createTopicIfNotExists(topic);
  124 + return sqsClient.getQueueUrl(topic.replaceAll("\\.", "_") + ".fifo").getQueueUrl();
  125 + });
  126 + }
  127 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.sqs;
  17 +
  18 +import lombok.Data;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.springframework.beans.factory.annotation.Value;
  21 +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  22 +import org.springframework.stereotype.Component;
  23 +
  24 +@Slf4j
  25 +@ConditionalOnExpression("'${queue.type:null}'=='aws-sqs'")
  26 +@Component
  27 +@Data
  28 +public class TbAwsSqsSettings {
  29 +
  30 + @Value("${queue.aws_sqs.access_key_id}")
  31 + private String accessKeyId;
  32 +
  33 + @Value("${queue.aws_sqs.secret_access_key}")
  34 + private String secretAccessKey;
  35 +
  36 + @Value("${queue.aws_sqs.region}")
  37 + private String region;
  38 +
  39 + @Value("${queue.aws_sqs.threads_per_topic}")
  40 + private int threadsPerTopic;
  41 +
  42 + @Value("${queue.aws_sqs.visibility_timeout}")
  43 + private String visibilityTimeout;
  44 +}
@@ -138,6 +138,9 @@ public class DefaultTransportService implements TransportService { @@ -138,6 +138,9 @@ public class DefaultTransportService implements TransportService {
138 while (!stopped) { 138 while (!stopped) {
139 try { 139 try {
140 List<TbProtoQueueMsg<ToTransportMsg>> records = transportNotificationsConsumer.poll(notificationsPollDuration); 140 List<TbProtoQueueMsg<ToTransportMsg>> records = transportNotificationsConsumer.poll(notificationsPollDuration);
  141 + if (records.size() == 0) {
  142 + continue;
  143 + }
141 records.forEach(record -> { 144 records.forEach(record -> {
142 try { 145 try {
143 ToTransportMsg toTransportMsg = record.getValue(); 146 ToTransportMsg toTransportMsg = record.getValue();
@@ -170,6 +173,10 @@ public class DefaultTransportService implements TransportService { @@ -170,6 +173,10 @@ public class DefaultTransportService implements TransportService {
170 perDeviceLimits.clear(); 173 perDeviceLimits.clear();
171 } 174 }
172 stopped = true; 175 stopped = true;
  176 +
  177 + if (transportNotificationsConsumer != null) {
  178 + transportNotificationsConsumer.unsubscribe();
  179 + }
173 if (schedulerExecutor != null) { 180 if (schedulerExecutor != null) {
174 schedulerExecutor.shutdownNow(); 181 schedulerExecutor.shutdownNow();
175 } 182 }
@@ -92,6 +92,7 @@ @@ -92,6 +92,7 @@
92 <fst.version>2.57</fst.version> 92 <fst.version>2.57</fst.version>
93 <antlr.version>2.7.7</antlr.version> 93 <antlr.version>2.7.7</antlr.version>
94 <snakeyaml.version>1.23</snakeyaml.version> 94 <snakeyaml.version>1.23</snakeyaml.version>
  95 + <amazonaws.sqs.version>1.11.747</amazonaws.sqs.version>
95 <passay.version>1.5.0</passay.version> 96 <passay.version>1.5.0</passay.version>
96 <ua-parser.version>1.4.3</ua-parser.version> 97 <ua-parser.version>1.4.3</ua-parser.version>
97 </properties> 98 </properties>
@@ -887,6 +888,11 @@ @@ -887,6 +888,11 @@
887 <version>${jts.version}</version> 888 <version>${jts.version}</version>
888 </dependency> 889 </dependency>
889 <dependency> 890 <dependency>
  891 + <groupId>com.amazonaws</groupId>
  892 + <artifactId>aws-java-sdk-sqs</artifactId>
  893 + <version>${amazonaws.sqs.version}</version>
  894 + </dependency>
  895 + <dependency>
890 <groupId>org.passay</groupId> 896 <groupId>org.passay</groupId>
891 <artifactId>passay</artifactId> 897 <artifactId>passay</artifactId>
892 <version>${passay.version}</version> 898 <version>${passay.version}</version>
@@ -35,7 +35,7 @@ @@ -35,7 +35,7 @@
35 <properties> 35 <properties>
36 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 36 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
37 <main.dir>${basedir}/../..</main.dir> 37 <main.dir>${basedir}/../..</main.dir>
38 - <aws.sdk.version>1.11.323</aws.sdk.version> 38 + <aws.sdk.version>1.11.747</aws.sdk.version>
39 <pubsub.client.version>1.83.0</pubsub.client.version> 39 <pubsub.client.version>1.83.0</pubsub.client.version>
40 <google.common.protos.version>1.16.0</google.common.protos.version> 40 <google.common.protos.version>1.16.0</google.common.protos.version>
41 </properties> 41 </properties>
@@ -63,7 +63,7 @@ transport: @@ -63,7 +63,7 @@ transport:
63 max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}" 63 max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}"
64 64
65 queue: 65 queue:
66 - type: "${TB_QUEUE_TYPE:kafka}" # kafka or ? 66 + type: "${TB_QUEUE_TYPE:kafka}" # kafka or aws-sqs
67 kafka: 67 kafka:
68 bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" 68 bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
69 acks: "${TB_KAFKA_ACKS:all}" 69 acks: "${TB_KAFKA_ACKS:all}"
@@ -71,6 +71,10 @@ queue: @@ -71,6 +71,10 @@ queue:
71 batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" 71 batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
72 linger.ms: "${TB_KAFKA_LINGER_MS:1}" 72 linger.ms: "${TB_KAFKA_LINGER_MS:1}"
73 buffer.memory: "${TB_BUFFER_MEMORY:33554432}" 73 buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
  74 + aws_sqs:
  75 + access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
  76 + secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}"
  77 + region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
74 partitions: 78 partitions:
75 hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" 79 hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
76 virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" 80 virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"