Commit 7b45c58e5a1a92cd8d8bedba7b4a715e6782a463
Merge branch 'ljl022510' into 'master'
Ljl022510 See merge request huang/thingsboard3.3.2!50
Showing
6 changed files
with
82 additions
and
92 deletions
... | ... | @@ -74,7 +74,7 @@ public class YtDeviceController extends BaseController { |
74 | 74 | } |
75 | 75 | DeviceDTO newDeviceDTO = null; |
76 | 76 | boolean isIncludeRelation = false; |
77 | - String gateWayDeviceId = deviceDTO.getGateWayDeviceId(); | |
77 | + String gateWayDeviceId = deviceDTO.getGatewayId(); | |
78 | 78 | DeviceDTO gateWayDevice = null; |
79 | 79 | if (StringUtils.isNotEmpty(gateWayDeviceId)) { |
80 | 80 | gateWayDevice = | ... | ... |
... | ... | @@ -38,7 +38,8 @@ public class DeviceDTO extends TenantDTO { |
38 | 38 | private String profileId; |
39 | 39 | |
40 | 40 | @ApiModelProperty(value = "关联网关设备") |
41 | - private String gateWayDeviceId; | |
41 | + private String gatewayId; | |
42 | + private String gatewayName; | |
42 | 43 | |
43 | 44 | @ApiModelProperty(value = "设备凭证") |
44 | 45 | private YtCredentialsDto deviceToken; | ... | ... |
... | ... | @@ -24,6 +24,7 @@ public class YtDevice extends TenantBaseEntity { |
24 | 24 | private JsonNode deviceInfo; |
25 | 25 | private String profileId; |
26 | 26 | private String tbDeviceId; |
27 | + private String gatewayId; | |
27 | 28 | private String deviceTypeId; |
28 | 29 | private String label; |
29 | 30 | @TableField(typeHandler = EnumTypeHandler.class) | ... | ... |
... | ... | @@ -25,6 +25,8 @@ |
25 | 25 | <result property="description" column="description"/> |
26 | 26 | <result property="customerId" column="customer_id"/> |
27 | 27 | <result property="organizationId" column="organization_id"/> |
28 | + <result property="gatewayId" column="gateway_id"/> | |
29 | + <result property="gatewayName" column="gateway_name"/> | |
28 | 30 | <association property="deviceProfile" javaType="org.thingsboard.server.common.data.yunteng.dto.DeviceProfileDTO"> |
29 | 31 | <result property="name" column="profile_name"/> |
30 | 32 | <result property="transportType" column="transport_type"/> |
... | ... | @@ -41,6 +43,7 @@ |
41 | 43 | <result property="createdTime" column="created_time"/> |
42 | 44 | <result property="lastOnlineTime" column="last_online_time"/> |
43 | 45 | </resultMap> |
46 | + | |
44 | 47 | <sql id="basicColumns"> |
45 | 48 | ifd.id,ifd.name,ifd.device_info,ifd.profile_id,ifd.active_time,ifd.tenant_id,ifd.description |
46 | 49 | ,ifd.tb_device_id,ifd.label,ifd.last_connect_time,ifd.device_type,ifd.device_state,ifd.create_time,ifd.update_time,ifd.creator, |
... | ... | @@ -48,6 +51,7 @@ |
48 | 51 | </sql> |
49 | 52 | <sql id="detailColumns"> |
50 | 53 | <include refid="basicColumns"/> |
54 | + ,ifd.gateway_id,idg.name gateway_name | |
51 | 55 | ,ifdp.name AS profile_name,ifdp.transport_type |
52 | 56 | ,io.name AS organization_name |
53 | 57 | </sql> |
... | ... | @@ -109,6 +113,7 @@ |
109 | 113 | FROM iotfs_device ifd |
110 | 114 | LEFT JOIN device_profile ifdp ON ifd.profile_id = CAST (ifdp.id AS VARCHAR) |
111 | 115 | LEFT JOIN iotfs_organization io ON io.id = ifd.organization_id |
116 | + LEFT JOIN iotfs_device idg ON idg.id = ifd.gateway_id | |
112 | 117 | <where> |
113 | 118 | <if test="tenantId !=null and tenantId !=''"> |
114 | 119 | AND ifd.tenant_id = #{tenantId} | ... | ... |
... | ... | @@ -4,31 +4,19 @@ |
4 | 4 | package org.thingsboard.rule.engine.yunteng.scene; |
5 | 5 | |
6 | 6 | import com.fasterxml.jackson.databind.JsonNode; |
7 | -import com.google.gson.JsonParser; | |
8 | 7 | import lombok.extern.slf4j.Slf4j; |
9 | 8 | import org.thingsboard.common.util.JacksonUtil; |
10 | 9 | import org.thingsboard.rule.engine.action.TbAlarmResult; |
11 | 10 | import org.thingsboard.rule.engine.api.TbContext; |
12 | -import org.thingsboard.rule.engine.profile.*; | |
13 | -import org.thingsboard.rule.engine.profile.state.PersistedAlarmRuleState; | |
14 | -import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode; | |
15 | -import org.thingsboard.rule.engine.yunteng.utils.TriggerRuleState; | |
16 | 11 | import org.thingsboard.server.common.data.DataConstants; |
17 | -import org.thingsboard.server.common.data.device.profile.*; | |
18 | -import org.thingsboard.server.common.data.exception.ApiUsageLimitsExceededException; | |
19 | -import org.thingsboard.server.common.data.id.EntityId; | |
20 | -import org.thingsboard.server.common.data.id.TenantId; | |
21 | -import org.thingsboard.server.common.data.kv.AttributeKvEntry; | |
22 | -import org.thingsboard.server.common.data.kv.KvEntry; | |
23 | -import org.thingsboard.server.common.data.query.EntityKeyType; | |
12 | +import org.thingsboard.server.common.data.device.profile.AlarmCondition; | |
13 | +import org.thingsboard.server.common.data.device.profile.AlarmConditionFilter; | |
14 | +import org.thingsboard.server.common.data.device.profile.AlarmConditionFilterKey; | |
24 | 15 | import org.thingsboard.server.common.data.rule.RuleNodeState; |
25 | 16 | import org.thingsboard.server.common.data.yunteng.dto.TriggerDTO; |
26 | 17 | import org.thingsboard.server.common.data.yunteng.utils.SpringBeanUtils; |
27 | 18 | import org.thingsboard.server.common.msg.TbMsg; |
28 | 19 | import org.thingsboard.server.common.msg.TbMsgMetaData; |
29 | -import org.thingsboard.server.common.msg.queue.ServiceQueue; | |
30 | -import org.thingsboard.server.common.msg.session.SessionMsgType; | |
31 | -import org.thingsboard.server.common.transport.adaptor.JsonConverter; | |
32 | 20 | import org.thingsboard.server.dao.yunteng.entities.DoAction; |
33 | 21 | import org.thingsboard.server.dao.yunteng.entities.DoCondition; |
34 | 22 | import org.thingsboard.server.dao.yunteng.service.DoActionService; |
... | ... | @@ -36,6 +24,7 @@ import org.thingsboard.server.dao.yunteng.service.DoConditionService; |
36 | 24 | import org.thingsboard.server.dao.yunteng.service.TriggerService; |
37 | 25 | |
38 | 26 | import java.util.*; |
27 | +import java.util.concurrent.ConcurrentHashMap; | |
39 | 28 | import java.util.concurrent.ExecutionException; |
40 | 29 | |
41 | 30 | @Slf4j |
... | ... | @@ -48,7 +37,7 @@ class ReactState { |
48 | 37 | * 键:设备主键 |
49 | 38 | * 值:设备指标参与的触发器 |
50 | 39 | */ |
51 | - private Map<String, TriggerState> triggerState; | |
40 | + private ConcurrentHashMap<String, TriggerState> triggerState = new ConcurrentHashMap<>(); | |
52 | 41 | |
53 | 42 | |
54 | 43 | |
... | ... | @@ -78,21 +67,17 @@ class ReactState { |
78 | 67 | |
79 | 68 | |
80 | 69 | public void process(TbContext ctx, TbMsg msg,String deviceId) throws ExecutionException, InterruptedException { |
81 | - TriggerState triggerRuleState = getOrCreateTriggerState(deviceId); | |
70 | + TriggerState triggerState = getOrCreateTriggerState(deviceId); | |
82 | 71 | boolean matched = false; |
83 | - if(triggerRuleState != null && actions != null){ | |
84 | - if(msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())){ | |
85 | - matched = processTelemetry(ctx,msg,triggerRuleState); | |
86 | - }else{ | |
87 | - matched = processAttributes(ctx,msg,triggerRuleState); | |
88 | - } | |
72 | + if(triggerState != null && actions != null){ | |
73 | + matched = triggerState.process(ctx,msg); | |
89 | 74 | } |
90 | 75 | |
91 | 76 | if(matched){ |
92 | 77 | // TODO 执行条件 |
93 | 78 | } |
94 | 79 | |
95 | - if(true){ | |
80 | + if(matched){ | |
96 | 81 | // TODO 输出动作 |
97 | 82 | for(DoAction item: actions){ |
98 | 83 | // pushMsg(); |
... | ... | @@ -103,69 +88,26 @@ class ReactState { |
103 | 88 | |
104 | 89 | |
105 | 90 | |
106 | - private boolean processAttributes(TbContext ctx, TbMsg msg,TriggerState triggerRuleState) throws ExecutionException, InterruptedException { | |
107 | - boolean stateChanged = false; | |
108 | - Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData())); | |
109 | - if (!attributes.isEmpty()) { | |
110 | - SnapshotUpdate update = triggerRuleState.merge(attributes); | |
111 | - stateChanged |= triggerRuleState.process(ctx, msg, update); | |
112 | - | |
113 | - } | |
114 | - return stateChanged; | |
115 | - } | |
116 | - | |
117 | - protected boolean processTelemetry(TbContext ctx, TbMsg msg,TriggerState triggerRuleState) throws ExecutionException, InterruptedException { | |
118 | - boolean stateChanged = false; | |
119 | - Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToSortedTelemetry(new JsonParser().parse(msg.getData()), TbMsgTimeseriesNode.getTs(msg)); | |
120 | - for (Map.Entry<Long, List<KvEntry>> entry : tsKvMap.entrySet()) { | |
121 | - Long ts = entry.getKey(); | |
122 | - List<KvEntry> data = entry.getValue(); | |
123 | - SnapshotUpdate update = triggerRuleState.merge( ts, data); | |
124 | - if (update.hasUpdate()) { | |
125 | - stateChanged |= triggerRuleState.process(ctx, msg, update); | |
126 | - } | |
127 | - } | |
128 | - | |
129 | - return stateChanged; | |
130 | - } | |
131 | - | |
132 | - | |
133 | - | |
134 | - private static EntityKeyType getKeyTypeFromScope(String scope) { | |
135 | - switch (scope) { | |
136 | - case DataConstants.CLIENT_SCOPE: | |
137 | - return EntityKeyType.CLIENT_ATTRIBUTE; | |
138 | - case DataConstants.SHARED_SCOPE: | |
139 | - return EntityKeyType.SHARED_ATTRIBUTE; | |
140 | - case DataConstants.SERVER_SCOPE: | |
141 | - return EntityKeyType.SERVER_ATTRIBUTE; | |
142 | - } | |
143 | - return EntityKeyType.ATTRIBUTE; | |
144 | - } | |
145 | - | |
146 | - | |
147 | - | |
148 | 91 | |
149 | 92 | |
150 | 93 | |
151 | 94 | protected TriggerState getOrCreateTriggerState(String deviceId) { |
152 | - TriggerState triggerRuleState = triggerState.computeIfAbsent(deviceId | |
153 | - ,a -> { | |
154 | - TriggerService triggerService = SpringBeanUtils.getBean(TriggerService.class); | |
155 | - TriggerDTO trigger =triggerService.getTrigger(reactId,deviceId); | |
156 | - if(trigger == null){ | |
157 | - return null; | |
158 | - } | |
159 | - AlarmCondition condition = trigger.getTriggerCondition(); | |
160 | - Set<AlarmConditionFilterKey> filterKeys = new HashSet<>(); | |
161 | - for(AlarmConditionFilter filter :condition.getCondition()){ | |
162 | - filterKeys.add(filter.getKey()); | |
163 | - } | |
164 | - TriggerState state = new TriggerState(deviceId,condition, filterKeys,null); | |
165 | - triggerState.put(a, state); | |
166 | - return state; | |
167 | - }); | |
168 | - return triggerRuleState; | |
95 | + if(triggerState.containsKey(deviceId)){ | |
96 | + return triggerState.get(deviceId); | |
97 | + } | |
98 | + TriggerService triggerService = SpringBeanUtils.getBean(TriggerService.class); | |
99 | + TriggerDTO trigger =triggerService.getTrigger(reactId,deviceId); | |
100 | + if(trigger != null){ | |
101 | + AlarmCondition condition = trigger.getTriggerCondition(); | |
102 | + Set<AlarmConditionFilterKey> filterKeys = new HashSet<>(); | |
103 | + for(AlarmConditionFilter filter :condition.getCondition()){ | |
104 | + filterKeys.add(filter.getKey()); | |
105 | + } | |
106 | + TriggerState state = new TriggerState(deviceId,condition, filterKeys,null); | |
107 | + triggerState.put(deviceId, state); | |
108 | + return state; | |
109 | + } | |
110 | + return null; | |
169 | 111 | } |
170 | 112 | |
171 | 113 | private void pushMsg(TbContext ctx, TbMsg msg, TbAlarmResult alarmResult,DoAction action) { | ... | ... |
... | ... | @@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.yunteng.scene; |
18 | 18 | import com.fasterxml.jackson.databind.JsonNode; |
19 | 19 | import com.fasterxml.jackson.databind.node.ObjectNode; |
20 | 20 | import com.google.common.util.concurrent.ListenableFuture; |
21 | +import com.google.gson.JsonParser; | |
21 | 22 | import lombok.Data; |
22 | 23 | import lombok.extern.slf4j.Slf4j; |
23 | 24 | import org.apache.commons.lang3.StringUtils; |
... | ... | @@ -28,6 +29,7 @@ import org.thingsboard.rule.engine.api.TbContext; |
28 | 29 | import org.thingsboard.rule.engine.profile.*; |
29 | 30 | import org.thingsboard.rule.engine.profile.state.PersistedAlarmRuleState; |
30 | 31 | import org.thingsboard.rule.engine.profile.state.PersistedAlarmState; |
32 | +import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode; | |
31 | 33 | import org.thingsboard.rule.engine.yunteng.utils.TriggerRuleState; |
32 | 34 | import org.thingsboard.server.common.data.DataConstants; |
33 | 35 | import org.thingsboard.server.common.data.Device; |
... | ... | @@ -46,6 +48,8 @@ import org.thingsboard.server.common.data.yunteng.utils.SpringBeanUtils; |
46 | 48 | import org.thingsboard.server.common.msg.TbMsg; |
47 | 49 | import org.thingsboard.server.common.msg.TbMsgMetaData; |
48 | 50 | import org.thingsboard.server.common.msg.queue.ServiceQueue; |
51 | +import org.thingsboard.server.common.msg.session.SessionMsgType; | |
52 | +import org.thingsboard.server.common.transport.adaptor.JsonConverter; | |
49 | 53 | import org.thingsboard.server.dao.alarm.AlarmOperationResult; |
50 | 54 | import org.thingsboard.server.dao.sql.query.EntityKeyMapping; |
51 | 55 | import org.thingsboard.server.dao.yunteng.service.TriggerService; |
... | ... | @@ -64,7 +68,6 @@ class TriggerState { |
64 | 68 | private volatile boolean initialFetchDone; |
65 | 69 | private volatile TbMsgMetaData lastMsgMetaData; |
66 | 70 | private volatile String lastMsgQueueName; |
67 | - private volatile DataSnapshot dataSnapshot; | |
68 | 71 | private final DynamicPredicateValueCtx dynamicPredicateValueCtx; |
69 | 72 | private DataSnapshot latestValues; |
70 | 73 | |
... | ... | @@ -79,18 +82,29 @@ class TriggerState { |
79 | 82 | } |
80 | 83 | |
81 | 84 | |
82 | - public boolean process(TbContext ctx, TbMsg msg, SnapshotUpdate update) throws ExecutionException, InterruptedException { | |
85 | + public boolean process(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { | |
83 | 86 | if (latestValues == null) { |
84 | 87 | latestValues = fetchLatestValues(ctx, originator); |
85 | 88 | } |
86 | 89 | lastMsgMetaData = msg.getMetaData(); |
87 | 90 | lastMsgQueueName = msg.getQueueName(); |
88 | - this.dataSnapshot = latestValues; | |
89 | - return createOrClearAlarms(ctx, msg, latestValues, update, TriggerRuleState::eval); | |
91 | + SnapshotUpdate update = null; | |
92 | + if(msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())){ | |
93 | + update = processTelemetry(ctx,msg); | |
94 | + }else{ | |
95 | + update = processAttributes(ctx,msg); | |
96 | + } | |
97 | + | |
98 | + if (update != null && update.hasUpdate()) { | |
99 | + return createOrClearAlarms(ctx, msg, latestValues, update, TriggerRuleState::eval); | |
100 | + } | |
101 | + return false; | |
90 | 102 | } |
91 | 103 | |
104 | + | |
105 | + | |
92 | 106 | public boolean process(TbContext ctx, long ts) throws ExecutionException, InterruptedException { |
93 | - return createOrClearAlarms(ctx, null, ts, null, (alarmState, tsParam) -> alarmState.eval(tsParam, dataSnapshot)); | |
107 | + return createOrClearAlarms(ctx, null, ts, null, (alarmState, tsParam) -> alarmState.eval(tsParam, latestValues)); | |
94 | 108 | } |
95 | 109 | |
96 | 110 | public <T> boolean createOrClearAlarms(TbContext ctx, TbMsg msg, T data, SnapshotUpdate update, BiFunction<TriggerRuleState, T, AlarmEvalResult> evalFunction) { |
... | ... | @@ -101,11 +115,21 @@ class TriggerState { |
101 | 115 | AlarmEvalResult evalResult = evalFunction.apply(ruleState, data); |
102 | 116 | if (AlarmEvalResult.TRUE.equals(evalResult)) { |
103 | 117 | stateUpdate = true; |
118 | + } else if (AlarmEvalResult.FALSE.equals(evalResult)) { | |
119 | + stateUpdate = clearAlarmState(stateUpdate, ruleState); | |
104 | 120 | } |
105 | 121 | return stateUpdate; |
106 | 122 | } |
107 | 123 | |
108 | 124 | |
125 | + public boolean clearAlarmState(boolean stateUpdate, TriggerRuleState state) { | |
126 | + if (state != null) { | |
127 | + state.clear(); | |
128 | + stateUpdate |= state.checkUpdate(); | |
129 | + } | |
130 | + return stateUpdate; | |
131 | + } | |
132 | + | |
109 | 133 | public boolean validateUpdate(SnapshotUpdate update, TriggerRuleState state) { |
110 | 134 | if (update != null) { |
111 | 135 | //Check that the update type and that keys match. |
... | ... | @@ -152,7 +176,24 @@ class TriggerState { |
152 | 176 | |
153 | 177 | |
154 | 178 | |
179 | + protected SnapshotUpdate processAttributes(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { | |
180 | + Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData())); | |
181 | + if (!attributes.isEmpty()) { | |
182 | + return merge(attributes); | |
183 | + } | |
184 | + return null; | |
185 | + } | |
155 | 186 | |
187 | + protected SnapshotUpdate processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { | |
188 | + Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToSortedTelemetry(new JsonParser().parse(msg.getData()), TbMsgTimeseriesNode.getTs(msg)); | |
189 | + for (Map.Entry<Long, List<KvEntry>> entry : tsKvMap.entrySet()) { | |
190 | + Long ts = entry.getKey(); | |
191 | + List<KvEntry> data = entry.getValue(); | |
192 | + return merge( ts, data); | |
193 | + } | |
194 | + | |
195 | + return null; | |
196 | + } | |
156 | 197 | |
157 | 198 | |
158 | 199 | |
... | ... | @@ -238,7 +279,7 @@ class TriggerState { |
238 | 279 | throw new RuntimeException("Can't parse entry: " + entry.getDataType()); |
239 | 280 | } |
240 | 281 | } |
241 | - SnapshotUpdate merge( Long newTs, List<KvEntry> data) { | |
282 | + private SnapshotUpdate merge( Long newTs, List<KvEntry> data) { | |
242 | 283 | Set<AlarmConditionFilterKey> keys = new HashSet<>(); |
243 | 284 | for (KvEntry entry : data) { |
244 | 285 | AlarmConditionFilterKey entityKey = new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, entry.getKey()); |
... | ... | @@ -250,7 +291,7 @@ class TriggerState { |
250 | 291 | return new SnapshotUpdate(AlarmConditionKeyType.TIME_SERIES, keys); |
251 | 292 | } |
252 | 293 | |
253 | - SnapshotUpdate merge(Set<AttributeKvEntry> attributes) { | |
294 | + private SnapshotUpdate merge(Set<AttributeKvEntry> attributes) { | |
254 | 295 | long newTs = 0; |
255 | 296 | Set<AlarmConditionFilterKey> keys = new HashSet<>(); |
256 | 297 | for (AttributeKvEntry entry : attributes) { | ... | ... |