Commit 6aa5655e6fb0d81ccacaa1e6ac1e4ac720be93a0

Authored by Volodymyr Babak
1 parent 6d7c28c8

Fixes for upgrade functionality

Showing 18 changed files with 181 additions and 76 deletions
@@ -14,6 +14,38 @@ @@ -14,6 +14,38 @@
14 -- limitations under the License. 14 -- limitations under the License.
15 -- 15 --
16 16
  17 +DROP MATERIALIZED VIEW IF EXISTS thingsboard.rule_chain_by_tenant_and_search_text;
  18 +
  19 +DROP TABLE IF EXISTS thingsboard.rule_chain;
  20 +
  21 +CREATE TABLE IF NOT EXISTS thingsboard.rule_chain (
  22 + id uuid,
  23 + tenant_id uuid,
  24 + name text,
  25 + type text,
  26 + search_text text,
  27 + first_rule_node_id uuid,
  28 + root boolean,
  29 + debug_mode boolean,
  30 + configuration text,
  31 + additional_info text,
  32 + PRIMARY KEY (id, tenant_id, type)
  33 +);
  34 +
  35 +CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.rule_chain_by_tenant_and_search_text AS
  36 + SELECT *
  37 + from thingsboard.rule_chain
  38 + WHERE tenant_id IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL AND type IS NOT NULL
  39 + PRIMARY KEY ( tenant_id, search_text, id, type )
  40 + WITH CLUSTERING ORDER BY ( search_text ASC, id DESC );
  41 +
  42 +CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.rule_chain_by_tenant_by_type_and_search_text AS
  43 + SELECT *
  44 + from thingsboard.rule_chain
  45 + WHERE tenant_id IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL AND type IS NOT NULL
  46 + PRIMARY KEY ( tenant_id, type, search_text, id )
  47 + WITH CLUSTERING ORDER BY ( type ASC, search_text ASC, id DESC );
  48 +
17 CREATE TABLE IF NOT EXISTS thingsboard.edge ( 49 CREATE TABLE IF NOT EXISTS thingsboard.edge (
18 id timeuuid, 50 id timeuuid,
19 tenant_id timeuuid, 51 tenant_id timeuuid,
@@ -37,12 +69,12 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_name AS @@ -37,12 +69,12 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_name AS
37 PRIMARY KEY ( tenant_id, name, id, customer_id, type) 69 PRIMARY KEY ( tenant_id, name, id, customer_id, type)
38 WITH CLUSTERING ORDER BY ( name ASC, id DESC, customer_id DESC); 70 WITH CLUSTERING ORDER BY ( name ASC, id DESC, customer_id DESC);
39 71
40 -CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_routing_key AS 72 +CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_routing_key AS
41 SELECT * 73 SELECT *
42 from thingsboard.edge 74 from thingsboard.edge
43 WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND routing_key IS NOT NULL AND id IS NOT NULL 75 WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND routing_key IS NOT NULL AND id IS NOT NULL
44 - PRIMARY KEY ( tenant_id, routing_key, id, customer_id, type)  
45 - WITH CLUSTERING ORDER BY ( routing_key ASC, id DESC, customer_id DESC); 76 + PRIMARY KEY ( routing_key, tenant_id, id, customer_id, type)
  77 + WITH CLUSTERING ORDER BY ( tenant_id DESC, id DESC, customer_id DESC);
46 78
47 CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_search_text AS 79 CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_search_text AS
48 SELECT * 80 SELECT *
@@ -152,6 +152,15 @@ public class ThingsboardInstallService { @@ -152,6 +152,15 @@ public class ThingsboardInstallService {
152 databaseTsUpgradeService.upgradeDatabase("2.5.4"); 152 databaseTsUpgradeService.upgradeDatabase("2.5.4");
153 } 153 }
154 154
  155 + case "2.5.5":
  156 + log.info("Upgrading ThingsBoard from version 2.5.5 to 2.6.0 ...");
  157 + if (databaseTsUpgradeService != null) {
  158 + databaseTsUpgradeService.upgradeDatabase("2.5.5");
  159 + }
  160 + databaseEntitiesUpgradeService.upgradeDatabase("2.5.5");
  161 +
  162 + dataUpdateService.updateData("2.5.5");
  163 +
155 log.info("Updating system data..."); 164 log.info("Updating system data...");
156 165
157 systemDataLoaderService.deleteSystemWidgetBundle("charts"); 166 systemDataLoaderService.deleteSystemWidgetBundle("charts");
@@ -150,6 +150,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i @@ -150,6 +150,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
150 } 150 }
151 } catch (Exception e) { 151 } catch (Exception e) {
152 log.warn("Failed to process messages handling!", e); 152 log.warn("Failed to process messages handling!", e);
  153 + try {
  154 + Thread.sleep(1000);
  155 + } catch (InterruptedException ignore) {}
153 } 156 }
154 } 157 }
155 }); 158 });
@@ -252,52 +252,54 @@ public final class EdgeGrpcSession implements Closeable { @@ -252,52 +252,54 @@ public final class EdgeGrpcSession implements Closeable {
252 } 252 }
253 253
254 void processHandleMessages() throws ExecutionException, InterruptedException { 254 void processHandleMessages() throws ExecutionException, InterruptedException {
255 - Long queueStartTs = getQueueStartTs().get();  
256 - TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), queueStartTs, null, true);  
257 - TimePageData<EdgeEvent> pageData;  
258 - UUID ifOffset = null;  
259 - boolean success = true;  
260 - do {  
261 - pageData = ctx.getEdgeNotificationService().findEdgeEvents(edge.getTenantId(), edge.getId(), pageLink);  
262 - if (isConnected() && !pageData.getData().isEmpty()) {  
263 - log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size());  
264 - List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData());  
265 - log.trace("[{}] downlink msg(s) are going to be send.", downlinkMsgsPack.size());  
266 -  
267 - latch = new CountDownLatch(downlinkMsgsPack.size());  
268 - for (DownlinkMsg downlinkMsg : downlinkMsgsPack) {  
269 - sendResponseMsg(ResponseMsg.newBuilder()  
270 - .setDownlinkMsg(downlinkMsg)  
271 - .build());  
272 - } 255 + if (isConnected()) {
  256 + Long queueStartTs = getQueueStartTs().get();
  257 + TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), queueStartTs, null, true);
  258 + TimePageData<EdgeEvent> pageData;
  259 + UUID ifOffset = null;
  260 + boolean success = true;
  261 + do {
  262 + pageData = ctx.getEdgeNotificationService().findEdgeEvents(edge.getTenantId(), edge.getId(), pageLink);
  263 + if (isConnected() && !pageData.getData().isEmpty()) {
  264 + log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size());
  265 + List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData());
  266 + log.trace("[{}] downlink msg(s) are going to be send.", downlinkMsgsPack.size());
  267 +
  268 + latch = new CountDownLatch(downlinkMsgsPack.size());
  269 + for (DownlinkMsg downlinkMsg : downlinkMsgsPack) {
  270 + sendResponseMsg(ResponseMsg.newBuilder()
  271 + .setDownlinkMsg(downlinkMsg)
  272 + .build());
  273 + }
273 274
274 - ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId(); 275 + ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId();
275 276
276 - success = latch.await(10, TimeUnit.SECONDS);  
277 - if (!success) {  
278 - log.warn("Failed to deliver the batch: {}", downlinkMsgsPack);  
279 - }  
280 - }  
281 - if (isConnected() && (!success || pageData.hasNext())) {  
282 - try {  
283 - Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches());  
284 - } catch (InterruptedException e) {  
285 - log.error("Error during sleep between batches", e); 277 + success = latch.await(10, TimeUnit.SECONDS);
  278 + if (!success) {
  279 + log.warn("Failed to deliver the batch: {}", downlinkMsgsPack);
  280 + }
286 } 281 }
287 - if (success) {  
288 - pageLink = pageData.getNextPageLink(); 282 + if (isConnected() && (!success || pageData.hasNext())) {
  283 + try {
  284 + Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches());
  285 + } catch (InterruptedException e) {
  286 + log.error("Error during sleep between batches", e);
  287 + }
  288 + if (success) {
  289 + pageLink = pageData.getNextPageLink();
  290 + }
289 } 291 }
290 - }  
291 - } while (isConnected() && (!success || pageData.hasNext())); 292 + } while (isConnected() && (!success || pageData.hasNext()));
292 293
293 - if (ifOffset != null) {  
294 - Long newStartTs = UUIDs.unixTimestamp(ifOffset);  
295 - updateQueueStartTs(newStartTs);  
296 - }  
297 - try {  
298 - Thread.sleep(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval());  
299 - } catch (InterruptedException e) {  
300 - log.error("Error during sleep", e); 294 + if (ifOffset != null) {
  295 + Long newStartTs = UUIDs.unixTimestamp(ifOffset);
  296 + updateQueueStartTs(newStartTs);
  297 + }
  298 + try {
  299 + Thread.sleep(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval());
  300 + } catch (InterruptedException e) {
  301 + log.error("Error during sleep", e);
  302 + }
301 } 303 }
302 } 304 }
303 305
@@ -50,7 +50,7 @@ public abstract class AbstractSqlTsDatabaseUpgradeService { @@ -50,7 +50,7 @@ public abstract class AbstractSqlTsDatabaseUpgradeService {
50 @Autowired 50 @Autowired
51 protected InstallScripts installScripts; 51 protected InstallScripts installScripts;
52 52
53 - protected abstract void loadSql(Connection conn, String fileName); 53 + protected abstract void loadSql(Connection conn, String version, String fileName);
54 54
55 protected void loadFunctions(Path sqlFile, Connection conn) throws Exception { 55 protected void loadFunctions(Path sqlFile, Connection conn) throws Exception {
56 String sql = new String(Files.readAllBytes(sqlFile), StandardCharsets.UTF_8); 56 String sql = new String(Files.readAllBytes(sqlFile), StandardCharsets.UTF_8);
@@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
21 import org.springframework.beans.factory.annotation.Autowired; 21 import org.springframework.beans.factory.annotation.Autowired;
22 import org.springframework.context.annotation.Profile; 22 import org.springframework.context.annotation.Profile;
23 import org.springframework.stereotype.Service; 23 import org.springframework.stereotype.Service;
  24 +import org.thingsboard.server.common.data.rule.RuleChainType;
24 import org.thingsboard.server.dao.dashboard.DashboardService; 25 import org.thingsboard.server.dao.dashboard.DashboardService;
25 import org.thingsboard.server.dao.util.NoSqlDao; 26 import org.thingsboard.server.dao.util.NoSqlDao;
26 import org.thingsboard.server.service.install.cql.CassandraDbHelper; 27 import org.thingsboard.server.service.install.cql.CassandraDbHelper;
@@ -44,6 +45,7 @@ import static org.thingsboard.server.service.install.DatabaseHelper.ENTITY_VIEWS @@ -44,6 +45,7 @@ import static org.thingsboard.server.service.install.DatabaseHelper.ENTITY_VIEWS
44 import static org.thingsboard.server.service.install.DatabaseHelper.ID; 45 import static org.thingsboard.server.service.install.DatabaseHelper.ID;
45 import static org.thingsboard.server.service.install.DatabaseHelper.KEYS; 46 import static org.thingsboard.server.service.install.DatabaseHelper.KEYS;
46 import static org.thingsboard.server.service.install.DatabaseHelper.NAME; 47 import static org.thingsboard.server.service.install.DatabaseHelper.NAME;
  48 +import static org.thingsboard.server.service.install.DatabaseHelper.RULE_CHAIN;
47 import static org.thingsboard.server.service.install.DatabaseHelper.SEARCH_TEXT; 49 import static org.thingsboard.server.service.install.DatabaseHelper.SEARCH_TEXT;
48 import static org.thingsboard.server.service.install.DatabaseHelper.START_TS; 50 import static org.thingsboard.server.service.install.DatabaseHelper.START_TS;
49 import static org.thingsboard.server.service.install.DatabaseHelper.TENANT_ID; 51 import static org.thingsboard.server.service.install.DatabaseHelper.TENANT_ID;
@@ -306,17 +308,39 @@ public class CassandraDatabaseUpgradeService extends AbstractCassandraDatabaseUp @@ -306,17 +308,39 @@ public class CassandraDatabaseUpgradeService extends AbstractCassandraDatabaseUp
306 } 308 }
307 log.info("Schema updated."); 309 log.info("Schema updated.");
308 break; 310 break;
309 - case "2.5.0": 311 + case "2.5.5":
  312 +
  313 + log.info("Upgrading Cassandra DataBase from version {} to 2.6.0 ...", fromVersion);
  314 +
  315 + // Dump rule chains
  316 +
  317 + cluster.getSession();
  318 +
  319 + ks = cluster.getCluster().getMetadata().getKeyspace(cluster.getKeyspaceName());
  320 +
  321 + log.info("Dumping rule chains ...");
  322 + Path ruleChainsDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), RULE_CHAIN,
  323 + new String[]{ID, TENANT_ID, NAME, SEARCH_TEXT, "first_rule_node_id", "root", "debug_mode", CONFIGURATION, ADDITIONAL_INFO, TYPE},
  324 + new String[]{"", "", "", "", "", "", "", "", "", RuleChainType.CORE.name()},
  325 + "tb-rule-chains");
  326 + log.info("Rule chains dumped.");
  327 +
310 log.info("Updating schema ..."); 328 log.info("Updating schema ...");
311 schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.6.0", SCHEMA_UPDATE_CQL); 329 schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.6.0", SCHEMA_UPDATE_CQL);
312 loadCql(schemaUpdateFile); 330 loadCql(schemaUpdateFile);
313 -  
314 - try {  
315 - cluster.getSession().execute("alter table rule_chain add type text");  
316 - Thread.sleep(2500);  
317 - } catch (InvalidQueryException e) {}  
318 log.info("Schema updated."); 331 log.info("Schema updated.");
  332 +
  333 + // Restore rule chains
  334 +
  335 + log.info("Restoring rule chains ...");
  336 + if (ruleChainsDump != null) {
  337 + CassandraDbHelper.loadCf(ks, cluster.getSession(), RULE_CHAIN,
  338 + new String[]{ID, TENANT_ID, NAME, SEARCH_TEXT, "first_rule_node_id", "root", "debug_mode", CONFIGURATION, ADDITIONAL_INFO, TYPE}, ruleChainsDump);
  339 + Files.deleteIfExists(ruleChainsDump);
  340 + }
  341 + log.info("Rule chains restored.");
319 break; 342 break;
  343 +
320 default: 344 default:
321 throw new RuntimeException("Unable to upgrade Cassandra database, unsupported fromVersion: " + fromVersion); 345 throw new RuntimeException("Unable to upgrade Cassandra database, unsupported fromVersion: " + fromVersion);
322 } 346 }
@@ -50,6 +50,7 @@ public class CassandraTsDatabaseUpgradeService extends AbstractCassandraDatabase @@ -50,6 +50,7 @@ public class CassandraTsDatabaseUpgradeService extends AbstractCassandraDatabase
50 break; 50 break;
51 case "2.5.0": 51 case "2.5.0":
52 case "2.5.4": 52 case "2.5.4":
  53 + case "2.5.5":
53 break; 54 break;
54 default: 55 default:
55 throw new RuntimeException("Unable to upgrade Cassandra database, unsupported fromVersion: " + fromVersion); 56 throw new RuntimeException("Unable to upgrade Cassandra database, unsupported fromVersion: " + fromVersion);
@@ -57,6 +57,7 @@ public class DatabaseHelper { @@ -57,6 +57,7 @@ public class DatabaseHelper {
57 public static final String DASHBOARD = "dashboard"; 57 public static final String DASHBOARD = "dashboard";
58 public static final String ENTITY_VIEWS = "entity_views"; 58 public static final String ENTITY_VIEWS = "entity_views";
59 public static final String ENTITY_VIEW = "entity_view"; 59 public static final String ENTITY_VIEW = "entity_view";
  60 + public static final String RULE_CHAIN = "rule_chain";
60 public static final String ID = "id"; 61 public static final String ID = "id";
61 public static final String TITLE = "title"; 62 public static final String TITLE = "title";
62 public static final String TYPE = "type"; 63 public static final String TYPE = "type";
@@ -115,6 +115,11 @@ public class InstallScripts { @@ -115,6 +115,11 @@ public class InstallScripts {
115 } 115 }
116 } 116 }
117 117
  118 + public void createDefaultEdgeRuleChains(TenantId tenantId) {
  119 + Path tenantChainsDir = getTenantRuleChainsDir();
  120 + loadRuleChainFromFile(tenantId, tenantChainsDir.resolve("edge_root_rule_chain.json"));
  121 + }
  122 +
118 public void loadSystemWidgets() throws Exception { 123 public void loadSystemWidgets() throws Exception {
119 Path widgetBundlesDir = Paths.get(getDataDir(), JSON_DIR, SYSTEM_DIR, WIDGET_BUNDLES_DIR); 124 Path widgetBundlesDir = Paths.get(getDataDir(), JSON_DIR, SYSTEM_DIR, WIDGET_BUNDLES_DIR);
120 try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(widgetBundlesDir, path -> path.toString().endsWith(JSON_EXT))) { 125 try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(widgetBundlesDir, path -> path.toString().endsWith(JSON_EXT))) {
@@ -94,7 +94,7 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe @@ -94,7 +94,7 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
94 log.info("PostgreSQL version is valid!"); 94 log.info("PostgreSQL version is valid!");
95 if (isOldSchema(conn, 2004003)) { 95 if (isOldSchema(conn, 2004003)) {
96 log.info("Load upgrade functions ..."); 96 log.info("Load upgrade functions ...");
97 - loadSql(conn, LOAD_FUNCTIONS_SQL); 97 + loadSql(conn, "2.4.3", LOAD_FUNCTIONS_SQL);
98 log.info("Updating timeseries schema ..."); 98 log.info("Updating timeseries schema ...");
99 executeQuery(conn, CALL_CREATE_PARTITION_TS_KV_TABLE); 99 executeQuery(conn, CALL_CREATE_PARTITION_TS_KV_TABLE);
100 if (!partitionType.equals("INDEFINITE")) { 100 if (!partitionType.equals("INDEFINITE")) {
@@ -179,9 +179,9 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe @@ -179,9 +179,9 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
179 } 179 }
180 180
181 log.info("Load TTL functions ..."); 181 log.info("Load TTL functions ...");
182 - loadSql(conn, LOAD_TTL_FUNCTIONS_SQL); 182 + loadSql(conn, "2.4.3", LOAD_TTL_FUNCTIONS_SQL);
183 log.info("Load Drop Partitions functions ..."); 183 log.info("Load Drop Partitions functions ...");
184 - loadSql(conn, LOAD_DROP_PARTITIONS_FUNCTIONS_SQL); 184 + loadSql(conn, "2.4.3", LOAD_DROP_PARTITIONS_FUNCTIONS_SQL);
185 185
186 executeQuery(conn, "UPDATE tb_schema_settings SET schema_version = 2005000"); 186 executeQuery(conn, "UPDATE tb_schema_settings SET schema_version = 2005000");
187 187
@@ -198,7 +198,13 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe @@ -198,7 +198,13 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
198 case "2.5.4": 198 case "2.5.4":
199 try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { 199 try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
200 log.info("Load Drop Partitions functions ..."); 200 log.info("Load Drop Partitions functions ...");
201 - loadSql(conn, LOAD_DROP_PARTITIONS_FUNCTIONS_SQL); 201 + loadSql(conn, "2.4.3", LOAD_DROP_PARTITIONS_FUNCTIONS_SQL);
  202 + }
  203 + break;
  204 + case "2.5.5":
  205 + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
  206 + log.info("Load TTL functions ...");
  207 + loadSql(conn, "2.6.0", LOAD_TTL_FUNCTIONS_SQL);
202 } 208 }
203 break; 209 break;
204 default: 210 default:
@@ -236,8 +242,8 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe @@ -236,8 +242,8 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
236 } 242 }
237 243
238 @Override 244 @Override
239 - protected void loadSql(Connection conn, String fileName) {  
240 - Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", fileName); 245 + protected void loadSql(Connection conn, String version, String fileName) {
  246 + Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", version, fileName);
241 try { 247 try {
242 loadFunctions(schemaUpdateFile, conn); 248 loadFunctions(schemaUpdateFile, conn);
243 log.info("Functions successfully loaded!"); 249 log.info("Functions successfully loaded!");
@@ -233,7 +233,7 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService @@ -233,7 +233,7 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
233 log.info("Schema updated."); 233 log.info("Schema updated.");
234 } 234 }
235 break; 235 break;
236 - case "2.5.0": 236 + case "2.5.5":
237 try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { 237 try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
238 log.info("Updating schema ..."); 238 log.info("Updating schema ...");
239 schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.6.0", SCHEMA_UPDATE_SQL); 239 schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.6.0", SCHEMA_UPDATE_SQL);
@@ -89,7 +89,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr @@ -89,7 +89,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
89 log.info("PostgreSQL version is valid!"); 89 log.info("PostgreSQL version is valid!");
90 if (isOldSchema(conn, 2004003)) { 90 if (isOldSchema(conn, 2004003)) {
91 log.info("Load upgrade functions ..."); 91 log.info("Load upgrade functions ...");
92 - loadSql(conn, LOAD_FUNCTIONS_SQL); 92 + loadSql(conn, "2.4.3", LOAD_FUNCTIONS_SQL);
93 log.info("Updating timescale schema ..."); 93 log.info("Updating timescale schema ...");
94 executeQuery(conn, CALL_CREATE_TS_KV_LATEST_TABLE); 94 executeQuery(conn, CALL_CREATE_TS_KV_LATEST_TABLE);
95 executeQuery(conn, CALL_CREATE_NEW_TENANT_TS_KV_TABLE); 95 executeQuery(conn, CALL_CREATE_NEW_TENANT_TS_KV_TABLE);
@@ -165,7 +165,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr @@ -165,7 +165,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
165 } 165 }
166 166
167 log.info("Load TTL functions ..."); 167 log.info("Load TTL functions ...");
168 - loadSql(conn, LOAD_TTL_FUNCTIONS_SQL); 168 + loadSql(conn, "2.4.3", LOAD_TTL_FUNCTIONS_SQL);
169 169
170 executeQuery(conn, "UPDATE tb_schema_settings SET schema_version = 2005000"); 170 executeQuery(conn, "UPDATE tb_schema_settings SET schema_version = 2005000");
171 log.info("schema timescale updated!"); 171 log.info("schema timescale updated!");
@@ -179,6 +179,8 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr @@ -179,6 +179,8 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
179 break; 179 break;
180 case "2.5.4": 180 case "2.5.4":
181 break; 181 break;
  182 + case "2.5.5":
  183 + break;
182 default: 184 default:
183 throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); 185 throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion);
184 } 186 }
@@ -200,8 +202,8 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr @@ -200,8 +202,8 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
200 } 202 }
201 203
202 @Override 204 @Override
203 - protected void loadSql(Connection conn, String fileName) {  
204 - Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", fileName); 205 + protected void loadSql(Connection conn, String version, String fileName) {
  206 + Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", version, fileName);
205 try { 207 try {
206 loadFunctions(schemaUpdateFile, conn); 208 loadFunctions(schemaUpdateFile, conn);
207 log.info("Functions successfully loaded!"); 209 log.info("Functions successfully loaded!");
@@ -157,6 +157,8 @@ public class CassandraDbHelper { @@ -157,6 +157,8 @@ public class CassandraDbHelper {
157 str = new Float(row.getFloat(index)).toString(); 157 str = new Float(row.getFloat(index)).toString();
158 } else if (type == DataType.timestamp()) { 158 } else if (type == DataType.timestamp()) {
159 str = ""+row.getTimestamp(index).getTime(); 159 str = ""+row.getTimestamp(index).getTime();
  160 + } else if (type == DataType.cboolean()) {
  161 + str = ""+ row.getBool(index);
160 } else { 162 } else {
161 str = row.getString(index); 163 str = row.getString(index);
162 } 164 }
@@ -205,6 +207,8 @@ public class CassandraDbHelper { @@ -205,6 +207,8 @@ public class CassandraDbHelper {
205 boundStatement.setFloat(column, Float.valueOf(value)); 207 boundStatement.setFloat(column, Float.valueOf(value));
206 } else if (type == DataType.timestamp()) { 208 } else if (type == DataType.timestamp()) {
207 boundStatement.setTimestamp(column, new Date(Long.valueOf(value))); 209 boundStatement.setTimestamp(column, new Date(Long.valueOf(value)));
  210 + } else if (type == DataType.cboolean()) {
  211 + boundStatement.setBool(column, Boolean.valueOf(value));
208 } else { 212 } else {
209 boundStatement.setString(column, value); 213 boundStatement.setString(column, value);
210 } 214 }
@@ -19,9 +19,7 @@ import lombok.extern.slf4j.Slf4j; @@ -19,9 +19,7 @@ import lombok.extern.slf4j.Slf4j;
19 import org.springframework.beans.factory.annotation.Autowired; 19 import org.springframework.beans.factory.annotation.Autowired;
20 import org.springframework.context.annotation.Profile; 20 import org.springframework.context.annotation.Profile;
21 import org.springframework.stereotype.Service; 21 import org.springframework.stereotype.Service;
22 -import org.thingsboard.server.common.data.SearchTextBased;  
23 import org.thingsboard.server.common.data.Tenant; 22 import org.thingsboard.server.common.data.Tenant;
24 -import org.thingsboard.server.common.data.id.UUIDBased;  
25 import org.thingsboard.server.common.data.page.TextPageData; 23 import org.thingsboard.server.common.data.page.TextPageData;
26 import org.thingsboard.server.common.data.page.TextPageLink; 24 import org.thingsboard.server.common.data.page.TextPageLink;
27 import org.thingsboard.server.common.data.rule.RuleChain; 25 import org.thingsboard.server.common.data.rule.RuleChain;
@@ -50,6 +48,10 @@ public class DefaultDataUpdateService implements DataUpdateService { @@ -50,6 +48,10 @@ public class DefaultDataUpdateService implements DataUpdateService {
50 log.info("Updating data from version 1.4.0 to 2.0.0 ..."); 48 log.info("Updating data from version 1.4.0 to 2.0.0 ...");
51 tenantsDefaultRuleChainUpdater.updateEntities(null); 49 tenantsDefaultRuleChainUpdater.updateEntities(null);
52 break; 50 break;
  51 + case "2.5.5":
  52 + log.info("Updating data from version 2.5.5 to 2.6.0 ...");
  53 + tenantsDefaultEdgeRuleChainUpdater.updateEntities(null);
  54 + break;
53 default: 55 default:
54 throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); 56 throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);
55 } 57 }
@@ -76,4 +78,24 @@ public class DefaultDataUpdateService implements DataUpdateService { @@ -76,4 +78,24 @@ public class DefaultDataUpdateService implements DataUpdateService {
76 } 78 }
77 }; 79 };
78 80
  81 + private PaginatedUpdater<String, Tenant> tenantsDefaultEdgeRuleChainUpdater =
  82 + new PaginatedUpdater<String, Tenant>() {
  83 +
  84 + @Override
  85 + protected TextPageData<Tenant> findEntities(String region, TextPageLink pageLink) {
  86 + return tenantService.findTenants(pageLink);
  87 + }
  88 +
  89 + @Override
  90 + protected void updateEntity(Tenant tenant) {
  91 + try {
  92 + RuleChain defaultEdgeRuleChain = ruleChainService.getDefaultRootEdgeRuleChain(tenant.getId());
  93 + if (defaultEdgeRuleChain == null) {
  94 + installScripts.createDefaultEdgeRuleChains(tenant.getId());
  95 + }
  96 + } catch (Exception e) {
  97 + log.error("Unable to update Tenant", e);
  98 + }
  99 + }
  100 + };
79 } 101 }
@@ -27,7 +27,6 @@ import com.google.common.util.concurrent.MoreExecutors; @@ -27,7 +27,6 @@ import com.google.common.util.concurrent.MoreExecutors;
27 import lombok.extern.slf4j.Slf4j; 27 import lombok.extern.slf4j.Slf4j;
28 import org.springframework.beans.factory.annotation.Autowired; 28 import org.springframework.beans.factory.annotation.Autowired;
29 import org.springframework.stereotype.Component; 29 import org.springframework.stereotype.Component;
30 -import org.thingsboard.server.common.data.Device;  
31 import org.thingsboard.server.common.data.EntitySubtype; 30 import org.thingsboard.server.common.data.EntitySubtype;
32 import org.thingsboard.server.common.data.EntityType; 31 import org.thingsboard.server.common.data.EntityType;
33 import org.thingsboard.server.common.data.edge.Edge; 32 import org.thingsboard.server.common.data.edge.Edge;
@@ -57,8 +56,8 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.in; @@ -57,8 +56,8 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.in;
57 import static com.datastax.driver.core.querybuilder.QueryBuilder.select; 56 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
58 import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_CUSTOMER_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME; 57 import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_CUSTOMER_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME;
59 import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_CUSTOMER_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME; 58 import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_CUSTOMER_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME;
  59 +import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_ROUTING_KEY_VIEW_NAME;
60 import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_TENANT_AND_NAME_VIEW_NAME; 60 import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_TENANT_AND_NAME_VIEW_NAME;
61 -import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_TENANT_AND_ROUTING_KEY_VIEW_NAME;  
62 import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME; 61 import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME;
63 import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_TENANT_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME; 62 import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_TENANT_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME;
64 import static org.thingsboard.server.dao.model.ModelConstants.EDGE_COLUMN_FAMILY_NAME; 63 import static org.thingsboard.server.dao.model.ModelConstants.EDGE_COLUMN_FAMILY_NAME;
@@ -203,9 +202,8 @@ public class CassandraEdgeDao extends CassandraAbstractSearchTextDao<EdgeEntity, @@ -203,9 +202,8 @@ public class CassandraEdgeDao extends CassandraAbstractSearchTextDao<EdgeEntity,
203 202
204 @Override 203 @Override
205 public Optional<Edge> findByRoutingKey(UUID tenantId, String routingKey) { 204 public Optional<Edge> findByRoutingKey(UUID tenantId, String routingKey) {
206 - Select select = select().from(EDGE_BY_TENANT_AND_ROUTING_KEY_VIEW_NAME); 205 + Select select = select().from(EDGE_BY_ROUTING_KEY_VIEW_NAME);
207 Select.Where query = select.where(); 206 Select.Where query = select.where();
208 - query.and(eq(EDGE_TENANT_ID_PROPERTY, tenantId));  
209 query.and(eq(EDGE_ROUTING_KEY_PROPERTY, routingKey)); 207 query.and(eq(EDGE_ROUTING_KEY_PROPERTY, routingKey));
210 return Optional.ofNullable(DaoUtil.getData(findOneByStatement(new TenantId(tenantId), query))); 208 return Optional.ofNullable(DaoUtil.getData(findOneByStatement(new TenantId(tenantId), query)));
211 } 209 }
@@ -373,7 +373,7 @@ public class ModelConstants { @@ -373,7 +373,7 @@ public class ModelConstants {
373 public static final String EDGE_BY_CUSTOMER_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "edge_by_customer_and_search_text"; 373 public static final String EDGE_BY_CUSTOMER_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "edge_by_customer_and_search_text";
374 public static final String EDGE_BY_CUSTOMER_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "edge_by_customer_by_type_and_search_text"; 374 public static final String EDGE_BY_CUSTOMER_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "edge_by_customer_by_type_and_search_text";
375 public static final String EDGE_BY_TENANT_AND_NAME_VIEW_NAME = "edge_by_tenant_and_name"; 375 public static final String EDGE_BY_TENANT_AND_NAME_VIEW_NAME = "edge_by_tenant_and_name";
376 - public static final String EDGE_BY_TENANT_AND_ROUTING_KEY_VIEW_NAME = "edge_by_tenant_and_routing_key"; 376 + public static final String EDGE_BY_ROUTING_KEY_VIEW_NAME = "edge_by_routing_key";
377 377
378 public static final String EDGE_ROUTING_KEY_PROPERTY = "routing_key"; 378 public static final String EDGE_ROUTING_KEY_PROPERTY = "routing_key";
379 public static final String EDGE_SECRET_PROPERTY = "secret"; 379 public static final String EDGE_SECRET_PROPERTY = "secret";
@@ -22,9 +22,7 @@ import lombok.extern.slf4j.Slf4j; @@ -22,9 +22,7 @@ import lombok.extern.slf4j.Slf4j;
22 import org.springframework.beans.factory.annotation.Autowired; 22 import org.springframework.beans.factory.annotation.Autowired;
23 import org.springframework.stereotype.Component; 23 import org.springframework.stereotype.Component;
24 import org.thingsboard.server.common.data.EntityType; 24 import org.thingsboard.server.common.data.EntityType;
25 -import org.thingsboard.server.common.data.edge.Edge;  
26 import org.thingsboard.server.common.data.id.EdgeId; 25 import org.thingsboard.server.common.data.id.EdgeId;
27 -import org.thingsboard.server.common.data.id.RuleChainId;  
28 import org.thingsboard.server.common.data.id.TenantId; 26 import org.thingsboard.server.common.data.id.TenantId;
29 import org.thingsboard.server.common.data.page.TextPageLink; 27 import org.thingsboard.server.common.data.page.TextPageLink;
30 import org.thingsboard.server.common.data.page.TimePageLink; 28 import org.thingsboard.server.common.data.page.TimePageLink;
@@ -45,8 +43,6 @@ import java.util.List; @@ -45,8 +43,6 @@ import java.util.List;
45 import java.util.UUID; 43 import java.util.UUID;
46 44
47 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; 45 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
48 -import static org.thingsboard.server.dao.model.ModelConstants.DEVICE_TENANT_ID_PROPERTY;  
49 -import static org.thingsboard.server.dao.model.ModelConstants.DEVICE_TYPE_PROPERTY;  
50 import static org.thingsboard.server.dao.model.ModelConstants.RULE_CHAIN_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME; 46 import static org.thingsboard.server.dao.model.ModelConstants.RULE_CHAIN_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME;
51 import static org.thingsboard.server.dao.model.ModelConstants.RULE_CHAIN_BY_TENANT_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME; 47 import static org.thingsboard.server.dao.model.ModelConstants.RULE_CHAIN_BY_TENANT_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME;
52 import static org.thingsboard.server.dao.model.ModelConstants.RULE_CHAIN_COLUMN_FAMILY_NAME; 48 import static org.thingsboard.server.dao.model.ModelConstants.RULE_CHAIN_COLUMN_FAMILY_NAME;
@@ -749,12 +749,12 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_name AS @@ -749,12 +749,12 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_name AS
749 PRIMARY KEY ( tenant_id, name, id, customer_id, type) 749 PRIMARY KEY ( tenant_id, name, id, customer_id, type)
750 WITH CLUSTERING ORDER BY ( name ASC, id DESC, customer_id DESC); 750 WITH CLUSTERING ORDER BY ( name ASC, id DESC, customer_id DESC);
751 751
752 -CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_routing_key AS 752 +CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_routing_key AS
753 SELECT * 753 SELECT *
754 from thingsboard.edge 754 from thingsboard.edge
755 WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND routing_key IS NOT NULL AND id IS NOT NULL 755 WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND routing_key IS NOT NULL AND id IS NOT NULL
756 - PRIMARY KEY ( tenant_id, routing_key, id, customer_id, type)  
757 - WITH CLUSTERING ORDER BY ( routing_key ASC, id DESC, customer_id DESC); 756 + PRIMARY KEY ( routing_key, tenant_id, id, customer_id, type)
  757 + WITH CLUSTERING ORDER BY ( tenant_id DESC, id DESC, customer_id DESC);
758 758
759 CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_search_text AS 759 CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_search_text AS
760 SELECT * 760 SELECT *