Commit 7ee2cdfe3e277a89260ddfba2c3a00ab11cb0146

Authored by Yevhen Bondarenko
Committed by GitHub
1 parent dc5eb639

Fixes and refactoring (#2761)

* fix sqs js executor and refactored RemoteJsInvokeService

* added REMOTE_JS_MAX_REQUEST_TIMEOUT=20000 for aws-sqs, pubsub, service-bus docker environments

* added REMOTE_JS_MAX_REQUEST_TIMEOUT=20000 for aws-sqs, pubsub, service-bus docker environments

* refactored

* docker-compose.pubsub.yml improvements

* rabbitmq js executor improvements
... ... @@ -59,22 +59,22 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
59 59 @Value("${js.remote.stats.enabled:false}")
60 60 private boolean statsEnabled;
61 61
62   - private final AtomicInteger kafkaPushedMsgs = new AtomicInteger(0);
63   - private final AtomicInteger kafkaInvokeMsgs = new AtomicInteger(0);
64   - private final AtomicInteger kafkaEvalMsgs = new AtomicInteger(0);
65   - private final AtomicInteger kafkaFailedMsgs = new AtomicInteger(0);
66   - private final AtomicInteger kafkaTimeoutMsgs = new AtomicInteger(0);
  62 + private final AtomicInteger queuePushedMsgs = new AtomicInteger(0);
  63 + private final AtomicInteger queueInvokeMsgs = new AtomicInteger(0);
  64 + private final AtomicInteger queueEvalMsgs = new AtomicInteger(0);
  65 + private final AtomicInteger queueFailedMsgs = new AtomicInteger(0);
  66 + private final AtomicInteger queueTimeoutMsgs = new AtomicInteger(0);
67 67
68 68 @Scheduled(fixedDelayString = "${js.remote.stats.print_interval_ms}")
69 69 public void printStats() {
70 70 if (statsEnabled) {
71   - int pushedMsgs = kafkaPushedMsgs.getAndSet(0);
72   - int invokeMsgs = kafkaInvokeMsgs.getAndSet(0);
73   - int evalMsgs = kafkaEvalMsgs.getAndSet(0);
74   - int failed = kafkaFailedMsgs.getAndSet(0);
75   - int timedOut = kafkaTimeoutMsgs.getAndSet(0);
  71 + int pushedMsgs = queuePushedMsgs.getAndSet(0);
  72 + int invokeMsgs = queueInvokeMsgs.getAndSet(0);
  73 + int evalMsgs = queueEvalMsgs.getAndSet(0);
  74 + int failed = queueFailedMsgs.getAndSet(0);
  75 + int timedOut = queueTimeoutMsgs.getAndSet(0);
76 76 if (pushedMsgs > 0 || invokeMsgs > 0 || evalMsgs > 0 || failed > 0 || timedOut > 0) {
77   - log.info("Kafka JS Invoke Stats: pushed [{}] received [{}] invoke [{}] eval [{}] failed [{}] timedOut [{}]",
  77 + log.info("Queue JS Invoke Stats: pushed [{}] received [{}] invoke [{}] eval [{}] failed [{}] timedOut [{}]",
78 78 pushedMsgs, invokeMsgs + evalMsgs, invokeMsgs, evalMsgs, failed, timedOut);
79 79 }
80 80 }
... ... @@ -116,19 +116,19 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
116 116 if (maxRequestsTimeout > 0) {
117 117 future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService);
118 118 }
119   - kafkaPushedMsgs.incrementAndGet();
  119 + queuePushedMsgs.incrementAndGet();
120 120 Futures.addCallback(future, new FutureCallback<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>>() {
121 121 @Override
122 122 public void onSuccess(@Nullable TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse> result) {
123   - kafkaEvalMsgs.incrementAndGet();
  123 + queueEvalMsgs.incrementAndGet();
124 124 }
125 125
126 126 @Override
127 127 public void onFailure(Throwable t) {
128 128 if (t instanceof TimeoutException || (t.getCause() != null && t.getCause() instanceof TimeoutException)) {
129   - kafkaTimeoutMsgs.incrementAndGet();
  129 + queueTimeoutMsgs.incrementAndGet();
130 130 }
131   - kafkaFailedMsgs.incrementAndGet();
  131 + queueFailedMsgs.incrementAndGet();
132 132 }
133 133 }, MoreExecutors.directExecutor());
134 134 return Futures.transform(future, response -> {
... ... @@ -170,20 +170,20 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
170 170 if (maxRequestsTimeout > 0) {
171 171 future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService);
172 172 }
173   - kafkaPushedMsgs.incrementAndGet();
  173 + queuePushedMsgs.incrementAndGet();
174 174 Futures.addCallback(future, new FutureCallback<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>>() {
175 175 @Override
176 176 public void onSuccess(@Nullable TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse> result) {
177   - kafkaInvokeMsgs.incrementAndGet();
  177 + queueInvokeMsgs.incrementAndGet();
178 178 }
179 179
180 180 @Override
181 181 public void onFailure(Throwable t) {
182 182 onScriptExecutionError(scriptId);
183 183 if (t instanceof TimeoutException || (t.getCause() != null && t.getCause() instanceof TimeoutException)) {
184   - kafkaTimeoutMsgs.incrementAndGet();
  184 + queueTimeoutMsgs.incrementAndGet();
185 185 }
186   - kafkaFailedMsgs.incrementAndGet();
  186 + queueFailedMsgs.incrementAndGet();
187 187 }
188 188 }, MoreExecutors.directExecutor());
189 189 return Futures.transform(future, response -> {
... ...
... ... @@ -19,10 +19,10 @@ version: '2.2'
19 19 services:
20 20 tb-js-executor:
21 21 env_file:
22   - - queue-pubsub.env.env
  22 + - queue-pubsub.env
23 23 tb-core1:
24 24 env_file:
25   - - queue-pubsub.env.env
  25 + - queue-pubsub.env
26 26 depends_on:
27 27 - zookeeper
28 28 - redis
... ...
... ... @@ -2,3 +2,4 @@ TB_QUEUE_TYPE=aws-sqs
2 2 TB_QUEUE_AWS_SQS_ACCESS_KEY_ID=YOUR_KEY
3 3 TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY=YOUR_SECRET
4 4 TB_QUEUE_AWS_SQS_REGION=YOUR_REGION
  5 +REMOTE_JS_MAX_REQUEST_TIMEOUT=60000
... ...
1 1 TB_QUEUE_TYPE=pubsub
2 2 TB_QUEUE_PUBSUB_PROJECT_ID=YOUR_PROJECT_ID
3   -TB_QUEUE_PUBSUB_SERVICE_ACCOUNT=YOUR_SERVICE_ACCOUNT
\ No newline at end of file
  3 +TB_QUEUE_PUBSUB_SERVICE_ACCOUNT=YOUR_SERVICE_ACCOUNT
  4 +REMOTE_JS_MAX_REQUEST_TIMEOUT=60000
... ...
1 1 TB_QUEUE_TYPE=service-bus
2 2 TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME=YOUR_NAMESPACE_NAME
3 3 TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME=YOUR_SAS_KEY_NAME
4   -TB_QUEUE_SERVICE_BUS_SAS_KEY=YOUR_SAS_KEY
\ No newline at end of file
  4 +TB_QUEUE_SERVICE_BUS_SAS_KEY=YOUR_SAS_KEY
  5 +REMOTE_JS_MAX_REQUEST_TIMEOUT=60000
... ...
... ... @@ -100,7 +100,7 @@ function AwsSqsProducer() {
100 100 const params = {
101 101 MaxNumberOfMessages: 10,
102 102 QueueUrl: requestQueueURL,
103   - WaitTimeSeconds: poolInterval / 1000
  103 + WaitTimeSeconds: pollInterval / 1000
104 104 };
105 105 while (!stopped) {
106 106 let pollStartTs = new Date().getTime();
... ...
... ... @@ -68,9 +68,8 @@ function RabbitMqProducer() {
68 68 (async () => {
69 69 try {
70 70 logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
71   - const url = `amqp://${host}:${port}${vhost}`;
  71 + const url = `amqp://${username}:${password}@${host}:${port}${vhost}`;
72 72
73   - amqp.credentials.amqplain(username, password);
74 73 connection = await new Promise((resolve, reject) => {
75 74 amqp.connect(url, function (err, connection) {
76 75 if (err) {
... ...