Commit 3358c061da7d013e632cdd76f858f26c5f833471

Authored by Andrii Shvaika
1 parent 5ac45e43

Improvements to Mailbox. HighPriorityQueue

Showing 17 changed files with 157 additions and 71 deletions
... ... @@ -514,6 +514,9 @@ public class ActorSystemContext {
514 514 appActor.tell(tbActorMsg);
515 515 }
516 516
  517 + public void tellWithHighPriority(TbActorMsg tbActorMsg) {
  518 + appActor.tellWithHighPriority(tbActorMsg);
  519 + }
517 520
518 521 public void schedulePeriodicMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs, long periodInMs) {
519 522 log.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs);
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -82,12 +82,14 @@ public class AppActor extends ContextAwareActor {
82 82 onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
83 83 break;
84 84 case TRANSPORT_TO_DEVICE_ACTOR_MSG:
  85 + onToDeviceActorMsg((TenantAwareMsg) msg, false);
  86 + break;
85 87 case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
86 88 case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
87 89 case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
88 90 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
89 91 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
90   - onToDeviceActorMsg((TenantAwareMsg) msg);
  92 + onToDeviceActorMsg((TenantAwareMsg) msg, true);
91 93 break;
92 94 default:
93 95 return false;
... ... @@ -155,15 +157,20 @@ public class AppActor extends ContextAwareActor {
155 157 }
156 158 }
157 159 if (target != null) {
158   - target.tell(msg);
  160 + target.tellWithHighPriority(msg);
159 161 } else {
160 162 log.debug("[{}] Invalid component lifecycle msg: {}", msg.getTenantId(), msg);
161 163 }
162 164 }
163 165
164   - private void onToDeviceActorMsg(TenantAwareMsg msg) {
  166 + private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) {
165 167 if (!deletedTenants.contains(msg.getTenantId())) {
166   - getOrCreateTenantActor(msg.getTenantId()).tell(msg);
  168 + TbActorRef tenantActor = getOrCreateTenantActor(msg.getTenantId());
  169 + if (priority) {
  170 + tenantActor.tellWithHighPriority(msg);
  171 + } else {
  172 + tenantActor.tell(msg);
  173 + }
167 174 } else {
168 175 if (msg instanceof TransportToDeviceActorMsgWrapper) {
169 176 ((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess();
... ...
... ... @@ -128,7 +128,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
128 128 } else {
129 129 log.trace("[{}][{}] Updating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
130 130 existing.setSelf(ruleNode);
131   - existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED));
  131 + existing.getSelfActor().tellWithHighPriority(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED));
132 132 }
133 133 }
134 134
... ... @@ -137,7 +137,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
137 137 removedRules.forEach(ruleNodeId -> {
138 138 log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId);
139 139 RuleNodeCtx removed = nodeActors.remove(ruleNodeId);
140   - removed.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED));
  140 + removed.getSelfActor().tellWithHighPriority(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED));
141 141 });
142 142
143 143 initRoutes(ruleChain, ruleNodeList);
... ... @@ -155,7 +155,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
155 155
156 156 @Override
157 157 public void onPartitionChangeMsg(PartitionChangeMsg msg) {
158   - nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(actorRef -> actorRef.tell(msg));
  158 + nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(actorRef -> actorRef.tellWithHighPriority(msg));
159 159 }
160 160
161 161 private TbActorRef createRuleNodeActor(TbActorCtx ctx, RuleNode ruleNode) {
... ...
... ... @@ -83,7 +83,9 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
83 83 public void destroy() {
84 84 try {
85 85 log.debug("[{}][{}][{}] Stopping processor.", tenantId, id, id.getEntityType());
86   - processor.stop(ctx);
  86 + if (processor != null) {
  87 + processor.stop(ctx);
  88 + }
87 89 logLifecycleEvent(ComponentLifecycleEvent.STOPPED);
88 90 } catch (Exception e) {
89 91 log.warn("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage());
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -104,19 +104,23 @@ public class DefaultActorService implements ActorService {
104 104 int cores = Runtime.getRuntime().availableProcessors();
105 105 poolSize = Math.max(1, cores / 2);
106 106 }
107   - return Executors.newWorkStealingPool(poolSize);
  107 + if (poolSize == 1) {
  108 + return Executors.newFixedThreadPool(1);
  109 + } else {
  110 + return Executors.newWorkStealingPool(poolSize);
  111 + }
108 112 }
109 113
110 114 @EventListener(ApplicationReadyEvent.class)
111 115 public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
112 116 log.info("Received application ready event. Sending application init message to actor system");
113   - appActor.tell(new AppInitMsg());
  117 + appActor.tellWithHighPriority(new AppInitMsg());
114 118 }
115 119
116 120 @EventListener(PartitionChangeEvent.class)
117 121 public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
118 122 log.info("Received partition change event.");
119   - this.appActor.tell(new PartitionChangeMsg(partitionChangeEvent.getServiceQueueKey(), partitionChangeEvent.getPartitions()));
  123 + this.appActor.tellWithHighPriority(new PartitionChangeMsg(partitionChangeEvent.getServiceQueueKey(), partitionChangeEvent.getPartitions()));
120 124 }
121 125
122 126 @PreDestroy
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -135,12 +135,14 @@ public class TenantActor extends RuleChainManagerActor {
135 135 onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
136 136 break;
137 137 case TRANSPORT_TO_DEVICE_ACTOR_MSG:
  138 + onToDeviceActorMsg((DeviceAwareMsg) msg, false);
  139 + break;
138 140 case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
139 141 case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
140 142 case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
141 143 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
142 144 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
143   - onToDeviceActorMsg((DeviceAwareMsg) msg);
  145 + onToDeviceActorMsg((DeviceAwareMsg) msg, true);
144 146 break;
145 147 case RULE_CHAIN_TO_RULE_CHAIN_MSG:
146 148 onRuleChainMsg((RuleChainAwareMsg) msg);
... ... @@ -183,11 +185,16 @@ public class TenantActor extends RuleChainManagerActor {
183 185 getOrCreateActor(msg.getRuleChainId()).tell(msg);
184 186 }
185 187
186   - private void onToDeviceActorMsg(DeviceAwareMsg msg) {
  188 + private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) {
187 189 if (!isCore) {
188 190 log.warn("RECEIVED INVALID MESSAGE: {}", msg);
189 191 }
190   - getOrCreateDeviceActor(msg.getDeviceId()).tell(msg);
  192 + TbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId());
  193 + if (priority) {
  194 + deviceActor.tellWithHighPriority(msg);
  195 + } else {
  196 + deviceActor.tell(msg);
  197 + }
191 198 }
192 199
193 200 private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
... ... @@ -199,7 +206,7 @@ public class TenantActor extends RuleChainManagerActor {
199 206 findRuleChainById(tenantId, new RuleChainId(msg.getEntityId().getId()));
200 207 visit(ruleChain, target);
201 208 }
202   - target.tell(msg);
  209 + target.tellWithHighPriority(msg);
203 210 } else {
204 211 log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg);
205 212 }
... ...
... ... @@ -207,7 +207,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
207 207 Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreNotification.getComponentLifecycleMsg().toByteArray());
208 208 if (actorMsg.isPresent()) {
209 209 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
210   - actorContext.tell(actorMsg.get());
  210 + actorContext.tellWithHighPriority(actorMsg.get());
211 211 }
212 212 callback.onSuccess();
213 213 }
... ...
... ... @@ -230,7 +230,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
230 230 Optional<TbActorMsg> actorMsg = encodingService.decode(nfMsg.getComponentLifecycleMsg().toByteArray());
231 231 if (actorMsg.isPresent()) {
232 232 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
233   - actorContext.tell(actorMsg.get());
  233 + actorContext.tellWithHighPriority(actorMsg.get());
234 234 }
235 235 callback.onSuccess();
236 236 } else if (nfMsg.hasFromDeviceRpcResponse()) {
... ...
... ... @@ -121,7 +121,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
121 121 log.trace("[{}][{}] Processing local rpc call to device actor [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
122 122 UUID requestId = request.getId();
123 123 localToDeviceRpcRequests.put(requestId, rpcMsg);
124   - actorContext.tell(rpcMsg);
  124 + actorContext.tellWithHighPriority(rpcMsg);
125 125 scheduleToDeviceTimeout(request, requestId);
126 126 }
127 127
... ... @@ -175,7 +175,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
175 175 }
176 176
177 177 private void scheduleToRuleEngineTimeout(ToDeviceRpcRequest request, UUID requestId) {
178   - long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis());
  178 + long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1);
179 179 log.trace("[{}] processing to rule engine request.", requestId);
180 180 scheduler.schedule(() -> {
181 181 log.trace("[{}] timeout for processing to rule engine request.", requestId);
... ... @@ -187,7 +187,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
187 187 }
188 188
189 189 private void scheduleToDeviceTimeout(ToDeviceRpcRequest request, UUID requestId) {
190   - long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis());
  190 + long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1);
191 191 log.trace("[{}] processing to device request.", requestId);
192 192 scheduler.schedule(() -> {
193 193 log.trace("[{}] timeout for to device request.", requestId);
... ...
... ... @@ -193,7 +193,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
193 193 String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"27\",\"value\": 1},\"timeout\": 6000}";
194 194 String deviceId = savedDevice.getId().getId().toString();
195 195
196   - doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(),
  196 + doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().is(409),
197 197 asyncContextTimeoutToUseRpcPlugin);
198 198 }
199 199
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -132,19 +132,28 @@ public class DefaultTbActorSystem implements TbActorSystem {
132 132 }
133 133
134 134 @Override
135   - public void tell(TbActorRef target, TbActorMsg actorMsg) {
136   - target.tell(actorMsg);
  135 + public void tellWithHighPriority(TbActorId target, TbActorMsg actorMsg) {
  136 + tell(target, actorMsg, true);
137 137 }
138 138
139 139 @Override
140 140 public void tell(TbActorId target, TbActorMsg actorMsg) {
  141 + tell(target, actorMsg, false);
  142 + }
  143 +
  144 + private void tell(TbActorId target, TbActorMsg actorMsg, boolean highPriority) {
141 145 TbActorMailbox mailbox = actors.get(target);
142 146 if (mailbox == null) {
143 147 throw new TbActorNotRegisteredException(target, "Actor with id [" + target + "] is not registered!");
144 148 }
145   - mailbox.enqueue(actorMsg);
  149 + if (highPriority) {
  150 + mailbox.tellWithHighPriority(actorMsg);
  151 + } else {
  152 + mailbox.tell(actorMsg);
  153 + }
146 154 }
147 155
  156 +
148 157 @Override
149 158 public void broadcastToChildren(TbActorId parent, TbActorMsg msg) {
150 159 broadcastToChildren(parent, id -> true, msg);
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -29,6 +29,9 @@ import java.util.function.Supplier;
29 29 @Slf4j
30 30 @Data
31 31 public final class TbActorMailbox implements TbActorCtx {
  32 + private static final boolean HIGH_PRIORITY = true;
  33 + private static final boolean NORMAL_PRIORITY = false;
  34 +
32 35 private static final boolean FREE = false;
33 36 private static final boolean BUSY = true;
34 37
... ... @@ -41,7 +44,8 @@ public final class TbActorMailbox implements TbActorCtx {
41 44 private final TbActorRef parentRef;
42 45 private final TbActor actor;
43 46 private final Dispatcher dispatcher;
44   - private final ConcurrentLinkedQueue<TbActorMsg> msgs = new ConcurrentLinkedQueue<>();
  47 + private final ConcurrentLinkedQueue<TbActorMsg> highPriorityMsgs = new ConcurrentLinkedQueue<>();
  48 + private final ConcurrentLinkedQueue<TbActorMsg> normalPriorityMsgs = new ConcurrentLinkedQueue<>();
45 49 private final AtomicBoolean busy = new AtomicBoolean(FREE);
46 50 private final AtomicBoolean ready = new AtomicBoolean(NOT_READY);
47 51 private final AtomicBoolean destroyInProgress = new AtomicBoolean();
... ... @@ -78,23 +82,38 @@ public final class TbActorMailbox implements TbActorCtx {
78 82 }
79 83 }
80 84
81   - public void enqueue(TbActorMsg msg) {
82   - msgs.add(msg);
  85 + private void enqueue(TbActorMsg msg, boolean highPriority) {
  86 + if (highPriority) {
  87 + highPriorityMsgs.add(msg);
  88 + } else {
  89 + normalPriorityMsgs.add(msg);
  90 + }
83 91 tryProcessQueue(true);
84 92 }
85 93
86 94 private void tryProcessQueue(boolean newMsg) {
87   - if (ready.get() == READY && (newMsg || !msgs.isEmpty()) && busy.compareAndSet(FREE, BUSY)) {
88   - dispatcher.getExecutor().execute(this::processMailbox);
  95 + if (ready.get() == READY) {
  96 + if (newMsg || !highPriorityMsgs.isEmpty() || !normalPriorityMsgs.isEmpty()) {
  97 + if (busy.compareAndSet(FREE, BUSY)) {
  98 + dispatcher.getExecutor().execute(this::processMailbox);
  99 + } else {
  100 + log.trace("[{}] MessageBox is busy, new msg: {}", selfId, newMsg);
  101 + }
  102 + } else {
  103 + log.trace("[{}] MessageBox is empty, new msg: {}", selfId, newMsg);
  104 + }
89 105 } else {
90   - log.trace("[{}] MessageBox is busy, new msg: {}", selfId, newMsg);
  106 + log.trace("[{}] MessageBox is not ready, new msg: {}", selfId, newMsg);
91 107 }
92 108 }
93 109
94 110 private void processMailbox() {
95 111 boolean noMoreElements = false;
96 112 for (int i = 0; i < settings.getActorThroughput(); i++) {
97   - TbActorMsg msg = msgs.poll();
  113 + TbActorMsg msg = highPriorityMsgs.poll();
  114 + if (msg == null) {
  115 + msg = normalPriorityMsgs.poll();
  116 + }
98 117 if (msg != null) {
99 118 try {
100 119 log.debug("[{}] Going to process message: {}", selfId, msg);
... ... @@ -178,6 +197,12 @@ public final class TbActorMailbox implements TbActorCtx {
178 197
179 198 @Override
180 199 public void tell(TbActorMsg actorMsg) {
181   - enqueue(actorMsg);
  200 + enqueue(actorMsg, NORMAL_PRIORITY);
  201 + }
  202 +
  203 + @Override
  204 + public void tellWithHighPriority(TbActorMsg actorMsg) {
  205 + enqueue(actorMsg, HIGH_PRIORITY);
182 206 }
  207 +
183 208 }
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -23,4 +23,6 @@ public interface TbActorRef {
23 23
24 24 void tell(TbActorMsg actorMsg);
25 25
  26 + void tellWithHighPriority(TbActorMsg actorMsg);
  27 +
26 28 }
... ...
... ... @@ -36,10 +36,10 @@ public interface TbActorSystem {
36 36
37 37 TbActorRef createChildActor(String dispatcherId, TbActorCreator creator, TbActorId parent);
38 38
39   - void tell(TbActorRef target, TbActorMsg actorMsg);
40   -
41 39 void tell(TbActorId target, TbActorMsg actorMsg);
42 40
  41 + void tellWithHighPriority(TbActorId target, TbActorMsg actorMsg);
  42 +
43 43 void stop(TbActorRef actorRef);
44 44
45 45 void stop(TbActorId actorId);
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -42,8 +42,8 @@ public class ActorSystemTest {
42 42 public static final String ROOT_DISPATCHER = "root-dispatcher";
43 43 private static final int _1M = 1024 * 1024;
44 44
45   - private TbActorSystem actorSystem;
46   - private ExecutorService submitPool;
  45 + private volatile TbActorSystem actorSystem;
  46 + private volatile ExecutorService submitPool;
47 47
48 48 @Before
49 49 public void initActorSystem() {
... ... @@ -52,7 +52,11 @@ public class ActorSystemTest {
52 52 TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42);
53 53 actorSystem = new DefaultTbActorSystem(settings);
54 54 submitPool = Executors.newWorkStealingPool(parallelism);
  55 +// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newCachedThreadPool());
  56 +// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newFixedThreadPool(parallelism));
55 57 actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
  58 +// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(1));
  59 +// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newFixedThreadPool(1));
56 60 }
57 61
58 62 @After
... ... @@ -62,18 +66,28 @@ public class ActorSystemTest {
62 66 }
63 67
64 68 @Test
  69 + public void test1actorsAnd1MMessages() throws InterruptedException {
  70 + testActorsAndMessages(1, _1M, 5);
  71 + }
  72 +
  73 + @Test
65 74 public void test10actorsAnd1MMessages() throws InterruptedException {
66   - testActorsAndMessages(10, _1M);
  75 + testActorsAndMessages(10, _1M, 5);
  76 + }
  77 +
  78 + @Test
  79 + public void test1MActorsAnd1Messages5times() throws InterruptedException {
  80 + testActorsAndMessages(_1M, 1, 5);
67 81 }
68 82
69 83 @Test
70 84 public void test1MActorsAnd10Messages() throws InterruptedException {
71   - testActorsAndMessages(_1M, 10);
  85 + testActorsAndMessages(_1M, 10, 1);
72 86 }
73 87
74 88 @Test
75 89 public void test1KActorsAnd1KMessages() throws InterruptedException {
76   - testActorsAndMessages(1000, 1000);
  90 + testActorsAndMessages(1000, 1000, 10);
77 91 }
78 92
79 93 @Test
... ... @@ -86,8 +100,8 @@ public class ActorSystemTest {
86 100 TbActorRef actorId2 = actorSystem.createRootActor(ROOT_DISPATCHER, new SlowInitActor.SlowInitActorCreator(
87 101 new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx2));
88 102
89   - actorSystem.tell(actorId1, new IntTbActorMsg(42));
90   - actorSystem.tell(actorId2, new IntTbActorMsg(42));
  103 + actorId1.tell(new IntTbActorMsg(42));
  104 + actorId2.tell(new IntTbActorMsg(42));
91 105 actorSystem.stop(actorId1);
92 106
93 107 Assert.assertTrue(testCtx2.getLatch().await(1, TimeUnit.SECONDS));
... ... @@ -113,7 +127,7 @@ public class ActorSystemTest {
113 127 public void testActorCreatorCalledOnce() throws InterruptedException {
114 128 ActorTestCtx testCtx = getActorTestCtx(1);
115 129 TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID()));
116   - for(int i =0; i < 1000; i++) {
  130 + for (int i = 0; i < 1000; i++) {
117 131 submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx)));
118 132 }
119 133 Thread.sleep(1000);
... ... @@ -125,7 +139,7 @@ public class ActorSystemTest {
125 139 }
126 140
127 141
128   - public void testActorsAndMessages(int actorsCount, int msgNumber) throws InterruptedException {
  142 + public void testActorsAndMessages(int actorsCount, int msgNumber, int times) throws InterruptedException {
129 143 Random random = new Random();
130 144 int[] randomIntegers = new int[msgNumber];
131 145 long sumTmp = 0;
... ... @@ -141,32 +155,35 @@ public class ActorSystemTest {
141 155 List<TbActorRef> actorRefs = new ArrayList<>();
142 156 for (int actorIdx = 0; actorIdx < actorsCount; actorIdx++) {
143 157 ActorTestCtx testCtx = getActorTestCtx(msgNumber);
144   -
145 158 actorRefs.add(actorSystem.createRootActor(ROOT_DISPATCHER, new TestRootActor.TestRootActorCreator(
146 159 new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx)));
147 160 testCtxes.add(testCtx);
148 161 }
149 162
150   - long start = System.nanoTime();
151   -
152   - for (int i = 0; i < msgNumber; i++) {
153   - int tmp = randomIntegers[i];
154   - submitPool.execute(() -> actorRefs.forEach(actorId -> actorSystem.tell(actorId, new IntTbActorMsg(tmp))));
155   - }
156   - log.info("Submitted all messages");
157   -
158   - testCtxes.forEach(ctx -> {
159   - try {
160   - Assert.assertTrue(ctx.getLatch().await(1, TimeUnit.MINUTES));
161   - Assert.assertEquals(expected, ctx.getActual().get());
162   - Assert.assertEquals(msgNumber, ctx.getInvocationCount().get());
163   - } catch (InterruptedException e) {
164   - e.printStackTrace();
  163 + for (int t = 0; t < times; t++) {
  164 + long start = System.nanoTime();
  165 + for (int i = 0; i < msgNumber; i++) {
  166 + int tmp = randomIntegers[i];
  167 + submitPool.execute(() -> actorRefs.forEach(actorId -> actorId.tell(new IntTbActorMsg(tmp))));
165 168 }
166   - });
167   -
168   - long duration = System.nanoTime() - start;
169   - log.info("Time spend: {}ns ({} ms)", duration, TimeUnit.NANOSECONDS.toMillis(duration));
  169 + log.info("Submitted all messages");
  170 + testCtxes.forEach(ctx -> {
  171 + try {
  172 + boolean success = ctx.getLatch().await(1, TimeUnit.MINUTES);
  173 + if(!success){
  174 + log.warn("Failed: {}, {}", ctx.getActual().get(), ctx.getInvocationCount().get());
  175 + }
  176 + Assert.assertTrue(success);
  177 + Assert.assertEquals(expected, ctx.getActual().get());
  178 + Assert.assertEquals(msgNumber, ctx.getInvocationCount().get());
  179 + ctx.clear();
  180 + } catch (InterruptedException e) {
  181 + e.printStackTrace();
  182 + }
  183 + });
  184 + long duration = System.nanoTime() - start;
  185 + log.info("Time spend: {}ns ({} ms)", duration, TimeUnit.NANOSECONDS.toMillis(duration));
  186 + }
170 187 }
171 188
172 189 private ActorTestCtx getActorTestCtx(int i) {
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.actors;
17 17
  18 +import lombok.AllArgsConstructor;
18 19 import lombok.Data;
19 20
20 21 import java.util.concurrent.CountDownLatch;
... ... @@ -22,10 +23,17 @@ import java.util.concurrent.atomic.AtomicInteger;
22 23 import java.util.concurrent.atomic.AtomicLong;
23 24
24 25 @Data
  26 +@AllArgsConstructor
25 27 public class ActorTestCtx {
26 28
27   - private final CountDownLatch latch;
  29 + private volatile CountDownLatch latch;
28 30 private final AtomicInteger invocationCount;
29 31 private final int expectedInvocationCount;
30 32 private final AtomicLong actual;
  33 +
  34 + public void clear() {
  35 + latch = new CountDownLatch(1);
  36 + invocationCount.set(0);
  37 + actual.set(0L);
  38 + }
31 39 }
... ...
... ... @@ -51,6 +51,8 @@ public class TestRootActor extends AbstractTbActor {
51 51 if (count == testCtx.getExpectedInvocationCount()) {
52 52 testCtx.getActual().set(sum);
53 53 testCtx.getInvocationCount().addAndGet(count);
  54 + sum = 0;
  55 + count = 0;
54 56 testCtx.getLatch().countDown();
55 57 }
56 58 }
... ...