Showing
1 changed file
with
17 additions
and
14 deletions
1 | /** | 1 | /** |
2 | * Copyright © 2016-2018 The Thingsboard Authors | 2 | * Copyright © 2016-2018 The Thingsboard Authors |
3 | - * | 3 | + * <p> |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); | 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. | 5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at | 6 | * You may obtain a copy of the License at |
7 | - * | ||
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | - * | 7 | + * <p> |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * <p> | ||
10 | * Unless required by applicable law or agreed to in writing, software | 10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, | 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
@@ -26,10 +26,8 @@ import org.thingsboard.rule.engine.api.*; | @@ -26,10 +26,8 @@ import org.thingsboard.rule.engine.api.*; | ||
26 | import org.thingsboard.rule.engine.api.util.DonAsynchron; | 26 | import org.thingsboard.rule.engine.api.util.DonAsynchron; |
27 | import org.thingsboard.rule.engine.api.util.TbNodeUtils; | 27 | import org.thingsboard.rule.engine.api.util.TbNodeUtils; |
28 | import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; | 28 | import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; |
29 | -import org.thingsboard.server.common.data.kv.BaseTsKvQuery; | ||
30 | import org.thingsboard.server.common.data.kv.ReadTsKvQuery; | 29 | import org.thingsboard.server.common.data.kv.ReadTsKvQuery; |
31 | import org.thingsboard.server.common.data.kv.TsKvEntry; | 30 | import org.thingsboard.server.common.data.kv.TsKvEntry; |
32 | -import org.thingsboard.server.common.data.kv.TsKvQuery; | ||
33 | import org.thingsboard.server.common.data.plugin.ComponentType; | 31 | import org.thingsboard.server.common.data.plugin.ComponentType; |
34 | import org.thingsboard.server.common.msg.TbMsg; | 32 | import org.thingsboard.server.common.msg.TbMsg; |
35 | 33 | ||
@@ -39,8 +37,7 @@ import java.util.concurrent.TimeUnit; | @@ -39,8 +37,7 @@ import java.util.concurrent.TimeUnit; | ||
39 | import java.util.stream.Collectors; | 37 | import java.util.stream.Collectors; |
40 | 38 | ||
41 | import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; | 39 | import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; |
42 | -import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL; | ||
43 | -import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE; | 40 | +import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.*; |
44 | import static org.thingsboard.server.common.data.kv.Aggregation.NONE; | 41 | import static org.thingsboard.server.common.data.kv.Aggregation.NONE; |
45 | 42 | ||
46 | /** | 43 | /** |
@@ -64,6 +61,7 @@ public class TbGetTelemetryNode implements TbNode { | @@ -64,6 +61,7 @@ public class TbGetTelemetryNode implements TbNode { | ||
64 | private long endTsOffset; | 61 | private long endTsOffset; |
65 | private int limit; | 62 | private int limit; |
66 | private ObjectMapper mapper; | 63 | private ObjectMapper mapper; |
64 | + private String fetchMode; | ||
67 | 65 | ||
68 | @Override | 66 | @Override |
69 | public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { | 67 | public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { |
@@ -72,6 +70,7 @@ public class TbGetTelemetryNode implements TbNode { | @@ -72,6 +70,7 @@ public class TbGetTelemetryNode implements TbNode { | ||
72 | startTsOffset = TimeUnit.valueOf(config.getStartIntervalTimeUnit()).toMillis(config.getStartInterval()); | 70 | startTsOffset = TimeUnit.valueOf(config.getStartIntervalTimeUnit()).toMillis(config.getStartInterval()); |
73 | endTsOffset = TimeUnit.valueOf(config.getEndIntervalTimeUnit()).toMillis(config.getEndInterval()); | 71 | endTsOffset = TimeUnit.valueOf(config.getEndIntervalTimeUnit()).toMillis(config.getEndInterval()); |
74 | limit = config.getFetchMode().equals(FETCH_MODE_ALL) ? MAX_FETCH_SIZE : 1; | 72 | limit = config.getFetchMode().equals(FETCH_MODE_ALL) ? MAX_FETCH_SIZE : 1; |
73 | + fetchMode = config.getFetchMode(); | ||
75 | mapper = new ObjectMapper(); | 74 | mapper = new ObjectMapper(); |
76 | mapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, false); | 75 | mapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, false); |
77 | mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); | 76 | mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); |
@@ -96,14 +95,18 @@ public class TbGetTelemetryNode implements TbNode { | @@ -96,14 +95,18 @@ public class TbGetTelemetryNode implements TbNode { | ||
96 | } | 95 | } |
97 | } | 96 | } |
98 | 97 | ||
99 | - //TODO: handle direction; | ||
100 | private List<ReadTsKvQuery> buildQueries() { | 98 | private List<ReadTsKvQuery> buildQueries() { |
101 | long ts = System.currentTimeMillis(); | 99 | long ts = System.currentTimeMillis(); |
102 | long startTs = ts - startTsOffset; | 100 | long startTs = ts - startTsOffset; |
103 | long endTs = ts - endTsOffset; | 101 | long endTs = ts - endTsOffset; |
104 | - | 102 | + String orderBy; |
103 | + if (fetchMode.equals(FETCH_MODE_FIRST) || fetchMode.equals(FETCH_MODE_ALL)) { | ||
104 | + orderBy = "ASC"; | ||
105 | + } else { | ||
106 | + orderBy = "DESC"; | ||
107 | + } | ||
105 | return tsKeyNames.stream() | 108 | return tsKeyNames.stream() |
106 | - .map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, limit, NONE)) | 109 | + .map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, limit, NONE, orderBy)) |
107 | .collect(Collectors.toList()); | 110 | .collect(Collectors.toList()); |
108 | } | 111 | } |
109 | 112 | ||
@@ -116,7 +119,7 @@ public class TbGetTelemetryNode implements TbNode { | @@ -116,7 +119,7 @@ public class TbGetTelemetryNode implements TbNode { | ||
116 | } | 119 | } |
117 | 120 | ||
118 | for (String key : tsKeyNames) { | 121 | for (String key : tsKeyNames) { |
119 | - if(resultNode.has(key)){ | 122 | + if (resultNode.has(key)) { |
120 | msg.getMetaData().putValue(key, resultNode.get(key).toString()); | 123 | msg.getMetaData().putValue(key, resultNode.get(key).toString()); |
121 | } | 124 | } |
122 | } | 125 | } |
@@ -127,11 +130,11 @@ public class TbGetTelemetryNode implements TbNode { | @@ -127,11 +130,11 @@ public class TbGetTelemetryNode implements TbNode { | ||
127 | } | 130 | } |
128 | 131 | ||
129 | private void processArray(ObjectNode node, TsKvEntry entry) { | 132 | private void processArray(ObjectNode node, TsKvEntry entry) { |
130 | - if(node.has(entry.getKey())){ | 133 | + if (node.has(entry.getKey())) { |
131 | ArrayNode arrayNode = (ArrayNode) node.get(entry.getKey()); | 134 | ArrayNode arrayNode = (ArrayNode) node.get(entry.getKey()); |
132 | ObjectNode obj = buildNode(entry); | 135 | ObjectNode obj = buildNode(entry); |
133 | arrayNode.add(obj); | 136 | arrayNode.add(obj); |
134 | - }else { | 137 | + } else { |
135 | ArrayNode arrayNode = mapper.createArrayNode(); | 138 | ArrayNode arrayNode = mapper.createArrayNode(); |
136 | ObjectNode obj = buildNode(entry); | 139 | ObjectNode obj = buildNode(entry); |
137 | arrayNode.add(obj); | 140 | arrayNode.add(obj); |