Commit fe30a23ef5596546842f59c6a62e80c5d54f680e

Authored by Igor Kulikov
2 parents 6801c34c 5f7c4748

Merge branch 'master' of github.com:thingsboard/thingsboard

... ... @@ -75,6 +75,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
75 75 private int maxSessionsPerRegularUser;
76 76 @Value("${server.ws.limits.max_sessions_per_public_user:0}")
77 77 private int maxSessionsPerPublicUser;
  78 + @Value("${server.ws.limits.max_queue_per_ws_session:1000}")
  79 + private int maxMsgQueuePerSession;
78 80
79 81 @Value("${server.ws.limits.max_updates_per_session:}")
80 82 private String perSessionUpdatesConfiguration;
... ... @@ -108,7 +110,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
108 110 super.afterConnectionEstablished(session);
109 111 try {
110 112 if (session instanceof NativeWebSocketSession) {
111   - Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class);
  113 + Session nativeSession = ((NativeWebSocketSession) session).getNativeSession(Session.class);
112 114 if (nativeSession != null) {
113 115 nativeSession.getAsyncRemote().setSendTimeout(sendTimeout);
114 116 }
... ... @@ -119,7 +121,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
119 121 if (!checkLimits(session, sessionRef)) {
120 122 return;
121 123 }
122   - internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef));
  124 + internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, maxMsgQueuePerSession));
123 125 externalSessionMap.put(externalSessionId, internalSessionId);
124 126 processInWebSocketService(sessionRef, SessionEvent.onEstablished());
125 127 log.info("[{}][{}][{}] Session is opened", sessionRef.getSecurityCtx().getTenantId(), externalSessionId, session.getId());
... ... @@ -176,31 +178,40 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
176 178 if (!"telemetry".equalsIgnoreCase(serviceToken)) {
177 179 throw new InvalidParameterException("Can't find plugin with specified token!");
178 180 } else {
179   - SecurityUser currentUser = (SecurityUser) ((Authentication)session.getPrincipal()).getPrincipal();
  181 + SecurityUser currentUser = (SecurityUser) ((Authentication) session.getPrincipal()).getPrincipal();
180 182 return new TelemetryWebSocketSessionRef(UUID.randomUUID().toString(), currentUser, session.getLocalAddress(), session.getRemoteAddress());
181 183 }
182 184 }
183 185
184   - private static class SessionMetaData implements SendHandler {
  186 + private class SessionMetaData implements SendHandler {
185 187 private final WebSocketSession session;
186 188 private final RemoteEndpoint.Async asyncRemote;
187 189 private final TelemetryWebSocketSessionRef sessionRef;
188 190
189 191 private volatile boolean isSending = false;
  192 + private final Queue<String> msgQueue;
190 193
191   - private Queue<String> msgQueue = new LinkedBlockingQueue<>();
192   -
193   - SessionMetaData(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) {
  194 + SessionMetaData(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef, int maxMsgQueuePerSession) {
194 195 super();
195 196 this.session = session;
196   - Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class);
  197 + Session nativeSession = ((NativeWebSocketSession) session).getNativeSession(Session.class);
197 198 this.asyncRemote = nativeSession.getAsyncRemote();
198 199 this.sessionRef = sessionRef;
  200 + this.msgQueue = new LinkedBlockingQueue<>(maxMsgQueuePerSession);
199 201 }
200 202
201   - public synchronized void sendMsg(String msg) {
  203 + synchronized void sendMsg(String msg) {
202 204 if (isSending) {
203   - msgQueue.add(msg);
  205 + try {
  206 + msgQueue.add(msg);
  207 + } catch (RuntimeException e){
  208 + log.trace("[{}] Session closed due to queue error", session.getId(), e);
  209 + try {
  210 + close(sessionRef, CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!"));
  211 + } catch (IOException ioe) {
  212 + log.trace("[{}] Session transport error", session.getId(), ioe);
  213 + }
  214 + }
204 215 } else {
205 216 isSending = true;
206 217 sendMsgInternal(msg);
... ...
... ... @@ -40,6 +40,7 @@ server:
40 40 max_sessions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_CUSTOMER:0}"
41 41 max_sessions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_REGULAR_USER:0}"
42 42 max_sessions_per_public_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_PUBLIC_USER:0}"
  43 + max_queue_per_ws_session: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_QUEUE_PER_WS_SESSION:500}"
43 44 max_subscriptions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_TENANT:0}"
44 45 max_subscriptions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_CUSTOMER:0}"
45 46 max_subscriptions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_REGULAR_USER:0}"
... ...