Commit 16d82e77fc5000a19d30797ef195ac30b2cf406a

Authored by YevhenBondarenko
1 parent bff16dda

refactored

@@ -363,31 +363,31 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -363,31 +363,31 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
363 } 363 }
364 } 364 }
365 365
366 - private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {  
367 - String userName = msg.payload().userName(); 366 + private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
  367 + String userName = connectMessage.payload().userName();
368 log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName); 368 log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
369 if (StringUtils.isEmpty(userName)) { 369 if (StringUtils.isEmpty(userName)) {
370 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, msg)); 370 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, connectMessage));
371 ctx.close(); 371 ctx.close();
372 } else { 372 } else {
373 transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(), 373 transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
374 new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() { 374 new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
375 @Override 375 @Override
376 - public void onSuccess(ValidateDeviceCredentialsResponseMsg responseMsg) {  
377 - onValidateDeviceResponse(responseMsg, ctx, msg); 376 + public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
  377 + onValidateDeviceResponse(msg, ctx, connectMessage);
378 } 378 }
379 379
380 @Override 380 @Override
381 public void onError(Throwable e) { 381 public void onError(Throwable e) {
382 log.trace("[{}] Failed to process credentials: {}", address, userName, e); 382 log.trace("[{}] Failed to process credentials: {}", address, userName, e);
383 - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, msg)); 383 + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage));
384 ctx.close(); 384 ctx.close();
385 } 385 }
386 }); 386 });
387 } 387 }
388 } 388 }
389 389
390 - private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert, MqttConnectMessage msg) { 390 + private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert, MqttConnectMessage connectMessage) {
391 try { 391 try {
392 if(!context.isSkipValidityCheckForClientCert()){ 392 if(!context.isSkipValidityCheckForClientCert()){
393 cert.checkValidity(); 393 cert.checkValidity();
@@ -397,19 +397,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -397,19 +397,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
397 transportService.process(ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(), 397 transportService.process(ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(),
398 new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() { 398 new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
399 @Override 399 @Override
400 - public void onSuccess(ValidateDeviceCredentialsResponseMsg responseMsg) {  
401 - onValidateDeviceResponse(responseMsg, ctx, msg); 400 + public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
  401 + onValidateDeviceResponse(msg, ctx, connectMessage);
402 } 402 }
403 403
404 @Override 404 @Override
405 public void onError(Throwable e) { 405 public void onError(Throwable e) {
406 log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e); 406 log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e);
407 - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, msg)); 407 + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage));
408 ctx.close(); 408 ctx.close();
409 } 409 }
410 }); 410 });
411 } catch (Exception e) { 411 } catch (Exception e) {
412 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, msg)); 412 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, connectMessage));
413 ctx.close(); 413 ctx.close();
414 } 414 }
415 } 415 }
@@ -513,36 +513,36 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -513,36 +513,36 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
513 } 513 }
514 } 514 }
515 515
516 - private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg responseMsg, ChannelHandlerContext ctx, MqttConnectMessage msg) {  
517 - if (!responseMsg.hasDeviceInfo()) {  
518 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, msg)); 516 + private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
  517 + if (!msg.hasDeviceInfo()) {
  518 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, connectMessage));
519 ctx.close(); 519 ctx.close();
520 } else { 520 } else {
521 - deviceSessionCtx.setDeviceInfo(responseMsg.getDeviceInfo()); 521 + deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
522 sessionInfo = SessionInfoProto.newBuilder() 522 sessionInfo = SessionInfoProto.newBuilder()
523 .setNodeId(context.getNodeId()) 523 .setNodeId(context.getNodeId())
524 .setSessionIdMSB(sessionId.getMostSignificantBits()) 524 .setSessionIdMSB(sessionId.getMostSignificantBits())
525 .setSessionIdLSB(sessionId.getLeastSignificantBits()) 525 .setSessionIdLSB(sessionId.getLeastSignificantBits())
526 - .setDeviceIdMSB(responseMsg.getDeviceInfo().getDeviceIdMSB())  
527 - .setDeviceIdLSB(responseMsg.getDeviceInfo().getDeviceIdLSB())  
528 - .setTenantIdMSB(responseMsg.getDeviceInfo().getTenantIdMSB())  
529 - .setTenantIdLSB(responseMsg.getDeviceInfo().getTenantIdLSB())  
530 - .setDeviceName(responseMsg.getDeviceInfo().getDeviceName())  
531 - .setDeviceType(responseMsg.getDeviceInfo().getDeviceType()) 526 + .setDeviceIdMSB(msg.getDeviceInfo().getDeviceIdMSB())
  527 + .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB())
  528 + .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
  529 + .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
  530 + .setDeviceName(msg.getDeviceInfo().getDeviceName())
  531 + .setDeviceType(msg.getDeviceInfo().getDeviceType())
532 .build(); 532 .build();
533 transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() { 533 transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() {
534 @Override 534 @Override
535 public void onSuccess(Void response) { 535 public void onSuccess(Void response) {
536 transportService.registerAsyncSession(sessionInfo, MqttTransportHandler.this); 536 transportService.registerAsyncSession(sessionInfo, MqttTransportHandler.this);
537 checkGatewaySession(); 537 checkGatewaySession();
538 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, msg)); 538 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage));
539 log.info("[{}] Client connected!", sessionId); 539 log.info("[{}] Client connected!", sessionId);
540 } 540 }
541 541
542 @Override 542 @Override
543 public void onError(Throwable e) { 543 public void onError(Throwable e) {
544 log.warn("[{}] Failed to submit session event", sessionId, e); 544 log.warn("[{}] Failed to submit session event", sessionId, e);
545 - ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, msg)); 545 + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage));
546 ctx.close(); 546 ctx.close();
547 } 547 }
548 }); 548 });