Commit b197965ab8424935bd74e5b3cd012d8b93b4f781

Authored by YevhenBondarenko
2 parents a665b2cb 4e5f4851

Merge branch 'master' of https://github.com/thingsboard/thingsboard into feature/power-mode

Showing 58 changed files with 1010 additions and 666 deletions
... ... @@ -18,17 +18,18 @@ CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar
18 18 LANGUAGE plpgsql AS
19 19 $$
20 20 DECLARE
21   - max_tenant_ttl bigint;
22   - max_customer_ttl bigint;
23   - max_ttl bigint;
24   - date timestamp;
25   - partition_by_max_ttl_date varchar;
26   - partition_month varchar;
27   - partition_day varchar;
28   - partition_year varchar;
29   - partition varchar;
30   - partition_to_delete varchar;
31   -
  21 + max_tenant_ttl bigint;
  22 + max_customer_ttl bigint;
  23 + max_ttl bigint;
  24 + date timestamp;
  25 + partition_by_max_ttl_date varchar;
  26 + partition_by_max_ttl_month varchar;
  27 + partition_by_max_ttl_day varchar;
  28 + partition_by_max_ttl_year varchar;
  29 + partition varchar;
  30 + partition_year integer;
  31 + partition_month integer;
  32 + partition_day integer;
32 33
33 34 BEGIN
34 35 SELECT max(attribute_kv.long_v)
... ... @@ -45,53 +46,138 @@ BEGIN
45 46 if max_ttl IS NOT NULL AND max_ttl > 0 THEN
46 47 date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - max_ttl);
47 48 partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date);
  49 + RAISE NOTICE 'Date by max ttl: %', date;
48 50 RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date;
49 51 IF partition_by_max_ttl_date IS NOT NULL THEN
50 52 CASE
51 53 WHEN partition_type = 'DAYS' THEN
52   - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
53   - partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
54   - partition_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5);
  54 + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
  55 + partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
  56 + partition_by_max_ttl_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5);
55 57 WHEN partition_type = 'MONTHS' THEN
56   - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
57   - partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
  58 + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
  59 + partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
58 60 ELSE
59   - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
  61 + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
60 62 END CASE;
61   - FOR partition IN SELECT tablename
62   - FROM pg_tables
63   - WHERE schemaname = 'public'
64   - AND tablename like 'ts_kv_' || '%'
65   - AND tablename != 'ts_kv_latest'
66   - AND tablename != 'ts_kv_dictionary'
67   - AND tablename != 'ts_kv_indefinite'
68   - LOOP
69   - IF partition != partition_by_max_ttl_date THEN
70   - IF partition_year IS NOT NULL THEN
71   - IF SPLIT_PART(partition, '_', 3)::integer < partition_year::integer THEN
72   - partition_to_delete := partition;
73   - ELSE
74   - IF partition_month IS NOT NULL THEN
75   - IF SPLIT_PART(partition, '_', 4)::integer < partition_month::integer THEN
76   - partition_to_delete := partition;
  63 + IF partition_by_max_ttl_year IS NULL THEN
  64 + RAISE NOTICE 'Failed to remove partitions by max ttl date due to partition_by_max_ttl_year is null!';
  65 + ELSE
  66 + IF partition_type = 'YEARS' THEN
  67 + FOR partition IN SELECT tablename
  68 + FROM pg_tables
  69 + WHERE schemaname = 'public'
  70 + AND tablename like 'ts_kv_' || '%'
  71 + AND tablename != 'ts_kv_latest'
  72 + AND tablename != 'ts_kv_dictionary'
  73 + AND tablename != 'ts_kv_indefinite'
  74 + AND tablename != partition_by_max_ttl_date
  75 + LOOP
  76 + partition_year := SPLIT_PART(partition, '_', 3)::integer;
  77 + IF partition_year < partition_by_max_ttl_year::integer THEN
  78 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  79 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  80 + deleted := deleted + 1;
  81 + END IF;
  82 + END LOOP;
  83 + ELSE
  84 + IF partition_type = 'MONTHS' THEN
  85 + IF partition_by_max_ttl_month IS NULL THEN
  86 + RAISE NOTICE 'Failed to remove months partitions by max ttl date due to partition_by_max_ttl_month is null!';
  87 + ELSE
  88 + FOR partition IN SELECT tablename
  89 + FROM pg_tables
  90 + WHERE schemaname = 'public'
  91 + AND tablename like 'ts_kv_' || '%'
  92 + AND tablename != 'ts_kv_latest'
  93 + AND tablename != 'ts_kv_dictionary'
  94 + AND tablename != 'ts_kv_indefinite'
  95 + AND tablename != partition_by_max_ttl_date
  96 + LOOP
  97 + partition_year := SPLIT_PART(partition, '_', 3)::integer;
  98 + IF partition_year > partition_by_max_ttl_year::integer THEN
  99 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  100 + CONTINUE;
77 101 ELSE
78   - IF partition_day IS NOT NULL THEN
79   - IF SPLIT_PART(partition, '_', 5)::integer < partition_day::integer THEN
80   - partition_to_delete := partition;
  102 + IF partition_year < partition_by_max_ttl_year::integer THEN
  103 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  104 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  105 + deleted := deleted + 1;
  106 + ELSE
  107 + partition_month := SPLIT_PART(partition, '_', 4)::integer;
  108 + IF partition_year = partition_by_max_ttl_year::integer THEN
  109 + IF partition_month >= partition_by_max_ttl_month::integer THEN
  110 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  111 + CONTINUE;
  112 + ELSE
  113 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  114 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  115 + deleted := deleted + 1;
  116 + END IF;
81 117 END IF;
82 118 END IF;
83 119 END IF;
  120 + END LOOP;
  121 + END IF;
  122 + ELSE
  123 + IF partition_type = 'DAYS' THEN
  124 + IF partition_by_max_ttl_month IS NULL THEN
  125 + RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_month is null!';
  126 + ELSE
  127 + IF partition_by_max_ttl_day IS NULL THEN
  128 + RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_day is null!';
  129 + ELSE
  130 + FOR partition IN SELECT tablename
  131 + FROM pg_tables
  132 + WHERE schemaname = 'public'
  133 + AND tablename like 'ts_kv_' || '%'
  134 + AND tablename != 'ts_kv_latest'
  135 + AND tablename != 'ts_kv_dictionary'
  136 + AND tablename != 'ts_kv_indefinite'
  137 + AND tablename != partition_by_max_ttl_date
  138 + LOOP
  139 + partition_year := SPLIT_PART(partition, '_', 3)::integer;
  140 + IF partition_year > partition_by_max_ttl_year::integer THEN
  141 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  142 + CONTINUE;
  143 + ELSE
  144 + IF partition_year < partition_by_max_ttl_year::integer THEN
  145 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  146 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  147 + deleted := deleted + 1;
  148 + ELSE
  149 + partition_month := SPLIT_PART(partition, '_', 4)::integer;
  150 + IF partition_month > partition_by_max_ttl_month::integer THEN
  151 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  152 + CONTINUE;
  153 + ELSE
  154 + IF partition_month < partition_by_max_ttl_month::integer THEN
  155 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  156 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  157 + deleted := deleted + 1;
  158 + ELSE
  159 + partition_day := SPLIT_PART(partition, '_', 5)::integer;
  160 + IF partition_day >= partition_by_max_ttl_day::integer THEN
  161 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  162 + CONTINUE;
  163 + ELSE
  164 + IF partition_day < partition_by_max_ttl_day::integer THEN
  165 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  166 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  167 + deleted := deleted + 1;
  168 + END IF;
  169 + END IF;
  170 + END IF;
  171 + END IF;
  172 + END IF;
  173 + END IF;
  174 + END LOOP;
84 175 END IF;
85 176 END IF;
86 177 END IF;
87   - IF partition_to_delete IS NOT NULL THEN
88   - RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete;
89   - EXECUTE format('DROP TABLE IF EXISTS %I', partition_to_delete);
90   - partition_to_delete := NULL;
91   - deleted := deleted + 1;
92   - END IF;
93 178 END IF;
94   - END LOOP;
  179 + END IF;
  180 + END IF;
95 181 END IF;
96 182 END IF;
97 183 END
... ... @@ -107,8 +193,6 @@ BEGIN
107 193 partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM');
108 194 WHEN partition_type = 'YEARS' THEN
109 195 partition := 'ts_kv_' || to_char(date, 'yyyy');
110   - WHEN partition_type = 'INDEFINITE' THEN
111   - partition := NULL;
112 196 ELSE
113 197 partition := NULL;
114 198 END CASE;
... ...
... ... @@ -86,7 +86,10 @@ public class DefaultTbResourceService implements TbResourceService {
86 86 } else {
87 87 throw new DataValidationException(String.format("Could not parse the XML of objectModel with name %s", resource.getSearchText()));
88 88 }
89   - } catch (InvalidDDFFileException | IOException e) {
  89 + } catch (InvalidDDFFileException e) {
  90 + log.error("Failed to parse file {}", resource.getFileName(), e);
  91 + throw new DataValidationException("Failed to parse file " + resource.getFileName());
  92 + } catch (IOException e) {
90 93 throw new ThingsboardException(e, ThingsboardErrorCode.GENERAL);
91 94 }
92 95 if (resource.getResourceType().equals(ResourceType.LWM2M_MODEL) && toLwM2mObject(resource, true) == null) {
... ... @@ -194,8 +197,7 @@ public class DefaultTbResourceService implements TbResourceService {
194 197 if (isSave) {
195 198 LwM2mResourceObserve lwM2MResourceObserve = new LwM2mResourceObserve(k, v.name, false, false, false);
196 199 resources.add(lwM2MResourceObserve);
197   - }
198   - else if (v.operations.isReadable()) {
  200 + } else if (v.operations.isReadable()) {
199 201 LwM2mResourceObserve lwM2MResourceObserve = new LwM2mResourceObserve(k, v.name, false, false, false);
200 202 resources.add(lwM2MResourceObserve);
201 203 }
... ... @@ -204,8 +206,7 @@ public class DefaultTbResourceService implements TbResourceService {
204 206 instance.setResources(resources.toArray(LwM2mResourceObserve[]::new));
205 207 lwM2mObject.setInstances(new LwM2mInstance[]{instance});
206 208 return lwM2mObject;
207   - }
208   - else {
  209 + } else {
209 210 return null;
210 211 }
211 212 }
... ...
... ... @@ -15,8 +15,13 @@
15 15 */
16 16 package org.thingsboard.server.service.ttl;
17 17
  18 +import lombok.RequiredArgsConstructor;
18 19 import lombok.extern.slf4j.Slf4j;
  20 +import org.springframework.beans.factory.annotation.Autowired;
19 21 import org.springframework.beans.factory.annotation.Value;
  22 +import org.thingsboard.server.common.data.id.TenantId;
  23 +import org.thingsboard.server.common.msg.queue.ServiceType;
  24 +import org.thingsboard.server.queue.discovery.PartitionService;
20 25
21 26 import java.sql.Connection;
22 27 import java.sql.DriverManager;
... ... @@ -27,43 +32,12 @@ import java.sql.Statement;
27 32
28 33
29 34 @Slf4j
  35 +@RequiredArgsConstructor
30 36 public abstract class AbstractCleanUpService {
31 37
32   - @Value("${spring.datasource.url}")
33   - protected String dbUrl;
  38 + private final PartitionService partitionService;
34 39
35   - @Value("${spring.datasource.username}")
36   - protected String dbUserName;
37   -
38   - @Value("${spring.datasource.password}")
39   - protected String dbPassword;
40   -
41   - protected long executeQuery(Connection conn, String query) throws SQLException {
42   - try (Statement statement = conn.createStatement(); ResultSet resultSet = statement.executeQuery(query)) {
43   - if (log.isDebugEnabled()) {
44   - getWarnings(statement);
45   - }
46   - resultSet.next();
47   - return resultSet.getLong(1);
48   - }
49   - }
50   -
51   - protected void getWarnings(Statement statement) throws SQLException {
52   - SQLWarning warnings = statement.getWarnings();
53   - if (warnings != null) {
54   - log.debug("{}", warnings.getMessage());
55   - SQLWarning nextWarning = warnings.getNextWarning();
56   - while (nextWarning != null) {
57   - log.debug("{}", nextWarning.getMessage());
58   - nextWarning = nextWarning.getNextWarning();
59   - }
60   - }
  40 + protected boolean isSystemTenantPartitionMine(){
  41 + return partitionService.resolve(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition();
61 42 }
62   -
63   - protected abstract void doCleanUp(Connection connection) throws SQLException;
64   -
65   - protected Connection getConnection() throws SQLException {
66   - return DriverManager.getConnection(dbUrl, dbUserName, dbPassword);
67   - }
68   -
69 43 }
... ...
application/src/main/java/org/thingsboard/server/service/ttl/AlarmsCleanUpService.java renamed from application/src/main/java/org/thingsboard/server/service/ttl/alarms/AlarmsCleanUpService.java
... ... @@ -13,7 +13,7 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.service.ttl.alarms;
  16 +package org.thingsboard.server.service.ttl;
17 17
18 18 import lombok.RequiredArgsConstructor;
19 19 import lombok.extern.slf4j.Slf4j;
... ... @@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit;
52 52 @Slf4j
53 53 @RequiredArgsConstructor
54 54 public class AlarmsCleanUpService {
  55 +
55 56 @Value("${sql.ttl.alarms.removal_batch_size}")
56 57 private Integer removalBatchSize;
57 58
... ...
application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java renamed from application/src/main/java/org/thingsboard/server/service/ttl/edge/EdgeEventsCleanUpService.java
... ... @@ -13,20 +13,18 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.service.ttl.edge;
  16 +package org.thingsboard.server.service.ttl;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.beans.factory.annotation.Value;
20 20 import org.springframework.scheduling.annotation.Scheduled;
21 21 import org.springframework.stereotype.Service;
22   -import org.thingsboard.server.dao.util.PsqlDao;
  22 +import org.thingsboard.server.dao.edge.EdgeService;
  23 +import org.thingsboard.server.queue.discovery.PartitionService;
  24 +import org.thingsboard.server.queue.util.TbCoreComponent;
23 25 import org.thingsboard.server.service.ttl.AbstractCleanUpService;
24 26
25   -import java.sql.Connection;
26   -import java.sql.DriverManager;
27   -import java.sql.SQLException;
28   -
29   -@PsqlDao
  27 +@TbCoreComponent
30 28 @Slf4j
31 29 @Service
32 30 public class EdgeEventsCleanUpService extends AbstractCleanUpService {
... ... @@ -37,20 +35,18 @@ public class EdgeEventsCleanUpService extends AbstractCleanUpService {
37 35 @Value("${sql.ttl.edge_events.enabled}")
38 36 private boolean ttlTaskExecutionEnabled;
39 37
  38 + private final EdgeService edgeService;
  39 +
  40 + public EdgeEventsCleanUpService(PartitionService partitionService, EdgeService edgeService) {
  41 + super(partitionService);
  42 + this.edgeService = edgeService;
  43 + }
  44 +
40 45 @Scheduled(initialDelayString = "${sql.ttl.edge_events.execution_interval_ms}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}")
41 46 public void cleanUp() {
42   - if (ttlTaskExecutionEnabled) {
43   - try (Connection conn = getConnection()) {
44   - doCleanUp(conn);
45   - } catch (SQLException e) {
46   - log.error("SQLException occurred during TTL task execution ", e);
47   - }
  47 + if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
  48 + edgeService.cleanupEvents(ttl);
48 49 }
49 50 }
50 51
51   - @Override
52   - protected void doCleanUp(Connection connection) throws SQLException {
53   - long totalEdgeEventsRemoved = executeQuery(connection, "call cleanup_edge_events_by_ttl(" + ttl + ", 0);");
54   - log.info("Total edge events removed by TTL: [{}]", totalEdgeEventsRemoved);
55   - }
56 52 }
... ...
application/src/main/java/org/thingsboard/server/service/ttl/EventsCleanUpService.java renamed from application/src/main/java/org/thingsboard/server/service/ttl/events/EventsCleanUpService.java
... ... @@ -13,20 +13,18 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.service.ttl.events;
  16 +package org.thingsboard.server.service.ttl;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.beans.factory.annotation.Value;
20 20 import org.springframework.scheduling.annotation.Scheduled;
21 21 import org.springframework.stereotype.Service;
22   -import org.thingsboard.server.dao.util.PsqlDao;
  22 +import org.thingsboard.server.dao.event.EventService;
  23 +import org.thingsboard.server.queue.discovery.PartitionService;
  24 +import org.thingsboard.server.queue.util.TbCoreComponent;
23 25 import org.thingsboard.server.service.ttl.AbstractCleanUpService;
24 26
25   -import java.sql.Connection;
26   -import java.sql.DriverManager;
27   -import java.sql.SQLException;
28   -
29   -@PsqlDao
  27 +@TbCoreComponent
30 28 @Slf4j
31 29 @Service
32 30 public class EventsCleanUpService extends AbstractCleanUpService {
... ... @@ -40,20 +38,18 @@ public class EventsCleanUpService extends AbstractCleanUpService {
40 38 @Value("${sql.ttl.events.enabled}")
41 39 private boolean ttlTaskExecutionEnabled;
42 40
  41 + private final EventService eventService;
  42 +
  43 + public EventsCleanUpService(PartitionService partitionService, EventService eventService) {
  44 + super(partitionService);
  45 + this.eventService = eventService;
  46 + }
  47 +
43 48 @Scheduled(initialDelayString = "${sql.ttl.events.execution_interval_ms}", fixedDelayString = "${sql.ttl.events.execution_interval_ms}")
44 49 public void cleanUp() {
45   - if (ttlTaskExecutionEnabled) {
46   - try (Connection conn = getConnection()) {
47   - doCleanUp(conn);
48   - } catch (SQLException e) {
49   - log.error("SQLException occurred during TTL task execution ", e);
50   - }
  50 + if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
  51 + eventService.cleanupEvents(ttl, debugTtl);
51 52 }
52 53 }
53 54
54   - @Override
55   - protected void doCleanUp(Connection connection) throws SQLException {
56   - long totalEventsRemoved = executeQuery(connection, "call cleanup_events_by_ttl(" + ttl + ", " + debugTtl + ", 0);");
57   - log.info("Total events removed by TTL: [{}]", totalEventsRemoved);
58   - }
59 55 }
\ No newline at end of file
... ...
application/src/main/java/org/thingsboard/server/service/ttl/TimeseriesCleanUpService.java renamed from application/src/main/java/org/thingsboard/server/service/ttl/timeseries/AbstractTimeseriesCleanUpService.java
... ... @@ -13,19 +13,21 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.service.ttl.timeseries;
  16 +package org.thingsboard.server.service.ttl;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.beans.factory.annotation.Value;
20 20 import org.springframework.scheduling.annotation.Scheduled;
  21 +import org.springframework.stereotype.Service;
  22 +import org.thingsboard.server.dao.timeseries.TimeseriesService;
  23 +import org.thingsboard.server.queue.discovery.PartitionService;
  24 +import org.thingsboard.server.queue.util.TbCoreComponent;
21 25 import org.thingsboard.server.service.ttl.AbstractCleanUpService;
22 26
23   -import java.sql.Connection;
24   -import java.sql.DriverManager;
25   -import java.sql.SQLException;
26   -
  27 +@TbCoreComponent
27 28 @Slf4j
28   -public abstract class AbstractTimeseriesCleanUpService extends AbstractCleanUpService {
  29 +@Service
  30 +public class TimeseriesCleanUpService extends AbstractCleanUpService {
29 31
30 32 @Value("${sql.ttl.ts.ts_key_value_ttl}")
31 33 protected long systemTtl;
... ... @@ -33,14 +35,17 @@ public abstract class AbstractTimeseriesCleanUpService extends AbstractCleanUpSe
33 35 @Value("${sql.ttl.ts.enabled}")
34 36 private boolean ttlTaskExecutionEnabled;
35 37
  38 + private final TimeseriesService timeseriesService;
  39 +
  40 + public TimeseriesCleanUpService(PartitionService partitionService, TimeseriesService timeseriesService) {
  41 + super(partitionService);
  42 + this.timeseriesService = timeseriesService;
  43 + }
  44 +
36 45 @Scheduled(initialDelayString = "${sql.ttl.ts.execution_interval_ms}", fixedDelayString = "${sql.ttl.ts.execution_interval_ms}")
37 46 public void cleanUp() {
38   - if (ttlTaskExecutionEnabled) {
39   - try (Connection conn = getConnection()) {
40   - doCleanUp(conn);
41   - } catch (SQLException e) {
42   - log.error("SQLException occurred during TTL task execution ", e);
43   - }
  47 + if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
  48 + timeseriesService.cleanup(systemTtl);
44 49 }
45 50 }
46 51
... ...
1   -/**
2   - * Copyright © 2016-2021 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.service.ttl.timeseries;
17   -
18   -import lombok.extern.slf4j.Slf4j;
19   -import org.springframework.beans.factory.annotation.Value;
20   -import org.springframework.stereotype.Service;
21   -import org.thingsboard.server.dao.model.ModelConstants;
22   -import org.thingsboard.server.dao.util.PsqlDao;
23   -import org.thingsboard.server.dao.util.SqlTsDao;
24   -
25   -import java.sql.Connection;
26   -import java.sql.SQLException;
27   -
28   -@SqlTsDao
29   -@PsqlDao
30   -@Service
31   -@Slf4j
32   -public class PsqlTimeseriesCleanUpService extends AbstractTimeseriesCleanUpService {
33   -
34   - @Value("${sql.postgres.ts_key_value_partitioning}")
35   - private String partitionType;
36   -
37   - @Override
38   - protected void doCleanUp(Connection connection) throws SQLException {
39   - long totalPartitionsRemoved = executeQuery(connection, "call drop_partitions_by_max_ttl('" + partitionType + "'," + systemTtl + ", 0);");
40   - log.info("Total partitions removed by TTL: [{}]", totalPartitionsRemoved);
41   - long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID + "'," + systemTtl + ", 0);");
42   - log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved);
43   - }
44   -}
\ No newline at end of file
... ... @@ -17,24 +17,47 @@ package org.thingsboard.server.transport.lwm2m;
17 17
18 18 import com.fasterxml.jackson.core.type.TypeReference;
19 19 import org.apache.commons.io.IOUtils;
  20 +import org.eclipse.californium.core.network.config.NetworkConfig;
  21 +import org.eclipse.leshan.client.object.Security;
20 22 import org.eclipse.leshan.core.util.Hex;
  23 +import org.jetbrains.annotations.NotNull;
21 24 import org.junit.After;
22 25 import org.junit.Assert;
23 26 import org.junit.Before;
  27 +import org.springframework.mock.web.MockMultipartFile;
  28 +import org.springframework.test.web.servlet.request.MockMultipartHttpServletRequestBuilder;
  29 +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
24 30 import org.thingsboard.common.util.JacksonUtil;
  31 +import org.thingsboard.server.common.data.Device;
25 32 import org.thingsboard.server.common.data.DeviceProfile;
26 33 import org.thingsboard.server.common.data.DeviceProfileProvisionType;
27 34 import org.thingsboard.server.common.data.DeviceProfileType;
28 35 import org.thingsboard.server.common.data.DeviceTransportType;
  36 +import org.thingsboard.server.common.data.OtaPackageInfo;
29 37 import org.thingsboard.server.common.data.ResourceType;
30 38 import org.thingsboard.server.common.data.TbResource;
  39 +import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MClientCredentials;
31 40 import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileConfiguration;
32 41 import org.thingsboard.server.common.data.device.profile.DeviceProfileData;
33 42 import org.thingsboard.server.common.data.device.profile.DisabledDeviceProfileProvisionConfiguration;
34 43 import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration;
  44 +import org.thingsboard.server.common.data.query.EntityData;
  45 +import org.thingsboard.server.common.data.query.EntityDataPageLink;
  46 +import org.thingsboard.server.common.data.query.EntityDataQuery;
  47 +import org.thingsboard.server.common.data.query.EntityKey;
  48 +import org.thingsboard.server.common.data.query.EntityKeyType;
  49 +import org.thingsboard.server.common.data.query.SingleEntityFilter;
  50 +import org.thingsboard.server.common.data.security.DeviceCredentials;
  51 +import org.thingsboard.server.common.data.security.DeviceCredentialsType;
35 52 import org.thingsboard.server.controller.AbstractWebsocketTest;
36 53 import org.thingsboard.server.controller.TbTestWebSocketClient;
37 54 import org.thingsboard.server.dao.service.DaoSqlTest;
  55 +import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
  56 +import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
  57 +import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
  58 +import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd;
  59 +import org.thingsboard.server.transport.lwm2m.client.LwM2MTestClient;
  60 +import org.thingsboard.server.transport.lwm2m.secure.credentials.LwM2MCredentials;
38 61
39 62 import java.io.IOException;
40 63 import java.io.InputStream;
... ... @@ -54,9 +77,15 @@ import java.security.spec.ECPrivateKeySpec;
54 77 import java.security.spec.ECPublicKeySpec;
55 78 import java.security.spec.KeySpec;
56 79 import java.util.Base64;
  80 +import java.util.Collections;
  81 +import java.util.List;
57 82 import java.util.concurrent.Executors;
58 83 import java.util.concurrent.ScheduledExecutorService;
59 84
  85 +import static org.eclipse.leshan.client.object.Security.noSec;
  86 +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
  87 +import static org.thingsboard.server.common.data.ota.OtaPackageType.FIRMWARE;
  88 +
60 89 @DaoSqlTest
61 90 public class AbstractLwM2MIntegrationTest extends AbstractWebsocketTest {
62 91
... ... @@ -139,6 +168,15 @@ public class AbstractLwM2MIntegrationTest extends AbstractWebsocketTest {
139 168 // certificates trustedby the server (should contain rootCA)
140 169 protected final Certificate[] trustedCertificates = new Certificate[1];
141 170
  171 + protected static final int SECURE_PORT = 5686;
  172 + protected static final NetworkConfig SECURE_COAP_CONFIG = new NetworkConfig().setString("COAP_SECURE_PORT", Integer.toString(SECURE_PORT));
  173 + protected static final String ENDPOINT = "deviceAEndpoint";
  174 + protected static final String SECURE_URI = "coaps://localhost:" + SECURE_PORT;
  175 +
  176 + protected static final int PORT = 5685;
  177 + protected static final Security SECURITY = noSec("coap://localhost:" + PORT, 123);
  178 + protected static final NetworkConfig COAP_CONFIG = new NetworkConfig().setString("COAP_PORT", Integer.toString(PORT));
  179 +
142 180 public AbstractLwM2MIntegrationTest() {
143 181 // create client credentials
144 182 try {
... ... @@ -262,10 +300,95 @@ public class AbstractLwM2MIntegrationTest extends AbstractWebsocketTest {
262 300 Assert.assertNotNull(deviceProfile);
263 301 }
264 302
  303 + @NotNull
  304 + protected Device createDevice(LwM2MClientCredentials clientCredentials) throws Exception {
  305 + Device device = new Device();
  306 + device.setName("Device A");
  307 + device.setDeviceProfileId(deviceProfile.getId());
  308 + device.setTenantId(tenantId);
  309 + device = doPost("/api/device", device, Device.class);
  310 + Assert.assertNotNull(device);
  311 +
  312 + DeviceCredentials deviceCredentials =
  313 + doGet("/api/device/" + device.getId().getId().toString() + "/credentials", DeviceCredentials.class);
  314 + Assert.assertEquals(device.getId(), deviceCredentials.getDeviceId());
  315 + deviceCredentials.setCredentialsType(DeviceCredentialsType.LWM2M_CREDENTIALS);
  316 +
  317 + LwM2MCredentials credentials = new LwM2MCredentials();
  318 +
  319 + credentials.setClient(clientCredentials);
  320 +
  321 + deviceCredentials.setCredentialsValue(JacksonUtil.toString(credentials));
  322 + doPost("/api/device/credentials", deviceCredentials).andExpect(status().isOk());
  323 + return device;
  324 + }
  325 +
  326 +
  327 + protected OtaPackageInfo createFirmware() throws Exception {
  328 + String CHECKSUM = "4bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a";
  329 +
  330 + OtaPackageInfo firmwareInfo = new OtaPackageInfo();
  331 + firmwareInfo.setDeviceProfileId(deviceProfile.getId());
  332 + firmwareInfo.setType(FIRMWARE);
  333 + firmwareInfo.setTitle("My firmware");
  334 + firmwareInfo.setVersion("v1.0");
  335 +
  336 + OtaPackageInfo savedFirmwareInfo = doPost("/api/otaPackage", firmwareInfo, OtaPackageInfo.class);
  337 +
  338 + MockMultipartFile testData = new MockMultipartFile("file", "filename.txt", "text/plain", new byte[]{1});
  339 +
  340 + return savaData("/api/otaPackage/" + savedFirmwareInfo.getId().getId().toString() + "?checksum={checksum}&checksumAlgorithm={checksumAlgorithm}", testData, CHECKSUM, "SHA256");
  341 + }
  342 +
  343 + protected OtaPackageInfo savaData(String urlTemplate, MockMultipartFile content, String... params) throws Exception {
  344 + MockMultipartHttpServletRequestBuilder postRequest = MockMvcRequestBuilders.multipart(urlTemplate, params);
  345 + postRequest.file(content);
  346 + setJwtToken(postRequest);
  347 + return readResponse(mockMvc.perform(postRequest).andExpect(status().isOk()), OtaPackageInfo.class);
  348 + }
  349 +
265 350 @After
266 351 public void after() {
267 352 executor.shutdownNow();
268 353 wsClient.close();
269 354 }
270 355
  356 + public void basicTestConnectionObserveTelemetry(Security security,
  357 + LwM2MClientCredentials credentials,
  358 + NetworkConfig coapConfig,
  359 + String endpoint) throws Exception {
  360 + createDeviceProfile(TRANSPORT_CONFIGURATION);
  361 + Device device = createDevice(credentials);
  362 +
  363 + SingleEntityFilter sef = new SingleEntityFilter();
  364 + sef.setSingleEntity(device.getId());
  365 + LatestValueCmd latestCmd = new LatestValueCmd();
  366 + latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.TIME_SERIES, "batteryLevel")));
  367 + EntityDataQuery edq = new EntityDataQuery(sef, new EntityDataPageLink(1, 0, null, null),
  368 + Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
  369 +
  370 + EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
  371 + TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
  372 + wrapper.setEntityDataCmds(Collections.singletonList(cmd));
  373 +
  374 + wsClient.send(mapper.writeValueAsString(wrapper));
  375 + wsClient.waitForReply();
  376 +
  377 + wsClient.registerWaitForUpdate();
  378 + LwM2MTestClient client = new LwM2MTestClient(executor, endpoint);
  379 +
  380 + client.init(security, coapConfig);
  381 + String msg = wsClient.waitForUpdate();
  382 +
  383 + EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class);
  384 + Assert.assertEquals(1, update.getCmdId());
  385 + List<EntityData> eData = update.getUpdate();
  386 + Assert.assertNotNull(eData);
  387 + Assert.assertEquals(1, eData.size());
  388 + Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
  389 + Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES));
  390 + var tsValue = eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("batteryLevel");
  391 + Assert.assertEquals(42, Long.parseLong(tsValue.getValue()));
  392 + client.destroy();
  393 + }
271 394 }
... ...
... ... @@ -15,14 +15,8 @@
15 15 */
16 16 package org.thingsboard.server.transport.lwm2m;
17 17
18   -import org.eclipse.californium.core.network.config.NetworkConfig;
19   -import org.eclipse.leshan.client.object.Security;
20 18 import org.junit.Assert;
21 19 import org.junit.Test;
22   -import org.springframework.mock.web.MockMultipartFile;
23   -import org.springframework.test.web.servlet.request.MockMultipartHttpServletRequestBuilder;
24   -import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
25   -import org.thingsboard.common.util.JacksonUtil;
26 20 import org.thingsboard.server.common.data.Device;
27 21 import org.thingsboard.server.common.data.OtaPackageInfo;
28 22 import org.thingsboard.server.common.data.device.credentials.lwm2m.NoSecClientCredentials;
... ... @@ -32,116 +26,30 @@ import org.thingsboard.server.common.data.query.EntityDataQuery;
32 26 import org.thingsboard.server.common.data.query.EntityKey;
33 27 import org.thingsboard.server.common.data.query.EntityKeyType;
34 28 import org.thingsboard.server.common.data.query.SingleEntityFilter;
35   -import org.thingsboard.server.common.data.security.DeviceCredentials;
36   -import org.thingsboard.server.common.data.security.DeviceCredentialsType;
37 29 import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
38 30 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
39 31 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
40 32 import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd;
41 33 import org.thingsboard.server.transport.lwm2m.client.LwM2MTestClient;
42   -import org.thingsboard.server.transport.lwm2m.secure.credentials.LwM2MCredentials;
43 34
44 35 import java.util.Collections;
45 36 import java.util.List;
46 37
47   -import static org.eclipse.leshan.client.object.Security.noSec;
48   -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
49   -import static org.thingsboard.server.common.data.ota.OtaPackageType.FIRMWARE;
50   -
51 38 public class NoSecLwM2MIntegrationTest extends AbstractLwM2MIntegrationTest {
52 39
53   - private final int PORT = 5685;
54   - private final Security SECURITY = noSec("coap://localhost:" + PORT, 123);
55   - private final NetworkConfig COAP_CONFIG = new NetworkConfig().setString("COAP_PORT", Integer.toString(PORT));
56   - private final String ENDPOINT = "noSecEndpoint";
57   -
58   - private Device createDevice() throws Exception {
59   - Device device = new Device();
60   - device.setName("Device A");
61   - device.setDeviceProfileId(deviceProfile.getId());
62   - device.setTenantId(tenantId);
63   - device = doPost("/api/device", device, Device.class);
64   - Assert.assertNotNull(device);
65   -
66   - DeviceCredentials deviceCredentials =
67   - doGet("/api/device/" + device.getId().getId().toString() + "/credentials", DeviceCredentials.class);
68   - Assert.assertEquals(device.getId(), deviceCredentials.getDeviceId());
69   - deviceCredentials.setCredentialsType(DeviceCredentialsType.LWM2M_CREDENTIALS);
70   -
71   - LwM2MCredentials noSecCredentials = new LwM2MCredentials();
72   - NoSecClientCredentials clientCredentials = new NoSecClientCredentials();
73   - clientCredentials.setEndpoint(ENDPOINT);
74   - noSecCredentials.setClient(clientCredentials);
75   - deviceCredentials.setCredentialsValue(JacksonUtil.toString(noSecCredentials));
76   - doPost("/api/device/credentials", deviceCredentials).andExpect(status().isOk());
77   - return device;
78   - }
79   -
80   - private OtaPackageInfo createFirmware() throws Exception {
81   - String CHECKSUM = "4bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a";
82   -
83   - OtaPackageInfo firmwareInfo = new OtaPackageInfo();
84   - firmwareInfo.setDeviceProfileId(deviceProfile.getId());
85   - firmwareInfo.setType(FIRMWARE);
86   - firmwareInfo.setTitle("My firmware");
87   - firmwareInfo.setVersion("v1.0");
88   -
89   - OtaPackageInfo savedFirmwareInfo = doPost("/api/otaPackage", firmwareInfo, OtaPackageInfo.class);
90   -
91   - MockMultipartFile testData = new MockMultipartFile("file", "filename.txt", "text/plain", new byte[]{1});
92   -
93   - return savaData("/api/otaPackage/" + savedFirmwareInfo.getId().getId().toString() + "?checksum={checksum}&checksumAlgorithm={checksumAlgorithm}", testData, CHECKSUM, "SHA256");
94   - }
95   -
96   - protected OtaPackageInfo savaData(String urlTemplate, MockMultipartFile content, String... params) throws Exception {
97   - MockMultipartHttpServletRequestBuilder postRequest = MockMvcRequestBuilders.multipart(urlTemplate, params);
98   - postRequest.file(content);
99   - setJwtToken(postRequest);
100   - return readResponse(mockMvc.perform(postRequest).andExpect(status().isOk()), OtaPackageInfo.class);
101   - }
102   -
103 40 @Test
104 41 public void testConnectAndObserveTelemetry() throws Exception {
105   - createDeviceProfile(TRANSPORT_CONFIGURATION);
106   -
107   - Device device = createDevice();
108   -
109   - SingleEntityFilter sef = new SingleEntityFilter();
110   - sef.setSingleEntity(device.getId());
111   - LatestValueCmd latestCmd = new LatestValueCmd();
112   - latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.TIME_SERIES, "batteryLevel")));
113   - EntityDataQuery edq = new EntityDataQuery(sef, new EntityDataPageLink(1, 0, null, null),
114   - Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
115   -
116   - EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
117   - TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
118   - wrapper.setEntityDataCmds(Collections.singletonList(cmd));
119   -
120   - wsClient.send(mapper.writeValueAsString(wrapper));
121   - wsClient.waitForReply();
122   -
123   - wsClient.registerWaitForUpdate();
124   - LwM2MTestClient client = new LwM2MTestClient(executor, ENDPOINT);
125   - client.init(SECURITY, COAP_CONFIG);
126   - String msg = wsClient.waitForUpdate();
127   -
128   - EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class);
129   - Assert.assertEquals(1, update.getCmdId());
130   - List<EntityData> eData = update.getUpdate();
131   - Assert.assertNotNull(eData);
132   - Assert.assertEquals(1, eData.size());
133   - Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
134   - Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES));
135   - var tsValue = eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("batteryLevel");
136   - Assert.assertEquals(42, Long.parseLong(tsValue.getValue()));
137   - client.destroy();
  42 + NoSecClientCredentials clientCredentials = new NoSecClientCredentials();
  43 + clientCredentials.setEndpoint(ENDPOINT);
  44 + super.basicTestConnectionObserveTelemetry(SECURITY, clientCredentials, COAP_CONFIG, ENDPOINT);
138 45 }
139 46
140 47 @Test
141 48 public void testFirmwareUpdateWithClientWithoutFirmwareInfo() throws Exception {
142 49 createDeviceProfile(TRANSPORT_CONFIGURATION);
143   -
144   - Device device = createDevice();
  50 + NoSecClientCredentials clientCredentials = new NoSecClientCredentials();
  51 + clientCredentials.setEndpoint(ENDPOINT);
  52 + Device device = createDevice(clientCredentials);
145 53
146 54 OtaPackageInfo firmware = createFirmware();
147 55
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m;
  17 +
  18 +import org.eclipse.leshan.client.object.Security;
  19 +import org.eclipse.leshan.core.util.Hex;
  20 +import org.junit.Test;
  21 +import org.thingsboard.server.common.data.device.credentials.lwm2m.PSKClientCredentials;
  22 +
  23 +import java.nio.charset.StandardCharsets;
  24 +
  25 +import static org.eclipse.leshan.client.object.Security.psk;
  26 +
  27 +public class PskLwm2mIntegrationTest extends AbstractLwM2MIntegrationTest {
  28 +
  29 + @Test
  30 + public void testConnectWithPSKAndObserveTelemetry() throws Exception {
  31 + String pskIdentity = "SOME_PSK_ID";
  32 + String pskKey = "73656372657450534b";
  33 + PSKClientCredentials clientCredentials = new PSKClientCredentials();
  34 + clientCredentials.setEndpoint(ENDPOINT);
  35 + clientCredentials.setKey(pskKey);
  36 + clientCredentials.setIdentity(pskIdentity);
  37 + Security security = psk(SECURE_URI,
  38 + 123,
  39 + pskIdentity.getBytes(StandardCharsets.UTF_8),
  40 + Hex.decodeHex(pskKey.toCharArray()));
  41 + super.basicTestConnectionObserveTelemetry(security, clientCredentials, SECURE_COAP_CONFIG, ENDPOINT);
  42 + }
  43 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.lwm2m;
  17 +
  18 +import org.eclipse.leshan.client.object.Security;
  19 +import org.eclipse.leshan.core.util.Hex;
  20 +import org.junit.Test;
  21 +import org.thingsboard.server.common.data.device.credentials.lwm2m.RPKClientCredentials;
  22 +
  23 +import static org.eclipse.leshan.client.object.Security.rpk;
  24 +
  25 +public class RpkLwM2MIntegrationTest extends AbstractLwM2MIntegrationTest {
  26 + @Test
  27 + public void testConnectWithRPKAndObserveTelemetry() throws Exception {
  28 + RPKClientCredentials rpkClientCredentials = new RPKClientCredentials();
  29 + rpkClientCredentials.setEndpoint(ENDPOINT);
  30 + rpkClientCredentials.setKey(Hex.encodeHexString(clientPublicKey.getEncoded()));
  31 + Security security = rpk(SECURE_URI,
  32 + 123,
  33 + clientPublicKey.getEncoded(),
  34 + clientPrivateKey.getEncoded(),
  35 + serverX509Cert.getPublicKey().getEncoded());
  36 + super.basicTestConnectionObserveTelemetry(security, rpkClientCredentials, SECURE_COAP_CONFIG, ENDPOINT);
  37 + }
  38 +
  39 +}
... ...
... ... @@ -15,147 +15,38 @@
15 15 */
16 16 package org.thingsboard.server.transport.lwm2m;
17 17
18   -import org.eclipse.californium.core.network.config.NetworkConfig;
19 18 import org.eclipse.leshan.client.object.Security;
20   -import org.jetbrains.annotations.NotNull;
21   -import org.junit.Assert;
22   -import org.junit.Ignore;
23 19 import org.junit.Test;
24   -import org.thingsboard.common.util.JacksonUtil;
25   -import org.thingsboard.server.common.data.Device;
26 20 import org.thingsboard.server.common.data.device.credentials.lwm2m.X509ClientCredentials;
27   -import org.thingsboard.server.common.data.query.EntityData;
28   -import org.thingsboard.server.common.data.query.EntityDataPageLink;
29   -import org.thingsboard.server.common.data.query.EntityDataQuery;
30   -import org.thingsboard.server.common.data.query.EntityKey;
31   -import org.thingsboard.server.common.data.query.EntityKeyType;
32   -import org.thingsboard.server.common.data.query.SingleEntityFilter;
33   -import org.thingsboard.server.common.data.security.DeviceCredentials;
34   -import org.thingsboard.server.common.data.security.DeviceCredentialsType;
35 21 import org.thingsboard.server.common.transport.util.SslUtil;
36   -import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
37   -import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
38   -import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
39   -import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd;
40   -import org.thingsboard.server.transport.lwm2m.client.LwM2MTestClient;
41   -import org.thingsboard.server.transport.lwm2m.secure.credentials.LwM2MCredentials;
42   -
43   -import java.util.Collections;
44   -import java.util.List;
45 22
46 23 import static org.eclipse.leshan.client.object.Security.x509;
47   -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
48 24
49 25 public class X509LwM2MIntegrationTest extends AbstractLwM2MIntegrationTest {
50 26
51   - private final int port = 5686;
52   - private final NetworkConfig coapConfig = new NetworkConfig().setString("COAP_SECURE_PORT", Integer.toString(port));
53   - private final String endpoint = "deviceAEndpoint";
54   - private final String serverUri = "coaps://localhost:" + port;
55   -
56   - private Device createDevice(X509ClientCredentials clientCredentials) throws Exception {
57   - Device device = new Device();
58   - device.setName("Device A");
59   - device.setDeviceProfileId(deviceProfile.getId());
60   - device.setTenantId(tenantId);
61   - device = doPost("/api/device", device, Device.class);
62   - Assert.assertNotNull(device);
63   -
64   - DeviceCredentials deviceCredentials =
65   - doGet("/api/device/" + device.getId().getId().toString() + "/credentials", DeviceCredentials.class);
66   - Assert.assertEquals(device.getId(), deviceCredentials.getDeviceId());
67   - deviceCredentials.setCredentialsType(DeviceCredentialsType.LWM2M_CREDENTIALS);
68   -
69   - LwM2MCredentials credentials = new LwM2MCredentials();
70   -
71   - credentials.setClient(clientCredentials);
72   -
73   - deviceCredentials.setCredentialsValue(JacksonUtil.toString(credentials));
74   - doPost("/api/device/credentials", deviceCredentials).andExpect(status().isOk());
75   - return device;
76   - }
77   -
78   - //TODO: use different endpoints to isolate tests.
79   - @Ignore()
80 27 @Test
81 28 public void testConnectAndObserveTelemetry() throws Exception {
82   - createDeviceProfile(TRANSPORT_CONFIGURATION);
83 29 X509ClientCredentials credentials = new X509ClientCredentials();
84   - credentials.setEndpoint(endpoint+1);
85   - Device device = createDevice(credentials);
86   -
87   - SingleEntityFilter sef = new SingleEntityFilter();
88   - sef.setSingleEntity(device.getId());
89   - LatestValueCmd latestCmd = new LatestValueCmd();
90   - latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.TIME_SERIES, "batteryLevel")));
91   - EntityDataQuery edq = new EntityDataQuery(sef, new EntityDataPageLink(1, 0, null, null),
92   - Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
93   -
94   - EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
95   - TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
96   - wrapper.setEntityDataCmds(Collections.singletonList(cmd));
97   -
98   - wsClient.send(mapper.writeValueAsString(wrapper));
99   - wsClient.waitForReply();
100   -
101   - wsClient.registerWaitForUpdate();
102   - LwM2MTestClient client = new LwM2MTestClient(executor, endpoint+1);
103   - Security security = x509(serverUri, 123, clientX509Cert.getEncoded(), clientPrivateKeyFromCert.getEncoded(), serverX509Cert.getEncoded());
104   - client.init(security, coapConfig);
105   - String msg = wsClient.waitForUpdate();
106   -
107   - EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class);
108   - Assert.assertEquals(1, update.getCmdId());
109   - List<EntityData> eData = update.getUpdate();
110   - Assert.assertNotNull(eData);
111   - Assert.assertEquals(1, eData.size());
112   - Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
113   - Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES));
114   - var tsValue = eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("batteryLevel");
115   - Assert.assertEquals(42, Long.parseLong(tsValue.getValue()));
116   - client.destroy();
  30 + credentials.setEndpoint(ENDPOINT);
  31 + Security security = x509(SECURE_URI,
  32 + 123,
  33 + clientX509Cert.getEncoded(),
  34 + clientPrivateKeyFromCert.getEncoded(),
  35 + serverX509Cert.getEncoded());
  36 + super.basicTestConnectionObserveTelemetry(security, credentials, SECURE_COAP_CONFIG, ENDPOINT);
117 37 }
118 38
119 39 @Test
120 40 public void testConnectWithCertAndObserveTelemetry() throws Exception {
121   - createDeviceProfile(TRANSPORT_CONFIGURATION);
122 41 X509ClientCredentials credentials = new X509ClientCredentials();
123   - credentials.setEndpoint(endpoint);
  42 + credentials.setEndpoint(ENDPOINT);
124 43 credentials.setCert(SslUtil.getCertificateString(clientX509CertNotTrusted));
125   - Device device = createDevice(credentials);
126   -
127   - SingleEntityFilter sef = new SingleEntityFilter();
128   - sef.setSingleEntity(device.getId());
129   - LatestValueCmd latestCmd = new LatestValueCmd();
130   - latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.TIME_SERIES, "batteryLevel")));
131   - EntityDataQuery edq = new EntityDataQuery(sef, new EntityDataPageLink(1, 0, null, null),
132   - Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
133   -
134   - EntityDataCmd cmd = new EntityDataCmd(1, edq, null, latestCmd, null);
135   - TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
136   - wrapper.setEntityDataCmds(Collections.singletonList(cmd));
137   -
138   - wsClient.send(mapper.writeValueAsString(wrapper));
139   - wsClient.waitForReply();
140   -
141   - wsClient.registerWaitForUpdate();
142   - LwM2MTestClient client = new LwM2MTestClient(executor, endpoint);
143   -
144   - Security security = x509(serverUri, 123, clientX509CertNotTrusted.getEncoded(), clientPrivateKeyFromCert.getEncoded(), serverX509Cert.getEncoded());
145   -
146   - client.init(security, coapConfig);
147   - String msg = wsClient.waitForUpdate();
148   -
149   - EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class);
150   - Assert.assertEquals(1, update.getCmdId());
151   - List<EntityData> eData = update.getUpdate();
152   - Assert.assertNotNull(eData);
153   - Assert.assertEquals(1, eData.size());
154   - Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
155   - Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES));
156   - var tsValue = eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("batteryLevel");
157   - Assert.assertEquals(42, Long.parseLong(tsValue.getValue()));
158   - client.destroy();
  44 + Security security = x509(SECURE_URI,
  45 + 123,
  46 + clientX509CertNotTrusted.getEncoded(),
  47 + clientPrivateKeyFromCert.getEncoded(),
  48 + serverX509Cert.getEncoded());
  49 + super.basicTestConnectionObserveTelemetry(security, credentials, SECURE_COAP_CONFIG, ENDPOINT);
159 50 }
160 51
161 52 }
... ...
... ... @@ -93,4 +93,6 @@ public interface EdgeService {
93 93 Object activateInstance(String licenseSecret, String releaseDate);
94 94
95 95 String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId);
  96 +
  97 + void cleanupEvents(long ttl);
96 98 }
... ...
... ... @@ -46,4 +46,6 @@ public interface EventService {
46 46
47 47 void removeEvents(TenantId tenantId, EntityId entityId);
48 48
  49 + void cleanupEvents(long ttl, long debugTtl);
  50 +
49 51 }
... ...
... ... @@ -52,4 +52,6 @@ public interface TimeseriesService {
52 52 List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);
53 53
54 54 List<String> findAllKeysByEntityIds(TenantId tenantId, List<EntityId> entityIds);
  55 +
  56 + void cleanup(long systemTtl);
55 57 }
... ...
common/data/src/main/java/org/thingsboard/server/common/data/device/credentials/lwm2m/AbstractLwM2MClientCredentialsWithKey.java renamed from common/data/src/main/java/org/thingsboard/server/common/data/device/credentials/lwm2m/HasKey.java
... ... @@ -15,20 +15,25 @@
15 15 */
16 16 package org.thingsboard.server.common.data.device.credentials.lwm2m;
17 17
  18 +import com.fasterxml.jackson.annotation.JsonIgnore;
  19 +import lombok.Getter;
  20 +import lombok.Setter;
18 21 import lombok.SneakyThrows;
19 22 import org.apache.commons.codec.binary.Hex;
20 23
21   -public abstract class HasKey extends AbstractLwM2MClientCredentials {
22   - private byte[] key;
  24 +public abstract class AbstractLwM2MClientCredentialsWithKey extends AbstractLwM2MClientCredentials {
  25 + @Getter
  26 + @Setter
  27 + private String key;
  28 +
  29 + private byte[] keyInBytes;
23 30
24 31 @SneakyThrows
25   - public void setKey(String key) {
26   - if (key != null) {
27   - this.key = Hex.decodeHex(key.toLowerCase().toCharArray());
  32 + @JsonIgnore
  33 + public byte[] getDecodedKey() {
  34 + if (keyInBytes == null) {
  35 + keyInBytes = Hex.decodeHex(key.toLowerCase().toCharArray());
28 36 }
29   - }
30   -
31   - public byte[] getKey() {
32   - return key;
  37 + return keyInBytes;
33 38 }
34 39 }
... ...
... ... @@ -20,7 +20,7 @@ import lombok.Setter;
20 20
21 21 @Getter
22 22 @Setter
23   -public class PSKClientCredentials extends HasKey {
  23 +public class PSKClientCredentials extends AbstractLwM2MClientCredentialsWithKey {
24 24 private String identity;
25 25
26 26 @Override
... ...
... ... @@ -15,7 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.common.data.device.credentials.lwm2m;
17 17
18   -public class RPKClientCredentials extends HasKey {
  18 +public class RPKClientCredentials extends AbstractLwM2MClientCredentialsWithKey {
19 19
20 20 @Override
21 21 public LwM2MSecurityMode getSecurityConfigClientMode() {
... ...
... ... @@ -64,6 +64,7 @@ import java.util.concurrent.ConcurrentHashMap;
64 64 import java.util.concurrent.ConcurrentMap;
65 65 import java.util.concurrent.TimeUnit;
66 66 import java.util.concurrent.atomic.AtomicInteger;
  67 +import java.util.stream.Collectors;
67 68
68 69 @Slf4j
69 70 public class CoapTransportResource extends AbstractCoapTransportResource {
... ... @@ -75,9 +76,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
75 76 private static final int REQUEST_ID_POSITION_CERTIFICATE_REQUEST = 4;
76 77 private static final String DTLS_SESSION_ID_KEY = "DTLS_SESSION_ID";
77 78
78   - private final ConcurrentMap<String, TransportProtos.SessionInfoProto> tokenToSessionInfoMap = new ConcurrentHashMap<>();
79   - private final ConcurrentMap<String, AtomicInteger> tokenToObserveNotificationSeqMap = new ConcurrentHashMap<>();
80   - private final ConcurrentMap<TransportProtos.SessionInfoProto, ObserveRelation> sessionInfoToObserveRelationMap = new ConcurrentHashMap<>();
  79 + private final ConcurrentMap<String, CoapObserveSessionInfo> tokenToCoapSessionInfoMap = new ConcurrentHashMap<>();
  80 + private final ConcurrentMap<CoapObserveSessionInfo, ObserveRelation> sessionInfoToObserveRelationMap = new ConcurrentHashMap<>();
81 81 private final Set<UUID> rpcSubscriptions = ConcurrentHashMap.newKeySet();
82 82 private final Set<UUID> attributeSubscriptions = ConcurrentHashMap.newKeySet();
83 83
... ... @@ -93,7 +93,11 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
93 93 this.timeout = coapServerService.getTimeout();
94 94 this.sessionReportTimeout = ctx.getSessionReportTimeout();
95 95 ctx.getScheduler().scheduleAtFixedRate(() -> {
96   - Set<TransportProtos.SessionInfoProto> observeSessions = sessionInfoToObserveRelationMap.keySet();
  96 + Set<CoapObserveSessionInfo> coapObserveSessionInfos = sessionInfoToObserveRelationMap.keySet();
  97 + Set<TransportProtos.SessionInfoProto> observeSessions = coapObserveSessionInfos
  98 + .stream()
  99 + .map(CoapObserveSessionInfo::getSessionInfoProto)
  100 + .collect(Collectors.toSet());
97 101 observeSessions.forEach(this::reportActivity);
98 102 }, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
99 103 }
... ... @@ -111,17 +115,17 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
111 115 relation.setEstablished();
112 116 addObserveRelation(relation);
113 117 }
114   - AtomicInteger notificationCounter = tokenToObserveNotificationSeqMap.computeIfAbsent(token, s -> new AtomicInteger(0));
115   - response.getOptions().setObserve(notificationCounter.getAndIncrement());
  118 + AtomicInteger observeNotificationCounter = tokenToCoapSessionInfoMap.get(token).getObserveNotificationCounter();
  119 + response.getOptions().setObserve(observeNotificationCounter.getAndIncrement());
116 120 } // ObserveLayer takes care of the else case
117 121 }
118 122
119   - public void clearAndNotifyObserveRelation(ObserveRelation relation, CoAP.ResponseCode code) {
  123 + private void clearAndNotifyObserveRelation(ObserveRelation relation, CoAP.ResponseCode code) {
120 124 relation.cancel();
121 125 relation.getExchange().sendResponse(new Response(code));
122 126 }
123 127
124   - public Map<TransportProtos.SessionInfoProto, ObserveRelation> getSessionInfoToObserveRelationMap() {
  128 + private Map<CoapObserveSessionInfo, ObserveRelation> getCoapSessionInfoToObserveRelationMap() {
125 129 return sessionInfoToObserveRelationMap;
126 130 }
127 131
... ... @@ -277,8 +281,8 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
277 281 new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
278 282 break;
279 283 case SUBSCRIBE_ATTRIBUTES_REQUEST:
280   - TransportProtos.SessionInfoProto currentAttrSession = tokenToSessionInfoMap.get(getTokenFromRequest(request));
281   - if (currentAttrSession == null) {
  284 + CoapObserveSessionInfo currentCoapObserveAttrSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request));
  285 + if (currentCoapObserveAttrSessionInfo == null) {
282 286 attributeSubscriptions.add(sessionId);
283 287 registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor,
284 288 transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request));
... ... @@ -290,20 +294,20 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
290 294 }
291 295 break;
292 296 case UNSUBSCRIBE_ATTRIBUTES_REQUEST:
293   - TransportProtos.SessionInfoProto attrSession = lookupAsyncSessionInfo(getTokenFromRequest(request));
294   - if (attrSession != null) {
  297 + CoapObserveSessionInfo coapObserveAttrSessionInfo = lookupAsyncSessionInfo(getTokenFromRequest(request));
  298 + if (coapObserveAttrSessionInfo != null) {
  299 + TransportProtos.SessionInfoProto attrSession = coapObserveAttrSessionInfo.getSessionInfoProto();
295 300 UUID attrSessionId = toSessionId(attrSession);
296 301 attributeSubscriptions.remove(attrSessionId);
297   - sessionInfoToObserveRelationMap.remove(attrSession);
298 302 transportService.process(attrSession,
299 303 TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(),
300 304 new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
301   - closeAndDeregister(sessionInfo);
302 305 }
  306 + closeAndDeregister(sessionInfo);
303 307 break;
304 308 case SUBSCRIBE_RPC_COMMANDS_REQUEST:
305   - TransportProtos.SessionInfoProto currentRpcSession = tokenToSessionInfoMap.get(getTokenFromRequest(request));
306   - if (currentRpcSession == null) {
  309 + CoapObserveSessionInfo currentCoapObserveRpcSessionInfo = tokenToCoapSessionInfoMap.get(getTokenFromRequest(request));
  310 + if (currentCoapObserveRpcSessionInfo == null) {
307 311 rpcSubscriptions.add(sessionId);
308 312 registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor,
309 313 transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request));
... ... @@ -314,16 +318,16 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
314 318 }
315 319 break;
316 320 case UNSUBSCRIBE_RPC_COMMANDS_REQUEST:
317   - TransportProtos.SessionInfoProto rpcSession = lookupAsyncSessionInfo(getTokenFromRequest(request));
318   - if (rpcSession != null) {
  321 + CoapObserveSessionInfo coapObserveRpcSessionInfo = lookupAsyncSessionInfo(getTokenFromRequest(request));
  322 + if (coapObserveRpcSessionInfo != null) {
  323 + TransportProtos.SessionInfoProto rpcSession = coapObserveRpcSessionInfo.getSessionInfoProto();
319 324 UUID rpcSessionId = toSessionId(rpcSession);
320 325 rpcSubscriptions.remove(rpcSessionId);
321   - sessionInfoToObserveRelationMap.remove(rpcSession);
322 326 transportService.process(rpcSession,
323 327 TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(),
324 328 new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR));
325   - closeAndDeregister(sessionInfo);
326 329 }
  330 + closeAndDeregister(sessionInfo);
327 331 break;
328 332 case TO_DEVICE_RPC_RESPONSE:
329 333 transportService.process(sessionInfo,
... ... @@ -355,13 +359,12 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
355 359 return new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB());
356 360 }
357 361
358   - private TransportProtos.SessionInfoProto lookupAsyncSessionInfo(String token) {
359   - tokenToObserveNotificationSeqMap.remove(token);
360   - return tokenToSessionInfoMap.remove(token);
  362 + private CoapObserveSessionInfo lookupAsyncSessionInfo(String token) {
  363 + return tokenToCoapSessionInfoMap.remove(token);
361 364 }
362 365
363 366 private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) {
364   - tokenToSessionInfoMap.putIfAbsent(token, sessionInfo);
  367 + tokenToCoapSessionInfoMap.putIfAbsent(token, new CoapObserveSessionInfo(sessionInfo));
365 368 transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo));
366 369 transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
367 370 }
... ... @@ -476,45 +479,40 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
476 479 }
477 480
478 481 @Override
479   - public void onAttributeUpdate(TransportProtos.AttributeUpdateNotificationMsg msg) {
  482 + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg msg) {
  483 + log.trace("[{}] Received attributes update notification to device", sessionId);
480 484 try {
481 485 exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg));
482 486 } catch (AdaptorException e) {
483 487 log.trace("Failed to reply due to error", e);
484   - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
  488 + closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
  489 + closeAndDeregister();
485 490 }
486 491 }
487 492
488 493 @Override
489 494 public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
490 495 log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
491   - Map<TransportProtos.SessionInfoProto, ObserveRelation> sessionToObserveRelationMap = coapTransportResource.getSessionInfoToObserveRelationMap();
492   - if (coapTransportResource.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) {
493   - Set<TransportProtos.SessionInfoProto> observeSessions = sessionToObserveRelationMap.keySet();
494   - Optional<TransportProtos.SessionInfoProto> observeSessionToClose = observeSessions.stream().filter(sessionInfoProto -> {
495   - UUID observeSessionId = new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB());
496   - return observeSessionId.equals(sessionId);
497   - }).findFirst();
498   - if (observeSessionToClose.isPresent()) {
499   - TransportProtos.SessionInfoProto sessionInfoProto = observeSessionToClose.get();
500   - ObserveRelation observeRelation = sessionToObserveRelationMap.get(sessionInfoProto);
501   - coapTransportResource.clearAndNotifyObserveRelation(observeRelation, CoAP.ResponseCode.SERVICE_UNAVAILABLE);
502   - }
503   - }
  496 + closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.SERVICE_UNAVAILABLE);
  497 + closeAndDeregister();
504 498 }
505 499
506 500 @Override
507   - public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg msg) {
508   - boolean successful;
  501 + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) {
  502 + log.trace("[{}] Received RPC command to device", sessionId);
  503 + boolean successful = true;
509 504 try {
510 505 exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder));
511   - successful = true;
512 506 } catch (AdaptorException e) {
513 507 log.trace("Failed to reply due to error", e);
514   - exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
  508 + closeObserveRelationAndNotify(sessionId, CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
515 509 successful = false;
  510 + } finally {
  511 + coapTransportResource.transportService.process(sessionInfo, msg, !successful, TransportServiceCallback.EMPTY);
  512 + if (!successful) {
  513 + closeAndDeregister();
  514 + }
516 515 }
517   - coapTransportResource.transportService.process(sessionInfo, msg, !successful, TransportServiceCallback.EMPTY);
518 516 }
519 517
520 518 @Override
... ... @@ -530,6 +528,30 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
530 528 private boolean isConRequest() {
531 529 return exchange.advanced().getRequest().isConfirmable();
532 530 }
  531 +
  532 + private void closeObserveRelationAndNotify(UUID sessionId, CoAP.ResponseCode responseCode) {
  533 + Map<CoapObserveSessionInfo, ObserveRelation> sessionToObserveRelationMap = coapTransportResource.getCoapSessionInfoToObserveRelationMap();
  534 + if (coapTransportResource.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) {
  535 + Optional<CoapObserveSessionInfo> observeSessionToClose = sessionToObserveRelationMap.keySet().stream().filter(coapObserveSessionInfo -> {
  536 + TransportProtos.SessionInfoProto sessionToDelete = coapObserveSessionInfo.getSessionInfoProto();
  537 + UUID observeSessionId = new UUID(sessionToDelete.getSessionIdMSB(), sessionToDelete.getSessionIdLSB());
  538 + return observeSessionId.equals(sessionId);
  539 + }).findFirst();
  540 + if (observeSessionToClose.isPresent()) {
  541 + CoapObserveSessionInfo coapObserveSessionInfo = observeSessionToClose.get();
  542 + ObserveRelation observeRelation = sessionToObserveRelationMap.get(coapObserveSessionInfo);
  543 + coapTransportResource.clearAndNotifyObserveRelation(observeRelation, responseCode);
  544 + }
  545 + }
  546 + }
  547 +
  548 + private void closeAndDeregister() {
  549 + Request request = exchange.advanced().getRequest();
  550 + String token = coapTransportResource.getTokenFromRequest(request);
  551 + CoapObserveSessionInfo deleted = coapTransportResource.lookupAsyncSessionInfo(token);
  552 + coapTransportResource.closeAndDeregister(deleted.getSessionInfoProto());
  553 + }
  554 +
533 555 }
534 556
535 557 public class CoapResourceObserver implements ResourceObserver {
... ... @@ -554,7 +576,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
554 576 public void addedObserveRelation(ObserveRelation relation) {
555 577 Request request = relation.getExchange().getRequest();
556 578 String token = getTokenFromRequest(request);
557   - sessionInfoToObserveRelationMap.putIfAbsent(tokenToSessionInfoMap.get(token), relation);
  579 + sessionInfoToObserveRelationMap.putIfAbsent(tokenToCoapSessionInfoMap.get(token), relation);
558 580 log.trace("Added Observe relation for token: {}", token);
559 581 }
560 582
... ... @@ -562,8 +584,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
562 584 public void removedObserveRelation(ObserveRelation relation) {
563 585 Request request = relation.getExchange().getRequest();
564 586 String token = getTokenFromRequest(request);
565   - TransportProtos.SessionInfoProto session = tokenToSessionInfoMap.get(token);
566   - sessionInfoToObserveRelationMap.remove(session);
  587 + sessionInfoToObserveRelationMap.remove(tokenToCoapSessionInfoMap.get(token));
567 588 log.trace("Relation removed for token: {}", token);
568 589 }
569 590 }
... ... @@ -574,7 +595,6 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
574 595 transportService.deregisterSession(session);
575 596 rpcSubscriptions.remove(sessionId);
576 597 attributeSubscriptions.remove(sessionId);
577   - sessionInfoToObserveRelationMap.remove(session);
578 598 }
579 599
580 600 private TransportConfigurationContainer getTransportConfigurationContainer(DeviceProfile deviceProfile) throws AdaptorException {
... ... @@ -640,4 +660,17 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
640 660 this.jsonPayload = jsonPayload;
641 661 }
642 662 }
  663 +
  664 + @Data
  665 + private static class CoapObserveSessionInfo {
  666 +
  667 + private final TransportProtos.SessionInfoProto sessionInfoProto;
  668 + private final AtomicInteger observeNotificationCounter;
  669 +
  670 + private CoapObserveSessionInfo(TransportProtos.SessionInfoProto sessionInfoProto) {
  671 + this.sessionInfoProto = sessionInfoProto;
  672 + this.observeNotificationCounter = new AtomicInteger(0);
  673 + }
  674 + }
  675 +
643 676 }
... ...
... ... @@ -393,7 +393,8 @@ public class DeviceApiController implements TbTransportService {
393 393 }
394 394
395 395 @Override
396   - public void onAttributeUpdate(AttributeUpdateNotificationMsg msg) {
  396 + public void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg msg) {
  397 + log.trace("[{}] Received attributes update notification to device", sessionId);
397 398 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
398 399 }
399 400
... ... @@ -404,7 +405,8 @@ public class DeviceApiController implements TbTransportService {
404 405 }
405 406
406 407 @Override
407   - public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg msg) {
  408 + public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg msg) {
  409 + log.trace("[{}] Received RPC command to device", sessionId);
408 410 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));
409 411 transportService.process(sessionInfo, msg, false, TransportServiceCallback.EMPTY);
410 412 }
... ...
... ... @@ -21,10 +21,11 @@ import org.eclipse.leshan.core.request.BindingMode;
21 21 import org.eclipse.leshan.core.util.Hex;
22 22 import org.eclipse.leshan.server.bootstrap.BootstrapConfig;
23 23
  24 +import java.io.Serializable;
24 25 import java.nio.charset.StandardCharsets;
25 26
26 27 @Data
27   -public class LwM2MBootstrapConfig {
  28 +public class LwM2MBootstrapConfig implements Serializable {
28 29 /*
29 30 interface BootstrapSecurityConfig
30 31 servers: BootstrapServersSecurityConfig,
... ...
... ... @@ -146,10 +146,10 @@ public class LwM2mCredentialsSecurityInfoValidator {
146 146 PSKClientCredentials pskConfig = (PSKClientCredentials) clientCredentialsConfig;
147 147 if (StringUtils.isNotEmpty(pskConfig.getIdentity())) {
148 148 try {
149   - if (pskConfig.getKey() != null && pskConfig.getKey().length > 0) {
  149 + if (pskConfig.getDecodedKey() != null && pskConfig.getDecodedKey().length > 0) {
150 150 endpoint = StringUtils.isNotEmpty(pskConfig.getEndpoint()) ? pskConfig.getEndpoint() : endpoint;
151 151 if (endpoint != null && !endpoint.isEmpty()) {
152   - result.setSecurityInfo(SecurityInfo.newPreSharedKeyInfo(endpoint, pskConfig.getIdentity(), pskConfig.getKey()));
  152 + result.setSecurityInfo(SecurityInfo.newPreSharedKeyInfo(endpoint, pskConfig.getIdentity(), pskConfig.getDecodedKey()));
153 153 result.setSecurityMode(PSK);
154 154 }
155 155 }
... ... @@ -164,8 +164,8 @@ public class LwM2mCredentialsSecurityInfoValidator {
164 164 private void createClientSecurityInfoRPK(TbLwM2MSecurityInfo result, String endpoint, LwM2MClientCredentials clientCredentialsConfig) {
165 165 RPKClientCredentials rpkConfig = (RPKClientCredentials) clientCredentialsConfig;
166 166 try {
167   - if (rpkConfig.getKey() != null) {
168   - PublicKey key = SecurityUtil.publicKey.decode(rpkConfig.getKey());
  167 + if (rpkConfig.getDecodedKey() != null) {
  168 + PublicKey key = SecurityUtil.publicKey.decode(rpkConfig.getDecodedKey());
169 169 result.setSecurityInfo(SecurityInfo.newRawPublicKeyInfo(endpoint, key));
170 170 result.setSecurityMode(RPK);
171 171 } else {
... ...
... ... @@ -42,8 +42,8 @@ import org.thingsboard.server.common.transport.util.SslUtil;
42 42 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
43 43 import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
44 44 import org.thingsboard.server.transport.lwm2m.secure.credentials.LwM2MCredentials;
45   -import org.thingsboard.server.transport.lwm2m.server.store.TbEditableSecurityStore;
46 45 import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MDtlsSessionStore;
  46 +import org.thingsboard.server.transport.lwm2m.server.store.TbMainSecurityStore;
47 47
48 48 import javax.annotation.PostConstruct;
49 49 import javax.security.auth.x500.X500Principal;
... ... @@ -67,7 +67,7 @@ public class TbLwM2MDtlsCertificateVerifier implements NewAdvancedCertificateVer
67 67 private final TbLwM2MDtlsSessionStore sessionStorage;
68 68 private final LwM2MTransportServerConfig config;
69 69 private final LwM2mCredentialsSecurityInfoValidator securityInfoValidator;
70   - private final TbEditableSecurityStore securityStore;
  70 + private final TbMainSecurityStore securityStore;
71 71
72 72 @SuppressWarnings("deprecation")
73 73 private StaticCertificateVerifier staticCertificateVerifier;
... ... @@ -134,7 +134,7 @@ public class TbLwM2MDtlsCertificateVerifier implements NewAdvancedCertificateVer
134 134 if (msg.hasDeviceInfo() && deviceProfile != null) {
135 135 sessionStorage.put(endpoint, new TbX509DtlsSessionInfo(cert.getSubjectX500Principal().getName(), msg));
136 136 try {
137   - securityStore.put(securityInfo);
  137 + securityStore.putX509(securityInfo);
138 138 } catch (NonUniqueSecurityInfoException e) {
139 139 log.trace("Failed to add security info: {}", securityInfo, e);
140 140 }
... ...
... ... @@ -23,8 +23,10 @@ import org.thingsboard.server.common.data.DeviceProfile;
23 23 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
24 24 import org.thingsboard.server.transport.lwm2m.bootstrap.secure.LwM2MBootstrapConfig;
25 25
  26 +import java.io.Serializable;
  27 +
26 28 @Data
27   -public class TbLwM2MSecurityInfo {
  29 +public class TbLwM2MSecurityInfo implements Serializable {
28 30 private ValidateDeviceCredentialsResponse msg;
29 31 private SecurityInfo securityInfo;
30 32 private SecurityMode securityMode;
... ...
... ... @@ -18,8 +18,10 @@ package org.thingsboard.server.transport.lwm2m.secure;
18 18 import lombok.Data;
19 19 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
20 20
  21 +import java.io.Serializable;
  22 +
21 23 @Data
22   -public class TbX509DtlsSessionInfo {
  24 +public class TbX509DtlsSessionInfo implements Serializable {
23 25
24 26 private final String x509CommonName;
25 27 private final ValidateDeviceCredentialsResponse credentials;
... ...
... ... @@ -55,7 +55,8 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s
55 55 }
56 56
57 57 @Override
58   - public void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification) {
  58 + public void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg attributeUpdateNotification) {
  59 + log.trace("[{}] Received attributes update notification to device", sessionId);
59 60 this.attributesService.onAttributesUpdate(attributeUpdateNotification, this.sessionInfo);
60 61 }
61 62
... ... @@ -80,7 +81,8 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s
80 81 }
81 82
82 83 @Override
83   - public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) {
  84 + public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) {
  85 + log.trace("[{}] Received RPC command to device", sessionId);
84 86 this.rpcHandler.onToDeviceRpcRequest(toDeviceRequest, this.sessionInfo);
85 87 transportService.process(sessionInfo, toDeviceRequest, false, TransportServiceCallback.EMPTY);
86 88 }
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.transport.lwm2m.server.client;
17 17
18 18 import lombok.RequiredArgsConstructor;
19 19 import lombok.extern.slf4j.Slf4j;
  20 +import org.eclipse.leshan.core.SecurityMode;
20 21 import org.eclipse.leshan.core.model.ResourceModel;
21 22 import org.eclipse.leshan.core.node.LwM2mPath;
22 23 import org.eclipse.leshan.server.registration.Registration;
... ... @@ -30,7 +31,7 @@ import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
30 31 import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo;
31 32 import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext;
32 33 import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil;
33   -import org.thingsboard.server.transport.lwm2m.server.store.TbEditableSecurityStore;
  34 +import org.thingsboard.server.transport.lwm2m.server.store.TbMainSecurityStore;
34 35
35 36 import java.util.Arrays;
36 37 import java.util.Collection;
... ... @@ -54,7 +55,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
54 55
55 56 private final LwM2mTransportContext context;
56 57 private final LwM2MTransportServerConfig config;
57   - private final TbEditableSecurityStore securityStore;
  58 + private final TbMainSecurityStore securityStore;
58 59 private final Map<String, LwM2mClient> lwM2mClientsByEndpoint = new ConcurrentHashMap<>();
59 60 private final Map<String, LwM2mClient> lwM2mClientsByRegistrationId = new ConcurrentHashMap<>();
60 61 private final Map<UUID, Lwm2mDeviceProfileTransportConfiguration> profiles = new ConcurrentHashMap<>();
... ... @@ -75,6 +76,9 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
75 76 oldSession = lwM2MClient.getSession();
76 77 TbLwM2MSecurityInfo securityInfo = securityStore.getTbLwM2MSecurityInfoByEndpoint(lwM2MClient.getEndpoint());
77 78 if (securityInfo.getSecurityMode() != null) {
  79 + if (SecurityMode.X509.equals(securityInfo.getSecurityMode())) {
  80 + securityStore.registerX509(registration.getEndpoint(), registration.getId());
  81 + }
78 82 if (securityInfo.getDeviceProfile() != null) {
79 83 profileUpdate(securityInfo.getDeviceProfile());
80 84 if (securityInfo.getSecurityInfo() != null) {
... ... @@ -124,7 +128,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
124 128 if (currentRegistration.getId().equals(registration.getId())) {
125 129 lwM2MClient.setState(LwM2MClientState.UNREGISTERED);
126 130 lwM2mClientsByEndpoint.remove(lwM2MClient.getEndpoint());
127   - this.securityStore.remove(lwM2MClient.getEndpoint());
  131 + this.securityStore.remove(lwM2MClient.getEndpoint(), registration.getId());
128 132 UUID profileId = lwM2MClient.getProfileId();
129 133 if (profileId != null) {
130 134 Optional<LwM2mClient> otherClients = lwM2mClientsByRegistrationId.values().stream().filter(e -> e.getProfileId().equals(profileId)).findFirst();
... ...
... ... @@ -15,28 +15,29 @@
15 15 */
16 16 package org.thingsboard.server.transport.lwm2m.server.store;
17 17
18   -import com.fasterxml.jackson.databind.JsonNode;
  18 +import org.nustaq.serialization.FSTConfiguration;
19 19 import org.springframework.data.redis.connection.RedisConnectionFactory;
20   -import org.thingsboard.common.util.JacksonUtil;
21 20 import org.thingsboard.server.transport.lwm2m.secure.TbX509DtlsSessionInfo;
22 21
23 22 public class TbLwM2MDtlsSessionRedisStore implements TbLwM2MDtlsSessionStore {
24 23
25 24 private static final String SESSION_EP = "SESSION#EP#";
26   - RedisConnectionFactory connectionFactory;
  25 + private final RedisConnectionFactory connectionFactory;
  26 + private final FSTConfiguration serializer;
27 27
28 28 public TbLwM2MDtlsSessionRedisStore(RedisConnectionFactory redisConnectionFactory) {
29 29 this.connectionFactory = redisConnectionFactory;
  30 + this.serializer = FSTConfiguration.createDefaultConfiguration();
30 31 }
31 32
32 33 @Override
33 34 public void put(String endpoint, TbX509DtlsSessionInfo msg) {
34 35 try (var c = connectionFactory.getConnection()) {
35   - var msgJson = JacksonUtil.convertValue(msg, JsonNode.class);
36   - if (msgJson != null) {
37   - c.set(getKey(endpoint), msgJson.toString().getBytes());
  36 + var serializedMsg = serializer.asByteArray(msg);
  37 + if (serializedMsg != null) {
  38 + c.set(getKey(endpoint), serializedMsg);
38 39 } else {
39   - throw new RuntimeException("Problem with serialization of message: " + msg.toString());
  40 + throw new RuntimeException("Problem with serialization of message: " + msg);
40 41 }
41 42 }
42 43 }
... ... @@ -46,7 +47,7 @@ public class TbLwM2MDtlsSessionRedisStore implements TbLwM2MDtlsSessionStore {
46 47 try (var c = connectionFactory.getConnection()) {
47 48 var data = c.get(getKey(endpoint));
48 49 if (data != null) {
49   - return JacksonUtil.fromString(new String(data), TbX509DtlsSessionInfo.class);
  50 + return (TbX509DtlsSessionInfo) serializer.asObject(data);
50 51 } else {
51 52 return null;
52 53 }
... ...
... ... @@ -15,49 +15,55 @@
15 15 */
16 16 package org.thingsboard.server.transport.lwm2m.server.store;
17 17
18   -import org.eclipse.leshan.server.redis.serialization.SecurityInfoSerDes;
19   -import org.eclipse.leshan.server.security.EditableSecurityStore;
20 18 import org.eclipse.leshan.server.security.NonUniqueSecurityInfoException;
21 19 import org.eclipse.leshan.server.security.SecurityInfo;
22   -import org.eclipse.leshan.server.security.SecurityStoreListener;
23   -import org.springframework.data.redis.connection.RedisClusterConnection;
  20 +import org.nustaq.serialization.FSTConfiguration;
24 21 import org.springframework.data.redis.connection.RedisConnectionFactory;
25   -import org.springframework.data.redis.core.Cursor;
26   -import org.springframework.data.redis.core.ScanOptions;
  22 +import org.springframework.integration.redis.util.RedisLockRegistry;
27 23 import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo;
28 24
29   -import java.util.ArrayList;
30   -import java.util.Collection;
31   -import java.util.LinkedList;
32   -import java.util.List;
  25 +import java.util.concurrent.locks.Lock;
33 26
34 27 public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore {
35 28 private static final String SEC_EP = "SEC#EP#";
36   -
  29 + private static final String LOCK_EP = "LOCK#EP#";
37 30 private static final String PSKID_SEC = "PSKID#SEC";
38 31
39 32 private final RedisConnectionFactory connectionFactory;
40   - private SecurityStoreListener listener;
  33 + private final FSTConfiguration serializer;
  34 + private final RedisLockRegistry redisLock;
41 35
42 36 public TbLwM2mRedisSecurityStore(RedisConnectionFactory connectionFactory) {
43 37 this.connectionFactory = connectionFactory;
  38 + redisLock = new RedisLockRegistry(connectionFactory, "Security");
  39 + serializer = FSTConfiguration.createDefaultConfiguration();
44 40 }
45 41
46 42 @Override
47 43 public SecurityInfo getByEndpoint(String endpoint) {
  44 + Lock lock = null;
48 45 try (var connection = connectionFactory.getConnection()) {
  46 + lock = redisLock.obtain(toLockKey(endpoint));
  47 + lock.lock();
49 48 byte[] data = connection.get((SEC_EP + endpoint).getBytes());
50 49 if (data == null) {
51 50 return null;
52 51 } else {
53   - return deserialize(data);
  52 + return ((TbLwM2MSecurityInfo) serializer.asObject(data)).getSecurityInfo();
  53 + }
  54 + } finally {
  55 + if (lock != null) {
  56 + lock.unlock();
54 57 }
55 58 }
56 59 }
57 60
58 61 @Override
59 62 public SecurityInfo getByIdentity(String identity) {
  63 + Lock lock = null;
60 64 try (var connection = connectionFactory.getConnection()) {
  65 + lock = redisLock.obtain(toLockKey(identity));
  66 + lock.lock();
61 67 byte[] ep = connection.hGet(PSKID_SEC.getBytes(), identity.getBytes());
62 68 if (ep == null) {
63 69 return null;
... ... @@ -66,102 +72,86 @@ public class TbLwM2mRedisSecurityStore implements TbEditableSecurityStore {
66 72 if (data == null) {
67 73 return null;
68 74 } else {
69   - return deserialize(data);
  75 + return ((TbLwM2MSecurityInfo) serializer.asObject(data)).getSecurityInfo();
70 76 }
71 77 }
  78 + } finally {
  79 + if (lock != null) {
  80 + lock.unlock();
  81 + }
72 82 }
73 83 }
74 84
75 85 @Override
76 86 public void put(TbLwM2MSecurityInfo tbSecurityInfo) throws NonUniqueSecurityInfoException {
77   - //TODO: implement
  87 + SecurityInfo info = tbSecurityInfo.getSecurityInfo();
  88 + byte[] tbSecurityInfoSerialized = serializer.asByteArray(tbSecurityInfo);
  89 + Lock lock = null;
  90 + try (var connection = connectionFactory.getConnection()) {
  91 + lock = redisLock.obtain(tbSecurityInfo.getEndpoint());
  92 + lock.lock();
  93 + if (info != null && info.getIdentity() != null) {
  94 + byte[] oldEndpointBytes = connection.hGet(PSKID_SEC.getBytes(), info.getIdentity().getBytes());
  95 + if (oldEndpointBytes != null) {
  96 + String oldEndpoint = new String(oldEndpointBytes);
  97 + if (!oldEndpoint.equals(info.getEndpoint())) {
  98 + throw new NonUniqueSecurityInfoException("PSK Identity " + info.getIdentity() + " is already used");
  99 + }
  100 + connection.hSet(PSKID_SEC.getBytes(), info.getIdentity().getBytes(), info.getEndpoint().getBytes());
  101 + }
  102 + }
  103 +
  104 + byte[] previousData = connection.getSet((SEC_EP + tbSecurityInfo.getEndpoint()).getBytes(), tbSecurityInfoSerialized);
  105 + if (previousData != null && info != null) {
  106 + String previousIdentity = ((TbLwM2MSecurityInfo) serializer.asObject(previousData)).getSecurityInfo().getIdentity();
  107 + if (previousIdentity != null && !previousIdentity.equals(info.getIdentity())) {
  108 + connection.hDel(PSKID_SEC.getBytes(), previousIdentity.getBytes());
  109 + }
  110 + }
  111 + } finally {
  112 + if (lock != null) {
  113 + lock.unlock();
  114 + }
  115 + }
78 116 }
79 117
80 118 @Override
81 119 public TbLwM2MSecurityInfo getTbLwM2MSecurityInfoByEndpoint(String endpoint) {
82   - //TODO: implement
83   - return null;
  120 + Lock lock = null;
  121 + try (var connection = connectionFactory.getConnection()) {
  122 + lock = redisLock.obtain(endpoint);
  123 + lock.lock();
  124 + byte[] data = connection.get((SEC_EP + endpoint).getBytes());
  125 + return (TbLwM2MSecurityInfo) serializer.asObject(data);
  126 + } finally {
  127 + if (lock != null) {
  128 + lock.unlock();
  129 + }
  130 + }
84 131 }
85 132
86 133 @Override
87 134 public void remove(String endpoint) {
88   - //TODO: implement
89   - }
90   -
91   - // @Override
92   -// public Collection<SecurityInfo> getAll() {
93   -// try (var connection = connectionFactory.getConnection()) {
94   -// Collection<SecurityInfo> list = new LinkedList<>();
95   -// ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(SEC_EP + "*").build();
96   -// List<Cursor<byte[]>> scans = new ArrayList<>();
97   -// if (connection instanceof RedisClusterConnection) {
98   -// ((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> {
99   -// scans.add(((RedisClusterConnection) connection).scan(node, scanOptions));
100   -// });
101   -// } else {
102   -// scans.add(connection.scan(scanOptions));
103   -// }
104   -//
105   -// scans.forEach(scan -> {
106   -// scan.forEachRemaining(key -> {
107   -// byte[] element = connection.get(key);
108   -// list.add(deserialize(element));
109   -// });
110   -// });
111   -// return list;
112   -// }
113   -// }
114   -//
115   -// @Override
116   -// public SecurityInfo add(SecurityInfo info) throws NonUniqueSecurityInfoException {
117   -// byte[] data = serialize(info);
118   -// try (var connection = connectionFactory.getConnection()) {
119   -// if (info.getIdentity() != null) {
120   -// // populate the secondary index (security info by PSK id)
121   -// String oldEndpoint = new String(connection.hGet(PSKID_SEC.getBytes(), info.getIdentity().getBytes()));
122   -// if (!oldEndpoint.equals(info.getEndpoint())) {
123   -// throw new NonUniqueSecurityInfoException("PSK Identity " + info.getIdentity() + " is already used");
124   -// }
125   -// connection.hSet(PSKID_SEC.getBytes(), info.getIdentity().getBytes(), info.getEndpoint().getBytes());
126   -// }
127   -//
128   -// byte[] previousData = connection.getSet((SEC_EP + info.getEndpoint()).getBytes(), data);
129   -// SecurityInfo previous = previousData == null ? null : deserialize(previousData);
130   -// String previousIdentity = previous == null ? null : previous.getIdentity();
131   -// if (previousIdentity != null && !previousIdentity.equals(info.getIdentity())) {
132   -// connection.hDel(PSKID_SEC.getBytes(), previousIdentity.getBytes());
133   -// }
134   -//
135   -// return previous;
136   -// }
137   -// }
138   -//
139   -// @Override
140   -// public SecurityInfo remove(String endpoint, boolean infosAreCompromised) {
141   -// try (var connection = connectionFactory.getConnection()) {
142   -// byte[] data = connection.get((SEC_EP + endpoint).getBytes());
143   -//
144   -// if (data != null) {
145   -// SecurityInfo info = deserialize(data);
146   -// if (info.getIdentity() != null) {
147   -// connection.hDel(PSKID_SEC.getBytes(), info.getIdentity().getBytes());
148   -// }
149   -// connection.del((SEC_EP + endpoint).getBytes());
150   -// if (listener != null) {
151   -// listener.securityInfoRemoved(infosAreCompromised, info);
152   -// }
153   -// return info;
154   -// }
155   -// }
156   -// return null;
157   -// }
158   -
159   - private byte[] serialize(SecurityInfo secInfo) {
160   - return SecurityInfoSerDes.serialize(secInfo);
  135 + Lock lock = null;
  136 + try (var connection = connectionFactory.getConnection()) {
  137 + lock = redisLock.obtain(endpoint);
  138 + lock.lock();
  139 + byte[] data = connection.get((SEC_EP + endpoint).getBytes());
  140 + if (data != null) {
  141 + SecurityInfo info = ((TbLwM2MSecurityInfo) serializer.asObject(data)).getSecurityInfo();
  142 + if (info != null && info.getIdentity() != null) {
  143 + connection.hDel(PSKID_SEC.getBytes(), info.getIdentity().getBytes());
  144 + }
  145 + connection.del((SEC_EP + endpoint).getBytes());
  146 + }
  147 + } finally {
  148 + if (lock != null) {
  149 + lock.unlock();
  150 + }
  151 + }
161 152 }
162 153
163   - private SecurityInfo deserialize(byte[] data) {
164   - return SecurityInfoSerDes.deserialize(data);
  154 + private String toLockKey(String endpoint) {
  155 + return LOCK_EP + endpoint;
165 156 }
166   -
167 157 }
... ...
... ... @@ -22,13 +22,22 @@ import org.jetbrains.annotations.Nullable;
22 22 import org.thingsboard.server.transport.lwm2m.secure.LwM2mCredentialsSecurityInfoValidator;
23 23 import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo;
24 24
  25 +import java.util.HashSet;
  26 +import java.util.Map;
  27 +import java.util.Set;
  28 +import java.util.concurrent.ConcurrentHashMap;
  29 +import java.util.concurrent.ConcurrentMap;
  30 +import java.util.concurrent.locks.Lock;
  31 +import java.util.concurrent.locks.ReentrantLock;
  32 +
25 33 import static org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mTypeServer.CLIENT;
26 34
27 35 @Slf4j
28   -public class TbLwM2mSecurityStore implements TbEditableSecurityStore {
  36 +public class TbLwM2mSecurityStore implements TbMainSecurityStore {
29 37
30 38 private final TbEditableSecurityStore securityStore;
31 39 private final LwM2mCredentialsSecurityInfoValidator validator;
  40 + private final ConcurrentMap<String, Set<String>> endpointRegistrations = new ConcurrentHashMap<>();
32 41
33 42 public TbLwM2mSecurityStore(TbEditableSecurityStore securityStore, LwM2mCredentialsSecurityInfoValidator validator) {
34 43 this.securityStore = securityStore;
... ... @@ -61,24 +70,42 @@ public class TbLwM2mSecurityStore implements TbEditableSecurityStore {
61 70 @Nullable
62 71 public SecurityInfo fetchAndPutSecurityInfo(String credentialsId) {
63 72 TbLwM2MSecurityInfo securityInfo = validator.getEndpointSecurityInfoByCredentialsId(credentialsId, CLIENT);
64   - try {
65   - if (securityInfo != null) {
  73 + doPut(securityInfo);
  74 + return securityInfo != null ? securityInfo.getSecurityInfo() : null;
  75 + }
  76 +
  77 + private void doPut(TbLwM2MSecurityInfo securityInfo) {
  78 + if (securityInfo != null) {
  79 + try {
66 80 securityStore.put(securityInfo);
  81 + } catch (NonUniqueSecurityInfoException e) {
  82 + log.trace("Failed to add security info: {}", securityInfo, e);
67 83 }
68   - } catch (NonUniqueSecurityInfoException e) {
69   - log.trace("Failed to add security info: {}", securityInfo, e);
70 84 }
71   - return securityInfo != null ? securityInfo.getSecurityInfo() : null;
72 85 }
73 86
74 87 @Override
75   - public void put(TbLwM2MSecurityInfo tbSecurityInfo) throws NonUniqueSecurityInfoException {
76   - securityStore.put(tbSecurityInfo);
  88 + public void putX509(TbLwM2MSecurityInfo securityInfo) throws NonUniqueSecurityInfoException {
  89 + securityStore.put(securityInfo);
77 90 }
78 91
79 92 @Override
80   - public void remove(String endpoint) {
81   - //TODO: Make sure we delay removal of security store from endpoint due to reg/unreg race condition.
82   -// securityStore.remove(endpoint);
  93 + public void registerX509(String endpoint, String registrationId) {
  94 + endpointRegistrations.computeIfAbsent(endpoint, ep -> new HashSet<>()).add(registrationId);
  95 + }
  96 +
  97 + @Override
  98 + public void remove(String endpoint, String registrationId) {
  99 + Set<String> epRegistrationIds = endpointRegistrations.get(endpoint);
  100 + boolean shouldRemove;
  101 + if (epRegistrationIds == null) {
  102 + shouldRemove = true;
  103 + } else {
  104 + epRegistrationIds.remove(registrationId);
  105 + shouldRemove = epRegistrationIds.isEmpty();
  106 + }
  107 + if (shouldRemove) {
  108 + securityStore.remove(endpoint);
  109 + }
83 110 }
84 111 }
... ...
... ... @@ -51,7 +51,7 @@ public class TbLwM2mStoreFactory {
51 51 }
52 52
53 53 @Bean
54   - private TbEditableSecurityStore securityStore() {
  54 + private TbMainSecurityStore securityStore() {
55 55 return new TbLwM2mSecurityStore(redisConfiguration.isPresent() && useRedis ?
56 56 new TbLwM2mRedisSecurityStore(redisConfiguration.get().redisConnectionFactory()) : new TbInMemorySecurityStore(), validator);
57 57 }
... ...
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbMainSecurityStore.java renamed from application/src/main/java/org/thingsboard/server/service/ttl/timeseries/TimescaleTimeseriesCleanUpService.java
... ... @@ -13,24 +13,17 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.service.ttl.timeseries;
  16 +package org.thingsboard.server.transport.lwm2m.server.store;
17 17
18   -import lombok.extern.slf4j.Slf4j;
19   -import org.springframework.stereotype.Service;
20   -import org.thingsboard.server.dao.model.ModelConstants;
21   -import org.thingsboard.server.dao.util.TimescaleDBTsDao;
  18 +import org.eclipse.leshan.server.security.NonUniqueSecurityInfoException;
  19 +import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo;
22 20
23   -import java.sql.Connection;
24   -import java.sql.SQLException;
  21 +public interface TbMainSecurityStore extends TbSecurityStore {
25 22
26   -@TimescaleDBTsDao
27   -@Service
28   -@Slf4j
29   -public class TimescaleTimeseriesCleanUpService extends AbstractTimeseriesCleanUpService {
  23 + void putX509(TbLwM2MSecurityInfo tbSecurityInfo) throws NonUniqueSecurityInfoException;
  24 +
  25 + void registerX509(String endpoint, String registrationId);
  26 +
  27 + void remove(String endpoint, String registrationId);
30 28
31   - @Override
32   - protected void doCleanUp(Connection connection) throws SQLException {
33   - long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID + "'," + systemTtl + ", 0);");
34   - log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved);
35   - }
36 29 }
... ...
... ... @@ -796,7 +796,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
796 796 }
797 797
798 798 @Override
799   - public void onAttributeUpdate(TransportProtos.AttributeUpdateNotificationMsg notification) {
  799 + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
  800 + log.trace("[{}] Received attributes update notification to device", sessionId);
800 801 try {
801 802 deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, notification).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
802 803 } catch (Exception e) {
... ... @@ -811,7 +812,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
811 812 }
812 813
813 814 @Override
814   - public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
  815 + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
815 816 log.trace("[{}] Received RPC command to device", sessionId);
816 817 try {
817 818 deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest)
... ...
... ... @@ -84,7 +84,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
84 84 }
85 85
86 86 @Override
87   - public void onAttributeUpdate(TransportProtos.AttributeUpdateNotificationMsg notification) {
  87 + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
  88 + log.trace("[{}] Received attributes update notification to device", sessionId);
88 89 try {
89 90 parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::writeAndFlush);
90 91 } catch (Exception e) {
... ... @@ -93,7 +94,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
93 94 }
94 95
95 96 @Override
96   - public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg request) {
  97 + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg request) {
  98 + log.trace("[{}] Received RPC command to device", sessionId);
97 99 try {
98 100 parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(
99 101 payload -> {
... ...
... ... @@ -128,7 +128,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
128 128 }
129 129
130 130 @Override
131   - public void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification) {
  131 + public void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg attributeUpdateNotification) {
  132 + log.trace("[{}] Received attributes update notification to device", sessionId);
132 133 snmpTransportContext.getSnmpTransportService().onAttributeUpdate(this, attributeUpdateNotification);
133 134 }
134 135
... ... @@ -138,7 +139,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
138 139 }
139 140
140 141 @Override
141   - public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) {
  142 + public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) {
  143 + log.trace("[{}] Received RPC command to device", sessionId);
142 144 snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest);
143 145 snmpTransportContext.getTransportService().process(getSessionInfo(), toDeviceRequest, false, TransportServiceCallback.EMPTY);
144 146 }
... ...
... ... @@ -36,11 +36,11 @@ public interface SessionMsgListener {
36 36
37 37 void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse);
38 38
39   - void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification);
  39 + void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg attributeUpdateNotification);
40 40
41 41 void onRemoteSessionCloseCommand(UUID sessionId, SessionCloseNotificationProto sessionCloseNotification);
42 42
43   - void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest);
  43 + void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest);
44 44
45 45 void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse);
46 46
... ...
... ... @@ -20,7 +20,6 @@ import org.thingsboard.server.common.data.DeviceTransportType;
20 20 import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
21 21 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
22 22 import org.thingsboard.server.common.transport.service.SessionMetaData;
23   -import org.thingsboard.server.gen.transport.TransportProtos;
24 23 import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
25 24 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
26 25 import org.thingsboard.server.gen.transport.TransportProtos.GetDeviceCredentialsRequestMsg;
... ... @@ -47,6 +46,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
47 46 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
48 47 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
49 48 import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto;
  49 +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
50 50 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
51 51 import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
52 52 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
... ... @@ -109,7 +109,7 @@ public interface TransportService {
109 109
110 110 void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
111 111
112   - void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, boolean isFailedRpc, TransportServiceCallback<Void> callback);
  112 + void process(SessionInfoProto sessionInfo, ToDeviceRpcRequestMsg msg, boolean isFailedRpc, TransportServiceCallback<Void> callback);
113 113
114 114 void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);
115 115
... ...
... ... @@ -22,8 +22,10 @@ import org.thingsboard.server.common.data.id.DeviceId;
22 22 import org.thingsboard.server.common.data.id.DeviceProfileId;
23 23 import org.thingsboard.server.common.data.id.TenantId;
24 24
  25 +import java.io.Serializable;
  26 +
25 27 @Data
26   -public class TransportDeviceInfo {
  28 +public class TransportDeviceInfo implements Serializable {
27 29
28 30 private TenantId tenantId;
29 31 private CustomerId customerId;
... ...
... ... @@ -19,9 +19,11 @@ import lombok.Builder;
19 19 import lombok.Data;
20 20 import org.thingsboard.server.common.data.DeviceProfile;
21 21
  22 +import java.io.Serializable;
  23 +
22 24 @Data
23 25 @Builder
24   -public class ValidateDeviceCredentialsResponse implements DeviceProfileAware {
  26 +public class ValidateDeviceCredentialsResponse implements DeviceProfileAware, Serializable {
25 27
26 28 private final TransportDeviceInfo deviceInfo;
27 29 private final DeviceProfile deviceProfile;
... ...
... ... @@ -776,7 +776,7 @@ public class DefaultTransportService implements TransportService {
776 776 listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
777 777 }
778 778 if (toSessionMsg.hasAttributeUpdateNotification()) {
779   - listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification());
  779 + listener.onAttributeUpdate(sessionId, toSessionMsg.getAttributeUpdateNotification());
780 780 }
781 781 if (toSessionMsg.hasSessionCloseNotification()) {
782 782 listener.onRemoteSessionCloseCommand(sessionId, toSessionMsg.getSessionCloseNotification());
... ... @@ -785,7 +785,7 @@ public class DefaultTransportService implements TransportService {
785 785 listener.onToTransportUpdateCredentials(toSessionMsg.getToTransportUpdateCredentialsNotification());
786 786 }
787 787 if (toSessionMsg.hasToDeviceRequest()) {
788   - listener.onToDeviceRpcRequest(toSessionMsg.getToDeviceRequest());
  788 + listener.onToDeviceRpcRequest(sessionId, toSessionMsg.getToDeviceRequest());
789 789 }
790 790 if (toSessionMsg.hasToServerResponse()) {
791 791 String requestId = sessionId + "-" + toSessionMsg.getToServerResponse().getRequestId();
... ...
... ... @@ -176,4 +176,10 @@ public interface EdgeDao extends Dao<Edge> {
176 176 * @return the list of rule chain objects
177 177 */
178 178 ListenableFuture<List<Edge>> findEdgesByTenantIdAndDashboardId(UUID tenantId, UUID dashboardId);
  179 +
  180 + /**
  181 + * Executes stored procedure to cleanup old edge events.
  182 + * @param ttl the ttl for edge events in seconds
  183 + */
  184 + void cleanupEvents(long ttl);
179 185 }
\ No newline at end of file
... ...
... ... @@ -627,6 +627,11 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
627 627 return result.toString();
628 628 }
629 629
  630 + @Override
  631 + public void cleanupEvents(long ttl) {
  632 + edgeDao.cleanupEvents(ttl);
  633 + }
  634 +
630 635 private List<RuleChain> findEdgeRuleChains(TenantId tenantId, EdgeId edgeId) {
631 636 List<RuleChain> result = new ArrayList<>();
632 637 PageLink pageLink = new PageLink(DEFAULT_LIMIT);
... ...
... ... @@ -131,6 +131,11 @@ public class BaseEventService implements EventService {
131 131 } while (eventPageData.hasNext());
132 132 }
133 133
  134 + @Override
  135 + public void cleanupEvents(long ttl, long debugTtl) {
  136 + eventDao.cleanupEvents(ttl, debugTtl);
  137 + }
  138 +
134 139 private DataValidator<Event> eventValidator =
135 140 new DataValidator<Event>() {
136 141 @Override
... ...
... ... @@ -102,4 +102,10 @@ public interface EventDao extends Dao<Event> {
102 102 */
103 103 List<Event> findLatestEvents(UUID tenantId, EntityId entityId, String eventType, int limit);
104 104
  105 + /**
  106 + * Executes stored procedure to cleanup old events. Uses separate ttl for debug and other events.
  107 + * @param otherEventsTtl the ttl for events in seconds
  108 + * @param debugEventsTtl the ttl for debug events in seconds
  109 + */
  110 + void cleanupEvents(long otherEventsTtl, long debugEventsTtl);
105 111 }
... ...
... ... @@ -15,11 +15,33 @@
15 15 */
16 16 package org.thingsboard.server.dao.sql;
17 17
  18 +import lombok.extern.slf4j.Slf4j;
18 19 import org.springframework.beans.factory.annotation.Autowired;
19 20
  21 +import javax.sql.DataSource;
  22 +import java.sql.SQLException;
  23 +import java.sql.SQLWarning;
  24 +import java.sql.Statement;
  25 +
  26 +@Slf4j
20 27 public abstract class JpaAbstractDaoListeningExecutorService {
21 28
22 29 @Autowired
23 30 protected JpaExecutorService service;
24 31
  32 + @Autowired
  33 + protected DataSource dataSource;
  34 +
  35 + protected void printWarnings(Statement statement) throws SQLException {
  36 + SQLWarning warnings = statement.getWarnings();
  37 + if (warnings != null) {
  38 + log.debug("{}", warnings.getMessage());
  39 + SQLWarning nextWarning = warnings.getNextWarning();
  40 + while (nextWarning != null) {
  41 + log.debug("{}", nextWarning.getMessage());
  42 + nextWarning = nextWarning.getNextWarning();
  43 + }
  44 + }
  45 + }
  46 +
25 47 }
... ...
... ... @@ -40,6 +40,10 @@ import org.thingsboard.server.dao.model.sql.EdgeInfoEntity;
40 40 import org.thingsboard.server.dao.relation.RelationDao;
41 41 import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao;
42 42
  43 +import java.sql.Connection;
  44 +import java.sql.PreparedStatement;
  45 +import java.sql.ResultSet;
  46 +import java.sql.SQLException;
43 47 import java.util.ArrayList;
44 48 import java.util.Collections;
45 49 import java.util.List;
... ... @@ -194,6 +198,24 @@ public class JpaEdgeDao extends JpaAbstractSearchTextDao<EdgeEntity, Edge> imple
194 198 return transformFromRelationToEdge(tenantId, relations);
195 199 }
196 200
  201 + @Override
  202 + public void cleanupEvents(long ttl) {
  203 + log.info("Going to cleanup old edge events using ttl: {}s", ttl);
  204 + try (Connection connection = dataSource.getConnection();
  205 + PreparedStatement stmt = connection.prepareStatement("call cleanup_edge_events_by_ttl(?,?)")) {
  206 + stmt.setLong(1, ttl);
  207 + stmt.setLong(2, 0);
  208 + stmt.execute();
  209 + printWarnings(stmt);
  210 + try (ResultSet resultSet = stmt.getResultSet()) {
  211 + resultSet.next();
  212 + log.info("Total edge events removed by TTL: [{}]", resultSet.getLong(1));
  213 + }
  214 + } catch (SQLException e) {
  215 + log.error("SQLException occurred during edge events TTL task execution ", e);
  216 + }
  217 + }
  218 +
197 219 private ListenableFuture<List<Edge>> transformFromRelationToEdge(UUID tenantId, ListenableFuture<List<EntityRelation>> relations) {
198 220 return Futures.transformAsync(relations, input -> {
199 221 List<ListenableFuture<Edge>> edgeFutures = new ArrayList<>(input.size());
... ...
... ... @@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.Event;
27 27 import org.thingsboard.server.common.data.event.DebugEvent;
28 28 import org.thingsboard.server.common.data.event.ErrorEventFilter;
29 29 import org.thingsboard.server.common.data.event.EventFilter;
30   -import org.thingsboard.server.common.data.event.EventType;
31 30 import org.thingsboard.server.common.data.event.LifeCycleEventFilter;
32 31 import org.thingsboard.server.common.data.event.StatisticsEventFilter;
33 32 import org.thingsboard.server.common.data.id.EntityId;
... ... @@ -40,6 +39,10 @@ import org.thingsboard.server.dao.event.EventDao;
40 39 import org.thingsboard.server.dao.model.sql.EventEntity;
41 40 import org.thingsboard.server.dao.sql.JpaAbstractDao;
42 41
  42 +import java.sql.Connection;
  43 +import java.sql.PreparedStatement;
  44 +import java.sql.ResultSet;
  45 +import java.sql.SQLException;
43 46 import java.util.List;
44 47 import java.util.Objects;
45 48 import java.util.Optional;
... ... @@ -256,6 +259,25 @@ public class JpaBaseEventDao extends JpaAbstractDao<EventEntity, Event> implemen
256 259 return DaoUtil.convertDataList(latest);
257 260 }
258 261
  262 + @Override
  263 + public void cleanupEvents(long otherEventsTtl, long debugEventsTtl) {
  264 + log.info("Going to cleanup old events using debug events ttl: {}s and other events ttl: {}s", debugEventsTtl, otherEventsTtl);
  265 + try (Connection connection = dataSource.getConnection();
  266 + PreparedStatement stmt = connection.prepareStatement("call cleanup_events_by_ttl(?,?,?)")) {
  267 + stmt.setLong(1, otherEventsTtl);
  268 + stmt.setLong(2, debugEventsTtl);
  269 + stmt.setLong(3, 0);
  270 + stmt.execute();
  271 + printWarnings(stmt);
  272 + try (ResultSet resultSet = stmt.getResultSet()){
  273 + resultSet.next();
  274 + log.info("Total events removed by TTL: [{}]", resultSet.getLong(1));
  275 + }
  276 + } catch (SQLException e) {
  277 + log.error("SQLException occurred during events TTL task execution ", e);
  278 + }
  279 + }
  280 +
259 281 public Optional<Event> save(EventEntity entity, boolean ifNotExists) {
260 282 log.debug("Save event [{}] ", entity);
261 283 if (entity.getTenantId() == null) {
... ...
... ... @@ -25,9 +25,14 @@ import org.thingsboard.server.common.data.id.EntityId;
25 25 import org.thingsboard.server.common.data.id.TenantId;
26 26 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
27 27 import org.thingsboard.server.common.data.kv.TsKvEntry;
  28 +import org.thingsboard.server.dao.model.ModelConstants;
28 29 import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
29 30
30 31 import javax.annotation.Nullable;
  32 +import java.sql.Connection;
  33 +import java.sql.PreparedStatement;
  34 +import java.sql.ResultSet;
  35 +import java.sql.SQLException;
31 36 import java.util.List;
32 37 import java.util.Objects;
33 38 import java.util.concurrent.TimeUnit;
... ... @@ -62,6 +67,24 @@ public abstract class AbstractSqlTimeseriesDao extends BaseAbstractSqlTimeseries
62 67 @Value("${sql.ttl.ts.ts_key_value_ttl:0}")
63 68 private long systemTtl;
64 69
  70 + public void cleanup(long systemTtl) {
  71 + log.info("Going to cleanup old timeseries data using ttl: {}s", systemTtl);
  72 + try (Connection connection = dataSource.getConnection();
  73 + PreparedStatement stmt = connection.prepareStatement("call cleanup_timeseries_by_ttl(?,?,?)")) {
  74 + stmt.setObject(1, ModelConstants.NULL_UUID);
  75 + stmt.setLong(2, systemTtl);
  76 + stmt.setLong(3, 0);
  77 + stmt.execute();
  78 + printWarnings(stmt);
  79 + try (ResultSet resultSet = stmt.getResultSet()) {
  80 + resultSet.next();
  81 + log.info("Total telemetry removed stats by TTL for entities: [{}]", resultSet.getLong(1));
  82 + }
  83 + } catch (SQLException e) {
  84 + log.error("SQLException occurred during timeseries TTL task execution ", e);
  85 + }
  86 + }
  87 +
65 88 protected ListenableFuture<List<TsKvEntry>> processFindAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
66 89 List<ListenableFuture<List<TsKvEntry>>> futures = queries
67 90 .stream()
... ...
... ... @@ -54,4 +54,9 @@ public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
54 54 return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor());
55 55 }
56 56
  57 + @Override
  58 + public void cleanup(long systemTtl) {
  59 +
  60 + }
  61 +
57 62 }
... ...
... ... @@ -35,6 +35,10 @@ import org.thingsboard.server.dao.timeseries.SqlTsPartitionDate;
35 35 import org.thingsboard.server.dao.util.PsqlDao;
36 36 import org.thingsboard.server.dao.util.SqlTsDao;
37 37
  38 +import java.sql.Connection;
  39 +import java.sql.PreparedStatement;
  40 +import java.sql.ResultSet;
  41 +import java.sql.SQLException;
38 42 import java.time.Instant;
39 43 import java.time.LocalDateTime;
40 44 import java.time.ZoneOffset;
... ... @@ -62,6 +66,7 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
62 66 @Value("${sql.postgres.ts_key_value_partitioning:MONTHS}")
63 67 private String partitioning;
64 68
  69 +
65 70 @Override
66 71 protected void init() {
67 72 super.init();
... ... @@ -93,6 +98,30 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
93 98 return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor());
94 99 }
95 100
  101 + @Override
  102 + public void cleanup(long systemTtl) {
  103 + cleanupPartitions(systemTtl);
  104 + super.cleanup(systemTtl);
  105 + }
  106 +
  107 + private void cleanupPartitions(long systemTtl) {
  108 + log.info("Going to cleanup old timeseries data partitions using partition type: {} and ttl: {}s", partitioning, systemTtl);
  109 + try (Connection connection = dataSource.getConnection();
  110 + PreparedStatement stmt = connection.prepareStatement("call drop_partitions_by_max_ttl(?,?,?)")) {
  111 + stmt.setString(1, partitioning);
  112 + stmt.setLong(2, systemTtl);
  113 + stmt.setLong(3, 0);
  114 + stmt.execute();
  115 + printWarnings(stmt);
  116 + try (ResultSet resultSet = stmt.getResultSet()) {
  117 + resultSet.next();
  118 + log.info("Total partitions removed by TTL: [{}]", resultSet.getLong(1));
  119 + }
  120 + } catch (SQLException e) {
  121 + log.error("SQLException occurred during TTL task execution ", e);
  122 + }
  123 + }
  124 +
96 125 private void savePartitionIfNotExist(long ts) {
97 126 if (!tsFormat.equals(SqlTsPartitionDate.INDEFINITE) && ts >= 0) {
98 127 LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
... ...
... ... @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
34 34 import org.thingsboard.server.common.data.kv.TsKvEntry;
35 35 import org.thingsboard.server.common.stats.StatsFactory;
36 36 import org.thingsboard.server.dao.DaoUtil;
  37 +import org.thingsboard.server.dao.model.ModelConstants;
37 38 import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
38 39 import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
39 40 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
... ... @@ -45,6 +46,9 @@ import org.thingsboard.server.dao.util.TimescaleDBTsDao;
45 46
46 47 import javax.annotation.PostConstruct;
47 48 import javax.annotation.PreDestroy;
  49 +import java.sql.CallableStatement;
  50 +import java.sql.SQLException;
  51 +import java.sql.Types;
48 52 import java.util.*;
49 53 import java.util.concurrent.CompletableFuture;
50 54 import java.util.function.Function;
... ... @@ -156,6 +160,11 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
156 160 }
157 161 }
158 162
  163 + @Override
  164 + public void cleanup(long systemTtl) {
  165 + super.cleanup(systemTtl);
  166 + }
  167 +
159 168 private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
160 169 String strKey = query.getKey();
161 170 Integer keyId = getOrSaveKeyId(strKey);
... ...
... ... @@ -127,6 +127,11 @@ public class BaseTimeseriesService implements TimeseriesService {
127 127 }
128 128
129 129 @Override
  130 + public void cleanup(long systemTtl) {
  131 + timeseriesDao.cleanup(systemTtl);
  132 + }
  133 +
  134 + @Override
130 135 public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
131 136 validate(entityId);
132 137 if (tsKvEntry == null) {
... ...
... ... @@ -288,6 +288,11 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
288 288 }
289 289 }
290 290
  291 + @Override
  292 + public void cleanup(long systemTtl) {
  293 + //Cleanup by TTL is native for Cassandra
  294 + }
  295 +
291 296 private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
292 297 long minPartition = toPartitionTs(query.getStartTs());
293 298 long maxPartition = toPartitionTs(query.getEndTs());
... ...
... ... @@ -38,4 +38,6 @@ public interface TimeseriesDao {
38 38 ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
39 39
40 40 ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
  41 +
  42 + void cleanup(long systemTtl);
41 43 }
... ...
... ... @@ -38,17 +38,18 @@ CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar
38 38 LANGUAGE plpgsql AS
39 39 $$
40 40 DECLARE
41   - max_tenant_ttl bigint;
42   - max_customer_ttl bigint;
43   - max_ttl bigint;
44   - date timestamp;
45   - partition_by_max_ttl_date varchar;
46   - partition_month varchar;
47   - partition_day varchar;
48   - partition_year varchar;
49   - partition varchar;
50   - partition_to_delete varchar;
51   -
  41 + max_tenant_ttl bigint;
  42 + max_customer_ttl bigint;
  43 + max_ttl bigint;
  44 + date timestamp;
  45 + partition_by_max_ttl_date varchar;
  46 + partition_by_max_ttl_month varchar;
  47 + partition_by_max_ttl_day varchar;
  48 + partition_by_max_ttl_year varchar;
  49 + partition varchar;
  50 + partition_year integer;
  51 + partition_month integer;
  52 + partition_day integer;
52 53
53 54 BEGIN
54 55 SELECT max(attribute_kv.long_v)
... ... @@ -65,53 +66,138 @@ BEGIN
65 66 if max_ttl IS NOT NULL AND max_ttl > 0 THEN
66 67 date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - max_ttl);
67 68 partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date);
  69 + RAISE NOTICE 'Date by max ttl: %', date;
68 70 RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date;
69 71 IF partition_by_max_ttl_date IS NOT NULL THEN
70 72 CASE
71 73 WHEN partition_type = 'DAYS' THEN
72   - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
73   - partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
74   - partition_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5);
  74 + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
  75 + partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
  76 + partition_by_max_ttl_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5);
75 77 WHEN partition_type = 'MONTHS' THEN
76   - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
77   - partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
  78 + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
  79 + partition_by_max_ttl_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
78 80 ELSE
79   - partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
  81 + partition_by_max_ttl_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
80 82 END CASE;
81   - FOR partition IN SELECT tablename
82   - FROM pg_tables
83   - WHERE schemaname = 'public'
84   - AND tablename like 'ts_kv_' || '%'
85   - AND tablename != 'ts_kv_latest'
86   - AND tablename != 'ts_kv_dictionary'
87   - AND tablename != 'ts_kv_indefinite'
88   - LOOP
89   - IF partition != partition_by_max_ttl_date THEN
90   - IF partition_year IS NOT NULL THEN
91   - IF SPLIT_PART(partition, '_', 3)::integer < partition_year::integer THEN
92   - partition_to_delete := partition;
93   - ELSE
94   - IF partition_month IS NOT NULL THEN
95   - IF SPLIT_PART(partition, '_', 4)::integer < partition_month::integer THEN
96   - partition_to_delete := partition;
  83 + IF partition_by_max_ttl_year IS NULL THEN
  84 + RAISE NOTICE 'Failed to remove partitions by max ttl date due to partition_by_max_ttl_year is null!';
  85 + ELSE
  86 + IF partition_type = 'YEARS' THEN
  87 + FOR partition IN SELECT tablename
  88 + FROM pg_tables
  89 + WHERE schemaname = 'public'
  90 + AND tablename like 'ts_kv_' || '%'
  91 + AND tablename != 'ts_kv_latest'
  92 + AND tablename != 'ts_kv_dictionary'
  93 + AND tablename != 'ts_kv_indefinite'
  94 + AND tablename != partition_by_max_ttl_date
  95 + LOOP
  96 + partition_year := SPLIT_PART(partition, '_', 3)::integer;
  97 + IF partition_year < partition_by_max_ttl_year::integer THEN
  98 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  99 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  100 + deleted := deleted + 1;
  101 + END IF;
  102 + END LOOP;
  103 + ELSE
  104 + IF partition_type = 'MONTHS' THEN
  105 + IF partition_by_max_ttl_month IS NULL THEN
  106 + RAISE NOTICE 'Failed to remove months partitions by max ttl date due to partition_by_max_ttl_month is null!';
  107 + ELSE
  108 + FOR partition IN SELECT tablename
  109 + FROM pg_tables
  110 + WHERE schemaname = 'public'
  111 + AND tablename like 'ts_kv_' || '%'
  112 + AND tablename != 'ts_kv_latest'
  113 + AND tablename != 'ts_kv_dictionary'
  114 + AND tablename != 'ts_kv_indefinite'
  115 + AND tablename != partition_by_max_ttl_date
  116 + LOOP
  117 + partition_year := SPLIT_PART(partition, '_', 3)::integer;
  118 + IF partition_year > partition_by_max_ttl_year::integer THEN
  119 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  120 + CONTINUE;
97 121 ELSE
98   - IF partition_day IS NOT NULL THEN
99   - IF SPLIT_PART(partition, '_', 5)::integer < partition_day::integer THEN
100   - partition_to_delete := partition;
  122 + IF partition_year < partition_by_max_ttl_year::integer THEN
  123 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  124 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  125 + deleted := deleted + 1;
  126 + ELSE
  127 + partition_month := SPLIT_PART(partition, '_', 4)::integer;
  128 + IF partition_year = partition_by_max_ttl_year::integer THEN
  129 + IF partition_month >= partition_by_max_ttl_month::integer THEN
  130 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  131 + CONTINUE;
  132 + ELSE
  133 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  134 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  135 + deleted := deleted + 1;
  136 + END IF;
101 137 END IF;
102 138 END IF;
103 139 END IF;
  140 + END LOOP;
  141 + END IF;
  142 + ELSE
  143 + IF partition_type = 'DAYS' THEN
  144 + IF partition_by_max_ttl_month IS NULL THEN
  145 + RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_month is null!';
  146 + ELSE
  147 + IF partition_by_max_ttl_day IS NULL THEN
  148 + RAISE NOTICE 'Failed to remove days partitions by max ttl date due to partition_by_max_ttl_day is null!';
  149 + ELSE
  150 + FOR partition IN SELECT tablename
  151 + FROM pg_tables
  152 + WHERE schemaname = 'public'
  153 + AND tablename like 'ts_kv_' || '%'
  154 + AND tablename != 'ts_kv_latest'
  155 + AND tablename != 'ts_kv_dictionary'
  156 + AND tablename != 'ts_kv_indefinite'
  157 + AND tablename != partition_by_max_ttl_date
  158 + LOOP
  159 + partition_year := SPLIT_PART(partition, '_', 3)::integer;
  160 + IF partition_year > partition_by_max_ttl_year::integer THEN
  161 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  162 + CONTINUE;
  163 + ELSE
  164 + IF partition_year < partition_by_max_ttl_year::integer THEN
  165 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  166 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  167 + deleted := deleted + 1;
  168 + ELSE
  169 + partition_month := SPLIT_PART(partition, '_', 4)::integer;
  170 + IF partition_month > partition_by_max_ttl_month::integer THEN
  171 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  172 + CONTINUE;
  173 + ELSE
  174 + IF partition_month < partition_by_max_ttl_month::integer THEN
  175 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  176 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  177 + deleted := deleted + 1;
  178 + ELSE
  179 + partition_day := SPLIT_PART(partition, '_', 5)::integer;
  180 + IF partition_day >= partition_by_max_ttl_day::integer THEN
  181 + RAISE NOTICE 'Skip iteration! Partition: % is valid!', partition;
  182 + CONTINUE;
  183 + ELSE
  184 + IF partition_day < partition_by_max_ttl_day::integer THEN
  185 + RAISE NOTICE 'Partition to delete by max ttl: %', partition;
  186 + EXECUTE format('DROP TABLE IF EXISTS %I', partition);
  187 + deleted := deleted + 1;
  188 + END IF;
  189 + END IF;
  190 + END IF;
  191 + END IF;
  192 + END IF;
  193 + END IF;
  194 + END LOOP;
104 195 END IF;
105 196 END IF;
106 197 END IF;
107   - IF partition_to_delete IS NOT NULL THEN
108   - RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete;
109   - EXECUTE format('DROP TABLE IF EXISTS %I', partition_to_delete);
110   - partition_to_delete := NULL;
111   - deleted := deleted + 1;
112   - END IF;
113 198 END IF;
114   - END LOOP;
  199 + END IF;
  200 + END IF;
115 201 END IF;
116 202 END IF;
117 203 END
... ... @@ -127,8 +213,6 @@ BEGIN
127 213 partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM');
128 214 WHEN partition_type = 'YEARS' THEN
129 215 partition := 'ts_kv_' || to_char(date, 'yyyy');
130   - WHEN partition_type = 'INDEFINITE' THEN
131   - partition := NULL;
132 216 ELSE
133 217 partition := NULL;
134 218 END CASE;
... ...
... ... @@ -2378,7 +2378,7 @@ public class RestClient implements ClientHttpRequestInterceptor, Closeable {
2378 2378
2379 2379 public void setUserCredentialsEnabled(UserId userId, boolean userCredentialsEnabled) {
2380 2380 restTemplate.postForLocation(
2381   - baseURL + "/api/user/{userId}/userCredentialsEnabled?serCredentialsEnabled={serCredentialsEnabled}",
  2381 + baseURL + "/api/user/{userId}/userCredentialsEnabled?userCredentialsEnabled={userCredentialsEnabled}",
2382 2382 null,
2383 2383 userId.getId(),
2384 2384 userCredentialsEnabled);
... ...
... ... @@ -2369,7 +2369,7 @@
2369 2369 "delete-resource-text": "Be careful, after the confirmation the resource will become unrecoverable.",
2370 2370 "delete-resource-title": "Are you sure you want to delete the resource '{{resourceTitle}}'?",
2371 2371 "delete-resources-action-title": "Delete { count, plural, 1 {1 resource} other {# resources} }",
2372   - "delete-resources-text": "Be careful, after the confirmation all selected resources will be removed.",
  2372 + "delete-resources-text": "Please note that the selected resources, even if they are used in device profiles, will be deleted.",
2373 2373 "delete-resources-title": "Are you sure you want to delete { count, plural, 1 {1 resource} other {# resources} }?",
2374 2374 "download": "Download resource",
2375 2375 "drop-file": "Drop a resource file or click to select a file to upload.",
... ...