Commit a729f2bdf98cd0a7308fc136448291a5351b654d
Committed by
GitHub
Merge pull request #4275 from ViacheslavKlimov/fix/timescale-ttl-procedure
Fix Timescale procedure
Showing
3 changed files
with
77 additions
and
2 deletions
1 | +CREATE OR REPLACE PROCEDURE cleanup_timeseries_by_ttl(IN null_uuid uuid, | ||
2 | + IN system_ttl bigint, INOUT deleted bigint) | ||
3 | + LANGUAGE plpgsql AS | ||
4 | +$$ | ||
5 | +DECLARE | ||
6 | +tenant_cursor CURSOR FOR select tenant.id as tenant_id | ||
7 | + from tenant; | ||
8 | + tenant_id_record uuid; | ||
9 | + customer_id_record uuid; | ||
10 | + tenant_ttl bigint; | ||
11 | + customer_ttl bigint; | ||
12 | + deleted_for_entities bigint; | ||
13 | + tenant_ttl_ts bigint; | ||
14 | + customer_ttl_ts bigint; | ||
15 | +BEGIN | ||
16 | +OPEN tenant_cursor; | ||
17 | +FETCH tenant_cursor INTO tenant_id_record; | ||
18 | +WHILE FOUND | ||
19 | + LOOP | ||
20 | + EXECUTE format( | ||
21 | + 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L', | ||
22 | + tenant_id_record, 'TTL') INTO tenant_ttl; | ||
23 | + if tenant_ttl IS NULL THEN | ||
24 | + tenant_ttl := system_ttl; | ||
25 | +END IF; | ||
26 | + IF tenant_ttl > 0 THEN | ||
27 | + tenant_ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - tenant_ttl::bigint * 1000)::bigint; | ||
28 | + deleted_for_entities := delete_device_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts); | ||
29 | + deleted := deleted + deleted_for_entities; | ||
30 | + RAISE NOTICE '% telemetry removed for devices where tenant_id = %', deleted_for_entities, tenant_id_record; | ||
31 | + deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts); | ||
32 | + deleted := deleted + deleted_for_entities; | ||
33 | + RAISE NOTICE '% telemetry removed for assets where tenant_id = %', deleted_for_entities, tenant_id_record; | ||
34 | +END IF; | ||
35 | +FOR customer_id_record IN | ||
36 | +SELECT customer.id AS customer_id FROM customer WHERE customer.tenant_id = tenant_id_record | ||
37 | + LOOP | ||
38 | + EXECUTE format( | ||
39 | + 'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L', | ||
40 | + customer_id_record, 'TTL') INTO customer_ttl; | ||
41 | +IF customer_ttl IS NULL THEN | ||
42 | + customer_ttl_ts := tenant_ttl_ts; | ||
43 | +ELSE | ||
44 | + IF customer_ttl > 0 THEN | ||
45 | + customer_ttl_ts := | ||
46 | + (EXTRACT(EPOCH FROM current_timestamp) * 1000 - | ||
47 | + customer_ttl::bigint * 1000)::bigint; | ||
48 | +END IF; | ||
49 | +END IF; | ||
50 | + IF customer_ttl_ts IS NOT NULL AND customer_ttl_ts > 0 THEN | ||
51 | + deleted_for_entities := | ||
52 | + delete_customer_records_from_ts_kv(tenant_id_record, customer_id_record, | ||
53 | + customer_ttl_ts); | ||
54 | + deleted := deleted + deleted_for_entities; | ||
55 | + RAISE NOTICE '% telemetry removed for customer with id = % where tenant_id = %', deleted_for_entities, customer_id_record, tenant_id_record; | ||
56 | + deleted_for_entities := | ||
57 | + delete_device_records_from_ts_kv(tenant_id_record, customer_id_record, | ||
58 | + customer_ttl_ts); | ||
59 | + deleted := deleted + deleted_for_entities; | ||
60 | + RAISE NOTICE '% telemetry removed for devices where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record; | ||
61 | + deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record, | ||
62 | + customer_id_record, | ||
63 | + customer_ttl_ts); | ||
64 | + deleted := deleted + deleted_for_entities; | ||
65 | + RAISE NOTICE '% telemetry removed for assets where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record; | ||
66 | +END IF; | ||
67 | +END LOOP; | ||
68 | +FETCH tenant_cursor INTO tenant_id_record; | ||
69 | +END LOOP; | ||
70 | +END | ||
71 | +$$; |
@@ -178,7 +178,11 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr | @@ -178,7 +178,11 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr | ||
178 | } | 178 | } |
179 | break; | 179 | break; |
180 | case "3.1.1": | 180 | case "3.1.1": |
181 | + break; | ||
181 | case "3.2.1": | 182 | case "3.2.1": |
183 | + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { | ||
184 | + loadSql(conn, LOAD_TTL_FUNCTIONS_SQL); | ||
185 | + } | ||
182 | break; | 186 | break; |
183 | default: | 187 | default: |
184 | throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); | 188 | throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); |
@@ -91,8 +91,8 @@ $$ | @@ -91,8 +91,8 @@ $$ | ||
91 | DECLARE | 91 | DECLARE |
92 | tenant_cursor CURSOR FOR select tenant.id as tenant_id | 92 | tenant_cursor CURSOR FOR select tenant.id as tenant_id |
93 | from tenant; | 93 | from tenant; |
94 | - tenant_id_record varchar; | ||
95 | - customer_id_record varchar; | 94 | + tenant_id_record uuid; |
95 | + customer_id_record uuid; | ||
96 | tenant_ttl bigint; | 96 | tenant_ttl bigint; |
97 | customer_ttl bigint; | 97 | customer_ttl bigint; |
98 | deleted_for_entities bigint; | 98 | deleted_for_entities bigint; |