Commit b87e1b1797c0fc0358d7ec34b8f708014de6f2c9

Authored by 房远帅
1 parent d552078e

mqtt方法优化

@@ -140,7 +140,7 @@ public class CzDeviceReportService { @@ -140,7 +140,7 @@ public class CzDeviceReportService {
140 140
141 try { 141 try {
142 log.debug("开始MQTT发布: deviceId={}", deviceId); 142 log.debug("开始MQTT发布: deviceId={}", deviceId);
143 - MqttUtils.quickPublish(broker, topic, username, password, clientId, JSON.toJSONString(deviceMap)); 143 + MqttUtils.publish(broker, topic, username, password, clientId, JSON.toJSONString(deviceMap));
144 successCount++; 144 successCount++;
145 145
146 // 每10个设备记录一次进度 146 // 每10个设备记录一次进度
@@ -274,4 +274,4 @@ public class CzDeviceReportService { @@ -274,4 +274,4 @@ public class CzDeviceReportService {
274 log.info("数据库操作完成,返回{}条记录", resultList.size()); 274 log.info("数据库操作完成,返回{}条记录", resultList.size());
275 return resultList; 275 return resultList;
276 } 276 }
277 -}  
  277 +}
1 package com.iot.scheduler.utils; 1 package com.iot.scheduler.utils;
2 2
  3 +import io.micrometer.common.util.StringUtils;
3 import lombok.extern.slf4j.Slf4j; 4 import lombok.extern.slf4j.Slf4j;
4 -import org.apache.commons.lang3.StringUtils;  
5 import org.eclipse.paho.client.mqttv3.*; 5 import org.eclipse.paho.client.mqttv3.*;
6 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 6 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
7 7
@@ -253,6 +253,7 @@ public class MqttUtils { @@ -253,6 +253,7 @@ public class MqttUtils {
253 // 连接到服务器 253 // 连接到服务器
254 log.info("正在连接到: " + brokerUrl); 254 log.info("正在连接到: " + brokerUrl);
255 mqttClient.connect(connectOptions); 255 mqttClient.connect(connectOptions);
  256 + isConnected = mqttClient.isConnected();
256 257
257 } catch (MqttException e) { 258 } catch (MqttException e) {
258 isConnected = false; 259 isConnected = false;
@@ -274,21 +275,54 @@ public class MqttUtils { @@ -274,21 +275,54 @@ public class MqttUtils {
274 * @throws MqttException 发布异常 275 * @throws MqttException 发布异常
275 */ 276 */
276 public void publish(String topic, String message, int qos, boolean retained) throws MqttException { 277 public void publish(String topic, String message, int qos, boolean retained) throws MqttException {
277 - if (!isConnected) { 278 + if (mqttClient == null) {
278 throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); 279 throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
279 } 280 }
280 281
281 - try {  
282 - MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));  
283 - mqttMessage.setQos(qos);  
284 - mqttMessage.setRetained(retained); 282 + int attempts = 0;
  283 + while (true) {
  284 + if (!mqttClient.isConnected()) {
  285 + try {
  286 + if (connectOptions != null) {
  287 + mqttClient.connect(connectOptions);
  288 + isConnected = mqttClient.isConnected();
  289 + }
  290 + } catch (MqttException ce) {
  291 + attempts++;
  292 + if (attempts >= 2) {
  293 + log.error("重连失败: " + ce.getMessage());
  294 + throw ce;
  295 + }
  296 + try {
  297 + Thread.sleep(100);
  298 + } catch (InterruptedException ie) {
  299 + Thread.currentThread().interrupt();
  300 + }
  301 + continue;
  302 + }
  303 + }
285 304
286 - mqttClient.publish(topic, mqttMessage);  
287 - log.info("发布成功 - Topic: " + topic + ", QoS: " + qos +  
288 - ", Retained: " + retained + ", Message: " + message);  
289 - } catch (MqttException e) {  
290 - log.error("发布失败: " + e.getMessage());  
291 - throw e; 305 + try {
  306 + MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
  307 + mqttMessage.setQos(qos);
  308 + mqttMessage.setRetained(retained);
  309 +
  310 + mqttClient.publish(topic, mqttMessage);
  311 + log.info("发布成功 - Topic: " + topic + ", QoS: " + qos +
  312 + ", Retained: " + retained + ", Message: " + message);
  313 + return;
  314 + } catch (MqttException e) {
  315 + attempts++;
  316 + if (attempts >= 2) {
  317 + log.error("发布失败: " + e.getMessage());
  318 + throw e;
  319 + }
  320 + try {
  321 + Thread.sleep(100);
  322 + } catch (InterruptedException ie) {
  323 + Thread.currentThread().interrupt();
  324 + }
  325 + }
292 } 326 }
293 } 327 }
294 328
@@ -456,7 +490,7 @@ public class MqttUtils { @@ -456,7 +490,7 @@ public class MqttUtils {
456 String password, String clientId, String message) throws Exception { 490 String password, String clientId, String message) throws Exception {
457 MqttUtils mqttUtils = new MqttUtils(); 491 MqttUtils mqttUtils = new MqttUtils();
458 492
459 - MqttUtils.MqttConfig config = new MqttUtils.MqttConfig(broker); 493 + MqttConfig config = new MqttConfig(broker);
460 494
461 if (StringUtils.isNotBlank(username)) { 495 if (StringUtils.isNotBlank(username)) {
462 config.setUsername(username); 496 config.setUsername(username);
@@ -470,7 +504,8 @@ public class MqttUtils { @@ -470,7 +504,8 @@ public class MqttUtils {
470 504
471 config.setConnectionTimeout(15) 505 config.setConnectionTimeout(15)
472 .setKeepAliveInterval(30) 506 .setKeepAliveInterval(30)
473 - .setQos(1); 507 + .setQos(1)
  508 + .setMaxInflight(50);
474 509
475 try { 510 try {
476 mqttUtils.connect(config); 511 mqttUtils.connect(config);
@@ -481,6 +516,41 @@ public class MqttUtils { @@ -481,6 +516,41 @@ public class MqttUtils {
481 } 516 }
482 } 517 }
483 518
  519 + public static void publish(String broker, String topic, String username,
  520 + String password, String clientId, String message) throws Exception {
  521 + MqttUtils mqttUtils = new MqttUtils();
  522 + MqttUtils.MqttConfig config = new MqttUtils.MqttConfig(broker)
  523 + .setClientId(clientId)
  524 + .setUsername(username)
  525 + .setPassword(password)
  526 + .setConnectionTimeout(15)
  527 + .setKeepAliveInterval(30)
  528 + .setQos(1)
  529 + .setMaxInflight(50);
  530 +
  531 + try {
  532 + mqttUtils.connect(config);
  533 +
  534 + mqttUtils.publish(topic, message);
  535 +
  536 + } catch (MqttException e) {
  537 + log.error("首次发布失败: " + e.getMessage());
  538 + try {
  539 + mqttUtils.disconnect();
  540 + } catch (Exception ignore) {
  541 + }
  542 + try {
  543 + mqttUtils.connect(config);
  544 + mqttUtils.publish(topic, message);
  545 + } catch (MqttException ee) {
  546 + log.error("重试发布失败: " + ee.getMessage());
  547 + throw ee;
  548 + }
  549 + } finally {
  550 + mqttUtils.disconnect();
  551 + }
  552 + }
  553 +
484 554
485 public static void main(String[] args) { 555 public static void main(String[] args) {
486 556
@@ -520,4 +590,4 @@ public class MqttUtils { @@ -520,4 +590,4 @@ public class MqttUtils {
520 // mqttUtils.disconnect(); 590 // mqttUtils.disconnect();
521 // } 591 // }
522 } 592 }
523 -}  
  593 +}