Commit 2e08805e07f1558b5779a30308736307c32ca076

Authored by Dmytro Shvaika
1 parent f8a355fe

fix upgrade on RDS using insert strategy

... ... @@ -16,51 +16,67 @@
16 16
17 17 -- call create_partition_ts_kv_table();
18 18
19   -CREATE OR REPLACE PROCEDURE create_partition_ts_kv_table() LANGUAGE plpgsql AS $$
  19 +CREATE OR REPLACE PROCEDURE create_partition_ts_kv_table()
  20 + LANGUAGE plpgsql AS
  21 +$$
20 22
21 23 BEGIN
22   - ALTER TABLE ts_kv
23   - RENAME TO ts_kv_old;
24   - ALTER TABLE ts_kv_old
25   - RENAME CONSTRAINT ts_kv_pkey TO ts_kv_pkey_old;
26   - CREATE TABLE IF NOT EXISTS ts_kv
27   - (
28   - LIKE ts_kv_old
29   - )
30   - PARTITION BY RANGE (ts);
31   - ALTER TABLE ts_kv
32   - DROP COLUMN entity_type;
33   - ALTER TABLE ts_kv
34   - ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid;
35   - ALTER TABLE ts_kv
36   - ALTER COLUMN key TYPE integer USING key::integer;
37   - ALTER TABLE ts_kv
38   - ADD CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_id, key, ts);
39   - CREATE TABLE IF NOT EXISTS ts_kv_indefinite PARTITION OF ts_kv DEFAULT;
  24 + ALTER TABLE ts_kv
  25 + DROP CONSTRAINT IF EXISTS ts_kv_unq_key;
  26 + ALTER TABLE ts_kv
  27 + DROP CONSTRAINT IF EXISTS ts_kv_pkey;
  28 + ALTER TABLE ts_kv
  29 + ADD CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_type, entity_id, key, ts);
  30 + ALTER TABLE ts_kv
  31 + RENAME TO ts_kv_old;
  32 + ALTER TABLE ts_kv_old
  33 + RENAME CONSTRAINT ts_kv_pkey TO ts_kv_pkey_old;
  34 + CREATE TABLE IF NOT EXISTS ts_kv
  35 + (
  36 + LIKE ts_kv_old
  37 + )
  38 + PARTITION BY RANGE (ts);
  39 + ALTER TABLE ts_kv
  40 + DROP COLUMN entity_type;
  41 + ALTER TABLE ts_kv
  42 + ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid;
  43 + ALTER TABLE ts_kv
  44 + ALTER COLUMN key TYPE integer USING key::integer;
  45 + ALTER TABLE ts_kv
  46 + ADD CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_id, key, ts);
  47 + CREATE TABLE IF NOT EXISTS ts_kv_indefinite PARTITION OF ts_kv DEFAULT;
40 48 END;
41 49 $$;
42 50
43 51 -- call create_new_ts_kv_latest_table();
44 52
45   -CREATE OR REPLACE PROCEDURE create_new_ts_kv_latest_table() LANGUAGE plpgsql AS $$
  53 +CREATE OR REPLACE PROCEDURE create_new_ts_kv_latest_table()
  54 + LANGUAGE plpgsql AS
  55 +$$
46 56
47 57 BEGIN
48 58 IF NOT EXISTS(SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = 'ts_kv_latest_old') THEN
49   - ALTER TABLE ts_kv_latest
  59 + ALTER TABLE ts_kv_latest
  60 + DROP CONSTRAINT IF EXISTS ts_kv_latest_unq_key;
  61 + ALTER TABLE ts_kv_latest
  62 + DROP CONSTRAINT IF EXISTS ts_kv_latest_pkey;
  63 + ALTER TABLE ts_kv_latest
  64 + ADD CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_type, entity_id, key);
  65 + ALTER TABLE ts_kv_latest
50 66 RENAME TO ts_kv_latest_old;
51   - ALTER TABLE ts_kv_latest_old
52   - RENAME CONSTRAINT ts_kv_latest_pkey TO ts_kv_latest_pkey_old;
53   - CREATE TABLE IF NOT EXISTS ts_kv_latest
54   - (
  67 + ALTER TABLE ts_kv_latest_old
  68 + RENAME CONSTRAINT ts_kv_latest_pkey TO ts_kv_latest_pkey_old;
  69 + CREATE TABLE IF NOT EXISTS ts_kv_latest
  70 + (
55 71 LIKE ts_kv_latest_old
56   - );
57   - ALTER TABLE ts_kv_latest
  72 + );
  73 + ALTER TABLE ts_kv_latest
58 74 DROP COLUMN entity_type;
59   - ALTER TABLE ts_kv_latest
  75 + ALTER TABLE ts_kv_latest
60 76 ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid;
61   - ALTER TABLE ts_kv_latest
  77 + ALTER TABLE ts_kv_latest
62 78 ALTER COLUMN key TYPE integer USING key::integer;
63   - ALTER TABLE ts_kv_latest
  79 + ALTER TABLE ts_kv_latest
64 80 ADD CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key);
65 81 ELSE
66 82 RAISE NOTICE 'ts_kv_latest_old table already exists!';
... ... @@ -112,8 +128,9 @@ BEGIN
112 128 RETURN QUERY SELECT SUBSTRING(year_date.year, 1, 4) AS partition_date,
113 129 (extract(epoch from (year_date.year)::timestamp) * 1000)::bigint AS from_ts,
114 130 (extract(epoch from (year_date.year::date + INTERVAL '1 YEAR')::timestamp) *
115   - 1000)::bigint AS to_ts
116   - FROM (SELECT DISTINCT TO_CHAR(TO_TIMESTAMP(ts / 1000), 'YYYY_01_01') AS year FROM ts_kv_old) AS year_date;
  131 + 1000)::bigint AS to_ts
  132 + FROM (SELECT DISTINCT TO_CHAR(TO_TIMESTAMP(ts / 1000), 'YYYY_01_01') AS year
  133 + FROM ts_kv_old) AS year_date;
117 134 ELSE
118 135 RAISE EXCEPTION 'Failed to parse partitioning property: % !', partition_type;
119 136 END CASE;
... ... @@ -122,13 +139,16 @@ $$ LANGUAGE plpgsql;
122 139
123 140 -- call create_partitions();
124 141
125   -CREATE OR REPLACE PROCEDURE create_partitions(IN partition_type varchar) LANGUAGE plpgsql AS $$
  142 +CREATE OR REPLACE PROCEDURE create_partitions(IN partition_type varchar)
  143 + LANGUAGE plpgsql AS
  144 +$$
126 145
127 146 DECLARE
128 147 partition_date varchar;
129 148 from_ts bigint;
130 149 to_ts bigint;
131   - partitions_cursor CURSOR FOR SELECT * FROM get_partitions_data(partition_type);
  150 + partitions_cursor CURSOR FOR SELECT *
  151 + FROM get_partitions_data(partition_type);
132 152 BEGIN
133 153 OPEN partitions_cursor;
134 154 LOOP
... ... @@ -146,21 +166,25 @@ $$;
146 166
147 167 -- call create_ts_kv_dictionary_table();
148 168
149   -CREATE OR REPLACE PROCEDURE create_ts_kv_dictionary_table() LANGUAGE plpgsql AS $$
  169 +CREATE OR REPLACE PROCEDURE create_ts_kv_dictionary_table()
  170 + LANGUAGE plpgsql AS
  171 +$$
150 172
151 173 BEGIN
152   - CREATE TABLE IF NOT EXISTS ts_kv_dictionary
153   - (
154   - key varchar(255) NOT NULL,
155   - key_id serial UNIQUE,
156   - CONSTRAINT ts_key_id_pkey PRIMARY KEY (key)
157   - );
  174 + CREATE TABLE IF NOT EXISTS ts_kv_dictionary
  175 + (
  176 + key varchar(255) NOT NULL,
  177 + key_id serial UNIQUE,
  178 + CONSTRAINT ts_key_id_pkey PRIMARY KEY (key)
  179 + );
158 180 END;
159 181 $$;
160 182
161 183 -- call insert_into_dictionary();
162 184
163   -CREATE OR REPLACE PROCEDURE insert_into_dictionary() LANGUAGE plpgsql AS $$
  185 +CREATE OR REPLACE PROCEDURE insert_into_dictionary()
  186 + LANGUAGE plpgsql AS
  187 +$$
164 188
165 189 DECLARE
166 190 insert_record RECORD;
... ... @@ -191,9 +215,11 @@ BEGIN
191 215 END;
192 216 $$ LANGUAGE plpgsql;
193 217
194   -CREATE OR REPLACE PROCEDURE insert_into_ts_kv(IN path_to_file varchar) LANGUAGE plpgsql AS $$
  218 +CREATE OR REPLACE PROCEDURE insert_into_ts_kv(IN path_to_file varchar)
  219 + LANGUAGE plpgsql AS
  220 +$$
195 221 BEGIN
196   - EXECUTE format ('COPY (SELECT to_uuid(entity_id) AS entity_id,
  222 + EXECUTE format('COPY (SELECT to_uuid(entity_id) AS entity_id,
197 223 ts_kv_records.key AS key,
198 224 ts_kv_records.ts AS ts,
199 225 ts_kv_records.bool_v AS bool_v,
... ... @@ -208,16 +234,19 @@ BEGIN
208 234 long_v,
209 235 dbl_v
210 236 FROM ts_kv_old
211   - INNER JOIN ts_kv_dictionary ON (ts_kv_old.key = ts_kv_dictionary.key)) AS ts_kv_records) TO %L;', path_to_file);
212   - EXECUTE format ('COPY ts_kv FROM %L', path_to_file);
  237 + INNER JOIN ts_kv_dictionary ON (ts_kv_old.key = ts_kv_dictionary.key)) AS ts_kv_records) TO %L;',
  238 + path_to_file);
  239 + EXECUTE format('COPY ts_kv FROM %L', path_to_file);
213 240 END
214 241 $$;
215 242
216 243 -- call insert_into_ts_kv_latest();
217 244
218   -CREATE OR REPLACE PROCEDURE insert_into_ts_kv_latest(IN path_to_file varchar) LANGUAGE plpgsql AS $$
  245 +CREATE OR REPLACE PROCEDURE insert_into_ts_kv_latest(IN path_to_file varchar)
  246 + LANGUAGE plpgsql AS
  247 +$$
219 248 BEGIN
220   - EXECUTE format ('COPY (SELECT to_uuid(entity_id) AS entity_id,
  249 + EXECUTE format('COPY (SELECT to_uuid(entity_id) AS entity_id,
221 250 ts_kv_latest_records.key AS key,
222 251 ts_kv_latest_records.ts AS ts,
223 252 ts_kv_latest_records.bool_v AS bool_v,
... ... @@ -232,27 +261,30 @@ BEGIN
232 261 long_v,
233 262 dbl_v
234 263 FROM ts_kv_latest_old
235   - INNER JOIN ts_kv_dictionary ON (ts_kv_latest_old.key = ts_kv_dictionary.key)) AS ts_kv_latest_records) TO %L;', path_to_file);
236   - EXECUTE format ('COPY ts_kv_latest FROM %L', path_to_file);
  264 + INNER JOIN ts_kv_dictionary ON (ts_kv_latest_old.key = ts_kv_dictionary.key)) AS ts_kv_latest_records) TO %L;',
  265 + path_to_file);
  266 + EXECUTE format('COPY ts_kv_latest FROM %L', path_to_file);
237 267 END;
238 268 $$;
239 269
240 270 -- call insert_into_ts_kv_cursor();
241 271
242   -CREATE OR REPLACE PROCEDURE insert_into_ts_kv_cursor() LANGUAGE plpgsql AS $$
  272 +CREATE OR REPLACE PROCEDURE insert_into_ts_kv_cursor()
  273 + LANGUAGE plpgsql AS
  274 +$$
243 275 DECLARE
244 276 insert_size CONSTANT integer := 10000;
245 277 insert_counter integer DEFAULT 0;
246 278 insert_record RECORD;
247   - insert_cursor CURSOR FOR SELECT to_uuid(entity_id) AS entity_id,
248   - ts_kv_records.key AS key,
249   - ts_kv_records.ts AS ts,
250   - ts_kv_records.bool_v AS bool_v,
251   - ts_kv_records.str_v AS str_v,
252   - ts_kv_records.long_v AS long_v,
253   - ts_kv_records.dbl_v AS dbl_v
254   - FROM (SELECT entity_id AS entity_id,
255   - key_id AS key,
  279 + insert_cursor CURSOR FOR SELECT to_uuid(entity_id) AS entity_id,
  280 + ts_kv_records.key AS key,
  281 + ts_kv_records.ts AS ts,
  282 + ts_kv_records.bool_v AS bool_v,
  283 + ts_kv_records.str_v AS str_v,
  284 + ts_kv_records.long_v AS long_v,
  285 + ts_kv_records.dbl_v AS dbl_v
  286 + FROM (SELECT entity_id AS entity_id,
  287 + key_id AS key,
256 288 ts,
257 289 bool_v,
258 290 str_v,
... ... @@ -282,20 +314,22 @@ $$;
282 314
283 315 -- call insert_into_ts_kv_latest_cursor();
284 316
285   -CREATE OR REPLACE PROCEDURE insert_into_ts_kv_latest_cursor() LANGUAGE plpgsql AS $$
  317 +CREATE OR REPLACE PROCEDURE insert_into_ts_kv_latest_cursor()
  318 + LANGUAGE plpgsql AS
  319 +$$
286 320 DECLARE
287 321 insert_size CONSTANT integer := 10000;
288 322 insert_counter integer DEFAULT 0;
289 323 insert_record RECORD;
290   - insert_cursor CURSOR FOR SELECT to_uuid(entity_id) AS entity_id,
291   - ts_kv_latest_records.key AS key,
292   - ts_kv_latest_records.ts AS ts,
293   - ts_kv_latest_records.bool_v AS bool_v,
294   - ts_kv_latest_records.str_v AS str_v,
295   - ts_kv_latest_records.long_v AS long_v,
296   - ts_kv_latest_records.dbl_v AS dbl_v
297   - FROM (SELECT entity_id AS entity_id,
298   - key_id AS key,
  324 + insert_cursor CURSOR FOR SELECT to_uuid(entity_id) AS entity_id,
  325 + ts_kv_latest_records.key AS key,
  326 + ts_kv_latest_records.ts AS ts,
  327 + ts_kv_latest_records.bool_v AS bool_v,
  328 + ts_kv_latest_records.str_v AS str_v,
  329 + ts_kv_latest_records.long_v AS long_v,
  330 + ts_kv_latest_records.dbl_v AS dbl_v
  331 + FROM (SELECT entity_id AS entity_id,
  332 + key_id AS key,
299 333 ts,
300 334 bool_v,
301 335 str_v,
... ...
... ... @@ -103,8 +103,8 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
103 103 executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE);
104 104 executeQuery(conn, CALL_INSERT_INTO_DICTIONARY);
105 105
106   - Path pathToTempTsKvFile;
107   - Path pathToTempTsKvLatestFile;
  106 + Path pathToTempTsKvFile = null;
  107 + Path pathToTempTsKvLatestFile = null;
108 108 if (SystemUtils.IS_OS_WINDOWS) {
109 109 log.info("Lookup for environment variable: {} ...", THINGSBOARD_WINDOWS_UPGRADE_DIR);
110 110 Path pathToDir;
... ... @@ -125,12 +125,11 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
125 125 try {
126 126 copyTimeseries(conn, pathToTempTsKvFile, pathToTempTsKvLatestFile);
127 127 } catch (Exception e) {
128   - log.info("Upgrade script failed using the copy to/from files strategy!" +
129   - " Trying to perfrom the upgrade using Inserts strategy ...");
130 128 insertTimeseries(conn);
131 129 }
132 130 } catch (IOException | SecurityException e) {
133   - throw new RuntimeException("Failed to create time-series upgrade files due to: " + e);
  131 + log.warn("Failed to create time-series upgrade files due to: {}", e.getMessage());
  132 + insertTimeseries(conn);
134 133 }
135 134 } else {
136 135 try {
... ... @@ -148,13 +147,11 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
148 147 throw new RuntimeException("Failed to grant write permissions for the: " + tempDirPath + "folder!");
149 148 }
150 149 } catch (Exception e) {
151   - log.info(e.getMessage());
152   - log.info("Upgrade script failed using the copy to/from files strategy!" +
153   - " Trying to perfrom the upgrade using Inserts strategy ...");
154 150 insertTimeseries(conn);
155 151 }
156 152 } catch (IOException | SecurityException e) {
157   - throw new RuntimeException("Failed to create time-series upgrade files due to: " + e);
  153 + log.warn("Failed to create time-series upgrade files due to: {}", e.getMessage());
  154 + insertTimeseries(conn);
158 155 }
159 156 }
160 157
... ... @@ -204,11 +201,13 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
204 201 }
205 202
206 203 private void removeUpgradeFiles(Path pathToTempTsKvFile, Path pathToTempTsKvLatestFile) {
207   - if (pathToTempTsKvFile.toFile().exists() && pathToTempTsKvLatestFile.toFile().exists()) {
  204 + if (pathToTempTsKvFile != null && pathToTempTsKvFile.toFile().exists()) {
208 205 boolean deleteTsKvFile = pathToTempTsKvFile.toFile().delete();
209 206 if (deleteTsKvFile) {
210 207 log.info("Successfully deleted the temp file for ts_kv table upgrade!");
211 208 }
  209 + }
  210 + if (pathToTempTsKvLatestFile != null && pathToTempTsKvLatestFile.toFile().exists()) {
212 211 boolean deleteTsKvLatestFile = pathToTempTsKvLatestFile.toFile().delete();
213 212 if (deleteTsKvLatestFile) {
214 213 log.info("Successfully deleted the temp file for ts_kv_latest table upgrade!");
... ... @@ -223,6 +222,8 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
223 222 }
224 223
225 224 private void insertTimeseries(Connection conn) {
  225 + log.warn("Upgrade script failed using the copy to/from files strategy!" +
  226 + " Trying to perfrom the upgrade using Inserts strategy ...");
226 227 executeQuery(conn, CALL_INSERT_INTO_TS_KV_CURSOR);
227 228 executeQuery(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE);
228 229 executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST_CURSOR);
... ...
... ... @@ -99,7 +99,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
99 99 executeQuery(conn, CALL_CREATE_TS_KV_DICTIONARY_TABLE);
100 100 executeQuery(conn, CALL_INSERT_INTO_DICTIONARY);
101 101
102   - Path pathToTempTsKvFile;
  102 + Path pathToTempTsKvFile = null;
103 103 if (SystemUtils.IS_OS_WINDOWS) {
104 104 Path pathToDir;
105 105 log.info("Lookup for environment variable: {} ...", THINGSBOARD_WINDOWS_UPGRADE_DIR);
... ... @@ -118,12 +118,11 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
118 118 try {
119 119 executeQuery(conn, "call insert_into_ts_kv('" + pathToTempTsKvFile + "')");
120 120 } catch (Exception e) {
121   - log.info("Upgrade script failed using the copy to/from files strategy!" +
122   - " Trying to perfrom the upgrade using Inserts strategy ...");
123   - executeQuery(conn, CALL_INSERT_INTO_TS_KV_CURSOR);
  121 + insertTimeseries(conn);
124 122 }
125 123 } catch (IOException | SecurityException e) {
126   - throw new RuntimeException("Failed to create time-series upgrade files due to: " + e);
  124 + log.warn("Failed to create time-series upgrade files due to: {}", e.getMessage());
  125 + insertTimeseries(conn);
127 126 }
128 127 } else {
129 128 try {
... ... @@ -140,13 +139,11 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
140 139 throw new RuntimeException("Failed to grant write permissions for the: " + tempDirPath + "folder!");
141 140 }
142 141 } catch (Exception e) {
143   - log.info(e.getMessage());
144   - log.info("Upgrade script failed using the copy to/from files strategy!" +
145   - " Trying to perfrom the upgrade using Inserts strategy ...");
146   - executeQuery(conn, CALL_INSERT_INTO_TS_KV_CURSOR);
  142 + insertTimeseries(conn);
147 143 }
148 144 } catch (IOException | SecurityException e) {
149   - throw new RuntimeException("Failed to create time-series upgrade files due to: " + e);
  145 + log.warn("Failed to create time-series upgrade files due to: {}", e.getMessage());
  146 + insertTimeseries(conn);
150 147 }
151 148 }
152 149 removeUpgradeFile(pathToTempTsKvFile);
... ... @@ -185,8 +182,14 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
185 182 }
186 183 }
187 184
  185 + private void insertTimeseries(Connection conn) {
  186 + log.warn("Upgrade script failed using the copy to/from files strategy!" +
  187 + " Trying to perfrom the upgrade using Inserts strategy ...");
  188 + executeQuery(conn, CALL_INSERT_INTO_TS_KV_CURSOR);
  189 + }
  190 +
188 191 private void removeUpgradeFile(Path pathToTempTsKvFile) {
189   - if (pathToTempTsKvFile.toFile().exists()) {
  192 + if (pathToTempTsKvFile != null && pathToTempTsKvFile.toFile().exists()) {
190 193 boolean deleteTsKvFile = pathToTempTsKvFile.toFile().delete();
191 194 if (deleteTsKvFile) {
192 195 log.info("Successfully deleted the temp file for ts_kv table upgrade!");
... ...