Commit 0c98ec78f0c3e5925a58c8dd0970208c21f73cfd

Authored by Igor Khanenko
1 parent 5fdbb51c

Timeseries and Attributes DAO

  1 +/**
  2 + * Copyright © 2016 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.attributes;
  17 +
  18 +import com.datastax.driver.core.ResultSetFuture;
  19 +import org.thingsboard.server.common.data.id.EntityId;
  20 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  21 +
  22 +import java.util.List;
  23 +import java.util.UUID;
  24 +
  25 +/**
  26 + * @author Andrew Shvayka
  27 + */
  28 +public interface AttributesDao {
  29 +
  30 + AttributeKvEntry find(EntityId entityId, String attributeType, String attributeKey);
  31 +
  32 + List<AttributeKvEntry> findAll(EntityId entityId, String attributeType);
  33 +
  34 + ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute);
  35 +
  36 + void removeAll(EntityId entityId, String scope, List<String> keys);
  37 +}
... ...
  1 +/**
  2 + * Copyright © 2016 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.attributes;
  17 +
  18 +import com.datastax.driver.core.ResultSet;
  19 +import com.datastax.driver.core.ResultSetFuture;
  20 +import com.google.common.util.concurrent.ListenableFuture;
  21 +import org.thingsboard.server.common.data.id.DeviceId;
  22 +import org.thingsboard.server.common.data.id.EntityId;
  23 +import org.thingsboard.server.common.data.id.UUIDBased;
  24 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  25 +
  26 +import java.util.List;
  27 +
  28 +/**
  29 + * @author Andrew Shvayka
  30 + */
  31 +public interface AttributesService {
  32 +
  33 + AttributeKvEntry find(EntityId entityId, String scope, String attributeKey);
  34 +
  35 + List<AttributeKvEntry> findAll(EntityId entityId, String scope);
  36 +
  37 + ListenableFuture<List<ResultSet>> save(EntityId entityId, String scope, List<AttributeKvEntry> attributes);
  38 +
  39 + void removeAll(EntityId entityId, String scope, List<String> attributeKeys);
  40 +}
... ...
  1 +/**
  2 + * Copyright © 2016 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.attributes;
  17 +
  18 +import com.datastax.driver.core.*;
  19 +import com.datastax.driver.core.querybuilder.QueryBuilder;
  20 +import com.datastax.driver.core.querybuilder.Select;
  21 +import lombok.extern.slf4j.Slf4j;
  22 +import org.springframework.stereotype.Component;
  23 +import org.thingsboard.server.common.data.id.EntityId;
  24 +import org.thingsboard.server.common.data.kv.DataType;
  25 +import org.thingsboard.server.dao.AbstractDao;
  26 +import org.thingsboard.server.dao.model.ModelConstants;
  27 +import org.slf4j.Logger;
  28 +import org.slf4j.LoggerFactory;
  29 +import org.thingsboard.server.common.data.kv.*;
  30 +import org.thingsboard.server.dao.timeseries.BaseTimeseriesDao;
  31 +
  32 +import java.util.ArrayList;
  33 +import java.util.List;
  34 +
  35 +import static org.thingsboard.server.dao.model.ModelConstants.*;
  36 +import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
  37 +
  38 +/**
  39 + * @author Andrew Shvayka
  40 + */
  41 +@Component
  42 +@Slf4j
  43 +public class BaseAttributesDao extends AbstractDao implements AttributesDao {
  44 +
  45 + private PreparedStatement saveStmt;
  46 +
  47 + @Override
  48 + public AttributeKvEntry find(EntityId entityId, String attributeType, String attributeKey) {
  49 + Select.Where select = select().from(ATTRIBUTES_KV_CF)
  50 + .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
  51 + .and(eq(ENTITY_ID_COLUMN, entityId.getId()))
  52 + .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
  53 + .and(eq(ATTRIBUTE_KEY_COLUMN, attributeKey));
  54 + log.trace("Generated query [{}] for entityId {} and key {}", select, entityId, attributeKey);
  55 + return convertResultToAttributesKvEntry(attributeKey, executeRead(select).one());
  56 + }
  57 +
  58 + @Override
  59 + public List<AttributeKvEntry> findAll(EntityId entityId, String attributeType) {
  60 + Select.Where select = select().from(ATTRIBUTES_KV_CF)
  61 + .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
  62 + .and(eq(ENTITY_ID_COLUMN, entityId.getId()))
  63 + .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType));
  64 + log.trace("Generated query [{}] for entityId {} and attributeType {}", select, entityId, attributeType);
  65 + return convertResultToAttributesKvEntryList(executeRead(select));
  66 + }
  67 +
  68 + @Override
  69 + public ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute) {
  70 + BoundStatement stmt = getSaveStmt().bind();
  71 + stmt.setString(0, entityId.getEntityType().name());
  72 + stmt.setUUID(1, entityId.getId());
  73 + stmt.setString(2, attributeType);
  74 + stmt.setString(3, attribute.getKey());
  75 + stmt.setLong(4, attribute.getLastUpdateTs());
  76 + stmt.setString(5, attribute.getStrValue().orElse(null));
  77 + if (attribute.getBooleanValue().isPresent()) {
  78 + stmt.setBool(6, attribute.getBooleanValue().get());
  79 + } else {
  80 + stmt.setToNull(6);
  81 + }
  82 + if (attribute.getLongValue().isPresent()) {
  83 + stmt.setLong(7, attribute.getLongValue().get());
  84 + } else {
  85 + stmt.setToNull(7);
  86 + }
  87 + if (attribute.getDoubleValue().isPresent()) {
  88 + stmt.setDouble(8, attribute.getDoubleValue().get());
  89 + } else {
  90 + stmt.setToNull(8);
  91 + }
  92 + return executeAsyncWrite(stmt);
  93 + }
  94 +
  95 + @Override
  96 + public void removeAll(EntityId entityId, String attributeType, List<String> keys) {
  97 + for (String key : keys) {
  98 + delete(entityId, attributeType, key);
  99 + }
  100 + }
  101 +
  102 + private void delete(EntityId entityId, String attributeType, String key) {
  103 + Statement delete = QueryBuilder.delete().all().from(ModelConstants.ATTRIBUTES_KV_CF)
  104 + .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
  105 + .and(eq(ENTITY_ID_COLUMN, entityId.getId()))
  106 + .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
  107 + .and(eq(ATTRIBUTE_KEY_COLUMN, key));
  108 + log.debug("Remove request: {}", delete.toString());
  109 + getSession().execute(delete);
  110 + }
  111 +
  112 + private PreparedStatement getSaveStmt() {
  113 + if (saveStmt == null) {
  114 + saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF +
  115 + "(" + ENTITY_TYPE_COLUMN +
  116 + "," + ENTITY_ID_COLUMN +
  117 + "," + ATTRIBUTE_TYPE_COLUMN +
  118 + "," + ATTRIBUTE_KEY_COLUMN +
  119 + "," + LAST_UPDATE_TS_COLUMN +
  120 + "," + ModelConstants.STRING_VALUE_COLUMN +
  121 + "," + ModelConstants.BOOLEAN_VALUE_COLUMN +
  122 + "," + ModelConstants.LONG_VALUE_COLUMN +
  123 + "," + ModelConstants.DOUBLE_VALUE_COLUMN +
  124 + ")" +
  125 + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)");
  126 + }
  127 + return saveStmt;
  128 + }
  129 +
  130 + private AttributeKvEntry convertResultToAttributesKvEntry(String key, Row row) {
  131 + AttributeKvEntry attributeEntry = null;
  132 + if (row != null) {
  133 + long lastUpdateTs = row.get(LAST_UPDATE_TS_COLUMN, Long.class);
  134 + attributeEntry = new BaseAttributeKvEntry(BaseTimeseriesDao.toKvEntry(row, key), lastUpdateTs);
  135 + }
  136 + return attributeEntry;
  137 + }
  138 +
  139 + private List<AttributeKvEntry> convertResultToAttributesKvEntryList(ResultSet resultSet) {
  140 + List<Row> rows = resultSet.all();
  141 + List<AttributeKvEntry> entries = new ArrayList<>(rows.size());
  142 + if (!rows.isEmpty()) {
  143 + rows.stream().forEach(row -> {
  144 + String key = row.getString(ModelConstants.ATTRIBUTE_KEY_COLUMN);
  145 + AttributeKvEntry kvEntry = convertResultToAttributesKvEntry(key, row);
  146 + if (kvEntry != null) {
  147 + entries.add(kvEntry);
  148 + }
  149 + });
  150 + }
  151 + return entries;
  152 + }
  153 +
  154 +}
... ...
  1 +/**
  2 + * Copyright © 2016 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.attributes;
  17 +
  18 +import com.datastax.driver.core.ResultSet;
  19 +import com.datastax.driver.core.ResultSetFuture;
  20 +import com.google.common.collect.Lists;
  21 +import com.google.common.util.concurrent.Futures;
  22 +import com.google.common.util.concurrent.ListenableFuture;
  23 +import org.thingsboard.server.common.data.id.EntityId;
  24 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  25 +import org.thingsboard.server.dao.exception.IncorrectParameterException;
  26 +import org.springframework.beans.factory.annotation.Autowired;
  27 +import org.springframework.stereotype.Service;
  28 +import org.thingsboard.server.dao.service.Validator;
  29 +
  30 +import java.util.List;
  31 +
  32 +/**
  33 + * @author Andrew Shvayka
  34 + */
  35 +@Service
  36 +public class BaseAttributesService implements AttributesService {
  37 +
  38 + @Autowired
  39 + private AttributesDao attributesDao;
  40 +
  41 + @Override
  42 + public AttributeKvEntry find(EntityId entityId, String scope, String attributeKey) {
  43 + validate(entityId, scope);
  44 + Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey);
  45 + return attributesDao.find(entityId, scope, attributeKey);
  46 + }
  47 +
  48 + @Override
  49 + public List<AttributeKvEntry> findAll(EntityId entityId, String scope) {
  50 + validate(entityId, scope);
  51 + return attributesDao.findAll(entityId, scope);
  52 + }
  53 +
  54 + @Override
  55 + public ListenableFuture<List<ResultSet>> save(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
  56 + validate(entityId, scope);
  57 + attributes.forEach(attribute -> validate(attribute));
  58 + List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(attributes.size());
  59 + for(AttributeKvEntry attribute : attributes) {
  60 + futures.add(attributesDao.save(entityId, scope, attribute));
  61 + }
  62 + return Futures.allAsList(futures);
  63 + }
  64 +
  65 + @Override
  66 + public void removeAll(EntityId entityId, String scope, List<String> keys) {
  67 + validate(entityId, scope);
  68 + attributesDao.removeAll(entityId, scope, keys);
  69 + }
  70 +
  71 + private static void validate(EntityId id, String scope) {
  72 + Validator.validateId(id.getId(), "Incorrect id " + id);
  73 + Validator.validateString(scope, "Incorrect scope " + scope);
  74 + }
  75 +
  76 + private static void validate(AttributeKvEntry kvEntry) {
  77 + if (kvEntry == null) {
  78 + throw new IncorrectParameterException("Key value entry can't be null");
  79 + } else if (kvEntry.getDataType() == null) {
  80 + throw new IncorrectParameterException("Incorrect kvEntry. Data type can't be null");
  81 + } else {
  82 + Validator.validateString(kvEntry.getKey(), "Incorrect kvEntry. Key can't be empty");
  83 + Validator.validatePositiveNumber(kvEntry.getLastUpdateTs(), "Incorrect last update ts. Ts should be positive");
  84 + }
  85 + }
  86 +
  87 +}
... ...
  1 +/**
  2 + * Copyright © 2016 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.timeseries;
  17 +
  18 +import com.datastax.driver.core.*;
  19 +import com.datastax.driver.core.querybuilder.QueryBuilder;
  20 +import com.datastax.driver.core.querybuilder.Select;
  21 +import lombok.extern.slf4j.Slf4j;
  22 +import org.springframework.beans.factory.annotation.Value;
  23 +import org.springframework.stereotype.Component;
  24 +import org.thingsboard.server.common.data.kv.*;
  25 +import org.thingsboard.server.common.data.kv.DataType;
  26 +import org.thingsboard.server.dao.AbstractDao;
  27 +import org.thingsboard.server.dao.model.ModelConstants;
  28 +
  29 +import java.util.*;
  30 +
  31 +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
  32 +import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
  33 +
  34 +/**
  35 + * @author Andrew Shvayka
  36 + */
  37 +@Component
  38 +@Slf4j
  39 +public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
  40 +
  41 + @Value("${cassandra.query.max_limit_per_request}")
  42 + protected Integer maxLimitPerRequest;
  43 +
  44 + private PreparedStatement partitionInsertStmt;
  45 + private PreparedStatement[] latestInsertStmts;
  46 + private PreparedStatement[] saveStmts;
  47 + private PreparedStatement findLatestStmt;
  48 + private PreparedStatement findAllLatestStmt;
  49 +
  50 + @Override
  51 + public List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition) {
  52 + List<Row> rows = Collections.emptyList();
  53 + Long[] parts = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition);
  54 + int partsLength = parts.length;
  55 + if (parts != null && partsLength > 0) {
  56 + int limit = maxLimitPerRequest;
  57 + Optional<Integer> lim = query.getLimit();
  58 + if (lim.isPresent() && lim.get() < maxLimitPerRequest) {
  59 + limit = lim.get();
  60 + }
  61 +
  62 + rows = new ArrayList<>(limit);
  63 + int lastIdx = partsLength - 1;
  64 + for (int i = 0; i < partsLength; i++) {
  65 + int currentLimit;
  66 + if (rows.size() >= limit) {
  67 + break;
  68 + } else {
  69 + currentLimit = limit - rows.size();
  70 + }
  71 + Long partition = parts[i];
  72 + Select.Where where = select().from(ModelConstants.TS_KV_CF).where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityType))
  73 + .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId))
  74 + .and(eq(ModelConstants.KEY_COLUMN, query.getKey()))
  75 + .and(eq(ModelConstants.PARTITION_COLUMN, partition));
  76 + if (i == 0 && query.getStartTs().isPresent()) {
  77 + where.and(QueryBuilder.gt(ModelConstants.TS_COLUMN, query.getStartTs().get()));
  78 + } else if (i == lastIdx && query.getEndTs().isPresent()) {
  79 + where.and(QueryBuilder.lte(ModelConstants.TS_COLUMN, query.getEndTs().get()));
  80 + }
  81 + where.limit(currentLimit);
  82 + rows.addAll(executeRead(where).all());
  83 + }
  84 + }
  85 + return convertResultToTsKvEntryList(rows);
  86 + }
  87 +
  88 + @Override
  89 + public ResultSetFuture findLatest(String entityType, UUID entityId, String key) {
  90 + BoundStatement stmt = getFindLatestStmt().bind();
  91 + stmt.setString(0, entityType);
  92 + stmt.setUUID(1, entityId);
  93 + stmt.setString(2, key);
  94 + log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityType, entityId);
  95 + return executeAsyncRead(stmt);
  96 + }
  97 +
  98 + @Override
  99 + public ResultSetFuture findAllLatest(String entityType, UUID entityId) {
  100 + BoundStatement stmt = getFindAllLatestStmt().bind();
  101 + stmt.setString(0, entityType);
  102 + stmt.setUUID(1, entityId);
  103 + log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityType, entityId);
  104 + return executeAsyncRead(stmt);
  105 + }
  106 +
  107 + @Override
  108 + public ResultSetFuture save(String entityType, UUID entityId, long partition, TsKvEntry tsKvEntry) {
  109 + DataType type = tsKvEntry.getDataType();
  110 + BoundStatement stmt = getSaveStmt(type).bind()
  111 + .setString(0, entityType)
  112 + .setUUID(1, entityId)
  113 + .setString(2, tsKvEntry.getKey())
  114 + .setLong(3, partition)
  115 + .setLong(4, tsKvEntry.getTs());
  116 + addValue(tsKvEntry, stmt, 5);
  117 + return executeAsyncWrite(stmt);
  118 + }
  119 +
  120 + @Override
  121 + public ResultSetFuture saveLatest(String entityType, UUID entityId, TsKvEntry tsKvEntry) {
  122 + DataType type = tsKvEntry.getDataType();
  123 + BoundStatement stmt = getLatestStmt(type).bind()
  124 + .setString(0, entityType)
  125 + .setUUID(1, entityId)
  126 + .setString(2, tsKvEntry.getKey())
  127 + .setLong(3, tsKvEntry.getTs());
  128 + addValue(tsKvEntry, stmt, 4);
  129 + return executeAsyncWrite(stmt);
  130 + }
  131 +
  132 + @Override
  133 + public ResultSetFuture savePartition(String entityType, UUID entityId, long partition, String key) {
  134 + log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityType, entityId, key);
  135 + return executeAsyncWrite(getPartitionInsertStmt().bind()
  136 + .setString(0, entityType)
  137 + .setUUID(1, entityId)
  138 + .setLong(2, partition)
  139 + .setString(3, key));
  140 + }
  141 +
  142 + @Override
  143 + public List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
  144 + List<TsKvEntry> entries = new ArrayList<>(rows.size());
  145 + if (!rows.isEmpty()) {
  146 + rows.stream().forEach(row -> {
  147 + TsKvEntry kvEntry = convertResultToTsKvEntry(row);
  148 + if (kvEntry != null) {
  149 + entries.add(kvEntry);
  150 + }
  151 + });
  152 + }
  153 + return entries;
  154 + }
  155 +
  156 + @Override
  157 + public TsKvEntry convertResultToTsKvEntry(Row row) {
  158 + String key = row.getString(ModelConstants.KEY_COLUMN);
  159 + long ts = row.getLong(ModelConstants.TS_COLUMN);
  160 + return new BasicTsKvEntry(ts, toKvEntry(row, key));
  161 + }
  162 +
  163 + public static KvEntry toKvEntry(Row row, String key) {
  164 + KvEntry kvEntry = null;
  165 + String strV = row.get(ModelConstants.STRING_VALUE_COLUMN, String.class);
  166 + if (strV != null) {
  167 + kvEntry = new StringDataEntry(key, strV);
  168 + } else {
  169 + Long longV = row.get(ModelConstants.LONG_VALUE_COLUMN, Long.class);
  170 + if (longV != null) {
  171 + kvEntry = new LongDataEntry(key, longV);
  172 + } else {
  173 + Double doubleV = row.get(ModelConstants.DOUBLE_VALUE_COLUMN, Double.class);
  174 + if (doubleV != null) {
  175 + kvEntry = new DoubleDataEntry(key, doubleV);
  176 + } else {
  177 + Boolean boolV = row.get(ModelConstants.BOOLEAN_VALUE_COLUMN, Boolean.class);
  178 + if (boolV != null) {
  179 + kvEntry = new BooleanDataEntry(key, boolV);
  180 + } else {
  181 + log.warn("All values in key-value row are nullable ");
  182 + }
  183 + }
  184 + }
  185 + }
  186 + return kvEntry;
  187 + }
  188 +
  189 + /**
  190 + * Select existing partitions from the table
  191 + * <code>{@link ModelConstants#TS_KV_PARTITIONS_CF}</code> for the given entity
  192 + */
  193 + private Long[] fetchPartitions(String entityType, UUID entityId, String key, Optional<Long> minPartition, Optional<Long> maxPartition) {
  194 + Select.Where select = QueryBuilder.select(ModelConstants.PARTITION_COLUMN).from(ModelConstants.TS_KV_PARTITIONS_CF).where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityType))
  195 + .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId)).and(eq(ModelConstants.KEY_COLUMN, key));
  196 + minPartition.ifPresent(startTs -> select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition.get())));
  197 + maxPartition.ifPresent(endTs -> select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition.get())));
  198 + ResultSet resultSet = executeRead(select);
  199 + return resultSet.all().stream().map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).toArray(Long[]::new);
  200 + }
  201 +
  202 + private PreparedStatement getSaveStmt(DataType dataType) {
  203 + if (saveStmts == null) {
  204 + saveStmts = new PreparedStatement[DataType.values().length];
  205 + for (DataType type : DataType.values()) {
  206 + saveStmts[type.ordinal()] = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_CF +
  207 + "(" + ModelConstants.ENTITY_TYPE_COLUMN +
  208 + "," + ModelConstants.ENTITY_ID_COLUMN +
  209 + "," + ModelConstants.KEY_COLUMN +
  210 + "," + ModelConstants.PARTITION_COLUMN +
  211 + "," + ModelConstants.TS_COLUMN +
  212 + "," + getColumnName(type) + ")" +
  213 + " VALUES(?, ?, ?, ?, ?, ?)");
  214 + }
  215 + }
  216 + return saveStmts[dataType.ordinal()];
  217 + }
  218 +
  219 + private PreparedStatement getLatestStmt(DataType dataType) {
  220 + if (latestInsertStmts == null) {
  221 + latestInsertStmts = new PreparedStatement[DataType.values().length];
  222 + for (DataType type : DataType.values()) {
  223 + latestInsertStmts[type.ordinal()] = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_LATEST_CF +
  224 + "(" + ModelConstants.ENTITY_TYPE_COLUMN +
  225 + "," + ModelConstants.ENTITY_ID_COLUMN +
  226 + "," + ModelConstants.KEY_COLUMN +
  227 + "," + ModelConstants.TS_COLUMN +
  228 + "," + getColumnName(type) + ")" +
  229 + " VALUES(?, ?, ?, ?, ?)");
  230 + }
  231 + }
  232 + return latestInsertStmts[dataType.ordinal()];
  233 + }
  234 +
  235 +
  236 + private PreparedStatement getPartitionInsertStmt() {
  237 + if (partitionInsertStmt == null) {
  238 + partitionInsertStmt = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_PARTITIONS_CF +
  239 + "(" + ModelConstants.ENTITY_TYPE_COLUMN +
  240 + "," + ModelConstants.ENTITY_ID_COLUMN +
  241 + "," + ModelConstants.PARTITION_COLUMN +
  242 + "," + ModelConstants.KEY_COLUMN + ")" +
  243 + " VALUES(?, ?, ?, ?)");
  244 + }
  245 + return partitionInsertStmt;
  246 + }
  247 +
  248 + private PreparedStatement getFindLatestStmt() {
  249 + if (findLatestStmt == null) {
  250 + findLatestStmt = getSession().prepare("SELECT " +
  251 + ModelConstants.KEY_COLUMN + "," +
  252 + ModelConstants.TS_COLUMN + "," +
  253 + ModelConstants.STRING_VALUE_COLUMN + "," +
  254 + ModelConstants.BOOLEAN_VALUE_COLUMN + "," +
  255 + ModelConstants.LONG_VALUE_COLUMN + "," +
  256 + ModelConstants.DOUBLE_VALUE_COLUMN + " " +
  257 + "FROM " + ModelConstants.TS_KV_LATEST_CF + " " +
  258 + "WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? " +
  259 + "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? " +
  260 + "AND " + ModelConstants.KEY_COLUMN + " = ? ");
  261 + }
  262 + return findLatestStmt;
  263 + }
  264 +
  265 + private PreparedStatement getFindAllLatestStmt() {
  266 + if (findAllLatestStmt == null) {
  267 + findAllLatestStmt = getSession().prepare("SELECT " +
  268 + ModelConstants.KEY_COLUMN + "," +
  269 + ModelConstants.TS_COLUMN + "," +
  270 + ModelConstants.STRING_VALUE_COLUMN + "," +
  271 + ModelConstants.BOOLEAN_VALUE_COLUMN + "," +
  272 + ModelConstants.LONG_VALUE_COLUMN + "," +
  273 + ModelConstants.DOUBLE_VALUE_COLUMN + " " +
  274 + "FROM " + ModelConstants.TS_KV_LATEST_CF + " " +
  275 + "WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? " +
  276 + "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? ");
  277 + }
  278 + return findAllLatestStmt;
  279 + }
  280 +
  281 + public static String getColumnName(DataType type) {
  282 + switch (type) {
  283 + case BOOLEAN:
  284 + return ModelConstants.BOOLEAN_VALUE_COLUMN;
  285 + case STRING:
  286 + return ModelConstants.STRING_VALUE_COLUMN;
  287 + case LONG:
  288 + return ModelConstants.LONG_VALUE_COLUMN;
  289 + case DOUBLE:
  290 + return ModelConstants.DOUBLE_VALUE_COLUMN;
  291 + default:
  292 + throw new RuntimeException("Not implemented!");
  293 + }
  294 + }
  295 +
  296 + public static void addValue(KvEntry kvEntry, BoundStatement stmt, int column) {
  297 + switch (kvEntry.getDataType()) {
  298 + case BOOLEAN:
  299 + stmt.setBool(column, kvEntry.getBooleanValue().get().booleanValue());
  300 + break;
  301 + case STRING:
  302 + stmt.setString(column, kvEntry.getStrValue().get());
  303 + break;
  304 + case LONG:
  305 + stmt.setLong(column, kvEntry.getLongValue().get().longValue());
  306 + break;
  307 + case DOUBLE:
  308 + stmt.setDouble(column, kvEntry.getDoubleValue().get().doubleValue());
  309 + break;
  310 + }
  311 + }
  312 +
  313 +}
... ...
  1 +/**
  2 + * Copyright © 2016 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.timeseries;
  17 +
  18 +import com.datastax.driver.core.ResultSet;
  19 +import com.datastax.driver.core.ResultSetFuture;
  20 +import com.datastax.driver.core.Row;
  21 +import com.google.common.collect.Lists;
  22 +import com.google.common.util.concurrent.Futures;
  23 +import com.google.common.util.concurrent.ListenableFuture;
  24 +import lombok.extern.slf4j.Slf4j;
  25 +import org.thingsboard.server.common.data.id.UUIDBased;
  26 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  27 +import org.thingsboard.server.common.data.kv.TsKvQuery;
  28 +import org.thingsboard.server.dao.exception.IncorrectParameterException;
  29 +import org.slf4j.Logger;
  30 +import org.slf4j.LoggerFactory;
  31 +import org.springframework.beans.factory.annotation.Autowired;
  32 +import org.springframework.beans.factory.annotation.Value;
  33 +import org.springframework.stereotype.Service;
  34 +import org.thingsboard.server.dao.service.Validator;
  35 +
  36 +import javax.annotation.PostConstruct;
  37 +import java.time.Instant;
  38 +import java.time.LocalDateTime;
  39 +import java.time.ZoneOffset;
  40 +import java.util.*;
  41 +
  42 +import static org.apache.commons.lang3.StringUtils.isBlank;
  43 +
  44 +/**
  45 + * @author Andrew Shvayka
  46 + */
  47 +@Service
  48 +@Slf4j
  49 +public class BaseTimeseriesService implements TimeseriesService {
  50 +
  51 + public static final int INSERTS_PER_ENTRY = 3;
  52 +
  53 + @Value("${cassandra.query.ts_key_value_partitioning}")
  54 + private String partitioning;
  55 +
  56 + @Autowired
  57 + private TimeseriesDao timeseriesDao;
  58 +
  59 + private TsPartitionDate tsFormat;
  60 +
  61 + @PostConstruct
  62 + public void init() {
  63 + Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
  64 + if (partition.isPresent()) {
  65 + tsFormat = partition.get();
  66 + } else {
  67 + log.warn("Incorrect configuration of partitioning {}", partitioning);
  68 + throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");
  69 + }
  70 + }
  71 +
  72 + @Override
  73 + public List<TsKvEntry> find(String entityType, UUIDBased entityId, TsKvQuery query) {
  74 + validate(entityType, entityId);
  75 + validate(query);
  76 + return timeseriesDao.find(entityType, entityId.getId(), query, toPartitionTs(query.getStartTs()), toPartitionTs(query.getEndTs()));
  77 + }
  78 +
  79 + private Optional<Long> toPartitionTs(Optional<Long> ts) {
  80 + if (ts.isPresent()) {
  81 + return Optional.of(toPartitionTs(ts.get()));
  82 + } else {
  83 + return Optional.empty();
  84 + }
  85 + }
  86 +
  87 + @Override
  88 + public ListenableFuture<List<ResultSet>> findLatest(String entityType, UUIDBased entityId, Collection<String> keys) {
  89 + validate(entityType, entityId);
  90 + List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(keys.size());
  91 + keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key));
  92 + keys.forEach(key -> futures.add(timeseriesDao.findLatest(entityType, entityId.getId(), key)));
  93 + return Futures.allAsList(futures);
  94 + }
  95 +
  96 + @Override
  97 + public ResultSetFuture findAllLatest(String entityType, UUIDBased entityId) {
  98 + validate(entityType, entityId);
  99 + return timeseriesDao.findAllLatest(entityType, entityId.getId());
  100 + }
  101 +
  102 + @Override
  103 + public ListenableFuture<List<ResultSet>> save(String entityType, UUIDBased entityId, TsKvEntry tsKvEntry) {
  104 + validate(entityType, entityId);
  105 + if (tsKvEntry == null) {
  106 + throw new IncorrectParameterException("Key value entry can't be null");
  107 + }
  108 + UUID uid = entityId.getId();
  109 + long partitionTs = toPartitionTs(tsKvEntry.getTs());
  110 +
  111 + List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
  112 + saveAndRegisterFutures(futures, entityType, tsKvEntry, uid, partitionTs);
  113 + return Futures.allAsList(futures);
  114 + }
  115 +
  116 + @Override
  117 + public ListenableFuture<List<ResultSet>> save(String entityType, UUIDBased entityId, List<TsKvEntry> tsKvEntries) {
  118 + validate(entityType, entityId);
  119 + List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY);
  120 + for (TsKvEntry tsKvEntry : tsKvEntries) {
  121 + if (tsKvEntry == null) {
  122 + throw new IncorrectParameterException("Key value entry can't be null");
  123 + }
  124 + UUID uid = entityId.getId();
  125 + long partitionTs = toPartitionTs(tsKvEntry.getTs());
  126 + saveAndRegisterFutures(futures, entityType, tsKvEntry, uid, partitionTs);
  127 + }
  128 + return Futures.allAsList(futures);
  129 + }
  130 +
  131 + @Override
  132 + public TsKvEntry convertResultToTsKvEntry(Row row) {
  133 + return timeseriesDao.convertResultToTsKvEntry(row);
  134 + }
  135 +
  136 + @Override
  137 + public List<TsKvEntry> convertResultSetToTsKvEntryList(ResultSet rs) {
  138 + return timeseriesDao.convertResultToTsKvEntryList(rs.all());
  139 + }
  140 +
  141 + private void saveAndRegisterFutures(List<ResultSetFuture> futures, String entityType, TsKvEntry tsKvEntry, UUID uid, long partitionTs) {
  142 + futures.add(timeseriesDao.savePartition(entityType, uid, partitionTs, tsKvEntry.getKey()));
  143 + futures.add(timeseriesDao.saveLatest(entityType, uid, tsKvEntry));
  144 + futures.add(timeseriesDao.save(entityType, uid, partitionTs, tsKvEntry));
  145 + }
  146 +
  147 + private long toPartitionTs(long ts) {
  148 + LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
  149 +
  150 + LocalDateTime parititonTime = tsFormat.truncatedTo(time);
  151 +
  152 + return parititonTime.toInstant(ZoneOffset.UTC).toEpochMilli();
  153 + }
  154 +
  155 + private static void validate(String entityType, UUIDBased entityId) {
  156 + Validator.validateString(entityType, "Incorrect entityType " + entityType);
  157 + Validator.validateId(entityId, "Incorrect entityId " + entityId);
  158 + }
  159 +
  160 + private static void validate(TsKvQuery query) {
  161 + if (query == null) {
  162 + throw new IncorrectParameterException("TsKvQuery can't be null");
  163 + } else if (isBlank(query.getKey())) {
  164 + throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty");
  165 + }
  166 + }
  167 +}
... ...
  1 +/**
  2 + * Copyright © 2016 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.timeseries;
  17 +
  18 +import com.datastax.driver.core.ResultSetFuture;
  19 +import com.datastax.driver.core.Row;
  20 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  21 +import org.thingsboard.server.common.data.kv.TsKvQuery;
  22 +
  23 +import java.util.List;
  24 +import java.util.Optional;
  25 +import java.util.Set;
  26 +import java.util.UUID;
  27 +
  28 +/**
  29 + * @author Andrew Shvayka
  30 + */
  31 +public interface TimeseriesDao {
  32 +
  33 + List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition);
  34 +
  35 + ResultSetFuture findLatest(String entityType, UUID entityId, String key);
  36 +
  37 + ResultSetFuture findAllLatest(String entityType, UUID entityId);
  38 +
  39 + ResultSetFuture save(String entityType, UUID entityId, long partition, TsKvEntry tsKvEntry);
  40 +
  41 + ResultSetFuture savePartition(String entityType, UUID entityId, long partition, String key);
  42 +
  43 + ResultSetFuture saveLatest(String entityType, UUID entityId, TsKvEntry tsKvEntry);
  44 +
  45 + TsKvEntry convertResultToTsKvEntry(Row row);
  46 +
  47 + List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows);
  48 +
  49 +}
... ...
  1 +/**
  2 + * Copyright © 2016 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.timeseries;
  17 +
  18 +import com.datastax.driver.core.ResultSet;
  19 +import com.datastax.driver.core.ResultSetFuture;
  20 +import com.datastax.driver.core.Row;
  21 +import com.google.common.util.concurrent.ListenableFuture;
  22 +import org.thingsboard.server.common.data.id.UUIDBased;
  23 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  24 +import org.thingsboard.server.common.data.kv.TsKvQuery;
  25 +
  26 +import java.util.Collection;
  27 +import java.util.List;
  28 +import java.util.Set;
  29 +
  30 +/**
  31 + * @author Andrew Shvayka
  32 + */
  33 +public interface TimeseriesService {
  34 +
  35 + //TODO: Replace this with async operation
  36 + List<TsKvEntry> find(String entityType, UUIDBased entityId, TsKvQuery query);
  37 +
  38 + ListenableFuture<List<ResultSet>> findLatest(String entityType, UUIDBased entityId, Collection<String> keys);
  39 +
  40 + ResultSetFuture findAllLatest(String entityType, UUIDBased entityId);
  41 +
  42 + ListenableFuture<List<ResultSet>> save(String entityType, UUIDBased entityId, TsKvEntry tsKvEntry);
  43 +
  44 + ListenableFuture<List<ResultSet>> save(String entityType, UUIDBased entityId, List<TsKvEntry> tsKvEntry);
  45 +
  46 + TsKvEntry convertResultToTsKvEntry(Row row);
  47 +
  48 + List<TsKvEntry> convertResultSetToTsKvEntryList(ResultSet rs);
  49 +
  50 +}
... ...
  1 +/**
  2 + * Copyright © 2016 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.timeseries;
  17 +
  18 +import java.time.LocalDateTime;
  19 +import java.time.temporal.ChronoUnit;
  20 +import java.time.temporal.TemporalUnit;
  21 +import java.util.Optional;
  22 +
  23 +public enum TsPartitionDate {
  24 +
  25 + MINUTES("yyyy-MM-dd-HH-mm", ChronoUnit.MINUTES), HOURS("yyyy-MM-dd-HH", ChronoUnit.HOURS), DAYS("yyyy-MM-dd", ChronoUnit.DAYS), MONTHS("yyyy-MM", ChronoUnit.MONTHS), YEARS("yyyy", ChronoUnit.YEARS);
  26 +
  27 + private final String pattern;
  28 + private final TemporalUnit truncateUnit;
  29 +
  30 + TsPartitionDate(String pattern, TemporalUnit truncateUnit) {
  31 + this.pattern = pattern;
  32 + this.truncateUnit = truncateUnit;
  33 + }
  34 +
  35 + public String getPattern() {
  36 + return pattern;
  37 + }
  38 +
  39 + public TemporalUnit getTruncateUnit() {
  40 + return truncateUnit;
  41 + }
  42 +
  43 + public LocalDateTime truncatedTo(LocalDateTime time) {
  44 + switch (this){
  45 + case MONTHS:
  46 + return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1);
  47 + case YEARS:
  48 + return time.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1);
  49 + default:
  50 + return time.truncatedTo(truncateUnit);
  51 + }
  52 + }
  53 +
  54 + public static Optional<TsPartitionDate> parse(String name) {
  55 + TsPartitionDate partition = null;
  56 + if (name != null) {
  57 + for (TsPartitionDate partitionDate : TsPartitionDate.values()) {
  58 + if (partitionDate.name().equalsIgnoreCase(name)) {
  59 + partition = partitionDate;
  60 + break;
  61 + }
  62 + }
  63 + }
  64 + return Optional.of(partition);
  65 + }
  66 +}
... ...