|
@@ -64,7 +64,7 @@ public class BaseTimeseriesService implements TimeseriesService { |
|
@@ -64,7 +64,7 @@ public class BaseTimeseriesService implements TimeseriesService { |
64
|
EntityView entityView = entityViewService.findEntityViewById((EntityViewId) entityId);
|
64
|
EntityView entityView = entityViewService.findEntityViewById((EntityViewId) entityId);
|
65
|
List<ReadTsKvQuery> filteredQueries =
|
65
|
List<ReadTsKvQuery> filteredQueries =
|
66
|
queries.stream()
|
66
|
queries.stream()
|
67
|
- .filter(query -> entityView.getKeys().getTimeseries().contains(query.getKey()))
|
67
|
+ .filter(query -> entityView.getKeys().getTimeseries().isEmpty() || entityView.getKeys().getTimeseries().contains(query.getKey()))
|
68
|
.collect(Collectors.toList());
|
68
|
.collect(Collectors.toList());
|
69
|
return timeseriesDao.findAllAsync(entityView.getEntityId(), updateQueriesForEntityView(entityView, filteredQueries));
|
69
|
return timeseriesDao.findAllAsync(entityView.getEntityId(), updateQueriesForEntityView(entityView, filteredQueries));
|
70
|
}
|
70
|
}
|
|
@@ -79,7 +79,9 @@ public class BaseTimeseriesService implements TimeseriesService { |
|
@@ -79,7 +79,9 @@ public class BaseTimeseriesService implements TimeseriesService { |
79
|
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
|
79
|
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
|
80
|
EntityView entityView = entityViewService.findEntityViewById((EntityViewId) entityId);
|
80
|
EntityView entityView = entityViewService.findEntityViewById((EntityViewId) entityId);
|
81
|
List<String> filteredKeys = new ArrayList<>(keys);
|
81
|
List<String> filteredKeys = new ArrayList<>(keys);
|
82
|
- filteredKeys.retainAll(entityView.getKeys().getTimeseries());
|
82
|
+ if (!entityView.getKeys().getTimeseries().isEmpty()) {
|
|
|
83
|
+ filteredKeys.retainAll(entityView.getKeys().getTimeseries());
|
|
|
84
|
+ }
|
83
|
List<ReadTsKvQuery> queries =
|
85
|
List<ReadTsKvQuery> queries =
|
84
|
filteredKeys.stream()
|
86
|
filteredKeys.stream()
|
85
|
.map(key -> new BaseReadTsKvQuery(key, entityView.getStartTs(), entityView.getEndTs(), 1, "ASC"))
|
87
|
.map(key -> new BaseReadTsKvQuery(key, entityView.getStartTs(), entityView.getEndTs(), 1, "ASC"))
|
|
@@ -100,11 +102,6 @@ public class BaseTimeseriesService implements TimeseriesService { |
|
@@ -100,11 +102,6 @@ public class BaseTimeseriesService implements TimeseriesService { |
100
|
@Override
|
102
|
@Override
|
101
|
public ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry) {
|
103
|
public ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry) {
|
102
|
validate(entityId);
|
104
|
validate(entityId);
|
103
|
- try {
|
|
|
104
|
- checkForNonEntityView(entityId);
|
|
|
105
|
- } catch (Exception e) {
|
|
|
106
|
- e.printStackTrace();
|
|
|
107
|
- }
|
|
|
108
|
if (tsKvEntry == null) {
|
105
|
if (tsKvEntry == null) {
|
109
|
throw new IncorrectParameterException("Key value entry can't be null");
|
106
|
throw new IncorrectParameterException("Key value entry can't be null");
|
110
|
}
|
107
|
}
|
|
@@ -115,11 +112,6 @@ public class BaseTimeseriesService implements TimeseriesService { |
|
@@ -115,11 +112,6 @@ public class BaseTimeseriesService implements TimeseriesService { |
115
|
|
112
|
|
116
|
@Override
|
113
|
@Override
|
117
|
public ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
|
114
|
public ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
|
118
|
- try {
|
|
|
119
|
- checkForNonEntityView(entityId);
|
|
|
120
|
- } catch (Exception e) {
|
|
|
121
|
- e.printStackTrace();
|
|
|
122
|
- }
|
|
|
123
|
List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY);
|
115
|
List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY);
|
124
|
for (TsKvEntry tsKvEntry : tsKvEntries) {
|
116
|
for (TsKvEntry tsKvEntry : tsKvEntries) {
|
125
|
if (tsKvEntry == null) {
|
117
|
if (tsKvEntry == null) {
|
|
@@ -131,10 +123,8 @@ public class BaseTimeseriesService implements TimeseriesService { |
|
@@ -131,10 +123,8 @@ public class BaseTimeseriesService implements TimeseriesService { |
131
|
}
|
123
|
}
|
132
|
|
124
|
|
133
|
private void saveAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
125
|
private void saveAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
134
|
- try {
|
|
|
135
|
- checkForNonEntityView(entityId);
|
|
|
136
|
- } catch (Exception e) {
|
|
|
137
|
- e.printStackTrace();
|
126
|
+ if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
|
|
|
127
|
+ throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Only read only");
|
138
|
}
|
128
|
}
|
139
|
futures.add(timeseriesDao.savePartition(entityId, tsKvEntry.getTs(), tsKvEntry.getKey(), ttl));
|
129
|
futures.add(timeseriesDao.savePartition(entityId, tsKvEntry.getTs(), tsKvEntry.getKey(), ttl));
|
140
|
futures.add(timeseriesDao.saveLatest(entityId, tsKvEntry));
|
130
|
futures.add(timeseriesDao.saveLatest(entityId, tsKvEntry));
|
|
@@ -145,7 +135,9 @@ public class BaseTimeseriesService implements TimeseriesService { |
|
@@ -145,7 +135,9 @@ public class BaseTimeseriesService implements TimeseriesService { |
145
|
return queries.stream().map(query -> {
|
135
|
return queries.stream().map(query -> {
|
146
|
long startTs = entityView.getStartTs() == 0 ? query.getStartTs() : entityView.getStartTs();
|
136
|
long startTs = entityView.getStartTs() == 0 ? query.getStartTs() : entityView.getStartTs();
|
147
|
long endTs = entityView.getEndTs() == 0 ? query.getEndTs() : entityView.getEndTs();
|
137
|
long endTs = entityView.getEndTs() == 0 ? query.getEndTs() : entityView.getEndTs();
|
148
|
- return updateQuery(startTs, endTs, query);
|
138
|
+
|
|
|
139
|
+ return startTs <= query.getStartTs() && endTs >= query.getEndTs() ? query :
|
|
|
140
|
+ new BaseReadTsKvQuery(query.getKey(), startTs, endTs, query.getInterval(), query.getLimit(), query.getAggregation());
|
149
|
}).collect(Collectors.toList());
|
141
|
}).collect(Collectors.toList());
|
150
|
}
|
142
|
}
|
151
|
|
143
|
|
|
@@ -187,15 +179,4 @@ public class BaseTimeseriesService implements TimeseriesService { |
|
@@ -187,15 +179,4 @@ public class BaseTimeseriesService implements TimeseriesService { |
187
|
throw new IncorrectParameterException("Incorrect DeleteTsKvQuery. Key can't be empty");
|
179
|
throw new IncorrectParameterException("Incorrect DeleteTsKvQuery. Key can't be empty");
|
188
|
}
|
180
|
}
|
189
|
}
|
181
|
}
|
190
|
-
|
|
|
191
|
- private ReadTsKvQuery updateQuery(Long startTs, Long endTs, ReadTsKvQuery query) {
|
|
|
192
|
- return startTs <= query.getStartTs() && endTs >= query.getEndTs() ? query :
|
|
|
193
|
- new BaseReadTsKvQuery(query.getKey(), startTs, endTs, query.getInterval(), query.getLimit(), query.getAggregation());
|
|
|
194
|
- }
|
|
|
195
|
-
|
|
|
196
|
- private static void checkForNonEntityView(EntityId entityId) throws Exception {
|
|
|
197
|
- if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
|
|
|
198
|
- throw new Exception("Entity-views were read only");
|
|
|
199
|
- }
|
|
|
200
|
- }
|
|
|
201
|
} |
182
|
} |