Commit 7563e030743d81b9c54f311147d34fde6deae225

Authored by Andrew Shvayka
1 parent cdb2903f

Adding extra logs on transport layer

@@ -166,7 +166,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -166,7 +166,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
166 } 166 }
167 String topicName = mqttMsg.variableHeader().topicName(); 167 String topicName = mqttMsg.variableHeader().topicName();
168 int msgId = mqttMsg.variableHeader().packetId(); 168 int msgId = mqttMsg.variableHeader().packetId();
169 - log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId); 169 + log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);
170 170
171 if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) { 171 if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
172 if (gatewaySessionHandler != null) { 172 if (gatewaySessionHandler != null) {
@@ -336,6 +336,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -336,6 +336,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
336 336
337 private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) { 337 private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
338 String userName = msg.payload().userName(); 338 String userName = msg.payload().userName();
  339 + log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
339 if (StringUtils.isEmpty(userName)) { 340 if (StringUtils.isEmpty(userName)) {
340 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)); 341 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
341 ctx.close(); 342 ctx.close();
@@ -43,7 +43,7 @@ public interface TransportService { @@ -43,7 +43,7 @@ public interface TransportService {
43 void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg msg, 43 void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg msg,
44 TransportServiceCallback<TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg> callback); 44 TransportServiceCallback<TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg> callback);
45 45
46 - boolean checkLimits(SessionInfoProto sessionInfo, TransportServiceCallback<Void> callback); 46 + boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback);
47 47
48 void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback); 48 void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback);
49 49
@@ -68,7 +68,7 @@ public abstract class AbstractTransportService implements TransportService { @@ -68,7 +68,7 @@ public abstract class AbstractTransportService implements TransportService {
68 68
69 @Override 69 @Override
70 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) { 70 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) {
71 - if (checkLimits(sessionInfo, callback)) { 71 + if (checkLimits(sessionInfo, msg, callback)) {
72 reportActivityInternal(sessionInfo); 72 reportActivityInternal(sessionInfo);
73 doProcess(sessionInfo, msg, callback); 73 doProcess(sessionInfo, msg, callback);
74 } 74 }
@@ -76,7 +76,7 @@ public abstract class AbstractTransportService implements TransportService { @@ -76,7 +76,7 @@ public abstract class AbstractTransportService implements TransportService {
76 76
77 @Override 77 @Override
78 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) { 78 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
79 - if (checkLimits(sessionInfo, callback)) { 79 + if (checkLimits(sessionInfo, msg, callback)) {
80 reportActivityInternal(sessionInfo); 80 reportActivityInternal(sessionInfo);
81 doProcess(sessionInfo, msg, callback); 81 doProcess(sessionInfo, msg, callback);
82 } 82 }
@@ -84,7 +84,7 @@ public abstract class AbstractTransportService implements TransportService { @@ -84,7 +84,7 @@ public abstract class AbstractTransportService implements TransportService {
84 84
85 @Override 85 @Override
86 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) { 86 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
87 - if (checkLimits(sessionInfo, callback)) { 87 + if (checkLimits(sessionInfo, msg, callback)) {
88 reportActivityInternal(sessionInfo); 88 reportActivityInternal(sessionInfo);
89 doProcess(sessionInfo, msg, callback); 89 doProcess(sessionInfo, msg, callback);
90 } 90 }
@@ -92,7 +92,7 @@ public abstract class AbstractTransportService implements TransportService { @@ -92,7 +92,7 @@ public abstract class AbstractTransportService implements TransportService {
92 92
93 @Override 93 @Override
94 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) { 94 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
95 - if (checkLimits(sessionInfo, callback)) { 95 + if (checkLimits(sessionInfo, msg, callback)) {
96 reportActivityInternal(sessionInfo); 96 reportActivityInternal(sessionInfo);
97 doProcess(sessionInfo, msg, callback); 97 doProcess(sessionInfo, msg, callback);
98 } 98 }
@@ -100,7 +100,7 @@ public abstract class AbstractTransportService implements TransportService { @@ -100,7 +100,7 @@ public abstract class AbstractTransportService implements TransportService {
100 100
101 @Override 101 @Override
102 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) { 102 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
103 - if (checkLimits(sessionInfo, callback)) { 103 + if (checkLimits(sessionInfo, msg, callback)) {
104 SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); 104 SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
105 sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe()); 105 sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
106 doProcess(sessionInfo, msg, callback); 106 doProcess(sessionInfo, msg, callback);
@@ -109,7 +109,7 @@ public abstract class AbstractTransportService implements TransportService { @@ -109,7 +109,7 @@ public abstract class AbstractTransportService implements TransportService {
109 109
110 @Override 110 @Override
111 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) { 111 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
112 - if (checkLimits(sessionInfo, callback)) { 112 + if (checkLimits(sessionInfo, msg, callback)) {
113 SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); 113 SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
114 sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe()); 114 sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
115 doProcess(sessionInfo, msg, callback); 115 doProcess(sessionInfo, msg, callback);
@@ -118,7 +118,7 @@ public abstract class AbstractTransportService implements TransportService { @@ -118,7 +118,7 @@ public abstract class AbstractTransportService implements TransportService {
118 118
119 @Override 119 @Override
120 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) { 120 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
121 - if (checkLimits(sessionInfo, callback)) { 121 + if (checkLimits(sessionInfo, msg, callback)) {
122 reportActivityInternal(sessionInfo); 122 reportActivityInternal(sessionInfo);
123 doProcess(sessionInfo, msg, callback); 123 doProcess(sessionInfo, msg, callback);
124 } 124 }
@@ -126,7 +126,7 @@ public abstract class AbstractTransportService implements TransportService { @@ -126,7 +126,7 @@ public abstract class AbstractTransportService implements TransportService {
126 126
127 @Override 127 @Override
128 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) { 128 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
129 - if (checkLimits(sessionInfo, callback)) { 129 + if (checkLimits(sessionInfo, msg, callback)) {
130 reportActivityInternal(sessionInfo); 130 reportActivityInternal(sessionInfo);
131 doProcess(sessionInfo, msg, callback); 131 doProcess(sessionInfo, msg, callback);
132 } 132 }
@@ -196,7 +196,10 @@ public abstract class AbstractTransportService implements TransportService { @@ -196,7 +196,10 @@ public abstract class AbstractTransportService implements TransportService {
196 } 196 }
197 197
198 @Override 198 @Override
199 - public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, TransportServiceCallback<Void> callback) { 199 + public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback) {
  200 + if (log.isTraceEnabled()) {
  201 + log.trace("[{}] Processing msg: {}", toId(sessionInfo), msg);
  202 + }
200 if (!rateLimitEnabled) { 203 if (!rateLimitEnabled) {
201 return true; 204 return true;
202 } 205 }
@@ -206,6 +209,9 @@ public abstract class AbstractTransportService implements TransportService { @@ -206,6 +209,9 @@ public abstract class AbstractTransportService implements TransportService {
206 if (callback != null) { 209 if (callback != null) {
207 callback.onError(new TbRateLimitsException(EntityType.TENANT)); 210 callback.onError(new TbRateLimitsException(EntityType.TENANT));
208 } 211 }
  212 + if (log.isTraceEnabled()) {
  213 + log.trace("[{}][{}] Tenant level rate limit detected: {}", toId(sessionInfo), tenantId, msg);
  214 + }
209 return false; 215 return false;
210 } 216 }
211 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); 217 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
@@ -214,8 +220,12 @@ public abstract class AbstractTransportService implements TransportService { @@ -214,8 +220,12 @@ public abstract class AbstractTransportService implements TransportService {
214 if (callback != null) { 220 if (callback != null) {
215 callback.onError(new TbRateLimitsException(EntityType.DEVICE)); 221 callback.onError(new TbRateLimitsException(EntityType.DEVICE));
216 } 222 }
  223 + if (log.isTraceEnabled()) {
  224 + log.trace("[{}][{}] Device level rate limit detected: {}", toId(sessionInfo), deviceId, msg);
  225 + }
217 return false; 226 return false;
218 } 227 }
  228 +
219 return true; 229 return true;
220 } 230 }
221 231
@@ -250,11 +260,11 @@ public abstract class AbstractTransportService implements TransportService { @@ -250,11 +260,11 @@ public abstract class AbstractTransportService implements TransportService {
250 } 260 }
251 } 261 }
252 262
253 - private UUID toId(TransportProtos.SessionInfoProto sessionInfo) { 263 + protected UUID toId(TransportProtos.SessionInfoProto sessionInfo) {
254 return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); 264 return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
255 } 265 }
256 266
257 - String getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) { 267 + protected String getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) {
258 return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString(); 268 return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString();
259 } 269 }
260 270
@@ -197,6 +197,7 @@ public class RemoteTransportService extends AbstractTransportService { @@ -197,6 +197,7 @@ public class RemoteTransportService extends AbstractTransportService {
197 197
198 @Override 198 @Override
199 public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) { 199 public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
  200 + log.trace("Processing msg: {}", msg);
200 AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(), 201 AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(),
201 TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()), 202 TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()),
202 response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor); 203 response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
@@ -204,6 +205,7 @@ public class RemoteTransportService extends AbstractTransportService { @@ -204,6 +205,7 @@ public class RemoteTransportService extends AbstractTransportService {
204 205
205 @Override 206 @Override
206 public void process(ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) { 207 public void process(ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
  208 + log.trace("Processing msg: {}", msg);
207 AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getHash(), 209 AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getHash(),
208 TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()), 210 TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()),
209 response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor); 211 response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
@@ -211,6 +213,7 @@ public class RemoteTransportService extends AbstractTransportService { @@ -211,6 +213,7 @@ public class RemoteTransportService extends AbstractTransportService {
211 213
212 @Override 214 @Override
213 public void process(GetOrCreateDeviceFromGatewayRequestMsg msg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg> callback) { 215 public void process(GetOrCreateDeviceFromGatewayRequestMsg msg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg> callback) {
  216 + log.trace("Processing msg: {}", msg);
214 AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getDeviceName(), 217 AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getDeviceName(),
215 TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build()), 218 TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build()),
216 response -> callback.onSuccess(response.getGetOrCreateDeviceResponseMsg()), callback::onError, transportCallbackExecutor); 219 response -> callback.onSuccess(response.getGetOrCreateDeviceResponseMsg()), callback::onError, transportCallbackExecutor);
@@ -218,6 +221,9 @@ public class RemoteTransportService extends AbstractTransportService { @@ -218,6 +221,9 @@ public class RemoteTransportService extends AbstractTransportService {
218 221
219 @Override 222 @Override
220 public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) { 223 public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
  224 + if (log.isTraceEnabled()) {
  225 + log.trace("[{}] Processing msg: {}", toId(sessionInfo), msg);
  226 + }
221 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( 227 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
222 TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) 228 TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
223 .setSubscriptionInfo(msg).build() 229 .setSubscriptionInfo(msg).build()
@@ -45,7 +45,6 @@ public abstract class DeviceAwareSessionContext implements SessionContext { @@ -45,7 +45,6 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
45 this.deviceId = new DeviceId(new UUID(deviceInfo.getDeviceIdMSB(), deviceInfo.getDeviceIdLSB())); 45 this.deviceId = new DeviceId(new UUID(deviceInfo.getDeviceIdMSB(), deviceInfo.getDeviceIdLSB()));
46 } 46 }
47 47
48 -  
49 public boolean isConnected() { 48 public boolean isConnected() {
50 return deviceInfo != null; 49 return deviceInfo != null;
51 } 50 }