Commit cd1947663eb76cb8b4bae87505006f2b23d56f17

Authored by Andrew Shvayka
1 parent a4faa317

Added max size of queue per websocket

@@ -75,6 +75,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr @@ -75,6 +75,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
75 private int maxSessionsPerRegularUser; 75 private int maxSessionsPerRegularUser;
76 @Value("${server.ws.limits.max_sessions_per_public_user:0}") 76 @Value("${server.ws.limits.max_sessions_per_public_user:0}")
77 private int maxSessionsPerPublicUser; 77 private int maxSessionsPerPublicUser;
  78 + @Value("${server.ws.limits.max_queue_per_ws_session:1000}")
  79 + private int maxMsgQueuePerSession;
78 80
79 @Value("${server.ws.limits.max_updates_per_session:}") 81 @Value("${server.ws.limits.max_updates_per_session:}")
80 private String perSessionUpdatesConfiguration; 82 private String perSessionUpdatesConfiguration;
@@ -108,7 +110,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr @@ -108,7 +110,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
108 super.afterConnectionEstablished(session); 110 super.afterConnectionEstablished(session);
109 try { 111 try {
110 if (session instanceof NativeWebSocketSession) { 112 if (session instanceof NativeWebSocketSession) {
111 - Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class); 113 + Session nativeSession = ((NativeWebSocketSession) session).getNativeSession(Session.class);
112 if (nativeSession != null) { 114 if (nativeSession != null) {
113 nativeSession.getAsyncRemote().setSendTimeout(sendTimeout); 115 nativeSession.getAsyncRemote().setSendTimeout(sendTimeout);
114 } 116 }
@@ -119,7 +121,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr @@ -119,7 +121,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
119 if (!checkLimits(session, sessionRef)) { 121 if (!checkLimits(session, sessionRef)) {
120 return; 122 return;
121 } 123 }
122 - internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef)); 124 + internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, maxMsgQueuePerSession));
123 externalSessionMap.put(externalSessionId, internalSessionId); 125 externalSessionMap.put(externalSessionId, internalSessionId);
124 processInWebSocketService(sessionRef, SessionEvent.onEstablished()); 126 processInWebSocketService(sessionRef, SessionEvent.onEstablished());
125 log.info("[{}][{}][{}] Session is opened", sessionRef.getSecurityCtx().getTenantId(), externalSessionId, session.getId()); 127 log.info("[{}][{}][{}] Session is opened", sessionRef.getSecurityCtx().getTenantId(), externalSessionId, session.getId());
@@ -176,31 +178,40 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr @@ -176,31 +178,40 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
176 if (!"telemetry".equalsIgnoreCase(serviceToken)) { 178 if (!"telemetry".equalsIgnoreCase(serviceToken)) {
177 throw new InvalidParameterException("Can't find plugin with specified token!"); 179 throw new InvalidParameterException("Can't find plugin with specified token!");
178 } else { 180 } else {
179 - SecurityUser currentUser = (SecurityUser) ((Authentication)session.getPrincipal()).getPrincipal(); 181 + SecurityUser currentUser = (SecurityUser) ((Authentication) session.getPrincipal()).getPrincipal();
180 return new TelemetryWebSocketSessionRef(UUID.randomUUID().toString(), currentUser, session.getLocalAddress(), session.getRemoteAddress()); 182 return new TelemetryWebSocketSessionRef(UUID.randomUUID().toString(), currentUser, session.getLocalAddress(), session.getRemoteAddress());
181 } 183 }
182 } 184 }
183 185
184 - private static class SessionMetaData implements SendHandler { 186 + private class SessionMetaData implements SendHandler {
185 private final WebSocketSession session; 187 private final WebSocketSession session;
186 private final RemoteEndpoint.Async asyncRemote; 188 private final RemoteEndpoint.Async asyncRemote;
187 private final TelemetryWebSocketSessionRef sessionRef; 189 private final TelemetryWebSocketSessionRef sessionRef;
188 190
189 private volatile boolean isSending = false; 191 private volatile boolean isSending = false;
  192 + private final Queue<String> msgQueue;
190 193
191 - private Queue<String> msgQueue = new LinkedBlockingQueue<>();  
192 -  
193 - SessionMetaData(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) { 194 + SessionMetaData(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef, int maxMsgQueuePerSession) {
194 super(); 195 super();
195 this.session = session; 196 this.session = session;
196 - Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class); 197 + Session nativeSession = ((NativeWebSocketSession) session).getNativeSession(Session.class);
197 this.asyncRemote = nativeSession.getAsyncRemote(); 198 this.asyncRemote = nativeSession.getAsyncRemote();
198 this.sessionRef = sessionRef; 199 this.sessionRef = sessionRef;
  200 + this.msgQueue = new LinkedBlockingQueue<>(maxMsgQueuePerSession);
199 } 201 }
200 202
201 - public synchronized void sendMsg(String msg) { 203 + synchronized void sendMsg(String msg) {
202 if (isSending) { 204 if (isSending) {
203 - msgQueue.add(msg); 205 + try {
  206 + msgQueue.add(msg);
  207 + } catch (RuntimeException e){
  208 + log.trace("[{}] Session closed due to queue error", session.getId(), e);
  209 + try {
  210 + close(sessionRef, CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!"));
  211 + } catch (IOException ioe) {
  212 + log.trace("[{}] Session transport error", session.getId(), ioe);
  213 + }
  214 + }
204 } else { 215 } else {
205 isSending = true; 216 isSending = true;
206 sendMsgInternal(msg); 217 sendMsgInternal(msg);
@@ -40,6 +40,7 @@ server: @@ -40,6 +40,7 @@ server:
40 max_sessions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_CUSTOMER:0}" 40 max_sessions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_CUSTOMER:0}"
41 max_sessions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_REGULAR_USER:0}" 41 max_sessions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_REGULAR_USER:0}"
42 max_sessions_per_public_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_PUBLIC_USER:0}" 42 max_sessions_per_public_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_PUBLIC_USER:0}"
  43 + max_queue_per_ws_session: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_QUEUE_PER_WS_SESSION:500}"
43 max_subscriptions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_TENANT:0}" 44 max_subscriptions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_TENANT:0}"
44 max_subscriptions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_CUSTOMER:0}" 45 max_subscriptions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_CUSTOMER:0}"
45 max_subscriptions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_REGULAR_USER:0}" 46 max_subscriptions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_REGULAR_USER:0}"