Commit e516cd31dc411acf84b38c22211120b84ee97aef

Authored by ShvaykaD
1 parent 6bfc1b89

Added a Caffeine cache to deduplicate Cassandra timeseries (ts) partition saves

@@ -192,8 +192,9 @@ cassandra: @@ -192,8 +192,9 @@ cassandra:
192 read_consistency_level: "${CASSANDRA_READ_CONSISTENCY_LEVEL:ONE}" 192 read_consistency_level: "${CASSANDRA_READ_CONSISTENCY_LEVEL:ONE}"
193 write_consistency_level: "${CASSANDRA_WRITE_CONSISTENCY_LEVEL:ONE}" 193 write_consistency_level: "${CASSANDRA_WRITE_CONSISTENCY_LEVEL:ONE}"
194 default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" 194 default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
195 - # Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS,INDEFINITE 195 + # Specify partitioning size for timestamp key-value storage. Example: MINUTES, HOURS, DAYS, MONTHS, INDEFINITE
196 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" 196 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
  197 + ts_key_value_partitions_max_cache_size: "${TS_KV_PARTITIONS_MAX_CACHE_SIZE:100000}"
197 ts_key_value_ttl: "${TS_KV_TTL:0}" 198 ts_key_value_ttl: "${TS_KV_TTL:0}"
198 events_ttl: "${TS_EVENTS_TTL:0}" 199 events_ttl: "${TS_EVENTS_TTL:0}"
199 # Specify TTL of debug log in seconds. The current value corresponds to one week 200 # Specify TTL of debug log in seconds. The current value corresponds to one week
@@ -79,12 +79,17 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @@ -79,12 +79,17 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
79 79
80 protected static List<Long> FIXED_PARTITION = Arrays.asList(new Long[]{0L}); 80 protected static List<Long> FIXED_PARTITION = Arrays.asList(new Long[]{0L});
81 81
  82 + private CassandraTsPartitionsCache cassandraTsPartitionsCache;
  83 +
82 @Autowired 84 @Autowired
83 private Environment environment; 85 private Environment environment;
84 86
85 @Value("${cassandra.query.ts_key_value_partitioning}") 87 @Value("${cassandra.query.ts_key_value_partitioning}")
86 private String partitioning; 88 private String partitioning;
87 89
  90 + @Value("${cassandra.query.ts_key_value_partitions_max_cache_size:100000}")
  91 + private long partitionsCacheSize;
  92 +
88 @Value("${cassandra.query.ts_key_value_ttl}") 93 @Value("${cassandra.query.ts_key_value_ttl}")
89 private long systemTtl; 94 private long systemTtl;
90 95
@@ -111,13 +116,16 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @@ -111,13 +116,16 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
111 super.startExecutor(); 116 super.startExecutor();
112 if (!isInstall()) { 117 if (!isInstall()) {
113 getFetchStmt(Aggregation.NONE, DESC_ORDER); 118 getFetchStmt(Aggregation.NONE, DESC_ORDER);
114 - }  
115 - Optional<NoSqlTsPartitionDate> partition = NoSqlTsPartitionDate.parse(partitioning);  
116 - if (partition.isPresent()) {  
117 - tsFormat = partition.get();  
118 - } else {  
119 - log.warn("Incorrect configuration of partitioning {}", partitioning);  
120 - throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!"); 119 + Optional<NoSqlTsPartitionDate> partition = NoSqlTsPartitionDate.parse(partitioning);
  120 + if (partition.isPresent()) {
  121 + tsFormat = partition.get();
  122 + if (!isFixedPartitioning() && partitionsCacheSize > 0) {
  123 + cassandraTsPartitionsCache = new CassandraTsPartitionsCache(partitionsCacheSize);
  124 + }
  125 + } else {
  126 + log.warn("Incorrect configuration of partitioning {}", partitioning);
  127 + throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");
  128 + }
121 } 129 }
122 } 130 }
123 131
@@ -169,26 +177,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @@ -169,26 +177,6 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
169 } 177 }
170 178
171 @Override 179 @Override
172 - public ListenableFuture<Integer> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) {  
173 - if (isFixedPartitioning()) {  
174 - return Futures.immediateFuture(null);  
175 - }  
176 - ttl = computeTtl(ttl);  
177 - long partition = toPartitionTs(tsKvEntryTs);  
178 - log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);  
179 - BoundStatementBuilder stmtBuilder = new BoundStatementBuilder((ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind());  
180 - stmtBuilder.setString(0, entityId.getEntityType().name())  
181 - .setUuid(1, entityId.getId())  
182 - .setLong(2, partition)  
183 - .setString(3, key);  
184 - if (ttl > 0) {  
185 - stmtBuilder.setInt(4, (int) ttl);  
186 - }  
187 - BoundStatement stmt = stmtBuilder.build();  
188 - return getFuture(executeAsyncWrite(tenantId, stmt), rs -> 0);  
189 - }  
190 -  
191 - @Override  
192 public ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { 180 public ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
193 long minPartition = toPartitionTs(query.getStartTs()); 181 long minPartition = toPartitionTs(query.getStartTs());
194 long maxPartition = toPartitionTs(query.getEndTs()); 182 long maxPartition = toPartitionTs(query.getEndTs());
@@ -461,6 +449,68 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @@ -461,6 +449,68 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
461 return getFuture(executeAsyncWrite(tenantId, stmt), rs -> null); 449 return getFuture(executeAsyncWrite(tenantId, stmt), rs -> null);
462 } 450 }
463 451
  452 + @Override
  453 + public ListenableFuture<Integer> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) {
  454 + if (isFixedPartitioning()) {
  455 + return Futures.immediateFuture(null);
  456 + }
  457 + ttl = computeTtl(ttl);
  458 + long partition = toPartitionTs(tsKvEntryTs);
  459 + if (cassandraTsPartitionsCache == null) {
  460 + return doSavePartition(tenantId, entityId, key, ttl, partition);
  461 + } else {
  462 + CassandraPartitionCacheKey partitionSearchKey = new CassandraPartitionCacheKey(entityId, key, partition);
  463 + if (!cassandraTsPartitionsCache.has(partitionSearchKey)) {
  464 + ListenableFuture<Integer> result = doSavePartition(tenantId, entityId, key, ttl, partition);
  465 + Futures.addCallback(result, new CacheCallback<>(partitionSearchKey), MoreExecutors.directExecutor());
  466 + return result;
  467 + } else {
  468 + return Futures.immediateFuture(0);
  469 + }
  470 + }
  471 + }
  472 +
  473 + private ListenableFuture<Integer> doSavePartition(TenantId tenantId, EntityId entityId, String key, long ttl, long partition) {
  474 + log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
  475 + PreparedStatement preparedStatement = ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt();
  476 + BoundStatement stmt = preparedStatement.bind();
  477 + stmt.setString(0, entityId.getEntityType().name());
  478 + stmt.setUuid(1, entityId.getId());
  479 + stmt.setLong(2, partition);
  480 + stmt.setString(3, key);
  481 + if (ttl > 0) {
  482 + stmt.setInt(4, (int) ttl);
  483 + }
  484 +// BoundStatementBuilder stmtBuilder = new BoundStatementBuilder(bind);
  485 +// stmtBuilder.setString(0, entityId.getEntityType().name())
  486 +// .setUuid(1, entityId.getId())
  487 +// .setLong(2, partition)
  488 +// .setString(3, key);
  489 +// if (ttl > 0) {
  490 +// stmtBuilder.setInt(4, (int) ttl);
  491 +// }
  492 +// BoundStatement stmt = stmtBuilder.build();
  493 + return getFuture(executeAsyncWrite(tenantId, stmt), rs -> 0);
  494 + }
  495 +
  496 + private class CacheCallback<Void> implements FutureCallback<Void> {
  497 + private final CassandraPartitionCacheKey key;
  498 +
  499 + private CacheCallback(CassandraPartitionCacheKey key) {
  500 + this.key = key;
  501 + }
  502 +
  503 + @Override
  504 + public void onSuccess(Void result) {
  505 + cassandraTsPartitionsCache.put(key);
  506 + }
  507 +
  508 + @Override
  509 + public void onFailure(Throwable t) {
  510 +
  511 + }
  512 + }
  513 +
464 private long computeTtl(long ttl) { 514 private long computeTtl(long ttl) {
465 if (systemTtl > 0) { 515 if (systemTtl > 0) {
466 if (ttl == 0) { 516 if (ttl == 0) {
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.timeseries;
  17 +
  18 +import lombok.AllArgsConstructor;
  19 +import lombok.Data;
  20 +import org.thingsboard.server.common.data.id.EntityId;
  21 +
  22 +@Data
  23 +@AllArgsConstructor
  24 +public class CassandraPartitionCacheKey {
  25 +
  26 + private EntityId entityId;
  27 + private String key;
  28 + private long partition;
  29 +
  30 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.timeseries;
  17 +
  18 +import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
  19 +import com.github.benmanes.caffeine.cache.Caffeine;
  20 +
  21 +import java.util.concurrent.CompletableFuture;
  22 +
  23 +public class CassandraTsPartitionsCache {
  24 +
  25 + private AsyncLoadingCache<CassandraPartitionCacheKey, Boolean> partitionsCache;
  26 +
  27 + public CassandraTsPartitionsCache(long maxCacheSize) {
  28 + this.partitionsCache = Caffeine.newBuilder()
  29 + .maximumSize(maxCacheSize)
  30 + .buildAsync(key -> {
  31 + throw new IllegalStateException("'get' methods calls are not supported!");
  32 + });
  33 + }
  34 +
  35 + public boolean has(CassandraPartitionCacheKey key) {
  36 + return partitionsCache.getIfPresent(key) != null;
  37 + }
  38 +
  39 + public void put(CassandraPartitionCacheKey key) {
  40 + partitionsCache.put(key, CompletableFuture.completedFuture(true));
  41 + }
  42 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql;
  17 +
  18 +import com.datastax.oss.driver.api.core.ConsistencyLevel;
  19 +import com.datastax.oss.driver.api.core.cql.BoundStatement;
  20 +import com.datastax.oss.driver.api.core.cql.PreparedStatement;
  21 +import com.datastax.oss.driver.api.core.cql.Statement;
  22 +import com.google.common.util.concurrent.Futures;
  23 +import org.junit.Before;
  24 +import org.junit.Test;
  25 +import org.junit.runner.RunWith;
  26 +import org.mockito.Mock;
  27 +import org.mockito.runners.MockitoJUnitRunner;
  28 +import org.springframework.core.env.Environment;
  29 +import org.springframework.test.util.ReflectionTestUtils;
  30 +import org.thingsboard.server.common.data.id.TenantId;
  31 +import org.thingsboard.server.dao.cassandra.CassandraCluster;
  32 +import org.thingsboard.server.dao.cassandra.guava.GuavaSession;
  33 +import org.thingsboard.server.dao.timeseries.CassandraBaseTimeseriesDao;
  34 +
  35 +import java.util.UUID;
  36 +
  37 +import static org.mockito.Matchers.any;
  38 +import static org.mockito.Matchers.anyString;
  39 +import static org.mockito.Mockito.doReturn;
  40 +import static org.mockito.Mockito.spy;
  41 +import static org.mockito.Mockito.times;
  42 +import static org.mockito.Mockito.verify;
  43 +import static org.mockito.Mockito.when;
  44 +
  45 +@RunWith(MockitoJUnitRunner.class)
  46 +public class CassandraPartitionsCacheTest {
  47 +
  48 + private CassandraBaseTimeseriesDao cassandraBaseTimeseriesDao;
  49 +
  50 + @Mock
  51 + private Environment environment;
  52 +
  53 + @Mock
  54 + private CassandraBufferedRateExecutor rateLimiter;
  55 +
  56 + @Mock
  57 + private CassandraCluster cluster;
  58 +
  59 + @Mock
  60 + private GuavaSession session;
  61 +
  62 + @Mock
  63 + private PreparedStatement preparedStatement;
  64 +
  65 + @Mock
  66 + private BoundStatement boundStatement;
  67 +
  68 + @Before
  69 + public void setUp() throws Exception {
  70 + when(cluster.getDefaultReadConsistencyLevel()).thenReturn(ConsistencyLevel.ONE);
  71 + when(cluster.getDefaultWriteConsistencyLevel()).thenReturn(ConsistencyLevel.ONE);
  72 + when(cluster.getSession()).thenReturn(session);
  73 + when(session.prepare(anyString())).thenReturn(preparedStatement);
  74 + when(preparedStatement.bind()).thenReturn(boundStatement);
  75 +
  76 + cassandraBaseTimeseriesDao = spy(new CassandraBaseTimeseriesDao());
  77 +
  78 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "partitioning", "MONTHS");
  79 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "partitionsCacheSize", 100000);
  80 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "systemTtl", 0);
  81 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "setNullValuesEnabled", false);
  82 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "environment", environment);
  83 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "rateLimiter", rateLimiter);
  84 + ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "cluster", cluster);
  85 +
  86 + doReturn(Futures.immediateFuture(null)).when(cassandraBaseTimeseriesDao).getFuture(any(TbResultSetFuture.class), any());
  87 + }
  88 +
  89 + @Test
  90 + public void testPartitionSave() throws Exception {
  91 + cassandraBaseTimeseriesDao.init();
  92 +
  93 + UUID id = UUID.randomUUID();
  94 + TenantId tenantId = new TenantId(id);
  95 + long tsKvEntryTs = System.currentTimeMillis();
  96 +
  97 + for (int i = 0; i < 50000; i++) {
  98 + cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i, 0);
  99 + }
  100 +
  101 + for (int i = 0; i < 60000; i++) {
  102 + cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i, 0);
  103 + }
  104 + verify(cassandraBaseTimeseriesDao, times(60000)).executeAsyncWrite(any(TenantId.class), any(Statement.class));
  105 + }
  106 +}
@@ -54,6 +54,8 @@ cassandra.query.default_fetch_size=2000 @@ -54,6 +54,8 @@ cassandra.query.default_fetch_size=2000
54 54
55 cassandra.query.ts_key_value_partitioning=HOURS 55 cassandra.query.ts_key_value_partitioning=HOURS
56 56
  57 +cassandra.query.ts_key_value_partitions_max_cache_size=100000
  58 +
57 cassandra.query.ts_key_value_ttl=0 59 cassandra.query.ts_key_value_ttl=0
58 60
59 cassandra.query.debug_events_ttl=604800 61 cassandra.query.debug_events_ttl=604800