Commit cc79b026035e6b9711fd142405b06bf1fcae88ad
Committed by
Andrew Shvayka
1 parent
3d9f1cf1
added default credentials provider chain for aws sqs consumer and producer
Showing
4 changed files
with
28 additions
and
16 deletions
@@ -612,9 +612,6 @@ queue: | @@ -612,9 +612,6 @@ queue: | ||
612 | notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" | 612 | notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
613 | js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" | 613 | js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" |
614 | aws_sqs: | 614 | aws_sqs: |
615 | - # @see https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-roles.html | ||
616 | - # setting this to true, will ignore the access keys below and instead use the | ||
617 | - # default credential provider chain, which includes instance profile credentials etc. | ||
618 | use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" | 615 | use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" |
619 | access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" | 616 | access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" |
620 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" | 617 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" |
@@ -15,7 +15,11 @@ | @@ -15,7 +15,11 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.server.queue.sqs; | 16 | package org.thingsboard.server.queue.sqs; |
17 | 17 | ||
18 | -import com.amazonaws.auth.*; | 18 | +import com.amazonaws.auth.AWSCredentials; |
19 | +import com.amazonaws.auth.AWSCredentialsProvider; | ||
20 | +import com.amazonaws.auth.AWSStaticCredentialsProvider; | ||
21 | +import com.amazonaws.auth.BasicAWSCredentials; | ||
22 | +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; | ||
19 | import com.amazonaws.services.sqs.AmazonSQS; | 23 | import com.amazonaws.services.sqs.AmazonSQS; |
20 | import com.amazonaws.services.sqs.AmazonSQSClientBuilder; | 24 | import com.amazonaws.services.sqs.AmazonSQSClientBuilder; |
21 | import com.amazonaws.services.sqs.model.CreateQueueRequest; | 25 | import com.amazonaws.services.sqs.model.CreateQueueRequest; |
@@ -16,8 +16,10 @@ | @@ -16,8 +16,10 @@ | ||
16 | package org.thingsboard.server.queue.sqs; | 16 | package org.thingsboard.server.queue.sqs; |
17 | 17 | ||
18 | import com.amazonaws.auth.AWSCredentials; | 18 | import com.amazonaws.auth.AWSCredentials; |
19 | +import com.amazonaws.auth.AWSCredentialsProvider; | ||
19 | import com.amazonaws.auth.AWSStaticCredentialsProvider; | 20 | import com.amazonaws.auth.AWSStaticCredentialsProvider; |
20 | import com.amazonaws.auth.BasicAWSCredentials; | 21 | import com.amazonaws.auth.BasicAWSCredentials; |
22 | +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; | ||
21 | import com.amazonaws.services.sqs.AmazonSQS; | 23 | import com.amazonaws.services.sqs.AmazonSQS; |
22 | import com.amazonaws.services.sqs.AmazonSQSClientBuilder; | 24 | import com.amazonaws.services.sqs.AmazonSQSClientBuilder; |
23 | import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; | 25 | import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; |
@@ -67,13 +69,19 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> extends AbstractPara | @@ -67,13 +69,19 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> extends AbstractPara | ||
67 | this.decoder = decoder; | 69 | this.decoder = decoder; |
68 | this.sqsSettings = sqsSettings; | 70 | this.sqsSettings = sqsSettings; |
69 | 71 | ||
70 | - AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); | ||
71 | - AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials); | 72 | + AWSCredentialsProvider credentialsProvider; |
73 | + if (sqsSettings.getUseDefaultCredentialProviderChain()) { | ||
74 | + credentialsProvider = new DefaultAWSCredentialsProviderChain(); | ||
75 | + } else { | ||
76 | + AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); | ||
77 | + credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials); | ||
78 | + } | ||
72 | 79 | ||
73 | - this.sqsClient = AmazonSQSClientBuilder.standard() | ||
74 | - .withCredentials(credProvider) | 80 | + sqsClient = AmazonSQSClientBuilder.standard() |
81 | + .withCredentials(credentialsProvider) | ||
75 | .withRegion(sqsSettings.getRegion()) | 82 | .withRegion(sqsSettings.getRegion()) |
76 | .build(); | 83 | .build(); |
84 | + | ||
77 | } | 85 | } |
78 | 86 | ||
79 | @Override | 87 | @Override |
@@ -16,8 +16,10 @@ | @@ -16,8 +16,10 @@ | ||
16 | package org.thingsboard.server.queue.sqs; | 16 | package org.thingsboard.server.queue.sqs; |
17 | 17 | ||
18 | import com.amazonaws.auth.AWSCredentials; | 18 | import com.amazonaws.auth.AWSCredentials; |
19 | +import com.amazonaws.auth.AWSCredentialsProvider; | ||
19 | import com.amazonaws.auth.AWSStaticCredentialsProvider; | 20 | import com.amazonaws.auth.AWSStaticCredentialsProvider; |
20 | import com.amazonaws.auth.BasicAWSCredentials; | 21 | import com.amazonaws.auth.BasicAWSCredentials; |
22 | +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; | ||
21 | import com.amazonaws.services.sqs.AmazonSQS; | 23 | import com.amazonaws.services.sqs.AmazonSQS; |
22 | import com.amazonaws.services.sqs.AmazonSQSClientBuilder; | 24 | import com.amazonaws.services.sqs.AmazonSQSClientBuilder; |
23 | import com.amazonaws.services.sqs.model.SendMessageRequest; | 25 | import com.amazonaws.services.sqs.model.SendMessageRequest; |
@@ -26,7 +28,6 @@ import com.google.common.util.concurrent.FutureCallback; | @@ -26,7 +28,6 @@ import com.google.common.util.concurrent.FutureCallback; | ||
26 | import com.google.common.util.concurrent.Futures; | 28 | import com.google.common.util.concurrent.Futures; |
27 | import com.google.common.util.concurrent.ListenableFuture; | 29 | import com.google.common.util.concurrent.ListenableFuture; |
28 | import com.google.common.util.concurrent.ListeningExecutorService; | 30 | import com.google.common.util.concurrent.ListeningExecutorService; |
29 | -import com.google.common.util.concurrent.MoreExecutors; | ||
30 | import com.google.gson.Gson; | 31 | import com.google.gson.Gson; |
31 | import lombok.extern.slf4j.Slf4j; | 32 | import lombok.extern.slf4j.Slf4j; |
32 | import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; | 33 | import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
@@ -39,7 +40,6 @@ import org.thingsboard.server.queue.common.DefaultTbQueueMsg; | @@ -39,7 +40,6 @@ import org.thingsboard.server.queue.common.DefaultTbQueueMsg; | ||
39 | import java.util.Map; | 40 | import java.util.Map; |
40 | import java.util.UUID; | 41 | import java.util.UUID; |
41 | import java.util.concurrent.ConcurrentHashMap; | 42 | import java.util.concurrent.ConcurrentHashMap; |
42 | -import java.util.concurrent.Executors; | ||
43 | 43 | ||
44 | @Slf4j | 44 | @Slf4j |
45 | public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> { | 45 | public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> { |
@@ -54,15 +54,18 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr | @@ -54,15 +54,18 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr | ||
54 | this.admin = admin; | 54 | this.admin = admin; |
55 | this.defaultTopic = defaultTopic; | 55 | this.defaultTopic = defaultTopic; |
56 | 56 | ||
57 | - AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); | ||
58 | - AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials); | 57 | + AWSCredentialsProvider credentialsProvider; |
58 | + if (sqsSettings.getUseDefaultCredentialProviderChain()) { | ||
59 | + credentialsProvider = new DefaultAWSCredentialsProviderChain(); | ||
60 | + } else { | ||
61 | + AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); | ||
62 | + credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials); | ||
63 | + } | ||
59 | 64 | ||
60 | - this.sqsClient = AmazonSQSClientBuilder.standard() | ||
61 | - .withCredentials(credProvider) | 65 | + sqsClient = AmazonSQSClientBuilder.standard() |
66 | + .withCredentials(credentialsProvider) | ||
62 | .withRegion(sqsSettings.getRegion()) | 67 | .withRegion(sqsSettings.getRegion()) |
63 | .build(); | 68 | .build(); |
64 | - | ||
65 | - producerExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); | ||
66 | } | 69 | } |
67 | 70 | ||
68 | @Override | 71 | @Override |