Commit 4e5f485106f0cefd9a2fd6b0fc6fd79f388238ad

Authored by Andrew Shvayka
Committed by GitHub
2 parents 8e5c82d8 b578659b

Merge pull request #4682 from thingsboard/feature/cleanup-call-deduplication

Refactoring of the TTL cleanup services
Showing 26 changed files with 487 additions and 255 deletions
@@ -18,17 +18,18 @@ CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar @@ -18,17 +18,18 @@ CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar
18 LANGUAGE plpgsql AS 18 LANGUAGE plpgsql AS
19 $$ 19 $$
20 DECLARE 20 DECLARE
21 - max_tenant_ttl bigint;  
22 - max_customer_ttl bigint;  
23 - max_ttl bigint;  
24 - date timestamp;  
25 - partition_by_max_ttl_date varchar;  
26 - partition_month varchar;  
27 - partition_day varchar;  
28 - partition_year varchar;  
29 - partition varchar;  
30 - partition_to_delete varchar;  
31 - 21 + max_tenant_ttl bigint;
  22 + max_customer_ttl bigint;
  23 + max_ttl bigint;
  24 + date timestamp;
  25 + partition_by_max_ttl_date varchar;
  26 + partition_by_max_ttl_month varchar;
  27 + partition_by_max_ttl_day varchar;
  28 + partition_by_max_ttl_year varchar;
  29 + partition varchar;
  30 + partition_year integer;
  31 + partition_month integer;
  32 + partition_day integer;
32 33
33 BEGIN 34 BEGIN
34 SELECT max(attribute_kv.long_v) 35 SELECT max(attribute_kv.long_v)
@@ -45,53 +46,138 @@ BEGIN @@ -45,53 +46,138 @@ BEGIN
45 if max_ttl IS NOT NULL AND max_ttl > 0 THEN 46 if max_ttl IS NOT NULL AND max_ttl > 0 THEN
46 date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - max_ttl); 47 date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - max_ttl);
47 partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date); 48 partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date);
  49 + RAISE NOTICE 'Date by max ttl: %', date;
48 RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date; 50 RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date;
49 IF partition_by_max_ttl_date IS NOT NULL THEN 51 IF partition_by_max_ttl_date IS NOT NULL THEN
50 CASE 52 CASE
51 WHEN partition_type = 'DAYS' THEN 53 WHEN partition_type = 'DAYS' THEN
52 - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);  
53 - partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);  
54 - partition_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5); 54 + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
  55 + partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
  56 + partition_by_max_ttl_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5);
55 WHEN partition_type = 'MONTHS' THEN 57 WHEN partition_type = 'MONTHS' THEN
56 - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);  
57 - partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); 58 + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
  59 + partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
58 ELSE 60 ELSE
59 - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); 61 + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
60 END CASE; 62 END CASE;
61 - FOR partition IN SELECT tablename  
62 - FROM pg_tables  
63 - WHERE schemaname = 'public'  
64 - AND tablename like 'ts_kv_' || '%'  
65 - AND tablename != 'ts_kv_latest'  
66 - AND tablename != 'ts_kv_dictionary'  
67 - AND tablename != 'ts_kv_indefinite'  
68 - LOOP  
69 - IF partition != partition_by_max_ttl_date THEN  
70 - IF partition_year IS NOT NULL THEN  
71 - IF SPLIT_PART(partition, '_', 3)::integer < partition_year::integer THEN  
72 - partition_to_delete := partition;  
73 - ELSE  
74 - IF partition_month IS NOT NULL THEN  
75 - IF SPLIT_PART(partition, '_', 4)::integer < partition_month::integer THEN  
76 - partition_to_delete := partition; 63 + IF partition_by_max_ttl_year IS NULL THEN
  64 + RAISE NOTICE 'Failed to remove partitions by max ttl date due to partition_by_max_ttl_year is null!';
  65 + ELSE
  66 + IF partition_type = 'YEARS' THEN
  67 + FOR partition IN SELECT tablename
  68 + FROM pg_tables
  69 + WHERE schemaname = 'public'
  70 + AND tablename like 'ts_kv_' || '%'
  71 + AND tablename != 'ts_kv_latest'
  72 + AND tablename != 'ts_kv_dictionary'
  73 + AND tablename != 'ts_kv_indefinite'
  74 + AND tablename != partition_by_max_ttl_date
  75 + LOOP
  76 + partition_year := SPLIT_PART(partition, '_', 3)::integer;
  77 + IF partition_year < partition_by_max_ttl_year::integer THEN
  78 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  79 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  80 + deleted := deleted + 1;
  81 + END IF;
  82 + END LOOP;
  83 + ELSE
  84 + IF partition_type = 'MONTHS' THEN
  85 + IF partition_by_max_ttl_month IS NULL THEN
  86 + RAISE NOTICE 'Failed to remove months partitions by max ttl date due to partition_by_max_ttl_month is null!';
  87 + ELSE
  88 + FOR partition IN SELECT tablename
  89 + FROM pg_tables
  90 + WHERE schemaname = 'public'
  91 + AND tablename like 'ts_kv_' || '%'
  92 + AND tablename != 'ts_kv_latest'
  93 + AND tablename != 'ts_kv_dictionary'
  94 + AND tablename != 'ts_kv_indefinite'
  95 + AND tablename != partition_by_max_ttl_date
  96 + LOOP
  97 + partition_year := SPLIT_PART(partition, '_', 3)::integer;
  98 + IF partition_year > partition_by_max_ttl_year::integer THEN
  99 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  100 + CONTINUE;
77 ELSE 101 ELSE
78 - IF partition_day IS NOT NULL THEN  
79 - IF SPLIT_PART(partition, '_', 5)::integer < partition_day::integer THEN  
80 - partition_to_delete := partition; 102 + IF partition_year < partition_by_max_ttl_year::integer THEN
  103 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  104 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  105 + deleted := deleted + 1;
  106 + ELSE
  107 + partition_month := SPLIT_PART(partition, '_', 4)::integer;
  108 + IF partition_year = partition_by_max_ttl_year::integer THEN
  109 + IF partition_month >= partition_by_max_ttl_month::integer THEN
  110 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  111 + CONTINUE;
  112 + ELSE
  113 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  114 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  115 + deleted := deleted + 1;
  116 + END IF;
81 END IF; 117 END IF;
82 END IF; 118 END IF;
83 END IF; 119 END IF;
  120 + END LOOP;
  121 + END IF;
  122 + ELSE
  123 + IF partition_type = 'DAYS' THEN
  124 + IF partition_by_max_ttl_month IS NULL THEN
  125 + RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_month is null!';
  126 + ELSE
  127 + IF partition_by_max_ttl_day IS NULL THEN
  128 + RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_day is null!';
  129 + ELSE
  130 + FOR partition IN SELECT tablename
  131 + FROM pg_tables
  132 + WHERE schemaname = 'public'
  133 + AND tablename like 'ts_kv_' || '%'
  134 + AND tablename != 'ts_kv_latest'
  135 + AND tablename != 'ts_kv_dictionary'
  136 + AND tablename != 'ts_kv_indefinite'
  137 + AND tablename != partition_by_max_ttl_date
  138 + LOOP
  139 + partition_year := SPLIT_PART(partition, '_', 3)::integer;
  140 + IF partition_year > partition_by_max_ttl_year::integer THEN
  141 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  142 + CONTINUE;
  143 + ELSE
  144 + IF partition_year < partition_by_max_ttl_year::integer THEN
  145 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  146 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  147 + deleted := deleted + 1;
  148 + ELSE
  149 + partition_month := SPLIT_PART(partition, '_', 4)::integer;
  150 + IF partition_month > partition_by_max_ttl_month::integer THEN
  151 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  152 + CONTINUE;
  153 + ELSE
  154 + IF partition_month < partition_by_max_ttl_month::integer THEN
  155 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  156 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  157 + deleted := deleted + 1;
  158 + ELSE
  159 + partition_day := SPLIT_PART(partition, '_', 5)::integer;
  160 + IF partition_day >= partition_by_max_ttl_day::integer THEN
  161 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  162 + CONTINUE;
  163 + ELSE
  164 + IF partition_day < partition_by_max_ttl_day::integer THEN
  165 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  166 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  167 + deleted := deleted + 1;
  168 + END IF;
  169 + END IF;
  170 + END IF;
  171 + END IF;
  172 + END IF;
  173 + END IF;
  174 + END LOOP;
84 END IF; 175 END IF;
85 END IF; 176 END IF;
86 END IF; 177 END IF;
87 - IF partition_to_delete IS NOT NULL THEN  
88 - RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete;  
89 - EXECUTE format('DROP TABLE IF EXISTS %I', partition_to_delete);  
90 - partition_to_delete := NULL;  
91 - deleted := deleted + 1;  
92 - END IF;  
93 END IF; 178 END IF;
94 - END LOOP; 179 + END IF;
  180 + END IF;
95 END IF; 181 END IF;
96 END IF; 182 END IF;
97 END 183 END
@@ -107,8 +193,6 @@ BEGIN @@ -107,8 +193,6 @@ BEGIN
107 partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM'); 193 partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM');
108 WHEN partition_type = 'YEARS' THEN 194 WHEN partition_type = 'YEARS' THEN
109 partition := 'ts_kv_' || to_char(date, 'yyyy'); 195 partition := 'ts_kv_' || to_char(date, 'yyyy');
110 - WHEN partition_type = 'INDEFINITE' THEN  
111 - partition := NULL;  
112 ELSE 196 ELSE
113 partition := NULL; 197 partition := NULL;
114 END CASE; 198 END CASE;
@@ -15,8 +15,13 @@ @@ -15,8 +15,13 @@
15 */ 15 */
16 package org.thingsboard.server.service.ttl; 16 package org.thingsboard.server.service.ttl;
17 17
  18 +import lombok.RequiredArgsConstructor;
18 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
  20 +import org.springframework.beans.factory.annotation.Autowired;
19 import org.springframework.beans.factory.annotation.Value; 21 import org.springframework.beans.factory.annotation.Value;
  22 +import org.thingsboard.server.common.data.id.TenantId;
  23 +import org.thingsboard.server.common.msg.queue.ServiceType;
  24 +import org.thingsboard.server.queue.discovery.PartitionService;
20 25
21 import java.sql.Connection; 26 import java.sql.Connection;
22 import java.sql.DriverManager; 27 import java.sql.DriverManager;
@@ -27,43 +32,12 @@ import java.sql.Statement; @@ -27,43 +32,12 @@ import java.sql.Statement;
27 32
28 33
29 @Slf4j 34 @Slf4j
  35 +@RequiredArgsConstructor
30 public abstract class AbstractCleanUpService { 36 public abstract class AbstractCleanUpService {
31 37
32 - @Value("${spring.datasource.url}")  
33 - protected String dbUrl; 38 + private final PartitionService partitionService;
34 39
35 - @Value("${spring.datasource.username}")  
36 - protected String dbUserName;  
37 -  
38 - @Value("${spring.datasource.password}")  
39 - protected String dbPassword;  
40 -  
41 - protected long executeQuery(Connection conn, String query) throws SQLException {  
42 - try (Statement statement = conn.createStatement(); ResultSet resultSet = statement.executeQuery(query)) {  
43 - if (log.isDebugEnabled()) {  
44 - getWarnings(statement);  
45 - }  
46 - resultSet.next();  
47 - return resultSet.getLong(1);  
48 - }  
49 - }  
50 -  
51 - protected void getWarnings(Statement statement) throws SQLException {  
52 - SQLWarning warnings = statement.getWarnings();  
53 - if (warnings != null) {  
54 - log.debug("{}", warnings.getMessage());  
55 - SQLWarning nextWarning = warnings.getNextWarning();  
56 - while (nextWarning != null) {  
57 - log.debug("{}", nextWarning.getMessage());  
58 - nextWarning = nextWarning.getNextWarning();  
59 - }  
60 - } 40 + protected boolean isSystemTenantPartitionMine(){
  41 + return partitionService.resolve(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition();
61 } 42 }
62 -  
63 - protected abstract void doCleanUp(Connection connection) throws SQLException;  
64 -  
65 - protected Connection getConnection() throws SQLException {  
66 - return DriverManager.getConnection(dbUrl, dbUserName, dbPassword);  
67 - }  
68 -  
69 } 43 }
application/src/main/java/org/thingsboard/server/service/ttl/AlarmsCleanUpService.java renamed from application/src/main/java/org/thingsboard/server/service/ttl/alarms/AlarmsCleanUpService.java
@@ -13,7 +13,7 @@ @@ -13,7 +13,7 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.service.ttl.alarms; 16 +package org.thingsboard.server.service.ttl;
17 17
18 import lombok.RequiredArgsConstructor; 18 import lombok.RequiredArgsConstructor;
19 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
@@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit; @@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit;
52 @Slf4j 52 @Slf4j
53 @RequiredArgsConstructor 53 @RequiredArgsConstructor
54 public class AlarmsCleanUpService { 54 public class AlarmsCleanUpService {
  55 +
55 @Value("${sql.ttl.alarms.removal_batch_size}") 56 @Value("${sql.ttl.alarms.removal_batch_size}")
56 private Integer removalBatchSize; 57 private Integer removalBatchSize;
57 58
application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java renamed from application/src/main/java/org/thingsboard/server/service/ttl/edge/EdgeEventsCleanUpService.java
@@ -13,20 +13,18 @@ @@ -13,20 +13,18 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.service.ttl.edge; 16 +package org.thingsboard.server.service.ttl;
17 17
18 import lombok.extern.slf4j.Slf4j; 18 import lombok.extern.slf4j.Slf4j;
19 import org.springframework.beans.factory.annotation.Value; 19 import org.springframework.beans.factory.annotation.Value;
20 import org.springframework.scheduling.annotation.Scheduled; 20 import org.springframework.scheduling.annotation.Scheduled;
21 import org.springframework.stereotype.Service; 21 import org.springframework.stereotype.Service;
22 -import org.thingsboard.server.dao.util.PsqlDao; 22 +import org.thingsboard.server.dao.edge.EdgeService;
  23 +import org.thingsboard.server.queue.discovery.PartitionService;
  24 +import org.thingsboard.server.queue.util.TbCoreComponent;
23 import org.thingsboard.server.service.ttl.AbstractCleanUpService; 25 import org.thingsboard.server.service.ttl.AbstractCleanUpService;
24 26
25 -import java.sql.Connection;  
26 -import java.sql.DriverManager;  
27 -import java.sql.SQLException;  
28 -  
29 -@PsqlDao 27 +@TbCoreComponent
30 @Slf4j 28 @Slf4j
31 @Service 29 @Service
32 public class EdgeEventsCleanUpService extends AbstractCleanUpService { 30 public class EdgeEventsCleanUpService extends AbstractCleanUpService {
@@ -37,20 +35,18 @@ public class EdgeEventsCleanUpService extends AbstractCleanUpService { @@ -37,20 +35,18 @@ public class EdgeEventsCleanUpService extends AbstractCleanUpService {
37 @Value("${sql.ttl.edge_events.enabled}") 35 @Value("${sql.ttl.edge_events.enabled}")
38 private boolean ttlTaskExecutionEnabled; 36 private boolean ttlTaskExecutionEnabled;
39 37
  38 + private final EdgeService edgeService;
  39 +
  40 + public EdgeEventsCleanUpService(PartitionService partitionService, EdgeService edgeService) {
  41 + super(partitionService);
  42 + this.edgeService = edgeService;
  43 + }
  44 +
40 @Scheduled(initialDelayString = "${sql.ttl.edge_events.execution_interval_ms}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}") 45 @Scheduled(initialDelayString = "${sql.ttl.edge_events.execution_interval_ms}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}")
41 public void cleanUp() { 46 public void cleanUp() {
42 - if (ttlTaskExecutionEnabled) {  
43 - try (Connection conn = getConnection()) {  
44 - doCleanUp(conn);  
45 - } catch (SQLException e) {  
46 - log.error("SQLException occurred during TTL task execution ", e);  
47 - } 47 + if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
  48 + edgeService.cleanupEvents(ttl);
48 } 49 }
49 } 50 }
50 51
51 - @Override  
52 - protected void doCleanUp(Connection connection) throws SQLException {  
53 - long totalEdgeEventsRemoved = executeQuery(connection, "call cleanup_edge_events_by_ttl(" + ttl + ", 0);");  
54 - log.info("Total edge events removed by TTL: [{}]", totalEdgeEventsRemoved);  
55 - }  
56 } 52 }
application/src/main/java/org/thingsboard/server/service/ttl/EventsCleanUpService.java renamed from application/src/main/java/org/thingsboard/server/service/ttl/events/EventsCleanUpService.java
@@ -13,20 +13,18 @@ @@ -13,20 +13,18 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.service.ttl.events; 16 +package org.thingsboard.server.service.ttl;
17 17
18 import lombok.extern.slf4j.Slf4j; 18 import lombok.extern.slf4j.Slf4j;
19 import org.springframework.beans.factory.annotation.Value; 19 import org.springframework.beans.factory.annotation.Value;
20 import org.springframework.scheduling.annotation.Scheduled; 20 import org.springframework.scheduling.annotation.Scheduled;
21 import org.springframework.stereotype.Service; 21 import org.springframework.stereotype.Service;
22 -import org.thingsboard.server.dao.util.PsqlDao; 22 +import org.thingsboard.server.dao.event.EventService;
  23 +import org.thingsboard.server.queue.discovery.PartitionService;
  24 +import org.thingsboard.server.queue.util.TbCoreComponent;
23 import org.thingsboard.server.service.ttl.AbstractCleanUpService; 25 import org.thingsboard.server.service.ttl.AbstractCleanUpService;
24 26
25 -import java.sql.Connection;  
26 -import java.sql.DriverManager;  
27 -import java.sql.SQLException;  
28 -  
29 -@PsqlDao 27 +@TbCoreComponent
30 @Slf4j 28 @Slf4j
31 @Service 29 @Service
32 public class EventsCleanUpService extends AbstractCleanUpService { 30 public class EventsCleanUpService extends AbstractCleanUpService {
@@ -40,20 +38,18 @@ public class EventsCleanUpService extends AbstractCleanUpService { @@ -40,20 +38,18 @@ public class EventsCleanUpService extends AbstractCleanUpService {
40 @Value("${sql.ttl.events.enabled}") 38 @Value("${sql.ttl.events.enabled}")
41 private boolean ttlTaskExecutionEnabled; 39 private boolean ttlTaskExecutionEnabled;
42 40
  41 + private final EventService eventService;
  42 +
  43 + public EventsCleanUpService(PartitionService partitionService, EventService eventService) {
  44 + super(partitionService);
  45 + this.eventService = eventService;
  46 + }
  47 +
43 @Scheduled(initialDelayString = "${sql.ttl.events.execution_interval_ms}", fixedDelayString = "${sql.ttl.events.execution_interval_ms}") 48 @Scheduled(initialDelayString = "${sql.ttl.events.execution_interval_ms}", fixedDelayString = "${sql.ttl.events.execution_interval_ms}")
44 public void cleanUp() { 49 public void cleanUp() {
45 - if (ttlTaskExecutionEnabled) {  
46 - try (Connection conn = getConnection()) {  
47 - doCleanUp(conn);  
48 - } catch (SQLException e) {  
49 - log.error("SQLException occurred during TTL task execution ", e);  
50 - } 50 + if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
  51 + eventService.cleanupEvents(ttl, debugTtl);
51 } 52 }
52 } 53 }
53 54
54 - @Override  
55 - protected void doCleanUp(Connection connection) throws SQLException {  
56 - long totalEventsRemoved = executeQuery(connection, "call cleanup_events_by_ttl(" + ttl + ", " + debugTtl + ", 0);");  
57 - log.info("Total events removed by TTL: [{}]", totalEventsRemoved);  
58 - }  
59 } 55 }
application/src/main/java/org/thingsboard/server/service/ttl/TimeseriesCleanUpService.java renamed from application/src/main/java/org/thingsboard/server/service/ttl/timeseries/AbstractTimeseriesCleanUpService.java
@@ -13,19 +13,21 @@ @@ -13,19 +13,21 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.service.ttl.timeseries; 16 +package org.thingsboard.server.service.ttl;
17 17
18 import lombok.extern.slf4j.Slf4j; 18 import lombok.extern.slf4j.Slf4j;
19 import org.springframework.beans.factory.annotation.Value; 19 import org.springframework.beans.factory.annotation.Value;
20 import org.springframework.scheduling.annotation.Scheduled; 20 import org.springframework.scheduling.annotation.Scheduled;
  21 +import org.springframework.stereotype.Service;
  22 +import org.thingsboard.server.dao.timeseries.TimeseriesService;
  23 +import org.thingsboard.server.queue.discovery.PartitionService;
  24 +import org.thingsboard.server.queue.util.TbCoreComponent;
21 import org.thingsboard.server.service.ttl.AbstractCleanUpService; 25 import org.thingsboard.server.service.ttl.AbstractCleanUpService;
22 26
23 -import java.sql.Connection;  
24 -import java.sql.DriverManager;  
25 -import java.sql.SQLException;  
26 - 27 +@TbCoreComponent
27 @Slf4j 28 @Slf4j
28 -public abstract class AbstractTimeseriesCleanUpService extends AbstractCleanUpService { 29 +@Service
  30 +public class TimeseriesCleanUpService extends AbstractCleanUpService {
29 31
30 @Value("${sql.ttl.ts.ts_key_value_ttl}") 32 @Value("${sql.ttl.ts.ts_key_value_ttl}")
31 protected long systemTtl; 33 protected long systemTtl;
@@ -33,14 +35,17 @@ public abstract class AbstractTimeseriesCleanUpService extends AbstractCleanUpSe @@ -33,14 +35,17 @@ public abstract class AbstractTimeseriesCleanUpService extends AbstractCleanUpSe
33 @Value("${sql.ttl.ts.enabled}") 35 @Value("${sql.ttl.ts.enabled}")
34 private boolean ttlTaskExecutionEnabled; 36 private boolean ttlTaskExecutionEnabled;
35 37
  38 + private final TimeseriesService timeseriesService;
  39 +
  40 + public TimeseriesCleanUpService(PartitionService partitionService, TimeseriesService timeseriesService) {
  41 + super(partitionService);
  42 + this.timeseriesService = timeseriesService;
  43 + }
  44 +
36 @Scheduled(initialDelayString = "${sql.ttl.ts.execution_interval_ms}", fixedDelayString = "${sql.ttl.ts.execution_interval_ms}") 45 @Scheduled(initialDelayString = "${sql.ttl.ts.execution_interval_ms}", fixedDelayString = "${sql.ttl.ts.execution_interval_ms}")
37 public void cleanUp() { 46 public void cleanUp() {
38 - if (ttlTaskExecutionEnabled) {  
39 - try (Connection conn = getConnection()) {  
40 - doCleanUp(conn);  
41 - } catch (SQLException e) {  
42 - log.error("SQLException occurred during TTL task execution ", e);  
43 - } 47 + if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
  48 + timeseriesService.cleanup(systemTtl);
44 } 49 }
45 } 50 }
46 51
1 -/**  
2 - * Copyright © 2016-2021 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.service.ttl.timeseries;  
17 -  
18 -import lombok.extern.slf4j.Slf4j;  
19 -import org.springframework.beans.factory.annotation.Value;  
20 -import org.springframework.stereotype.Service;  
21 -import org.thingsboard.server.dao.model.ModelConstants;  
22 -import org.thingsboard.server.dao.util.PsqlDao;  
23 -import org.thingsboard.server.dao.util.SqlTsDao;  
24 -  
25 -import java.sql.Connection;  
26 -import java.sql.SQLException;  
27 -  
28 -@SqlTsDao  
29 -@PsqlDao  
30 -@Service  
31 -@Slf4j  
32 -public class PsqlTimeseriesCleanUpService extends AbstractTimeseriesCleanUpService {  
33 -  
34 - @Value("${sql.postgres.ts_key_value_partitioning}")  
35 - private String partitionType;  
36 -  
37 - @Override  
38 - protected void doCleanUp(Connection connection) throws SQLException {  
39 - long totalPartitionsRemoved = executeQuery(connection, "call drop_partitions_by_max_ttl('" + partitionType + "'," + systemTtl + ", 0);");  
40 - log.info("Total partitions removed by TTL: [{}]", totalPartitionsRemoved);  
41 - long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID + "'," + systemTtl + ", 0);");  
42 - log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved);  
43 - }  
44 -}  
1 -/**  
2 - * Copyright © 2016-2021 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.service.ttl.timeseries;  
17 -  
18 -import lombok.extern.slf4j.Slf4j;  
19 -import org.springframework.stereotype.Service;  
20 -import org.thingsboard.server.dao.model.ModelConstants;  
21 -import org.thingsboard.server.dao.util.TimescaleDBTsDao;  
22 -  
23 -import java.sql.Connection;  
24 -import java.sql.SQLException;  
25 -  
26 -@TimescaleDBTsDao  
27 -@Service  
28 -@Slf4j  
29 -public class TimescaleTimeseriesCleanUpService extends AbstractTimeseriesCleanUpService {  
30 -  
31 - @Override  
32 - protected void doCleanUp(Connection connection) throws SQLException {  
33 - long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID + "'," + systemTtl + ", 0);");  
34 - log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved);  
35 - }  
36 -}  
@@ -93,4 +93,6 @@ public interface EdgeService { @@ -93,4 +93,6 @@ public interface EdgeService {
93 Object activateInstance(String licenseSecret, String releaseDate); 93 Object activateInstance(String licenseSecret, String releaseDate);
94 94
95 String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId); 95 String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId);
  96 +
  97 + void cleanupEvents(long ttl);
96 } 98 }
@@ -46,4 +46,6 @@ public interface EventService { @@ -46,4 +46,6 @@ public interface EventService {
46 46
47 void removeEvents(TenantId tenantId, EntityId entityId); 47 void removeEvents(TenantId tenantId, EntityId entityId);
48 48
  49 + void cleanupEvents(long ttl, long debugTtl);
  50 +
49 } 51 }
@@ -52,4 +52,6 @@ public interface TimeseriesService { @@ -52,4 +52,6 @@ public interface TimeseriesService {
52 List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); 52 List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);
53 53
54 List<String> findAllKeysByEntityIds(TenantId tenantId, List<EntityId> entityIds); 54 List<String> findAllKeysByEntityIds(TenantId tenantId, List<EntityId> entityIds);
  55 +
  56 + void cleanup(long systemTtl);
55 } 57 }
@@ -176,4 +176,10 @@ public interface EdgeDao extends Dao<Edge> { @@ -176,4 +176,10 @@ public interface EdgeDao extends Dao<Edge> {
176 * @return the list of rule chain objects 176 * @return the list of rule chain objects
177 */ 177 */
178 ListenableFuture<List<Edge>> findEdgesByTenantIdAndDashboardId(UUID tenantId, UUID dashboardId); 178 ListenableFuture<List<Edge>> findEdgesByTenantIdAndDashboardId(UUID tenantId, UUID dashboardId);
  179 +
  180 + /**
  181 + * Executes stored procedure to cleanup old edge events.
  182 + * @param ttl the ttl for edge events in seconds
  183 + */
  184 + void cleanupEvents(long ttl);
179 } 185 }
@@ -627,6 +627,11 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic @@ -627,6 +627,11 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
627 return result.toString(); 627 return result.toString();
628 } 628 }
629 629
  630 + @Override
  631 + public void cleanupEvents(long ttl) {
  632 + edgeDao.cleanupEvents(ttl);
  633 + }
  634 +
630 private List<RuleChain> findEdgeRuleChains(TenantId tenantId, EdgeId edgeId) { 635 private List<RuleChain> findEdgeRuleChains(TenantId tenantId, EdgeId edgeId) {
631 List<RuleChain> result = new ArrayList<>(); 636 List<RuleChain> result = new ArrayList<>();
632 PageLink pageLink = new PageLink(DEFAULT_LIMIT); 637 PageLink pageLink = new PageLink(DEFAULT_LIMIT);
@@ -131,6 +131,11 @@ public class BaseEventService implements EventService { @@ -131,6 +131,11 @@ public class BaseEventService implements EventService {
131 } while (eventPageData.hasNext()); 131 } while (eventPageData.hasNext());
132 } 132 }
133 133
  134 + @Override
  135 + public void cleanupEvents(long ttl, long debugTtl) {
  136 + eventDao.cleanupEvents(ttl, debugTtl);
  137 + }
  138 +
134 private DataValidator<Event> eventValidator = 139 private DataValidator<Event> eventValidator =
135 new DataValidator<Event>() { 140 new DataValidator<Event>() {
136 @Override 141 @Override
@@ -102,4 +102,10 @@ public interface EventDao extends Dao<Event> { @@ -102,4 +102,10 @@ public interface EventDao extends Dao<Event> {
102 */ 102 */
103 List<Event> findLatestEvents(UUID tenantId, EntityId entityId, String eventType, int limit); 103 List<Event> findLatestEvents(UUID tenantId, EntityId entityId, String eventType, int limit);
104 104
  105 + /**
  106 + * Executes stored procedure to cleanup old events. Uses separate ttl for debug and other events.
  107 + * @param otherEventsTtl the ttl for events in seconds
  108 + * @param debugEventsTtl the ttl for debug events in seconds
  109 + */
  110 + void cleanupEvents(long otherEventsTtl, long debugEventsTtl);
105 } 111 }
@@ -15,11 +15,33 @@ @@ -15,11 +15,33 @@
15 */ 15 */
16 package org.thingsboard.server.dao.sql; 16 package org.thingsboard.server.dao.sql;
17 17
  18 +import lombok.extern.slf4j.Slf4j;
18 import org.springframework.beans.factory.annotation.Autowired; 19 import org.springframework.beans.factory.annotation.Autowired;
19 20
  21 +import javax.sql.DataSource;
  22 +import java.sql.SQLException;
  23 +import java.sql.SQLWarning;
  24 +import java.sql.Statement;
  25 +
  26 +@Slf4j
20 public abstract class JpaAbstractDaoListeningExecutorService { 27 public abstract class JpaAbstractDaoListeningExecutorService {
21 28
22 @Autowired 29 @Autowired
23 protected JpaExecutorService service; 30 protected JpaExecutorService service;
24 31
  32 + @Autowired
  33 + protected DataSource dataSource;
  34 +
  35 + protected void printWarnings(Statement statement) throws SQLException {
  36 + SQLWarning warnings = statement.getWarnings();
  37 + if (warnings != null) {
  38 + log.debug("{}", warnings.getMessage());
  39 + SQLWarning nextWarning = warnings.getNextWarning();
  40 + while (nextWarning != null) {
  41 + log.debug("{}", nextWarning.getMessage());
  42 + nextWarning = nextWarning.getNextWarning();
  43 + }
  44 + }
  45 + }
  46 +
25 } 47 }
@@ -40,6 +40,10 @@ import org.thingsboard.server.dao.model.sql.EdgeInfoEntity; @@ -40,6 +40,10 @@ import org.thingsboard.server.dao.model.sql.EdgeInfoEntity;
40 import org.thingsboard.server.dao.relation.RelationDao; 40 import org.thingsboard.server.dao.relation.RelationDao;
41 import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao; 41 import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao;
42 42
  43 +import java.sql.Connection;
  44 +import java.sql.PreparedStatement;
  45 +import java.sql.ResultSet;
  46 +import java.sql.SQLException;
43 import java.util.ArrayList; 47 import java.util.ArrayList;
44 import java.util.Collections; 48 import java.util.Collections;
45 import java.util.List; 49 import java.util.List;
@@ -194,6 +198,24 @@ public class JpaEdgeDao extends JpaAbstractSearchTextDao<EdgeEntity, Edge> imple @@ -194,6 +198,24 @@ public class JpaEdgeDao extends JpaAbstractSearchTextDao<EdgeEntity, Edge> imple
194 return transformFromRelationToEdge(tenantId, relations); 198 return transformFromRelationToEdge(tenantId, relations);
195 } 199 }
196 200
  201 + @Override
  202 + public void cleanupEvents(long ttl) {
  203 + log.info("Going to cleanup old edge events using ttl: {}s", ttl);
  204 + try (Connection connection = dataSource.getConnection();
  205 + PreparedStatement stmt = connection.prepareStatement("call cleanup_edge_events_by_ttl(?,?)")) {
  206 + stmt.setLong(1, ttl);
  207 + stmt.setLong(2, 0);
  208 + stmt.execute();
  209 + printWarnings(stmt);
  210 + try (ResultSet resultSet = stmt.getResultSet()) {
  211 + resultSet.next();
  212 + log.info("Total edge events removed by TTL: [{}]", resultSet.getLong(1));
  213 + }
  214 + } catch (SQLException e) {
  215 + log.error("SQLException occurred during edge events TTL task execution ", e);
  216 + }
  217 + }
  218 +
197 private ListenableFuture<List<Edge>> transformFromRelationToEdge(UUID tenantId, ListenableFuture<List<EntityRelation>> relations) { 219 private ListenableFuture<List<Edge>> transformFromRelationToEdge(UUID tenantId, ListenableFuture<List<EntityRelation>> relations) {
198 return Futures.transformAsync(relations, input -> { 220 return Futures.transformAsync(relations, input -> {
199 List<ListenableFuture<Edge>> edgeFutures = new ArrayList<>(input.size()); 221 List<ListenableFuture<Edge>> edgeFutures = new ArrayList<>(input.size());
@@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.Event; @@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.Event;
27 import org.thingsboard.server.common.data.event.DebugEvent; 27 import org.thingsboard.server.common.data.event.DebugEvent;
28 import org.thingsboard.server.common.data.event.ErrorEventFilter; 28 import org.thingsboard.server.common.data.event.ErrorEventFilter;
29 import org.thingsboard.server.common.data.event.EventFilter; 29 import org.thingsboard.server.common.data.event.EventFilter;
30 -import org.thingsboard.server.common.data.event.EventType;  
31 import org.thingsboard.server.common.data.event.LifeCycleEventFilter; 30 import org.thingsboard.server.common.data.event.LifeCycleEventFilter;
32 import org.thingsboard.server.common.data.event.StatisticsEventFilter; 31 import org.thingsboard.server.common.data.event.StatisticsEventFilter;
33 import org.thingsboard.server.common.data.id.EntityId; 32 import org.thingsboard.server.common.data.id.EntityId;
@@ -40,6 +39,10 @@ import org.thingsboard.server.dao.event.EventDao; @@ -40,6 +39,10 @@ import org.thingsboard.server.dao.event.EventDao;
40 import org.thingsboard.server.dao.model.sql.EventEntity; 39 import org.thingsboard.server.dao.model.sql.EventEntity;
41 import org.thingsboard.server.dao.sql.JpaAbstractDao; 40 import org.thingsboard.server.dao.sql.JpaAbstractDao;
42 41
  42 +import java.sql.Connection;
  43 +import java.sql.PreparedStatement;
  44 +import java.sql.ResultSet;
  45 +import java.sql.SQLException;
43 import java.util.List; 46 import java.util.List;
44 import java.util.Objects; 47 import java.util.Objects;
45 import java.util.Optional; 48 import java.util.Optional;
@@ -256,6 +259,25 @@ public class JpaBaseEventDao extends JpaAbstractDao<EventEntity, Event> implemen @@ -256,6 +259,25 @@ public class JpaBaseEventDao extends JpaAbstractDao<EventEntity, Event> implemen
256 return DaoUtil.convertDataList(latest); 259 return DaoUtil.convertDataList(latest);
257 } 260 }
258 261
  262 + @Override
  263 + public void cleanupEvents(long otherEventsTtl, long debugEventsTtl) {
  264 + log.info("Going to cleanup old events using debug events ttl: {}s and other events ttl: {}s", debugEventsTtl, otherEventsTtl);
  265 + try (Connection connection = dataSource.getConnection();
  266 + PreparedStatement stmt = connection.prepareStatement("call cleanup_events_by_ttl(?,?,?)")) {
  267 + stmt.setLong(1, otherEventsTtl);
  268 + stmt.setLong(2, debugEventsTtl);
  269 + stmt.setLong(3, 0);
  270 + stmt.execute();
  271 + printWarnings(stmt);
  272 + try (ResultSet resultSet = stmt.getResultSet()){
  273 + resultSet.next();
  274 + log.info("Total events removed by TTL: [{}]", resultSet.getLong(1));
  275 + }
  276 + } catch (SQLException e) {
  277 + log.error("SQLException occurred during events TTL task execution ", e);
  278 + }
  279 + }
  280 +
259 public Optional<Event> save(EventEntity entity, boolean ifNotExists) { 281 public Optional<Event> save(EventEntity entity, boolean ifNotExists) {
260 log.debug("Save event [{}] ", entity); 282 log.debug("Save event [{}] ", entity);
261 if (entity.getTenantId() == null) { 283 if (entity.getTenantId() == null) {
@@ -25,9 +25,14 @@ import org.thingsboard.server.common.data.id.EntityId; @@ -25,9 +25,14 @@ import org.thingsboard.server.common.data.id.EntityId;
25 import org.thingsboard.server.common.data.id.TenantId; 25 import org.thingsboard.server.common.data.id.TenantId;
26 import org.thingsboard.server.common.data.kv.ReadTsKvQuery; 26 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
27 import org.thingsboard.server.common.data.kv.TsKvEntry; 27 import org.thingsboard.server.common.data.kv.TsKvEntry;
  28 +import org.thingsboard.server.dao.model.ModelConstants;
28 import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; 29 import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
29 30
30 import javax.annotation.Nullable; 31 import javax.annotation.Nullable;
  32 +import java.sql.Connection;
  33 +import java.sql.PreparedStatement;
  34 +import java.sql.ResultSet;
  35 +import java.sql.SQLException;
31 import java.util.List; 36 import java.util.List;
32 import java.util.Objects; 37 import java.util.Objects;
33 import java.util.concurrent.TimeUnit; 38 import java.util.concurrent.TimeUnit;
@@ -62,6 +67,24 @@ public abstract class AbstractSqlTimeseriesDao extends BaseAbstractSqlTimeseries @@ -62,6 +67,24 @@ public abstract class AbstractSqlTimeseriesDao extends BaseAbstractSqlTimeseries
62 @Value("${sql.ttl.ts.ts_key_value_ttl:0}") 67 @Value("${sql.ttl.ts.ts_key_value_ttl:0}")
63 private long systemTtl; 68 private long systemTtl;
64 69
  70 + public void cleanup(long systemTtl) {
  71 + log.info("Going to cleanup old timeseries data using ttl: {}s", systemTtl);
  72 + try (Connection connection = dataSource.getConnection();
  73 + PreparedStatement stmt = connection.prepareStatement("call cleanup_timeseries_by_ttl(?,?,?)")) {
  74 + stmt.setObject(1, ModelConstants.NULL_UUID);
  75 + stmt.setLong(2, systemTtl);
  76 + stmt.setLong(3, 0);
  77 + stmt.execute();
  78 + printWarnings(stmt);
  79 + try (ResultSet resultSet = stmt.getResultSet()) {
  80 + resultSet.next();
  81 + log.info("Total telemetry removed stats by TTL for entities: [{}]", resultSet.getLong(1));
  82 + }
  83 + } catch (SQLException e) {
  84 + log.error("SQLException occurred during timeseries TTL task execution ", e);
  85 + }
  86 + }
  87 +
65 protected ListenableFuture<List<TsKvEntry>> processFindAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) { 88 protected ListenableFuture<List<TsKvEntry>> processFindAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
66 List<ListenableFuture<List<TsKvEntry>>> futures = queries 89 List<ListenableFuture<List<TsKvEntry>>> futures = queries
67 .stream() 90 .stream()
@@ -54,4 +54,9 @@ public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa @@ -54,4 +54,9 @@ public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
54 return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor()); 54 return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor());
55 } 55 }
56 56
  57 + @Override
  58 + public void cleanup(long systemTtl) {
  59 +
  60 + }
  61 +
57 } 62 }
@@ -35,6 +35,10 @@ import org.thingsboard.server.dao.timeseries.SqlTsPartitionDate; @@ -35,6 +35,10 @@ import org.thingsboard.server.dao.timeseries.SqlTsPartitionDate;
35 import org.thingsboard.server.dao.util.PsqlDao; 35 import org.thingsboard.server.dao.util.PsqlDao;
36 import org.thingsboard.server.dao.util.SqlTsDao; 36 import org.thingsboard.server.dao.util.SqlTsDao;
37 37
  38 +import java.sql.Connection;
  39 +import java.sql.PreparedStatement;
  40 +import java.sql.ResultSet;
  41 +import java.sql.SQLException;
38 import java.time.Instant; 42 import java.time.Instant;
39 import java.time.LocalDateTime; 43 import java.time.LocalDateTime;
40 import java.time.ZoneOffset; 44 import java.time.ZoneOffset;
@@ -62,6 +66,7 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa @@ -62,6 +66,7 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
62 @Value("${sql.postgres.ts_key_value_partitioning:MONTHS}") 66 @Value("${sql.postgres.ts_key_value_partitioning:MONTHS}")
63 private String partitioning; 67 private String partitioning;
64 68
  69 +
65 @Override 70 @Override
66 protected void init() { 71 protected void init() {
67 super.init(); 72 super.init();
@@ -93,6 +98,30 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa @@ -93,6 +98,30 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
93 return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor()); 98 return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor());
94 } 99 }
95 100
  101 + @Override
  102 + public void cleanup(long systemTtl) {
  103 + cleanupPartitions(systemTtl);
  104 + super.cleanup(systemTtl);
  105 + }
  106 +
  107 + private void cleanupPartitions(long systemTtl) {
  108 + log.info("Going to cleanup old timeseries data partitions using partition type: {} and ttl: {}s", partitioning, systemTtl);
  109 + try (Connection connection = dataSource.getConnection();
  110 + PreparedStatement stmt = connection.prepareStatement("call drop_partitions_by_max_ttl(?,?,?)")) {
  111 + stmt.setString(1, partitioning);
  112 + stmt.setLong(2, systemTtl);
  113 + stmt.setLong(3, 0);
  114 + stmt.execute();
  115 + printWarnings(stmt);
  116 + try (ResultSet resultSet = stmt.getResultSet()) {
  117 + resultSet.next();
  118 + log.info("Total partitions removed by TTL: [{}]", resultSet.getLong(1));
  119 + }
  120 + } catch (SQLException e) {
  121 + log.error("SQLException occurred during TTL task execution ", e);
  122 + }
  123 + }
  124 +
96 private void savePartitionIfNotExist(long ts) { 125 private void savePartitionIfNotExist(long ts) {
97 if (!tsFormat.equals(SqlTsPartitionDate.INDEFINITE) && ts >= 0) { 126 if (!tsFormat.equals(SqlTsPartitionDate.INDEFINITE) && ts >= 0) {
98 LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); 127 LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
@@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery; @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
34 import org.thingsboard.server.common.data.kv.TsKvEntry; 34 import org.thingsboard.server.common.data.kv.TsKvEntry;
35 import org.thingsboard.server.common.stats.StatsFactory; 35 import org.thingsboard.server.common.stats.StatsFactory;
36 import org.thingsboard.server.dao.DaoUtil; 36 import org.thingsboard.server.dao.DaoUtil;
  37 +import org.thingsboard.server.dao.model.ModelConstants;
37 import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; 38 import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
38 import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity; 39 import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
39 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; 40 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
@@ -45,6 +46,9 @@ import org.thingsboard.server.dao.util.TimescaleDBTsDao; @@ -45,6 +46,9 @@ import org.thingsboard.server.dao.util.TimescaleDBTsDao;
45 46
46 import javax.annotation.PostConstruct; 47 import javax.annotation.PostConstruct;
47 import javax.annotation.PreDestroy; 48 import javax.annotation.PreDestroy;
  49 +import java.sql.CallableStatement;
  50 +import java.sql.SQLException;
  51 +import java.sql.Types;
48 import java.util.*; 52 import java.util.*;
49 import java.util.concurrent.CompletableFuture; 53 import java.util.concurrent.CompletableFuture;
50 import java.util.function.Function; 54 import java.util.function.Function;
@@ -156,6 +160,11 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements @@ -156,6 +160,11 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
156 } 160 }
157 } 161 }
158 162
  163 + @Override
  164 + public void cleanup(long systemTtl) {
  165 + super.cleanup(systemTtl);
  166 + }
  167 +
159 private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) { 168 private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
160 String strKey = query.getKey(); 169 String strKey = query.getKey();
161 Integer keyId = getOrSaveKeyId(strKey); 170 Integer keyId = getOrSaveKeyId(strKey);
@@ -127,6 +127,11 @@ public class BaseTimeseriesService implements TimeseriesService { @@ -127,6 +127,11 @@ public class BaseTimeseriesService implements TimeseriesService {
127 } 127 }
128 128
129 @Override 129 @Override
  130 + public void cleanup(long systemTtl) {
  131 + timeseriesDao.cleanup(systemTtl);
  132 + }
  133 +
  134 + @Override
130 public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { 135 public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
131 validate(entityId); 136 validate(entityId);
132 if (tsKvEntry == null) { 137 if (tsKvEntry == null) {
@@ -288,6 +288,11 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @@ -288,6 +288,11 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
288 } 288 }
289 } 289 }
290 290
  291 + @Override
  292 + public void cleanup(long systemTtl) {
  293 + //Cleanup by TTL is native for Cassandra
  294 + }
  295 +
291 private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { 296 private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
292 long minPartition = toPartitionTs(query.getStartTs()); 297 long minPartition = toPartitionTs(query.getStartTs());
293 long maxPartition = toPartitionTs(query.getEndTs()); 298 long maxPartition = toPartitionTs(query.getEndTs());
@@ -38,4 +38,6 @@ public interface TimeseriesDao { @@ -38,4 +38,6 @@ public interface TimeseriesDao {
38 ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); 38 ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
39 39
40 ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); 40 ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
  41 +
  42 + void cleanup(long systemTtl);
41 } 43 }
@@ -38,17 +38,18 @@ CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar @@ -38,17 +38,18 @@ CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar
38 LANGUAGE plpgsql AS 38 LANGUAGE plpgsql AS
39 $$ 39 $$
40 DECLARE 40 DECLARE
41 - max_tenant_ttl bigint;  
42 - max_customer_ttl bigint;  
43 - max_ttl bigint;  
44 - date timestamp;  
45 - partition_by_max_ttl_date varchar;  
46 - partition_month varchar;  
47 - partition_day varchar;  
48 - partition_year varchar;  
49 - partition varchar;  
50 - partition_to_delete varchar;  
51 - 41 + max_tenant_ttl bigint;
  42 + max_customer_ttl bigint;
  43 + max_ttl bigint;
  44 + date timestamp;
  45 + partition_by_max_ttl_date varchar;
  46 + partition_by_max_ttl_month varchar;
  47 + partition_by_max_ttl_day varchar;
  48 + partition_by_max_ttl_year varchar;
  49 + partition varchar;
  50 + partition_year integer;
  51 + partition_month integer;
  52 + partition_day integer;
52 53
53 BEGIN 54 BEGIN
54 SELECT max(attribute_kv.long_v) 55 SELECT max(attribute_kv.long_v)
@@ -65,53 +66,138 @@ BEGIN @@ -65,53 +66,138 @@ BEGIN
65 if max_ttl IS NOT NULL AND max_ttl > 0 THEN 66 if max_ttl IS NOT NULL AND max_ttl > 0 THEN
66 date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - max_ttl); 67 date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - max_ttl);
67 partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date); 68 partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date);
  69 + RAISE NOTICE 'Date by max ttl: %', date;
68 RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date; 70 RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date;
69 IF partition_by_max_ttl_date IS NOT NULL THEN 71 IF partition_by_max_ttl_date IS NOT NULL THEN
70 CASE 72 CASE
71 WHEN partition_type = 'DAYS' THEN 73 WHEN partition_type = 'DAYS' THEN
72 - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);  
73 - partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);  
74 - partition_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5); 74 + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
  75 + partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
  76 + partition_by_max_ttl_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5);
75 WHEN partition_type = 'MONTHS' THEN 77 WHEN partition_type = 'MONTHS' THEN
76 - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);  
77 - partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4); 78 + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
  79 + partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
78 ELSE 80 ELSE
79 - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3); 81 + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
80 END CASE; 82 END CASE;
81 - FOR partition IN SELECT tablename  
82 - FROM pg_tables  
83 - WHERE schemaname = 'public'  
84 - AND tablename like 'ts_kv_' || '%'  
85 - AND tablename != 'ts_kv_latest'  
86 - AND tablename != 'ts_kv_dictionary'  
87 - AND tablename != 'ts_kv_indefinite'  
88 - LOOP  
89 - IF partition != partition_by_max_ttl_date THEN  
90 - IF partition_year IS NOT NULL THEN  
91 - IF SPLIT_PART(partition, '_', 3)::integer < partition_year::integer THEN  
92 - partition_to_delete := partition;  
93 - ELSE  
94 - IF partition_month IS NOT NULL THEN  
95 - IF SPLIT_PART(partition, '_', 4)::integer < partition_month::integer THEN  
96 - partition_to_delete := partition; 83 + IF partition_by_max_ttl_year IS NULL THEN
  84 + RAISE NOTICE 'Failed to remove partitions by max ttl date due to partition_by_max_ttl_year is null!';
  85 + ELSE
  86 + IF partition_type = 'YEARS' THEN
  87 + FOR partition IN SELECT tablename
  88 + FROM pg_tables
  89 + WHERE schemaname = 'public'
  90 + AND tablename like 'ts_kv_' || '%'
  91 + AND tablename != 'ts_kv_latest'
  92 + AND tablename != 'ts_kv_dictionary'
  93 + AND tablename != 'ts_kv_indefinite'
  94 + AND tablename != partition_by_max_ttl_date
  95 + LOOP
  96 + partition_year := SPLIT_PART(partition, '_', 3)::integer;
  97 + IF partition_year < partition_by_max_ttl_year::integer THEN
  98 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  99 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  100 + deleted := deleted + 1;
  101 + END IF;
  102 + END LOOP;
  103 + ELSE
  104 + IF partition_type = 'MONTHS' THEN
  105 + IF partition_by_max_ttl_month IS NULL THEN
  106 + RAISE NOTICE 'Failed to remove months partitions by max ttl date due to partition_by_max_ttl_month is null!';
  107 + ELSE
  108 + FOR partition IN SELECT tablename
  109 + FROM pg_tables
  110 + WHERE schemaname = 'public'
  111 + AND tablename like 'ts_kv_' || '%'
  112 + AND tablename != 'ts_kv_latest'
  113 + AND tablename != 'ts_kv_dictionary'
  114 + AND tablename != 'ts_kv_indefinite'
  115 + AND tablename != partition_by_max_ttl_date
  116 + LOOP
  117 + partition_year := SPLIT_PART(partition, '_', 3)::integer;
  118 + IF partition_year > partition_by_max_ttl_year::integer THEN
  119 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  120 + CONTINUE;
97 ELSE 121 ELSE
98 - IF partition_day IS NOT NULL THEN  
99 - IF SPLIT_PART(partition, '_', 5)::integer < partition_day::integer THEN  
100 - partition_to_delete := partition; 122 + IF partition_year < partition_by_max_ttl_year::integer THEN
  123 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  124 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  125 + deleted := deleted + 1;
  126 + ELSE
  127 + partition_month := SPLIT_PART(partition, '_', 4)::integer;
  128 + IF partition_year = partition_by_max_ttl_year::integer THEN
  129 + IF partition_month >= partition_by_max_ttl_month::integer THEN
  130 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  131 + CONTINUE;
  132 + ELSE
  133 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  134 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  135 + deleted := deleted + 1;
  136 + END IF;
101 END IF; 137 END IF;
102 END IF; 138 END IF;
103 END IF; 139 END IF;
  140 + END LOOP;
  141 + END IF;
  142 + ELSE
  143 + IF partition_type = 'DAYS' THEN
  144 + IF partition_by_max_ttl_month IS NULL THEN
  145 + RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_month is null!';
  146 + ELSE
  147 + IF partition_by_max_ttl_day IS NULL THEN
  148 + RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_day is null!';
  149 + ELSE
  150 + FOR partition IN SELECT tablename
  151 + FROM pg_tables
  152 + WHERE schemaname = 'public'
  153 + AND tablename like 'ts_kv_' || '%'
  154 + AND tablename != 'ts_kv_latest'
  155 + AND tablename != 'ts_kv_dictionary'
  156 + AND tablename != 'ts_kv_indefinite'
  157 + AND tablename != partition_by_max_ttl_date
  158 + LOOP
  159 + partition_year := SPLIT_PART(partition, '_', 3)::integer;
  160 + IF partition_year > partition_by_max_ttl_year::integer THEN
  161 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  162 + CONTINUE;
  163 + ELSE
  164 + IF partition_year < partition_by_max_ttl_year::integer THEN
  165 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  166 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  167 + deleted := deleted + 1;
  168 + ELSE
  169 + partition_month := SPLIT_PART(partition, '_', 4)::integer;
  170 + IF partition_month > partition_by_max_ttl_month::integer THEN
  171 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  172 + CONTINUE;
  173 + ELSE
  174 + IF partition_month < partition_by_max_ttl_month::integer THEN
  175 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  176 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  177 + deleted := deleted + 1;
  178 + ELSE
  179 + partition_day := SPLIT_PART(partition, '_', 5)::integer;
  180 + IF partition_day >= partition_by_max_ttl_day::integer THEN
  181 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  182 + CONTINUE;
  183 + ELSE
  184 + IF partition_day < partition_by_max_ttl_day::integer THEN
  185 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  186 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  187 + deleted := deleted + 1;
  188 + END IF;
  189 + END IF;
  190 + END IF;
  191 + END IF;
  192 + END IF;
  193 + END IF;
  194 + END LOOP;
104 END IF; 195 END IF;
105 END IF; 196 END IF;
106 END IF; 197 END IF;
107 - IF partition_to_delete IS NOT NULL THEN  
108 - RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete;  
109 - EXECUTE format('DROP TABLE IF EXISTS %I', partition_to_delete);  
110 - partition_to_delete := NULL;  
111 - deleted := deleted + 1;  
112 - END IF;  
113 END IF; 198 END IF;
114 - END LOOP; 199 + END IF;
  200 + END IF;
115 END IF; 201 END IF;
116 END IF; 202 END IF;
117 END 203 END
@@ -127,8 +213,6 @@ BEGIN @@ -127,8 +213,6 @@ BEGIN
127 partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM'); 213 partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM');
128 WHEN partition_type = 'YEARS' THEN 214 WHEN partition_type = 'YEARS' THEN
129 partition := 'ts_kv_' || to_char(date, 'yyyy'); 215 partition := 'ts_kv_' || to_char(date, 'yyyy');
130 - WHEN partition_type = 'INDEFINITE' THEN  
131 - partition := NULL;  
132 ELSE 216 ELSE
133 partition := NULL; 217 partition := NULL;
134 END CASE; 218 END CASE;