Commit 603457a6a8c5a6fcc481631c0ecf6d347e50fe6b

Authored by xp.Huang
2 parents 4243e916 a6b8f012

Merge branch '20230627' into 'master_dev'

refactor: UDP协议功能完善

See merge request yunteng/thingskit!201
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.udp;
  17 +
  18 +
  19 +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
  20 +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
  21 +
  22 +import com.fasterxml.jackson.databind.JsonNode;
  23 +import com.google.common.util.concurrent.FutureCallback;
  24 +import com.google.common.util.concurrent.Futures;
  25 +import com.google.common.util.concurrent.ListenableFuture;
  26 +import com.google.common.util.concurrent.MoreExecutors;
  27 +import io.netty.buffer.ByteBuf;
  28 +import io.netty.buffer.Unpooled;
  29 +import io.netty.channel.ChannelFuture;
  30 +import io.netty.channel.ChannelHandlerContext;
  31 +import io.netty.handler.codec.mqtt.*;
  32 +import io.netty.util.concurrent.Future;
  33 +import io.netty.util.concurrent.GenericFutureListener;
  34 +import java.io.IOException;
  35 +import java.net.InetSocketAddress;
  36 +import java.util.Map;
  37 +import java.util.Optional;
  38 +import java.util.UUID;
  39 +import java.util.concurrent.atomic.AtomicInteger;
  40 +import lombok.extern.slf4j.Slf4j;
  41 +import org.apache.commons.lang3.StringUtils;
  42 +import org.checkerframework.checker.nullness.qual.Nullable;
  43 +import org.thingsboard.common.util.JacksonUtil;
  44 +import org.thingsboard.server.common.data.DataConstants;
  45 +import org.thingsboard.server.common.data.Device;
  46 +import org.thingsboard.server.common.data.DeviceProfile;
  47 +import org.thingsboard.server.common.data.DeviceTransportType;
  48 +import org.thingsboard.server.common.data.device.profile.TkTcpDeviceProfileTransportConfiguration;
  49 +import org.thingsboard.server.common.data.id.DeviceId;
  50 +import org.thingsboard.server.common.data.rpc.RpcStatus;
  51 +import org.thingsboard.server.common.data.yunteng.utils.ByteUtils;
  52 +import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
  53 +import org.thingsboard.server.common.transport.SessionMsgListener;
  54 +import org.thingsboard.server.common.transport.TransportService;
  55 +import org.thingsboard.server.common.transport.TransportServiceCallback;
  56 +import org.thingsboard.server.common.transport.adaptor.AdaptorException;
  57 +import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
  58 +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
  59 +import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
  60 +import org.thingsboard.server.common.transport.service.DefaultTransportService;
  61 +import org.thingsboard.server.common.transport.service.SessionMetaData;
  62 +import org.thingsboard.server.gen.transport.TransportProtos;
  63 +import org.thingsboard.server.transport.tcp.TcpTransportContext;
  64 +import org.thingsboard.server.transport.tcp.adaptors.TcpAuthEntry;
  65 +import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;
  66 +import org.thingsboard.server.transport.tcp.adaptors.TcpUpEntry;
  67 +import org.thingsboard.server.transport.tcp.script.TkScriptFactory;
  68 +import org.thingsboard.server.transport.tcp.script.TkScriptInvokeService;
  69 +import org.thingsboard.server.transport.tcp.session.TcpDeviceSessionCtx;
  70 +import org.thingsboard.server.transport.tcp.session.TcpGatewaySessionHandler;
  71 +
  72 +/**
  73 + * @author Andrew Shvayka
  74 + */
  75 +@Slf4j
  76 + public class UdpDatagramDataHandler implements GenericFutureListener<Future<? super Void>>, SessionMsgListener {
  77 +
  78 + private final UUID sessionId;
  79 + private final TcpTransportContext context;
  80 + private final TransportService transportService;
  81 +
  82 +
  83 + /**
  84 + * 需要处理的消息队列,例如:需要下发给设备的,设备上传的。
  85 + */
  86 + final TcpDeviceSessionCtx deviceSessionCtx;
  87 + volatile InetSocketAddress address;
  88 +
  89 + volatile TcpGatewaySessionHandler gatewaySessionHandler;
  90 + private final AtomicInteger authedCounter = new AtomicInteger();
  91 +
  92 +
  93 + UdpDatagramDataHandler(ChannelHandlerContext ctx,TcpTransportContext context, InetSocketAddress devAddress) {
  94 + super();
  95 + this.sessionId = UUID.randomUUID();
  96 + this.context = context;
  97 + this.address = devAddress;
  98 + this.transportService = context.getTransportService();
  99 + this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context);
  100 + }
  101 +
  102 +
  103 +
  104 +
  105 +
  106 +
  107 + public void dealDeviceMsg(ChannelHandlerContext ctx, String msg) {
  108 + deviceSessionCtx.setChannel(ctx);
  109 + if (deviceSessionCtx.getDeviceInfo() == null || deviceSessionCtx.getDeviceProfile() == null) {
  110 + processConnect(ctx, msg);
  111 + return;
  112 + }
  113 + final int queueSize = deviceSessionCtx.getMsgQueueSize();
  114 + if (queueSize >= context.getMessageQueueSizePerDeviceLimit()) {
  115 + log.info("Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}",
  116 + deviceSessionCtx.getDeviceId(), context.getMessageQueueSizePerDeviceLimit(), queueSize, deviceSessionCtx.getMsgQueueSize());
  117 +// ctx.close();
  118 + return;
  119 + }
  120 + deviceSessionCtx.addToQueue(msg);
  121 + processQueueMessage(ctx); //Under the normal conditions the msg queue will contain 0 messages. Many messages will be processed on device connect event in separate thread pool
  122 +
  123 + }
  124 +
  125 +
  126 + void processQueueMessage(ChannelHandlerContext ctx) {
  127 + if (!deviceSessionCtx.isConnected()) {
  128 + log.trace("[{}][{}] Postpone processing msg due to device is not connected. Msg queue size is {}", sessionId, deviceSessionCtx.getDeviceId(), deviceSessionCtx.getMsgQueueSize());
  129 + return;
  130 + }
  131 + deviceSessionCtx.tryProcessQueuedMsgs(msg -> processDeviceSessionMsg(ctx, msg));
  132 + }
  133 +
  134 +
  135 + private void processDeviceSessionMsg(ChannelHandlerContext ctx, String tcpMessage) {
  136 + if (!checkConnected(ctx, tcpMessage)) {
  137 + return;
  138 + }
  139 + deviceSessionCtx.doUpScript(tcpMessage, r -> {
  140 + if (gatewaySessionHandler != null) {
  141 + processGatewayDeviceMsg(ctx, r);
  142 + }
  143 + processDirectDeviceMsg(ctx, r);
  144 + });
  145 +
  146 + }
  147 +
  148 +
  149 + /**
  150 + * 上行脚本解析结果是否包含数据
  151 + * @param datas 数据集合
  152 + * @return
  153 + */
  154 + private boolean hasDatas(Map<String, Object> datas) {
  155 + if (datas == null || datas.isEmpty()) {
  156 + return false;
  157 + }
  158 + return true;
  159 + }
  160 +
  161 +
  162 + private void processGatewayDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {
  163 + log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage);
  164 + try {
  165 + Map<String, Object> datas = tcpMessage.getDatas();
  166 + if (hasDatas(datas)) {
  167 + datas.forEach((devName, param) -> {
  168 + if (TkScriptFactory.ORIGINAL_DATA_FILED.equals(devName)) {
  169 + return;
  170 + }
  171 + if (tcpMessage.getTelemetry()) {
  172 + gatewaySessionHandler.onDeviceTelemetry(devName, tcpMessage.getRequestId(), param.toString());
  173 + } else {
  174 +// gatewaySessionHandler.onDeviceRpcResponse(devName, tcpMessage.getRequestId(), param.toString());
  175 + }
  176 +
  177 + });
  178 + } else {
  179 + transportService.reportActivity(deviceSessionCtx.getSessionInfo());
  180 + pushDeviceMsg(ctx, tcpMessage.getAckMsg());
  181 + }
  182 + } catch (Exception e) {
  183 + log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
  184 +// ctx.close();
  185 + }
  186 + }
  187 +
  188 + private void processDirectDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {
  189 + log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage);
  190 + try {
  191 + TcpTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
  192 + Map<String, Object> datas = tcpMessage.getDatas();
  193 + if (hasDatas(datas)) {
  194 + String dataStr = JacksonUtil.toString(datas);
  195 + if (tcpMessage.getTelemetry()) {
  196 + TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, dataStr);
  197 + transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, tcpMessage));
  198 + } else {
  199 + TransportProtos.ToDeviceRpcResponseMsg postRpcMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, tcpMessage);
  200 + transportService.process(deviceSessionCtx.getSessionInfo(), postRpcMsg, getPubAckCallback(ctx, tcpMessage));
  201 + }
  202 + } else {
  203 + transportService.reportActivity(deviceSessionCtx.getSessionInfo());
  204 + pushDeviceMsg(ctx, tcpMessage.getAckMsg());
  205 + }
  206 + } catch (AdaptorException e) {
  207 + log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
  208 +// ctx.close();
  209 + }
  210 + }
  211 +
  212 +
  213 + private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final TcpUpEntry msg) {
  214 + return new TransportServiceCallback<>() {
  215 + @Override
  216 + public void onSuccess(Void dummy) {
  217 + log.trace("[{}] Published msg: {}", sessionId, msg);
  218 + if (StringUtils.isNotEmpty(msg.getAckMsg())) {
  219 + pushDeviceMsg(ctx, msg.getAckMsg());
  220 + }
  221 + }
  222 +
  223 + @Override
  224 + public void onError(Throwable e) {
  225 + log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
  226 +// ctx.close();
  227 + }
  228 + };
  229 + }
  230 +
  231 +
  232 + void processConnect(ChannelHandlerContext ctx, String accessToken) {
  233 + log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, accessToken);
  234 +
  235 + if (DataConstants.PROVISION.equals(accessToken) || DataConstants.PROVISION.equals(accessToken)) {
  236 + deviceSessionCtx.setProvisionOnly(true);
  237 + pushDeviceMsg(ctx,"CONNECTION_ACCEPTED");
  238 + } else {
  239 + TkScriptInvokeService.authScripts.forEach(id -> {
  240 + ListenableFuture item = context.getJsEngine().invokeFunction(id, accessToken);
  241 + Futures.addCallback(item, new FutureCallback<String>() {
  242 + @Override
  243 + public void onSuccess(@Nullable String result) {
  244 + processAuthTokenConnect(ctx, id, JacksonUtil.fromString(result.replace("\\","\\\\"), TcpAuthEntry.class));
  245 + }
  246 +
  247 + @Override
  248 + public void onFailure(Throwable t) {
  249 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);
  250 + }
  251 + }, MoreExecutors.directExecutor());
  252 +
  253 + });
  254 + }
  255 + }
  256 +
  257 + private void processAuthTokenConnect(ChannelHandlerContext ctx, UUID scriptId, TcpAuthEntry accessToken) {
  258 +
  259 + log.debug("[{}][{}] Processing connect msg for client with user name: {}!", address, sessionId, accessToken);
  260 + if (null != accessToken.getClientId()) {
  261 + }
  262 + if (null != accessToken.getUserName()) {
  263 + }
  264 + String token = accessToken.getPassword();
  265 + if(StringUtils.isNotEmpty(token)){
  266 + TransportProtos.ValidateDeviceTokenRequestMsg.Builder request = TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder();
  267 + request.setToken(token);
  268 + transportService.process(DeviceTransportType.TCP, request.build(),
  269 + new TransportServiceCallback<>() {
  270 + @Override
  271 + public void onSuccess(ValidateDeviceCredentialsResponse msg) {
  272 + onValidateDeviceResponse(msg, ctx, accessToken, scriptId);
  273 + }
  274 +
  275 + @Override
  276 + public void onError(Throwable e) {
  277 + log.trace("[{}] Failed to process credentials: {}", address, accessToken, e);
  278 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
  279 + }
  280 + });
  281 + }else{
  282 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);
  283 + }
  284 + }
  285 +
  286 +
  287 +
  288 +
  289 +
  290 +
  291 +
  292 +
  293 +
  294 + private boolean checkConnected(ChannelHandlerContext ctx, String msg) {
  295 + if (deviceSessionCtx.isConnected()) {
  296 + return true;
  297 + } else {
  298 + log.info("[{}] Closing current session due to invalid msg order: {}", sessionId, msg);
  299 + return false;
  300 + }
  301 + }
  302 +
  303 + private void checkGatewaySession(SessionMetaData sessionMetaData) {
  304 + TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo();
  305 + try {
  306 + JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
  307 + if (infoNode != null) {
  308 + JsonNode gatewayNode = infoNode.get("gateway");
  309 + if (gatewayNode != null && gatewayNode.asBoolean()) {
  310 + gatewaySessionHandler = new TcpGatewaySessionHandler(deviceSessionCtx, sessionId);
  311 + if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {
  312 + sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());
  313 + }
  314 + }
  315 + }
  316 + } catch (IOException e) {
  317 + log.trace("[{}][{}] Failed to fetch device additional info", sessionId, device.getDeviceName(), e);
  318 + }
  319 + }
  320 +
  321 + @Override
  322 + public void operationComplete(Future<? super Void> future) throws Exception {
  323 + log.trace("[{}] Channel closed!", sessionId);
  324 + doDisconnect();
  325 + }
  326 +
  327 + public void doDisconnect() {
  328 + if (deviceSessionCtx.isConnected()) {
  329 + log.debug("[{}] Client disconnected!", sessionId);
  330 + transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
  331 + transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
  332 + if (gatewaySessionHandler != null) {
  333 + gatewaySessionHandler.onGatewayDisconnect();
  334 + }
  335 + deviceSessionCtx.setDisconnected();
  336 + }
  337 + deviceSessionCtx.release();
  338 + }
  339 +
  340 +
  341 + private void onValidateDeviceResponse(ValidateDeviceCredentialsResponse msg, ChannelHandlerContext ctx, TcpAuthEntry authEntry, UUID scriptId) {
  342 + if (!msg.hasDeviceInfo()) {
  343 + context.onAuthFailure(address);
  344 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
  345 + } else {
  346 + DeviceProfile profile = msg.getDeviceProfile();
  347 + TkTcpDeviceProfileTransportConfiguration tcpConfig = (TkTcpDeviceProfileTransportConfiguration) profile.getProfileData().getTransportConfiguration();
  348 + if (scriptId != null && !tcpConfig.getAuthScriptId().equals(scriptId.toString())) {
  349 + authedCounter.incrementAndGet();
  350 + return;
  351 + }
  352 + context.onAuthSuccess(address);
  353 + deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
  354 + deviceSessionCtx.setDeviceProfile(profile);
  355 + deviceSessionCtx.setSessionInfo(SessionInfoCreator.create(msg, context, sessionId));
  356 + transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_OPEN, new TransportServiceCallback<Void>() {
  357 + @Override
  358 + public void onSuccess(Void msg) {
  359 + SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), UdpDatagramDataHandler.this);
  360 + checkGatewaySession(sessionMetaData);
  361 + pushDeviceMsg(ctx,authEntry.getSuccess());
  362 + deviceSessionCtx.setConnected(true);
  363 + log.debug("[{}] Client connected!", sessionId);
  364 +
  365 + transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
  366 + transportService.getCallbackExecutor().execute(() -> processQueueMessage(ctx)); //this callback will execute in Producer worker thread and hard or blocking work have to be submitted to the separate thread.
  367 + }
  368 +
  369 + @Override
  370 + public void onError(Throwable e) {
  371 + if (e instanceof TbRateLimitsException) {
  372 + log.trace("[{}] Failed to submit session event: {}", sessionId, e.getMessage());
  373 + } else {
  374 + log.warn("[{}] Failed to submit session event", sessionId, e);
  375 + }
  376 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
  377 + }
  378 + });
  379 + }
  380 + }
  381 +
  382 + private void onValidateFailed(ChannelHandlerContext ctx, MqttConnectReturnCode msg) {
  383 + authedCounter.incrementAndGet();
  384 + if (TkScriptInvokeService.authScripts.size() == authedCounter.intValue()) {
  385 + pushDeviceMsg(ctx,msg.name());
  386 +
  387 +// ctx.close();
  388 + }
  389 + }
  390 +
  391 + @Override
  392 + public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
  393 +
  394 + }
  395 +
  396 + @Override
  397 + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
  398 +
  399 + }
  400 +
  401 + @Override
  402 + public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
  403 + log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
  404 +// deviceSessionCtx.getChannel().close();
  405 + }
  406 +
  407 + @Override
  408 + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
  409 + log.debug("【{}】下发RPC命令【{}】给设备【{}】", sessionId, rpcRequest.getParams(), deviceSessionCtx.getDeviceInfo().getDeviceName());
  410 + TcpTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();
  411 + try {
  412 + adaptor
  413 + .convertToPublish(deviceSessionCtx, rpcRequest)
  414 + .ifPresent(
  415 + payload -> {
  416 + deviceSessionCtx.rpcRequesting(payload.getIdentifier(), rpcRequest);
  417 + Optional.ofNullable(pushDeviceMsg(deviceSessionCtx.getChannel(), payload.getDatas())).ifPresent(cf->{
  418 + cf.addListener(
  419 + result -> {
  420 + if (result.cause() == null) {
  421 + transportService.process(
  422 + deviceSessionCtx.getSessionInfo(),
  423 + rpcRequest,
  424 + RpcStatus.DELIVERED,
  425 + TransportServiceCallback.EMPTY);
  426 + } else {
  427 + // TODO: send error
  428 + }
  429 + });
  430 + });
  431 + ;
  432 + });
  433 + } catch (Exception e) {
  434 + transportService.process(deviceSessionCtx.getSessionInfo(),
  435 + TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
  436 + .setRequestId(rpcRequest.getRequestId()).setError("Failed to convert device RPC command to TCP msg").build(), TransportServiceCallback.EMPTY);
  437 + log.error("[{}] Failed to convert device RPC command to TCP msg", sessionId, e);
  438 + }
  439 + }
  440 +
  441 +
  442 + @Override
  443 + public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {
  444 + log.debug("[{}] 服务端响应设备的RPC请求", sessionId);
  445 + }
  446 +
  447 + /**
  448 + * 往设备推送消息
  449 + *
  450 + * @param message
  451 + * @return
  452 + */
  453 + private ChannelFuture pushDeviceMsg(ChannelHandlerContext ctx, String message) {
  454 + if(StringUtils.isBlank(message)){
  455 + return null;
  456 + }
  457 + ByteBuf buff = Unpooled.buffer();
  458 + if(!message.matches("-?[0-9a-fA-F]+")){
  459 + //不满足16进制将字符串转为16进制
  460 + message = ByteUtils.stringEncodeToHex(message);
  461 + }
  462 + buff.writeBytes(ByteUtils.hexToByteArray(message));
  463 + return ctx.writeAndFlush(buff);
  464 + }
  465 +
  466 +
  467 + @Override
  468 + public void onDeviceProfileUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {
  469 + deviceSessionCtx.onDeviceProfileUpdate(sessionInfo, deviceProfile);
  470 + }
  471 +
  472 + @Override
  473 + public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
  474 + deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
  475 + }
  476 +
  477 + @Override
  478 + public void onDeviceDeleted(DeviceId deviceId) {
  479 + context.onAuthFailure(address);
  480 + ChannelHandlerContext ctx = deviceSessionCtx.getChannel();
  481 +// ctx.close();
  482 + }
  483 +}
@@ -16,14 +16,7 @@ @@ -16,14 +16,7 @@
16 package org.thingsboard.server.transport.udp; 16 package org.thingsboard.server.transport.udp;
17 17
18 18
19 -import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;  
20 -import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;  
21 -  
22 -import com.fasterxml.jackson.databind.JsonNode;  
23 -import com.google.common.util.concurrent.FutureCallback;  
24 -import com.google.common.util.concurrent.Futures;  
25 -import com.google.common.util.concurrent.ListenableFuture;  
26 -import com.google.common.util.concurrent.MoreExecutors; 19 +
27 import io.netty.buffer.ByteBuf; 20 import io.netty.buffer.ByteBuf;
28 import io.netty.buffer.Unpooled; 21 import io.netty.buffer.Unpooled;
29 import io.netty.channel.ChannelFuture; 22 import io.netty.channel.ChannelFuture;
@@ -31,79 +24,40 @@ import io.netty.channel.ChannelHandlerContext; @@ -31,79 +24,40 @@ import io.netty.channel.ChannelHandlerContext;
31 import io.netty.channel.ChannelInboundHandlerAdapter; 24 import io.netty.channel.ChannelInboundHandlerAdapter;
32 import io.netty.channel.socket.DatagramPacket; 25 import io.netty.channel.socket.DatagramPacket;
33 import io.netty.handler.codec.mqtt.*; 26 import io.netty.handler.codec.mqtt.*;
34 -import io.netty.handler.ssl.SslHandler;  
35 import io.netty.util.ReferenceCountUtil; 27 import io.netty.util.ReferenceCountUtil;
36 import io.netty.util.concurrent.Future; 28 import io.netty.util.concurrent.Future;
37 import io.netty.util.concurrent.GenericFutureListener; 29 import io.netty.util.concurrent.GenericFutureListener;
38 -import java.io.IOException;  
39 import java.net.InetSocketAddress; 30 import java.net.InetSocketAddress;
40 import java.util.Map; 31 import java.util.Map;
41 -import java.util.Optional;  
42 import java.util.UUID; 32 import java.util.UUID;
43 -import java.util.concurrent.atomic.AtomicInteger;  
44 - 33 +import java.util.concurrent.ConcurrentHashMap;
  34 +import java.util.concurrent.ConcurrentMap;
45 import lombok.extern.slf4j.Slf4j; 35 import lombok.extern.slf4j.Slf4j;
46 import org.apache.commons.lang3.StringUtils; 36 import org.apache.commons.lang3.StringUtils;
47 -import org.checkerframework.checker.nullness.qual.Nullable;  
48 -import org.thingsboard.common.util.JacksonUtil;  
49 -import org.thingsboard.server.common.data.DataConstants;  
50 -import org.thingsboard.server.common.data.Device;  
51 -import org.thingsboard.server.common.data.DeviceProfile;  
52 -import org.thingsboard.server.common.data.DeviceTransportType;  
53 -import org.thingsboard.server.common.data.device.profile.TkTcpDeviceProfileTransportConfiguration;  
54 -import org.thingsboard.server.common.data.id.DeviceId;  
55 -import org.thingsboard.server.common.data.rpc.RpcStatus;  
56 import org.thingsboard.server.common.data.yunteng.utils.ByteUtils; 37 import org.thingsboard.server.common.data.yunteng.utils.ByteUtils;
57 -import org.thingsboard.server.common.msg.tools.TbRateLimitsException;  
58 -import org.thingsboard.server.common.transport.SessionMsgListener;  
59 -import org.thingsboard.server.common.transport.TransportService;  
60 -import org.thingsboard.server.common.transport.TransportServiceCallback;  
61 -import org.thingsboard.server.common.transport.adaptor.AdaptorException;  
62 -import org.thingsboard.server.common.transport.auth.SessionInfoCreator;  
63 -import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;  
64 -import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;  
65 -import org.thingsboard.server.common.transport.service.DefaultTransportService;  
66 -import org.thingsboard.server.common.transport.service.SessionMetaData;  
67 -import org.thingsboard.server.gen.transport.TransportProtos;  
68 import org.thingsboard.server.transport.tcp.TcpTransportContext; 38 import org.thingsboard.server.transport.tcp.TcpTransportContext;
69 -import org.thingsboard.server.transport.tcp.adaptors.TcpAuthEntry;  
70 -import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;  
71 -import org.thingsboard.server.transport.tcp.adaptors.TcpUpEntry;  
72 -import org.thingsboard.server.transport.tcp.script.TkScriptFactory;  
73 -import org.thingsboard.server.transport.tcp.script.TkScriptInvokeService;  
74 -import org.thingsboard.server.transport.tcp.session.TcpDeviceSessionCtx;  
75 -import org.thingsboard.server.transport.tcp.session.TcpGatewaySessionHandler;  
76 import org.thingsboard.server.transport.tcp.util.ByteBufUtils; 39 import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
77 40
78 /** 41 /**
79 * @author Andrew Shvayka 42 * @author Andrew Shvayka
80 */ 43 */
81 @Slf4j 44 @Slf4j
82 - public class UdpTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>>, SessionMsgListener { 45 + public class UdpTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>> {
83 46
84 private final UUID sessionId; 47 private final UUID sessionId;
85 private final TcpTransportContext context; 48 private final TcpTransportContext context;
86 - private final TransportService transportService;  
87 - private final SslHandler sslHandler;  
88 49
89 50
90 /** 51 /**
91 * 需要处理的消息队列,例如:需要下发给设备的,设备上传的。 52 * 需要处理的消息队列,例如:需要下发给设备的,设备上传的。
92 */ 53 */
93 - final TcpDeviceSessionCtx deviceSessionCtx;  
94 - volatile InetSocketAddress address; 54 + private final ConcurrentMap<InetSocketAddress, UdpDatagramDataHandler> deviceSessiones = new ConcurrentHashMap<>();
95 55
96 - volatile TcpGatewaySessionHandler gatewaySessionHandler;  
97 - private final AtomicInteger authedCounter = new AtomicInteger();  
98 56
99 -  
100 - UdpTransportHandler(TcpTransportContext context, SslHandler sslHandler) { 57 + UdpTransportHandler(TcpTransportContext context) {
101 super(); 58 super();
102 this.sessionId = UUID.randomUUID(); 59 this.sessionId = UUID.randomUUID();
103 this.context = context; 60 this.context = context;
104 - this.transportService = context.getTransportService();  
105 - this.sslHandler = sslHandler;  
106 - this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context);  
107 } 61 }
108 62
109 @Override 63 @Override
@@ -123,24 +77,16 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils; @@ -123,24 +77,16 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
123 log.error("【{}】 Processing msg: 【{}】", sessionId, msg); 77 log.error("【{}】 Processing msg: 【{}】", sessionId, msg);
124 try { 78 try {
125 if(!context.isReady()){ 79 if(!context.isReady()){
126 - ctx.close();  
127 return; 80 return;
128 } 81 }
129 - if (address == null) {  
130 - address = getAddress(ctx);  
131 - }  
132 if (msg instanceof DatagramPacket) { 82 if (msg instanceof DatagramPacket) {
133 DatagramPacket message = (DatagramPacket) msg; 83 DatagramPacket message = (DatagramPacket) msg;
  84 + InetSocketAddress address = getAddress(message,ctx);
134 byte[] byteMsg = ByteBufUtils.buf2Bytes(message.content()); 85 byte[] byteMsg = ByteBufUtils.buf2Bytes(message.content());
135 String msgStr = ByteUtils.bytesToStr(byteMsg); 86 String msgStr = ByteUtils.bytesToStr(byteMsg);
136 - log.error("会话【{}】收到设备【{}】来自【{}】数据【{}】", sessionId, deviceSessionCtx.getDeviceId(), address, msgStr);  
137 - deviceSessionCtx.setChannel(ctx);  
138 - if (deviceSessionCtx.getDeviceInfo() == null || deviceSessionCtx.getDeviceProfile() == null) {  
139 - processConnect(ctx, msgStr);  
140 - } else {  
141 - enqueueRegularSessionMsg(ctx, msgStr);  
142 - }  
143 - 87 + log.error("UDP服务【{}】收到来自【{}】数据【{}】", sessionId, address, msgStr);
  88 + UdpDatagramDataHandler dataHandler = deviceSessiones.computeIfAbsent(address,k->new UdpDatagramDataHandler(ctx,context,address));
  89 + dataHandler.dealDeviceMsg(ctx,msgStr);
144 } else { 90 } else {
145 log.debug("【{}】 Received non tcp message: 【{}】", sessionId, msg.getClass().getSimpleName()); 91 log.debug("【{}】 Received non tcp message: 【{}】", sessionId, msg.getClass().getSimpleName());
146 ctx.close(); 92 ctx.close();
@@ -150,8 +96,8 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils; @@ -150,8 +96,8 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
150 } 96 }
151 } 97 }
152 98
153 - InetSocketAddress getAddress(ChannelHandlerContext ctx) {  
154 - var address = ctx.channel().attr(UdpTransportService.ADDRESS).get(); 99 + InetSocketAddress getAddress(DatagramPacket msg,ChannelHandlerContext ctx) {
  100 + var address = msg.sender();
155 if (address == null) { 101 if (address == null) {
156 log.trace("[{}] Received empty address.", ctx.channel().id()); 102 log.trace("[{}] Received empty address.", ctx.channel().id());
157 InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); 103 InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
@@ -164,42 +110,9 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils; @@ -164,42 +110,9 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
164 } 110 }
165 111
166 112
167 - void enqueueRegularSessionMsg(ChannelHandlerContext ctx, String msg) {  
168 - final int queueSize = deviceSessionCtx.getMsgQueueSize();  
169 - if (queueSize >= context.getMessageQueueSizePerDeviceLimit()) {  
170 - log.info("Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}",  
171 - deviceSessionCtx.getDeviceId(), context.getMessageQueueSizePerDeviceLimit(), queueSize, deviceSessionCtx.getMsgQueueSize());  
172 - ctx.close();  
173 - return;  
174 - }  
175 113
176 - deviceSessionCtx.addToQueue(msg);  
177 - processQueueMessage(ctx); //Under the normal conditions the msg queue will contain 0 messages. Many messages will be processed on device connect event in separate thread pool  
178 -  
179 - }  
180 114
181 115
182 - void processQueueMessage(ChannelHandlerContext ctx) {  
183 - if (!deviceSessionCtx.isConnected()) {  
184 - log.trace("[{}][{}] Postpone processing msg due to device is not connected. Msg queue size is {}", sessionId, deviceSessionCtx.getDeviceId(), deviceSessionCtx.getMsgQueueSize());  
185 - return;  
186 - }  
187 - deviceSessionCtx.tryProcessQueuedMsgs(msg -> processDeviceSessionMsg(ctx, msg));  
188 - }  
189 -  
190 -  
191 - private void processDeviceSessionMsg(ChannelHandlerContext ctx, String tcpMessage) {  
192 - if (!checkConnected(ctx, tcpMessage)) {  
193 - return;  
194 - }  
195 - deviceSessionCtx.doUpScript(tcpMessage, r -> {  
196 - if (gatewaySessionHandler != null) {  
197 - processGatewayDeviceMsg(ctx, r);  
198 - }  
199 - processDirectDeviceMsg(ctx, r);  
200 - });  
201 -  
202 - }  
203 116
204 117
205 /** 118 /**
@@ -215,129 +128,7 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils; @@ -215,129 +128,7 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
215 } 128 }
216 129
217 130
218 - private void processGatewayDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {  
219 - log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage);  
220 - try {  
221 - Map<String, Object> datas = tcpMessage.getDatas();  
222 - if (hasDatas(datas)) {  
223 - datas.forEach((devName, param) -> {  
224 - if (TkScriptFactory.ORIGINAL_DATA_FILED.equals(devName)) {  
225 - return;  
226 - }  
227 - if (tcpMessage.getTelemetry()) {  
228 - gatewaySessionHandler.onDeviceTelemetry(devName, tcpMessage.getRequestId(), param.toString());  
229 - } else {  
230 -// gatewaySessionHandler.onDeviceRpcResponse(devName, tcpMessage.getRequestId(), param.toString());  
231 - }  
232 -  
233 - });  
234 - } else {  
235 - transportService.reportActivity(deviceSessionCtx.getSessionInfo());  
236 - pushDeviceMsg(ctx, tcpMessage.getAckMsg());  
237 - }  
238 - } catch (Exception e) {  
239 - log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);  
240 - ctx.close();  
241 - }  
242 - }  
243 131
244 - private void processDirectDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {  
245 - log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage);  
246 - try {  
247 - TcpTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();  
248 - Map<String, Object> datas = tcpMessage.getDatas();  
249 - if (hasDatas(datas)) {  
250 - String dataStr = JacksonUtil.toString(datas);  
251 - if (tcpMessage.getTelemetry()) {  
252 - TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, dataStr);  
253 - transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, tcpMessage));  
254 - } else {  
255 - TransportProtos.ToDeviceRpcResponseMsg postRpcMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, tcpMessage);  
256 - transportService.process(deviceSessionCtx.getSessionInfo(), postRpcMsg, getPubAckCallback(ctx, tcpMessage));  
257 - }  
258 - } else {  
259 - transportService.reportActivity(deviceSessionCtx.getSessionInfo());  
260 - pushDeviceMsg(ctx, tcpMessage.getAckMsg());  
261 - }  
262 - } catch (AdaptorException e) {  
263 - log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);  
264 - ctx.close();  
265 - }  
266 - }  
267 -  
268 -  
269 - private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final TcpUpEntry msg) {  
270 - return new TransportServiceCallback<>() {  
271 - @Override  
272 - public void onSuccess(Void dummy) {  
273 - log.trace("[{}] Published msg: {}", sessionId, msg);  
274 - if (StringUtils.isNotEmpty(msg.getAckMsg())) {  
275 - pushDeviceMsg(ctx, msg.getAckMsg());  
276 - }  
277 - }  
278 -  
279 - @Override  
280 - public void onError(Throwable e) {  
281 - log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);  
282 - ctx.close();  
283 - }  
284 - };  
285 - }  
286 -  
287 -  
288 - void processConnect(ChannelHandlerContext ctx, String accessToken) {  
289 - log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, accessToken);  
290 -  
291 - if (DataConstants.PROVISION.equals(accessToken) || DataConstants.PROVISION.equals(accessToken)) {  
292 - deviceSessionCtx.setProvisionOnly(true);  
293 - pushDeviceMsg(ctx,"CONNECTION_ACCEPTED");  
294 - } else {  
295 - TkScriptInvokeService.authScripts.forEach(id -> {  
296 - ListenableFuture item = context.getJsEngine().invokeFunction(id, accessToken);  
297 - Futures.addCallback(item, new FutureCallback<String>() {  
298 - @Override  
299 - public void onSuccess(@Nullable String result) {  
300 - processAuthTokenConnect(ctx, id, JacksonUtil.fromString(result.replace("\\","\\\\"), TcpAuthEntry.class));  
301 - }  
302 -  
303 - @Override  
304 - public void onFailure(Throwable t) {  
305 - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);  
306 - }  
307 - }, MoreExecutors.directExecutor());  
308 -  
309 - });  
310 - }  
311 - }  
312 -  
313 - private void processAuthTokenConnect(ChannelHandlerContext ctx, UUID scriptId, TcpAuthEntry accessToken) {  
314 -  
315 - log.debug("[{}][{}] Processing connect msg for client with user name: {}!", address, sessionId, accessToken);  
316 - if (null != accessToken.getClientId()) {  
317 - }  
318 - if (null != accessToken.getUserName()) {  
319 - }  
320 - String token = accessToken.getPassword();  
321 - if(StringUtils.isNotEmpty(token)){  
322 - TransportProtos.ValidateDeviceTokenRequestMsg.Builder request = TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder();  
323 - request.setToken(token);  
324 - transportService.process(DeviceTransportType.TCP, request.build(),  
325 - new TransportServiceCallback<>() {  
326 - @Override  
327 - public void onSuccess(ValidateDeviceCredentialsResponse msg) {  
328 - onValidateDeviceResponse(msg, ctx, accessToken, scriptId);  
329 - }  
330 -  
331 - @Override  
332 - public void onError(Throwable e) {  
333 - log.trace("[{}] Failed to process credentials: {}", address, accessToken, e);  
334 - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);  
335 - }  
336 - });  
337 - }else{  
338 - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);  
339 - }  
340 - }  
341 132
342 133
343 134
@@ -359,159 +150,14 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils; @@ -359,159 +150,14 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
359 150
360 151
361 152
362 -  
363 - private boolean checkConnected(ChannelHandlerContext ctx, String msg) {  
364 - if (deviceSessionCtx.isConnected()) {  
365 - return true;  
366 - } else {  
367 - log.info("[{}] Closing current session due to invalid msg order: {}", sessionId, msg);  
368 - return false;  
369 - }  
370 - }  
371 -  
372 - private void checkGatewaySession(SessionMetaData sessionMetaData) {  
373 - TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo();  
374 - try {  
375 - JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());  
376 - if (infoNode != null) {  
377 - JsonNode gatewayNode = infoNode.get("gateway");  
378 - if (gatewayNode != null && gatewayNode.asBoolean()) {  
379 - gatewaySessionHandler = new TcpGatewaySessionHandler(deviceSessionCtx, sessionId);  
380 - if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {  
381 - sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());  
382 - }  
383 - }  
384 - }  
385 - } catch (IOException e) {  
386 - log.trace("[{}][{}] Failed to fetch device additional info", sessionId, device.getDeviceName(), e);  
387 - }  
388 - }  
389 -  
390 @Override 153 @Override
391 public void operationComplete(Future<? super Void> future) throws Exception { 154 public void operationComplete(Future<? super Void> future) throws Exception {
392 log.trace("[{}] Channel closed!", sessionId); 155 log.trace("[{}] Channel closed!", sessionId);
393 - doDisconnect();  
394 } 156 }
395 157
396 - public void doDisconnect() {  
397 - if (deviceSessionCtx.isConnected()) {  
398 - log.debug("[{}] Client disconnected!", sessionId);  
399 - transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);  
400 - transportService.deregisterSession(deviceSessionCtx.getSessionInfo());  
401 - if (gatewaySessionHandler != null) {  
402 - gatewaySessionHandler.onGatewayDisconnect();  
403 - }  
404 - deviceSessionCtx.setDisconnected();  
405 - }  
406 - deviceSessionCtx.release();  
407 - }  
408 158
409 159
410 - private void onValidateDeviceResponse(ValidateDeviceCredentialsResponse msg, ChannelHandlerContext ctx, TcpAuthEntry authEntry, UUID scriptId) {  
411 - if (!msg.hasDeviceInfo()) {  
412 - context.onAuthFailure(address);  
413 - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);  
414 - } else {  
415 - DeviceProfile profile = msg.getDeviceProfile();  
416 - TkTcpDeviceProfileTransportConfiguration tcpConfig = (TkTcpDeviceProfileTransportConfiguration) profile.getProfileData().getTransportConfiguration();  
417 - if (scriptId != null && !tcpConfig.getAuthScriptId().equals(scriptId.toString())) {  
418 - authedCounter.incrementAndGet();  
419 - return;  
420 - }  
421 - context.onAuthSuccess(address);  
422 - deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());  
423 - deviceSessionCtx.setDeviceProfile(profile);  
424 - deviceSessionCtx.setSessionInfo(SessionInfoCreator.create(msg, context, sessionId));  
425 - transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_OPEN, new TransportServiceCallback<Void>() {  
426 - @Override  
427 - public void onSuccess(Void msg) {  
428 - SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), UdpTransportHandler.this);  
429 - checkGatewaySession(sessionMetaData);  
430 - pushDeviceMsg(ctx,authEntry.getSuccess());  
431 - deviceSessionCtx.setConnected(true);  
432 - log.debug("[{}] Client connected!", sessionId);  
433 -  
434 - transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);  
435 - transportService.getCallbackExecutor().execute(() -> processQueueMessage(ctx)); //this callback will execute in Producer worker thread and hard or blocking work have to be submitted to the separate thread.  
436 - }  
437 -  
438 - @Override  
439 - public void onError(Throwable e) {  
440 - if (e instanceof TbRateLimitsException) {  
441 - log.trace("[{}] Failed to submit session event: {}", sessionId, e.getMessage());  
442 - } else {  
443 - log.warn("[{}] Failed to submit session event", sessionId, e);  
444 - }  
445 - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);  
446 - }  
447 - });  
448 - }  
449 - }  
450 -  
451 - private void onValidateFailed(ChannelHandlerContext ctx, MqttConnectReturnCode msg) {  
452 - authedCounter.incrementAndGet();  
453 - if (TkScriptInvokeService.authScripts.size() == authedCounter.intValue()) {  
454 - pushDeviceMsg(ctx,msg.name());  
455 -  
456 - ctx.close();  
457 - }  
458 - }  
459 -  
460 - @Override  
461 - public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {  
462 160
463 - }  
464 -  
465 - @Override  
466 - public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {  
467 -  
468 - }  
469 -  
470 - @Override  
471 - public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {  
472 - log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());  
473 - deviceSessionCtx.getChannel().close();  
474 - }  
475 -  
476 - @Override  
477 - public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {  
478 - log.debug("【{}】下发RPC命令【{}】给设备【{}】", sessionId, rpcRequest.getParams(), deviceSessionCtx.getDeviceInfo().getDeviceName());  
479 - TcpTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();  
480 - try {  
481 - adaptor  
482 - .convertToPublish(deviceSessionCtx, rpcRequest)  
483 - .ifPresent(  
484 - payload -> {  
485 - deviceSessionCtx.rpcRequesting(payload.getIdentifier(), rpcRequest);  
486 - Optional.ofNullable(pushDeviceMsg(deviceSessionCtx.getChannel(), payload.getDatas())).ifPresent(cf->{  
487 - cf.addListener(  
488 - result -> {  
489 - if (result.cause() == null) {  
490 - transportService.process(  
491 - deviceSessionCtx.getSessionInfo(),  
492 - rpcRequest,  
493 - RpcStatus.DELIVERED,  
494 - TransportServiceCallback.EMPTY);  
495 - } else {  
496 - // TODO: send error  
497 - }  
498 - });  
499 - });  
500 - ;  
501 - });  
502 - } catch (Exception e) {  
503 - transportService.process(deviceSessionCtx.getSessionInfo(),  
504 - TransportProtos.ToDeviceRpcResponseMsg.newBuilder()  
505 - .setRequestId(rpcRequest.getRequestId()).setError("Failed to convert device RPC command to TCP msg").build(), TransportServiceCallback.EMPTY);  
506 - log.error("[{}] Failed to convert device RPC command to TCP msg", sessionId, e);  
507 - }  
508 - }  
509 -  
510 -  
511 - @Override  
512 - public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {  
513 - log.debug("[{}] 服务端响应设备的RPC请求", sessionId);  
514 - }  
515 161
516 /** 162 /**
517 * 往设备推送消息 163 * 往设备推送消息
@@ -532,21 +178,4 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils; @@ -532,21 +178,4 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
532 return ctx.writeAndFlush(buff); 178 return ctx.writeAndFlush(buff);
533 } 179 }
534 180
535 -  
536 - @Override  
537 - public void onDeviceProfileUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {  
538 - deviceSessionCtx.onDeviceProfileUpdate(sessionInfo, deviceProfile);  
539 - }  
540 -  
541 - @Override  
542 - public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {  
543 - deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);  
544 - }  
545 -  
546 - @Override  
547 - public void onDeviceDeleted(DeviceId deviceId) {  
548 - context.onAuthFailure(address);  
549 - ChannelHandlerContext ctx = deviceSessionCtx.getChannel();  
550 - ctx.close();  
551 - }  
552 } 181 }
@@ -39,7 +39,7 @@ public class UdpTransportServerInitializer extends ChannelInitializer<Channel> { @@ -39,7 +39,7 @@ public class UdpTransportServerInitializer extends ChannelInitializer<Channel> {
39 ChannelPipeline pipeline = ch.pipeline(); 39 ChannelPipeline pipeline = ch.pipeline();
40 SslHandler sslHandler = null; 40 SslHandler sslHandler = null;
41 41
42 - UdpTransportHandler handler = new UdpTransportHandler(context, sslHandler); 42 + UdpTransportHandler handler = new UdpTransportHandler(context);
43 // NettyUdpServerHandler handler = new NettyUdpServerHandler(); 43 // NettyUdpServerHandler handler = new NettyUdpServerHandler();
44 44
45 pipeline.addLast(handler); 45 pipeline.addLast(handler);