Showing
13 changed files
with
138 additions
and
2 deletions
application/src/main/java/org/thingsboard/server/service/ttl/alarms/AlarmsCleanUpService.java
0 → 100644
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.service.ttl.alarms; | |
17 | + | |
18 | +import lombok.RequiredArgsConstructor; | |
19 | +import lombok.extern.slf4j.Slf4j; | |
20 | +import org.springframework.beans.factory.annotation.Value; | |
21 | +import org.springframework.scheduling.annotation.Scheduled; | |
22 | +import org.springframework.stereotype.Service; | |
23 | +import org.thingsboard.server.common.data.id.TenantId; | |
24 | +import org.thingsboard.server.common.data.page.PageData; | |
25 | +import org.thingsboard.server.common.data.page.PageLink; | |
26 | +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; | |
27 | +import org.thingsboard.server.common.msg.queue.ServiceType; | |
28 | +import org.thingsboard.server.dao.alarm.AlarmDao; | |
29 | +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; | |
30 | +import org.thingsboard.server.dao.tenant.TenantDao; | |
31 | +import org.thingsboard.server.dao.util.PsqlDao; | |
32 | +import org.thingsboard.server.queue.discovery.PartitionService; | |
33 | + | |
34 | +import java.util.Optional; | |
35 | +import java.util.UUID; | |
36 | +import java.util.concurrent.TimeUnit; | |
37 | + | |
38 | +@PsqlDao | |
39 | +@Service | |
40 | +@Slf4j | |
41 | +@RequiredArgsConstructor | |
42 | +public class AlarmsCleanUpService { | |
43 | + @Value("${sql.ttl.alarms.removal_batch_size}") | |
44 | + private Integer removalBatchSize; | |
45 | + | |
46 | + private final AlarmDao alarmDao; | |
47 | + private final TenantDao tenantDao; | |
48 | + private final PartitionService partitionService; | |
49 | + private final TbTenantProfileCache tenantProfileCache; | |
50 | + | |
51 | + @Scheduled(initialDelayString = "${sql.ttl.alarms.checking_interval}", fixedDelayString = "${sql.ttl.alarms.checking_interval}") | |
52 | + public void cleanUp() { | |
53 | + if (!partitionService.resolve(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition()) { | |
54 | + return; | |
55 | + } | |
56 | + | |
57 | + PageLink tenantsBatchRequest = new PageLink(65536, 0); | |
58 | + PageLink alarmsRemovalBatchRequest = new PageLink(removalBatchSize, 0); | |
59 | + long currentTime = System.currentTimeMillis(); | |
60 | + | |
61 | + PageData<TenantId> tenantsIds; | |
62 | + do { | |
63 | + tenantsIds = tenantDao.findTenantsIds(tenantsBatchRequest); | |
64 | + tenantsIds.getData().forEach(tenantId -> { | |
65 | + Optional<DefaultTenantProfileConfiguration> tenantProfileConfiguration = tenantProfileCache.get(tenantId).getProfileConfiguration(); | |
66 | + if (tenantProfileConfiguration.isEmpty() || tenantProfileConfiguration.get().getAlarmsTtlDays() == 0) { | |
67 | + return; | |
68 | + } | |
69 | + | |
70 | + PageData<UUID> toRemove; | |
71 | + long outdatageTime = currentTime - TimeUnit.DAYS.toMillis(tenantProfileConfiguration.get().getAlarmsTtlDays()); | |
72 | + log.info("Cleaning up outdated alarms for tenant {}", tenantId); | |
73 | + do { | |
74 | + toRemove = alarmDao.findAlarmsIdsByEndTsBeforeAndTenantId(outdatageTime, tenantId, alarmsRemovalBatchRequest); | |
75 | + alarmDao.removeAllByIds(toRemove.getData()); | |
76 | + } while (toRemove.hasNext()); | |
77 | + }); | |
78 | + | |
79 | + tenantsBatchRequest = tenantsBatchRequest.nextPageLink(); | |
80 | + } while (tenantsIds.hasNext()); | |
81 | + } | |
82 | + | |
83 | +} | ... | ... |
... | ... | @@ -273,6 +273,9 @@ sql: |
273 | 273 | enabled: "${SQL_TTL_EDGE_EVENTS_ENABLED:true}" |
274 | 274 | execution_interval_ms: "${SQL_TTL_EDGE_EVENTS_EXECUTION_INTERVAL:86400000}" # Number of milliseconds. The current value corresponds to one day |
275 | 275 | edge_events_ttl: "${SQL_TTL_EDGE_EVENTS_TTL:2628000}" # Number of seconds. The current value corresponds to one month |
276 | + alarms: | |
277 | + checking_interval: "${SQL_ALARMS_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours | |
278 | + removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:200}" # To delete outdated alarms not all at once but in batches | |
276 | 279 | |
277 | 280 | # Actor system parameters |
278 | 281 | actors: | ... | ... |
... | ... | @@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.validation.NoXss; |
27 | 27 | |
28 | 28 | import java.io.ByteArrayInputStream; |
29 | 29 | import java.io.IOException; |
30 | +import java.util.Optional; | |
30 | 31 | |
31 | 32 | import static org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo.mapper; |
32 | 33 | |
... | ... | @@ -92,6 +93,12 @@ public class TenantProfile extends SearchTextBased<TenantProfileId> implements H |
92 | 93 | } |
93 | 94 | } |
94 | 95 | |
96 | + public Optional<DefaultTenantProfileConfiguration> getProfileConfiguration() { | |
97 | + return Optional.ofNullable(getProfileData().getConfiguration()) | |
98 | + .filter(profileConfiguration -> profileConfiguration instanceof DefaultTenantProfileConfiguration) | |
99 | + .map(profileConfiguration -> (DefaultTenantProfileConfiguration) profileConfiguration); | |
100 | + } | |
101 | + | |
95 | 102 | public TenantProfileData createDefaultTenantProfileData() { |
96 | 103 | TenantProfileData tpd = new TenantProfileData(); |
97 | 104 | tpd.setConfiguration(new DefaultTenantProfileConfiguration()); | ... | ... |
... | ... | @@ -17,10 +17,11 @@ package org.thingsboard.server.common.data.page; |
17 | 17 | |
18 | 18 | import com.fasterxml.jackson.annotation.JsonCreator; |
19 | 19 | import com.fasterxml.jackson.annotation.JsonProperty; |
20 | -import org.thingsboard.server.common.data.BaseData; | |
21 | 20 | |
22 | 21 | import java.util.Collections; |
23 | 22 | import java.util.List; |
23 | +import java.util.function.Function; | |
24 | +import java.util.stream.Collectors; | |
24 | 25 | |
25 | 26 | public class PageData<T> { |
26 | 27 | |
... | ... | @@ -61,4 +62,8 @@ public class PageData<T> { |
61 | 62 | return hasNext; |
62 | 63 | } |
63 | 64 | |
65 | + public <D> PageData<D> mapData(Function<T, D> mapper) { | |
66 | + return new PageData<>(getData().stream().map(mapper).collect(Collectors.toList()), getTotalPages(), getTotalElements(), hasNext()); | |
67 | + } | |
68 | + | |
64 | 69 | } | ... | ... |
... | ... | @@ -18,6 +18,7 @@ package org.thingsboard.server.dao; |
18 | 18 | import com.google.common.util.concurrent.ListenableFuture; |
19 | 19 | import org.thingsboard.server.common.data.id.TenantId; |
20 | 20 | |
21 | +import java.util.Collection; | |
21 | 22 | import java.util.List; |
22 | 23 | import java.util.UUID; |
23 | 24 | |
... | ... | @@ -33,4 +34,6 @@ public interface Dao<T> { |
33 | 34 | |
34 | 35 | boolean removeById(TenantId tenantId, UUID id); |
35 | 36 | |
37 | + void removeAllByIds(Collection<UUID> ids); | |
38 | + | |
36 | 39 | } | ... | ... |
... | ... | @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.CustomerId; |
25 | 25 | import org.thingsboard.server.common.data.id.EntityId; |
26 | 26 | import org.thingsboard.server.common.data.id.TenantId; |
27 | 27 | import org.thingsboard.server.common.data.page.PageData; |
28 | +import org.thingsboard.server.common.data.page.PageLink; | |
28 | 29 | import org.thingsboard.server.common.data.query.AlarmData; |
29 | 30 | import org.thingsboard.server.common.data.query.AlarmDataQuery; |
30 | 31 | import org.thingsboard.server.dao.Dao; |
... | ... | @@ -54,4 +55,7 @@ public interface AlarmDao extends Dao<Alarm> { |
54 | 55 | AlarmDataQuery query, Collection<EntityId> orderedEntityIds); |
55 | 56 | |
56 | 57 | Set<AlarmSeverity> findAlarmSeverities(TenantId tenantId, EntityId entityId, Set<AlarmStatus> status); |
58 | + | |
59 | + PageData<UUID> findAlarmsIdsByEndTsBeforeAndTenantId(Long time, TenantId tenantId, PageLink pageLink); | |
60 | + | |
57 | 61 | } | ... | ... |
... | ... | @@ -26,6 +26,7 @@ import org.thingsboard.server.dao.Dao; |
26 | 26 | import org.thingsboard.server.dao.DaoUtil; |
27 | 27 | import org.thingsboard.server.dao.model.BaseEntity; |
28 | 28 | |
29 | +import java.util.Collection; | |
29 | 30 | import java.util.List; |
30 | 31 | import java.util.Optional; |
31 | 32 | import java.util.UUID; |
... | ... | @@ -87,6 +88,12 @@ public abstract class JpaAbstractDao<E extends BaseEntity<D>, D> |
87 | 88 | return !getCrudRepository().existsById(id); |
88 | 89 | } |
89 | 90 | |
91 | + @Transactional | |
92 | + public void removeAllByIds(Collection<UUID> ids) { | |
93 | + CrudRepository<E, UUID> repository = getCrudRepository(); | |
94 | + ids.forEach(repository::deleteById); | |
95 | + } | |
96 | + | |
90 | 97 | @Override |
91 | 98 | public List<D> find(TenantId tenantId) { |
92 | 99 | List<E> entities = Lists.newArrayList(getCrudRepository().findAll()); | ... | ... |
... | ... | @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql.alarm; |
17 | 17 | |
18 | 18 | import org.springframework.data.domain.Page; |
19 | 19 | import org.springframework.data.domain.Pageable; |
20 | +import org.springframework.data.jpa.repository.Modifying; | |
20 | 21 | import org.springframework.data.jpa.repository.Query; |
21 | 22 | import org.springframework.data.repository.CrudRepository; |
22 | 23 | import org.springframework.data.repository.query.Param; |
... | ... | @@ -159,4 +160,8 @@ public interface AlarmRepository extends CrudRepository<AlarmEntity, UUID> { |
159 | 160 | @Param("affectedEntityId") UUID affectedEntityId, |
160 | 161 | @Param("affectedEntityType") String affectedEntityType, |
161 | 162 | @Param("alarmStatuses") Set<AlarmStatus> alarmStatuses); |
163 | + | |
164 | + @Query("SELECT a.id FROM AlarmEntity a WHERE a.createdTime < :time AND a.endTs < :time") | |
165 | + Page<UUID> findAlarmsIdsByEndTsBefore(@Param("time") Long time, Pageable pageable); | |
166 | + | |
162 | 167 | } | ... | ... |
... | ... | @@ -30,6 +30,7 @@ import org.thingsboard.server.common.data.id.CustomerId; |
30 | 30 | import org.thingsboard.server.common.data.id.EntityId; |
31 | 31 | import org.thingsboard.server.common.data.id.TenantId; |
32 | 32 | import org.thingsboard.server.common.data.page.PageData; |
33 | +import org.thingsboard.server.common.data.page.PageLink; | |
33 | 34 | import org.thingsboard.server.common.data.query.AlarmData; |
34 | 35 | import org.thingsboard.server.common.data.query.AlarmDataQuery; |
35 | 36 | import org.thingsboard.server.dao.DaoUtil; |
... | ... | @@ -161,4 +162,9 @@ public class JpaAlarmDao extends JpaAbstractDao<AlarmEntity, Alarm> implements A |
161 | 162 | public Set<AlarmSeverity> findAlarmSeverities(TenantId tenantId, EntityId entityId, Set<AlarmStatus> statuses) { |
162 | 163 | return alarmRepository.findAlarmSeverities(tenantId.getId(), entityId.getId(), entityId.getEntityType().name(), statuses); |
163 | 164 | } |
165 | + | |
166 | + @Override | |
167 | + public PageData<UUID> findAlarmsIdsByEndTsBeforeAndTenantId(Long time, TenantId tenantId, PageLink pageLink) { | |
168 | + return DaoUtil.pageToPageData(alarmRepository.findAlarmsIdsByEndTsBefore(time, DaoUtil.toPageable(pageLink))); | |
169 | + } | |
164 | 170 | } | ... | ... |
... | ... | @@ -74,4 +74,10 @@ public class JpaTenantDao extends JpaAbstractSearchTextDao<TenantEntity, Tenant> |
74 | 74 | Objects.toString(pageLink.getTextSearch(), ""), |
75 | 75 | DaoUtil.toPageable(pageLink, TenantInfoEntity.tenantInfoColumnMap))); |
76 | 76 | } |
77 | + | |
78 | + @Override | |
79 | + public PageData<TenantId> findTenantsIds(PageLink pageLink) { | |
80 | + return DaoUtil.pageToPageData(tenantRepository.findTenantsIds(DaoUtil.toPageable(pageLink))).mapData(TenantId::new); | |
81 | + } | |
82 | + | |
77 | 83 | } | ... | ... |
... | ... | @@ -50,4 +50,8 @@ public interface TenantRepository extends PagingAndSortingRepository<TenantEntit |
50 | 50 | Page<TenantInfoEntity> findTenantInfoByRegionNextPage(@Param("region") String region, |
51 | 51 | @Param("textSearch") String textSearch, |
52 | 52 | Pageable pageable); |
53 | + | |
54 | + @Query("SELECT t.id FROM TenantEntity t") | |
55 | + Page<UUID> findTenantsIds(Pageable pageable); | |
56 | + | |
53 | 57 | } | ... | ... |
... | ... | @@ -46,5 +46,7 @@ public interface TenantDao extends Dao<Tenant> { |
46 | 46 | PageData<Tenant> findTenantsByRegion(TenantId tenantId, String region, PageLink pageLink); |
47 | 47 | |
48 | 48 | PageData<TenantInfo> findTenantInfosByRegion(TenantId tenantId, String region, PageLink pageLink); |
49 | - | |
49 | + | |
50 | + PageData<TenantId> findTenantsIds(PageLink pageLink); | |
51 | + | |
50 | 52 | } | ... | ... |