Showing
7 changed files
with
170 additions
and
15 deletions
@@ -20,7 +20,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | @@ -20,7 +20,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | ||
20 | import org.springframework.context.annotation.Bean; | 20 | import org.springframework.context.annotation.Bean; |
21 | import org.springframework.stereotype.Component; | 21 | import org.springframework.stereotype.Component; |
22 | import org.thingsboard.server.common.msg.queue.ServiceType; | 22 | import org.thingsboard.server.common.msg.queue.ServiceType; |
23 | -import org.thingsboard.server.gen.js.JsInvokeProtos.*; | 23 | +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest; |
24 | +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse; | ||
24 | import org.thingsboard.server.gen.transport.TransportProtos; | 25 | import org.thingsboard.server.gen.transport.TransportProtos; |
25 | import org.thingsboard.server.queue.TbQueueAdmin; | 26 | import org.thingsboard.server.queue.TbQueueAdmin; |
26 | import org.thingsboard.server.queue.TbQueueConsumer; | 27 | import org.thingsboard.server.queue.TbQueueConsumer; |
@@ -31,8 +32,6 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; | @@ -31,8 +32,6 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; | ||
31 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; | 32 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
32 | import org.thingsboard.server.queue.discovery.PartitionService; | 33 | import org.thingsboard.server.queue.discovery.PartitionService; |
33 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | 34 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
34 | -import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; | ||
35 | -import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; | ||
36 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; | 35 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
37 | import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; | 36 | import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; |
38 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; | 37 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
@@ -15,7 +15,9 @@ | @@ -15,7 +15,9 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.server.queue.provider; | 16 | package org.thingsboard.server.queue.provider; |
17 | 17 | ||
18 | +import com.google.protobuf.util.JsonFormat; | ||
18 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | 19 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
20 | +import org.springframework.context.annotation.Bean; | ||
19 | import org.springframework.stereotype.Component; | 21 | import org.springframework.stereotype.Component; |
20 | import org.thingsboard.server.common.msg.queue.ServiceType; | 22 | import org.thingsboard.server.common.msg.queue.ServiceType; |
21 | import org.thingsboard.server.gen.js.JsInvokeProtos; | 23 | import org.thingsboard.server.gen.js.JsInvokeProtos; |
@@ -30,11 +32,13 @@ import org.thingsboard.server.queue.TbQueueAdmin; | @@ -30,11 +32,13 @@ import org.thingsboard.server.queue.TbQueueAdmin; | ||
30 | import org.thingsboard.server.queue.TbQueueConsumer; | 32 | import org.thingsboard.server.queue.TbQueueConsumer; |
31 | import org.thingsboard.server.queue.TbQueueProducer; | 33 | import org.thingsboard.server.queue.TbQueueProducer; |
32 | import org.thingsboard.server.queue.TbQueueRequestTemplate; | 34 | import org.thingsboard.server.queue.TbQueueRequestTemplate; |
35 | +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; | ||
33 | import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; | 36 | import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; |
34 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; | 37 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
35 | import org.thingsboard.server.queue.discovery.PartitionService; | 38 | import org.thingsboard.server.queue.discovery.PartitionService; |
36 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | 39 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
37 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; | 40 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
41 | +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; | ||
38 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; | 42 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
39 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; | 43 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
40 | import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; | 44 | import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; |
@@ -44,6 +48,7 @@ import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; | @@ -44,6 +48,7 @@ import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; | ||
44 | import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; | 48 | import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; |
45 | 49 | ||
46 | import javax.annotation.PreDestroy; | 50 | import javax.annotation.PreDestroy; |
51 | +import java.nio.charset.StandardCharsets; | ||
47 | 52 | ||
48 | @Component | 53 | @Component |
49 | @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='tb-core'") | 54 | @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='tb-core'") |
@@ -55,6 +60,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { | @@ -55,6 +60,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { | ||
55 | private final TbQueueTransportApiSettings transportApiSettings; | 60 | private final TbQueueTransportApiSettings transportApiSettings; |
56 | private final PartitionService partitionService; | 61 | private final PartitionService partitionService; |
57 | private final TbServiceInfoProvider serviceInfoProvider; | 62 | private final TbServiceInfoProvider serviceInfoProvider; |
63 | + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; | ||
58 | 64 | ||
59 | private final TbQueueAdmin coreAdmin; | 65 | private final TbQueueAdmin coreAdmin; |
60 | private final TbQueueAdmin ruleEngineAdmin; | 66 | private final TbQueueAdmin ruleEngineAdmin; |
@@ -68,6 +74,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { | @@ -68,6 +74,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { | ||
68 | TbQueueRuleEngineSettings ruleEngineSettings, | 74 | TbQueueRuleEngineSettings ruleEngineSettings, |
69 | PartitionService partitionService, | 75 | PartitionService partitionService, |
70 | TbServiceInfoProvider serviceInfoProvider, | 76 | TbServiceInfoProvider serviceInfoProvider, |
77 | + TbQueueRemoteJsInvokeSettings jsInvokeSettings, | ||
71 | TbAwsSqsQueueAttributes sqsQueueAttributes) { | 78 | TbAwsSqsQueueAttributes sqsQueueAttributes) { |
72 | this.sqsSettings = sqsSettings; | 79 | this.sqsSettings = sqsSettings; |
73 | this.coreSettings = coreSettings; | 80 | this.coreSettings = coreSettings; |
@@ -75,6 +82,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { | @@ -75,6 +82,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { | ||
75 | this.ruleEngineSettings = ruleEngineSettings; | 82 | this.ruleEngineSettings = ruleEngineSettings; |
76 | this.partitionService = partitionService; | 83 | this.partitionService = partitionService; |
77 | this.serviceInfoProvider = serviceInfoProvider; | 84 | this.serviceInfoProvider = serviceInfoProvider; |
85 | + this.jsInvokeSettings = jsInvokeSettings; | ||
78 | 86 | ||
79 | this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); | 87 | this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); |
80 | this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes()); | 88 | this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes()); |
@@ -133,8 +141,26 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { | @@ -133,8 +141,26 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { | ||
133 | } | 141 | } |
134 | 142 | ||
135 | @Override | 143 | @Override |
144 | + @Bean | ||
136 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { | 145 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { |
137 | - return null; | 146 | + TbQueueProducer<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> producer = new TbAwsSqsProducerTemplate<>(jsExecutorAdmin, sqsSettings, jsInvokeSettings.getRequestTopic()); |
147 | + TbQueueConsumer<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> consumer = new TbAwsSqsConsumerTemplate<>(jsExecutorAdmin, sqsSettings, | ||
148 | + jsInvokeSettings.getResponseTopic() + "_" + serviceInfoProvider.getServiceId(), | ||
149 | + msg -> { | ||
150 | + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); | ||
151 | + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); | ||
152 | + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); | ||
153 | + }); | ||
154 | + | ||
155 | + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder | ||
156 | + <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder(); | ||
157 | + builder.queueAdmin(jsExecutorAdmin); | ||
158 | + builder.requestTemplate(producer); | ||
159 | + builder.responseTemplate(consumer); | ||
160 | + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); | ||
161 | + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); | ||
162 | + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); | ||
163 | + return builder.build(); | ||
138 | } | 164 | } |
139 | 165 | ||
140 | @PreDestroy | 166 | @PreDestroy |
common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java
@@ -15,7 +15,9 @@ | @@ -15,7 +15,9 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.server.queue.provider; | 16 | package org.thingsboard.server.queue.provider; |
17 | 17 | ||
18 | +import com.google.protobuf.util.JsonFormat; | ||
18 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | 19 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
20 | +import org.springframework.context.annotation.Bean; | ||
19 | import org.springframework.stereotype.Component; | 21 | import org.springframework.stereotype.Component; |
20 | import org.thingsboard.server.common.msg.queue.ServiceType; | 22 | import org.thingsboard.server.common.msg.queue.ServiceType; |
21 | import org.thingsboard.server.gen.js.JsInvokeProtos; | 23 | import org.thingsboard.server.gen.js.JsInvokeProtos; |
@@ -27,11 +29,13 @@ import org.thingsboard.server.queue.TbQueueAdmin; | @@ -27,11 +29,13 @@ import org.thingsboard.server.queue.TbQueueAdmin; | ||
27 | import org.thingsboard.server.queue.TbQueueConsumer; | 29 | import org.thingsboard.server.queue.TbQueueConsumer; |
28 | import org.thingsboard.server.queue.TbQueueProducer; | 30 | import org.thingsboard.server.queue.TbQueueProducer; |
29 | import org.thingsboard.server.queue.TbQueueRequestTemplate; | 31 | import org.thingsboard.server.queue.TbQueueRequestTemplate; |
32 | +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; | ||
30 | import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; | 33 | import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; |
31 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; | 34 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
32 | import org.thingsboard.server.queue.discovery.PartitionService; | 35 | import org.thingsboard.server.queue.discovery.PartitionService; |
33 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | 36 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
34 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; | 37 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
38 | +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; | ||
35 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; | 39 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
36 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; | 40 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
37 | import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; | 41 | import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; |
@@ -41,6 +45,7 @@ import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; | @@ -41,6 +45,7 @@ import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; | ||
41 | import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; | 45 | import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; |
42 | 46 | ||
43 | import javax.annotation.PreDestroy; | 47 | import javax.annotation.PreDestroy; |
48 | +import java.nio.charset.StandardCharsets; | ||
44 | 49 | ||
45 | @Component | 50 | @Component |
46 | @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='tb-rule-engine'") | 51 | @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='tb-rule-engine'") |
@@ -51,6 +56,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory | @@ -51,6 +56,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory | ||
51 | private final TbServiceInfoProvider serviceInfoProvider; | 56 | private final TbServiceInfoProvider serviceInfoProvider; |
52 | private final TbQueueRuleEngineSettings ruleEngineSettings; | 57 | private final TbQueueRuleEngineSettings ruleEngineSettings; |
53 | private final TbAwsSqsSettings sqsSettings; | 58 | private final TbAwsSqsSettings sqsSettings; |
59 | + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; | ||
54 | 60 | ||
55 | private final TbQueueAdmin coreAdmin; | 61 | private final TbQueueAdmin coreAdmin; |
56 | private final TbQueueAdmin ruleEngineAdmin; | 62 | private final TbQueueAdmin ruleEngineAdmin; |
@@ -61,12 +67,14 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory | @@ -61,12 +67,14 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory | ||
61 | TbQueueRuleEngineSettings ruleEngineSettings, | 67 | TbQueueRuleEngineSettings ruleEngineSettings, |
62 | TbServiceInfoProvider serviceInfoProvider, | 68 | TbServiceInfoProvider serviceInfoProvider, |
63 | TbAwsSqsSettings sqsSettings, | 69 | TbAwsSqsSettings sqsSettings, |
64 | - TbAwsSqsQueueAttributes sqsQueueAttributes) { | 70 | + TbAwsSqsQueueAttributes sqsQueueAttributes, |
71 | + TbQueueRemoteJsInvokeSettings jsInvokeSettings) { | ||
65 | this.partitionService = partitionService; | 72 | this.partitionService = partitionService; |
66 | this.coreSettings = coreSettings; | 73 | this.coreSettings = coreSettings; |
67 | this.serviceInfoProvider = serviceInfoProvider; | 74 | this.serviceInfoProvider = serviceInfoProvider; |
68 | this.ruleEngineSettings = ruleEngineSettings; | 75 | this.ruleEngineSettings = ruleEngineSettings; |
69 | this.sqsSettings = sqsSettings; | 76 | this.sqsSettings = sqsSettings; |
77 | + this.jsInvokeSettings = jsInvokeSettings; | ||
70 | 78 | ||
71 | this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); | 79 | this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); |
72 | this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes()); | 80 | this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes()); |
@@ -113,8 +121,26 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory | @@ -113,8 +121,26 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory | ||
113 | } | 121 | } |
114 | 122 | ||
115 | @Override | 123 | @Override |
124 | + @Bean | ||
116 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { | 125 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { |
117 | - return null; | 126 | + TbQueueProducer<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> producer = new TbAwsSqsProducerTemplate<>(jsExecutorAdmin, sqsSettings, jsInvokeSettings.getRequestTopic()); |
127 | + TbQueueConsumer<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> consumer = new TbAwsSqsConsumerTemplate<>(jsExecutorAdmin, sqsSettings, | ||
128 | + jsInvokeSettings.getResponseTopic() + "_" + serviceInfoProvider.getServiceId(), | ||
129 | + msg -> { | ||
130 | + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); | ||
131 | + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); | ||
132 | + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); | ||
133 | + }); | ||
134 | + | ||
135 | + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder | ||
136 | + <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder(); | ||
137 | + builder.queueAdmin(jsExecutorAdmin); | ||
138 | + builder.requestTemplate(producer); | ||
139 | + builder.responseTemplate(consumer); | ||
140 | + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); | ||
141 | + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); | ||
142 | + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); | ||
143 | + return builder.build(); | ||
118 | } | 144 | } |
119 | 145 | ||
120 | @PreDestroy | 146 | @PreDestroy |
@@ -15,7 +15,9 @@ | @@ -15,7 +15,9 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.server.queue.provider; | 16 | package org.thingsboard.server.queue.provider; |
17 | 17 | ||
18 | +import com.google.protobuf.util.JsonFormat; | ||
18 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | 19 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
20 | +import org.springframework.context.annotation.Bean; | ||
19 | import org.springframework.stereotype.Component; | 21 | import org.springframework.stereotype.Component; |
20 | import org.thingsboard.server.common.msg.queue.ServiceType; | 22 | import org.thingsboard.server.common.msg.queue.ServiceType; |
21 | import org.thingsboard.server.gen.js.JsInvokeProtos; | 23 | import org.thingsboard.server.gen.js.JsInvokeProtos; |
@@ -28,21 +30,24 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM | @@ -28,21 +30,24 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM | ||
28 | import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; | 30 | import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; |
29 | import org.thingsboard.server.queue.TbQueueAdmin; | 31 | import org.thingsboard.server.queue.TbQueueAdmin; |
30 | import org.thingsboard.server.queue.TbQueueConsumer; | 32 | import org.thingsboard.server.queue.TbQueueConsumer; |
33 | +import org.thingsboard.server.queue.TbQueueProducer; | ||
31 | import org.thingsboard.server.queue.TbQueueRequestTemplate; | 34 | import org.thingsboard.server.queue.TbQueueRequestTemplate; |
35 | +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; | ||
32 | import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; | 36 | import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; |
33 | -import org.thingsboard.server.queue.pubsub.TbPubSubAdmin; | ||
34 | -import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; | ||
35 | -import org.thingsboard.server.queue.settings.TbQueueCoreSettings; | ||
36 | -import org.thingsboard.server.queue.TbQueueProducer; | ||
37 | -import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; | ||
38 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; | 37 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
39 | import org.thingsboard.server.queue.discovery.PartitionService; | 38 | import org.thingsboard.server.queue.discovery.PartitionService; |
40 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | 39 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
40 | +import org.thingsboard.server.queue.pubsub.TbPubSubAdmin; | ||
41 | import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; | 41 | import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; |
42 | import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; | 42 | import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; |
43 | import org.thingsboard.server.queue.pubsub.TbPubSubSettings; | 43 | import org.thingsboard.server.queue.pubsub.TbPubSubSettings; |
44 | +import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; | ||
45 | +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; | ||
46 | +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; | ||
47 | +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; | ||
44 | 48 | ||
45 | import javax.annotation.PreDestroy; | 49 | import javax.annotation.PreDestroy; |
50 | +import java.nio.charset.StandardCharsets; | ||
46 | 51 | ||
47 | @Component | 52 | @Component |
48 | @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-core'") | 53 | @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-core'") |
@@ -53,6 +58,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { | @@ -53,6 +58,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { | ||
53 | private final TbQueueTransportApiSettings transportApiSettings; | 58 | private final TbQueueTransportApiSettings transportApiSettings; |
54 | private final PartitionService partitionService; | 59 | private final PartitionService partitionService; |
55 | private final TbServiceInfoProvider serviceInfoProvider; | 60 | private final TbServiceInfoProvider serviceInfoProvider; |
61 | + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; | ||
56 | 62 | ||
57 | private final TbQueueAdmin coreAdmin; | 63 | private final TbQueueAdmin coreAdmin; |
58 | private final TbQueueAdmin jsExecutorAdmin; | 64 | private final TbQueueAdmin jsExecutorAdmin; |
@@ -64,12 +70,14 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { | @@ -64,12 +70,14 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { | ||
64 | TbQueueTransportApiSettings transportApiSettings, | 70 | TbQueueTransportApiSettings transportApiSettings, |
65 | PartitionService partitionService, | 71 | PartitionService partitionService, |
66 | TbServiceInfoProvider serviceInfoProvider, | 72 | TbServiceInfoProvider serviceInfoProvider, |
73 | + TbQueueRemoteJsInvokeSettings jsInvokeSettings, | ||
67 | TbPubSubSubscriptionSettings pubSubSubscriptionSettings) { | 74 | TbPubSubSubscriptionSettings pubSubSubscriptionSettings) { |
68 | this.pubSubSettings = pubSubSettings; | 75 | this.pubSubSettings = pubSubSettings; |
69 | this.coreSettings = coreSettings; | 76 | this.coreSettings = coreSettings; |
70 | this.transportApiSettings = transportApiSettings; | 77 | this.transportApiSettings = transportApiSettings; |
71 | this.partitionService = partitionService; | 78 | this.partitionService = partitionService; |
72 | this.serviceInfoProvider = serviceInfoProvider; | 79 | this.serviceInfoProvider = serviceInfoProvider; |
80 | + this.jsInvokeSettings = jsInvokeSettings; | ||
73 | 81 | ||
74 | this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings()); | 82 | this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings()); |
75 | this.jsExecutorAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getJsExecutorSettings()); | 83 | this.jsExecutorAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getJsExecutorSettings()); |
@@ -127,8 +135,26 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { | @@ -127,8 +135,26 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { | ||
127 | } | 135 | } |
128 | 136 | ||
129 | @Override | 137 | @Override |
138 | + @Bean | ||
130 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { | 139 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { |
131 | - return null; | 140 | + TbQueueProducer<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> producer = new TbPubSubProducerTemplate<>(jsExecutorAdmin, pubSubSettings, jsInvokeSettings.getRequestTopic()); |
141 | + TbQueueConsumer<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> consumer = new TbPubSubConsumerTemplate<>(jsExecutorAdmin, pubSubSettings, | ||
142 | + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), | ||
143 | + msg -> { | ||
144 | + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); | ||
145 | + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); | ||
146 | + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); | ||
147 | + }); | ||
148 | + | ||
149 | + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder | ||
150 | + <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder(); | ||
151 | + builder.queueAdmin(jsExecutorAdmin); | ||
152 | + builder.requestTemplate(producer); | ||
153 | + builder.responseTemplate(consumer); | ||
154 | + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); | ||
155 | + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); | ||
156 | + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); | ||
157 | + return builder.build(); | ||
132 | } | 158 | } |
133 | 159 | ||
134 | @PreDestroy | 160 | @PreDestroy |
common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java
@@ -15,7 +15,9 @@ | @@ -15,7 +15,9 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.server.queue.provider; | 16 | package org.thingsboard.server.queue.provider; |
17 | 17 | ||
18 | +import com.google.protobuf.util.JsonFormat; | ||
18 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | 19 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
20 | +import org.springframework.context.annotation.Bean; | ||
19 | import org.springframework.stereotype.Component; | 21 | import org.springframework.stereotype.Component; |
20 | import org.thingsboard.server.common.msg.queue.ServiceType; | 22 | import org.thingsboard.server.common.msg.queue.ServiceType; |
21 | import org.thingsboard.server.gen.js.JsInvokeProtos; | 23 | import org.thingsboard.server.gen.js.JsInvokeProtos; |
@@ -28,6 +30,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; | @@ -28,6 +30,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; | ||
28 | import org.thingsboard.server.queue.TbQueueConsumer; | 30 | import org.thingsboard.server.queue.TbQueueConsumer; |
29 | import org.thingsboard.server.queue.TbQueueProducer; | 31 | import org.thingsboard.server.queue.TbQueueProducer; |
30 | import org.thingsboard.server.queue.TbQueueRequestTemplate; | 32 | import org.thingsboard.server.queue.TbQueueRequestTemplate; |
33 | +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; | ||
31 | import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; | 34 | import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; |
32 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; | 35 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
33 | import org.thingsboard.server.queue.discovery.PartitionService; | 36 | import org.thingsboard.server.queue.discovery.PartitionService; |
@@ -38,10 +41,12 @@ import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; | @@ -38,10 +41,12 @@ import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; | ||
38 | import org.thingsboard.server.queue.pubsub.TbPubSubSettings; | 41 | import org.thingsboard.server.queue.pubsub.TbPubSubSettings; |
39 | import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; | 42 | import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; |
40 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; | 43 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
44 | +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; | ||
41 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; | 45 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
42 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; | 46 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
43 | 47 | ||
44 | import javax.annotation.PreDestroy; | 48 | import javax.annotation.PreDestroy; |
49 | +import java.nio.charset.StandardCharsets; | ||
45 | 50 | ||
46 | @Component | 51 | @Component |
47 | @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-rule-engine'") | 52 | @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-rule-engine'") |
@@ -52,6 +57,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory | @@ -52,6 +57,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory | ||
52 | private final TbQueueRuleEngineSettings ruleEngineSettings; | 57 | private final TbQueueRuleEngineSettings ruleEngineSettings; |
53 | private final PartitionService partitionService; | 58 | private final PartitionService partitionService; |
54 | private final TbServiceInfoProvider serviceInfoProvider; | 59 | private final TbServiceInfoProvider serviceInfoProvider; |
60 | + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; | ||
55 | 61 | ||
56 | private final TbQueueAdmin coreAdmin; | 62 | private final TbQueueAdmin coreAdmin; |
57 | private final TbQueueAdmin ruleEngineAdmin; | 63 | private final TbQueueAdmin ruleEngineAdmin; |
@@ -63,12 +69,14 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory | @@ -63,12 +69,14 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory | ||
63 | TbQueueRuleEngineSettings ruleEngineSettings, | 69 | TbQueueRuleEngineSettings ruleEngineSettings, |
64 | PartitionService partitionService, | 70 | PartitionService partitionService, |
65 | TbServiceInfoProvider serviceInfoProvider, | 71 | TbServiceInfoProvider serviceInfoProvider, |
72 | + TbQueueRemoteJsInvokeSettings jsInvokeSettings, | ||
66 | TbPubSubSubscriptionSettings pubSubSubscriptionSettings) { | 73 | TbPubSubSubscriptionSettings pubSubSubscriptionSettings) { |
67 | this.pubSubSettings = pubSubSettings; | 74 | this.pubSubSettings = pubSubSettings; |
68 | this.coreSettings = coreSettings; | 75 | this.coreSettings = coreSettings; |
69 | this.ruleEngineSettings = ruleEngineSettings; | 76 | this.ruleEngineSettings = ruleEngineSettings; |
70 | this.partitionService = partitionService; | 77 | this.partitionService = partitionService; |
71 | this.serviceInfoProvider = serviceInfoProvider; | 78 | this.serviceInfoProvider = serviceInfoProvider; |
79 | + this.jsInvokeSettings = jsInvokeSettings; | ||
72 | 80 | ||
73 | this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings()); | 81 | this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings()); |
74 | this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings()); | 82 | this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings()); |
@@ -116,8 +124,26 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory | @@ -116,8 +124,26 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory | ||
116 | } | 124 | } |
117 | 125 | ||
118 | @Override | 126 | @Override |
127 | + @Bean | ||
119 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { | 128 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { |
120 | - return null; | 129 | + TbQueueProducer<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> producer = new TbPubSubProducerTemplate<>(jsExecutorAdmin, pubSubSettings, jsInvokeSettings.getRequestTopic()); |
130 | + TbQueueConsumer<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> consumer = new TbPubSubConsumerTemplate<>(jsExecutorAdmin, pubSubSettings, | ||
131 | + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), | ||
132 | + msg -> { | ||
133 | + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); | ||
134 | + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); | ||
135 | + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); | ||
136 | + }); | ||
137 | + | ||
138 | + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder | ||
139 | + <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder(); | ||
140 | + builder.queueAdmin(jsExecutorAdmin); | ||
141 | + builder.requestTemplate(producer); | ||
142 | + builder.responseTemplate(consumer); | ||
143 | + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); | ||
144 | + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); | ||
145 | + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); | ||
146 | + return builder.build(); | ||
121 | } | 147 | } |
122 | 148 | ||
123 | @PreDestroy | 149 | @PreDestroy |
@@ -15,7 +15,9 @@ | @@ -15,7 +15,9 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.server.queue.provider; | 16 | package org.thingsboard.server.queue.provider; |
17 | 17 | ||
18 | +import com.google.protobuf.util.JsonFormat; | ||
18 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | 19 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
20 | +import org.springframework.context.annotation.Bean; | ||
19 | import org.springframework.stereotype.Component; | 21 | import org.springframework.stereotype.Component; |
20 | import org.thingsboard.server.common.msg.queue.ServiceType; | 22 | import org.thingsboard.server.common.msg.queue.ServiceType; |
21 | import org.thingsboard.server.gen.js.JsInvokeProtos; | 23 | import org.thingsboard.server.gen.js.JsInvokeProtos; |
@@ -30,6 +32,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; | @@ -30,6 +32,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; | ||
30 | import org.thingsboard.server.queue.TbQueueConsumer; | 32 | import org.thingsboard.server.queue.TbQueueConsumer; |
31 | import org.thingsboard.server.queue.TbQueueProducer; | 33 | import org.thingsboard.server.queue.TbQueueProducer; |
32 | import org.thingsboard.server.queue.TbQueueRequestTemplate; | 34 | import org.thingsboard.server.queue.TbQueueRequestTemplate; |
35 | +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; | ||
33 | import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; | 36 | import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; |
34 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; | 37 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
35 | import org.thingsboard.server.queue.discovery.PartitionService; | 38 | import org.thingsboard.server.queue.discovery.PartitionService; |
@@ -40,10 +43,12 @@ import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; | @@ -40,10 +43,12 @@ import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; | ||
40 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; | 43 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; |
41 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; | 44 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; |
42 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; | 45 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
46 | +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; | ||
43 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; | 47 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
44 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; | 48 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
45 | 49 | ||
46 | import javax.annotation.PreDestroy; | 50 | import javax.annotation.PreDestroy; |
51 | +import java.nio.charset.StandardCharsets; | ||
47 | 52 | ||
48 | @Component | 53 | @Component |
49 | @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-core'") | 54 | @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-core'") |
@@ -55,6 +60,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { | @@ -55,6 +60,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { | ||
55 | private final TbQueueTransportApiSettings transportApiSettings; | 60 | private final TbQueueTransportApiSettings transportApiSettings; |
56 | private final PartitionService partitionService; | 61 | private final PartitionService partitionService; |
57 | private final TbServiceInfoProvider serviceInfoProvider; | 62 | private final TbServiceInfoProvider serviceInfoProvider; |
63 | + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; | ||
58 | 64 | ||
59 | private final TbQueueAdmin coreAdmin; | 65 | private final TbQueueAdmin coreAdmin; |
60 | private final TbQueueAdmin ruleEngineAdmin; | 66 | private final TbQueueAdmin ruleEngineAdmin; |
@@ -68,6 +74,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { | @@ -68,6 +74,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { | ||
68 | TbQueueRuleEngineSettings ruleEngineSettings, | 74 | TbQueueRuleEngineSettings ruleEngineSettings, |
69 | PartitionService partitionService, | 75 | PartitionService partitionService, |
70 | TbServiceInfoProvider serviceInfoProvider, | 76 | TbServiceInfoProvider serviceInfoProvider, |
77 | + TbQueueRemoteJsInvokeSettings jsInvokeSettings, | ||
71 | TbRabbitMqQueueArguments queueArguments) { | 78 | TbRabbitMqQueueArguments queueArguments) { |
72 | this.rabbitMqSettings = rabbitMqSettings; | 79 | this.rabbitMqSettings = rabbitMqSettings; |
73 | this.coreSettings = coreSettings; | 80 | this.coreSettings = coreSettings; |
@@ -75,6 +82,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { | @@ -75,6 +82,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { | ||
75 | this.ruleEngineSettings = ruleEngineSettings; | 82 | this.ruleEngineSettings = ruleEngineSettings; |
76 | this.partitionService = partitionService; | 83 | this.partitionService = partitionService; |
77 | this.serviceInfoProvider = serviceInfoProvider; | 84 | this.serviceInfoProvider = serviceInfoProvider; |
85 | + this.jsInvokeSettings = jsInvokeSettings; | ||
78 | 86 | ||
79 | this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); | 87 | this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); |
80 | this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); | 88 | this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); |
@@ -133,8 +141,26 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { | @@ -133,8 +141,26 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { | ||
133 | } | 141 | } |
134 | 142 | ||
135 | @Override | 143 | @Override |
144 | + @Bean | ||
136 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { | 145 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { |
137 | - return null; | 146 | + TbQueueProducer<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> producer = new TbRabbitMqProducerTemplate<>(jsExecutorAdmin, rabbitMqSettings, jsInvokeSettings.getRequestTopic()); |
147 | + TbQueueConsumer<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> consumer = new TbRabbitMqConsumerTemplate<>(jsExecutorAdmin, rabbitMqSettings, | ||
148 | + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), | ||
149 | + msg -> { | ||
150 | + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); | ||
151 | + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); | ||
152 | + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); | ||
153 | + }); | ||
154 | + | ||
155 | + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder | ||
156 | + <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder(); | ||
157 | + builder.queueAdmin(jsExecutorAdmin); | ||
158 | + builder.requestTemplate(producer); | ||
159 | + builder.responseTemplate(consumer); | ||
160 | + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); | ||
161 | + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); | ||
162 | + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); | ||
163 | + return builder.build(); | ||
138 | } | 164 | } |
139 | 165 | ||
140 | @PreDestroy | 166 | @PreDestroy |
@@ -15,7 +15,9 @@ | @@ -15,7 +15,9 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.server.queue.provider; | 16 | package org.thingsboard.server.queue.provider; |
17 | 17 | ||
18 | +import com.google.protobuf.util.JsonFormat; | ||
18 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | 19 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
20 | +import org.springframework.context.annotation.Bean; | ||
19 | import org.springframework.stereotype.Component; | 21 | import org.springframework.stereotype.Component; |
20 | import org.thingsboard.server.common.msg.queue.ServiceType; | 22 | import org.thingsboard.server.common.msg.queue.ServiceType; |
21 | import org.thingsboard.server.gen.js.JsInvokeProtos; | 23 | import org.thingsboard.server.gen.js.JsInvokeProtos; |
@@ -28,6 +30,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; | @@ -28,6 +30,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; | ||
28 | import org.thingsboard.server.queue.TbQueueConsumer; | 30 | import org.thingsboard.server.queue.TbQueueConsumer; |
29 | import org.thingsboard.server.queue.TbQueueProducer; | 31 | import org.thingsboard.server.queue.TbQueueProducer; |
30 | import org.thingsboard.server.queue.TbQueueRequestTemplate; | 32 | import org.thingsboard.server.queue.TbQueueRequestTemplate; |
33 | +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; | ||
31 | import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; | 34 | import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; |
32 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; | 35 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
33 | import org.thingsboard.server.queue.discovery.PartitionService; | 36 | import org.thingsboard.server.queue.discovery.PartitionService; |
@@ -38,10 +41,12 @@ import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; | @@ -38,10 +41,12 @@ import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; | ||
38 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; | 41 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; |
39 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; | 42 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; |
40 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; | 43 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
44 | +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; | ||
41 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; | 45 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
42 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; | 46 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
43 | 47 | ||
44 | import javax.annotation.PreDestroy; | 48 | import javax.annotation.PreDestroy; |
49 | +import java.nio.charset.StandardCharsets; | ||
45 | 50 | ||
46 | @Component | 51 | @Component |
47 | @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-rule-engine'") | 52 | @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-rule-engine'") |
@@ -52,6 +57,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor | @@ -52,6 +57,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor | ||
52 | private final TbServiceInfoProvider serviceInfoProvider; | 57 | private final TbServiceInfoProvider serviceInfoProvider; |
53 | private final TbQueueRuleEngineSettings ruleEngineSettings; | 58 | private final TbQueueRuleEngineSettings ruleEngineSettings; |
54 | private final TbRabbitMqSettings rabbitMqSettings; | 59 | private final TbRabbitMqSettings rabbitMqSettings; |
60 | + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; | ||
55 | 61 | ||
56 | private final TbQueueAdmin coreAdmin; | 62 | private final TbQueueAdmin coreAdmin; |
57 | private final TbQueueAdmin ruleEngineAdmin; | 63 | private final TbQueueAdmin ruleEngineAdmin; |
@@ -62,12 +68,14 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor | @@ -62,12 +68,14 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor | ||
62 | TbQueueRuleEngineSettings ruleEngineSettings, | 68 | TbQueueRuleEngineSettings ruleEngineSettings, |
63 | TbServiceInfoProvider serviceInfoProvider, | 69 | TbServiceInfoProvider serviceInfoProvider, |
64 | TbRabbitMqSettings rabbitMqSettings, | 70 | TbRabbitMqSettings rabbitMqSettings, |
71 | + TbQueueRemoteJsInvokeSettings jsInvokeSettings, | ||
65 | TbRabbitMqQueueArguments queueArguments) { | 72 | TbRabbitMqQueueArguments queueArguments) { |
66 | this.partitionService = partitionService; | 73 | this.partitionService = partitionService; |
67 | this.coreSettings = coreSettings; | 74 | this.coreSettings = coreSettings; |
68 | this.serviceInfoProvider = serviceInfoProvider; | 75 | this.serviceInfoProvider = serviceInfoProvider; |
69 | this.ruleEngineSettings = ruleEngineSettings; | 76 | this.ruleEngineSettings = ruleEngineSettings; |
70 | this.rabbitMqSettings = rabbitMqSettings; | 77 | this.rabbitMqSettings = rabbitMqSettings; |
78 | + this.jsInvokeSettings = jsInvokeSettings; | ||
71 | 79 | ||
72 | this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); | 80 | this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); |
73 | this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); | 81 | this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); |
@@ -114,8 +122,26 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor | @@ -114,8 +122,26 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor | ||
114 | } | 122 | } |
115 | 123 | ||
116 | @Override | 124 | @Override |
125 | + @Bean | ||
117 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { | 126 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { |
118 | - return null; | 127 | + TbQueueProducer<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> producer = new TbRabbitMqProducerTemplate<>(jsExecutorAdmin, rabbitMqSettings, jsInvokeSettings.getRequestTopic()); |
128 | + TbQueueConsumer<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> consumer = new TbRabbitMqConsumerTemplate<>(jsExecutorAdmin, rabbitMqSettings, | ||
129 | + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), | ||
130 | + msg -> { | ||
131 | + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); | ||
132 | + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); | ||
133 | + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); | ||
134 | + }); | ||
135 | + | ||
136 | + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder | ||
137 | + <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder(); | ||
138 | + builder.queueAdmin(jsExecutorAdmin); | ||
139 | + builder.requestTemplate(producer); | ||
140 | + builder.responseTemplate(consumer); | ||
141 | + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); | ||
142 | + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); | ||
143 | + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); | ||
144 | + return builder.build(); | ||
119 | } | 145 | } |
120 | 146 | ||
121 | @PreDestroy | 147 | @PreDestroy |