Commit 5a0336aae21bb1ad293a035e52d1b1d65ef9aa8c
Committed by
Andrew Shvayka
1 parent
053d1d8e
tests: refactored actor system Test (Thread.sleep replaced with countDownLatch)
Showing
2 changed files
with
39 additions
and
13 deletions
@@ -41,6 +41,7 @@ public class ActorSystemTest { | @@ -41,6 +41,7 @@ public class ActorSystemTest { | ||
41 | 41 | ||
42 | public static final String ROOT_DISPATCHER = "root-dispatcher"; | 42 | public static final String ROOT_DISPATCHER = "root-dispatcher"; |
43 | private static final int _100K = 100 * 1024; | 43 | private static final int _100K = 100 * 1024; |
44 | + public static final int TIMEOUT_AWAIT_MAX_SEC = 10; | ||
44 | 45 | ||
45 | private volatile TbActorSystem actorSystem; | 46 | private volatile TbActorSystem actorSystem; |
46 | private volatile ExecutorService submitPool; | 47 | private volatile ExecutorService submitPool; |
@@ -52,7 +53,7 @@ public class ActorSystemTest { | @@ -52,7 +53,7 @@ public class ActorSystemTest { | ||
52 | parallelism = Math.max(2, cores / 2); | 53 | parallelism = Math.max(2, cores / 2); |
53 | TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42); | 54 | TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42); |
54 | actorSystem = new DefaultTbActorSystem(settings); | 55 | actorSystem = new DefaultTbActorSystem(settings); |
55 | - submitPool = Executors.newWorkStealingPool(parallelism); | 56 | + submitPool = Executors.newFixedThreadPool(parallelism); //order guaranteed |
56 | } | 57 | } |
57 | 58 | ||
58 | @After | 59 | @After |
@@ -122,13 +123,23 @@ public class ActorSystemTest { | @@ -122,13 +123,23 @@ public class ActorSystemTest { | ||
122 | ActorTestCtx testCtx1 = getActorTestCtx(1); | 123 | ActorTestCtx testCtx1 = getActorTestCtx(1); |
123 | ActorTestCtx testCtx2 = getActorTestCtx(1); | 124 | ActorTestCtx testCtx2 = getActorTestCtx(1); |
124 | TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); | 125 | TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); |
125 | - submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx1))); | ||
126 | - submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx2))); | 126 | + final CountDownLatch initLatch = new CountDownLatch(1); |
127 | + final CountDownLatch actorsReadyLatch = new CountDownLatch(2); | ||
128 | + submitPool.submit(() -> { | ||
129 | + actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx1, initLatch)); | ||
130 | + actorsReadyLatch.countDown(); | ||
131 | + }); | ||
132 | + submitPool.submit(() -> { | ||
133 | + actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx2, initLatch)); | ||
134 | + actorsReadyLatch.countDown(); | ||
135 | + }); | ||
136 | + | ||
137 | + initLatch.countDown(); //replacement for Thread.wait(500) in the SlowCreateActorCreator | ||
138 | + Assert.assertTrue(actorsReadyLatch.await(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS)); | ||
127 | 139 | ||
128 | - Thread.sleep(1000); | ||
129 | actorSystem.tell(actorId, new IntTbActorMsg(42)); | 140 | actorSystem.tell(actorId, new IntTbActorMsg(42)); |
130 | 141 | ||
131 | - Assert.assertTrue(testCtx1.getLatch().await(1, TimeUnit.SECONDS)); | 142 | + Assert.assertTrue(testCtx1.getLatch().await(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS)); |
132 | Assert.assertFalse(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); | 143 | Assert.assertFalse(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); |
133 | } | 144 | } |
134 | 145 | ||
@@ -137,13 +148,21 @@ public class ActorSystemTest { | @@ -137,13 +148,21 @@ public class ActorSystemTest { | ||
137 | actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); | 148 | actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); |
138 | ActorTestCtx testCtx = getActorTestCtx(1); | 149 | ActorTestCtx testCtx = getActorTestCtx(1); |
139 | TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); | 150 | TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); |
140 | - for (int i = 0; i < 1000; i++) { | ||
141 | - submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx))); | 151 | + final int actorsCount = 1000; |
152 | + final CountDownLatch initLatch = new CountDownLatch(1); | ||
153 | + final CountDownLatch actorsReadyLatch = new CountDownLatch(actorsCount); | ||
154 | + for (int i = 0; i < actorsCount; i++) { | ||
155 | + submitPool.submit(() -> { | ||
156 | + actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx, initLatch)); | ||
157 | + actorsReadyLatch.countDown(); | ||
158 | + }); | ||
142 | } | 159 | } |
143 | - Thread.sleep(1000); | 160 | + initLatch.countDown(); |
161 | + Assert.assertTrue(actorsReadyLatch.await(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS)); | ||
162 | + | ||
144 | actorSystem.tell(actorId, new IntTbActorMsg(42)); | 163 | actorSystem.tell(actorId, new IntTbActorMsg(42)); |
145 | 164 | ||
146 | - Assert.assertTrue(testCtx.getLatch().await(1, TimeUnit.SECONDS)); | 165 | + Assert.assertTrue(testCtx.getLatch().await(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS)); |
147 | //One for creation and one for message | 166 | //One for creation and one for message |
148 | Assert.assertEquals(2, testCtx.getInvocationCount().get()); | 167 | Assert.assertEquals(2, testCtx.getInvocationCount().get()); |
149 | } | 168 | } |
@@ -17,13 +17,18 @@ package org.thingsboard.server.actors; | @@ -17,13 +17,18 @@ package org.thingsboard.server.actors; | ||
17 | 17 | ||
18 | import lombok.extern.slf4j.Slf4j; | 18 | import lombok.extern.slf4j.Slf4j; |
19 | 19 | ||
20 | +import java.util.concurrent.CountDownLatch; | ||
21 | +import java.util.concurrent.TimeUnit; | ||
22 | + | ||
20 | @Slf4j | 23 | @Slf4j |
21 | public class SlowCreateActor extends TestRootActor { | 24 | public class SlowCreateActor extends TestRootActor { |
22 | 25 | ||
23 | - public SlowCreateActor(TbActorId actorId, ActorTestCtx testCtx) { | 26 | + public static final int TIMEOUT_AWAIT_MAX_MS = 5000; |
27 | + | ||
28 | + public SlowCreateActor(TbActorId actorId, ActorTestCtx testCtx, CountDownLatch initLatch) { | ||
24 | super(actorId, testCtx); | 29 | super(actorId, testCtx); |
25 | try { | 30 | try { |
26 | - Thread.sleep(500); | 31 | + initLatch.await(TIMEOUT_AWAIT_MAX_MS, TimeUnit.MILLISECONDS); |
27 | } catch (InterruptedException e) { | 32 | } catch (InterruptedException e) { |
28 | e.printStackTrace(); | 33 | e.printStackTrace(); |
29 | } | 34 | } |
@@ -34,10 +39,12 @@ public class SlowCreateActor extends TestRootActor { | @@ -34,10 +39,12 @@ public class SlowCreateActor extends TestRootActor { | ||
34 | 39 | ||
35 | private final TbActorId actorId; | 40 | private final TbActorId actorId; |
36 | private final ActorTestCtx testCtx; | 41 | private final ActorTestCtx testCtx; |
42 | + private final CountDownLatch initLatch; | ||
37 | 43 | ||
38 | - public SlowCreateActorCreator(TbActorId actorId, ActorTestCtx testCtx) { | 44 | + public SlowCreateActorCreator(TbActorId actorId, ActorTestCtx testCtx, CountDownLatch initLatch) { |
39 | this.actorId = actorId; | 45 | this.actorId = actorId; |
40 | this.testCtx = testCtx; | 46 | this.testCtx = testCtx; |
47 | + this.initLatch = initLatch; | ||
41 | } | 48 | } |
42 | 49 | ||
43 | @Override | 50 | @Override |
@@ -47,7 +54,7 @@ public class SlowCreateActor extends TestRootActor { | @@ -47,7 +54,7 @@ public class SlowCreateActor extends TestRootActor { | ||
47 | 54 | ||
48 | @Override | 55 | @Override |
49 | public TbActor createActor() { | 56 | public TbActor createActor() { |
50 | - return new SlowCreateActor(actorId, testCtx); | 57 | + return new SlowCreateActor(actorId, testCtx, initLatch); |
51 | } | 58 | } |
52 | } | 59 | } |
53 | } | 60 | } |