Commit c5a655616700a293cd809aca088cff441ab6fd38

Authored by xp.Huang
2 parents 195c4c9b 676891fd

Merge branch '20221104' into 'master'

refactor: TCP对象优化

See merge request huang/thingsboard3.3.2!140
Showing 27 changed files with 1307 additions and 644 deletions
... ... @@ -71,89 +71,63 @@ public class YtDeviceController extends BaseController {
71 71 deviceService.validateFormdata(currentTenantId, deviceDTO);
72 72
73 73
74   - DeviceDTO newDeviceDTO = null;
75   - boolean isIncludeRelation = false;
76 74
  75 +
  76 + /**网关是否有效*/
77 77 String gatewayId = deviceDTO.getGatewayId();
78   - DeviceDTO gateWayDevice = null;
  78 + DeviceDTO gateWay = null;
79 79 if (StringUtils.isNotEmpty(gatewayId)) {
80   - gateWayDevice =
81   - deviceService.checkDeviceByTenantIdAndDeviceId(
82   - getCurrentUser().getCurrentTenantId(), gatewayId);
83   -
84   - // 第一步判断该网关设备是否存在于该租户下面
85   - if (null == gateWayDevice) {
  80 + gateWay = deviceService.checkDeviceByTenantIdAndDeviceId(getCurrentUser().getCurrentTenantId(), gatewayId);
  81 + if (null == gateWay) {
86 82 throw new YtDataValidationException(
87 83 ErrorMessage.DEVICE_NOT_EXISTENCE_IN_TENANT.getMessage());
88 84 }
89 85 }
90 86
91   - /** 子设备编辑时,TB中已经存在关联关系则值更新设备表信息 */
92   - String selfTbDeviceIdStr = deviceDTO.getTbDeviceId();
93   - if (selfTbDeviceIdStr != null
94   - && deviceDTO.getDeviceType().equals(DeviceTypeEnum.SENSOR)
95   - && StringUtils.isNotEmpty(gatewayId)) {
96   -
97   - // 第二步判断网关子设备是否已关联到网关设备
98   - EntityId entityId =
99   - EntityIdFactory.getByTypeAndId(
100   - "DEVICE", selfTbDeviceIdStr); // gateWayDevice.getTbDeviceId()
101   - List<EntityRelationInfo> list =
102   - relationService.findInfoByTo(getTenantId(), entityId, RelationTypeGroup.COMMON).get();
103   -
104   - for (EntityRelationInfo entityRelationInfo : list) {
105   - if (entityRelationInfo.getTo().getId().equals(selfTbDeviceIdStr)
106   - && entityRelationInfo.getFrom().getId().equals(gatewayId)) {
107   - deviceDTO.setTbDeviceId(entityRelationInfo.getTo().toString());
108   - newDeviceDTO =
109   - deviceService.insertOrUpdate(getCurrentUser().getCurrentTenantId(), deviceDTO);
110   - isIncludeRelation = true;
111   - break;
112   - }
113   - }
114   - }
115 87
116   - /** 需要更新设备表和关联关系表 */
117   - if (!isIncludeRelation) {
118   - Device tbDevice = buildTbDeviceFromDeviceDTO(getCurrentUser().getTenantId(), deviceDTO);
119   -
120   - DeviceId selfTbId = updateTbDevice(tbDevice, deviceDTO.getDeviceToken());
121   - selfTbDeviceIdStr = selfTbId.getId().toString();
122   - deviceDTO.setTbDeviceId(selfTbDeviceIdStr);
123   - newDeviceDTO = deviceService.insertOrUpdate(getCurrentUser().getCurrentTenantId(), deviceDTO);
124   -
125   - if (deviceDTO.getDeviceType().equals(DeviceTypeEnum.SENSOR)
126   - && StringUtils.isNotEmpty(selfTbDeviceIdStr)) {
127   - // 删除原來的关联关系
128   - List<DeviceDTO> list =
129   - deviceService.findGateWayDeviceByTbDeviceId(
130   - getCurrentUser().getCurrentTenantId(), selfTbDeviceIdStr);
131   - if (null != list && list.size() > 0) {
132   - DeviceId form = new DeviceId(UUID.fromString(list.get(0).getTbDeviceId()));
133   - EntityRelation relation =
134   - new EntityRelation(
135   - form, selfTbId, FastIotConstants.Relation.relationType, RelationTypeGroup.COMMON);
136   - boolean found =
137   - relationService.deleteRelation(
138   - getTenantId(),
139   - form,
140   - selfTbId,
141   - FastIotConstants.Relation.relationType,
142   - RelationTypeGroup.COMMON);
143   -
144   - if (!found) {
145   - throw new ThingsboardException(
146   - "Requested item wasn't found!", ThingsboardErrorCode.ITEM_NOT_FOUND);
147   - }
148   - sendRelationNotificationMsg(
149   - getTenantId(), relation, EdgeEventActionType.RELATION_DELETED);
150   - }
151 88
152   - if (gateWayDevice != null) {
153   - addRelation(getTenantId(), gateWayDevice.getTbDeviceId(), selfTbDeviceIdStr);
  89 +
  90 + /** 子设备编辑业务逻辑: 设备地址码必须同时设置到附加信息字段内。
  91 + * 1、新增或编辑网关和直连设备
  92 + * 2、新增网关子设备
  93 + * 3、编辑网关子设备时,关联关系已存在(未切换网关)
  94 + * 4、编辑网关子设备时,关联关系不存在(切换网关)
  95 + * 5、编辑网关子设备时,修改其它设备信息,例如:设备名称等。
  96 + * */
  97 + Device tbDevice = buildTbDeviceFromDeviceDTO(getCurrentUser().getTenantId(), deviceDTO);
  98 + DeviceId selfTbId = updateTbDevice(tbDevice, deviceDTO.getDeviceToken());
  99 + String selfTbIdStr = selfTbId.getId().toString();
  100 + deviceDTO.setTbDeviceId(selfTbIdStr);
  101 +
  102 +
  103 + if (selfTbIdStr != null
  104 + && deviceDTO.getDeviceType().equals(DeviceTypeEnum.SENSOR)
  105 + && StringUtils.isNotEmpty(gatewayId)) {
  106 + boolean relationNotMatched = true;
  107 +
  108 + EntityId slaveId = EntityIdFactory.getByTypeAndId("DEVICE", selfTbIdStr);
  109 + List<EntityRelationInfo> relations = relationService.findInfoByTo(getTenantId(), slaveId, RelationTypeGroup.COMMON).get();
  110 +
  111 + for (EntityRelationInfo relationInfo : relations) {
  112 + if(!FastIotConstants.Relation.relationType.equals(relationInfo.getType())){
  113 + continue;
  114 + }
  115 + if (relationInfo.getFrom().getId().toString().equals(gateWay.getTbDeviceId())) {
  116 + relationNotMatched = false;
  117 + }else {
  118 + relationService.deleteRelation(getTenantId(),relationInfo);
  119 + sendRelationNotificationMsg(getTenantId(), relationInfo, EdgeEventActionType.RELATION_DELETED);
154 120 }
155 121 }
  122 +
  123 + if(relationNotMatched){
  124 + addRelation(getTenantId(), gateWay.getTbDeviceId(), selfTbIdStr);
  125 + }
156 126 }
  127 +
  128 +
  129 +
  130 + DeviceDTO newDeviceDTO = deviceService.insertOrUpdate(getCurrentUser().getCurrentTenantId(), deviceDTO);
157 131 return ResponseEntity.ok(newDeviceDTO);
158 132 }
159 133
... ... @@ -452,6 +426,8 @@ public class YtDeviceController extends BaseController {
452 426 DeviceId id = new DeviceId(UUID.fromString(deviceId));
453 427 tbDevice.setId(id);
454 428 }
  429 +
  430 + /**扩展设备附加信息,例如:设备地址码、上下线时间等*/
455 431 ObjectNode additionalInfo = objectMapper.createObjectNode();
456 432 additionalInfo.put(
457 433 "gateway",
... ... @@ -465,6 +441,10 @@ public class YtDeviceController extends BaseController {
465 441 .map(JsonNode::asText)
466 442 .orElse(""));
467 443 additionalInfo.put("overwriteActivityTime", false);
  444 + //TCP协议需要设备地址码分发数据给对应设备
  445 + if(StringUtils.isNotEmpty(deviceDTO.getCode())){
  446 + additionalInfo.put(FastIotConstants.TCP_DEVICE_IDENTIFY_FILED,deviceDTO.getCode());
  447 + }
468 448
469 449 DeviceProfileId deviceProfileId =
470 450 new DeviceProfileId(UUID.fromString(deviceDTO.getProfileId()));
... ...
... ... @@ -228,7 +228,7 @@ public class YtDeviceScriptController extends BaseController {
228 228 DeviceProfileDTO deviceProfileDTO, String scriptId, String scriptText) {
229 229 DeviceProfile tbDeviceProfile = new DeviceProfile();
230 230 if (StringUtils.isNotBlank(deviceProfileDTO.getId())) {
231   - UUID profileId = UUID.fromString(deviceProfileDTO.getId());
  231 + UUID profileId = UUID.fromString(deviceProfileDTO.getTbProfileId());
232 232 tbDeviceProfile.setId(new DeviceProfileId(profileId));
233 233 tbDeviceProfile.setCreatedTime(
234 234 deviceProfileDTO.getCreateTime().toInstant(ZoneOffset.of("+8")).toEpochMilli());
... ...
... ... @@ -60,6 +60,7 @@ import org.thingsboard.server.common.data.page.PageLink;
60 60 import org.thingsboard.server.common.data.relation.EntityRelation;
61 61 import org.thingsboard.server.common.data.security.DeviceCredentials;
62 62 import org.thingsboard.server.common.data.security.DeviceCredentialsType;
  63 +import org.thingsboard.server.common.data.yunteng.dto.DeviceDTO;
63 64 import org.thingsboard.server.common.msg.EncryptionUtil;
64 65 import org.thingsboard.server.common.msg.TbMsg;
65 66 import org.thingsboard.server.common.msg.TbMsgDataType;
... ... @@ -277,7 +278,24 @@ public class DefaultTransportApiService implements TransportApiService {
277 278 Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock());
278 279 deviceCreationLock.lock();
279 280 try {
280   - Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName());
  281 +
  282 + //Thingskit function
  283 + String slaveName = requestMsg.getDeviceName();
  284 + if(gateway.getDeviceData().getTransportConfiguration().getType() == DeviceTransportType.TCP) {
  285 + DeviceDTO iotDev = ytDeviceService.findSlaveDevice(gateway.getTenantId().getId().toString()
  286 + , gateway.getId().getId().toString()
  287 + ,requestMsg.getDeviceName());
  288 + if(iotDev == null ){
  289 + GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder();
  290 + return TransportApiResponseMsg.newBuilder()
  291 + .setGetOrCreateDeviceResponseMsg(builder.build())
  292 + .build();
  293 + }
  294 + slaveName = iotDev.getName();
  295 + }
  296 +
  297 + Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), slaveName);
  298 +
281 299 if (device == null) {
282 300 TenantId tenantId = gateway.getTenantId();
283 301 device = new Device();
... ...
... ... @@ -43,6 +43,8 @@ public interface DeviceProfileService {
43 43
44 44 DeviceProfile createDefaultDeviceProfile(TenantId tenantId);
45 45
  46 + DeviceProfile createDeviceProfile(TenantId tenantId,String name);
  47 +
46 48 DeviceProfile findDefaultDeviceProfile(TenantId tenantId);
47 49
48 50 DeviceProfileInfo findDefaultDeviceProfileInfo(TenantId tenantId);
... ...
... ... @@ -27,8 +27,13 @@ public class YtTcpDeviceProfileTransportConfiguration implements DeviceProfileTr
27 27 private TcpDataTypeEnum dataFormat = TcpDataTypeEnum.HEX;
28 28 private String scriptId;
29 29 private String scriptText;
30   - private String pingText;
31   -
  30 +// private String pingText;
  31 + @NoXss
  32 + private String telemetryTopic = "03";
  33 + @NoXss
  34 + private String attributesTopic = "01";
  35 + @NoXss
  36 + private String rpcTopic = "05";
32 37 @Override
33 38 public DeviceTransportType getType() {
34 39 return DeviceTransportType.TCP;
... ...
... ... @@ -12,6 +12,7 @@ public interface FastIotConstants {
12 12 String CHART_EXECUTE_CONDITION = "executeCondition";
13 13 String CHART_EXECUTE_ATTRIBUTES = "executeAttributes";
14 14 String ASSERT_DEFAULT_NAME = "default";
  15 + public static final String TCP_DEVICE_IDENTIFY_FILED = "deviceCode";
15 16 class DefaultOrder {
16 17 public static final String CREATE_TIME="create_time";
17 18 }
... ... @@ -62,7 +63,6 @@ public interface FastIotConstants {
62 63 * LDAPS 远程方法调用
63 64 */
64 65 public static final String LOOKUP_LDAPS = "ldaps:";
65   -
66 66 /**
67 67 * 定时任务违规的字符
68 68 */
... ...
... ... @@ -53,6 +53,10 @@ public class DeviceDTO extends TenantDTO {
53 53 groups = {AddGroup.class})
54 54 private String sn;
55 55
  56 +
  57 + @ApiModelProperty(value = "设备标识符,例如:地址码")
  58 + private String code;
  59 +
56 60 @NotEmpty(
57 61 message = "所属组织不能为空或者空字符串",
58 62 groups = {AddGroup.class})
... ...
... ... @@ -42,6 +42,7 @@ public class TcpTransportContext extends TransportContext {
42 42 @Autowired(required = false)
43 43 private TcpSslHandlerProvider sslHandlerProvider;
44 44
  45 + /**注入多种数据协议处理器,例如:modbus等*/
45 46 @Getter
46 47 @Autowired
47 48 private JsonTcpAdaptor jsonTcpAdaptor;
... ...
... ... @@ -15,7 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.transport.tcp;
17 17
18   -import com.google.gson.JsonParseException;
  18 +import com.fasterxml.jackson.databind.JsonNode;
19 19 import io.netty.buffer.ByteBuf;
20 20 import io.netty.buffer.Unpooled;
21 21 import io.netty.channel.ChannelFuture;
... ... @@ -28,11 +28,14 @@ import io.netty.util.concurrent.Future;
28 28 import io.netty.util.concurrent.GenericFutureListener;
29 29 import lombok.extern.slf4j.Slf4j;
30 30 import org.apache.commons.lang3.StringUtils;
31   -import org.thingsboard.server.common.data.*;
32   -import org.thingsboard.server.common.data.device.profile.MqttTopics;
  31 +import org.thingsboard.server.common.data.DataConstants;
  32 +import org.thingsboard.server.common.data.Device;
  33 +import org.thingsboard.server.common.data.DeviceProfile;
  34 +import org.thingsboard.server.common.data.DeviceTransportType;
33 35 import org.thingsboard.server.common.data.id.DeviceId;
34 36 import org.thingsboard.server.common.data.id.OtaPackageId;
35 37 import org.thingsboard.server.common.data.ota.OtaPackageType;
  38 +import org.thingsboard.server.common.data.rpc.RpcStatus;
36 39 import org.thingsboard.server.common.data.yunteng.enums.TcpDataTypeEnum;
37 40 import org.thingsboard.server.common.msg.EncryptionUtil;
38 41 import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
... ... @@ -41,18 +44,22 @@ import org.thingsboard.server.common.transport.TransportService;
41 44 import org.thingsboard.server.common.transport.TransportServiceCallback;
42 45 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
43 46 import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
  47 +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
44 48 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
  49 +import org.thingsboard.server.common.transport.service.DefaultTransportService;
  50 +import org.thingsboard.server.common.transport.service.SessionMetaData;
45 51 import org.thingsboard.server.common.transport.util.SslUtil;
46 52 import org.thingsboard.server.gen.transport.TransportProtos;
47   -import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg;
48 53 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
49   -import org.thingsboard.server.queue.scheduler.SchedulerComponent;
50 54 import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;
51   -import org.thingsboard.server.transport.tcp.session.DeviceSessionCtx;
  55 +import org.thingsboard.server.transport.tcp.session.TcpDeviceSessionCtx;
52 56 import org.thingsboard.server.transport.tcp.session.TCPMessage;
  57 +import org.thingsboard.server.transport.tcp.session.TcpGatewaySessionHandler;
53 58 import org.thingsboard.server.transport.tcp.util.ByteUtils;
54 59
55 60 import javax.net.ssl.SSLPeerUnverifiedException;
  61 +import java.io.IOException;
  62 +import java.io.UnsupportedEncodingException;
56 63 import java.net.InetSocketAddress;
57 64 import java.security.cert.Certificate;
58 65 import java.security.cert.X509Certificate;
... ... @@ -62,13 +69,13 @@ import java.util.UUID;
62 69 import java.util.concurrent.ConcurrentHashMap;
63 70 import java.util.concurrent.ConcurrentMap;
64 71 import java.util.regex.Matcher;
65   -import java.util.regex.Pattern;
66 72
67 73 import static com.amazonaws.util.StringUtils.UTF8;
68 74 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
69 75 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
70   -import static io.netty.handler.codec.mqtt.MqttMessageType.*;
71   -import static io.netty.handler.codec.mqtt.MqttQoS.*;
  76 +import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
  77 +import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK;
  78 +import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
72 79 import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
73 80 import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
74 81
... ... @@ -78,25 +85,18 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe
78 85 @Slf4j
79 86 public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>>, SessionMsgListener {
80 87
81   - private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN);
82   - private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN);
83   -
84   -
85   - private static final String PAYLOAD_TOO_LARGE = "PAYLOAD_TOO_LARGE";
86   -
87   - private static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE;
88 88
89 89 private final UUID sessionId;
90 90 private final TcpTransportContext context;
91 91 private final TransportService transportService;
92   - private final SchedulerComponent scheduler;
93 92 private final SslHandler sslHandler;
94 93
95 94
96 95 /**需要处理的消息队列,例如:需要下发给设备的,设备上传的。*/
97   - final DeviceSessionCtx deviceSessionCtx;
  96 + final TcpDeviceSessionCtx deviceSessionCtx;
98 97 volatile InetSocketAddress address;
99 98
  99 + volatile TcpGatewaySessionHandler gatewaySessionHandler;
100 100 private final ConcurrentHashMap<String, String> otaPackSessions;
101 101 private final ConcurrentHashMap<String, Integer> chunkSizes;
102 102 private final ConcurrentMap<Integer, TransportProtos.ToDeviceRpcRequestMsg> rpcAwaitingAck;
... ... @@ -107,9 +107,8 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
107 107 this.sessionId = UUID.randomUUID();
108 108 this.context = context;
109 109 this.transportService = context.getTransportService();
110   - this.scheduler = context.getScheduler();
111 110 this.sslHandler = sslHandler;
112   - this.deviceSessionCtx = new DeviceSessionCtx(sessionId, context);
  111 + this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context);
113 112 this.otaPackSessions = new ConcurrentHashMap<>();
114 113 this.chunkSizes = new ConcurrentHashMap<>();
115 114 this.rpcAwaitingAck = new ConcurrentHashMap<>();
... ... @@ -136,8 +135,14 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
136 135 try {
137 136 if (msg instanceof ByteBuf) {
138 137 ByteBuf message = (ByteBuf) msg;
  138 + byte[] byteMsg = ByteUtils.buf2Bytes(message);
139 139
140   - processTcpMsg(ctx, ByteUtils.buf2Bytes(message));
  140 + deviceSessionCtx.setChannel(ctx);
  141 + if (deviceSessionCtx.getDeviceInfo() == null || deviceSessionCtx.getDeviceProfile() == null) {
  142 + processConnect(ctx, ByteUtils.getString(byteMsg, ByteUtils.UTF_8));
  143 + } else {
  144 + enqueueRegularSessionMsg(ctx,message);
  145 + }
141 146
142 147 } else {
143 148 log.debug("【{}】 Received non tcp message: 【{}】", sessionId, msg.getClass().getSimpleName());
... ... @@ -161,18 +166,10 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
161 166 return address;
162 167 }
163 168
164   - void processTcpMsg(ChannelHandlerContext ctx, byte[] msg) {
165   - deviceSessionCtx.setChannel(ctx);
166   - if (deviceSessionCtx.getDeviceInfo() == null || deviceSessionCtx.getDeviceProfile() == null) {
167   - processConnect(ctx, ByteUtils.getString(msg, ByteUtils.UTF_8));
168   - } else {
169   - enqueueRegularSessionMsg(ctx,msg);
170   - }
171   - }
172 169
173 170
174 171
175   - void enqueueRegularSessionMsg(ChannelHandlerContext ctx, byte[] msg) {
  172 + void enqueueRegularSessionMsg(ChannelHandlerContext ctx, ByteBuf msg) {
176 173 final int queueSize = deviceSessionCtx.getMsgQueueSize();
177 174 if (queueSize >= context.getMessageQueueSizePerDeviceLimit()) {
178 175 log.info("Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}",
... ... @@ -181,90 +178,79 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
181 178 return;
182 179 }
183 180
184   - deviceSessionCtx.addToQueue(new TCPMessage(MqttMessageType.PUBLISH,msg));
185   - processMsgQueue(ctx); //Under the normal conditions the msg queue will contain 0 messages. Many messages will be processed on device connect event in separate thread pool
  181 + TCPMessage message =deviceSessionCtx.getPayloadAdaptor().createTcpMessage(deviceSessionCtx,msg);
  182 + deviceSessionCtx.addToQueue(message);
  183 + processQueueMessage(ctx); //Under the normal conditions the msg queue will contain 0 messages. Many messages will be processed on device connect event in separate thread pool
  184 +
186 185 }
187 186
188 187
189 188
190   - void processMsgQueue(ChannelHandlerContext ctx) {
  189 + void processQueueMessage(ChannelHandlerContext ctx) {
191 190 if (!deviceSessionCtx.isConnected()) {
192 191 log.trace("[{}][{}] Postpone processing msg due to device is not connected. Msg queue size is {}", sessionId, deviceSessionCtx.getDeviceId(), deviceSessionCtx.getMsgQueueSize());
193 192 return;
194 193 }
195   - deviceSessionCtx.tryProcessQueuedMsgs(msg -> processRegularSessionMsg(ctx, msg));
196   - }
197   -
198   - void processRegularSessionMsg(ChannelHandlerContext ctx, TCPMessage msg) {
199   - switch (msg.getMessageType()) {
200   - case PUBLISH:
201   - processPublish(ctx, msg);
202   - break;
203   - case SUBSCRIBE:
204   -// processSubscribe(ctx, (MqttSubscribeMessage) msg);
205   - break;
206   - case UNSUBSCRIBE:
207   -// processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
208   - break;
209   -
210   - case DISCONNECT:
211   - ctx.close();
212   - break;
213   - case PUBACK:
214   -// int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId();
215   -// TransportProtos.ToDeviceRpcRequestMsg rpcRequest = rpcAwaitingAck.remove(msgId);
216   -// if (rpcRequest != null) {
217   -// transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
218   -// }
219   - break;
220   - default:
221   - break;
222   - }
  194 + deviceSessionCtx.tryProcessQueuedMsgs(msg -> processDeviceSessionMsg(ctx, msg));
223 195 }
224 196
225   - private ByteBuf toDeviceMsg(byte[] msgs){
226   - return Unpooled.copiedBuffer(msgs);
227   - }
228 197
229   - private String textFromMessage(byte[] payload){
230   - if(deviceSessionCtx.getPayloadType().equals(TcpDataTypeEnum.ASCII)){
231   - return ByteUtils.getString(payload,ByteUtils.UTF_8);
232   - }else{
233   - return ByteUtils.bytesToHex(payload);
234   - }
235   - }
236   - private void processPublish(ChannelHandlerContext ctx, TCPMessage mqttMsg) {
237   - if (!checkConnected(ctx, mqttMsg)) {
  198 +
  199 +
  200 +
  201 +
  202 + private void processDeviceSessionMsg(ChannelHandlerContext ctx, TCPMessage tcpMessage) {
  203 + if (!checkConnected(ctx, tcpMessage)) {
238 204 return;
239 205 }
240   - if(deviceSessionCtx.getPingText().equals(mqttMsg.getMessage())){
241   - ctx.channel().writeAndFlush(toDeviceMsg(mqttMsg.getMessage()));
  206 + log.error("【{}】设备【{}】收到数据【{}】", sessionId,deviceSessionCtx.getDeviceId(), tcpMessage.getMessage());
  207 + if (!deviceSessionCtx.getDeviceCode().equals(tcpMessage.getMessage()) && gatewaySessionHandler != null) {
  208 + processGatewayDeviceMsg(ctx, tcpMessage);
242 209 transportService.reportActivity(deviceSessionCtx.getSessionInfo());
243   - return;
  210 + } else {
  211 + processDirectDeviceMsg(ctx,tcpMessage);
244 212 }
245   - String dataStr = textFromMessage(mqttMsg.getMessage());
246   - log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), dataStr);
247   -
248   - processDevicePublish(ctx, dataStr);
249 213 }
250 214
251 215
252   - private void processDevicePublish(ChannelHandlerContext ctx, String mqttMsg) {
  216 + private void processGatewayDeviceMsg(ChannelHandlerContext ctx, TCPMessage tcpMessage) {
  217 + log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage.getMessage());
253 218 try {
254   -// Matcher fwMatcher;
  219 + String topicName = tcpMessage.getTopic();
  220 + if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
  221 + gatewaySessionHandler.onDeviceAttributes(tcpMessage);
  222 + } else if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
  223 + gatewaySessionHandler.onDeviceTelemetry(tcpMessage);
  224 +
  225 + } else if (deviceSessionCtx.isToDeviceRpcResponseTopic(topicName)) {
  226 +
  227 + } else {
  228 + transportService.reportActivity(deviceSessionCtx.getSessionInfo());
  229 + pushDeviceMsg(ctx,tcpMessage);
  230 + }
  231 + } catch (AdaptorException e) {
  232 + log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
  233 + ctx.close();
  234 + }
  235 + }
  236 +
  237 + private void processDirectDeviceMsg(ChannelHandlerContext ctx, TCPMessage tcpMessage) {
  238 + log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage.getMessage());
  239 + try {
  240 + String topicName = tcpMessage.getTopic();
255 241 TcpTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
256   - TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
257   - transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, null);
258   -// if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
259   -// TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
260   -// transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
261   -// } else if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
  242 + if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
  243 +// TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, tcpMessage.getMessage());
  244 +// transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, tcpMessage.getRequestId(), postAttributeMsg));
  245 + } else if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
  246 + TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, tcpMessage.getMessage());
  247 + transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, null);
262 248 // } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
263 249 // TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
264 250 // transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
265 251 // attrReqTopicType = TopicType.V1;
266   -// } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)) {
267   -// TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC);
  252 + } else if (deviceSessionCtx.isToDeviceRpcResponseTopic(topicName)) {
  253 +// TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, tcpMessage.getMessage(), MqttTopics.DEVICE_RPC_RESPONSE_TOPIC);
268 254 // transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
269 255 // } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {
270 256 // TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);
... ... @@ -328,12 +314,12 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
328 314 // TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX);
329 315 // transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
330 316 // attrReqTopicType = TopicType.V2;
331   -// } else {
332   -// transportService.reportActivity(deviceSessionCtx.getSessionInfo());
333   -// ack(ctx, msgId);
334   -// }
  317 + } else {
  318 + transportService.reportActivity(deviceSessionCtx.getSessionInfo());
  319 + pushDeviceMsg(ctx,tcpMessage);
  320 + }
335 321 } catch (AdaptorException e) {
336   - log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, mqttMsg, e);
  322 + log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
337 323 ctx.close();
338 324 }
339 325 }
... ... @@ -351,7 +337,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
351 337 }
352 338
353 339 if (chunkSize > context.getMaxPayloadSize()) {
354   - sendOtaPackageError(ctx, PAYLOAD_TOO_LARGE);
  340 +// sendOtaPackageError(ctx, PAYLOAD_TOO_LARGE);
355 341 return;
356 342 }
357 343
... ... @@ -373,18 +359,16 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
373 359 }
374 360 }
375 361
376   - private void ack(ChannelHandlerContext ctx, int msgId) {
377   - if (msgId > 0) {
378   - ctx.writeAndFlush(createMqttPubAckMsg(msgId));
379   - }
380   - }
381 362
382   - private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final int msgId, final T msg) {
  363 +
  364 + private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final String msgId, final TCPMessage msg) {
383 365 return new TransportServiceCallback<>() {
384 366 @Override
385 367 public void onSuccess(Void dummy) {
386 368 log.trace("[{}] Published msg: {}", sessionId, msg);
387   - ack(ctx, msgId);
  369 + if(StringUtils.isNotEmpty(msgId)){
  370 + pushDeviceMsg(ctx,msg);
  371 + }
388 372 }
389 373
390 374 @Override
... ... @@ -395,39 +379,6 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
395 379 };
396 380 }
397 381
398   - private class DeviceProvisionCallback implements TransportServiceCallback<ProvisionDeviceResponseMsg> {
399   - private final ChannelHandlerContext ctx;
400   - private final int msgId;
401   - private final TransportProtos.ProvisionDeviceRequestMsg msg;
402   -
403   - DeviceProvisionCallback(ChannelHandlerContext ctx, int msgId, TransportProtos.ProvisionDeviceRequestMsg msg) {
404   - this.ctx = ctx;
405   - this.msgId = msgId;
406   - this.msg = msg;
407   - }
408   -
409   - @Override
410   - public void onSuccess(TransportProtos.ProvisionDeviceResponseMsg provisionResponseMsg) {
411   -// log.trace("[{}] Published msg: {}", sessionId, msg);
412   -// ack(ctx, msgId);
413   -// try {
414   -// if (deviceSessionCtx.getProvisionPayloadType().equals(TransportPayloadType.JSON)) {
415   -// deviceSessionCtx.getContext().getJsonMqttAdaptor().convertToPublish(deviceSessionCtx, provisionResponseMsg).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
416   -// } else {
417   -// deviceSessionCtx.getContext().getAscallAdaptor().convertToPublish(deviceSessionCtx, provisionResponseMsg).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
418   -// }
419   -// scheduler.schedule((Callable<ChannelFuture>) ctx::close, 60, TimeUnit.SECONDS);
420   -// } catch (Exception e) {
421   -// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
422   -// }
423   - }
424   -
425   - @Override
426   - public void onError(Throwable e) {
427   - log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
428   - ctx.close();
429   - }
430   - }
431 382
432 383 private class OtaPackageCallback implements TransportServiceCallback<TransportProtos.GetOtaPackageResponseMsg> {
433 384 private final ChannelHandlerContext ctx;
... ... @@ -453,7 +404,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
453 404 otaPackSessions.put(requestId, firmwareId.toString());
454 405 sendOtaPackage(ctx, msgId, firmwareId.toString(), requestId, chunkSize, chunk, OtaPackageType.valueOf(response.getType()));
455 406 } else {
456   - sendOtaPackageError(ctx, response.getResponseStatus().toString());
  407 +// sendOtaPackageError(ctx, response.getResponseStatus().toString());
457 408 }
458 409 }
459 410
... ... @@ -466,7 +417,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
466 417
467 418 private void sendOtaPackage(ChannelHandlerContext ctx, int msgId, String firmwareId, String requestId, int chunkSize, int chunk, OtaPackageType type) {
468 419 log.trace("[{}] Send firmware [{}] to device!", sessionId, firmwareId);
469   - ack(ctx, msgId);
  420 + pushDeviceMsg(ctx,new TCPMessage(requestId));
470 421 try {
471 422 byte[] firmwareChunk = context.getOtaPackageDataCache().get(firmwareId, chunkSize, chunk);
472 423 deviceSessionCtx.getPayloadAdaptor()
... ... @@ -477,174 +428,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
477 428 }
478 429 }
479 430
480   - private void sendOtaPackageError(ChannelHandlerContext ctx, String error) {
481   - log.warn("[{}] {}", sessionId, error);
482   - deviceSessionCtx.getChannel().writeAndFlush(deviceSessionCtx
483   - .getPayloadAdaptor()
484   - .createMqttPublishMsg(deviceSessionCtx, MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC, error.getBytes()));
485   - ctx.close();
486   - }
487   -
488   - private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
489   -// if (!checkConnected(ctx, mqttMsg)) {
490   -// return;
491   -// }
492   -// log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
493   -// List<Integer> grantedQoSList = new ArrayList<>();
494   -// boolean activityReported = false;
495   -// for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
496   -// String topic = subscription.topicName();
497   -// MqttQoS reqQoS = subscription.qualityOfService();
498   -// try {
499   -// switch (topic) {
500   -// case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
501   -// processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
502   -// activityReported = true;
503   -// break;
504   -// }
505   -// case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: {
506   -// processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2);
507   -// activityReported = true;
508   -// break;
509   -// }
510   -// case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: {
511   -// processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON);
512   -// activityReported = true;
513   -// break;
514   -// }
515   -// case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC: {
516   -// processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO);
517   -// activityReported = true;
518   -// break;
519   -// }
520   -// case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
521   -// processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
522   -// activityReported = true;
523   -// break;
524   -// }
525   -// case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC: {
526   -// processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2);
527   -// activityReported = true;
528   -// break;
529   -// }
530   -// case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC: {
531   -// processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON);
532   -// activityReported = true;
533   -// break;
534   -// }
535   -// case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: {
536   -// processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO);
537   -// activityReported = true;
538   -// break;
539   -// }
540   -// case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
541   -// case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC:
542   -// case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC:
543   -// case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC:
544   -// case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
545   -// case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC:
546   -// case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC:
547   -// case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC:
548   -// case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
549   -// case MqttTopics.GATEWAY_RPC_TOPIC:
550   -// case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
551   -// case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
552   -// case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC:
553   -// case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC:
554   -// case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC:
555   -//
556   -// break;
557   -// default:
558   -// log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
559   -// grantedQoSList.add(FAILURE.value());
560   -// break;
561   -// }
562   -// } catch (Exception e) {
563   -// log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e);
564   -// grantedQoSList.add(FAILURE.value());
565   -// }
566   -// }
567   -// if (!activityReported) {
568   -// transportService.reportActivity(deviceSessionCtx.getSessionInfo());
569   -// }
570   -// ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
571   - }
572 431
573   -// private void processRpcSubscribe(List<Integer> grantedQoSList, String topic, MqttQoS reqQoS, TopicType topicType) {
574   -// transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
575   -// rpcSubTopicType = topicType;
576   -// }
577   -//
578   -// private void processAttributesSubscribe(List<Integer> grantedQoSList, String topic, MqttQoS reqQoS, TopicType topicType) {
579   -// transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
580   -// attrSubTopicType = topicType;
581   -// }
582   -
583   -
584   -
585   - private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
586   -// if (!checkConnected(ctx, mqttMsg)) {
587   -// return;
588   -// }
589   -// boolean activityReported = false;
590   -// log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
591   -// for (String topicName : mqttMsg.payload().topics()) {
592   -// try {
593   -// switch (topicName) {
594   -// case MqttTopics.DEVICE_ATTRIBUTES_TOPIC:
595   -// case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC:
596   -// case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC:
597   -// case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: {
598   -// transportService.process(deviceSessionCtx.getSessionInfo(),
599   -// TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), null);
600   -// activityReported = true;
601   -// break;
602   -// }
603   -// case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC:
604   -// case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC:
605   -// case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC:
606   -// case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: {
607   -// transportService.process(deviceSessionCtx.getSessionInfo(),
608   -// TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), null);
609   -// activityReported = true;
610   -// break;
611   -// }
612   -// case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
613   -// case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC:
614   -// case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC:
615   -// case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC:
616   -// case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
617   -// case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC:
618   -// case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC:
619   -// case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC:
620   -// case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
621   -// case MqttTopics.GATEWAY_RPC_TOPIC:
622   -// case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
623   -// case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
624   -// case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC:
625   -// case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC:
626   -// case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC:
627   -// case MqttTopics.DEVICE_SOFTWARE_ERROR_TOPIC: {
628   -// activityReported = true;
629   -// break;
630   -// }
631   -// }
632   -// } catch (Exception e) {
633   -// log.debug("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
634   -// }
635   -// }
636   -// if (!activityReported) {
637   -// transportService.reportActivity(deviceSessionCtx.getSessionInfo());
638   -// }
639   -// ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
640   - }
641   -
642   - private MqttMessage createUnSubAckMessage(int msgId) {
643   - MqttFixedHeader mqttFixedHeader =
644   - new MqttFixedHeader(UNSUBACK, false, AT_MOST_ONCE, false, 0);
645   - MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
646   - return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
647   - }
648 432
649 433 void processConnect(ChannelHandlerContext ctx, String accessToken) {
650 434 log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, accessToken);
... ... @@ -772,7 +556,23 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
772 556 }
773 557 }
774 558
775   -
  559 + private void checkGatewaySession(SessionMetaData sessionMetaData) {
  560 + TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo();
  561 + try {
  562 + JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
  563 + if (infoNode != null) {
  564 + JsonNode gatewayNode = infoNode.get("gateway");
  565 + if (gatewayNode != null && gatewayNode.asBoolean()) {
  566 + gatewaySessionHandler = new TcpGatewaySessionHandler(deviceSessionCtx, sessionId);
  567 + if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {
  568 + sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());
  569 + }
  570 + }
  571 + }
  572 + } catch (IOException e) {
  573 + log.trace("[{}][{}] Failed to fetch device additional info", sessionId, device.getDeviceName(), e);
  574 + }
  575 + }
776 576
777 577 @Override
778 578 public void operationComplete(Future<? super Void> future) throws Exception {
... ... @@ -785,6 +585,9 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
785 585 log.debug("[{}] Client disconnected!", sessionId);
786 586 transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
787 587 transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
  588 + if (gatewaySessionHandler != null) {
  589 + gatewaySessionHandler.onGatewayDisconnect();
  590 + }
788 591 deviceSessionCtx.setDisconnected();
789 592 }
790 593 deviceSessionCtx.release();
... ... @@ -804,11 +607,14 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
804 607 transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_OPEN, new TransportServiceCallback<Void>() {
805 608 @Override
806 609 public void onSuccess(Void msg) {
807   - transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), TcpTransportHandler.this);
  610 + SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), TcpTransportHandler.this);
  611 + checkGatewaySession(sessionMetaData);
808 612 ctx.writeAndFlush(createTcpConnAckMsg(CONNECTION_ACCEPTED));
809 613 deviceSessionCtx.setConnected(true);
810 614 log.debug("[{}] Client connected!", sessionId);
811   - transportService.getCallbackExecutor().execute(() -> processMsgQueue(ctx)); //this callback will execute in Producer worker thread and hard or blocking work have to be submitted to the separate thread.
  615 +
  616 + transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
  617 + transportService.getCallbackExecutor().execute(() -> processQueueMessage(ctx)); //this callback will execute in Producer worker thread and hard or blocking work have to be submitted to the separate thread.
812 618 }
813 619
814 620 @Override
... ... @@ -857,11 +663,10 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
857 663
858 664 @Override
859 665 public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
860   -// log.trace("[{}] Received RPC command to device", sessionId);
861   -// String baseTopic = rpcSubTopicType.getRpcRequestTopicBase();
862   -// TcpTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(rpcSubTopicType);
863   -// try {
864   -// adaptor.convertToPublish(deviceSessionCtx, rpcRequest, baseTopic).ifPresent(payload -> {
  666 + log.error("【{}】下发RPC命令【{}】给设备【{}】", sessionId,rpcRequest.getParams(),deviceSessionCtx.getDeviceInfo().getDeviceName());
  667 + TcpTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();
  668 + try {
  669 + adaptor.convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> {
865 670 // int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
866 671 // if (isAckExpected(payload)) {
867 672 // rpcAwaitingAck.put(msgId, rpcRequest);
... ... @@ -872,30 +677,31 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
872 677 // }
873 678 // }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
874 679 // }
875   -// var cf = publish(payload, deviceSessionCtx);
876   -// cf.addListener(result -> {
877   -// if (result.cause() == null) {
  680 + var cf = pushDeviceMsg(deviceSessionCtx.getChannel(), payload);
  681 + cf.addListener(result -> {
  682 + if (result.cause() == null) {
878 683 // if (!isAckExpected(payload)) {
879 684 // transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
880   -// } else if (rpcRequest.getPersisted()) {
881   -// transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
882   -// }
883   -// } else {
884   -// // TODO: send error
885   -// }
886   -// });
887   -// });
888   -// } catch (Exception e) {
889   -// transportService.process(deviceSessionCtx.getSessionInfo(),
890   -// TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
891   -// .setRequestId(rpcRequest.getRequestId()).setError("Failed to convert device RPC command to MQTT msg").build(), TransportServiceCallback.EMPTY);
892   -// log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e);
893   -// }
  685 +// } else
  686 + if (rpcRequest.getPersisted()) {
  687 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
  688 + }
  689 + } else {
  690 + // TODO: send error
  691 + }
  692 + });
  693 + });
  694 + } catch (Exception e) {
  695 + transportService.process(deviceSessionCtx.getSessionInfo(),
  696 + TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
  697 + .setRequestId(rpcRequest.getRequestId()).setError("Failed to convert device RPC command to TCP msg").build(), TransportServiceCallback.EMPTY);
  698 + log.error("[{}] Failed to convert device RPC command to TCP msg", sessionId, e);
  699 + }
894 700 }
895 701
896 702 @Override
897 703 public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {
898   -// log.trace("[{}] Received RPC response from server", sessionId);
  704 + log.error("[{}] 服务端响应设备的RPC请求", sessionId);
899 705 // String baseTopic = toServerRpcSubTopicType.getRpcResponseTopicBase();
900 706 // TcpTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(toServerRpcSubTopicType);
901 707 // try {
... ... @@ -905,8 +711,30 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
905 711 // }
906 712 }
907 713
908   - private ChannelFuture publish(MqttMessage message, DeviceSessionCtx deviceSessionCtx) {
909   - return deviceSessionCtx.getChannel().writeAndFlush(message);
  714 + /**
  715 + * 往设备推送消息
  716 + * @param tcp
  717 + * @return
  718 + */
  719 + private ChannelFuture pushDeviceMsg(ChannelHandlerContext ctx,TCPMessage tcp) {
  720 + try {
  721 + String message = tcp.getMessage();
  722 + byte[] payloadInBytes ;
  723 + if(deviceSessionCtx.getPayloadType().equals(TcpDataTypeEnum.HEX)){
  724 + payloadInBytes = ByteUtils.hexStr2Bytes(message);
  725 + }else{
  726 + payloadInBytes = message.getBytes(ByteUtils.UTF_8);
  727 + }
  728 +// ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
  729 +// ByteBuf payload = ALLOCATOR.buffer();
  730 +// payload.writeBytes(payloadInBytes);
  731 + ByteBuf payload = Unpooled.copiedBuffer(payloadInBytes);
  732 +
  733 + return ctx.writeAndFlush(payload);
  734 + } catch (UnsupportedEncodingException e) {
  735 + log.error(e.getMessage(),e);
  736 + throw new RuntimeException(e);
  737 + }
910 738 }
911 739
912 740 private boolean isAckExpected(MqttMessage message) {
... ...
1 1 /**
2 2 * Copyright © 2016-2022 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -23,30 +23,26 @@ import com.google.gson.JsonObject;
23 23 import com.google.gson.JsonParser;
24 24 import com.google.gson.JsonSyntaxException;
25 25 import io.netty.buffer.ByteBuf;
26   -import io.netty.handler.codec.mqtt.MqttFixedHeader;
27   -import io.netty.handler.codec.mqtt.MqttMessage;
28 26 import io.netty.handler.codec.mqtt.MqttPublishMessage;
29   -import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
30 27 import lombok.extern.slf4j.Slf4j;
31 28 import org.springframework.beans.factory.annotation.Autowired;
32 29 import org.springframework.stereotype.Component;
33 30 import org.springframework.util.StringUtils;
34   -import org.thingsboard.server.common.data.device.profile.MqttTopics;
35 31 import org.thingsboard.server.common.data.ota.OtaPackageType;
36 32 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
37 33 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
38   -import org.thingsboard.server.gen.transport.TransportProtos;
39 34 import org.thingsboard.server.common.yunteng.script.YtScriptInvokeService;
40 35 import org.thingsboard.server.common.yunteng.script.YtScriptType;
41   -import org.thingsboard.server.transport.tcp.session.DeviceSessionCtx;
  36 +import org.thingsboard.server.gen.transport.TransportProtos;
  37 +import org.thingsboard.server.transport.tcp.session.TCPMessage;
  38 +import org.thingsboard.server.transport.tcp.session.TcpDeviceWareSessionContext;
42 39
  40 +import java.io.UnsupportedEncodingException;
43 41 import java.nio.charset.Charset;
44 42 import java.nio.charset.StandardCharsets;
45 43 import java.util.*;
46 44 import java.util.concurrent.ExecutionException;
47 45
48   -import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_SOFTWARE_FIRMWARE_RESPONSES_TOPIC_FORMAT;
49   -
50 46
51 47 /**
52 48 * @author Andrew Shvayka
... ... @@ -59,8 +55,9 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
59 55 @Autowired
60 56 private YtScriptInvokeService jsEngine;
61 57 private static final JsonParser parser = new JsonParser();
  58 +
62 59 @Override
63   - public TransportProtos.PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, String inbound) throws AdaptorException {
  60 + public TransportProtos.PostTelemetryMsg convertToPostTelemetry(TcpDeviceWareSessionContext ctx, String inbound) throws AdaptorException {
64 61 try {
65 62 JsonElement payload = validatePayload(ctx, inbound, false);
66 63 return JsonConverter.convertToTelemetryProto(payload);
... ... @@ -80,7 +77,7 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
80 77 }
81 78
82 79 @Override
83   - public TransportProtos.PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
  80 + public TransportProtos.PostAttributeMsg convertToPostAttributes(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
84 81 String payload = validatePayload(ctx.getSessionId(), inbound.payload(), false);
85 82 try {
86 83 return JsonConverter.convertToAttributesProto(new JsonParser().parse(payload));
... ... @@ -91,7 +88,7 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
91 88 }
92 89
93 90 @Override
94   - public TransportProtos.ClaimDeviceMsg convertToClaimDevice(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
  91 + public TransportProtos.ClaimDeviceMsg convertToClaimDevice(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
95 92 String payload = validatePayload(ctx.getSessionId(), inbound.payload(), true);
96 93 try {
97 94 return JsonConverter.convertToClaimDeviceProto(ctx.getDeviceId(), payload);
... ... @@ -102,7 +99,7 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
102 99 }
103 100
104 101 @Override
105   - public TransportProtos.ProvisionDeviceRequestMsg convertToProvisionRequestMsg(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
  102 + public TransportProtos.ProvisionDeviceRequestMsg convertToProvisionRequestMsg(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
106 103 String payload = validatePayload(ctx.getSessionId(), inbound.payload(), false);
107 104 try {
108 105 return JsonConverter.convertToProvisionRequestMsg(payload);
... ... @@ -112,64 +109,73 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
112 109 }
113 110
114 111 @Override
115   - public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
  112 + public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
116 113 return processGetAttributeRequestMsg(inbound, topicBase);
117 114 }
118 115
119 116 @Override
120   - public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(DeviceSessionCtx ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
  117 + public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(TcpDeviceWareSessionContext ctx, String inbound, String topicBase) throws AdaptorException {
121 118 return processToDeviceRpcResponseMsg(inbound, topicBase);
122 119 }
123 120
124 121 @Override
125   - public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
  122 + public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
126 123 return processToServerRpcRequestMsg(ctx, inbound, topicBase);
127 124 }
128 125
129 126 @Override
130   - public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException {
  127 + public Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException {
131 128 return processConvertFromAttributeResponseMsg(ctx, responseMsg, topicBase);
132 129 }
133 130
134 131 @Override
135   - public Optional<MqttMessage> convertToGatewayPublish(DeviceSessionCtx ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
  132 + public Optional<TCPMessage> convertToGatewayPublish(TcpDeviceWareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
136 133 return processConvertFromGatewayAttributeResponseMsg(ctx, deviceName, responseMsg);
137 134 }
138 135
139 136 @Override
140   - public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg, String topic) {
141   - return Optional.of(createMqttPublishMsg(ctx, topic, JsonConverter.toJson(notificationMsg)));
  137 + public Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg, String topic) {
  138 + return Optional.of(createTcpMessage(ctx, JsonConverter.toJson(notificationMsg)));
142 139 }
143 140
144 141 @Override
145   - public Optional<MqttMessage> convertToGatewayPublish(DeviceSessionCtx ctx, String deviceName, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) {
  142 + public Optional<TCPMessage> convertToGatewayPublish(TcpDeviceWareSessionContext ctx, String deviceName, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) {
146 143 JsonObject result = JsonConverter.getJsonObjectForGateway(deviceName, notificationMsg);
147   - return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, result));
  144 + return Optional.of(createTcpMessage(ctx, result));
148 145 }
149 146
150 147 @Override
151   - public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, String topicBase) {
152   - return Optional.of(createMqttPublishMsg(ctx, topicBase + rpcRequest.getRequestId(), JsonConverter.toJson(rpcRequest, false)));
  148 + public Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws UnsupportedEncodingException {
  149 + byte[] result = null;
  150 + String payload = rpcRequest.getParams();//methodThingskit
  151 +// if(ctx.getPayloadType().equals(TcpDataTypeEnum.ASCII)){
  152 +// }else{
  153 +// result= ByteUtils.hexToBytes(payload);
  154 +// }
  155 + if (!payload.startsWith("{") && !payload.endsWith("}")) {
  156 + payload = payload.replace("\"","");;
  157 + }
  158 + return Optional.of(createTcpMessage(ctx, payload));
153 159 }
154 160
155 161 @Override
156   - public Optional<MqttMessage> convertToGatewayPublish(DeviceSessionCtx ctx, String deviceName, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
157   - return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_RPC_TOPIC, JsonConverter.toGatewayJson(deviceName, rpcRequest)));
  162 + public Optional<TCPMessage> convertToGatewayPublish(TcpDeviceWareSessionContext ctx, String deviceName, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
  163 + return Optional.of(createTcpMessage(ctx, JsonConverter.toGatewayJson(deviceName, rpcRequest)));
158 164 }
159 165
160 166 @Override
161   - public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse, String topicBase) {
162   - return Optional.of(createMqttPublishMsg(ctx, topicBase + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse)));
  167 + public Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse, String topicBase) {
  168 + return Optional.of(createTcpMessage(ctx, JsonConverter.toJson(rpcResponse)));
163 169 }
164 170
165 171 @Override
166   - public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.ProvisionDeviceResponseMsg provisionResponse) {
167   - return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC, JsonConverter.toJson(provisionResponse)));
  172 + public Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, TransportProtos.ProvisionDeviceResponseMsg provisionResponse) {
  173 + return Optional.of(createTcpMessage(ctx, JsonConverter.toJson(provisionResponse)));
168 174 }
169 175
170 176 @Override
171   - public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) {
172   - return Optional.of(createMqttPublishMsg(ctx, String.format(DEVICE_SOFTWARE_FIRMWARE_RESPONSES_TOPIC_FORMAT, firmwareType.getKeyPrefix(), requestId, chunk), firmwareChunk));
  177 + public Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) {
  178 + return Optional.of(null);
173 179 }
174 180
175 181 public static JsonElement validateJsonPayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException {
... ... @@ -204,19 +210,20 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
204 210 }
205 211 }
206 212
207   - private TransportProtos.ToDeviceRpcResponseMsg processToDeviceRpcResponseMsg(MqttPublishMessage inbound, String topicBase) throws AdaptorException {
208   - String topicName = inbound.variableHeader().topicName();
209   - try {
210   - int requestId = getRequestId(topicName, topicBase);
211   - String payload = inbound.payload().toString(UTF8);
212   - return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(payload).build();
213   - } catch (RuntimeException e) {
214   - log.debug("Failed to decode rpc response", e);
215   - throw new AdaptorException(e);
216   - }
  213 + private TransportProtos.ToDeviceRpcResponseMsg processToDeviceRpcResponseMsg(String inbound, String topicBase) throws AdaptorException {
  214 +// String topicName = inbound.variableHeader().topicName();
  215 +// try {
  216 +// int requestId = getRequestId(topicName, topicBase);
  217 +// String payload = inbound.payload().toString(UTF8);
  218 +// return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(payload).build();
  219 +// } catch (RuntimeException e) {
  220 +// log.debug("Failed to decode rpc response", e);
  221 +// throw new AdaptorException(e);
  222 +// }
  223 + return null;
217 224 }
218 225
219   - private TransportProtos.ToServerRpcRequestMsg processToServerRpcRequestMsg(DeviceSessionCtx ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
  226 + private TransportProtos.ToServerRpcRequestMsg processToServerRpcRequestMsg(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
220 227 String topicName = inbound.variableHeader().topicName();
221 228 String payload = validatePayload(ctx.getSessionId(), inbound.payload(), false);
222 229 try {
... ... @@ -228,37 +235,28 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
228 235 }
229 236 }
230 237
231   - private Optional<MqttMessage> processConvertFromAttributeResponseMsg(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException {
  238 + private Optional<TCPMessage> processConvertFromAttributeResponseMsg(TcpDeviceWareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException {
232 239 if (!StringUtils.isEmpty(responseMsg.getError())) {
233 240 throw new AdaptorException(responseMsg.getError());
234 241 } else {
235 242 int requestId = responseMsg.getRequestId();
236 243 if (requestId >= 0) {
237   - return Optional.of(createMqttPublishMsg(ctx,
238   - topicBase + requestId,
  244 + return Optional.of(createTcpMessage(ctx,
239 245 JsonConverter.toJson(responseMsg)));
240 246 }
241 247 return Optional.empty();
242 248 }
243 249 }
244 250
245   - private Optional<MqttMessage> processConvertFromGatewayAttributeResponseMsg(DeviceSessionCtx ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
  251 + private Optional<TCPMessage> processConvertFromGatewayAttributeResponseMsg(TcpDeviceWareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
246 252 if (!StringUtils.isEmpty(responseMsg.getError())) {
247 253 throw new AdaptorException(responseMsg.getError());
248 254 } else {
249 255 JsonObject result = JsonConverter.getJsonObjectForGateway(deviceName, responseMsg);
250   - return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, result));
  256 + return Optional.of(createTcpMessage(ctx, result));
251 257 }
252 258 }
253 259
254   - protected MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, JsonElement json) {
255   - MqttFixedHeader mqttFixedHeader = null;
256   -// new MqttFixedHeader(MqttMessageType.PUBLISH, false, ctx.getQoSForTopic(topic), false, 0);
257   - MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, ctx.nextMsgId());
258   - ByteBuf payload = ALLOCATOR.buffer();
259   - payload.writeBytes(json.toString().getBytes(UTF8));
260   - return new MqttPublishMessage(mqttFixedHeader, header, payload);
261   - }
262 260
263 261 private Set<String> toStringSet(JsonElement requestBody, String name) {
264 262 JsonElement element = requestBody.getAsJsonObject().get(name);
... ... @@ -280,7 +278,7 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
280 278 return payload;
281 279 }
282 280
283   - private JsonElement validatePayload(DeviceSessionCtx session, String payload, boolean isEmptyPayloadAllowed) throws AdaptorException, ExecutionException, InterruptedException {
  281 + private JsonElement validatePayload(TcpDeviceWareSessionContext session, String payload, boolean isEmptyPayloadAllowed) throws AdaptorException, ExecutionException, InterruptedException {
284 282 if (payload == null) {
285 283 log.debug("[{}] Payload is empty!", session.getSessionId());
286 284 if (!isEmptyPayloadAllowed) {
... ... @@ -306,4 +304,23 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
306 304 return Integer.parseInt(topicName.substring(topic.length()));
307 305 }
308 306
  307 +
  308 + protected TCPMessage createTcpMessage(TcpDeviceWareSessionContext ctx, JsonElement json) {
  309 +// TCPMessage msg = new TCPMessage(MqttMessageType.PUBLISH,);
  310 +//// new MqttFixedHeader(MqttMessageType.PUBLISH, false, ctx.getQoSForTopic(topic), false, 0);
  311 +// MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, ctx.nextMsgId());
  312 +// ByteBuf payload = ALLOCATOR.buffer();
  313 +// payload.writeBytes(json.toString().getBytes(UTF8));
  314 +// return new MqttPublishMessage(mqttFixedHeader, header, payload);
  315 + return null;
  316 + }
  317 +
  318 + protected TCPMessage createTcpMessage(TcpDeviceWareSessionContext ctx, String payload) {
  319 + TCPMessage message = new TCPMessage(payload);
  320 + message.setRequestId(payload.substring(0, 4));
  321 + message.setTopic(payload.substring(2, 4));
  322 + message.setDeviceCode(payload.substring(0, 2));
  323 + return message;
  324 + }
  325 +
309 326 }
... ...
... ... @@ -16,79 +16,90 @@
16 16 package org.thingsboard.server.transport.tcp.adaptors;
17 17
18 18 import io.netty.buffer.ByteBuf;
19   -import io.netty.buffer.ByteBufAllocator;
20   -import io.netty.buffer.UnpooledByteBufAllocator;
21   -import io.netty.handler.codec.mqtt.MqttFixedHeader;
22   -import io.netty.handler.codec.mqtt.MqttMessage;
23   -import io.netty.handler.codec.mqtt.MqttMessageType;
24 19 import io.netty.handler.codec.mqtt.MqttPublishMessage;
25   -import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
26 20 import org.thingsboard.server.common.data.ota.OtaPackageType;
  21 +import org.thingsboard.server.common.data.yunteng.enums.TcpDataTypeEnum;
27 22 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
28   -import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
29   -import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
30   -import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
31   -import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
32   -import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
33   -import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
34   -import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg;
35   -import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg;
36   -import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
37   -import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
38   -import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
39   -import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
40   -import org.thingsboard.server.transport.tcp.session.DeviceSessionCtx;
  23 +import org.thingsboard.server.gen.transport.TransportProtos.*;
  24 +import org.thingsboard.server.transport.tcp.session.TcpDeviceWareSessionContext;
  25 +import org.thingsboard.server.transport.tcp.session.TCPMessage;
41 26
  27 +import java.io.UnsupportedEncodingException;
  28 +import java.nio.charset.Charset;
  29 +import java.nio.charset.StandardCharsets;
42 30 import java.util.Optional;
43 31 import java.util.UUID;
44 32 import java.util.concurrent.ExecutionException;
45 33
46 34 /**
47   - * @author Andrew Shvayka
  35 + * 将收到的数据流转换为接口需要的数据格式
  36 + * 1、基于解析脚本将ByteBuf转JSON对象。
  37 + * 2、将JSON对象转PROTOBUF对象。
48 38 */
49 39 public interface TcpTransportAdaptor {
50   -
51   - ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
52   -
53   - PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, String inbound) throws AdaptorException;
  40 + static char[] HEX_VOCABLE = {'0', '1', '2', '3', '4', '5', '6', '7',
  41 + '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
  42 + static final Charset UTF8 = StandardCharsets.UTF_8;
  43 + PostTelemetryMsg convertToPostTelemetry(TcpDeviceWareSessionContext ctx, String inbound) throws AdaptorException;
54 44
55 45 UUID getJsScriptEngineFunctionId(String scriptBody, String... argNames) throws ExecutionException, InterruptedException;
56   - PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
57   -
58   - GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException;
  46 + PostAttributeMsg convertToPostAttributes(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
59 47
60   - ToDeviceRpcResponseMsg convertToDeviceRpcResponse(DeviceSessionCtx ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException;
  48 + GetAttributeRequestMsg convertToGetAttributes(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException;
61 49
62   - ToServerRpcRequestMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException;
  50 + ToDeviceRpcResponseMsg convertToDeviceRpcResponse(TcpDeviceWareSessionContext ctx, String mqttMsg, String topicBase) throws AdaptorException;
63 51
64   - ClaimDeviceMsg convertToClaimDevice(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
  52 + ToServerRpcRequestMsg convertToServerRpcRequest(TcpDeviceWareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException;
65 53
66   - Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException;
  54 + ClaimDeviceMsg convertToClaimDevice(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
67 55
68   - Optional<MqttMessage> convertToGatewayPublish(DeviceSessionCtx ctx, String deviceName, GetAttributeResponseMsg responseMsg) throws AdaptorException;
  56 + Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException;
69 57
70   - Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, AttributeUpdateNotificationMsg notificationMsg, String topic) throws AdaptorException;
  58 + Optional<TCPMessage> convertToGatewayPublish(TcpDeviceWareSessionContext ctx, String deviceName, GetAttributeResponseMsg responseMsg) throws AdaptorException;
71 59
72   - Optional<MqttMessage> convertToGatewayPublish(DeviceSessionCtx ctx, String deviceName, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
  60 + Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, AttributeUpdateNotificationMsg notificationMsg, String topic) throws AdaptorException;
73 61
74   - Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, ToDeviceRpcRequestMsg rpcRequest, String topicBase) throws AdaptorException;
  62 + Optional<TCPMessage> convertToGatewayPublish(TcpDeviceWareSessionContext ctx, String deviceName, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
75 63
76   - Optional<MqttMessage> convertToGatewayPublish(DeviceSessionCtx ctx, String deviceName, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
  64 + Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException, UnsupportedEncodingException;
77 65
78   - Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, ToServerRpcResponseMsg rpcResponse, String topicBase) throws AdaptorException;
  66 + Optional<TCPMessage> convertToGatewayPublish(TcpDeviceWareSessionContext ctx, String deviceName, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
79 67
80   - ProvisionDeviceRequestMsg convertToProvisionRequestMsg(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
  68 + Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, ToServerRpcResponseMsg rpcResponse, String topicBase) throws AdaptorException;
81 69
82   - Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, ProvisionDeviceResponseMsg provisionResponse) throws AdaptorException;
  70 + ProvisionDeviceRequestMsg convertToProvisionRequestMsg(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
83 71
84   - Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) throws AdaptorException;
  72 + Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, ProvisionDeviceResponseMsg provisionResponse) throws AdaptorException;
85 73
86   - default MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, byte[] payloadInBytes) {
87   - MqttFixedHeader mqttFixedHeader =null;
88   -// new MqttFixedHeader(MqttMessageType.PUBLISH, false, ctx.getQoSForTopic(topic), false, 0);
89   - MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, ctx.nextMsgId());
90   - ByteBuf payload = ALLOCATOR.buffer();
91   - payload.writeBytes(payloadInBytes);
92   - return new MqttPublishMessage(mqttFixedHeader, header, payload);
  74 + Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) throws AdaptorException;
  75 + public static byte[] toBytes(ByteBuf inbound) {
  76 + byte[] bytes = new byte[inbound.readableBytes()];
  77 + int readerIndex = inbound.readerIndex();
  78 + inbound.getBytes(readerIndex, bytes);
  79 + return bytes;
  80 + }
  81 + public static String bytesToHex(byte[] bs) {
  82 + StringBuilder sb = new StringBuilder();
  83 + for (byte b : bs) {
  84 + int high = (b >> 4) & 0x0f;
  85 + int low = b & 0x0f;
  86 + sb.append(HEX_VOCABLE[high]);
  87 + sb.append(HEX_VOCABLE[low]);
  88 + }
  89 + return sb.toString();
  90 + }
  91 + default TCPMessage createTcpMessage(TcpDeviceWareSessionContext ctx, ByteBuf payload) {
  92 + String payloadStr;
  93 + if(ctx.getPayloadType().equals(TcpDataTypeEnum.HEX)){
  94 + byte[] payloadBytes = toBytes(payload);
  95 + payloadStr = bytesToHex(payloadBytes);
  96 + }else{
  97 + payloadStr = payload.toString(UTF8);
  98 + }
  99 + TCPMessage message = new TCPMessage(payloadStr);
  100 + message.setRequestId(payloadStr.substring(0,4));
  101 + message.setTopic(payloadStr.substring(2,4));
  102 + message.setDeviceCode(payloadStr.substring(0,2));
  103 + return message;
93 104 }
94 105 }
... ...
... ... @@ -2,36 +2,28 @@ package org.thingsboard.server.transport.tcp.session;
2 2
3 3 import io.netty.handler.codec.mqtt.MqttMessageType;
4 4 import lombok.Data;
5   -import lombok.NoArgsConstructor;
6   -import lombok.ToString;
7 5
8 6 import java.io.Serializable;
9 7
10 8 @Data
11 9 public class TCPMessage implements Serializable {
  10 +
  11 + /**消息ID,用于请求与响应的匹配,例如:modbus由地址码和功能码组成。*/
12 12 private String requestId;
13   - private MqttMessageType messageType;
14   -
15   - private byte[] message;
16   - private boolean hex;
17   -
18   - private String deviceId;
19   -
20   - /**
21   - * TCP 消息
22   - * @param sn 设备sn
23   - * @param message 消息体
24   - * @param hex 是否是16进制
25   - */
26   - public TCPMessage(MqttMessageType messageType, String deviceId, byte[] message, boolean hex){
27   - this.messageType = messageType;
28   - this.deviceId = deviceId;
29   - this.message = message;
30   - this.hex = hex;
31   - }
32 13
33   - public TCPMessage(MqttMessageType messageType, byte[] message){
34   - this.messageType = messageType;
  14 +
  15 + private String message;
  16 +
  17 + /**数据主题,例如:modbus的功能码等。*/
  18 + private String topic;
  19 +
  20 + /**设备地址码,例如:modbus的地址吗*/
  21 + private String deviceCode;
  22 +
  23 +
  24 +
  25 + public TCPMessage(String message){
35 26 this.message = message;
36 27 }
  28 +
37 29 }
... ...
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + * <p>
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.tcp.session;
  17 +
  18 +import io.netty.channel.ChannelHandlerContext;
  19 +import io.netty.util.ReferenceCountUtil;
  20 +import lombok.Getter;
  21 +import lombok.Setter;
  22 +import lombok.extern.slf4j.Slf4j;
  23 +import org.thingsboard.server.transport.tcp.TcpTransportContext;
  24 +
  25 +import java.util.UUID;
  26 +import java.util.concurrent.ConcurrentLinkedQueue;
  27 +import java.util.concurrent.atomic.AtomicInteger;
  28 +import java.util.concurrent.locks.Lock;
  29 +import java.util.concurrent.locks.ReentrantLock;
  30 +import java.util.function.Consumer;
  31 +
  32 +/**
  33 + * @author Andrew Shvayka
  34 + */
  35 +@Slf4j
  36 +public class TcpDeviceSessionCtx extends TcpDeviceWareSessionContext {
  37 +
  38 + @Getter
  39 + @Setter
  40 + private ChannelHandlerContext channel;
  41 +
  42 +
  43 + private final AtomicInteger msgIdSeq = new AtomicInteger(0);
  44 +
  45 + private final ConcurrentLinkedQueue<TCPMessage> msgQueue = new ConcurrentLinkedQueue<>();
  46 +
  47 + @Getter
  48 + private final Lock msgQueueProcessorLock = new ReentrantLock();
  49 +
  50 + private final AtomicInteger msgQueueSize = new AtomicInteger(0);
  51 +
  52 + @Getter
  53 + @Setter
  54 + private boolean provisionOnly = false;
  55 +
  56 + public TcpDeviceSessionCtx(UUID sessionId, TcpTransportContext context) {
  57 + super(sessionId,context);
  58 + }
  59 +
  60 + public int nextMsgId() {
  61 + return msgIdSeq.incrementAndGet();
  62 + }
  63 +
  64 +
  65 +
  66 +
  67 +
  68 +
  69 +
  70 +
  71 +
  72 +
  73 +
  74 +
  75 +
  76 + public void addToQueue(TCPMessage msg) {
  77 + msgQueueSize.incrementAndGet();
  78 + ReferenceCountUtil.retain(msg);
  79 + msgQueue.add(msg);
  80 + }
  81 +
  82 + public void tryProcessQueuedMsgs(Consumer<TCPMessage> msgProcessor) {
  83 + while (!msgQueue.isEmpty()) {
  84 + if (msgQueueProcessorLock.tryLock()) {
  85 + try {
  86 + TCPMessage msg;
  87 + while ((msg = msgQueue.poll()) != null) {
  88 + try {
  89 + msgQueueSize.decrementAndGet();
  90 + msgProcessor.accept(msg);
  91 + } finally {
  92 + ReferenceCountUtil.safeRelease(msg);
  93 + }
  94 + }
  95 + } finally {
  96 + msgQueueProcessorLock.unlock();
  97 + }
  98 + } else {
  99 + return;
  100 + }
  101 + }
  102 + }
  103 +
  104 + public int getMsgQueueSize() {
  105 + return msgQueueSize.get();
  106 + }
  107 +
  108 + public void release() {
  109 + if (!msgQueue.isEmpty()) {
  110 + log.warn("doDisconnect for device {} but unprocessed messages {} left in the msg queue", getDeviceId(), msgQueue.size());
  111 + msgQueue.forEach(ReferenceCountUtil::safeRelease);
  112 + msgQueue.clear();
  113 + }
  114 + }
  115 +
  116 +
  117 +}
... ...
common/transport/tcp/src/main/java/org/thingsboard/server/transport/tcp/session/TcpDeviceWareSessionContext.java renamed from common/transport/tcp/src/main/java/org/thingsboard/server/transport/tcp/session/DeviceSessionCtx.java
... ... @@ -15,86 +15,85 @@
15 15 */
16 16 package org.thingsboard.server.transport.tcp.session;
17 17
18   -import io.netty.channel.ChannelHandlerContext;
19   -import io.netty.util.ReferenceCountUtil;
  18 +import com.fasterxml.jackson.core.JsonProcessingException;
  19 +import com.fasterxml.jackson.databind.JsonNode;
20 20 import lombok.Getter;
21   -import lombok.Setter;
22 21 import lombok.extern.slf4j.Slf4j;
23 22 import org.thingsboard.server.common.data.DeviceProfile;
24 23 import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
25 24 import org.thingsboard.server.common.data.device.profile.YtTcpDeviceProfileTransportConfiguration;
  25 +import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
26 26 import org.thingsboard.server.common.data.yunteng.enums.TcpDataTypeEnum;
  27 +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
27 28 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
28 29 import org.thingsboard.server.gen.transport.TransportProtos;
29 30 import org.thingsboard.server.transport.tcp.TcpTransportContext;
30 31 import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;
31   -import org.thingsboard.server.transport.tcp.util.ByteUtils;
32 32
33 33 import java.util.UUID;
34   -import java.util.concurrent.ConcurrentLinkedQueue;
35 34 import java.util.concurrent.ExecutionException;
36   -import java.util.concurrent.atomic.AtomicInteger;
37   -import java.util.concurrent.locks.Lock;
38   -import java.util.concurrent.locks.ReentrantLock;
39   -import java.util.function.Consumer;
40 35
41 36 /**
42 37 * @author Andrew Shvayka
43 38 */
44 39 @Slf4j
45   -public class DeviceSessionCtx extends DeviceAwareSessionContext {
46   -
47   - @Getter
48   - @Setter
49   - private ChannelHandlerContext channel;
  40 +public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionContext {
50 41
51 42 @Getter
52 43 private final TcpTransportContext context;
53 44
54   - private final AtomicInteger msgIdSeq = new AtomicInteger(0);
55   -
56   - private final ConcurrentLinkedQueue<TCPMessage> msgQueue = new ConcurrentLinkedQueue<>();
57   -
  45 + private volatile String telemetryTopicFilter ;
  46 + private volatile String attributesTopicFilter;
  47 + private volatile String toDeviceRpcResponseTopicFilter;
58 48 @Getter
59   - private final Lock msgQueueProcessorLock = new ReentrantLock();
  49 + private volatile TcpDataTypeEnum payloadType = TcpDataTypeEnum.HEX;
60 50
61   - private final AtomicInteger msgQueueSize = new AtomicInteger(0);
  51 + private volatile TcpTransportAdaptor adaptor;
62 52
  53 + /**设备唯一标识符,例如:设备SN、设备地址码等。数据内携带标识符*/
63 54 @Getter
64   - @Setter
65   - private boolean provisionOnly = false;
  55 + private volatile String deviceCode = "55";
  56 +
66 57
67   - // private volatile TcpTopicFilter telemetryTopicFilter = MqttTopicFilterFactory.getDefaultTelemetryFilter();
68   -// private volatile TcpTopicFilter attributesTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
69   - @Getter
70   - private volatile TcpDataTypeEnum payloadType = TcpDataTypeEnum.HEX;
71   - @Getter
72   - private volatile byte[] pingText ;
73   - // private volatile Descriptors.Descriptor attributesDynamicMessageDescriptor;
74   -// private volatile Descriptors.Descriptor telemetryDynamicMessageDescriptor;
75   -// private volatile Descriptors.Descriptor rpcResponseDynamicMessageDescriptor;
76   -// private volatile DynamicMessage.Builder rpcRequestDynamicMessageBuilder;
77   - private volatile TcpTransportAdaptor adaptor;
78 58 @Getter
79 59 private UUID scriptId;
80 60
81   - public DeviceSessionCtx(UUID sessionId, TcpTransportContext context) {
  61 + public TcpDeviceWareSessionContext(UUID sessionId, TcpTransportContext context) {
82 62 super(sessionId);
83 63 this.context = context;
84 64 this.adaptor = context.getJsonTcpAdaptor();
85 65 }
86 66
87   - public int nextMsgId() {
88   - return msgIdSeq.incrementAndGet();
  67 +
  68 + public boolean isDeviceTelemetryTopic(String topicName) {
  69 + return telemetryTopicFilter.equals(topicName);
89 70 }
90 71
  72 + public boolean isDeviceAttributesTopic(String topicName) {
  73 + return attributesTopicFilter.equals(topicName);
  74 + }
  75 + public boolean isToDeviceRpcResponseTopic(String topicName) {
  76 + return toDeviceRpcResponseTopicFilter.equals(topicName);
  77 + }
91 78
92 79 public TcpTransportAdaptor getPayloadAdaptor() {
93   - return adaptor;
  80 + return this.adaptor;
94 81 }
95 82
96 83
  84 + @Override
  85 + public void setDeviceInfo(TransportDeviceInfo deviceInfo) {
  86 + super.setDeviceInfo(deviceInfo);
  87 + try {
  88 + JsonNode additionalInfo = context.getMapper().readTree(deviceInfo.getAdditionalInfo());
  89 + if(additionalInfo !=null && additionalInfo.has(FastIotConstants.TCP_DEVICE_IDENTIFY_FILED)){
  90 + deviceCode = additionalInfo.get(FastIotConstants.TCP_DEVICE_IDENTIFY_FILED).asText();
  91 + }
  92 + } catch (JsonProcessingException e) {
  93 + log.trace("[{}][{}] Failed to fetch device additional info", sessionId, deviceInfo.getDeviceName(), e);
  94 + }
97 95
  96 + }
98 97
99 98 @Override
100 99 public void setDeviceProfile(DeviceProfile deviceProfile) {
... ... @@ -118,7 +117,9 @@ public class DeviceSessionCtx extends DeviceAwareSessionContext {
118 117 } else {
119 118 payloadType = TcpDataTypeEnum.HEX;
120 119 }
121   - this.pingText = ByteUtils.getBytes(tcpConfiguration.getPingText(),ByteUtils.UTF_8);
  120 + this.attributesTopicFilter = tcpConfiguration.getAttributesTopic();
  121 + this.telemetryTopicFilter = tcpConfiguration.getTelemetryTopic();
  122 + this.toDeviceRpcResponseTopicFilter = tcpConfiguration.getRpcTopic();
122 123 String scriptBody = tcpConfiguration.getScriptText();
123 124 try {
124 125 this.scriptId = this.adaptor.getJsScriptEngineFunctionId(scriptBody);
... ... @@ -130,50 +131,4 @@ public class DeviceSessionCtx extends DeviceAwareSessionContext {
130 131 }
131 132 }
132 133
133   -
134   -
135   -
136   -
137   -
138   - public void addToQueue(TCPMessage msg) {
139   - msgQueueSize.incrementAndGet();
140   - ReferenceCountUtil.retain(msg);
141   - msgQueue.add(msg);
142   - }
143   -
144   - public void tryProcessQueuedMsgs(Consumer<TCPMessage> msgProcessor) {
145   - while (!msgQueue.isEmpty()) {
146   - if (msgQueueProcessorLock.tryLock()) {
147   - try {
148   - TCPMessage msg;
149   - while ((msg = msgQueue.poll()) != null) {
150   - try {
151   - msgQueueSize.decrementAndGet();
152   - msgProcessor.accept(msg);
153   - } finally {
154   - ReferenceCountUtil.safeRelease(msg);
155   - }
156   - }
157   - } finally {
158   - msgQueueProcessorLock.unlock();
159   - }
160   - } else {
161   - return;
162   - }
163   - }
164   - }
165   -
166   - public int getMsgQueueSize() {
167   - return msgQueueSize.get();
168   - }
169   -
170   - public void release() {
171   - if (!msgQueue.isEmpty()) {
172   - log.warn("doDisconnect for device {} but unprocessed messages {} left in the msg queue", getDeviceId(), msgQueue.size());
173   - msgQueue.forEach(ReferenceCountUtil::safeRelease);
174   - msgQueue.clear();
175   - }
176   - }
177   -
178   -
179 134 }
... ...
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.tcp.session;
  17 +
  18 +import io.netty.channel.ChannelFuture;
  19 +import io.netty.handler.codec.mqtt.MqttMessage;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +import org.thingsboard.server.common.data.DeviceProfile;
  22 +import org.thingsboard.server.common.data.id.DeviceId;
  23 +import org.thingsboard.server.common.data.rpc.RpcStatus;
  24 +import org.thingsboard.server.common.transport.SessionMsgListener;
  25 +import org.thingsboard.server.common.transport.TransportService;
  26 +import org.thingsboard.server.common.transport.TransportServiceCallback;
  27 +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
  28 +import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
  29 +import org.thingsboard.server.gen.transport.TransportProtos;
  30 +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
  31 +import org.thingsboard.server.transport.tcp.TcpTransportContext;
  32 +
  33 +import java.util.UUID;
  34 +
  35 +/**
  36 + * Created by ashvayka on 19.01.17.
  37 + */
  38 +@Slf4j
  39 +public class TcpGatewayDeviceSessionCtx extends TcpDeviceWareSessionContext implements SessionMsgListener {
  40 +
  41 + private final TcpGatewaySessionHandler parent;
  42 + private final TransportService transportService;
  43 +
  44 + public TcpGatewayDeviceSessionCtx(TcpTransportContext context, TcpGatewaySessionHandler parent, TransportDeviceInfo deviceInfo,
  45 + DeviceProfile deviceProfile, TransportService transportService) {
  46 + super(UUID.randomUUID(),context);
  47 + this.parent = parent;
  48 + setSessionInfo(SessionInfoProto.newBuilder()
  49 + .setNodeId(parent.getNodeId())
  50 + .setSessionIdMSB(sessionId.getMostSignificantBits())
  51 + .setSessionIdLSB(sessionId.getLeastSignificantBits())
  52 + .setDeviceIdMSB(deviceInfo.getDeviceId().getId().getMostSignificantBits())
  53 + .setDeviceIdLSB(deviceInfo.getDeviceId().getId().getLeastSignificantBits())
  54 + .setTenantIdMSB(deviceInfo.getTenantId().getId().getMostSignificantBits())
  55 + .setTenantIdLSB(deviceInfo.getTenantId().getId().getLeastSignificantBits())
  56 + .setCustomerIdMSB(deviceInfo.getCustomerId().getId().getMostSignificantBits())
  57 + .setCustomerIdLSB(deviceInfo.getCustomerId().getId().getLeastSignificantBits())
  58 + .setDeviceName(deviceInfo.getDeviceName())
  59 + .setDeviceType(deviceInfo.getDeviceType())
  60 + .setGwSessionIdMSB(parent.getSessionId().getMostSignificantBits())
  61 + .setGwSessionIdLSB(parent.getSessionId().getLeastSignificantBits())
  62 + .setDeviceProfileIdMSB(deviceInfo.getDeviceProfileId().getId().getMostSignificantBits())
  63 + .setDeviceProfileIdLSB(deviceInfo.getDeviceProfileId().getId().getLeastSignificantBits())
  64 + .build());
  65 + setDeviceInfo(deviceInfo);
  66 + setConnected(true);
  67 + setDeviceProfile(deviceProfile);
  68 + this.transportService = transportService;
  69 + }
  70 +
  71 + @Override
  72 + public UUID getSessionId() {
  73 + return sessionId;
  74 + }
  75 +
  76 + @Override
  77 + public int nextMsgId() {
  78 + return parent.nextMsgId();
  79 + }
  80 +
  81 + @Override
  82 + public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
  83 + try {
  84 + parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), response).ifPresent(parent::pushDeviceMsg);
  85 + } catch (Exception e) {
  86 + log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
  87 + }
  88 + }
  89 +
  90 + @Override
  91 + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
  92 + log.trace("[{}] Received attributes update notification to device", sessionId);
  93 + try {
  94 + parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::pushDeviceMsg);
  95 + } catch (Exception e) {
  96 + log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
  97 + }
  98 + }
  99 +
  100 + @Override
  101 + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg request) {
  102 + log.error("【{}】下发RPC命令【{}】给网关子设备", sessionId,request.getParams());
  103 + try {
  104 + parent.getPayloadAdaptor().convertToPublish(this, request).ifPresent(
  105 + payload -> {
  106 + ChannelFuture channelFuture = parent.pushDeviceMsg(payload);
  107 + if (request.getPersisted()) {
  108 + channelFuture.addListener(result -> {
  109 + if (result.cause() == null) {
  110 + transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY);
  111 + }
  112 + });
  113 + }
  114 + }
  115 + );
  116 + } catch (Exception e) {
  117 + transportService.process(getSessionInfo(),
  118 + TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
  119 + .setRequestId(request.getRequestId()).setError("Failed to convert device RPC command to MQTT msg").build(), TransportServiceCallback.EMPTY);
  120 + log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
  121 + }
  122 + }
  123 +
  124 + @Override
  125 + public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
  126 + log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
  127 + parent.deregisterSession(getDeviceInfo().getDeviceName());
  128 + }
  129 +
  130 + @Override
  131 + public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) {
  132 + // This feature is not supported in the TB IoT Gateway yet.
  133 + }
  134 +
  135 + @Override
  136 + public void onDeviceDeleted(DeviceId deviceId) {
  137 + parent.onDeviceDeleted(this.getSessionInfo().getDeviceName());
  138 + }
  139 +
  140 + private boolean isAckExpected(MqttMessage message) {
  141 + return message.fixedHeader().qosLevel().value() > 0;
  142 + }
  143 +
  144 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.tcp.session;
  17 +
  18 +
  19 +import com.google.common.util.concurrent.FutureCallback;
  20 +import com.google.common.util.concurrent.Futures;
  21 +import com.google.common.util.concurrent.ListenableFuture;
  22 +import com.google.common.util.concurrent.SettableFuture;
  23 +import com.google.gson.*;
  24 +import com.google.protobuf.InvalidProtocolBufferException;
  25 +import com.google.protobuf.ProtocolStringList;
  26 +import io.netty.buffer.ByteBuf;
  27 +import io.netty.buffer.Unpooled;
  28 +import io.netty.channel.ChannelFuture;
  29 +import io.netty.channel.ChannelHandlerContext;
  30 +import io.netty.handler.codec.mqtt.MqttPublishMessage;
  31 +import lombok.extern.slf4j.Slf4j;
  32 +import org.springframework.util.CollectionUtils;
  33 +import org.springframework.util.ConcurrentReferenceHashMap;
  34 +import org.springframework.util.StringUtils;
  35 +import org.thingsboard.server.common.data.id.DeviceId;
  36 +import org.thingsboard.server.common.data.yunteng.enums.TcpDataTypeEnum;
  37 +import org.thingsboard.server.common.transport.TransportService;
  38 +import org.thingsboard.server.common.transport.TransportServiceCallback;
  39 +import org.thingsboard.server.common.transport.adaptor.AdaptorException;
  40 +import org.thingsboard.server.common.transport.adaptor.JsonConverter;
  41 +import org.thingsboard.server.common.transport.adaptor.ProtoConverter;
  42 +import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
  43 +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
  44 +import org.thingsboard.server.gen.transport.TransportApiProtos;
  45 +import org.thingsboard.server.gen.transport.TransportProtos;
  46 +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
  47 +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
  48 +import org.thingsboard.server.transport.tcp.TcpTransportContext;
  49 +import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;
  50 +import org.thingsboard.server.transport.tcp.util.ByteUtils;
  51 +
  52 +import javax.annotation.Nullable;
  53 +import java.io.UnsupportedEncodingException;
  54 +import java.util.*;
  55 +import java.util.concurrent.ConcurrentHashMap;
  56 +import java.util.concurrent.ConcurrentMap;
  57 +import java.util.concurrent.locks.Lock;
  58 +import java.util.concurrent.locks.ReentrantLock;
  59 +
  60 +import static org.springframework.util.ConcurrentReferenceHashMap.ReferenceType;
  61 +import static org.thingsboard.server.common.transport.service.DefaultTransportService.*;
  62 +
  63 +/**
  64 + * Created by ashvayka on 19.01.17.
  65 + */
  66 +@Slf4j
  67 +public class TcpGatewaySessionHandler {
  68 +
  69 + private static final String DEFAULT_DEVICE_TYPE = "default";
  70 + private static final String CAN_T_PARSE_VALUE = "Can't parse value: ";
  71 + private static final String DEVICE_PROPERTY = "device";
  72 +
  73 + private final TcpTransportContext context;
  74 + private final TransportService transportService;
  75 + private final TransportDeviceInfo gateway;
  76 + private final UUID sessionId;
  77 + private final ConcurrentMap<String, Lock> deviceCreationLockMap;
  78 + private final ConcurrentMap<String, TcpGatewayDeviceSessionCtx> devices;
  79 + private final ConcurrentMap<String, ListenableFuture<TcpGatewayDeviceSessionCtx>> deviceFutures;
  80 + private final ChannelHandlerContext channel;
  81 + private final TcpDeviceSessionCtx deviceSessionCtx;
  82 +
  83 + public TcpGatewaySessionHandler(TcpDeviceSessionCtx deviceSessionCtx, UUID sessionId) {
  84 + this.context = deviceSessionCtx.getContext();
  85 + this.transportService = context.getTransportService();
  86 + this.deviceSessionCtx = deviceSessionCtx;
  87 + this.gateway = deviceSessionCtx.getDeviceInfo();
  88 + this.sessionId = sessionId;
  89 + this.devices = new ConcurrentHashMap<>();
  90 + this.deviceFutures = new ConcurrentHashMap<>();
  91 + this.deviceCreationLockMap = createWeakMap();
  92 + this.channel = deviceSessionCtx.getChannel();
  93 + }
  94 +
  95 + ConcurrentReferenceHashMap<String, Lock> createWeakMap() {
  96 + return new ConcurrentReferenceHashMap<>(16, ReferenceType.WEAK);
  97 + }
  98 +
  99 +
  100 +
  101 +
  102 +
  103 + public void onDeviceTelemetry(TCPMessage tcpMessage) throws AdaptorException {
  104 + Futures.addCallback(checkDeviceConnected(tcpMessage.getDeviceCode()),
  105 + new FutureCallback<>() {
  106 + @Override
  107 + public void onSuccess(@Nullable TcpGatewayDeviceSessionCtx deviceCtx) {
  108 + String deviceName = deviceCtx.getDeviceInfo().getDeviceName();
  109 + try {
  110 + TransportProtos.PostTelemetryMsg postTelemetryMsg = deviceCtx.getPayloadAdaptor().convertToPostTelemetry(deviceCtx,tcpMessage.getMessage());
  111 + processPostTelemetryMsg(deviceCtx, postTelemetryMsg, deviceName, tcpMessage.getRequestId());
  112 + } catch (Throwable e) {
  113 + log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, tcpMessage.getMessage(), e);
  114 + channel.close();
  115 + }
  116 + }
  117 +
  118 + @Override
  119 + public void onFailure(Throwable t) {
  120 + log.debug("[{}] Failed to process device telemetry command: {}", sessionId, tcpMessage.getDeviceCode(), t);
  121 + }
  122 + }, context.getExecutor());
  123 + }
  124 +
  125 +
  126 +
  127 +
  128 +
  129 +
  130 +
  131 + public void onGatewayDisconnect() {
  132 + devices.forEach(this::deregisterSession);
  133 + }
  134 +
  135 + public void onDeviceDeleted(String deviceName) {
  136 + deregisterSession(deviceName);
  137 + }
  138 +
  139 + public String getNodeId() {
  140 + return context.getNodeId();
  141 + }
  142 +
  143 + public UUID getSessionId() {
  144 + return sessionId;
  145 + }
  146 +
  147 + public TcpTransportAdaptor getPayloadAdaptor() {
  148 + return deviceSessionCtx.getPayloadAdaptor();
  149 + }
  150 +
  151 + void deregisterSession(String deviceName) {
  152 + TcpGatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
  153 + if (deviceSessionCtx != null) {
  154 + deregisterSession(deviceName, deviceSessionCtx);
  155 + } else {
  156 + log.debug("[{}] Device [{}] was already removed from the gateway session", sessionId, deviceName);
  157 + }
  158 + }
  159 +
  160 +
  161 +
  162 + int nextMsgId() {
  163 + return deviceSessionCtx.nextMsgId();
  164 + }
  165 +
  166 + private boolean isJsonPayloadType() {
  167 + return true;//deviceSessionCtx.isJsonPayloadType();
  168 + }
  169 +
  170 + private void processOnConnect(MqttPublishMessage msg, String deviceName, String deviceType) {
  171 + log.trace("[{}] onDeviceConnect: {}", sessionId, deviceName);
  172 + Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<TcpGatewayDeviceSessionCtx>() {
  173 + @Override
  174 + public void onSuccess(@Nullable TcpGatewayDeviceSessionCtx result) {
  175 + ack(msg);
  176 + log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName);
  177 + }
  178 +
  179 + @Override
  180 + public void onFailure(Throwable t) {
  181 + log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, t);
  182 +
  183 + }
  184 + }, context.getExecutor());
  185 + }
  186 +
  187 + private ListenableFuture<TcpGatewayDeviceSessionCtx> onDeviceConnect(String deviceCode, String deviceType) {
  188 + TcpGatewayDeviceSessionCtx result = devices.get(deviceCode);
  189 + if (result == null) {
  190 + Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceCode, s -> new ReentrantLock());
  191 + deviceCreationLock.lock();
  192 + try {
  193 + result = devices.get(deviceCode);
  194 + if (result == null) {
  195 + return getDeviceCreationFuture(deviceCode, deviceType);
  196 + } else {
  197 + return Futures.immediateFuture(result);
  198 + }
  199 + } finally {
  200 + deviceCreationLock.unlock();
  201 + }
  202 + } else {
  203 + return Futures.immediateFuture(result);
  204 + }
  205 + }
  206 +
  207 + private ListenableFuture<TcpGatewayDeviceSessionCtx> getDeviceCreationFuture(String deviceName, String deviceType) {
  208 + final SettableFuture<TcpGatewayDeviceSessionCtx> futureToSet = SettableFuture.create();
  209 + ListenableFuture<TcpGatewayDeviceSessionCtx> future = deviceFutures.putIfAbsent(deviceName, futureToSet);
  210 + if (future != null) {
  211 + return future;
  212 + }
  213 + try {
  214 + transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
  215 + .setDeviceName(deviceName)
  216 + .setDeviceType(deviceType)
  217 + .setGatewayIdMSB(gateway.getDeviceId().getId().getMostSignificantBits())
  218 + .setGatewayIdLSB(gateway.getDeviceId().getId().getLeastSignificantBits()).build(),
  219 + new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse>() {
  220 + @Override
  221 + public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
  222 + TcpGatewayDeviceSessionCtx deviceSessionCtx = new TcpGatewayDeviceSessionCtx(context,TcpGatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), transportService);
  223 + if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
  224 + log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
  225 + SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
  226 + transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
  227 + transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
  228 + .setSessionInfo(deviceSessionInfo)
  229 + .setSessionEvent(SESSION_EVENT_MSG_OPEN)
  230 + .setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG)
  231 + .setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG)
  232 + .build(), null);
  233 + }
  234 + futureToSet.set(devices.get(deviceName));
  235 + deviceFutures.remove(deviceName);
  236 + }
  237 +
  238 + @Override
  239 + public void onError(Throwable e) {
  240 + log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);
  241 + futureToSet.setException(e);
  242 + deviceFutures.remove(deviceName);
  243 + }
  244 + });
  245 + return futureToSet;
  246 + } catch (Throwable e) {
  247 + deviceFutures.remove(deviceName);
  248 + throw e;
  249 + }
  250 + }
  251 +
  252 + private int getMsgId(MqttPublishMessage mqttMsg) {
  253 + return mqttMsg.variableHeader().packetId();
  254 + }
  255 +
  256 + public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException {
  257 + JsonElement json = getJson(mqttMsg);
  258 + String deviceName = checkDeviceName(getDeviceName(json));
  259 + String deviceType = getDeviceType(json);
  260 + processOnConnect(mqttMsg, deviceName, deviceType);
  261 + }
  262 +
  263 +
  264 +
  265 + public void onDeviceDisconnect(MqttPublishMessage mqttMsg) throws AdaptorException {
  266 + String deviceName = checkDeviceName(getDeviceName(getJson(mqttMsg)));
  267 + processOnDisconnect(mqttMsg, deviceName);
  268 + }
  269 +
  270 +
  271 +
  272 + private void processOnDisconnect(MqttPublishMessage msg, String deviceName) {
  273 + deregisterSession(deviceName);
  274 + ack(msg);
  275 + }
  276 +
  277 +
  278 +
  279 +
  280 + private void processPostTelemetryMsg(TcpGatewayDeviceSessionCtx deviceCtx, TransportProtos.PostTelemetryMsg postTelemetryMsg, String deviceName, String msgId) {
  281 + transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg));
  282 + }
  283 + public void onDeviceClaim(MqttPublishMessage mqttMsg) throws AdaptorException {
  284 + int msgId = getMsgId(mqttMsg);
  285 + ByteBuf payload = mqttMsg.payload();
  286 + JsonElement json = null;//JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
  287 + if (json.isJsonObject()) {
  288 + JsonObject jsonObj = json.getAsJsonObject();
  289 + for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
  290 + String deviceName = deviceEntry.getKey();
  291 + Futures.addCallback(checkDeviceConnected(deviceName),
  292 + new FutureCallback<TcpGatewayDeviceSessionCtx>() {
  293 + @Override
  294 + public void onSuccess(@Nullable TcpGatewayDeviceSessionCtx deviceCtx) {
  295 + if (!deviceEntry.getValue().isJsonObject()) {
  296 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  297 + }
  298 + try {
  299 + DeviceId deviceId = deviceCtx.getDeviceId();
  300 + TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(deviceId, deviceEntry.getValue());
  301 + processClaimDeviceMsg(deviceCtx, claimDeviceMsg, deviceName, msgId);
  302 + } catch (Throwable e) {
  303 + log.warn("[{}][{}] Failed to convert claim message: {}", gateway.getDeviceId(), deviceName, deviceEntry.getValue(), e);
  304 + }
  305 + }
  306 +
  307 + @Override
  308 + public void onFailure(Throwable t) {
  309 + log.debug("[{}] Failed to process device claiming command: {}", sessionId, deviceName, t);
  310 + }
  311 + }, context.getExecutor());
  312 + }
  313 + } else {
  314 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  315 + }
  316 + }
  317 +
  318 +
  319 + private void processClaimDeviceMsg(TcpGatewayDeviceSessionCtx deviceCtx, TransportProtos.ClaimDeviceMsg claimDeviceMsg, String deviceName, int msgId) {
  320 + transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId+"", claimDeviceMsg));
  321 + }
  322 +
  323 + public void onDeviceAttributes(TCPMessage mqttMsg) throws AdaptorException {
  324 + int msgId = 0;//getMsgId(mqttMsg);
  325 + ByteBuf payload = null;//mqttMsg.payload();
  326 + JsonElement json = null;//JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
  327 + if (json.isJsonObject()) {
  328 + JsonObject jsonObj = json.getAsJsonObject();
  329 + for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
  330 + String deviceName = deviceEntry.getKey();
  331 + Futures.addCallback(checkDeviceConnected(deviceName),
  332 + new FutureCallback<TcpGatewayDeviceSessionCtx>() {
  333 + @Override
  334 + public void onSuccess(@Nullable TcpGatewayDeviceSessionCtx deviceCtx) {
  335 + if (!deviceEntry.getValue().isJsonObject()) {
  336 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  337 + }
  338 + TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(deviceEntry.getValue().getAsJsonObject());
  339 + processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId);
  340 + }
  341 +
  342 + @Override
  343 + public void onFailure(Throwable t) {
  344 + log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t);
  345 + }
  346 + }, context.getExecutor());
  347 + }
  348 + } else {
  349 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  350 + }
  351 + }
  352 +
  353 +
  354 + private void processPostAttributesMsg(TcpGatewayDeviceSessionCtx deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) {
  355 + transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId+"", postAttributeMsg));
  356 + }
  357 +
  358 + public void onDeviceAttributesRequest(MqttPublishMessage mqttMsg) throws AdaptorException {
  359 + JsonElement json = null;//JsonMqttAdaptor.validateJsonPayload(sessionId, msg.payload());
  360 + if (json.isJsonObject()) {
  361 + JsonObject jsonObj = json.getAsJsonObject();
  362 + int requestId = jsonObj.get("id").getAsInt();
  363 + String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
  364 + boolean clientScope = jsonObj.get("client").getAsBoolean();
  365 + Set<String> keys;
  366 + if (jsonObj.has("key")) {
  367 + keys = Collections.singleton(jsonObj.get("key").getAsString());
  368 + } else {
  369 + JsonArray keysArray = jsonObj.get("keys").getAsJsonArray();
  370 + keys = new HashSet<>();
  371 + for (JsonElement keyObj : keysArray) {
  372 + keys.add(keyObj.getAsString());
  373 + }
  374 + }
  375 + TransportProtos.GetAttributeRequestMsg requestMsg = toGetAttributeRequestMsg(requestId, clientScope, keys);
  376 + processGetAttributeRequestMessage(mqttMsg, deviceName, requestMsg);
  377 + } else {
  378 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  379 + }
  380 + }
  381 +
  382 +
  383 + public void onDeviceRpcResponse(MqttPublishMessage mqttMsg) throws AdaptorException {
  384 + int msgId = getMsgId(mqttMsg);
  385 + ByteBuf payload = mqttMsg.payload();
  386 + JsonElement json = null;// JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
  387 + if (json.isJsonObject()) {
  388 + JsonObject jsonObj = json.getAsJsonObject();
  389 + String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
  390 + Futures.addCallback(checkDeviceConnected(deviceName),
  391 + new FutureCallback<TcpGatewayDeviceSessionCtx>() {
  392 + @Override
  393 + public void onSuccess(@Nullable TcpGatewayDeviceSessionCtx deviceCtx) {
  394 + Integer requestId = jsonObj.get("id").getAsInt();
  395 + String data = jsonObj.get("data").toString();
  396 + TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
  397 + .setRequestId(requestId).setPayload(data).build();
  398 + processRpcResponseMsg(deviceCtx, rpcResponseMsg, deviceName, msgId);
  399 + }
  400 +
  401 + @Override
  402 + public void onFailure(Throwable t) {
  403 + log.debug("[{}] Failed to process device Rpc response command: {}", sessionId, deviceName, t);
  404 + }
  405 + }, context.getExecutor());
  406 + } else {
  407 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  408 + }
  409 + }
  410 +
  411 +
  412 + private void processRpcResponseMsg(TcpGatewayDeviceSessionCtx deviceCtx, TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg, String deviceName, int msgId) {
  413 + transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(channel, deviceName, msgId+"", rpcResponseMsg));
  414 + }
  415 +
  416 + private void processGetAttributeRequestMessage(MqttPublishMessage mqttMsg, String deviceName, TransportProtos.GetAttributeRequestMsg requestMsg) {
  417 + int msgId = getMsgId(mqttMsg);
  418 + Futures.addCallback(checkDeviceConnected(deviceName),
  419 + new FutureCallback<TcpGatewayDeviceSessionCtx>() {
  420 + @Override
  421 + public void onSuccess(@Nullable TcpGatewayDeviceSessionCtx deviceCtx) {
  422 + transportService.process(deviceCtx.getSessionInfo(), requestMsg, getPubAckCallback(channel, deviceName, msgId+"", requestMsg));
  423 + }
  424 +
  425 + @Override
  426 + public void onFailure(Throwable t) {
  427 + ack(mqttMsg);
  428 + log.debug("[{}] Failed to process device attributes request command: {}", sessionId, deviceName, t);
  429 + }
  430 + }, context.getExecutor());
  431 + }
  432 +
  433 + private TransportProtos.GetAttributeRequestMsg toGetAttributeRequestMsg(int requestId, boolean clientScope, Set<String> keys) {
  434 + TransportProtos.GetAttributeRequestMsg.Builder result = TransportProtos.GetAttributeRequestMsg.newBuilder();
  435 + result.setRequestId(requestId);
  436 +
  437 + if (clientScope) {
  438 + result.addAllClientAttributeNames(keys);
  439 + } else {
  440 + result.addAllSharedAttributeNames(keys);
  441 + }
  442 + return result.build();
  443 + }
  444 +
  445 + private ListenableFuture<TcpGatewayDeviceSessionCtx> checkDeviceConnected(String deviceCode) {
  446 + TcpGatewayDeviceSessionCtx ctx = devices.get(deviceCode);
  447 + if (ctx == null) {
  448 + log.debug("[{}] Missing device [{}] for the gateway session", sessionId, deviceCode);
  449 + return onDeviceConnect(deviceCode, DEFAULT_DEVICE_TYPE);
  450 + } else {
  451 + return Futures.immediateFuture(ctx);
  452 + }
  453 + }
  454 +
  455 + private String checkDeviceName(String deviceName) {
  456 + if (StringUtils.isEmpty(deviceName)) {
  457 + throw new RuntimeException("Device name is empty!");
  458 + } else {
  459 + return deviceName;
  460 + }
  461 + }
  462 +
  463 + private String getDeviceName(JsonElement json) {
  464 + return json.getAsJsonObject().get(DEVICE_PROPERTY).getAsString();
  465 + }
  466 +
  467 + private String getDeviceType(JsonElement json) {
  468 + JsonElement type = json.getAsJsonObject().get("type");
  469 + return type == null || type instanceof JsonNull ? DEFAULT_DEVICE_TYPE : type.getAsString();
  470 + }
  471 +
  472 + private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException {
  473 + return null;//JsonMqttAdaptor.validateJsonPayload(sessionId, mqttMsg.payload());
  474 + }
  475 +
  476 +
  477 +
  478 +
  479 + private void deregisterSession(String deviceName, TcpGatewayDeviceSessionCtx deviceSessionCtx) {
  480 + transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
  481 + transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
  482 + log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName);
  483 + }
  484 +
  485 + private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final String deviceName, final String msgId, final T msg) {
  486 + return new TransportServiceCallback<Void>() {
  487 + @Override
  488 + public void onSuccess(Void dummy) {
  489 + log.trace("[{}][{}] Published msg: {}", sessionId, deviceName, msg);
  490 + if(!StringUtils.isEmpty(msgId)){
  491 + pushDeviceMsg(new TCPMessage(msgId));
  492 + }
  493 + }
  494 +
  495 + @Override
  496 + public void onError(Throwable e) {
  497 + log.trace("[{}] Failed to publish msg: {} for device: {}", sessionId, msg, deviceName, e);
  498 + ctx.close();
  499 + }
  500 + };
  501 + }
  502 + /**
  503 + * 往设备推送消息
  504 + * @param tcp
  505 + * @return
  506 + */
  507 + ChannelFuture pushDeviceMsg(TCPMessage tcp) {
  508 + try {
  509 + String message = tcp.getMessage();
  510 + byte[] payloadInBytes ;
  511 + if(deviceSessionCtx.getPayloadType().equals(TcpDataTypeEnum.HEX)){
  512 + payloadInBytes = ByteUtils.hexStr2Bytes(message);
  513 + }else{
  514 + payloadInBytes = message.getBytes(ByteUtils.UTF_8);
  515 + }
  516 +// ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
  517 +// ByteBuf payload = ALLOCATOR.buffer();
  518 +// payload.writeBytes(payloadInBytes);
  519 + ByteBuf payload = Unpooled.copiedBuffer(payloadInBytes);
  520 +
  521 + return channel.writeAndFlush(payload);
  522 + } catch (UnsupportedEncodingException e) {
  523 + log.error(e.getMessage(),e);
  524 + throw new RuntimeException(e);
  525 + }
  526 + }
  527 + private void ack(MqttPublishMessage msg) {
  528 + int msgId = getMsgId(msg);
  529 + if (msgId > 0) {
  530 + channel.writeAndFlush(msg);
  531 + }
  532 + }
  533 +}
  534 +
... ...
... ... @@ -215,6 +215,12 @@ public class DeviceProfileServiceImpl extends AbstractEntityService implements D
215 215 return doCreateDefaultDeviceProfile(tenantId, "default", true);
216 216 }
217 217
  218 + @Override
  219 + public DeviceProfile createDeviceProfile(TenantId tenantId, String name) {
  220 + log.trace("Executing createDeviceProfile tenantId [{}]", tenantId);
  221 + return doCreateDefaultDeviceProfile(tenantId, name, false);
  222 + }
  223 +
218 224 private DeviceProfile doCreateDefaultDeviceProfile(TenantId tenantId, String profileName, boolean defaultProfile) {
219 225 validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
220 226 DeviceProfile deviceProfile = new DeviceProfile();
... ...
... ... @@ -140,8 +140,16 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe
140 140
141 141 //Thingskit function
142 142 DeviceProfile deviceProfile = deviceProfileService.createDefaultDeviceProfile(savedTenant.getId());
143   - DeviceProfileDTO profileDTO = new DeviceProfileDTO(deviceProfile.getName(),deviceProfile.getTenantId(),deviceProfile.getId(), DeviceTypeEnum.DIRECT_CONNECTION);
  143 + DeviceProfileDTO profileDTO = new DeviceProfileDTO(deviceProfile.getName(),deviceProfile.getTenantId(),deviceProfile.getId(), DeviceTypeEnum.SENSOR);
144 144 ytDeviceProfileService.insertOrUpdate(profileDTO);
  145 + DeviceProfile gatewayProfile = deviceProfileService.createDeviceProfile(savedTenant.getId(),"默认MQTT网关设备");
  146 + DeviceProfileDTO gatewayDTO = new DeviceProfileDTO(gatewayProfile.getName(),gatewayProfile.getTenantId(),gatewayProfile.getId(), DeviceTypeEnum.GATEWAY);
  147 + ytDeviceProfileService.insertOrUpdate(gatewayDTO);
  148 + DeviceProfile directProfile = deviceProfileService.createDeviceProfile(savedTenant.getId(),"默认MQTT直连设备");
  149 + DeviceProfileDTO directDTO = new DeviceProfileDTO(directProfile.getName(),directProfile.getTenantId(),directProfile.getId(), DeviceTypeEnum.DIRECT_CONNECTION);
  150 + ytDeviceProfileService.insertOrUpdate(directDTO);
  151 +
  152 +
145 153 apiUsageStateService.createDefaultApiUsageState(savedTenant.getId(), null);
146 154
147 155 }
... ...
... ... @@ -23,6 +23,7 @@ public class YtDevice extends TenantBaseEntity {
23 23 private String gatewayId;
24 24 private String brand;
25 25 private String label;
  26 + private String code;
26 27 @TableField(typeHandler = EnumTypeHandler.class)
27 28 private DeviceTypeEnum deviceType;
28 29 private String sn;
... ...
... ... @@ -169,15 +169,7 @@ public class YtDeviceProfileServiceImpl
169 169
170 170 @Override
171 171 public List<DeviceProfileDTO> findDeviceProfile(String tenantId, String scriptId) {
172   - LambdaQueryWrapper<YtDeviceProfileEntity> queryWrapper =
173   - new QueryWrapper<YtDeviceProfileEntity>()
174   - .lambda()
175   - .eq(YtDeviceProfileEntity::getTenantId, tenantId)
176   - .eq(StringUtils.isNotEmpty(scriptId), YtDeviceProfileEntity::getScriptId, scriptId);
177   - List<DeviceProfileDTO> results =
178   - baseMapper.selectList(queryWrapper).stream()
179   - .map(item -> item.getDTO(DeviceProfileDTO.class))
180   - .collect(Collectors.toList());
  172 + List<DeviceProfileDTO> results = baseMapper.profileByScriptId(tenantId,scriptId);
181 173 return results;
182 174 }
183 175 }
... ...
... ... @@ -78,7 +78,13 @@ public class YtDeviceScriptServiceImpl extends AbstractBaseService<YtDeviceScrip
78 78
79 79 @Override
80 80 public String getScriptText(String tenantId, String scriptId) {
81   - return null;
  81 + LambdaQueryWrapper<YtDeviceScriptEntity> queryWrapper =
  82 + new QueryWrapper<YtDeviceScriptEntity>()
  83 + .lambda()
  84 + .eq(YtDeviceScriptEntity::getTenantId, tenantId)
  85 + .eq(YtDeviceScriptEntity::getId, scriptId);
  86 + YtDeviceScriptEntity result = baseMapper.selectOne(queryWrapper);
  87 + return result ==null? null : result.getConvertJs();
82 88 }
83 89
84 90 @Override
... ...
... ... @@ -262,12 +262,7 @@ public class YtDeviceServiceImpl extends AbstractBaseService<DeviceMapper, YtDev
262 262 if (StringUtils.isEmpty(tenantId) || StringUtils.isEmpty(deviceId)) {
263 263 throw new YtDataValidationException(ErrorMessage.INVALID_PARAMETER.getMessage());
264 264 }
265   - return baseMapper
266   - .selectOne(
267   - new LambdaQueryWrapper<YtDevice>()
268   - .eq(YtDevice::getTenantId, tenantId)
269   - .eq(YtDevice::getId, deviceId))
270   - .getDTO(DeviceDTO.class);
  265 + return baseMapper.selectDetail(tenantId,deviceId);
271 266 }
272 267
273 268 @Override
... ... @@ -490,6 +485,12 @@ public class YtDeviceServiceImpl extends AbstractBaseService<DeviceMapper, YtDev
490 485 return result;
491 486 }
492 487
  488 +
  489 + @Override
  490 + public DeviceDTO findSlaveDevice(String tenantId, String masterId, String deviceCode) {
  491 + return baseMapper.slaveDevice(tenantId,masterId,deviceCode);
  492 + }
  493 +
493 494 @Override
494 495 public List<String> findDeviceKeys(
495 496 String tenantId, String customerId, String organizationId, List<String> deviceIds) {
... ...
... ... @@ -108,7 +108,14 @@ public interface DeviceMapper extends BaseMapper<YtDevice> {
108 108 List<SelectItemDTO> slaveDevices(@Param("customerId") String customerId, @Param("tenantId") String tenantId
109 109 , @Param("organizationIds") List<String> organizationIds, @Param("masterId") String masterId);
110 110
111   -
  111 + /**
  112 + * TCP协议传输时,获取网关设备的从设备
  113 + * @param tenantId 租户ID
  114 + * @param masterId 网关设备的TB_ID
  115 + * @param code 网关子设备的标识符,例如:485协议的从设备地址码
  116 + * @return
  117 + */
  118 + DeviceDTO slaveDevice(@Param("tenantId") String tenantId, @Param("masterId") String masterId, @Param("code") String code);
112 119 /**
113 120 * 设备遥测数据指标名称
114 121 *
... ...
... ... @@ -22,4 +22,6 @@ public interface YtDeviceProfileMapper extends BaseMapper<YtDeviceProfileEntity>
22 22
23 23 IPage<DeviceProfileDTO> getProfilePage(IPage<?> page, @Param("tenantId") String tenantId, @Param("profileName") String profileName, @Param("transportType") String transportType);
24 24
  25 +
  26 + List<DeviceProfileDTO> profileByScriptId( @Param("tenantId") String tenantId, @Param("scriptId") String scriptId);
25 27 }
... ...
... ... @@ -122,6 +122,16 @@ public interface YtDeviceService extends BaseService<YtDevice> {
122 122 */
123 123 List<SelectItemDTO> findSlaveDevices(String masterId,String tenantId,String customerId, String organizationId);
124 124
  125 +
  126 + /**
  127 + * TCP协议传输时,获取网关设备的从设备
  128 + * @param tenantId 租户ID
  129 + * @param masterId 网关设备的TB_ID
  130 + * @param deviceCode 网关子设备的标识符,例如:485协议的从设备地址码
  131 + * @return
  132 + */
  133 + DeviceDTO findSlaveDevice(String tenantId,String masterId,String deviceCode);
  134 +
125 135 /**
126 136 * 设备遥测数据指标名称
127 137 * @param tenantId 租户ID
... ...
... ... @@ -17,6 +17,7 @@
17 17 <result property="deviceType" column="device_type" typeHandler="org.apache.ibatis.type.EnumTypeHandler"/>
18 18 <result property="brand" column="brand" />
19 19 <result property="sn" column="sn"/>
  20 + <result property="code" column="code"/>
20 21 <result property="tenantId" column="tenant_id"/>
21 22 <result property="tbDeviceId" column="tb_device_id"/>
22 23 <result property="label" column="label"/>
... ... @@ -60,9 +61,7 @@
60 61 </resultMap>
61 62
62 63 <sql id="basicColumns">
63   - ifd
64   - .
65   - id
  64 + ifd.id
66 65 ,ifd.sn,ifd.brand,ifd.name,ifd.device_info,ifd.profile_id,ifd.active_time,ifd.tenant_id,ifd.description
67 66 ,ifd.tb_device_id,ifd.label,ifd.last_connect_time,ifd.device_type,ifd.device_state,ifd.create_time,ifd.update_time,ifd.creator,
68 67 ifd.updater,ifd.organization_id,ifd.alarm_status
... ... @@ -377,6 +376,17 @@
377 376 </where>
378 377 </select>
379 378
  379 + <select id="slaveDevice" resultMap="deviceMap">
  380 + SELECT
  381 + <include refid="basicColumns"/>
  382 + FROM iotfs_device ifd
  383 + <where>
  384 + ifd.tenant_id = #{tenantId}
  385 + AND ifd.gateway_id = #{masterId}
  386 + AND ifd.code = #{code}
  387 + </where>
  388 + </select>
  389 +
380 390 <select id="findDeviceKeys" resultType="string">
381 391 SELECT
382 392 DISTINCT base.key as keyName
... ...
... ... @@ -59,7 +59,19 @@
59 59 AND iot.tenant_id = #{tenantId}
60 60 </if>
61 61 <if test="id !=null and id !=''">
62   - AND iot.id = #{id}
  62 + AND (iot.id = #{id} OR iot.tb_profile_id = #{id})
  63 + </if>
  64 + </where>
  65 + </select>
  66 + <select id="profileByScriptId" resultMap="detail">
  67 + SELECT
  68 + <include refid="basicColumns"/>
  69 + FROM device_profile base
  70 + LEFT JOIN iotfs_device_profile iot ON iot.tb_profile_id = base.id::TEXT
  71 + <where>
  72 + iot.tenant_id = #{tenantId}
  73 + <if test="scriptId !=null and scriptId !=''">
  74 + AND iot.script_id = #{scriptId}
63 75 </if>
64 76 </where>
65 77 </select>
... ...