Commit ccab1aa2c96bdc969bcba3d15d89203b212331a8

Authored by Igor Kulikov
Committed by GitHub
2 parents 2ea3b187 16d82e77

Merge pull request #3887 from YevhenBondarenko/mqtt-transport-improvements

Mqtt flag sessionPresent depends on flag isCleanSession
@@ -357,37 +357,37 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -357,37 +357,37 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
357 log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier()); 357 log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier());
358 X509Certificate cert; 358 X509Certificate cert;
359 if (sslHandler != null && (cert = getX509Certificate()) != null) { 359 if (sslHandler != null && (cert = getX509Certificate()) != null) {
360 - processX509CertConnect(ctx, cert); 360 + processX509CertConnect(ctx, cert, msg);
361 } else { 361 } else {
362 processAuthTokenConnect(ctx, msg); 362 processAuthTokenConnect(ctx, msg);
363 } 363 }
364 } 364 }
365 365
366 - private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {  
367 - String userName = msg.payload().userName(); 366 + private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
  367 + String userName = connectMessage.payload().userName();
368 log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName); 368 log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
369 if (StringUtils.isEmpty(userName)) { 369 if (StringUtils.isEmpty(userName)) {
370 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)); 370 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, connectMessage));
371 ctx.close(); 371 ctx.close();
372 } else { 372 } else {
373 transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(), 373 transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
374 new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() { 374 new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
375 @Override 375 @Override
376 public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) { 376 public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
377 - onValidateDeviceResponse(msg, ctx); 377 + onValidateDeviceResponse(msg, ctx, connectMessage);
378 } 378 }
379 379
380 @Override 380 @Override
381 public void onError(Throwable e) { 381 public void onError(Throwable e) {
382 log.trace("[{}] Failed to process credentials: {}", address, userName, e); 382 log.trace("[{}] Failed to process credentials: {}", address, userName, e);
383 - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE)); 383 + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage));
384 ctx.close(); 384 ctx.close();
385 } 385 }
386 }); 386 });
387 } 387 }
388 } 388 }
389 389
390 - private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) { 390 + private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert, MqttConnectMessage connectMessage) {
391 try { 391 try {
392 if(!context.isSkipValidityCheckForClientCert()){ 392 if(!context.isSkipValidityCheckForClientCert()){
393 cert.checkValidity(); 393 cert.checkValidity();
@@ -398,18 +398,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -398,18 +398,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
398 new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() { 398 new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
399 @Override 399 @Override
400 public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) { 400 public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
401 - onValidateDeviceResponse(msg, ctx); 401 + onValidateDeviceResponse(msg, ctx, connectMessage);
402 } 402 }
403 403
404 @Override 404 @Override
405 public void onError(Throwable e) { 405 public void onError(Throwable e) {
406 log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e); 406 log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e);
407 - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE)); 407 + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage));
408 ctx.close(); 408 ctx.close();
409 } 409 }
410 }); 410 });
411 } catch (Exception e) { 411 } catch (Exception e) {
412 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED)); 412 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, connectMessage));
413 ctx.close(); 413 ctx.close();
414 } 414 }
415 } 415 }
@@ -433,11 +433,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -433,11 +433,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
433 doDisconnect(); 433 doDisconnect();
434 } 434 }
435 435
436 - private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode) { 436 + private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode, MqttConnectMessage msg) {
437 MqttFixedHeader mqttFixedHeader = 437 MqttFixedHeader mqttFixedHeader =
438 new MqttFixedHeader(CONNACK, false, AT_MOST_ONCE, false, 0); 438 new MqttFixedHeader(CONNACK, false, AT_MOST_ONCE, false, 0);
439 MqttConnAckVariableHeader mqttConnAckVariableHeader = 439 MqttConnAckVariableHeader mqttConnAckVariableHeader =
440 - new MqttConnAckVariableHeader(returnCode, true); 440 + new MqttConnAckVariableHeader(returnCode, !msg.variableHeader().isCleanSession());
441 return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader); 441 return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
442 } 442 }
443 443
@@ -513,9 +513,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -513,9 +513,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
513 } 513 }
514 } 514 }
515 515
516 - private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx) { 516 + private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
517 if (!msg.hasDeviceInfo()) { 517 if (!msg.hasDeviceInfo()) {
518 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED)); 518 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, connectMessage));
519 ctx.close(); 519 ctx.close();
520 } else { 520 } else {
521 deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo()); 521 deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
@@ -532,17 +532,17 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -532,17 +532,17 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
532 .build(); 532 .build();
533 transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() { 533 transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() {
534 @Override 534 @Override
535 - public void onSuccess(Void msg) { 535 + public void onSuccess(Void response) {
536 transportService.registerAsyncSession(sessionInfo, MqttTransportHandler.this); 536 transportService.registerAsyncSession(sessionInfo, MqttTransportHandler.this);
537 checkGatewaySession(); 537 checkGatewaySession();
538 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); 538 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage));
539 log.info("[{}] Client connected!", sessionId); 539 log.info("[{}] Client connected!", sessionId);
540 } 540 }
541 541
542 @Override 542 @Override
543 public void onError(Throwable e) { 543 public void onError(Throwable e) {
544 log.warn("[{}] Failed to submit session event", sessionId, e); 544 log.warn("[{}] Failed to submit session event", sessionId, e);
545 - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE)); 545 + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage));
546 ctx.close(); 546 ctx.close();
547 } 547 }
548 }); 548 });