Commit 02c4d6b08a3eb34e2de252ac97c7e55da8cfe785
Merge branch 'develop/1.5' of github.com:thingsboard/thingsboard into develop/1.5
Showing
5 changed files
with
13 additions
and
6 deletions
... | ... | @@ -106,6 +106,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId> |
106 | 106 | try { |
107 | 107 | pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg()); |
108 | 108 | } catch (Exception ex) { |
109 | + logger.debug("[{}] Failed to process RuleToPlugin msg: [{}] [{}]", tenantId, msg.getMsg(), ex); | |
109 | 110 | RuleToPluginMsg ruleMsg = msg.getMsg(); |
110 | 111 | MsgType responceMsgType = MsgType.RULE_ENGINE_ERROR; |
111 | 112 | Integer requestId = 0; | ... | ... |
... | ... | @@ -439,8 +439,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem |
439 | 439 | |
440 | 440 | private PreparedStatement getLatestStmt() { |
441 | 441 | if (latestInsertStmt == null) { |
442 | -// latestInsertStmt = new PreparedStatement[DataType.values().length]; | |
443 | -// for (DataType type : DataType.values()) { | |
444 | 442 | latestInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF + |
445 | 443 | "(" + ModelConstants.ENTITY_TYPE_COLUMN + |
446 | 444 | "," + ModelConstants.ENTITY_ID_COLUMN + |
... | ... | @@ -451,7 +449,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem |
451 | 449 | "," + ModelConstants.LONG_VALUE_COLUMN + |
452 | 450 | "," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" + |
453 | 451 | " VALUES(?, ?, ?, ?, ?, ?, ?, ?)"); |
454 | -// } | |
455 | 452 | } |
456 | 453 | return latestInsertStmt; |
457 | 454 | } | ... | ... |
... | ... | @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; |
29 | 29 | |
30 | 30 | @Component |
31 | 31 | @Slf4j |
32 | +@NoSqlDao | |
32 | 33 | public class BufferedRateLimiter implements AsyncRateLimiter { |
33 | 34 | |
34 | 35 | private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); |
... | ... | @@ -113,6 +114,9 @@ public class BufferedRateLimiter implements AsyncRateLimiter { |
113 | 114 | lockedFuture.cancelFuture(); |
114 | 115 | return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject")); |
115 | 116 | } |
117 | + if(permits.get() < permitsLimit) { | |
118 | + reprocessQueue(); | |
119 | + } | |
116 | 120 | return lockedFuture.future; |
117 | 121 | } catch (InterruptedException e) { |
118 | 122 | return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject")); |
... | ... | @@ -130,8 +134,8 @@ public class BufferedRateLimiter implements AsyncRateLimiter { |
130 | 134 | expiredCount++; |
131 | 135 | } |
132 | 136 | } |
133 | - log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}]", maxQueueSize.getAndSet(0), | |
134 | - maxGrantedPermissions.getAndSet(0), expiredCount); | |
137 | + log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}] current granted [{}]", maxQueueSize.getAndSet(0), | |
138 | + maxGrantedPermissions.getAndSet(0), expiredCount, permits.get()); | |
135 | 139 | } |
136 | 140 | |
137 | 141 | private class LockedFuture { | ... | ... |
... | ... | @@ -47,3 +47,8 @@ cassandra.query.default_fetch_size=2000 |
47 | 47 | cassandra.query.ts_key_value_partitioning=HOURS |
48 | 48 | |
49 | 49 | cassandra.query.max_limit_per_request=1000 |
50 | +cassandra.query.buffer_size=100000 | |
51 | +cassandra.query.concurrent_limit=1000 | |
52 | +cassandra.query.permit_max_wait_time=20000 | |
53 | +cassandra.query.rate_limit_print_interval_ms=30000 | |
54 | + | ... | ... |