Commit 4ac26bc66428380d5b19e2e1d67b944ea1dc7afb

Authored by Mitia Shvayka
1 parent 291634c1

TbGetTelemetryCertainTimeRangeNode

  1 +package org.thingsboard.rule.engine.metadata;
  2 +
  3 +import com.fasterxml.jackson.databind.JsonNode;
  4 +import com.fasterxml.jackson.databind.ObjectMapper;
  5 +import com.fasterxml.jackson.databind.node.ArrayNode;
  6 +import com.fasterxml.jackson.databind.node.ObjectNode;
  7 +import com.google.common.util.concurrent.ListenableFuture;
  8 +import lombok.extern.slf4j.Slf4j;
  9 +import org.thingsboard.rule.engine.api.*;
  10 +import org.thingsboard.rule.engine.api.util.DonAsynchron;
  11 +import org.thingsboard.rule.engine.api.util.TbNodeUtils;
  12 +import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
  13 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  14 +import org.thingsboard.server.common.data.kv.TsKvQuery;
  15 +import org.thingsboard.server.common.data.plugin.ComponentType;
  16 +import org.thingsboard.server.common.msg.TbMsg;
  17 +
  18 +import java.util.ArrayList;
  19 +import java.util.List;
  20 +import java.util.concurrent.ExecutionException;
  21 +import java.util.concurrent.TimeUnit;
  22 +
  23 +import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
  24 +import static org.thingsboard.server.common.data.kv.Aggregation.NONE;
  25 +
  26 +/**
  27 + * Created by mshvayka on 04.09.18.
  28 + */
  29 +@Slf4j
  30 +@RuleNode(type = ComponentType.ENRICHMENT,
  31 + name = "huy",
  32 + configClazz = TbGetTelemetryCertainTimeRangeNodeConfiguration.class,
  33 + nodeDescription = "",
  34 + nodeDetails = "",
  35 + uiResources = "", //{"static/rulenode/rulenode-core-config.js"},
  36 + configDirective = "")
  37 +public class TbGetTelemetryCertainTimeRangeNode implements TbNode {
  38 +
  39 + private TbGetTelemetryCertainTimeRangeNodeConfiguration config;
  40 + private List<String> tsKeyNames;
  41 + private long startTsOffset;
  42 + private long endTsOffset;
  43 + private int limit;
  44 + private ObjectMapper mapper;
  45 +
  46 + @Override
  47 + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
  48 + this.config = TbNodeUtils.convert(configuration, TbGetTelemetryCertainTimeRangeNodeConfiguration.class);
  49 + tsKeyNames = config.getLatestTsKeyNames();
  50 + startTsOffset = TimeUnit.valueOf(config.getStartIntervalTimeUnit()).toMillis(config.getStartInterval());
  51 + endTsOffset = TimeUnit.valueOf(config.getEndIntervalTimeUnit()).toMillis(config.getEndInterval());
  52 + limit = config.getFetchMode().equals(TbGetTelemetryCertainTimeRangeNodeConfiguration.FETCH_MODE_ALL)
  53 + ? TbGetTelemetryCertainTimeRangeNodeConfiguration.MAX_FETCH_SIZE : 1;
  54 + mapper = new ObjectMapper();
  55 + }
  56 +
  57 + @Override
  58 + public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
  59 + ObjectNode resultNode = mapper.createObjectNode();
  60 + List<TsKvQuery> queries = new ArrayList<>();
  61 + long ts = System.currentTimeMillis();
  62 + long startTs = ts - startTsOffset;
  63 + long endTs = ts - endTsOffset;
  64 + if (tsKeyNames.isEmpty()) {
  65 + ctx.tellFailure(msg, new Exception("Telemetry not found"));
  66 + } else {
  67 + for (String key : tsKeyNames) {
  68 + //TODO: handle direction;
  69 + queries.add(new BaseTsKvQuery(key, startTs, endTs, 1, limit, NONE));
  70 + if (limit == TbGetTelemetryCertainTimeRangeNodeConfiguration.MAX_FETCH_SIZE) {
  71 + resultNode.set(key, mapper.createArrayNode());
  72 + } else {
  73 + resultNode.putObject(key);
  74 + }
  75 + }
  76 + try {
  77 + ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(msg.getOriginator(), queries);
  78 + DonAsynchron.withCallback(list, data -> {
  79 + for (TsKvEntry tsKvEntry : data) {
  80 + JsonNode node = resultNode.get(tsKvEntry.getKey());
  81 + if (node.isArray()) {
  82 + ArrayNode arrayNode = (ArrayNode) node;
  83 + arrayNode.add(mapper.createObjectNode().put(String.valueOf(tsKvEntry.getTs()), tsKvEntry.getValueAsString()));
  84 + } else {
  85 + ObjectNode object = mapper.createObjectNode().put(String.valueOf(tsKvEntry.getTs()), tsKvEntry.getValueAsString());
  86 + resultNode.set(tsKvEntry.getKey(), object);
  87 + }
  88 + }
  89 + for (String key : tsKeyNames) {
  90 + msg.getMetaData().putValue(key, resultNode.get(key).toString());
  91 + }
  92 + TbMsg newMsg = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData());
  93 + ctx.tellNext(newMsg, SUCCESS);
  94 + }, error -> ctx.tellFailure(msg, error));
  95 + } catch (Exception e) {
  96 + ctx.tellFailure(msg, e);
  97 + }
  98 + }
  99 + }
  100 +
  101 + @Override
  102 + public void destroy() {
  103 +
  104 + }
  105 +}
  1 +package org.thingsboard.rule.engine.metadata;
  2 +
  3 +import lombok.Data;
  4 +import org.thingsboard.rule.engine.api.NodeConfiguration;
  5 +
  6 +import java.util.Collections;
  7 +import java.util.List;
  8 +import java.util.concurrent.TimeUnit;
  9 +
  10 +/**
  11 + * Created by mshvayka on 04.09.18.
  12 + */
  13 +@Data
  14 +public class TbGetTelemetryCertainTimeRangeNodeConfiguration implements NodeConfiguration<TbGetTelemetryCertainTimeRangeNodeConfiguration> {
  15 +
  16 + public static final String FETCH_MODE_FIRST = "FIRST";
  17 + public static final String FETCH_MODE_LAST = "LAST";
  18 + public static final String FETCH_MODE_ALL = "ALL";
  19 + public static final int MAX_FETCH_SIZE = 1000;
  20 +
  21 + private int startInterval;
  22 + private int endInterval;
  23 + private String startIntervalTimeUnit;
  24 + private String endIntervalTimeUnit;
  25 + private String fetchMode; //FIRST, LAST, LATEST
  26 +
  27 + private List<String> latestTsKeyNames;
  28 +
  29 +
  30 +
  31 + @Override
  32 + public TbGetTelemetryCertainTimeRangeNodeConfiguration defaultConfiguration() {
  33 + TbGetTelemetryCertainTimeRangeNodeConfiguration configuration = new TbGetTelemetryCertainTimeRangeNodeConfiguration();
  34 + configuration.setLatestTsKeyNames(Collections.emptyList());
  35 + configuration.setStartIntervalTimeUnit(TimeUnit.MINUTES.name());
  36 + configuration.setStartInterval(1);
  37 + configuration.setEndIntervalTimeUnit(TimeUnit.MINUTES.name());
  38 + configuration.setEndInterval(2);
  39 + return configuration;
  40 + }
  41 +}