Commit 0bf8b966f47dc3c83f9a77fe30323321db1d524f

Authored by 芯火源
1 parent 61f4b8fa

fix: 场景联动逻辑调整

@@ -5,6 +5,8 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; @@ -5,6 +5,8 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
5 import com.baomidou.mybatisplus.core.metadata.IPage; 5 import com.baomidou.mybatisplus.core.metadata.IPage;
6 import com.fasterxml.jackson.databind.JsonNode; 6 import com.fasterxml.jackson.databind.JsonNode;
7 import com.fasterxml.jackson.databind.node.ObjectNode; 7 import com.fasterxml.jackson.databind.node.ObjectNode;
  8 +import java.util.*;
  9 +import java.util.stream.Collectors;
8 import lombok.RequiredArgsConstructor; 10 import lombok.RequiredArgsConstructor;
9 import org.apache.commons.lang3.StringUtils; 11 import org.apache.commons.lang3.StringUtils;
10 import org.springframework.stereotype.Service; 12 import org.springframework.stereotype.Service;
@@ -27,13 +29,13 @@ import org.thingsboard.server.dao.yunteng.entities.*; @@ -27,13 +29,13 @@ import org.thingsboard.server.dao.yunteng.entities.*;
27 import org.thingsboard.server.dao.yunteng.mapper.*; 29 import org.thingsboard.server.dao.yunteng.mapper.*;
28 import org.thingsboard.server.dao.yunteng.service.*; 30 import org.thingsboard.server.dao.yunteng.service.*;
29 31
30 -import java.util.*;  
31 -import java.util.stream.Collectors;  
32 -  
33 -/** @Description 场景联动业务实现层 @Author cxy @Date 2021/11/25 11:22 */ 32 +/**
  33 + * @Description 场景联动业务实现层 @Author cxy @Date 2021/11/25 11:22
  34 + */
34 @Service 35 @Service
35 @RequiredArgsConstructor 36 @RequiredArgsConstructor
36 -public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageMapper, TkSceneLinkageEntity> 37 +public class TkSceneLinkageServiceImpl
  38 + extends AbstractBaseService<SceneLinkageMapper, TkSceneLinkageEntity>
37 implements SceneLinkageService { 39 implements SceneLinkageService {
38 40
39 private final SceneLinkageMapper sceneLinkageMapper; 41 private final SceneLinkageMapper sceneLinkageMapper;
@@ -74,14 +76,20 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM @@ -74,14 +76,20 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM
74 76
75 /** 77 /**
76 * 更新场景联动附加信息,触发器、执行条件、动作 78 * 更新场景联动附加信息,触发器、执行条件、动作
77 - * @param sceneLinkageDTO 场景联动表单数据  
78 - * @param tenantId 租户ID 79 + *
  80 + * @param sceneLinkageDTO 场景联动表单数据
  81 + * @param tenantId 租户ID
79 * @param customerId 客户ID 82 * @param customerId 客户ID
80 * @param sceneLinkage 场景联动主表实体 83 * @param sceneLinkage 场景联动主表实体
81 */ 84 */
82 - private void updateScene(SceneLinkageDTO sceneLinkageDTO, String tenantId, String customerId, TkSceneLinkageEntity sceneLinkage) { 85 + private void updateScene(
  86 + SceneLinkageDTO sceneLinkageDTO,
  87 + String tenantId,
  88 + String customerId,
  89 + TkSceneLinkageEntity sceneLinkage) {
83 String organizationId = sceneLinkage.getOrganizationId(); 90 String organizationId = sceneLinkage.getOrganizationId();
84 - List<DeviceDTO> organizationDevices = findDeviceList(organizationId, tenantId, customerId,new ArrayList<>()); 91 + List<DeviceDTO> organizationDevices =
  92 + findDeviceList(organizationId, tenantId, customerId, new ArrayList<>());
85 93
86 List<String> tbDeviceIds = new ArrayList<>(); 94 List<String> tbDeviceIds = new ArrayList<>();
87 for (DeviceDTO item : organizationDevices) { 95 for (DeviceDTO item : organizationDevices) {
@@ -105,7 +113,8 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM @@ -105,7 +113,8 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM
105 new QueryWrapper<TkSceneLinkageEntity>() 113 new QueryWrapper<TkSceneLinkageEntity>()
106 .lambda() 114 .lambda()
107 .eq(TkSceneLinkageEntity::getTenantId, tenantId) 115 .eq(TkSceneLinkageEntity::getTenantId, tenantId)
108 -// .eq(StringUtils.isNotBlank(currentUserId),SceneLinkage::getCreator, currentUserId) 116 + // .eq(StringUtils.isNotBlank(currentUserId),SceneLinkage::getCreator,
  117 + // currentUserId)
109 .in(TkSceneLinkageEntity::getId, ids); 118 .in(TkSceneLinkageEntity::getId, ids);
110 int result = sceneLinkageMapper.delete(Wrapper); 119 int result = sceneLinkageMapper.delete(Wrapper);
111 if (result != ids.size()) { 120 if (result != ids.size()) {
@@ -239,19 +248,22 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM @@ -239,19 +248,22 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM
239 doActionDTO.setTenantId(sceneLinkageDTO.getTenantId()); 248 doActionDTO.setTenantId(sceneLinkageDTO.getTenantId());
240 doActionDTO.setSceneLinkageId(sceneLinkageDTO.getId()); 249 doActionDTO.setSceneLinkageId(sceneLinkageDTO.getId());
241 if (ActionTypeEnum.DEVICE_OUT.equals(doActionDTO.getOutTarget())) { 250 if (ActionTypeEnum.DEVICE_OUT.equals(doActionDTO.getOutTarget())) {
242 - JsonNode inputContext = doActionDTO.getDoContext().get(FastIotConstants.Rpc.PARAMS_NAME); 251 + JsonNode inputContext =
  252 + doActionDTO.getDoContext().get(FastIotConstants.Rpc.PARAMS_NAME);
243 ObjectNode outputContext = JacksonUtil.newObjectNode(); 253 ObjectNode outputContext = JacksonUtil.newObjectNode();
244 outputContext.put(FastIotConstants.Rpc.METHOD_NAME, "methodThingskit"); 254 outputContext.put(FastIotConstants.Rpc.METHOD_NAME, "methodThingskit");
245 - if(inputContext == null){  
246 - outputContext.put(FastIotConstants.Rpc.PARAMS_NAME,"");  
247 - }else if(inputContext.isTextual()){  
248 - outputContext.put(FastIotConstants.Rpc.PARAMS_NAME,inputContext.asText());  
249 - }else{ 255 + if (inputContext == null) {
  256 + outputContext.put(FastIotConstants.Rpc.PARAMS_NAME, "");
  257 + } else if (inputContext.isTextual()) {
  258 + outputContext.put(FastIotConstants.Rpc.PARAMS_NAME, inputContext.asText());
  259 + } else {
250 outputContext.set(FastIotConstants.Rpc.PARAMS_NAME, inputContext); 260 outputContext.set(FastIotConstants.Rpc.PARAMS_NAME, inputContext);
251 } 261 }
252 262
253 ObjectNode addtionalInfo = JacksonUtil.newObjectNode(); 263 ObjectNode addtionalInfo = JacksonUtil.newObjectNode();
254 - addtionalInfo.put(ModelConstants.TablePropertyMapping.COMMAND_TYPE, doActionDTO.getCommandType()); 264 + addtionalInfo.put(
  265 + ModelConstants.TablePropertyMapping.COMMAND_TYPE,
  266 + doActionDTO.getCommandType());
255 outputContext.set(DataConstants.ADDITIONAL_INFO, addtionalInfo); 267 outputContext.set(DataConstants.ADDITIONAL_INFO, addtionalInfo);
256 doActionDTO.setDoContext(outputContext); 268 doActionDTO.setDoContext(outputContext);
257 } 269 }
@@ -264,11 +276,13 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM @@ -264,11 +276,13 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM
264 276
265 /** 277 /**
266 * 验证设备输出的目标设备是否合法有效 278 * 验证设备输出的目标设备是否合法有效
267 - * @param tbDeviceIds 场景联动所属组织的全部设备ID  
268 - * @param deviceIds 场景联动选择的设备ID  
269 - * @param entityType 设备输出类型 279 + *
  280 + * @param tbDeviceIds 场景联动所属组织的全部设备ID
  281 + * @param deviceIds 场景联动选择的设备ID
  282 + * @param entityType 设备输出类型
270 */ 283 */
271 - private void validateRpcDevice(List<String> tbDeviceIds, List<String> deviceIds, ScopeEnum entityType) { 284 + private void validateRpcDevice(
  285 + List<String> tbDeviceIds, List<String> deviceIds, ScopeEnum entityType) {
272 if (ScopeEnum.PART.equals(entityType)) { 286 if (ScopeEnum.PART.equals(entityType)) {
273 if (deviceIds == null || deviceIds.isEmpty()) { 287 if (deviceIds == null || deviceIds.isEmpty()) {
274 throw new TkDataValidationException(ErrorMessage.DEVICE_LOSED.getMessage()); 288 throw new TkDataValidationException(ErrorMessage.DEVICE_LOSED.getMessage());
@@ -338,13 +352,13 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM @@ -338,13 +352,13 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM
338 String organizationId = (String) queryMap.get("organizationId"); 352 String organizationId = (String) queryMap.get("organizationId");
339 // 不为空 353 // 不为空
340 if (null != organizationId && !StringUtils.isEmpty(organizationId)) { 354 if (null != organizationId && !StringUtils.isEmpty(organizationId)) {
341 - queryMap.put(  
342 - "organizationIds", getQueryOrganizationIds(tenantId, List.of(organizationId))); 355 + queryMap.put("organizationIds", getQueryOrganizationIds(tenantId, List.of(organizationId)));
343 } 356 }
344 if (!isCustomerUser) { 357 if (!isCustomerUser) {
345 queryMap.remove("currentUser"); 358 queryMap.remove("currentUser");
346 } 359 }
347 - IPage<TkSceneLinkageEntity> page = getPage(queryMap, FastIotConstants.DefaultOrder.CREATE_TIME, false); 360 + IPage<TkSceneLinkageEntity> page =
  361 + getPage(queryMap, FastIotConstants.DefaultOrder.CREATE_TIME, false);
348 IPage<SceneLinkageDTO> scenePage = baseMapper.getScenePage(page, queryMap); 362 IPage<SceneLinkageDTO> scenePage = baseMapper.getScenePage(page, queryMap);
349 return getPageData(scenePage, SceneLinkageDTO.class); 363 return getPageData(scenePage, SceneLinkageDTO.class);
350 } 364 }
@@ -392,7 +406,8 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM @@ -392,7 +406,8 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM
392 * @return 设备集合 406 * @return 设备集合
393 */ 407 */
394 @Override 408 @Override
395 - public List<DeviceDTO> findDeviceList(String organizationId, String tenantId, String customerId,List<DeviceTypeEnum> typeFilter) { 409 + public List<DeviceDTO> findDeviceList(
  410 + String organizationId, String tenantId, String customerId, List<DeviceTypeEnum> typeFilter) {
396 List<String> organizationFilter = new ArrayList<>(); 411 List<String> organizationFilter = new ArrayList<>();
397 organizationFilter.add(organizationId); 412 organizationFilter.add(organizationId);
398 // 查询该组织的所有子类 413 // 查询该组织的所有子类
@@ -405,7 +420,6 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM @@ -405,7 +420,6 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM
405 throw new TkDataValidationException(ErrorMessage.ORGANIZATION_NOT_EXTIED.getMessage()); 420 throw new TkDataValidationException(ErrorMessage.ORGANIZATION_NOT_EXTIED.getMessage());
406 } 421 }
407 422
408 -  
409 List<String> customerDevices = null; 423 List<String> customerDevices = null;
410 // 不等于默认的UUID就是客户用户 424 // 不等于默认的UUID就是客户用户
411 if (!EntityId.NULL_UUID.toString().equals(customerId)) { 425 if (!EntityId.NULL_UUID.toString().equals(customerId)) {
@@ -416,18 +430,19 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM @@ -416,18 +430,19 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM
416 } 430 }
417 431
418 List<TkDeviceEntity> orgDevices = 432 List<TkDeviceEntity> orgDevices =
419 - deviceMapper.selectList(  
420 - new QueryWrapper<TkDeviceEntity>().lambda().in(TkDeviceEntity::getOrganizationId, orgIds)); 433 + deviceMapper.selectList(
  434 + new QueryWrapper<TkDeviceEntity>()
  435 + .lambda()
  436 + .in(TkDeviceEntity::getOrganizationId, orgIds));
421 List<DeviceDTO> result = 437 List<DeviceDTO> result =
422 orgDevices.stream() 438 orgDevices.stream()
423 - .filter(t -> typeFilter.isEmpty() || typeFilter.contains(t.getDeviceType())) 439 + .filter(t -> typeFilter.isEmpty() || typeFilter.contains(t.getDeviceType()))
424 .map(device -> device.getDTO(DeviceDTO.class)) 440 .map(device -> device.getDTO(DeviceDTO.class))
425 .collect(Collectors.toList()); 441 .collect(Collectors.toList());
426 442
427 -  
428 if (null != customerDevices && customerDevices.size() > 0) { 443 if (null != customerDevices && customerDevices.size() > 0) {
429 List<DeviceDTO> customerAllDevice = new ArrayList<>(); 444 List<DeviceDTO> customerAllDevice = new ArrayList<>();
430 - for(DeviceDTO item: result){ 445 + for (DeviceDTO item : result) {
431 if (customerDevices.contains(item.getTbDeviceId())) { 446 if (customerDevices.contains(item.getTbDeviceId())) {
432 customerAllDevice.add(item); 447 customerAllDevice.add(item);
433 } 448 }
@@ -480,9 +495,6 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM @@ -480,9 +495,6 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM
480 return null; 495 return null;
481 } 496 }
482 497
483 -  
484 -  
485 -  
486 Map<String, List<String>> matchedDevices = new HashMap<>(); 498 Map<String, List<String>> matchedDevices = new HashMap<>();
487 Map<String, List<String>> matchedProjectes = new HashMap<>(); 499 Map<String, List<String>> matchedProjectes = new HashMap<>();
488 500
@@ -493,27 +505,45 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM @@ -493,27 +505,45 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM
493 .eq(TkTriggerEntity::getTenantId, tenantId) 505 .eq(TkTriggerEntity::getTenantId, tenantId)
494 .eq(TkTriggerEntity::getTriggerType, TriggerTypeEnum.DEVICE_TRIGGER) 506 .eq(TkTriggerEntity::getTriggerType, TriggerTypeEnum.DEVICE_TRIGGER)
495 .in(TkTriggerEntity::getSceneLinkageId, enableIds)); 507 .in(TkTriggerEntity::getSceneLinkageId, enableIds));
496 -  
497 triggers.forEach( 508 triggers.forEach(
498 trigger -> { 509 trigger -> {
499 - String scenId = trigger.getSceneLinkageId();  
500 - if (ScopeEnum.ALL.equals(trigger.getEntityType()) ) {  
501 - List<String> triggerIds = new ArrayList<>();  
502 - triggerIds.add(trigger.getDeviceProfileId());  
503 - deviceSceneMap(matchedProjectes, triggerIds, scenId);  
504 - }else{  
505 - deviceSceneMap(matchedDevices, trigger.getEntityId(), scenId);  
506 - } 510 + deviceSceneMap(
  511 + matchedProjectes,
  512 + matchedDevices,
  513 + trigger.getSceneLinkageId(),
  514 + trigger.getEntityType(),
  515 + trigger.getDeviceProfileId(),
  516 + trigger.getEntityId());
  517 + });
507 518
  519 + List<TkDoConditionEntity> conditions =
  520 + doConditionMapper.selectList(
  521 + new QueryWrapper<TkDoConditionEntity>()
  522 + .lambda()
  523 + .eq(TkDoConditionEntity::getTenantId, tenantId)
  524 + .eq(TkDoConditionEntity::getTriggerType, TriggerTypeEnum.DEVICE_TRIGGER)
  525 + .in(TkDoConditionEntity::getSceneLinkageId, enableIds));
  526 + conditions.forEach(
  527 + condition -> {
  528 + deviceSceneMap(
  529 + matchedProjectes,
  530 + matchedDevices,
  531 + condition.getSceneLinkageId(),
  532 + condition.getEntityType(),
  533 + condition.getDeviceProfileId(),
  534 + condition.getEntityId());
508 }); 535 });
509 536
510 - if (matchedDevices.isEmpty()&&matchedProjectes.isEmpty()) { 537 + if (matchedDevices.isEmpty() && matchedProjectes.isEmpty()) {
511 return null; 538 return null;
512 } 539 }
513 540
514 -List<TkDeviceProfileEntity> profiles = profileMapper.selectList(new LambdaQueryWrapper<TkDeviceProfileEntity>().eq(TkDeviceProfileEntity::getTenantId,tenantId)); 541 + List<TkDeviceProfileEntity> profiles =
  542 + profileMapper.selectList(
  543 + new LambdaQueryWrapper<TkDeviceProfileEntity>()
  544 + .eq(TkDeviceProfileEntity::getTenantId, tenantId));
515 Map<String, String> projects = new HashMap<>(); 545 Map<String, String> projects = new HashMap<>();
516 - profiles.forEach(f->projects.put(f.getTbProfileId(),f.getId())); 546 + profiles.forEach(f -> projects.put(f.getTbProfileId(), f.getId()));
517 547
518 Map<String, Map> engineConfig = new HashMap<>(); 548 Map<String, Map> engineConfig = new HashMap<>();
519 engineConfig.put("scenes", matchedDevices); 549 engineConfig.put("scenes", matchedDevices);
@@ -525,18 +555,32 @@ List<TkDeviceProfileEntity> profiles = profileMapper.selectList(new LambdaQueryW @@ -525,18 +555,32 @@ List<TkDeviceProfileEntity> profiles = profileMapper.selectList(new LambdaQueryW
525 return JacksonUtil.convertValue(engineConfig, JsonNode.class); 555 return JacksonUtil.convertValue(engineConfig, JsonNode.class);
526 } 556 }
527 557
528 - 558 + private void deviceSceneMap(
  559 + Map<String, List<String>> matchedProjectes,
  560 + Map<String, List<String>> matchedDevices,
  561 + String scenId,
  562 + ScopeEnum scope,
  563 + String profileId,
  564 + List<String> deviceIds) {
  565 + if (ScopeEnum.ALL.equals(scope)) {
  566 + List<String> profileIds = new ArrayList<>();
  567 + profileIds.add(profileId);
  568 + deviceSceneMap(matchedProjectes, profileIds, scenId);
  569 + } else {
  570 + deviceSceneMap(matchedDevices, deviceIds, scenId);
  571 + }
  572 + }
529 573
530 /** 574 /**
531 * 设备与场景联动的映射集合 575 * 设备与场景联动的映射集合
532 * 576 *
533 * @param resultMap 缓存设备和场景联动映射结果的集合 577 * @param resultMap 缓存设备和场景联动映射结果的集合
534 - * @param devices 设备主键集合 578 + * @param triggerIds 设备或产品主键集合
535 * @param scenId 场景联动主键 579 * @param scenId 场景联动主键
536 */ 580 */
537 private void deviceSceneMap( 581 private void deviceSceneMap(
538 - Map<String, List<String>> resultMap, List<String> devices, String scenId) {  
539 - for (String deviceId : devices) { 582 + Map<String, List<String>> resultMap, List<String> triggerIds, String scenId) {
  583 + for (String deviceId : triggerIds) {
540 List<String> scenes = resultMap.computeIfAbsent(deviceId, k -> new ArrayList<String>()); 584 List<String> scenes = resultMap.computeIfAbsent(deviceId, k -> new ArrayList<String>());
541 if (!scenes.contains(scenId)) { 585 if (!scenes.contains(scenId)) {
542 scenes.add(scenId); 586 scenes.add(scenId);
@@ -3,12 +3,10 @@ package org.thingsboard.rule.engine.yunteng.scene; @@ -3,12 +3,10 @@ package org.thingsboard.rule.engine.yunteng.scene;
3 3
4 import com.fasterxml.jackson.databind.JsonNode; 4 import com.fasterxml.jackson.databind.JsonNode;
5 import com.fasterxml.jackson.databind.node.ObjectNode; 5 import com.fasterxml.jackson.databind.node.ObjectNode;
6 -import java.util.HashSet;  
7 -import java.util.List;  
8 -import java.util.Set;  
9 -import java.util.UUID; 6 +import java.util.*;
10 import java.util.concurrent.ConcurrentHashMap; 7 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ExecutionException; 8 import java.util.concurrent.ExecutionException;
  9 +import java.util.concurrent.atomic.AtomicBoolean;
12 import lombok.extern.slf4j.Slf4j; 10 import lombok.extern.slf4j.Slf4j;
13 import org.apache.commons.lang3.StringUtils; 11 import org.apache.commons.lang3.StringUtils;
14 import org.jetbrains.annotations.NotNull; 12 import org.jetbrains.annotations.NotNull;
@@ -86,7 +84,16 @@ class ReactState { @@ -86,7 +84,16 @@ class ReactState {
86 this.ytDeviceService = SpringBeanUtils.getBean(TkDeviceService.class); 84 this.ytDeviceService = SpringBeanUtils.getBean(TkDeviceService.class);
87 } 85 }
88 86
89 - public void process(TbContext ctx, TbMsg msg, String profileId, String deviceId) 87 + /**
  88 + * 触发器内部逻辑:或 执行条件内部逻辑:且 触发器和执行条件之间的逻辑:且
  89 + *
  90 + * @param ctx
  91 + * @param msg 设备推送的遥测数据
  92 + * @param deviceId 遥测数据的来源设备的TB设备ID
  93 + * @throws ExecutionException
  94 + * @throws InterruptedException
  95 + */
  96 + public void process(TbContext ctx, TbMsg msg, String deviceId)
90 throws ExecutionException, InterruptedException { 97 throws ExecutionException, InterruptedException {
91 98
92 StringBuilder detail = new StringBuilder(); 99 StringBuilder detail = new StringBuilder();
@@ -94,74 +101,110 @@ class ReactState { @@ -94,74 +101,110 @@ class ReactState {
94 ctx.tellSuccess(msg); 101 ctx.tellSuccess(msg);
95 } 102 }
96 103
97 - boolean matched = false;  
98 - if (triggers == null || triggers.isEmpty()) {  
99 - matched = true;  
100 - } else {  
101 - boolean itemMatched = false;  
102 - for (TriggerDTO trigger : triggers) {  
103 - TriggerState triggerState = getOrCreateTriggerState(trigger, profileId, deviceId);  
104 - if (triggerState == null) {  
105 - continue;  
106 - }  
107 - itemMatched = triggerState.process(ctx, msg);  
108 - if (itemMatched) {  
109 - matched = true;  
110 - detail.append(  
111 - triggerState.getAlarmDetails() == null ? "" : triggerState.getAlarmDetails());  
112 - if (this.alarmAction != null) {  
113 - noticeMsg(  
114 - ctx,  
115 - msg,  
116 - alarmAction,  
117 - deviceId,  
118 - triggerState.getAlarmDetails(),  
119 - triggerState.getLatestValues().getTs());  
120 - }  
121 - } else if (currentAlarms.containsKey(deviceId) && this.alarmAction != null) {  
122 - // 清除设备告警  
123 - for (AlarmConditionFilterKey entityKey : triggerState.getEntityKeys()) {  
124 - clearAlarm(ctx, msg, deviceId, entityKey.getKey());  
125 - }  
126 - }  
127 - }  
128 - } 104 + AtomicBoolean triggerMatched = new AtomicBoolean(true);
  105 + Optional.ofNullable(triggers)
  106 + .ifPresent(
  107 + t -> {
  108 + triggerMatched.set(false);
  109 + t.forEach(
  110 + trigger -> {
  111 + ScopeEnum entityType = trigger.getEntityType();
  112 + List<String> trifggerDevices = trigger.getEntityId();
  113 + String tkProjectId = trigger.getDeviceProfileId();
  114 + if (ScopeEnum.ALL.equals(entityType)) {
  115 + trifggerDevices =
  116 + ytDeviceService.findTbDeviceIdsByDeviceProfileId(
  117 + tkProjectId, trigger.getTenantId());
  118 + }
  119 + triggerMatched.set(
  120 + trifggerDevices.stream()
  121 + .anyMatch(
  122 + id -> {
  123 + TriggerState triggerState =
  124 + getOrCreateTriggerState(trigger, tkProjectId, id);
  125 + if (triggerState == null) {
  126 + return false;
  127 + }
  128 + try {
  129 + if (triggerState.process(ctx, msg)) {
  130 + detail.append(
  131 + triggerState.getAlarmDetails() == null
  132 + ? ""
  133 + : triggerState.getAlarmDetails());
  134 + return true;
  135 + } else if (currentAlarms.containsKey(deviceId)) {
  136 + // 清除设备告警
  137 + for (AlarmConditionFilterKey entityKey :
  138 + triggerState.getEntityKeys()) {
  139 + clearAlarm(ctx, msg, deviceId, entityKey.getKey());
  140 + }
  141 + }
  142 + } catch (ExecutionException e) {
  143 + throw new RuntimeException(e);
  144 + } catch (InterruptedException e) {
  145 + throw new RuntimeException(e);
  146 + }
  147 + return false;
  148 + }));
  149 + });
  150 + });
129 151
130 - if (matched && conditions.size() > 0) {  
131 - matched = false;  
132 - for (TkDoConditionEntity item : conditions) {  
133 - List<String> entityIds = item.getEntityId();  
134 - if (entityIds == null || entityIds.isEmpty()) {  
135 - matched = true;  
136 - break;  
137 - }  
138 - for (String id : entityIds) {  
139 - TriggerState conditionState = getOrCreateConditionState(item, profileId, id);  
140 - if (conditionState == null || conditionState.process(ctx, msg)) {  
141 - detail.append(";");  
142 - detail.append(conditionState.getAlarmDetails());  
143 - matched = true;  
144 - break;  
145 - }  
146 - }  
147 - if (matched) {  
148 - break;  
149 - }  
150 - } 152 + /** 执行条件的所有设备都满足才为true */
  153 + AtomicBoolean conditionMatched = new AtomicBoolean(false);
  154 + if (triggerMatched.get()) {
  155 + conditionMatched.set(true);
  156 + Optional.ofNullable(conditions)
  157 + .ifPresent(
  158 + t -> {
  159 + t.forEach(
  160 + condition -> {
  161 + ScopeEnum entityType = condition.getEntityType();
  162 + List<String> conditionDevices = condition.getEntityId();
  163 + String tkProjectId = condition.getDeviceProfileId();
  164 + if (ScopeEnum.ALL.equals(entityType)) {
  165 + conditionDevices =
  166 + ytDeviceService.findTbDeviceIdsByDeviceProfileId(
  167 + tkProjectId, condition.getTenantId());
  168 + }
  169 + conditionMatched.set(
  170 + !conditionDevices.stream()
  171 + .anyMatch(
  172 + id -> {
  173 + TriggerState conditionState =
  174 + getOrCreateConditionState(condition, tkProjectId, id);
  175 + try {
  176 + return !conditionState.process(ctx, msg);
  177 + } catch (ExecutionException e) {
  178 + throw new RuntimeException(e);
  179 + } catch (InterruptedException e) {
  180 + throw new RuntimeException(e);
  181 + }
  182 + }));
  183 + });
  184 + });
151 } 185 }
152 186
153 - if (matched) { 187 + if (triggerMatched.get() && conditionMatched.get()) {
154 for (TkDoActionEntity item : actions) { 188 for (TkDoActionEntity item : actions) {
155 if (ActionTypeEnum.MSG_NOTIFY.equals(item.getOutTarget())) { 189 if (ActionTypeEnum.MSG_NOTIFY.equals(item.getOutTarget())) {
156 - continue; 190 + noticeMsg(ctx, msg, item, deviceId, detail.toString(), msg.getTs());
  191 + } else {
  192 + pushMsg(ctx, msg, item, detail.toString());
157 } 193 }
158 - pushMsg(ctx, msg, item, detail.toString());  
159 } 194 }
160 } else { 195 } else {
161 ctx.tellSuccess(msg); 196 ctx.tellSuccess(msg);
162 } 197 }
163 } 198 }
164 199
  200 + /**
  201 + * 获取触发器状态
  202 + *
  203 + * @param trigger 触发器数据
  204 + * @param profileId 遥测数据的来源设备的TK产品ID
  205 + * @param deviceId 遥测数据的来源设备的TB设备ID
  206 + * @return
  207 + */
165 protected TriggerState getOrCreateTriggerState( 208 protected TriggerState getOrCreateTriggerState(
166 TriggerDTO trigger, String profileId, String deviceId) { 209 TriggerDTO trigger, String profileId, String deviceId) {
167 String triggerId = trigger.getId(); 210 String triggerId = trigger.getId();
@@ -242,10 +285,6 @@ class ReactState { @@ -242,10 +285,6 @@ class ReactState {
242 } 285 }
243 286
244 private void pushMsg(TbContext ctx, TbMsg msg, TkDoActionEntity action, String detail) { 287 private void pushMsg(TbContext ctx, TbMsg msg, TkDoActionEntity action, String detail) {
245 - TbMsgMetaData metaData = // lastMsgMetaData != null ? lastMsgMetaData.copy() :  
246 - new TbMsgMetaData();  
247 - String relationType = "";  
248 - TbMsg newMsg = null;  
249 switch (action.getOutTarget()) { 288 switch (action.getOutTarget()) {
250 case DEVICE_OUT: 289 case DEVICE_OUT:
251 List<String> rpcDevices = action.getDeviceId(); 290 List<String> rpcDevices = action.getDeviceId();
1 /** 1 /**
2 * Copyright © 2016-2022 The Thingsboard Authors 2 * Copyright © 2016-2022 The Thingsboard Authors
3 * 3 *
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at 4 + * <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
  5 + * except in compliance with the License. You may obtain a copy of the License at
7 * 6 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 7 + * <p>http://www.apache.org/licenses/LICENSE-2.0
9 * 8 *
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and 9 + * <p>Unless required by applicable law or agreed to in writing, software distributed under the
  10 + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  11 + * express or implied. See the License for the specific language governing permissions and
14 * limitations under the License. 12 * limitations under the License.
15 */ 13 */
16 package org.thingsboard.rule.engine.yunteng.scene; 14 package org.thingsboard.rule.engine.yunteng.scene;
17 15
18 import com.fasterxml.jackson.databind.ObjectMapper; 16 import com.fasterxml.jackson.databind.ObjectMapper;
19 -import com.google.gson.JsonParser; 17 +import java.util.List;
  18 +import java.util.Map;
  19 +import java.util.concurrent.ConcurrentHashMap;
20 import lombok.extern.slf4j.Slf4j; 20 import lombok.extern.slf4j.Slf4j;
21 import org.thingsboard.rule.engine.api.*; 21 import org.thingsboard.rule.engine.api.*;
22 import org.thingsboard.rule.engine.api.util.TbNodeUtils; 22 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
@@ -24,129 +24,115 @@ import org.thingsboard.rule.engine.filter.TbCheckAlarmStatusNodeConfig; @@ -24,129 +24,115 @@ import org.thingsboard.rule.engine.filter.TbCheckAlarmStatusNodeConfig;
24 import org.thingsboard.rule.engine.profile.SnapshotUpdate; 24 import org.thingsboard.rule.engine.profile.SnapshotUpdate;
25 import org.thingsboard.rule.engine.yunteng.utils.TriggerRuleState; 25 import org.thingsboard.rule.engine.yunteng.utils.TriggerRuleState;
26 import org.thingsboard.server.common.data.device.profile.AlarmConditionKeyType; 26 import org.thingsboard.server.common.data.device.profile.AlarmConditionKeyType;
27 -import org.thingsboard.server.common.data.id.DeviceId;  
28 -import org.thingsboard.server.common.data.kv.AttributeKvEntry;  
29 import org.thingsboard.server.common.data.plugin.ComponentType; 27 import org.thingsboard.server.common.data.plugin.ComponentType;
30 import org.thingsboard.server.common.msg.TbMsg; 28 import org.thingsboard.server.common.msg.TbMsg;
31 import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; 29 import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
32 -import org.thingsboard.server.common.msg.session.SessionMsgType;  
33 -import org.thingsboard.server.common.transport.adaptor.JsonConverter;  
34 -  
35 -import java.util.List;  
36 -import java.util.Map;  
37 -import java.util.Optional;  
38 -import java.util.Set;  
39 -import java.util.concurrent.ConcurrentHashMap;  
40 -import java.util.concurrent.ExecutionException;  
41 30
42 @Slf4j 31 @Slf4j
43 @RuleNode( 32 @RuleNode(
44 - type = ComponentType.FILTER,  
45 - name = "scene react",  
46 - configClazz = TbCheckAlarmStatusNodeConfig.class,  
47 - relationTypes = {"Message","Alarm Created", "Alarm Updated", "RPC Request"},  
48 - nodeDescription = "基于业务场景,实现设备的交互控制。",  
49 - nodeDetails = "基于业务场景,实现设备的交互控制。",  
50 - uiResources = {"static/rulenode/rulenode-core-config.js"},  
51 - configDirective = "tbSceneReactNodeConfig") 33 + type = ComponentType.FILTER,
  34 + name = "scene react",
  35 + configClazz = TbCheckAlarmStatusNodeConfig.class,
  36 + relationTypes = {"Message", "Alarm Created", "Alarm Updated", "RPC Request"},
  37 + nodeDescription = "基于业务场景,实现设备的交互控制。",
  38 + nodeDetails = "基于业务场景,实现设备的交互控制。",
  39 + uiResources = {"static/rulenode/rulenode-core-config.js"},
  40 + configDirective = "tbSceneReactNodeConfig")
52 public class TbSceneReactNode implements TbNode { 41 public class TbSceneReactNode implements TbNode {
53 - private final ObjectMapper mapper = new ObjectMapper();  
54 - private TbSceneReactNodeConfig config;  
55 -  
56 -  
57 -  
58 -  
59 - /**场景联动状态  
60 - * 键:场景联动主键  
61 - * 值:场景联动状态  
62 - * */  
63 - private final Map<String, ReactState> reactStates = new ConcurrentHashMap<>();  
64 -  
65 -  
66 -  
67 - private TbContext ctx;  
68 -  
69 - @Override  
70 - public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {  
71 - this.config = TbNodeUtils.convert(configuration, TbSceneReactNodeConfig.class);  
72 - this.ctx = ctx; 42 + private final ObjectMapper mapper = new ObjectMapper();
  43 + private TbSceneReactNodeConfig config;
  44 +
  45 + /** 场景联动状态 键:场景联动主键 值:场景联动状态 */
  46 + private final Map<String, ReactState> reactStates = new ConcurrentHashMap<>();
  47 +
  48 + private TbContext ctx;
  49 +
  50 + @Override
  51 + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
  52 + this.config = TbNodeUtils.convert(configuration, TbSceneReactNodeConfig.class);
  53 + this.ctx = ctx;
  54 + }
  55 +
  56 + @Override
  57 + public void onMsg(TbContext ctx, TbMsg msg) {
  58 + String deviceId = msg.getOriginator().getId().toString();
  59 + String tbProfileId = msg.getMetaData().getValue("deviceProfileId");
  60 + String projectId = config.getProfile().get(tbProfileId);
  61 + boolean deviceHas = config.getScenes().containsKey(deviceId);
  62 + boolean profileHas = config.getProject().containsKey(projectId);
  63 + if (!deviceHas && !profileHas) {
  64 + ctx.tellSuccess(msg);
  65 + return;
73 } 66 }
74 -  
75 - @Override  
76 - public void onMsg(TbContext ctx, TbMsg msg) {  
77 - String deviceId = msg.getOriginator().getId().toString();  
78 - String tbProfileId = msg.getMetaData().getValue("deviceProfileId");  
79 - String projectId = config.getProfile().get(tbProfileId);  
80 - boolean deviceHas = config.getScenes().containsKey(deviceId);  
81 - boolean profileHas = config.getProject().containsKey(projectId);  
82 - if(!deviceHas && !profileHas){  
83 - ctx.tellSuccess(msg);  
84 - return;  
85 - }  
86 - List<String> devScence = config.getScenes().get(deviceId);  
87 - if(devScence !=null && !devScence.isEmpty()){  
88 - devScence.stream().forEach(t ->{  
89 - ReactState react = getOrCreateReactState(ctx,config,t);  
90 - if(react != null){  
91 - try {  
92 - react.process(ctx, msg,projectId,deviceId);  
93 - } catch (Exception e) {  
94 - ctx.tellFailure(msg,e);  
95 - }  
96 - }  
97 - });  
98 - }  
99 - List<String> projectScence = config.getProject().get(projectId);  
100 - if(projectScence !=null && !projectScence.isEmpty()){  
101 - projectScence.stream().forEach(t ->{  
102 - ReactState react = getOrCreateReactState(ctx,config,t);  
103 - if(react != null){  
104 - try {  
105 - react.process(ctx, msg,projectId,deviceId);  
106 - } catch (Exception e) {  
107 - ctx.tellFailure(msg,e);  
108 - } 67 + List<String> devScence = config.getScenes().get(deviceId);
  68 + if (devScence != null && !devScence.isEmpty()) {
  69 + devScence.stream()
  70 + .forEach(
  71 + t -> {
  72 + ReactState react = getOrCreateReactState(ctx, config, t);
  73 + if (react != null) {
  74 + try {
  75 + react.process(ctx, msg, deviceId);
  76 + } catch (Exception e) {
  77 + ctx.tellFailure(msg, e);
  78 + }
109 } 79 }
110 - });  
111 - }  
112 - }  
113 -  
114 - /**  
115 - * 设备数据是否包含触发器的指标  
116 - * @param update 最新数据  
117 - * @param condition 触发条件工具类  
118 - * @return  
119 - */  
120 - public boolean validateUpdate(SnapshotUpdate update, TriggerRuleState condition) {  
121 - if (update != null) {  
122 - //Check that the update type and that keys match.  
123 - if (update.getType().equals(AlarmConditionKeyType.TIME_SERIES)) {  
124 - return condition.validateTsUpdate(update.getKeys());  
125 - } else if (update.getType().equals(AlarmConditionKeyType.ATTRIBUTE)) {  
126 - return condition.validateAttrUpdate(update.getKeys());  
127 - }  
128 - }  
129 - return true; 80 + });
130 } 81 }
131 - @Override  
132 - public void destroy() {  
133 - reactStates.clear(); 82 + List<String> projectScence = config.getProject().get(projectId);
  83 + if (projectScence != null && !projectScence.isEmpty()) {
  84 + projectScence.stream()
  85 + .forEach(
  86 + t -> {
  87 + ReactState react = getOrCreateReactState(ctx, config, t);
  88 + if (react != null) {
  89 + try {
  90 + react.process(ctx, msg, deviceId);
  91 + } catch (Exception e) {
  92 + ctx.tellFailure(msg, e);
  93 + }
  94 + }
  95 + });
134 } 96 }
135 -  
136 - @Override  
137 - public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {  
138 - // Cleanup the cache for all entities that are no longer assigned to current server partitions  
139 - reactStates.clear(); 97 + }
  98 +
  99 + /**
  100 + * 设备数据是否包含触发器的指标
  101 + *
  102 + * @param update 最新数据
  103 + * @param condition 触发条件工具类
  104 + * @return
  105 + */
  106 + public boolean validateUpdate(SnapshotUpdate update, TriggerRuleState condition) {
  107 + if (update != null) {
  108 + // Check that the update type and that keys match.
  109 + if (update.getType().equals(AlarmConditionKeyType.TIME_SERIES)) {
  110 + return condition.validateTsUpdate(update.getKeys());
  111 + } else if (update.getType().equals(AlarmConditionKeyType.ATTRIBUTE)) {
  112 + return condition.validateAttrUpdate(update.getKeys());
  113 + }
140 } 114 }
141 -  
142 - protected ReactState getOrCreateReactState(TbContext ctx, TbSceneReactNodeConfig config, String sceneId) {  
143 - ReactState reactState = reactStates.get(sceneId);  
144 - if (reactState == null) {  
145 - reactState = new ReactState(sceneId,ctx, config);  
146 - reactStates.put(sceneId, reactState);  
147 - }  
148 - return reactState; 115 + return true;
  116 + }
  117 +
  118 + @Override
  119 + public void destroy() {
  120 + reactStates.clear();
  121 + }
  122 +
  123 + @Override
  124 + public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
  125 + // Cleanup the cache for all entities that are no longer assigned to current server partitions
  126 + reactStates.clear();
  127 + }
  128 +
  129 + protected ReactState getOrCreateReactState(
  130 + TbContext ctx, TbSceneReactNodeConfig config, String sceneId) {
  131 + ReactState reactState = reactStates.get(sceneId);
  132 + if (reactState == null) {
  133 + reactState = new ReactState(sceneId, ctx, config);
  134 + reactStates.put(sceneId, reactState);
149 } 135 }
150 -  
151 - 136 + return reactState;
  137 + }
152 } 138 }
1 -/**  
2 - * 场景联动配置文件  
3 - */ 1 +/** 场景联动配置文件 */
4 package org.thingsboard.rule.engine.yunteng.scene; 2 package org.thingsboard.rule.engine.yunteng.scene;
5 3
6 -import lombok.Data;  
7 -import org.thingsboard.rule.engine.api.NodeConfiguration;  
8 -import org.thingsboard.server.common.data.id.EntityId;  
9 -import org.thingsboard.server.common.data.id.TenantId;  
10 -  
11 import java.util.HashMap; 4 import java.util.HashMap;
12 import java.util.List; 5 import java.util.List;
13 import java.util.Map; 6 import java.util.Map;
  7 +import lombok.Data;
  8 +import org.thingsboard.rule.engine.api.NodeConfiguration;
14 9
15 @Data 10 @Data
16 public class TbSceneReactNodeConfig implements NodeConfiguration<TbSceneReactNodeConfig> { 11 public class TbSceneReactNodeConfig implements NodeConfiguration<TbSceneReactNodeConfig> {
17 12
18 - /**【设备ID,场景】设备的哪些指标会触发场景联动*/  
19 - private Map<String, List<String>> scenes;  
20 - /**【产品ID,场景】设备的哪些指标会触发场景联动*/  
21 - private Map<String, List<String>> project;  
22 - /**【场景ID,场景名称】场景联动信息*/  
23 - private Map<String, String> names;  
24 - /**【场景ID,组织ID】场景联动所属组织信息*/  
25 - private Map<String, String> orgs;  
26 - private Map<String, String> profile;  
27 -  
28 -  
29 -  
30 -  
31 -  
32 - @Override  
33 - public TbSceneReactNodeConfig defaultConfiguration() {  
34 - TbSceneReactNodeConfig config = new TbSceneReactNodeConfig();  
35 - Map<String, List<String>> sceneMap = new HashMap<>();  
36 - Map<String, List<String>> projectScenes = new HashMap<>();  
37 -  
38 - config.setScenes(sceneMap);  
39 - config.setProject(projectScenes);  
40 - config.setNames(new HashMap<>());  
41 - config.setOrgs(new HashMap<>());  
42 - config.setProfile(new HashMap<>());  
43 - return config;  
44 - } 13 + /** 【TB设备配置ID,TK产品ID】产品信息 */
  14 + private Map<String, String> profile;
  15 + /** 【触发器的TK产品ID,场景】哪些产品会触发场景联动 */
  16 + private Map<String, List<String>> project;
  17 + /** 【触发器的TB设备ID,场景】哪些设备会触发场景联动 */
  18 + private Map<String, List<String>> scenes;
  19 + /** 【TK场景ID,场景名称】场景联动信息 */
  20 + private Map<String, String> names;
  21 + /** 【TK场景ID,组织ID】场景联动所属组织信息 */
  22 + private Map<String, String> orgs;
  23 +
  24 + @Override
  25 + public TbSceneReactNodeConfig defaultConfiguration() {
  26 + TbSceneReactNodeConfig config = new TbSceneReactNodeConfig();
  27 + Map<String, List<String>> sceneMap = new HashMap<>();
  28 + Map<String, List<String>> projectScenes = new HashMap<>();
  29 +
  30 + config.setScenes(sceneMap);
  31 + config.setProject(projectScenes);
  32 + config.setNames(new HashMap<>());
  33 + config.setOrgs(new HashMap<>());
  34 + config.setProfile(new HashMap<>());
  35 + return config;
  36 + }
45 } 37 }
1 /** 1 /**
2 * Copyright © 2016-2022 The Thingsboard Authors 2 * Copyright © 2016-2022 The Thingsboard Authors
3 - * <p>  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - * <p>  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * <p>  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and 3 + *
  4 + * <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
  5 + * except in compliance with the License. You may obtain a copy of the License at
  6 + *
  7 + * <p>http://www.apache.org/licenses/LICENSE-2.0
  8 + *
  9 + * <p>Unless required by applicable law or agreed to in writing, software distributed under the
  10 + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  11 + * express or implied. See the License for the specific language governing permissions and
14 * limitations under the License. 12 * limitations under the License.
15 */ 13 */
16 package org.thingsboard.rule.engine.yunteng.scene; 14 package org.thingsboard.rule.engine.yunteng.scene;
17 15
18 import com.google.gson.JsonParser; 16 import com.google.gson.JsonParser;
  17 +import java.util.*;
  18 +import java.util.concurrent.ExecutionException;
  19 +import java.util.function.BiFunction;
19 import lombok.Data; 20 import lombok.Data;
  21 +import lombok.Getter;
20 import lombok.extern.slf4j.Slf4j; 22 import lombok.extern.slf4j.Slf4j;
21 import org.thingsboard.rule.engine.api.TbContext; 23 import org.thingsboard.rule.engine.api.TbContext;
22 import org.thingsboard.rule.engine.profile.*; 24 import org.thingsboard.rule.engine.profile.*;
@@ -36,243 +38,312 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; @@ -36,243 +38,312 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
36 import org.thingsboard.server.common.transport.adaptor.JsonConverter; 38 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
37 import org.thingsboard.server.dao.sql.query.EntityKeyMapping; 39 import org.thingsboard.server.dao.sql.query.EntityKeyMapping;
38 40
39 -import java.util.*;  
40 -import java.util.concurrent.ExecutionException;  
41 -import java.util.function.BiFunction;  
42 -  
43 @Data 41 @Data
44 @Slf4j 42 @Slf4j
45 class TriggerState { 43 class TriggerState {
46 - private final String originator;  
47 - private volatile TriggerRuleState ruleState;  
48 - private volatile Alarm currentAlarm;  
49 - private volatile boolean initialFetchDone;  
50 - private volatile TbMsgMetaData lastMsgMetaData;  
51 -  
52 - private final DynamicPredicateValueCtx dynamicPredicateValueCtx;  
53 - private DataSnapshot latestValues;  
54 -  
55 - private final Set<AlarmConditionFilterKey> entityKeys;  
56 - private final String alarmDetails;  
57 -  
58 - TriggerState(String originator, AlarmRule rule, Set<AlarmConditionFilterKey> filterKeys, String alarmDetails, DynamicPredicateValueCtx dynamicPredicateValueCtx) {  
59 -  
60 - this.originator = originator;  
61 - this.dynamicPredicateValueCtx = dynamicPredicateValueCtx;  
62 - ruleState = new TriggerRuleState(rule.getCondition(), filterKeys, new PersistedAlarmRuleState(), rule.getSchedule());  
63 - this.entityKeys = filterKeys;  
64 - this.alarmDetails = alarmDetails; 44 + private final String originator;
  45 + private volatile TriggerRuleState ruleState;
  46 + private volatile Alarm currentAlarm;
  47 + private volatile boolean initialFetchDone;
  48 + private volatile TbMsgMetaData lastMsgMetaData;
  49 +
  50 + private final DynamicPredicateValueCtx dynamicPredicateValueCtx;
  51 + private DataSnapshot latestValues;
  52 +
  53 + private final Set<AlarmConditionFilterKey> entityKeys;
  54 + private final String alarmDetails;
  55 +
  56 + @Getter
  57 + /** 触发条件执行结果 */
  58 + private boolean ruleMatched = false;
  59 +
  60 + TriggerState(
  61 + String originator,
  62 + AlarmRule rule,
  63 + Set<AlarmConditionFilterKey> filterKeys,
  64 + String alarmDetails,
  65 + DynamicPredicateValueCtx dynamicPredicateValueCtx) {
  66 +
  67 + this.originator = originator;
  68 + this.dynamicPredicateValueCtx = dynamicPredicateValueCtx;
  69 + ruleState =
  70 + new TriggerRuleState(
  71 + rule.getCondition(), filterKeys, new PersistedAlarmRuleState(), rule.getSchedule());
  72 + this.entityKeys = filterKeys;
  73 + this.alarmDetails = alarmDetails;
  74 + }
  75 +
  76 + public boolean process(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
  77 + if (!msg.getOriginator().getId().toString().equals(originator)) {
  78 + return ruleMatched;
65 } 79 }
66 -  
67 -  
68 - public boolean process(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {  
69 - if (latestValues == null) {  
70 - latestValues = fetchLatestValues(ctx, originator);  
71 - }  
72 - lastMsgMetaData = msg.getMetaData();  
73 - SnapshotUpdate update = null;  
74 - if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) {  
75 - update = processTelemetry(ctx, msg);  
76 - } else {  
77 - update = processAttributes(ctx, msg);  
78 - }  
79 -  
80 - if (update != null && update.hasUpdate()) {  
81 - return createOrClearAlarms(ctx, msg, latestValues, update, TriggerRuleState::eval);  
82 - }  
83 - return false; 80 + if (latestValues == null) {
  81 + latestValues = fetchLatestValues(ctx, originator);
84 } 82 }
85 -  
86 -  
87 - public boolean process(TbContext ctx, long ts) throws ExecutionException, InterruptedException {  
88 - return createOrClearAlarms(ctx, null, ts, null, (alarmState, tsParam) -> alarmState.eval(tsParam, latestValues)); 83 + lastMsgMetaData = msg.getMetaData();
  84 + SnapshotUpdate update;
  85 + if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) {
  86 + update = processTelemetry(ctx, msg);
  87 + } else {
  88 + update = processAttributes(ctx, msg);
89 } 89 }
90 90
91 - public <T> boolean createOrClearAlarms(TbContext ctx, TbMsg msg, T data, SnapshotUpdate update, BiFunction<TriggerRuleState, T, AlarmEvalResult> evalFunction) {  
92 - boolean stateUpdate = false;  
93 - if (!validateUpdate(update, ruleState)) {  
94 - return false;  
95 - }  
96 - AlarmEvalResult evalResult = evalFunction.apply(ruleState, data);  
97 - if (AlarmEvalResult.TRUE.equals(evalResult)) {  
98 - stateUpdate = true;  
99 - clearAlarmState(stateUpdate, ruleState);  
100 - } else if (AlarmEvalResult.FALSE.equals(evalResult)) {  
101 - clearAlarmState(stateUpdate, ruleState);  
102 - }  
103 - return stateUpdate; 91 + if (update != null && update.hasUpdate()) {
  92 + ruleMatched = createOrClearAlarms(ctx, msg, latestValues, update, TriggerRuleState::eval);
  93 + return ruleMatched;
104 } 94 }
105 -  
106 -  
107 - public boolean clearAlarmState(boolean stateUpdate, TriggerRuleState state) {  
108 - if (state != null) {  
109 - state.clear();  
110 - stateUpdate |= state.checkUpdate();  
111 - }  
112 - return stateUpdate; 95 + return false;
  96 + }
  97 +
  98 + public boolean process(TbContext ctx, long ts) throws ExecutionException, InterruptedException {
  99 + return createOrClearAlarms(
  100 + ctx, null, ts, null, (alarmState, tsParam) -> alarmState.eval(tsParam, latestValues));
  101 + }
  102 +
  103 + public <T> boolean createOrClearAlarms(
  104 + TbContext ctx,
  105 + TbMsg msg,
  106 + T data,
  107 + SnapshotUpdate update,
  108 + BiFunction<TriggerRuleState, T, AlarmEvalResult> evalFunction) {
  109 + boolean stateUpdate = false;
  110 + if (!validateUpdate(update, ruleState)) {
  111 + return false;
113 } 112 }
114 -  
115 - public boolean validateUpdate(SnapshotUpdate update, TriggerRuleState state) {  
116 - if (update != null) {  
117 - //Check that the update type and that keys match.  
118 - if (update.getType().equals(AlarmConditionKeyType.TIME_SERIES)) {  
119 - return state.validateTsUpdate(update.getKeys());  
120 - } else if (update.getType().equals(AlarmConditionKeyType.ATTRIBUTE)) {  
121 - return state.validateAttrUpdate(update.getKeys());  
122 - }  
123 - }  
124 - return true; 113 + AlarmEvalResult evalResult = evalFunction.apply(ruleState, data);
  114 + if (AlarmEvalResult.TRUE.equals(evalResult)) {
  115 + stateUpdate = true;
  116 + clearAlarmState(stateUpdate, ruleState);
  117 + } else if (AlarmEvalResult.FALSE.equals(evalResult)) {
  118 + clearAlarmState(stateUpdate, ruleState);
125 } 119 }
  120 + return stateUpdate;
  121 + }
126 122
127 -  
128 - protected void setAlarmConditionMetadata(TriggerRuleState ruleState, TbMsgMetaData metaData) {  
129 - if (ruleState.getSpec().getType() == AlarmConditionSpecType.REPEATING) {  
130 - metaData.putValue(DataConstants.ALARM_CONDITION_REPEATS, String.valueOf(ruleState.getState().getEventCount()));  
131 - }  
132 - if (ruleState.getSpec().getType() == AlarmConditionSpecType.DURATION) {  
133 - metaData.putValue(DataConstants.ALARM_CONDITION_DURATION, String.valueOf(ruleState.getState().getDuration()));  
134 - } 123 + public boolean clearAlarmState(boolean stateUpdate, TriggerRuleState state) {
  124 + if (state != null) {
  125 + state.clear();
  126 + stateUpdate |= state.checkUpdate();
135 } 127 }
136 -  
137 -  
138 - public void processAckAlarm(Alarm alarm) {  
139 - if (currentAlarm != null && currentAlarm.getId().equals(alarm.getId())) {  
140 - currentAlarm.setStatus(alarm.getStatus());  
141 - currentAlarm.setAckTs(alarm.getAckTs());  
142 - } 128 + return stateUpdate;
  129 + }
  130 +
  131 + public boolean validateUpdate(SnapshotUpdate update, TriggerRuleState state) {
  132 + if (update != null) {
  133 + // Check that the update type and that keys match.
  134 + if (update.getType().equals(AlarmConditionKeyType.TIME_SERIES)) {
  135 + return state.validateTsUpdate(update.getKeys());
  136 + } else if (update.getType().equals(AlarmConditionKeyType.ATTRIBUTE)) {
  137 + return state.validateAttrUpdate(update.getKeys());
  138 + }
143 } 139 }
144 -  
145 -  
146 - protected SnapshotUpdate processAttributes(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {  
147 - Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData()));  
148 - if (!attributes.isEmpty()) {  
149 - return merge(attributes);  
150 - }  
151 - return null; 140 + return true;
  141 + }
  142 +
  143 + protected void setAlarmConditionMetadata(TriggerRuleState ruleState, TbMsgMetaData metaData) {
  144 + if (ruleState.getSpec().getType() == AlarmConditionSpecType.REPEATING) {
  145 + metaData.putValue(
  146 + DataConstants.ALARM_CONDITION_REPEATS,
  147 + String.valueOf(ruleState.getState().getEventCount()));
152 } 148 }
153 -  
154 - protected SnapshotUpdate processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {  
155 - Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToSortedTelemetry(new JsonParser().parse(msg.getData()), msg.getMetaDataTs());  
156 - for (Map.Entry<Long, List<KvEntry>> entry : tsKvMap.entrySet()) {  
157 - Long ts = entry.getKey();  
158 - List<KvEntry> data = entry.getValue();  
159 - return merge(ts, data);  
160 - }  
161 -  
162 - return null; 149 + if (ruleState.getSpec().getType() == AlarmConditionSpecType.DURATION) {
  150 + metaData.putValue(
  151 + DataConstants.ALARM_CONDITION_DURATION,
  152 + String.valueOf(ruleState.getState().getDuration()));
163 } 153 }
  154 + }
164 155
165 -  
166 - private DataSnapshot fetchLatestValues(TbContext ctx, String originator) throws ExecutionException, InterruptedException {  
167 - DataSnapshot result = new DataSnapshot(entityKeys);  
168 - addEntityKeysToSnapshot(ctx, originator, result);  
169 - return result; 156 + public void processAckAlarm(Alarm alarm) {
  157 + if (currentAlarm != null && currentAlarm.getId().equals(alarm.getId())) {
  158 + currentAlarm.setStatus(alarm.getStatus());
  159 + currentAlarm.setAckTs(alarm.getAckTs());
170 } 160 }
171 -  
172 - private void addEntityKeysToSnapshot(TbContext ctx, String originator, DataSnapshot result) throws InterruptedException, ExecutionException {  
173 - Set<String> attributeKeys = new HashSet<>();  
174 - Set<String> latestTsKeys = new HashSet<>();  
175 -  
176 - Device device = null;  
177 - for (AlarmConditionFilterKey entityKey : entityKeys) {  
178 - String key = entityKey.getKey();  
179 - switch (entityKey.getType()) {  
180 - case ATTRIBUTE:  
181 - attributeKeys.add(key);  
182 - break;  
183 - case TIME_SERIES:  
184 - latestTsKeys.add(key);  
185 - break;  
186 - case ENTITY_FIELD:  
187 - if (device == null) {  
188 - device = ctx.getDeviceService().findDeviceById(ctx.getTenantId(), new DeviceId(UUID.fromString(originator)));  
189 - }  
190 - if (device != null) {  
191 - switch (key) {  
192 - case EntityKeyMapping.NAME:  
193 - result.putValue(entityKey, device.getCreatedTime(), EntityKeyValue.fromString(device.getName()));  
194 - break;  
195 - case EntityKeyMapping.TYPE:  
196 - result.putValue(entityKey, device.getCreatedTime(), EntityKeyValue.fromString(device.getType()));  
197 - break;  
198 - case EntityKeyMapping.CREATED_TIME:  
199 - result.putValue(entityKey, device.getCreatedTime(), EntityKeyValue.fromLong(device.getCreatedTime()));  
200 - break;  
201 - case EntityKeyMapping.LABEL:  
202 - result.putValue(entityKey, device.getCreatedTime(), EntityKeyValue.fromString(device.getLabel()));  
203 - break;  
204 - }  
205 - }  
206 - break;  
207 - }  
208 - }  
209 -  
210 - if (!latestTsKeys.isEmpty()) {  
211 - List<TsKvEntry> data = ctx.getTimeseriesService().findLatest(ctx.getTenantId(), new DeviceId(UUID.fromString(originator)), latestTsKeys).get();  
212 - for (TsKvEntry entry : data) {  
213 - if (entry.getValue() != null) {  
214 - result.putValue(new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, entry.getKey()), entry.getTs(), toEntityValue(entry));  
215 - }  
216 - }  
217 - }  
218 - if (!attributeKeys.isEmpty()) {  
219 - addToSnapshot(result, ctx.getAttributesService().find(ctx.getTenantId(), new DeviceId(UUID.fromString(originator)), DataConstants.CLIENT_SCOPE, attributeKeys).get());  
220 - addToSnapshot(result, ctx.getAttributesService().find(ctx.getTenantId(), new DeviceId(UUID.fromString(originator)), DataConstants.SHARED_SCOPE, attributeKeys).get());  
221 - addToSnapshot(result, ctx.getAttributesService().find(ctx.getTenantId(), new DeviceId(UUID.fromString(originator)), DataConstants.SERVER_SCOPE, attributeKeys).get());  
222 - } 161 + }
  162 +
  163 + protected SnapshotUpdate processAttributes(TbContext ctx, TbMsg msg)
  164 + throws ExecutionException, InterruptedException {
  165 + Set<AttributeKvEntry> attributes =
  166 + JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData()));
  167 + if (!attributes.isEmpty()) {
  168 + return merge(attributes);
  169 + }
  170 + return null;
  171 + }
  172 +
  173 + protected SnapshotUpdate processTelemetry(TbContext ctx, TbMsg msg)
  174 + throws ExecutionException, InterruptedException {
  175 + Map<Long, List<KvEntry>> tsKvMap =
  176 + JsonConverter.convertToSortedTelemetry(
  177 + new JsonParser().parse(msg.getData()), msg.getMetaDataTs());
  178 + for (Map.Entry<Long, List<KvEntry>> entry : tsKvMap.entrySet()) {
  179 + Long ts = entry.getKey();
  180 + List<KvEntry> data = entry.getValue();
  181 + return merge(ts, data);
223 } 182 }
224 183
225 - private void addToSnapshot(DataSnapshot snapshot, List<AttributeKvEntry> data) {  
226 - for (AttributeKvEntry entry : data) {  
227 - if (entry.getValue() != null) {  
228 - EntityKeyValue value = toEntityValue(entry);  
229 - snapshot.putValue(new AlarmConditionFilterKey(AlarmConditionKeyType.ATTRIBUTE, entry.getKey()), entry.getLastUpdateTs(), value); 184 + return null;
  185 + }
  186 +
  187 + private DataSnapshot fetchLatestValues(TbContext ctx, String originator)
  188 + throws ExecutionException, InterruptedException {
  189 + DataSnapshot result = new DataSnapshot(entityKeys);
  190 + addEntityKeysToSnapshot(ctx, originator, result);
  191 + return result;
  192 + }
  193 +
  194 + private void addEntityKeysToSnapshot(TbContext ctx, String originator, DataSnapshot result)
  195 + throws InterruptedException, ExecutionException {
  196 + Set<String> attributeKeys = new HashSet<>();
  197 + Set<String> latestTsKeys = new HashSet<>();
  198 +
  199 + Device device = null;
  200 + for (AlarmConditionFilterKey entityKey : entityKeys) {
  201 + String key = entityKey.getKey();
  202 + switch (entityKey.getType()) {
  203 + case ATTRIBUTE:
  204 + attributeKeys.add(key);
  205 + break;
  206 + case TIME_SERIES:
  207 + latestTsKeys.add(key);
  208 + break;
  209 + case ENTITY_FIELD:
  210 + if (device == null) {
  211 + device =
  212 + ctx.getDeviceService()
  213 + .findDeviceById(ctx.getTenantId(), new DeviceId(UUID.fromString(originator)));
  214 + }
  215 + if (device != null) {
  216 + switch (key) {
  217 + case EntityKeyMapping.NAME:
  218 + result.putValue(
  219 + entityKey,
  220 + device.getCreatedTime(),
  221 + EntityKeyValue.fromString(device.getName()));
  222 + break;
  223 + case EntityKeyMapping.TYPE:
  224 + result.putValue(
  225 + entityKey,
  226 + device.getCreatedTime(),
  227 + EntityKeyValue.fromString(device.getType()));
  228 + break;
  229 + case EntityKeyMapping.CREATED_TIME:
  230 + result.putValue(
  231 + entityKey,
  232 + device.getCreatedTime(),
  233 + EntityKeyValue.fromLong(device.getCreatedTime()));
  234 + break;
  235 + case EntityKeyMapping.LABEL:
  236 + result.putValue(
  237 + entityKey,
  238 + device.getCreatedTime(),
  239 + EntityKeyValue.fromString(device.getLabel()));
  240 + break;
230 } 241 }
231 - } 242 + }
  243 + break;
  244 + }
232 } 245 }
233 246
234 - public static EntityKeyValue toEntityValue(KvEntry entry) {  
235 - switch (entry.getDataType()) {  
236 - case STRING:  
237 - return EntityKeyValue.fromString(entry.getStrValue().get());  
238 - case LONG:  
239 - return EntityKeyValue.fromLong(entry.getLongValue().get());  
240 - case DOUBLE:  
241 - return EntityKeyValue.fromDouble(entry.getDoubleValue().get());  
242 - case BOOLEAN:  
243 - return EntityKeyValue.fromBool(entry.getBooleanValue().get());  
244 - case JSON:  
245 - return EntityKeyValue.fromJson(entry.getJsonValue().get());  
246 - default:  
247 - throw new RuntimeException("Can't parse entry: " + entry.getDataType()); 247 + if (!latestTsKeys.isEmpty()) {
  248 + List<TsKvEntry> data =
  249 + ctx.getTimeseriesService()
  250 + .findLatest(
  251 + ctx.getTenantId(), new DeviceId(UUID.fromString(originator)), latestTsKeys)
  252 + .get();
  253 + for (TsKvEntry entry : data) {
  254 + if (entry.getValue() != null) {
  255 + result.putValue(
  256 + new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, entry.getKey()),
  257 + entry.getTs(),
  258 + toEntityValue(entry));
248 } 259 }
  260 + }
249 } 261 }
250 -  
251 - private SnapshotUpdate merge(Long newTs, List<KvEntry> data) {  
252 - Set<AlarmConditionFilterKey> keys = new HashSet<>();  
253 - for (KvEntry entry : data) {  
254 - AlarmConditionFilterKey entityKey = new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, entry.getKey());  
255 - if (latestValues.putValue(entityKey, newTs, toEntityValue(entry))) {  
256 - keys.add(entityKey);  
257 - }  
258 - }  
259 - latestValues.setTs(newTs);  
260 - return new SnapshotUpdate(AlarmConditionKeyType.TIME_SERIES, keys); 262 + if (!attributeKeys.isEmpty()) {
  263 + addToSnapshot(
  264 + result,
  265 + ctx.getAttributesService()
  266 + .find(
  267 + ctx.getTenantId(),
  268 + new DeviceId(UUID.fromString(originator)),
  269 + DataConstants.CLIENT_SCOPE,
  270 + attributeKeys)
  271 + .get());
  272 + addToSnapshot(
  273 + result,
  274 + ctx.getAttributesService()
  275 + .find(
  276 + ctx.getTenantId(),
  277 + new DeviceId(UUID.fromString(originator)),
  278 + DataConstants.SHARED_SCOPE,
  279 + attributeKeys)
  280 + .get());
  281 + addToSnapshot(
  282 + result,
  283 + ctx.getAttributesService()
  284 + .find(
  285 + ctx.getTenantId(),
  286 + new DeviceId(UUID.fromString(originator)),
  287 + DataConstants.SERVER_SCOPE,
  288 + attributeKeys)
  289 + .get());
261 } 290 }
262 -  
263 - private SnapshotUpdate merge(Set<AttributeKvEntry> attributes) {  
264 - long newTs = 0;  
265 - Set<AlarmConditionFilterKey> keys = new HashSet<>();  
266 - for (AttributeKvEntry entry : attributes) {  
267 - newTs = Math.max(newTs, entry.getLastUpdateTs());  
268 - AlarmConditionFilterKey entityKey = new AlarmConditionFilterKey(AlarmConditionKeyType.ATTRIBUTE, entry.getKey());  
269 - if (latestValues.putValue(entityKey, newTs, toEntityValue(entry))) {  
270 - keys.add(entityKey);  
271 - }  
272 - }  
273 - latestValues.setTs(newTs);  
274 - return new SnapshotUpdate(AlarmConditionKeyType.ATTRIBUTE, keys); 291 + }
  292 +
  293 + private void addToSnapshot(DataSnapshot snapshot, List<AttributeKvEntry> data) {
  294 + for (AttributeKvEntry entry : data) {
  295 + if (entry.getValue() != null) {
  296 + EntityKeyValue value = toEntityValue(entry);
  297 + snapshot.putValue(
  298 + new AlarmConditionFilterKey(AlarmConditionKeyType.ATTRIBUTE, entry.getKey()),
  299 + entry.getLastUpdateTs(),
  300 + value);
  301 + }
275 } 302 }
276 -  
277 - 303 + }
  304 +
  305 + public static EntityKeyValue toEntityValue(KvEntry entry) {
  306 + switch (entry.getDataType()) {
  307 + case STRING:
  308 + return EntityKeyValue.fromString(entry.getStrValue().get());
  309 + case LONG:
  310 + return EntityKeyValue.fromLong(entry.getLongValue().get());
  311 + case DOUBLE:
  312 + return EntityKeyValue.fromDouble(entry.getDoubleValue().get());
  313 + case BOOLEAN:
  314 + return EntityKeyValue.fromBool(entry.getBooleanValue().get());
  315 + case JSON:
  316 + return EntityKeyValue.fromJson(entry.getJsonValue().get());
  317 + default:
  318 + throw new RuntimeException("Can't parse entry: " + entry.getDataType());
  319 + }
  320 + }
  321 +
  322 + private SnapshotUpdate merge(Long newTs, List<KvEntry> data) {
  323 + Set<AlarmConditionFilterKey> keys = new HashSet<>();
  324 + for (KvEntry entry : data) {
  325 + AlarmConditionFilterKey entityKey =
  326 + new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, entry.getKey());
  327 + if (latestValues.putValue(entityKey, newTs, toEntityValue(entry))) {
  328 + keys.add(entityKey);
  329 + }
  330 + }
  331 + latestValues.setTs(newTs);
  332 + return new SnapshotUpdate(AlarmConditionKeyType.TIME_SERIES, keys);
  333 + }
  334 +
  335 + private SnapshotUpdate merge(Set<AttributeKvEntry> attributes) {
  336 + long newTs = 0;
  337 + Set<AlarmConditionFilterKey> keys = new HashSet<>();
  338 + for (AttributeKvEntry entry : attributes) {
  339 + newTs = Math.max(newTs, entry.getLastUpdateTs());
  340 + AlarmConditionFilterKey entityKey =
  341 + new AlarmConditionFilterKey(AlarmConditionKeyType.ATTRIBUTE, entry.getKey());
  342 + if (latestValues.putValue(entityKey, newTs, toEntityValue(entry))) {
  343 + keys.add(entityKey);
  344 + }
  345 + }
  346 + latestValues.setTs(newTs);
  347 + return new SnapshotUpdate(AlarmConditionKeyType.ATTRIBUTE, keys);
  348 + }
278 } 349 }