Commit 8478cd9b371a7b50406fc659db30522ea0de3123

Authored by Igor Kulikov
1 parent 265f9e22

Copy latest timeseries to entityView on post telemetry

... ... @@ -18,11 +18,14 @@ package org.thingsboard.server.service.telemetry;
18 18 import com.google.common.util.concurrent.FutureCallback;
19 19 import com.google.common.util.concurrent.Futures;
20 20 import com.google.common.util.concurrent.ListenableFuture;
  21 +import com.google.common.util.concurrent.MoreExecutors;
21 22 import lombok.extern.slf4j.Slf4j;
22 23 import org.springframework.beans.factory.annotation.Autowired;
23 24 import org.springframework.context.event.EventListener;
24 25 import org.springframework.stereotype.Service;
25 26 import org.thingsboard.common.util.ThingsBoardThreadFactory;
  27 +import org.thingsboard.server.common.data.EntityType;
  28 +import org.thingsboard.server.common.data.EntityView;
26 29 import org.thingsboard.server.common.data.id.EntityId;
27 30 import org.thingsboard.server.common.data.id.TenantId;
28 31 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
... ... @@ -36,6 +39,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
36 39 import org.thingsboard.server.common.msg.queue.TbCallback;
37 40 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
38 41 import org.thingsboard.server.dao.attributes.AttributesService;
  42 +import org.thingsboard.server.dao.entityview.EntityViewService;
39 43 import org.thingsboard.server.dao.timeseries.TimeseriesService;
40 44 import org.thingsboard.server.gen.transport.TransportProtos;
41 45 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
... ... @@ -47,7 +51,9 @@ import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
47 51 import javax.annotation.Nullable;
48 52 import javax.annotation.PostConstruct;
49 53 import javax.annotation.PreDestroy;
  54 +import java.util.ArrayList;
50 55 import java.util.Collections;
  56 +import java.util.Comparator;
51 57 import java.util.List;
52 58 import java.util.Optional;
53 59 import java.util.Set;
... ... @@ -55,6 +61,7 @@ import java.util.concurrent.ConcurrentHashMap;
55 61 import java.util.concurrent.ExecutorService;
56 62 import java.util.concurrent.Executors;
57 63 import java.util.function.Consumer;
  64 +import java.util.stream.Collectors;
58 65
59 66 /**
60 67 * Created by ashvayka on 27.03.18.
... ... @@ -65,16 +72,19 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
65 72
66 73 private final AttributesService attrService;
67 74 private final TimeseriesService tsService;
  75 + private final EntityViewService entityViewService;
68 76
69 77 private ExecutorService tsCallBackExecutor;
70 78
71 79 public DefaultTelemetrySubscriptionService(AttributesService attrService,
72 80 TimeseriesService tsService,
  81 + EntityViewService entityViewService,
73 82 TbClusterService clusterService,
74 83 PartitionService partitionService) {
75 84 super(clusterService, partitionService);
76 85 this.attrService = attrService;
77 86 this.tsService = tsService;
  87 + this.entityViewService = entityViewService;
78 88 }
79 89
80 90 @PostConstruct
... ... @@ -106,6 +116,50 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
106 116 ListenableFuture<List<Void>> saveFuture = tsService.save(tenantId, entityId, ts, ttl);
107 117 addMainCallback(saveFuture, callback);
108 118 addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
  119 + if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) {
  120 + Futures.addCallback(this.entityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId),
  121 + new FutureCallback<List<EntityView>>() {
  122 + @Override
  123 + public void onSuccess(@Nullable List<EntityView> result) {
  124 + if (result != null) {
  125 + for (EntityView entityView : result) {
  126 + if (entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null
  127 + && !entityView.getKeys().getTimeseries().isEmpty()) {
  128 + List<TsKvEntry> entityViewLatest = new ArrayList<>();
  129 + for (String key : entityView.getKeys().getTimeseries()) {
  130 + long startTime = entityView.getStartTimeMs();
  131 + long endTime = entityView.getEndTimeMs();
  132 + long startTs = startTime;
  133 + long endTs = endTime == 0 ? System.currentTimeMillis() : endTime;
  134 + Optional<TsKvEntry> tsKvEntry = ts.stream()
  135 + .filter(entry -> entry.getKey().equals(key) && entry.getTs() > startTs && entry.getTs() <= endTs)
  136 + .max(Comparator.comparingLong(TsKvEntry::getTs));
  137 + if (tsKvEntry.isPresent()) {
  138 + entityViewLatest.add(tsKvEntry.get());
  139 + }
  140 + }
  141 + if (!entityViewLatest.isEmpty()) {
  142 + saveLatestAndNotify(tenantId, entityView.getId(), entityViewLatest, new FutureCallback<Void>() {
  143 + @Override
  144 + public void onSuccess(@Nullable Void tmp) {
  145 + }
  146 +
  147 + @Override
  148 + public void onFailure(Throwable t) {
  149 + }
  150 + });
  151 + }
  152 + }
  153 + }
  154 + }
  155 + }
  156 +
  157 + @Override
  158 + public void onFailure(Throwable t) {
  159 + log.error("Error while finding entity views by tenantId and entityId", t);
  160 + }
  161 + }, MoreExecutors.directExecutor());
  162 + }
109 163 }
110 164
111 165 @Override
... ...