schema_update_timescale_ts.sql
7.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
--
-- Copyright © 2016-2020 The Thingsboard Authors
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
-- Renames tenant_ts_kv to tenant_ts_kv_old and creates the new ts_kv table from its
-- column layout, with entity_id converted to uuid, key converted to integer, the
-- tenant_id column removed, and a primary key on (entity_id, key, ts).
-- The statements depend on each other's order. A second call fails at the first
-- RENAME (it has no IF EXISTS guard) unless tenant_ts_kv has been recreated.
-- Usage: call create_new_ts_kv_table();
CREATE OR REPLACE PROCEDURE create_new_ts_kv_table() LANGUAGE plpgsql AS $$
BEGIN
-- Keep the existing data reachable under the *_old name.
ALTER TABLE tenant_ts_kv
RENAME TO tenant_ts_kv_old;
-- LIKE without an INCLUDING clause copies column names, types and NOT NULL
-- constraints only; indexes and the primary key are not copied.
CREATE TABLE IF NOT EXISTS ts_kv
(
LIKE tenant_ts_kv_old
);
-- Switch the copied columns to their new types. A freshly created ts_kv is empty,
-- so the USING casts have no rows to convert.
-- NOTE(review): if ts_kv already existed with data, these casts run on that data.
ALTER TABLE ts_kv ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid;
ALTER TABLE ts_kv ALTER COLUMN key TYPE integer USING key::integer;
-- Free the name ts_kv_pkey so the constraint added below can use it.
-- NOTE(review): presumably these three indexes belong to the renamed old table;
-- confirm against the original schema. ALTER INDEX has no IF EXISTS here, so a
-- missing index aborts the procedure.
ALTER INDEX ts_kv_pkey RENAME TO tenant_ts_kv_pkey_old;
ALTER INDEX idx_tenant_ts_kv RENAME TO idx_tenant_ts_kv_old;
ALTER INDEX tenant_ts_kv_ts_idx RENAME TO tenant_ts_kv_ts_idx_old;
ALTER TABLE ts_kv ADD CONSTRAINT ts_kv_pkey PRIMARY KEY(entity_id, key, ts);
-- CREATE INDEX IF NOT EXISTS ts_kv_ts_idx ON ts_kv(ts DESC);
-- tenant_id is not part of the new primary key; remove the copied column.
ALTER TABLE ts_kv DROP COLUMN IF EXISTS tenant_id;
END;
$$;
-- Creates the ts_kv_latest table; does nothing when the table already exists.
-- The primary key allows at most one row per (entity_id, key) pair.
-- Usage: call create_ts_kv_latest_table();
CREATE OR REPLACE PROCEDURE create_ts_kv_latest_table()
    LANGUAGE plpgsql
AS
$$
BEGIN
    CREATE TABLE IF NOT EXISTS ts_kv_latest (
        entity_id uuid    NOT NULL,
        key       integer NOT NULL,
        ts        bigint  NOT NULL,
        -- Value columns: all nullable.
        bool_v    boolean,
        str_v     varchar(10000000),
        long_v    bigint,
        dbl_v     double precision,
        CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
    );
END;
$$;
-- Creates the ts_kv_dictionary table; does nothing when the table already exists.
-- Each textual key (primary key) is paired with a unique, auto-generated key_id.
-- Usage: call create_ts_kv_dictionary_table();
CREATE OR REPLACE PROCEDURE create_ts_kv_dictionary_table()
    LANGUAGE plpgsql
AS
$$
BEGIN
    CREATE TABLE IF NOT EXISTS ts_kv_dictionary (
        key    varchar(255) NOT NULL,
        -- serial: filled from an implicit sequence when an INSERT omits key_id.
        key_id serial UNIQUE,
        CONSTRAINT ts_key_id_pkey PRIMARY KEY (key)
    );
END;
$$;
-- Registers every distinct key found in tenant_ts_kv_old in ts_kv_dictionary.
-- Keys already present in the dictionary are left untouched. A NOTICE is raised
-- for each key, stating whether it was inserted or already existed.
-- Usage: call insert_into_dictionary();
CREATE OR REPLACE PROCEDURE insert_into_dictionary()
    LANGUAGE plpgsql
AS
$$
DECLARE
    old_key RECORD;
BEGIN
    -- Keys are visited in sorted order, so new key_id values are handed out in that order.
    FOR old_key IN
        SELECT DISTINCT key
        FROM tenant_ts_kv_old
        ORDER BY key
    LOOP
        IF EXISTS (SELECT dict.key FROM ts_kv_dictionary AS dict WHERE dict.key = old_key.key) THEN
            RAISE NOTICE 'Key: % already exists in the dictionary!',old_key.key;
        ELSE
            -- Only key is supplied; key_id is left to the column default.
            INSERT INTO ts_kv_dictionary(key) VALUES (old_key.key);
            RAISE NOTICE 'Key: % has been inserted into the dictionary!',old_key.key;
        END IF;
    END LOOP;
END;
$$;
-- Copies every row of tenant_ts_kv_old that has a matching ts_kv_dictionary entry
-- into ts_kv, one row at a time. During the copy:
--   * the textual entity_id is rebuilt into a uuid (see the query below);
--   * the textual key is replaced by the dictionary's key_id.
-- A progress NOTICE is raised every 10000 inserted rows and once more at the end
-- with the total.
-- Usage: call insert_into_ts_kv();
CREATE OR REPLACE PROCEDURE insert_into_ts_kv()
    LANGUAGE plpgsql
AS
$$
DECLARE
    progress_step CONSTANT integer := 10000;
    rows_copied   integer := 0;
    converted_row RECORD;
BEGIN
    FOR converted_row IN
        -- The uuid text is assembled from slices of the old entity_id string:
        --   chars 8-15, chars 4-7, '1' followed by chars 1-3, chars 16-19, chars 20-end.
        -- NOTE(review): the literal '1' is presumably the UUID version digit
        -- (time-based UUIDs); confirm against the code that wrote the old ids.
        SELECT CONCAT(
                   SUBSTRING(old_kv.entity_id, 8, 8), '-',
                   SUBSTRING(old_kv.entity_id, 4, 4), '-1',
                   SUBSTRING(old_kv.entity_id, 1, 3), '-',
                   SUBSTRING(old_kv.entity_id, 16, 4), '-',
                   SUBSTRING(old_kv.entity_id, 20)
               )::uuid     AS entity_id,
               dict.key_id AS key,
               old_kv.ts,
               old_kv.bool_v,
               old_kv.str_v,
               old_kv.long_v,
               old_kv.dbl_v
        FROM tenant_ts_kv_old AS old_kv
        INNER JOIN ts_kv_dictionary AS dict
            ON old_kv.key = dict.key
    LOOP
        INSERT INTO ts_kv(entity_id, key, ts, bool_v, str_v, long_v, dbl_v)
        VALUES (converted_row.entity_id,
                converted_row.key,
                converted_row.ts,
                converted_row.bool_v,
                converted_row.str_v,
                converted_row.long_v,
                converted_row.dbl_v);

        rows_copied := rows_copied + 1;
        IF MOD(rows_copied, progress_step) = 0 THEN
            RAISE NOTICE '% records have been inserted into the new ts_kv table!',rows_copied;
        END IF;
    END LOOP;

    -- Final total; also raised when no rows were copied.
    RAISE NOTICE '% records have been inserted into the new ts_kv table!',rows_copied;
END;
$$;
-- Fills ts_kv_latest with, for every (entity_id, key) pair present in ts_kv, the
-- row that has the greatest ts. Rows are inserted one at a time; a progress NOTICE
-- is raised every 10000 inserted rows and once more at the end with the total.
-- The INSERT has no conflict handling, so a pair that is already present in
-- ts_kv_latest aborts the procedure with a primary-key violation.
-- Usage: call insert_into_ts_kv_latest();
CREATE OR REPLACE PROCEDURE insert_into_ts_kv_latest()
    LANGUAGE plpgsql
AS
$$
DECLARE
    insert_size CONSTANT integer := 10000;
    insert_counter integer := 0;
    latest_record RECORD;
BEGIN
    -- DISTINCT ON keeps the first row of each (entity_id, key) group in ORDER BY
    -- order, i.e. the row with the highest ts. This yields the complete latest row
    -- in a single query, so no per-group lookup back into ts_kv is needed.
    FOR latest_record IN
        SELECT DISTINCT ON (entity_id, key)
               entity_id,
               key,
               ts,
               bool_v,
               str_v,
               long_v,
               dbl_v
        FROM ts_kv
        ORDER BY entity_id, key, ts DESC
    LOOP
        INSERT INTO ts_kv_latest(entity_id, key, ts, bool_v, str_v, long_v, dbl_v)
        VALUES (latest_record.entity_id,
                latest_record.key,
                latest_record.ts,
                latest_record.bool_v,
                latest_record.str_v,
                latest_record.long_v,
                latest_record.dbl_v);

        insert_counter := insert_counter + 1;
        IF MOD(insert_counter, insert_size) = 0 THEN
            RAISE NOTICE '% records have been inserted into the ts_kv_latest table!',insert_counter;
        END IF;
    END LOOP;

    -- Final total; also raised when ts_kv is empty.
    RAISE NOTICE '% records have been inserted into the ts_kv_latest table!',insert_counter;
END;
$$;