Commit 0de5868bc5886e3c2acc2cc1c56dba8dd0fa9a7b

Authored by ShvaykaD
Committed by Andrew Shvayka
1 parent be84a0fc

added Caffeine cache for Cassandra ts partitions saving

@@ -223,8 +223,9 @@ cassandra: @@ -223,8 +223,9 @@ cassandra:
223 read_consistency_level: "${CASSANDRA_READ_CONSISTENCY_LEVEL:ONE}" 223 read_consistency_level: "${CASSANDRA_READ_CONSISTENCY_LEVEL:ONE}"
224 write_consistency_level: "${CASSANDRA_WRITE_CONSISTENCY_LEVEL:ONE}" 224 write_consistency_level: "${CASSANDRA_WRITE_CONSISTENCY_LEVEL:ONE}"
225 default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" 225 default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
226 - # Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS,INDEFINITE 226 + # Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS, INDEFINITE
227 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" 227 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
  228 + ts_key_value_partitions_max_cache_size: "${TS_KV_PARTITIONS_MAX_CACHE_SIZE:100000}"
228 ts_key_value_ttl: "${TS_KV_TTL:0}" 229 ts_key_value_ttl: "${TS_KV_TTL:0}"
229 events_ttl: "${TS_EVENTS_TTL:0}" 230 events_ttl: "${TS_EVENTS_TTL:0}"
230 # Specify TTL of debug log in seconds. The current value corresponds to one week 231 # Specify TTL of debug log in seconds. The current value corresponds to one week
@@ -29,6 +29,7 @@ import com.google.common.util.concurrent.FutureCallback; @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.FutureCallback;
29 import com.google.common.util.concurrent.Futures; 29 import com.google.common.util.concurrent.Futures;
30 import com.google.common.util.concurrent.ListenableFuture; 30 import com.google.common.util.concurrent.ListenableFuture;
31 import com.google.common.util.concurrent.MoreExecutors; 31 import com.google.common.util.concurrent.MoreExecutors;
  32 +import com.google.common.util.concurrent.SettableFuture;
32 import lombok.extern.slf4j.Slf4j; 33 import lombok.extern.slf4j.Slf4j;
33 import org.apache.commons.lang3.StringUtils; 34 import org.apache.commons.lang3.StringUtils;
34 import org.springframework.beans.factory.annotation.Autowired; 35 import org.springframework.beans.factory.annotation.Autowired;
@@ -66,6 +67,7 @@ import java.util.Arrays; @@ -66,6 +67,7 @@ import java.util.Arrays;
66 import java.util.Collections; 67 import java.util.Collections;
67 import java.util.List; 68 import java.util.List;
68 import java.util.Optional; 69 import java.util.Optional;
  70 +import java.util.concurrent.CompletableFuture;
69 import java.util.concurrent.ExecutionException; 71 import java.util.concurrent.ExecutionException;
70 import java.util.stream.Collectors; 72 import java.util.stream.Collectors;
71 73
@@ -88,12 +90,17 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -88,12 +90,17 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
88 public static final String DESC_ORDER = "DESC"; 90 public static final String DESC_ORDER = "DESC";
89 private static List<Long> FIXED_PARTITION = Arrays.asList(new Long[]{0L}); 91 private static List<Long> FIXED_PARTITION = Arrays.asList(new Long[]{0L});
90 92
  93 + private CassandraTsPartitionsCache cassandraTsPartitionsCache;
  94 +
91 @Autowired 95 @Autowired
92 private Environment environment; 96 private Environment environment;
93 97
94 @Value("${cassandra.query.ts_key_value_partitioning}") 98 @Value("${cassandra.query.ts_key_value_partitioning}")
95 private String partitioning; 99 private String partitioning;
96 100
  101 + @Value("${cassandra.query.ts_key_value_partitions_max_cache_size}")
  102 + private long partitionsCacheSize;
  103 +
97 @Value("${cassandra.query.ts_key_value_ttl}") 104 @Value("${cassandra.query.ts_key_value_ttl}")
98 private long systemTtl; 105 private long systemTtl;
99 106
@@ -126,6 +133,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -126,6 +133,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
126 Optional<NoSqlTsPartitionDate> partition = NoSqlTsPartitionDate.parse(partitioning); 133 Optional<NoSqlTsPartitionDate> partition = NoSqlTsPartitionDate.parse(partitioning);
127 if (partition.isPresent()) { 134 if (partition.isPresent()) {
128 tsFormat = partition.get(); 135 tsFormat = partition.get();
  136 + if (!isFixedPartitioning() && partitionsCacheSize > 0) {
  137 + cassandraTsPartitionsCache = new CassandraTsPartitionsCache(partitionsCacheSize);
  138 + }
129 } else { 139 } else {
130 log.warn("Incorrect configuration of partitioning {}", partitioning); 140 log.warn("Incorrect configuration of partitioning {}", partitioning);
131 throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!"); 141 throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");
@@ -390,6 +400,42 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -390,6 +400,42 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
390 } 400 }
391 ttl = computeTtl(ttl); 401 ttl = computeTtl(ttl);
392 long partition = toPartitionTs(tsKvEntryTs); 402 long partition = toPartitionTs(tsKvEntryTs);
  403 + if (cassandraTsPartitionsCache == null) {
  404 + return doSavePartition(tenantId, entityId, key, ttl, partition);
  405 + } else {
  406 + CassandraPartitionCacheKey partitionSearchKey = new CassandraPartitionCacheKey(entityId, key, partition);
  407 + CompletableFuture<Boolean> hasFuture = cassandraTsPartitionsCache.has(partitionSearchKey);
  408 + SettableFuture<Boolean> listenableFuture = SettableFuture.create();
  409 + if (hasFuture == null) {
  410 + return processDoSavePartition(tenantId, entityId, key, partition, partitionSearchKey, ttl);
  411 + } else {
  412 + hasFuture.whenComplete((result, throwable) -> {
  413 + if (throwable != null) {
  414 + listenableFuture.setException(throwable);
  415 + } else {
  416 + listenableFuture.set(result);
  417 + }
  418 + });
  419 + long finalTtl = ttl;
  420 + return Futures.transformAsync(listenableFuture, result -> {
  421 + if (result) {
  422 + return Futures.immediateFuture(null);
  423 + } else {
  424 + return processDoSavePartition(tenantId, entityId, key, partition, partitionSearchKey, finalTtl);
  425 + }
  426 + }, readResultsProcessingExecutor);
  427 + }
  428 + }
  429 + }
  430 +
  431 + private ListenableFuture<Void> processDoSavePartition(TenantId tenantId, EntityId entityId, String key, long partition, CassandraPartitionCacheKey partitionSearchKey, long ttl) {
  432 + return Futures.transformAsync(doSavePartition(tenantId, entityId, key, ttl, partition), input -> {
  433 + cassandraTsPartitionsCache.put(partitionSearchKey);
  434 + return Futures.immediateFuture(input);
  435 + }, readResultsProcessingExecutor);
  436 + }
  437 +
  438 + private ListenableFuture<Void> doSavePartition(TenantId tenantId, EntityId entityId, String key, long ttl, long partition) {
393 log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key); 439 log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
394 BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind(); 440 BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind();
395 stmt = stmt.setString(0, entityId.getEntityType().name()) 441 stmt = stmt.setString(0, entityId.getEntityType().name())
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.timeseries;
  17 +
  18 +import lombok.AllArgsConstructor;
  19 +import lombok.Data;
  20 +import org.thingsboard.server.common.data.id.EntityId;
  21 +
  22 +@Data
  23 +@AllArgsConstructor
  24 +public class CassandraPartitionCacheKey {
  25 +
  26 + private EntityId entityId;
  27 + private String key;
  28 + private long partition;
  29 +
  30 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.timeseries;
  17 +
  18 +import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
  19 +import com.github.benmanes.caffeine.cache.Caffeine;
  20 +
  21 +import java.util.concurrent.CompletableFuture;
  22 +
  23 +public class CassandraTsPartitionsCache {
  24 +
  25 + private AsyncLoadingCache<CassandraPartitionCacheKey, Boolean> partitionsCache;
  26 +
  27 + public CassandraTsPartitionsCache(long maxCacheSize) {
  28 + this.partitionsCache = Caffeine.newBuilder()
  29 + .maximumSize(maxCacheSize)
  30 + .buildAsync(key -> {
  31 + throw new IllegalStateException("'get' methods calls are not supported!");
  32 + });
  33 + }
  34 +
  35 + public CompletableFuture<Boolean> has(CassandraPartitionCacheKey key) {
  36 + return partitionsCache.getIfPresent(key);
  37 + }
  38 +
  39 + public void put(CassandraPartitionCacheKey key) {
  40 + partitionsCache.put(key, CompletableFuture.completedFuture(true));
  41 + }
  42 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql;
  17 +
  18 +import com.datastax.driver.core.BoundStatement;
  19 +import com.datastax.driver.core.Cluster;
  20 +import com.datastax.driver.core.CodecRegistry;
  21 +import com.datastax.driver.core.Configuration;
  22 +import com.datastax.driver.core.ConsistencyLevel;
  23 +import com.datastax.driver.core.PreparedStatement;
  24 +import com.datastax.driver.core.ResultSetFuture;
  25 +import com.datastax.driver.core.Session;
  26 +import com.datastax.driver.core.Statement;
  27 +import com.google.common.util.concurrent.Futures;
  28 +import org.junit.Before;
  29 +import org.junit.Test;
  30 +import org.junit.runner.RunWith;
  31 +import org.mockito.Mock;
  32 +import org.mockito.runners.MockitoJUnitRunner;
  33 +import org.springframework.core.env.Environment;
  34 +import org.springframework.test.util.ReflectionTestUtils;
  35 +import org.thingsboard.server.common.data.id.TenantId;
  36 +import org.thingsboard.server.dao.cassandra.CassandraCluster;
  37 +import org.thingsboard.server.dao.timeseries.CassandraBaseTimeseriesDao;
  38 +
  39 +import java.util.UUID;
  40 +
  41 +import static org.mockito.Matchers.any;
  42 +import static org.mockito.Matchers.anyInt;
  43 +import static org.mockito.Matchers.anyLong;
  44 +import static org.mockito.Matchers.anyString;
  45 +import static org.mockito.Mockito.doReturn;
  46 +import static org.mockito.Mockito.spy;
  47 +import static org.mockito.Mockito.times;
  48 +import static org.mockito.Mockito.verify;
  49 +import static org.mockito.Mockito.when;
  50 +
  51 +@RunWith(MockitoJUnitRunner.class)
  52 +public class CassandraPartitionsCacheTest {
  53 +
  54 + private CassandraBaseTimeseriesDao cassandraBaseTimeseriesDao;
  55 +
  56 + @Mock
  57 + private Environment environment;
  58 +
  59 + @Mock
  60 + private CassandraBufferedRateExecutor rateLimiter;
  61 +
  62 + @Mock
  63 + private CassandraCluster cluster;
  64 +
  65 + @Mock
  66 + private Session session;
  67 +
  68 + @Mock
  69 + private Cluster sessionCluster;
  70 +
  71 + @Mock
  72 + private Configuration configuration;
  73 +
  74 + @Mock
  75 + private PreparedStatement preparedStatement;
  76 +
  77 + @Mock
  78 + private BoundStatement boundStatement;
  79 +
  80 + @Before
  81 + public void setUp() {
  82 + when(cluster.getDefaultReadConsistencyLevel()).thenReturn(ConsistencyLevel.ONE);
  83 + when(cluster.getDefaultWriteConsistencyLevel()).thenReturn(ConsistencyLevel.ONE);
  84 + when(cluster.getSession()).thenReturn(session);
  85 + when(session.getCluster()).thenReturn(sessionCluster);
  86 + when(sessionCluster.getConfiguration()).thenReturn(configuration);
  87 + when(configuration.getCodecRegistry()).thenReturn(CodecRegistry.DEFAULT_INSTANCE);
  88 + when(session.prepare(anyString())).thenReturn(preparedStatement);
  89 + when(preparedStatement.bind()).thenReturn(boundStatement);
  90 + when(boundStatement.setString(anyInt(), anyString())).thenReturn(boundStatement);
  91 + when(boundStatement.setUUID(anyInt(), any(UUID.class))).thenReturn(boundStatement);
  92 + when(boundStatement.setLong(anyInt(), anyLong())).thenReturn(boundStatement);
  93 + when(boundStatement.setInt(anyInt(), anyInt())).thenReturn(boundStatement);
  94 +
  95 + cassandraBaseTimeseriesDao = spy(new CassandraBaseTimeseriesDao());
  96 +
  97 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "partitioning", "MONTHS");
  98 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "partitionsCacheSize", 100000);
  99 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "systemTtl", 0);
  100 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "setNullValuesEnabled", false);
  101 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "environment", environment);
  102 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "rateLimiter", rateLimiter);
  103 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "cluster", cluster);
  104 +
  105 + doReturn(Futures.immediateFuture(null)).when(cassandraBaseTimeseriesDao).getFuture(any(ResultSetFuture.class), any());
  106 +
  107 + }
  108 +
  109 + @Test
  110 + public void testPartitionSave() throws Exception {
  111 +
  112 + cassandraBaseTimeseriesDao.init();
  113 +
  114 +
  115 + UUID id = UUID.randomUUID();
  116 + TenantId tenantId = new TenantId(id);
  117 + long tsKvEntryTs = System.currentTimeMillis();
  118 +
  119 + for (int i = 0; i < 50000; i++) {
  120 + cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i, 0);
  121 + }
  122 +
  123 + for (int i = 0; i < 60000; i++) {
  124 + cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i, 0);
  125 + }
  126 +
  127 + verify(cassandraBaseTimeseriesDao, times(60000)).executeAsyncWrite(any(TenantId.class), any(Statement.class));
  128 +
  129 +
  130 + }
  131 +
  132 +}
@@ -46,6 +46,8 @@ cassandra.query.default_fetch_size=2000 @@ -46,6 +46,8 @@ cassandra.query.default_fetch_size=2000
46 46
47 cassandra.query.ts_key_value_partitioning=HOURS 47 cassandra.query.ts_key_value_partitioning=HOURS
48 48
  49 +cassandra.query.ts_key_value_partitions_max_cache_size=100000
  50 +
49 cassandra.query.ts_key_value_ttl=0 51 cassandra.query.ts_key_value_ttl=0
50 52
51 cassandra.query.debug_events_ttl=604800 53 cassandra.query.debug_events_ttl=604800