Commit b86e377bcf799a0b6aa34d28cc4b15876a92511e

Authored by Andrew Shvayka
Committed by GitHub
2 parents 795d54b7 af5e52dd

Merge pull request #78 from thingsboard/feature/dispatchers

Separate Dispatchers for system and tenant rules/plugins
... ... @@ -32,10 +32,7 @@ import com.google.common.util.concurrent.ListenableFuture;
32 32 import lombok.extern.slf4j.Slf4j;
33 33 import org.thingsboard.server.common.data.DataConstants;
34 34 import org.thingsboard.server.common.data.Device;
35   -import org.thingsboard.server.common.data.id.CustomerId;
36   -import org.thingsboard.server.common.data.id.DeviceId;
37   -import org.thingsboard.server.common.data.id.PluginId;
38   -import org.thingsboard.server.common.data.id.TenantId;
  35 +import org.thingsboard.server.common.data.id.*;
39 36 import org.thingsboard.server.common.data.kv.AttributeKey;
40 37 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
41 38 import org.thingsboard.server.common.data.kv.TsKvEntry;
... ... @@ -74,6 +71,10 @@ public final class PluginProcessingContext implements PluginContext {
74 71 this.securityCtx = Optional.ofNullable(securityCtx);
75 72 }
76 73
  74 + public void persistError(String method, Exception e) {
  75 + pluginCtx.persistError(method, e);
  76 + }
  77 +
77 78 @Override
78 79 public void sendPluginRpcMsg(RpcMsg msg) {
79 80 this.pluginCtx.rpcService.tell(new PluginRpcMsg(pluginCtx.tenantId, pluginCtx.pluginId, msg));
... ...
... ... @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
20 20 import org.thingsboard.server.actors.ActorSystemContext;
21 21 import org.thingsboard.server.common.data.Device;
22 22 import org.thingsboard.server.common.data.id.DeviceId;
  23 +import org.thingsboard.server.common.data.id.EntityId;
23 24 import org.thingsboard.server.common.data.id.TenantId;
24 25 import org.thingsboard.server.common.msg.cluster.ServerAddress;
25 26 import org.thingsboard.server.controller.plugin.PluginWebSocketMsgEndpoint;
... ... @@ -73,6 +74,10 @@ public final class SharedPluginProcessingContext {
73 74 return pluginId;
74 75 }
75 76
  77 + public TenantId getPluginTenantId() {
  78 + return tenantId;
  79 + }
  80 +
76 81 public void toDeviceActor(DeviceAttributesEventNotificationMsg msg) {
77 82 forward(msg.getDeviceId(), msg, rpcService::tell);
78 83 }
... ... @@ -105,6 +110,10 @@ public final class SharedPluginProcessingContext {
105 110
106 111 }
107 112
  113 + public void persistError(String method, Exception e) {
  114 + systemContext.persistError(tenantId, pluginId, method, e);
  115 + }
  116 +
108 117 public ActorRef self() {
109 118 return currentActor;
110 119 }
... ...
... ... @@ -69,8 +69,10 @@ public class DefaultActorService implements ActorService {
69 69
70 70 public static final String APP_DISPATCHER_NAME = "app-dispatcher";
71 71 public static final String CORE_DISPATCHER_NAME = "core-dispatcher";
72   - public static final String RULE_DISPATCHER_NAME = "rule-dispatcher";
73   - public static final String PLUGIN_DISPATCHER_NAME = "plugin-dispatcher";
  72 + public static final String SYSTEM_RULE_DISPATCHER_NAME = "system-rule-dispatcher";
  73 + public static final String SYSTEM_PLUGIN_DISPATCHER_NAME = "system-plugin-dispatcher";
  74 + public static final String TENANT_RULE_DISPATCHER_NAME = "rule-dispatcher";
  75 + public static final String TENANT_PLUGIN_DISPATCHER_NAME = "plugin-dispatcher";
74 76 public static final String SESSION_DISPATCHER_NAME = "session-dispatcher";
75 77 public static final String RPC_DISPATCHER_NAME = "rpc-dispatcher";
76 78
... ...
... ... @@ -22,7 +22,6 @@ import lombok.extern.slf4j.Slf4j;
22 22 import org.thingsboard.server.actors.ActorSystemContext;
23 23 import org.thingsboard.server.actors.plugin.PluginActor;
24 24 import org.thingsboard.server.actors.service.ContextAwareActor;
25   -import org.thingsboard.server.actors.service.DefaultActorService;
26 25 import org.thingsboard.server.common.data.id.PluginId;
27 26 import org.thingsboard.server.common.data.id.TenantId;
28 27 import org.thingsboard.server.common.data.page.PageDataIterable;
... ... @@ -60,10 +59,12 @@ public abstract class PluginManager {
60 59
61 60 abstract TenantId getTenantId();
62 61
  62 + abstract String getDispatcherName();
  63 +
63 64 public ActorRef getOrCreatePluginActor(ActorContext context, PluginId pluginId) {
64 65 return pluginActors.computeIfAbsent(pluginId, pId ->
65 66 context.actorOf(Props.create(new PluginActor.ActorCreator(systemContext, getTenantId(), pId))
66   - .withDispatcher(DefaultActorService.PLUGIN_DISPATCHER_NAME), pId.toString()));
  67 + .withDispatcher(getDispatcherName()), pId.toString()));
67 68 }
68 69
69 70 public void broadcast(Object msg) {
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.actors.shared.plugin;
17 17
18 18 import org.thingsboard.server.actors.ActorSystemContext;
  19 +import org.thingsboard.server.actors.service.DefaultActorService;
19 20 import org.thingsboard.server.common.data.id.TenantId;
20 21 import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
21 22 import org.thingsboard.server.common.data.plugin.PluginMetaData;
... ... @@ -37,4 +38,8 @@ public class SystemPluginManager extends PluginManager {
37 38 return BasePluginService.SYSTEM_TENANT;
38 39 }
39 40
  41 + @Override
  42 + protected String getDispatcherName() {
  43 + return DefaultActorService.SYSTEM_PLUGIN_DISPATCHER_NAME;
  44 + }
40 45 }
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.actors.shared.plugin;
17 17
18 18 import org.thingsboard.server.actors.ActorSystemContext;
  19 +import org.thingsboard.server.actors.service.DefaultActorService;
19 20 import org.thingsboard.server.common.data.id.TenantId;
20 21 import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
21 22 import org.thingsboard.server.common.data.plugin.PluginMetaData;
... ... @@ -38,4 +39,10 @@ public class TenantPluginManager extends PluginManager {
38 39 TenantId getTenantId() {
39 40 return tenantId;
40 41 }
  42 +
  43 + @Override
  44 + protected String getDispatcherName() {
  45 + return DefaultActorService.TENANT_PLUGIN_DISPATCHER_NAME;
  46 + }
  47 +
41 48 }
... ...
... ... @@ -100,10 +100,12 @@ public abstract class RuleManager {
100 100
101 101 abstract FetchFunction<RuleMetaData> getFetchRulesFunction();
102 102
  103 + abstract String getDispatcherName();
  104 +
103 105 public ActorRef getOrCreateRuleActor(ActorContext context, RuleId ruleId) {
104 106 return ruleActors.computeIfAbsent(ruleId, rId ->
105 107 context.actorOf(Props.create(new RuleActor.ActorCreator(systemContext, tenantId, rId))
106   - .withDispatcher(DefaultActorService.RULE_DISPATCHER_NAME), rId.toString()));
  108 + .withDispatcher(getDispatcherName()), rId.toString()));
107 109 }
108 110
109 111 public RuleActorChain getRuleChain() {
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.actors.shared.rule;
17 17
18 18 import org.thingsboard.server.actors.ActorSystemContext;
  19 +import org.thingsboard.server.actors.service.DefaultActorService;
19 20 import org.thingsboard.server.common.data.id.TenantId;
20 21 import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
21 22 import org.thingsboard.server.common.data.rule.RuleMetaData;
... ... @@ -32,4 +33,8 @@ public class SystemRuleManager extends RuleManager {
32 33 return ruleService::findSystemRules;
33 34 }
34 35
  36 + @Override
  37 + String getDispatcherName() {
  38 + return DefaultActorService.SYSTEM_RULE_DISPATCHER_NAME;
  39 + }
35 40 }
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.actors.shared.rule;
17 17
18 18 import org.thingsboard.server.actors.ActorSystemContext;
  19 +import org.thingsboard.server.actors.service.DefaultActorService;
19 20 import org.thingsboard.server.common.data.id.TenantId;
20 21 import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
21 22 import org.thingsboard.server.common.data.rule.RuleMetaData;
... ... @@ -31,4 +32,9 @@ public class TenantRuleManager extends RuleManager {
31 32 return link -> ruleService.findTenantRules(tenantId, link);
32 33 }
33 34
  35 + @Override
  36 + String getDispatcherName() {
  37 + return DefaultActorService.TENANT_RULE_DISPATCHER_NAME;
  38 + }
  39 +
34 40 }
... ...
... ... @@ -92,7 +92,53 @@ core-dispatcher {
92 92 throughput = 5
93 93 }
94 94
95   -# This dispatcher is used for rule actors
  95 +# This dispatcher is used for system rule actors
  96 +system-rule-dispatcher {
  97 + type = Dispatcher
  98 + executor = "fork-join-executor"
  99 + fork-join-executor {
  100 + # Min number of threads to cap factor-based parallelism number to
  101 + parallelism-min = 2
  102 + # Max number of threads to cap factor-based parallelism number to
  103 + parallelism-max = 12
  104 +
  105 + # The parallelism factor is used to determine thread pool size using the
  106 + # following formula: ceil(available processors * factor). Resulting size
  107 + # is then bounded by the parallelism-min and parallelism-max values.
  108 + parallelism-factor = 0.25
  109 + }
  110 + # How long time the dispatcher will wait for new actors until it shuts down
  111 + shutdown-timeout = 1s
  112 +
  113 + # Throughput defines the number of messages that are processed in a batch
  114 + # before the thread is returned to the pool. Set to 1 for as fair as possible.
  115 + throughput = 5
  116 +}
  117 +
  118 +# This dispatcher is used for system plugin actors
  119 +system-plugin-dispatcher {
  120 + type = Dispatcher
  121 + executor = "fork-join-executor"
  122 + fork-join-executor {
  123 + # Min number of threads to cap factor-based parallelism number to
  124 + parallelism-min = 2
  125 + # Max number of threads to cap factor-based parallelism number to
  126 + parallelism-max = 12
  127 +
  128 + # The parallelism factor is used to determine thread pool size using the
  129 + # following formula: ceil(available processors * factor). Resulting size
  130 + # is then bounded by the parallelism-min and parallelism-max values.
  131 + parallelism-factor = 0.25
  132 + }
  133 + # How long time the dispatcher will wait for new actors until it shuts down
  134 + shutdown-timeout = 1s
  135 +
  136 + # Throughput defines the number of messages that are processed in a batch
  137 + # before the thread is returned to the pool. Set to 1 for as fair as possible.
  138 + throughput = 5
  139 +}
  140 +
  141 +# This dispatcher is used for tenant rule actors
96 142 rule-dispatcher {
97 143 type = Dispatcher
98 144 executor = "fork-join-executor"
... ... @@ -115,7 +161,7 @@ rule-dispatcher {
115 161 throughput = 5
116 162 }
117 163
118   -# This dispatcher is used for rule actors
  164 +# This dispatcher is used for tenant plugin actors
119 165 plugin-dispatcher {
120 166 type = Dispatcher
121 167 executor = "fork-join-executor"
... ...
... ... @@ -16,10 +16,7 @@
16 16 package org.thingsboard.server.extensions.api.plugins;
17 17
18 18 import org.thingsboard.server.common.data.Device;
19   -import org.thingsboard.server.common.data.id.CustomerId;
20   -import org.thingsboard.server.common.data.id.DeviceId;
21   -import org.thingsboard.server.common.data.id.PluginId;
22   -import org.thingsboard.server.common.data.id.TenantId;
  19 +import org.thingsboard.server.common.data.id.*;
23 20 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
24 21 import org.thingsboard.server.common.data.kv.TsKvEntry;
25 22 import org.thingsboard.server.common.data.kv.TsKvQuery;
... ... @@ -46,6 +43,8 @@ public interface PluginContext {
46 43
47 44 Optional<PluginApiCallSecurityContext> getSecurityCtx();
48 45
  46 + void persistError(String method, Exception e);
  47 +
49 48 /*
50 49 Device RPC API
51 50 */
... ...
... ... @@ -33,6 +33,9 @@ import org.thingsboard.server.extensions.core.action.mail.SendMailActionMsg;
33 33 import javax.mail.MessagingException;
34 34 import javax.mail.internet.MimeMessage;
35 35 import java.util.Properties;
  36 +import java.util.concurrent.Executor;
  37 +import java.util.concurrent.ExecutorService;
  38 +import java.util.concurrent.Executors;
36 39
37 40 /**
38 41 * @author Andrew Shvayka
... ... @@ -41,6 +44,9 @@ import java.util.Properties;
41 44 @Slf4j
42 45 public class MailPlugin extends AbstractPlugin<MailPluginConfiguration> implements RuleMsgHandler {
43 46
  47 + //TODO: Add logic to close this executor on shutdown.
  48 + private static final ExecutorService executor = Executors.newSingleThreadExecutor();
  49 +
44 50 private MailPluginConfiguration configuration;
45 51 private JavaMailSenderImpl mailSender;
46 52
... ... @@ -84,12 +90,14 @@ public class MailPlugin extends AbstractPlugin<MailPluginConfiguration> implemen
84 90 @Override
85 91 public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg<?> msg) throws RuleException {
86 92 if (msg.getPayload() instanceof SendMailActionMsg) {
87   - try {
88   - sendMail((SendMailActionMsg) msg.getPayload());
89   - } catch (Exception e) {
90   - log.warn("Failed to send email", e);
91   - throw new RuleException("Failed to send email", e);
92   - }
  93 + executor.submit(() -> {
  94 + try {
  95 + sendMail((SendMailActionMsg) msg.getPayload());
  96 + } catch (Exception e) {
  97 + log.warn("[{}] Failed to send email", ctx.getPluginId(), e);
  98 + ctx.persistError("Failed to send email", e);
  99 + }
  100 + });
93 101 } else {
94 102 throw new RuntimeException("Not supported msg type: " + msg.getPayload().getClass() + "!");
95 103 }
... ...