...
|
...
|
@@ -89,24 +89,27 @@ class ReactState { |
89
|
89
|
*
|
90
|
90
|
* @param ctx
|
91
|
91
|
* @param msg 设备推送的遥测数据
|
|
92
|
+ * @param prefixId 运算规则ID,例如:触发器、执行条件
|
92
|
93
|
* @param deviceId 遥测数据的来源设备的TB设备ID
|
93
|
94
|
* @throws ExecutionException
|
94
|
95
|
* @throws InterruptedException
|
95
|
96
|
*/
|
96
|
|
- public void process(TbContext ctx, TbMsg msg, String prefixId,String deviceId)
|
|
97
|
+ public void process(TbContext ctx, TbMsg msg, String prefixId, String deviceId)
|
97
|
98
|
throws ExecutionException, InterruptedException {
|
98
|
99
|
|
99
|
|
- StringBuilder detail = new StringBuilder();
|
|
100
|
+ /** 场景联动告警详情 */
|
|
101
|
+ ObjectNode detail = JacksonUtil.newObjectNode();
|
100
|
102
|
if (actions == null) {
|
101
|
103
|
ctx.tellSuccess(msg);
|
102
|
104
|
}
|
103
|
105
|
|
|
106
|
+ /** 1、单个触发器内只要有1个设备满足条件即为true 2、多个触发器只要有1个触发器满足条件即为true */
|
104
|
107
|
AtomicBoolean triggerMatched = new AtomicBoolean(true);
|
105
|
108
|
Optional.ofNullable(triggers)
|
106
|
109
|
.ifPresent(
|
107
|
|
- t -> {
|
|
110
|
+ all -> {
|
108
|
111
|
triggerMatched.set(false);
|
109
|
|
- t.forEach(
|
|
112
|
+ all.forEach(
|
110
|
113
|
trigger -> {
|
111
|
114
|
ScopeEnum entityType = trigger.getEntityType();
|
112
|
115
|
List<String> trifggerDevices = trigger.getEntityId();
|
...
|
...
|
@@ -119,24 +122,30 @@ class ReactState { |
119
|
122
|
triggerMatched.set(
|
120
|
123
|
trifggerDevices.stream()
|
121
|
124
|
.anyMatch(
|
122
|
|
- id -> {
|
|
125
|
+ devId -> {
|
123
|
126
|
TriggerState triggerState =
|
124
|
|
- getOrCreateTriggerState(trigger, tkProjectId, id);
|
|
127
|
+ getOrCreateTriggerState(trigger, tkProjectId, devId);
|
125
|
128
|
if (triggerState == null) {
|
126
|
129
|
return false;
|
127
|
130
|
}
|
128
|
131
|
try {
|
129
|
132
|
boolean fresh = false;
|
130
|
|
- if(trigger.getId().equals(prefixId)&&msg.getOriginator().getId().toString().equals(id)){
|
131
|
|
- fresh=true;
|
|
133
|
+ if (trigger.getId().equals(prefixId)
|
|
134
|
+ && msg.getOriginator().getId().toString().equals(devId)) {
|
|
135
|
+ fresh = true;
|
132
|
136
|
}
|
133
|
|
- boolean result = triggerState.process(ctx, msg,fresh);
|
134
|
|
- log.error(String.format("触发器【%s】刷新【%s】结果【%s】触发器设备【%s】数据设备【%s】数据内容【%s】",trigger.getId(),fresh,result,id,msg.getOriginator(),msg.getData()));
|
135
|
|
- if (result) {
|
136
|
|
- detail.append(
|
137
|
|
- triggerState.getAlarmDetails() == null
|
138
|
|
- ? ""
|
139
|
|
- : triggerState.getAlarmDetails());
|
|
137
|
+ ObjectNode result = triggerState.process(ctx, msg, fresh);
|
|
138
|
+ log.error(
|
|
139
|
+ String.format(
|
|
140
|
+ "触发器【%s】刷新【%s】结果【%s】触发器设备【%s】数据设备【%s】数据内容【%s】",
|
|
141
|
+ trigger.getId(),
|
|
142
|
+ fresh,
|
|
143
|
+ result,
|
|
144
|
+ devId,
|
|
145
|
+ msg.getOriginator(),
|
|
146
|
+ msg.getData()));
|
|
147
|
+ if (!result.isEmpty()) {
|
|
148
|
+ detail.set(FastIotConstants.Alarm.TRIGGER,result);
|
140
|
149
|
return true;
|
141
|
150
|
} else if (currentAlarms.containsKey(deviceId)) {
|
142
|
151
|
// 清除设备告警
|
...
|
...
|
@@ -155,51 +164,65 @@ class ReactState { |
155
|
164
|
});
|
156
|
165
|
});
|
157
|
166
|
|
158
|
|
- /** 执行条件的所有设备都满足才为true */
|
|
167
|
+ /** 1、单个执行条件内全部设备满足条件才为true 2、多个执行条件全部执行条件满足条件才为true */
|
159
|
168
|
AtomicBoolean conditionMatched = new AtomicBoolean(true);
|
160
|
169
|
Optional.ofNullable(conditions)
|
161
|
|
- .ifPresent(
|
162
|
|
- t -> {
|
163
|
|
- t.forEach(
|
164
|
|
- condition -> {
|
165
|
|
- ScopeEnum entityType = condition.getEntityType();
|
166
|
|
- List<String> conditionDevices = condition.getEntityId();
|
167
|
|
- String tkProjectId = condition.getDeviceProfileId();
|
168
|
|
- if (ScopeEnum.ALL.equals(entityType)) {
|
169
|
|
- conditionDevices =
|
170
|
|
- ytDeviceService.findTbDeviceIdsByDeviceProfileId(
|
171
|
|
- tkProjectId, condition.getTenantId());
|
172
|
|
- }
|
173
|
|
- conditionMatched.set(
|
174
|
|
- !conditionDevices.stream()
|
175
|
|
- .anyMatch(
|
176
|
|
- id -> {
|
177
|
|
- TriggerState conditionState =
|
178
|
|
- getOrCreateConditionState(condition, tkProjectId, id);
|
179
|
|
- try {
|
180
|
|
- boolean fresh = false;
|
181
|
|
- if(msg.getOriginator().getId().toString().equals(id)){
|
182
|
|
- fresh=true;
|
183
|
|
- }
|
184
|
|
- boolean result = conditionState.process(ctx, msg,fresh);
|
185
|
|
- log.warn(String.format("执行器【%s】刷新【%s】结果【%s】执行器设备【%s】数据设备【%s】数据内容【%s】",condition.getId(),fresh,result,id,msg.getOriginator(),msg.getData()));
|
186
|
|
- return !result;
|
187
|
|
- } catch (ExecutionException e) {
|
188
|
|
- throw new RuntimeException(e);
|
189
|
|
- } catch (InterruptedException e) {
|
190
|
|
- throw new RuntimeException(e);
|
191
|
|
- }
|
192
|
|
- }));
|
193
|
|
- });
|
194
|
|
- });
|
|
170
|
+ .ifPresent(
|
|
171
|
+ all -> {
|
|
172
|
+ conditionMatched.set(
|
|
173
|
+ all.stream()
|
|
174
|
+ .allMatch(
|
|
175
|
+ condition -> {
|
|
176
|
+ ScopeEnum entityType = condition.getEntityType();
|
|
177
|
+ List<String> conditionDevices = condition.getEntityId();
|
|
178
|
+ String tkProjectId = condition.getDeviceProfileId();
|
|
179
|
+ if (ScopeEnum.ALL.equals(entityType)) {
|
|
180
|
+ conditionDevices =
|
|
181
|
+ ytDeviceService.findTbDeviceIdsByDeviceProfileId(
|
|
182
|
+ tkProjectId, condition.getTenantId());
|
|
183
|
+ }
|
|
184
|
+
|
|
185
|
+ return conditionDevices.stream()
|
|
186
|
+ .allMatch(
|
|
187
|
+ devId -> {
|
|
188
|
+ TriggerState conditionState =
|
|
189
|
+ getOrCreateConditionState(condition, tkProjectId, devId);
|
|
190
|
+ try {
|
|
191
|
+ boolean fresh = false;
|
|
192
|
+ if (msg.getOriginator().getId().toString().equals(devId)) {
|
|
193
|
+ fresh = true;
|
|
194
|
+ }
|
|
195
|
+ ObjectNode result = conditionState.process(ctx, msg, fresh);
|
|
196
|
+ log.warn(
|
|
197
|
+ String.format(
|
|
198
|
+ "执行器【%s】刷新【%s】结果【%s】执行器设备【%s】数据设备【%s】数据内容【%s】",
|
|
199
|
+ condition.getId(),
|
|
200
|
+ fresh,
|
|
201
|
+ result,
|
|
202
|
+ devId,
|
|
203
|
+ msg.getOriginator(),
|
|
204
|
+ msg.getData()));
|
|
205
|
+ if (!result.isEmpty()) {
|
|
206
|
+ detail.set(FastIotConstants.Alarm.CONDITION,result);
|
|
207
|
+ return true;
|
|
208
|
+ }
|
|
209
|
+ } catch (ExecutionException e) {
|
|
210
|
+ throw new RuntimeException(e);
|
|
211
|
+ } catch (InterruptedException e) {
|
|
212
|
+ throw new RuntimeException(e);
|
|
213
|
+ }
|
|
214
|
+ return false;
|
|
215
|
+ });
|
|
216
|
+ }));
|
|
217
|
+ });
|
195
|
218
|
|
196
|
219
|
if (triggerMatched.get() && conditionMatched.get()) {
|
197
|
|
- log.error(String.format("设备【%s】的消息内容【%s】触发动作",deviceId,msg.getData()));
|
|
220
|
+ log.error(String.format("设备【%s】的消息内容【%s】触发动作", deviceId, msg.getData()));
|
198
|
221
|
for (TkDoActionEntity item : actions) {
|
199
|
222
|
if (ActionTypeEnum.MSG_NOTIFY.equals(item.getOutTarget())) {
|
200
|
|
- noticeMsg(ctx, msg, item, deviceId, detail.toString(), msg.getTs());
|
|
223
|
+ noticeMsg(ctx, msg, item, deviceId, detail, msg.getTs());
|
201
|
224
|
} else {
|
202
|
|
- pushMsg(ctx, msg, item, detail.toString());
|
|
225
|
+ pushMsg(ctx, msg, item);
|
203
|
226
|
}
|
204
|
227
|
}
|
205
|
228
|
} else {
|
...
|
...
|
@@ -226,7 +249,7 @@ class ReactState { |
226
|
249
|
|| (trigger.getEntityType().equals(ScopeEnum.ALL)
|
227
|
250
|
&& trigger.getDeviceProfileId().equals(profileId))) {
|
228
|
251
|
TriggerState state = createTriggerState(deviceId, trigger.getTriggerCondition());
|
229
|
|
- log.error(String.format("新建设备【%s】的触发器",deviceId));
|
|
252
|
+ log.error(String.format("新建设备【%s】的触发器", deviceId));
|
230
|
253
|
triggerState.put(cacheKey, state);
|
231
|
254
|
return state;
|
232
|
255
|
}
|
...
|
...
|
@@ -290,12 +313,12 @@ class ReactState { |
290
|
313
|
for (AlarmConditionFilter filter : rule.getCondition().getCondition()) {
|
291
|
314
|
filterKeys.add(filter.getKey());
|
292
|
315
|
}
|
293
|
|
- TriggerState state = new TriggerState(deviceId, rule, filterKeys, rule.getAlarmDetails(), null);
|
|
316
|
+ TriggerState state = new TriggerState(deviceId, rule, filterKeys, null);
|
294
|
317
|
|
295
|
318
|
return state;
|
296
|
319
|
}
|
297
|
320
|
|
298
|
|
- private void pushMsg(TbContext ctx, TbMsg msg, TkDoActionEntity action, String detail) {
|
|
321
|
+ private void pushMsg(TbContext ctx, TbMsg msg, TkDoActionEntity action) {
|
299
|
322
|
switch (action.getOutTarget()) {
|
300
|
323
|
case DEVICE_OUT:
|
301
|
324
|
List<String> rpcDevices = action.getDeviceId();
|
...
|
...
|
@@ -354,7 +377,7 @@ class ReactState { |
354
|
377
|
TbMsg msg,
|
355
|
378
|
TkDoActionEntity action,
|
356
|
379
|
String deviceId,
|
357
|
|
- String detailStr,
|
|
380
|
+ ObjectNode detail,
|
358
|
381
|
long startTs) {
|
359
|
382
|
|
360
|
383
|
DeviceId entityId = new DeviceId(UUID.fromString(deviceId));
|
...
|
...
|
@@ -370,12 +393,7 @@ class ReactState { |
370
|
393
|
}
|
371
|
394
|
currentAlarm.setStartTs(startTs);
|
372
|
395
|
currentAlarm.setEndTs(currentAlarm.getStartTs());
|
373
|
|
- ObjectNode detailData = JacksonUtil.newObjectNode();
|
374
|
|
- if (StringUtils.isNotEmpty(detailStr)) {
|
375
|
|
- detailData.put("msg", detailStr);
|
376
|
|
- }
|
377
|
|
- detailData.put("data", JacksonUtil.toJsonNode(msg.getData()));
|
378
|
|
- currentAlarm.setDetails(detailData);
|
|
396
|
+ currentAlarm.setDetails(detail);
|
379
|
397
|
currentAlarm.setOriginator(entityId);
|
380
|
398
|
currentAlarm.setTenantId(ctx.getTenantId());
|
381
|
399
|
currentAlarm.setPropagate(false);
|
...
|
...
|
@@ -393,7 +411,7 @@ class ReactState { |
393
|
411
|
|
394
|
412
|
AlarmInfoDTO formData = new AlarmInfoDTO();
|
395
|
413
|
formData.setDeviceName(msg.getMetaData().getData().get("deviceName"));
|
396
|
|
- formData.setDetails(JacksonUtil.toString(detailData));
|
|
414
|
+ formData.setDetails(JacksonUtil.toString(detail));
|
397
|
415
|
formData.setType(currentAlarm.getType());
|
398
|
416
|
formData.setCreateTs(currentAlarm.getCreatedTime());
|
399
|
417
|
formData.setStartTs(currentAlarm.getStartTs());
|
...
|
...
|
@@ -408,16 +426,19 @@ class ReactState { |
408
|
426
|
private void clearAlarm(TbContext ctx, TbMsg msg, String deviceId, String key)
|
409
|
427
|
throws ExecutionException, InterruptedException {
|
410
|
428
|
TriggerState clearState = getOrCreateClearState(deviceId, key);
|
411
|
|
- if (clearState != null && clearState.process(ctx, msg,true)) {
|
412
|
|
- ctx.getAlarmService()
|
413
|
|
- .clearAlarmForResult(
|
414
|
|
- ctx.getTenantId(),
|
415
|
|
- currentAlarms.get(deviceId).getId(),
|
416
|
|
- null,
|
417
|
|
- System.currentTimeMillis());
|
418
|
|
- ytDeviceService.freshAlarmStatus(new DeviceId(UUID.fromString(deviceId)), 0);
|
419
|
|
- alarmMsg(ctx, msg, currentAlarms.get(deviceId), "Alarm Cleared");
|
420
|
|
- currentAlarms.remove(deviceId);
|
|
429
|
+ if(clearState !=null){
|
|
430
|
+ ObjectNode clearResult = clearState.process(ctx, msg, true);
|
|
431
|
+ if(!clearResult.isEmpty()){
|
|
432
|
+ ctx.getAlarmService()
|
|
433
|
+ .clearAlarmForResult(
|
|
434
|
+ ctx.getTenantId(),
|
|
435
|
+ currentAlarms.get(deviceId).getId(),
|
|
436
|
+ clearResult,
|
|
437
|
+ System.currentTimeMillis());
|
|
438
|
+ ytDeviceService.freshAlarmStatus(new DeviceId(UUID.fromString(deviceId)), 0);
|
|
439
|
+ alarmMsg(ctx, msg, currentAlarms.get(deviceId), "Alarm Cleared");
|
|
440
|
+ currentAlarms.remove(deviceId);
|
|
441
|
+ }
|
421
|
442
|
}
|
422
|
443
|
}
|
423
|
444
|
|
...
|
...
|
|