Commit f63b4b1f7c3b843b391cb244307538400439e9e4

Authored by YevhenBondarenko
1 parent 9a03fbad

created findEntityTimeseriesAndAttributesKeysByQuery instead findEntityTimeserie…

…sKeysByQuery and findEntityAttributesKeysByQuery
... ... @@ -16,14 +16,16 @@
16 16 package org.thingsboard.server.controller;
17 17
18 18 import org.springframework.beans.factory.annotation.Autowired;
  19 +import org.springframework.http.ResponseEntity;
19 20 import org.springframework.security.access.prepost.PreAuthorize;
20 21 import org.springframework.web.bind.annotation.RequestBody;
21 22 import org.springframework.web.bind.annotation.RequestMapping;
22 23 import org.springframework.web.bind.annotation.RequestMethod;
  24 +import org.springframework.web.bind.annotation.RequestParam;
23 25 import org.springframework.web.bind.annotation.ResponseBody;
24 26 import org.springframework.web.bind.annotation.RestController;
  27 +import org.springframework.web.context.request.async.DeferredResult;
25 28 import org.thingsboard.server.common.data.exception.ThingsboardException;
26   -import org.thingsboard.server.common.data.id.EntityId;
27 29 import org.thingsboard.server.common.data.id.TenantId;
28 30 import org.thingsboard.server.common.data.page.PageData;
29 31 import org.thingsboard.server.common.data.query.AlarmData;
... ... @@ -32,16 +34,9 @@ import org.thingsboard.server.common.data.query.EntityCountQuery;
32 34 import org.thingsboard.server.common.data.query.EntityData;
33 35 import org.thingsboard.server.common.data.query.EntityDataPageLink;
34 36 import org.thingsboard.server.common.data.query.EntityDataQuery;
35   -import org.thingsboard.server.dao.attributes.AttributesService;
36   -import org.thingsboard.server.dao.timeseries.TimeseriesService;
37 37 import org.thingsboard.server.queue.util.TbCoreComponent;
38 38 import org.thingsboard.server.service.query.EntityQueryService;
39 39
40   -import java.util.Collections;
41   -import java.util.List;
42   -import java.util.function.Function;
43   -import java.util.stream.Collectors;
44   -
45 40 @RestController
46 41 @TbCoreComponent
47 42 @RequestMapping("/api")
... ... @@ -50,13 +45,6 @@ public class EntityQueryController extends BaseController {
50 45 @Autowired
51 46 private EntityQueryService entityQueryService;
52 47
53   - @Autowired
54   - private AttributesService attributesService;
55   -
56   - @Autowired
57   - private TimeseriesService timeseriesService;
58   -
59   -
60 48 @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
61 49 @RequestMapping(value = "/entitiesQuery/count", method = RequestMethod.POST)
62 50 @ResponseBody
... ... @@ -96,35 +84,22 @@ public class EntityQueryController extends BaseController {
96 84 @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
97 85 @RequestMapping(value = "/entitiesQuery/find/keys/timeseries", method = RequestMethod.POST)
98 86 @ResponseBody
99   - public List<String> findEntityTimeseriesKeysByQuery(@RequestBody EntityDataQuery query) throws ThingsboardException {
  87 + public DeferredResult<ResponseEntity> findEntityTimeseriesAndAttributesKeysByQuery(@RequestBody EntityDataQuery query,
  88 + @RequestParam("timeseries") boolean isTimeseries,
  89 + @RequestParam("attributes") boolean isAttributes) throws ThingsboardException {
100 90 TenantId tenantId = getTenantId();
101   - return getKeys(query, entityIds -> timeseriesService.findAllKeysByEntityIds(tenantId, entityIds));
102   - }
103   -
104   - @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
105   - @RequestMapping(value = "/entitiesQuery/find/keys/attributes", method = RequestMethod.POST)
106   - @ResponseBody
107   - public List<String> findEntityAttributesKeysByQuery(@RequestBody EntityDataQuery query) throws ThingsboardException {
108   - TenantId tenantId = getTenantId();
109   - return getKeys(query, entityIds -> attributesService.findAllKeysByEntityIds(tenantId, entityIds.get(0).getEntityType(), entityIds));
110   - }
111   -
112   - private List<String> getKeys(EntityDataQuery query, Function<List<EntityId>, List<String>> function) throws ThingsboardException {
113 91 checkNotNull(query);
114 92 try {
115 93 EntityDataPageLink pageLink = query.getPageLink();
116 94 if (pageLink.getPageSize() > 100) {
117 95 pageLink.setPageSize(100);
118 96 }
119   - List<EntityId> ids = this.entityQueryService.findEntityDataByQuery(getCurrentUser(), query).getData().stream()
120   - .map(EntityData::getEntityId)
121   - .collect(Collectors.toList());
122   - if (ids.isEmpty()) {
123   - return Collections.emptyList();
124   - }
125   - return function.apply(ids);
  97 + DeferredResult<ResponseEntity> response = new DeferredResult<>();
  98 + entityQueryService.getKeysByQueryCallback(getCurrentUser(), tenantId, query, isTimeseries, isAttributes, response);
  99 + return response;
126 100 } catch (Exception e) {
127 101 throw handleException(e);
128 102 }
129 103 }
  104 +
130 105 }
... ...
... ... @@ -15,11 +15,24 @@
15 15 */
16 16 package org.thingsboard.server.service.query;
17 17
  18 +import com.datastax.oss.driver.internal.core.util.CollectionsUtils;
  19 +import com.fasterxml.jackson.databind.node.ArrayNode;
  20 +import com.fasterxml.jackson.databind.node.ObjectNode;
  21 +import com.google.common.util.concurrent.FutureCallback;
  22 +import com.google.common.util.concurrent.Futures;
  23 +import com.google.common.util.concurrent.ListenableFuture;
18 24 import lombok.extern.slf4j.Slf4j;
  25 +import org.checkerframework.checker.nullness.qual.Nullable;
19 26 import org.springframework.beans.factory.annotation.Autowired;
20 27 import org.springframework.beans.factory.annotation.Value;
  28 +import org.springframework.http.HttpStatus;
  29 +import org.springframework.http.ResponseEntity;
21 30 import org.springframework.stereotype.Service;
  31 +import org.springframework.util.CollectionUtils;
  32 +import org.springframework.web.context.request.async.DeferredResult;
  33 +import org.thingsboard.server.common.data.EntityType;
22 34 import org.thingsboard.server.common.data.id.EntityId;
  35 +import org.thingsboard.server.common.data.id.TenantId;
23 36 import org.thingsboard.server.common.data.page.PageData;
24 37 import org.thingsboard.server.common.data.query.AlarmData;
25 38 import org.thingsboard.server.common.data.query.AlarmDataQuery;
... ... @@ -31,12 +44,24 @@ import org.thingsboard.server.common.data.query.EntityDataSortOrder;
31 44 import org.thingsboard.server.common.data.query.EntityKey;
32 45 import org.thingsboard.server.common.data.query.EntityKeyType;
33 46 import org.thingsboard.server.dao.alarm.AlarmService;
  47 +import org.thingsboard.server.dao.attributes.AttributesService;
34 48 import org.thingsboard.server.dao.entity.EntityService;
35 49 import org.thingsboard.server.dao.model.ModelConstants;
  50 +import org.thingsboard.server.dao.timeseries.TimeseriesService;
  51 +import org.thingsboard.server.dao.util.mapping.JacksonUtil;
36 52 import org.thingsboard.server.queue.util.TbCoreComponent;
  53 +import org.thingsboard.server.service.executors.DbCallbackExecutorService;
  54 +import org.thingsboard.server.service.security.AccessValidator;
37 55 import org.thingsboard.server.service.security.model.SecurityUser;
38 56
  57 +import java.util.ArrayList;
  58 +import java.util.Collection;
  59 +import java.util.Collections;
39 60 import java.util.LinkedHashMap;
  61 +import java.util.List;
  62 +import java.util.Map;
  63 +import java.util.Set;
  64 +import java.util.stream.Collectors;
40 65
41 66 @Service
42 67 @Slf4j
... ... @@ -52,6 +77,15 @@ public class DefaultEntityQueryService implements EntityQueryService {
52 77 @Value("${server.ws.max_entities_per_alarm_subscription:1000}")
53 78 private int maxEntitiesPerAlarmSubscription;
54 79
  80 + @Autowired
  81 + private DbCallbackExecutorService dbCallbackExecutor;
  82 +
  83 + @Autowired
  84 + private TimeseriesService timeseriesService;
  85 +
  86 + @Autowired
  87 + private AttributesService attributesService;
  88 +
55 89 @Override
56 90 public long countEntitiesByQuery(SecurityUser securityUser, EntityCountQuery query) {
57 91 return entityService.countEntitiesByQuery(securityUser.getTenantId(), securityUser.getCustomerId(), query);
... ... @@ -89,6 +123,107 @@ public class DefaultEntityQueryService implements EntityQueryService {
89 123 }
90 124 }
91 125
  126 + @Override
  127 + public void getKeysByQueryCallback(SecurityUser securityUser, TenantId tenantId, EntityDataQuery query,
  128 + boolean isTimeseries, boolean isAttributes, DeferredResult<ResponseEntity> response) {
  129 + if (!isAttributes && !isTimeseries) {
  130 + getEmptyResponseCallback(response);
  131 + return;
  132 + }
  133 +
  134 + List<EntityId> ids = this.findEntityDataByQuery(securityUser, query).getData().stream()
  135 + .map(EntityData::getEntityId)
  136 + .collect(Collectors.toList());
  137 + if (ids.isEmpty()) {
  138 + getEmptyResponseCallback(response);
  139 + return;
  140 + }
  141 +
  142 + Set<EntityType> types = ids.stream().map(EntityId::getEntityType).collect(Collectors.toSet());
  143 + ListenableFuture<List<String>> timeseriesKeysFuture;
  144 + ListenableFuture<List<String>> attributesKeysFuture;
  145 +
  146 + if (isTimeseries) {
  147 + timeseriesKeysFuture = dbCallbackExecutor.submit(() -> timeseriesService.findAllKeysByEntityIds(tenantId, ids));
  148 + } else {
  149 + timeseriesKeysFuture = null;
  150 + }
  151 +
  152 + if (isAttributes) {
  153 + Map<EntityType, List<EntityId>> typesMap = ids.stream().collect(Collectors.groupingBy(EntityId::getEntityType));
  154 + List<ListenableFuture<List<String>>> futures = new ArrayList<>(typesMap.size());
  155 + typesMap.forEach((type, entityIds) -> futures.add(dbCallbackExecutor.submit(() -> attributesService.findAllKeysByEntityIds(tenantId, type, entityIds))));
  156 + attributesKeysFuture = Futures.transform(Futures.allAsList(futures), lists -> {
  157 + if (CollectionUtils.isEmpty(lists)) {
  158 + return null;
  159 + }
  160 +
  161 + return lists.stream().flatMap(List::stream).distinct().sorted().collect(Collectors.toList());
  162 + }, dbCallbackExecutor);
  163 + } else {
  164 + attributesKeysFuture = null;
  165 + }
  166 +
  167 + if (timeseriesKeysFuture != null && attributesKeysFuture != null) {
  168 + Futures.whenAllComplete(timeseriesKeysFuture, attributesKeysFuture).call(() -> {
  169 + try {
  170 + getResponseCallback(response, types, timeseriesKeysFuture.get(), attributesKeysFuture.get());
  171 + } catch (Exception e) {
  172 + log.error("Failed to fetch timeseries and attributes keys!", e);
  173 + AccessValidator.handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
  174 + }
  175 +
  176 + return null;
  177 + }, dbCallbackExecutor);
  178 + } else if (timeseriesKeysFuture != null) {
  179 + Futures.addCallback(timeseriesKeysFuture, new FutureCallback<List<String>>() {
  180 + @Override
  181 + public void onSuccess(@Nullable List<String> keys) {
  182 + getResponseCallback(response, types, keys, null);
  183 + }
  184 +
  185 + @Override
  186 + public void onFailure(Throwable t) {
  187 + log.error("Failed to fetch timeseries keys!", t);
  188 + AccessValidator.handleError(t, response, HttpStatus.INTERNAL_SERVER_ERROR);
  189 + }
  190 +
  191 + }, dbCallbackExecutor);
  192 + } else {
  193 + Futures.addCallback(attributesKeysFuture, new FutureCallback<List<String>>() {
  194 + @Override
  195 + public void onSuccess(@Nullable List<String> keys) {
  196 + getResponseCallback(response, types, null, keys);
  197 + }
  198 +
  199 + @Override
  200 + public void onFailure(Throwable t) {
  201 + log.error("Failed to fetch attributes keys!", t);
  202 + AccessValidator.handleError(t, response, HttpStatus.INTERNAL_SERVER_ERROR);
  203 + }
  204 + }, dbCallbackExecutor);
  205 + }
  206 + }
  207 +
  208 + private void getResponseCallback(DeferredResult<ResponseEntity> response, Set<EntityType> types, List<String> timeseriesKeys, List<String> attributesKeys) {
  209 + ObjectNode json = JacksonUtil.newObjectNode();
  210 + addItemsToArrayNode(json.putArray("types"), types);
  211 + addItemsToArrayNode(json.putArray("timeseriesKeys"), timeseriesKeys);
  212 + addItemsToArrayNode(json.putArray("attributesKeys"), attributesKeys);
  213 +
  214 + response.setResult(new ResponseEntity(json, HttpStatus.OK));
  215 + }
  216 +
  217 + private void getEmptyResponseCallback(DeferredResult<ResponseEntity> response) {
  218 + getResponseCallback(response, null, null, null);
  219 + }
  220 +
  221 + private void addItemsToArrayNode(ArrayNode arrayNode, Collection<?> collection) {
  222 + if (!CollectionUtils.isEmpty(collection)) {
  223 + collection.forEach(item -> arrayNode.add(item.toString()));
  224 + }
  225 + }
  226 +
92 227 private EntityDataQuery buildEntityDataQuery(AlarmDataQuery query) {
93 228 EntityDataSortOrder sortOrder = query.getPageLink().getSortOrder();
94 229 EntityDataSortOrder entitiesSortOrder;
... ...
... ... @@ -15,6 +15,9 @@
15 15 */
16 16 package org.thingsboard.server.service.query;
17 17
  18 +import org.springframework.http.ResponseEntity;
  19 +import org.springframework.web.context.request.async.DeferredResult;
  20 +import org.thingsboard.server.common.data.id.TenantId;
18 21 import org.thingsboard.server.common.data.page.PageData;
19 22 import org.thingsboard.server.common.data.query.AlarmData;
20 23 import org.thingsboard.server.common.data.query.AlarmDataQuery;
... ... @@ -31,4 +34,7 @@ public interface EntityQueryService {
31 34
32 35 PageData<AlarmData> findAlarmDataByQuery(SecurityUser securityUser, AlarmDataQuery query);
33 36
  37 + void getKeysByQueryCallback(SecurityUser securityUser, TenantId tenantId, EntityDataQuery query,
  38 + boolean isTimeseries, boolean isAttributes, DeferredResult<ResponseEntity> response);
  39 +
34 40 }
... ...