Commit d708f58999b4654cbb8c3358e13ae33e4dfe9b4e

Authored by 芯火源
1 parent 163badcf

feat: UDP协议接入设备功能

  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.udp;
  17 +
  18 +
  19 +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
  20 +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
  21 +
  22 +import com.fasterxml.jackson.databind.JsonNode;
  23 +import com.google.common.util.concurrent.FutureCallback;
  24 +import com.google.common.util.concurrent.Futures;
  25 +import com.google.common.util.concurrent.ListenableFuture;
  26 +import com.google.common.util.concurrent.MoreExecutors;
  27 +import io.netty.buffer.ByteBuf;
  28 +import io.netty.buffer.Unpooled;
  29 +import io.netty.channel.ChannelFuture;
  30 +import io.netty.channel.ChannelHandlerContext;
  31 +import io.netty.channel.ChannelInboundHandlerAdapter;
  32 +import io.netty.channel.socket.DatagramPacket;
  33 +import io.netty.handler.codec.mqtt.*;
  34 +import io.netty.handler.ssl.SslHandler;
  35 +import io.netty.util.ReferenceCountUtil;
  36 +import io.netty.util.concurrent.Future;
  37 +import io.netty.util.concurrent.GenericFutureListener;
  38 +import java.io.IOException;
  39 +import java.net.InetSocketAddress;
  40 +import java.util.Map;
  41 +import java.util.Optional;
  42 +import java.util.UUID;
  43 +import java.util.concurrent.atomic.AtomicInteger;
  44 +
  45 +import lombok.extern.slf4j.Slf4j;
  46 +import org.apache.commons.lang3.StringUtils;
  47 +import org.checkerframework.checker.nullness.qual.Nullable;
  48 +import org.thingsboard.common.util.JacksonUtil;
  49 +import org.thingsboard.server.common.data.DataConstants;
  50 +import org.thingsboard.server.common.data.Device;
  51 +import org.thingsboard.server.common.data.DeviceProfile;
  52 +import org.thingsboard.server.common.data.DeviceTransportType;
  53 +import org.thingsboard.server.common.data.device.profile.TkTcpDeviceProfileTransportConfiguration;
  54 +import org.thingsboard.server.common.data.id.DeviceId;
  55 +import org.thingsboard.server.common.data.rpc.RpcStatus;
  56 +import org.thingsboard.server.common.data.yunteng.utils.ByteUtils;
  57 +import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
  58 +import org.thingsboard.server.common.transport.SessionMsgListener;
  59 +import org.thingsboard.server.common.transport.TransportService;
  60 +import org.thingsboard.server.common.transport.TransportServiceCallback;
  61 +import org.thingsboard.server.common.transport.adaptor.AdaptorException;
  62 +import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
  63 +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
  64 +import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
  65 +import org.thingsboard.server.common.transport.service.DefaultTransportService;
  66 +import org.thingsboard.server.common.transport.service.SessionMetaData;
  67 +import org.thingsboard.server.gen.transport.TransportProtos;
  68 +import org.thingsboard.server.transport.tcp.TcpTransportContext;
  69 +import org.thingsboard.server.transport.tcp.adaptors.TcpAuthEntry;
  70 +import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;
  71 +import org.thingsboard.server.transport.tcp.adaptors.TcpUpEntry;
  72 +import org.thingsboard.server.transport.tcp.script.TkScriptFactory;
  73 +import org.thingsboard.server.transport.tcp.script.TkScriptInvokeService;
  74 +import org.thingsboard.server.transport.tcp.session.TcpDeviceSessionCtx;
  75 +import org.thingsboard.server.transport.tcp.session.TcpGatewaySessionHandler;
  76 +import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
  77 +
  78 +/**
  79 + * @author Andrew Shvayka
  80 + */
  81 +@Slf4j
  82 + public class UdpTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>>, SessionMsgListener {
  83 +
  84 + private final UUID sessionId;
  85 + private final TcpTransportContext context;
  86 + private final TransportService transportService;
  87 + private final SslHandler sslHandler;
  88 +
  89 +
  90 + /**
  91 + * 需要处理的消息队列,例如:需要下发给设备的,设备上传的。
  92 + */
  93 + final TcpDeviceSessionCtx deviceSessionCtx;
  94 + volatile InetSocketAddress address;
  95 +
  96 + volatile TcpGatewaySessionHandler gatewaySessionHandler;
  97 + private final AtomicInteger authedCounter = new AtomicInteger();
  98 +
  99 +
  100 + UdpTransportHandler(TcpTransportContext context, SslHandler sslHandler) {
  101 + super();
  102 + this.sessionId = UUID.randomUUID();
  103 + this.context = context;
  104 + this.transportService = context.getTransportService();
  105 + this.sslHandler = sslHandler;
  106 + this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context);
  107 + }
  108 +
  109 + @Override
  110 + public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  111 + super.channelRegistered(ctx);
  112 + context.channelRegistered();
  113 + }
  114 +
  115 + @Override
  116 + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  117 + super.channelUnregistered(ctx);
  118 + context.channelUnregistered();
  119 + }
  120 +
  121 + @Override
  122 + public void channelRead(ChannelHandlerContext ctx, Object msg) {
  123 + log.error("【{}】 Processing msg: 【{}】", sessionId, msg);
  124 + try {
  125 + if(!context.isReady()){
  126 + ctx.close();
  127 + return;
  128 + }
  129 + if (address == null) {
  130 + address = getAddress(ctx);
  131 + }
  132 + if (msg instanceof DatagramPacket) {
  133 + DatagramPacket message = (DatagramPacket) msg;
  134 + byte[] byteMsg = ByteBufUtils.buf2Bytes(message.content());
  135 + String msgStr = ByteUtils.bytesToStr(byteMsg);
  136 + log.error("会话【{}】收到设备【{}】来自【{}】数据【{}】", sessionId, deviceSessionCtx.getDeviceId(), address, msgStr);
  137 + deviceSessionCtx.setChannel(ctx);
  138 + if (deviceSessionCtx.getDeviceInfo() == null || deviceSessionCtx.getDeviceProfile() == null) {
  139 + processConnect(ctx, msgStr);
  140 + } else {
  141 + enqueueRegularSessionMsg(ctx, msgStr);
  142 + }
  143 +
  144 + } else {
  145 + log.debug("【{}】 Received non tcp message: 【{}】", sessionId, msg.getClass().getSimpleName());
  146 + ctx.close();
  147 + }
  148 + } finally {
  149 + ReferenceCountUtil.safeRelease(msg);
  150 + }
  151 + }
  152 +
  153 + InetSocketAddress getAddress(ChannelHandlerContext ctx) {
  154 + var address = ctx.channel().attr(UdpTransportService.ADDRESS).get();
  155 + if (address == null) {
  156 + log.trace("[{}] Received empty address.", ctx.channel().id());
  157 + InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
  158 + log.trace("[{}] Going to use address: {}", ctx.channel().id(), remoteAddress);
  159 + return remoteAddress;
  160 + } else {
  161 + log.trace("[{}] Received address: {}", ctx.channel().id(), address);
  162 + }
  163 + return address;
  164 + }
  165 +
  166 +
  167 + void enqueueRegularSessionMsg(ChannelHandlerContext ctx, String msg) {
  168 + final int queueSize = deviceSessionCtx.getMsgQueueSize();
  169 + if (queueSize >= context.getMessageQueueSizePerDeviceLimit()) {
  170 + log.info("Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}",
  171 + deviceSessionCtx.getDeviceId(), context.getMessageQueueSizePerDeviceLimit(), queueSize, deviceSessionCtx.getMsgQueueSize());
  172 + ctx.close();
  173 + return;
  174 + }
  175 +
  176 + deviceSessionCtx.addToQueue(msg);
  177 + processQueueMessage(ctx); //Under the normal conditions the msg queue will contain 0 messages. Many messages will be processed on device connect event in separate thread pool
  178 +
  179 + }
  180 +
  181 +
  182 + void processQueueMessage(ChannelHandlerContext ctx) {
  183 + if (!deviceSessionCtx.isConnected()) {
  184 + log.trace("[{}][{}] Postpone processing msg due to device is not connected. Msg queue size is {}", sessionId, deviceSessionCtx.getDeviceId(), deviceSessionCtx.getMsgQueueSize());
  185 + return;
  186 + }
  187 + deviceSessionCtx.tryProcessQueuedMsgs(msg -> processDeviceSessionMsg(ctx, msg));
  188 + }
  189 +
  190 +
  191 + private void processDeviceSessionMsg(ChannelHandlerContext ctx, String tcpMessage) {
  192 + if (!checkConnected(ctx, tcpMessage)) {
  193 + return;
  194 + }
  195 + deviceSessionCtx.doUpScript(tcpMessage, r -> {
  196 + if (gatewaySessionHandler != null) {
  197 + processGatewayDeviceMsg(ctx, r);
  198 + }
  199 + processDirectDeviceMsg(ctx, r);
  200 + });
  201 +
  202 + }
  203 +
  204 +
  205 + /**
  206 + * 上行脚本解析结果是否包含数据
  207 + * @param datas 数据集合
  208 + * @return
  209 + */
  210 + private boolean hasDatas(Map<String, Object> datas) {
  211 + if (datas == null || datas.isEmpty()) {
  212 + return false;
  213 + }
  214 + return true;
  215 + }
  216 +
  217 +
  218 + private void processGatewayDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {
  219 + log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage);
  220 + try {
  221 + Map<String, Object> datas = tcpMessage.getDatas();
  222 + if (hasDatas(datas)) {
  223 + datas.forEach((devName, param) -> {
  224 + if (TkScriptFactory.ORIGINAL_DATA_FILED.equals(devName)) {
  225 + return;
  226 + }
  227 + if (tcpMessage.getTelemetry()) {
  228 + gatewaySessionHandler.onDeviceTelemetry(devName, tcpMessage.getRequestId(), param.toString());
  229 + } else {
  230 +// gatewaySessionHandler.onDeviceRpcResponse(devName, tcpMessage.getRequestId(), param.toString());
  231 + }
  232 +
  233 + });
  234 + } else {
  235 + transportService.reportActivity(deviceSessionCtx.getSessionInfo());
  236 + pushDeviceMsg(ctx, tcpMessage.getAckMsg());
  237 + }
  238 + } catch (Exception e) {
  239 + log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
  240 + ctx.close();
  241 + }
  242 + }
  243 +
  244 + private void processDirectDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {
  245 + log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage);
  246 + try {
  247 + TcpTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
  248 + Map<String, Object> datas = tcpMessage.getDatas();
  249 + if (hasDatas(datas)) {
  250 + String dataStr = JacksonUtil.toString(datas);
  251 + if (tcpMessage.getTelemetry()) {
  252 + TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, dataStr);
  253 + transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, tcpMessage));
  254 + } else {
  255 + TransportProtos.ToDeviceRpcResponseMsg postRpcMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, tcpMessage);
  256 + transportService.process(deviceSessionCtx.getSessionInfo(), postRpcMsg, getPubAckCallback(ctx, tcpMessage));
  257 + }
  258 + } else {
  259 + transportService.reportActivity(deviceSessionCtx.getSessionInfo());
  260 + pushDeviceMsg(ctx, tcpMessage.getAckMsg());
  261 + }
  262 + } catch (AdaptorException e) {
  263 + log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
  264 + ctx.close();
  265 + }
  266 + }
  267 +
  268 +
  269 + private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final TcpUpEntry msg) {
  270 + return new TransportServiceCallback<>() {
  271 + @Override
  272 + public void onSuccess(Void dummy) {
  273 + log.trace("[{}] Published msg: {}", sessionId, msg);
  274 + if (StringUtils.isNotEmpty(msg.getAckMsg())) {
  275 + pushDeviceMsg(ctx, msg.getAckMsg());
  276 + }
  277 + }
  278 +
  279 + @Override
  280 + public void onError(Throwable e) {
  281 + log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
  282 + ctx.close();
  283 + }
  284 + };
  285 + }
  286 +
  287 +
  288 + void processConnect(ChannelHandlerContext ctx, String accessToken) {
  289 + log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, accessToken);
  290 +
  291 + if (DataConstants.PROVISION.equals(accessToken) || DataConstants.PROVISION.equals(accessToken)) {
  292 + deviceSessionCtx.setProvisionOnly(true);
  293 + pushDeviceMsg(ctx,"CONNECTION_ACCEPTED");
  294 + } else {
  295 + TkScriptInvokeService.authScripts.forEach(id -> {
  296 + ListenableFuture item = context.getJsEngine().invokeFunction(id, accessToken);
  297 + Futures.addCallback(item, new FutureCallback<String>() {
  298 + @Override
  299 + public void onSuccess(@Nullable String result) {
  300 + processAuthTokenConnect(ctx, id, JacksonUtil.fromString(result.replace("\\","\\\\"), TcpAuthEntry.class));
  301 + }
  302 +
  303 + @Override
  304 + public void onFailure(Throwable t) {
  305 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);
  306 + }
  307 + }, MoreExecutors.directExecutor());
  308 +
  309 + });
  310 + }
  311 + }
  312 +
  313 + private void processAuthTokenConnect(ChannelHandlerContext ctx, UUID scriptId, TcpAuthEntry accessToken) {
  314 +
  315 + log.debug("[{}][{}] Processing connect msg for client with user name: {}!", address, sessionId, accessToken);
  316 + if (null != accessToken.getClientId()) {
  317 + }
  318 + if (null != accessToken.getUserName()) {
  319 + }
  320 + String token = accessToken.getPassword();
  321 + if(StringUtils.isNotEmpty(token)){
  322 + TransportProtos.ValidateDeviceTokenRequestMsg.Builder request = TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder();
  323 + request.setToken(token);
  324 + transportService.process(DeviceTransportType.TCP, request.build(),
  325 + new TransportServiceCallback<>() {
  326 + @Override
  327 + public void onSuccess(ValidateDeviceCredentialsResponse msg) {
  328 + onValidateDeviceResponse(msg, ctx, accessToken, scriptId);
  329 + }
  330 +
  331 + @Override
  332 + public void onError(Throwable e) {
  333 + log.trace("[{}] Failed to process credentials: {}", address, accessToken, e);
  334 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
  335 + }
  336 + });
  337 + }else{
  338 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);
  339 + }
  340 + }
  341 +
  342 +
  343 +
  344 +
  345 + @Override
  346 + public void channelReadComplete(ChannelHandlerContext ctx) {
  347 + ctx.flush();
  348 + }
  349 +
  350 + @Override
  351 + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  352 + log.error("[{}] Unexpected Exception", sessionId, cause);
  353 + ctx.close();
  354 + if (cause instanceof OutOfMemoryError) {
  355 + log.error("Received critical error. Going to shutdown the service.");
  356 + System.exit(1);
  357 + }
  358 + }
  359 +
  360 +
  361 +
  362 +
  363 + private boolean checkConnected(ChannelHandlerContext ctx, String msg) {
  364 + if (deviceSessionCtx.isConnected()) {
  365 + return true;
  366 + } else {
  367 + log.info("[{}] Closing current session due to invalid msg order: {}", sessionId, msg);
  368 + return false;
  369 + }
  370 + }
  371 +
  372 + private void checkGatewaySession(SessionMetaData sessionMetaData) {
  373 + TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo();
  374 + try {
  375 + JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
  376 + if (infoNode != null) {
  377 + JsonNode gatewayNode = infoNode.get("gateway");
  378 + if (gatewayNode != null && gatewayNode.asBoolean()) {
  379 + gatewaySessionHandler = new TcpGatewaySessionHandler(deviceSessionCtx, sessionId);
  380 + if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {
  381 + sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());
  382 + }
  383 + }
  384 + }
  385 + } catch (IOException e) {
  386 + log.trace("[{}][{}] Failed to fetch device additional info", sessionId, device.getDeviceName(), e);
  387 + }
  388 + }
  389 +
  390 + @Override
  391 + public void operationComplete(Future<? super Void> future) throws Exception {
  392 + log.trace("[{}] Channel closed!", sessionId);
  393 + doDisconnect();
  394 + }
  395 +
  396 + public void doDisconnect() {
  397 + if (deviceSessionCtx.isConnected()) {
  398 + log.debug("[{}] Client disconnected!", sessionId);
  399 + transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
  400 + transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
  401 + if (gatewaySessionHandler != null) {
  402 + gatewaySessionHandler.onGatewayDisconnect();
  403 + }
  404 + deviceSessionCtx.setDisconnected();
  405 + }
  406 + deviceSessionCtx.release();
  407 + }
  408 +
  409 +
  410 + private void onValidateDeviceResponse(ValidateDeviceCredentialsResponse msg, ChannelHandlerContext ctx, TcpAuthEntry authEntry, UUID scriptId) {
  411 + if (!msg.hasDeviceInfo()) {
  412 + context.onAuthFailure(address);
  413 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
  414 + } else {
  415 + DeviceProfile profile = msg.getDeviceProfile();
  416 + TkTcpDeviceProfileTransportConfiguration tcpConfig = (TkTcpDeviceProfileTransportConfiguration) profile.getProfileData().getTransportConfiguration();
  417 + if (scriptId != null && !tcpConfig.getAuthScriptId().equals(scriptId.toString())) {
  418 + authedCounter.incrementAndGet();
  419 + return;
  420 + }
  421 + context.onAuthSuccess(address);
  422 + deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
  423 + deviceSessionCtx.setDeviceProfile(profile);
  424 + deviceSessionCtx.setSessionInfo(SessionInfoCreator.create(msg, context, sessionId));
  425 + transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_OPEN, new TransportServiceCallback<Void>() {
  426 + @Override
  427 + public void onSuccess(Void msg) {
  428 + SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), UdpTransportHandler.this);
  429 + checkGatewaySession(sessionMetaData);
  430 + pushDeviceMsg(ctx,authEntry.getSuccess());
  431 + deviceSessionCtx.setConnected(true);
  432 + log.debug("[{}] Client connected!", sessionId);
  433 +
  434 + transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
  435 + transportService.getCallbackExecutor().execute(() -> processQueueMessage(ctx)); //this callback will execute in Producer worker thread and hard or blocking work have to be submitted to the separate thread.
  436 + }
  437 +
  438 + @Override
  439 + public void onError(Throwable e) {
  440 + if (e instanceof TbRateLimitsException) {
  441 + log.trace("[{}] Failed to submit session event: {}", sessionId, e.getMessage());
  442 + } else {
  443 + log.warn("[{}] Failed to submit session event", sessionId, e);
  444 + }
  445 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
  446 + }
  447 + });
  448 + }
  449 + }
  450 +
  451 + private void onValidateFailed(ChannelHandlerContext ctx, MqttConnectReturnCode msg) {
  452 + authedCounter.incrementAndGet();
  453 + if (TkScriptInvokeService.authScripts.size() == authedCounter.intValue()) {
  454 + pushDeviceMsg(ctx,msg.name());
  455 +
  456 + ctx.close();
  457 + }
  458 + }
  459 +
  460 + @Override
  461 + public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
  462 +
  463 + }
  464 +
  465 + @Override
  466 + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
  467 +
  468 + }
  469 +
  470 + @Override
  471 + public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
  472 + log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
  473 + deviceSessionCtx.getChannel().close();
  474 + }
  475 +
  476 + @Override
  477 + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
  478 + log.debug("【{}】下发RPC命令【{}】给设备【{}】", sessionId, rpcRequest.getParams(), deviceSessionCtx.getDeviceInfo().getDeviceName());
  479 + TcpTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();
  480 + try {
  481 + adaptor
  482 + .convertToPublish(deviceSessionCtx, rpcRequest)
  483 + .ifPresent(
  484 + payload -> {
  485 + deviceSessionCtx.rpcRequesting(payload.getIdentifier(), rpcRequest);
  486 + Optional.ofNullable(pushDeviceMsg(deviceSessionCtx.getChannel(), payload.getDatas())).ifPresent(cf->{
  487 + cf.addListener(
  488 + result -> {
  489 + if (result.cause() == null) {
  490 + transportService.process(
  491 + deviceSessionCtx.getSessionInfo(),
  492 + rpcRequest,
  493 + RpcStatus.DELIVERED,
  494 + TransportServiceCallback.EMPTY);
  495 + } else {
  496 + // TODO: send error
  497 + }
  498 + });
  499 + });
  500 + ;
  501 + });
  502 + } catch (Exception e) {
  503 + transportService.process(deviceSessionCtx.getSessionInfo(),
  504 + TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
  505 + .setRequestId(rpcRequest.getRequestId()).setError("Failed to convert device RPC command to TCP msg").build(), TransportServiceCallback.EMPTY);
  506 + log.error("[{}] Failed to convert device RPC command to TCP msg", sessionId, e);
  507 + }
  508 + }
  509 +
  510 +
  511 + @Override
  512 + public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {
  513 + log.debug("[{}] 服务端响应设备的RPC请求", sessionId);
  514 + }
  515 +
  516 + /**
  517 + * 往设备推送消息
  518 + *
  519 + * @param message
  520 + * @return
  521 + */
  522 + private ChannelFuture pushDeviceMsg(ChannelHandlerContext ctx, String message) {
  523 + if(StringUtils.isBlank(message)){
  524 + return null;
  525 + }
  526 + ByteBuf buff = Unpooled.buffer();
  527 + if(!message.matches("-?[0-9a-fA-F]+")){
  528 + //不满足16进制将字符串转为16进制
  529 + message = ByteUtils.stringEncodeToHex(message);
  530 + }
  531 + buff.writeBytes(ByteUtils.hexToByteArray(message));
  532 + return ctx.writeAndFlush(buff);
  533 + }
  534 +
  535 +
  536 + @Override
  537 + public void onDeviceProfileUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {
  538 + deviceSessionCtx.onDeviceProfileUpdate(sessionInfo, deviceProfile);
  539 + }
  540 +
  541 + @Override
  542 + public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
  543 + deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
  544 + }
  545 +
  546 + @Override
  547 + public void onDeviceDeleted(DeviceId deviceId) {
  548 + context.onAuthFailure(address);
  549 + ChannelHandlerContext ctx = deviceSessionCtx.getChannel();
  550 + ctx.close();
  551 + }
  552 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.udp;
  17 +
  18 +import io.netty.channel.Channel;
  19 +import io.netty.channel.ChannelInitializer;
  20 +import io.netty.channel.ChannelPipeline;
  21 +import io.netty.handler.ssl.SslHandler;
  22 +import org.thingsboard.server.transport.tcp.TcpTransportContext;
  23 +
  24 +/**
  25 + * @author Andrew Shvayka
  26 + */
  27 +public class UdpTransportServerInitializer extends ChannelInitializer<Channel> {
  28 +
  29 + private final TcpTransportContext context;
  30 + private final boolean sslEnabled;
  31 +
  32 + public UdpTransportServerInitializer(TcpTransportContext context, boolean sslEnabled) {
  33 + this.context = context;
  34 + this.sslEnabled = sslEnabled;
  35 + }
  36 +
  37 + @Override
  38 + public void initChannel(Channel ch) {
  39 + ChannelPipeline pipeline = ch.pipeline();
  40 + SslHandler sslHandler = null;
  41 +
  42 + UdpTransportHandler handler = new UdpTransportHandler(context, sslHandler);
  43 +// NettyUdpServerHandler handler = new NettyUdpServerHandler();
  44 +
  45 + pipeline.addLast(handler);
  46 + ch.closeFuture().addListener(handler);
  47 + }
  48 +
  49 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.udp;
  17 +
  18 +import io.netty.bootstrap.Bootstrap;
  19 +import io.netty.buffer.PooledByteBufAllocator;
  20 +import io.netty.channel.Channel;
  21 +import io.netty.channel.ChannelOption;
  22 +import io.netty.channel.EventLoopGroup;
  23 +import io.netty.channel.nio.NioEventLoopGroup;
  24 +import io.netty.channel.socket.nio.NioDatagramChannel;
  25 +import io.netty.util.AttributeKey;
  26 +import io.netty.util.ResourceLeakDetector;
  27 +import lombok.extern.slf4j.Slf4j;
  28 +import org.springframework.beans.factory.annotation.Autowired;
  29 +import org.springframework.beans.factory.annotation.Value;
  30 +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  31 +import org.springframework.stereotype.Service;
  32 +import org.thingsboard.server.common.data.DataConstants;
  33 +import org.thingsboard.server.common.data.TbTransportService;
  34 +import org.thingsboard.server.transport.tcp.TcpTransportContext;
  35 +
  36 +import javax.annotation.PostConstruct;
  37 +import javax.annotation.PreDestroy;
  38 +import java.net.InetSocketAddress;
  39 +
  40 +/**
  41 + * @author Andrew Shvayka
  42 + */
  43 +@Service("UdpTransportService")
  44 +@ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true' && '${transport.tcp.enabled}'=='true')")
  45 +@Slf4j
  46 +public class UdpTransportService implements TbTransportService {
  47 +
  48 + public static AttributeKey<InetSocketAddress> ADDRESS = AttributeKey.newInstance("UDP_SRC_ADDRESS");
  49 +
  50 + @Value("${transport.tcp.bind_address}")
  51 + private String host;
  52 + @Value("${transport.tcp.bind_port}")
  53 + private Integer port;
  54 +
  55 + @Value("${transport.tcp.ssl.enabled}")
  56 + private boolean sslEnabled;
  57 +
  58 + @Value("${transport.tcp.ssl.bind_address}")
  59 + private String sslHost;
  60 + @Value("${transport.tcp.ssl.bind_port}")
  61 + private Integer sslPort;
  62 +
  63 + @Value("${transport.tcp.netty.leak_detector_level}")
  64 + private String leakDetectorLevel;
  65 + @Value("${transport.tcp.netty.boss_group_thread_count}")
  66 + private Integer bossGroupThreadCount;
  67 + @Value("${transport.tcp.netty.worker_group_thread_count}")
  68 + private Integer workerGroupThreadCount;
  69 + @Value("${transport.tcp.netty.so_keep_alive}")
  70 + private boolean keepAlive;
  71 +
  72 + @Autowired
  73 + private TcpTransportContext context;
  74 +
  75 + private Channel serverChannel;
  76 + private Channel sslServerChannel;
  77 + private EventLoopGroup bossGroup;
  78 +
  79 + @PostConstruct
  80 + public void init() throws Exception {
  81 + log.info("Setting resource leak detector level to {}", leakDetectorLevel);
  82 + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
  83 +
  84 + log.info("Starting TCP transport...");
  85 + bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
  86 + Bootstrap b = new Bootstrap();
  87 + b.group(bossGroup)
  88 + .channel(NioDatagramChannel.class)
  89 + .option(ChannelOption.SO_BROADCAST, true)
  90 + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
  91 + .handler(new UdpTransportServerInitializer(context, false));
  92 + serverChannel = b.bind(host, port).sync().channel();
  93 + if (sslEnabled) {
  94 + b = new Bootstrap();
  95 + b.group(bossGroup)
  96 + .channel(NioDatagramChannel.class)
  97 + .handler(new UdpTransportServerInitializer(context, false))
  98 + .option(ChannelOption.SO_BROADCAST, true);
  99 + sslServerChannel = b.bind(sslHost, sslPort).sync().channel();
  100 + }
  101 + log.info("TCP transport started!");
  102 + }
  103 +
  104 + @PreDestroy
  105 + public void shutdown() throws InterruptedException {
  106 + log.info("Stopping TCP transport!");
  107 + try {
  108 + serverChannel.close().sync();
  109 + if (sslEnabled) {
  110 + sslServerChannel.close().sync();
  111 + }
  112 + } finally {
  113 + bossGroup.shutdownGracefully();
  114 + }
  115 + log.info("TCP transport stopped!");
  116 + }
  117 +
  118 + @Override
  119 + public String getName() {
  120 + return DataConstants.TCP_TRANSPORT_NAME;
  121 + }
  122 +}
... ...