Commit 7e1466ffdeb5e2a8845568e56076aed1a9508fc9
1 parent
5a5211ac
Improvement to websocket logging and parallelizm
Showing
2 changed files
with
6 additions
and
5 deletions
@@ -51,9 +51,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr | @@ -51,9 +51,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr | ||
51 | @Override | 51 | @Override |
52 | public void handleTextMessage(WebSocketSession session, TextMessage message) { | 52 | public void handleTextMessage(WebSocketSession session, TextMessage message) { |
53 | try { | 53 | try { |
54 | - log.info("[{}] Processing {}", session.getId(), message); | ||
55 | SessionMetaData sessionMd = internalSessionMap.get(session.getId()); | 54 | SessionMetaData sessionMd = internalSessionMap.get(session.getId()); |
56 | if (sessionMd != null) { | 55 | if (sessionMd != null) { |
56 | + log.info("[{}][{}] Processing {}", sessionMd.sessionRef.getSecurityCtx().getTenantId(), session.getId(), message); | ||
57 | webSocketService.handleWebSocketMsg(sessionMd.sessionRef, message.getPayload()); | 57 | webSocketService.handleWebSocketMsg(sessionMd.sessionRef, message.getPayload()); |
58 | } else { | 58 | } else { |
59 | log.warn("[{}] Failed to find session", session.getId()); | 59 | log.warn("[{}] Failed to find session", session.getId()); |
@@ -74,7 +74,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr | @@ -74,7 +74,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr | ||
74 | internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef)); | 74 | internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef)); |
75 | externalSessionMap.put(externalSessionId, internalSessionId); | 75 | externalSessionMap.put(externalSessionId, internalSessionId); |
76 | processInWebSocketService(sessionRef, SessionEvent.onEstablished()); | 76 | processInWebSocketService(sessionRef, SessionEvent.onEstablished()); |
77 | - log.info("[{}][{}] Session is started", externalSessionId, session.getId()); | 77 | + log.info("[{}][{}][{}] Session is opened", sessionRef.getSecurityCtx().getTenantId(), externalSessionId, session.getId()); |
78 | } catch (InvalidParameterException e) { | 78 | } catch (InvalidParameterException e) { |
79 | log.warn("[[{}] Failed to start session", session.getId(), e); | 79 | log.warn("[[{}] Failed to start session", session.getId(), e); |
80 | session.close(CloseStatus.BAD_DATA.withReason(e.getMessage())); | 80 | session.close(CloseStatus.BAD_DATA.withReason(e.getMessage())); |
@@ -72,6 +72,7 @@ import java.util.concurrent.ConcurrentHashMap; | @@ -72,6 +72,7 @@ import java.util.concurrent.ConcurrentHashMap; | ||
72 | import java.util.concurrent.ConcurrentMap; | 72 | import java.util.concurrent.ConcurrentMap; |
73 | import java.util.concurrent.ExecutorService; | 73 | import java.util.concurrent.ExecutorService; |
74 | import java.util.concurrent.Executors; | 74 | import java.util.concurrent.Executors; |
75 | +import java.util.concurrent.LinkedBlockingQueue; | ||
75 | import java.util.concurrent.SynchronousQueue; | 76 | import java.util.concurrent.SynchronousQueue; |
76 | import java.util.concurrent.ThreadPoolExecutor; | 77 | import java.util.concurrent.ThreadPoolExecutor; |
77 | import java.util.concurrent.TimeUnit; | 78 | import java.util.concurrent.TimeUnit; |
@@ -85,8 +86,8 @@ import java.util.stream.Collectors; | @@ -85,8 +86,8 @@ import java.util.stream.Collectors; | ||
85 | @Slf4j | 86 | @Slf4j |
86 | public class DefaultTelemetryWebSocketService implements TelemetryWebSocketService { | 87 | public class DefaultTelemetryWebSocketService implements TelemetryWebSocketService { |
87 | 88 | ||
88 | - public static final int DEFAULT_LIMIT = 100; | ||
89 | - public static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE; | 89 | + private static final int DEFAULT_LIMIT = 100; |
90 | + private static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE; | ||
90 | private static final int UNKNOWN_SUBSCRIPTION_ID = 0; | 91 | private static final int UNKNOWN_SUBSCRIPTION_ID = 0; |
91 | private static final String PROCESSING_MSG = "[{}] Processing: {}"; | 92 | private static final String PROCESSING_MSG = "[{}] Processing: {}"; |
92 | private static final ObjectMapper jsonMapper = new ObjectMapper(); | 93 | private static final ObjectMapper jsonMapper = new ObjectMapper(); |
@@ -115,7 +116,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi | @@ -115,7 +116,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi | ||
115 | 116 | ||
116 | @PostConstruct | 117 | @PostConstruct |
117 | public void initExecutor() { | 118 | public void initExecutor() { |
118 | - executor = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); | 119 | + executor = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); |
119 | } | 120 | } |
120 | 121 | ||
121 | @PreDestroy | 122 | @PreDestroy |