Commit 093eafb462cab211107761c6fa54f1aba54835cb

Authored by Dima Landiak
1 parent f45ba17f

delete timeseries fixes and controller, entityView findByName added

... ... @@ -29,7 +29,6 @@ import org.springframework.web.bind.annotation.RequestParam;
29 29 import org.springframework.web.bind.annotation.ResponseBody;
30 30 import org.springframework.web.bind.annotation.ResponseStatus;
31 31 import org.springframework.web.bind.annotation.RestController;
32   -import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
33 32 import org.thingsboard.server.common.data.Customer;
34 33 import org.thingsboard.server.common.data.DataConstants;
35 34 import org.thingsboard.server.common.data.EntitySubtype;
... ... @@ -39,7 +38,6 @@ import org.thingsboard.server.common.data.audit.ActionType;
39 38 import org.thingsboard.server.common.data.entityview.EntityViewSearchQuery;
40 39 import org.thingsboard.server.common.data.exception.ThingsboardException;
41 40 import org.thingsboard.server.common.data.id.CustomerId;
42   -import org.thingsboard.server.common.data.id.DeviceId;
43 41 import org.thingsboard.server.common.data.id.EntityId;
44 42 import org.thingsboard.server.common.data.id.EntityViewId;
45 43 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -47,7 +45,6 @@ import org.thingsboard.server.common.data.id.UUIDBased;
47 45 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
48 46 import org.thingsboard.server.common.data.page.TextPageData;
49 47 import org.thingsboard.server.common.data.page.TextPageLink;
50   -import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
51 48 import org.thingsboard.server.dao.exception.IncorrectParameterException;
52 49 import org.thingsboard.server.dao.model.ModelConstants;
53 50 import org.thingsboard.server.service.security.model.SecurityUser;
... ... @@ -174,7 +171,7 @@ public class EntityViewController extends BaseController {
174 171 EntityView entityView = checkEntityViewId(entityViewId);
175 172 entityViewService.deleteEntityView(entityViewId);
176 173 logEntityAction(entityViewId, entityView, entityView.getCustomerId(),
177   - ActionType.DELETED,null, strEntityViewId);
  174 + ActionType.DELETED, null, strEntityViewId);
178 175 } catch (Exception e) {
179 176 logEntityAction(emptyId(EntityType.ENTITY_VIEW),
180 177 null,
... ... @@ -185,10 +182,23 @@ public class EntityViewController extends BaseController {
185 182 }
186 183
187 184 @PreAuthorize("hasAuthority('TENANT_ADMIN')")
  185 + @RequestMapping(value = "/tenant/entityViews", params = {"entityViewName"}, method = RequestMethod.GET)
  186 + @ResponseBody
  187 + public EntityView getTenantEntityView(
  188 + @RequestParam String entityViewName) throws ThingsboardException {
  189 + try {
  190 + TenantId tenantId = getCurrentUser().getTenantId();
  191 + return checkNotNull(entityViewService.findEntityViewByTenantIdAndName(tenantId, entityViewName));
  192 + } catch (Exception e) {
  193 + throw handleException(e);
  194 + }
  195 + }
  196 +
  197 + @PreAuthorize("hasAuthority('TENANT_ADMIN')")
188 198 @RequestMapping(value = "/customer/{customerId}/entityView/{entityViewId}", method = RequestMethod.POST)
189 199 @ResponseBody
190 200 public EntityView assignEntityViewToCustomer(@PathVariable(CUSTOMER_ID) String strCustomerId,
191   - @PathVariable(ENTITY_VIEW_ID) String strEntityViewId) throws ThingsboardException {
  201 + @PathVariable(ENTITY_VIEW_ID) String strEntityViewId) throws ThingsboardException {
192 202 checkParameter(CUSTOMER_ID, strCustomerId);
193 203 checkParameter(ENTITY_VIEW_ID, strEntityViewId);
194 204 try {
... ...
... ... @@ -49,9 +49,11 @@ import org.thingsboard.server.common.data.kv.Aggregation;
49 49 import org.thingsboard.server.common.data.kv.AttributeKey;
50 50 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
51 51 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
  52 +import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery;
52 53 import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
53 54 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
54 55 import org.thingsboard.server.common.data.kv.BooleanDataEntry;
  56 +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
55 57 import org.thingsboard.server.common.data.kv.DoubleDataEntry;
56 58 import org.thingsboard.server.common.data.kv.KvEntry;
57 59 import org.thingsboard.server.common.data.kv.LongDataEntry;
... ... @@ -60,12 +62,10 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
60 62 import org.thingsboard.server.common.data.kv.TsKvEntry;
61 63 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
62 64 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
63   -import org.thingsboard.server.dao.attributes.AttributesService;
64 65 import org.thingsboard.server.dao.timeseries.TimeseriesService;
65 66 import org.thingsboard.server.service.security.AccessValidator;
66 67 import org.thingsboard.server.service.security.model.SecurityUser;
67 68 import org.thingsboard.server.service.telemetry.AttributeData;
68   -import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
69 69 import org.thingsboard.server.service.telemetry.TsData;
70 70 import org.thingsboard.server.service.telemetry.exception.InvalidParametersException;
71 71 import org.thingsboard.server.service.telemetry.exception.UncheckedApiException;
... ... @@ -250,6 +250,60 @@ public class TelemetryController extends BaseController {
250 250 }
251 251
252 252 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
  253 + @RequestMapping(value = "/{entityType}/{entityId}/timeseries/delete", method = RequestMethod.DELETE)
  254 + @ResponseBody
  255 + public DeferredResult<ResponseEntity> deleteEntityTimeseries(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
  256 + @RequestParam(name = "keys") String keysStr,
  257 + @RequestParam(name = "deleteAllDataForKeys", defaultValue = "false") boolean deleteAllDataForKeys,
  258 + @RequestParam(name = "startTs", required = false) Long startTs,
  259 + @RequestParam(name = "endTs", required = false) Long endTs,
  260 + @RequestParam(name = "rewriteLatestIfDeleted", defaultValue = "false") boolean rewriteLatestIfDeleted) throws ThingsboardException {
  261 + EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
  262 + return deleteTimeseries(entityId, keysStr, deleteAllDataForKeys, startTs, endTs, rewriteLatestIfDeleted);
  263 + }
  264 +
  265 + private DeferredResult<ResponseEntity> deleteTimeseries(EntityId entityIdStr, String keysStr, boolean deleteAllDataForKeys,
  266 + Long startTs, Long endTs, boolean rewriteLatestIfDeleted) throws ThingsboardException {
  267 + List<String> keys = toKeysList(keysStr);
  268 + if (keys.isEmpty()) {
  269 + return getImmediateDeferredResult("Empty keys: " + keysStr, HttpStatus.BAD_REQUEST);
  270 + }
  271 + SecurityUser user = getCurrentUser();
  272 +
  273 + long deleteFromTs;
  274 + long deleteToTs;
  275 + if (deleteAllDataForKeys) {
  276 + deleteFromTs = 0L;
  277 + deleteToTs = System.currentTimeMillis();
  278 + } else {
  279 + deleteFromTs = startTs;
  280 + deleteToTs = endTs;
  281 + }
  282 +
  283 + return accessValidator.validateEntityAndCallback(user, entityIdStr, (result, entityId) -> {
  284 + List<DeleteTsKvQuery> deleteTsKvQueries = new ArrayList<>();
  285 + for (String key : keys) {
  286 + deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted));
  287 + }
  288 +
  289 + ListenableFuture<List<Void>> future = tsService.remove(entityId, deleteTsKvQueries);
  290 + Futures.addCallback(future, new FutureCallback<List<Void>>() {
  291 + @Override
  292 + public void onSuccess(@Nullable List<Void> tmp) {
  293 + logTimeseriesDeleted(user, entityId, keys, null);
  294 + result.setResult(new ResponseEntity<>(HttpStatus.OK));
  295 + }
  296 +
  297 + @Override
  298 + public void onFailure(Throwable t) {
  299 + logTimeseriesDeleted(user, entityId, keys, t);
  300 + result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
  301 + }
  302 + }, executor);
  303 + });
  304 + }
  305 +
  306 + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
253 307 @RequestMapping(value = "/{deviceId}/{scope}", method = RequestMethod.DELETE)
254 308 @ResponseBody
255 309 public DeferredResult<ResponseEntity> deleteEntityAttributes(@PathVariable("deviceId") String deviceIdStr,
... ... @@ -506,6 +560,15 @@ public class TelemetryController extends BaseController {
506 560 };
507 561 }
508 562
  563 + private void logTimeseriesDeleted(SecurityUser user, EntityId entityId, List<String> keys, Throwable e) {
  564 + try {
  565 + logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.TIMESERIES_DELETED, toException(e),
  566 + keys);
  567 + } catch (ThingsboardException te) {
  568 + log.warn("Failed to log timeseries delete", te);
  569 + }
  570 + }
  571 +
509 572 private void logAttributesDeleted(SecurityUser user, EntityId entityId, String scope, List<String> keys, Throwable e) {
510 573 try {
511 574 logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.ATTRIBUTES_DELETED, toException(e),
... ...
... ... @@ -24,6 +24,7 @@ public enum ActionType {
24 24 UPDATED(false), // log entity
25 25 ATTRIBUTES_UPDATED(false), // log attributes/values
26 26 ATTRIBUTES_DELETED(false), // log attributes
  27 + TIMESERIES_DELETED(false), // log timeseries
27 28 RPC_CALL(false), // log method and params
28 29 CREDENTIALS_UPDATED(false), // log new credentials
29 30 ASSIGNED_TO_CUSTOMER(false), // log customer name
... ... @@ -32,11 +33,11 @@ public enum ActionType {
32 33 SUSPENDED(false), // log string id
33 34 CREDENTIALS_READ(true), // log device id
34 35 ATTRIBUTES_READ(true), // log attributes
35   - RELATION_ADD_OR_UPDATE (false),
36   - RELATION_DELETED (false),
37   - RELATIONS_DELETED (false),
38   - ALARM_ACK (false),
39   - ALARM_CLEAR (false);
  36 + RELATION_ADD_OR_UPDATE(false),
  37 + RELATION_DELETED(false),
  38 + RELATIONS_DELETED(false),
  39 + ALARM_ACK(false),
  40 + ALARM_CLEAR(false);
40 41
41 42 private final boolean isRead;
42 43
... ...
... ... @@ -43,6 +43,8 @@ public interface EntityViewService {
43 43
44 44 EntityView findEntityViewById(EntityViewId entityViewId);
45 45
  46 + EntityView findEntityViewByTenantIdAndName(TenantId tenantId, String name);
  47 +
46 48 TextPageData<EntityView> findEntityViewByTenantId(TenantId tenantId, TextPageLink pageLink);
47 49
48 50 TextPageData<EntityView> findEntityViewByTenantIdAndType(TenantId tenantId, TextPageLink pageLink, String type);
... ...
... ... @@ -29,8 +29,6 @@ import org.springframework.cache.annotation.Cacheable;
29 29 import org.springframework.cache.annotation.Caching;
30 30 import org.springframework.stereotype.Service;
31 31 import org.thingsboard.server.common.data.Customer;
32   -import org.thingsboard.server.common.data.DataConstants;
33   -import org.thingsboard.server.common.data.Device;
34 32 import org.thingsboard.server.common.data.EntitySubtype;
35 33 import org.thingsboard.server.common.data.EntityType;
36 34 import org.thingsboard.server.common.data.EntityView;
... ... @@ -40,12 +38,10 @@ import org.thingsboard.server.common.data.id.CustomerId;
40 38 import org.thingsboard.server.common.data.id.EntityId;
41 39 import org.thingsboard.server.common.data.id.EntityViewId;
42 40 import org.thingsboard.server.common.data.id.TenantId;
43   -import org.thingsboard.server.common.data.kv.AttributeKvEntry;
44 41 import org.thingsboard.server.common.data.page.TextPageData;
45 42 import org.thingsboard.server.common.data.page.TextPageLink;
46 43 import org.thingsboard.server.common.data.relation.EntityRelation;
47 44 import org.thingsboard.server.common.data.relation.EntitySearchDirection;
48   -import org.thingsboard.server.dao.attributes.AttributesService;
49 45 import org.thingsboard.server.dao.customer.CustomerDao;
50 46 import org.thingsboard.server.dao.entity.AbstractEntityService;
51 47 import org.thingsboard.server.dao.exception.DataValidationException;
... ... @@ -56,15 +52,13 @@ import org.thingsboard.server.dao.tenant.TenantDao;
56 52 import javax.annotation.Nullable;
57 53 import java.util.ArrayList;
58 54 import java.util.Arrays;
59   -import java.util.Collection;
60 55 import java.util.Collections;
61 56 import java.util.Comparator;
62 57 import java.util.List;
63   -import java.util.concurrent.ExecutionException;
  58 +import java.util.Optional;
64 59 import java.util.stream.Collectors;
65 60
66 61 import static org.thingsboard.server.common.data.CacheConstants.ENTITY_VIEW_CACHE;
67   -import static org.thingsboard.server.common.data.CacheConstants.RELATIONS_CACHE;
68 62 import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
69 63 import static org.thingsboard.server.dao.service.Validator.validateId;
70 64 import static org.thingsboard.server.dao.service.Validator.validatePageLink;
... ... @@ -96,6 +90,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
96 90
97 91 @Caching(evict = {
98 92 @CacheEvict(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityView.tenantId, #entityView.entityId}"),
  93 + @CacheEvict(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityView.tenantId, #entityView.name}"),
99 94 @CacheEvict(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityView.id}")})
100 95 @Override
101 96 public EntityView saveEntityView(EntityView entityView) {
... ... @@ -137,6 +132,15 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
137 132 return entityViewDao.findById(entityViewId.getId());
138 133 }
139 134
  135 + @Cacheable(cacheNames = ENTITY_VIEW_CACHE, key = "{#tenantId, #name}")
  136 + @Override
  137 + public EntityView findEntityViewByTenantIdAndName(TenantId tenantId, String name) {
  138 + log.trace("Executing findEntityViewByTenantIdAndName [{}][{}]", tenantId, name);
  139 + validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
  140 + Optional<EntityView> entityViewOpt = entityViewDao.findEntityViewByTenantIdAndName(tenantId.getId(), name);
  141 + return entityViewOpt.orElse(null);
  142 + }
  143 +
140 144 @Override
141 145 public TextPageData<EntityView> findEntityViewByTenantId(TenantId tenantId, TextPageLink pageLink) {
142 146 log.trace("Executing findEntityViewsByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink);
... ... @@ -255,6 +259,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
255 259 deleteEntityRelations(entityViewId);
256 260 EntityView entityView = entityViewDao.findById(entityViewId.getId());
257 261 cacheManager.getCache(ENTITY_VIEW_CACHE).evict(Arrays.asList(entityView.getTenantId(), entityView.getEntityId()));
  262 + cacheManager.getCache(ENTITY_VIEW_CACHE).evict(Arrays.asList(entityView.getTenantId(), entityView.getName()));
258 263 entityViewDao.removeById(entityViewId.getId());
259 264 }
260 265
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql.timeseries;
17 17
18 18 import com.google.common.base.Function;
19 19 import com.google.common.collect.Lists;
  20 +import com.google.common.util.concurrent.FutureCallback;
20 21 import com.google.common.util.concurrent.Futures;
21 22 import com.google.common.util.concurrent.ListenableFuture;
22 23 import com.google.common.util.concurrent.ListeningExecutorService;
... ... @@ -31,6 +32,7 @@ import org.springframework.stereotype.Component;
31 32 import org.thingsboard.server.common.data.UUIDConverter;
32 33 import org.thingsboard.server.common.data.id.EntityId;
33 34 import org.thingsboard.server.common.data.kv.Aggregation;
  35 +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
34 36 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
35 37 import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
36 38 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
... ... @@ -41,9 +43,9 @@ import org.thingsboard.server.dao.model.sql.TsKvEntity;
41 43 import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey;
42 44 import org.thingsboard.server.dao.model.sql.TsKvLatestEntity;
43 45 import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
  46 +import org.thingsboard.server.dao.timeseries.SimpleListenableFuture;
44 47 import org.thingsboard.server.dao.timeseries.TimeseriesDao;
45 48 import org.thingsboard.server.dao.timeseries.TsInsertExecutorType;
46   -import org.thingsboard.server.dao.util.SqlDao;
47 49 import org.thingsboard.server.dao.util.SqlTsDao;
48 50
49 51 import javax.annotation.Nullable;
... ... @@ -53,6 +55,7 @@ import java.util.ArrayList;
53 55 import java.util.List;
54 56 import java.util.Optional;
55 57 import java.util.concurrent.CompletableFuture;
  58 +import java.util.concurrent.ExecutionException;
56 59 import java.util.concurrent.Executors;
57 60 import java.util.stream.Collectors;
58 61
... ... @@ -64,6 +67,8 @@ import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID;
64 67 @SqlTsDao
65 68 public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService implements TimeseriesDao {
66 69
  70 + private static final String DESC_ORDER = "DESC";
  71 +
67 72 @Value("${sql.ts_inserts_executor_type}")
68 73 private String insertExecutorType;
69 74
... ... @@ -326,14 +331,72 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
326 331
327 332 @Override
328 333 public ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query) {
329   - TsKvLatestEntity latestEntity = new TsKvLatestEntity();
330   - latestEntity.setEntityType(entityId.getEntityType());
331   - latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
332   - latestEntity.setKey(query.getKey());
333   - return service.submit(() -> {
334   - tsKvLatestRepository.delete(latestEntity);
335   - return null;
  334 + ListenableFuture<TsKvEntry> latestFuture = findLatest(entityId, query.getKey());
  335 +
  336 + ListenableFuture<Boolean> booleanFuture = Futures.transform(latestFuture, tsKvEntry -> {
  337 + long ts = tsKvEntry.getTs();
  338 + return ts > query.getStartTs() && ts <= query.getEndTs();
  339 + }, service);
  340 +
  341 + ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
  342 + if (isRemove) {
  343 + TsKvLatestEntity latestEntity = new TsKvLatestEntity();
  344 + latestEntity.setEntityType(entityId.getEntityType());
  345 + latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
  346 + latestEntity.setKey(query.getKey());
  347 + return service.submit(() -> {
  348 + tsKvLatestRepository.delete(latestEntity);
  349 + return null;
  350 + });
  351 + }
  352 + return Futures.immediateFuture(null);
  353 + }, service);
  354 +
  355 + final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
  356 + Futures.addCallback(removedLatestFuture, new FutureCallback<Void>() {
  357 + @Override
  358 + public void onSuccess(@Nullable Void result) {
  359 + if (query.getRewriteLatestIfDeleted()) {
  360 + ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
  361 + if (isRemove) {
  362 + return getNewLatestEntryFuture(entityId, query);
  363 + }
  364 + return Futures.immediateFuture(null);
  365 + }, service);
  366 +
  367 + try {
  368 + resultFuture.set(savedLatestFuture.get());
  369 + } catch (InterruptedException | ExecutionException e) {
  370 + log.warn("Could not get latest saved value for [{}], {}", entityId, query.getKey(), e);
  371 + }
  372 + } else {
  373 + resultFuture.set(null);
  374 + }
  375 + }
  376 +
  377 + @Override
  378 + public void onFailure(Throwable t) {
  379 + log.warn("[{}] Failed to process remove of the latest value", entityId, t);
  380 + }
336 381 });
  382 + return resultFuture;
  383 + }
  384 +
  385 + private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) {
  386 + long startTs = 0;
  387 + long endTs = query.getStartTs() - 1;
  388 + ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
  389 + Aggregation.NONE, DESC_ORDER);
  390 + ListenableFuture<List<TsKvEntry>> future = findAllAsync(entityId, findNewLatestQuery);
  391 +
  392 + return Futures.transformAsync(future, entryList -> {
  393 + if (entryList.size() == 1) {
  394 + return saveLatest(entityId, entryList.get(0));
  395 + } else {
  396 + log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
  397 + }
  398 + return Futures.immediateFuture(null);
  399 + }, service);
337 400 }
338 401
339 402 @Override
... ...
... ... @@ -47,7 +47,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
47 47 @Modifying
48 48 @Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
49 49 "AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
50   - "AND tskv.ts > :startTs AND tskv.ts < :endTs")
  50 + "AND tskv.ts > :startTs AND tskv.ts <= :endTs")
51 51 void delete(@Param("entityId") String entityId,
52 52 @Param("entityType") EntityType entityType,
53 53 @Param("entityKey") String key,
... ...
... ... @@ -48,7 +48,6 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
48 48 import org.thingsboard.server.common.data.kv.TsKvEntry;
49 49 import org.thingsboard.server.dao.model.ModelConstants;
50 50 import org.thingsboard.server.dao.nosql.CassandraAbstractAsyncDao;
51   -import org.thingsboard.server.dao.util.NoSqlDao;
52 51 import org.thingsboard.server.dao.util.NoSqlTsDao;
53 52
54 53 import javax.annotation.Nullable;
... ... @@ -62,6 +61,7 @@ import java.util.Arrays;
62 61 import java.util.Collections;
63 62 import java.util.List;
64 63 import java.util.Optional;
  64 +import java.util.concurrent.ExecutionException;
65 65 import java.util.stream.Collectors;
66 66
67 67 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
... ... @@ -434,14 +434,14 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
434 434 public ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query) {
435 435 ListenableFuture<TsKvEntry> latestEntryFuture = findLatest(entityId, query.getKey());
436 436
437   - ListenableFuture<Boolean> booleanFuture = Futures.transformAsync(latestEntryFuture, latestEntry -> {
  437 + ListenableFuture<Boolean> booleanFuture = Futures.transform(latestEntryFuture, latestEntry -> {
438 438 long ts = latestEntry.getTs();
439   - if (ts >= query.getStartTs() && ts <= query.getEndTs()) {
440   - return Futures.immediateFuture(true);
  439 + if (ts > query.getStartTs() && ts <= query.getEndTs()) {
  440 + return true;
441 441 } else {
442 442 log.trace("Won't be deleted latest value for [{}], key - {}", entityId, query.getKey());
443 443 }
444   - return Futures.immediateFuture(false);
  444 + return false;
445 445 }, readResultsProcessingExecutor);
446 446
447 447 ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
... ... @@ -451,18 +451,34 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
451 451 return Futures.immediateFuture(null);
452 452 }, readResultsProcessingExecutor);
453 453
454   - if (query.getRewriteLatestIfDeleted()) {
455   - ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
456   - if (isRemove) {
457   - return getNewLatestEntryFuture(entityId, query);
  454 + final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
  455 + Futures.addCallback(removedLatestFuture, new FutureCallback<Void>() {
  456 + @Override
  457 + public void onSuccess(@Nullable Void result) {
  458 + if (query.getRewriteLatestIfDeleted()) {
  459 + ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
  460 + if (isRemove) {
  461 + return getNewLatestEntryFuture(entityId, query);
  462 + }
  463 + return Futures.immediateFuture(null);
  464 + }, readResultsProcessingExecutor);
  465 +
  466 + try {
  467 + resultFuture.set(savedLatestFuture.get());
  468 + } catch (InterruptedException | ExecutionException e) {
  469 + log.warn("Could not get latest saved value for [{}], {}", entityId, query.getKey(), e);
  470 + }
  471 + } else {
  472 + resultFuture.set(null);
458 473 }
459   - return Futures.immediateFuture(null);
460   - }, readResultsProcessingExecutor);
  474 + }
461 475
462   - return Futures.transformAsync(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)),
463   - list -> Futures.immediateFuture(null), readResultsProcessingExecutor);
464   - }
465   - return removedLatestFuture;
  476 + @Override
  477 + public void onFailure(Throwable t) {
  478 + log.warn("[{}] Failed to process remove of the latest value", entityId, t);
  479 + }
  480 + });
  481 + return resultFuture;
466 482 }
467 483
468 484 private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) {
... ...
... ... @@ -152,7 +152,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
152 152 }
153 153
154 154 @Test
155   - public void testDeleteDeviceTsData() throws Exception {
  155 + public void testDeleteDeviceTsDataWithoutOverwritingLatest() throws Exception {
156 156 DeviceId deviceId = new DeviceId(UUIDs.timeBased());
157 157
158 158 saveEntries(deviceId, 10000);
... ... @@ -172,6 +172,26 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
172 172 }
173 173
174 174 @Test
  175 + public void testDeleteDeviceTsDataWithOverwritingLatest() throws Exception {
  176 + DeviceId deviceId = new DeviceId(UUIDs.timeBased());
  177 +
  178 + saveEntries(deviceId, 10000);
  179 + saveEntries(deviceId, 20000);
  180 + saveEntries(deviceId, 30000);
  181 + saveEntries(deviceId, 40000);
  182 +
  183 + tsService.remove(deviceId, Collections.singletonList(
  184 + new BaseDeleteTsKvQuery(STRING_KEY, 25000, 45000, true))).get();
  185 +
  186 + List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(
  187 + new BaseReadTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE))).get();
  188 + Assert.assertEquals(2, list.size());
  189 +
  190 + List<TsKvEntry> latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get();
  191 + Assert.assertEquals(20000, latest.get(0).getTs());
  192 + }
  193 +
  194 + @Test
175 195 public void testFindDeviceTsData() throws Exception {
176 196 DeviceId deviceId = new DeviceId(UUIDs.timeBased());
177 197 List<TsKvEntry> entries = new ArrayList<>();
... ...