Commit 6c61169452092e18611f968e0b32da68309ba7db

Authored by Andrew Shvayka
1 parent 1ac0ed40

Ability to handle X509 certificate login via MQTT

... ... @@ -33,7 +33,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
33 33 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
34 34 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
35 35 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
36   -import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenResponseMsg;
  36 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
  37 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
37 38 import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
38 39 import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
39 40 import org.thingsboard.server.kafka.TbKafkaResponseTemplate;
... ... @@ -118,17 +119,24 @@ public class RemoteTransportApiService implements TransportApiService {
118 119 public ListenableFuture<TransportApiResponseMsg> handle(TransportApiRequestMsg transportApiRequestMsg) throws Exception {
119 120 if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
120 121 ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
121   - //TODO: Make async and enable caching
122   - DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(msg.getToken());
123   - if (credentials != null && credentials.getCredentialsType() == DeviceCredentialsType.ACCESS_TOKEN) {
124   - return getDeviceInfo(credentials.getDeviceId());
125   - } else {
126   - return getEmptyTransportApiResponseFuture();
127   - }
  122 + return validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN);
  123 + } else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) {
  124 + ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg();
  125 + return validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE);
128 126 }
129 127 return getEmptyTransportApiResponseFuture();
130 128 }
131 129
  130 + private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
  131 + //TODO: Make async and enable caching
  132 + DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);
  133 + if (credentials != null && credentials.getCredentialsType() == credentialsType) {
  134 + return getDeviceInfo(credentials.getDeviceId());
  135 + } else {
  136 + return getEmptyTransportApiResponseFuture();
  137 + }
  138 + }
  139 +
132 140 private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId) {
133 141 return Futures.transform(deviceService.findDeviceByIdAsync(deviceId), device -> {
134 142 if (device == null) {
... ... @@ -146,7 +154,7 @@ public class RemoteTransportApiService implements TransportApiService {
146 154 .setAdditionalInfo(mapper.writeValueAsString(device.getAdditionalInfo()))
147 155 .build();
148 156 return TransportApiResponseMsg.newBuilder()
149   - .setValidateTokenResponseMsg(ValidateDeviceTokenResponseMsg.newBuilder().setDeviceInfo(deviceInfoProto).build()).build();
  157 + .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.newBuilder().setDeviceInfo(deviceInfoProto).build()).build();
150 158 } catch (JsonProcessingException e) {
151 159 log.warn("[{}] Failed to lookup device by id", deviceId, e);
152 160 return getEmptyTransportApiResponse();
... ... @@ -160,6 +168,6 @@ public class RemoteTransportApiService implements TransportApiService {
160 168
161 169 private TransportApiResponseMsg getEmptyTransportApiResponse() {
162 170 return TransportApiResponseMsg.newBuilder()
163   - .setValidateTokenResponseMsg(ValidateDeviceTokenResponseMsg.getDefaultInstance()).build();
  171 + .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.getDefaultInstance()).build();
164 172 }
165 173 }
... ...
... ... @@ -23,7 +23,10 @@ import org.thingsboard.server.gen.transport.TransportProtos;
23 23 public interface TransportService {
24 24
25 25 void process(TransportProtos.ValidateDeviceTokenRequestMsg msg,
26   - TransportServiceCallback<TransportProtos.ValidateDeviceTokenResponseMsg> callback);
  26 + TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback);
  27 +
  28 + void process(TransportProtos.ValidateDeviceX509CertRequestMsg msg,
  29 + TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback);
27 30
28 31 void process(TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback);
29 32
... ...
... ... @@ -93,7 +93,11 @@ message ValidateDeviceTokenRequestMsg {
93 93 string token = 1;
94 94 }
95 95
96   -message ValidateDeviceTokenResponseMsg {
  96 +message ValidateDeviceX509CertRequestMsg {
  97 + string hash = 1;
  98 +}
  99 +
  100 +message ValidateDeviceCredentialsResponseMsg {
97 101 DeviceInfoProto deviceInfo = 1;
98 102 }
99 103
... ... @@ -106,8 +110,9 @@ message TransportToRuleEngineMsg {
106 110
107 111 message TransportApiRequestMsg {
108 112 ValidateDeviceTokenRequestMsg validateTokenRequestMsg = 1;
  113 + ValidateDeviceX509CertRequestMsg validateX509CertRequestMsg = 2;
109 114 }
110 115
111 116 message TransportApiResponseMsg {
112   - ValidateDeviceTokenResponseMsg validateTokenResponseMsg = 1;
  117 + ValidateDeviceCredentialsResponseMsg validateTokenResponseMsg = 1;
113 118 }
\ No newline at end of file
... ...
... ... @@ -380,9 +380,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
380 380 ctx.close();
381 381 } else {
382 382 transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
383   - new TransportServiceCallback<ValidateDeviceTokenResponseMsg>() {
  383 + new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
384 384 @Override
385   - public void onSuccess(ValidateDeviceTokenResponseMsg msg) {
  385 + public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
386 386 if (!msg.hasDeviceInfo()) {
387 387 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
388 388 ctx.close();
... ... @@ -404,32 +404,36 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
404 404 }
405 405 }
406 406
407   - protected SessionEventMsg getSessionEventMsg(SessionEvent event) {
408   - return SessionEventMsg.newBuilder()
409   - .setSessionInfo(sessionInfo)
410   - .setDeviceIdMSB(deviceSessionCtx.getDeviceIdMSB())
411   - .setDeviceIdLSB(deviceSessionCtx.getDeviceIdLSB())
412   - .setEvent(event).build();
413   - }
414   -
415 407 private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) {
416   -// try {
417   -// String strCert = SslUtil.getX509CertificateString(cert);
418   -// String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
419   -// if (deviceSessionCtx.login(new DeviceX509Credentials(sha3Hash))) {
420   -// ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
421   -// connected = true;
422   -// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
423   -// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg())));
424   -// checkGatewaySession();
425   -// } else {
426   -// ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
427   -// ctx.close();
428   -// }
429   -// } catch (Exception e) {
430   -// ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
431   -// ctx.close();
432   -// }
  408 + try {
  409 + String strCert = SslUtil.getX509CertificateString(cert);
  410 + String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
  411 + transportService.process(ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(),
  412 + new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
  413 + @Override
  414 + public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
  415 + if (!msg.hasDeviceInfo()) {
  416 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
  417 + ctx.close();
  418 + } else {
  419 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
  420 + deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
  421 + transportService.process(getSessionEventMsg(SessionEvent.OPEN), null);
  422 + checkGatewaySession();
  423 + }
  424 + }
  425 +
  426 + @Override
  427 + public void onError(Throwable e) {
  428 + log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e);
  429 + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
  430 + ctx.close();
  431 + }
  432 + });
  433 + } catch (Exception e) {
  434 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
  435 + ctx.close();
  436 + }
433 437 }
434 438
435 439 private X509Certificate getX509Certificate() {
... ... @@ -519,6 +523,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
519 523 }
520 524 }
521 525
  526 + private SessionEventMsg getSessionEventMsg(SessionEvent event) {
  527 + return SessionEventMsg.newBuilder()
  528 + .setSessionInfo(sessionInfo)
  529 + .setDeviceIdMSB(deviceSessionCtx.getDeviceIdMSB())
  530 + .setDeviceIdLSB(deviceSessionCtx.getDeviceIdLSB())
  531 + .setEvent(event).build();
  532 + }
  533 +
522 534 @Override
523 535 public void operationComplete(Future<? super Void> future) throws Exception {
524 536 if (deviceSessionCtx.isConnected()) {
... ...
... ... @@ -104,8 +104,16 @@ public class MqttTransportService implements TransportService {
104 104 }
105 105
106 106 @Override
107   - public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceTokenResponseMsg> callback) {
108   - AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(), TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()),
  107 + public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
  108 + AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(),
  109 + TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()),
  110 + response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
  111 + }
  112 +
  113 + @Override
  114 + public void process(ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
  115 + AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getHash(),
  116 + TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()),
109 117 response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
110 118 }
111 119
... ...