Commit cc4f746b1d343d81d0383e037bd0c9eaff09b9bf

Authored by Andrii Shvaika
1 parent 3358c061

Using Exectutor in Kafka Node. NEVER use Fork-Join pool with parallelism 1

@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,6 +21,7 @@ import org.springframework.beans.factory.annotation.Value; @@ -21,6 +21,7 @@ import org.springframework.beans.factory.annotation.Value;
21 import org.springframework.boot.context.event.ApplicationReadyEvent; 21 import org.springframework.boot.context.event.ApplicationReadyEvent;
22 import org.springframework.context.event.EventListener; 22 import org.springframework.context.event.EventListener;
23 import org.springframework.stereotype.Service; 23 import org.springframework.stereotype.Service;
  24 +import org.thingsboard.common.util.ThingsBoardThreadFactory;
24 import org.thingsboard.server.actors.ActorSystemContext; 25 import org.thingsboard.server.actors.ActorSystemContext;
25 import org.thingsboard.server.actors.DefaultTbActorSystem; 26 import org.thingsboard.server.actors.DefaultTbActorSystem;
26 import org.thingsboard.server.actors.TbActorId; 27 import org.thingsboard.server.actors.TbActorId;
@@ -83,10 +84,10 @@ public class DefaultActorService implements ActorService { @@ -83,10 +84,10 @@ public class DefaultActorService implements ActorService {
83 TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts); 84 TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
84 system = new DefaultTbActorSystem(settings); 85 system = new DefaultTbActorSystem(settings);
85 86
86 - system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(appDispatcherSize));  
87 - system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(tenantDispatcherSize));  
88 - system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(deviceDispatcherSize));  
89 - system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(ruleDispatcherSize)); 87 + system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
  88 + system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
  89 + system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
  90 + system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));
90 91
91 actorContext.setActorSystem(system); 92 actorContext.setActorSystem(system);
92 93
@@ -99,13 +100,13 @@ public class DefaultActorService implements ActorService { @@ -99,13 +100,13 @@ public class DefaultActorService implements ActorService {
99 log.info("Actor system initialized."); 100 log.info("Actor system initialized.");
100 } 101 }
101 102
102 - private ExecutorService initDispatcherExecutor(int poolSize) { 103 + private ExecutorService initDispatcherExecutor(String dispatcherName, int poolSize) {
103 if (poolSize == 0) { 104 if (poolSize == 0) {
104 int cores = Runtime.getRuntime().availableProcessors(); 105 int cores = Runtime.getRuntime().availableProcessors();
105 poolSize = Math.max(1, cores / 2); 106 poolSize = Math.max(1, cores / 2);
106 } 107 }
107 if (poolSize == 1) { 108 if (poolSize == 1) {
108 - return Executors.newFixedThreadPool(1); 109 + return Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(dispatcherName));
109 } else { 110 } else {
110 return Executors.newWorkStealingPool(poolSize); 111 return Executors.newWorkStealingPool(poolSize);
111 } 112 }
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -164,7 +164,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi @@ -164,7 +164,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
164 } 164 }
165 165
166 private void scheduleTimeout(ToDeviceRpcRequest request, UUID requestId) { 166 private void scheduleTimeout(ToDeviceRpcRequest request, UUID requestId) {
167 - long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); 167 + long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1);
168 log.trace("[{}] processing the request: [{}]", this.hashCode(), requestId); 168 log.trace("[{}] processing the request: [{}]", this.hashCode(), requestId);
169 scheduler.schedule(() -> { 169 scheduler.schedule(() -> {
170 log.trace("[{}] timeout the request: [{}]", this.hashCode(), requestId); 170 log.trace("[{}] timeout the request: [{}]", this.hashCode(), requestId);
@@ -26,7 +26,9 @@ import java.util.Arrays; @@ -26,7 +26,9 @@ import java.util.Arrays;
26 26
27 @RunWith(ClasspathSuite.class) 27 @RunWith(ClasspathSuite.class)
28 @ClasspathSuite.ClassnameFilters({ 28 @ClasspathSuite.ClassnameFilters({
29 - "org.thingsboard.server.mqtt.rpc.sql.*Test", "org.thingsboard.server.mqtt.telemetry.sql.*Test"}) 29 + "org.thingsboard.server.mqtt.rpc.sql.*Test",
  30 + "org.thingsboard.server.mqtt.telemetry.sql.*Test"
  31 +})
30 public class MqttSqlTestSuite { 32 public class MqttSqlTestSuite {
31 33
32 @ClassRule 34 @ClassRule
@@ -136,7 +136,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -136,7 +136,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
136 String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"24\",\"value\": 1},\"timeout\": 6000}"; 136 String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"24\",\"value\": 1},\"timeout\": 6000}";
137 String deviceId = savedDevice.getId().getId().toString(); 137 String deviceId = savedDevice.getId().getId().toString();
138 138
139 - doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(), 139 + doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().is(409),
140 asyncContextTimeoutToUseRpcPlugin); 140 asyncContextTimeoutToUseRpcPlugin);
141 } 141 }
142 142
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -54,7 +54,6 @@ public final class TbActorMailbox implements TbActorCtx { @@ -54,7 +54,6 @@ public final class TbActorMailbox implements TbActorCtx {
54 dispatcher.getExecutor().execute(() -> tryInit(1)); 54 dispatcher.getExecutor().execute(() -> tryInit(1));
55 } 55 }
56 56
57 -  
58 private void tryInit(int attempt) { 57 private void tryInit(int attempt) {
59 try { 58 try {
60 log.debug("[{}] Trying to init actor, attempt: {}", selfId, attempt); 59 log.debug("[{}] Trying to init actor, attempt: {}", selfId, attempt);
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -40,23 +40,19 @@ import java.util.concurrent.atomic.AtomicLong; @@ -40,23 +40,19 @@ import java.util.concurrent.atomic.AtomicLong;
40 public class ActorSystemTest { 40 public class ActorSystemTest {
41 41
42 public static final String ROOT_DISPATCHER = "root-dispatcher"; 42 public static final String ROOT_DISPATCHER = "root-dispatcher";
43 - private static final int _1M = 1024 * 1024; 43 + private static final int _100K = 100 * 1024;
44 44
45 private volatile TbActorSystem actorSystem; 45 private volatile TbActorSystem actorSystem;
46 private volatile ExecutorService submitPool; 46 private volatile ExecutorService submitPool;
  47 + private int parallelism;
47 48
48 @Before 49 @Before
49 public void initActorSystem() { 50 public void initActorSystem() {
50 int cores = Runtime.getRuntime().availableProcessors(); 51 int cores = Runtime.getRuntime().availableProcessors();
51 - int parallelism = Math.max(1, cores / 2); 52 + parallelism = Math.max(2, cores / 2);
52 TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42); 53 TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42);
53 actorSystem = new DefaultTbActorSystem(settings); 54 actorSystem = new DefaultTbActorSystem(settings);
54 submitPool = Executors.newWorkStealingPool(parallelism); 55 submitPool = Executors.newWorkStealingPool(parallelism);
55 -// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newCachedThreadPool());  
56 -// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newFixedThreadPool(parallelism));  
57 - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));  
58 -// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(1));  
59 -// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newFixedThreadPool(1));  
60 } 56 }
61 57
62 @After 58 @After
@@ -66,32 +62,44 @@ public class ActorSystemTest { @@ -66,32 +62,44 @@ public class ActorSystemTest {
66 } 62 }
67 63
68 @Test 64 @Test
69 - public void test1actorsAnd1MMessages() throws InterruptedException {  
70 - testActorsAndMessages(1, _1M, 5); 65 + public void test1actorsAnd100KMessages() throws InterruptedException {
  66 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
  67 + testActorsAndMessages(1, _100K, 1);
  68 + }
  69 +
  70 + @Test
  71 + public void test10actorsAnd100KMessages() throws InterruptedException {
  72 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
  73 + testActorsAndMessages(10, _100K, 1);
71 } 74 }
72 75
73 @Test 76 @Test
74 - public void test10actorsAnd1MMessages() throws InterruptedException {  
75 - testActorsAndMessages(10, _1M, 5); 77 + public void test100KActorsAnd1Messages5timesSingleThread() throws InterruptedException {
  78 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newSingleThreadExecutor());
  79 + testActorsAndMessages(_100K, 1, 5);
76 } 80 }
77 81
78 @Test 82 @Test
79 - public void test1MActorsAnd1Messages5times() throws InterruptedException {  
80 - testActorsAndMessages(_1M, 1, 5); 83 + public void test100KActorsAnd1Messages5times() throws InterruptedException {
  84 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
  85 + testActorsAndMessages(_100K, 1, 5);
81 } 86 }
82 87
83 @Test 88 @Test
84 - public void test1MActorsAnd10Messages() throws InterruptedException {  
85 - testActorsAndMessages(_1M, 10, 1); 89 + public void test100KActorsAnd10Messages() throws InterruptedException {
  90 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
  91 + testActorsAndMessages(_100K, 10, 1);
86 } 92 }
87 93
88 @Test 94 @Test
89 public void test1KActorsAnd1KMessages() throws InterruptedException { 95 public void test1KActorsAnd1KMessages() throws InterruptedException {
  96 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
90 testActorsAndMessages(1000, 1000, 10); 97 testActorsAndMessages(1000, 1000, 10);
91 } 98 }
92 99
93 @Test 100 @Test
94 public void testNoMessagesAfterDestroy() throws InterruptedException { 101 public void testNoMessagesAfterDestroy() throws InterruptedException {
  102 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
95 ActorTestCtx testCtx1 = getActorTestCtx(1); 103 ActorTestCtx testCtx1 = getActorTestCtx(1);
96 ActorTestCtx testCtx2 = getActorTestCtx(1); 104 ActorTestCtx testCtx2 = getActorTestCtx(1);
97 105
@@ -105,11 +113,12 @@ public class ActorSystemTest { @@ -105,11 +113,12 @@ public class ActorSystemTest {
105 actorSystem.stop(actorId1); 113 actorSystem.stop(actorId1);
106 114
107 Assert.assertTrue(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); 115 Assert.assertTrue(testCtx2.getLatch().await(1, TimeUnit.SECONDS));
108 - Assert.assertFalse(testCtx1.getLatch().await(2, TimeUnit.SECONDS)); 116 + Assert.assertFalse(testCtx1.getLatch().await(1, TimeUnit.SECONDS));
109 } 117 }
110 118
111 @Test 119 @Test
112 public void testOneActorCreated() throws InterruptedException { 120 public void testOneActorCreated() throws InterruptedException {
  121 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
113 ActorTestCtx testCtx1 = getActorTestCtx(1); 122 ActorTestCtx testCtx1 = getActorTestCtx(1);
114 ActorTestCtx testCtx2 = getActorTestCtx(1); 123 ActorTestCtx testCtx2 = getActorTestCtx(1);
115 TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); 124 TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID()));
@@ -119,12 +128,13 @@ public class ActorSystemTest { @@ -119,12 +128,13 @@ public class ActorSystemTest {
119 Thread.sleep(1000); 128 Thread.sleep(1000);
120 actorSystem.tell(actorId, new IntTbActorMsg(42)); 129 actorSystem.tell(actorId, new IntTbActorMsg(42));
121 130
122 - Assert.assertTrue(testCtx1.getLatch().await(3, TimeUnit.SECONDS));  
123 - Assert.assertFalse(testCtx2.getLatch().await(3, TimeUnit.SECONDS)); 131 + Assert.assertTrue(testCtx1.getLatch().await(1, TimeUnit.SECONDS));
  132 + Assert.assertFalse(testCtx2.getLatch().await(1, TimeUnit.SECONDS));
124 } 133 }
125 134
126 @Test 135 @Test
127 public void testActorCreatorCalledOnce() throws InterruptedException { 136 public void testActorCreatorCalledOnce() throws InterruptedException {
  137 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
128 ActorTestCtx testCtx = getActorTestCtx(1); 138 ActorTestCtx testCtx = getActorTestCtx(1);
129 TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); 139 TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID()));
130 for (int i = 0; i < 1000; i++) { 140 for (int i = 0; i < 1000; i++) {
@@ -170,7 +180,7 @@ public class ActorSystemTest { @@ -170,7 +180,7 @@ public class ActorSystemTest {
170 testCtxes.forEach(ctx -> { 180 testCtxes.forEach(ctx -> {
171 try { 181 try {
172 boolean success = ctx.getLatch().await(1, TimeUnit.MINUTES); 182 boolean success = ctx.getLatch().await(1, TimeUnit.MINUTES);
173 - if(!success){ 183 + if (!success) {
174 log.warn("Failed: {}, {}", ctx.getActual().get(), ctx.getInvocationCount().get()); 184 log.warn("Failed: {}, {}", ctx.getActual().get(), ctx.getInvocationCount().get());
175 } 185 }
176 Assert.assertTrue(success); 186 Assert.assertTrue(success);
@@ -96,7 +96,19 @@ public class TbKafkaNode implements TbNode { @@ -96,7 +96,19 @@ public class TbKafkaNode implements TbNode {
96 public void onMsg(TbContext ctx, TbMsg msg) { 96 public void onMsg(TbContext ctx, TbMsg msg) {
97 String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg.getMetaData()); 97 String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg.getMetaData());
98 try { 98 try {
  99 + ctx.getExternalCallExecutor().executeAsync(() -> {
  100 + publish(ctx, msg, topic);
  101 + return null;
  102 + });
  103 + } catch (Exception e) {
  104 + ctx.tellFailure(msg, e);
  105 + }
  106 + }
  107 +
  108 + protected void publish(TbContext ctx, TbMsg msg, String topic) {
  109 + try {
99 if (!addMetadataKeyValuesAsKafkaHeaders) { 110 if (!addMetadataKeyValuesAsKafkaHeaders) {
  111 + //TODO: external system executor
100 producer.send(new ProducerRecord<>(topic, msg.getData()), 112 producer.send(new ProducerRecord<>(topic, msg.getData()),
101 (metadata, e) -> processRecord(ctx, msg, metadata, e)); 113 (metadata, e) -> processRecord(ctx, msg, metadata, e));
102 } else { 114 } else {
@@ -105,9 +117,8 @@ public class TbKafkaNode implements TbNode { @@ -105,9 +117,8 @@ public class TbKafkaNode implements TbNode {
105 producer.send(new ProducerRecord<>(topic, null, null, null, msg.getData(), headers), 117 producer.send(new ProducerRecord<>(topic, null, null, null, msg.getData(), headers),
106 (metadata, e) -> processRecord(ctx, msg, metadata, e)); 118 (metadata, e) -> processRecord(ctx, msg, metadata, e));
107 } 119 }
108 -  
109 } catch (Exception e) { 120 } catch (Exception e) {
110 - ctx.tellFailure(msg, e); 121 + log.debug("[{}] Failed to process message: {}", ctx.getSelfId(), msg, e);
111 } 122 }
112 } 123 }
113 124