Commit 247b181a0f50a462380c60adec025479b7cda3a4

Authored by 房远帅
Committed by 朱园亮
1 parent 7220147f

凯盛数据推送能源管理

... ... @@ -35,6 +35,60 @@
35 35 <artifactId>spring-boot-starter-test</artifactId>
36 36 <scope>test</scope>
37 37 </dependency>
  38 + <dependency>
  39 + <groupId>com.zaxxer</groupId>
  40 + <artifactId>HikariCP</artifactId>
  41 + <version>4.0.3</version>
  42 + </dependency>
  43 + <dependency>
  44 + <groupId>com.zaxxer</groupId>
  45 + <artifactId>HikariCP-java7</artifactId>
  46 + <version>2.4.13</version>
  47 + </dependency>
  48 + <dependency>
  49 + <groupId>org.postgresql</groupId>
  50 + <artifactId>postgresql</artifactId>
  51 + <version>42.7.3</version>
  52 + </dependency>
  53 + <dependency>
  54 + <groupId>org.apache.httpcomponents</groupId>
  55 + <artifactId>httpclient</artifactId>
  56 + <version>4.5.8</version>
  57 + </dependency>
  58 + <!--工具类-->
  59 + <dependency>
  60 + <groupId>org.apache.commons</groupId>
  61 + <artifactId>commons-lang3</artifactId>
  62 + <version>3.6</version>
  63 + </dependency>
  64 +
  65 + <!--工具类-->
  66 + <dependency>
  67 + <groupId>org.apache.commons</groupId>
  68 + <artifactId>commons-collections4</artifactId>
  69 + <version>4.2</version>
  70 + </dependency>
  71 +
  72 + <dependency>
  73 + <groupId>com.alibaba</groupId>
  74 + <artifactId>fastjson</artifactId>
  75 + <version>1.2.60</version>
  76 + </dependency>
  77 + <dependency>
  78 + <groupId>org.springframework.boot</groupId>
  79 + <artifactId>spring-boot-starter-data-redis</artifactId>
  80 + <version>2.0.9.RELEASE</version>
  81 + </dependency>
  82 + <dependency>
  83 + <groupId>redis.clients</groupId>
  84 + <artifactId>jedis</artifactId>
  85 + <version>2.9.0</version>
  86 + </dependency>
  87 + <dependency>
  88 + <groupId>org.eclipse.paho</groupId>
  89 + <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  90 + <version>1.2.5</version>
  91 + </dependency>
38 92 </dependencies>
39 93
40 94 <build>
... ...
  1 +package com.iot.scheduler.service;
  2 +
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.iot.scheduler.utils.MqttUtils;
  5 +import com.zaxxer.hikari.HikariConfig;
  6 +import com.zaxxer.hikari.HikariDataSource;
  7 +import lombok.extern.slf4j.Slf4j;
  8 +import org.springframework.beans.factory.annotation.Value;
  9 +import org.springframework.stereotype.Service;
  10 +import org.springframework.util.CollectionUtils;
  11 +
  12 +import java.sql.*;
  13 +import java.time.LocalDateTime;
  14 +import java.time.format.DateTimeFormatter;
  15 +import java.util.ArrayList;
  16 +import java.util.HashMap;
  17 +import java.util.List;
  18 +import java.util.Map;
  19 +
  20 +/**
  21 + * 凯盛设备
  22 + */
  23 +@Slf4j
  24 +@Service
  25 +public class KSDeviceReportService {
  26 +
  27 + @Value("${ks.third.jdbcUrl:jdbc:postgresql://192.168.0.249:5432/iot}")
  28 + private String jdbcUrl;
  29 + @Value("${ks.third.jdbcUserName:postgres}")
  30 + private String jdbcUserName;
  31 +
  32 + @Value("${ks.third.jdbcPassword:1qaz@WSX}")
  33 + private String jdbcPassword;
  34 +
  35 + @Value("${ks.third.selectSql:SELECT * FROM ts_kv_dictionary;}")
  36 + private String selectSql;
  37 +
  38 + @Value("${ks.third.selectSql1:SELECT * FROM ts_kv_dictionary;}")
  39 + private String selectSql1;
  40 +
  41 + @Value("${ks.third.broker:10.9.0.114:1884}")
  42 + private String broker;
  43 + @Value("${ks.third.username:yunpower}")
  44 + private String username;
  45 + @Value("${ks.third.password:P1o2w3er}")
  46 + private String password;
  47 + @Value("${ks.third.topic:storage0}")
  48 + private String topic;
  49 + @Value("${ks.third.clientId:2020672015207559170}")
  50 + private String clientId;
  51 + @Value("${ks.third.channelSn:8VAUR9H3}")
  52 + private String channelSn;
  53 +
  54 + public void deviceElectricity() {
  55 + log.info("开始执行凯盛电表设备属性上报任务");
  56 + try {
  57 + List<Object> needSyncDataList = initConnectAndSelectData("E");
  58 + if (CollectionUtils.isEmpty(needSyncDataList)) {
  59 + log.info("电表没有需要上报的数据,任务结束");
  60 + return;
  61 + }
  62 +
  63 + log.info("开始处理{}条电表设备数据并进行上报", needSyncDataList.size());
  64 + int successCount = 0;
  65 + int failCount = 0;
  66 + List<String> failedDevices = new ArrayList<>();
  67 + Map<Object, String> map = new HashMap<>();
  68 + map.put(1,"Ep");
  69 + map.put(2,"Ubc");
  70 + map.put(3,"Uc");
  71 + map.put(4,"P");
  72 + map.put(5,"Uab");
  73 + map.put(6,"Ib");
  74 + map.put(7,"Eq");
  75 + map.put(8,"F");
  76 + map.put(9,"Q");
  77 + map.put(10,"Ua");
  78 + map.put(11,"S");
  79 + map.put(12,"Ic");
  80 + map.put(13,"Ub");
  81 + map.put(14,"Uca");
  82 + map.put(15,"PSum");
  83 + map.put(16,"Ia");
  84 +
  85 + for (int i = 0; i < needSyncDataList.size(); i++) {
  86 + Object needSyncData = needSyncDataList.get(i);
  87 + try {
  88 + List<Object> dataList = (ArrayList) needSyncData;
  89 + String sn = dataList.get(0).toString();
  90 +
  91 + // 1. 获取当前时间
  92 + LocalDateTime now = LocalDateTime.now();
  93 + // 2. 定义间隔分钟数 (例如 5 分钟)
  94 + int intervalMinutes = 5;
  95 + // 3. 计算向下取整的时间
  96 + // 逻辑:获取当前的分钟数,减去 (分钟数 % 间隔),秒和纳秒设为 0
  97 + LocalDateTime roundedTime = now
  98 + .withMinute(now.getMinute() - (now.getMinute() % intervalMinutes))
  99 + .withSecond(0)
  100 + .withNano(0);
  101 +
  102 + // 4. 格式化输出
  103 + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  104 +
  105 + for (int j = 1; j < dataList.size(); j++) {
  106 + Map<String, Object> properties = new HashMap<>(5);
  107 + Object o = dataList.get(j);
  108 + if (o == null) {
  109 + continue;
  110 + }
  111 + String deviceVarSn = "ks_" + sn + map.get(j);
  112 + log.info("===deviceVarSn==" + deviceVarSn);
  113 + properties.put("channelSn", channelSn);
  114 + properties.put("deviceVarSn", deviceVarSn);
  115 + properties.put("value", Double.parseDouble(o.toString().trim()));
  116 + properties.put("saveTime", roundedTime.format(formatter));
  117 + properties.put("collectTime", roundedTime.format(formatter));
  118 + try {
  119 + log.debug("电表开始MQTT发布: sn={}", sn);
  120 + MqttUtils.quickPublish(broker, topic, username, password, clientId, JSON.toJSONString(properties));
  121 + successCount++;
  122 +
  123 + } catch (Exception e) {
  124 + failCount++;
  125 + failedDevices.add(sn);
  126 + log.error("电表设备上报失败: sn={}, clientId={}, topic={}, 错误信息: {}",
  127 + sn, clientId, topic, e.getMessage());
  128 + log.error("电表详细异常信息:", e);
  129 + }
  130 +
  131 + // 可选:添加延迟,避免发送过快
  132 + try {
  133 + Thread.sleep(10); // 10毫秒延迟
  134 + } catch (InterruptedException e) {
  135 + log.warn("电表线程休眠被中断", e);
  136 + Thread.currentThread().interrupt();
  137 + }
  138 + }
  139 + } catch (Exception e) {
  140 + failCount++;
  141 + log.error("电表处理第{}条数据时发生异常,数据内容: {}", i + 1, needSyncData, e);
  142 + }
  143 + }
  144 +
  145 + log.info("sn电表设备属性上报任务完成,成功: {}条,失败: {}条,总计: {}条",
  146 + successCount, failCount, needSyncDataList.size());
  147 +
  148 + } catch (Exception e) {
  149 + log.error("sn执行电表设备属性上报任务时发生异常", e);
  150 + } finally {
  151 + log.info("凯盛电表设备属性上报任务执行结束");
  152 + }
  153 + }
  154 +
  155 + public void deviceWater() {
  156 + log.info("开始执行凯盛水表设备属性上报任务");
  157 + try {
  158 + List<Object> needSyncDataList = initConnectAndSelectData("W");
  159 + if (CollectionUtils.isEmpty(needSyncDataList)) {
  160 + log.info("水表没有需要上报的数据,任务结束");
  161 + return;
  162 + }
  163 +
  164 + log.info("开始处理{}条水表设备数据并进行上报", needSyncDataList.size());
  165 + int successCount = 0;
  166 + int failCount = 0;
  167 + List<String> failedDevices = new ArrayList<>();
  168 + Map<Object, String> map = new HashMap<>();
  169 + map.put(1,"devid");
  170 + map.put(2,"zll");
  171 + map.put(3,"ssll");
  172 +
  173 +
  174 + for (int i = 0; i < needSyncDataList.size(); i++) {
  175 + Object needSyncData = needSyncDataList.get(i);
  176 + try {
  177 + List<Object> dataList = (ArrayList) needSyncData;
  178 + String sn = dataList.get(0).toString();
  179 +
  180 + // 1. 获取当前时间
  181 + LocalDateTime now = LocalDateTime.now();
  182 + // 2. 定义间隔分钟数 (例如 5 分钟)
  183 + int intervalMinutes = 5;
  184 + // 3. 计算向下取整的时间
  185 + // 逻辑:获取当前的分钟数,减去 (分钟数 % 间隔),秒和纳秒设为 0
  186 + LocalDateTime roundedTime = now
  187 + .withMinute(now.getMinute() - (now.getMinute() % intervalMinutes))
  188 + .withSecond(0)
  189 + .withNano(0);
  190 +
  191 + // 4. 格式化输出
  192 + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  193 +
  194 + for (int j = 1; j < dataList.size(); j++) {
  195 + Map<String, Object> properties = new HashMap<>(5);
  196 + Object o = dataList.get(j).toString();
  197 + if (o == null) {
  198 + continue;
  199 + }
  200 + String deviceVarSn = "ks_" + sn + map.get(j);
  201 + log.info("===deviceVarSn==" + deviceVarSn);
  202 + properties.put("channelSn", channelSn);
  203 + properties.put("deviceVarSn", deviceVarSn);
  204 + properties.put("value", Double.parseDouble(o.toString().trim()));
  205 + properties.put("saveTime", roundedTime.format(formatter));
  206 + properties.put("collectTime", roundedTime.format(formatter));
  207 + try {
  208 + log.debug("水表开始MQTT发布: sn={}", sn);
  209 + MqttUtils.quickPublish(broker, topic, username, password, clientId, JSON.toJSONString(properties));
  210 + successCount++;
  211 +
  212 + } catch (Exception e) {
  213 + failCount++;
  214 + failedDevices.add(sn);
  215 + log.error("水表设备上报失败: sn={}, clientId={}, topic={}, 错误信息: {}",
  216 + sn, clientId, topic, e.getMessage());
  217 + log.error("水表详细异常信息:", e);
  218 + }
  219 +
  220 + // 可选:添加延迟,避免发送过快
  221 + try {
  222 + Thread.sleep(10); // 10毫秒延迟
  223 + } catch (InterruptedException e) {
  224 + log.warn("水表线程休眠被中断", e);
  225 + Thread.currentThread().interrupt();
  226 + }
  227 + }
  228 + } catch (Exception e) {
  229 + failCount++;
  230 + log.error("水表处理第{}条数据时发生异常,数据内容: {}", i + 1, needSyncData, e);
  231 + }
  232 + }
  233 +
  234 + log.info("水表设备属性上报任务完成,成功: {}条,失败: {}条,总计: {}条",
  235 + successCount, failCount, needSyncDataList.size());
  236 +
  237 + } catch (Exception e) {
  238 + log.error("水表执行设备属性上报任务时发生异常", e);
  239 + } finally {
  240 + log.info("凯盛水表设备属性上报任务执行结束");
  241 + }
  242 + }
  243 +
  244 + private List<Object> initConnectAndSelectData(String type) {
  245 + Connection connection = null;
  246 + PreparedStatement statement = null;
  247 + ResultSet resultSet = null;
  248 + HikariDataSource dataSource = null;
  249 + List<Object> resultList = new ArrayList<>();
  250 +
  251 + log.info("开始连接数据库,URL: {}", jdbcUrl);
  252 +
  253 + try {
  254 + HikariConfig config = new HikariConfig();
  255 + config.setJdbcUrl(jdbcUrl);
  256 + config.setUsername(jdbcUserName);
  257 + config.setPassword(jdbcPassword);
  258 + config.setDriverClassName("org.postgresql.Driver");
  259 + config.setMaximumPoolSize(5);
  260 + config.setMinimumIdle(5);
  261 + config.setConnectionTimeout(60000);
  262 + config.setConnectionTestQuery("SELECT 1");
  263 +
  264 + dataSource = new HikariDataSource(config);
  265 + log.info("Hikari连接池配置完成");
  266 +
  267 + connection = dataSource.getConnection();
  268 + log.info("数据库连接成功");
  269 + if ("E".equals(type)) {
  270 + statement = connection.prepareStatement(selectSql);
  271 + log.info("执行SQL查询: {}", selectSql);
  272 + } else {
  273 + statement = connection.prepareStatement(selectSql1);
  274 + log.info("执行SQL查询: {}", selectSql1);
  275 + }
  276 +
  277 + resultSet = statement.executeQuery();
  278 + ResultSetMetaData metaData = resultSet.getMetaData();
  279 + int columnCount = metaData.getColumnCount();
  280 + log.info("查询结果集元数据获取成功,共{}列", columnCount);
  281 +
  282 + int rowCount = 0;
  283 + while (resultSet.next()) {
  284 + List<Object> result = new ArrayList<>(columnCount);
  285 + for (int index = 1; index <= columnCount; index++) {
  286 + int columnType = metaData.getColumnType(index);
  287 + Object value = getTypedValue(resultSet, index, columnType);
  288 + result.add(value);
  289 + }
  290 + resultList.add(result);
  291 + rowCount++;
  292 +
  293 + // 每处理1000行记录一次日志
  294 + if (rowCount % 1000 == 0) {
  295 + log.info("已处理{}行数据", rowCount);
  296 + }
  297 + }
  298 +
  299 + log.info("数据查询完成,共获取{}行数据", rowCount);
  300 +
  301 + } catch (SQLException e) {
  302 + log.error("数据库操作异常,URL: {}, 用户名: {}", jdbcUrl, jdbcUserName, e);
  303 + } catch (Exception e) {
  304 + log.error("初始化数据库连接或查询数据时发生异常", e);
  305 + } finally {
  306 + // 释放资源
  307 + try {
  308 + if (resultSet != null) resultSet.close();
  309 + if (statement != null) statement.close();
  310 + if (connection != null) connection.close();
  311 + log.info("数据库连接资源已释放");
  312 + } catch (SQLException e) {
  313 + log.error("关闭数据库资源时发生异常", e);
  314 + }
  315 +
  316 + if (dataSource != null) {
  317 + try {
  318 + dataSource.close();
  319 + log.info("HikariDataSource连接池已关闭");
  320 + } catch (Exception e) {
  321 + log.error("关闭HikariDataSource连接池时发生异常", e);
  322 + }
  323 + }
  324 + }
  325 +
  326 + log.info("数据库操作完成,返回{}条记录", resultList.size());
  327 + return resultList;
  328 + }
  329 +
  330 + private Object getTypedValue(ResultSet rs, int index, int sqlType) throws SQLException {
  331 + Object value;
  332 +
  333 + try {
  334 + switch (sqlType) {
  335 + case Types.BIT:
  336 + case Types.BOOLEAN:
  337 + value = rs.getBoolean(index);
  338 + return rs.wasNull() ? null : value;
  339 +
  340 + case Types.TINYINT:
  341 + case Types.SMALLINT:
  342 + case Types.INTEGER:
  343 + value = rs.getInt(index);
  344 + return rs.wasNull() ? null : value;
  345 +
  346 + case Types.BIGINT:
  347 + value = rs.getLong(index);
  348 + return rs.wasNull() ? null : value;
  349 +
  350 + case Types.FLOAT:
  351 + case Types.REAL:
  352 + value = rs.getFloat(index);
  353 + return rs.wasNull() ? null : value;
  354 +
  355 + case Types.DOUBLE:
  356 + value = rs.getDouble(index);
  357 + return rs.wasNull() ? null : value;
  358 +
  359 + case Types.NUMERIC:
  360 + case Types.DECIMAL:
  361 + value = rs.getBigDecimal(index);
  362 + return rs.wasNull() ? null : value;
  363 +
  364 + case Types.CHAR:
  365 + case Types.VARCHAR:
  366 + case Types.LONGVARCHAR:
  367 + case Types.NCHAR:
  368 + case Types.NVARCHAR:
  369 + case Types.LONGNVARCHAR:
  370 + value = rs.getString(index);
  371 + return rs.wasNull() ? null : value;
  372 +
  373 + case Types.DATE:
  374 + Date date = rs.getDate(index);
  375 + return date != null ? date.toLocalDate() : null;
  376 +
  377 + case Types.TIME:
  378 + Time time = rs.getTime(index);
  379 + return time != null ? time.toLocalTime() : null;
  380 +
  381 + case Types.TIMESTAMP:
  382 + Timestamp timestamp = rs.getTimestamp(index);
  383 + return timestamp != null ? timestamp.toLocalDateTime() : null;
  384 +
  385 + case Types.BINARY:
  386 + case Types.VARBINARY:
  387 + case Types.LONGVARBINARY:
  388 + value = rs.getBytes(index);
  389 + return rs.wasNull() ? null : value;
  390 +
  391 + case Types.BLOB:
  392 + value = rs.getBlob(index);
  393 + return rs.wasNull() ? null : value;
  394 +
  395 + case Types.CLOB:
  396 + value = rs.getClob(index);
  397 + return rs.wasNull() ? null : value;
  398 +
  399 + default:
  400 + value = rs.getObject(index);
  401 + return rs.wasNull() ? null : value;
  402 + }
  403 + } catch (SQLException e) {
  404 + log.error("获取结果集第{}列数据时发生异常,数据类型: {}", index, sqlType, e);
  405 + throw e;
  406 + }
  407 + }
  408 +}
... ...
  1 +package com.iot.scheduler.utils;
  2 +
  3 +import io.micrometer.common.util.StringUtils;
  4 +import lombok.extern.slf4j.Slf4j;
  5 +import org.eclipse.paho.client.mqttv3.*;
  6 +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  7 +
  8 +import java.nio.charset.StandardCharsets;
  9 +import java.util.ArrayList;
  10 +import java.util.List;
  11 +import java.util.UUID;
  12 +
  13 +/**
  14 + * MQTT客户端工具类
  15 + * 支持连接、发布、订阅、断开连接等操作
  16 + */
  17 +@Slf4j
  18 +public class MqttUtils {
  19 +
  20 + private MqttClient mqttClient;
  21 + private MqttConnectOptions connectOptions;
  22 + private ConnectionCallback connectionCallback;
  23 + private MessageCallback messageCallback;
  24 + private boolean isConnected = false;
  25 + private List<String> subscribedTopics = new ArrayList<>();
  26 +
  27 + /**
  28 + * 连接回调接口
  29 + */
  30 + public interface ConnectionCallback {
  31 + void onConnected();
  32 +
  33 + void onDisconnected();
  34 +
  35 + void onConnectionFailed(Throwable cause);
  36 +
  37 + void onConnectionLost(Throwable cause);
  38 + }
  39 +
  40 + /**
  41 + * 消息回调接口
  42 + */
  43 + public interface MessageCallback {
  44 + void onMessageReceived(String topic, String message);
  45 +
  46 + void onMessageDelivered(IMqttDeliveryToken token);
  47 + }
  48 +
  49 + /**
  50 + * MQTT连接配置构建器
  51 + */
  52 + public static class MqttConfig {
  53 + private String broker;
  54 + private String clientId;
  55 + private String username;
  56 + private String password;
  57 + private int connectionTimeout = 10;
  58 + private int keepAliveInterval = 60;
  59 + private boolean cleanSession = true;
  60 + private boolean automaticReconnect = true;
  61 + private int maxInflight = 10;
  62 + private int qos = 1;
  63 +
  64 + public MqttConfig(String broker) {
  65 + this.broker = broker;
  66 + this.clientId = "Client_" + UUID.randomUUID().toString().substring(0, 8);
  67 + }
  68 +
  69 + public MqttConfig setBroker(String broker) {
  70 + this.broker = broker;
  71 + return this;
  72 + }
  73 +
  74 + public MqttConfig setClientId(String clientId) {
  75 + this.clientId = clientId;
  76 + return this;
  77 + }
  78 +
  79 + public MqttConfig setUsername(String username) {
  80 + this.username = username;
  81 + return this;
  82 + }
  83 +
  84 + public MqttConfig setPassword(String password) {
  85 + this.password = password;
  86 + return this;
  87 + }
  88 +
  89 + public MqttConfig setConnectionTimeout(int connectionTimeout) {
  90 + this.connectionTimeout = connectionTimeout;
  91 + return this;
  92 + }
  93 +
  94 + public MqttConfig setKeepAliveInterval(int keepAliveInterval) {
  95 + this.keepAliveInterval = keepAliveInterval;
  96 + return this;
  97 + }
  98 +
  99 + public MqttConfig setCleanSession(boolean cleanSession) {
  100 + this.cleanSession = cleanSession;
  101 + return this;
  102 + }
  103 +
  104 + public MqttConfig setAutomaticReconnect(boolean automaticReconnect) {
  105 + this.automaticReconnect = automaticReconnect;
  106 + return this;
  107 + }
  108 +
  109 + public MqttConfig setMaxInflight(int maxInflight) {
  110 + this.maxInflight = maxInflight;
  111 + return this;
  112 + }
  113 +
  114 + public MqttConfig setQos(int qos) {
  115 + this.qos = qos;
  116 + return this;
  117 + }
  118 +
  119 + public String getBroker() {
  120 + return broker;
  121 + }
  122 +
  123 + public String getClientId() {
  124 + return clientId;
  125 + }
  126 +
  127 + public String getUsername() {
  128 + return username;
  129 + }
  130 +
  131 + public String getPassword() {
  132 + return password;
  133 + }
  134 +
  135 + public int getConnectionTimeout() {
  136 + return connectionTimeout;
  137 + }
  138 +
  139 + public int getKeepAliveInterval() {
  140 + return keepAliveInterval;
  141 + }
  142 +
  143 + public boolean isCleanSession() {
  144 + return cleanSession;
  145 + }
  146 +
  147 + public boolean isAutomaticReconnect() {
  148 + return automaticReconnect;
  149 + }
  150 +
  151 + public int getMaxInflight() {
  152 + return maxInflight;
  153 + }
  154 +
  155 + public int getQos() {
  156 + return qos;
  157 + }
  158 + }
  159 +
  160 + /**
  161 + * 创建MQTT工具实例
  162 + */
  163 + public MqttUtils() {
  164 + }
  165 +
  166 + /**
  167 + * 连接到MQTT服务器
  168 + *
  169 + * @param config 连接配置
  170 + * @throws MqttException 连接异常
  171 + */
  172 + public void connect(MqttConfig config) throws MqttException {
  173 + connect(config, null, null);
  174 + }
  175 +
  176 + /**
  177 + * 连接到MQTT服务器(带回调)
  178 + *
  179 + * @param config 连接配置
  180 + * @param connCallback 连接回调
  181 + * @param msgCallback 消息回调
  182 + * @throws MqttException 连接异常
  183 + */
  184 + public void connect(MqttConfig config, ConnectionCallback connCallback, MessageCallback msgCallback)
  185 + throws MqttException {
  186 +
  187 + this.connectionCallback = connCallback;
  188 + this.messageCallback = msgCallback;
  189 +
  190 + try {
  191 + // 创建MQTT客户端
  192 + String brokerUrl = config.getBroker().startsWith("tcp://") ?
  193 + config.getBroker() : "tcp://" + config.getBroker();
  194 +
  195 + mqttClient = new MqttClient(brokerUrl, config.getClientId(), new MemoryPersistence());
  196 +
  197 + // 设置连接选项
  198 + connectOptions = new MqttConnectOptions();
  199 + connectOptions.setCleanSession(config.isCleanSession());
  200 + connectOptions.setConnectionTimeout(config.getConnectionTimeout());
  201 + connectOptions.setKeepAliveInterval(config.getKeepAliveInterval());
  202 + connectOptions.setAutomaticReconnect(config.isAutomaticReconnect());
  203 + connectOptions.setMaxInflight(config.getMaxInflight());
  204 +
  205 + // 设置用户名密码(如果有)
  206 + if (config.getUsername() != null && !config.getUsername().isEmpty()) {
  207 + connectOptions.setUserName(config.getUsername());
  208 + if (config.getPassword() != null && !config.getPassword().isEmpty()) {
  209 + connectOptions.setPassword(config.getPassword().toCharArray());
  210 + }
  211 + }
  212 +
  213 + // 设置回调
  214 + mqttClient.setCallback(new MqttCallbackExtended() {
  215 + @Override
  216 + public void connectComplete(boolean reconnect, String serverURI) {
  217 + isConnected = true;
  218 + log.info("MQTT连接成功: " + serverURI + (reconnect ? " (重连)" : ""));
  219 + if (connectionCallback != null) {
  220 + connectionCallback.onConnected();
  221 + }
  222 + }
  223 +
  224 + @Override
  225 + public void connectionLost(Throwable cause) {
  226 + isConnected = false;
  227 + log.error("MQTT连接断开: " + cause.getMessage());
  228 + if (connectionCallback != null) {
  229 + connectionCallback.onConnectionLost(cause);
  230 + }
  231 + }
  232 +
  233 + @Override
  234 + public void messageArrived(String topic, MqttMessage message) {
  235 + String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
  236 + log.info("收到消息 - Topic: " + topic + ", QoS: " + message.getQos() +
  237 + ", Message: " + payload);
  238 +
  239 + if (messageCallback != null) {
  240 + messageCallback.onMessageReceived(topic, payload);
  241 + }
  242 + }
  243 +
  244 + @Override
  245 + public void deliveryComplete(IMqttDeliveryToken token) {
  246 + log.info("消息发送完成");
  247 + if (messageCallback != null) {
  248 + messageCallback.onMessageDelivered(token);
  249 + }
  250 + }
  251 + });
  252 +
  253 + // 连接到服务器
  254 + log.info("正在连接到: " + brokerUrl);
  255 + mqttClient.connect(connectOptions);
  256 +
  257 + } catch (MqttException e) {
  258 + isConnected = false;
  259 + log.error("连接失败: " + e.getMessage());
  260 + if (connectionCallback != null) {
  261 + connectionCallback.onConnectionFailed(e);
  262 + }
  263 + throw e;
  264 + }
  265 + }
  266 +
  267 + /**
  268 + * 发布消息
  269 + *
  270 + * @param topic 主题
  271 + * @param message 消息内容
  272 + * @param qos QoS级别 (0, 1, 2)
  273 + * @param retained 是否保留消息
  274 + * @throws MqttException 发布异常
  275 + */
  276 + public void publish(String topic, String message, int qos, boolean retained) throws MqttException {
  277 + if (!isConnected) {
  278 + throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
  279 + }
  280 +
  281 + try {
  282 + MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
  283 + mqttMessage.setQos(qos);
  284 + mqttMessage.setRetained(retained);
  285 +
  286 + mqttClient.publish(topic, mqttMessage);
  287 + log.info("发布成功 - Topic: " + topic + ", QoS: " + qos +
  288 + ", Retained: " + retained + ", Message: " + message);
  289 + } catch (MqttException e) {
  290 + log.error("发布失败: " + e.getMessage());
  291 + throw e;
  292 + }
  293 + }
  294 +
  295 + /**
  296 + * 发布消息(简化版)
  297 + *
  298 + * @param topic 主题
  299 + * @param message 消息内容
  300 + * @throws MqttException 发布异常
  301 + */
  302 + public void publish(String topic, String message) throws MqttException {
  303 + publish(topic, message, 1, false);
  304 + }
  305 +
  306 + /**
  307 + * 订阅主题
  308 + *
  309 + * @param topic 主题
  310 + * @param qos QoS级别
  311 + * @throws MqttException 订阅异常
  312 + */
  313 + public void subscribe(String topic, int qos) throws MqttException {
  314 + if (!isConnected) {
  315 + throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
  316 + }
  317 +
  318 + try {
  319 + mqttClient.subscribe(topic, qos);
  320 + subscribedTopics.add(topic);
  321 + log.info("订阅成功 - Topic: " + topic + ", QoS: " + qos);
  322 + } catch (MqttException e) {
  323 + log.error("订阅失败: " + e.getMessage());
  324 + throw e;
  325 + }
  326 + }
  327 +
  328 + /**
  329 + * 订阅多个主题
  330 + *
  331 + * @param topics 主题数组
  332 + * @param qos QoS级别数组
  333 + * @throws MqttException 订阅异常
  334 + */
  335 + public void subscribe(String[] topics, int[] qos) throws MqttException {
  336 + if (!isConnected) {
  337 + throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
  338 + }
  339 +
  340 + try {
  341 + mqttClient.subscribe(topics, qos);
  342 + for (String topic : topics) {
  343 + subscribedTopics.add(topic);
  344 + }
  345 + log.info("批量订阅成功 - 主题数量: " + topics.length);
  346 + } catch (MqttException e) {
  347 + log.error("批量订阅失败: " + e.getMessage());
  348 + throw e;
  349 + }
  350 + }
  351 +
  352 + /**
  353 + * 取消订阅
  354 + *
  355 + * @param topic 主题
  356 + * @throws MqttException 取消订阅异常
  357 + */
  358 + public void unsubscribe(String topic) throws MqttException {
  359 + if (!isConnected) {
  360 + throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
  361 + }
  362 +
  363 + try {
  364 + mqttClient.unsubscribe(topic);
  365 + subscribedTopics.remove(topic);
  366 + log.info("取消订阅成功 - Topic: " + topic);
  367 + } catch (MqttException e) {
  368 + log.error("取消订阅失败: " + e.getMessage());
  369 + throw e;
  370 + }
  371 + }
  372 +
  373 + /**
  374 + * 断开连接
  375 + */
  376 + public void disconnect() {
  377 + if (mqttClient != null && isConnected) {
  378 + try {
  379 + // 取消所有订阅
  380 + if (!subscribedTopics.isEmpty()) {
  381 + String[] topics = subscribedTopics.toArray(new String[0]);
  382 + mqttClient.unsubscribe(topics);
  383 + subscribedTopics.clear();
  384 + }
  385 +
  386 + // 断开连接
  387 + mqttClient.disconnect();
  388 + isConnected = false;
  389 + log.info("MQTT连接已断开");
  390 +
  391 + if (connectionCallback != null) {
  392 + connectionCallback.onDisconnected();
  393 + }
  394 +
  395 + } catch (MqttException e) {
  396 + log.error("断开连接异常: " + e.getMessage());
  397 + }
  398 + }
  399 + }
  400 +
  401 + /**
  402 + * 判断是否已连接
  403 + *
  404 + * @return 连接状态
  405 + */
  406 + public boolean isConnected() {
  407 + return isConnected && mqttClient != null && mqttClient.isConnected();
  408 + }
  409 +
  410 + /**
  411 + * 获取客户端ID
  412 + *
  413 + * @return 客户端ID
  414 + */
  415 + public String getClientId() {
  416 + return mqttClient != null ? mqttClient.getClientId() : null;
  417 + }
  418 +
  419 + /**
  420 + * 获取服务器URI
  421 + *
  422 + * @return 服务器URI
  423 + */
  424 + public String getServerURI() {
  425 + return mqttClient != null ? mqttClient.getServerURI() : null;
  426 + }
  427 +
  428 + /**
  429 + * 获取已订阅的主题列表
  430 + *
  431 + * @return 主题列表
  432 + */
  433 + public List<String> getSubscribedTopics() {
  434 + return new ArrayList<>(subscribedTopics);
  435 + }
  436 +
  437 + /**
  438 + * 关闭客户端(释放资源)
  439 + */
  440 + public void close() {
  441 + disconnect();
  442 + try {
  443 + if (mqttClient != null) {
  444 + mqttClient.close();
  445 + log.info("MQTT客户端已关闭");
  446 + }
  447 + } catch (MqttException e) {
  448 + log.error("关闭客户端异常: " + e.getMessage());
  449 + }
  450 + }
  451 +
  452 + /**
  453 + * 快速连接并发送消息
  454 + */
  455 + public static void quickPublish(String broker, String topic, String username,
  456 + String password, String clientId, String message) throws Exception {
  457 + MqttUtils mqttUtils = new MqttUtils();
  458 +
  459 + MqttConfig config = new MqttConfig(broker);
  460 +
  461 + if (StringUtils.isNotBlank(username)) {
  462 + config.setUsername(username);
  463 + }
  464 + if (StringUtils.isNotBlank(password)) {
  465 + config.setPassword(password);
  466 + }
  467 + if (StringUtils.isNotBlank(clientId)) {
  468 + config.setClientId(clientId);
  469 + }
  470 +
  471 + config.setConnectionTimeout(15)
  472 + .setKeepAliveInterval(30)
  473 + .setQos(1);
  474 +
  475 + try {
  476 + mqttUtils.connect(config);
  477 + mqttUtils.publish(topic, message);
  478 + log.info("消息发送成功!");
  479 + } finally {
  480 + mqttUtils.disconnect();
  481 + }
  482 + }
  483 +
  484 +
  485 + public static void main(String[] args) {
  486 +
  487 + String broker = "10.9.0.205:1883";
  488 + String clientId = "QesP7NB4PpAQhupFxlY";
  489 + String user = "admin";
  490 + String pwd = "123456";
  491 + String topic = "v1/devices/me/telemetry";
  492 + String message = "{\"temp\": 12.8,\"status\": \"ON\"}";
  493 +
  494 + try {
  495 + quickPublish(broker, topic, user, pwd, clientId, message);
  496 + } catch (Exception e) {
  497 + e.printStackTrace();
  498 + }
  499 +
  500 +// MqttUtils mqttUtils = new MqttUtils();
  501 +// MqttUtils.MqttConfig config = new MqttUtils.MqttConfig(broker)
  502 +// .setClientId(clientId)
  503 +// .setUsername(user)
  504 +// .setPassword(pwd)
  505 +// .setConnectionTimeout(15)
  506 +// .setKeepAliveInterval(30)
  507 +// .setQos(1);
  508 +//
  509 +// try {
  510 +// // 连接
  511 +// mqttUtils.connect(config);
  512 +//
  513 +// // 发布消息
  514 +// mqttUtils.publish(topic, message);
  515 +//
  516 +// } catch (MqttException e) {
  517 +// e.printStackTrace();
  518 +// } finally {
  519 +// // 断开连接
  520 +// mqttUtils.disconnect();
  521 +// }
  522 + }
  523 +}
... ...
  1 +package com.iot.scheduler.utils;
  2 +
  3 +import lombok.extern.slf4j.Slf4j;
  4 +
  5 +import java.sql.*;
  6 +
  7 +@Slf4j
  8 +public class SqlTypedValueUtils {
  9 +
  10 + public static Object getTypedValue(ResultSet rs, int index, int sqlType) throws SQLException {
  11 + Object value;
  12 +
  13 + try {
  14 + switch (sqlType) {
  15 + case Types.BIT:
  16 + case Types.BOOLEAN:
  17 + value = rs.getBoolean(index);
  18 + return rs.wasNull() ? null : value;
  19 +
  20 + case Types.TINYINT:
  21 + case Types.SMALLINT:
  22 + case Types.INTEGER:
  23 + value = rs.getInt(index);
  24 + return rs.wasNull() ? null : value;
  25 +
  26 + case Types.BIGINT:
  27 + value = rs.getLong(index);
  28 + return rs.wasNull() ? null : value;
  29 +
  30 + case Types.FLOAT:
  31 + case Types.REAL:
  32 + value = rs.getFloat(index);
  33 + return rs.wasNull() ? null : value;
  34 +
  35 + case Types.DOUBLE:
  36 + value = rs.getDouble(index);
  37 + return rs.wasNull() ? null : value;
  38 +
  39 + case Types.NUMERIC:
  40 + case Types.DECIMAL:
  41 + value = rs.getBigDecimal(index);
  42 + return rs.wasNull() ? null : value;
  43 +
  44 + case Types.CHAR:
  45 + case Types.VARCHAR:
  46 + case Types.LONGVARCHAR:
  47 + case Types.NCHAR:
  48 + case Types.NVARCHAR:
  49 + case Types.LONGNVARCHAR:
  50 + value = rs.getString(index);
  51 + return rs.wasNull() ? null : value;
  52 +
  53 + case Types.DATE:
  54 + Date date = rs.getDate(index);
  55 + return date != null ? date.toLocalDate() : null;
  56 +
  57 + case Types.TIME:
  58 + Time time = rs.getTime(index);
  59 + return time != null ? time.toLocalTime() : null;
  60 +
  61 + case Types.TIMESTAMP:
  62 + Timestamp timestamp = rs.getTimestamp(index);
  63 + return timestamp != null ? timestamp.toLocalDateTime() : null;
  64 +
  65 + case Types.BINARY:
  66 + case Types.VARBINARY:
  67 + case Types.LONGVARBINARY:
  68 + value = rs.getBytes(index);
  69 + return rs.wasNull() ? null : value;
  70 +
  71 + case Types.BLOB:
  72 + value = rs.getBlob(index);
  73 + return rs.wasNull() ? null : value;
  74 +
  75 + case Types.CLOB:
  76 + value = rs.getClob(index);
  77 + return rs.wasNull() ? null : value;
  78 +
  79 + default:
  80 + value = rs.getObject(index);
  81 + return rs.wasNull() ? null : value;
  82 + }
  83 + } catch (SQLException e) {
  84 + log.error("获取结果集第{}列数据时发生异常,数据类型: {}", index, sqlType, e);
  85 + throw e;
  86 + }
  87 + }
  88 +}
... ...
  1 +package com.iot.scheduler.zone;
  2 +
  3 +import com.iot.scheduler.service.KSDeviceReportService;
  4 +import com.iot.scheduler.task.AbstractZoneScheduler;
  5 +import jakarta.annotation.Resource;
  6 +import lombok.extern.slf4j.Slf4j;
  7 +import org.springframework.scheduling.annotation.Scheduled;
  8 +import org.springframework.stereotype.Component;
  9 +
  10 +@Slf4j
  11 +@Component
  12 +public class KaiShengZoneScheduler extends AbstractZoneScheduler {
  13 + @Resource
  14 + private KSDeviceReportService ksDeviceReportService;
  15 +
  16 + @Override
  17 + protected String getZoneName() {
  18 + return "KaiSheng (凯盛)";
  19 + }
  20 +
  21 + @Scheduled(cron = "${scheduler.ks.push:0 0/5 * * * ?}")
  22 + public void pushKsDevicesToThirdParty() {
  23 + String taskName = "ks Push Devices (IoT -> 3rd Party)";
  24 + logStart(taskName);
  25 + try {
  26 + log.info("[{}] ks pushing devices...", getZoneName());
  27 + ksDeviceReportService.deviceElectricity();
  28 +// ksDeviceReportService.deviceWater();
  29 + Thread.sleep(1000);
  30 + } catch (Exception e) {
  31 + logError(taskName, e);
  32 + } finally {
  33 + logEnd(taskName);
  34 + }
  35 + }
  36 +}
... ...
... ... @@ -5,7 +5,7 @@ spring:
5 5 banner-mode: off
6 6
7 7 server:
8   - port: 8080
  8 + port: 32211
9 9
10 10 logging:
11 11 level:
... ... @@ -24,3 +24,83 @@ scheduler:
24 24 panji:
25 25 pull: "0 0/5 * * * ?"
26 26 push: "0 0/10 * * * ?"
  27 + ks:
  28 + pull: "0 0/5 * * * ?"
  29 + push: "0 0/5 * * * ?"
  30 +ks:
  31 + third:
  32 + jdbcUrl: "jdbc:postgresql://192.168.0.249:5432/iot"
  33 + jdbcUserName: "postgres"
  34 + jdbcPassword: "1qaz@WSX"
  35 + selectSql: "SELECT
  36 + de.sn AS sn,
  37 + tkl1.dbl_v AS Ep,
  38 + tkl2.dbl_v AS Ubc,
  39 + tkl3.dbl_v AS Uc,
  40 + tkl4.dbl_v AS P,
  41 + tkl5.dbl_v AS Uab,
  42 + tkl6.dbl_v AS Ib,
  43 + tkl7.dbl_v AS Eq,
  44 + tkl8.dbl_v AS F,
  45 + tkl9.dbl_v AS Q,
  46 + tkl10.dbl_v AS Ua,
  47 + tkl11.dbl_v AS S,
  48 + tkl12.dbl_v AS Ic,
  49 + tkl13.dbl_v AS Ub,
  50 + tkl14.dbl_v AS Uca,
  51 + tkl15.dbl_v AS PSum,
  52 + tkl16.dbl_v AS Ia
  53 + FROM
  54 + device de
  55 + LEFT JOIN ts_kv_latest tkl1 ON de.id = tkl1.entity_id
  56 + AND tkl1.key = 64
  57 + LEFT JOIN ts_kv_latest tkl2 ON de.id = tkl2.entity_id
  58 + AND tkl2.key = 65
  59 + LEFT JOIN ts_kv_latest tkl3 ON de.id = tkl3.entity_id
  60 + AND tkl3.key = 66
  61 + LEFT JOIN ts_kv_latest tkl4 ON de.id = tkl4.entity_id
  62 + AND tkl4.key = 67
  63 + LEFT JOIN ts_kv_latest tkl5 ON de.id = tkl5.entity_id
  64 + AND tkl5.key = 68
  65 + LEFT JOIN ts_kv_latest tkl6 ON de.id = tkl6.entity_id
  66 + AND tkl6.key = 69
  67 + LEFT JOIN ts_kv_latest tkl7 ON de.id = tkl7.entity_id
  68 + AND tkl7.key = 70
  69 + LEFT JOIN ts_kv_latest tkl8 ON de.id = tkl8.entity_id
  70 + AND tkl8.key = 71
  71 + LEFT JOIN ts_kv_latest tkl9 ON de.id = tkl9.entity_id
  72 + AND tkl9.key = 72
  73 + LEFT JOIN ts_kv_latest tkl10 ON de.id = tkl10.entity_id
  74 + AND tkl10.key = 73
  75 + LEFT JOIN ts_kv_latest tkl11 ON de.id = tkl11.entity_id
  76 + AND tkl11.key = 74
  77 + LEFT JOIN ts_kv_latest tkl12 ON de.id = tkl12.entity_id
  78 + AND tkl12.key = 75
  79 + LEFT JOIN ts_kv_latest tkl13 ON de.id = tkl13.entity_id
  80 + AND tkl13.key = 76
  81 + LEFT JOIN ts_kv_latest tkl14 ON de.id = tkl14.entity_id
  82 + AND tkl14.key = 77
  83 + LEFT JOIN ts_kv_latest tkl15 ON de.id = tkl15.entity_id
  84 + AND tkl15.key = 78
  85 + LEFT JOIN ts_kv_latest tkl16 ON de.id = tkl16.entity_id
  86 + AND tkl16.key = 79
  87 + WHERE
  88 + de.organization_id='1697500a-dc11-45cc-88f5-2ad47472a9bb'
  89 + AND de.device_profile_id = 'c2401630-ffec-11f0-926f-2f3182abc65f'"
  90 +
  91 + selectSql1: "SELECT
  92 + de.sn AS sn,
  93 + tkl1.dbl_v AS devid,
  94 + tkl2.dbl_v AS zll,
  95 + tkl3.dbl_v AS ssll
  96 + FROM
  97 + device de
  98 + LEFT JOIN ts_kv_latest tkl1 ON de.id = tkl1.entity_id
  99 + AND tkl1.key = 58
  100 + LEFT JOIN ts_kv_latest tkl2 ON de.id = tkl2.entity_id
  101 + AND tkl2.key = 83
  102 + LEFT JOIN ts_kv_latest tkl3 ON de.id = tkl3.entity_id
  103 + AND tkl3.key = 84
  104 + WHERE
  105 + de.organization_id='1697500a-dc11-45cc-88f5-2ad47472a9bb'
  106 + AND de.device_profile_id = '4e404b10-ffe7-11f0-926f-2f3182abc65f'"
... ...