Commit bafbf7b239386a86077f32abc67e33befc101a4c

Authored by YevhenBondarenko
1 parent 5626c30f

added rpc ttl

  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.ttl.rpc;
  17 +
  18 +import lombok.RequiredArgsConstructor;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.springframework.beans.factory.annotation.Value;
  21 +import org.springframework.scheduling.annotation.Scheduled;
  22 +import org.springframework.stereotype.Service;
  23 +import org.thingsboard.server.common.data.id.TenantId;
  24 +import org.thingsboard.server.common.data.page.PageData;
  25 +import org.thingsboard.server.common.data.page.PageLink;
  26 +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
  27 +import org.thingsboard.server.common.msg.queue.ServiceType;
  28 +import org.thingsboard.server.dao.rpc.RpcDao;
  29 +import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
  30 +import org.thingsboard.server.dao.tenant.TenantDao;
  31 +import org.thingsboard.server.queue.discovery.PartitionService;
  32 +import org.thingsboard.server.queue.util.TbCoreComponent;
  33 +
  34 +import java.util.Date;
  35 +import java.util.Optional;
  36 +import java.util.concurrent.TimeUnit;
  37 +
  38 +@TbCoreComponent
  39 +@Service
  40 +@Slf4j
  41 +@RequiredArgsConstructor
  42 +public class RpcCleanUpService {
  43 + @Value("${sql.ttl.rpc.enabled}")
  44 + private boolean ttlTaskExecutionEnabled;
  45 +
  46 + private final TenantDao tenantDao;
  47 + private final PartitionService partitionService;
  48 + private final TbTenantProfileCache tenantProfileCache;
  49 + private final RpcDao rpcDao;
  50 +
  51 + @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.rpc.checking_interval})}", fixedDelayString = "${sql.ttl.rpc.checking_interval}")
  52 + public void cleanUp() {
  53 + if (ttlTaskExecutionEnabled) {
  54 + PageLink tenantsBatchRequest = new PageLink(10_000, 0);
  55 + PageData<TenantId> tenantsIds;
  56 + do {
  57 + tenantsIds = tenantDao.findTenantsIds(tenantsBatchRequest);
  58 + for (TenantId tenantId : tenantsIds.getData()) {
  59 + if (!partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) {
  60 + continue;
  61 + }
  62 +
  63 + Optional<DefaultTenantProfileConfiguration> tenantProfileConfiguration = tenantProfileCache.get(tenantId).getProfileConfiguration();
  64 + if (tenantProfileConfiguration.isEmpty() || tenantProfileConfiguration.get().getRpcTtlDays() == 0) {
  65 + continue;
  66 + }
  67 +
  68 + long ttl = TimeUnit.DAYS.toMillis(tenantProfileConfiguration.get().getRpcTtlDays());
  69 + long expirationTime = System.currentTimeMillis() - ttl;
  70 +
  71 + long totalRemoved = rpcDao.deleteOutdatedRpcByTenantId(tenantId, expirationTime);
  72 +
  73 + if (totalRemoved > 0) {
  74 + log.info("Removed {} outdated rpc(s) for tenant {} older than {}", totalRemoved, tenantId, new Date(expirationTime));
  75 + }
  76 + }
  77 +
  78 + tenantsBatchRequest = tenantsBatchRequest.nextPageLink();
  79 + } while (tenantsIds.hasNext());
  80 + }
  81 + }
  82 +
  83 +}
@@ -276,6 +276,9 @@ sql: @@ -276,6 +276,9 @@ sql:
276 alarms: 276 alarms:
277 checking_interval: "${SQL_ALARMS_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours 277 checking_interval: "${SQL_ALARMS_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours
278 removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:3000}" # To delete outdated alarms not all at once but in batches 278 removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:3000}" # To delete outdated alarms not all at once but in batches
  279 + rpc:
  280 + enabled: "${SQL_TTL_RPC_ENABLED:true}"
  281 + checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours
279 282
280 # Actor system parameters 283 # Actor system parameters
281 actors: 284 actors:
@@ -56,6 +56,7 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura @@ -56,6 +56,7 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
56 56
57 private int defaultStorageTtlDays; 57 private int defaultStorageTtlDays;
58 private int alarmsTtlDays; 58 private int alarmsTtlDays;
  59 + private int rpcTtlDays;
59 60
60 private double warnThreshold; 61 private double warnThreshold;
61 62
@@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
16 package org.thingsboard.server.dao.rpc; 16 package org.thingsboard.server.dao.rpc;
17 17
18 import org.thingsboard.server.common.data.id.DeviceId; 18 import org.thingsboard.server.common.data.id.DeviceId;
  19 +import org.thingsboard.server.common.data.id.TenantId;
19 import org.thingsboard.server.common.data.page.PageData; 20 import org.thingsboard.server.common.data.page.PageData;
20 import org.thingsboard.server.common.data.page.PageLink; 21 import org.thingsboard.server.common.data.page.PageLink;
21 import org.thingsboard.server.common.data.rpc.Rpc; 22 import org.thingsboard.server.common.data.rpc.Rpc;
@@ -24,4 +25,6 @@ import org.thingsboard.server.dao.Dao; @@ -24,4 +25,6 @@ import org.thingsboard.server.dao.Dao;
24 25
25 public interface RpcDao extends Dao<Rpc> { 26 public interface RpcDao extends Dao<Rpc> {
26 PageData<Rpc> findAllByDeviceId(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink); 27 PageData<Rpc> findAllByDeviceId(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink);
  28 +
  29 + Long deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime);
27 } 30 }
@@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
20 import org.springframework.data.repository.CrudRepository; 20 import org.springframework.data.repository.CrudRepository;
21 import org.springframework.stereotype.Component; 21 import org.springframework.stereotype.Component;
22 import org.thingsboard.server.common.data.id.DeviceId; 22 import org.thingsboard.server.common.data.id.DeviceId;
  23 +import org.thingsboard.server.common.data.id.TenantId;
23 import org.thingsboard.server.common.data.page.PageData; 24 import org.thingsboard.server.common.data.page.PageData;
24 import org.thingsboard.server.common.data.page.PageLink; 25 import org.thingsboard.server.common.data.page.PageLink;
25 import org.thingsboard.server.common.data.rpc.Rpc; 26 import org.thingsboard.server.common.data.rpc.Rpc;
@@ -53,4 +54,9 @@ public class JpaRpcDao extends JpaAbstractDao<RpcEntity, Rpc> implements RpcDao @@ -53,4 +54,9 @@ public class JpaRpcDao extends JpaAbstractDao<RpcEntity, Rpc> implements RpcDao
53 public PageData<Rpc> findAllByDeviceId(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) { 54 public PageData<Rpc> findAllByDeviceId(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) {
54 return DaoUtil.toPageData(rpcRepository.findAllByDeviceIdAndStatus(deviceId.getId(), rpcStatus, DaoUtil.toPageable(pageLink))); 55 return DaoUtil.toPageData(rpcRepository.findAllByDeviceIdAndStatus(deviceId.getId(), rpcStatus, DaoUtil.toPageable(pageLink)));
55 } 56 }
  57 +
  58 + @Override
  59 + public Long deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime) {
  60 + return rpcRepository.deleteOutdatedRpcByTenantId(tenantId.getId(), expirationTime);
  61 + }
56 } 62 }
@@ -17,7 +17,9 @@ package org.thingsboard.server.dao.sql.rpc; @@ -17,7 +17,9 @@ package org.thingsboard.server.dao.sql.rpc;
17 17
18 import org.springframework.data.domain.Page; 18 import org.springframework.data.domain.Page;
19 import org.springframework.data.domain.Pageable; 19 import org.springframework.data.domain.Pageable;
  20 +import org.springframework.data.jpa.repository.Query;
20 import org.springframework.data.repository.CrudRepository; 21 import org.springframework.data.repository.CrudRepository;
  22 +import org.springframework.data.repository.query.Param;
21 import org.thingsboard.server.common.data.rpc.RpcStatus; 23 import org.thingsboard.server.common.data.rpc.RpcStatus;
22 import org.thingsboard.server.dao.model.sql.RpcEntity; 24 import org.thingsboard.server.dao.model.sql.RpcEntity;
23 25
@@ -25,4 +27,8 @@ import java.util.UUID; @@ -25,4 +27,8 @@ import java.util.UUID;
25 27
26 public interface RpcRepository extends CrudRepository<RpcEntity, UUID> { 28 public interface RpcRepository extends CrudRepository<RpcEntity, UUID> {
27 Page<RpcEntity> findAllByDeviceIdAndStatus(UUID deviceId, RpcStatus status, Pageable pageable); 29 Page<RpcEntity> findAllByDeviceIdAndStatus(UUID deviceId, RpcStatus status, Pageable pageable);
  30 +
  31 + @Query(value = "WITH deleted AS (DELETE FROM rpc WHERE (tenant_id = :tenantId AND created_time < :expirationTime) IS TRUE RETURNING *) SELECT count(*) FROM deleted",
  32 + nativeQuery = true)
  33 + Long deleteOutdatedRpcByTenantId(@Param("tenantId") UUID tenantId, @Param("expirationTime") Long expirationTime);
28 } 34 }
@@ -197,6 +197,18 @@ @@ -197,6 +197,18 @@
197 </mat-error> 197 </mat-error>
198 </mat-form-field> 198 </mat-form-field>
199 <mat-form-field class="mat-block"> 199 <mat-form-field class="mat-block">
  200 + <mat-label translate>tenant-profile.rpc-ttl-days</mat-label>
  201 + <input matInput required min="0" step="1"
  202 + formControlName="rpcTtlDays"
  203 + type="number">
  204 + <mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('rpcTtlDays').hasError('required')">
  205 + {{ 'tenant-profile.rpc-ttl-days-required' | translate}}
  206 + </mat-error>
  207 + <mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('rpcTtlDays').hasError('min')">
  208 + {{ 'tenant-profile.rpc-ttl-days-days-range' | translate}}
  209 + </mat-error>
  210 + </mat-form-field>
  211 + <mat-form-field class="mat-block">
200 <mat-label translate>tenant-profile.max-rule-node-executions-per-message</mat-label> 212 <mat-label translate>tenant-profile.max-rule-node-executions-per-message</mat-label>
201 <input matInput required min="0" step="1" 213 <input matInput required min="0" step="1"
202 formControlName="maxRuleNodeExecutionsPerMessage" 214 formControlName="maxRuleNodeExecutionsPerMessage"
@@ -77,7 +77,8 @@ export class DefaultTenantProfileConfigurationComponent implements ControlValueA @@ -77,7 +77,8 @@ export class DefaultTenantProfileConfigurationComponent implements ControlValueA
77 maxSms: [null, [Validators.required, Validators.min(0)]], 77 maxSms: [null, [Validators.required, Validators.min(0)]],
78 maxCreatedAlarms: [null, [Validators.required, Validators.min(0)]], 78 maxCreatedAlarms: [null, [Validators.required, Validators.min(0)]],
79 defaultStorageTtlDays: [null, [Validators.required, Validators.min(0)]], 79 defaultStorageTtlDays: [null, [Validators.required, Validators.min(0)]],
80 - alarmsTtlDays: [null, [Validators.required, Validators.min(0)]] 80 + alarmsTtlDays: [null, [Validators.required, Validators.min(0)]],
  81 + rpcTtlDays: [null, [Validators.required, Validators.min(0)]]
81 }); 82 });
82 this.defaultTenantProfileConfigurationFormGroup.valueChanges.subscribe(() => { 83 this.defaultTenantProfileConfigurationFormGroup.valueChanges.subscribe(() => {
83 this.updateModel(); 84 this.updateModel();
@@ -51,6 +51,8 @@ export interface DefaultTenantProfileConfiguration { @@ -51,6 +51,8 @@ export interface DefaultTenantProfileConfiguration {
51 maxSms: number; 51 maxSms: number;
52 52
53 defaultStorageTtlDays: number; 53 defaultStorageTtlDays: number;
  54 + alarmsTtlDays: number;
  55 + rpcTtlDays: number;
54 } 56 }
55 57
56 export type TenantProfileConfigurations = DefaultTenantProfileConfiguration; 58 export type TenantProfileConfigurations = DefaultTenantProfileConfiguration;
@@ -81,7 +83,9 @@ export function createTenantProfileConfiguration(type: TenantProfileType): Tenan @@ -81,7 +83,9 @@ export function createTenantProfileConfiguration(type: TenantProfileType): Tenan
81 maxRuleNodeExecutionsPerMessage: 0, 83 maxRuleNodeExecutionsPerMessage: 0,
82 maxEmails: 0, 84 maxEmails: 0,
83 maxSms: 0, 85 maxSms: 0,
84 - defaultStorageTtlDays: 0 86 + defaultStorageTtlDays: 0,
  87 + alarmsTtlDays: 0,
  88 + rpcTtlDays: 0
85 }; 89 };
86 configuration = {...defaultConfiguration, type: TenantProfileType.DEFAULT}; 90 configuration = {...defaultConfiguration, type: TenantProfileType.DEFAULT};
87 break; 91 break;
@@ -2570,6 +2570,9 @@ @@ -2570,6 +2570,9 @@
2570 "alarms-ttl-days": "Alarms TTL days (0 - unlimited)", 2570 "alarms-ttl-days": "Alarms TTL days (0 - unlimited)",
2571 "alarms-ttl-days-required": "Alarms TTL days required", 2571 "alarms-ttl-days-required": "Alarms TTL days required",
2572 "alarms-ttl-days-days-range": "Alarms TTL days can't be negative", 2572 "alarms-ttl-days-days-range": "Alarms TTL days can't be negative",
  2573 + "rpc-ttl-days": "RPC TTL days (0 - unlimited)",
  2574 + "rpc-ttl-days-required": "RPC TTL days required",
  2575 + "rpc-ttl-days-days-range": "RPC TTL days can't be negative",
2573 "max-rule-node-executions-per-message": "Maximum number of rule node executions per message (0 - unlimited)", 2576 "max-rule-node-executions-per-message": "Maximum number of rule node executions per message (0 - unlimited)",
2574 "max-rule-node-executions-per-message-required": "Maximum number of rule node executions per message is required.", 2577 "max-rule-node-executions-per-message-required": "Maximum number of rule node executions per message is required.",
2575 "max-rule-node-executions-per-message-range": "Maximum number of rule node executions per message can't be negative", 2578 "max-rule-node-executions-per-message-range": "Maximum number of rule node executions per message can't be negative",