Commit 8c51655d6d42d842600726ef331f1dde5c780fb0

Authored by 芯火源
1 parent 59e99111

fix: 场景联动命令下发同步异步问题修复

@@ -171,5 +171,7 @@ public interface FastIotConstants { @@ -171,5 +171,7 @@ public interface FastIotConstants {
171 public static String TARGET_ID = "target"; 171 public static String TARGET_ID = "target";
172 /**实控设备名称*/ 172 /**实控设备名称*/
173 public static String TARGET_NAME = "deviceName"; 173 public static String TARGET_NAME = "deviceName";
  174 + /**RPC单项双向*/
  175 + public static String ONEWAY = "oneway";
174 } 176 }
175 } 177 }
1 -/**  
2 - * 设备场景联动状态  
3 - */ 1 +/** 设备场景联动状态 */
4 package org.thingsboard.rule.engine.yunteng.scene; 2 package org.thingsboard.rule.engine.yunteng.scene;
5 3
6 import com.fasterxml.jackson.databind.JsonNode; 4 import com.fasterxml.jackson.databind.JsonNode;
7 import com.fasterxml.jackson.databind.node.ObjectNode; 5 import com.fasterxml.jackson.databind.node.ObjectNode;
  6 +import java.util.HashSet;
  7 +import java.util.List;
  8 +import java.util.Set;
  9 +import java.util.UUID;
  10 +import java.util.concurrent.ConcurrentHashMap;
  11 +import java.util.concurrent.ExecutionException;
8 import lombok.extern.slf4j.Slf4j; 12 import lombok.extern.slf4j.Slf4j;
9 import org.apache.commons.lang3.StringUtils; 13 import org.apache.commons.lang3.StringUtils;
10 import org.jetbrains.annotations.NotNull; 14 import org.jetbrains.annotations.NotNull;
11 import org.thingsboard.rule.engine.api.TbContext; 15 import org.thingsboard.rule.engine.api.TbContext;
12 import org.thingsboard.server.common.data.DataConstants; 16 import org.thingsboard.server.common.data.DataConstants;
13 -import org.thingsboard.server.common.data.EntityType;  
14 import org.thingsboard.server.common.data.alarm.Alarm; 17 import org.thingsboard.server.common.data.alarm.Alarm;
15 import org.thingsboard.server.common.data.alarm.AlarmStatus; 18 import org.thingsboard.server.common.data.alarm.AlarmStatus;
16 import org.thingsboard.server.common.data.device.profile.AlarmConditionFilter; 19 import org.thingsboard.server.common.data.device.profile.AlarmConditionFilter;
17 import org.thingsboard.server.common.data.device.profile.AlarmConditionFilterKey; 20 import org.thingsboard.server.common.data.device.profile.AlarmConditionFilterKey;
18 import org.thingsboard.server.common.data.device.profile.AlarmRule; 21 import org.thingsboard.server.common.data.device.profile.AlarmRule;
19 import org.thingsboard.server.common.data.id.DeviceId; 22 import org.thingsboard.server.common.data.id.DeviceId;
  23 +import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
20 import org.thingsboard.server.common.data.yunteng.dto.ActionAlarmDTO; 24 import org.thingsboard.server.common.data.yunteng.dto.ActionAlarmDTO;
21 import org.thingsboard.server.common.data.yunteng.dto.AlarmInfoDTO; 25 import org.thingsboard.server.common.data.yunteng.dto.AlarmInfoDTO;
22 import org.thingsboard.server.common.data.yunteng.dto.TriggerDTO; 26 import org.thingsboard.server.common.data.yunteng.dto.TriggerDTO;
23 -import org.thingsboard.server.common.data.yunteng.enums.ActionTypeEnum;  
24 -import org.thingsboard.server.common.data.yunteng.enums.ScopeEnum;  
25 -import org.thingsboard.server.common.data.yunteng.enums.TkAlarmSeverity;  
26 -import org.thingsboard.server.common.data.yunteng.enums.TriggerTypeEnum; 27 +import org.thingsboard.server.common.data.yunteng.enums.*;
27 import org.thingsboard.server.common.data.yunteng.utils.JacksonUtil; 28 import org.thingsboard.server.common.data.yunteng.utils.JacksonUtil;
28 import org.thingsboard.server.common.data.yunteng.utils.SpringBeanUtils; 29 import org.thingsboard.server.common.data.yunteng.utils.SpringBeanUtils;
29 import org.thingsboard.server.common.msg.TbMsg; 30 import org.thingsboard.server.common.msg.TbMsg;
@@ -33,368 +34,371 @@ import org.thingsboard.server.dao.yunteng.entities.TkDoActionEntity; @@ -33,368 +34,371 @@ import org.thingsboard.server.dao.yunteng.entities.TkDoActionEntity;
33 import org.thingsboard.server.dao.yunteng.entities.TkDoConditionEntity; 34 import org.thingsboard.server.dao.yunteng.entities.TkDoConditionEntity;
34 import org.thingsboard.server.dao.yunteng.service.*; 35 import org.thingsboard.server.dao.yunteng.service.*;
35 36
36 -import java.util.HashSet;  
37 -import java.util.List;  
38 -import java.util.Set;  
39 -import java.util.UUID;  
40 -import java.util.concurrent.ConcurrentHashMap;  
41 -import java.util.concurrent.ExecutionException;  
42 -  
43 @Slf4j 37 @Slf4j
44 class ReactState { 38 class ReactState {
45 39
46 - /**  
47 - * 场景联动主键  
48 - */  
49 - String reactId;  
50 - String reactName;  
51 - String orgId;  
52 -  
53 - /**  
54 - * 场景联动的触发器状态  
55 - * 键:触发器ID和设备ID为键  
56 - * 值:设备指标参与的触发器  
57 - */  
58 - private ConcurrentHashMap<String, TriggerState> triggerState = new ConcurrentHashMap<>();  
59 -  
60 - /**  
61 - * 场景联动的告警清除触发器状态  
62 - * 键:设备ID为键  
63 - * 值:设备指标参与的触发器  
64 - */  
65 - private ConcurrentHashMap<String, TriggerState> clearState = new ConcurrentHashMap<>();  
66 - /**  
67 - * 设备告警信息  
68 - * 键:设备ID为键  
69 - * 值:设备最新告警信息  
70 - */  
71 - private ConcurrentHashMap<String, Alarm> currentAlarms = new ConcurrentHashMap<>();  
72 -  
73 - /**  
74 - * 场景联动的执行条件状态  
75 - * 键:触发器ID和设备ID为键  
76 - * 值:设备指标参与的触发器  
77 - */  
78 - private ConcurrentHashMap<String, TriggerState> conditionState = new ConcurrentHashMap<>();  
79 - 40 + /** 场景联动主键 */
  41 + String reactId;
  42 +
  43 + String reactName;
  44 + String orgId;
  45 +
  46 + /** 场景联动的触发器状态 键:触发器ID和设备ID为键 值:设备指标参与的触发器 */
  47 + private ConcurrentHashMap<String, TriggerState> triggerState = new ConcurrentHashMap<>();
  48 +
  49 + /** 场景联动的告警清除触发器状态 键:设备ID为键 值:设备指标参与的触发器 */
  50 + private ConcurrentHashMap<String, TriggerState> clearState = new ConcurrentHashMap<>();
  51 + /** 设备告警信息 键:设备ID为键 值:设备最新告警信息 */
  52 + private ConcurrentHashMap<String, Alarm> currentAlarms = new ConcurrentHashMap<>();
  53 +
  54 + /** 场景联动的执行条件状态 键:触发器ID和设备ID为键 值:设备指标参与的触发器 */
  55 + private ConcurrentHashMap<String, TriggerState> conditionState = new ConcurrentHashMap<>();
  56 +
  57 + private final List<TriggerDTO> triggers;
  58 + private TkDoActionEntity alarmAction = null;
  59 + /** 【场景联动的执行条件】懒加载 */
  60 + private final List<TkDoConditionEntity> conditions;
  61 + /** 【场景联动的执行集合】懒加载 */
  62 + private final List<TkDoActionEntity> actions;
  63 +
  64 + private final TkNoticeService noticeService;
  65 + private TkDeviceService ytDeviceService;
  66 +
  67 + ReactState(String reactId, TbContext ctx, TbSceneReactNodeConfig config) {
  68 + this.reactId = reactId;
  69 + this.reactName = config.getNames().get(reactId);
  70 + this.orgId = config.getOrgs().get(reactId);
  71 + TriggerService triggerService = SpringBeanUtils.getBean(TriggerService.class);
  72 + this.triggers = triggerService.getTrigger(reactId);
  73 + DoConditionService conditionService = SpringBeanUtils.getBean(DoConditionService.class);
  74 + this.conditions = conditionService.getConditions(reactId);
  75 + DoActionService actionService = SpringBeanUtils.getBean(DoActionService.class);
  76 + this.actions = actionService.getActionsByAll(reactId);
  77 + this.actions.addAll(actionService.getActionsByPart(reactId));
  78 + for (TkDoActionEntity action : actions) {
  79 + /** 动作中只有1个告警通知 */
  80 + if (ActionTypeEnum.MSG_NOTIFY.equals(action.getOutTarget())) {
  81 + this.alarmAction = action;
  82 + break;
  83 + }
  84 + }
  85 + this.noticeService = SpringBeanUtils.getBean(TkNoticeService.class);
  86 + this.ytDeviceService = SpringBeanUtils.getBean(TkDeviceService.class);
  87 + }
80 88
81 - private final List<TriggerDTO> triggers;  
82 - private TkDoActionEntity alarmAction = null;  
83 - /**  
84 - * 【场景联动的执行条件】懒加载  
85 - */  
86 - private final List<TkDoConditionEntity> conditions;  
87 - /**  
88 - * 【场景联动的执行集合】懒加载  
89 - */  
90 - private final List<TkDoActionEntity> actions;  
91 - private final TkNoticeService noticeService;  
92 - private TkDeviceService ytDeviceService; 89 + public void process(TbContext ctx, TbMsg msg, String profileId, String deviceId)
  90 + throws ExecutionException, InterruptedException {
93 91
  92 + StringBuilder detail = new StringBuilder();
  93 + if (actions == null) {
  94 + ctx.tellSuccess(msg);
  95 + }
94 96
95 - ReactState(String reactId, TbContext ctx, TbSceneReactNodeConfig config) {  
96 - this.reactId = reactId;  
97 - this.reactName = config.getNames().get(reactId);  
98 - this.orgId = config.getOrgs().get(reactId);  
99 - TriggerService triggerService = SpringBeanUtils.getBean(TriggerService.class);  
100 - this.triggers = triggerService.getTrigger(reactId);  
101 - DoConditionService conditionService = SpringBeanUtils.getBean(DoConditionService.class);  
102 - this.conditions = conditionService.getConditions(reactId);  
103 - DoActionService actionService = SpringBeanUtils.getBean(DoActionService.class);  
104 - this.actions = actionService.getActionsByAll(reactId);  
105 - this.actions.addAll(actionService.getActionsByPart(reactId));  
106 - for (TkDoActionEntity action : actions) {  
107 - /**动作中只有1个告警通知*/  
108 - if (ActionTypeEnum.MSG_NOTIFY.equals(action.getOutTarget())) {  
109 - this.alarmAction = action;  
110 - break;  
111 - } 97 + boolean matched = false;
  98 + if (triggers == null || triggers.isEmpty()) {
  99 + matched = true;
  100 + } else {
  101 + boolean itemMatched = false;
  102 + for (TriggerDTO trigger : triggers) {
  103 + TriggerState triggerState = getOrCreateTriggerState(trigger, profileId, deviceId);
  104 + if (triggerState == null) {
  105 + continue;
  106 + }
  107 + itemMatched = triggerState.process(ctx, msg);
  108 + if (itemMatched) {
  109 + matched = true;
  110 + detail.append(
  111 + triggerState.getAlarmDetails() == null ? "" : triggerState.getAlarmDetails());
  112 + if (this.alarmAction != null) {
  113 + noticeMsg(
  114 + ctx,
  115 + msg,
  116 + alarmAction,
  117 + deviceId,
  118 + triggerState.getAlarmDetails(),
  119 + triggerState.getLatestValues().getTs());
  120 + }
  121 + } else if (currentAlarms.containsKey(deviceId) && this.alarmAction != null) {
  122 + // 清除设备告警
  123 + for (AlarmConditionFilterKey entityKey : triggerState.getEntityKeys()) {
  124 + clearAlarm(ctx, msg, deviceId, entityKey.getKey());
  125 + }
112 } 126 }
113 - this.noticeService = SpringBeanUtils.getBean(TkNoticeService.class);  
114 - this.ytDeviceService = SpringBeanUtils.getBean(TkDeviceService.class); 127 + }
115 } 128 }
116 129
117 -  
118 - public void process(TbContext ctx, TbMsg msg,String profileId, String deviceId) throws ExecutionException, InterruptedException {  
119 -  
120 -  
121 - StringBuilder detail = new StringBuilder();  
122 - if (actions == null) {  
123 - ctx.tellSuccess(msg); 130 + if (matched && conditions.size() > 0) {
  131 + matched = false;
  132 + for (TkDoConditionEntity item : conditions) {
  133 + List<String> entityIds = item.getEntityId();
  134 + if (entityIds == null || entityIds.isEmpty()) {
  135 + matched = true;
  136 + break;
124 } 137 }
125 -  
126 - boolean matched = false;  
127 - if (triggers == null || triggers.isEmpty()) { 138 + for (String id : entityIds) {
  139 + TriggerState conditionState = getOrCreateConditionState(item, profileId, id);
  140 + if (conditionState == null || conditionState.process(ctx, msg)) {
  141 + detail.append(";");
  142 + detail.append(conditionState.getAlarmDetails());
128 matched = true; 143 matched = true;
129 - } else {  
130 - boolean itemMatched = false;  
131 - for (TriggerDTO trigger : triggers) {  
132 - TriggerState triggerState = getOrCreateTriggerState(trigger,profileId, deviceId);  
133 - if (triggerState == null) {  
134 - continue;  
135 - }  
136 - itemMatched = triggerState.process(ctx, msg);  
137 - if (itemMatched) {  
138 - matched = true;  
139 - detail.append(triggerState.getAlarmDetails()==null?"":triggerState.getAlarmDetails());  
140 - if (this.alarmAction != null) {  
141 - noticeMsg(ctx, msg, alarmAction, deviceId, triggerState.getAlarmDetails(), triggerState.getLatestValues().getTs());  
142 - }  
143 - } else if (currentAlarms.containsKey(deviceId) && this.alarmAction != null) {  
144 - //清除设备告警  
145 - for (AlarmConditionFilterKey entityKey : triggerState.getEntityKeys()) {  
146 - clearAlarm(ctx, msg, deviceId, entityKey.getKey());  
147 - }  
148 - }  
149 - } 144 + break;
  145 + }
150 } 146 }
151 -  
152 -  
153 - if (matched && conditions.size() > 0) {  
154 - matched = false;  
155 - for (TkDoConditionEntity item : conditions) {  
156 - List<String> entityIds = item.getEntityId();  
157 - if (entityIds == null || entityIds.isEmpty()) {  
158 - matched = true;  
159 - break;  
160 - }  
161 - for (String id : entityIds) {  
162 - TriggerState conditionState = getOrCreateConditionState(item, profileId,id);  
163 - if (conditionState == null  
164 - || conditionState.process(ctx, msg)) {  
165 - detail.append(";");  
166 - detail.append(conditionState.getAlarmDetails());  
167 - matched = true;  
168 - break;  
169 - }  
170 - }  
171 - if (matched) {  
172 - break;  
173 - }  
174 - }  
175 - }  
176 -  
177 if (matched) { 147 if (matched) {
178 - for (TkDoActionEntity item : actions) {  
179 - if (ActionTypeEnum.MSG_NOTIFY.equals(item.getOutTarget())) {  
180 - continue;  
181 - }  
182 - pushMsg(ctx, msg, item, detail.toString());  
183 - }  
184 - } else {  
185 - ctx.tellSuccess(msg); 148 + break;
186 } 149 }
187 - 150 + }
188 } 151 }
189 152
190 -  
191 - protected TriggerState getOrCreateTriggerState(TriggerDTO trigger, String profileId, String deviceId) {  
192 - String triggerId = trigger.getId();  
193 - String cacheKey = triggerId + deviceId;  
194 - if (triggerState.containsKey(cacheKey)) {  
195 - return triggerState.get(cacheKey);  
196 - }  
197 - if ((trigger.getEntityType().equals(ScopeEnum.PART) && trigger.getEntityId().contains(deviceId))  
198 - ||(trigger.getEntityType().equals(ScopeEnum.ALL) && trigger.getDeviceProfileId().equals(profileId)) ) {  
199 - TriggerState state = createTriggerState(deviceId, trigger.getTriggerCondition());  
200 - triggerState.put(cacheKey, state);  
201 - return state; 153 + if (matched) {
  154 + for (TkDoActionEntity item : actions) {
  155 + if (ActionTypeEnum.MSG_NOTIFY.equals(item.getOutTarget())) {
  156 + continue;
202 } 157 }
203 - return null;  
204 - 158 + pushMsg(ctx, msg, item, detail.toString());
  159 + }
  160 + } else {
  161 + ctx.tellSuccess(msg);
205 } 162 }
206 -  
207 - protected TriggerState getOrCreateClearState(String deviceId, String key) {  
208 - String cacheKey = deviceId + key;  
209 - if (triggerState.containsKey(cacheKey)) {  
210 - return triggerState.get(cacheKey);  
211 - }  
212 - TriggerState state = null;  
213 - ActionAlarmDTO alarm = JacksonUtil.convertValue(alarmAction.getDoContext(), ActionAlarmDTO.class);  
214 - if (alarm != null && alarm.getClearRule() != null) {  
215 - for (TriggerDTO rule : alarm.getClearRule()) {  
216 - if ((ScopeEnum.PART.equals(rule.getEntityType()) && !rule.getEntityId().contains(deviceId))  
217 -// || !alarmAction.getDeviceId().contains(deviceId)  
218 - ) {  
219 - continue;  
220 - }  
221 -  
222 - for (AlarmConditionFilter filter : rule.getTriggerCondition().getCondition().getCondition()) {  
223 - String tempKey = filter.getKey().getKey();  
224 - if (key.equals(tempKey)) {  
225 - state = createTriggerState(deviceId, rule.getTriggerCondition());  
226 - triggerState.put(cacheKey, state);  
227 - break;  
228 - }  
229 - }  
230 - if (state != null) {  
231 - break;  
232 - }  
233 - }  
234 - return state;  
235 - }  
236 - return null;  
237 - 163 + }
  164 +
  165 + protected TriggerState getOrCreateTriggerState(
  166 + TriggerDTO trigger, String profileId, String deviceId) {
  167 + String triggerId = trigger.getId();
  168 + String cacheKey = triggerId + deviceId;
  169 + if (triggerState.containsKey(cacheKey)) {
  170 + return triggerState.get(cacheKey);
238 } 171 }
  172 + if ((trigger.getEntityType().equals(ScopeEnum.PART) && trigger.getEntityId().contains(deviceId))
  173 + || (trigger.getEntityType().equals(ScopeEnum.ALL)
  174 + && trigger.getDeviceProfileId().equals(profileId))) {
  175 + TriggerState state = createTriggerState(deviceId, trigger.getTriggerCondition());
  176 + triggerState.put(cacheKey, state);
  177 + return state;
  178 + }
  179 + return null;
  180 + }
239 181
240 - protected TriggerState getOrCreateConditionState(TkDoConditionEntity condition, String profileId,String deviceId) {  
241 - String cacheKey = condition.getId() + deviceId;  
242 - if (conditionState.containsKey(cacheKey)) {  
243 - return conditionState.get(cacheKey); 182 + protected TriggerState getOrCreateClearState(String deviceId, String key) {
  183 + String cacheKey = deviceId + key;
  184 + if (triggerState.containsKey(cacheKey)) {
  185 + return triggerState.get(cacheKey);
  186 + }
  187 + TriggerState state = null;
  188 + ActionAlarmDTO alarm =
  189 + JacksonUtil.convertValue(alarmAction.getDoContext(), ActionAlarmDTO.class);
  190 + if (alarm != null && alarm.getClearRule() != null) {
  191 + for (TriggerDTO rule : alarm.getClearRule()) {
  192 + if ((ScopeEnum.PART.equals(rule.getEntityType()) && !rule.getEntityId().contains(deviceId))
  193 + // || !alarmAction.getDeviceId().contains(deviceId)
  194 + ) {
  195 + continue;
244 } 196 }
245 - if ((condition.getEntityType().equals(ScopeEnum.PART) && condition.getEntityId().contains(deviceId) )  
246 - ||(condition.getEntityType().equals(ScopeEnum.ALL) && condition.getDeviceProfileId().equals(profileId)) ) {  
247 - TriggerState state = createTriggerState(deviceId, condition.getTriggerCondition()); 197 +
  198 + for (AlarmConditionFilter filter :
  199 + rule.getTriggerCondition().getCondition().getCondition()) {
  200 + String tempKey = filter.getKey().getKey();
  201 + if (key.equals(tempKey)) {
  202 + state = createTriggerState(deviceId, rule.getTriggerCondition());
248 triggerState.put(cacheKey, state); 203 triggerState.put(cacheKey, state);
249 - return state; 204 + break;
  205 + }
250 } 206 }
251 - return null;  
252 - }  
253 -  
254 - @NotNull  
255 - private TriggerState createTriggerState(String deviceId, AlarmRule rule) {  
256 - Set<AlarmConditionFilterKey> filterKeys = new HashSet<>();  
257 - for (AlarmConditionFilter filter : rule.getCondition().getCondition()) {  
258 - filterKeys.add(filter.getKey()); 207 + if (state != null) {
  208 + break;
259 } 209 }
260 - TriggerState state = new TriggerState(deviceId, rule, filterKeys, rule.getAlarmDetails(), null);  
261 -  
262 - return state; 210 + }
  211 + return state;
263 } 212 }
264 -  
265 - private void pushMsg(TbContext ctx, TbMsg msg, TkDoActionEntity action, String detail) {  
266 - TbMsgMetaData metaData = //lastMsgMetaData != null ? lastMsgMetaData.copy() :  
267 - new TbMsgMetaData();  
268 - String relationType = "";  
269 - TbMsg newMsg = null;  
270 - switch (action.getOutTarget()) {  
271 - case DEVICE_OUT:  
272 - List<String> rpcDevices = action.getDeviceId();  
273 - if(ScopeEnum.ALL.equals(action.getEntityType())){  
274 - rpcDevices = ytDeviceService.rpcDevices(action.getTenantId(),orgId,action.getDeviceProfileId());  
275 - }  
276 - rpcMsg(ctx, msg,rpcDevices , action.getDoContext());  
277 - break;  
278 - case SCENE_ACT:  
279 - reactMsg(ctx, msg, action);  
280 - break;  
281 - default:  
282 - ctx.tellSuccess(msg);  
283 - break;  
284 - } 213 + return null;
  214 + }
  215 +
  216 + protected TriggerState getOrCreateConditionState(
  217 + TkDoConditionEntity condition, String profileId, String deviceId) {
  218 + String cacheKey = condition.getId() + deviceId;
  219 + if (conditionState.containsKey(cacheKey)) {
  220 + return conditionState.get(cacheKey);
285 } 221 }
286 -  
287 - /**  
288 - * 设备输出  
289 - *  
290 - * @param ctx  
291 - * @param msg  
292 - * @param devices  
293 - * @param context  
294 - */  
295 - private void rpcMsg(TbContext ctx, TbMsg msg, List<String> devices, JsonNode context) {  
296 - String lastMsgQueueName = msg.getQueueName();  
297 - TbMsgMetaData metaData = msg.getMetaData();  
298 - metaData.putValue(DataConstants.PERSISTENT,"true");  
299 - for (String id : devices) {  
300 - TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN  
301 - , msg.getType()  
302 - , new DeviceId(UUID.fromString(id))  
303 - , msg != null ? msg.getCustomerId() : null  
304 - , metaData  
305 - , JacksonUtil.toString(context));  
306 - ctx.tellNext(newMsg, "RPC Request");  
307 - } 222 + if ((condition.getEntityType().equals(ScopeEnum.PART)
  223 + && condition.getEntityId().contains(deviceId))
  224 + || (condition.getEntityType().equals(ScopeEnum.ALL)
  225 + && condition.getDeviceProfileId().equals(profileId))) {
  226 + TriggerState state = createTriggerState(deviceId, condition.getTriggerCondition());
  227 + triggerState.put(cacheKey, state);
  228 + return state;
308 } 229 }
309 -  
310 - /**  
311 - * 告警通知  
312 - *  
313 - * @param ctx  
314 - * @param msg  
315 - * @param action  
316 - */  
317 - private void noticeMsg(TbContext ctx, TbMsg msg, TkDoActionEntity action, String deviceId, String detailStr, long startTs) {  
318 -  
319 - DeviceId entityId = new DeviceId(UUID.fromString(deviceId));  
320 - ActionAlarmDTO actionAlarm = JacksonUtil.convertValue(action.getDoContext(), ActionAlarmDTO.class);  
321 -  
322 -  
323 - Alarm currentAlarm = new Alarm();  
324 - currentAlarm.setType(reactName);  
325 - currentAlarm.setStatus(AlarmStatus.ACTIVE_UNACK);  
326 - currentAlarm.setSeverity(actionAlarm.getAlarmLevel());  
327 - if (startTs == 0L) {  
328 - startTs = System.currentTimeMillis();  
329 - }  
330 - currentAlarm.setStartTs(startTs);  
331 - currentAlarm.setEndTs(currentAlarm.getStartTs());  
332 - ObjectNode detailData = JacksonUtil.newObjectNode();  
333 - if(StringUtils.isNotEmpty(detailStr)){  
334 - detailData.put("msg", detailStr);  
335 - }  
336 - detailData.put("data", JacksonUtil.toJsonNode(msg.getData()));  
337 - currentAlarm.setDetails(detailData);  
338 - currentAlarm.setOriginator(entityId);  
339 - currentAlarm.setTenantId(ctx.getTenantId());  
340 - currentAlarm.setPropagate(false);  
341 -  
342 - currentAlarm = ctx.getAlarmService().createOrUpdateAlarm(currentAlarm);  
343 - Alarm oldAlarm = currentAlarms.get(deviceId);  
344 - if(oldAlarm != null  
345 - && currentAlarm.getId().equals(oldAlarm.getId())  
346 - && currentAlarm.getSeverity().equals(oldAlarm.getSeverity())){  
347 - return;  
348 - }  
349 - ytDeviceService.freshAlarmStatus(entityId, 1);  
350 - currentAlarms.put(deviceId, currentAlarm);  
351 - alarmMsg(ctx, msg, currentAlarm, "Alarm Cleared");  
352 -  
353 - AlarmInfoDTO formData = new AlarmInfoDTO();  
354 - formData.setDeviceName(msg.getMetaData().getData().get("deviceName"));  
355 - formData.setDetails(JacksonUtil.toString(detailData));  
356 - formData.setType(currentAlarm.getType());  
357 - formData.setCreateTs(currentAlarm.getCreatedTime());  
358 - formData.setStartTs(currentAlarm.getStartTs());  
359 - formData.setEndTs(currentAlarm.getEndTs());  
360 - formData.setStatus(currentAlarm.getStatus().name());  
361 - formData.setDeviceId(deviceId);  
362 - formData.setTenantId(currentAlarm.getTenantId().getId().toString());  
363 - formData.setSeverity(TkAlarmSeverity.getLabel(currentAlarm.getSeverity().name()));  
364 - noticeService.alert(action.getAlarmProfileId(), formData); 230 + return null;
  231 + }
  232 +
  233 + @NotNull
  234 + private TriggerState createTriggerState(String deviceId, AlarmRule rule) {
  235 + Set<AlarmConditionFilterKey> filterKeys = new HashSet<>();
  236 + for (AlarmConditionFilter filter : rule.getCondition().getCondition()) {
  237 + filterKeys.add(filter.getKey());
365 } 238 }
366 -  
367 - private void clearAlarm(TbContext ctx, TbMsg msg, String deviceId, String key) throws ExecutionException, InterruptedException {  
368 - TriggerState clearState = getOrCreateClearState(deviceId, key);  
369 - if (clearState != null && clearState.process(ctx, msg)) {  
370 - ctx.getAlarmService().clearAlarmForResult(ctx.getTenantId(), currentAlarms.get(deviceId).getId(), null, System.currentTimeMillis());  
371 - ytDeviceService.freshAlarmStatus(new DeviceId(UUID.fromString(deviceId)), 0);  
372 - alarmMsg(ctx, msg, currentAlarms.get(deviceId), "Alarm Cleared");  
373 - currentAlarms.remove(deviceId); 239 + TriggerState state = new TriggerState(deviceId, rule, filterKeys, rule.getAlarmDetails(), null);
  240 +
  241 + return state;
  242 + }
  243 +
  244 + private void pushMsg(TbContext ctx, TbMsg msg, TkDoActionEntity action, String detail) {
  245 + TbMsgMetaData metaData = // lastMsgMetaData != null ? lastMsgMetaData.copy() :
  246 + new TbMsgMetaData();
  247 + String relationType = "";
  248 + TbMsg newMsg = null;
  249 + switch (action.getOutTarget()) {
  250 + case DEVICE_OUT:
  251 + List<String> rpcDevices = action.getDeviceId();
  252 + if (ScopeEnum.ALL.equals(action.getEntityType())) {
  253 + rpcDevices =
  254 + ytDeviceService.rpcDevices(action.getTenantId(), orgId, action.getDeviceProfileId());
374 } 255 }
  256 + rpcMsg(ctx, msg, rpcDevices, action.getDoContext(), action.getCallType());
  257 + break;
  258 + case SCENE_ACT:
  259 + reactMsg(ctx, msg, action);
  260 + break;
  261 + default:
  262 + ctx.tellSuccess(msg);
  263 + break;
375 } 264 }
376 -  
377 - private void alarmMsg(TbContext ctx, TbMsg msg, Alarm alarm, String releationType) {  
378 - String lastMsgQueueName = msg.getQueueName();  
379 - TbMsgMetaData metaData = msg.getMetaData();  
380 - String data = JacksonUtil.valueToTree(alarm).toString();  
381 - TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN, "ALARM",  
382 - alarm.getOriginator(), msg != null ? msg.getCustomerId() : null, metaData, data);  
383 - ctx.enqueueForTellNext(newMsg, releationType); 265 + }
  266 +
  267 + /**
  268 + * 设备输出
  269 + *
  270 + * @param ctx
  271 + * @param msg
  272 + * @param devices
  273 + * @param context
  274 + */
  275 + private void rpcMsg(
  276 + TbContext ctx, TbMsg msg, List<String> devices, JsonNode context, CallTypeEnum callType) {
  277 + String lastMsgQueueName = msg.getQueueName();
  278 + TbMsgMetaData metaData = msg.getMetaData();
  279 + metaData.putValue(DataConstants.PERSISTENT, "true");
  280 + metaData.putValue(
  281 + FastIotConstants.Rpc.ONEWAY, (callType == CallTypeEnum.SYNC ? true : false) + "");
  282 + for (String id : devices) {
  283 + TbMsg newMsg =
  284 + ctx.newMsg(
  285 + lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN,
  286 + msg.getType(),
  287 + new DeviceId(UUID.fromString(id)),
  288 + msg != null ? msg.getCustomerId() : null,
  289 + metaData,
  290 + JacksonUtil.toString(context));
  291 + ctx.tellNext(newMsg, "RPC Request");
384 } 292 }
385 -  
386 -  
387 - private void reactMsg(TbContext ctx, TbMsg msg, TkDoActionEntity action) {  
388 - //TODO: 场景联动关联消息通知  
389 - String lastMsgQueueName = msg.getQueueName();  
390 - TbMsgMetaData metaData = msg.getMetaData();  
391 - metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString());  
392 - TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN  
393 - , msg.getType()  
394 - , msg.getOriginator()  
395 - , msg != null ? msg.getCustomerId() : null  
396 - , metaData  
397 - , JacksonUtil.toString(action.getDoContext()));  
398 - ctx.tellNext(newMsg, "Message"); 293 + }
  294 +
  295 + /**
  296 + * 告警通知
  297 + *
  298 + * @param ctx
  299 + * @param msg
  300 + * @param action
  301 + */
  302 + private void noticeMsg(
  303 + TbContext ctx,
  304 + TbMsg msg,
  305 + TkDoActionEntity action,
  306 + String deviceId,
  307 + String detailStr,
  308 + long startTs) {
  309 +
  310 + DeviceId entityId = new DeviceId(UUID.fromString(deviceId));
  311 + ActionAlarmDTO actionAlarm =
  312 + JacksonUtil.convertValue(action.getDoContext(), ActionAlarmDTO.class);
  313 +
  314 + Alarm currentAlarm = new Alarm();
  315 + currentAlarm.setType(reactName);
  316 + currentAlarm.setStatus(AlarmStatus.ACTIVE_UNACK);
  317 + currentAlarm.setSeverity(actionAlarm.getAlarmLevel());
  318 + if (startTs == 0L) {
  319 + startTs = System.currentTimeMillis();
  320 + }
  321 + currentAlarm.setStartTs(startTs);
  322 + currentAlarm.setEndTs(currentAlarm.getStartTs());
  323 + ObjectNode detailData = JacksonUtil.newObjectNode();
  324 + if (StringUtils.isNotEmpty(detailStr)) {
  325 + detailData.put("msg", detailStr);
  326 + }
  327 + detailData.put("data", JacksonUtil.toJsonNode(msg.getData()));
  328 + currentAlarm.setDetails(detailData);
  329 + currentAlarm.setOriginator(entityId);
  330 + currentAlarm.setTenantId(ctx.getTenantId());
  331 + currentAlarm.setPropagate(false);
  332 +
  333 + currentAlarm = ctx.getAlarmService().createOrUpdateAlarm(currentAlarm);
  334 + Alarm oldAlarm = currentAlarms.get(deviceId);
  335 + if (oldAlarm != null
  336 + && currentAlarm.getId().equals(oldAlarm.getId())
  337 + && currentAlarm.getSeverity().equals(oldAlarm.getSeverity())) {
  338 + return;
  339 + }
  340 + ytDeviceService.freshAlarmStatus(entityId, 1);
  341 + currentAlarms.put(deviceId, currentAlarm);
  342 + alarmMsg(ctx, msg, currentAlarm, "Alarm Cleared");
  343 +
  344 + AlarmInfoDTO formData = new AlarmInfoDTO();
  345 + formData.setDeviceName(msg.getMetaData().getData().get("deviceName"));
  346 + formData.setDetails(JacksonUtil.toString(detailData));
  347 + formData.setType(currentAlarm.getType());
  348 + formData.setCreateTs(currentAlarm.getCreatedTime());
  349 + formData.setStartTs(currentAlarm.getStartTs());
  350 + formData.setEndTs(currentAlarm.getEndTs());
  351 + formData.setStatus(currentAlarm.getStatus().name());
  352 + formData.setDeviceId(deviceId);
  353 + formData.setTenantId(currentAlarm.getTenantId().getId().toString());
  354 + formData.setSeverity(TkAlarmSeverity.getLabel(currentAlarm.getSeverity().name()));
  355 + noticeService.alert(action.getAlarmProfileId(), formData);
  356 + }
  357 +
  358 + private void clearAlarm(TbContext ctx, TbMsg msg, String deviceId, String key)
  359 + throws ExecutionException, InterruptedException {
  360 + TriggerState clearState = getOrCreateClearState(deviceId, key);
  361 + if (clearState != null && clearState.process(ctx, msg)) {
  362 + ctx.getAlarmService()
  363 + .clearAlarmForResult(
  364 + ctx.getTenantId(),
  365 + currentAlarms.get(deviceId).getId(),
  366 + null,
  367 + System.currentTimeMillis());
  368 + ytDeviceService.freshAlarmStatus(new DeviceId(UUID.fromString(deviceId)), 0);
  369 + alarmMsg(ctx, msg, currentAlarms.get(deviceId), "Alarm Cleared");
  370 + currentAlarms.remove(deviceId);
399 } 371 }
  372 + }
  373 +
  374 + private void alarmMsg(TbContext ctx, TbMsg msg, Alarm alarm, String releationType) {
  375 + String lastMsgQueueName = msg.getQueueName();
  376 + TbMsgMetaData metaData = msg.getMetaData();
  377 + String data = JacksonUtil.valueToTree(alarm).toString();
  378 + TbMsg newMsg =
  379 + ctx.newMsg(
  380 + lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN,
  381 + "ALARM",
  382 + alarm.getOriginator(),
  383 + msg != null ? msg.getCustomerId() : null,
  384 + metaData,
  385 + data);
  386 + ctx.enqueueForTellNext(newMsg, releationType);
  387 + }
  388 +
  389 + private void reactMsg(TbContext ctx, TbMsg msg, TkDoActionEntity action) {
  390 + // TODO: 场景联动关联消息通知
  391 + String lastMsgQueueName = msg.getQueueName();
  392 + TbMsgMetaData metaData = msg.getMetaData();
  393 + metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString());
  394 + TbMsg newMsg =
  395 + ctx.newMsg(
  396 + lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN,
  397 + msg.getType(),
  398 + msg.getOriginator(),
  399 + msg != null ? msg.getCustomerId() : null,
  400 + metaData,
  401 + JacksonUtil.toString(action.getDoContext()));
  402 + ctx.tellNext(newMsg, "Message");
  403 + }
400 } 404 }