Commit 44a1fa2911b1f51830c97ea98e4c221a67c4e2ad
1 parent
42a831a3
ReconnectDelay and MaxBytesInMessage configurable
Showing
2 changed files
with
38 additions
and
2 deletions
@@ -40,6 +40,8 @@ public final class MqttClientConfig { | @@ -40,6 +40,8 @@ public final class MqttClientConfig { | ||
40 | private Class<? extends Channel> channelClass = NioSocketChannel.class; | 40 | private Class<? extends Channel> channelClass = NioSocketChannel.class; |
41 | 41 | ||
42 | private boolean reconnect = true; | 42 | private boolean reconnect = true; |
43 | + private long reconnectDelay = 1L; | ||
44 | + private int maxBytesInMessage = 8092; | ||
43 | 45 | ||
44 | public MqttClientConfig() { | 46 | public MqttClientConfig() { |
45 | this(null); | 47 | this(null); |
@@ -146,4 +148,38 @@ public final class MqttClientConfig { | @@ -146,4 +148,38 @@ public final class MqttClientConfig { | ||
146 | public void setReconnect(boolean reconnect) { | 148 | public void setReconnect(boolean reconnect) { |
147 | this.reconnect = reconnect; | 149 | this.reconnect = reconnect; |
148 | } | 150 | } |
151 | + | ||
152 | + public long getReconnectDelay() { | ||
153 | + return reconnectDelay; | ||
154 | + } | ||
155 | + | ||
156 | + /** | ||
157 | + * Sets the reconnect delay in seconds. Defaults to 1 second. | ||
158 | + * @param reconnectDelay | ||
159 | + * @throws IllegalArgumentException if reconnectDelay is smaller than 1. | ||
160 | + */ | ||
161 | + public void setReconnectDelay(long reconnectDelay) { | ||
162 | + if (reconnectDelay <= 0) { | ||
163 | + throw new IllegalArgumentException("reconnectDelay must be > 0"); | ||
164 | + } | ||
165 | + this.reconnectDelay = reconnectDelay; | ||
166 | + } | ||
167 | + | ||
168 | + public int getMaxBytesInMessage() { | ||
169 | + return maxBytesInMessage; | ||
170 | + } | ||
171 | + | ||
172 | + /** | ||
173 | + * Sets the maximum number of bytes in the message for the {@link io.netty.handler.codec.mqtt.MqttDecoder}. | ||
174 | + * Default value is 8092 as specified by Netty. The absolute maximum size is 256MB as set by the MQTT spec. | ||
175 | + * | ||
176 | + * @param maxBytesInMessage | ||
177 | + * @throws IllegalArgumentException if maxBytesInMessage is smaller than 1 or greater than 256_000_000. | ||
178 | + */ | ||
179 | + public void setMaxBytesInMessage(int maxBytesInMessage) { | ||
180 | + if (maxBytesInMessage <= 0 || maxBytesInMessage > 256_000_000) { | ||
181 | + throw new IllegalArgumentException("maxBytesInMessage must be > 0 or < 256_000_000"); | ||
182 | + } | ||
183 | + this.maxBytesInMessage = maxBytesInMessage; | ||
184 | + } | ||
149 | } | 185 | } |
@@ -155,7 +155,7 @@ final class MqttClientImpl implements MqttClient { | @@ -155,7 +155,7 @@ final class MqttClientImpl implements MqttClient { | ||
155 | if (reconnect) { | 155 | if (reconnect) { |
156 | this.reconnect = true; | 156 | this.reconnect = true; |
157 | } | 157 | } |
158 | - eventLoop.schedule((Runnable) () -> connect(host, port, reconnect), 1L, TimeUnit.SECONDS); | 158 | + eventLoop.schedule((Runnable) () -> connect(host, port, reconnect), clientConfig.getReconnectDelay(), TimeUnit.SECONDS); |
159 | } | 159 | } |
160 | } | 160 | } |
161 | 161 | ||
@@ -512,7 +512,7 @@ final class MqttClientImpl implements MqttClient { | @@ -512,7 +512,7 @@ final class MqttClientImpl implements MqttClient { | ||
512 | ch.pipeline().addLast(sslContext.newHandler(ch.alloc(), host, port)); | 512 | ch.pipeline().addLast(sslContext.newHandler(ch.alloc(), host, port)); |
513 | } | 513 | } |
514 | 514 | ||
515 | - ch.pipeline().addLast("mqttDecoder", new MqttDecoder()); | 515 | + ch.pipeline().addLast("mqttDecoder", new MqttDecoder(clientConfig.getMaxBytesInMessage())); |
516 | ch.pipeline().addLast("mqttEncoder", MqttEncoder.INSTANCE); | 516 | ch.pipeline().addLast("mqttEncoder", MqttEncoder.INSTANCE); |
517 | ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds(), MqttClientImpl.this.clientConfig.getTimeoutSeconds(), 0)); | 517 | ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds(), MqttClientImpl.this.clientConfig.getTimeoutSeconds(), 0)); |
518 | ch.pipeline().addLast("mqttPingHandler", new MqttPingHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds())); | 518 | ch.pipeline().addLast("mqttPingHandler", new MqttPingHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds())); |