Showing
2 changed files
with
73 additions
and
90 deletions
@@ -4,31 +4,19 @@ | @@ -4,31 +4,19 @@ | ||
4 | package org.thingsboard.rule.engine.yunteng.scene; | 4 | package org.thingsboard.rule.engine.yunteng.scene; |
5 | 5 | ||
6 | import com.fasterxml.jackson.databind.JsonNode; | 6 | import com.fasterxml.jackson.databind.JsonNode; |
7 | -import com.google.gson.JsonParser; | ||
8 | import lombok.extern.slf4j.Slf4j; | 7 | import lombok.extern.slf4j.Slf4j; |
9 | import org.thingsboard.common.util.JacksonUtil; | 8 | import org.thingsboard.common.util.JacksonUtil; |
10 | import org.thingsboard.rule.engine.action.TbAlarmResult; | 9 | import org.thingsboard.rule.engine.action.TbAlarmResult; |
11 | import org.thingsboard.rule.engine.api.TbContext; | 10 | import org.thingsboard.rule.engine.api.TbContext; |
12 | -import org.thingsboard.rule.engine.profile.*; | ||
13 | -import org.thingsboard.rule.engine.profile.state.PersistedAlarmRuleState; | ||
14 | -import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode; | ||
15 | -import org.thingsboard.rule.engine.yunteng.utils.TriggerRuleState; | ||
16 | import org.thingsboard.server.common.data.DataConstants; | 11 | import org.thingsboard.server.common.data.DataConstants; |
17 | -import org.thingsboard.server.common.data.device.profile.*; | ||
18 | -import org.thingsboard.server.common.data.exception.ApiUsageLimitsExceededException; | ||
19 | -import org.thingsboard.server.common.data.id.EntityId; | ||
20 | -import org.thingsboard.server.common.data.id.TenantId; | ||
21 | -import org.thingsboard.server.common.data.kv.AttributeKvEntry; | ||
22 | -import org.thingsboard.server.common.data.kv.KvEntry; | ||
23 | -import org.thingsboard.server.common.data.query.EntityKeyType; | 12 | +import org.thingsboard.server.common.data.device.profile.AlarmCondition; |
13 | +import org.thingsboard.server.common.data.device.profile.AlarmConditionFilter; | ||
14 | +import org.thingsboard.server.common.data.device.profile.AlarmConditionFilterKey; | ||
24 | import org.thingsboard.server.common.data.rule.RuleNodeState; | 15 | import org.thingsboard.server.common.data.rule.RuleNodeState; |
25 | import org.thingsboard.server.common.data.yunteng.dto.TriggerDTO; | 16 | import org.thingsboard.server.common.data.yunteng.dto.TriggerDTO; |
26 | import org.thingsboard.server.common.data.yunteng.utils.SpringBeanUtils; | 17 | import org.thingsboard.server.common.data.yunteng.utils.SpringBeanUtils; |
27 | import org.thingsboard.server.common.msg.TbMsg; | 18 | import org.thingsboard.server.common.msg.TbMsg; |
28 | import org.thingsboard.server.common.msg.TbMsgMetaData; | 19 | import org.thingsboard.server.common.msg.TbMsgMetaData; |
29 | -import org.thingsboard.server.common.msg.queue.ServiceQueue; | ||
30 | -import org.thingsboard.server.common.msg.session.SessionMsgType; | ||
31 | -import org.thingsboard.server.common.transport.adaptor.JsonConverter; | ||
32 | import org.thingsboard.server.dao.yunteng.entities.DoAction; | 20 | import org.thingsboard.server.dao.yunteng.entities.DoAction; |
33 | import org.thingsboard.server.dao.yunteng.entities.DoCondition; | 21 | import org.thingsboard.server.dao.yunteng.entities.DoCondition; |
34 | import org.thingsboard.server.dao.yunteng.service.DoActionService; | 22 | import org.thingsboard.server.dao.yunteng.service.DoActionService; |
@@ -36,6 +24,7 @@ import org.thingsboard.server.dao.yunteng.service.DoConditionService; | @@ -36,6 +24,7 @@ import org.thingsboard.server.dao.yunteng.service.DoConditionService; | ||
36 | import org.thingsboard.server.dao.yunteng.service.TriggerService; | 24 | import org.thingsboard.server.dao.yunteng.service.TriggerService; |
37 | 25 | ||
38 | import java.util.*; | 26 | import java.util.*; |
27 | +import java.util.concurrent.ConcurrentHashMap; | ||
39 | import java.util.concurrent.ExecutionException; | 28 | import java.util.concurrent.ExecutionException; |
40 | 29 | ||
41 | @Slf4j | 30 | @Slf4j |
@@ -48,7 +37,7 @@ class ReactState { | @@ -48,7 +37,7 @@ class ReactState { | ||
48 | * 键:设备主键 | 37 | * 键:设备主键 |
49 | * 值:设备指标参与的触发器 | 38 | * 值:设备指标参与的触发器 |
50 | */ | 39 | */ |
51 | - private Map<String, TriggerState> triggerState; | 40 | + private ConcurrentHashMap<String, TriggerState> triggerState = new ConcurrentHashMap<>(); |
52 | 41 | ||
53 | 42 | ||
54 | 43 | ||
@@ -78,21 +67,17 @@ class ReactState { | @@ -78,21 +67,17 @@ class ReactState { | ||
78 | 67 | ||
79 | 68 | ||
80 | public void process(TbContext ctx, TbMsg msg,String deviceId) throws ExecutionException, InterruptedException { | 69 | public void process(TbContext ctx, TbMsg msg,String deviceId) throws ExecutionException, InterruptedException { |
81 | - TriggerState triggerRuleState = getOrCreateTriggerState(deviceId); | 70 | + TriggerState triggerState = getOrCreateTriggerState(deviceId); |
82 | boolean matched = false; | 71 | boolean matched = false; |
83 | - if(triggerRuleState != null && actions != null){ | ||
84 | - if(msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())){ | ||
85 | - matched = processTelemetry(ctx,msg,triggerRuleState); | ||
86 | - }else{ | ||
87 | - matched = processAttributes(ctx,msg,triggerRuleState); | ||
88 | - } | 72 | + if(triggerState != null && actions != null){ |
73 | + matched = triggerState.process(ctx,msg); | ||
89 | } | 74 | } |
90 | 75 | ||
91 | if(matched){ | 76 | if(matched){ |
92 | // TODO 执行条件 | 77 | // TODO 执行条件 |
93 | } | 78 | } |
94 | 79 | ||
95 | - if(true){ | 80 | + if(matched){ |
96 | // TODO 输出动作 | 81 | // TODO 输出动作 |
97 | for(DoAction item: actions){ | 82 | for(DoAction item: actions){ |
98 | // pushMsg(); | 83 | // pushMsg(); |
@@ -103,69 +88,26 @@ class ReactState { | @@ -103,69 +88,26 @@ class ReactState { | ||
103 | 88 | ||
104 | 89 | ||
105 | 90 | ||
106 | - private boolean processAttributes(TbContext ctx, TbMsg msg,TriggerState triggerRuleState) throws ExecutionException, InterruptedException { | ||
107 | - boolean stateChanged = false; | ||
108 | - Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData())); | ||
109 | - if (!attributes.isEmpty()) { | ||
110 | - SnapshotUpdate update = triggerRuleState.merge(attributes); | ||
111 | - stateChanged |= triggerRuleState.process(ctx, msg, update); | ||
112 | - | ||
113 | - } | ||
114 | - return stateChanged; | ||
115 | - } | ||
116 | - | ||
117 | - protected boolean processTelemetry(TbContext ctx, TbMsg msg,TriggerState triggerRuleState) throws ExecutionException, InterruptedException { | ||
118 | - boolean stateChanged = false; | ||
119 | - Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToSortedTelemetry(new JsonParser().parse(msg.getData()), TbMsgTimeseriesNode.getTs(msg)); | ||
120 | - for (Map.Entry<Long, List<KvEntry>> entry : tsKvMap.entrySet()) { | ||
121 | - Long ts = entry.getKey(); | ||
122 | - List<KvEntry> data = entry.getValue(); | ||
123 | - SnapshotUpdate update = triggerRuleState.merge( ts, data); | ||
124 | - if (update.hasUpdate()) { | ||
125 | - stateChanged |= triggerRuleState.process(ctx, msg, update); | ||
126 | - } | ||
127 | - } | ||
128 | - | ||
129 | - return stateChanged; | ||
130 | - } | ||
131 | - | ||
132 | - | ||
133 | - | ||
134 | - private static EntityKeyType getKeyTypeFromScope(String scope) { | ||
135 | - switch (scope) { | ||
136 | - case DataConstants.CLIENT_SCOPE: | ||
137 | - return EntityKeyType.CLIENT_ATTRIBUTE; | ||
138 | - case DataConstants.SHARED_SCOPE: | ||
139 | - return EntityKeyType.SHARED_ATTRIBUTE; | ||
140 | - case DataConstants.SERVER_SCOPE: | ||
141 | - return EntityKeyType.SERVER_ATTRIBUTE; | ||
142 | - } | ||
143 | - return EntityKeyType.ATTRIBUTE; | ||
144 | - } | ||
145 | - | ||
146 | - | ||
147 | - | ||
148 | 91 | ||
149 | 92 | ||
150 | 93 | ||
151 | protected TriggerState getOrCreateTriggerState(String deviceId) { | 94 | protected TriggerState getOrCreateTriggerState(String deviceId) { |
152 | - TriggerState triggerRuleState = triggerState.computeIfAbsent(deviceId | ||
153 | - ,a -> { | ||
154 | - TriggerService triggerService = SpringBeanUtils.getBean(TriggerService.class); | ||
155 | - TriggerDTO trigger =triggerService.getTrigger(reactId,deviceId); | ||
156 | - if(trigger == null){ | ||
157 | - return null; | ||
158 | - } | ||
159 | - AlarmCondition condition = trigger.getTriggerCondition(); | ||
160 | - Set<AlarmConditionFilterKey> filterKeys = new HashSet<>(); | ||
161 | - for(AlarmConditionFilter filter :condition.getCondition()){ | ||
162 | - filterKeys.add(filter.getKey()); | ||
163 | - } | ||
164 | - TriggerState state = new TriggerState(deviceId,condition, filterKeys,null); | ||
165 | - triggerState.put(a, state); | ||
166 | - return state; | ||
167 | - }); | ||
168 | - return triggerRuleState; | 95 | + if(triggerState.containsKey(deviceId)){ |
96 | + return triggerState.get(deviceId); | ||
97 | + } | ||
98 | + TriggerService triggerService = SpringBeanUtils.getBean(TriggerService.class); | ||
99 | + TriggerDTO trigger =triggerService.getTrigger(reactId,deviceId); | ||
100 | + if(trigger != null){ | ||
101 | + AlarmCondition condition = trigger.getTriggerCondition(); | ||
102 | + Set<AlarmConditionFilterKey> filterKeys = new HashSet<>(); | ||
103 | + for(AlarmConditionFilter filter :condition.getCondition()){ | ||
104 | + filterKeys.add(filter.getKey()); | ||
105 | + } | ||
106 | + TriggerState state = new TriggerState(deviceId,condition, filterKeys,null); | ||
107 | + triggerState.put(deviceId, state); | ||
108 | + return state; | ||
109 | + } | ||
110 | + return null; | ||
169 | } | 111 | } |
170 | 112 | ||
171 | private void pushMsg(TbContext ctx, TbMsg msg, TbAlarmResult alarmResult,DoAction action) { | 113 | private void pushMsg(TbContext ctx, TbMsg msg, TbAlarmResult alarmResult,DoAction action) { |
@@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.yunteng.scene; | @@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.yunteng.scene; | ||
18 | import com.fasterxml.jackson.databind.JsonNode; | 18 | import com.fasterxml.jackson.databind.JsonNode; |
19 | import com.fasterxml.jackson.databind.node.ObjectNode; | 19 | import com.fasterxml.jackson.databind.node.ObjectNode; |
20 | import com.google.common.util.concurrent.ListenableFuture; | 20 | import com.google.common.util.concurrent.ListenableFuture; |
21 | +import com.google.gson.JsonParser; | ||
21 | import lombok.Data; | 22 | import lombok.Data; |
22 | import lombok.extern.slf4j.Slf4j; | 23 | import lombok.extern.slf4j.Slf4j; |
23 | import org.apache.commons.lang3.StringUtils; | 24 | import org.apache.commons.lang3.StringUtils; |
@@ -28,6 +29,7 @@ import org.thingsboard.rule.engine.api.TbContext; | @@ -28,6 +29,7 @@ import org.thingsboard.rule.engine.api.TbContext; | ||
28 | import org.thingsboard.rule.engine.profile.*; | 29 | import org.thingsboard.rule.engine.profile.*; |
29 | import org.thingsboard.rule.engine.profile.state.PersistedAlarmRuleState; | 30 | import org.thingsboard.rule.engine.profile.state.PersistedAlarmRuleState; |
30 | import org.thingsboard.rule.engine.profile.state.PersistedAlarmState; | 31 | import org.thingsboard.rule.engine.profile.state.PersistedAlarmState; |
32 | +import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode; | ||
31 | import org.thingsboard.rule.engine.yunteng.utils.TriggerRuleState; | 33 | import org.thingsboard.rule.engine.yunteng.utils.TriggerRuleState; |
32 | import org.thingsboard.server.common.data.DataConstants; | 34 | import org.thingsboard.server.common.data.DataConstants; |
33 | import org.thingsboard.server.common.data.Device; | 35 | import org.thingsboard.server.common.data.Device; |
@@ -46,6 +48,8 @@ import org.thingsboard.server.common.data.yunteng.utils.SpringBeanUtils; | @@ -46,6 +48,8 @@ import org.thingsboard.server.common.data.yunteng.utils.SpringBeanUtils; | ||
46 | import org.thingsboard.server.common.msg.TbMsg; | 48 | import org.thingsboard.server.common.msg.TbMsg; |
47 | import org.thingsboard.server.common.msg.TbMsgMetaData; | 49 | import org.thingsboard.server.common.msg.TbMsgMetaData; |
48 | import org.thingsboard.server.common.msg.queue.ServiceQueue; | 50 | import org.thingsboard.server.common.msg.queue.ServiceQueue; |
51 | +import org.thingsboard.server.common.msg.session.SessionMsgType; | ||
52 | +import org.thingsboard.server.common.transport.adaptor.JsonConverter; | ||
49 | import org.thingsboard.server.dao.alarm.AlarmOperationResult; | 53 | import org.thingsboard.server.dao.alarm.AlarmOperationResult; |
50 | import org.thingsboard.server.dao.sql.query.EntityKeyMapping; | 54 | import org.thingsboard.server.dao.sql.query.EntityKeyMapping; |
51 | import org.thingsboard.server.dao.yunteng.service.TriggerService; | 55 | import org.thingsboard.server.dao.yunteng.service.TriggerService; |
@@ -64,7 +68,6 @@ class TriggerState { | @@ -64,7 +68,6 @@ class TriggerState { | ||
64 | private volatile boolean initialFetchDone; | 68 | private volatile boolean initialFetchDone; |
65 | private volatile TbMsgMetaData lastMsgMetaData; | 69 | private volatile TbMsgMetaData lastMsgMetaData; |
66 | private volatile String lastMsgQueueName; | 70 | private volatile String lastMsgQueueName; |
67 | - private volatile DataSnapshot dataSnapshot; | ||
68 | private final DynamicPredicateValueCtx dynamicPredicateValueCtx; | 71 | private final DynamicPredicateValueCtx dynamicPredicateValueCtx; |
69 | private DataSnapshot latestValues; | 72 | private DataSnapshot latestValues; |
70 | 73 | ||
@@ -79,18 +82,29 @@ class TriggerState { | @@ -79,18 +82,29 @@ class TriggerState { | ||
79 | } | 82 | } |
80 | 83 | ||
81 | 84 | ||
82 | - public boolean process(TbContext ctx, TbMsg msg, SnapshotUpdate update) throws ExecutionException, InterruptedException { | 85 | + public boolean process(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { |
83 | if (latestValues == null) { | 86 | if (latestValues == null) { |
84 | latestValues = fetchLatestValues(ctx, originator); | 87 | latestValues = fetchLatestValues(ctx, originator); |
85 | } | 88 | } |
86 | lastMsgMetaData = msg.getMetaData(); | 89 | lastMsgMetaData = msg.getMetaData(); |
87 | lastMsgQueueName = msg.getQueueName(); | 90 | lastMsgQueueName = msg.getQueueName(); |
88 | - this.dataSnapshot = latestValues; | ||
89 | - return createOrClearAlarms(ctx, msg, latestValues, update, TriggerRuleState::eval); | 91 | + SnapshotUpdate update = null; |
92 | + if(msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())){ | ||
93 | + update = processTelemetry(ctx,msg); | ||
94 | + }else{ | ||
95 | + update = processAttributes(ctx,msg); | ||
96 | + } | ||
97 | + | ||
98 | + if (update != null && update.hasUpdate()) { | ||
99 | + return createOrClearAlarms(ctx, msg, latestValues, update, TriggerRuleState::eval); | ||
100 | + } | ||
101 | + return false; | ||
90 | } | 102 | } |
91 | 103 | ||
104 | + | ||
105 | + | ||
92 | public boolean process(TbContext ctx, long ts) throws ExecutionException, InterruptedException { | 106 | public boolean process(TbContext ctx, long ts) throws ExecutionException, InterruptedException { |
93 | - return createOrClearAlarms(ctx, null, ts, null, (alarmState, tsParam) -> alarmState.eval(tsParam, dataSnapshot)); | 107 | + return createOrClearAlarms(ctx, null, ts, null, (alarmState, tsParam) -> alarmState.eval(tsParam, latestValues)); |
94 | } | 108 | } |
95 | 109 | ||
96 | public <T> boolean createOrClearAlarms(TbContext ctx, TbMsg msg, T data, SnapshotUpdate update, BiFunction<TriggerRuleState, T, AlarmEvalResult> evalFunction) { | 110 | public <T> boolean createOrClearAlarms(TbContext ctx, TbMsg msg, T data, SnapshotUpdate update, BiFunction<TriggerRuleState, T, AlarmEvalResult> evalFunction) { |
@@ -101,11 +115,21 @@ class TriggerState { | @@ -101,11 +115,21 @@ class TriggerState { | ||
101 | AlarmEvalResult evalResult = evalFunction.apply(ruleState, data); | 115 | AlarmEvalResult evalResult = evalFunction.apply(ruleState, data); |
102 | if (AlarmEvalResult.TRUE.equals(evalResult)) { | 116 | if (AlarmEvalResult.TRUE.equals(evalResult)) { |
103 | stateUpdate = true; | 117 | stateUpdate = true; |
118 | + } else if (AlarmEvalResult.FALSE.equals(evalResult)) { | ||
119 | + stateUpdate = clearAlarmState(stateUpdate, ruleState); | ||
104 | } | 120 | } |
105 | return stateUpdate; | 121 | return stateUpdate; |
106 | } | 122 | } |
107 | 123 | ||
108 | 124 | ||
125 | + public boolean clearAlarmState(boolean stateUpdate, TriggerRuleState state) { | ||
126 | + if (state != null) { | ||
127 | + state.clear(); | ||
128 | + stateUpdate |= state.checkUpdate(); | ||
129 | + } | ||
130 | + return stateUpdate; | ||
131 | + } | ||
132 | + | ||
109 | public boolean validateUpdate(SnapshotUpdate update, TriggerRuleState state) { | 133 | public boolean validateUpdate(SnapshotUpdate update, TriggerRuleState state) { |
110 | if (update != null) { | 134 | if (update != null) { |
111 | //Check that the update type and that keys match. | 135 | //Check that the update type and that keys match. |
@@ -152,7 +176,24 @@ class TriggerState { | @@ -152,7 +176,24 @@ class TriggerState { | ||
152 | 176 | ||
153 | 177 | ||
154 | 178 | ||
179 | + protected SnapshotUpdate processAttributes(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { | ||
180 | + Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData())); | ||
181 | + if (!attributes.isEmpty()) { | ||
182 | + return merge(attributes); | ||
183 | + } | ||
184 | + return null; | ||
185 | + } | ||
155 | 186 | ||
187 | + protected SnapshotUpdate processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { | ||
188 | + Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToSortedTelemetry(new JsonParser().parse(msg.getData()), TbMsgTimeseriesNode.getTs(msg)); | ||
189 | + for (Map.Entry<Long, List<KvEntry>> entry : tsKvMap.entrySet()) { | ||
190 | + Long ts = entry.getKey(); | ||
191 | + List<KvEntry> data = entry.getValue(); | ||
192 | + return merge( ts, data); | ||
193 | + } | ||
194 | + | ||
195 | + return null; | ||
196 | + } | ||
156 | 197 | ||
157 | 198 | ||
158 | 199 | ||
@@ -238,7 +279,7 @@ class TriggerState { | @@ -238,7 +279,7 @@ class TriggerState { | ||
238 | throw new RuntimeException("Can't parse entry: " + entry.getDataType()); | 279 | throw new RuntimeException("Can't parse entry: " + entry.getDataType()); |
239 | } | 280 | } |
240 | } | 281 | } |
241 | - SnapshotUpdate merge( Long newTs, List<KvEntry> data) { | 282 | + private SnapshotUpdate merge( Long newTs, List<KvEntry> data) { |
242 | Set<AlarmConditionFilterKey> keys = new HashSet<>(); | 283 | Set<AlarmConditionFilterKey> keys = new HashSet<>(); |
243 | for (KvEntry entry : data) { | 284 | for (KvEntry entry : data) { |
244 | AlarmConditionFilterKey entityKey = new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, entry.getKey()); | 285 | AlarmConditionFilterKey entityKey = new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, entry.getKey()); |
@@ -250,7 +291,7 @@ class TriggerState { | @@ -250,7 +291,7 @@ class TriggerState { | ||
250 | return new SnapshotUpdate(AlarmConditionKeyType.TIME_SERIES, keys); | 291 | return new SnapshotUpdate(AlarmConditionKeyType.TIME_SERIES, keys); |
251 | } | 292 | } |
252 | 293 | ||
253 | - SnapshotUpdate merge(Set<AttributeKvEntry> attributes) { | 294 | + private SnapshotUpdate merge(Set<AttributeKvEntry> attributes) { |
254 | long newTs = 0; | 295 | long newTs = 0; |
255 | Set<AlarmConditionFilterKey> keys = new HashSet<>(); | 296 | Set<AlarmConditionFilterKey> keys = new HashSet<>(); |
256 | for (AttributeKvEntry entry : attributes) { | 297 | for (AttributeKvEntry entry : attributes) { |