Commit cc9e5fe6dbc2c7769094127d537b1cfd012ce66f

Authored by Igor Kulikov
1 parent f026b588

Introduce WebSocket blocking send timeout parameter. Use Work Stealing Pool for …

…dynamic threads management instead of custom ThreadPoolExecutor.
@@ -41,7 +41,6 @@ import java.util.Map; @@ -41,7 +41,6 @@ import java.util.Map;
41 public class WebSocketConfiguration implements WebSocketConfigurer { 41 public class WebSocketConfiguration implements WebSocketConfigurer {
42 42
43 public static final String WS_PLUGIN_PREFIX = "/api/ws/plugins/"; 43 public static final String WS_PLUGIN_PREFIX = "/api/ws/plugins/";
44 - public static final String WS_SECURITY_USER_ATTRIBUTE = "SECURITY_USER";  
45 private static final String WS_PLUGIN_MAPPING = WS_PLUGIN_PREFIX + "**"; 44 private static final String WS_PLUGIN_MAPPING = WS_PLUGIN_PREFIX + "**";
46 45
47 @Bean 46 @Bean
@@ -68,7 +67,6 @@ public class WebSocketConfiguration implements WebSocketConfigurer { @@ -68,7 +67,6 @@ public class WebSocketConfiguration implements WebSocketConfigurer {
68 response.setStatusCode(HttpStatus.UNAUTHORIZED); 67 response.setStatusCode(HttpStatus.UNAUTHORIZED);
69 return false; 68 return false;
70 } else { 69 } else {
71 - attributes.put(WS_SECURITY_USER_ATTRIBUTE, user);  
72 return true; 70 return true;
73 } 71 }
74 } 72 }
@@ -16,14 +16,17 @@ @@ -16,14 +16,17 @@
16 package org.thingsboard.server.controller.plugin; 16 package org.thingsboard.server.controller.plugin;
17 17
18 import lombok.extern.slf4j.Slf4j; 18 import lombok.extern.slf4j.Slf4j;
  19 +import org.apache.tomcat.websocket.Constants;
19 import org.springframework.beans.factory.BeanCreationNotAllowedException; 20 import org.springframework.beans.factory.BeanCreationNotAllowedException;
20 import org.springframework.beans.factory.annotation.Autowired; 21 import org.springframework.beans.factory.annotation.Autowired;
21 import org.springframework.beans.factory.annotation.Value; 22 import org.springframework.beans.factory.annotation.Value;
  23 +import org.springframework.security.core.Authentication;
22 import org.springframework.stereotype.Service; 24 import org.springframework.stereotype.Service;
23 import org.springframework.util.StringUtils; 25 import org.springframework.util.StringUtils;
24 import org.springframework.web.socket.CloseStatus; 26 import org.springframework.web.socket.CloseStatus;
25 import org.springframework.web.socket.TextMessage; 27 import org.springframework.web.socket.TextMessage;
26 import org.springframework.web.socket.WebSocketSession; 28 import org.springframework.web.socket.WebSocketSession;
  29 +import org.springframework.web.socket.adapter.NativeWebSocketSession;
27 import org.springframework.web.socket.handler.TextWebSocketHandler; 30 import org.springframework.web.socket.handler.TextWebSocketHandler;
28 import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; 31 import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
29 import org.thingsboard.server.common.data.id.CustomerId; 32 import org.thingsboard.server.common.data.id.CustomerId;
@@ -38,6 +41,7 @@ import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint; @@ -38,6 +41,7 @@ import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint;
38 import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; 41 import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
39 import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; 42 import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
40 43
  44 +import javax.websocket.Session;
41 import java.io.IOException; 45 import java.io.IOException;
42 import java.net.URI; 46 import java.net.URI;
43 import java.security.InvalidParameterException; 47 import java.security.InvalidParameterException;
@@ -56,6 +60,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr @@ -56,6 +60,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
56 @Autowired 60 @Autowired
57 private TelemetryWebSocketService webSocketService; 61 private TelemetryWebSocketService webSocketService;
58 62
  63 + @Value("${server.ws.blocking_send_timeout:5000}")
  64 + private long blockingSendTimeout;
  65 +
59 @Value("${server.ws.limits.max_sessions_per_tenant:0}") 66 @Value("${server.ws.limits.max_sessions_per_tenant:0}")
60 private int maxSessionsPerTenant; 67 private int maxSessionsPerTenant;
61 @Value("${server.ws.limits.max_sessions_per_customer:0}") 68 @Value("${server.ws.limits.max_sessions_per_customer:0}")
@@ -96,6 +103,12 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr @@ -96,6 +103,12 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
96 public void afterConnectionEstablished(WebSocketSession session) throws Exception { 103 public void afterConnectionEstablished(WebSocketSession session) throws Exception {
97 super.afterConnectionEstablished(session); 104 super.afterConnectionEstablished(session);
98 try { 105 try {
  106 + if (session instanceof NativeWebSocketSession) {
  107 + Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class);
  108 + if (nativeSession != null) {
  109 + nativeSession.getUserProperties().put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, new Long(blockingSendTimeout));
  110 + }
  111 + }
99 String internalSessionId = session.getId(); 112 String internalSessionId = session.getId();
100 TelemetryWebSocketSessionRef sessionRef = toRef(session); 113 TelemetryWebSocketSessionRef sessionRef = toRef(session);
101 String externalSessionId = sessionRef.getSessionId(); 114 String externalSessionId = sessionRef.getSessionId();
@@ -159,7 +172,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr @@ -159,7 +172,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
159 if (!"telemetry".equalsIgnoreCase(serviceToken)) { 172 if (!"telemetry".equalsIgnoreCase(serviceToken)) {
160 throw new InvalidParameterException("Can't find plugin with specified token!"); 173 throw new InvalidParameterException("Can't find plugin with specified token!");
161 } else { 174 } else {
162 - SecurityUser currentUser = (SecurityUser) session.getAttributes().get(WebSocketConfiguration.WS_SECURITY_USER_ATTRIBUTE); 175 + SecurityUser currentUser = (SecurityUser) ((Authentication)session.getPrincipal()).getPrincipal();
163 return new TelemetryWebSocketSessionRef(UUID.randomUUID().toString(), currentUser, session.getLocalAddress(), session.getRemoteAddress()); 176 return new TelemetryWebSocketSessionRef(UUID.randomUUID().toString(), currentUser, session.getLocalAddress(), session.getRemoteAddress());
164 } 177 }
165 } 178 }
@@ -75,14 +75,7 @@ import java.util.List; @@ -75,14 +75,7 @@ import java.util.List;
75 import java.util.Map; 75 import java.util.Map;
76 import java.util.Optional; 76 import java.util.Optional;
77 import java.util.Set; 77 import java.util.Set;
78 -import java.util.concurrent.ConcurrentHashMap;  
79 -import java.util.concurrent.ConcurrentMap;  
80 -import java.util.concurrent.ExecutorService;  
81 -import java.util.concurrent.Executors;  
82 -import java.util.concurrent.LinkedBlockingQueue;  
83 -import java.util.concurrent.SynchronousQueue;  
84 -import java.util.concurrent.ThreadPoolExecutor;  
85 -import java.util.concurrent.TimeUnit; 78 +import java.util.concurrent.*;
86 import java.util.function.Consumer; 79 import java.util.function.Consumer;
87 import java.util.stream.Collectors; 80 import java.util.stream.Collectors;
88 81
@@ -137,7 +130,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -137,7 +130,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
137 130
138 @PostConstruct 131 @PostConstruct
139 public void initExecutor() { 132 public void initExecutor() {
140 - executor = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); 133 + executor = Executors.newWorkStealingPool(50);
141 } 134 }
142 135
143 @PreDestroy 136 @PreDestroy
@@ -23,28 +23,10 @@ import org.springframework.stereotype.Service; @@ -23,28 +23,10 @@ import org.springframework.stereotype.Service;
23 import org.thingsboard.rule.engine.api.util.DonAsynchron; 23 import org.thingsboard.rule.engine.api.util.DonAsynchron;
24 import org.thingsboard.server.actors.ActorSystemContext; 24 import org.thingsboard.server.actors.ActorSystemContext;
25 import org.thingsboard.server.common.msg.cluster.ServerAddress; 25 import org.thingsboard.server.common.msg.cluster.ServerAddress;
26 -import org.thingsboard.server.common.transport.SessionMsgListener;  
27 -import org.thingsboard.server.common.transport.TransportService;  
28 import org.thingsboard.server.common.transport.TransportServiceCallback; 26 import org.thingsboard.server.common.transport.TransportServiceCallback;
29 import org.thingsboard.server.common.transport.service.AbstractTransportService; 27 import org.thingsboard.server.common.transport.service.AbstractTransportService;
30 import org.thingsboard.server.gen.transport.TransportProtos; 28 import org.thingsboard.server.gen.transport.TransportProtos;
31 -import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;  
32 -import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;  
33 -import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;  
34 -import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;  
35 -import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;  
36 -import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;  
37 -import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;  
38 -import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;  
39 -import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;  
40 -import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;  
41 -import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;  
42 -import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;  
43 -import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;  
44 -import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;  
45 -import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;  
46 -import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;  
47 -import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; 29 +import org.thingsboard.server.gen.transport.TransportProtos.*;
48 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; 30 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
49 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; 31 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
50 import org.thingsboard.server.service.encoding.DataDecodingEncodingService; 32 import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
@@ -53,15 +35,6 @@ import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWra @@ -53,15 +35,6 @@ import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWra
53 import javax.annotation.PostConstruct; 35 import javax.annotation.PostConstruct;
54 import javax.annotation.PreDestroy; 36 import javax.annotation.PreDestroy;
55 import java.util.Optional; 37 import java.util.Optional;
56 -import java.util.UUID;  
57 -import java.util.concurrent.ConcurrentHashMap;  
58 -import java.util.concurrent.ConcurrentMap;  
59 -import java.util.concurrent.ExecutorService;  
60 -import java.util.concurrent.Executors;  
61 -import java.util.concurrent.ScheduledExecutorService;  
62 -import java.util.concurrent.SynchronousQueue;  
63 -import java.util.concurrent.ThreadPoolExecutor;  
64 -import java.util.concurrent.TimeUnit;  
65 import java.util.function.Consumer; 38 import java.util.function.Consumer;
66 39
67 /** 40 /**
@@ -28,11 +28,7 @@ import org.thingsboard.server.kafka.*; @@ -28,11 +28,7 @@ import org.thingsboard.server.kafka.*;
28 28
29 import javax.annotation.PostConstruct; 29 import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy; 30 import javax.annotation.PreDestroy;
31 -import java.util.concurrent.ExecutorService;  
32 -import java.util.concurrent.LinkedBlockingQueue;  
33 -import java.util.concurrent.SynchronousQueue;  
34 -import java.util.concurrent.ThreadPoolExecutor;  
35 -import java.util.concurrent.TimeUnit; 31 +import java.util.concurrent.*;
36 32
37 /** 33 /**
38 * Created by ashvayka on 05.10.18. 34 * Created by ashvayka on 05.10.18.
@@ -68,7 +64,7 @@ public class RemoteTransportApiService { @@ -68,7 +64,7 @@ public class RemoteTransportApiService {
68 64
69 @PostConstruct 65 @PostConstruct
70 public void init() { 66 public void init() {
71 - this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); 67 + this.transportCallbackExecutor = Executors.newWorkStealingPool(100);
72 68
73 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder(); 69 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder();
74 responseBuilder.settings(kafkaSettings); 70 responseBuilder.settings(kafkaSettings);
@@ -33,6 +33,7 @@ server: @@ -33,6 +33,7 @@ server:
33 key-alias: "${SSL_KEY_ALIAS:tomcat}" 33 key-alias: "${SSL_KEY_ALIAS:tomcat}"
34 log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:true}" 34 log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:true}"
35 ws: 35 ws:
  36 + blocking_send_timeout: "${TB_SERVER_WS_BLOCKING_SEND_TIMEOUT:5000}"
36 limits: 37 limits:
37 # Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation 38 # Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation
38 max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}" 39 max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}"
@@ -167,6 +168,9 @@ cassandra: @@ -167,6 +168,9 @@ cassandra:
167 buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}" 168 buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
168 concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}" 169 concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
169 permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}" 170 permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
  171 + dispatcher_threads: "${CASSANDRA_QUERY_DISPATCHER_THREADS:2}"
  172 + callback_threads: "${CASSANDRA_QUERY_CALLBACK_THREADS:4}"
  173 + poll_ms: "${CASSANDRA_QUERY_POLL_MS:50}"
170 rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}" 174 rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}"
171 tenant_rate_limits: 175 tenant_rate_limits:
172 enabled: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED:false}" 176 enabled: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED:false}"
@@ -28,15 +28,7 @@ import org.thingsboard.server.common.transport.TransportServiceCallback; @@ -28,15 +28,7 @@ import org.thingsboard.server.common.transport.TransportServiceCallback;
28 import org.thingsboard.server.gen.transport.TransportProtos; 28 import org.thingsboard.server.gen.transport.TransportProtos;
29 29
30 import java.util.UUID; 30 import java.util.UUID;
31 -import java.util.concurrent.ConcurrentHashMap;  
32 -import java.util.concurrent.ConcurrentMap;  
33 -import java.util.concurrent.ExecutorService;  
34 -import java.util.concurrent.Executors;  
35 -import java.util.concurrent.LinkedBlockingQueue;  
36 -import java.util.concurrent.ScheduledExecutorService;  
37 -import java.util.concurrent.SynchronousQueue;  
38 -import java.util.concurrent.ThreadPoolExecutor;  
39 -import java.util.concurrent.TimeUnit; 31 +import java.util.concurrent.*;
40 32
41 /** 33 /**
42 * Created by ashvayka on 17.10.18. 34 * Created by ashvayka on 17.10.18.
@@ -278,7 +270,7 @@ public abstract class AbstractTransportService implements TransportService { @@ -278,7 +270,7 @@ public abstract class AbstractTransportService implements TransportService {
278 new TbRateLimits(perDevicesLimitsConf); 270 new TbRateLimits(perDevicesLimitsConf);
279 } 271 }
280 this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(); 272 this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor();
281 - this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); 273 + this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
282 this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, sessionReportTimeout, sessionReportTimeout, TimeUnit.MILLISECONDS); 274 this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, sessionReportTimeout, sessionReportTimeout, TimeUnit.MILLISECONDS);
283 } 275 }
284 276
@@ -52,7 +52,7 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor< @@ -52,7 +52,7 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<
52 @Value("${cassandra.query.concurrent_limit}") int concurrencyLimit, 52 @Value("${cassandra.query.concurrent_limit}") int concurrencyLimit,
53 @Value("${cassandra.query.permit_max_wait_time}") long maxWaitTime, 53 @Value("${cassandra.query.permit_max_wait_time}") long maxWaitTime,
54 @Value("${cassandra.query.dispatcher_threads:2}") int dispatcherThreads, 54 @Value("${cassandra.query.dispatcher_threads:2}") int dispatcherThreads,
55 - @Value("${cassandra.query.callback_threads:2}") int callbackThreads, 55 + @Value("${cassandra.query.callback_threads:4}") int callbackThreads,
56 @Value("${cassandra.query.poll_ms:50}") long pollMs, 56 @Value("${cassandra.query.poll_ms:50}") long pollMs,
57 @Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled, 57 @Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled,
58 @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration, 58 @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration,
@@ -25,17 +25,7 @@ import org.thingsboard.server.common.msg.tools.TbRateLimits; @@ -25,17 +25,7 @@ import org.thingsboard.server.common.msg.tools.TbRateLimits;
25 25
26 import javax.annotation.Nullable; 26 import javax.annotation.Nullable;
27 import java.util.UUID; 27 import java.util.UUID;
28 -import java.util.concurrent.BlockingQueue;  
29 -import java.util.concurrent.ConcurrentHashMap;  
30 -import java.util.concurrent.ConcurrentMap;  
31 -import java.util.concurrent.ExecutorService;  
32 -import java.util.concurrent.Executors;  
33 -import java.util.concurrent.LinkedBlockingDeque;  
34 -import java.util.concurrent.LinkedBlockingQueue;  
35 -import java.util.concurrent.ScheduledExecutorService;  
36 -import java.util.concurrent.ThreadPoolExecutor;  
37 -import java.util.concurrent.TimeUnit;  
38 -import java.util.concurrent.TimeoutException; 28 +import java.util.concurrent.*;
39 import java.util.concurrent.atomic.AtomicInteger; 29 import java.util.concurrent.atomic.AtomicInteger;
40 30
41 /** 31 /**
@@ -72,7 +62,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend @@ -72,7 +62,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
72 this.concurrencyLimit = concurrencyLimit; 62 this.concurrencyLimit = concurrencyLimit;
73 this.queue = new LinkedBlockingDeque<>(queueLimit); 63 this.queue = new LinkedBlockingDeque<>(queueLimit);
74 this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads); 64 this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads);
75 - this.callbackExecutor = new ThreadPoolExecutor(callbackThreads, 50, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); 65 + this.callbackExecutor = Executors.newWorkStealingPool(callbackThreads);
76 this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); 66 this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
77 this.perTenantLimitsEnabled = perTenantLimitsEnabled; 67 this.perTenantLimitsEnabled = perTenantLimitsEnabled;
78 this.perTenantLimitsConfiguration = perTenantLimitsConfiguration; 68 this.perTenantLimitsConfiguration = perTenantLimitsConfiguration;