Commit 2ada821d6c3cadb8e0f7fd2f5286bf1955a073ce

Authored by Andrew Shvayka
1 parent 175e7325

Implementation draft

Showing 32 changed files with 1382 additions and 217 deletions
... ... @@ -77,6 +77,10 @@
77 77 <artifactId>dao</artifactId>
78 78 </dependency>
79 79 <dependency>
  80 + <groupId>org.thingsboard.common</groupId>
  81 + <artifactId>queue</artifactId>
  82 + </dependency>
  83 + <dependency>
80 84 <groupId>org.thingsboard</groupId>
81 85 <artifactId>dao</artifactId>
82 86 <type>test-jar</type>
... ...
... ... @@ -26,6 +26,8 @@ public interface DiscoveryService {
26 26
27 27 void unpublishCurrentServer();
28 28
  29 + String getNodeId();
  30 +
29 31 ServerInstance getCurrentServer();
30 32
31 33 List<ServerInstance> getOtherServers();
... ...
... ... @@ -43,6 +43,11 @@ public class DummyDiscoveryService implements DiscoveryService {
43 43 }
44 44
45 45 @Override
  46 + public String getNodeId() {
  47 + return null;
  48 + }
  49 +
  50 + @Override
46 51 public void publishCurrentServer() {
47 52 //Do nothing
48 53 }
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.service.cluster.discovery;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
  19 +import org.apache.commons.lang3.RandomStringUtils;
19 20 import org.apache.commons.lang3.SerializationException;
20 21 import org.apache.commons.lang3.SerializationUtils;
21 22 import org.apache.curator.framework.CuratorFramework;
... ... @@ -93,11 +94,13 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
93 94 private CuratorFramework client;
94 95 private PathChildrenCache cache;
95 96 private String nodePath;
96   -
  97 + //TODO: make persistent?
  98 + private String nodeId;
97 99
98 100 @PostConstruct
99 101 public void init() {
100 102 log.info("Initializing...");
  103 + this.nodeId = RandomStringUtils.randomAlphabetic(10);
101 104 Assert.hasLength(zkUrl, MiscUtils.missingProperty("zk.url"));
102 105 Assert.notNull(zkRetryInterval, MiscUtils.missingProperty("zk.retry_interval_ms"));
103 106 Assert.notNull(zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms"));
... ... @@ -181,6 +184,11 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
181 184 }
182 185
183 186 @Override
  187 + public String getNodeId() {
  188 + return nodeId;
  189 + }
  190 +
  191 + @Override
184 192 public ServerInstance getCurrentServer() {
185 193 return serverInstance.getSelf();
186 194 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.script;
  17 +
  18 +import com.google.common.util.concurrent.Futures;
  19 +import com.google.common.util.concurrent.ListenableFuture;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +
  22 +import java.util.Map;
  23 +import java.util.UUID;
  24 +import java.util.concurrent.ConcurrentHashMap;
  25 +import java.util.concurrent.atomic.AtomicInteger;
  26 +
  27 +/**
  28 + * Created by ashvayka on 26.09.18.
  29 + */
  30 +@Slf4j
  31 +public abstract class AbstractJsInvokeService implements JsInvokeService {
  32 +
  33 + protected Map<UUID, String> scriptIdToNameMap = new ConcurrentHashMap<>();
  34 + protected Map<UUID, AtomicInteger> blackListedFunctions = new ConcurrentHashMap<>();
  35 +
  36 + @Override
  37 + public ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames) {
  38 + UUID scriptId = UUID.randomUUID();
  39 + String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
  40 + String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
  41 + return doEval(scriptId, functionName, jsScript);
  42 + }
  43 +
  44 + @Override
  45 + public ListenableFuture<Object> invokeFunction(UUID scriptId, Object... args) {
  46 + String functionName = scriptIdToNameMap.get(scriptId);
  47 + if (functionName == null) {
  48 + return Futures.immediateFailedFuture(new RuntimeException("No compiled script found for scriptId: [" + scriptId + "]!"));
  49 + }
  50 + if (!isBlackListed(scriptId)) {
  51 + return doInvokeFunction(scriptId, functionName, args);
  52 + } else {
  53 + return Futures.immediateFailedFuture(
  54 + new RuntimeException("Script is blacklisted due to maximum error count " + getMaxErrors() + "!"));
  55 + }
  56 + }
  57 +
  58 + @Override
  59 + public ListenableFuture<Void> release(UUID scriptId) {
  60 + String functionName = scriptIdToNameMap.get(scriptId);
  61 + if (functionName != null) {
  62 + try {
  63 + scriptIdToNameMap.remove(scriptId);
  64 + blackListedFunctions.remove(scriptId);
  65 + doRelease(scriptId, functionName);
  66 + } catch (Exception e) {
  67 + return Futures.immediateFailedFuture(e);
  68 + }
  69 + }
  70 + return Futures.immediateFuture(null);
  71 + }
  72 +
  73 + protected abstract ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String scriptBody);
  74 +
  75 + protected abstract ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args);
  76 +
  77 + protected abstract void doRelease(UUID scriptId, String functionName) throws Exception;
  78 +
  79 + protected abstract int getMaxErrors();
  80 +
  81 + protected void onScriptExecutionError(UUID scriptId) {
  82 + blackListedFunctions.computeIfAbsent(scriptId, key -> new AtomicInteger(0)).incrementAndGet();
  83 + }
  84 +
  85 + private String generateJsScript(JsScriptType scriptType, String functionName, String scriptBody, String... argNames) {
  86 + switch (scriptType) {
  87 + case RULE_NODE_SCRIPT:
  88 + return RuleNodeScriptFactory.generateRuleNodeScript(functionName, scriptBody, argNames);
  89 + default:
  90 + throw new RuntimeException("No script factory implemented for scriptType: " + scriptType);
  91 + }
  92 + }
  93 +
  94 + private boolean isBlackListed(UUID scriptId) {
  95 + if (blackListedFunctions.containsKey(scriptId)) {
  96 + AtomicInteger errorCount = blackListedFunctions.get(scriptId);
  97 + return errorCount.get() >= getMaxErrors();
  98 + } else {
  99 + return false;
  100 + }
  101 + }
  102 +}
... ...
... ... @@ -20,38 +20,24 @@ import com.google.common.util.concurrent.ListenableFuture;
20 20 import delight.nashornsandbox.NashornSandbox;
21 21 import delight.nashornsandbox.NashornSandboxes;
22 22 import jdk.nashorn.api.scripting.NashornScriptEngineFactory;
23   -import lombok.Data;
24   -import lombok.EqualsAndHashCode;
25   -import lombok.Getter;
26   -import lombok.RequiredArgsConstructor;
27 23 import lombok.extern.slf4j.Slf4j;
28   -import org.thingsboard.server.common.data.id.EntityId;
29 24
30 25 import javax.annotation.PostConstruct;
31 26 import javax.annotation.PreDestroy;
32 27 import javax.script.Invocable;
33 28 import javax.script.ScriptEngine;
34 29 import javax.script.ScriptException;
35   -import java.util.Map;
36 30 import java.util.UUID;
37   -import java.util.concurrent.ConcurrentHashMap;
38 31 import java.util.concurrent.ExecutorService;
39 32 import java.util.concurrent.Executors;
40   -import java.util.concurrent.atomic.AtomicInteger;
41 33
42 34 @Slf4j
43   -public abstract class AbstractNashornJsInvokeService implements JsInvokeService {
  35 +public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeService {
44 36
45 37 private NashornSandbox sandbox;
46 38 private ScriptEngine engine;
47 39 private ExecutorService monitorExecutorService;
48 40
49   - private final Map<UUID, String> functionsMap = new ConcurrentHashMap<>();
50   - private final Map<BlackListKey, BlackListInfo> blackListedFunctions = new ConcurrentHashMap<>();
51   -
52   - private final Map<String, ScriptInfo> scriptKeyToInfo = new ConcurrentHashMap<>();
53   - private final Map<UUID, ScriptInfo> scriptIdToInfo = new ConcurrentHashMap<>();
54   -
55 41 @PostConstruct
56 42 public void init() {
57 43 if (useJsSandbox()) {
... ... @@ -80,181 +66,44 @@ public abstract class AbstractNashornJsInvokeService implements JsInvokeService
80 66
81 67 protected abstract long getMaxCpuTime();
82 68
83   - protected abstract int getMaxErrors();
84   -
85 69 @Override
86   - public ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames) {
87   - ScriptInfo scriptInfo = deduplicate(scriptType, scriptBody);
88   - UUID scriptId = scriptInfo.getId();
89   - AtomicInteger duplicateCount = scriptInfo.getCount();
90   -
91   - synchronized (scriptInfo.getLock()) {
92   - if (duplicateCount.compareAndSet(0, 1)) {
93   - try {
94   - evaluate(scriptId, scriptType, scriptBody, argNames);
95   - } catch (Exception e) {
96   - duplicateCount.decrementAndGet();
97   - log.warn("Failed to compile JS script: {}", e.getMessage(), e);
98   - return Futures.immediateFailedFuture(e);
99   - }
  70 + protected ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String jsScript) {
  71 + try {
  72 + if (useJsSandbox()) {
  73 + sandbox.eval(jsScript);
100 74 } else {
101   - duplicateCount.incrementAndGet();
  75 + engine.eval(jsScript);
102 76 }
  77 + scriptIdToNameMap.put(scriptId, functionName);
  78 + } catch (Exception e) {
  79 + log.warn("Failed to compile JS script: {}", e.getMessage(), e);
  80 + return Futures.immediateFailedFuture(e);
103 81 }
104 82 return Futures.immediateFuture(scriptId);
105 83 }
106 84
107   - private void evaluate(UUID scriptId, JsScriptType scriptType, String scriptBody, String... argNames) throws ScriptException {
108   - String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
109   - String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
110   - if (useJsSandbox()) {
111   - sandbox.eval(jsScript);
112   - } else {
113   - engine.eval(jsScript);
114   - }
115   - functionsMap.put(scriptId, functionName);
116   - }
117   -
118 85 @Override
119   - public ListenableFuture<Object> invokeFunction(UUID scriptId, EntityId entityId, Object... args) {
120   - String functionName = functionsMap.get(scriptId);
121   - if (functionName == null) {
122   - String message = "No compiled script found for scriptId: [" + scriptId + "]!";
123   - log.warn(message);
124   - return Futures.immediateFailedFuture(new RuntimeException(message));
125   - }
126   -
127   - BlackListInfo blackListInfo = blackListedFunctions.get(new BlackListKey(scriptId, entityId));
128   - if (blackListInfo != null && blackListInfo.getCount() >= getMaxErrors()) {
129   - RuntimeException throwable = new RuntimeException("Script is blacklisted due to maximum error count " + getMaxErrors() + "!", blackListInfo.getCause());
130   - throwable.printStackTrace();
131   - return Futures.immediateFailedFuture(throwable);
132   - }
133   -
  86 + protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args) {
134 87 try {
135   - return invoke(functionName, args);
  88 + Object result;
  89 + if (useJsSandbox()) {
  90 + result = sandbox.getSandboxedInvocable().invokeFunction(functionName, args);
  91 + } else {
  92 + result = ((Invocable) engine).invokeFunction(functionName, args);
  93 + }
  94 + return Futures.immediateFuture(result);
136 95 } catch (Exception e) {
137   - BlackListKey blackListKey = new BlackListKey(scriptId, entityId);
138   - blackListedFunctions.computeIfAbsent(blackListKey, key -> new BlackListInfo()).incrementWithReason(e);
  96 + onScriptExecutionError(scriptId);
139 97 return Futures.immediateFailedFuture(e);
140 98 }
141 99 }
142 100
143   - private ListenableFuture<Object> invoke(String functionName, Object... args) throws ScriptException, NoSuchMethodException {
144   - Object result;
  101 + protected void doRelease(UUID scriptId, String functionName) throws ScriptException {
145 102 if (useJsSandbox()) {
146   - result = sandbox.getSandboxedInvocable().invokeFunction(functionName, args);
  103 + sandbox.eval(functionName + " = undefined;");
147 104 } else {
148   - result = ((Invocable) engine).invokeFunction(functionName, args);
149   - }
150   - return Futures.immediateFuture(result);
151   - }
152   -
153   - @Override
154   - public ListenableFuture<Void> release(UUID scriptId, EntityId entityId) {
155   - ScriptInfo scriptInfo = scriptIdToInfo.get(scriptId);
156   - if (scriptInfo == null) {
157   - log.warn("Script release called for not existing script id [{}]", scriptId);
158   - return Futures.immediateFuture(null);
159   - }
160   -
161   - synchronized (scriptInfo.getLock()) {
162   - int remainingDuplicates = scriptInfo.getCount().decrementAndGet();
163   - if (remainingDuplicates > 0) {
164   - return Futures.immediateFuture(null);
165   - }
166   -
167   - String functionName = functionsMap.get(scriptId);
168   - if (functionName != null) {
169   - try {
170   - if (useJsSandbox()) {
171   - sandbox.eval(functionName + " = undefined;");
172   - } else {
173   - engine.eval(functionName + " = undefined;");
174   - }
175   - functionsMap.remove(scriptId);
176   - blackListedFunctions.remove(new BlackListKey(scriptId, entityId));
177   - } catch (ScriptException e) {
178   - log.error("Could not release script [{}] [{}]", scriptId, remainingDuplicates);
179   - return Futures.immediateFailedFuture(e);
180   - }
181   - } else {
182   - log.warn("Function name do not exist for script [{}] [{}]", scriptId, remainingDuplicates);
183   - }
  105 + engine.eval(functionName + " = undefined;");
184 106 }
185   - return Futures.immediateFuture(null);
186   - }
187   -
188   -
189   - private String generateJsScript(JsScriptType scriptType, String functionName, String scriptBody, String... argNames) {
190   - switch (scriptType) {
191   - case RULE_NODE_SCRIPT:
192   - return RuleNodeScriptFactory.generateRuleNodeScript(functionName, scriptBody, argNames);
193   - default:
194   - throw new RuntimeException("No script factory implemented for scriptType: " + scriptType);
195   - }
196   - }
197   -
198   - private ScriptInfo deduplicate(JsScriptType scriptType, String scriptBody) {
199   - ScriptInfo meta = ScriptInfo.preInit();
200   - String key = deduplicateKey(scriptType, scriptBody);
201   - ScriptInfo latestMeta = scriptKeyToInfo.computeIfAbsent(key, i -> meta);
202   - return scriptIdToInfo.computeIfAbsent(latestMeta.getId(), i -> latestMeta);
203 107 }
204 108
205   - private String deduplicateKey(JsScriptType scriptType, String scriptBody) {
206   - return scriptType + "_" + scriptBody;
207   - }
208   -
209   - @Getter
210   - private static class ScriptInfo {
211   - private final UUID id;
212   - private final Object lock;
213   - private final AtomicInteger count;
214   -
215   - ScriptInfo(UUID id, Object lock, AtomicInteger count) {
216   - this.id = id;
217   - this.lock = lock;
218   - this.count = count;
219   - }
220   -
221   - static ScriptInfo preInit() {
222   - UUID preId = UUID.randomUUID();
223   - AtomicInteger preCount = new AtomicInteger();
224   - Object preLock = new Object();
225   - return new ScriptInfo(preId, preLock, preCount);
226   - }
227   - }
228   -
229   - @EqualsAndHashCode
230   - @Getter
231   - @RequiredArgsConstructor
232   - private static class BlackListKey {
233   - private final UUID scriptId;
234   - private final EntityId entityId;
235   -
236   - }
237   -
238   - @Data
239   - private static class BlackListInfo {
240   - private final AtomicInteger count;
241   - private Exception ex;
242   -
243   - BlackListInfo() {
244   - this.count = new AtomicInteger(0);
245   - }
246   -
247   - void incrementWithReason(Exception e) {
248   - count.incrementAndGet();
249   - ex = e;
250   - }
251   -
252   - int getCount() {
253   - return count.get();
254   - }
255   -
256   - Exception getCause() {
257   - return ex;
258   - }
259   - }
260 109 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.script;
  17 +
  18 +import java.util.List;
  19 +
  20 +/**
  21 + * Created by ashvayka on 25.09.18.
  22 + */
  23 +public class JsInvokeRequest {
  24 +
  25 + private String scriptId;
  26 + private String scriptBody;
  27 + private List<String> args;
  28 +
  29 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.script;
  17 +
  18 +import java.util.List;
  19 +
  20 +/**
  21 + * Created by ashvayka on 25.09.18.
  22 + */
  23 +public class JsInvokeResponse {
  24 +
  25 + private String scriptId;
  26 + private String scriptBody;
  27 + private List<String> args;
  28 +
  29 +}
... ...
... ... @@ -25,8 +25,8 @@ public interface JsInvokeService {
25 25
26 26 ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames);
27 27
28   - ListenableFuture<Object> invokeFunction(UUID scriptId, EntityId entityId, Object... args);
  28 + ListenableFuture<Object> invokeFunction(UUID scriptId, Object... args);
29 29
30   - ListenableFuture<Void> release(UUID scriptId, EntityId entityId);
  30 + ListenableFuture<Void> release(UUID scriptId);
31 31
32 32 }
... ...
... ... @@ -15,32 +15,172 @@
15 15 */
16 16 package org.thingsboard.server.service.script;
17 17
  18 +import com.google.common.util.concurrent.Futures;
18 19 import com.google.common.util.concurrent.ListenableFuture;
  20 +import lombok.Getter;
19 21 import lombok.extern.slf4j.Slf4j;
  22 +import org.springframework.beans.factory.annotation.Autowired;
  23 +import org.springframework.beans.factory.annotation.Value;
20 24 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
21 25 import org.springframework.stereotype.Service;
22   -import org.thingsboard.server.common.data.id.EntityId;
  26 +import org.thingsboard.server.gen.js.JsInvokeProtos;
  27 +import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
  28 +import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
  29 +import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
  30 +import org.thingsboard.server.kafka.TbKafkaSettings;
  31 +import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
23 32
  33 +import javax.annotation.PostConstruct;
  34 +import javax.annotation.PreDestroy;
  35 +import java.util.Map;
24 36 import java.util.UUID;
  37 +import java.util.concurrent.ConcurrentHashMap;
25 38
26 39 @Slf4j
27   -@ConditionalOnProperty(prefix = "js", value = "evaluator", havingValue = "local", matchIfMissing = true)
  40 +@ConditionalOnProperty(prefix = "js", value = "evaluator", havingValue = "remote", matchIfMissing = true)
28 41 @Service
29   -public class RemoteJsInvokeService implements JsInvokeService {
  42 +public class RemoteJsInvokeService extends AbstractJsInvokeService {
  43 +
  44 + @Autowired
  45 + private DiscoveryService discoveryService;
  46 +
  47 + @Autowired
  48 + private TbKafkaSettings kafkaSettings;
  49 +
  50 + @Value("${js.remote.use_js_sandbox}")
  51 + private boolean useJsSandbox;
  52 +
  53 + @Value("${js.remote.request_topic}")
  54 + private String requestTopic;
  55 +
  56 + @Value("${js.remote.response_topic_prefix}")
  57 + private String responseTopicPrefix;
  58 +
  59 + @Value("${js.remote.max_pending_requests}")
  60 + private long maxPendingRequests;
  61 +
  62 + @Value("${js.remote.max_requests_timeout}")
  63 + private long maxRequestsTimeout;
  64 +
  65 + @Value("${js.remote.response_poll_duration}")
  66 + private long responsePollDuration;
  67 +
  68 + @Getter
  69 + @Value("${js.remote.max_errors}")
  70 + private int maxErrors;
  71 +
  72 + private TbKafkaRequestTemplate<JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> kafkaTemplate;
  73 + protected Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>();
  74 +
  75 + @PostConstruct
  76 + public void init() {
  77 + TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<JsInvokeProtos.RemoteJsRequest> requestBuilder = TBKafkaProducerTemplate.builder();
  78 + requestBuilder.settings(kafkaSettings);
  79 + requestBuilder.defaultTopic(requestTopic);
  80 + requestBuilder.encoder(new RemoteJsRequestEncoder());
  81 +
  82 + TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<JsInvokeProtos.RemoteJsResponse> responseBuilder = TBKafkaConsumerTemplate.builder();
  83 + responseBuilder.settings(kafkaSettings);
  84 + responseBuilder.topic(responseTopicPrefix + "." + discoveryService.getNodeId());
  85 + responseBuilder.clientId(discoveryService.getNodeId());
  86 + responseBuilder.groupId("rule-engine-node");
  87 + responseBuilder.autoCommit(true);
  88 + responseBuilder.autoCommitIntervalMs(100);
  89 + responseBuilder.decoder(new RemoteJsResponseDecoder());
  90 +
  91 + TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder
  92 + <JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> builder = TbKafkaRequestTemplate.builder();
  93 + builder.requestTemplate(requestBuilder.build());
  94 + builder.responseTemplate(responseBuilder.build());
  95 + builder.maxPendingRequests(maxPendingRequests);
  96 + builder.maxRequestTimeout(maxRequestsTimeout);
  97 + builder.pollInterval(responsePollDuration);
  98 + kafkaTemplate = builder.build();
  99 + }
  100 +
  101 + @PreDestroy
  102 + public void destroy(){
  103 + if(kafkaTemplate != null){
  104 + kafkaTemplate.stop();
  105 + }
  106 + }
30 107
31 108 @Override
32   - public ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames) {
33   - return null;
  109 + protected ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String scriptBody) {
  110 + JsInvokeProtos.JsCompileRequest jsRequest = JsInvokeProtos.JsCompileRequest.newBuilder()
  111 + .setScriptIdMSB(scriptId.getMostSignificantBits())
  112 + .setScriptIdLSB(scriptId.getLeastSignificantBits())
  113 + .setFunctionName(functionName)
  114 + .setScriptBody(scriptBody).build();
  115 +
  116 + JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
  117 + .setCompileRequest(jsRequest)
  118 + .build();
  119 +
  120 + ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
  121 + return Futures.transform(future, response -> {
  122 + JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse();
  123 + UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB());
  124 + if (compilationResult.getSuccess()) {
  125 + scriptIdToNameMap.put(scriptId, functionName);
  126 + scriptIdToBodysMap.put(scriptId, scriptBody);
  127 + return compiledScriptId;
  128 + } else {
  129 + log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails());
  130 + throw new RuntimeException(compilationResult.getErrorCode().name());
  131 + }
  132 + });
34 133 }
35 134
36 135 @Override
37   - public ListenableFuture<Object> invokeFunction(UUID scriptId, EntityId entityId, Object... args) {
38   - return null;
  136 + protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args) {
  137 + String scriptBody = scriptIdToBodysMap.get(scriptId);
  138 + if (scriptBody == null) {
  139 + return Futures.immediateFailedFuture(new RuntimeException("No script body found for scriptId: [" + scriptId + "]!"));
  140 + }
  141 + JsInvokeProtos.JsInvokeRequest jsRequest = JsInvokeProtos.JsInvokeRequest.newBuilder()
  142 + .setScriptIdMSB(scriptId.getMostSignificantBits())
  143 + .setScriptIdLSB(scriptId.getLeastSignificantBits())
  144 + .setFunctionName(functionName)
  145 + .setScriptBody(scriptIdToBodysMap.get(scriptId)).build();
  146 +
  147 + JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
  148 + .setInvokeRequest(jsRequest)
  149 + .build();
  150 +
  151 + ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
  152 + return Futures.transform(future, response -> {
  153 + JsInvokeProtos.JsInvokeResponse invokeResult = response.getInvokeResponse();
  154 + if (invokeResult.getSuccess()) {
  155 + return invokeResult.getResult();
  156 + } else {
  157 + log.debug("[{}] Failed to compile script due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails());
  158 + throw new RuntimeException(invokeResult.getErrorCode().name());
  159 + }
  160 + });
39 161 }
40 162
41 163 @Override
42   - public ListenableFuture<Void> release(UUID scriptId, EntityId entityId) {
43   - return null;
  164 + protected void doRelease(UUID scriptId, String functionName) throws Exception {
  165 + JsInvokeProtos.JsReleaseRequest jsRequest = JsInvokeProtos.JsReleaseRequest.newBuilder()
  166 + .setScriptIdMSB(scriptId.getMostSignificantBits())
  167 + .setScriptIdLSB(scriptId.getLeastSignificantBits())
  168 + .setFunctionName(functionName).build();
  169 +
  170 + JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
  171 + .setReleaseRequest(jsRequest)
  172 + .build();
  173 +
  174 + ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
  175 + JsInvokeProtos.RemoteJsResponse response = future.get();
  176 +
  177 + JsInvokeProtos.JsReleaseResponse compilationResult = response.getReleaseResponse();
  178 + UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB());
  179 + if (compilationResult.getSuccess()) {
  180 + scriptIdToBodysMap.remove(scriptId);
  181 + } else {
  182 + log.debug("[{}] Failed to release script due", compiledScriptId);
  183 + }
44 184 }
45 185
46 186 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.script;
  17 +
  18 +import org.thingsboard.server.gen.js.JsInvokeProtos;
  19 +import org.thingsboard.server.kafka.TbKafkaEncoder;
  20 +
  21 +/**
  22 + * Created by ashvayka on 25.09.18.
  23 + */
  24 +public class RemoteJsRequestEncoder implements TbKafkaEncoder<JsInvokeProtos.RemoteJsRequest> {
  25 + @Override
  26 + public byte[] encode(JsInvokeProtos.RemoteJsRequest value) {
  27 + return value.toByteArray();
  28 + }
  29 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.script;
  17 +
  18 +import org.thingsboard.server.gen.js.JsInvokeProtos;
  19 +import org.thingsboard.server.kafka.TbKafkaDecoder;
  20 +
  21 +import java.io.IOException;
  22 +
  23 +/**
  24 + * Created by ashvayka on 25.09.18.
  25 + */
  26 +public class RemoteJsResponseDecoder implements TbKafkaDecoder<JsInvokeProtos.RemoteJsResponse> {
  27 +
  28 + @Override
  29 + public JsInvokeProtos.RemoteJsResponse decode(byte[] data) throws IOException {
  30 + return JsInvokeProtos.RemoteJsResponse.parseFrom(data);
  31 + }
  32 +}
... ...
... ... @@ -165,7 +165,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
165 165 private JsonNode executeScript(TbMsg msg) throws ScriptException {
166 166 try {
167 167 String[] inArgs = prepareArgs(msg);
168   - String eval = sandboxService.invokeFunction(this.scriptId, this.entityId, inArgs[0], inArgs[1], inArgs[2]).get().toString();
  168 + String eval = sandboxService.invokeFunction(this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString();
169 169 return mapper.readTree(eval);
170 170 } catch (ExecutionException e) {
171 171 if (e.getCause() instanceof ScriptException) {
... ... @@ -179,6 +179,6 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
179 179 }
180 180
181 181 public void destroy() {
182   - sandboxService.release(this.scriptId, this.entityId);
  182 + sandboxService.release(this.scriptId);
183 183 }
184 184 }
... ...
... ... @@ -14,24 +14,68 @@
14 14 * limitations under the License.
15 15 */
16 16 syntax = "proto3";
17   -package cluster;
  17 +package js;
18 18
19 19 option java_package = "org.thingsboard.server.gen.js";
20 20 option java_outer_classname = "JsInvokeProtos";
21 21
22   -service JsInvokeRpcService {
23   - rpc handleMsgs(stream JsInvokeRequest) returns (stream JsInvokeResponse) {}
  22 +enum JsInvokeErrorCode{
  23 + COMPILATION_ERROR = 0;
  24 + RUNTIME_ERROR = 1;
  25 + CPU_USAGE_ERROR = 2;
  26 +}
  27 +
  28 +message RemoteJsRequest {
  29 + JsCompileRequest compileRequest = 1;
  30 + JsInvokeRequest invokeRequest = 2;
  31 + JsReleaseRequest releaseRequest = 3;
  32 +}
  33 +
  34 +message RemoteJsResponse {
  35 + JsCompileResponse compileResponse = 1;
  36 + JsInvokeResponse invokeResponse = 2;
  37 + JsReleaseResponse releaseResponse = 3;
  38 +}
  39 +
  40 +message JsCompileRequest {
  41 + int64 scriptIdMSB = 1;
  42 + int64 scriptIdLSB = 2;
  43 + string functionName = 3;
  44 + string scriptBody = 4;
  45 +}
  46 +
  47 +message JsReleaseRequest {
  48 + int64 scriptIdMSB = 1;
  49 + int64 scriptIdLSB = 2;
  50 + string functionName = 3;
  51 +}
  52 +
  53 +message JsReleaseResponse {
  54 + bool success = 1;
  55 + int64 scriptIdMSB = 2;
  56 + int64 scriptIdLSB = 3;
  57 +}
  58 +
  59 +message JsCompileResponse {
  60 + bool success = 1;
  61 + int64 scriptIdMSB = 2;
  62 + int64 scriptIdLSB = 3;
  63 + JsInvokeErrorCode errorCode = 4;
  64 + string errorDetails = 5;
24 65 }
25 66
26 67 message JsInvokeRequest {
27   - string scriptId = 1;
28   - string scriptBody = 2;
29   - repeated string args = 3;
  68 + int64 scriptIdMSB = 1;
  69 + int64 scriptIdLSB = 2;
  70 + string functionName = 3;
  71 + string scriptBody = 4;
  72 + repeated string args = 5;
30 73 }
31 74
32 75 message JsInvokeResponse {
33   - string result = 1;
34   - string errorName = 2;
35   - string errorDetails = 3;
  76 + bool success = 1;
  77 + string result = 2;
  78 + JsInvokeErrorCode errorCode = 3;
  79 + string errorDetails = 4;
36 80 }
37 81
... ...
... ... @@ -405,8 +405,17 @@ state:
405 405 defaultInactivityTimeoutInSec: 10
406 406 defaultStateCheckIntervalInSec: 10
407 407
  408 +kafka:
  409 + enabled: true
  410 + bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
  411 + acks: "${TB_KAFKA_ACKS:all}"
  412 + retries: "${TB_KAFKA_RETRIES:1}"
  413 + batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
  414 + linger.ms: "${TB_KAFKA_LINGER_MS:1}"
  415 + buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
  416 +
408 417 js:
409   - evaluator: "${JS_EVALUATOR:external}" # local/external
  418 + evaluator: "${JS_EVALUATOR:local}" # local/external
410 419 # Built-in JVM JavaScript environment properties
411 420 local:
412 421 # Use Sandboxed (secured) JVM JavaScript environment
... ... @@ -421,3 +430,15 @@ js:
421 430 remote:
422 431 # Use Sandboxed (secured) JVM JavaScript environment
423 432 use_js_sandbox: "${USE_REMOTE_JS_SANDBOX:true}"
  433 + # JS Eval request topic
  434 + request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}"
  435 + # JS Eval responses topic prefix that is combined with node id
  436 + response_topic_prefix: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.responses}"
  437 + # JS Eval max pending requests
  438 + max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}"
  439 + # JS Eval max request timeout
  440 + max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:20000}"
  441 + # JS response poll interval
  442 + response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}"
  443 + # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted
  444 + max_errors: "${REMOTE_JS_SANDBOX_MAX_ERRORS:3}"
... ...
... ... @@ -236,8 +236,8 @@ public class RuleNodeJsScriptEngineTest {
236 236 startLatch.await();
237 237 UUID scriptId = jsSandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, code).get();
238 238 scriptIds.put(scriptId, new Object());
239   - jsSandboxService.invokeFunction(scriptId, ruleNodeId, "{}", "{}", "TEXT").get();
240   - jsSandboxService.release(scriptId, ruleNodeId).get();
  239 + jsSandboxService.invokeFunction(scriptId, "{}", "{}", "TEXT").get();
  240 + jsSandboxService.release(scriptId).get();
241 241 }
242 242 } catch (Throwable th) {
243 243 failedCount.incrementAndGet();
... ...
... ... @@ -37,7 +37,8 @@
37 37 <modules>
38 38 <module>data</module>
39 39 <module>message</module>
40   - <module>transport</module>
  40 + <module>transport</module>
  41 + <module>queue</module>
41 42 </modules>
42 43
43 44 </project>
... ...
  1 +<!--
  2 +
  3 + Copyright © 2016-2018 The Thingsboard Authors
  4 +
  5 + Licensed under the Apache License, Version 2.0 (the "License");
  6 + you may not use this file except in compliance with the License.
  7 + You may obtain a copy of the License at
  8 +
  9 + http://www.apache.org/licenses/LICENSE-2.0
  10 +
  11 + Unless required by applicable law or agreed to in writing, software
  12 + distributed under the License is distributed on an "AS IS" BASIS,
  13 + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + See the License for the specific language governing permissions and
  15 + limitations under the License.
  16 +
  17 +-->
  18 +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  19 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  20 + <modelVersion>4.0.0</modelVersion>
  21 + <parent>
  22 + <groupId>org.thingsboard</groupId>
  23 + <version>2.1.1-SNAPSHOT</version>
  24 + <artifactId>common</artifactId>
  25 + </parent>
  26 + <groupId>org.thingsboard.common</groupId>
  27 + <artifactId>queue</artifactId>
  28 + <packaging>jar</packaging>
  29 +
  30 + <name>Thingsboard Server Queue components</name>
  31 + <url>https://thingsboard.io</url>
  32 +
  33 + <properties>
  34 + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  35 + <main.dir>${basedir}/../..</main.dir>
  36 + </properties>
  37 +
  38 + <dependencies>
  39 + <dependency>
  40 + <groupId>org.thingsboard.common</groupId>
  41 + <artifactId>data</artifactId>
  42 + </dependency>
  43 + <dependency>
  44 + <groupId>org.thingsboard.common</groupId>
  45 + <artifactId>message</artifactId>
  46 + </dependency>
  47 + <dependency>
  48 + <groupId>org.apache.kafka</groupId>
  49 + <artifactId>kafka-clients</artifactId>
  50 + </dependency>
  51 + <dependency>
  52 + <groupId>org.springframework</groupId>
  53 + <artifactId>spring-context-support</artifactId>
  54 + </dependency>
  55 + <dependency>
  56 + <groupId>org.springframework.boot</groupId>
  57 + <artifactId>spring-boot-autoconfigure</artifactId>
  58 + </dependency>
  59 + <dependency>
  60 + <groupId>com.google.guava</groupId>
  61 + <artifactId>guava</artifactId>
  62 + </dependency>
  63 + <dependency>
  64 + <groupId>com.google.code.gson</groupId>
  65 + <artifactId>gson</artifactId>
  66 + </dependency>
  67 + <dependency>
  68 + <groupId>org.slf4j</groupId>
  69 + <artifactId>slf4j-api</artifactId>
  70 + </dependency>
  71 + <dependency>
  72 + <groupId>org.slf4j</groupId>
  73 + <artifactId>log4j-over-slf4j</artifactId>
  74 + </dependency>
  75 + <dependency>
  76 + <groupId>ch.qos.logback</groupId>
  77 + <artifactId>logback-core</artifactId>
  78 + </dependency>
  79 + <dependency>
  80 + <groupId>ch.qos.logback</groupId>
  81 + <artifactId>logback-classic</artifactId>
  82 + </dependency>
  83 + <dependency>
  84 + <groupId>junit</groupId>
  85 + <artifactId>junit</artifactId>
  86 + <scope>test</scope>
  87 + </dependency>
  88 + <dependency>
  89 + <groupId>org.mockito</groupId>
  90 + <artifactId>mockito-all</artifactId>
  91 + <scope>test</scope>
  92 + </dependency>
  93 + </dependencies>
  94 +
  95 +</project>
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.kafka;
  17 +
  18 +import org.apache.kafka.clients.admin.AdminClient;
  19 +import org.apache.kafka.clients.admin.CreateTopicsResult;
  20 +import org.apache.kafka.clients.admin.NewTopic;
  21 +import org.apache.kafka.clients.consumer.ConsumerRecords;
  22 +import org.apache.kafka.clients.consumer.KafkaConsumer;
  23 +
  24 +import java.time.Duration;
  25 +import java.util.Collections;
  26 +import java.util.Properties;
  27 +
  28 +/**
  29 + * Created by ashvayka on 24.09.18.
  30 + */
  31 +public class TBKafkaAdmin {
  32 +
  33 + AdminClient client;
  34 +
  35 + public TBKafkaAdmin() {
  36 + Properties props = new Properties();
  37 + props.put("bootstrap.servers", "localhost:9092");
  38 + props.put("group.id", "test");
  39 + props.put("enable.auto.commit", "true");
  40 + props.put("auto.commit.interval.ms", "1000");
  41 + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  42 + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  43 + client = AdminClient.create(props);
  44 + }
  45 +
  46 + public CreateTopicsResult createTopic(NewTopic topic){
  47 + return client.createTopics(Collections.singletonList(topic));
  48 + }
  49 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.kafka;
  17 +
  18 +import lombok.Builder;
  19 +import lombok.Getter;
  20 +import org.apache.kafka.clients.consumer.ConsumerConfig;
  21 +import org.apache.kafka.clients.consumer.ConsumerRecord;
  22 +import org.apache.kafka.clients.consumer.ConsumerRecords;
  23 +import org.apache.kafka.clients.consumer.KafkaConsumer;
  24 +
  25 +import java.io.IOException;
  26 +import java.time.Duration;
  27 +import java.util.Collections;
  28 +import java.util.Properties;
  29 +
  30 +/**
  31 + * Created by ashvayka on 24.09.18.
  32 + */
  33 +public class TBKafkaConsumerTemplate<T> {
  34 +
  35 + private final KafkaConsumer<String, byte[]> consumer;
  36 + private final TbKafkaDecoder<T> decoder;
  37 + @Getter
  38 + private final String topic;
  39 +
  40 + @Builder
  41 + private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
  42 + String clientId, String groupId, String topic,
  43 + boolean autoCommit, long autoCommitIntervalMs) {
  44 + Properties props = settings.toProps();
  45 + props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
  46 + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  47 + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
  48 + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
  49 + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  50 + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  51 + this.consumer = new KafkaConsumer<>(props);
  52 + this.decoder = decoder;
  53 + this.topic = topic;
  54 + }
  55 +
  56 + public void subscribe() {
  57 + consumer.subscribe(Collections.singletonList(topic));
  58 + }
  59 +
  60 + public void unsubscribe() {
  61 + consumer.unsubscribe();
  62 + }
  63 +
  64 + public ConsumerRecords<String, byte[]> poll(Duration duration) {
  65 + return consumer.poll(duration);
  66 + }
  67 +
  68 + public T decode(ConsumerRecord<String, byte[]> record) throws IOException {
  69 + return decoder.decode(record.value());
  70 + }
  71 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.kafka;
  17 +
  18 +import lombok.Builder;
  19 +import lombok.Getter;
  20 +import org.apache.kafka.clients.producer.KafkaProducer;
  21 +import org.apache.kafka.clients.producer.ProducerConfig;
  22 +import org.apache.kafka.clients.producer.ProducerRecord;
  23 +import org.apache.kafka.clients.producer.RecordMetadata;
  24 +import org.apache.kafka.common.PartitionInfo;
  25 +import org.apache.kafka.common.header.Header;
  26 +
  27 +import java.util.List;
  28 +import java.util.Properties;
  29 +import java.util.concurrent.Future;
  30 +
  31 +/**
  32 + * Created by ashvayka on 24.09.18.
  33 + */
  34 +public class TBKafkaProducerTemplate<T> {
  35 +
  36 + private final KafkaProducer<String, byte[]> producer;
  37 + private final TbKafkaEncoder<T> encoder;
  38 + private final TbKafkaPartitioner<T> partitioner;
  39 + private final List<PartitionInfo> partitionInfoList;
  40 + @Getter
  41 + private final String defaultTopic;
  42 +
  43 + @Builder
  44 + private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaEncoder<T> encoder, TbKafkaPartitioner<T> partitioner, String defaultTopic) {
  45 + Properties props = settings.toProps();
  46 + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  47 + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
  48 + this.producer = new KafkaProducer<>(props);
  49 + //Maybe this should not be cached, but we don't plan to change size of partitions
  50 + this.partitionInfoList = producer.partitionsFor(defaultTopic);
  51 + this.encoder = encoder;
  52 + this.partitioner = partitioner;
  53 + this.defaultTopic = defaultTopic;
  54 + }
  55 +
  56 + public Future<RecordMetadata> send(String key, T value) {
  57 + return send(key, value, null, null);
  58 + }
  59 +
  60 + public Future<RecordMetadata> send(String key, T value, Iterable<Header> headers) {
  61 + return send(key, value, null, headers);
  62 + }
  63 +
  64 + public Future<RecordMetadata> send(String key, T value, Long timestamp, Iterable<Header> headers) {
  65 + return send(this.defaultTopic, key, value, timestamp, headers);
  66 + }
  67 +
  68 + public Future<RecordMetadata> send(String topic, String key, T value, Long timestamp, Iterable<Header> headers) {
  69 + byte[] data = encoder.encode(value);
  70 + ProducerRecord<String, byte[]> record;
  71 + Integer partition = getPartition(topic, key, value, data);
  72 + record = new ProducerRecord<>(this.defaultTopic, partition, timestamp, key, data, headers);
  73 + return producer.send(record);
  74 + }
  75 +
  76 + private Integer getPartition(String topic, String key, T value, byte[] data) {
  77 + if (partitioner == null) {
  78 + return null;
  79 + } else {
  80 + return partitioner.partition(this.defaultTopic, key, value, data, partitionInfoList);
  81 + }
  82 + }
  83 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.kafka;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.apache.kafka.clients.consumer.ConsumerRecords;
  20 +import org.apache.kafka.clients.producer.ProducerRecord;
  21 +import org.apache.kafka.common.header.Header;
  22 +
  23 +import java.nio.charset.StandardCharsets;
  24 +import java.util.concurrent.ExecutorService;
  25 +import java.util.concurrent.Executors;
  26 +import java.util.concurrent.atomic.LongAdder;
  27 +
  28 +/**
  29 + * Created by ashvayka on 24.09.18.
  30 + */
  31 +@Slf4j
  32 +public class TbJsEvaluator {
  33 +
  34 +// public static void main(String[] args) {
  35 +// ExecutorService executorService = Executors.newCachedThreadPool();
  36 +//
  37 +// TBKafkaConsumerTemplate requestConsumer = new TBKafkaConsumerTemplate();
  38 +// requestConsumer.subscribe("requests");
  39 +//
  40 +// LongAdder responseCounter = new LongAdder();
  41 +// TBKafkaProducerTemplate responseProducer = new TBKafkaProducerTemplate();
  42 +// executorService.submit((Runnable) () -> {
  43 +// while (true) {
  44 +// ConsumerRecords<String, String> requests = requestConsumer.poll(100);
  45 +// requests.forEach(request -> {
  46 +// Header header = request.headers().lastHeader("responseTopic");
  47 +// ProducerRecord<String, String> response = new ProducerRecord<>(new String(header.value(), StandardCharsets.UTF_8),
  48 +// request.key(), request.value());
  49 +// responseProducer.send(response);
  50 +// responseCounter.add(1);
  51 +// });
  52 +// }
  53 +// });
  54 +//
  55 +// executorService.submit((Runnable) () -> {
  56 +// while (true) {
  57 +// log.warn("Requests: [{}], Responses: [{}]", responseCounter.longValue(), responseCounter.longValue());
  58 +// try {
  59 +// Thread.sleep(1000L);
  60 +// } catch (InterruptedException e) {
  61 +// e.printStackTrace();
  62 +// }
  63 +// }
  64 +// });
  65 +//
  66 +// }
  67 +
  68 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.kafka;
  17 +
  18 +import java.io.IOException;
  19 +
  20 +/**
  21 + * Created by ashvayka on 25.09.18.
  22 + */
  23 +public interface TbKafkaDecoder<T> {
  24 +
  25 + T decode(byte[] data) throws IOException;
  26 +
  27 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.kafka;
  17 +
  18 +/**
  19 + * Created by ashvayka on 25.09.18.
  20 + */
  21 +public interface TbKafkaEncoder<T> {
  22 +
  23 + byte[] encode(T value);
  24 +
  25 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.kafka;
  17 +
  18 +import org.apache.kafka.clients.producer.Partitioner;
  19 +import org.apache.kafka.common.PartitionInfo;
  20 +
  21 +import java.util.List;
  22 +
  23 +/**
  24 + * Created by ashvayka on 25.09.18.
  25 + */
  26 +public interface TbKafkaPartitioner<T> extends Partitioner {
  27 +
  28 + int partition(String topic, String key, T value, byte[] encodedValue, List<PartitionInfo> partitions);
  29 +
  30 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.kafka;
  17 +
  18 +import lombok.Data;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.springframework.beans.factory.annotation.Value;
  21 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  22 +import org.springframework.stereotype.Component;
  23 +
  24 +/**
  25 + * Created by ashvayka on 25.09.18.
  26 + */
  27 +@Data
  28 +public class TbKafkaProperty {
  29 +
  30 + private String key;
  31 + private String value;
  32 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + * <p>
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.kafka;
  17 +
  18 +import com.google.common.util.concurrent.Futures;
  19 +import com.google.common.util.concurrent.ListenableFuture;
  20 +import com.google.common.util.concurrent.SettableFuture;
  21 +import lombok.Builder;
  22 +import lombok.extern.slf4j.Slf4j;
  23 +import org.apache.kafka.clients.admin.CreateTopicsResult;
  24 +import org.apache.kafka.clients.admin.NewTopic;
  25 +import org.apache.kafka.clients.consumer.ConsumerRecords;
  26 +import org.apache.kafka.common.header.Header;
  27 +import org.apache.kafka.common.header.internals.RecordHeader;
  28 +
  29 +import java.io.IOException;
  30 +import java.nio.ByteBuffer;
  31 +import java.nio.charset.StandardCharsets;
  32 +import java.time.Duration;
  33 +import java.util.ArrayList;
  34 +import java.util.List;
  35 +import java.util.UUID;
  36 +import java.util.concurrent.ConcurrentHashMap;
  37 +import java.util.concurrent.ConcurrentMap;
  38 +import java.util.concurrent.ExecutorService;
  39 +import java.util.concurrent.Executors;
  40 +import java.util.concurrent.TimeoutException;
  41 +
  42 +/**
  43 + * Created by ashvayka on 25.09.18.
  44 + */
  45 +@Slf4j
  46 +public class TbKafkaRequestTemplate<Request, Response> {
  47 +
  48 + private final TBKafkaProducerTemplate<Request> requestTemplate;
  49 + private final TBKafkaConsumerTemplate<Response> responseTemplate;
  50 + private final ConcurrentMap<UUID, ResponseMetaData<Response>> pendingRequests;
  51 + private final ExecutorService executor;
  52 + private final long maxRequestTimeout;
  53 + private final long maxPendingRequests;
  54 + private final long pollInterval;
  55 + private volatile long tickTs = 0L;
  56 + private volatile long tickSize = 0L;
  57 + private volatile boolean stopped = false;
  58 +
  59 + @Builder
  60 + public TbKafkaRequestTemplate(TBKafkaProducerTemplate<Request> requestTemplate, TBKafkaConsumerTemplate<Response> responseTemplate,
  61 + long maxRequestTimeout,
  62 + long maxPendingRequests,
  63 + long pollInterval,
  64 + ExecutorService executor) {
  65 + this.requestTemplate = requestTemplate;
  66 + this.responseTemplate = responseTemplate;
  67 + this.pendingRequests = new ConcurrentHashMap<>();
  68 + this.maxRequestTimeout = maxRequestTimeout;
  69 + this.maxPendingRequests = maxPendingRequests;
  70 + this.pollInterval = pollInterval;
  71 + if (executor != null) {
  72 + this.executor = executor;
  73 + } else {
  74 + this.executor = Executors.newSingleThreadExecutor();
  75 + }
  76 + }
  77 +
  78 + public void init() {
  79 + try {
  80 + TBKafkaAdmin admin = new TBKafkaAdmin();
  81 + CreateTopicsResult result = admin.createTopic(new NewTopic(responseTemplate.getTopic(), 1, (short) 1));
  82 + result.all().get();
  83 + } catch (Exception e) {
  84 + log.trace("Failed to create topic: {}", e.getMessage(), e);
  85 + }
  86 + tickTs = System.currentTimeMillis();
  87 + responseTemplate.subscribe();
  88 + executor.submit(() -> {
  89 + long nextCleanupMs = 0L;
  90 + while (!stopped) {
  91 + ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval));
  92 + responses.forEach(response -> {
  93 + Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
  94 + if (requestIdHeader == null) {
  95 + log.error("[{}] Missing requestIdHeader", response);
  96 + }
  97 + UUID requestId = bytesToUuid(requestIdHeader.value());
  98 + ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
  99 + if (expectedResponse == null) {
  100 + log.trace("[{}] Invalid or stale request", requestId);
  101 + } else {
  102 + try {
  103 + expectedResponse.future.set(responseTemplate.decode(response));
  104 + } catch (IOException e) {
  105 + expectedResponse.future.setException(e);
  106 + }
  107 + }
  108 + });
  109 + tickTs = System.currentTimeMillis();
  110 + tickSize = pendingRequests.size();
  111 + if (nextCleanupMs < tickTs) {
  112 + //cleanup;
  113 + pendingRequests.entrySet().forEach(kv -> {
  114 + if (kv.getValue().expTime < tickTs) {
  115 + ResponseMetaData<Response> staleRequest = pendingRequests.remove(kv.getKey());
  116 + if (staleRequest != null) {
  117 + staleRequest.future.setException(new TimeoutException());
  118 + }
  119 + }
  120 + });
  121 + nextCleanupMs = tickTs + maxRequestTimeout;
  122 + }
  123 + }
  124 + });
  125 + }
  126 +
  127 + public void stop() {
  128 + stopped = true;
  129 + }
  130 +
  131 + public ListenableFuture<Response> post(String key, Request request) {
  132 + if (tickSize > maxPendingRequests) {
  133 + return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
  134 + }
  135 + UUID requestId = UUID.randomUUID();
  136 + List<Header> headers = new ArrayList<>(2);
  137 + headers.add(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId)));
  138 + headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic())));
  139 + SettableFuture<Response> future = SettableFuture.create();
  140 + pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future));
  141 + requestTemplate.send(key, request, headers);
  142 + return future;
  143 + }
  144 +
  145 + private byte[] uuidToBytes(UUID uuid) {
  146 + ByteBuffer buf = ByteBuffer.allocate(16);
  147 + buf.putLong(uuid.getMostSignificantBits());
  148 + buf.putLong(uuid.getLeastSignificantBits());
  149 + return buf.array();
  150 + }
  151 +
  152 + private static UUID bytesToUuid(byte[] bytes) {
  153 + ByteBuffer bb = ByteBuffer.wrap(bytes);
  154 + long firstLong = bb.getLong();
  155 + long secondLong = bb.getLong();
  156 + return new UUID(firstLong, secondLong);
  157 + }
  158 +
  159 + private byte[] stringToBytes(String string) {
  160 + return string.getBytes(StandardCharsets.UTF_8);
  161 + }
  162 +
  163 + private static class ResponseMetaData<T> {
  164 + private final long expTime;
  165 + private final SettableFuture<T> future;
  166 +
  167 + ResponseMetaData(long ts, SettableFuture<T> future) {
  168 + this.expTime = ts;
  169 + this.future = future;
  170 + }
  171 + }
  172 +
  173 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.kafka;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.apache.kafka.clients.producer.ProducerConfig;
  20 +import org.springframework.beans.factory.annotation.Value;
  21 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  22 +import org.springframework.stereotype.Component;
  23 +
  24 +import java.util.List;
  25 +import java.util.Properties;
  26 +
  27 +/**
  28 + * Created by ashvayka on 25.09.18.
  29 + */
  30 +@Slf4j
  31 +@ConditionalOnProperty(prefix = "kafka", value = "enabled", havingValue = "true", matchIfMissing = false)
  32 +@Component
  33 +public class TbKafkaSettings {
  34 +
  35 + public static final String REQUEST_ID_HEADER = "requestId";
  36 + public static final String RESPONSE_TOPIC_HEADER = "responseTopic";
  37 +
  38 +
  39 + @Value("${kafka.bootstrap.server}")
  40 + private String servers;
  41 +
  42 + @Value("${kafka.acks}")
  43 + private String acks;
  44 +
  45 + @Value("${kafka.retries}")
  46 + private int retries;
  47 +
  48 + @Value("${kafka.batch.size}")
  49 + private long batchSize;
  50 +
  51 + @Value("${kafka.linger.ms}")
  52 + private long lingerMs;
  53 +
  54 + @Value("${kafka.buffer.memory}")
  55 + private long bufferMemory;
  56 +
  57 + @Value("${kafka.other:null}")
  58 + private List<TbKafkaProperty> other;
  59 +
  60 + public Properties toProps() {
  61 + Properties props = new Properties();
  62 + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  63 + props.put(ProducerConfig.ACKS_CONFIG, acks);
  64 + props.put(ProducerConfig.RETRIES_CONFIG, retries);
  65 + props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  66 + props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
  67 + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  68 + if(other != null){
  69 + other.forEach(kv -> props.put(kv.getKey(), kv.getValue()));
  70 + }
  71 + return props;
  72 + }
  73 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.kafka;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.apache.kafka.clients.admin.CreateTopicsResult;
  20 +import org.apache.kafka.clients.admin.NewTopic;
  21 +import org.apache.kafka.clients.consumer.ConsumerRecords;
  22 +import org.apache.kafka.clients.producer.ProducerRecord;
  23 +import org.apache.kafka.common.header.Header;
  24 +import org.apache.kafka.common.header.internals.RecordHeader;
  25 +
  26 +import java.nio.charset.StandardCharsets;
  27 +import java.util.Collections;
  28 +import java.util.List;
  29 +import java.util.UUID;
  30 +import java.util.concurrent.ConcurrentHashMap;
  31 +import java.util.concurrent.ConcurrentMap;
  32 +import java.util.concurrent.ExecutionException;
  33 +import java.util.concurrent.ExecutorService;
  34 +import java.util.concurrent.Executors;
  35 +import java.util.concurrent.atomic.LongAdder;
  36 +
  37 +/**
  38 + * Created by ashvayka on 24.09.18.
  39 + */
  40 +@Slf4j
  41 +public class TbRuleEngineEmulator {
  42 +//
  43 +// public static void main(String[] args) throws InterruptedException, ExecutionException {
  44 +// ConcurrentMap<String, String> pendingRequestsMap = new ConcurrentHashMap<>();
  45 +//
  46 +// ExecutorService executorService = Executors.newCachedThreadPool();
  47 +//
  48 +// String responseTopic = "server" + Math.abs((int) (5000.0 * Math.random()));
  49 +// try {
  50 +// TBKafkaAdmin admin = new TBKafkaAdmin();
  51 +// CreateTopicsResult result = admin.createTopic(new NewTopic(responseTopic, 1, (short) 1));
  52 +// result.all().get();
  53 +// } catch (Exception e) {
  54 +// log.warn("Failed to create topic: {}", e.getMessage(), e);
  55 +// }
  56 +//
  57 +// List<Header> headers = Collections.singletonList(new RecordHeader("responseTopic", responseTopic.getBytes(StandardCharsets.UTF_8)));
  58 +//
  59 +// TBKafkaConsumerTemplate responseConsumer = new TBKafkaConsumerTemplate();
  60 +// TBKafkaProducerTemplate requestProducer = new TBKafkaProducerTemplate();
  61 +//
  62 +// LongAdder requestCounter = new LongAdder();
  63 +// LongAdder responseCounter = new LongAdder();
  64 +//
  65 +// responseConsumer.subscribe(responseTopic);
  66 +// executorService.submit((Runnable) () -> {
  67 +// while (true) {
  68 +// ConsumerRecords<String, String> responses = responseConsumer.poll(100);
  69 +// responses.forEach(response -> {
  70 +// String expectedResponse = pendingRequestsMap.remove(response.key());
  71 +// if (expectedResponse == null) {
  72 +// log.error("[{}] Invalid request", response.key());
  73 +// } else if (!expectedResponse.equals(response.value())) {
  74 +// log.error("[{}] Invalid response: {} instead of {}", response.key(), response.value(), expectedResponse);
  75 +// }
  76 +// responseCounter.add(1);
  77 +// });
  78 +// }
  79 +// });
  80 +//
  81 +// executorService.submit((Runnable) () -> {
  82 +// int i = 0;
  83 +// while (true) {
  84 +// String requestId = UUID.randomUUID().toString();
  85 +// String expectedResponse = UUID.randomUUID().toString();
  86 +// pendingRequestsMap.put(requestId, expectedResponse);
  87 +// requestProducer.send(new ProducerRecord<>("requests", null, requestId, expectedResponse, headers));
  88 +// requestCounter.add(1);
  89 +// i++;
  90 +// if (i % 10000 == 0) {
  91 +// try {
  92 +// Thread.sleep(500L);
  93 +// } catch (InterruptedException e) {
  94 +// e.printStackTrace();
  95 +// }
  96 +// }
  97 +// }
  98 +// });
  99 +//
  100 +// executorService.submit((Runnable) () -> {
  101 +// while (true) {
  102 +// log.warn("Requests: [{}], Responses: [{}]", requestCounter.longValue(), responseCounter.longValue());
  103 +// try {
  104 +// Thread.sleep(1000L);
  105 +// } catch (InterruptedException e) {
  106 +// e.printStackTrace();
  107 +// }
  108 +// }
  109 +// });
  110 +//
  111 +// Thread.sleep(60000);
  112 +// }
  113 +
  114 +}
... ...
  1 +<?xml version="1.0" encoding="UTF-8" ?>
  2 +<!--
  3 +
  4 + Copyright © 2016-2018 The Thingsboard Authors
  5 +
  6 + Licensed under the Apache License, Version 2.0 (the "License");
  7 + you may not use this file except in compliance with the License.
  8 + You may obtain a copy of the License at
  9 +
  10 + http://www.apache.org/licenses/LICENSE-2.0
  11 +
  12 + Unless required by applicable law or agreed to in writing, software
  13 + distributed under the License is distributed on an "AS IS" BASIS,
  14 + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15 + See the License for the specific language governing permissions and
  16 + limitations under the License.
  17 +
  18 +-->
  19 +<!DOCTYPE configuration>
  20 +<configuration scan="true" scanPeriod="10 seconds">
  21 +
  22 + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
  23 + <encoder>
  24 + <pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
  25 + </encoder>
  26 + </appender>
  27 +
  28 + <logger name="org.thingsboard.server" level="INFO" />
  29 + <logger name="akka" level="INFO" />
  30 +
  31 + <root level="INFO">
  32 + <appender-ref ref="STDOUT"/>
  33 + </root>
  34 +
  35 +</configuration>
\ No newline at end of file
... ...
... ... @@ -67,7 +67,6 @@
67 67 <netty.version>4.1.22.Final</netty.version>
68 68 <os-maven-plugin.version>1.5.0</os-maven-plugin.version>
69 69 <rabbitmq.version>3.6.5</rabbitmq.version>
70   - <kafka.version>0.9.0.0</kafka.version>
71 70 <surfire.version>2.19.1</surfire.version>
72 71 <jar-plugin.version>3.0.2</jar-plugin.version>
73 72 <springfox-swagger.version>2.6.1</springfox-swagger.version>
... ... @@ -82,6 +81,7 @@
82 81 </sonar.exclusions>
83 82 <elasticsearch.version>5.0.2</elasticsearch.version>
84 83 <delight-nashorn-sandbox.version>0.1.14</delight-nashorn-sandbox.version>
  84 + <kafka.version>2.0.0</kafka.version>
85 85 </properties>
86 86
87 87 <modules>
... ... @@ -371,6 +371,11 @@
371 371 <version>${project.version}</version>
372 372 </dependency>
373 373 <dependency>
  374 + <groupId>org.thingsboard.common</groupId>
  375 + <artifactId>queue</artifactId>
  376 + <version>${project.version}</version>
  377 + </dependency>
  378 + <dependency>
374 379 <groupId>org.thingsboard</groupId>
375 380 <artifactId>tools</artifactId>
376 381 <version>${project.version}</version>
... ... @@ -415,6 +420,11 @@
415 420 <version>${spring-boot.version}</version>
416 421 </dependency>
417 422 <dependency>
  423 + <groupId>org.apache.kafka</groupId>
  424 + <artifactId>kafka-clients</artifactId>
  425 + <version>${kafka.version}</version>
  426 + </dependency>
  427 + <dependency>
418 428 <groupId>org.postgresql</groupId>
419 429 <artifactId>postgresql</artifactId>
420 430 <version>${postgresql.driver.version}</version>
... ... @@ -691,21 +701,6 @@
691 701 <scope>provided</scope>
692 702 </dependency>
693 703 <dependency>
694   - <groupId>org.apache.kafka</groupId>
695   - <artifactId>kafka_2.10</artifactId>
696   - <version>${kafka.version}</version>
697   - <exclusions>
698   - <exclusion>
699   - <groupId>org.slf4j</groupId>
700   - <artifactId>slf4j-log4j12</artifactId>
701   - </exclusion>
702   - <exclusion>
703   - <groupId>log4j</groupId>
704   - <artifactId>log4j</artifactId>
705   - </exclusion>
706   - </exclusions>
707   - </dependency>
708   - <dependency>
709 704 <groupId>org.eclipse.paho</groupId>
710 705 <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
711 706 <version>${paho.client.version}</version>
... ...
... ... @@ -78,7 +78,7 @@
78 78 </dependency>
79 79 <dependency>
80 80 <groupId>org.apache.kafka</groupId>
81   - <artifactId>kafka_2.10</artifactId>
  81 + <artifactId>kafka-clients</artifactId>
82 82 </dependency>
83 83 <dependency>
84 84 <groupId>com.amazonaws</groupId>
... ...