Commit 86779276cc4f047a8cadd11b31b0e1233cc87131

Authored by Valerii Sosliuk
Committed by Andrew Shvayka
1 parent e318b193

Use metadata keys in originator attribute/telemetry nodes

@@ -17,11 +17,15 @@ package org.thingsboard.rule.engine.api.util; @@ -17,11 +17,15 @@ package org.thingsboard.rule.engine.api.util;
17 17
18 import com.fasterxml.jackson.core.JsonProcessingException; 18 import com.fasterxml.jackson.core.JsonProcessingException;
19 import com.fasterxml.jackson.databind.ObjectMapper; 19 import com.fasterxml.jackson.databind.ObjectMapper;
  20 +import org.springframework.util.CollectionUtils;
20 import org.thingsboard.rule.engine.api.TbNodeConfiguration; 21 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
21 import org.thingsboard.rule.engine.api.TbNodeException; 22 import org.thingsboard.rule.engine.api.TbNodeException;
22 import org.thingsboard.server.common.msg.TbMsgMetaData; 23 import org.thingsboard.server.common.msg.TbMsgMetaData;
23 24
  25 +import java.util.Collections;
  26 +import java.util.List;
24 import java.util.Map; 27 import java.util.Map;
  28 +import java.util.stream.Collectors;
25 29
26 /** 30 /**
27 * Created by ashvayka on 19.01.18. 31 * Created by ashvayka on 19.01.18.
@@ -41,6 +45,13 @@ public class TbNodeUtils { @@ -41,6 +45,13 @@ public class TbNodeUtils {
41 } 45 }
42 } 46 }
43 47
  48 + public static List<String> processPatterns(List<String> patterns, TbMsgMetaData metaData) {
  49 + if (!CollectionUtils.isEmpty(patterns)) {
  50 + return patterns.stream().map(p -> processPattern(p, metaData)).collect(Collectors.toList());
  51 + }
  52 + return Collections.emptyList();
  53 + }
  54 +
44 public static String processPattern(String pattern, TbMsgMetaData metaData) { 55 public static String processPattern(String pattern, TbMsgMetaData metaData) {
45 String result = new String(pattern); 56 String result = new String(pattern);
46 for (Map.Entry<String,String> keyVal : metaData.values().entrySet()) { 57 for (Map.Entry<String,String> keyVal : metaData.values().entrySet()) {
@@ -29,6 +29,7 @@ import org.thingsboard.rule.engine.api.TbContext; @@ -29,6 +29,7 @@ import org.thingsboard.rule.engine.api.TbContext;
29 import org.thingsboard.rule.engine.api.TbNode; 29 import org.thingsboard.rule.engine.api.TbNode;
30 import org.thingsboard.rule.engine.api.TbNodeConfiguration; 30 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
31 import org.thingsboard.rule.engine.api.TbNodeException; 31 import org.thingsboard.rule.engine.api.TbNodeException;
  32 +import org.thingsboard.rule.engine.api.util.TbNodeUtils;
32 import org.thingsboard.server.common.data.id.EntityId; 33 import org.thingsboard.server.common.data.id.EntityId;
33 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 34 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
34 import org.thingsboard.server.common.data.kv.KvEntry; 35 import org.thingsboard.server.common.data.kv.KvEntry;
@@ -91,10 +92,10 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC @@ -91,10 +92,10 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
91 } 92 }
92 ConcurrentHashMap<String, List<String>> failuresMap = new ConcurrentHashMap<>(); 93 ConcurrentHashMap<String, List<String>> failuresMap = new ConcurrentHashMap<>();
93 ListenableFuture<List<Void>> allFutures = Futures.allAsList( 94 ListenableFuture<List<Void>> allFutures = Futures.allAsList(
94 - putLatestTelemetry(ctx, entityId, msg, LATEST_TS, config.getLatestTsKeyNames(), failuresMap),  
95 - putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, config.getClientAttributeNames(), failuresMap, "cs_"),  
96 - putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, config.getSharedAttributeNames(), failuresMap, "shared_"),  
97 - putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, config.getServerAttributeNames(), failuresMap, "ss_") 95 + putLatestTelemetry(ctx, entityId, msg, LATEST_TS, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg.getMetaData()), failuresMap),
  96 + putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg.getMetaData()), failuresMap, "cs_"),
  97 + putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg.getMetaData()), failuresMap, "shared_"),
  98 + putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg.getMetaData()), failuresMap, "ss_")
98 ); 99 );
99 withCallback(allFutures, i -> { 100 withCallback(allFutures, i -> {
100 if (!failuresMap.isEmpty()) { 101 if (!failuresMap.isEmpty()) {
@@ -103,9 +103,10 @@ public class TbGetTelemetryNode implements TbNode { @@ -103,9 +103,10 @@ public class TbGetTelemetryNode implements TbNode {
103 if (config.isUseMetadataIntervalPatterns()) { 103 if (config.isUseMetadataIntervalPatterns()) {
104 checkMetadataKeyPatterns(msg); 104 checkMetadataKeyPatterns(msg);
105 } 105 }
106 - ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(msg)); 106 + List<String> keys = TbNodeUtils.processPatterns(tsKeyNames, msg.getMetaData());
  107 + ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(msg, keys));
107 DonAsynchron.withCallback(list, data -> { 108 DonAsynchron.withCallback(list, data -> {
108 - process(data, msg); 109 + process(data, msg, keys);
109 ctx.tellSuccess(ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData())); 110 ctx.tellSuccess(ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData()));
110 }, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor()); 111 }, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor());
111 } catch (Exception e) { 112 } catch (Exception e) {
@@ -118,8 +119,8 @@ public class TbGetTelemetryNode implements TbNode { @@ -118,8 +119,8 @@ public class TbGetTelemetryNode implements TbNode {
118 public void destroy() { 119 public void destroy() {
119 } 120 }
120 121
121 - private List<ReadTsKvQuery> buildQueries(TbMsg msg) {  
122 - return tsKeyNames.stream() 122 + private List<ReadTsKvQuery> buildQueries(TbMsg msg, List<String> keys) {
  123 + return keys.stream()
123 .map(key -> new BaseReadTsKvQuery(key, getInterval(msg).getStartTs(), getInterval(msg).getEndTs(), 1, limit, NONE, getOrderBy())) 124 .map(key -> new BaseReadTsKvQuery(key, getInterval(msg).getStartTs(), getInterval(msg).getEndTs(), 1, limit, NONE, getOrderBy()))
124 .collect(Collectors.toList()); 125 .collect(Collectors.toList());
125 } 126 }
@@ -135,7 +136,7 @@ public class TbGetTelemetryNode implements TbNode { @@ -135,7 +136,7 @@ public class TbGetTelemetryNode implements TbNode {
135 } 136 }
136 } 137 }
137 138
138 - private void process(List<TsKvEntry> entries, TbMsg msg) { 139 + private void process(List<TsKvEntry> entries, TbMsg msg, List<String> keys) {
139 ObjectNode resultNode = mapper.createObjectNode(); 140 ObjectNode resultNode = mapper.createObjectNode();
140 if (FETCH_MODE_ALL.equals(fetchMode)) { 141 if (FETCH_MODE_ALL.equals(fetchMode)) {
141 entries.forEach(entry -> processArray(resultNode, entry)); 142 entries.forEach(entry -> processArray(resultNode, entry));
@@ -143,7 +144,7 @@ public class TbGetTelemetryNode implements TbNode { @@ -143,7 +144,7 @@ public class TbGetTelemetryNode implements TbNode {
143 entries.forEach(entry -> processSingle(resultNode, entry)); 144 entries.forEach(entry -> processSingle(resultNode, entry));
144 } 145 }
145 146
146 - for (String key : tsKeyNames) { 147 + for (String key : keys) {
147 if (resultNode.has(key)) { 148 if (resultNode.has(key)) {
148 msg.getMetaData().putValue(key, resultNode.get(key).toString()); 149 msg.getMetaData().putValue(key, resultNode.get(key).toString());
149 } 150 }