Commit aec2d388ce860117ec9e67999d957d4d2d15cd5f

Authored by Andrii Shvaika
Committed by Andrew Shvayka
1 parent 65fa1a7a

Alarm Websockets API

Showing 27 changed files with 451 additions and 183 deletions
@@ -103,6 +103,7 @@ import org.thingsboard.server.service.security.permission.AccessControlService; @@ -103,6 +103,7 @@ import org.thingsboard.server.service.security.permission.AccessControlService;
103 import org.thingsboard.server.service.security.permission.Operation; 103 import org.thingsboard.server.service.security.permission.Operation;
104 import org.thingsboard.server.service.security.permission.Resource; 104 import org.thingsboard.server.service.security.permission.Resource;
105 import org.thingsboard.server.service.state.DeviceStateService; 105 import org.thingsboard.server.service.state.DeviceStateService;
  106 +import org.thingsboard.server.service.telemetry.AlarmSubscriptionService;
106 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; 107 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
107 108
108 import javax.mail.MessagingException; 109 import javax.mail.MessagingException;
@@ -145,7 +146,7 @@ public abstract class BaseController { @@ -145,7 +146,7 @@ public abstract class BaseController {
145 protected AssetService assetService; 146 protected AssetService assetService;
146 147
147 @Autowired 148 @Autowired
148 - protected AlarmService alarmService; 149 + protected AlarmSubscriptionService alarmService;
149 150
150 @Autowired 151 @Autowired
151 protected DeviceCredentialsService deviceCredentialsService; 152 protected DeviceCredentialsService deviceCredentialsService;
@@ -23,6 +23,7 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory; @@ -23,6 +23,7 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
23 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; 23 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
24 import org.thingsboard.server.common.data.DataConstants; 24 import org.thingsboard.server.common.data.DataConstants;
25 import org.thingsboard.server.common.data.EntityType; 25 import org.thingsboard.server.common.data.EntityType;
  26 +import org.thingsboard.server.common.data.alarm.Alarm;
26 import org.thingsboard.server.common.data.id.DeviceId; 27 import org.thingsboard.server.common.data.id.DeviceId;
27 import org.thingsboard.server.common.data.id.EntityId; 28 import org.thingsboard.server.common.data.id.EntityId;
28 import org.thingsboard.server.common.data.id.TenantId; 29 import org.thingsboard.server.common.data.id.TenantId;
@@ -52,7 +53,8 @@ import org.thingsboard.server.queue.util.TbCoreComponent; @@ -52,7 +53,8 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
52 import org.thingsboard.server.service.queue.TbClusterService; 53 import org.thingsboard.server.service.queue.TbClusterService;
53 import org.thingsboard.server.service.state.DefaultDeviceStateService; 54 import org.thingsboard.server.service.state.DefaultDeviceStateService;
54 import org.thingsboard.server.service.state.DeviceStateService; 55 import org.thingsboard.server.service.state.DeviceStateService;
55 -import org.thingsboard.server.service.telemetry.sub.TsSubscriptionUpdate; 56 +import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate;
  57 +import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
56 58
57 import javax.annotation.PostConstruct; 59 import javax.annotation.PostConstruct;
58 import javax.annotation.PreDestroy; 60 import javax.annotation.PreDestroy;
@@ -195,7 +197,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer @@ -195,7 +197,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
195 197
196 @Override 198 @Override
197 public void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, TbCallback callback) { 199 public void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, TbCallback callback) {
198 - onLocalSubUpdate(entityId, 200 + onLocalTelemetrySubUpdate(entityId,
199 s -> { 201 s -> {
200 if (TbSubscriptionType.TIMESERIES.equals(s.getType())) { 202 if (TbSubscriptionType.TIMESERIES.equals(s.getType())) {
201 return (TbTimeseriesSubscription) s; 203 return (TbTimeseriesSubscription) s;
@@ -219,7 +221,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer @@ -219,7 +221,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
219 221
220 @Override 222 @Override
221 public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback) { 223 public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback) {
222 - onLocalSubUpdate(entityId, 224 + onLocalTelemetrySubUpdate(entityId,
223 s -> { 225 s -> {
224 if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) { 226 if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) {
225 return (TbAttributeSubscription) s; 227 return (TbAttributeSubscription) s;
@@ -257,8 +259,42 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer @@ -257,8 +259,42 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
257 } 259 }
258 260
259 @Override 261 @Override
  262 + public void onAlarmUpdate(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback) {
  263 + onLocalAlarmSubUpdate(entityId,
  264 + s -> {
  265 + if (TbSubscriptionType.ALARMS.equals(s.getType())) {
  266 + return (TbAlarmsSubscription) s;
  267 + } else {
  268 + return null;
  269 + }
  270 + },
  271 + s -> alarm.getCreatedTime() >= s.getTs(),
  272 + s -> alarm,
  273 + false
  274 + );
  275 + callback.onSuccess();
  276 + }
  277 +
  278 + @Override
  279 + public void onAlarmDeleted(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback) {
  280 + onLocalAlarmSubUpdate(entityId,
  281 + s -> {
  282 + if (TbSubscriptionType.ALARMS.equals(s.getType())) {
  283 + return (TbAlarmsSubscription) s;
  284 + } else {
  285 + return null;
  286 + }
  287 + },
  288 + s -> alarm.getCreatedTime() >= s.getTs(),
  289 + s -> alarm,
  290 + true
  291 + );
  292 + callback.onSuccess();
  293 + }
  294 +
  295 + @Override
260 public void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback callback) { 296 public void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback callback) {
261 - onLocalSubUpdate(entityId, 297 + onLocalTelemetrySubUpdate(entityId,
262 s -> { 298 s -> {
263 if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) { 299 if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) {
264 return (TbAttributeSubscription) s; 300 return (TbAttributeSubscription) s;
@@ -282,17 +318,17 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer @@ -282,17 +318,17 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
282 callback.onSuccess(); 318 callback.onSuccess();
283 } 319 }
284 320
285 - private <T extends TbSubscription> void onLocalSubUpdate(EntityId entityId,  
286 - Function<TbSubscription, T> castFunction,  
287 - Predicate<T> filterFunction,  
288 - Function<T, List<TsKvEntry>> processFunction) { 321 + private <T extends TbSubscription> void onLocalTelemetrySubUpdate(EntityId entityId,
  322 + Function<TbSubscription, T> castFunction,
  323 + Predicate<T> filterFunction,
  324 + Function<T, List<TsKvEntry>> processFunction) {
289 Set<TbSubscription> entitySubscriptions = subscriptionsByEntityId.get(entityId); 325 Set<TbSubscription> entitySubscriptions = subscriptionsByEntityId.get(entityId);
290 if (entitySubscriptions != null) { 326 if (entitySubscriptions != null) {
291 entitySubscriptions.stream().map(castFunction).filter(Objects::nonNull).filter(filterFunction).forEach(s -> { 327 entitySubscriptions.stream().map(castFunction).filter(Objects::nonNull).filter(filterFunction).forEach(s -> {
292 List<TsKvEntry> subscriptionUpdate = processFunction.apply(s); 328 List<TsKvEntry> subscriptionUpdate = processFunction.apply(s);
293 if (subscriptionUpdate != null && !subscriptionUpdate.isEmpty()) { 329 if (subscriptionUpdate != null && !subscriptionUpdate.isEmpty()) {
294 if (serviceId.equals(s.getServiceId())) { 330 if (serviceId.equals(s.getServiceId())) {
295 - TsSubscriptionUpdate update = new TsSubscriptionUpdate(s.getSubscriptionId(), subscriptionUpdate); 331 + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(s.getSubscriptionId(), subscriptionUpdate);
296 localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbCallback.EMPTY); 332 localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbCallback.EMPTY);
297 } else { 333 } else {
298 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId()); 334 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId());
@@ -305,6 +341,29 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer @@ -305,6 +341,29 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
305 } 341 }
306 } 342 }
307 343
  344 + private void onLocalAlarmSubUpdate(EntityId entityId,
  345 + Function<TbSubscription, TbAlarmsSubscription> castFunction,
  346 + Predicate<TbAlarmsSubscription> filterFunction,
  347 + Function<TbAlarmsSubscription, Alarm> processFunction, boolean deleted) {
  348 + Set<TbSubscription> entitySubscriptions = subscriptionsByEntityId.get(entityId);
  349 + if (entitySubscriptions != null) {
  350 + entitySubscriptions.stream().map(castFunction).filter(Objects::nonNull).filter(filterFunction).forEach(s -> {
  351 + Alarm alarm = processFunction.apply(s);
  352 + if (alarm != null) {
  353 + if (serviceId.equals(s.getServiceId())) {
  354 + AlarmSubscriptionUpdate update = new AlarmSubscriptionUpdate(s.getSubscriptionId(), alarm, deleted);
  355 + localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbCallback.EMPTY);
  356 + } else {
  357 + TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId());
  358 + toCoreNotificationsProducer.send(tpi, toProto(s, alarm), null);
  359 + }
  360 + }
  361 + });
  362 + } else {
  363 + log.debug("[{}] No device subscriptions to process!", entityId);
  364 + }
  365 + }
  366 +
308 private boolean isInTimeRange(TbTimeseriesSubscription subscription, long kvTime) { 367 private boolean isInTimeRange(TbTimeseriesSubscription subscription, long kvTime) {
309 return (subscription.getStartTime() == 0 || subscription.getStartTime() <= kvTime) 368 return (subscription.getStartTime() == 0 || subscription.getStartTime() <= kvTime)
310 && (subscription.getEndTime() == 0 || subscription.getEndTime() >= kvTime); 369 && (subscription.getEndTime() == 0 || subscription.getEndTime() >= kvTime);
@@ -412,4 +471,19 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer @@ -412,4 +471,19 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
412 return new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg); 471 return new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg);
413 } 472 }
414 473
  474 + private TbProtoQueueMsg<ToCoreNotificationMsg> toProto(TbSubscription subscription, Alarm alarm) {
  475 + TbSubscriptionUpdateProto.Builder builder = TbSubscriptionUpdateProto.newBuilder();
  476 +
  477 + builder.setSessionId(subscription.getSessionId());
  478 + builder.setSubscriptionId(subscription.getSubscriptionId());
  479 +
  480 + //TODO 3.1
  481 + throw new RuntimeException("Not implemented!");
  482 +//
  483 +// ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToLocalSubscriptionServiceMsg(
  484 +// LocalSubscriptionServiceMsgProto.newBuilder().setSubUpdate(builder.build()).build())
  485 +// .build();
  486 +// return new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg);
  487 + }
  488 +
415 } 489 }
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -36,7 +36,8 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @@ -36,7 +36,8 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
36 import org.thingsboard.server.common.msg.queue.TbCallback; 36 import org.thingsboard.server.common.msg.queue.TbCallback;
37 import org.thingsboard.server.queue.util.TbCoreComponent; 37 import org.thingsboard.server.queue.util.TbCoreComponent;
38 import org.thingsboard.server.service.queue.TbClusterService; 38 import org.thingsboard.server.service.queue.TbClusterService;
39 -import org.thingsboard.server.service.telemetry.sub.TsSubscriptionUpdate; 39 +import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate;
  40 +import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
40 41
41 import javax.annotation.PostConstruct; 42 import javax.annotation.PostConstruct;
42 import javax.annotation.PreDestroy; 43 import javax.annotation.PreDestroy;
@@ -137,7 +138,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @@ -137,7 +138,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
137 } 138 }
138 139
139 @Override 140 @Override
140 - public void onSubscriptionUpdate(String sessionId, TsSubscriptionUpdate update, TbCallback callback) { 141 + public void onSubscriptionUpdate(String sessionId, TelemetrySubscriptionUpdate update, TbCallback callback) {
141 TbSubscription subscription = subscriptionsBySessionId 142 TbSubscription subscription = subscriptionsBySessionId
142 .getOrDefault(sessionId, Collections.emptyMap()).get(update.getSubscriptionId()); 143 .getOrDefault(sessionId, Collections.emptyMap()).get(update.getSubscriptionId());
143 if (subscription != null) { 144 if (subscription != null) {
@@ -157,6 +158,16 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @@ -157,6 +158,16 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
157 } 158 }
158 159
159 @Override 160 @Override
  161 + public void onSubscriptionUpdate(String sessionId, AlarmSubscriptionUpdate update, TbCallback callback) {
  162 + TbSubscription subscription = subscriptionsBySessionId
  163 + .getOrDefault(sessionId, Collections.emptyMap()).get(update.getSubscriptionId());
  164 + if (subscription != null && subscription.getType() == TbSubscriptionType.ALARMS) {
  165 + subscription.getUpdateConsumer().accept(sessionId, update);
  166 + }
  167 + callback.onSuccess();
  168 + }
  169 +
  170 + @Override
160 public void cancelSubscription(String sessionId, int subscriptionId) { 171 public void cancelSubscription(String sessionId, int subscriptionId) {
161 log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId); 172 log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId);
162 Map<Integer, TbSubscription> sessionSubscriptions = subscriptionsBySessionId.get(sessionId); 173 Map<Integer, TbSubscription> sessionSubscriptions = subscriptionsBySessionId.get(sessionId);
@@ -17,6 +17,7 @@ package org.thingsboard.server.service.subscription; @@ -17,6 +17,7 @@ package org.thingsboard.server.service.subscription;
17 17
18 import org.springframework.context.ApplicationListener; 18 import org.springframework.context.ApplicationListener;
19 import org.thingsboard.server.common.data.alarm.Alarm; 19 import org.thingsboard.server.common.data.alarm.Alarm;
  20 +import org.thingsboard.server.common.data.id.AlarmId;
20 import org.thingsboard.server.common.data.id.EntityId; 21 import org.thingsboard.server.common.data.id.EntityId;
21 import org.thingsboard.server.common.data.id.TenantId; 22 import org.thingsboard.server.common.data.id.TenantId;
22 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 23 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
@@ -38,5 +39,7 @@ public interface SubscriptionManagerService extends ApplicationListener<Partitio @@ -38,5 +39,7 @@ public interface SubscriptionManagerService extends ApplicationListener<Partitio
38 39
39 void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback empty); 40 void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback empty);
40 41
41 - void onAlarmUpdate(TenantId tenantId, EntityId entityId, Alarm alarm); 42 + void onAlarmUpdate(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback);
  43 +
  44 + void onAlarmDeleted(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback);
42 } 45 }
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,6 +18,9 @@ package org.thingsboard.server.service.subscription; @@ -18,6 +18,9 @@ package org.thingsboard.server.service.subscription;
18 import lombok.Getter; 18 import lombok.Getter;
19 import lombok.Setter; 19 import lombok.Setter;
20 import lombok.extern.slf4j.Slf4j; 20 import lombok.extern.slf4j.Slf4j;
  21 +import org.thingsboard.server.common.data.alarm.Alarm;
  22 +import org.thingsboard.server.common.data.alarm.AlarmSearchStatus;
  23 +import org.thingsboard.server.common.data.id.AlarmId;
21 import org.thingsboard.server.common.data.id.EntityId; 24 import org.thingsboard.server.common.data.id.EntityId;
22 import org.thingsboard.server.common.data.page.PageData; 25 import org.thingsboard.server.common.data.page.PageData;
23 import org.thingsboard.server.common.data.query.AlarmData; 26 import org.thingsboard.server.common.data.query.AlarmData;
@@ -26,15 +29,18 @@ import org.thingsboard.server.common.data.query.AlarmDataQuery; @@ -26,15 +29,18 @@ import org.thingsboard.server.common.data.query.AlarmDataQuery;
26 import org.thingsboard.server.common.data.query.EntityData; 29 import org.thingsboard.server.common.data.query.EntityData;
27 import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; 30 import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
28 import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; 31 import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
  32 +import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataUpdate;
29 import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate; 33 import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate;
30 -import org.thingsboard.server.service.telemetry.sub.TsSubscriptionUpdate;  
31 34
32 import java.util.ArrayList; 35 import java.util.ArrayList;
33 import java.util.Collection; 36 import java.util.Collection;
  37 +import java.util.Collections;
34 import java.util.HashMap; 38 import java.util.HashMap;
35 import java.util.LinkedHashMap; 39 import java.util.LinkedHashMap;
36 import java.util.List; 40 import java.util.List;
37 import java.util.Map; 41 import java.util.Map;
  42 +import java.util.function.Function;
  43 +import java.util.stream.Collectors;
38 44
39 @Slf4j 45 @Slf4j
40 public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { 46 public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
@@ -44,6 +50,9 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { @@ -44,6 +50,9 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
44 private final LinkedHashMap<EntityId, EntityData> entitiesMap; 50 private final LinkedHashMap<EntityId, EntityData> entitiesMap;
45 @Getter 51 @Getter
46 @Setter 52 @Setter
  53 + private final HashMap<AlarmId, AlarmData> alarmsMap;
  54 + @Getter
  55 + @Setter
47 private PageData<AlarmData> alarms; 56 private PageData<AlarmData> alarms;
48 @Getter 57 @Getter
49 @Setter 58 @Setter
@@ -56,6 +65,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { @@ -56,6 +65,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
56 public TbAlarmDataSubCtx(String serviceId, TelemetryWebSocketService wsService, TelemetryWebSocketSessionRef sessionRef, int cmdId) { 65 public TbAlarmDataSubCtx(String serviceId, TelemetryWebSocketService wsService, TelemetryWebSocketSessionRef sessionRef, int cmdId) {
57 super(serviceId, wsService, sessionRef, cmdId); 66 super(serviceId, wsService, sessionRef, cmdId);
58 this.entitiesMap = new LinkedHashMap<>(); 67 this.entitiesMap = new LinkedHashMap<>();
  68 + this.alarmsMap = new HashMap<>();
59 } 69 }
60 70
61 public void setEntitiesData(PageData<EntityData> entitiesData) { 71 public void setEntitiesData(PageData<EntityData> entitiesData) {
@@ -81,6 +91,8 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { @@ -81,6 +91,8 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
81 } 91 }
82 } 92 }
83 } 93 }
  94 + alarmsMap.clear();
  95 + alarmsMap.putAll(alarms.getData().stream().collect(Collectors.toMap(AlarmData::getId, Function.identity())));
84 return this.alarms; 96 return this.alarms;
85 } 97 }
86 98
@@ -100,16 +112,64 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { @@ -100,16 +112,64 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
100 .entityId(entityData.getEntityId()) 112 .entityId(entityData.getEntityId())
101 .updateConsumer(this::sendWsMsg) 113 .updateConsumer(this::sendWsMsg)
102 .ts(lastFetchTs) 114 .ts(lastFetchTs)
103 - .typeList(pageLink.getTypeList())  
104 - .severityList(pageLink.getSeverityList())  
105 - .statusList(pageLink.getStatusList())  
106 - .searchPropagatedAlarms(pageLink.isSearchPropagatedAlarms())  
107 .build()); 115 .build());
108 } 116 }
109 return result; 117 return result;
110 } 118 }
111 119
112 private void sendWsMsg(String sessionId, AlarmSubscriptionUpdate subscriptionUpdate) { 120 private void sendWsMsg(String sessionId, AlarmSubscriptionUpdate subscriptionUpdate) {
  121 + Alarm alarm = subscriptionUpdate.getAlarm();
  122 + AlarmId alarmId = alarm.getId();
  123 + if (subscriptionUpdate.isAlarmDeleted()) {
  124 + Alarm deleted = alarmsMap.remove(alarmId);
  125 + if (deleted != null) {
  126 + //TODO: invalidate current page;
  127 + }
  128 + } else {
  129 + AlarmData current = alarmsMap.get(alarmId);
  130 + boolean onCurrentPage = current != null;
  131 + boolean matchesFilter = filter(alarm);
  132 + if (onCurrentPage) {
  133 + if (matchesFilter) {
  134 + AlarmData updated = new AlarmData(alarm, current.getName(), current.getEntityId());
  135 + alarmsMap.put(alarmId, updated);
  136 + wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, Collections.singletonList(updated)));
  137 + } else {
  138 + //TODO: invalidate current page;
  139 + }
  140 + } else if (matchesFilter && query.getPageLink().getPage() == 0) {
  141 + //TODO: invalidate current page;
  142 + }
  143 + }
  144 + }
113 145
  146 + private boolean filter(Alarm alarm) {
  147 + AlarmDataPageLink filter = query.getPageLink();
  148 + long startTs = System.currentTimeMillis() - filter.getTimeWindow();
  149 + if (alarm.getCreatedTime() < startTs) {
  150 + //Skip update that does not match time window.
  151 + return false;
  152 + }
  153 + if (filter.getTypeList() != null && !filter.getTypeList().isEmpty() && !filter.getTypeList().contains(alarm.getType())) {
  154 + return false;
  155 + }
  156 + if (filter.getSeverityList() != null && !filter.getSeverityList().isEmpty()) {
  157 + if (!filter.getSeverityList().contains(alarm.getSeverity())) {
  158 + return false;
  159 + }
  160 + }
  161 + if (filter.getStatusList() != null && !filter.getStatusList().isEmpty()) {
  162 + boolean matches = false;
  163 + for (AlarmSearchStatus status : filter.getStatusList()) {
  164 + if (status.getStatuses().contains(alarm.getStatus())) {
  165 + matches = true;
  166 + break;
  167 + }
  168 + }
  169 + if (!matches) {
  170 + return false;
  171 + }
  172 + }
  173 + return true;
114 } 174 }
115 } 175 }
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,35 +16,26 @@ @@ -16,35 +16,26 @@
16 package org.thingsboard.server.service.subscription; 16 package org.thingsboard.server.service.subscription;
17 17
18 import lombok.Builder; 18 import lombok.Builder;
  19 +import lombok.Getter;
19 import org.thingsboard.server.common.data.alarm.AlarmSearchStatus; 20 import org.thingsboard.server.common.data.alarm.AlarmSearchStatus;
20 import org.thingsboard.server.common.data.alarm.AlarmSeverity; 21 import org.thingsboard.server.common.data.alarm.AlarmSeverity;
21 import org.thingsboard.server.common.data.id.EntityId; 22 import org.thingsboard.server.common.data.id.EntityId;
22 import org.thingsboard.server.common.data.id.TenantId; 23 import org.thingsboard.server.common.data.id.TenantId;
23 import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate; 24 import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate;
24 -import org.thingsboard.server.service.telemetry.sub.TsSubscriptionUpdate;  
25 25
26 import java.util.List; 26 import java.util.List;
27 import java.util.function.BiConsumer; 27 import java.util.function.BiConsumer;
28 28
29 public class TbAlarmsSubscription extends TbSubscription<AlarmSubscriptionUpdate> { 29 public class TbAlarmsSubscription extends TbSubscription<AlarmSubscriptionUpdate> {
30 30
  31 + @Getter
31 private final long ts; 32 private final long ts;
32 - private final List<String> typeList;  
33 - private final List<AlarmSearchStatus> statusList;  
34 - private final List<AlarmSeverity> severityList;  
35 - private final boolean searchPropagatedAlarms;  
36 33
37 @Builder 34 @Builder
38 public TbAlarmsSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId, 35 public TbAlarmsSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId,
39 - TbSubscriptionType type, BiConsumer<String, AlarmSubscriptionUpdate> updateConsumer,  
40 - long ts, List<String> typeList, List<AlarmSearchStatus> statusList,  
41 - List<AlarmSeverity> severityList, boolean searchPropagatedAlarms) { 36 + TbSubscriptionType type, BiConsumer<String, AlarmSubscriptionUpdate> updateConsumer, long ts) {
42 super(serviceId, sessionId, subscriptionId, tenantId, entityId, type, updateConsumer); 37 super(serviceId, sessionId, subscriptionId, tenantId, entityId, type, updateConsumer);
43 this.ts = ts; 38 this.ts = ts;
44 - this.typeList = typeList;  
45 - this.statusList = statusList;  
46 - this.severityList = severityList;  
47 - this.searchPropagatedAlarms = searchPropagatedAlarms;  
48 } 39 }
49 40
50 @Override 41 @Override
@@ -19,12 +19,12 @@ import lombok.Builder; @@ -19,12 +19,12 @@ import lombok.Builder;
19 import lombok.Getter; 19 import lombok.Getter;
20 import org.thingsboard.server.common.data.id.EntityId; 20 import org.thingsboard.server.common.data.id.EntityId;
21 import org.thingsboard.server.common.data.id.TenantId; 21 import org.thingsboard.server.common.data.id.TenantId;
22 -import org.thingsboard.server.service.telemetry.sub.TsSubscriptionUpdate; 22 +import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
23 23
24 import java.util.Map; 24 import java.util.Map;
25 import java.util.function.BiConsumer; 25 import java.util.function.BiConsumer;
26 26
27 -public class TbAttributeSubscription extends TbSubscription<TsSubscriptionUpdate> { 27 +public class TbAttributeSubscription extends TbSubscription<TelemetrySubscriptionUpdate> {
28 28
29 @Getter private final boolean allKeys; 29 @Getter private final boolean allKeys;
30 @Getter private final Map<String, Long> keyStates; 30 @Getter private final Map<String, Long> keyStates;
@@ -32,7 +32,7 @@ public class TbAttributeSubscription extends TbSubscription<TsSubscriptionUpdate @@ -32,7 +32,7 @@ public class TbAttributeSubscription extends TbSubscription<TsSubscriptionUpdate
32 32
33 @Builder 33 @Builder
34 public TbAttributeSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId, 34 public TbAttributeSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId,
35 - BiConsumer<String, TsSubscriptionUpdate> updateConsumer, 35 + BiConsumer<String, TelemetrySubscriptionUpdate> updateConsumer,
36 boolean allKeys, Map<String, Long> keyStates, TbAttributeSubscriptionScope scope) { 36 boolean allKeys, Map<String, Long> keyStates, TbAttributeSubscriptionScope scope) {
37 super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.ATTRIBUTES, updateConsumer); 37 super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.ATTRIBUTES, updateConsumer);
38 this.allKeys = allKeys; 38 this.allKeys = allKeys;
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -33,7 +33,7 @@ import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd; @@ -33,7 +33,7 @@ import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
33 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; 33 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
34 import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd; 34 import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd;
35 import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd; 35 import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd;
36 -import org.thingsboard.server.service.telemetry.sub.TsSubscriptionUpdate; 36 +import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
37 37
38 import java.util.ArrayList; 38 import java.util.ArrayList;
39 import java.util.Arrays; 39 import java.util.Arrays;
@@ -169,11 +169,11 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> { @@ -169,11 +169,11 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
169 return keyStates; 169 return keyStates;
170 } 170 }
171 171
172 - private void sendWsMsg(String sessionId, TsSubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) { 172 + private void sendWsMsg(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) {
173 sendWsMsg(sessionId, subscriptionUpdate, keyType, true); 173 sendWsMsg(sessionId, subscriptionUpdate, keyType, true);
174 } 174 }
175 175
176 - private void sendWsMsg(String sessionId, TsSubscriptionUpdate subscriptionUpdate, EntityKeyType keyType, boolean resultToLatestValues) { 176 + private void sendWsMsg(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType, boolean resultToLatestValues) {
177 EntityId entityId = subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId()); 177 EntityId entityId = subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId());
178 if (entityId != null) { 178 if (entityId != null) {
179 log.trace("[{}][{}][{}][{}] Received subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate); 179 log.trace("[{}][{}][{}][{}] Received subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate);
@@ -187,7 +187,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> { @@ -187,7 +187,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
187 } 187 }
188 } 188 }
189 189
190 - private void sendLatestWsMsg(EntityId entityId, String sessionId, TsSubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) { 190 + private void sendLatestWsMsg(EntityId entityId, String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) {
191 Map<String, TsValue> latestUpdate = new HashMap<>(); 191 Map<String, TsValue> latestUpdate = new HashMap<>();
192 subscriptionUpdate.getData().forEach((k, v) -> { 192 subscriptionUpdate.getData().forEach((k, v) -> {
193 Object[] data = (Object[]) v.get(0); 193 Object[] data = (Object[]) v.get(0);
@@ -226,7 +226,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> { @@ -226,7 +226,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
226 } 226 }
227 } 227 }
228 228
229 - private void sendTsWsMsg(EntityId entityId, String sessionId, TsSubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) { 229 + private void sendTsWsMsg(EntityId entityId, String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) {
230 Map<String, List<TsValue>> tsUpdate = new HashMap<>(); 230 Map<String, List<TsValue>> tsUpdate = new HashMap<>();
231 subscriptionUpdate.getData().forEach((k, v) -> { 231 subscriptionUpdate.getData().forEach((k, v) -> {
232 Object[] data = (Object[]) v.get(0); 232 Object[] data = (Object[]) v.get(0);
@@ -18,7 +18,8 @@ package org.thingsboard.server.service.subscription; @@ -18,7 +18,8 @@ package org.thingsboard.server.service.subscription;
18 import org.thingsboard.server.queue.discovery.ClusterTopologyChangeEvent; 18 import org.thingsboard.server.queue.discovery.ClusterTopologyChangeEvent;
19 import org.thingsboard.server.queue.discovery.PartitionChangeEvent; 19 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
20 import org.thingsboard.server.common.msg.queue.TbCallback; 20 import org.thingsboard.server.common.msg.queue.TbCallback;
21 -import org.thingsboard.server.service.telemetry.sub.TsSubscriptionUpdate; 21 +import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate;
  22 +import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
22 23
23 public interface TbLocalSubscriptionService { 24 public interface TbLocalSubscriptionService {
24 25
@@ -28,7 +29,9 @@ public interface TbLocalSubscriptionService { @@ -28,7 +29,9 @@ public interface TbLocalSubscriptionService {
28 29
29 void cancelAllSessionSubscriptions(String sessionId); 30 void cancelAllSessionSubscriptions(String sessionId);
30 31
31 - void onSubscriptionUpdate(String sessionId, TsSubscriptionUpdate update, TbCallback callback); 32 + void onSubscriptionUpdate(String sessionId, TelemetrySubscriptionUpdate update, TbCallback callback);
  33 +
  34 + void onSubscriptionUpdate(String sessionId, AlarmSubscriptionUpdate update, TbCallback callback);
32 35
33 void onApplicationEvent(PartitionChangeEvent event); 36 void onApplicationEvent(PartitionChangeEvent event);
34 37
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.service.subscription; 16 package org.thingsboard.server.service.subscription;
17 17
  18 +import org.thingsboard.server.common.data.alarm.Alarm;
18 import org.thingsboard.server.common.data.id.EntityId; 19 import org.thingsboard.server.common.data.id.EntityId;
19 import org.thingsboard.server.common.data.id.EntityIdFactory; 20 import org.thingsboard.server.common.data.id.EntityIdFactory;
20 import org.thingsboard.server.common.data.id.TenantId; 21 import org.thingsboard.server.common.data.id.TenantId;
@@ -44,7 +45,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdatePr @@ -44,7 +45,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdatePr
44 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 45 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
45 import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; 46 import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
46 import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode; 47 import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
47 -import org.thingsboard.server.service.telemetry.sub.TsSubscriptionUpdate; 48 +import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
48 49
49 import java.util.ArrayList; 50 import java.util.ArrayList;
50 import java.util.HashMap; 51 import java.util.HashMap;
@@ -137,9 +138,9 @@ public class TbSubscriptionUtils { @@ -137,9 +138,9 @@ public class TbSubscriptionUtils {
137 return builder.build(); 138 return builder.build();
138 } 139 }
139 140
140 - public static TsSubscriptionUpdate fromProto(TbSubscriptionUpdateProto proto) { 141 + public static TelemetrySubscriptionUpdate fromProto(TbSubscriptionUpdateProto proto) {
141 if (proto.getErrorCode() > 0) { 142 if (proto.getErrorCode() > 0) {
142 - return new TsSubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg()); 143 + return new TelemetrySubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg());
143 } else { 144 } else {
144 Map<String, List<Object>> data = new TreeMap<>(); 145 Map<String, List<Object>> data = new TreeMap<>();
145 proto.getDataList().forEach(v -> { 146 proto.getDataList().forEach(v -> {
@@ -151,7 +152,7 @@ public class TbSubscriptionUtils { @@ -151,7 +152,7 @@ public class TbSubscriptionUtils {
151 values.add(value); 152 values.add(value);
152 } 153 }
153 }); 154 });
154 - return new TsSubscriptionUpdate(proto.getSubscriptionId(), data); 155 + return new TelemetrySubscriptionUpdate(proto.getSubscriptionId(), data);
155 } 156 }
156 } 157 }
157 158
@@ -261,4 +262,14 @@ public class TbSubscriptionUtils { @@ -261,4 +262,14 @@ public class TbSubscriptionUtils {
261 } 262 }
262 return entry; 263 return entry;
263 } 264 }
  265 +
  266 + public static ToCoreMsg toAlarmUpdateProto(TenantId tenantId, EntityId entityId, Alarm alarm) {
  267 +// TODO: 3.1
  268 + throw new RuntimeException("Not implemented!");
  269 + }
  270 +
  271 + public static ToCoreMsg toAlarmDeletedProto(TenantId tenantId, EntityId entityId, Alarm alarm) {
  272 +// TODO: 3.1
  273 + throw new RuntimeException("Not implemented!");
  274 + }
264 } 275 }
@@ -19,12 +19,12 @@ import lombok.Builder; @@ -19,12 +19,12 @@ import lombok.Builder;
19 import lombok.Getter; 19 import lombok.Getter;
20 import org.thingsboard.server.common.data.id.EntityId; 20 import org.thingsboard.server.common.data.id.EntityId;
21 import org.thingsboard.server.common.data.id.TenantId; 21 import org.thingsboard.server.common.data.id.TenantId;
22 -import org.thingsboard.server.service.telemetry.sub.TsSubscriptionUpdate; 22 +import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
23 23
24 import java.util.Map; 24 import java.util.Map;
25 import java.util.function.BiConsumer; 25 import java.util.function.BiConsumer;
26 26
27 -public class TbTimeseriesSubscription extends TbSubscription<TsSubscriptionUpdate> { 27 +public class TbTimeseriesSubscription extends TbSubscription<TelemetrySubscriptionUpdate> {
28 28
29 @Getter 29 @Getter
30 private final boolean allKeys; 30 private final boolean allKeys;
@@ -37,7 +37,7 @@ public class TbTimeseriesSubscription extends TbSubscription<TsSubscriptionUpdat @@ -37,7 +37,7 @@ public class TbTimeseriesSubscription extends TbSubscription<TsSubscriptionUpdat
37 37
38 @Builder 38 @Builder
39 public TbTimeseriesSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId, 39 public TbTimeseriesSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId,
40 - BiConsumer<String, TsSubscriptionUpdate> updateConsumer, 40 + BiConsumer<String, TelemetrySubscriptionUpdate> updateConsumer,
41 boolean allKeys, Map<String, Long> keyStates, long startTime, long endTime) { 41 boolean allKeys, Map<String, Long> keyStates, long startTime, long endTime) {
42 super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.TIMESERIES, updateConsumer); 42 super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.TIMESERIES, updateConsumer);
43 this.allKeys = allKeys; 43 this.allKeys = allKeys;
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,6 +20,7 @@ import com.google.common.util.concurrent.FutureCallback; @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures; 20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture; 21 import com.google.common.util.concurrent.ListenableFuture;
22 import lombok.extern.slf4j.Slf4j; 22 import lombok.extern.slf4j.Slf4j;
  23 +import org.checkerframework.checker.nullness.qual.Nullable;
23 import org.springframework.beans.factory.annotation.Autowired; 24 import org.springframework.beans.factory.annotation.Autowired;
24 import org.springframework.context.event.EventListener; 25 import org.springframework.context.event.EventListener;
25 import org.springframework.stereotype.Service; 26 import org.springframework.stereotype.Service;
@@ -47,6 +48,7 @@ import org.thingsboard.server.common.data.query.AlarmDataPageLink; @@ -47,6 +48,7 @@ import org.thingsboard.server.common.data.query.AlarmDataPageLink;
47 import org.thingsboard.server.common.msg.queue.ServiceType; 48 import org.thingsboard.server.common.msg.queue.ServiceType;
48 import org.thingsboard.server.common.msg.queue.TbCallback; 49 import org.thingsboard.server.common.msg.queue.TbCallback;
49 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; 50 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
  51 +import org.thingsboard.server.dao.alarm.AlarmOperationResult;
50 import org.thingsboard.server.dao.alarm.AlarmService; 52 import org.thingsboard.server.dao.alarm.AlarmService;
51 import org.thingsboard.server.dao.attributes.AttributesService; 53 import org.thingsboard.server.dao.attributes.AttributesService;
52 import org.thingsboard.server.dao.timeseries.TimeseriesService; 54 import org.thingsboard.server.dao.timeseries.TimeseriesService;
@@ -58,7 +60,6 @@ import org.thingsboard.server.service.subscription.SubscriptionManagerService; @@ -58,7 +60,6 @@ import org.thingsboard.server.service.subscription.SubscriptionManagerService;
58 import org.thingsboard.server.service.subscription.TbSubscriptionUtils; 60 import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
59 import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate; 61 import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate;
60 62
61 -import javax.annotation.Nullable;  
62 import javax.annotation.PostConstruct; 63 import javax.annotation.PostConstruct;
63 import javax.annotation.PreDestroy; 64 import javax.annotation.PreDestroy;
64 import java.util.Collection; 65 import java.util.Collection;
@@ -99,46 +100,32 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService @@ -99,46 +100,32 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService
99 100
100 @Override 101 @Override
101 public Alarm createOrUpdateAlarm(Alarm alarm) { 102 public Alarm createOrUpdateAlarm(Alarm alarm) {
102 - //TODO 3.1: we also need a list of related entities if this is propagated alarm;  
103 - Alarm result = alarmService.createOrUpdateAlarm(alarm);  
104 - List<EntityId> relatedEntities = Collections.singletonList(result.getOriginator());  
105 - pushAlarmToSubService(result, relatedEntities);  
106 - return result;  
107 - }  
108 -  
109 - private void pushAlarmToSubService(Alarm result, List<EntityId> entityIds) {  
110 - wsCallBackExecutor.submit(() -> {  
111 - TenantId tenantId = result.getTenantId();  
112 - for (EntityId entityId : entityIds) {  
113 - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);  
114 - if (currentPartitions.contains(tpi)) {  
115 - if (subscriptionManagerService.isPresent()) {  
116 - subscriptionManagerService.get().onAlarmUpdate(tenantId, entityId, result);  
117 - } else {  
118 - log.warn("Possible misconfiguration because subscriptionManagerService is null!");  
119 - }  
120 - } else {  
121 - //TODO 3.1: cluster mode notification  
122 -// TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toTimeseriesUpdateProto(tenantId, entityId, ts);  
123 -// clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null);  
124 - }  
125 - }  
126 - }); 103 + AlarmOperationResult result = alarmService.createOrUpdateAlarm(alarm);
  104 + if (result.isSuccessful()) {
  105 + onAlarmUpdated(result);
  106 + }
  107 + return result.getAlarm();
127 } 108 }
128 109
129 @Override 110 @Override
130 public Boolean deleteAlarm(TenantId tenantId, AlarmId alarmId) { 111 public Boolean deleteAlarm(TenantId tenantId, AlarmId alarmId) {
131 - return alarmService.deleteAlarm(tenantId, alarmId); 112 + AlarmOperationResult result = alarmService.deleteAlarm(tenantId, alarmId);
  113 + onAlarmDeleted(result);
  114 + return result.isSuccessful();
132 } 115 }
133 116
134 @Override 117 @Override
135 public ListenableFuture<Boolean> ackAlarm(TenantId tenantId, AlarmId alarmId, long ackTs) { 118 public ListenableFuture<Boolean> ackAlarm(TenantId tenantId, AlarmId alarmId, long ackTs) {
136 - return alarmService.ackAlarm(tenantId, alarmId, ackTs); 119 + ListenableFuture<AlarmOperationResult> result = alarmService.ackAlarm(tenantId, alarmId, ackTs);
  120 + Futures.addCallback(result, new AlarmUpdateCallback(), wsCallBackExecutor);
  121 + return Futures.transform(result, AlarmOperationResult::isSuccessful, wsCallBackExecutor);
137 } 122 }
138 123
139 @Override 124 @Override
140 public ListenableFuture<Boolean> clearAlarm(TenantId tenantId, AlarmId alarmId, JsonNode details, long clearTs) { 125 public ListenableFuture<Boolean> clearAlarm(TenantId tenantId, AlarmId alarmId, JsonNode details, long clearTs) {
141 - return alarmService.clearAlarm(tenantId, alarmId, details, clearTs); 126 + ListenableFuture<AlarmOperationResult> result = alarmService.clearAlarm(tenantId, alarmId, details, clearTs);
  127 + Futures.addCallback(result, new AlarmUpdateCallback(), wsCallBackExecutor);
  128 + return Futures.transform(result, AlarmOperationResult::isSuccessful, wsCallBackExecutor);
142 } 129 }
143 130
144 @Override 131 @Override
@@ -147,9 +134,80 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService @@ -147,9 +134,80 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService
147 } 134 }
148 135
149 @Override 136 @Override
  137 + public ListenableFuture<AlarmInfo> findAlarmInfoByIdAsync(TenantId tenantId, AlarmId alarmId) {
  138 + return alarmService.findAlarmInfoByIdAsync(tenantId, alarmId);
  139 + }
  140 +
  141 + @Override
  142 + public ListenableFuture<PageData<AlarmInfo>> findAlarms(TenantId tenantId, AlarmQuery query) {
  143 + return alarmService.findAlarms(tenantId, query);
  144 + }
  145 +
  146 + @Override
  147 + public AlarmSeverity findHighestAlarmSeverity(TenantId tenantId, EntityId entityId, AlarmSearchStatus alarmSearchStatus, AlarmStatus alarmStatus) {
  148 + return alarmService.findHighestAlarmSeverity(tenantId, entityId, alarmSearchStatus, alarmStatus);
  149 + }
  150 +
  151 + @Override
  152 + public PageData<AlarmData> findAlarmDataByQueryForEntities(TenantId tenantId, CustomerId customerId, AlarmDataPageLink pageLink, Collection<EntityId> orderedEntityIds) {
  153 + return alarmService.findAlarmDataByQueryForEntities(tenantId, customerId, pageLink, orderedEntityIds);
  154 + }
  155 +
  156 + @Override
150 public ListenableFuture<Alarm> findLatestByOriginatorAndType(TenantId tenantId, EntityId originator, String type) { 157 public ListenableFuture<Alarm> findLatestByOriginatorAndType(TenantId tenantId, EntityId originator, String type) {
151 return alarmService.findLatestByOriginatorAndType(tenantId, originator, type); 158 return alarmService.findLatestByOriginatorAndType(tenantId, originator, type);
152 } 159 }
153 160
  161 + private void onAlarmUpdated(AlarmOperationResult result) {
  162 + wsCallBackExecutor.submit(() -> {
  163 + Alarm alarm = result.getAlarm();
  164 + TenantId tenantId = result.getAlarm().getTenantId();
  165 + for (EntityId entityId : result.getPropagatedEntitiesList()) {
  166 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
  167 + if (currentPartitions.contains(tpi)) {
  168 + if (subscriptionManagerService.isPresent()) {
  169 + subscriptionManagerService.get().onAlarmUpdate(tenantId, entityId, alarm);
  170 + } else {
  171 + log.warn("Possible misconfiguration because subscriptionManagerService is null!");
  172 + }
  173 + } else {
  174 + TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toAlarmUpdateProto(tenantId, entityId, alarm);
  175 + clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null);
  176 + }
  177 + }
  178 + });
  179 + }
  180 +
  181 + private void onAlarmDeleted(AlarmOperationResult result) {
  182 + wsCallBackExecutor.submit(() -> {
  183 + Alarm alarm = result.getAlarm();
  184 + TenantId tenantId = result.getAlarm().getTenantId();
  185 + for (EntityId entityId : result.getPropagatedEntitiesList()) {
  186 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
  187 + if (currentPartitions.contains(tpi)) {
  188 + if (subscriptionManagerService.isPresent()) {
  189 + subscriptionManagerService.get().onAlarmDeleted(tenantId, entityId, alarm);
  190 + } else {
  191 + log.warn("Possible misconfiguration because subscriptionManagerService is null!");
  192 + }
  193 + } else {
  194 + TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toAlarmDeletedProto(tenantId, entityId, alarm);
  195 + clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null);
  196 + }
  197 + }
  198 + });
  199 + }
  200 +
  201 + private class AlarmUpdateCallback implements FutureCallback<AlarmOperationResult> {
  202 + @Override
  203 + public void onSuccess(@Nullable AlarmOperationResult result) {
  204 + onAlarmUpdated(result);
  205 + }
  206 +
  207 + @Override
  208 + public void onFailure(Throwable t) {
  209 + log.warn("Failed to update alarm", t);
  210 + }
  211 + }
154 212
155 } 213 }
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -69,7 +69,7 @@ import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUnsubscribeCmd; @@ -69,7 +69,7 @@ import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUnsubscribeCmd;
69 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; 69 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
70 import org.thingsboard.server.service.telemetry.exception.UnauthorizedException; 70 import org.thingsboard.server.service.telemetry.exception.UnauthorizedException;
71 import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode; 71 import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
72 -import org.thingsboard.server.service.telemetry.sub.TsSubscriptionUpdate; 72 +import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
73 73
74 import javax.annotation.Nullable; 74 import javax.annotation.Nullable;
75 import javax.annotation.PostConstruct; 75 import javax.annotation.PostConstruct;
@@ -219,7 +219,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -219,7 +219,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
219 } 219 }
220 } catch (IOException e) { 220 } catch (IOException e) {
221 log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e); 221 log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e);
222 - TsSubscriptionUpdate update = new TsSubscriptionUpdate(UNKNOWN_SUBSCRIPTION_ID, SubscriptionErrorCode.INTERNAL_ERROR, SESSION_META_DATA_NOT_FOUND); 222 + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(UNKNOWN_SUBSCRIPTION_ID, SubscriptionErrorCode.INTERNAL_ERROR, SESSION_META_DATA_NOT_FOUND);
223 sendWsMsg(sessionRef, update); 223 sendWsMsg(sessionRef, update);
224 } 224 }
225 } 225 }
@@ -254,7 +254,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -254,7 +254,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
254 } 254 }
255 255
256 @Override 256 @Override
257 - public void sendWsMsg(String sessionId, TsSubscriptionUpdate update) { 257 + public void sendWsMsg(String sessionId, TelemetrySubscriptionUpdate update) {
258 sendWsMsg(sessionId, update.getSubscriptionId(), update); 258 sendWsMsg(sessionId, update.getSubscriptionId(), update);
259 } 259 }
260 260
@@ -397,7 +397,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -397,7 +397,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
397 @Override 397 @Override
398 public void onSuccess(List<AttributeKvEntry> data) { 398 public void onSuccess(List<AttributeKvEntry> data) {
399 List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); 399 List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
400 - sendWsMsg(sessionRef, new TsSubscriptionUpdate(cmd.getCmdId(), attributesData)); 400 + sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData));
401 401
402 Map<String, Long> subState = new HashMap<>(keys.size()); 402 Map<String, Long> subState = new HashMap<>(keys.size());
403 keys.forEach(key -> subState.put(key, 0L)); 403 keys.forEach(key -> subState.put(key, 0L));
@@ -422,12 +422,12 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -422,12 +422,12 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
422 @Override 422 @Override
423 public void onFailure(Throwable e) { 423 public void onFailure(Throwable e) {
424 log.error(FAILED_TO_FETCH_ATTRIBUTES, e); 424 log.error(FAILED_TO_FETCH_ATTRIBUTES, e);
425 - TsSubscriptionUpdate update; 425 + TelemetrySubscriptionUpdate update;
426 if (e instanceof UnauthorizedException) { 426 if (e instanceof UnauthorizedException) {
427 - update = new TsSubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED, 427 + update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
428 SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg()); 428 SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
429 } else { 429 } else {
430 - update = new TsSubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, 430 + update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
431 FAILED_TO_FETCH_ATTRIBUTES); 431 FAILED_TO_FETCH_ATTRIBUTES);
432 } 432 }
433 sendWsMsg(sessionRef, update); 433 sendWsMsg(sessionRef, update);
@@ -446,19 +446,19 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -446,19 +446,19 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
446 WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId); 446 WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId);
447 if (sessionMD == null) { 447 if (sessionMD == null) {
448 log.warn("[{}] Session meta data not found. ", sessionId); 448 log.warn("[{}] Session meta data not found. ", sessionId);
449 - TsSubscriptionUpdate update = new TsSubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, 449 + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
450 SESSION_META_DATA_NOT_FOUND); 450 SESSION_META_DATA_NOT_FOUND);
451 sendWsMsg(sessionRef, update); 451 sendWsMsg(sessionRef, update);
452 return; 452 return;
453 } 453 }
454 if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty() || cmd.getEntityType() == null || cmd.getEntityType().isEmpty()) { 454 if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty() || cmd.getEntityType() == null || cmd.getEntityType().isEmpty()) {
455 - TsSubscriptionUpdate update = new TsSubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, 455 + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
456 "Device id is empty!"); 456 "Device id is empty!");
457 sendWsMsg(sessionRef, update); 457 sendWsMsg(sessionRef, update);
458 return; 458 return;
459 } 459 }
460 if (cmd.getKeys() == null || cmd.getKeys().isEmpty()) { 460 if (cmd.getKeys() == null || cmd.getKeys().isEmpty()) {
461 - TsSubscriptionUpdate update = new TsSubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, 461 + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
462 "Keys are empty!"); 462 "Keys are empty!");
463 sendWsMsg(sessionRef, update); 463 sendWsMsg(sessionRef, update);
464 return; 464 return;
@@ -471,17 +471,17 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -471,17 +471,17 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
471 FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>() { 471 FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>() {
472 @Override 472 @Override
473 public void onSuccess(List<TsKvEntry> data) { 473 public void onSuccess(List<TsKvEntry> data) {
474 - sendWsMsg(sessionRef, new TsSubscriptionUpdate(cmd.getCmdId(), data)); 474 + sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));
475 } 475 }
476 476
477 @Override 477 @Override
478 public void onFailure(Throwable e) { 478 public void onFailure(Throwable e) {
479 - TsSubscriptionUpdate update; 479 + TelemetrySubscriptionUpdate update;
480 if (UnauthorizedException.class.isInstance(e)) { 480 if (UnauthorizedException.class.isInstance(e)) {
481 - update = new TsSubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED, 481 + update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
482 SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg()); 482 SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
483 } else { 483 } else {
484 - update = new TsSubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, 484 + update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
485 FAILED_TO_FETCH_DATA); 485 FAILED_TO_FETCH_DATA);
486 } 486 }
487 sendWsMsg(sessionRef, update); 487 sendWsMsg(sessionRef, update);
@@ -497,7 +497,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -497,7 +497,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
497 @Override 497 @Override
498 public void onSuccess(List<AttributeKvEntry> data) { 498 public void onSuccess(List<AttributeKvEntry> data) {
499 List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); 499 List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
500 - sendWsMsg(sessionRef, new TsSubscriptionUpdate(cmd.getCmdId(), attributesData)); 500 + sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData));
501 501
502 Map<String, Long> subState = new HashMap<>(attributesData.size()); 502 Map<String, Long> subState = new HashMap<>(attributesData.size());
503 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs())); 503 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
@@ -520,7 +520,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -520,7 +520,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
520 @Override 520 @Override
521 public void onFailure(Throwable e) { 521 public void onFailure(Throwable e) {
522 log.error(FAILED_TO_FETCH_ATTRIBUTES, e); 522 log.error(FAILED_TO_FETCH_ATTRIBUTES, e);
523 - TsSubscriptionUpdate update = new TsSubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, 523 + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
524 FAILED_TO_FETCH_ATTRIBUTES); 524 FAILED_TO_FETCH_ATTRIBUTES);
525 sendWsMsg(sessionRef, update); 525 sendWsMsg(sessionRef, update);
526 } 526 }
@@ -583,7 +583,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -583,7 +583,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
583 FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>() { 583 FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>() {
584 @Override 584 @Override
585 public void onSuccess(List<TsKvEntry> data) { 585 public void onSuccess(List<TsKvEntry> data) {
586 - sendWsMsg(sessionRef, new TsSubscriptionUpdate(cmd.getCmdId(), data)); 586 + sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));
587 Map<String, Long> subState = new HashMap<>(data.size()); 587 Map<String, Long> subState = new HashMap<>(data.size());
588 data.forEach(v -> subState.put(v.getKey(), v.getTs())); 588 data.forEach(v -> subState.put(v.getKey(), v.getTs()));
589 589
@@ -601,12 +601,12 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -601,12 +601,12 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
601 601
602 @Override 602 @Override
603 public void onFailure(Throwable e) { 603 public void onFailure(Throwable e) {
604 - TsSubscriptionUpdate update; 604 + TelemetrySubscriptionUpdate update;
605 if (UnauthorizedException.class.isInstance(e)) { 605 if (UnauthorizedException.class.isInstance(e)) {
606 - update = new TsSubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED, 606 + update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
607 SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg()); 607 SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
608 } else { 608 } else {
609 - update = new TsSubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, 609 + update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
610 FAILED_TO_FETCH_DATA); 610 FAILED_TO_FETCH_DATA);
611 } 611 }
612 sendWsMsg(sessionRef, update); 612 sendWsMsg(sessionRef, update);
@@ -620,7 +620,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -620,7 +620,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
620 return new FutureCallback<List<TsKvEntry>>() { 620 return new FutureCallback<List<TsKvEntry>>() {
621 @Override 621 @Override
622 public void onSuccess(List<TsKvEntry> data) { 622 public void onSuccess(List<TsKvEntry> data) {
623 - sendWsMsg(sessionRef, new TsSubscriptionUpdate(cmd.getCmdId(), data)); 623 + sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));
624 Map<String, Long> subState = new HashMap<>(keys.size()); 624 Map<String, Long> subState = new HashMap<>(keys.size());
625 keys.forEach(key -> subState.put(key, startTs)); 625 keys.forEach(key -> subState.put(key, startTs));
626 data.forEach(v -> subState.put(v.getKey(), v.getTs())); 626 data.forEach(v -> subState.put(v.getKey(), v.getTs()));
@@ -644,7 +644,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -644,7 +644,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
644 } else { 644 } else {
645 log.info(FAILED_TO_FETCH_DATA, e); 645 log.info(FAILED_TO_FETCH_DATA, e);
646 } 646 }
647 - TsSubscriptionUpdate update = new TsSubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, 647 + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
648 FAILED_TO_FETCH_DATA); 648 FAILED_TO_FETCH_DATA);
649 sendWsMsg(sessionRef, update); 649 sendWsMsg(sessionRef, update);
650 } 650 }
@@ -661,12 +661,12 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -661,12 +661,12 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
661 661
662 private boolean validateSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, EntityDataCmd cmd) { 662 private boolean validateSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, EntityDataCmd cmd) {
663 if (cmd.getCmdId() < 0) { 663 if (cmd.getCmdId() < 0) {
664 - TsSubscriptionUpdate update = new TsSubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, 664 + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
665 "Cmd id is negative value!"); 665 "Cmd id is negative value!");
666 sendWsMsg(sessionRef, update); 666 sendWsMsg(sessionRef, update);
667 return false; 667 return false;
668 } else if (cmd.getQuery() == null && cmd.getLatestCmd() == null && cmd.getHistoryCmd() == null && cmd.getTsCmd() == null) { 668 } else if (cmd.getQuery() == null && cmd.getLatestCmd() == null && cmd.getHistoryCmd() == null && cmd.getTsCmd() == null) {
669 - TsSubscriptionUpdate update = new TsSubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, 669 + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
670 "Query is empty!"); 670 "Query is empty!");
671 sendWsMsg(sessionRef, update); 671 sendWsMsg(sessionRef, update);
672 return false; 672 return false;
@@ -676,12 +676,12 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -676,12 +676,12 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
676 676
677 private boolean validateSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, AlarmDataCmd cmd) { 677 private boolean validateSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, AlarmDataCmd cmd) {
678 if (cmd.getCmdId() < 0) { 678 if (cmd.getCmdId() < 0) {
679 - TsSubscriptionUpdate update = new TsSubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, 679 + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
680 "Cmd id is negative value!"); 680 "Cmd id is negative value!");
681 sendWsMsg(sessionRef, update); 681 sendWsMsg(sessionRef, update);
682 return false; 682 return false;
683 } else if (cmd.getQuery() == null) { 683 } else if (cmd.getQuery() == null) {
684 - TsSubscriptionUpdate update = new TsSubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, 684 + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
685 "Query is empty!"); 685 "Query is empty!");
686 sendWsMsg(sessionRef, update); 686 sendWsMsg(sessionRef, update);
687 return false; 687 return false;
@@ -691,7 +691,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -691,7 +691,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
691 691
692 private boolean validateSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd) { 692 private boolean validateSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd) {
693 if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) { 693 if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
694 - TsSubscriptionUpdate update = new TsSubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, 694 + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
695 "Device id is empty!"); 695 "Device id is empty!");
696 sendWsMsg(sessionRef, update); 696 sendWsMsg(sessionRef, update);
697 return false; 697 return false;
@@ -707,7 +707,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -707,7 +707,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
707 WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId); 707 WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId);
708 if (sessionMD == null) { 708 if (sessionMD == null) {
709 log.warn("[{}] Session meta data not found. ", sessionId); 709 log.warn("[{}] Session meta data not found. ", sessionId);
710 - TsSubscriptionUpdate update = new TsSubscriptionUpdate(cmdId, SubscriptionErrorCode.INTERNAL_ERROR, 710 + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmdId, SubscriptionErrorCode.INTERNAL_ERROR,
711 SESSION_META_DATA_NOT_FOUND); 711 SESSION_META_DATA_NOT_FOUND);
712 sendWsMsg(sessionRef, update); 712 sendWsMsg(sessionRef, update);
713 return false; 713 return false;
@@ -720,7 +720,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -720,7 +720,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
720 sendWsMsg(sessionRef, update.getCmdId(), update); 720 sendWsMsg(sessionRef, update.getCmdId(), update);
721 } 721 }
722 722
723 - private void sendWsMsg(TelemetryWebSocketSessionRef sessionRef, TsSubscriptionUpdate update) { 723 + private void sendWsMsg(TelemetryWebSocketSessionRef sessionRef, TelemetrySubscriptionUpdate update) {
724 sendWsMsg(sessionRef, update.getSubscriptionId(), update); 724 sendWsMsg(sessionRef, update.getSubscriptionId(), update);
725 } 725 }
726 726
@@ -16,7 +16,7 @@ @@ -16,7 +16,7 @@
16 package org.thingsboard.server.service.telemetry; 16 package org.thingsboard.server.service.telemetry;
17 17
18 import org.thingsboard.server.service.telemetry.cmd.v2.DataUpdate; 18 import org.thingsboard.server.service.telemetry.cmd.v2.DataUpdate;
19 -import org.thingsboard.server.service.telemetry.sub.TsSubscriptionUpdate; 19 +import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
20 20
21 /** 21 /**
22 * Created by ashvayka on 27.03.18. 22 * Created by ashvayka on 27.03.18.
@@ -27,7 +27,7 @@ public interface TelemetryWebSocketService { @@ -27,7 +27,7 @@ public interface TelemetryWebSocketService {
27 27
28 void handleWebSocketMsg(TelemetryWebSocketSessionRef sessionRef, String msg); 28 void handleWebSocketMsg(TelemetryWebSocketSessionRef sessionRef, String msg);
29 29
30 - void sendWsMsg(String sessionId, TsSubscriptionUpdate update); 30 + void sendWsMsg(String sessionId, TelemetrySubscriptionUpdate update);
31 31
32 void sendWsMsg(String sessionId, DataUpdate update); 32 void sendWsMsg(String sessionId, DataUpdate update);
33 33
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.service.telemetry.sub; 16 package org.thingsboard.server.service.telemetry.sub;
17 17
  18 +import lombok.Getter;
18 import org.thingsboard.server.common.data.alarm.Alarm; 19 import org.thingsboard.server.common.data.alarm.Alarm;
19 import org.thingsboard.server.common.data.kv.TsKvEntry; 20 import org.thingsboard.server.common.data.kv.TsKvEntry;
20 import org.thingsboard.server.common.data.query.AlarmData; 21 import org.thingsboard.server.common.data.query.AlarmData;
@@ -28,15 +29,26 @@ import java.util.stream.Collectors; @@ -28,15 +29,26 @@ import java.util.stream.Collectors;
28 29
29 public class AlarmSubscriptionUpdate { 30 public class AlarmSubscriptionUpdate {
30 31
  32 + @Getter
31 private int subscriptionId; 33 private int subscriptionId;
  34 + @Getter
32 private int errorCode; 35 private int errorCode;
  36 + @Getter
33 private String errorMsg; 37 private String errorMsg;
  38 + @Getter
34 private Alarm alarm; 39 private Alarm alarm;
  40 + @Getter
  41 + private boolean alarmDeleted;
35 42
36 public AlarmSubscriptionUpdate(int subscriptionId, Alarm alarm) { 43 public AlarmSubscriptionUpdate(int subscriptionId, Alarm alarm) {
  44 + this(subscriptionId, alarm, false);
  45 + }
  46 +
  47 + public AlarmSubscriptionUpdate(int subscriptionId, Alarm alarm, boolean alarmDeleted) {
37 super(); 48 super();
38 this.subscriptionId = subscriptionId; 49 this.subscriptionId = subscriptionId;
39 this.alarm = alarm; 50 this.alarm = alarm;
  51 + this.alarmDeleted = alarmDeleted;
40 } 52 }
41 53
42 public AlarmSubscriptionUpdate(int subscriptionId, SubscriptionErrorCode errorCode) { 54 public AlarmSubscriptionUpdate(int subscriptionId, SubscriptionErrorCode errorCode) {
@@ -50,19 +62,6 @@ public class AlarmSubscriptionUpdate { @@ -50,19 +62,6 @@ public class AlarmSubscriptionUpdate {
50 this.errorMsg = errorMsg != null ? errorMsg : errorCode.getDefaultMsg(); 62 this.errorMsg = errorMsg != null ? errorMsg : errorCode.getDefaultMsg();
51 } 63 }
52 64
53 - public int getSubscriptionId() {  
54 - return subscriptionId;  
55 - }  
56 -  
57 -  
58 - public int getErrorCode() {  
59 - return errorCode;  
60 - }  
61 -  
62 - public String getErrorMsg() {  
63 - return errorMsg;  
64 - }  
65 -  
66 @Override 65 @Override
67 public String toString() { 66 public String toString() {
68 return "AlarmUpdate [subscriptionId=" + subscriptionId + ", errorCode=" + errorCode + ", errorMsg=" + errorMsg + ", alarm=" 67 return "AlarmUpdate [subscriptionId=" + subscriptionId + ", errorCode=" + errorCode + ", errorMsg=" + errorMsg + ", alarm="
application/src/main/java/org/thingsboard/server/service/telemetry/sub/TelemetrySubscriptionUpdate.java renamed from application/src/main/java/org/thingsboard/server/service/telemetry/sub/TsSubscriptionUpdate.java
@@ -24,14 +24,14 @@ import java.util.Map; @@ -24,14 +24,14 @@ import java.util.Map;
24 import java.util.TreeMap; 24 import java.util.TreeMap;
25 import java.util.stream.Collectors; 25 import java.util.stream.Collectors;
26 26
27 -public class TsSubscriptionUpdate { 27 +public class TelemetrySubscriptionUpdate {
28 28
29 private int subscriptionId; 29 private int subscriptionId;
30 private int errorCode; 30 private int errorCode;
31 private String errorMsg; 31 private String errorMsg;
32 private Map<String, List<Object>> data; 32 private Map<String, List<Object>> data;
33 33
34 - public TsSubscriptionUpdate(int subscriptionId, List<TsKvEntry> data) { 34 + public TelemetrySubscriptionUpdate(int subscriptionId, List<TsKvEntry> data) {
35 super(); 35 super();
36 this.subscriptionId = subscriptionId; 36 this.subscriptionId = subscriptionId;
37 this.data = new TreeMap<>(); 37 this.data = new TreeMap<>();
@@ -46,17 +46,17 @@ public class TsSubscriptionUpdate { @@ -46,17 +46,17 @@ public class TsSubscriptionUpdate {
46 } 46 }
47 } 47 }
48 48
49 - public TsSubscriptionUpdate(int subscriptionId, Map<String, List<Object>> data) { 49 + public TelemetrySubscriptionUpdate(int subscriptionId, Map<String, List<Object>> data) {
50 super(); 50 super();
51 this.subscriptionId = subscriptionId; 51 this.subscriptionId = subscriptionId;
52 this.data = data; 52 this.data = data;
53 } 53 }
54 54
55 - public TsSubscriptionUpdate(int subscriptionId, SubscriptionErrorCode errorCode) { 55 + public TelemetrySubscriptionUpdate(int subscriptionId, SubscriptionErrorCode errorCode) {
56 this(subscriptionId, errorCode, null); 56 this(subscriptionId, errorCode, null);
57 } 57 }
58 58
59 - public TsSubscriptionUpdate(int subscriptionId, SubscriptionErrorCode errorCode, String errorMsg) { 59 + public TelemetrySubscriptionUpdate(int subscriptionId, SubscriptionErrorCode errorCode, String errorMsg) {
60 super(); 60 super();
61 this.subscriptionId = subscriptionId; 61 this.subscriptionId = subscriptionId;
62 this.errorCode = errorCode.getCode(); 62 this.errorCode = errorCode.getCode();
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 package org.thingsboard.server.dao.alarm; 16 package org.thingsboard.server.dao.alarm;
2 17
  18 +import lombok.AllArgsConstructor;
3 import lombok.Data; 19 import lombok.Data;
4 import org.thingsboard.server.common.data.alarm.Alarm; 20 import org.thingsboard.server.common.data.alarm.Alarm;
5 import org.thingsboard.server.common.data.id.EntityId; 21 import org.thingsboard.server.common.data.id.EntityId;
6 22
  23 +import java.util.Collections;
7 import java.util.List; 24 import java.util.List;
8 25
9 @Data 26 @Data
@@ -11,4 +28,16 @@ public class AlarmOperationResult { @@ -11,4 +28,16 @@ public class AlarmOperationResult {
11 private final Alarm alarm; 28 private final Alarm alarm;
12 private final boolean successful; 29 private final boolean successful;
13 private final List<EntityId> propagatedEntitiesList; 30 private final List<EntityId> propagatedEntitiesList;
  31 +
  32 + public AlarmOperationResult(Alarm alarm, boolean successful) {
  33 + this.alarm = alarm;
  34 + this.successful = successful;
  35 + this.propagatedEntitiesList = Collections.emptyList();
  36 + }
  37 +
  38 + public AlarmOperationResult(Alarm alarm, boolean successful, List<EntityId> propagatedEntitiesList) {
  39 + this.alarm = alarm;
  40 + this.successful = successful;
  41 + this.propagatedEntitiesList = propagatedEntitiesList;
  42 + }
14 } 43 }
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -140,15 +140,17 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ @@ -140,15 +140,17 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
140 } 140 }
141 141
142 @Override 142 @Override
143 - public Boolean deleteAlarm(TenantId tenantId, AlarmId alarmId) { 143 + public AlarmOperationResult deleteAlarm(TenantId tenantId, AlarmId alarmId) {
144 try { 144 try {
145 log.debug("Deleting Alarm Id: {}", alarmId); 145 log.debug("Deleting Alarm Id: {}", alarmId);
146 Alarm alarm = alarmDao.findAlarmByIdAsync(tenantId, alarmId.getId()).get(); 146 Alarm alarm = alarmDao.findAlarmByIdAsync(tenantId, alarmId.getId()).get();
147 if (alarm == null) { 147 if (alarm == null) {
148 - return false; 148 + return new AlarmOperationResult(alarm, false);
149 } 149 }
  150 + AlarmOperationResult result = new AlarmOperationResult(alarm, true, new ArrayList<>(getPropagationEntityIds(alarm)));
150 deleteEntityRelations(tenantId, alarm.getId()); 151 deleteEntityRelations(tenantId, alarm.getId());
151 - return alarmDao.deleteAlarm(tenantId, alarm); 152 + alarmDao.deleteAlarm(tenantId, alarm);
  153 + return result;
152 } catch (ExecutionException | InterruptedException e) { 154 } catch (ExecutionException | InterruptedException e) {
153 throw new RuntimeException(e); 155 throw new RuntimeException(e);
154 } 156 }
@@ -158,7 +160,7 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ @@ -158,7 +160,7 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
158 log.debug("New Alarm : {}", alarm); 160 log.debug("New Alarm : {}", alarm);
159 Alarm saved = alarmDao.save(alarm.getTenantId(), alarm); 161 Alarm saved = alarmDao.save(alarm.getTenantId(), alarm);
160 List<EntityId> propagatedEntitiesList = createAlarmRelations(saved); 162 List<EntityId> propagatedEntitiesList = createAlarmRelations(saved);
161 - return new AlarmOperationResult(alarm, true, propagatedEntitiesList); 163 + return new AlarmOperationResult(saved, true, propagatedEntitiesList);
162 } 164 }
163 165
164 private List<EntityId> createAlarmRelations(Alarm alarm) throws InterruptedException, ExecutionException { 166 private List<EntityId> createAlarmRelations(Alarm alarm) throws InterruptedException, ExecutionException {
@@ -168,13 +170,13 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ @@ -168,13 +170,13 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
168 propagatedEntitiesList = new ArrayList<>(parentEntities.size() + 1); 170 propagatedEntitiesList = new ArrayList<>(parentEntities.size() + 1);
169 for (EntityId parentId : parentEntities) { 171 for (EntityId parentId : parentEntities) {
170 propagatedEntitiesList.add(parentId); 172 propagatedEntitiesList.add(parentId);
171 - createAlarmRelation(alarm.getTenantId(), parentId, alarm.getId(), alarm.getStatus(), true); 173 + createAlarmRelation(alarm.getTenantId(), parentId, alarm.getId());
172 } 174 }
173 propagatedEntitiesList.add(alarm.getOriginator()); 175 propagatedEntitiesList.add(alarm.getOriginator());
174 } else { 176 } else {
175 propagatedEntitiesList = Collections.singletonList(alarm.getOriginator()); 177 propagatedEntitiesList = Collections.singletonList(alarm.getOriginator());
176 } 178 }
177 - createAlarmRelation(alarm.getTenantId(), alarm.getOriginator(), alarm.getId(), alarm.getStatus(), true); 179 + createAlarmRelation(alarm.getTenantId(), alarm.getOriginator(), alarm.getId());
178 return propagatedEntitiesList; 180 return propagatedEntitiesList;
179 } 181 }
180 182
@@ -225,34 +227,33 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ @@ -225,34 +227,33 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
225 } 227 }
226 228
227 @Override 229 @Override
228 - public ListenableFuture<Boolean> ackAlarm(TenantId tenantId, AlarmId alarmId, long ackTime) {  
229 - return getAndUpdate(tenantId, alarmId, new Function<Alarm, Boolean>() { 230 + public ListenableFuture<AlarmOperationResult> ackAlarm(TenantId tenantId, AlarmId alarmId, long ackTime) {
  231 + return getAndUpdate(tenantId, alarmId, new Function<Alarm, AlarmOperationResult>() {
230 @Nullable 232 @Nullable
231 @Override 233 @Override
232 - public Boolean apply(@Nullable Alarm alarm) { 234 + public AlarmOperationResult apply(@Nullable Alarm alarm) {
233 if (alarm == null || alarm.getStatus().isAck()) { 235 if (alarm == null || alarm.getStatus().isAck()) {
234 - return false; 236 + return new AlarmOperationResult(alarm, false);
235 } else { 237 } else {
236 AlarmStatus oldStatus = alarm.getStatus(); 238 AlarmStatus oldStatus = alarm.getStatus();
237 AlarmStatus newStatus = oldStatus.isCleared() ? AlarmStatus.CLEARED_ACK : AlarmStatus.ACTIVE_ACK; 239 AlarmStatus newStatus = oldStatus.isCleared() ? AlarmStatus.CLEARED_ACK : AlarmStatus.ACTIVE_ACK;
238 alarm.setStatus(newStatus); 240 alarm.setStatus(newStatus);
239 alarm.setAckTs(ackTime); 241 alarm.setAckTs(ackTime);
240 - alarmDao.save(alarm.getTenantId(), alarm);  
241 - updateRelations(alarm, oldStatus, newStatus);  
242 - return true; 242 + alarm = alarmDao.save(alarm.getTenantId(), alarm);
  243 + return new AlarmOperationResult(alarm, true, new ArrayList<>(getPropagationEntityIds(alarm)));
243 } 244 }
244 } 245 }
245 }); 246 });
246 } 247 }
247 248
248 @Override 249 @Override
249 - public ListenableFuture<Boolean> clearAlarm(TenantId tenantId, AlarmId alarmId, JsonNode details, long clearTime) {  
250 - return getAndUpdate(tenantId, alarmId, new Function<Alarm, Boolean>() { 250 + public ListenableFuture<AlarmOperationResult> clearAlarm(TenantId tenantId, AlarmId alarmId, JsonNode details, long clearTime) {
  251 + return getAndUpdate(tenantId, alarmId, new Function<Alarm, AlarmOperationResult>() {
251 @Nullable 252 @Nullable
252 @Override 253 @Override
253 - public Boolean apply(@Nullable Alarm alarm) { 254 + public AlarmOperationResult apply(@Nullable Alarm alarm) {
254 if (alarm == null || alarm.getStatus().isCleared()) { 255 if (alarm == null || alarm.getStatus().isCleared()) {
255 - return false; 256 + return new AlarmOperationResult(alarm, false);
256 } else { 257 } else {
257 AlarmStatus oldStatus = alarm.getStatus(); 258 AlarmStatus oldStatus = alarm.getStatus();
258 AlarmStatus newStatus = oldStatus.isAck() ? AlarmStatus.CLEARED_ACK : AlarmStatus.CLEARED_UNACK; 259 AlarmStatus newStatus = oldStatus.isAck() ? AlarmStatus.CLEARED_ACK : AlarmStatus.CLEARED_UNACK;
@@ -261,9 +262,8 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ @@ -261,9 +262,8 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
261 if (details != null) { 262 if (details != null) {
262 alarm.setDetails(details); 263 alarm.setDetails(details);
263 } 264 }
264 - alarmDao.save(alarm.getTenantId(), alarm);  
265 - updateRelations(alarm, oldStatus, newStatus);  
266 - return true; 265 + alarm = alarmDao.save(alarm.getTenantId(), alarm);
  266 + return new AlarmOperationResult(alarm, true, new ArrayList<>(getPropagationEntityIds(alarm)));
267 } 267 }
268 } 268 }
269 }); 269 });
@@ -102,15 +102,17 @@ public class JpaAlarmDao extends JpaAbstractDao<AlarmEntity, Alarm> implements A @@ -102,15 +102,17 @@ public class JpaAlarmDao extends JpaAbstractDao<AlarmEntity, Alarm> implements A
102 public PageData<AlarmInfo> findAlarms(TenantId tenantId, AlarmQuery query) { 102 public PageData<AlarmInfo> findAlarms(TenantId tenantId, AlarmQuery query) {
103 log.trace("Try to find alarms by entity [{}], status [{}] and pageLink [{}]", query.getAffectedEntityId(), query.getStatus(), query.getPageLink()); 103 log.trace("Try to find alarms by entity [{}], status [{}] and pageLink [{}]", query.getAffectedEntityId(), query.getStatus(), query.getPageLink());
104 EntityId affectedEntity = query.getAffectedEntityId(); 104 EntityId affectedEntity = query.getAffectedEntityId();
105 - String searchStatusName;  
106 - if (query.getSearchStatus() == null && query.getStatus() == null) {  
107 - searchStatusName = AlarmSearchStatus.ANY.name();  
108 - } else if (query.getSearchStatus() != null) {  
109 - searchStatusName = query.getSearchStatus().name();  
110 - } else {  
111 - searchStatusName = query.getStatus().name();  
112 - }  
113 - String relationType = BaseAlarmService.ALARM_RELATION_PREFIX + searchStatusName; 105 +
  106 + //TODO 3.1: add search by statuses
  107 +// String searchStatusName;
  108 +// if (query.getSearchStatus() == null && query.getStatus() == null) {
  109 +// searchStatusName = AlarmSearchStatus.ANY.name();
  110 +// } else if (query.getSearchStatus() != null) {
  111 +// searchStatusName = query.getSearchStatus().name();
  112 +// } else {
  113 +// searchStatusName = query.getStatus().name();
  114 +// }
  115 +// String relationType = BaseAlarmService.ALARM_RELATION_PREFIX;
114 116
115 return DaoUtil.toPageData( 117 return DaoUtil.toPageData(
116 alarmRepository.findAlarms( 118 alarmRepository.findAlarms(
@@ -64,9 +64,9 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository { @@ -64,9 +64,9 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository {
64 } 64 }
65 65
66 public static final String SELECT_ORIGINATOR_NAME = " CASE" + 66 public static final String SELECT_ORIGINATOR_NAME = " CASE" +
67 - " WHEN a.originator_type = "+ EntityType.TENANT.ordinal() + 67 + " WHEN a.originator_type = " + EntityType.TENANT.ordinal() +
68 " THEN (select title from tenant where id = a.originator_id)" + 68 " THEN (select title from tenant where id = a.originator_id)" +
69 - " WHEN a.originator_type = "+ EntityType.CUSTOMER.ordinal() + 69 + " WHEN a.originator_type = " + EntityType.CUSTOMER.ordinal() +
70 " THEN (select title from customer where id = a.originator_id)" + 70 " THEN (select title from customer where id = a.originator_id)" +
71 " WHEN a.originator_type = " + EntityType.USER.ordinal() + 71 " WHEN a.originator_type = " + EntityType.USER.ordinal() +
72 " THEN (select CONCAT (first_name, ' ', last_name) from tb_user where id = a.originator_id)" + 72 " THEN (select CONCAT (first_name, ' ', last_name) from tb_user where id = a.originator_id)" +
@@ -156,17 +156,27 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository { @@ -156,17 +156,27 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository {
156 sortPart.append("e.priority"); 156 sortPart.append("e.priority");
157 } 157 }
158 158
159 - if (pageLink.getStartTs() > 0) { 159 + long startTs;
  160 + long endTs;
  161 + if (pageLink.getTimeWindow() > 0) {
  162 + endTs = System.currentTimeMillis();
  163 + startTs = endTs - pageLink.getTimeWindow();
  164 + } else {
  165 + startTs = pageLink.getStartTs();
  166 + endTs = pageLink.getEndTs();
  167 + }
  168 +
  169 + if (startTs > 0) {
160 addAndIfNeeded(wherePart, addAnd); 170 addAndIfNeeded(wherePart, addAnd);
161 addAnd = true; 171 addAnd = true;
162 - ctx.addLongParameter("startTime", pageLink.getStartTs()); 172 + ctx.addLongParameter("startTime", startTs);
163 wherePart.append("a.created_time >= :startTime"); 173 wherePart.append("a.created_time >= :startTime");
164 } 174 }
165 175
166 - if (pageLink.getEndTs() > 0) { 176 + if (endTs > 0) {
167 addAndIfNeeded(wherePart, addAnd); 177 addAndIfNeeded(wherePart, addAnd);
168 addAnd = true; 178 addAnd = true;
169 - ctx.addLongParameter("endTime", pageLink.getEndTs()); 179 + ctx.addLongParameter("endTime", endTs);
170 wherePart.append("a.created_time <= :endTime"); 180 wherePart.append("a.created_time <= :endTime");
171 } 181 }
172 182
@@ -24,7 +24,7 @@ import java.util.Arrays; @@ -24,7 +24,7 @@ import java.util.Arrays;
24 24
25 @RunWith(ClasspathSuite.class) 25 @RunWith(ClasspathSuite.class)
26 @ClassnameFilters({ 26 @ClassnameFilters({
27 - "org.thingsboard.server.dao.service.sql.*SqlTest" 27 + "org.thingsboard.server.dao.service.sql.AlarmServiceSqlTest"
28 }) 28 })
29 public class SqlDaoServiceTestSuite { 29 public class SqlDaoServiceTestSuite {
30 30
@@ -43,6 +43,7 @@ import org.thingsboard.server.common.data.query.EntityKey; @@ -43,6 +43,7 @@ import org.thingsboard.server.common.data.query.EntityKey;
43 import org.thingsboard.server.common.data.query.EntityKeyType; 43 import org.thingsboard.server.common.data.query.EntityKeyType;
44 import org.thingsboard.server.common.data.relation.EntityRelation; 44 import org.thingsboard.server.common.data.relation.EntityRelation;
45 import org.thingsboard.server.common.data.relation.RelationTypeGroup; 45 import org.thingsboard.server.common.data.relation.RelationTypeGroup;
  46 +import org.thingsboard.server.dao.alarm.AlarmOperationResult;
46 47
47 import java.util.Arrays; 48 import java.util.Arrays;
48 import java.util.Collections; 49 import java.util.Collections;
@@ -84,7 +85,8 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest { @@ -84,7 +85,8 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest {
84 .severity(AlarmSeverity.CRITICAL).status(AlarmStatus.ACTIVE_UNACK) 85 .severity(AlarmSeverity.CRITICAL).status(AlarmStatus.ACTIVE_UNACK)
85 .startTs(ts).build(); 86 .startTs(ts).build();
86 87
87 - Alarm created = alarmService.createOrUpdateAlarm(alarm); 88 + AlarmOperationResult result = alarmService.createOrUpdateAlarm(alarm);
  89 + Alarm created = result.getAlarm();
88 90
89 Assert.assertNotNull(created); 91 Assert.assertNotNull(created);
90 Assert.assertNotNull(created.getId()); 92 Assert.assertNotNull(created.getId());
@@ -122,7 +124,8 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest { @@ -122,7 +124,8 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest {
122 .severity(AlarmSeverity.CRITICAL).status(AlarmStatus.ACTIVE_UNACK) 124 .severity(AlarmSeverity.CRITICAL).status(AlarmStatus.ACTIVE_UNACK)
123 .startTs(ts).build(); 125 .startTs(ts).build();
124 126
125 - Alarm created = alarmService.createOrUpdateAlarm(alarm); 127 + AlarmOperationResult result = alarmService.createOrUpdateAlarm(alarm);
  128 + Alarm created = result.getAlarm();
126 129
127 // Check child relation 130 // Check child relation
128 PageData<AlarmInfo> alarms = alarmService.findAlarms(tenantId, AlarmQuery.builder() 131 PageData<AlarmInfo> alarms = alarmService.findAlarms(tenantId, AlarmQuery.builder()
@@ -146,7 +149,8 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest { @@ -146,7 +149,8 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest {
146 Assert.assertEquals(0, alarms.getData().size()); 149 Assert.assertEquals(0, alarms.getData().size());
147 150
148 created.setPropagate(true); 151 created.setPropagate(true);
149 - created = alarmService.createOrUpdateAlarm(created); 152 + result = alarmService.createOrUpdateAlarm(created);
  153 + created = result.getAlarm();
150 154
151 // Check child relation 155 // Check child relation
152 alarms = alarmService.findAlarms(tenantId, AlarmQuery.builder() 156 alarms = alarmService.findAlarms(tenantId, AlarmQuery.builder()
@@ -234,7 +238,8 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest { @@ -234,7 +238,8 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest {
234 .propagate(true) 238 .propagate(true)
235 .severity(AlarmSeverity.CRITICAL).status(AlarmStatus.ACTIVE_UNACK) 239 .severity(AlarmSeverity.CRITICAL).status(AlarmStatus.ACTIVE_UNACK)
236 .startTs(ts).build(); 240 .startTs(ts).build();
237 - tenantAlarm = alarmService.createOrUpdateAlarm(tenantAlarm); 241 + AlarmOperationResult result = alarmService.createOrUpdateAlarm(tenantAlarm);
  242 + tenantAlarm = result.getAlarm();
238 243
239 Alarm deviceAlarm = Alarm.builder().tenantId(tenantId) 244 Alarm deviceAlarm = Alarm.builder().tenantId(tenantId)
240 .originator(customerDevice.getId()) 245 .originator(customerDevice.getId())
@@ -242,7 +247,8 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest { @@ -242,7 +247,8 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest {
242 .propagate(true) 247 .propagate(true)
243 .severity(AlarmSeverity.CRITICAL).status(AlarmStatus.ACTIVE_UNACK) 248 .severity(AlarmSeverity.CRITICAL).status(AlarmStatus.ACTIVE_UNACK)
244 .startTs(ts).build(); 249 .startTs(ts).build();
245 - deviceAlarm = alarmService.createOrUpdateAlarm(deviceAlarm); 250 + result = alarmService.createOrUpdateAlarm(deviceAlarm);
  251 + deviceAlarm = result.getAlarm();
246 252
247 AlarmDataPageLink pageLink = new AlarmDataPageLink(); 253 AlarmDataPageLink pageLink = new AlarmDataPageLink();
248 pageLink.setPage(0); 254 pageLink.setPage(0);
@@ -281,7 +287,8 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest { @@ -281,7 +287,8 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest {
281 .status(AlarmStatus.ACTIVE_UNACK) 287 .status(AlarmStatus.ACTIVE_UNACK)
282 .startTs(ts).build(); 288 .startTs(ts).build();
283 289
284 - Alarm created = alarmService.createOrUpdateAlarm(alarm); 290 + AlarmOperationResult result = alarmService.createOrUpdateAlarm(alarm);
  291 + Alarm created = result.getAlarm();
285 292
286 AlarmDataPageLink pageLink = new AlarmDataPageLink(); 293 AlarmDataPageLink pageLink = new AlarmDataPageLink();
287 pageLink.setPage(0); 294 pageLink.setPage(0);
@@ -321,7 +328,8 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest { @@ -321,7 +328,8 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest {
321 Assert.assertEquals(created, new Alarm(alarms.getData().get(0))); 328 Assert.assertEquals(created, new Alarm(alarms.getData().get(0)));
322 329
323 created.setPropagate(true); 330 created.setPropagate(true);
324 - created = alarmService.createOrUpdateAlarm(created); 331 + result = alarmService.createOrUpdateAlarm(created);
  332 + created = result.getAlarm();
325 333
326 // Check child relation 334 // Check child relation
327 pageLink.setPage(0); 335 pageLink.setPage(0);
@@ -402,7 +410,8 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest { @@ -402,7 +410,8 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest {
402 .severity(AlarmSeverity.CRITICAL).status(AlarmStatus.ACTIVE_UNACK) 410 .severity(AlarmSeverity.CRITICAL).status(AlarmStatus.ACTIVE_UNACK)
403 .startTs(ts).build(); 411 .startTs(ts).build();
404 412
405 - Alarm created = alarmService.createOrUpdateAlarm(alarm); 413 + AlarmOperationResult result = alarmService.createOrUpdateAlarm(alarm);
  414 + Alarm created = result.getAlarm();
406 415
407 PageData<AlarmInfo> alarms = alarmService.findAlarms(tenantId, AlarmQuery.builder() 416 PageData<AlarmInfo> alarms = alarmService.findAlarms(tenantId, AlarmQuery.builder()
408 .affectedEntityId(childId) 417 .affectedEntityId(childId)
@@ -426,16 +435,16 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest { @@ -426,16 +435,16 @@ public abstract class BaseAlarmServiceTest extends AbstractServiceTest {
426 Assert.assertEquals(created, alarms.getData().get(0)); 435 Assert.assertEquals(created, alarms.getData().get(0));
427 436
428 List<EntityRelation> toAlarmRelations = relationService.findByTo(tenantId, created.getId(), RelationTypeGroup.ALARM); 437 List<EntityRelation> toAlarmRelations = relationService.findByTo(tenantId, created.getId(), RelationTypeGroup.ALARM);
429 - Assert.assertEquals(8, toAlarmRelations.size()); 438 + Assert.assertEquals(2, toAlarmRelations.size());
430 439
431 List<EntityRelation> fromChildRelations = relationService.findByFrom(tenantId, childId, RelationTypeGroup.ALARM); 440 List<EntityRelation> fromChildRelations = relationService.findByFrom(tenantId, childId, RelationTypeGroup.ALARM);
432 - Assert.assertEquals(4, fromChildRelations.size()); 441 + Assert.assertEquals(1, fromChildRelations.size());
433 442
434 - List<EntityRelation> fromParentRelations = relationService.findByFrom(tenantId, childId, RelationTypeGroup.ALARM);  
435 - Assert.assertEquals(4, fromParentRelations.size()); 443 + List<EntityRelation> fromParentRelations = relationService.findByFrom(tenantId, parentId, RelationTypeGroup.ALARM);
  444 + Assert.assertEquals(1, fromParentRelations.size());
436 445
437 446
438 - Assert.assertTrue("Alarm was not deleted when expected", alarmService.deleteAlarm(tenantId, created.getId())); 447 + Assert.assertTrue("Alarm was not deleted when expected", alarmService.deleteAlarm(tenantId, created.getId()).isSuccessful());
439 448
440 Alarm fetched = alarmService.findAlarmByIdAsync(tenantId, created.getId()).get(); 449 Alarm fetched = alarmService.findAlarmByIdAsync(tenantId, created.getId()).get();
441 450
@@ -54,4 +54,11 @@ public interface RuleEngineAlarmService { @@ -54,4 +54,11 @@ public interface RuleEngineAlarmService {
54 54
55 ListenableFuture<Alarm> findLatestByOriginatorAndType(TenantId tenantId, EntityId originator, String type); 55 ListenableFuture<Alarm> findLatestByOriginatorAndType(TenantId tenantId, EntityId originator, String type);
56 56
  57 + ListenableFuture<AlarmInfo> findAlarmInfoByIdAsync(TenantId tenantId, AlarmId alarmId);
  58 +
  59 + ListenableFuture<PageData<AlarmInfo>> findAlarms(TenantId tenantId, AlarmQuery query);
  60 +
  61 + AlarmSeverity findHighestAlarmSeverity(TenantId tenantId, EntityId entityId, AlarmSearchStatus alarmSearchStatus, AlarmStatus alarmStatus);
  62 +
  63 + PageData<AlarmData> findAlarmDataByQueryForEntities(TenantId tenantId, CustomerId customerId, AlarmDataPageLink pageLink, Collection<EntityId> orderedEntityIds);
57 } 64 }