Showing
1 changed file
with
84 additions
and
62 deletions
@@ -33,121 +33,116 @@ import java.util.concurrent.ExecutionException; | @@ -33,121 +33,116 @@ import java.util.concurrent.ExecutionException; | ||
33 | @Slf4j | 33 | @Slf4j |
34 | class ReactState { | 34 | class ReactState { |
35 | 35 | ||
36 | - /**场景联动主键*/ | 36 | + /** |
37 | + * 场景联动主键 | ||
38 | + */ | ||
37 | String reactId; | 39 | String reactId; |
38 | 40 | ||
39 | - /**场景联动的触发器状态 | 41 | + /** |
42 | + * 场景联动的触发器状态 | ||
40 | * 键:设备主键 | 43 | * 键:设备主键 |
41 | * 值:设备指标参与的触发器 | 44 | * 值:设备指标参与的触发器 |
42 | */ | 45 | */ |
43 | - private ConcurrentHashMap<String, TriggerState > triggerState = new ConcurrentHashMap<>(); | 46 | + private ConcurrentHashMap<String, TriggerState> triggerState = new ConcurrentHashMap<>(); |
44 | 47 | ||
45 | private ConcurrentHashMap<String, TriggerState> clearState = new ConcurrentHashMap<>(); | 48 | private ConcurrentHashMap<String, TriggerState> clearState = new ConcurrentHashMap<>(); |
46 | 49 | ||
47 | - /**场景联动的执行条件状态 | 50 | + /** |
51 | + * 场景联动的执行条件状态 | ||
48 | * 键:设备主键 | 52 | * 键:设备主键 |
49 | * 值:设备指标参与的触发器 | 53 | * 值:设备指标参与的触发器 |
50 | */ | 54 | */ |
51 | private ConcurrentHashMap<String, TriggerState> conditionState = new ConcurrentHashMap<>(); | 55 | private ConcurrentHashMap<String, TriggerState> conditionState = new ConcurrentHashMap<>(); |
52 | 56 | ||
53 | 57 | ||
54 | - | ||
55 | - | ||
56 | - | ||
57 | private final List<TriggerDTO> triggers; | 58 | private final List<TriggerDTO> triggers; |
58 | - /**【场景联动的执行条件】懒加载*/ | 59 | + /** |
60 | + * 【场景联动的执行条件】懒加载 | ||
61 | + */ | ||
59 | private final List<DoCondition> conditions; | 62 | private final List<DoCondition> conditions; |
60 | - /**【场景联动的执行集合】懒加载*/ | 63 | + /** |
64 | + * 【场景联动的执行集合】懒加载 | ||
65 | + */ | ||
61 | private final List<DoAction> actions; | 66 | private final List<DoAction> actions; |
62 | 67 | ||
63 | 68 | ||
64 | - | ||
65 | - | ||
66 | - | ||
67 | private RuleNodeState state; | 69 | private RuleNodeState state; |
68 | 70 | ||
69 | 71 | ||
70 | - ReactState(String reactId,TbContext ctx, TbSceneReactNodeConfig config) { | 72 | + ReactState(String reactId, TbContext ctx, TbSceneReactNodeConfig config) { |
71 | this.reactId = reactId; | 73 | this.reactId = reactId; |
72 | TriggerService triggerService = SpringBeanUtils.getBean(TriggerService.class); | 74 | TriggerService triggerService = SpringBeanUtils.getBean(TriggerService.class); |
73 | this.triggers = triggerService.getTrigger(reactId); | 75 | this.triggers = triggerService.getTrigger(reactId); |
74 | DoConditionService conditionService = SpringBeanUtils.getBean(DoConditionService.class); | 76 | DoConditionService conditionService = SpringBeanUtils.getBean(DoConditionService.class); |
75 | this.conditions = conditionService.getConditions(reactId); | 77 | this.conditions = conditionService.getConditions(reactId); |
76 | DoActionService actionService = SpringBeanUtils.getBean(DoActionService.class); | 78 | DoActionService actionService = SpringBeanUtils.getBean(DoActionService.class); |
77 | - this.actions =actionService.getActions(reactId); | 79 | + this.actions = actionService.getActions(reactId); |
78 | 80 | ||
79 | } | 81 | } |
80 | 82 | ||
81 | 83 | ||
82 | - | ||
83 | - public void process(TbContext ctx, TbMsg msg,String deviceId) throws ExecutionException, InterruptedException { | ||
84 | - | ||
85 | - | 84 | + public void process(TbContext ctx, TbMsg msg, String deviceId) throws ExecutionException, InterruptedException { |
86 | 85 | ||
87 | 86 | ||
88 | - if( actions == null){ | 87 | + if (actions == null) { |
89 | ctx.tellSuccess(msg); | 88 | ctx.tellSuccess(msg); |
90 | } | 89 | } |
91 | 90 | ||
92 | boolean matched; | 91 | boolean matched; |
93 | - if(triggers == null || triggers.isEmpty()){ | 92 | + if (triggers == null || triggers.isEmpty()) { |
94 | matched = true; | 93 | matched = true; |
95 | - }else{ | 94 | + } else { |
96 | matched = false; | 95 | matched = false; |
97 | - for(TriggerDTO trigger: triggers){ | ||
98 | - TriggerState triggerState = getOrCreateTriggerState(trigger,deviceId); | ||
99 | - matched = triggerState.process(ctx,msg); | ||
100 | - if(matched){ | 96 | + for (TriggerDTO trigger : triggers) { |
97 | + TriggerState triggerState = getOrCreateTriggerState(trigger, deviceId); | ||
98 | + matched = triggerState.process(ctx, msg); | ||
99 | + if (matched) { | ||
101 | break; | 100 | break; |
102 | } | 101 | } |
103 | } | 102 | } |
104 | } | 103 | } |
105 | 104 | ||
106 | 105 | ||
107 | - if(matched && conditions.size() >0 ){ | 106 | + if (matched && conditions.size() > 0) { |
108 | matched = false; | 107 | matched = false; |
109 | - for(DoCondition item:conditions){ | 108 | + for (DoCondition item : conditions) { |
110 | List<String> entityIds = item.getEntityId(); | 109 | List<String> entityIds = item.getEntityId(); |
111 | - if(entityIds == null || entityIds.isEmpty()){ | 110 | + if (entityIds == null || entityIds.isEmpty()) { |
112 | matched = true; | 111 | matched = true; |
113 | break; | 112 | break; |
114 | } | 113 | } |
115 | - for(String id:entityIds){ | ||
116 | - TriggerState conditionState = getOrCreateConditionState(item.getId(),id,item.getTriggerCondition()); | ||
117 | - if( conditionState == null | ||
118 | - || conditionState.process(ctx,msg)){ | 114 | + for (String id : entityIds) { |
115 | + TriggerState conditionState = getOrCreateConditionState(item.getId(), id, item.getTriggerCondition()); | ||
116 | + if (conditionState == null | ||
117 | + || conditionState.process(ctx, msg)) { | ||
119 | matched = true; | 118 | matched = true; |
120 | break; | 119 | break; |
121 | } | 120 | } |
122 | } | 121 | } |
123 | - if(matched){ | 122 | + if (matched) { |
124 | break; | 123 | break; |
125 | } | 124 | } |
126 | } | 125 | } |
127 | } | 126 | } |
128 | 127 | ||
129 | - if(matched){ | ||
130 | - for(DoAction item: actions){ | ||
131 | - pushMsg(ctx, msg, item); | 128 | + if (matched) { |
129 | + for (DoAction item : actions) { | ||
130 | + pushMsg(ctx, msg, item); | ||
132 | } | 131 | } |
133 | - }else{ | 132 | + } else { |
134 | ctx.tellSuccess(msg); | 133 | ctx.tellSuccess(msg); |
135 | } | 134 | } |
136 | 135 | ||
137 | } | 136 | } |
138 | 137 | ||
139 | 138 | ||
140 | - | ||
141 | - | ||
142 | - | ||
143 | - | ||
144 | - protected TriggerState getOrCreateTriggerState(TriggerDTO trigger,String deviceId) { | 139 | + protected TriggerState getOrCreateTriggerState(TriggerDTO trigger, String deviceId) { |
145 | String triggerId = trigger.getId(); | 140 | String triggerId = trigger.getId(); |
146 | - String cacheKey =triggerId+deviceId; | ||
147 | - if(triggerState.containsKey(cacheKey)){ | 141 | + String cacheKey = triggerId + deviceId; |
142 | + if (triggerState.containsKey(cacheKey)) { | ||
148 | return triggerState.get(cacheKey); | 143 | return triggerState.get(cacheKey); |
149 | } | 144 | } |
150 | - if(trigger.getEntityId().contains(deviceId)){ | 145 | + if (trigger.getEntityId().contains(deviceId)) { |
151 | TriggerState state = createTriggerState(deviceId, trigger.getTriggerCondition()); | 146 | TriggerState state = createTriggerState(deviceId, trigger.getTriggerCondition()); |
152 | triggerState.put(cacheKey, state); | 147 | triggerState.put(cacheKey, state); |
153 | return state; | 148 | return state; |
@@ -157,13 +152,13 @@ class ReactState { | @@ -157,13 +152,13 @@ class ReactState { | ||
157 | } | 152 | } |
158 | 153 | ||
159 | protected TriggerState getOrCreateConditionState(String conditionId, String deviceId, AlarmRule condition) { | 154 | protected TriggerState getOrCreateConditionState(String conditionId, String deviceId, AlarmRule condition) { |
160 | - String cacheKey =conditionId+deviceId; | ||
161 | - if(conditionState.containsKey(cacheKey)){ | 155 | + String cacheKey = conditionId + deviceId; |
156 | + if (conditionState.containsKey(cacheKey)) { | ||
162 | return conditionState.get(cacheKey); | 157 | return conditionState.get(cacheKey); |
163 | - }else{ | 158 | + } else { |
164 | TriggerState state = createTriggerState(deviceId, condition); | 159 | TriggerState state = createTriggerState(deviceId, condition); |
165 | conditionState.put(cacheKey, state); | 160 | conditionState.put(cacheKey, state); |
166 | - return state; | 161 | + return state; |
167 | } | 162 | } |
168 | 163 | ||
169 | } | 164 | } |
@@ -171,12 +166,12 @@ class ReactState { | @@ -171,12 +166,12 @@ class ReactState { | ||
171 | @NotNull | 166 | @NotNull |
172 | private TriggerState createTriggerState(String deviceId, AlarmRule rule) { | 167 | private TriggerState createTriggerState(String deviceId, AlarmRule rule) { |
173 | Set<AlarmConditionFilterKey> filterKeys = new HashSet<>(); | 168 | Set<AlarmConditionFilterKey> filterKeys = new HashSet<>(); |
174 | - for(AlarmConditionFilter filter :rule.getCondition().getCondition()){ | 169 | + for (AlarmConditionFilter filter : rule.getCondition().getCondition()) { |
175 | filterKeys.add(filter.getKey()); | 170 | filterKeys.add(filter.getKey()); |
176 | } | 171 | } |
177 | - TriggerState state = new TriggerState(deviceId,rule, filterKeys,null); | 172 | + TriggerState state = new TriggerState(deviceId, rule, filterKeys, null); |
178 | 173 | ||
179 | - return state; | 174 | + return state; |
180 | } | 175 | } |
181 | 176 | ||
182 | private void pushMsg(TbContext ctx, TbMsg msg, DoAction action) { | 177 | private void pushMsg(TbContext ctx, TbMsg msg, DoAction action) { |
@@ -184,38 +179,65 @@ class ReactState { | @@ -184,38 +179,65 @@ class ReactState { | ||
184 | new TbMsgMetaData(); | 179 | new TbMsgMetaData(); |
185 | String relationType = ""; | 180 | String relationType = ""; |
186 | TbMsg newMsg = null; | 181 | TbMsg newMsg = null; |
187 | - switch(action.getOutTarget()){ | 182 | + switch (action.getOutTarget()) { |
188 | case DEVICE_OUT: | 183 | case DEVICE_OUT: |
189 | - relationType = "RPC Request"; | ||
190 | - newMsg = rpcMsg(ctx,msg,action.getDoContext()); | 184 | + rpcMsg(ctx, msg, action.getDeviceId(), action.getDoContext()); |
191 | break; | 185 | break; |
192 | case SCENE_ACT: | 186 | case SCENE_ACT: |
193 | - //TODO: 场景联动关联场景联动 | ||
194 | - relationType = "Alarm Updated"; | 187 | + //TODO: 场景联动关联消息通知 |
195 | break; | 188 | break; |
196 | case MSG_NOTIFY: | 189 | case MSG_NOTIFY: |
197 | - relationType = "Message"; | 190 | + noticeMsg(ctx, msg, action.getDoContext()); |
198 | break; | 191 | break; |
199 | default: | 192 | default: |
200 | ctx.tellSuccess(msg); | 193 | ctx.tellSuccess(msg); |
201 | break; | 194 | break; |
202 | } | 195 | } |
203 | 196 | ||
204 | - if(newMsg != null){ | 197 | + if (newMsg != null) { |
205 | ctx.tellNext(newMsg, relationType); | 198 | ctx.tellNext(newMsg, relationType); |
206 | } | 199 | } |
207 | } | 200 | } |
208 | 201 | ||
209 | - private TbMsg rpcMsg(TbContext ctx, TbMsg msg, JsonNode context){ | 202 | + /** |
203 | + * 设备输出 | ||
204 | + * | ||
205 | + * @param ctx | ||
206 | + * @param msg | ||
207 | + * @param devices | ||
208 | + * @param context | ||
209 | + */ | ||
210 | + private void rpcMsg(TbContext ctx, TbMsg msg, List<String> devices, JsonNode context) { | ||
211 | + String lastMsgQueueName = msg.getQueueName(); | ||
212 | + TbMsgMetaData metaData = msg.getMetaData(); | ||
213 | + for (String id : devices) { | ||
214 | + TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN | ||
215 | + , msg.getType() | ||
216 | + , msg.getOriginator() | ||
217 | + , msg != null ? msg.getCustomerId() : null | ||
218 | + , metaData | ||
219 | + , JacksonUtil.toString(context)); | ||
220 | + ctx.tellNext(newMsg, "RPC Request"); | ||
221 | + } | ||
222 | + } | ||
223 | + | ||
224 | + /** | ||
225 | + * 告警通知 | ||
226 | + * | ||
227 | + * @param ctx | ||
228 | + * @param msg | ||
229 | + * @param context | ||
230 | + */ | ||
231 | + private void noticeMsg(TbContext ctx, TbMsg msg, JsonNode context) { | ||
210 | String lastMsgQueueName = msg.getQueueName(); | 232 | String lastMsgQueueName = msg.getQueueName(); |
211 | TbMsgMetaData metaData = msg.getMetaData(); | 233 | TbMsgMetaData metaData = msg.getMetaData(); |
212 | - metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString()); | 234 | + metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString()); |
213 | TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN | 235 | TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN |
214 | , msg.getType() | 236 | , msg.getType() |
215 | , msg.getOriginator() | 237 | , msg.getOriginator() |
216 | , msg != null ? msg.getCustomerId() : null | 238 | , msg != null ? msg.getCustomerId() : null |
217 | , metaData | 239 | , metaData |
218 | , JacksonUtil.toString(context)); | 240 | , JacksonUtil.toString(context)); |
219 | - return newMsg; | 241 | + ctx.tellNext(newMsg, "Message"); |
220 | } | 242 | } |
221 | } | 243 | } |