Commit 4aa55bccf84519fd7923ffd38cb5eed8541dda0c

Authored by Illia Barkov
Committed by GitHub
1 parent 02714512

Added ability to return arrays in transformation script node (#3910)

* Added ability to return arrays in transformation script node

* fix typo

* Refactoring

* Improvements

* Improvements
... ... @@ -30,7 +30,9 @@ import org.thingsboard.server.common.msg.TbMsg;
30 30 import org.thingsboard.server.common.msg.TbMsgMetaData;
31 31
32 32 import javax.script.ScriptException;
  33 +import java.util.ArrayList;
33 34 import java.util.Collections;
  35 +import java.util.List;
34 36 import java.util.Map;
35 37 import java.util.Set;
36 38 import java.util.UUID;
... ... @@ -116,14 +118,19 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
116 118 }
117 119
118 120 @Override
119   - public ListenableFuture<TbMsg> executeUpdateAsync(TbMsg msg) {
  121 + public ListenableFuture<List<TbMsg>> executeUpdateAsync(TbMsg msg) {
120 122 ListenableFuture<JsonNode> result = executeScriptAsync(msg);
121 123 return Futures.transformAsync(result, json -> {
122   - if (!json.isObject()) {
  124 + if (json.isObject()) {
  125 + return Futures.immediateFuture(Collections.singletonList(unbindMsg(json, msg)));
  126 + } else if (json.isArray()){
  127 + List<TbMsg> res = new ArrayList<>(json.size());
  128 + json.forEach(jsonObject -> res.add(unbindMsg(jsonObject, msg)));
  129 + return Futures.immediateFuture(res);
  130 + }
  131 + else{
123 132 log.warn("Wrong result type: {}", json.getNodeType());
124 133 return Futures.immediateFailedFuture(new ScriptException("Wrong result type: " + json.getNodeType()));
125   - } else {
126   - return Futures.immediateFuture(unbindMsg(json, msg));
127 134 }
128 135 }, MoreExecutors.directExecutor());
129 136 }
... ...
... ... @@ -20,13 +20,14 @@ import com.google.common.util.concurrent.ListenableFuture;
20 20 import org.thingsboard.server.common.msg.TbMsg;
21 21
22 22 import javax.script.ScriptException;
  23 +import java.util.List;
23 24 import java.util.Set;
24 25
25 26 public interface ScriptEngine {
26 27
27 28 TbMsg executeUpdate(TbMsg msg) throws ScriptException;
28 29
29   - ListenableFuture<TbMsg> executeUpdateAsync(TbMsg msg);
  30 + ListenableFuture<List<TbMsg>> executeUpdateAsync(TbMsg msg);
30 31
31 32 TbMsg executeGenerate(TbMsg prevMsg) throws ScriptException;
32 33
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.transform;
  17 +
  18 +import org.thingsboard.server.common.msg.queue.RuleEngineException;
  19 +import org.thingsboard.server.common.msg.queue.TbMsgCallback;
  20 +
  21 +import java.util.concurrent.atomic.AtomicInteger;
  22 +
  23 +public class MultipleTbMsgsCallbackWrapper implements TbMsgCallbackWrapper {
  24 +
  25 + private final AtomicInteger tbMsgsCallbackCount;
  26 + private final TbMsgCallback callback;
  27 +
  28 + public MultipleTbMsgsCallbackWrapper(int tbMsgsCallbackCount, TbMsgCallback callback) {
  29 + this.tbMsgsCallbackCount = new AtomicInteger(tbMsgsCallbackCount);
  30 + this.callback = callback;
  31 + }
  32 +
  33 + @Override
  34 + public void onSuccess() {
  35 + if (tbMsgsCallbackCount.decrementAndGet() <= 0) {
  36 + callback.onSuccess();
  37 + }
  38 + }
  39 +
  40 + @Override
  41 + public void onFailure(Throwable t) {
  42 + callback.onFailure(new RuleEngineException(t.getMessage()));
  43 + }
  44 +}
  45 +
... ...
... ... @@ -17,16 +17,19 @@ package org.thingsboard.rule.engine.transform;
17 17
18 18 import com.google.common.util.concurrent.ListenableFuture;
19 19 import lombok.extern.slf4j.Slf4j;
20   -import org.thingsboard.rule.engine.api.util.TbNodeUtils;
21 20 import org.thingsboard.rule.engine.api.TbContext;
22 21 import org.thingsboard.rule.engine.api.TbNode;
23 22 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
24 23 import org.thingsboard.rule.engine.api.TbNodeException;
  24 +import org.thingsboard.rule.engine.api.util.TbNodeUtils;
25 25 import org.thingsboard.server.common.msg.TbMsg;
  26 +import org.thingsboard.server.common.msg.queue.RuleEngineException;
  27 +import org.thingsboard.server.common.msg.queue.TbMsgCallback;
  28 +
  29 +import java.util.List;
26 30
27 31 import static org.thingsboard.common.util.DonAsynchron.withCallback;
28 32 import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
29   -import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
30 33
31 34 /**
32 35 * Created by ashvayka on 19.01.18.
... ... @@ -61,7 +64,30 @@ public abstract class TbAbstractTransformNode implements TbNode {
61 64 }
62 65 }
63 66
64   - protected abstract ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg);
  67 + protected void transformSuccess(TbContext ctx, TbMsg msg, List<TbMsg> msgs) {
  68 + if (msgs != null && !msgs.isEmpty()) {
  69 + if (msgs.size() == 1) {
  70 + ctx.tellSuccess(msgs.get(0));
  71 + } else {
  72 + TbMsgCallbackWrapper wrapper = new MultipleTbMsgsCallbackWrapper(msgs.size(), new TbMsgCallback() {
  73 + @Override
  74 + public void onSuccess() {
  75 + ctx.ack(msg);
  76 + }
  77 +
  78 + @Override
  79 + public void onFailure(RuleEngineException e) {
  80 + ctx.tellFailure(msg, e);
  81 + }
  82 + });
  83 + msgs.forEach(newMsg -> ctx.enqueueForTellNext(newMsg, "Success", wrapper::onSuccess, wrapper::onFailure));
  84 + }
  85 + } else {
  86 + ctx.tellNext(msg, FAILURE);
  87 + }
  88 + }
  89 +
  90 + protected abstract ListenableFuture<List<TbMsg>> transform(TbContext ctx, TbMsg msg);
65 91
66 92 public void setConfig(TbTransformNodeConfiguration config) {
67 93 this.config = config;
... ...
... ... @@ -15,16 +15,15 @@
15 15 */
16 16 package org.thingsboard.rule.engine.transform;
17 17
18   -import com.google.common.base.Function;
19 18 import com.google.common.collect.Sets;
20 19 import com.google.common.util.concurrent.Futures;
21 20 import com.google.common.util.concurrent.ListenableFuture;
22 21 import lombok.extern.slf4j.Slf4j;
23   -import org.thingsboard.rule.engine.api.util.TbNodeUtils;
24 22 import org.thingsboard.rule.engine.api.RuleNode;
25 23 import org.thingsboard.rule.engine.api.TbContext;
26 24 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
27 25 import org.thingsboard.rule.engine.api.TbNodeException;
  26 +import org.thingsboard.rule.engine.api.util.TbNodeUtils;
28 27 import org.thingsboard.rule.engine.util.EntitiesAlarmOriginatorIdAsyncLoader;
29 28 import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader;
30 29 import org.thingsboard.rule.engine.util.EntitiesRelatedEntityIdAsyncLoader;
... ... @@ -33,7 +32,9 @@ import org.thingsboard.server.common.data.id.EntityId;
33 32 import org.thingsboard.server.common.data.plugin.ComponentType;
34 33 import org.thingsboard.server.common.msg.TbMsg;
35 34
  35 +import java.util.Collections;
36 36 import java.util.HashSet;
  37 +import java.util.List;
37 38
38 39 @Slf4j
39 40 @RuleNode(
... ... @@ -65,13 +66,13 @@ public class TbChangeOriginatorNode extends TbAbstractTransformNode {
65 66 }
66 67
67 68 @Override
68   - protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) {
  69 + protected ListenableFuture<List<TbMsg>> transform(TbContext ctx, TbMsg msg) {
69 70 ListenableFuture<? extends EntityId> newOriginator = getNewOriginator(ctx, msg.getOriginator());
70   - return Futures.transform(newOriginator, (Function<EntityId, TbMsg>) n -> {
  71 + return Futures.transform(newOriginator, n -> {
71 72 if (n == null || n.isNullUid()) {
72 73 return null;
73 74 }
74   - return ctx.transformMsg(msg, msg.getType(), n, msg.getMetaData(), msg.getData());
  75 + return Collections.singletonList((ctx.transformMsg(msg, msg.getType(), n, msg.getMetaData(), msg.getData())));
75 76 }, ctx.getDbCallbackExecutor());
76 77 }
77 78
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.transform;
  17 +
  18 +public interface TbMsgCallbackWrapper {
  19 +
  20 + void onSuccess();
  21 +
  22 + void onFailure(Throwable t);
  23 +}
... ...
... ... @@ -16,13 +16,16 @@
16 16 package org.thingsboard.rule.engine.transform;
17 17
18 18 import com.google.common.util.concurrent.ListenableFuture;
  19 +import org.thingsboard.rule.engine.api.RuleNode;
  20 +import org.thingsboard.rule.engine.api.ScriptEngine;
  21 +import org.thingsboard.rule.engine.api.TbContext;
  22 +import org.thingsboard.rule.engine.api.TbNodeConfiguration;
  23 +import org.thingsboard.rule.engine.api.TbNodeException;
19 24 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
20   -import org.thingsboard.rule.engine.api.*;
21 25 import org.thingsboard.server.common.data.plugin.ComponentType;
22 26 import org.thingsboard.server.common.msg.TbMsg;
23 27
24   -import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
25   -import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
  28 +import java.util.List;
26 29
27 30 @RuleNode(
28 31 type = ComponentType.TRANSFORMATION,
... ... @@ -51,7 +54,7 @@ public class TbTransformMsgNode extends TbAbstractTransformNode {
51 54 }
52 55
53 56 @Override
54   - protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) {
  57 + protected ListenableFuture<List<TbMsg>> transform(TbContext ctx, TbMsg msg) {
55 58 ctx.logJsEvalRequest();
56 59 return jsEngine.executeUpdateAsync(msg);
57 60 }
... ...
... ... @@ -38,6 +38,7 @@ import org.thingsboard.server.common.msg.TbMsgDataType;
38 38 import org.thingsboard.server.common.msg.TbMsgMetaData;
39 39
40 40 import javax.script.ScriptException;
  41 +import java.util.Collections;
41 42 import java.util.concurrent.Callable;
42 43
43 44 import static org.junit.Assert.assertEquals;
... ... @@ -72,7 +73,7 @@ public class TbTransformMsgNodeTest {
72 73 TbMsg msg = TbMsg.newMsg( "USER", null, metaData, TbMsgDataType.JSON,rawJson, ruleChainId, ruleNodeId);
73 74 TbMsg transformedMsg = TbMsg.newMsg( "USER", null, metaData, TbMsgDataType.JSON, "{new}", ruleChainId, ruleNodeId);
74 75 mockJsExecutor();
75   - when(scriptEngine.executeUpdateAsync(msg)).thenReturn(Futures.immediateFuture(transformedMsg));
  76 + when(scriptEngine.executeUpdateAsync(msg)).thenReturn(Futures.immediateFuture(Collections.singletonList(transformedMsg)));
76 77
77 78 node.onMsg(ctx, msg);
78 79 verify(ctx).getDbCallbackExecutor();
... ...