Commit 4d012ac62d01766ee455e98d6651cffb94e700de

Authored by Andrii Shvaika
1 parent 856cc372

DeviceProfile rule node draft

Showing 26 changed files with 1215 additions and 79 deletions
... ... @@ -32,6 +32,7 @@ import org.springframework.data.redis.core.RedisTemplate;
32 32 import org.springframework.scheduling.annotation.Scheduled;
33 33 import org.springframework.stereotype.Component;
34 34 import org.thingsboard.rule.engine.api.MailService;
  35 +import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
35 36 import org.thingsboard.server.actors.service.ActorService;
36 37 import org.thingsboard.server.actors.tenant.DebugTbRateLimits;
37 38 import org.thingsboard.server.common.data.DataConstants;
... ... @@ -69,6 +70,7 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService;
69 70 import org.thingsboard.server.service.executors.ExternalCallExecutorService;
70 71 import org.thingsboard.server.service.executors.SharedEventLoopGroupService;
71 72 import org.thingsboard.server.service.mail.MailExecutorService;
  73 +import org.thingsboard.server.service.profile.TbDeviceProfileCache;
72 74 import org.thingsboard.server.service.queue.TbClusterService;
73 75 import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
74 76 import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
... ... @@ -126,6 +128,10 @@ public class ActorSystemContext {
126 128
127 129 @Autowired
128 130 @Getter
  131 + private TbDeviceProfileCache deviceProfileCache;
  132 +
  133 + @Autowired
  134 + @Getter
129 135 private AssetService assetService;
130 136
131 137 @Autowired
... ... @@ -530,4 +536,5 @@ public class ActorSystemContext {
530 536 log.debug("Scheduling msg {} with delay {} ms", msg, delayInMs);
531 537 getScheduler().schedule(() -> ctx.tell(msg), delayInMs, TimeUnit.MILLISECONDS);
532 538 }
  539 +
533 540 }
... ...
... ... @@ -23,6 +23,7 @@ import org.springframework.data.redis.core.RedisTemplate;
23 23 import org.thingsboard.common.util.ListeningExecutor;
24 24 import org.thingsboard.rule.engine.api.MailService;
25 25 import org.thingsboard.rule.engine.api.RuleEngineAlarmService;
  26 +import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
26 27 import org.thingsboard.rule.engine.api.RuleEngineRpcService;
27 28 import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
28 29 import org.thingsboard.rule.engine.api.ScriptEngine;
... ... @@ -391,6 +392,11 @@ class DefaultTbContext implements TbContext {
391 392 }
392 393
393 394 @Override
  395 + public RuleEngineDeviceProfileCache getDeviceProfileCache() {
  396 + return mainCtx.getDeviceProfileCache();
  397 + }
  398 +
  399 + @Override
394 400 public EventLoopGroup getSharedEventLoop() {
395 401 return mainCtx.getSharedEventLoopGroupService().getSharedEventLoopGroup();
396 402 }
... ...
... ... @@ -15,16 +15,12 @@
15 15 */
16 16 package org.thingsboard.server.service.profile;
17 17
  18 +import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
18 19 import org.thingsboard.server.common.data.DeviceProfile;
19 20 import org.thingsboard.server.common.data.id.DeviceId;
20 21 import org.thingsboard.server.common.data.id.DeviceProfileId;
21   -import org.thingsboard.server.common.data.id.TenantId;
22 22
23   -public interface TbDeviceProfileCache {
24   -
25   - DeviceProfile get(TenantId tenantId, DeviceProfileId deviceProfileId);
26   -
27   - DeviceProfile get(TenantId tenantId, DeviceId deviceId);
  23 +public interface TbDeviceProfileCache extends RuleEngineDeviceProfileCache {
28 24
29 25 void put(DeviceProfile profile);
30 26
... ...
... ... @@ -159,13 +159,10 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
159 159 @PreDestroy
160 160 public void destroy() {
161 161 stopped = true;
162   -
163 162 stopMainConsumers();
164   -
165 163 if (nfConsumer != null) {
166 164 nfConsumer.unsubscribe();
167 165 }
168   -
169 166 if (consumersExecutor != null) {
170 167 consumersExecutor.shutdownNow();
171 168 }
... ...
... ... @@ -28,6 +28,10 @@ public class DataConstants {
28 28 public static final String SERVER_SCOPE = "SERVER_SCOPE";
29 29 public static final String SHARED_SCOPE = "SHARED_SCOPE";
30 30 public static final String LATEST_TS = "LATEST_TS";
  31 + public static final String IS_NEW_ALARM = "isNewAlarm";
  32 + public static final String IS_EXISTING_ALARM = "isExistingAlarm";
  33 + public static final String IS_SEVERITY_UPDATED_ALARM = "isSeverityUpdated";
  34 + public static final String IS_CLEARED_ALARM = "isClearedAlarm";
31 35
32 36 public static final String[] allScopes() {
33 37 return new String[]{CLIENT_SCOPE, SHARED_SCOPE, SERVER_SCOPE};
... ...
... ... @@ -26,7 +26,6 @@ import org.apache.commons.lang3.math.NumberUtils;
26 26 import org.springframework.util.StringUtils;
27 27 import org.thingsboard.server.common.data.DataConstants;
28 28 import org.thingsboard.server.common.data.id.DeviceId;
29   -import org.thingsboard.server.common.data.kv.AttributeKey;
30 29 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
31 30 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
32 31 import org.thingsboard.server.common.data.kv.BooleanDataEntry;
... ... @@ -53,6 +52,7 @@ import java.util.List;
53 52 import java.util.Map;
54 53 import java.util.Map.Entry;
55 54 import java.util.Set;
  55 +import java.util.TreeMap;
56 56 import java.util.function.Consumer;
57 57 import java.util.stream.Collectors;
58 58
... ... @@ -454,11 +454,20 @@ public class JsonConverter {
454 454 }
455 455
456 456 public static Map<Long, List<KvEntry>> convertToTelemetry(JsonElement jsonElement, long systemTs) throws JsonSyntaxException {
457   - Map<Long, List<KvEntry>> result = new HashMap<>();
  457 + return convertToTelemetry(jsonElement, systemTs, false);
  458 + }
  459 +
  460 + public static Map<Long, List<KvEntry>> convertToSortedTelemetry(JsonElement jsonElement, long systemTs) throws JsonSyntaxException {
  461 + return convertToTelemetry(jsonElement, systemTs, true);
  462 + }
  463 +
  464 + public static Map<Long, List<KvEntry>> convertToTelemetry(JsonElement jsonElement, long systemTs, boolean sorted) throws JsonSyntaxException {
  465 + Map<Long, List<KvEntry>> result = sorted ? new TreeMap<>() : new HashMap<>();
458 466 convertToTelemetry(jsonElement, systemTs, result, null);
459 467 return result;
460 468 }
461 469
  470 +
462 471 private static void parseObject(Map<Long, List<KvEntry>> result, long systemTs, JsonObject jo) {
463 472 if (jo.has("ts") && jo.has("values")) {
464 473 parseWithTs(result, jo);
... ...
... ... @@ -54,24 +54,24 @@ public class EntityKeyMapping {
54 54 private static final Map<String, String> entityFieldColumnMap = new HashMap<>();
55 55 private static final Map<EntityType, Map<String, String>> aliases = new HashMap<>();
56 56
57   - private static final String CREATED_TIME = "createdTime";
58   - private static final String ENTITY_TYPE = "entityType";
59   - private static final String NAME = "name";
60   - private static final String TYPE = "type";
61   - private static final String LABEL = "label";
62   - private static final String FIRST_NAME = "firstName";
63   - private static final String LAST_NAME = "lastName";
64   - private static final String EMAIL = "email";
65   - private static final String TITLE = "title";
66   - private static final String REGION = "region";
67   - private static final String COUNTRY = "country";
68   - private static final String STATE = "state";
69   - private static final String CITY = "city";
70   - private static final String ADDRESS = "address";
71   - private static final String ADDRESS_2 = "address2";
72   - private static final String ZIP = "zip";
73   - private static final String PHONE = "phone";
74   - private static final String ADDITIONAL_INFO = "additionalInfo";
  57 + public static final String CREATED_TIME = "createdTime";
  58 + public static final String ENTITY_TYPE = "entityType";
  59 + public static final String NAME = "name";
  60 + public static final String TYPE = "type";
  61 + public static final String LABEL = "label";
  62 + public static final String FIRST_NAME = "firstName";
  63 + public static final String LAST_NAME = "lastName";
  64 + public static final String EMAIL = "email";
  65 + public static final String TITLE = "title";
  66 + public static final String REGION = "region";
  67 + public static final String COUNTRY = "country";
  68 + public static final String STATE = "state";
  69 + public static final String CITY = "city";
  70 + public static final String ADDRESS = "address";
  71 + public static final String ADDRESS_2 = "address2";
  72 + public static final String ZIP = "zip";
  73 + public static final String PHONE = "phone";
  74 + public static final String ADDITIONAL_INFO = "additionalInfo";
75 75
76 76 public static final List<String> typedEntityFields = Arrays.asList(CREATED_TIME, ENTITY_TYPE, NAME, TYPE, ADDITIONAL_INFO);
77 77 public static final List<String> widgetEntityFields = Arrays.asList(CREATED_TIME, ENTITY_TYPE, NAME);
... ...
... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.util.mapping;
18 18 import com.fasterxml.jackson.core.JsonProcessingException;
19 19 import com.fasterxml.jackson.databind.JsonNode;
20 20 import com.fasterxml.jackson.databind.ObjectMapper;
  21 +import org.thingsboard.server.common.data.alarm.Alarm;
21 22
22 23 import java.io.IOException;
23 24
... ... @@ -69,4 +70,8 @@ public class JacksonUtil {
69 70 public static <T> T clone(T value) {
70 71 return fromString(toString(value), (Class<T>) value.getClass());
71 72 }
  73 +
  74 + public static JsonNode valueToTree(Alarm alarm) {
  75 + return OBJECT_MAPPER.valueToTree(alarm);
  76 + }
72 77 }
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.api;
  17 +
  18 +import org.thingsboard.server.common.data.DeviceProfile;
  19 +import org.thingsboard.server.common.data.id.DeviceId;
  20 +import org.thingsboard.server.common.data.id.DeviceProfileId;
  21 +import org.thingsboard.server.common.data.id.TenantId;
  22 +
  23 +/**
  24 + * Created by ashvayka on 02.04.18.
  25 + */
  26 +public interface RuleEngineDeviceProfileCache {
  27 +
  28 + DeviceProfile get(TenantId tenantId, DeviceProfileId deviceProfileId);
  29 +
  30 + DeviceProfile get(TenantId tenantId, DeviceId deviceId);
  31 +
  32 +}
... ...
... ... @@ -183,6 +183,8 @@ public interface TbContext {
183 183
184 184 EntityViewService getEntityViewService();
185 185
  186 + RuleEngineDeviceProfileCache getDeviceProfileCache();
  187 +
186 188 ListeningExecutor getJsExecutor();
187 189
188 190 ListeningExecutor getMailExecutor();
... ...
... ... @@ -25,9 +25,10 @@ import org.thingsboard.rule.engine.api.TbContext;
25 25 import org.thingsboard.rule.engine.api.TbNode;
26 26 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
27 27 import org.thingsboard.rule.engine.api.TbNodeException;
28   -import org.thingsboard.server.common.data.alarm.Alarm;
  28 +import org.thingsboard.server.common.data.DataConstants;
29 29 import org.thingsboard.server.common.msg.TbMsg;
30 30 import org.thingsboard.server.common.msg.TbMsgMetaData;
  31 +import org.thingsboard.server.dao.util.mapping.JacksonUtil;
31 32
32 33 import static org.thingsboard.common.util.DonAsynchron.withCallback;
33 34
... ... @@ -37,10 +38,6 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura
37 38
38 39 static final String PREV_ALARM_DETAILS = "prevAlarmDetails";
39 40
40   - static final String IS_NEW_ALARM = "isNewAlarm";
41   - static final String IS_EXISTING_ALARM = "isExistingAlarm";
42   - static final String IS_CLEARED_ALARM = "isClearedAlarm";
43   -
44 41 private final ObjectMapper mapper = new ObjectMapper();
45 42
46 43 protected C config;
... ... @@ -75,7 +72,7 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura
75 72 t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
76 73 }
77 74
78   - protected abstract ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg);
  75 + protected abstract ListenableFuture<TbAlarmResult> processAlarm(TbContext ctx, TbMsg msg);
79 76
80 77 protected ListenableFuture<JsonNode> buildAlarmDetails(TbContext ctx, TbMsg msg, JsonNode previousDetails) {
81 78 try {
... ... @@ -91,21 +88,20 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura
91 88 }
92 89 }
93 90
94   - private TbMsg toAlarmMsg(TbContext ctx, AlarmResult alarmResult, TbMsg originalMsg) {
95   - JsonNode jsonNodes = mapper.valueToTree(alarmResult.alarm);
  91 + public static TbMsg toAlarmMsg(TbContext ctx, TbAlarmResult alarmResult, TbMsg originalMsg) {
  92 + JsonNode jsonNodes = JacksonUtil.valueToTree(alarmResult.alarm);
96 93 String data = jsonNodes.toString();
97 94 TbMsgMetaData metaData = originalMsg.getMetaData().copy();
98 95 if (alarmResult.isCreated) {
99   - metaData.putValue(IS_NEW_ALARM, Boolean.TRUE.toString());
  96 + metaData.putValue(DataConstants.IS_NEW_ALARM, Boolean.TRUE.toString());
100 97 } else if (alarmResult.isUpdated) {
101   - metaData.putValue(IS_EXISTING_ALARM, Boolean.TRUE.toString());
  98 + metaData.putValue(DataConstants.IS_EXISTING_ALARM, Boolean.TRUE.toString());
102 99 } else if (alarmResult.isCleared) {
103   - metaData.putValue(IS_CLEARED_ALARM, Boolean.TRUE.toString());
  100 + metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString());
104 101 }
105 102 return ctx.transformMsg(originalMsg, "ALARM", originalMsg.getOriginator(), metaData, data);
106 103 }
107 104
108   -
109 105 @Override
110 106 public void destroy() {
111 107 if (buildDetailsJsEngine != null) {
... ... @@ -113,17 +109,4 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura
113 109 }
114 110 }
115 111
116   - protected static class AlarmResult {
117   - boolean isCreated;
118   - boolean isUpdated;
119   - boolean isCleared;
120   - Alarm alarm;
121   -
122   - AlarmResult(boolean isCreated, boolean isUpdated, boolean isCleared, Alarm alarm) {
123   - this.isCreated = isCreated;
124   - this.isUpdated = isUpdated;
125   - this.isCleared = isCleared;
126   - this.alarm = alarm;
127   - }
128   - }
129 112 }
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.action;
  17 +
  18 +import lombok.AllArgsConstructor;
  19 +import lombok.Data;
  20 +import org.thingsboard.server.common.data.alarm.Alarm;
  21 +
  22 +@Data
  23 +@AllArgsConstructor
  24 +public class TbAlarmResult {
  25 + boolean isCreated;
  26 + boolean isUpdated;
  27 + boolean isSeverityUpdated;
  28 + boolean isCleared;
  29 + Alarm alarm;
  30 +
  31 + public TbAlarmResult(boolean isCreated, boolean isUpdated, boolean isCleared, Alarm alarm) {
  32 + this.isCreated = isCreated;
  33 + this.isUpdated = isUpdated;
  34 + this.isCleared = isCleared;
  35 + this.alarm = alarm;
  36 + }
  37 +}
... ...
... ... @@ -55,7 +55,7 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
55 55 }
56 56
57 57 @Override
58   - protected ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
  58 + protected ListenableFuture<TbAlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
59 59 String alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg.getMetaData());
60 60 ListenableFuture<Alarm> alarmFuture;
61 61 if (msg.getOriginator().getEntityType().equals(EntityType.ALARM)) {
... ... @@ -67,11 +67,11 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
67 67 if (a != null && !a.getStatus().isCleared()) {
68 68 return clearAlarm(ctx, msg, a);
69 69 }
70   - return Futures.immediateFuture(new AlarmResult(false, false, false, null));
  70 + return Futures.immediateFuture(new TbAlarmResult(false, false, false, null));
71 71 }, ctx.getDbCallbackExecutor());
72 72 }
73 73
74   - private ListenableFuture<AlarmResult> clearAlarm(TbContext ctx, TbMsg msg, Alarm alarm) {
  74 + private ListenableFuture<TbAlarmResult> clearAlarm(TbContext ctx, TbMsg msg, Alarm alarm) {
75 75 ctx.logJsEvalRequest();
76 76 ListenableFuture<JsonNode> asyncDetails = buildAlarmDetails(ctx, msg, alarm.getDetails());
77 77 return Futures.transformAsync(asyncDetails, details -> {
... ... @@ -86,7 +86,7 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
86 86 alarm.setClearTs(savedAlarm.getClearTs());
87 87 }
88 88 alarm.setStatus(alarm.getStatus().isAck() ? AlarmStatus.CLEARED_ACK : AlarmStatus.CLEARED_UNACK);
89   - return Futures.immediateFuture(new AlarmResult(false, false, true, alarm));
  89 + return Futures.immediateFuture(new TbAlarmResult(false, false, true, alarm));
90 90 }, ctx.getDbCallbackExecutor());
91 91 }, ctx.getDbCallbackExecutor());
92 92 }, ctx.getDbCallbackExecutor());
... ...
... ... @@ -65,7 +65,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
65 65 }
66 66
67 67 @Override
68   - protected ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
  68 + protected ListenableFuture<TbAlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
69 69 String alarmType;
70 70 final Alarm msgAlarm;
71 71
... ... @@ -106,7 +106,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
106 106 return msgAlarm;
107 107 }
108 108
109   - private ListenableFuture<AlarmResult> createNewAlarm(TbContext ctx, TbMsg msg, Alarm msgAlarm) {
  109 + private ListenableFuture<TbAlarmResult> createNewAlarm(TbContext ctx, TbMsg msg, Alarm msgAlarm) {
110 110 ListenableFuture<Alarm> asyncAlarm;
111 111 if (msgAlarm != null) {
112 112 asyncAlarm = Futures.immediateFuture(msgAlarm);
... ... @@ -120,10 +120,10 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
120 120 }
121 121 ListenableFuture<Alarm> asyncCreated = Futures.transform(asyncAlarm,
122 122 alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm), ctx.getDbCallbackExecutor());
123   - return Futures.transform(asyncCreated, alarm -> new AlarmResult(true, false, false, alarm), MoreExecutors.directExecutor());
  123 + return Futures.transform(asyncCreated, alarm -> new TbAlarmResult(true, false, false, alarm), MoreExecutors.directExecutor());
124 124 }
125 125
126   - private ListenableFuture<AlarmResult> updateAlarm(TbContext ctx, TbMsg msg, Alarm existingAlarm, Alarm msgAlarm) {
  126 + private ListenableFuture<TbAlarmResult> updateAlarm(TbContext ctx, TbMsg msg, Alarm existingAlarm, Alarm msgAlarm) {
127 127 ctx.logJsEvalRequest();
128 128 ListenableFuture<Alarm> asyncUpdated = Futures.transform(buildAlarmDetails(ctx, msg, existingAlarm.getDetails()), (Function<JsonNode, Alarm>) details -> {
129 129 ctx.logJsEvalResponse();
... ... @@ -141,7 +141,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
141 141 return ctx.getAlarmService().createOrUpdateAlarm(existingAlarm);
142 142 }, ctx.getDbCallbackExecutor());
143 143
144   - return Futures.transform(asyncUpdated, a -> new AlarmResult(false, true, false, a), MoreExecutors.directExecutor());
  144 + return Futures.transform(asyncUpdated, a -> new TbAlarmResult(false, true, false, a), MoreExecutors.directExecutor());
145 145 }
146 146
147 147 private Alarm buildAlarm(TbMsg msg, JsonNode details, TenantId tenantId) {
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.profile;
  17 +
  18 +public enum AlarmStateUpdateResult {
  19 +
  20 + NONE, CREATED, UPDATED, SEVERITY_UPDATED, CLEARED;
  21 +
  22 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.profile;
  17 +
  18 +import lombok.Getter;
  19 +import lombok.Setter;
  20 +import org.thingsboard.server.common.data.query.EntityKey;
  21 +
  22 +import java.util.Map;
  23 +import java.util.Set;
  24 +import java.util.concurrent.ConcurrentHashMap;
  25 +
  26 +public class DeviceDataSnapshot {
  27 +
  28 + private volatile boolean ready;
  29 + @Getter @Setter
  30 + private long ts;
  31 + private final Map<EntityKey, EntityKeyValue> values = new ConcurrentHashMap<>();
  32 +
  33 + public DeviceDataSnapshot(Set<EntityKey> entityKeySet) {
  34 + entityKeySet.forEach(key -> values.put(key, new EntityKeyValue()));
  35 + this.ready = false;
  36 + }
  37 +
  38 + void putValue(EntityKey key, EntityKeyValue value) {
  39 + values.put(key, value);
  40 + }
  41 +
  42 + EntityKeyValue getValue(EntityKey key) {
  43 + return values.get(key);
  44 + }
  45 +
  46 + boolean isReady() {
  47 + return ready;
  48 + }
  49 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.profile;
  17 +
  18 +import com.fasterxml.jackson.databind.JsonNode;
  19 +import lombok.Data;
  20 +import org.thingsboard.rule.engine.action.TbAlarmResult;
  21 +import org.thingsboard.rule.engine.api.TbContext;
  22 +import org.thingsboard.server.common.data.DataConstants;
  23 +import org.thingsboard.server.common.data.alarm.Alarm;
  24 +import org.thingsboard.server.common.data.alarm.AlarmSeverity;
  25 +import org.thingsboard.server.common.data.device.profile.AlarmCondition;
  26 +import org.thingsboard.server.common.data.device.profile.AlarmRule;
  27 +import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
  28 +import org.thingsboard.server.common.data.id.EntityId;
  29 +import org.thingsboard.server.common.data.query.BooleanFilterPredicate;
  30 +import org.thingsboard.server.common.data.query.ComplexFilterPredicate;
  31 +import org.thingsboard.server.common.data.query.KeyFilter;
  32 +import org.thingsboard.server.common.data.query.KeyFilterPredicate;
  33 +import org.thingsboard.server.common.data.query.NumericFilterPredicate;
  34 +import org.thingsboard.server.common.data.query.StringFilterPredicate;
  35 +import org.thingsboard.server.common.msg.TbMsg;
  36 +import org.thingsboard.server.common.msg.TbMsgMetaData;
  37 +import org.thingsboard.server.dao.util.mapping.JacksonUtil;
  38 +
  39 +import java.util.Comparator;
  40 +import java.util.Map;
  41 +import java.util.TreeMap;
  42 +
  43 +@Data
  44 +class DeviceProfileAlarmState {
  45 +
  46 + private final EntityId originator;
  47 + private final DeviceProfileAlarm alarmDefinition;
  48 + private volatile Map<AlarmSeverity, AlarmRule> createRulesSortedBySeverityDesc;
  49 + private volatile Alarm currentAlarm;
  50 +
  51 + public DeviceProfileAlarmState(EntityId originator, DeviceProfileAlarm alarmDefinition) {
  52 + this.originator = originator;
  53 + this.alarmDefinition = alarmDefinition;
  54 + this.createRulesSortedBySeverityDesc = new TreeMap<>(Comparator.comparingInt(AlarmSeverity::ordinal));
  55 + this.createRulesSortedBySeverityDesc.putAll(alarmDefinition.getCreateRules());
  56 + }
  57 +
  58 + public void process(TbContext ctx, TbMsg msg, DeviceDataSnapshot data) {
  59 + AlarmSeverity resultSeverity = null;
  60 + for (Map.Entry<AlarmSeverity, AlarmRule> kv : createRulesSortedBySeverityDesc.entrySet()) {
  61 + AlarmRule alarmRule = kv.getValue();
  62 + if (eval(alarmRule.getCondition(), data)) {
  63 + resultSeverity = kv.getKey();
  64 + break;
  65 + }
  66 + }
  67 + if (resultSeverity != null) {
  68 + pushMsg(ctx, calculateAlarmResult(ctx, resultSeverity), msg);
  69 + } else if (currentAlarm != null) {
  70 + AlarmRule clearRule = alarmDefinition.getClearRule();
  71 + if (eval(clearRule.getCondition(), data)) {
  72 + pushMsg(ctx, new TbAlarmResult(false, false, true, currentAlarm), msg);
  73 + currentAlarm = null;
  74 + }
  75 + }
  76 + }
  77 +
  78 + public void pushMsg(TbContext ctx, TbAlarmResult alarmResult, TbMsg originalMsg) {
  79 + JsonNode jsonNodes = JacksonUtil.valueToTree(alarmResult.getAlarm());
  80 + String data = jsonNodes.toString();
  81 + TbMsgMetaData metaData = originalMsg.getMetaData().copy();
  82 + String relationType;
  83 + if (alarmResult.isCreated()) {
  84 + relationType = "Alarm Created";
  85 + metaData.putValue(DataConstants.IS_NEW_ALARM, Boolean.TRUE.toString());
  86 + } else if (alarmResult.isUpdated()) {
  87 + relationType = "Alarm Updated";
  88 + metaData.putValue(DataConstants.IS_EXISTING_ALARM, Boolean.TRUE.toString());
  89 + } else if (alarmResult.isSeverityUpdated()) {
  90 + relationType = "Alarm Severity Updated";
  91 + metaData.putValue(DataConstants.IS_EXISTING_ALARM, Boolean.TRUE.toString());
  92 + metaData.putValue(DataConstants.IS_SEVERITY_UPDATED_ALARM, Boolean.TRUE.toString());
  93 + } else {
  94 + relationType = "Alarm Cleared";
  95 + metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString());
  96 + }
  97 + TbMsg newMsg = ctx.newMsg(originalMsg.getQueueName(), "ALARM", originalMsg.getOriginator(), metaData, data);
  98 + ctx.tellNext(newMsg, relationType);
  99 + }
  100 +
  101 + private TbAlarmResult calculateAlarmResult(TbContext ctx, AlarmSeverity severity) {
  102 + if (currentAlarm != null) {
  103 + currentAlarm.setEndTs(System.currentTimeMillis());
  104 + AlarmSeverity oldSeverity = currentAlarm.getSeverity();
  105 + if (!oldSeverity.equals(severity)) {
  106 + currentAlarm.setSeverity(severity);
  107 + currentAlarm = ctx.getAlarmService().createOrUpdateAlarm(currentAlarm);
  108 + return new TbAlarmResult(false, false, true, false, currentAlarm);
  109 + } else {
  110 + currentAlarm = ctx.getAlarmService().createOrUpdateAlarm(currentAlarm);
  111 + return new TbAlarmResult(false, true, false, false, currentAlarm);
  112 + }
  113 + } else {
  114 + currentAlarm = new Alarm();
  115 + currentAlarm.setSeverity(severity);
  116 + currentAlarm.setStartTs(System.currentTimeMillis());
  117 + currentAlarm.setEndTs(currentAlarm.getStartTs());
  118 + currentAlarm.setDetails(JacksonUtil.OBJECT_MAPPER.createObjectNode());
  119 + currentAlarm.setOriginator(originator);
  120 + currentAlarm.setTenantId(ctx.getTenantId());
  121 + currentAlarm.setPropagate(alarmDefinition.isPropagate());
  122 + if (alarmDefinition.getPropagateRelationTypes() != null) {
  123 + currentAlarm.setPropagateRelationTypes(alarmDefinition.getPropagateRelationTypes());
  124 + }
  125 + currentAlarm = ctx.getAlarmService().createOrUpdateAlarm(currentAlarm);
  126 + boolean updated = currentAlarm.getStartTs() != currentAlarm.getEndTs();
  127 + return new TbAlarmResult(!updated, updated, false, false, currentAlarm);
  128 + }
  129 + }
  130 +
  131 + private boolean eval(AlarmCondition condition, DeviceDataSnapshot data) {
  132 + boolean eval = true;
  133 + for (KeyFilter keyFilter : condition.getCondition()) {
  134 + EntityKeyValue value = data.getValue(keyFilter.getKey());
  135 + if (value == null) {
  136 + return false;
  137 + }
  138 + eval = eval && eval(value, keyFilter.getPredicate());
  139 + }
  140 + //TODO: use condition duration;
  141 + return eval;
  142 + }
  143 +
  144 + private boolean eval(EntityKeyValue value, KeyFilterPredicate predicate) {
  145 + switch (predicate.getType()) {
  146 + case STRING:
  147 + return evalStrPredicate(value, (StringFilterPredicate) predicate);
  148 + case NUMERIC:
  149 + return evalNumPredicate(value, (NumericFilterPredicate) predicate);
  150 + case COMPLEX:
  151 + return evalComplexPredicate(value, (ComplexFilterPredicate) predicate);
  152 + case BOOLEAN:
  153 + return evalBoolPredicate(value, (BooleanFilterPredicate) predicate);
  154 + default:
  155 + return false;
  156 + }
  157 + }
  158 +
  159 + private boolean evalComplexPredicate(EntityKeyValue ekv, ComplexFilterPredicate predicate) {
  160 + switch (predicate.getOperation()) {
  161 + case OR:
  162 + for (KeyFilterPredicate kfp : predicate.getPredicates()) {
  163 + if (eval(ekv, kfp)) {
  164 + return true;
  165 + }
  166 + }
  167 + return false;
  168 + case AND:
  169 + for (KeyFilterPredicate kfp : predicate.getPredicates()) {
  170 + if (!eval(ekv, kfp)) {
  171 + return false;
  172 + }
  173 + }
  174 + return true;
  175 + default:
  176 + throw new RuntimeException("Operation not supported: " + predicate.getOperation());
  177 + }
  178 + }
  179 +
  180 +
  181 + private boolean evalBoolPredicate(EntityKeyValue ekv, BooleanFilterPredicate predicate) {
  182 + Boolean value;
  183 + switch (ekv.getDataType()) {
  184 + case LONG:
  185 + value = ekv.getLngValue() > 0;
  186 + break;
  187 + case DOUBLE:
  188 + value = ekv.getDblValue() > 0;
  189 + break;
  190 + case BOOLEAN:
  191 + value = ekv.getBoolValue();
  192 + break;
  193 + case STRING:
  194 + try {
  195 + value = Boolean.parseBoolean(ekv.getStrValue());
  196 + break;
  197 + } catch (RuntimeException e) {
  198 + return false;
  199 + }
  200 + case JSON:
  201 + try {
  202 + value = Boolean.parseBoolean(ekv.getJsonValue());
  203 + break;
  204 + } catch (RuntimeException e) {
  205 + return false;
  206 + }
  207 + default:
  208 + return false;
  209 + }
  210 + if (value == null) {
  211 + return false;
  212 + }
  213 + switch (predicate.getOperation()) {
  214 + case EQUAL:
  215 + return value.equals(predicate.getValue().getDefaultValue());
  216 + case NOT_EQUAL:
  217 + return !value.equals(predicate.getValue().getDefaultValue());
  218 + default:
  219 + throw new RuntimeException("Operation not supported: " + predicate.getOperation());
  220 + }
  221 + }
  222 +
  223 + private boolean evalNumPredicate(EntityKeyValue ekv, NumericFilterPredicate predicate) {
  224 + Double value;
  225 + switch (ekv.getDataType()) {
  226 + case LONG:
  227 + value = ekv.getLngValue().doubleValue();
  228 + break;
  229 + case DOUBLE:
  230 + value = ekv.getDblValue();
  231 + break;
  232 + case BOOLEAN:
  233 + value = ekv.getBoolValue() ? 1.0 : 0.0;
  234 + break;
  235 + case STRING:
  236 + try {
  237 + value = Double.parseDouble(ekv.getStrValue());
  238 + break;
  239 + } catch (RuntimeException e) {
  240 + return false;
  241 + }
  242 + case JSON:
  243 + try {
  244 + value = Double.parseDouble(ekv.getJsonValue());
  245 + break;
  246 + } catch (RuntimeException e) {
  247 + return false;
  248 + }
  249 + default:
  250 + return false;
  251 + }
  252 + if (value == null) {
  253 + return false;
  254 + }
  255 +
  256 + Double predicateValue = predicate.getValue().getDefaultValue();
  257 + switch (predicate.getOperation()) {
  258 + case NOT_EQUAL:
  259 + return !value.equals(predicateValue);
  260 + case EQUAL:
  261 + return value.equals(predicateValue);
  262 + case GREATER:
  263 + return value > predicateValue;
  264 + case GREATER_OR_EQUAL:
  265 + return value >= predicateValue;
  266 + case LESS:
  267 + return value < predicateValue;
  268 + case LESS_OR_EQUAL:
  269 + return value <= predicateValue;
  270 + default:
  271 + throw new RuntimeException("Operation not supported: " + predicate.getOperation());
  272 + }
  273 + }
  274 +
  275 + private boolean evalStrPredicate(EntityKeyValue ekv, StringFilterPredicate predicate) {
  276 + String val;
  277 + String predicateValue;
  278 + if (predicate.isIgnoreCase()) {
  279 + val = ekv.getStrValue().toLowerCase();
  280 + predicateValue = predicate.getValue().getDefaultValue().toLowerCase();
  281 + } else {
  282 + val = ekv.getStrValue();
  283 + predicateValue = predicate.getValue().getDefaultValue();
  284 + }
  285 + switch (predicate.getOperation()) {
  286 + case CONTAINS:
  287 + return val.contains(predicateValue);
  288 + case EQUAL:
  289 + return val.equals(predicateValue);
  290 + case STARTS_WITH:
  291 + return val.startsWith(predicateValue);
  292 + case ENDS_WITH:
  293 + return val.endsWith(predicateValue);
  294 + case NOT_EQUAL:
  295 + return !val.equals(predicateValue);
  296 + case NOT_CONTAINS:
  297 + return !val.contains(predicateValue);
  298 + default:
  299 + throw new RuntimeException("Operation not supported: " + predicate.getOperation());
  300 + }
  301 + }
  302 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.profile;
  17 +
  18 +import lombok.AccessLevel;
  19 +import lombok.Getter;
  20 +import org.thingsboard.server.common.data.DeviceProfile;
  21 +import org.thingsboard.server.common.data.device.profile.AlarmRule;
  22 +import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
  23 +import org.thingsboard.server.common.data.query.EntityKey;
  24 +import org.thingsboard.server.common.data.query.KeyFilter;
  25 +
  26 +import java.util.List;
  27 +import java.util.Set;
  28 +import java.util.concurrent.ConcurrentHashMap;
  29 +import java.util.concurrent.CopyOnWriteArrayList;
  30 +
  31 +
  32 +class DeviceProfileState {
  33 +
  34 + private DeviceProfile deviceProfile;
  35 + @Getter(AccessLevel.PACKAGE)
  36 + private final List<DeviceProfileAlarm> alarmSettings = new CopyOnWriteArrayList<>();
  37 + @Getter(AccessLevel.PACKAGE)
  38 + private final Set<EntityKey> entityKeys = ConcurrentHashMap.newKeySet();
  39 +
  40 + DeviceProfileState(DeviceProfile deviceProfile) {
  41 + updateDeviceProfile(deviceProfile);
  42 + }
  43 +
  44 + void updateDeviceProfile(DeviceProfile deviceProfile) {
  45 + this.deviceProfile = deviceProfile;
  46 + alarmSettings.clear();
  47 + if (deviceProfile.getProfileData().getAlarms() != null) {
  48 + alarmSettings.addAll(deviceProfile.getProfileData().getAlarms());
  49 + for (DeviceProfileAlarm alarm : deviceProfile.getProfileData().getAlarms()) {
  50 + for (AlarmRule alarmRule : alarm.getCreateRules().values()) {
  51 + for (KeyFilter keyFilter : alarmRule.getCondition().getCondition()) {
  52 + entityKeys.add(keyFilter.getKey());
  53 + }
  54 + }
  55 + }
  56 + }
  57 + }
  58 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.profile;
  17 +
  18 +import com.google.gson.JsonParser;
  19 +import org.thingsboard.rule.engine.api.TbContext;
  20 +import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode;
  21 +import org.thingsboard.server.common.data.DataConstants;
  22 +import org.thingsboard.server.common.data.Device;
  23 +import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
  24 +import org.thingsboard.server.common.data.id.DeviceId;
  25 +import org.thingsboard.server.common.data.id.EntityId;
  26 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  27 +import org.thingsboard.server.common.data.kv.KvEntry;
  28 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  29 +import org.thingsboard.server.common.data.query.EntityKey;
  30 +import org.thingsboard.server.common.data.query.EntityKeyType;
  31 +import org.thingsboard.server.common.msg.TbMsg;
  32 +import org.thingsboard.server.common.msg.session.SessionMsgType;
  33 +import org.thingsboard.server.common.transport.adaptor.JsonConverter;
  34 +import org.thingsboard.server.dao.sql.query.EntityKeyMapping;
  35 +
  36 +import java.util.HashSet;
  37 +import java.util.List;
  38 +import java.util.Map;
  39 +import java.util.Set;
  40 +import java.util.concurrent.ConcurrentHashMap;
  41 +import java.util.concurrent.ConcurrentMap;
  42 +import java.util.concurrent.ExecutionException;
  43 +
  44 +class DeviceState {
  45 +
  46 + private DeviceProfileState deviceProfile;
  47 + private DeviceDataSnapshot latestValues;
  48 + private final ConcurrentMap<String, DeviceProfileAlarmState> alarmStates = new ConcurrentHashMap<>();
  49 +
  50 + public DeviceState(DeviceProfileState deviceProfile) {
  51 + this.deviceProfile = deviceProfile;
  52 + }
  53 +
  54 + public void process(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
  55 + if (latestValues == null) {
  56 + latestValues = fetchLatestValues(ctx, msg.getOriginator());
  57 + }
  58 + if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) {
  59 + processTelemetry(ctx, msg);
  60 + } else {
  61 + ctx.tellSuccess(msg);
  62 + }
  63 + }
  64 +
  65 + private void processTelemetry(TbContext ctx, TbMsg msg) {
  66 + Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToSortedTelemetry(new JsonParser().parse(msg.getData()), TbMsgTimeseriesNode.getTs(msg));
  67 + tsKvMap.forEach((ts, data) -> {
  68 + latestValues = merge(latestValues, ts, data);
  69 + for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
  70 + DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(msg.getOriginator(), alarm));
  71 + alarmState.process(ctx, msg, latestValues);
  72 + }
  73 + });
  74 + ctx.tellSuccess(msg);
  75 + }
  76 +
  77 + private DeviceDataSnapshot merge(DeviceDataSnapshot latestValues, Long ts, List<KvEntry> data) {
  78 + latestValues.setTs(ts);
  79 + for (KvEntry entry : data) {
  80 + latestValues.putValue(new EntityKey(EntityKeyType.TIME_SERIES, entry.getKey()), toEntityValue(entry));
  81 + }
  82 + return latestValues;
  83 + }
  84 +
  85 + private DeviceDataSnapshot fetchLatestValues(TbContext ctx, EntityId originator) throws ExecutionException, InterruptedException {
  86 + DeviceDataSnapshot result = new DeviceDataSnapshot(deviceProfile.getEntityKeys());
  87 +
  88 + Set<String> serverAttributeKeys = new HashSet<>();
  89 + Set<String> clientAttributeKeys = new HashSet<>();
  90 + Set<String> sharedAttributeKeys = new HashSet<>();
  91 + Set<String> commonAttributeKeys = new HashSet<>();
  92 + Set<String> latestTsKeys = new HashSet<>();
  93 +
  94 + Device device = null;
  95 + for (EntityKey entityKey : deviceProfile.getEntityKeys()) {
  96 + String key = entityKey.getKey();
  97 + switch (entityKey.getType()) {
  98 + case SERVER_ATTRIBUTE:
  99 + serverAttributeKeys.add(key);
  100 + break;
  101 + case CLIENT_ATTRIBUTE:
  102 + clientAttributeKeys.add(key);
  103 + break;
  104 + case SHARED_ATTRIBUTE:
  105 + sharedAttributeKeys.add(key);
  106 + break;
  107 + case ATTRIBUTE:
  108 + serverAttributeKeys.add(key);
  109 + clientAttributeKeys.add(key);
  110 + sharedAttributeKeys.add(key);
  111 + commonAttributeKeys.add(key);
  112 + break;
  113 + case TIME_SERIES:
  114 + latestTsKeys.add(key);
  115 + break;
  116 + case ENTITY_FIELD:
  117 + if (device == null) {
  118 + device = ctx.getDeviceService().findDeviceById(ctx.getTenantId(), new DeviceId(originator.getId()));
  119 + }
  120 + if (device != null) {
  121 + switch (key) {
  122 + case EntityKeyMapping.NAME:
  123 + result.putValue(entityKey, EntityKeyValue.fromString(device.getName()));
  124 + break;
  125 + case EntityKeyMapping.TYPE:
  126 + result.putValue(entityKey, EntityKeyValue.fromString(device.getType()));
  127 + break;
  128 + case EntityKeyMapping.CREATED_TIME:
  129 + result.putValue(entityKey, EntityKeyValue.fromLong(device.getCreatedTime()));
  130 + break;
  131 + case EntityKeyMapping.LABEL:
  132 + result.putValue(entityKey, EntityKeyValue.fromString(device.getLabel()));
  133 + break;
  134 + }
  135 + }
  136 + break;
  137 + }
  138 + }
  139 +
  140 + if (!latestTsKeys.isEmpty()) {
  141 + List<TsKvEntry> data = ctx.getTimeseriesService().findLatest(ctx.getTenantId(), originator, latestTsKeys).get();
  142 + for (TsKvEntry entry : data) {
  143 + result.putValue(new EntityKey(EntityKeyType.TIME_SERIES, entry.getKey()), toEntityValue(entry));
  144 + }
  145 + }
  146 + if (!clientAttributeKeys.isEmpty()) {
  147 + addToSnapshot(result, commonAttributeKeys,
  148 + ctx.getAttributesService().find(ctx.getTenantId(), originator, DataConstants.CLIENT_SCOPE, clientAttributeKeys).get());
  149 + }
  150 + if (!sharedAttributeKeys.isEmpty()) {
  151 + addToSnapshot(result, commonAttributeKeys,
  152 + ctx.getAttributesService().find(ctx.getTenantId(), originator, DataConstants.SHARED_SCOPE, sharedAttributeKeys).get());
  153 + }
  154 + if (!serverAttributeKeys.isEmpty()) {
  155 + addToSnapshot(result, commonAttributeKeys,
  156 + ctx.getAttributesService().find(ctx.getTenantId(), originator, DataConstants.SERVER_SCOPE, serverAttributeKeys).get());
  157 + }
  158 +
  159 + return result;
  160 + }
  161 +
  162 + private void addToSnapshot(DeviceDataSnapshot snapshot, Set<String> commonAttributeKeys, List<AttributeKvEntry> data) {
  163 + for (AttributeKvEntry entry : data) {
  164 + EntityKeyValue value = toEntityValue(entry);
  165 + snapshot.putValue(new EntityKey(EntityKeyType.CLIENT_ATTRIBUTE, entry.getKey()), value);
  166 + if (commonAttributeKeys.contains(entry.getKey())) {
  167 + snapshot.putValue(new EntityKey(EntityKeyType.ATTRIBUTE, entry.getKey()), value);
  168 + }
  169 + }
  170 + }
  171 +
  172 + private EntityKeyValue toEntityValue(KvEntry entry) {
  173 + switch (entry.getDataType()) {
  174 + case STRING:
  175 + return EntityKeyValue.fromString(entry.getStrValue().get());
  176 + case LONG:
  177 + return EntityKeyValue.fromLong(entry.getLongValue().get());
  178 + case DOUBLE:
  179 + return EntityKeyValue.fromDouble(entry.getDoubleValue().get());
  180 + case BOOLEAN:
  181 + return EntityKeyValue.fromBool(entry.getBooleanValue().get());
  182 + case JSON:
  183 + return EntityKeyValue.fromJson(entry.getJsonValue().get());
  184 + default:
  185 + throw new RuntimeException("Can't parse entry: " + entry.getDataType());
  186 + }
  187 + }
  188 +
  189 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.profile;
  17 +
  18 +public class EntityKeyState {
  19 +
  20 +
  21 +
  22 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.profile;
  17 +
  18 +import lombok.Getter;
  19 +import org.thingsboard.server.common.data.kv.DataType;
  20 +
  21 +class EntityKeyValue {
  22 +
  23 + @Getter
  24 + private DataType dataType;
  25 + private Long lngValue;
  26 + private Double dblValue;
  27 + private Boolean boolValue;
  28 + private String strValue;
  29 +
  30 + public Long getLngValue() {
  31 + return dataType == DataType.LONG ? lngValue : null;
  32 + }
  33 +
  34 + public void setLngValue(Long lngValue) {
  35 + this.dataType = DataType.LONG;
  36 + this.lngValue = lngValue;
  37 + }
  38 +
  39 + public Double getDblValue() {
  40 + return dataType == DataType.DOUBLE ? dblValue : null;
  41 + }
  42 +
  43 + public void setDblValue(Double dblValue) {
  44 + this.dataType = DataType.DOUBLE;
  45 + this.dblValue = dblValue;
  46 + }
  47 +
  48 + public Boolean getBoolValue() {
  49 + return dataType == DataType.BOOLEAN ? boolValue : null;
  50 + }
  51 +
  52 + public void setBoolValue(Boolean boolValue) {
  53 + this.dataType = DataType.BOOLEAN;
  54 + this.boolValue = boolValue;
  55 + }
  56 +
  57 + public String getStrValue() {
  58 + return dataType == DataType.STRING ? strValue : null;
  59 + }
  60 +
  61 + public void setStrValue(String strValue) {
  62 + this.dataType = DataType.STRING;
  63 + this.strValue = strValue;
  64 + }
  65 +
  66 + public void setJsonValue(String jsonValue) {
  67 + this.dataType = DataType.JSON;
  68 + this.strValue = jsonValue;
  69 + }
  70 +
  71 + public String getJsonValue() {
  72 + return dataType == DataType.JSON ? strValue : null;
  73 + }
  74 +
  75 + boolean isSet() {
  76 + return dataType != null;
  77 + }
  78 +
  79 + static EntityKeyValue fromString(String s) {
  80 + EntityKeyValue result = new EntityKeyValue();
  81 + result.setStrValue(s);
  82 + return result;
  83 + }
  84 +
  85 + static EntityKeyValue fromBool(boolean b) {
  86 + EntityKeyValue result = new EntityKeyValue();
  87 + result.setBoolValue(b);
  88 + return result;
  89 + }
  90 +
  91 + static EntityKeyValue fromLong(long l) {
  92 + EntityKeyValue result = new EntityKeyValue();
  93 + result.setLngValue(l);
  94 + return result;
  95 + }
  96 +
  97 + static EntityKeyValue fromDouble(double d) {
  98 + EntityKeyValue result = new EntityKeyValue();
  99 + result.setDblValue(d);
  100 + return result;
  101 + }
  102 +
  103 + static EntityKeyValue fromJson(String s) {
  104 + EntityKeyValue result = new EntityKeyValue();
  105 + result.setJsonValue(s);
  106 + return result;
  107 + }
  108 +
  109 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.profile;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.apache.commons.lang3.BooleanUtils;
  20 +import org.apache.kafka.clients.producer.KafkaProducer;
  21 +import org.apache.kafka.clients.producer.Producer;
  22 +import org.apache.kafka.clients.producer.ProducerConfig;
  23 +import org.apache.kafka.clients.producer.ProducerRecord;
  24 +import org.apache.kafka.clients.producer.RecordMetadata;
  25 +import org.apache.kafka.common.header.Headers;
  26 +import org.apache.kafka.common.header.internals.RecordHeader;
  27 +import org.apache.kafka.common.header.internals.RecordHeaders;
  28 +import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
  29 +import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
  30 +import org.thingsboard.rule.engine.api.RuleNode;
  31 +import org.thingsboard.rule.engine.api.TbContext;
  32 +import org.thingsboard.rule.engine.api.TbNode;
  33 +import org.thingsboard.rule.engine.api.TbNodeConfiguration;
  34 +import org.thingsboard.rule.engine.api.TbNodeException;
  35 +import org.thingsboard.rule.engine.api.TbRelationTypes;
  36 +import org.thingsboard.rule.engine.api.util.TbNodeUtils;
  37 +import org.thingsboard.rule.engine.kafka.TbKafkaNodeConfiguration;
  38 +import org.thingsboard.server.common.data.DeviceProfile;
  39 +import org.thingsboard.server.common.data.EntityType;
  40 +import org.thingsboard.server.common.data.id.DeviceId;
  41 +import org.thingsboard.server.common.data.id.EntityId;
  42 +import org.thingsboard.server.common.data.plugin.ComponentType;
  43 +import org.thingsboard.server.common.msg.TbMsg;
  44 +import org.thingsboard.server.common.msg.TbMsgMetaData;
  45 +
  46 +import java.nio.charset.Charset;
  47 +import java.nio.charset.StandardCharsets;
  48 +import java.util.HashMap;
  49 +import java.util.Map;
  50 +import java.util.Properties;
  51 +import java.util.concurrent.ConcurrentHashMap;
  52 +import java.util.concurrent.ExecutionException;
  53 +
  54 +@Slf4j
  55 +@RuleNode(
  56 + type = ComponentType.ACTION,
  57 + name = "device profile",
  58 + customRelations = true,
  59 + relationTypes = {"Alarm Created", "Alarm Updated", "Alarm Severity Updated", "Alarm Cleared", "Success", "Failure"},
  60 + configClazz = EmptyNodeConfiguration.class,
  61 + nodeDescription = "Process device messages based on device profile settings",
  62 + nodeDetails = "Create and clear alarms based on alarm rules defined in device profile. Generates ",
  63 + uiResources = {"static/rulenode/rulenode-core-config.js"},
  64 + configDirective = "tbNodeEmptyConfig"
  65 +)
  66 +public class TbDeviceProfileNode implements TbNode {
  67 +
  68 + private RuleEngineDeviceProfileCache cache;
  69 + private Map<DeviceId, DeviceState> deviceStates = new ConcurrentHashMap<>();
  70 +
  71 + @Override
  72 + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
  73 + cache = ctx.getDeviceProfileCache();
  74 + }
  75 +
  76 + /**
  77 + * TODO:
  78 + * 1. Duration in the alarm conditions;
  79 + * 2. Update of the Profile (rules);
  80 + * 3. Update of the Device attributes (client, server and shared);
  81 + * 4. Dynamic values evaluation;
  82 + */
  83 +
  84 + @Override
  85 + public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
  86 + EntityType originatorType = msg.getOriginator().getEntityType();
  87 + if (EntityType.DEVICE.equals(originatorType)) {
  88 + DeviceId deviceId = new DeviceId(msg.getOriginator().getId());
  89 + DeviceState deviceState = getOrCreateDeviceState(ctx, msg, deviceId);
  90 + if (deviceState != null) {
  91 + deviceState.process(ctx, msg);
  92 + } else {
  93 + ctx.tellFailure(msg, new IllegalStateException("Device profile for device [" + deviceId + "] not found!"));
  94 + }
  95 + } else if (EntityType.DEVICE_PROFILE.equals(originatorType)) {
  96 + //TODO: check that the profile rule set was changed. If yes - invalidate the rules.
  97 + ctx.tellSuccess(msg);
  98 + } else {
  99 + ctx.tellSuccess(msg);
  100 + }
  101 + }
  102 +
  103 + private DeviceState getOrCreateDeviceState(TbContext ctx, TbMsg msg, DeviceId deviceId) {
  104 + DeviceState deviceState = deviceStates.get(deviceId);
  105 + if (deviceState == null) {
  106 + DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId);
  107 + if (deviceProfile != null) {
  108 + deviceState = new DeviceState(new DeviceProfileState(deviceProfile));
  109 + deviceStates.put(deviceId, deviceState);
  110 + }
  111 + }
  112 + return deviceState;
  113 + }
  114 +
  115 + @Override
  116 + public void destroy() {
  117 +
  118 + }
  119 +
  120 +}
... ...
... ... @@ -62,16 +62,7 @@ public class TbMsgTimeseriesNode implements TbNode {
62 62 ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
63 63 return;
64 64 }
65   - long ts = -1;
66   - String tsStr = msg.getMetaData().getValue("ts");
67   - if (!StringUtils.isEmpty(tsStr)) {
68   - try {
69   - ts = Long.parseLong(tsStr);
70   - } catch (NumberFormatException e) {
71   - }
72   - } else {
73   - ts = msg.getTs();
74   - }
  65 + long ts = getTs(msg);
75 66 String src = msg.getData();
76 67 Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);
77 68 if (tsKvMap.isEmpty()) {
... ... @@ -89,6 +80,20 @@ public class TbMsgTimeseriesNode implements TbNode {
89 80 ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg));
90 81 }
91 82
  83 + public static long getTs(TbMsg msg) {
  84 + long ts = -1;
  85 + String tsStr = msg.getMetaData().getValue("ts");
  86 + if (!StringUtils.isEmpty(tsStr)) {
  87 + try {
  88 + ts = Long.parseLong(tsStr);
  89 + } catch (NumberFormatException e) {
  90 + }
  91 + } else {
  92 + ts = msg.getTs();
  93 + }
  94 + return ts;
  95 + }
  96 +
92 97 @Override
93 98 public void destroy() {
94 99 }
... ...
... ... @@ -45,8 +45,6 @@ import org.thingsboard.server.common.data.id.TenantId;
45 45 import org.thingsboard.server.common.msg.TbMsg;
46 46 import org.thingsboard.server.common.msg.TbMsgDataType;
47 47 import org.thingsboard.server.common.msg.TbMsgMetaData;
48   -import org.thingsboard.server.dao.alarm.AlarmOperationResult;
49   -import org.thingsboard.server.dao.alarm.AlarmService;
50 48
51 49 import javax.script.ScriptException;
52 50 import java.io.IOException;
... ... @@ -65,9 +63,9 @@ import static org.mockito.Mockito.times;
65 63 import static org.mockito.Mockito.verify;
66 64 import static org.mockito.Mockito.verifyNoMoreInteractions;
67 65 import static org.mockito.Mockito.when;
68   -import static org.thingsboard.rule.engine.action.TbAbstractAlarmNode.IS_CLEARED_ALARM;
69   -import static org.thingsboard.rule.engine.action.TbAbstractAlarmNode.IS_EXISTING_ALARM;
70   -import static org.thingsboard.rule.engine.action.TbAbstractAlarmNode.IS_NEW_ALARM;
  66 +import static org.thingsboard.server.common.data.DataConstants.IS_CLEARED_ALARM;
  67 +import static org.thingsboard.server.common.data.DataConstants.IS_EXISTING_ALARM;
  68 +import static org.thingsboard.server.common.data.DataConstants.IS_NEW_ALARM;
71 69 import static org.thingsboard.server.common.data.alarm.AlarmSeverity.CRITICAL;
72 70 import static org.thingsboard.server.common.data.alarm.AlarmSeverity.WARNING;
73 71 import static org.thingsboard.server.common.data.alarm.AlarmStatus.ACTIVE_UNACK;
... ...
... ... @@ -24,6 +24,7 @@ import org.junit.runner.RunWith;
24 24 import org.mockito.ArgumentCaptor;
25 25 import org.mockito.Matchers;
26 26 import org.mockito.Mock;
  27 +import org.mockito.Mockito;
27 28 import org.mockito.runners.MockitoJUnitRunner;
28 29 import org.mockito.stubbing.Answer;
29 30 import org.thingsboard.common.util.ListeningExecutor;
... ... @@ -90,7 +91,7 @@ public class TbJsFilterNodeTest {
90 91 public void metadataConditionCanBeTrue() throws TbNodeException, ScriptException {
91 92 initWithScript();
92 93 TbMsgMetaData metaData = new TbMsgMetaData();
93   - TbMsg msg = TbMsg.newMsg( "USER", null, metaData, TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
  94 + TbMsg msg = TbMsg.newMsg("USER", null, metaData, TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
94 95 mockJsExecutor();
95 96 when(scriptEngine.executeFilterAsync(msg)).thenReturn(Futures.immediateFuture(true));
96 97
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.profile;
  17 +
  18 +import com.fasterxml.jackson.databind.ObjectMapper;
  19 +import com.fasterxml.jackson.databind.node.ObjectNode;
  20 +import com.google.common.util.concurrent.Futures;
  21 +import org.junit.Test;
  22 +import org.junit.runner.RunWith;
  23 +import org.mockito.AdditionalAnswers;
  24 +import org.mockito.Mock;
  25 +import org.mockito.Mockito;
  26 +import org.mockito.invocation.InvocationOnMock;
  27 +import org.mockito.runners.MockitoJUnitRunner;
  28 +import org.mockito.stubbing.Answer;
  29 +import org.springframework.util.StringUtils;
  30 +import org.thingsboard.rule.engine.api.RuleEngineAlarmService;
  31 +import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
  32 +import org.thingsboard.rule.engine.api.TbContext;
  33 +import org.thingsboard.rule.engine.api.TbNodeConfiguration;
  34 +import org.thingsboard.rule.engine.api.TbNodeException;
  35 +import org.thingsboard.server.common.data.DeviceProfile;
  36 +import org.thingsboard.server.common.data.alarm.Alarm;
  37 +import org.thingsboard.server.common.data.alarm.AlarmSeverity;
  38 +import org.thingsboard.server.common.data.device.profile.AlarmCondition;
  39 +import org.thingsboard.server.common.data.device.profile.AlarmRule;
  40 +import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
  41 +import org.thingsboard.server.common.data.device.profile.DeviceProfileData;
  42 +import org.thingsboard.server.common.data.id.DeviceId;
  43 +import org.thingsboard.server.common.data.id.DeviceProfileId;
  44 +import org.thingsboard.server.common.data.id.TenantId;
  45 +import org.thingsboard.server.common.data.query.EntityKey;
  46 +import org.thingsboard.server.common.data.query.EntityKeyType;
  47 +import org.thingsboard.server.common.data.query.EntityKeyValueType;
  48 +import org.thingsboard.server.common.data.query.FilterPredicateValue;
  49 +import org.thingsboard.server.common.data.query.KeyFilter;
  50 +import org.thingsboard.server.common.data.query.NumericFilterPredicate;
  51 +import org.thingsboard.server.common.msg.TbMsg;
  52 +import org.thingsboard.server.common.msg.TbMsgDataType;
  53 +import org.thingsboard.server.common.msg.TbMsgMetaData;
  54 +import org.thingsboard.server.common.msg.session.SessionMsgType;
  55 +import org.thingsboard.server.dao.alarm.AlarmService;
  56 +import org.thingsboard.server.dao.timeseries.TimeseriesService;
  57 +
  58 +import java.util.Collections;
  59 +import java.util.UUID;
  60 +
  61 +import static org.mockito.Mockito.verify;
  62 +
  63 +@RunWith(MockitoJUnitRunner.class)
  64 +public class TbDeviceProfileNodeTest {
  65 +
  66 + private static final ObjectMapper mapper = new ObjectMapper();
  67 +
  68 + private TbDeviceProfileNode node;
  69 +
  70 + @Mock
  71 + private TbContext ctx;
  72 + @Mock
  73 + private RuleEngineDeviceProfileCache cache;
  74 + @Mock
  75 + private TimeseriesService timeseriesService;
  76 + @Mock
  77 + private RuleEngineAlarmService alarmService;
  78 +
  79 + private TenantId tenantId = new TenantId(UUID.randomUUID());
  80 + private DeviceId deviceId = new DeviceId(UUID.randomUUID());
  81 + private DeviceProfileId deviceProfileId = new DeviceProfileId(UUID.randomUUID());
  82 +
  83 + @Test
  84 + public void testRandomMessageType() throws Exception {
  85 + init();
  86 +
  87 + DeviceProfile deviceProfile = new DeviceProfile();
  88 + DeviceProfileData deviceProfileData = new DeviceProfileData();
  89 + deviceProfileData.setAlarms(Collections.emptyList());
  90 + deviceProfile.setProfileData(deviceProfileData);
  91 +
  92 + Mockito.when(cache.get(tenantId, deviceId)).thenReturn(deviceProfile);
  93 + ObjectNode data = mapper.createObjectNode();
  94 + data.put("temperature", 42);
  95 + TbMsg msg = TbMsg.newMsg("123456789", deviceId, new TbMsgMetaData(),
  96 + TbMsgDataType.JSON, mapper.writeValueAsString(data), null, null);
  97 + node.onMsg(ctx, msg);
  98 + verify(ctx).tellSuccess(msg);
  99 + verify(ctx, Mockito.never()).tellFailure(Mockito.any(), Mockito.any());
  100 + }
  101 +
  102 + @Test
  103 + public void testEmptyProfile() throws Exception {
  104 + init();
  105 +
  106 + DeviceProfile deviceProfile = new DeviceProfile();
  107 + DeviceProfileData deviceProfileData = new DeviceProfileData();
  108 + deviceProfileData.setAlarms(Collections.emptyList());
  109 + deviceProfile.setProfileData(deviceProfileData);
  110 +
  111 + Mockito.when(cache.get(tenantId, deviceId)).thenReturn(deviceProfile);
  112 + ObjectNode data = mapper.createObjectNode();
  113 + data.put("temperature", 42);
  114 + TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, new TbMsgMetaData(),
  115 + TbMsgDataType.JSON, mapper.writeValueAsString(data), null, null);
  116 + node.onMsg(ctx, msg);
  117 + verify(ctx).tellSuccess(msg);
  118 + verify(ctx, Mockito.never()).tellFailure(Mockito.any(), Mockito.any());
  119 + }
  120 +
  121 + @Test
  122 + public void testAlarmCreate() throws Exception {
  123 + init();
  124 +
  125 + DeviceProfile deviceProfile = new DeviceProfile();
  126 + DeviceProfileData deviceProfileData = new DeviceProfileData();
  127 + KeyFilter highTempFilter = new KeyFilter();
  128 + highTempFilter.setKey(new EntityKey(EntityKeyType.TIME_SERIES, "temperature"));
  129 + highTempFilter.setValueType(EntityKeyValueType.NUMERIC);
  130 + NumericFilterPredicate highTemperaturePredicate = new NumericFilterPredicate();
  131 + highTemperaturePredicate.setOperation(NumericFilterPredicate.NumericOperation.GREATER);
  132 + highTemperaturePredicate.setValue(new FilterPredicateValue<>(30.0));
  133 + highTempFilter.setPredicate(highTemperaturePredicate);
  134 + AlarmCondition alarmCondition = new AlarmCondition();
  135 + alarmCondition.setCondition(Collections.singletonList(highTempFilter));
  136 + AlarmRule alarmRule = new AlarmRule();
  137 + alarmRule.setCondition(alarmCondition);
  138 + DeviceProfileAlarm dpa = new DeviceProfileAlarm();
  139 + dpa.setId("highTemperatureAlarmID");
  140 + dpa.setCreateRules(Collections.singletonMap(AlarmSeverity.CRITICAL, alarmRule));
  141 + deviceProfileData.setAlarms(Collections.singletonList(dpa));
  142 + deviceProfile.setProfileData(deviceProfileData);
  143 +
  144 + Mockito.when(cache.get(tenantId, deviceId)).thenReturn(deviceProfile);
  145 + Mockito.when(timeseriesService.findLatest(tenantId, deviceId, Collections.singleton("temperature")))
  146 + .thenReturn(Futures.immediateFuture(Collections.emptyList()));
  147 + Mockito.when(alarmService.createOrUpdateAlarm(Mockito.any())).thenAnswer(AdditionalAnswers.returnsFirstArg());
  148 +
  149 + TbMsg theMsg = TbMsg.newMsg("ALARM", deviceId, new TbMsgMetaData(), "");
  150 + Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyString())).thenReturn(theMsg);
  151 +
  152 + ObjectNode data = mapper.createObjectNode();
  153 + data.put("temperature", 42);
  154 + TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, new TbMsgMetaData(),
  155 + TbMsgDataType.JSON, mapper.writeValueAsString(data), null, null);
  156 + node.onMsg(ctx, msg);
  157 + verify(ctx).tellSuccess(msg);
  158 + verify(ctx).tellNext(theMsg, "Alarm Created");
  159 + verify(ctx, Mockito.never()).tellFailure(Mockito.any(), Mockito.any());
  160 +
  161 + TbMsg theMsg2 = TbMsg.newMsg("ALARM", deviceId, new TbMsgMetaData(), "2");
  162 + Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyString())).thenReturn(theMsg2);
  163 +
  164 +
  165 + TbMsg msg2 = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, new TbMsgMetaData(),
  166 + TbMsgDataType.JSON, mapper.writeValueAsString(data), null, null);
  167 + node.onMsg(ctx, msg2);
  168 + verify(ctx).tellSuccess(msg2);
  169 + verify(ctx).tellNext(theMsg2, "Alarm Updated");
  170 +
  171 + }
  172 +
  173 + private void init() throws TbNodeException {
  174 + Mockito.when(ctx.getTenantId()).thenReturn(tenantId);
  175 + Mockito.when(ctx.getDeviceProfileCache()).thenReturn(cache);
  176 + Mockito.when(ctx.getTimeseriesService()).thenReturn(timeseriesService);
  177 + Mockito.when(ctx.getAlarmService()).thenReturn(alarmService);
  178 + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.createObjectNode());
  179 + node = new TbDeviceProfileNode();
  180 + node.init(ctx, nodeConfiguration);
  181 + }
  182 +
  183 +}
... ...