Commit a4869b2b05d6f45f73c317d7a23aacf7954ac4c5

Authored by xp.Huang
2 parents 4976b45f 59e99111

Merge branch '20230705' into 'master_dev'

20230705

See merge request yunteng/thingskit!203
1 1 /**
2 2 * Copyright © 2016-2022 The Thingsboard Authors
3 3 *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
  4 + * <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
  5 + * except in compliance with the License. You may obtain a copy of the License at
7 6 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  7 + * <p>http://www.apache.org/licenses/LICENSE-2.0
9 8 *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
  9 + * <p>Unless required by applicable law or agreed to in writing, software distributed under the
  10 + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  11 + * express or implied. See the License for the specific language governing permissions and
14 12 * limitations under the License.
15 13 */
16 14 package org.thingsboard.server.transport.udp;
17 15
18   -
19 16 import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
20 17 import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
21 18
... ... @@ -25,9 +22,11 @@ import com.google.common.util.concurrent.Futures;
25 22 import com.google.common.util.concurrent.ListenableFuture;
26 23 import com.google.common.util.concurrent.MoreExecutors;
27 24 import io.netty.buffer.ByteBuf;
  25 +import io.netty.buffer.ByteBufUtil;
28 26 import io.netty.buffer.Unpooled;
29 27 import io.netty.channel.ChannelFuture;
30 28 import io.netty.channel.ChannelHandlerContext;
  29 +import io.netty.channel.socket.DatagramPacket;
31 30 import io.netty.handler.codec.mqtt.*;
32 31 import io.netty.util.concurrent.Future;
33 32 import io.netty.util.concurrent.GenericFutureListener;
... ... @@ -73,411 +72,472 @@ import org.thingsboard.server.transport.tcp.session.TcpGatewaySessionHandler;
73 72 * @author Andrew Shvayka
74 73 */
75 74 @Slf4j
76   - public class UdpDatagramDataHandler implements GenericFutureListener<Future<? super Void>>, SessionMsgListener {
77   -
78   - private final UUID sessionId;
79   - private final TcpTransportContext context;
80   - private final TransportService transportService;
81   -
82   -
83   - /**
84   - * 需要处理的消息队列,例如:需要下发给设备的,设备上传的。
85   - */
86   - final TcpDeviceSessionCtx deviceSessionCtx;
87   - volatile InetSocketAddress address;
88   -
89   - volatile TcpGatewaySessionHandler gatewaySessionHandler;
90   - private final AtomicInteger authedCounter = new AtomicInteger();
91   -
92   -
93   - UdpDatagramDataHandler(ChannelHandlerContext ctx,TcpTransportContext context, InetSocketAddress devAddress) {
94   - super();
95   - this.sessionId = UUID.randomUUID();
96   - this.context = context;
97   - this.address = devAddress;
98   - this.transportService = context.getTransportService();
99   - this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context);
  75 +public class UdpDatagramDataHandler
  76 + implements GenericFutureListener<Future<? super Void>>, SessionMsgListener {
  77 +
  78 + private final UUID sessionId;
  79 + private final TcpTransportContext context;
  80 + private final TransportService transportService;
  81 +
  82 + /** 需要处理的消息队列,例如:需要下发给设备的,设备上传的。 */
  83 + final TcpDeviceSessionCtx deviceSessionCtx;
  84 +
  85 + volatile InetSocketAddress address;
  86 +
  87 + volatile TcpGatewaySessionHandler gatewaySessionHandler;
  88 + private final AtomicInteger authedCounter = new AtomicInteger();
  89 +
  90 + UdpDatagramDataHandler(
  91 + ChannelHandlerContext ctx, TcpTransportContext context, InetSocketAddress devAddress) {
  92 + super();
  93 + this.sessionId = UUID.randomUUID();
  94 + this.context = context;
  95 + this.address = devAddress;
  96 + this.transportService = context.getTransportService();
  97 + this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context);
  98 + }
  99 +
  100 + public void dealDeviceMsg(ChannelHandlerContext ctx, String msg) {
  101 + deviceSessionCtx.setChannel(ctx);
  102 + if (deviceSessionCtx.getDeviceInfo() == null || deviceSessionCtx.getDeviceProfile() == null) {
  103 + processConnect(ctx, msg);
  104 + return;
100 105 }
101   -
102   -
103   -
104   -
105   -
106   -
107   - public void dealDeviceMsg(ChannelHandlerContext ctx, String msg) {
108   - deviceSessionCtx.setChannel(ctx);
109   - if (deviceSessionCtx.getDeviceInfo() == null || deviceSessionCtx.getDeviceProfile() == null) {
110   - processConnect(ctx, msg);
111   - return;
112   - }
113   - final int queueSize = deviceSessionCtx.getMsgQueueSize();
114   - if (queueSize >= context.getMessageQueueSizePerDeviceLimit()) {
115   - log.info("Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}",
116   - deviceSessionCtx.getDeviceId(), context.getMessageQueueSizePerDeviceLimit(), queueSize, deviceSessionCtx.getMsgQueueSize());
117   -// ctx.close();
118   - return;
119   - }
120   - deviceSessionCtx.addToQueue(msg);
121   - processQueueMessage(ctx); //Under the normal conditions the msg queue will contain 0 messages. Many messages will be processed on device connect event in separate thread pool
122   -
  106 + final int queueSize = deviceSessionCtx.getMsgQueueSize();
  107 + if (queueSize >= context.getMessageQueueSizePerDeviceLimit()) {
  108 + log.info(
  109 + "Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}",
  110 + deviceSessionCtx.getDeviceId(),
  111 + context.getMessageQueueSizePerDeviceLimit(),
  112 + queueSize,
  113 + deviceSessionCtx.getMsgQueueSize());
  114 + // ctx.close();
  115 + return;
123 116 }
124   -
125   -
126   - void processQueueMessage(ChannelHandlerContext ctx) {
127   - if (!deviceSessionCtx.isConnected()) {
128   - log.trace("[{}][{}] Postpone processing msg due to device is not connected. Msg queue size is {}", sessionId, deviceSessionCtx.getDeviceId(), deviceSessionCtx.getMsgQueueSize());
129   - return;
130   - }
131   - deviceSessionCtx.tryProcessQueuedMsgs(msg -> processDeviceSessionMsg(ctx, msg));
  117 + deviceSessionCtx.addToQueue(msg);
  118 + processQueueMessage(
  119 + ctx); // Under the normal conditions the msg queue will contain 0 messages. Many messages
  120 + // will be processed on device connect event in separate thread pool
  121 + }
  122 +
  123 + void processQueueMessage(ChannelHandlerContext ctx) {
  124 + if (!deviceSessionCtx.isConnected()) {
  125 + log.trace(
  126 + "[{}][{}] Postpone processing msg due to device is not connected. Msg queue size is {}",
  127 + sessionId,
  128 + deviceSessionCtx.getDeviceId(),
  129 + deviceSessionCtx.getMsgQueueSize());
  130 + return;
132 131 }
  132 + deviceSessionCtx.tryProcessQueuedMsgs(msg -> processDeviceSessionMsg(ctx, msg));
  133 + }
133 134
134   -
135   - private void processDeviceSessionMsg(ChannelHandlerContext ctx, String tcpMessage) {
136   - if (!checkConnected(ctx, tcpMessage)) {
137   - return;
138   - }
139   - deviceSessionCtx.doUpScript(tcpMessage, r -> {
140   - if (gatewaySessionHandler != null) {
141   - processGatewayDeviceMsg(ctx, r);
142   - }
143   - processDirectDeviceMsg(ctx, r);
  135 + private void processDeviceSessionMsg(ChannelHandlerContext ctx, String tcpMessage) {
  136 + if (!checkConnected(ctx, tcpMessage)) {
  137 + return;
  138 + }
  139 + deviceSessionCtx.doUpScript(
  140 + tcpMessage,
  141 + r -> {
  142 + if (gatewaySessionHandler != null) {
  143 + processGatewayDeviceMsg(ctx, r);
  144 + }
  145 + processDirectDeviceMsg(ctx, r);
144 146 });
145   -
  147 + }
  148 +
  149 + /**
  150 + * 上行脚本解析结果是否包含数据
  151 + *
  152 + * @param datas 数据集合
  153 + * @return
  154 + */
  155 + private boolean hasDatas(Map<String, Object> datas) {
  156 + if (datas == null || datas.isEmpty()) {
  157 + return false;
146 158 }
147   -
148   -
149   - /**
150   - * 上行脚本解析结果是否包含数据
151   - * @param datas 数据集合
152   - * @return
153   - */
154   - private boolean hasDatas(Map<String, Object> datas) {
155   - if (datas == null || datas.isEmpty()) {
156   - return false;
157   - }
158   - return true;
  159 + return true;
  160 + }
  161 +
  162 + private void processGatewayDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {
  163 + log.trace(
  164 + "[{}][{}] Processing publish msg [{}]!",
  165 + sessionId,
  166 + deviceSessionCtx.getDeviceId(),
  167 + tcpMessage);
  168 + try {
  169 + Map<String, Object> datas = tcpMessage.getDatas();
  170 + if (hasDatas(datas)) {
  171 + datas.forEach(
  172 + (devName, param) -> {
  173 + if (TkScriptFactory.ORIGINAL_DATA_FILED.equals(devName)) {
  174 + return;
  175 + }
  176 + if (tcpMessage.getTelemetry()) {
  177 + gatewaySessionHandler.onDeviceTelemetry(
  178 + devName, tcpMessage.getRequestId(), param.toString());
  179 + } else {
  180 + // gatewaySessionHandler.onDeviceRpcResponse(devName,
  181 + // tcpMessage.getRequestId(), param.toString());
  182 + }
  183 + });
  184 + } else {
  185 + transportService.reportActivity(deviceSessionCtx.getSessionInfo());
  186 + pushDeviceMsg(ctx, tcpMessage.getAckMsg());
  187 + }
  188 + } catch (Exception e) {
  189 + log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
  190 + // ctx.close();
159 191 }
160   -
161   -
162   - private void processGatewayDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {
163   - log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage);
164   - try {
165   - Map<String, Object> datas = tcpMessage.getDatas();
166   - if (hasDatas(datas)) {
167   - datas.forEach((devName, param) -> {
168   - if (TkScriptFactory.ORIGINAL_DATA_FILED.equals(devName)) {
169   - return;
170   - }
171   - if (tcpMessage.getTelemetry()) {
172   - gatewaySessionHandler.onDeviceTelemetry(devName, tcpMessage.getRequestId(), param.toString());
173   - } else {
174   -// gatewaySessionHandler.onDeviceRpcResponse(devName, tcpMessage.getRequestId(), param.toString());
175   - }
176   -
177   - });
178   - } else {
179   - transportService.reportActivity(deviceSessionCtx.getSessionInfo());
180   - pushDeviceMsg(ctx, tcpMessage.getAckMsg());
181   - }
182   - } catch (Exception e) {
183   - log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
184   -// ctx.close();
  192 + }
  193 +
  194 + private void processDirectDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {
  195 + log.trace(
  196 + "[{}][{}] Processing publish msg [{}]!",
  197 + sessionId,
  198 + deviceSessionCtx.getDeviceId(),
  199 + tcpMessage);
  200 + try {
  201 + TcpTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
  202 + Map<String, Object> datas = tcpMessage.getDatas();
  203 + if (hasDatas(datas)) {
  204 + String dataStr = JacksonUtil.toString(datas);
  205 + if (tcpMessage.getTelemetry()) {
  206 + TransportProtos.PostTelemetryMsg postTelemetryMsg =
  207 + payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, dataStr);
  208 + transportService.process(
  209 + deviceSessionCtx.getSessionInfo(),
  210 + postTelemetryMsg,
  211 + getPubAckCallback(ctx, tcpMessage));
  212 + } else {
  213 + TransportProtos.ToDeviceRpcResponseMsg postRpcMsg =
  214 + payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, tcpMessage);
  215 + transportService.process(
  216 + deviceSessionCtx.getSessionInfo(), postRpcMsg, getPubAckCallback(ctx, tcpMessage));
185 217 }
  218 + } else {
  219 + transportService.reportActivity(deviceSessionCtx.getSessionInfo());
  220 + pushDeviceMsg(ctx, tcpMessage.getAckMsg());
  221 + }
  222 + } catch (AdaptorException e) {
  223 + log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
  224 + // ctx.close();
186 225 }
187   -
188   - private void processDirectDeviceMsg(ChannelHandlerContext ctx, TcpUpEntry tcpMessage) {
189   - log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage);
190   - try {
191   - TcpTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
192   - Map<String, Object> datas = tcpMessage.getDatas();
193   - if (hasDatas(datas)) {
194   - String dataStr = JacksonUtil.toString(datas);
195   - if (tcpMessage.getTelemetry()) {
196   - TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, dataStr);
197   - transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, tcpMessage));
198   - } else {
199   - TransportProtos.ToDeviceRpcResponseMsg postRpcMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, tcpMessage);
200   - transportService.process(deviceSessionCtx.getSessionInfo(), postRpcMsg, getPubAckCallback(ctx, tcpMessage));
201   - }
202   - } else {
203   - transportService.reportActivity(deviceSessionCtx.getSessionInfo());
204   - pushDeviceMsg(ctx, tcpMessage.getAckMsg());
205   - }
206   - } catch (AdaptorException e) {
207   - log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
208   -// ctx.close();
  226 + }
  227 +
  228 + private <T> TransportServiceCallback<Void> getPubAckCallback(
  229 + final ChannelHandlerContext ctx, final TcpUpEntry msg) {
  230 + return new TransportServiceCallback<>() {
  231 + @Override
  232 + public void onSuccess(Void dummy) {
  233 + log.trace("[{}] Published msg: {}", sessionId, msg);
  234 + if (StringUtils.isNotEmpty(msg.getAckMsg())) {
  235 + pushDeviceMsg(ctx, msg.getAckMsg());
209 236 }
  237 + }
  238 +
  239 + @Override
  240 + public void onError(Throwable e) {
  241 + log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
  242 + // ctx.close();
  243 + }
  244 + };
  245 + }
  246 +
  247 + void processConnect(ChannelHandlerContext ctx, String accessToken) {
  248 + log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, accessToken);
  249 +
  250 + if (DataConstants.PROVISION.equals(accessToken)
  251 + || DataConstants.PROVISION.equals(accessToken)) {
  252 + deviceSessionCtx.setProvisionOnly(true);
  253 + pushDeviceMsg(ctx, "CONNECTION_ACCEPTED");
  254 + } else {
  255 + TkScriptInvokeService.authScripts.forEach(
  256 + id -> {
  257 + ListenableFuture item = context.getJsEngine().invokeFunction(id, accessToken);
  258 + Futures.addCallback(
  259 + item,
  260 + new FutureCallback<String>() {
  261 + @Override
  262 + public void onSuccess(@Nullable String result) {
  263 + processAuthTokenConnect(
  264 + ctx,
  265 + id,
  266 + JacksonUtil.fromString(result.replace("\\", "\\\\"), TcpAuthEntry.class));
  267 + }
  268 +
  269 + @Override
  270 + public void onFailure(Throwable t) {
  271 + onValidateFailed(
  272 + ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);
  273 + }
  274 + },
  275 + MoreExecutors.directExecutor());
  276 + });
210 277 }
211   -
212   -
213   - private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final TcpUpEntry msg) {
214   - return new TransportServiceCallback<>() {
  278 + }
  279 +
  280 + private void processAuthTokenConnect(
  281 + ChannelHandlerContext ctx, UUID scriptId, TcpAuthEntry accessToken) {
  282 +
  283 + log.debug(
  284 + "[{}][{}] Processing connect msg for client with user name: {}!",
  285 + address,
  286 + sessionId,
  287 + accessToken);
  288 + if (null != accessToken.getClientId()) {}
  289 + if (null != accessToken.getUserName()) {}
  290 + String token = accessToken.getPassword();
  291 + if (StringUtils.isNotEmpty(token)) {
  292 + TransportProtos.ValidateDeviceTokenRequestMsg.Builder request =
  293 + TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder();
  294 + request.setToken(token);
  295 + transportService.process(
  296 + DeviceTransportType.TCP,
  297 + request.build(),
  298 + new TransportServiceCallback<>() {
215 299 @Override
216   - public void onSuccess(Void dummy) {
217   - log.trace("[{}] Published msg: {}", sessionId, msg);
218   - if (StringUtils.isNotEmpty(msg.getAckMsg())) {
219   - pushDeviceMsg(ctx, msg.getAckMsg());
220   - }
  300 + public void onSuccess(ValidateDeviceCredentialsResponse msg) {
  301 + onValidateDeviceResponse(msg, ctx, accessToken, scriptId);
221 302 }
222 303
223 304 @Override
224 305 public void onError(Throwable e) {
225   - log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
226   -// ctx.close();
  306 + log.trace("[{}] Failed to process credentials: {}", address, accessToken, e);
  307 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
227 308 }
228   - };
  309 + });
  310 + } else {
  311 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);
229 312 }
230   -
231   -
232   - void processConnect(ChannelHandlerContext ctx, String accessToken) {
233   - log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, accessToken);
234   -
235   - if (DataConstants.PROVISION.equals(accessToken) || DataConstants.PROVISION.equals(accessToken)) {
236   - deviceSessionCtx.setProvisionOnly(true);
237   - pushDeviceMsg(ctx,"CONNECTION_ACCEPTED");
238   - } else {
239   - TkScriptInvokeService.authScripts.forEach(id -> {
240   - ListenableFuture item = context.getJsEngine().invokeFunction(id, accessToken);
241   - Futures.addCallback(item, new FutureCallback<String>() {
242   - @Override
243   - public void onSuccess(@Nullable String result) {
244   - processAuthTokenConnect(ctx, id, JacksonUtil.fromString(result.replace("\\","\\\\"), TcpAuthEntry.class));
245   - }
246   -
247   - @Override
248   - public void onFailure(Throwable t) {
249   - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);
250   - }
251   - }, MoreExecutors.directExecutor());
252   -
253   - });
254   - }
  313 + }
  314 +
  315 + private boolean checkConnected(ChannelHandlerContext ctx, String msg) {
  316 + if (deviceSessionCtx.isConnected()) {
  317 + return true;
  318 + } else {
  319 + log.info("[{}] Closing current session due to invalid msg order: {}", sessionId, msg);
  320 + return false;
255 321 }
256   -
257   - private void processAuthTokenConnect(ChannelHandlerContext ctx, UUID scriptId, TcpAuthEntry accessToken) {
258   -
259   - log.debug("[{}][{}] Processing connect msg for client with user name: {}!", address, sessionId, accessToken);
260   - if (null != accessToken.getClientId()) {
261   - }
262   - if (null != accessToken.getUserName()) {
263   - }
264   - String token = accessToken.getPassword();
265   - if(StringUtils.isNotEmpty(token)){
266   - TransportProtos.ValidateDeviceTokenRequestMsg.Builder request = TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder();
267   - request.setToken(token);
268   - transportService.process(DeviceTransportType.TCP, request.build(),
269   - new TransportServiceCallback<>() {
270   - @Override
271   - public void onSuccess(ValidateDeviceCredentialsResponse msg) {
272   - onValidateDeviceResponse(msg, ctx, accessToken, scriptId);
273   - }
274   -
275   - @Override
276   - public void onError(Throwable e) {
277   - log.trace("[{}] Failed to process credentials: {}", address, accessToken, e);
278   - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
279   - }
280   - });
281   - }else{
282   - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);
  322 + }
  323 +
  324 + private void checkGatewaySession(SessionMetaData sessionMetaData) {
  325 + TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo();
  326 + try {
  327 + JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
  328 + if (infoNode != null) {
  329 + JsonNode gatewayNode = infoNode.get("gateway");
  330 + if (gatewayNode != null && gatewayNode.asBoolean()) {
  331 + gatewaySessionHandler = new TcpGatewaySessionHandler(deviceSessionCtx, sessionId);
  332 + if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME)
  333 + && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {
  334 + sessionMetaData.setOverwriteActivityTime(
  335 + infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());
  336 + }
283 337 }
  338 + }
  339 + } catch (IOException e) {
  340 + log.trace(
  341 + "[{}][{}] Failed to fetch device additional info", sessionId, device.getDeviceName(), e);
284 342 }
285   -
286   -
287   -
288   -
289   -
290   -
291   -
292   -
293   -
294   - private boolean checkConnected(ChannelHandlerContext ctx, String msg) {
295   - if (deviceSessionCtx.isConnected()) {
296   - return true;
297   - } else {
298   - log.info("[{}] Closing current session due to invalid msg order: {}", sessionId, msg);
299   - return false;
300   - }
301   - }
302   -
303   - private void checkGatewaySession(SessionMetaData sessionMetaData) {
304   - TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo();
305   - try {
306   - JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
307   - if (infoNode != null) {
308   - JsonNode gatewayNode = infoNode.get("gateway");
309   - if (gatewayNode != null && gatewayNode.asBoolean()) {
310   - gatewaySessionHandler = new TcpGatewaySessionHandler(deviceSessionCtx, sessionId);
311   - if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {
312   - sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());
313   - }
314   - }
315   - }
316   - } catch (IOException e) {
317   - log.trace("[{}][{}] Failed to fetch device additional info", sessionId, device.getDeviceName(), e);
318   - }
  343 + }
  344 +
  345 + @Override
  346 + public void operationComplete(Future<? super Void> future) throws Exception {
  347 + log.trace("[{}] Channel closed!", sessionId);
  348 + doDisconnect();
  349 + }
  350 +
  351 + public void doDisconnect() {
  352 + if (deviceSessionCtx.isConnected()) {
  353 + log.debug("[{}] Client disconnected!", sessionId);
  354 + transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
  355 + transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
  356 + if (gatewaySessionHandler != null) {
  357 + gatewaySessionHandler.onGatewayDisconnect();
  358 + }
  359 + deviceSessionCtx.setDisconnected();
319 360 }
320   -
321   - @Override
322   - public void operationComplete(Future<? super Void> future) throws Exception {
323   - log.trace("[{}] Channel closed!", sessionId);
324   - doDisconnect();
325   - }
326   -
327   - public void doDisconnect() {
328   - if (deviceSessionCtx.isConnected()) {
329   - log.debug("[{}] Client disconnected!", sessionId);
330   - transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
331   - transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
332   - if (gatewaySessionHandler != null) {
333   - gatewaySessionHandler.onGatewayDisconnect();
  361 + deviceSessionCtx.release();
  362 + }
  363 +
  364 + private void onValidateDeviceResponse(
  365 + ValidateDeviceCredentialsResponse msg,
  366 + ChannelHandlerContext ctx,
  367 + TcpAuthEntry authEntry,
  368 + UUID scriptId) {
  369 + if (!msg.hasDeviceInfo()) {
  370 + context.onAuthFailure(address);
  371 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
  372 + } else {
  373 + DeviceProfile profile = msg.getDeviceProfile();
  374 + TkTcpDeviceProfileTransportConfiguration tcpConfig =
  375 + (TkTcpDeviceProfileTransportConfiguration)
  376 + profile.getProfileData().getTransportConfiguration();
  377 + if (scriptId != null && !tcpConfig.getAuthScriptId().equals(scriptId.toString())) {
  378 + authedCounter.incrementAndGet();
  379 + return;
  380 + }
  381 + context.onAuthSuccess(address);
  382 + deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
  383 + deviceSessionCtx.setDeviceProfile(profile);
  384 + deviceSessionCtx.setSessionInfo(SessionInfoCreator.create(msg, context, sessionId));
  385 + transportService.process(
  386 + deviceSessionCtx.getSessionInfo(),
  387 + SESSION_EVENT_MSG_OPEN,
  388 + new TransportServiceCallback<Void>() {
  389 + @Override
  390 + public void onSuccess(Void msg) {
  391 + SessionMetaData sessionMetaData =
  392 + transportService.registerAsyncSession(
  393 + deviceSessionCtx.getSessionInfo(), UdpDatagramDataHandler.this);
  394 + checkGatewaySession(sessionMetaData);
  395 + pushDeviceMsg(ctx, authEntry.getSuccess());
  396 + deviceSessionCtx.setConnected(true);
  397 + log.debug("[{}] Client connected!", sessionId);
  398 +
  399 + transportService.process(
  400 + deviceSessionCtx.getSessionInfo(),
  401 + TransportProtos.SubscribeToRPCMsg.newBuilder().build(),
  402 + null);
  403 + transportService
  404 + .getCallbackExecutor()
  405 + .execute(
  406 + () ->
  407 + processQueueMessage(
  408 + ctx)); // this callback will execute in Producer worker thread and
  409 + // hard or blocking work have to be submitted to the separate
  410 + // thread.
334 411 }
335   - deviceSessionCtx.setDisconnected();
336   - }
337   - deviceSessionCtx.release();
338   - }
339 412
340   -
341   - private void onValidateDeviceResponse(ValidateDeviceCredentialsResponse msg, ChannelHandlerContext ctx, TcpAuthEntry authEntry, UUID scriptId) {
342   - if (!msg.hasDeviceInfo()) {
343   - context.onAuthFailure(address);
344   - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
345   - } else {
346   - DeviceProfile profile = msg.getDeviceProfile();
347   - TkTcpDeviceProfileTransportConfiguration tcpConfig = (TkTcpDeviceProfileTransportConfiguration) profile.getProfileData().getTransportConfiguration();
348   - if (scriptId != null && !tcpConfig.getAuthScriptId().equals(scriptId.toString())) {
349   - authedCounter.incrementAndGet();
350   - return;
  413 + @Override
  414 + public void onError(Throwable e) {
  415 + if (e instanceof TbRateLimitsException) {
  416 + log.trace("[{}] Failed to submit session event: {}", sessionId, e.getMessage());
  417 + } else {
  418 + log.warn("[{}] Failed to submit session event", sessionId, e);
  419 + }
  420 + onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
351 421 }
352   - context.onAuthSuccess(address);
353   - deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
354   - deviceSessionCtx.setDeviceProfile(profile);
355   - deviceSessionCtx.setSessionInfo(SessionInfoCreator.create(msg, context, sessionId));
356   - transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_OPEN, new TransportServiceCallback<Void>() {
357   - @Override
358   - public void onSuccess(Void msg) {
359   - SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), UdpDatagramDataHandler.this);
360   - checkGatewaySession(sessionMetaData);
361   - pushDeviceMsg(ctx,authEntry.getSuccess());
362   - deviceSessionCtx.setConnected(true);
363   - log.debug("[{}] Client connected!", sessionId);
364   -
365   - transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
366   - transportService.getCallbackExecutor().execute(() -> processQueueMessage(ctx)); //this callback will execute in Producer worker thread and hard or blocking work have to be submitted to the separate thread.
367   - }
368   -
369   - @Override
370   - public void onError(Throwable e) {
371   - if (e instanceof TbRateLimitsException) {
372   - log.trace("[{}] Failed to submit session event: {}", sessionId, e.getMessage());
373   - } else {
374   - log.warn("[{}] Failed to submit session event", sessionId, e);
375   - }
376   - onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
377   - }
378   - });
379   - }
  422 + });
380 423 }
  424 + }
381 425
382   - private void onValidateFailed(ChannelHandlerContext ctx, MqttConnectReturnCode msg) {
383   - authedCounter.incrementAndGet();
384   - if (TkScriptInvokeService.authScripts.size() == authedCounter.intValue()) {
385   - pushDeviceMsg(ctx,msg.name());
  426 + private void onValidateFailed(ChannelHandlerContext ctx, MqttConnectReturnCode msg) {
  427 + authedCounter.incrementAndGet();
  428 + if (TkScriptInvokeService.authScripts.size() == authedCounter.intValue()) {
  429 + pushDeviceMsg(ctx, msg.name());
386 430
387   -// ctx.close();
388   - }
  431 + // ctx.close();
389 432 }
390   -
391   - @Override
392   - public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
393   -
394   - }
395   -
396   - @Override
397   - public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
398   -
399   - }
400   -
401   - @Override
402   - public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
403   - log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
404   -// deviceSessionCtx.getChannel().close();
405   - }
406   -
407   - @Override
408   - public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
409   - log.debug("【{}】下发RPC命令【{}】给设备【{}】", sessionId, rpcRequest.getParams(), deviceSessionCtx.getDeviceInfo().getDeviceName());
410   - TcpTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();
411   - try {
  433 + }
  434 +
  435 + @Override
  436 + public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {}
  437 +
  438 + @Override
  439 + public void onAttributeUpdate(
  440 + UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {}
  441 +
  442 + @Override
  443 + public void onRemoteSessionCloseCommand(
  444 + UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
  445 + log.trace(
  446 + "[{}] Received the remote command to close the session: {}",
  447 + sessionId,
  448 + sessionCloseNotification.getMessage());
  449 + // deviceSessionCtx.getChannel().close();
  450 + }
  451 +
  452 + @Override
  453 + public void onToDeviceRpcRequest(
  454 + UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
  455 + log.debug(
  456 + "【{}】下发RPC命令【{}】给设备【{}】",
  457 + sessionId,
  458 + rpcRequest.getParams(),
  459 + deviceSessionCtx.getDeviceInfo().getDeviceName());
  460 + TcpTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();
  461 + try {
412 462 adaptor
413 463 .convertToPublish(deviceSessionCtx, rpcRequest)
414 464 .ifPresent(
415 465 payload -> {
416 466 deviceSessionCtx.rpcRequesting(payload.getIdentifier(), rpcRequest);
417   - Optional.ofNullable(pushDeviceMsg(deviceSessionCtx.getChannel(), payload.getDatas())).ifPresent(cf->{
418   - cf.addListener(
419   - result -> {
  467 + Optional.ofNullable(
  468 + pushDeviceMsg(deviceSessionCtx.getChannel(), payload.getDatas()))
  469 + .ifPresent(
  470 + cf -> {
  471 + cf.addListener(
  472 + result -> {
420 473 if (result.cause() == null) {
421   - transportService.process(
422   - deviceSessionCtx.getSessionInfo(),
423   - rpcRequest,
424   - RpcStatus.DELIVERED,
425   - TransportServiceCallback.EMPTY);
  474 + transportService.process(
  475 + deviceSessionCtx.getSessionInfo(),
  476 + rpcRequest,
  477 + RpcStatus.DELIVERED,
  478 + TransportServiceCallback.EMPTY);
426 479 } else {
427   - // TODO: send error
  480 + // TODO: send error
428 481 }
429   - });
430   - });
  482 + });
  483 + });
431 484 ;
432 485 });
433   - } catch (Exception e) {
434   - transportService.process(deviceSessionCtx.getSessionInfo(),
435   - TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
436   - .setRequestId(rpcRequest.getRequestId()).setError("Failed to convert device RPC command to TCP msg").build(), TransportServiceCallback.EMPTY);
437   - log.error("[{}] Failed to convert device RPC command to TCP msg", sessionId, e);
438   - }
  486 + } catch (Exception e) {
  487 + transportService.process(
  488 + deviceSessionCtx.getSessionInfo(),
  489 + TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
  490 + .setRequestId(rpcRequest.getRequestId())
  491 + .setError("Failed to convert device RPC command to TCP msg")
  492 + .build(),
  493 + TransportServiceCallback.EMPTY);
  494 + log.error("[{}] Failed to convert device RPC command to TCP msg", sessionId, e);
439 495 }
440   -
441   -
442   - @Override
443   - public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {
444   - log.debug("[{}] 服务端响应设备的RPC请求", sessionId);
445   - }
446   -
447   - /**
448   - * 往设备推送消息
449   - *
450   - * @param message
451   - * @return
452   - */
453   - private ChannelFuture pushDeviceMsg(ChannelHandlerContext ctx, String message) {
454   - if(StringUtils.isBlank(message)){
455   - return null;
456   - }
457   - ByteBuf buff = Unpooled.buffer();
458   - if(!message.matches("-?[0-9a-fA-F]+")){
459   - //不满足16进制将字符串转为16进制
460   - message = ByteUtils.stringEncodeToHex(message);
461   - }
462   - buff.writeBytes(ByteUtils.hexToByteArray(message));
463   - return ctx.writeAndFlush(buff);
  496 + }
  497 +
  498 + @Override
  499 + public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {
  500 + log.debug("[{}] 服务端响应设备的RPC请求", sessionId);
  501 + }
  502 +
  503 + /**
  504 + * 往设备推送消息
  505 + *
  506 + * @param message
  507 + * @return
  508 + */
  509 + private ChannelFuture pushDeviceMsg(ChannelHandlerContext ctx, String message) {
  510 + if (StringUtils.isBlank(message)) {
  511 + return null;
464 512 }
465   -
466   -
467   - @Override
468   - public void onDeviceProfileUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {
469   - deviceSessionCtx.onDeviceProfileUpdate(sessionInfo, deviceProfile);
470   - }
471   -
472   - @Override
473   - public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
474   - deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
475   - }
476   -
477   - @Override
478   - public void onDeviceDeleted(DeviceId deviceId) {
479   - context.onAuthFailure(address);
480   - ChannelHandlerContext ctx = deviceSessionCtx.getChannel();
481   -// ctx.close();
  513 + ByteBuf buff = Unpooled.buffer();
  514 + if (!message.matches("-?[0-9a-fA-F]+")) {
  515 + // 不满足16进制将字符串转为16进制
  516 + message = ByteUtils.stringEncodeToHex(message);
482 517 }
  518 + buff.writeBytes(ByteBufUtil.decodeHexDump(message));
  519 + DatagramPacket packet = new DatagramPacket(buff, address);
  520 + return ctx.writeAndFlush(packet);
  521 + }
  522 +
  523 + @Override
  524 + public void onDeviceProfileUpdate(
  525 + TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {
  526 + deviceSessionCtx.onDeviceProfileUpdate(sessionInfo, deviceProfile);
  527 + }
  528 +
  529 + @Override
  530 + public void onDeviceUpdate(
  531 + TransportProtos.SessionInfoProto sessionInfo,
  532 + Device device,
  533 + Optional<DeviceProfile> deviceProfileOpt) {
  534 + deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
  535 + }
  536 +
  537 + @Override
  538 + public void onDeviceDeleted(DeviceId deviceId) {
  539 + context.onAuthFailure(address);
  540 + ChannelHandlerContext ctx = deviceSessionCtx.getChannel();
  541 + // ctx.close();
  542 + }
483 543 }
... ...
1 1 package org.thingsboard.server.dao.yunteng.impl;
2 2
3 3 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  4 +import java.util.ArrayList;
  5 +import java.util.List;
  6 +import java.util.Set;
  7 +import java.util.stream.Collectors;
4 8 import lombok.RequiredArgsConstructor;
5 9 import org.springframework.stereotype.Service;
6 10 import org.thingsboard.server.common.data.yunteng.dto.DoActionDTO;
7   -import org.thingsboard.server.common.data.yunteng.enums.DeviceTypeEnum;
8 11 import org.thingsboard.server.common.data.yunteng.enums.ScopeEnum;
9 12 import org.thingsboard.server.dao.yunteng.entities.TenantBaseEntity;
10 13 import org.thingsboard.server.dao.yunteng.entities.TkDeviceEntity;
... ... @@ -14,44 +17,46 @@ import org.thingsboard.server.dao.yunteng.mapper.DoActionMapper;
14 17 import org.thingsboard.server.dao.yunteng.service.AbstractBaseService;
15 18 import org.thingsboard.server.dao.yunteng.service.DoActionService;
16 19
17   -import java.util.ArrayList;
18   -import java.util.List;
19   -import java.util.Set;
20   -import java.util.stream.Collectors;
21   -
22   -/** @Description @Author cxy @Date 2021/12/6 20:23 */
  20 +/**
  21 + * @Description @Author cxy @Date 2021/12/6 20:23
  22 + */
23 23 @Service
24 24 @RequiredArgsConstructor
25 25 public class TkDoActionServiceImpl extends AbstractBaseService<DoActionMapper, TkDoActionEntity>
26 26 implements DoActionService {
27 27 private final DeviceMapper deviceMapper;
  28 +
28 29 @Override
29 30 public List<TkDoActionEntity> getActionsByAll(String sceneId) {
30 31 LambdaQueryWrapper filter =
31   - new LambdaQueryWrapper<TkDoActionEntity>().eq(TkDoActionEntity::getSceneLinkageId, sceneId).eq(TkDoActionEntity::getEntityType, ScopeEnum.ALL);
  32 + new LambdaQueryWrapper<TkDoActionEntity>()
  33 + .eq(TkDoActionEntity::getSceneLinkageId, sceneId)
  34 + .eq(TkDoActionEntity::getEntityType, ScopeEnum.ALL);
32 35 return baseMapper.selectList(filter);
33 36 }
34 37
35   -
36 38 @Override
37 39 public List<TkDoActionEntity> getActionsByPart(String sceneId) {
38 40 LambdaQueryWrapper filter =
39   - new LambdaQueryWrapper<TkDoActionEntity>().eq(TkDoActionEntity::getSceneLinkageId, sceneId).eq(TkDoActionEntity::getEntityType, ScopeEnum.PART);
  41 + new LambdaQueryWrapper<TkDoActionEntity>()
  42 + .eq(TkDoActionEntity::getSceneLinkageId, sceneId)
  43 + .eq(TkDoActionEntity::getEntityType, ScopeEnum.PART);
40 44 List<TkDoActionEntity> source = baseMapper.selectList(filter);
41   - return source.stream().map(t ->{
42   - List<TkDeviceEntity> partDevices = deviceMapper.selectList(new LambdaQueryWrapper<TkDeviceEntity>().in(TkDeviceEntity::getTbDeviceId,t.getDeviceId()));
43   - List<String> deviceId = new ArrayList<>();
44   - for(TkDeviceEntity item : partDevices){
45   - if(DeviceTypeEnum.SENSOR.equals(item.getDeviceType())){
46   - deviceId.add(item.getGatewayId());
47   - }else{
48   - deviceId.add(item.getTbDeviceId());
49   - }
50   - }
51   - t.setDeviceId(deviceId);
52   - return t;
53   - }).collect(Collectors.toList());
54   -
  45 + return source.stream()
  46 + .map(
  47 + t -> {
  48 + List<TkDeviceEntity> partDevices =
  49 + deviceMapper.selectList(
  50 + new LambdaQueryWrapper<TkDeviceEntity>()
  51 + .in(TkDeviceEntity::getTbDeviceId, t.getDeviceId()));
  52 + List<String> deviceId = new ArrayList<>();
  53 + for (TkDeviceEntity item : partDevices) {
  54 + deviceId.add(item.getTbDeviceId());
  55 + }
  56 + t.setDeviceId(deviceId);
  57 + return t;
  58 + })
  59 + .collect(Collectors.toList());
55 60 }
56 61
57 62 @Override
... ...