Commit 2e096cf269e916788cac758d19fcefb17c7c1d82

Authored by Igor Kulikov
Committed by GitHub
2 parents f29d548d e3851dca

Merge pull request #4187 from YevhenBondarenko/fix/ws

added ping for WS
... ... @@ -41,9 +41,13 @@ import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint;
41 41 import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
42 42 import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
43 43
44   -import javax.websocket.*;
  44 +import javax.websocket.RemoteEndpoint;
  45 +import javax.websocket.SendHandler;
  46 +import javax.websocket.SendResult;
  47 +import javax.websocket.Session;
45 48 import java.io.IOException;
46 49 import java.net.URI;
  50 +import java.nio.ByteBuffer;
47 51 import java.security.InvalidParameterException;
48 52 import java.util.Queue;
49 53 import java.util.Set;
... ... @@ -79,6 +83,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
79 83 @Value("${server.ws.limits.max_updates_per_session:}")
80 84 private String perSessionUpdatesConfiguration;
81 85
  86 + @Value("${server.ws.ping_timeout:30000}")
  87 + private long pingTimeout;
  88 +
82 89 private ConcurrentMap<String, TelemetryWebSocketSessionRef> blacklistedSessions = new ConcurrentHashMap<>();
83 90 private ConcurrentMap<String, TbRateLimits> perSessionUpdateLimits = new ConcurrentHashMap<>();
84 91
... ... @@ -120,6 +127,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
120 127 return;
121 128 }
122 129 internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, maxMsgQueuePerSession));
  130 +
123 131 externalSessionMap.put(externalSessionId, internalSessionId);
124 132 processInWebSocketService(sessionRef, SessionEvent.onEstablished());
125 133 log.info("[{}][{}][{}] Session is opened", sessionRef.getSecurityCtx().getTenantId(), externalSessionId, session.getId());
... ... @@ -189,6 +197,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
189 197 private volatile boolean isSending = false;
190 198 private final Queue<String> msgQueue;
191 199
  200 + private volatile long lastActivityTime;
  201 +
192 202 SessionMetaData(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef, int maxMsgQueuePerSession) {
193 203 super();
194 204 this.session = session;
... ... @@ -196,6 +206,23 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
196 206 this.asyncRemote = nativeSession.getAsyncRemote();
197 207 this.sessionRef = sessionRef;
198 208 this.msgQueue = new LinkedBlockingQueue<>(maxMsgQueuePerSession);
  209 + this.lastActivityTime = System.currentTimeMillis();
  210 + }
  211 +
  212 + synchronized void sendPing(long currentTime) {
  213 + try {
  214 + if (currentTime - lastActivityTime >= pingTimeout) {
  215 + this.asyncRemote.sendPing(ByteBuffer.wrap(new byte[]{}));
  216 + lastActivityTime = currentTime;
  217 + }
  218 + } catch (Exception e) {
  219 + log.trace("[{}] Failed to send ping msg", session.getId(), e);
  220 + try {
  221 + close(this.sessionRef, CloseStatus.SESSION_NOT_RELIABLE);
  222 + } catch (IOException ioe) {
  223 + log.trace("[{}] Session transport error", session.getId(), ioe);
  224 + }
  225 + }
199 226 }
200 227
201 228 synchronized void sendMsg(String msg) {
... ... @@ -243,6 +270,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
243 270 log.trace("[{}] Session transport error", session.getId(), ioe);
244 271 }
245 272 } else {
  273 + lastActivityTime = System.currentTimeMillis();
246 274 String msg = msgQueue.poll();
247 275 if (msg != null) {
248 276 sendMsgInternal(msg);
... ... @@ -285,6 +313,22 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
285 313 }
286 314
287 315 @Override
  316 + public void sendPing(TelemetryWebSocketSessionRef sessionRef, long currentTime) throws IOException {
  317 + String externalId = sessionRef.getSessionId();
  318 + String internalId = externalSessionMap.get(externalId);
  319 + if (internalId != null) {
  320 + SessionMetaData sessionMd = internalSessionMap.get(internalId);
  321 + if (sessionMd != null) {
  322 + sessionMd.sendPing(currentTime);
  323 + } else {
  324 + log.warn("[{}][{}] Failed to find session by internal id", externalId, internalId);
  325 + }
  326 + } else {
  327 + log.warn("[{}] Failed to find session by external id", externalId);
  328 + }
  329 + }
  330 +
  331 + @Override
288 332 public void close(TelemetryWebSocketSessionRef sessionRef, CloseStatus reason) throws IOException {
289 333 String externalId = sessionRef.getSessionId();
290 334 log.debug("[{}] Processing close request", externalId);
... ...
... ... @@ -64,7 +64,6 @@ import org.thingsboard.server.service.telemetry.cmd.v1.TelemetryPluginCmd;
64 64 import org.thingsboard.server.service.telemetry.cmd.v1.TimeseriesSubscriptionCmd;
65 65 import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataCmd;
66 66 import org.thingsboard.server.service.telemetry.cmd.v2.CmdUpdate;
67   -import org.thingsboard.server.service.telemetry.cmd.v2.DataUpdate;
68 67 import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd;
69 68 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
70 69 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
... ... @@ -89,6 +88,8 @@ import java.util.concurrent.ConcurrentHashMap;
89 88 import java.util.concurrent.ConcurrentMap;
90 89 import java.util.concurrent.ExecutorService;
91 90 import java.util.concurrent.Executors;
  91 +import java.util.concurrent.ScheduledExecutorService;
  92 +import java.util.concurrent.TimeUnit;
92 93 import java.util.function.Consumer;
93 94 import java.util.stream.Collectors;
94 95
... ... @@ -151,14 +152,23 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
151 152 private ExecutorService executor;
152 153 private String serviceId;
153 154
  155 + private ScheduledExecutorService pingExecutor;
  156 +
154 157 @PostConstruct
155 158 public void initExecutor() {
156 159 serviceId = serviceInfoProvider.getServiceId();
157 160 executor = Executors.newWorkStealingPool(50);
  161 +
  162 + pingExecutor = Executors.newSingleThreadScheduledExecutor();
  163 + pingExecutor.scheduleWithFixedDelay(this::sendPing, 10000, 10000, TimeUnit.MILLISECONDS);
158 164 }
159 165
160 166 @PreDestroy
161 167 public void shutdownExecutor() {
  168 + if (pingExecutor != null) {
  169 + pingExecutor.shutdownNow();
  170 + }
  171 +
162 172 if (executor != null) {
163 173 executor.shutdownNow();
164 174 }
... ... @@ -774,6 +784,17 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
774 784 }
775 785 }
776 786
  787 + private void sendPing() {
  788 + long currentTime = System.currentTimeMillis();
  789 + wsSessionsMap.values().forEach(md ->
  790 + executor.submit(() -> {
  791 + try {
  792 + msgEndpoint.sendPing(md.getSessionRef(), currentTime);
  793 + } catch (IOException e) {
  794 + log.warn("[{}] Failed to send ping: {}", md.getSessionRef().getSessionId(), e);
  795 + }
  796 + }));
  797 + }
777 798
778 799 private static Optional<Set<String>> getKeys(TelemetryPluginCmd cmd) {
779 800 if (!StringUtils.isEmpty(cmd.getKeys())) {
... ...
... ... @@ -26,5 +26,7 @@ public interface TelemetryWebSocketMsgEndpoint {
26 26
27 27 void send(TelemetryWebSocketSessionRef sessionRef, int subscriptionId, String msg) throws IOException;
28 28
  29 + void sendPing(TelemetryWebSocketSessionRef sessionRef, long currentTime) throws IOException;
  30 +
29 31 void close(TelemetryWebSocketSessionRef sessionRef, CloseStatus withReason) throws IOException;
30 32 }
... ...
... ... @@ -34,6 +34,7 @@ server:
34 34 log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:false}"
35 35 ws:
36 36 send_timeout: "${TB_SERVER_WS_SEND_TIMEOUT:5000}"
  37 + ping_timeout: "${TB_SERVER_WS_PING_TIMEOUT:30000}"
37 38 limits:
38 39 # Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation
39 40 max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}"
... ...