Commit 1240099c815b7a673f782904b8ad64c36c253018

Authored by Andrew Volostnykh
Committed by Andrew Shvayka
1 parent 67ee892e

Full refactoring and code cleaning

@@ -54,6 +54,7 @@ @@ -54,6 +54,7 @@
54 <dependency> 54 <dependency>
55 <groupId>org.apache.cassandra</groupId> 55 <groupId>org.apache.cassandra</groupId>
56 <artifactId>cassandra-all</artifactId> 56 <artifactId>cassandra-all</artifactId>
  57 + <version>3.11.10</version>
57 </dependency> 58 </dependency>
58 <dependency> 59 <dependency>
59 <groupId>com.datastax.cassandra</groupId> 60 <groupId>com.datastax.cassandra</groupId>
@@ -63,6 +64,7 @@ @@ -63,6 +64,7 @@
63 <dependency> 64 <dependency>
64 <groupId>commons-io</groupId> 65 <groupId>commons-io</groupId>
65 <artifactId>commons-io</artifactId> 66 <artifactId>commons-io</artifactId>
  67 + <version>2.5</version>
66 </dependency> 68 </dependency>
67 </dependencies> 69 </dependencies>
68 70
@@ -44,13 +44,17 @@ public class DictionaryParser { @@ -44,13 +44,17 @@ public class DictionaryParser {
44 } 44 }
45 45
46 private void parseDictionaryDump(LineIterator iterator) { 46 private void parseDictionaryDump(LineIterator iterator) {
47 - String tempLine;  
48 - while(iterator.hasNext()) {  
49 - tempLine = iterator.nextLine(); 47 + try {
  48 + String tempLine;
  49 + while (iterator.hasNext()) {
  50 + tempLine = iterator.nextLine();
50 51
51 - if(isBlockStarted(tempLine)) {  
52 - processBlock(iterator); 52 + if (isBlockStarted(tempLine)) {
  53 + processBlock(iterator);
  54 + }
53 } 55 }
  56 + } finally {
  57 + iterator.close();
54 } 58 }
55 } 59 }
56 60
@@ -33,23 +33,24 @@ public class MigratorTool { @@ -33,23 +33,24 @@ public class MigratorTool {
33 try { 33 try {
34 boolean castEnable = Boolean.parseBoolean(cmd.getOptionValue("castEnable")); 34 boolean castEnable = Boolean.parseBoolean(cmd.getOptionValue("castEnable"));
35 File allTelemetrySource = new File(cmd.getOptionValue("telemetryFrom")); 35 File allTelemetrySource = new File(cmd.getOptionValue("telemetryFrom"));
  36 + File tsSaveDir = null;
  37 + File partitionsSaveDir = null;
  38 + File latestSaveDir = null;
36 39
37 RelatedEntitiesParser allEntityIdsAndTypes = 40 RelatedEntitiesParser allEntityIdsAndTypes =
38 new RelatedEntitiesParser(new File(cmd.getOptionValue("relatedEntities"))); 41 new RelatedEntitiesParser(new File(cmd.getOptionValue("relatedEntities")));
39 DictionaryParser dictionaryParser = new DictionaryParser(allTelemetrySource); 42 DictionaryParser dictionaryParser = new DictionaryParser(allTelemetrySource);
40 43
41 if(cmd.getOptionValue("latestTelemetryOut") != null) { 44 if(cmd.getOptionValue("latestTelemetryOut") != null) {
42 - File latestSaveDir = new File(cmd.getOptionValue("latestTelemetryOut"));  
43 - PgCaLatestMigrator.migrateLatest(allTelemetrySource, latestSaveDir, allEntityIdsAndTypes, dictionaryParser, castEnable); 45 + latestSaveDir = new File(cmd.getOptionValue("latestTelemetryOut"));
44 } 46 }
45 if(cmd.getOptionValue("telemetryOut") != null) { 47 if(cmd.getOptionValue("telemetryOut") != null) {
46 - File tsSaveDir = new File(cmd.getOptionValue("telemetryOut"));  
47 - File partitionsSaveDir = new File(cmd.getOptionValue("partitionsOut"));  
48 - PostgresToCassandraTelemetryMigrator.migrateTs(  
49 - allTelemetrySource, tsSaveDir, partitionsSaveDir, allEntityIdsAndTypes, dictionaryParser, castEnable  
50 - ); 48 + tsSaveDir = new File(cmd.getOptionValue("telemetryOut"));
  49 + partitionsSaveDir = new File(cmd.getOptionValue("partitionsOut"));
51 } 50 }
52 51
  52 + new PgCaMigrator(allTelemetrySource, tsSaveDir, partitionsSaveDir, latestSaveDir, allEntityIdsAndTypes, dictionaryParser, castEnable).migrate();
  53 +
53 } catch (Throwable th) { 54 } catch (Throwable th) {
54 th.printStackTrace(); 55 th.printStackTrace();
55 throw new IllegalStateException("failed", th); 56 throw new IllegalStateException("failed", th);
1 -/**  
2 - * Copyright © 2016-2021 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.client.tools.migrator;  
17 -  
18 -import com.google.common.collect.Lists;  
19 -import org.apache.cassandra.io.sstable.CQLSSTableWriter;  
20 -import org.apache.commons.io.FileUtils;  
21 -import org.apache.commons.io.LineIterator;  
22 -import org.apache.commons.lang3.StringUtils;  
23 -import org.apache.commons.lang3.math.NumberUtils;  
24 -  
25 -import java.io.File;  
26 -import java.io.IOException;  
27 -import java.util.ArrayList;  
28 -import java.util.Arrays;  
29 -import java.util.Date;  
30 -import java.util.List;  
31 -import java.util.UUID;  
32 -import java.util.stream.Collectors;  
33 -  
34 -public class PgCaLatestMigrator {  
35 -  
36 - private static final long LOG_BATCH = 1000000;  
37 - private static final long rowPerFile = 1000000;  
38 -  
39 -  
40 - private static long linesProcessed = 0;  
41 - private static long linesMigrated = 0;  
42 - private static long castErrors = 0;  
43 - private static long castedOk = 0;  
44 -  
45 - private static long currentWriterCount = 1;  
46 - private static RelatedEntitiesParser allIdsAndTypes;  
47 - private static DictionaryParser keyPairs;  
48 -  
49 - public static void migrateLatest(File sourceFile,  
50 - File outDir,  
51 - RelatedEntitiesParser allEntityIdsAndTypes,  
52 - DictionaryParser dictionaryParser,  
53 - boolean castStringsIfPossible) throws IOException {  
54 - long startTs = System.currentTimeMillis();  
55 - long stepLineTs = System.currentTimeMillis();  
56 - long stepOkLineTs = System.currentTimeMillis();  
57 - LineIterator iterator = FileUtils.lineIterator(sourceFile);  
58 - CQLSSTableWriter currentTsWriter = WriterBuilder.getLatestWriter(outDir);  
59 - allIdsAndTypes = allEntityIdsAndTypes;  
60 - keyPairs = dictionaryParser;  
61 -  
62 - boolean isBlockStarted = false;  
63 - boolean isBlockFinished = false;  
64 -  
65 - String line;  
66 - while (iterator.hasNext()) {  
67 - if (linesProcessed++ % LOG_BATCH == 0) {  
68 - System.out.println(new Date() + " linesProcessed = " + linesProcessed + " in " + (System.currentTimeMillis() - stepLineTs) + " castOk " + castedOk + " castErr " + castErrors);  
69 - stepLineTs = System.currentTimeMillis();  
70 - }  
71 -  
72 - line = iterator.nextLine();  
73 -  
74 - if (isBlockFinished) {  
75 - break;  
76 - }  
77 -  
78 - if (!isBlockStarted) {  
79 - if (isBlockStarted(line)) {  
80 - System.out.println();  
81 - System.out.println();  
82 - System.out.println(line);  
83 - System.out.println();  
84 - System.out.println();  
85 - isBlockStarted = true;  
86 - }  
87 - continue;  
88 - }  
89 -  
90 - if (isBlockFinished(line)) {  
91 - isBlockFinished = true;  
92 - } else {  
93 - try {  
94 - List<String> raw = Arrays.stream(line.trim().split("\t"))  
95 - .map(String::trim)  
96 - .filter(StringUtils::isNotEmpty)  
97 - .collect(Collectors.toList());  
98 - List<Object> values = toValues(raw);  
99 -  
100 - if (currentWriterCount == 0) {  
101 - System.out.println(new Date() + " close writer " + new Date());  
102 - currentTsWriter.close();  
103 - currentTsWriter = WriterBuilder.getLatestWriter(outDir);  
104 - }  
105 -  
106 - if (castStringsIfPossible) {  
107 - currentTsWriter.addRow(castToNumericIfPossible(values));  
108 - } else {  
109 - currentTsWriter.addRow(values);  
110 - }  
111 - currentWriterCount++;  
112 - if (currentWriterCount >= rowPerFile) {  
113 - currentWriterCount = 0;  
114 - }  
115 -  
116 - if (linesMigrated++ % LOG_BATCH == 0) {  
117 - System.out.println(new Date() + " migrated = " + linesMigrated + " in " + (System.currentTimeMillis() - stepOkLineTs) + " ms.");  
118 - stepOkLineTs = System.currentTimeMillis();  
119 - }  
120 - } catch (Exception ex) {  
121 - System.out.println(ex.getMessage() + " -> " + line);  
122 - }  
123 -  
124 - }  
125 - }  
126 -  
127 - long endTs = System.currentTimeMillis();  
128 - System.out.println();  
129 - System.out.println(new Date() + " Migrated rows " + linesMigrated + " in " + (endTs - startTs) + " ts");  
130 -  
131 - currentTsWriter.close();  
132 - System.out.println();  
133 - System.out.println("Finished migrate Latest Telemetry");  
134 - }  
135 -  
136 -  
137 - private static List<Object> castToNumericIfPossible(List<Object> values) {  
138 - try {  
139 - if (values.get(6) != null && NumberUtils.isNumber(values.get(6).toString())) {  
140 - Double casted = NumberUtils.createDouble(values.get(6).toString());  
141 - List<Object> numeric = Lists.newArrayList();  
142 - numeric.addAll(values);  
143 - numeric.set(6, null);  
144 - numeric.set(8, casted);  
145 - castedOk++;  
146 - return numeric;  
147 - }  
148 - } catch (Throwable th) {  
149 - castErrors++;  
150 - }  
151 - return values;  
152 - }  
153 -  
154 - private static List<Object> toValues(List<String> raw) {  
155 - //expected Table structure:  
156 - //COPY public.ts_kv_latest (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) FROM stdin;  
157 -  
158 - List<Object> result = new ArrayList<>();  
159 - result.add(allIdsAndTypes.getEntityType(raw.get(0)));  
160 - result.add(UUID.fromString(raw.get(0)));  
161 - result.add(keyPairs.getKeyByKeyId(raw.get(1)));  
162 -  
163 - long ts = Long.parseLong(raw.get(2));  
164 - result.add(3, ts);  
165 -  
166 - result.add(raw.get(3).equals("\\N") ? null : raw.get(3).equals("t") ? Boolean.TRUE : Boolean.FALSE);  
167 - result.add(raw.get(4).equals("\\N") ? null : raw.get(4));  
168 - result.add(raw.get(5).equals("\\N") ? null : Long.parseLong(raw.get(5)));  
169 - result.add(raw.get(6).equals("\\N") ? null : Double.parseDouble(raw.get(6)));  
170 - result.add(raw.get(7).equals("\\N") ? null : raw.get(7));  
171 -  
172 - return result;  
173 - }  
174 -  
175 - private static boolean isBlockStarted(String line) {  
176 - return line.startsWith("COPY public.ts_kv_latest (");  
177 - }  
178 -  
179 - private static boolean isBlockFinished(String line) {  
180 - return StringUtils.isBlank(line) || line.equals("\\.");  
181 - }  
182 -  
183 -}  
  1 +package org.thingsboard.client.tools.migrator;
  2 +
  3 +import com.google.common.collect.Lists;
  4 +import org.apache.cassandra.io.sstable.CQLSSTableWriter;
  5 +import org.apache.commons.io.FileUtils;
  6 +import org.apache.commons.io.LineIterator;
  7 +import org.apache.commons.lang3.StringUtils;
  8 +import org.apache.commons.lang3.math.NumberUtils;
  9 +
  10 +import java.io.File;
  11 +import java.io.IOException;
  12 +import java.time.Instant;
  13 +import java.time.LocalDateTime;
  14 +import java.time.ZoneOffset;
  15 +import java.time.temporal.ChronoUnit;
  16 +import java.util.ArrayList;
  17 +import java.util.Arrays;
  18 +import java.util.Date;
  19 +import java.util.HashSet;
  20 +import java.util.List;
  21 +import java.util.Set;
  22 +import java.util.UUID;
  23 +import java.util.function.Function;
  24 +import java.util.stream.Collectors;
  25 +
  26 +public class PgCaMigrator {
  27 +
  28 + private final long LOG_BATCH = 1000000;
  29 + private final long rowPerFile = 1000000;
  30 +
  31 + private long linesProcessed = 0;
  32 + private long linesTsMigrated = 0;
  33 + private long linesLatestMigrated = 0;
  34 + private long castErrors = 0;
  35 + private long castedOk = 0;
  36 +
  37 + private long currentWriterCount = 1;
  38 +
  39 + private final File sourceFile;
  40 + private final boolean castStringIfPossible;
  41 +
  42 + private final RelatedEntitiesParser entityIdsAndTypes;
  43 + private final DictionaryParser keyParser;
  44 + private CQLSSTableWriter currentTsWriter;
  45 + private CQLSSTableWriter currentPartitionsWriter;
  46 + private CQLSSTableWriter currentTsLatestWriter;
  47 + private final Set<String> partitions = new HashSet<>();
  48 +
  49 + private File outTsDir;
  50 + private File outTsLatestDir;
  51 +
  52 + public PgCaMigrator(File sourceFile,
  53 + File ourTsDir,
  54 + File outTsPartitionDir,
  55 + File outTsLatestDir,
  56 + RelatedEntitiesParser allEntityIdsAndTypes,
  57 + DictionaryParser dictionaryParser,
  58 + boolean castStringsIfPossible) {
  59 + this.sourceFile = sourceFile;
  60 + this.entityIdsAndTypes = allEntityIdsAndTypes;
  61 + this.keyParser = dictionaryParser;
  62 + this.castStringIfPossible = castStringsIfPossible;
  63 + if(outTsLatestDir != null) {
  64 + this.currentTsLatestWriter = WriterBuilder.getLatestWriter(outTsLatestDir);
  65 + this.outTsLatestDir = outTsLatestDir;
  66 + }
  67 + if(ourTsDir != null) {
  68 + this.currentTsWriter = WriterBuilder.getTsWriter(ourTsDir);
  69 + this.currentPartitionsWriter = WriterBuilder.getPartitionWriter(outTsPartitionDir);
  70 + this.outTsDir = ourTsDir;
  71 + }
  72 + }
  73 +
  74 + public void migrate() throws IOException {
  75 + boolean isTsDone = false;
  76 + boolean isLatestDone = false;
  77 + String line;
  78 + LineIterator iterator = FileUtils.lineIterator(this.sourceFile);
  79 +
  80 + try {
  81 + while(iterator.hasNext()) {
  82 + line = iterator.nextLine();
  83 + if(!isLatestDone && isBlockLatestStarted(line)) {
  84 + System.out.println("START TO MIGRATE LATEST");
  85 + long start = System.currentTimeMillis();
  86 + processBlock(iterator, currentTsLatestWriter, outTsLatestDir, this::toValuesLatest);
  87 + System.out.println("FORMING OF SSL FOR LATEST TS FINISHED WITH TIME: " + (System.currentTimeMillis() - start) + " ms.");
  88 + isLatestDone = true;
  89 + }
  90 +
  91 + if(!isTsDone && isBlockTsStarted(line)) {
  92 + System.out.println("START TO MIGRATE TS");
  93 + long start = System.currentTimeMillis();
  94 + processBlock(iterator, currentTsWriter, outTsDir, this::toValuesTs);
  95 + System.out.println("FORMING OF SSL FOR TS FINISHED WITH TIME: " + (System.currentTimeMillis() - start) + " ms.");
  96 + isTsDone = true;
  97 + }
  98 + }
  99 +
  100 + System.out.println("Partitions collected " + partitions.size());
  101 + long startTs = System.currentTimeMillis();
  102 + for (String partition : partitions) {
  103 + String[] split = partition.split("\\|");
  104 + List<Object> values = Lists.newArrayList();
  105 + values.add(split[0]);
  106 + values.add(UUID.fromString(split[1]));
  107 + values.add(split[2]);
  108 + values.add(Long.parseLong(split[3]));
  109 + currentPartitionsWriter.addRow(values);
  110 + }
  111 +
  112 + System.out.println(new Date() + " Migrated partitions " + partitions.size() + " in " + (System.currentTimeMillis() - startTs));
  113 +
  114 + System.out.println();
  115 + System.out.println("Finished migrate Telemetry");
  116 +
  117 + } finally {
  118 + iterator.close();
  119 + currentTsLatestWriter.close();
  120 + currentTsWriter.close();
  121 + currentPartitionsWriter.close();
  122 + }
  123 + }
  124 +
  125 + private void logLinesProcessed() {
  126 + if (linesProcessed++ % LOG_BATCH == 0) {
  127 + System.out.println(new Date() + " linesProcessed = " + linesProcessed + " in, castOk " + castedOk + " castErr " + castErrors);
  128 + }
  129 + }
  130 +
  131 + private List<Object> toValuesTs(List<String> raw) {
  132 + linesTsMigrated++;
  133 + List<Object> result = new ArrayList<>();
  134 + result.add(entityIdsAndTypes.getEntityType(raw.get(0)));
  135 + result.add(UUID.fromString(raw.get(0)));
  136 + result.add(keyParser.getKeyByKeyId(raw.get(1)));
  137 +
  138 + long ts = Long.parseLong(raw.get(2));
  139 + long partition = toPartitionTs(ts);
  140 + result.add(partition);
  141 + result.add(ts);
  142 +
  143 + result.add(raw.get(3).equals("\\N") ? null : raw.get(3).equals("t") ? Boolean.TRUE : Boolean.FALSE);
  144 + result.add(raw.get(4).equals("\\N") ? null : raw.get(4));
  145 + result.add(raw.get(5).equals("\\N") ? null : Long.parseLong(raw.get(5)));
  146 + result.add(raw.get(6).equals("\\N") ? null : Double.parseDouble(raw.get(6)));
  147 + result.add(raw.get(7).equals("\\N") ? null : raw.get(7));
  148 +
  149 + processPartitions(result);
  150 +
  151 + return result;
  152 + }
  153 +
  154 + private List<Object> toValuesLatest(List<String> raw) {
  155 + linesLatestMigrated++;
  156 + List<Object> result = new ArrayList<>();
  157 + result.add(this.entityIdsAndTypes.getEntityType(raw.get(0)));
  158 + result.add(UUID.fromString(raw.get(0)));
  159 + result.add(this.keyParser.getKeyByKeyId(raw.get(1)));
  160 +
  161 + long ts = Long.parseLong(raw.get(2));
  162 + result.add(3, ts);
  163 +
  164 + result.add(raw.get(3).equals("\\N") ? null : raw.get(3).equals("t") ? Boolean.TRUE : Boolean.FALSE);
  165 + result.add(raw.get(4).equals("\\N") ? null : raw.get(4));
  166 + result.add(raw.get(5).equals("\\N") ? null : Long.parseLong(raw.get(5)));
  167 + result.add(raw.get(6).equals("\\N") ? null : Double.parseDouble(raw.get(6)));
  168 + result.add(raw.get(7).equals("\\N") ? null : raw.get(7));
  169 +
  170 + return result;
  171 + }
  172 +
  173 + private long toPartitionTs(long ts) {
  174 + LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
  175 + return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).toInstant(ZoneOffset.UTC).toEpochMilli();
  176 + }
  177 +
  178 + private void processPartitions(List<Object> values) {
  179 + String key = values.get(0) + "|" + values.get(1) + "|" + values.get(2) + "|" + values.get(3);
  180 + partitions.add(key);
  181 + }
  182 +
  183 + private void processBlock(LineIterator iterator, CQLSSTableWriter writer, File outDir, Function<List<String>, List<Object>> function) {
  184 + String currentLine;
  185 + linesProcessed = 0;
  186 + while(iterator.hasNext()) {
  187 + logLinesProcessed();
  188 + currentLine = iterator.nextLine();
  189 + if(isBlockFinished(currentLine)) {
  190 + return;
  191 + }
  192 +
  193 + try {
  194 + List<String> raw = Arrays.stream(currentLine.trim().split("\t"))
  195 + .map(String::trim)
  196 + .filter(StringUtils::isNotEmpty)
  197 + .collect(Collectors.toList());
  198 + List<Object> values = function.apply(raw);
  199 +
  200 + if (this.currentWriterCount == 0) {
  201 + System.out.println(new Date() + " close writer " + new Date());
  202 + writer.close();
  203 + writer = WriterBuilder.getLatestWriter(outDir);
  204 + }
  205 +
  206 + if (this.castStringIfPossible) {
  207 + writer.addRow(castToNumericIfPossible(values));
  208 + } else {
  209 + writer.addRow(values);
  210 + }
  211 +
  212 + currentWriterCount++;
  213 + if (currentWriterCount >= rowPerFile) {
  214 + currentWriterCount = 0;
  215 + }
  216 + } catch (Exception ex) {
  217 + System.out.println(ex.getMessage() + " -> " + currentLine);
  218 + }
  219 + }
  220 + }
  221 +
  222 + private List<Object> castToNumericIfPossible(List<Object> values) {
  223 + try {
  224 + if (values.get(6) != null && NumberUtils.isNumber(values.get(6).toString())) {
  225 + Double casted = NumberUtils.createDouble(values.get(6).toString());
  226 + List<Object> numeric = Lists.newArrayList();
  227 + numeric.addAll(values);
  228 + numeric.set(6, null);
  229 + numeric.set(8, casted);
  230 + castedOk++;
  231 + return numeric;
  232 + }
  233 + } catch (Throwable th) {
  234 + castErrors++;
  235 + }
  236 +
  237 + processPartitions(values);
  238 +
  239 + return values;
  240 + }
  241 +
  242 + private boolean isBlockFinished(String line) {
  243 + return StringUtils.isBlank(line) || line.equals("\\.");
  244 + }
  245 +
  246 + private boolean isBlockTsStarted(String line) {
  247 + return line.startsWith("COPY public.ts_kv (");
  248 + }
  249 +
  250 + private boolean isBlockLatestStarted(String line) {
  251 + return line.startsWith("COPY public.ts_kv_latest (");
  252 + }
  253 +
  254 +}
1 -/**  
2 - * Copyright © 2016-2021 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.client.tools.migrator;  
17 -  
18 -import com.google.common.collect.Lists;  
19 -import org.apache.cassandra.io.sstable.CQLSSTableWriter;  
20 -import org.apache.commons.io.FileUtils;  
21 -import org.apache.commons.io.LineIterator;  
22 -import org.apache.commons.lang3.StringUtils;  
23 -import org.apache.commons.lang3.math.NumberUtils;  
24 -  
25 -import java.io.File;  
26 -import java.io.IOException;  
27 -import java.time.Instant;  
28 -import java.time.LocalDateTime;  
29 -import java.time.ZoneOffset;  
30 -import java.time.temporal.ChronoUnit;  
31 -import java.util.ArrayList;  
32 -import java.util.Arrays;  
33 -import java.util.Date;  
34 -import java.util.HashSet;  
35 -import java.util.List;  
36 -import java.util.Set;  
37 -import java.util.UUID;  
38 -import java.util.stream.Collectors;  
39 -  
40 -public class PostgresToCassandraTelemetryMigrator {  
41 -  
42 - private static final long LOG_BATCH = 1000000;  
43 - private static final long rowPerFile = 1000000;  
44 -  
45 - private static long linesProcessed = 0;  
46 - private static long linesMigrated = 0;  
47 - private static long castErrors = 0;  
48 - private static long castedOk = 0;  
49 -  
50 - private static long currentWriterCount = 1;  
51 - private static CQLSSTableWriter currentTsWriter = null;  
52 - private static CQLSSTableWriter currentPartitionWriter = null;  
53 -  
54 - private static Set<String> partitions = new HashSet<>();  
55 - private static RelatedEntitiesParser entityIdsAndTypes;  
56 - private static DictionaryParser keyParser;  
57 -  
58 - public static void migrateTs(File sourceFile,  
59 - File outTsDir,  
60 - File outPartitionDir,  
61 - RelatedEntitiesParser allEntityIdsAndTypes,  
62 - DictionaryParser dictionaryParser,  
63 - boolean castStringsIfPossible) throws IOException {  
64 - long startTs = System.currentTimeMillis();  
65 - long stepLineTs = System.currentTimeMillis();  
66 - long stepOkLineTs = System.currentTimeMillis();  
67 - LineIterator iterator = FileUtils.lineIterator(sourceFile);  
68 - currentTsWriter = WriterBuilder.getTsWriter(outTsDir);  
69 - currentPartitionWriter = WriterBuilder.getPartitionWriter(outPartitionDir);  
70 - entityIdsAndTypes = allEntityIdsAndTypes;  
71 - keyParser = dictionaryParser;  
72 -  
73 - boolean isBlockStarted = false;  
74 - boolean isBlockFinished = false;  
75 -  
76 - String line;  
77 - while (iterator.hasNext()) {  
78 - if (linesProcessed++ % LOG_BATCH == 0) {  
79 - System.out.println(new Date() + " linesProcessed = " + linesProcessed + " in " + (System.currentTimeMillis() - stepLineTs) + " castOk " + castedOk + " castErr " + castErrors);  
80 - stepLineTs = System.currentTimeMillis();  
81 - }  
82 -  
83 - line = iterator.nextLine();  
84 -  
85 - if (isBlockFinished) {  
86 - break;  
87 - }  
88 -  
89 - if (!isBlockStarted) {  
90 - if (isBlockStarted(line)) {  
91 - System.out.println();  
92 - System.out.println();  
93 - System.out.println(line);  
94 - System.out.println();  
95 - System.out.println();  
96 - isBlockStarted = true;  
97 - }  
98 - continue;  
99 - }  
100 -  
101 - if (isBlockFinished(line)) {  
102 - isBlockFinished = true;  
103 - } else {  
104 - try {  
105 - List<String> raw = Arrays.stream(line.trim().split("\t"))  
106 - .map(String::trim)  
107 - .filter(StringUtils::isNotEmpty)  
108 - .collect(Collectors.toList());  
109 - List<Object> values = toValues(raw);  
110 -  
111 - if (currentWriterCount == 0) {  
112 - System.out.println(new Date() + " close writer " + new Date());  
113 - currentTsWriter.close();  
114 - currentTsWriter = WriterBuilder.getTsWriter(outTsDir);  
115 - }  
116 -  
117 - if (castStringsIfPossible) {  
118 - currentTsWriter.addRow(castToNumericIfPossible(values));  
119 - } else {  
120 - currentTsWriter.addRow(values);  
121 - }  
122 - processPartitions(values);  
123 - currentWriterCount++;  
124 - if (currentWriterCount >= rowPerFile) {  
125 - currentWriterCount = 0;  
126 - }  
127 -  
128 - if (linesMigrated++ % LOG_BATCH == 0) {  
129 - System.out.println(new Date() + " migrated = " + linesMigrated + " in " + (System.currentTimeMillis() - stepOkLineTs) + " partitions = " + partitions.size());  
130 - stepOkLineTs = System.currentTimeMillis();  
131 - }  
132 - } catch (Exception ex) {  
133 - System.out.println(ex.getMessage() + " -> " + line);  
134 - }  
135 -  
136 - }  
137 - }  
138 -  
139 - long endTs = System.currentTimeMillis();  
140 - System.out.println();  
141 - System.out.println(new Date() + " Migrated rows " + linesMigrated + " in " + (endTs - startTs));  
142 - System.out.println("Partitions collected " + partitions.size());  
143 -  
144 - startTs = System.currentTimeMillis();  
145 - for (String partition : partitions) {  
146 - String[] split = partition.split("\\|");  
147 - List<Object> values = Lists.newArrayList();  
148 - values.add(split[0]);  
149 - values.add(UUID.fromString(split[1]));  
150 - values.add(split[2]);  
151 - values.add(Long.parseLong(split[3]));  
152 - currentPartitionWriter.addRow(values);  
153 - }  
154 - currentPartitionWriter.close();  
155 - endTs = System.currentTimeMillis();  
156 - System.out.println();  
157 - System.out.println();  
158 - System.out.println(new Date() + " Migrated partitions " + partitions.size() + " in " + (endTs - startTs));  
159 -  
160 -  
161 - currentTsWriter.close();  
162 - System.out.println();  
163 - System.out.println("Finished migrate Telemetry");  
164 - }  
165 -  
166 - private static List<Object> castToNumericIfPossible(List<Object> values) {  
167 - try {  
168 - if (values.get(6) != null && NumberUtils.isNumber(values.get(6).toString())) {  
169 - Double casted = NumberUtils.createDouble(values.get(6).toString());  
170 - List<Object> numeric = Lists.newArrayList();  
171 - numeric.addAll(values);  
172 - numeric.set(6, null);  
173 - numeric.set(8, casted);  
174 - castedOk++;  
175 - return numeric;  
176 - }  
177 - } catch (Throwable th) {  
178 - castErrors++;  
179 - }  
180 - return values;  
181 - }  
182 -  
183 - private static void processPartitions(List<Object> values) {  
184 - String key = values.get(0) + "|" + values.get(1) + "|" + values.get(2) + "|" + values.get(3);  
185 - partitions.add(key);  
186 - }  
187 -  
188 - private static List<Object> toValues(List<String> raw) {  
189 - //expected Table structure:  
190 -// COPY public.ts_kv (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) FROM stdin;  
191 -  
192 - List<Object> result = new ArrayList<>();  
193 - result.add(entityIdsAndTypes.getEntityType(raw.get(0)));  
194 - result.add(UUID.fromString(raw.get(0)));  
195 - result.add(keyParser.getKeyByKeyId(raw.get(1)));  
196 -  
197 - long ts = Long.parseLong(raw.get(2));  
198 - long partition = toPartitionTs(ts);  
199 - result.add(partition);  
200 - result.add(ts);  
201 -  
202 - result.add(raw.get(3).equals("\\N") ? null : raw.get(3).equals("t") ? Boolean.TRUE : Boolean.FALSE);  
203 - result.add(raw.get(4).equals("\\N") ? null : raw.get(4));  
204 - result.add(raw.get(5).equals("\\N") ? null : Long.parseLong(raw.get(5)));  
205 - result.add(raw.get(6).equals("\\N") ? null : Double.parseDouble(raw.get(6)));  
206 - result.add(raw.get(7).equals("\\N") ? null : raw.get(7));  
207 - return result;  
208 - }  
209 -  
210 - private static long toPartitionTs(long ts) {  
211 - LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);  
212 - return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).toInstant(ZoneOffset.UTC).toEpochMilli();  
213 -// return TsPartitionDate.MONTHS.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();  
214 - }  
215 -  
216 - private static boolean isBlockStarted(String line) {  
217 - return line.startsWith("COPY public.ts_kv (");  
218 - }  
219 -  
220 - private static boolean isBlockFinished(String line) {  
221 - return StringUtils.isBlank(line) || line.equals("\\.");  
222 - }  
223 -  
224 -}  
@@ -42,39 +42,43 @@ public class RelatedEntitiesParser { @@ -42,39 +42,43 @@ public class RelatedEntitiesParser {
42 42
43 private void processAllTables(LineIterator lineIterator) { 43 private void processAllTables(LineIterator lineIterator) {
44 String currentLine; 44 String currentLine;
45 - while(lineIterator.hasNext()) {  
46 - currentLine = lineIterator.nextLine();  
47 - if(currentLine.startsWith("COPY public.alarm")) {  
48 - processBlock(lineIterator, EntityType.ALARM);  
49 - } else if (currentLine.startsWith("COPY public.asset")) {  
50 - processBlock(lineIterator, EntityType.ASSET);  
51 - } else if (currentLine.startsWith("COPY public.customer")) {  
52 - processBlock(lineIterator, EntityType.CUSTOMER);  
53 - } else if (currentLine.startsWith("COPY public.dashboard")) {  
54 - processBlock(lineIterator, EntityType.DASHBOARD);  
55 - } else if (currentLine.startsWith("COPY public.device")) {  
56 - processBlock(lineIterator, EntityType.DEVICE);  
57 - } else if (currentLine.startsWith("COPY public.rule_chain")) {  
58 - processBlock(lineIterator, EntityType.RULE_CHAIN);  
59 - } else if (currentLine.startsWith("COPY public.rule_node")) {  
60 - processBlock(lineIterator, EntityType.RULE_NODE);  
61 - } else if (currentLine.startsWith("COPY public.tenant")) {  
62 - processBlock(lineIterator, EntityType.TENANT);  
63 - } else if (currentLine.startsWith("COPY public.tb_user")) {  
64 - processBlock(lineIterator, EntityType.USER);  
65 - } else if (currentLine.startsWith("COPY public.entity_view")) {  
66 - processBlock(lineIterator, EntityType.ENTITY_VIEW);  
67 - } else if (currentLine.startsWith("COPY public.widgets_bundle")) {  
68 - processBlock(lineIterator, EntityType.WIDGETS_BUNDLE);  
69 - } else if (currentLine.startsWith("COPY public.widget_type")) {  
70 - processBlock(lineIterator, EntityType.WIDGET_TYPE);  
71 - } else if (currentLine.startsWith("COPY public.tenant_profile")) {  
72 - processBlock(lineIterator, EntityType.TENANT_PROFILE);  
73 - } else if (currentLine.startsWith("COPY public.device_profile")) {  
74 - processBlock(lineIterator, EntityType.DEVICE_PROFILE);  
75 - } else if (currentLine.startsWith("COPY public.api_usage_state")) {  
76 - processBlock(lineIterator, EntityType.API_USAGE_STATE); 45 + try {
  46 + while (lineIterator.hasNext()) {
  47 + currentLine = lineIterator.nextLine();
  48 + if (currentLine.startsWith("COPY public.alarm")) {
  49 + processBlock(lineIterator, EntityType.ALARM);
  50 + } else if (currentLine.startsWith("COPY public.asset")) {
  51 + processBlock(lineIterator, EntityType.ASSET);
  52 + } else if (currentLine.startsWith("COPY public.customer")) {
  53 + processBlock(lineIterator, EntityType.CUSTOMER);
  54 + } else if (currentLine.startsWith("COPY public.dashboard")) {
  55 + processBlock(lineIterator, EntityType.DASHBOARD);
  56 + } else if (currentLine.startsWith("COPY public.device")) {
  57 + processBlock(lineIterator, EntityType.DEVICE);
  58 + } else if (currentLine.startsWith("COPY public.rule_chain")) {
  59 + processBlock(lineIterator, EntityType.RULE_CHAIN);
  60 + } else if (currentLine.startsWith("COPY public.rule_node")) {
  61 + processBlock(lineIterator, EntityType.RULE_NODE);
  62 + } else if (currentLine.startsWith("COPY public.tenant")) {
  63 + processBlock(lineIterator, EntityType.TENANT);
  64 + } else if (currentLine.startsWith("COPY public.tb_user")) {
  65 + processBlock(lineIterator, EntityType.USER);
  66 + } else if (currentLine.startsWith("COPY public.entity_view")) {
  67 + processBlock(lineIterator, EntityType.ENTITY_VIEW);
  68 + } else if (currentLine.startsWith("COPY public.widgets_bundle")) {
  69 + processBlock(lineIterator, EntityType.WIDGETS_BUNDLE);
  70 + } else if (currentLine.startsWith("COPY public.widget_type")) {
  71 + processBlock(lineIterator, EntityType.WIDGET_TYPE);
  72 + } else if (currentLine.startsWith("COPY public.tenant_profile")) {
  73 + processBlock(lineIterator, EntityType.TENANT_PROFILE);
  74 + } else if (currentLine.startsWith("COPY public.device_profile")) {
  75 + processBlock(lineIterator, EntityType.DEVICE_PROFILE);
  76 + } else if (currentLine.startsWith("COPY public.api_usage_state")) {
  77 + processBlock(lineIterator, EntityType.API_USAGE_STATE);
  78 + }
77 } 79 }
  80 + } finally {
  81 + lineIterator.close();
78 } 82 }
79 } 83 }
80 84