Commit 0103da1ba4852f305606c7eb52c4ffd560f78dbc

Authored by Andrii Shvaika
1 parent 076cf655

Improved Callback and Destroy of ndoes

... ... @@ -52,6 +52,7 @@ import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
52 52 import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
53 53
54 54 import javax.annotation.PostConstruct;
  55 +import javax.annotation.PreDestroy;
55 56 import java.util.List;
56 57 import java.util.Optional;
57 58 import java.util.UUID;
... ... @@ -98,6 +99,11 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
98 99 super.init("tb-core-consumer", "tb-core-notifications-consumer");
99 100 }
100 101
  102 + @PreDestroy
  103 + public void destroy(){
  104 + super.destroy();
  105 + }
  106 +
101 107 @Override
102 108 public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
103 109 if (partitionChangeEvent.getServiceType().equals(getServiceType())) {
... ... @@ -117,11 +123,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
117 123 }
118 124 ConcurrentMap<UUID, TbProtoQueueMsg<ToCoreMsg>> pendingMap = msgs.stream().collect(
119 125 Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
120   - ConcurrentMap<UUID, TbProtoQueueMsg<ToCoreMsg>> failedMap = new ConcurrentHashMap<>();
121 126 CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
  127 + TbPackProcessingContext<TbProtoQueueMsg<ToCoreMsg>> ctx = new TbPackProcessingContext<>(
  128 + processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
122 129 pendingMap.forEach((id, msg) -> {
123 130 log.trace("[{}] Creating main callback for message: {}", id, msg.getValue());
124   - TbCallback callback = new TbPackCallback<>(id, processingTimeoutLatch, pendingMap, failedMap);
  131 + TbCallback callback = new TbPackCallback<>(id, ctx);
125 132 try {
126 133 ToCoreMsg toCoreMsg = msg.getValue();
127 134 if (toCoreMsg.hasToSubscriptionMgrMsg()) {
... ... @@ -147,8 +154,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
147 154 }
148 155 });
149 156 if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
150   - pendingMap.forEach((id, msg) -> log.warn("[{}] Timeout to process message: {}", id, msg.getValue()));
151   - failedMap.forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue()));
  157 + ctx.getAckMap().forEach((id, msg) -> log.warn("[{}] Timeout to process message: {}", id, msg.getValue()));
  158 + ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue()));
152 159 }
153 160 mainConsumer.commit();
154 161 } catch (Exception e) {
... ...
... ... @@ -31,7 +31,6 @@ import org.thingsboard.server.common.msg.queue.ServiceQueue;
31 31 import org.thingsboard.server.common.msg.queue.ServiceType;
32 32 import org.thingsboard.server.common.msg.queue.TbCallback;
33 33 import org.thingsboard.server.common.msg.queue.TbMsgCallback;
34   -import org.thingsboard.server.gen.transport.TransportProtos;
35 34 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
36 35 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
37 36 import org.thingsboard.server.queue.TbQueueConsumer;
... ... @@ -64,9 +63,6 @@ import java.util.concurrent.ConcurrentMap;
64 63 import java.util.concurrent.ExecutorService;
65 64 import java.util.concurrent.Executors;
66 65 import java.util.concurrent.TimeUnit;
67   -import java.util.function.BiConsumer;
68   -import java.util.function.Function;
69   -import java.util.stream.Collectors;
70 66
71 67 @Service
72 68 @TbRuleEngineComponent
... ... @@ -116,10 +112,10 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
116 112
117 113 @PreDestroy
118 114 public void stop() {
  115 + super.destroy();
119 116 if (submitExecutor != null) {
120 117 submitExecutor.shutdownNow();
121 118 }
122   -
123 119 ruleEngineSettings.getQueues().forEach(config -> consumerConfigurations.put(config.getName(), config));
124 120 }
125 121
... ... @@ -156,7 +152,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
156 152 submitStrategy.init(msgs);
157 153
158 154 while (!stopped) {
159   - ProcessingAttemptContext ctx = new ProcessingAttemptContext(submitStrategy);
  155 + TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(submitStrategy);
160 156 submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> {
161 157 log.trace("[{}] Creating callback for message: {}", id, msg.getValue());
162 158 ToRuleEngineMsg toRuleEngineMsg = msg.getValue();
... ...
... ... @@ -18,21 +18,17 @@ package org.thingsboard.server.service.queue;
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.thingsboard.server.common.data.id.TenantId;
20 20 import org.thingsboard.server.common.msg.queue.RuleEngineException;
21   -import org.thingsboard.server.common.msg.queue.RuleNodeException;
22   -import org.thingsboard.server.common.msg.queue.TbCallback;
23 21 import org.thingsboard.server.common.msg.queue.TbMsgCallback;
24 22
25 23 import java.util.UUID;
26   -import java.util.concurrent.ConcurrentMap;
27   -import java.util.concurrent.CountDownLatch;
28 24
29 25 @Slf4j
30 26 public class TbMsgPackCallback implements TbMsgCallback {
31 27 private final UUID id;
32 28 private final TenantId tenantId;
33   - private final ProcessingAttemptContext ctx;
  29 + private final TbMsgPackProcessingContext ctx;
34 30
35   - public TbMsgPackCallback(UUID id, TenantId tenantId, ProcessingAttemptContext ctx) {
  31 + public TbMsgPackCallback(UUID id, TenantId tenantId, TbMsgPackProcessingContext ctx) {
36 32 this.id = id;
37 33 this.tenantId = tenantId;
38 34 this.ctx = ctx;
... ...
application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java renamed from application/src/main/java/org/thingsboard/server/service/queue/ProcessingAttemptContext.java
... ... @@ -29,7 +29,7 @@ import java.util.concurrent.CountDownLatch;
29 29 import java.util.concurrent.TimeUnit;
30 30 import java.util.concurrent.atomic.AtomicInteger;
31 31
32   -public class ProcessingAttemptContext {
  32 +public class TbMsgPackProcessingContext {
33 33
34 34 private final TbRuleEngineSubmitStrategy submitStrategy;
35 35
... ... @@ -44,7 +44,7 @@ public class ProcessingAttemptContext {
44 44 @Getter
45 45 private final ConcurrentMap<TenantId, RuleEngineException> exceptionsMap = new ConcurrentHashMap<>();
46 46
47   - public ProcessingAttemptContext(TbRuleEngineSubmitStrategy submitStrategy) {
  47 + public TbMsgPackProcessingContext(TbRuleEngineSubmitStrategy submitStrategy) {
48 48 this.submitStrategy = submitStrategy;
49 49 this.pendingMap = submitStrategy.getPendingMap();
50 50 this.pendingCount = new AtomicInteger(pendingMap.size());
... ...
... ... @@ -19,44 +19,26 @@ import lombok.extern.slf4j.Slf4j;
19 19 import org.thingsboard.server.common.msg.queue.TbCallback;
20 20
21 21 import java.util.UUID;
22   -import java.util.concurrent.ConcurrentMap;
23   -import java.util.concurrent.CountDownLatch;
24 22
25 23 @Slf4j
26 24 public class TbPackCallback<T> implements TbCallback {
27   - private final CountDownLatch processingTimeoutLatch;
28   - private final ConcurrentMap<UUID, T> ackMap;
29   - private final ConcurrentMap<UUID, T> failedMap;
  25 + private final TbPackProcessingContext<T> ctx;
30 26 private final UUID id;
31 27
32   - public TbPackCallback(UUID id,
33   - CountDownLatch processingTimeoutLatch,
34   - ConcurrentMap<UUID, T> ackMap,
35   - ConcurrentMap<UUID, T> failedMap) {
  28 + public TbPackCallback(UUID id, TbPackProcessingContext<T> ctx) {
36 29 this.id = id;
37   - this.processingTimeoutLatch = processingTimeoutLatch;
38   - this.ackMap = ackMap;
39   - this.failedMap = failedMap;
  30 + this.ctx = ctx;
40 31 }
41 32
42 33 @Override
43 34 public void onSuccess() {
44 35 log.trace("[{}] ON SUCCESS", id);
45   - T msg = ackMap.remove(id);
46   - if (msg != null && ackMap.isEmpty()) {
47   - processingTimeoutLatch.countDown();
48   - }
  36 + ctx.onSuccess(id);
49 37 }
50 38
51 39 @Override
52 40 public void onFailure(Throwable t) {
53 41 log.trace("[{}] ON FAILURE", id, t);
54   - T msg = ackMap.remove(id);
55   - if (msg != null) {
56   - failedMap.put(id, msg);
57   - }
58   - if (ackMap.isEmpty()) {
59   - processingTimeoutLatch.countDown();
60   - }
  42 + ctx.onFailure(id, t);
61 43 }
62 44 }
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.queue;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +
  20 +import java.util.UUID;
  21 +import java.util.concurrent.ConcurrentMap;
  22 +import java.util.concurrent.CountDownLatch;
  23 +import java.util.concurrent.TimeUnit;
  24 +import java.util.concurrent.atomic.AtomicInteger;
  25 +
  26 +@Slf4j
  27 +public class TbPackProcessingContext<T> {
  28 +
  29 + private final AtomicInteger pendingCount;
  30 + private final CountDownLatch processingTimeoutLatch;
  31 + private final ConcurrentMap<UUID, T> ackMap;
  32 + private final ConcurrentMap<UUID, T> failedMap;
  33 +
  34 + public TbPackProcessingContext(CountDownLatch processingTimeoutLatch,
  35 + ConcurrentMap<UUID, T> ackMap,
  36 + ConcurrentMap<UUID, T> failedMap) {
  37 + this.processingTimeoutLatch = processingTimeoutLatch;
  38 + this.pendingCount = new AtomicInteger(ackMap.size());
  39 + this.ackMap = ackMap;
  40 + this.failedMap = failedMap;
  41 + }
  42 +
  43 + public boolean await(long packProcessingTimeout, TimeUnit milliseconds) throws InterruptedException {
  44 + return processingTimeoutLatch.await(packProcessingTimeout, milliseconds);
  45 + }
  46 +
  47 + public void onSuccess(UUID id) {
  48 + boolean empty = false;
  49 + T msg = ackMap.remove(id);
  50 + if (msg != null) {
  51 + empty = pendingCount.decrementAndGet() == 0;
  52 + }
  53 + if (empty) {
  54 + processingTimeoutLatch.countDown();
  55 + } else {
  56 + if (log.isTraceEnabled()) {
  57 + log.trace("Items left: {}", ackMap.size());
  58 + for (T t : ackMap.values()) {
  59 + log.trace("left item: {}", t);
  60 + }
  61 + }
  62 + }
  63 + }
  64 +
  65 + public void onFailure(UUID id, Throwable t) {
  66 + boolean empty = false;
  67 + T msg = ackMap.remove(id);
  68 + if (msg != null) {
  69 + empty = pendingCount.decrementAndGet() == 0;
  70 + failedMap.put(id, msg);
  71 + if (log.isTraceEnabled()) {
  72 + log.trace("Items left: {}", ackMap.size());
  73 + for (T v : ackMap.values()) {
  74 + log.trace("left item: {}", v);
  75 + }
  76 + }
  77 + }
  78 + if (empty) {
  79 + processingTimeoutLatch.countDown();
  80 + }
  81 + }
  82 +
  83 + public ConcurrentMap<UUID, T> getAckMap() {
  84 + return ackMap;
  85 + }
  86 +
  87 + public ConcurrentMap<UUID, T> getFailedMap() {
  88 + return failedMap;
  89 + }
  90 +}
... ...
... ... @@ -23,11 +23,13 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
23 23 import org.thingsboard.server.actors.ActorSystemContext;
24 24 import org.thingsboard.server.common.msg.queue.ServiceType;
25 25 import org.thingsboard.server.common.msg.queue.TbCallback;
  26 +import org.thingsboard.server.gen.transport.TransportProtos;
26 27 import org.thingsboard.server.queue.TbQueueConsumer;
27 28 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
28 29 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
29 30 import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
30 31 import org.thingsboard.server.service.queue.TbPackCallback;
  32 +import org.thingsboard.server.service.queue.TbPackProcessingContext;
31 33
32 34 import javax.annotation.PreDestroy;
33 35 import java.util.List;
... ... @@ -92,11 +94,12 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
92 94 }
93 95 ConcurrentMap<UUID, TbProtoQueueMsg<N>> pendingMap = msgs.stream().collect(
94 96 Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
95   - ConcurrentMap<UUID, TbProtoQueueMsg<N>> failedMap = new ConcurrentHashMap<>();
96 97 CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
  98 + TbPackProcessingContext<TbProtoQueueMsg<N>> ctx = new TbPackProcessingContext<>(
  99 + processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
97 100 pendingMap.forEach((id, msg) -> {
98 101 log.trace("[{}] Creating notification callback for message: {}", id, msg.getValue());
99   - TbCallback callback = new TbPackCallback<>(id, processingTimeoutLatch, pendingMap, failedMap);
  102 + TbCallback callback = new TbPackCallback<>(id, ctx);
100 103 try {
101 104 handleNotification(id, msg, callback);
102 105 } catch (Throwable e) {
... ... @@ -105,8 +108,8 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
105 108 }
106 109 });
107 110 if (!processingTimeoutLatch.await(getNotificationPackProcessingTimeout(), TimeUnit.MILLISECONDS)) {
108   - pendingMap.forEach((id, msg) -> log.warn("[{}] Timeout to process notification: {}", id, msg.getValue()));
109   - failedMap.forEach((id, msg) -> log.warn("[{}] Failed to process notification: {}", id, msg.getValue()));
  111 + ctx.getAckMap().forEach((id, msg) -> log.warn("[{}] Timeout to process notification: {}", id, msg.getValue()));
  112 + ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process notification: {}", id, msg.getValue()));
110 113 }
111 114 nfConsumer.commit();
112 115 } catch (Exception e) {
... ...
... ... @@ -20,7 +20,7 @@ import org.thingsboard.server.common.data.id.TenantId;
20 20 import org.thingsboard.server.common.msg.queue.RuleEngineException;
21 21 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
22 22 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
23   -import org.thingsboard.server.service.queue.ProcessingAttemptContext;
  23 +import org.thingsboard.server.service.queue.TbMsgPackProcessingContext;
24 24
25 25 import java.util.UUID;
26 26 import java.util.concurrent.ConcurrentMap;
... ... @@ -32,9 +32,9 @@ public class TbRuleEngineProcessingResult {
32 32 @Getter
33 33 private final boolean timeout;
34 34 @Getter
35   - private final ProcessingAttemptContext ctx;
  35 + private final TbMsgPackProcessingContext ctx;
36 36
37   - public TbRuleEngineProcessingResult(boolean timeout, ProcessingAttemptContext ctx) {
  37 + public TbRuleEngineProcessingResult(boolean timeout, TbMsgPackProcessingContext ctx) {
38 38 this.timeout = timeout;
39 39 this.ctx = ctx;
40 40 this.success = !timeout && ctx.getPendingMap().isEmpty() && ctx.getFailedMap().isEmpty();
... ...
application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContextTest.java renamed from application/src/test/java/org/thingsboard/server/service/queue/ProcessingAttemptContextTest.java
... ... @@ -37,7 +37,7 @@ import static org.mockito.Mockito.when;
37 37
38 38 @Slf4j
39 39 @RunWith(MockitoJUnitRunner.class)
40   -public class ProcessingAttemptContextTest {
  40 +public class TbMsgPackProcessingContextTest {
41 41
42 42 @Test
43 43 public void testHighConcurrencyCase() throws InterruptedException {
... ... @@ -51,7 +51,7 @@ public class ProcessingAttemptContextTest {
51 51 messages.put(UUID.randomUUID(), new TbProtoQueueMsg<>(UUID.randomUUID(), null));
52 52 }
53 53 when(strategyMock.getPendingMap()).thenReturn(messages);
54   - ProcessingAttemptContext context = new ProcessingAttemptContext(strategyMock);
  54 + TbMsgPackProcessingContext context = new TbMsgPackProcessingContext(strategyMock);
55 55 for (UUID uuid : messages.keySet()) {
56 56 for (int i = 0; i < parallelCount; i++) {
57 57 executorService.submit(() -> context.onSuccess(uuid));
... ...