Showing
8 changed files
with
25 additions
and
70 deletions
@@ -101,22 +101,6 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { | @@ -101,22 +101,6 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { | ||
101 | requestBuilder.clientId("producer-js-invoke-" + nodeIdProvider.getNodeId()); | 101 | requestBuilder.clientId("producer-js-invoke-" + nodeIdProvider.getNodeId()); |
102 | requestBuilder.defaultTopic(requestTopic); | 102 | requestBuilder.defaultTopic(requestTopic); |
103 | requestBuilder.encoder(new RemoteJsRequestEncoder()); | 103 | requestBuilder.encoder(new RemoteJsRequestEncoder()); |
104 | - requestBuilder.enricher((request, responseTopic, requestId) -> { | ||
105 | - JsInvokeProtos.RemoteJsRequest.Builder remoteRequest = JsInvokeProtos.RemoteJsRequest.newBuilder(); | ||
106 | - if (request.hasCompileRequest()) { | ||
107 | - remoteRequest.setCompileRequest(request.getCompileRequest()); | ||
108 | - } | ||
109 | - if (request.hasInvokeRequest()) { | ||
110 | - remoteRequest.setInvokeRequest(request.getInvokeRequest()); | ||
111 | - } | ||
112 | - if (request.hasReleaseRequest()) { | ||
113 | - remoteRequest.setReleaseRequest(request.getReleaseRequest()); | ||
114 | - } | ||
115 | - remoteRequest.setResponseTopic(responseTopic); | ||
116 | - remoteRequest.setRequestIdMSB(requestId.getMostSignificantBits()); | ||
117 | - remoteRequest.setRequestIdLSB(requestId.getLeastSignificantBits()); | ||
118 | - return remoteRequest.build(); | ||
119 | - }); | ||
120 | 104 | ||
121 | TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<JsInvokeProtos.RemoteJsResponse> responseBuilder = TBKafkaConsumerTemplate.builder(); | 105 | TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<JsInvokeProtos.RemoteJsResponse> responseBuilder = TBKafkaConsumerTemplate.builder(); |
122 | responseBuilder.settings(kafkaSettings); | 106 | responseBuilder.settings(kafkaSettings); |
@@ -15,15 +15,23 @@ | @@ -15,15 +15,23 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.server.service.script; | 16 | package org.thingsboard.server.service.script; |
17 | 17 | ||
18 | +import com.google.protobuf.InvalidProtocolBufferException; | ||
19 | +import com.google.protobuf.util.JsonFormat; | ||
18 | import org.thingsboard.server.gen.js.JsInvokeProtos; | 20 | import org.thingsboard.server.gen.js.JsInvokeProtos; |
19 | import org.thingsboard.server.kafka.TbKafkaEncoder; | 21 | import org.thingsboard.server.kafka.TbKafkaEncoder; |
20 | 22 | ||
23 | +import java.nio.charset.StandardCharsets; | ||
24 | + | ||
21 | /** | 25 | /** |
22 | * Created by ashvayka on 25.09.18. | 26 | * Created by ashvayka on 25.09.18. |
23 | */ | 27 | */ |
24 | public class RemoteJsRequestEncoder implements TbKafkaEncoder<JsInvokeProtos.RemoteJsRequest> { | 28 | public class RemoteJsRequestEncoder implements TbKafkaEncoder<JsInvokeProtos.RemoteJsRequest> { |
25 | @Override | 29 | @Override |
26 | public byte[] encode(JsInvokeProtos.RemoteJsRequest value) { | 30 | public byte[] encode(JsInvokeProtos.RemoteJsRequest value) { |
27 | - return value.toByteArray(); | 31 | + try { |
32 | + return JsonFormat.printer().print(value).getBytes(StandardCharsets.UTF_8); | ||
33 | + } catch (InvalidProtocolBufferException e) { | ||
34 | + throw new RuntimeException(e); | ||
35 | + } | ||
28 | } | 36 | } |
29 | } | 37 | } |
@@ -15,10 +15,12 @@ | @@ -15,10 +15,12 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.server.service.script; | 16 | package org.thingsboard.server.service.script; |
17 | 17 | ||
18 | +import com.google.protobuf.util.JsonFormat; | ||
18 | import org.thingsboard.server.gen.js.JsInvokeProtos; | 19 | import org.thingsboard.server.gen.js.JsInvokeProtos; |
19 | import org.thingsboard.server.kafka.TbKafkaDecoder; | 20 | import org.thingsboard.server.kafka.TbKafkaDecoder; |
20 | 21 | ||
21 | import java.io.IOException; | 22 | import java.io.IOException; |
23 | +import java.nio.charset.StandardCharsets; | ||
22 | 24 | ||
23 | /** | 25 | /** |
24 | * Created by ashvayka on 25.09.18. | 26 | * Created by ashvayka on 25.09.18. |
@@ -27,6 +29,8 @@ public class RemoteJsResponseDecoder implements TbKafkaDecoder<JsInvokeProtos.Re | @@ -27,6 +29,8 @@ public class RemoteJsResponseDecoder implements TbKafkaDecoder<JsInvokeProtos.Re | ||
27 | 29 | ||
28 | @Override | 30 | @Override |
29 | public JsInvokeProtos.RemoteJsResponse decode(byte[] data) throws IOException { | 31 | public JsInvokeProtos.RemoteJsResponse decode(byte[] data) throws IOException { |
30 | - return JsInvokeProtos.RemoteJsResponse.parseFrom(data); | 32 | + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); |
33 | + JsonFormat.parser().ignoringUnknownFields().merge(new String(data, StandardCharsets.UTF_8), builder); | ||
34 | + return builder.build(); | ||
31 | } | 35 | } |
32 | } | 36 | } |
@@ -26,12 +26,9 @@ enum JsInvokeErrorCode { | @@ -26,12 +26,9 @@ enum JsInvokeErrorCode { | ||
26 | } | 26 | } |
27 | 27 | ||
28 | message RemoteJsRequest { | 28 | message RemoteJsRequest { |
29 | - string responseTopic = 1; | ||
30 | - int64 requestIdMSB = 2; | ||
31 | - int64 requestIdLSB = 3; | ||
32 | - JsCompileRequest compileRequest = 4; | ||
33 | - JsInvokeRequest invokeRequest = 5; | ||
34 | - JsReleaseRequest releaseRequest = 6; | 29 | + JsCompileRequest compileRequest = 1; |
30 | + JsInvokeRequest invokeRequest = 2; | ||
31 | + JsReleaseRequest releaseRequest = 3; | ||
35 | } | 32 | } |
36 | 33 | ||
37 | message RemoteJsResponse { | 34 | message RemoteJsResponse { |
@@ -49,9 +49,6 @@ public class TBKafkaProducerTemplate<T> { | @@ -49,9 +49,6 @@ public class TBKafkaProducerTemplate<T> { | ||
49 | private final KafkaProducer<String, byte[]> producer; | 49 | private final KafkaProducer<String, byte[]> producer; |
50 | private final TbKafkaEncoder<T> encoder; | 50 | private final TbKafkaEncoder<T> encoder; |
51 | 51 | ||
52 | - @Builder.Default | ||
53 | - private TbKafkaEnricher<T> enricher = ((value, responseTopic, requestId) -> value); | ||
54 | - | ||
55 | private final TbKafkaPartitioner<T> partitioner; | 52 | private final TbKafkaPartitioner<T> partitioner; |
56 | private ConcurrentMap<String, List<PartitionInfo>> partitionInfoMap; | 53 | private ConcurrentMap<String, List<PartitionInfo>> partitionInfoMap; |
57 | @Getter | 54 | @Getter |
@@ -61,7 +58,7 @@ public class TBKafkaProducerTemplate<T> { | @@ -61,7 +58,7 @@ public class TBKafkaProducerTemplate<T> { | ||
61 | private final TbKafkaSettings settings; | 58 | private final TbKafkaSettings settings; |
62 | 59 | ||
63 | @Builder | 60 | @Builder |
64 | - private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaEncoder<T> encoder, TbKafkaEnricher<T> enricher, | 61 | + private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaEncoder<T> encoder, |
65 | TbKafkaPartitioner<T> partitioner, String defaultTopic, String clientId) { | 62 | TbKafkaPartitioner<T> partitioner, String defaultTopic, String clientId) { |
66 | Properties props = settings.toProps(); | 63 | Properties props = settings.toProps(); |
67 | props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); | 64 | props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); |
@@ -72,7 +69,6 @@ public class TBKafkaProducerTemplate<T> { | @@ -72,7 +69,6 @@ public class TBKafkaProducerTemplate<T> { | ||
72 | this.settings = settings; | 69 | this.settings = settings; |
73 | this.producer = new KafkaProducer<>(props); | 70 | this.producer = new KafkaProducer<>(props); |
74 | this.encoder = encoder; | 71 | this.encoder = encoder; |
75 | - this.enricher = enricher; | ||
76 | this.partitioner = partitioner; | 72 | this.partitioner = partitioner; |
77 | this.defaultTopic = defaultTopic; | 73 | this.defaultTopic = defaultTopic; |
78 | } | 74 | } |
@@ -93,14 +89,6 @@ public class TBKafkaProducerTemplate<T> { | @@ -93,14 +89,6 @@ public class TBKafkaProducerTemplate<T> { | ||
93 | } | 89 | } |
94 | } | 90 | } |
95 | 91 | ||
96 | - T enrich(T value, String responseTopic, UUID requestId) { | ||
97 | - if (enricher != null) { | ||
98 | - return enricher.enrich(value, responseTopic, requestId); | ||
99 | - } else { | ||
100 | - return value; | ||
101 | - } | ||
102 | - } | ||
103 | - | ||
104 | public Future<RecordMetadata> send(String key, T value, Callback callback) { | 92 | public Future<RecordMetadata> send(String key, T value, Callback callback) { |
105 | return send(key, value, null, callback); | 93 | return send(key, value, null, callback); |
106 | } | 94 | } |
common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaEnricher.java
deleted
100644 → 0
1 | -/** | ||
2 | - * Copyright © 2016-2019 The Thingsboard Authors | ||
3 | - * | ||
4 | - * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | - * you may not use this file except in compliance with the License. | ||
6 | - * You may obtain a copy of the License at | ||
7 | - * | ||
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | - * | ||
10 | - * Unless required by applicable law or agreed to in writing, software | ||
11 | - * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | - * See the License for the specific language governing permissions and | ||
14 | - * limitations under the License. | ||
15 | - */ | ||
16 | -package org.thingsboard.server.kafka; | ||
17 | - | ||
18 | -import java.util.UUID; | ||
19 | - | ||
20 | -public interface TbKafkaEnricher<T> { | ||
21 | - | ||
22 | - T enrich(T value, String responseTopic, UUID requestId); | ||
23 | - | ||
24 | -} |
@@ -144,11 +144,11 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe | @@ -144,11 +144,11 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe | ||
144 | tickSize = pendingRequests.size(); | 144 | tickSize = pendingRequests.size(); |
145 | if (nextCleanupMs < tickTs) { | 145 | if (nextCleanupMs < tickTs) { |
146 | //cleanup; | 146 | //cleanup; |
147 | - pendingRequests.entrySet().forEach(kv -> { | ||
148 | - if (kv.getValue().expTime < tickTs) { | ||
149 | - ResponseMetaData<Response> staleRequest = pendingRequests.remove(kv.getKey()); | 147 | + pendingRequests.forEach((key, value) -> { |
148 | + if (value.expTime < tickTs) { | ||
149 | + ResponseMetaData<Response> staleRequest = pendingRequests.remove(key); | ||
150 | if (staleRequest != null) { | 150 | if (staleRequest != null) { |
151 | - log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", kv.getKey(), staleRequest.expTime, tickTs); | 151 | + log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", key, staleRequest.expTime, tickTs); |
152 | staleRequest.future.setException(new TimeoutException()); | 152 | staleRequest.future.setException(new TimeoutException()); |
153 | } | 153 | } |
154 | } | 154 | } |
@@ -189,13 +189,12 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe | @@ -189,13 +189,12 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe | ||
189 | SettableFuture<Response> future = SettableFuture.create(); | 189 | SettableFuture<Response> future = SettableFuture.create(); |
190 | ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future); | 190 | ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future); |
191 | pendingRequests.putIfAbsent(requestId, responseMetaData); | 191 | pendingRequests.putIfAbsent(requestId, responseMetaData); |
192 | - request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId); | ||
193 | log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, key, responseMetaData.expTime); | 192 | log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, key, responseMetaData.expTime); |
194 | requestTemplate.send(key, request, headers, (metadata, exception) -> { | 193 | requestTemplate.send(key, request, headers, (metadata, exception) -> { |
195 | if (exception != null) { | 194 | if (exception != null) { |
196 | log.trace("[{}] Failed to post the request", requestId, exception); | 195 | log.trace("[{}] Failed to post the request", requestId, exception); |
197 | } else { | 196 | } else { |
198 | - log.trace("[{}] Posted the request", requestId, metadata); | 197 | + log.trace("[{}] Posted the request: {}", requestId, metadata); |
199 | } | 198 | } |
200 | }); | 199 | }); |
201 | return future; | 200 | return future; |
@@ -32,9 +32,8 @@ import java.util.Properties; | @@ -32,9 +32,8 @@ import java.util.Properties; | ||
32 | @Component | 32 | @Component |
33 | public class TbKafkaSettings { | 33 | public class TbKafkaSettings { |
34 | 34 | ||
35 | - public static final String REQUEST_ID_HEADER = "requestId"; | ||
36 | - public static final String RESPONSE_TOPIC_HEADER = "responseTopic"; | ||
37 | - | 35 | + static final String REQUEST_ID_HEADER = "requestId"; |
36 | + static final String RESPONSE_TOPIC_HEADER = "responseTopic"; | ||
38 | 37 | ||
39 | @Value("${kafka.bootstrap.servers}") | 38 | @Value("${kafka.bootstrap.servers}") |
40 | private String servers; | 39 | private String servers; |