Commit d473f1c9fbdabb6e15e243bfa34350b7321c6409

Authored by xp.Huang
1 parent a0cda70c

fix: 调整场景联动逻辑不使用anyMatch或allMatch

... ... @@ -10,6 +10,7 @@ import lombok.RequiredArgsConstructor;
10 10 import lombok.extern.slf4j.Slf4j;
11 11 import org.apache.commons.lang3.StringUtils;
12 12 import org.jetbrains.annotations.Nullable;
  13 +import org.springframework.cache.annotation.CacheEvict;
13 14 import org.springframework.cache.annotation.Cacheable;
14 15 import org.springframework.stereotype.Service;
15 16 import org.springframework.transaction.annotation.Transactional;
... ... @@ -27,7 +28,6 @@ import org.thingsboard.server.common.data.yunteng.dto.convert.ConvertConfigDTO;
27 28 import org.thingsboard.server.common.data.yunteng.dto.convert.ConvertDevicesDTO;
28 29 import org.thingsboard.server.common.data.yunteng.dto.convert.DatasourceContentDTO;
29 30 import org.thingsboard.server.common.data.yunteng.enums.*;
30   -import org.thingsboard.server.common.data.yunteng.utils.ReflectUtils;
31 31 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData;
32 32 import org.thingsboard.server.dao.device.DeviceProfileDao;
33 33 import org.thingsboard.server.dao.yunteng.entities.*;
... ... @@ -55,9 +55,10 @@ public class TkDeviceServiceImpl extends AbstractBaseService<DeviceMapper, TkDev
55 55 private final ThingsModelService thingsModelService;
56 56 private final ConvertConfigService convertConfigService;
57 57 private final TkUserCollectService userCollectService;
58   -
  58 + private final TkDeviceProfileService tkDeviceProfileService;
59 59 @Override
60 60 @Transactional
  61 + @CacheEvict(cacheNames = FastIotConstants.CacheConfigKey.SCENE_REACT,key = "{#tenantId, #deviceDTO.profileId}")
61 62 public DeviceDTO insertOrUpdate(String tenantId, DeviceDTO deviceDTO) {
62 63
63 64 if (StringUtils.isBlank(deviceDTO.getId())) {
... ... @@ -83,10 +84,36 @@ public class TkDeviceServiceImpl extends AbstractBaseService<DeviceMapper, TkDev
83 84 ModelConstants.TablePropertyMapping.CREATE_TIME,
84 85 ModelConstants.TablePropertyMapping.UPDATE,
85 86 ModelConstants.TablePropertyMapping.UPDATE_TIME);
  87 + //如果是网关设备,且网关修改了其组织
  88 + if(deviceDTO.getDeviceType().equals(DeviceTypeEnum.GATEWAY)){
  89 + updateSensorDeviceOrganization(deviceDTO.getTenantId(),deviceDTO.getTbDeviceId(),deviceDTO.getOrganizationId());
  90 + }
86 91 baseMapper.updateById(device);
87 92 return device.getDTO(DeviceDTO.class);
88 93 }
89 94
  95 + private void updateSensorDeviceOrganization(String tenantId,String getWayTbDeviceId,String newOrganizationId){
  96 + TkDeviceEntity entity = baseMapper.selectOne(new LambdaQueryWrapper<TkDeviceEntity>()
  97 + .eq(TkDeviceEntity::getTenantId,tenantId).eq(TkDeviceEntity::getTbDeviceId,getWayTbDeviceId));
  98 + //发生组织变更
  99 + if(null != entity && !entity.getOrganizationId().equals(newOrganizationId)){
  100 + //查询新组织下的所有组织
  101 + List<String> allOrganization = organizationService.organizationAllIds(tenantId,newOrganizationId);
  102 + //网关子设备没在该网关新组织下,则同步更新为网关的新组织
  103 + if(null != allOrganization && !allOrganization.isEmpty()){
  104 + List<TkDeviceEntity> list = baseMapper.selectList(new LambdaQueryWrapper<TkDeviceEntity>()
  105 + .eq(TkDeviceEntity::getTenantId,tenantId)
  106 + .eq(TkDeviceEntity::getGatewayId,getWayTbDeviceId)
  107 + .notIn(TkDeviceEntity::getOrganizationId,allOrganization));
  108 + if(!list.isEmpty()){
  109 + list.stream().forEach(obj->{
  110 + obj.setOrganizationId(newOrganizationId);
  111 + baseMapper.updateById(obj);
  112 + });
  113 + }
  114 + }
  115 + }
  116 + }
90 117 private void validateUpdate(DeviceDTO deviceDTO) {
91 118 if (StringUtils.isEmpty(deviceDTO.getName())) {
92 119 throw new TkDataValidationException("device name must be specific");
... ... @@ -693,25 +720,19 @@ public class TkDeviceServiceImpl extends AbstractBaseService<DeviceMapper, TkDev
693 720 }
694 721
695 722 @Cacheable(
696   - cacheNames = FastIotConstants.CacheConfigKey.SCENE_REACT,
697   - key = "{#tenantId, #organizationId, #projectId}")
  723 + cacheNames = FastIotConstants.CacheConfigKey.SCENE_REACT, key = "{#tenantId, #tbDeviceProfileId}")
698 724 @Override
699   - public List<String> rpcDevices(String tenantId, String organizationId, String projectId) {
  725 + public List<String> getDevicesByOrganizationIdAndProjectId(String tenantId, String organizationId, String tbDeviceProfileId) {
700 726 List<String> orgIds = organizationService.organizationAllIds(tenantId, organizationId);
701 727
702 728 List<TkDeviceEntity> organizationDevices =
703 729 baseMapper.selectList(
704   - new LambdaQueryWrapper<TkDeviceEntity>()
705   - .eq(TkDeviceEntity::getDeviceProfileId, projectId)
  730 + new LambdaQueryWrapper<TkDeviceEntity>().eq(TkDeviceEntity::getTenantId,tenantId)
  731 + .eq(TkDeviceEntity::getProfileId, tbDeviceProfileId)
706 732 .in(TkDeviceEntity::getOrganizationId, orgIds));
707 733 List<String> allDevices = new ArrayList<>();
708 734 if (organizationDevices != null && !organizationDevices.isEmpty()) {
709 735 for (TkDeviceEntity item : organizationDevices) {
710   - DeviceTypeEnum deviceType = item.getDeviceType();
711   - if (DeviceTypeEnum.SENSOR.equals(deviceType)) {
712   - allDevices.add(item.getGatewayId());
713   - continue;
714   - }
715 736 allDevices.add(item.getTbDeviceId());
716 737 }
717 738 }
... ... @@ -795,4 +816,24 @@ public class TkDeviceServiceImpl extends AbstractBaseService<DeviceMapper, TkDev
795 816 .eq(TkDeviceEntity::getGatewayId,gateWayId));
796 817 return Optional.ofNullable(entity).map(obj->obj.getTbDeviceId()).orElse(null);
797 818 }
  819 + @Override
  820 + @Transactional
  821 + @CacheEvict(cacheNames = FastIotConstants.CacheConfigKey.SCENE_REACT,key = "{#tenantId, #tbDeviceProfileId}")
  822 + public boolean updateDeviceProfileByTbDeviceId(String tenantId, String tbDeviceId, String tbDeviceProfileId) {
  823 + boolean result = false;
  824 + TkDeviceEntity entity =
  825 + baseMapper.selectOne(new LambdaQueryWrapper<TkDeviceEntity>()
  826 + .eq(TkDeviceEntity::getTenantId, tenantId)
  827 + .eq(TkDeviceEntity::getTbDeviceId, tbDeviceId));
  828 + if(null != entity){
  829 + DeviceProfileDTO deviceProfileDTO = tkDeviceProfileService.findByTbDeviceProfileId(tenantId,tbDeviceProfileId);
  830 + if(null !=deviceProfileDTO){
  831 + entity.setProfileId(tbDeviceProfileId);
  832 + entity.setDeviceProfileId(deviceProfileDTO.getId());
  833 + baseMapper.updateById(entity);
  834 + result = true;
  835 + }
  836 + }
  837 + return result;
  838 + }
798 839 }
... ...
... ... @@ -202,7 +202,7 @@ public class TkNoticeServiceImpl implements TkNoticeService {
202 202 .parallel()
203 203 .forEach(
204 204 item -> {
205   - if (!item.getEmail().isEmpty()) {
  205 + if (StringUtils.isNotEmpty(item.getEmail())) {
206 206 emailReceivers.add(item.getEmail());
207 207 }
208 208 });
... ...
... ... @@ -208,13 +208,13 @@ public interface TkDeviceService extends BaseService<TkDeviceEntity> {
208 208
209 209
210 210 /**
211   - * 需要接收RPC控制的设备ID
  211 + * 根据组织和产品ID查询对应的设备
212 212 * @param tenantId 租户ID
213 213 * @param organizationId 组织ID
214 214 * @param projectId 产品ID
215 215 * @return 需要接收RPC控制的网关设备或直连设备的ID
216 216 */
217   - List<String> rpcDevices(String tenantId,String organizationId,String projectId);
  217 + List<String> getDevicesByOrganizationIdAndProjectId(String tenantId, String organizationId, String projectId);
218 218
219 219 ListenableFuture<List<DeviceDTO>> findDeviceListByDeviceProfileId(String deviceProfileId,String tenantId);
220 220
... ... @@ -251,4 +251,13 @@ public interface TkDeviceService extends BaseService<TkDeviceEntity> {
251 251 * @return TBDeviceId
252 252 */
253 253 String getTbDeviceIdByDeviceNameAndGateWayId(String tenantId, String name,String gateWayId);
  254 +
  255 + /**
  256 + * 修改设备的产品ID(profileId)
  257 + * @param tenantId 租户ID
  258 + * @param tbDeviceId TB设备ID
  259 + * @param tbDeviceProfileId TB产品ID
  260 + * @return true成功 false失败
  261 + */
  262 + boolean updateDeviceProfileByTbDeviceId(String tenantId,String tbDeviceId,String tbDeviceProfileId);
254 263 }
... ...
... ... @@ -127,53 +127,51 @@ class ReactState {
127 127 calculateTrigger(ctx,msg,detail,triggerMatched,prefixId,deviceId);
128 128
129 129 /** 1、单个执行条件内全部设备满足条件才为true 2、多个执行条件全部执行条件满足条件才为true */
130   - AtomicBoolean conditionMatched = new AtomicBoolean(false);
131   - boolean allowExecuteCondition = triggerMatched.get() || triggerDevices.size() > FastIotConstants.MagicNumber.ZERO;
  130 + AtomicBoolean conditionMatched = new AtomicBoolean(true);
  131 + boolean allowExecuteCondition = triggerMatched.get() && triggerDevices.size() > FastIotConstants.MagicNumber.ZERO;
132 132 /** 执行条件执行 **/
133   - if(null!=conditions && !conditions.isEmpty()){
134   - calculateCondition(ctx,msg,conditionMatched,prefixId);
135   - }else{
136   - conditionMatched.set(true);
137   - }
  133 + calculateCondition(ctx,msg,conditionMatched,prefixId);
138 134
139 135 String msgData = msg.getData();
140 136 if (allowExecuteCondition && conditionMatched.get()) {
141 137 log.debug(String.format("设备【%s】的消息内容【%s】触发动作", deviceId, msgData));
  138 + ConcurrentHashMap<String, TriggerData> clonedMap =
  139 + actions.isEmpty()?null:SerializationUtils.clone(triggerDevices);
142 140 actions.forEach(
143 141 act -> {
144 142 if (ActionTypeEnum.MSG_NOTIFY.equals(act.getOutTarget())) {
145 143 /** 创建告警,只会以触发器的设备进行告警通知* */
146   - ConcurrentHashMap<String, TriggerData> clonedMap =
147   - SerializationUtils.clone(triggerDevices);
148 144 clonedMap.entrySet().stream()
149 145 .forEach(
150 146 triggerDataEntry -> {
151 147 String key = triggerDataEntry.getKey();
152 148 TriggerData triggerData = triggerDataEntry.getValue();
153   - noticeMsg(
154   - ctx,
155   - triggerData.getMsg(),
156   - act,
157   - key.split("_")[0],
158   - triggerData.getTriggerDetail(),
159   - triggerData.getMsg().getTs());
  149 + noticeMsg(ctx,triggerData.getMsg(),act,key.split("_")[0],triggerData.getTriggerDetail(),
  150 + triggerData.getMsg().getTs());
160 151 /** 通知完一个移除一个 */
161   - triggerDevices.remove(key);
162   - if(triggerDevices.isEmpty()){
163   - cache.evict(triggerDevicesKey);
164   - }else{
165   - cache.put(triggerDevicesKey,triggerDevices);
166   - }
  152 + updateDeviceTriggerCache(key,true);
167 153 });
168 154 } else {
169 155 pushMsg(ctx, msg, act);
170 156 }
171 157 });
  158 + clonedMap.entrySet().stream().forEach(triggerDataEntry-> updateDeviceTriggerCache(triggerDataEntry.getKey(),true));
172 159 } else {
173 160 ctx.tellSuccess(msg);
174 161 }
175 162 }
176 163
  164 + private void updateDeviceTriggerCache(String key, boolean needRemove)
  165 + {
  166 + if(needRemove){
  167 + triggerDevices.remove(key);
  168 + }
  169 + if(triggerDevices.isEmpty()){
  170 + cache.evict(triggerDevicesKey);
  171 + }else{
  172 + cache.put(triggerDevicesKey,triggerDevices);
  173 + }
  174 + }
177 175 private AtomicBoolean calculateTrigger(TbContext ctx, TbMsg msg,ObjectNode detail,AtomicBoolean triggerMatched,
178 176 String prefixId, String deviceId)
179 177 {
... ... @@ -192,40 +190,20 @@ class ReactState {
192 190 tkProjectId, trigger.getTenantId());
193 191 }
194 192 AtomicReference<String> matchKey = new AtomicReference<>("");
195   - boolean matched =
196   - trifggerDevices.stream()
197   - .anyMatch(
  193 + AtomicBoolean matched = new AtomicBoolean(false);
  194 + trifggerDevices.stream()
  195 + .forEach(
198 196 devId -> {
199 197 TriggerState triggerState =
200 198 getOrCreateTriggerState(trigger, tkProjectId, devId);
201 199 if (triggerState == null) {
202   - return false;
  200 + return;
203 201 }
204 202 boolean triggerResult;
205 203 try {
206   - boolean fresh = false;
207   - String originatorId;
208   - if(msg.getType().equals(SessionMsgType.POST_EVENT_REQUEST.name())){
209   - originatorId = msg.getMetaData().getValue("deviceId");
210   - }else{
211   - originatorId = msg.getOriginator().getId().toString();
212   - }
213   - if (trigger.getId().equals(prefixId)
214   - && originatorId.equals(devId)) {
215   - fresh = true;
216   - }
217   - triggerResult = triggerState.process(ctx, msg,fresh);
218   - if(fresh){
219   - String triggerKey = deviceId + "_" + matchKey.get();
220   - if(triggerResult){
221   - //放入满足的触发器信息,设备ID+触发关键字
222   - triggerDevices.put(triggerKey,
223   - new TriggerData(msg,detail));
224   - cache.put(triggerDevicesKey,triggerDevices);
225   - }else{
226   - triggerDevices.remove(triggerKey);
227   - }
228   - }
  204 + boolean isCurrentDeviceTrigger = isCurrentDeviceTrigger(msg,
  205 + trigger.getId(),prefixId,devId);
  206 + triggerResult = triggerState.process(ctx, msg,isCurrentDeviceTrigger);
229 207 ObjectNode result = triggerState.getRuleState().getDetailInform();
230 208 log.debug(
231 209 String.format(
... ... @@ -235,7 +213,7 @@ class ReactState {
235 213 devId,
236 214 msg.getOriginator(),
237 215 msg.getData()));
238   - if (!result.isEmpty() && triggerResult && fresh) {
  216 + if (!result.isEmpty() && triggerResult && isCurrentDeviceTrigger) {
239 217 ObjectNode triggerData = JacksonUtil.newObjectNode();
240 218 triggerData.put(FastIotConstants.Scene.TRIGGER_DATA,result);
241 219 detail.set(devId,triggerData);
... ... @@ -247,78 +225,117 @@ class ReactState {
247 225 clearAlarm(ctx, msg, deviceId, entityKey.getKey());
248 226 }
249 227 }
  228 + if(isCurrentDeviceTrigger){
  229 + String triggerKey = deviceId + "_" + matchKey.get();
  230 + if(triggerResult){
  231 + //放入满足的触发器信息,设备ID+触发关键字
  232 + triggerDevices.put(triggerKey,
  233 + new TriggerData(msg,detail));
  234 + }else{
  235 + triggerDevices.remove(triggerKey);
  236 + }
  237 + updateDeviceTriggerCache(triggerKey,false);
  238 + }
250 239 } catch (ExecutionException e) {
251 240 throw new RuntimeException(e);
252 241 } catch (InterruptedException e) {
253 242 throw new RuntimeException(e);
254 243 }
255   - return triggerResult;
  244 + if(triggerResult){
  245 + matched.set(true);
  246 + }
256 247 });
257   - if(matched){
258   - triggerMatched.set(matched);
  248 + if(matched.get()){
  249 + triggerMatched.set(matched.get());
259 250 }
260 251 });
261 252 });
262 253 return triggerMatched;
263 254 }
  255 +
  256 +
  257 + /**
  258 + * 根据当前设备的ID和触发器ID与场景联动里面的触发器ID进行比较,判断当前设备是否与触发条件里面设备匹配
  259 + * @param msg 设备消息
  260 + * @param triggerId 场景联动里面的触发器ID
  261 + * @param currentTriggerId 当前触发器ID
  262 + * @param currentDeviceId 当前设备ID
  263 + * @return true匹配 false不匹配
  264 + */
  265 + private boolean isCurrentDeviceTrigger(TbMsg msg,String triggerId,String currentTriggerId,String currentDeviceId){
  266 + boolean isCurrentDeviceTrigger = false;
  267 + String originatorId;
  268 + if(msg.getType().equals(SessionMsgType.POST_EVENT_REQUEST.name())){
  269 + originatorId = msg.getMetaData().getValue("deviceId");
  270 + }else{
  271 + originatorId = msg.getOriginator().getId().toString();
  272 + }
  273 + if (triggerId.equals(currentTriggerId)
  274 + && originatorId.equals(currentDeviceId)) {
  275 + isCurrentDeviceTrigger = true;
  276 + }
  277 + return isCurrentDeviceTrigger;
  278 + }
264 279 private AtomicBoolean calculateCondition(TbContext ctx, TbMsg msg,AtomicBoolean conditionMatched,String prefixId){
265 280 Optional.ofNullable(conditions)
266 281 .ifPresent(
267   - all -> {
268   - conditionMatched.set(
269   - all.stream()
270   - .allMatch(
271   - condition -> {
272   - ScopeEnum entityType = condition.getEntityType();
273   - List<String> conditionDevices = condition.getEntityId();
274   - String tkProjectId = condition.getDeviceProfileId();
275   - if (ScopeEnum.ALL.equals(entityType)) {
276   - conditionDevices =
277   - ytDeviceService.findTbDeviceIdsByDeviceProfileId(
278   - tkProjectId, condition.getTenantId());
279   - }
280   -
281   - return conditionDevices.stream()
282   - .allMatch(
283   - devId -> {
284   - TriggerState conditionState =
285   - getOrCreateConditionState(condition, tkProjectId, devId);
286   - boolean conditionResult;
287   - try {
288   - String originatorId;
289   - boolean fresh = false;
290   - if(msg.getType().equals(SessionMsgType.POST_EVENT_REQUEST.name())){
291   - originatorId = msg.getMetaData().getValue("deviceId");
292   - }else{
293   - originatorId = msg.getOriginator().getId().toString();
294   - }
295   - if (condition.getId().equals(prefixId)
296   - && originatorId.equals(devId)) {
297   - fresh = true;
298   - }
299   - conditionResult = conditionState.process(ctx, msg,fresh);
300   - ObjectNode result = conditionState.getRuleState().getDetailInform();
301   - log.debug(
302   - String.format(
303   - "执行器【%s】结果【%s】执行器设备【%s】数据设备【%s】数据内容【%s】",
304   - condition.getId(),
305   - result,
306   - devId,
307   - msg.getOriginator(),
308   - msg.getData()));
309   - if (!result.isEmpty() && conditionResult) {
310   - //更新触发器的详情
311   - updateTriggerDetail(devId,result);
312   - }
313   - } catch (ExecutionException e) {
314   - throw new RuntimeException(e);
315   - } catch (InterruptedException e) {
316   - throw new RuntimeException(e);
317   - }
318   - return conditionResult;
319   - });
320   - }));
321   - });
  282 + all -> all.stream()
  283 + .forEach(
  284 + condition -> {
  285 + ScopeEnum entityType = condition.getEntityType();
  286 + List<String> conditionDevices = condition.getEntityId();
  287 + String tkProjectId = condition.getDeviceProfileId();
  288 + if (ScopeEnum.ALL.equals(entityType)) {
  289 + conditionDevices =
  290 + ytDeviceService.findTbDeviceIdsByDeviceProfileId(
  291 + tkProjectId, condition.getTenantId());
  292 + }
  293 + AtomicBoolean oneConditionResult = new AtomicBoolean(true);
  294 + conditionDevices.stream()
  295 + .forEach(
  296 + devId -> {
  297 + TriggerState conditionState =
  298 + getOrCreateConditionState(condition, tkProjectId, devId);
  299 + boolean conditionResult;
  300 + try {
  301 + String originatorId;
  302 + boolean fresh = false;
  303 + if(msg.getType().equals(SessionMsgType.POST_EVENT_REQUEST.name())){
  304 + originatorId = msg.getMetaData().getValue("deviceId");
  305 + }else{
  306 + originatorId = msg.getOriginator().getId().toString();
  307 + }
  308 + if (condition.getId().equals(prefixId)
  309 + && originatorId.equals(devId)) {
  310 + fresh = true;
  311 + }
  312 + conditionResult = conditionState.process(ctx, msg,fresh);
  313 + ObjectNode result = conditionState.getRuleState().getDetailInform();
  314 + log.debug(
  315 + String.format(
  316 + "执行器【%s】结果【%s】执行器设备【%s】数据设备【%s】数据内容【%s】",
  317 + condition.getId(),
  318 + result,
  319 + devId,
  320 + msg.getOriginator(),
  321 + msg.getData()));
  322 + if (!result.isEmpty() && conditionResult) {
  323 + //更新触发器的详情
  324 + updateTriggerDetail(devId,result);
  325 + }
  326 + } catch (ExecutionException e) {
  327 + throw new RuntimeException(e);
  328 + } catch (InterruptedException e) {
  329 + throw new RuntimeException(e);
  330 + }
  331 + if(!conditionResult){
  332 + oneConditionResult.set(false);
  333 + }
  334 + });
  335 + if(!oneConditionResult.get()){
  336 + conditionMatched.set(false);
  337 + }
  338 + }));
322 339 return conditionMatched;
323 340 }
324 341 /**
... ... @@ -416,7 +433,8 @@ class ReactState {
416 433 List<String> rpcDevices = action.getDeviceId();
417 434 if (ScopeEnum.ALL.equals(action.getEntityType())) {
418 435 rpcDevices =
419   - ytDeviceService.rpcDevices(action.getTenantId(), orgId, action.getDeviceProfileId());
  436 + ytDeviceService.getDevicesByOrganizationIdAndProjectId(action.getTenantId(), orgId,
  437 + msg.getMetaData().getValue("deviceProfileId"));
420 438 }
421 439 rpcMsg(ctx, msg, rpcDevices, action.getDoContext(), action.getCallType());
422 440 break;
... ...
... ... @@ -95,7 +95,7 @@ public class TbSceneReactNode implements TbNode {
95 95 String scenId = t.getScenId();
96 96 List<String> orgDevices =
97 97 ctx.getTkDeviceService()
98   - .rpcDevices(tenantId, config.getOrgs().get(scenId), projectId);
  98 + .getDevicesByOrganizationIdAndProjectId(tenantId, config.getOrgs().get(scenId), tbProfileId);
99 99 Optional.ofNullable(orgDevices)
100 100 .ifPresent(
101 101 f -> {
... ...