Commit 2590f04ac0fd9882ecbefa98111ce57dd9264ab1

Authored by Andrew Shvayka
1 parent c6800dbd

Local Transport Service implementation

  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.transport;
  17 +
  18 +import com.fasterxml.jackson.core.JsonProcessingException;
  19 +import com.fasterxml.jackson.databind.ObjectMapper;
  20 +import com.google.common.util.concurrent.Futures;
  21 +import com.google.common.util.concurrent.ListenableFuture;
  22 +import lombok.extern.slf4j.Slf4j;
  23 +import org.springframework.beans.factory.annotation.Autowired;
  24 +import org.springframework.beans.factory.annotation.Value;
  25 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  26 +import org.springframework.stereotype.Service;
  27 +import org.thingsboard.server.common.data.Device;
  28 +import org.thingsboard.server.common.data.id.DeviceId;
  29 +import org.thingsboard.server.common.data.relation.EntityRelation;
  30 +import org.thingsboard.server.common.data.security.DeviceCredentials;
  31 +import org.thingsboard.server.common.data.security.DeviceCredentialsType;
  32 +import org.thingsboard.server.dao.device.DeviceCredentialsService;
  33 +import org.thingsboard.server.dao.device.DeviceService;
  34 +import org.thingsboard.server.dao.relation.RelationService;
  35 +import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
  36 +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
  37 +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
  38 +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
  39 +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
  40 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
  41 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
  42 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
  43 +import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
  44 +import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
  45 +import org.thingsboard.server.kafka.TbKafkaResponseTemplate;
  46 +import org.thingsboard.server.kafka.TbKafkaSettings;
  47 +import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
  48 +import org.thingsboard.server.service.executors.DbCallbackExecutorService;
  49 +import org.thingsboard.server.service.state.DeviceStateService;
  50 +
  51 +import javax.annotation.PostConstruct;
  52 +import javax.annotation.PreDestroy;
  53 +import java.util.UUID;
  54 +import java.util.concurrent.ExecutorService;
  55 +import java.util.concurrent.Executors;
  56 +import java.util.concurrent.locks.ReentrantLock;
  57 +
  58 +/**
  59 + * Created by ashvayka on 05.10.18.
  60 + */
  61 +@Slf4j
  62 +@Service
  63 +public class LocalTransportApiService implements TransportApiService {
  64 +
  65 + private static final ObjectMapper mapper = new ObjectMapper();
  66 +
  67 + @Autowired
  68 + private DeviceService deviceService;
  69 +
  70 + @Autowired
  71 + private RelationService relationService;
  72 +
  73 + @Autowired
  74 + private DeviceCredentialsService deviceCredentialsService;
  75 +
  76 + @Autowired
  77 + private DeviceStateService deviceStateService;
  78 +
  79 + @Autowired
  80 + private DbCallbackExecutorService dbCallbackExecutorService;
  81 +
  82 + private ReentrantLock deviceCreationLock = new ReentrantLock();
  83 +
  84 + @Override
  85 + public ListenableFuture<TransportApiResponseMsg> handle(TransportApiRequestMsg transportApiRequestMsg) {
  86 + if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
  87 + ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
  88 + return validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN);
  89 + } else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) {
  90 + ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg();
  91 + return validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE);
  92 + } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) {
  93 + return handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg());
  94 + }
  95 + return getEmptyTransportApiResponseFuture();
  96 + }
  97 +
  98 + private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
  99 + //TODO: Make async and enable caching
  100 + DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);
  101 + if (credentials != null && credentials.getCredentialsType() == credentialsType) {
  102 + return getDeviceInfo(credentials.getDeviceId());
  103 + } else {
  104 + return getEmptyTransportApiResponseFuture();
  105 + }
  106 + }
  107 +
  108 + private ListenableFuture<TransportApiResponseMsg> handle(GetOrCreateDeviceFromGatewayRequestMsg requestMsg) {
  109 + DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB()));
  110 + ListenableFuture<Device> gatewayFuture = deviceService.findDeviceByIdAsync(gatewayId);
  111 + return Futures.transform(gatewayFuture, gateway -> {
  112 + deviceCreationLock.lock();
  113 + try {
  114 + Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName());
  115 + if (device == null) {
  116 + device = new Device();
  117 + device.setTenantId(gateway.getTenantId());
  118 + device.setName(requestMsg.getDeviceName());
  119 + device.setType(requestMsg.getDeviceType());
  120 + device.setCustomerId(gateway.getCustomerId());
  121 + device = deviceService.saveDevice(device);
  122 + relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));
  123 + deviceStateService.onDeviceAdded(device);
  124 + }
  125 + return TransportApiResponseMsg.newBuilder()
  126 + .setGetOrCreateDeviceResponseMsg(GetOrCreateDeviceFromGatewayResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();
  127 + } catch (JsonProcessingException e) {
  128 + log.warn("[{}] Failed to lookup device by gateway id and name", gatewayId, requestMsg.getDeviceName(), e);
  129 + throw new RuntimeException(e);
  130 + } finally {
  131 + deviceCreationLock.unlock();
  132 + }
  133 + }, dbCallbackExecutorService);
  134 + }
  135 +
  136 +
  137 + private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId) {
  138 + return Futures.transform(deviceService.findDeviceByIdAsync(deviceId), device -> {
  139 + if (device == null) {
  140 + log.trace("[{}] Failed to lookup device by id", deviceId);
  141 + return getEmptyTransportApiResponse();
  142 + }
  143 + try {
  144 + return TransportApiResponseMsg.newBuilder()
  145 + .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();
  146 + } catch (JsonProcessingException e) {
  147 + log.warn("[{}] Failed to lookup device by id", deviceId, e);
  148 + return getEmptyTransportApiResponse();
  149 + }
  150 + });
  151 + }
  152 +
  153 + private DeviceInfoProto getDeviceInfoProto(Device device) throws JsonProcessingException {
  154 + return DeviceInfoProto.newBuilder()
  155 + .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())
  156 + .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits())
  157 + .setDeviceIdMSB(device.getId().getId().getMostSignificantBits())
  158 + .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits())
  159 + .setDeviceName(device.getName())
  160 + .setDeviceType(device.getType())
  161 + .setAdditionalInfo(mapper.writeValueAsString(device.getAdditionalInfo()))
  162 + .build();
  163 + }
  164 +
  165 + private ListenableFuture<TransportApiResponseMsg> getEmptyTransportApiResponseFuture() {
  166 + return Futures.immediateFuture(getEmptyTransportApiResponse());
  167 + }
  168 +
  169 + private TransportApiResponseMsg getEmptyTransportApiResponse() {
  170 + return TransportApiResponseMsg.newBuilder()
  171 + .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.getDefaultInstance()).build();
  172 + }
  173 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.transport;
  17 +
  18 +import akka.actor.ActorRef;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.springframework.beans.factory.annotation.Autowired;
  21 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  22 +import org.springframework.stereotype.Service;
  23 +import org.thingsboard.rule.engine.api.util.DonAsynchron;
  24 +import org.thingsboard.server.actors.ActorSystemContext;
  25 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
  26 +import org.thingsboard.server.common.transport.SessionMsgListener;
  27 +import org.thingsboard.server.common.transport.TransportService;
  28 +import org.thingsboard.server.common.transport.TransportServiceCallback;
  29 +import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
  30 +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
  31 +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
  32 +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
  33 +import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
  34 +import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
  35 +import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
  36 +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
  37 +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
  38 +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
  39 +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
  40 +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
  41 +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
  42 +import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
  43 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
  44 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
  45 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
  46 +import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
  47 +import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
  48 +import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
  49 +import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
  50 +
  51 +import javax.annotation.PostConstruct;
  52 +import javax.annotation.PreDestroy;
  53 +import java.util.Optional;
  54 +import java.util.UUID;
  55 +import java.util.concurrent.ConcurrentHashMap;
  56 +import java.util.concurrent.ConcurrentMap;
  57 +import java.util.concurrent.ExecutorService;
  58 +import java.util.concurrent.SynchronousQueue;
  59 +import java.util.concurrent.ThreadPoolExecutor;
  60 +import java.util.concurrent.TimeUnit;
  61 +import java.util.function.Consumer;
  62 +
  63 +/**
  64 + * Created by ashvayka on 12.10.18.
  65 + */
  66 +@Slf4j
  67 +@Service
  68 +@ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "local")
  69 +public class LocalTransportService implements TransportService, RuleEngineTransportService {
  70 +
  71 + private ConcurrentMap<UUID, SessionMsgListener> sessions = new ConcurrentHashMap<>();
  72 +
  73 + private ExecutorService transportCallbackExecutor;
  74 +
  75 + @Autowired
  76 + private TransportApiService transportApiService;
  77 +
  78 + @Autowired
  79 + private ActorSystemContext actorContext;
  80 +
  81 + //TODO: completely replace this routing with the Kafka routing by partition ids.
  82 + @Autowired
  83 + private ClusterRoutingService routingService;
  84 + @Autowired
  85 + private ClusterRpcService rpcService;
  86 + @Autowired
  87 + private DataDecodingEncodingService encodingService;
  88 +
  89 + @PostConstruct
  90 + public void init() {
  91 + this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
  92 + }
  93 +
  94 + @PreDestroy
  95 + public void destroy() {
  96 + if (transportCallbackExecutor != null) {
  97 + transportCallbackExecutor.shutdownNow();
  98 + }
  99 + }
  100 +
  101 + @Override
  102 + public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
  103 + DonAsynchron.withCallback(
  104 + transportApiService.handle(TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()),
  105 + transportApiResponseMsg -> {
  106 + if (callback != null) {
  107 + callback.onSuccess(transportApiResponseMsg.getValidateTokenResponseMsg());
  108 + }
  109 + },
  110 + getThrowableConsumer(callback), transportCallbackExecutor);
  111 + }
  112 +
  113 + @Override
  114 + public void process(ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
  115 + DonAsynchron.withCallback(
  116 + transportApiService.handle(TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()),
  117 + transportApiResponseMsg -> {
  118 + if (callback != null) {
  119 + callback.onSuccess(transportApiResponseMsg.getValidateTokenResponseMsg());
  120 + }
  121 + },
  122 + getThrowableConsumer(callback), transportCallbackExecutor);
  123 + }
  124 +
  125 + @Override
  126 + public void process(GetOrCreateDeviceFromGatewayRequestMsg msg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg> callback) {
  127 + DonAsynchron.withCallback(
  128 + transportApiService.handle(TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build()),
  129 + transportApiResponseMsg -> {
  130 + if (callback != null) {
  131 + callback.onSuccess(transportApiResponseMsg.getGetOrCreateDeviceResponseMsg());
  132 + }
  133 + },
  134 + getThrowableConsumer(callback), transportCallbackExecutor);
  135 + }
  136 +
  137 + @Override
  138 + public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
  139 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
  140 + }
  141 +
  142 + @Override
  143 + public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
  144 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback);
  145 + }
  146 +
  147 + @Override
  148 + public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
  149 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback);
  150 + }
  151 +
  152 + @Override
  153 + public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
  154 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);
  155 + }
  156 +
  157 + @Override
  158 + public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
  159 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
  160 + }
  161 +
  162 + @Override
  163 + public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
  164 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);
  165 + }
  166 +
  167 + @Override
  168 + public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
  169 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);
  170 + }
  171 +
  172 + @Override
  173 + public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
  174 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
  175 + }
  176 +
  177 + @Override
  178 + public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
  179 + sessions.putIfAbsent(toId(sessionInfo), listener);
  180 + //TODO: monitor sessions periodically: PING REQ/RESP, etc.
  181 + }
  182 +
  183 + @Override
  184 + public void deregisterSession(SessionInfoProto sessionInfo) {
  185 + sessions.remove(toId(sessionInfo));
  186 + }
  187 +
  188 + private UUID toId(SessionInfoProto sessionInfo) {
  189 + return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
  190 + }
  191 +
  192 + @Override
  193 + public void process(String nodeId, DeviceActorToTransportMsg msg) {
  194 + process(nodeId, msg, null, null);
  195 + }
  196 +
  197 + @Override
  198 + public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
  199 + UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB());
  200 + SessionMsgListener listener = sessions.get(sessionId);
  201 + if (listener != null) {
  202 + transportCallbackExecutor.submit(() -> {
  203 + if (msg.hasGetAttributesResponse()) {
  204 + listener.onGetAttributesResponse(msg.getGetAttributesResponse());
  205 + }
  206 + if (msg.hasAttributeUpdateNotification()) {
  207 + listener.onAttributeUpdate(msg.getAttributeUpdateNotification());
  208 + }
  209 + if (msg.hasSessionCloseNotification()) {
  210 + listener.onRemoteSessionCloseCommand(msg.getSessionCloseNotification());
  211 + }
  212 + if (msg.hasToDeviceRequest()) {
  213 + listener.onToDeviceRpcRequest(msg.getToDeviceRequest());
  214 + }
  215 + if (msg.hasToServerResponse()) {
  216 + listener.onToServerRpcResponse(msg.getToServerResponse());
  217 + }
  218 + });
  219 + } else {
  220 + //TODO: should we notify the device actor about missed session?
  221 + log.debug("[{}] Missing session.", sessionId);
  222 + }
  223 + if (onSuccess != null) {
  224 + onSuccess.run();
  225 + }
  226 + }
  227 +
  228 + private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
  229 + TransportToDeviceActorMsgWrapper wrapper = new TransportToDeviceActorMsgWrapper(toDeviceActorMsg);
  230 + Optional<ServerAddress> address = routingService.resolveById(wrapper.getDeviceId());
  231 + if (address.isPresent()) {
  232 + rpcService.tell(encodingService.convertToProtoDataMessage(address.get(), wrapper));
  233 + } else {
  234 + actorContext.getAppActor().tell(wrapper, ActorRef.noSender());
  235 + }
  236 + if (callback != null) {
  237 + callback.onSuccess(null);
  238 + }
  239 + }
  240 +
  241 + private <T> Consumer<Throwable> getThrowableConsumer(TransportServiceCallback<T> callback) {
  242 + return e -> {
  243 + if (callback != null) {
  244 + callback.onError(e);
  245 + }
  246 + };
  247 + }
  248 +
  249 +}
@@ -55,7 +55,7 @@ import java.util.function.Consumer; @@ -55,7 +55,7 @@ import java.util.function.Consumer;
55 */ 55 */
56 @Slf4j 56 @Slf4j
57 @Service 57 @Service
58 -@ConditionalOnProperty(prefix = "transport.remote", value = "enabled", havingValue = "true") 58 +@ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "remote")
59 public class RemoteRuleEngineTransportService implements RuleEngineTransportService { 59 public class RemoteRuleEngineTransportService implements RuleEngineTransportService {
60 60
61 private static final ObjectMapper mapper = new ObjectMapper(); 61 private static final ObjectMapper mapper = new ObjectMapper();
@@ -78,9 +78,6 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ @@ -78,9 +78,6 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
78 @Autowired 78 @Autowired
79 private ActorSystemContext actorContext; 79 private ActorSystemContext actorContext;
80 80
81 - @Autowired  
82 - private ActorService actorService;  
83 -  
84 //TODO: completely replace this routing with the Kafka routing by partition ids. 81 //TODO: completely replace this routing with the Kafka routing by partition ids.
85 @Autowired 82 @Autowired
86 private ClusterRoutingService routingService; 83 private ClusterRoutingService routingService;
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j; @@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
23 import org.springframework.beans.factory.annotation.Autowired; 23 import org.springframework.beans.factory.annotation.Autowired;
24 import org.springframework.beans.factory.annotation.Value; 24 import org.springframework.beans.factory.annotation.Value;
25 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; 25 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  26 +import org.springframework.stereotype.Component;
26 import org.springframework.stereotype.Service; 27 import org.springframework.stereotype.Service;
27 import org.thingsboard.server.common.data.Device; 28 import org.thingsboard.server.common.data.Device;
28 import org.thingsboard.server.common.data.id.DeviceId; 29 import org.thingsboard.server.common.data.id.DeviceId;
@@ -48,20 +49,22 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService; @@ -48,20 +49,22 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
48 import org.thingsboard.server.service.state.DeviceStateService; 49 import org.thingsboard.server.service.state.DeviceStateService;
49 50
50 import javax.annotation.PostConstruct; 51 import javax.annotation.PostConstruct;
  52 +import javax.annotation.PreDestroy;
51 import java.util.UUID; 53 import java.util.UUID;
52 import java.util.concurrent.ExecutorService; 54 import java.util.concurrent.ExecutorService;
53 import java.util.concurrent.Executors; 55 import java.util.concurrent.Executors;
  56 +import java.util.concurrent.SynchronousQueue;
  57 +import java.util.concurrent.ThreadPoolExecutor;
  58 +import java.util.concurrent.TimeUnit;
54 import java.util.concurrent.locks.ReentrantLock; 59 import java.util.concurrent.locks.ReentrantLock;
55 60
56 /** 61 /**
57 * Created by ashvayka on 05.10.18. 62 * Created by ashvayka on 05.10.18.
58 */ 63 */
59 @Slf4j 64 @Slf4j
60 -@Service  
61 -@ConditionalOnProperty(prefix = "transport.remote", value = "enabled", havingValue = "true")  
62 -public class RemoteTransportApiService implements TransportApiService {  
63 -  
64 - private static final ObjectMapper mapper = new ObjectMapper(); 65 +@Component
  66 +@ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "remote")
  67 +public class RemoteTransportApiService {
65 68
66 @Value("${transport.remote.transport_api.requests_topic}") 69 @Value("${transport.remote.transport_api.requests_topic}")
67 private String transportApiRequestsTopic; 70 private String transportApiRequestsTopic;
@@ -83,26 +86,15 @@ public class RemoteTransportApiService implements TransportApiService { @@ -83,26 +86,15 @@ public class RemoteTransportApiService implements TransportApiService {
83 private DiscoveryService discoveryService; 86 private DiscoveryService discoveryService;
84 87
85 @Autowired 88 @Autowired
86 - private DeviceService deviceService;  
87 -  
88 - @Autowired  
89 - private RelationService relationService;  
90 -  
91 - @Autowired  
92 - private DeviceCredentialsService deviceCredentialsService;  
93 -  
94 - @Autowired  
95 - private DeviceStateService deviceStateService; 89 + private TransportApiService transportApiService;
96 90
97 private ExecutorService transportCallbackExecutor; 91 private ExecutorService transportCallbackExecutor;
98 92
99 private TbKafkaResponseTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate; 93 private TbKafkaResponseTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
100 94
101 - private ReentrantLock deviceCreationLock = new ReentrantLock();  
102 -  
103 @PostConstruct 95 @PostConstruct
104 public void init() { 96 public void init() {
105 - this.transportCallbackExecutor = Executors.newCachedThreadPool(); 97 + this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
106 98
107 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder(); 99 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder();
108 responseBuilder.settings(kafkaSettings); 100 responseBuilder.settings(kafkaSettings);
@@ -126,98 +118,19 @@ public class RemoteTransportApiService implements TransportApiService { @@ -126,98 +118,19 @@ public class RemoteTransportApiService implements TransportApiService {
126 builder.requestTimeout(requestTimeout); 118 builder.requestTimeout(requestTimeout);
127 builder.pollInterval(responsePollDuration); 119 builder.pollInterval(responsePollDuration);
128 builder.executor(transportCallbackExecutor); 120 builder.executor(transportCallbackExecutor);
129 - builder.handler(this); 121 + builder.handler(transportApiService);
130 transportApiTemplate = builder.build(); 122 transportApiTemplate = builder.build();
131 transportApiTemplate.init(); 123 transportApiTemplate.init();
132 } 124 }
133 125
134 - @Override  
135 - public ListenableFuture<TransportApiResponseMsg> handle(TransportApiRequestMsg transportApiRequestMsg) throws Exception {  
136 - if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {  
137 - ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();  
138 - return validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN);  
139 - } else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) {  
140 - ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg();  
141 - return validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE);  
142 - } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) {  
143 - return handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()); 126 + @PreDestroy
  127 + public void destroy() {
  128 + if (transportApiTemplate != null) {
  129 + transportApiTemplate.stop();
144 } 130 }
145 - return getEmptyTransportApiResponseFuture();  
146 - }  
147 -  
148 - private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {  
149 - //TODO: Make async and enable caching  
150 - DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);  
151 - if (credentials != null && credentials.getCredentialsType() == credentialsType) {  
152 - return getDeviceInfo(credentials.getDeviceId());  
153 - } else {  
154 - return getEmptyTransportApiResponseFuture(); 131 + if (transportCallbackExecutor != null) {
  132 + transportCallbackExecutor.shutdownNow();
155 } 133 }
156 } 134 }
157 135
158 - private ListenableFuture<TransportApiResponseMsg> handle(GetOrCreateDeviceFromGatewayRequestMsg requestMsg) {  
159 - DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB()));  
160 - ListenableFuture<Device> gatewayFuture = deviceService.findDeviceByIdAsync(gatewayId);  
161 - return Futures.transform(gatewayFuture, gateway -> {  
162 - deviceCreationLock.lock();  
163 - try {  
164 - Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName());  
165 - if (device == null) {  
166 - device = new Device();  
167 - device.setTenantId(gateway.getTenantId());  
168 - device.setName(requestMsg.getDeviceName());  
169 - device.setType(requestMsg.getDeviceType());  
170 - device.setCustomerId(gateway.getCustomerId());  
171 - device = deviceService.saveDevice(device);  
172 - relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));  
173 - deviceStateService.onDeviceAdded(device);  
174 - }  
175 - return TransportApiResponseMsg.newBuilder()  
176 - .setGetOrCreateDeviceResponseMsg(GetOrCreateDeviceFromGatewayResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();  
177 - } catch (JsonProcessingException e) {  
178 - log.warn("[{}] Failed to lookup device by gateway id and name", gatewayId, requestMsg.getDeviceName(), e);  
179 - throw new RuntimeException(e);  
180 - } finally {  
181 - deviceCreationLock.unlock();  
182 - }  
183 - }, transportCallbackExecutor);  
184 - }  
185 -  
186 -  
187 - private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId) {  
188 - return Futures.transform(deviceService.findDeviceByIdAsync(deviceId), device -> {  
189 - if (device == null) {  
190 - log.trace("[{}] Failed to lookup device by id", deviceId);  
191 - return getEmptyTransportApiResponse();  
192 - }  
193 - try {  
194 - return TransportApiResponseMsg.newBuilder()  
195 - .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();  
196 - } catch (JsonProcessingException e) {  
197 - log.warn("[{}] Failed to lookup device by id", deviceId, e);  
198 - return getEmptyTransportApiResponse();  
199 - }  
200 - });  
201 - }  
202 -  
203 - private DeviceInfoProto getDeviceInfoProto(Device device) throws JsonProcessingException {  
204 - return DeviceInfoProto.newBuilder()  
205 - .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())  
206 - .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits())  
207 - .setDeviceIdMSB(device.getId().getId().getMostSignificantBits())  
208 - .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits())  
209 - .setDeviceName(device.getName())  
210 - .setDeviceType(device.getType())  
211 - .setAdditionalInfo(mapper.writeValueAsString(device.getAdditionalInfo()))  
212 - .build();  
213 - }  
214 -  
215 - private ListenableFuture<TransportApiResponseMsg> getEmptyTransportApiResponseFuture() {  
216 - return Futures.immediateFuture(getEmptyTransportApiResponse());  
217 - }  
218 -  
219 - private TransportApiResponseMsg getEmptyTransportApiResponse() {  
220 - return TransportApiResponseMsg.newBuilder()  
221 - .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.getDefaultInstance()).build();  
222 - }  
223 } 136 }
@@ -452,8 +452,8 @@ js: @@ -452,8 +452,8 @@ js:
452 max_errors: "${REMOTE_JS_SANDBOX_MAX_ERRORS:3}" 452 max_errors: "${REMOTE_JS_SANDBOX_MAX_ERRORS:3}"
453 453
454 transport: 454 transport:
  455 + type: "${TRANSPORT_TYPE:remote}" # local or remote
455 remote: 456 remote:
456 - enabled: "${REMOTE_TRANSPORT_ENABLED:true}"  
457 transport_api: 457 transport_api:
458 requests_topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}" 458 requests_topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
459 responses_topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}" 459 responses_topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
@@ -22,6 +22,6 @@ import com.google.common.util.concurrent.ListenableFuture; @@ -22,6 +22,6 @@ import com.google.common.util.concurrent.ListenableFuture;
22 */ 22 */
23 public interface TbKafkaHandler<Request, Response> { 23 public interface TbKafkaHandler<Request, Response> {
24 24
25 - ListenableFuture<Response> handle(Request request) throws Exception; 25 + ListenableFuture<Response> handle(Request request);
26 26
27 } 27 }
@@ -48,7 +48,6 @@ public class MqttTransportContext { @@ -48,7 +48,6 @@ public class MqttTransportContext {
48 private final ObjectMapper mapper = new ObjectMapper(); 48 private final ObjectMapper mapper = new ObjectMapper();
49 49
50 @Autowired 50 @Autowired
51 - @Lazy  
52 private TransportService transportService; 51 private TransportService transportService;
53 52
54 @Autowired(required = false) 53 @Autowired(required = false)
@@ -494,7 +494,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -494,7 +494,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
494 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED)); 494 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
495 ctx.close(); 495 ctx.close();
496 } else { 496 } else {
497 - ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));  
498 deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo()); 497 deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
499 sessionInfo = SessionInfoProto.newBuilder() 498 sessionInfo = SessionInfoProto.newBuilder()
500 .setNodeId(context.getNodeId()) 499 .setNodeId(context.getNodeId())
@@ -508,6 +507,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -508,6 +507,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
508 transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null); 507 transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null);
509 transportService.registerSession(sessionInfo, this); 508 transportService.registerSession(sessionInfo, this);
510 checkGatewaySession(); 509 checkGatewaySession();
  510 + ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
511 } 511 }
512 } 512 }
513 513
@@ -87,10 +87,12 @@ public class GatewaySessionHandler { @@ -87,10 +87,12 @@ public class GatewaySessionHandler {
87 JsonElement json = getJson(msg); 87 JsonElement json = getJson(msg);
88 String deviceName = checkDeviceName(getDeviceName(json)); 88 String deviceName = checkDeviceName(getDeviceName(json));
89 String deviceType = getDeviceType(json); 89 String deviceType = getDeviceType(json);
  90 + log.trace("[{}] onDeviceConnect: {}", sessionId, deviceName);
90 Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<GatewayDeviceSessionCtx>() { 91 Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<GatewayDeviceSessionCtx>() {
91 @Override 92 @Override
92 public void onSuccess(@Nullable GatewayDeviceSessionCtx result) { 93 public void onSuccess(@Nullable GatewayDeviceSessionCtx result) {
93 ack(msg); 94 ack(msg);
  95 + log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName);
94 } 96 }
95 97
96 @Override 98 @Override
  1 +<?xml version="1.0" encoding="UTF-8" ?>
  2 +<!--
  3 +
  4 + Copyright © 2016-2018 The Thingsboard Authors
  5 +
  6 + Licensed under the Apache License, Version 2.0 (the "License");
  7 + you may not use this file except in compliance with the License.
  8 + You may obtain a copy of the License at
  9 +
  10 + http://www.apache.org/licenses/LICENSE-2.0
  11 +
  12 + Unless required by applicable law or agreed to in writing, software
  13 + distributed under the License is distributed on an "AS IS" BASIS,
  14 + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15 + See the License for the specific language governing permissions and
  16 + limitations under the License.
  17 +
  18 +-->
  19 +<!DOCTYPE configuration>
  20 +<configuration scan="true" scanPeriod="10 seconds">
  21 +
  22 + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
  23 + <encoder>
  24 + <pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
  25 + </encoder>
  26 + </appender>
  27 +
  28 + <logger name="org.thingsboard.server" level="TRACE" />
  29 +
  30 + <root level="INFO">
  31 + <appender-ref ref="STDOUT"/>
  32 + </root>
  33 +
  34 +</configuration>