Commit 1776aadbec41b1ad93dacf5eb08fbd58ea92f9af

Authored by vparomskiy
1 parent c5b03cb5

cassandra rate limit

clear latest telemetry
cleare rule-plugin cache
Showing 18 changed files with 737 additions and 72 deletions
@@ -19,6 +19,7 @@ import org.springframework.boot.SpringApplication; @@ -19,6 +19,7 @@ import org.springframework.boot.SpringApplication;
19 import org.springframework.boot.SpringBootConfiguration; 19 import org.springframework.boot.SpringBootConfiguration;
20 import org.springframework.context.annotation.ComponentScan; 20 import org.springframework.context.annotation.ComponentScan;
21 import org.springframework.scheduling.annotation.EnableAsync; 21 import org.springframework.scheduling.annotation.EnableAsync;
  22 +import org.springframework.scheduling.annotation.EnableScheduling;
22 import springfox.documentation.swagger2.annotations.EnableSwagger2; 23 import springfox.documentation.swagger2.annotations.EnableSwagger2;
23 24
24 import java.util.Arrays; 25 import java.util.Arrays;
@@ -26,6 +27,7 @@ import java.util.Arrays; @@ -26,6 +27,7 @@ import java.util.Arrays;
26 @SpringBootConfiguration 27 @SpringBootConfiguration
27 @EnableAsync 28 @EnableAsync
28 @EnableSwagger2 29 @EnableSwagger2
  30 +@EnableScheduling
29 @ComponentScan({"org.thingsboard.server"}) 31 @ComponentScan({"org.thingsboard.server"})
30 public class ThingsboardServerApplication { 32 public class ThingsboardServerApplication {
31 33
@@ -28,9 +28,14 @@ import org.thingsboard.server.common.data.plugin.ComponentType; @@ -28,9 +28,14 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
28 import org.thingsboard.server.common.data.plugin.PluginMetaData; 28 import org.thingsboard.server.common.data.plugin.PluginMetaData;
29 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; 29 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
30 import org.thingsboard.server.common.msg.cluster.ServerAddress; 30 import org.thingsboard.server.common.msg.cluster.ServerAddress;
  31 +import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
  32 +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
  33 +import org.thingsboard.server.common.msg.session.MsgType;
31 import org.thingsboard.server.extensions.api.plugins.Plugin; 34 import org.thingsboard.server.extensions.api.plugins.Plugin;
32 import org.thingsboard.server.extensions.api.plugins.PluginInitializationException; 35 import org.thingsboard.server.extensions.api.plugins.PluginInitializationException;
33 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse; 36 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
  37 +import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg;
  38 +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
34 import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg; 39 import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
35 import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg; 40 import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
36 import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg; 41 import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
@@ -98,7 +103,19 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId> @@ -98,7 +103,19 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
98 103
99 public void onRuleToPluginMsg(RuleToPluginMsgWrapper msg) throws RuleException { 104 public void onRuleToPluginMsg(RuleToPluginMsgWrapper msg) throws RuleException {
100 if (state == ComponentLifecycleState.ACTIVE) { 105 if (state == ComponentLifecycleState.ACTIVE) {
101 - pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg()); 106 + try {
  107 + pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg());
  108 + } catch (Exception ex) {
  109 + RuleToPluginMsg ruleMsg = msg.getMsg();
  110 + MsgType responceMsgType = MsgType.RULE_ENGINE_ERROR;
  111 + Integer requestId = 0;
  112 + if (ruleMsg.getPayload() instanceof FromDeviceRequestMsg) {
  113 + requestId = ((FromDeviceRequestMsg) ruleMsg.getPayload()).getRequestId();
  114 + }
  115 + trustedCtx.reply(
  116 + new ResponsePluginToRuleMsg(ruleMsg.getUid(), tenantId, msg.getRuleId(),
  117 + BasicStatusCodeResponse.onError(responceMsgType, requestId, ex)));
  118 + }
102 } else { 119 } else {
103 //TODO: reply with plugin suspended message 120 //TODO: reply with plugin suspended message
104 } 121 }
@@ -15,8 +15,9 @@ @@ -15,8 +15,9 @@
15 */ 15 */
16 package org.thingsboard.server.actors.rule; 16 package org.thingsboard.server.actors.rule;
17 17
18 -import java.util.*;  
19 - 18 +import akka.actor.ActorContext;
  19 +import akka.actor.ActorRef;
  20 +import akka.event.LoggingAdapter;
20 import com.fasterxml.jackson.core.JsonProcessingException; 21 import com.fasterxml.jackson.core.JsonProcessingException;
21 import org.springframework.util.StringUtils; 22 import org.springframework.util.StringUtils;
22 import org.thingsboard.server.actors.ActorSystemContext; 23 import org.thingsboard.server.actors.ActorSystemContext;
@@ -29,23 +30,17 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; @@ -29,23 +30,17 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
29 import org.thingsboard.server.common.data.plugin.PluginMetaData; 30 import org.thingsboard.server.common.data.plugin.PluginMetaData;
30 import org.thingsboard.server.common.data.rule.RuleMetaData; 31 import org.thingsboard.server.common.data.rule.RuleMetaData;
31 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; 32 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
32 -import org.thingsboard.server.common.msg.core.BasicRequest;  
33 import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse; 33 import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
34 import org.thingsboard.server.common.msg.core.RuleEngineError; 34 import org.thingsboard.server.common.msg.core.RuleEngineError;
35 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; 35 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
36 -import org.thingsboard.server.common.msg.session.MsgType;  
37 import org.thingsboard.server.common.msg.session.ToDeviceMsg; 36 import org.thingsboard.server.common.msg.session.ToDeviceMsg;
38 -import org.thingsboard.server.common.msg.session.ex.ProcessingTimeoutException;  
39 -import org.thingsboard.server.extensions.api.rules.*;  
40 import org.thingsboard.server.extensions.api.plugins.PluginAction; 37 import org.thingsboard.server.extensions.api.plugins.PluginAction;
41 import org.thingsboard.server.extensions.api.plugins.msg.PluginToRuleMsg; 38 import org.thingsboard.server.extensions.api.plugins.msg.PluginToRuleMsg;
  39 +import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg;
42 import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; 40 import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
  41 +import org.thingsboard.server.extensions.api.rules.*;
43 42
44 -import com.fasterxml.jackson.databind.JsonNode;  
45 -  
46 -import akka.actor.ActorContext;  
47 -import akka.actor.ActorRef;  
48 -import akka.event.LoggingAdapter; 43 +import java.util.*;
49 44
50 class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { 45 class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
51 46
@@ -190,18 +185,32 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { @@ -190,18 +185,32 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
190 RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getUid()); 185 RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getUid());
191 if (pendingMsg != null) { 186 if (pendingMsg != null) {
192 ChainProcessingContext ctx = pendingMsg.getCtx(); 187 ChainProcessingContext ctx = pendingMsg.getCtx();
193 - Optional<ToDeviceMsg> ruleResponseOptional = action.convert(msg);  
194 - if (ruleResponseOptional.isPresent()) {  
195 - ctx.mergeResponse(ruleResponseOptional.get());  
196 - pushToNextRule(context, ctx, null);  
197 - } else { 188 + if (isErrorResponce(msg)) {
198 pushToNextRule(context, ctx, RuleEngineError.NO_RESPONSE_FROM_ACTIONS); 189 pushToNextRule(context, ctx, RuleEngineError.NO_RESPONSE_FROM_ACTIONS);
  190 + } else {
  191 + Optional<ToDeviceMsg> ruleResponseOptional = action.convert(msg);
  192 + if (ruleResponseOptional.isPresent()) {
  193 + ctx.mergeResponse(ruleResponseOptional.get());
  194 + pushToNextRule(context, ctx, null);
  195 + } else {
  196 + pushToNextRule(context, ctx, RuleEngineError.NO_RESPONSE_FROM_ACTIONS);
  197 + }
199 } 198 }
200 } else { 199 } else {
201 logger.warning("[{}] Processing timeout detected: [{}]", entityId, msg.getUid()); 200 logger.warning("[{}] Processing timeout detected: [{}]", entityId, msg.getUid());
202 } 201 }
203 } 202 }
204 203
  204 + private boolean isErrorResponce(PluginToRuleMsg<?> msg) {
  205 + if (msg instanceof ResponsePluginToRuleMsg) {
  206 + if (((ResponsePluginToRuleMsg) msg).getPayload() instanceof BasicStatusCodeResponse) {
  207 + BasicStatusCodeResponse responce = (BasicStatusCodeResponse) ((ResponsePluginToRuleMsg) msg).getPayload();
  208 + return !responce.isSuccess();
  209 + }
  210 + }
  211 + return false;
  212 + }
  213 +
205 void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) { 214 void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) {
206 RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getMsgId()); 215 RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getMsgId());
207 if (pendingMsg != null) { 216 if (pendingMsg != null) {
@@ -133,7 +133,7 @@ quota: @@ -133,7 +133,7 @@ quota:
133 intervalMin: 2 133 intervalMin: 2
134 134
135 database: 135 database:
136 - type: "${DATABASE_TYPE:sql}" # cassandra OR sql 136 + type: "${DATABASE_TYPE:cassandra}" # cassandra OR sql
137 137
138 # Cassandra driver configuration parameters 138 # Cassandra driver configuration parameters
139 cassandra: 139 cassandra:
@@ -181,6 +181,10 @@ cassandra: @@ -181,6 +181,10 @@ cassandra:
181 default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" 181 default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
182 # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS 182 # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
183 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" 183 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
  184 + buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
  185 + concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
  186 + permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
  187 + rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}"
184 188
185 # SQL configuration parameters 189 # SQL configuration parameters
186 sql: 190 sql:
@@ -222,7 +226,7 @@ caffeine: @@ -222,7 +226,7 @@ caffeine:
222 specs: 226 specs:
223 relations: 227 relations:
224 timeToLiveInMinutes: 1440 228 timeToLiveInMinutes: 1440
225 - maxSize: 100000 229 + maxSize: 0
226 deviceCredentials: 230 deviceCredentials:
227 timeToLiveInMinutes: 1440 231 timeToLiveInMinutes: 1440
228 maxSize: 100000 232 maxSize: 100000
@@ -148,7 +148,7 @@ public class CassandraAssetDao extends CassandraAbstractSearchTextDao<AssetEntit @@ -148,7 +148,7 @@ public class CassandraAssetDao extends CassandraAbstractSearchTextDao<AssetEntit
148 query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId)); 148 query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId));
149 query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.ASSET)); 149 query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.ASSET));
150 query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); 150 query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
151 - ResultSetFuture resultSetFuture = getSession().executeAsync(query); 151 + ResultSetFuture resultSetFuture = executeAsyncRead(query);
152 return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() { 152 return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() {
153 @Nullable 153 @Nullable
154 @Override 154 @Override
@@ -147,12 +147,12 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem @@ -147,12 +147,12 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem
147 .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType)) 147 .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
148 .and(eq(ATTRIBUTE_KEY_COLUMN, key)); 148 .and(eq(ATTRIBUTE_KEY_COLUMN, key));
149 log.debug("Remove request: {}", delete.toString()); 149 log.debug("Remove request: {}", delete.toString());
150 - return getFuture(getSession().executeAsync(delete), rs -> null); 150 + return getFuture(executeAsyncWrite(delete), rs -> null);
151 } 151 }
152 152
153 private PreparedStatement getSaveStmt() { 153 private PreparedStatement getSaveStmt() {
154 if (saveStmt == null) { 154 if (saveStmt == null) {
155 - saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF + 155 + saveStmt = prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF +
156 "(" + ENTITY_TYPE_COLUMN + 156 "(" + ENTITY_TYPE_COLUMN +
157 "," + ENTITY_ID_COLUMN + 157 "," + ENTITY_ID_COLUMN +
158 "," + ATTRIBUTE_TYPE_COLUMN + 158 "," + ATTRIBUTE_TYPE_COLUMN +
@@ -244,12 +244,12 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo @@ -244,12 +244,12 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo
244 values.add("?"); 244 values.add("?");
245 } 245 }
246 String statementString = INSERT_INTO + cfName + " (" + String.join(",", columnsList) + ") VALUES (" + values.toString() + ")"; 246 String statementString = INSERT_INTO + cfName + " (" + String.join(",", columnsList) + ") VALUES (" + values.toString() + ")";
247 - return getSession().prepare(statementString); 247 + return prepare(statementString);
248 } 248 }
249 249
250 private PreparedStatement getPartitionInsertStmt() { 250 private PreparedStatement getPartitionInsertStmt() {
251 if (partitionInsertStmt == null) { 251 if (partitionInsertStmt == null) {
252 - partitionInsertStmt = getSession().prepare(INSERT_INTO + ModelConstants.AUDIT_LOG_BY_TENANT_ID_PARTITIONS_CF + 252 + partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.AUDIT_LOG_BY_TENANT_ID_PARTITIONS_CF +
253 "(" + ModelConstants.AUDIT_LOG_TENANT_ID_PROPERTY + 253 "(" + ModelConstants.AUDIT_LOG_TENANT_ID_PROPERTY +
254 "," + ModelConstants.AUDIT_LOG_PARTITION_PROPERTY + ")" + 254 "," + ModelConstants.AUDIT_LOG_PARTITION_PROPERTY + ")" +
255 " VALUES(?, ?)"); 255 " VALUES(?, ?)");
@@ -343,7 +343,7 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo @@ -343,7 +343,7 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo
343 .where(eq(ModelConstants.AUDIT_LOG_TENANT_ID_PROPERTY, tenantId)); 343 .where(eq(ModelConstants.AUDIT_LOG_TENANT_ID_PROPERTY, tenantId));
344 select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition)); 344 select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition));
345 select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition)); 345 select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition));
346 - return getSession().execute(select); 346 + return executeRead(select);
347 } 347 }
348 348
349 } 349 }
@@ -130,7 +130,7 @@ public class CassandraBaseComponentDescriptorDao extends CassandraAbstractSearch @@ -130,7 +130,7 @@ public class CassandraBaseComponentDescriptorDao extends CassandraAbstractSearch
130 public boolean removeById(UUID key) { 130 public boolean removeById(UUID key) {
131 Statement delete = QueryBuilder.delete().all().from(ModelConstants.COMPONENT_DESCRIPTOR_BY_ID).where(eq(ModelConstants.ID_PROPERTY, key)); 131 Statement delete = QueryBuilder.delete().all().from(ModelConstants.COMPONENT_DESCRIPTOR_BY_ID).where(eq(ModelConstants.ID_PROPERTY, key));
132 log.debug("Remove request: {}", delete.toString()); 132 log.debug("Remove request: {}", delete.toString());
133 - return getSession().execute(delete).wasApplied(); 133 + return executeWrite(delete).wasApplied();
134 } 134 }
135 135
136 @Override 136 @Override
@@ -145,7 +145,7 @@ public class CassandraBaseComponentDescriptorDao extends CassandraAbstractSearch @@ -145,7 +145,7 @@ public class CassandraBaseComponentDescriptorDao extends CassandraAbstractSearch
145 log.debug("Delete plugin meta-data entity by id [{}]", clazz); 145 log.debug("Delete plugin meta-data entity by id [{}]", clazz);
146 Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.COMPONENT_DESCRIPTOR_CLASS_PROPERTY, clazz)); 146 Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.COMPONENT_DESCRIPTOR_CLASS_PROPERTY, clazz));
147 log.debug("Remove request: {}", delete.toString()); 147 log.debug("Remove request: {}", delete.toString());
148 - ResultSet resultSet = getSession().execute(delete); 148 + ResultSet resultSet = executeWrite(delete);
149 log.debug("Delete result: [{}]", resultSet.wasApplied()); 149 log.debug("Delete result: [{}]", resultSet.wasApplied());
150 } 150 }
151 151
@@ -148,7 +148,7 @@ public class CassandraDeviceDao extends CassandraAbstractSearchTextDao<DeviceEnt @@ -148,7 +148,7 @@ public class CassandraDeviceDao extends CassandraAbstractSearchTextDao<DeviceEnt
148 query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId)); 148 query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId));
149 query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.DEVICE)); 149 query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.DEVICE));
150 query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); 150 query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
151 - ResultSetFuture resultSetFuture = getSession().executeAsync(query); 151 + ResultSetFuture resultSetFuture = executeAsyncRead(query);
152 return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() { 152 return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() {
153 @Nullable 153 @Nullable
154 @Override 154 @Override
@@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
21 import org.springframework.beans.factory.annotation.Autowired; 21 import org.springframework.beans.factory.annotation.Autowired;
22 import org.thingsboard.server.dao.cassandra.CassandraCluster; 22 import org.thingsboard.server.dao.cassandra.CassandraCluster;
23 import org.thingsboard.server.dao.model.type.*; 23 import org.thingsboard.server.dao.model.type.*;
  24 +import org.thingsboard.server.dao.util.BufferedRateLimiter;
24 25
25 @Slf4j 26 @Slf4j
26 public abstract class CassandraAbstractDao { 27 public abstract class CassandraAbstractDao {
@@ -28,12 +29,15 @@ public abstract class CassandraAbstractDao { @@ -28,12 +29,15 @@ public abstract class CassandraAbstractDao {
28 @Autowired 29 @Autowired
29 protected CassandraCluster cluster; 30 protected CassandraCluster cluster;
30 31
  32 + @Autowired
  33 + private BufferedRateLimiter rateLimiter;
  34 +
31 private Session session; 35 private Session session;
32 36
33 private ConsistencyLevel defaultReadLevel; 37 private ConsistencyLevel defaultReadLevel;
34 private ConsistencyLevel defaultWriteLevel; 38 private ConsistencyLevel defaultWriteLevel;
35 39
36 - protected Session getSession() { 40 + private Session getSession() {
37 if (session == null) { 41 if (session == null) {
38 session = cluster.getSession(); 42 session = cluster.getSession();
39 defaultReadLevel = cluster.getDefaultReadConsistencyLevel(); 43 defaultReadLevel = cluster.getDefaultReadConsistencyLevel();
@@ -50,6 +54,10 @@ public abstract class CassandraAbstractDao { @@ -50,6 +54,10 @@ public abstract class CassandraAbstractDao {
50 return session; 54 return session;
51 } 55 }
52 56
  57 + protected PreparedStatement prepare(String query) {
  58 + return getSession().prepare(query);
  59 + }
  60 +
53 private void registerCodecIfNotFound(CodecRegistry registry, TypeCodec<?> codec) { 61 private void registerCodecIfNotFound(CodecRegistry registry, TypeCodec<?> codec) {
54 try { 62 try {
55 registry.codecFor(codec.getCqlType(), codec.getJavaType()); 63 registry.codecFor(codec.getCqlType(), codec.getJavaType());
@@ -76,10 +84,7 @@ public abstract class CassandraAbstractDao { @@ -76,10 +84,7 @@ public abstract class CassandraAbstractDao {
76 84
77 private ResultSet execute(Statement statement, ConsistencyLevel level) { 85 private ResultSet execute(Statement statement, ConsistencyLevel level) {
78 log.debug("Execute cassandra statement {}", statement); 86 log.debug("Execute cassandra statement {}", statement);
79 - if (statement.getConsistencyLevel() == null) {  
80 - statement.setConsistencyLevel(level);  
81 - }  
82 - return getSession().execute(statement); 87 + return executeAsync(statement, level).getUninterruptibly();
83 } 88 }
84 89
85 private ResultSetFuture executeAsync(Statement statement, ConsistencyLevel level) { 90 private ResultSetFuture executeAsync(Statement statement, ConsistencyLevel level) {
@@ -87,6 +92,6 @@ public abstract class CassandraAbstractDao { @@ -87,6 +92,6 @@ public abstract class CassandraAbstractDao {
87 if (statement.getConsistencyLevel() == null) { 92 if (statement.getConsistencyLevel() == null) {
88 statement.setConsistencyLevel(level); 93 statement.setConsistencyLevel(level);
89 } 94 }
90 - return getSession().executeAsync(statement); 95 + return new RateLimitedResultSetFuture(getSession(), rateLimiter, statement);
91 } 96 }
92 } 97 }
@@ -60,7 +60,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte @@ -60,7 +60,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
60 List<E> list = Collections.emptyList(); 60 List<E> list = Collections.emptyList();
61 if (statement != null) { 61 if (statement != null) {
62 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); 62 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
63 - ResultSet resultSet = getSession().execute(statement); 63 + ResultSet resultSet = executeRead(statement);
64 Result<E> result = getMapper().map(resultSet); 64 Result<E> result = getMapper().map(resultSet);
65 if (result != null) { 65 if (result != null) {
66 list = result.all(); 66 list = result.all();
@@ -72,7 +72,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte @@ -72,7 +72,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
72 protected ListenableFuture<List<D>> findListByStatementAsync(Statement statement) { 72 protected ListenableFuture<List<D>> findListByStatementAsync(Statement statement) {
73 if (statement != null) { 73 if (statement != null) {
74 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); 74 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
75 - ResultSetFuture resultSetFuture = getSession().executeAsync(statement); 75 + ResultSetFuture resultSetFuture = executeAsyncRead(statement);
76 return Futures.transform(resultSetFuture, new Function<ResultSet, List<D>>() { 76 return Futures.transform(resultSetFuture, new Function<ResultSet, List<D>>() {
77 @Nullable 77 @Nullable
78 @Override 78 @Override
@@ -94,7 +94,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte @@ -94,7 +94,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
94 E object = null; 94 E object = null;
95 if (statement != null) { 95 if (statement != null) {
96 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); 96 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
97 - ResultSet resultSet = getSession().execute(statement); 97 + ResultSet resultSet = executeRead(statement);
98 Result<E> result = getMapper().map(resultSet); 98 Result<E> result = getMapper().map(resultSet);
99 if (result != null) { 99 if (result != null) {
100 object = result.one(); 100 object = result.one();
@@ -106,7 +106,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte @@ -106,7 +106,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
106 protected ListenableFuture<D> findOneByStatementAsync(Statement statement) { 106 protected ListenableFuture<D> findOneByStatementAsync(Statement statement) {
107 if (statement != null) { 107 if (statement != null) {
108 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); 108 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
109 - ResultSetFuture resultSetFuture = getSession().executeAsync(statement); 109 + ResultSetFuture resultSetFuture = executeAsyncRead(statement);
110 return Futures.transform(resultSetFuture, new Function<ResultSet, D>() { 110 return Futures.transform(resultSetFuture, new Function<ResultSet, D>() {
111 @Nullable 111 @Nullable
112 @Override 112 @Override
@@ -181,7 +181,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte @@ -181,7 +181,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
181 public boolean removeById(UUID key) { 181 public boolean removeById(UUID key) {
182 Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key)); 182 Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key));
183 log.debug("Remove request: {}", delete.toString()); 183 log.debug("Remove request: {}", delete.toString());
184 - return getSession().execute(delete).wasApplied(); 184 + return executeWrite(delete).wasApplied();
185 } 185 }
186 186
187 @Override 187 @Override
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql;
  17 +
  18 +import com.datastax.driver.core.ResultSet;
  19 +import com.datastax.driver.core.ResultSetFuture;
  20 +import com.datastax.driver.core.Session;
  21 +import com.datastax.driver.core.Statement;
  22 +import com.google.common.base.Function;
  23 +import com.google.common.util.concurrent.FutureCallback;
  24 +import com.google.common.util.concurrent.Futures;
  25 +import com.google.common.util.concurrent.ListenableFuture;
  26 +import com.google.common.util.concurrent.Uninterruptibles;
  27 +import org.thingsboard.server.dao.util.AsyncRateLimiter;
  28 +
  29 +import javax.annotation.Nullable;
  30 +import java.util.concurrent.*;
  31 +
  32 +public class RateLimitedResultSetFuture implements ResultSetFuture {
  33 +
  34 + private final ListenableFuture<ResultSetFuture> originalFuture;
  35 + private final ListenableFuture<Void> rateLimitFuture;
  36 +
  37 + public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) {
  38 + this.rateLimitFuture = rateLimiter.acquireAsync();
  39 + this.originalFuture = Futures.transform(rateLimitFuture,
  40 + (Function<Void, ResultSetFuture>) i -> executeAsyncWithRelease(rateLimiter, session, statement));
  41 + }
  42 +
  43 + @Override
  44 + public ResultSet getUninterruptibly() {
  45 + return safeGet().getUninterruptibly();
  46 + }
  47 +
  48 + @Override
  49 + public ResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException {
  50 + long rateLimitStart = System.nanoTime();
  51 + ResultSetFuture resultSetFuture = null;
  52 + try {
  53 + resultSetFuture = originalFuture.get(timeout, unit);
  54 + } catch (InterruptedException | ExecutionException e) {
  55 + throw new IllegalStateException(e);
  56 + }
  57 + long rateLimitDurationNano = System.nanoTime() - rateLimitStart;
  58 + long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano;
  59 + if (innerTimeoutNano > 0) {
  60 + return resultSetFuture.getUninterruptibly(innerTimeoutNano, TimeUnit.NANOSECONDS);
  61 + }
  62 + throw new TimeoutException("Timeout waiting for task.");
  63 + }
  64 +
  65 + @Override
  66 + public boolean cancel(boolean mayInterruptIfRunning) {
  67 + if (originalFuture.isDone()) {
  68 + return safeGet().cancel(mayInterruptIfRunning);
  69 + } else {
  70 + return originalFuture.cancel(mayInterruptIfRunning);
  71 + }
  72 + }
  73 +
  74 + @Override
  75 + public boolean isCancelled() {
  76 + if (originalFuture.isDone()) {
  77 + return safeGet().isCancelled();
  78 + }
  79 +
  80 + return originalFuture.isCancelled();
  81 + }
  82 +
  83 + @Override
  84 + public boolean isDone() {
  85 + return originalFuture.isDone() && safeGet().isDone();
  86 + }
  87 +
  88 + @Override
  89 + public ResultSet get() throws InterruptedException, ExecutionException {
  90 + return safeGet().get();
  91 + }
  92 +
  93 + @Override
  94 + public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
  95 + long rateLimitStart = System.nanoTime();
  96 + ResultSetFuture resultSetFuture = originalFuture.get(timeout, unit);
  97 + long rateLimitDurationNano = System.nanoTime() - rateLimitStart;
  98 + long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano;
  99 + if (innerTimeoutNano > 0) {
  100 + return resultSetFuture.get(innerTimeoutNano, TimeUnit.NANOSECONDS);
  101 + }
  102 + throw new TimeoutException("Timeout waiting for task.");
  103 + }
  104 +
  105 + @Override
  106 + public void addListener(Runnable listener, Executor executor) {
  107 + originalFuture.addListener(() -> {
  108 + try {
  109 + ResultSetFuture resultSetFuture = Uninterruptibles.getUninterruptibly(originalFuture);
  110 + resultSetFuture.addListener(listener, executor);
  111 + } catch (CancellationException e) {
  112 + cancel(false);
  113 + return;
  114 + } catch (ExecutionException e) {
  115 + Futures.immediateFailedFuture(e).addListener(listener, executor);
  116 + }
  117 + }, executor);
  118 + }
  119 +
  120 + private ResultSetFuture safeGet() {
  121 + try {
  122 + return originalFuture.get();
  123 + } catch (InterruptedException | ExecutionException e) {
  124 + throw new IllegalStateException(e);
  125 + }
  126 + }
  127 +
  128 + private ResultSetFuture executeAsyncWithRelease(AsyncRateLimiter rateLimiter, Session session, Statement statement) {
  129 + try {
  130 + ResultSetFuture resultSetFuture = session.executeAsync(statement);
  131 + Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
  132 + @Override
  133 + public void onSuccess(@Nullable ResultSet result) {
  134 + rateLimiter.release();
  135 + }
  136 +
  137 + @Override
  138 + public void onFailure(Throwable t) {
  139 + rateLimiter.release();
  140 + }
  141 + });
  142 + return resultSetFuture;
  143 + } catch (RuntimeException re) {
  144 + rateLimiter.release();
  145 + throw re;
  146 + }
  147 + }
  148 +}
@@ -242,7 +242,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -242,7 +242,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
242 242
243 private PreparedStatement getSaveStmt() { 243 private PreparedStatement getSaveStmt() {
244 if (saveStmt == null) { 244 if (saveStmt == null) {
245 - saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + 245 + saveStmt = prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
246 "(" + ModelConstants.RELATION_FROM_ID_PROPERTY + 246 "(" + ModelConstants.RELATION_FROM_ID_PROPERTY +
247 "," + ModelConstants.RELATION_FROM_TYPE_PROPERTY + 247 "," + ModelConstants.RELATION_FROM_TYPE_PROPERTY +
248 "," + ModelConstants.RELATION_TO_ID_PROPERTY + 248 "," + ModelConstants.RELATION_TO_ID_PROPERTY +
@@ -257,7 +257,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -257,7 +257,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
257 257
258 private PreparedStatement getDeleteStmt() { 258 private PreparedStatement getDeleteStmt() {
259 if (deleteStmt == null) { 259 if (deleteStmt == null) {
260 - deleteStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + 260 + deleteStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
261 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" + 261 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" +
262 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?" + 262 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?" +
263 AND + ModelConstants.RELATION_TO_ID_PROPERTY + " = ?" + 263 AND + ModelConstants.RELATION_TO_ID_PROPERTY + " = ?" +
@@ -270,7 +270,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -270,7 +270,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
270 270
271 private PreparedStatement getDeleteAllByEntityStmt() { 271 private PreparedStatement getDeleteAllByEntityStmt() {
272 if (deleteAllByEntityStmt == null) { 272 if (deleteAllByEntityStmt == null) {
273 - deleteAllByEntityStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + 273 + deleteAllByEntityStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
274 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" + 274 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" +
275 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?"); 275 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?");
276 } 276 }
@@ -279,7 +279,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -279,7 +279,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
279 279
280 private PreparedStatement getFindAllByFromStmt() { 280 private PreparedStatement getFindAllByFromStmt() {
281 if (findAllByFromStmt == null) { 281 if (findAllByFromStmt == null) {
282 - findAllByFromStmt = getSession().prepare(SELECT_COLUMNS + " " + 282 + findAllByFromStmt = prepare(SELECT_COLUMNS + " " +
283 FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + 283 FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
284 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM + 284 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
285 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM + 285 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -290,7 +290,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -290,7 +290,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
290 290
291 private PreparedStatement getFindAllByFromAndTypeStmt() { 291 private PreparedStatement getFindAllByFromAndTypeStmt() {
292 if (findAllByFromAndTypeStmt == null) { 292 if (findAllByFromAndTypeStmt == null) {
293 - findAllByFromAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " + 293 + findAllByFromAndTypeStmt = prepare(SELECT_COLUMNS + " " +
294 FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + 294 FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
295 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM + 295 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
296 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM + 296 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -303,7 +303,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -303,7 +303,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
303 303
304 private PreparedStatement getFindAllByToStmt() { 304 private PreparedStatement getFindAllByToStmt() {
305 if (findAllByToStmt == null) { 305 if (findAllByToStmt == null) {
306 - findAllByToStmt = getSession().prepare(SELECT_COLUMNS + " " + 306 + findAllByToStmt = prepare(SELECT_COLUMNS + " " +
307 FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " + 307 FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " +
308 WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM + 308 WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM +
309 AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM + 309 AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -314,7 +314,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -314,7 +314,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
314 314
315 private PreparedStatement getFindAllByToAndTypeStmt() { 315 private PreparedStatement getFindAllByToAndTypeStmt() {
316 if (findAllByToAndTypeStmt == null) { 316 if (findAllByToAndTypeStmt == null) {
317 - findAllByToAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " + 317 + findAllByToAndTypeStmt = prepare(SELECT_COLUMNS + " " +
318 FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " + 318 FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " +
319 WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM + 319 WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM +
320 AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM + 320 AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -327,7 +327,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -327,7 +327,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
327 327
328 private PreparedStatement getCheckRelationStmt() { 328 private PreparedStatement getCheckRelationStmt() {
329 if (checkRelationStmt == null) { 329 if (checkRelationStmt == null) {
330 - checkRelationStmt = getSession().prepare(SELECT_COLUMNS + " " + 330 + checkRelationStmt = prepare(SELECT_COLUMNS + " " +
331 FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + 331 FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
332 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM + 332 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
333 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM + 333 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -73,7 +73,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -73,7 +73,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
73 73
74 private PreparedStatement partitionInsertStmt; 74 private PreparedStatement partitionInsertStmt;
75 private PreparedStatement partitionInsertTtlStmt; 75 private PreparedStatement partitionInsertTtlStmt;
76 - private PreparedStatement[] latestInsertStmts; 76 + private PreparedStatement latestInsertStmt;
77 private PreparedStatement[] saveStmts; 77 private PreparedStatement[] saveStmts;
78 private PreparedStatement[] saveTtlStmts; 78 private PreparedStatement[] saveTtlStmts;
79 private PreparedStatement[] fetchStmts; 79 private PreparedStatement[] fetchStmts;
@@ -306,13 +306,15 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -306,13 +306,15 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
306 306
307 @Override 307 @Override
308 public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) { 308 public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
309 - DataType type = tsKvEntry.getDataType();  
310 - BoundStatement stmt = getLatestStmt(type).bind() 309 + BoundStatement stmt = getLatestStmt().bind()
311 .setString(0, entityId.getEntityType().name()) 310 .setString(0, entityId.getEntityType().name())
312 .setUUID(1, entityId.getId()) 311 .setUUID(1, entityId.getId())
313 .setString(2, tsKvEntry.getKey()) 312 .setString(2, tsKvEntry.getKey())
314 - .setLong(3, tsKvEntry.getTs());  
315 - addValue(tsKvEntry, stmt, 4); 313 + .setLong(3, tsKvEntry.getTs())
  314 + .set(4, tsKvEntry.getBooleanValue().orElse(null), Boolean.class)
  315 + .set(5, tsKvEntry.getStrValue().orElse(null), String.class)
  316 + .set(6, tsKvEntry.getLongValue().orElse(null), Long.class)
  317 + .set(7, tsKvEntry.getDoubleValue().orElse(null), Double.class);
316 return getFuture(executeAsyncWrite(stmt), rs -> null); 318 return getFuture(executeAsyncWrite(stmt), rs -> null);
317 } 319 }
318 320
@@ -381,7 +383,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -381,7 +383,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
381 if (saveStmts == null) { 383 if (saveStmts == null) {
382 saveStmts = new PreparedStatement[DataType.values().length]; 384 saveStmts = new PreparedStatement[DataType.values().length];
383 for (DataType type : DataType.values()) { 385 for (DataType type : DataType.values()) {
384 - saveStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF + 386 + saveStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
385 "(" + ModelConstants.ENTITY_TYPE_COLUMN + 387 "(" + ModelConstants.ENTITY_TYPE_COLUMN +
386 "," + ModelConstants.ENTITY_ID_COLUMN + 388 "," + ModelConstants.ENTITY_ID_COLUMN +
387 "," + ModelConstants.KEY_COLUMN + 389 "," + ModelConstants.KEY_COLUMN +
@@ -398,7 +400,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -398,7 +400,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
398 if (saveTtlStmts == null) { 400 if (saveTtlStmts == null) {
399 saveTtlStmts = new PreparedStatement[DataType.values().length]; 401 saveTtlStmts = new PreparedStatement[DataType.values().length];
400 for (DataType type : DataType.values()) { 402 for (DataType type : DataType.values()) {
401 - saveTtlStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF + 403 + saveTtlStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
402 "(" + ModelConstants.ENTITY_TYPE_COLUMN + 404 "(" + ModelConstants.ENTITY_TYPE_COLUMN +
403 "," + ModelConstants.ENTITY_ID_COLUMN + 405 "," + ModelConstants.ENTITY_ID_COLUMN +
404 "," + ModelConstants.KEY_COLUMN + 406 "," + ModelConstants.KEY_COLUMN +
@@ -420,7 +422,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -420,7 +422,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
420 } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) { 422 } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
421 fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()]; 423 fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
422 } else { 424 } else {
423 - fetchStmts[type.ordinal()] = getSession().prepare(SELECT_PREFIX + 425 + fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
424 String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF 426 String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
425 + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM 427 + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
426 + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM 428 + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
@@ -435,26 +437,29 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -435,26 +437,29 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
435 return fetchStmts[aggType.ordinal()]; 437 return fetchStmts[aggType.ordinal()];
436 } 438 }
437 439
438 - private PreparedStatement getLatestStmt(DataType dataType) {  
439 - if (latestInsertStmts == null) {  
440 - latestInsertStmts = new PreparedStatement[DataType.values().length];  
441 - for (DataType type : DataType.values()) {  
442 - latestInsertStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF +  
443 - "(" + ModelConstants.ENTITY_TYPE_COLUMN +  
444 - "," + ModelConstants.ENTITY_ID_COLUMN +  
445 - "," + ModelConstants.KEY_COLUMN +  
446 - "," + ModelConstants.TS_COLUMN +  
447 - "," + getColumnName(type) + ")" +  
448 - " VALUES(?, ?, ?, ?, ?)");  
449 - } 440 + private PreparedStatement getLatestStmt() {
  441 + if (latestInsertStmt == null) {
  442 +// latestInsertStmt = new PreparedStatement[DataType.values().length];
  443 +// for (DataType type : DataType.values()) {
  444 + latestInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF +
  445 + "(" + ModelConstants.ENTITY_TYPE_COLUMN +
  446 + "," + ModelConstants.ENTITY_ID_COLUMN +
  447 + "," + ModelConstants.KEY_COLUMN +
  448 + "," + ModelConstants.TS_COLUMN +
  449 + "," + ModelConstants.BOOLEAN_VALUE_COLUMN +
  450 + "," + ModelConstants.STRING_VALUE_COLUMN +
  451 + "," + ModelConstants.LONG_VALUE_COLUMN +
  452 + "," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" +
  453 + " VALUES(?, ?, ?, ?, ?)");
  454 +// }
450 } 455 }
451 - return latestInsertStmts[dataType.ordinal()]; 456 + return latestInsertStmt;
452 } 457 }
453 458
454 459
455 private PreparedStatement getPartitionInsertStmt() { 460 private PreparedStatement getPartitionInsertStmt() {
456 if (partitionInsertStmt == null) { 461 if (partitionInsertStmt == null) {
457 - partitionInsertStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + 462 + partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
458 "(" + ModelConstants.ENTITY_TYPE_COLUMN + 463 "(" + ModelConstants.ENTITY_TYPE_COLUMN +
459 "," + ModelConstants.ENTITY_ID_COLUMN + 464 "," + ModelConstants.ENTITY_ID_COLUMN +
460 "," + ModelConstants.PARTITION_COLUMN + 465 "," + ModelConstants.PARTITION_COLUMN +
@@ -466,7 +471,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -466,7 +471,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
466 471
467 private PreparedStatement getPartitionInsertTtlStmt() { 472 private PreparedStatement getPartitionInsertTtlStmt() {
468 if (partitionInsertTtlStmt == null) { 473 if (partitionInsertTtlStmt == null) {
469 - partitionInsertTtlStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + 474 + partitionInsertTtlStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
470 "(" + ModelConstants.ENTITY_TYPE_COLUMN + 475 "(" + ModelConstants.ENTITY_TYPE_COLUMN +
471 "," + ModelConstants.ENTITY_ID_COLUMN + 476 "," + ModelConstants.ENTITY_ID_COLUMN +
472 "," + ModelConstants.PARTITION_COLUMN + 477 "," + ModelConstants.PARTITION_COLUMN +
@@ -479,7 +484,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -479,7 +484,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
479 484
480 private PreparedStatement getFindLatestStmt() { 485 private PreparedStatement getFindLatestStmt() {
481 if (findLatestStmt == null) { 486 if (findLatestStmt == null) {
482 - findLatestStmt = getSession().prepare(SELECT_PREFIX + 487 + findLatestStmt = prepare(SELECT_PREFIX +
483 ModelConstants.KEY_COLUMN + "," + 488 ModelConstants.KEY_COLUMN + "," +
484 ModelConstants.TS_COLUMN + "," + 489 ModelConstants.TS_COLUMN + "," +
485 ModelConstants.STRING_VALUE_COLUMN + "," + 490 ModelConstants.STRING_VALUE_COLUMN + "," +
@@ -496,7 +501,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -496,7 +501,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
496 501
497 private PreparedStatement getFindAllLatestStmt() { 502 private PreparedStatement getFindAllLatestStmt() {
498 if (findAllLatestStmt == null) { 503 if (findAllLatestStmt == null) {
499 - findAllLatestStmt = getSession().prepare(SELECT_PREFIX + 504 + findAllLatestStmt = prepare(SELECT_PREFIX +
500 ModelConstants.KEY_COLUMN + "," + 505 ModelConstants.KEY_COLUMN + "," +
501 ModelConstants.TS_COLUMN + "," + 506 ModelConstants.TS_COLUMN + "," +
502 ModelConstants.STRING_VALUE_COLUMN + "," + 507 ModelConstants.STRING_VALUE_COLUMN + "," +
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.util;
  17 +
  18 +import com.google.common.util.concurrent.ListenableFuture;
  19 +
  20 +public interface AsyncRateLimiter {
  21 +
  22 + ListenableFuture<Void> acquireAsync();
  23 +
  24 + void release();
  25 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.util;
  17 +
  18 +import com.google.common.util.concurrent.Futures;
  19 +import com.google.common.util.concurrent.ListenableFuture;
  20 +import com.google.common.util.concurrent.ListeningExecutorService;
  21 +import com.google.common.util.concurrent.MoreExecutors;
  22 +import lombok.extern.slf4j.Slf4j;
  23 +import org.springframework.beans.factory.annotation.Value;
  24 +import org.springframework.scheduling.annotation.Scheduled;
  25 +import org.springframework.stereotype.Component;
  26 +
  27 +import java.util.concurrent.*;
  28 +import java.util.concurrent.atomic.AtomicInteger;
  29 +
  30 +@Component
  31 +@Slf4j
  32 +public class BufferedRateLimiter implements AsyncRateLimiter {
  33 +
  34 + private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
  35 +
  36 + private final int permitsLimit;
  37 + private final int maxPermitWaitTime;
  38 + private final AtomicInteger permits;
  39 + private final BlockingQueue<LockedFuture> queue;
  40 +
  41 + private final AtomicInteger maxQueueSize = new AtomicInteger();
  42 + private final AtomicInteger maxGrantedPermissions = new AtomicInteger();
  43 +
  44 + public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit,
  45 + @Value("${cassandra.query.concurrent_limit}") int permitsLimit,
  46 + @Value("${cassandra.query.permit_max_wait_time}") int maxPermitWaitTime) {
  47 + this.permitsLimit = permitsLimit;
  48 + this.maxPermitWaitTime = maxPermitWaitTime;
  49 + this.permits = new AtomicInteger();
  50 + this.queue = new LinkedBlockingQueue<>(queueLimit);
  51 + }
  52 +
  53 + @Override
  54 + public ListenableFuture<Void> acquireAsync() {
  55 + if (queue.isEmpty()) {
  56 + if (permits.incrementAndGet() <= permitsLimit) {
  57 + if (permits.get() > maxGrantedPermissions.get()) {
  58 + maxGrantedPermissions.set(permits.get());
  59 + }
  60 + return Futures.immediateFuture(null);
  61 + }
  62 + permits.decrementAndGet();
  63 + }
  64 +
  65 + return putInQueue();
  66 + }
  67 +
  68 + @Override
  69 + public void release() {
  70 + permits.decrementAndGet();
  71 + reprocessQueue();
  72 + }
  73 +
  74 + private void reprocessQueue() {
  75 + while (permits.get() < permitsLimit) {
  76 + if (permits.incrementAndGet() <= permitsLimit) {
  77 + if (permits.get() > maxGrantedPermissions.get()) {
  78 + maxGrantedPermissions.set(permits.get());
  79 + }
  80 + LockedFuture lockedFuture = queue.poll();
  81 + if (lockedFuture != null) {
  82 + lockedFuture.latch.countDown();
  83 + } else {
  84 + permits.decrementAndGet();
  85 + break;
  86 + }
  87 + } else {
  88 + permits.decrementAndGet();
  89 + }
  90 + }
  91 + }
  92 +
  93 + private LockedFuture createLockedFuture() {
  94 + CountDownLatch latch = new CountDownLatch(1);
  95 + ListenableFuture<Void> future = pool.submit(() -> {
  96 + latch.await();
  97 + return null;
  98 + });
  99 + return new LockedFuture(latch, future, System.currentTimeMillis());
  100 + }
  101 +
  102 + private ListenableFuture<Void> putInQueue() {
  103 +
  104 + int size = queue.size();
  105 + if (size > maxQueueSize.get()) {
  106 + maxQueueSize.set(size);
  107 + }
  108 +
  109 + if (queue.remainingCapacity() > 0) {
  110 + try {
  111 + LockedFuture lockedFuture = createLockedFuture();
  112 + if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) {
  113 + lockedFuture.cancelFuture();
  114 + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
  115 + }
  116 + return lockedFuture.future;
  117 + } catch (InterruptedException e) {
  118 + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject"));
  119 + }
  120 + }
  121 + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
  122 + }
  123 +
  124 + @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
  125 + public void printStats() {
  126 + int expiredCount = 0;
  127 + for (LockedFuture lockedFuture : queue) {
  128 + if (lockedFuture.isExpired()) {
  129 + lockedFuture.cancelFuture();
  130 + expiredCount++;
  131 + }
  132 + }
  133 + log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}]", maxQueueSize.getAndSet(0),
  134 + maxGrantedPermissions.getAndSet(0), expiredCount);
  135 + }
  136 +
  137 + private class LockedFuture {
  138 + final CountDownLatch latch;
  139 + final ListenableFuture<Void> future;
  140 + final long createTime;
  141 +
  142 + public LockedFuture(CountDownLatch latch, ListenableFuture<Void> future, long createTime) {
  143 + this.latch = latch;
  144 + this.future = future;
  145 + this.createTime = createTime;
  146 + }
  147 +
  148 + void cancelFuture() {
  149 + future.cancel(false);
  150 + latch.countDown();
  151 + }
  152 +
  153 + boolean isExpired() {
  154 + return (System.currentTimeMillis() - createTime) > maxPermitWaitTime;
  155 + }
  156 +
  157 + }
  158 +
  159 +
  160 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql;
  17 +
  18 +import com.datastax.driver.core.*;
  19 +import com.datastax.driver.core.exceptions.UnsupportedFeatureException;
  20 +import com.google.common.util.concurrent.Futures;
  21 +import com.google.common.util.concurrent.ListenableFuture;
  22 +import org.junit.Test;
  23 +import org.junit.runner.RunWith;
  24 +import org.mockito.Mock;
  25 +import org.mockito.Mockito;
  26 +import org.mockito.runners.MockitoJUnitRunner;
  27 +import org.mockito.stubbing.Answer;
  28 +import org.thingsboard.server.dao.util.AsyncRateLimiter;
  29 +
  30 +import java.util.concurrent.ExecutionException;
  31 +import java.util.concurrent.TimeoutException;
  32 +
  33 +import static org.junit.Assert.*;
  34 +import static org.mockito.Mockito.*;
  35 +
  36 +@RunWith(MockitoJUnitRunner.class)
  37 +public class RateLimitedResultSetFutureTest {
  38 +
  39 + private RateLimitedResultSetFuture resultSetFuture;
  40 +
  41 + @Mock
  42 + private AsyncRateLimiter rateLimiter;
  43 + @Mock
  44 + private Session session;
  45 + @Mock
  46 + private Statement statement;
  47 + @Mock
  48 + private ResultSetFuture realFuture;
  49 + @Mock
  50 + private ResultSet rows;
  51 + @Mock
  52 + private Row row;
  53 +
  54 + @Test
  55 + public void doNotReleasePermissionIfRateLimitFutureFailed() throws InterruptedException {
  56 + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new IllegalArgumentException()));
  57 + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
  58 + Thread.sleep(1000L);
  59 + verify(rateLimiter).acquireAsync();
  60 + try {
  61 + assertTrue(resultSetFuture.isDone());
  62 + fail();
  63 + } catch (Exception e) {
  64 + assertTrue(e instanceof IllegalStateException);
  65 + Throwable actualCause = e.getCause();
  66 + assertTrue(actualCause instanceof ExecutionException);
  67 + }
  68 + verifyNoMoreInteractions(session, rateLimiter, statement);
  69 +
  70 + }
  71 +
  72 + @Test
  73 + public void getUninterruptiblyDelegateToCassandra() throws InterruptedException, ExecutionException {
  74 + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
  75 + when(session.executeAsync(statement)).thenReturn(realFuture);
  76 + Mockito.doAnswer((Answer<Void>) invocation -> {
  77 + Object[] args = invocation.getArguments();
  78 + Runnable task = (Runnable) args[0];
  79 + task.run();
  80 + return null;
  81 + }).when(realFuture).addListener(Mockito.any(), Mockito.any());
  82 +
  83 + when(realFuture.getUninterruptibly()).thenReturn(rows);
  84 +
  85 + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
  86 + ResultSet actual = resultSetFuture.getUninterruptibly();
  87 + assertSame(rows, actual);
  88 + verify(rateLimiter, times(1)).acquireAsync();
  89 + verify(rateLimiter, times(1)).release();
  90 + }
  91 +
  92 + @Test
  93 + public void addListenerAllowsFutureTransformation() throws InterruptedException, ExecutionException {
  94 + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
  95 + when(session.executeAsync(statement)).thenReturn(realFuture);
  96 + Mockito.doAnswer((Answer<Void>) invocation -> {
  97 + Object[] args = invocation.getArguments();
  98 + Runnable task = (Runnable) args[0];
  99 + task.run();
  100 + return null;
  101 + }).when(realFuture).addListener(Mockito.any(), Mockito.any());
  102 +
  103 + when(realFuture.get()).thenReturn(rows);
  104 + when(rows.one()).thenReturn(row);
  105 +
  106 + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
  107 +
  108 + ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
  109 + Row actualRow = transform.get();
  110 +
  111 + assertSame(row, actualRow);
  112 + verify(rateLimiter, times(1)).acquireAsync();
  113 + verify(rateLimiter, times(1)).release();
  114 + }
  115 +
  116 + @Test
  117 + public void immidiateCassandraExceptionReturnsPermit() throws InterruptedException, ExecutionException {
  118 + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
  119 + when(session.executeAsync(statement)).thenThrow(new UnsupportedFeatureException(ProtocolVersion.V3, "hjg"));
  120 + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
  121 + ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
  122 + try {
  123 + transform.get();
  124 + fail();
  125 + } catch (Exception e) {
  126 + assertTrue(e instanceof ExecutionException);
  127 + }
  128 + verify(rateLimiter, times(1)).acquireAsync();
  129 + verify(rateLimiter, times(1)).release();
  130 + }
  131 +
  132 + @Test
  133 + public void queryTimeoutReturnsPermit() throws InterruptedException, ExecutionException {
  134 + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
  135 + when(session.executeAsync(statement)).thenReturn(realFuture);
  136 + Mockito.doAnswer((Answer<Void>) invocation -> {
  137 + Object[] args = invocation.getArguments();
  138 + Runnable task = (Runnable) args[0];
  139 + task.run();
  140 + return null;
  141 + }).when(realFuture).addListener(Mockito.any(), Mockito.any());
  142 +
  143 + when(realFuture.get()).thenThrow(new ExecutionException("Fail", new TimeoutException("timeout")));
  144 + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
  145 + ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
  146 + try {
  147 + transform.get();
  148 + fail();
  149 + } catch (Exception e) {
  150 + assertTrue(e instanceof ExecutionException);
  151 + }
  152 + verify(rateLimiter, times(1)).acquireAsync();
  153 + verify(rateLimiter, times(1)).release();
  154 + }
  155 +
  156 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.util;
  17 +
  18 +import com.google.common.util.concurrent.*;
  19 +import org.junit.Test;
  20 +
  21 +import javax.annotation.Nullable;
  22 +import java.util.concurrent.ExecutionException;
  23 +import java.util.concurrent.Executors;
  24 +import java.util.concurrent.TimeUnit;
  25 +import java.util.concurrent.atomic.AtomicInteger;
  26 +
  27 +import static org.junit.Assert.*;
  28 +
  29 +
  30 +public class BufferedRateLimiterTest {
  31 +
  32 + @Test
  33 + public void finishedFutureReturnedIfPermitsAreGranted() {
  34 + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 10, 100);
  35 + ListenableFuture<Void> actual = limiter.acquireAsync();
  36 + assertTrue(actual.isDone());
  37 + }
  38 +
  39 + @Test
  40 + public void notFinishedFutureReturnedIfPermitsAreNotGranted() {
  41 + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 1, 100);
  42 + ListenableFuture<Void> actual1 = limiter.acquireAsync();
  43 + ListenableFuture<Void> actual2 = limiter.acquireAsync();
  44 + assertTrue(actual1.isDone());
  45 + assertFalse(actual2.isDone());
  46 + }
  47 +
  48 + @Test
  49 + public void failedFutureReturnedIfQueueIsfull() {
  50 + BufferedRateLimiter limiter = new BufferedRateLimiter(1, 1, 100);
  51 + ListenableFuture<Void> actual1 = limiter.acquireAsync();
  52 + ListenableFuture<Void> actual2 = limiter.acquireAsync();
  53 + ListenableFuture<Void> actual3 = limiter.acquireAsync();
  54 +
  55 + assertTrue(actual1.isDone());
  56 + assertFalse(actual2.isDone());
  57 + assertTrue(actual3.isDone());
  58 + try {
  59 + actual3.get();
  60 + fail();
  61 + } catch (Exception e) {
  62 + assertTrue(e instanceof ExecutionException);
  63 + Throwable actualCause = e.getCause();
  64 + assertTrue(actualCause instanceof IllegalStateException);
  65 + assertEquals("Rate Limit Buffer is full. Reject", actualCause.getMessage());
  66 + }
  67 + }
  68 +
  69 + @Test
  70 + public void releasedPermitTriggerTasksFromQueue() throws InterruptedException {
  71 + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100);
  72 + ListenableFuture<Void> actual1 = limiter.acquireAsync();
  73 + ListenableFuture<Void> actual2 = limiter.acquireAsync();
  74 + ListenableFuture<Void> actual3 = limiter.acquireAsync();
  75 + ListenableFuture<Void> actual4 = limiter.acquireAsync();
  76 + assertTrue(actual1.isDone());
  77 + assertTrue(actual2.isDone());
  78 + assertFalse(actual3.isDone());
  79 + assertFalse(actual4.isDone());
  80 + limiter.release();
  81 + TimeUnit.MILLISECONDS.sleep(100L);
  82 + assertTrue(actual3.isDone());
  83 + assertFalse(actual4.isDone());
  84 + limiter.release();
  85 + TimeUnit.MILLISECONDS.sleep(100L);
  86 + assertTrue(actual4.isDone());
  87 + }
  88 +
  89 + @Test
  90 + public void permitsReleasedInConcurrentMode() throws InterruptedException {
  91 + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100);
  92 + AtomicInteger actualReleased = new AtomicInteger();
  93 + AtomicInteger actualRejected = new AtomicInteger();
  94 + ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
  95 + for (int i = 0; i < 100; i++) {
  96 + ListenableFuture<ListenableFuture<Void>> submit = pool.submit(limiter::acquireAsync);
  97 + Futures.addCallback(submit, new FutureCallback<ListenableFuture<Void>>() {
  98 + @Override
  99 + public void onSuccess(@Nullable ListenableFuture<Void> result) {
  100 + Futures.addCallback(result, new FutureCallback<Void>() {
  101 + @Override
  102 + public void onSuccess(@Nullable Void result) {
  103 + try {
  104 + TimeUnit.MILLISECONDS.sleep(100);
  105 + } catch (InterruptedException e) {
  106 + e.printStackTrace();
  107 + }
  108 + limiter.release();
  109 + actualReleased.incrementAndGet();
  110 + }
  111 +
  112 + @Override
  113 + public void onFailure(Throwable t) {
  114 + actualRejected.incrementAndGet();
  115 + }
  116 + });
  117 + }
  118 +
  119 + @Override
  120 + public void onFailure(Throwable t) {
  121 + }
  122 + });
  123 + }
  124 +
  125 + TimeUnit.SECONDS.sleep(2);
  126 + assertTrue("Unexpected released count " + actualReleased.get(),
  127 + actualReleased.get() > 10 && actualReleased.get() < 20);
  128 + assertTrue("Unexpected rejected count " + actualRejected.get(),
  129 + actualRejected.get() > 80 && actualRejected.get() < 90);
  130 +
  131 + }
  132 +
  133 +
  134 +}