Commit 92d360e14b61115f5a62ef7a43de2502fd87640c

Authored by Andrew Volostnykh
Committed by Andrew Shvayka
1 parent bd42cfc8

Refactoring of migration tool for new Thingsboard DB structure

@@ -56,8 +56,9 @@ @@ -56,8 +56,9 @@
56 <artifactId>cassandra-all</artifactId> 56 <artifactId>cassandra-all</artifactId>
57 </dependency> 57 </dependency>
58 <dependency> 58 <dependency>
59 - <groupId>com.datastax.oss</groupId>  
60 - <artifactId>java-driver-core</artifactId> 59 + <groupId>com.datastax.cassandra</groupId>
  60 + <artifactId>cassandra-driver-core</artifactId>
  61 + <version>3.10.1</version>
61 </dependency> 62 </dependency>
62 <dependency> 63 <dependency>
63 <groupId>commons-io</groupId> 64 <groupId>commons-io</groupId>
  1 +package org.thingsboard.client.tools.migrator;
  2 +
  3 +import org.apache.commons.io.FileUtils;
  4 +import org.apache.commons.io.LineIterator;
  5 +import org.apache.commons.lang3.StringUtils;
  6 +
  7 +import java.io.File;
  8 +import java.io.IOException;
  9 +import java.util.HashMap;
  10 +import java.util.Map;
  11 +
  12 +public class DictionaryParser {
  13 + private Map<String, String> dictionaryParsed = new HashMap<>();
  14 +
  15 + public DictionaryParser(File sourceFile) throws IOException {
  16 + parseDictionaryDump(FileUtils.lineIterator(sourceFile));
  17 + }
  18 +
  19 + public String getKeyByKeyId(String keyId) {
  20 + return dictionaryParsed.get(keyId);
  21 + }
  22 +
  23 + private boolean isBlockFinished(String line) {
  24 + return StringUtils.isBlank(line) || line.equals("\\.");
  25 + }
  26 +
  27 + private boolean isBlockStarted(String line) {
  28 + return line.startsWith("COPY public.ts_kv_dictionary (");
  29 + }
  30 +
  31 + private void parseDictionaryDump(LineIterator iterator) {
  32 + String tempLine;
  33 + while(iterator.hasNext()) {
  34 + tempLine = iterator.nextLine();
  35 +
  36 + if(isBlockStarted(tempLine)) {
  37 + processBlock(iterator);
  38 + }
  39 + }
  40 + }
  41 +
  42 + private void processBlock(LineIterator lineIterator) {
  43 + String tempLine;
  44 + String[] lineSplited;
  45 + while(lineIterator.hasNext()) {
  46 + tempLine = lineIterator.nextLine();
  47 + if(isBlockFinished(tempLine)) {
  48 + return;
  49 + }
  50 +
  51 + lineSplited = tempLine.split("\t");
  52 + dictionaryParsed.put(lineSplited[1], lineSplited[0]);
  53 + }
  54 + }
  55 +}
@@ -30,17 +30,25 @@ public class MigratorTool { @@ -30,17 +30,25 @@ public class MigratorTool {
30 public static void main(String[] args) { 30 public static void main(String[] args) {
31 CommandLine cmd = parseArgs(args); 31 CommandLine cmd = parseArgs(args);
32 32
33 -  
34 try { 33 try {
35 - File latestSource = new File(cmd.getOptionValue("latestTelemetryFrom"));  
36 - File latestSaveDir = new File(cmd.getOptionValue("latestTelemetryOut"));  
37 - File tsSource = new File(cmd.getOptionValue("telemetryFrom"));  
38 - File tsSaveDir = new File(cmd.getOptionValue("telemetryOut"));  
39 - File partitionsSaveDir = new File(cmd.getOptionValue("partitionsOut"));  
40 boolean castEnable = Boolean.parseBoolean(cmd.getOptionValue("castEnable")); 34 boolean castEnable = Boolean.parseBoolean(cmd.getOptionValue("castEnable"));
41 -  
42 - PgCaLatestMigrator.migrateLatest(latestSource, latestSaveDir, castEnable);  
43 - PostgresToCassandraTelemetryMigrator.migrateTs(tsSource, tsSaveDir, partitionsSaveDir, castEnable); 35 + File allTelemetrySource = new File(cmd.getOptionValue("telemetryFrom"));
  36 +
  37 + RelatedEntitiesParser allEntityIdsAndTypes =
  38 + new RelatedEntitiesParser(new File(cmd.getOptionValue("relatedEntities")));
  39 + DictionaryParser dictionaryParser = new DictionaryParser(allTelemetrySource);
  40 +
  41 + if(cmd.getOptionValue("latestTelemetryOut") != null) {
  42 + File latestSaveDir = new File(cmd.getOptionValue("latestTelemetryOut"));
  43 + PgCaLatestMigrator.migrateLatest(allTelemetrySource, latestSaveDir, allEntityIdsAndTypes, dictionaryParser, castEnable);
  44 + }
  45 + if(cmd.getOptionValue("telemetryOut") != null) {
  46 + File tsSaveDir = new File(cmd.getOptionValue("telemetryOut"));
  47 + File partitionsSaveDir = new File(cmd.getOptionValue("partitionsOut"));
  48 + PostgresToCassandraTelemetryMigrator.migrateTs(
  49 + allTelemetrySource, tsSaveDir, partitionsSaveDir, allEntityIdsAndTypes, dictionaryParser, castEnable
  50 + );
  51 + }
44 52
45 } catch (Throwable th) { 53 } catch (Throwable th) {
46 th.printStackTrace(); 54 th.printStackTrace();
@@ -52,30 +60,30 @@ public class MigratorTool { @@ -52,30 +60,30 @@ public class MigratorTool {
52 private static CommandLine parseArgs(String[] args) { 60 private static CommandLine parseArgs(String[] args) {
53 Options options = new Options(); 61 Options options = new Options();
54 62
55 - Option latestTsOpt = new Option("latestFrom", "latestTelemetryFrom", true, "latest telemetry source file path");  
56 - latestTsOpt.setRequired(true);  
57 - options.addOption(latestTsOpt); 63 + Option telemetryAllFrom = new Option("telemetryFrom", "telemetryFrom", true, "telemetry source file");
  64 + telemetryAllFrom.setRequired(true);
  65 + options.addOption(telemetryAllFrom);
58 66
59 Option latestTsOutOpt = new Option("latestOut", "latestTelemetryOut", true, "latest telemetry save dir"); 67 Option latestTsOutOpt = new Option("latestOut", "latestTelemetryOut", true, "latest telemetry save dir");
60 - latestTsOutOpt.setRequired(true); 68 + latestTsOutOpt.setRequired(false);
61 options.addOption(latestTsOutOpt); 69 options.addOption(latestTsOutOpt);
62 70
63 - Option tsOpt = new Option("tsFrom", "telemetryFrom", true, "telemetry source file path");  
64 - tsOpt.setRequired(true);  
65 - options.addOption(tsOpt);  
66 -  
67 Option tsOutOpt = new Option("tsOut", "telemetryOut", true, "sstable save dir"); 71 Option tsOutOpt = new Option("tsOut", "telemetryOut", true, "sstable save dir");
68 - tsOutOpt.setRequired(true); 72 + tsOutOpt.setRequired(false);
69 options.addOption(tsOutOpt); 73 options.addOption(tsOutOpt);
70 74
71 Option partitionOutOpt = new Option("partitionsOut", "partitionsOut", true, "partitions save dir"); 75 Option partitionOutOpt = new Option("partitionsOut", "partitionsOut", true, "partitions save dir");
72 - partitionOutOpt.setRequired(true); 76 + partitionOutOpt.setRequired(false);
73 options.addOption(partitionOutOpt); 77 options.addOption(partitionOutOpt);
74 78
75 Option castOpt = new Option("castEnable", "castEnable", true, "cast String to Double if possible"); 79 Option castOpt = new Option("castEnable", "castEnable", true, "cast String to Double if possible");
76 castOpt.setRequired(true); 80 castOpt.setRequired(true);
77 options.addOption(castOpt); 81 options.addOption(castOpt);
78 82
  83 + Option relatedOpt = new Option("relatedEntities", "relatedEntities", true, "related entities source file path");
  84 + relatedOpt.setRequired(true);
  85 + options.addOption(relatedOpt);
  86 +
79 HelpFormatter formatter = new HelpFormatter(); 87 HelpFormatter formatter = new HelpFormatter();
80 CommandLineParser parser = new BasicParser(); 88 CommandLineParser parser = new BasicParser();
81 89
@@ -43,14 +43,21 @@ public class PgCaLatestMigrator { @@ -43,14 +43,21 @@ public class PgCaLatestMigrator {
43 private static long castedOk = 0; 43 private static long castedOk = 0;
44 44
45 private static long currentWriterCount = 1; 45 private static long currentWriterCount = 1;
46 - private static CQLSSTableWriter currentTsWriter = null;  
47 -  
48 - public static void migrateLatest(File sourceFile, File outDir, boolean castStringsIfPossible) throws IOException { 46 + private static RelatedEntitiesParser allIdsAndTypes;
  47 + private static DictionaryParser keyPairs;
  48 +
  49 + public static void migrateLatest(File sourceFile,
  50 + File outDir,
  51 + RelatedEntitiesParser allEntityIdsAndTypes,
  52 + DictionaryParser dictionaryParser,
  53 + boolean castStringsIfPossible) throws IOException {
49 long startTs = System.currentTimeMillis(); 54 long startTs = System.currentTimeMillis();
50 long stepLineTs = System.currentTimeMillis(); 55 long stepLineTs = System.currentTimeMillis();
51 long stepOkLineTs = System.currentTimeMillis(); 56 long stepOkLineTs = System.currentTimeMillis();
52 LineIterator iterator = FileUtils.lineIterator(sourceFile); 57 LineIterator iterator = FileUtils.lineIterator(sourceFile);
53 - currentTsWriter = WriterBuilder.getTsWriter(outDir); 58 + CQLSSTableWriter currentTsWriter = WriterBuilder.getLatestWriter(outDir);
  59 + allIdsAndTypes = allEntityIdsAndTypes;
  60 + keyPairs = dictionaryParser;
54 61
55 boolean isBlockStarted = false; 62 boolean isBlockStarted = false;
56 boolean isBlockFinished = false; 63 boolean isBlockFinished = false;
@@ -107,7 +114,7 @@ public class PgCaLatestMigrator { @@ -107,7 +114,7 @@ public class PgCaLatestMigrator {
107 } 114 }
108 115
109 if (linesMigrated++ % LOG_BATCH == 0) { 116 if (linesMigrated++ % LOG_BATCH == 0) {
110 - System.out.println(new Date() + " migrated = " + linesMigrated + " in " + (System.currentTimeMillis() - stepOkLineTs)); 117 + System.out.println(new Date() + " migrated = " + linesMigrated + " in " + (System.currentTimeMillis() - stepOkLineTs) + " ms.");
111 stepOkLineTs = System.currentTimeMillis(); 118 stepOkLineTs = System.currentTimeMillis();
112 } 119 }
113 } catch (Exception ex) { 120 } catch (Exception ex) {
@@ -119,7 +126,7 @@ public class PgCaLatestMigrator { @@ -119,7 +126,7 @@ public class PgCaLatestMigrator {
119 126
120 long endTs = System.currentTimeMillis(); 127 long endTs = System.currentTimeMillis();
121 System.out.println(); 128 System.out.println();
122 - System.out.println(new Date() + " Migrated rows " + linesMigrated + " in " + (endTs - startTs)); 129 + System.out.println(new Date() + " Migrated rows " + linesMigrated + " in " + (endTs - startTs) + " ts");
123 130
124 currentTsWriter.close(); 131 currentTsWriter.close();
125 System.out.println(); 132 System.out.println();
@@ -146,34 +153,31 @@ public class PgCaLatestMigrator { @@ -146,34 +153,31 @@ public class PgCaLatestMigrator {
146 153
147 private static List<Object> toValues(List<String> raw) { 154 private static List<Object> toValues(List<String> raw) {
148 //expected Table structure: 155 //expected Table structure:
149 -// COPY public.ts_kv_latest (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) FROM stdin;  
150 - 156 + //COPY public.ts_kv_latest (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) FROM stdin;
151 157
152 List<Object> result = new ArrayList<>(); 158 List<Object> result = new ArrayList<>();
153 - result.add(raw.get(0));  
154 - result.add(fromString(raw.get(1)));  
155 - result.add(raw.get(2)); 159 + result.add(allIdsAndTypes.getEntityType(raw.get(0)));
  160 + result.add(UUID.fromString(raw.get(0)));
  161 + result.add(keyPairs.getKeyByKeyId(raw.get(1)));
156 162
157 - long ts = Long.parseLong(raw.get(3));  
158 - result.add(ts); 163 + long ts = Long.parseLong(raw.get(2));
  164 + result.add(3, ts);
159 165
160 - result.add(raw.get(4).equals("\\N") ? null : raw.get(4).equals("t") ? Boolean.TRUE : Boolean.FALSE);  
161 - result.add(raw.get(5).equals("\\N") ? null : raw.get(5));  
162 - result.add(raw.get(6).equals("\\N") ? null : Long.parseLong(raw.get(6)));  
163 - result.add(raw.get(7).equals("\\N") ? null : Double.parseDouble(raw.get(7)));  
164 - return result;  
165 - } 166 + result.add(raw.get(3).equals("\\N") ? null : raw.get(3).equals("t") ? Boolean.TRUE : Boolean.FALSE);
  167 + result.add(raw.get(4).equals("\\N") ? null : raw.get(4));
  168 + result.add(raw.get(5).equals("\\N") ? null : Long.parseLong(raw.get(5)));
  169 + result.add(raw.get(6).equals("\\N") ? null : Double.parseDouble(raw.get(6)));
  170 + result.add(raw.get(7).equals("\\N") ? null : raw.get(7));
166 171
167 - public static UUID fromString(String src) {  
168 - return UUID.fromString(src.substring(7, 15) + "-" + src.substring(3, 7) + "-1"  
169 - + src.substring(0, 3) + "-" + src.substring(15, 19) + "-" + src.substring(19)); 172 + return result;
170 } 173 }
171 174
172 private static boolean isBlockStarted(String line) { 175 private static boolean isBlockStarted(String line) {
173 - return line.startsWith("COPY public.ts_kv_latest"); 176 + return line.startsWith("COPY public.ts_kv_latest (");
174 } 177 }
175 178
176 private static boolean isBlockFinished(String line) { 179 private static boolean isBlockFinished(String line) {
177 return StringUtils.isBlank(line) || line.equals("\\."); 180 return StringUtils.isBlank(line) || line.equals("\\.");
178 } 181 }
  182 +
179 } 183 }
@@ -42,7 +42,6 @@ public class PostgresToCassandraTelemetryMigrator { @@ -42,7 +42,6 @@ public class PostgresToCassandraTelemetryMigrator {
42 private static final long LOG_BATCH = 1000000; 42 private static final long LOG_BATCH = 1000000;
43 private static final long rowPerFile = 1000000; 43 private static final long rowPerFile = 1000000;
44 44
45 -  
46 private static long linesProcessed = 0; 45 private static long linesProcessed = 0;
47 private static long linesMigrated = 0; 46 private static long linesMigrated = 0;
48 private static long castErrors = 0; 47 private static long castErrors = 0;
@@ -53,15 +52,23 @@ public class PostgresToCassandraTelemetryMigrator { @@ -53,15 +52,23 @@ public class PostgresToCassandraTelemetryMigrator {
53 private static CQLSSTableWriter currentPartitionWriter = null; 52 private static CQLSSTableWriter currentPartitionWriter = null;
54 53
55 private static Set<String> partitions = new HashSet<>(); 54 private static Set<String> partitions = new HashSet<>();
56 -  
57 -  
58 - public static void migrateTs(File sourceFile, File outTsDir, File outPartitionDir, boolean castStringsIfPossible) throws IOException { 55 + private static RelatedEntitiesParser entityIdsAndTypes;
  56 + private static DictionaryParser keyParser;
  57 +
  58 + public static void migrateTs(File sourceFile,
  59 + File outTsDir,
  60 + File outPartitionDir,
  61 + RelatedEntitiesParser allEntityIdsAndTypes,
  62 + DictionaryParser dictionaryParser,
  63 + boolean castStringsIfPossible) throws IOException {
59 long startTs = System.currentTimeMillis(); 64 long startTs = System.currentTimeMillis();
60 long stepLineTs = System.currentTimeMillis(); 65 long stepLineTs = System.currentTimeMillis();
61 long stepOkLineTs = System.currentTimeMillis(); 66 long stepOkLineTs = System.currentTimeMillis();
62 LineIterator iterator = FileUtils.lineIterator(sourceFile); 67 LineIterator iterator = FileUtils.lineIterator(sourceFile);
63 currentTsWriter = WriterBuilder.getTsWriter(outTsDir); 68 currentTsWriter = WriterBuilder.getTsWriter(outTsDir);
64 currentPartitionWriter = WriterBuilder.getPartitionWriter(outPartitionDir); 69 currentPartitionWriter = WriterBuilder.getPartitionWriter(outPartitionDir);
  70 + entityIdsAndTypes = allEntityIdsAndTypes;
  71 + keyParser = dictionaryParser;
65 72
66 boolean isBlockStarted = false; 73 boolean isBlockStarted = false;
67 boolean isBlockFinished = false; 74 boolean isBlockFinished = false;
@@ -182,29 +189,24 @@ public class PostgresToCassandraTelemetryMigrator { @@ -182,29 +189,24 @@ public class PostgresToCassandraTelemetryMigrator {
182 //expected Table structure: 189 //expected Table structure:
183 // COPY public.ts_kv (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) FROM stdin; 190 // COPY public.ts_kv (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) FROM stdin;
184 191
185 -  
186 List<Object> result = new ArrayList<>(); 192 List<Object> result = new ArrayList<>();
187 - result.add(raw.get(0));  
188 - result.add(fromString(raw.get(1)));  
189 - result.add(raw.get(2)); 193 + result.add(entityIdsAndTypes.getEntityType(raw.get(0)));
  194 + result.add(UUID.fromString(raw.get(0)));
  195 + result.add(keyParser.getKeyByKeyId(raw.get(1)));
190 196
191 - long ts = Long.parseLong(raw.get(3)); 197 + long ts = Long.parseLong(raw.get(2));
192 long partition = toPartitionTs(ts); 198 long partition = toPartitionTs(ts);
193 result.add(partition); 199 result.add(partition);
194 result.add(ts); 200 result.add(ts);
195 201
196 - result.add(raw.get(4).equals("\\N") ? null : raw.get(4).equals("t") ? Boolean.TRUE : Boolean.FALSE);  
197 - result.add(raw.get(5).equals("\\N") ? null : raw.get(5));  
198 - result.add(raw.get(6).equals("\\N") ? null : Long.parseLong(raw.get(6)));  
199 - result.add(raw.get(7).equals("\\N") ? null : Double.parseDouble(raw.get(7))); 202 + result.add(raw.get(3).equals("\\N") ? null : raw.get(3).equals("t") ? Boolean.TRUE : Boolean.FALSE);
  203 + result.add(raw.get(4).equals("\\N") ? null : raw.get(4));
  204 + result.add(raw.get(5).equals("\\N") ? null : Long.parseLong(raw.get(5)));
  205 + result.add(raw.get(6).equals("\\N") ? null : Double.parseDouble(raw.get(6)));
  206 + result.add(raw.get(7).equals("\\N") ? null : raw.get(7));
200 return result; 207 return result;
201 } 208 }
202 209
203 - public static UUID fromString(String src) {  
204 - return UUID.fromString(src.substring(7, 15) + "-" + src.substring(3, 7) + "-1"  
205 - + src.substring(0, 3) + "-" + src.substring(15, 19) + "-" + src.substring(19));  
206 - }  
207 -  
208 private static long toPartitionTs(long ts) { 210 private static long toPartitionTs(long ts) {
209 LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); 211 LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
210 return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).toInstant(ZoneOffset.UTC).toEpochMilli(); 212 return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).toInstant(ZoneOffset.UTC).toEpochMilli();
@@ -212,7 +214,7 @@ public class PostgresToCassandraTelemetryMigrator { @@ -212,7 +214,7 @@ public class PostgresToCassandraTelemetryMigrator {
212 } 214 }
213 215
214 private static boolean isBlockStarted(String line) { 216 private static boolean isBlockStarted(String line) {
215 - return line.startsWith("COPY public.ts_kv"); 217 + return line.startsWith("COPY public.ts_kv (");
216 } 218 }
217 219
218 private static boolean isBlockFinished(String line) { 220 private static boolean isBlockFinished(String line) {
@@ -21,15 +21,17 @@ It will generate single jar file with all required dependencies inside `target d @@ -21,15 +21,17 @@ It will generate single jar file with all required dependencies inside `target d
21 #### Dump data from the source Postgres Database 21 #### Dump data from the source Postgres Database
22 *Do not use compression if possible because Tool can only work with uncompressed file 22 *Do not use compression if possible because Tool can only work with uncompressed file
23 23
24 -1. Dump table `ts_kv` table: 24 +*If you want to migrate just `ts_kv` without `ts_kv_latest` just don't dump an unnecessary table and when starting the tool don't use arguments (paths) for input dump and output files*
25 25
26 - `pg_dump -h localhost -U postgres -d thingsboard -t ts_kv > ts_kv.dmp`  
27 -  
28 -2. Dump table `ts_kv_latest` table:  
29 -  
30 - `pg_dump -h localhost -U postgres -d thingsboard -t ts_kv_latest > ts_kv_latest.dmp` 26 +1. Dump related tables that need to correct save telemetry
  27 +
  28 + `pg_dump -h localhost -U postgres -d thingsboard -t tenant -t customer -t user -t dashboard -t asset -t device -t alarm -t rule_chain -t rule_node -t entity_view -t widgets_bundle -t widget_type -t tenant_profile -t device_profile -t api_usage_state -t tb_user > related_entities.dmp`
  29 +
  30 +2. Dump `ts_kv` and child:
  31 +
  32 + `pg_dump -h localhost -U postgres -d thingsboard --load-via-partition-root --data-only -t ts_kv* > ts_kv_all.dmp`
31 33
32 -3. [Optional] move table dumps to the instance where cassandra will be hosted 34 +3. [Optional] Move table dumps to the instance where cassandra will be hosted
33 35
34 #### Prepare directory structure for SSTables 36 #### Prepare directory structure for SSTables
35 Tool use 3 different directories for saving SSTables - `ts_kv_cf`, `ts_kv_latest_cf`, `ts_kv_partitions_cf` 37 Tool use 3 different directories for saving SSTables - `ts_kv_cf`, `ts_kv_latest_cf`, `ts_kv_partitions_cf`
@@ -45,9 +47,8 @@ Create 3 empty directories. For example: @@ -45,9 +47,8 @@ Create 3 empty directories. For example:
45 47
46 ``` 48 ```
47 java -jar ./tools-2.4.1-SNAPSHOT-jar-with-dependencies.jar 49 java -jar ./tools-2.4.1-SNAPSHOT-jar-with-dependencies.jar
48 - -latestFrom ./source/ts_kv_latest.dmp 50 + -telemetryFrom ./source/ts_kv_all.dmp
49 -latestOut /home/ubunut/migration/ts_latest 51 -latestOut /home/ubunut/migration/ts_latest
50 - -tsFrom ./source/ts_kv.dmp  
51 -tsOut /home/ubunut/migration/ts 52 -tsOut /home/ubunut/migration/ts
52 -partitionsOut /home/ubunut/migration/ts_partition 53 -partitionsOut /home/ubunut/migration/ts_partition
53 -castEnable false 54 -castEnable false
  1 +package org.thingsboard.client.tools.migrator;
  2 +
  3 +import org.apache.commons.io.FileUtils;
  4 +import org.apache.commons.io.LineIterator;
  5 +import org.apache.commons.lang3.StringUtils;
  6 +import org.thingsboard.server.common.data.EntityType;
  7 +
  8 +import java.io.File;
  9 +import java.io.IOException;
  10 +import java.util.HashMap;
  11 +import java.util.Map;
  12 +
  13 +public class RelatedEntitiesParser {
  14 + private final Map<String, String> allEntityIdsAndTypes = new HashMap<>();
  15 +
  16 + public RelatedEntitiesParser(File source) throws IOException {
  17 + processAllTables(FileUtils.lineIterator(source));
  18 + }
  19 +
  20 + public String getEntityType(String uuid) {
  21 + return this.allEntityIdsAndTypes.get(uuid);
  22 + }
  23 +
  24 + private boolean isBlockFinished(String line) {
  25 + return StringUtils.isBlank(line) || line.equals("\\.");
  26 + }
  27 +
  28 + private void processAllTables(LineIterator lineIterator) {
  29 + String currentLine;
  30 + while(lineIterator.hasNext()) {
  31 + currentLine = lineIterator.nextLine();
  32 + if(currentLine.startsWith("COPY public.alarm")) {
  33 + processBlock(lineIterator, EntityType.ALARM);
  34 + } else if (currentLine.startsWith("COPY public.asset")) {
  35 + processBlock(lineIterator, EntityType.ASSET);
  36 + } else if (currentLine.startsWith("COPY public.customer")) {
  37 + processBlock(lineIterator, EntityType.CUSTOMER);
  38 + } else if (currentLine.startsWith("COPY public.dashboard")) {
  39 + processBlock(lineIterator, EntityType.DASHBOARD);
  40 + } else if (currentLine.startsWith("COPY public.device")) {
  41 + processBlock(lineIterator, EntityType.DEVICE);
  42 + } else if (currentLine.startsWith("COPY public.rule_chain")) {
  43 + processBlock(lineIterator, EntityType.RULE_CHAIN);
  44 + } else if (currentLine.startsWith("COPY public.rule_node")) {
  45 + processBlock(lineIterator, EntityType.RULE_NODE);
  46 + } else if (currentLine.startsWith("COPY public.tenant")) {
  47 + processBlock(lineIterator, EntityType.TENANT);
  48 + } else if (currentLine.startsWith("COPY public.tb_user")) {
  49 + processBlock(lineIterator, EntityType.USER);
  50 + } else if (currentLine.startsWith("COPY public.entity_view")) {
  51 + processBlock(lineIterator, EntityType.ENTITY_VIEW);
  52 + } else if (currentLine.startsWith("COPY public.widgets_bundle")) {
  53 + processBlock(lineIterator, EntityType.WIDGETS_BUNDLE);
  54 + } else if (currentLine.startsWith("COPY public.widget_type")) {
  55 + processBlock(lineIterator, EntityType.WIDGET_TYPE);
  56 + } else if (currentLine.startsWith("COPY public.tenant_profile")) {
  57 + processBlock(lineIterator, EntityType.TENANT_PROFILE);
  58 + } else if (currentLine.startsWith("COPY public.device_profile")) {
  59 + processBlock(lineIterator, EntityType.DEVICE_PROFILE);
  60 + } else if (currentLine.startsWith("COPY public.api_usage_state")) {
  61 + processBlock(lineIterator, EntityType.API_USAGE_STATE);
  62 + }
  63 + }
  64 + }
  65 +
  66 + private void processBlock(LineIterator lineIterator, EntityType entityType) {
  67 + String currentLine;
  68 + while(lineIterator.hasNext()) {
  69 + currentLine = lineIterator.nextLine();
  70 + if(isBlockFinished(currentLine)) {
  71 + return;
  72 + }
  73 + allEntityIdsAndTypes.put(currentLine.split("\t")[0], entityType.name());
  74 + }
  75 + }
  76 +}
@@ -31,6 +31,7 @@ public class WriterBuilder { @@ -31,6 +31,7 @@ public class WriterBuilder {
31 " str_v text,\n" + 31 " str_v text,\n" +
32 " long_v bigint,\n" + 32 " long_v bigint,\n" +
33 " dbl_v double,\n" + 33 " dbl_v double,\n" +
  34 + " json_v text,\n" +
34 " PRIMARY KEY (( entity_type, entity_id, key, partition ), ts)\n" + 35 " PRIMARY KEY (( entity_type, entity_id, key, partition ), ts)\n" +
35 ");"; 36 ");";
36 37
@@ -43,6 +44,7 @@ public class WriterBuilder { @@ -43,6 +44,7 @@ public class WriterBuilder {
43 " str_v text,\n" + 44 " str_v text,\n" +
44 " long_v bigint,\n" + 45 " long_v bigint,\n" +
45 " dbl_v double,\n" + 46 " dbl_v double,\n" +
  47 + " json_v text,\n" +
46 " PRIMARY KEY (( entity_type, entity_id ), key)\n" + 48 " PRIMARY KEY (( entity_type, entity_id ), key)\n" +
47 ") WITH compaction = { 'class' : 'LeveledCompactionStrategy' };"; 49 ") WITH compaction = { 'class' : 'LeveledCompactionStrategy' };";
48 50
@@ -59,8 +61,8 @@ public class WriterBuilder { @@ -59,8 +61,8 @@ public class WriterBuilder {
59 return CQLSSTableWriter.builder() 61 return CQLSSTableWriter.builder()
60 .inDirectory(dir) 62 .inDirectory(dir)
61 .forTable(tsSchema) 63 .forTable(tsSchema)
62 - .using("INSERT INTO thingsboard.ts_kv_cf (entity_type, entity_id, key, partition, ts, bool_v, str_v, long_v, dbl_v) " +  
63 - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)") 64 + .using("INSERT INTO thingsboard.ts_kv_cf (entity_type, entity_id, key, partition, ts, bool_v, str_v, long_v, dbl_v, json_v) " +
  65 + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
64 .build(); 66 .build();
65 } 67 }
66 68
@@ -68,8 +70,8 @@ public class WriterBuilder { @@ -68,8 +70,8 @@ public class WriterBuilder {
68 return CQLSSTableWriter.builder() 70 return CQLSSTableWriter.builder()
69 .inDirectory(dir) 71 .inDirectory(dir)
70 .forTable(latestSchema) 72 .forTable(latestSchema)
71 - .using("INSERT INTO thingsboard.ts_kv_latest_cf (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " +  
72 - "VALUES (?, ?, ?, ?, ?, ?, ?, ?)") 73 + .using("INSERT INTO thingsboard.ts_kv_latest_cf (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) " +
  74 + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)")
73 .build(); 75 .build();
74 } 76 }
75 77