Commit 9e1d86d7e8c4837738d5ce696c5cc88b78edee16

Authored by Viacheslav Klimov
1 parent 6c6f9b20

Refactor

... ... @@ -110,8 +110,11 @@ public class AlarmController extends BaseController {
110 110 checkParameter(ALARM_ID, strAlarmId);
111 111 try {
112 112 AlarmId alarmId = new AlarmId(toUUID(strAlarmId));
113   - checkAlarmId(alarmId, Operation.WRITE);
  113 + Alarm alarm = checkAlarmId(alarmId, Operation.WRITE);
114 114
  115 + logEntityAction(alarm.getOriginator(), alarm,
  116 + getCurrentUser().getCustomerId(),
  117 + ActionType.ALARM_DELETE, null);
115 118 sendEntityNotificationMsg(getTenantId(), alarmId, EdgeEventActionType.DELETED);
116 119
117 120 return alarmService.deleteAlarm(getTenantId(), alarmId);
... ...
... ... @@ -17,7 +17,6 @@ package org.thingsboard.server.controller;
17 17
18 18 import com.fasterxml.jackson.core.JsonProcessingException;
19 19 import com.fasterxml.jackson.databind.ObjectMapper;
20   -import com.fasterxml.jackson.databind.node.ArrayNode;
21 20 import com.fasterxml.jackson.databind.node.ObjectNode;
22 21 import lombok.Getter;
23 22 import lombok.extern.slf4j.Slf4j;
... ... @@ -31,7 +30,6 @@ import org.springframework.web.bind.annotation.ExceptionHandler;
31 30 import org.thingsboard.server.common.data.Customer;
32 31 import org.thingsboard.server.common.data.Dashboard;
33 32 import org.thingsboard.server.common.data.DashboardInfo;
34   -import org.thingsboard.server.common.data.DataConstants;
35 33 import org.thingsboard.server.common.data.Device;
36 34 import org.thingsboard.server.common.data.DeviceInfo;
37 35 import org.thingsboard.server.common.data.DeviceProfile;
... ... @@ -79,10 +77,6 @@ import org.thingsboard.server.common.data.id.TenantProfileId;
79 77 import org.thingsboard.server.common.data.id.UserId;
80 78 import org.thingsboard.server.common.data.id.WidgetTypeId;
81 79 import org.thingsboard.server.common.data.id.WidgetsBundleId;
82   -import org.thingsboard.server.common.data.kv.AttributeKvEntry;
83   -import org.thingsboard.server.common.data.kv.DataType;
84   -import org.thingsboard.server.common.data.kv.KvEntry;
85   -import org.thingsboard.server.common.data.kv.TsKvEntry;
86 80 import org.thingsboard.server.common.data.page.PageLink;
87 81 import org.thingsboard.server.common.data.page.SortOrder;
88 82 import org.thingsboard.server.common.data.page.TimePageLink;
... ... @@ -94,9 +88,6 @@ import org.thingsboard.server.common.data.rule.RuleChainType;
94 88 import org.thingsboard.server.common.data.rule.RuleNode;
95 89 import org.thingsboard.server.common.data.widget.WidgetTypeDetails;
96 90 import org.thingsboard.server.common.data.widget.WidgetsBundle;
97   -import org.thingsboard.server.common.msg.TbMsg;
98   -import org.thingsboard.server.common.msg.TbMsgDataType;
99   -import org.thingsboard.server.common.msg.TbMsgMetaData;
100 91 import org.thingsboard.server.dao.asset.AssetService;
101 92 import org.thingsboard.server.dao.attributes.AttributesService;
102 93 import org.thingsboard.server.dao.audit.AuditLogService;
... ... @@ -127,6 +118,7 @@ import org.thingsboard.server.gen.transport.TransportProtos;
127 118 import org.thingsboard.server.queue.discovery.PartitionService;
128 119 import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
129 120 import org.thingsboard.server.queue.util.TbCoreComponent;
  121 +import org.thingsboard.server.service.action.RuleEngineEntityActionService;
130 122 import org.thingsboard.server.service.component.ComponentDiscoveryService;
131 123 import org.thingsboard.server.service.firmware.FirmwareStateService;
132 124 import org.thingsboard.server.service.edge.EdgeNotificationService;
... ... @@ -147,11 +139,9 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
147 139 import javax.mail.MessagingException;
148 140 import javax.servlet.http.HttpServletResponse;
149 141 import java.util.List;
150   -import java.util.Map;
151 142 import java.util.Optional;
152 143 import java.util.Set;
153 144 import java.util.UUID;
154   -import java.util.stream.Collectors;
155 145
156 146 import static org.thingsboard.server.dao.service.Validator.validateId;
157 147
... ... @@ -281,6 +271,9 @@ public abstract class BaseController {
281 271 @Autowired(required = false)
282 272 protected EdgeGrpcService edgeGrpcService;
283 273
  274 + @Autowired
  275 + protected RuleEngineEntityActionService ruleEngineEntityActionService;
  276 +
284 277 @Value("${server.log_controller_error_stack_trace}")
285 278 @Getter
286 279 private boolean logControllerErrorStackTrace;
... ... @@ -811,7 +804,7 @@ public abstract class BaseController {
811 804 customerId = user.getCustomerId();
812 805 }
813 806 if (e == null) {
814   - pushEntityActionToRuleEngine(entityId, entity, user, customerId, actionType, additionalInfo);
  807 + ruleEngineEntityActionService.pushEntityActionToRuleEngine(entityId, entity, user.getTenantId(), customerId, actionType, user, additionalInfo);
815 808 }
816 809 auditLogService.logEntityAction(user.getTenantId(), customerId, user.getId(), user.getName(), entityId, entity, actionType, e, additionalInfo);
817 810 }
... ... @@ -821,184 +814,6 @@ public abstract class BaseController {
821 814 return error != null ? (Exception.class.isInstance(error) ? (Exception) error : new Exception(error)) : null;
822 815 }
823 816
824   - private <E extends HasName, I extends EntityId> void pushEntityActionToRuleEngine(I entityId, E entity, User user, CustomerId customerId,
825   - ActionType actionType, Object... additionalInfo) {
826   - String msgType = null;
827   - switch (actionType) {
828   - case ADDED:
829   - msgType = DataConstants.ENTITY_CREATED;
830   - break;
831   - case DELETED:
832   - msgType = DataConstants.ENTITY_DELETED;
833   - break;
834   - case UPDATED:
835   - msgType = DataConstants.ENTITY_UPDATED;
836   - break;
837   - case ASSIGNED_TO_CUSTOMER:
838   - msgType = DataConstants.ENTITY_ASSIGNED;
839   - break;
840   - case UNASSIGNED_FROM_CUSTOMER:
841   - msgType = DataConstants.ENTITY_UNASSIGNED;
842   - break;
843   - case ATTRIBUTES_UPDATED:
844   - msgType = DataConstants.ATTRIBUTES_UPDATED;
845   - break;
846   - case ATTRIBUTES_DELETED:
847   - msgType = DataConstants.ATTRIBUTES_DELETED;
848   - break;
849   - case ALARM_ACK:
850   - msgType = DataConstants.ALARM_ACK;
851   - break;
852   - case ALARM_CLEAR:
853   - msgType = DataConstants.ALARM_CLEAR;
854   - break;
855   - case ASSIGNED_FROM_TENANT:
856   - msgType = DataConstants.ENTITY_ASSIGNED_FROM_TENANT;
857   - break;
858   - case ASSIGNED_TO_TENANT:
859   - msgType = DataConstants.ENTITY_ASSIGNED_TO_TENANT;
860   - break;
861   - case PROVISION_SUCCESS:
862   - msgType = DataConstants.PROVISION_SUCCESS;
863   - break;
864   - case PROVISION_FAILURE:
865   - msgType = DataConstants.PROVISION_FAILURE;
866   - break;
867   - case TIMESERIES_UPDATED:
868   - msgType = DataConstants.TIMESERIES_UPDATED;
869   - break;
870   - case TIMESERIES_DELETED:
871   - msgType = DataConstants.TIMESERIES_DELETED;
872   - break;
873   - case ASSIGNED_TO_EDGE:
874   - msgType = DataConstants.ENTITY_ASSIGNED_TO_EDGE;
875   - break;
876   - case UNASSIGNED_FROM_EDGE:
877   - msgType = DataConstants.ENTITY_UNASSIGNED_FROM_EDGE;
878   - break;
879   - }
880   - if (!StringUtils.isEmpty(msgType)) {
881   - try {
882   - TbMsgMetaData metaData = new TbMsgMetaData();
883   - metaData.putValue("userId", user.getId().toString());
884   - metaData.putValue("userName", user.getName());
885   - if (customerId != null && !customerId.isNullUid()) {
886   - metaData.putValue("customerId", customerId.toString());
887   - }
888   - if (actionType == ActionType.ASSIGNED_TO_CUSTOMER) {
889   - String strCustomerId = extractParameter(String.class, 1, additionalInfo);
890   - String strCustomerName = extractParameter(String.class, 2, additionalInfo);
891   - metaData.putValue("assignedCustomerId", strCustomerId);
892   - metaData.putValue("assignedCustomerName", strCustomerName);
893   - } else if (actionType == ActionType.UNASSIGNED_FROM_CUSTOMER) {
894   - String strCustomerId = extractParameter(String.class, 1, additionalInfo);
895   - String strCustomerName = extractParameter(String.class, 2, additionalInfo);
896   - metaData.putValue("unassignedCustomerId", strCustomerId);
897   - metaData.putValue("unassignedCustomerName", strCustomerName);
898   - } else if (actionType == ActionType.ASSIGNED_FROM_TENANT) {
899   - String strTenantId = extractParameter(String.class, 0, additionalInfo);
900   - String strTenantName = extractParameter(String.class, 1, additionalInfo);
901   - metaData.putValue("assignedFromTenantId", strTenantId);
902   - metaData.putValue("assignedFromTenantName", strTenantName);
903   - } else if (actionType == ActionType.ASSIGNED_TO_TENANT) {
904   - String strTenantId = extractParameter(String.class, 0, additionalInfo);
905   - String strTenantName = extractParameter(String.class, 1, additionalInfo);
906   - metaData.putValue("assignedToTenantId", strTenantId);
907   - metaData.putValue("assignedToTenantName", strTenantName);
908   - } else if (actionType == ActionType.ASSIGNED_TO_EDGE) {
909   - String strEdgeId = extractParameter(String.class, 1, additionalInfo);
910   - String strEdgeName = extractParameter(String.class, 2, additionalInfo);
911   - metaData.putValue("assignedEdgeId", strEdgeId);
912   - metaData.putValue("assignedEdgeName", strEdgeName);
913   - } else if (actionType == ActionType.UNASSIGNED_FROM_EDGE) {
914   - String strEdgeId = extractParameter(String.class, 1, additionalInfo);
915   - String strEdgeName = extractParameter(String.class, 2, additionalInfo);
916   - metaData.putValue("unassignedEdgeId", strEdgeId);
917   - metaData.putValue("unassignedEdgeName", strEdgeName);
918   - }
919   - ObjectNode entityNode;
920   - if (entity != null) {
921   - entityNode = json.valueToTree(entity);
922   - if (entityId.getEntityType() == EntityType.DASHBOARD) {
923   - entityNode.put("configuration", "");
924   - }
925   - } else {
926   - entityNode = json.createObjectNode();
927   - if (actionType == ActionType.ATTRIBUTES_UPDATED) {
928   - String scope = extractParameter(String.class, 0, additionalInfo);
929   - @SuppressWarnings("unchecked")
930   - List<AttributeKvEntry> attributes = extractParameter(List.class, 1, additionalInfo);
931   - metaData.putValue(DataConstants.SCOPE, scope);
932   - if (attributes != null) {
933   - for (AttributeKvEntry attr : attributes) {
934   - addKvEntry(entityNode, attr);
935   - }
936   - }
937   - } else if (actionType == ActionType.ATTRIBUTES_DELETED) {
938   - String scope = extractParameter(String.class, 0, additionalInfo);
939   - @SuppressWarnings("unchecked")
940   - List<String> keys = extractParameter(List.class, 1, additionalInfo);
941   - metaData.putValue(DataConstants.SCOPE, scope);
942   - ArrayNode attrsArrayNode = entityNode.putArray("attributes");
943   - if (keys != null) {
944   - keys.forEach(attrsArrayNode::add);
945   - }
946   - } else if (actionType == ActionType.TIMESERIES_UPDATED) {
947   - @SuppressWarnings("unchecked")
948   - List<TsKvEntry> timeseries = extractParameter(List.class, 0, additionalInfo);
949   - addTimeseries(entityNode, timeseries);
950   - } else if (actionType == ActionType.TIMESERIES_DELETED) {
951   - @SuppressWarnings("unchecked")
952   - List<String> keys = extractParameter(List.class, 0, additionalInfo);
953   - if (keys != null) {
954   - ArrayNode timeseriesArrayNode = entityNode.putArray("timeseries");
955   - keys.forEach(timeseriesArrayNode::add);
956   - }
957   - entityNode.put("startTs", extractParameter(Long.class, 1, additionalInfo));
958   - entityNode.put("endTs", extractParameter(Long.class, 2, additionalInfo));
959   - }
960   - }
961   - TbMsg tbMsg = TbMsg.newMsg(msgType, entityId, customerId, metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode));
962   - TenantId tenantId = user.getTenantId();
963   - if (tenantId.isNullUid()) {
964   - if (entity instanceof HasTenantId) {
965   - tenantId = ((HasTenantId) entity).getTenantId();
966   - }
967   - }
968   - tbClusterService.pushMsgToRuleEngine(tenantId, entityId, tbMsg, null);
969   - } catch (Exception e) {
970   - log.warn("[{}] Failed to push entity action to rule engine: {}", entityId, actionType, e);
971   - }
972   - }
973   - }
974   -
975   - private void addKvEntry(ObjectNode entityNode, KvEntry kvEntry) throws Exception {
976   - if (kvEntry.getDataType() == DataType.BOOLEAN) {
977   - kvEntry.getBooleanValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
978   - } else if (kvEntry.getDataType() == DataType.DOUBLE) {
979   - kvEntry.getDoubleValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
980   - } else if (kvEntry.getDataType() == DataType.LONG) {
981   - kvEntry.getLongValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
982   - } else if (kvEntry.getDataType() == DataType.JSON) {
983   - if (kvEntry.getJsonValue().isPresent()) {
984   - entityNode.set(kvEntry.getKey(), json.readTree(kvEntry.getJsonValue().get()));
985   - }
986   - } else {
987   - entityNode.put(kvEntry.getKey(), kvEntry.getValueAsString());
988   - }
989   - }
990   -
991   - private <T> T extractParameter(Class<T> clazz, int index, Object... additionalInfo) {
992   - T result = null;
993   - if (additionalInfo != null && additionalInfo.length > index) {
994   - Object paramObject = additionalInfo[index];
995   - if (clazz.isInstance(paramObject)) {
996   - result = clazz.cast(paramObject);
997   - }
998   - }
999   - return result;
1000   - }
1001   -
1002 817 protected <E extends HasName> String entityToStr(E entity) {
1003 818 try {
1004 819 return json.writeValueAsString(json.valueToTree(entity));
... ... @@ -1095,23 +910,6 @@ public abstract class BaseController {
1095 910 return result;
1096 911 }
1097 912
1098   - private void addTimeseries(ObjectNode entityNode, List<TsKvEntry> timeseries) throws Exception {
1099   - if (timeseries != null && !timeseries.isEmpty()) {
1100   - ArrayNode result = entityNode.putArray("timeseries");
1101   - Map<Long, List<TsKvEntry>> groupedTelemetry = timeseries.stream()
1102   - .collect(Collectors.groupingBy(TsKvEntry::getTs));
1103   - for (Map.Entry<Long, List<TsKvEntry>> entry : groupedTelemetry.entrySet()) {
1104   - ObjectNode element = json.createObjectNode();
1105   - element.put("ts", entry.getKey());
1106   - ObjectNode values = element.putObject("values");
1107   - for (TsKvEntry tsKvEntry : entry.getValue()) {
1108   - addKvEntry(values, tsKvEntry);
1109   - }
1110   - result.add(element);
1111   - }
1112   - }
1113   - }
1114   -
1115 913 protected void processDashboardIdFromAdditionalInfo(ObjectNode additionalInfo, String requiredFields) throws ThingsboardException {
1116 914 String dashboardId = additionalInfo.has(requiredFields) ? additionalInfo.get(requiredFields).asText() : null;
1117 915 if (dashboardId != null && !dashboardId.equals("null")) {
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.action;
  17 +
  18 +import com.fasterxml.jackson.databind.ObjectMapper;
  19 +import com.fasterxml.jackson.databind.node.ArrayNode;
  20 +import com.fasterxml.jackson.databind.node.ObjectNode;
  21 +import lombok.RequiredArgsConstructor;
  22 +import lombok.extern.slf4j.Slf4j;
  23 +import org.apache.commons.lang3.StringUtils;
  24 +import org.springframework.stereotype.Service;
  25 +import org.thingsboard.server.common.data.DataConstants;
  26 +import org.thingsboard.server.common.data.EntityType;
  27 +import org.thingsboard.server.common.data.HasName;
  28 +import org.thingsboard.server.common.data.HasTenantId;
  29 +import org.thingsboard.server.common.data.User;
  30 +import org.thingsboard.server.common.data.audit.ActionType;
  31 +import org.thingsboard.server.common.data.id.CustomerId;
  32 +import org.thingsboard.server.common.data.id.EntityId;
  33 +import org.thingsboard.server.common.data.id.TenantId;
  34 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  35 +import org.thingsboard.server.common.data.kv.DataType;
  36 +import org.thingsboard.server.common.data.kv.KvEntry;
  37 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  38 +import org.thingsboard.server.common.msg.TbMsg;
  39 +import org.thingsboard.server.common.msg.TbMsgDataType;
  40 +import org.thingsboard.server.common.msg.TbMsgMetaData;
  41 +import org.thingsboard.server.queue.util.TbCoreComponent;
  42 +import org.thingsboard.server.service.queue.TbClusterService;
  43 +
  44 +import java.util.List;
  45 +import java.util.Map;
  46 +import java.util.stream.Collectors;
  47 +
  48 +@TbCoreComponent
  49 +@Service
  50 +@RequiredArgsConstructor
  51 +@Slf4j
  52 +public class RuleEngineEntityActionService {
  53 + private final TbClusterService tbClusterService;
  54 +
  55 + private static final ObjectMapper json = new ObjectMapper();
  56 +
  57 + public void pushEntityActionToRuleEngine(EntityId entityId, HasName entity, TenantId tenantId, CustomerId customerId,
  58 + ActionType actionType, User user, Object... additionalInfo) {
  59 + String msgType = null;
  60 + switch (actionType) {
  61 + case ADDED:
  62 + msgType = DataConstants.ENTITY_CREATED;
  63 + break;
  64 + case DELETED:
  65 + msgType = DataConstants.ENTITY_DELETED;
  66 + break;
  67 + case UPDATED:
  68 + msgType = DataConstants.ENTITY_UPDATED;
  69 + break;
  70 + case ASSIGNED_TO_CUSTOMER:
  71 + msgType = DataConstants.ENTITY_ASSIGNED;
  72 + break;
  73 + case UNASSIGNED_FROM_CUSTOMER:
  74 + msgType = DataConstants.ENTITY_UNASSIGNED;
  75 + break;
  76 + case ATTRIBUTES_UPDATED:
  77 + msgType = DataConstants.ATTRIBUTES_UPDATED;
  78 + break;
  79 + case ATTRIBUTES_DELETED:
  80 + msgType = DataConstants.ATTRIBUTES_DELETED;
  81 + break;
  82 + case ALARM_ACK:
  83 + msgType = DataConstants.ALARM_ACK;
  84 + break;
  85 + case ALARM_CLEAR:
  86 + msgType = DataConstants.ALARM_CLEAR;
  87 + break;
  88 + case ALARM_DELETE:
  89 + msgType = DataConstants.ALARM_DELETE;
  90 + break;
  91 + case ASSIGNED_FROM_TENANT:
  92 + msgType = DataConstants.ENTITY_ASSIGNED_FROM_TENANT;
  93 + break;
  94 + case ASSIGNED_TO_TENANT:
  95 + msgType = DataConstants.ENTITY_ASSIGNED_TO_TENANT;
  96 + break;
  97 + case PROVISION_SUCCESS:
  98 + msgType = DataConstants.PROVISION_SUCCESS;
  99 + break;
  100 + case PROVISION_FAILURE:
  101 + msgType = DataConstants.PROVISION_FAILURE;
  102 + break;
  103 + case TIMESERIES_UPDATED:
  104 + msgType = DataConstants.TIMESERIES_UPDATED;
  105 + break;
  106 + case TIMESERIES_DELETED:
  107 + msgType = DataConstants.TIMESERIES_DELETED;
  108 + break;
  109 + case ASSIGNED_TO_EDGE:
  110 + msgType = DataConstants.ENTITY_ASSIGNED_TO_EDGE;
  111 + break;
  112 + case UNASSIGNED_FROM_EDGE:
  113 + msgType = DataConstants.ENTITY_UNASSIGNED_FROM_EDGE;
  114 + break;
  115 + }
  116 + if (!StringUtils.isEmpty(msgType)) {
  117 + try {
  118 + TbMsgMetaData metaData = new TbMsgMetaData();
  119 + if (user != null) {
  120 + metaData.putValue("userId", user.getId().toString());
  121 + metaData.putValue("userName", user.getName());
  122 + }
  123 + if (customerId != null && !customerId.isNullUid()) {
  124 + metaData.putValue("customerId", customerId.toString());
  125 + }
  126 + if (actionType == ActionType.ASSIGNED_TO_CUSTOMER) {
  127 + String strCustomerId = extractParameter(String.class, 1, additionalInfo);
  128 + String strCustomerName = extractParameter(String.class, 2, additionalInfo);
  129 + metaData.putValue("assignedCustomerId", strCustomerId);
  130 + metaData.putValue("assignedCustomerName", strCustomerName);
  131 + } else if (actionType == ActionType.UNASSIGNED_FROM_CUSTOMER) {
  132 + String strCustomerId = extractParameter(String.class, 1, additionalInfo);
  133 + String strCustomerName = extractParameter(String.class, 2, additionalInfo);
  134 + metaData.putValue("unassignedCustomerId", strCustomerId);
  135 + metaData.putValue("unassignedCustomerName", strCustomerName);
  136 + } else if (actionType == ActionType.ASSIGNED_FROM_TENANT) {
  137 + String strTenantId = extractParameter(String.class, 0, additionalInfo);
  138 + String strTenantName = extractParameter(String.class, 1, additionalInfo);
  139 + metaData.putValue("assignedFromTenantId", strTenantId);
  140 + metaData.putValue("assignedFromTenantName", strTenantName);
  141 + } else if (actionType == ActionType.ASSIGNED_TO_TENANT) {
  142 + String strTenantId = extractParameter(String.class, 0, additionalInfo);
  143 + String strTenantName = extractParameter(String.class, 1, additionalInfo);
  144 + metaData.putValue("assignedToTenantId", strTenantId);
  145 + metaData.putValue("assignedToTenantName", strTenantName);
  146 + } else if (actionType == ActionType.ASSIGNED_TO_EDGE) {
  147 + String strEdgeId = extractParameter(String.class, 1, additionalInfo);
  148 + String strEdgeName = extractParameter(String.class, 2, additionalInfo);
  149 + metaData.putValue("assignedEdgeId", strEdgeId);
  150 + metaData.putValue("assignedEdgeName", strEdgeName);
  151 + } else if (actionType == ActionType.UNASSIGNED_FROM_EDGE) {
  152 + String strEdgeId = extractParameter(String.class, 1, additionalInfo);
  153 + String strEdgeName = extractParameter(String.class, 2, additionalInfo);
  154 + metaData.putValue("unassignedEdgeId", strEdgeId);
  155 + metaData.putValue("unassignedEdgeName", strEdgeName);
  156 + }
  157 + ObjectNode entityNode;
  158 + if (entity != null) {
  159 + entityNode = json.valueToTree(entity);
  160 + if (entityId.getEntityType() == EntityType.DASHBOARD) {
  161 + entityNode.put("configuration", "");
  162 + }
  163 + } else {
  164 + entityNode = json.createObjectNode();
  165 + if (actionType == ActionType.ATTRIBUTES_UPDATED) {
  166 + String scope = extractParameter(String.class, 0, additionalInfo);
  167 + @SuppressWarnings("unchecked")
  168 + List<AttributeKvEntry> attributes = extractParameter(List.class, 1, additionalInfo);
  169 + metaData.putValue(DataConstants.SCOPE, scope);
  170 + if (attributes != null) {
  171 + for (AttributeKvEntry attr : attributes) {
  172 + addKvEntry(entityNode, attr);
  173 + }
  174 + }
  175 + } else if (actionType == ActionType.ATTRIBUTES_DELETED) {
  176 + String scope = extractParameter(String.class, 0, additionalInfo);
  177 + @SuppressWarnings("unchecked")
  178 + List<String> keys = extractParameter(List.class, 1, additionalInfo);
  179 + metaData.putValue(DataConstants.SCOPE, scope);
  180 + ArrayNode attrsArrayNode = entityNode.putArray("attributes");
  181 + if (keys != null) {
  182 + keys.forEach(attrsArrayNode::add);
  183 + }
  184 + } else if (actionType == ActionType.TIMESERIES_UPDATED) {
  185 + @SuppressWarnings("unchecked")
  186 + List<TsKvEntry> timeseries = extractParameter(List.class, 0, additionalInfo);
  187 + addTimeseries(entityNode, timeseries);
  188 + } else if (actionType == ActionType.TIMESERIES_DELETED) {
  189 + @SuppressWarnings("unchecked")
  190 + List<String> keys = extractParameter(List.class, 0, additionalInfo);
  191 + if (keys != null) {
  192 + ArrayNode timeseriesArrayNode = entityNode.putArray("timeseries");
  193 + keys.forEach(timeseriesArrayNode::add);
  194 + }
  195 + entityNode.put("startTs", extractParameter(Long.class, 1, additionalInfo));
  196 + entityNode.put("endTs", extractParameter(Long.class, 2, additionalInfo));
  197 + }
  198 + }
  199 + TbMsg tbMsg = TbMsg.newMsg(msgType, entityId, customerId, metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode));
  200 + if (tenantId.isNullUid()) {
  201 + if (entity instanceof HasTenantId) {
  202 + tenantId = ((HasTenantId) entity).getTenantId();
  203 + }
  204 + }
  205 + tbClusterService.pushMsgToRuleEngine(tenantId, entityId, tbMsg, null);
  206 + } catch (Exception e) {
  207 + log.warn("[{}] Failed to push entity action to rule engine: {}", entityId, actionType, e);
  208 + }
  209 + }
  210 + }
  211 +
  212 +
  213 + private <T> T extractParameter(Class<T> clazz, int index, Object... additionalInfo) {
  214 + T result = null;
  215 + if (additionalInfo != null && additionalInfo.length > index) {
  216 + Object paramObject = additionalInfo[index];
  217 + if (clazz.isInstance(paramObject)) {
  218 + result = clazz.cast(paramObject);
  219 + }
  220 + }
  221 + return result;
  222 + }
  223 +
  224 + private void addTimeseries(ObjectNode entityNode, List<TsKvEntry> timeseries) throws Exception {
  225 + if (timeseries != null && !timeseries.isEmpty()) {
  226 + ArrayNode result = entityNode.putArray("timeseries");
  227 + Map<Long, List<TsKvEntry>> groupedTelemetry = timeseries.stream()
  228 + .collect(Collectors.groupingBy(TsKvEntry::getTs));
  229 + for (Map.Entry<Long, List<TsKvEntry>> entry : groupedTelemetry.entrySet()) {
  230 + ObjectNode element = json.createObjectNode();
  231 + element.put("ts", entry.getKey());
  232 + ObjectNode values = element.putObject("values");
  233 + for (TsKvEntry tsKvEntry : entry.getValue()) {
  234 + addKvEntry(values, tsKvEntry);
  235 + }
  236 + result.add(element);
  237 + }
  238 + }
  239 + }
  240 +
  241 + private void addKvEntry(ObjectNode entityNode, KvEntry kvEntry) throws Exception {
  242 + if (kvEntry.getDataType() == DataType.BOOLEAN) {
  243 + kvEntry.getBooleanValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
  244 + } else if (kvEntry.getDataType() == DataType.DOUBLE) {
  245 + kvEntry.getDoubleValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
  246 + } else if (kvEntry.getDataType() == DataType.LONG) {
  247 + kvEntry.getLongValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
  248 + } else if (kvEntry.getDataType() == DataType.JSON) {
  249 + if (kvEntry.getJsonValue().isPresent()) {
  250 + entityNode.set(kvEntry.getKey(), json.readTree(kvEntry.getJsonValue().get()));
  251 + }
  252 + } else {
  253 + entityNode.put(kvEntry.getKey(), kvEntry.getValueAsString());
  254 + }
  255 + }
  256 +}
... ...
... ... @@ -17,9 +17,9 @@ package org.thingsboard.server.service.ttl;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.beans.factory.annotation.Value;
20   -import org.thingsboard.server.dao.util.PsqlDao;
21 20
22 21 import java.sql.Connection;
  22 +import java.sql.DriverManager;
23 23 import java.sql.ResultSet;
24 24 import java.sql.SQLException;
25 25 import java.sql.SQLWarning;
... ... @@ -62,4 +62,8 @@ public abstract class AbstractCleanUpService {
62 62
63 63 protected abstract void doCleanUp(Connection connection) throws SQLException;
64 64
  65 + protected Connection getConnection() throws SQLException {
  66 + return DriverManager.getConnection(dbUrl, dbUserName, dbPassword);
  67 + }
  68 +
65 69 }
... ...
... ... @@ -20,17 +20,27 @@ import lombok.extern.slf4j.Slf4j;
20 20 import org.springframework.beans.factory.annotation.Value;
21 21 import org.springframework.scheduling.annotation.Scheduled;
22 22 import org.springframework.stereotype.Service;
  23 +import org.thingsboard.server.common.data.alarm.Alarm;
  24 +import org.thingsboard.server.common.data.audit.ActionType;
  25 +import org.thingsboard.server.common.data.id.AlarmId;
23 26 import org.thingsboard.server.common.data.id.TenantId;
24 27 import org.thingsboard.server.common.data.page.PageData;
25 28 import org.thingsboard.server.common.data.page.PageLink;
26 29 import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
27 30 import org.thingsboard.server.common.msg.queue.ServiceType;
28 31 import org.thingsboard.server.dao.alarm.AlarmDao;
  32 +import org.thingsboard.server.dao.alarm.AlarmService;
  33 +import org.thingsboard.server.dao.relation.RelationService;
29 34 import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
30 35 import org.thingsboard.server.dao.tenant.TenantDao;
31 36 import org.thingsboard.server.dao.util.PsqlDao;
32 37 import org.thingsboard.server.queue.discovery.PartitionService;
  38 +import org.thingsboard.server.service.action.RuleEngineEntityActionService;
  39 +import org.thingsboard.server.service.ttl.AbstractCleanUpService;
33 40
  41 +import java.sql.Connection;
  42 +import java.sql.SQLException;
  43 +import java.util.Date;
34 44 import java.util.Optional;
35 45 import java.util.UUID;
36 46 import java.util.concurrent.TimeUnit;
... ... @@ -43,37 +53,54 @@ public class AlarmsCleanUpService {
43 53 @Value("${sql.ttl.alarms.removal_batch_size}")
44 54 private Integer removalBatchSize;
45 55
46   - private final AlarmDao alarmDao;
47 56 private final TenantDao tenantDao;
  57 + private final AlarmDao alarmDao;
  58 + private final AlarmService alarmService;
  59 + private final RelationService relationService;
  60 + private final RuleEngineEntityActionService ruleEngineEntityActionService;
48 61 private final PartitionService partitionService;
49 62 private final TbTenantProfileCache tenantProfileCache;
50 63
51   - @Scheduled(initialDelayString = "${sql.ttl.alarms.checking_interval}", fixedDelayString = "${sql.ttl.alarms.checking_interval}")
  64 + @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.alarms.checking_interval})}", fixedDelayString = "${sql.ttl.alarms.checking_interval}")
52 65 public void cleanUp() {
53   - PageLink tenantsBatchRequest = new PageLink(65536, 0);
54   - PageLink alarmsRemovalBatchRequest = new PageLink(removalBatchSize, 0);
55   - long currentTime = System.currentTimeMillis();
56   -
  66 + PageLink tenantsBatchRequest = new PageLink(10_000, 0);
  67 + PageLink removalBatchRequest = new PageLink(removalBatchSize, 0);
57 68 PageData<TenantId> tenantsIds;
58 69 do {
59 70 tenantsIds = tenantDao.findTenantsIds(tenantsBatchRequest);
60   - tenantsIds.getData().stream()
61   - .filter(tenantId -> partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition())
62   - .forEach(tenantId -> {
63   - Optional<DefaultTenantProfileConfiguration> tenantProfileConfiguration = tenantProfileCache.get(tenantId).getProfileConfiguration();
64   - if (tenantProfileConfiguration.isEmpty() || tenantProfileConfiguration.get().getAlarmsTtlDays() == 0) {
65   - return;
66   - }
  71 + for (TenantId tenantId : tenantsIds.getData()) {
  72 + if (!partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) {
  73 + continue;
  74 + }
  75 +
  76 + Optional<DefaultTenantProfileConfiguration> tenantProfileConfiguration = tenantProfileCache.get(tenantId).getProfileConfiguration();
  77 + if (tenantProfileConfiguration.isEmpty() || tenantProfileConfiguration.get().getAlarmsTtlDays() == 0) {
  78 + continue;
  79 + }
67 80
68   - PageData<UUID> toRemove;
69   - long outdatageTime = currentTime - TimeUnit.DAYS.toMillis(tenantProfileConfiguration.get().getAlarmsTtlDays());
70   - log.info("Cleaning up outdated alarms for tenant {}", tenantId);
71   - do {
72   - toRemove = alarmDao.findAlarmsIdsByEndTsBeforeAndTenantId(outdatageTime, tenantId, alarmsRemovalBatchRequest);
73   - alarmDao.removeAllByIds(toRemove.getData());
74   - } while (toRemove.hasNext());
  81 + long ttl = TimeUnit.DAYS.toMillis(tenantProfileConfiguration.get().getAlarmsTtlDays());
  82 + long outdatageTime = System.currentTimeMillis() - ttl;
  83 +
  84 + long totalRemoved = 0;
  85 + while (true) {
  86 + PageData<AlarmId> toRemove = alarmDao.findAlarmsIdsByEndTsBeforeAndTenantId(outdatageTime, tenantId, removalBatchRequest);
  87 + toRemove.getData().forEach(alarmId -> {
  88 + relationService.deleteEntityRelations(tenantId, alarmId);
  89 + Alarm alarm = alarmService.deleteAlarm(tenantId, alarmId).getAlarm();
  90 + ruleEngineEntityActionService.pushEntityActionToRuleEngine(alarm.getOriginator(), alarm, tenantId, null, ActionType.ALARM_DELETE, null);
75 91 });
76 92
  93 + totalRemoved += toRemove.getTotalElements();
  94 + if (!toRemove.hasNext()) {
  95 + break;
  96 + }
  97 + }
  98 +
  99 + if (totalRemoved > 0) {
  100 + log.info("Removed {} outdated alarm(s) for tenant {} older than {}", totalRemoved, tenantId, new Date(outdatageTime));
  101 + }
  102 + }
  103 +
77 104 tenantsBatchRequest = tenantsBatchRequest.nextPageLink();
78 105 } while (tenantsIds.hasNext());
79 106 }
... ...
... ... @@ -40,7 +40,7 @@ public class EdgeEventsCleanUpService extends AbstractCleanUpService {
40 40 @Scheduled(initialDelayString = "${sql.ttl.edge_events.execution_interval_ms}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}")
41 41 public void cleanUp() {
42 42 if (ttlTaskExecutionEnabled) {
43   - try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
  43 + try (Connection conn = getConnection()) {
44 44 doCleanUp(conn);
45 45 } catch (SQLException e) {
46 46 log.error("SQLException occurred during TTL task execution ", e);
... ...
... ... @@ -43,7 +43,7 @@ public class EventsCleanUpService extends AbstractCleanUpService {
43 43 @Scheduled(initialDelayString = "${sql.ttl.events.execution_interval_ms}", fixedDelayString = "${sql.ttl.events.execution_interval_ms}")
44 44 public void cleanUp() {
45 45 if (ttlTaskExecutionEnabled) {
46   - try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
  46 + try (Connection conn = getConnection()) {
47 47 doCleanUp(conn);
48 48 } catch (SQLException e) {
49 49 log.error("SQLException occurred during TTL task execution ", e);
... ...
... ... @@ -36,7 +36,7 @@ public abstract class AbstractTimeseriesCleanUpService extends AbstractCleanUpSe
36 36 @Scheduled(initialDelayString = "${sql.ttl.ts.execution_interval_ms}", fixedDelayString = "${sql.ttl.ts.execution_interval_ms}")
37 37 public void cleanUp() {
38 38 if (ttlTaskExecutionEnabled) {
39   - try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
  39 + try (Connection conn = getConnection()) {
40 40 doCleanUp(conn);
41 41 } catch (SQLException e) {
42 42 log.error("SQLException occurred during TTL task execution ", e);
... ...
... ... @@ -275,7 +275,7 @@ sql:
275 275 edge_events_ttl: "${SQL_TTL_EDGE_EVENTS_TTL:2628000}" # Number of seconds. The current value corresponds to one month
276 276 alarms:
277 277 checking_interval: "${SQL_ALARMS_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours
278   - removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:200}" # To delete outdated alarms not all at once but in batches
  278 + removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:3000}" # To delete outdated alarms not all at once but in batches
279 279
280 280 # Actor system parameters
281 281 actors:
... ...
... ... @@ -66,6 +66,7 @@ public class DataConstants {
66 66 public static final String TIMESERIES_DELETED = "TIMESERIES_DELETED";
67 67 public static final String ALARM_ACK = "ALARM_ACK";
68 68 public static final String ALARM_CLEAR = "ALARM_CLEAR";
  69 + public static final String ALARM_DELETE = "ALARM_DELETE";
69 70 public static final String ENTITY_ASSIGNED_FROM_TENANT = "ENTITY_ASSIGNED_FROM_TENANT";
70 71 public static final String ENTITY_ASSIGNED_TO_TENANT = "ENTITY_ASSIGNED_TO_TENANT";
71 72 public static final String PROVISION_SUCCESS = "PROVISION_SUCCESS";
... ...
... ... @@ -93,6 +93,7 @@ public class TenantProfile extends SearchTextBased<TenantProfileId> implements H
93 93 }
94 94 }
95 95
  96 + @JsonIgnore
96 97 public Optional<DefaultTenantProfileConfiguration> getProfileConfiguration() {
97 98 return Optional.ofNullable(getProfileData().getConfiguration())
98 99 .filter(profileConfiguration -> profileConfiguration instanceof DefaultTenantProfileConfiguration)
... ...
... ... @@ -39,6 +39,7 @@ public enum ActionType {
39 39 RELATIONS_DELETED(false),
40 40 ALARM_ACK(false),
41 41 ALARM_CLEAR(false),
  42 + ALARM_DELETE(false),
42 43 LOGIN(false),
43 44 LOGOUT(false),
44 45 LOCKOUT(false),
... ...
... ... @@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.alarm.AlarmInfo;
21 21 import org.thingsboard.server.common.data.alarm.AlarmQuery;
22 22 import org.thingsboard.server.common.data.alarm.AlarmSeverity;
23 23 import org.thingsboard.server.common.data.alarm.AlarmStatus;
  24 +import org.thingsboard.server.common.data.id.AlarmId;
24 25 import org.thingsboard.server.common.data.id.CustomerId;
25 26 import org.thingsboard.server.common.data.id.EntityId;
26 27 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -56,6 +57,6 @@ public interface AlarmDao extends Dao<Alarm> {
56 57
57 58 Set<AlarmSeverity> findAlarmSeverities(TenantId tenantId, EntityId entityId, Set<AlarmStatus> status);
58 59
59   - PageData<UUID> findAlarmsIdsByEndTsBeforeAndTenantId(Long time, TenantId tenantId, PageLink pageLink);
  60 + PageData<AlarmId> findAlarmsIdsByEndTsBeforeAndTenantId(Long time, TenantId tenantId, PageLink pageLink);
60 61
61 62 }
... ...
... ... @@ -23,6 +23,7 @@ import org.springframework.data.repository.CrudRepository;
23 23 import org.springframework.data.repository.query.Param;
24 24 import org.thingsboard.server.common.data.alarm.AlarmSeverity;
25 25 import org.thingsboard.server.common.data.alarm.AlarmStatus;
  26 +import org.thingsboard.server.common.data.id.TenantId;
26 27 import org.thingsboard.server.dao.model.sql.AlarmEntity;
27 28 import org.thingsboard.server.dao.model.sql.AlarmInfoEntity;
28 29
... ... @@ -161,7 +162,7 @@ public interface AlarmRepository extends CrudRepository<AlarmEntity, UUID> {
161 162 @Param("affectedEntityType") String affectedEntityType,
162 163 @Param("alarmStatuses") Set<AlarmStatus> alarmStatuses);
163 164
164   - @Query("SELECT a.id FROM AlarmEntity a WHERE a.createdTime < :time AND a.endTs < :time")
165   - Page<UUID> findAlarmsIdsByEndTsBefore(@Param("time") Long time, Pageable pageable);
  165 + @Query("SELECT a.id FROM AlarmEntity a WHERE a.tenantId = :tenantId AND a.createdTime < :time AND a.endTs < :time")
  166 + Page<UUID> findAlarmsIdsByEndTsBeforeAndTenantId(@Param("time") Long time, @Param("tenantId") UUID tenantId, Pageable pageable);
166 167
167 168 }
... ...
... ... @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.alarm.AlarmInfo;
26 26 import org.thingsboard.server.common.data.alarm.AlarmQuery;
27 27 import org.thingsboard.server.common.data.alarm.AlarmSeverity;
28 28 import org.thingsboard.server.common.data.alarm.AlarmStatus;
  29 +import org.thingsboard.server.common.data.id.AlarmId;
29 30 import org.thingsboard.server.common.data.id.CustomerId;
30 31 import org.thingsboard.server.common.data.id.EntityId;
31 32 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -164,7 +165,8 @@ public class JpaAlarmDao extends JpaAbstractDao<AlarmEntity, Alarm> implements A
164 165 }
165 166
166 167 @Override
167   - public PageData<UUID> findAlarmsIdsByEndTsBeforeAndTenantId(Long time, TenantId tenantId, PageLink pageLink) {
168   - return DaoUtil.pageToPageData(alarmRepository.findAlarmsIdsByEndTsBefore(time, DaoUtil.toPageable(pageLink)));
  168 + public PageData<AlarmId> findAlarmsIdsByEndTsBeforeAndTenantId(Long time, TenantId tenantId, PageLink pageLink) {
  169 + return DaoUtil.pageToPageData(alarmRepository.findAlarmsIdsByEndTsBeforeAndTenantId(time, tenantId.getId(), DaoUtil.toPageable(pageLink)))
  170 + .mapData(AlarmId::new);
169 171 }
170 172 }
... ...
... ... @@ -150,6 +150,8 @@ class DeviceState {
150 150 stateChanged = processAlarmClearNotification(ctx, msg);
151 151 } else if (msg.getType().equals(DataConstants.ALARM_ACK)) {
152 152 processAlarmAckNotification(ctx, msg);
  153 + } else if (msg.getType().equals(DataConstants.ALARM_DELETE)) {
  154 + processAlarmDeleteNotification(ctx, msg);
153 155 } else {
154 156 if (msg.getType().equals(DataConstants.ENTITY_ASSIGNED) || msg.getType().equals(DataConstants.ENTITY_UNASSIGNED)) {
155 157 dynamicPredicateValueCtx.resetCustomer();
... ... @@ -193,6 +195,12 @@ class DeviceState {
193 195 ctx.tellSuccess(msg);
194 196 }
195 197
  198 + private void processAlarmDeleteNotification(TbContext ctx, TbMsg msg) {
  199 + Alarm alarm = JacksonUtil.fromString(msg.getData(), Alarm.class);
  200 + alarmStates.values().removeIf(alarmState -> alarmState.getCurrentAlarm().getId().equals(alarm.getId()));
  201 + ctx.tellSuccess(msg);
  202 + }
  203 +
196 204 private boolean processAttributesUpdateNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
197 205 String scope = msg.getMetaData().getValue(DataConstants.SCOPE);
198 206 if (StringUtils.isEmpty(scope)) {
... ...