Showing
2 changed files
with
12 additions
and
12 deletions
... | ... | @@ -52,7 +52,8 @@ import org.thingsboard.server.dao.timeseries.SimpleListenableFuture; |
52 | 52 | import javax.annotation.Nullable; |
53 | 53 | import javax.annotation.PostConstruct; |
54 | 54 | import javax.annotation.PreDestroy; |
55 | -import java.util.Comparator; | |
55 | +import java.util.ArrayList; | |
56 | +import java.util.HashMap; | |
56 | 57 | import java.util.List; |
57 | 58 | import java.util.Map; |
58 | 59 | import java.util.Objects; |
... | ... | @@ -129,16 +130,15 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx |
129 | 130 | tsLatestQueue = new TbSqlBlockingQueueWrapper<>(tsLatestParams, hashcodeFunction, tsLatestBatchThreads); |
130 | 131 | |
131 | 132 | tsLatestQueue.init(logExecutor, v -> { |
132 | - Map<TsKey, List<TsKvLatestEntity>> tsMap = | |
133 | - v.stream().collect(Collectors.groupingBy(ts -> new TsKey(ts.getEntityId(), ts.getStrKey()))); | |
134 | - | |
135 | - List<TsKvLatestEntity> latestEntities = | |
136 | - tsMap.keySet() | |
137 | - .stream() | |
138 | - .map(tsMap::get) | |
139 | - .map(list -> list.stream().max(Comparator.comparing(TsKvLatestEntity::getTs)).get()) | |
140 | - .collect(Collectors.toList()); | |
141 | - | |
133 | + Map<TsKey, TsKvLatestEntity> trueLatest = new HashMap<>(); | |
134 | + v.forEach(ts -> { | |
135 | + TsKey key = new TsKey(ts.getEntityId(), ts.getKey()); | |
136 | + TsKvLatestEntity old = trueLatest.get(key); | |
137 | + if (old == null || old.getTs() < ts.getTs()) { | |
138 | + trueLatest.put(key, ts); | |
139 | + } | |
140 | + }); | |
141 | + List<TsKvLatestEntity> latestEntities = new ArrayList<>(trueLatest.values()); | |
142 | 142 | insertLatestTsRepository.saveOrUpdate(latestEntities); |
143 | 143 | }); |
144 | 144 | } | ... | ... |