Commit 66b330280ebe114d8ebe75f61aaa4a9da4f7e622
1 parent
ad48ecab
Was added checks on entity-views obj fiew method for fetch update queries startTs & entdTs and other
Showing
1 changed file
with
81 additions
and
5 deletions
1 | /** | 1 | /** |
2 | * Copyright © 2016-2018 The Thingsboard Authors | 2 | * Copyright © 2016-2018 The Thingsboard Authors |
3 | - * | 3 | + * <p> |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); | 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. | 5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at | 6 | * You may obtain a copy of the License at |
7 | - * | ||
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | - * | 7 | + * <p> |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * <p> | ||
10 | * Unless required by applicable law or agreed to in writing, software | 10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, | 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
@@ -21,12 +21,18 @@ import com.google.common.util.concurrent.ListenableFuture; | @@ -21,12 +21,18 @@ import com.google.common.util.concurrent.ListenableFuture; | ||
21 | import lombok.extern.slf4j.Slf4j; | 21 | import lombok.extern.slf4j.Slf4j; |
22 | import org.springframework.beans.factory.annotation.Autowired; | 22 | import org.springframework.beans.factory.annotation.Autowired; |
23 | import org.springframework.stereotype.Service; | 23 | import org.springframework.stereotype.Service; |
24 | +import org.thingsboard.server.common.data.EntityType; | ||
25 | +import org.thingsboard.server.common.data.EntityView; | ||
24 | import org.thingsboard.server.common.data.id.EntityId; | 26 | import org.thingsboard.server.common.data.id.EntityId; |
27 | +import org.thingsboard.server.common.data.id.EntityViewId; | ||
28 | +import org.thingsboard.server.common.data.kv.BaseTsKvQuery; | ||
25 | import org.thingsboard.server.common.data.kv.TsKvEntry; | 29 | import org.thingsboard.server.common.data.kv.TsKvEntry; |
26 | import org.thingsboard.server.common.data.kv.TsKvQuery; | 30 | import org.thingsboard.server.common.data.kv.TsKvQuery; |
31 | +import org.thingsboard.server.dao.entityview.EntityViewService; | ||
27 | import org.thingsboard.server.dao.exception.IncorrectParameterException; | 32 | import org.thingsboard.server.dao.exception.IncorrectParameterException; |
28 | import org.thingsboard.server.dao.service.Validator; | 33 | import org.thingsboard.server.dao.service.Validator; |
29 | 34 | ||
35 | +import java.util.ArrayList; | ||
30 | import java.util.Collection; | 36 | import java.util.Collection; |
31 | import java.util.List; | 37 | import java.util.List; |
32 | 38 | ||
@@ -44,10 +50,17 @@ public class BaseTimeseriesService implements TimeseriesService { | @@ -44,10 +50,17 @@ public class BaseTimeseriesService implements TimeseriesService { | ||
44 | @Autowired | 50 | @Autowired |
45 | private TimeseriesDao timeseriesDao; | 51 | private TimeseriesDao timeseriesDao; |
46 | 52 | ||
53 | + @Autowired | ||
54 | + private EntityViewService entityViewService; | ||
55 | + | ||
47 | @Override | 56 | @Override |
48 | public ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<TsKvQuery> queries) { | 57 | public ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<TsKvQuery> queries) { |
49 | validate(entityId); | 58 | validate(entityId); |
50 | queries.forEach(query -> validate(query)); | 59 | queries.forEach(query -> validate(query)); |
60 | + if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { | ||
61 | + EntityView entityView = entityViewService.findEntityViewById((EntityViewId) entityId); | ||
62 | + return timeseriesDao.findAllAsync(entityView.getEntityId(), updateQueriesForEntityView(entityView, queries)); | ||
63 | + } | ||
51 | return timeseriesDao.findAllAsync(entityId, queries); | 64 | return timeseriesDao.findAllAsync(entityId, queries); |
52 | } | 65 | } |
53 | 66 | ||
@@ -56,7 +69,13 @@ public class BaseTimeseriesService implements TimeseriesService { | @@ -56,7 +69,13 @@ public class BaseTimeseriesService implements TimeseriesService { | ||
56 | validate(entityId); | 69 | validate(entityId); |
57 | List<ListenableFuture<TsKvEntry>> futures = Lists.newArrayListWithExpectedSize(keys.size()); | 70 | List<ListenableFuture<TsKvEntry>> futures = Lists.newArrayListWithExpectedSize(keys.size()); |
58 | keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key)); | 71 | keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key)); |
59 | - keys.forEach(key -> futures.add(timeseriesDao.findLatest(entityId, key))); | 72 | + if (false/*entityId.getEntityType().equals(EntityType.ENTITY_VIEW)*/) { |
73 | + EntityView entityView = entityViewService.findEntityViewById((EntityViewId) entityId); | ||
74 | + Collection<String> newKeys = chooseKeysForEntityView(entityView, keys); | ||
75 | + newKeys.forEach(newKey -> futures.add(timeseriesDao.findLatest(entityView.getEntityId(), newKey))); | ||
76 | + } else { | ||
77 | + keys.forEach(key -> futures.add(timeseriesDao.findLatest(entityId, key))); | ||
78 | + } | ||
60 | return Futures.allAsList(futures); | 79 | return Futures.allAsList(futures); |
61 | } | 80 | } |
62 | 81 | ||
@@ -69,6 +88,11 @@ public class BaseTimeseriesService implements TimeseriesService { | @@ -69,6 +88,11 @@ public class BaseTimeseriesService implements TimeseriesService { | ||
69 | @Override | 88 | @Override |
70 | public ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry) { | 89 | public ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry) { |
71 | validate(entityId); | 90 | validate(entityId); |
91 | + try { | ||
92 | + checkForNonEntityView(entityId); | ||
93 | + } catch (Exception e) { | ||
94 | + e.printStackTrace(); | ||
95 | + } | ||
72 | if (tsKvEntry == null) { | 96 | if (tsKvEntry == null) { |
73 | throw new IncorrectParameterException("Key value entry can't be null"); | 97 | throw new IncorrectParameterException("Key value entry can't be null"); |
74 | } | 98 | } |
@@ -79,6 +103,11 @@ public class BaseTimeseriesService implements TimeseriesService { | @@ -79,6 +103,11 @@ public class BaseTimeseriesService implements TimeseriesService { | ||
79 | 103 | ||
80 | @Override | 104 | @Override |
81 | public ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) { | 105 | public ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) { |
106 | + try { | ||
107 | + checkForNonEntityView(entityId); | ||
108 | + } catch (Exception e) { | ||
109 | + e.printStackTrace(); | ||
110 | + } | ||
82 | List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY); | 111 | List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY); |
83 | for (TsKvEntry tsKvEntry : tsKvEntries) { | 112 | for (TsKvEntry tsKvEntry : tsKvEntries) { |
84 | if (tsKvEntry == null) { | 113 | if (tsKvEntry == null) { |
@@ -90,11 +119,47 @@ public class BaseTimeseriesService implements TimeseriesService { | @@ -90,11 +119,47 @@ public class BaseTimeseriesService implements TimeseriesService { | ||
90 | } | 119 | } |
91 | 120 | ||
92 | private void saveAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { | 121 | private void saveAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { |
122 | + try { | ||
123 | + checkForNonEntityView(entityId); | ||
124 | + } catch (Exception e) { | ||
125 | + e.printStackTrace(); | ||
126 | + } | ||
93 | futures.add(timeseriesDao.savePartition(entityId, tsKvEntry.getTs(), tsKvEntry.getKey(), ttl)); | 127 | futures.add(timeseriesDao.savePartition(entityId, tsKvEntry.getTs(), tsKvEntry.getKey(), ttl)); |
94 | futures.add(timeseriesDao.saveLatest(entityId, tsKvEntry)); | 128 | futures.add(timeseriesDao.saveLatest(entityId, tsKvEntry)); |
95 | futures.add(timeseriesDao.save(entityId, tsKvEntry, ttl)); | 129 | futures.add(timeseriesDao.save(entityId, tsKvEntry, ttl)); |
96 | } | 130 | } |
97 | 131 | ||
132 | + private List<TsKvQuery> updateQueriesForEntityView(EntityView entityView, List<TsKvQuery> queries) { | ||
133 | + List<TsKvQuery> newQueries = new ArrayList<>(); | ||
134 | + entityView.getKeys().getTimeseries() | ||
135 | + .forEach(viewKey -> queries | ||
136 | + .forEach(query -> { | ||
137 | + if (query.getKey().equals(viewKey)) { | ||
138 | + if (entityView.getStartTs() == 0 && entityView.getEndTs() == 0) { | ||
139 | + newQueries.add(updateQuery(query.getStartTs(), query.getEndTs(), viewKey, query)); | ||
140 | + } else if (entityView.getStartTs() == 0 && entityView.getEndTs() != 0) { | ||
141 | + newQueries.add(updateQuery(query.getStartTs(), entityView.getEndTs(), viewKey, query)); | ||
142 | + } else if (entityView.getStartTs() != 0 && entityView.getEndTs() == 0) { | ||
143 | + newQueries.add(updateQuery(entityView.getStartTs(), query.getEndTs(), viewKey, query)); | ||
144 | + } else { | ||
145 | + newQueries.add(updateQuery(entityView.getStartTs(), entityView.getEndTs(), viewKey, query)); | ||
146 | + } | ||
147 | + }})); | ||
148 | + return newQueries; | ||
149 | + } | ||
150 | + | ||
151 | + @Deprecated /*Will be a modified*/ | ||
152 | + private Collection<String> chooseKeysForEntityView(EntityView entityView, Collection<String> keys) { | ||
153 | + Collection<String> newKeys = new ArrayList<>(); | ||
154 | + entityView.getKeys().getTimeseries() | ||
155 | + .forEach(viewKey -> keys | ||
156 | + .forEach(key -> { | ||
157 | + if (key.equals(viewKey)) { | ||
158 | + newKeys.add(key); | ||
159 | + }})); | ||
160 | + return newKeys; | ||
161 | + } | ||
162 | + | ||
98 | private static void validate(EntityId entityId) { | 163 | private static void validate(EntityId entityId) { |
99 | Validator.validateEntityId(entityId, "Incorrect entityId " + entityId); | 164 | Validator.validateEntityId(entityId, "Incorrect entityId " + entityId); |
100 | } | 165 | } |
@@ -108,4 +173,15 @@ public class BaseTimeseriesService implements TimeseriesService { | @@ -108,4 +173,15 @@ public class BaseTimeseriesService implements TimeseriesService { | ||
108 | throw new IncorrectParameterException("Incorrect TsKvQuery. Aggregation can't be empty"); | 173 | throw new IncorrectParameterException("Incorrect TsKvQuery. Aggregation can't be empty"); |
109 | } | 174 | } |
110 | } | 175 | } |
176 | + | ||
177 | + private static TsKvQuery updateQuery(Long startTs, Long endTs, String viewKey, TsKvQuery query) { | ||
178 | + return startTs <= query.getStartTs() && endTs >= query.getEndTs() ? query : | ||
179 | + new BaseTsKvQuery(viewKey, startTs, endTs, query.getInterval(), query.getLimit(), query.getAggregation()); | ||
180 | + } | ||
181 | + | ||
182 | + private static void checkForNonEntityView(EntityId entityId) throws Exception { | ||
183 | + if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { | ||
184 | + throw new Exception("Entity-views were read only"); | ||
185 | + } | ||
186 | + } | ||
111 | } | 187 | } |