Commit da5679d498108f81a58a74b60ec87e9c0aae04f3

Authored by Igor Kulikov
1 parent cc9e5fe6

Add async websocket send support.

... ... @@ -16,7 +16,6 @@
16 16 package org.thingsboard.server.controller.plugin;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19   -import org.apache.tomcat.websocket.Constants;
20 19 import org.springframework.beans.factory.BeanCreationNotAllowedException;
21 20 import org.springframework.beans.factory.annotation.Autowired;
22 21 import org.springframework.beans.factory.annotation.Value;
... ... @@ -41,14 +40,19 @@ import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint;
41 40 import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
42 41 import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
43 42
  43 +import javax.websocket.RemoteEndpoint;
  44 +import javax.websocket.SendHandler;
  45 +import javax.websocket.SendResult;
44 46 import javax.websocket.Session;
45 47 import java.io.IOException;
46 48 import java.net.URI;
47 49 import java.security.InvalidParameterException;
  50 +import java.util.Queue;
48 51 import java.util.Set;
49 52 import java.util.UUID;
50 53 import java.util.concurrent.ConcurrentHashMap;
51 54 import java.util.concurrent.ConcurrentMap;
  55 +import java.util.concurrent.LinkedBlockingQueue;
52 56
53 57 @Service
54 58 @Slf4j
... ... @@ -60,8 +64,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
60 64 @Autowired
61 65 private TelemetryWebSocketService webSocketService;
62 66
63   - @Value("${server.ws.blocking_send_timeout:5000}")
64   - private long blockingSendTimeout;
  67 + @Value("${server.ws.send_timeout:5000}")
  68 + private long sendTimeout;
65 69
66 70 @Value("${server.ws.limits.max_sessions_per_tenant:0}")
67 71 private int maxSessionsPerTenant;
... ... @@ -106,7 +110,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
106 110 if (session instanceof NativeWebSocketSession) {
107 111 Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class);
108 112 if (nativeSession != null) {
109   - nativeSession.getUserProperties().put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, new Long(blockingSendTimeout));
  113 + nativeSession.getAsyncRemote().setSendTimeout(sendTimeout);
110 114 }
111 115 }
112 116 String internalSessionId = session.getId();
... ... @@ -177,15 +181,52 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
177 181 }
178 182 }
179 183
180   - private static class SessionMetaData {
  184 + private static class SessionMetaData implements SendHandler {
181 185 private final WebSocketSession session;
  186 + private final RemoteEndpoint.Async asyncRemote;
182 187 private final TelemetryWebSocketSessionRef sessionRef;
183 188
  189 + private volatile boolean isSending = false;
  190 +
  191 + private Queue<String> msgQueue = new LinkedBlockingQueue<>();
  192 +
184 193 SessionMetaData(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) {
185 194 super();
186 195 this.session = session;
  196 + Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class);
  197 + this.asyncRemote = nativeSession.getAsyncRemote();
187 198 this.sessionRef = sessionRef;
188 199 }
  200 +
  201 + public synchronized void sendMsg(String msg) {
  202 + if (isSending) {
  203 + msgQueue.add(msg);
  204 + } else {
  205 + isSending = true;
  206 + sendMsgInternal(msg);
  207 + }
  208 + }
  209 +
  210 + private void sendMsgInternal(String msg) {
  211 + try {
  212 + this.asyncRemote.sendText(msg, this);
  213 + } catch (Exception e) {
  214 + log.error("[{}] Failed to send msg", session.getId(), e);
  215 + }
  216 + }
  217 +
  218 + @Override
  219 + public void onResult(SendResult result) {
  220 + if (!result.isOK()) {
  221 + log.error("[{}] Failed to send msg", session.getId(), result.getException());
  222 + }
  223 + String msg = msgQueue.poll();
  224 + if (msg != null) {
  225 + sendMsgInternal(msg);
  226 + } else {
  227 + isSending = false;
  228 + }
  229 + }
189 230 }
190 231
191 232 @Override
... ... @@ -202,9 +243,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
202 243 if (blacklistedSessions.putIfAbsent(externalId, sessionRef) == null) {
203 244 log.info("[{}][{}][{}] Failed to process session update. Max session updates limit reached"
204 245 , sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), externalId);
205   - synchronized (sessionMd) {
206   - sessionMd.session.sendMessage(new TextMessage("{\"subscriptionId\":" + subscriptionId + ", \"errorCode\":" + ThingsboardErrorCode.TOO_MANY_UPDATES.getErrorCode() + ", \"errorMsg\":\"Too many updates!\"}"));
207   - }
  246 + sessionMd.sendMsg("{\"subscriptionId\":" + subscriptionId + ", \"errorCode\":" + ThingsboardErrorCode.TOO_MANY_UPDATES.getErrorCode() + ", \"errorMsg\":\"Too many updates!\"}");
208 247 }
209 248 return;
210 249 } else {
... ... @@ -212,14 +251,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
212 251 blacklistedSessions.remove(externalId);
213 252 }
214 253 }
215   - synchronized (sessionMd) {
216   - long start = System.currentTimeMillis();
217   - sessionMd.session.sendMessage(new TextMessage(msg));
218   - long took = System.currentTimeMillis() - start;
219   - if (took >= 1000) {
220   - log.info("[{}][{}] Sending message took more than 1 second [{}ms] {}", sessionRef.getSecurityCtx().getTenantId(), externalId, took, msg);
221   - }
222   - }
  254 + sessionMd.sendMsg(msg);
223 255 } else {
224 256 log.warn("[{}][{}] Failed to find session by internal id", externalId, internalId);
225 257 }
... ...
... ... @@ -33,7 +33,7 @@ server:
33 33 key-alias: "${SSL_KEY_ALIAS:tomcat}"
34 34 log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:true}"
35 35 ws:
36   - blocking_send_timeout: "${TB_SERVER_WS_BLOCKING_SEND_TIMEOUT:5000}"
  36 + send_timeout: "${TB_SERVER_WS_SEND_TIMEOUT:5000}"
37 37 limits:
38 38 # Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation
39 39 max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}"
... ...