Commit d5dfe007496c7c1c2351c1b67ec140deff8307f8

Authored by 胡翰林
1 parent 01521a3b

mqtt工具类

... ... @@ -84,6 +84,11 @@
84 84 <artifactId>jedis</artifactId>
85 85 <version>2.9.0</version>
86 86 </dependency>
  87 + <dependency>
  88 + <groupId>org.eclipse.paho</groupId>
  89 + <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  90 + <version>1.2.5</version>
  91 + </dependency>
87 92 </dependencies>
88 93
89 94 <build>
... ...
  1 +package com.iot.scheduler.utils;
  2 +
  3 +import lombok.extern.slf4j.Slf4j;
  4 +import org.apache.commons.lang3.StringUtils;
  5 +import org.eclipse.paho.client.mqttv3.*;
  6 +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  7 +
  8 +import java.nio.charset.StandardCharsets;
  9 +import java.util.ArrayList;
  10 +import java.util.List;
  11 +import java.util.UUID;
  12 +
  13 +/**
  14 + * MQTT客户端工具类
  15 + * 支持连接、发布、订阅、断开连接等操作
  16 + */
  17 +@Slf4j
  18 +public class MqttUtils {
  19 +
  20 + private MqttClient mqttClient;
  21 + private MqttConnectOptions connectOptions;
  22 + private ConnectionCallback connectionCallback;
  23 + private MessageCallback messageCallback;
  24 + private boolean isConnected = false;
  25 + private List<String> subscribedTopics = new ArrayList<>();
  26 +
  27 + /**
  28 + * 连接回调接口
  29 + */
  30 + public interface ConnectionCallback {
  31 + void onConnected();
  32 +
  33 + void onDisconnected();
  34 +
  35 + void onConnectionFailed(Throwable cause);
  36 +
  37 + void onConnectionLost(Throwable cause);
  38 + }
  39 +
  40 + /**
  41 + * 消息回调接口
  42 + */
  43 + public interface MessageCallback {
  44 + void onMessageReceived(String topic, String message);
  45 +
  46 + void onMessageDelivered(IMqttDeliveryToken token);
  47 + }
  48 +
  49 + /**
  50 + * MQTT连接配置构建器
  51 + */
  52 + public static class MqttConfig {
  53 + private String broker;
  54 + private String clientId;
  55 + private String username;
  56 + private String password;
  57 + private int connectionTimeout = 10;
  58 + private int keepAliveInterval = 60;
  59 + private boolean cleanSession = true;
  60 + private boolean automaticReconnect = true;
  61 + private int maxInflight = 10;
  62 + private int qos = 1;
  63 +
  64 + public MqttConfig(String broker) {
  65 + this.broker = broker;
  66 + this.clientId = "Client_" + UUID.randomUUID().toString().substring(0, 8);
  67 + }
  68 +
  69 + public MqttConfig setBroker(String broker) {
  70 + this.broker = broker;
  71 + return this;
  72 + }
  73 +
  74 + public MqttConfig setClientId(String clientId) {
  75 + this.clientId = clientId;
  76 + return this;
  77 + }
  78 +
  79 + public MqttConfig setUsername(String username) {
  80 + this.username = username;
  81 + return this;
  82 + }
  83 +
  84 + public MqttConfig setPassword(String password) {
  85 + this.password = password;
  86 + return this;
  87 + }
  88 +
  89 + public MqttConfig setConnectionTimeout(int connectionTimeout) {
  90 + this.connectionTimeout = connectionTimeout;
  91 + return this;
  92 + }
  93 +
  94 + public MqttConfig setKeepAliveInterval(int keepAliveInterval) {
  95 + this.keepAliveInterval = keepAliveInterval;
  96 + return this;
  97 + }
  98 +
  99 + public MqttConfig setCleanSession(boolean cleanSession) {
  100 + this.cleanSession = cleanSession;
  101 + return this;
  102 + }
  103 +
  104 + public MqttConfig setAutomaticReconnect(boolean automaticReconnect) {
  105 + this.automaticReconnect = automaticReconnect;
  106 + return this;
  107 + }
  108 +
  109 + public MqttConfig setMaxInflight(int maxInflight) {
  110 + this.maxInflight = maxInflight;
  111 + return this;
  112 + }
  113 +
  114 + public MqttConfig setQos(int qos) {
  115 + this.qos = qos;
  116 + return this;
  117 + }
  118 +
  119 + public String getBroker() {
  120 + return broker;
  121 + }
  122 +
  123 + public String getClientId() {
  124 + return clientId;
  125 + }
  126 +
  127 + public String getUsername() {
  128 + return username;
  129 + }
  130 +
  131 + public String getPassword() {
  132 + return password;
  133 + }
  134 +
  135 + public int getConnectionTimeout() {
  136 + return connectionTimeout;
  137 + }
  138 +
  139 + public int getKeepAliveInterval() {
  140 + return keepAliveInterval;
  141 + }
  142 +
  143 + public boolean isCleanSession() {
  144 + return cleanSession;
  145 + }
  146 +
  147 + public boolean isAutomaticReconnect() {
  148 + return automaticReconnect;
  149 + }
  150 +
  151 + public int getMaxInflight() {
  152 + return maxInflight;
  153 + }
  154 +
  155 + public int getQos() {
  156 + return qos;
  157 + }
  158 + }
  159 +
  160 + /**
  161 + * 创建MQTT工具实例
  162 + */
  163 + public MqttUtils() {
  164 + }
  165 +
  166 + /**
  167 + * 连接到MQTT服务器
  168 + *
  169 + * @param config 连接配置
  170 + * @throws MqttException 连接异常
  171 + */
  172 + public void connect(MqttConfig config) throws MqttException {
  173 + connect(config, null, null);
  174 + }
  175 +
  176 + /**
  177 + * 连接到MQTT服务器(带回调)
  178 + *
  179 + * @param config 连接配置
  180 + * @param connCallback 连接回调
  181 + * @param msgCallback 消息回调
  182 + * @throws MqttException 连接异常
  183 + */
  184 + public void connect(MqttConfig config, ConnectionCallback connCallback, MessageCallback msgCallback)
  185 + throws MqttException {
  186 +
  187 + this.connectionCallback = connCallback;
  188 + this.messageCallback = msgCallback;
  189 +
  190 + try {
  191 + // 创建MQTT客户端
  192 + String brokerUrl = config.getBroker().startsWith("tcp://") ?
  193 + config.getBroker() : "tcp://" + config.getBroker();
  194 +
  195 + mqttClient = new MqttClient(brokerUrl, config.getClientId(), new MemoryPersistence());
  196 +
  197 + // 设置连接选项
  198 + connectOptions = new MqttConnectOptions();
  199 + connectOptions.setCleanSession(config.isCleanSession());
  200 + connectOptions.setConnectionTimeout(config.getConnectionTimeout());
  201 + connectOptions.setKeepAliveInterval(config.getKeepAliveInterval());
  202 + connectOptions.setAutomaticReconnect(config.isAutomaticReconnect());
  203 + connectOptions.setMaxInflight(config.getMaxInflight());
  204 +
  205 + // 设置用户名密码(如果有)
  206 + if (config.getUsername() != null && !config.getUsername().isEmpty()) {
  207 + connectOptions.setUserName(config.getUsername());
  208 + if (config.getPassword() != null && !config.getPassword().isEmpty()) {
  209 + connectOptions.setPassword(config.getPassword().toCharArray());
  210 + }
  211 + }
  212 +
  213 + // 设置回调
  214 + mqttClient.setCallback(new MqttCallbackExtended() {
  215 + @Override
  216 + public void connectComplete(boolean reconnect, String serverURI) {
  217 + isConnected = true;
  218 + log.info("MQTT连接成功: " + serverURI + (reconnect ? " (重连)" : ""));
  219 + if (connectionCallback != null) {
  220 + connectionCallback.onConnected();
  221 + }
  222 + }
  223 +
  224 + @Override
  225 + public void connectionLost(Throwable cause) {
  226 + isConnected = false;
  227 + log.error("MQTT连接断开: " + cause.getMessage());
  228 + if (connectionCallback != null) {
  229 + connectionCallback.onConnectionLost(cause);
  230 + }
  231 + }
  232 +
  233 + @Override
  234 + public void messageArrived(String topic, MqttMessage message) {
  235 + String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
  236 + log.info("收到消息 - Topic: " + topic + ", QoS: " + message.getQos() +
  237 + ", Message: " + payload);
  238 +
  239 + if (messageCallback != null) {
  240 + messageCallback.onMessageReceived(topic, payload);
  241 + }
  242 + }
  243 +
  244 + @Override
  245 + public void deliveryComplete(IMqttDeliveryToken token) {
  246 + log.info("消息发送完成");
  247 + if (messageCallback != null) {
  248 + messageCallback.onMessageDelivered(token);
  249 + }
  250 + }
  251 + });
  252 +
  253 + // 连接到服务器
  254 + log.info("正在连接到: " + brokerUrl);
  255 + mqttClient.connect(connectOptions);
  256 +
  257 + } catch (MqttException e) {
  258 + isConnected = false;
  259 + log.error("连接失败: " + e.getMessage());
  260 + if (connectionCallback != null) {
  261 + connectionCallback.onConnectionFailed(e);
  262 + }
  263 + throw e;
  264 + }
  265 + }
  266 +
  267 + /**
  268 + * 发布消息
  269 + *
  270 + * @param topic 主题
  271 + * @param message 消息内容
  272 + * @param qos QoS级别 (0, 1, 2)
  273 + * @param retained 是否保留消息
  274 + * @throws MqttException 发布异常
  275 + */
  276 + public void publish(String topic, String message, int qos, boolean retained) throws MqttException {
  277 + if (!isConnected) {
  278 + throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
  279 + }
  280 +
  281 + try {
  282 + MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
  283 + mqttMessage.setQos(qos);
  284 + mqttMessage.setRetained(retained);
  285 +
  286 + mqttClient.publish(topic, mqttMessage);
  287 + log.info("发布成功 - Topic: " + topic + ", QoS: " + qos +
  288 + ", Retained: " + retained + ", Message: " + message);
  289 + } catch (MqttException e) {
  290 + log.error("发布失败: " + e.getMessage());
  291 + throw e;
  292 + }
  293 + }
  294 +
  295 + /**
  296 + * 发布消息(简化版)
  297 + *
  298 + * @param topic 主题
  299 + * @param message 消息内容
  300 + * @throws MqttException 发布异常
  301 + */
  302 + public void publish(String topic, String message) throws MqttException {
  303 + publish(topic, message, 1, false);
  304 + }
  305 +
  306 + /**
  307 + * 订阅主题
  308 + *
  309 + * @param topic 主题
  310 + * @param qos QoS级别
  311 + * @throws MqttException 订阅异常
  312 + */
  313 + public void subscribe(String topic, int qos) throws MqttException {
  314 + if (!isConnected) {
  315 + throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
  316 + }
  317 +
  318 + try {
  319 + mqttClient.subscribe(topic, qos);
  320 + subscribedTopics.add(topic);
  321 + log.info("订阅成功 - Topic: " + topic + ", QoS: " + qos);
  322 + } catch (MqttException e) {
  323 + log.error("订阅失败: " + e.getMessage());
  324 + throw e;
  325 + }
  326 + }
  327 +
  328 + /**
  329 + * 订阅多个主题
  330 + *
  331 + * @param topics 主题数组
  332 + * @param qos QoS级别数组
  333 + * @throws MqttException 订阅异常
  334 + */
  335 + public void subscribe(String[] topics, int[] qos) throws MqttException {
  336 + if (!isConnected) {
  337 + throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
  338 + }
  339 +
  340 + try {
  341 + mqttClient.subscribe(topics, qos);
  342 + for (String topic : topics) {
  343 + subscribedTopics.add(topic);
  344 + }
  345 + log.info("批量订阅成功 - 主题数量: " + topics.length);
  346 + } catch (MqttException e) {
  347 + log.error("批量订阅失败: " + e.getMessage());
  348 + throw e;
  349 + }
  350 + }
  351 +
  352 + /**
  353 + * 取消订阅
  354 + *
  355 + * @param topic 主题
  356 + * @throws MqttException 取消订阅异常
  357 + */
  358 + public void unsubscribe(String topic) throws MqttException {
  359 + if (!isConnected) {
  360 + throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
  361 + }
  362 +
  363 + try {
  364 + mqttClient.unsubscribe(topic);
  365 + subscribedTopics.remove(topic);
  366 + log.info("取消订阅成功 - Topic: " + topic);
  367 + } catch (MqttException e) {
  368 + log.error("取消订阅失败: " + e.getMessage());
  369 + throw e;
  370 + }
  371 + }
  372 +
  373 + /**
  374 + * 断开连接
  375 + */
  376 + public void disconnect() {
  377 + if (mqttClient != null && isConnected) {
  378 + try {
  379 + // 取消所有订阅
  380 + if (!subscribedTopics.isEmpty()) {
  381 + String[] topics = subscribedTopics.toArray(new String[0]);
  382 + mqttClient.unsubscribe(topics);
  383 + subscribedTopics.clear();
  384 + }
  385 +
  386 + // 断开连接
  387 + mqttClient.disconnect();
  388 + isConnected = false;
  389 + log.info("MQTT连接已断开");
  390 +
  391 + if (connectionCallback != null) {
  392 + connectionCallback.onDisconnected();
  393 + }
  394 +
  395 + } catch (MqttException e) {
  396 + log.error("断开连接异常: " + e.getMessage());
  397 + }
  398 + }
  399 + }
  400 +
  401 + /**
  402 + * 判断是否已连接
  403 + *
  404 + * @return 连接状态
  405 + */
  406 + public boolean isConnected() {
  407 + return isConnected && mqttClient != null && mqttClient.isConnected();
  408 + }
  409 +
  410 + /**
  411 + * 获取客户端ID
  412 + *
  413 + * @return 客户端ID
  414 + */
  415 + public String getClientId() {
  416 + return mqttClient != null ? mqttClient.getClientId() : null;
  417 + }
  418 +
  419 + /**
  420 + * 获取服务器URI
  421 + *
  422 + * @return 服务器URI
  423 + */
  424 + public String getServerURI() {
  425 + return mqttClient != null ? mqttClient.getServerURI() : null;
  426 + }
  427 +
  428 + /**
  429 + * 获取已订阅的主题列表
  430 + *
  431 + * @return 主题列表
  432 + */
  433 + public List<String> getSubscribedTopics() {
  434 + return new ArrayList<>(subscribedTopics);
  435 + }
  436 +
  437 + /**
  438 + * 关闭客户端(释放资源)
  439 + */
  440 + public void close() {
  441 + disconnect();
  442 + try {
  443 + if (mqttClient != null) {
  444 + mqttClient.close();
  445 + log.info("MQTT客户端已关闭");
  446 + }
  447 + } catch (MqttException e) {
  448 + log.error("关闭客户端异常: " + e.getMessage());
  449 + }
  450 + }
  451 +
  452 + /**
  453 + * 快速连接并发送消息
  454 + */
  455 + public static void quickPublish(String broker, String topic, String username,
  456 + String password, String clientId, String message) throws Exception {
  457 + MqttUtils mqttUtils = new MqttUtils();
  458 +
  459 + MqttUtils.MqttConfig config = new MqttUtils.MqttConfig(broker);
  460 +
  461 + if (StringUtils.isNotBlank(username)) {
  462 + config.setUsername(username);
  463 + }
  464 + if (StringUtils.isNotBlank(password)) {
  465 + config.setPassword(password);
  466 + }
  467 + if (StringUtils.isNotBlank(clientId)) {
  468 + config.setClientId(clientId);
  469 + }
  470 +
  471 + config.setConnectionTimeout(15)
  472 + .setKeepAliveInterval(30)
  473 + .setQos(1);
  474 +
  475 + try {
  476 + mqttUtils.connect(config);
  477 + mqttUtils.publish(topic, message);
  478 + log.info("消息发送成功!");
  479 + } finally {
  480 + mqttUtils.disconnect();
  481 + }
  482 + }
  483 +
  484 +
  485 + public static void main(String[] args) {
  486 +
  487 + String broker = "10.9.0.205:1883";
  488 + String clientId = "QesP7NB4PpAQhupFxlY";
  489 + String user = "admin";
  490 + String pwd = "123456";
  491 + String topic = "v1/devices/me/telemetry";
  492 + String message = "{\"temp\": 12.8,\"status\": \"ON\"}";
  493 +
  494 + try {
  495 + quickPublish(broker, topic, user, pwd, clientId, message);
  496 + } catch (Exception e) {
  497 + e.printStackTrace();
  498 + }
  499 +
  500 +// MqttUtils mqttUtils = new MqttUtils();
  501 +// MqttUtils.MqttConfig config = new MqttUtils.MqttConfig(broker)
  502 +// .setClientId(clientId)
  503 +// .setUsername(user)
  504 +// .setPassword(pwd)
  505 +// .setConnectionTimeout(15)
  506 +// .setKeepAliveInterval(30)
  507 +// .setQos(1);
  508 +//
  509 +// try {
  510 +// // 连接
  511 +// mqttUtils.connect(config);
  512 +//
  513 +// // 发布消息
  514 +// mqttUtils.publish(topic, message);
  515 +//
  516 +// } catch (MqttException e) {
  517 +// e.printStackTrace();
  518 +// } finally {
  519 +// // 断开连接
  520 +// mqttUtils.disconnect();
  521 +// }
  522 + }
  523 +}
\ No newline at end of file
... ...