Commit 2eb7949d57aa24db60554d169f7ec2a9532d433a
Committed by
Andrew Shvayka
1 parent
aec2d388
Improvements
Showing
10 changed files
with
70 additions
and
39 deletions
... | ... | @@ -76,6 +76,7 @@ import org.thingsboard.server.service.script.JsExecutorService; |
76 | 76 | import org.thingsboard.server.service.script.JsInvokeService; |
77 | 77 | import org.thingsboard.server.service.session.DeviceSessionCacheService; |
78 | 78 | import org.thingsboard.server.service.state.DeviceStateService; |
79 | +import org.thingsboard.server.service.telemetry.AlarmSubscriptionService; | |
79 | 80 | import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; |
80 | 81 | import org.thingsboard.server.service.transport.TbCoreToTransportService; |
81 | 82 | |
... | ... | @@ -169,10 +170,6 @@ public class ActorSystemContext { |
169 | 170 | |
170 | 171 | @Autowired |
171 | 172 | @Getter |
172 | - private AlarmService alarmService; | |
173 | - | |
174 | - @Autowired | |
175 | - @Getter | |
176 | 173 | private RelationService relationService; |
177 | 174 | |
178 | 175 | @Autowired |
... | ... | @@ -189,6 +186,10 @@ public class ActorSystemContext { |
189 | 186 | |
190 | 187 | @Autowired |
191 | 188 | @Getter |
189 | + private AlarmSubscriptionService alarmService; | |
190 | + | |
191 | + @Autowired | |
192 | + @Getter | |
192 | 193 | private JsInvokeService jsSandbox; |
193 | 194 | |
194 | 195 | @Autowired | ... | ... |
... | ... | @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; |
22 | 22 | import org.springframework.data.redis.core.RedisTemplate; |
23 | 23 | import org.thingsboard.common.util.ListeningExecutor; |
24 | 24 | import org.thingsboard.rule.engine.api.MailService; |
25 | +import org.thingsboard.rule.engine.api.RuleEngineAlarmService; | |
25 | 26 | import org.thingsboard.rule.engine.api.RuleEngineRpcService; |
26 | 27 | import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; |
27 | 28 | import org.thingsboard.rule.engine.api.ScriptEngine; |
... | ... | @@ -351,7 +352,7 @@ class DefaultTbContext implements TbContext { |
351 | 352 | } |
352 | 353 | |
353 | 354 | @Override |
354 | - public AlarmService getAlarmService() { | |
355 | + public RuleEngineAlarmService getAlarmService() { | |
355 | 356 | return mainCtx.getAlarmService(); |
356 | 357 | } |
357 | 358 | ... | ... |
... | ... | @@ -266,16 +266,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc |
266 | 266 | AlarmDataUpdate update = new AlarmDataUpdate(cmd.getCmdId(), new PageData<>(Collections.emptyList(), 1, 0, false), null); |
267 | 267 | wsService.sendWsMsg(ctx.getSessionId(), update); |
268 | 268 | } else { |
269 | - ctx.setLastFetchTs(System.currentTimeMillis()); | |
270 | - PageData<AlarmData> alarms = alarmService.findAlarmDataByQueryForEntities(ctx.getTenantId(), ctx.getCustomerId(), | |
271 | - ctx.getQuery().getPageLink(), ctx.getOrderedEntityIds()); | |
272 | - alarms = ctx.setAndMergeAlarmsData(alarms); | |
273 | - AlarmDataUpdate update = new AlarmDataUpdate(cmd.getCmdId(), alarms, null); | |
274 | - wsService.sendWsMsg(ctx.getSessionId(), update); | |
275 | - if (adq.getPageLink().getTimeWindow() > 0) { | |
276 | - //TODO: refresh list of entities periodically (similar to time-series subscription). | |
277 | - createAlarmSubscriptions(ctx); | |
278 | - } | |
269 | + ctx.fetchAlarmsAndCreateSubscriptions(); | |
279 | 270 | } |
280 | 271 | } |
281 | 272 | |
... | ... | @@ -321,7 +312,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc |
321 | 312 | |
322 | 313 | private TbAlarmDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, AlarmDataCmd cmd) { |
323 | 314 | Map<Integer, TbAbstractDataSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>()); |
324 | - TbAlarmDataSubCtx ctx = new TbAlarmDataSubCtx(serviceId, wsService, sessionRef, cmd.getCmdId()); | |
315 | + TbAlarmDataSubCtx ctx = new TbAlarmDataSubCtx(serviceId, wsService, localSubscriptionService, alarmService, sessionRef, cmd.getCmdId()); | |
325 | 316 | ctx.setQuery(cmd.getQuery()); |
326 | 317 | sessionSubs.put(cmd.getCmdId(), ctx); |
327 | 318 | return ctx; |
... | ... | @@ -467,11 +458,6 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc |
467 | 458 | createTelemetrySubscriptions(ctx, keys, true); |
468 | 459 | } |
469 | 460 | |
470 | - private void createAlarmSubscriptions(TbAlarmDataSubCtx ctx) { | |
471 | - List<TbSubscription> subscriptions = ctx.createSubscriptions(); | |
472 | - subscriptions.forEach(localSubscriptionService::addSubscription); | |
473 | - } | |
474 | - | |
475 | 461 | private void createTelemetrySubscriptions(TbEntityDataSubCtx ctx, List<EntityKey> keys, boolean latest) { |
476 | 462 | List<TbSubscription> tbSubs = ctx.createSubscriptions(keys, latest); |
477 | 463 | tbSubs.forEach(sub -> localSubscriptionService.addSubscription(sub)); | ... | ... |
... | ... | @@ -27,8 +27,10 @@ import org.thingsboard.server.common.data.query.AlarmData; |
27 | 27 | import org.thingsboard.server.common.data.query.AlarmDataPageLink; |
28 | 28 | import org.thingsboard.server.common.data.query.AlarmDataQuery; |
29 | 29 | import org.thingsboard.server.common.data.query.EntityData; |
30 | +import org.thingsboard.server.dao.alarm.AlarmService; | |
30 | 31 | import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; |
31 | 32 | import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; |
33 | +import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataCmd; | |
32 | 34 | import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataUpdate; |
33 | 35 | import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate; |
34 | 36 | |
... | ... | @@ -45,6 +47,8 @@ import java.util.stream.Collectors; |
45 | 47 | @Slf4j |
46 | 48 | public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { |
47 | 49 | |
50 | + private final TbLocalSubscriptionService localSubscriptionService; | |
51 | + private final AlarmService alarmService; | |
48 | 52 | @Getter |
49 | 53 | @Setter |
50 | 54 | private final LinkedHashMap<EntityId, EntityData> entitiesMap; |
... | ... | @@ -59,15 +63,32 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { |
59 | 63 | private boolean tooManyEntities; |
60 | 64 | |
61 | 65 | private Map<Integer, EntityId> subToEntityIdMap; |
62 | - @Setter | |
63 | - private long lastFetchTs; | |
64 | 66 | |
65 | - public TbAlarmDataSubCtx(String serviceId, TelemetryWebSocketService wsService, TelemetryWebSocketSessionRef sessionRef, int cmdId) { | |
67 | + public TbAlarmDataSubCtx(String serviceId, TelemetryWebSocketService wsService, | |
68 | + TbLocalSubscriptionService localSubscriptionService, | |
69 | + AlarmService alarmService, | |
70 | + TelemetryWebSocketSessionRef sessionRef, int cmdId) { | |
66 | 71 | super(serviceId, wsService, sessionRef, cmdId); |
72 | + this.localSubscriptionService = localSubscriptionService; | |
73 | + this.alarmService = alarmService; | |
67 | 74 | this.entitiesMap = new LinkedHashMap<>(); |
68 | 75 | this.alarmsMap = new HashMap<>(); |
69 | 76 | } |
70 | 77 | |
78 | + public void fetchAlarmsAndCreateSubscriptions() { | |
79 | + PageData<AlarmData> alarms = alarmService.findAlarmDataByQueryForEntities(getTenantId(), getCustomerId(), | |
80 | + query.getPageLink(), getOrderedEntityIds()); | |
81 | + alarms = setAndMergeAlarmsData(alarms); | |
82 | + AlarmDataUpdate update = new AlarmDataUpdate(cmdId, alarms, null); | |
83 | + wsService.sendWsMsg(getSessionId(), update); | |
84 | + if (query.getPageLink().getTimeWindow() > 0) { | |
85 | + clearSubscriptions(); | |
86 | + //TODO: refresh list of entities periodically (similar to time-series subscription). | |
87 | + List<TbSubscription> subscriptions = createSubscriptions(); | |
88 | + subscriptions.forEach(localSubscriptionService::addSubscription); | |
89 | + } | |
90 | + } | |
91 | + | |
71 | 92 | public void setEntitiesData(PageData<EntityData> entitiesData) { |
72 | 93 | entitiesMap.clear(); |
73 | 94 | tooManyEntities = entitiesData.hasNext(); |
... | ... | @@ -99,31 +120,42 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { |
99 | 120 | public List<TbSubscription> createSubscriptions() { |
100 | 121 | this.subToEntityIdMap = new HashMap<>(); |
101 | 122 | AlarmDataPageLink pageLink = query.getPageLink(); |
123 | + long startTs = System.currentTimeMillis() - pageLink.getTimeWindow(); | |
102 | 124 | List<TbSubscription> result = new ArrayList<>(); |
103 | 125 | for (EntityData entityData : entitiesMap.values()) { |
104 | 126 | int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet(); |
105 | 127 | subToEntityIdMap.put(subIdx, entityData.getEntityId()); |
106 | 128 | log.trace("[{}][{}][{}] Creating alarms subscription for [{}] with query: {}", serviceId, cmdId, subIdx, entityData.getEntityId(), pageLink); |
107 | 129 | result.add(TbAlarmsSubscription.builder() |
130 | + .type(TbSubscriptionType.ALARMS) | |
108 | 131 | .serviceId(serviceId) |
109 | 132 | .sessionId(sessionRef.getSessionId()) |
110 | 133 | .subscriptionId(subIdx) |
111 | 134 | .tenantId(sessionRef.getSecurityCtx().getTenantId()) |
112 | 135 | .entityId(entityData.getEntityId()) |
113 | 136 | .updateConsumer(this::sendWsMsg) |
114 | - .ts(lastFetchTs) | |
137 | + .ts(startTs) | |
115 | 138 | .build()); |
116 | 139 | } |
117 | 140 | return result; |
118 | 141 | } |
119 | 142 | |
143 | + public void clearSubscriptions() { | |
144 | + if (subToEntityIdMap != null) { | |
145 | + for (Integer subId : subToEntityIdMap.keySet()) { | |
146 | + localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), subId); | |
147 | + } | |
148 | + subToEntityIdMap.clear(); | |
149 | + } | |
150 | + } | |
151 | + | |
120 | 152 | private void sendWsMsg(String sessionId, AlarmSubscriptionUpdate subscriptionUpdate) { |
121 | 153 | Alarm alarm = subscriptionUpdate.getAlarm(); |
122 | 154 | AlarmId alarmId = alarm.getId(); |
123 | 155 | if (subscriptionUpdate.isAlarmDeleted()) { |
124 | 156 | Alarm deleted = alarmsMap.remove(alarmId); |
125 | 157 | if (deleted != null) { |
126 | - //TODO: invalidate current page; | |
158 | + fetchAlarmsAndCreateSubscriptions(); | |
127 | 159 | } |
128 | 160 | } else { |
129 | 161 | AlarmData current = alarmsMap.get(alarmId); |
... | ... | @@ -131,14 +163,14 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { |
131 | 163 | boolean matchesFilter = filter(alarm); |
132 | 164 | if (onCurrentPage) { |
133 | 165 | if (matchesFilter) { |
134 | - AlarmData updated = new AlarmData(alarm, current.getName(), current.getEntityId()); | |
166 | + AlarmData updated = new AlarmData(alarm, current.getOriginatorName(), current.getEntityId()); | |
135 | 167 | alarmsMap.put(alarmId, updated); |
136 | 168 | wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, Collections.singletonList(updated))); |
137 | 169 | } else { |
138 | - //TODO: invalidate current page; | |
170 | + fetchAlarmsAndCreateSubscriptions(); | |
139 | 171 | } |
140 | 172 | } else if (matchesFilter && query.getPageLink().getPage() == 0) { |
141 | - //TODO: invalidate current page; | |
173 | + fetchAlarmsAndCreateSubscriptions(); | |
142 | 174 | } |
143 | 175 | } |
144 | 176 | } | ... | ... |
... | ... | @@ -166,7 +166,7 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService |
166 | 166 | TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); |
167 | 167 | if (currentPartitions.contains(tpi)) { |
168 | 168 | if (subscriptionManagerService.isPresent()) { |
169 | - subscriptionManagerService.get().onAlarmUpdate(tenantId, entityId, alarm); | |
169 | + subscriptionManagerService.get().onAlarmUpdate(tenantId, entityId, alarm, TbCallback.EMPTY); | |
170 | 170 | } else { |
171 | 171 | log.warn("Possible misconfiguration because subscriptionManagerService is null!"); |
172 | 172 | } |
... | ... | @@ -186,7 +186,7 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService |
186 | 186 | TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); |
187 | 187 | if (currentPartitions.contains(tpi)) { |
188 | 188 | if (subscriptionManagerService.isPresent()) { |
189 | - subscriptionManagerService.get().onAlarmDeleted(tenantId, entityId, alarm); | |
189 | + subscriptionManagerService.get().onAlarmDeleted(tenantId, entityId, alarm, TbCallback.EMPTY); | |
190 | 190 | } else { |
191 | 191 | log.warn("Possible misconfiguration because subscriptionManagerService is null!"); |
192 | 192 | } | ... | ... |
... | ... | @@ -19,6 +19,7 @@ import lombok.Data; |
19 | 19 | import org.thingsboard.server.common.data.id.EntityId; |
20 | 20 | import org.thingsboard.server.common.data.relation.EntitySearchDirection; |
21 | 21 | import org.thingsboard.server.common.data.relation.EntityTypeFilter; |
22 | +import org.thingsboard.server.common.data.relation.RelationTypeGroup; | |
22 | 23 | |
23 | 24 | import java.util.List; |
24 | 25 | ... | ... |
... | ... | @@ -61,6 +61,7 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository { |
61 | 61 | alarmFieldColumnMap.put("severity", ModelConstants.ALARM_SEVERITY_PROPERTY); |
62 | 62 | alarmFieldColumnMap.put("originator_id", ModelConstants.ALARM_ORIGINATOR_ID_PROPERTY); |
63 | 63 | alarmFieldColumnMap.put("originator_type", ModelConstants.ALARM_ORIGINATOR_TYPE_PROPERTY); |
64 | + alarmFieldColumnMap.put("originator", "originator_name"); | |
64 | 65 | } |
65 | 66 | |
66 | 67 | public static final String SELECT_ORIGINATOR_NAME = " CASE" + |
... | ... | @@ -124,7 +125,7 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository { |
124 | 125 | EntityDataSortOrder sortOrder = pageLink.getSortOrder(); |
125 | 126 | if (sortOrder != null && sortOrder.getKey().getType().equals(EntityKeyType.ALARM_FIELD)) { |
126 | 127 | String sortOrderKey = sortOrder.getKey().getKey(); |
127 | - sortPart.append("a.").append(alarmFieldColumnMap.getOrDefault(sortOrderKey, sortOrderKey)) | |
128 | + sortPart.append(alarmFieldColumnMap.getOrDefault(sortOrderKey, sortOrderKey)) | |
128 | 129 | .append(" ").append(sortOrder.getDirection().name()); |
129 | 130 | ctx.addUuidListParameter("entity_ids", orderedEntityIds.stream().map(EntityId::getId).collect(Collectors.toList())); |
130 | 131 | if (pageLink.isSearchPropagatedAlarms()) { | ... | ... |
... | ... | @@ -447,14 +447,21 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository { |
447 | 447 | if (!single) { |
448 | 448 | whereFilter.append(" ("); |
449 | 449 | } |
450 | - whereFilter.append(" re.relation_type = :where_relation_type").append(entityTypeFilterIdx).append(" and re.") | |
451 | - .append(entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "to" : "from") | |
452 | - .append("_type in (:where_entity_types").append(entityTypeFilterIdx).append(")"); | |
450 | + List<String> whereEntityTypes = etf.getEntityTypes().stream().map(EntityType::name).collect(Collectors.toList()); | |
451 | + whereFilter | |
452 | + .append(" re.relation_type = :where_relation_type").append(entityTypeFilterIdx); | |
453 | + if (!whereEntityTypes.isEmpty()) { | |
454 | + whereFilter.append(" and re.") | |
455 | + .append(entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "to" : "from") | |
456 | + .append("_type in (:where_entity_types").append(entityTypeFilterIdx).append(")"); | |
457 | + } | |
453 | 458 | if (!single) { |
454 | 459 | whereFilter.append(" )"); |
455 | 460 | } |
456 | 461 | ctx.addStringParameter("where_relation_type" + entityTypeFilterIdx, relationType); |
457 | - ctx.addStringListParameter("where_entity_types" + entityTypeFilterIdx, etf.getEntityTypes().stream().map(EntityType::name).collect(Collectors.toList())); | |
462 | + if (!whereEntityTypes.isEmpty()) { | |
463 | + ctx.addStringListParameter("where_entity_types" + entityTypeFilterIdx, whereEntityTypes); | |
464 | + } | |
458 | 465 | entityTypeFilterIdx++; |
459 | 466 | } |
460 | 467 | } else { | ... | ... |
... | ... | @@ -30,6 +30,7 @@ import org.mockito.Mock; |
30 | 30 | import org.mockito.runners.MockitoJUnitRunner; |
31 | 31 | import org.mockito.stubbing.Answer; |
32 | 32 | import org.thingsboard.common.util.ListeningExecutor; |
33 | +import org.thingsboard.rule.engine.api.RuleEngineAlarmService; | |
33 | 34 | import org.thingsboard.rule.engine.api.ScriptEngine; |
34 | 35 | import org.thingsboard.rule.engine.api.TbContext; |
35 | 36 | import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
... | ... | @@ -43,6 +44,7 @@ import org.thingsboard.server.common.data.id.TenantId; |
43 | 44 | import org.thingsboard.server.common.msg.TbMsg; |
44 | 45 | import org.thingsboard.server.common.msg.TbMsgDataType; |
45 | 46 | import org.thingsboard.server.common.msg.TbMsgMetaData; |
47 | +import org.thingsboard.server.dao.alarm.AlarmOperationResult; | |
46 | 48 | import org.thingsboard.server.dao.alarm.AlarmService; |
47 | 49 | |
48 | 50 | import javax.script.ScriptException; |
... | ... | @@ -79,7 +81,7 @@ public class TbAlarmNodeTest { |
79 | 81 | @Mock |
80 | 82 | private TbContext ctx; |
81 | 83 | @Mock |
82 | - private AlarmService alarmService; | |
84 | + private RuleEngineAlarmService alarmService; | |
83 | 85 | |
84 | 86 | @Mock |
85 | 87 | private ScriptEngine detailsJs; |
... | ... | @@ -289,7 +291,8 @@ public class TbAlarmNodeTest { |
289 | 291 | |
290 | 292 | when(detailsJs.executeJsonAsync(msg)).thenReturn(Futures.immediateFuture(null)); |
291 | 293 | when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(activeAlarm)); |
292 | - when(alarmService.clearAlarm(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()), org.mockito.Mockito.any(JsonNode.class), anyLong())).thenReturn(Futures.immediateFuture(true)); | |
294 | + when(alarmService.clearAlarm(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()), org.mockito.Mockito.any(JsonNode.class), anyLong())) | |
295 | + .thenReturn(Futures.immediateFuture( false)); | |
293 | 296 | when(alarmService.findAlarmByIdAsync(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()))).thenReturn(Futures.immediateFuture(activeAlarm)); |
294 | 297 | // doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(activeAlarm); |
295 | 298 | ... | ... |