Commit d80f666b8f64cfe807c94ab6909e5e62f3e66e92

Authored by Andrew Shvayka
1 parent 1b48704c

Added rate limits for websocket updates and REST API

  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.config;
  17 +
  18 +import org.springframework.beans.factory.annotation.Autowired;
  19 +import org.springframework.beans.factory.annotation.Value;
  20 +import org.springframework.security.core.Authentication;
  21 +import org.springframework.security.core.context.SecurityContextHolder;
  22 +import org.springframework.stereotype.Component;
  23 +import org.springframework.web.filter.GenericFilterBean;
  24 +import org.thingsboard.server.common.data.EntityType;
  25 +import org.thingsboard.server.common.data.id.CustomerId;
  26 +import org.thingsboard.server.common.data.id.TenantId;
  27 +import org.thingsboard.server.common.msg.tools.TbRateLimits;
  28 +import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
  29 +import org.thingsboard.server.exception.ThingsboardErrorResponseHandler;
  30 +import org.thingsboard.server.service.security.model.SecurityUser;
  31 +
  32 +import javax.servlet.FilterChain;
  33 +import javax.servlet.ServletException;
  34 +import javax.servlet.ServletRequest;
  35 +import javax.servlet.ServletResponse;
  36 +import javax.servlet.http.HttpServletResponse;
  37 +import java.io.IOException;
  38 +import java.util.concurrent.ConcurrentHashMap;
  39 +import java.util.concurrent.ConcurrentMap;
  40 +
  41 +@Component
  42 +public class RateLimitProcessingFilter extends GenericFilterBean {
  43 +
  44 + @Value("${server.rest.limits.tenant.enabled:false}")
  45 + private boolean perTenantLimitsEnabled;
  46 + @Value("${server.rest.limits.tenant.configuration:}")
  47 + private String perTenantLimitsConfiguration;
  48 + @Value("${server.rest.limits.customer.enabled:false}")
  49 + private boolean perCustomerLimitsEnabled;
  50 + @Value("${server.rest.limits.customer.configuration:}")
  51 + private String perCustomerLimitsConfiguration;
  52 +
  53 + @Autowired
  54 + private ThingsboardErrorResponseHandler errorResponseHandler;
  55 +
  56 + private ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
  57 + private ConcurrentMap<CustomerId, TbRateLimits> perCustomerLimits = new ConcurrentHashMap<>();
  58 +
  59 + @Override
  60 + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
  61 + SecurityUser user = getCurrentUser();
  62 + if (user != null && !user.isSystemAdmin()) {
  63 + if (perTenantLimitsEnabled) {
  64 + TbRateLimits rateLimits = perTenantLimits.computeIfAbsent(user.getTenantId(), id -> new TbRateLimits(perTenantLimitsConfiguration));
  65 + if (!rateLimits.tryConsume()) {
  66 + errorResponseHandler.handle(new TbRateLimitsException(EntityType.TENANT), (HttpServletResponse) response);
  67 + return;
  68 + }
  69 + }
  70 + if (perCustomerLimitsEnabled && user.isCustomerUser()) {
  71 + TbRateLimits rateLimits = perCustomerLimits.computeIfAbsent(user.getCustomerId(), id -> new TbRateLimits(perCustomerLimitsConfiguration));
  72 + if (!rateLimits.tryConsume()) {
  73 + errorResponseHandler.handle(new TbRateLimitsException(EntityType.CUSTOMER), (HttpServletResponse) response);
  74 + return;
  75 + }
  76 + }
  77 + }
  78 + chain.doFilter(request, response);
  79 + }
  80 +
  81 + protected SecurityUser getCurrentUser() {
  82 + Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
  83 + if (authentication != null && authentication.getPrincipal() instanceof SecurityUser) {
  84 + return (SecurityUser) authentication.getPrincipal();
  85 + } else {
  86 + return null;
  87 + }
  88 + }
  89 +
  90 +}
... ...
... ... @@ -91,6 +91,8 @@ public class ThingsboardSecurityConfiguration extends WebSecurityConfigurerAdapt
91 91
92 92 @Autowired private ObjectMapper objectMapper;
93 93
  94 + @Autowired private RateLimitProcessingFilter rateLimitProcessingFilter;
  95 +
94 96 @Bean
95 97 protected RestLoginProcessingFilter buildRestLoginProcessingFilter() throws Exception {
96 98 RestLoginProcessingFilter filter = new RestLoginProcessingFilter(FORM_BASED_LOGIN_ENTRY_POINT, successHandler, failureHandler, objectMapper);
... ... @@ -186,7 +188,8 @@ public class ThingsboardSecurityConfiguration extends WebSecurityConfigurerAdapt
186 188 .addFilterBefore(buildRestPublicLoginProcessingFilter(), UsernamePasswordAuthenticationFilter.class)
187 189 .addFilterBefore(buildJwtTokenAuthenticationProcessingFilter(), UsernamePasswordAuthenticationFilter.class)
188 190 .addFilterBefore(buildRefreshTokenProcessingFilter(), UsernamePasswordAuthenticationFilter.class)
189   - .addFilterBefore(buildWsJwtTokenAuthenticationProcessingFilter(), UsernamePasswordAuthenticationFilter.class);
  191 + .addFilterBefore(buildWsJwtTokenAuthenticationProcessingFilter(), UsernamePasswordAuthenticationFilter.class)
  192 + .addFilterAfter(rateLimitProcessingFilter, UsernamePasswordAuthenticationFilter.class);
190 193 }
191 194
192 195
... ...
... ... @@ -52,6 +52,7 @@ import org.thingsboard.server.common.msg.TbMsgDataType;
52 52 import org.thingsboard.server.common.msg.TbMsgMetaData;
53 53 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
54 54 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
  55 +import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
55 56 import org.thingsboard.server.dao.alarm.AlarmService;
56 57 import org.thingsboard.server.dao.asset.AssetService;
57 58 import org.thingsboard.server.dao.attributes.AttributesService;
... ...
... ... @@ -19,15 +19,17 @@ import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.beans.factory.BeanCreationNotAllowedException;
20 20 import org.springframework.beans.factory.annotation.Autowired;
21 21 import org.springframework.beans.factory.annotation.Value;
22   -import org.springframework.scheduling.annotation.Scheduled;
23 22 import org.springframework.stereotype.Service;
  23 +import org.springframework.util.StringUtils;
24 24 import org.springframework.web.socket.CloseStatus;
25 25 import org.springframework.web.socket.TextMessage;
26 26 import org.springframework.web.socket.WebSocketSession;
27 27 import org.springframework.web.socket.handler.TextWebSocketHandler;
  28 +import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
28 29 import org.thingsboard.server.common.data.id.CustomerId;
29 30 import org.thingsboard.server.common.data.id.TenantId;
30 31 import org.thingsboard.server.common.data.id.UserId;
  32 +import org.thingsboard.server.common.msg.tools.TbRateLimits;
31 33 import org.thingsboard.server.config.WebSocketConfiguration;
32 34 import org.thingsboard.server.service.security.model.SecurityUser;
33 35 import org.thingsboard.server.service.security.model.UserPrincipal;
... ... @@ -63,11 +65,17 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
63 65 @Value("${server.ws.limits.max_sessions_per_public_user:0}")
64 66 private int maxSessionsPerPublicUser;
65 67
  68 + @Value("${server.ws.limits.max_updates_per_session:}")
  69 + private String perSessionUpdatesConfiguration;
  70 +
  71 + private ConcurrentMap<String, TelemetryWebSocketSessionRef> blacklistedSessions = new ConcurrentHashMap<>();
  72 + private ConcurrentMap<String, TbRateLimits> perSessionUpdateLimits = new ConcurrentHashMap<>();
  73 +
66 74 private ConcurrentMap<TenantId, Set<String>> tenantSessionsMap = new ConcurrentHashMap<>();
67 75 private ConcurrentMap<CustomerId, Set<String>> customerSessionsMap = new ConcurrentHashMap<>();
68 76 private ConcurrentMap<UserId, Set<String>> regularUserSessionsMap = new ConcurrentHashMap<>();
69 77 private ConcurrentMap<UserId, Set<String>> publicUserSessionsMap = new ConcurrentHashMap<>();
70   -
  78 +
71 79 @Override
72 80 public void handleTextMessage(WebSocketSession session, TextMessage message) {
73 81 try {
... ... @@ -168,13 +176,29 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
168 176 }
169 177
170 178 @Override
171   - public void send(TelemetryWebSocketSessionRef sessionRef, String msg) throws IOException {
  179 + public void send(TelemetryWebSocketSessionRef sessionRef, int subscriptionId, String msg) throws IOException {
172 180 String externalId = sessionRef.getSessionId();
173 181 log.debug("[{}] Processing {}", externalId, msg);
174 182 String internalId = externalSessionMap.get(externalId);
175 183 if (internalId != null) {
176 184 SessionMetaData sessionMd = internalSessionMap.get(internalId);
177 185 if (sessionMd != null) {
  186 + if (!StringUtils.isEmpty(perSessionUpdatesConfiguration)) {
  187 + TbRateLimits rateLimits = perSessionUpdateLimits.computeIfAbsent(sessionRef.getSessionId(), sid -> new TbRateLimits(perSessionUpdatesConfiguration));
  188 + if (!rateLimits.tryConsume()) {
  189 + if (blacklistedSessions.putIfAbsent(externalId, sessionRef) == null) {
  190 + log.info("[{}][{}][{}] Failed to process session update. Max session updates limit reached"
  191 + , sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), externalId);
  192 + synchronized (sessionMd) {
  193 + sessionMd.session.sendMessage(new TextMessage("{\"subscriptionId\":" + subscriptionId + ", \"errorCode\":" + ThingsboardErrorCode.TOO_MANY_UPDATES.getErrorCode() + ", \"errorMsg\":\"Too many updates!\"}"));
  194 + }
  195 + }
  196 + return;
  197 + } else {
  198 + log.debug("[{}][{}][{}] Session is no longer blacklisted.", sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), externalId);
  199 + blacklistedSessions.remove(externalId);
  200 + }
  201 + }
178 202 synchronized (sessionMd) {
179 203 sessionMd.session.sendMessage(new TextMessage(msg));
180 204 }
... ... @@ -186,12 +210,6 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
186 210 }
187 211 }
188 212
189   -
190   - @Override
191   - public void close(TelemetryWebSocketSessionRef sessionRef) throws IOException {
192   - close(sessionRef, CloseStatus.NORMAL);
193   - }
194   -
195 213 @Override
196 214 public void close(TelemetryWebSocketSessionRef sessionRef, CloseStatus reason) throws IOException {
197 215 String externalId = sessionRef.getSessionId();
... ... @@ -271,6 +289,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
271 289
272 290 private void cleanupLimits(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) {
273 291 String sessionId = session.getId();
  292 + perSessionUpdateLimits.remove(sessionRef.getSessionId());
  293 + blacklistedSessions.remove(sessionRef.getSessionId());
274 294 if (maxSessionsPerTenant > 0) {
275 295 Set<String> tenantSessions = tenantSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
276 296 synchronized (tenantSessions) {
... ...
... ... @@ -25,8 +25,10 @@ import org.springframework.security.authentication.BadCredentialsException;
25 25 import org.springframework.security.core.AuthenticationException;
26 26 import org.springframework.security.web.access.AccessDeniedHandler;
27 27 import org.springframework.stereotype.Component;
  28 +import org.thingsboard.server.common.data.EntityType;
28 29 import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
29 30 import org.thingsboard.server.common.data.exception.ThingsboardException;
  31 +import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
30 32 import org.thingsboard.server.service.security.exception.AuthMethodNotSupportedException;
31 33 import org.thingsboard.server.service.security.exception.JwtExpiredTokenException;
32 34
... ... @@ -34,6 +36,7 @@ import javax.servlet.ServletException;
34 36 import javax.servlet.http.HttpServletRequest;
35 37 import javax.servlet.http.HttpServletResponse;
36 38 import java.io.IOException;
  39 +
37 40 @Component
38 41 @Slf4j
39 42 public class ThingsboardErrorResponseHandler implements AccessDeniedHandler {
... ... @@ -62,6 +65,8 @@ public class ThingsboardErrorResponseHandler implements AccessDeniedHandler {
62 65
63 66 if (exception instanceof ThingsboardException) {
64 67 handleThingsboardException((ThingsboardException) exception, response);
  68 + } else if (exception instanceof TbRateLimitsException) {
  69 + handleRateLimitException(response, (TbRateLimitsException) exception);
65 70 } else if (exception instanceof AccessDeniedException) {
66 71 handleAccessDeniedException(response);
67 72 } else if (exception instanceof AuthenticationException) {
... ... @@ -77,6 +82,7 @@ public class ThingsboardErrorResponseHandler implements AccessDeniedHandler {
77 82 }
78 83 }
79 84
  85 +
80 86 private void handleThingsboardException(ThingsboardException thingsboardException, HttpServletResponse response) throws IOException {
81 87
82 88 ThingsboardErrorCode errorCode = thingsboardException.getErrorCode();
... ... @@ -110,6 +116,15 @@ public class ThingsboardErrorResponseHandler implements AccessDeniedHandler {
110 116 mapper.writeValue(response.getWriter(), ThingsboardErrorResponse.of(thingsboardException.getMessage(), errorCode, status));
111 117 }
112 118
  119 + private void handleRateLimitException(HttpServletResponse response, TbRateLimitsException exception) throws IOException {
  120 + response.setStatus(HttpStatus.TOO_MANY_REQUESTS.value());
  121 + String message = "Too many requests for current " + exception.getEntityType().name().toLowerCase() + "!";
  122 + mapper.writeValue(response.getWriter(),
  123 + ThingsboardErrorResponse.of(message,
  124 + ThingsboardErrorCode.TOO_MANY_REQUESTS, HttpStatus.TOO_MANY_REQUESTS));
  125 + }
  126 +
  127 +
113 128 private void handleAccessDeniedException(HttpServletResponse response) throws IOException {
114 129 response.setStatus(HttpStatus.FORBIDDEN.value());
115 130 mapper.writeValue(response.getWriter(),
... ...
... ... @@ -582,7 +582,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
582 582
583 583 private void sendWsMsg(TelemetryWebSocketSessionRef sessionRef, SubscriptionUpdate update) {
584 584 try {
585   - msgEndpoint.send(sessionRef, jsonMapper.writeValueAsString(update));
  585 + msgEndpoint.send(sessionRef, update.getSubscriptionId(), jsonMapper.writeValueAsString(update));
586 586 } catch (JsonProcessingException e) {
587 587 log.warn("[{}] Failed to encode reply: {}", sessionRef.getSessionId(), update, e);
588 588 } catch (IOException e) {
... ...
... ... @@ -24,9 +24,7 @@ import java.io.IOException;
24 24 */
25 25 public interface TelemetryWebSocketMsgEndpoint {
26 26
27   - void send(TelemetryWebSocketSessionRef sessionRef, String msg) throws IOException;
28   -
29   - void close(TelemetryWebSocketSessionRef sessionRef) throws IOException;
  27 + void send(TelemetryWebSocketSessionRef sessionRef, int subscriptionId, String msg) throws IOException;
30 28
31 29 void close(TelemetryWebSocketSessionRef sessionRef, CloseStatus withReason) throws IOException;
32 30 }
... ...
... ... @@ -43,6 +43,15 @@ server:
43 43 max_subscriptions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_CUSTOMER:0}"
44 44 max_subscriptions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_REGULAR_USER:0}"
45 45 max_subscriptions_per_public_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_PUBLIC_USER:0}"
  46 + max_updates_per_session: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_UPDATES_PER_SESSION:300:1,3000:60}"
  47 + rest:
  48 + limits:
  49 + tenant:
  50 + enabled: "${TB_SERVER_REST_LIMITS_TENANT_ENABLED:false}"
  51 + configuration: "${TB_SERVER_REST_LIMITS_TENANT_CONFIGURATION:100:1,2000:60}"
  52 + customer:
  53 + enabled: "${TB_SERVER_REST_LIMITS_CUSTOMER_ENABLED:false}"
  54 + configuration: "${TB_SERVER_REST_LIMITS_CUSTOMER_CONFIGURATION:50:1,1000:60}"
46 55
47 56 # Zookeeper connection parameters. Used for service discovery.
48 57 zk:
... ...
... ... @@ -25,7 +25,9 @@ public enum ThingsboardErrorCode {
25 25 PERMISSION_DENIED(20),
26 26 INVALID_ARGUMENTS(30),
27 27 BAD_REQUEST_PARAMS(31),
28   - ITEM_NOT_FOUND(32);
  28 + ITEM_NOT_FOUND(32),
  29 + TOO_MANY_REQUESTS(33),
  30 + TOO_MANY_UPDATES(34);
29 31
30 32 private int errorCode;
31 33
... ...
common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimitsException.java renamed from common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TbRateLimitsException.java
... ... @@ -13,17 +13,19 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.common.transport.service;
  16 +package org.thingsboard.server.common.msg.tools;
17 17
  18 +import lombok.Getter;
18 19 import org.thingsboard.server.common.data.EntityType;
19 20
20 21 /**
21 22 * Created by ashvayka on 22.10.18.
22 23 */
23   -public class TbRateLimitsException extends Exception {
  24 +public class TbRateLimitsException extends RuntimeException {
  25 + @Getter
24 26 private final EntityType entityType;
25 27
26   - TbRateLimitsException(EntityType entityType) {
  28 + public TbRateLimitsException(EntityType entityType) {
27 29 this.entityType = entityType;
28 30 }
29 31 }
... ...
... ... @@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.EntityType;
21 21 import org.thingsboard.server.common.data.id.DeviceId;
22 22 import org.thingsboard.server.common.data.id.TenantId;
23 23 import org.thingsboard.server.common.msg.tools.TbRateLimits;
  24 +import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
24 25 import org.thingsboard.server.common.transport.SessionMsgListener;
25 26 import org.thingsboard.server.common.transport.TransportService;
26 27 import org.thingsboard.server.common.transport.TransportServiceCallback;
... ...
... ... @@ -17,7 +17,6 @@ package org.thingsboard.server.dao.nosql;
17 17
18 18 import com.datastax.driver.core.ResultSet;
19 19 import com.datastax.driver.core.ResultSetFuture;
20   -import com.google.common.util.concurrent.Futures;
21 20 import com.google.common.util.concurrent.SettableFuture;
22 21 import lombok.extern.slf4j.Slf4j;
23 22 import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -26,7 +25,6 @@ import org.springframework.scheduling.annotation.Scheduled;
26 25 import org.springframework.stereotype.Component;
27 26 import org.thingsboard.server.common.data.id.TenantId;
28 27 import org.thingsboard.server.dao.entity.EntityService;
29   -import org.thingsboard.server.dao.tenant.TenantService;
30 28 import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor;
31 29 import org.thingsboard.server.dao.util.AsyncTaskContext;
32 30 import org.thingsboard.server.dao.util.NoSqlAnyDao;
... ... @@ -34,7 +32,6 @@ import org.thingsboard.server.dao.util.NoSqlAnyDao;
34 32 import javax.annotation.PreDestroy;
35 33 import java.util.HashMap;
36 34 import java.util.Map;
37   -import java.util.concurrent.ExecutionException;
38 35
39 36 /**
40 37 * Created by ashvayka on 24.10.18.
... ...
... ... @@ -20,12 +20,10 @@ import com.google.common.util.concurrent.Futures;
20 20 import com.google.common.util.concurrent.ListenableFuture;
21 21 import com.google.common.util.concurrent.SettableFuture;
22 22 import lombok.extern.slf4j.Slf4j;
23   -import org.thingsboard.server.common.data.EntityType;
24 23 import org.thingsboard.server.common.data.id.TenantId;
25 24 import org.thingsboard.server.common.msg.tools.TbRateLimits;
26 25
27 26 import javax.annotation.Nullable;
28   -import java.util.Set;
29 27 import java.util.UUID;
30 28 import java.util.concurrent.BlockingQueue;
31 29 import java.util.concurrent.ConcurrentHashMap;
... ... @@ -37,7 +35,6 @@ import java.util.concurrent.ScheduledExecutorService;
37 35 import java.util.concurrent.TimeUnit;
38 36 import java.util.concurrent.TimeoutException;
39 37 import java.util.concurrent.atomic.AtomicInteger;
40   -import java.util.concurrent.atomic.AtomicLong;
41 38
42 39 /**
43 40 * Created by ashvayka on 24.10.18.
... ...