Commit ae4a71346f862b3a71201c2d17f6265cf58f6dcf

Authored by Andrew Shvayka
1 parent c3e62a7b

TB-73: Implementation

@@ -18,6 +18,7 @@ package org.thingsboard.server.actors.rule; @@ -18,6 +18,7 @@ package org.thingsboard.server.actors.rule;
18 import java.util.*; 18 import java.util.*;
19 19
20 import com.fasterxml.jackson.core.JsonProcessingException; 20 import com.fasterxml.jackson.core.JsonProcessingException;
  21 +import org.springframework.util.StringUtils;
21 import org.thingsboard.server.actors.ActorSystemContext; 22 import org.thingsboard.server.actors.ActorSystemContext;
22 import org.thingsboard.server.actors.plugin.RuleToPluginMsgWrapper; 23 import org.thingsboard.server.actors.plugin.RuleToPluginMsgWrapper;
23 import org.thingsboard.server.actors.shared.ComponentMsgProcessor; 24 import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
@@ -113,8 +114,9 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { @@ -113,8 +114,9 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
113 } 114 }
114 115
115 private void initAction() throws Exception { 116 private void initAction() throws Exception {
116 - JsonNode actionMd = ruleMd.getAction();  
117 - action = initComponent(actionMd); 117 + if (ruleMd.getAction() != null && !ruleMd.getAction().isNull()) {
  118 + action = initComponent(ruleMd.getAction());
  119 + }
118 } 120 }
119 121
120 private void initProcessor() throws Exception { 122 private void initProcessor() throws Exception {
@@ -131,9 +133,11 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { @@ -131,9 +133,11 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
131 } 133 }
132 134
133 private void fetchPluginInfo() { 135 private void fetchPluginInfo() {
134 - PluginMetaData pluginMd = systemContext.getPluginService().findPluginByApiToken(ruleMd.getPluginToken());  
135 - pluginTenantId = pluginMd.getTenantId();  
136 - pluginId = pluginMd.getId(); 136 + if (!StringUtils.isEmpty(ruleMd.getPluginToken())) {
  137 + PluginMetaData pluginMd = systemContext.getPluginService().findPluginByApiToken(ruleMd.getPluginToken());
  138 + pluginTenantId = pluginMd.getTenantId();
  139 + pluginId = pluginMd.getId();
  140 + }
137 } 141 }
138 142
139 protected void onRuleProcessingMsg(ActorContext context, RuleProcessingMsg msg) throws RuleException { 143 protected void onRuleProcessingMsg(ActorContext context, RuleProcessingMsg msg) throws RuleException {
@@ -162,25 +166,26 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { @@ -162,25 +166,26 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
162 inMsgMd = new RuleProcessingMetaData(); 166 inMsgMd = new RuleProcessingMetaData();
163 } 167 }
164 logger.debug("[{}] Going to convert in msg: {}", entityId, inMsg); 168 logger.debug("[{}] Going to convert in msg: {}", entityId, inMsg);
165 - Optional<RuleToPluginMsg<?>> ruleToPluginMsgOptional = action.convert(ruleCtx, inMsg, inMsgMd);  
166 - if (ruleToPluginMsgOptional.isPresent()) {  
167 - RuleToPluginMsg<?> ruleToPluginMsg = ruleToPluginMsgOptional.get();  
168 - logger.debug("[{}] Device msg is converter to: {}", entityId, ruleToPluginMsg);  
169 - context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self());  
170 - if (action.isOneWayAction()) {  
171 - pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS);  
172 - } else {  
173 - pendingMsgMap.put(ruleToPluginMsg.getUid(), msg);  
174 - scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout()); 169 + if (action != null) {
  170 + Optional<RuleToPluginMsg<?>> ruleToPluginMsgOptional = action.convert(ruleCtx, inMsg, inMsgMd);
  171 + if (ruleToPluginMsgOptional.isPresent()) {
  172 + RuleToPluginMsg<?> ruleToPluginMsg = ruleToPluginMsgOptional.get();
  173 + logger.debug("[{}] Device msg is converter to: {}", entityId, ruleToPluginMsg);
  174 + context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self());
  175 + if (action.isOneWayAction()) {
  176 + pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS);
  177 + } else {
  178 + pendingMsgMap.put(ruleToPluginMsg.getUid(), msg);
  179 + scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout());
  180 + }
175 } 181 }
176 } else { 182 } else {
177 logger.debug("[{}] Nothing to send to plugin: {}", entityId, pluginId); 183 logger.debug("[{}] Nothing to send to plugin: {}", entityId, pluginId);
178 - pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_REQUEST_FROM_ACTIONS);  
179 - return; 184 + pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS);
180 } 185 }
181 } 186 }
182 187
183 - public void onPluginMsg(ActorContext context, PluginToRuleMsg<?> msg) { 188 + void onPluginMsg(ActorContext context, PluginToRuleMsg<?> msg) {
184 RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getUid()); 189 RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getUid());
185 if (pendingMsg != null) { 190 if (pendingMsg != null) {
186 ChainProcessingContext ctx = pendingMsg.getCtx(); 191 ChainProcessingContext ctx = pendingMsg.getCtx();
@@ -196,7 +201,7 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { @@ -196,7 +201,7 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
196 } 201 }
197 } 202 }
198 203
199 - public void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) { 204 + void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) {
200 RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getMsgId()); 205 RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getMsgId());
201 if (pendingMsg != null) { 206 if (pendingMsg != null) {
202 logger.debug("[{}] Processing timeout detected [{}]: {}", entityId, msg.getMsgId(), pendingMsg); 207 logger.debug("[{}] Processing timeout detected [{}]: {}", entityId, msg.getMsgId(), pendingMsg);
@@ -269,18 +274,16 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { @@ -269,18 +274,16 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
269 public void onActivate(ActorContext context) throws Exception { 274 public void onActivate(ActorContext context) throws Exception {
270 logger.info("[{}] Going to process onActivate rule.", entityId); 275 logger.info("[{}] Going to process onActivate rule.", entityId);
271 this.state = ComponentLifecycleState.ACTIVE; 276 this.state = ComponentLifecycleState.ACTIVE;
272 - if (action != null) {  
273 - if (filters != null) {  
274 - filters.forEach(f -> f.resume());  
275 - } else {  
276 - initFilters();  
277 - } 277 + if (filters != null) {
  278 + filters.forEach(RuleLifecycleComponent::resume);
278 if (processor != null) { 279 if (processor != null) {
279 processor.resume(); 280 processor.resume();
280 } else { 281 } else {
281 initProcessor(); 282 initProcessor();
282 } 283 }
283 - action.resume(); 284 + if (action != null) {
  285 + action.resume();
  286 + }
284 logger.info("[{}] Rule resumed.", entityId); 287 logger.info("[{}] Rule resumed.", entityId);
285 } else { 288 } else {
286 start(); 289 start();
@@ -91,7 +91,9 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic @@ -91,7 +91,9 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic
91 if (rule.getProcessor() != null && !rule.getProcessor().isNull()) { 91 if (rule.getProcessor() != null && !rule.getProcessor().isNull()) {
92 validateComponentJson(rule.getProcessor(), ComponentType.PROCESSOR); 92 validateComponentJson(rule.getProcessor(), ComponentType.PROCESSOR);
93 } 93 }
94 - validateComponentJson(rule.getAction(), ComponentType.ACTION); 94 + if (rule.getAction() != null && !rule.getAction().isNull()) {
  95 + validateComponentJson(rule.getAction(), ComponentType.ACTION);
  96 + }
95 validateRuleAndPluginState(rule); 97 validateRuleAndPluginState(rule);
96 return ruleDao.save(rule); 98 return ruleDao.save(rule);
97 } 99 }
@@ -129,6 +131,9 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic @@ -129,6 +131,9 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic
129 } 131 }
130 132
131 private void validateRuleAndPluginState(RuleMetaData rule) { 133 private void validateRuleAndPluginState(RuleMetaData rule) {
  134 + if (org.springframework.util.StringUtils.isEmpty(rule.getPluginToken())) {
  135 + return;
  136 + }
132 PluginMetaData pluginMd = pluginService.findPluginByApiToken(rule.getPluginToken()); 137 PluginMetaData pluginMd = pluginService.findPluginByApiToken(rule.getPluginToken());
133 if (pluginMd == null) { 138 if (pluginMd == null) {
134 throw new IncorrectParameterException("Rule points to non-existent plugin!"); 139 throw new IncorrectParameterException("Rule points to non-existent plugin!");