Commit f1cc4fbd04fc1ff4e477c6b3e7c3de3f6a7d7197

Authored by Igor Kulikov
2 parents 155b967b 6d185f1c

Merge branch 'master' of github.com:thingsboard/thingsboard

Showing 19 changed files with 460 additions and 178 deletions
... ... @@ -469,6 +469,14 @@ class DefaultTbContext implements TbContext {
469 469 return mainCtx.getRuleNodeStateService().save(getTenantId(), state);
470 470 }
471 471
  472 + @Override
  473 + public void clearRuleNodeStates() {
  474 + if (log.isDebugEnabled()) {
  475 + log.debug("[{}][{}] Going to clear rule node states", getTenantId(), getSelfId());
  476 + }
  477 + mainCtx.getRuleNodeStateService().removeByRuleNodeId(getTenantId(), getSelfId());
  478 + }
  479 +
472 480 private TbMsgMetaData getActionMetaData(RuleNodeId ruleNodeId) {
473 481 TbMsgMetaData metaData = new TbMsgMetaData();
474 482 metaData.putValue("ruleNodeId", ruleNodeId.toString());
... ...
... ... @@ -164,6 +164,8 @@ public class RuleChainController extends BaseController {
164 164
165 165 RuleChain savedRuleChain = installScripts.createDefaultRuleChain(getCurrentUser().getTenantId(), request.getName());
166 166
  167 + tbClusterService.onEntityStateChange(savedRuleChain.getTenantId(), savedRuleChain.getId(), ComponentLifecycleEvent.CREATED);
  168 +
167 169 logEntityAction(savedRuleChain.getId(), savedRuleChain, null, ActionType.ADDED, null);
168 170
169 171 return savedRuleChain;
... ...
... ... @@ -30,4 +30,5 @@ public interface RuleNodeStateService {
30 30
31 31 RuleNodeState save(TenantId tenantId, RuleNodeState ruleNodeState);
32 32
  33 + void removeByRuleNodeId(TenantId tenantId, RuleNodeId selfId);
33 34 }
... ...
... ... @@ -18,5 +18,6 @@ package org.thingsboard.server.common.data.query;
18 18 public enum DynamicValueSourceType {
19 19 CURRENT_TENANT,
20 20 CURRENT_CUSTOMER,
21   - CURRENT_USER
  21 + CURRENT_USER,
  22 + CURRENT_DEVICE
22 23 }
... ...
... ... @@ -68,6 +68,17 @@ public class BaseRuleNodeStateService extends AbstractEntityService implements R
68 68 return saveOrUpdate(tenantId, ruleNodeState, false);
69 69 }
70 70
  71 + @Override
  72 + public void removeByRuleNodeId(TenantId tenantId, RuleNodeId ruleNodeId) {
  73 + if (tenantId == null) {
  74 + throw new DataValidationException("Tenant id should be specified!.");
  75 + }
  76 + if (ruleNodeId == null) {
  77 + throw new DataValidationException("Rule node id should be specified!.");
  78 + }
  79 + ruleNodeStateDao.removeByRuleNodeId(ruleNodeId.getId());
  80 + }
  81 +
71 82 public RuleNodeState saveOrUpdate(TenantId tenantId, RuleNodeState ruleNodeState, boolean update) {
72 83 try {
73 84 if (update) {
... ...
... ... @@ -16,6 +16,8 @@
16 16 package org.thingsboard.server.dao.rule;
17 17
18 18 import org.thingsboard.server.common.data.id.EntityId;
  19 +import org.thingsboard.server.common.data.id.RuleNodeId;
  20 +import org.thingsboard.server.common.data.id.TenantId;
19 21 import org.thingsboard.server.common.data.page.PageData;
20 22 import org.thingsboard.server.common.data.page.PageLink;
21 23 import org.thingsboard.server.common.data.rule.RuleNodeState;
... ... @@ -31,4 +33,6 @@ public interface RuleNodeStateDao extends Dao<RuleNodeState> {
31 33 PageData<RuleNodeState> findByRuleNodeId(UUID ruleNodeId, PageLink pageLink);
32 34
33 35 RuleNodeState findByRuleNodeIdAndEntityId(UUID ruleNodeId, UUID entityId);
  36 +
  37 + void removeByRuleNodeId(UUID ruleNodeId);
34 38 }
... ...
... ... @@ -472,13 +472,13 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
472 472 if (entityFilter.isFetchLastLevelOnly()) {
473 473 String fromOrTo = (entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "from" : "to");
474 474 StringBuilder notExistsPart = new StringBuilder();
475   - notExistsPart.append(" NOT EXISTS (SELECT 1 from relation nr where ")
  475 + notExistsPart.append(" NOT EXISTS (SELECT 1 from relation nr ")
  476 + .append(whereFilter.replaceAll("re\\.", "nr\\."))
  477 + .append(" and ")
476 478 .append("nr.").append(fromOrTo).append("_id").append(" = re.").append(toOrFrom).append("_id")
477 479 .append(" and ")
478 480 .append("nr.").append(fromOrTo).append("_type").append(" = re.").append(toOrFrom).append("_type");
479   - if (!StringUtils.isEmpty(entityFilter.getRelationType())) {
480   - notExistsPart.append(" and nr.relation_type = :where_relation_type");
481   - }
  481 +
482 482 notExistsPart.append(")");
483 483 whereFilter += " and ( re.lvl = " + entityFilter.getMaxLevel() + " OR " + notExistsPart.toString() + ")";
484 484 }
... ... @@ -551,12 +551,12 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
551 551
552 552 StringBuilder notExistsPart = new StringBuilder();
553 553 notExistsPart.append(" NOT EXISTS (SELECT 1 from relation nr WHERE ");
554   - notExistsPart.append(whereFilter.toString());
555 554 notExistsPart
556   - .append(" and ")
557 555 .append("nr.").append(fromOrTo).append("_id").append(" = re.").append(toOrFrom).append("_id")
558 556 .append(" and ")
559   - .append("nr.").append(fromOrTo).append("_type").append(" = re.").append(toOrFrom).append("_type");
  557 + .append("nr.").append(fromOrTo).append("_type").append(" = re.").append(toOrFrom).append("_type")
  558 + .append(" and ")
  559 + .append(whereFilter.toString().replaceAll("re\\.", "nr\\."));
560 560
561 561 notExistsPart.append(")");
562 562 whereFilter.append(" and ( re.lvl = ").append(entityFilter.getMaxLevel()).append(" OR ").append(notExistsPart.toString()).append(")");
... ...
... ... @@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.beans.factory.annotation.Autowired;
20 20 import org.springframework.data.repository.CrudRepository;
21 21 import org.springframework.stereotype.Component;
  22 +import org.springframework.transaction.annotation.Transactional;
22 23 import org.thingsboard.server.common.data.id.EntityId;
23 24 import org.thingsboard.server.common.data.page.PageData;
24 25 import org.thingsboard.server.common.data.page.PageLink;
... ... @@ -56,4 +57,10 @@ public class JpaRuleNodeStateDao extends JpaAbstractDao<RuleNodeStateEntity, Rul
56 57 public RuleNodeState findByRuleNodeIdAndEntityId(UUID ruleNodeId, UUID entityId) {
57 58 return DaoUtil.getData(ruleNodeStateRepository.findByRuleNodeIdAndEntityId(ruleNodeId, entityId));
58 59 }
  60 +
  61 + @Transactional
  62 + @Override
  63 + public void removeByRuleNodeId(UUID ruleNodeId) {
  64 + ruleNodeStateRepository.removeByRuleNodeId(ruleNodeId);
  65 + }
59 66 }
... ...
... ... @@ -33,4 +33,7 @@ public interface RuleNodeStateRepository extends PagingAndSortingRepository<Rule
33 33
34 34 @Query("SELECT e FROM RuleNodeStateEntity e WHERE e.ruleNodeId = :ruleNodeId and e.entityId = :entityId")
35 35 RuleNodeStateEntity findByRuleNodeIdAndEntityId(@Param("ruleNodeId") UUID ruleNodeId, @Param("entityId") UUID entityId);
  36 +
  37 + void removeByRuleNodeId(@Param("ruleNodeId") UUID ruleNodeId);
  38 +
36 39 }
... ...
... ... @@ -221,4 +221,6 @@ public interface TbContext {
221 221 RuleNodeState findRuleNodeStateForEntity(EntityId entityId);
222 222
223 223 RuleNodeState saveRuleNodeState(RuleNodeState state);
  224 +
  225 + void clearRuleNodeStates();
224 226 }
... ...
... ... @@ -29,6 +29,9 @@ import org.thingsboard.server.common.data.device.profile.SimpleAlarmConditionSpe
29 29 import org.thingsboard.server.common.data.device.profile.SpecificTimeSchedule;
30 30 import org.thingsboard.server.common.data.query.BooleanFilterPredicate;
31 31 import org.thingsboard.server.common.data.query.ComplexFilterPredicate;
  32 +import org.thingsboard.server.common.data.query.EntityKey;
  33 +import org.thingsboard.server.common.data.query.EntityKeyType;
  34 +import org.thingsboard.server.common.data.query.FilterPredicateValue;
32 35 import org.thingsboard.server.common.data.query.KeyFilter;
33 36 import org.thingsboard.server.common.data.query.KeyFilterPredicate;
34 37 import org.thingsboard.server.common.data.query.NumericFilterPredicate;
... ... @@ -38,22 +41,25 @@ import org.thingsboard.server.common.msg.tools.SchedulerUtils;
38 41 import java.time.Instant;
39 42 import java.time.ZoneId;
40 43 import java.time.ZonedDateTime;
41   -import java.util.Calendar;
  44 +import java.util.Set;
  45 +import java.util.function.Function;
42 46
43 47 @Data
44   -public class AlarmRuleState {
  48 +class AlarmRuleState {
45 49
46 50 private final AlarmSeverity severity;
47 51 private final AlarmRule alarmRule;
48 52 private final AlarmConditionSpec spec;
49 53 private final long requiredDurationInMs;
50 54 private final long requiredRepeats;
  55 + private final Set<EntityKey> entityKeys;
51 56 private PersistedAlarmRuleState state;
52 57 private boolean updateFlag;
53 58
54   - public AlarmRuleState(AlarmSeverity severity, AlarmRule alarmRule, PersistedAlarmRuleState state) {
  59 + AlarmRuleState(AlarmSeverity severity, AlarmRule alarmRule, Set<EntityKey> entityKeys, PersistedAlarmRuleState state) {
55 60 this.severity = severity;
56 61 this.alarmRule = alarmRule;
  62 + this.entityKeys = entityKeys;
57 63 if (state != null) {
58 64 this.state = state;
59 65 } else {
... ... @@ -76,6 +82,30 @@ public class AlarmRuleState {
76 82 this.requiredRepeats = requiredRepeats;
77 83 }
78 84
  85 + public boolean validateTsUpdate(Set<EntityKey> changedKeys) {
  86 + for (EntityKey key : changedKeys) {
  87 + if (entityKeys.contains(key)) {
  88 + return true;
  89 + }
  90 + }
  91 + return false;
  92 + }
  93 +
  94 + public boolean validateAttrUpdate(Set<EntityKey> changedKeys) {
  95 + //If the attribute was updated, but no new telemetry arrived - we ignore this until new telemetry is there.
  96 + for (EntityKey key : entityKeys) {
  97 + if (key.getType().equals(EntityKeyType.TIME_SERIES)) {
  98 + return false;
  99 + }
  100 + }
  101 + for (EntityKey key : changedKeys) {
  102 + if (entityKeys.contains(key)) {
  103 + return true;
  104 + }
  105 + }
  106 + return false;
  107 + }
  108 +
79 109 public AlarmConditionSpec getSpec(AlarmRule alarmRule) {
80 110 AlarmConditionSpec spec = alarmRule.getCondition().getSpec();
81 111 if (spec == null) {
... ... @@ -93,7 +123,7 @@ public class AlarmRuleState {
93 123 }
94 124 }
95 125
96   - public boolean eval(DeviceDataSnapshot data) {
  126 + public boolean eval(DataSnapshot data) {
97 127 boolean active = isActive(data.getTs());
98 128 switch (spec.getType()) {
99 129 case SIMPLE:
... ... @@ -167,7 +197,7 @@ public class AlarmRuleState {
167 197 }
168 198 }
169 199
170   - private boolean evalRepeating(DeviceDataSnapshot data, boolean active) {
  200 + private boolean evalRepeating(DataSnapshot data, boolean active) {
171 201 if (active && eval(alarmRule.getCondition(), data)) {
172 202 state.setEventCount(state.getEventCount() + 1);
173 203 updateFlag = true;
... ... @@ -177,7 +207,7 @@ public class AlarmRuleState {
177 207 }
178 208 }
179 209
180   - private boolean evalDuration(DeviceDataSnapshot data, boolean active) {
  210 + private boolean evalDuration(DataSnapshot data, boolean active) {
181 211 if (active && eval(alarmRule.getCondition(), data)) {
182 212 if (state.getLastEventTs() > 0) {
183 213 if (data.getTs() > state.getLastEventTs()) {
... ... @@ -211,45 +241,45 @@ public class AlarmRuleState {
211 241 }
212 242 }
213 243
214   - private boolean eval(AlarmCondition condition, DeviceDataSnapshot data) {
  244 + private boolean eval(AlarmCondition condition, DataSnapshot data) {
215 245 boolean eval = true;
216 246 for (KeyFilter keyFilter : condition.getCondition()) {
217 247 EntityKeyValue value = data.getValue(keyFilter.getKey());
218 248 if (value == null) {
219 249 return false;
220 250 }
221   - eval = eval && eval(value, keyFilter.getPredicate());
  251 + eval = eval && eval(data, value, keyFilter.getPredicate());
222 252 }
223 253 return eval;
224 254 }
225 255
226   - private boolean eval(EntityKeyValue value, KeyFilterPredicate predicate) {
  256 + private boolean eval(DataSnapshot data, EntityKeyValue value, KeyFilterPredicate predicate) {
227 257 switch (predicate.getType()) {
228 258 case STRING:
229   - return evalStrPredicate(value, (StringFilterPredicate) predicate);
  259 + return evalStrPredicate(data, value, (StringFilterPredicate) predicate);
230 260 case NUMERIC:
231   - return evalNumPredicate(value, (NumericFilterPredicate) predicate);
232   - case COMPLEX:
233   - return evalComplexPredicate(value, (ComplexFilterPredicate) predicate);
  261 + return evalNumPredicate(data, value, (NumericFilterPredicate) predicate);
234 262 case BOOLEAN:
235   - return evalBoolPredicate(value, (BooleanFilterPredicate) predicate);
  263 + return evalBoolPredicate(data, value, (BooleanFilterPredicate) predicate);
  264 + case COMPLEX:
  265 + return evalComplexPredicate(data, value, (ComplexFilterPredicate) predicate);
236 266 default:
237 267 return false;
238 268 }
239 269 }
240 270
241   - private boolean evalComplexPredicate(EntityKeyValue ekv, ComplexFilterPredicate predicate) {
  271 + private boolean evalComplexPredicate(DataSnapshot data, EntityKeyValue ekv, ComplexFilterPredicate predicate) {
242 272 switch (predicate.getOperation()) {
243 273 case OR:
244 274 for (KeyFilterPredicate kfp : predicate.getPredicates()) {
245   - if (eval(ekv, kfp)) {
  275 + if (eval(data, ekv, kfp)) {
246 276 return true;
247 277 }
248 278 }
249 279 return false;
250 280 case AND:
251 281 for (KeyFilterPredicate kfp : predicate.getPredicates()) {
252   - if (!eval(ekv, kfp)) {
  282 + if (!eval(data, ekv, kfp)) {
253 283 return false;
254 284 }
255 285 }
... ... @@ -259,109 +289,55 @@ public class AlarmRuleState {
259 289 }
260 290 }
261 291
262   - private boolean evalBoolPredicate(EntityKeyValue ekv, BooleanFilterPredicate predicate) {
263   - Boolean value;
264   - switch (ekv.getDataType()) {
265   - case LONG:
266   - value = ekv.getLngValue() > 0;
267   - break;
268   - case DOUBLE:
269   - value = ekv.getDblValue() > 0;
270   - break;
271   - case BOOLEAN:
272   - value = ekv.getBoolValue();
273   - break;
274   - case STRING:
275   - try {
276   - value = Boolean.parseBoolean(ekv.getStrValue());
277   - break;
278   - } catch (RuntimeException e) {
279   - return false;
280   - }
281   - case JSON:
282   - try {
283   - value = Boolean.parseBoolean(ekv.getJsonValue());
284   - break;
285   - } catch (RuntimeException e) {
286   - return false;
287   - }
288   - default:
289   - return false;
290   - }
291   - if (value == null) {
  292 + private boolean evalBoolPredicate(DataSnapshot data, EntityKeyValue ekv, BooleanFilterPredicate predicate) {
  293 + Boolean val = getBoolValue(ekv);
  294 + if (val == null) {
292 295 return false;
293 296 }
  297 + Boolean predicateValue = getPredicateValue(data, predicate.getValue(), AlarmRuleState::getBoolValue);
294 298 switch (predicate.getOperation()) {
295 299 case EQUAL:
296   - return value.equals(predicate.getValue().getDefaultValue());
  300 + return val.equals(predicateValue);
297 301 case NOT_EQUAL:
298   - return !value.equals(predicate.getValue().getDefaultValue());
  302 + return !val.equals(predicateValue);
299 303 default:
300 304 throw new RuntimeException("Operation not supported: " + predicate.getOperation());
301 305 }
302 306 }
303 307
304   - private boolean evalNumPredicate(EntityKeyValue ekv, NumericFilterPredicate predicate) {
305   - Double value;
306   - switch (ekv.getDataType()) {
307   - case LONG:
308   - value = ekv.getLngValue().doubleValue();
309   - break;
310   - case DOUBLE:
311   - value = ekv.getDblValue();
312   - break;
313   - case BOOLEAN:
314   - value = ekv.getBoolValue() ? 1.0 : 0.0;
315   - break;
316   - case STRING:
317   - try {
318   - value = Double.parseDouble(ekv.getStrValue());
319   - break;
320   - } catch (RuntimeException e) {
321   - return false;
322   - }
323   - case JSON:
324   - try {
325   - value = Double.parseDouble(ekv.getJsonValue());
326   - break;
327   - } catch (RuntimeException e) {
328   - return false;
329   - }
330   - default:
331   - return false;
332   - }
333   - if (value == null) {
  308 + private boolean evalNumPredicate(DataSnapshot data, EntityKeyValue ekv, NumericFilterPredicate predicate) {
  309 + Double val = getDblValue(ekv);
  310 + if (val == null) {
334 311 return false;
335 312 }
336   -
337   - Double predicateValue = predicate.getValue().getDefaultValue();
  313 + Double predicateValue = getPredicateValue(data, predicate.getValue(), AlarmRuleState::getDblValue);
338 314 switch (predicate.getOperation()) {
339 315 case NOT_EQUAL:
340   - return !value.equals(predicateValue);
  316 + return !val.equals(predicateValue);
341 317 case EQUAL:
342   - return value.equals(predicateValue);
  318 + return val.equals(predicateValue);
343 319 case GREATER:
344   - return value > predicateValue;
  320 + return val > predicateValue;
345 321 case GREATER_OR_EQUAL:
346   - return value >= predicateValue;
  322 + return val >= predicateValue;
347 323 case LESS:
348   - return value < predicateValue;
  324 + return val < predicateValue;
349 325 case LESS_OR_EQUAL:
350   - return value <= predicateValue;
  326 + return val <= predicateValue;
351 327 default:
352 328 throw new RuntimeException("Operation not supported: " + predicate.getOperation());
353 329 }
354 330 }
355 331
356   - private boolean evalStrPredicate(EntityKeyValue ekv, StringFilterPredicate predicate) {
357   - String val;
358   - String predicateValue;
  332 + private boolean evalStrPredicate(DataSnapshot data, EntityKeyValue ekv, StringFilterPredicate predicate) {
  333 + String val = getStrValue(ekv);
  334 + if (val == null) {
  335 + return false;
  336 + }
  337 + String predicateValue = getPredicateValue(data, predicate.getValue(), AlarmRuleState::getStrValue);
359 338 if (predicate.isIgnoreCase()) {
360   - val = ekv.getStrValue().toLowerCase();
361   - predicateValue = predicate.getValue().getDefaultValue().toLowerCase();
362   - } else {
363   - val = ekv.getStrValue();
364   - predicateValue = predicate.getValue().getDefaultValue();
  339 + val = val.toLowerCase();
  340 + predicateValue = predicateValue.toLowerCase();
365 341 }
366 342 switch (predicate.getOperation()) {
367 343 case CONTAINS:
... ... @@ -380,4 +356,100 @@ public class AlarmRuleState {
380 356 throw new RuntimeException("Operation not supported: " + predicate.getOperation());
381 357 }
382 358 }
  359 +
  360 + private <T> T getPredicateValue(DataSnapshot data, FilterPredicateValue<T> value, Function<EntityKeyValue, T> transformFunction) {
  361 + EntityKeyValue ekv = getDynamicPredicateValue(data, value);
  362 + if (ekv != null) {
  363 + T result = transformFunction.apply(ekv);
  364 + if (result != null) {
  365 + return result;
  366 + }
  367 + }
  368 + return value.getDefaultValue();
  369 + }
  370 +
  371 + private <T> EntityKeyValue getDynamicPredicateValue(DataSnapshot data, FilterPredicateValue<T> value) {
  372 + EntityKeyValue ekv = null;
  373 + if (value.getDynamicValue() != null) {
  374 + ekv = data.getValue(new EntityKey(EntityKeyType.ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
  375 + if (ekv == null) {
  376 + ekv = data.getValue(new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
  377 + if (ekv == null) {
  378 + ekv = data.getValue(new EntityKey(EntityKeyType.SHARED_ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
  379 + if (ekv == null) {
  380 + ekv = data.getValue(new EntityKey(EntityKeyType.CLIENT_ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
  381 + }
  382 + }
  383 + }
  384 + }
  385 + return ekv;
  386 + }
  387 +
  388 + private static String getStrValue(EntityKeyValue ekv) {
  389 + switch (ekv.getDataType()) {
  390 + case LONG:
  391 + return ekv.getLngValue() != null ? ekv.getLngValue().toString() : null;
  392 + case DOUBLE:
  393 + return ekv.getDblValue() != null ? ekv.getDblValue().toString() : null;
  394 + case BOOLEAN:
  395 + return ekv.getBoolValue() != null ? ekv.getBoolValue().toString() : null;
  396 + case STRING:
  397 + return ekv.getStrValue();
  398 + case JSON:
  399 + return ekv.getJsonValue();
  400 + default:
  401 + return null;
  402 + }
  403 + }
  404 +
  405 + private static Double getDblValue(EntityKeyValue ekv) {
  406 + switch (ekv.getDataType()) {
  407 + case LONG:
  408 + return ekv.getLngValue() != null ? ekv.getLngValue().doubleValue() : null;
  409 + case DOUBLE:
  410 + return ekv.getDblValue() != null ? ekv.getDblValue() : null;
  411 + case BOOLEAN:
  412 + return ekv.getBoolValue() != null ? (ekv.getBoolValue() ? 1.0 : 0.0) : null;
  413 + case STRING:
  414 + try {
  415 + return Double.parseDouble(ekv.getStrValue());
  416 + } catch (RuntimeException e) {
  417 + return null;
  418 + }
  419 + case JSON:
  420 + try {
  421 + return Double.parseDouble(ekv.getJsonValue());
  422 + } catch (RuntimeException e) {
  423 + return null;
  424 + }
  425 + default:
  426 + return null;
  427 + }
  428 + }
  429 +
  430 + private static Boolean getBoolValue(EntityKeyValue ekv) {
  431 + switch (ekv.getDataType()) {
  432 + case LONG:
  433 + return ekv.getLngValue() != null ? ekv.getLngValue() > 0 : null;
  434 + case DOUBLE:
  435 + return ekv.getDblValue() != null ? ekv.getDblValue() > 0 : null;
  436 + case BOOLEAN:
  437 + return ekv.getBoolValue();
  438 + case STRING:
  439 + try {
  440 + return Boolean.parseBoolean(ekv.getStrValue());
  441 + } catch (RuntimeException e) {
  442 + return null;
  443 + }
  444 + case JSON:
  445 + try {
  446 + return Boolean.parseBoolean(ekv.getJsonValue());
  447 + } catch (RuntimeException e) {
  448 + return null;
  449 + }
  450 + default:
  451 + return null;
  452 + }
  453 + }
  454 +
383 455 }
... ...
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmState.java renamed from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java
... ... @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.profile;
17 17
18 18 import com.fasterxml.jackson.databind.JsonNode;
19 19 import lombok.Data;
  20 +import lombok.extern.slf4j.Slf4j;
20 21 import org.thingsboard.rule.engine.action.TbAlarmResult;
21 22 import org.thingsboard.rule.engine.api.TbContext;
22 23 import org.thingsboard.rule.engine.profile.state.PersistedAlarmRuleState;
... ... @@ -27,6 +28,7 @@ import org.thingsboard.server.common.data.alarm.AlarmSeverity;
27 28 import org.thingsboard.server.common.data.alarm.AlarmStatus;
28 29 import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
29 30 import org.thingsboard.server.common.data.id.EntityId;
  31 +import org.thingsboard.server.common.data.query.EntityKeyType;
30 32 import org.thingsboard.server.common.msg.TbMsg;
31 33 import org.thingsboard.server.common.msg.TbMsgMetaData;
32 34 import org.thingsboard.server.common.msg.queue.ServiceQueue;
... ... @@ -39,8 +41,10 @@ import java.util.concurrent.ExecutionException;
39 41 import java.util.function.BiFunction;
40 42
41 43 @Data
42   -class DeviceProfileAlarmState {
  44 +@Slf4j
  45 +class AlarmState {
43 46
  47 + private final ProfileState deviceProfile;
44 48 private final EntityId originator;
45 49 private DeviceProfileAlarm alarmDefinition;
46 50 private volatile List<AlarmRuleState> createRulesSortedBySeverityDesc;
... ... @@ -50,27 +54,33 @@ class DeviceProfileAlarmState {
50 54 private volatile TbMsgMetaData lastMsgMetaData;
51 55 private volatile String lastMsgQueueName;
52 56
53   - public DeviceProfileAlarmState(EntityId originator, DeviceProfileAlarm alarmDefinition, PersistedAlarmState alarmState) {
  57 + AlarmState(ProfileState deviceProfile, EntityId originator, DeviceProfileAlarm alarmDefinition, PersistedAlarmState alarmState) {
  58 + this.deviceProfile = deviceProfile;
54 59 this.originator = originator;
55 60 this.updateState(alarmDefinition, alarmState);
56 61 }
57 62
58   - public boolean process(TbContext ctx, TbMsg msg, DeviceDataSnapshot data) throws ExecutionException, InterruptedException {
  63 + public boolean process(TbContext ctx, TbMsg msg, DataSnapshot data, SnapshotUpdate update) throws ExecutionException, InterruptedException {
59 64 initCurrentAlarm(ctx);
60 65 lastMsgMetaData = msg.getMetaData();
61 66 lastMsgQueueName = msg.getQueueName();
62   - return createOrClearAlarms(ctx, data, AlarmRuleState::eval);
  67 + return createOrClearAlarms(ctx, data, update, AlarmRuleState::eval);
63 68 }
64 69
65 70 public boolean process(TbContext ctx, long ts) throws ExecutionException, InterruptedException {
66 71 initCurrentAlarm(ctx);
67   - return createOrClearAlarms(ctx, ts, AlarmRuleState::eval);
  72 + return createOrClearAlarms(ctx, ts, null, AlarmRuleState::eval);
68 73 }
69 74
70   - public <T> boolean createOrClearAlarms(TbContext ctx, T data, BiFunction<AlarmRuleState, T, Boolean> evalFunction) {
  75 + public <T> boolean createOrClearAlarms(TbContext ctx, T data, SnapshotUpdate update, BiFunction<AlarmRuleState, T, Boolean> evalFunction) {
71 76 boolean stateUpdate = false;
72 77 AlarmSeverity resultSeverity = null;
  78 + log.debug("[{}] processing update: {}", alarmDefinition.getId(), data);
73 79 for (AlarmRuleState state : createRulesSortedBySeverityDesc) {
  80 + if (!validateUpdate(update, state)) {
  81 + log.debug("[{}][{}] Update is not valid for current rule state", alarmDefinition.getId(), state.getSeverity());
  82 + continue;
  83 + }
74 84 boolean evalResult = evalFunction.apply(state, data);
75 85 stateUpdate |= state.checkUpdate();
76 86 if (evalResult) {
... ... @@ -81,9 +91,17 @@ class DeviceProfileAlarmState {
81 91 if (resultSeverity != null) {
82 92 pushMsg(ctx, calculateAlarmResult(ctx, resultSeverity));
83 93 } else if (currentAlarm != null && clearState != null) {
  94 + if (!validateUpdate(update, clearState)) {
  95 + log.debug("[{}] Update is not valid for current clear state", alarmDefinition.getId());
  96 + return stateUpdate;
  97 + }
84 98 Boolean evalResult = evalFunction.apply(clearState, data);
85 99 if (evalResult) {
86 100 stateUpdate |= clearState.checkUpdate();
  101 + for (AlarmRuleState state : createRulesSortedBySeverityDesc) {
  102 + state.clear();
  103 + stateUpdate |= state.checkUpdate();
  104 + }
87 105 ctx.getAlarmService().clearAlarm(ctx.getTenantId(), currentAlarm.getId(), JacksonUtil.OBJECT_MAPPER.createObjectNode(), System.currentTimeMillis());
88 106 pushMsg(ctx, new TbAlarmResult(false, false, true, currentAlarm));
89 107 currentAlarm = null;
... ... @@ -92,6 +110,18 @@ class DeviceProfileAlarmState {
92 110 return stateUpdate;
93 111 }
94 112
  113 + public boolean validateUpdate(SnapshotUpdate update, AlarmRuleState state) {
  114 + if (update != null) {
  115 + //Check that the update type and that keys match.
  116 + if (update.getType().equals(EntityKeyType.TIME_SERIES)) {
  117 + return state.validateTsUpdate(update.getKeys());
  118 + } else if (update.getType().equals(EntityKeyType.ATTRIBUTE)) {
  119 + return state.validateAttrUpdate(update.getKeys());
  120 + }
  121 + }
  122 + return true;
  123 + }
  124 +
95 125 public void initCurrentAlarm(TbContext ctx) throws InterruptedException, ExecutionException {
96 126 if (!initialFetchDone) {
97 127 Alarm alarm = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), originator, alarmDefinition.getAlarmType()).get();
... ... @@ -137,17 +167,20 @@ class DeviceProfileAlarmState {
137 167 alarmState.getCreateRuleStates().put(severity, ruleState);
138 168 }
139 169 }
140   - createRulesSortedBySeverityDesc.add(new AlarmRuleState(severity, rule, ruleState));
  170 + createRulesSortedBySeverityDesc.add(new AlarmRuleState(severity, rule,
  171 + deviceProfile.getCreateAlarmKeys(alarm.getId(), severity), ruleState));
141 172 });
142 173 createRulesSortedBySeverityDesc.sort(Comparator.comparingInt(state -> state.getSeverity().ordinal()));
143 174 PersistedAlarmRuleState ruleState = alarmState == null ? null : alarmState.getClearRuleState();
144 175 if (alarmDefinition.getClearRule() != null) {
145   - clearState = new AlarmRuleState(null, alarmDefinition.getClearRule(), ruleState);
  176 + clearState = new AlarmRuleState(null, alarmDefinition.getClearRule(), deviceProfile.getClearAlarmKeys(alarm.getId()), ruleState);
146 177 }
147 178 }
148 179
149 180 private TbAlarmResult calculateAlarmResult(TbContext ctx, AlarmSeverity severity) {
150 181 if (currentAlarm != null) {
  182 + // TODO: In some extremely rare cases, we might miss the event of alarm clear (If one use in-mem queue and restarted the server) or (if one manipulated the rule chain).
  183 + // Maybe we should fetch alarm every time?
151 184 currentAlarm.setEndTs(System.currentTimeMillis());
152 185 AlarmSeverity oldSeverity = currentAlarm.getSeverity();
153 186 if (!oldSeverity.equals(severity)) {
... ...
... ... @@ -15,7 +15,7 @@
15 15 */
16 16 package org.thingsboard.rule.engine.profile;
17 17
18   -public enum AlarmStateUpdateResult {
  18 +enum AlarmStateUpdateResult {
19 19
20 20 NONE, CREATED, UPDATED, SEVERITY_UPDATED, CLEARED;
21 21
... ...
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DataSnapshot.java renamed from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceDataSnapshot.java
... ... @@ -24,7 +24,7 @@ import java.util.Map;
24 24 import java.util.Set;
25 25 import java.util.concurrent.ConcurrentHashMap;
26 26
27   -public class DeviceDataSnapshot {
  27 +class DataSnapshot {
28 28
29 29 private volatile boolean ready;
30 30 @Getter
... ... @@ -33,7 +33,7 @@ public class DeviceDataSnapshot {
33 33 private final Set<EntityKey> keys;
34 34 private final Map<EntityKey, EntityKeyValue> values = new ConcurrentHashMap<>();
35 35
36   - public DeviceDataSnapshot(Set<EntityKey> entityKeysToFetch) {
  36 + DataSnapshot(Set<EntityKey> entityKeysToFetch) {
37 37 this.keys = entityKeysToFetch;
38 38 }
39 39
... ... @@ -56,28 +56,38 @@ public class DeviceDataSnapshot {
56 56 }
57 57 }
58 58
59   - void putValue(EntityKey key, EntityKeyValue value) {
  59 + boolean putValue(EntityKey key, long newTs, EntityKeyValue value) {
  60 + boolean updateOfTs = ts != newTs;
  61 + boolean result = false;
60 62 switch (key.getType()) {
61 63 case ATTRIBUTE:
62   - putIfKeyExists(key, value);
63   - putIfKeyExists(getAttrKey(key, EntityKeyType.CLIENT_ATTRIBUTE), value);
64   - putIfKeyExists(getAttrKey(key, EntityKeyType.SHARED_ATTRIBUTE), value);
65   - putIfKeyExists(getAttrKey(key, EntityKeyType.SERVER_ATTRIBUTE), value);
  64 + result |= putIfKeyExists(key, value, updateOfTs);
  65 + result |= putIfKeyExists(getAttrKey(key, EntityKeyType.CLIENT_ATTRIBUTE), value, updateOfTs);
  66 + result |= putIfKeyExists(getAttrKey(key, EntityKeyType.SHARED_ATTRIBUTE), value, updateOfTs);
  67 + result |= putIfKeyExists(getAttrKey(key, EntityKeyType.SERVER_ATTRIBUTE), value, updateOfTs);
66 68 break;
67 69 case CLIENT_ATTRIBUTE:
68 70 case SHARED_ATTRIBUTE:
69 71 case SERVER_ATTRIBUTE:
70   - putIfKeyExists(key, value);
71   - putIfKeyExists(getAttrKey(key, EntityKeyType.ATTRIBUTE), value);
  72 + result |= putIfKeyExists(key, value, updateOfTs);
  73 + result |= putIfKeyExists(getAttrKey(key, EntityKeyType.ATTRIBUTE), value, updateOfTs);
72 74 break;
73 75 default:
74   - putIfKeyExists(key, value);
  76 + result |= putIfKeyExists(key, value, updateOfTs);
75 77 }
  78 + return result;
76 79 }
77 80
78   - private void putIfKeyExists(EntityKey key, EntityKeyValue value) {
  81 + private boolean putIfKeyExists(EntityKey key, EntityKeyValue value, boolean updateOfTs) {
79 82 if (keys.contains(key)) {
80   - values.put(key, value);
  83 + EntityKeyValue oldValue = values.put(key, value);
  84 + if (updateOfTs) {
  85 + return true;
  86 + } else {
  87 + return oldValue == null || !oldValue.equals(value);
  88 + }
  89 + } else {
  90 + return false;
81 91 }
82 92 }
83 93
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.rule.engine.profile;
17 17
18 18 import com.google.gson.JsonParser;
  19 +import lombok.extern.slf4j.Slf4j;
19 20 import org.springframework.util.StringUtils;
20 21 import org.thingsboard.rule.engine.api.TbContext;
21 22 import org.thingsboard.rule.engine.profile.state.PersistedAlarmState;
... ... @@ -29,7 +30,6 @@ import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
29 30 import org.thingsboard.server.common.data.id.DeviceId;
30 31 import org.thingsboard.server.common.data.id.DeviceProfileId;
31 32 import org.thingsboard.server.common.data.id.EntityId;
32   -import org.thingsboard.server.common.data.id.RuleNodeStateId;
33 33 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
34 34 import org.thingsboard.server.common.data.kv.KvEntry;
35 35 import org.thingsboard.server.common.data.kv.TsKvEntry;
... ... @@ -53,17 +53,18 @@ import java.util.concurrent.ConcurrentMap;
53 53 import java.util.concurrent.ExecutionException;
54 54 import java.util.stream.Collectors;
55 55
  56 +@Slf4j
56 57 class DeviceState {
57 58
58 59 private final boolean persistState;
59 60 private final DeviceId deviceId;
  61 + private final ProfileState deviceProfile;
60 62 private RuleNodeState state;
61   - private DeviceProfileState deviceProfile;
62 63 private PersistedDeviceState pds;
63   - private DeviceDataSnapshot latestValues;
64   - private final ConcurrentMap<String, DeviceProfileAlarmState> alarmStates = new ConcurrentHashMap<>();
  64 + private DataSnapshot latestValues;
  65 + private final ConcurrentMap<String, AlarmState> alarmStates = new ConcurrentHashMap<>();
65 66
66   - public DeviceState(TbContext ctx, TbDeviceProfileNodeConfiguration config, DeviceId deviceId, DeviceProfileState deviceProfile, RuleNodeState state) {
  67 + DeviceState(TbContext ctx, TbDeviceProfileNodeConfiguration config, DeviceId deviceId, ProfileState deviceProfile, RuleNodeState state) {
67 68 this.persistState = config.isPersistAlarmRulesState();
68 69 this.deviceId = deviceId;
69 70 this.deviceProfile = deviceProfile;
... ... @@ -86,7 +87,7 @@ class DeviceState {
86 87 if (pds != null) {
87 88 for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
88 89 alarmStates.computeIfAbsent(alarm.getId(),
89   - a -> new DeviceProfileAlarmState(deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
  90 + a -> new AlarmState(deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
90 91 }
91 92 }
92 93 }
... ... @@ -107,14 +108,20 @@ class DeviceState {
107 108 if (alarmStates.containsKey(alarm.getId())) {
108 109 alarmStates.get(alarm.getId()).updateState(alarm, getOrInitPersistedAlarmState(alarm));
109 110 } else {
110   - alarmStates.putIfAbsent(alarm.getId(), new DeviceProfileAlarmState(deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
  111 + alarmStates.putIfAbsent(alarm.getId(), new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
111 112 }
112 113 }
113 114 }
114 115
115 116 public void harvestAlarms(TbContext ctx, long ts) throws ExecutionException, InterruptedException {
116   - for (DeviceProfileAlarmState state : alarmStates.values()) {
117   - state.process(ctx, ts);
  117 + log.debug("[{}] Going to harvest alarms: {}", ctx.getSelfId(), ts);
  118 + boolean stateChanged = false;
  119 + for (AlarmState state : alarmStates.values()) {
  120 + stateChanged |= state.process(ctx, ts);
  121 + }
  122 + if (persistState && stateChanged) {
  123 + state.setStateData(JacksonUtil.toString(pds));
  124 + state = ctx.saveRuleNodeState(state);
118 125 }
119 126 }
120 127
... ... @@ -146,8 +153,8 @@ class DeviceState {
146 153 boolean stateChanged = false;
147 154 Alarm alarmNf = JacksonUtil.fromString(msg.getData(), Alarm.class);
148 155 for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
149   - DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
150   - a -> new DeviceProfileAlarmState(deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
  156 + AlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
  157 + a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
151 158 stateChanged |= alarmState.processAlarmClear(ctx, alarmNf);
152 159 }
153 160 ctx.tellSuccess(msg);
... ... @@ -175,9 +182,9 @@ class DeviceState {
175 182 EntityKeyType keyType = getKeyTypeFromScope(scope);
176 183 keys.forEach(key -> latestValues.removeValue(new EntityKey(keyType, key)));
177 184 for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
178   - DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
179   - a -> new DeviceProfileAlarmState(deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
180   - stateChanged |= alarmState.process(ctx, msg, latestValues);
  185 + AlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
  186 + a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
  187 + stateChanged |= alarmState.process(ctx, msg, latestValues, null);
181 188 }
182 189 }
183 190 ctx.tellSuccess(msg);
... ... @@ -192,11 +199,11 @@ class DeviceState {
192 199 private boolean processAttributesUpdate(TbContext ctx, TbMsg msg, Set<AttributeKvEntry> attributes, String scope) throws ExecutionException, InterruptedException {
193 200 boolean stateChanged = false;
194 201 if (!attributes.isEmpty()) {
195   - merge(latestValues, attributes, scope);
  202 + SnapshotUpdate update = merge(latestValues, attributes, scope);
196 203 for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
197   - DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
198   - a -> new DeviceProfileAlarmState(deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
199   - stateChanged |= alarmState.process(ctx, msg, latestValues);
  204 + AlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
  205 + a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
  206 + stateChanged |= alarmState.process(ctx, msg, latestValues, update);
200 207 }
201 208 }
202 209 ctx.tellSuccess(msg);
... ... @@ -206,34 +213,47 @@ class DeviceState {
206 213 protected boolean processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
207 214 boolean stateChanged = false;
208 215 Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToSortedTelemetry(new JsonParser().parse(msg.getData()), TbMsgTimeseriesNode.getTs(msg));
  216 + // iterate over data by ts (ASC order).
209 217 for (Map.Entry<Long, List<KvEntry>> entry : tsKvMap.entrySet()) {
210 218 Long ts = entry.getKey();
211 219 List<KvEntry> data = entry.getValue();
212   - merge(latestValues, ts, data);
213   - for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
214   - DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
215   - a -> new DeviceProfileAlarmState(deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
216   - stateChanged |= alarmState.process(ctx, msg, latestValues);
  220 + SnapshotUpdate update = merge(latestValues, ts, data);
  221 + if (update.hasUpdate()) {
  222 + for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
  223 + AlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
  224 + a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
  225 + stateChanged |= alarmState.process(ctx, msg, latestValues, update);
  226 + }
217 227 }
218 228 }
219 229 ctx.tellSuccess(msg);
220 230 return stateChanged;
221 231 }
222 232
223   - private void merge(DeviceDataSnapshot latestValues, Long ts, List<KvEntry> data) {
224   - latestValues.setTs(ts);
  233 + private SnapshotUpdate merge(DataSnapshot latestValues, Long newTs, List<KvEntry> data) {
  234 + Set<EntityKey> keys = new HashSet<>();
225 235 for (KvEntry entry : data) {
226   - latestValues.putValue(new EntityKey(EntityKeyType.TIME_SERIES, entry.getKey()), toEntityValue(entry));
  236 + EntityKey entityKey = new EntityKey(EntityKeyType.TIME_SERIES, entry.getKey());
  237 + if (latestValues.putValue(entityKey, newTs, toEntityValue(entry))) {
  238 + keys.add(entityKey);
  239 + }
227 240 }
  241 + latestValues.setTs(newTs);
  242 + return new SnapshotUpdate(EntityKeyType.TIME_SERIES, keys);
228 243 }
229 244
230   - private void merge(DeviceDataSnapshot latestValues, Set<AttributeKvEntry> attributes, String scope) {
231   - long ts = latestValues.getTs();
  245 + private SnapshotUpdate merge(DataSnapshot latestValues, Set<AttributeKvEntry> attributes, String scope) {
  246 + long newTs = 0;
  247 + Set<EntityKey> keys = new HashSet<>();
232 248 for (AttributeKvEntry entry : attributes) {
233   - ts = Math.max(ts, entry.getLastUpdateTs());
234   - latestValues.putValue(new EntityKey(getKeyTypeFromScope(scope), entry.getKey()), toEntityValue(entry));
  249 + newTs = Math.max(newTs, entry.getLastUpdateTs());
  250 + EntityKey entityKey = new EntityKey(getKeyTypeFromScope(scope), entry.getKey());
  251 + if (latestValues.putValue(entityKey, newTs, toEntityValue(entry))) {
  252 + keys.add(entityKey);
  253 + }
235 254 }
236   - latestValues.setTs(ts);
  255 + latestValues.setTs(newTs);
  256 + return new SnapshotUpdate(EntityKeyType.ATTRIBUTE, keys);
237 257 }
238 258
239 259 private static EntityKeyType getKeyTypeFromScope(String scope) {
... ... @@ -248,14 +268,14 @@ class DeviceState {
248 268 return EntityKeyType.ATTRIBUTE;
249 269 }
250 270
251   - private DeviceDataSnapshot fetchLatestValues(TbContext ctx, EntityId originator) throws ExecutionException, InterruptedException {
  271 + private DataSnapshot fetchLatestValues(TbContext ctx, EntityId originator) throws ExecutionException, InterruptedException {
252 272 Set<EntityKey> entityKeysToFetch = deviceProfile.getEntityKeys();
253   - DeviceDataSnapshot result = new DeviceDataSnapshot(entityKeysToFetch);
  273 + DataSnapshot result = new DataSnapshot(entityKeysToFetch);
254 274 addEntityKeysToSnapshot(ctx, originator, entityKeysToFetch, result);
255 275 return result;
256 276 }
257 277
258   - private void addEntityKeysToSnapshot(TbContext ctx, EntityId originator, Set<EntityKey> entityKeysToFetch, DeviceDataSnapshot result) throws InterruptedException, ExecutionException {
  278 + private void addEntityKeysToSnapshot(TbContext ctx, EntityId originator, Set<EntityKey> entityKeysToFetch, DataSnapshot result) throws InterruptedException, ExecutionException {
259 279 Set<String> serverAttributeKeys = new HashSet<>();
260 280 Set<String> clientAttributeKeys = new HashSet<>();
261 281 Set<String> sharedAttributeKeys = new HashSet<>();
... ... @@ -291,16 +311,16 @@ class DeviceState {
291 311 if (device != null) {
292 312 switch (key) {
293 313 case EntityKeyMapping.NAME:
294   - result.putValue(entityKey, EntityKeyValue.fromString(device.getName()));
  314 + result.putValue(entityKey, device.getCreatedTime(), EntityKeyValue.fromString(device.getName()));
295 315 break;
296 316 case EntityKeyMapping.TYPE:
297   - result.putValue(entityKey, EntityKeyValue.fromString(device.getType()));
  317 + result.putValue(entityKey, device.getCreatedTime(), EntityKeyValue.fromString(device.getType()));
298 318 break;
299 319 case EntityKeyMapping.CREATED_TIME:
300   - result.putValue(entityKey, EntityKeyValue.fromLong(device.getCreatedTime()));
  320 + result.putValue(entityKey, device.getCreatedTime(), EntityKeyValue.fromLong(device.getCreatedTime()));
301 321 break;
302 322 case EntityKeyMapping.LABEL:
303   - result.putValue(entityKey, EntityKeyValue.fromString(device.getLabel()));
  323 + result.putValue(entityKey, device.getCreatedTime(), EntityKeyValue.fromString(device.getLabel()));
304 324 break;
305 325 }
306 326 }
... ... @@ -312,7 +332,7 @@ class DeviceState {
312 332 List<TsKvEntry> data = ctx.getTimeseriesService().findLatest(ctx.getTenantId(), originator, latestTsKeys).get();
313 333 for (TsKvEntry entry : data) {
314 334 if (entry.getValue() != null) {
315   - result.putValue(new EntityKey(EntityKeyType.TIME_SERIES, entry.getKey()), toEntityValue(entry));
  335 + result.putValue(new EntityKey(EntityKeyType.TIME_SERIES, entry.getKey()), entry.getTs(), toEntityValue(entry));
316 336 }
317 337 }
318 338 }
... ... @@ -330,13 +350,13 @@ class DeviceState {
330 350 }
331 351 }
332 352
333   - private void addToSnapshot(DeviceDataSnapshot snapshot, Set<String> commonAttributeKeys, List<AttributeKvEntry> data) {
  353 + private void addToSnapshot(DataSnapshot snapshot, Set<String> commonAttributeKeys, List<AttributeKvEntry> data) {
334 354 for (AttributeKvEntry entry : data) {
335 355 if (entry.getValue() != null) {
336 356 EntityKeyValue value = toEntityValue(entry);
337   - snapshot.putValue(new EntityKey(EntityKeyType.CLIENT_ATTRIBUTE, entry.getKey()), value);
  357 + snapshot.putValue(new EntityKey(EntityKeyType.CLIENT_ATTRIBUTE, entry.getKey()), entry.getLastUpdateTs(), value);
338 358 if (commonAttributeKeys.contains(entry.getKey())) {
339   - snapshot.putValue(new EntityKey(EntityKeyType.ATTRIBUTE, entry.getKey()), value);
  359 + snapshot.putValue(new EntityKey(EntityKeyType.ATTRIBUTE, entry.getKey()), entry.getLastUpdateTs(), value);
340 360 }
341 361 }
342 362 }
... ...
... ... @@ -15,9 +15,11 @@
15 15 */
16 16 package org.thingsboard.rule.engine.profile;
17 17
  18 +import lombok.EqualsAndHashCode;
18 19 import lombok.Getter;
19 20 import org.thingsboard.server.common.data.kv.DataType;
20 21
  22 +@EqualsAndHashCode
21 23 class EntityKeyValue {
22 24
23 25 @Getter
... ...
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/ProfileState.java renamed from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileState.java
... ... @@ -18,19 +18,33 @@ package org.thingsboard.rule.engine.profile;
18 18 import lombok.AccessLevel;
19 19 import lombok.Getter;
20 20 import org.thingsboard.server.common.data.DeviceProfile;
  21 +import org.thingsboard.server.common.data.alarm.AlarmSeverity;
21 22 import org.thingsboard.server.common.data.device.profile.AlarmRule;
22 23 import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
23 24 import org.thingsboard.server.common.data.id.DeviceProfileId;
  25 +import org.thingsboard.server.common.data.query.ComplexFilterPredicate;
  26 +import org.thingsboard.server.common.data.query.DynamicValue;
  27 +import org.thingsboard.server.common.data.query.DynamicValueSourceType;
24 28 import org.thingsboard.server.common.data.query.EntityKey;
  29 +import org.thingsboard.server.common.data.query.EntityKeyType;
  30 +import org.thingsboard.server.common.data.query.FilterPredicateValue;
25 31 import org.thingsboard.server.common.data.query.KeyFilter;
  32 +import org.thingsboard.server.common.data.query.KeyFilterPredicate;
  33 +import org.thingsboard.server.common.data.query.SimpleKeyFilterPredicate;
  34 +import org.thingsboard.server.common.data.query.StringFilterPredicate;
26 35
  36 +import javax.print.attribute.standard.Severity;
  37 +import java.util.Collections;
  38 +import java.util.HashMap;
  39 +import java.util.HashSet;
27 40 import java.util.List;
  41 +import java.util.Map;
28 42 import java.util.Set;
29 43 import java.util.concurrent.ConcurrentHashMap;
30 44 import java.util.concurrent.CopyOnWriteArrayList;
31 45
32 46
33   -class DeviceProfileState {
  47 +class ProfileState {
34 48
35 49 private DeviceProfile deviceProfile;
36 50 @Getter(AccessLevel.PACKAGE)
... ... @@ -38,26 +52,86 @@ class DeviceProfileState {
38 52 @Getter(AccessLevel.PACKAGE)
39 53 private final Set<EntityKey> entityKeys = ConcurrentHashMap.newKeySet();
40 54
41   - DeviceProfileState(DeviceProfile deviceProfile) {
  55 + private final Map<String, Map<AlarmSeverity, Set<EntityKey>>> alarmCreateKeys = new HashMap<>();
  56 + private final Map<String, Set<EntityKey>> alarmClearKeys = new HashMap<>();
  57 +
  58 + ProfileState(DeviceProfile deviceProfile) {
42 59 updateDeviceProfile(deviceProfile);
43 60 }
44 61
45 62 void updateDeviceProfile(DeviceProfile deviceProfile) {
46 63 this.deviceProfile = deviceProfile;
47 64 alarmSettings.clear();
  65 + alarmCreateKeys.clear();
  66 + alarmClearKeys.clear();
48 67 if (deviceProfile.getProfileData().getAlarms() != null) {
49 68 alarmSettings.addAll(deviceProfile.getProfileData().getAlarms());
50 69 for (DeviceProfileAlarm alarm : deviceProfile.getProfileData().getAlarms()) {
51   - for (AlarmRule alarmRule : alarm.getCreateRules().values()) {
  70 + Map<AlarmSeverity, Set<EntityKey>> createAlarmKeys = alarmCreateKeys.computeIfAbsent(alarm.getId(), id -> new HashMap<>());
  71 + alarm.getCreateRules().forEach(((severity, alarmRule) -> {
  72 + Set<EntityKey> ruleKeys = createAlarmKeys.computeIfAbsent(severity, id -> new HashSet<>());
52 73 for (KeyFilter keyFilter : alarmRule.getCondition().getCondition()) {
53 74 entityKeys.add(keyFilter.getKey());
  75 + ruleKeys.add(keyFilter.getKey());
  76 + addDynamicValuesRecursively(keyFilter.getPredicate(), entityKeys, ruleKeys);
  77 + }
  78 + }));
  79 + if (alarm.getClearRule() != null) {
  80 + Set<EntityKey> clearAlarmKeys = alarmClearKeys.computeIfAbsent(alarm.getId(), id -> new HashSet<>());
  81 + for (KeyFilter keyFilter : alarm.getClearRule().getCondition().getCondition()) {
  82 + entityKeys.add(keyFilter.getKey());
  83 + clearAlarmKeys.add(keyFilter.getKey());
  84 + addDynamicValuesRecursively(keyFilter.getPredicate(), entityKeys, clearAlarmKeys);
54 85 }
55 86 }
56 87 }
57 88 }
58 89 }
59 90
60   - public DeviceProfileId getProfileId() {
  91 + private void addDynamicValuesRecursively(KeyFilterPredicate predicate, Set<EntityKey> entityKeys, Set<EntityKey> ruleKeys) {
  92 + switch (predicate.getType()) {
  93 + case STRING:
  94 + case NUMERIC:
  95 + case BOOLEAN:
  96 + DynamicValue value = ((SimpleKeyFilterPredicate) predicate).getValue().getDynamicValue();
  97 + if (value != null && value.getSourceType() == DynamicValueSourceType.CURRENT_DEVICE) {
  98 + EntityKey entityKey = new EntityKey(EntityKeyType.ATTRIBUTE, value.getSourceAttribute());
  99 + entityKeys.add(entityKey);
  100 + ruleKeys.add(entityKey);
  101 + }
  102 + break;
  103 + case COMPLEX:
  104 + for (KeyFilterPredicate child : ((ComplexFilterPredicate) predicate).getPredicates()) {
  105 + addDynamicValuesRecursively(child, entityKeys, ruleKeys);
  106 + }
  107 + break;
  108 + }
  109 + }
  110 +
  111 + DeviceProfileId getProfileId() {
61 112 return deviceProfile.getId();
62 113 }
  114 +
  115 + Set<EntityKey> getCreateAlarmKeys(String id, AlarmSeverity severity) {
  116 + Map<AlarmSeverity, Set<EntityKey>> sKeys = alarmCreateKeys.get(id);
  117 + if (sKeys == null) {
  118 + return Collections.emptySet();
  119 + } else {
  120 + Set<EntityKey> keys = sKeys.get(severity);
  121 + if (keys == null) {
  122 + return Collections.emptySet();
  123 + } else {
  124 + return keys;
  125 + }
  126 + }
  127 + }
  128 +
  129 + Set<EntityKey> getClearAlarmKeys(String id) {
  130 + Set<EntityKey> keys = alarmClearKeys.get(id);
  131 + if (keys == null) {
  132 + return Collections.emptySet();
  133 + } else {
  134 + return keys;
  135 + }
  136 + }
63 137 }
... ...
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/SnapshotUpdate.java renamed from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/EntityKeyState.java
... ... @@ -15,8 +15,25 @@
15 15 */
16 16 package org.thingsboard.rule.engine.profile;
17 17
18   -public class EntityKeyState {
  18 +import lombok.Getter;
  19 +import org.thingsboard.server.common.data.query.EntityKey;
  20 +import org.thingsboard.server.common.data.query.EntityKeyType;
19 21
  22 +import java.util.Set;
20 23
  24 +class SnapshotUpdate {
21 25
  26 + @Getter
  27 + private final EntityKeyType type;
  28 + @Getter
  29 + private final Set<EntityKey> keys;
  30 +
  31 + SnapshotUpdate(EntityKeyType type, Set<EntityKey> keys) {
  32 + this.type = type;
  33 + this.keys = keys;
  34 + }
  35 +
  36 + boolean hasUpdate(){
  37 + return !keys.isEmpty();
  38 + }
22 39 }
... ...
... ... @@ -23,7 +23,6 @@ import org.thingsboard.rule.engine.api.TbNode;
23 23 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
24 24 import org.thingsboard.rule.engine.api.TbNodeException;
25 25 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
26   -import org.thingsboard.rule.engine.profile.state.PersistedDeviceState;
27 26 import org.thingsboard.server.common.data.DataConstants;
28 27 import org.thingsboard.server.common.data.Device;
29 28 import org.thingsboard.server.common.data.DeviceProfile;
... ... @@ -36,11 +35,10 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
36 35 import org.thingsboard.server.common.data.rule.RuleNodeState;
37 36 import org.thingsboard.server.common.msg.TbMsg;
38 37 import org.thingsboard.server.common.msg.TbMsgMetaData;
  38 +import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
39 39 import org.thingsboard.server.dao.util.mapping.JacksonUtil;
40 40
41   -import java.util.HashMap;
42 41 import java.util.Map;
43   -import java.util.UUID;
44 42 import java.util.concurrent.ConcurrentHashMap;
45 43 import java.util.concurrent.ExecutionException;
46 44 import java.util.concurrent.TimeUnit;
... ... @@ -70,11 +68,14 @@ public class TbDeviceProfileNode implements TbNode {
70 68 this.cache = ctx.getDeviceProfileCache();
71 69 scheduleAlarmHarvesting(ctx);
72 70 if (config.isFetchAlarmRulesStateOnStart()) {
  71 + log.info("[{}] Fetching alarm rule state", ctx.getSelfId());
  72 + int fetchCount = 0;
73 73 PageLink pageLink = new PageLink(1024);
74 74 while (true) {
75 75 PageData<RuleNodeState> states = ctx.findRuleNodeStates(pageLink);
76 76 if (!states.getData().isEmpty()) {
77 77 for (RuleNodeState rns : states.getData()) {
  78 + fetchCount++;
78 79 if (rns.getEntityId().getEntityType().equals(EntityType.DEVICE) && ctx.isLocalEntity(rns.getEntityId())) {
79 80 getOrCreateDeviceState(ctx, new DeviceId(rns.getEntityId().getId()), rns);
80 81 }
... ... @@ -86,6 +87,11 @@ public class TbDeviceProfileNode implements TbNode {
86 87 pageLink = pageLink.nextPageLink();
87 88 }
88 89 }
  90 + log.info("[{}] Fetched alarm rule state for {} entities", ctx.getSelfId(), fetchCount);
  91 + }
  92 + if (!config.isPersistAlarmRulesState() && ctx.isLocalEntity(ctx.getSelfId())) {
  93 + log.info("[{}] Going to cleanup rule node states", ctx.getSelfId());
  94 + ctx.clearRuleNodeStates();
89 95 }
90 96 }
91 97
... ... @@ -114,11 +120,14 @@ public class TbDeviceProfileNode implements TbNode {
114 120 }
115 121 }
116 122 } else if (EntityType.DEVICE_PROFILE.equals(originatorType)) {
  123 + log.info("[{}] Received device profile update notification: {}", ctx.getSelfId(), msg.getData());
117 124 if (msg.getType().equals("ENTITY_UPDATED")) {
118 125 DeviceProfile deviceProfile = JacksonUtil.fromString(msg.getData(), DeviceProfile.class);
119   - for (DeviceState state : deviceStates.values()) {
120   - if (deviceProfile.getId().equals(state.getProfileId())) {
121   - state.updateProfile(ctx, deviceProfile);
  126 + if (deviceProfile != null) {
  127 + for (DeviceState state : deviceStates.values()) {
  128 + if (deviceProfile.getId().equals(state.getProfileId())) {
  129 + state.updateProfile(ctx, deviceProfile);
  130 + }
122 131 }
123 132 }
124 133 }
... ... @@ -141,6 +150,12 @@ public class TbDeviceProfileNode implements TbNode {
141 150 }
142 151
143 152 @Override
  153 + public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
  154 + // Cleanup the cache for all entities that are no longer assigned to current server partitions
  155 + deviceStates.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey()));
  156 + }
  157 +
  158 + @Override
144 159 public void destroy() {
145 160 deviceStates.clear();
146 161 }
... ... @@ -150,7 +165,7 @@ public class TbDeviceProfileNode implements TbNode {
150 165 if (deviceState == null) {
151 166 DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId);
152 167 if (deviceProfile != null) {
153   - deviceState = new DeviceState(ctx, config, deviceId, new DeviceProfileState(deviceProfile), rns);
  168 + deviceState = new DeviceState(ctx, config, deviceId, new ProfileState(deviceProfile), rns);
154 169 deviceStates.put(deviceId, deviceState);
155 170 }
156 171 }
... ...