Commit cc0e5417887266847e4e5ac2cf9b41a8023bdba5

Authored by ShvaykaD
Committed by GitHub
1 parent 0dd313dd

fixed tell-failure for attributes fetch node (#2365)

* fixed tell-failure for attributes fetch node
@@ -27,6 +27,7 @@ public class DataConstants { @@ -27,6 +27,7 @@ public class DataConstants {
27 public static final String CLIENT_SCOPE = "CLIENT_SCOPE"; 27 public static final String CLIENT_SCOPE = "CLIENT_SCOPE";
28 public static final String SERVER_SCOPE = "SERVER_SCOPE"; 28 public static final String SERVER_SCOPE = "SERVER_SCOPE";
29 public static final String SHARED_SCOPE = "SHARED_SCOPE"; 29 public static final String SHARED_SCOPE = "SHARED_SCOPE";
  30 + public static final String LATEST_TS = "LATEST_TS";
30 31
31 public static final String[] allScopes() { 32 public static final String[] allScopes() {
32 return new String[]{CLIENT_SCOPE, SHARED_SCOPE, SERVER_SCOPE}; 33 return new String[]{CLIENT_SCOPE, SHARED_SCOPE, SERVER_SCOPE};
@@ -29,16 +29,20 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -29,16 +29,20 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
29 import org.thingsboard.rule.engine.api.TbNodeException; 29 import org.thingsboard.rule.engine.api.TbNodeException;
30 import org.thingsboard.server.common.data.id.EntityId; 30 import org.thingsboard.server.common.data.id.EntityId;
31 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 31 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  32 +import org.thingsboard.server.common.data.kv.KvEntry;
32 import org.thingsboard.server.common.data.kv.TsKvEntry; 33 import org.thingsboard.server.common.data.kv.TsKvEntry;
33 import org.thingsboard.server.common.msg.TbMsg; 34 import org.thingsboard.server.common.msg.TbMsg;
34 35
  36 +import java.util.ArrayList;
35 import java.util.List; 37 import java.util.List;
36 - 38 +import java.util.concurrent.ConcurrentHashMap;
  39 +import java.util.stream.Collectors;
37 40
38 import static org.thingsboard.common.util.DonAsynchron.withCallback; 41 import static org.thingsboard.common.util.DonAsynchron.withCallback;
39 import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE; 42 import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
40 import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; 43 import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
41 import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE; 44 import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
  45 +import static org.thingsboard.server.common.data.DataConstants.LATEST_TS;
42 import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE; 46 import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
43 import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE; 47 import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
44 48
@@ -82,40 +86,43 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC @@ -82,40 +86,43 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
82 ctx.tellNext(msg, FAILURE); 86 ctx.tellNext(msg, FAILURE);
83 return; 87 return;
84 } 88 }
  89 + ConcurrentHashMap<String, List<String>> failuresMap = new ConcurrentHashMap<>();
85 ListenableFuture<List<Void>> allFutures = Futures.allAsList( 90 ListenableFuture<List<Void>> allFutures = Futures.allAsList(
86 - putLatestTelemetry(ctx, entityId, msg, config.getLatestTsKeyNames()),  
87 - putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs_"),  
88 - putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared_"),  
89 - putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss_") 91 + putLatestTelemetry(ctx, entityId, msg, LATEST_TS, config.getLatestTsKeyNames(), failuresMap),
  92 + putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, config.getClientAttributeNames(), failuresMap, "cs_"),
  93 + putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, config.getSharedAttributeNames(), failuresMap, "shared_"),
  94 + putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, config.getServerAttributeNames(), failuresMap, "ss_")
90 ); 95 );
91 - withCallback(allFutures, i -> ctx.tellNext(msg, SUCCESS), t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor()); 96 + withCallback(allFutures, i -> {
  97 + if (!failuresMap.isEmpty()) {
  98 + throw reportFailures(failuresMap);
  99 + }
  100 + ctx.tellNext(msg, SUCCESS);
  101 + }, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
92 } 102 }
93 103
94 - private ListenableFuture<Void> putAttrAsync(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, String prefix) { 104 + private ListenableFuture<Void> putAttrAsync(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap, String prefix) {
95 if (CollectionUtils.isEmpty(keys)) { 105 if (CollectionUtils.isEmpty(keys)) {
96 return Futures.immediateFuture(null); 106 return Futures.immediateFuture(null);
97 } 107 }
98 - ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(ctx.getTenantId(), entityId, scope, keys);  
99 - return Futures.transform(latest, l -> {  
100 - l.forEach(r -> { 108 + ListenableFuture<List<AttributeKvEntry>> attributeKvEntryListFuture = ctx.getAttributesService().find(ctx.getTenantId(), entityId, scope, keys);
  109 + return Futures.transform(attributeKvEntryListFuture, attributeKvEntryList -> {
  110 + if (!CollectionUtils.isEmpty(attributeKvEntryList)) {
  111 + List<AttributeKvEntry> existingAttributesKvEntry = attributeKvEntryList.stream().filter(attributeKvEntry -> keys.contains(attributeKvEntry.getKey())).collect(Collectors.toList());
  112 + existingAttributesKvEntry.forEach(kvEntry -> msg.getMetaData().putValue(prefix + kvEntry.getKey(), kvEntry.getValueAsString()));
  113 + if (existingAttributesKvEntry.size() != keys.size() && BooleanUtils.toBooleanDefaultIfNull(this.config.isTellFailureIfAbsent(), true)) {
  114 + getNotExistingKeys(existingAttributesKvEntry, keys).forEach(key -> computeFailuresMap(scope, failuresMap, key));
  115 + }
  116 + } else {
101 if (BooleanUtils.toBooleanDefaultIfNull(this.config.isTellFailureIfAbsent(), true)) { 117 if (BooleanUtils.toBooleanDefaultIfNull(this.config.isTellFailureIfAbsent(), true)) {
102 - if (r.getValue() != null) {  
103 - msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString());  
104 - } else {  
105 - throw new RuntimeException("[" + scope + "][" + r.getKey() + "] attribute value is not present in the DB!");  
106 - }  
107 - } else {  
108 - if (r.getValue() != null) {  
109 - msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString());  
110 - } 118 + keys.forEach(key -> computeFailuresMap(scope, failuresMap, key));
111 } 119 }
112 -  
113 - }); 120 + }
114 return null; 121 return null;
115 }); 122 });
116 } 123 }
117 124
118 - private ListenableFuture<Void> putLatestTelemetry(TbContext ctx, EntityId entityId, TbMsg msg, List<String> keys) { 125 + private ListenableFuture<Void> putLatestTelemetry(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap) {
119 if (CollectionUtils.isEmpty(keys)) { 126 if (CollectionUtils.isEmpty(keys)) {
120 return Futures.immediateFuture(null); 127 return Futures.immediateFuture(null);
121 } 128 }
@@ -125,7 +132,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC @@ -125,7 +132,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
125 boolean getLatestValueWithTs = BooleanUtils.toBooleanDefaultIfNull(this.config.isGetLatestValueWithTs(), false); 132 boolean getLatestValueWithTs = BooleanUtils.toBooleanDefaultIfNull(this.config.isGetLatestValueWithTs(), false);
126 if (BooleanUtils.toBooleanDefaultIfNull(this.config.isTellFailureIfAbsent(), true)) { 133 if (BooleanUtils.toBooleanDefaultIfNull(this.config.isTellFailureIfAbsent(), true)) {
127 if (r.getValue() == null) { 134 if (r.getValue() == null) {
128 - throw new RuntimeException("[" + r.getKey() + "] telemetry value is not present in the DB!"); 135 + computeFailuresMap(scope, failuresMap, r.getKey());
129 } else if (getLatestValueWithTs) { 136 } else if (getLatestValueWithTs) {
130 putValueWithTs(msg, r); 137 putValueWithTs(msg, r);
131 } else { 138 } else {
@@ -164,4 +171,32 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC @@ -164,4 +171,32 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
164 } 171 }
165 msg.getMetaData().putValue(r.getKey(), value.toString()); 172 msg.getMetaData().putValue(r.getKey(), value.toString());
166 } 173 }
  174 +
  175 + private List<String> getNotExistingKeys(List<AttributeKvEntry> existingAttributesKvEntry, List<String> allKeys) {
  176 + List<String> existingKeys = existingAttributesKvEntry.stream().map(KvEntry::getKey).collect(Collectors.toList());
  177 + return allKeys.stream().filter(key -> !existingKeys.contains(key)).collect(Collectors.toList());
  178 + }
  179 +
  180 + private void computeFailuresMap(String scope, ConcurrentHashMap<String, List<String>> failuresMap, String key) {
  181 + List<String> failures = failuresMap.computeIfAbsent(scope, k -> new ArrayList<>());
  182 + failures.add(key);
  183 + }
  184 +
  185 + private RuntimeException reportFailures(ConcurrentHashMap<String, List<String>> failuresMap) {
  186 + StringBuilder errorMessage = new StringBuilder("The following attribute/telemetry keys is not present in the DB: ").append("\n");
  187 + if (failuresMap.containsKey(CLIENT_SCOPE)) {
  188 + errorMessage.append("\t").append("[" + CLIENT_SCOPE + "]:").append(failuresMap.get(CLIENT_SCOPE).toString()).append("\n");
  189 + }
  190 + if (failuresMap.containsKey(SERVER_SCOPE)) {
  191 + errorMessage.append("\t").append("[" + SERVER_SCOPE + "]:").append(failuresMap.get(SERVER_SCOPE).toString()).append("\n");
  192 + }
  193 + if (failuresMap.containsKey(SHARED_SCOPE)) {
  194 + errorMessage.append("\t").append("[" + SHARED_SCOPE + "]:").append(failuresMap.get(SHARED_SCOPE).toString()).append("\n");
  195 + }
  196 + if (failuresMap.containsKey(LATEST_TS)) {
  197 + errorMessage.append("\t").append("[" + LATEST_TS + "]:").append(failuresMap.get(LATEST_TS).toString()).append("\n");
  198 + }
  199 + failuresMap.clear();
  200 + return new RuntimeException(errorMessage.toString());
  201 + }
167 } 202 }