Commit 691b2251ee1d24a9d81b2fd6b821cae7f30d186f

Authored by 房远帅
Committed by 朱园亮
1 parent 50fecc0e

MQTT方法优化

... ... @@ -253,6 +253,7 @@ public class MqttUtils {
253 253 // 连接到服务器
254 254 log.info("正在连接到: " + brokerUrl);
255 255 mqttClient.connect(connectOptions);
  256 + isConnected = mqttClient.isConnected();
256 257
257 258 } catch (MqttException e) {
258 259 isConnected = false;
... ... @@ -274,21 +275,54 @@ public class MqttUtils {
274 275 * @throws MqttException 发布异常
275 276 */
276 277 public void publish(String topic, String message, int qos, boolean retained) throws MqttException {
277   - if (!isConnected) {
  278 + if (mqttClient == null) {
278 279 throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
279 280 }
280 281
281   - try {
282   - MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
283   - mqttMessage.setQos(qos);
284   - mqttMessage.setRetained(retained);
  282 + int attempts = 0;
  283 + while (true) {
  284 + if (!mqttClient.isConnected()) {
  285 + try {
  286 + if (connectOptions != null) {
  287 + mqttClient.connect(connectOptions);
  288 + isConnected = mqttClient.isConnected();
  289 + }
  290 + } catch (MqttException ce) {
  291 + attempts++;
  292 + if (attempts >= 2) {
  293 + log.error("重连失败: " + ce.getMessage());
  294 + throw ce;
  295 + }
  296 + try {
  297 + Thread.sleep(100);
  298 + } catch (InterruptedException ie) {
  299 + Thread.currentThread().interrupt();
  300 + }
  301 + continue;
  302 + }
  303 + }
285 304
286   - mqttClient.publish(topic, mqttMessage);
287   - log.info("发布成功 - Topic: " + topic + ", QoS: " + qos +
288   - ", Retained: " + retained + ", Message: " + message);
289   - } catch (MqttException e) {
290   - log.error("发布失败: " + e.getMessage());
291   - throw e;
  305 + try {
  306 + MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
  307 + mqttMessage.setQos(qos);
  308 + mqttMessage.setRetained(retained);
  309 +
  310 + mqttClient.publish(topic, mqttMessage);
  311 + log.info("发布成功 - Topic: " + topic + ", QoS: " + qos +
  312 + ", Retained: " + retained + ", Message: " + message);
  313 + return;
  314 + } catch (MqttException e) {
  315 + attempts++;
  316 + if (attempts >= 2) {
  317 + log.error("发布失败: " + e.getMessage());
  318 + throw e;
  319 + }
  320 + try {
  321 + Thread.sleep(100);
  322 + } catch (InterruptedException ie) {
  323 + Thread.currentThread().interrupt();
  324 + }
  325 + }
292 326 }
293 327 }
294 328
... ... @@ -470,7 +504,8 @@ public class MqttUtils {
470 504
471 505 config.setConnectionTimeout(15)
472 506 .setKeepAliveInterval(30)
473   - .setQos(1);
  507 + .setQos(1)
  508 + .setMaxInflight(50);
474 509
475 510 try {
476 511 mqttUtils.connect(config);
... ... @@ -490,19 +525,28 @@ public class MqttUtils {
490 525 .setPassword(password)
491 526 .setConnectionTimeout(15)
492 527 .setKeepAliveInterval(30)
493   - .setQos(1);
  528 + .setQos(1)
  529 + .setMaxInflight(50);
494 530
495 531 try {
496   - // 连接
497 532 mqttUtils.connect(config);
498 533
499   - // 发布消息
500 534 mqttUtils.publish(topic, message);
501 535
502 536 } catch (MqttException e) {
503   - e.printStackTrace();
  537 + log.error("首次发布失败: " + e.getMessage());
  538 + try {
  539 + mqttUtils.disconnect();
  540 + } catch (Exception ignore) {
  541 + }
  542 + try {
  543 + mqttUtils.connect(config);
  544 + mqttUtils.publish(topic, message);
  545 + } catch (MqttException ee) {
  546 + log.error("重试发布失败: " + ee.getMessage());
  547 + throw ee;
  548 + }
504 549 } finally {
505   - // 断开连接
506 550 mqttUtils.disconnect();
507 551 }
508 552 }
... ...