Commit 2ba9b463feeb77bb4ba611c50051fa65d9bf2bfc

Authored by Andrew Shvayka
Committed by GitHub
2 parents fe511f08 3820d2bb

Merge pull request #5358 from smatvienko-tb/TbGetTelemetryNode_Aggregation_feature

Tb get telemetry node aggregation feature
@@ -15,7 +15,6 @@ @@ -15,7 +15,6 @@
15 */ 15 */
16 package org.thingsboard.rule.engine.metadata; 16 package org.thingsboard.rule.engine.metadata;
17 17
18 -import com.fasterxml.jackson.core.JsonGenerator;  
19 import com.fasterxml.jackson.core.JsonParser; 18 import com.fasterxml.jackson.core.JsonParser;
20 import com.fasterxml.jackson.core.json.JsonWriteFeature; 19 import com.fasterxml.jackson.core.json.JsonWriteFeature;
21 import com.fasterxml.jackson.databind.ObjectMapper; 20 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,6 +34,7 @@ import org.thingsboard.rule.engine.api.TbNode; @@ -35,6 +34,7 @@ import org.thingsboard.rule.engine.api.TbNode;
35 import org.thingsboard.rule.engine.api.TbNodeConfiguration; 34 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
36 import org.thingsboard.rule.engine.api.TbNodeException; 35 import org.thingsboard.rule.engine.api.TbNodeException;
37 import org.thingsboard.rule.engine.api.util.TbNodeUtils; 36 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
  37 +import org.thingsboard.server.common.data.kv.Aggregation;
38 import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; 38 import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
39 import org.thingsboard.server.common.data.kv.ReadTsKvQuery; 39 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
40 import org.thingsboard.server.common.data.kv.TsKvEntry; 40 import org.thingsboard.server.common.data.kv.TsKvEntry;
@@ -50,7 +50,6 @@ import java.util.stream.Collectors; @@ -50,7 +50,6 @@ import java.util.stream.Collectors;
50 import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL; 50 import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL;
51 import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST; 51 import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.FETCH_MODE_FIRST;
52 import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE; 52 import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE;
53 -import static org.thingsboard.server.common.data.kv.Aggregation.NONE;  
54 53
55 /** 54 /**
56 * Created by mshvayka on 04.09.18. 55 * Created by mshvayka on 04.09.18.
@@ -64,6 +63,7 @@ import static org.thingsboard.server.common.data.kv.Aggregation.NONE; @@ -64,6 +63,7 @@ import static org.thingsboard.server.common.data.kv.Aggregation.NONE;
64 "If selected fetch mode <b>ALL</b> Telemetry will be added like array into Message Metadata where <b>key</b> is Timestamp and <b>value</b> is value of Telemetry.</br>" + 63 "If selected fetch mode <b>ALL</b> Telemetry will be added like array into Message Metadata where <b>key</b> is Timestamp and <b>value</b> is value of Telemetry.</br>" +
65 "If selected fetch mode <b>FIRST</b> or <b>LAST</b> Telemetry will be added like string without Timestamp.</br>" + 64 "If selected fetch mode <b>FIRST</b> or <b>LAST</b> Telemetry will be added like string without Timestamp.</br>" +
66 "Also, the rule node allows you to select telemetry sampling order: <b>ASC</b> or <b>DESC</b>. </br>" + 65 "Also, the rule node allows you to select telemetry sampling order: <b>ASC</b> or <b>DESC</b>. </br>" +
  66 + "Aggregation feature allows you to fetch aggregated telemetry as a single value by <b>AVG, COUNT, SUM, MIN, MAX, NONE</b>. </br>" +
67 "<b>Note</b>: The maximum size of the fetched array is 1000 records.\n ", 67 "<b>Note</b>: The maximum size of the fetched array is 1000 records.\n ",
68 uiResources = {"static/rulenode/rulenode-core-config.js"}, 68 uiResources = {"static/rulenode/rulenode-core-config.js"},
69 configDirective = "tbEnrichmentNodeGetTelemetryFromDatabase") 69 configDirective = "tbEnrichmentNodeGetTelemetryFromDatabase")
@@ -78,6 +78,7 @@ public class TbGetTelemetryNode implements TbNode { @@ -78,6 +78,7 @@ public class TbGetTelemetryNode implements TbNode {
78 private ObjectMapper mapper; 78 private ObjectMapper mapper;
79 private String fetchMode; 79 private String fetchMode;
80 private String orderByFetchAll; 80 private String orderByFetchAll;
  81 + private Aggregation aggregation;
81 82
82 @Override 83 @Override
83 public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { 84 public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
@@ -89,11 +90,20 @@ public class TbGetTelemetryNode implements TbNode { @@ -89,11 +90,20 @@ public class TbGetTelemetryNode implements TbNode {
89 if (StringUtils.isEmpty(orderByFetchAll)) { 90 if (StringUtils.isEmpty(orderByFetchAll)) {
90 orderByFetchAll = ASC_ORDER; 91 orderByFetchAll = ASC_ORDER;
91 } 92 }
  93 + aggregation = parseAggregationConfig(config.getAggregation());
  94 +
92 mapper = new ObjectMapper(); 95 mapper = new ObjectMapper();
93 mapper.configure(JsonWriteFeature.QUOTE_FIELD_NAMES.mappedFeature(), false); 96 mapper.configure(JsonWriteFeature.QUOTE_FIELD_NAMES.mappedFeature(), false);
94 mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); 97 mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
95 } 98 }
96 99
  100 + Aggregation parseAggregationConfig(String aggName) {
  101 + if (StringUtils.isEmpty(aggName)) {
  102 + return Aggregation.NONE;
  103 + }
  104 + return Aggregation.valueOf(aggName);
  105 + }
  106 +
97 @Override 107 @Override
98 public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { 108 public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
99 if (tsKeyNames.isEmpty()) { 109 if (tsKeyNames.isEmpty()) {
@@ -120,8 +130,14 @@ public class TbGetTelemetryNode implements TbNode { @@ -120,8 +130,14 @@ public class TbGetTelemetryNode implements TbNode {
120 } 130 }
121 131
122 private List<ReadTsKvQuery> buildQueries(TbMsg msg, List<String> keys) { 132 private List<ReadTsKvQuery> buildQueries(TbMsg msg, List<String> keys) {
  133 + final Interval interval = getInterval(msg);
  134 + final long aggIntervalStep = Aggregation.NONE.equals(aggregation) ? 1 :
  135 + // exact how it validates on BaseTimeseriesService.validate()
  136 + // see CassandraBaseTimeseriesDao.findAllAsync()
  137 + interval.getEndTs() - interval.getStartTs();
  138 +
123 return keys.stream() 139 return keys.stream()
124 - .map(key -> new BaseReadTsKvQuery(key, getInterval(msg).getStartTs(), getInterval(msg).getEndTs(), 1, limit, NONE, getOrderBy())) 140 + .map(key -> new BaseReadTsKvQuery(key, interval.getStartTs(), interval.getEndTs(), aggIntervalStep, limit, aggregation, getOrderBy()))
125 .collect(Collectors.toList()); 141 .collect(Collectors.toList());
126 } 142 }
127 143
@@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.metadata; @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.metadata;
17 17
18 import lombok.Data; 18 import lombok.Data;
19 import org.thingsboard.rule.engine.api.NodeConfiguration; 19 import org.thingsboard.rule.engine.api.NodeConfiguration;
  20 +import org.thingsboard.server.common.data.kv.Aggregation;
20 21
21 import java.util.Collections; 22 import java.util.Collections;
22 import java.util.List; 23 import java.util.List;
@@ -46,6 +47,7 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration<TbGetT @@ -46,6 +47,7 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration<TbGetT
46 private String endIntervalTimeUnit; 47 private String endIntervalTimeUnit;
47 private String fetchMode; //FIRST, LAST, ALL 48 private String fetchMode; //FIRST, LAST, ALL
48 private String orderBy; //ASC, DESC 49 private String orderBy; //ASC, DESC
  50 + private String aggregation; //MIN, MAX, AVG, SUM, COUNT, NONE;
49 private int limit; 51 private int limit;
50 52
51 private List<String> latestTsKeyNames; 53 private List<String> latestTsKeyNames;
@@ -63,6 +65,7 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration<TbGetT @@ -63,6 +65,7 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration<TbGetT
63 configuration.setStartIntervalPattern(""); 65 configuration.setStartIntervalPattern("");
64 configuration.setEndIntervalPattern(""); 66 configuration.setEndIntervalPattern("");
65 configuration.setOrderBy("ASC"); 67 configuration.setOrderBy("ASC");
  68 + configuration.setAggregation(Aggregation.NONE.name());
66 configuration.setLimit(MAX_FETCH_SIZE); 69 configuration.setLimit(MAX_FETCH_SIZE);
67 return configuration; 70 return configuration;
68 } 71 }
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.metadata;
  17 +
  18 +import org.junit.Before;
  19 +import org.junit.Test;
  20 +import org.thingsboard.server.common.data.kv.Aggregation;
  21 +
  22 +import static org.hamcrest.CoreMatchers.is;
  23 +import static org.hamcrest.MatcherAssert.assertThat;
  24 +import static org.mockito.ArgumentMatchers.any;
  25 +import static org.mockito.BDDMockito.willCallRealMethod;
  26 +import static org.mockito.Mockito.mock;
  27 +
  28 +public class TbGetTelemetryNodeTest {
  29 +
  30 + TbGetTelemetryNode node;
  31 +
  32 + @Before
  33 + public void setUp() throws Exception {
  34 + node = mock(TbGetTelemetryNode.class);
  35 + willCallRealMethod().given(node).parseAggregationConfig(any());
  36 + }
  37 +
  38 + @Test
  39 + public void givenAggregationAsString_whenParseAggregation_thenReturnEnum() {
  40 + //compatibility with old configs without "aggregation" parameter
  41 + assertThat(node.parseAggregationConfig(null), is(Aggregation.NONE));
  42 + assertThat(node.parseAggregationConfig(""), is(Aggregation.NONE));
  43 +
  44 + //common values
  45 + assertThat(node.parseAggregationConfig("MIN"), is(Aggregation.MIN));
  46 + assertThat(node.parseAggregationConfig("MAX"), is(Aggregation.MAX));
  47 + assertThat(node.parseAggregationConfig("AVG"), is(Aggregation.AVG));
  48 + assertThat(node.parseAggregationConfig("SUM"), is(Aggregation.SUM));
  49 + assertThat(node.parseAggregationConfig("COUNT"), is(Aggregation.COUNT));
  50 + assertThat(node.parseAggregationConfig("NONE"), is(Aggregation.NONE));
  51 +
  52 + //all possible values in future
  53 + for (Aggregation aggEnum : Aggregation.values()) {
  54 + assertThat(node.parseAggregationConfig(aggEnum.name()), is(aggEnum));
  55 + }
  56 + }
  57 +
  58 + @Test(expected = IllegalArgumentException.class)
  59 + public void givenAggregationWhiteSpace_whenParseAggregation_thenException() {
  60 + node.parseAggregationConfig(" ");
  61 + }
  62 +
  63 + @Test(expected = IllegalArgumentException.class)
  64 + public void givenAggregationIncorrect_whenParseAggregation_thenException() {
  65 + node.parseAggregationConfig("TOP");
  66 + }
  67 +
  68 +}