Commit 5e77e3ea9fe04b1fe82d22a061ebc66e6e7db7ca

Authored by Igor Kulikov
2 parents 7cfa352a 648dd81b

Merge branch 'master' of github.com:thingsboard/thingsboard

... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.controller;
18 18 import com.fasterxml.jackson.core.type.TypeReference;
19 19 import com.fasterxml.jackson.databind.JsonNode;
20 20 import com.fasterxml.jackson.databind.ObjectMapper;
  21 +import com.fasterxml.jackson.databind.node.ArrayNode;
21 22 import com.fasterxml.jackson.databind.node.ObjectNode;
22 23 import lombok.extern.slf4j.Slf4j;
23 24 import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -422,6 +423,25 @@ public class RuleChainController extends BaseController {
422 423 }
423 424
424 425 private String msgToOutput(TbMsg msg) throws Exception {
  426 + JsonNode resultNode = convertMsgToOut(msg);
  427 + return objectMapper.writeValueAsString(resultNode);
  428 + }
  429 +
  430 + private String msgToOutput(List<TbMsg> msgs) throws Exception {
  431 + JsonNode resultNode;
  432 + if (msgs.size() > 1) {
  433 + resultNode = objectMapper.createArrayNode();
  434 + for (TbMsg msg : msgs) {
  435 + JsonNode convertedData = convertMsgToOut(msg);
  436 + ((ArrayNode) resultNode).add(convertedData);
  437 + }
  438 + } else {
  439 + resultNode = convertMsgToOut(msgs.get(0));
  440 + }
  441 + return objectMapper.writeValueAsString(resultNode);
  442 + }
  443 +
  444 + private JsonNode convertMsgToOut(TbMsg msg) throws Exception{
425 445 ObjectNode msgData = objectMapper.createObjectNode();
426 446 if (!StringUtils.isEmpty(msg.getData())) {
427 447 msgData.set("msg", objectMapper.readTree(msg.getData()));
... ... @@ -429,7 +449,8 @@ public class RuleChainController extends BaseController {
429 449 Map<String, String> metadata = msg.getMetaData().getData();
430 450 msgData.set("metadata", objectMapper.valueToTree(metadata));
431 451 msgData.put("msgType", msg.getType());
432   - return objectMapper.writeValueAsString(msgData);
  452 + return msgData;
433 453 }
434 454
  455 +
435 456 }
... ...
... ... @@ -108,13 +108,18 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
108 108 }
109 109
110 110 @Override
111   - public TbMsg executeUpdate(TbMsg msg) throws ScriptException {
  111 + public List<TbMsg> executeUpdate(TbMsg msg) throws ScriptException {
112 112 JsonNode result = executeScript(msg);
113   - if (!result.isObject()) {
  113 + if (result.isObject()) {
  114 + return Collections.singletonList(unbindMsg(result, msg));
  115 + } else if (result.isArray()){
  116 + List<TbMsg> res = new ArrayList<>(result.size());
  117 + result.forEach(jsonObject -> res.add(unbindMsg(jsonObject, msg)));
  118 + return res;
  119 + } else {
114 120 log.warn("Wrong result type: {}", result.getNodeType());
115 121 throw new ScriptException("Wrong result type: " + result.getNodeType());
116 122 }
117   - return unbindMsg(result, msg);
118 123 }
119 124
120 125 @Override
... ...
... ... @@ -110,7 +110,7 @@ security:
110 110 # Enable/disable claiming devices, if false -> the device's [claimingAllowed] SERVER_SCOPE attribute must be set to [true] to allow claiming specific device
111 111 allowClaimingByDefault: "${SECURITY_CLAIM_ALLOW_CLAIMING_BY_DEFAULT:true}"
112 112 # Time allowed to claim the device in milliseconds
113   - duration: "${SECURITY_CLAIM_DURATION:60000}" # 1 minute, note this value must equal claimDevices.timeToLiveInMinutes value
  113 + duration: "${SECURITY_CLAIM_DURATION:86400000}" # 1 minute, note this value must equal claimDevices.timeToLiveInMinutes value
114 114 basic:
115 115 enabled: "${SECURITY_BASIC_ENABLED:false}"
116 116 oauth2:
... ... @@ -348,8 +348,8 @@ caffeine:
348 348 timeToLiveInMinutes: 1440
349 349 maxSize: 0
350 350 claimDevices:
351   - timeToLiveInMinutes: 1
352   - maxSize: 0
  351 + timeToLiveInMinutes: 1440
  352 + maxSize: 1000
353 353 securitySettings:
354 354 timeToLiveInMinutes: 1440
355 355 maxSize: 0
... ...
... ... @@ -54,7 +54,8 @@
54 54 <cassandra.version>4.10.0</cassandra.version>
55 55 <metrics.version>4.0.5</metrics.version>
56 56 <cassandra-unit.version>4.3.1.0</cassandra-unit.version>
57   - <cassandra-all.version>3.11.9</cassandra-all.version>
  57 + <cassandra-all.version>3.11.10</cassandra-all.version>
  58 + <cassandra-driver-core.version>3.11.0</cassandra-driver-core.version>
58 59 <takari-cpsuite.version>1.2.7</takari-cpsuite.version>
59 60 <guava.version>28.2-jre</guava.version>
60 61 <caffeine.version>2.6.1</caffeine.version>
... ... @@ -1096,6 +1097,11 @@
1096 1097 <version>${cassandra.version}</version>
1097 1098 </dependency>
1098 1099 <dependency>
  1100 + <groupId>com.datastax.cassandra</groupId>
  1101 + <artifactId>cassandra-driver-core</artifactId>
  1102 + <version>${cassandra-driver-core.version}</version>
  1103 + </dependency>
  1104 + <dependency>
1099 1105 <groupId>io.dropwizard.metrics</groupId>
1100 1106 <artifactId>metrics-jmx</artifactId>
1101 1107 <version>${metrics.version}</version>
... ...
... ... @@ -25,7 +25,7 @@ import java.util.Set;
25 25
26 26 public interface ScriptEngine {
27 27
28   - TbMsg executeUpdate(TbMsg msg) throws ScriptException;
  28 + List<TbMsg> executeUpdate(TbMsg msg) throws ScriptException;
29 29
30 30 ListenableFuture<List<TbMsg>> executeUpdateAsync(TbMsg msg);
31 31
... ...
... ... @@ -56,8 +56,8 @@
56 56 <artifactId>cassandra-all</artifactId>
57 57 </dependency>
58 58 <dependency>
59   - <groupId>com.datastax.oss</groupId>
60   - <artifactId>java-driver-core</artifactId>
  59 + <groupId>com.datastax.cassandra</groupId>
  60 + <artifactId>cassandra-driver-core</artifactId>
61 61 </dependency>
62 62 <dependency>
63 63 <groupId>commons-io</groupId>
... ... @@ -70,7 +70,7 @@
70 70 <plugins>
71 71 <plugin>
72 72 <artifactId>maven-assembly-plugin</artifactId>
73   - <configuration>
  73 + <configuration combine.self="override">
74 74 <archive>
75 75 <manifest>
76 76 <mainClass>org.thingsboard.client.tools.migrator.MigratorTool</mainClass>
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.client.tools.migrator;
  17 +
  18 +import org.apache.commons.io.FileUtils;
  19 +import org.apache.commons.io.LineIterator;
  20 +import org.apache.commons.lang3.StringUtils;
  21 +
  22 +import java.io.File;
  23 +import java.io.IOException;
  24 +import java.util.HashMap;
  25 +import java.util.Map;
  26 +
  27 +public class DictionaryParser {
  28 + private Map<String, String> dictionaryParsed = new HashMap<>();
  29 +
  30 + public DictionaryParser(File sourceFile) throws IOException {
  31 + parseDictionaryDump(FileUtils.lineIterator(sourceFile));
  32 + }
  33 +
  34 + public String getKeyByKeyId(String keyId) {
  35 + return dictionaryParsed.get(keyId);
  36 + }
  37 +
  38 + private boolean isBlockFinished(String line) {
  39 + return StringUtils.isBlank(line) || line.equals("\\.");
  40 + }
  41 +
  42 + private boolean isBlockStarted(String line) {
  43 + return line.startsWith("COPY public.ts_kv_dictionary (");
  44 + }
  45 +
  46 + private void parseDictionaryDump(LineIterator iterator) {
  47 + try {
  48 + String tempLine;
  49 + while (iterator.hasNext()) {
  50 + tempLine = iterator.nextLine();
  51 +
  52 + if (isBlockStarted(tempLine)) {
  53 + processBlock(iterator);
  54 + }
  55 + }
  56 + } finally {
  57 + iterator.close();
  58 + }
  59 + }
  60 +
  61 + private void processBlock(LineIterator lineIterator) {
  62 + String tempLine;
  63 + String[] lineSplited;
  64 + while(lineIterator.hasNext()) {
  65 + tempLine = lineIterator.nextLine();
  66 + if(isBlockFinished(tempLine)) {
  67 + return;
  68 + }
  69 +
  70 + lineSplited = tempLine.split("\t");
  71 + dictionaryParsed.put(lineSplited[1], lineSplited[0]);
  72 + }
  73 + }
  74 +}
... ...
... ... @@ -30,17 +30,26 @@ public class MigratorTool {
30 30 public static void main(String[] args) {
31 31 CommandLine cmd = parseArgs(args);
32 32
33   -
34 33 try {
35   - File latestSource = new File(cmd.getOptionValue("latestTelemetryFrom"));
36   - File latestSaveDir = new File(cmd.getOptionValue("latestTelemetryOut"));
37   - File tsSource = new File(cmd.getOptionValue("telemetryFrom"));
38   - File tsSaveDir = new File(cmd.getOptionValue("telemetryOut"));
39   - File partitionsSaveDir = new File(cmd.getOptionValue("partitionsOut"));
40 34 boolean castEnable = Boolean.parseBoolean(cmd.getOptionValue("castEnable"));
  35 + File allTelemetrySource = new File(cmd.getOptionValue("telemetryFrom"));
  36 + File tsSaveDir = null;
  37 + File partitionsSaveDir = null;
  38 + File latestSaveDir = null;
  39 +
  40 + RelatedEntitiesParser allEntityIdsAndTypes =
  41 + new RelatedEntitiesParser(new File(cmd.getOptionValue("relatedEntities")));
  42 + DictionaryParser dictionaryParser = new DictionaryParser(allTelemetrySource);
  43 +
  44 + if(cmd.getOptionValue("latestTelemetryOut") != null) {
  45 + latestSaveDir = new File(cmd.getOptionValue("latestTelemetryOut"));
  46 + }
  47 + if(cmd.getOptionValue("telemetryOut") != null) {
  48 + tsSaveDir = new File(cmd.getOptionValue("telemetryOut"));
  49 + partitionsSaveDir = new File(cmd.getOptionValue("partitionsOut"));
  50 + }
41 51
42   - PgCaLatestMigrator.migrateLatest(latestSource, latestSaveDir, castEnable);
43   - PostgresToCassandraTelemetryMigrator.migrateTs(tsSource, tsSaveDir, partitionsSaveDir, castEnable);
  52 + new PgCaMigrator(allTelemetrySource, tsSaveDir, partitionsSaveDir, latestSaveDir, allEntityIdsAndTypes, dictionaryParser, castEnable).migrate();
44 53
45 54 } catch (Throwable th) {
46 55 th.printStackTrace();
... ... @@ -52,30 +61,30 @@ public class MigratorTool {
52 61 private static CommandLine parseArgs(String[] args) {
53 62 Options options = new Options();
54 63
55   - Option latestTsOpt = new Option("latestFrom", "latestTelemetryFrom", true, "latest telemetry source file path");
56   - latestTsOpt.setRequired(true);
57   - options.addOption(latestTsOpt);
  64 + Option telemetryAllFrom = new Option("telemetryFrom", "telemetryFrom", true, "telemetry source file");
  65 + telemetryAllFrom.setRequired(true);
  66 + options.addOption(telemetryAllFrom);
58 67
59 68 Option latestTsOutOpt = new Option("latestOut", "latestTelemetryOut", true, "latest telemetry save dir");
60   - latestTsOutOpt.setRequired(true);
  69 + latestTsOutOpt.setRequired(false);
61 70 options.addOption(latestTsOutOpt);
62 71
63   - Option tsOpt = new Option("tsFrom", "telemetryFrom", true, "telemetry source file path");
64   - tsOpt.setRequired(true);
65   - options.addOption(tsOpt);
66   -
67 72 Option tsOutOpt = new Option("tsOut", "telemetryOut", true, "sstable save dir");
68   - tsOutOpt.setRequired(true);
  73 + tsOutOpt.setRequired(false);
69 74 options.addOption(tsOutOpt);
70 75
71 76 Option partitionOutOpt = new Option("partitionsOut", "partitionsOut", true, "partitions save dir");
72   - partitionOutOpt.setRequired(true);
  77 + partitionOutOpt.setRequired(false);
73 78 options.addOption(partitionOutOpt);
74 79
75 80 Option castOpt = new Option("castEnable", "castEnable", true, "cast String to Double if possible");
76 81 castOpt.setRequired(true);
77 82 options.addOption(castOpt);
78 83
  84 + Option relatedOpt = new Option("relatedEntities", "relatedEntities", true, "related entities source file path");
  85 + relatedOpt.setRequired(true);
  86 + options.addOption(relatedOpt);
  87 +
79 88 HelpFormatter formatter = new HelpFormatter();
80 89 CommandLineParser parser = new BasicParser();
81 90
... ...
1   -/**
2   - * Copyright © 2016-2021 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.client.tools.migrator;
17   -
18   -import com.google.common.collect.Lists;
19   -import org.apache.cassandra.io.sstable.CQLSSTableWriter;
20   -import org.apache.commons.io.FileUtils;
21   -import org.apache.commons.io.LineIterator;
22   -import org.apache.commons.lang3.StringUtils;
23   -import org.apache.commons.lang3.math.NumberUtils;
24   -
25   -import java.io.File;
26   -import java.io.IOException;
27   -import java.util.ArrayList;
28   -import java.util.Arrays;
29   -import java.util.Date;
30   -import java.util.List;
31   -import java.util.UUID;
32   -import java.util.stream.Collectors;
33   -
34   -public class PgCaLatestMigrator {
35   -
36   - private static final long LOG_BATCH = 1000000;
37   - private static final long rowPerFile = 1000000;
38   -
39   -
40   - private static long linesProcessed = 0;
41   - private static long linesMigrated = 0;
42   - private static long castErrors = 0;
43   - private static long castedOk = 0;
44   -
45   - private static long currentWriterCount = 1;
46   - private static CQLSSTableWriter currentTsWriter = null;
47   -
48   - public static void migrateLatest(File sourceFile, File outDir, boolean castStringsIfPossible) throws IOException {
49   - long startTs = System.currentTimeMillis();
50   - long stepLineTs = System.currentTimeMillis();
51   - long stepOkLineTs = System.currentTimeMillis();
52   - LineIterator iterator = FileUtils.lineIterator(sourceFile);
53   - currentTsWriter = WriterBuilder.getTsWriter(outDir);
54   -
55   - boolean isBlockStarted = false;
56   - boolean isBlockFinished = false;
57   -
58   - String line;
59   - while (iterator.hasNext()) {
60   - if (linesProcessed++ % LOG_BATCH == 0) {
61   - System.out.println(new Date() + " linesProcessed = " + linesProcessed + " in " + (System.currentTimeMillis() - stepLineTs) + " castOk " + castedOk + " castErr " + castErrors);
62   - stepLineTs = System.currentTimeMillis();
63   - }
64   -
65   - line = iterator.nextLine();
66   -
67   - if (isBlockFinished) {
68   - break;
69   - }
70   -
71   - if (!isBlockStarted) {
72   - if (isBlockStarted(line)) {
73   - System.out.println();
74   - System.out.println();
75   - System.out.println(line);
76   - System.out.println();
77   - System.out.println();
78   - isBlockStarted = true;
79   - }
80   - continue;
81   - }
82   -
83   - if (isBlockFinished(line)) {
84   - isBlockFinished = true;
85   - } else {
86   - try {
87   - List<String> raw = Arrays.stream(line.trim().split("\t"))
88   - .map(String::trim)
89   - .filter(StringUtils::isNotEmpty)
90   - .collect(Collectors.toList());
91   - List<Object> values = toValues(raw);
92   -
93   - if (currentWriterCount == 0) {
94   - System.out.println(new Date() + " close writer " + new Date());
95   - currentTsWriter.close();
96   - currentTsWriter = WriterBuilder.getLatestWriter(outDir);
97   - }
98   -
99   - if (castStringsIfPossible) {
100   - currentTsWriter.addRow(castToNumericIfPossible(values));
101   - } else {
102   - currentTsWriter.addRow(values);
103   - }
104   - currentWriterCount++;
105   - if (currentWriterCount >= rowPerFile) {
106   - currentWriterCount = 0;
107   - }
108   -
109   - if (linesMigrated++ % LOG_BATCH == 0) {
110   - System.out.println(new Date() + " migrated = " + linesMigrated + " in " + (System.currentTimeMillis() - stepOkLineTs));
111   - stepOkLineTs = System.currentTimeMillis();
112   - }
113   - } catch (Exception ex) {
114   - System.out.println(ex.getMessage() + " -> " + line);
115   - }
116   -
117   - }
118   - }
119   -
120   - long endTs = System.currentTimeMillis();
121   - System.out.println();
122   - System.out.println(new Date() + " Migrated rows " + linesMigrated + " in " + (endTs - startTs));
123   -
124   - currentTsWriter.close();
125   - System.out.println();
126   - System.out.println("Finished migrate Latest Telemetry");
127   - }
128   -
129   -
130   - private static List<Object> castToNumericIfPossible(List<Object> values) {
131   - try {
132   - if (values.get(6) != null && NumberUtils.isNumber(values.get(6).toString())) {
133   - Double casted = NumberUtils.createDouble(values.get(6).toString());
134   - List<Object> numeric = Lists.newArrayList();
135   - numeric.addAll(values);
136   - numeric.set(6, null);
137   - numeric.set(8, casted);
138   - castedOk++;
139   - return numeric;
140   - }
141   - } catch (Throwable th) {
142   - castErrors++;
143   - }
144   - return values;
145   - }
146   -
147   - private static List<Object> toValues(List<String> raw) {
148   - //expected Table structure:
149   -// COPY public.ts_kv_latest (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) FROM stdin;
150   -
151   -
152   - List<Object> result = new ArrayList<>();
153   - result.add(raw.get(0));
154   - result.add(fromString(raw.get(1)));
155   - result.add(raw.get(2));
156   -
157   - long ts = Long.parseLong(raw.get(3));
158   - result.add(ts);
159   -
160   - result.add(raw.get(4).equals("\\N") ? null : raw.get(4).equals("t") ? Boolean.TRUE : Boolean.FALSE);
161   - result.add(raw.get(5).equals("\\N") ? null : raw.get(5));
162   - result.add(raw.get(6).equals("\\N") ? null : Long.parseLong(raw.get(6)));
163   - result.add(raw.get(7).equals("\\N") ? null : Double.parseDouble(raw.get(7)));
164   - return result;
165   - }
166   -
167   - public static UUID fromString(String src) {
168   - return UUID.fromString(src.substring(7, 15) + "-" + src.substring(3, 7) + "-1"
169   - + src.substring(0, 3) + "-" + src.substring(15, 19) + "-" + src.substring(19));
170   - }
171   -
172   - private static boolean isBlockStarted(String line) {
173   - return line.startsWith("COPY public.ts_kv_latest");
174   - }
175   -
176   - private static boolean isBlockFinished(String line) {
177   - return StringUtils.isBlank(line) || line.equals("\\.");
178   - }
179   -}
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.client.tools.migrator;
  17 +
  18 +import com.google.common.collect.Lists;
  19 +import org.apache.cassandra.io.sstable.CQLSSTableWriter;
  20 +import org.apache.commons.io.FileUtils;
  21 +import org.apache.commons.io.LineIterator;
  22 +import org.apache.commons.lang3.StringUtils;
  23 +import org.apache.commons.lang3.math.NumberUtils;
  24 +
  25 +import java.io.File;
  26 +import java.io.IOException;
  27 +import java.time.Instant;
  28 +import java.time.LocalDateTime;
  29 +import java.time.ZoneOffset;
  30 +import java.time.temporal.ChronoUnit;
  31 +import java.util.ArrayList;
  32 +import java.util.Arrays;
  33 +import java.util.Date;
  34 +import java.util.HashSet;
  35 +import java.util.List;
  36 +import java.util.Set;
  37 +import java.util.UUID;
  38 +import java.util.function.Function;
  39 +import java.util.stream.Collectors;
  40 +
  41 +public class PgCaMigrator {
  42 +
  43 + private final long LOG_BATCH = 1000000;
  44 + private final long rowPerFile = 1000000;
  45 +
  46 + private long linesTsMigrated = 0;
  47 + private long linesLatestMigrated = 0;
  48 + private long castErrors = 0;
  49 + private long castedOk = 0;
  50 +
  51 + private long currentWriterCount = 1;
  52 +
  53 + private final File sourceFile;
  54 + private final boolean castStringIfPossible;
  55 +
  56 + private final RelatedEntitiesParser entityIdsAndTypes;
  57 + private final DictionaryParser keyParser;
  58 + private CQLSSTableWriter currentTsWriter;
  59 + private CQLSSTableWriter currentPartitionsWriter;
  60 + private CQLSSTableWriter currentTsLatestWriter;
  61 + private final Set<String> partitions = new HashSet<>();
  62 +
  63 + private File outTsDir;
  64 + private File outTsLatestDir;
  65 +
  66 + public PgCaMigrator(File sourceFile,
  67 + File ourTsDir,
  68 + File outTsPartitionDir,
  69 + File outTsLatestDir,
  70 + RelatedEntitiesParser allEntityIdsAndTypes,
  71 + DictionaryParser dictionaryParser,
  72 + boolean castStringsIfPossible) {
  73 + this.sourceFile = sourceFile;
  74 + this.entityIdsAndTypes = allEntityIdsAndTypes;
  75 + this.keyParser = dictionaryParser;
  76 + this.castStringIfPossible = castStringsIfPossible;
  77 + if(outTsLatestDir != null) {
  78 + this.currentTsLatestWriter = WriterBuilder.getLatestWriter(outTsLatestDir);
  79 + this.outTsLatestDir = outTsLatestDir;
  80 + }
  81 + if(ourTsDir != null) {
  82 + this.currentTsWriter = WriterBuilder.getTsWriter(ourTsDir);
  83 + this.currentPartitionsWriter = WriterBuilder.getPartitionWriter(outTsPartitionDir);
  84 + this.outTsDir = ourTsDir;
  85 + }
  86 + }
  87 +
  88 + public void migrate() throws IOException {
  89 + boolean isTsDone = false;
  90 + boolean isLatestDone = false;
  91 + String line;
  92 + LineIterator iterator = FileUtils.lineIterator(this.sourceFile);
  93 +
  94 + try {
  95 + while(iterator.hasNext()) {
  96 + line = iterator.nextLine();
  97 + if(!isLatestDone && isBlockLatestStarted(line)) {
  98 + System.out.println("START TO MIGRATE LATEST");
  99 + long start = System.currentTimeMillis();
  100 + processBlock(iterator, currentTsLatestWriter, outTsLatestDir, this::toValuesLatest);
  101 + System.out.println("TOTAL LINES MIGRATED: " + linesLatestMigrated + ", FORMING OF SSL FOR LATEST TS FINISHED WITH TIME: " + (System.currentTimeMillis() - start) + " ms.");
  102 + isLatestDone = true;
  103 + }
  104 +
  105 + if(!isTsDone && isBlockTsStarted(line)) {
  106 + System.out.println("START TO MIGRATE TS");
  107 + long start = System.currentTimeMillis();
  108 + processBlock(iterator, currentTsWriter, outTsDir, this::toValuesTs);
  109 + System.out.println("TOTAL LINES MIGRATED: " + linesTsMigrated + ", FORMING OF SSL FOR TS FINISHED WITH TIME: " + (System.currentTimeMillis() - start) + " ms.");
  110 + isTsDone = true;
  111 + }
  112 + }
  113 +
  114 + System.out.println("Partitions collected " + partitions.size());
  115 + long startTs = System.currentTimeMillis();
  116 + for (String partition : partitions) {
  117 + String[] split = partition.split("\\|");
  118 + List<Object> values = Lists.newArrayList();
  119 + values.add(split[0]);
  120 + values.add(UUID.fromString(split[1]));
  121 + values.add(split[2]);
  122 + values.add(Long.parseLong(split[3]));
  123 + currentPartitionsWriter.addRow(values);
  124 + }
  125 +
  126 + System.out.println(new Date() + " Migrated partitions " + partitions.size() + " in " + (System.currentTimeMillis() - startTs));
  127 +
  128 + System.out.println();
  129 + System.out.println("Finished migrate Telemetry");
  130 +
  131 + } finally {
  132 + iterator.close();
  133 + currentTsLatestWriter.close();
  134 + currentTsWriter.close();
  135 + currentPartitionsWriter.close();
  136 + }
  137 + }
  138 +
  139 + private void logLinesProcessed(long lines) {
  140 + if (lines % LOG_BATCH == 0) {
  141 + System.out.println(new Date() + " lines processed = " + lines + " in, castOk " + castedOk + " castErr " + castErrors);
  142 + }
  143 + }
  144 +
  145 + private void logLinesMigrated(long lines) {
  146 + if(lines % LOG_BATCH == 0) {
  147 + System.out.println(new Date() + " lines migrated = " + lines + " in, castOk " + castedOk + " castErr " + castErrors);
  148 + }
  149 + }
  150 +
  151 + private void addTypeIdKey(List<Object> result, List<String> raw) {
  152 + result.add(entityIdsAndTypes.getEntityType(raw.get(0)));
  153 + result.add(UUID.fromString(raw.get(0)));
  154 + result.add(keyParser.getKeyByKeyId(raw.get(1)));
  155 + }
  156 +
  157 + private void addPartitions(List<Object> result, List<String> raw) {
  158 + long ts = Long.parseLong(raw.get(2));
  159 + long partition = toPartitionTs(ts);
  160 + result.add(partition);
  161 + result.add(ts);
  162 + }
  163 +
  164 + private void addTimeseries(List<Object> result, List<String> raw) {
  165 + result.add(Long.parseLong(raw.get(2)));
  166 + }
  167 +
  168 + private void addValues(List<Object> result, List<String> raw) {
  169 + result.add(raw.get(3).equals("\\N") ? null : raw.get(3).equals("t") ? Boolean.TRUE : Boolean.FALSE);
  170 + result.add(raw.get(4).equals("\\N") ? null : raw.get(4));
  171 + result.add(raw.get(5).equals("\\N") ? null : Long.parseLong(raw.get(5)));
  172 + result.add(raw.get(6).equals("\\N") ? null : Double.parseDouble(raw.get(6)));
  173 + result.add(raw.get(7).equals("\\N") ? null : raw.get(7));
  174 + }
  175 +
  176 + private List<Object> toValuesTs(List<String> raw) {
  177 +
  178 + logLinesMigrated(linesTsMigrated++);
  179 +
  180 + List<Object> result = new ArrayList<>();
  181 +
  182 + addTypeIdKey(result, raw);
  183 + addPartitions(result, raw);
  184 + addValues(result, raw);
  185 +
  186 + processPartitions(result);
  187 +
  188 + return result;
  189 + }
  190 +
  191 + private List<Object> toValuesLatest(List<String> raw) {
  192 + logLinesMigrated(linesLatestMigrated++);
  193 + List<Object> result = new ArrayList<>();
  194 +
  195 + addTypeIdKey(result, raw);
  196 + addTimeseries(result, raw);
  197 + addValues(result, raw);
  198 +
  199 + return result;
  200 + }
  201 +
  202 + private long toPartitionTs(long ts) {
  203 + LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
  204 + return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).toInstant(ZoneOffset.UTC).toEpochMilli();
  205 + }
  206 +
  207 + private void processPartitions(List<Object> values) {
  208 + String key = values.get(0) + "|" + values.get(1) + "|" + values.get(2) + "|" + values.get(3);
  209 + partitions.add(key);
  210 + }
  211 +
  212 + private void processBlock(LineIterator iterator, CQLSSTableWriter writer, File outDir, Function<List<String>, List<Object>> function) {
  213 + String currentLine;
  214 + long linesProcessed = 0;
  215 + while(iterator.hasNext()) {
  216 + logLinesProcessed(linesProcessed++);
  217 + currentLine = iterator.nextLine();
  218 + if(isBlockFinished(currentLine)) {
  219 + return;
  220 + }
  221 +
  222 + try {
  223 + List<String> raw = Arrays.stream(currentLine.trim().split("\t"))
  224 + .map(String::trim)
  225 + .collect(Collectors.toList());
  226 + List<Object> values = function.apply(raw);
  227 +
  228 + if (this.currentWriterCount == 0) {
  229 + System.out.println(new Date() + " close writer " + new Date());
  230 + writer.close();
  231 + writer = WriterBuilder.getLatestWriter(outDir);
  232 + }
  233 +
  234 + if (this.castStringIfPossible) {
  235 + writer.addRow(castToNumericIfPossible(values));
  236 + } else {
  237 + writer.addRow(values);
  238 + }
  239 +
  240 + currentWriterCount++;
  241 + if (currentWriterCount >= rowPerFile) {
  242 + currentWriterCount = 0;
  243 + }
  244 + } catch (Exception ex) {
  245 + System.out.println(ex.getMessage() + " -> " + currentLine);
  246 + }
  247 + }
  248 + }
  249 +
  250 + private List<Object> castToNumericIfPossible(List<Object> values) {
  251 + try {
  252 + if (values.get(6) != null && NumberUtils.isNumber(values.get(6).toString())) {
  253 + Double casted = NumberUtils.createDouble(values.get(6).toString());
  254 + List<Object> numeric = Lists.newArrayList();
  255 + numeric.addAll(values);
  256 + numeric.set(6, null);
  257 + numeric.set(8, casted);
  258 + castedOk++;
  259 + return numeric;
  260 + }
  261 + } catch (Throwable th) {
  262 + castErrors++;
  263 + }
  264 +
  265 + processPartitions(values);
  266 +
  267 + return values;
  268 + }
  269 +
  270 + private boolean isBlockFinished(String line) {
  271 + return StringUtils.isBlank(line) || line.equals("\\.");
  272 + }
  273 +
  274 + private boolean isBlockTsStarted(String line) {
  275 + return line.startsWith("COPY public.ts_kv (");
  276 + }
  277 +
  278 + private boolean isBlockLatestStarted(String line) {
  279 + return line.startsWith("COPY public.ts_kv_latest (");
  280 + }
  281 +
  282 +}
... ...
1   -/**
2   - * Copyright © 2016-2021 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.client.tools.migrator;
17   -
18   -import com.google.common.collect.Lists;
19   -import org.apache.cassandra.io.sstable.CQLSSTableWriter;
20   -import org.apache.commons.io.FileUtils;
21   -import org.apache.commons.io.LineIterator;
22   -import org.apache.commons.lang3.StringUtils;
23   -import org.apache.commons.lang3.math.NumberUtils;
24   -
25   -import java.io.File;
26   -import java.io.IOException;
27   -import java.time.Instant;
28   -import java.time.LocalDateTime;
29   -import java.time.ZoneOffset;
30   -import java.time.temporal.ChronoUnit;
31   -import java.util.ArrayList;
32   -import java.util.Arrays;
33   -import java.util.Date;
34   -import java.util.HashSet;
35   -import java.util.List;
36   -import java.util.Set;
37   -import java.util.UUID;
38   -import java.util.stream.Collectors;
39   -
40   -public class PostgresToCassandraTelemetryMigrator {
41   -
42   - private static final long LOG_BATCH = 1000000;
43   - private static final long rowPerFile = 1000000;
44   -
45   -
46   - private static long linesProcessed = 0;
47   - private static long linesMigrated = 0;
48   - private static long castErrors = 0;
49   - private static long castedOk = 0;
50   -
51   - private static long currentWriterCount = 1;
52   - private static CQLSSTableWriter currentTsWriter = null;
53   - private static CQLSSTableWriter currentPartitionWriter = null;
54   -
55   - private static Set<String> partitions = new HashSet<>();
56   -
57   -
58   - public static void migrateTs(File sourceFile, File outTsDir, File outPartitionDir, boolean castStringsIfPossible) throws IOException {
59   - long startTs = System.currentTimeMillis();
60   - long stepLineTs = System.currentTimeMillis();
61   - long stepOkLineTs = System.currentTimeMillis();
62   - LineIterator iterator = FileUtils.lineIterator(sourceFile);
63   - currentTsWriter = WriterBuilder.getTsWriter(outTsDir);
64   - currentPartitionWriter = WriterBuilder.getPartitionWriter(outPartitionDir);
65   -
66   - boolean isBlockStarted = false;
67   - boolean isBlockFinished = false;
68   -
69   - String line;
70   - while (iterator.hasNext()) {
71   - if (linesProcessed++ % LOG_BATCH == 0) {
72   - System.out.println(new Date() + " linesProcessed = " + linesProcessed + " in " + (System.currentTimeMillis() - stepLineTs) + " castOk " + castedOk + " castErr " + castErrors);
73   - stepLineTs = System.currentTimeMillis();
74   - }
75   -
76   - line = iterator.nextLine();
77   -
78   - if (isBlockFinished) {
79   - break;
80   - }
81   -
82   - if (!isBlockStarted) {
83   - if (isBlockStarted(line)) {
84   - System.out.println();
85   - System.out.println();
86   - System.out.println(line);
87   - System.out.println();
88   - System.out.println();
89   - isBlockStarted = true;
90   - }
91   - continue;
92   - }
93   -
94   - if (isBlockFinished(line)) {
95   - isBlockFinished = true;
96   - } else {
97   - try {
98   - List<String> raw = Arrays.stream(line.trim().split("\t"))
99   - .map(String::trim)
100   - .filter(StringUtils::isNotEmpty)
101   - .collect(Collectors.toList());
102   - List<Object> values = toValues(raw);
103   -
104   - if (currentWriterCount == 0) {
105   - System.out.println(new Date() + " close writer " + new Date());
106   - currentTsWriter.close();
107   - currentTsWriter = WriterBuilder.getTsWriter(outTsDir);
108   - }
109   -
110   - if (castStringsIfPossible) {
111   - currentTsWriter.addRow(castToNumericIfPossible(values));
112   - } else {
113   - currentTsWriter.addRow(values);
114   - }
115   - processPartitions(values);
116   - currentWriterCount++;
117   - if (currentWriterCount >= rowPerFile) {
118   - currentWriterCount = 0;
119   - }
120   -
121   - if (linesMigrated++ % LOG_BATCH == 0) {
122   - System.out.println(new Date() + " migrated = " + linesMigrated + " in " + (System.currentTimeMillis() - stepOkLineTs) + " partitions = " + partitions.size());
123   - stepOkLineTs = System.currentTimeMillis();
124   - }
125   - } catch (Exception ex) {
126   - System.out.println(ex.getMessage() + " -> " + line);
127   - }
128   -
129   - }
130   - }
131   -
132   - long endTs = System.currentTimeMillis();
133   - System.out.println();
134   - System.out.println(new Date() + " Migrated rows " + linesMigrated + " in " + (endTs - startTs));
135   - System.out.println("Partitions collected " + partitions.size());
136   -
137   - startTs = System.currentTimeMillis();
138   - for (String partition : partitions) {
139   - String[] split = partition.split("\\|");
140   - List<Object> values = Lists.newArrayList();
141   - values.add(split[0]);
142   - values.add(UUID.fromString(split[1]));
143   - values.add(split[2]);
144   - values.add(Long.parseLong(split[3]));
145   - currentPartitionWriter.addRow(values);
146   - }
147   - currentPartitionWriter.close();
148   - endTs = System.currentTimeMillis();
149   - System.out.println();
150   - System.out.println();
151   - System.out.println(new Date() + " Migrated partitions " + partitions.size() + " in " + (endTs - startTs));
152   -
153   -
154   - currentTsWriter.close();
155   - System.out.println();
156   - System.out.println("Finished migrate Telemetry");
157   - }
158   -
159   - private static List<Object> castToNumericIfPossible(List<Object> values) {
160   - try {
161   - if (values.get(6) != null && NumberUtils.isNumber(values.get(6).toString())) {
162   - Double casted = NumberUtils.createDouble(values.get(6).toString());
163   - List<Object> numeric = Lists.newArrayList();
164   - numeric.addAll(values);
165   - numeric.set(6, null);
166   - numeric.set(8, casted);
167   - castedOk++;
168   - return numeric;
169   - }
170   - } catch (Throwable th) {
171   - castErrors++;
172   - }
173   - return values;
174   - }
175   -
176   - private static void processPartitions(List<Object> values) {
177   - String key = values.get(0) + "|" + values.get(1) + "|" + values.get(2) + "|" + values.get(3);
178   - partitions.add(key);
179   - }
180   -
181   - private static List<Object> toValues(List<String> raw) {
182   - //expected Table structure:
183   -// COPY public.ts_kv (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) FROM stdin;
184   -
185   -
186   - List<Object> result = new ArrayList<>();
187   - result.add(raw.get(0));
188   - result.add(fromString(raw.get(1)));
189   - result.add(raw.get(2));
190   -
191   - long ts = Long.parseLong(raw.get(3));
192   - long partition = toPartitionTs(ts);
193   - result.add(partition);
194   - result.add(ts);
195   -
196   - result.add(raw.get(4).equals("\\N") ? null : raw.get(4).equals("t") ? Boolean.TRUE : Boolean.FALSE);
197   - result.add(raw.get(5).equals("\\N") ? null : raw.get(5));
198   - result.add(raw.get(6).equals("\\N") ? null : Long.parseLong(raw.get(6)));
199   - result.add(raw.get(7).equals("\\N") ? null : Double.parseDouble(raw.get(7)));
200   - return result;
201   - }
202   -
203   - public static UUID fromString(String src) {
204   - return UUID.fromString(src.substring(7, 15) + "-" + src.substring(3, 7) + "-1"
205   - + src.substring(0, 3) + "-" + src.substring(15, 19) + "-" + src.substring(19));
206   - }
207   -
208   - private static long toPartitionTs(long ts) {
209   - LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
210   - return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).toInstant(ZoneOffset.UTC).toEpochMilli();
211   -// return TsPartitionDate.MONTHS.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();
212   - }
213   -
214   - private static boolean isBlockStarted(String line) {
215   - return line.startsWith("COPY public.ts_kv");
216   - }
217   -
218   - private static boolean isBlockFinished(String line) {
219   - return StringUtils.isBlank(line) || line.equals("\\.");
220   - }
221   -
222   -}
... ... @@ -21,37 +21,41 @@ It will generate single jar file with all required dependencies inside `target d
21 21 #### Dump data from the source Postgres Database
22 22 *Do not use compression if possible because Tool can only work with uncompressed file
23 23
24   -1. Dump table `ts_kv` table:
25   -
26   - `pg_dump -h localhost -U postgres -d thingsboard -t ts_kv > ts_kv.dmp`
27   -
28   -2. Dump table `ts_kv_latest` table:
29   -
30   - `pg_dump -h localhost -U postgres -d thingsboard -t ts_kv_latest > ts_kv_latest.dmp`
  24 +1. Dump related tables that need to correct save telemetry
  25 +
  26 + `pg_dump -h localhost -U postgres -d thingsboard -T admin_settings -T attribute_kv -T audit_log -T component_discriptor -T device_credentials -T event -T oauth2_client_registration -T oauth2_client_registration_info -T oauth2_client_registration_template -T relation -T rule_node_state tb_schema_settings -T user_credentials > related_entities.dmp`
  27 +
  28 +2. Dump `ts_kv` and child:
  29 +
  30 + `pg_dump -h localhost -U postgres -d thingsboard --load-via-partition-root --data-only -t ts_kv* > ts_kv_all.dmp`
31 31
32   -3. [Optional] move table dumps to the instance where cassandra will be hosted
  32 +3. [Optional] Move table dumps to the instance where cassandra will be hosted
33 33
34 34 #### Prepare directory structure for SSTables
35 35 Tool use 3 different directories for saving SSTables - `ts_kv_cf`, `ts_kv_latest_cf`, `ts_kv_partitions_cf`
36 36
37 37 Create 3 empty directories. For example:
38 38
39   - /home/ubunut/migration/ts
40   - /home/ubunut/migration/ts_latest
41   - /home/ubunut/migration/ts_partition
  39 + /home/user/migration/ts
  40 + /home/user/migration/ts_latest
  41 + /home/user/migration/ts_partition
42 42
43 43 #### Run tool
44   -*Note: if you run this tool on remote instance - don't forget to execute this command in `screen` to avoid unexpected termination
  44 +
  45 +**If you want to migrate just `ts_kv` without `ts_kv_latest` or vice versa don't use arguments (paths) for output files*
  46 +
  47 +**Note: if you run this tool on remote instance - don't forget to execute this command in `screen` to avoid unexpected termination*
45 48
46 49 ```
47   -java -jar ./tools-2.4.1-SNAPSHOT-jar-with-dependencies.jar
48   - -latestFrom ./source/ts_kv_latest.dmp
49   - -latestOut /home/ubunut/migration/ts_latest
50   - -tsFrom ./source/ts_kv.dmp
51   - -tsOut /home/ubunut/migration/ts
52   - -partitionsOut /home/ubunut/migration/ts_partition
53   - -castEnable false
  50 +java -jar ./tools-3.2.2-SNAPSHOT-jar-with-dependencies.jar
  51 + -telemetryFrom /home/user/dump/ts_kv_all.dmp
  52 + -relatedEntities /home/user/dump/related_entities.dmp
  53 + -latestOut /home/user/migration/ts_latest
  54 + -tsOut /home/user/migration/ts
  55 + -partitionsOut /home/user/migration/ts_partition
  56 + -castEnable false
54 57 ```
  58 +*Use your paths for program arguments*
55 59
56 60 Tool execution time depends on DB size, CPU resources and Disk throughput
57 61
... ... @@ -59,22 +63,23 @@ Tool execution time depends on DB size, CPU resources and Disk throughput
59 63 * Note that this this part works only for single node Cassandra Cluster. If you have more nodes - it is better to use `sstableloader` tool.
60 64
61 65 1. [Optional] install Cassandra on the instance
62   -2. [Optional] Using `cqlsh` create `thingsboard` keyspace and requred tables from this file `schema-ts.cql`
  66 +2. [Optional] Using `cqlsh` create `thingsboard` keyspace and requred tables from this files `schema-ts.cql` and `schema-ts-latest.cql` using `source` command
63 67 3. Stop Cassandra
64   -4. Copy generated SSTable files into cassandra data dir:
  68 +4. Look at `/var/lib/cassandra/data/thingsboard` and check for names of data folders
  69 +5. Copy generated SSTable files into cassandra data dir using next command:
65 70
66 71 ```
67   - sudo find /home/ubunut/migration/ts -name '*.*' -exec mv {} /var/lib/cassandra/data/thingsboard/ts_kv_cf-0e9aaf00ee5511e9a5fa7d6f489ffd13/ \;
68   - sudo find /home/ubunut/migration/ts_latest -name '*.*' -exec mv {} /var/lib/cassandra/data/thingsboard/ts_kv_latest_cf-161449d0ee5511e9a5fa7d6f489ffd13/ \;
69   - sudo find /home/ubunut/migration/ts_partition -name '*.*' -exec mv {} /var/lib/cassandra/data/thingsboard/ts_kv_partitions_cf-12e8fa80ee5511e9a5fa7d6f489ffd13/ \;
  72 + sudo find /home/user/migration/ts -name '*.*' -exec mv {} /var/lib/cassandra/data/thingsboard/ts_kv_cf-0e9aaf00ee5511e9a5fa7d6f489ffd13/ \;
  73 + sudo find /home/user/migration/ts_latest -name '*.*' -exec mv {} /var/lib/cassandra/data/thingsboard/ts_kv_latest_cf-161449d0ee5511e9a5fa7d6f489ffd13/ \;
  74 + sudo find /home/user/migration/ts_partition -name '*.*' -exec mv {} /var/lib/cassandra/data/thingsboard/ts_kv_partitions_cf-12e8fa80ee5511e9a5fa7d6f489ffd13/ \;
70 75 ```
71   -
72   -5. Start Cassandra service and trigger compaction
  76 + *Pay attention! Data folders have similar name `ts_kv_cf-0e9aaf00ee5511e9a5fa7d6f489ffd13`, but you have to use own*
  77 +6. Start Cassandra service and trigger compaction
  78 +
  79 + Trigger compactions: `nodetool compact thingsboard`
  80 +
  81 + Check compaction status: `nodetool compactionstats`
73 82
74   -```
75   - trigger compactions: nodetool compact thingsboard
76   - check compaction status: nodetool compactionstats
77   -```
78 83
79 84 ## Switch Thignsboard into Hybrid Mode
80 85
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.client.tools.migrator;
  17 +
  18 +import org.apache.commons.io.FileUtils;
  19 +import org.apache.commons.io.LineIterator;
  20 +import org.apache.commons.lang3.StringUtils;
  21 +import org.thingsboard.server.common.data.EntityType;
  22 +
  23 +import java.io.File;
  24 +import java.io.IOException;
  25 +import java.util.HashMap;
  26 +import java.util.Map;
  27 +
  28 +public class RelatedEntitiesParser {
  29 + private final Map<String, String> allEntityIdsAndTypes = new HashMap<>();
  30 +
  31 + private final Map<String, EntityType> tableNameAndEntityType = Map.ofEntries(
  32 + Map.entry("COPY public.alarm ", EntityType.ALARM),
  33 + Map.entry("COPY public.asset ", EntityType.ASSET),
  34 + Map.entry("COPY public.customer ", EntityType.CUSTOMER),
  35 + Map.entry("COPY public.dashboard ", EntityType.DASHBOARD),
  36 + Map.entry("COPY public.device ", EntityType.DEVICE),
  37 + Map.entry("COPY public.rule_chain ", EntityType.RULE_CHAIN),
  38 + Map.entry("COPY public.rule_node ", EntityType.RULE_NODE),
  39 + Map.entry("COPY public.tenant ", EntityType.TENANT),
  40 + Map.entry("COPY public.tb_user ", EntityType.USER),
  41 + Map.entry("COPY public.entity_view ", EntityType.ENTITY_VIEW),
  42 + Map.entry("COPY public.widgets_bundle ", EntityType.WIDGETS_BUNDLE),
  43 + Map.entry("COPY public.widget_type ", EntityType.WIDGET_TYPE),
  44 + Map.entry("COPY public.tenant_profile ", EntityType.TENANT_PROFILE),
  45 + Map.entry("COPY public.device_profile ", EntityType.DEVICE_PROFILE),
  46 + Map.entry("COPY public.api_usage_state ", EntityType.API_USAGE_STATE)
  47 + );
  48 +
  49 + public RelatedEntitiesParser(File source) throws IOException {
  50 + processAllTables(FileUtils.lineIterator(source));
  51 + }
  52 +
  53 + public String getEntityType(String uuid) {
  54 + return this.allEntityIdsAndTypes.get(uuid);
  55 + }
  56 +
  57 + private boolean isBlockFinished(String line) {
  58 + return StringUtils.isBlank(line) || line.equals("\\.");
  59 + }
  60 +
  61 + private void processAllTables(LineIterator lineIterator) {
  62 + String currentLine;
  63 + try {
  64 + while (lineIterator.hasNext()) {
  65 + currentLine = lineIterator.nextLine();
  66 + for(Map.Entry<String, EntityType> entry : tableNameAndEntityType.entrySet()) {
  67 + if(currentLine.startsWith(entry.getKey())) {
  68 + processBlock(lineIterator, entry.getValue());
  69 + }
  70 + }
  71 + }
  72 + } finally {
  73 + lineIterator.close();
  74 + }
  75 + }
  76 +
  77 + private void processBlock(LineIterator lineIterator, EntityType entityType) {
  78 + String currentLine;
  79 + while(lineIterator.hasNext()) {
  80 + currentLine = lineIterator.nextLine();
  81 + if(isBlockFinished(currentLine)) {
  82 + return;
  83 + }
  84 + allEntityIdsAndTypes.put(currentLine.split("\t")[0], entityType.name());
  85 + }
  86 + }
  87 +}
... ...
... ... @@ -31,6 +31,7 @@ public class WriterBuilder {
31 31 " str_v text,\n" +
32 32 " long_v bigint,\n" +
33 33 " dbl_v double,\n" +
  34 + " json_v text,\n" +
34 35 " PRIMARY KEY (( entity_type, entity_id, key, partition ), ts)\n" +
35 36 ");";
36 37
... ... @@ -43,6 +44,7 @@ public class WriterBuilder {
43 44 " str_v text,\n" +
44 45 " long_v bigint,\n" +
45 46 " dbl_v double,\n" +
  47 + " json_v text,\n" +
46 48 " PRIMARY KEY (( entity_type, entity_id ), key)\n" +
47 49 ") WITH compaction = { 'class' : 'LeveledCompactionStrategy' };";
48 50
... ... @@ -59,8 +61,8 @@ public class WriterBuilder {
59 61 return CQLSSTableWriter.builder()
60 62 .inDirectory(dir)
61 63 .forTable(tsSchema)
62   - .using("INSERT INTO thingsboard.ts_kv_cf (entity_type, entity_id, key, partition, ts, bool_v, str_v, long_v, dbl_v) " +
63   - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)")
  64 + .using("INSERT INTO thingsboard.ts_kv_cf (entity_type, entity_id, key, partition, ts, bool_v, str_v, long_v, dbl_v, json_v) " +
  65 + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
64 66 .build();
65 67 }
66 68
... ... @@ -68,8 +70,8 @@ public class WriterBuilder {
68 70 return CQLSSTableWriter.builder()
69 71 .inDirectory(dir)
70 72 .forTable(latestSchema)
71   - .using("INSERT INTO thingsboard.ts_kv_latest_cf (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " +
72   - "VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
  73 + .using("INSERT INTO thingsboard.ts_kv_latest_cf (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) " +
  74 + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)")
73 75 .build();
74 76 }
75 77
... ...