Commit e0b6c3238c0a22cdea8f66727c14bb540565afb3

Authored by Andrew Shvayka
1 parent 606d21ab

Added Failure relation to be present by default for each node. Forwarding Errors to Failure

Showing 29 changed files with 70 additions and 69 deletions
@@ -25,7 +25,9 @@ import org.thingsboard.rule.engine.api.RuleEngineRpcService; @@ -25,7 +25,9 @@ import org.thingsboard.rule.engine.api.RuleEngineRpcService;
25 import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; 25 import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
26 import org.thingsboard.rule.engine.api.ScriptEngine; 26 import org.thingsboard.rule.engine.api.ScriptEngine;
27 import org.thingsboard.rule.engine.api.TbContext; 27 import org.thingsboard.rule.engine.api.TbContext;
  28 +import org.thingsboard.rule.engine.api.TbRelationTypes;
28 import org.thingsboard.server.actors.ActorSystemContext; 29 import org.thingsboard.server.actors.ActorSystemContext;
  30 +import org.thingsboard.server.common.data.DataConstants;
29 import org.thingsboard.server.common.data.id.DeviceId; 31 import org.thingsboard.server.common.data.id.DeviceId;
30 import org.thingsboard.server.common.data.id.EntityId; 32 import org.thingsboard.server.common.data.id.EntityId;
31 import org.thingsboard.server.common.data.id.RuleNodeId; 33 import org.thingsboard.server.common.data.id.RuleNodeId;
@@ -98,11 +100,11 @@ class DefaultTbContext implements TbContext { @@ -98,11 +100,11 @@ class DefaultTbContext implements TbContext {
98 } 100 }
99 101
100 @Override 102 @Override
101 - public void tellError(TbMsg msg, Throwable th) { 103 + public void tellFailure(TbMsg msg, Throwable th) {
102 if (nodeCtx.getSelf().isDebugMode()) { 104 if (nodeCtx.getSelf().isDebugMode()) {
103 - mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, "", th); 105 + mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, TbRelationTypes.FAILURE, th);
104 } 106 }
105 - nodeCtx.getSelfActor().tell(new RuleNodeToSelfErrorMsg(msg, th), nodeCtx.getSelfActor()); 107 + nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), Collections.singleton(TbRelationTypes.FAILURE), msg), nodeCtx.getSelfActor());
106 } 108 }
107 109
108 @Override 110 @Override
@@ -29,6 +29,8 @@ import org.springframework.stereotype.Service; @@ -29,6 +29,8 @@ import org.springframework.stereotype.Service;
29 import org.thingsboard.rule.engine.api.NodeConfiguration; 29 import org.thingsboard.rule.engine.api.NodeConfiguration;
30 import org.thingsboard.rule.engine.api.NodeDefinition; 30 import org.thingsboard.rule.engine.api.NodeDefinition;
31 import org.thingsboard.rule.engine.api.RuleNode; 31 import org.thingsboard.rule.engine.api.RuleNode;
  32 +import org.thingsboard.rule.engine.api.TbRelationTypes;
  33 +import org.thingsboard.server.common.data.DataConstants;
32 import org.thingsboard.server.common.data.plugin.ComponentDescriptor; 34 import org.thingsboard.server.common.data.plugin.ComponentDescriptor;
33 import org.thingsboard.server.common.data.plugin.ComponentType; 35 import org.thingsboard.server.common.data.plugin.ComponentType;
34 import org.thingsboard.server.dao.component.ComponentDescriptorService; 36 import org.thingsboard.server.dao.component.ComponentDescriptorService;
@@ -36,6 +38,7 @@ import org.thingsboard.server.dao.component.ComponentDescriptorService; @@ -36,6 +38,7 @@ import org.thingsboard.server.dao.component.ComponentDescriptorService;
36 import javax.annotation.PostConstruct; 38 import javax.annotation.PostConstruct;
37 import java.lang.annotation.Annotation; 39 import java.lang.annotation.Annotation;
38 import java.util.ArrayList; 40 import java.util.ArrayList;
  41 +import java.util.Arrays;
39 import java.util.Collection; 42 import java.util.Collection;
40 import java.util.Collections; 43 import java.util.Collections;
41 import java.util.HashMap; 44 import java.util.HashMap;
@@ -163,7 +166,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe @@ -163,7 +166,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
163 nodeDefinition.setDescription(nodeAnnotation.nodeDescription()); 166 nodeDefinition.setDescription(nodeAnnotation.nodeDescription());
164 nodeDefinition.setInEnabled(nodeAnnotation.inEnabled()); 167 nodeDefinition.setInEnabled(nodeAnnotation.inEnabled());
165 nodeDefinition.setOutEnabled(nodeAnnotation.outEnabled()); 168 nodeDefinition.setOutEnabled(nodeAnnotation.outEnabled());
166 - nodeDefinition.setRelationTypes(nodeAnnotation.relationTypes()); 169 + nodeDefinition.setRelationTypes(getRelationTypesWithFailureRelation(nodeAnnotation));
167 nodeDefinition.setCustomRelations(nodeAnnotation.customRelations()); 170 nodeDefinition.setCustomRelations(nodeAnnotation.customRelations());
168 Class<? extends NodeConfiguration> configClazz = nodeAnnotation.configClazz(); 171 Class<? extends NodeConfiguration> configClazz = nodeAnnotation.configClazz();
169 NodeConfiguration config = configClazz.newInstance(); 172 NodeConfiguration config = configClazz.newInstance();
@@ -176,6 +179,14 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe @@ -176,6 +179,14 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
176 return nodeDefinition; 179 return nodeDefinition;
177 } 180 }
178 181
  182 + private String[] getRelationTypesWithFailureRelation(RuleNode nodeAnnotation) {
  183 + List<String> relationTypes = new ArrayList<>(Arrays.asList(nodeAnnotation.relationTypes()));
  184 + if (!relationTypes.contains(TbRelationTypes.FAILURE)) {
  185 + relationTypes.add(TbRelationTypes.FAILURE);
  186 + }
  187 + return relationTypes.toArray(new String[relationTypes.size()]);
  188 + }
  189 +
179 private Set<BeanDefinition> getBeanDefinitions(Class<? extends Annotation> componentType) { 190 private Set<BeanDefinition> getBeanDefinitions(Class<? extends Annotation> componentType) {
180 ClassPathScanningCandidateComponentProvider scanner = new ClassPathScanningCandidateComponentProvider(false); 191 ClassPathScanningCandidateComponentProvider scanner = new ClassPathScanningCandidateComponentProvider(false);
181 scanner.addIncludeFilter(new AnnotationTypeFilter(componentType)); 192 scanner.addIncludeFilter(new AnnotationTypeFilter(componentType));
@@ -46,7 +46,7 @@ public interface TbContext { @@ -46,7 +46,7 @@ public interface TbContext {
46 46
47 void tellSelf(TbMsg msg, long delayMs); 47 void tellSelf(TbMsg msg, long delayMs);
48 48
49 - void tellError(TbMsg msg, Throwable th); 49 + void tellFailure(TbMsg msg, Throwable th);
50 50
51 void updateSelf(RuleNode self); 51 void updateSelf(RuleNode self);
52 52
@@ -62,7 +62,7 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura @@ -62,7 +62,7 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura
62 ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Cleared"); 62 ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Cleared");
63 } 63 }
64 }, 64 },
65 - t -> ctx.tellError(msg, t)); 65 + t -> ctx.tellFailure(msg, t));
66 } 66 }
67 67
68 protected abstract ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg); 68 protected abstract ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg);
@@ -57,7 +57,7 @@ public class TbLogNode implements TbNode { @@ -57,7 +57,7 @@ public class TbLogNode implements TbNode {
57 log.info(toString); 57 log.info(toString);
58 ctx.tellNext(msg, SUCCESS); 58 ctx.tellNext(msg, SUCCESS);
59 }, 59 },
60 - t -> ctx.tellError(msg, t)); 60 + t -> ctx.tellFailure(msg, t));
61 } 61 }
62 62
63 @Override 63 @Override
@@ -77,15 +77,15 @@ public class TbSnsNode implements TbNode { @@ -77,15 +77,15 @@ public class TbSnsNode implements TbNode {
77 m -> ctx.tellNext(m, TbRelationTypes.SUCCESS), 77 m -> ctx.tellNext(m, TbRelationTypes.SUCCESS),
78 t -> { 78 t -> {
79 TbMsg next = processException(ctx, msg, t); 79 TbMsg next = processException(ctx, msg, t);
80 - ctx.tellNext(next, TbRelationTypes.FAILURE, t); 80 + ctx.tellFailure(next, t);
81 }); 81 });
82 } 82 }
83 83
84 - ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) { 84 + private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) {
85 return ctx.getExternalCallExecutor().executeAsync(() -> publishMessage(ctx, msg)); 85 return ctx.getExternalCallExecutor().executeAsync(() -> publishMessage(ctx, msg));
86 } 86 }
87 87
88 - TbMsg publishMessage(TbContext ctx, TbMsg msg) { 88 + private TbMsg publishMessage(TbContext ctx, TbMsg msg) {
89 String topicArn = TbNodeUtils.processPattern(this.config.getTopicArnPattern(), msg.getMetaData()); 89 String topicArn = TbNodeUtils.processPattern(this.config.getTopicArnPattern(), msg.getMetaData());
90 PublishRequest publishRequest = new PublishRequest() 90 PublishRequest publishRequest = new PublishRequest()
91 .withTopicArn(topicArn) 91 .withTopicArn(topicArn)
@@ -83,15 +83,15 @@ public class TbSqsNode implements TbNode { @@ -83,15 +83,15 @@ public class TbSqsNode implements TbNode {
83 m -> ctx.tellNext(m, TbRelationTypes.SUCCESS), 83 m -> ctx.tellNext(m, TbRelationTypes.SUCCESS),
84 t -> { 84 t -> {
85 TbMsg next = processException(ctx, msg, t); 85 TbMsg next = processException(ctx, msg, t);
86 - ctx.tellNext(next, TbRelationTypes.FAILURE, t); 86 + ctx.tellFailure(next, t);
87 }); 87 });
88 } 88 }
89 89
90 - ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) { 90 + private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) {
91 return ctx.getExternalCallExecutor().executeAsync(() -> publishMessage(ctx, msg)); 91 return ctx.getExternalCallExecutor().executeAsync(() -> publishMessage(ctx, msg));
92 } 92 }
93 93
94 - TbMsg publishMessage(TbContext ctx, TbMsg msg) { 94 + private TbMsg publishMessage(TbContext ctx, TbMsg msg) {
95 String queueUrl = TbNodeUtils.processPattern(this.config.getQueueUrlPattern(), msg.getMetaData()); 95 String queueUrl = TbNodeUtils.processPattern(this.config.getQueueUrlPattern(), msg.getMetaData());
96 SendMessageRequest sendMsgRequest = new SendMessageRequest(); 96 SendMessageRequest sendMsgRequest = new SendMessageRequest();
97 sendMsgRequest.withQueueUrl(queueUrl); 97 sendMsgRequest.withQueueUrl(queueUrl);
@@ -74,7 +74,7 @@ public class TbMsgGeneratorNode implements TbNode { @@ -74,7 +74,7 @@ public class TbMsgGeneratorNode implements TbNode {
74 if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) { 74 if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) {
75 withCallback(generate(ctx), 75 withCallback(generate(ctx),
76 m -> {ctx.tellNext(m, SUCCESS); sentTickMsg(ctx);}, 76 m -> {ctx.tellNext(m, SUCCESS); sentTickMsg(ctx);},
77 - t -> {ctx.tellError(msg, t); sentTickMsg(ctx);}); 77 + t -> {ctx.tellFailure(msg, t); sentTickMsg(ctx);});
78 } 78 }
79 } 79 }
80 80
@@ -52,8 +52,8 @@ public class TbJsFilterNode implements TbNode { @@ -52,8 +52,8 @@ public class TbJsFilterNode implements TbNode {
52 public void onMsg(TbContext ctx, TbMsg msg) { 52 public void onMsg(TbContext ctx, TbMsg msg) {
53 ListeningExecutor jsExecutor = ctx.getJsExecutor(); 53 ListeningExecutor jsExecutor = ctx.getJsExecutor();
54 withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(msg)), 54 withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(msg)),
55 - filterResult -> ctx.tellNext(msg, filterResult.booleanValue() ? "True" : "False"),  
56 - t -> ctx.tellError(msg, t)); 55 + filterResult -> ctx.tellNext(msg, filterResult ? "True" : "False"),
  56 + t -> ctx.tellFailure(msg, t));
57 } 57 }
58 58
59 @Override 59 @Override
@@ -55,7 +55,7 @@ public class TbJsSwitchNode implements TbNode { @@ -55,7 +55,7 @@ public class TbJsSwitchNode implements TbNode {
55 ListeningExecutor jsExecutor = ctx.getJsExecutor(); 55 ListeningExecutor jsExecutor = ctx.getJsExecutor();
56 withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(msg)), 56 withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(msg)),
57 result -> processSwitch(ctx, msg, result), 57 result -> processSwitch(ctx, msg, result),
58 - t -> ctx.tellError(msg, t)); 58 + t -> ctx.tellFailure(msg, t));
59 } 59 }
60 60
61 private void processSwitch(TbContext ctx, TbMsg msg, Set<String> nextRelations) { 61 private void processSwitch(TbContext ctx, TbMsg msg, Set<String> nextRelations) {
@@ -82,11 +82,11 @@ public class TbKafkaNode implements TbNode { @@ -82,11 +82,11 @@ public class TbKafkaNode implements TbNode {
82 ctx.tellNext(next, TbRelationTypes.SUCCESS); 82 ctx.tellNext(next, TbRelationTypes.SUCCESS);
83 } else { 83 } else {
84 TbMsg next = processException(ctx, msg, e); 84 TbMsg next = processException(ctx, msg, e);
85 - ctx.tellNext(next, TbRelationTypes.FAILURE, e); 85 + ctx.tellFailure(next, e);
86 } 86 }
87 }); 87 });
88 } catch (Exception e) { 88 } catch (Exception e) {
89 - ctx.tellError(msg, e); 89 + ctx.tellFailure(msg, e);
90 } 90 }
91 } 91 }
92 92
@@ -61,7 +61,7 @@ public class TbMsgToEmailNode implements TbNode { @@ -61,7 +61,7 @@ public class TbMsgToEmailNode implements TbNode {
61 ctx.tellNext(emailMsg, SUCCESS); 61 ctx.tellNext(emailMsg, SUCCESS);
62 } catch (Exception ex) { 62 } catch (Exception ex) {
63 log.warn("Can not convert message to email " + ex.getMessage()); 63 log.warn("Can not convert message to email " + ex.getMessage());
64 - ctx.tellError(msg, ex); 64 + ctx.tellFailure(msg, ex);
65 } 65 }
66 } 66 }
67 67
@@ -15,9 +15,7 @@ @@ -15,9 +15,7 @@
15 */ 15 */
16 package org.thingsboard.rule.engine.mail; 16 package org.thingsboard.rule.engine.mail;
17 17
18 -import com.fasterxml.jackson.databind.JsonNode;  
19 import com.fasterxml.jackson.databind.ObjectMapper; 18 import com.fasterxml.jackson.databind.ObjectMapper;
20 -import com.google.common.util.concurrent.ListenableFuture;  
21 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
22 import org.apache.commons.lang3.StringUtils; 20 import org.apache.commons.lang3.StringUtils;
23 import org.springframework.mail.javamail.JavaMailSenderImpl; 21 import org.springframework.mail.javamail.JavaMailSenderImpl;
@@ -78,9 +76,9 @@ public class TbSendEmailNode implements TbNode { @@ -78,9 +76,9 @@ public class TbSendEmailNode implements TbNode {
78 return null; 76 return null;
79 }), 77 }),
80 ok -> ctx.tellNext(msg, SUCCESS), 78 ok -> ctx.tellNext(msg, SUCCESS),
81 - fail -> ctx.tellError(msg, fail)); 79 + fail -> ctx.tellFailure(msg, fail));
82 } catch (Exception ex) { 80 } catch (Exception ex) {
83 - ctx.tellError(msg, ex); 81 + ctx.tellFailure(msg, ex);
84 } 82 }
85 } 83 }
86 84
@@ -52,9 +52,9 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC @@ -52,9 +52,9 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
52 withCallback( 52 withCallback(
53 findEntityAsync(ctx, msg.getOriginator()), 53 findEntityAsync(ctx, msg.getOriginator()),
54 entityId -> safePutAttributes(ctx, msg, entityId), 54 entityId -> safePutAttributes(ctx, msg, entityId),
55 - t -> ctx.tellError(msg, t), ctx.getDbCallbackExecutor()); 55 + t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
56 } catch (Throwable th) { 56 } catch (Throwable th) {
57 - ctx.tellError(msg, th); 57 + ctx.tellFailure(msg, th);
58 } 58 }
59 } 59 }
60 60
@@ -69,7 +69,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC @@ -69,7 +69,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
69 putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared_"), 69 putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared_"),
70 putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss_") 70 putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss_")
71 ); 71 );
72 - withCallback(allFutures, i -> ctx.tellNext(msg, SUCCESS), t -> ctx.tellError(msg, t)); 72 + withCallback(allFutures, i -> ctx.tellNext(msg, SUCCESS), t -> ctx.tellFailure(msg, t));
73 } 73 }
74 74
75 private ListenableFuture<Void> putAttrAsync(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, String prefix) { 75 private ListenableFuture<Void> putAttrAsync(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, String prefix) {
@@ -77,7 +77,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC @@ -77,7 +77,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
77 return Futures.immediateFuture(null); 77 return Futures.immediateFuture(null);
78 } 78 }
79 ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, scope, keys); 79 ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, scope, keys);
80 - return Futures.transform(latest, (Function<? super List<AttributeKvEntry>, Void>) l -> { 80 + return Futures.transform(latest, l -> {
81 l.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString())); 81 l.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString()));
82 return null; 82 return null;
83 }); 83 });
@@ -88,7 +88,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC @@ -88,7 +88,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
88 return Futures.immediateFuture(null); 88 return Futures.immediateFuture(null);
89 } 89 }
90 ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(entityId, keys); 90 ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(entityId, keys);
91 - return Futures.transform(latest, (Function<? super List<TsKvEntry>, Void>) l -> { 91 + return Futures.transform(latest, l -> {
92 l.forEach(r -> msg.getMetaData().putValue(r.getKey(), r.getValueAsString())); 92 l.forEach(r -> msg.getMetaData().putValue(r.getKey(), r.getValueAsString()));
93 return null; 93 return null;
94 }); 94 });
@@ -54,9 +54,9 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode @@ -54,9 +54,9 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
54 withCallback( 54 withCallback(
55 findEntityAsync(ctx, msg.getOriginator()), 55 findEntityAsync(ctx, msg.getOriginator()),
56 entityId -> safeGetAttributes(ctx, msg, entityId), 56 entityId -> safeGetAttributes(ctx, msg, entityId),
57 - t -> ctx.tellError(msg, t)); 57 + t -> ctx.tellFailure(msg, t));
58 } catch (Throwable th) { 58 } catch (Throwable th) {
59 - ctx.tellError(msg, th); 59 + ctx.tellFailure(msg, th);
60 } 60 }
61 } 61 }
62 62
@@ -68,18 +68,18 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode @@ -68,18 +68,18 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
68 68
69 withCallback(config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId), 69 withCallback(config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId),
70 attributes -> putAttributesAndTell(ctx, msg, attributes), 70 attributes -> putAttributesAndTell(ctx, msg, attributes),
71 - t -> ctx.tellError(msg, t)); 71 + t -> ctx.tellFailure(msg, t));
72 } 72 }
73 73
74 private ListenableFuture<List<KvEntry>> getAttributesAsync(TbContext ctx, EntityId entityId) { 74 private ListenableFuture<List<KvEntry>> getAttributesAsync(TbContext ctx, EntityId entityId) {
75 ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, SERVER_SCOPE, config.getAttrMapping().keySet()); 75 ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, SERVER_SCOPE, config.getAttrMapping().keySet());
76 - return Futures.transform(latest, (Function<? super List<AttributeKvEntry>, ? extends List<KvEntry>>) l -> 76 + return Futures.transform(latest, l ->
77 l.stream().map(i -> (KvEntry) i).collect(Collectors.toList())); 77 l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()));
78 } 78 }
79 79
80 private ListenableFuture<List<KvEntry>> getLatestTelemetry(TbContext ctx, EntityId entityId) { 80 private ListenableFuture<List<KvEntry>> getLatestTelemetry(TbContext ctx, EntityId entityId) {
81 ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(entityId, config.getAttrMapping().keySet()); 81 ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(entityId, config.getAttrMapping().keySet());
82 - return Futures.transform(latest, (Function<? super List<TsKvEntry>, ? extends List<KvEntry>>) l -> 82 + return Futures.transform(latest, l ->
83 l.stream().map(i -> (KvEntry) i).collect(Collectors.toList())); 83 l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()));
84 } 84 }
85 85
@@ -83,7 +83,7 @@ public class TbMqttNode implements TbNode { @@ -83,7 +83,7 @@ public class TbMqttNode implements TbNode {
83 ctx.tellNext(msg, TbRelationTypes.SUCCESS); 83 ctx.tellNext(msg, TbRelationTypes.SUCCESS);
84 } else { 84 } else {
85 TbMsg next = processException(ctx, msg, future.cause()); 85 TbMsg next = processException(ctx, msg, future.cause());
86 - ctx.tellNext(next, TbRelationTypes.FAILURE, future.cause()); 86 + ctx.tellFailure(next, future.cause());
87 } 87 }
88 } 88 }
89 ); 89 );
@@ -80,15 +80,15 @@ public class TbRabbitMqNode implements TbNode { @@ -80,15 +80,15 @@ public class TbRabbitMqNode implements TbNode {
80 m -> ctx.tellNext(m, TbRelationTypes.SUCCESS), 80 m -> ctx.tellNext(m, TbRelationTypes.SUCCESS),
81 t -> { 81 t -> {
82 TbMsg next = processException(ctx, msg, t); 82 TbMsg next = processException(ctx, msg, t);
83 - ctx.tellNext(next, TbRelationTypes.FAILURE, t); 83 + ctx.tellFailure(next, t);
84 }); 84 });
85 } 85 }
86 86
87 - ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) { 87 + private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) {
88 return ctx.getExternalCallExecutor().executeAsync(() -> publishMessage(ctx, msg)); 88 return ctx.getExternalCallExecutor().executeAsync(() -> publishMessage(ctx, msg));
89 } 89 }
90 90
91 - TbMsg publishMessage(TbContext ctx, TbMsg msg) throws Exception { 91 + private TbMsg publishMessage(TbContext ctx, TbMsg msg) throws Exception {
92 String exchangeName = ""; 92 String exchangeName = "";
93 if (!StringUtils.isEmpty(this.config.getExchangeNamePattern())) { 93 if (!StringUtils.isEmpty(this.config.getExchangeNamePattern())) {
94 exchangeName = TbNodeUtils.processPattern(this.config.getExchangeNamePattern(), msg.getMetaData()); 94 exchangeName = TbNodeUtils.processPattern(this.config.getExchangeNamePattern(), msg.getMetaData());
@@ -89,7 +89,7 @@ public class TbRestApiCallNode implements TbNode { @@ -89,7 +89,7 @@ public class TbRestApiCallNode implements TbNode {
89 @Override 89 @Override
90 public void onFailure(Throwable throwable) { 90 public void onFailure(Throwable throwable) {
91 TbMsg next = processException(ctx, msg, throwable); 91 TbMsg next = processException(ctx, msg, throwable);
92 - ctx.tellNext(next, TbRelationTypes.FAILURE, throwable); 92 + ctx.tellFailure(next, throwable);
93 } 93 }
94 94
95 @Override 95 @Override
@@ -52,11 +52,11 @@ public class TbSendRPCReplyNode implements TbNode { @@ -52,11 +52,11 @@ public class TbSendRPCReplyNode implements TbNode {
52 public void onMsg(TbContext ctx, TbMsg msg) { 52 public void onMsg(TbContext ctx, TbMsg msg) {
53 String requestIdStr = msg.getMetaData().getValue(config.getRequestIdMetaDataAttribute()); 53 String requestIdStr = msg.getMetaData().getValue(config.getRequestIdMetaDataAttribute());
54 if (msg.getOriginator().getEntityType() != EntityType.DEVICE) { 54 if (msg.getOriginator().getEntityType() != EntityType.DEVICE) {
55 - ctx.tellError(msg, new RuntimeException("Message originator is not a device entity!")); 55 + ctx.tellFailure(msg, new RuntimeException("Message originator is not a device entity!"));
56 } else if (StringUtils.isEmpty(requestIdStr)) { 56 } else if (StringUtils.isEmpty(requestIdStr)) {
57 - ctx.tellError(msg, new RuntimeException("Request id is not present in the metadata!")); 57 + ctx.tellFailure(msg, new RuntimeException("Request id is not present in the metadata!"));
58 } else if (StringUtils.isEmpty(msg.getData())) { 58 } else if (StringUtils.isEmpty(msg.getData())) {
59 - ctx.tellError(msg, new RuntimeException("Request body is empty!")); 59 + ctx.tellFailure(msg, new RuntimeException("Request body is empty!"));
60 } else { 60 } else {
61 ctx.getRpcService().sendRpcReply(new DeviceId(msg.getOriginator().getId()), Integer.parseInt(requestIdStr), msg.getData()); 61 ctx.getRpcService().sendRpcReply(new DeviceId(msg.getOriginator().getId()), Integer.parseInt(requestIdStr), msg.getData());
62 } 62 }
@@ -63,15 +63,15 @@ public class TbSendRPCRequestNode implements TbNode { @@ -63,15 +63,15 @@ public class TbSendRPCRequestNode implements TbNode {
63 JsonObject json = jsonParser.parse(msg.getData()).getAsJsonObject(); 63 JsonObject json = jsonParser.parse(msg.getData()).getAsJsonObject();
64 64
65 if (msg.getOriginator().getEntityType() != EntityType.DEVICE) { 65 if (msg.getOriginator().getEntityType() != EntityType.DEVICE) {
66 - ctx.tellError(msg, new RuntimeException("Message originator is not a device entity!")); 66 + ctx.tellFailure(msg, new RuntimeException("Message originator is not a device entity!"));
67 } else if (!json.has("method")) { 67 } else if (!json.has("method")) {
68 - ctx.tellError(msg, new RuntimeException("Method is not present in the message!")); 68 + ctx.tellFailure(msg, new RuntimeException("Method is not present in the message!"));
69 } else if (!json.has("params")) { 69 } else if (!json.has("params")) {
70 - ctx.tellError(msg, new RuntimeException("Params are not present in the message!")); 70 + ctx.tellFailure(msg, new RuntimeException("Params are not present in the message!"));
71 } else { 71 } else {
72 int requestId = json.has("requestId") ? json.get("requestId").getAsInt() : random.nextInt(); 72 int requestId = json.has("requestId") ? json.get("requestId").getAsInt() : random.nextInt();
73 RuleEngineDeviceRpcRequest request = RuleEngineDeviceRpcRequest.builder() 73 RuleEngineDeviceRpcRequest request = RuleEngineDeviceRpcRequest.builder()
74 - .method(gson.toJson(json.get("method"))) 74 + .method(json.get("method").getAsString())
75 .body(gson.toJson(json.get("params"))) 75 .body(gson.toJson(json.get("params")))
76 .deviceId(new DeviceId(msg.getOriginator().getId())) 76 .deviceId(new DeviceId(msg.getOriginator().getId()))
77 .requestId(requestId) 77 .requestId(requestId)
@@ -84,8 +84,7 @@ public class TbSendRPCRequestNode implements TbNode { @@ -84,8 +84,7 @@ public class TbSendRPCRequestNode implements TbNode {
84 ctx.tellNext(next, TbRelationTypes.SUCCESS); 84 ctx.tellNext(next, TbRelationTypes.SUCCESS);
85 } else { 85 } else {
86 TbMsg next = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); 86 TbMsg next = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
87 - ctx.tellNext(next, TbRelationTypes.FAILURE);  
88 - ctx.tellError(msg, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name())); 87 + ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name()));
89 } 88 }
90 }); 89 });
91 } 90 }
@@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.telemetry; @@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.telemetry;
17 17
18 import com.google.gson.JsonParser; 18 import com.google.gson.JsonParser;
19 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
20 -import org.springframework.util.StringUtils;  
21 import org.thingsboard.rule.engine.TbNodeUtils; 20 import org.thingsboard.rule.engine.TbNodeUtils;
22 import org.thingsboard.rule.engine.api.RuleNode; 21 import org.thingsboard.rule.engine.api.RuleNode;
23 import org.thingsboard.rule.engine.api.TbContext; 22 import org.thingsboard.rule.engine.api.TbContext;
@@ -25,19 +24,12 @@ import org.thingsboard.rule.engine.api.TbNode; @@ -25,19 +24,12 @@ import org.thingsboard.rule.engine.api.TbNode;
25 import org.thingsboard.rule.engine.api.TbNodeConfiguration; 24 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
26 import org.thingsboard.rule.engine.api.TbNodeException; 25 import org.thingsboard.rule.engine.api.TbNodeException;
27 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 26 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
28 -import org.thingsboard.server.common.data.kv.BasicTsKvEntry;  
29 -import org.thingsboard.server.common.data.kv.KvEntry;  
30 -import org.thingsboard.server.common.data.kv.TsKvEntry;  
31 import org.thingsboard.server.common.data.plugin.ComponentType; 27 import org.thingsboard.server.common.data.plugin.ComponentType;
32 import org.thingsboard.server.common.msg.TbMsg; 28 import org.thingsboard.server.common.msg.TbMsg;
33 -import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;  
34 -import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;  
35 import org.thingsboard.server.common.msg.session.SessionMsgType; 29 import org.thingsboard.server.common.msg.session.SessionMsgType;
36 import org.thingsboard.server.common.transport.adaptor.JsonConverter; 30 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
37 31
38 import java.util.ArrayList; 32 import java.util.ArrayList;
39 -import java.util.List;  
40 -import java.util.Map;  
41 import java.util.Set; 33 import java.util.Set;
42 34
43 @Slf4j 35 @Slf4j
@@ -63,7 +55,7 @@ public class TbMsgAttributesNode implements TbNode { @@ -63,7 +55,7 @@ public class TbMsgAttributesNode implements TbNode {
63 @Override 55 @Override
64 public void onMsg(TbContext ctx, TbMsg msg) { 56 public void onMsg(TbContext ctx, TbMsg msg) {
65 if (!msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name())) { 57 if (!msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name())) {
66 - ctx.tellError(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); 58 + ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
67 return; 59 return;
68 } 60 }
69 61
@@ -60,7 +60,7 @@ public class TbMsgTimeseriesNode implements TbNode { @@ -60,7 +60,7 @@ public class TbMsgTimeseriesNode implements TbNode {
60 @Override 60 @Override
61 public void onMsg(TbContext ctx, TbMsg msg) { 61 public void onMsg(TbContext ctx, TbMsg msg) {
62 if (!msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) { 62 if (!msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) {
63 - ctx.tellError(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); 63 + ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
64 return; 64 return;
65 } 65 }
66 long ts = -1; 66 long ts = -1;
@@ -71,14 +71,14 @@ public class TbMsgTimeseriesNode implements TbNode { @@ -71,14 +71,14 @@ public class TbMsgTimeseriesNode implements TbNode {
71 } catch (NumberFormatException e) {} 71 } catch (NumberFormatException e) {}
72 } 72 }
73 if (ts == -1) { 73 if (ts == -1) {
74 - ctx.tellError(msg, new IllegalArgumentException("Msg metadata doesn't contain valid ts value: " + msg.getMetaData())); 74 + ctx.tellFailure(msg, new IllegalArgumentException("Msg metadata doesn't contain valid ts value: " + msg.getMetaData()));
75 return; 75 return;
76 } 76 }
77 String src = msg.getData(); 77 String src = msg.getData();
78 TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts); 78 TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);
79 Map<Long, List<KvEntry>> tsKvMap = telemetryUploadRequest.getData(); 79 Map<Long, List<KvEntry>> tsKvMap = telemetryUploadRequest.getData();
80 if (tsKvMap == null) { 80 if (tsKvMap == null) {
81 - ctx.tellError(msg, new IllegalArgumentException("Msg body is empty: " + src)); 81 + ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + src));
82 return; 82 return;
83 } 83 }
84 List<TsKvEntry> tsKvEntryList = new ArrayList<>(); 84 List<TsKvEntry> tsKvEntryList = new ArrayList<>();
@@ -39,6 +39,6 @@ class TelemetryNodeCallback implements FutureCallback<Void> { @@ -39,6 +39,6 @@ class TelemetryNodeCallback implements FutureCallback<Void> {
39 39
40 @Override 40 @Override
41 public void onFailure(Throwable t) { 41 public void onFailure(Throwable t) {
42 - ctx.tellError(msg, t); 42 + ctx.tellFailure(msg, t);
43 } 43 }
44 } 44 }
@@ -51,7 +51,7 @@ public abstract class TbAbstractTransformNode implements TbNode { @@ -51,7 +51,7 @@ public abstract class TbAbstractTransformNode implements TbNode {
51 ctx.tellNext(msg, FAILURE); 51 ctx.tellNext(msg, FAILURE);
52 } 52 }
53 }, 53 },
54 - t -> ctx.tellError(msg, t)); 54 + t -> ctx.tellFailure(msg, t));
55 } 55 }
56 56
57 protected abstract ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg); 57 protected abstract ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg);
@@ -368,7 +368,7 @@ public class TbAlarmNodeTest { @@ -368,7 +368,7 @@ public class TbAlarmNodeTest {
368 368
369 private void verifyError(TbMsg msg, String message, Class expectedClass) { 369 private void verifyError(TbMsg msg, String message, Class expectedClass) {
370 ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class); 370 ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
371 - verify(ctx).tellError(same(msg), captor.capture()); 371 + verify(ctx).tellFailure(same(msg), captor.capture());
372 372
373 Throwable value = captor.getValue(); 373 Throwable value = captor.getValue();
374 assertEquals(expectedClass, value.getClass()); 374 assertEquals(expectedClass, value.getClass());
@@ -117,7 +117,7 @@ public class TbJsFilterNodeTest { @@ -117,7 +117,7 @@ public class TbJsFilterNodeTest {
117 117
118 private void verifyError(TbMsg msg, String message, Class expectedClass) { 118 private void verifyError(TbMsg msg, String message, Class expectedClass) {
119 ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class); 119 ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
120 - verify(ctx).tellError(same(msg), captor.capture()); 120 + verify(ctx).tellFailure(same(msg), captor.capture());
121 121
122 Throwable value = captor.getValue(); 122 Throwable value = captor.getValue();
123 assertEquals(expectedClass, value.getClass()); 123 assertEquals(expectedClass, value.getClass());
@@ -99,7 +99,7 @@ public class TbJsSwitchNodeTest { @@ -99,7 +99,7 @@ public class TbJsSwitchNodeTest {
99 99
100 private void verifyError(TbMsg msg, String message, Class expectedClass) { 100 private void verifyError(TbMsg msg, String message, Class expectedClass) {
101 ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class); 101 ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
102 - verify(ctx).tellError(same(msg), captor.capture()); 102 + verify(ctx).tellFailure(same(msg), captor.capture());
103 103
104 Throwable value = captor.getValue(); 104 Throwable value = captor.getValue();
105 assertEquals(expectedClass, value.getClass()); 105 assertEquals(expectedClass, value.getClass());
@@ -100,7 +100,6 @@ public class TbGetCustomerAttributeNodeTest { @@ -100,7 +100,6 @@ public class TbGetCustomerAttributeNodeTest {
100 User user = new User(); 100 User user = new User();
101 user.setCustomerId(customerId); 101 user.setCustomerId(customerId);
102 102
103 -  
104 msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L); 103 msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
105 104
106 when(ctx.getUserService()).thenReturn(userService); 105 when(ctx.getUserService()).thenReturn(userService);
@@ -112,7 +111,7 @@ public class TbGetCustomerAttributeNodeTest { @@ -112,7 +111,7 @@ public class TbGetCustomerAttributeNodeTest {
112 111
113 node.onMsg(ctx, msg); 112 node.onMsg(ctx, msg);
114 final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class); 113 final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
115 - verify(ctx).tellError(same(msg), captor.capture()); 114 + verify(ctx).tellFailure(same(msg), captor.capture());
116 115
117 Throwable value = captor.getValue(); 116 Throwable value = captor.getValue();
118 assertEquals("something wrong", value.getMessage()); 117 assertEquals("something wrong", value.getMessage());
@@ -137,7 +136,7 @@ public class TbGetCustomerAttributeNodeTest { @@ -137,7 +136,7 @@ public class TbGetCustomerAttributeNodeTest {
137 136
138 node.onMsg(ctx, msg); 137 node.onMsg(ctx, msg);
139 final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class); 138 final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
140 - verify(ctx).tellError(same(msg), captor.capture()); 139 + verify(ctx).tellFailure(same(msg), captor.capture());
141 140
142 Throwable value = captor.getValue(); 141 Throwable value = captor.getValue();
143 assertEquals("something wrong", value.getMessage()); 142 assertEquals("something wrong", value.getMessage());
@@ -117,7 +117,7 @@ public class TbTransformMsgNodeTest { @@ -117,7 +117,7 @@ public class TbTransformMsgNodeTest {
117 117
118 private void verifyError(TbMsg msg, String message, Class expectedClass) { 118 private void verifyError(TbMsg msg, String message, Class expectedClass) {
119 ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class); 119 ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
120 - verify(ctx).tellError(same(msg), captor.capture()); 120 + verify(ctx).tellFailure(same(msg), captor.capture());
121 121
122 Throwable value = captor.getValue(); 122 Throwable value = captor.getValue();
123 assertEquals(expectedClass, value.getClass()); 123 assertEquals(expectedClass, value.getClass());