Commit 44377d56caae07fadb5ad4d61bd34b99ae74998f

Authored by Volodymyr Babak
1 parent acfaf680

Added alarm push from edge to cloud

@@ -186,15 +186,26 @@ class DefaultTbContext implements TbContext { @@ -186,15 +186,26 @@ class DefaultTbContext implements TbContext {
186 } 186 }
187 187
188 public TbMsg alarmCreatedMsg(Alarm alarm, RuleNodeId ruleNodeId) { 188 public TbMsg alarmCreatedMsg(Alarm alarm, RuleNodeId ruleNodeId) {
  189 + return alarmMsg(alarm, ruleNodeId, DataConstants.ENTITY_CREATED);
  190 + }
  191 +
  192 + public TbMsg alarmUpdatedMsg(Alarm alarm, RuleNodeId ruleNodeId) {
  193 + return alarmMsg(alarm, ruleNodeId, DataConstants.ENTITY_UPDATED);
  194 + }
  195 +
  196 + public TbMsg alarmClearedMsg(Alarm alarm, RuleNodeId ruleNodeId) {
  197 + return alarmMsg(alarm, ruleNodeId, DataConstants.ALARM_CLEAR);
  198 + }
  199 +
  200 + private TbMsg alarmMsg(Alarm alarm, RuleNodeId ruleNodeId, String type) {
189 try { 201 try {
190 ObjectNode entityNode = mapper.valueToTree(alarm); 202 ObjectNode entityNode = mapper.valueToTree(alarm);
191 - return new TbMsg(UUIDs.timeBased(), DataConstants.ENTITY_CREATED, alarm.getId(), getActionMetaData(ruleNodeId), mapper.writeValueAsString(entityNode), null, null, 0L); 203 + return new TbMsg(UUIDs.timeBased(), type, alarm.getId(), getActionMetaData(ruleNodeId), mapper.writeValueAsString(entityNode), null, null, 0L);
192 } catch (JsonProcessingException | IllegalArgumentException e) { 204 } catch (JsonProcessingException | IllegalArgumentException e) {
193 - throw new RuntimeException("Failed to process alarm created msg: " + e); 205 + throw new RuntimeException("Failed to process alarm created, updated or cleared msg: " + e);
194 } 206 }
195 } 207 }
196 208
197 -  
198 @Override 209 @Override
199 public RuleNodeId getSelfId() { 210 public RuleNodeId getSelfId() {
200 return nodeCtx.getSelf().getId(); 211 return nodeCtx.getSelf().getId();
@@ -20,6 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired; @@ -20,6 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired;
20 import org.springframework.context.annotation.Lazy; 20 import org.springframework.context.annotation.Lazy;
21 import org.springframework.stereotype.Component; 21 import org.springframework.stereotype.Component;
22 import org.thingsboard.server.actors.service.ActorService; 22 import org.thingsboard.server.actors.service.ActorService;
  23 +import org.thingsboard.server.dao.alarm.AlarmService;
23 import org.thingsboard.server.dao.asset.AssetService; 24 import org.thingsboard.server.dao.asset.AssetService;
24 import org.thingsboard.server.dao.attributes.AttributesService; 25 import org.thingsboard.server.dao.attributes.AttributesService;
25 import org.thingsboard.server.dao.customer.CustomerService; 26 import org.thingsboard.server.dao.customer.CustomerService;
@@ -62,5 +63,9 @@ public class EdgeContextComponent { @@ -62,5 +63,9 @@ public class EdgeContextComponent {
62 63
63 @Lazy 64 @Lazy
64 @Autowired 65 @Autowired
  66 + private AlarmService alarmService;
  67 +
  68 + @Lazy
  69 + @Autowired
65 private ActorService actorService; 70 private ActorService actorService;
66 } 71 }
@@ -32,6 +32,8 @@ import org.thingsboard.server.common.data.EntityType; @@ -32,6 +32,8 @@ import org.thingsboard.server.common.data.EntityType;
32 import org.thingsboard.server.common.data.EntityView; 32 import org.thingsboard.server.common.data.EntityView;
33 import org.thingsboard.server.common.data.Event; 33 import org.thingsboard.server.common.data.Event;
34 import org.thingsboard.server.common.data.alarm.Alarm; 34 import org.thingsboard.server.common.data.alarm.Alarm;
  35 +import org.thingsboard.server.common.data.alarm.AlarmSeverity;
  36 +import org.thingsboard.server.common.data.alarm.AlarmStatus;
35 import org.thingsboard.server.common.data.asset.Asset; 37 import org.thingsboard.server.common.data.asset.Asset;
36 import org.thingsboard.server.common.data.edge.Edge; 38 import org.thingsboard.server.common.data.edge.Edge;
37 import org.thingsboard.server.common.data.edge.EdgeQueueEntry; 39 import org.thingsboard.server.common.data.edge.EdgeQueueEntry;
@@ -180,7 +182,7 @@ public final class EdgeGrpcSession implements Cloneable { @@ -180,7 +182,7 @@ public final class EdgeGrpcSession implements Cloneable {
180 case ENTITY_UPDATED_RPC_MESSAGE: 182 case ENTITY_UPDATED_RPC_MESSAGE:
181 case ENTITY_CREATED_RPC_MESSAGE: 183 case ENTITY_CREATED_RPC_MESSAGE:
182 case ALARM_ACK_RPC_MESSAGE: 184 case ALARM_ACK_RPC_MESSAGE:
183 - case ALARM_CLEARK_RPC_MESSAGE: 185 + case ALARM_CLEAR_RPC_MESSAGE:
184 processEntityCRUDMessage(entry, msgType); 186 processEntityCRUDMessage(entry, msgType);
185 break; 187 break;
186 case RULE_CHAIN_CUSTOM_MESSAGE: 188 case RULE_CHAIN_CUSTOM_MESSAGE:
@@ -379,7 +381,7 @@ public final class EdgeGrpcSession implements Cloneable { @@ -379,7 +381,7 @@ public final class EdgeGrpcSession implements Cloneable {
379 AlarmUpdateMsg.Builder builder = AlarmUpdateMsg.newBuilder() 381 AlarmUpdateMsg.Builder builder = AlarmUpdateMsg.newBuilder()
380 .setMsgType(msgType) 382 .setMsgType(msgType)
381 .setName(alarm.getName()) 383 .setName(alarm.getName())
382 - .setType(alarm.getName()) 384 + .setType(alarm.getType())
383 .setOriginatorName(entityName) 385 .setOriginatorName(entityName)
384 .setOriginatorType(alarm.getOriginator().getEntityType().name()) 386 .setOriginatorType(alarm.getOriginator().getEntityType().name())
385 .setSeverity(alarm.getSeverity().name()) 387 .setSeverity(alarm.getSeverity().name())
@@ -409,6 +411,10 @@ public final class EdgeGrpcSession implements Cloneable { @@ -409,6 +411,10 @@ public final class EdgeGrpcSession implements Cloneable {
409 case DataConstants.ENTITY_DELETED: 411 case DataConstants.ENTITY_DELETED:
410 case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: 412 case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
411 return UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE; 413 return UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE;
  414 + case DataConstants.ALARM_ACK:
  415 + return UpdateMsgType.ALARM_ACK_RPC_MESSAGE;
  416 + case DataConstants.ALARM_CLEAR:
  417 + return UpdateMsgType.ALARM_CLEAR_RPC_MESSAGE;
412 default: 418 default:
413 throw new RuntimeException("Unsupported msgType [" + msgType + "]"); 419 throw new RuntimeException("Unsupported msgType [" + msgType + "]");
414 } 420 }
@@ -532,7 +538,7 @@ public final class EdgeGrpcSession implements Cloneable { @@ -532,7 +538,7 @@ public final class EdgeGrpcSession implements Cloneable {
532 DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder() 538 DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder()
533 .setMsgType(msgType) 539 .setMsgType(msgType)
534 .setName(device.getName()) 540 .setName(device.getName())
535 - .setType(device.getName()); 541 + .setType(device.getType());
536 return builder.build(); 542 return builder.build();
537 } 543 }
538 544
@@ -540,7 +546,7 @@ public final class EdgeGrpcSession implements Cloneable { @@ -540,7 +546,7 @@ public final class EdgeGrpcSession implements Cloneable {
540 AssetUpdateMsg.Builder builder = AssetUpdateMsg.newBuilder() 546 AssetUpdateMsg.Builder builder = AssetUpdateMsg.newBuilder()
541 .setMsgType(msgType) 547 .setMsgType(msgType)
542 .setName(asset.getName()) 548 .setName(asset.getName())
543 - .setType(asset.getName()); 549 + .setType(asset.getType());
544 return builder.build(); 550 return builder.build();
545 } 551 }
546 552
@@ -562,7 +568,7 @@ public final class EdgeGrpcSession implements Cloneable { @@ -562,7 +568,7 @@ public final class EdgeGrpcSession implements Cloneable {
562 EntityViewUpdateMsg.Builder builder = EntityViewUpdateMsg.newBuilder() 568 EntityViewUpdateMsg.Builder builder = EntityViewUpdateMsg.newBuilder()
563 .setMsgType(msgType) 569 .setMsgType(msgType)
564 .setName(entityView.getName()) 570 .setName(entityView.getName())
565 - .setType(entityView.getName()) 571 + .setType(entityView.getType())
566 .setRelatedName(relatedName) 572 .setRelatedName(relatedName)
567 .setRelatedType(relatedType) 573 .setRelatedType(relatedType)
568 .setRelatedEntityType(relatedEntityType); 574 .setRelatedEntityType(relatedEntityType);
@@ -618,6 +624,11 @@ public final class EdgeGrpcSession implements Cloneable { @@ -618,6 +624,11 @@ public final class EdgeGrpcSession implements Cloneable {
618 } 624 }
619 } 625 }
620 } 626 }
  627 + if (uplinkMsg.getAlarmUpdatemsgList() != null && !uplinkMsg.getAlarmUpdatemsgList().isEmpty()) {
  628 + for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdatemsgList()) {
  629 + onAlarmUpdate(alarmUpdateMsg);
  630 + }
  631 + }
621 } catch (Exception e) { 632 } catch (Exception e) {
622 return UplinkResponseMsg.newBuilder().setSuccess(false).setErrorMsg(e.getMessage()).build(); 633 return UplinkResponseMsg.newBuilder().setSuccess(false).setErrorMsg(e.getMessage()).build();
623 } 634 }
@@ -625,6 +636,65 @@ public final class EdgeGrpcSession implements Cloneable { @@ -625,6 +636,65 @@ public final class EdgeGrpcSession implements Cloneable {
625 return UplinkResponseMsg.newBuilder().setSuccess(true).build(); 636 return UplinkResponseMsg.newBuilder().setSuccess(true).build();
626 } 637 }
627 638
  639 + private EntityId getAlarmOriginator(String entityName, org.thingsboard.server.common.data.EntityType entityType) {
  640 + switch (entityType) {
  641 + case DEVICE:
  642 + return ctx.getDeviceService().findDeviceByTenantIdAndName(edge.getTenantId(), entityName).getId();
  643 + case ASSET:
  644 + return ctx.getAssetService().findAssetByTenantIdAndName(edge.getTenantId(), entityName).getId();
  645 + case ENTITY_VIEW:
  646 + return ctx.getEntityViewService().findEntityViewByTenantIdAndName(edge.getTenantId(), entityName).getId();
  647 + default:
  648 + return null;
  649 + }
  650 + }
  651 +
  652 + private void onAlarmUpdate(AlarmUpdateMsg alarmUpdateMsg) {
  653 + EntityId originatorId = getAlarmOriginator(alarmUpdateMsg.getOriginatorName(), org.thingsboard.server.common.data.EntityType.valueOf(alarmUpdateMsg.getOriginatorType()));
  654 + if (originatorId != null) {
  655 + try {
  656 + Alarm existentAlarm = ctx.getAlarmService().findLatestByOriginatorAndType(edge.getTenantId(), originatorId, alarmUpdateMsg.getType()).get();
  657 + switch (alarmUpdateMsg.getMsgType()) {
  658 + case ENTITY_CREATED_RPC_MESSAGE:
  659 + case ENTITY_UPDATED_RPC_MESSAGE:
  660 + if (existentAlarm == null) {
  661 + existentAlarm = new Alarm();
  662 + existentAlarm.setTenantId(edge.getTenantId());
  663 + existentAlarm.setType(alarmUpdateMsg.getName());
  664 + existentAlarm.setOriginator(originatorId);
  665 + existentAlarm.setSeverity(AlarmSeverity.valueOf(alarmUpdateMsg.getSeverity()));
  666 + existentAlarm.setStatus(AlarmStatus.valueOf(alarmUpdateMsg.getStatus()));
  667 + existentAlarm.setStartTs(alarmUpdateMsg.getStartTs());
  668 + existentAlarm.setAckTs(alarmUpdateMsg.getAckTs());
  669 + existentAlarm.setClearTs(alarmUpdateMsg.getClearTs());
  670 + existentAlarm.setPropagate(alarmUpdateMsg.getPropagate());
  671 + }
  672 + existentAlarm.setEndTs(alarmUpdateMsg.getEndTs());
  673 + existentAlarm.setDetails(objectMapper.readTree(alarmUpdateMsg.getDetails()));
  674 + ctx.getAlarmService().createOrUpdateAlarm(existentAlarm);
  675 + break;
  676 + case ALARM_ACK_RPC_MESSAGE:
  677 + if (existentAlarm != null) {
  678 + ctx.getAlarmService().ackAlarm(edge.getTenantId(), existentAlarm.getId(), alarmUpdateMsg.getAckTs());
  679 + }
  680 + break;
  681 + case ALARM_CLEAR_RPC_MESSAGE:
  682 + if (existentAlarm != null) {
  683 + ctx.getAlarmService().clearAlarm(edge.getTenantId(), existentAlarm.getId(), objectMapper.readTree(alarmUpdateMsg.getDetails()), alarmUpdateMsg.getAckTs());
  684 + }
  685 + break;
  686 + case ENTITY_DELETED_RPC_MESSAGE:
  687 + if (existentAlarm != null) {
  688 + ctx.getAlarmService().deleteAlarm(edge.getTenantId(), existentAlarm.getId());
  689 + }
  690 + break;
  691 + }
  692 + } catch (Exception e) {
  693 + log.error("Error during finding existent alarm", e);
  694 + }
  695 + }
  696 + }
  697 +
628 private Device getOrCreateDevice(String deviceName, String deviceType) { 698 private Device getOrCreateDevice(String deviceName, String deviceType) {
629 Device device = ctx.getDeviceService().findDeviceByTenantIdAndName(edge.getTenantId(), deviceName); 699 Device device = ctx.getDeviceService().findDeviceByTenantIdAndName(edge.getTenantId(), deviceName);
630 if (device == null) { 700 if (device == null) {
@@ -647,6 +717,7 @@ public final class EdgeGrpcSession implements Cloneable { @@ -647,6 +717,7 @@ public final class EdgeGrpcSession implements Cloneable {
647 device.setTenantId(edge.getTenantId()); 717 device.setTenantId(edge.getTenantId());
648 device.setCustomerId(edge.getCustomerId()); 718 device.setCustomerId(edge.getCustomerId());
649 device = ctx.getDeviceService().saveDevice(device); 719 device = ctx.getDeviceService().saveDevice(device);
  720 + device = ctx.getDeviceService().assignDeviceToEdge(edge.getTenantId(), device.getId(), edge.getId());
650 createRelationFromEdge(device.getId()); 721 createRelationFromEdge(device.getId());
651 ctx.getActorService().onDeviceAdded(device); 722 ctx.getActorService().onDeviceAdded(device);
652 pushDeviceCreatedEventToRuleEngine(device); 723 pushDeviceCreatedEventToRuleEngine(device);
@@ -26,6 +26,7 @@ @@ -26,6 +26,7 @@
26 </appender> 26 </appender>
27 27
28 <logger name="org.thingsboard.server" level="INFO" /> 28 <logger name="org.thingsboard.server" level="INFO" />
  29 + <logger name="org.thingsboard.server.service.edge" level="TRACE" />
29 <logger name="akka" level="INFO" /> 30 <logger name="akka" level="INFO" />
30 31
31 <root level="INFO"> 32 <root level="INFO">
@@ -89,7 +89,7 @@ enum UpdateMsgType { @@ -89,7 +89,7 @@ enum UpdateMsgType {
89 ENTITY_UPDATED_RPC_MESSAGE = 1; 89 ENTITY_UPDATED_RPC_MESSAGE = 1;
90 ENTITY_DELETED_RPC_MESSAGE = 2; 90 ENTITY_DELETED_RPC_MESSAGE = 2;
91 ALARM_ACK_RPC_MESSAGE = 3; 91 ALARM_ACK_RPC_MESSAGE = 3;
92 - ALARM_CLEARK_RPC_MESSAGE = 4; 92 + ALARM_CLEAR_RPC_MESSAGE = 4;
93 RULE_CHAIN_CUSTOM_MESSAGE = 5; 93 RULE_CHAIN_CUSTOM_MESSAGE = 5;
94 } 94 }
95 95
@@ -79,6 +79,10 @@ public interface TbContext { @@ -79,6 +79,10 @@ public interface TbContext {
79 79
80 TbMsg alarmCreatedMsg(Alarm alarm, RuleNodeId ruleNodeId); 80 TbMsg alarmCreatedMsg(Alarm alarm, RuleNodeId ruleNodeId);
81 81
  82 + TbMsg alarmUpdatedMsg(Alarm alarm, RuleNodeId ruleNodeId);
  83 +
  84 + TbMsg alarmClearedMsg(Alarm alarm, RuleNodeId ruleNodeId);
  85 +
82 RuleNodeId getSelfId(); 86 RuleNodeId getSelfId();
83 87
84 TenantId getTenantId(); 88 TenantId getTenantId();
@@ -63,8 +63,10 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura @@ -63,8 +63,10 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura
63 ctx.sendTbMsgToRuleEngine(ctx.alarmCreatedMsg(alarmResult.alarm, ctx.getSelfId())); 63 ctx.sendTbMsgToRuleEngine(ctx.alarmCreatedMsg(alarmResult.alarm, ctx.getSelfId()));
64 } else if (alarmResult.isUpdated) { 64 } else if (alarmResult.isUpdated) {
65 ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Updated"); 65 ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Updated");
  66 + ctx.sendTbMsgToRuleEngine(ctx.alarmUpdatedMsg(alarmResult.alarm, ctx.getSelfId()));
66 } else if (alarmResult.isCleared) { 67 } else if (alarmResult.isCleared) {
67 ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Cleared"); 68 ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Cleared");
  69 + ctx.sendTbMsgToRuleEngine(ctx.alarmClearedMsg(alarmResult.alarm, ctx.getSelfId()));
68 } 70 }
69 }, 71 },
70 t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor()); 72 t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());