Commit 4255bf485a59555da5c1c463a98d311679d652c0

Authored by Igor Kulikov
1 parent e31052bc

Fixed memory leaks in MqttTransportHandler (#1787)

@@ -34,6 +34,7 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage; @@ -34,6 +34,7 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
34 import io.netty.handler.codec.mqtt.MqttTopicSubscription; 34 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
35 import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; 35 import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
36 import io.netty.handler.ssl.SslHandler; 36 import io.netty.handler.ssl.SslHandler;
  37 +import io.netty.util.ReferenceCountUtil;
37 import io.netty.util.concurrent.Future; 38 import io.netty.util.concurrent.Future;
38 import io.netty.util.concurrent.GenericFutureListener; 39 import io.netty.util.concurrent.GenericFutureListener;
39 import lombok.extern.slf4j.Slf4j; 40 import lombok.extern.slf4j.Slf4j;
@@ -112,10 +113,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -112,10 +113,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
112 @Override 113 @Override
113 public void channelRead(ChannelHandlerContext ctx, Object msg) { 114 public void channelRead(ChannelHandlerContext ctx, Object msg) {
114 log.trace("[{}] Processing msg: {}", sessionId, msg); 115 log.trace("[{}] Processing msg: {}", sessionId, msg);
115 - if (msg instanceof MqttMessage) {  
116 - processMqttMsg(ctx, (MqttMessage) msg);  
117 - } else {  
118 - ctx.close(); 116 + try {
  117 + if (msg instanceof MqttMessage) {
  118 + processMqttMsg(ctx, (MqttMessage) msg);
  119 + } else {
  120 + ctx.close();
  121 + }
  122 + } finally {
  123 + ReferenceCountUtil.safeRelease(msg);
119 } 124 }
120 } 125 }
121 126
@@ -422,6 +427,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -422,6 +427,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
422 } 427 }
423 428
424 @Override 429 @Override
  430 + public void channelReadComplete(ChannelHandlerContext ctx) {
  431 + ctx.flush();
  432 + }
  433 +
  434 + @Override
425 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 435 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
426 log.error("[{}] Unexpected Exception", sessionId, cause); 436 log.error("[{}] Unexpected Exception", sessionId, cause);
427 ctx.close(); 437 ctx.close();
@@ -212,18 +212,14 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -212,18 +212,14 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
212 } 212 }
213 213
214 private static String validatePayload(UUID sessionId, ByteBuf payloadData, boolean isEmptyPayloadAllowed) throws AdaptorException { 214 private static String validatePayload(UUID sessionId, ByteBuf payloadData, boolean isEmptyPayloadAllowed) throws AdaptorException {
215 - try {  
216 - String payload = payloadData.toString(UTF8);  
217 - if (payload == null) {  
218 - log.warn("[{}] Payload is empty!", sessionId);  
219 - if (!isEmptyPayloadAllowed) {  
220 - throw new AdaptorException(new IllegalArgumentException("Payload is empty!"));  
221 - } 215 + String payload = payloadData.toString(UTF8);
  216 + if (payload == null) {
  217 + log.warn("[{}] Payload is empty!", sessionId);
  218 + if (!isEmptyPayloadAllowed) {
  219 + throw new AdaptorException(new IllegalArgumentException("Payload is empty!"));
222 } 220 }
223 - return payload;  
224 - } finally {  
225 - payloadData.release();  
226 } 221 }
  222 + return payload;
227 } 223 }
228 224
229 } 225 }