Commit 6b212bb5d70bf8821866cb296233c67cd6daf8b3
Merge branch 'master' of github.com:thingsboard/thingsboard
Showing
19 changed files
with
236 additions
and
64 deletions
@@ -123,3 +123,28 @@ BEGIN | @@ -123,3 +123,28 @@ BEGIN | ||
123 | END LOOP; | 123 | END LOOP; |
124 | END | 124 | END |
125 | $$; | 125 | $$; |
126 | + | ||
127 | +CREATE OR REPLACE PROCEDURE cleanup_events_by_ttl(IN ttl bigint, IN debug_ttl bigint, INOUT deleted bigint) | ||
128 | + LANGUAGE plpgsql AS | ||
129 | +$$ | ||
130 | +DECLARE | ||
131 | + ttl_ts bigint; | ||
132 | + debug_ttl_ts bigint; | ||
133 | + ttl_deleted_count bigint DEFAULT 0; | ||
134 | + debug_ttl_deleted_count bigint DEFAULT 0; | ||
135 | +BEGIN | ||
136 | + IF ttl > 0 THEN | ||
137 | + ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - ttl::bigint * 1000)::bigint; | ||
138 | + EXECUTE format( | ||
139 | + 'WITH deleted AS (DELETE FROM event WHERE ts < %L::bigint AND (event_type != %L::varchar AND event_type != %L::varchar) RETURNING *) SELECT count(*) FROM deleted', ttl_ts, 'DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN') into ttl_deleted_count; | ||
140 | + END IF; | ||
141 | + IF debug_ttl > 0 THEN | ||
142 | + debug_ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - debug_ttl::bigint * 1000)::bigint; | ||
143 | + EXECUTE format( | ||
144 | + 'WITH deleted AS (DELETE FROM event WHERE ts < %L::bigint AND (event_type = %L::varchar OR event_type = %L::varchar) RETURNING *) SELECT count(*) FROM deleted', debug_ttl_ts, 'DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN') into debug_ttl_deleted_count; | ||
145 | + END IF; | ||
146 | + RAISE NOTICE 'Events removed by ttl: %', ttl_deleted_count; | ||
147 | + RAISE NOTICE 'Debug Events removed by ttl: %', debug_ttl_deleted_count; | ||
148 | + deleted := ttl_deleted_count + debug_ttl_deleted_count; | ||
149 | +END | ||
150 | +$$; |
@@ -23,8 +23,8 @@ import org.springframework.context.ApplicationContext; | @@ -23,8 +23,8 @@ import org.springframework.context.ApplicationContext; | ||
23 | import org.springframework.context.annotation.Profile; | 23 | import org.springframework.context.annotation.Profile; |
24 | import org.springframework.stereotype.Service; | 24 | import org.springframework.stereotype.Service; |
25 | import org.thingsboard.server.service.component.ComponentDiscoveryService; | 25 | import org.thingsboard.server.service.component.ComponentDiscoveryService; |
26 | -import org.thingsboard.server.service.install.DatabaseTsUpgradeService; | ||
27 | import org.thingsboard.server.service.install.DatabaseEntitiesUpgradeService; | 26 | import org.thingsboard.server.service.install.DatabaseEntitiesUpgradeService; |
27 | +import org.thingsboard.server.service.install.DatabaseTsUpgradeService; | ||
28 | import org.thingsboard.server.service.install.EntityDatabaseSchemaService; | 28 | import org.thingsboard.server.service.install.EntityDatabaseSchemaService; |
29 | import org.thingsboard.server.service.install.SystemDataLoaderService; | 29 | import org.thingsboard.server.service.install.SystemDataLoaderService; |
30 | import org.thingsboard.server.service.install.TsDatabaseSchemaService; | 30 | import org.thingsboard.server.service.install.TsDatabaseSchemaService; |
@@ -225,6 +225,11 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService | @@ -225,6 +225,11 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService | ||
225 | conn.createStatement().execute("ALTER TABLE tenant ADD COLUMN isolated_tb_core boolean DEFAULT (false), ADD COLUMN isolated_tb_rule_engine boolean DEFAULT (false)"); | 225 | conn.createStatement().execute("ALTER TABLE tenant ADD COLUMN isolated_tb_core boolean DEFAULT (false), ADD COLUMN isolated_tb_rule_engine boolean DEFAULT (false)"); |
226 | } catch (Exception e) { | 226 | } catch (Exception e) { |
227 | } | 227 | } |
228 | + try { | ||
229 | + long ts = System.currentTimeMillis(); | ||
230 | + conn.createStatement().execute("ALTER TABLE event ADD COLUMN ts bigint DEFAULT " + ts + ";"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script | ||
231 | + } catch (Exception e) { | ||
232 | + } | ||
228 | log.info("Schema updated."); | 233 | log.info("Schema updated."); |
229 | } | 234 | } |
230 | break; | 235 | break; |
application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java
renamed from
application/src/main/java/org/thingsboard/server/service/ttl/AbstractTimeseriesCleanUpService.java
@@ -17,47 +17,27 @@ package org.thingsboard.server.service.ttl; | @@ -17,47 +17,27 @@ package org.thingsboard.server.service.ttl; | ||
17 | 17 | ||
18 | import lombok.extern.slf4j.Slf4j; | 18 | import lombok.extern.slf4j.Slf4j; |
19 | import org.springframework.beans.factory.annotation.Value; | 19 | import org.springframework.beans.factory.annotation.Value; |
20 | -import org.springframework.scheduling.annotation.Scheduled; | ||
21 | -import org.thingsboard.server.dao.util.PsqlTsAnyDao; | 20 | +import org.thingsboard.server.dao.util.PsqlDao; |
22 | 21 | ||
23 | import java.sql.Connection; | 22 | import java.sql.Connection; |
24 | -import java.sql.DriverManager; | ||
25 | import java.sql.ResultSet; | 23 | import java.sql.ResultSet; |
26 | import java.sql.SQLException; | 24 | import java.sql.SQLException; |
27 | import java.sql.SQLWarning; | 25 | import java.sql.SQLWarning; |
28 | import java.sql.Statement; | 26 | import java.sql.Statement; |
29 | 27 | ||
30 | -@PsqlTsAnyDao | ||
31 | -@Slf4j | ||
32 | -public abstract class AbstractTimeseriesCleanUpService { | ||
33 | - | ||
34 | - @Value("${sql.ttl.ts_key_value_ttl}") | ||
35 | - protected long systemTtl; | ||
36 | 28 | ||
37 | - @Value("${sql.ttl.enabled}") | ||
38 | - private boolean ttlTaskExecutionEnabled; | 29 | +@Slf4j |
30 | +@PsqlDao | ||
31 | +public abstract class AbstractCleanUpService { | ||
39 | 32 | ||
40 | @Value("${spring.datasource.url}") | 33 | @Value("${spring.datasource.url}") |
41 | - private String dbUrl; | 34 | + protected String dbUrl; |
42 | 35 | ||
43 | @Value("${spring.datasource.username}") | 36 | @Value("${spring.datasource.username}") |
44 | - private String dbUserName; | 37 | + protected String dbUserName; |
45 | 38 | ||
46 | @Value("${spring.datasource.password}") | 39 | @Value("${spring.datasource.password}") |
47 | - private String dbPassword; | ||
48 | - | ||
49 | - @Scheduled(initialDelayString = "${sql.ttl.execution_interval_ms}", fixedDelayString = "${sql.ttl.execution_interval_ms}") | ||
50 | - public void cleanUp() { | ||
51 | - if (ttlTaskExecutionEnabled) { | ||
52 | - try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { | ||
53 | - doCleanUp(conn); | ||
54 | - } catch (SQLException e) { | ||
55 | - log.error("SQLException occurred during TTL task execution ", e); | ||
56 | - } | ||
57 | - } | ||
58 | - } | ||
59 | - | ||
60 | - protected abstract void doCleanUp(Connection connection); | 40 | + protected String dbPassword; |
61 | 41 | ||
62 | protected long executeQuery(Connection conn, String query) { | 42 | protected long executeQuery(Connection conn, String query) { |
63 | long removed = 0L; | 43 | long removed = 0L; |
@@ -74,7 +54,7 @@ public abstract class AbstractTimeseriesCleanUpService { | @@ -74,7 +54,7 @@ public abstract class AbstractTimeseriesCleanUpService { | ||
74 | return removed; | 54 | return removed; |
75 | } | 55 | } |
76 | 56 | ||
77 | - private void getWarnings(Statement statement) throws SQLException { | 57 | + protected void getWarnings(Statement statement) throws SQLException { |
78 | SQLWarning warnings = statement.getWarnings(); | 58 | SQLWarning warnings = statement.getWarnings(); |
79 | if (warnings != null) { | 59 | if (warnings != null) { |
80 | log.debug("{}", warnings.getMessage()); | 60 | log.debug("{}", warnings.getMessage()); |
@@ -86,4 +66,6 @@ public abstract class AbstractTimeseriesCleanUpService { | @@ -86,4 +66,6 @@ public abstract class AbstractTimeseriesCleanUpService { | ||
86 | } | 66 | } |
87 | } | 67 | } |
88 | 68 | ||
89 | -} | ||
69 | + protected abstract void doCleanUp(Connection connection); | ||
70 | + | ||
71 | +} |
application/src/main/java/org/thingsboard/server/service/ttl/events/EventsCleanUpService.java
0 → 100644
1 | +/** | ||
2 | + * Copyright © 2016-2020 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.service.ttl.events; | ||
17 | + | ||
18 | +import lombok.extern.slf4j.Slf4j; | ||
19 | +import org.springframework.beans.factory.annotation.Value; | ||
20 | +import org.springframework.scheduling.annotation.Scheduled; | ||
21 | +import org.springframework.stereotype.Service; | ||
22 | +import org.thingsboard.server.dao.util.PsqlDao; | ||
23 | +import org.thingsboard.server.service.ttl.AbstractCleanUpService; | ||
24 | + | ||
25 | +import java.sql.Connection; | ||
26 | +import java.sql.DriverManager; | ||
27 | +import java.sql.SQLException; | ||
28 | + | ||
29 | +@PsqlDao | ||
30 | +@Slf4j | ||
31 | +@Service | ||
32 | +public class EventsCleanUpService extends AbstractCleanUpService { | ||
33 | + | ||
34 | + @Value("${sql.ttl.events.events_ttl}") | ||
35 | + private long ttl; | ||
36 | + | ||
37 | + @Value("${sql.ttl.events.debug_events_ttl}") | ||
38 | + private long debugTtl; | ||
39 | + | ||
40 | + @Value("${sql.ttl.events.enabled}") | ||
41 | + private boolean ttlTaskExecutionEnabled; | ||
42 | + | ||
43 | + @Scheduled(initialDelayString = "${sql.ttl.events.execution_interval_ms}", fixedDelayString = "${sql.ttl.events.execution_interval_ms}") | ||
44 | + public void cleanUp() { | ||
45 | + if (ttlTaskExecutionEnabled) { | ||
46 | + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { | ||
47 | + doCleanUp(conn); | ||
48 | + } catch (SQLException e) { | ||
49 | + log.error("SQLException occurred during TTL task execution ", e); | ||
50 | + } | ||
51 | + } | ||
52 | + } | ||
53 | + | ||
54 | + @Override | ||
55 | + protected void doCleanUp(Connection connection) { | ||
56 | + long totalEventsRemoved = executeQuery(connection, "call cleanup_events_by_ttl(" + ttl + ", " + debugTtl + ", 0);"); | ||
57 | + log.info("Total events removed by TTL: [{}]", totalEventsRemoved); | ||
58 | + } | ||
59 | +} |
1 | +/** | ||
2 | + * Copyright © 2016-2020 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.service.ttl.timeseries; | ||
17 | + | ||
18 | +import lombok.extern.slf4j.Slf4j; | ||
19 | +import org.springframework.beans.factory.annotation.Value; | ||
20 | +import org.springframework.scheduling.annotation.Scheduled; | ||
21 | +import org.thingsboard.server.dao.util.PsqlTsAnyDao; | ||
22 | +import org.thingsboard.server.service.ttl.AbstractCleanUpService; | ||
23 | + | ||
24 | +import java.sql.Connection; | ||
25 | +import java.sql.DriverManager; | ||
26 | +import java.sql.SQLException; | ||
27 | + | ||
28 | +@PsqlTsAnyDao | ||
29 | +@Slf4j | ||
30 | +public abstract class AbstractTimeseriesCleanUpService extends AbstractCleanUpService { | ||
31 | + | ||
32 | + @Value("${sql.ttl.ts.ts_key_value_ttl}") | ||
33 | + protected long systemTtl; | ||
34 | + | ||
35 | + @Value("${sql.ttl.ts.enabled}") | ||
36 | + private boolean ttlTaskExecutionEnabled; | ||
37 | + | ||
38 | + @Scheduled(initialDelayString = "${sql.ttl.ts.execution_interval_ms}", fixedDelayString = "${sql.ttl.ts.execution_interval_ms}") | ||
39 | + public void cleanUp() { | ||
40 | + if (ttlTaskExecutionEnabled) { | ||
41 | + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { | ||
42 | + doCleanUp(conn); | ||
43 | + } catch (SQLException e) { | ||
44 | + log.error("SQLException occurred during TTL task execution ", e); | ||
45 | + } | ||
46 | + } | ||
47 | + } | ||
48 | + | ||
49 | +} |
application/src/main/java/org/thingsboard/server/service/ttl/timeseries/PsqlTimeseriesCleanUpService.java
renamed from
application/src/main/java/org/thingsboard/server/service/ttl/PsqlTimeseriesCleanUpService.java
@@ -13,7 +13,7 @@ | @@ -13,7 +13,7 @@ | ||
13 | * See the License for the specific language governing permissions and | 13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. | 14 | * limitations under the License. |
15 | */ | 15 | */ |
16 | -package org.thingsboard.server.service.ttl; | 16 | +package org.thingsboard.server.service.ttl.timeseries; |
17 | 17 | ||
18 | import lombok.extern.slf4j.Slf4j; | 18 | import lombok.extern.slf4j.Slf4j; |
19 | import org.springframework.beans.factory.annotation.Value; | 19 | import org.springframework.beans.factory.annotation.Value; |
application/src/main/java/org/thingsboard/server/service/ttl/timeseries/TimescaleTimeseriesCleanUpService.java
renamed from
application/src/main/java/org/thingsboard/server/service/ttl/TimescaleTimeseriesCleanUpService.java
@@ -13,7 +13,7 @@ | @@ -13,7 +13,7 @@ | ||
13 | * See the License for the specific language governing permissions and | 13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. | 14 | * limitations under the License. |
15 | */ | 15 | */ |
16 | -package org.thingsboard.server.service.ttl; | 16 | +package org.thingsboard.server.service.ttl.timeseries; |
17 | 17 | ||
18 | import lombok.extern.slf4j.Slf4j; | 18 | import lombok.extern.slf4j.Slf4j; |
19 | import org.springframework.stereotype.Service; | 19 | import org.springframework.stereotype.Service; |
@@ -181,31 +181,37 @@ cassandra: | @@ -181,31 +181,37 @@ cassandra: | ||
181 | 181 | ||
182 | # SQL configuration parameters | 182 | # SQL configuration parameters |
183 | sql: | 183 | sql: |
184 | - # Specify batch size for persisting attribute updates | ||
185 | - attributes: | ||
186 | - batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}" | ||
187 | - batch_max_delay: "${SQL_ATTRIBUTES_BATCH_MAX_DELAY_MS:100}" | ||
188 | - stats_print_interval_ms: "${SQL_ATTRIBUTES_BATCH_STATS_PRINT_MS:10000}" | ||
189 | - ts: | ||
190 | - batch_size: "${SQL_TS_BATCH_SIZE:10000}" | ||
191 | - batch_max_delay: "${SQL_TS_BATCH_MAX_DELAY_MS:100}" | ||
192 | - stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" | ||
193 | - ts_latest: | ||
194 | - batch_size: "${SQL_TS_LATEST_BATCH_SIZE:10000}" | ||
195 | - batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:100}" | ||
196 | - stats_print_interval_ms: "${SQL_TS_LATEST_BATCH_STATS_PRINT_MS:10000}" | ||
197 | - # Specify whether to remove null characters from strValue of attributes and timeseries before insert | ||
198 | - remove_null_chars: "${SQL_REMOVE_NULL_CHARS:true}" | ||
199 | - postgres: | ||
200 | - # Specify partitioning size for timestamp key-value storage. Example: DAYS, MONTHS, YEARS, INDEFINITE. | ||
201 | - ts_key_value_partitioning: "${SQL_POSTGRES_TS_KV_PARTITIONING:MONTHS}" | ||
202 | - timescale: | ||
203 | - # Specify Interval size for new data chunks storage. | ||
204 | - chunk_time_interval: "${SQL_TIMESCALE_CHUNK_TIME_INTERVAL:604800000}" | ||
205 | - ttl: | ||
206 | - enabled: "${SQL_TTL_ENABLED:true}" | ||
207 | - execution_interval_ms: "${SQL_TTL_EXECUTION_INTERVAL:86400000}" # Number of miliseconds | ||
208 | - ts_key_value_ttl: "${SQL_TTL_TS_KEY_VALUE_TTL:0}" # Number of seconds | 184 | + # Specify batch size for persisting attribute updates |
185 | + attributes: | ||
186 | + batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}" | ||
187 | + batch_max_delay: "${SQL_ATTRIBUTES_BATCH_MAX_DELAY_MS:100}" | ||
188 | + stats_print_interval_ms: "${SQL_ATTRIBUTES_BATCH_STATS_PRINT_MS:10000}" | ||
189 | + ts: | ||
190 | + batch_size: "${SQL_TS_BATCH_SIZE:10000}" | ||
191 | + batch_max_delay: "${SQL_TS_BATCH_MAX_DELAY_MS:100}" | ||
192 | + stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" | ||
193 | + ts_latest: | ||
194 | + batch_size: "${SQL_TS_LATEST_BATCH_SIZE:10000}" | ||
195 | + batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:100}" | ||
196 | + stats_print_interval_ms: "${SQL_TS_LATEST_BATCH_STATS_PRINT_MS:10000}" | ||
197 | + # Specify whether to remove null characters from strValue of attributes and timeseries before insert | ||
198 | + remove_null_chars: "${SQL_REMOVE_NULL_CHARS:true}" | ||
199 | + postgres: | ||
200 | + # Specify partitioning size for timestamp key-value storage. Example: DAYS, MONTHS, YEARS, INDEFINITE. | ||
201 | + ts_key_value_partitioning: "${SQL_POSTGRES_TS_KV_PARTITIONING:MONTHS}" | ||
202 | + timescale: | ||
203 | + # Specify Interval size for new data chunks storage. | ||
204 | + chunk_time_interval: "${SQL_TIMESCALE_CHUNK_TIME_INTERVAL:604800000}" | ||
205 | + ttl: | ||
206 | + ts: | ||
207 | + enabled: "${SQL_TTL_TS_ENABLED:true}" | ||
208 | + execution_interval_ms: "${SQL_TTL_TS_EXECUTION_INTERVAL:86400000}" # Number of miliseconds. The current value corresponds to one day | ||
209 | + ts_key_value_ttl: "${SQL_TTL_TS_TS_KEY_VALUE_TTL:0}" # Number of seconds | ||
210 | + events: | ||
211 | + enabled: "${SQL_TTL_EVENTS_ENABLED:true}" | ||
212 | + execution_interval_ms: "${SQL_TTL_EVENTS_EXECUTION_INTERVAL:86400000}" # Number of miliseconds. The current value corresponds to one day | ||
213 | + events_ttl: "${SQL_TTL_EVENTS_EVENTS_TTL:0}" # Number of seconds | ||
214 | + debug_events_ttl: "${SQL_TTL_EVENTS_DEBUG_EVENTS_TTL:604800}" # Number of seconds. The current value corresponds to one week | ||
209 | 215 | ||
210 | # Actor system parameters | 216 | # Actor system parameters |
211 | actors: | 217 | actors: |
@@ -32,6 +32,9 @@ public class ModelConstants { | @@ -32,6 +32,9 @@ public class ModelConstants { | ||
32 | public static final String NULL_UUID_STR = UUIDConverter.fromTimeUUID(NULL_UUID); | 32 | public static final String NULL_UUID_STR = UUIDConverter.fromTimeUUID(NULL_UUID); |
33 | public static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID); | 33 | public static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID); |
34 | 34 | ||
35 | + // this is the difference between midnight October 15, 1582 UTC and midnight January 1, 1970 UTC as 100 nanosecond units | ||
36 | + public static final long EPOCH_DIFF = 122192928000000000L; | ||
37 | + | ||
35 | /** | 38 | /** |
36 | * Generic constants. | 39 | * Generic constants. |
37 | */ | 40 | */ |
@@ -37,6 +37,9 @@ import javax.persistence.EnumType; | @@ -37,6 +37,9 @@ import javax.persistence.EnumType; | ||
37 | import javax.persistence.Enumerated; | 37 | import javax.persistence.Enumerated; |
38 | import javax.persistence.Table; | 38 | import javax.persistence.Table; |
39 | 39 | ||
40 | +import java.util.UUID; | ||
41 | + | ||
42 | +import static org.thingsboard.server.dao.model.ModelConstants.EPOCH_DIFF; | ||
40 | import static org.thingsboard.server.dao.model.ModelConstants.EVENT_BODY_PROPERTY; | 43 | import static org.thingsboard.server.dao.model.ModelConstants.EVENT_BODY_PROPERTY; |
41 | import static org.thingsboard.server.dao.model.ModelConstants.EVENT_COLUMN_FAMILY_NAME; | 44 | import static org.thingsboard.server.dao.model.ModelConstants.EVENT_COLUMN_FAMILY_NAME; |
42 | import static org.thingsboard.server.dao.model.ModelConstants.EVENT_ENTITY_ID_PROPERTY; | 45 | import static org.thingsboard.server.dao.model.ModelConstants.EVENT_ENTITY_ID_PROPERTY; |
@@ -44,6 +47,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.EVENT_ENTITY_TYPE_ | @@ -44,6 +47,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.EVENT_ENTITY_TYPE_ | ||
44 | import static org.thingsboard.server.dao.model.ModelConstants.EVENT_TENANT_ID_PROPERTY; | 47 | import static org.thingsboard.server.dao.model.ModelConstants.EVENT_TENANT_ID_PROPERTY; |
45 | import static org.thingsboard.server.dao.model.ModelConstants.EVENT_TYPE_PROPERTY; | 48 | import static org.thingsboard.server.dao.model.ModelConstants.EVENT_TYPE_PROPERTY; |
46 | import static org.thingsboard.server.dao.model.ModelConstants.EVENT_UID_PROPERTY; | 49 | import static org.thingsboard.server.dao.model.ModelConstants.EVENT_UID_PROPERTY; |
50 | +import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN; | ||
47 | 51 | ||
48 | @Data | 52 | @Data |
49 | @EqualsAndHashCode(callSuper = true) | 53 | @EqualsAndHashCode(callSuper = true) |
@@ -73,9 +77,15 @@ public class EventEntity extends BaseSqlEntity<Event> implements BaseEntity<Eve | @@ -73,9 +77,15 @@ public class EventEntity extends BaseSqlEntity<Event> implements BaseEntity<Eve | ||
73 | @Column(name = EVENT_BODY_PROPERTY) | 77 | @Column(name = EVENT_BODY_PROPERTY) |
74 | private JsonNode body; | 78 | private JsonNode body; |
75 | 79 | ||
80 | + @Column(name = TS_COLUMN) | ||
81 | + private long ts; | ||
82 | + | ||
76 | public EventEntity(Event event) { | 83 | public EventEntity(Event event) { |
77 | if (event.getId() != null) { | 84 | if (event.getId() != null) { |
78 | this.setUuid(event.getId().getId()); | 85 | this.setUuid(event.getId().getId()); |
86 | + this.ts = getTs(event.getId().getId()); | ||
87 | + } else { | ||
88 | + this.ts = System.currentTimeMillis(); | ||
79 | } | 89 | } |
80 | if (event.getTenantId() != null) { | 90 | if (event.getTenantId() != null) { |
81 | this.tenantId = toString(event.getTenantId().getId()); | 91 | this.tenantId = toString(event.getTenantId().getId()); |
@@ -101,4 +111,8 @@ public class EventEntity extends BaseSqlEntity<Event> implements BaseEntity<Eve | @@ -101,4 +111,8 @@ public class EventEntity extends BaseSqlEntity<Event> implements BaseEntity<Eve | ||
101 | event.setUid(eventUid); | 111 | event.setUid(eventUid); |
102 | return event; | 112 | return event; |
103 | } | 113 | } |
114 | + | ||
115 | + private long getTs(UUID uuid) { | ||
116 | + return (uuid.timestamp() - EPOCH_DIFF) / 10000; | ||
117 | + } | ||
104 | } | 118 | } |
@@ -75,7 +75,8 @@ public abstract class AbstractEventInsertRepository implements EventInsertReposi | @@ -75,7 +75,8 @@ public abstract class AbstractEventInsertRepository implements EventInsertReposi | ||
75 | .setParameter("entity_type", entity.getEntityType().name()) | 75 | .setParameter("entity_type", entity.getEntityType().name()) |
76 | .setParameter("event_type", entity.getEventType()) | 76 | .setParameter("event_type", entity.getEventType()) |
77 | .setParameter("event_uid", entity.getEventUid()) | 77 | .setParameter("event_uid", entity.getEventUid()) |
78 | - .setParameter("tenant_id", entity.getTenantId()); | 78 | + .setParameter("tenant_id", entity.getTenantId()) |
79 | + .setParameter("ts", entity.getTs()); | ||
79 | } | 80 | } |
80 | 81 | ||
81 | private EventEntity processSaveOrUpdate(EventEntity entity, String query) { | 82 | private EventEntity processSaveOrUpdate(EventEntity entity, String query) { |
@@ -44,7 +44,7 @@ public class HsqlEventInsertRepository extends AbstractEventInsertRepository { | @@ -44,7 +44,7 @@ public class HsqlEventInsertRepository extends AbstractEventInsertRepository { | ||
44 | } | 44 | } |
45 | 45 | ||
46 | private static String getInsertString(String conflictStatement) { | 46 | private static String getInsertString(String conflictStatement) { |
47 | - return "MERGE INTO event USING (VALUES :id, :body, :entity_id, :entity_type, :event_type, :event_uid, :tenant_id) I (id, body, entity_id, entity_type, event_type, event_uid, tenant_id) ON " + conflictStatement + " WHEN MATCHED THEN UPDATE SET event.id = I.id, event.body = I.body, event.entity_id = I.entity_id, event.entity_type = I.entity_type, event.event_type = I.event_type, event.event_uid = I.event_uid, event.tenant_id = I.tenant_id" + | ||
48 | - " WHEN NOT MATCHED THEN INSERT (id, body, entity_id, entity_type, event_type, event_uid, tenant_id) VALUES (I.id, I.body, I.entity_id, I.entity_type, I.event_type, I.event_uid, I.tenant_id)"; | 47 | + return "MERGE INTO event USING (VALUES :id, :body, :entity_id, :entity_type, :event_type, :event_uid, :tenant_id, :ts) I (id, body, entity_id, entity_type, event_type, event_uid, tenant_id, ts) ON " + conflictStatement + " WHEN MATCHED THEN UPDATE SET event.id = I.id, event.body = I.body, event.entity_id = I.entity_id, event.entity_type = I.entity_type, event.event_type = I.event_type, event.event_uid = I.event_uid, event.tenant_id = I.tenant_id, event.ts = I.ts" + |
48 | + " WHEN NOT MATCHED THEN INSERT (id, body, entity_id, entity_type, event_type, event_uid, tenant_id, ts) VALUES (I.id, I.body, I.entity_id, I.entity_type, I.event_type, I.event_uid, I.tenant_id, I.ts)"; | ||
49 | } | 49 | } |
50 | } | 50 | } |
@@ -48,6 +48,6 @@ public class PsqlEventInsertRepository extends AbstractEventInsertRepository { | @@ -48,6 +48,6 @@ public class PsqlEventInsertRepository extends AbstractEventInsertRepository { | ||
48 | } | 48 | } |
49 | 49 | ||
50 | private static String getInsertOrUpdateString(String eventKeyStatement, String updateKeyStatement) { | 50 | private static String getInsertOrUpdateString(String eventKeyStatement, String updateKeyStatement) { |
51 | - return "INSERT INTO event (id, body, entity_id, entity_type, event_type, event_uid, tenant_id) VALUES (:id, :body, :entity_id, :entity_type, :event_type, :event_uid, :tenant_id) ON CONFLICT " + eventKeyStatement + " DO UPDATE SET body = :body, " + updateKeyStatement + " returning *"; | 51 | + return "INSERT INTO event (id, body, entity_id, entity_type, event_type, event_uid, tenant_id, ts) VALUES (:id, :body, :entity_id, :entity_type, :event_type, :event_uid, :tenant_id, :ts) ON CONFLICT " + eventKeyStatement + " DO UPDATE SET body = :body, ts = :ts," + updateKeyStatement + " returning *"; |
52 | } | 52 | } |
53 | } | 53 | } |
@@ -144,6 +144,7 @@ CREATE TABLE IF NOT EXISTS event ( | @@ -144,6 +144,7 @@ CREATE TABLE IF NOT EXISTS event ( | ||
144 | event_type varchar(255), | 144 | event_type varchar(255), |
145 | event_uid varchar(255), | 145 | event_uid varchar(255), |
146 | tenant_id varchar(31), | 146 | tenant_id varchar(31), |
147 | + ts bigint NOT NULL, | ||
147 | CONSTRAINT event_unq_key UNIQUE (tenant_id, entity_type, entity_id, event_type, event_uid) | 148 | CONSTRAINT event_unq_key UNIQUE (tenant_id, entity_type, entity_id, event_type, event_uid) |
148 | ); | 149 | ); |
149 | 150 | ||
@@ -251,3 +252,4 @@ CREATE TABLE IF NOT EXISTS entity_view ( | @@ -251,3 +252,4 @@ CREATE TABLE IF NOT EXISTS entity_view ( | ||
251 | search_text varchar(255), | 252 | search_text varchar(255), |
252 | additional_info varchar | 253 | additional_info varchar |
253 | ); | 254 | ); |
255 | + |
@@ -144,6 +144,7 @@ CREATE TABLE IF NOT EXISTS event ( | @@ -144,6 +144,7 @@ CREATE TABLE IF NOT EXISTS event ( | ||
144 | event_type varchar(255), | 144 | event_type varchar(255), |
145 | event_uid varchar(255), | 145 | event_uid varchar(255), |
146 | tenant_id varchar(31), | 146 | tenant_id varchar(31), |
147 | + ts bigint NOT NULL, | ||
147 | CONSTRAINT event_unq_key UNIQUE (tenant_id, entity_type, entity_id, event_type, event_uid) | 148 | CONSTRAINT event_unq_key UNIQUE (tenant_id, entity_type, entity_id, event_type, event_uid) |
148 | ); | 149 | ); |
149 | 150 | ||
@@ -251,3 +252,28 @@ CREATE TABLE IF NOT EXISTS entity_view ( | @@ -251,3 +252,28 @@ CREATE TABLE IF NOT EXISTS entity_view ( | ||
251 | search_text varchar(255), | 252 | search_text varchar(255), |
252 | additional_info varchar | 253 | additional_info varchar |
253 | ); | 254 | ); |
255 | + | ||
256 | +CREATE OR REPLACE PROCEDURE cleanup_events_by_ttl(IN ttl bigint, IN debug_ttl bigint, INOUT deleted bigint) | ||
257 | + LANGUAGE plpgsql AS | ||
258 | +$$ | ||
259 | +DECLARE | ||
260 | + ttl_ts bigint; | ||
261 | + debug_ttl_ts bigint; | ||
262 | + ttl_deleted_count bigint DEFAULT 0; | ||
263 | + debug_ttl_deleted_count bigint DEFAULT 0; | ||
264 | +BEGIN | ||
265 | + IF ttl > 0 THEN | ||
266 | + ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - ttl::bigint * 1000)::bigint; | ||
267 | + EXECUTE format( | ||
268 | + 'WITH deleted AS (DELETE FROM event WHERE ts < %L::bigint AND (event_type != %L::varchar AND event_type != %L::varchar) RETURNING *) SELECT count(*) FROM deleted', ttl_ts, 'DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN') into ttl_deleted_count; | ||
269 | + END IF; | ||
270 | + IF debug_ttl > 0 THEN | ||
271 | + debug_ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - debug_ttl::bigint * 1000)::bigint; | ||
272 | + EXECUTE format( | ||
273 | + 'WITH deleted AS (DELETE FROM event WHERE ts < %L::bigint AND (event_type = %L::varchar OR event_type = %L::varchar) RETURNING *) SELECT count(*) FROM deleted', debug_ttl_ts, 'DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN') into debug_ttl_deleted_count; | ||
274 | + END IF; | ||
275 | + RAISE NOTICE 'Events removed by ttl: %', ttl_deleted_count; | ||
276 | + RAISE NOTICE 'Debug Events removed by ttl: %', debug_ttl_deleted_count; | ||
277 | + deleted := ttl_deleted_count + debug_ttl_deleted_count; | ||
278 | +END | ||
279 | +$$; |
@@ -52,7 +52,7 @@ CREATE TABLE IF NOT EXISTS tb_schema_settings | @@ -52,7 +52,7 @@ CREATE TABLE IF NOT EXISTS tb_schema_settings | ||
52 | CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version) | 52 | CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version) |
53 | ); | 53 | ); |
54 | 54 | ||
55 | -INSERT INTO tb_schema_settings (schema_version) VALUES (2005000); | 55 | +INSERT INTO tb_schema_settings (schema_version) VALUES (2005000) ON CONFLICT (schema_version) DO UPDATE SET schema_version = 2005000; |
56 | 56 | ||
57 | CREATE OR REPLACE FUNCTION to_uuid(IN entity_id varchar, OUT uuid_id uuid) AS | 57 | CREATE OR REPLACE FUNCTION to_uuid(IN entity_id varchar, OUT uuid_id uuid) AS |
58 | $$ | 58 | $$ |
@@ -52,7 +52,7 @@ CREATE TABLE IF NOT EXISTS tb_schema_settings | @@ -52,7 +52,7 @@ CREATE TABLE IF NOT EXISTS tb_schema_settings | ||
52 | CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version) | 52 | CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version) |
53 | ); | 53 | ); |
54 | 54 | ||
55 | -INSERT INTO tb_schema_settings (schema_version) VALUES (2005000); | 55 | +INSERT INTO tb_schema_settings (schema_version) VALUES (2005000) ON CONFLICT (schema_version) DO UPDATE SET schema_version = 2005000; |
56 | 56 | ||
57 | CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar, IN system_ttl bigint, INOUT deleted bigint) | 57 | CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar, IN system_ttl bigint, INOUT deleted bigint) |
58 | LANGUAGE plpgsql AS | 58 | LANGUAGE plpgsql AS |
@@ -30,7 +30,7 @@ import org.thingsboard.server.common.msg.TbMsg; | @@ -30,7 +30,7 @@ import org.thingsboard.server.common.msg.TbMsg; | ||
30 | 30 | ||
31 | @Slf4j | 31 | @Slf4j |
32 | @RuleNode(type = ComponentType.ENRICHMENT, | 32 | @RuleNode(type = ComponentType.ENRICHMENT, |
33 | - name = "device attributes", | 33 | + name = "related device attributes", |
34 | configClazz = TbGetDeviceAttrNodeConfiguration.class, | 34 | configClazz = TbGetDeviceAttrNodeConfiguration.class, |
35 | nodeDescription = "Add Originators Related Device Attributes and Latest Telemetry value into Message Metadata", | 35 | nodeDescription = "Add Originators Related Device Attributes and Latest Telemetry value into Message Metadata", |
36 | nodeDetails = "If Attributes enrichment configured, <b>CLIENT/SHARED/SERVER</b> attributes are added into Message metadata " + | 36 | nodeDetails = "If Attributes enrichment configured, <b>CLIENT/SHARED/SERVER</b> attributes are added into Message metadata " + |