Commit 76f949fe52f95d21283de2c884d97c3ad9c368c1

Authored by xp.Huang
2 parents a4869b2b 8c51655d

Merge branch '20230705' into 'master_dev'

fix: 场景联动命令下发同步异步问题修复

See merge request yunteng/thingskit!204
... ... @@ -171,5 +171,7 @@ public interface FastIotConstants {
171 171 public static String TARGET_ID = "target";
172 172 /**实控设备名称*/
173 173 public static String TARGET_NAME = "deviceName";
  174 + /**RPC单项双向*/
  175 + public static String ONEWAY = "oneway";
174 176 }
175 177 }
... ...
1   -/**
2   - * 设备场景联动状态
3   - */
  1 +/** 设备场景联动状态 */
4 2 package org.thingsboard.rule.engine.yunteng.scene;
5 3
6 4 import com.fasterxml.jackson.databind.JsonNode;
7 5 import com.fasterxml.jackson.databind.node.ObjectNode;
  6 +import java.util.HashSet;
  7 +import java.util.List;
  8 +import java.util.Set;
  9 +import java.util.UUID;
  10 +import java.util.concurrent.ConcurrentHashMap;
  11 +import java.util.concurrent.ExecutionException;
8 12 import lombok.extern.slf4j.Slf4j;
9 13 import org.apache.commons.lang3.StringUtils;
10 14 import org.jetbrains.annotations.NotNull;
11 15 import org.thingsboard.rule.engine.api.TbContext;
12 16 import org.thingsboard.server.common.data.DataConstants;
13   -import org.thingsboard.server.common.data.EntityType;
14 17 import org.thingsboard.server.common.data.alarm.Alarm;
15 18 import org.thingsboard.server.common.data.alarm.AlarmStatus;
16 19 import org.thingsboard.server.common.data.device.profile.AlarmConditionFilter;
17 20 import org.thingsboard.server.common.data.device.profile.AlarmConditionFilterKey;
18 21 import org.thingsboard.server.common.data.device.profile.AlarmRule;
19 22 import org.thingsboard.server.common.data.id.DeviceId;
  23 +import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
20 24 import org.thingsboard.server.common.data.yunteng.dto.ActionAlarmDTO;
21 25 import org.thingsboard.server.common.data.yunteng.dto.AlarmInfoDTO;
22 26 import org.thingsboard.server.common.data.yunteng.dto.TriggerDTO;
23   -import org.thingsboard.server.common.data.yunteng.enums.ActionTypeEnum;
24   -import org.thingsboard.server.common.data.yunteng.enums.ScopeEnum;
25   -import org.thingsboard.server.common.data.yunteng.enums.TkAlarmSeverity;
26   -import org.thingsboard.server.common.data.yunteng.enums.TriggerTypeEnum;
  27 +import org.thingsboard.server.common.data.yunteng.enums.*;
27 28 import org.thingsboard.server.common.data.yunteng.utils.JacksonUtil;
28 29 import org.thingsboard.server.common.data.yunteng.utils.SpringBeanUtils;
29 30 import org.thingsboard.server.common.msg.TbMsg;
... ... @@ -33,368 +34,371 @@ import org.thingsboard.server.dao.yunteng.entities.TkDoActionEntity;
33 34 import org.thingsboard.server.dao.yunteng.entities.TkDoConditionEntity;
34 35 import org.thingsboard.server.dao.yunteng.service.*;
35 36
36   -import java.util.HashSet;
37   -import java.util.List;
38   -import java.util.Set;
39   -import java.util.UUID;
40   -import java.util.concurrent.ConcurrentHashMap;
41   -import java.util.concurrent.ExecutionException;
42   -
43 37 @Slf4j
44 38 class ReactState {
45 39
46   - /**
47   - * 场景联动主键
48   - */
49   - String reactId;
50   - String reactName;
51   - String orgId;
52   -
53   - /**
54   - * 场景联动的触发器状态
55   - * 键:触发器ID和设备ID为键
56   - * 值:设备指标参与的触发器
57   - */
58   - private ConcurrentHashMap<String, TriggerState> triggerState = new ConcurrentHashMap<>();
59   -
60   - /**
61   - * 场景联动的告警清除触发器状态
62   - * 键:设备ID为键
63   - * 值:设备指标参与的触发器
64   - */
65   - private ConcurrentHashMap<String, TriggerState> clearState = new ConcurrentHashMap<>();
66   - /**
67   - * 设备告警信息
68   - * 键:设备ID为键
69   - * 值:设备最新告警信息
70   - */
71   - private ConcurrentHashMap<String, Alarm> currentAlarms = new ConcurrentHashMap<>();
72   -
73   - /**
74   - * 场景联动的执行条件状态
75   - * 键:触发器ID和设备ID为键
76   - * 值:设备指标参与的触发器
77   - */
78   - private ConcurrentHashMap<String, TriggerState> conditionState = new ConcurrentHashMap<>();
79   -
  40 + /** 场景联动主键 */
  41 + String reactId;
  42 +
  43 + String reactName;
  44 + String orgId;
  45 +
  46 + /** 场景联动的触发器状态 键:触发器ID和设备ID为键 值:设备指标参与的触发器 */
  47 + private ConcurrentHashMap<String, TriggerState> triggerState = new ConcurrentHashMap<>();
  48 +
  49 + /** 场景联动的告警清除触发器状态 键:设备ID为键 值:设备指标参与的触发器 */
  50 + private ConcurrentHashMap<String, TriggerState> clearState = new ConcurrentHashMap<>();
  51 + /** 设备告警信息 键:设备ID为键 值:设备最新告警信息 */
  52 + private ConcurrentHashMap<String, Alarm> currentAlarms = new ConcurrentHashMap<>();
  53 +
  54 + /** 场景联动的执行条件状态 键:触发器ID和设备ID为键 值:设备指标参与的触发器 */
  55 + private ConcurrentHashMap<String, TriggerState> conditionState = new ConcurrentHashMap<>();
  56 +
  57 + private final List<TriggerDTO> triggers;
  58 + private TkDoActionEntity alarmAction = null;
  59 + /** 【场景联动的执行条件】懒加载 */
  60 + private final List<TkDoConditionEntity> conditions;
  61 + /** 【场景联动的执行集合】懒加载 */
  62 + private final List<TkDoActionEntity> actions;
  63 +
  64 + private final TkNoticeService noticeService;
  65 + private TkDeviceService ytDeviceService;
  66 +
  67 + ReactState(String reactId, TbContext ctx, TbSceneReactNodeConfig config) {
  68 + this.reactId = reactId;
  69 + this.reactName = config.getNames().get(reactId);
  70 + this.orgId = config.getOrgs().get(reactId);
  71 + TriggerService triggerService = SpringBeanUtils.getBean(TriggerService.class);
  72 + this.triggers = triggerService.getTrigger(reactId);
  73 + DoConditionService conditionService = SpringBeanUtils.getBean(DoConditionService.class);
  74 + this.conditions = conditionService.getConditions(reactId);
  75 + DoActionService actionService = SpringBeanUtils.getBean(DoActionService.class);
  76 + this.actions = actionService.getActionsByAll(reactId);
  77 + this.actions.addAll(actionService.getActionsByPart(reactId));
  78 + for (TkDoActionEntity action : actions) {
  79 + /** 动作中只有1个告警通知 */
  80 + if (ActionTypeEnum.MSG_NOTIFY.equals(action.getOutTarget())) {
  81 + this.alarmAction = action;
  82 + break;
  83 + }
  84 + }
  85 + this.noticeService = SpringBeanUtils.getBean(TkNoticeService.class);
  86 + this.ytDeviceService = SpringBeanUtils.getBean(TkDeviceService.class);
  87 + }
80 88
81   - private final List<TriggerDTO> triggers;
82   - private TkDoActionEntity alarmAction = null;
83   - /**
84   - * 【场景联动的执行条件】懒加载
85   - */
86   - private final List<TkDoConditionEntity> conditions;
87   - /**
88   - * 【场景联动的执行集合】懒加载
89   - */
90   - private final List<TkDoActionEntity> actions;
91   - private final TkNoticeService noticeService;
92   - private TkDeviceService ytDeviceService;
  89 + public void process(TbContext ctx, TbMsg msg, String profileId, String deviceId)
  90 + throws ExecutionException, InterruptedException {
93 91
  92 + StringBuilder detail = new StringBuilder();
  93 + if (actions == null) {
  94 + ctx.tellSuccess(msg);
  95 + }
94 96
95   - ReactState(String reactId, TbContext ctx, TbSceneReactNodeConfig config) {
96   - this.reactId = reactId;
97   - this.reactName = config.getNames().get(reactId);
98   - this.orgId = config.getOrgs().get(reactId);
99   - TriggerService triggerService = SpringBeanUtils.getBean(TriggerService.class);
100   - this.triggers = triggerService.getTrigger(reactId);
101   - DoConditionService conditionService = SpringBeanUtils.getBean(DoConditionService.class);
102   - this.conditions = conditionService.getConditions(reactId);
103   - DoActionService actionService = SpringBeanUtils.getBean(DoActionService.class);
104   - this.actions = actionService.getActionsByAll(reactId);
105   - this.actions.addAll(actionService.getActionsByPart(reactId));
106   - for (TkDoActionEntity action : actions) {
107   - /**动作中只有1个告警通知*/
108   - if (ActionTypeEnum.MSG_NOTIFY.equals(action.getOutTarget())) {
109   - this.alarmAction = action;
110   - break;
111   - }
  97 + boolean matched = false;
  98 + if (triggers == null || triggers.isEmpty()) {
  99 + matched = true;
  100 + } else {
  101 + boolean itemMatched = false;
  102 + for (TriggerDTO trigger : triggers) {
  103 + TriggerState triggerState = getOrCreateTriggerState(trigger, profileId, deviceId);
  104 + if (triggerState == null) {
  105 + continue;
  106 + }
  107 + itemMatched = triggerState.process(ctx, msg);
  108 + if (itemMatched) {
  109 + matched = true;
  110 + detail.append(
  111 + triggerState.getAlarmDetails() == null ? "" : triggerState.getAlarmDetails());
  112 + if (this.alarmAction != null) {
  113 + noticeMsg(
  114 + ctx,
  115 + msg,
  116 + alarmAction,
  117 + deviceId,
  118 + triggerState.getAlarmDetails(),
  119 + triggerState.getLatestValues().getTs());
  120 + }
  121 + } else if (currentAlarms.containsKey(deviceId) && this.alarmAction != null) {
  122 + // 清除设备告警
  123 + for (AlarmConditionFilterKey entityKey : triggerState.getEntityKeys()) {
  124 + clearAlarm(ctx, msg, deviceId, entityKey.getKey());
  125 + }
112 126 }
113   - this.noticeService = SpringBeanUtils.getBean(TkNoticeService.class);
114   - this.ytDeviceService = SpringBeanUtils.getBean(TkDeviceService.class);
  127 + }
115 128 }
116 129
117   -
118   - public void process(TbContext ctx, TbMsg msg,String profileId, String deviceId) throws ExecutionException, InterruptedException {
119   -
120   -
121   - StringBuilder detail = new StringBuilder();
122   - if (actions == null) {
123   - ctx.tellSuccess(msg);
  130 + if (matched && conditions.size() > 0) {
  131 + matched = false;
  132 + for (TkDoConditionEntity item : conditions) {
  133 + List<String> entityIds = item.getEntityId();
  134 + if (entityIds == null || entityIds.isEmpty()) {
  135 + matched = true;
  136 + break;
124 137 }
125   -
126   - boolean matched = false;
127   - if (triggers == null || triggers.isEmpty()) {
  138 + for (String id : entityIds) {
  139 + TriggerState conditionState = getOrCreateConditionState(item, profileId, id);
  140 + if (conditionState == null || conditionState.process(ctx, msg)) {
  141 + detail.append(";");
  142 + detail.append(conditionState.getAlarmDetails());
128 143 matched = true;
129   - } else {
130   - boolean itemMatched = false;
131   - for (TriggerDTO trigger : triggers) {
132   - TriggerState triggerState = getOrCreateTriggerState(trigger,profileId, deviceId);
133   - if (triggerState == null) {
134   - continue;
135   - }
136   - itemMatched = triggerState.process(ctx, msg);
137   - if (itemMatched) {
138   - matched = true;
139   - detail.append(triggerState.getAlarmDetails()==null?"":triggerState.getAlarmDetails());
140   - if (this.alarmAction != null) {
141   - noticeMsg(ctx, msg, alarmAction, deviceId, triggerState.getAlarmDetails(), triggerState.getLatestValues().getTs());
142   - }
143   - } else if (currentAlarms.containsKey(deviceId) && this.alarmAction != null) {
144   - //清除设备告警
145   - for (AlarmConditionFilterKey entityKey : triggerState.getEntityKeys()) {
146   - clearAlarm(ctx, msg, deviceId, entityKey.getKey());
147   - }
148   - }
149   - }
  144 + break;
  145 + }
150 146 }
151   -
152   -
153   - if (matched && conditions.size() > 0) {
154   - matched = false;
155   - for (TkDoConditionEntity item : conditions) {
156   - List<String> entityIds = item.getEntityId();
157   - if (entityIds == null || entityIds.isEmpty()) {
158   - matched = true;
159   - break;
160   - }
161   - for (String id : entityIds) {
162   - TriggerState conditionState = getOrCreateConditionState(item, profileId,id);
163   - if (conditionState == null
164   - || conditionState.process(ctx, msg)) {
165   - detail.append(";");
166   - detail.append(conditionState.getAlarmDetails());
167   - matched = true;
168   - break;
169   - }
170   - }
171   - if (matched) {
172   - break;
173   - }
174   - }
175   - }
176   -
177 147 if (matched) {
178   - for (TkDoActionEntity item : actions) {
179   - if (ActionTypeEnum.MSG_NOTIFY.equals(item.getOutTarget())) {
180   - continue;
181   - }
182   - pushMsg(ctx, msg, item, detail.toString());
183   - }
184   - } else {
185   - ctx.tellSuccess(msg);
  148 + break;
186 149 }
187   -
  150 + }
188 151 }
189 152
190   -
191   - protected TriggerState getOrCreateTriggerState(TriggerDTO trigger, String profileId, String deviceId) {
192   - String triggerId = trigger.getId();
193   - String cacheKey = triggerId + deviceId;
194   - if (triggerState.containsKey(cacheKey)) {
195   - return triggerState.get(cacheKey);
196   - }
197   - if ((trigger.getEntityType().equals(ScopeEnum.PART) && trigger.getEntityId().contains(deviceId))
198   - ||(trigger.getEntityType().equals(ScopeEnum.ALL) && trigger.getDeviceProfileId().equals(profileId)) ) {
199   - TriggerState state = createTriggerState(deviceId, trigger.getTriggerCondition());
200   - triggerState.put(cacheKey, state);
201   - return state;
  153 + if (matched) {
  154 + for (TkDoActionEntity item : actions) {
  155 + if (ActionTypeEnum.MSG_NOTIFY.equals(item.getOutTarget())) {
  156 + continue;
202 157 }
203   - return null;
204   -
  158 + pushMsg(ctx, msg, item, detail.toString());
  159 + }
  160 + } else {
  161 + ctx.tellSuccess(msg);
205 162 }
206   -
207   - protected TriggerState getOrCreateClearState(String deviceId, String key) {
208   - String cacheKey = deviceId + key;
209   - if (triggerState.containsKey(cacheKey)) {
210   - return triggerState.get(cacheKey);
211   - }
212   - TriggerState state = null;
213   - ActionAlarmDTO alarm = JacksonUtil.convertValue(alarmAction.getDoContext(), ActionAlarmDTO.class);
214   - if (alarm != null && alarm.getClearRule() != null) {
215   - for (TriggerDTO rule : alarm.getClearRule()) {
216   - if ((ScopeEnum.PART.equals(rule.getEntityType()) && !rule.getEntityId().contains(deviceId))
217   -// || !alarmAction.getDeviceId().contains(deviceId)
218   - ) {
219   - continue;
220   - }
221   -
222   - for (AlarmConditionFilter filter : rule.getTriggerCondition().getCondition().getCondition()) {
223   - String tempKey = filter.getKey().getKey();
224   - if (key.equals(tempKey)) {
225   - state = createTriggerState(deviceId, rule.getTriggerCondition());
226   - triggerState.put(cacheKey, state);
227   - break;
228   - }
229   - }
230   - if (state != null) {
231   - break;
232   - }
233   - }
234   - return state;
235   - }
236   - return null;
237   -
  163 + }
  164 +
  165 + protected TriggerState getOrCreateTriggerState(
  166 + TriggerDTO trigger, String profileId, String deviceId) {
  167 + String triggerId = trigger.getId();
  168 + String cacheKey = triggerId + deviceId;
  169 + if (triggerState.containsKey(cacheKey)) {
  170 + return triggerState.get(cacheKey);
238 171 }
  172 + if ((trigger.getEntityType().equals(ScopeEnum.PART) && trigger.getEntityId().contains(deviceId))
  173 + || (trigger.getEntityType().equals(ScopeEnum.ALL)
  174 + && trigger.getDeviceProfileId().equals(profileId))) {
  175 + TriggerState state = createTriggerState(deviceId, trigger.getTriggerCondition());
  176 + triggerState.put(cacheKey, state);
  177 + return state;
  178 + }
  179 + return null;
  180 + }
239 181
240   - protected TriggerState getOrCreateConditionState(TkDoConditionEntity condition, String profileId,String deviceId) {
241   - String cacheKey = condition.getId() + deviceId;
242   - if (conditionState.containsKey(cacheKey)) {
243   - return conditionState.get(cacheKey);
  182 + protected TriggerState getOrCreateClearState(String deviceId, String key) {
  183 + String cacheKey = deviceId + key;
  184 + if (triggerState.containsKey(cacheKey)) {
  185 + return triggerState.get(cacheKey);
  186 + }
  187 + TriggerState state = null;
  188 + ActionAlarmDTO alarm =
  189 + JacksonUtil.convertValue(alarmAction.getDoContext(), ActionAlarmDTO.class);
  190 + if (alarm != null && alarm.getClearRule() != null) {
  191 + for (TriggerDTO rule : alarm.getClearRule()) {
  192 + if ((ScopeEnum.PART.equals(rule.getEntityType()) && !rule.getEntityId().contains(deviceId))
  193 + // || !alarmAction.getDeviceId().contains(deviceId)
  194 + ) {
  195 + continue;
244 196 }
245   - if ((condition.getEntityType().equals(ScopeEnum.PART) && condition.getEntityId().contains(deviceId) )
246   - ||(condition.getEntityType().equals(ScopeEnum.ALL) && condition.getDeviceProfileId().equals(profileId)) ) {
247   - TriggerState state = createTriggerState(deviceId, condition.getTriggerCondition());
  197 +
  198 + for (AlarmConditionFilter filter :
  199 + rule.getTriggerCondition().getCondition().getCondition()) {
  200 + String tempKey = filter.getKey().getKey();
  201 + if (key.equals(tempKey)) {
  202 + state = createTriggerState(deviceId, rule.getTriggerCondition());
248 203 triggerState.put(cacheKey, state);
249   - return state;
  204 + break;
  205 + }
250 206 }
251   - return null;
252   - }
253   -
254   - @NotNull
255   - private TriggerState createTriggerState(String deviceId, AlarmRule rule) {
256   - Set<AlarmConditionFilterKey> filterKeys = new HashSet<>();
257   - for (AlarmConditionFilter filter : rule.getCondition().getCondition()) {
258   - filterKeys.add(filter.getKey());
  207 + if (state != null) {
  208 + break;
259 209 }
260   - TriggerState state = new TriggerState(deviceId, rule, filterKeys, rule.getAlarmDetails(), null);
261   -
262   - return state;
  210 + }
  211 + return state;
263 212 }
264   -
265   - private void pushMsg(TbContext ctx, TbMsg msg, TkDoActionEntity action, String detail) {
266   - TbMsgMetaData metaData = //lastMsgMetaData != null ? lastMsgMetaData.copy() :
267   - new TbMsgMetaData();
268   - String relationType = "";
269   - TbMsg newMsg = null;
270   - switch (action.getOutTarget()) {
271   - case DEVICE_OUT:
272   - List<String> rpcDevices = action.getDeviceId();
273   - if(ScopeEnum.ALL.equals(action.getEntityType())){
274   - rpcDevices = ytDeviceService.rpcDevices(action.getTenantId(),orgId,action.getDeviceProfileId());
275   - }
276   - rpcMsg(ctx, msg,rpcDevices , action.getDoContext());
277   - break;
278   - case SCENE_ACT:
279   - reactMsg(ctx, msg, action);
280   - break;
281   - default:
282   - ctx.tellSuccess(msg);
283   - break;
284   - }
  213 + return null;
  214 + }
  215 +
  216 + protected TriggerState getOrCreateConditionState(
  217 + TkDoConditionEntity condition, String profileId, String deviceId) {
  218 + String cacheKey = condition.getId() + deviceId;
  219 + if (conditionState.containsKey(cacheKey)) {
  220 + return conditionState.get(cacheKey);
285 221 }
286   -
287   - /**
288   - * 设备输出
289   - *
290   - * @param ctx
291   - * @param msg
292   - * @param devices
293   - * @param context
294   - */
295   - private void rpcMsg(TbContext ctx, TbMsg msg, List<String> devices, JsonNode context) {
296   - String lastMsgQueueName = msg.getQueueName();
297   - TbMsgMetaData metaData = msg.getMetaData();
298   - metaData.putValue(DataConstants.PERSISTENT,"true");
299   - for (String id : devices) {
300   - TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN
301   - , msg.getType()
302   - , new DeviceId(UUID.fromString(id))
303   - , msg != null ? msg.getCustomerId() : null
304   - , metaData
305   - , JacksonUtil.toString(context));
306   - ctx.tellNext(newMsg, "RPC Request");
307   - }
  222 + if ((condition.getEntityType().equals(ScopeEnum.PART)
  223 + && condition.getEntityId().contains(deviceId))
  224 + || (condition.getEntityType().equals(ScopeEnum.ALL)
  225 + && condition.getDeviceProfileId().equals(profileId))) {
  226 + TriggerState state = createTriggerState(deviceId, condition.getTriggerCondition());
  227 + triggerState.put(cacheKey, state);
  228 + return state;
308 229 }
309   -
310   - /**
311   - * 告警通知
312   - *
313   - * @param ctx
314   - * @param msg
315   - * @param action
316   - */
317   - private void noticeMsg(TbContext ctx, TbMsg msg, TkDoActionEntity action, String deviceId, String detailStr, long startTs) {
318   -
319   - DeviceId entityId = new DeviceId(UUID.fromString(deviceId));
320   - ActionAlarmDTO actionAlarm = JacksonUtil.convertValue(action.getDoContext(), ActionAlarmDTO.class);
321   -
322   -
323   - Alarm currentAlarm = new Alarm();
324   - currentAlarm.setType(reactName);
325   - currentAlarm.setStatus(AlarmStatus.ACTIVE_UNACK);
326   - currentAlarm.setSeverity(actionAlarm.getAlarmLevel());
327   - if (startTs == 0L) {
328   - startTs = System.currentTimeMillis();
329   - }
330   - currentAlarm.setStartTs(startTs);
331   - currentAlarm.setEndTs(currentAlarm.getStartTs());
332   - ObjectNode detailData = JacksonUtil.newObjectNode();
333   - if(StringUtils.isNotEmpty(detailStr)){
334   - detailData.put("msg", detailStr);
335   - }
336   - detailData.put("data", JacksonUtil.toJsonNode(msg.getData()));
337   - currentAlarm.setDetails(detailData);
338   - currentAlarm.setOriginator(entityId);
339   - currentAlarm.setTenantId(ctx.getTenantId());
340   - currentAlarm.setPropagate(false);
341   -
342   - currentAlarm = ctx.getAlarmService().createOrUpdateAlarm(currentAlarm);
343   - Alarm oldAlarm = currentAlarms.get(deviceId);
344   - if(oldAlarm != null
345   - && currentAlarm.getId().equals(oldAlarm.getId())
346   - && currentAlarm.getSeverity().equals(oldAlarm.getSeverity())){
347   - return;
348   - }
349   - ytDeviceService.freshAlarmStatus(entityId, 1);
350   - currentAlarms.put(deviceId, currentAlarm);
351   - alarmMsg(ctx, msg, currentAlarm, "Alarm Cleared");
352   -
353   - AlarmInfoDTO formData = new AlarmInfoDTO();
354   - formData.setDeviceName(msg.getMetaData().getData().get("deviceName"));
355   - formData.setDetails(JacksonUtil.toString(detailData));
356   - formData.setType(currentAlarm.getType());
357   - formData.setCreateTs(currentAlarm.getCreatedTime());
358   - formData.setStartTs(currentAlarm.getStartTs());
359   - formData.setEndTs(currentAlarm.getEndTs());
360   - formData.setStatus(currentAlarm.getStatus().name());
361   - formData.setDeviceId(deviceId);
362   - formData.setTenantId(currentAlarm.getTenantId().getId().toString());
363   - formData.setSeverity(TkAlarmSeverity.getLabel(currentAlarm.getSeverity().name()));
364   - noticeService.alert(action.getAlarmProfileId(), formData);
  230 + return null;
  231 + }
  232 +
  233 + @NotNull
  234 + private TriggerState createTriggerState(String deviceId, AlarmRule rule) {
  235 + Set<AlarmConditionFilterKey> filterKeys = new HashSet<>();
  236 + for (AlarmConditionFilter filter : rule.getCondition().getCondition()) {
  237 + filterKeys.add(filter.getKey());
365 238 }
366   -
367   - private void clearAlarm(TbContext ctx, TbMsg msg, String deviceId, String key) throws ExecutionException, InterruptedException {
368   - TriggerState clearState = getOrCreateClearState(deviceId, key);
369   - if (clearState != null && clearState.process(ctx, msg)) {
370   - ctx.getAlarmService().clearAlarmForResult(ctx.getTenantId(), currentAlarms.get(deviceId).getId(), null, System.currentTimeMillis());
371   - ytDeviceService.freshAlarmStatus(new DeviceId(UUID.fromString(deviceId)), 0);
372   - alarmMsg(ctx, msg, currentAlarms.get(deviceId), "Alarm Cleared");
373   - currentAlarms.remove(deviceId);
  239 + TriggerState state = new TriggerState(deviceId, rule, filterKeys, rule.getAlarmDetails(), null);
  240 +
  241 + return state;
  242 + }
  243 +
  244 + private void pushMsg(TbContext ctx, TbMsg msg, TkDoActionEntity action, String detail) {
  245 + TbMsgMetaData metaData = // lastMsgMetaData != null ? lastMsgMetaData.copy() :
  246 + new TbMsgMetaData();
  247 + String relationType = "";
  248 + TbMsg newMsg = null;
  249 + switch (action.getOutTarget()) {
  250 + case DEVICE_OUT:
  251 + List<String> rpcDevices = action.getDeviceId();
  252 + if (ScopeEnum.ALL.equals(action.getEntityType())) {
  253 + rpcDevices =
  254 + ytDeviceService.rpcDevices(action.getTenantId(), orgId, action.getDeviceProfileId());
374 255 }
  256 + rpcMsg(ctx, msg, rpcDevices, action.getDoContext(), action.getCallType());
  257 + break;
  258 + case SCENE_ACT:
  259 + reactMsg(ctx, msg, action);
  260 + break;
  261 + default:
  262 + ctx.tellSuccess(msg);
  263 + break;
375 264 }
376   -
377   - private void alarmMsg(TbContext ctx, TbMsg msg, Alarm alarm, String releationType) {
378   - String lastMsgQueueName = msg.getQueueName();
379   - TbMsgMetaData metaData = msg.getMetaData();
380   - String data = JacksonUtil.valueToTree(alarm).toString();
381   - TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN, "ALARM",
382   - alarm.getOriginator(), msg != null ? msg.getCustomerId() : null, metaData, data);
383   - ctx.enqueueForTellNext(newMsg, releationType);
  265 + }
  266 +
  267 + /**
  268 + * 设备输出
  269 + *
  270 + * @param ctx
  271 + * @param msg
  272 + * @param devices
  273 + * @param context
  274 + */
  275 + private void rpcMsg(
  276 + TbContext ctx, TbMsg msg, List<String> devices, JsonNode context, CallTypeEnum callType) {
  277 + String lastMsgQueueName = msg.getQueueName();
  278 + TbMsgMetaData metaData = msg.getMetaData();
  279 + metaData.putValue(DataConstants.PERSISTENT, "true");
  280 + metaData.putValue(
  281 + FastIotConstants.Rpc.ONEWAY, (callType == CallTypeEnum.SYNC ? true : false) + "");
  282 + for (String id : devices) {
  283 + TbMsg newMsg =
  284 + ctx.newMsg(
  285 + lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN,
  286 + msg.getType(),
  287 + new DeviceId(UUID.fromString(id)),
  288 + msg != null ? msg.getCustomerId() : null,
  289 + metaData,
  290 + JacksonUtil.toString(context));
  291 + ctx.tellNext(newMsg, "RPC Request");
384 292 }
385   -
386   -
387   - private void reactMsg(TbContext ctx, TbMsg msg, TkDoActionEntity action) {
388   - //TODO: 场景联动关联消息通知
389   - String lastMsgQueueName = msg.getQueueName();
390   - TbMsgMetaData metaData = msg.getMetaData();
391   - metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString());
392   - TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN
393   - , msg.getType()
394   - , msg.getOriginator()
395   - , msg != null ? msg.getCustomerId() : null
396   - , metaData
397   - , JacksonUtil.toString(action.getDoContext()));
398   - ctx.tellNext(newMsg, "Message");
  293 + }
  294 +
  295 + /**
  296 + * 告警通知
  297 + *
  298 + * @param ctx
  299 + * @param msg
  300 + * @param action
  301 + */
  302 + private void noticeMsg(
  303 + TbContext ctx,
  304 + TbMsg msg,
  305 + TkDoActionEntity action,
  306 + String deviceId,
  307 + String detailStr,
  308 + long startTs) {
  309 +
  310 + DeviceId entityId = new DeviceId(UUID.fromString(deviceId));
  311 + ActionAlarmDTO actionAlarm =
  312 + JacksonUtil.convertValue(action.getDoContext(), ActionAlarmDTO.class);
  313 +
  314 + Alarm currentAlarm = new Alarm();
  315 + currentAlarm.setType(reactName);
  316 + currentAlarm.setStatus(AlarmStatus.ACTIVE_UNACK);
  317 + currentAlarm.setSeverity(actionAlarm.getAlarmLevel());
  318 + if (startTs == 0L) {
  319 + startTs = System.currentTimeMillis();
  320 + }
  321 + currentAlarm.setStartTs(startTs);
  322 + currentAlarm.setEndTs(currentAlarm.getStartTs());
  323 + ObjectNode detailData = JacksonUtil.newObjectNode();
  324 + if (StringUtils.isNotEmpty(detailStr)) {
  325 + detailData.put("msg", detailStr);
  326 + }
  327 + detailData.put("data", JacksonUtil.toJsonNode(msg.getData()));
  328 + currentAlarm.setDetails(detailData);
  329 + currentAlarm.setOriginator(entityId);
  330 + currentAlarm.setTenantId(ctx.getTenantId());
  331 + currentAlarm.setPropagate(false);
  332 +
  333 + currentAlarm = ctx.getAlarmService().createOrUpdateAlarm(currentAlarm);
  334 + Alarm oldAlarm = currentAlarms.get(deviceId);
  335 + if (oldAlarm != null
  336 + && currentAlarm.getId().equals(oldAlarm.getId())
  337 + && currentAlarm.getSeverity().equals(oldAlarm.getSeverity())) {
  338 + return;
  339 + }
  340 + ytDeviceService.freshAlarmStatus(entityId, 1);
  341 + currentAlarms.put(deviceId, currentAlarm);
  342 + alarmMsg(ctx, msg, currentAlarm, "Alarm Cleared");
  343 +
  344 + AlarmInfoDTO formData = new AlarmInfoDTO();
  345 + formData.setDeviceName(msg.getMetaData().getData().get("deviceName"));
  346 + formData.setDetails(JacksonUtil.toString(detailData));
  347 + formData.setType(currentAlarm.getType());
  348 + formData.setCreateTs(currentAlarm.getCreatedTime());
  349 + formData.setStartTs(currentAlarm.getStartTs());
  350 + formData.setEndTs(currentAlarm.getEndTs());
  351 + formData.setStatus(currentAlarm.getStatus().name());
  352 + formData.setDeviceId(deviceId);
  353 + formData.setTenantId(currentAlarm.getTenantId().getId().toString());
  354 + formData.setSeverity(TkAlarmSeverity.getLabel(currentAlarm.getSeverity().name()));
  355 + noticeService.alert(action.getAlarmProfileId(), formData);
  356 + }
  357 +
  358 + private void clearAlarm(TbContext ctx, TbMsg msg, String deviceId, String key)
  359 + throws ExecutionException, InterruptedException {
  360 + TriggerState clearState = getOrCreateClearState(deviceId, key);
  361 + if (clearState != null && clearState.process(ctx, msg)) {
  362 + ctx.getAlarmService()
  363 + .clearAlarmForResult(
  364 + ctx.getTenantId(),
  365 + currentAlarms.get(deviceId).getId(),
  366 + null,
  367 + System.currentTimeMillis());
  368 + ytDeviceService.freshAlarmStatus(new DeviceId(UUID.fromString(deviceId)), 0);
  369 + alarmMsg(ctx, msg, currentAlarms.get(deviceId), "Alarm Cleared");
  370 + currentAlarms.remove(deviceId);
399 371 }
  372 + }
  373 +
  374 + private void alarmMsg(TbContext ctx, TbMsg msg, Alarm alarm, String releationType) {
  375 + String lastMsgQueueName = msg.getQueueName();
  376 + TbMsgMetaData metaData = msg.getMetaData();
  377 + String data = JacksonUtil.valueToTree(alarm).toString();
  378 + TbMsg newMsg =
  379 + ctx.newMsg(
  380 + lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN,
  381 + "ALARM",
  382 + alarm.getOriginator(),
  383 + msg != null ? msg.getCustomerId() : null,
  384 + metaData,
  385 + data);
  386 + ctx.enqueueForTellNext(newMsg, releationType);
  387 + }
  388 +
  389 + private void reactMsg(TbContext ctx, TbMsg msg, TkDoActionEntity action) {
  390 + // TODO: 场景联动关联消息通知
  391 + String lastMsgQueueName = msg.getQueueName();
  392 + TbMsgMetaData metaData = msg.getMetaData();
  393 + metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString());
  394 + TbMsg newMsg =
  395 + ctx.newMsg(
  396 + lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN,
  397 + msg.getType(),
  398 + msg.getOriginator(),
  399 + msg != null ? msg.getCustomerId() : null,
  400 + metaData,
  401 + JacksonUtil.toString(action.getDoContext()));
  402 + ctx.tellNext(newMsg, "Message");
  403 + }
400 404 }
... ...