Commit bc6c5be22d35e86c6c04486d7c0edd4ea41c3917

Authored by xp.Huang
2 parents 95b58f7f a197ce13

Merge branch '20230224' into 'master'

fix: 事件问题修复

See merge request yunteng/thingskit!162
@@ -593,8 +593,6 @@ public class DefaultTransportService implements TransportService { @@ -593,8 +593,6 @@ public class DefaultTransportService implements TransportService {
593 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); 593 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
594 metaData.putValue("deviceName", sessionInfo.getDeviceName()); 594 metaData.putValue("deviceName", sessionInfo.getDeviceName());
595 metaData.putValue("deviceType", sessionInfo.getDeviceType()); 595 metaData.putValue("deviceType", sessionInfo.getDeviceType());
596 - UUID deviceProfileId = new UUID(sessionInfo.getDeviceProfileIdMSB(),sessionInfo.getDeviceProfileIdLSB());  
597 - metaData.putValue("device_profile_id", deviceProfileId.toString());  
598 metaData.putValue("deviceId",eventInfo[0]); 596 metaData.putValue("deviceId",eventInfo[0]);
599 metaData.putValue("event_identifier", eventInfo[1]); 597 metaData.putValue("event_identifier", eventInfo[1]);
600 sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_EVENT_REQUEST, 598 sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_EVENT_REQUEST,
@@ -30,7 +30,7 @@ public interface EventKvRepository extends PagingAndSortingRepository<TkEventKvE @@ -30,7 +30,7 @@ public interface EventKvRepository extends PagingAndSortingRepository<TkEventKvE
30 30
31 @Query("SELECT a FROM TkEventKvEntity a WHERE a.id.entityId = :entityId " 31 @Query("SELECT a FROM TkEventKvEntity a WHERE a.id.entityId = :entityId "
32 + "AND (:eventType IS NULL OR a.id.eventType = :eventType) " 32 + "AND (:eventType IS NULL OR a.id.eventType = :eventType) "
33 - + "AND (:eventIdentifier IS NULL OR a.id.eventIdentifier = :eventIdentifier) " 33 + + "AND (a.id.eventIdentifier LIKE concat('%',:eventIdentifier,'%')) "
34 + "AND (:startTime IS NULL OR a.id.eventTime >= :startTime) " 34 + "AND (:startTime IS NULL OR a.id.eventTime >= :startTime) "
35 + "AND (:endTime IS NULL OR a.id.eventTime <= :endTime ) " ) 35 + "AND (:endTime IS NULL OR a.id.eventTime <= :endTime ) " )
36 Page<TkEventKvEntity> findEvents(@Param("entityId") UUID entityId, 36 Page<TkEventKvEntity> findEvents(@Param("entityId") UUID entityId,
@@ -108,7 +108,7 @@ public class JpaEventDao extends JpaAbstractDaoListeningExecutorService implemen @@ -108,7 +108,7 @@ public class JpaEventDao extends JpaAbstractDaoListeningExecutorService implemen
108 @Override 108 @Override
109 public PageData<TkEventKvDto> findEvents(EntityId entityId,String eventIdentifier, DeviceEventTypeEnum eventType, TimePageLink pageLink) { 109 public PageData<TkEventKvDto> findEvents(EntityId entityId,String eventIdentifier, DeviceEventTypeEnum eventType, TimePageLink pageLink) {
110 110
111 - return DaoUtil.toPageData(eventKvRepository.findEvents(entityId.getId(),eventType,eventIdentifier,pageLink.getStartTime(),pageLink.getEndTime(),DaoUtil.toPageable(pageLink))); 111 + return DaoUtil.toPageData(eventKvRepository.findEvents(entityId.getId(),eventType,eventIdentifier==null?"":eventIdentifier,pageLink.getStartTime(),pageLink.getEndTime(),DaoUtil.toPageable(pageLink)));
112 } 112 }
113 113
114 114
@@ -20,6 +20,7 @@ import org.thingsboard.rule.engine.api.*; @@ -20,6 +20,7 @@ import org.thingsboard.rule.engine.api.*;
20 import org.thingsboard.rule.engine.api.util.TbNodeUtils; 20 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
21 import org.thingsboard.rule.engine.telemetry.TelemetryNodeCallback; 21 import org.thingsboard.rule.engine.telemetry.TelemetryNodeCallback;
22 import org.thingsboard.server.common.data.DeviceProfile; 22 import org.thingsboard.server.common.data.DeviceProfile;
  23 +import org.thingsboard.server.common.data.id.DeviceId;
23 import org.thingsboard.server.common.data.id.DeviceProfileId; 24 import org.thingsboard.server.common.data.id.DeviceProfileId;
24 import org.thingsboard.server.common.data.id.TenantId; 25 import org.thingsboard.server.common.data.id.TenantId;
25 import org.thingsboard.server.common.data.plugin.ComponentType; 26 import org.thingsboard.server.common.data.plugin.ComponentType;
@@ -66,11 +67,15 @@ public class TkMsgEventNode implements TbNode { @@ -66,11 +67,15 @@ public class TkMsgEventNode implements TbNode {
66 ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); 67 ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
67 return; 68 return;
68 } 69 }
69 - String deviceProfileId = msg.getMetaData().getValue(DEVICE_DEVICE_PROFILE_ID_PROPERTY);  
70 - TenantId tenantId = ctx.getTenantId();  
71 - DeviceProfileId profileId = new DeviceProfileId(UUID.fromString(deviceProfileId));  
72 - DeviceProfile profile = cache.get(tenantId,profileId); 70 + String deviceIdStr = msg.getMetaData().getValue("deviceId");
  71 + DeviceId deviceId = DeviceId.fromString(deviceIdStr);
73 String eventIdentifier = msg.getMetaData().getValue(EVENT_IDENTIFIER_COLUMN); 72 String eventIdentifier = msg.getMetaData().getValue(EVENT_IDENTIFIER_COLUMN);
  73 + TenantId tenantId = ctx.getTenantId();
  74 + DeviceProfile profile = cache.get(tenantId,deviceId);
  75 + if(profile == null){
  76 + ctx.tellFailure(msg,new TbNodeException(String.format("设备【%s】不存在。", deviceIdStr)));
  77 + return;
  78 + }
74 List<TkThingsModel> eventList =profile.getProfileData().getThingsModel().stream() 79 List<TkThingsModel> eventList =profile.getProfileData().getThingsModel().stream()
75 .filter(f -> f.getIdentifier().equals(eventIdentifier) && FunctionTypeEnum.events.equals(f.getFunctionType())) 80 .filter(f -> f.getIdentifier().equals(eventIdentifier) && FunctionTypeEnum.events.equals(f.getFunctionType()))
76 .collect(Collectors.toList()); 81 .collect(Collectors.toList());
@@ -80,6 +85,7 @@ public class TkMsgEventNode implements TbNode { @@ -80,6 +85,7 @@ public class TkMsgEventNode implements TbNode {
80 item.setEventIdentifier(i.getIdentifier()); 85 item.setEventIdentifier(i.getIdentifier());
81 item.setEventType(i.getEventType()); 86 item.setEventType(i.getEventType());
82 item.setEventName(i.getFunctionName()); 87 item.setEventName(i.getFunctionName());
  88 + entryList.add(item);
83 }); 89 });
84 if(eventList.isEmpty()){ 90 if(eventList.isEmpty()){
85 ctx.tellFailure(msg,new TbNodeException(String.format("产品物模型中未申明,上报的事件类型【%s】。", eventIdentifier))); 91 ctx.tellFailure(msg,new TbNodeException(String.format("产品物模型中未申明,上报的事件类型【%s】。", eventIdentifier)));
@@ -89,8 +95,8 @@ public class TkMsgEventNode implements TbNode { @@ -89,8 +95,8 @@ public class TkMsgEventNode implements TbNode {
89 long ts = System.currentTimeMillis(); 95 long ts = System.currentTimeMillis();
90 String src = msg.getData(); 96 String src = msg.getData();
91 ctx.getTelemetryService().saveAndNotify( 97 ctx.getTelemetryService().saveAndNotify(
92 - tenantId,profileId,  
93 - msg.getOriginator(), 98 + tenantId,profile.getId(),
  99 + deviceId,
94 entryList, 100 entryList,
95 src,ts, 101 src,ts,
96 new TelemetryNodeCallback(ctx, msg) 102 new TelemetryNodeCallback(ctx, msg)