Commit 005c886cc59d6f7a6fd7862459bc1942eb1df628

Authored by Andrew Shvayka
Committed by GitHub
2 parents 70041766 3efa6a59

Merge pull request #36 from thingsboard/feature/TB-34

TB-34: Implementation
@@ -75,6 +75,10 @@ mqtt: @@ -75,6 +75,10 @@ mqtt:
75 bind_port: "${MQTT_BIND_PORT:1883}" 75 bind_port: "${MQTT_BIND_PORT:1883}"
76 adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}" 76 adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
77 timeout: "${MQTT_TIMEOUT:10000}" 77 timeout: "${MQTT_TIMEOUT:10000}"
  78 + netty:
  79 + leak_detector_level: "${NETTY_LEASK_DETECTOR_LVL:DISABLED}"
  80 + boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"
  81 + worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"
78 # Uncomment the following lines to enable ssl for MQTT 82 # Uncomment the following lines to enable ssl for MQTT
79 ssl: 83 ssl:
80 key_store: keystore/mqttserver.jks 84 key_store: keystore/mqttserver.jks
@@ -68,6 +68,14 @@ public class MqttTransportService { @@ -68,6 +68,14 @@ public class MqttTransportService {
68 @Value("${mqtt.adaptor}") 68 @Value("${mqtt.adaptor}")
69 private String adaptorName; 69 private String adaptorName;
70 70
  71 + @Value("${mqtt.netty.leak_detector_level}")
  72 + private String leakDetectorLevel;
  73 + @Value("${mqtt.netty.boss_group_thread_count}")
  74 + private Integer bossGroupThreadCount;
  75 + @Value("${mqtt.netty.worker_group_thread_count}")
  76 + private Integer workerGroupThreadCount;
  77 +
  78 +
71 private MqttTransportAdaptor adaptor; 79 private MqttTransportAdaptor adaptor;
72 80
73 private Channel serverChannel; 81 private Channel serverChannel;
@@ -76,17 +84,19 @@ public class MqttTransportService { @@ -76,17 +84,19 @@ public class MqttTransportService {
76 84
77 @PostConstruct 85 @PostConstruct
78 public void init() throws Exception { 86 public void init() throws Exception {
  87 + log.info("Setting resource leak detector level to {}", leakDetectorLevel);
  88 + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
  89 +
79 log.info("Starting MQTT transport..."); 90 log.info("Starting MQTT transport...");
80 log.info("Lookup MQTT transport adaptor {}", adaptorName); 91 log.info("Lookup MQTT transport adaptor {}", adaptorName);
81 this.adaptor = (MqttTransportAdaptor) appContext.getBean(adaptorName); 92 this.adaptor = (MqttTransportAdaptor) appContext.getBean(adaptorName);
82 93
83 log.info("Starting MQTT transport server"); 94 log.info("Starting MQTT transport server");
84 - bossGroup = new NioEventLoopGroup(1);  
85 - workerGroup = new NioEventLoopGroup(); 95 + bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
  96 + workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
86 ServerBootstrap b = new ServerBootstrap(); 97 ServerBootstrap b = new ServerBootstrap();
87 b.group(bossGroup, workerGroup) 98 b.group(bossGroup, workerGroup)
88 .channel(NioServerSocketChannel.class) 99 .channel(NioServerSocketChannel.class)
89 - .handler(new LoggingHandler(LogLevel.TRACE))  
90 .childHandler(new MqttTransportServerInitializer(processor, authService, adaptor, sslHandlerProvider)); 100 .childHandler(new MqttTransportServerInitializer(processor, authService, adaptor, sslHandlerProvider));
91 101
92 serverChannel = b.bind(host, port).sync().channel(); 102 serverChannel = b.bind(host, port).sync().channel();