Commit 89e9d27baf446ec28c43343deed23dc9cdd987ae

Authored by Andrew Shvayka
1 parent 1b469df4

TTL Rule Implementation

... ... @@ -168,12 +168,18 @@ public final class PluginProcessingContext implements PluginContext {
168 168
169 169 @Override
170 170 public void saveTsData(final EntityId entityId, final List<TsKvEntry> entries, final PluginCallback<Void> callback) {
  171 + saveTsData(entityId, entries, 0L, callback);
  172 + }
  173 +
  174 + @Override
  175 + public void saveTsData(final EntityId entityId, final List<TsKvEntry> entries, long ttl, final PluginCallback<Void> callback) {
171 176 validate(entityId, new ValidationCallback(callback, ctx -> {
172   - ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(entityId, entries);
  177 + ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(entityId, entries, ttl);
173 178 Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
174 179 }));
175 180 }
176 181
  182 +
177 183 @Override
178 184 public void loadTimeseries(final EntityId entityId, final List<TsKvQuery> queries, final PluginCallback<List<TsKvEntry>> callback) {
179 185 validate(entityId, new ValidationCallback(callback, ctx -> {
... ...
... ... @@ -19,7 +19,7 @@ akka {
19 19 # JVM shutdown, System.exit(-1), in case of a fatal error,
20 20 # such as OutOfMemoryError
21 21 jvm-exit-on-fatal-error = off
22   - loglevel = "INFO"
  22 + loglevel = "DEBUG"
23 23 loggers = ["akka.event.slf4j.Slf4jLogger"]
24 24 }
25 25
... ...
... ... @@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.id.EntityId;
30 30 import org.thingsboard.server.common.data.kv.*;
31 31 import org.thingsboard.server.common.data.kv.DataType;
32 32 import org.thingsboard.server.dao.AbstractAsyncDao;
33   -import org.thingsboard.server.dao.AbstractDao;
34 33 import org.thingsboard.server.dao.model.ModelConstants;
35 34
36 35 import javax.annotation.Nullable;
... ... @@ -40,8 +39,6 @@ import java.time.Instant;
40 39 import java.time.LocalDateTime;
41 40 import java.time.ZoneOffset;
42 41 import java.util.*;
43   -import java.util.concurrent.ExecutorService;
44   -import java.util.concurrent.Executors;
45 42 import java.util.stream.Collectors;
46 43
47 44 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
... ... @@ -64,8 +61,10 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao
64 61 private TsPartitionDate tsFormat;
65 62
66 63 private PreparedStatement partitionInsertStmt;
  64 + private PreparedStatement partitionInsertTtlStmt;
67 65 private PreparedStatement[] latestInsertStmts;
68 66 private PreparedStatement[] saveStmts;
  67 + private PreparedStatement[] saveTtlStmts;
69 68 private PreparedStatement[] fetchStmts;
70 69 private PreparedStatement findLatestStmt;
71 70 private PreparedStatement findAllLatestStmt;
... ... @@ -255,15 +254,32 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao
255 254 }
256 255
257 256 @Override
258   - public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry) {
  257 + public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
259 258 DataType type = tsKvEntry.getDataType();
260   - BoundStatement stmt = getSaveStmt(type).bind()
261   - .setString(0, entityId.getEntityType().name())
  259 + BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind();
  260 + stmt.setString(0, entityId.getEntityType().name())
262 261 .setUUID(1, entityId.getId())
263 262 .setString(2, tsKvEntry.getKey())
264 263 .setLong(3, partition)
265 264 .setLong(4, tsKvEntry.getTs());
266 265 addValue(tsKvEntry, stmt, 5);
  266 + if (ttl > 0) {
  267 + stmt.setInt(6, (int) ttl);
  268 + }
  269 + return executeAsyncWrite(stmt);
  270 + }
  271 +
  272 + @Override
  273 + public ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl) {
  274 + log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
  275 + BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind();
  276 + stmt = stmt.setString(0, entityId.getEntityType().name())
  277 + .setUUID(1, entityId.getId())
  278 + .setLong(2, partition)
  279 + .setString(3, key);
  280 + if (ttl > 0) {
  281 + stmt.setInt(4, (int) ttl);
  282 + }
267 283 return executeAsyncWrite(stmt);
268 284 }
269 285
... ... @@ -280,16 +296,6 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao
280 296 }
281 297
282 298 @Override
283   - public ResultSetFuture savePartition(EntityId entityId, long partition, String key) {
284   - log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
285   - return executeAsyncWrite(getPartitionInsertStmt().bind()
286   - .setString(0, entityId.getEntityType().name())
287   - .setUUID(1, entityId.getId())
288   - .setLong(2, partition)
289   - .setString(3, key));
290   - }
291   -
292   - @Override
293 299 public List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
294 300 List<TsKvEntry> entries = new ArrayList<>(rows.size());
295 301 if (!rows.isEmpty()) {
... ... @@ -365,6 +371,23 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao
365 371 return saveStmts[dataType.ordinal()];
366 372 }
367 373
  374 + private PreparedStatement getSaveTtlStmt(DataType dataType) {
  375 + if (saveTtlStmts == null) {
  376 + saveTtlStmts = new PreparedStatement[DataType.values().length];
  377 + for (DataType type : DataType.values()) {
  378 + saveTtlStmts[type.ordinal()] = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_CF +
  379 + "(" + ModelConstants.ENTITY_TYPE_COLUMN +
  380 + "," + ModelConstants.ENTITY_ID_COLUMN +
  381 + "," + ModelConstants.KEY_COLUMN +
  382 + "," + ModelConstants.PARTITION_COLUMN +
  383 + "," + ModelConstants.TS_COLUMN +
  384 + "," + getColumnName(type) + ")" +
  385 + " VALUES(?, ?, ?, ?, ?, ?) USING TTL ?");
  386 + }
  387 + }
  388 + return saveTtlStmts[dataType.ordinal()];
  389 + }
  390 +
368 391 private PreparedStatement getFetchStmt(Aggregation aggType) {
369 392 if (fetchStmts == null) {
370 393 fetchStmts = new PreparedStatement[Aggregation.values().length];
... ... @@ -418,6 +441,19 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao
418 441 return partitionInsertStmt;
419 442 }
420 443
  444 + private PreparedStatement getPartitionInsertTtlStmt() {
  445 + if (partitionInsertTtlStmt == null) {
  446 + partitionInsertTtlStmt = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_PARTITIONS_CF +
  447 + "(" + ModelConstants.ENTITY_TYPE_COLUMN +
  448 + "," + ModelConstants.ENTITY_ID_COLUMN +
  449 + "," + ModelConstants.PARTITION_COLUMN +
  450 + "," + ModelConstants.KEY_COLUMN + ")" +
  451 + " VALUES(?, ?, ?, ?) USING TTL ?");
  452 + }
  453 + return partitionInsertTtlStmt;
  454 + }
  455 +
  456 +
421 457 private PreparedStatement getFindLatestStmt() {
422 458 if (findLatestStmt == null) {
423 459 findLatestStmt = getSession().prepare("SELECT " +
... ...
... ... @@ -87,29 +87,33 @@ public class BaseTimeseriesService implements TimeseriesService {
87 87 if (tsKvEntry == null) {
88 88 throw new IncorrectParameterException("Key value entry can't be null");
89 89 }
90   - UUID uid = entityId.getId();
91 90 long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
92 91
93 92 List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
94   - saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs);
  93 + saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs, 0L);
95 94 return Futures.allAsList(futures);
96 95 }
97 96
98 97 @Override
99 98 public ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntries) {
  99 + return save(entityId, tsKvEntries, 0L);
  100 + }
  101 +
  102 + @Override
  103 + public ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
100 104 validate(entityId);
101 105 List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY);
102 106 for (TsKvEntry tsKvEntry : tsKvEntries) {
103 107 if (tsKvEntry == null) {
104 108 throw new IncorrectParameterException("Key value entry can't be null");
105 109 }
106   - UUID uid = entityId.getId();
107 110 long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
108   - saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs);
  111 + saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs, ttl);
109 112 }
110 113 return Futures.allAsList(futures);
111 114 }
112 115
  116 +
113 117 @Override
114 118 public TsKvEntry convertResultToTsKvEntry(Row row) {
115 119 return timeseriesDao.convertResultToTsKvEntry(row);
... ... @@ -120,10 +124,10 @@ public class BaseTimeseriesService implements TimeseriesService {
120 124 return timeseriesDao.convertResultToTsKvEntryList(rs.all());
121 125 }
122 126
123   - private void saveAndRegisterFutures(List<ResultSetFuture> futures, EntityId entityId, TsKvEntry tsKvEntry, long partitionTs) {
124   - futures.add(timeseriesDao.savePartition(entityId, partitionTs, tsKvEntry.getKey()));
  127 + private void saveAndRegisterFutures(List<ResultSetFuture> futures, EntityId entityId, TsKvEntry tsKvEntry, long partitionTs, long ttl) {
  128 + futures.add(timeseriesDao.savePartition(entityId, partitionTs, tsKvEntry.getKey(), ttl));
125 129 futures.add(timeseriesDao.saveLatest(entityId, tsKvEntry));
126   - futures.add(timeseriesDao.save(entityId, partitionTs, tsKvEntry));
  130 + futures.add(timeseriesDao.save(entityId, partitionTs, tsKvEntry, ttl));
127 131 }
128 132
129 133 private static void validate(EntityId entityId) {
... ...
... ... @@ -23,9 +23,6 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
23 23 import org.thingsboard.server.common.data.kv.TsKvQuery;
24 24
25 25 import java.util.List;
26   -import java.util.Optional;
27   -import java.util.Set;
28   -import java.util.UUID;
29 26
30 27 /**
31 28 * @author Andrew Shvayka
... ... @@ -40,9 +37,9 @@ public interface TimeseriesDao {
40 37
41 38 ResultSetFuture findAllLatest(EntityId entityId);
42 39
43   - ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry);
  40 + ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl);
44 41
45   - ResultSetFuture savePartition(EntityId entityId, long partition, String key);
  42 + ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl);
46 43
47 44 ResultSetFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
48 45
... ...
... ... @@ -44,6 +44,8 @@ public interface TimeseriesService {
44 44
45 45 ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntry);
46 46
  47 + ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
  48 +
47 49 TsKvEntry convertResultToTsKvEntry(Row row);
48 50
49 51 List<TsKvEntry> convertResultSetToTsKvEntryList(ResultSet rs);
... ...
... ... @@ -79,7 +79,9 @@ public interface PluginContext {
79 79
80 80 void saveTsData(EntityId entityId, TsKvEntry entry, PluginCallback<Void> callback);
81 81
82   - void saveTsData(EntityId entityId, List<TsKvEntry> entry, PluginCallback<Void> callback);
  82 + void saveTsData(EntityId entityId, List<TsKvEntry> entries, PluginCallback<Void> callback);
  83 +
  84 + void saveTsData(EntityId deviceId, List<TsKvEntry> entries, long ttl, PluginCallback<Void> pluginCallback);
83 85
84 86 void loadTimeseries(EntityId entityId, List<TsKvQuery> queries, PluginCallback<List<TsKvEntry>> callback);
85 87
... ... @@ -106,4 +108,5 @@ public interface PluginContext {
106 108 void loadAttributes(EntityId entityId, Collection<String> attributeTypes, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback);
107 109
108 110 void getCustomerDevices(TenantId tenantId, CustomerId customerId, int limit, PluginCallback<List<Device>> callback);
  111 +
109 112 }
... ...
... ... @@ -23,9 +23,14 @@ import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
23 23 public class TelemetryUploadRequestRuleToPluginMsg extends AbstractRuleToPluginMsg<TelemetryUploadRequest> {
24 24
25 25 private static final long serialVersionUID = 1L;
  26 + private final long ttl;
26 27
27   - public TelemetryUploadRequestRuleToPluginMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, TelemetryUploadRequest payload) {
  28 + public TelemetryUploadRequestRuleToPluginMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, TelemetryUploadRequest payload, long ttl) {
28 29 super(tenantId, customerId, deviceId, payload);
  30 + this.ttl = ttl;
29 31 }
30 32
  33 + public long getTtl() {
  34 + return ttl;
  35 + }
31 36 }
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.extensions.core.action.telemetry;
17 17
  18 +import org.springframework.util.StringUtils;
18 19 import org.thingsboard.server.common.msg.core.GetAttributesRequest;
19 20 import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
20 21 import org.thingsboard.server.common.msg.core.UpdateAttributesRequest;
... ... @@ -23,7 +24,6 @@ import org.thingsboard.server.common.msg.session.FromDeviceMsg;
23 24 import org.thingsboard.server.common.msg.session.MsgType;
24 25 import org.thingsboard.server.common.msg.session.ToDeviceMsg;
25 26 import org.thingsboard.server.extensions.api.component.Action;
26   -import org.thingsboard.server.extensions.api.component.EmptyComponentConfiguration;
27 27 import org.thingsboard.server.extensions.api.plugins.PluginAction;
28 28 import org.thingsboard.server.extensions.api.plugins.msg.*;
29 29 import org.thingsboard.server.extensions.api.rules.RuleContext;
... ... @@ -31,11 +31,22 @@ import org.thingsboard.server.extensions.api.rules.RuleProcessingMetaData;
31 31 import org.thingsboard.server.extensions.api.rules.SimpleRuleLifecycleComponent;
32 32
33 33 import java.util.Optional;
  34 +import java.util.concurrent.TimeUnit;
34 35
35   -@Action(name = "Telemetry Plugin Action")
36   -public class TelemetryPluginAction extends SimpleRuleLifecycleComponent implements PluginAction<EmptyComponentConfiguration> {
  36 +@Action(name = "Telemetry Plugin Action", descriptor = "TelemetryPluginActionDescriptor.json", configuration = TelemetryPluginActionConfiguration.class)
  37 +public class TelemetryPluginAction extends SimpleRuleLifecycleComponent implements PluginAction<TelemetryPluginActionConfiguration> {
37 38
38   - public void init(EmptyComponentConfiguration configuration) {
  39 + protected TelemetryPluginActionConfiguration configuration;
  40 + protected long ttl;
  41 +
  42 + @Override
  43 + public void init(TelemetryPluginActionConfiguration configuration) {
  44 + this.configuration = configuration;
  45 + if (StringUtils.isEmpty(configuration.getTimeUnit()) || configuration.getTtlValue() == 0L) {
  46 + this.ttl = 0L;
  47 + } else {
  48 + this.ttl = TimeUnit.valueOf(configuration.getTimeUnit()).toSeconds(configuration.getTtlValue());
  49 + }
39 50 }
40 51
41 52 @Override
... ... @@ -44,7 +55,7 @@ public class TelemetryPluginAction extends SimpleRuleLifecycleComponent implemen
44 55 if (msg.getMsgType() == MsgType.POST_TELEMETRY_REQUEST) {
45 56 TelemetryUploadRequest payload = (TelemetryUploadRequest) msg;
46 57 return Optional.of(new TelemetryUploadRequestRuleToPluginMsg(toDeviceActorMsg.getTenantId(), toDeviceActorMsg.getCustomerId(),
47   - toDeviceActorMsg.getDeviceId(), payload));
  58 + toDeviceActorMsg.getDeviceId(), payload, ttl));
48 59 } else if (msg.getMsgType() == MsgType.POST_ATTRIBUTES_REQUEST) {
49 60 UpdateAttributesRequest payload = (UpdateAttributesRequest) msg;
50 61 return Optional.of(new UpdateAttributesRequestRuleToPluginMsg(toDeviceActorMsg.getTenantId(), toDeviceActorMsg.getCustomerId(),
... ...
  1 +/**
  2 + * Copyright © 2016-2017 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.extensions.core.action.telemetry;
  17 +
  18 +import com.fasterxml.jackson.annotation.JsonInclude;
  19 +import lombok.Data;
  20 +
  21 +/**
  22 + * @author Andrew Shvayka
  23 + */
  24 +@Data
  25 +@JsonInclude(JsonInclude.Include.NON_NULL)
  26 +public class TelemetryPluginActionConfiguration {
  27 +
  28 + private String timeUnit;
  29 + private int ttlValue;
  30 +
  31 +}
... ...
... ... @@ -148,6 +148,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
148 148 String[] pathParams = request.getPathParams();
149 149 EntityId entityId;
150 150 String scope;
  151 + long ttl = 0L;
151 152 TelemetryFeature feature;
152 153 if (pathParams.length == 2) {
153 154 entityId = DeviceId.fromString(pathParams[0]);
... ... @@ -161,6 +162,11 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
161 162 entityId = EntityIdFactory.getByTypeAndId(pathParams[0], pathParams[1]);
162 163 feature = TelemetryFeature.forName(pathParams[2].toUpperCase());
163 164 scope = pathParams[3];
  165 + } else if (pathParams.length == 5) {
  166 + entityId = EntityIdFactory.getByTypeAndId(pathParams[0], pathParams[1]);
  167 + feature = TelemetryFeature.forName(pathParams[2].toUpperCase());
  168 + scope = pathParams[3];
  169 + ttl = Long.parseLong(pathParams[4]);
164 170 } else {
165 171 msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
166 172 return;
... ... @@ -211,7 +217,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
211 217 entries.add(new BasicTsKvEntry(entry.getKey(), kv));
212 218 }
213 219 }
214   - ctx.saveTsData(entityId, entries, new PluginCallback<Void>() {
  220 + ctx.saveTsData(entityId, entries, ttl, new PluginCallback<Void>() {
215 221 @Override
216 222 public void onSuccess(PluginContext ctx, Void value) {
217 223 msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
... ...
... ... @@ -92,7 +92,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
92 92 tsKvEntries.add(new BasicTsKvEntry(entry.getKey(), kv));
93 93 }
94 94 }
95   - ctx.saveTsData(msg.getDeviceId(), tsKvEntries, new PluginCallback<Void>() {
  95 + ctx.saveTsData(msg.getDeviceId(), tsKvEntries, msg.getTtl(), new PluginCallback<Void>() {
96 96 @Override
97 97 public void onSuccess(PluginContext ctx, Void data) {
98 98 ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onSuccess(request.getMsgType(), request.getRequestId())));
... ...
  1 +{
  2 + "schema": {
  3 + "title": "Telemetry Plugin Action Configuration",
  4 + "type": "object",
  5 + "properties": {
  6 + "timeUnit": {
  7 + "title": "Time Unit",
  8 + "type": "string",
  9 + "default": "DAYS"
  10 + },
  11 + "ttlValue": {
  12 + "title": "TTL",
  13 + "type": "integer",
  14 + "default": 365,
  15 + "minimum": 0,
  16 + "maximum": 100000
  17 + }
  18 + },
  19 + "required": [
  20 + "timeUnit",
  21 + "ttlValue"
  22 + ]
  23 + },
  24 + "form": [
  25 + {
  26 + "key": "timeUnit",
  27 + "type": "rc-select",
  28 + "multiple": false,
  29 + "items": [
  30 + {
  31 + "value": "SECONDS",
  32 + "label": "Seconds"
  33 + },
  34 + {
  35 + "value": "MINUTES",
  36 + "label": "Minutes"
  37 + },
  38 + {
  39 + "value": "HOURS",
  40 + "label": "Hours"
  41 + },
  42 + {
  43 + "value": "DAYS",
  44 + "label": "Days"
  45 + }
  46 + ]
  47 + },
  48 + "ttlValue"
  49 + ]
  50 +}
\ No newline at end of file
... ...