Commit 571f96c4a99d189023886fe0e5903151a4776814

Authored by Andrew Shvayka
1 parent c1675db1

New Buffered Rate Limit implementation

... ... @@ -140,7 +140,7 @@ cassandra:
140 140 buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
141 141 concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
142 142 permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
143   - rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}"
  143 + rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}"
144 144
145 145 # SQL configuration parameters
146 146 sql:
... ...
... ... @@ -35,7 +35,6 @@ import org.thingsboard.server.dao.model.type.ComponentTypeCodec;
35 35 import org.thingsboard.server.dao.model.type.DeviceCredentialsTypeCodec;
36 36 import org.thingsboard.server.dao.model.type.EntityTypeCodec;
37 37 import org.thingsboard.server.dao.model.type.JsonCodec;
38   -import org.thingsboard.server.dao.util.BufferedRateLimiter;
39 38
40 39 import java.util.concurrent.ConcurrentHashMap;
41 40 import java.util.concurrent.ConcurrentMap;
... ... @@ -49,7 +48,7 @@ public abstract class CassandraAbstractDao {
49 48 private ConcurrentMap<String, PreparedStatement> preparedStatementMap = new ConcurrentHashMap<>();
50 49
51 50 @Autowired
52   - private BufferedRateLimiter rateLimiter;
  51 + private CassandraBufferedRateExecutor rateLimiter;
53 52
54 53 private Session session;
55 54
... ... @@ -115,12 +114,12 @@ public abstract class CassandraAbstractDao {
115 114 if (statement.getConsistencyLevel() == null) {
116 115 statement.setConsistencyLevel(level);
117 116 }
118   - return new RateLimitedResultSetFuture(getSession(), rateLimiter, statement);
  117 + return rateLimiter.submit(new CassandraStatementTask(getSession(), statement));
119 118 }
120 119
121 120 private static String statementToString(Statement statement) {
122 121 if (statement instanceof BoundStatement) {
123   - return ((BoundStatement)statement).preparedStatement().getQueryString();
  122 + return ((BoundStatement) statement).preparedStatement().getQueryString();
124 123 } else {
125 124 return statement.toString();
126 125 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql;
  17 +
  18 +import com.datastax.driver.core.ResultSet;
  19 +import com.datastax.driver.core.ResultSetFuture;
  20 +import com.google.common.util.concurrent.SettableFuture;
  21 +import lombok.extern.slf4j.Slf4j;
  22 +import org.springframework.beans.factory.annotation.Value;
  23 +import org.springframework.scheduling.annotation.Scheduled;
  24 +import org.springframework.stereotype.Component;
  25 +import org.thingsboard.server.dao.nosql.tmp.AbstractBufferedRateExecutor;
  26 +import org.thingsboard.server.dao.nosql.tmp.AsyncTaskContext;
  27 +import org.thingsboard.server.dao.util.NoSqlAnyDao;
  28 +
  29 +import javax.annotation.PreDestroy;
  30 +
  31 +/**
  32 + * Created by ashvayka on 24.10.18.
  33 + */
  34 +@Component
  35 +@Slf4j
  36 +@NoSqlAnyDao
  37 +public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<CassandraStatementTask, ResultSetFuture, ResultSet> {
  38 +
  39 + public CassandraBufferedRateExecutor(
  40 + @Value("${cassandra.query.buffer_size}") int queueLimit,
  41 + @Value("${cassandra.query.concurrent_limit}") int concurrencyLimit,
  42 + @Value("${cassandra.query.permit_max_wait_time}") long maxWaitTime,
  43 + @Value("${cassandra.query.dispatcher_threads:2}") int dispatcherThreads,
  44 + @Value("${cassandra.query.callback_threads:2}") int callbackThreads,
  45 + @Value("${cassandra.query.poll_ms:50}") long pollMs) {
  46 + super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs);
  47 + }
  48 +
  49 + @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
  50 + public void printStats() {
  51 + log.info("Permits totalAdded [{}] totalLaunched [{}] totalReleased [{}] totalFailed [{}] totalExpired [{}] totalRejected [{}] currBuffer [{}] ",
  52 + totalAdded.getAndSet(0), totalLaunched.getAndSet(0), totalReleased.getAndSet(0),
  53 + totalFailed.getAndSet(0), totalExpired.getAndSet(0), totalRejected.getAndSet(0),
  54 + concurrencyLevel.get());
  55 + }
  56 +
  57 + @PreDestroy
  58 + public void stop() {
  59 + super.stop();
  60 + }
  61 +
  62 + @Override
  63 + protected SettableFuture<ResultSet> create() {
  64 + return SettableFuture.create();
  65 + }
  66 +
  67 + @Override
  68 + protected ResultSetFuture wrap(CassandraStatementTask task, SettableFuture<ResultSet> future) {
  69 + return new TbResultSetFuture(future);
  70 + }
  71 +
  72 + @Override
  73 + protected ResultSetFuture execute(AsyncTaskContext<CassandraStatementTask, ResultSet> taskCtx) {
  74 + CassandraStatementTask task = taskCtx.getTask();
  75 + return task.getSession().executeAsync(task.getStatement());
  76 + }
  77 +
  78 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql;
  17 +
  18 +import com.datastax.driver.core.Session;
  19 +import com.datastax.driver.core.Statement;
  20 +import lombok.Data;
  21 +import org.thingsboard.server.dao.nosql.tmp.AsyncTask;
  22 +
  23 +/**
  24 + * Created by ashvayka on 24.10.18.
  25 + */
  26 +@Data
  27 +public class CassandraStatementTask implements AsyncTask {
  28 +
  29 + private final Session session;
  30 + private final Statement statement;
  31 +
  32 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql;
  17 +
  18 +import com.datastax.driver.core.ResultSet;
  19 +import com.datastax.driver.core.ResultSetFuture;
  20 +import com.google.common.util.concurrent.SettableFuture;
  21 +
  22 +import java.util.concurrent.ExecutionException;
  23 +import java.util.concurrent.Executor;
  24 +import java.util.concurrent.TimeUnit;
  25 +import java.util.concurrent.TimeoutException;
  26 +
  27 +/**
  28 + * Created by ashvayka on 24.10.18.
  29 + */
  30 +public class TbResultSetFuture implements ResultSetFuture {
  31 +
  32 + private final SettableFuture<ResultSet> mainFuture;
  33 +
  34 + public TbResultSetFuture(SettableFuture<ResultSet> mainFuture) {
  35 + this.mainFuture = mainFuture;
  36 + }
  37 +
  38 + @Override
  39 + public ResultSet getUninterruptibly() {
  40 + return getSafe();
  41 + }
  42 +
  43 + @Override
  44 + public ResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException {
  45 + return getSafe(timeout, unit);
  46 + }
  47 +
  48 + @Override
  49 + public boolean cancel(boolean mayInterruptIfRunning) {
  50 + return mainFuture.cancel(mayInterruptIfRunning);
  51 + }
  52 +
  53 + @Override
  54 + public boolean isCancelled() {
  55 + return mainFuture.isCancelled();
  56 + }
  57 +
  58 + @Override
  59 + public boolean isDone() {
  60 + return mainFuture.isDone();
  61 + }
  62 +
  63 + @Override
  64 + public ResultSet get() throws InterruptedException, ExecutionException {
  65 + return mainFuture.get();
  66 + }
  67 +
  68 + @Override
  69 + public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
  70 + return mainFuture.get(timeout, unit);
  71 + }
  72 +
  73 + @Override
  74 + public void addListener(Runnable listener, Executor executor) {
  75 + mainFuture.addListener(listener, executor);
  76 + }
  77 +
  78 + private ResultSet getSafe() {
  79 + try {
  80 + return mainFuture.get();
  81 + } catch (InterruptedException | ExecutionException e) {
  82 + throw new IllegalStateException(e);
  83 + }
  84 + }
  85 +
  86 + private ResultSet getSafe(long timeout, TimeUnit unit) throws TimeoutException {
  87 + try {
  88 + return mainFuture.get(timeout, unit);
  89 + } catch (InterruptedException | ExecutionException e) {
  90 + throw new IllegalStateException(e);
  91 + }
  92 + }
  93 +
  94 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql.tmp;
  17 +
  18 +import com.google.common.util.concurrent.FutureCallback;
  19 +import com.google.common.util.concurrent.Futures;
  20 +import com.google.common.util.concurrent.ListenableFuture;
  21 +import com.google.common.util.concurrent.SettableFuture;
  22 +import lombok.extern.slf4j.Slf4j;
  23 +
  24 +import javax.annotation.Nullable;
  25 +import java.util.UUID;
  26 +import java.util.concurrent.BlockingQueue;
  27 +import java.util.concurrent.ExecutorService;
  28 +import java.util.concurrent.Executors;
  29 +import java.util.concurrent.LinkedBlockingDeque;
  30 +import java.util.concurrent.ScheduledExecutorService;
  31 +import java.util.concurrent.TimeUnit;
  32 +import java.util.concurrent.TimeoutException;
  33 +import java.util.concurrent.atomic.AtomicInteger;
  34 +
  35 +/**
  36 + * Created by ashvayka on 24.10.18.
  37 + */
  38 +@Slf4j
  39 +public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extends ListenableFuture<V>, V> implements BufferedRateExecutor<T, F> {
  40 +
  41 + private final long maxWaitTime;
  42 + private final long pollMs;
  43 + private final BlockingQueue<AsyncTaskContext<T, V>> queue;
  44 + private final ExecutorService dispatcherExecutor;
  45 + private final ExecutorService callbackExecutor;
  46 + private final ScheduledExecutorService timeoutExecutor;
  47 + private final int concurrencyLimit;
  48 +
  49 + protected final AtomicInteger concurrencyLevel = new AtomicInteger();
  50 + protected final AtomicInteger totalAdded = new AtomicInteger();
  51 + protected final AtomicInteger totalLaunched = new AtomicInteger();
  52 + protected final AtomicInteger totalReleased = new AtomicInteger();
  53 + protected final AtomicInteger totalFailed = new AtomicInteger();
  54 + protected final AtomicInteger totalExpired = new AtomicInteger();
  55 + protected final AtomicInteger totalRejected = new AtomicInteger();
  56 +
  57 + public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, int callbackThreads, long pollMs) {
  58 + this.maxWaitTime = maxWaitTime;
  59 + this.pollMs = pollMs;
  60 + this.concurrencyLimit = concurrencyLimit;
  61 + this.queue = new LinkedBlockingDeque<>(queueLimit);
  62 + this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads);
  63 + this.callbackExecutor = Executors.newFixedThreadPool(callbackThreads);
  64 + this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
  65 + for (int i = 0; i < dispatcherThreads; i++) {
  66 + dispatcherExecutor.submit(this::dispatch);
  67 + }
  68 + }
  69 +
  70 + @Override
  71 + public F submit(T task) {
  72 + SettableFuture<V> settableFuture = create();
  73 + F result = wrap(task, settableFuture);
  74 + try {
  75 + totalAdded.incrementAndGet();
  76 + queue.add(new AsyncTaskContext<>(UUID.randomUUID(), task, settableFuture, System.currentTimeMillis()));
  77 + } catch (IllegalStateException e) {
  78 + totalRejected.incrementAndGet();
  79 + settableFuture.setException(e);
  80 + }
  81 + return result;
  82 + }
  83 +
  84 + public void stop() {
  85 + if (dispatcherExecutor != null) {
  86 + dispatcherExecutor.shutdownNow();
  87 + }
  88 + if (callbackExecutor != null) {
  89 + callbackExecutor.shutdownNow();
  90 + }
  91 + if (timeoutExecutor != null) {
  92 + timeoutExecutor.shutdownNow();
  93 + }
  94 + }
  95 +
  96 + protected abstract SettableFuture<V> create();
  97 +
  98 + protected abstract F wrap(T task, SettableFuture<V> future);
  99 +
  100 + protected abstract ListenableFuture<V> execute(AsyncTaskContext<T, V> taskCtx);
  101 +
  102 + private void dispatch() {
  103 + log.info("Buffered rate executor thread started");
  104 + while (!Thread.interrupted()) {
  105 + int curLvl = concurrencyLevel.get();
  106 + AsyncTaskContext<T, V> taskCtx = null;
  107 + try {
  108 + if (curLvl <= concurrencyLimit) {
  109 + taskCtx = queue.take();
  110 + final AsyncTaskContext<T, V> finalTaskCtx = taskCtx;
  111 + logTask("Processing", finalTaskCtx);
  112 + concurrencyLevel.incrementAndGet();
  113 + long timeout = finalTaskCtx.getCreateTime() + maxWaitTime - System.currentTimeMillis();
  114 + if (timeout > 0) {
  115 + totalLaunched.incrementAndGet();
  116 + ListenableFuture<V> result = execute(finalTaskCtx);
  117 + result = Futures.withTimeout(result, timeout, TimeUnit.MILLISECONDS, timeoutExecutor);
  118 + Futures.addCallback(result, new FutureCallback<V>() {
  119 + @Override
  120 + public void onSuccess(@Nullable V result) {
  121 + logTask("Releasing", finalTaskCtx);
  122 + totalReleased.incrementAndGet();
  123 + concurrencyLevel.decrementAndGet();
  124 + finalTaskCtx.getFuture().set(result);
  125 + }
  126 +
  127 + @Override
  128 + public void onFailure(Throwable t) {
  129 + if (t instanceof TimeoutException) {
  130 + logTask("Expired During Execution", finalTaskCtx);
  131 + } else {
  132 + logTask("Failed", finalTaskCtx);
  133 + }
  134 + totalFailed.incrementAndGet();
  135 + concurrencyLevel.decrementAndGet();
  136 + finalTaskCtx.getFuture().setException(t);
  137 + log.debug("[{}] Failed to execute task: {}", finalTaskCtx.getId(), finalTaskCtx.getTask(), t);
  138 + }
  139 + }, callbackExecutor);
  140 + } else {
  141 + logTask("Expired Before Execution", finalTaskCtx);
  142 + totalExpired.incrementAndGet();
  143 + concurrencyLevel.decrementAndGet();
  144 + taskCtx.getFuture().setException(new TimeoutException());
  145 + }
  146 + } else {
  147 + Thread.sleep(pollMs);
  148 + }
  149 + } catch (Throwable e) {
  150 + if (taskCtx != null) {
  151 + log.debug("[{}] Failed to execute task: {}", taskCtx.getId(), taskCtx, e);
  152 + totalFailed.incrementAndGet();
  153 + concurrencyLevel.decrementAndGet();
  154 + } else {
  155 + log.debug("Failed to queue task:", e);
  156 + }
  157 + }
  158 + }
  159 + log.info("Buffered rate executor thread stopped");
  160 + }
  161 +
  162 + private void logTask(String action, AsyncTaskContext<T, V> taskCtx) {
  163 + if (log.isTraceEnabled()) {
  164 + log.trace("[{}] {} task: {}", taskCtx.getId(), action, taskCtx);
  165 + } else {
  166 + log.debug("[{}] {} task", taskCtx.getId(), action);
  167 + }
  168 + }
  169 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql.tmp;
  17 +
  18 +/**
  19 + * Created by ashvayka on 24.10.18.
  20 + */
  21 +public interface AsyncTask {
  22 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql.tmp;
  17 +
  18 +import com.google.common.util.concurrent.SettableFuture;
  19 +import lombok.Data;
  20 +
  21 +import java.util.UUID;
  22 +
  23 +/**
  24 + * Created by ashvayka on 24.10.18.
  25 + */
  26 +@Data
  27 +public class AsyncTaskContext<T extends AsyncTask, V> {
  28 +
  29 + private final UUID id;
  30 + private final T task;
  31 + private final SettableFuture<V> future;
  32 + private final long createTime;
  33 +
  34 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql.tmp;
  17 +
  18 +import com.google.common.util.concurrent.ListenableFuture;
  19 +
  20 +/**
  21 + * Created by ashvayka on 24.10.18.
  22 + */
  23 +public interface BufferedRateExecutor<T extends AsyncTask, F extends ListenableFuture> {
  24 +
  25 + F submit(T task);
  26 +
  27 +}
... ...
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.dao.util;
17   -
18   -import com.google.common.util.concurrent.Futures;
19   -import com.google.common.util.concurrent.ListenableFuture;
20   -import com.google.common.util.concurrent.ListeningExecutorService;
21   -import com.google.common.util.concurrent.MoreExecutors;
22   -import lombok.extern.slf4j.Slf4j;
23   -import org.springframework.beans.factory.annotation.Value;
24   -import org.springframework.scheduling.annotation.Scheduled;
25   -import org.springframework.stereotype.Component;
26   -import org.thingsboard.server.dao.exception.BufferLimitException;
27   -
28   -import java.util.concurrent.BlockingQueue;
29   -import java.util.concurrent.CountDownLatch;
30   -import java.util.concurrent.Executors;
31   -import java.util.concurrent.LinkedBlockingQueue;
32   -import java.util.concurrent.TimeUnit;
33   -import java.util.concurrent.atomic.AtomicInteger;
34   -
35   -@Component
36   -@Slf4j
37   -@NoSqlAnyDao
38   -public class BufferedRateLimiter implements AsyncRateLimiter {
39   -
40   - private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
41   -
42   - private final int permitsLimit;
43   - private final int maxPermitWaitTime;
44   - private final AtomicInteger permits;
45   - private final BlockingQueue<LockedFuture> queue;
46   -
47   - private final AtomicInteger maxQueueSize = new AtomicInteger();
48   - private final AtomicInteger maxGrantedPermissions = new AtomicInteger();
49   - private final AtomicInteger totalGranted = new AtomicInteger();
50   - private final AtomicInteger totalReleased = new AtomicInteger();
51   - private final AtomicInteger totalRequested = new AtomicInteger();
52   -
53   - public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit,
54   - @Value("${cassandra.query.concurrent_limit}") int permitsLimit,
55   - @Value("${cassandra.query.permit_max_wait_time}") int maxPermitWaitTime) {
56   - this.permitsLimit = permitsLimit;
57   - this.maxPermitWaitTime = maxPermitWaitTime;
58   - this.permits = new AtomicInteger();
59   - this.queue = new LinkedBlockingQueue<>(queueLimit);
60   - }
61   -
62   - @Override
63   - public ListenableFuture<Void> acquireAsync() {
64   - totalRequested.incrementAndGet();
65   - if (queue.isEmpty()) {
66   - if (permits.incrementAndGet() <= permitsLimit) {
67   - if (permits.get() > maxGrantedPermissions.get()) {
68   - maxGrantedPermissions.set(permits.get());
69   - }
70   - totalGranted.incrementAndGet();
71   - return Futures.immediateFuture(null);
72   - }
73   - permits.decrementAndGet();
74   - }
75   -
76   - return putInQueue();
77   - }
78   -
79   - @Override
80   - public void release() {
81   - permits.decrementAndGet();
82   - totalReleased.incrementAndGet();
83   - reprocessQueue();
84   - }
85   -
86   - private void reprocessQueue() {
87   - while (permits.get() < permitsLimit) {
88   - if (permits.incrementAndGet() <= permitsLimit) {
89   - if (permits.get() > maxGrantedPermissions.get()) {
90   - maxGrantedPermissions.set(permits.get());
91   - }
92   - LockedFuture lockedFuture = queue.poll();
93   - if (lockedFuture != null) {
94   - totalGranted.incrementAndGet();
95   - lockedFuture.latch.countDown();
96   - } else {
97   - permits.decrementAndGet();
98   - break;
99   - }
100   - } else {
101   - permits.decrementAndGet();
102   - }
103   - }
104   - }
105   -
106   - private LockedFuture createLockedFuture() {
107   - CountDownLatch latch = new CountDownLatch(1);
108   - ListenableFuture<Void> future = pool.submit(() -> {
109   - latch.await();
110   - return null;
111   - });
112   - return new LockedFuture(latch, future, System.currentTimeMillis());
113   - }
114   -
115   - private ListenableFuture<Void> putInQueue() {
116   -
117   - int size = queue.size();
118   - if (size > maxQueueSize.get()) {
119   - maxQueueSize.set(size);
120   - }
121   -
122   - if (queue.remainingCapacity() > 0) {
123   - try {
124   - LockedFuture lockedFuture = createLockedFuture();
125   - if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) {
126   - lockedFuture.cancelFuture();
127   - return Futures.immediateFailedFuture(new BufferLimitException());
128   - }
129   - if(permits.get() < permitsLimit) {
130   - reprocessQueue();
131   - }
132   - if(permits.get() < permitsLimit) {
133   - reprocessQueue();
134   - }
135   - return lockedFuture.future;
136   - } catch (InterruptedException e) {
137   - return Futures.immediateFailedFuture(new BufferLimitException());
138   - }
139   - }
140   - return Futures.immediateFailedFuture(new BufferLimitException());
141   - }
142   -
143   - @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
144   - public void printStats() {
145   - int expiredCount = 0;
146   - for (LockedFuture lockedFuture : queue) {
147   - if (lockedFuture.isExpired()) {
148   - lockedFuture.cancelFuture();
149   - expiredCount++;
150   - }
151   - }
152   - log.info("Permits maxBuffer [{}] maxPermits [{}] expired [{}] currPermits [{}] currBuffer [{}] " +
153   - "totalPermits [{}] totalRequests [{}] totalReleased [{}]",
154   - maxQueueSize.getAndSet(0), maxGrantedPermissions.getAndSet(0), expiredCount,
155   - permits.get(), queue.size(),
156   - totalGranted.getAndSet(0), totalRequested.getAndSet(0), totalReleased.getAndSet(0));
157   - }
158   -
159   - private class LockedFuture {
160   - final CountDownLatch latch;
161   - final ListenableFuture<Void> future;
162   - final long createTime;
163   -
164   - public LockedFuture(CountDownLatch latch, ListenableFuture<Void> future, long createTime) {
165   - this.latch = latch;
166   - this.future = future;
167   - this.createTime = createTime;
168   - }
169   -
170   - void cancelFuture() {
171   - future.cancel(false);
172   - latch.countDown();
173   - }
174   -
175   - boolean isExpired() {
176   - return (System.currentTimeMillis() - createTime) > maxPermitWaitTime;
177   - }
178   -
179   - }
180   -
181   -
182   -}
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.dao.util;
17   -
18   -import com.google.common.util.concurrent.FutureCallback;
19   -import com.google.common.util.concurrent.Futures;
20   -import com.google.common.util.concurrent.ListenableFuture;
21   -import com.google.common.util.concurrent.ListeningExecutorService;
22   -import com.google.common.util.concurrent.MoreExecutors;
23   -import org.junit.Test;
24   -import org.thingsboard.server.dao.exception.BufferLimitException;
25   -
26   -import javax.annotation.Nullable;
27   -import java.util.concurrent.ExecutionException;
28   -import java.util.concurrent.Executors;
29   -import java.util.concurrent.TimeUnit;
30   -import java.util.concurrent.atomic.AtomicInteger;
31   -
32   -import static org.junit.Assert.assertEquals;
33   -import static org.junit.Assert.assertFalse;
34   -import static org.junit.Assert.assertTrue;
35   -import static org.junit.Assert.fail;
36   -
37   -
38   -public class BufferedRateLimiterTest {
39   -
40   - @Test
41   - public void finishedFutureReturnedIfPermitsAreGranted() {
42   - BufferedRateLimiter limiter = new BufferedRateLimiter(10, 10, 100);
43   - ListenableFuture<Void> actual = limiter.acquireAsync();
44   - assertTrue(actual.isDone());
45   - }
46   -
47   - @Test
48   - public void notFinishedFutureReturnedIfPermitsAreNotGranted() {
49   - BufferedRateLimiter limiter = new BufferedRateLimiter(10, 1, 100);
50   - ListenableFuture<Void> actual1 = limiter.acquireAsync();
51   - ListenableFuture<Void> actual2 = limiter.acquireAsync();
52   - assertTrue(actual1.isDone());
53   - assertFalse(actual2.isDone());
54   - }
55   -
56   - @Test
57   - public void failedFutureReturnedIfQueueIsfull() {
58   - BufferedRateLimiter limiter = new BufferedRateLimiter(1, 1, 100);
59   - ListenableFuture<Void> actual1 = limiter.acquireAsync();
60   - ListenableFuture<Void> actual2 = limiter.acquireAsync();
61   - ListenableFuture<Void> actual3 = limiter.acquireAsync();
62   -
63   - assertTrue(actual1.isDone());
64   - assertFalse(actual2.isDone());
65   - assertTrue(actual3.isDone());
66   - try {
67   - actual3.get();
68   - fail();
69   - } catch (Exception e) {
70   - assertTrue(e instanceof ExecutionException);
71   - Throwable actualCause = e.getCause();
72   - assertTrue(actualCause instanceof BufferLimitException);
73   - assertEquals("Rate Limit Buffer is full", actualCause.getMessage());
74   - }
75   - }
76   -
77   - @Test
78   - public void releasedPermitTriggerTasksFromQueue() throws InterruptedException {
79   - BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100);
80   - ListenableFuture<Void> actual1 = limiter.acquireAsync();
81   - ListenableFuture<Void> actual2 = limiter.acquireAsync();
82   - ListenableFuture<Void> actual3 = limiter.acquireAsync();
83   - ListenableFuture<Void> actual4 = limiter.acquireAsync();
84   - assertTrue(actual1.isDone());
85   - assertTrue(actual2.isDone());
86   - assertFalse(actual3.isDone());
87   - assertFalse(actual4.isDone());
88   - limiter.release();
89   - TimeUnit.MILLISECONDS.sleep(100L);
90   - assertTrue(actual3.isDone());
91   - assertFalse(actual4.isDone());
92   - limiter.release();
93   - TimeUnit.MILLISECONDS.sleep(100L);
94   - assertTrue(actual4.isDone());
95   - }
96   -
97   - @Test
98   - public void permitsReleasedInConcurrentMode() throws InterruptedException {
99   - BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100);
100   - AtomicInteger actualReleased = new AtomicInteger();
101   - AtomicInteger actualRejected = new AtomicInteger();
102   - ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
103   - for (int i = 0; i < 100; i++) {
104   - ListenableFuture<ListenableFuture<Void>> submit = pool.submit(limiter::acquireAsync);
105   - Futures.addCallback(submit, new FutureCallback<ListenableFuture<Void>>() {
106   - @Override
107   - public void onSuccess(@Nullable ListenableFuture<Void> result) {
108   - Futures.addCallback(result, new FutureCallback<Void>() {
109   - @Override
110   - public void onSuccess(@Nullable Void result) {
111   - try {
112   - TimeUnit.MILLISECONDS.sleep(100);
113   - } catch (InterruptedException e) {
114   - e.printStackTrace();
115   - }
116   - limiter.release();
117   - actualReleased.incrementAndGet();
118   - }
119   -
120   - @Override
121   - public void onFailure(Throwable t) {
122   - actualRejected.incrementAndGet();
123   - }
124   - });
125   - }
126   -
127   - @Override
128   - public void onFailure(Throwable t) {
129   - }
130   - });
131   - }
132   -
133   - TimeUnit.SECONDS.sleep(2);
134   - assertTrue("Unexpected released count " + actualReleased.get(),
135   - actualReleased.get() > 10 && actualReleased.get() < 20);
136   - assertTrue("Unexpected rejected count " + actualRejected.get(),
137   - actualRejected.get() > 80 && actualRejected.get() < 90);
138   -
139   - }
140   -
141   -
142   -}
\ No newline at end of file