Commit 03f5375a02acb3bcca7039e872165768529c47db
Committed by
GitHub
1 parent
aa0fce86
JSON support (#2415)
* Created JsonDataEntry and added DataType JSON * Added json to ts and attributes, created sql schema-entities-hsql.sql (json_v varchar) * refactored * refactored * added json array support * Aggregation improvement * Changed in JsonDataEntry value type from JsonNode to String * fix AggregatePartitionsFunction Co-authored-by: Yevhen Bondarenko <56396344+YevhenBondarenko@users.noreply.github.com>
Showing
56 changed files
with
713 additions
and
324 deletions
... | ... | @@ -567,6 +567,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
567 | 567 | case STRING_V: |
568 | 568 | json.addProperty(kv.getKey(), kv.getStringV()); |
569 | 569 | break; |
570 | + case JSON_V: | |
571 | + json.add(kv.getKey(), jsonParser.parse(kv.getJsonV())); | |
572 | + break; | |
570 | 573 | } |
571 | 574 | } |
572 | 575 | return json; |
... | ... | @@ -643,6 +646,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
643 | 646 | builder.setType(KeyValueType.STRING_V); |
644 | 647 | builder.setStringV(kvEntry.getStrValue().get()); |
645 | 648 | break; |
649 | + case JSON: | |
650 | + builder.setType(KeyValueType.JSON_V); | |
651 | + builder.setJsonV(kvEntry.getJsonValue().get()); | |
652 | + break; | |
646 | 653 | } |
647 | 654 | return builder.build(); |
648 | 655 | } | ... | ... |
... | ... | @@ -642,6 +642,8 @@ public abstract class BaseController { |
642 | 642 | entityNode.put(attr.getKey(), attr.getDoubleValue().get()); |
643 | 643 | } else if (attr.getDataType() == DataType.LONG) { |
644 | 644 | entityNode.put(attr.getKey(), attr.getLongValue().get()); |
645 | + } else if (attr.getDataType() == DataType.JSON) { | |
646 | + entityNode.set(attr.getKey(), json.readTree(attr.getJsonValue().get())); | |
645 | 647 | } else { |
646 | 648 | entityNode.put(attr.getKey(), attr.getValueAsString()); |
647 | 649 | } | ... | ... |
... | ... | @@ -15,12 +15,15 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.controller; |
17 | 17 | |
18 | +import com.fasterxml.jackson.core.JsonProcessingException; | |
18 | 19 | import com.fasterxml.jackson.databind.JsonNode; |
20 | +import com.fasterxml.jackson.databind.ObjectMapper; | |
19 | 21 | import com.google.common.base.Function; |
20 | 22 | import com.google.common.util.concurrent.FutureCallback; |
21 | 23 | import com.google.common.util.concurrent.Futures; |
22 | 24 | import com.google.common.util.concurrent.ListenableFuture; |
23 | 25 | import com.google.gson.JsonElement; |
26 | +import com.google.gson.JsonParseException; | |
24 | 27 | import com.google.gson.JsonParser; |
25 | 28 | import lombok.extern.slf4j.Slf4j; |
26 | 29 | import org.springframework.beans.factory.annotation.Autowired; |
... | ... | @@ -56,8 +59,10 @@ import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery; |
56 | 59 | import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; |
57 | 60 | import org.thingsboard.server.common.data.kv.BasicTsKvEntry; |
58 | 61 | import org.thingsboard.server.common.data.kv.BooleanDataEntry; |
62 | +import org.thingsboard.server.common.data.kv.DataType; | |
59 | 63 | import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; |
60 | 64 | import org.thingsboard.server.common.data.kv.DoubleDataEntry; |
65 | +import org.thingsboard.server.common.data.kv.JsonDataEntry; | |
61 | 66 | import org.thingsboard.server.common.data.kv.KvEntry; |
62 | 67 | import org.thingsboard.server.common.data.kv.LongDataEntry; |
63 | 68 | import org.thingsboard.server.common.data.kv.ReadTsKvQuery; |
... | ... | @@ -77,6 +82,7 @@ import org.thingsboard.server.service.telemetry.exception.UncheckedApiException; |
77 | 82 | import javax.annotation.Nullable; |
78 | 83 | import javax.annotation.PostConstruct; |
79 | 84 | import javax.annotation.PreDestroy; |
85 | +import java.io.IOException; | |
80 | 86 | import java.util.ArrayList; |
81 | 87 | import java.util.Arrays; |
82 | 88 | import java.util.HashSet; |
... | ... | @@ -107,6 +113,8 @@ public class TelemetryController extends BaseController { |
107 | 113 | |
108 | 114 | private ExecutorService executor; |
109 | 115 | |
116 | + private static final ObjectMapper mapper = new ObjectMapper(); | |
117 | + | |
110 | 118 | @PostConstruct |
111 | 119 | public void initExecutor() { |
112 | 120 | executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("telemetry-controller")); |
... | ... | @@ -284,8 +292,7 @@ public class TelemetryController extends BaseController { |
284 | 292 | if (startTs == null || endTs == null) { |
285 | 293 | deleteToTs = endTs; |
286 | 294 | return getImmediateDeferredResult("When deleteAllDataForKeys is false, start and end timestamp values shouldn't be empty", HttpStatus.BAD_REQUEST); |
287 | - } | |
288 | - else{ | |
295 | + } else { | |
289 | 296 | deleteFromTs = startTs; |
290 | 297 | deleteToTs = endTs; |
291 | 298 | } |
... | ... | @@ -536,8 +543,9 @@ public class TelemetryController extends BaseController { |
536 | 543 | return new FutureCallback<List<AttributeKvEntry>>() { |
537 | 544 | @Override |
538 | 545 | public void onSuccess(List<AttributeKvEntry> attributes) { |
539 | - List<AttributeData> values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(), | |
540 | - attribute.getKey(), attribute.getValue())).collect(Collectors.toList()); | |
546 | + List<AttributeData> values = attributes.stream().map(attribute -> | |
547 | + new AttributeData(attribute.getLastUpdateTs(), attribute.getKey(), getKvValue(attribute)) | |
548 | + ).collect(Collectors.toList()); | |
541 | 549 | logAttributesRead(user, entityId, scope, keyList, null); |
542 | 550 | response.setResult(new ResponseEntity<>(values, HttpStatus.OK)); |
543 | 551 | } |
... | ... | @@ -639,7 +647,9 @@ public class TelemetryController extends BaseController { |
639 | 647 | jsonNode.fields().forEachRemaining(entry -> { |
640 | 648 | String key = entry.getKey(); |
641 | 649 | JsonNode value = entry.getValue(); |
642 | - if (entry.getValue().isTextual()) { | |
650 | + if (entry.getValue().isObject() || entry.getValue().isArray()) { | |
651 | + attributes.add(new BaseAttributeKvEntry(new JsonDataEntry(key, toJsonStr(value)), ts)); | |
652 | + } else if (entry.getValue().isTextual()) { | |
643 | 653 | if (maxStringValueLength > 0 && entry.getValue().textValue().length() > maxStringValueLength) { |
644 | 654 | String message = String.format("String value length [%d] for key [%s] is greater than maximum allowed [%d]", entry.getValue().textValue().length(), key, maxStringValueLength); |
645 | 655 | throw new UncheckedApiException(new InvalidParametersException(message)); |
... | ... | @@ -659,4 +669,27 @@ public class TelemetryController extends BaseController { |
659 | 669 | }); |
660 | 670 | return attributes; |
661 | 671 | } |
672 | + | |
673 | + private String toJsonStr(JsonNode value) { | |
674 | + try { | |
675 | + return mapper.writeValueAsString(value); | |
676 | + } catch (JsonProcessingException e) { | |
677 | + throw new JsonParseException("Can't parse jsonValue: " + value, e); | |
678 | + } | |
679 | + } | |
680 | + | |
681 | + private JsonNode toJsonNode(String value) { | |
682 | + try { | |
683 | + return mapper.readTree(value); | |
684 | + } catch (IOException e) { | |
685 | + throw new JsonParseException("Can't parse jsonValue: " + value, e); | |
686 | + } | |
687 | + } | |
688 | + | |
689 | + private Object getKvValue(KvEntry entry) { | |
690 | + if (entry.getDataType() == DataType.JSON) { | |
691 | + return toJsonNode(entry.getJsonValue().get()); | |
692 | + } | |
693 | + return entry.getValue(); | |
694 | + } | |
662 | 695 | } | ... | ... |
... | ... | @@ -24,9 +24,9 @@ import org.springframework.beans.factory.annotation.Autowired; |
24 | 24 | import org.springframework.context.annotation.Lazy; |
25 | 25 | import org.springframework.stereotype.Service; |
26 | 26 | import org.springframework.util.StringUtils; |
27 | +import org.thingsboard.common.util.DonAsynchron; | |
27 | 28 | import org.thingsboard.common.util.ThingsBoardThreadFactory; |
28 | 29 | import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; |
29 | -import org.thingsboard.common.util.DonAsynchron; | |
30 | 30 | import org.thingsboard.server.actors.service.ActorService; |
31 | 31 | import org.thingsboard.server.common.data.DataConstants; |
32 | 32 | import org.thingsboard.server.common.data.EntityType; |
... | ... | @@ -36,7 +36,20 @@ import org.thingsboard.server.common.data.id.EntityId; |
36 | 36 | import org.thingsboard.server.common.data.id.EntityIdFactory; |
37 | 37 | import org.thingsboard.server.common.data.id.EntityViewId; |
38 | 38 | import org.thingsboard.server.common.data.id.TenantId; |
39 | -import org.thingsboard.server.common.data.kv.*; | |
39 | +import org.thingsboard.server.common.data.kv.Aggregation; | |
40 | +import org.thingsboard.server.common.data.kv.AttributeKvEntry; | |
41 | +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; | |
42 | +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; | |
43 | +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; | |
44 | +import org.thingsboard.server.common.data.kv.BooleanDataEntry; | |
45 | +import org.thingsboard.server.common.data.kv.DataType; | |
46 | +import org.thingsboard.server.common.data.kv.DoubleDataEntry; | |
47 | +import org.thingsboard.server.common.data.kv.JsonDataEntry; | |
48 | +import org.thingsboard.server.common.data.kv.KvEntry; | |
49 | +import org.thingsboard.server.common.data.kv.LongDataEntry; | |
50 | +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; | |
51 | +import org.thingsboard.server.common.data.kv.StringDataEntry; | |
52 | +import org.thingsboard.server.common.data.kv.TsKvEntry; | |
40 | 53 | import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; |
41 | 54 | import org.thingsboard.server.common.msg.cluster.ServerAddress; |
42 | 55 | import org.thingsboard.server.dao.attributes.AttributesService; |
... | ... | @@ -105,7 +118,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio |
105 | 118 | @Autowired |
106 | 119 | @Lazy |
107 | 120 | private ActorService actorService; |
108 | - | |
121 | + | |
109 | 122 | private ExecutorService tsCallBackExecutor; |
110 | 123 | private ExecutorService wsCallBackExecutor; |
111 | 124 | |
... | ... | @@ -692,6 +705,10 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio |
692 | 705 | Optional<Double> doubleValue = attr.getDoubleValue(); |
693 | 706 | doubleValue.ifPresent(dataBuilder::setDoubleValue); |
694 | 707 | break; |
708 | + case JSON: | |
709 | + Optional<String> jsonValue = attr.getJsonValue(); | |
710 | + jsonValue.ifPresent(dataBuilder::setJsonValue); | |
711 | + break; | |
695 | 712 | case STRING: |
696 | 713 | Optional<String> stringValue = attr.getStrValue(); |
697 | 714 | stringValue.ifPresent(dataBuilder::setStrValue); |
... | ... | @@ -724,6 +741,9 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio |
724 | 741 | case STRING: |
725 | 742 | entry = new StringDataEntry(proto.getKey(), proto.getStrValue()); |
726 | 743 | break; |
744 | + case JSON: | |
745 | + entry = new JsonDataEntry(proto.getKey(), proto.getJsonValue()); | |
746 | + break; | |
727 | 747 | } |
728 | 748 | return entry; |
729 | 749 | } | ... | ... |
... | ... | @@ -30,7 +30,7 @@ public class ControllerSqlTestSuite { |
30 | 30 | |
31 | 31 | @ClassRule |
32 | 32 | public static CustomSqlUnit sqlUnit = new CustomSqlUnit( |
33 | - Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql"), | |
33 | + Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql"), | |
34 | 34 | "sql/drop-all-tables.sql", |
35 | 35 | "sql-test.properties"); |
36 | 36 | } | ... | ... |
... | ... | @@ -29,7 +29,7 @@ public class MqttSqlTestSuite { |
29 | 29 | |
30 | 30 | @ClassRule |
31 | 31 | public static CustomSqlUnit sqlUnit = new CustomSqlUnit( |
32 | - Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities.sql", "sql/system-data.sql"), | |
32 | + Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"), | |
33 | 33 | "sql/drop-all-tables.sql", |
34 | 34 | "sql-test.properties"); |
35 | 35 | } | ... | ... |
... | ... | @@ -30,7 +30,7 @@ public class RuleEngineSqlTestSuite { |
30 | 30 | |
31 | 31 | @ClassRule |
32 | 32 | public static CustomSqlUnit sqlUnit = new CustomSqlUnit( |
33 | - Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities.sql", "sql/system-data.sql"), | |
33 | + Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"), | |
34 | 34 | "sql/drop-all-tables.sql", |
35 | 35 | "sql-test.properties"); |
36 | 36 | } | ... | ... |
... | ... | @@ -31,7 +31,7 @@ public class SystemSqlTestSuite { |
31 | 31 | |
32 | 32 | @ClassRule |
33 | 33 | public static CustomSqlUnit sqlUnit = new CustomSqlUnit( |
34 | - Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities.sql", "sql/system-data.sql"), | |
34 | + Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"), | |
35 | 35 | "sql/drop-all-tables.sql", |
36 | 36 | "sql-test.properties"); |
37 | 37 | ... | ... |
... | ... | @@ -35,6 +35,7 @@ import java.util.function.Consumer; |
35 | 35 | @Slf4j |
36 | 36 | public abstract class SearchTextBasedWithAdditionalInfo<I extends UUIDBased> extends SearchTextBased<I> implements HasAdditionalInfo { |
37 | 37 | |
38 | + private static final ObjectMapper mapper = new ObjectMapper(); | |
38 | 39 | private transient JsonNode additionalInfo; |
39 | 40 | @JsonIgnore |
40 | 41 | private byte[] additionalInfoBytes; |
... | ... | @@ -97,7 +98,7 @@ public abstract class SearchTextBasedWithAdditionalInfo<I extends UUIDBased> ext |
97 | 98 | public static void setJson(JsonNode json, Consumer<JsonNode> jsonConsumer, Consumer<byte[]> bytesConsumer) { |
98 | 99 | jsonConsumer.accept(json); |
99 | 100 | try { |
100 | - bytesConsumer.accept(new ObjectMapper().writeValueAsBytes(json)); | |
101 | + bytesConsumer.accept(mapper.writeValueAsBytes(json)); | |
101 | 102 | } catch (JsonProcessingException e) { |
102 | 103 | log.warn("Can't serialize json data: ", e); |
103 | 104 | } | ... | ... |
... | ... | @@ -15,6 +15,8 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.common.data.kv; |
17 | 17 | |
18 | +import com.fasterxml.jackson.databind.JsonNode; | |
19 | + | |
18 | 20 | import java.util.Optional; |
19 | 21 | |
20 | 22 | /** |
... | ... | @@ -66,6 +68,11 @@ public class BaseAttributeKvEntry implements AttributeKvEntry { |
66 | 68 | } |
67 | 69 | |
68 | 70 | @Override |
71 | + public Optional<String> getJsonValue() { | |
72 | + return kv.getJsonValue(); | |
73 | + } | |
74 | + | |
75 | + @Override | |
69 | 76 | public String getValueAsString() { |
70 | 77 | return kv.getValueAsString(); |
71 | 78 | } | ... | ... |
... | ... | @@ -52,6 +52,11 @@ public abstract class BasicKvEntry implements KvEntry { |
52 | 52 | } |
53 | 53 | |
54 | 54 | @Override |
55 | + public Optional<String> getJsonValue() { | |
56 | + return Optional.ofNullable(null); | |
57 | + } | |
58 | + | |
59 | + @Override | |
55 | 60 | public boolean equals(Object o) { |
56 | 61 | if (this == o) return true; |
57 | 62 | if (!(o instanceof BasicKvEntry)) return false; | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.data.kv; | |
17 | + | |
18 | +import java.util.Objects; | |
19 | +import java.util.Optional; | |
20 | + | |
21 | +public class JsonDataEntry extends BasicKvEntry { | |
22 | + private final String value; | |
23 | + | |
24 | + public JsonDataEntry(String key, String value) { | |
25 | + super(key); | |
26 | + this.value = value; | |
27 | + } | |
28 | + | |
29 | + @Override | |
30 | + public DataType getDataType() { | |
31 | + return DataType.JSON; | |
32 | + } | |
33 | + | |
34 | + @Override | |
35 | + public Optional<String> getJsonValue() { | |
36 | + return Optional.ofNullable(value); | |
37 | + } | |
38 | + | |
39 | + @Override | |
40 | + public boolean equals(Object o) { | |
41 | + if (this == o) return true; | |
42 | + if (!(o instanceof JsonDataEntry)) return false; | |
43 | + if (!super.equals(o)) return false; | |
44 | + JsonDataEntry that = (JsonDataEntry) o; | |
45 | + return Objects.equals(value, that.value); | |
46 | + } | |
47 | + | |
48 | + @Override | |
49 | + public Object getValue() { | |
50 | + return value; | |
51 | + } | |
52 | + | |
53 | + @Override | |
54 | + public int hashCode() { | |
55 | + return Objects.hash(super.hashCode(), value); | |
56 | + } | |
57 | + | |
58 | + @Override | |
59 | + public String toString() { | |
60 | + return "JsonDataEntry{" + | |
61 | + "value=" + value + | |
62 | + "} " + super.toString(); | |
63 | + } | |
64 | + | |
65 | + @Override | |
66 | + public String getValueAsString() { | |
67 | + return value; | |
68 | + } | |
69 | +} | ... | ... |
... | ... | @@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
31 | 31 | import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; |
32 | 32 | import org.thingsboard.server.common.data.kv.BooleanDataEntry; |
33 | 33 | import org.thingsboard.server.common.data.kv.DoubleDataEntry; |
34 | +import org.thingsboard.server.common.data.kv.JsonDataEntry; | |
34 | 35 | import org.thingsboard.server.common.data.kv.KvEntry; |
35 | 36 | import org.thingsboard.server.common.data.kv.LongDataEntry; |
36 | 37 | import org.thingsboard.server.common.data.kv.StringDataEntry; |
... | ... | @@ -59,6 +60,7 @@ import java.util.stream.Collectors; |
59 | 60 | public class JsonConverter { |
60 | 61 | |
61 | 62 | private static final Gson GSON = new Gson(); |
63 | + private static final JsonParser JSON_PARSER = new JsonParser(); | |
62 | 64 | private static final String CAN_T_PARSE_VALUE = "Can't parse value: "; |
63 | 65 | private static final String DEVICE_PROPERTY = "device"; |
64 | 66 | |
... | ... | @@ -204,6 +206,14 @@ public class JsonConverter { |
204 | 206 | } else if (!value.isJsonNull()) { |
205 | 207 | throw new JsonSyntaxException(CAN_T_PARSE_VALUE + value); |
206 | 208 | } |
209 | + } else if (element.isJsonObject() || element.isJsonArray()) { | |
210 | + result.add(KeyValueProto | |
211 | + .newBuilder() | |
212 | + .setKey(valueEntry | |
213 | + .getKey()) | |
214 | + .setType(KeyValueType.JSON_V) | |
215 | + .setJsonV(element.toString()) | |
216 | + .build()); | |
207 | 217 | } else if (!element.isJsonNull()) { |
208 | 218 | throw new JsonSyntaxException(CAN_T_PARSE_VALUE + element); |
209 | 219 | } |
... | ... | @@ -354,6 +364,9 @@ public class JsonConverter { |
354 | 364 | case LONG_V: |
355 | 365 | json.addProperty(name, entry.getLongV()); |
356 | 366 | break; |
367 | + case JSON_V: | |
368 | + json.add(name, JSON_PARSER.parse(entry.getJsonV())); | |
369 | + break; | |
357 | 370 | } |
358 | 371 | } |
359 | 372 | |
... | ... | @@ -363,47 +376,48 @@ public class JsonConverter { |
363 | 376 | |
364 | 377 | private static Consumer<TsKvProto> addToObjectFromProto(JsonObject result) { |
365 | 378 | return de -> { |
366 | - JsonPrimitive value; | |
367 | 379 | switch (de.getKv().getType()) { |
368 | 380 | case BOOLEAN_V: |
369 | - value = new JsonPrimitive(de.getKv().getBoolV()); | |
381 | + result.add(de.getKv().getKey(), new JsonPrimitive(de.getKv().getBoolV())); | |
370 | 382 | break; |
371 | 383 | case DOUBLE_V: |
372 | - value = new JsonPrimitive(de.getKv().getDoubleV()); | |
384 | + result.add(de.getKv().getKey(), new JsonPrimitive(de.getKv().getDoubleV())); | |
373 | 385 | break; |
374 | 386 | case LONG_V: |
375 | - value = new JsonPrimitive(de.getKv().getLongV()); | |
387 | + result.add(de.getKv().getKey(), new JsonPrimitive(de.getKv().getLongV())); | |
376 | 388 | break; |
377 | 389 | case STRING_V: |
378 | - value = new JsonPrimitive(de.getKv().getStringV()); | |
390 | + result.add(de.getKv().getKey(), new JsonPrimitive(de.getKv().getStringV())); | |
379 | 391 | break; |
392 | + case JSON_V: | |
393 | + result.add(de.getKv().getKey(), JSON_PARSER.parse(de.getKv().getJsonV())); | |
380 | 394 | default: |
381 | 395 | throw new IllegalArgumentException("Unsupported data type: " + de.getKv().getType()); |
382 | 396 | } |
383 | - result.add(de.getKv().getKey(), value); | |
384 | 397 | }; |
385 | 398 | } |
386 | 399 | |
387 | 400 | private static Consumer<AttributeKvEntry> addToObject(JsonObject result) { |
388 | 401 | return de -> { |
389 | - JsonPrimitive value; | |
390 | 402 | switch (de.getDataType()) { |
391 | 403 | case BOOLEAN: |
392 | - value = new JsonPrimitive(de.getBooleanValue().get()); | |
404 | + result.add(de.getKey(), new JsonPrimitive(de.getBooleanValue().get())); | |
393 | 405 | break; |
394 | 406 | case DOUBLE: |
395 | - value = new JsonPrimitive(de.getDoubleValue().get()); | |
407 | + result.add(de.getKey(), new JsonPrimitive(de.getDoubleValue().get())); | |
396 | 408 | break; |
397 | 409 | case LONG: |
398 | - value = new JsonPrimitive(de.getLongValue().get()); | |
410 | + result.add(de.getKey(), new JsonPrimitive(de.getLongValue().get())); | |
399 | 411 | break; |
400 | 412 | case STRING: |
401 | - value = new JsonPrimitive(de.getStrValue().get()); | |
413 | + result.add(de.getKey(), new JsonPrimitive(de.getStrValue().get())); | |
414 | + break; | |
415 | + case JSON: | |
416 | + result.add(de.getKey(), JSON_PARSER.parse(de.getJsonValue().get())); | |
402 | 417 | break; |
403 | 418 | default: |
404 | 419 | throw new IllegalArgumentException("Unsupported data type: " + de.getDataType()); |
405 | 420 | } |
406 | - result.add(de.getKey(), value); | |
407 | 421 | }; |
408 | 422 | } |
409 | 423 | |
... | ... | @@ -464,6 +478,8 @@ public class JsonConverter { |
464 | 478 | } else { |
465 | 479 | throw new JsonSyntaxException(CAN_T_PARSE_VALUE + value); |
466 | 480 | } |
481 | + } else if (element.isJsonObject() || element.isJsonArray()) { | |
482 | + result.add(new JsonDataEntry(valueEntry.getKey(), element.toString())); | |
467 | 483 | } else { |
468 | 484 | throw new JsonSyntaxException(CAN_T_PARSE_VALUE + element); |
469 | 485 | } | ... | ... |
... | ... | @@ -47,6 +47,7 @@ enum KeyValueType { |
47 | 47 | LONG_V = 1; |
48 | 48 | DOUBLE_V = 2; |
49 | 49 | STRING_V = 3; |
50 | + JSON_V = 4; | |
50 | 51 | } |
51 | 52 | |
52 | 53 | message KeyValueProto { |
... | ... | @@ -56,6 +57,7 @@ message KeyValueProto { |
56 | 57 | int64 long_v = 4; |
57 | 58 | double double_v = 5; |
58 | 59 | string string_v = 6; |
60 | + string json_v = 7; | |
59 | 61 | } |
60 | 62 | |
61 | 63 | message TsKvProto { | ... | ... |
... | ... | @@ -112,31 +112,18 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem |
112 | 112 | |
113 | 113 | @Override |
114 | 114 | public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, String attributeType, AttributeKvEntry attribute) { |
115 | - BoundStatement stmt = getSaveStmt().bind(); | |
116 | - stmt.setString(0, entityId.getEntityType().name()); | |
117 | - stmt.setUUID(1, entityId.getId()); | |
118 | - stmt.setString(2, attributeType); | |
119 | - stmt.setString(3, attribute.getKey()); | |
120 | - stmt.setLong(4, attribute.getLastUpdateTs()); | |
121 | - stmt.setString(5, attribute.getStrValue().orElse(null)); | |
122 | - Optional<Boolean> booleanValue = attribute.getBooleanValue(); | |
123 | - if (booleanValue.isPresent()) { | |
124 | - stmt.setBool(6, booleanValue.get()); | |
125 | - } else { | |
126 | - stmt.setToNull(6); | |
127 | - } | |
128 | - Optional<Long> longValue = attribute.getLongValue(); | |
129 | - if (longValue.isPresent()) { | |
130 | - stmt.setLong(7, longValue.get()); | |
131 | - } else { | |
132 | - stmt.setToNull(7); | |
133 | - } | |
134 | - Optional<Double> doubleValue = attribute.getDoubleValue(); | |
135 | - if (doubleValue.isPresent()) { | |
136 | - stmt.setDouble(8, doubleValue.get()); | |
137 | - } else { | |
138 | - stmt.setToNull(8); | |
139 | - } | |
115 | + BoundStatement stmt = getSaveStmt().bind() | |
116 | + .setString(0, entityId.getEntityType().name()) | |
117 | + .setUUID(1, entityId.getId()) | |
118 | + .setString(2, attributeType) | |
119 | + .setString(3, attribute.getKey()) | |
120 | + .setLong(4, attribute.getLastUpdateTs()) | |
121 | + .set(5, attribute.getStrValue().orElse(null), String.class) | |
122 | + .set(6, attribute.getBooleanValue().orElse(null), Boolean.class) | |
123 | + .set(7, attribute.getLongValue().orElse(null), Long.class) | |
124 | + .set(8, attribute.getDoubleValue().orElse(null), Double.class) | |
125 | + .set(9, attribute.getJsonValue().orElse(null), String.class); | |
126 | + | |
140 | 127 | log.trace("Generated save stmt [{}] for entityId {} and attributeType {} and attribute", stmt, entityId, attributeType, attribute); |
141 | 128 | return getFuture(executeAsyncWrite(tenantId, stmt), rs -> null); |
142 | 129 | } |
... | ... | @@ -172,8 +159,9 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem |
172 | 159 | "," + ModelConstants.BOOLEAN_VALUE_COLUMN + |
173 | 160 | "," + ModelConstants.LONG_VALUE_COLUMN + |
174 | 161 | "," + ModelConstants.DOUBLE_VALUE_COLUMN + |
162 | + "," + ModelConstants.JSON_VALUE_COLUMN + | |
175 | 163 | ")" + |
176 | - " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)"); | |
164 | + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"); | |
177 | 165 | } |
178 | 166 | return saveStmt; |
179 | 167 | } | ... | ... |
... | ... | @@ -369,17 +369,18 @@ public class ModelConstants { |
369 | 369 | public static final String STRING_VALUE_COLUMN = "str_v"; |
370 | 370 | public static final String LONG_VALUE_COLUMN = "long_v"; |
371 | 371 | public static final String DOUBLE_VALUE_COLUMN = "dbl_v"; |
372 | + public static final String JSON_VALUE_COLUMN = "json_v"; | |
372 | 373 | |
373 | - protected static final String[] NONE_AGGREGATION_COLUMNS = new String[]{LONG_VALUE_COLUMN, DOUBLE_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, STRING_VALUE_COLUMN, KEY_COLUMN, TS_COLUMN}; | |
374 | + protected static final String[] NONE_AGGREGATION_COLUMNS = new String[]{LONG_VALUE_COLUMN, DOUBLE_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, STRING_VALUE_COLUMN, JSON_VALUE_COLUMN, KEY_COLUMN, TS_COLUMN}; | |
374 | 375 | |
375 | - protected static final String[] COUNT_AGGREGATION_COLUMNS = new String[]{count(LONG_VALUE_COLUMN), count(DOUBLE_VALUE_COLUMN), count(BOOLEAN_VALUE_COLUMN), count(STRING_VALUE_COLUMN)}; | |
376 | + protected static final String[] COUNT_AGGREGATION_COLUMNS = new String[]{count(LONG_VALUE_COLUMN), count(DOUBLE_VALUE_COLUMN), count(BOOLEAN_VALUE_COLUMN), count(STRING_VALUE_COLUMN), count(JSON_VALUE_COLUMN)}; | |
376 | 377 | |
377 | - protected static final String[] MIN_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, | |
378 | - new String[]{min(LONG_VALUE_COLUMN), min(DOUBLE_VALUE_COLUMN), min(BOOLEAN_VALUE_COLUMN), min(STRING_VALUE_COLUMN)}); | |
379 | - protected static final String[] MAX_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, | |
380 | - new String[]{max(LONG_VALUE_COLUMN), max(DOUBLE_VALUE_COLUMN), max(BOOLEAN_VALUE_COLUMN), max(STRING_VALUE_COLUMN)}); | |
381 | - protected static final String[] SUM_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, | |
382 | - new String[]{sum(LONG_VALUE_COLUMN), sum(DOUBLE_VALUE_COLUMN)}); | |
378 | + protected static final String[] MIN_AGGREGATION_COLUMNS = | |
379 | + ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, new String[]{min(LONG_VALUE_COLUMN), min(DOUBLE_VALUE_COLUMN), min(BOOLEAN_VALUE_COLUMN), min(STRING_VALUE_COLUMN), min(JSON_VALUE_COLUMN)}); | |
380 | + protected static final String[] MAX_AGGREGATION_COLUMNS = | |
381 | + ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, new String[]{max(LONG_VALUE_COLUMN), max(DOUBLE_VALUE_COLUMN), max(BOOLEAN_VALUE_COLUMN), max(STRING_VALUE_COLUMN), max(JSON_VALUE_COLUMN)}); | |
382 | + protected static final String[] SUM_AGGREGATION_COLUMNS = | |
383 | + ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, new String[]{sum(LONG_VALUE_COLUMN), sum(DOUBLE_VALUE_COLUMN)}); | |
383 | 384 | protected static final String[] AVG_AGGREGATION_COLUMNS = SUM_AGGREGATION_COLUMNS; |
384 | 385 | |
385 | 386 | public static String min(String s) { | ... | ... |
... | ... | @@ -19,6 +19,7 @@ import lombok.Data; |
19 | 19 | import org.thingsboard.server.common.data.kv.BasicTsKvEntry; |
20 | 20 | import org.thingsboard.server.common.data.kv.BooleanDataEntry; |
21 | 21 | import org.thingsboard.server.common.data.kv.DoubleDataEntry; |
22 | +import org.thingsboard.server.common.data.kv.JsonDataEntry; | |
22 | 23 | import org.thingsboard.server.common.data.kv.KvEntry; |
23 | 24 | import org.thingsboard.server.common.data.kv.LongDataEntry; |
24 | 25 | import org.thingsboard.server.common.data.kv.StringDataEntry; |
... | ... | @@ -29,12 +30,12 @@ import javax.persistence.Column; |
29 | 30 | import javax.persistence.Id; |
30 | 31 | import javax.persistence.MappedSuperclass; |
31 | 32 | import javax.persistence.Transient; |
32 | - | |
33 | 33 | import java.util.UUID; |
34 | 34 | |
35 | 35 | import static org.thingsboard.server.dao.model.ModelConstants.BOOLEAN_VALUE_COLUMN; |
36 | 36 | import static org.thingsboard.server.dao.model.ModelConstants.DOUBLE_VALUE_COLUMN; |
37 | 37 | import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN; |
38 | +import static org.thingsboard.server.dao.model.ModelConstants.JSON_VALUE_COLUMN; | |
38 | 39 | import static org.thingsboard.server.dao.model.ModelConstants.LONG_VALUE_COLUMN; |
39 | 40 | import static org.thingsboard.server.dao.model.ModelConstants.STRING_VALUE_COLUMN; |
40 | 41 | import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN; |
... | ... | @@ -68,6 +69,9 @@ public abstract class AbstractTsKvEntity implements ToData<TsKvEntry> { |
68 | 69 | @Column(name = DOUBLE_VALUE_COLUMN) |
69 | 70 | protected Double doubleValue; |
70 | 71 | |
72 | + @Column(name = JSON_VALUE_COLUMN) | |
73 | + protected String jsonValue; | |
74 | + | |
71 | 75 | @Transient |
72 | 76 | protected String strKey; |
73 | 77 | |
... | ... | @@ -93,6 +97,8 @@ public abstract class AbstractTsKvEntity implements ToData<TsKvEntry> { |
93 | 97 | kvEntry = new DoubleDataEntry(strKey, doubleValue); |
94 | 98 | } else if (booleanValue != null) { |
95 | 99 | kvEntry = new BooleanDataEntry(strKey, booleanValue); |
100 | + } else if (jsonValue != null) { | |
101 | + kvEntry = new JsonDataEntry(strKey, jsonValue); | |
96 | 102 | } |
97 | 103 | return new BasicTsKvEntry(ts, kvEntry); |
98 | 104 | } | ... | ... |
... | ... | @@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
20 | 20 | import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; |
21 | 21 | import org.thingsboard.server.common.data.kv.BooleanDataEntry; |
22 | 22 | import org.thingsboard.server.common.data.kv.DoubleDataEntry; |
23 | +import org.thingsboard.server.common.data.kv.JsonDataEntry; | |
23 | 24 | import org.thingsboard.server.common.data.kv.KvEntry; |
24 | 25 | import org.thingsboard.server.common.data.kv.LongDataEntry; |
25 | 26 | import org.thingsboard.server.common.data.kv.StringDataEntry; |
... | ... | @@ -33,6 +34,7 @@ import java.io.Serializable; |
33 | 34 | |
34 | 35 | import static org.thingsboard.server.dao.model.ModelConstants.BOOLEAN_VALUE_COLUMN; |
35 | 36 | import static org.thingsboard.server.dao.model.ModelConstants.DOUBLE_VALUE_COLUMN; |
37 | +import static org.thingsboard.server.dao.model.ModelConstants.JSON_VALUE_COLUMN; | |
36 | 38 | import static org.thingsboard.server.dao.model.ModelConstants.LAST_UPDATE_TS_COLUMN; |
37 | 39 | import static org.thingsboard.server.dao.model.ModelConstants.LONG_VALUE_COLUMN; |
38 | 40 | import static org.thingsboard.server.dao.model.ModelConstants.STRING_VALUE_COLUMN; |
... | ... | @@ -57,6 +59,9 @@ public class AttributeKvEntity implements ToData<AttributeKvEntry>, Serializable |
57 | 59 | @Column(name = DOUBLE_VALUE_COLUMN) |
58 | 60 | private Double doubleValue; |
59 | 61 | |
62 | + @Column(name = JSON_VALUE_COLUMN) | |
63 | + private String jsonValue; | |
64 | + | |
60 | 65 | @Column(name = LAST_UPDATE_TS_COLUMN) |
61 | 66 | private Long lastUpdateTs; |
62 | 67 | |
... | ... | @@ -71,7 +76,10 @@ public class AttributeKvEntity implements ToData<AttributeKvEntry>, Serializable |
71 | 76 | kvEntry = new DoubleDataEntry(id.getAttributeKey(), doubleValue); |
72 | 77 | } else if (longValue != null) { |
73 | 78 | kvEntry = new LongDataEntry(id.getAttributeKey(), longValue); |
79 | + } else if (jsonValue != null) { | |
80 | + kvEntry = new JsonDataEntry(id.getAttributeKey(), jsonValue); | |
74 | 81 | } |
82 | + | |
75 | 83 | return new BaseAttributeKvEntry(kvEntry, lastUpdateTs); |
76 | 84 | } |
77 | 85 | } | ... | ... |
... | ... | @@ -16,30 +16,16 @@ |
16 | 16 | package org.thingsboard.server.dao.model.sqlts.hsql; |
17 | 17 | |
18 | 18 | import lombok.Data; |
19 | -import org.thingsboard.server.common.data.EntityType; | |
20 | -import org.thingsboard.server.common.data.kv.BasicTsKvEntry; | |
21 | -import org.thingsboard.server.common.data.kv.BooleanDataEntry; | |
22 | -import org.thingsboard.server.common.data.kv.DoubleDataEntry; | |
23 | -import org.thingsboard.server.common.data.kv.KvEntry; | |
24 | -import org.thingsboard.server.common.data.kv.LongDataEntry; | |
25 | -import org.thingsboard.server.common.data.kv.StringDataEntry; | |
26 | 19 | import org.thingsboard.server.common.data.kv.TsKvEntry; |
27 | 20 | import org.thingsboard.server.dao.model.ToData; |
28 | 21 | import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; |
29 | 22 | |
30 | 23 | import javax.persistence.Column; |
31 | 24 | import javax.persistence.Entity; |
32 | -import javax.persistence.EnumType; | |
33 | -import javax.persistence.Enumerated; | |
34 | 25 | import javax.persistence.Id; |
35 | 26 | import javax.persistence.IdClass; |
36 | 27 | import javax.persistence.Table; |
37 | -import javax.persistence.Transient; | |
38 | 28 | |
39 | -import java.util.UUID; | |
40 | - | |
41 | -import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN; | |
42 | -import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_TYPE_COLUMN; | |
43 | 29 | import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN; |
44 | 30 | |
45 | 31 | @Data |
... | ... | @@ -98,12 +84,14 @@ public final class TsKvEntity extends AbstractTsKvEntity implements ToData<TsKvE |
98 | 84 | } |
99 | 85 | } |
100 | 86 | |
101 | - public TsKvEntity(Long booleanValueCount, Long strValueCount, Long longValueCount, Long doubleValueCount) { | |
87 | + public TsKvEntity(Long booleanValueCount, Long strValueCount, Long longValueCount, Long doubleValueCount, Long jsonValueCount) { | |
102 | 88 | if (!isAllNull(booleanValueCount, strValueCount, longValueCount, doubleValueCount)) { |
103 | 89 | if (booleanValueCount != 0) { |
104 | 90 | this.longValue = booleanValueCount; |
105 | 91 | } else if (strValueCount != 0) { |
106 | 92 | this.longValue = strValueCount; |
93 | + } else if (jsonValueCount != 0) { | |
94 | + this.longValue = jsonValueCount; | |
107 | 95 | } else { |
108 | 96 | this.longValue = longValueCount + doubleValueCount; |
109 | 97 | } | ... | ... |
... | ... | @@ -52,6 +52,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN; |
52 | 52 | @ColumnResult(name = "boolValue", type = Boolean.class), |
53 | 53 | @ColumnResult(name = "longValue", type = Long.class), |
54 | 54 | @ColumnResult(name = "doubleValue", type = Double.class), |
55 | + @ColumnResult(name = "jsonValue", type = String.class), | |
55 | 56 | @ColumnResult(name = "ts", type = Long.class), |
56 | 57 | |
57 | 58 | } |
... | ... | @@ -74,13 +75,13 @@ public final class TsKvLatestEntity extends AbstractTsKvEntity { |
74 | 75 | |
75 | 76 | @Override |
76 | 77 | public boolean isNotEmpty() { |
77 | - return strValue != null || longValue != null || doubleValue != null || booleanValue != null; | |
78 | + return strValue != null || longValue != null || doubleValue != null || booleanValue != null || jsonValue != null; | |
78 | 79 | } |
79 | 80 | |
80 | 81 | public TsKvLatestEntity() { |
81 | 82 | } |
82 | 83 | |
83 | - public TsKvLatestEntity(UUID entityId, Integer key, String strKey, String strValue, Boolean boolValue, Long longValue, Double doubleValue, Long ts) { | |
84 | + public TsKvLatestEntity(UUID entityId, Integer key, String strKey, String strValue, Boolean boolValue, Long longValue, Double doubleValue, String jsonValue, Long ts) { | |
84 | 85 | this.entityId = entityId; |
85 | 86 | this.key = key; |
86 | 87 | this.ts = ts; |
... | ... | @@ -88,6 +89,7 @@ public final class TsKvLatestEntity extends AbstractTsKvEntity { |
88 | 89 | this.doubleValue = doubleValue; |
89 | 90 | this.strValue = strValue; |
90 | 91 | this.booleanValue = boolValue; |
92 | + this.jsonValue = jsonValue; | |
91 | 93 | this.strKey = strKey; |
92 | 94 | } |
93 | 95 | } | ... | ... |
... | ... | @@ -16,14 +16,6 @@ |
16 | 16 | package org.thingsboard.server.dao.model.sqlts.psql; |
17 | 17 | |
18 | 18 | import lombok.Data; |
19 | -import org.thingsboard.server.common.data.kv.BasicTsKvEntry; | |
20 | -import org.thingsboard.server.common.data.kv.BooleanDataEntry; | |
21 | -import org.thingsboard.server.common.data.kv.DoubleDataEntry; | |
22 | -import org.thingsboard.server.common.data.kv.KvEntry; | |
23 | -import org.thingsboard.server.common.data.kv.LongDataEntry; | |
24 | -import org.thingsboard.server.common.data.kv.StringDataEntry; | |
25 | -import org.thingsboard.server.common.data.kv.TsKvEntry; | |
26 | -import org.thingsboard.server.dao.model.ToData; | |
27 | 19 | import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; |
28 | 20 | |
29 | 21 | import javax.persistence.Column; |
... | ... | @@ -31,10 +23,7 @@ import javax.persistence.Entity; |
31 | 23 | import javax.persistence.Id; |
32 | 24 | import javax.persistence.IdClass; |
33 | 25 | import javax.persistence.Table; |
34 | -import javax.persistence.Transient; | |
35 | -import java.util.UUID; | |
36 | 26 | |
37 | -import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN; | |
38 | 27 | import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN; |
39 | 28 | |
40 | 29 | @Data |
... | ... | @@ -93,12 +82,14 @@ public final class TsKvEntity extends AbstractTsKvEntity { |
93 | 82 | } |
94 | 83 | } |
95 | 84 | |
96 | - public TsKvEntity(Long booleanValueCount, Long strValueCount, Long longValueCount, Long doubleValueCount) { | |
85 | + public TsKvEntity(Long booleanValueCount, Long strValueCount, Long longValueCount, Long doubleValueCount, Long jsonValueCount) { | |
97 | 86 | if (!isAllNull(booleanValueCount, strValueCount, longValueCount, doubleValueCount)) { |
98 | 87 | if (booleanValueCount != 0) { |
99 | 88 | this.longValue = booleanValueCount; |
100 | 89 | } else if (strValueCount != 0) { |
101 | 90 | this.longValue = strValueCount; |
91 | + } else if (jsonValueCount != 0) { | |
92 | + this.longValue = jsonValueCount; | |
102 | 93 | } else { |
103 | 94 | this.longValue = longValueCount + doubleValueCount; |
104 | 95 | } | ... | ... |
... | ... | @@ -18,12 +18,6 @@ package org.thingsboard.server.dao.model.sqlts.timescale; |
18 | 18 | import lombok.Data; |
19 | 19 | import lombok.EqualsAndHashCode; |
20 | 20 | import org.springframework.util.StringUtils; |
21 | -import org.thingsboard.server.common.data.kv.BasicTsKvEntry; | |
22 | -import org.thingsboard.server.common.data.kv.BooleanDataEntry; | |
23 | -import org.thingsboard.server.common.data.kv.DoubleDataEntry; | |
24 | -import org.thingsboard.server.common.data.kv.KvEntry; | |
25 | -import org.thingsboard.server.common.data.kv.LongDataEntry; | |
26 | -import org.thingsboard.server.common.data.kv.StringDataEntry; | |
27 | 21 | import org.thingsboard.server.common.data.kv.TsKvEntry; |
28 | 22 | import org.thingsboard.server.dao.model.ToData; |
29 | 23 | import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; |
... | ... | @@ -39,10 +33,8 @@ import javax.persistence.NamedNativeQuery; |
39 | 33 | import javax.persistence.SqlResultSetMapping; |
40 | 34 | import javax.persistence.SqlResultSetMappings; |
41 | 35 | import javax.persistence.Table; |
42 | -import javax.persistence.Transient; | |
43 | 36 | import java.util.UUID; |
44 | 37 | |
45 | -import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN; | |
46 | 38 | import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN; |
47 | 39 | import static org.thingsboard.server.dao.model.ModelConstants.TENANT_ID_COLUMN; |
48 | 40 | import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.FIND_AVG; |
... | ... | @@ -92,6 +84,7 @@ import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.F |
92 | 84 | @ColumnResult(name = "strValueCount", type = Long.class), |
93 | 85 | @ColumnResult(name = "longValueCount", type = Long.class), |
94 | 86 | @ColumnResult(name = "doubleValueCount", type = Long.class), |
87 | + @ColumnResult(name = "jsonValueCount", type = Long.class), | |
95 | 88 | } |
96 | 89 | ) |
97 | 90 | }), |
... | ... | @@ -179,13 +172,15 @@ public final class TimescaleTsKvEntity extends AbstractTsKvEntity implements ToD |
179 | 172 | } |
180 | 173 | } |
181 | 174 | |
182 | - public TimescaleTsKvEntity(Long tsBucket, Long interval, Long booleanValueCount, Long strValueCount, Long longValueCount, Long doubleValueCount) { | |
183 | - if (!isAllNull(tsBucket, interval, booleanValueCount, strValueCount, longValueCount, doubleValueCount)) { | |
175 | + public TimescaleTsKvEntity(Long tsBucket, Long interval, Long booleanValueCount, Long strValueCount, Long longValueCount, Long doubleValueCount, Long jsonValueCount) { | |
176 | + if (!isAllNull(tsBucket, interval, booleanValueCount, strValueCount, longValueCount, doubleValueCount, jsonValueCount)) { | |
184 | 177 | this.ts = tsBucket + interval / 2; |
185 | 178 | if (booleanValueCount != 0) { |
186 | 179 | this.longValue = booleanValueCount; |
187 | 180 | } else if (strValueCount != 0) { |
188 | 181 | this.longValue = strValueCount; |
182 | + } else if (jsonValueCount != 0) { | |
183 | + this.longValue = jsonValueCount; | |
189 | 184 | } else { |
190 | 185 | this.longValue = longValueCount + doubleValueCount; |
191 | 186 | } |
... | ... | @@ -194,6 +189,6 @@ public final class TimescaleTsKvEntity extends AbstractTsKvEntity implements ToD |
194 | 189 | |
195 | 190 | @Override |
196 | 191 | public boolean isNotEmpty() { |
197 | - return ts != null && (strValue != null || longValue != null || doubleValue != null || booleanValue != null); | |
192 | + return ts != null && (strValue != null || longValue != null || doubleValue != null || booleanValue != null || jsonValue != null); | |
198 | 193 | } |
199 | 194 | } |
\ No newline at end of file | ... | ... |
... | ... | @@ -18,7 +18,6 @@ package org.thingsboard.server.dao.sql.attributes; |
18 | 18 | import lombok.extern.slf4j.Slf4j; |
19 | 19 | import org.springframework.beans.factory.annotation.Autowired; |
20 | 20 | import org.springframework.beans.factory.annotation.Value; |
21 | -import org.springframework.data.jpa.repository.Modifying; | |
22 | 21 | import org.springframework.jdbc.core.BatchPreparedStatementSetter; |
23 | 22 | import org.springframework.jdbc.core.JdbcTemplate; |
24 | 23 | import org.springframework.stereotype.Repository; |
... | ... | @@ -28,8 +27,6 @@ import org.springframework.transaction.support.TransactionTemplate; |
28 | 27 | import org.thingsboard.server.dao.model.sql.AttributeKvEntity; |
29 | 28 | import org.thingsboard.server.dao.util.SqlDao; |
30 | 29 | |
31 | -import javax.persistence.EntityManager; | |
32 | -import javax.persistence.PersistenceContext; | |
33 | 30 | import java.sql.PreparedStatement; |
34 | 31 | import java.sql.SQLException; |
35 | 32 | import java.sql.Types; |
... | ... | @@ -45,19 +42,14 @@ public abstract class AttributeKvInsertRepository { |
45 | 42 | private static final ThreadLocal<Pattern> PATTERN_THREAD_LOCAL = ThreadLocal.withInitial(() -> Pattern.compile(String.valueOf(Character.MIN_VALUE))); |
46 | 43 | private static final String EMPTY_STR = ""; |
47 | 44 | |
48 | - private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, last_update_ts = ? " + | |
45 | + private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ? " + | |
49 | 46 | "WHERE entity_type = ? and entity_id = ? and attribute_type =? and attribute_key = ?;"; |
50 | 47 | |
51 | 48 | private static final String INSERT_OR_UPDATE = |
52 | - "INSERT INTO attribute_kv (entity_type, entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, last_update_ts) " + | |
53 | - "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?) " + | |
49 | + "INSERT INTO attribute_kv (entity_type, entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, json_v, last_update_ts) " + | |
50 | + "VALUES(?, ?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?) " + | |
54 | 51 | "ON CONFLICT (entity_type, entity_id, attribute_type, attribute_key) " + |
55 | - "DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, last_update_ts = ?;"; | |
56 | - | |
57 | - protected static final String BOOL_V = "bool_v"; | |
58 | - protected static final String STR_V = "str_v"; | |
59 | - protected static final String LONG_V = "long_v"; | |
60 | - protected static final String DBL_V = "dbl_v"; | |
52 | + "DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?;"; | |
61 | 53 | |
62 | 54 | @Autowired |
63 | 55 | protected JdbcTemplate jdbcTemplate; |
... | ... | @@ -68,74 +60,6 @@ public abstract class AttributeKvInsertRepository { |
68 | 60 | @Value("${sql.remove_null_chars}") |
69 | 61 | private boolean removeNullChars; |
70 | 62 | |
71 | - @PersistenceContext | |
72 | - protected EntityManager entityManager; | |
73 | - | |
74 | - public abstract void saveOrUpdate(AttributeKvEntity entity); | |
75 | - | |
76 | - protected void processSaveOrUpdate(AttributeKvEntity entity, String requestBoolValue, String requestStrValue, String requestLongValue, String requestDblValue) { | |
77 | - if (entity.getBooleanValue() != null) { | |
78 | - saveOrUpdateBoolean(entity, requestBoolValue); | |
79 | - } | |
80 | - if (entity.getStrValue() != null) { | |
81 | - saveOrUpdateString(entity, requestStrValue); | |
82 | - } | |
83 | - if (entity.getLongValue() != null) { | |
84 | - saveOrUpdateLong(entity, requestLongValue); | |
85 | - } | |
86 | - if (entity.getDoubleValue() != null) { | |
87 | - saveOrUpdateDouble(entity, requestDblValue); | |
88 | - } | |
89 | - } | |
90 | - | |
91 | - @Modifying | |
92 | - private void saveOrUpdateBoolean(AttributeKvEntity entity, String query) { | |
93 | - entityManager.createNativeQuery(query) | |
94 | - .setParameter("entity_type", entity.getId().getEntityType().name()) | |
95 | - .setParameter("entity_id", entity.getId().getEntityId()) | |
96 | - .setParameter("attribute_type", entity.getId().getAttributeType()) | |
97 | - .setParameter("attribute_key", entity.getId().getAttributeKey()) | |
98 | - .setParameter("bool_v", entity.getBooleanValue()) | |
99 | - .setParameter("last_update_ts", entity.getLastUpdateTs()) | |
100 | - .executeUpdate(); | |
101 | - } | |
102 | - | |
103 | - @Modifying | |
104 | - private void saveOrUpdateString(AttributeKvEntity entity, String query) { | |
105 | - entityManager.createNativeQuery(query) | |
106 | - .setParameter("entity_type", entity.getId().getEntityType().name()) | |
107 | - .setParameter("entity_id", entity.getId().getEntityId()) | |
108 | - .setParameter("attribute_type", entity.getId().getAttributeType()) | |
109 | - .setParameter("attribute_key", entity.getId().getAttributeKey()) | |
110 | - .setParameter("str_v", replaceNullChars(entity.getStrValue())) | |
111 | - .setParameter("last_update_ts", entity.getLastUpdateTs()) | |
112 | - .executeUpdate(); | |
113 | - } | |
114 | - | |
115 | - @Modifying | |
116 | - private void saveOrUpdateLong(AttributeKvEntity entity, String query) { | |
117 | - entityManager.createNativeQuery(query) | |
118 | - .setParameter("entity_type", entity.getId().getEntityType().name()) | |
119 | - .setParameter("entity_id", entity.getId().getEntityId()) | |
120 | - .setParameter("attribute_type", entity.getId().getAttributeType()) | |
121 | - .setParameter("attribute_key", entity.getId().getAttributeKey()) | |
122 | - .setParameter("long_v", entity.getLongValue()) | |
123 | - .setParameter("last_update_ts", entity.getLastUpdateTs()) | |
124 | - .executeUpdate(); | |
125 | - } | |
126 | - | |
127 | - @Modifying | |
128 | - private void saveOrUpdateDouble(AttributeKvEntity entity, String query) { | |
129 | - entityManager.createNativeQuery(query) | |
130 | - .setParameter("entity_type", entity.getId().getEntityType().name()) | |
131 | - .setParameter("entity_id", entity.getId().getEntityId()) | |
132 | - .setParameter("attribute_type", entity.getId().getAttributeType()) | |
133 | - .setParameter("attribute_key", entity.getId().getAttributeKey()) | |
134 | - .setParameter("dbl_v", entity.getDoubleValue()) | |
135 | - .setParameter("last_update_ts", entity.getLastUpdateTs()) | |
136 | - .executeUpdate(); | |
137 | - } | |
138 | - | |
139 | 63 | protected void saveOrUpdate(List<AttributeKvEntity> entities) { |
140 | 64 | transactionTemplate.execute(new TransactionCallbackWithoutResult() { |
141 | 65 | @Override |
... | ... | @@ -164,11 +88,13 @@ public abstract class AttributeKvInsertRepository { |
164 | 88 | ps.setNull(4, Types.BOOLEAN); |
165 | 89 | } |
166 | 90 | |
167 | - ps.setLong(5, kvEntity.getLastUpdateTs()); | |
168 | - ps.setString(6, kvEntity.getId().getEntityType().name()); | |
169 | - ps.setString(7, kvEntity.getId().getEntityId()); | |
170 | - ps.setString(8, kvEntity.getId().getAttributeType()); | |
171 | - ps.setString(9, kvEntity.getId().getAttributeKey()); | |
91 | + ps.setString(5, replaceNullChars(kvEntity.getJsonValue())); | |
92 | + | |
93 | + ps.setLong(6, kvEntity.getLastUpdateTs()); | |
94 | + ps.setString(7, kvEntity.getId().getEntityType().name()); | |
95 | + ps.setString(8, kvEntity.getId().getEntityId()); | |
96 | + ps.setString(9, kvEntity.getId().getAttributeType()); | |
97 | + ps.setString(10, kvEntity.getId().getAttributeKey()); | |
172 | 98 | } |
173 | 99 | |
174 | 100 | @Override |
... | ... | @@ -199,35 +125,39 @@ public abstract class AttributeKvInsertRepository { |
199 | 125 | ps.setString(2, kvEntity.getId().getEntityId()); |
200 | 126 | ps.setString(3, kvEntity.getId().getAttributeType()); |
201 | 127 | ps.setString(4, kvEntity.getId().getAttributeKey()); |
128 | + | |
202 | 129 | ps.setString(5, replaceNullChars(kvEntity.getStrValue())); |
203 | - ps.setString(10, replaceNullChars(kvEntity.getStrValue())); | |
130 | + ps.setString(11, replaceNullChars(kvEntity.getStrValue())); | |
204 | 131 | |
205 | 132 | if (kvEntity.getLongValue() != null) { |
206 | 133 | ps.setLong(6, kvEntity.getLongValue()); |
207 | - ps.setLong(11, kvEntity.getLongValue()); | |
134 | + ps.setLong(12, kvEntity.getLongValue()); | |
208 | 135 | } else { |
209 | 136 | ps.setNull(6, Types.BIGINT); |
210 | - ps.setNull(11, Types.BIGINT); | |
137 | + ps.setNull(12, Types.BIGINT); | |
211 | 138 | } |
212 | 139 | |
213 | 140 | if (kvEntity.getDoubleValue() != null) { |
214 | 141 | ps.setDouble(7, kvEntity.getDoubleValue()); |
215 | - ps.setDouble(12, kvEntity.getDoubleValue()); | |
142 | + ps.setDouble(13, kvEntity.getDoubleValue()); | |
216 | 143 | } else { |
217 | 144 | ps.setNull(7, Types.DOUBLE); |
218 | - ps.setNull(12, Types.DOUBLE); | |
145 | + ps.setNull(13, Types.DOUBLE); | |
219 | 146 | } |
220 | 147 | |
221 | 148 | if (kvEntity.getBooleanValue() != null) { |
222 | 149 | ps.setBoolean(8, kvEntity.getBooleanValue()); |
223 | - ps.setBoolean(13, kvEntity.getBooleanValue()); | |
150 | + ps.setBoolean(14, kvEntity.getBooleanValue()); | |
224 | 151 | } else { |
225 | 152 | ps.setNull(8, Types.BOOLEAN); |
226 | - ps.setNull(13, Types.BOOLEAN); | |
153 | + ps.setNull(14, Types.BOOLEAN); | |
227 | 154 | } |
228 | 155 | |
229 | - ps.setLong(9, kvEntity.getLastUpdateTs()); | |
230 | - ps.setLong(14, kvEntity.getLastUpdateTs()); | |
156 | + ps.setString(9, replaceNullChars(kvEntity.getJsonValue())); | |
157 | + ps.setString(15, replaceNullChars(kvEntity.getJsonValue())); | |
158 | + | |
159 | + ps.setLong(10, kvEntity.getLastUpdateTs()); | |
160 | + ps.setLong(16, kvEntity.getLastUpdateTs()); | |
231 | 161 | } |
232 | 162 | |
233 | 163 | @Override | ... | ... |
... | ... | @@ -30,36 +30,16 @@ import java.util.List; |
30 | 30 | @Transactional |
31 | 31 | public class HsqlAttributesInsertRepository extends AttributeKvInsertRepository { |
32 | 32 | |
33 | - private static final String ON_BOOL_VALUE_UPDATE_SET_NULLS = " attribute_kv.str_v = null, attribute_kv.long_v = null, attribute_kv.dbl_v = null "; | |
34 | - private static final String ON_STR_VALUE_UPDATE_SET_NULLS = " attribute_kv.bool_v = null, attribute_kv.long_v = null, attribute_kv.dbl_v = null "; | |
35 | - private static final String ON_LONG_VALUE_UPDATE_SET_NULLS = " attribute_kv.str_v = null, attribute_kv.bool_v = null, attribute_kv.dbl_v = null "; | |
36 | - private static final String ON_DBL_VALUE_UPDATE_SET_NULLS = " attribute_kv.str_v = null, attribute_kv.long_v = null, attribute_kv.bool_v = null "; | |
37 | - | |
38 | - private static final String INSERT_BOOL_STATEMENT = getInsertOrUpdateString(BOOL_V, ON_BOOL_VALUE_UPDATE_SET_NULLS); | |
39 | - private static final String INSERT_STR_STATEMENT = getInsertOrUpdateString(STR_V, ON_STR_VALUE_UPDATE_SET_NULLS); | |
40 | - private static final String INSERT_LONG_STATEMENT = getInsertOrUpdateString(LONG_V, ON_LONG_VALUE_UPDATE_SET_NULLS); | |
41 | - private static final String INSERT_DBL_STATEMENT = getInsertOrUpdateString(DBL_V, ON_DBL_VALUE_UPDATE_SET_NULLS); | |
42 | - | |
43 | 33 | private static final String INSERT_OR_UPDATE = |
44 | - "MERGE INTO attribute_kv USING(VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?) " + | |
45 | - "A (entity_type, entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, last_update_ts) " + | |
34 | + "MERGE INTO attribute_kv USING(VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + | |
35 | + "A (entity_type, entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, json_v, last_update_ts) " + | |
46 | 36 | "ON (attribute_kv.entity_type=A.entity_type " + |
47 | 37 | "AND attribute_kv.entity_id=A.entity_id " + |
48 | 38 | "AND attribute_kv.attribute_type=A.attribute_type " + |
49 | 39 | "AND attribute_kv.attribute_key=A.attribute_key) " + |
50 | - "WHEN MATCHED THEN UPDATE SET attribute_kv.str_v = A.str_v, attribute_kv.long_v = A.long_v, attribute_kv.dbl_v = A.dbl_v, attribute_kv.bool_v = A.bool_v, attribute_kv.last_update_ts = A.last_update_ts " + | |
51 | - "WHEN NOT MATCHED THEN INSERT (entity_type, entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, last_update_ts) " + | |
52 | - "VALUES (A.entity_type, A.entity_id, A.attribute_type, A.attribute_key, A.str_v, A.long_v, A.dbl_v, A.bool_v, A.last_update_ts)"; | |
53 | - | |
54 | - @Override | |
55 | - public void saveOrUpdate(AttributeKvEntity entity) { | |
56 | - processSaveOrUpdate(entity, INSERT_BOOL_STATEMENT, INSERT_STR_STATEMENT, INSERT_LONG_STATEMENT, INSERT_DBL_STATEMENT); | |
57 | - } | |
58 | - | |
59 | - private static String getInsertOrUpdateString(String value, String nullValues) { | |
60 | - return "MERGE INTO attribute_kv USING(VALUES :entity_type, :entity_id, :attribute_type, :attribute_key, :" + value + ", :last_update_ts) A (entity_type, entity_id, attribute_type, attribute_key, " + value + ", last_update_ts) ON (attribute_kv.entity_type=A.entity_type AND attribute_kv.entity_id=A.entity_id AND attribute_kv.attribute_type=A.attribute_type AND attribute_kv.attribute_key=A.attribute_key) WHEN MATCHED THEN UPDATE SET attribute_kv." + value + " = A." + value + ", attribute_kv.last_update_ts = A.last_update_ts," + nullValues + "WHEN NOT MATCHED THEN INSERT (entity_type, entity_id, attribute_type, attribute_key, " + value + ", last_update_ts) VALUES (A.entity_type, A.entity_id, A.attribute_type, A.attribute_key, A." + value + ", A.last_update_ts)"; | |
61 | - } | |
62 | - | |
40 | + "WHEN MATCHED THEN UPDATE SET attribute_kv.str_v = A.str_v, attribute_kv.long_v = A.long_v, attribute_kv.dbl_v = A.dbl_v, attribute_kv.bool_v = A.bool_v, attribute_kv.json_v = A.json_v, attribute_kv.last_update_ts = A.last_update_ts " + | |
41 | + "WHEN NOT MATCHED THEN INSERT (entity_type, entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, json_v, last_update_ts) " + | |
42 | + "VALUES (A.entity_type, A.entity_id, A.attribute_type, A.attribute_key, A.str_v, A.long_v, A.dbl_v, A.bool_v, A.json_v, A.last_update_ts)"; | |
63 | 43 | |
64 | 44 | @Override |
65 | 45 | protected void saveOrUpdate(List<AttributeKvEntity> entities) { |
... | ... | @@ -89,7 +69,9 @@ public class HsqlAttributesInsertRepository extends AttributeKvInsertRepository |
89 | 69 | ps.setNull(8, Types.BOOLEAN); |
90 | 70 | } |
91 | 71 | |
92 | - ps.setLong(9, entity.getLastUpdateTs()); | |
72 | + ps.setString(9, entity.getJsonValue()); | |
73 | + | |
74 | + ps.setLong(10, entity.getLastUpdateTs()); | |
93 | 75 | }); |
94 | 76 | }); |
95 | 77 | } | ... | ... |
... | ... | @@ -128,6 +128,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl |
128 | 128 | entity.setDoubleValue(attribute.getDoubleValue().orElse(null)); |
129 | 129 | entity.setLongValue(attribute.getLongValue().orElse(null)); |
130 | 130 | entity.setBooleanValue(attribute.getBooleanValue().orElse(null)); |
131 | + entity.setJsonValue(attribute.getJsonValue().orElse(null)); | |
131 | 132 | return addToQueue(entity); |
132 | 133 | } |
133 | 134 | ... | ... |
... | ... | @@ -17,7 +17,6 @@ package org.thingsboard.server.dao.sql.attributes; |
17 | 17 | |
18 | 18 | import org.springframework.stereotype.Repository; |
19 | 19 | import org.springframework.transaction.annotation.Transactional; |
20 | -import org.thingsboard.server.dao.model.sql.AttributeKvEntity; | |
21 | 20 | import org.thingsboard.server.dao.util.PsqlDao; |
22 | 21 | import org.thingsboard.server.dao.util.SqlDao; |
23 | 22 | |
... | ... | @@ -27,22 +26,4 @@ import org.thingsboard.server.dao.util.SqlDao; |
27 | 26 | @Transactional |
28 | 27 | public class PsqlAttributesInsertRepository extends AttributeKvInsertRepository { |
29 | 28 | |
30 | - private static final String ON_BOOL_VALUE_UPDATE_SET_NULLS = "str_v = null, long_v = null, dbl_v = null"; | |
31 | - private static final String ON_STR_VALUE_UPDATE_SET_NULLS = "bool_v = null, long_v = null, dbl_v = null"; | |
32 | - private static final String ON_LONG_VALUE_UPDATE_SET_NULLS = "str_v = null, bool_v = null, dbl_v = null"; | |
33 | - private static final String ON_DBL_VALUE_UPDATE_SET_NULLS = "str_v = null, long_v = null, bool_v = null"; | |
34 | - | |
35 | - private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateString(BOOL_V, ON_BOOL_VALUE_UPDATE_SET_NULLS); | |
36 | - private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateString(STR_V, ON_STR_VALUE_UPDATE_SET_NULLS); | |
37 | - private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateString(LONG_V , ON_LONG_VALUE_UPDATE_SET_NULLS); | |
38 | - private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateString(DBL_V, ON_DBL_VALUE_UPDATE_SET_NULLS); | |
39 | - | |
40 | - @Override | |
41 | - public void saveOrUpdate(AttributeKvEntity entity) { | |
42 | - processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT); | |
43 | - } | |
44 | - | |
45 | - private static String getInsertOrUpdateString(String value, String nullValues) { | |
46 | - return "INSERT INTO attribute_kv (entity_type, entity_id, attribute_type, attribute_key, " + value + ", last_update_ts) VALUES (:entity_type, :entity_id, :attribute_type, :attribute_key, :" + value + ", :last_update_ts) ON CONFLICT (entity_type, entity_id, attribute_type, attribute_key) DO UPDATE SET " + value + " = :" + value + ", last_update_ts = :last_update_ts," + nullValues; | |
47 | - } | |
48 | 29 | } |
\ No newline at end of file | ... | ... |
... | ... | @@ -253,6 +253,8 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx |
253 | 253 | latestEntity.setDoubleValue(tsKvEntry.getDoubleValue().orElse(null)); |
254 | 254 | latestEntity.setLongValue(tsKvEntry.getLongValue().orElse(null)); |
255 | 255 | latestEntity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null)); |
256 | + latestEntity.setJsonValue(tsKvEntry.getJsonValue().orElse(null)); | |
257 | + | |
256 | 258 | return tsLatestQueue.add(latestEntity); |
257 | 259 | } |
258 | 260 | ... | ... |
... | ... | @@ -37,14 +37,14 @@ import java.util.List; |
37 | 37 | public class HsqlInsertTsRepository extends AbstractInsertRepository implements InsertTsRepository<TsKvEntity> { |
38 | 38 | |
39 | 39 | private static final String INSERT_OR_UPDATE = |
40 | - "MERGE INTO ts_kv USING(VALUES ?, ?, ?, ?, ?, ?, ?) " + | |
41 | - "T (entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " + | |
40 | + "MERGE INTO ts_kv USING(VALUES ?, ?, ?, ?, ?, ?, ?, ?) " + | |
41 | + "T (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) " + | |
42 | 42 | "ON (ts_kv.entity_id=T.entity_id " + |
43 | 43 | "AND ts_kv.key=T.key " + |
44 | 44 | "AND ts_kv.ts=T.ts) " + |
45 | - "WHEN MATCHED THEN UPDATE SET ts_kv.bool_v = T.bool_v, ts_kv.str_v = T.str_v, ts_kv.long_v = T.long_v, ts_kv.dbl_v = T.dbl_v " + | |
46 | - "WHEN NOT MATCHED THEN INSERT (entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " + | |
47 | - "VALUES (T.entity_id, T.key, T.ts, T.bool_v, T.str_v, T.long_v, T.dbl_v);"; | |
45 | + "WHEN MATCHED THEN UPDATE SET ts_kv.bool_v = T.bool_v, ts_kv.str_v = T.str_v, ts_kv.long_v = T.long_v, ts_kv.dbl_v = T.dbl_v ,ts_kv.json_v = T.json_v " + | |
46 | + "WHEN NOT MATCHED THEN INSERT (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) " + | |
47 | + "VALUES (T.entity_id, T.key, T.ts, T.bool_v, T.str_v, T.long_v, T.dbl_v, T.json_v);"; | |
48 | 48 | |
49 | 49 | @Override |
50 | 50 | public void saveOrUpdate(List<EntityContainer<TsKvEntity>> entities) { |
... | ... | @@ -76,6 +76,8 @@ public class HsqlInsertTsRepository extends AbstractInsertRepository implements |
76 | 76 | } else { |
77 | 77 | ps.setNull(7, Types.DOUBLE); |
78 | 78 | } |
79 | + | |
80 | + ps.setString(8, tsKvEntity.getJsonValue()); | |
79 | 81 | } |
80 | 82 | |
81 | 83 | @Override | ... | ... |
... | ... | @@ -97,7 +97,8 @@ public interface TsKvHsqlRepository extends CrudRepository<TsKvEntity, TsKvCompo |
97 | 97 | @Query("SELECT new TsKvEntity(SUM(CASE WHEN tskv.booleanValue IS NULL THEN 0 ELSE 1 END), " + |
98 | 98 | "SUM(CASE WHEN tskv.strValue IS NULL THEN 0 ELSE 1 END), " + |
99 | 99 | "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " + |
100 | - "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END)) FROM TsKvEntity tskv " + | |
100 | + "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " + | |
101 | + "SUM(CASE WHEN tskv.jsonValue IS NULL THEN 0 ELSE 1 END)) FROM TsKvEntity tskv " + | |
101 | 102 | "WHERE tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") |
102 | 103 | CompletableFuture<TsKvEntity> findCount(@Param("entityId") UUID entityId, |
103 | 104 | @Param("entityKey") int entityKey, | ... | ... |
... | ... | @@ -36,11 +36,11 @@ import java.util.List; |
36 | 36 | public class HsqlLatestInsertTsRepository extends AbstractInsertRepository implements InsertLatestTsRepository { |
37 | 37 | |
38 | 38 | private static final String INSERT_OR_UPDATE = |
39 | - "MERGE INTO ts_kv_latest USING(VALUES ?, ?, ?, ?, ?, ?, ?) " + | |
40 | - "T (entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " + | |
39 | + "MERGE INTO ts_kv_latest USING(VALUES ?, ?, ?, ?, ?, ?, ?, ?) " + | |
40 | + "T (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) " + | |
41 | 41 | "ON (ts_kv_latest.entity_id=T.entity_id " + |
42 | 42 | "AND ts_kv_latest.key=T.key) " + |
43 | - "WHEN MATCHED THEN UPDATE SET ts_kv_latest.ts = T.ts, ts_kv_latest.bool_v = T.bool_v, ts_kv_latest.str_v = T.str_v, ts_kv_latest.long_v = T.long_v, ts_kv_latest.dbl_v = T.dbl_v " + | |
43 | + "WHEN MATCHED THEN UPDATE SET ts_kv_latest.ts = T.ts, ts_kv_latest.bool_v = T.bool_v, ts_kv_latest.str_v = T.str_v, ts_kv_latest.long_v = T.long_v, ts_kv_latest.dbl_v = T.dbl_v, ts_kv_latest.json_v = T.json_v " + | |
44 | 44 | "WHEN NOT MATCHED THEN INSERT (entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " + |
45 | 45 | "VALUES (T.entity_id, T.key, T.ts, T.bool_v, T.str_v, T.long_v, T.dbl_v);"; |
46 | 46 | |
... | ... | @@ -72,6 +72,8 @@ public class HsqlLatestInsertTsRepository extends AbstractInsertRepository imple |
72 | 72 | } else { |
73 | 73 | ps.setNull(7, Types.DOUBLE); |
74 | 74 | } |
75 | + | |
76 | + ps.setString(8, entities.get(i).getJsonValue()); | |
75 | 77 | } |
76 | 78 | |
77 | 79 | @Override | ... | ... |
... | ... | @@ -38,12 +38,12 @@ import java.util.List; |
38 | 38 | public class PsqlLatestInsertTsRepository extends AbstractInsertRepository implements InsertLatestTsRepository { |
39 | 39 | |
40 | 40 | private static final String BATCH_UPDATE = |
41 | - "UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ? WHERE entity_id = ? and key = ?"; | |
41 | + "UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json) WHERE entity_id = ? and key = ?"; | |
42 | 42 | |
43 | 43 | |
44 | 44 | private static final String INSERT_OR_UPDATE = |
45 | - "INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v) VALUES(?, ?, ?, ?, ?, ?, ?) " + | |
46 | - "ON CONFLICT (entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?;"; | |
45 | + "INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " + | |
46 | + "ON CONFLICT (entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json);"; | |
47 | 47 | |
48 | 48 | @Override |
49 | 49 | public void saveOrUpdate(List<TsKvLatestEntity> entities) { |
... | ... | @@ -76,8 +76,10 @@ public class PsqlLatestInsertTsRepository extends AbstractInsertRepository imple |
76 | 76 | ps.setNull(5, Types.DOUBLE); |
77 | 77 | } |
78 | 78 | |
79 | - ps.setObject(6, tsKvLatestEntity.getEntityId()); | |
80 | - ps.setInt(7, tsKvLatestEntity.getKey()); | |
79 | + ps.setString(6, replaceNullChars(tsKvLatestEntity.getJsonValue())); | |
80 | + | |
81 | + ps.setObject(7, tsKvLatestEntity.getEntityId()); | |
82 | + ps.setInt(8, tsKvLatestEntity.getKey()); | |
81 | 83 | } |
82 | 84 | |
83 | 85 | @Override |
... | ... | @@ -106,36 +108,39 @@ public class PsqlLatestInsertTsRepository extends AbstractInsertRepository imple |
106 | 108 | TsKvLatestEntity tsKvLatestEntity = insertEntities.get(i); |
107 | 109 | ps.setObject(1, tsKvLatestEntity.getEntityId()); |
108 | 110 | ps.setInt(2, tsKvLatestEntity.getKey()); |
111 | + | |
109 | 112 | ps.setLong(3, tsKvLatestEntity.getTs()); |
110 | - ps.setLong(8, tsKvLatestEntity.getTs()); | |
113 | + ps.setLong(9, tsKvLatestEntity.getTs()); | |
111 | 114 | |
112 | 115 | if (tsKvLatestEntity.getBooleanValue() != null) { |
113 | 116 | ps.setBoolean(4, tsKvLatestEntity.getBooleanValue()); |
114 | - ps.setBoolean(9, tsKvLatestEntity.getBooleanValue()); | |
117 | + ps.setBoolean(10, tsKvLatestEntity.getBooleanValue()); | |
115 | 118 | } else { |
116 | 119 | ps.setNull(4, Types.BOOLEAN); |
117 | - ps.setNull(9, Types.BOOLEAN); | |
120 | + ps.setNull(10, Types.BOOLEAN); | |
118 | 121 | } |
119 | 122 | |
120 | 123 | ps.setString(5, replaceNullChars(tsKvLatestEntity.getStrValue())); |
121 | - ps.setString(10, replaceNullChars(tsKvLatestEntity.getStrValue())); | |
122 | - | |
124 | + ps.setString(11, replaceNullChars(tsKvLatestEntity.getStrValue())); | |
123 | 125 | |
124 | 126 | if (tsKvLatestEntity.getLongValue() != null) { |
125 | 127 | ps.setLong(6, tsKvLatestEntity.getLongValue()); |
126 | - ps.setLong(11, tsKvLatestEntity.getLongValue()); | |
128 | + ps.setLong(12, tsKvLatestEntity.getLongValue()); | |
127 | 129 | } else { |
128 | 130 | ps.setNull(6, Types.BIGINT); |
129 | - ps.setNull(11, Types.BIGINT); | |
131 | + ps.setNull(12, Types.BIGINT); | |
130 | 132 | } |
131 | 133 | |
132 | 134 | if (tsKvLatestEntity.getDoubleValue() != null) { |
133 | 135 | ps.setDouble(7, tsKvLatestEntity.getDoubleValue()); |
134 | - ps.setDouble(12, tsKvLatestEntity.getDoubleValue()); | |
136 | + ps.setDouble(13, tsKvLatestEntity.getDoubleValue()); | |
135 | 137 | } else { |
136 | 138 | ps.setNull(7, Types.DOUBLE); |
137 | - ps.setNull(12, Types.DOUBLE); | |
139 | + ps.setNull(13, Types.DOUBLE); | |
138 | 140 | } |
141 | + | |
142 | + ps.setString(8, replaceNullChars(tsKvLatestEntity.getJsonValue())); | |
143 | + ps.setString(14, replaceNullChars(tsKvLatestEntity.getJsonValue())); | |
139 | 144 | } |
140 | 145 | |
141 | 146 | @Override | ... | ... |
... | ... | @@ -29,8 +29,9 @@ import java.util.UUID; |
29 | 29 | public class SearchTsKvLatestRepository { |
30 | 30 | |
31 | 31 | public static final String FIND_ALL_BY_ENTITY_ID = "findAllByEntityId"; |
32 | + | |
32 | 33 | public static final String FIND_ALL_BY_ENTITY_ID_QUERY = "SELECT ts_kv_latest.entity_id AS entityId, ts_kv_latest.key AS key, ts_kv_dictionary.key AS strKey, ts_kv_latest.str_v AS strValue," + |
33 | - " ts_kv_latest.bool_v AS boolValue, ts_kv_latest.long_v AS longValue, ts_kv_latest.dbl_v AS doubleValue, ts_kv_latest.ts AS ts FROM ts_kv_latest " + | |
34 | + " ts_kv_latest.bool_v AS boolValue, ts_kv_latest.long_v AS longValue, ts_kv_latest.dbl_v AS doubleValue, ts_kv_latest.json_v AS jsonValue, ts_kv_latest.ts AS ts FROM ts_kv_latest " + | |
34 | 35 | "INNER JOIN ts_kv_dictionary ON ts_kv_latest.key = ts_kv_dictionary.key_id WHERE ts_kv_latest.entity_id = cast(:id AS uuid)"; |
35 | 36 | |
36 | 37 | @PersistenceContext | ... | ... |
... | ... | @@ -20,11 +20,7 @@ import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestCompositeKey; |
20 | 20 | import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; |
21 | 21 | import org.thingsboard.server.dao.util.SqlDao; |
22 | 22 | |
23 | -import java.util.List; | |
24 | -import java.util.UUID; | |
25 | - | |
26 | 23 | @SqlDao |
27 | 24 | public interface TsKvLatestRepository extends CrudRepository<TsKvLatestEntity, TsKvLatestCompositeKey> { |
28 | 25 | |
29 | - List<TsKvLatestEntity> findAllByEntityId(UUID entityId); | |
30 | 26 | } | ... | ... |
... | ... | @@ -104,6 +104,7 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa |
104 | 104 | entity.setDoubleValue(tsKvEntry.getDoubleValue().orElse(null)); |
105 | 105 | entity.setLongValue(tsKvEntry.getLongValue().orElse(null)); |
106 | 106 | entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null)); |
107 | + entity.setJsonValue(tsKvEntry.getJsonValue().orElse(null)); | |
107 | 108 | PsqlPartition psqlPartition = toPartition(tsKvEntry.getTs()); |
108 | 109 | log.trace("Saving entity: {}", entity); |
109 | 110 | return tsQueue.add(new EntityContainer(entity, psqlPartition.getPartitionDate())); | ... | ... |
... | ... | @@ -41,8 +41,8 @@ public class PsqlInsertTsRepository extends AbstractInsertRepository implements |
41 | 41 | |
42 | 42 | private static final String INSERT_INTO_TS_KV = "INSERT INTO ts_kv_"; |
43 | 43 | |
44 | - private static final String VALUES_ON_CONFLICT_DO_UPDATE = " (entity_id, key, ts, bool_v, str_v, long_v, dbl_v) VALUES (?, ?, ?, ?, ?, ?, ?) " + | |
45 | - "ON CONFLICT (entity_id, key, ts) DO UPDATE SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?;"; | |
44 | + private static final String VALUES_ON_CONFLICT_DO_UPDATE = " (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES (?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " + | |
45 | + "ON CONFLICT (entity_id, key, ts) DO UPDATE SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json);"; | |
46 | 46 | |
47 | 47 | @Override |
48 | 48 | public void saveOrUpdate(List<EntityContainer<TsKvEntity>> entities) { |
... | ... | @@ -61,30 +61,33 @@ public class PsqlInsertTsRepository extends AbstractInsertRepository implements |
61 | 61 | |
62 | 62 | if (tsKvEntity.getBooleanValue() != null) { |
63 | 63 | ps.setBoolean(4, tsKvEntity.getBooleanValue()); |
64 | - ps.setBoolean(8, tsKvEntity.getBooleanValue()); | |
64 | + ps.setBoolean(9, tsKvEntity.getBooleanValue()); | |
65 | 65 | } else { |
66 | 66 | ps.setNull(4, Types.BOOLEAN); |
67 | - ps.setNull(8, Types.BOOLEAN); | |
67 | + ps.setNull(9, Types.BOOLEAN); | |
68 | 68 | } |
69 | 69 | |
70 | 70 | ps.setString(5, replaceNullChars(tsKvEntity.getStrValue())); |
71 | - ps.setString(9, replaceNullChars(tsKvEntity.getStrValue())); | |
71 | + ps.setString(10, replaceNullChars(tsKvEntity.getStrValue())); | |
72 | 72 | |
73 | 73 | |
74 | 74 | if (tsKvEntity.getLongValue() != null) { |
75 | 75 | ps.setLong(6, tsKvEntity.getLongValue()); |
76 | - ps.setLong(10, tsKvEntity.getLongValue()); | |
76 | + ps.setLong(11, tsKvEntity.getLongValue()); | |
77 | 77 | } else { |
78 | 78 | ps.setNull(6, Types.BIGINT); |
79 | - ps.setNull(10, Types.BIGINT); | |
79 | + ps.setNull(11, Types.BIGINT); | |
80 | 80 | } |
81 | 81 | |
82 | 82 | if (tsKvEntity.getDoubleValue() != null) { |
83 | 83 | ps.setDouble(7, tsKvEntity.getDoubleValue()); |
84 | - ps.setDouble(11, tsKvEntity.getDoubleValue()); | |
84 | + ps.setDouble(12, tsKvEntity.getDoubleValue()); | |
85 | 85 | } else { |
86 | 86 | ps.setNull(7, Types.DOUBLE); |
87 | - ps.setNull(11, Types.DOUBLE); | |
87 | + ps.setNull(12, Types.DOUBLE); | |
88 | + | |
89 | + ps.setString(8, replaceNullChars(tsKvEntity.getJsonValue())); | |
90 | + ps.setString(13, replaceNullChars(tsKvEntity.getJsonValue())); | |
88 | 91 | } |
89 | 92 | } |
90 | 93 | ... | ... |
... | ... | @@ -98,7 +98,8 @@ public interface TsKvPsqlRepository extends CrudRepository<TsKvEntity, TsKvCompo |
98 | 98 | @Query("SELECT new TsKvEntity(SUM(CASE WHEN tskv.booleanValue IS NULL THEN 0 ELSE 1 END), " + |
99 | 99 | "SUM(CASE WHEN tskv.strValue IS NULL THEN 0 ELSE 1 END), " + |
100 | 100 | "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " + |
101 | - "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END)) FROM TsKvEntity tskv " + | |
101 | + "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " + | |
102 | + "SUM(CASE WHEN tskv.jsonValue IS NULL THEN 0 ELSE 1 END)) FROM TsKvEntity tskv " + | |
102 | 103 | "WHERE tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") |
103 | 104 | CompletableFuture<TsKvEntity> findCount(@Param("entityId") UUID entityId, |
104 | 105 | @Param("entityKey") int entityKey, | ... | ... |
... | ... | @@ -36,18 +36,17 @@ public class AggregationRepository { |
36 | 36 | public static final String FIND_SUM = "findSum"; |
37 | 37 | public static final String FIND_COUNT = "findCount"; |
38 | 38 | |
39 | - | |
40 | 39 | public static final String FROM_WHERE_CLAUSE = "FROM tenant_ts_kv tskv WHERE tskv.tenant_id = cast(:tenantId AS uuid) AND tskv.entity_id = cast(:entityId AS uuid) AND tskv.key= cast(:entityKey AS int) AND tskv.ts > :startTs AND tskv.ts <= :endTs GROUP BY tskv.tenant_id, tskv.entity_id, tskv.key, tsBucket ORDER BY tskv.tenant_id, tskv.entity_id, tskv.key, tsBucket"; |
41 | 40 | |
42 | 41 | public static final String FIND_AVG_QUERY = "SELECT time_bucket(:timeBucket, tskv.ts) AS tsBucket, :timeBucket AS interval, SUM(COALESCE(tskv.long_v, 0)) AS longValue, SUM(COALESCE(tskv.dbl_v, 0.0)) AS doubleValue, SUM(CASE WHEN tskv.long_v IS NULL THEN 0 ELSE 1 END) AS longCountValue, SUM(CASE WHEN tskv.dbl_v IS NULL THEN 0 ELSE 1 END) AS doubleCountValue, null AS strValue, 'AVG' AS aggType "; |
43 | 42 | |
44 | - public static final String FIND_MAX_QUERY = "SELECT time_bucket(:timeBucket, tskv.ts) AS tsBucket, :timeBucket AS interval, MAX(COALESCE(tskv.long_v, -9223372036854775807)) AS longValue, MAX(COALESCE(tskv.dbl_v, -1.79769E+308)) as doubleValue, SUM(CASE WHEN tskv.long_v IS NULL THEN 0 ELSE 1 END) AS longCountValue, SUM(CASE WHEN tskv.dbl_v IS NULL THEN 0 ELSE 1 END) AS doubleCountValue, MAX(tskv.str_v) AS strValue, 'MAX' AS aggType "; | |
43 | + public static final String FIND_MAX_QUERY = "SELECT time_bucket(:timeBucket, tskv.ts) AS tsBucket, :timeBucket AS interval, MAX(COALESCE(tskv.long_v, -9223372036854775807)) AS longValue, MAX(COALESCE(tskv.dbl_v, -1.79769E+308)) as doubleValue, SUM(CASE WHEN tskv.long_v IS NULL THEN 0 ELSE 1 END) AS longCountValue, SUM(CASE WHEN tskv.dbl_v IS NULL THEN 0 ELSE 1 END) AS doubleCountValue, MAX(tskv.str_v) AS strValue, MAX(tskv.json_v) AS jsonValue, 'MAX' AS aggType "; | |
45 | 44 | |
46 | - public static final String FIND_MIN_QUERY = "SELECT time_bucket(:timeBucket, tskv.ts) AS tsBucket, :timeBucket AS interval, MIN(COALESCE(tskv.long_v, 9223372036854775807)) AS longValue, MIN(COALESCE(tskv.dbl_v, 1.79769E+308)) as doubleValue, SUM(CASE WHEN tskv.long_v IS NULL THEN 0 ELSE 1 END) AS longCountValue, SUM(CASE WHEN tskv.dbl_v IS NULL THEN 0 ELSE 1 END) AS doubleCountValue, MIN(tskv.str_v) AS strValue, 'MIN' AS aggType "; | |
45 | + public static final String FIND_MIN_QUERY = "SELECT time_bucket(:timeBucket, tskv.ts) AS tsBucket, :timeBucket AS interval, MIN(COALESCE(tskv.long_v, 9223372036854775807)) AS longValue, MIN(COALESCE(tskv.dbl_v, 1.79769E+308)) as doubleValue, SUM(CASE WHEN tskv.long_v IS NULL THEN 0 ELSE 1 END) AS longCountValue, SUM(CASE WHEN tskv.dbl_v IS NULL THEN 0 ELSE 1 END) AS doubleCountValue, MIN(tskv.str_v) AS strValue, MIN(tskv.json_v) AS jsonValue,'MIN' AS aggType "; | |
47 | 46 | |
48 | - public static final String FIND_SUM_QUERY = "SELECT time_bucket(:timeBucket, tskv.ts) AS tsBucket, :timeBucket AS interval, SUM(COALESCE(tskv.long_v, 0)) AS longValue, SUM(COALESCE(tskv.dbl_v, 0.0)) AS doubleValue, SUM(CASE WHEN tskv.long_v IS NULL THEN 0 ELSE 1 END) AS longCountValue, SUM(CASE WHEN tskv.dbl_v IS NULL THEN 0 ELSE 1 END) AS doubleCountValue, null AS strValue, 'SUM' AS aggType "; | |
47 | + public static final String FIND_SUM_QUERY = "SELECT time_bucket(:timeBucket, tskv.ts) AS tsBucket, :timeBucket AS interval, SUM(COALESCE(tskv.long_v, 0)) AS longValue, SUM(COALESCE(tskv.dbl_v, 0.0)) AS doubleValue, SUM(CASE WHEN tskv.long_v IS NULL THEN 0 ELSE 1 END) AS longCountValue, SUM(CASE WHEN tskv.dbl_v IS NULL THEN 0 ELSE 1 END) AS doubleCountValue, null AS strValue, null AS jsonValue, 'SUM' AS aggType "; | |
49 | 48 | |
50 | - public static final String FIND_COUNT_QUERY = "SELECT time_bucket(:timeBucket, tskv.ts) AS tsBucket, :timeBucket AS interval, SUM(CASE WHEN tskv.bool_v IS NULL THEN 0 ELSE 1 END) AS booleanValueCount, SUM(CASE WHEN tskv.str_v IS NULL THEN 0 ELSE 1 END) AS strValueCount, SUM(CASE WHEN tskv.long_v IS NULL THEN 0 ELSE 1 END) AS longValueCount, SUM(CASE WHEN tskv.dbl_v IS NULL THEN 0 ELSE 1 END) AS doubleValueCount "; | |
49 | + public static final String FIND_COUNT_QUERY = "SELECT time_bucket(:timeBucket, tskv.ts) AS tsBucket, :timeBucket AS interval, SUM(CASE WHEN tskv.bool_v IS NULL THEN 0 ELSE 1 END) AS booleanValueCount, SUM(CASE WHEN tskv.str_v IS NULL THEN 0 ELSE 1 END) AS strValueCount, SUM(CASE WHEN tskv.long_v IS NULL THEN 0 ELSE 1 END) AS longValueCount, SUM(CASE WHEN tskv.dbl_v IS NULL THEN 0 ELSE 1 END) AS doubleValueCount, SUM(CASE WHEN tskv.json_v IS NULL THEN 0 ELSE 1 END) AS jsonValueCount "; | |
51 | 50 | |
52 | 51 | @PersistenceContext |
53 | 52 | private EntityManager entityManager; | ... | ... |
... | ... | @@ -37,8 +37,8 @@ import java.util.List; |
37 | 37 | public class TimescaleInsertTsRepository extends AbstractInsertRepository implements InsertTsRepository<TimescaleTsKvEntity> { |
38 | 38 | |
39 | 39 | private static final String INSERT_OR_UPDATE = |
40 | - "INSERT INTO tenant_ts_kv (tenant_id, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) VALUES(?, ?, ?, ?, ?, ?, ?, ?) " + | |
41 | - "ON CONFLICT (tenant_id, entity_id, key, ts) DO UPDATE SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?;"; | |
40 | + "INSERT INTO tenant_ts_kv (tenant_id, entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES(?, ?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " + | |
41 | + "ON CONFLICT (tenant_id, entity_id, key, ts) DO UPDATE SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json);"; | |
42 | 42 | |
43 | 43 | @Override |
44 | 44 | public void saveOrUpdate(List<EntityContainer<TimescaleTsKvEntity>> entities) { |
... | ... | @@ -56,28 +56,31 @@ public class TimescaleInsertTsRepository extends AbstractInsertRepository implem |
56 | 56 | ps.setBoolean(9, tsKvEntity.getBooleanValue()); |
57 | 57 | } else { |
58 | 58 | ps.setNull(5, Types.BOOLEAN); |
59 | - ps.setNull(9, Types.BOOLEAN); | |
59 | + ps.setNull(10, Types.BOOLEAN); | |
60 | 60 | } |
61 | 61 | |
62 | 62 | ps.setString(6, replaceNullChars(tsKvEntity.getStrValue())); |
63 | - ps.setString(10, replaceNullChars(tsKvEntity.getStrValue())); | |
63 | + ps.setString(11, replaceNullChars(tsKvEntity.getStrValue())); | |
64 | 64 | |
65 | 65 | |
66 | 66 | if (tsKvEntity.getLongValue() != null) { |
67 | 67 | ps.setLong(7, tsKvEntity.getLongValue()); |
68 | - ps.setLong(11, tsKvEntity.getLongValue()); | |
68 | + ps.setLong(12, tsKvEntity.getLongValue()); | |
69 | 69 | } else { |
70 | 70 | ps.setNull(7, Types.BIGINT); |
71 | - ps.setNull(11, Types.BIGINT); | |
71 | + ps.setNull(12, Types.BIGINT); | |
72 | 72 | } |
73 | 73 | |
74 | 74 | if (tsKvEntity.getDoubleValue() != null) { |
75 | 75 | ps.setDouble(8, tsKvEntity.getDoubleValue()); |
76 | - ps.setDouble(12, tsKvEntity.getDoubleValue()); | |
76 | + ps.setDouble(13, tsKvEntity.getDoubleValue()); | |
77 | 77 | } else { |
78 | 78 | ps.setNull(8, Types.DOUBLE); |
79 | - ps.setNull(12, Types.DOUBLE); | |
79 | + ps.setNull(13, Types.DOUBLE); | |
80 | 80 | } |
81 | + | |
82 | + ps.setString(9, replaceNullChars(tsKvEntity.getJsonValue())); | |
83 | + ps.setString(14, replaceNullChars(tsKvEntity.getJsonValue())); | |
81 | 84 | } |
82 | 85 | |
83 | 86 | @Override | ... | ... |
... | ... | @@ -174,6 +174,8 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements |
174 | 174 | entity.setDoubleValue(tsKvEntry.getDoubleValue().orElse(null)); |
175 | 175 | entity.setLongValue(tsKvEntry.getLongValue().orElse(null)); |
176 | 176 | entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null)); |
177 | + entity.setJsonValue(tsKvEntry.getJsonValue().orElse(null)); | |
178 | + | |
177 | 179 | log.trace("Saving entity to timescale db: {}", entity); |
178 | 180 | return tsQueue.add(new EntityContainer(entity, null)); |
179 | 181 | } | ... | ... |
... | ... | @@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.kv.BasicTsKvEntry; |
23 | 23 | import org.thingsboard.server.common.data.kv.BooleanDataEntry; |
24 | 24 | import org.thingsboard.server.common.data.kv.DataType; |
25 | 25 | import org.thingsboard.server.common.data.kv.DoubleDataEntry; |
26 | +import org.thingsboard.server.common.data.kv.JsonDataEntry; | |
26 | 27 | import org.thingsboard.server.common.data.kv.LongDataEntry; |
27 | 28 | import org.thingsboard.server.common.data.kv.StringDataEntry; |
28 | 29 | import org.thingsboard.server.common.data.kv.TsKvEntry; |
... | ... | @@ -41,10 +42,12 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct |
41 | 42 | private static final int DOUBLE_CNT_POS = 1; |
42 | 43 | private static final int BOOL_CNT_POS = 2; |
43 | 44 | private static final int STR_CNT_POS = 3; |
44 | - private static final int LONG_POS = 4; | |
45 | - private static final int DOUBLE_POS = 5; | |
46 | - private static final int BOOL_POS = 6; | |
47 | - private static final int STR_POS = 7; | |
45 | + private static final int JSON_CNT_POS = 4; | |
46 | + private static final int LONG_POS = 5; | |
47 | + private static final int DOUBLE_POS = 6; | |
48 | + private static final int BOOL_POS = 7; | |
49 | + private static final int STR_POS = 8; | |
50 | + private static final int JSON_POS = 9; | |
48 | 51 | |
49 | 52 | private final Aggregation aggregation; |
50 | 53 | private final String key; |
... | ... | @@ -72,7 +75,7 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct |
72 | 75 | } |
73 | 76 | } |
74 | 77 | return processAggregationResult(aggResult); |
75 | - }catch (Exception e){ | |
78 | + } catch (Exception e) { | |
76 | 79 | log.error("[{}][{}][{}] Failed to aggregate data", key, ts, aggregation, e); |
77 | 80 | return Optional.empty(); |
78 | 81 | } |
... | ... | @@ -85,11 +88,13 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct |
85 | 88 | Double curDValue = null; |
86 | 89 | Boolean curBValue = null; |
87 | 90 | String curSValue = null; |
91 | + String curJValue = null; | |
88 | 92 | |
89 | 93 | long longCount = row.getLong(LONG_CNT_POS); |
90 | 94 | long doubleCount = row.getLong(DOUBLE_CNT_POS); |
91 | 95 | long boolCount = row.getLong(BOOL_CNT_POS); |
92 | 96 | long strCount = row.getLong(STR_CNT_POS); |
97 | + long jsonCount = row.getLong(JSON_CNT_POS); | |
93 | 98 | |
94 | 99 | if (longCount > 0 || doubleCount > 0) { |
95 | 100 | if (longCount > 0) { |
... | ... | @@ -111,6 +116,10 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct |
111 | 116 | aggResult.dataType = DataType.STRING; |
112 | 117 | curCount = strCount; |
113 | 118 | curSValue = getStringValue(row); |
119 | + } else if (jsonCount > 0) { | |
120 | + aggResult.dataType = DataType.JSON; | |
121 | + curCount = jsonCount; | |
122 | + curJValue = getJsonValue(row); | |
114 | 123 | } else { |
115 | 124 | return; |
116 | 125 | } |
... | ... | @@ -120,9 +129,9 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct |
120 | 129 | } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) { |
121 | 130 | processAvgOrSumAggregation(aggResult, curCount, curLValue, curDValue); |
122 | 131 | } else if (aggregation == Aggregation.MIN) { |
123 | - processMinAggregation(aggResult, curLValue, curDValue, curBValue, curSValue); | |
132 | + processMinAggregation(aggResult, curLValue, curDValue, curBValue, curSValue, curJValue); | |
124 | 133 | } else if (aggregation == Aggregation.MAX) { |
125 | - processMaxAggregation(aggResult, curLValue, curDValue, curBValue, curSValue); | |
134 | + processMaxAggregation(aggResult, curLValue, curDValue, curBValue, curSValue, curJValue); | |
126 | 135 | } |
127 | 136 | } |
128 | 137 | |
... | ... | @@ -136,7 +145,7 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct |
136 | 145 | } |
137 | 146 | } |
138 | 147 | |
139 | - private void processMinAggregation(AggregationResult aggResult, Long curLValue, Double curDValue, Boolean curBValue, String curSValue) { | |
148 | + private void processMinAggregation(AggregationResult aggResult, Long curLValue, Double curDValue, Boolean curBValue, String curSValue, String curJValue) { | |
140 | 149 | if (curDValue != null || curLValue != null) { |
141 | 150 | if (curDValue != null) { |
142 | 151 | aggResult.dValue = aggResult.dValue == null ? curDValue : Math.min(aggResult.dValue, curDValue); |
... | ... | @@ -148,10 +157,12 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct |
148 | 157 | aggResult.bValue = aggResult.bValue == null ? curBValue : aggResult.bValue && curBValue; |
149 | 158 | } else if (curSValue != null && (aggResult.sValue == null || curSValue.compareTo(aggResult.sValue) < 0)) { |
150 | 159 | aggResult.sValue = curSValue; |
160 | + } else if (curJValue != null && (aggResult.jValue == null || curJValue.compareTo(aggResult.jValue) < 0)) { | |
161 | + aggResult.jValue = curJValue; | |
151 | 162 | } |
152 | 163 | } |
153 | 164 | |
154 | - private void processMaxAggregation(AggregationResult aggResult, Long curLValue, Double curDValue, Boolean curBValue, String curSValue) { | |
165 | + private void processMaxAggregation(AggregationResult aggResult, Long curLValue, Double curDValue, Boolean curBValue, String curSValue, String curJValue) { | |
155 | 166 | if (curDValue != null || curLValue != null) { |
156 | 167 | if (curDValue != null) { |
157 | 168 | aggResult.dValue = aggResult.dValue == null ? curDValue : Math.max(aggResult.dValue, curDValue); |
... | ... | @@ -163,6 +174,8 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct |
163 | 174 | aggResult.bValue = aggResult.bValue == null ? curBValue : aggResult.bValue || curBValue; |
164 | 175 | } else if (curSValue != null && (aggResult.sValue == null || curSValue.compareTo(aggResult.sValue) > 0)) { |
165 | 176 | aggResult.sValue = curSValue; |
177 | + } else if (curJValue != null && (aggResult.jValue == null || curJValue.compareTo(aggResult.jValue) > 0)) { | |
178 | + aggResult.jValue = curJValue; | |
166 | 179 | } |
167 | 180 | } |
168 | 181 | |
... | ... | @@ -182,6 +195,14 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct |
182 | 195 | } |
183 | 196 | } |
184 | 197 | |
198 | + private String getJsonValue(Row row) { | |
199 | + if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) { | |
200 | + return row.getString(JSON_POS); | |
201 | + } else { | |
202 | + return null; | |
203 | + } | |
204 | + } | |
205 | + | |
185 | 206 | private Long getLongValue(Row row) { |
186 | 207 | if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX |
187 | 208 | || aggregation == Aggregation.SUM || aggregation == Aggregation.AVG) { |
... | ... | @@ -223,7 +244,7 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct |
223 | 244 | if (aggResult.count == 0 || (aggResult.dataType == DataType.DOUBLE && aggResult.dValue == null) || (aggResult.dataType == DataType.LONG && aggResult.lValue == null)) { |
224 | 245 | return Optional.empty(); |
225 | 246 | } else if (aggResult.dataType == DataType.DOUBLE || aggResult.dataType == DataType.LONG) { |
226 | - if(aggregation == Aggregation.AVG || aggResult.hasDouble) { | |
247 | + if (aggregation == Aggregation.AVG || aggResult.hasDouble) { | |
227 | 248 | double sum = Optional.ofNullable(aggResult.dValue).orElse(0.0d) + Optional.ofNullable(aggResult.lValue).orElse(0L); |
228 | 249 | return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? sum : (sum / aggResult.count)))); |
229 | 250 | } else { |
... | ... | @@ -235,15 +256,17 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct |
235 | 256 | |
236 | 257 | private Optional<TsKvEntry> processMinOrMaxResult(AggregationResult aggResult) { |
237 | 258 | if (aggResult.dataType == DataType.DOUBLE || aggResult.dataType == DataType.LONG) { |
238 | - if(aggResult.hasDouble) { | |
259 | + if (aggResult.hasDouble) { | |
239 | 260 | double currentD = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.dValue).orElse(Double.MAX_VALUE) : Optional.ofNullable(aggResult.dValue).orElse(Double.MIN_VALUE); |
240 | 261 | double currentL = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.lValue).orElse(Long.MAX_VALUE) : Optional.ofNullable(aggResult.lValue).orElse(Long.MIN_VALUE); |
241 | 262 | return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.MIN ? Math.min(currentD, currentL) : Math.max(currentD, currentL)))); |
242 | 263 | } else { |
243 | 264 | return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggResult.lValue))); |
244 | 265 | } |
245 | - } else if (aggResult.dataType == DataType.STRING) { | |
266 | + } else if (aggResult.dataType == DataType.STRING) { | |
246 | 267 | return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, aggResult.sValue))); |
268 | + } else if (aggResult.dataType == DataType.JSON) { | |
269 | + return Optional.of(new BasicTsKvEntry(ts, new JsonDataEntry(key, aggResult.jValue))); | |
247 | 270 | } else { |
248 | 271 | return Optional.of(new BasicTsKvEntry(ts, new BooleanDataEntry(key, aggResult.bValue))); |
249 | 272 | } |
... | ... | @@ -253,6 +276,7 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct |
253 | 276 | DataType dataType = null; |
254 | 277 | Boolean bValue = null; |
255 | 278 | String sValue = null; |
279 | + String jValue = null; | |
256 | 280 | Double dValue = null; |
257 | 281 | Long lValue = null; |
258 | 282 | long count = 0; | ... | ... |
... | ... | @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.FutureCallback; |
29 | 29 | import com.google.common.util.concurrent.Futures; |
30 | 30 | import com.google.common.util.concurrent.ListenableFuture; |
31 | 31 | import lombok.extern.slf4j.Slf4j; |
32 | +import org.apache.commons.lang3.StringUtils; | |
32 | 33 | import org.springframework.beans.factory.annotation.Autowired; |
33 | 34 | import org.springframework.beans.factory.annotation.Value; |
34 | 35 | import org.springframework.core.env.Environment; |
... | ... | @@ -42,6 +43,7 @@ import org.thingsboard.server.common.data.kv.BooleanDataEntry; |
42 | 43 | import org.thingsboard.server.common.data.kv.DataType; |
43 | 44 | import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; |
44 | 45 | import org.thingsboard.server.common.data.kv.DoubleDataEntry; |
46 | +import org.thingsboard.server.common.data.kv.JsonDataEntry; | |
45 | 47 | import org.thingsboard.server.common.data.kv.KvEntry; |
46 | 48 | import org.thingsboard.server.common.data.kv.LongDataEntry; |
47 | 49 | import org.thingsboard.server.common.data.kv.ReadTsKvQuery; |
... | ... | @@ -337,21 +339,31 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem |
337 | 339 | futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN)); |
338 | 340 | futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE)); |
339 | 341 | futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING)); |
342 | + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.JSON)); | |
340 | 343 | break; |
341 | 344 | case BOOLEAN: |
342 | 345 | futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE)); |
343 | 346 | futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG)); |
344 | 347 | futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING)); |
348 | + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.JSON)); | |
345 | 349 | break; |
346 | 350 | case DOUBLE: |
347 | 351 | futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN)); |
348 | 352 | futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG)); |
349 | 353 | futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING)); |
354 | + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.JSON)); | |
350 | 355 | break; |
351 | 356 | case STRING: |
352 | 357 | futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN)); |
353 | 358 | futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE)); |
354 | 359 | futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG)); |
360 | + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.JSON)); | |
361 | + break; | |
362 | + case JSON: | |
363 | + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN)); | |
364 | + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE)); | |
365 | + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG)); | |
366 | + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING)); | |
355 | 367 | break; |
356 | 368 | } |
357 | 369 | } |
... | ... | @@ -411,6 +423,13 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem |
411 | 423 | .set(5, tsKvEntry.getStrValue().orElse(null), String.class) |
412 | 424 | .set(6, tsKvEntry.getLongValue().orElse(null), Long.class) |
413 | 425 | .set(7, tsKvEntry.getDoubleValue().orElse(null), Double.class); |
426 | + Optional<String> jsonV = tsKvEntry.getJsonValue(); | |
427 | + if (jsonV.isPresent()) { | |
428 | + stmt.setString(8, tsKvEntry.getJsonValue().get()); | |
429 | + } else { | |
430 | + stmt.setToNull(8); | |
431 | + } | |
432 | + | |
414 | 433 | return getFuture(executeAsyncWrite(tenantId, stmt), rs -> null); |
415 | 434 | } |
416 | 435 | |
... | ... | @@ -669,7 +688,12 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem |
669 | 688 | if (boolV != null) { |
670 | 689 | kvEntry = new BooleanDataEntry(key, boolV); |
671 | 690 | } else { |
672 | - log.warn("All values in key-value row are nullable "); | |
691 | + String jsonV = row.get(ModelConstants.JSON_VALUE_COLUMN, String.class); | |
692 | + if (StringUtils.isNoneEmpty(jsonV)) { | |
693 | + kvEntry = new JsonDataEntry(key, jsonV); | |
694 | + } else { | |
695 | + log.warn("All values in key-value row are nullable "); | |
696 | + } | |
673 | 697 | } |
674 | 698 | } |
675 | 699 | } |
... | ... | @@ -772,8 +796,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem |
772 | 796 | "," + ModelConstants.BOOLEAN_VALUE_COLUMN + |
773 | 797 | "," + ModelConstants.STRING_VALUE_COLUMN + |
774 | 798 | "," + ModelConstants.LONG_VALUE_COLUMN + |
775 | - "," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" + | |
776 | - " VALUES(?, ?, ?, ?, ?, ?, ?, ?)"); | |
799 | + "," + ModelConstants.DOUBLE_VALUE_COLUMN + | |
800 | + "," + ModelConstants.JSON_VALUE_COLUMN + ")" + | |
801 | + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)"); | |
777 | 802 | } |
778 | 803 | return latestInsertStmt; |
779 | 804 | } |
... | ... | @@ -812,7 +837,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem |
812 | 837 | ModelConstants.STRING_VALUE_COLUMN + "," + |
813 | 838 | ModelConstants.BOOLEAN_VALUE_COLUMN + "," + |
814 | 839 | ModelConstants.LONG_VALUE_COLUMN + "," + |
815 | - ModelConstants.DOUBLE_VALUE_COLUMN + " " + | |
840 | + ModelConstants.DOUBLE_VALUE_COLUMN + "," + | |
841 | + ModelConstants.JSON_VALUE_COLUMN + " " + | |
816 | 842 | "FROM " + ModelConstants.TS_KV_LATEST_CF + " " + |
817 | 843 | "WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM + |
818 | 844 | "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM + |
... | ... | @@ -829,7 +855,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem |
829 | 855 | ModelConstants.STRING_VALUE_COLUMN + "," + |
830 | 856 | ModelConstants.BOOLEAN_VALUE_COLUMN + "," + |
831 | 857 | ModelConstants.LONG_VALUE_COLUMN + "," + |
832 | - ModelConstants.DOUBLE_VALUE_COLUMN + " " + | |
858 | + ModelConstants.DOUBLE_VALUE_COLUMN + "," + | |
859 | + ModelConstants.JSON_VALUE_COLUMN + " " + | |
833 | 860 | "FROM " + ModelConstants.TS_KV_LATEST_CF + " " + |
834 | 861 | "WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM + |
835 | 862 | "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM); |
... | ... | @@ -847,6 +874,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem |
847 | 874 | return ModelConstants.LONG_VALUE_COLUMN; |
848 | 875 | case DOUBLE: |
849 | 876 | return ModelConstants.DOUBLE_VALUE_COLUMN; |
877 | + case JSON: | |
878 | + return ModelConstants.JSON_VALUE_COLUMN; | |
850 | 879 | default: |
851 | 880 | throw new RuntimeException("Not implemented!"); |
852 | 881 | } |
... | ... | @@ -856,27 +885,23 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem |
856 | 885 | switch (kvEntry.getDataType()) { |
857 | 886 | case BOOLEAN: |
858 | 887 | Optional<Boolean> booleanValue = kvEntry.getBooleanValue(); |
859 | - if (booleanValue.isPresent()) { | |
860 | - stmt.setBool(column, booleanValue.get().booleanValue()); | |
861 | - } | |
888 | + booleanValue.ifPresent(b -> stmt.setBool(column, b)); | |
862 | 889 | break; |
863 | 890 | case STRING: |
864 | 891 | Optional<String> stringValue = kvEntry.getStrValue(); |
865 | - if (stringValue.isPresent()) { | |
866 | - stmt.setString(column, stringValue.get()); | |
867 | - } | |
892 | + stringValue.ifPresent(s -> stmt.setString(column, s)); | |
868 | 893 | break; |
869 | 894 | case LONG: |
870 | 895 | Optional<Long> longValue = kvEntry.getLongValue(); |
871 | - if (longValue.isPresent()) { | |
872 | - stmt.setLong(column, longValue.get().longValue()); | |
873 | - } | |
896 | + longValue.ifPresent(l -> stmt.setLong(column, l)); | |
874 | 897 | break; |
875 | 898 | case DOUBLE: |
876 | 899 | Optional<Double> doubleValue = kvEntry.getDoubleValue(); |
877 | - if (doubleValue.isPresent()) { | |
878 | - stmt.setDouble(column, doubleValue.get().doubleValue()); | |
879 | - } | |
900 | + doubleValue.ifPresent(d -> stmt.setDouble(column, d)); | |
901 | + break; | |
902 | + case JSON: | |
903 | + Optional<String> jsonValue = kvEntry.getJsonValue(); | |
904 | + jsonValue.ifPresent(jsonObject -> stmt.setString(column, jsonObject)); | |
880 | 905 | break; |
881 | 906 | } |
882 | 907 | } | ... | ... |
... | ... | @@ -410,6 +410,7 @@ CREATE TABLE IF NOT EXISTS thingsboard.attributes_kv_cf ( |
410 | 410 | str_v text, |
411 | 411 | long_v bigint, |
412 | 412 | dbl_v double, |
413 | + json_v text, | |
413 | 414 | last_update_ts bigint, |
414 | 415 | PRIMARY KEY ((entity_type, entity_id, attribute_type), attribute_key) |
415 | 416 | ) WITH compaction = { 'class' : 'LeveledCompactionStrategy' }; | ... | ... |
... | ... | @@ -30,6 +30,7 @@ CREATE TABLE IF NOT EXISTS thingsboard.ts_kv_cf ( |
30 | 30 | str_v text, |
31 | 31 | long_v bigint, |
32 | 32 | dbl_v double, |
33 | + json_v text, | |
33 | 34 | PRIMARY KEY (( entity_type, entity_id, key, partition ), ts) |
34 | 35 | ); |
35 | 36 | |
... | ... | @@ -51,5 +52,6 @@ CREATE TABLE IF NOT EXISTS thingsboard.ts_kv_latest_cf ( |
51 | 52 | str_v text, |
52 | 53 | long_v bigint, |
53 | 54 | dbl_v double, |
55 | + json_v text, | |
54 | 56 | PRIMARY KEY (( entity_type, entity_id ), key) |
55 | 57 | ) WITH compaction = { 'class' : 'LeveledCompactionStrategy' }; | ... | ... |
1 | +-- | |
2 | +-- Copyright © 2016-2020 The Thingsboard Authors | |
3 | +-- | |
4 | +-- Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | +-- you may not use this file except in compliance with the License. | |
6 | +-- You may obtain a copy of the License at | |
7 | +-- | |
8 | +-- http://www.apache.org/licenses/LICENSE-2.0 | |
9 | +-- | |
10 | +-- Unless required by applicable law or agreed to in writing, software | |
11 | +-- distributed under the License is distributed on an "AS IS" BASIS, | |
12 | +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | +-- See the License for the specific language governing permissions and | |
14 | +-- limitations under the License. | |
15 | +-- | |
16 | + | |
17 | + | |
18 | +CREATE TABLE IF NOT EXISTS admin_settings ( | |
19 | + id varchar(31) NOT NULL CONSTRAINT admin_settings_pkey PRIMARY KEY, | |
20 | + json_value varchar, | |
21 | + key varchar(255) | |
22 | +); | |
23 | + | |
24 | +CREATE TABLE IF NOT EXISTS alarm ( | |
25 | + id varchar(31) NOT NULL CONSTRAINT alarm_pkey PRIMARY KEY, | |
26 | + ack_ts bigint, | |
27 | + clear_ts bigint, | |
28 | + additional_info varchar, | |
29 | + end_ts bigint, | |
30 | + originator_id varchar(31), | |
31 | + originator_type integer, | |
32 | + propagate boolean, | |
33 | + severity varchar(255), | |
34 | + start_ts bigint, | |
35 | + status varchar(255), | |
36 | + tenant_id varchar(31), | |
37 | + propagate_relation_types varchar, | |
38 | + type varchar(255) | |
39 | +); | |
40 | + | |
41 | +CREATE TABLE IF NOT EXISTS asset ( | |
42 | + id varchar(31) NOT NULL CONSTRAINT asset_pkey PRIMARY KEY, | |
43 | + additional_info varchar, | |
44 | + customer_id varchar(31), | |
45 | + name varchar(255), | |
46 | + label varchar(255), | |
47 | + search_text varchar(255), | |
48 | + tenant_id varchar(31), | |
49 | + type varchar(255), | |
50 | + CONSTRAINT asset_name_unq_key UNIQUE (tenant_id, name) | |
51 | +); | |
52 | + | |
53 | +CREATE TABLE IF NOT EXISTS audit_log ( | |
54 | + id varchar(31) NOT NULL CONSTRAINT audit_log_pkey PRIMARY KEY, | |
55 | + tenant_id varchar(31), | |
56 | + customer_id varchar(31), | |
57 | + entity_id varchar(31), | |
58 | + entity_type varchar(255), | |
59 | + entity_name varchar(255), | |
60 | + user_id varchar(31), | |
61 | + user_name varchar(255), | |
62 | + action_type varchar(255), | |
63 | + action_data varchar(1000000), | |
64 | + action_status varchar(255), | |
65 | + action_failure_details varchar(1000000) | |
66 | +); | |
67 | + | |
68 | +CREATE TABLE IF NOT EXISTS attribute_kv ( | |
69 | + entity_type varchar(255), | |
70 | + entity_id varchar(31), | |
71 | + attribute_type varchar(255), | |
72 | + attribute_key varchar(255), | |
73 | + bool_v boolean, | |
74 | + str_v varchar(10000000), | |
75 | + long_v bigint, | |
76 | + dbl_v double precision, | |
77 | + json_v varchar(10000000), | |
78 | + last_update_ts bigint, | |
79 | + CONSTRAINT attribute_kv_pkey PRIMARY KEY (entity_type, entity_id, attribute_type, attribute_key) | |
80 | +); | |
81 | + | |
82 | +CREATE TABLE IF NOT EXISTS component_descriptor ( | |
83 | + id varchar(31) NOT NULL CONSTRAINT component_descriptor_pkey PRIMARY KEY, | |
84 | + actions varchar(255), | |
85 | + clazz varchar UNIQUE, | |
86 | + configuration_descriptor varchar, | |
87 | + name varchar(255), | |
88 | + scope varchar(255), | |
89 | + search_text varchar(255), | |
90 | + type varchar(255) | |
91 | +); | |
92 | + | |
93 | +CREATE TABLE IF NOT EXISTS customer ( | |
94 | + id varchar(31) NOT NULL CONSTRAINT customer_pkey PRIMARY KEY, | |
95 | + additional_info varchar, | |
96 | + address varchar, | |
97 | + address2 varchar, | |
98 | + city varchar(255), | |
99 | + country varchar(255), | |
100 | + email varchar(255), | |
101 | + phone varchar(255), | |
102 | + search_text varchar(255), | |
103 | + state varchar(255), | |
104 | + tenant_id varchar(31), | |
105 | + title varchar(255), | |
106 | + zip varchar(255) | |
107 | +); | |
108 | + | |
109 | +CREATE TABLE IF NOT EXISTS dashboard ( | |
110 | + id varchar(31) NOT NULL CONSTRAINT dashboard_pkey PRIMARY KEY, | |
111 | + configuration varchar(10000000), | |
112 | + assigned_customers varchar(1000000), | |
113 | + search_text varchar(255), | |
114 | + tenant_id varchar(31), | |
115 | + title varchar(255) | |
116 | +); | |
117 | + | |
118 | +CREATE TABLE IF NOT EXISTS device ( | |
119 | + id varchar(31) NOT NULL CONSTRAINT device_pkey PRIMARY KEY, | |
120 | + additional_info varchar, | |
121 | + customer_id varchar(31), | |
122 | + type varchar(255), | |
123 | + name varchar(255), | |
124 | + label varchar(255), | |
125 | + search_text varchar(255), | |
126 | + tenant_id varchar(31), | |
127 | + CONSTRAINT device_name_unq_key UNIQUE (tenant_id, name) | |
128 | +); | |
129 | + | |
130 | +CREATE TABLE IF NOT EXISTS device_credentials ( | |
131 | + id varchar(31) NOT NULL CONSTRAINT device_credentials_pkey PRIMARY KEY, | |
132 | + credentials_id varchar, | |
133 | + credentials_type varchar(255), | |
134 | + credentials_value varchar, | |
135 | + device_id varchar(31), | |
136 | + CONSTRAINT device_credentials_id_unq_key UNIQUE (credentials_id) | |
137 | +); | |
138 | + | |
139 | +CREATE TABLE IF NOT EXISTS event ( | |
140 | + id varchar(31) NOT NULL CONSTRAINT event_pkey PRIMARY KEY, | |
141 | + body varchar(10000000), | |
142 | + entity_id varchar(31), | |
143 | + entity_type varchar(255), | |
144 | + event_type varchar(255), | |
145 | + event_uid varchar(255), | |
146 | + tenant_id varchar(31), | |
147 | + CONSTRAINT event_unq_key UNIQUE (tenant_id, entity_type, entity_id, event_type, event_uid) | |
148 | +); | |
149 | + | |
150 | +CREATE TABLE IF NOT EXISTS relation ( | |
151 | + from_id varchar(31), | |
152 | + from_type varchar(255), | |
153 | + to_id varchar(31), | |
154 | + to_type varchar(255), | |
155 | + relation_type_group varchar(255), | |
156 | + relation_type varchar(255), | |
157 | + additional_info varchar, | |
158 | + CONSTRAINT relation_pkey PRIMARY KEY (from_id, from_type, relation_type_group, relation_type, to_id, to_type) | |
159 | +); | |
160 | + | |
161 | +CREATE TABLE IF NOT EXISTS tb_user ( | |
162 | + id varchar(31) NOT NULL CONSTRAINT tb_user_pkey PRIMARY KEY, | |
163 | + additional_info varchar, | |
164 | + authority varchar(255), | |
165 | + customer_id varchar(31), | |
166 | + email varchar(255) UNIQUE, | |
167 | + first_name varchar(255), | |
168 | + last_name varchar(255), | |
169 | + search_text varchar(255), | |
170 | + tenant_id varchar(31) | |
171 | +); | |
172 | + | |
173 | +CREATE TABLE IF NOT EXISTS tenant ( | |
174 | + id varchar(31) NOT NULL CONSTRAINT tenant_pkey PRIMARY KEY, | |
175 | + additional_info varchar, | |
176 | + address varchar, | |
177 | + address2 varchar, | |
178 | + city varchar(255), | |
179 | + country varchar(255), | |
180 | + email varchar(255), | |
181 | + phone varchar(255), | |
182 | + region varchar(255), | |
183 | + search_text varchar(255), | |
184 | + state varchar(255), | |
185 | + title varchar(255), | |
186 | + zip varchar(255) | |
187 | +); | |
188 | + | |
189 | +CREATE TABLE IF NOT EXISTS user_credentials ( | |
190 | + id varchar(31) NOT NULL CONSTRAINT user_credentials_pkey PRIMARY KEY, | |
191 | + activate_token varchar(255) UNIQUE, | |
192 | + enabled boolean, | |
193 | + password varchar(255), | |
194 | + reset_token varchar(255) UNIQUE, | |
195 | + user_id varchar(31) UNIQUE | |
196 | +); | |
197 | + | |
198 | +CREATE TABLE IF NOT EXISTS widget_type ( | |
199 | + id varchar(31) NOT NULL CONSTRAINT widget_type_pkey PRIMARY KEY, | |
200 | + alias varchar(255), | |
201 | + bundle_alias varchar(255), | |
202 | + descriptor varchar(1000000), | |
203 | + name varchar(255), | |
204 | + tenant_id varchar(31) | |
205 | +); | |
206 | + | |
207 | +CREATE TABLE IF NOT EXISTS widgets_bundle ( | |
208 | + id varchar(31) NOT NULL CONSTRAINT widgets_bundle_pkey PRIMARY KEY, | |
209 | + alias varchar(255), | |
210 | + search_text varchar(255), | |
211 | + tenant_id varchar(31), | |
212 | + title varchar(255) | |
213 | +); | |
214 | + | |
215 | +CREATE TABLE IF NOT EXISTS rule_chain ( | |
216 | + id varchar(31) NOT NULL CONSTRAINT rule_chain_pkey PRIMARY KEY, | |
217 | + additional_info varchar, | |
218 | + configuration varchar(10000000), | |
219 | + name varchar(255), | |
220 | + first_rule_node_id varchar(31), | |
221 | + root boolean, | |
222 | + debug_mode boolean, | |
223 | + search_text varchar(255), | |
224 | + tenant_id varchar(31) | |
225 | +); | |
226 | + | |
227 | +CREATE TABLE IF NOT EXISTS rule_node ( | |
228 | + id varchar(31) NOT NULL CONSTRAINT rule_node_pkey PRIMARY KEY, | |
229 | + rule_chain_id varchar(31), | |
230 | + additional_info varchar, | |
231 | + configuration varchar(10000000), | |
232 | + type varchar(255), | |
233 | + name varchar(255), | |
234 | + debug_mode boolean, | |
235 | + search_text varchar(255) | |
236 | +); | |
237 | + | |
238 | +CREATE TABLE IF NOT EXISTS entity_view ( | |
239 | + id varchar(31) NOT NULL CONSTRAINT entity_view_pkey PRIMARY KEY, | |
240 | + entity_id varchar(31), | |
241 | + entity_type varchar(255), | |
242 | + tenant_id varchar(31), | |
243 | + customer_id varchar(31), | |
244 | + type varchar(255), | |
245 | + name varchar(255), | |
246 | + keys varchar(10000000), | |
247 | + start_ts bigint, | |
248 | + end_ts bigint, | |
249 | + search_text varchar(255), | |
250 | + additional_info varchar | |
251 | +); | ... | ... |
... | ... | @@ -74,6 +74,7 @@ CREATE TABLE IF NOT EXISTS attribute_kv ( |
74 | 74 | str_v varchar(10000000), |
75 | 75 | long_v bigint, |
76 | 76 | dbl_v double precision, |
77 | + json_v json, | |
77 | 78 | last_update_ts bigint, |
78 | 79 | CONSTRAINT attribute_kv_pkey PRIMARY KEY (entity_type, entity_id, attribute_type, attribute_key) |
79 | 80 | ); | ... | ... |
... | ... | @@ -25,6 +25,7 @@ CREATE TABLE IF NOT EXISTS tenant_ts_kv ( |
25 | 25 | str_v varchar(10000000), |
26 | 26 | long_v bigint, |
27 | 27 | dbl_v double precision, |
28 | + json_v json, | |
28 | 29 | CONSTRAINT tenant_ts_kv_pkey PRIMARY KEY (tenant_id, entity_id, key, ts) |
29 | 30 | ); |
30 | 31 | |
... | ... | @@ -42,5 +43,6 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest ( |
42 | 43 | str_v varchar(10000000), |
43 | 44 | long_v bigint, |
44 | 45 | dbl_v double precision, |
46 | + json_v json, | |
45 | 47 | CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key) |
46 | 48 | ); |
\ No newline at end of file | ... | ... |
... | ... | @@ -24,6 +24,7 @@ CREATE TABLE IF NOT EXISTS ts_kv ( |
24 | 24 | str_v varchar(10000000), |
25 | 25 | long_v bigint, |
26 | 26 | dbl_v double precision, |
27 | + json_v varchar(10000000), | |
27 | 28 | CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_id, key, ts) |
28 | 29 | ); |
29 | 30 | |
... | ... | @@ -35,6 +36,7 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest ( |
35 | 36 | str_v varchar(10000000), |
36 | 37 | long_v bigint, |
37 | 38 | dbl_v double precision, |
39 | + json_v varchar(10000000), | |
38 | 40 | CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key) |
39 | 41 | ); |
40 | 42 | ... | ... |
... | ... | @@ -21,7 +21,8 @@ CREATE TABLE IF NOT EXISTS ts_kv ( |
21 | 21 | bool_v boolean, |
22 | 22 | str_v varchar(10000000), |
23 | 23 | long_v bigint, |
24 | - dbl_v double precision | |
24 | + dbl_v double precision, | |
25 | + json_v json | |
25 | 26 | ) PARTITION BY RANGE (ts); |
26 | 27 | |
27 | 28 | CREATE TABLE IF NOT EXISTS ts_kv_latest ( |
... | ... | @@ -32,6 +33,7 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest ( |
32 | 33 | str_v varchar(10000000), |
33 | 34 | long_v bigint, |
34 | 35 | dbl_v double precision, |
36 | + json_v json, | |
35 | 37 | CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key) |
36 | 38 | ); |
37 | 39 | ... | ... |
... | ... | @@ -30,7 +30,7 @@ public class JpaDaoTestSuite { |
30 | 30 | |
31 | 31 | @ClassRule |
32 | 32 | public static CustomSqlUnit sqlUnit = new CustomSqlUnit( |
33 | - Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities.sql", "sql/system-data.sql"), | |
33 | + Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"), | |
34 | 34 | "sql/drop-all-tables.sql", |
35 | 35 | "sql-test.properties" |
36 | 36 | ); | ... | ... |
... | ... | @@ -30,7 +30,7 @@ public class SqlDaoServiceTestSuite { |
30 | 30 | |
31 | 31 | @ClassRule |
32 | 32 | public static CustomSqlUnit sqlUnit = new CustomSqlUnit( |
33 | - Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql", "sql/system-test.sql"), | |
33 | + Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql", "sql/system-test.sql"), | |
34 | 34 | "sql/drop-all-tables.sql", |
35 | 35 | "sql-test.properties" |
36 | 36 | ); | ... | ... |
... | ... | @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; |
21 | 21 | import com.fasterxml.jackson.databind.node.ObjectNode; |
22 | 22 | import com.google.common.util.concurrent.Futures; |
23 | 23 | import com.google.common.util.concurrent.ListenableFuture; |
24 | +import com.google.gson.JsonParseException; | |
24 | 25 | import org.apache.commons.collections.CollectionUtils; |
25 | 26 | import org.apache.commons.lang3.BooleanUtils; |
26 | 27 | import org.thingsboard.rule.engine.api.TbContext; |
... | ... | @@ -33,6 +34,7 @@ import org.thingsboard.server.common.data.kv.KvEntry; |
33 | 34 | import org.thingsboard.server.common.data.kv.TsKvEntry; |
34 | 35 | import org.thingsboard.server.common.msg.TbMsg; |
35 | 36 | |
37 | +import java.io.IOException; | |
36 | 38 | import java.util.ArrayList; |
37 | 39 | import java.util.List; |
38 | 40 | import java.util.concurrent.ConcurrentHashMap; |
... | ... | @@ -77,7 +79,8 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC |
77 | 79 | } |
78 | 80 | |
79 | 81 | @Override |
80 | - public void destroy() { } | |
82 | + public void destroy() { | |
83 | + } | |
81 | 84 | |
82 | 85 | protected abstract ListenableFuture<T> findEntityIdAsync(TbContext ctx, TbMsg msg); |
83 | 86 | |
... | ... | @@ -168,6 +171,12 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC |
168 | 171 | case DOUBLE: |
169 | 172 | value.put(VALUE, r.getDoubleValue().get()); |
170 | 173 | break; |
174 | + case JSON: | |
175 | + try { | |
176 | + value.set(VALUE, mapper.readTree(r.getJsonValue().get())); | |
177 | + } catch (IOException e) { | |
178 | + throw new JsonParseException("Can't parse jsonValue: " + r.getJsonValue().get(), e); | |
179 | + } | |
171 | 180 | } |
172 | 181 | msg.getMetaData().putValue(r.getKey(), value.toString()); |
173 | 182 | } | ... | ... |
... | ... | @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; |
21 | 21 | import com.fasterxml.jackson.databind.node.ArrayNode; |
22 | 22 | import com.fasterxml.jackson.databind.node.ObjectNode; |
23 | 23 | import com.google.common.util.concurrent.ListenableFuture; |
24 | +import com.google.gson.JsonParseException; | |
24 | 25 | import lombok.Data; |
25 | 26 | import lombok.NoArgsConstructor; |
26 | 27 | import lombok.extern.slf4j.Slf4j; |
... | ... | @@ -39,6 +40,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; |
39 | 40 | import org.thingsboard.server.common.data.plugin.ComponentType; |
40 | 41 | import org.thingsboard.server.common.msg.TbMsg; |
41 | 42 | |
43 | +import java.io.IOException; | |
42 | 44 | import java.util.List; |
43 | 45 | import java.util.concurrent.ExecutionException; |
44 | 46 | import java.util.concurrent.TimeUnit; |
... | ... | @@ -180,6 +182,13 @@ public class TbGetTelemetryNode implements TbNode { |
180 | 182 | case DOUBLE: |
181 | 183 | obj.put("value", entry.getDoubleValue().get()); |
182 | 184 | break; |
185 | + case JSON: | |
186 | + try { | |
187 | + obj.set("value", mapper.readTree(entry.getJsonValue().get())); | |
188 | + } catch (IOException e) { | |
189 | + throw new JsonParseException("Can't parse jsonValue: " + entry.getJsonValue().get(), e); | |
190 | + } | |
191 | + break; | |
183 | 192 | } |
184 | 193 | return obj; |
185 | 194 | } | ... | ... |