Commit 65fb9a91ec5901e0c06999eb35367182cff32f88

Authored by Igor Kulikov
2 parents 2d7d83f0 bd40edfd

Merge with develop/2.5.1

... ... @@ -16,50 +16,85 @@
16 16
17 17 -- call create_partition_ts_kv_table();
18 18
19   -CREATE OR REPLACE PROCEDURE create_partition_ts_kv_table() LANGUAGE plpgsql AS $$
  19 +CREATE OR REPLACE PROCEDURE create_partition_ts_kv_table()
  20 + LANGUAGE plpgsql AS
  21 +$$
20 22
21 23 BEGIN
22   - ALTER TABLE ts_kv
23   - RENAME TO ts_kv_old;
24   - ALTER TABLE ts_kv_old
25   - RENAME CONSTRAINT ts_kv_pkey TO ts_kv_pkey_old;
26   - CREATE TABLE IF NOT EXISTS ts_kv
27   - (
28   - LIKE ts_kv_old
29   - )
30   - PARTITION BY RANGE (ts);
31   - ALTER TABLE ts_kv
32   - DROP COLUMN entity_type;
33   - ALTER TABLE ts_kv
34   - ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid;
35   - ALTER TABLE ts_kv
36   - ALTER COLUMN key TYPE integer USING key::integer;
37   - ALTER TABLE ts_kv
38   - ADD CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_id, key, ts);
  24 + ALTER TABLE ts_kv
  25 + DROP CONSTRAINT IF EXISTS ts_kv_unq_key;
  26 + ALTER TABLE ts_kv
  27 + DROP CONSTRAINT IF EXISTS ts_kv_pkey;
  28 + ALTER TABLE ts_kv
  29 + ADD CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_type, entity_id, key, ts);
  30 + ALTER TABLE ts_kv
  31 + RENAME TO ts_kv_old;
  32 + ALTER TABLE ts_kv_old
  33 + RENAME CONSTRAINT ts_kv_pkey TO ts_kv_pkey_old;
  34 + CREATE TABLE IF NOT EXISTS ts_kv
  35 + (
  36 + LIKE ts_kv_old
  37 + )
  38 + PARTITION BY RANGE (ts);
  39 + ALTER TABLE ts_kv
  40 + DROP COLUMN entity_type;
  41 + ALTER TABLE ts_kv
  42 + ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid;
  43 + ALTER TABLE ts_kv
  44 + ALTER COLUMN key TYPE integer USING key::integer;
  45 + ALTER TABLE ts_kv
  46 + ADD CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_id, key, ts);
  47 + CREATE TABLE IF NOT EXISTS ts_kv_indefinite PARTITION OF ts_kv DEFAULT;
39 48 END;
40 49 $$;
41 50
42 51 -- call create_new_ts_kv_latest_table();
43 52
44   -CREATE OR REPLACE PROCEDURE create_new_ts_kv_latest_table() LANGUAGE plpgsql AS $$
  53 +CREATE OR REPLACE PROCEDURE create_new_ts_kv_latest_table()
  54 + LANGUAGE plpgsql AS
  55 +$$
45 56
46 57 BEGIN
47   - ALTER TABLE ts_kv_latest
48   - RENAME TO ts_kv_latest_old;
49   - ALTER TABLE ts_kv_latest_old
50   - RENAME CONSTRAINT ts_kv_latest_pkey TO ts_kv_latest_pkey_old;
51   - CREATE TABLE IF NOT EXISTS ts_kv_latest
52   - (
53   - LIKE ts_kv_latest_old
54   - );
55   - ALTER TABLE ts_kv_latest
56   - DROP COLUMN entity_type;
57   - ALTER TABLE ts_kv_latest
58   - ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid;
59   - ALTER TABLE ts_kv_latest
60   - ALTER COLUMN key TYPE integer USING key::integer;
61   - ALTER TABLE ts_kv_latest
62   - ADD CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key);
  58 + IF NOT EXISTS(SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = 'ts_kv_latest_old') THEN
  59 + ALTER TABLE ts_kv_latest
  60 + DROP CONSTRAINT IF EXISTS ts_kv_latest_unq_key;
  61 + ALTER TABLE ts_kv_latest
  62 + DROP CONSTRAINT IF EXISTS ts_kv_latest_pkey;
  63 + ALTER TABLE ts_kv_latest
  64 + ADD CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_type, entity_id, key);
  65 + ALTER TABLE ts_kv_latest
  66 + RENAME TO ts_kv_latest_old;
  67 + ALTER TABLE ts_kv_latest_old
  68 + RENAME CONSTRAINT ts_kv_latest_pkey TO ts_kv_latest_pkey_old;
  69 + CREATE TABLE IF NOT EXISTS ts_kv_latest
  70 + (
  71 + LIKE ts_kv_latest_old
  72 + );
  73 + ALTER TABLE ts_kv_latest
  74 + DROP COLUMN entity_type;
  75 + ALTER TABLE ts_kv_latest
  76 + ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid;
  77 + ALTER TABLE ts_kv_latest
  78 + ALTER COLUMN key TYPE integer USING key::integer;
  79 + ALTER TABLE ts_kv_latest
  80 + ADD CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key);
  81 + ELSE
  82 + RAISE NOTICE 'ts_kv_latest_old table already exists!';
  83 + IF NOT EXISTS(SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = 'ts_kv_latest') THEN
  84 + CREATE TABLE IF NOT EXISTS ts_kv_latest
  85 + (
  86 + entity_id uuid NOT NULL,
  87 + key int NOT NULL,
  88 + ts bigint NOT NULL,
  89 + bool_v boolean,
  90 + str_v varchar(10000000),
  91 + long_v bigint,
  92 + dbl_v double precision,
  93 + json_v json,
  94 + CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
  95 + );
  96 + END IF;
  97 + END IF;
63 98 END;
64 99 $$;
65 100
... ... @@ -93,8 +128,9 @@ BEGIN
93 128 RETURN QUERY SELECT SUBSTRING(year_date.year, 1, 4) AS partition_date,
94 129 (extract(epoch from (year_date.year)::timestamp) * 1000)::bigint AS from_ts,
95 130 (extract(epoch from (year_date.year::date + INTERVAL '1 YEAR')::timestamp) *
96   - 1000)::bigint AS to_ts
97   - FROM (SELECT DISTINCT TO_CHAR(TO_TIMESTAMP(ts / 1000), 'YYYY_01_01') AS year FROM ts_kv_old) AS year_date;
  131 + 1000)::bigint AS to_ts
  132 + FROM (SELECT DISTINCT TO_CHAR(TO_TIMESTAMP(ts / 1000), 'YYYY_01_01') AS year
  133 + FROM ts_kv_old) AS year_date;
98 134 ELSE
99 135 RAISE EXCEPTION 'Failed to parse partitioning property: % !', partition_type;
100 136 END CASE;
... ... @@ -103,13 +139,16 @@ $$ LANGUAGE plpgsql;
103 139
104 140 -- call create_partitions();
105 141
106   -CREATE OR REPLACE PROCEDURE create_partitions(IN partition_type varchar) LANGUAGE plpgsql AS $$
  142 +CREATE OR REPLACE PROCEDURE create_partitions(IN partition_type varchar)
  143 + LANGUAGE plpgsql AS
  144 +$$
107 145
108 146 DECLARE
109 147 partition_date varchar;
110 148 from_ts bigint;
111 149 to_ts bigint;
112   - partitions_cursor CURSOR FOR SELECT * FROM get_partitions_data(partition_type);
  150 + partitions_cursor CURSOR FOR SELECT *
  151 + FROM get_partitions_data(partition_type);
113 152 BEGIN
114 153 OPEN partitions_cursor;
115 154 LOOP
... ... @@ -127,21 +166,25 @@ $$;
127 166
128 167 -- call create_ts_kv_dictionary_table();
129 168
130   -CREATE OR REPLACE PROCEDURE create_ts_kv_dictionary_table() LANGUAGE plpgsql AS $$
  169 +CREATE OR REPLACE PROCEDURE create_ts_kv_dictionary_table()
  170 + LANGUAGE plpgsql AS
  171 +$$
131 172
132 173 BEGIN
133   - CREATE TABLE IF NOT EXISTS ts_kv_dictionary
134   - (
135   - key varchar(255) NOT NULL,
136   - key_id serial UNIQUE,
137   - CONSTRAINT ts_key_id_pkey PRIMARY KEY (key)
138   - );
  174 + CREATE TABLE IF NOT EXISTS ts_kv_dictionary
  175 + (
  176 + key varchar(255) NOT NULL,
  177 + key_id serial UNIQUE,
  178 + CONSTRAINT ts_key_id_pkey PRIMARY KEY (key)
  179 + );
139 180 END;
140 181 $$;
141 182
142 183 -- call insert_into_dictionary();
143 184
144   -CREATE OR REPLACE PROCEDURE insert_into_dictionary() LANGUAGE plpgsql AS $$
  185 +CREATE OR REPLACE PROCEDURE insert_into_dictionary()
  186 + LANGUAGE plpgsql AS
  187 +$$
145 188
146 189 DECLARE
147 190 insert_record RECORD;
... ... @@ -172,9 +215,11 @@ BEGIN
172 215 END;
173 216 $$ LANGUAGE plpgsql;
174 217
175   -CREATE OR REPLACE PROCEDURE insert_into_ts_kv(IN path_to_file varchar) LANGUAGE plpgsql AS $$
  218 +CREATE OR REPLACE PROCEDURE insert_into_ts_kv(IN path_to_file varchar)
  219 + LANGUAGE plpgsql AS
  220 +$$
176 221 BEGIN
177   - EXECUTE format ('COPY (SELECT to_uuid(entity_id) AS entity_id,
  222 + EXECUTE format('COPY (SELECT to_uuid(entity_id) AS entity_id,
178 223 ts_kv_records.key AS key,
179 224 ts_kv_records.ts AS ts,
180 225 ts_kv_records.bool_v AS bool_v,
... ... @@ -189,16 +234,19 @@ BEGIN
189 234 long_v,
190 235 dbl_v
191 236 FROM ts_kv_old
192   - INNER JOIN ts_kv_dictionary ON (ts_kv_old.key = ts_kv_dictionary.key)) AS ts_kv_records) TO %L;', path_to_file);
193   - EXECUTE format ('COPY ts_kv FROM %L', path_to_file);
  237 + INNER JOIN ts_kv_dictionary ON (ts_kv_old.key = ts_kv_dictionary.key)) AS ts_kv_records) TO %L;',
  238 + path_to_file);
  239 + EXECUTE format('COPY ts_kv FROM %L', path_to_file);
194 240 END
195 241 $$;
196 242
197 243 -- call insert_into_ts_kv_latest();
198 244
199   -CREATE OR REPLACE PROCEDURE insert_into_ts_kv_latest(IN path_to_file varchar) LANGUAGE plpgsql AS $$
  245 +CREATE OR REPLACE PROCEDURE insert_into_ts_kv_latest(IN path_to_file varchar)
  246 + LANGUAGE plpgsql AS
  247 +$$
200 248 BEGIN
201   - EXECUTE format ('COPY (SELECT to_uuid(entity_id) AS entity_id,
  249 + EXECUTE format('COPY (SELECT to_uuid(entity_id) AS entity_id,
202 250 ts_kv_latest_records.key AS key,
203 251 ts_kv_latest_records.ts AS ts,
204 252 ts_kv_latest_records.bool_v AS bool_v,
... ... @@ -213,9 +261,99 @@ BEGIN
213 261 long_v,
214 262 dbl_v
215 263 FROM ts_kv_latest_old
216   - INNER JOIN ts_kv_dictionary ON (ts_kv_latest_old.key = ts_kv_dictionary.key)) AS ts_kv_latest_records) TO %L;', path_to_file);
217   - EXECUTE format ('COPY ts_kv_latest FROM %L', path_to_file);
  264 + INNER JOIN ts_kv_dictionary ON (ts_kv_latest_old.key = ts_kv_dictionary.key)) AS ts_kv_latest_records) TO %L;',
  265 + path_to_file);
  266 + EXECUTE format('COPY ts_kv_latest FROM %L', path_to_file);
218 267 END;
219 268 $$;
220 269
  270 +-- call insert_into_ts_kv_cursor();
  271 +
  272 +CREATE OR REPLACE PROCEDURE insert_into_ts_kv_cursor()
  273 + LANGUAGE plpgsql AS
  274 +$$
  275 +DECLARE
  276 + insert_size CONSTANT integer := 10000;
  277 + insert_counter integer DEFAULT 0;
  278 + insert_record RECORD;
  279 + insert_cursor CURSOR FOR SELECT to_uuid(entity_id) AS entity_id,
  280 + ts_kv_records.key AS key,
  281 + ts_kv_records.ts AS ts,
  282 + ts_kv_records.bool_v AS bool_v,
  283 + ts_kv_records.str_v AS str_v,
  284 + ts_kv_records.long_v AS long_v,
  285 + ts_kv_records.dbl_v AS dbl_v
  286 + FROM (SELECT entity_id AS entity_id,
  287 + key_id AS key,
  288 + ts,
  289 + bool_v,
  290 + str_v,
  291 + long_v,
  292 + dbl_v
  293 + FROM ts_kv_old
  294 + INNER JOIN ts_kv_dictionary ON (ts_kv_old.key = ts_kv_dictionary.key)) AS ts_kv_records;
  295 +BEGIN
  296 + OPEN insert_cursor;
  297 + LOOP
  298 + insert_counter := insert_counter + 1;
  299 + FETCH insert_cursor INTO insert_record;
  300 + IF NOT FOUND THEN
  301 + RAISE NOTICE '% records have been inserted into the partitioned ts_kv!',insert_counter - 1;
  302 + EXIT;
  303 + END IF;
  304 + INSERT INTO ts_kv(entity_id, key, ts, bool_v, str_v, long_v, dbl_v)
  305 + VALUES (insert_record.entity_id, insert_record.key, insert_record.ts, insert_record.bool_v, insert_record.str_v,
  306 + insert_record.long_v, insert_record.dbl_v);
  307 + IF MOD(insert_counter, insert_size) = 0 THEN
  308 + RAISE NOTICE '% records have been inserted into the partitioned ts_kv!',insert_counter;
  309 + END IF;
  310 + END LOOP;
  311 + CLOSE insert_cursor;
  312 +END;
  313 +$$;
  314 +
  315 +-- call insert_into_ts_kv_latest_cursor();
  316 +
  317 +CREATE OR REPLACE PROCEDURE insert_into_ts_kv_latest_cursor()
  318 + LANGUAGE plpgsql AS
  319 +$$
  320 +DECLARE
  321 + insert_size CONSTANT integer := 10000;
  322 + insert_counter integer DEFAULT 0;
  323 + insert_record RECORD;
  324 + insert_cursor CURSOR FOR SELECT to_uuid(entity_id) AS entity_id,
  325 + ts_kv_latest_records.key AS key,
  326 + ts_kv_latest_records.ts AS ts,
  327 + ts_kv_latest_records.bool_v AS bool_v,
  328 + ts_kv_latest_records.str_v AS str_v,
  329 + ts_kv_latest_records.long_v AS long_v,
  330 + ts_kv_latest_records.dbl_v AS dbl_v
  331 + FROM (SELECT entity_id AS entity_id,
  332 + key_id AS key,
  333 + ts,
  334 + bool_v,
  335 + str_v,
  336 + long_v,
  337 + dbl_v
  338 + FROM ts_kv_latest_old
  339 + INNER JOIN ts_kv_dictionary ON (ts_kv_latest_old.key = ts_kv_dictionary.key)) AS ts_kv_latest_records;
  340 +BEGIN
  341 + OPEN insert_cursor;
  342 + LOOP
  343 + insert_counter := insert_counter + 1;
  344 + FETCH insert_cursor INTO insert_record;
  345 + IF NOT FOUND THEN
  346 + RAISE NOTICE '% records have been inserted into the ts_kv_latest!',insert_counter - 1;
  347 + EXIT;
  348 + END IF;
  349 + INSERT INTO ts_kv_latest(entity_id, key, ts, bool_v, str_v, long_v, dbl_v)
  350 + VALUES (insert_record.entity_id, insert_record.key, insert_record.ts, insert_record.bool_v, insert_record.str_v,
  351 + insert_record.long_v, insert_record.dbl_v);
  352 + IF MOD(insert_counter, insert_size) = 0 THEN
  353 + RAISE NOTICE '% records have been inserted into the ts_kv_latest!',insert_counter;
  354 + END IF;
  355 + END LOOP;
  356 + CLOSE insert_cursor;
  357 +END;
  358 +$$;
221 359
... ...
... ... @@ -162,3 +162,47 @@ BEGIN
162 162 CLOSE insert_cursor;
163 163 END;
164 164 $$;
  165 +
  166 +-- call insert_into_ts_kv_cursor();
  167 +
  168 +CREATE OR REPLACE PROCEDURE insert_into_ts_kv_cursor() LANGUAGE plpgsql AS $$
  169 +
  170 +DECLARE
  171 + insert_size CONSTANT integer := 10000;
  172 + insert_counter integer DEFAULT 0;
  173 + insert_record RECORD;
  174 + insert_cursor CURSOR FOR SELECT to_uuid(entity_id) AS entity_id,
  175 + new_ts_kv_records.key AS key,
  176 + new_ts_kv_records.ts AS ts,
  177 + new_ts_kv_records.bool_v AS bool_v,
  178 + new_ts_kv_records.str_v AS str_v,
  179 + new_ts_kv_records.long_v AS long_v,
  180 + new_ts_kv_records.dbl_v AS dbl_v
  181 + FROM (SELECT entity_id AS entity_id,
  182 + key_id AS key,
  183 + ts,
  184 + bool_v,
  185 + str_v,
  186 + long_v,
  187 + dbl_v
  188 + FROM tenant_ts_kv_old
  189 + INNER JOIN ts_kv_dictionary ON (tenant_ts_kv_old.key = ts_kv_dictionary.key)) AS new_ts_kv_records;
  190 +BEGIN
  191 + OPEN insert_cursor;
  192 + LOOP
  193 + insert_counter := insert_counter + 1;
  194 + FETCH insert_cursor INTO insert_record;
  195 + IF NOT FOUND THEN
  196 + RAISE NOTICE '% records have been inserted into the new ts_kv table!',insert_counter - 1;
  197 + EXIT;
  198 + END IF;
  199 + INSERT INTO ts_kv(entity_id, key, ts, bool_v, str_v, long_v, dbl_v)
  200 + VALUES (insert_record.entity_id, insert_record.key, insert_record.ts, insert_record.bool_v, insert_record.str_v,
  201 + insert_record.long_v, insert_record.dbl_v);
  202 + IF MOD(insert_counter, insert_size) = 0 THEN
  203 + RAISE NOTICE '% records have been inserted into the new ts_kv table!',insert_counter;
  204 + END IF;
  205 + END LOOP;
  206 + CLOSE insert_cursor;
  207 +END;
  208 +$$;
\ No newline at end of file
... ...
... ... @@ -149,12 +149,13 @@ public class ThingsboardInstallService {
149 149 databaseTsUpgradeService.upgradeDatabase("2.4.3");
150 150 }
151 151 databaseEntitiesUpgradeService.upgradeDatabase("2.4.3");
152   -
153   - log.info("Updating system data...");
154   - systemDataLoaderService.updateSystemWidgets();
155   - break;
156 152 case "2.5.0":
157   - log.info("Upgrading ThingsBoard from version 2.5 to 3.0 ...");
  153 + log.info("Upgrading ThingsBoard from version 2.5.0 to 2.5.1 ...");
  154 + if (databaseTsUpgradeService != null) {
  155 + databaseTsUpgradeService.upgradeDatabase("2.5.0");
  156 + }
  157 + case "2.5.1":
  158 + log.info("Upgrading ThingsBoard from version 2.5.1 to 3.0.0 ...");
158 159 log.info("Updating system data...");
159 160 systemDataLoaderService.updateSystemWidgets();
160 161 break;
... ...
... ... @@ -37,8 +37,6 @@ public class PsqlTsDatabaseSchemaService extends SqlAbstractDatabaseSchemaServic
37 37 @Override
38 38 public void createDatabaseSchema() throws Exception {
39 39 super.createDatabaseSchema();
40   - if (partitionType.equals("INDEFINITE")) {
41   - executeQuery("CREATE TABLE ts_kv_indefinite PARTITION OF ts_kv DEFAULT;");
42   - }
  40 + executeQuery("CREATE TABLE IF NOT EXISTS ts_kv_indefinite PARTITION OF ts_kv DEFAULT;");
43 41 }
44 42 }
\ No newline at end of file
... ...
... ... @@ -57,11 +57,15 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
57 57 private static final String INSERT_INTO_DICTIONARY = "insert_into_dictionary()";
58 58 private static final String INSERT_INTO_TS_KV = "insert_into_ts_kv(IN path_to_file varchar)";
59 59 private static final String INSERT_INTO_TS_KV_LATEST = "insert_into_ts_kv_latest(IN path_to_file varchar)";
  60 + private static final String INSERT_INTO_TS_KV_CURSOR = "insert_into_ts_kv_cursor()";
  61 + private static final String INSERT_INTO_TS_KV_LATEST_CURSOR = "insert_into_ts_kv_latest_cursor()";
60 62
61 63 private static final String CALL_CREATE_PARTITION_TS_KV_TABLE = CALL_REGEX + CREATE_PARTITION_TS_KV_TABLE;
62 64 private static final String CALL_CREATE_NEW_TS_KV_LATEST_TABLE = CALL_REGEX + CREATE_NEW_TS_KV_LATEST_TABLE;
63 65 private static final String CALL_CREATE_TS_KV_DICTIONARY_TABLE = CALL_REGEX + CREATE_TS_KV_DICTIONARY_TABLE;
64 66 private static final String CALL_INSERT_INTO_DICTIONARY = CALL_REGEX + INSERT_INTO_DICTIONARY;
  67 + private static final String CALL_INSERT_INTO_TS_KV_CURSOR = CALL_REGEX + INSERT_INTO_TS_KV_CURSOR;
  68 + private static final String CALL_INSERT_INTO_TS_KV_LATEST_CURSOR = CALL_REGEX + INSERT_INTO_TS_KV_LATEST_CURSOR;
65 69
66 70 private static final String DROP_TABLE_TS_KV_OLD = DROP_TABLE + TS_KV_OLD;
67 71 private static final String DROP_TABLE_TS_KV_LATEST_OLD = DROP_TABLE + TS_KV_LATEST_OLD;
... ... @@ -73,6 +77,8 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
73 77 private static final String DROP_PROCEDURE_INSERT_INTO_DICTIONARY = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_DICTIONARY;
74 78 private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV;
75 79 private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_LATEST;
  80 + private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_CURSOR = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_CURSOR;
  81 + private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST_CURSOR = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_LATEST_CURSOR;
76 82 private static final String DROP_FUNCTION_GET_PARTITION_DATA = "DROP FUNCTION IF EXISTS get_partitions_data;";
77 83
78 84 @Override
... ... @@ -93,14 +99,12 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
93 99 executeQuery(conn, CALL_CREATE_PARTITION_TS_KV_TABLE);
94 100 if (!partitionType.equals("INDEFINITE")) {
95 101 executeQuery(conn, "call create_partitions('" + partitionType + "')");
96   - } else {
97   - executeQuery(conn, "CREATE TABLE IF NOT EXISTS ts_kv_indefinite PARTITION OF ts_kv DEFAULT;");
98 102 }
99 103 executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE);
100 104 executeQuery(conn, CALL_INSERT_INTO_DICTIONARY);
101 105
102   - Path pathToTempTsKvFile;
103   - Path pathToTempTsKvLatestFile;
  106 + Path pathToTempTsKvFile = null;
  107 + Path pathToTempTsKvLatestFile = null;
104 108 if (SystemUtils.IS_OS_WINDOWS) {
105 109 log.info("Lookup for environment variable: {} ...", THINGSBOARD_WINDOWS_UPGRADE_DIR);
106 110 Path pathToDir;
... ... @@ -118,39 +122,41 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
118 122 Path tsKvLatestFile = Files.createTempFile(pathToDir, "ts_kv_latest", ".sql");
119 123 pathToTempTsKvFile = tsKvFile.toAbsolutePath();
120 124 pathToTempTsKvLatestFile = tsKvLatestFile.toAbsolutePath();
121   - executeQuery(conn, "call insert_into_ts_kv('" + pathToTempTsKvFile + "')");
122   - executeQuery(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE);
123   - executeQuery(conn, "call insert_into_ts_kv_latest('" + pathToTempTsKvLatestFile + "');");
  125 + try {
  126 + copyTimeseries(conn, pathToTempTsKvFile, pathToTempTsKvLatestFile);
  127 + } catch (Exception e) {
  128 + insertTimeseries(conn);
  129 + }
124 130 } catch (IOException | SecurityException e) {
125   - throw new RuntimeException("Failed to create time-series upgrade files due to: " + e);
  131 + log.warn("Failed to create time-series upgrade files due to: {}", e.getMessage());
  132 + insertTimeseries(conn);
126 133 }
127 134 } else {
128   - Path tempDirPath = Files.createTempDirectory("ts_kv");
129   - File tempDirAsFile = tempDirPath.toFile();
130   - boolean writable = tempDirAsFile.setWritable(true, false);
131   - boolean readable = tempDirAsFile.setReadable(true, false);
132   - boolean executable = tempDirAsFile.setExecutable(true, false);
133   - if (writable && readable && executable) {
  135 + try {
  136 + Path tempDirPath = Files.createTempDirectory("ts_kv");
  137 + File tempDirAsFile = tempDirPath.toFile();
  138 + boolean writable = tempDirAsFile.setWritable(true, false);
  139 + boolean readable = tempDirAsFile.setReadable(true, false);
  140 + boolean executable = tempDirAsFile.setExecutable(true, false);
134 141 pathToTempTsKvFile = tempDirPath.resolve(TS_KV_SQL).toAbsolutePath();
135 142 pathToTempTsKvLatestFile = tempDirPath.resolve(TS_KV_LATEST_SQL).toAbsolutePath();
136   - executeQuery(conn, "call insert_into_ts_kv('" + pathToTempTsKvFile + "')");
137   - executeQuery(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE);
138   - executeQuery(conn, "call insert_into_ts_kv_latest('" + pathToTempTsKvLatestFile + "');");
139   - } else {
140   - throw new RuntimeException("Failed to grant write permissions for the: " + tempDirPath + "folder!");
141   - }
142   - }
143   - if (pathToTempTsKvFile.toFile().exists() && pathToTempTsKvLatestFile.toFile().exists()) {
144   - boolean deleteTsKvFile = pathToTempTsKvFile.toFile().delete();
145   - if (deleteTsKvFile) {
146   - log.info("Successfully deleted the temp file for ts_kv table upgrade!");
147   - }
148   - boolean deleteTsKvLatestFile = pathToTempTsKvLatestFile.toFile().delete();
149   - if (deleteTsKvLatestFile) {
150   - log.info("Successfully deleted the temp file for ts_kv_latest table upgrade!");
  143 + try {
  144 + if (writable && readable && executable) {
  145 + copyTimeseries(conn, pathToTempTsKvFile, pathToTempTsKvLatestFile);
  146 + } else {
  147 + throw new RuntimeException("Failed to grant write permissions for the: " + tempDirPath + "folder!");
  148 + }
  149 + } catch (Exception e) {
  150 + insertTimeseries(conn);
  151 + }
  152 + } catch (IOException | SecurityException e) {
  153 + log.warn("Failed to create time-series upgrade files due to: {}", e.getMessage());
  154 + insertTimeseries(conn);
151 155 }
152 156 }
153 157
  158 + removeUpgradeFiles(pathToTempTsKvFile, pathToTempTsKvLatestFile);
  159 +
154 160 executeQuery(conn, DROP_TABLE_TS_KV_OLD);
155 161 executeQuery(conn, DROP_TABLE_TS_KV_LATEST_OLD);
156 162
... ... @@ -161,6 +167,8 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
161 167 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV);
162 168 executeQuery(conn, DROP_PROCEDURE_CREATE_NEW_TS_KV_LATEST_TABLE);
163 169 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST);
  170 + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_CURSOR);
  171 + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST_CURSOR);
164 172 executeQuery(conn, DROP_FUNCTION_GET_PARTITION_DATA);
165 173
166 174 executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN IF NOT EXISTS json_v json;");
... ... @@ -181,11 +189,46 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
181 189 }
182 190 }
183 191 break;
  192 + case "2.5.0":
  193 + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
  194 + executeQuery(conn, "CREATE TABLE IF NOT EXISTS ts_kv_indefinite PARTITION OF ts_kv DEFAULT;");
  195 + executeQuery(conn, "UPDATE tb_schema_settings SET schema_version = 2005001");
  196 + }
  197 + break;
184 198 default:
185 199 throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion);
186 200 }
187 201 }
188 202
  203 + private void removeUpgradeFiles(Path pathToTempTsKvFile, Path pathToTempTsKvLatestFile) {
  204 + if (pathToTempTsKvFile != null && pathToTempTsKvFile.toFile().exists()) {
  205 + boolean deleteTsKvFile = pathToTempTsKvFile.toFile().delete();
  206 + if (deleteTsKvFile) {
  207 + log.info("Successfully deleted the temp file for ts_kv table upgrade!");
  208 + }
  209 + }
  210 + if (pathToTempTsKvLatestFile != null && pathToTempTsKvLatestFile.toFile().exists()) {
  211 + boolean deleteTsKvLatestFile = pathToTempTsKvLatestFile.toFile().delete();
  212 + if (deleteTsKvLatestFile) {
  213 + log.info("Successfully deleted the temp file for ts_kv_latest table upgrade!");
  214 + }
  215 + }
  216 + }
  217 +
  218 + private void copyTimeseries(Connection conn, Path pathToTempTsKvFile, Path pathToTempTsKvLatestFile) {
  219 + executeQuery(conn, "call insert_into_ts_kv('" + pathToTempTsKvFile + "')");
  220 + executeQuery(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE);
  221 + executeQuery(conn, "call insert_into_ts_kv_latest('" + pathToTempTsKvLatestFile + "')");
  222 + }
  223 +
  224 + private void insertTimeseries(Connection conn) {
  225 + log.warn("Upgrade script failed using the copy to/from files strategy!" +
  226 + " Trying to perfrom the upgrade using Inserts strategy ...");
  227 + executeQuery(conn, CALL_INSERT_INTO_TS_KV_CURSOR);
  228 + executeQuery(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE);
  229 + executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST_CURSOR);
  230 + }
  231 +
189 232 @Override
190 233 protected void loadSql(Connection conn, String fileName) {
191 234 Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", fileName);
... ...
... ... @@ -53,6 +53,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
53 53 private static final String CREATE_TS_KV_DICTIONARY_TABLE = "create_ts_kv_dictionary_table()";
54 54 private static final String INSERT_INTO_DICTIONARY = "insert_into_dictionary()";
55 55 private static final String INSERT_INTO_TS_KV = "insert_into_ts_kv(IN path_to_file varchar)";
  56 + private static final String INSERT_INTO_TS_KV_CURSOR = "insert_into_ts_kv_cursor()";
56 57 private static final String INSERT_INTO_TS_KV_LATEST = "insert_into_ts_kv_latest()";
57 58
58 59 private static final String CALL_CREATE_TS_KV_LATEST_TABLE = CALL_REGEX + CREATE_TS_KV_LATEST_TABLE;
... ... @@ -60,6 +61,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
60 61 private static final String CALL_CREATE_TS_KV_DICTIONARY_TABLE = CALL_REGEX + CREATE_TS_KV_DICTIONARY_TABLE;
61 62 private static final String CALL_INSERT_INTO_DICTIONARY = CALL_REGEX + INSERT_INTO_DICTIONARY;
62 63 private static final String CALL_INSERT_INTO_TS_KV_LATEST = CALL_REGEX + INSERT_INTO_TS_KV_LATEST;
  64 + private static final String CALL_INSERT_INTO_TS_KV_CURSOR = CALL_REGEX + INSERT_INTO_TS_KV_CURSOR;
63 65
64 66 private static final String DROP_OLD_TENANT_TS_KV_TABLE = DROP_TABLE + TENANT_TS_KV_OLD_TABLE;
65 67
... ... @@ -68,6 +70,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
68 70 private static final String DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE = DROP_PROCEDURE_IF_EXISTS + CREATE_TS_KV_DICTIONARY_TABLE;
69 71 private static final String DROP_PROCEDURE_INSERT_INTO_DICTIONARY = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_DICTIONARY;
70 72 private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV;
  73 + private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_CURSOR = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_CURSOR;
71 74 private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_LATEST;
72 75
73 76 @Autowired
... ... @@ -96,7 +99,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
96 99 executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE);
97 100 executeQuery(conn, CALL_INSERT_INTO_DICTIONARY);
98 101
99   - Path pathToTempTsKvFile;
  102 + Path pathToTempTsKvFile = null;
100 103 if (SystemUtils.IS_OS_WINDOWS) {
101 104 Path pathToDir;
102 105 log.info("Lookup for environment variable: {} ...", THINGSBOARD_WINDOWS_UPGRADE_DIR);
... ... @@ -112,31 +115,38 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
112 115 try {
113 116 Path tsKvFile = Files.createTempFile(pathToDir, "ts_kv", ".sql");
114 117 pathToTempTsKvFile = tsKvFile.toAbsolutePath();
115   - executeQuery(conn, "call insert_into_ts_kv('" + pathToTempTsKvFile + "')");
116   - pathToTempTsKvFile.toFile().deleteOnExit();
  118 + try {
  119 + executeQuery(conn, "call insert_into_ts_kv('" + pathToTempTsKvFile + "')");
  120 + } catch (Exception e) {
  121 + insertTimeseries(conn);
  122 + }
117 123 } catch (IOException | SecurityException e) {
118   - throw new RuntimeException("Failed to create time-series upgrade files due to: " + e);
  124 + log.warn("Failed to create time-series upgrade files due to: {}", e.getMessage());
  125 + insertTimeseries(conn);
119 126 }
120 127 } else {
121   - Path tempDirPath = Files.createTempDirectory("ts_kv");
122   - File tempDirAsFile = tempDirPath.toFile();
123   - boolean writable = tempDirAsFile.setWritable(true, false);
124   - boolean readable = tempDirAsFile.setReadable(true, false);
125   - boolean executable = tempDirAsFile.setExecutable(true, false);
126   - if (writable && readable && executable) {
  128 + try {
  129 + Path tempDirPath = Files.createTempDirectory("ts_kv");
  130 + File tempDirAsFile = tempDirPath.toFile();
  131 + boolean writable = tempDirAsFile.setWritable(true, false);
  132 + boolean readable = tempDirAsFile.setReadable(true, false);
  133 + boolean executable = tempDirAsFile.setExecutable(true, false);
127 134 pathToTempTsKvFile = tempDirPath.resolve(TS_KV_SQL).toAbsolutePath();
128   - executeQuery(conn, "call insert_into_ts_kv('" + pathToTempTsKvFile + "')");
129   - } else {
130   - throw new RuntimeException("Failed to grant write permissions for the: " + tempDirPath + "folder!");
131   - }
132   - }
133   -
134   - if (pathToTempTsKvFile.toFile().exists()) {
135   - boolean deleteTsKvFile = pathToTempTsKvFile.toFile().delete();
136   - if (deleteTsKvFile) {
137   - log.info("Successfully deleted the temp file for ts_kv table upgrade!");
  135 + try {
  136 + if (writable && readable && executable) {
  137 + executeQuery(conn, "call insert_into_ts_kv('" + pathToTempTsKvFile + "')");
  138 + } else {
  139 + throw new RuntimeException("Failed to grant write permissions for the: " + tempDirPath + "folder!");
  140 + }
  141 + } catch (Exception e) {
  142 + insertTimeseries(conn);
  143 + }
  144 + } catch (IOException | SecurityException e) {
  145 + log.warn("Failed to create time-series upgrade files due to: {}", e.getMessage());
  146 + insertTimeseries(conn);
138 147 }
139 148 }
  149 + removeUpgradeFile(pathToTempTsKvFile);
140 150
141 151 executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST);
142 152
... ... @@ -147,6 +157,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
147 157 executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE);
148 158 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_DICTIONARY);
149 159 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV);
  160 + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_CURSOR);
150 161 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST);
151 162
152 163 executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN IF NOT EXISTS json_v json;");
... ... @@ -161,11 +172,31 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
161 172 }
162 173 }
163 174 break;
  175 + case "2.5.0":
  176 + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
  177 + executeQuery(conn, "UPDATE tb_schema_settings SET schema_version = 2005001");
  178 + }
  179 + break;
164 180 default:
165 181 throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion);
166 182 }
167 183 }
168 184
  185 + private void insertTimeseries(Connection conn) {
  186 + log.warn("Upgrade script failed using the copy to/from files strategy!" +
  187 + " Trying to perfrom the upgrade using Inserts strategy ...");
  188 + executeQuery(conn, CALL_INSERT_INTO_TS_KV_CURSOR);
  189 + }
  190 +
  191 + private void removeUpgradeFile(Path pathToTempTsKvFile) {
  192 + if (pathToTempTsKvFile != null && pathToTempTsKvFile.toFile().exists()) {
  193 + boolean deleteTsKvFile = pathToTempTsKvFile.toFile().delete();
  194 + if (deleteTsKvFile) {
  195 + log.info("Successfully deleted the temp file for ts_kv table upgrade!");
  196 + }
  197 + }
  198 + }
  199 +
169 200 @Override
170 201 protected void loadSql(Connection conn, String fileName) {
171 202 Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", fileName);
... ...
... ... @@ -90,7 +90,7 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
90 90 }
91 91
92 92 private void savePartitionIfNotExist(long ts) {
93   - if (!tsFormat.equals(SqlTsPartitionDate.INDEFINITE)) {
  93 + if (!tsFormat.equals(SqlTsPartitionDate.INDEFINITE) && ts >= 0) {
94 94 LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
95 95 LocalDateTime localDateTimeStart = tsFormat.trancateTo(time);
96 96 long partitionStartTs = toMills(localDateTimeStart);
... ...
... ... @@ -52,7 +52,7 @@ CREATE TABLE IF NOT EXISTS tb_schema_settings
52 52 CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version)
53 53 );
54 54
55   -INSERT INTO tb_schema_settings (schema_version) VALUES (2005000) ON CONFLICT (schema_version) DO UPDATE SET schema_version = 2005000;
  55 +INSERT INTO tb_schema_settings (schema_version) VALUES (2005001) ON CONFLICT (schema_version) DO UPDATE SET schema_version = 2005001;
56 56
57 57 CREATE OR REPLACE FUNCTION to_uuid(IN entity_id varchar, OUT uuid_id uuid) AS
58 58 $$
... ...
... ... @@ -53,7 +53,7 @@ CREATE TABLE IF NOT EXISTS tb_schema_settings
53 53 CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version)
54 54 );
55 55
56   -INSERT INTO tb_schema_settings (schema_version) VALUES (2005000) ON CONFLICT (schema_version) DO UPDATE SET schema_version = 2005000;
  56 +INSERT INTO tb_schema_settings (schema_version) VALUES (2005001) ON CONFLICT (schema_version) DO UPDATE SET schema_version = 2005001;
57 57
58 58 CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar, IN system_ttl bigint, INOUT deleted bigint)
59 59 LANGUAGE plpgsql AS
... ...