Commit 0d64a64bfe1686077e5841bd0b66f48141ce0947

Authored by Igor Kulikov
2 parents bfee3016 ccab1aa2

Merge with develop/2.5.6

... ... @@ -462,26 +462,26 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
462 462 String clientId = msg.payload().clientIdentifier();
463 463 if (DataConstants.PROVISION.equals(userName) || DataConstants.PROVISION.equals(clientId)) {
464 464 deviceSessionCtx.setProvisionOnly(true);
465   - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
  465 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, msg));
466 466 } else {
467 467 X509Certificate cert;
468 468 if (sslHandler != null && (cert = getX509Certificate()) != null) {
469   - processX509CertConnect(ctx, cert);
  469 + processX509CertConnect(ctx, cert, msg);
470 470 } else {
471 471 processAuthTokenConnect(ctx, msg);
472 472 }
473 473 }
474 474 }
475 475
476   - private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
477   - String userName = msg.payload().userName();
  476 + private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
  477 + String userName = connectMessage.payload().userName();
478 478 log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
479 479 TransportProtos.ValidateBasicMqttCredRequestMsg.Builder request = TransportProtos.ValidateBasicMqttCredRequestMsg.newBuilder()
480   - .setClientId(msg.payload().clientIdentifier());
  480 + .setClientId(connectMessage.payload().clientIdentifier());
481 481 if (userName != null) {
482 482 request.setUserName(userName);
483 483 }
484   - String password = msg.payload().password();
  484 + String password = connectMessage.payload().password();
485 485 if (password != null) {
486 486 request.setPassword(password);
487 487 }
... ... @@ -489,19 +489,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
489 489 new TransportServiceCallback<ValidateDeviceCredentialsResponse>() {
490 490 @Override
491 491 public void onSuccess(ValidateDeviceCredentialsResponse msg) {
492   - onValidateDeviceResponse(msg, ctx);
  492 + onValidateDeviceResponse(msg, ctx, connectMessage);
493 493 }
494 494
495 495 @Override
496 496 public void onError(Throwable e) {
497 497 log.trace("[{}] Failed to process credentials: {}", address, userName, e);
498   - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
  498 + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage));
499 499 ctx.close();
500 500 }
501 501 });
502 502 }
503 503
504   - private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) {
  504 + private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert, MqttConnectMessage connectMessage) {
505 505 try {
506 506 if (!context.isSkipValidityCheckForClientCert()) {
507 507 cert.checkValidity();
... ... @@ -512,18 +512,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
512 512 new TransportServiceCallback<ValidateDeviceCredentialsResponse>() {
513 513 @Override
514 514 public void onSuccess(ValidateDeviceCredentialsResponse msg) {
515   - onValidateDeviceResponse(msg, ctx);
  515 + onValidateDeviceResponse(msg, ctx, connectMessage);
516 516 }
517 517
518 518 @Override
519 519 public void onError(Throwable e) {
520 520 log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e);
521   - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
  521 + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage));
522 522 ctx.close();
523 523 }
524 524 });
525 525 } catch (Exception e) {
526   - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
  526 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, connectMessage));
527 527 ctx.close();
528 528 }
529 529 }
... ... @@ -547,11 +547,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
547 547 doDisconnect();
548 548 }
549 549
550   - private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode) {
  550 + private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode, MqttConnectMessage msg) {
551 551 MqttFixedHeader mqttFixedHeader =
552 552 new MqttFixedHeader(CONNACK, false, AT_MOST_ONCE, false, 0);
553 553 MqttConnAckVariableHeader mqttConnAckVariableHeader =
554   - new MqttConnAckVariableHeader(returnCode, true);
  554 + new MqttConnAckVariableHeader(returnCode, !msg.variableHeader().isCleanSession());
555 555 return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
556 556 }
557 557
... ... @@ -627,9 +627,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
627 627 }
628 628 }
629 629
630   - private void onValidateDeviceResponse(ValidateDeviceCredentialsResponse msg, ChannelHandlerContext ctx) {
  630 +
  631 + private void onValidateDeviceResponse(ValidateDeviceCredentialsResponse msg, ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
631 632 if (!msg.hasDeviceInfo()) {
632   - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
  633 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, connectMessage));
633 634 ctx.close();
634 635 } else {
635 636 deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
... ... @@ -640,7 +641,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
640 641 public void onSuccess(Void msg) {
641 642 transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this);
642 643 checkGatewaySession();
643   - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
  644 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage));
644 645 log.info("[{}] Client connected!", sessionId);
645 646 }
646 647
... ... @@ -651,7 +652,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
651 652 } else {
652 653 log.warn("[{}] Failed to submit session event", sessionId, e);
653 654 }
654   - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
  655 + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage));
655 656 ctx.close();
656 657 }
657 658 });
... ...
... ... @@ -107,4 +107,4 @@ public class CassandraPartitionsCacheTest {
107 107 }
108 108 verify(cassandraBaseTimeseriesDao, times(60000)).executeAsyncWrite(any(TenantId.class), any(Statement.class));
109 109 }
110   -}
\ No newline at end of file
  110 +}
... ...