Commit 1b9df18c4592ea16d35b0bf0eace8e1f20a2ca43

Authored by YevhenBondarenko
Committed by Andrew Shvayka
1 parent 58d9c313

created TbPubSubSubscriptionSettings

... ... @@ -543,9 +543,14 @@ queue:
543 543 pubsub:
544 544 project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:YOUR_PROJECT_ID}"
545 545 service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:YOUR_SERVICE_ACCOUNT}"
546   - ack_deadline: "${TB_QUEUE_PUBSUB_ACK_DEADLINE:30}" #In seconds. If messages wont commit in this time, messages will poll again
547 546 max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes
548 547 max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
  548 + queue-properties:
  549 + rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
  550 + core: "${TB_QUEUE_PUBSUB_CORE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
  551 + transport-api: "${TB_QUEUE_PUBSUB_TA_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
  552 + notifications: "${TB_QUEUE_PUBSUB_NOTIFICATIONS_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
  553 + js-executor: "${TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
549 554 service_bus:
550 555 namespace_name: "${TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME:YOUR_NAMESPACE_NAME}"
551 556 sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}"
... ...
... ... @@ -38,6 +38,7 @@ import org.thingsboard.server.queue.pubsub.TbPubSubAdmin;
38 38 import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate;
39 39 import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate;
40 40 import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
  41 +import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings;
41 42 import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
42 43 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
43 44 import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
... ... @@ -53,88 +54,99 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
53 54 private final TbQueueRuleEngineSettings ruleEngineSettings;
54 55 private final TbQueueTransportApiSettings transportApiSettings;
55 56 private final TbQueueTransportNotificationSettings transportNotificationSettings;
56   - private final TbQueueAdmin admin;
57 57 private final PartitionService partitionService;
58 58 private final TbServiceInfoProvider serviceInfoProvider;
59 59
  60 + private final TbQueueAdmin coreAdmin;
  61 + private final TbQueueAdmin ruleEngineAdmin;
  62 + private final TbQueueAdmin jsExecutorAdmin;
  63 + private final TbQueueAdmin transportApiAdmin;
  64 + private final TbQueueAdmin notificationAdmin;
  65 +
60 66 public PubSubMonolithQueueFactory(TbPubSubSettings pubSubSettings,
61 67 TbQueueCoreSettings coreSettings,
62 68 TbQueueRuleEngineSettings ruleEngineSettings,
63 69 TbQueueTransportApiSettings transportApiSettings,
64 70 TbQueueTransportNotificationSettings transportNotificationSettings,
65 71 PartitionService partitionService,
66   - TbServiceInfoProvider serviceInfoProvider) {
  72 + TbServiceInfoProvider serviceInfoProvider,
  73 + TbPubSubSubscriptionSettings pubSubSubscriptionSettings) {
67 74 this.pubSubSettings = pubSubSettings;
68 75 this.coreSettings = coreSettings;
69 76 this.ruleEngineSettings = ruleEngineSettings;
70 77 this.transportApiSettings = transportApiSettings;
71 78 this.transportNotificationSettings = transportNotificationSettings;
72   - this.admin = new TbPubSubAdmin(pubSubSettings);
73 79 this.partitionService = partitionService;
74 80 this.serviceInfoProvider = serviceInfoProvider;
  81 +
  82 + this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings());
  83 + this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings());
  84 + this.jsExecutorAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getJsExecutorSettings());
  85 + this.transportApiAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getTransportApiSettings());
  86 + this.notificationAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getNotificationsSettings());
75 87 }
76 88
77 89 @Override
78 90 public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
79   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, transportNotificationSettings.getNotificationsTopic());
  91 + return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, transportNotificationSettings.getNotificationsTopic());
80 92 }
81 93
82 94 @Override
83 95 public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
84   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic());
  96 + return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic());
85 97
86 98 }
87 99
88 100 @Override
89 101 public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
90   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic());
  102 + return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic());
91 103 }
92 104
93 105 @Override
94 106 public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
95   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic());
  107 + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
96 108 }
97 109
98 110 @Override
99 111 public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
100   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic());
  112 + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
101 113 }
102 114
103 115 @Override
104 116 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
105   - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic(),
  117 + return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic(),
106 118 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
107 119 }
108 120
109 121 @Override
110 122 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() {
111   - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings,
  123 + return new TbPubSubConsumerTemplate<>(notificationAdmin, pubSubSettings,
112 124 partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
113 125 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
114 126 }
115 127
116 128 @Override
117 129 public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() {
118   - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, coreSettings.getTopic(),
  130 + return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic(),
119 131 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
120 132 }
121 133
122 134 @Override
123 135 public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() {
124   - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings,
  136 + return new TbPubSubConsumerTemplate<>(notificationAdmin, pubSubSettings,
125 137 partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(),
126 138 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
127 139 }
128 140
129 141 @Override
130 142 public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() {
131   - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, transportApiSettings.getRequestsTopic(),
  143 + return new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getRequestsTopic(),
132 144 msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
133 145 }
134 146
135 147 @Override
136 148 public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
137   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, transportApiSettings.getResponsesTopic());
  149 + return new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getResponsesTopic());
138 150 }
139 151
140 152 @Override
... ...
... ... @@ -30,6 +30,8 @@ import org.thingsboard.server.queue.TbQueueAdmin;
30 30 import org.thingsboard.server.queue.TbQueueConsumer;
31 31 import org.thingsboard.server.queue.TbQueueRequestTemplate;
32 32 import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
  33 +import org.thingsboard.server.queue.pubsub.TbPubSubAdmin;
  34 +import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings;
33 35 import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
34 36 import org.thingsboard.server.queue.TbQueueProducer;
35 37 import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
... ... @@ -47,71 +49,79 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
47 49 private final TbPubSubSettings pubSubSettings;
48 50 private final TbQueueCoreSettings coreSettings;
49 51 private final TbQueueTransportApiSettings transportApiSettings;
50   - private final TbQueueAdmin admin;
51 52 private final PartitionService partitionService;
52 53 private final TbServiceInfoProvider serviceInfoProvider;
53 54
  55 + private final TbQueueAdmin coreAdmin;
  56 + private final TbQueueAdmin jsExecutorAdmin;
  57 + private final TbQueueAdmin transportApiAdmin;
  58 + private final TbQueueAdmin notificationAdmin;
  59 +
54 60 public PubSubTbCoreQueueFactory(TbPubSubSettings pubSubSettings,
55 61 TbQueueCoreSettings coreSettings,
56 62 TbQueueTransportApiSettings transportApiSettings,
57   - TbQueueAdmin admin,
58 63 PartitionService partitionService,
59   - TbServiceInfoProvider serviceInfoProvider) {
  64 + TbServiceInfoProvider serviceInfoProvider,
  65 + TbPubSubSubscriptionSettings pubSubSubscriptionSettings) {
60 66 this.pubSubSettings = pubSubSettings;
61 67 this.coreSettings = coreSettings;
62 68 this.transportApiSettings = transportApiSettings;
63   - this.admin = admin;
64 69 this.partitionService = partitionService;
65 70 this.serviceInfoProvider = serviceInfoProvider;
  71 +
  72 + this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings());
  73 + this.jsExecutorAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getJsExecutorSettings());
  74 + this.transportApiAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getTransportApiSettings());
  75 + this.notificationAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getNotificationsSettings());
66 76 }
67 77
68 78 @Override
69 79 public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
70   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic());
  80 + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
71 81 }
72 82
73 83 @Override
74 84 public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
75   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic());
  85 + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
76 86 }
77 87
78 88 @Override
79 89 public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
80   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic());
  90 + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
81 91 }
82 92
83 93 @Override
84 94 public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
85   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic());
  95 + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
86 96 }
87 97
88 98 @Override
89 99 public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
90   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic());
  100 + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
91 101 }
92 102
93 103 @Override
94 104 public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() {
95   - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, coreSettings.getTopic(),
  105 + return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic(),
96 106 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
97 107 }
98 108
99 109 @Override
100 110 public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() {
101   - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings,
  111 + return new TbPubSubConsumerTemplate<>(notificationAdmin, pubSubSettings,
102 112 partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(),
103 113 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
104 114 }
105 115
106 116 @Override
107 117 public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() {
108   - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, transportApiSettings.getRequestsTopic(),
  118 + return new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getRequestsTopic(),
109 119 msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
110 120 }
111 121
112 122 @Override
113 123 public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
114   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic());
  124 + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
115 125 }
116 126
117 127 @Override
... ...
... ... @@ -32,9 +32,11 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
32 32 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
33 33 import org.thingsboard.server.queue.discovery.PartitionService;
34 34 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
  35 +import org.thingsboard.server.queue.pubsub.TbPubSubAdmin;
35 36 import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate;
36 37 import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate;
37 38 import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
  39 +import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings;
38 40 import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
39 41 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
40 42 import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
... ... @@ -46,59 +48,67 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
46 48 private final TbPubSubSettings pubSubSettings;
47 49 private final TbQueueCoreSettings coreSettings;
48 50 private final TbQueueRuleEngineSettings ruleEngineSettings;
49   - private final TbQueueAdmin admin;
50 51 private final PartitionService partitionService;
51 52 private final TbServiceInfoProvider serviceInfoProvider;
52 53
  54 + private final TbQueueAdmin coreAdmin;
  55 + private final TbQueueAdmin ruleEngineAdmin;
  56 + private final TbQueueAdmin jsExecutorAdmin;
  57 + private final TbQueueAdmin notificationAdmin;
  58 +
53 59 public PubSubTbRuleEngineQueueFactory(TbPubSubSettings pubSubSettings,
54 60 TbQueueCoreSettings coreSettings,
55 61 TbQueueRuleEngineSettings ruleEngineSettings,
56   - TbQueueAdmin admin,
57 62 PartitionService partitionService,
58   - TbServiceInfoProvider serviceInfoProvider) {
  63 + TbServiceInfoProvider serviceInfoProvider,
  64 + TbPubSubSubscriptionSettings pubSubSubscriptionSettings) {
59 65 this.pubSubSettings = pubSubSettings;
60 66 this.coreSettings = coreSettings;
61 67 this.ruleEngineSettings = ruleEngineSettings;
62   - this.admin = admin;
63 68 this.partitionService = partitionService;
64 69 this.serviceInfoProvider = serviceInfoProvider;
  70 +
  71 + this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings());
  72 + this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings());
  73 + this.jsExecutorAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getJsExecutorSettings());
  74 + this.notificationAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getNotificationsSettings());
65 75 }
66 76
67 77 @Override
68 78 public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
69   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic());
  79 + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
70 80 }
71 81
72 82 @Override
73 83 public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
74   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic());
  84 + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
75 85 }
76 86
77 87 @Override
78 88 public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
79   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic());
  89 + return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic());
80 90 }
81 91
82 92 @Override
83 93 public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
84   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic());
  94 + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
85 95
86 96 }
87 97
88 98 @Override
89 99 public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
90   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic());
  100 + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
91 101 }
92 102
93 103 @Override
94 104 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
95   - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic(),
  105 + return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic(),
96 106 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
97 107 }
98 108
99 109 @Override
100 110 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() {
101   - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings,
  111 + return new TbPubSubConsumerTemplate<>(notificationAdmin, pubSubSettings,
102 112 partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
103 113 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
104 114 }
... ...
... ... @@ -25,12 +25,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM
25 25 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
26 26 import org.thingsboard.server.queue.TbQueueAdmin;
27 27 import org.thingsboard.server.queue.TbQueueConsumer;
28   -import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
29 28 import org.thingsboard.server.queue.TbQueueProducer;
30 29 import org.thingsboard.server.queue.TbQueueRequestTemplate;
31   -import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
32   -import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
33   -import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
34 30 import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
35 31 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
36 32 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
... ... @@ -38,6 +34,11 @@ import org.thingsboard.server.queue.pubsub.TbPubSubAdmin;
38 34 import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate;
39 35 import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate;
40 36 import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
  37 +import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings;
  38 +import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
  39 +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
  40 +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
  41 +import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
41 42
42 43 @Component
43 44 @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')")
... ... @@ -50,33 +51,42 @@ public class PubSubTransportQueueFactory implements TbTransportQueueFactory {
50 51 private final TbQueueRuleEngineSettings ruleEngineSettings;
51 52 private final TbQueueTransportApiSettings transportApiSettings;
52 53 private final TbQueueTransportNotificationSettings transportNotificationSettings;
53   - private final TbQueueAdmin admin;
  54 +
  55 + private final TbQueueAdmin coreAdmin;
  56 + private final TbQueueAdmin ruleEngineAdmin;
  57 + private final TbQueueAdmin transportApiAdmin;
  58 + private final TbQueueAdmin notificationAdmin;
54 59
55 60 public PubSubTransportQueueFactory(TbPubSubSettings pubSubSettings,
56 61 TbServiceInfoProvider serviceInfoProvider,
57 62 TbQueueCoreSettings coreSettings,
58 63 TbQueueRuleEngineSettings ruleEngineSettings,
59 64 TbQueueTransportApiSettings transportApiSettings,
60   - TbQueueTransportNotificationSettings transportNotificationSettings) {
  65 + TbQueueTransportNotificationSettings transportNotificationSettings,
  66 + TbPubSubSubscriptionSettings pubSubSubscriptionSettings) {
61 67 this.pubSubSettings = pubSubSettings;
62 68 this.serviceInfoProvider = serviceInfoProvider;
63 69 this.coreSettings = coreSettings;
64 70 this.ruleEngineSettings = ruleEngineSettings;
65 71 this.transportApiSettings = transportApiSettings;
66 72 this.transportNotificationSettings = transportNotificationSettings;
67   - this.admin = new TbPubSubAdmin(pubSubSettings);
  73 +
  74 + this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings());
  75 + this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings());
  76 + this.transportApiAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getTransportApiSettings());
  77 + this.notificationAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getNotificationsSettings());
68 78 }
69 79
70 80 @Override
71 81 public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() {
72   - TbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producer = new TbPubSubProducerTemplate<>(admin, pubSubSettings, transportApiSettings.getRequestsTopic());
73   - TbQueueConsumer<TbProtoQueueMsg<TransportApiResponseMsg>> consumer = new TbPubSubConsumerTemplate<>(admin, pubSubSettings,
  82 + TbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producer = new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getRequestsTopic());
  83 + TbQueueConsumer<TbProtoQueueMsg<TransportApiResponseMsg>> consumer = new TbPubSubConsumerTemplate<>(transportApiAdmin, pubSubSettings,
74 84 transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(),
75 85 msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
76 86
77 87 DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
78 88 <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
79   - templateBuilder.queueAdmin(admin);
  89 + templateBuilder.queueAdmin(transportApiAdmin);
80 90 templateBuilder.requestTemplate(producer);
81 91 templateBuilder.responseTemplate(consumer);
82 92 templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests());
... ... @@ -87,17 +97,17 @@ public class PubSubTransportQueueFactory implements TbTransportQueueFactory {
87 97
88 98 @Override
89 99 public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
90   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic());
  100 + return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic());
91 101 }
92 102
93 103 @Override
94 104 public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
95   - return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic());
  105 + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
96 106 }
97 107
98 108 @Override
99 109 public TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsConsumer() {
100   - return new TbPubSubConsumerTemplate<>(admin, pubSubSettings,
  110 + return new TbPubSubConsumerTemplate<>(notificationAdmin, pubSubSettings,
101 111 transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(),
102 112 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
103 113 }
... ...
... ... @@ -19,32 +19,37 @@ import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
19 19 import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
20 20 import com.google.cloud.pubsub.v1.TopicAdminClient;
21 21 import com.google.cloud.pubsub.v1.TopicAdminSettings;
  22 +import com.google.protobuf.Duration;
22 23 import com.google.pubsub.v1.ListSubscriptionsRequest;
23 24 import com.google.pubsub.v1.ListTopicsRequest;
24 25 import com.google.pubsub.v1.ProjectName;
25 26 import com.google.pubsub.v1.ProjectSubscriptionName;
26 27 import com.google.pubsub.v1.ProjectTopicName;
27   -import com.google.pubsub.v1.PushConfig;
28 28 import com.google.pubsub.v1.Subscription;
29 29 import com.google.pubsub.v1.Topic;
30 30 import lombok.extern.slf4j.Slf4j;
31 31 import org.thingsboard.server.queue.TbQueueAdmin;
32 32
33 33 import java.io.IOException;
  34 +import java.util.Map;
34 35 import java.util.Set;
35 36 import java.util.concurrent.ConcurrentHashMap;
36 37
37 38 @Slf4j
38 39 public class TbPubSubAdmin implements TbQueueAdmin {
  40 + private static final String ACK_DEADLINE = "ackDeadlineInSec";
  41 + private static final String MESSAGE_RETENTION = "messageRetentionInSec";
39 42
40 43 private final TbPubSubSettings pubSubSettings;
41 44 private final SubscriptionAdminSettings subscriptionAdminSettings;
42 45 private final TopicAdminSettings topicAdminSettings;
43 46 private final Set<String> topicSet = ConcurrentHashMap.newKeySet();
44 47 private final Set<String> subscriptionSet = ConcurrentHashMap.newKeySet();
  48 + private final Map<String, String> subscriptionProperties;
45 49
46   - public TbPubSubAdmin(TbPubSubSettings pubSubSettings) {
  50 + public TbPubSubAdmin(TbPubSubSettings pubSubSettings, Map<String, String> subscriptionSettings) {
47 51 this.pubSubSettings = pubSubSettings;
  52 + this.subscriptionProperties = subscriptionSettings;
48 53
49 54 try {
50 55 topicAdminSettings = TopicAdminSettings.newBuilder().setCredentialsProvider(pubSubSettings.getCredentialsProvider()).build();
... ... @@ -149,8 +154,15 @@ public class TbPubSubAdmin implements TbQueueAdmin {
149 154 }
150 155 }
151 156
152   - subscriptionAdminClient.createSubscription(
153   - subscriptionName, topicName, PushConfig.getDefaultInstance(), pubSubSettings.getAckDeadline()).getName();
  157 + Subscription.Builder subscriptionBuilder = Subscription
  158 + .newBuilder()
  159 + .setName(subscriptionName.toString())
  160 + .setTopic(topicName.toString());
  161 +
  162 + setAckDeadline(subscriptionBuilder);
  163 + setMessageRetention(subscriptionBuilder);
  164 +
  165 + subscriptionAdminClient.createSubscription(subscriptionBuilder.build());
154 166 subscriptionSet.add(subscriptionName.toString());
155 167 log.info("Created new subscription: [{}]", subscriptionName.toString());
156 168 } catch (IOException e) {
... ... @@ -159,4 +171,19 @@ public class TbPubSubAdmin implements TbQueueAdmin {
159 171 }
160 172 }
161 173
  174 + private void setAckDeadline(Subscription.Builder builder) {
  175 + if (subscriptionProperties.containsKey(ACK_DEADLINE)) {
  176 + builder.setAckDeadlineSeconds(Integer.parseInt(subscriptionProperties.get(ACK_DEADLINE)));
  177 + }
  178 + }
  179 +
  180 + private void setMessageRetention(Subscription.Builder builder) {
  181 + if (subscriptionProperties.containsKey(MESSAGE_RETENTION)) {
  182 + Duration duration = Duration
  183 + .newBuilder()
  184 + .setSeconds(Long.parseLong(subscriptionProperties.get(MESSAGE_RETENTION)))
  185 + .build();
  186 + builder.setMessageRetentionDuration(duration);
  187 + }
  188 + }
162 189 }
... ...
... ... @@ -40,9 +40,6 @@ public class TbPubSubSettings {
40 40 @Value("${queue.pubsub.service_account}")
41 41 private String serviceAccount;
42 42
43   - @Value("${queue.pubsub.ack_deadline}")
44   - private int ackDeadline;
45   -
46 43 @Value("${queue.pubsub.max_msg_size}")
47 44 private int maxMsgSize;
48 45
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.pubsub;
  17 +
  18 +import lombok.Getter;
  19 +import org.springframework.beans.factory.annotation.Value;
  20 +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  21 +import org.springframework.stereotype.Component;
  22 +
  23 +import javax.annotation.PostConstruct;
  24 +import java.util.HashMap;
  25 +import java.util.Map;
  26 +
  27 +@Component
  28 +@ConditionalOnExpression("'${queue.type:null}'=='pubsub'")
  29 +public class TbPubSubSubscriptionSettings {
  30 + @Value("${queue.pubsub.queue-properties.core}")
  31 + private String coreProperties;
  32 + @Value("${queue.pubsub.queue-properties.rule-engine}")
  33 + private String ruleEngineProperties;
  34 + @Value("${queue.pubsub.queue-properties.transport-api}")
  35 + private String transportApiProperties;
  36 + @Value("${queue.pubsub.queue-properties.notifications}")
  37 + private String notificationsProperties;
  38 + @Value("${queue.pubsub.queue-properties.js-executor}")
  39 + private String jsExecutorProperties;
  40 +
  41 + @Getter
  42 + private Map<String, String> coreSettings;
  43 + @Getter
  44 + private Map<String, String> ruleEngineSettings;
  45 + @Getter
  46 + private Map<String, String> transportApiSettings;
  47 + @Getter
  48 + private Map<String, String> notificationsSettings;
  49 + @Getter
  50 + private Map<String, String> jsExecutorSettings;
  51 +
  52 + @PostConstruct
  53 + private void init() {
  54 + coreSettings = getSettings(coreProperties);
  55 + ruleEngineSettings = getSettings(ruleEngineProperties);
  56 + transportApiSettings = getSettings(transportApiProperties);
  57 + notificationsSettings = getSettings(notificationsProperties);
  58 + jsExecutorSettings = getSettings(jsExecutorProperties);
  59 + }
  60 +
  61 + private Map<String, String> getSettings(String properties) {
  62 + Map<String, String> configs = new HashMap<>();
  63 + for (String property : properties.split(";")) {
  64 + int delimiterPosition = property.indexOf(":");
  65 + String key = property.substring(0, delimiterPosition);
  66 + String value = property.substring(delimiterPosition + 1);
  67 + configs.put(key, value);
  68 + }
  69 + return configs;
  70 + }
  71 +}
... ...
... ... @@ -72,9 +72,14 @@ queue:
72 72 pubsub:
73 73 project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:YOUR_PROJECT_ID}"
74 74 service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:YOUR_SERVICE_ACCOUNT}"
75   - ack_deadline: "${TB_QUEUE_PUBSUB_ACK_DEADLINE:30}" #In seconds. If messages wont commit in this time, messages will poll again
76 75 max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes
77 76 max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
  77 + queue-properties:
  78 + rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
  79 + core: "${TB_QUEUE_PUBSUB_CORE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
  80 + transport-api: "${TB_QUEUE_PUBSUB_TA_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
  81 + notifications: "${TB_QUEUE_PUBSUB_NOTIFICATIONS_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
  82 + js-executor: "${TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
78 83 service_bus:
79 84 namespace_name: "${TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME:YOUR_NAMESPACE_NAME}"
80 85 sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}"
... ...
... ... @@ -73,9 +73,14 @@ queue:
73 73 pubsub:
74 74 project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:YOUR_PROJECT_ID}"
75 75 service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:YOUR_SERVICE_ACCOUNT}"
76   - ack_deadline: "${TB_QUEUE_PUBSUB_ACK_DEADLINE:30}" #In seconds. If messages wont commit in this time, messages will poll again
77 76 max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes
78 77 max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
  78 + queue-properties:
  79 + rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
  80 + core: "${TB_QUEUE_PUBSUB_CORE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
  81 + transport-api: "${TB_QUEUE_PUBSUB_TA_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
  82 + notifications: "${TB_QUEUE_PUBSUB_NOTIFICATIONS_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
  83 + js-executor: "${TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
79 84 service_bus:
80 85 namespace_name: "${TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME:YOUR_NAMESPACE_NAME}"
81 86 sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}"
... ...
... ... @@ -103,9 +103,14 @@ queue:
103 103 pubsub:
104 104 project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:YOUR_PROJECT_ID}"
105 105 service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:YOUR_SERVICE_ACCOUNT}"
106   - ack_deadline: "${TB_QUEUE_PUBSUB_ACK_DEADLINE:30}" #In seconds. If messages wont commit in this time, messages will poll again
107 106 max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes
108 107 max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
  108 + queue-properties:
  109 + rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
  110 + core: "${TB_QUEUE_PUBSUB_CORE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
  111 + transport-api: "${TB_QUEUE_PUBSUB_TA_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
  112 + notifications: "${TB_QUEUE_PUBSUB_NOTIFICATIONS_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
  113 + js-executor: "${TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
109 114 service_bus:
110 115 namespace_name: "${TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME:YOUR_NAMESPACE_NAME}"
111 116 sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}"
... ...