Commit bff16ddafd650f5f1117fd0a73131ee5e07bc133

Authored by YevhenBondarenko
1 parent 2ea3b187

Mqtt flag sessionPresent depends on flag isCleanSession

@@ -357,7 +357,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -357,7 +357,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
357 log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier()); 357 log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier());
358 X509Certificate cert; 358 X509Certificate cert;
359 if (sslHandler != null && (cert = getX509Certificate()) != null) { 359 if (sslHandler != null && (cert = getX509Certificate()) != null) {
360 - processX509CertConnect(ctx, cert); 360 + processX509CertConnect(ctx, cert, msg);
361 } else { 361 } else {
362 processAuthTokenConnect(ctx, msg); 362 processAuthTokenConnect(ctx, msg);
363 } 363 }
@@ -367,27 +367,27 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -367,27 +367,27 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
367 String userName = msg.payload().userName(); 367 String userName = msg.payload().userName();
368 log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName); 368 log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
369 if (StringUtils.isEmpty(userName)) { 369 if (StringUtils.isEmpty(userName)) {
370 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)); 370 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, msg));
371 ctx.close(); 371 ctx.close();
372 } else { 372 } else {
373 transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(), 373 transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
374 new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() { 374 new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
375 @Override 375 @Override
376 - public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {  
377 - onValidateDeviceResponse(msg, ctx); 376 + public void onSuccess(ValidateDeviceCredentialsResponseMsg responseMsg) {
  377 + onValidateDeviceResponse(responseMsg, ctx, msg);
378 } 378 }
379 379
380 @Override 380 @Override
381 public void onError(Throwable e) { 381 public void onError(Throwable e) {
382 log.trace("[{}] Failed to process credentials: {}", address, userName, e); 382 log.trace("[{}] Failed to process credentials: {}", address, userName, e);
383 - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE)); 383 + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, msg));
384 ctx.close(); 384 ctx.close();
385 } 385 }
386 }); 386 });
387 } 387 }
388 } 388 }
389 389
390 - private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) { 390 + private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert, MqttConnectMessage msg) {
391 try { 391 try {
392 if(!context.isSkipValidityCheckForClientCert()){ 392 if(!context.isSkipValidityCheckForClientCert()){
393 cert.checkValidity(); 393 cert.checkValidity();
@@ -397,19 +397,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -397,19 +397,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
397 transportService.process(ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(), 397 transportService.process(ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(),
398 new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() { 398 new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
399 @Override 399 @Override
400 - public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {  
401 - onValidateDeviceResponse(msg, ctx); 400 + public void onSuccess(ValidateDeviceCredentialsResponseMsg responseMsg) {
  401 + onValidateDeviceResponse(responseMsg, ctx, msg);
402 } 402 }
403 403
404 @Override 404 @Override
405 public void onError(Throwable e) { 405 public void onError(Throwable e) {
406 log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e); 406 log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e);
407 - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE)); 407 + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, msg));
408 ctx.close(); 408 ctx.close();
409 } 409 }
410 }); 410 });
411 } catch (Exception e) { 411 } catch (Exception e) {
412 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED)); 412 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, msg));
413 ctx.close(); 413 ctx.close();
414 } 414 }
415 } 415 }
@@ -433,11 +433,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -433,11 +433,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
433 doDisconnect(); 433 doDisconnect();
434 } 434 }
435 435
436 - private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode) { 436 + private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode, MqttConnectMessage msg) {
437 MqttFixedHeader mqttFixedHeader = 437 MqttFixedHeader mqttFixedHeader =
438 new MqttFixedHeader(CONNACK, false, AT_MOST_ONCE, false, 0); 438 new MqttFixedHeader(CONNACK, false, AT_MOST_ONCE, false, 0);
439 MqttConnAckVariableHeader mqttConnAckVariableHeader = 439 MqttConnAckVariableHeader mqttConnAckVariableHeader =
440 - new MqttConnAckVariableHeader(returnCode, true); 440 + new MqttConnAckVariableHeader(returnCode, !msg.variableHeader().isCleanSession());
441 return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader); 441 return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
442 } 442 }
443 443
@@ -513,36 +513,36 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -513,36 +513,36 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
513 } 513 }
514 } 514 }
515 515
516 - private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx) {  
517 - if (!msg.hasDeviceInfo()) {  
518 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED)); 516 + private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg responseMsg, ChannelHandlerContext ctx, MqttConnectMessage msg) {
  517 + if (!responseMsg.hasDeviceInfo()) {
  518 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, msg));
519 ctx.close(); 519 ctx.close();
520 } else { 520 } else {
521 - deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo()); 521 + deviceSessionCtx.setDeviceInfo(responseMsg.getDeviceInfo());
522 sessionInfo = SessionInfoProto.newBuilder() 522 sessionInfo = SessionInfoProto.newBuilder()
523 .setNodeId(context.getNodeId()) 523 .setNodeId(context.getNodeId())
524 .setSessionIdMSB(sessionId.getMostSignificantBits()) 524 .setSessionIdMSB(sessionId.getMostSignificantBits())
525 .setSessionIdLSB(sessionId.getLeastSignificantBits()) 525 .setSessionIdLSB(sessionId.getLeastSignificantBits())
526 - .setDeviceIdMSB(msg.getDeviceInfo().getDeviceIdMSB())  
527 - .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB())  
528 - .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())  
529 - .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())  
530 - .setDeviceName(msg.getDeviceInfo().getDeviceName())  
531 - .setDeviceType(msg.getDeviceInfo().getDeviceType()) 526 + .setDeviceIdMSB(responseMsg.getDeviceInfo().getDeviceIdMSB())
  527 + .setDeviceIdLSB(responseMsg.getDeviceInfo().getDeviceIdLSB())
  528 + .setTenantIdMSB(responseMsg.getDeviceInfo().getTenantIdMSB())
  529 + .setTenantIdLSB(responseMsg.getDeviceInfo().getTenantIdLSB())
  530 + .setDeviceName(responseMsg.getDeviceInfo().getDeviceName())
  531 + .setDeviceType(responseMsg.getDeviceInfo().getDeviceType())
532 .build(); 532 .build();
533 transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() { 533 transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() {
534 @Override 534 @Override
535 - public void onSuccess(Void msg) { 535 + public void onSuccess(Void response) {
536 transportService.registerAsyncSession(sessionInfo, MqttTransportHandler.this); 536 transportService.registerAsyncSession(sessionInfo, MqttTransportHandler.this);
537 checkGatewaySession(); 537 checkGatewaySession();
538 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); 538 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, msg));
539 log.info("[{}] Client connected!", sessionId); 539 log.info("[{}] Client connected!", sessionId);
540 } 540 }
541 541
542 @Override 542 @Override
543 public void onError(Throwable e) { 543 public void onError(Throwable e) {
544 log.warn("[{}] Failed to submit session event", sessionId, e); 544 log.warn("[{}] Failed to submit session event", sessionId, e);
545 - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE)); 545 + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, msg));
546 ctx.close(); 546 ctx.close();
547 } 547 }
548 }); 548 });