Showing
6 changed files
with
24 additions
and
37 deletions
@@ -16,7 +16,6 @@ | @@ -16,7 +16,6 @@ | ||
16 | package org.thingsboard.server.actors.rule; | 16 | package org.thingsboard.server.actors.rule; |
17 | 17 | ||
18 | import java.util.ArrayList; | 18 | import java.util.ArrayList; |
19 | -import java.util.Collections; | ||
20 | import java.util.List; | 19 | import java.util.List; |
21 | import java.util.Set; | 20 | import java.util.Set; |
22 | 21 | ||
@@ -26,7 +25,7 @@ public class SimpleRuleActorChain implements RuleActorChain { | @@ -26,7 +25,7 @@ public class SimpleRuleActorChain implements RuleActorChain { | ||
26 | 25 | ||
27 | public SimpleRuleActorChain(Set<RuleActorMetaData> ruleSet) { | 26 | public SimpleRuleActorChain(Set<RuleActorMetaData> ruleSet) { |
28 | rules = new ArrayList<>(ruleSet); | 27 | rules = new ArrayList<>(ruleSet); |
29 | - Collections.sort(rules, RuleActorMetaData.RULE_ACTOR_MD_COMPARATOR); | 28 | + rules.sort(RuleActorMetaData.RULE_ACTOR_MD_COMPARATOR); |
30 | } | 29 | } |
31 | 30 | ||
32 | public int size() { | 31 | public int size() { |
application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
@@ -19,7 +19,6 @@ import akka.actor.ActorContext; | @@ -19,7 +19,6 @@ import akka.actor.ActorContext; | ||
19 | import akka.actor.ActorRef; | 19 | import akka.actor.ActorRef; |
20 | import akka.actor.Scheduler; | 20 | import akka.actor.Scheduler; |
21 | import akka.event.LoggingAdapter; | 21 | import akka.event.LoggingAdapter; |
22 | -import com.fasterxml.jackson.core.JsonProcessingException; | ||
23 | import com.fasterxml.jackson.databind.JsonNode; | 22 | import com.fasterxml.jackson.databind.JsonNode; |
24 | import com.fasterxml.jackson.databind.ObjectMapper; | 23 | import com.fasterxml.jackson.databind.ObjectMapper; |
25 | import lombok.AllArgsConstructor; | 24 | import lombok.AllArgsConstructor; |
@@ -15,9 +15,9 @@ | @@ -15,9 +15,9 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.server.actors.shared.plugin; | 16 | package org.thingsboard.server.actors.shared.plugin; |
17 | 17 | ||
18 | -import java.util.HashMap; | ||
19 | -import java.util.Map; | ||
20 | - | 18 | +import akka.actor.ActorContext; |
19 | +import akka.actor.ActorRef; | ||
20 | +import akka.actor.Props; | ||
21 | import lombok.extern.slf4j.Slf4j; | 21 | import lombok.extern.slf4j.Slf4j; |
22 | import org.thingsboard.server.actors.ActorSystemContext; | 22 | import org.thingsboard.server.actors.ActorSystemContext; |
23 | import org.thingsboard.server.actors.plugin.PluginActor; | 23 | import org.thingsboard.server.actors.plugin.PluginActor; |
@@ -29,12 +29,9 @@ import org.thingsboard.server.common.data.page.PageDataIterable; | @@ -29,12 +29,9 @@ import org.thingsboard.server.common.data.page.PageDataIterable; | ||
29 | import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction; | 29 | import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction; |
30 | import org.thingsboard.server.common.data.plugin.PluginMetaData; | 30 | import org.thingsboard.server.common.data.plugin.PluginMetaData; |
31 | import org.thingsboard.server.dao.plugin.PluginService; | 31 | import org.thingsboard.server.dao.plugin.PluginService; |
32 | -import org.slf4j.Logger; | ||
33 | -import org.slf4j.LoggerFactory; | ||
34 | 32 | ||
35 | -import akka.actor.ActorContext; | ||
36 | -import akka.actor.ActorRef; | ||
37 | -import akka.actor.Props; | 33 | +import java.util.HashMap; |
34 | +import java.util.Map; | ||
38 | 35 | ||
39 | @Slf4j | 36 | @Slf4j |
40 | public abstract class PluginManager { | 37 | public abstract class PluginManager { |
@@ -64,13 +61,9 @@ public abstract class PluginManager { | @@ -64,13 +61,9 @@ public abstract class PluginManager { | ||
64 | abstract TenantId getTenantId(); | 61 | abstract TenantId getTenantId(); |
65 | 62 | ||
66 | public ActorRef getOrCreatePluginActor(ActorContext context, PluginId pluginId) { | 63 | public ActorRef getOrCreatePluginActor(ActorContext context, PluginId pluginId) { |
67 | - ActorRef pluginActor = pluginActors.get(pluginId); | ||
68 | - if (pluginActor == null) { | ||
69 | - pluginActor = context.actorOf(Props.create(new PluginActor.ActorCreator(systemContext, getTenantId(), pluginId)) | ||
70 | - .withDispatcher(DefaultActorService.PLUGIN_DISPATCHER_NAME), pluginId.toString()); | ||
71 | - pluginActors.put(pluginId, pluginActor); | ||
72 | - } | ||
73 | - return pluginActor; | 64 | + return pluginActors.computeIfAbsent(pluginId, pId -> |
65 | + context.actorOf(Props.create(new PluginActor.ActorCreator(systemContext, getTenantId(), pId)) | ||
66 | + .withDispatcher(DefaultActorService.PLUGIN_DISPATCHER_NAME), pId.toString())); | ||
74 | } | 67 | } |
75 | 68 | ||
76 | public void broadcast(Object msg) { | 69 | public void broadcast(Object msg) { |
@@ -20,7 +20,6 @@ import org.thingsboard.server.common.data.id.TenantId; | @@ -20,7 +20,6 @@ import org.thingsboard.server.common.data.id.TenantId; | ||
20 | import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction; | 20 | import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction; |
21 | import org.thingsboard.server.common.data.plugin.PluginMetaData; | 21 | import org.thingsboard.server.common.data.plugin.PluginMetaData; |
22 | import org.thingsboard.server.dao.plugin.BasePluginService; | 22 | import org.thingsboard.server.dao.plugin.BasePluginService; |
23 | -import org.thingsboard.server.dao.plugin.PluginService; | ||
24 | 23 | ||
25 | public class SystemPluginManager extends PluginManager { | 24 | public class SystemPluginManager extends PluginManager { |
26 | 25 | ||
@@ -30,7 +29,7 @@ public class SystemPluginManager extends PluginManager { | @@ -30,7 +29,7 @@ public class SystemPluginManager extends PluginManager { | ||
30 | 29 | ||
31 | @Override | 30 | @Override |
32 | FetchFunction<PluginMetaData> getFetchPluginsFunction() { | 31 | FetchFunction<PluginMetaData> getFetchPluginsFunction() { |
33 | - return link -> pluginService.findSystemPlugins(link); | 32 | + return pluginService::findSystemPlugins; |
34 | } | 33 | } |
35 | 34 | ||
36 | @Override | 35 | @Override |
@@ -18,8 +18,7 @@ package org.thingsboard.server.actors.shared.rule; | @@ -18,8 +18,7 @@ package org.thingsboard.server.actors.shared.rule; | ||
18 | import akka.actor.ActorContext; | 18 | import akka.actor.ActorContext; |
19 | import akka.actor.ActorRef; | 19 | import akka.actor.ActorRef; |
20 | import akka.actor.Props; | 20 | import akka.actor.Props; |
21 | -import org.slf4j.Logger; | ||
22 | -import org.slf4j.LoggerFactory; | 21 | +import lombok.extern.slf4j.Slf4j; |
23 | import org.thingsboard.server.actors.ActorSystemContext; | 22 | import org.thingsboard.server.actors.ActorSystemContext; |
24 | import org.thingsboard.server.actors.rule.RuleActor; | 23 | import org.thingsboard.server.actors.rule.RuleActor; |
25 | import org.thingsboard.server.actors.rule.RuleActorChain; | 24 | import org.thingsboard.server.actors.rule.RuleActorChain; |
@@ -38,10 +37,9 @@ import org.thingsboard.server.dao.rule.RuleService; | @@ -38,10 +37,9 @@ import org.thingsboard.server.dao.rule.RuleService; | ||
38 | 37 | ||
39 | import java.util.*; | 38 | import java.util.*; |
40 | 39 | ||
40 | +@Slf4j | ||
41 | public abstract class RuleManager { | 41 | public abstract class RuleManager { |
42 | 42 | ||
43 | - protected static final Logger logger = LoggerFactory.getLogger(RuleManager.class); | ||
44 | - | ||
45 | protected final ActorSystemContext systemContext; | 43 | protected final ActorSystemContext systemContext; |
46 | protected final RuleService ruleService; | 44 | protected final RuleService ruleService; |
47 | protected final Map<RuleId, ActorRef> ruleActors; | 45 | protected final Map<RuleId, ActorRef> ruleActors; |
@@ -63,11 +61,11 @@ public abstract class RuleManager { | @@ -63,11 +61,11 @@ public abstract class RuleManager { | ||
63 | ruleMap = new HashMap<>(); | 61 | ruleMap = new HashMap<>(); |
64 | 62 | ||
65 | for (RuleMetaData rule : ruleIterator) { | 63 | for (RuleMetaData rule : ruleIterator) { |
66 | - logger.debug("[{}] Creating rule actor {}", rule.getId(), rule); | 64 | + log.debug("[{}] Creating rule actor {}", rule.getId(), rule); |
67 | ActorRef ref = getOrCreateRuleActor(context, rule.getId()); | 65 | ActorRef ref = getOrCreateRuleActor(context, rule.getId()); |
68 | RuleActorMetaData actorMd = RuleActorMetaData.systemRule(rule.getId(), rule.getWeight(), ref); | 66 | RuleActorMetaData actorMd = RuleActorMetaData.systemRule(rule.getId(), rule.getWeight(), ref); |
69 | ruleMap.put(rule, actorMd); | 67 | ruleMap.put(rule, actorMd); |
70 | - logger.debug("[{}] Rule actor created.", rule.getId()); | 68 | + log.debug("[{}] Rule actor created.", rule.getId()); |
71 | } | 69 | } |
72 | 70 | ||
73 | refreshRuleChain(); | 71 | refreshRuleChain(); |
@@ -79,8 +77,11 @@ public abstract class RuleManager { | @@ -79,8 +77,11 @@ public abstract class RuleManager { | ||
79 | rule = systemContext.getRuleService().findRuleById(ruleId); | 77 | rule = systemContext.getRuleService().findRuleById(ruleId); |
80 | } | 78 | } |
81 | if (rule == null) { | 79 | if (rule == null) { |
82 | - rule = ruleMap.keySet().stream().filter(r -> r.getId().equals(ruleId)).findFirst().orElse(null); | ||
83 | - rule.setState(ComponentLifecycleState.SUSPENDED); | 80 | + rule = ruleMap.keySet().stream() |
81 | + .filter(r -> r.getId().equals(ruleId)) | ||
82 | + .peek(r -> r.setState(ComponentLifecycleState.SUSPENDED)) | ||
83 | + .findFirst() | ||
84 | + .orElse(null); | ||
84 | } | 85 | } |
85 | if (rule != null) { | 86 | if (rule != null) { |
86 | RuleActorMetaData actorMd = ruleMap.get(rule); | 87 | RuleActorMetaData actorMd = ruleMap.get(rule); |
@@ -92,7 +93,7 @@ public abstract class RuleManager { | @@ -92,7 +93,7 @@ public abstract class RuleManager { | ||
92 | refreshRuleChain(); | 93 | refreshRuleChain(); |
93 | return Optional.of(actorMd.getActorRef()); | 94 | return Optional.of(actorMd.getActorRef()); |
94 | } else { | 95 | } else { |
95 | - logger.warn("[{}] Can't process unknown rule!", rule.getId()); | 96 | + log.warn("[{}] Can't process unknown rule!", ruleId); |
96 | return Optional.empty(); | 97 | return Optional.empty(); |
97 | } | 98 | } |
98 | } | 99 | } |
@@ -100,13 +101,9 @@ public abstract class RuleManager { | @@ -100,13 +101,9 @@ public abstract class RuleManager { | ||
100 | abstract FetchFunction<RuleMetaData> getFetchRulesFunction(); | 101 | abstract FetchFunction<RuleMetaData> getFetchRulesFunction(); |
101 | 102 | ||
102 | public ActorRef getOrCreateRuleActor(ActorContext context, RuleId ruleId) { | 103 | public ActorRef getOrCreateRuleActor(ActorContext context, RuleId ruleId) { |
103 | - ActorRef ruleActor = ruleActors.get(ruleId); | ||
104 | - if (ruleActor == null) { | ||
105 | - ruleActor = context.actorOf(Props.create(new RuleActor.ActorCreator(systemContext, tenantId, ruleId)) | ||
106 | - .withDispatcher(DefaultActorService.RULE_DISPATCHER_NAME), ruleId.toString()); | ||
107 | - ruleActors.put(ruleId, ruleActor); | ||
108 | - } | ||
109 | - return ruleActor; | 104 | + return ruleActors.computeIfAbsent(ruleId, rId -> |
105 | + context.actorOf(Props.create(new RuleActor.ActorCreator(systemContext, tenantId, rId)) | ||
106 | + .withDispatcher(DefaultActorService.RULE_DISPATCHER_NAME), rId.toString())); | ||
110 | } | 107 | } |
111 | 108 | ||
112 | public RuleActorChain getRuleChain() { | 109 | public RuleActorChain getRuleChain() { |
@@ -29,7 +29,7 @@ public class SystemRuleManager extends RuleManager { | @@ -29,7 +29,7 @@ public class SystemRuleManager extends RuleManager { | ||
29 | 29 | ||
30 | @Override | 30 | @Override |
31 | FetchFunction<RuleMetaData> getFetchRulesFunction() { | 31 | FetchFunction<RuleMetaData> getFetchRulesFunction() { |
32 | - return link -> ruleService.findSystemRules(link); | 32 | + return ruleService::findSystemRules; |
33 | } | 33 | } |
34 | 34 | ||
35 | } | 35 | } |