Showing
7 changed files
with
34 additions
and
59 deletions
... | ... | @@ -33,11 +33,8 @@ import org.thingsboard.server.common.data.EntityView; |
33 | 33 | import org.thingsboard.server.common.data.id.DeviceId; |
34 | 34 | import org.thingsboard.server.common.data.id.EntityId; |
35 | 35 | import org.thingsboard.server.common.data.id.EntityIdFactory; |
36 | -//<<<<<<< HEAD | |
37 | 36 | import org.thingsboard.server.common.data.id.EntityViewId; |
38 | -//======= | |
39 | 37 | import org.thingsboard.server.common.data.id.TenantId; |
40 | -//>>>>>>> d909192071880b7af2137333142bc62ece369ec1 | |
41 | 38 | import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
42 | 39 | import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; |
43 | 40 | import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; |
... | ... | @@ -108,9 +105,6 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio |
108 | 105 | @Autowired |
109 | 106 | private ClusterRpcService rpcService; |
110 | 107 | |
111 | - /*@Autowired | |
112 | - private EntityViewService entityViewService;*/ | |
113 | - | |
114 | 108 | @Autowired |
115 | 109 | @Lazy |
116 | 110 | private DeviceStateService stateService; |
... | ... | @@ -143,17 +137,15 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio |
143 | 137 | |
144 | 138 | @Override |
145 | 139 | public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) { |
146 | - String familyName = entityId.getEntityType().equals(EntityType.ENTITY_VIEW) | |
147 | - ? ModelConstants.ENTITY_VIEW_FAMILY_NAME : ModelConstants.DEVICE_FAMILY_NAME; | |
148 | 140 | Optional<ServerAddress> server = routingService.resolveById(entityId); |
149 | 141 | Subscription subscription; |
150 | 142 | if (server.isPresent()) { |
151 | 143 | ServerAddress address = server.get(); |
152 | - log.trace("[{}] Forwarding subscription [{}] for " + familyName + " [{}] to [{}]", sessionId, sub.getSubscriptionId(), entityId, address); | |
144 | + log.trace("[{}] Forwarding subscription [{}] for [{}] entity [{}] to [{}]", sessionId, sub.getSubscriptionId(), entityId.getEntityType().name(), entityId, address); | |
153 | 145 | subscription = new Subscription(sub, true, address); |
154 | 146 | tellNewSubscription(address, sessionId, subscription); |
155 | 147 | } else { |
156 | - log.trace("[{}] Registering local subscription [{}] for " + familyName + " [{}]", sessionId, sub.getSubscriptionId(), entityId); | |
148 | + log.trace("[{}] Registering local subscription [{}] for [{}] entity [{}]", sessionId, sub.getSubscriptionId(), entityId.getEntityType().name(), entityId); | |
157 | 149 | subscription = new Subscription(sub, true); |
158 | 150 | } |
159 | 151 | registerSubscription(sessionId, entityId, subscription); | ... | ... |
... | ... | @@ -34,7 +34,7 @@ public class Subscription { |
34 | 34 | private long endTime; |
35 | 35 | |
36 | 36 | public Subscription(SubscriptionState sub, boolean local) { |
37 | - this(sub, local, null, 0L, 0L); | |
37 | + this(sub, local, null); | |
38 | 38 | } |
39 | 39 | |
40 | 40 | public Subscription(SubscriptionState sub, boolean local, ServerAddress server) { | ... | ... |
... | ... | @@ -24,7 +24,7 @@ import java.util.Arrays; |
24 | 24 | |
25 | 25 | @RunWith(ClasspathSuite.class) |
26 | 26 | @ClasspathSuite.ClassnameFilters({ |
27 | - "org.thingsboard.server.controller.sql.EntityViewControllerSqlTest", | |
27 | + "org.thingsboard.server.controller.sql.*Test", | |
28 | 28 | }) |
29 | 29 | public class ControllerSqlTestSuite { |
30 | 30 | ... | ... |
... | ... | @@ -43,7 +43,7 @@ public class BaseReadTsKvQuery extends BaseTsKvQuery implements ReadTsKvQuery { |
43 | 43 | } |
44 | 44 | |
45 | 45 | public BaseReadTsKvQuery(String key, long startTs, long endTs, int limit, String orderBy) { |
46 | - this(key, startTs, endTs, endTs - startTs, limit, Aggregation.AVG, orderBy); | |
46 | + this(key, startTs, endTs, endTs - startTs, limit, Aggregation.NONE, orderBy); | |
47 | 47 | } |
48 | 48 | |
49 | 49 | } | ... | ... |
... | ... | @@ -89,7 +89,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti |
89 | 89 | @Autowired |
90 | 90 | private CacheManager cacheManager; |
91 | 91 | |
92 | - @Cacheable(cacheNames = ENTITY_VIEW_CACHE) | |
92 | +// @Cacheable(cacheNames = ENTITY_VIEW_CACHE) | |
93 | 93 | @Override |
94 | 94 | public EntityView findEntityViewById(EntityViewId entityViewId) { |
95 | 95 | log.trace("Executing findEntityViewById [{}]", entityViewId); |
... | ... | @@ -105,7 +105,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti |
105 | 105 | .orElse(null); |
106 | 106 | } |
107 | 107 | |
108 | - @CachePut(cacheNames = ENTITY_VIEW_CACHE) | |
108 | +// @CachePut(cacheNames = ENTITY_VIEW_CACHE) | |
109 | 109 | @Override |
110 | 110 | public EntityView saveEntityView(EntityView entityView) { |
111 | 111 | log.trace("Executing save entity view [{}]", entityView); |
... | ... | @@ -130,14 +130,14 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti |
130 | 130 | @Override |
131 | 131 | public void deleteEntityView(EntityViewId entityViewId) { |
132 | 132 | log.trace("Executing deleteEntityView [{}]", entityViewId); |
133 | - Cache cache = cacheManager.getCache(ENTITY_VIEW_CACHE); | |
133 | +// Cache cache = cacheManager.getCache(ENTITY_VIEW_CACHE); | |
134 | 134 | validateId(entityViewId, INCORRECT_ENTITY_VIEW_ID + entityViewId); |
135 | 135 | deleteEntityRelations(entityViewId); |
136 | 136 | EntityView entityView = entityViewDao.findById(entityViewId.getId()); |
137 | - List<Object> list = new ArrayList<>(); | |
138 | - list.add(entityView.getTenantId()); | |
139 | - list.add(entityView.getName()); | |
140 | - cache.evict(list); | |
137 | +// List<Object> list = new ArrayList<>(); | |
138 | +// list.add(entityView.getTenantId()); | |
139 | +// list.add(entityView.getName()); | |
140 | +// cache.evict(list); | |
141 | 141 | entityViewDao.removeById(entityViewId.getId()); |
142 | 142 | } |
143 | 143 | |
... | ... | @@ -150,7 +150,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti |
150 | 150 | return new TextPageData<>(entityViews, pageLink); |
151 | 151 | } |
152 | 152 | |
153 | - @Cacheable(cacheNames = ENTITY_VIEW_CACHE) | |
153 | +// @Cacheable(cacheNames = ENTITY_VIEW_CACHE) | |
154 | 154 | @Override |
155 | 155 | public TextPageData<EntityView> findEntityViewByTenantIdAndEntityId(TenantId tenantId, EntityId entityId, |
156 | 156 | TextPageLink pageLink) { |
... | ... | @@ -190,7 +190,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti |
190 | 190 | return new TextPageData<>(entityViews, pageLink); |
191 | 191 | } |
192 | 192 | |
193 | - @Cacheable(cacheNames = ENTITY_VIEW_CACHE, key = "{#tenantId, #customerId, #entityId, #pageLink}") | |
193 | +// @Cacheable(cacheNames = ENTITY_VIEW_CACHE, key = "{#tenantId, #customerId, #entityId, #pageLink}") | |
194 | 194 | @Override |
195 | 195 | public TextPageData<EntityView> findEntityViewsByTenantIdAndCustomerIdAndEntityId(TenantId tenantId, |
196 | 196 | CustomerId customerId, | ... | ... |
... | ... | @@ -36,6 +36,7 @@ import org.thingsboard.server.dao.service.Validator; |
36 | 36 | import java.util.ArrayList; |
37 | 37 | import java.util.Collection; |
38 | 38 | import java.util.List; |
39 | +import java.util.stream.Collectors; | |
39 | 40 | |
40 | 41 | import static org.apache.commons.lang3.StringUtils.isBlank; |
41 | 42 | |
... | ... | @@ -61,7 +62,11 @@ public class BaseTimeseriesService implements TimeseriesService { |
61 | 62 | queries.forEach(BaseTimeseriesService::validate); |
62 | 63 | if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { |
63 | 64 | EntityView entityView = entityViewService.findEntityViewById((EntityViewId) entityId); |
64 | - return timeseriesDao.findAllAsync(entityView.getEntityId(), updateQueriesForEntityView(entityView, queries)); | |
65 | + List<ReadTsKvQuery> filteredQueries = | |
66 | + queries.stream() | |
67 | + .filter(query -> entityView.getKeys().getTimeseries().contains(query.getKey())) | |
68 | + .collect(Collectors.toList()); | |
69 | + return timeseriesDao.findAllAsync(entityView.getEntityId(), updateQueriesForEntityView(entityView, filteredQueries)); | |
65 | 70 | } |
66 | 71 | return timeseriesDao.findAllAsync(entityId, queries); |
67 | 72 | } |
... | ... | @@ -73,11 +78,12 @@ public class BaseTimeseriesService implements TimeseriesService { |
73 | 78 | keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key)); |
74 | 79 | if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { |
75 | 80 | EntityView entityView = entityViewService.findEntityViewById((EntityViewId) entityId); |
76 | - Collection<String> matchingKeys = chooseKeysForEntityView(entityView, keys); | |
77 | - List<ReadTsKvQuery> queries = new ArrayList<>(); | |
78 | - | |
79 | - matchingKeys.forEach(key -> queries.add( | |
80 | - new BaseReadTsKvQuery(key, entityView.getStartTs(), entityView.getEndTs(), 1, "ASC"))); | |
81 | + List<String> filteredKeys = new ArrayList<>(keys); | |
82 | + filteredKeys.retainAll(entityView.getKeys().getTimeseries()); | |
83 | + List<ReadTsKvQuery> queries = | |
84 | + filteredKeys.stream() | |
85 | + .map(key -> new BaseReadTsKvQuery(key, entityView.getStartTs(), entityView.getEndTs(), 1, "ASC")) | |
86 | + .collect(Collectors.toList()); | |
81 | 87 | |
82 | 88 | return timeseriesDao.findAllAsync(entityView.getEntityId(), updateQueriesForEntityView(entityView, queries)); |
83 | 89 | } |
... | ... | @@ -136,34 +142,11 @@ public class BaseTimeseriesService implements TimeseriesService { |
136 | 142 | } |
137 | 143 | |
138 | 144 | private List<ReadTsKvQuery> updateQueriesForEntityView(EntityView entityView, List<ReadTsKvQuery> queries) { |
139 | - List<ReadTsKvQuery> newQueries = new ArrayList<>(); | |
140 | - entityView.getKeys().getTimeseries() | |
141 | - .forEach(viewKey -> queries | |
142 | - .forEach(query -> { | |
143 | - if (query.getKey().equals(viewKey)) { | |
144 | - if (entityView.getStartTs() == 0 && entityView.getEndTs() == 0) { | |
145 | - newQueries.add(updateQuery(query.getStartTs(), query.getEndTs(), viewKey, query)); | |
146 | - } else if (entityView.getStartTs() == 0 && entityView.getEndTs() != 0) { | |
147 | - newQueries.add(updateQuery(query.getStartTs(), entityView.getEndTs(), viewKey, query)); | |
148 | - } else if (entityView.getStartTs() != 0 && entityView.getEndTs() == 0) { | |
149 | - newQueries.add(updateQuery(entityView.getStartTs(), query.getEndTs(), viewKey, query)); | |
150 | - } else { | |
151 | - newQueries.add(updateQuery(entityView.getStartTs(), entityView.getEndTs(), viewKey, query)); | |
152 | - } | |
153 | - }})); | |
154 | - return newQueries; | |
155 | - } | |
156 | - | |
157 | - private Collection<String> chooseKeysForEntityView(EntityView entityView, Collection<String> keys) { | |
158 | - Collection<String> newKeys = new ArrayList<>(); | |
159 | - entityView.getKeys().getTimeseries() | |
160 | - .forEach(viewKey -> keys | |
161 | - .forEach(key -> { | |
162 | - if (key.equals(viewKey)) { | |
163 | - newKeys.add(key); | |
164 | - } | |
165 | - })); | |
166 | - return newKeys; | |
145 | + return queries.stream().map(query -> { | |
146 | + long startTs = entityView.getStartTs() == 0 ? query.getStartTs() : entityView.getStartTs(); | |
147 | + long endTs = entityView.getEndTs() == 0 ? query.getEndTs() : entityView.getEndTs(); | |
148 | + return updateQuery(startTs, endTs, query); | |
149 | + }).collect(Collectors.toList()); | |
167 | 150 | } |
168 | 151 | |
169 | 152 | @Override |
... | ... | @@ -205,9 +188,9 @@ public class BaseTimeseriesService implements TimeseriesService { |
205 | 188 | } |
206 | 189 | } |
207 | 190 | |
208 | - private static ReadTsKvQuery updateQuery(Long startTs, Long endTs, String viewKey, ReadTsKvQuery query) { | |
191 | + private ReadTsKvQuery updateQuery(Long startTs, Long endTs, ReadTsKvQuery query) { | |
209 | 192 | return startTs <= query.getStartTs() && endTs >= query.getEndTs() ? query : |
210 | - new BaseReadTsKvQuery(viewKey, startTs, endTs, query.getInterval(), query.getLimit(), query.getAggregation()); | |
193 | + new BaseReadTsKvQuery(query.getKey(), startTs, endTs, query.getInterval(), query.getLimit(), query.getAggregation()); | |
211 | 194 | } |
212 | 195 | |
213 | 196 | private static void checkForNonEntityView(EntityId entityId) throws Exception { | ... | ... |