Commit b773ee0c66742cdef3223009a04845ef70858928

Authored by Andrew Shvayka
1 parent 39e6b904

Rate limits draft

@@ -134,42 +134,58 @@ public class LocalTransportService extends AbstractTransportService implements R @@ -134,42 +134,58 @@ public class LocalTransportService extends AbstractTransportService implements R
134 134
135 @Override 135 @Override
136 public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) { 136 public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
137 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback); 137 + if (checkLimits(sessionInfo, callback)) {
  138 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
  139 + }
138 } 140 }
139 141
140 @Override 142 @Override
141 public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) { 143 public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
142 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback); 144 + if (checkLimits(sessionInfo, callback)) {
  145 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback);
  146 + }
143 } 147 }
144 148
145 @Override 149 @Override
146 public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) { 150 public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
147 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback); 151 + if (checkLimits(sessionInfo, callback)) {
  152 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback);
  153 + }
148 } 154 }
149 155
150 @Override 156 @Override
151 public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) { 157 public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
152 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback); 158 + if (checkLimits(sessionInfo, callback)) {
  159 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);
  160 + }
153 } 161 }
154 162
155 @Override 163 @Override
156 public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) { 164 public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
157 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback); 165 + if (checkLimits(sessionInfo, callback)) {
  166 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
  167 + }
158 } 168 }
159 169
160 @Override 170 @Override
161 public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) { 171 public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
162 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback); 172 + if (checkLimits(sessionInfo, callback)) {
  173 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);
  174 + }
163 } 175 }
164 176
165 @Override 177 @Override
166 public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) { 178 public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
167 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback); 179 + if (checkLimits(sessionInfo, callback)) {
  180 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);
  181 + }
168 } 182 }
169 183
170 @Override 184 @Override
171 public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) { 185 public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
172 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback); 186 + if (checkLimits(sessionInfo, callback)) {
  187 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
  188 + }
173 } 189 }
174 190
175 @Override 191 @Override
@@ -444,7 +444,7 @@ transport: @@ -444,7 +444,7 @@ transport:
444 bind_port: "${MQTT_BIND_PORT:1883}" 444 bind_port: "${MQTT_BIND_PORT:1883}"
445 timeout: "${MQTT_TIMEOUT:10000}" 445 timeout: "${MQTT_TIMEOUT:10000}"
446 netty: 446 netty:
447 - leak_detector_level: "${NETTY_LEASK_DETECTOR_LVL:DISABLED}" 447 + leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"
448 boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}" 448 boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"
449 worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}" 449 worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"
450 max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}" 450 max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}"
@@ -249,7 +249,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -249,7 +249,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
249 @Override 249 @Override
250 public void onError(Throwable e) { 250 public void onError(Throwable e) {
251 log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e); 251 log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
252 - ctx.close(); 252 + processDisconnect(ctx);
253 } 253 }
254 }; 254 };
255 } 255 }
@@ -99,6 +99,10 @@ @@ -99,6 +99,10 @@
99 <groupId>com.google.protobuf</groupId> 99 <groupId>com.google.protobuf</groupId>
100 <artifactId>protobuf-java</artifactId> 100 <artifactId>protobuf-java</artifactId>
101 </dependency> 101 </dependency>
  102 + <dependency>
  103 + <groupId>com.github.vladimir-bukhtoyarov</groupId>
  104 + <artifactId>bucket4j-core</artifactId>
  105 + </dependency>
102 </dependencies> 106 </dependencies>
103 107
104 <build> 108 <build>
@@ -43,6 +43,8 @@ public interface TransportService { @@ -43,6 +43,8 @@ public interface TransportService {
43 void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg msg, 43 void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg msg,
44 TransportServiceCallback<TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg> callback); 44 TransportServiceCallback<TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg> callback);
45 45
  46 + boolean checkLimits(SessionInfoProto sessionInfo, TransportServiceCallback<Void> callback);
  47 +
46 void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback); 48 void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback);
47 49
48 void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback); 50 void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
@@ -16,8 +16,13 @@ @@ -16,8 +16,13 @@
16 package org.thingsboard.server.common.transport.service; 16 package org.thingsboard.server.common.transport.service;
17 17
18 import lombok.extern.slf4j.Slf4j; 18 import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.beans.factory.annotation.Value;
  20 +import org.thingsboard.server.common.data.EntityType;
  21 +import org.thingsboard.server.common.data.id.DeviceId;
  22 +import org.thingsboard.server.common.data.id.TenantId;
19 import org.thingsboard.server.common.transport.SessionMsgListener; 23 import org.thingsboard.server.common.transport.SessionMsgListener;
20 import org.thingsboard.server.common.transport.TransportService; 24 import org.thingsboard.server.common.transport.TransportService;
  25 +import org.thingsboard.server.common.transport.TransportServiceCallback;
21 import org.thingsboard.server.gen.transport.TransportProtos; 26 import org.thingsboard.server.gen.transport.TransportProtos;
22 27
23 import java.util.UUID; 28 import java.util.UUID;
@@ -36,9 +41,20 @@ import java.util.concurrent.TimeUnit; @@ -36,9 +41,20 @@ import java.util.concurrent.TimeUnit;
36 @Slf4j 41 @Slf4j
37 public abstract class AbstractTransportService implements TransportService { 42 public abstract class AbstractTransportService implements TransportService {
38 43
  44 + @Value("${transport.rate_limits.enabled}")
  45 + private boolean rateLimitEnabled;
  46 + @Value("${transport.rate_limits.tenant}")
  47 + private String perTenantLimitsConf;
  48 + @Value("${transport.rate_limits.tenant}")
  49 + private String perDevicesLimitsConf;
  50 +
39 protected ScheduledExecutorService schedulerExecutor; 51 protected ScheduledExecutorService schedulerExecutor;
40 protected ExecutorService transportCallbackExecutor; 52 protected ExecutorService transportCallbackExecutor;
41 - protected ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>(); 53 + private ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
  54 +
  55 + //TODO: Implement cleanup of this maps.
  56 + private ConcurrentMap<TenantId, TbTransportRateLimits> perTenantLimits = new ConcurrentHashMap<>();
  57 + private ConcurrentMap<DeviceId, TbTransportRateLimits> perDeviceLimits = new ConcurrentHashMap<>();
42 58
43 @Override 59 @Override
44 public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) { 60 public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) {
@@ -53,7 +69,6 @@ public abstract class AbstractTransportService implements TransportService { @@ -53,7 +69,6 @@ public abstract class AbstractTransportService implements TransportService {
53 listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance()); 69 listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
54 deregisterSession(sessionInfo); 70 deregisterSession(sessionInfo);
55 }, timeout, TimeUnit.MILLISECONDS); 71 }, timeout, TimeUnit.MILLISECONDS);
56 -  
57 } 72 }
58 73
59 @Override 74 @Override
@@ -61,6 +76,30 @@ public abstract class AbstractTransportService implements TransportService { @@ -61,6 +76,30 @@ public abstract class AbstractTransportService implements TransportService {
61 sessions.remove(toId(sessionInfo)); 76 sessions.remove(toId(sessionInfo));
62 } 77 }
63 78
  79 + @Override
  80 + public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, TransportServiceCallback<Void> callback) {
  81 + if (!rateLimitEnabled) {
  82 + return true;
  83 + }
  84 + TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
  85 + TbTransportRateLimits rateLimits = perTenantLimits.computeIfAbsent(tenantId, id -> new TbTransportRateLimits(perTenantLimitsConf));
  86 + if (!rateLimits.tryConsume()) {
  87 + if (callback != null) {
  88 + callback.onError(new TbRateLimitsException(EntityType.TENANT));
  89 + }
  90 + return false;
  91 + }
  92 + DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
  93 + rateLimits = perDeviceLimits.computeIfAbsent(deviceId, id -> new TbTransportRateLimits(perDevicesLimitsConf));
  94 + if (!rateLimits.tryConsume()) {
  95 + if (callback != null) {
  96 + callback.onError(new TbRateLimitsException(EntityType.DEVICE));
  97 + }
  98 + return false;
  99 + }
  100 + return true;
  101 + }
  102 +
64 protected void processToTransportMsg(TransportProtos.DeviceActorToTransportMsg toSessionMsg) { 103 protected void processToTransportMsg(TransportProtos.DeviceActorToTransportMsg toSessionMsg) {
65 UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB()); 104 UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
66 SessionMetaData md = sessions.get(sessionId); 105 SessionMetaData md = sessions.get(sessionId);
@@ -101,11 +140,20 @@ public abstract class AbstractTransportService implements TransportService { @@ -101,11 +140,20 @@ public abstract class AbstractTransportService implements TransportService {
101 } 140 }
102 141
103 public void init() { 142 public void init() {
  143 + if (rateLimitEnabled) {
  144 + //Just checking the configuration parameters
  145 + new TbTransportRateLimits(perTenantLimitsConf);
  146 + new TbTransportRateLimits(perDevicesLimitsConf);
  147 + }
104 this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(); 148 this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor();
105 this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); 149 this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
106 } 150 }
107 151
108 public void destroy() { 152 public void destroy() {
  153 + if (rateLimitEnabled) {
  154 + perTenantLimits.clear();
  155 + perDeviceLimits.clear();
  156 + }
109 if (schedulerExecutor != null) { 157 if (schedulerExecutor != null) {
110 schedulerExecutor.shutdownNow(); 158 schedulerExecutor.shutdownNow();
111 } 159 }
@@ -218,74 +218,90 @@ public class RemoteTransportService extends AbstractTransportService { @@ -218,74 +218,90 @@ public class RemoteTransportService extends AbstractTransportService {
218 218
219 @Override 219 @Override
220 public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) { 220 public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
221 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
222 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
223 - .setSessionEvent(msg).build()  
224 - ).build();  
225 - send(sessionInfo, toRuleEngineMsg, callback); 221 + if (checkLimits(sessionInfo, callback)) {
  222 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  223 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  224 + .setSessionEvent(msg).build()
  225 + ).build();
  226 + send(sessionInfo, toRuleEngineMsg, callback);
  227 + }
226 } 228 }
227 229
228 @Override 230 @Override
229 public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) { 231 public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
230 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
231 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
232 - .setPostTelemetry(msg).build()  
233 - ).build();  
234 - send(sessionInfo, toRuleEngineMsg, callback); 232 + if (checkLimits(sessionInfo, callback)) {
  233 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  234 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  235 + .setPostTelemetry(msg).build()
  236 + ).build();
  237 + send(sessionInfo, toRuleEngineMsg, callback);
  238 + }
235 } 239 }
236 240
237 @Override 241 @Override
238 public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) { 242 public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
239 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
240 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
241 - .setPostAttributes(msg).build()  
242 - ).build();  
243 - send(sessionInfo, toRuleEngineMsg, callback); 243 + if (checkLimits(sessionInfo, callback)) {
  244 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  245 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  246 + .setPostAttributes(msg).build()
  247 + ).build();
  248 + send(sessionInfo, toRuleEngineMsg, callback);
  249 + }
244 } 250 }
245 251
246 @Override 252 @Override
247 public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) { 253 public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
248 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
249 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
250 - .setGetAttributes(msg).build()  
251 - ).build();  
252 - send(sessionInfo, toRuleEngineMsg, callback); 254 + if (checkLimits(sessionInfo, callback)) {
  255 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  256 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  257 + .setGetAttributes(msg).build()
  258 + ).build();
  259 + send(sessionInfo, toRuleEngineMsg, callback);
  260 + }
253 } 261 }
254 262
255 @Override 263 @Override
256 public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) { 264 public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
257 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
258 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
259 - .setSubscribeToAttributes(msg).build()  
260 - ).build();  
261 - send(sessionInfo, toRuleEngineMsg, callback); 265 + if (checkLimits(sessionInfo, callback)) {
  266 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  267 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  268 + .setSubscribeToAttributes(msg).build()
  269 + ).build();
  270 + send(sessionInfo, toRuleEngineMsg, callback);
  271 + }
262 } 272 }
263 273
264 @Override 274 @Override
265 public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) { 275 public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
266 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
267 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
268 - .setSubscribeToRPC(msg).build()  
269 - ).build();  
270 - send(sessionInfo, toRuleEngineMsg, callback); 276 + if (checkLimits(sessionInfo, callback)) {
  277 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  278 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  279 + .setSubscribeToRPC(msg).build()
  280 + ).build();
  281 + send(sessionInfo, toRuleEngineMsg, callback);
  282 + }
271 } 283 }
272 284
273 @Override 285 @Override
274 public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) { 286 public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
275 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
276 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
277 - .setToDeviceRPCCallResponse(msg).build()  
278 - ).build();  
279 - send(sessionInfo, toRuleEngineMsg, callback); 287 + if (checkLimits(sessionInfo, callback)) {
  288 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  289 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  290 + .setToDeviceRPCCallResponse(msg).build()
  291 + ).build();
  292 + send(sessionInfo, toRuleEngineMsg, callback);
  293 + }
280 } 294 }
281 295
282 @Override 296 @Override
283 public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) { 297 public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
284 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
285 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
286 - .setToServerRPCCallRequest(msg).build()  
287 - ).build();  
288 - send(sessionInfo, toRuleEngineMsg, callback); 298 + if (checkLimits(sessionInfo, callback)) {
  299 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  300 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  301 + .setToServerRPCCallRequest(msg).build()
  302 + ).build();
  303 + send(sessionInfo, toRuleEngineMsg, callback);
  304 + }
289 } 305 }
290 306
291 private static class TransportCallbackAdaptor implements Callback { 307 private static class TransportCallbackAdaptor implements Callback {
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.transport.service;
  17 +
  18 +import org.thingsboard.server.common.data.EntityType;
  19 +
  20 +/**
  21 + * Created by ashvayka on 22.10.18.
  22 + */
  23 +public class TbRateLimitsException extends Exception {
  24 + private final EntityType entityType;
  25 +
  26 + TbRateLimitsException(EntityType entityType) {
  27 + this.entityType = entityType;
  28 + }
  29 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.transport.service;
  17 +
  18 +import io.github.bucket4j.Bandwidth;
  19 +import io.github.bucket4j.Bucket4j;
  20 +import io.github.bucket4j.local.LocalBucket;
  21 +import io.github.bucket4j.local.LocalBucketBuilder;
  22 +
  23 +import java.time.Duration;
  24 +
  25 +/**
  26 + * Created by ashvayka on 22.10.18.
  27 + */
  28 +class TbTransportRateLimits {
  29 + private final LocalBucket bucket;
  30 +
  31 + public TbTransportRateLimits(String limitsConfiguration) {
  32 + LocalBucketBuilder builder = Bucket4j.builder();
  33 + boolean initialized = false;
  34 + for (String limitSrc : limitsConfiguration.split(",")) {
  35 + long capacity = Long.parseLong(limitSrc.split(":")[0]);
  36 + long duration = Long.parseLong(limitSrc.split(":")[1]);
  37 + builder.addLimit(Bandwidth.simple(capacity, Duration.ofSeconds(duration)));
  38 + initialized = true;
  39 + }
  40 + if (initialized) {
  41 + bucket = builder.build();
  42 + } else {
  43 + throw new IllegalArgumentException("Failed to parse rate limits configuration: " + limitsConfiguration);
  44 + }
  45 +
  46 +
  47 + }
  48 +
  49 + boolean tryConsume() {
  50 + return bucket.tryConsume(1);
  51 + }
  52 +
  53 +}
@@ -82,6 +82,7 @@ @@ -82,6 +82,7 @@
82 <elasticsearch.version>5.0.2</elasticsearch.version> 82 <elasticsearch.version>5.0.2</elasticsearch.version>
83 <delight-nashorn-sandbox.version>0.1.14</delight-nashorn-sandbox.version> 83 <delight-nashorn-sandbox.version>0.1.14</delight-nashorn-sandbox.version>
84 <kafka.version>2.0.0</kafka.version> 84 <kafka.version>2.0.0</kafka.version>
  85 + <bucket4j.version>4.1.1</bucket4j.version>
85 </properties> 86 </properties>
86 87
87 <modules> 88 <modules>
@@ -778,6 +779,11 @@ @@ -778,6 +779,11 @@
778 <artifactId>delight-nashorn-sandbox</artifactId> 779 <artifactId>delight-nashorn-sandbox</artifactId>
779 <version>${delight-nashorn-sandbox.version}</version> 780 <version>${delight-nashorn-sandbox.version}</version>
780 </dependency> 781 </dependency>
  782 + <dependency>
  783 + <groupId>com.github.vladimir-bukhtoyarov</groupId>
  784 + <artifactId>bucket4j-core</artifactId>
  785 + <version>${bucket4j.version}</version>
  786 + </dependency>
781 </dependencies> 787 </dependencies>
782 </dependencyManagement> 788 </dependencyManagement>
783 789
@@ -25,7 +25,7 @@ transport: @@ -25,7 +25,7 @@ transport:
25 adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}" 25 adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
26 timeout: "${MQTT_TIMEOUT:10000}" 26 timeout: "${MQTT_TIMEOUT:10000}"
27 netty: 27 netty:
28 - leak_detector_level: "${NETTY_LEASK_DETECTOR_LVL:DISABLED}" 28 + leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"
29 boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}" 29 boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"
30 worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}" 30 worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"
31 max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}" 31 max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}"
@@ -43,6 +43,13 @@ transport: @@ -43,6 +43,13 @@ transport:
43 key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}" 43 key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}"
44 # Type of the key store 44 # Type of the key store
45 key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}" 45 key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"
  46 + sessions:
  47 + max_per_tenant: "${TB_TRANSPORT_SESSIONS_MAX_PER_TENANT:1000}"
  48 + max_per_device: "${TB_TRANSPORT_SESSIONS_MAX_PER_DEVICE:2}"
  49 + rate_limits:
  50 + enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
  51 + tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
  52 + device: "${TB_TRANSPORT_RATE_LIMITS_DEVICE:10:1,300:60}"
46 53
47 #Quota parameters 54 #Quota parameters
48 quota: 55 quota: