Commit a6b8f012ee08e65d4dd401f7d2e8538b5c3355cb

Authored by 芯火源
1 parent 4243e916

refactor: UDP协议功能完善

  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.udp;
  17 +
  18 +
  19 +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
  20 +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
  21 +
  22 +import com.fasterxml.jackson.databind.JsonNode;
  23 +import com.google.common.util.concurrent.FutureCallback;
  24 +import com.google.common.util.concurrent.Futures;
  25 +import com.google.common.util.concurrent.ListenableFuture;
  26 +import com.google.common.util.concurrent.MoreExecutors;
  27 +import io.netty.buffer.ByteBuf;
  28 +import io.netty.buffer.Unpooled;
  29 +import io.netty.channel.ChannelFuture;
  30 +import io.netty.channel.ChannelHandlerContext;
  31 +import io.netty.handler.codec.mqtt.*;
  32 +import io.netty.util.concurrent.Future;
  33 +import io.netty.util.concurrent.GenericFutureListener;
  34 +import java.io.IOException;
  35 +import java.net.InetSocketAddress;
  36 +import java.util.Map;
  37 +import java.util.Optional;
  38 +import java.util.UUID;
  39 +import java.util.concurrent.atomic.AtomicInteger;
  40 +import lombok.extern.slf4j.Slf4j;
  41 +import org.apache.commons.lang3.StringUtils;
  42 +import org.checkerframework.checker.nullness.qual.Nullable;
  43 +import org.thingsboard.common.util.JacksonUtil;
  44 +import org.thingsboard.server.common.data.DataConstants;
  45 +import org.thingsboard.server.common.data.Device;
  46 +import org.thingsboard.server.common.data.DeviceProfile;
  47 +import org.thingsboard.server.common.data.DeviceTransportType;
  48 +import org.thingsboard.server.common.data.device.profile.TkTcpDeviceProfileTransportConfiguration;
  49 +import org.thingsboard.server.common.data.id.DeviceId;
  50 +import org.thingsboard.server.common.data.rpc.RpcStatus;
  51 +import org.thingsboard.server.common.data.yunteng.utils.ByteUtils;
  52 +import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
  53 +import org.thingsboard.server.common.transport.SessionMsgListener;
  54 +import org.thingsboard.server.common.transport.TransportService;
  55 +import org.thingsboard.server.common.transport.TransportServiceCallback;
  56 +import org.thingsboard.server.common.transport.adaptor.AdaptorException;
  57 +import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
  58 +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
  59 +import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
  60 +import org.thingsboard.server.common.transport.service.DefaultTransportService;
  61 +import org.thingsboard.server.common.transport.service.SessionMetaData;
  62 +import org.thingsboard.server.gen.transport.TransportProtos;
  63 +import org.thingsboard.server.transport.tcp.TcpTransportContext;
  64 +import org.thingsboard.server.transport.tcp.adaptors.TcpAuthEntry;
  65 +import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;
  66 +import org.thingsboard.server.transport.tcp.adaptors.TcpUpEntry;
  67 +import org.thingsboard.server.transport.tcp.script.TkScriptFactory;
  68 +import org.thingsboard.server.transport.tcp.script.TkScriptInvokeService;
  69 +import org.thingsboard.server.transport.tcp.session.TcpDeviceSessionCtx;
  70 +import org.thingsboard.server.transport.tcp.session.TcpGatewaySessionHandler;
  71 +
  72 +/**
  73 + * @author Andrew Shvayka
  74 + */
  75 +@Slf4j
  76 + public class UdpDatagramDataHandler implements GenericFutureListener<Future<? super Void>>, SessionMsgListener {
  77 +
  78 + private final UUID sessionId;
  79 + private final TcpTransportContext context;
  80 + private final TransportService transportService;
  81 +
  82 +
  83 + /**
  84 + * 需要处理的消息队列,例如:需要下发给设备的,设备上传的。
  85 + */
  86 + final TcpDeviceSessionCtx deviceSessionCtx;
  87 + volatile InetSocketAddress address;
  88 +
  89 + volatile TcpGatewaySessionHandler gatewaySessionHandler;
  90 + private final AtomicInteger authedCounter = new AtomicInteger();
  91 +
  92 +
  93 + UdpDatagramDataHandler(ChannelHandlerContext ctx,TcpTransportContext context, InetSocketAddress devAddress) {
  94 + super();
  95 + this.sessionId = UUID.randomUUID();
  96 + this.context = context;
  97 + this.address = devAddress;
  98 + this.transportService = context.getTransportService();
  99 + this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context);
  100 + }
  101 +
  102 +
  103 +
  104 +
  105 +
  106 +
  107 + public void dealDeviceMsg(ChannelHandlerContext ctx, String msg) {
  108 + deviceSessionCtx.setChannel(ctx);
  109 + if (deviceSessionCtx.getDeviceInfo() == null || deviceSessionCtx.getDeviceProfile() == null) {
  110 + processConnect(ctx, msg);
  111 + return;
  112 + }
  113 + final int queueSize = deviceSessionCtx.getMsgQueueSize();
  114 + if (queueSize >= context.getMessageQueueSizePerDeviceLimit()) {
  115 + log.info("Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}",
  116 + deviceSessionCtx.getDeviceId(), context.getMessageQueueSizePerDeviceLimit(), queueSize, deviceSessionCtx.getMsgQueueSize());
  117 +// ctx.close();
  118 + return;
  119 + }
  120 + deviceSessionCtx.addToQueue(msg);
  121 + processQueueMessage(ctx); //Under the normal conditions the msg queue will contain 0 messages. Many messages will be processed on device connect event in separate thread pool
  122 +
  123 + }
  124 +
  125 +
  126 + void processQueueMessage(ChannelHandlerContext ctx) {
  127 + if (!deviceSessionCtx.isConnected()) {
  128 + log.trace("[{}][{}] Postpone processing msg due to device is not connected. Msg queue size is {}", sessionId, deviceSessionCtx.getDeviceId(), deviceSessionCtx.getMsgQueueSize());
  129 + return;
  130 + }
  131 + deviceSessionCtx.tryProcessQueuedMsgs(msg -> processDeviceSessionMsg(ctx, msg));
  132 + }
  133 +
  134 +
  135 + private void processDeviceSessionMsg(ChannelHandlerContext ctx, String tcpMessage) {
  136 + if (!checkConnected(ctx, tcpMessage)) {
  137 + return;
  138 + }
  139 + deviceSessionCtx.doUpScript(tcpMessage, r -> {
  140 + if (gatewaySessionHandler != null) {
  141 + processGatewayDeviceMsg(ctx, r);
  142 + }
  143 + processDirectDeviceMsg(ctx, r);
  144 + });
  145 +
  146 + }
  147 +
  148 +
  149 + /**
  150 + * 上行脚本解析结果是否包含数据
  151 + * @param datas 数据集合
  152 + * @return
  153 + */
  154 + private boolean hasDatas(Map<String, Object> datas) {
  155 + if (datas == null || datas.isEmpty()) {
  156 + return false;
  157 + }
  158 + return true;
  159 + }
  160 +
  161 +
  162 + private void processGatewayDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {
  163 + log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage);
  164 + try {
  165 + Map<String, Object> datas = tcpMessage.getDatas();
  166 + if (hasDatas(datas)) {
  167 + datas.forEach((devName, param) -> {
  168 + if (TkScriptFactory.ORIGINAL_DATA_FILED.equals(devName)) {
  169 + return;
  170 + }
  171 + if (tcpMessage.getTelemetry()) {
  172 + gatewaySessionHandler.onDeviceTelemetry(devName, tcpMessage.getRequestId(), param.toString());
  173 + } else {
  174 +// gatewaySessionHandler.onDeviceRpcResponse(devName, tcpMessage.getRequestId(), param.toString());
  175 + }
  176 +
  177 + });
  178 + } else {
  179 + transportService.reportActivity(deviceSessionCtx.getSessionInfo());
  180 + pushDeviceMsg(ctx, tcpMessage.getAckMsg());
  181 + }
  182 + } catch (Exception e) {
  183 + log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
  184 +// ctx.close();
  185 + }
  186 + }
  187 +
  188 + private void processDirectDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {
  189 + log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage);
  190 + try {
  191 + TcpTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
  192 + Map<String, Object> datas = tcpMessage.getDatas();
  193 + if (hasDatas(datas)) {
  194 + String dataStr = JacksonUtil.toString(datas);
  195 + if (tcpMessage.getTelemetry()) {
  196 + TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, dataStr);
  197 + transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, tcpMessage));
  198 + } else {
  199 + TransportProtos.ToDeviceRpcResponseMsg postRpcMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, tcpMessage);
  200 + transportService.process(deviceSessionCtx.getSessionInfo(), postRpcMsg, getPubAckCallback(ctx, tcpMessage));
  201 + }
  202 + } else {
  203 + transportService.reportActivity(deviceSessionCtx.getSessionInfo());
  204 + pushDeviceMsg(ctx, tcpMessage.getAckMsg());
  205 + }
  206 + } catch (AdaptorException e) {
  207 + log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
  208 +// ctx.close();
  209 + }
  210 + }
  211 +
  212 +
  213 + private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final TcpUpEntry msg) {
  214 + return new TransportServiceCallback<>() {
  215 + @Override
  216 + public void onSuccess(Void dummy) {
  217 + log.trace("[{}] Published msg: {}", sessionId, msg);
  218 + if (StringUtils.isNotEmpty(msg.getAckMsg())) {
  219 + pushDeviceMsg(ctx, msg.getAckMsg());
  220 + }
  221 + }
  222 +
  223 + @Override
  224 + public void onError(Throwable e) {
  225 + log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
  226 +// ctx.close();
  227 + }
  228 + };
  229 + }
  230 +
  231 +
  232 + void processConnect(ChannelHandlerContext ctx, String accessToken) {
  233 + log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, accessToken);
  234 +
  235 + if (DataConstants.PROVISION.equals(accessToken) || DataConstants.PROVISION.equals(accessToken)) {
  236 + deviceSessionCtx.setProvisionOnly(true);
  237 + pushDeviceMsg(ctx,"CONNECTION_ACCEPTED");
  238 + } else {
  239 + TkScriptInvokeService.authScripts.forEach(id -> {
  240 + ListenableFuture item = context.getJsEngine().invokeFunction(id, accessToken);
  241 + Futures.addCallback(item, new FutureCallback<String>() {
  242 + @Override
  243 + public void onSuccess(@Nullable String result) {
  244 + processAuthTokenConnect(ctx, id, JacksonUtil.fromString(result.replace("\\","\\\\"), TcpAuthEntry.class));
  245 + }
  246 +
  247 + @Override
  248 + public void onFailure(Throwable t) {
  249 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);
  250 + }
  251 + }, MoreExecutors.directExecutor());
  252 +
  253 + });
  254 + }
  255 + }
  256 +
  257 + private void processAuthTokenConnect(ChannelHandlerContext ctx, UUID scriptId, TcpAuthEntry accessToken) {
  258 +
  259 + log.debug("[{}][{}] Processing connect msg for client with user name: {}!", address, sessionId, accessToken);
  260 + if (null != accessToken.getClientId()) {
  261 + }
  262 + if (null != accessToken.getUserName()) {
  263 + }
  264 + String token = accessToken.getPassword();
  265 + if(StringUtils.isNotEmpty(token)){
  266 + TransportProtos.ValidateDeviceTokenRequestMsg.Builder request = TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder();
  267 + request.setToken(token);
  268 + transportService.process(DeviceTransportType.TCP, request.build(),
  269 + new TransportServiceCallback<>() {
  270 + @Override
  271 + public void onSuccess(ValidateDeviceCredentialsResponse msg) {
  272 + onValidateDeviceResponse(msg, ctx, accessToken, scriptId);
  273 + }
  274 +
  275 + @Override
  276 + public void onError(Throwable e) {
  277 + log.trace("[{}] Failed to process credentials: {}", address, accessToken, e);
  278 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
  279 + }
  280 + });
  281 + }else{
  282 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);
  283 + }
  284 + }
  285 +
  286 +
  287 +
  288 +
  289 +
  290 +
  291 +
  292 +
  293 +
  294 + private boolean checkConnected(ChannelHandlerContext ctx, String msg) {
  295 + if (deviceSessionCtx.isConnected()) {
  296 + return true;
  297 + } else {
  298 + log.info("[{}] Closing current session due to invalid msg order: {}", sessionId, msg);
  299 + return false;
  300 + }
  301 + }
  302 +
  303 + private void checkGatewaySession(SessionMetaData sessionMetaData) {
  304 + TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo();
  305 + try {
  306 + JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
  307 + if (infoNode != null) {
  308 + JsonNode gatewayNode = infoNode.get("gateway");
  309 + if (gatewayNode != null && gatewayNode.asBoolean()) {
  310 + gatewaySessionHandler = new TcpGatewaySessionHandler(deviceSessionCtx, sessionId);
  311 + if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {
  312 + sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());
  313 + }
  314 + }
  315 + }
  316 + } catch (IOException e) {
  317 + log.trace("[{}][{}] Failed to fetch device additional info", sessionId, device.getDeviceName(), e);
  318 + }
  319 + }
  320 +
  321 + @Override
  322 + public void operationComplete(Future<? super Void> future) throws Exception {
  323 + log.trace("[{}] Channel closed!", sessionId);
  324 + doDisconnect();
  325 + }
  326 +
  327 + public void doDisconnect() {
  328 + if (deviceSessionCtx.isConnected()) {
  329 + log.debug("[{}] Client disconnected!", sessionId);
  330 + transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
  331 + transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
  332 + if (gatewaySessionHandler != null) {
  333 + gatewaySessionHandler.onGatewayDisconnect();
  334 + }
  335 + deviceSessionCtx.setDisconnected();
  336 + }
  337 + deviceSessionCtx.release();
  338 + }
  339 +
  340 +
  341 + private void onValidateDeviceResponse(ValidateDeviceCredentialsResponse msg, ChannelHandlerContext ctx, TcpAuthEntry authEntry, UUID scriptId) {
  342 + if (!msg.hasDeviceInfo()) {
  343 + context.onAuthFailure(address);
  344 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
  345 + } else {
  346 + DeviceProfile profile = msg.getDeviceProfile();
  347 + TkTcpDeviceProfileTransportConfiguration tcpConfig = (TkTcpDeviceProfileTransportConfiguration) profile.getProfileData().getTransportConfiguration();
  348 + if (scriptId != null && !tcpConfig.getAuthScriptId().equals(scriptId.toString())) {
  349 + authedCounter.incrementAndGet();
  350 + return;
  351 + }
  352 + context.onAuthSuccess(address);
  353 + deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
  354 + deviceSessionCtx.setDeviceProfile(profile);
  355 + deviceSessionCtx.setSessionInfo(SessionInfoCreator.create(msg, context, sessionId));
  356 + transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_OPEN, new TransportServiceCallback<Void>() {
  357 + @Override
  358 + public void onSuccess(Void msg) {
  359 + SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), UdpDatagramDataHandler.this);
  360 + checkGatewaySession(sessionMetaData);
  361 + pushDeviceMsg(ctx,authEntry.getSuccess());
  362 + deviceSessionCtx.setConnected(true);
  363 + log.debug("[{}] Client connected!", sessionId);
  364 +
  365 + transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
  366 + transportService.getCallbackExecutor().execute(() -> processQueueMessage(ctx)); //this callback will execute in Producer worker thread and hard or blocking work have to be submitted to the separate thread.
  367 + }
  368 +
  369 + @Override
  370 + public void onError(Throwable e) {
  371 + if (e instanceof TbRateLimitsException) {
  372 + log.trace("[{}] Failed to submit session event: {}", sessionId, e.getMessage());
  373 + } else {
  374 + log.warn("[{}] Failed to submit session event", sessionId, e);
  375 + }
  376 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
  377 + }
  378 + });
  379 + }
  380 + }
  381 +
  382 + private void onValidateFailed(ChannelHandlerContext ctx, MqttConnectReturnCode msg) {
  383 + authedCounter.incrementAndGet();
  384 + if (TkScriptInvokeService.authScripts.size() == authedCounter.intValue()) {
  385 + pushDeviceMsg(ctx,msg.name());
  386 +
  387 +// ctx.close();
  388 + }
  389 + }
  390 +
  391 + @Override
  392 + public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
  393 +
  394 + }
  395 +
  396 + @Override
  397 + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
  398 +
  399 + }
  400 +
  401 + @Override
  402 + public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
  403 + log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
  404 +// deviceSessionCtx.getChannel().close();
  405 + }
  406 +
  407 + @Override
  408 + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
  409 + log.debug("【{}】下发RPC命令【{}】给设备【{}】", sessionId, rpcRequest.getParams(), deviceSessionCtx.getDeviceInfo().getDeviceName());
  410 + TcpTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();
  411 + try {
  412 + adaptor
  413 + .convertToPublish(deviceSessionCtx, rpcRequest)
  414 + .ifPresent(
  415 + payload -> {
  416 + deviceSessionCtx.rpcRequesting(payload.getIdentifier(), rpcRequest);
  417 + Optional.ofNullable(pushDeviceMsg(deviceSessionCtx.getChannel(), payload.getDatas())).ifPresent(cf->{
  418 + cf.addListener(
  419 + result -> {
  420 + if (result.cause() == null) {
  421 + transportService.process(
  422 + deviceSessionCtx.getSessionInfo(),
  423 + rpcRequest,
  424 + RpcStatus.DELIVERED,
  425 + TransportServiceCallback.EMPTY);
  426 + } else {
  427 + // TODO: send error
  428 + }
  429 + });
  430 + });
  431 + ;
  432 + });
  433 + } catch (Exception e) {
  434 + transportService.process(deviceSessionCtx.getSessionInfo(),
  435 + TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
  436 + .setRequestId(rpcRequest.getRequestId()).setError("Failed to convert device RPC command to TCP msg").build(), TransportServiceCallback.EMPTY);
  437 + log.error("[{}] Failed to convert device RPC command to TCP msg", sessionId, e);
  438 + }
  439 + }
  440 +
  441 +
  442 + @Override
  443 + public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {
  444 + log.debug("[{}] 服务端响应设备的RPC请求", sessionId);
  445 + }
  446 +
  447 + /**
  448 + * 往设备推送消息
  449 + *
  450 + * @param message
  451 + * @return
  452 + */
  453 + private ChannelFuture pushDeviceMsg(ChannelHandlerContext ctx, String message) {
  454 + if(StringUtils.isBlank(message)){
  455 + return null;
  456 + }
  457 + ByteBuf buff = Unpooled.buffer();
  458 + if(!message.matches("-?[0-9a-fA-F]+")){
  459 + //不满足16进制将字符串转为16进制
  460 + message = ByteUtils.stringEncodeToHex(message);
  461 + }
  462 + buff.writeBytes(ByteUtils.hexToByteArray(message));
  463 + return ctx.writeAndFlush(buff);
  464 + }
  465 +
  466 +
  467 + @Override
  468 + public void onDeviceProfileUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {
  469 + deviceSessionCtx.onDeviceProfileUpdate(sessionInfo, deviceProfile);
  470 + }
  471 +
  472 + @Override
  473 + public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
  474 + deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
  475 + }
  476 +
  477 + @Override
  478 + public void onDeviceDeleted(DeviceId deviceId) {
  479 + context.onAuthFailure(address);
  480 + ChannelHandlerContext ctx = deviceSessionCtx.getChannel();
  481 +// ctx.close();
  482 + }
  483 +}
... ...
... ... @@ -16,14 +16,7 @@
16 16 package org.thingsboard.server.transport.udp;
17 17
18 18
19   -import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
20   -import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
21   -
22   -import com.fasterxml.jackson.databind.JsonNode;
23   -import com.google.common.util.concurrent.FutureCallback;
24   -import com.google.common.util.concurrent.Futures;
25   -import com.google.common.util.concurrent.ListenableFuture;
26   -import com.google.common.util.concurrent.MoreExecutors;
  19 +
27 20 import io.netty.buffer.ByteBuf;
28 21 import io.netty.buffer.Unpooled;
29 22 import io.netty.channel.ChannelFuture;
... ... @@ -31,79 +24,40 @@ import io.netty.channel.ChannelHandlerContext;
31 24 import io.netty.channel.ChannelInboundHandlerAdapter;
32 25 import io.netty.channel.socket.DatagramPacket;
33 26 import io.netty.handler.codec.mqtt.*;
34   -import io.netty.handler.ssl.SslHandler;
35 27 import io.netty.util.ReferenceCountUtil;
36 28 import io.netty.util.concurrent.Future;
37 29 import io.netty.util.concurrent.GenericFutureListener;
38   -import java.io.IOException;
39 30 import java.net.InetSocketAddress;
40 31 import java.util.Map;
41   -import java.util.Optional;
42 32 import java.util.UUID;
43   -import java.util.concurrent.atomic.AtomicInteger;
44   -
  33 +import java.util.concurrent.ConcurrentHashMap;
  34 +import java.util.concurrent.ConcurrentMap;
45 35 import lombok.extern.slf4j.Slf4j;
46 36 import org.apache.commons.lang3.StringUtils;
47   -import org.checkerframework.checker.nullness.qual.Nullable;
48   -import org.thingsboard.common.util.JacksonUtil;
49   -import org.thingsboard.server.common.data.DataConstants;
50   -import org.thingsboard.server.common.data.Device;
51   -import org.thingsboard.server.common.data.DeviceProfile;
52   -import org.thingsboard.server.common.data.DeviceTransportType;
53   -import org.thingsboard.server.common.data.device.profile.TkTcpDeviceProfileTransportConfiguration;
54   -import org.thingsboard.server.common.data.id.DeviceId;
55   -import org.thingsboard.server.common.data.rpc.RpcStatus;
56 37 import org.thingsboard.server.common.data.yunteng.utils.ByteUtils;
57   -import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
58   -import org.thingsboard.server.common.transport.SessionMsgListener;
59   -import org.thingsboard.server.common.transport.TransportService;
60   -import org.thingsboard.server.common.transport.TransportServiceCallback;
61   -import org.thingsboard.server.common.transport.adaptor.AdaptorException;
62   -import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
63   -import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
64   -import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
65   -import org.thingsboard.server.common.transport.service.DefaultTransportService;
66   -import org.thingsboard.server.common.transport.service.SessionMetaData;
67   -import org.thingsboard.server.gen.transport.TransportProtos;
68 38 import org.thingsboard.server.transport.tcp.TcpTransportContext;
69   -import org.thingsboard.server.transport.tcp.adaptors.TcpAuthEntry;
70   -import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;
71   -import org.thingsboard.server.transport.tcp.adaptors.TcpUpEntry;
72   -import org.thingsboard.server.transport.tcp.script.TkScriptFactory;
73   -import org.thingsboard.server.transport.tcp.script.TkScriptInvokeService;
74   -import org.thingsboard.server.transport.tcp.session.TcpDeviceSessionCtx;
75   -import org.thingsboard.server.transport.tcp.session.TcpGatewaySessionHandler;
76 39 import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
77 40
78 41 /**
79 42 * @author Andrew Shvayka
80 43 */
81 44 @Slf4j
82   - public class UdpTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>>, SessionMsgListener {
  45 + public class UdpTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>> {
83 46
84 47 private final UUID sessionId;
85 48 private final TcpTransportContext context;
86   - private final TransportService transportService;
87   - private final SslHandler sslHandler;
88 49
89 50
90 51 /**
91 52 * 需要处理的消息队列,例如:需要下发给设备的,设备上传的。
92 53 */
93   - final TcpDeviceSessionCtx deviceSessionCtx;
94   - volatile InetSocketAddress address;
  54 + private final ConcurrentMap<InetSocketAddress, UdpDatagramDataHandler> deviceSessiones = new ConcurrentHashMap<>();
95 55
96   - volatile TcpGatewaySessionHandler gatewaySessionHandler;
97   - private final AtomicInteger authedCounter = new AtomicInteger();
98 56
99   -
100   - UdpTransportHandler(TcpTransportContext context, SslHandler sslHandler) {
  57 + UdpTransportHandler(TcpTransportContext context) {
101 58 super();
102 59 this.sessionId = UUID.randomUUID();
103 60 this.context = context;
104   - this.transportService = context.getTransportService();
105   - this.sslHandler = sslHandler;
106   - this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context);
107 61 }
108 62
109 63 @Override
... ... @@ -123,24 +77,16 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
123 77 log.error("【{}】 Processing msg: 【{}】", sessionId, msg);
124 78 try {
125 79 if(!context.isReady()){
126   - ctx.close();
127 80 return;
128 81 }
129   - if (address == null) {
130   - address = getAddress(ctx);
131   - }
132 82 if (msg instanceof DatagramPacket) {
133 83 DatagramPacket message = (DatagramPacket) msg;
  84 + InetSocketAddress address = getAddress(message,ctx);
134 85 byte[] byteMsg = ByteBufUtils.buf2Bytes(message.content());
135 86 String msgStr = ByteUtils.bytesToStr(byteMsg);
136   - log.error("会话【{}】收到设备【{}】来自【{}】数据【{}】", sessionId, deviceSessionCtx.getDeviceId(), address, msgStr);
137   - deviceSessionCtx.setChannel(ctx);
138   - if (deviceSessionCtx.getDeviceInfo() == null || deviceSessionCtx.getDeviceProfile() == null) {
139   - processConnect(ctx, msgStr);
140   - } else {
141   - enqueueRegularSessionMsg(ctx, msgStr);
142   - }
143   -
  87 + log.error("UDP服务【{}】收到来自【{}】数据【{}】", sessionId, address, msgStr);
  88 + UdpDatagramDataHandler dataHandler = deviceSessiones.computeIfAbsent(address,k->new UdpDatagramDataHandler(ctx,context,address));
  89 + dataHandler.dealDeviceMsg(ctx,msgStr);
144 90 } else {
145 91 log.debug("【{}】 Received non tcp message: 【{}】", sessionId, msg.getClass().getSimpleName());
146 92 ctx.close();
... ... @@ -150,8 +96,8 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
150 96 }
151 97 }
152 98
153   - InetSocketAddress getAddress(ChannelHandlerContext ctx) {
154   - var address = ctx.channel().attr(UdpTransportService.ADDRESS).get();
  99 + InetSocketAddress getAddress(DatagramPacket msg,ChannelHandlerContext ctx) {
  100 + var address = msg.sender();
155 101 if (address == null) {
156 102 log.trace("[{}] Received empty address.", ctx.channel().id());
157 103 InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
... ... @@ -164,42 +110,9 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
164 110 }
165 111
166 112
167   - void enqueueRegularSessionMsg(ChannelHandlerContext ctx, String msg) {
168   - final int queueSize = deviceSessionCtx.getMsgQueueSize();
169   - if (queueSize >= context.getMessageQueueSizePerDeviceLimit()) {
170   - log.info("Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}",
171   - deviceSessionCtx.getDeviceId(), context.getMessageQueueSizePerDeviceLimit(), queueSize, deviceSessionCtx.getMsgQueueSize());
172   - ctx.close();
173   - return;
174   - }
175 113
176   - deviceSessionCtx.addToQueue(msg);
177   - processQueueMessage(ctx); //Under the normal conditions the msg queue will contain 0 messages. Many messages will be processed on device connect event in separate thread pool
178   -
179   - }
180 114
181 115
182   - void processQueueMessage(ChannelHandlerContext ctx) {
183   - if (!deviceSessionCtx.isConnected()) {
184   - log.trace("[{}][{}] Postpone processing msg due to device is not connected. Msg queue size is {}", sessionId, deviceSessionCtx.getDeviceId(), deviceSessionCtx.getMsgQueueSize());
185   - return;
186   - }
187   - deviceSessionCtx.tryProcessQueuedMsgs(msg -> processDeviceSessionMsg(ctx, msg));
188   - }
189   -
190   -
191   - private void processDeviceSessionMsg(ChannelHandlerContext ctx, String tcpMessage) {
192   - if (!checkConnected(ctx, tcpMessage)) {
193   - return;
194   - }
195   - deviceSessionCtx.doUpScript(tcpMessage, r -> {
196   - if (gatewaySessionHandler != null) {
197   - processGatewayDeviceMsg(ctx, r);
198   - }
199   - processDirectDeviceMsg(ctx, r);
200   - });
201   -
202   - }
203 116
204 117
205 118 /**
... ... @@ -215,129 +128,7 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
215 128 }
216 129
217 130
218   - private void processGatewayDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {
219   - log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage);
220   - try {
221   - Map<String, Object> datas = tcpMessage.getDatas();
222   - if (hasDatas(datas)) {
223   - datas.forEach((devName, param) -> {
224   - if (TkScriptFactory.ORIGINAL_DATA_FILED.equals(devName)) {
225   - return;
226   - }
227   - if (tcpMessage.getTelemetry()) {
228   - gatewaySessionHandler.onDeviceTelemetry(devName, tcpMessage.getRequestId(), param.toString());
229   - } else {
230   -// gatewaySessionHandler.onDeviceRpcResponse(devName, tcpMessage.getRequestId(), param.toString());
231   - }
232   -
233   - });
234   - } else {
235   - transportService.reportActivity(deviceSessionCtx.getSessionInfo());
236   - pushDeviceMsg(ctx, tcpMessage.getAckMsg());
237   - }
238   - } catch (Exception e) {
239   - log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
240   - ctx.close();
241   - }
242   - }
243 131
244   - private void processDirectDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {
245   - log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage);
246   - try {
247   - TcpTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
248   - Map<String, Object> datas = tcpMessage.getDatas();
249   - if (hasDatas(datas)) {
250   - String dataStr = JacksonUtil.toString(datas);
251   - if (tcpMessage.getTelemetry()) {
252   - TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, dataStr);
253   - transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, tcpMessage));
254   - } else {
255   - TransportProtos.ToDeviceRpcResponseMsg postRpcMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, tcpMessage);
256   - transportService.process(deviceSessionCtx.getSessionInfo(), postRpcMsg, getPubAckCallback(ctx, tcpMessage));
257   - }
258   - } else {
259   - transportService.reportActivity(deviceSessionCtx.getSessionInfo());
260   - pushDeviceMsg(ctx, tcpMessage.getAckMsg());
261   - }
262   - } catch (AdaptorException e) {
263   - log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
264   - ctx.close();
265   - }
266   - }
267   -
268   -
269   - private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final TcpUpEntry msg) {
270   - return new TransportServiceCallback<>() {
271   - @Override
272   - public void onSuccess(Void dummy) {
273   - log.trace("[{}] Published msg: {}", sessionId, msg);
274   - if (StringUtils.isNotEmpty(msg.getAckMsg())) {
275   - pushDeviceMsg(ctx, msg.getAckMsg());
276   - }
277   - }
278   -
279   - @Override
280   - public void onError(Throwable e) {
281   - log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
282   - ctx.close();
283   - }
284   - };
285   - }
286   -
287   -
288   - void processConnect(ChannelHandlerContext ctx, String accessToken) {
289   - log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, accessToken);
290   -
291   - if (DataConstants.PROVISION.equals(accessToken) || DataConstants.PROVISION.equals(accessToken)) {
292   - deviceSessionCtx.setProvisionOnly(true);
293   - pushDeviceMsg(ctx,"CONNECTION_ACCEPTED");
294   - } else {
295   - TkScriptInvokeService.authScripts.forEach(id -> {
296   - ListenableFuture item = context.getJsEngine().invokeFunction(id, accessToken);
297   - Futures.addCallback(item, new FutureCallback<String>() {
298   - @Override
299   - public void onSuccess(@Nullable String result) {
300   - processAuthTokenConnect(ctx, id, JacksonUtil.fromString(result.replace("\\","\\\\"), TcpAuthEntry.class));
301   - }
302   -
303   - @Override
304   - public void onFailure(Throwable t) {
305   - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);
306   - }
307   - }, MoreExecutors.directExecutor());
308   -
309   - });
310   - }
311   - }
312   -
313   - private void processAuthTokenConnect(ChannelHandlerContext ctx, UUID scriptId, TcpAuthEntry accessToken) {
314   -
315   - log.debug("[{}][{}] Processing connect msg for client with user name: {}!", address, sessionId, accessToken);
316   - if (null != accessToken.getClientId()) {
317   - }
318   - if (null != accessToken.getUserName()) {
319   - }
320   - String token = accessToken.getPassword();
321   - if(StringUtils.isNotEmpty(token)){
322   - TransportProtos.ValidateDeviceTokenRequestMsg.Builder request = TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder();
323   - request.setToken(token);
324   - transportService.process(DeviceTransportType.TCP, request.build(),
325   - new TransportServiceCallback<>() {
326   - @Override
327   - public void onSuccess(ValidateDeviceCredentialsResponse msg) {
328   - onValidateDeviceResponse(msg, ctx, accessToken, scriptId);
329   - }
330   -
331   - @Override
332   - public void onError(Throwable e) {
333   - log.trace("[{}] Failed to process credentials: {}", address, accessToken, e);
334   - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
335   - }
336   - });
337   - }else{
338   - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);
339   - }
340   - }
341 132
342 133
343 134
... ... @@ -359,159 +150,14 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
359 150
360 151
361 152
362   -
363   - private boolean checkConnected(ChannelHandlerContext ctx, String msg) {
364   - if (deviceSessionCtx.isConnected()) {
365   - return true;
366   - } else {
367   - log.info("[{}] Closing current session due to invalid msg order: {}", sessionId, msg);
368   - return false;
369   - }
370   - }
371   -
372   - private void checkGatewaySession(SessionMetaData sessionMetaData) {
373   - TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo();
374   - try {
375   - JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
376   - if (infoNode != null) {
377   - JsonNode gatewayNode = infoNode.get("gateway");
378   - if (gatewayNode != null && gatewayNode.asBoolean()) {
379   - gatewaySessionHandler = new TcpGatewaySessionHandler(deviceSessionCtx, sessionId);
380   - if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {
381   - sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());
382   - }
383   - }
384   - }
385   - } catch (IOException e) {
386   - log.trace("[{}][{}] Failed to fetch device additional info", sessionId, device.getDeviceName(), e);
387   - }
388   - }
389   -
390 153 @Override
391 154 public void operationComplete(Future<? super Void> future) throws Exception {
392 155 log.trace("[{}] Channel closed!", sessionId);
393   - doDisconnect();
394 156 }
395 157
396   - public void doDisconnect() {
397   - if (deviceSessionCtx.isConnected()) {
398   - log.debug("[{}] Client disconnected!", sessionId);
399   - transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
400   - transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
401   - if (gatewaySessionHandler != null) {
402   - gatewaySessionHandler.onGatewayDisconnect();
403   - }
404   - deviceSessionCtx.setDisconnected();
405   - }
406   - deviceSessionCtx.release();
407   - }
408 158
409 159
410   - private void onValidateDeviceResponse(ValidateDeviceCredentialsResponse msg, ChannelHandlerContext ctx, TcpAuthEntry authEntry, UUID scriptId) {
411   - if (!msg.hasDeviceInfo()) {
412   - context.onAuthFailure(address);
413   - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
414   - } else {
415   - DeviceProfile profile = msg.getDeviceProfile();
416   - TkTcpDeviceProfileTransportConfiguration tcpConfig = (TkTcpDeviceProfileTransportConfiguration) profile.getProfileData().getTransportConfiguration();
417   - if (scriptId != null && !tcpConfig.getAuthScriptId().equals(scriptId.toString())) {
418   - authedCounter.incrementAndGet();
419   - return;
420   - }
421   - context.onAuthSuccess(address);
422   - deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
423   - deviceSessionCtx.setDeviceProfile(profile);
424   - deviceSessionCtx.setSessionInfo(SessionInfoCreator.create(msg, context, sessionId));
425   - transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_OPEN, new TransportServiceCallback<Void>() {
426   - @Override
427   - public void onSuccess(Void msg) {
428   - SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), UdpTransportHandler.this);
429   - checkGatewaySession(sessionMetaData);
430   - pushDeviceMsg(ctx,authEntry.getSuccess());
431   - deviceSessionCtx.setConnected(true);
432   - log.debug("[{}] Client connected!", sessionId);
433   -
434   - transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
435   - transportService.getCallbackExecutor().execute(() -> processQueueMessage(ctx)); //this callback will execute in Producer worker thread and hard or blocking work have to be submitted to the separate thread.
436   - }
437   -
438   - @Override
439   - public void onError(Throwable e) {
440   - if (e instanceof TbRateLimitsException) {
441   - log.trace("[{}] Failed to submit session event: {}", sessionId, e.getMessage());
442   - } else {
443   - log.warn("[{}] Failed to submit session event", sessionId, e);
444   - }
445   - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
446   - }
447   - });
448   - }
449   - }
450   -
451   - private void onValidateFailed(ChannelHandlerContext ctx, MqttConnectReturnCode msg) {
452   - authedCounter.incrementAndGet();
453   - if (TkScriptInvokeService.authScripts.size() == authedCounter.intValue()) {
454   - pushDeviceMsg(ctx,msg.name());
455   -
456   - ctx.close();
457   - }
458   - }
459   -
460   - @Override
461   - public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
462 160
463   - }
464   -
465   - @Override
466   - public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
467   -
468   - }
469   -
470   - @Override
471   - public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
472   - log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
473   - deviceSessionCtx.getChannel().close();
474   - }
475   -
476   - @Override
477   - public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
478   - log.debug("【{}】下发RPC命令【{}】给设备【{}】", sessionId, rpcRequest.getParams(), deviceSessionCtx.getDeviceInfo().getDeviceName());
479   - TcpTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();
480   - try {
481   - adaptor
482   - .convertToPublish(deviceSessionCtx, rpcRequest)
483   - .ifPresent(
484   - payload -> {
485   - deviceSessionCtx.rpcRequesting(payload.getIdentifier(), rpcRequest);
486   - Optional.ofNullable(pushDeviceMsg(deviceSessionCtx.getChannel(), payload.getDatas())).ifPresent(cf->{
487   - cf.addListener(
488   - result -> {
489   - if (result.cause() == null) {
490   - transportService.process(
491   - deviceSessionCtx.getSessionInfo(),
492   - rpcRequest,
493   - RpcStatus.DELIVERED,
494   - TransportServiceCallback.EMPTY);
495   - } else {
496   - // TODO: send error
497   - }
498   - });
499   - });
500   - ;
501   - });
502   - } catch (Exception e) {
503   - transportService.process(deviceSessionCtx.getSessionInfo(),
504   - TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
505   - .setRequestId(rpcRequest.getRequestId()).setError("Failed to convert device RPC command to TCP msg").build(), TransportServiceCallback.EMPTY);
506   - log.error("[{}] Failed to convert device RPC command to TCP msg", sessionId, e);
507   - }
508   - }
509   -
510   -
511   - @Override
512   - public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {
513   - log.debug("[{}] 服务端响应设备的RPC请求", sessionId);
514   - }
515 161
516 162 /**
517 163 * 往设备推送消息
... ... @@ -532,21 +178,4 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
532 178 return ctx.writeAndFlush(buff);
533 179 }
534 180
535   -
536   - @Override
537   - public void onDeviceProfileUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {
538   - deviceSessionCtx.onDeviceProfileUpdate(sessionInfo, deviceProfile);
539   - }
540   -
541   - @Override
542   - public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
543   - deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
544   - }
545   -
546   - @Override
547   - public void onDeviceDeleted(DeviceId deviceId) {
548   - context.onAuthFailure(address);
549   - ChannelHandlerContext ctx = deviceSessionCtx.getChannel();
550   - ctx.close();
551   - }
552 181 }
... ...
... ... @@ -39,7 +39,7 @@ public class UdpTransportServerInitializer extends ChannelInitializer<Channel> {
39 39 ChannelPipeline pipeline = ch.pipeline();
40 40 SslHandler sslHandler = null;
41 41
42   - UdpTransportHandler handler = new UdpTransportHandler(context, sslHandler);
  42 + UdpTransportHandler handler = new UdpTransportHandler(context);
43 43 // NettyUdpServerHandler handler = new NettyUdpServerHandler();
44 44
45 45 pipeline.addLast(handler);
... ...