Commit ce0bebf9e8f7760193e0e3587f30093b38a25d30

Authored by Andrii Shvaika
1 parent c4a62703

Cluster mode for Alarms API

@@ -21,17 +21,22 @@ import org.springframework.scheduling.annotation.Scheduled; @@ -21,17 +21,22 @@ import org.springframework.scheduling.annotation.Scheduled;
21 import org.springframework.stereotype.Service; 21 import org.springframework.stereotype.Service;
22 import org.thingsboard.rule.engine.api.RpcError; 22 import org.thingsboard.rule.engine.api.RpcError;
23 import org.thingsboard.server.actors.ActorSystemContext; 23 import org.thingsboard.server.actors.ActorSystemContext;
  24 +import org.thingsboard.server.common.data.alarm.Alarm;
24 import org.thingsboard.server.common.data.id.TenantId; 25 import org.thingsboard.server.common.data.id.TenantId;
25 import org.thingsboard.server.common.msg.MsgType; 26 import org.thingsboard.server.common.msg.MsgType;
26 import org.thingsboard.server.common.msg.TbActorMsg; 27 import org.thingsboard.server.common.msg.TbActorMsg;
27 import org.thingsboard.server.common.msg.queue.ServiceType; 28 import org.thingsboard.server.common.msg.queue.ServiceType;
28 import org.thingsboard.server.common.msg.queue.TbCallback; 29 import org.thingsboard.server.common.msg.queue.TbCallback;
  30 +import org.thingsboard.server.dao.util.mapping.JacksonUtil;
  31 +import org.thingsboard.server.gen.transport.TransportProtos;
29 import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto; 32 import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto;
30 import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; 33 import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto;
31 import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto; 34 import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto;
32 import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto; 35 import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto;
33 import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto; 36 import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto;
34 import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto; 37 import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto;
  38 +import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto;
  39 +import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto;
35 import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto; 40 import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto;
36 import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto; 41 import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto;
37 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 42 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
@@ -236,6 +241,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore @@ -236,6 +241,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
236 private void forwardToLocalSubMgrService(LocalSubscriptionServiceMsgProto msg, TbCallback callback) { 241 private void forwardToLocalSubMgrService(LocalSubscriptionServiceMsgProto msg, TbCallback callback) {
237 if (msg.hasSubUpdate()) { 242 if (msg.hasSubUpdate()) {
238 localSubscriptionService.onSubscriptionUpdate(msg.getSubUpdate().getSessionId(), TbSubscriptionUtils.fromProto(msg.getSubUpdate()), callback); 243 localSubscriptionService.onSubscriptionUpdate(msg.getSubUpdate().getSessionId(), TbSubscriptionUtils.fromProto(msg.getSubUpdate()), callback);
  244 + } else if (msg.hasAlarmSubUpdate()) {
  245 + localSubscriptionService.onSubscriptionUpdate(msg.getAlarmSubUpdate().getSessionId(), TbSubscriptionUtils.fromProto(msg.getAlarmSubUpdate()), callback);
239 } else { 246 } else {
240 throwNotHandled(msg, callback); 247 throwNotHandled(msg, callback);
241 } 248 }
@@ -246,6 +253,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore @@ -246,6 +253,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
246 subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getAttributeSub()), callback); 253 subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getAttributeSub()), callback);
247 } else if (msg.hasTelemetrySub()) { 254 } else if (msg.hasTelemetrySub()) {
248 subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getTelemetrySub()), callback); 255 subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getTelemetrySub()), callback);
  256 + } else if (msg.hasAlarmSub()) {
  257 + subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getAlarmSub()), callback);
249 } else if (msg.hasSubClose()) { 258 } else if (msg.hasSubClose()) {
250 TbSubscriptionCloseProto closeProto = msg.getSubClose(); 259 TbSubscriptionCloseProto closeProto = msg.getSubClose();
251 subscriptionManagerService.cancelSubscription(closeProto.getSessionId(), closeProto.getSubscriptionId(), callback); 260 subscriptionManagerService.cancelSubscription(closeProto.getSessionId(), closeProto.getSubscriptionId(), callback);
@@ -267,6 +276,18 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore @@ -267,6 +276,18 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
267 new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), 276 new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
268 TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()), 277 TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
269 proto.getScope(), proto.getKeysList(), callback); 278 proto.getScope(), proto.getKeysList(), callback);
  279 + } else if (msg.hasAlarmUpdate()) {
  280 + TbAlarmUpdateProto proto = msg.getAlarmUpdate();
  281 + subscriptionManagerService.onAlarmUpdate(
  282 + new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
  283 + TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
  284 + JacksonUtil.fromString(proto.getAlarm(), Alarm.class), callback);
  285 + } else if (msg.hasAlarmDelete()) {
  286 + TbAlarmDeleteProto proto = msg.getAlarmDelete();
  287 + subscriptionManagerService.onAlarmDeleted(
  288 + new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
  289 + TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
  290 + JacksonUtil.fromString(proto.getAlarm(), Alarm.class), callback);
270 } else { 291 } else {
271 throwNotHandled(msg, callback); 292 throwNotHandled(msg, callback);
272 } 293 }
@@ -39,6 +39,7 @@ import org.thingsboard.server.common.msg.queue.TbCallback; @@ -39,6 +39,7 @@ import org.thingsboard.server.common.msg.queue.TbCallback;
39 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; 39 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
40 import org.thingsboard.server.dao.attributes.AttributesService; 40 import org.thingsboard.server.dao.attributes.AttributesService;
41 import org.thingsboard.server.dao.timeseries.TimeseriesService; 41 import org.thingsboard.server.dao.timeseries.TimeseriesService;
  42 +import org.thingsboard.server.dao.util.mapping.JacksonUtil;
42 import org.thingsboard.server.gen.transport.TransportProtos.*; 43 import org.thingsboard.server.gen.transport.TransportProtos.*;
43 import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto; 44 import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto;
44 import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateProto; 45 import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateProto;
@@ -189,7 +190,11 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer @@ -189,7 +190,11 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
189 removedPartitions.forEach(partition -> { 190 removedPartitions.forEach(partition -> {
190 Set<TbSubscription> subs = partitionedSubscriptions.remove(partition); 191 Set<TbSubscription> subs = partitionedSubscriptions.remove(partition);
191 if (subs != null) { 192 if (subs != null) {
192 - subs.forEach(this::removeSubscriptionFromEntityMap); 193 + subs.forEach(sub -> {
  194 + if (!serviceId.equals(sub.getServiceId())) {
  195 + removeSubscriptionFromEntityMap(sub);
  196 + }
  197 + });
193 } 198 }
194 }); 199 });
195 } 200 }
@@ -355,7 +360,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer @@ -355,7 +360,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
355 localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbCallback.EMPTY); 360 localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbCallback.EMPTY);
356 } else { 361 } else {
357 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId()); 362 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId());
358 - toCoreNotificationsProducer.send(tpi, toProto(s, alarm), null); 363 + toCoreNotificationsProducer.send(tpi, toProto(s, alarm, deleted), null);
359 } 364 }
360 } 365 }
361 }); 366 });
@@ -471,19 +476,19 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer @@ -471,19 +476,19 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
471 return new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg); 476 return new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg);
472 } 477 }
473 478
474 - private TbProtoQueueMsg<ToCoreNotificationMsg> toProto(TbSubscription subscription, Alarm alarm) {  
475 - TbSubscriptionUpdateProto.Builder builder = TbSubscriptionUpdateProto.newBuilder(); 479 + private TbProtoQueueMsg<ToCoreNotificationMsg> toProto(TbSubscription subscription, Alarm alarm, boolean deleted) {
  480 + TbAlarmSubscriptionUpdateProto.Builder builder = TbAlarmSubscriptionUpdateProto.newBuilder();
476 481
477 builder.setSessionId(subscription.getSessionId()); 482 builder.setSessionId(subscription.getSessionId());
478 builder.setSubscriptionId(subscription.getSubscriptionId()); 483 builder.setSubscriptionId(subscription.getSubscriptionId());
  484 + builder.setAlarm(JacksonUtil.toString(alarm));
  485 + builder.setDeleted(deleted);
479 486
480 - //TODO 3.1  
481 - throw new RuntimeException("Not implemented!");  
482 -//  
483 -// ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToLocalSubscriptionServiceMsg(  
484 -// LocalSubscriptionServiceMsgProto.newBuilder().setSubUpdate(builder.build()).build())  
485 -// .build();  
486 -// return new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg); 487 + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToLocalSubscriptionServiceMsg(
  488 + LocalSubscriptionServiceMsgProto.newBuilder()
  489 + .setAlarmSubUpdate(builder.build()).build())
  490 + .build();
  491 + return new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg);
487 } 492 }
488 493
489 } 494 }
@@ -251,9 +251,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -251,9 +251,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
251 ctx.cancelTasks(); 251 ctx.cancelTasks();
252 ctx.clearEntitySubscriptions(); 252 ctx.clearEntitySubscriptions();
253 if (entities.isEmpty()) { 253 if (entities.isEmpty()) {
254 - AlarmDataUpdate update = new AlarmDataUpdate(cmd.getCmdId(),  
255 - new PageData<>(Collections.emptyList(), 1, 0, false),  
256 - null, false); 254 + AlarmDataUpdate update = new AlarmDataUpdate(cmd.getCmdId(), new PageData<>(), null, false);
257 wsService.sendWsMsg(ctx.getSessionId(), update); 255 wsService.sendWsMsg(ctx.getSessionId(), update);
258 } else { 256 } else {
259 ctx.fetchAlarms(); 257 ctx.fetchAlarms();
@@ -87,14 +87,19 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { @@ -87,14 +87,19 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
87 } 87 }
88 88
89 public void fetchAlarms() { 89 public void fetchAlarms() {
90 - long start = System.currentTimeMillis();  
91 - PageData<AlarmData> alarms = alarmService.findAlarmDataByQueryForEntities(getTenantId(), getCustomerId(),  
92 - query, getOrderedEntityIds());  
93 - long end = System.currentTimeMillis();  
94 - stats.getAlarmQueryInvocationCnt().incrementAndGet();  
95 - stats.getAlarmQueryTimeSpent().addAndGet(end - start);  
96 - alarms = setAndMergeAlarmsData(alarms);  
97 - AlarmDataUpdate update = new AlarmDataUpdate(cmdId, alarms, null, tooManyEntities); 90 + AlarmDataUpdate update;
  91 + if(!entitiesMap.isEmpty()) {
  92 + long start = System.currentTimeMillis();
  93 + PageData<AlarmData> alarms = alarmService.findAlarmDataByQueryForEntities(getTenantId(), getCustomerId(),
  94 + query, getOrderedEntityIds());
  95 + long end = System.currentTimeMillis();
  96 + stats.getAlarmQueryInvocationCnt().incrementAndGet();
  97 + stats.getAlarmQueryTimeSpent().addAndGet(end - start);
  98 + alarms = setAndMergeAlarmsData(alarms);
  99 + update = new AlarmDataUpdate(cmdId, alarms, null, tooManyEntities);
  100 + } else {
  101 + update = new AlarmDataUpdate(cmdId, new PageData<>(), null, false);
  102 + }
98 wsService.sendWsMsg(getSessionId(), update); 103 wsService.sendWsMsg(getSessionId(), update);
99 } 104 }
100 105
@@ -146,7 +151,6 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> { @@ -146,7 +151,6 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
146 subToEntityIdMap.put(subIdx, entityData.getEntityId()); 151 subToEntityIdMap.put(subIdx, entityData.getEntityId());
147 log.trace("[{}][{}][{}] Creating alarms subscription for [{}] with query: {}", serviceId, cmdId, subIdx, entityData.getEntityId(), pageLink); 152 log.trace("[{}][{}][{}] Creating alarms subscription for [{}] with query: {}", serviceId, cmdId, subIdx, entityData.getEntityId(), pageLink);
148 TbAlarmsSubscription subscription = TbAlarmsSubscription.builder() 153 TbAlarmsSubscription subscription = TbAlarmsSubscription.builder()
149 - .type(TbSubscriptionType.ALARMS)  
150 .serviceId(serviceId) 154 .serviceId(serviceId)
151 .sessionId(sessionRef.getSessionId()) 155 .sessionId(sessionRef.getSessionId())
152 .subscriptionId(subIdx) 156 .subscriptionId(subIdx)
@@ -33,8 +33,8 @@ public class TbAlarmsSubscription extends TbSubscription<AlarmSubscriptionUpdate @@ -33,8 +33,8 @@ public class TbAlarmsSubscription extends TbSubscription<AlarmSubscriptionUpdate
33 33
34 @Builder 34 @Builder
35 public TbAlarmsSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId, 35 public TbAlarmsSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId,
36 - TbSubscriptionType type, BiConsumer<String, AlarmSubscriptionUpdate> updateConsumer, long ts) {  
37 - super(serviceId, sessionId, subscriptionId, tenantId, entityId, type, updateConsumer); 36 + BiConsumer<String, AlarmSubscriptionUpdate> updateConsumer, long ts) {
  37 + super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.ALARMS, updateConsumer);
38 this.ts = ts; 38 this.ts = ts;
39 } 39 }
40 40
@@ -30,6 +30,8 @@ import org.thingsboard.server.common.data.kv.KvEntry; @@ -30,6 +30,8 @@ import org.thingsboard.server.common.data.kv.KvEntry;
30 import org.thingsboard.server.common.data.kv.LongDataEntry; 30 import org.thingsboard.server.common.data.kv.LongDataEntry;
31 import org.thingsboard.server.common.data.kv.StringDataEntry; 31 import org.thingsboard.server.common.data.kv.StringDataEntry;
32 import org.thingsboard.server.common.data.kv.TsKvEntry; 32 import org.thingsboard.server.common.data.kv.TsKvEntry;
  33 +import org.thingsboard.server.dao.util.mapping.JacksonUtil;
  34 +import org.thingsboard.server.gen.transport.TransportProtos;
33 import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto; 35 import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
34 import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType; 36 import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
35 import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto; 37 import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto;
@@ -44,6 +46,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesSubscrip @@ -44,6 +46,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesSubscrip
44 import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto; 46 import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto;
45 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 47 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
46 import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; 48 import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
  49 +import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto;
  50 +import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto;
  51 +import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate;
47 import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode; 52 import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
48 import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate; 53 import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
49 54
@@ -90,6 +95,13 @@ public class TbSubscriptionUtils { @@ -90,6 +95,13 @@ public class TbSubscriptionUtils {
90 TbSubscriptionKetStateProto.newBuilder().setKey(key).setTs(value).build())); 95 TbSubscriptionKetStateProto.newBuilder().setKey(key).setTs(value).build()));
91 msgBuilder.setAttributeSub(aSubProto.build()); 96 msgBuilder.setAttributeSub(aSubProto.build());
92 break; 97 break;
  98 + case ALARMS:
  99 + TbAlarmsSubscription alarmSub = (TbAlarmsSubscription) subscription;
  100 + TransportProtos.TbAlarmSubscriptionProto.Builder alarmSubProto = TransportProtos.TbAlarmSubscriptionProto.newBuilder()
  101 + .setSub(subscriptionProto)
  102 + .setTs(alarmSub.getTs());
  103 + msgBuilder.setAlarmSub(alarmSubProto.build());
  104 + break;
93 } 105 }
94 return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build(); 106 return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
95 } 107 }
@@ -138,6 +150,18 @@ public class TbSubscriptionUtils { @@ -138,6 +150,18 @@ public class TbSubscriptionUtils {
138 return builder.build(); 150 return builder.build();
139 } 151 }
140 152
  153 + public static TbSubscription fromProto(TransportProtos.TbAlarmSubscriptionProto alarmSub) {
  154 + TbSubscriptionProto subProto = alarmSub.getSub();
  155 + TbAlarmsSubscription.TbAlarmsSubscriptionBuilder builder = TbAlarmsSubscription.builder()
  156 + .serviceId(subProto.getServiceId())
  157 + .sessionId(subProto.getSessionId())
  158 + .subscriptionId(subProto.getSubscriptionId())
  159 + .entityId(EntityIdFactory.getByTypeAndUuid(subProto.getEntityType(), new UUID(subProto.getEntityIdMSB(), subProto.getEntityIdLSB())))
  160 + .tenantId(new TenantId(new UUID(subProto.getTenantIdMSB(), subProto.getTenantIdLSB())));
  161 + builder.ts(alarmSub.getTs());
  162 + return builder.build();
  163 + }
  164 +
141 public static TelemetrySubscriptionUpdate fromProto(TbSubscriptionUpdateProto proto) { 165 public static TelemetrySubscriptionUpdate fromProto(TbSubscriptionUpdateProto proto) {
142 if (proto.getErrorCode() > 0) { 166 if (proto.getErrorCode() > 0) {
143 return new TelemetrySubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg()); 167 return new TelemetrySubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg());
@@ -156,6 +180,16 @@ public class TbSubscriptionUtils { @@ -156,6 +180,16 @@ public class TbSubscriptionUtils {
156 } 180 }
157 } 181 }
158 182
  183 + public static AlarmSubscriptionUpdate fromProto(TransportProtos.TbAlarmSubscriptionUpdateProto proto) {
  184 + if (proto.getErrorCode() > 0) {
  185 + return new AlarmSubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg());
  186 + } else {
  187 + Alarm alarm = JacksonUtil.fromString(proto.getAlarm(), Alarm.class);
  188 + return new AlarmSubscriptionUpdate(proto.getSubscriptionId(), alarm);
  189 + }
  190 + }
  191 +
  192 +
159 public static ToCoreMsg toTimeseriesUpdateProto(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts) { 193 public static ToCoreMsg toTimeseriesUpdateProto(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts) {
160 TbTimeSeriesUpdateProto.Builder builder = TbTimeSeriesUpdateProto.newBuilder(); 194 TbTimeSeriesUpdateProto.Builder builder = TbTimeSeriesUpdateProto.newBuilder();
161 builder.setEntityType(entityId.getEntityType().name()); 195 builder.setEntityType(entityId.getEntityType().name());
@@ -264,12 +298,28 @@ public class TbSubscriptionUtils { @@ -264,12 +298,28 @@ public class TbSubscriptionUtils {
264 } 298 }
265 299
266 public static ToCoreMsg toAlarmUpdateProto(TenantId tenantId, EntityId entityId, Alarm alarm) { 300 public static ToCoreMsg toAlarmUpdateProto(TenantId tenantId, EntityId entityId, Alarm alarm) {
267 -// TODO: 3.1  
268 - throw new RuntimeException("Not implemented!"); 301 + TbAlarmUpdateProto.Builder builder = TbAlarmUpdateProto.newBuilder();
  302 + builder.setEntityType(entityId.getEntityType().name());
  303 + builder.setEntityIdMSB(entityId.getId().getMostSignificantBits());
  304 + builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
  305 + builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
  306 + builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
  307 + builder.setAlarm(JacksonUtil.toString(alarm));
  308 + SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder();
  309 + msgBuilder.setAlarmUpdate(builder);
  310 + return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
269 } 311 }
270 312
271 public static ToCoreMsg toAlarmDeletedProto(TenantId tenantId, EntityId entityId, Alarm alarm) { 313 public static ToCoreMsg toAlarmDeletedProto(TenantId tenantId, EntityId entityId, Alarm alarm) {
272 -// TODO: 3.1  
273 - throw new RuntimeException("Not implemented!"); 314 + TbAlarmDeleteProto.Builder builder = TbAlarmDeleteProto.newBuilder();
  315 + builder.setEntityType(entityId.getEntityType().name());
  316 + builder.setEntityIdMSB(entityId.getId().getMostSignificantBits());
  317 + builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
  318 + builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
  319 + builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
  320 + builder.setAlarm(JacksonUtil.toString(alarm));
  321 + SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder();
  322 + msgBuilder.setAlarmDelete(builder);
  323 + return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
274 } 324 }
275 } 325 }
@@ -340,28 +340,28 @@ caffeine: @@ -340,28 +340,28 @@ caffeine:
340 specs: 340 specs:
341 relations: 341 relations:
342 timeToLiveInMinutes: 1440 342 timeToLiveInMinutes: 1440
343 - maxSize: 100000 343 + maxSize: 0
344 deviceCredentials: 344 deviceCredentials:
345 timeToLiveInMinutes: 1440 345 timeToLiveInMinutes: 1440
346 - maxSize: 100000 346 + maxSize: 0
347 devices: 347 devices:
348 timeToLiveInMinutes: 1440 348 timeToLiveInMinutes: 1440
349 - maxSize: 100000 349 + maxSize: 0
350 sessions: 350 sessions:
351 timeToLiveInMinutes: 1440 351 timeToLiveInMinutes: 1440
352 - maxSize: 100000 352 + maxSize: 0
353 assets: 353 assets:
354 timeToLiveInMinutes: 1440 354 timeToLiveInMinutes: 1440
355 - maxSize: 100000 355 + maxSize: 0
356 entityViews: 356 entityViews:
357 timeToLiveInMinutes: 1440 357 timeToLiveInMinutes: 1440
358 - maxSize: 100000 358 + maxSize: 0
359 claimDevices: 359 claimDevices:
360 timeToLiveInMinutes: 1 360 timeToLiveInMinutes: 1
361 - maxSize: 100000 361 + maxSize: 0
362 securitySettings: 362 securitySettings:
363 timeToLiveInMinutes: 1440 363 timeToLiveInMinutes: 1440
364 - maxSize: 1 364 + maxSize: 0
365 365
366 redis: 366 redis:
367 # standalone or cluster 367 # standalone or cluster
@@ -277,6 +277,11 @@ message TbAttributeSubscriptionProto { @@ -277,6 +277,11 @@ message TbAttributeSubscriptionProto {
277 string scope = 4; 277 string scope = 4;
278 } 278 }
279 279
  280 +message TbAlarmSubscriptionProto {
  281 + TbSubscriptionProto sub = 1;
  282 + int64 ts = 2;
  283 +}
  284 +
280 message TbSubscriptionUpdateProto { 285 message TbSubscriptionUpdateProto {
281 string sessionId = 1; 286 string sessionId = 1;
282 int32 subscriptionId = 2; 287 int32 subscriptionId = 2;
@@ -285,6 +290,15 @@ message TbSubscriptionUpdateProto { @@ -285,6 +290,15 @@ message TbSubscriptionUpdateProto {
285 repeated TbSubscriptionUpdateValueListProto data = 5; 290 repeated TbSubscriptionUpdateValueListProto data = 5;
286 } 291 }
287 292
  293 +message TbAlarmSubscriptionUpdateProto {
  294 + string sessionId = 1;
  295 + int32 subscriptionId = 2;
  296 + int32 errorCode = 3;
  297 + string errorMsg = 4;
  298 + string alarm = 5;
  299 + bool deleted = 6;
  300 +}
  301 +
288 message TbAttributeUpdateProto { 302 message TbAttributeUpdateProto {
289 string entityType = 1; 303 string entityType = 1;
290 int64 entityIdMSB = 2; 304 int64 entityIdMSB = 2;
@@ -295,6 +309,24 @@ message TbAttributeUpdateProto { @@ -295,6 +309,24 @@ message TbAttributeUpdateProto {
295 repeated TsKvProto data = 7; 309 repeated TsKvProto data = 7;
296 } 310 }
297 311
  312 +message TbAlarmUpdateProto {
  313 + string entityType = 1;
  314 + int64 entityIdMSB = 2;
  315 + int64 entityIdLSB = 3;
  316 + int64 tenantIdMSB = 4;
  317 + int64 tenantIdLSB = 5;
  318 + string alarm = 6;
  319 +}
  320 +
  321 +message TbAlarmDeleteProto {
  322 + string entityType = 1;
  323 + int64 entityIdMSB = 2;
  324 + int64 entityIdLSB = 3;
  325 + int64 tenantIdMSB = 4;
  326 + int64 tenantIdLSB = 5;
  327 + string alarm = 6;
  328 +}
  329 +
298 message TbAttributeDeleteProto { 330 message TbAttributeDeleteProto {
299 string entityType = 1; 331 string entityType = 1;
300 int64 entityIdMSB = 2; 332 int64 entityIdMSB = 2;
@@ -351,10 +383,14 @@ message SubscriptionMgrMsgProto { @@ -351,10 +383,14 @@ message SubscriptionMgrMsgProto {
351 TbTimeSeriesUpdateProto tsUpdate = 4; 383 TbTimeSeriesUpdateProto tsUpdate = 4;
352 TbAttributeUpdateProto attrUpdate = 5; 384 TbAttributeUpdateProto attrUpdate = 5;
353 TbAttributeDeleteProto attrDelete = 6; 385 TbAttributeDeleteProto attrDelete = 6;
  386 + TbAlarmSubscriptionProto alarmSub = 7;
  387 + TbAlarmUpdateProto alarmUpdate = 8;
  388 + TbAlarmDeleteProto alarmDelete = 9;
354 } 389 }
355 390
356 message LocalSubscriptionServiceMsgProto { 391 message LocalSubscriptionServiceMsgProto {
357 TbSubscriptionUpdateProto subUpdate = 1; 392 TbSubscriptionUpdateProto subUpdate = 1;
  393 + TbAlarmSubscriptionUpdateProto alarmSubUpdate = 2;
358 } 394 }
359 395
360 message FromDeviceRPCResponseProto { 396 message FromDeviceRPCResponseProto {