Showing
1 changed file
with
12 additions
and
8 deletions
... | ... | @@ -232,7 +232,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
232 | 232 | |
233 | 233 | if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) { |
234 | 234 | if (gatewaySessionHandler != null) { |
235 | - handleGatewayPublishMsg(topicName, msgId, mqttMsg); | |
235 | + handleGatewayPublishMsg(ctx, topicName, msgId, mqttMsg); | |
236 | 236 | transportService.reportActivity(deviceSessionCtx.getSessionInfo()); |
237 | 237 | } |
238 | 238 | } else { |
... | ... | @@ -240,7 +240,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
240 | 240 | } |
241 | 241 | } |
242 | 242 | |
243 | - private void handleGatewayPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) { | |
243 | + private void handleGatewayPublishMsg(ChannelHandlerContext ctx, String topicName, int msgId, MqttPublishMessage mqttMsg) { | |
244 | 244 | try { |
245 | 245 | switch (topicName) { |
246 | 246 | case MqttTopics.GATEWAY_TELEMETRY_TOPIC: |
... | ... | @@ -264,6 +264,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
264 | 264 | case MqttTopics.GATEWAY_DISCONNECT_TOPIC: |
265 | 265 | gatewaySessionHandler.onDeviceDisconnect(mqttMsg); |
266 | 266 | break; |
267 | + default: | |
268 | + ack(ctx, msgId); | |
267 | 269 | } |
268 | 270 | } catch (RuntimeException | AdaptorException e) { |
269 | 271 | log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); |
... | ... | @@ -293,6 +295,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
293 | 295 | transportService.process(deviceSessionCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg)); |
294 | 296 | } else { |
295 | 297 | transportService.reportActivity(deviceSessionCtx.getSessionInfo()); |
298 | + ack(ctx, msgId); | |
296 | 299 | } |
297 | 300 | } catch (AdaptorException e) { |
298 | 301 | log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); |
... | ... | @@ -301,15 +304,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
301 | 304 | } |
302 | 305 | } |
303 | 306 | |
307 | + private void ack(ChannelHandlerContext ctx, int msgId) { | |
308 | + if (msgId > 0) { | |
309 | + ctx.writeAndFlush(createMqttPubAckMsg(msgId)); | |
310 | + } | |
311 | + } | |
304 | 312 | |
305 | 313 | private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final int msgId, final T msg) { |
306 | 314 | return new TransportServiceCallback<Void>() { |
307 | 315 | @Override |
308 | 316 | public void onSuccess(Void dummy) { |
309 | 317 | log.trace("[{}] Published msg: {}", sessionId, msg); |
310 | - if (msgId > 0) { | |
311 | - ctx.writeAndFlush(createMqttPubAckMsg(msgId)); | |
312 | - } | |
318 | + ack(ctx, msgId); | |
313 | 319 | } |
314 | 320 | |
315 | 321 | @Override |
... | ... | @@ -334,9 +340,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
334 | 340 | @Override |
335 | 341 | public void onSuccess(TransportProtos.ProvisionDeviceResponseMsg provisionResponseMsg) { |
336 | 342 | log.trace("[{}] Published msg: {}", sessionId, msg); |
337 | - if (msgId > 0) { | |
338 | - ctx.writeAndFlush(createMqttPubAckMsg(msgId)); | |
339 | - } | |
343 | + ack(ctx, msgId); | |
340 | 344 | try { |
341 | 345 | if (deviceSessionCtx.getProvisionPayloadType().equals(TransportPayloadType.JSON)) { |
342 | 346 | deviceSessionCtx.getContext().getJsonMqttAdaptor().convertToPublish(deviceSessionCtx, provisionResponseMsg).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); | ... | ... |