Showing
6 changed files
with
86 additions
and
16 deletions
1 | +/** | |
2 | + * Copyright © 2016-2018 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.exception; | |
17 | + | |
18 | +public class BufferLimitException extends RuntimeException { | |
19 | + | |
20 | + private static final long serialVersionUID = 4513762009041887588L; | |
21 | + | |
22 | + public BufferLimitException() { | |
23 | + super("Rate Limit Buffer is full"); | |
24 | + } | |
25 | +} | ... | ... |
... | ... | @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.FutureCallback; |
24 | 24 | import com.google.common.util.concurrent.Futures; |
25 | 25 | import com.google.common.util.concurrent.ListenableFuture; |
26 | 26 | import com.google.common.util.concurrent.Uninterruptibles; |
27 | +import org.thingsboard.server.dao.exception.BufferLimitException; | |
27 | 28 | import org.thingsboard.server.dao.util.AsyncRateLimiter; |
28 | 29 | |
29 | 30 | import javax.annotation.Nullable; |
... | ... | @@ -35,9 +36,15 @@ public class RateLimitedResultSetFuture implements ResultSetFuture { |
35 | 36 | private final ListenableFuture<Void> rateLimitFuture; |
36 | 37 | |
37 | 38 | public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) { |
38 | - this.rateLimitFuture = rateLimiter.acquireAsync(); | |
39 | + this.rateLimitFuture = Futures.withFallback(rateLimiter.acquireAsync(), t -> { | |
40 | + if (!(t instanceof BufferLimitException)) { | |
41 | + rateLimiter.release(); | |
42 | + } | |
43 | + return Futures.immediateFailedFuture(t); | |
44 | + }); | |
39 | 45 | this.originalFuture = Futures.transform(rateLimitFuture, |
40 | 46 | (Function<Void, ResultSetFuture>) i -> executeAsyncWithRelease(rateLimiter, session, statement)); |
47 | + | |
41 | 48 | } |
42 | 49 | |
43 | 50 | @Override |
... | ... | @@ -108,10 +115,7 @@ public class RateLimitedResultSetFuture implements ResultSetFuture { |
108 | 115 | try { |
109 | 116 | ResultSetFuture resultSetFuture = Uninterruptibles.getUninterruptibly(originalFuture); |
110 | 117 | resultSetFuture.addListener(listener, executor); |
111 | - } catch (CancellationException e) { | |
112 | - cancel(false); | |
113 | - return; | |
114 | - } catch (ExecutionException e) { | |
118 | + } catch (CancellationException | ExecutionException e) { | |
115 | 119 | Futures.immediateFailedFuture(e).addListener(listener, executor); |
116 | 120 | } |
117 | 121 | }, executor); | ... | ... |
... | ... | @@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j; |
23 | 23 | import org.springframework.beans.factory.annotation.Value; |
24 | 24 | import org.springframework.scheduling.annotation.Scheduled; |
25 | 25 | import org.springframework.stereotype.Component; |
26 | +import org.thingsboard.server.dao.exception.BufferLimitException; | |
26 | 27 | |
27 | 28 | import java.util.concurrent.*; |
28 | 29 | import java.util.concurrent.atomic.AtomicInteger; |
... | ... | @@ -41,6 +42,9 @@ public class BufferedRateLimiter implements AsyncRateLimiter { |
41 | 42 | |
42 | 43 | private final AtomicInteger maxQueueSize = new AtomicInteger(); |
43 | 44 | private final AtomicInteger maxGrantedPermissions = new AtomicInteger(); |
45 | + private final AtomicInteger totalGranted = new AtomicInteger(); | |
46 | + private final AtomicInteger totalReleased = new AtomicInteger(); | |
47 | + private final AtomicInteger totalRequested = new AtomicInteger(); | |
44 | 48 | |
45 | 49 | public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit, |
46 | 50 | @Value("${cassandra.query.concurrent_limit}") int permitsLimit, |
... | ... | @@ -53,11 +57,13 @@ public class BufferedRateLimiter implements AsyncRateLimiter { |
53 | 57 | |
54 | 58 | @Override |
55 | 59 | public ListenableFuture<Void> acquireAsync() { |
60 | + totalRequested.incrementAndGet(); | |
56 | 61 | if (queue.isEmpty()) { |
57 | 62 | if (permits.incrementAndGet() <= permitsLimit) { |
58 | 63 | if (permits.get() > maxGrantedPermissions.get()) { |
59 | 64 | maxGrantedPermissions.set(permits.get()); |
60 | 65 | } |
66 | + totalGranted.incrementAndGet(); | |
61 | 67 | return Futures.immediateFuture(null); |
62 | 68 | } |
63 | 69 | permits.decrementAndGet(); |
... | ... | @@ -69,6 +75,7 @@ public class BufferedRateLimiter implements AsyncRateLimiter { |
69 | 75 | @Override |
70 | 76 | public void release() { |
71 | 77 | permits.decrementAndGet(); |
78 | + totalReleased.incrementAndGet(); | |
72 | 79 | reprocessQueue(); |
73 | 80 | } |
74 | 81 | |
... | ... | @@ -80,6 +87,7 @@ public class BufferedRateLimiter implements AsyncRateLimiter { |
80 | 87 | } |
81 | 88 | LockedFuture lockedFuture = queue.poll(); |
82 | 89 | if (lockedFuture != null) { |
90 | + totalGranted.incrementAndGet(); | |
83 | 91 | lockedFuture.latch.countDown(); |
84 | 92 | } else { |
85 | 93 | permits.decrementAndGet(); |
... | ... | @@ -112,17 +120,20 @@ public class BufferedRateLimiter implements AsyncRateLimiter { |
112 | 120 | LockedFuture lockedFuture = createLockedFuture(); |
113 | 121 | if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) { |
114 | 122 | lockedFuture.cancelFuture(); |
115 | - return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject")); | |
123 | + return Futures.immediateFailedFuture(new BufferLimitException()); | |
124 | + } | |
125 | + if(permits.get() < permitsLimit) { | |
126 | + reprocessQueue(); | |
116 | 127 | } |
117 | 128 | if(permits.get() < permitsLimit) { |
118 | 129 | reprocessQueue(); |
119 | 130 | } |
120 | 131 | return lockedFuture.future; |
121 | 132 | } catch (InterruptedException e) { |
122 | - return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject")); | |
133 | + return Futures.immediateFailedFuture(new BufferLimitException()); | |
123 | 134 | } |
124 | 135 | } |
125 | - return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject")); | |
136 | + return Futures.immediateFailedFuture(new BufferLimitException()); | |
126 | 137 | } |
127 | 138 | |
128 | 139 | @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") |
... | ... | @@ -134,8 +145,11 @@ public class BufferedRateLimiter implements AsyncRateLimiter { |
134 | 145 | expiredCount++; |
135 | 146 | } |
136 | 147 | } |
137 | - log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}] current granted [{}]", maxQueueSize.getAndSet(0), | |
138 | - maxGrantedPermissions.getAndSet(0), expiredCount, permits.get()); | |
148 | + log.info("Permits maxBuffer [{}] maxPermits [{}] expired [{}] currPermits [{}] currBuffer [{}] " + | |
149 | + "totalPermits [{}] totalRequests [{}] totalReleased [{}]", | |
150 | + maxQueueSize.getAndSet(0), maxGrantedPermissions.getAndSet(0), expiredCount, | |
151 | + permits.get(), queue.size(), | |
152 | + totalGranted.getAndSet(0), totalRequested.getAndSet(0), totalReleased.getAndSet(0)); | |
139 | 153 | } |
140 | 154 | |
141 | 155 | private class LockedFuture { | ... | ... |
... | ... | @@ -19,16 +19,17 @@ import com.datastax.driver.core.*; |
19 | 19 | import com.datastax.driver.core.exceptions.UnsupportedFeatureException; |
20 | 20 | import com.google.common.util.concurrent.Futures; |
21 | 21 | import com.google.common.util.concurrent.ListenableFuture; |
22 | +import com.google.common.util.concurrent.MoreExecutors; | |
22 | 23 | import org.junit.Test; |
23 | 24 | import org.junit.runner.RunWith; |
24 | 25 | import org.mockito.Mock; |
25 | 26 | import org.mockito.Mockito; |
26 | 27 | import org.mockito.runners.MockitoJUnitRunner; |
27 | 28 | import org.mockito.stubbing.Answer; |
29 | +import org.thingsboard.server.dao.exception.BufferLimitException; | |
28 | 30 | import org.thingsboard.server.dao.util.AsyncRateLimiter; |
29 | 31 | |
30 | -import java.util.concurrent.ExecutionException; | |
31 | -import java.util.concurrent.TimeoutException; | |
32 | +import java.util.concurrent.*; | |
32 | 33 | |
33 | 34 | import static org.junit.Assert.*; |
34 | 35 | import static org.mockito.Mockito.*; |
... | ... | @@ -53,7 +54,7 @@ public class RateLimitedResultSetFutureTest { |
53 | 54 | |
54 | 55 | @Test |
55 | 56 | public void doNotReleasePermissionIfRateLimitFutureFailed() throws InterruptedException { |
56 | - when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new IllegalArgumentException())); | |
57 | + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new BufferLimitException())); | |
57 | 58 | resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); |
58 | 59 | Thread.sleep(1000L); |
59 | 60 | verify(rateLimiter).acquireAsync(); |
... | ... | @@ -153,4 +154,29 @@ public class RateLimitedResultSetFutureTest { |
153 | 154 | verify(rateLimiter, times(1)).release(); |
154 | 155 | } |
155 | 156 | |
157 | + @Test | |
158 | + public void expiredQueryReturnPermit() throws InterruptedException, ExecutionException { | |
159 | + CountDownLatch latch = new CountDownLatch(1); | |
160 | + ListenableFuture<Void> future = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)).submit(() -> { | |
161 | + latch.await(); | |
162 | + return null; | |
163 | + }); | |
164 | + when(rateLimiter.acquireAsync()).thenReturn(future); | |
165 | + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); | |
166 | + | |
167 | + ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one); | |
168 | +// TimeUnit.MILLISECONDS.sleep(200); | |
169 | + future.cancel(false); | |
170 | + latch.countDown(); | |
171 | + | |
172 | + try { | |
173 | + transform.get(); | |
174 | + fail(); | |
175 | + } catch (Exception e) { | |
176 | + assertTrue(e instanceof ExecutionException); | |
177 | + } | |
178 | + verify(rateLimiter, times(1)).acquireAsync(); | |
179 | + verify(rateLimiter, times(1)).release(); | |
180 | + } | |
181 | + | |
156 | 182 | } |
\ No newline at end of file | ... | ... |
... | ... | @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.util; |
17 | 17 | |
18 | 18 | import com.google.common.util.concurrent.*; |
19 | 19 | import org.junit.Test; |
20 | +import org.thingsboard.server.dao.exception.BufferLimitException; | |
20 | 21 | |
21 | 22 | import javax.annotation.Nullable; |
22 | 23 | import java.util.concurrent.ExecutionException; |
... | ... | @@ -61,8 +62,8 @@ public class BufferedRateLimiterTest { |
61 | 62 | } catch (Exception e) { |
62 | 63 | assertTrue(e instanceof ExecutionException); |
63 | 64 | Throwable actualCause = e.getCause(); |
64 | - assertTrue(actualCause instanceof IllegalStateException); | |
65 | - assertEquals("Rate Limit Buffer is full. Reject", actualCause.getMessage()); | |
65 | + assertTrue(actualCause instanceof BufferLimitException); | |
66 | + assertEquals("Rate Limit Buffer is full", actualCause.getMessage()); | |
66 | 67 | } |
67 | 68 | } |
68 | 69 | ... | ... |