Showing
9 changed files
with
721 additions
and
0 deletions
@@ -24,6 +24,7 @@ import org.thingsboard.rule.engine.api.ScriptEngine; | @@ -24,6 +24,7 @@ import org.thingsboard.rule.engine.api.ScriptEngine; | ||
24 | import org.thingsboard.rule.engine.api.TbContext; | 24 | import org.thingsboard.rule.engine.api.TbContext; |
25 | import org.thingsboard.server.actors.ActorSystemContext; | 25 | import org.thingsboard.server.actors.ActorSystemContext; |
26 | import org.thingsboard.server.common.data.id.RuleNodeId; | 26 | import org.thingsboard.server.common.data.id.RuleNodeId; |
27 | +import org.thingsboard.server.common.data.id.TenantId; | ||
27 | import org.thingsboard.server.common.data.rule.RuleNode; | 28 | import org.thingsboard.server.common.data.rule.RuleNode; |
28 | import org.thingsboard.server.common.msg.TbMsg; | 29 | import org.thingsboard.server.common.msg.TbMsg; |
29 | import org.thingsboard.server.common.msg.cluster.ServerAddress; | 30 | import org.thingsboard.server.common.msg.cluster.ServerAddress; |
@@ -120,6 +121,11 @@ class DefaultTbContext implements TbContext { | @@ -120,6 +121,11 @@ class DefaultTbContext implements TbContext { | ||
120 | } | 121 | } |
121 | 122 | ||
122 | @Override | 123 | @Override |
124 | + public TenantId getTenantId() { | ||
125 | + return nodeCtx.getTenantId(); | ||
126 | + } | ||
127 | + | ||
128 | + @Override | ||
123 | public void tellNext(TbMsg msg, Set<String> relationTypes) { | 129 | public void tellNext(TbMsg msg, Set<String> relationTypes) { |
124 | relationTypes.forEach(type -> tellNext(msg, type)); | 130 | relationTypes.forEach(type -> tellNext(msg, type)); |
125 | } | 131 | } |
@@ -146,6 +146,17 @@ public class NashornJsEngine implements org.thingsboard.rule.engine.api.ScriptEn | @@ -146,6 +146,17 @@ public class NashornJsEngine implements org.thingsboard.rule.engine.api.ScriptEn | ||
146 | } | 146 | } |
147 | 147 | ||
148 | @Override | 148 | @Override |
149 | + public JsonNode executeJson(TbMsg msg) throws ScriptException { | ||
150 | + return executeScript(msg); | ||
151 | + } | ||
152 | + | ||
153 | + @Override | ||
154 | + public String executeToString(TbMsg msg) throws ScriptException { | ||
155 | + JsonNode result = executeScript(msg); | ||
156 | + return result.asText(); | ||
157 | + } | ||
158 | + | ||
159 | + @Override | ||
149 | public boolean executeFilter(TbMsg msg) throws ScriptException { | 160 | public boolean executeFilter(TbMsg msg) throws ScriptException { |
150 | JsonNode result = executeScript(msg); | 161 | JsonNode result = executeScript(msg); |
151 | if (!result.isBoolean()) { | 162 | if (!result.isBoolean()) { |
@@ -15,6 +15,7 @@ | @@ -15,6 +15,7 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.rule.engine.api; | 16 | package org.thingsboard.rule.engine.api; |
17 | 17 | ||
18 | +import com.fasterxml.jackson.databind.JsonNode; | ||
18 | import org.thingsboard.server.common.msg.TbMsg; | 19 | import org.thingsboard.server.common.msg.TbMsg; |
19 | 20 | ||
20 | import javax.script.ScriptException; | 21 | import javax.script.ScriptException; |
@@ -30,6 +31,10 @@ public interface ScriptEngine { | @@ -30,6 +31,10 @@ public interface ScriptEngine { | ||
30 | 31 | ||
31 | Set<String> executeSwitch(TbMsg msg) throws ScriptException; | 32 | Set<String> executeSwitch(TbMsg msg) throws ScriptException; |
32 | 33 | ||
34 | + JsonNode executeJson(TbMsg msg) throws ScriptException; | ||
35 | + | ||
36 | + String executeToString(TbMsg msg) throws ScriptException; | ||
37 | + | ||
33 | void destroy(); | 38 | void destroy(); |
34 | 39 | ||
35 | } | 40 | } |
@@ -16,6 +16,7 @@ | @@ -16,6 +16,7 @@ | ||
16 | package org.thingsboard.rule.engine.api; | 16 | package org.thingsboard.rule.engine.api; |
17 | 17 | ||
18 | import org.thingsboard.server.common.data.id.RuleNodeId; | 18 | import org.thingsboard.server.common.data.id.RuleNodeId; |
19 | +import org.thingsboard.server.common.data.id.TenantId; | ||
19 | import org.thingsboard.server.common.data.rule.RuleNode; | 20 | import org.thingsboard.server.common.data.rule.RuleNode; |
20 | import org.thingsboard.server.common.msg.TbMsg; | 21 | import org.thingsboard.server.common.msg.TbMsg; |
21 | import org.thingsboard.server.common.msg.cluster.ServerAddress; | 22 | import org.thingsboard.server.common.msg.cluster.ServerAddress; |
@@ -59,6 +60,8 @@ public interface TbContext { | @@ -59,6 +60,8 @@ public interface TbContext { | ||
59 | 60 | ||
60 | RuleNodeId getSelfId(); | 61 | RuleNodeId getSelfId(); |
61 | 62 | ||
63 | + TenantId getTenantId(); | ||
64 | + | ||
62 | AttributesService getAttributesService(); | 65 | AttributesService getAttributesService(); |
63 | 66 | ||
64 | CustomerService getCustomerService(); | 67 | CustomerService getCustomerService(); |
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNode.java
0 → 100644
1 | +/** | ||
2 | + * Copyright © 2016-2018 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.rule.engine.action; | ||
17 | + | ||
18 | +import com.datastax.driver.core.utils.UUIDs; | ||
19 | +import com.fasterxml.jackson.databind.JsonNode; | ||
20 | +import com.fasterxml.jackson.databind.ObjectMapper; | ||
21 | +import com.google.common.base.Function; | ||
22 | +import com.google.common.util.concurrent.AsyncFunction; | ||
23 | +import com.google.common.util.concurrent.Futures; | ||
24 | +import com.google.common.util.concurrent.ListenableFuture; | ||
25 | +import lombok.extern.slf4j.Slf4j; | ||
26 | +import org.thingsboard.rule.engine.TbNodeUtils; | ||
27 | +import org.thingsboard.rule.engine.api.*; | ||
28 | +import org.thingsboard.server.common.data.alarm.Alarm; | ||
29 | +import org.thingsboard.server.common.data.alarm.AlarmStatus; | ||
30 | +import org.thingsboard.server.common.data.id.TenantId; | ||
31 | +import org.thingsboard.server.common.data.plugin.ComponentType; | ||
32 | +import org.thingsboard.server.common.msg.TbMsg; | ||
33 | +import org.thingsboard.server.common.msg.TbMsgMetaData; | ||
34 | + | ||
35 | +import static org.thingsboard.rule.engine.DonAsynchron.withCallback; | ||
36 | + | ||
37 | +@Slf4j | ||
38 | +@RuleNode( | ||
39 | + type = ComponentType.ACTION, | ||
40 | + name = "alarm", relationTypes = {"Created", "Updated", "Cleared", "False"}, | ||
41 | + configClazz = TbAlarmNodeConfiguration.class, | ||
42 | + nodeDescription = "Create/Update/Clear Alarm", | ||
43 | + nodeDetails = "isAlarm - JS function that verifies if Alarm should be CREATED for incoming message.\n" + | ||
44 | + "isCleared - JS function that verifies if Alarm should be CLEARED for incoming message.\n" + | ||
45 | + "Details - JS function that creates JSON object based on incoming message. This object will be added into Alarm.details field.\n" + | ||
46 | + "Node output:\n" + | ||
47 | + "If alarm was not created, original message is returned. Otherwise new Message returned with type 'ALARM', Alarm object in 'msg' property and 'matadata' will contains one of those properties 'isNewAlarm/isExistingAlarm/isClearedAlarm' " + | ||
48 | + "Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>" + | ||
49 | + "Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>", | ||
50 | + uiResources = {"static/rulenode/rulenode-core-config.js"}) | ||
51 | + | ||
52 | +public class TbAlarmNode implements TbNode { | ||
53 | + | ||
54 | + static final String IS_NEW_ALARM = "isNewAlarm"; | ||
55 | + static final String IS_EXISTING_ALARM = "isExistingAlarm"; | ||
56 | + static final String IS_CLEARED_ALARM = "isClearedAlarm"; | ||
57 | + | ||
58 | + private final ObjectMapper mapper = new ObjectMapper(); | ||
59 | + | ||
60 | + private TbAlarmNodeConfiguration config; | ||
61 | + private ScriptEngine createJsEngine; | ||
62 | + private ScriptEngine clearJsEngine; | ||
63 | + private ScriptEngine buildDetailsJsEngine; | ||
64 | + | ||
65 | + @Override | ||
66 | + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { | ||
67 | + this.config = TbNodeUtils.convert(configuration, TbAlarmNodeConfiguration.class); | ||
68 | + this.createJsEngine = ctx.createJsScriptEngine(config.getCreateConditionJs(), "isAlarm"); | ||
69 | + this.clearJsEngine = ctx.createJsScriptEngine(config.getClearConditionJs(), "isCleared"); | ||
70 | + this.buildDetailsJsEngine = ctx.createJsScriptEngine(config.getAlarmDetailsBuildJs(), "Details"); | ||
71 | + } | ||
72 | + | ||
73 | + @Override | ||
74 | + public void onMsg(TbContext ctx, TbMsg msg) { | ||
75 | + ListeningExecutor jsExecutor = ctx.getJsExecutor(); | ||
76 | + | ||
77 | + ListenableFuture<Boolean> shouldCreate = jsExecutor.executeAsync(() -> createJsEngine.executeFilter(msg)); | ||
78 | + ListenableFuture<AlarmResult> transform = Futures.transform(shouldCreate, (AsyncFunction<Boolean, AlarmResult>) create -> { | ||
79 | + if (create) { | ||
80 | + return createOrUpdate(ctx, msg); | ||
81 | + } else { | ||
82 | + return checkForClearIfExist(ctx, msg); | ||
83 | + } | ||
84 | + }); | ||
85 | + | ||
86 | + withCallback(transform, | ||
87 | + alarmResult -> { | ||
88 | + if (alarmResult.alarm == null) { | ||
89 | + ctx.tellNext(msg, "False"); | ||
90 | + } else if (alarmResult.isCreated) { | ||
91 | + ctx.tellNext(toAlarmMsg(alarmResult, msg), "Created"); | ||
92 | + } else if (alarmResult.isUpdated) { | ||
93 | + ctx.tellNext(toAlarmMsg(alarmResult, msg), "Updated"); | ||
94 | + } else if (alarmResult.isCleared) { | ||
95 | + ctx.tellNext(toAlarmMsg(alarmResult, msg), "Cleared"); | ||
96 | + } | ||
97 | + }, | ||
98 | + t -> ctx.tellError(msg, t)); | ||
99 | + | ||
100 | + } | ||
101 | + | ||
102 | + private ListenableFuture<AlarmResult> createOrUpdate(TbContext ctx, TbMsg msg) { | ||
103 | + ListenableFuture<Alarm> latest = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), config.getAlarmType()); | ||
104 | + return Futures.transform(latest, (AsyncFunction<Alarm, AlarmResult>) a -> { | ||
105 | + if (a == null || a.getStatus().isCleared()) { | ||
106 | + return createNewAlarm(ctx, msg); | ||
107 | + } else { | ||
108 | + return updateAlarm(ctx, msg, a); | ||
109 | + } | ||
110 | + }); | ||
111 | + } | ||
112 | + | ||
113 | + private ListenableFuture<AlarmResult> checkForClearIfExist(TbContext ctx, TbMsg msg) { | ||
114 | + ListenableFuture<Alarm> latest = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), config.getAlarmType()); | ||
115 | + return Futures.transform(latest, (AsyncFunction<Alarm, AlarmResult>) a -> { | ||
116 | + if (a != null && !a.getStatus().isCleared()) { | ||
117 | + return clearAlarm(ctx, msg, a); | ||
118 | + } | ||
119 | + return Futures.immediateFuture(new AlarmResult(false, false, false, null)); | ||
120 | + }); | ||
121 | + } | ||
122 | + | ||
123 | + private ListenableFuture<AlarmResult> createNewAlarm(TbContext ctx, TbMsg msg) { | ||
124 | + ListenableFuture<Alarm> asyncAlarm = Futures.transform(buildAlarmDetails(ctx, msg), (Function<JsonNode, Alarm>) details -> buildAlarm(msg, details, ctx.getTenantId())); | ||
125 | + ListenableFuture<Alarm> asyncCreated = Futures.transform(asyncAlarm, (Function<Alarm, Alarm>) alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm)); | ||
126 | + return Futures.transform(asyncCreated, (Function<Alarm, AlarmResult>) alarm -> new AlarmResult(true, false, false, alarm)); | ||
127 | + } | ||
128 | + | ||
129 | + private ListenableFuture<AlarmResult> updateAlarm(TbContext ctx, TbMsg msg, Alarm alarm) { | ||
130 | + ListenableFuture<Alarm> asyncUpdated = Futures.transform(buildAlarmDetails(ctx, msg), (Function<JsonNode, Alarm>) details -> { | ||
131 | + alarm.setSeverity(config.getSeverity()); | ||
132 | + alarm.setPropagate(config.isPropagate()); | ||
133 | + alarm.setDetails(details); | ||
134 | + alarm.setEndTs(System.currentTimeMillis()); | ||
135 | + return ctx.getAlarmService().createOrUpdateAlarm(alarm); | ||
136 | + }); | ||
137 | + | ||
138 | + return Futures.transform(asyncUpdated, (Function<Alarm, AlarmResult>) a -> new AlarmResult(false, true, false, a)); | ||
139 | + } | ||
140 | + | ||
141 | + private ListenableFuture<AlarmResult> clearAlarm(TbContext ctx, TbMsg msg, Alarm alarm) { | ||
142 | + ListenableFuture<Boolean> shouldClear = ctx.getJsExecutor().executeAsync(() -> clearJsEngine.executeFilter(msg)); | ||
143 | + return Futures.transform(shouldClear, (AsyncFunction<Boolean, AlarmResult>) clear -> { | ||
144 | + if (clear) { | ||
145 | + ListenableFuture<Boolean> clearFuture = ctx.getAlarmService().clearAlarm(alarm.getId(), System.currentTimeMillis()); | ||
146 | + return Futures.transform(clearFuture, (Function<Boolean, AlarmResult>) cleared -> { | ||
147 | + alarm.setStatus(alarm.getStatus().isAck() ? AlarmStatus.CLEARED_ACK : AlarmStatus.CLEARED_UNACK); | ||
148 | + return new AlarmResult(false, false, true, alarm); | ||
149 | + }); | ||
150 | + } | ||
151 | + return Futures.immediateFuture(new AlarmResult(false, false, false, null)); | ||
152 | + }); | ||
153 | + } | ||
154 | + | ||
155 | + private Alarm buildAlarm(TbMsg msg, JsonNode details, TenantId tenantId) { | ||
156 | + return Alarm.builder() | ||
157 | + .tenantId(tenantId) | ||
158 | + .originator(msg.getOriginator()) | ||
159 | + .status(AlarmStatus.ACTIVE_UNACK) | ||
160 | + .severity(config.getSeverity()) | ||
161 | + .propagate(config.isPropagate()) | ||
162 | + .type(config.getAlarmType()) | ||
163 | + //todo-vp: alarm date should be taken from Message or current Time should be used? | ||
164 | +// .startTs(System.currentTimeMillis()) | ||
165 | +// .endTs(System.currentTimeMillis()) | ||
166 | + .details(details) | ||
167 | + .build(); | ||
168 | + } | ||
169 | + | ||
170 | + private ListenableFuture<JsonNode> buildAlarmDetails(TbContext ctx, TbMsg msg) { | ||
171 | + return ctx.getJsExecutor().executeAsync(() -> buildDetailsJsEngine.executeJson(msg)); | ||
172 | + } | ||
173 | + | ||
174 | + private TbMsg toAlarmMsg(AlarmResult alarmResult, TbMsg originalMsg) { | ||
175 | + JsonNode jsonNodes = mapper.valueToTree(alarmResult.alarm); | ||
176 | + String data = jsonNodes.toString(); | ||
177 | + TbMsgMetaData metaData = originalMsg.getMetaData().copy(); | ||
178 | + if (alarmResult.isCreated) { | ||
179 | + metaData.putValue(IS_NEW_ALARM, Boolean.TRUE.toString()); | ||
180 | + } else if (alarmResult.isUpdated) { | ||
181 | + metaData.putValue(IS_EXISTING_ALARM, Boolean.TRUE.toString()); | ||
182 | + } else if (alarmResult.isCleared) { | ||
183 | + metaData.putValue(IS_CLEARED_ALARM, Boolean.TRUE.toString()); | ||
184 | + } | ||
185 | + return new TbMsg(UUIDs.timeBased(), "ALARM", originalMsg.getOriginator(), metaData, data); | ||
186 | + } | ||
187 | + | ||
188 | + | ||
189 | + @Override | ||
190 | + public void destroy() { | ||
191 | + if (createJsEngine != null) { | ||
192 | + createJsEngine.destroy(); | ||
193 | + } | ||
194 | + if (clearJsEngine != null) { | ||
195 | + clearJsEngine.destroy(); | ||
196 | + } | ||
197 | + if (buildDetailsJsEngine != null) { | ||
198 | + buildDetailsJsEngine.destroy(); | ||
199 | + } | ||
200 | + } | ||
201 | + | ||
202 | + private static class AlarmResult { | ||
203 | + boolean isCreated; | ||
204 | + boolean isUpdated; | ||
205 | + boolean isCleared; | ||
206 | + Alarm alarm; | ||
207 | + | ||
208 | + AlarmResult(boolean isCreated, boolean isUpdated, boolean isCleared, Alarm alarm) { | ||
209 | + this.isCreated = isCreated; | ||
210 | + this.isUpdated = isUpdated; | ||
211 | + this.isCleared = isCleared; | ||
212 | + this.alarm = alarm; | ||
213 | + } | ||
214 | + } | ||
215 | +} |
1 | +/** | ||
2 | + * Copyright © 2016-2018 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.rule.engine.action; | ||
17 | + | ||
18 | +import lombok.Data; | ||
19 | +import org.thingsboard.rule.engine.api.NodeConfiguration; | ||
20 | +import org.thingsboard.server.common.data.alarm.AlarmSeverity; | ||
21 | + | ||
22 | +@Data | ||
23 | +public class TbAlarmNodeConfiguration implements NodeConfiguration { | ||
24 | + | ||
25 | + private String createConditionJs; | ||
26 | + private String clearConditionJs; | ||
27 | + private String alarmDetailsBuildJs; | ||
28 | + private String alarmType; | ||
29 | + private AlarmSeverity severity; | ||
30 | + private boolean propagate; | ||
31 | + | ||
32 | + | ||
33 | + @Override | ||
34 | + public TbAlarmNodeConfiguration defaultConfiguration() { | ||
35 | + TbAlarmNodeConfiguration configuration = new TbAlarmNodeConfiguration(); | ||
36 | + configuration.setCreateConditionJs("return 'incoming message = ' + msg + meta;"); | ||
37 | + configuration.setClearConditionJs("return 'incoming message = ' + msg + meta;"); | ||
38 | + configuration.setAlarmDetailsBuildJs("return 'incoming message = ' + msg + meta;"); | ||
39 | + configuration.setAlarmType("General Alarm"); | ||
40 | + configuration.setSeverity(AlarmSeverity.CRITICAL); | ||
41 | + configuration.setPropagate(false); | ||
42 | + return configuration; | ||
43 | + } | ||
44 | +} |
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNode.java
0 → 100644
1 | +/** | ||
2 | + * Copyright © 2016-2018 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.rule.engine.action; | ||
17 | + | ||
18 | +import lombok.extern.slf4j.Slf4j; | ||
19 | +import org.thingsboard.rule.engine.TbNodeUtils; | ||
20 | +import org.thingsboard.rule.engine.api.*; | ||
21 | +import org.thingsboard.server.common.data.plugin.ComponentType; | ||
22 | +import org.thingsboard.server.common.msg.TbMsg; | ||
23 | + | ||
24 | +import static org.thingsboard.rule.engine.DonAsynchron.withCallback; | ||
25 | + | ||
26 | +@Slf4j | ||
27 | +@RuleNode( | ||
28 | + type = ComponentType.ACTION, | ||
29 | + name = "log", | ||
30 | + configClazz = TbLogNodeConfiguration.class, | ||
31 | + nodeDescription = "Log incoming messages using JS script for transformation Message into String", | ||
32 | + nodeDetails = "Transform incoming Message with configured JS condition to String and log final value. " + | ||
33 | + "Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>" + | ||
34 | + "Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>") | ||
35 | + | ||
36 | +public class TbLogNode implements TbNode { | ||
37 | + | ||
38 | + private TbLogNodeConfiguration config; | ||
39 | + private ScriptEngine jsEngine; | ||
40 | + | ||
41 | + @Override | ||
42 | + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { | ||
43 | + this.config = TbNodeUtils.convert(configuration, TbLogNodeConfiguration.class); | ||
44 | + this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "ToString"); | ||
45 | + } | ||
46 | + | ||
47 | + @Override | ||
48 | + public void onMsg(TbContext ctx, TbMsg msg) { | ||
49 | + ListeningExecutor jsExecutor = ctx.getJsExecutor(); | ||
50 | + withCallback(jsExecutor.executeAsync(() -> jsEngine.executeToString(msg)), | ||
51 | + toString -> { | ||
52 | + log.info(toString); | ||
53 | + ctx.tellNext(msg); | ||
54 | + }, | ||
55 | + t -> ctx.tellError(msg, t)); | ||
56 | + } | ||
57 | + | ||
58 | + @Override | ||
59 | + public void destroy() { | ||
60 | + if (jsEngine != null) { | ||
61 | + jsEngine.destroy(); | ||
62 | + } | ||
63 | + } | ||
64 | +} |
1 | +/** | ||
2 | + * Copyright © 2016-2018 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.rule.engine.action; | ||
17 | + | ||
18 | +import lombok.Data; | ||
19 | +import org.thingsboard.rule.engine.api.NodeConfiguration; | ||
20 | + | ||
21 | +@Data | ||
22 | +public class TbLogNodeConfiguration implements NodeConfiguration { | ||
23 | + | ||
24 | + private String jsScript; | ||
25 | + | ||
26 | + @Override | ||
27 | + public TbLogNodeConfiguration defaultConfiguration() { | ||
28 | + TbLogNodeConfiguration configuration = new TbLogNodeConfiguration(); | ||
29 | + configuration.setJsScript("return 'incoming message = ' + msg + meta;"); | ||
30 | + return configuration; | ||
31 | + } | ||
32 | +} |
1 | +/** | ||
2 | + * Copyright © 2016-2018 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.rule.engine.action; | ||
17 | + | ||
18 | +import com.datastax.driver.core.utils.UUIDs; | ||
19 | +import com.fasterxml.jackson.databind.ObjectMapper; | ||
20 | +import com.google.common.util.concurrent.Futures; | ||
21 | +import com.google.common.util.concurrent.ListenableFuture; | ||
22 | +import org.apache.commons.lang3.NotImplementedException; | ||
23 | +import org.junit.Test; | ||
24 | +import org.junit.runner.RunWith; | ||
25 | +import org.mockito.ArgumentCaptor; | ||
26 | +import org.mockito.Mock; | ||
27 | +import org.mockito.runners.MockitoJUnitRunner; | ||
28 | +import org.mockito.stubbing.Answer; | ||
29 | +import org.thingsboard.rule.engine.api.*; | ||
30 | +import org.thingsboard.server.common.data.alarm.Alarm; | ||
31 | +import org.thingsboard.server.common.data.id.DeviceId; | ||
32 | +import org.thingsboard.server.common.data.id.EntityId; | ||
33 | +import org.thingsboard.server.common.data.id.TenantId; | ||
34 | +import org.thingsboard.server.common.msg.TbMsg; | ||
35 | +import org.thingsboard.server.common.msg.TbMsgMetaData; | ||
36 | +import org.thingsboard.server.dao.alarm.AlarmService; | ||
37 | + | ||
38 | +import javax.script.ScriptException; | ||
39 | +import java.io.IOException; | ||
40 | +import java.util.concurrent.Callable; | ||
41 | + | ||
42 | +import static org.junit.Assert.*; | ||
43 | +import static org.mockito.Matchers.any; | ||
44 | +import static org.mockito.Mockito.*; | ||
45 | +import static org.thingsboard.rule.engine.action.TbAlarmNode.*; | ||
46 | +import static org.thingsboard.server.common.data.alarm.AlarmSeverity.CRITICAL; | ||
47 | +import static org.thingsboard.server.common.data.alarm.AlarmSeverity.WARNING; | ||
48 | +import static org.thingsboard.server.common.data.alarm.AlarmStatus.*; | ||
49 | + | ||
50 | +@RunWith(MockitoJUnitRunner.class) | ||
51 | +public class TbAlarmNodeTest { | ||
52 | + | ||
53 | + private TbAlarmNode node; | ||
54 | + | ||
55 | + @Mock | ||
56 | + private TbContext ctx; | ||
57 | + @Mock | ||
58 | + private ListeningExecutor executor; | ||
59 | + @Mock | ||
60 | + private AlarmService alarmService; | ||
61 | + | ||
62 | + @Mock | ||
63 | + private ScriptEngine createJs; | ||
64 | + @Mock | ||
65 | + private ScriptEngine clearJs; | ||
66 | + @Mock | ||
67 | + private ScriptEngine detailsJs; | ||
68 | + | ||
69 | + private EntityId originator = new DeviceId(UUIDs.timeBased()); | ||
70 | + private TenantId tenantId = new TenantId(UUIDs.timeBased()); | ||
71 | + private TbMsgMetaData metaData = new TbMsgMetaData(); | ||
72 | + private String rawJson = "{\"name\": \"Vit\", \"passed\": 5}"; | ||
73 | + | ||
74 | + @Test | ||
75 | + public void newAlarmCanBeCreated() throws ScriptException, IOException { | ||
76 | + initWithScript(); | ||
77 | + metaData.putValue("key", "value"); | ||
78 | + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson); | ||
79 | + | ||
80 | + when(createJs.executeFilter(msg)).thenReturn(true); | ||
81 | + when(detailsJs.executeJson(msg)).thenReturn(null); | ||
82 | + when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(null)); | ||
83 | + | ||
84 | + doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(any(Alarm.class)); | ||
85 | + | ||
86 | + node.onMsg(ctx, msg); | ||
87 | + | ||
88 | + ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class); | ||
89 | + verify(ctx).tellNext(captor.capture(), eq("Created")); | ||
90 | + TbMsg actualMsg = captor.getValue(); | ||
91 | + | ||
92 | + assertEquals("ALARM", actualMsg.getType()); | ||
93 | + assertEquals(originator, actualMsg.getOriginator()); | ||
94 | + assertEquals("value", actualMsg.getMetaData().getValue("key")); | ||
95 | + assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_NEW_ALARM)); | ||
96 | + assertNotSame(metaData, actualMsg.getMetaData()); | ||
97 | + | ||
98 | + Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class); | ||
99 | + Alarm expectedAlarm = Alarm.builder() | ||
100 | + .tenantId(tenantId) | ||
101 | + .originator(originator) | ||
102 | + .status(ACTIVE_UNACK) | ||
103 | + .severity(CRITICAL) | ||
104 | + .propagate(true) | ||
105 | + .type("SomeType") | ||
106 | + .details(null) | ||
107 | + .build(); | ||
108 | + | ||
109 | + assertEquals(expectedAlarm, actualAlarm); | ||
110 | + | ||
111 | + verify(executor, times(2)).executeAsync(any(Callable.class)); | ||
112 | + } | ||
113 | + | ||
114 | + @Test | ||
115 | + public void shouldCreateScriptThrowsException() throws ScriptException { | ||
116 | + initWithScript(); | ||
117 | + metaData.putValue("key", "value"); | ||
118 | + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson); | ||
119 | + | ||
120 | + when(createJs.executeFilter(msg)).thenThrow(new NotImplementedException("message")); | ||
121 | + | ||
122 | + node.onMsg(ctx, msg); | ||
123 | + | ||
124 | + verifyError(msg, "message", NotImplementedException.class); | ||
125 | + | ||
126 | + | ||
127 | + verify(ctx).createJsScriptEngine("CREATE", "isAlarm"); | ||
128 | + verify(ctx).createJsScriptEngine("CLEAR", "isCleared"); | ||
129 | + verify(ctx).createJsScriptEngine("DETAILS", "Details"); | ||
130 | + verify(ctx).getJsExecutor(); | ||
131 | + | ||
132 | + verifyNoMoreInteractions(ctx, alarmService, clearJs, detailsJs); | ||
133 | + } | ||
134 | + | ||
135 | + @Test | ||
136 | + public void buildDetailsThrowsException() throws ScriptException, IOException { | ||
137 | + initWithScript(); | ||
138 | + metaData.putValue("key", "value"); | ||
139 | + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson); | ||
140 | + | ||
141 | + when(createJs.executeFilter(msg)).thenReturn(true); | ||
142 | + when(detailsJs.executeJson(msg)).thenThrow(new NotImplementedException("message")); | ||
143 | + when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(null)); | ||
144 | + | ||
145 | + node.onMsg(ctx, msg); | ||
146 | + | ||
147 | + verifyError(msg, "message", NotImplementedException.class); | ||
148 | + | ||
149 | + verify(ctx).createJsScriptEngine("CREATE", "isAlarm"); | ||
150 | + verify(ctx).createJsScriptEngine("CLEAR", "isCleared"); | ||
151 | + verify(ctx).createJsScriptEngine("DETAILS", "Details"); | ||
152 | + verify(ctx, times(2)).getJsExecutor(); | ||
153 | + verify(ctx).getAlarmService(); | ||
154 | + verify(ctx).getTenantId(); | ||
155 | + verify(alarmService).findLatestByOriginatorAndType(tenantId, originator, "SomeType"); | ||
156 | + | ||
157 | + verifyNoMoreInteractions(ctx, alarmService, clearJs); | ||
158 | + } | ||
159 | + | ||
160 | + @Test | ||
161 | + public void ifAlarmClearedCreateNew() throws ScriptException, IOException { | ||
162 | + initWithScript(); | ||
163 | + metaData.putValue("key", "value"); | ||
164 | + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson); | ||
165 | + | ||
166 | + Alarm clearedAlarm = Alarm.builder().status(CLEARED_ACK).build(); | ||
167 | + | ||
168 | + when(createJs.executeFilter(msg)).thenReturn(true); | ||
169 | + when(detailsJs.executeJson(msg)).thenReturn(null); | ||
170 | + when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(clearedAlarm)); | ||
171 | + | ||
172 | + doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(any(Alarm.class)); | ||
173 | + | ||
174 | + node.onMsg(ctx, msg); | ||
175 | + | ||
176 | + ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class); | ||
177 | + verify(ctx).tellNext(captor.capture(), eq("Created")); | ||
178 | + TbMsg actualMsg = captor.getValue(); | ||
179 | + | ||
180 | + assertEquals("ALARM", actualMsg.getType()); | ||
181 | + assertEquals(originator, actualMsg.getOriginator()); | ||
182 | + assertEquals("value", actualMsg.getMetaData().getValue("key")); | ||
183 | + assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_NEW_ALARM)); | ||
184 | + assertNotSame(metaData, actualMsg.getMetaData()); | ||
185 | + | ||
186 | + Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class); | ||
187 | + Alarm expectedAlarm = Alarm.builder() | ||
188 | + .tenantId(tenantId) | ||
189 | + .originator(originator) | ||
190 | + .status(ACTIVE_UNACK) | ||
191 | + .severity(CRITICAL) | ||
192 | + .propagate(true) | ||
193 | + .type("SomeType") | ||
194 | + .details(null) | ||
195 | + .build(); | ||
196 | + | ||
197 | + assertEquals(expectedAlarm, actualAlarm); | ||
198 | + | ||
199 | + verify(executor, times(2)).executeAsync(any(Callable.class)); | ||
200 | + } | ||
201 | + | ||
202 | + @Test | ||
203 | + public void alarmCanBeUpdated() throws ScriptException, IOException { | ||
204 | + initWithScript(); | ||
205 | + metaData.putValue("key", "value"); | ||
206 | + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson); | ||
207 | + | ||
208 | + long oldEndDate = System.currentTimeMillis(); | ||
209 | + Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build(); | ||
210 | + | ||
211 | + when(createJs.executeFilter(msg)).thenReturn(true); | ||
212 | + when(clearJs.executeFilter(msg)).thenReturn(false); | ||
213 | + when(detailsJs.executeJson(msg)).thenReturn(null); | ||
214 | + when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(activeAlarm)); | ||
215 | + | ||
216 | + doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(activeAlarm); | ||
217 | + | ||
218 | + node.onMsg(ctx, msg); | ||
219 | + | ||
220 | + ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class); | ||
221 | + verify(ctx).tellNext(captor.capture(), eq("Updated")); | ||
222 | + TbMsg actualMsg = captor.getValue(); | ||
223 | + | ||
224 | + assertEquals("ALARM", actualMsg.getType()); | ||
225 | + assertEquals(originator, actualMsg.getOriginator()); | ||
226 | + assertEquals("value", actualMsg.getMetaData().getValue("key")); | ||
227 | + assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_EXISTING_ALARM)); | ||
228 | + assertNotSame(metaData, actualMsg.getMetaData()); | ||
229 | + | ||
230 | + Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class); | ||
231 | + assertTrue(activeAlarm.getEndTs() > oldEndDate); | ||
232 | + Alarm expectedAlarm = Alarm.builder() | ||
233 | + .tenantId(tenantId) | ||
234 | + .originator(originator) | ||
235 | + .status(ACTIVE_UNACK) | ||
236 | + .severity(CRITICAL) | ||
237 | + .propagate(true) | ||
238 | + .type("SomeType") | ||
239 | + .details(null) | ||
240 | + .endTs(activeAlarm.getEndTs()) | ||
241 | + .build(); | ||
242 | + | ||
243 | + assertEquals(expectedAlarm, actualAlarm); | ||
244 | + | ||
245 | + verify(executor, times(2)).executeAsync(any(Callable.class)); | ||
246 | + } | ||
247 | + | ||
248 | + @Test | ||
249 | + public void alarmCanBeCleared() throws ScriptException, IOException { | ||
250 | + initWithScript(); | ||
251 | + metaData.putValue("key", "value"); | ||
252 | + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson); | ||
253 | + | ||
254 | + long oldEndDate = System.currentTimeMillis(); | ||
255 | + Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build(); | ||
256 | + | ||
257 | + when(createJs.executeFilter(msg)).thenReturn(false); | ||
258 | + when(clearJs.executeFilter(msg)).thenReturn(true); | ||
259 | +// when(detailsJs.executeJson(msg)).thenReturn(null); | ||
260 | + when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(activeAlarm)); | ||
261 | + when(alarmService.clearAlarm(eq(activeAlarm.getId()), anyLong())).thenReturn(Futures.immediateFuture(true)); | ||
262 | +// doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(activeAlarm); | ||
263 | + | ||
264 | + node.onMsg(ctx, msg); | ||
265 | + | ||
266 | + ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class); | ||
267 | + verify(ctx).tellNext(captor.capture(), eq("Cleared")); | ||
268 | + TbMsg actualMsg = captor.getValue(); | ||
269 | + | ||
270 | + assertEquals("ALARM", actualMsg.getType()); | ||
271 | + assertEquals(originator, actualMsg.getOriginator()); | ||
272 | + assertEquals("value", actualMsg.getMetaData().getValue("key")); | ||
273 | + assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_CLEARED_ALARM)); | ||
274 | + assertNotSame(metaData, actualMsg.getMetaData()); | ||
275 | + | ||
276 | + Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class); | ||
277 | + Alarm expectedAlarm = Alarm.builder() | ||
278 | + .tenantId(tenantId) | ||
279 | + .originator(originator) | ||
280 | + .status(CLEARED_UNACK) | ||
281 | + .severity(WARNING) | ||
282 | + .propagate(false) | ||
283 | + .type("SomeType") | ||
284 | + .details(null) | ||
285 | + .endTs(oldEndDate) | ||
286 | + .build(); | ||
287 | + | ||
288 | + assertEquals(expectedAlarm, actualAlarm); | ||
289 | + } | ||
290 | + | ||
291 | + private void initWithScript() { | ||
292 | + try { | ||
293 | + TbAlarmNodeConfiguration config = new TbAlarmNodeConfiguration(); | ||
294 | + config.setPropagate(true); | ||
295 | + config.setSeverity(CRITICAL); | ||
296 | + config.setAlarmType("SomeType"); | ||
297 | + config.setCreateConditionJs("CREATE"); | ||
298 | + config.setClearConditionJs("CLEAR"); | ||
299 | + config.setAlarmDetailsBuildJs("DETAILS"); | ||
300 | + ObjectMapper mapper = new ObjectMapper(); | ||
301 | + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); | ||
302 | + | ||
303 | + when(ctx.createJsScriptEngine("CREATE", "isAlarm")).thenReturn(createJs); | ||
304 | + when(ctx.createJsScriptEngine("CLEAR", "isCleared")).thenReturn(clearJs); | ||
305 | + when(ctx.createJsScriptEngine("DETAILS", "Details")).thenReturn(detailsJs); | ||
306 | + | ||
307 | + when(ctx.getTenantId()).thenReturn(tenantId); | ||
308 | + when(ctx.getJsExecutor()).thenReturn(executor); | ||
309 | + when(ctx.getAlarmService()).thenReturn(alarmService); | ||
310 | + | ||
311 | + mockJsExecutor(); | ||
312 | + | ||
313 | + node = new TbAlarmNode(); | ||
314 | + node.init(ctx, nodeConfiguration); | ||
315 | + } catch (TbNodeException ex) { | ||
316 | + throw new IllegalStateException(ex); | ||
317 | + } | ||
318 | + } | ||
319 | + | ||
320 | + private void mockJsExecutor() { | ||
321 | + when(ctx.getJsExecutor()).thenReturn(executor); | ||
322 | + doAnswer((Answer<ListenableFuture<Boolean>>) invocationOnMock -> { | ||
323 | + try { | ||
324 | + Callable task = (Callable) (invocationOnMock.getArguments())[0]; | ||
325 | + return Futures.immediateFuture((Boolean) task.call()); | ||
326 | + } catch (Throwable th) { | ||
327 | + return Futures.immediateFailedFuture(th); | ||
328 | + } | ||
329 | + }).when(executor).executeAsync(any(Callable.class)); | ||
330 | + } | ||
331 | + | ||
332 | + private void verifyError(TbMsg msg, String message, Class expectedClass) { | ||
333 | + ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class); | ||
334 | + verify(ctx).tellError(same(msg), captor.capture()); | ||
335 | + | ||
336 | + Throwable value = captor.getValue(); | ||
337 | + assertEquals(expectedClass, value.getClass()); | ||
338 | + assertEquals(message, value.getMessage()); | ||
339 | + } | ||
340 | + | ||
341 | +} |