Commit 452090b9b7c3112d97ed0c0ffd34175031dabf79

Authored by Igor Kulikov
Committed by GitHub
2 parents 45756dc7 fb8ddbda

Merge pull request #3883 from ShvaykaD/feature/cassandra-partitions-cache-3.2

[3.2.x] Feature/cassandra partitions cache
... ... @@ -192,8 +192,9 @@ cassandra:
192 192 read_consistency_level: "${CASSANDRA_READ_CONSISTENCY_LEVEL:ONE}"
193 193 write_consistency_level: "${CASSANDRA_WRITE_CONSISTENCY_LEVEL:ONE}"
194 194 default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
195   - # Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS,INDEFINITE
  195 + # Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS, INDEFINITE
196 196 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
  197 + ts_key_value_partitions_max_cache_size: "${TS_KV_PARTITIONS_MAX_CACHE_SIZE:100000}"
197 198 ts_key_value_ttl: "${TS_KV_TTL:0}"
198 199 events_ttl: "${TS_EVENTS_TTL:0}"
199 200 # Specify TTL of debug log in seconds. The current value corresponds to one week
... ...
... ... @@ -79,12 +79,17 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
79 79
80 80 protected static List<Long> FIXED_PARTITION = Arrays.asList(new Long[]{0L});
81 81
  82 + private CassandraTsPartitionsCache cassandraTsPartitionsCache;
  83 +
82 84 @Autowired
83 85 private Environment environment;
84 86
85 87 @Value("${cassandra.query.ts_key_value_partitioning}")
86 88 private String partitioning;
87 89
  90 + @Value("${cassandra.query.ts_key_value_partitions_max_cache_size:100000}")
  91 + private long partitionsCacheSize;
  92 +
88 93 @Value("${cassandra.query.ts_key_value_ttl}")
89 94 private long systemTtl;
90 95
... ... @@ -111,13 +116,16 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
111 116 super.startExecutor();
112 117 if (!isInstall()) {
113 118 getFetchStmt(Aggregation.NONE, DESC_ORDER);
114   - }
115   - Optional<NoSqlTsPartitionDate> partition = NoSqlTsPartitionDate.parse(partitioning);
116   - if (partition.isPresent()) {
117   - tsFormat = partition.get();
118   - } else {
119   - log.warn("Incorrect configuration of partitioning {}", partitioning);
120   - throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");
  119 + Optional<NoSqlTsPartitionDate> partition = NoSqlTsPartitionDate.parse(partitioning);
  120 + if (partition.isPresent()) {
  121 + tsFormat = partition.get();
  122 + if (!isFixedPartitioning() && partitionsCacheSize > 0) {
  123 + cassandraTsPartitionsCache = new CassandraTsPartitionsCache(partitionsCacheSize);
  124 + }
  125 + } else {
  126 + log.warn("Incorrect configuration of partitioning {}", partitioning);
  127 + throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");
  128 + }
121 129 }
122 130 }
123 131
... ... @@ -175,17 +183,18 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
175 183 }
176 184 ttl = computeTtl(ttl);
177 185 long partition = toPartitionTs(tsKvEntryTs);
178   - log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
179   - BoundStatementBuilder stmtBuilder = new BoundStatementBuilder((ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind());
180   - stmtBuilder.setString(0, entityId.getEntityType().name())
181   - .setUuid(1, entityId.getId())
182   - .setLong(2, partition)
183   - .setString(3, key);
184   - if (ttl > 0) {
185   - stmtBuilder.setInt(4, (int) ttl);
  186 + if (cassandraTsPartitionsCache == null) {
  187 + return doSavePartition(tenantId, entityId, key, ttl, partition);
  188 + } else {
  189 + CassandraPartitionCacheKey partitionSearchKey = new CassandraPartitionCacheKey(entityId, key, partition);
  190 + if (!cassandraTsPartitionsCache.has(partitionSearchKey)) {
  191 + ListenableFuture<Integer> result = doSavePartition(tenantId, entityId, key, ttl, partition);
  192 + Futures.addCallback(result, new CacheCallback<>(partitionSearchKey), MoreExecutors.directExecutor());
  193 + return result;
  194 + } else {
  195 + return Futures.immediateFuture(0);
  196 + }
186 197 }
187   - BoundStatement stmt = stmtBuilder.build();
188   - return getFuture(executeAsyncWrite(tenantId, stmt), rs -> 0);
189 198 }
190 199
191 200 @Override
... ... @@ -461,6 +470,38 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
461 470 return getFuture(executeAsyncWrite(tenantId, stmt), rs -> null);
462 471 }
463 472
  473 + private ListenableFuture<Integer> doSavePartition(TenantId tenantId, EntityId entityId, String key, long ttl, long partition) {
  474 + log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
  475 + PreparedStatement preparedStatement = ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt();
  476 + BoundStatement stmt = preparedStatement.bind();
  477 + stmt = stmt.setString(0, entityId.getEntityType().name())
  478 + .setUuid(1, entityId.getId())
  479 + .setLong(2, partition)
  480 + .setString(3, key);
  481 + if (ttl > 0) {
  482 + stmt = stmt.setInt(4, (int) ttl);
  483 + }
  484 + return getFuture(executeAsyncWrite(tenantId, stmt), rs -> 0);
  485 + }
  486 +
  487 + private class CacheCallback<Void> implements FutureCallback<Void> {
  488 + private final CassandraPartitionCacheKey key;
  489 +
  490 + private CacheCallback(CassandraPartitionCacheKey key) {
  491 + this.key = key;
  492 + }
  493 +
  494 + @Override
  495 + public void onSuccess(Void result) {
  496 + cassandraTsPartitionsCache.put(key);
  497 + }
  498 +
  499 + @Override
  500 + public void onFailure(Throwable t) {
  501 +
  502 + }
  503 + }
  504 +
464 505 private long computeTtl(long ttl) {
465 506 if (systemTtl > 0) {
466 507 if (ttl == 0) {
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.timeseries;
  17 +
  18 +import lombok.AllArgsConstructor;
  19 +import lombok.Data;
  20 +import org.thingsboard.server.common.data.id.EntityId;
  21 +
  22 +@Data
  23 +@AllArgsConstructor
  24 +public class CassandraPartitionCacheKey {
  25 +
  26 + private EntityId entityId;
  27 + private String key;
  28 + private long partition;
  29 +
  30 +}
\ No newline at end of file
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.timeseries;
  17 +
  18 +import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
  19 +import com.github.benmanes.caffeine.cache.Caffeine;
  20 +
  21 +import java.util.concurrent.CompletableFuture;
  22 +
  23 +public class CassandraTsPartitionsCache {
  24 +
  25 + private AsyncLoadingCache<CassandraPartitionCacheKey, Boolean> partitionsCache;
  26 +
  27 + public CassandraTsPartitionsCache(long maxCacheSize) {
  28 + this.partitionsCache = Caffeine.newBuilder()
  29 + .maximumSize(maxCacheSize)
  30 + .buildAsync(key -> {
  31 + throw new IllegalStateException("'get' methods calls are not supported!");
  32 + });
  33 + }
  34 +
  35 + public boolean has(CassandraPartitionCacheKey key) {
  36 + return partitionsCache.getIfPresent(key) != null;
  37 + }
  38 +
  39 + public void put(CassandraPartitionCacheKey key) {
  40 + partitionsCache.put(key, CompletableFuture.completedFuture(true));
  41 + }
  42 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql;
  17 +
  18 +import com.datastax.oss.driver.api.core.ConsistencyLevel;
  19 +import com.datastax.oss.driver.api.core.cql.BoundStatement;
  20 +import com.datastax.oss.driver.api.core.cql.PreparedStatement;
  21 +import com.datastax.oss.driver.api.core.cql.Statement;
  22 +import com.google.common.util.concurrent.Futures;
  23 +import org.junit.Before;
  24 +import org.junit.Test;
  25 +import org.junit.runner.RunWith;
  26 +import org.mockito.Mock;
  27 +import org.mockito.Spy;
  28 +import org.mockito.runners.MockitoJUnitRunner;
  29 +import org.springframework.core.env.Environment;
  30 +import org.springframework.test.util.ReflectionTestUtils;
  31 +import org.thingsboard.server.common.data.id.TenantId;
  32 +import org.thingsboard.server.dao.cassandra.CassandraCluster;
  33 +import org.thingsboard.server.dao.cassandra.guava.GuavaSession;
  34 +import org.thingsboard.server.dao.timeseries.CassandraBaseTimeseriesDao;
  35 +
  36 +import java.util.UUID;
  37 +
  38 +import static org.mockito.Matchers.any;
  39 +import static org.mockito.Matchers.anyInt;
  40 +import static org.mockito.Matchers.anyString;
  41 +import static org.mockito.Mockito.doReturn;
  42 +import static org.mockito.Mockito.times;
  43 +import static org.mockito.Mockito.verify;
  44 +import static org.mockito.Mockito.when;
  45 +
  46 +@RunWith(MockitoJUnitRunner.class)
  47 +public class CassandraPartitionsCacheTest {
  48 +
  49 + @Spy
  50 + private CassandraBaseTimeseriesDao cassandraBaseTimeseriesDao;
  51 +
  52 + @Mock
  53 + private PreparedStatement preparedStatement;
  54 +
  55 + @Mock
  56 + private BoundStatement boundStatement;
  57 +
  58 + @Mock
  59 + private Environment environment;
  60 +
  61 + @Mock
  62 + private CassandraBufferedRateExecutor rateLimiter;
  63 +
  64 + @Mock
  65 + private CassandraCluster cluster;
  66 +
  67 + @Mock
  68 + private GuavaSession session;
  69 +
  70 + @Before
  71 + public void setUp() throws Exception {
  72 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "partitioning", "MONTHS");
  73 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "partitionsCacheSize", 100000);
  74 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "systemTtl", 0);
  75 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "setNullValuesEnabled", false);
  76 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "environment", environment);
  77 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "rateLimiter", rateLimiter);
  78 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "cluster", cluster);
  79 +
  80 + when(cluster.getDefaultReadConsistencyLevel()).thenReturn(ConsistencyLevel.ONE);
  81 + when(cluster.getDefaultWriteConsistencyLevel()).thenReturn(ConsistencyLevel.ONE);
  82 + when(cluster.getSession()).thenReturn(session);
  83 + when(session.prepare(anyString())).thenReturn(preparedStatement);
  84 +
  85 + when(preparedStatement.bind()).thenReturn(boundStatement);
  86 +
  87 + when(boundStatement.setString(anyInt(), anyString())).thenReturn(boundStatement);
  88 + when(boundStatement.setUuid(anyInt(), any(UUID.class))).thenReturn(boundStatement);
  89 + when(boundStatement.setLong(anyInt(), any(Long.class))).thenReturn(boundStatement);
  90 +
  91 + doReturn(Futures.immediateFuture(0)).when(cassandraBaseTimeseriesDao).getFuture(any(TbResultSetFuture.class), any());
  92 + }
  93 +
  94 + @Test
  95 + public void testPartitionSave() throws Exception {
  96 + cassandraBaseTimeseriesDao.init();
  97 +
  98 + UUID id = UUID.randomUUID();
  99 + TenantId tenantId = new TenantId(id);
  100 + long tsKvEntryTs = System.currentTimeMillis();
  101 +
  102 + for (int i = 0; i < 50000; i++) {
  103 + cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i, 0);
  104 + }
  105 + for (int i = 0; i < 60000; i++) {
  106 + cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i, 0);
  107 + }
  108 + verify(cassandraBaseTimeseriesDao, times(60000)).executeAsyncWrite(any(TenantId.class), any(Statement.class));
  109 + }
  110 +}
\ No newline at end of file
... ...
... ... @@ -54,6 +54,8 @@ cassandra.query.default_fetch_size=2000
54 54
55 55 cassandra.query.ts_key_value_partitioning=HOURS
56 56
  57 +cassandra.query.ts_key_value_partitions_max_cache_size=100000
  58 +
57 59 cassandra.query.ts_key_value_ttl=0
58 60
59 61 cassandra.query.debug_events_ttl=604800
... ...