Commit 3955600a9ca170f1dab3c0a575a05587d685bad7

Authored by ShvaykaD
Committed by Andrew Shvayka
1 parent b3abfd38

bug fixes & improvements / sql-timeseries (#2382)

* fixed the partion date extracting

* fix imports

* ts-keys dictionary for latest, hsqldb

* removed AbstractSimpleSqlTimeseriesDao class & fix beanCreationException in ThingsboardInstallService

* timescale-db upgrade added

* added postgreSQL upgrade

* fix logging

* refactoring timeseries-dao implementation
Showing 34 changed files with 1037 additions and 611 deletions
... ... @@ -14,7 +14,7 @@
14 14 -- limitations under the License.
15 15 --
16 16
17   --- load function check_version()
  17 +-- select check_version();
18 18
19 19 CREATE OR REPLACE FUNCTION check_version() RETURNS boolean AS $$
20 20 DECLARE
... ... @@ -38,9 +38,9 @@ BEGIN
38 38 END;
39 39 $$ LANGUAGE 'plpgsql';
40 40
41   --- load function create_partition_table()
  41 +-- select create_partition_ts_kv_table();
42 42
43   -CREATE OR REPLACE FUNCTION create_partition_table() RETURNS VOID AS $$
  43 +CREATE OR REPLACE FUNCTION create_partition_ts_kv_table() RETURNS VOID AS $$
44 44
45 45 BEGIN
46 46 ALTER TABLE ts_kv
... ... @@ -59,8 +59,32 @@ BEGIN
59 59 END;
60 60 $$ LANGUAGE 'plpgsql';
61 61
  62 +-- select create_new_ts_kv_latest_table();
62 63
63   --- load function create_partitions()
  64 +CREATE OR REPLACE FUNCTION create_new_ts_kv_latest_table() RETURNS VOID AS $$
  65 +
  66 +BEGIN
  67 + ALTER TABLE ts_kv_latest
  68 + RENAME TO ts_kv_latest_old;
  69 + ALTER TABLE ts_kv_latest_old
  70 + RENAME CONSTRAINT ts_kv_latest_pkey TO ts_kv_latest_pkey_old;
  71 + CREATE TABLE IF NOT EXISTS ts_kv_latest
  72 + (
  73 + LIKE ts_kv_latest_old
  74 + );
  75 + ALTER TABLE ts_kv_latest
  76 + DROP COLUMN entity_type;
  77 + ALTER TABLE ts_kv_latest
  78 + ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid;
  79 + ALTER TABLE ts_kv_latest
  80 + ALTER COLUMN key TYPE integer USING key::integer;
  81 + ALTER TABLE ts_kv_latest
  82 + ADD CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key);
  83 +END;
  84 +$$ LANGUAGE 'plpgsql';
  85 +
  86 +
  87 +-- select create_partitions();
64 88
65 89 CREATE OR REPLACE FUNCTION create_partitions() RETURNS VOID AS
66 90 $$
... ... @@ -89,7 +113,7 @@ BEGIN
89 113 END;
90 114 $$ language 'plpgsql';
91 115
92   --- load function create_ts_kv_dictionary_table()
  116 +-- select create_ts_kv_dictionary_table();
93 117
94 118 CREATE OR REPLACE FUNCTION create_ts_kv_dictionary_table() RETURNS VOID AS $$
95 119
... ... @@ -103,7 +127,7 @@ BEGIN
103 127 END;
104 128 $$ LANGUAGE 'plpgsql';
105 129
106   --- load function insert_into_dictionary()
  130 +-- select insert_into_dictionary();
107 131
108 132 CREATE OR REPLACE FUNCTION insert_into_dictionary() RETURNS VOID AS
109 133 $$
... ... @@ -128,7 +152,7 @@ BEGIN
128 152 END;
129 153 $$ language 'plpgsql';
130 154
131   --- load function insert_into_ts_kv()
  155 +-- select insert_into_ts_kv();
132 156
133 157 CREATE OR REPLACE FUNCTION insert_into_ts_kv() RETURNS void AS
134 158 $$
... ... @@ -176,4 +200,52 @@ BEGIN
176 200 END;
177 201 $$ LANGUAGE 'plpgsql';
178 202
  203 +-- select insert_into_ts_kv_latest();
  204 +
  205 +CREATE OR REPLACE FUNCTION insert_into_ts_kv_latest() RETURNS void AS
  206 +$$
  207 +DECLARE
  208 + insert_size CONSTANT integer := 10000;
  209 + insert_counter integer DEFAULT 0;
  210 + insert_record RECORD;
  211 + insert_cursor CURSOR FOR SELECT CONCAT(first, '-', second, '-1', third, '-', fourth, '-', fifth)::uuid AS entity_id,
  212 + substrings.key AS key,
  213 + substrings.ts AS ts,
  214 + substrings.bool_v AS bool_v,
  215 + substrings.str_v AS str_v,
  216 + substrings.long_v AS long_v,
  217 + substrings.dbl_v AS dbl_v
  218 + FROM (SELECT SUBSTRING(entity_id, 8, 8) AS first,
  219 + SUBSTRING(entity_id, 4, 4) AS second,
  220 + SUBSTRING(entity_id, 1, 3) AS third,
  221 + SUBSTRING(entity_id, 16, 4) AS fourth,
  222 + SUBSTRING(entity_id, 20) AS fifth,
  223 + key_id AS key,
  224 + ts,
  225 + bool_v,
  226 + str_v,
  227 + long_v,
  228 + dbl_v
  229 + FROM ts_kv_latest_old
  230 + INNER JOIN ts_kv_dictionary ON (ts_kv_latest_old.key = ts_kv_dictionary.key)) AS substrings;
  231 +BEGIN
  232 + OPEN insert_cursor;
  233 + LOOP
  234 + insert_counter := insert_counter + 1;
  235 + FETCH insert_cursor INTO insert_record;
  236 + IF NOT FOUND THEN
  237 + RAISE NOTICE '% records have been inserted into the ts_kv_latest!',insert_counter - 1;
  238 + EXIT;
  239 + END IF;
  240 + INSERT INTO ts_kv_latest(entity_id, key, ts, bool_v, str_v, long_v, dbl_v)
  241 + VALUES (insert_record.entity_id, insert_record.key, insert_record.ts, insert_record.bool_v, insert_record.str_v,
  242 + insert_record.long_v, insert_record.dbl_v);
  243 + IF MOD(insert_counter, insert_size) = 0 THEN
  244 + RAISE NOTICE '% records have been inserted into the ts_kv_latest!',insert_counter;
  245 + END IF;
  246 + END LOOP;
  247 + CLOSE insert_cursor;
  248 +END;
  249 +$$ LANGUAGE 'plpgsql';
  250 +
179 251
... ...
  1 +--
  2 +-- Copyright © 2016-2020 The Thingsboard Authors
  3 +--
  4 +-- Licensed under the Apache License, Version 2.0 (the "License");
  5 +-- you may not use this file except in compliance with the License.
  6 +-- You may obtain a copy of the License at
  7 +--
  8 +-- http://www.apache.org/licenses/LICENSE-2.0
  9 +--
  10 +-- Unless required by applicable law or agreed to in writing, software
  11 +-- distributed under the License is distributed on an "AS IS" BASIS,
  12 +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 +-- See the License for the specific language governing permissions and
  14 +-- limitations under the License.
  15 +--
  16 +
  17 +-- select check_version();
  18 +
  19 +CREATE OR REPLACE FUNCTION check_version() RETURNS boolean AS $$
  20 +DECLARE
  21 + current_version integer;
  22 + valid_version boolean;
  23 +BEGIN
  24 + RAISE NOTICE 'Check the current installed PostgreSQL version...';
  25 + SELECT current_setting('server_version_num') INTO current_version;
  26 + IF current_version < 90600 THEN
  27 + valid_version := FALSE;
  28 + ELSE
  29 + valid_version := TRUE;
  30 + END IF;
  31 + IF valid_version = FALSE THEN
  32 + RAISE NOTICE 'Postgres version should be at least more than 9.6!';
  33 + ELSE
  34 + RAISE NOTICE 'PostgreSQL version is valid!';
  35 + RAISE NOTICE 'Schema update started...';
  36 + END IF;
  37 + RETURN valid_version;
  38 +END;
  39 +$$ LANGUAGE 'plpgsql';
  40 +
  41 +-- select create_tenant_ts_kv_table_copy();
  42 +
  43 +CREATE OR REPLACE FUNCTION create_tenant_ts_kv_table_copy() RETURNS VOID AS $$
  44 +
  45 +BEGIN
  46 + ALTER TABLE tenant_ts_kv
  47 + RENAME TO tenant_ts_kv_old;
  48 + CREATE TABLE IF NOT EXISTS tenant_ts_kv
  49 + (
  50 + LIKE tenant_ts_kv_old
  51 + );
  52 + ALTER TABLE tenant_ts_kv
  53 + ALTER COLUMN tenant_id TYPE uuid USING tenant_id::uuid;
  54 + ALTER TABLE tenant_ts_kv
  55 + ALTER COLUMN entity_id TYPE uuid USING entity_id::uuid;
  56 + ALTER TABLE tenant_ts_kv
  57 + ALTER COLUMN key TYPE integer USING key::integer;
  58 + ALTER TABLE tenant_ts_kv
  59 + ADD CONSTRAINT tenant_ts_kv_pkey PRIMARY KEY(tenant_id, entity_id, key, ts);
  60 + ALTER INDEX idx_tenant_ts_kv RENAME TO idx_tenant_ts_kv_old;
  61 + ALTER INDEX tenant_ts_kv_ts_idx RENAME TO tenant_ts_kv_ts_idx_old;
  62 + PERFORM create_hypertable('tenant_ts_kv', 'ts', chunk_time_interval => 86400000, if_not_exists => true);
  63 + CREATE INDEX IF NOT EXISTS idx_tenant_ts_kv ON tenant_ts_kv(tenant_id, entity_id, key, ts);
  64 +END;
  65 +$$ LANGUAGE 'plpgsql';
  66 +
  67 +
  68 +-- select create_ts_kv_latest_table();
  69 +
  70 +CREATE OR REPLACE FUNCTION create_ts_kv_latest_table() RETURNS VOID AS $$
  71 +
  72 +BEGIN
  73 + CREATE TABLE IF NOT EXISTS ts_kv_latest
  74 + (
  75 + entity_id uuid NOT NULL,
  76 + key int NOT NULL,
  77 + ts bigint NOT NULL,
  78 + bool_v boolean,
  79 + str_v varchar(10000000),
  80 + long_v bigint,
  81 + dbl_v double precision,
  82 + CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
  83 + );
  84 +END;
  85 +$$ LANGUAGE 'plpgsql';
  86 +
  87 +
  88 +-- select create_ts_kv_dictionary_table();
  89 +
  90 +CREATE OR REPLACE FUNCTION create_ts_kv_dictionary_table() RETURNS VOID AS $$
  91 +
  92 +BEGIN
  93 + CREATE TABLE IF NOT EXISTS ts_kv_dictionary
  94 + (
  95 + key varchar(255) NOT NULL,
  96 + key_id serial UNIQUE,
  97 + CONSTRAINT ts_key_id_pkey PRIMARY KEY (key)
  98 + );
  99 +END;
  100 +$$ LANGUAGE 'plpgsql';
  101 +
  102 +-- select insert_into_dictionary();
  103 +
  104 +CREATE OR REPLACE FUNCTION insert_into_dictionary() RETURNS VOID AS
  105 +$$
  106 +DECLARE
  107 + insert_record RECORD;
  108 + key_cursor CURSOR FOR SELECT DISTINCT key
  109 + FROM tenant_ts_kv_old
  110 + ORDER BY key;
  111 +BEGIN
  112 + OPEN key_cursor;
  113 + LOOP
  114 + FETCH key_cursor INTO insert_record;
  115 + EXIT WHEN NOT FOUND;
  116 + IF NOT EXISTS(SELECT key FROM ts_kv_dictionary WHERE key = insert_record.key) THEN
  117 + INSERT INTO ts_kv_dictionary(key) VALUES (insert_record.key);
  118 + RAISE NOTICE 'Key: % has been inserted into the dictionary!',insert_record.key;
  119 + ELSE
  120 + RAISE NOTICE 'Key: % already exists in the dictionary!',insert_record.key;
  121 + END IF;
  122 + END LOOP;
  123 + CLOSE key_cursor;
  124 +END;
  125 +$$ language 'plpgsql';
  126 +
  127 +-- select insert_into_tenant_ts_kv();
  128 +
  129 +CREATE OR REPLACE FUNCTION insert_into_tenant_ts_kv() RETURNS void AS
  130 +$$
  131 +DECLARE
  132 + insert_size CONSTANT integer := 10000;
  133 + insert_counter integer DEFAULT 0;
  134 + insert_record RECORD;
  135 + insert_cursor CURSOR FOR SELECT CONCAT(tenant_id_first, '-', tenant_id_second, '-1', tenant_id_third, '-', tenant_id_fourth, '-', tenant_id_fifth)::uuid AS tenant_id,
  136 + CONCAT(entity_id_first, '-', entity_id_second, '-1', entity_id_third, '-', entity_id_fourth, '-', entity_id_fifth)::uuid AS entity_id,
  137 + substrings.key AS key,
  138 + substrings.ts AS ts,
  139 + substrings.bool_v AS bool_v,
  140 + substrings.str_v AS str_v,
  141 + substrings.long_v AS long_v,
  142 + substrings.dbl_v AS dbl_v
  143 + FROM (SELECT SUBSTRING(tenant_id, 8, 8) AS tenant_id_first,
  144 + SUBSTRING(tenant_id, 4, 4) AS tenant_id_second,
  145 + SUBSTRING(tenant_id, 1, 3) AS tenant_id_third,
  146 + SUBSTRING(tenant_id, 16, 4) AS tenant_id_fourth,
  147 + SUBSTRING(tenant_id, 20) AS tenant_id_fifth,
  148 + SUBSTRING(entity_id, 8, 8) AS entity_id_first,
  149 + SUBSTRING(entity_id, 4, 4) AS entity_id_second,
  150 + SUBSTRING(entity_id, 1, 3) AS entity_id_third,
  151 + SUBSTRING(entity_id, 16, 4) AS entity_id_fourth,
  152 + SUBSTRING(entity_id, 20) AS entity_id_fifth,
  153 + key_id AS key,
  154 + ts,
  155 + bool_v,
  156 + str_v,
  157 + long_v,
  158 + dbl_v
  159 + FROM tenant_ts_kv_old
  160 + INNER JOIN ts_kv_dictionary ON (tenant_ts_kv_old.key = ts_kv_dictionary.key)) AS substrings;
  161 +BEGIN
  162 + OPEN insert_cursor;
  163 + LOOP
  164 + insert_counter := insert_counter + 1;
  165 + FETCH insert_cursor INTO insert_record;
  166 + IF NOT FOUND THEN
  167 + RAISE NOTICE '% records have been inserted into the new tenant_ts_kv table!',insert_counter - 1;
  168 + EXIT;
  169 + END IF;
  170 + INSERT INTO tenant_ts_kv(tenant_id, entity_id, key, ts, bool_v, str_v, long_v, dbl_v)
  171 + VALUES (insert_record.tenant_id, insert_record.entity_id, insert_record.key, insert_record.ts, insert_record.bool_v, insert_record.str_v,
  172 + insert_record.long_v, insert_record.dbl_v);
  173 + IF MOD(insert_counter, insert_size) = 0 THEN
  174 + RAISE NOTICE '% records have been inserted into the new tenant_ts_kv table!',insert_counter;
  175 + END IF;
  176 + END LOOP;
  177 + CLOSE insert_cursor;
  178 +END;
  179 +$$ LANGUAGE 'plpgsql';
  180 +
  181 +-- select insert_into_ts_kv_latest();
  182 +
  183 +CREATE OR REPLACE FUNCTION insert_into_ts_kv_latest() RETURNS void AS
  184 +$$
  185 +DECLARE
  186 + insert_size CONSTANT integer := 10000;
  187 + insert_counter integer DEFAULT 0;
  188 + latest_record RECORD;
  189 + insert_record RECORD;
  190 + insert_cursor CURSOR FOR SELECT
  191 + latest.key AS key,
  192 + latest.entity_id AS entity_id,
  193 + latest.ts AS ts
  194 + FROM (SELECT DISTINCT key AS key, entity_id AS entity_id, MAX(ts) AS ts FROM tenant_ts_kv GROUP BY key, entity_id) AS latest;
  195 +BEGIN
  196 + OPEN insert_cursor;
  197 + LOOP
  198 + insert_counter := insert_counter + 1;
  199 + FETCH insert_cursor INTO latest_record;
  200 + IF NOT FOUND THEN
  201 + RAISE NOTICE '% records have been inserted into the ts_kv_latest table!',insert_counter - 1;
  202 + EXIT;
  203 + END IF;
  204 + SELECT entity_id AS entity_id, key AS key, ts AS ts, bool_v AS bool_v, str_v AS str_v, long_v AS long_v, dbl_v AS dbl_v INTO insert_record FROM tenant_ts_kv WHERE entity_id = latest_record.entity_id AND key = latest_record.key AND ts = latest_record.ts;
  205 + INSERT INTO ts_kv_latest(entity_id, key, ts, bool_v, str_v, long_v, dbl_v)
  206 + VALUES (insert_record.entity_id, insert_record.key, insert_record.ts, insert_record.bool_v, insert_record.str_v, insert_record.long_v, insert_record.dbl_v);
  207 + IF MOD(insert_counter, insert_size) = 0 THEN
  208 + RAISE NOTICE '% records have been inserted into the ts_kv_latest table!',insert_counter;
  209 + END IF;
  210 + END LOOP;
  211 + CLOSE insert_cursor;
  212 +END;
  213 +$$ LANGUAGE 'plpgsql';
... ...
... ... @@ -53,7 +53,7 @@ public class ThingsboardInstallService {
53 53 @Autowired
54 54 private DatabaseEntitiesUpgradeService databaseEntitiesUpgradeService;
55 55
56   - @Autowired
  56 + @Autowired(required = false)
57 57 private DatabaseTsUpgradeService databaseTsUpgradeService;
58 58
59 59 @Autowired
... ...
... ... @@ -43,12 +43,15 @@ public class PsqlTsDatabaseUpgradeService implements DatabaseTsUpgradeService {
43 43 private static final String CALL_REGEX = "call ";
44 44 private static final String LOAD_FUNCTIONS_SQL = "schema_update_psql_ts.sql";
45 45 private static final String CHECK_VERSION = CALL_REGEX + "check_version()";
46   - private static final String CREATE_PARTITION_TABLE = CALL_REGEX + "create_partition_table()";
  46 + private static final String CREATE_PARTITION_TS_KV_TABLE = CALL_REGEX + "create_partition_ts_kv_table()";
  47 + private static final String CREATE_NEW_TS_KV_LATEST_TABLE = CALL_REGEX + "create_new_ts_kv_latest_table()";
47 48 private static final String CREATE_PARTITIONS = CALL_REGEX + "create_partitions()";
48 49 private static final String CREATE_TS_KV_DICTIONARY_TABLE = CALL_REGEX + "create_ts_kv_dictionary_table()";
49 50 private static final String INSERT_INTO_DICTIONARY = CALL_REGEX + "insert_into_dictionary()";
50 51 private static final String INSERT_INTO_TS_KV = CALL_REGEX + "insert_into_ts_kv()";
51   - private static final String DROP_OLD_TABLE = "DROP TABLE ts_kv_old;";
  52 + private static final String INSERT_INTO_TS_KV_LATEST = CALL_REGEX + "insert_into_ts_kv_latest()";
  53 + private static final String DROP_TABLE_TS_KV_OLD = "DROP TABLE ts_kv_old;";
  54 + private static final String DROP_TABLE_TS_KV_LATEST_OLD = "DROP TABLE ts_kv_latest_old;";
52 55
53 56 @Value("${spring.datasource.url}")
54 57 private String dbUrl;
... ... @@ -70,7 +73,6 @@ public class PsqlTsDatabaseUpgradeService implements DatabaseTsUpgradeService {
70 73 log.info("Updating timeseries schema ...");
71 74 log.info("Load upgrade functions ...");
72 75 loadSql(conn);
73   - log.info("Upgrade functions successfully loaded!");
74 76 boolean versionValid = checkVersion(conn);
75 77 if (!versionValid) {
76 78 log.info("PostgreSQL version should be at least more than 10!");
... ... @@ -78,12 +80,15 @@ public class PsqlTsDatabaseUpgradeService implements DatabaseTsUpgradeService {
78 80 } else {
79 81 log.info("PostgreSQL version is valid!");
80 82 log.info("Updating schema ...");
81   - executeFunction(conn, CREATE_PARTITION_TABLE);
  83 + executeFunction(conn, CREATE_PARTITION_TS_KV_TABLE);
82 84 executeFunction(conn, CREATE_PARTITIONS);
83 85 executeFunction(conn, CREATE_TS_KV_DICTIONARY_TABLE);
84 86 executeFunction(conn, INSERT_INTO_DICTIONARY);
85 87 executeFunction(conn, INSERT_INTO_TS_KV);
86   - dropOldTable(conn, DROP_OLD_TABLE);
  88 + executeFunction(conn, CREATE_NEW_TS_KV_LATEST_TABLE);
  89 + executeFunction(conn, INSERT_INTO_TS_KV_LATEST);
  90 + dropOldTable(conn, DROP_TABLE_TS_KV_OLD);
  91 + dropOldTable(conn, DROP_TABLE_TS_KV_LATEST_OLD);
87 92 log.info("schema timeseries updated!");
88 93 }
89 94 }
... ... @@ -97,6 +102,7 @@ public class PsqlTsDatabaseUpgradeService implements DatabaseTsUpgradeService {
97 102 Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", LOAD_FUNCTIONS_SQL);
98 103 try {
99 104 loadFunctions(schemaUpdateFile, conn);
  105 + log.info("Upgrade functions successfully loaded!");
100 106 } catch (Exception e) {
101 107 log.info("Failed to load PostgreSQL upgrade functions due to: {}", e.getMessage());
102 108 }
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.install;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.beans.factory.annotation.Autowired;
  20 +import org.springframework.beans.factory.annotation.Value;
  21 +import org.springframework.context.annotation.Profile;
  22 +import org.springframework.stereotype.Service;
  23 +import org.thingsboard.server.dao.util.PsqlDao;
  24 +import org.thingsboard.server.dao.util.TimescaleDBTsDao;
  25 +
  26 +import java.nio.charset.StandardCharsets;
  27 +import java.nio.file.Files;
  28 +import java.nio.file.Path;
  29 +import java.nio.file.Paths;
  30 +import java.sql.CallableStatement;
  31 +import java.sql.Connection;
  32 +import java.sql.DriverManager;
  33 +import java.sql.SQLException;
  34 +import java.sql.Types;
  35 +
  36 +@Service
  37 +@Profile("install")
  38 +@Slf4j
  39 +@TimescaleDBTsDao
  40 +@PsqlDao
  41 +public class SqlTimescaleDatabaseUpgradeService implements DatabaseTsUpgradeService {
  42 +
  43 + private static final String CALL_REGEX = "call ";
  44 + private static final String LOAD_FUNCTIONS_SQL = "schema_update_timescale_ts.sql";
  45 + private static final String CHECK_VERSION = CALL_REGEX + "check_version()";
  46 + private static final String CREATE_TS_KV_LATEST_TABLE = CALL_REGEX + "create_ts_kv_latest_table()";
  47 + private static final String CREATE_TENANT_TS_KV_TABLE_COPY = CALL_REGEX + "create_tenant_ts_kv_table_copy()";
  48 + private static final String CREATE_TS_KV_DICTIONARY_TABLE = CALL_REGEX + "create_ts_kv_dictionary_table()";
  49 + private static final String INSERT_INTO_DICTIONARY = CALL_REGEX + "insert_into_dictionary()";
  50 + private static final String INSERT_INTO_TS_KV = CALL_REGEX + "insert_into_tenant_ts_kv()";
  51 + private static final String INSERT_INTO_TS_KV_LATEST = CALL_REGEX + "insert_into_ts_kv_latest()";
  52 + private static final String DROP_OLD_TS_KV_TABLE = "DROP TABLE tenant_ts_kv_old;";
  53 +
  54 + @Value("${spring.datasource.url}")
  55 + private String dbUrl;
  56 +
  57 + @Value("${spring.datasource.username}")
  58 + private String dbUserName;
  59 +
  60 + @Value("${spring.datasource.password}")
  61 + private String dbPassword;
  62 +
  63 + @Autowired
  64 + private InstallScripts installScripts;
  65 +
  66 + @Override
  67 + public void upgradeDatabase(String fromVersion) throws Exception {
  68 + switch (fromVersion) {
  69 + case "2.4.3":
  70 + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
  71 + log.info("Updating timescale schema ...");
  72 + log.info("Load upgrade functions ...");
  73 + loadSql(conn);
  74 + boolean versionValid = checkVersion(conn);
  75 + if (!versionValid) {
  76 + log.info("PostgreSQL version should be at least more than 9.6!");
  77 + log.info("Please upgrade your PostgreSQL and restart the script!");
  78 + } else {
  79 + log.info("PostgreSQL version is valid!");
  80 + log.info("Updating schema ...");
  81 + executeFunction(conn, CREATE_TS_KV_LATEST_TABLE);
  82 + executeFunction(conn, CREATE_TENANT_TS_KV_TABLE_COPY);
  83 + executeFunction(conn, CREATE_TS_KV_DICTIONARY_TABLE);
  84 + executeFunction(conn, INSERT_INTO_DICTIONARY);
  85 + executeFunction(conn, INSERT_INTO_TS_KV);
  86 + executeFunction(conn, INSERT_INTO_TS_KV_LATEST);
  87 + executeQuery(conn, DROP_OLD_TS_KV_TABLE);
  88 + log.info("schema timeseries updated!");
  89 + }
  90 + }
  91 + break;
  92 + default:
  93 + throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion);
  94 + }
  95 + }
  96 +
  97 + private void loadSql(Connection conn) {
  98 + Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", LOAD_FUNCTIONS_SQL);
  99 + try {
  100 + loadFunctions(schemaUpdateFile, conn);
  101 + log.info("Upgrade functions successfully loaded!");
  102 + } catch (Exception e) {
  103 + log.info("Failed to load Timescale upgrade functions due to: {}", e.getMessage());
  104 + }
  105 + }
  106 +
  107 + private void loadFunctions(Path sqlFile, Connection conn) throws Exception {
  108 + String sql = new String(Files.readAllBytes(sqlFile), StandardCharsets.UTF_8);
  109 + conn.createStatement().execute(sql); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
  110 + }
  111 +
  112 + private boolean checkVersion(Connection conn) {
  113 + log.info("Check the current PostgreSQL version...");
  114 + boolean versionValid = false;
  115 + try {
  116 + CallableStatement callableStatement = conn.prepareCall("{? = " + CHECK_VERSION + " }");
  117 + callableStatement.registerOutParameter(1, Types.BOOLEAN);
  118 + callableStatement.execute();
  119 + versionValid = callableStatement.getBoolean(1);
  120 + callableStatement.close();
  121 + } catch (Exception e) {
  122 + log.info("Failed to check current PostgreSQL version due to: {}", e.getMessage());
  123 + }
  124 + return versionValid;
  125 + }
  126 +
  127 + private void executeFunction(Connection conn, String query) {
  128 + log.info("{} ... ", query);
  129 + try {
  130 + CallableStatement callableStatement = conn.prepareCall("{" + query + "}");
  131 + callableStatement.execute();
  132 + callableStatement.close();
  133 + log.info("Successfully executed: {}", query.replace(CALL_REGEX, ""));
  134 + } catch (Exception e) {
  135 + log.info("Failed to execute {} due to: {}", query, e.getMessage());
  136 + }
  137 + }
  138 +
  139 + private void executeQuery(Connection conn, String query) {
  140 + try {
  141 + conn.createStatement().execute(query); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
  142 + Thread.sleep(5000);
  143 + } catch (InterruptedException | SQLException e) {
  144 + log.info("Failed to drop table {} due to: {}", query.replace("DROP TABLE ", ""), e.getMessage());
  145 + }
  146 + }
  147 +}
\ No newline at end of file
... ...
... ... @@ -208,10 +208,6 @@ sql:
208 208 batch_size: "${SQL_TS_LATEST_BATCH_SIZE:10000}"
209 209 batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:100}"
210 210 stats_print_interval_ms: "${SQL_TS_LATEST_BATCH_STATS_PRINT_MS:10000}"
211   - ts_timescale:
212   - batch_size: "${SQL_TS_TIMESCALE_BATCH_SIZE:10000}"
213   - batch_max_delay: "${SQL_TS_TIMESCALE_BATCH_MAX_DELAY_MS:100}"
214   - stats_print_interval_ms: "${SQL_TS_TIMESCALE_BATCH_STATS_PRINT_MS:10000}"
215 211 # Specify whether to remove null characters from strValue of attributes and timeseries before insert
216 212 remove_null_chars: "${SQL_REMOVE_NULL_CHARS:true}"
217 213 # Specify partitioning size for timestamp key-value storage. Example: DAYS, MONTHS, YEARS, INDEFINITE
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.util;
  17 +
  18 +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  19 +
  20 +@ConditionalOnExpression("'${database.ts.type}'=='sql' || '${database.ts.type}'=='timescale'")
  21 +public @interface SqlTsAnyDao {
  22 +}
... ...
common/dao-api/src/main/java/org/thingsboard/server/dao/util/TimescaleDBTsDao.java renamed from dao/src/main/java/org/thingsboard/server/dao/util/TimescaleDBTsDao.java
... ... @@ -27,8 +27,8 @@ import org.thingsboard.server.dao.util.SqlTsDao;
27 27 @Configuration
28 28 @EnableAutoConfiguration
29 29 @ComponentScan({"org.thingsboard.server.dao.sqlts.hsql", "org.thingsboard.server.dao.sqlts.latest"})
30   -@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.hsql", "org.thingsboard.server.dao.sqlts.latest"})
31   -@EntityScan({"org.thingsboard.server.dao.model.sqlts.hsql", "org.thingsboard.server.dao.model.sqlts.latest"})
  30 +@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.hsql", "org.thingsboard.server.dao.sqlts.latest", "org.thingsboard.server.dao.sqlts.dictionary"})
  31 +@EntityScan({"org.thingsboard.server.dao.model.sqlts.hsql", "org.thingsboard.server.dao.model.sqlts.latest", "org.thingsboard.server.dao.model.sqlts.dictionary"})
32 32 @EnableTransactionManagement
33 33 @SqlTsDao
34 34 @HsqlDao
... ...
... ... @@ -22,7 +22,6 @@ import org.springframework.context.annotation.Configuration;
22 22 import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
23 23 import org.springframework.transaction.annotation.EnableTransactionManagement;
24 24 import org.thingsboard.server.dao.util.SqlDao;
25   -import org.thingsboard.server.dao.util.TimescaleDBTsDao;
26 25
27 26 /**
28 27 * @author Valerii Sosliuk
... ...
... ... @@ -21,6 +21,7 @@ import org.springframework.context.annotation.ComponentScan;
21 21 import org.springframework.context.annotation.Configuration;
22 22 import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
23 23 import org.springframework.transaction.annotation.EnableTransactionManagement;
  24 +import org.thingsboard.server.dao.util.PsqlDao;
24 25 import org.thingsboard.server.dao.util.TimescaleDBTsDao;
25 26
26 27 @Configuration
... ... @@ -30,6 +31,7 @@ import org.thingsboard.server.dao.util.TimescaleDBTsDao;
30 31 @EntityScan({"org.thingsboard.server.dao.model.sqlts.timescale", "org.thingsboard.server.dao.model.sqlts.dictionary", "org.thingsboard.server.dao.model.sqlts.latest"})
31 32 @EnableTransactionManagement
32 33 @TimescaleDBTsDao
  34 +@PsqlDao
33 35 public class TimescaleDaoConfig {
34 36
35 37 }
\ No newline at end of file
... ...
... ... @@ -16,20 +16,32 @@
16 16 package org.thingsboard.server.dao.model.sql;
17 17
18 18 import lombok.Data;
  19 +import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
  20 +import org.thingsboard.server.common.data.kv.BooleanDataEntry;
  21 +import org.thingsboard.server.common.data.kv.DoubleDataEntry;
  22 +import org.thingsboard.server.common.data.kv.KvEntry;
  23 +import org.thingsboard.server.common.data.kv.LongDataEntry;
  24 +import org.thingsboard.server.common.data.kv.StringDataEntry;
  25 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  26 +import org.thingsboard.server.dao.model.ToData;
19 27
20 28 import javax.persistence.Column;
21 29 import javax.persistence.Id;
22 30 import javax.persistence.MappedSuperclass;
  31 +import javax.persistence.Transient;
  32 +
  33 +import java.util.UUID;
23 34
24 35 import static org.thingsboard.server.dao.model.ModelConstants.BOOLEAN_VALUE_COLUMN;
25 36 import static org.thingsboard.server.dao.model.ModelConstants.DOUBLE_VALUE_COLUMN;
  37 +import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN;
26 38 import static org.thingsboard.server.dao.model.ModelConstants.LONG_VALUE_COLUMN;
27 39 import static org.thingsboard.server.dao.model.ModelConstants.STRING_VALUE_COLUMN;
28 40 import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN;
29 41
30 42 @Data
31 43 @MappedSuperclass
32   -public abstract class AbstractTsKvEntity {
  44 +public abstract class AbstractTsKvEntity implements ToData<TsKvEntry> {
33 45
34 46 protected static final String SUM = "SUM";
35 47 protected static final String AVG = "AVG";
... ... @@ -37,6 +49,10 @@ public abstract class AbstractTsKvEntity {
37 49 protected static final String MAX = "MAX";
38 50
39 51 @Id
  52 + @Column(name = ENTITY_ID_COLUMN, columnDefinition = "uuid")
  53 + protected UUID entityId;
  54 +
  55 + @Id
40 56 @Column(name = TS_COLUMN)
41 57 protected Long ts;
42 58
... ... @@ -52,6 +68,9 @@ public abstract class AbstractTsKvEntity {
52 68 @Column(name = DOUBLE_VALUE_COLUMN)
53 69 protected Double doubleValue;
54 70
  71 + @Transient
  72 + protected String strKey;
  73 +
55 74 public abstract boolean isNotEmpty();
56 75
57 76 protected static boolean isAllNull(Object... args) {
... ... @@ -62,4 +81,20 @@ public abstract class AbstractTsKvEntity {
62 81 }
63 82 return true;
64 83 }
  84 +
  85 + @Override
  86 + public TsKvEntry toData() {
  87 + KvEntry kvEntry = null;
  88 + if (strValue != null) {
  89 + kvEntry = new StringDataEntry(strKey, strValue);
  90 + } else if (longValue != null) {
  91 + kvEntry = new LongDataEntry(strKey, longValue);
  92 + } else if (doubleValue != null) {
  93 + kvEntry = new DoubleDataEntry(strKey, doubleValue);
  94 + } else if (booleanValue != null) {
  95 + kvEntry = new BooleanDataEntry(strKey, booleanValue);
  96 + }
  97 + return new BasicTsKvEntry(ts, kvEntry);
  98 + }
  99 +
65 100 }
\ No newline at end of file
... ...
... ... @@ -38,7 +38,7 @@ public final class TsKvDictionary {
38 38 @Column(name = KEY_COLUMN)
39 39 private String key;
40 40
41   - @Column(name = KEY_ID_COLUMN, unique = true, columnDefinition="serial")
  41 + @Column(name = KEY_ID_COLUMN, unique = true, columnDefinition="int")
42 42 @Generated(GenerationTime.INSERT)
43 43 private int keyId;
44 44
... ...
... ... @@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.EntityType;
22 22
23 23 import javax.persistence.Transient;
24 24 import java.io.Serializable;
  25 +import java.util.UUID;
25 26
26 27 @Data
27 28 @AllArgsConstructor
... ... @@ -31,9 +32,8 @@ public class TsKvCompositeKey implements Serializable {
31 32 @Transient
32 33 private static final long serialVersionUID = -4089175869616037523L;
33 34
34   - private EntityType entityType;
35   - private String entityId;
36   - private String key;
  35 + private UUID entityId;
  36 + private int key;
37 37 private long ts;
38 38
39 39 }
\ No newline at end of file
... ...
... ... @@ -34,6 +34,9 @@ import javax.persistence.Enumerated;
34 34 import javax.persistence.Id;
35 35 import javax.persistence.IdClass;
36 36 import javax.persistence.Table;
  37 +import javax.persistence.Transient;
  38 +
  39 +import java.util.UUID;
37 40
38 41 import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN;
39 42 import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_TYPE_COLUMN;
... ... @@ -46,17 +49,8 @@ import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN;
46 49 public final class TsKvEntity extends AbstractTsKvEntity implements ToData<TsKvEntry> {
47 50
48 51 @Id
49   - @Enumerated(EnumType.STRING)
50   - @Column(name = ENTITY_TYPE_COLUMN)
51   - private EntityType entityType;
52   -
53   - @Id
54   - @Column(name = ENTITY_ID_COLUMN)
55   - private String entityId;
56   -
57   - @Id
58 52 @Column(name = KEY_COLUMN)
59   - private String key;
  53 + private int key;
60 54
61 55 public TsKvEntity() {
62 56 }
... ... @@ -120,19 +114,4 @@ public final class TsKvEntity extends AbstractTsKvEntity implements ToData<TsKvE
120 114 public boolean isNotEmpty() {
121 115 return strValue != null || longValue != null || doubleValue != null || booleanValue != null;
122 116 }
123   -
124   - @Override
125   - public TsKvEntry toData() {
126   - KvEntry kvEntry = null;
127   - if (strValue != null) {
128   - kvEntry = new StringDataEntry(key, strValue);
129   - } else if (longValue != null) {
130   - kvEntry = new LongDataEntry(key, longValue);
131   - } else if (doubleValue != null) {
132   - kvEntry = new DoubleDataEntry(key, doubleValue);
133   - } else if (booleanValue != null) {
134   - kvEntry = new BooleanDataEntry(key, booleanValue);
135   - }
136   - return new BasicTsKvEntry(ts, kvEntry);
137   - }
138 117 }
\ No newline at end of file
... ...
... ... @@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.EntityType;
22 22
23 23 import javax.persistence.Transient;
24 24 import java.io.Serializable;
  25 +import java.util.UUID;
25 26
26 27 @Data
27 28 @NoArgsConstructor
... ... @@ -31,7 +32,6 @@ public class TsKvLatestCompositeKey implements Serializable{
31 32 @Transient
32 33 private static final long serialVersionUID = -4089175869616037523L;
33 34
34   - private EntityType entityType;
35   - private String entityId;
36   - private String key;
  35 + private UUID entityId;
  36 + private int key;
37 37 }
\ No newline at end of file
... ...
... ... @@ -16,66 +16,78 @@
16 16 package org.thingsboard.server.dao.model.sqlts.latest;
17 17
18 18 import lombok.Data;
19   -import org.thingsboard.server.common.data.EntityType;
20   -import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
21   -import org.thingsboard.server.common.data.kv.BooleanDataEntry;
22   -import org.thingsboard.server.common.data.kv.DoubleDataEntry;
23   -import org.thingsboard.server.common.data.kv.KvEntry;
24   -import org.thingsboard.server.common.data.kv.LongDataEntry;
25   -import org.thingsboard.server.common.data.kv.StringDataEntry;
26   -import org.thingsboard.server.common.data.kv.TsKvEntry;
27   -import org.thingsboard.server.dao.model.ToData;
28 19 import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
  20 +import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository;
29 21
30 22 import javax.persistence.Column;
  23 +import javax.persistence.ColumnResult;
  24 +import javax.persistence.ConstructorResult;
31 25 import javax.persistence.Entity;
32   -import javax.persistence.EnumType;
33   -import javax.persistence.Enumerated;
34 26 import javax.persistence.Id;
35 27 import javax.persistence.IdClass;
  28 +import javax.persistence.NamedNativeQueries;
  29 +import javax.persistence.NamedNativeQuery;
  30 +import javax.persistence.SqlResultSetMapping;
  31 +import javax.persistence.SqlResultSetMappings;
36 32 import javax.persistence.Table;
  33 +import java.util.UUID;
37 34
38   -import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN;
39   -import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_TYPE_COLUMN;
40 35 import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN;
41 36
42 37 @Data
43 38 @Entity
44 39 @Table(name = "ts_kv_latest")
45 40 @IdClass(TsKvLatestCompositeKey.class)
46   -public final class TsKvLatestEntity extends AbstractTsKvEntity implements ToData<TsKvEntry> {
  41 +@SqlResultSetMappings({
  42 + @SqlResultSetMapping(
  43 + name = "tsKvLatestFindMapping",
  44 + classes = {
  45 + @ConstructorResult(
  46 + targetClass = TsKvLatestEntity.class,
  47 + columns = {
  48 + @ColumnResult(name = "entityId", type = UUID.class),
  49 + @ColumnResult(name = "key", type = Integer.class),
  50 + @ColumnResult(name = "strKey", type = String.class),
  51 + @ColumnResult(name = "strValue", type = String.class),
  52 + @ColumnResult(name = "boolValue", type = Boolean.class),
  53 + @ColumnResult(name = "longValue", type = Long.class),
  54 + @ColumnResult(name = "doubleValue", type = Double.class),
  55 + @ColumnResult(name = "ts", type = Long.class),
47 56
48   - @Id
49   - @Enumerated(EnumType.STRING)
50   - @Column(name = ENTITY_TYPE_COLUMN)
51   - private EntityType entityType;
52   -
53   - @Id
54   - @Column(name = ENTITY_ID_COLUMN)
55   - private String entityId;
  57 + }
  58 + ),
  59 + })
  60 +})
  61 +@NamedNativeQueries({
  62 + @NamedNativeQuery(
  63 + name = SearchTsKvLatestRepository.FIND_ALL_BY_ENTITY_ID,
  64 + query = SearchTsKvLatestRepository.FIND_ALL_BY_ENTITY_ID_QUERY,
  65 + resultSetMapping = "tsKvLatestFindMapping",
  66 + resultClass = TsKvLatestEntity.class
  67 + )
  68 +})
  69 +public final class TsKvLatestEntity extends AbstractTsKvEntity {
56 70
57 71 @Id
58 72 @Column(name = KEY_COLUMN)
59   - private String key;
  73 + private int key;
60 74
61 75 @Override
62 76 public boolean isNotEmpty() {
63 77 return strValue != null || longValue != null || doubleValue != null || booleanValue != null;
64 78 }
65 79
66   - @Override
67   - public TsKvEntry toData() {
68   - KvEntry kvEntry = null;
69   - if (strValue != null) {
70   - kvEntry = new StringDataEntry(key, strValue);
71   - } else if (longValue != null) {
72   - kvEntry = new LongDataEntry(key, longValue);
73   - } else if (doubleValue != null) {
74   - kvEntry = new DoubleDataEntry(key, doubleValue);
75   - } else if (booleanValue != null) {
76   - kvEntry = new BooleanDataEntry(key, booleanValue);
77   - }
78   - return new BasicTsKvEntry(ts, kvEntry);
  80 + public TsKvLatestEntity() {
79 81 }
80 82
  83 + public TsKvLatestEntity(UUID entityId, Integer key, String strKey, String strValue, Boolean boolValue, Long longValue, Double doubleValue, Long ts) {
  84 + this.entityId = entityId;
  85 + this.key = key;
  86 + this.ts = ts;
  87 + this.longValue = longValue;
  88 + this.doubleValue = doubleValue;
  89 + this.strValue = strValue;
  90 + this.booleanValue = boolValue;
  91 + this.strKey = strKey;
  92 + }
81 93 }
... ...
... ... @@ -41,18 +41,11 @@ import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN;
41 41 @Entity
42 42 @Table(name = "ts_kv")
43 43 @IdClass(TsKvCompositeKey.class)
44   -public final class TsKvEntity extends AbstractTsKvEntity implements ToData<TsKvEntry> {
45   -
46   - @Id
47   - @Column(name = ENTITY_ID_COLUMN, columnDefinition = "uuid")
48   - protected UUID entityId;
  44 +public final class TsKvEntity extends AbstractTsKvEntity {
49 45
50 46 @Id
51 47 @Column(name = KEY_COLUMN)
52   - protected int key;
53   -
54   - @Transient
55   - protected String strKey;
  48 + private int key;
56 49
57 50 public TsKvEntity() {
58 51 }
... ... @@ -116,20 +109,4 @@ public final class TsKvEntity extends AbstractTsKvEntity implements ToData<TsKvE
116 109 public boolean isNotEmpty() {
117 110 return strValue != null || longValue != null || doubleValue != null || booleanValue != null;
118 111 }
119   -
120   - @Override
121   - public TsKvEntry toData() {
122   - KvEntry kvEntry = null;
123   - if (strValue != null) {
124   - kvEntry = new StringDataEntry(strKey, strValue);
125   - } else if (longValue != null) {
126   - kvEntry = new LongDataEntry(strKey, longValue);
127   - } else if (doubleValue != null) {
128   - kvEntry = new DoubleDataEntry(strKey, doubleValue);
129   - } else if (booleanValue != null) {
130   - kvEntry = new BooleanDataEntry(strKey, booleanValue);
131   - }
132   - return new BasicTsKvEntry(ts, kvEntry);
133   - }
134   -
135 112 }
... ...
... ... @@ -130,16 +130,8 @@ public final class TimescaleTsKvEntity extends AbstractTsKvEntity implements ToD
130 130 private UUID tenantId;
131 131
132 132 @Id
133   - @Column(name = ENTITY_ID_COLUMN, columnDefinition = "uuid")
134   - protected UUID entityId;
135   -
136   - @Id
137 133 @Column(name = KEY_COLUMN)
138   - protected int key;
139   -
140   - @Transient
141   - protected String strKey;
142   -
  134 + private int key;
143 135
144 136 public TimescaleTsKvEntity() {
145 137 }
... ... @@ -204,20 +196,4 @@ public final class TimescaleTsKvEntity extends AbstractTsKvEntity implements ToD
204 196 public boolean isNotEmpty() {
205 197 return ts != null && (strValue != null || longValue != null || doubleValue != null || booleanValue != null);
206 198 }
207   -
208   - @Override
209   - public TsKvEntry toData() {
210   - KvEntry kvEntry = null;
211   - if (strValue != null) {
212   - kvEntry = new StringDataEntry(strKey, strValue);
213   - } else if (longValue != null) {
214   - kvEntry = new LongDataEntry(strKey, longValue);
215   - } else if (doubleValue != null) {
216   - kvEntry = new DoubleDataEntry(strKey, doubleValue);
217   - } else if (booleanValue != null) {
218   - kvEntry = new BooleanDataEntry(strKey, booleanValue);
219   - }
220   - return new BasicTsKvEntry(ts, kvEntry);
221   - }
222   -
223 199 }
\ No newline at end of file
... ...
dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractPsqlHsqlTimeseriesDao.java renamed from dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSimpleSqlTimeseriesDao.java
... ... @@ -15,16 +15,13 @@
15 15 */
16 16 package org.thingsboard.server.dao.sqlts;
17 17
18   -import com.google.common.util.concurrent.Futures;
19 18 import com.google.common.util.concurrent.ListenableFuture;
20 19 import com.google.common.util.concurrent.SettableFuture;
21 20 import lombok.extern.slf4j.Slf4j;
22 21 import org.springframework.beans.factory.annotation.Autowired;
23   -import org.springframework.beans.factory.annotation.Value;
24 22 import org.thingsboard.server.common.data.id.EntityId;
25 23 import org.thingsboard.server.common.data.id.TenantId;
26 24 import org.thingsboard.server.common.data.kv.Aggregation;
27   -import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
28 25 import org.thingsboard.server.common.data.kv.TsKvEntry;
29 26 import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
30 27 import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
... ... @@ -32,26 +29,16 @@ import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
32 29
33 30 import javax.annotation.PostConstruct;
34 31 import javax.annotation.PreDestroy;
35   -import java.util.ArrayList;
36 32 import java.util.List;
37 33 import java.util.Optional;
38 34 import java.util.concurrent.CompletableFuture;
39 35 import java.util.stream.Collectors;
40 36
41 37 @Slf4j
42   -public abstract class AbstractSimpleSqlTimeseriesDao<T extends AbstractTsKvEntity> extends AbstractSqlTimeseriesDao {
  38 +public abstract class AbstractPsqlHsqlTimeseriesDao<T extends AbstractTsKvEntity> extends AbstractSqlTimeseriesDao {
43 39
44 40 @Autowired
45   - private InsertTsRepository<T> insertRepository;
46   -
47   - @Value("${sql.ts.batch_size:1000}")
48   - private int tsBatchSize;
49   -
50   - @Value("${sql.ts.batch_max_delay:100}")
51   - private long tsMaxDelay;
52   -
53   - @Value("${sql.ts.stats_print_interval_ms:1000}")
54   - private long tsStatsPrintIntervalMs;
  41 + protected InsertTsRepository<T> insertRepository;
55 42
56 43 protected TbSqlBlockingQueue<EntityContainer<T>> tsQueue;
57 44
... ... @@ -76,26 +63,39 @@ public abstract class AbstractSimpleSqlTimeseriesDao<T extends AbstractTsKvEntit
76 63 }
77 64 }
78 65
79   - protected ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
80   - if (query.getAggregation() == Aggregation.NONE) {
81   - return findAllAsyncWithLimit(entityId, query);
82   - } else {
83   - long stepTs = query.getStartTs();
84   - List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
85   - while (stepTs < query.getEndTs()) {
86   - long startTs = stepTs;
87   - long endTs = stepTs + query.getInterval();
88   - long ts = startTs + (endTs - startTs) / 2;
89   - futures.add(findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));
90   - stepTs = endTs;
91   - }
92   - return getTskvEntriesFuture(Futures.allAsList(futures));
  66 + protected abstract ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation);
  67 +
  68 + protected void switchAgregation(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, Aggregation aggregation, List<CompletableFuture<T>> entitiesFutures) {
  69 + switch (aggregation) {
  70 + case AVG:
  71 + findAvg(tenantId, entityId, key, startTs, endTs, entitiesFutures);
  72 + break;
  73 + case MAX:
  74 + findMax(tenantId, entityId, key, startTs, endTs, entitiesFutures);
  75 + break;
  76 + case MIN:
  77 + findMin(tenantId, entityId, key, startTs, endTs, entitiesFutures);
  78 + break;
  79 + case SUM:
  80 + findSum(tenantId, entityId, key, startTs, endTs, entitiesFutures);
  81 + break;
  82 + case COUNT:
  83 + findCount(tenantId, entityId, key, startTs, endTs, entitiesFutures);
  84 + break;
  85 + default:
  86 + throw new IllegalArgumentException("Not supported aggregation type: " + aggregation);
93 87 }
94 88 }
95 89
96   - protected abstract ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation);
  90 + protected abstract void findCount(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures);
  91 +
  92 + protected abstract void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures);
  93 +
  94 + protected abstract void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures);
  95 +
  96 + protected abstract void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures);
97 97
98   - protected abstract ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query);
  98 + protected abstract void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures);
99 99
100 100 protected SettableFuture<T> setFutures(List<CompletableFuture<T>> entitiesFutures) {
101 101 SettableFuture<T> listenableFuture = SettableFuture.create();
... ... @@ -121,36 +121,4 @@ public abstract class AbstractSimpleSqlTimeseriesDao<T extends AbstractTsKvEntit
121 121 });
122 122 return listenableFuture;
123 123 }
124   -
125   - protected void switchAgregation(EntityId entityId, String key, long startTs, long endTs, Aggregation aggregation, List<CompletableFuture<T>> entitiesFutures) {
126   - switch (aggregation) {
127   - case AVG:
128   - findAvg(entityId, key, startTs, endTs, entitiesFutures);
129   - break;
130   - case MAX:
131   - findMax(entityId, key, startTs, endTs, entitiesFutures);
132   - break;
133   - case MIN:
134   - findMin(entityId, key, startTs, endTs, entitiesFutures);
135   - break;
136   - case SUM:
137   - findSum(entityId, key, startTs, endTs, entitiesFutures);
138   - break;
139   - case COUNT:
140   - findCount(entityId, key, startTs, endTs, entitiesFutures);
141   - break;
142   - default:
143   - throw new IllegalArgumentException("Not supported aggregation type: " + aggregation);
144   - }
145   - }
146   -
147   - protected abstract void findCount(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures);
148   -
149   - protected abstract void findSum(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures);
150   -
151   - protected abstract void findMin(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures);
152   -
153   - protected abstract void findMax(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures);
154   -
155   - protected abstract void findAvg(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures);
156   -}
\ No newline at end of file
  124 +}
... ...
... ... @@ -21,9 +21,9 @@ import com.google.common.util.concurrent.FutureCallback;
21 21 import com.google.common.util.concurrent.Futures;
22 22 import com.google.common.util.concurrent.ListenableFuture;
23 23 import lombok.extern.slf4j.Slf4j;
  24 +import org.hibernate.exception.ConstraintViolationException;
24 25 import org.springframework.beans.factory.annotation.Autowired;
25 26 import org.springframework.beans.factory.annotation.Value;
26   -import org.thingsboard.server.common.data.UUIDConverter;
27 27 import org.thingsboard.server.common.data.id.EntityId;
28 28 import org.thingsboard.server.common.data.id.TenantId;
29 29 import org.thingsboard.server.common.data.kv.Aggregation;
... ... @@ -34,12 +34,17 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
34 34 import org.thingsboard.server.common.data.kv.StringDataEntry;
35 35 import org.thingsboard.server.common.data.kv.TsKvEntry;
36 36 import org.thingsboard.server.dao.DaoUtil;
  37 +import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
  38 +import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionary;
  39 +import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionaryCompositeKey;
37 40 import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestCompositeKey;
38 41 import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
39 42 import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
40 43 import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
41 44 import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
42 45 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
  46 +import org.thingsboard.server.dao.sqlts.dictionary.TsKvDictionaryRepository;
  47 +import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository;
43 48 import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository;
44 49 import org.thingsboard.server.dao.timeseries.SimpleListenableFuture;
45 50
... ... @@ -49,24 +54,34 @@ import javax.annotation.PreDestroy;
49 54 import java.util.List;
50 55 import java.util.Objects;
51 56 import java.util.Optional;
  57 +import java.util.concurrent.ConcurrentHashMap;
  58 +import java.util.concurrent.ConcurrentMap;
52 59 import java.util.concurrent.ExecutionException;
  60 +import java.util.concurrent.locks.ReentrantLock;
53 61 import java.util.stream.Collectors;
54 62
55   -import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID;
56   -
57 63 @Slf4j
58 64 public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningExecutorService {
59 65
60 66 private static final String DESC_ORDER = "DESC";
61 67
  68 + private final ConcurrentMap<String, Integer> tsKvDictionaryMap = new ConcurrentHashMap<>();
  69 +
  70 + private static final ReentrantLock tsCreationLock = new ReentrantLock();
  71 +
62 72 @Autowired
63 73 private TsKvLatestRepository tsKvLatestRepository;
64 74
65 75 @Autowired
  76 + private SearchTsKvLatestRepository searchTsKvLatestRepository;
  77 +
  78 + @Autowired
66 79 private InsertLatestRepository insertLatestRepository;
67 80
68 81 @Autowired
69   - protected ScheduledLogExecutorComponent logExecutor;
  82 + private TsKvDictionaryRepository dictionaryRepository;
  83 +
  84 + private TbSqlBlockingQueue<TsKvLatestEntity> tsLatestQueue;
70 85
71 86 @Value("${sql.ts_latest.batch_size:1000}")
72 87 private int tsLatestBatchSize;
... ... @@ -77,7 +92,17 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
77 92 @Value("${sql.ts_latest.stats_print_interval_ms:1000}")
78 93 private long tsLatestStatsPrintIntervalMs;
79 94
80   - private TbSqlBlockingQueue<TsKvLatestEntity> tsLatestQueue;
  95 + @Autowired
  96 + protected ScheduledLogExecutorComponent logExecutor;
  97 +
  98 + @Value("${sql.ts.batch_size:1000}")
  99 + protected int tsBatchSize;
  100 +
  101 + @Value("${sql.ts.batch_max_delay:100}")
  102 + protected long tsMaxDelay;
  103 +
  104 + @Value("${sql.ts.stats_print_interval_ms:1000}")
  105 + protected long tsStatsPrintIntervalMs;
81 106
82 107 @PostConstruct
83 108 protected void init() {
... ... @@ -120,6 +145,8 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
120 145
121 146 protected abstract ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query);
122 147
  148 + protected abstract ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query);
  149 +
123 150 protected ListenableFuture<List<TsKvEntry>> getTskvEntriesFuture(ListenableFuture<List<Optional<TsKvEntry>>> future) {
124 151 return Futures.transform(future, new Function<List<Optional<TsKvEntry>>, List<TsKvEntry>>() {
125 152 @Nullable
... ... @@ -147,13 +174,14 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
147 174 protected ListenableFuture<TsKvEntry> getFindLatestFuture(EntityId entityId, String key) {
148 175 TsKvLatestCompositeKey compositeKey =
149 176 new TsKvLatestCompositeKey(
150   - entityId.getEntityType(),
151   - fromTimeUUID(entityId.getId()),
152   - key);
  177 + entityId.getId(),
  178 + getOrSaveKeyId(key));
153 179 Optional<TsKvLatestEntity> entry = tsKvLatestRepository.findById(compositeKey);
154 180 TsKvEntry result;
155 181 if (entry.isPresent()) {
156   - result = DaoUtil.getData(entry.get());
  182 + TsKvLatestEntity tsKvLatestEntity = entry.get();
  183 + tsKvLatestEntity.setStrKey(key);
  184 + result = DaoUtil.getData(tsKvLatestEntity);
157 185 } else {
158 186 result = new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry(key, null));
159 187 }
... ... @@ -171,9 +199,8 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
171 199 ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
172 200 if (isRemove) {
173 201 TsKvLatestEntity latestEntity = new TsKvLatestEntity();
174   - latestEntity.setEntityType(entityId.getEntityType());
175   - latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
176   - latestEntity.setKey(query.getKey());
  202 + latestEntity.setEntityId(entityId.getId());
  203 + latestEntity.setKey(getOrSaveKeyId(query.getKey()));
177 204 return service.submit(() -> {
178 205 tsKvLatestRepository.delete(latestEntity);
179 206 return null;
... ... @@ -215,17 +242,14 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
215 242 protected ListenableFuture<List<TsKvEntry>> getFindAllLatestFuture(EntityId entityId) {
216 243 return Futures.immediateFuture(
217 244 DaoUtil.convertDataList(Lists.newArrayList(
218   - tsKvLatestRepository.findAllByEntityTypeAndEntityId(
219   - entityId.getEntityType(),
220   - UUIDConverter.fromTimeUUID(entityId.getId())))));
  245 + searchTsKvLatestRepository.findAllByEntityId(entityId.getId()))));
221 246 }
222 247
223 248 protected ListenableFuture<Void> getSaveLatestFuture(EntityId entityId, TsKvEntry tsKvEntry) {
224 249 TsKvLatestEntity latestEntity = new TsKvLatestEntity();
225   - latestEntity.setEntityType(entityId.getEntityType());
226   - latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
  250 + latestEntity.setEntityId(entityId.getId());
227 251 latestEntity.setTs(tsKvEntry.getTs());
228   - latestEntity.setKey(tsKvEntry.getKey());
  252 + latestEntity.setKey(getOrSaveKeyId(tsKvEntry.getKey()));
229 253 latestEntity.setStrValue(tsKvEntry.getStrValue().orElse(null));
230 254 latestEntity.setDoubleValue(tsKvEntry.getDoubleValue().orElse(null));
231 255 latestEntity.setLongValue(tsKvEntry.getLongValue().orElse(null));
... ... @@ -233,6 +257,42 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
233 257 return tsLatestQueue.add(latestEntity);
234 258 }
235 259
  260 + protected Integer getOrSaveKeyId(String strKey) {
  261 + Integer keyId = tsKvDictionaryMap.get(strKey);
  262 + if (keyId == null) {
  263 + Optional<TsKvDictionary> tsKvDictionaryOptional;
  264 + tsKvDictionaryOptional = dictionaryRepository.findById(new TsKvDictionaryCompositeKey(strKey));
  265 + if (!tsKvDictionaryOptional.isPresent()) {
  266 + tsCreationLock.lock();
  267 + try {
  268 + tsKvDictionaryOptional = dictionaryRepository.findById(new TsKvDictionaryCompositeKey(strKey));
  269 + if (!tsKvDictionaryOptional.isPresent()) {
  270 + TsKvDictionary tsKvDictionary = new TsKvDictionary();
  271 + tsKvDictionary.setKey(strKey);
  272 + try {
  273 + TsKvDictionary saved = dictionaryRepository.save(tsKvDictionary);
  274 + tsKvDictionaryMap.put(saved.getKey(), saved.getKeyId());
  275 + keyId = saved.getKeyId();
  276 + } catch (ConstraintViolationException e) {
  277 + tsKvDictionaryOptional = dictionaryRepository.findById(new TsKvDictionaryCompositeKey(strKey));
  278 + TsKvDictionary dictionary = tsKvDictionaryOptional.orElseThrow(() -> new RuntimeException("Failed to get TsKvDictionary entity from DB!"));
  279 + tsKvDictionaryMap.put(dictionary.getKey(), dictionary.getKeyId());
  280 + keyId = dictionary.getKeyId();
  281 + }
  282 + } else {
  283 + keyId = tsKvDictionaryOptional.get().getKeyId();
  284 + }
  285 + } finally {
  286 + tsCreationLock.unlock();
  287 + }
  288 + } else {
  289 + keyId = tsKvDictionaryOptional.get().getKeyId();
  290 + tsKvDictionaryMap.put(strKey, keyId);
  291 + }
  292 + }
  293 + return keyId;
  294 + }
  295 +
236 296 private ListenableFuture<Void> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
237 297 ListenableFuture<List<TsKvEntry>> future = findNewLatestEntryFuture(tenantId, entityId, query);
238 298 return Futures.transformAsync(future, entryList -> {
... ...
... ... @@ -18,11 +18,11 @@ package org.thingsboard.server.dao.sqlts.dictionary;
18 18 import org.springframework.data.repository.CrudRepository;
19 19 import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionary;
20 20 import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionaryCompositeKey;
21   -import org.thingsboard.server.dao.util.PsqlDao;
  21 +import org.thingsboard.server.dao.util.SqlTsAnyDao;
22 22
23 23 import java.util.Optional;
24 24
25   -@PsqlDao
  25 +@SqlTsAnyDao
26 26 public interface TsKvDictionaryRepository extends CrudRepository<TsKvDictionary, TsKvDictionaryCompositeKey> {
27 27
28 28 Optional<TsKvDictionary> findByKeyId(int keyId);
... ...
... ... @@ -37,15 +37,14 @@ import java.util.List;
37 37 public class HsqlTimeseriesInsertRepository extends AbstractInsertRepository implements InsertTsRepository<TsKvEntity> {
38 38
39 39 private static final String INSERT_OR_UPDATE =
40   - "MERGE INTO ts_kv USING(VALUES ?, ?, ?, ?, ?, ?, ?, ?) " +
41   - "T (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " +
42   - "ON (ts_kv.entity_type=T.entity_type " +
43   - "AND ts_kv.entity_id=T.entity_id " +
  40 + "MERGE INTO ts_kv USING(VALUES ?, ?, ?, ?, ?, ?, ?) " +
  41 + "T (entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " +
  42 + "ON (ts_kv.entity_id=T.entity_id " +
44 43 "AND ts_kv.key=T.key " +
45 44 "AND ts_kv.ts=T.ts) " +
46 45 "WHEN MATCHED THEN UPDATE SET ts_kv.bool_v = T.bool_v, ts_kv.str_v = T.str_v, ts_kv.long_v = T.long_v, ts_kv.dbl_v = T.dbl_v " +
47   - "WHEN NOT MATCHED THEN INSERT (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " +
48   - "VALUES (T.entity_type, T.entity_id, T.key, T.ts, T.bool_v, T.str_v, T.long_v, T.dbl_v);";
  46 + "WHEN NOT MATCHED THEN INSERT (entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " +
  47 + "VALUES (T.entity_id, T.key, T.ts, T.bool_v, T.str_v, T.long_v, T.dbl_v);";
49 48
50 49 @Override
51 50 public void saveOrUpdate(List<EntityContainer<TsKvEntity>> entities) {
... ... @@ -54,29 +53,28 @@ public class HsqlTimeseriesInsertRepository extends AbstractInsertRepository imp
54 53 public void setValues(PreparedStatement ps, int i) throws SQLException {
55 54 EntityContainer<TsKvEntity> tsKvEntityEntityContainer = entities.get(i);
56 55 TsKvEntity tsKvEntity = tsKvEntityEntityContainer.getEntity();
57   - ps.setString(1, tsKvEntity.getEntityType().name());
58   - ps.setString(2, tsKvEntity.getEntityId());
59   - ps.setString(3, tsKvEntity.getKey());
60   - ps.setLong(4, tsKvEntity.getTs());
  56 + ps.setObject(1, tsKvEntity.getEntityId());
  57 + ps.setInt(2, tsKvEntity.getKey());
  58 + ps.setLong(3, tsKvEntity.getTs());
61 59
62 60 if (tsKvEntity.getBooleanValue() != null) {
63   - ps.setBoolean(5, tsKvEntity.getBooleanValue());
  61 + ps.setBoolean(4, tsKvEntity.getBooleanValue());
64 62 } else {
65   - ps.setNull(5, Types.BOOLEAN);
  63 + ps.setNull(4, Types.BOOLEAN);
66 64 }
67 65
68   - ps.setString(6, tsKvEntity.getStrValue());
  66 + ps.setString(5, tsKvEntity.getStrValue());
69 67
70 68 if (tsKvEntity.getLongValue() != null) {
71   - ps.setLong(7, tsKvEntity.getLongValue());
  69 + ps.setLong(6, tsKvEntity.getLongValue());
72 70 } else {
73   - ps.setNull(7, Types.BIGINT);
  71 + ps.setNull(6, Types.BIGINT);
74 72 }
75 73
76 74 if (tsKvEntity.getDoubleValue() != null) {
77   - ps.setDouble(8, tsKvEntity.getDoubleValue());
  75 + ps.setDouble(7, tsKvEntity.getDoubleValue());
78 76 } else {
79   - ps.setNull(8, Types.DOUBLE);
  77 + ps.setNull(7, Types.DOUBLE);
80 78 }
81 79 }
82 80
... ...
... ... @@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
30 30 import org.thingsboard.server.common.data.kv.TsKvEntry;
31 31 import org.thingsboard.server.dao.DaoUtil;
32 32 import org.thingsboard.server.dao.model.sqlts.hsql.TsKvEntity;
33   -import org.thingsboard.server.dao.sqlts.AbstractSimpleSqlTimeseriesDao;
  33 +import org.thingsboard.server.dao.sqlts.AbstractPsqlHsqlTimeseriesDao;
34 34 import org.thingsboard.server.dao.sqlts.EntityContainer;
35 35 import org.thingsboard.server.dao.timeseries.TimeseriesDao;
36 36 import org.thingsboard.server.dao.util.HsqlDao;
... ... @@ -41,14 +41,12 @@ import java.util.List;
41 41 import java.util.Optional;
42 42 import java.util.concurrent.CompletableFuture;
43 43
44   -import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID;
45   -
46 44
47 45 @Component
48 46 @Slf4j
49 47 @SqlTsDao
50 48 @HsqlDao
51   -public class JpaHsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEntity> implements TimeseriesDao {
  49 +public class JpaHsqlTimeseriesDao extends AbstractPsqlHsqlTimeseriesDao<TsKvEntity> implements TimeseriesDao {
52 50
53 51 @Autowired
54 52 private TsKvHsqlRepository tsKvRepository;
... ... @@ -60,11 +58,12 @@ public class JpaHsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt
60 58
61 59 @Override
62 60 public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
  61 + String strKey = tsKvEntry.getKey();
  62 + Integer keyId = getOrSaveKeyId(strKey);
63 63 TsKvEntity entity = new TsKvEntity();
64   - entity.setEntityType(entityId.getEntityType());
65   - entity.setEntityId(fromTimeUUID(entityId.getId()));
  64 + entity.setEntityId(entityId.getId());
66 65 entity.setTs(tsKvEntry.getTs());
67   - entity.setKey(tsKvEntry.getKey());
  66 + entity.setKey(keyId);
68 67 entity.setStrValue(tsKvEntry.getStrValue().orElse(null));
69 68 entity.setDoubleValue(tsKvEntry.getDoubleValue().orElse(null));
70 69 entity.setLongValue(tsKvEntry.getLongValue().orElse(null));
... ... @@ -77,9 +76,8 @@ public class JpaHsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt
77 76 public ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
78 77 return service.submit(() -> {
79 78 tsKvRepository.delete(
80   - fromTimeUUID(entityId.getId()),
81   - entityId.getEntityType(),
82   - query.getKey(),
  79 + entityId.getId(),
  80 + getOrSaveKeyId(query.getKey()),
83 81 query.getStartTs(),
84 82 query.getEndTs());
85 83 return null;
... ... @@ -116,14 +114,47 @@ public class JpaHsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt
116 114 return Futures.immediateFuture(null);
117 115 }
118 116
119   - protected ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
  117 + protected ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
  118 + if (query.getAggregation() == Aggregation.NONE) {
  119 + return findAllAsyncWithLimit(tenantId, entityId, query);
  120 + } else {
  121 + long stepTs = query.getStartTs();
  122 + List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
  123 + while (stepTs < query.getEndTs()) {
  124 + long startTs = stepTs;
  125 + long endTs = stepTs + query.getInterval();
  126 + long ts = startTs + (endTs - startTs) / 2;
  127 + futures.add(findAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));
  128 + stepTs = endTs;
  129 + }
  130 + return getTskvEntriesFuture(Futures.allAsList(futures));
  131 + }
  132 + }
  133 +
  134 + @Override
  135 + protected ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
  136 + List<TsKvEntity> tsKvEntities = tsKvRepository.findAllWithLimit(
  137 + entityId.getId(),
  138 + getOrSaveKeyId(query.getKey()),
  139 + query.getStartTs(),
  140 + query.getEndTs(),
  141 + new PageRequest(0, query.getLimit(),
  142 + new Sort(Sort.Direction.fromString(
  143 + query.getOrderBy()), "ts")));
  144 + tsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(query.getKey()));
  145 + return Futures.immediateFuture(
  146 + DaoUtil.convertDataList(
  147 + tsKvEntities));
  148 + }
  149 +
  150 + @Override
  151 + protected ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
120 152 List<CompletableFuture<TsKvEntity>> entitiesFutures = new ArrayList<>();
121   - switchAgregation(entityId, key, startTs, endTs, aggregation, entitiesFutures);
  153 + switchAgregation(tenantId, entityId, key, startTs, endTs, aggregation, entitiesFutures);
122 154 return Futures.transform(setFutures(entitiesFutures), entity -> {
123 155 if (entity != null && entity.isNotEmpty()) {
124   - entity.setEntityId(fromTimeUUID(entityId.getId()));
125   - entity.setEntityType(entityId.getEntityType());
126   - entity.setKey(key);
  156 + entity.setEntityId(entityId.getId());
  157 + entity.setKey(getOrSaveKeyId(key));
127 158 entity.setTs(ts);
128 159 return Optional.of(DaoUtil.getData(entity));
129 160 } else {
... ... @@ -132,75 +163,63 @@ public class JpaHsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt
132 163 });
133 164 }
134 165
135   - protected ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
136   - return Futures.immediateFuture(
137   - DaoUtil.convertDataList(
138   - tsKvRepository.findAllWithLimit(
139   - fromTimeUUID(entityId.getId()),
140   - entityId.getEntityType(),
141   - query.getKey(),
142   - query.getStartTs(),
143   - query.getEndTs(),
144   - new PageRequest(0, query.getLimit(),
145   - new Sort(Sort.Direction.fromString(
146   - query.getOrderBy()), "ts")))));
147   - }
148   -
149   - protected void findCount(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  166 + @Override
  167 + protected void findCount(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  168 + Integer keyId = getOrSaveKeyId(key);
150 169 entitiesFutures.add(tsKvRepository.findCount(
151   - fromTimeUUID(entityId.getId()),
152   - entityId.getEntityType(),
153   - key,
  170 + entityId.getId(),
  171 + keyId,
154 172 startTs,
155 173 endTs));
156 174 }
157 175
158   - protected void findSum(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  176 + @Override
  177 + protected void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  178 + Integer keyId = getOrSaveKeyId(key);
159 179 entitiesFutures.add(tsKvRepository.findSum(
160   - fromTimeUUID(entityId.getId()),
161   - entityId.getEntityType(),
162   - key,
  180 + entityId.getId(),
  181 + keyId,
163 182 startTs,
164 183 endTs));
165 184 }
166 185
167   - protected void findMin(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  186 + @Override
  187 + protected void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  188 + Integer keyId = getOrSaveKeyId(key);
168 189 entitiesFutures.add(tsKvRepository.findStringMin(
169   - fromTimeUUID(entityId.getId()),
170   - entityId.getEntityType(),
171   - key,
  190 + entityId.getId(),
  191 + keyId,
172 192 startTs,
173 193 endTs));
174 194 entitiesFutures.add(tsKvRepository.findNumericMin(
175   - fromTimeUUID(entityId.getId()),
176   - entityId.getEntityType(),
177   - key,
  195 + entityId.getId(),
  196 + keyId,
178 197 startTs,
179 198 endTs));
180 199 }
181 200
182   - protected void findMax(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  201 + @Override
  202 + protected void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  203 + Integer keyId = getOrSaveKeyId(key);
183 204 entitiesFutures.add(tsKvRepository.findStringMax(
184   - fromTimeUUID(entityId.getId()),
185   - entityId.getEntityType(),
186   - key,
  205 + entityId.getId(),
  206 + keyId,
187 207 startTs,
188 208 endTs));
189 209 entitiesFutures.add(tsKvRepository.findNumericMax(
190   - fromTimeUUID(entityId.getId()),
191   - entityId.getEntityType(),
192   - key,
  210 + entityId.getId(),
  211 + keyId,
193 212 startTs,
194 213 endTs));
195 214 }
196 215
197   - protected void findAvg(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  216 + @Override
  217 + protected void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  218 + Integer keyId = getOrSaveKeyId(key);
198 219 entitiesFutures.add(tsKvRepository.findAvg(
199   - fromTimeUUID(entityId.getId()),
200   - entityId.getEntityType(),
201   - key,
  220 + entityId.getId(),
  221 + keyId,
202 222 startTs,
203 223 endTs));
204 224 }
205   -
206 225 }
\ No newline at end of file
... ...
... ... @@ -22,23 +22,21 @@ import org.springframework.data.repository.CrudRepository;
22 22 import org.springframework.data.repository.query.Param;
23 23 import org.springframework.scheduling.annotation.Async;
24 24 import org.springframework.transaction.annotation.Transactional;
25   -import org.thingsboard.server.common.data.EntityType;
26 25 import org.thingsboard.server.dao.model.sqlts.hsql.TsKvCompositeKey;
27 26 import org.thingsboard.server.dao.model.sqlts.hsql.TsKvEntity;
28 27 import org.thingsboard.server.dao.util.SqlDao;
29 28
30 29 import java.util.List;
  30 +import java.util.UUID;
31 31 import java.util.concurrent.CompletableFuture;
32 32
33 33 @SqlDao
34 34 public interface TsKvHsqlRepository extends CrudRepository<TsKvEntity, TsKvCompositeKey> {
35 35
36 36 @Query("SELECT tskv FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
37   - "AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
38   - "AND tskv.ts > :startTs AND tskv.ts <= :endTs")
39   - List<TsKvEntity> findAllWithLimit(@Param("entityId") String entityId,
40   - @Param("entityType") EntityType entityType,
41   - @Param("entityKey") String key,
  37 + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
  38 + List<TsKvEntity> findAllWithLimit(@Param("entityId") UUID entityId,
  39 + @Param("entityKey") int key,
42 40 @Param("startTs") long startTs,
43 41 @Param("endTs") long endTs,
44 42 Pageable pageable);
... ... @@ -46,22 +44,18 @@ public interface TsKvHsqlRepository extends CrudRepository<TsKvEntity, TsKvCompo
46 44 @Transactional
47 45 @Modifying
48 46 @Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
49   - "AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
50   - "AND tskv.ts > :startTs AND tskv.ts <= :endTs")
51   - void delete(@Param("entityId") String entityId,
52   - @Param("entityType") EntityType entityType,
53   - @Param("entityKey") String key,
  47 + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
  48 + void delete(@Param("entityId") UUID entityId,
  49 + @Param("entityKey") int key,
54 50 @Param("startTs") long startTs,
55 51 @Param("endTs") long endTs);
56 52
57 53 @Async
58 54 @Query("SELECT new TsKvEntity(MAX(tskv.strValue)) FROM TsKvEntity tskv " +
59   - "WHERE tskv.strValue IS NOT NULL " +
60   - "AND tskv.entityId = :entityId AND tskv.entityType = :entityType " +
61   - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
62   - CompletableFuture<TsKvEntity> findStringMax(@Param("entityId") String entityId,
63   - @Param("entityType") EntityType entityType,
64   - @Param("entityKey") String entityKey,
  55 + "WHERE tskv.strValue IS NOT NULL AND tskv.entityId = :entityId AND tskv.key = :entityKey" +
  56 + " AND tskv.ts > :startTs AND tskv.ts <= :endTs")
  57 + CompletableFuture<TsKvEntity> findStringMax(@Param("entityId") UUID entityId,
  58 + @Param("entityKey") int entityKey,
65 59 @Param("startTs") long startTs,
66 60 @Param("endTs") long endTs);
67 61
... ... @@ -70,24 +64,20 @@ public interface TsKvHsqlRepository extends CrudRepository<TsKvEntity, TsKvCompo
70 64 "MAX(COALESCE(tskv.doubleValue, -1.79769E+308)), " +
71 65 "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
72 66 "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
73   - "'MAX') FROM TsKvEntity tskv " +
74   - "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
  67 + "'MAX') FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
75 68 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
76   - CompletableFuture<TsKvEntity> findNumericMax(@Param("entityId") String entityId,
77   - @Param("entityType") EntityType entityType,
78   - @Param("entityKey") String entityKey,
  69 + CompletableFuture<TsKvEntity> findNumericMax(@Param("entityId") UUID entityId,
  70 + @Param("entityKey") int entityKey,
79 71 @Param("startTs") long startTs,
80 72 @Param("endTs") long endTs);
81 73
82 74
83 75 @Async
84 76 @Query("SELECT new TsKvEntity(MIN(tskv.strValue)) FROM TsKvEntity tskv " +
85   - "WHERE tskv.strValue IS NOT NULL " +
86   - "AND tskv.entityId = :entityId AND tskv.entityType = :entityType " +
  77 + "WHERE tskv.strValue IS NOT NULL AND tskv.entityId = :entityId " +
87 78 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
88   - CompletableFuture<TsKvEntity> findStringMin(@Param("entityId") String entityId,
89   - @Param("entityType") EntityType entityType,
90   - @Param("entityKey") String entityKey,
  79 + CompletableFuture<TsKvEntity> findStringMin(@Param("entityId") UUID entityId,
  80 + @Param("entityKey") int entityKey,
91 81 @Param("startTs") long startTs,
92 82 @Param("endTs") long endTs);
93 83
... ... @@ -96,12 +86,10 @@ public interface TsKvHsqlRepository extends CrudRepository<TsKvEntity, TsKvCompo
96 86 "MIN(COALESCE(tskv.doubleValue, 1.79769E+308)), " +
97 87 "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
98 88 "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
99   - "'MIN') FROM TsKvEntity tskv " +
100   - "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
  89 + "'MIN') FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
101 90 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
102   - CompletableFuture<TsKvEntity> findNumericMin(@Param("entityId") String entityId,
103   - @Param("entityType") EntityType entityType,
104   - @Param("entityKey") String entityKey,
  91 + CompletableFuture<TsKvEntity> findNumericMin(@Param("entityId") UUID entityId,
  92 + @Param("entityKey") int entityKey,
105 93 @Param("startTs") long startTs,
106 94 @Param("endTs") long endTs);
107 95
... ... @@ -110,11 +98,9 @@ public interface TsKvHsqlRepository extends CrudRepository<TsKvEntity, TsKvCompo
110 98 "SUM(CASE WHEN tskv.strValue IS NULL THEN 0 ELSE 1 END), " +
111 99 "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
112 100 "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END)) FROM TsKvEntity tskv " +
113   - "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
114   - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
115   - CompletableFuture<TsKvEntity> findCount(@Param("entityId") String entityId,
116   - @Param("entityType") EntityType entityType,
117   - @Param("entityKey") String entityKey,
  101 + "WHERE tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
  102 + CompletableFuture<TsKvEntity> findCount(@Param("entityId") UUID entityId,
  103 + @Param("entityKey") int entityKey,
118 104 @Param("startTs") long startTs,
119 105 @Param("endTs") long endTs);
120 106
... ... @@ -123,12 +109,10 @@ public interface TsKvHsqlRepository extends CrudRepository<TsKvEntity, TsKvCompo
123 109 "SUM(COALESCE(tskv.doubleValue, 0.0)), " +
124 110 "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
125 111 "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
126   - "'AVG') FROM TsKvEntity tskv " +
127   - "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
  112 + "'AVG') FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
128 113 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
129   - CompletableFuture<TsKvEntity> findAvg(@Param("entityId") String entityId,
130   - @Param("entityType") EntityType entityType,
131   - @Param("entityKey") String entityKey,
  114 + CompletableFuture<TsKvEntity> findAvg(@Param("entityId") UUID entityId,
  115 + @Param("entityKey") int entityKey,
132 116 @Param("startTs") long startTs,
133 117 @Param("endTs") long endTs);
134 118
... ... @@ -137,12 +121,10 @@ public interface TsKvHsqlRepository extends CrudRepository<TsKvEntity, TsKvCompo
137 121 "SUM(COALESCE(tskv.doubleValue, 0.0)), " +
138 122 "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
139 123 "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
140   - "'SUM') FROM TsKvEntity tskv " +
141   - "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
  124 + "'SUM') FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
142 125 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
143   - CompletableFuture<TsKvEntity> findSum(@Param("entityId") String entityId,
144   - @Param("entityType") EntityType entityType,
145   - @Param("entityKey") String entityKey,
  126 + CompletableFuture<TsKvEntity> findSum(@Param("entityId") UUID entityId,
  127 + @Param("entityKey") int entityKey,
146 128 @Param("startTs") long startTs,
147 129 @Param("endTs") long endTs);
148 130
... ...
... ... @@ -36,43 +36,41 @@ import java.util.List;
36 36 public class HsqlLatestInsertRepository extends AbstractInsertRepository implements InsertLatestRepository {
37 37
38 38 private static final String INSERT_OR_UPDATE =
39   - "MERGE INTO ts_kv_latest USING(VALUES ?, ?, ?, ?, ?, ?, ?, ?) " +
40   - "T (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " +
41   - "ON (ts_kv_latest.entity_type=T.entity_type " +
42   - "AND ts_kv_latest.entity_id=T.entity_id " +
  39 + "MERGE INTO ts_kv_latest USING(VALUES ?, ?, ?, ?, ?, ?, ?) " +
  40 + "T (entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " +
  41 + "ON (ts_kv_latest.entity_id=T.entity_id " +
43 42 "AND ts_kv_latest.key=T.key) " +
44 43 "WHEN MATCHED THEN UPDATE SET ts_kv_latest.ts = T.ts, ts_kv_latest.bool_v = T.bool_v, ts_kv_latest.str_v = T.str_v, ts_kv_latest.long_v = T.long_v, ts_kv_latest.dbl_v = T.dbl_v " +
45   - "WHEN NOT MATCHED THEN INSERT (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " +
46   - "VALUES (T.entity_type, T.entity_id, T.key, T.ts, T.bool_v, T.str_v, T.long_v, T.dbl_v);";
  44 + "WHEN NOT MATCHED THEN INSERT (entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " +
  45 + "VALUES (T.entity_id, T.key, T.ts, T.bool_v, T.str_v, T.long_v, T.dbl_v);";
47 46
48 47 @Override
49 48 public void saveOrUpdate(List<TsKvLatestEntity> entities) {
50 49 jdbcTemplate.batchUpdate(INSERT_OR_UPDATE, new BatchPreparedStatementSetter() {
51 50 @Override
52 51 public void setValues(PreparedStatement ps, int i) throws SQLException {
53   - ps.setString(1, entities.get(i).getEntityType().name());
54   - ps.setString(2, entities.get(i).getEntityId());
55   - ps.setString(3, entities.get(i).getKey());
56   - ps.setLong(4, entities.get(i).getTs());
  52 + ps.setObject(1, entities.get(i).getEntityId());
  53 + ps.setInt(2, entities.get(i).getKey());
  54 + ps.setLong(3, entities.get(i).getTs());
57 55
58 56 if (entities.get(i).getBooleanValue() != null) {
59   - ps.setBoolean(5, entities.get(i).getBooleanValue());
  57 + ps.setBoolean(4, entities.get(i).getBooleanValue());
60 58 } else {
61   - ps.setNull(5, Types.BOOLEAN);
  59 + ps.setNull(4, Types.BOOLEAN);
62 60 }
63 61
64   - ps.setString(6, entities.get(i).getStrValue());
  62 + ps.setString(5, entities.get(i).getStrValue());
65 63
66 64 if (entities.get(i).getLongValue() != null) {
67   - ps.setLong(7, entities.get(i).getLongValue());
  65 + ps.setLong(6, entities.get(i).getLongValue());
68 66 } else {
69   - ps.setNull(7, Types.BIGINT);
  67 + ps.setNull(6, Types.BIGINT);
70 68 }
71 69
72 70 if (entities.get(i).getDoubleValue() != null) {
73   - ps.setDouble(8, entities.get(i).getDoubleValue());
  71 + ps.setDouble(7, entities.get(i).getDoubleValue());
74 72 } else {
75   - ps.setNull(8, Types.DOUBLE);
  73 + ps.setNull(7, Types.DOUBLE);
76 74 }
77 75 }
78 76
... ...
... ... @@ -38,12 +38,12 @@ import java.util.List;
38 38 public class PsqlLatestInsertRepository extends AbstractInsertRepository implements InsertLatestRepository {
39 39
40 40 private static final String BATCH_UPDATE =
41   - "UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ? WHERE entity_type = ? AND entity_id = ? and key = ?";
  41 + "UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ? WHERE entity_id = ? and key = ?";
42 42
43 43
44 44 private static final String INSERT_OR_UPDATE =
45   - "INSERT INTO ts_kv_latest (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) VALUES(?, ?, ?, ?, ?, ?, ?, ?) " +
46   - "ON CONFLICT (entity_type, entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?;";
  45 + "INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v) VALUES(?, ?, ?, ?, ?, ?, ?) " +
  46 + "ON CONFLICT (entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?;";
47 47
48 48 @Override
49 49 public void saveOrUpdate(List<TsKvLatestEntity> entities) {
... ... @@ -76,9 +76,8 @@ public class PsqlLatestInsertRepository extends AbstractInsertRepository impleme
76 76 ps.setNull(5, Types.DOUBLE);
77 77 }
78 78
79   - ps.setString(6, tsKvLatestEntity.getEntityType().name());
80   - ps.setString(7, tsKvLatestEntity.getEntityId());
81   - ps.setString(8, tsKvLatestEntity.getKey());
  79 + ps.setObject(6, tsKvLatestEntity.getEntityId());
  80 + ps.setInt(7, tsKvLatestEntity.getKey());
82 81 }
83 82
84 83 @Override
... ... @@ -105,38 +104,37 @@ public class PsqlLatestInsertRepository extends AbstractInsertRepository impleme
105 104 @Override
106 105 public void setValues(PreparedStatement ps, int i) throws SQLException {
107 106 TsKvLatestEntity tsKvLatestEntity = insertEntities.get(i);
108   - ps.setString(1, tsKvLatestEntity.getEntityType().name());
109   - ps.setString(2, tsKvLatestEntity.getEntityId());
110   - ps.setString(3, tsKvLatestEntity.getKey());
111   - ps.setLong(4, tsKvLatestEntity.getTs());
112   - ps.setLong(9, tsKvLatestEntity.getTs());
  107 + ps.setObject(1, tsKvLatestEntity.getEntityId());
  108 + ps.setInt(2, tsKvLatestEntity.getKey());
  109 + ps.setLong(3, tsKvLatestEntity.getTs());
  110 + ps.setLong(8, tsKvLatestEntity.getTs());
113 111
114 112 if (tsKvLatestEntity.getBooleanValue() != null) {
115   - ps.setBoolean(5, tsKvLatestEntity.getBooleanValue());
116   - ps.setBoolean(10, tsKvLatestEntity.getBooleanValue());
  113 + ps.setBoolean(4, tsKvLatestEntity.getBooleanValue());
  114 + ps.setBoolean(9, tsKvLatestEntity.getBooleanValue());
117 115 } else {
118   - ps.setNull(5, Types.BOOLEAN);
119   - ps.setNull(10, Types.BOOLEAN);
  116 + ps.setNull(4, Types.BOOLEAN);
  117 + ps.setNull(9, Types.BOOLEAN);
120 118 }
121 119
122   - ps.setString(6, replaceNullChars(tsKvLatestEntity.getStrValue()));
123   - ps.setString(11, replaceNullChars(tsKvLatestEntity.getStrValue()));
  120 + ps.setString(5, replaceNullChars(tsKvLatestEntity.getStrValue()));
  121 + ps.setString(10, replaceNullChars(tsKvLatestEntity.getStrValue()));
124 122
125 123
126 124 if (tsKvLatestEntity.getLongValue() != null) {
127   - ps.setLong(7, tsKvLatestEntity.getLongValue());
128   - ps.setLong(12, tsKvLatestEntity.getLongValue());
  125 + ps.setLong(6, tsKvLatestEntity.getLongValue());
  126 + ps.setLong(11, tsKvLatestEntity.getLongValue());
129 127 } else {
130   - ps.setNull(7, Types.BIGINT);
131   - ps.setNull(12, Types.BIGINT);
  128 + ps.setNull(6, Types.BIGINT);
  129 + ps.setNull(11, Types.BIGINT);
132 130 }
133 131
134 132 if (tsKvLatestEntity.getDoubleValue() != null) {
135   - ps.setDouble(8, tsKvLatestEntity.getDoubleValue());
136   - ps.setDouble(13, tsKvLatestEntity.getDoubleValue());
  133 + ps.setDouble(7, tsKvLatestEntity.getDoubleValue());
  134 + ps.setDouble(12, tsKvLatestEntity.getDoubleValue());
137 135 } else {
138   - ps.setNull(8, Types.DOUBLE);
139   - ps.setNull(13, Types.DOUBLE);
  136 + ps.setNull(7, Types.DOUBLE);
  137 + ps.setNull(12, Types.DOUBLE);
140 138 }
141 139 }
142 140
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.sqlts.latest;
  17 +
  18 +import org.springframework.stereotype.Repository;
  19 +import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
  20 +import org.thingsboard.server.dao.util.SqlTsAnyDao;
  21 +
  22 +import javax.persistence.EntityManager;
  23 +import javax.persistence.PersistenceContext;
  24 +import java.util.List;
  25 +import java.util.UUID;
  26 +
  27 +@SqlTsAnyDao
  28 +@Repository
  29 +public class SearchTsKvLatestRepository {
  30 +
  31 + public static final String FIND_ALL_BY_ENTITY_ID = "findAllByEntityId";
  32 + public static final String FIND_ALL_BY_ENTITY_ID_QUERY = "SELECT ts_kv_latest.entity_id AS entityId, ts_kv_latest.key AS key, ts_kv_dictionary.key AS strKey, ts_kv_latest.str_v AS strValue," +
  33 + " ts_kv_latest.bool_v AS boolValue, ts_kv_latest.long_v AS longValue, ts_kv_latest.dbl_v AS doubleValue, ts_kv_latest.ts AS ts FROM ts_kv_latest " +
  34 + "INNER JOIN ts_kv_dictionary ON ts_kv_latest.key = ts_kv_dictionary.key_id WHERE ts_kv_latest.entity_id = cast(:id AS uuid)";
  35 +
  36 + @PersistenceContext
  37 + private EntityManager entityManager;
  38 +
  39 + public List<TsKvLatestEntity> findAllByEntityId(UUID entityId) {
  40 + return entityManager.createNamedQuery(FIND_ALL_BY_ENTITY_ID, TsKvLatestEntity.class)
  41 + .setParameter("id", entityId)
  42 + .getResultList();
  43 + }
  44 +
  45 +}
... ...
... ... @@ -16,15 +16,15 @@
16 16 package org.thingsboard.server.dao.sqlts.latest;
17 17
18 18 import org.springframework.data.repository.CrudRepository;
19   -import org.thingsboard.server.common.data.EntityType;
20 19 import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestCompositeKey;
21 20 import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
22 21 import org.thingsboard.server.dao.util.SqlDao;
23 22
24 23 import java.util.List;
  24 +import java.util.UUID;
25 25
26 26 @SqlDao
27 27 public interface TsKvLatestRepository extends CrudRepository<TsKvLatestEntity, TsKvLatestCompositeKey> {
28 28
29   - List<TsKvLatestEntity> findAllByEntityTypeAndEntityId(EntityType entityType, String entityId);
  29 + List<TsKvLatestEntity> findAllByEntityId(UUID entityId);
30 30 }
... ...
... ... @@ -18,7 +18,6 @@ package org.thingsboard.server.dao.sqlts.psql;
18 18 import com.google.common.util.concurrent.Futures;
19 19 import com.google.common.util.concurrent.ListenableFuture;
20 20 import lombok.extern.slf4j.Slf4j;
21   -import org.hibernate.exception.ConstraintViolationException;
22 21 import org.springframework.beans.factory.annotation.Autowired;
23 22 import org.springframework.beans.factory.annotation.Value;
24 23 import org.springframework.data.domain.PageRequest;
... ... @@ -31,12 +30,9 @@ import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
31 30 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
32 31 import org.thingsboard.server.common.data.kv.TsKvEntry;
33 32 import org.thingsboard.server.dao.DaoUtil;
34   -import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionary;
35   -import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionaryCompositeKey;
36 33 import org.thingsboard.server.dao.model.sqlts.psql.TsKvEntity;
37   -import org.thingsboard.server.dao.sqlts.AbstractSimpleSqlTimeseriesDao;
  34 +import org.thingsboard.server.dao.sqlts.AbstractPsqlHsqlTimeseriesDao;
38 35 import org.thingsboard.server.dao.sqlts.EntityContainer;
39   -import org.thingsboard.server.dao.sqlts.dictionary.TsKvDictionaryRepository;
40 36 import org.thingsboard.server.dao.timeseries.PsqlPartition;
41 37 import org.thingsboard.server.dao.timeseries.SqlTsPartitionDate;
42 38 import org.thingsboard.server.dao.timeseries.TimeseriesDao;
... ... @@ -48,10 +44,12 @@ import java.time.LocalDateTime;
48 44 import java.time.ZoneOffset;
49 45 import java.time.ZonedDateTime;
50 46 import java.time.format.DateTimeFormatter;
51   -import java.util.*;
  47 +import java.util.ArrayList;
  48 +import java.util.List;
  49 +import java.util.Map;
  50 +import java.util.Optional;
52 51 import java.util.concurrent.CompletableFuture;
53 52 import java.util.concurrent.ConcurrentHashMap;
54   -import java.util.concurrent.ConcurrentMap;
55 53 import java.util.concurrent.locks.ReentrantLock;
56 54
57 55 import static org.thingsboard.server.dao.timeseries.SqlTsPartitionDate.EPOCH_START;
... ... @@ -61,18 +59,12 @@ import static org.thingsboard.server.dao.timeseries.SqlTsPartitionDate.EPOCH_STA
61 59 @Slf4j
62 60 @SqlTsDao
63 61 @PsqlDao
64   -public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEntity> implements TimeseriesDao {
  62 +public class JpaPsqlTimeseriesDao extends AbstractPsqlHsqlTimeseriesDao<TsKvEntity> implements TimeseriesDao {
65 63
66   - private final ConcurrentMap<String, Integer> tsKvDictionaryMap = new ConcurrentHashMap<>();
67 64 private final Map<Long, PsqlPartition> partitions = new ConcurrentHashMap<>();
68   -
69   - private static final ReentrantLock tsCreationLock = new ReentrantLock();
70 65 private static final ReentrantLock partitionCreationLock = new ReentrantLock();
71 66
72 67 @Autowired
73   - private TsKvDictionaryRepository dictionaryRepository;
74   -
75   - @Autowired
76 68 private TsKvPsqlRepository tsKvRepository;
77 69
78 70 @Autowired
... ... @@ -101,11 +93,6 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt
101 93 }
102 94
103 95 @Override
104   - public ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
105   - return processFindAllAsync(tenantId, entityId, queries);
106   - }
107   -
108   - @Override
109 96 public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
110 97 String strKey = tsKvEntry.getKey();
111 98 Integer keyId = getOrSaveKeyId(strKey);
... ... @@ -166,22 +153,25 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt
166 153 return Futures.immediateFuture(null);
167 154 }
168 155
169   - protected ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
170   - List<CompletableFuture<TsKvEntity>> entitiesFutures = new ArrayList<>();
171   - switchAgregation(entityId, key, startTs, endTs, aggregation, entitiesFutures);
172   - return Futures.transform(setFutures(entitiesFutures), entity -> {
173   - if (entity != null && entity.isNotEmpty()) {
174   - entity.setEntityId(entityId.getId());
175   - entity.setStrKey(key);
176   - entity.setTs(ts);
177   - return Optional.of(DaoUtil.getData(entity));
178   - } else {
179   - return Optional.empty();
  156 + protected ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
  157 + if (query.getAggregation() == Aggregation.NONE) {
  158 + return findAllAsyncWithLimit(tenantId, entityId, query);
  159 + } else {
  160 + long stepTs = query.getStartTs();
  161 + List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
  162 + while (stepTs < query.getEndTs()) {
  163 + long startTs = stepTs;
  164 + long endTs = stepTs + query.getInterval();
  165 + long ts = startTs + (endTs - startTs) / 2;
  166 + futures.add(findAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));
  167 + stepTs = endTs;
180 168 }
181   - });
  169 + return getTskvEntriesFuture(Futures.allAsList(futures));
  170 + }
182 171 }
183 172
184   - protected ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
  173 + @Override
  174 + protected ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
185 175 Integer keyId = getOrSaveKeyId(query.getKey());
186 176 List<TsKvEntity> tsKvEntities = tsKvRepository.findAllWithLimit(
187 177 entityId.getId(),
... ... @@ -195,7 +185,24 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt
195 185 return Futures.immediateFuture(DaoUtil.convertDataList(tsKvEntities));
196 186 }
197 187
198   - protected void findCount(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  188 + @Override
  189 + protected ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
  190 + List<CompletableFuture<TsKvEntity>> entitiesFutures = new ArrayList<>();
  191 + switchAgregation(tenantId, entityId, key, startTs, endTs, aggregation, entitiesFutures);
  192 + return Futures.transform(setFutures(entitiesFutures), entity -> {
  193 + if (entity != null && entity.isNotEmpty()) {
  194 + entity.setEntityId(entityId.getId());
  195 + entity.setStrKey(key);
  196 + entity.setTs(ts);
  197 + return Optional.of(DaoUtil.getData(entity));
  198 + } else {
  199 + return Optional.empty();
  200 + }
  201 + });
  202 + }
  203 +
  204 + @Override
  205 + protected void findCount(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
199 206 Integer keyId = getOrSaveKeyId(key);
200 207 entitiesFutures.add(tsKvRepository.findCount(
201 208 entityId.getId(),
... ... @@ -204,7 +211,8 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt
204 211 endTs));
205 212 }
206 213
207   - protected void findSum(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  214 + @Override
  215 + protected void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
208 216 Integer keyId = getOrSaveKeyId(key);
209 217 entitiesFutures.add(tsKvRepository.findSum(
210 218 entityId.getId(),
... ... @@ -213,7 +221,8 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt
213 221 endTs));
214 222 }
215 223
216   - protected void findMin(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  224 + @Override
  225 + protected void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
217 226 Integer keyId = getOrSaveKeyId(key);
218 227 entitiesFutures.add(tsKvRepository.findStringMin(
219 228 entityId.getId(),
... ... @@ -227,7 +236,8 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt
227 236 endTs));
228 237 }
229 238
230   - protected void findMax(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  239 + @Override
  240 + protected void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
231 241 Integer keyId = getOrSaveKeyId(key);
232 242 entitiesFutures.add(tsKvRepository.findStringMax(
233 243 entityId.getId(),
... ... @@ -241,7 +251,8 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt
241 251 endTs));
242 252 }
243 253
244   - protected void findAvg(EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  254 + @Override
  255 + protected void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
245 256 Integer keyId = getOrSaveKeyId(key);
246 257 entitiesFutures.add(tsKvRepository.findAvg(
247 258 entityId.getId(),
... ... @@ -250,40 +261,9 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt
250 261 endTs));
251 262 }
252 263
253   - private Integer getOrSaveKeyId(String strKey) {
254   - Integer keyId = tsKvDictionaryMap.get(strKey);
255   - if (keyId == null) {
256   - Optional<TsKvDictionary> tsKvDictionaryOptional;
257   - tsKvDictionaryOptional = dictionaryRepository.findById(new TsKvDictionaryCompositeKey(strKey));
258   - if (!tsKvDictionaryOptional.isPresent()) {
259   - tsCreationLock.lock();
260   - try {
261   - tsKvDictionaryOptional = dictionaryRepository.findById(new TsKvDictionaryCompositeKey(strKey));
262   - if (!tsKvDictionaryOptional.isPresent()) {
263   - TsKvDictionary tsKvDictionary = new TsKvDictionary();
264   - tsKvDictionary.setKey(strKey);
265   - try {
266   - TsKvDictionary saved = dictionaryRepository.save(tsKvDictionary);
267   - tsKvDictionaryMap.put(saved.getKey(), saved.getKeyId());
268   - keyId = saved.getKeyId();
269   - } catch (ConstraintViolationException e) {
270   - tsKvDictionaryOptional = dictionaryRepository.findById(new TsKvDictionaryCompositeKey(strKey));
271   - TsKvDictionary dictionary = tsKvDictionaryOptional.orElseThrow(() -> new RuntimeException("Failed to get TsKvDictionary entity from DB!"));
272   - tsKvDictionaryMap.put(dictionary.getKey(), dictionary.getKeyId());
273   - keyId = dictionary.getKeyId();
274   - }
275   - } else {
276   - keyId = tsKvDictionaryOptional.get().getKeyId();
277   - }
278   - } finally {
279   - tsCreationLock.unlock();
280   - }
281   - } else {
282   - keyId = tsKvDictionaryOptional.get().getKeyId();
283   - tsKvDictionaryMap.put(strKey, keyId);
284   - }
285   - }
286   - return keyId;
  264 + @Override
  265 + public ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
  266 + return processFindAllAsync(tenantId, entityId, queries);
287 267 }
288 268
289 269 private void savePartition(PsqlPartition psqlPartition) {
... ...
... ... @@ -19,9 +19,7 @@ import com.google.common.util.concurrent.Futures;
19 19 import com.google.common.util.concurrent.ListenableFuture;
20 20 import com.google.common.util.concurrent.SettableFuture;
21 21 import lombok.extern.slf4j.Slf4j;
22   -import org.hibernate.exception.ConstraintViolationException;
23 22 import org.springframework.beans.factory.annotation.Autowired;
24   -import org.springframework.beans.factory.annotation.Value;
25 23 import org.springframework.data.domain.PageRequest;
26 24 import org.springframework.data.domain.Sort;
27 25 import org.springframework.stereotype.Component;
... ... @@ -33,15 +31,12 @@ import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
33 31 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
34 32 import org.thingsboard.server.common.data.kv.TsKvEntry;
35 33 import org.thingsboard.server.dao.DaoUtil;
36   -import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionary;
37   -import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionaryCompositeKey;
38 34 import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity;
39 35 import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
40 36 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
41 37 import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao;
42 38 import org.thingsboard.server.dao.sqlts.EntityContainer;
43 39 import org.thingsboard.server.dao.sqlts.InsertTsRepository;
44   -import org.thingsboard.server.dao.sqlts.dictionary.TsKvDictionaryRepository;
45 40 import org.thingsboard.server.dao.timeseries.TimeseriesDao;
46 41 import org.thingsboard.server.dao.util.TimescaleDBTsDao;
47 42
... ... @@ -53,25 +48,12 @@ import java.util.List;
53 48 import java.util.Optional;
54 49 import java.util.UUID;
55 50 import java.util.concurrent.CompletableFuture;
56   -import java.util.concurrent.ConcurrentHashMap;
57   -import java.util.concurrent.ConcurrentMap;
58   -import java.util.concurrent.locks.ReentrantLock;
59   -
60 51
61 52 @Component
62 53 @Slf4j
63 54 @TimescaleDBTsDao
64 55 public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements TimeseriesDao {
65 56
66   - private static final String TS = "ts";
67   -
68   - private final ConcurrentMap<String, Integer> tsKvDictionaryMap = new ConcurrentHashMap<>();
69   -
70   - private static final ReentrantLock tsCreationLock = new ReentrantLock();
71   -
72   - @Autowired
73   - private TsKvDictionaryRepository dictionaryRepository;
74   -
75 57 @Autowired
76 58 private TsKvTimescaleRepository tsKvRepository;
77 59
... ... @@ -79,40 +61,32 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
79 61 private AggregationRepository aggregationRepository;
80 62
81 63 @Autowired
82   - private InsertTsRepository<TimescaleTsKvEntity> insertRepository;
83   -
84   - @Value("${sql.ts_timescale.batch_size:1000}")
85   - private int batchSize;
86   -
87   - @Value("${sql.ts_timescale.batch_max_delay:100}")
88   - private long maxDelay;
  64 + protected InsertTsRepository<TimescaleTsKvEntity> insertRepository;
89 65
90   - @Value("${sql.ts_timescale.stats_print_interval_ms:1000}")
91   - private long statsPrintIntervalMs;
92   -
93   - private TbSqlBlockingQueue<EntityContainer<TimescaleTsKvEntity>> queue;
  66 + protected TbSqlBlockingQueue<EntityContainer<TimescaleTsKvEntity>> tsQueue;
94 67
95 68 @PostConstruct
96 69 protected void init() {
97 70 super.init();
98   - TbSqlBlockingQueueParams params = TbSqlBlockingQueueParams.builder()
  71 + TbSqlBlockingQueueParams tsParams = TbSqlBlockingQueueParams.builder()
99 72 .logName("TS Timescale")
100   - .batchSize(batchSize)
101   - .maxDelay(maxDelay)
102   - .statsPrintIntervalMs(statsPrintIntervalMs)
  73 + .batchSize(tsBatchSize)
  74 + .maxDelay(tsMaxDelay)
  75 + .statsPrintIntervalMs(tsStatsPrintIntervalMs)
103 76 .build();
104   - queue = new TbSqlBlockingQueue<>(params);
105   - queue.init(logExecutor, v -> insertRepository.saveOrUpdate(v));
  77 + tsQueue = new TbSqlBlockingQueue<>(tsParams);
  78 + tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v));
106 79 }
107 80
108 81 @PreDestroy
109 82 protected void destroy() {
110 83 super.destroy();
111   - if (queue != null) {
112   - queue.destroy();
  84 + if (tsQueue != null) {
  85 + tsQueue.destroy();
113 86 }
114 87 }
115 88
  89 + @Override
116 90 protected ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
117 91 if (query.getAggregation() == Aggregation.NONE) {
118 92 return findAllAsyncWithLimit(tenantId, entityId, query);
... ... @@ -120,12 +94,59 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
120 94 long startTs = query.getStartTs();
121 95 long endTs = query.getEndTs();
122 96 long timeBucket = query.getInterval();
123   - ListenableFuture<List<Optional<TsKvEntry>>> future = findAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, timeBucket, query.getAggregation());
  97 + ListenableFuture<List<Optional<TsKvEntry>>> future = findAllAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, timeBucket, query.getAggregation());
124 98 return getTskvEntriesFuture(future);
125 99 }
126 100 }
127 101
128 102 @Override
  103 + protected ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
  104 + String strKey = query.getKey();
  105 + Integer keyId = getOrSaveKeyId(strKey);
  106 + List<TimescaleTsKvEntity> timescaleTsKvEntities = tsKvRepository.findAllWithLimit(
  107 + tenantId.getId(),
  108 + entityId.getId(),
  109 + keyId,
  110 + query.getStartTs(),
  111 + query.getEndTs(),
  112 + new PageRequest(0, query.getLimit(),
  113 + new Sort(Sort.Direction.fromString(
  114 + query.getOrderBy()), "ts")));
  115 + timescaleTsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(strKey));
  116 + return Futures.immediateFuture(DaoUtil.convertDataList(timescaleTsKvEntities));
  117 + }
  118 +
  119 + private ListenableFuture<List<Optional<TsKvEntry>>> findAllAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long timeBucket, Aggregation aggregation) {
  120 + CompletableFuture<List<TimescaleTsKvEntity>> listCompletableFuture = switchAgregation(key, startTs, endTs, timeBucket, aggregation, entityId.getId(), tenantId.getId());
  121 + SettableFuture<List<TimescaleTsKvEntity>> listenableFuture = SettableFuture.create();
  122 + listCompletableFuture.whenComplete((timescaleTsKvEntities, throwable) -> {
  123 + if (throwable != null) {
  124 + listenableFuture.setException(throwable);
  125 + } else {
  126 + listenableFuture.set(timescaleTsKvEntities);
  127 + }
  128 + });
  129 + return Futures.transform(listenableFuture, timescaleTsKvEntities -> {
  130 + if (!CollectionUtils.isEmpty(timescaleTsKvEntities)) {
  131 + List<Optional<TsKvEntry>> result = new ArrayList<>();
  132 + timescaleTsKvEntities.forEach(entity -> {
  133 + if (entity != null && entity.isNotEmpty()) {
  134 + entity.setEntityId(entityId.getId());
  135 + entity.setTenantId(tenantId.getId());
  136 + entity.setStrKey(key);
  137 + result.add(Optional.of(DaoUtil.getData(entity)));
  138 + } else {
  139 + result.add(Optional.empty());
  140 + }
  141 + });
  142 + return result;
  143 + } else {
  144 + return Collections.emptyList();
  145 + }
  146 + });
  147 + }
  148 +
  149 + @Override
129 150 public ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
130 151 return processFindAllAsync(tenantId, entityId, queries);
131 152 }
... ... @@ -154,7 +175,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
154 175 entity.setLongValue(tsKvEntry.getLongValue().orElse(null));
155 176 entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null));
156 177 log.trace("Saving entity to timescale db: {}", entity);
157   - return queue.add(new EntityContainer(entity, null));
  178 + return tsQueue.add(new EntityContainer(entity, null));
158 179 }
159 180
160 181 @Override
... ... @@ -192,88 +213,6 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
192 213 return service.submit(() -> null);
193 214 }
194 215
195   - private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
196   - String strKey = query.getKey();
197   - Integer keyId = getOrSaveKeyId(strKey);
198   - List<TimescaleTsKvEntity> timescaleTsKvEntities = tsKvRepository.findAllWithLimit(
199   - tenantId.getId(),
200   - entityId.getId(),
201   - keyId,
202   - query.getStartTs(),
203   - query.getEndTs(),
204   - new PageRequest(0, query.getLimit(),
205   - new Sort(Sort.Direction.fromString(
206   - query.getOrderBy()), TS)));
207   - timescaleTsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(strKey));
208   - return Futures.immediateFuture(DaoUtil.convertDataList(timescaleTsKvEntities));
209   - }
210   -
211   - private Integer getOrSaveKeyId(String strKey) {
212   - Integer keyId = tsKvDictionaryMap.get(strKey);
213   - if (keyId == null) {
214   - Optional<TsKvDictionary> tsKvDictionaryOptional;
215   - tsKvDictionaryOptional = dictionaryRepository.findById(new TsKvDictionaryCompositeKey(strKey));
216   - if (!tsKvDictionaryOptional.isPresent()) {
217   - tsCreationLock.lock();
218   - try {
219   - tsKvDictionaryOptional = dictionaryRepository.findById(new TsKvDictionaryCompositeKey(strKey));
220   - if (!tsKvDictionaryOptional.isPresent()) {
221   - TsKvDictionary tsKvDictionary = new TsKvDictionary();
222   - tsKvDictionary.setKey(strKey);
223   - try {
224   - TsKvDictionary saved = dictionaryRepository.save(tsKvDictionary);
225   - tsKvDictionaryMap.put(saved.getKey(), saved.getKeyId());
226   - keyId = saved.getKeyId();
227   - } catch (ConstraintViolationException e) {
228   - tsKvDictionaryOptional = dictionaryRepository.findById(new TsKvDictionaryCompositeKey(strKey));
229   - TsKvDictionary dictionary = tsKvDictionaryOptional.orElseThrow(() -> new RuntimeException("Failed to get TsKvDictionary entity from DB!"));
230   - tsKvDictionaryMap.put(dictionary.getKey(), dictionary.getKeyId());
231   - keyId = dictionary.getKeyId();
232   - }
233   - } else {
234   - keyId = tsKvDictionaryOptional.get().getKeyId();
235   - }
236   - } finally {
237   - tsCreationLock.unlock();
238   - }
239   - } else {
240   - keyId = tsKvDictionaryOptional.get().getKeyId();
241   - tsKvDictionaryMap.put(strKey, keyId);
242   - }
243   - }
244   - return keyId;
245   - }
246   -
247   - private ListenableFuture<List<Optional<TsKvEntry>>> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long timeBucket, Aggregation aggregation) {
248   - CompletableFuture<List<TimescaleTsKvEntity>> listCompletableFuture = switchAgregation(key, startTs, endTs, timeBucket, aggregation, entityId.getId(), tenantId.getId());
249   - SettableFuture<List<TimescaleTsKvEntity>> listenableFuture = SettableFuture.create();
250   - listCompletableFuture.whenComplete((timescaleTsKvEntities, throwable) -> {
251   - if (throwable != null) {
252   - listenableFuture.setException(throwable);
253   - } else {
254   - listenableFuture.set(timescaleTsKvEntities);
255   - }
256   - });
257   - return Futures.transform(listenableFuture, timescaleTsKvEntities -> {
258   - if (!CollectionUtils.isEmpty(timescaleTsKvEntities)) {
259   - List<Optional<TsKvEntry>> result = new ArrayList<>();
260   - timescaleTsKvEntities.forEach(entity -> {
261   - if (entity != null && entity.isNotEmpty()) {
262   - entity.setEntityId(entityId.getId());
263   - entity.setTenantId(tenantId.getId());
264   - entity.setStrKey(key);
265   - result.add(Optional.of(DaoUtil.getData(entity)));
266   - } else {
267   - result.add(Optional.empty());
268   - }
269   - });
270   - return result;
271   - } else {
272   - return Collections.emptyList();
273   - }
274   - });
275   - }
276   -
277 216 private CompletableFuture<List<TimescaleTsKvEntity>> switchAgregation(String key, long startTs, long endTs, long timeBucket, Aggregation aggregation, UUID entityId, UUID tenantId) {
278 217 switch (aggregation) {
279 218 case AVG:
... ... @@ -291,9 +230,9 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
291 230 }
292 231 }
293 232
294   - private CompletableFuture<List<TimescaleTsKvEntity>> findAvg(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) {
  233 + private CompletableFuture<List<TimescaleTsKvEntity>> findCount(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) {
295 234 Integer keyId = getOrSaveKeyId(key);
296   - return aggregationRepository.findAvg(
  235 + return aggregationRepository.findCount(
297 236 tenantId,
298 237 entityId,
299 238 keyId,
... ... @@ -302,9 +241,9 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
302 241 endTs);
303 242 }
304 243
305   - private CompletableFuture<List<TimescaleTsKvEntity>> findMax(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) {
  244 + private CompletableFuture<List<TimescaleTsKvEntity>> findSum(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) {
306 245 Integer keyId = getOrSaveKeyId(key);
307   - return aggregationRepository.findMax(
  246 + return aggregationRepository.findSum(
308 247 tenantId,
309 248 entityId,
310 249 keyId,
... ... @@ -322,12 +261,11 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
322 261 timeBucket,
323 262 startTs,
324 263 endTs);
325   -
326 264 }
327 265
328   - private CompletableFuture<List<TimescaleTsKvEntity>> findSum(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) {
  266 + private CompletableFuture<List<TimescaleTsKvEntity>> findMax(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) {
329 267 Integer keyId = getOrSaveKeyId(key);
330   - return aggregationRepository.findSum(
  268 + return aggregationRepository.findMax(
331 269 tenantId,
332 270 entityId,
333 271 keyId,
... ... @@ -336,9 +274,9 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
336 274 endTs);
337 275 }
338 276
339   - private CompletableFuture<List<TimescaleTsKvEntity>> findCount(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) {
  277 + private CompletableFuture<List<TimescaleTsKvEntity>> findAvg(String key, long startTs, long endTs, long timeBucket, UUID entityId, UUID tenantId) {
340 278 Integer keyId = getOrSaveKeyId(key);
341   - return aggregationRepository.findCount(
  279 + return aggregationRepository.findAvg(
342 280 tenantId,
343 281 entityId,
344 282 keyId,
... ...
... ... @@ -25,7 +25,7 @@ CREATE TABLE IF NOT EXISTS tenant_ts_kv (
25 25 str_v varchar(10000000),
26 26 long_v bigint,
27 27 dbl_v double precision,
28   - CONSTRAINT ts_kv_pkey PRIMARY KEY (tenant_id, entity_id, key, ts)
  28 + CONSTRAINT tenant_ts_kv_pkey PRIMARY KEY (tenant_id, entity_id, key, ts)
29 29 );
30 30
31 31 CREATE TABLE IF NOT EXISTS ts_kv_dictionary (
... ... @@ -35,15 +35,14 @@ CREATE TABLE IF NOT EXISTS ts_kv_dictionary (
35 35 );
36 36
37 37 CREATE TABLE IF NOT EXISTS ts_kv_latest (
38   - entity_type varchar(255) NOT NULL,
39   - entity_id varchar(31) NOT NULL,
40   - key varchar(255) NOT NULL,
  38 + entity_id uuid NOT NULL,
  39 + key int NOT NULL,
41 40 ts bigint NOT NULL,
42 41 bool_v boolean,
43 42 str_v varchar(10000000),
44 43 long_v bigint,
45 44 dbl_v double precision,
46   - CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_type, entity_id, key)
  45 + CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
47 46 );
48 47
49 48 SELECT create_hypertable('tenant_ts_kv', 'ts', chunk_time_interval => 86400000, if_not_exists => true);
\ No newline at end of file
... ...
... ... @@ -14,26 +14,32 @@
14 14 -- limitations under the License.
15 15 --
16 16
  17 +SET DATABASE SQL SYNTAX PGS TRUE;
  18 +
17 19 CREATE TABLE IF NOT EXISTS ts_kv (
18   - entity_type varchar(255) NOT NULL,
19   - entity_id varchar(31) NOT NULL,
20   - key varchar(255) NOT NULL,
  20 + entity_id uuid NOT NULL,
  21 + key int NOT NULL,
21 22 ts bigint NOT NULL,
22 23 bool_v boolean,
23 24 str_v varchar(10000000),
24 25 long_v bigint,
25 26 dbl_v double precision,
26   - CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_type, entity_id, key, ts)
  27 + CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_id, key, ts)
27 28 );
28 29
29 30 CREATE TABLE IF NOT EXISTS ts_kv_latest (
30   - entity_type varchar(255) NOT NULL,
31   - entity_id varchar(31) NOT NULL,
32   - key varchar(255) NOT NULL,
  31 + entity_id uuid NOT NULL,
  32 + key int NOT NULL,
33 33 ts bigint NOT NULL,
34 34 bool_v boolean,
35 35 str_v varchar(10000000),
36 36 long_v bigint,
37 37 dbl_v double precision,
38   - CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_type, entity_id, key)
  38 + CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
  39 +);
  40 +
  41 +CREATE TABLE IF NOT EXISTS ts_kv_dictionary (
  42 + key varchar(255) NOT NULL,
  43 + key_id int GENERATED BY DEFAULT AS IDENTITY(start with 0 increment by 1) UNIQUE,
  44 + CONSTRAINT ts_key_id_pkey PRIMARY KEY (key)
39 45 );
... ...
... ... @@ -24,20 +24,19 @@ CREATE TABLE IF NOT EXISTS ts_kv (
24 24 dbl_v double precision
25 25 ) PARTITION BY RANGE (ts);
26 26
27   -CREATE TABLE IF NOT EXISTS ts_kv_dictionary (
28   - key varchar(255) NOT NULL,
29   - key_id serial UNIQUE,
30   - CONSTRAINT ts_key_id_pkey PRIMARY KEY (key)
31   -);
32   -
33 27 CREATE TABLE IF NOT EXISTS ts_kv_latest (
34   - entity_type varchar(255) NOT NULL,
35   - entity_id varchar(31) NOT NULL,
36   - key varchar(255) NOT NULL,
  28 + entity_id uuid NOT NULL,
  29 + key int NOT NULL,
37 30 ts bigint NOT NULL,
38 31 bool_v boolean,
39 32 str_v varchar(10000000),
40 33 long_v bigint,
41 34 dbl_v double precision,
42   - CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_type, entity_id, key)
  35 + CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
  36 +);
  37 +
  38 +CREATE TABLE IF NOT EXISTS ts_kv_dictionary (
  39 + key varchar(255) NOT NULL,
  40 + key_id serial UNIQUE,
  41 + CONSTRAINT ts_key_id_pkey PRIMARY KEY (key)
43 42 );
\ No newline at end of file
... ...