Commit ca65e3fb091438e06bbe1a288138ed23509d7642

Authored by 芯火源
1 parent 85719f12

refactor: TCP会话信息调整

common/transport/tcp/src/main/java/org/thingsboard/server/transport/tcp/adaptors/TcpUpEntry.java renamed from common/transport/tcp/src/main/java/org/thingsboard/server/transport/tcp/session/TCPMessage.java
1 -package org.thingsboard.server.transport.tcp.session; 1 +package org.thingsboard.server.transport.tcp.adaptors;
2 2
3 -import io.netty.handler.codec.mqtt.MqttMessageType;  
4 import lombok.Data; 3 import lombok.Data;
5 4
6 import java.io.Serializable; 5 import java.io.Serializable;
7 6
8 @Data 7 @Data
9 -public class TCPMessage implements Serializable { 8 +public class TcpUpEntry implements Serializable {
10 9
11 /**消息ID,用于请求与响应的匹配,例如:modbus由地址码和功能码组成。*/ 10 /**消息ID,用于请求与响应的匹配,例如:modbus由地址码和功能码组成。*/
12 private String requestId; 11 private String requestId;
@@ -14,15 +13,18 @@ public class TCPMessage implements Serializable { @@ -14,15 +13,18 @@ public class TCPMessage implements Serializable {
14 13
15 private String message; 14 private String message;
16 15
17 - /**数据主题,例如:modbus的功能码等。*/ 16 + /**数据主题,例如:协议的功能码等。*/
18 private String topic; 17 private String topic;
19 18
20 /**设备地址码,例如:modbus的地址吗*/ 19 /**设备地址码,例如:modbus的地址吗*/
21 private String deviceCode; 20 private String deviceCode;
22 21
  22 + /**从设备(网关子设备)唯一标识符*/
  23 + private String slaveCode;
23 24
24 25
25 - public TCPMessage(String message){ 26 +
  27 + public TcpUpEntry(String message){
26 this.message = message; 28 this.message = message;
27 } 29 }
28 30
@@ -21,6 +21,7 @@ import lombok.Getter; @@ -21,6 +21,7 @@ import lombok.Getter;
21 import lombok.Setter; 21 import lombok.Setter;
22 import lombok.extern.slf4j.Slf4j; 22 import lombok.extern.slf4j.Slf4j;
23 import org.thingsboard.server.transport.tcp.TcpTransportContext; 23 import org.thingsboard.server.transport.tcp.TcpTransportContext;
  24 +import org.thingsboard.server.transport.tcp.adaptors.TcpUpEntry;
24 25
25 import java.util.UUID; 26 import java.util.UUID;
26 import java.util.concurrent.ConcurrentLinkedQueue; 27 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -42,7 +43,7 @@ public class TcpDeviceSessionCtx extends TcpDeviceWareSessionContext { @@ -42,7 +43,7 @@ public class TcpDeviceSessionCtx extends TcpDeviceWareSessionContext {
42 43
43 private final AtomicInteger msgIdSeq = new AtomicInteger(0); 44 private final AtomicInteger msgIdSeq = new AtomicInteger(0);
44 45
45 - private final ConcurrentLinkedQueue<TCPMessage> msgQueue = new ConcurrentLinkedQueue<>(); 46 + private final ConcurrentLinkedQueue<TcpUpEntry> msgQueue = new ConcurrentLinkedQueue<>();
46 47
47 @Getter 48 @Getter
48 private final Lock msgQueueProcessorLock = new ReentrantLock(); 49 private final Lock msgQueueProcessorLock = new ReentrantLock();
@@ -73,17 +74,17 @@ public class TcpDeviceSessionCtx extends TcpDeviceWareSessionContext { @@ -73,17 +74,17 @@ public class TcpDeviceSessionCtx extends TcpDeviceWareSessionContext {
73 74
74 75
75 76
76 - public void addToQueue(TCPMessage msg) { 77 + public void addToQueue(TcpUpEntry msg) {
77 msgQueueSize.incrementAndGet(); 78 msgQueueSize.incrementAndGet();
78 ReferenceCountUtil.retain(msg); 79 ReferenceCountUtil.retain(msg);
79 msgQueue.add(msg); 80 msgQueue.add(msg);
80 } 81 }
81 82
82 - public void tryProcessQueuedMsgs(Consumer<TCPMessage> msgProcessor) { 83 + public void tryProcessQueuedMsgs(Consumer<TcpUpEntry> msgProcessor) {
83 while (!msgQueue.isEmpty()) { 84 while (!msgQueue.isEmpty()) {
84 if (msgQueueProcessorLock.tryLock()) { 85 if (msgQueueProcessorLock.tryLock()) {
85 try { 86 try {
86 - TCPMessage msg; 87 + TcpUpEntry msg;
87 while ((msg = msgQueue.poll()) != null) { 88 while ((msg = msgQueue.poll()) != null) {
88 try { 89 try {
89 msgQueueSize.decrementAndGet(); 90 msgQueueSize.decrementAndGet();
1 -/**  
2 - * Copyright © 2016-2022 The Thingsboard Authors  
3 - * <p>  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - * <p>  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * <p>  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 package org.thingsboard.server.transport.tcp.session; 1 package org.thingsboard.server.transport.tcp.session;
17 2
18 import com.fasterxml.jackson.core.JsonProcessingException; 3 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -23,16 +8,13 @@ import org.thingsboard.server.common.data.DeviceProfile; @@ -23,16 +8,13 @@ import org.thingsboard.server.common.data.DeviceProfile;
23 import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration; 8 import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
24 import org.thingsboard.server.common.data.device.profile.TkTcpDeviceProfileTransportConfiguration; 9 import org.thingsboard.server.common.data.device.profile.TkTcpDeviceProfileTransportConfiguration;
25 import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants; 10 import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
26 -import org.thingsboard.server.common.data.yunteng.enums.TcpDataTypeEnum;  
27 import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; 11 import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
28 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; 12 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
29 -import org.thingsboard.server.transport.tcp.script.TkScriptFactory;  
30 import org.thingsboard.server.gen.transport.TransportProtos; 13 import org.thingsboard.server.gen.transport.TransportProtos;
31 import org.thingsboard.server.transport.tcp.TcpTransportContext; 14 import org.thingsboard.server.transport.tcp.TcpTransportContext;
32 import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor; 15 import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;
33 16
34 import java.util.UUID; 17 import java.util.UUID;
35 -import java.util.concurrent.ExecutionException;  
36 18
37 /** 19 /**
38 * @author Andrew Shvayka 20 * @author Andrew Shvayka
@@ -43,21 +25,14 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont @@ -43,21 +25,14 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont
43 @Getter 25 @Getter
44 private final TcpTransportContext context; 26 private final TcpTransportContext context;
45 27
46 - private volatile String telemetryTopicFilter ;  
47 - private volatile String attributesTopicFilter;  
48 - private volatile String toDeviceRpcResponseTopicFilter;  
49 - @Getter  
50 - private volatile TcpDataTypeEnum payloadType = TcpDataTypeEnum.HEX;  
51 -  
52 - private volatile TcpTransportAdaptor adaptor; 28 + private volatile String authScriptId ;
  29 + private volatile String telemetryScriptId ;
  30 + private volatile String rpcScriptId;
53 31
54 /**设备唯一标识符,例如:设备SN、设备地址码等。数据内携带标识符*/ 32 /**设备唯一标识符,例如:设备SN、设备地址码等。数据内携带标识符*/
55 @Getter 33 @Getter
56 - private volatile String deviceCode = "55";  
57 -  
58 -  
59 - @Getter  
60 - private UUID scriptId; 34 + private volatile String deviceCode;
  35 + private volatile TcpTransportAdaptor adaptor;
61 36
62 public TcpDeviceWareSessionContext(UUID sessionId, TcpTransportContext context) { 37 public TcpDeviceWareSessionContext(UUID sessionId, TcpTransportContext context) {
63 super(sessionId); 38 super(sessionId);
@@ -66,16 +41,7 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont @@ -66,16 +41,7 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont
66 } 41 }
67 42
68 43
69 - public boolean isDeviceTelemetryTopic(String topicName) {  
70 - return telemetryTopicFilter.equals(topicName);  
71 - }  
72 44
73 - public boolean isDeviceAttributesTopic(String topicName) {  
74 - return attributesTopicFilter.equals(topicName);  
75 - }  
76 - public boolean isToDeviceRpcResponseTopic(String topicName) {  
77 - return toDeviceRpcResponseTopicFilter.equals(topicName);  
78 - }  
79 45
80 public TcpTransportAdaptor getPayloadAdaptor() { 46 public TcpTransportAdaptor getPayloadAdaptor() {
81 return this.adaptor; 47 return this.adaptor;
@@ -85,15 +51,7 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont @@ -85,15 +51,7 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont
85 @Override 51 @Override
86 public void setDeviceInfo(TransportDeviceInfo deviceInfo) { 52 public void setDeviceInfo(TransportDeviceInfo deviceInfo) {
87 super.setDeviceInfo(deviceInfo); 53 super.setDeviceInfo(deviceInfo);
88 - try {  
89 - JsonNode additionalInfo = context.getMapper().readTree(deviceInfo.getAdditionalInfo());  
90 - if(additionalInfo !=null && additionalInfo.has(FastIotConstants.TCP_DEVICE_IDENTIFY_FILED)){  
91 - deviceCode = additionalInfo.get(FastIotConstants.TCP_DEVICE_IDENTIFY_FILED).asText();  
92 - }  
93 - } catch (JsonProcessingException e) {  
94 - log.trace("[{}][{}] Failed to fetch device additional info", sessionId, deviceInfo.getDeviceName(), e);  
95 - }  
96 - 54 + deviceCode = deviceInfo.getDeviceName();
97 } 55 }
98 56
99 @Override 57 @Override
@@ -112,24 +70,9 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont @@ -112,24 +70,9 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont
112 DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); 70 DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
113 71
114 TkTcpDeviceProfileTransportConfiguration tcpConfiguration = (TkTcpDeviceProfileTransportConfiguration) transportConfiguration; 72 TkTcpDeviceProfileTransportConfiguration tcpConfiguration = (TkTcpDeviceProfileTransportConfiguration) transportConfiguration;
115 - payloadType = tcpConfiguration.getDataFormat();  
116 - if (TcpDataTypeEnum.ASCII.equals(payloadType)) {  
117 - payloadType = TcpDataTypeEnum.ASCII;  
118 - } else {  
119 - payloadType = TcpDataTypeEnum.HEX;  
120 - }  
121 - this.attributesTopicFilter = tcpConfiguration.getAttributesTopic();  
122 - this.telemetryTopicFilter = tcpConfiguration.getTelemetryTopic();  
123 - this.toDeviceRpcResponseTopicFilter = tcpConfiguration.getRpcTopic();  
124 - String scriptBody = tcpConfiguration.getScriptText();  
125 - try {  
126 - this.scriptId = this.adaptor.getJsScriptEngineFunctionId(scriptBody==null? TkScriptFactory.INCLUD_ORIGINAL_DATA:scriptBody);  
127 - } catch (ExecutionException e) {  
128 - log.warn("设备配置【{}】的脚本【{}】解析异常",deviceProfile.getSearchText(),scriptBody);  
129 - throw new RuntimeException(e);  
130 - } catch (InterruptedException e) {  
131 - throw new RuntimeException(e);  
132 - } 73 + this.authScriptId = tcpConfiguration.getAuthScriptId();
  74 + this.telemetryScriptId = tcpConfiguration.getUpScriptId();
  75 + this.rpcScriptId = tcpConfiguration.getDownScriptId();
133 } 76 }
134 77
135 } 78 }