Commit 8f54fc398951310e5be3a0f1183ce388ae1189a7

Authored by Igor Kulikov
2 parents d1fc5d07 cc4f746b

Merge branch 'develop/2.5.2'

Showing 20 changed files with 192 additions and 83 deletions
@@ -514,6 +514,9 @@ public class ActorSystemContext { @@ -514,6 +514,9 @@ public class ActorSystemContext {
514 appActor.tell(tbActorMsg); 514 appActor.tell(tbActorMsg);
515 } 515 }
516 516
  517 + public void tellWithHighPriority(TbActorMsg tbActorMsg) {
  518 + appActor.tellWithHighPriority(tbActorMsg);
  519 + }
517 520
518 public void schedulePeriodicMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs, long periodInMs) { 521 public void schedulePeriodicMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs, long periodInMs) {
519 log.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs); 522 log.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs);
@@ -82,12 +82,14 @@ public class AppActor extends ContextAwareActor { @@ -82,12 +82,14 @@ public class AppActor extends ContextAwareActor {
82 onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg); 82 onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
83 break; 83 break;
84 case TRANSPORT_TO_DEVICE_ACTOR_MSG: 84 case TRANSPORT_TO_DEVICE_ACTOR_MSG:
  85 + onToDeviceActorMsg((TenantAwareMsg) msg, false);
  86 + break;
85 case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG: 87 case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
86 case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG: 88 case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
87 case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG: 89 case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
88 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: 90 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
89 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: 91 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
90 - onToDeviceActorMsg((TenantAwareMsg) msg); 92 + onToDeviceActorMsg((TenantAwareMsg) msg, true);
91 break; 93 break;
92 default: 94 default:
93 return false; 95 return false;
@@ -155,15 +157,20 @@ public class AppActor extends ContextAwareActor { @@ -155,15 +157,20 @@ public class AppActor extends ContextAwareActor {
155 } 157 }
156 } 158 }
157 if (target != null) { 159 if (target != null) {
158 - target.tell(msg); 160 + target.tellWithHighPriority(msg);
159 } else { 161 } else {
160 log.debug("[{}] Invalid component lifecycle msg: {}", msg.getTenantId(), msg); 162 log.debug("[{}] Invalid component lifecycle msg: {}", msg.getTenantId(), msg);
161 } 163 }
162 } 164 }
163 165
164 - private void onToDeviceActorMsg(TenantAwareMsg msg) { 166 + private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) {
165 if (!deletedTenants.contains(msg.getTenantId())) { 167 if (!deletedTenants.contains(msg.getTenantId())) {
166 - getOrCreateTenantActor(msg.getTenantId()).tell(msg); 168 + TbActorRef tenantActor = getOrCreateTenantActor(msg.getTenantId());
  169 + if (priority) {
  170 + tenantActor.tellWithHighPriority(msg);
  171 + } else {
  172 + tenantActor.tell(msg);
  173 + }
167 } else { 174 } else {
168 if (msg instanceof TransportToDeviceActorMsgWrapper) { 175 if (msg instanceof TransportToDeviceActorMsgWrapper) {
169 ((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess(); 176 ((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess();
@@ -128,7 +128,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -128,7 +128,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
128 } else { 128 } else {
129 log.trace("[{}][{}] Updating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode); 129 log.trace("[{}][{}] Updating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
130 existing.setSelf(ruleNode); 130 existing.setSelf(ruleNode);
131 - existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED)); 131 + existing.getSelfActor().tellWithHighPriority(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED));
132 } 132 }
133 } 133 }
134 134
@@ -137,7 +137,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -137,7 +137,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
137 removedRules.forEach(ruleNodeId -> { 137 removedRules.forEach(ruleNodeId -> {
138 log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId); 138 log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId);
139 RuleNodeCtx removed = nodeActors.remove(ruleNodeId); 139 RuleNodeCtx removed = nodeActors.remove(ruleNodeId);
140 - removed.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED)); 140 + removed.getSelfActor().tellWithHighPriority(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED));
141 }); 141 });
142 142
143 initRoutes(ruleChain, ruleNodeList); 143 initRoutes(ruleChain, ruleNodeList);
@@ -155,7 +155,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -155,7 +155,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
155 155
156 @Override 156 @Override
157 public void onPartitionChangeMsg(PartitionChangeMsg msg) { 157 public void onPartitionChangeMsg(PartitionChangeMsg msg) {
158 - nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(actorRef -> actorRef.tell(msg)); 158 + nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(actorRef -> actorRef.tellWithHighPriority(msg));
159 } 159 }
160 160
161 private TbActorRef createRuleNodeActor(TbActorCtx ctx, RuleNode ruleNode) { 161 private TbActorRef createRuleNodeActor(TbActorCtx ctx, RuleNode ruleNode) {
@@ -83,7 +83,9 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP @@ -83,7 +83,9 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
83 public void destroy() { 83 public void destroy() {
84 try { 84 try {
85 log.debug("[{}][{}][{}] Stopping processor.", tenantId, id, id.getEntityType()); 85 log.debug("[{}][{}][{}] Stopping processor.", tenantId, id, id.getEntityType());
86 - processor.stop(ctx); 86 + if (processor != null) {
  87 + processor.stop(ctx);
  88 + }
87 logLifecycleEvent(ComponentLifecycleEvent.STOPPED); 89 logLifecycleEvent(ComponentLifecycleEvent.STOPPED);
88 } catch (Exception e) { 90 } catch (Exception e) {
89 log.warn("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage()); 91 log.warn("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage());
@@ -21,6 +21,7 @@ import org.springframework.beans.factory.annotation.Value; @@ -21,6 +21,7 @@ import org.springframework.beans.factory.annotation.Value;
21 import org.springframework.boot.context.event.ApplicationReadyEvent; 21 import org.springframework.boot.context.event.ApplicationReadyEvent;
22 import org.springframework.context.event.EventListener; 22 import org.springframework.context.event.EventListener;
23 import org.springframework.stereotype.Service; 23 import org.springframework.stereotype.Service;
  24 +import org.thingsboard.common.util.ThingsBoardThreadFactory;
24 import org.thingsboard.server.actors.ActorSystemContext; 25 import org.thingsboard.server.actors.ActorSystemContext;
25 import org.thingsboard.server.actors.DefaultTbActorSystem; 26 import org.thingsboard.server.actors.DefaultTbActorSystem;
26 import org.thingsboard.server.actors.TbActorId; 27 import org.thingsboard.server.actors.TbActorId;
@@ -83,10 +84,10 @@ public class DefaultActorService implements ActorService { @@ -83,10 +84,10 @@ public class DefaultActorService implements ActorService {
83 TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts); 84 TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
84 system = new DefaultTbActorSystem(settings); 85 system = new DefaultTbActorSystem(settings);
85 86
86 - system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(appDispatcherSize));  
87 - system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(tenantDispatcherSize));  
88 - system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(deviceDispatcherSize));  
89 - system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(ruleDispatcherSize)); 87 + system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
  88 + system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
  89 + system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
  90 + system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));
90 91
91 actorContext.setActorSystem(system); 92 actorContext.setActorSystem(system);
92 93
@@ -99,24 +100,28 @@ public class DefaultActorService implements ActorService { @@ -99,24 +100,28 @@ public class DefaultActorService implements ActorService {
99 log.info("Actor system initialized."); 100 log.info("Actor system initialized.");
100 } 101 }
101 102
102 - private ExecutorService initDispatcherExecutor(int poolSize) { 103 + private ExecutorService initDispatcherExecutor(String dispatcherName, int poolSize) {
103 if (poolSize == 0) { 104 if (poolSize == 0) {
104 int cores = Runtime.getRuntime().availableProcessors(); 105 int cores = Runtime.getRuntime().availableProcessors();
105 poolSize = Math.max(1, cores / 2); 106 poolSize = Math.max(1, cores / 2);
106 } 107 }
107 - return Executors.newWorkStealingPool(poolSize); 108 + if (poolSize == 1) {
  109 + return Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(dispatcherName));
  110 + } else {
  111 + return Executors.newWorkStealingPool(poolSize);
  112 + }
108 } 113 }
109 114
110 @EventListener(ApplicationReadyEvent.class) 115 @EventListener(ApplicationReadyEvent.class)
111 public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { 116 public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
112 log.info("Received application ready event. Sending application init message to actor system"); 117 log.info("Received application ready event. Sending application init message to actor system");
113 - appActor.tell(new AppInitMsg()); 118 + appActor.tellWithHighPriority(new AppInitMsg());
114 } 119 }
115 120
116 @EventListener(PartitionChangeEvent.class) 121 @EventListener(PartitionChangeEvent.class)
117 public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) { 122 public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
118 log.info("Received partition change event."); 123 log.info("Received partition change event.");
119 - this.appActor.tell(new PartitionChangeMsg(partitionChangeEvent.getServiceQueueKey(), partitionChangeEvent.getPartitions())); 124 + this.appActor.tellWithHighPriority(new PartitionChangeMsg(partitionChangeEvent.getServiceQueueKey(), partitionChangeEvent.getPartitions()));
120 } 125 }
121 126
122 @PreDestroy 127 @PreDestroy
@@ -135,12 +135,14 @@ public class TenantActor extends RuleChainManagerActor { @@ -135,12 +135,14 @@ public class TenantActor extends RuleChainManagerActor {
135 onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg); 135 onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
136 break; 136 break;
137 case TRANSPORT_TO_DEVICE_ACTOR_MSG: 137 case TRANSPORT_TO_DEVICE_ACTOR_MSG:
  138 + onToDeviceActorMsg((DeviceAwareMsg) msg, false);
  139 + break;
138 case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG: 140 case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
139 case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG: 141 case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
140 case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG: 142 case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
141 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: 143 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
142 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: 144 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
143 - onToDeviceActorMsg((DeviceAwareMsg) msg); 145 + onToDeviceActorMsg((DeviceAwareMsg) msg, true);
144 break; 146 break;
145 case RULE_CHAIN_TO_RULE_CHAIN_MSG: 147 case RULE_CHAIN_TO_RULE_CHAIN_MSG:
146 onRuleChainMsg((RuleChainAwareMsg) msg); 148 onRuleChainMsg((RuleChainAwareMsg) msg);
@@ -183,11 +185,16 @@ public class TenantActor extends RuleChainManagerActor { @@ -183,11 +185,16 @@ public class TenantActor extends RuleChainManagerActor {
183 getOrCreateActor(msg.getRuleChainId()).tell(msg); 185 getOrCreateActor(msg.getRuleChainId()).tell(msg);
184 } 186 }
185 187
186 - private void onToDeviceActorMsg(DeviceAwareMsg msg) { 188 + private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) {
187 if (!isCore) { 189 if (!isCore) {
188 log.warn("RECEIVED INVALID MESSAGE: {}", msg); 190 log.warn("RECEIVED INVALID MESSAGE: {}", msg);
189 } 191 }
190 - getOrCreateDeviceActor(msg.getDeviceId()).tell(msg); 192 + TbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId());
  193 + if (priority) {
  194 + deviceActor.tellWithHighPriority(msg);
  195 + } else {
  196 + deviceActor.tell(msg);
  197 + }
191 } 198 }
192 199
193 private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) { 200 private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
@@ -199,7 +206,7 @@ public class TenantActor extends RuleChainManagerActor { @@ -199,7 +206,7 @@ public class TenantActor extends RuleChainManagerActor {
199 findRuleChainById(tenantId, new RuleChainId(msg.getEntityId().getId())); 206 findRuleChainById(tenantId, new RuleChainId(msg.getEntityId().getId()));
200 visit(ruleChain, target); 207 visit(ruleChain, target);
201 } 208 }
202 - target.tell(msg); 209 + target.tellWithHighPriority(msg);
203 } else { 210 } else {
204 log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg); 211 log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg);
205 } 212 }
@@ -207,7 +207,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore @@ -207,7 +207,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
207 Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreNotification.getComponentLifecycleMsg().toByteArray()); 207 Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreNotification.getComponentLifecycleMsg().toByteArray());
208 if (actorMsg.isPresent()) { 208 if (actorMsg.isPresent()) {
209 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); 209 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
210 - actorContext.tell(actorMsg.get()); 210 + actorContext.tellWithHighPriority(actorMsg.get());
211 } 211 }
212 callback.onSuccess(); 212 callback.onSuccess();
213 } 213 }
@@ -230,7 +230,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< @@ -230,7 +230,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
230 Optional<TbActorMsg> actorMsg = encodingService.decode(nfMsg.getComponentLifecycleMsg().toByteArray()); 230 Optional<TbActorMsg> actorMsg = encodingService.decode(nfMsg.getComponentLifecycleMsg().toByteArray());
231 if (actorMsg.isPresent()) { 231 if (actorMsg.isPresent()) {
232 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); 232 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
233 - actorContext.tell(actorMsg.get()); 233 + actorContext.tellWithHighPriority(actorMsg.get());
234 } 234 }
235 callback.onSuccess(); 235 callback.onSuccess();
236 } else if (nfMsg.hasFromDeviceRpcResponse()) { 236 } else if (nfMsg.hasFromDeviceRpcResponse()) {
@@ -121,7 +121,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { @@ -121,7 +121,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
121 log.trace("[{}][{}] Processing local rpc call to device actor [{}]", request.getTenantId(), request.getId(), request.getDeviceId()); 121 log.trace("[{}][{}] Processing local rpc call to device actor [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
122 UUID requestId = request.getId(); 122 UUID requestId = request.getId();
123 localToDeviceRpcRequests.put(requestId, rpcMsg); 123 localToDeviceRpcRequests.put(requestId, rpcMsg);
124 - actorContext.tell(rpcMsg); 124 + actorContext.tellWithHighPriority(rpcMsg);
125 scheduleToDeviceTimeout(request, requestId); 125 scheduleToDeviceTimeout(request, requestId);
126 } 126 }
127 127
@@ -175,7 +175,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { @@ -175,7 +175,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
175 } 175 }
176 176
177 private void scheduleToRuleEngineTimeout(ToDeviceRpcRequest request, UUID requestId) { 177 private void scheduleToRuleEngineTimeout(ToDeviceRpcRequest request, UUID requestId) {
178 - long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); 178 + long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1);
179 log.trace("[{}] processing to rule engine request.", requestId); 179 log.trace("[{}] processing to rule engine request.", requestId);
180 scheduler.schedule(() -> { 180 scheduler.schedule(() -> {
181 log.trace("[{}] timeout for processing to rule engine request.", requestId); 181 log.trace("[{}] timeout for processing to rule engine request.", requestId);
@@ -187,7 +187,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { @@ -187,7 +187,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
187 } 187 }
188 188
189 private void scheduleToDeviceTimeout(ToDeviceRpcRequest request, UUID requestId) { 189 private void scheduleToDeviceTimeout(ToDeviceRpcRequest request, UUID requestId) {
190 - long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); 190 + long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1);
191 log.trace("[{}] processing to device request.", requestId); 191 log.trace("[{}] processing to device request.", requestId);
192 scheduler.schedule(() -> { 192 scheduler.schedule(() -> {
193 log.trace("[{}] timeout for to device request.", requestId); 193 log.trace("[{}] timeout for to device request.", requestId);
@@ -164,7 +164,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi @@ -164,7 +164,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
164 } 164 }
165 165
166 private void scheduleTimeout(ToDeviceRpcRequest request, UUID requestId) { 166 private void scheduleTimeout(ToDeviceRpcRequest request, UUID requestId) {
167 - long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); 167 + long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1);
168 log.trace("[{}] processing the request: [{}]", this.hashCode(), requestId); 168 log.trace("[{}] processing the request: [{}]", this.hashCode(), requestId);
169 scheduler.schedule(() -> { 169 scheduler.schedule(() -> {
170 log.trace("[{}] timeout the request: [{}]", this.hashCode(), requestId); 170 log.trace("[{}] timeout the request: [{}]", this.hashCode(), requestId);
@@ -26,7 +26,9 @@ import java.util.Arrays; @@ -26,7 +26,9 @@ import java.util.Arrays;
26 26
27 @RunWith(ClasspathSuite.class) 27 @RunWith(ClasspathSuite.class)
28 @ClasspathSuite.ClassnameFilters({ 28 @ClasspathSuite.ClassnameFilters({
29 - "org.thingsboard.server.mqtt.rpc.sql.*Test", "org.thingsboard.server.mqtt.telemetry.sql.*Test"}) 29 + "org.thingsboard.server.mqtt.rpc.sql.*Test",
  30 + "org.thingsboard.server.mqtt.telemetry.sql.*Test"
  31 +})
30 public class MqttSqlTestSuite { 32 public class MqttSqlTestSuite {
31 33
32 @ClassRule 34 @ClassRule
@@ -136,7 +136,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -136,7 +136,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
136 String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"24\",\"value\": 1},\"timeout\": 6000}"; 136 String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"24\",\"value\": 1},\"timeout\": 6000}";
137 String deviceId = savedDevice.getId().getId().toString(); 137 String deviceId = savedDevice.getId().getId().toString();
138 138
139 - doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(), 139 + doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().is(409),
140 asyncContextTimeoutToUseRpcPlugin); 140 asyncContextTimeoutToUseRpcPlugin);
141 } 141 }
142 142
@@ -193,7 +193,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -193,7 +193,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
193 String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"27\",\"value\": 1},\"timeout\": 6000}"; 193 String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"27\",\"value\": 1},\"timeout\": 6000}";
194 String deviceId = savedDevice.getId().getId().toString(); 194 String deviceId = savedDevice.getId().getId().toString();
195 195
196 - doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(), 196 + doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().is(409),
197 asyncContextTimeoutToUseRpcPlugin); 197 asyncContextTimeoutToUseRpcPlugin);
198 } 198 }
199 199
@@ -132,19 +132,28 @@ public class DefaultTbActorSystem implements TbActorSystem { @@ -132,19 +132,28 @@ public class DefaultTbActorSystem implements TbActorSystem {
132 } 132 }
133 133
134 @Override 134 @Override
135 - public void tell(TbActorRef target, TbActorMsg actorMsg) {  
136 - target.tell(actorMsg); 135 + public void tellWithHighPriority(TbActorId target, TbActorMsg actorMsg) {
  136 + tell(target, actorMsg, true);
137 } 137 }
138 138
139 @Override 139 @Override
140 public void tell(TbActorId target, TbActorMsg actorMsg) { 140 public void tell(TbActorId target, TbActorMsg actorMsg) {
  141 + tell(target, actorMsg, false);
  142 + }
  143 +
  144 + private void tell(TbActorId target, TbActorMsg actorMsg, boolean highPriority) {
141 TbActorMailbox mailbox = actors.get(target); 145 TbActorMailbox mailbox = actors.get(target);
142 if (mailbox == null) { 146 if (mailbox == null) {
143 throw new TbActorNotRegisteredException(target, "Actor with id [" + target + "] is not registered!"); 147 throw new TbActorNotRegisteredException(target, "Actor with id [" + target + "] is not registered!");
144 } 148 }
145 - mailbox.enqueue(actorMsg); 149 + if (highPriority) {
  150 + mailbox.tellWithHighPriority(actorMsg);
  151 + } else {
  152 + mailbox.tell(actorMsg);
  153 + }
146 } 154 }
147 155
  156 +
148 @Override 157 @Override
149 public void broadcastToChildren(TbActorId parent, TbActorMsg msg) { 158 public void broadcastToChildren(TbActorId parent, TbActorMsg msg) {
150 broadcastToChildren(parent, id -> true, msg); 159 broadcastToChildren(parent, id -> true, msg);
@@ -29,6 +29,9 @@ import java.util.function.Supplier; @@ -29,6 +29,9 @@ import java.util.function.Supplier;
29 @Slf4j 29 @Slf4j
30 @Data 30 @Data
31 public final class TbActorMailbox implements TbActorCtx { 31 public final class TbActorMailbox implements TbActorCtx {
  32 + private static final boolean HIGH_PRIORITY = true;
  33 + private static final boolean NORMAL_PRIORITY = false;
  34 +
32 private static final boolean FREE = false; 35 private static final boolean FREE = false;
33 private static final boolean BUSY = true; 36 private static final boolean BUSY = true;
34 37
@@ -41,7 +44,8 @@ public final class TbActorMailbox implements TbActorCtx { @@ -41,7 +44,8 @@ public final class TbActorMailbox implements TbActorCtx {
41 private final TbActorRef parentRef; 44 private final TbActorRef parentRef;
42 private final TbActor actor; 45 private final TbActor actor;
43 private final Dispatcher dispatcher; 46 private final Dispatcher dispatcher;
44 - private final ConcurrentLinkedQueue<TbActorMsg> msgs = new ConcurrentLinkedQueue<>(); 47 + private final ConcurrentLinkedQueue<TbActorMsg> highPriorityMsgs = new ConcurrentLinkedQueue<>();
  48 + private final ConcurrentLinkedQueue<TbActorMsg> normalPriorityMsgs = new ConcurrentLinkedQueue<>();
45 private final AtomicBoolean busy = new AtomicBoolean(FREE); 49 private final AtomicBoolean busy = new AtomicBoolean(FREE);
46 private final AtomicBoolean ready = new AtomicBoolean(NOT_READY); 50 private final AtomicBoolean ready = new AtomicBoolean(NOT_READY);
47 private final AtomicBoolean destroyInProgress = new AtomicBoolean(); 51 private final AtomicBoolean destroyInProgress = new AtomicBoolean();
@@ -50,7 +54,6 @@ public final class TbActorMailbox implements TbActorCtx { @@ -50,7 +54,6 @@ public final class TbActorMailbox implements TbActorCtx {
50 dispatcher.getExecutor().execute(() -> tryInit(1)); 54 dispatcher.getExecutor().execute(() -> tryInit(1));
51 } 55 }
52 56
53 -  
54 private void tryInit(int attempt) { 57 private void tryInit(int attempt) {
55 try { 58 try {
56 log.debug("[{}] Trying to init actor, attempt: {}", selfId, attempt); 59 log.debug("[{}] Trying to init actor, attempt: {}", selfId, attempt);
@@ -78,23 +81,38 @@ public final class TbActorMailbox implements TbActorCtx { @@ -78,23 +81,38 @@ public final class TbActorMailbox implements TbActorCtx {
78 } 81 }
79 } 82 }
80 83
81 - public void enqueue(TbActorMsg msg) {  
82 - msgs.add(msg); 84 + private void enqueue(TbActorMsg msg, boolean highPriority) {
  85 + if (highPriority) {
  86 + highPriorityMsgs.add(msg);
  87 + } else {
  88 + normalPriorityMsgs.add(msg);
  89 + }
83 tryProcessQueue(true); 90 tryProcessQueue(true);
84 } 91 }
85 92
86 private void tryProcessQueue(boolean newMsg) { 93 private void tryProcessQueue(boolean newMsg) {
87 - if (ready.get() == READY && (newMsg || !msgs.isEmpty()) && busy.compareAndSet(FREE, BUSY)) {  
88 - dispatcher.getExecutor().execute(this::processMailbox); 94 + if (ready.get() == READY) {
  95 + if (newMsg || !highPriorityMsgs.isEmpty() || !normalPriorityMsgs.isEmpty()) {
  96 + if (busy.compareAndSet(FREE, BUSY)) {
  97 + dispatcher.getExecutor().execute(this::processMailbox);
  98 + } else {
  99 + log.trace("[{}] MessageBox is busy, new msg: {}", selfId, newMsg);
  100 + }
  101 + } else {
  102 + log.trace("[{}] MessageBox is empty, new msg: {}", selfId, newMsg);
  103 + }
89 } else { 104 } else {
90 - log.trace("[{}] MessageBox is busy, new msg: {}", selfId, newMsg); 105 + log.trace("[{}] MessageBox is not ready, new msg: {}", selfId, newMsg);
91 } 106 }
92 } 107 }
93 108
94 private void processMailbox() { 109 private void processMailbox() {
95 boolean noMoreElements = false; 110 boolean noMoreElements = false;
96 for (int i = 0; i < settings.getActorThroughput(); i++) { 111 for (int i = 0; i < settings.getActorThroughput(); i++) {
97 - TbActorMsg msg = msgs.poll(); 112 + TbActorMsg msg = highPriorityMsgs.poll();
  113 + if (msg == null) {
  114 + msg = normalPriorityMsgs.poll();
  115 + }
98 if (msg != null) { 116 if (msg != null) {
99 try { 117 try {
100 log.debug("[{}] Going to process message: {}", selfId, msg); 118 log.debug("[{}] Going to process message: {}", selfId, msg);
@@ -178,6 +196,12 @@ public final class TbActorMailbox implements TbActorCtx { @@ -178,6 +196,12 @@ public final class TbActorMailbox implements TbActorCtx {
178 196
179 @Override 197 @Override
180 public void tell(TbActorMsg actorMsg) { 198 public void tell(TbActorMsg actorMsg) {
181 - enqueue(actorMsg); 199 + enqueue(actorMsg, NORMAL_PRIORITY);
  200 + }
  201 +
  202 + @Override
  203 + public void tellWithHighPriority(TbActorMsg actorMsg) {
  204 + enqueue(actorMsg, HIGH_PRIORITY);
182 } 205 }
  206 +
183 } 207 }
@@ -23,4 +23,6 @@ public interface TbActorRef { @@ -23,4 +23,6 @@ public interface TbActorRef {
23 23
24 void tell(TbActorMsg actorMsg); 24 void tell(TbActorMsg actorMsg);
25 25
  26 + void tellWithHighPriority(TbActorMsg actorMsg);
  27 +
26 } 28 }
@@ -36,10 +36,10 @@ public interface TbActorSystem { @@ -36,10 +36,10 @@ public interface TbActorSystem {
36 36
37 TbActorRef createChildActor(String dispatcherId, TbActorCreator creator, TbActorId parent); 37 TbActorRef createChildActor(String dispatcherId, TbActorCreator creator, TbActorId parent);
38 38
39 - void tell(TbActorRef target, TbActorMsg actorMsg);  
40 -  
41 void tell(TbActorId target, TbActorMsg actorMsg); 39 void tell(TbActorId target, TbActorMsg actorMsg);
42 40
  41 + void tellWithHighPriority(TbActorId target, TbActorMsg actorMsg);
  42 +
43 void stop(TbActorRef actorRef); 43 void stop(TbActorRef actorRef);
44 44
45 void stop(TbActorId actorId); 45 void stop(TbActorId actorId);
@@ -40,19 +40,19 @@ import java.util.concurrent.atomic.AtomicLong; @@ -40,19 +40,19 @@ import java.util.concurrent.atomic.AtomicLong;
40 public class ActorSystemTest { 40 public class ActorSystemTest {
41 41
42 public static final String ROOT_DISPATCHER = "root-dispatcher"; 42 public static final String ROOT_DISPATCHER = "root-dispatcher";
43 - private static final int _1M = 1024 * 1024; 43 + private static final int _100K = 100 * 1024;
44 44
45 - private TbActorSystem actorSystem;  
46 - private ExecutorService submitPool; 45 + private volatile TbActorSystem actorSystem;
  46 + private volatile ExecutorService submitPool;
  47 + private int parallelism;
47 48
48 @Before 49 @Before
49 public void initActorSystem() { 50 public void initActorSystem() {
50 int cores = Runtime.getRuntime().availableProcessors(); 51 int cores = Runtime.getRuntime().availableProcessors();
51 - int parallelism = Math.max(1, cores / 2); 52 + parallelism = Math.max(2, cores / 2);
52 TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42); 53 TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42);
53 actorSystem = new DefaultTbActorSystem(settings); 54 actorSystem = new DefaultTbActorSystem(settings);
54 submitPool = Executors.newWorkStealingPool(parallelism); 55 submitPool = Executors.newWorkStealingPool(parallelism);
55 - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));  
56 } 56 }
57 57
58 @After 58 @After
@@ -62,22 +62,44 @@ public class ActorSystemTest { @@ -62,22 +62,44 @@ public class ActorSystemTest {
62 } 62 }
63 63
64 @Test 64 @Test
65 - public void test10actorsAnd1MMessages() throws InterruptedException {  
66 - testActorsAndMessages(10, _1M); 65 + public void test1actorsAnd100KMessages() throws InterruptedException {
  66 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
  67 + testActorsAndMessages(1, _100K, 1);
  68 + }
  69 +
  70 + @Test
  71 + public void test10actorsAnd100KMessages() throws InterruptedException {
  72 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
  73 + testActorsAndMessages(10, _100K, 1);
  74 + }
  75 +
  76 + @Test
  77 + public void test100KActorsAnd1Messages5timesSingleThread() throws InterruptedException {
  78 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newSingleThreadExecutor());
  79 + testActorsAndMessages(_100K, 1, 5);
  80 + }
  81 +
  82 + @Test
  83 + public void test100KActorsAnd1Messages5times() throws InterruptedException {
  84 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
  85 + testActorsAndMessages(_100K, 1, 5);
67 } 86 }
68 87
69 @Test 88 @Test
70 - public void test1MActorsAnd10Messages() throws InterruptedException {  
71 - testActorsAndMessages(_1M, 10); 89 + public void test100KActorsAnd10Messages() throws InterruptedException {
  90 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
  91 + testActorsAndMessages(_100K, 10, 1);
72 } 92 }
73 93
74 @Test 94 @Test
75 public void test1KActorsAnd1KMessages() throws InterruptedException { 95 public void test1KActorsAnd1KMessages() throws InterruptedException {
76 - testActorsAndMessages(1000, 1000); 96 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
  97 + testActorsAndMessages(1000, 1000, 10);
77 } 98 }
78 99
79 @Test 100 @Test
80 public void testNoMessagesAfterDestroy() throws InterruptedException { 101 public void testNoMessagesAfterDestroy() throws InterruptedException {
  102 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
81 ActorTestCtx testCtx1 = getActorTestCtx(1); 103 ActorTestCtx testCtx1 = getActorTestCtx(1);
82 ActorTestCtx testCtx2 = getActorTestCtx(1); 104 ActorTestCtx testCtx2 = getActorTestCtx(1);
83 105
@@ -86,16 +108,17 @@ public class ActorSystemTest { @@ -86,16 +108,17 @@ public class ActorSystemTest {
86 TbActorRef actorId2 = actorSystem.createRootActor(ROOT_DISPATCHER, new SlowInitActor.SlowInitActorCreator( 108 TbActorRef actorId2 = actorSystem.createRootActor(ROOT_DISPATCHER, new SlowInitActor.SlowInitActorCreator(
87 new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx2)); 109 new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx2));
88 110
89 - actorSystem.tell(actorId1, new IntTbActorMsg(42));  
90 - actorSystem.tell(actorId2, new IntTbActorMsg(42)); 111 + actorId1.tell(new IntTbActorMsg(42));
  112 + actorId2.tell(new IntTbActorMsg(42));
91 actorSystem.stop(actorId1); 113 actorSystem.stop(actorId1);
92 114
93 Assert.assertTrue(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); 115 Assert.assertTrue(testCtx2.getLatch().await(1, TimeUnit.SECONDS));
94 - Assert.assertFalse(testCtx1.getLatch().await(2, TimeUnit.SECONDS)); 116 + Assert.assertFalse(testCtx1.getLatch().await(1, TimeUnit.SECONDS));
95 } 117 }
96 118
97 @Test 119 @Test
98 public void testOneActorCreated() throws InterruptedException { 120 public void testOneActorCreated() throws InterruptedException {
  121 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
99 ActorTestCtx testCtx1 = getActorTestCtx(1); 122 ActorTestCtx testCtx1 = getActorTestCtx(1);
100 ActorTestCtx testCtx2 = getActorTestCtx(1); 123 ActorTestCtx testCtx2 = getActorTestCtx(1);
101 TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); 124 TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID()));
@@ -105,15 +128,16 @@ public class ActorSystemTest { @@ -105,15 +128,16 @@ public class ActorSystemTest {
105 Thread.sleep(1000); 128 Thread.sleep(1000);
106 actorSystem.tell(actorId, new IntTbActorMsg(42)); 129 actorSystem.tell(actorId, new IntTbActorMsg(42));
107 130
108 - Assert.assertTrue(testCtx1.getLatch().await(3, TimeUnit.SECONDS));  
109 - Assert.assertFalse(testCtx2.getLatch().await(3, TimeUnit.SECONDS)); 131 + Assert.assertTrue(testCtx1.getLatch().await(1, TimeUnit.SECONDS));
  132 + Assert.assertFalse(testCtx2.getLatch().await(1, TimeUnit.SECONDS));
110 } 133 }
111 134
112 @Test 135 @Test
113 public void testActorCreatorCalledOnce() throws InterruptedException { 136 public void testActorCreatorCalledOnce() throws InterruptedException {
  137 + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
114 ActorTestCtx testCtx = getActorTestCtx(1); 138 ActorTestCtx testCtx = getActorTestCtx(1);
115 TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); 139 TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID()));
116 - for(int i =0; i < 1000; i++) { 140 + for (int i = 0; i < 1000; i++) {
117 submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx))); 141 submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx)));
118 } 142 }
119 Thread.sleep(1000); 143 Thread.sleep(1000);
@@ -125,7 +149,7 @@ public class ActorSystemTest { @@ -125,7 +149,7 @@ public class ActorSystemTest {
125 } 149 }
126 150
127 151
128 - public void testActorsAndMessages(int actorsCount, int msgNumber) throws InterruptedException { 152 + public void testActorsAndMessages(int actorsCount, int msgNumber, int times) throws InterruptedException {
129 Random random = new Random(); 153 Random random = new Random();
130 int[] randomIntegers = new int[msgNumber]; 154 int[] randomIntegers = new int[msgNumber];
131 long sumTmp = 0; 155 long sumTmp = 0;
@@ -141,32 +165,35 @@ public class ActorSystemTest { @@ -141,32 +165,35 @@ public class ActorSystemTest {
141 List<TbActorRef> actorRefs = new ArrayList<>(); 165 List<TbActorRef> actorRefs = new ArrayList<>();
142 for (int actorIdx = 0; actorIdx < actorsCount; actorIdx++) { 166 for (int actorIdx = 0; actorIdx < actorsCount; actorIdx++) {
143 ActorTestCtx testCtx = getActorTestCtx(msgNumber); 167 ActorTestCtx testCtx = getActorTestCtx(msgNumber);
144 -  
145 actorRefs.add(actorSystem.createRootActor(ROOT_DISPATCHER, new TestRootActor.TestRootActorCreator( 168 actorRefs.add(actorSystem.createRootActor(ROOT_DISPATCHER, new TestRootActor.TestRootActorCreator(
146 new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx))); 169 new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx)));
147 testCtxes.add(testCtx); 170 testCtxes.add(testCtx);
148 } 171 }
149 172
150 - long start = System.nanoTime();  
151 -  
152 - for (int i = 0; i < msgNumber; i++) {  
153 - int tmp = randomIntegers[i];  
154 - submitPool.execute(() -> actorRefs.forEach(actorId -> actorSystem.tell(actorId, new IntTbActorMsg(tmp))));  
155 - }  
156 - log.info("Submitted all messages");  
157 -  
158 - testCtxes.forEach(ctx -> {  
159 - try {  
160 - Assert.assertTrue(ctx.getLatch().await(1, TimeUnit.MINUTES));  
161 - Assert.assertEquals(expected, ctx.getActual().get());  
162 - Assert.assertEquals(msgNumber, ctx.getInvocationCount().get());  
163 - } catch (InterruptedException e) {  
164 - e.printStackTrace(); 173 + for (int t = 0; t < times; t++) {
  174 + long start = System.nanoTime();
  175 + for (int i = 0; i < msgNumber; i++) {
  176 + int tmp = randomIntegers[i];
  177 + submitPool.execute(() -> actorRefs.forEach(actorId -> actorId.tell(new IntTbActorMsg(tmp))));
165 } 178 }
166 - });  
167 -  
168 - long duration = System.nanoTime() - start;  
169 - log.info("Time spend: {}ns ({} ms)", duration, TimeUnit.NANOSECONDS.toMillis(duration)); 179 + log.info("Submitted all messages");
  180 + testCtxes.forEach(ctx -> {
  181 + try {
  182 + boolean success = ctx.getLatch().await(1, TimeUnit.MINUTES);
  183 + if (!success) {
  184 + log.warn("Failed: {}, {}", ctx.getActual().get(), ctx.getInvocationCount().get());
  185 + }
  186 + Assert.assertTrue(success);
  187 + Assert.assertEquals(expected, ctx.getActual().get());
  188 + Assert.assertEquals(msgNumber, ctx.getInvocationCount().get());
  189 + ctx.clear();
  190 + } catch (InterruptedException e) {
  191 + e.printStackTrace();
  192 + }
  193 + });
  194 + long duration = System.nanoTime() - start;
  195 + log.info("Time spend: {}ns ({} ms)", duration, TimeUnit.NANOSECONDS.toMillis(duration));
  196 + }
170 } 197 }
171 198
172 private ActorTestCtx getActorTestCtx(int i) { 199 private ActorTestCtx getActorTestCtx(int i) {
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.actors; 16 package org.thingsboard.server.actors;
17 17
  18 +import lombok.AllArgsConstructor;
18 import lombok.Data; 19 import lombok.Data;
19 20
20 import java.util.concurrent.CountDownLatch; 21 import java.util.concurrent.CountDownLatch;
@@ -22,10 +23,17 @@ import java.util.concurrent.atomic.AtomicInteger; @@ -22,10 +23,17 @@ import java.util.concurrent.atomic.AtomicInteger;
22 import java.util.concurrent.atomic.AtomicLong; 23 import java.util.concurrent.atomic.AtomicLong;
23 24
24 @Data 25 @Data
  26 +@AllArgsConstructor
25 public class ActorTestCtx { 27 public class ActorTestCtx {
26 28
27 - private final CountDownLatch latch; 29 + private volatile CountDownLatch latch;
28 private final AtomicInteger invocationCount; 30 private final AtomicInteger invocationCount;
29 private final int expectedInvocationCount; 31 private final int expectedInvocationCount;
30 private final AtomicLong actual; 32 private final AtomicLong actual;
  33 +
  34 + public void clear() {
  35 + latch = new CountDownLatch(1);
  36 + invocationCount.set(0);
  37 + actual.set(0L);
  38 + }
31 } 39 }
@@ -51,6 +51,8 @@ public class TestRootActor extends AbstractTbActor { @@ -51,6 +51,8 @@ public class TestRootActor extends AbstractTbActor {
51 if (count == testCtx.getExpectedInvocationCount()) { 51 if (count == testCtx.getExpectedInvocationCount()) {
52 testCtx.getActual().set(sum); 52 testCtx.getActual().set(sum);
53 testCtx.getInvocationCount().addAndGet(count); 53 testCtx.getInvocationCount().addAndGet(count);
  54 + sum = 0;
  55 + count = 0;
54 testCtx.getLatch().countDown(); 56 testCtx.getLatch().countDown();
55 } 57 }
56 } 58 }
@@ -96,7 +96,19 @@ public class TbKafkaNode implements TbNode { @@ -96,7 +96,19 @@ public class TbKafkaNode implements TbNode {
96 public void onMsg(TbContext ctx, TbMsg msg) { 96 public void onMsg(TbContext ctx, TbMsg msg) {
97 String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg.getMetaData()); 97 String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg.getMetaData());
98 try { 98 try {
  99 + ctx.getExternalCallExecutor().executeAsync(() -> {
  100 + publish(ctx, msg, topic);
  101 + return null;
  102 + });
  103 + } catch (Exception e) {
  104 + ctx.tellFailure(msg, e);
  105 + }
  106 + }
  107 +
  108 + protected void publish(TbContext ctx, TbMsg msg, String topic) {
  109 + try {
99 if (!addMetadataKeyValuesAsKafkaHeaders) { 110 if (!addMetadataKeyValuesAsKafkaHeaders) {
  111 + //TODO: external system executor
100 producer.send(new ProducerRecord<>(topic, msg.getData()), 112 producer.send(new ProducerRecord<>(topic, msg.getData()),
101 (metadata, e) -> processRecord(ctx, msg, metadata, e)); 113 (metadata, e) -> processRecord(ctx, msg, metadata, e));
102 } else { 114 } else {
@@ -105,9 +117,8 @@ public class TbKafkaNode implements TbNode { @@ -105,9 +117,8 @@ public class TbKafkaNode implements TbNode {
105 producer.send(new ProducerRecord<>(topic, null, null, null, msg.getData(), headers), 117 producer.send(new ProducerRecord<>(topic, null, null, null, msg.getData(), headers),
106 (metadata, e) -> processRecord(ctx, msg, metadata, e)); 118 (metadata, e) -> processRecord(ctx, msg, metadata, e));
107 } 119 }
108 -  
109 } catch (Exception e) { 120 } catch (Exception e) {
110 - ctx.tellFailure(msg, e); 121 + log.debug("[{}] Failed to process message: {}", ctx.getSelfId(), msg, e);
111 } 122 }
112 } 123 }
113 124