Commit d5910fe69cdfeb96a5a2bb7c85bbf8ee4bf027c3

Authored by xp.Huang
1 parent 85b24ea0

refactor: TCP/UDP结构调整及优化


(cherry picked from commit 69ed52a8427e1c7f2e1b0250693f796710c4999c)
1 /** 1 /**
2 * Copyright © 2016-2022 The Thingsboard Authors 2 * Copyright © 2016-2022 The Thingsboard Authors
3 * 3 *
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at 4 + * <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
  5 + * except in compliance with the License. You may obtain a copy of the License at
7 * 6 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 7 + * <p>http://www.apache.org/licenses/LICENSE-2.0
9 * 8 *
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and 9 + * <p>Unless required by applicable law or agreed to in writing, software distributed under the
  10 + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  11 + * express or implied. See the License for the specific language governing permissions and
14 * limitations under the License. 12 * limitations under the License.
15 */ 13 */
16 package org.thingsboard.server.transport.tcp; 14 package org.thingsboard.server.transport.tcp;
17 -  
18 -import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;  
19 -import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;  
20 -import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK;  
21 -import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;  
22 -import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;  
23 -import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;  
24 -  
25 -import com.fasterxml.jackson.databind.JsonNode;  
26 -import com.fasterxml.jackson.databind.node.ObjectNode;  
27 -import com.fasterxml.jackson.databind.node.TextNode;  
28 -import com.google.common.util.concurrent.FutureCallback;  
29 -import com.google.common.util.concurrent.Futures;  
30 -import com.google.common.util.concurrent.ListenableFuture;  
31 -import com.google.common.util.concurrent.MoreExecutors;  
32 import io.netty.buffer.ByteBuf; 15 import io.netty.buffer.ByteBuf;
33 -import io.netty.buffer.Unpooled;  
34 -import io.netty.channel.ChannelFuture;  
35 import io.netty.channel.ChannelHandlerContext; 16 import io.netty.channel.ChannelHandlerContext;
36 import io.netty.channel.ChannelInboundHandlerAdapter; 17 import io.netty.channel.ChannelInboundHandlerAdapter;
37 -import io.netty.handler.codec.mqtt.*;  
38 -import io.netty.handler.ssl.SslHandler;  
39 import io.netty.util.ReferenceCountUtil; 18 import io.netty.util.ReferenceCountUtil;
40 -import io.netty.util.concurrent.Future;  
41 -import io.netty.util.concurrent.GenericFutureListener;  
42 -import java.io.IOException;  
43 import java.net.InetSocketAddress; 19 import java.net.InetSocketAddress;
44 import java.util.*; 20 import java.util.*;
45 -import java.util.concurrent.atomic.AtomicInteger;  
46 21
  22 +import io.netty.util.concurrent.Future;
  23 +import io.netty.util.concurrent.GenericFutureListener;
47 import lombok.extern.slf4j.Slf4j; 24 import lombok.extern.slf4j.Slf4j;
48 -import org.apache.commons.lang3.StringUtils;  
49 -import org.checkerframework.checker.nullness.qual.Nullable;  
50 -import org.thingsboard.common.util.JacksonUtil;  
51 -import org.thingsboard.server.common.data.DataConstants;  
52 -import org.thingsboard.server.common.data.Device;  
53 -import org.thingsboard.server.common.data.DeviceProfile;  
54 -import org.thingsboard.server.common.data.DeviceTransportType;  
55 -import org.thingsboard.server.common.data.device.profile.TkTcpDeviceProfileTransportConfiguration;  
56 -import org.thingsboard.server.common.data.id.DeviceId;  
57 -import org.thingsboard.server.common.data.rpc.RpcStatus;  
58 -import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;  
59 -import org.thingsboard.server.common.data.yunteng.dto.TkThingsModel;  
60 -import org.thingsboard.server.common.data.yunteng.enums.AttributeSourceDataTypeEnum;  
61 -import org.thingsboard.server.common.data.yunteng.enums.ProtocolAnalysisEnum;  
62 import org.thingsboard.server.common.data.yunteng.utils.*; 25 import org.thingsboard.server.common.data.yunteng.utils.*;
63 -import org.thingsboard.server.common.msg.tools.TbRateLimitsException;  
64 -import org.thingsboard.server.common.transport.SessionMsgListener;  
65 import org.thingsboard.server.common.transport.TransportService; 26 import org.thingsboard.server.common.transport.TransportService;
66 -import org.thingsboard.server.common.transport.TransportServiceCallback;  
67 -import org.thingsboard.server.common.transport.adaptor.AdaptorException;  
68 -import org.thingsboard.server.common.transport.auth.SessionInfoCreator;  
69 -import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;  
70 -import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;  
71 -import org.thingsboard.server.common.transport.service.DefaultTransportService;  
72 -import org.thingsboard.server.common.transport.service.SessionMetaData;  
73 -import org.thingsboard.server.gen.transport.TransportProtos;  
74 -import org.thingsboard.server.transport.tcp.adaptors.TcpAuthEntry;  
75 -import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;  
76 -import org.thingsboard.server.transport.tcp.adaptors.TcpUpEntry;  
77 -import org.thingsboard.server.transport.tcp.script.TkScriptFactory;  
78 -import org.thingsboard.server.transport.tcp.script.TkScriptInvokeService;  
79 import org.thingsboard.server.transport.tcp.session.TcpDeviceSessionCtx; 27 import org.thingsboard.server.transport.tcp.session.TcpDeviceSessionCtx;
80 import org.thingsboard.server.transport.tcp.session.TcpGatewaySessionHandler; 28 import org.thingsboard.server.transport.tcp.session.TcpGatewaySessionHandler;
81 import org.thingsboard.server.transport.tcp.util.ByteBufUtils; 29 import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
82 30
  31 +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
  32 +
83 /** 33 /**
84 * @author Andrew Shvayka 34 * @author Andrew Shvayka
85 */ 35 */
86 @Slf4j 36 @Slf4j
87 -public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>>, SessionMsgListener {  
88 -  
89 -  
90 - private final UUID sessionId;  
91 - private final TcpTransportContext context;  
92 - private final TransportService transportService;  
93 - private final SslHandler sslHandler;  
94 -  
95 -  
96 - /**  
97 - * 需要处理的消息队列,例如:需要下发给设备的,设备上传的。  
98 - */  
99 - final TcpDeviceSessionCtx deviceSessionCtx;  
100 - volatile InetSocketAddress address;  
101 -  
102 - volatile TcpGatewaySessionHandler gatewaySessionHandler;  
103 - private final AtomicInteger authedCounter = new AtomicInteger();  
104 -  
105 -  
106 - TcpTransportHandler(TcpTransportContext context, SslHandler sslHandler) {  
107 - this.sessionId = UUID.randomUUID();  
108 - this.context = context;  
109 - this.transportService = context.getTransportService();  
110 - this.sslHandler = sslHandler;  
111 - this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context);  
112 - }  
113 -  
114 - @Override  
115 - public void channelRegistered(ChannelHandlerContext ctx) throws Exception {  
116 - super.channelRegistered(ctx);  
117 - context.channelRegistered();  
118 - }  
119 -  
120 - @Override  
121 - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {  
122 - super.channelUnregistered(ctx);  
123 - context.channelUnregistered();  
124 - }  
125 -  
126 - @Override  
127 - public void channelRead(ChannelHandlerContext ctx, Object msg) {  
128 - log.trace("【{}】 Processing msg: 【{}】", sessionId, msg);  
129 - try {  
130 - if(!context.isReady()){  
131 - ctx.close();  
132 - return;  
133 - }  
134 - if (address == null) {  
135 - address = getAddress(ctx);  
136 - }  
137 - if (msg instanceof ByteBuf) {  
138 - ByteBuf message = (ByteBuf) msg;  
139 - byte[] byteMsg = ByteBufUtils.buf2Bytes(message);  
140 - String msgStr = ByteUtils.bytesToStr(byteMsg);  
141 - log.debug("会话【{}】收到设备【{}】来自【{}】数据【{}】", sessionId, deviceSessionCtx.getDeviceId(), address, msgStr);  
142 - deviceSessionCtx.setChannel(ctx);  
143 - if (deviceSessionCtx.getDeviceInfo() == null || deviceSessionCtx.getDeviceProfile() == null) {  
144 - processConnect(ctx, msgStr);  
145 - } else {  
146 - enqueueRegularSessionMsg(ctx, msgStr);  
147 - }  
148 -  
149 - } else {  
150 - log.debug("【{}】 Received non tcp message: 【{}】", sessionId, msg.getClass().getSimpleName());  
151 - ctx.close();  
152 - }  
153 - } finally {  
154 - ReferenceCountUtil.safeRelease(msg);  
155 - }  
156 - }  
157 -  
158 - InetSocketAddress getAddress(ChannelHandlerContext ctx) {  
159 - var address = ctx.channel().attr(TcpTransportService.ADDRESS).get();  
160 - if (address == null) {  
161 - log.trace("[{}] Received empty address.", ctx.channel().id());  
162 - InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();  
163 - log.trace("[{}] Going to use address: {}", ctx.channel().id(), remoteAddress);  
164 - return remoteAddress;  
165 - } else {  
166 - log.trace("[{}] Received address: {}", ctx.channel().id(), address);  
167 - }  
168 - return address;  
169 - }  
170 -  
171 -  
172 - void enqueueRegularSessionMsg(ChannelHandlerContext ctx, String msg) {  
173 - final int queueSize = deviceSessionCtx.getMsgQueueSize();  
174 - if (queueSize >= context.getMessageQueueSizePerDeviceLimit()) {  
175 - log.info("Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}",  
176 - deviceSessionCtx.getDeviceId(), context.getMessageQueueSizePerDeviceLimit(), queueSize, deviceSessionCtx.getMsgQueueSize());  
177 - ctx.close();  
178 - return;  
179 - }  
180 -  
181 - deviceSessionCtx.addToQueue(msg);  
182 - processQueueMessage(ctx); //Under the normal conditions the msg queue will contain 0 messages. Many messages will be processed on device connect event in separate thread pool  
183 -  
184 - }  
185 -  
186 -  
187 - void processQueueMessage(ChannelHandlerContext ctx) {  
188 - if (!deviceSessionCtx.isConnected()) {  
189 - log.trace("[{}][{}] Postpone processing msg due to device is not connected. Msg queue size is {}", sessionId, deviceSessionCtx.getDeviceId(), deviceSessionCtx.getMsgQueueSize());  
190 - return;  
191 - }  
192 - deviceSessionCtx.tryProcessQueuedMsgs(msg -> processDeviceSessionMsg(ctx, msg));  
193 - }  
194 -  
195 -  
196 - private void processDeviceSessionMsg(ChannelHandlerContext ctx, String tcpMessage) {  
197 - if (!checkConnected(ctx, tcpMessage)) {  
198 - return;  
199 - }  
200 - //判断协议类型  
201 - TkTcpDeviceProfileTransportConfiguration transportConfiguration =  
202 - (TkTcpDeviceProfileTransportConfiguration) deviceSessionCtx.getDeviceProfile().getProfileData().getTransportConfiguration();  
203 - switch (transportConfiguration.getProtocol())  
204 - {  
205 - case CUSTOM:  
206 - customScriptProcess(ctx,tcpMessage);  
207 - break;  
208 - case MODBUS_RTU:  
209 - modbusRtuProcess(ctx,tcpMessage);  
210 - break;  
211 -  
212 - }  
213 - }  
214 -  
215 -  
216 - private void modbusRtuProcess(ChannelHandlerContext ctx, String tcpMessage){  
217 - //移除空格  
218 - String hexString = tcpMessage.trim().replaceAll(" ","");  
219 - //判断是否为16进制HEX  
220 - if(hexString.matches("-?[0-9a-fA-F]+")){  
221 - boolean modbusCheckResult = ModbusUtils.isValidModbusResponseFrame(hexString);  
222 - //判断是否满足modbus标准,满足的才处理不满足的过滤掉  
223 - if(modbusCheckResult){  
224 - //根据上报的地址码,判断该条消息归属于那个设备的数据  
225 - String deviceAddress = hexString.substring(0,2);  
226 - int deviceCode = Integer.parseInt(deviceSessionCtx.getDeviceCode(),16);  
227 - int deviceAddressCode = Integer.parseInt(deviceAddress,16);  
228 - if(gatewaySessionHandler != null && deviceCode != deviceAddressCode){  
229 - gatewaySessionHandler.onDeviceTelemetry(deviceAddress, null, hexString,  
230 - ProtocolAnalysisEnum.MODBUS_RTU);  
231 - }else{  
232 - if(deviceCode == deviceAddressCode){  
233 - processCustomDirectDeviceMsg(ctx, deviceSessionCtx.getPayloadAdaptor()  
234 - .convertModbusHexToPublish(deviceSessionCtx,hexString).get());  
235 - }  
236 - }  
237 - }  
238 - }  
239 - }  
240 -  
241 -  
242 -  
243 -  
244 - private void customScriptProcess(ChannelHandlerContext ctx, String tcpMessage){  
245 - deviceSessionCtx.doUpScript(tcpMessage, r -> {  
246 - //根据网关上报的消息,判断消息的来源是否为网关子设备,判断依据deviceCode即设备地址码或设备标识符  
247 - if (gatewaySessionHandler != null && checkMessageIsFromSensor(r.getDatas())) {  
248 - processCustomGatewayDeviceMsg(ctx, r);  
249 - }else{  
250 - processCustomDirectDeviceMsg(ctx, r);  
251 - }  
252 - });  
253 - }  
254 -  
255 - private boolean checkMessageIsFromSensor(Map<String,Object> dataMap){  
256 - boolean isSensorMessage = true;  
257 - String gateWayDeviceCode = deviceSessionCtx.getDeviceCode();  
258 - for (Map.Entry<String,Object> entry :dataMap.entrySet()){  
259 - if(entry.getKey().equals(gateWayDeviceCode)){  
260 - isSensorMessage = false;  
261 - break;  
262 - }  
263 - }  
264 - return isSensorMessage;  
265 - }  
266 -  
267 - /**  
268 - * 上行脚本解析结果是否包含数据  
269 - * @param datas 数据集合  
270 - * @return  
271 - */  
272 - private boolean hasDatas(Map<String, Object> datas) {  
273 - if (datas == null || datas.isEmpty()) {  
274 - return false;  
275 - }  
276 - return true;  
277 - }  
278 -  
279 - private void processCustomGatewayDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {  
280 - log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage);  
281 - try {  
282 - Map<String, Object> datas = tcpMessage.getDatas();  
283 - if (hasDatas(datas)) {  
284 - datas.forEach((devName, param) -> {  
285 - if (TkScriptFactory.ORIGINAL_DATA_FILED.equals(devName)) {  
286 - return;  
287 - }  
288 - if (tcpMessage.getTelemetry()) {  
289 - gatewaySessionHandler.onDeviceTelemetry(devName, tcpMessage.getRequestId(), param.toString(),  
290 - ProtocolAnalysisEnum.CUSTOM);  
291 - } else {  
292 -// gatewaySessionHandler.onDeviceRpcResponse(devName, tcpMessage.getRequestId(), param.toString());  
293 - }  
294 -  
295 - });  
296 - } else {  
297 - transportService.reportActivity(deviceSessionCtx.getSessionInfo());  
298 - pushDeviceMsg(ctx, tcpMessage.getAckMsg());  
299 - }  
300 - } catch (Exception e) {  
301 - log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);  
302 - ctx.close();  
303 - }  
304 - }  
305 -  
306 - private void processCustomDirectDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {  
307 - log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage);  
308 - try {  
309 - TcpTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();  
310 - Map<String, Object> datas = tcpMessage.getDatas();  
311 - if (hasDatas(datas)) {  
312 - String dataStr = JacksonUtil.toString(datas);  
313 - if (tcpMessage.getTelemetry()) {  
314 - TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, dataStr);  
315 - transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, tcpMessage));  
316 - } else {  
317 - TransportProtos.ToDeviceRpcResponseMsg postRpcMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, tcpMessage);  
318 - transportService.process(deviceSessionCtx.getSessionInfo(), postRpcMsg, getPubAckCallback(ctx, tcpMessage));  
319 - }  
320 - } else {  
321 - transportService.reportActivity(deviceSessionCtx.getSessionInfo());  
322 - pushDeviceMsg(ctx, tcpMessage.getAckMsg());  
323 - }  
324 - } catch (AdaptorException e) {  
325 - log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);  
326 - ctx.close();  
327 - }  
328 - }  
329 -  
330 -  
331 - private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final TcpUpEntry msg) {  
332 - return new TransportServiceCallback<>() {  
333 - @Override  
334 - public void onSuccess(Void dummy) {  
335 - log.trace("[{}] Published msg: {}", sessionId, msg);  
336 - if (StringUtils.isNotEmpty(msg.getAckMsg())) {  
337 - pushDeviceMsg(ctx, msg.getAckMsg());  
338 - }  
339 - }  
340 -  
341 - @Override  
342 - public void onError(Throwable e) {  
343 - log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);  
344 - ctx.close();  
345 - }  
346 - };  
347 - }  
348 -  
349 -  
350 - void processConnect(ChannelHandlerContext ctx, String accessToken) {  
351 - log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, accessToken);  
352 -  
353 - if (DataConstants.PROVISION.equals(accessToken) || DataConstants.PROVISION.equals(accessToken)) {  
354 - deviceSessionCtx.setProvisionOnly(true);  
355 - pushDeviceMsg(ctx,CONNECTION_ACCEPTED.name());  
356 - } else {  
357 - TkScriptInvokeService.authScripts.forEach(id -> {  
358 - ListenableFuture item = context.getJsEngine().invokeFunction(id, accessToken);  
359 - Futures.addCallback(item, new FutureCallback<String>() {  
360 - @Override  
361 - public void onSuccess(@Nullable String result) {  
362 - processAuthTokenConnect(ctx, id, JacksonUtil.fromString(result.replace("\\","\\\\"), TcpAuthEntry.class));  
363 - }  
364 -  
365 - @Override  
366 - public void onFailure(Throwable t) {  
367 - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);  
368 - }  
369 - }, MoreExecutors.directExecutor());  
370 -  
371 - });  
372 - }  
373 - }  
374 -  
375 - private void processAuthTokenConnect(ChannelHandlerContext ctx, UUID scriptId, TcpAuthEntry accessToken) {  
376 -  
377 - log.debug("[{}][{}] Processing connect msg for client with user name: {}!", address, sessionId, accessToken);  
378 - if (null != accessToken.getClientId()) {  
379 - }  
380 - if (null != accessToken.getUserName()) {  
381 - }  
382 - String token = accessToken.getPassword();  
383 - if(StringUtils.isNotEmpty(token)){  
384 - TransportProtos.ValidateDeviceTokenRequestMsg.Builder request = TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder();  
385 - request.setToken(token);  
386 - transportService.process(DeviceTransportType.TCP, request.build(),  
387 - new TransportServiceCallback<>() {  
388 - @Override  
389 - public void onSuccess(ValidateDeviceCredentialsResponse msg) {  
390 - onValidateDeviceResponse(msg, ctx, accessToken, scriptId);  
391 - }  
392 -  
393 - @Override  
394 - public void onError(Throwable e) {  
395 - log.trace("[{}] Failed to process credentials: {}", address, accessToken, e);  
396 - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);  
397 - }  
398 - });  
399 - }else{  
400 - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);  
401 - }  
402 - }  
403 -  
404 -  
405 -  
406 -  
407 - @Override  
408 - public void channelReadComplete(ChannelHandlerContext ctx) {  
409 - ctx.flush();  
410 - }  
411 -  
412 - @Override  
413 - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
414 - log.error("[{}] Unexpected Exception", sessionId, cause); 37 +public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>> {
  38 +
  39 + private final UUID sessionId;
  40 + private final TcpTransportContext context;
  41 + private final TransportService transportService;
  42 +
  43 + private TcpUdpDataHandler dataHandler = null;
  44 + /** 需要处理的消息队列,例如:需要下发给设备的,设备上传的。 */
  45 + final TcpDeviceSessionCtx deviceSessionCtx;
  46 +
  47 + volatile InetSocketAddress address;
  48 + volatile TcpGatewaySessionHandler gatewaySessionHandler;
  49 + TcpTransportHandler(TcpTransportContext context) {
  50 + this.sessionId = UUID.randomUUID();
  51 + this.context = context;
  52 + this.transportService = context.getTransportService();
  53 + this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context);
  54 + }
  55 +
  56 + @Override
  57 + public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  58 + super.channelRegistered(ctx);
  59 + context.channelRegistered();
  60 + }
  61 +
  62 + @Override
  63 + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  64 + super.channelUnregistered(ctx);
  65 + context.channelUnregistered();
  66 + }
  67 +
  68 + @Override
  69 + public void channelRead(ChannelHandlerContext ctx, Object msg) {
  70 + log.debug("【{}】 TCP channelRead Processing msg: 【{}】", sessionId, msg);
  71 + try {
  72 + if (!context.isReady()) {
415 ctx.close(); 73 ctx.close();
416 - if (cause instanceof OutOfMemoryError) {  
417 - log.error("Received critical error. Going to shutdown the service.");  
418 - System.exit(1);  
419 - }  
420 - }  
421 -  
422 - private static MqttSubAckMessage createSubAckMessage(Integer msgId, List<Integer> grantedQoSList) {  
423 - MqttFixedHeader mqttFixedHeader =  
424 - new MqttFixedHeader(SUBACK, false, AT_MOST_ONCE, false, 0);  
425 - MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);  
426 - MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantedQoSList);  
427 - return new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload);  
428 - }  
429 -  
430 -  
431 - public static MqttPubAckMessage createMqttPubAckMsg(int requestId) {  
432 - MqttFixedHeader mqttFixedHeader =  
433 - new MqttFixedHeader(PUBACK, false, AT_MOST_ONCE, false, 0);  
434 - MqttMessageIdVariableHeader mqttMsgIdVariableHeader =  
435 - MqttMessageIdVariableHeader.from(requestId);  
436 - return new MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader);  
437 - }  
438 -  
439 - private boolean checkConnected(ChannelHandlerContext ctx, String msg) {  
440 - if (deviceSessionCtx.isConnected()) {  
441 - return true;  
442 - } else {  
443 - log.info("[{}] Closing current session due to invalid msg order: {}", sessionId, msg);  
444 - return false;  
445 - }  
446 - }  
447 -  
448 - private void checkGatewaySession(SessionMetaData sessionMetaData) {  
449 - TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo();  
450 - try {  
451 - JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());  
452 - if (infoNode != null) {  
453 - JsonNode gatewayNode = infoNode.get("gateway");  
454 - if (gatewayNode != null && gatewayNode.asBoolean()) {  
455 - gatewaySessionHandler = new TcpGatewaySessionHandler(deviceSessionCtx, sessionId);  
456 - if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {  
457 - sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());  
458 - }  
459 - }  
460 - }  
461 - } catch (IOException e) {  
462 - log.trace("[{}][{}] Failed to fetch device additional info", sessionId, device.getDeviceName(), e);  
463 - }  
464 - }  
465 -  
466 - @Override  
467 - public void operationComplete(Future<? super Void> future) throws Exception {  
468 - log.trace("[{}] Channel closed!", sessionId);  
469 - doDisconnect();  
470 - }  
471 -  
472 - public void doDisconnect() {  
473 - if (deviceSessionCtx.isConnected()) {  
474 - log.debug("[{}] Client disconnected!", sessionId);  
475 - transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);  
476 - transportService.deregisterSession(deviceSessionCtx.getSessionInfo());  
477 - if (gatewaySessionHandler != null) {  
478 - gatewaySessionHandler.onGatewayDisconnect();  
479 - }  
480 - deviceSessionCtx.setDisconnected();  
481 - }  
482 - deviceSessionCtx.release();  
483 - }  
484 -  
485 -  
486 - private void onValidateDeviceResponse(ValidateDeviceCredentialsResponse msg, ChannelHandlerContext ctx, TcpAuthEntry authEntry, UUID scriptId) {  
487 - if (!msg.hasDeviceInfo()) {  
488 - context.onAuthFailure(address);  
489 - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);  
490 - } else {  
491 - DeviceProfile profile = msg.getDeviceProfile();  
492 - TkTcpDeviceProfileTransportConfiguration tcpConfig = (TkTcpDeviceProfileTransportConfiguration) profile.getProfileData().getTransportConfiguration();  
493 - if (scriptId != null&& tcpConfig.getProtocol().equals(ProtocolAnalysisEnum.CUSTOM) && !tcpConfig.getAuthScriptId().equals(scriptId.toString())) {  
494 - authedCounter.incrementAndGet();  
495 - return;  
496 - }  
497 - context.onAuthSuccess(address);  
498 - deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());  
499 - deviceSessionCtx.setDeviceProfile(profile);  
500 - deviceSessionCtx.setSessionInfo(SessionInfoCreator.create(msg, context, sessionId));  
501 - transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_OPEN, new TransportServiceCallback<Void>() {  
502 - @Override  
503 - public void onSuccess(Void msg) {  
504 - SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), TcpTransportHandler.this);  
505 - checkGatewaySession(sessionMetaData);  
506 - pushDeviceMsg(ctx,authEntry.getSuccess());  
507 - deviceSessionCtx.setConnected(true);  
508 - log.debug("[{}] Client connected!", sessionId);  
509 -  
510 - transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);  
511 - transportService.getCallbackExecutor().execute(() -> processQueueMessage(ctx)); //this callback will execute in Producer worker thread and hard or blocking work have to be submitted to the separate thread.  
512 - }  
513 -  
514 - @Override  
515 - public void onError(Throwable e) {  
516 - if (e instanceof TbRateLimitsException) {  
517 - log.trace("[{}] Failed to submit session event: {}", sessionId, e.getMessage());  
518 - } else {  
519 - log.warn("[{}] Failed to submit session event", sessionId, e);  
520 - }  
521 - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);  
522 - }  
523 - });  
524 - }  
525 - }  
526 -  
527 - private void onValidateFailed(ChannelHandlerContext ctx, MqttConnectReturnCode msg) {  
528 - authedCounter.incrementAndGet();  
529 - if (TkScriptInvokeService.authScripts.size() == authedCounter.intValue()) {  
530 - pushDeviceMsg(ctx,msg.name());  
531 -  
532 - ctx.close();  
533 - }  
534 - }  
535 -  
536 - @Override  
537 - public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {  
538 -  
539 - }  
540 -  
541 - @Override  
542 - public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {  
543 -  
544 - }  
545 -  
546 - @Override  
547 - public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {  
548 - log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());  
549 - deviceSessionCtx.getChannel().close();  
550 - }  
551 -  
552 - @Override  
553 - public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {  
554 - log.debug("【{}】下发RPC命令【{}】给设备【{}】", sessionId, rpcRequest.getParams(), deviceSessionCtx.getDeviceInfo().getDeviceName());  
555 - TcpTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();  
556 - try {  
557 - adaptor  
558 - .convertToPublish(deviceSessionCtx, rpcRequest)  
559 - .ifPresent(  
560 - payload -> {  
561 - deviceSessionCtx.rpcRequesting(payload.getIdentifier(), rpcRequest);  
562 - Optional.ofNullable(pushDeviceMsg(deviceSessionCtx.getChannel(), payload.getDatas())).ifPresent(cf->{  
563 - cf.addListener(  
564 - result -> {  
565 - if (result.cause() == null) {  
566 - transportService.process(  
567 - deviceSessionCtx.getSessionInfo(),  
568 - rpcRequest,  
569 - RpcStatus.DELIVERED,  
570 - TransportServiceCallback.EMPTY);  
571 - } else {  
572 - // TODO: send error  
573 - }  
574 - });  
575 - });  
576 - ;  
577 - });  
578 - } catch (Exception e) {  
579 - transportService.process(deviceSessionCtx.getSessionInfo(),  
580 - TransportProtos.ToDeviceRpcResponseMsg.newBuilder()  
581 - .setRequestId(rpcRequest.getRequestId()).setError("Failed to convert device RPC command to TCP msg").build(), TransportServiceCallback.EMPTY);  
582 - log.error("[{}] Failed to convert device RPC command to TCP msg", sessionId, e);  
583 - }  
584 - }  
585 -  
586 -  
587 - @Override  
588 - public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {  
589 - log.debug("[{}] 服务端响应设备的RPC请求", sessionId);  
590 - }  
591 -  
592 - /**  
593 - * 往设备推送消息  
594 - *  
595 - * @param message  
596 - * @return  
597 - */  
598 - private ChannelFuture pushDeviceMsg(ChannelHandlerContext ctx, String message) {  
599 - if(StringUtils.isBlank(message)){  
600 - return null;  
601 - }  
602 - ByteBuf buff = Unpooled.buffer();  
603 - if(!message.matches("-?[0-9a-fA-F]+")){  
604 - //不满足16进制将字符串转为16进制  
605 - message = ByteUtils.stringEncodeToHex(message);  
606 - }  
607 - buff.writeBytes(ByteUtils.hexToByteArray(message));  
608 - return ctx.writeAndFlush(buff);  
609 - }  
610 -  
611 -  
612 - @Override  
613 - public void onDeviceProfileUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {  
614 - deviceSessionCtx.onDeviceProfileUpdate(sessionInfo, deviceProfile);  
615 - }  
616 -  
617 - @Override  
618 - public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {  
619 - deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);  
620 - deviceSessionCtx.setDeviceCode(JacksonUtil.toString(device.getAdditionalInfo()));  
621 - }  
622 -  
623 - @Override  
624 - public void onDeviceDeleted(DeviceId deviceId) {  
625 - context.onAuthFailure(address);  
626 - ChannelHandlerContext ctx = deviceSessionCtx.getChannel(); 74 + return;
  75 + }
  76 + if (address == null) {
  77 + address = getAddress(ctx);
  78 + }
  79 + if (msg instanceof ByteBuf) {
  80 + ByteBuf message = (ByteBuf) msg;
  81 + byte[] byteMsg = ByteBufUtils.buf2Bytes(message);
  82 + String msgStr = ByteUtils.bytesToStr(byteMsg);
  83 + log.debug(
  84 + "TCP服务【{}】收到设备【{}】来自【{}】数据【{}】",
  85 + sessionId,
  86 + deviceSessionCtx.getDeviceId(),
  87 + address,
  88 + msgStr);
  89 + if(null == dataHandler){
  90 + dataHandler = new TcpUdpDataHandler(context,sessionId,deviceSessionCtx,address,gatewaySessionHandler,true);
  91 + }
  92 + dataHandler.enqueueRegularSessionMsg(ctx, msgStr);
  93 + } else {
  94 + log.debug("【{}】 Received non tcp message: 【{}】", sessionId, msg.getClass().getSimpleName());
627 ctx.close(); 95 ctx.close();
628 - } 96 + }
  97 + } finally {
  98 + ReferenceCountUtil.safeRelease(msg);
  99 + }
  100 + }
  101 +
  102 + InetSocketAddress getAddress(ChannelHandlerContext ctx) {
  103 + var address = ctx.channel().attr(TcpTransportService.ADDRESS).get();
  104 + if (address == null) {
  105 + log.trace("[{}] Received empty address.", ctx.channel().id());
  106 + InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
  107 + log.trace("[{}] Going to use address: {}", ctx.channel().id(), remoteAddress);
  108 + return remoteAddress;
  109 + } else {
  110 + log.trace("[{}] Received address: {}", ctx.channel().id(), address);
  111 + }
  112 + return address;
  113 + }
  114 +
  115 +
  116 + @Override
  117 + public void channelReadComplete(ChannelHandlerContext ctx) {
  118 + ctx.flush();
  119 + }
  120 +
  121 + @Override
  122 + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  123 + log.error("[{}] Unexpected Exception", sessionId, cause);
  124 + ctx.close();
  125 + if (cause instanceof OutOfMemoryError) {
  126 + log.error("Received critical error. Going to shutdown the service.");
  127 + System.exit(1);
  128 + }
  129 + }
  130 +
  131 + @Override
  132 + public void operationComplete(Future<? super Void> future) throws Exception {
  133 + log.trace("[{}] Channel closed!", sessionId);
  134 + doDisconnect();
  135 + }
  136 +
  137 + public void doDisconnect() {
  138 + if (deviceSessionCtx.isConnected()) {
  139 + log.debug("[{}] Client disconnected!", sessionId);
  140 + transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
  141 + transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
  142 + if (gatewaySessionHandler != null) {
  143 + gatewaySessionHandler.onGatewayDisconnect();
  144 + }
  145 + deviceSessionCtx.setDisconnected();
  146 + }
  147 + deviceSessionCtx.release();
  148 + }
629 } 149 }
@@ -57,7 +57,7 @@ public class TcpTransportServerInitializer extends ChannelInitializer<SocketChan @@ -57,7 +57,7 @@ public class TcpTransportServerInitializer extends ChannelInitializer<SocketChan
57 // pipeline.addLast("decoder", new StringDecoder()); 57 // pipeline.addLast("decoder", new StringDecoder());
58 // pipeline.addLast("encoder", new StringEncoder()); 58 // pipeline.addLast("encoder", new StringEncoder());
59 59
60 - TcpTransportHandler handler = new TcpTransportHandler(context, sslHandler); 60 + TcpTransportHandler handler = new TcpTransportHandler(context);
61 61
62 pipeline.addLast(handler); 62 pipeline.addLast(handler);
63 ch.closeFuture().addListener(handler); 63 ch.closeFuture().addListener(handler);
common/transport/tcp/src/main/java/org/thingsboard/server/transport/tcp/TcpUdpDataHandler.java renamed from common/transport/tcp/src/main/java/org/thingsboard/server/transport/udp/UdpDatagramDataHandler.java
1 -/**  
2 - * Copyright © 2016-2022 The Thingsboard Authors  
3 - *  
4 - * <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file  
5 - * except in compliance with the License. You may obtain a copy of the License at  
6 - *  
7 - * <p>http://www.apache.org/licenses/LICENSE-2.0  
8 - *  
9 - * <p>Unless required by applicable law or agreed to in writing, software distributed under the  
10 - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either  
11 - * express or implied. See the License for the specific language governing permissions and  
12 - * limitations under the License.  
13 - */  
14 -package org.thingsboard.server.transport.udp;  
15 -  
16 -import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;  
17 -import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN; 1 +package org.thingsboard.server.transport.tcp;
18 2
19 import com.fasterxml.jackson.databind.JsonNode; 3 import com.fasterxml.jackson.databind.JsonNode;
20 import com.google.common.util.concurrent.FutureCallback; 4 import com.google.common.util.concurrent.FutureCallback;
@@ -27,21 +11,12 @@ import io.netty.buffer.Unpooled; @@ -27,21 +11,12 @@ import io.netty.buffer.Unpooled;
27 import io.netty.channel.ChannelFuture; 11 import io.netty.channel.ChannelFuture;
28 import io.netty.channel.ChannelHandlerContext; 12 import io.netty.channel.ChannelHandlerContext;
29 import io.netty.channel.socket.DatagramPacket; 13 import io.netty.channel.socket.DatagramPacket;
30 -import io.netty.handler.codec.mqtt.*;  
31 -import io.netty.util.concurrent.Future;  
32 -import io.netty.util.concurrent.GenericFutureListener;  
33 -import java.io.IOException;  
34 -import java.net.InetSocketAddress;  
35 -import java.util.Map;  
36 -import java.util.Optional;  
37 -import java.util.UUID;  
38 -import java.util.concurrent.atomic.AtomicInteger; 14 +import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
39 import lombok.extern.slf4j.Slf4j; 15 import lombok.extern.slf4j.Slf4j;
40 import org.apache.commons.lang3.StringUtils; 16 import org.apache.commons.lang3.StringUtils;
41 import org.checkerframework.checker.nullness.qual.Nullable; 17 import org.checkerframework.checker.nullness.qual.Nullable;
42 import org.thingsboard.common.util.JacksonUtil; 18 import org.thingsboard.common.util.JacksonUtil;
43 import org.thingsboard.server.common.data.DataConstants; 19 import org.thingsboard.server.common.data.DataConstants;
44 -import org.thingsboard.server.common.data.Device;  
45 import org.thingsboard.server.common.data.DeviceProfile; 20 import org.thingsboard.server.common.data.DeviceProfile;
46 import org.thingsboard.server.common.data.DeviceTransportType; 21 import org.thingsboard.server.common.data.DeviceTransportType;
47 import org.thingsboard.server.common.data.device.profile.TkTcpDeviceProfileTransportConfiguration; 22 import org.thingsboard.server.common.data.device.profile.TkTcpDeviceProfileTransportConfiguration;
@@ -49,6 +24,7 @@ import org.thingsboard.server.common.data.id.DeviceId; @@ -49,6 +24,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
49 import org.thingsboard.server.common.data.rpc.RpcStatus; 24 import org.thingsboard.server.common.data.rpc.RpcStatus;
50 import org.thingsboard.server.common.data.yunteng.enums.ProtocolAnalysisEnum; 25 import org.thingsboard.server.common.data.yunteng.enums.ProtocolAnalysisEnum;
51 import org.thingsboard.server.common.data.yunteng.utils.ByteUtils; 26 import org.thingsboard.server.common.data.yunteng.utils.ByteUtils;
  27 +import org.thingsboard.server.common.data.yunteng.utils.ModbusUtils;
52 import org.thingsboard.server.common.msg.tools.TbRateLimitsException; 28 import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
53 import org.thingsboard.server.common.transport.SessionMsgListener; 29 import org.thingsboard.server.common.transport.SessionMsgListener;
54 import org.thingsboard.server.common.transport.TransportService; 30 import org.thingsboard.server.common.transport.TransportService;
@@ -60,7 +36,6 @@ import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsRes @@ -60,7 +36,6 @@ import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsRes
60 import org.thingsboard.server.common.transport.service.DefaultTransportService; 36 import org.thingsboard.server.common.transport.service.DefaultTransportService;
61 import org.thingsboard.server.common.transport.service.SessionMetaData; 37 import org.thingsboard.server.common.transport.service.SessionMetaData;
62 import org.thingsboard.server.gen.transport.TransportProtos; 38 import org.thingsboard.server.gen.transport.TransportProtos;
63 -import org.thingsboard.server.transport.tcp.TcpTransportContext;  
64 import org.thingsboard.server.transport.tcp.adaptors.TcpAuthEntry; 39 import org.thingsboard.server.transport.tcp.adaptors.TcpAuthEntry;
65 import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor; 40 import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;
66 import org.thingsboard.server.transport.tcp.adaptors.TcpUpEntry; 41 import org.thingsboard.server.transport.tcp.adaptors.TcpUpEntry;
@@ -69,36 +44,59 @@ import org.thingsboard.server.transport.tcp.script.TkScriptInvokeService; @@ -69,36 +44,59 @@ import org.thingsboard.server.transport.tcp.script.TkScriptInvokeService;
69 import org.thingsboard.server.transport.tcp.session.TcpDeviceSessionCtx; 44 import org.thingsboard.server.transport.tcp.session.TcpDeviceSessionCtx;
70 import org.thingsboard.server.transport.tcp.session.TcpGatewaySessionHandler; 45 import org.thingsboard.server.transport.tcp.session.TcpGatewaySessionHandler;
71 46
72 -/**  
73 - * @author Andrew Shvayka  
74 - */  
75 -@Slf4j  
76 -public class UdpDatagramDataHandler  
77 - implements GenericFutureListener<Future<? super Void>>, SessionMsgListener { 47 +import java.io.IOException;
  48 +import java.net.InetSocketAddress;
  49 +import java.util.Map;
  50 +import java.util.Optional;
  51 +import java.util.UUID;
  52 +import java.util.concurrent.atomic.AtomicInteger;
  53 +
  54 +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
  55 +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
78 56
  57 +@Slf4j
  58 +public class TcpUdpDataHandler implements SessionMsgListener {
79 private final UUID sessionId; 59 private final UUID sessionId;
80 private final TcpTransportContext context; 60 private final TcpTransportContext context;
81 private final TransportService transportService; 61 private final TransportService transportService;
82 62
83 - /** 需要处理的消息队列,例如:需要下发给设备的,设备上传的。 */ 63 + private final boolean isTcp;
84 final TcpDeviceSessionCtx deviceSessionCtx; 64 final TcpDeviceSessionCtx deviceSessionCtx;
85 -  
86 volatile InetSocketAddress address; 65 volatile InetSocketAddress address;
87 -  
88 volatile TcpGatewaySessionHandler gatewaySessionHandler; 66 volatile TcpGatewaySessionHandler gatewaySessionHandler;
89 private final AtomicInteger authedCounter = new AtomicInteger(); 67 private final AtomicInteger authedCounter = new AtomicInteger();
90 68
91 - UdpDatagramDataHandler(  
92 - ChannelHandlerContext ctx, TcpTransportContext context, InetSocketAddress devAddress) { 69 + volatile UdpTransportHandler udpTransportHandler;
  70 +
  71 + public TcpUdpDataHandler(UdpTransportHandler udpTransportHandler,TcpTransportContext context, InetSocketAddress address, boolean isTcp) {
93 super(); 72 super();
94 this.sessionId = UUID.randomUUID(); 73 this.sessionId = UUID.randomUUID();
95 this.context = context; 74 this.context = context;
96 - this.address = devAddress; 75 + this.address = address;
97 this.transportService = context.getTransportService(); 76 this.transportService = context.getTransportService();
98 this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context); 77 this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context);
  78 + this.isTcp = isTcp;
  79 + this.udpTransportHandler = udpTransportHandler;
99 } 80 }
100 81
101 - public void dealDeviceMsg(ChannelHandlerContext ctx, String msg) { 82 + TcpUdpDataHandler(
  83 + TcpTransportContext context,
  84 + UUID sessionId,
  85 + TcpDeviceSessionCtx deviceSessionCtx,
  86 + InetSocketAddress address,
  87 + TcpGatewaySessionHandler gatewaySessionHandler,
  88 + boolean isTcp) {
  89 + super();
  90 + this.sessionId = sessionId;
  91 + this.context = context;
  92 + this.address = address;
  93 + this.transportService = context.getTransportService();
  94 + this.deviceSessionCtx = deviceSessionCtx;
  95 + this.gatewaySessionHandler = gatewaySessionHandler;
  96 + this.isTcp = isTcp;
  97 + }
  98 +
  99 + public void enqueueRegularSessionMsg(ChannelHandlerContext ctx, String msg) {
102 deviceSessionCtx.setChannel(ctx); 100 deviceSessionCtx.setChannel(ctx);
103 if (deviceSessionCtx.getDeviceInfo() == null || deviceSessionCtx.getDeviceProfile() == null) { 101 if (deviceSessionCtx.getDeviceInfo() == null || deviceSessionCtx.getDeviceProfile() == null) {
104 processConnect(ctx, msg); 102 processConnect(ctx, msg);
@@ -112,137 +110,15 @@ public class UdpDatagramDataHandler @@ -112,137 +110,15 @@ public class UdpDatagramDataHandler
112 context.getMessageQueueSizePerDeviceLimit(), 110 context.getMessageQueueSizePerDeviceLimit(),
113 queueSize, 111 queueSize,
114 deviceSessionCtx.getMsgQueueSize()); 112 deviceSessionCtx.getMsgQueueSize());
115 - // ctx.close(); 113 + if (isTcp) {
  114 + ctx.close();
  115 + }
116 return; 116 return;
117 } 117 }
118 deviceSessionCtx.addToQueue(msg); 118 deviceSessionCtx.addToQueue(msg);
119 processQueueMessage( 119 processQueueMessage(
120 ctx); // Under the normal conditions the msg queue will contain 0 messages. Many messages 120 ctx); // Under the normal conditions the msg queue will contain 0 messages. Many messages
121 - // will be processed on device connect event in separate thread pool  
122 - }  
123 -  
124 - void processQueueMessage(ChannelHandlerContext ctx) {  
125 - if (!deviceSessionCtx.isConnected()) {  
126 - log.trace(  
127 - "[{}][{}] Postpone processing msg due to device is not connected. Msg queue size is {}",  
128 - sessionId,  
129 - deviceSessionCtx.getDeviceId(),  
130 - deviceSessionCtx.getMsgQueueSize());  
131 - return;  
132 - }  
133 - deviceSessionCtx.tryProcessQueuedMsgs(msg -> processDeviceSessionMsg(ctx, msg));  
134 - }  
135 -  
136 - private void processDeviceSessionMsg(ChannelHandlerContext ctx, String tcpMessage) {  
137 - if (!checkConnected(ctx, tcpMessage)) {  
138 - return;  
139 - }  
140 - deviceSessionCtx.doUpScript(  
141 - tcpMessage,  
142 - r -> {  
143 - if (gatewaySessionHandler != null) {  
144 - processGatewayDeviceMsg(ctx, r);  
145 - }  
146 - processDirectDeviceMsg(ctx, r);  
147 - });  
148 - }  
149 -  
150 - /**  
151 - * 上行脚本解析结果是否包含数据  
152 - *  
153 - * @param datas 数据集合  
154 - * @return  
155 - */  
156 - private boolean hasDatas(Map<String, Object> datas) {  
157 - if (datas == null || datas.isEmpty()) {  
158 - return false;  
159 - }  
160 - return true;  
161 - }  
162 -  
163 - private void processGatewayDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {  
164 - log.trace(  
165 - "[{}][{}] Processing publish msg [{}]!",  
166 - sessionId,  
167 - deviceSessionCtx.getDeviceId(),  
168 - tcpMessage);  
169 - try {  
170 - Map<String, Object> datas = tcpMessage.getDatas();  
171 - if (hasDatas(datas)) {  
172 - datas.forEach(  
173 - (devName, param) -> {  
174 - if (TkScriptFactory.ORIGINAL_DATA_FILED.equals(devName)) {  
175 - return;  
176 - }  
177 - if (tcpMessage.getTelemetry()) {  
178 - gatewaySessionHandler.onDeviceTelemetry(  
179 - devName, tcpMessage.getRequestId(), param.toString(), ProtocolAnalysisEnum.CUSTOM);  
180 - } else {  
181 - // gatewaySessionHandler.onDeviceRpcResponse(devName,  
182 - // tcpMessage.getRequestId(), param.toString());  
183 - }  
184 - });  
185 - } else {  
186 - transportService.reportActivity(deviceSessionCtx.getSessionInfo());  
187 - pushDeviceMsg(ctx, tcpMessage.getAckMsg());  
188 - }  
189 - } catch (Exception e) {  
190 - log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);  
191 - // ctx.close();  
192 - }  
193 - }  
194 -  
195 - private void processDirectDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {  
196 - log.trace(  
197 - "[{}][{}] Processing publish msg [{}]!",  
198 - sessionId,  
199 - deviceSessionCtx.getDeviceId(),  
200 - tcpMessage);  
201 - try {  
202 - TcpTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();  
203 - Map<String, Object> datas = tcpMessage.getDatas();  
204 - if (hasDatas(datas)) {  
205 - String dataStr = JacksonUtil.toString(datas);  
206 - if (tcpMessage.getTelemetry()) {  
207 - TransportProtos.PostTelemetryMsg postTelemetryMsg =  
208 - payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, dataStr);  
209 - transportService.process(  
210 - deviceSessionCtx.getSessionInfo(),  
211 - postTelemetryMsg,  
212 - getPubAckCallback(ctx, tcpMessage));  
213 - } else {  
214 - TransportProtos.ToDeviceRpcResponseMsg postRpcMsg =  
215 - payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, tcpMessage);  
216 - transportService.process(  
217 - deviceSessionCtx.getSessionInfo(), postRpcMsg, getPubAckCallback(ctx, tcpMessage));  
218 - }  
219 - } else {  
220 - transportService.reportActivity(deviceSessionCtx.getSessionInfo());  
221 - pushDeviceMsg(ctx, tcpMessage.getAckMsg());  
222 - }  
223 - } catch (AdaptorException e) {  
224 - log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);  
225 - // ctx.close();  
226 - }  
227 - }  
228 -  
229 - private <T> TransportServiceCallback<Void> getPubAckCallback(  
230 - final ChannelHandlerContext ctx, final TcpUpEntry msg) {  
231 - return new TransportServiceCallback<>() {  
232 - @Override  
233 - public void onSuccess(Void dummy) {  
234 - log.trace("[{}] Published msg: {}", sessionId, msg);  
235 - if (StringUtils.isNotEmpty(msg.getAckMsg())) {  
236 - pushDeviceMsg(ctx, msg.getAckMsg());  
237 - }  
238 - }  
239 -  
240 - @Override  
241 - public void onError(Throwable e) {  
242 - log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);  
243 - // ctx.close();  
244 - }  
245 - }; 121 + // will be processed on device connect event in separate thread pool
246 } 122 }
247 123
248 void processConnect(ChannelHandlerContext ctx, String accessToken) { 124 void processConnect(ChannelHandlerContext ctx, String accessToken) {
@@ -251,7 +127,7 @@ public class UdpDatagramDataHandler @@ -251,7 +127,7 @@ public class UdpDatagramDataHandler
251 if (DataConstants.PROVISION.equals(accessToken) 127 if (DataConstants.PROVISION.equals(accessToken)
252 || DataConstants.PROVISION.equals(accessToken)) { 128 || DataConstants.PROVISION.equals(accessToken)) {
253 deviceSessionCtx.setProvisionOnly(true); 129 deviceSessionCtx.setProvisionOnly(true);
254 - pushDeviceMsg(ctx, "CONNECTION_ACCEPTED"); 130 + pushDeviceMsg(ctx, CONNECTION_ACCEPTED.name());
255 } else { 131 } else {
256 TkScriptInvokeService.authScripts.forEach( 132 TkScriptInvokeService.authScripts.forEach(
257 id -> { 133 id -> {
@@ -278,6 +154,18 @@ public class UdpDatagramDataHandler @@ -278,6 +154,18 @@ public class UdpDatagramDataHandler
278 } 154 }
279 } 155 }
280 156
  157 + void processQueueMessage(ChannelHandlerContext ctx) {
  158 + if (!deviceSessionCtx.isConnected()) {
  159 + log.trace(
  160 + "[{}][{}] Postpone processing msg due to device is not connected. Msg queue size is {}",
  161 + sessionId,
  162 + deviceSessionCtx.getDeviceId(),
  163 + deviceSessionCtx.getMsgQueueSize());
  164 + return;
  165 + }
  166 + deviceSessionCtx.tryProcessQueuedMsgs(msg -> processDeviceSessionMsg(ctx, msg));
  167 + }
  168 +
281 private void processAuthTokenConnect( 169 private void processAuthTokenConnect(
282 ChannelHandlerContext ctx, UUID scriptId, TcpAuthEntry accessToken) { 170 ChannelHandlerContext ctx, UUID scriptId, TcpAuthEntry accessToken) {
283 171
@@ -313,53 +201,14 @@ public class UdpDatagramDataHandler @@ -313,53 +201,14 @@ public class UdpDatagramDataHandler
313 } 201 }
314 } 202 }
315 203
316 - private boolean checkConnected(ChannelHandlerContext ctx, String msg) {  
317 - if (deviceSessionCtx.isConnected()) {  
318 - return true;  
319 - } else {  
320 - log.info("[{}] Closing current session due to invalid msg order: {}", sessionId, msg);  
321 - return false;  
322 - }  
323 - }  
324 -  
325 - private void checkGatewaySession(SessionMetaData sessionMetaData) {  
326 - TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo();  
327 - try {  
328 - JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());  
329 - if (infoNode != null) {  
330 - JsonNode gatewayNode = infoNode.get("gateway");  
331 - if (gatewayNode != null && gatewayNode.asBoolean()) {  
332 - gatewaySessionHandler = new TcpGatewaySessionHandler(deviceSessionCtx, sessionId);  
333 - if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME)  
334 - && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {  
335 - sessionMetaData.setOverwriteActivityTime(  
336 - infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());  
337 - }  
338 - }  
339 - }  
340 - } catch (IOException e) {  
341 - log.trace(  
342 - "[{}][{}] Failed to fetch device additional info", sessionId, device.getDeviceName(), e);  
343 - }  
344 - }  
345 -  
346 - @Override  
347 - public void operationComplete(Future<? super Void> future) throws Exception {  
348 - log.trace("[{}] Channel closed!", sessionId);  
349 - doDisconnect();  
350 - }  
351 -  
352 - public void doDisconnect() {  
353 - if (deviceSessionCtx.isConnected()) {  
354 - log.debug("[{}] Client disconnected!", sessionId);  
355 - transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);  
356 - transportService.deregisterSession(deviceSessionCtx.getSessionInfo());  
357 - if (gatewaySessionHandler != null) {  
358 - gatewaySessionHandler.onGatewayDisconnect(); 204 + private void onValidateFailed(ChannelHandlerContext ctx, MqttConnectReturnCode msg) {
  205 + authedCounter.incrementAndGet();
  206 + if (TkScriptInvokeService.authScripts.size() == authedCounter.intValue()) {
  207 + pushDeviceMsg(ctx, msg.name());
  208 + if (isTcp) {
  209 + ctx.close();
359 } 210 }
360 - deviceSessionCtx.setDisconnected();  
361 } 211 }
362 - deviceSessionCtx.release();  
363 } 212 }
364 213
365 private void onValidateDeviceResponse( 214 private void onValidateDeviceResponse(
@@ -375,7 +224,9 @@ public class UdpDatagramDataHandler @@ -375,7 +224,9 @@ public class UdpDatagramDataHandler
375 TkTcpDeviceProfileTransportConfiguration tcpConfig = 224 TkTcpDeviceProfileTransportConfiguration tcpConfig =
376 (TkTcpDeviceProfileTransportConfiguration) 225 (TkTcpDeviceProfileTransportConfiguration)
377 profile.getProfileData().getTransportConfiguration(); 226 profile.getProfileData().getTransportConfiguration();
378 - if (scriptId != null && !tcpConfig.getAuthScriptId().equals(scriptId.toString())) { 227 + if (scriptId != null
  228 + && tcpConfig.getProtocol().equals(ProtocolAnalysisEnum.CUSTOM)
  229 + && !tcpConfig.getAuthScriptId().equals(scriptId.toString())) {
379 authedCounter.incrementAndGet(); 230 authedCounter.incrementAndGet();
380 return; 231 return;
381 } 232 }
@@ -391,7 +242,7 @@ public class UdpDatagramDataHandler @@ -391,7 +242,7 @@ public class UdpDatagramDataHandler
391 public void onSuccess(Void msg) { 242 public void onSuccess(Void msg) {
392 SessionMetaData sessionMetaData = 243 SessionMetaData sessionMetaData =
393 transportService.registerAsyncSession( 244 transportService.registerAsyncSession(
394 - deviceSessionCtx.getSessionInfo(), UdpDatagramDataHandler.this); 245 + deviceSessionCtx.getSessionInfo(), TcpUdpDataHandler.this);
395 checkGatewaySession(sessionMetaData); 246 checkGatewaySession(sessionMetaData);
396 pushDeviceMsg(ctx, authEntry.getSuccess()); 247 pushDeviceMsg(ctx, authEntry.getSuccess());
397 deviceSessionCtx.setConnected(true); 248 deviceSessionCtx.setConnected(true);
@@ -407,8 +258,8 @@ public class UdpDatagramDataHandler @@ -407,8 +258,8 @@ public class UdpDatagramDataHandler
407 () -> 258 () ->
408 processQueueMessage( 259 processQueueMessage(
409 ctx)); // this callback will execute in Producer worker thread and 260 ctx)); // this callback will execute in Producer worker thread and
410 - // hard or blocking work have to be submitted to the separate  
411 - // thread. 261 + // hard or blocking work have to be submitted to the separate
  262 + // thread.
412 } 263 }
413 264
414 @Override 265 @Override
@@ -424,30 +275,236 @@ public class UdpDatagramDataHandler @@ -424,30 +275,236 @@ public class UdpDatagramDataHandler
424 } 275 }
425 } 276 }
426 277
427 - private void onValidateFailed(ChannelHandlerContext ctx, MqttConnectReturnCode msg) {  
428 - authedCounter.incrementAndGet();  
429 - if (TkScriptInvokeService.authScripts.size() == authedCounter.intValue()) {  
430 - pushDeviceMsg(ctx, msg.name()); 278 + private void checkGatewaySession(SessionMetaData sessionMetaData) {
  279 + TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo();
  280 + try {
  281 + JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
  282 + if (infoNode != null) {
  283 + JsonNode gatewayNode = infoNode.get("gateway");
  284 + if (gatewayNode != null && gatewayNode.asBoolean()) {
  285 + gatewaySessionHandler = new TcpGatewaySessionHandler(deviceSessionCtx, sessionId);
  286 + if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME)
  287 + && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {
  288 + sessionMetaData.setOverwriteActivityTime(
  289 + infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());
  290 + }
  291 + }
  292 + }
  293 + } catch (IOException e) {
  294 + log.trace(
  295 + "[{}][{}] Failed to fetch device additional info", sessionId, device.getDeviceName(), e);
  296 + }
  297 + }
  298 +
  299 + private void processDeviceSessionMsg(ChannelHandlerContext ctx, String tcpMessage) {
  300 + if (!checkConnected(tcpMessage)) {
  301 + return;
  302 + }
  303 + // 判断协议类型
  304 + TkTcpDeviceProfileTransportConfiguration transportConfiguration =
  305 + (TkTcpDeviceProfileTransportConfiguration)
  306 + deviceSessionCtx.getDeviceProfile().getProfileData().getTransportConfiguration();
  307 + switch (transportConfiguration.getProtocol()) {
  308 + case CUSTOM:
  309 + customScriptProcess(ctx, tcpMessage);
  310 + break;
  311 + case MODBUS_RTU:
  312 + modbusRtuProcess(ctx, tcpMessage);
  313 + break;
  314 + }
  315 + }
  316 +
  317 + private void modbusRtuProcess(ChannelHandlerContext ctx, String tcpMessage) {
  318 + // 移除空格
  319 + String hexString = tcpMessage.trim().replaceAll(" ", "");
  320 + // 判断是否为16进制HEX
  321 + if (hexString.matches("-?[0-9a-fA-F]+")) {
  322 + boolean modbusCheckResult = ModbusUtils.isValidModbusResponseFrame(hexString);
  323 + // 判断是否满足modbus标准,满足的才处理不满足的过滤掉
  324 + if (modbusCheckResult) {
  325 + // 根据上报的地址码,判断该条消息归属于那个设备的数据
  326 + String deviceAddress = hexString.substring(0, 2);
  327 + int deviceCode = Integer.parseInt(deviceSessionCtx.getDeviceCode(), 16);
  328 + int deviceAddressCode = Integer.parseInt(deviceAddress, 16);
  329 + if (gatewaySessionHandler != null && deviceCode != deviceAddressCode) {
  330 + gatewaySessionHandler.onDeviceTelemetry(
  331 + deviceAddress, null, hexString, ProtocolAnalysisEnum.MODBUS_RTU);
  332 + } else {
  333 + if (deviceCode == deviceAddressCode) {
  334 + processCustomDirectDeviceMsg(
  335 + ctx,
  336 + deviceSessionCtx
  337 + .getPayloadAdaptor()
  338 + .convertModbusHexToPublish(deviceSessionCtx, hexString)
  339 + .get());
  340 + }
  341 + }
  342 + }
  343 + }
  344 + }
  345 +
  346 + private void customScriptProcess(ChannelHandlerContext ctx, String tcpMessage) {
  347 + deviceSessionCtx.doUpScript(
  348 + tcpMessage,
  349 + r -> {
  350 + // 根据网关上报的消息,判断消息的来源是否为网关子设备,判断依据deviceCode即设备地址码或设备标识符
  351 + if (gatewaySessionHandler != null && checkMessageIsFromSensor(r.getDatas())) {
  352 + processCustomGatewayDeviceMsg(ctx, r);
  353 + } else {
  354 + processCustomDirectDeviceMsg(ctx, r);
  355 + }
  356 + });
  357 + }
  358 +
  359 + private boolean checkMessageIsFromSensor(Map<String, Object> dataMap) {
  360 + boolean isSensorMessage = true;
  361 + String gateWayDeviceCode = deviceSessionCtx.getDeviceCode();
  362 + for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
  363 + if (entry.getKey().equals(gateWayDeviceCode)) {
  364 + isSensorMessage = false;
  365 + break;
  366 + }
  367 + }
  368 + return isSensorMessage;
  369 + }
  370 +
  371 + private boolean checkConnected(String msg) {
  372 + if (deviceSessionCtx.isConnected()) {
  373 + return true;
  374 + } else {
  375 + log.info("[{}] Closing current session due to invalid msg order: {}", sessionId, msg);
  376 + return false;
  377 + }
  378 + }
  379 +
  380 + private void processCustomGatewayDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {
  381 + log.trace(
  382 + "[{}][{}] Processing publish msg [{}]!",
  383 + sessionId,
  384 + deviceSessionCtx.getDeviceId(),
  385 + tcpMessage);
  386 + try {
  387 + Map<String, Object> datas = tcpMessage.getDatas();
  388 + if (hasDatas(datas)) {
  389 + datas.forEach(
  390 + (devName, param) -> {
  391 + if (TkScriptFactory.ORIGINAL_DATA_FILED.equals(devName)) {
  392 + return;
  393 + }
  394 + if (tcpMessage.getTelemetry()) {
  395 + gatewaySessionHandler.onDeviceTelemetry(
  396 + devName,
  397 + tcpMessage.getRequestId(),
  398 + param.toString(),
  399 + ProtocolAnalysisEnum.CUSTOM);
  400 + } else {
  401 + // gatewaySessionHandler.onDeviceRpcResponse(devName,
  402 + // tcpMessage.getRequestId(), param.toString());
  403 + }
  404 + });
  405 + } else {
  406 + transportService.reportActivity(deviceSessionCtx.getSessionInfo());
  407 + pushDeviceMsg(ctx, tcpMessage.getAckMsg());
  408 + }
  409 + } catch (Exception e) {
  410 + log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
  411 + if (isTcp) {
  412 + ctx.close();
  413 + }
  414 + }
  415 + }
431 416
432 - // ctx.close(); 417 + private void processCustomDirectDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {
  418 + log.trace(
  419 + "[{}][{}] Processing publish msg [{}]!",
  420 + sessionId,
  421 + deviceSessionCtx.getDeviceId(),
  422 + tcpMessage);
  423 + try {
  424 + TcpTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
  425 + Map<String, Object> datas = tcpMessage.getDatas();
  426 + if (hasDatas(datas)) {
  427 + String dataStr = JacksonUtil.toString(datas);
  428 + if (tcpMessage.getTelemetry()) {
  429 + TransportProtos.PostTelemetryMsg postTelemetryMsg =
  430 + payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, dataStr);
  431 + transportService.process(
  432 + deviceSessionCtx.getSessionInfo(),
  433 + postTelemetryMsg,
  434 + getPubAckCallback(ctx, tcpMessage));
  435 + } else {
  436 + TransportProtos.ToDeviceRpcResponseMsg postRpcMsg =
  437 + payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, tcpMessage);
  438 + transportService.process(
  439 + deviceSessionCtx.getSessionInfo(), postRpcMsg, getPubAckCallback(ctx, tcpMessage));
  440 + }
  441 + } else {
  442 + transportService.reportActivity(deviceSessionCtx.getSessionInfo());
  443 + pushDeviceMsg(ctx, tcpMessage.getAckMsg());
  444 + }
  445 + } catch (AdaptorException e) {
  446 + log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
  447 + if (isTcp) {
  448 + ctx.close();
  449 + }
433 } 450 }
434 } 451 }
435 452
  453 + private <T> TransportServiceCallback<Void> getPubAckCallback(
  454 + final ChannelHandlerContext ctx, final TcpUpEntry msg) {
  455 + return new TransportServiceCallback<>() {
  456 + @Override
  457 + public void onSuccess(Void dummy) {
  458 + log.trace("[{}] Published msg: {}", sessionId, msg);
  459 + if (StringUtils.isNotEmpty(msg.getAckMsg())) {
  460 + pushDeviceMsg(ctx, msg.getAckMsg());
  461 + }
  462 + }
  463 +
  464 + @Override
  465 + public void onError(Throwable e) {
  466 + log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
  467 + if (isTcp) {
  468 + ctx.close();
  469 + }
  470 + }
  471 + };
  472 + }
  473 +
  474 + /**
  475 + * 上行脚本解析结果是否包含数据
  476 + *
  477 + * @param datas 数据集合
  478 + * @return
  479 + */
  480 + private boolean hasDatas(Map<String, Object> datas) {
  481 + if (datas == null || datas.isEmpty()) {
  482 + return false;
  483 + }
  484 + return true;
  485 + }
  486 +
436 @Override 487 @Override
437 - public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {} 488 + public void onGetAttributesResponse(
  489 + TransportProtos.GetAttributeResponseMsg getAttributesResponse) {}
438 490
439 @Override 491 @Override
440 public void onAttributeUpdate( 492 public void onAttributeUpdate(
441 - UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {} 493 + UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg attributeUpdateNotification) {}
442 494
443 @Override 495 @Override
444 public void onRemoteSessionCloseCommand( 496 public void onRemoteSessionCloseCommand(
445 UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) { 497 UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
446 - log.trace(  
447 - "[{}] Received the remote command to close the session: {}", 498 + log.info(
  499 + "isTcp device[{}] ,[{}] Received the remote command to close the session: {}",isTcp,
448 sessionId, 500 sessionId,
449 sessionCloseNotification.getMessage()); 501 sessionCloseNotification.getMessage());
450 - // deviceSessionCtx.getChannel().close(); 502 + if(isTcp){
  503 + deviceSessionCtx.getChannel().close();
  504 + }else{
  505 + //UDP只能移除缓存
  506 + udpTransportHandler.remove(address);
  507 + }
451 } 508 }
452 509
453 @Override 510 @Override
@@ -497,14 +554,24 @@ public class UdpDatagramDataHandler @@ -497,14 +554,24 @@ public class UdpDatagramDataHandler
497 } 554 }
498 555
499 @Override 556 @Override
500 - public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) { 557 + public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) {
501 log.debug("[{}] 服务端响应设备的RPC请求", sessionId); 558 log.debug("[{}] 服务端响应设备的RPC请求", sessionId);
502 } 559 }
503 560
  561 + @Override
  562 + public void onDeviceDeleted(DeviceId deviceId) {
  563 + context.onAuthFailure(address);
  564 + if (isTcp) {
  565 + ChannelHandlerContext ctx = deviceSessionCtx.getChannel();
  566 + ctx.close();
  567 + }
  568 + }
  569 +
504 /** 570 /**
505 * 往设备推送消息 571 * 往设备推送消息
506 * 572 *
507 - * @param message 573 + * @param ctx 上下文
  574 + * @param message 消息内容
508 * @return 575 * @return
509 */ 576 */
510 private ChannelFuture pushDeviceMsg(ChannelHandlerContext ctx, String message) { 577 private ChannelFuture pushDeviceMsg(ChannelHandlerContext ctx, String message) {
@@ -517,28 +584,10 @@ public class UdpDatagramDataHandler @@ -517,28 +584,10 @@ public class UdpDatagramDataHandler
517 message = ByteUtils.stringEncodeToHex(message); 584 message = ByteUtils.stringEncodeToHex(message);
518 } 585 }
519 buff.writeBytes(ByteBufUtil.decodeHexDump(message)); 586 buff.writeBytes(ByteBufUtil.decodeHexDump(message));
  587 + if (isTcp) {
  588 + return ctx.writeAndFlush(buff);
  589 + }
520 DatagramPacket packet = new DatagramPacket(buff, address); 590 DatagramPacket packet = new DatagramPacket(buff, address);
521 return ctx.writeAndFlush(packet); 591 return ctx.writeAndFlush(packet);
522 } 592 }
523 -  
524 - @Override  
525 - public void onDeviceProfileUpdate(  
526 - TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {  
527 - deviceSessionCtx.onDeviceProfileUpdate(sessionInfo, deviceProfile);  
528 - }  
529 -  
530 - @Override  
531 - public void onDeviceUpdate(  
532 - TransportProtos.SessionInfoProto sessionInfo,  
533 - Device device,  
534 - Optional<DeviceProfile> deviceProfileOpt) {  
535 - deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);  
536 - }  
537 -  
538 - @Override  
539 - public void onDeviceDeleted(DeviceId deviceId) {  
540 - context.onAuthFailure(address);  
541 - ChannelHandlerContext ctx = deviceSessionCtx.getChannel();  
542 - // ctx.close();  
543 - }  
544 } 593 }
common/transport/tcp/src/main/java/org/thingsboard/server/transport/tcp/UdpTransportHandler.java renamed from common/transport/tcp/src/main/java/org/thingsboard/server/transport/udp/UdpTransportHandler.java
@@ -13,29 +13,20 @@ @@ -13,29 +13,20 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.transport.udp;  
17 -  
18 -  
19 -  
20 -import io.netty.buffer.ByteBuf;  
21 -import io.netty.buffer.Unpooled;  
22 -import io.netty.channel.ChannelFuture; 16 +package org.thingsboard.server.transport.tcp;
23 import io.netty.channel.ChannelHandlerContext; 17 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInboundHandlerAdapter; 18 import io.netty.channel.ChannelInboundHandlerAdapter;
25 import io.netty.channel.socket.DatagramPacket; 19 import io.netty.channel.socket.DatagramPacket;
26 -import io.netty.handler.codec.mqtt.*;  
27 import io.netty.util.ReferenceCountUtil; 20 import io.netty.util.ReferenceCountUtil;
28 import io.netty.util.concurrent.Future; 21 import io.netty.util.concurrent.Future;
29 -import io.netty.util.concurrent.GenericFutureListener;  
30 import java.net.InetSocketAddress; 22 import java.net.InetSocketAddress;
31 -import java.util.Map;  
32 import java.util.UUID; 23 import java.util.UUID;
33 import java.util.concurrent.ConcurrentHashMap; 24 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap; 25 import java.util.concurrent.ConcurrentMap;
  26 +
  27 +import io.netty.util.concurrent.GenericFutureListener;
35 import lombok.extern.slf4j.Slf4j; 28 import lombok.extern.slf4j.Slf4j;
36 -import org.apache.commons.lang3.StringUtils;  
37 import org.thingsboard.server.common.data.yunteng.utils.ByteUtils; 29 import org.thingsboard.server.common.data.yunteng.utils.ByteUtils;
38 -import org.thingsboard.server.transport.tcp.TcpTransportContext;  
39 import org.thingsboard.server.transport.tcp.util.ByteBufUtils; 30 import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
40 31
41 /** 32 /**
@@ -48,10 +39,11 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils; @@ -48,10 +39,11 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
48 private final TcpTransportContext context; 39 private final TcpTransportContext context;
49 40
50 41
  42 +
51 /** 43 /**
52 * 需要处理的消息队列,例如:需要下发给设备的,设备上传的。 44 * 需要处理的消息队列,例如:需要下发给设备的,设备上传的。
53 */ 45 */
54 - private final ConcurrentMap<InetSocketAddress, UdpDatagramDataHandler> deviceSessiones = new ConcurrentHashMap<>(); 46 + private final ConcurrentMap<InetSocketAddress, TcpUdpDataHandler> deviceSessiones = new ConcurrentHashMap<>();
55 47
56 48
57 UdpTransportHandler(TcpTransportContext context) { 49 UdpTransportHandler(TcpTransportContext context) {
@@ -62,19 +54,21 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils; @@ -62,19 +54,21 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
62 54
63 @Override 55 @Override
64 public void channelRegistered(ChannelHandlerContext ctx) throws Exception { 56 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  57 + //启动时注册
65 super.channelRegistered(ctx); 58 super.channelRegistered(ctx);
66 context.channelRegistered(); 59 context.channelRegistered();
67 } 60 }
68 61
69 @Override 62 @Override
70 public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { 63 public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  64 + //停止时注销
71 super.channelUnregistered(ctx); 65 super.channelUnregistered(ctx);
72 context.channelUnregistered(); 66 context.channelUnregistered();
73 } 67 }
74 68
75 @Override 69 @Override
76 public void channelRead(ChannelHandlerContext ctx, Object msg) { 70 public void channelRead(ChannelHandlerContext ctx, Object msg) {
77 - log.error("【{}】 Processing msg: 【{}】", sessionId, msg); 71 + log.debug("【{}】 UDP channelRead Processing msg: 【{}】", sessionId, msg);
78 try { 72 try {
79 if(!context.isReady()){ 73 if(!context.isReady()){
80 return; 74 return;
@@ -84,9 +78,9 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils; @@ -84,9 +78,9 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
84 InetSocketAddress address = getAddress(message,ctx); 78 InetSocketAddress address = getAddress(message,ctx);
85 byte[] byteMsg = ByteBufUtils.buf2Bytes(message.content()); 79 byte[] byteMsg = ByteBufUtils.buf2Bytes(message.content());
86 String msgStr = ByteUtils.bytesToStr(byteMsg); 80 String msgStr = ByteUtils.bytesToStr(byteMsg);
87 - log.error("UDP服务【{}】收到来自【{}】数据【{}】", sessionId, address, msgStr);  
88 - UdpDatagramDataHandler dataHandler = deviceSessiones.computeIfAbsent(address,k->new UdpDatagramDataHandler(ctx,context,address));  
89 - dataHandler.dealDeviceMsg(ctx,msgStr); 81 + log.debug("UDP服务【{}】收到来自【{}】数据【{}】", sessionId, address, msgStr);
  82 + TcpUdpDataHandler dataHandler = deviceSessiones.computeIfAbsent(address, k->new TcpUdpDataHandler(this,context,address,false));
  83 + dataHandler.enqueueRegularSessionMsg(ctx,msgStr);
90 } else { 84 } else {
91 log.debug("【{}】 Received non tcp message: 【{}】", sessionId, msg.getClass().getSimpleName()); 85 log.debug("【{}】 Received non tcp message: 【{}】", sessionId, msg.getClass().getSimpleName());
92 ctx.close(); 86 ctx.close();
@@ -109,30 +103,6 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils; @@ -109,30 +103,6 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
109 return address; 103 return address;
110 } 104 }
111 105
112 -  
113 -  
114 -  
115 -  
116 -  
117 -  
118 - /**  
119 - * 上行脚本解析结果是否包含数据  
120 - * @param datas 数据集合  
121 - * @return  
122 - */  
123 - private boolean hasDatas(Map<String, Object> datas) {  
124 - if (datas == null || datas.isEmpty()) {  
125 - return false;  
126 - }  
127 - return true;  
128 - }  
129 -  
130 -  
131 -  
132 -  
133 -  
134 -  
135 -  
136 @Override 106 @Override
137 public void channelReadComplete(ChannelHandlerContext ctx) { 107 public void channelReadComplete(ChannelHandlerContext ctx) {
138 ctx.flush(); 108 ctx.flush();
@@ -148,34 +118,13 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils; @@ -148,34 +118,13 @@ import org.thingsboard.server.transport.tcp.util.ByteBufUtils;
148 } 118 }
149 } 119 }
150 120
151 -  
152 -  
153 @Override 121 @Override
154 - public void operationComplete(Future<? super Void> future) throws Exception {  
155 - log.trace("[{}] Channel closed!", sessionId); 122 + public void operationComplete(Future<? super Void> future){
  123 + log.debug("[{}] Channel closed!", sessionId);
156 } 124 }
157 125
158 -  
159 -  
160 -  
161 -  
162 - /**  
163 - * 往设备推送消息  
164 - *  
165 - * @param message  
166 - * @return  
167 - */  
168 - private ChannelFuture pushDeviceMsg(ChannelHandlerContext ctx, String message) {  
169 - if(StringUtils.isBlank(message)){  
170 - return null;  
171 - }  
172 - ByteBuf buff = Unpooled.buffer();  
173 - if(!message.matches("-?[0-9a-fA-F]+")){  
174 - //不满足16进制将字符串转为16进制  
175 - message = ByteUtils.stringEncodeToHex(message);  
176 - }  
177 - buff.writeBytes(ByteUtils.hexToByteArray(message));  
178 - return ctx.writeAndFlush(buff); 126 + public void remove( InetSocketAddress address){
  127 + log.debug("remove remote device address 【{}】",address);
  128 + this.deviceSessiones.remove(address);
179 } 129 }
180 -  
181 } 130 }
common/transport/tcp/src/main/java/org/thingsboard/server/transport/tcp/UdpTransportServerInitializer.java renamed from common/transport/tcp/src/main/java/org/thingsboard/server/transport/udp/UdpTransportServerInitializer.java
@@ -13,13 +13,11 @@ @@ -13,13 +13,11 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.transport.udp; 16 +package org.thingsboard.server.transport.tcp;
17 17
18 import io.netty.channel.Channel; 18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelInitializer; 19 import io.netty.channel.ChannelInitializer;
20 import io.netty.channel.ChannelPipeline; 20 import io.netty.channel.ChannelPipeline;
21 -import io.netty.handler.ssl.SslHandler;  
22 -import org.thingsboard.server.transport.tcp.TcpTransportContext;  
23 21
24 /** 22 /**
25 * @author Andrew Shvayka 23 * @author Andrew Shvayka
@@ -27,21 +25,14 @@ import org.thingsboard.server.transport.tcp.TcpTransportContext; @@ -27,21 +25,14 @@ import org.thingsboard.server.transport.tcp.TcpTransportContext;
27 public class UdpTransportServerInitializer extends ChannelInitializer<Channel> { 25 public class UdpTransportServerInitializer extends ChannelInitializer<Channel> {
28 26
29 private final TcpTransportContext context; 27 private final TcpTransportContext context;
30 - private final boolean sslEnabled;  
31 28
32 public UdpTransportServerInitializer(TcpTransportContext context, boolean sslEnabled) { 29 public UdpTransportServerInitializer(TcpTransportContext context, boolean sslEnabled) {
33 this.context = context; 30 this.context = context;
34 - this.sslEnabled = sslEnabled;  
35 } 31 }
36 -  
37 @Override 32 @Override
38 public void initChannel(Channel ch) { 33 public void initChannel(Channel ch) {
39 ChannelPipeline pipeline = ch.pipeline(); 34 ChannelPipeline pipeline = ch.pipeline();
40 - SslHandler sslHandler = null;  
41 -  
42 UdpTransportHandler handler = new UdpTransportHandler(context); 35 UdpTransportHandler handler = new UdpTransportHandler(context);
43 -// NettyUdpServerHandler handler = new NettyUdpServerHandler();  
44 -  
45 pipeline.addLast(handler); 36 pipeline.addLast(handler);
46 ch.closeFuture().addListener(handler); 37 ch.closeFuture().addListener(handler);
47 } 38 }
common/transport/tcp/src/main/java/org/thingsboard/server/transport/tcp/UdpTransportService.java renamed from common/transport/tcp/src/main/java/org/thingsboard/server/transport/udp/UdpTransportService.java
@@ -13,7 +13,7 @@ @@ -13,7 +13,7 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.transport.udp; 16 +package org.thingsboard.server.transport.tcp;
17 17
18 import io.netty.bootstrap.Bootstrap; 18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.buffer.PooledByteBufAllocator; 19 import io.netty.buffer.PooledByteBufAllocator;
@@ -31,7 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; @@ -31,7 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
31 import org.springframework.stereotype.Service; 31 import org.springframework.stereotype.Service;
32 import org.thingsboard.server.common.data.DataConstants; 32 import org.thingsboard.server.common.data.DataConstants;
33 import org.thingsboard.server.common.data.TbTransportService; 33 import org.thingsboard.server.common.data.TbTransportService;
34 -import org.thingsboard.server.transport.tcp.TcpTransportContext;  
35 34
36 import javax.annotation.PostConstruct; 35 import javax.annotation.PostConstruct;
37 import javax.annotation.PreDestroy; 36 import javax.annotation.PreDestroy;
@@ -81,7 +80,7 @@ public class UdpTransportService implements TbTransportService { @@ -81,7 +80,7 @@ public class UdpTransportService implements TbTransportService {
81 log.info("Setting resource leak detector level to {}", leakDetectorLevel); 80 log.info("Setting resource leak detector level to {}", leakDetectorLevel);
82 ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase())); 81 ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
83 82
84 - log.info("Starting TCP transport..."); 83 + log.info("Starting UDP transport...");
85 bossGroup = new NioEventLoopGroup(bossGroupThreadCount); 84 bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
86 Bootstrap b = new Bootstrap(); 85 Bootstrap b = new Bootstrap();
87 b.group(bossGroup) 86 b.group(bossGroup)
@@ -98,12 +97,12 @@ public class UdpTransportService implements TbTransportService { @@ -98,12 +97,12 @@ public class UdpTransportService implements TbTransportService {
98 .option(ChannelOption.SO_BROADCAST, true); 97 .option(ChannelOption.SO_BROADCAST, true);
99 sslServerChannel = b.bind(sslHost, sslPort).sync().channel(); 98 sslServerChannel = b.bind(sslHost, sslPort).sync().channel();
100 } 99 }
101 - log.info("TCP transport started!"); 100 + log.info("UDP transport started!");
102 } 101 }
103 102
104 @PreDestroy 103 @PreDestroy
105 public void shutdown() throws InterruptedException { 104 public void shutdown() throws InterruptedException {
106 - log.info("Stopping TCP transport!"); 105 + log.info("Stopping UDP transport!");
107 try { 106 try {
108 serverChannel.close().sync(); 107 serverChannel.close().sync();
109 if (sslEnabled) { 108 if (sslEnabled) {
@@ -112,7 +111,7 @@ public class UdpTransportService implements TbTransportService { @@ -112,7 +111,7 @@ public class UdpTransportService implements TbTransportService {
112 } finally { 111 } finally {
113 bossGroup.shutdownGracefully(); 112 bossGroup.shutdownGracefully();
114 } 113 }
115 - log.info("TCP transport stopped!"); 114 + log.info("UDP transport stopped!");
116 } 115 }
117 116
118 @Override 117 @Override