Commit 5dc3d075aac4dd371d872b109a27ad66a12b2c60

Authored by Viacheslav Klimov
1 parent e33a336e

Type cast of timeseries and attributes for bulk import

... ... @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
19 19 import com.fasterxml.jackson.databind.node.TextNode;
20 20 import org.springframework.stereotype.Service;
21 21 import org.thingsboard.common.util.JacksonUtil;
  22 +import org.thingsboard.server.cluster.TbClusterService;
22 23 import org.thingsboard.server.common.data.asset.Asset;
23 24 import org.thingsboard.server.dao.asset.AssetService;
24 25 import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
... ... @@ -27,7 +28,6 @@ import org.thingsboard.server.service.action.EntityActionService;
27 28 import org.thingsboard.server.service.importing.AbstractBulkImportService;
28 29 import org.thingsboard.server.service.importing.BulkImportRequest;
29 30 import org.thingsboard.server.service.importing.ImportedEntityInfo;
30   -import org.thingsboard.server.service.queue.TbClusterService;
31 31 import org.thingsboard.server.service.security.AccessValidator;
32 32 import org.thingsboard.server.service.security.model.SecurityUser;
33 33 import org.thingsboard.server.service.security.permission.AccessControlService;
... ... @@ -36,7 +36,6 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
36 36 import java.util.Map;
37 37 import java.util.Optional;
38 38
39   -
40 39 @Service
41 40 @TbCoreComponent
42 41 public class AssetBulkImportService extends AbstractBulkImportService<Asset> {
... ...
... ... @@ -22,6 +22,7 @@ import lombok.SneakyThrows;
22 22 import org.apache.commons.collections.CollectionUtils;
23 23 import org.springframework.stereotype.Service;
24 24 import org.thingsboard.common.util.JacksonUtil;
  25 +import org.thingsboard.server.cluster.TbClusterService;
25 26 import org.thingsboard.server.common.data.Device;
26 27 import org.thingsboard.server.common.data.DeviceProfile;
27 28 import org.thingsboard.server.common.data.DeviceProfileProvisionType;
... ... @@ -48,7 +49,6 @@ import org.thingsboard.server.service.importing.AbstractBulkImportService;
48 49 import org.thingsboard.server.service.importing.BulkImportColumnType;
49 50 import org.thingsboard.server.service.importing.BulkImportRequest;
50 51 import org.thingsboard.server.service.importing.ImportedEntityInfo;
51   -import org.thingsboard.server.service.queue.TbClusterService;
52 52 import org.thingsboard.server.service.security.AccessValidator;
53 53 import org.thingsboard.server.service.security.model.SecurityUser;
54 54 import org.thingsboard.server.service.security.permission.AccessControlService;
... ...
... ... @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
19 19 import com.fasterxml.jackson.databind.node.TextNode;
20 20 import org.springframework.stereotype.Service;
21 21 import org.thingsboard.common.util.JacksonUtil;
  22 +import org.thingsboard.server.cluster.TbClusterService;
22 23 import org.thingsboard.server.common.data.edge.Edge;
23 24 import org.thingsboard.server.dao.edge.EdgeService;
24 25 import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
... ... @@ -27,7 +28,6 @@ import org.thingsboard.server.service.action.EntityActionService;
27 28 import org.thingsboard.server.service.importing.AbstractBulkImportService;
28 29 import org.thingsboard.server.service.importing.BulkImportRequest;
29 30 import org.thingsboard.server.service.importing.ImportedEntityInfo;
30   -import org.thingsboard.server.service.queue.TbClusterService;
31 31 import org.thingsboard.server.service.security.AccessValidator;
32 32 import org.thingsboard.server.service.security.model.SecurityUser;
33 33 import org.thingsboard.server.service.security.permission.AccessControlService;
... ...
... ... @@ -21,6 +21,7 @@ import com.google.gson.JsonPrimitive;
21 21 import lombok.RequiredArgsConstructor;
22 22 import lombok.SneakyThrows;
23 23 import org.apache.commons.lang3.StringUtils;
  24 +import org.thingsboard.server.cluster.TbClusterService;
24 25 import org.thingsboard.server.common.data.BaseData;
25 26 import org.thingsboard.server.common.data.TenantProfile;
26 27 import org.thingsboard.server.common.data.audit.ActionType;
... ... @@ -35,7 +36,6 @@ import org.thingsboard.server.controller.BaseController;
35 36 import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
36 37 import org.thingsboard.server.service.action.EntityActionService;
37 38 import org.thingsboard.server.service.importing.BulkImportRequest.ColumnMapping;
38   -import org.thingsboard.server.service.queue.TbClusterService;
39 39 import org.thingsboard.server.service.security.AccessValidator;
40 40 import org.thingsboard.server.service.security.model.SecurityUser;
41 41 import org.thingsboard.server.service.security.permission.AccessControlService;
... ... @@ -127,7 +127,8 @@ public abstract class AbstractBulkImportService<E extends BaseData<? extends Ent
127 127
128 128 @SneakyThrows
129 129 private void saveTelemetry(SecurityUser user, E entity, Map.Entry<BulkImportColumnType, JsonObject> kvsEntry) {
130   - List<TsKvEntry> timeseries = JsonConverter.convertToTelemetry(kvsEntry.getValue(), System.currentTimeMillis()).entrySet().stream()
  130 + List<TsKvEntry> timeseries = JsonConverter.convertToTelemetry(kvsEntry.getValue(), System.currentTimeMillis(), false, true)
  131 + .entrySet().stream()
131 132 .flatMap(entry -> entry.getValue().stream().map(kvEntry -> new BasicTsKvEntry(entry.getKey(), kvEntry)))
132 133 .collect(Collectors.toList());
133 134
... ... @@ -154,7 +155,7 @@ public abstract class AbstractBulkImportService<E extends BaseData<? extends Ent
154 155 @SneakyThrows
155 156 private void saveAttributes(SecurityUser user, E entity, Map.Entry<BulkImportColumnType, JsonObject> kvsEntry, BulkImportColumnType kvType) {
156 157 String scope = kvType.getKey();
157   - List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(kvsEntry.getValue()));
  158 + List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(kvsEntry.getValue(), true));
158 159
159 160 accessValidator.validateEntityAndCallback(user, Operation.WRITE_ATTRIBUTES, entity.getId(), (result, tenantId, entityId) -> {
160 161 tsSubscriptionService.saveAndNotify(tenantId, entityId, scope, attributes, new FutureCallback<>() {
... ...
... ... @@ -76,7 +76,7 @@ public class JsonConverter {
76 76
77 77 public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement, long ts) throws JsonSyntaxException {
78 78 PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder();
79   - convertToTelemetry(jsonElement, ts, null, builder);
  79 + convertToTelemetry(jsonElement, ts, null, builder, isTypeCastEnabled);
80 80 return builder.build();
81 81 }
82 82
... ... @@ -84,13 +84,13 @@ public class JsonConverter {
84 84 return convertToTelemetryProto(jsonElement, System.currentTimeMillis());
85 85 }
86 86
87   - private static void convertToTelemetry(JsonElement jsonElement, long systemTs, Map<Long, List<KvEntry>> result, PostTelemetryMsg.Builder builder) {
  87 + private static void convertToTelemetry(JsonElement jsonElement, long systemTs, Map<Long, List<KvEntry>> result, PostTelemetryMsg.Builder builder, boolean typeCastEnabled) {
88 88 if (jsonElement.isJsonObject()) {
89   - parseObject(systemTs, result, builder, jsonElement.getAsJsonObject());
  89 + parseObject(systemTs, result, builder, jsonElement.getAsJsonObject(), typeCastEnabled);
90 90 } else if (jsonElement.isJsonArray()) {
91 91 jsonElement.getAsJsonArray().forEach(je -> {
92 92 if (je.isJsonObject()) {
93   - parseObject(systemTs, result, builder, je.getAsJsonObject());
  93 + parseObject(systemTs, result, builder, je.getAsJsonObject(), typeCastEnabled);
94 94 } else {
95 95 throw new JsonSyntaxException(CAN_T_PARSE_VALUE + je);
96 96 }
... ... @@ -100,11 +100,11 @@ public class JsonConverter {
100 100 }
101 101 }
102 102
103   - private static void parseObject(long systemTs, Map<Long, List<KvEntry>> result, PostTelemetryMsg.Builder builder, JsonObject jo) {
  103 + private static void parseObject(long systemTs, Map<Long, List<KvEntry>> result, PostTelemetryMsg.Builder builder, JsonObject jo, boolean typeCastEnabled) {
104 104 if (result != null) {
105   - parseObject(result, systemTs, jo);
  105 + parseObject(result, systemTs, jo, typeCastEnabled);
106 106 } else {
107   - parseObject(builder, systemTs, jo);
  107 + parseObject(builder, systemTs, jo, typeCastEnabled);
108 108 }
109 109 }
110 110
... ... @@ -146,7 +146,7 @@ public class JsonConverter {
146 146 public static PostAttributeMsg convertToAttributesProto(JsonElement jsonObject) throws JsonSyntaxException {
147 147 if (jsonObject.isJsonObject()) {
148 148 PostAttributeMsg.Builder result = PostAttributeMsg.newBuilder();
149   - List<KeyValueProto> keyValueList = parseProtoValues(jsonObject.getAsJsonObject());
  149 + List<KeyValueProto> keyValueList = parseProtoValues(jsonObject.getAsJsonObject(), isTypeCastEnabled);
150 150 result.addAllKv(keyValueList);
151 151 return result.build();
152 152 } else {
... ... @@ -164,29 +164,29 @@ public class JsonConverter {
164 164 return result;
165 165 }
166 166
167   - private static void parseObject(PostTelemetryMsg.Builder builder, long systemTs, JsonObject jo) {
  167 + private static void parseObject(PostTelemetryMsg.Builder builder, long systemTs, JsonObject jo, boolean typeCastEnabled) {
168 168 if (jo.has("ts") && jo.has("values")) {
169   - parseWithTs(builder, jo);
  169 + parseWithTs(builder, jo, typeCastEnabled);
170 170 } else {
171   - parseWithoutTs(builder, systemTs, jo);
  171 + parseWithoutTs(builder, systemTs, jo, typeCastEnabled);
172 172 }
173 173 }
174 174
175   - private static void parseWithoutTs(PostTelemetryMsg.Builder request, long systemTs, JsonObject jo) {
  175 + private static void parseWithoutTs(PostTelemetryMsg.Builder request, long systemTs, JsonObject jo, boolean typeCastEnabled) {
176 176 TsKvListProto.Builder builder = TsKvListProto.newBuilder();
177 177 builder.setTs(systemTs);
178   - builder.addAllKv(parseProtoValues(jo));
  178 + builder.addAllKv(parseProtoValues(jo, typeCastEnabled));
179 179 request.addTsKvList(builder.build());
180 180 }
181 181
182   - private static void parseWithTs(PostTelemetryMsg.Builder request, JsonObject jo) {
  182 + private static void parseWithTs(PostTelemetryMsg.Builder request, JsonObject jo, boolean typeCastEnabled) {
183 183 TsKvListProto.Builder builder = TsKvListProto.newBuilder();
184 184 builder.setTs(jo.get("ts").getAsLong());
185   - builder.addAllKv(parseProtoValues(jo.get("values").getAsJsonObject()));
  185 + builder.addAllKv(parseProtoValues(jo.get("values").getAsJsonObject(), typeCastEnabled));
186 186 request.addTsKvList(builder.build());
187 187 }
188 188
189   - private static List<KeyValueProto> parseProtoValues(JsonObject valuesObject) {
  189 + private static List<KeyValueProto> parseProtoValues(JsonObject valuesObject, boolean typeCastEnabled) {
190 190 List<KeyValueProto> result = new ArrayList<>();
191 191 for (Entry<String, JsonElement> valueEntry : valuesObject.entrySet()) {
192 192 JsonElement element = valueEntry.getValue();
... ... @@ -197,9 +197,9 @@ public class JsonConverter {
197 197 String message = String.format("String value length [%d] for key [%s] is greater than maximum allowed [%d]", value.getAsString().length(), valueEntry.getKey(), maxStringValueLength);
198 198 throw new JsonSyntaxException(message);
199 199 }
200   - if (isTypeCastEnabled && NumberUtils.isParsable(value.getAsString())) {
  200 + if (typeCastEnabled && isNumber(value)) {
201 201 try {
202   - result.add(buildNumericKeyValueProto(value, valueEntry.getKey()));
  202 + result.add(buildNumericKeyValueProto(value, valueEntry.getKey(), typeCastEnabled));
203 203 } catch (RuntimeException th) {
204 204 result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.STRING_V)
205 205 .setStringV(value.getAsString()).build());
... ... @@ -212,7 +212,7 @@ public class JsonConverter {
212 212 result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.BOOLEAN_V)
213 213 .setBoolV(value.getAsBoolean()).build());
214 214 } else if (value.isNumber()) {
215   - result.add(buildNumericKeyValueProto(value, valueEntry.getKey()));
  215 + result.add(buildNumericKeyValueProto(value, valueEntry.getKey(), typeCastEnabled));
216 216 } else if (!value.isJsonNull()) {
217 217 throw new JsonSyntaxException(CAN_T_PARSE_VALUE + value);
218 218 }
... ... @@ -225,15 +225,15 @@ public class JsonConverter {
225 225 return result;
226 226 }
227 227
228   - private static KeyValueProto buildNumericKeyValueProto(JsonPrimitive value, String key) {
229   - String valueAsString = value.getAsString();
  228 + private static KeyValueProto buildNumericKeyValueProto(JsonPrimitive value, String key, boolean typeCastEnabled) {
  229 + String valueAsString = value.getAsString().replace(',', '.');
230 230 KeyValueProto.Builder builder = KeyValueProto.newBuilder().setKey(key);
231 231 var bd = new BigDecimal(valueAsString);
232 232 if (bd.stripTrailingZeros().scale() <= 0 && !isSimpleDouble(valueAsString)) {
233 233 try {
234 234 return builder.setType(KeyValueType.LONG_V).setLongV(bd.longValueExact()).build();
235 235 } catch (ArithmeticException e) {
236   - if (isTypeCastEnabled) {
  236 + if (typeCastEnabled) {
237 237 return builder.setType(KeyValueType.STRING_V).setStringV(bd.toPlainString()).build();
238 238 } else {
239 239 throw new JsonSyntaxException("Big integer values are not supported!");
... ... @@ -242,7 +242,7 @@ public class JsonConverter {
242 242 } else {
243 243 if (bd.scale() <= 16) {
244 244 return builder.setType(KeyValueType.DOUBLE_V).setDoubleV(bd.doubleValue()).build();
245   - } else if (isTypeCastEnabled) {
  245 + } else if (typeCastEnabled) {
246 246 return builder.setType(KeyValueType.STRING_V).setStringV(bd.toPlainString()).build();
247 247 } else {
248 248 throw new JsonSyntaxException("Big integer values are not supported!");
... ... @@ -260,15 +260,15 @@ public class JsonConverter {
260 260 return TransportProtos.ToServerRpcRequestMsg.newBuilder().setRequestId(requestId).setMethodName(object.get("method").getAsString()).setParams(GSON.toJson(object.get("params"))).build();
261 261 }
262 262
263   - private static void parseNumericValue(List<KvEntry> result, Entry<String, JsonElement> valueEntry, JsonPrimitive value) {
264   - String valueAsString = value.getAsString();
  263 + private static void parseNumericValue(List<KvEntry> result, Entry<String, JsonElement> valueEntry, JsonPrimitive value, boolean typeCastEnabled) {
  264 + String valueAsString = value.getAsString().replace(',', '.');
265 265 String key = valueEntry.getKey();
266 266 var bd = new BigDecimal(valueAsString);
267 267 if (bd.stripTrailingZeros().scale() <= 0 && !isSimpleDouble(valueAsString)) {
268 268 try {
269 269 result.add(new LongDataEntry(key, bd.longValueExact()));
270 270 } catch (ArithmeticException e) {
271   - if (isTypeCastEnabled) {
  271 + if (typeCastEnabled) {
272 272 result.add(new StringDataEntry(key, bd.toPlainString()));
273 273 } else {
274 274 throw new JsonSyntaxException("Big integer values are not supported!");
... ... @@ -277,7 +277,7 @@ public class JsonConverter {
277 277 } else {
278 278 if (bd.scale() <= 16) {
279 279 result.add(new DoubleDataEntry(key, bd.doubleValue()));
280   - } else if (isTypeCastEnabled) {
  280 + } else if (typeCastEnabled) {
281 281 result.add(new StringDataEntry(key, bd.toPlainString()));
282 282 } else {
283 283 throw new JsonSyntaxException("Big integer values are not supported!");
... ... @@ -487,13 +487,17 @@ public class JsonConverter {
487 487 }
488 488
489 489 public static Set<AttributeKvEntry> convertToAttributes(JsonElement element) {
  490 + return convertToAttributes(element, isTypeCastEnabled);
  491 + }
  492 +
  493 + public static Set<AttributeKvEntry> convertToAttributes(JsonElement element, boolean typeCastEnabled) {
490 494 Set<AttributeKvEntry> result = new HashSet<>();
491 495 long ts = System.currentTimeMillis();
492   - result.addAll(parseValues(element.getAsJsonObject()).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
  496 + result.addAll(parseValues(element.getAsJsonObject(), typeCastEnabled).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
493 497 return result;
494 498 }
495 499
496   - private static List<KvEntry> parseValues(JsonObject valuesObject) {
  500 + private static List<KvEntry> parseValues(JsonObject valuesObject, boolean typeCastEnabled) {
497 501 List<KvEntry> result = new ArrayList<>();
498 502 for (Entry<String, JsonElement> valueEntry : valuesObject.entrySet()) {
499 503 JsonElement element = valueEntry.getValue();
... ... @@ -504,9 +508,9 @@ public class JsonConverter {
504 508 String message = String.format("String value length [%d] for key [%s] is greater than maximum allowed [%d]", value.getAsString().length(), valueEntry.getKey(), maxStringValueLength);
505 509 throw new JsonSyntaxException(message);
506 510 }
507   - if (isTypeCastEnabled && NumberUtils.isParsable(value.getAsString())) {
  511 + if (typeCastEnabled && isNumber(value)) {
508 512 try {
509   - parseNumericValue(result, valueEntry, value);
  513 + parseNumericValue(result, valueEntry, value, typeCastEnabled);
510 514 } catch (RuntimeException th) {
511 515 result.add(new StringDataEntry(valueEntry.getKey(), value.getAsString()));
512 516 }
... ... @@ -516,7 +520,7 @@ public class JsonConverter {
516 520 } else if (value.isBoolean()) {
517 521 result.add(new BooleanDataEntry(valueEntry.getKey(), value.getAsBoolean()));
518 522 } else if (value.isNumber()) {
519   - parseNumericValue(result, valueEntry, value);
  523 + parseNumericValue(result, valueEntry, value, typeCastEnabled);
520 524 } else {
521 525 throw new JsonSyntaxException(CAN_T_PARSE_VALUE + value);
522 526 }
... ... @@ -541,30 +545,35 @@ public class JsonConverter {
541 545
542 546 public static Map<Long, List<KvEntry>> convertToTelemetry(JsonElement jsonElement, long systemTs, boolean sorted) throws
543 547 JsonSyntaxException {
  548 + return convertToTelemetry(jsonElement, systemTs, sorted, isTypeCastEnabled);
  549 + }
  550 +
  551 + public static Map<Long, List<KvEntry>> convertToTelemetry(JsonElement jsonElement, long systemTs, boolean sorted, boolean typeCastEnabled) throws
  552 + JsonSyntaxException {
544 553 Map<Long, List<KvEntry>> result = sorted ? new TreeMap<>() : new HashMap<>();
545   - convertToTelemetry(jsonElement, systemTs, result, null);
  554 + convertToTelemetry(jsonElement, systemTs, result, null, typeCastEnabled);
546 555 return result;
547 556 }
548 557
549 558
550   - private static void parseObject(Map<Long, List<KvEntry>> result, long systemTs, JsonObject jo) {
  559 + private static void parseObject(Map<Long, List<KvEntry>> result, long systemTs, JsonObject jo, boolean typeCastEnabled) {
551 560 if (jo.has("ts") && jo.has("values")) {
552   - parseWithTs(result, jo);
  561 + parseWithTs(result, jo, typeCastEnabled);
553 562 } else {
554   - parseWithoutTs(result, systemTs, jo);
  563 + parseWithoutTs(result, systemTs, jo, typeCastEnabled);
555 564 }
556 565 }
557 566
558   - private static void parseWithoutTs(Map<Long, List<KvEntry>> result, long systemTs, JsonObject jo) {
559   - for (KvEntry entry : parseValues(jo)) {
  567 + private static void parseWithoutTs(Map<Long, List<KvEntry>> result, long systemTs, JsonObject jo, boolean typeCastEnabled) {
  568 + for (KvEntry entry : parseValues(jo, typeCastEnabled)) {
560 569 result.computeIfAbsent(systemTs, tmp -> new ArrayList<>()).add(entry);
561 570 }
562 571 }
563 572
564   - public static void parseWithTs(Map<Long, List<KvEntry>> result, JsonObject jo) {
  573 + public static void parseWithTs(Map<Long, List<KvEntry>> result, JsonObject jo, boolean typeCastEnabled) {
565 574 long ts = jo.get("ts").getAsLong();
566 575 JsonObject valuesObject = jo.get("values").getAsJsonObject();
567   - for (KvEntry entry : parseValues(valuesObject)) {
  576 + for (KvEntry entry : parseValues(valuesObject, typeCastEnabled)) {
568 577 result.computeIfAbsent(ts, tmp -> new ArrayList<>()).add(entry);
569 578 }
570 579 }
... ... @@ -634,6 +643,10 @@ public class JsonConverter {
634 643 }
635 644
636 645
  646 + private static boolean isNumber(JsonPrimitive value) {
  647 + return NumberUtils.isParsable(value.getAsString().replace(',', '.'));
  648 + }
  649 +
637 650 private static String getStrValue(JsonObject jo, String field, boolean requiredField) {
638 651 if (jo.has(field)) {
639 652 return jo.get(field).getAsString();
... ...
... ... @@ -66,6 +66,12 @@ public class JsonConverterTest {
66 66 }
67 67
68 68 @Test
  69 + public void testParseAsDoubleWithCommaDecimalSeparatorAndTypeCast() {
  70 + var result = JsonConverter.convertToTelemetry(JSON_PARSER.parse("{\"meterReadingDelta\": \"-1,1\"}"), 0L);
  71 + Assert.assertEquals(-1.1, result.get(0L).get(0).getDoubleValue().get(), 0.0);
  72 + }
  73 +
  74 + @Test
69 75 public void testParseAsLong() {
70 76 var result = JsonConverter.convertToTelemetry(JSON_PARSER.parse("{\"meterReadingDelta\": 11}"), 0L);
71 77 Assert.assertEquals(11L, result.get(0L).get(0).getLongValue().get().longValue());
... ...