Commit 94039f943b46caa0dc86ab722fbf0006b9c4acab

Authored by vparomskiy
1 parent 4ecb106a

Squashed commit of the following:

commit 8b637c9e
Author: vparomskiy <paromskiy@gmail.com>
Date:   Fri Mar 23 16:54:48 2018 +0200

    add cassandra properties in test suite
@@ -106,6 +106,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId> @@ -106,6 +106,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
106 try { 106 try {
107 pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg()); 107 pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg());
108 } catch (Exception ex) { 108 } catch (Exception ex) {
  109 + logger.debug("[{}] Failed to process RuleToPlugin msg: [{}] [{}]", tenantId, msg.getMsg(), ex);
109 RuleToPluginMsg ruleMsg = msg.getMsg(); 110 RuleToPluginMsg ruleMsg = msg.getMsg();
110 MsgType responceMsgType = MsgType.RULE_ENGINE_ERROR; 111 MsgType responceMsgType = MsgType.RULE_ENGINE_ERROR;
111 Integer requestId = 0; 112 Integer requestId = 0;
@@ -133,7 +133,7 @@ quota: @@ -133,7 +133,7 @@ quota:
133 intervalMin: 2 133 intervalMin: 2
134 134
135 database: 135 database:
136 - type: "${DATABASE_TYPE:cassandra}" # cassandra OR sql 136 + type: "${DATABASE_TYPE:sql}" # cassandra OR sql
137 137
138 # Cassandra driver configuration parameters 138 # Cassandra driver configuration parameters
139 cassandra: 139 cassandra:
@@ -238,7 +238,7 @@ caffeine: @@ -238,7 +238,7 @@ caffeine:
238 specs: 238 specs:
239 relations: 239 relations:
240 timeToLiveInMinutes: 1440 240 timeToLiveInMinutes: 1440
241 - maxSize: 0 241 + maxSize: 100000
242 deviceCredentials: 242 deviceCredentials:
243 timeToLiveInMinutes: 1440 243 timeToLiveInMinutes: 1440
244 maxSize: 100000 244 maxSize: 100000
@@ -439,8 +439,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -439,8 +439,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
439 439
440 private PreparedStatement getLatestStmt() { 440 private PreparedStatement getLatestStmt() {
441 if (latestInsertStmt == null) { 441 if (latestInsertStmt == null) {
442 -// latestInsertStmt = new PreparedStatement[DataType.values().length];  
443 -// for (DataType type : DataType.values()) {  
444 latestInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF + 442 latestInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF +
445 "(" + ModelConstants.ENTITY_TYPE_COLUMN + 443 "(" + ModelConstants.ENTITY_TYPE_COLUMN +
446 "," + ModelConstants.ENTITY_ID_COLUMN + 444 "," + ModelConstants.ENTITY_ID_COLUMN +
@@ -451,7 +449,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -451,7 +449,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
451 "," + ModelConstants.LONG_VALUE_COLUMN + 449 "," + ModelConstants.LONG_VALUE_COLUMN +
452 "," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" + 450 "," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" +
453 " VALUES(?, ?, ?, ?, ?, ?, ?, ?)"); 451 " VALUES(?, ?, ?, ?, ?, ?, ?, ?)");
454 -// }  
455 } 452 }
456 return latestInsertStmt; 453 return latestInsertStmt;
457 } 454 }
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
29 29
30 @Component 30 @Component
31 @Slf4j 31 @Slf4j
  32 +@NoSqlDao
32 public class BufferedRateLimiter implements AsyncRateLimiter { 33 public class BufferedRateLimiter implements AsyncRateLimiter {
33 34
34 private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); 35 private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
@@ -113,6 +114,9 @@ public class BufferedRateLimiter implements AsyncRateLimiter { @@ -113,6 +114,9 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
113 lockedFuture.cancelFuture(); 114 lockedFuture.cancelFuture();
114 return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject")); 115 return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
115 } 116 }
  117 + if(permits.get() < permitsLimit) {
  118 + reprocessQueue();
  119 + }
116 return lockedFuture.future; 120 return lockedFuture.future;
117 } catch (InterruptedException e) { 121 } catch (InterruptedException e) {
118 return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject")); 122 return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject"));
@@ -130,8 +134,8 @@ public class BufferedRateLimiter implements AsyncRateLimiter { @@ -130,8 +134,8 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
130 expiredCount++; 134 expiredCount++;
131 } 135 }
132 } 136 }
133 - log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}]", maxQueueSize.getAndSet(0),  
134 - maxGrantedPermissions.getAndSet(0), expiredCount); 137 + log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}] current granted [{}]", maxQueueSize.getAndSet(0),
  138 + maxGrantedPermissions.getAndSet(0), expiredCount, permits.get());
135 } 139 }
136 140
137 private class LockedFuture { 141 private class LockedFuture {
@@ -47,3 +47,8 @@ cassandra.query.default_fetch_size=2000 @@ -47,3 +47,8 @@ cassandra.query.default_fetch_size=2000
47 cassandra.query.ts_key_value_partitioning=HOURS 47 cassandra.query.ts_key_value_partitioning=HOURS
48 48
49 cassandra.query.max_limit_per_request=1000 49 cassandra.query.max_limit_per_request=1000
  50 +cassandra.query.buffer_size=100000
  51 +cassandra.query.concurrent_limit=1000
  52 +cassandra.query.permit_max_wait_time=20000
  53 +cassandra.query.rate_limit_print_interval_ms=30000
  54 +