Commit 974bfd39a5e061b29b8ca196d433a96fe222a24a

Authored by Igor Kulikov
1 parent c13308b1

Update EntityView - copy latest timeseries, delete previous attributes, latest t…

…imeseries on entity view update
@@ -19,7 +19,9 @@ import com.google.common.util.concurrent.FutureCallback; @@ -19,7 +19,9 @@ import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures; 19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture; 20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.MoreExecutors; 21 import com.google.common.util.concurrent.MoreExecutors;
  22 +import com.google.common.util.concurrent.SettableFuture;
22 import lombok.extern.slf4j.Slf4j; 23 import lombok.extern.slf4j.Slf4j;
  24 +import org.springframework.beans.factory.annotation.Autowired;
23 import org.springframework.http.HttpStatus; 25 import org.springframework.http.HttpStatus;
24 import org.springframework.security.access.prepost.PreAuthorize; 26 import org.springframework.security.access.prepost.PreAuthorize;
25 import org.springframework.web.bind.annotation.PathVariable; 27 import org.springframework.web.bind.annotation.PathVariable;
@@ -40,10 +42,14 @@ import org.thingsboard.server.common.data.id.EntityViewId; @@ -40,10 +42,14 @@ import org.thingsboard.server.common.data.id.EntityViewId;
40 import org.thingsboard.server.common.data.id.TenantId; 42 import org.thingsboard.server.common.data.id.TenantId;
41 import org.thingsboard.server.common.data.id.UUIDBased; 43 import org.thingsboard.server.common.data.id.UUIDBased;
42 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 44 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  45 +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
  46 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
  47 +import org.thingsboard.server.common.data.kv.TsKvEntry;
43 import org.thingsboard.server.common.data.page.PageData; 48 import org.thingsboard.server.common.data.page.PageData;
44 import org.thingsboard.server.common.data.page.PageLink; 49 import org.thingsboard.server.common.data.page.PageLink;
45 import org.thingsboard.server.dao.exception.IncorrectParameterException; 50 import org.thingsboard.server.dao.exception.IncorrectParameterException;
46 import org.thingsboard.server.dao.model.ModelConstants; 51 import org.thingsboard.server.dao.model.ModelConstants;
  52 +import org.thingsboard.server.dao.timeseries.TimeseriesService;
47 import org.thingsboard.server.queue.util.TbCoreComponent; 53 import org.thingsboard.server.queue.util.TbCoreComponent;
48 import org.thingsboard.server.service.security.model.SecurityUser; 54 import org.thingsboard.server.service.security.model.SecurityUser;
49 import org.thingsboard.server.service.security.permission.Operation; 55 import org.thingsboard.server.service.security.permission.Operation;
@@ -69,6 +75,9 @@ public class EntityViewController extends BaseController { @@ -69,6 +75,9 @@ public class EntityViewController extends BaseController {
69 75
70 public static final String ENTITY_VIEW_ID = "entityViewId"; 76 public static final String ENTITY_VIEW_ID = "entityViewId";
71 77
  78 + @Autowired
  79 + private TimeseriesService tsService;
  80 +
72 @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") 81 @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
73 @RequestMapping(value = "/entityView/{entityViewId}", method = RequestMethod.GET) 82 @RequestMapping(value = "/entityView/{entityViewId}", method = RequestMethod.GET)
74 @ResponseBody 83 @ResponseBody
@@ -101,16 +110,37 @@ public class EntityViewController extends BaseController { @@ -101,16 +110,37 @@ public class EntityViewController extends BaseController {
101 try { 110 try {
102 entityView.setTenantId(getCurrentUser().getTenantId()); 111 entityView.setTenantId(getCurrentUser().getTenantId());
103 112
104 - checkEntity(entityView.getId(), entityView, Resource.ENTITY_VIEW); 113 + List<ListenableFuture<?>> futures = new ArrayList<>();
  114 +
  115 + if (entityView.getId() == null) {
  116 + accessControlService
  117 + .checkPermission(getCurrentUser(), Resource.ENTITY_VIEW, Operation.CREATE, null, entityView);
  118 + } else {
  119 + EntityView existingEntityView = checkNotNull(entityViewService.findEntityViewById(getCurrentUser().getTenantId(), entityView.getId()));
  120 + if (existingEntityView.getKeys() != null) {
  121 + if (existingEntityView.getKeys().getAttributes() != null) {
  122 + futures.add(deleteAttributesFromEntityView(existingEntityView, DataConstants.CLIENT_SCOPE, existingEntityView.getKeys().getAttributes().getCs(), getCurrentUser()));
  123 + futures.add(deleteAttributesFromEntityView(existingEntityView, DataConstants.SERVER_SCOPE, existingEntityView.getKeys().getAttributes().getCs(), getCurrentUser()));
  124 + futures.add(deleteAttributesFromEntityView(existingEntityView, DataConstants.SHARED_SCOPE, existingEntityView.getKeys().getAttributes().getCs(), getCurrentUser()));
  125 + }
  126 + if (existingEntityView.getKeys().getTimeseries() != null && !existingEntityView.getKeys().getTimeseries().isEmpty()) {
  127 + futures.add(deleteLatestFromEntityView(existingEntityView, existingEntityView.getKeys().getTimeseries(), getCurrentUser()));
  128 + }
  129 + }
  130 + }
105 131
106 EntityView savedEntityView = checkNotNull(entityViewService.saveEntityView(entityView)); 132 EntityView savedEntityView = checkNotNull(entityViewService.saveEntityView(entityView));
107 - List<ListenableFuture<List<Void>>> futures = new ArrayList<>();  
108 - if (savedEntityView.getKeys() != null && savedEntityView.getKeys().getAttributes() != null) {  
109 - futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.CLIENT_SCOPE, savedEntityView.getKeys().getAttributes().getCs(), getCurrentUser()));  
110 - futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SERVER_SCOPE, savedEntityView.getKeys().getAttributes().getSs(), getCurrentUser()));  
111 - futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SHARED_SCOPE, savedEntityView.getKeys().getAttributes().getSh(), getCurrentUser())); 133 + if (savedEntityView.getKeys() != null) {
  134 + if (savedEntityView.getKeys().getAttributes() != null) {
  135 + futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.CLIENT_SCOPE, savedEntityView.getKeys().getAttributes().getCs(), getCurrentUser()));
  136 + futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SERVER_SCOPE, savedEntityView.getKeys().getAttributes().getSs(), getCurrentUser()));
  137 + futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SHARED_SCOPE, savedEntityView.getKeys().getAttributes().getSh(), getCurrentUser()));
  138 + }
  139 + if (savedEntityView.getKeys().getTimeseries() != null && !savedEntityView.getKeys().getTimeseries().isEmpty()) {
  140 + futures.add(copyLatestFromEntityToEntityView(savedEntityView, getCurrentUser()));
  141 + }
112 } 142 }
113 - for (ListenableFuture<List<Void>> future : futures) { 143 + for (ListenableFuture<?> future : futures) {
114 try { 144 try {
115 future.get(); 145 future.get();
116 } catch (InterruptedException | ExecutionException e) { 146 } catch (InterruptedException | ExecutionException e) {
@@ -128,6 +158,98 @@ public class EntityViewController extends BaseController { @@ -128,6 +158,98 @@ public class EntityViewController extends BaseController {
128 } 158 }
129 } 159 }
130 160
  161 + private ListenableFuture<Void> deleteLatestFromEntityView(EntityView entityView, List<String> keys, SecurityUser user) {
  162 + EntityViewId entityId = entityView.getId();
  163 + SettableFuture<Void> resultFuture = SettableFuture.create();
  164 + if (keys != null && !keys.isEmpty()) {
  165 + tsSubService.deleteLatest(entityView.getTenantId(), entityId, keys, new FutureCallback<Void>() {
  166 + @Override
  167 + public void onSuccess(@Nullable Void tmp) {
  168 + try {
  169 + logTimeseriesDeleted(user, entityId, keys, null);
  170 + } catch (ThingsboardException e) {
  171 + log.error("Failed to log timeseries delete", e);
  172 + }
  173 + resultFuture.set(tmp);
  174 + }
  175 +
  176 + @Override
  177 + public void onFailure(Throwable t) {
  178 + try {
  179 + logTimeseriesDeleted(user, entityId, keys, t);
  180 + } catch (ThingsboardException e) {
  181 + log.error("Failed to log timeseries delete", e);
  182 + }
  183 + resultFuture.setException(t);
  184 + }
  185 + });
  186 + } else {
  187 + resultFuture.set(null);
  188 + }
  189 + return resultFuture;
  190 + }
  191 +
  192 + private ListenableFuture<Void> deleteAttributesFromEntityView(EntityView entityView, String scope, List<String> keys, SecurityUser user) {
  193 + EntityViewId entityId = entityView.getId();
  194 + SettableFuture<Void> resultFuture = SettableFuture.create();
  195 + if (keys != null && !keys.isEmpty()) {
  196 + tsSubService.deleteAndNotify(entityView.getTenantId(), entityId, scope, keys, new FutureCallback<Void>() {
  197 + @Override
  198 + public void onSuccess(@Nullable Void tmp) {
  199 + try {
  200 + logAttributesDeleted(user, entityId, scope, keys, null);
  201 + } catch (ThingsboardException e) {
  202 + log.error("Failed to log attribute delete", e);
  203 + }
  204 + resultFuture.set(tmp);
  205 + }
  206 +
  207 + @Override
  208 + public void onFailure(Throwable t) {
  209 + try {
  210 + logAttributesDeleted(user, entityId, scope, keys, t);
  211 + } catch (ThingsboardException e) {
  212 + log.error("Failed to log attribute delete", e);
  213 + }
  214 + resultFuture.setException(t);
  215 + }
  216 + });
  217 + } else {
  218 + resultFuture.set(null);
  219 + }
  220 + return resultFuture;
  221 + }
  222 +
  223 + private ListenableFuture<List<Void>> copyLatestFromEntityToEntityView(EntityView entityView, SecurityUser user) {
  224 + EntityViewId entityId = entityView.getId();
  225 + List<String> keys = entityView.getKeys().getTimeseries();
  226 + long startTime = entityView.getStartTimeMs();
  227 + long endTime = entityView.getEndTimeMs();
  228 + ListenableFuture<List<TsKvEntry>> latestFuture;
  229 + if (startTime == 0 && endTime == 0) {
  230 + latestFuture = tsService.findLatest(user.getTenantId(), entityView.getEntityId(), keys);
  231 + } else {
  232 + long startTs = startTime;
  233 + long endTs = endTime == 0 ? System.currentTimeMillis() : endTime;
  234 + List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList());
  235 + latestFuture = tsService.findAll(user.getTenantId(), entityView.getEntityId(), queries);
  236 + }
  237 + return Futures.transform(latestFuture, latestValues -> {
  238 + if (latestValues != null && !latestValues.isEmpty()) {
  239 + tsSubService.saveLatestAndNotify(entityView.getTenantId(), entityId, latestValues, new FutureCallback<Void>() {
  240 + @Override
  241 + public void onSuccess(@Nullable Void tmp) {
  242 + }
  243 +
  244 + @Override
  245 + public void onFailure(Throwable t) {
  246 + }
  247 + });
  248 + }
  249 + return null;
  250 + }, MoreExecutors.directExecutor());
  251 + }
  252 +
131 private ListenableFuture<List<Void>> copyAttributesFromEntityToEntityView(EntityView entityView, String scope, Collection<String> keys, SecurityUser user) throws ThingsboardException { 253 private ListenableFuture<List<Void>> copyAttributesFromEntityToEntityView(EntityView entityView, String scope, Collection<String> keys, SecurityUser user) throws ThingsboardException {
132 EntityViewId entityId = entityView.getId(); 254 EntityViewId entityId = entityView.getId();
133 if (keys != null && !keys.isEmpty()) { 255 if (keys != null && !keys.isEmpty()) {
@@ -174,10 +296,20 @@ public class EntityViewController extends BaseController { @@ -174,10 +296,20 @@ public class EntityViewController extends BaseController {
174 } 296 }
175 297
176 private void logAttributesUpdated(SecurityUser user, EntityId entityId, String scope, List<AttributeKvEntry> attributes, Throwable e) throws ThingsboardException { 298 private void logAttributesUpdated(SecurityUser user, EntityId entityId, String scope, List<AttributeKvEntry> attributes, Throwable e) throws ThingsboardException {
177 - logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.ATTRIBUTES_UPDATED, toException(e), 299 + logEntityAction(user, entityId, null, null, ActionType.ATTRIBUTES_UPDATED, toException(e),
178 scope, attributes); 300 scope, attributes);
179 } 301 }
180 302
  303 + private void logAttributesDeleted(SecurityUser user, EntityId entityId, String scope, List<String> keys, Throwable e) throws ThingsboardException {
  304 + logEntityAction(user, entityId, null, null, ActionType.ATTRIBUTES_DELETED, toException(e),
  305 + scope, keys);
  306 + }
  307 +
  308 + private void logTimeseriesDeleted(SecurityUser user, EntityId entityId, List<String> keys, Throwable e) throws ThingsboardException {
  309 + logEntityAction(user, entityId, null, null, ActionType.TIMESERIES_DELETED, toException(e),
  310 + keys);
  311 + }
  312 +
181 @PreAuthorize("hasAuthority('TENANT_ADMIN')") 313 @PreAuthorize("hasAuthority('TENANT_ADMIN')")
182 @RequestMapping(value = "/entityView/{entityViewId}", method = RequestMethod.DELETE) 314 @RequestMapping(value = "/entityView/{entityViewId}", method = RequestMethod.DELETE)
183 @ResponseStatus(value = HttpStatus.OK) 315 @ResponseStatus(value = HttpStatus.OK)
@@ -171,6 +171,7 @@ public class ThingsboardInstallService { @@ -171,6 +171,7 @@ public class ThingsboardInstallService {
171 case "3.0.1": 171 case "3.0.1":
172 log.info("Upgrading ThingsBoard from version 3.0.1 to 3.1.0 ..."); 172 log.info("Upgrading ThingsBoard from version 3.0.1 to 3.1.0 ...");
173 databaseEntitiesUpgradeService.upgradeDatabase("3.0.1"); 173 databaseEntitiesUpgradeService.upgradeDatabase("3.0.1");
  174 + dataUpdateService.updateData("3.0.1");
174 log.info("Updating system data..."); 175 log.info("Updating system data...");
175 systemDataLoaderService.updateSystemWidgets(); 176 systemDataLoaderService.updateSystemWidgets();
176 break; 177 break;
@@ -15,20 +15,38 @@ @@ -15,20 +15,38 @@
15 */ 15 */
16 package org.thingsboard.server.service.install.update; 16 package org.thingsboard.server.service.install.update;
17 17
  18 +import com.google.common.util.concurrent.FutureCallback;
  19 +import com.google.common.util.concurrent.Futures;
  20 +import com.google.common.util.concurrent.ListenableFuture;
  21 +import com.google.common.util.concurrent.MoreExecutors;
18 import lombok.extern.slf4j.Slf4j; 22 import lombok.extern.slf4j.Slf4j;
19 import org.springframework.beans.factory.annotation.Autowired; 23 import org.springframework.beans.factory.annotation.Autowired;
20 import org.springframework.context.annotation.Profile; 24 import org.springframework.context.annotation.Profile;
21 import org.springframework.stereotype.Service; 25 import org.springframework.stereotype.Service;
  26 +import org.thingsboard.server.common.data.EntityView;
22 import org.thingsboard.server.common.data.SearchTextBased; 27 import org.thingsboard.server.common.data.SearchTextBased;
23 import org.thingsboard.server.common.data.Tenant; 28 import org.thingsboard.server.common.data.Tenant;
  29 +import org.thingsboard.server.common.data.id.EntityViewId;
  30 +import org.thingsboard.server.common.data.id.TenantId;
24 import org.thingsboard.server.common.data.id.UUIDBased; 31 import org.thingsboard.server.common.data.id.UUIDBased;
  32 +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
  33 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
  34 +import org.thingsboard.server.common.data.kv.TsKvEntry;
25 import org.thingsboard.server.common.data.page.PageData; 35 import org.thingsboard.server.common.data.page.PageData;
26 import org.thingsboard.server.common.data.page.PageLink; 36 import org.thingsboard.server.common.data.page.PageLink;
27 import org.thingsboard.server.common.data.rule.RuleChain; 37 import org.thingsboard.server.common.data.rule.RuleChain;
  38 +import org.thingsboard.server.dao.entityview.EntityViewService;
28 import org.thingsboard.server.dao.rule.RuleChainService; 39 import org.thingsboard.server.dao.rule.RuleChainService;
29 import org.thingsboard.server.dao.tenant.TenantService; 40 import org.thingsboard.server.dao.tenant.TenantService;
  41 +import org.thingsboard.server.dao.timeseries.TimeseriesService;
30 import org.thingsboard.server.service.install.InstallScripts; 42 import org.thingsboard.server.service.install.InstallScripts;
31 43
  44 +import javax.annotation.Nullable;
  45 +import java.util.ArrayList;
  46 +import java.util.List;
  47 +import java.util.concurrent.ExecutionException;
  48 +import java.util.stream.Collectors;
  49 +
32 @Service 50 @Service
33 @Profile("install") 51 @Profile("install")
34 @Slf4j 52 @Slf4j
@@ -43,6 +61,12 @@ public class DefaultDataUpdateService implements DataUpdateService { @@ -43,6 +61,12 @@ public class DefaultDataUpdateService implements DataUpdateService {
43 @Autowired 61 @Autowired
44 private InstallScripts installScripts; 62 private InstallScripts installScripts;
45 63
  64 + @Autowired
  65 + private EntityViewService entityViewService;
  66 +
  67 + @Autowired
  68 + private TimeseriesService tsService;
  69 +
46 @Override 70 @Override
47 public void updateData(String fromVersion) throws Exception { 71 public void updateData(String fromVersion) throws Exception {
48 switch (fromVersion) { 72 switch (fromVersion) {
@@ -50,6 +74,10 @@ public class DefaultDataUpdateService implements DataUpdateService { @@ -50,6 +74,10 @@ public class DefaultDataUpdateService implements DataUpdateService {
50 log.info("Updating data from version 1.4.0 to 2.0.0 ..."); 74 log.info("Updating data from version 1.4.0 to 2.0.0 ...");
51 tenantsDefaultRuleChainUpdater.updateEntities(null); 75 tenantsDefaultRuleChainUpdater.updateEntities(null);
52 break; 76 break;
  77 + case "3.0.1":
  78 + log.info("Updating data from version 3.0.1 to 3.1.0 ...");
  79 + tenantsEntityViewsUpdater.updateEntities(null);
  80 + break;
53 default: 81 default:
54 throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); 82 throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);
55 } 83 }
@@ -76,4 +104,66 @@ public class DefaultDataUpdateService implements DataUpdateService { @@ -76,4 +104,66 @@ public class DefaultDataUpdateService implements DataUpdateService {
76 } 104 }
77 }; 105 };
78 106
79 -}  
  107 + private PaginatedUpdater<String, Tenant> tenantsEntityViewsUpdater =
  108 + new PaginatedUpdater<String, Tenant>() {
  109 +
  110 + @Override
  111 + protected PageData<Tenant> findEntities(String region, PageLink pageLink) {
  112 + return tenantService.findTenants(pageLink);
  113 + }
  114 +
  115 + @Override
  116 + protected void updateEntity(Tenant tenant) {
  117 + updateTenantEntityViews(tenant.getId());
  118 + }
  119 + };
  120 +
  121 + private void updateTenantEntityViews(TenantId tenantId) {
  122 + PageLink pageLink = new PageLink(100);
  123 + PageData<EntityView> pageData = entityViewService.findEntityViewByTenantId(tenantId, pageLink);
  124 + boolean hasNext = true;
  125 + while (hasNext) {
  126 + List<ListenableFuture<List<Void>>> updateFutures = new ArrayList<>();
  127 + for (EntityView entityView : pageData.getData()) {
  128 + updateFutures.add(updateEntityViewLatestTelemetry(entityView));
  129 + }
  130 +
  131 + try {
  132 + Futures.allAsList(updateFutures).get();
  133 + } catch (InterruptedException | ExecutionException e) {
  134 + log.error("Failed to copy latest telemetry to entity view", e);
  135 + }
  136 +
  137 + if (pageData.hasNext()) {
  138 + pageLink = pageLink.nextPageLink();
  139 + pageData = entityViewService.findEntityViewByTenantId(tenantId, pageLink);
  140 + } else {
  141 + hasNext = false;
  142 + }
  143 + }
  144 + }
  145 +
  146 + private ListenableFuture<List<Void>> updateEntityViewLatestTelemetry(EntityView entityView) {
  147 + EntityViewId entityId = entityView.getId();
  148 + List<String> keys = entityView.getKeys().getTimeseries();
  149 + long startTime = entityView.getStartTimeMs();
  150 + long endTime = entityView.getEndTimeMs();
  151 + ListenableFuture<List<TsKvEntry>> latestFuture;
  152 + if (startTime == 0 && endTime == 0) {
  153 + latestFuture = tsService.findLatest(TenantId.SYS_TENANT_ID, entityView.getEntityId(), keys);
  154 + } else {
  155 + long startTs = startTime;
  156 + long endTs = endTime == 0 ? System.currentTimeMillis() : endTime;
  157 + List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList());
  158 + latestFuture = tsService.findAll(TenantId.SYS_TENANT_ID, entityView.getEntityId(), queries);
  159 + }
  160 + return Futures.transformAsync(latestFuture, latestValues -> {
  161 + if (latestValues != null && !latestValues.isEmpty()) {
  162 + ListenableFuture<List<Void>> saveFuture = tsService.saveLatest(TenantId.SYS_TENANT_ID, entityId, latestValues);
  163 + return saveFuture;
  164 + }
  165 + return null;
  166 + }, MoreExecutors.directExecutor());
  167 + }
  168 +
  169 +}
@@ -116,6 +116,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @@ -116,6 +116,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
116 } 116 }
117 117
118 @Override 118 @Override
  119 + public void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
  120 + ListenableFuture<List<Void>> saveFuture = tsService.saveLatest(tenantId, entityId, ts);
  121 + addMainCallback(saveFuture, callback);
  122 + addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
  123 + }
  124 +
  125 + @Override
119 public void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback) { 126 public void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback) {
120 ListenableFuture<List<Void>> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys); 127 ListenableFuture<List<Void>> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys);
121 addMainCallback(deleteFuture, callback); 128 addMainCallback(deleteFuture, callback);
@@ -123,6 +130,12 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @@ -123,6 +130,12 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
123 } 130 }
124 131
125 @Override 132 @Override
  133 + public void deleteLatest(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback) {
  134 + ListenableFuture<List<Void>> deleteFuture = tsService.removeLatest(tenantId, entityId, keys);
  135 + addMainCallback(deleteFuture, callback);
  136 + }
  137 +
  138 + @Override
126 public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback) { 139 public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback) {
127 saveAndNotify(tenantId, entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value) 140 saveAndNotify(tenantId, entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value)
128 , System.currentTimeMillis())), callback); 141 , System.currentTimeMillis())), callback);
@@ -40,5 +40,9 @@ public interface TimeseriesService { @@ -40,5 +40,9 @@ public interface TimeseriesService {
40 40
41 ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl); 41 ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
42 42
  43 + ListenableFuture<List<Void>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry);
  44 +
43 ListenableFuture<List<Void>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> queries); 45 ListenableFuture<List<Void>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> queries);
  46 +
  47 + ListenableFuture<List<Void>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys);
44 } 48 }
@@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.id.EntityId; @@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.id.EntityId;
28 import org.thingsboard.server.common.data.id.EntityViewId; 28 import org.thingsboard.server.common.data.id.EntityViewId;
29 import org.thingsboard.server.common.data.id.TenantId; 29 import org.thingsboard.server.common.data.id.TenantId;
30 import org.thingsboard.server.common.data.kv.Aggregation; 30 import org.thingsboard.server.common.data.kv.Aggregation;
  31 +import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery;
31 import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; 32 import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
32 import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; 33 import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
33 import org.thingsboard.server.common.data.kv.ReadTsKvQuery; 34 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
@@ -149,6 +150,18 @@ public class BaseTimeseriesService implements TimeseriesService { @@ -149,6 +150,18 @@ public class BaseTimeseriesService implements TimeseriesService {
149 return Futures.allAsList(futures); 150 return Futures.allAsList(futures);
150 } 151 }
151 152
  153 + @Override
  154 + public ListenableFuture<List<Void>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries) {
  155 + List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size());
  156 + for (TsKvEntry tsKvEntry : tsKvEntries) {
  157 + if (tsKvEntry == null) {
  158 + throw new IncorrectParameterException("Key value entry can't be null");
  159 + }
  160 + futures.add(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry));
  161 + }
  162 + return Futures.allAsList(futures);
  163 + }
  164 +
152 private void saveAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Void>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { 165 private void saveAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Void>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
153 if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { 166 if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
154 throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only"); 167 throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only");
@@ -188,6 +201,17 @@ public class BaseTimeseriesService implements TimeseriesService { @@ -188,6 +201,17 @@ public class BaseTimeseriesService implements TimeseriesService {
188 return Futures.allAsList(futures); 201 return Futures.allAsList(futures);
189 } 202 }
190 203
  204 + @Override
  205 + public ListenableFuture<List<Void>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys) {
  206 + validate(entityId);
  207 + List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(keys.size());
  208 + for (String key : keys) {
  209 + DeleteTsKvQuery query = new BaseDeleteTsKvQuery(key, 0, System.currentTimeMillis(), false);
  210 + futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));
  211 + }
  212 + return Futures.allAsList(futures);
  213 + }
  214 +
191 private void deleteAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Void>> futures, EntityId entityId, DeleteTsKvQuery query) { 215 private void deleteAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Void>> futures, EntityId entityId, DeleteTsKvQuery query) {
192 futures.add(timeseriesDao.remove(tenantId, entityId, query)); 216 futures.add(timeseriesDao.remove(tenantId, entityId, query));
193 futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); 217 futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));
@@ -36,6 +36,8 @@ public interface RuleEngineTelemetryService { @@ -36,6 +36,8 @@ public interface RuleEngineTelemetryService {
36 36
37 void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback); 37 void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
38 38
  39 + void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
  40 +
39 void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback); 41 void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback);
40 42
41 void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, String value, FutureCallback<Void> callback); 43 void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, String value, FutureCallback<Void> callback);
@@ -46,5 +48,7 @@ public interface RuleEngineTelemetryService { @@ -46,5 +48,7 @@ public interface RuleEngineTelemetryService {
46 48
47 void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback); 49 void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback);
48 50
  51 + void deleteLatest(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback);
  52 +
49 53
50 } 54 }