Commit ee7c4f6e7fc322728f9e9bf72f3f65eca404a159

Authored by Dmytro Shvaika
1 parent 34a9214c

added inserts strategy for upgrade to 2.5

@@ -36,6 +36,7 @@ BEGIN @@ -36,6 +36,7 @@ BEGIN
36 ALTER COLUMN key TYPE integer USING key::integer; 36 ALTER COLUMN key TYPE integer USING key::integer;
37 ALTER TABLE ts_kv 37 ALTER TABLE ts_kv
38 ADD CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_id, key, ts); 38 ADD CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_id, key, ts);
  39 + CREATE TABLE IF NOT EXISTS ts_kv_indefinite PARTITION OF ts_kv DEFAULT;
39 END; 40 END;
40 $$; 41 $$;
41 42
@@ -44,22 +45,40 @@ $$; @@ -44,22 +45,40 @@ $$;
44 CREATE OR REPLACE PROCEDURE create_new_ts_kv_latest_table() LANGUAGE plpgsql AS $$ 45 CREATE OR REPLACE PROCEDURE create_new_ts_kv_latest_table() LANGUAGE plpgsql AS $$
45 46
46 BEGIN 47 BEGIN
47 - ALTER TABLE ts_kv_latest  
48 - RENAME TO ts_kv_latest_old;  
49 - ALTER TABLE ts_kv_latest_old  
50 - RENAME CONSTRAINT ts_kv_latest_pkey TO ts_kv_latest_pkey_old;  
51 - CREATE TABLE IF NOT EXISTS ts_kv_latest  
52 - (  
53 - LIKE ts_kv_latest_old  
54 - );  
55 - ALTER TABLE ts_kv_latest  
56 - DROP COLUMN entity_type;  
57 - ALTER TABLE ts_kv_latest  
58 - ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid;  
59 - ALTER TABLE ts_kv_latest  
60 - ALTER COLUMN key TYPE integer USING key::integer;  
61 - ALTER TABLE ts_kv_latest  
62 - ADD CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key); 48 + IF NOT EXISTS(SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = 'ts_kv_latest_old') THEN
  49 + ALTER TABLE ts_kv_latest
  50 + RENAME TO ts_kv_latest_old;
  51 + ALTER TABLE ts_kv_latest_old
  52 + RENAME CONSTRAINT ts_kv_latest_pkey TO ts_kv_latest_pkey_old;
  53 + CREATE TABLE IF NOT EXISTS ts_kv_latest
  54 + (
  55 + LIKE ts_kv_latest_old
  56 + );
  57 + ALTER TABLE ts_kv_latest
  58 + DROP COLUMN entity_type;
  59 + ALTER TABLE ts_kv_latest
  60 + ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid;
  61 + ALTER TABLE ts_kv_latest
  62 + ALTER COLUMN key TYPE integer USING key::integer;
  63 + ALTER TABLE ts_kv_latest
  64 + ADD CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key);
  65 + ELSE
  66 + RAISE NOTICE 'ts_kv_latest_old table already exists!';
  67 + IF NOT EXISTS(SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = 'ts_kv_latest') THEN
  68 + CREATE TABLE IF NOT EXISTS ts_kv_latest
  69 + (
  70 + entity_id uuid NOT NULL,
  71 + key int NOT NULL,
  72 + ts bigint NOT NULL,
  73 + bool_v boolean,
  74 + str_v varchar(10000000),
  75 + long_v bigint,
  76 + dbl_v double precision,
  77 + json_v json,
  78 + CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
  79 + );
  80 + END IF;
  81 + END IF;
63 END; 82 END;
64 $$; 83 $$;
65 84
@@ -218,4 +237,89 @@ BEGIN @@ -218,4 +237,89 @@ BEGIN
218 END; 237 END;
219 $$; 238 $$;
220 239
  240 +-- call insert_into_ts_kv_cursor();
  241 +
  242 +CREATE OR REPLACE PROCEDURE insert_into_ts_kv_cursor() LANGUAGE plpgsql AS $$
  243 +DECLARE
  244 + insert_size CONSTANT integer := 10000;
  245 + insert_counter integer DEFAULT 0;
  246 + insert_record RECORD;
  247 + insert_cursor CURSOR FOR SELECT to_uuid(entity_id) AS entity_id,
  248 + ts_kv_records.key AS key,
  249 + ts_kv_records.ts AS ts,
  250 + ts_kv_records.bool_v AS bool_v,
  251 + ts_kv_records.str_v AS str_v,
  252 + ts_kv_records.long_v AS long_v,
  253 + ts_kv_records.dbl_v AS dbl_v
  254 + FROM (SELECT entity_id AS entity_id,
  255 + key_id AS key,
  256 + ts,
  257 + bool_v,
  258 + str_v,
  259 + long_v,
  260 + dbl_v
  261 + FROM ts_kv_old
  262 + INNER JOIN ts_kv_dictionary ON (ts_kv_old.key = ts_kv_dictionary.key)) AS ts_kv_records;
  263 +BEGIN
  264 + OPEN insert_cursor;
  265 + LOOP
  266 + insert_counter := insert_counter + 1;
  267 + FETCH insert_cursor INTO insert_record;
  268 + IF NOT FOUND THEN
  269 + RAISE NOTICE '% records have been inserted into the partitioned ts_kv!',insert_counter - 1;
  270 + EXIT;
  271 + END IF;
  272 + INSERT INTO ts_kv(entity_id, key, ts, bool_v, str_v, long_v, dbl_v)
  273 + VALUES (insert_record.entity_id, insert_record.key, insert_record.ts, insert_record.bool_v, insert_record.str_v,
  274 + insert_record.long_v, insert_record.dbl_v);
  275 + IF MOD(insert_counter, insert_size) = 0 THEN
  276 + RAISE NOTICE '% records have been inserted into the partitioned ts_kv!',insert_counter;
  277 + END IF;
  278 + END LOOP;
  279 + CLOSE insert_cursor;
  280 +END;
  281 +$$;
  282 +
  283 +-- call insert_into_ts_kv_latest_cursor();
  284 +
  285 +CREATE OR REPLACE PROCEDURE insert_into_ts_kv_latest_cursor() LANGUAGE plpgsql AS $$
  286 +DECLARE
  287 + insert_size CONSTANT integer := 10000;
  288 + insert_counter integer DEFAULT 0;
  289 + insert_record RECORD;
  290 + insert_cursor CURSOR FOR SELECT to_uuid(entity_id) AS entity_id,
  291 + ts_kv_latest_records.key AS key,
  292 + ts_kv_latest_records.ts AS ts,
  293 + ts_kv_latest_records.bool_v AS bool_v,
  294 + ts_kv_latest_records.str_v AS str_v,
  295 + ts_kv_latest_records.long_v AS long_v,
  296 + ts_kv_latest_records.dbl_v AS dbl_v
  297 + FROM (SELECT entity_id AS entity_id,
  298 + key_id AS key,
  299 + ts,
  300 + bool_v,
  301 + str_v,
  302 + long_v,
  303 + dbl_v
  304 + FROM ts_kv_latest_old
  305 + INNER JOIN ts_kv_dictionary ON (ts_kv_latest_old.key = ts_kv_dictionary.key)) AS ts_kv_latest_records;
  306 +BEGIN
  307 + OPEN insert_cursor;
  308 + LOOP
  309 + insert_counter := insert_counter + 1;
  310 + FETCH insert_cursor INTO insert_record;
  311 + IF NOT FOUND THEN
  312 + RAISE NOTICE '% records have been inserted into the ts_kv_latest!',insert_counter - 1;
  313 + EXIT;
  314 + END IF;
  315 + INSERT INTO ts_kv_latest(entity_id, key, ts, bool_v, str_v, long_v, dbl_v)
  316 + VALUES (insert_record.entity_id, insert_record.key, insert_record.ts, insert_record.bool_v, insert_record.str_v,
  317 + insert_record.long_v, insert_record.dbl_v);
  318 + IF MOD(insert_counter, insert_size) = 0 THEN
  319 + RAISE NOTICE '% records have been inserted into the ts_kv_latest!',insert_counter;
  320 + END IF;
  321 + END LOOP;
  322 + CLOSE insert_cursor;
  323 +END;
  324 +$$;
221 325
@@ -162,3 +162,47 @@ BEGIN @@ -162,3 +162,47 @@ BEGIN
162 CLOSE insert_cursor; 162 CLOSE insert_cursor;
163 END; 163 END;
164 $$; 164 $$;
  165 +
  166 +-- call insert_into_ts_kv_cursor();
  167 +
  168 +CREATE OR REPLACE PROCEDURE insert_into_ts_kv_cursor() LANGUAGE plpgsql AS $$
  169 +
  170 +DECLARE
  171 + insert_size CONSTANT integer := 10000;
  172 + insert_counter integer DEFAULT 0;
  173 + insert_record RECORD;
  174 + insert_cursor CURSOR FOR SELECT to_uuid(entity_id) AS entity_id,
  175 + new_ts_kv_records.key AS key,
  176 + new_ts_kv_records.ts AS ts,
  177 + new_ts_kv_records.bool_v AS bool_v,
  178 + new_ts_kv_records.str_v AS str_v,
  179 + new_ts_kv_records.long_v AS long_v,
  180 + new_ts_kv_records.dbl_v AS dbl_v
  181 + FROM (SELECT entity_id AS entity_id,
  182 + key_id AS key,
  183 + ts,
  184 + bool_v,
  185 + str_v,
  186 + long_v,
  187 + dbl_v
  188 + FROM tenant_ts_kv_old
  189 + INNER JOIN ts_kv_dictionary ON (tenant_ts_kv_old.key = ts_kv_dictionary.key)) AS new_ts_kv_records;
  190 +BEGIN
  191 + OPEN insert_cursor;
  192 + LOOP
  193 + insert_counter := insert_counter + 1;
  194 + FETCH insert_cursor INTO insert_record;
  195 + IF NOT FOUND THEN
  196 + RAISE NOTICE '% records have been inserted into the new ts_kv table!',insert_counter - 1;
  197 + EXIT;
  198 + END IF;
  199 + INSERT INTO ts_kv(entity_id, key, ts, bool_v, str_v, long_v, dbl_v)
  200 + VALUES (insert_record.entity_id, insert_record.key, insert_record.ts, insert_record.bool_v, insert_record.str_v,
  201 + insert_record.long_v, insert_record.dbl_v);
  202 + IF MOD(insert_counter, insert_size) = 0 THEN
  203 + RAISE NOTICE '% records have been inserted into the new ts_kv table!',insert_counter;
  204 + END IF;
  205 + END LOOP;
  206 + CLOSE insert_cursor;
  207 +END;
  208 +$$;
@@ -57,11 +57,15 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe @@ -57,11 +57,15 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
57 private static final String INSERT_INTO_DICTIONARY = "insert_into_dictionary()"; 57 private static final String INSERT_INTO_DICTIONARY = "insert_into_dictionary()";
58 private static final String INSERT_INTO_TS_KV = "insert_into_ts_kv(IN path_to_file varchar)"; 58 private static final String INSERT_INTO_TS_KV = "insert_into_ts_kv(IN path_to_file varchar)";
59 private static final String INSERT_INTO_TS_KV_LATEST = "insert_into_ts_kv_latest(IN path_to_file varchar)"; 59 private static final String INSERT_INTO_TS_KV_LATEST = "insert_into_ts_kv_latest(IN path_to_file varchar)";
  60 + private static final String INSERT_INTO_TS_KV_CURSOR = "insert_into_ts_kv_cursor()";
  61 + private static final String INSERT_INTO_TS_KV_LATEST_CURSOR = "insert_into_ts_kv_latest_cursor()";
60 62
61 private static final String CALL_CREATE_PARTITION_TS_KV_TABLE = CALL_REGEX + CREATE_PARTITION_TS_KV_TABLE; 63 private static final String CALL_CREATE_PARTITION_TS_KV_TABLE = CALL_REGEX + CREATE_PARTITION_TS_KV_TABLE;
62 private static final String CALL_CREATE_NEW_TS_KV_LATEST_TABLE = CALL_REGEX + CREATE_NEW_TS_KV_LATEST_TABLE; 64 private static final String CALL_CREATE_NEW_TS_KV_LATEST_TABLE = CALL_REGEX + CREATE_NEW_TS_KV_LATEST_TABLE;
63 private static final String CALL_CREATE_TS_KV_DICTIONARY_TABLE = CALL_REGEX + CREATE_TS_KV_DICTIONARY_TABLE; 65 private static final String CALL_CREATE_TS_KV_DICTIONARY_TABLE = CALL_REGEX + CREATE_TS_KV_DICTIONARY_TABLE;
64 private static final String CALL_INSERT_INTO_DICTIONARY = CALL_REGEX + INSERT_INTO_DICTIONARY; 66 private static final String CALL_INSERT_INTO_DICTIONARY = CALL_REGEX + INSERT_INTO_DICTIONARY;
  67 + private static final String CALL_INSERT_INTO_TS_KV_CURSOR = CALL_REGEX + INSERT_INTO_TS_KV_CURSOR;
  68 + private static final String CALL_INSERT_INTO_TS_KV_LATEST_CURSOR = CALL_REGEX + INSERT_INTO_TS_KV_LATEST_CURSOR;
65 69
66 private static final String DROP_TABLE_TS_KV_OLD = DROP_TABLE + TS_KV_OLD; 70 private static final String DROP_TABLE_TS_KV_OLD = DROP_TABLE + TS_KV_OLD;
67 private static final String DROP_TABLE_TS_KV_LATEST_OLD = DROP_TABLE + TS_KV_LATEST_OLD; 71 private static final String DROP_TABLE_TS_KV_LATEST_OLD = DROP_TABLE + TS_KV_LATEST_OLD;
@@ -73,6 +77,8 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe @@ -73,6 +77,8 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
73 private static final String DROP_PROCEDURE_INSERT_INTO_DICTIONARY = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_DICTIONARY; 77 private static final String DROP_PROCEDURE_INSERT_INTO_DICTIONARY = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_DICTIONARY;
74 private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV; 78 private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV;
75 private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_LATEST; 79 private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_LATEST;
  80 + private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_CURSOR = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_CURSOR;
  81 + private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST_CURSOR = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_LATEST_CURSOR;
76 private static final String DROP_FUNCTION_GET_PARTITION_DATA = "DROP FUNCTION IF EXISTS get_partitions_data;"; 82 private static final String DROP_FUNCTION_GET_PARTITION_DATA = "DROP FUNCTION IF EXISTS get_partitions_data;";
77 83
78 @Override 84 @Override
@@ -118,39 +124,44 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe @@ -118,39 +124,44 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
118 Path tsKvLatestFile = Files.createTempFile(pathToDir, "ts_kv_latest", ".sql"); 124 Path tsKvLatestFile = Files.createTempFile(pathToDir, "ts_kv_latest", ".sql");
119 pathToTempTsKvFile = tsKvFile.toAbsolutePath(); 125 pathToTempTsKvFile = tsKvFile.toAbsolutePath();
120 pathToTempTsKvLatestFile = tsKvLatestFile.toAbsolutePath(); 126 pathToTempTsKvLatestFile = tsKvLatestFile.toAbsolutePath();
121 - executeQuery(conn, "call insert_into_ts_kv('" + pathToTempTsKvFile + "')");  
122 - executeQuery(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE);  
123 - executeQuery(conn, "call insert_into_ts_kv_latest('" + pathToTempTsKvLatestFile + "');"); 127 + try {
  128 + copyTimeseries(conn, pathToTempTsKvFile, pathToTempTsKvLatestFile);
  129 + } catch (Exception e) {
  130 + log.info("Upgrade script failed using the copy to/from files strategy!" +
  131 + " Trying to perfrom the upgrade using Inserts strategy ...");
  132 + insertTimeseries(conn);
  133 + }
124 } catch (IOException | SecurityException e) { 134 } catch (IOException | SecurityException e) {
125 throw new RuntimeException("Failed to create time-series upgrade files due to: " + e); 135 throw new RuntimeException("Failed to create time-series upgrade files due to: " + e);
126 } 136 }
127 } else { 137 } else {
128 - Path tempDirPath = Files.createTempDirectory("ts_kv");  
129 - File tempDirAsFile = tempDirPath.toFile();  
130 - boolean writable = tempDirAsFile.setWritable(true, false);  
131 - boolean readable = tempDirAsFile.setReadable(true, false);  
132 - boolean executable = tempDirAsFile.setExecutable(true, false);  
133 - if (writable && readable && executable) { 138 + try {
  139 + Path tempDirPath = Files.createTempDirectory("ts_kv");
  140 + File tempDirAsFile = tempDirPath.toFile();
  141 + boolean writable = tempDirAsFile.setWritable(true, false);
  142 + boolean readable = tempDirAsFile.setReadable(true, false);
  143 + boolean executable = tempDirAsFile.setExecutable(true, false);
134 pathToTempTsKvFile = tempDirPath.resolve(TS_KV_SQL).toAbsolutePath(); 144 pathToTempTsKvFile = tempDirPath.resolve(TS_KV_SQL).toAbsolutePath();
135 pathToTempTsKvLatestFile = tempDirPath.resolve(TS_KV_LATEST_SQL).toAbsolutePath(); 145 pathToTempTsKvLatestFile = tempDirPath.resolve(TS_KV_LATEST_SQL).toAbsolutePath();
136 - executeQuery(conn, "call insert_into_ts_kv('" + pathToTempTsKvFile + "')");  
137 - executeQuery(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE);  
138 - executeQuery(conn, "call insert_into_ts_kv_latest('" + pathToTempTsKvLatestFile + "');");  
139 - } else {  
140 - throw new RuntimeException("Failed to grant write permissions for the: " + tempDirPath + "folder!");  
141 - }  
142 - }  
143 - if (pathToTempTsKvFile.toFile().exists() && pathToTempTsKvLatestFile.toFile().exists()) {  
144 - boolean deleteTsKvFile = pathToTempTsKvFile.toFile().delete();  
145 - if (deleteTsKvFile) {  
146 - log.info("Successfully deleted the temp file for ts_kv table upgrade!");  
147 - }  
148 - boolean deleteTsKvLatestFile = pathToTempTsKvLatestFile.toFile().delete();  
149 - if (deleteTsKvLatestFile) {  
150 - log.info("Successfully deleted the temp file for ts_kv_latest table upgrade!"); 146 + try {
  147 + if (writable && readable && executable) {
  148 + copyTimeseries(conn, pathToTempTsKvFile, pathToTempTsKvLatestFile);
  149 + } else {
  150 + throw new RuntimeException("Failed to grant write permissions for the: " + tempDirPath + "folder!");
  151 + }
  152 + } catch (Exception e) {
  153 + log.info(e.getMessage());
  154 + log.info("Upgrade script failed using the copy to/from files strategy!" +
  155 + " Trying to perfrom the upgrade using Inserts strategy ...");
  156 + insertTimeseries(conn);
  157 + }
  158 + } catch (IOException | SecurityException e) {
  159 + throw new RuntimeException("Failed to create time-series upgrade files due to: " + e);
151 } 160 }
152 } 161 }
153 162
  163 + removeUpgradeFiles(pathToTempTsKvFile, pathToTempTsKvLatestFile);
  164 +
154 executeQuery(conn, DROP_TABLE_TS_KV_OLD); 165 executeQuery(conn, DROP_TABLE_TS_KV_OLD);
155 executeQuery(conn, DROP_TABLE_TS_KV_LATEST_OLD); 166 executeQuery(conn, DROP_TABLE_TS_KV_LATEST_OLD);
156 167
@@ -161,6 +172,8 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe @@ -161,6 +172,8 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
161 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV); 172 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV);
162 executeQuery(conn, DROP_PROCEDURE_CREATE_NEW_TS_KV_LATEST_TABLE); 173 executeQuery(conn, DROP_PROCEDURE_CREATE_NEW_TS_KV_LATEST_TABLE);
163 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST); 174 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST);
  175 + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_CURSOR);
  176 + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST_CURSOR);
164 executeQuery(conn, DROP_FUNCTION_GET_PARTITION_DATA); 177 executeQuery(conn, DROP_FUNCTION_GET_PARTITION_DATA);
165 178
166 executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN IF NOT EXISTS json_v json;"); 179 executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN IF NOT EXISTS json_v json;");
@@ -186,6 +199,31 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe @@ -186,6 +199,31 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
186 } 199 }
187 } 200 }
188 201
  202 + private void removeUpgradeFiles(Path pathToTempTsKvFile, Path pathToTempTsKvLatestFile) {
  203 + if (pathToTempTsKvFile.toFile().exists() && pathToTempTsKvLatestFile.toFile().exists()) {
  204 + boolean deleteTsKvFile = pathToTempTsKvFile.toFile().delete();
  205 + if (deleteTsKvFile) {
  206 + log.info("Successfully deleted the temp file for ts_kv table upgrade!");
  207 + }
  208 + boolean deleteTsKvLatestFile = pathToTempTsKvLatestFile.toFile().delete();
  209 + if (deleteTsKvLatestFile) {
  210 + log.info("Successfully deleted the temp file for ts_kv_latest table upgrade!");
  211 + }
  212 + }
  213 + }
  214 +
  215 + private void copyTimeseries(Connection conn, Path pathToTempTsKvFile, Path pathToTempTsKvLatestFile) {
  216 + executeQuery(conn, "call insert_into_ts_kv('" + pathToTempTsKvFile + "')");
  217 + executeQuery(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE);
  218 + executeQuery(conn, "call insert_into_ts_kv_latest('" + pathToTempTsKvLatestFile + "')");
  219 + }
  220 +
  221 + private void insertTimeseries(Connection conn) {
  222 + executeQuery(conn, CALL_INSERT_INTO_TS_KV_CURSOR);
  223 + executeQuery(conn, CALL_CREATE_NEW_TS_KV_LATEST_TABLE);
  224 + executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST_CURSOR);
  225 + }
  226 +
189 @Override 227 @Override
190 protected void loadSql(Connection conn, String fileName) { 228 protected void loadSql(Connection conn, String fileName) {
191 Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", fileName); 229 Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", fileName);
@@ -53,6 +53,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr @@ -53,6 +53,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
53 private static final String CREATE_TS_KV_DICTIONARY_TABLE = "create_ts_kv_dictionary_table()"; 53 private static final String CREATE_TS_KV_DICTIONARY_TABLE = "create_ts_kv_dictionary_table()";
54 private static final String INSERT_INTO_DICTIONARY = "insert_into_dictionary()"; 54 private static final String INSERT_INTO_DICTIONARY = "insert_into_dictionary()";
55 private static final String INSERT_INTO_TS_KV = "insert_into_ts_kv(IN path_to_file varchar)"; 55 private static final String INSERT_INTO_TS_KV = "insert_into_ts_kv(IN path_to_file varchar)";
  56 + private static final String INSERT_INTO_TS_KV_CURSOR = "insert_into_ts_kv_cursor()";
56 private static final String INSERT_INTO_TS_KV_LATEST = "insert_into_ts_kv_latest()"; 57 private static final String INSERT_INTO_TS_KV_LATEST = "insert_into_ts_kv_latest()";
57 58
58 private static final String CALL_CREATE_TS_KV_LATEST_TABLE = CALL_REGEX + CREATE_TS_KV_LATEST_TABLE; 59 private static final String CALL_CREATE_TS_KV_LATEST_TABLE = CALL_REGEX + CREATE_TS_KV_LATEST_TABLE;
@@ -60,6 +61,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr @@ -60,6 +61,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
60 private static final String CALL_CREATE_TS_KV_DICTIONARY_TABLE = CALL_REGEX + CREATE_TS_KV_DICTIONARY_TABLE; 61 private static final String CALL_CREATE_TS_KV_DICTIONARY_TABLE = CALL_REGEX + CREATE_TS_KV_DICTIONARY_TABLE;
61 private static final String CALL_INSERT_INTO_DICTIONARY = CALL_REGEX + INSERT_INTO_DICTIONARY; 62 private static final String CALL_INSERT_INTO_DICTIONARY = CALL_REGEX + INSERT_INTO_DICTIONARY;
62 private static final String CALL_INSERT_INTO_TS_KV_LATEST = CALL_REGEX + INSERT_INTO_TS_KV_LATEST; 63 private static final String CALL_INSERT_INTO_TS_KV_LATEST = CALL_REGEX + INSERT_INTO_TS_KV_LATEST;
  64 + private static final String CALL_INSERT_INTO_TS_KV_CURSOR = CALL_REGEX + INSERT_INTO_TS_KV_CURSOR;
63 65
64 private static final String DROP_OLD_TENANT_TS_KV_TABLE = DROP_TABLE + TENANT_TS_KV_OLD_TABLE; 66 private static final String DROP_OLD_TENANT_TS_KV_TABLE = DROP_TABLE + TENANT_TS_KV_OLD_TABLE;
65 67
@@ -68,6 +70,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr @@ -68,6 +70,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
68 private static final String DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE = DROP_PROCEDURE_IF_EXISTS + CREATE_TS_KV_DICTIONARY_TABLE; 70 private static final String DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE = DROP_PROCEDURE_IF_EXISTS + CREATE_TS_KV_DICTIONARY_TABLE;
69 private static final String DROP_PROCEDURE_INSERT_INTO_DICTIONARY = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_DICTIONARY; 71 private static final String DROP_PROCEDURE_INSERT_INTO_DICTIONARY = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_DICTIONARY;
70 private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV; 72 private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV;
  73 + private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_CURSOR = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_CURSOR;
71 private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_LATEST; 74 private static final String DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST = DROP_PROCEDURE_IF_EXISTS + INSERT_INTO_TS_KV_LATEST;
72 75
73 @Autowired 76 @Autowired
@@ -112,31 +115,41 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr @@ -112,31 +115,41 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
112 try { 115 try {
113 Path tsKvFile = Files.createTempFile(pathToDir, "ts_kv", ".sql"); 116 Path tsKvFile = Files.createTempFile(pathToDir, "ts_kv", ".sql");
114 pathToTempTsKvFile = tsKvFile.toAbsolutePath(); 117 pathToTempTsKvFile = tsKvFile.toAbsolutePath();
115 - executeQuery(conn, "call insert_into_ts_kv('" + pathToTempTsKvFile + "')");  
116 - pathToTempTsKvFile.toFile().deleteOnExit(); 118 + try {
  119 + executeQuery(conn, "call insert_into_ts_kv('" + pathToTempTsKvFile + "')");
  120 + } catch (Exception e) {
  121 + log.info("Upgrade script failed using the copy to/from files strategy!" +
  122 + " Trying to perfrom the upgrade using Inserts strategy ...");
  123 + executeQuery(conn, CALL_INSERT_INTO_TS_KV_CURSOR);
  124 + }
117 } catch (IOException | SecurityException e) { 125 } catch (IOException | SecurityException e) {
118 throw new RuntimeException("Failed to create time-series upgrade files due to: " + e); 126 throw new RuntimeException("Failed to create time-series upgrade files due to: " + e);
119 } 127 }
120 } else { 128 } else {
121 - Path tempDirPath = Files.createTempDirectory("ts_kv");  
122 - File tempDirAsFile = tempDirPath.toFile();  
123 - boolean writable = tempDirAsFile.setWritable(true, false);  
124 - boolean readable = tempDirAsFile.setReadable(true, false);  
125 - boolean executable = tempDirAsFile.setExecutable(true, false);  
126 - if (writable && readable && executable) { 129 + try {
  130 + Path tempDirPath = Files.createTempDirectory("ts_kv");
  131 + File tempDirAsFile = tempDirPath.toFile();
  132 + boolean writable = tempDirAsFile.setWritable(true, false);
  133 + boolean readable = tempDirAsFile.setReadable(true, false);
  134 + boolean executable = tempDirAsFile.setExecutable(true, false);
127 pathToTempTsKvFile = tempDirPath.resolve(TS_KV_SQL).toAbsolutePath(); 135 pathToTempTsKvFile = tempDirPath.resolve(TS_KV_SQL).toAbsolutePath();
128 - executeQuery(conn, "call insert_into_ts_kv('" + pathToTempTsKvFile + "')");  
129 - } else {  
130 - throw new RuntimeException("Failed to grant write permissions for the: " + tempDirPath + "folder!");  
131 - }  
132 - }  
133 -  
134 - if (pathToTempTsKvFile.toFile().exists()) {  
135 - boolean deleteTsKvFile = pathToTempTsKvFile.toFile().delete();  
136 - if (deleteTsKvFile) {  
137 - log.info("Successfully deleted the temp file for ts_kv table upgrade!"); 136 + try {
  137 + if (writable && readable && executable) {
  138 + executeQuery(conn, "call insert_into_ts_kv('" + pathToTempTsKvFile + "')");
  139 + } else {
  140 + throw new RuntimeException("Failed to grant write permissions for the: " + tempDirPath + "folder!");
  141 + }
  142 + } catch (Exception e) {
  143 + log.info(e.getMessage());
  144 + log.info("Upgrade script failed using the copy to/from files strategy!" +
  145 + " Trying to perfrom the upgrade using Inserts strategy ...");
  146 + executeQuery(conn, CALL_INSERT_INTO_TS_KV_CURSOR);
  147 + }
  148 + } catch (IOException | SecurityException e) {
  149 + throw new RuntimeException("Failed to create time-series upgrade files due to: " + e);
138 } 150 }
139 } 151 }
  152 + removeUpgradeFile(pathToTempTsKvFile);
140 153
141 executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST); 154 executeQuery(conn, CALL_INSERT_INTO_TS_KV_LATEST);
142 155
@@ -147,6 +160,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr @@ -147,6 +160,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
147 executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE); 160 executeQuery(conn, DROP_PROCEDURE_CREATE_TS_KV_DICTIONARY_TABLE);
148 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_DICTIONARY); 161 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_DICTIONARY);
149 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV); 162 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV);
  163 + executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_CURSOR);
150 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST); 164 executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST);
151 165
152 executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN IF NOT EXISTS json_v json;"); 166 executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN IF NOT EXISTS json_v json;");
@@ -166,6 +180,15 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr @@ -166,6 +180,15 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
166 } 180 }
167 } 181 }
168 182
  183 + private void removeUpgradeFile(Path pathToTempTsKvFile) {
  184 + if (pathToTempTsKvFile.toFile().exists()) {
  185 + boolean deleteTsKvFile = pathToTempTsKvFile.toFile().delete();
  186 + if (deleteTsKvFile) {
  187 + log.info("Successfully deleted the temp file for ts_kv table upgrade!");
  188 + }
  189 + }
  190 + }
  191 +
169 @Override 192 @Override
170 protected void loadSql(Connection conn, String fileName) { 193 protected void loadSql(Connection conn, String fileName) {
171 Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", fileName); 194 Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", fileName);