|
@@ -68,6 +68,14 @@ public class MqttTransportService { |
|
@@ -68,6 +68,14 @@ public class MqttTransportService { |
68
|
@Value("${mqtt.adaptor}")
|
68
|
@Value("${mqtt.adaptor}")
|
69
|
private String adaptorName;
|
69
|
private String adaptorName;
|
70
|
|
70
|
|
|
|
71
|
+ @Value("${mqtt.netty.leak_detector_level}")
|
|
|
72
|
+ private String leakDetectorLevel;
|
|
|
73
|
+ @Value("${mqtt.netty.boss_group_thread_count}")
|
|
|
74
|
+ private Integer bossGroupThreadCount;
|
|
|
75
|
+ @Value("${mqtt.netty.worker_group_thread_count}")
|
|
|
76
|
+ private Integer workerGroupThreadCount;
|
|
|
77
|
+
|
|
|
78
|
+
|
71
|
private MqttTransportAdaptor adaptor;
|
79
|
private MqttTransportAdaptor adaptor;
|
72
|
|
80
|
|
73
|
private Channel serverChannel;
|
81
|
private Channel serverChannel;
|
|
@@ -76,17 +84,19 @@ public class MqttTransportService { |
|
@@ -76,17 +84,19 @@ public class MqttTransportService { |
76
|
|
84
|
|
77
|
@PostConstruct
|
85
|
@PostConstruct
|
78
|
public void init() throws Exception {
|
86
|
public void init() throws Exception {
|
|
|
87
|
+ log.info("Setting resource leak detector level to {}", leakDetectorLevel);
|
|
|
88
|
+ ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
|
|
|
89
|
+
|
79
|
log.info("Starting MQTT transport...");
|
90
|
log.info("Starting MQTT transport...");
|
80
|
log.info("Lookup MQTT transport adaptor {}", adaptorName);
|
91
|
log.info("Lookup MQTT transport adaptor {}", adaptorName);
|
81
|
this.adaptor = (MqttTransportAdaptor) appContext.getBean(adaptorName);
|
92
|
this.adaptor = (MqttTransportAdaptor) appContext.getBean(adaptorName);
|
82
|
|
93
|
|
83
|
log.info("Starting MQTT transport server");
|
94
|
log.info("Starting MQTT transport server");
|
84
|
- bossGroup = new NioEventLoopGroup(1);
|
|
|
85
|
- workerGroup = new NioEventLoopGroup();
|
95
|
+ bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
|
|
|
96
|
+ workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
|
86
|
ServerBootstrap b = new ServerBootstrap();
|
97
|
ServerBootstrap b = new ServerBootstrap();
|
87
|
b.group(bossGroup, workerGroup)
|
98
|
b.group(bossGroup, workerGroup)
|
88
|
.channel(NioServerSocketChannel.class)
|
99
|
.channel(NioServerSocketChannel.class)
|
89
|
- .handler(new LoggingHandler(LogLevel.TRACE))
|
|
|
90
|
.childHandler(new MqttTransportServerInitializer(processor, authService, adaptor, sslHandlerProvider));
|
100
|
.childHandler(new MqttTransportServerInitializer(processor, authService, adaptor, sslHandlerProvider));
|
91
|
|
101
|
|
92
|
serverChannel = b.bind(host, port).sync().channel();
|
102
|
serverChannel = b.bind(host, port).sync().channel();
|