Commit 5030194b0942b4b5a90c6a6004657221779c7153

Authored by xp.Huang
1 parent 41c9cf58

fix: 解决设备更换产品后的缓存问题

... ... @@ -7,7 +7,7 @@ import java.util.List;
7 7
8 8 @Data
9 9 public class BatchDeviceUpdateDTO {
10   - @ApiModelProperty(value = "批量删除设备ids(tbDeviceId)")
  10 + @ApiModelProperty(value = "批量更新设备ids(tbDeviceId)")
11 11 private List<String> deviceIds;
12 12
13 13 @ApiModelProperty(value = "产品ID(tbDeviceProfileId)")
... ...
... ... @@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.id.EntityId;
21 21 import org.thingsboard.server.common.data.id.TenantId;
22 22 import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
23 23 import org.thingsboard.server.common.data.yunteng.constant.ModelConstants;
  24 +import org.thingsboard.server.common.data.yunteng.core.cache.CacheUtils;
24 25 import org.thingsboard.server.common.data.yunteng.core.exception.TkDataValidationException;
25 26 import org.thingsboard.server.common.data.yunteng.core.message.ErrorMessage;
26 27 import org.thingsboard.server.common.data.yunteng.dto.*;
... ... @@ -56,9 +57,11 @@ public class TkDeviceServiceImpl extends AbstractBaseService<DeviceMapper, TkDev
56 57 private final ConvertConfigService convertConfigService;
57 58 private final TkUserCollectService userCollectService;
58 59 private final TkDeviceProfileService tkDeviceProfileService;
  60 + private final CacheUtils cacheUtils;
  61 + private final String cacheName = FastIotConstants.CacheConfigKey.SCENE_REACT;
59 62 @Override
60 63 @Transactional
61   - @CacheEvict(cacheNames = FastIotConstants.CacheConfigKey.SCENE_REACT,key = "{#tenantId, #deviceDTO.profileId}")
  64 + @CacheEvict(cacheNames = cacheName,key = "{#tenantId, #deviceDTO.profileId}")
62 65 public DeviceDTO insertOrUpdate(String tenantId, DeviceDTO deviceDTO) {
63 66
64 67 if (StringUtils.isBlank(deviceDTO.getId())) {
... ... @@ -71,7 +74,12 @@ public class TkDeviceServiceImpl extends AbstractBaseService<DeviceMapper, TkDev
71 74 private DeviceDTO update(DeviceDTO deviceDTO) {
72 75
73 76 validateUpdate(deviceDTO);
74   -
  77 + TkDeviceEntity entity = baseMapper.selectOne(new LambdaQueryWrapper<TkDeviceEntity>().eq(TkDeviceEntity::getTenantId,
  78 + deviceDTO.getTenantId()).eq(TkDeviceEntity::getId,deviceDTO.getId()));
  79 + if(null != entity && !entity.getProfileId().equals(deviceDTO.getProfileId())){
  80 + //更改了产品,需将原产品的缓存置为无效
  81 + cacheUtils.invalidate(cacheName,deviceDTO.getTenantId()+","+entity.getProfileId());
  82 + }
75 83 TkDeviceEntity device = new TkDeviceEntity();
76 84 deviceDTO.copyToEntity(
77 85 device,
... ... @@ -719,25 +727,53 @@ public class TkDeviceServiceImpl extends AbstractBaseService<DeviceMapper, TkDev
719 727 return baseMapper.findDeviceInfo(tenantId, tbDeviceId);
720 728 }
721 729
722   - @Cacheable(
723   - cacheNames = FastIotConstants.CacheConfigKey.SCENE_REACT, key = "{#tenantId, #tbDeviceProfileId}")
724   - @Override
725   - public List<String> getDevicesByOrganizationIdAndProjectId(String tenantId, String organizationId, String tbDeviceProfileId) {
726   - List<String> orgIds = organizationService.organizationAllIds(tenantId, organizationId);
727 730
728   - List<TkDeviceEntity> organizationDevices =
729   - baseMapper.selectList(
730   - new LambdaQueryWrapper<TkDeviceEntity>().eq(TkDeviceEntity::getTenantId,tenantId)
731   - .eq(TkDeviceEntity::getProfileId, tbDeviceProfileId)
732   - .in(TkDeviceEntity::getOrganizationId, orgIds));
  731 + @Override
  732 + public List<String> getDevicesByOrganizationIdAndProjectId(String tenantId, String organizationId,
  733 + Map<String, List<String>> map) {
  734 + //不直接查询数据库的目的: 保证获取缓存的设备ID数据是正确的
733 735 List<String> allDevices = new ArrayList<>();
734   - if (organizationDevices != null && !organizationDevices.isEmpty()) {
735   - for (TkDeviceEntity item : organizationDevices) {
736   - allDevices.add(item.getTbDeviceId());
  736 + if(!map.isEmpty()){
  737 + List<String> orgIds = organizationService.organizationAllIds(tenantId, organizationId);
  738 + if(null !=orgIds && !orgIds.isEmpty()){
  739 + orgIds.stream().forEach(orgId->{
  740 + map.entrySet().stream().forEach(entry->{
  741 + if(entry.getKey().equals(orgId)){
  742 + allDevices.addAll(entry.getValue());
  743 + }
  744 + });
  745 + });
737 746 }
738 747 }
739 748 return allDevices;
740 749 }
  750 + @Cacheable(
  751 + cacheNames = cacheName, key = "{#tenantId, #tbDeviceProfileId}")
  752 + @Override
  753 + public Map<String, List<String>> getDeviceIdsByDeviceProfileId(String tenantId, String tbDeviceProfileId) {
  754 + Map<String, List<String>> result;
  755 + List<TkDeviceEntity> organizationDevices =
  756 + baseMapper.selectList(
  757 + new LambdaQueryWrapper<TkDeviceEntity>().eq(TkDeviceEntity::getTenantId,tenantId)
  758 + .eq(TkDeviceEntity::getProfileId, tbDeviceProfileId));
  759 + if(null !=organizationDevices && !organizationDevices.isEmpty()){
  760 + result = new HashMap<>();
  761 + organizationDevices.stream().forEach(entity->{
  762 + String organizationId = entity.getOrganizationId();
  763 + if(null == result.get(organizationId)){
  764 + result.put(organizationId,List.of(entity.getId()));
  765 + }else{
  766 + ArrayList<String> deviceIds = new ArrayList<>();
  767 + deviceIds.add(entity.getId());
  768 + deviceIds.addAll(result.get(organizationId));
  769 + result.put(organizationId,deviceIds);
  770 + }
  771 + });
  772 + } else {
  773 + result = null;
  774 + }
  775 + return result;
  776 + }
741 777
742 778 @Override
743 779 public ListenableFuture<List<DeviceDTO>> findDeviceListByDeviceProfileId(
... ... @@ -818,16 +854,18 @@ public class TkDeviceServiceImpl extends AbstractBaseService<DeviceMapper, TkDev
818 854 }
819 855 @Override
820 856 @Transactional
821   - @CacheEvict(cacheNames = FastIotConstants.CacheConfigKey.SCENE_REACT,key = "{#tenantId, #tbDeviceProfileId}")
  857 + @CacheEvict(cacheNames = cacheName,key = "{#tenantId, #tbDeviceProfileId}")
822 858 public boolean updateDeviceProfileByTbDeviceId(String tenantId, String tbDeviceId, String tbDeviceProfileId) {
823 859 boolean result = false;
824 860 TkDeviceEntity entity =
825 861 baseMapper.selectOne(new LambdaQueryWrapper<TkDeviceEntity>()
826 862 .eq(TkDeviceEntity::getTenantId, tenantId)
827 863 .eq(TkDeviceEntity::getTbDeviceId, tbDeviceId));
828   - if(null != entity){
  864 + if(null != entity && !entity.getProfileId().equals(tbDeviceProfileId)){
829 865 DeviceProfileDTO deviceProfileDTO = tkDeviceProfileService.findByTbDeviceProfileId(tenantId,tbDeviceProfileId);
830 866 if(null !=deviceProfileDTO){
  867 + //更改了产品,需将原产品的缓存置为无效
  868 + cacheUtils.invalidate(cacheName,tenantId+","+entity.getProfileId());
831 869 entity.setProfileId(tbDeviceProfileId);
832 870 entity.setDeviceProfileId(deviceProfileDTO.getId());
833 871 baseMapper.updateById(entity);
... ...
... ... @@ -211,10 +211,18 @@ public interface TkDeviceService extends BaseService<TkDeviceEntity> {
211 211 * 根据组织和产品ID查询对应的设备
212 212 * @param tenantId 租户ID
213 213 * @param organizationId 组织ID
214   - * @param projectId 产品ID
  214 + * @param map 组织设备关系列表
215 215 * @return 需要接收RPC控制的网关设备或直连设备的ID
216 216 */
217   - List<String> getDevicesByOrganizationIdAndProjectId(String tenantId, String organizationId, String projectId);
  217 + List<String> getDevicesByOrganizationIdAndProjectId(String tenantId, String organizationId, Map<String, List<String>> map);
  218 +
  219 + /**
  220 + * 根据tbDeviceProfileId查询所有的组织与设备关系
  221 + * @param tenantId 租户ID
  222 + * @param tbDeviceProfileId 设备配置ID
  223 + * @return 组织ID作为key,设备ID列表作为value
  224 + */
  225 + Map<String, List<String>> getDeviceIdsByDeviceProfileId(String tenantId, String tbDeviceProfileId);
218 226
219 227 ListenableFuture<List<DeviceDTO>> findDeviceListByDeviceProfileId(String deviceProfileId,String tenantId);
220 228
... ...
... ... @@ -69,7 +69,7 @@ class ReactState {
69 69 private final TkNoticeService noticeService;
70 70 private final CacheManager cacheManager;
71 71 private final Cache cache;
72   - private TkDeviceService ytDeviceService;
  72 + private TkDeviceService tkDeviceService;
73 73 private String triggerDevicesKey;
74 74
75 75 private String currentAlarmsKey;
... ... @@ -93,7 +93,7 @@ class ReactState {
93 93 }
94 94 this.cacheManager = SpringBeanUtils.getBean(CacheManager.class);
95 95 this.noticeService = SpringBeanUtils.getBean(TkNoticeService.class);
96   - this.ytDeviceService = SpringBeanUtils.getBean(TkDeviceService.class);
  96 + this.tkDeviceService = SpringBeanUtils.getBean(TkDeviceService.class);
97 97 this.cache = cacheManager.getCache(FastIotConstants.CacheConfigKey.SCENE_REACT);
98 98 this.triggerDevicesKey = reactId+ "_triggerDevices";
99 99 this.currentAlarmsKey = reactId+ "_currentAlarms";
... ... @@ -186,7 +186,7 @@ class ReactState {
186 186 String tkProjectId = trigger.getDeviceProfileId();
187 187 if (ScopeEnum.ALL.equals(entityType)) {
188 188 trifggerDevices =
189   - ytDeviceService.findTbDeviceIdsByDeviceProfileId(
  189 + tkDeviceService.findTbDeviceIdsByDeviceProfileId(
190 190 tkProjectId, trigger.getTenantId());
191 191 }
192 192 AtomicReference<String> matchKey = new AtomicReference<>("");
... ... @@ -287,7 +287,7 @@ class ReactState {
287 287 String tkProjectId = condition.getDeviceProfileId();
288 288 if (ScopeEnum.ALL.equals(entityType)) {
289 289 conditionDevices =
290   - ytDeviceService.findTbDeviceIdsByDeviceProfileId(
  290 + tkDeviceService.findTbDeviceIdsByDeviceProfileId(
291 291 tkProjectId, condition.getTenantId());
292 292 }
293 293 AtomicBoolean oneConditionResult = new AtomicBoolean(true);
... ... @@ -433,8 +433,9 @@ class ReactState {
433 433 List<String> rpcDevices = action.getDeviceId();
434 434 if (ScopeEnum.ALL.equals(action.getEntityType())) {
435 435 rpcDevices =
436   - ytDeviceService.getDevicesByOrganizationIdAndProjectId(action.getTenantId(), orgId,
437   - msg.getMetaData().getValue("deviceProfileId"));
  436 + tkDeviceService.getDevicesByOrganizationIdAndProjectId(action.getTenantId(), orgId,
  437 + tkDeviceService.getDeviceIdsByDeviceProfileId(action.getTenantId(),
  438 + msg.getMetaData().getValue("deviceProfileId")));
438 439 }
439 440 rpcMsg(ctx, msg, rpcDevices, action.getDoContext(), action.getCallType());
440 441 break;
... ... @@ -517,7 +518,7 @@ class ReactState {
517 518 && currentAlarm.getSeverity().equals(oldAlarm.getSeverity())) {
518 519 return;
519 520 }
520   - ytDeviceService.freshAlarmStatus(entityId, 1);
  521 + tkDeviceService.freshAlarmStatus(entityId, 1);
521 522 currentAlarms.put(deviceId, currentAlarm);
522 523 alarmMsg(ctx, msg, currentAlarm, "Alarm Cleared");
523 524 if(action.getAlarmProfileId().isEmpty()){
... ... @@ -550,7 +551,7 @@ class ReactState {
550 551 currentAlarms.get(deviceId).getId(),
551 552 null,
552 553 System.currentTimeMillis());
553   - ytDeviceService.freshAlarmStatus(new DeviceId(UUID.fromString(deviceId)), 0);
  554 + tkDeviceService.freshAlarmStatus(new DeviceId(UUID.fromString(deviceId)), 0);
554 555 alarmMsg(ctx, msg, currentAlarms.get(deviceId), "Alarm Cleared");
555 556 currentAlarms.remove(deviceId);
556 557 }
... ...
... ... @@ -102,7 +102,8 @@ public class TbSceneReactNode implements TbNode {
102 102 String scenId = t.getScenId();
103 103 List<String> orgDevices =
104 104 ctx.getTkDeviceService()
105   - .getDevicesByOrganizationIdAndProjectId(tenantId, config.getOrgs().get(scenId), tbProfileId);
  105 + .getDevicesByOrganizationIdAndProjectId(tenantId, config.getOrgs().get(scenId),
  106 + ctx.getTkDeviceService().getDeviceIdsByDeviceProfileId(tenantId,tbProfileId));
106 107 Optional.ofNullable(orgDevices)
107 108 .ifPresent(
108 109 f -> {
... ...