...
|
...
|
@@ -15,6 +15,7 @@ |
15
|
15
|
*/
|
16
|
16
|
package org.thingsboard.server.service.edge.rpc.processor;
|
17
|
17
|
|
|
18
|
+import com.fasterxml.jackson.core.JsonProcessingException;
|
18
|
19
|
import com.google.common.util.concurrent.FutureCallback;
|
19
|
20
|
import com.google.common.util.concurrent.Futures;
|
20
|
21
|
import com.google.common.util.concurrent.ListenableFuture;
|
...
|
...
|
@@ -114,59 +115,84 @@ public class AlarmEdgeProcessor extends BaseEdgeProcessor { |
114
|
115
|
}
|
115
|
116
|
}
|
116
|
117
|
|
117
|
|
- public DownlinkMsg processAlarmToEdge(Edge edge, EdgeEvent edgeEvent, UpdateMsgType msgType) {
|
|
118
|
+ public DownlinkMsg processAlarmToEdge(Edge edge, EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) {
|
|
119
|
+ AlarmId alarmId = new AlarmId(edgeEvent.getEntityId());
|
118
|
120
|
DownlinkMsg downlinkMsg = null;
|
119
|
|
- try {
|
120
|
|
- AlarmId alarmId = new AlarmId(edgeEvent.getEntityId());
|
121
|
|
- Alarm alarm = alarmService.findAlarmByIdAsync(edgeEvent.getTenantId(), alarmId).get();
|
122
|
|
- if (alarm != null) {
|
|
121
|
+ switch (action) {
|
|
122
|
+ case ADDED:
|
|
123
|
+ case UPDATED:
|
|
124
|
+ case ALARM_ACK:
|
|
125
|
+ case ALARM_CLEAR:
|
|
126
|
+ try {
|
|
127
|
+ Alarm alarm = alarmService.findAlarmByIdAsync(edgeEvent.getTenantId(), alarmId).get();
|
|
128
|
+ if (alarm != null) {
|
|
129
|
+ downlinkMsg = DownlinkMsg.newBuilder()
|
|
130
|
+ .setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
|
131
|
+ .addAlarmUpdateMsg(alarmMsgConstructor.constructAlarmUpdatedMsg(edge.getTenantId(), msgType, alarm))
|
|
132
|
+ .build();
|
|
133
|
+ }
|
|
134
|
+ } catch (Exception e) {
|
|
135
|
+ log.error("Can't process alarm msg [{}] [{}]", edgeEvent, msgType, e);
|
|
136
|
+ }
|
|
137
|
+ break;
|
|
138
|
+ case DELETED:
|
|
139
|
+ Alarm alarm = mapper.convertValue(edgeEvent.getBody(), Alarm.class);
|
|
140
|
+ AlarmUpdateMsg alarmUpdateMsg =
|
|
141
|
+ alarmMsgConstructor.constructAlarmUpdatedMsg(edge.getTenantId(), msgType, alarm);
|
123
|
142
|
downlinkMsg = DownlinkMsg.newBuilder()
|
124
|
143
|
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
125
|
|
- .addAlarmUpdateMsg(alarmMsgConstructor.constructAlarmUpdatedMsg(edge.getTenantId(), msgType, alarm))
|
|
144
|
+ .addAlarmUpdateMsg(alarmUpdateMsg)
|
126
|
145
|
.build();
|
127
|
|
- }
|
128
|
|
- } catch (Exception e) {
|
129
|
|
- log.error("Can't process alarm msg [{}] [{}]", edgeEvent, msgType, e);
|
|
146
|
+ break;
|
130
|
147
|
}
|
131
|
148
|
return downlinkMsg;
|
132
|
149
|
}
|
133
|
150
|
|
134
|
|
- public void processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
|
151
|
+ public void processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException {
|
|
152
|
+ EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
|
135
|
153
|
AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
|
136
|
|
- ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId);
|
137
|
|
- Futures.addCallback(alarmFuture, new FutureCallback<Alarm>() {
|
138
|
|
- @Override
|
139
|
|
- public void onSuccess(@Nullable Alarm alarm) {
|
140
|
|
- if (alarm != null) {
|
141
|
|
- EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType());
|
142
|
|
- if (type != null) {
|
143
|
|
- PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
144
|
|
- PageData<EdgeId> pageData;
|
145
|
|
- do {
|
146
|
|
- pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator(), pageLink);
|
147
|
|
- if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
148
|
|
- for (EdgeId edgeId : pageData.getData()) {
|
149
|
|
- saveEdgeEvent(tenantId,
|
150
|
|
- edgeId,
|
151
|
|
- EdgeEventType.ALARM,
|
152
|
|
- EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()),
|
153
|
|
- alarmId,
|
154
|
|
- null);
|
155
|
|
- }
|
156
|
|
- if (pageData.hasNext()) {
|
157
|
|
- pageLink = pageLink.nextPageLink();
|
158
|
|
- }
|
|
154
|
+ switch (actionType) {
|
|
155
|
+ case DELETED:
|
|
156
|
+ EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB()));
|
|
157
|
+ Alarm alarm = mapper.readValue(edgeNotificationMsg.getBody(), Alarm.class);
|
|
158
|
+ saveEdgeEvent(tenantId, edgeId, EdgeEventType.ALARM, actionType, alarmId, mapper.valueToTree(alarm));
|
|
159
|
+ break;
|
|
160
|
+ default:
|
|
161
|
+ ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId);
|
|
162
|
+ Futures.addCallback(alarmFuture, new FutureCallback<Alarm>() {
|
|
163
|
+ @Override
|
|
164
|
+ public void onSuccess(@Nullable Alarm alarm) {
|
|
165
|
+ if (alarm != null) {
|
|
166
|
+ EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType());
|
|
167
|
+ if (type != null) {
|
|
168
|
+ PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
|
169
|
+ PageData<EdgeId> pageData;
|
|
170
|
+ do {
|
|
171
|
+ pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator(), pageLink);
|
|
172
|
+ if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
|
173
|
+ for (EdgeId edgeId : pageData.getData()) {
|
|
174
|
+ saveEdgeEvent(tenantId,
|
|
175
|
+ edgeId,
|
|
176
|
+ EdgeEventType.ALARM,
|
|
177
|
+ EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()),
|
|
178
|
+ alarmId,
|
|
179
|
+ null);
|
|
180
|
+ }
|
|
181
|
+ if (pageData.hasNext()) {
|
|
182
|
+ pageLink = pageLink.nextPageLink();
|
|
183
|
+ }
|
|
184
|
+ }
|
|
185
|
+ } while (pageData != null && pageData.hasNext());
|
159
|
186
|
}
|
160
|
|
- } while (pageData != null && pageData.hasNext());
|
|
187
|
+ }
|
161
|
188
|
}
|
162
|
|
- }
|
163
|
|
- }
|
164
|
189
|
|
165
|
|
- @Override
|
166
|
|
- public void onFailure(Throwable t) {
|
167
|
|
- log.warn("[{}] can't find alarm by id [{}] {}", tenantId.getId(), alarmId.getId(), t);
|
168
|
|
- }
|
169
|
|
- }, dbCallbackExecutorService);
|
|
190
|
+ @Override
|
|
191
|
+ public void onFailure(Throwable t) {
|
|
192
|
+ log.warn("[{}] can't find alarm by id [{}] {}", tenantId.getId(), alarmId.getId(), t);
|
|
193
|
+ }
|
|
194
|
+ }, dbCallbackExecutorService);
|
|
195
|
+ }
|
170
|
196
|
}
|
171
|
197
|
|
172
|
198
|
} |
...
|
...
|
|