Showing
4 changed files
with
14 additions
and
8 deletions
@@ -87,6 +87,7 @@ mqtt: | @@ -87,6 +87,7 @@ mqtt: | ||
87 | leak_detector_level: "${NETTY_LEASK_DETECTOR_LVL:DISABLED}" | 87 | leak_detector_level: "${NETTY_LEASK_DETECTOR_LVL:DISABLED}" |
88 | boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}" | 88 | boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}" |
89 | worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}" | 89 | worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}" |
90 | + max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}" | ||
90 | # MQTT SSL configuration | 91 | # MQTT SSL configuration |
91 | ssl: | 92 | ssl: |
92 | # Enable/disable SSL support | 93 | # Enable/disable SSL support |
@@ -60,6 +60,10 @@ public abstract class AbstractCassandraCluster { | @@ -60,6 +60,10 @@ public abstract class AbstractCassandraCluster { | ||
60 | private long initTimeout; | 60 | private long initTimeout; |
61 | @Value("${cassandra.init_retry_interval_ms}") | 61 | @Value("${cassandra.init_retry_interval_ms}") |
62 | private long initRetryInterval; | 62 | private long initRetryInterval; |
63 | + @Value("${cassandra.max_requests_per_connection_local:128}") | ||
64 | + private int max_requests_local; | ||
65 | + @Value("${cassandra.max_requests_per_connection_remote:128}") | ||
66 | + private int max_requests_remote; | ||
63 | 67 | ||
64 | @Autowired | 68 | @Autowired |
65 | private CassandraSocketOptions socketOpts; | 69 | private CassandraSocketOptions socketOpts; |
@@ -90,8 +94,8 @@ public abstract class AbstractCassandraCluster { | @@ -90,8 +94,8 @@ public abstract class AbstractCassandraCluster { | ||
90 | .withClusterName(clusterName) | 94 | .withClusterName(clusterName) |
91 | .withSocketOptions(socketOpts.getOpts()) | 95 | .withSocketOptions(socketOpts.getOpts()) |
92 | .withPoolingOptions(new PoolingOptions() | 96 | .withPoolingOptions(new PoolingOptions() |
93 | - .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768) | ||
94 | - .setMaxRequestsPerConnection(HostDistance.REMOTE, 32768)); | 97 | + .setMaxRequestsPerConnection(HostDistance.LOCAL, max_requests_local) |
98 | + .setMaxRequestsPerConnection(HostDistance.REMOTE, max_requests_remote)); | ||
95 | this.clusterBuilder.withQueryOptions(queryOpts.getOpts()); | 99 | this.clusterBuilder.withQueryOptions(queryOpts.getOpts()); |
96 | this.clusterBuilder.withCompression(StringUtils.isEmpty(compression) ? Compression.NONE : Compression.valueOf(compression.toUpperCase())); | 100 | this.clusterBuilder.withCompression(StringUtils.isEmpty(compression) ? Compression.NONE : Compression.valueOf(compression.toUpperCase())); |
97 | if (ssl) { | 101 | if (ssl) { |
@@ -33,8 +33,6 @@ import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; | @@ -33,8 +33,6 @@ import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; | ||
33 | */ | 33 | */ |
34 | public class MqttTransportServerInitializer extends ChannelInitializer<SocketChannel> { | 34 | public class MqttTransportServerInitializer extends ChannelInitializer<SocketChannel> { |
35 | 35 | ||
36 | - private static final int MAX_PAYLOAD_SIZE = 64 * 1024 * 1024; | ||
37 | - | ||
38 | private final SessionMsgProcessor processor; | 36 | private final SessionMsgProcessor processor; |
39 | private final DeviceService deviceService; | 37 | private final DeviceService deviceService; |
40 | private final DeviceAuthService authService; | 38 | private final DeviceAuthService authService; |
@@ -42,10 +40,11 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha | @@ -42,10 +40,11 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha | ||
42 | private final MqttTransportAdaptor adaptor; | 40 | private final MqttTransportAdaptor adaptor; |
43 | private final MqttSslHandlerProvider sslHandlerProvider; | 41 | private final MqttSslHandlerProvider sslHandlerProvider; |
44 | private final QuotaService quotaService; | 42 | private final QuotaService quotaService; |
43 | + private final int maxPayloadSize; | ||
45 | 44 | ||
46 | public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, | 45 | public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, |
47 | MqttTransportAdaptor adaptor, MqttSslHandlerProvider sslHandlerProvider, | 46 | MqttTransportAdaptor adaptor, MqttSslHandlerProvider sslHandlerProvider, |
48 | - QuotaService quotaService) { | 47 | + QuotaService quotaService, int maxPayloadSize) { |
49 | this.processor = processor; | 48 | this.processor = processor; |
50 | this.deviceService = deviceService; | 49 | this.deviceService = deviceService; |
51 | this.authService = authService; | 50 | this.authService = authService; |
@@ -53,6 +52,7 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha | @@ -53,6 +52,7 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha | ||
53 | this.adaptor = adaptor; | 52 | this.adaptor = adaptor; |
54 | this.sslHandlerProvider = sslHandlerProvider; | 53 | this.sslHandlerProvider = sslHandlerProvider; |
55 | this.quotaService = quotaService; | 54 | this.quotaService = quotaService; |
55 | + this.maxPayloadSize = maxPayloadSize; | ||
56 | } | 56 | } |
57 | 57 | ||
58 | @Override | 58 | @Override |
@@ -63,7 +63,7 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha | @@ -63,7 +63,7 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha | ||
63 | sslHandler = sslHandlerProvider.getSslHandler(); | 63 | sslHandler = sslHandlerProvider.getSslHandler(); |
64 | pipeline.addLast(sslHandler); | 64 | pipeline.addLast(sslHandler); |
65 | } | 65 | } |
66 | - pipeline.addLast("decoder", new MqttDecoder(MAX_PAYLOAD_SIZE)); | 66 | + pipeline.addLast("decoder", new MqttDecoder(maxPayloadSize)); |
67 | pipeline.addLast("encoder", MqttEncoder.INSTANCE); | 67 | pipeline.addLast("encoder", MqttEncoder.INSTANCE); |
68 | 68 | ||
69 | MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService, | 69 | MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService, |
@@ -82,7 +82,8 @@ public class MqttTransportService { | @@ -82,7 +82,8 @@ public class MqttTransportService { | ||
82 | private Integer bossGroupThreadCount; | 82 | private Integer bossGroupThreadCount; |
83 | @Value("${mqtt.netty.worker_group_thread_count}") | 83 | @Value("${mqtt.netty.worker_group_thread_count}") |
84 | private Integer workerGroupThreadCount; | 84 | private Integer workerGroupThreadCount; |
85 | - | 85 | + @Value("${mqtt.netty.max_payload_size}") |
86 | + private Integer maxPayloadSize; | ||
86 | 87 | ||
87 | private MqttTransportAdaptor adaptor; | 88 | private MqttTransportAdaptor adaptor; |
88 | 89 | ||
@@ -106,7 +107,7 @@ public class MqttTransportService { | @@ -106,7 +107,7 @@ public class MqttTransportService { | ||
106 | b.group(bossGroup, workerGroup) | 107 | b.group(bossGroup, workerGroup) |
107 | .channel(NioServerSocketChannel.class) | 108 | .channel(NioServerSocketChannel.class) |
108 | .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService, | 109 | .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService, |
109 | - adaptor, sslHandlerProvider, quotaService)); | 110 | + adaptor, sslHandlerProvider, quotaService, maxPayloadSize)); |
110 | 111 | ||
111 | serverChannel = b.bind(host, port).sync().channel(); | 112 | serverChannel = b.bind(host, port).sync().channel(); |
112 | log.info("Mqtt transport started!"); | 113 | log.info("Mqtt transport started!"); |