KSDeviceReportService.java 16.2 KB
package com.iot.scheduler.service;

import com.alibaba.fastjson.JSON;
import com.iot.scheduler.utils.MqttUtils;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.sql.*;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 凯盛设备
 */
@Slf4j
@Service
public class KSDeviceReportService {

    @Value("${ks.third.jdbcUrl:jdbc:postgresql://192.168.0.249:5432/iot}")
    private String jdbcUrl;
    @Value("${ks.third.jdbcUserName:postgres}")
    private String jdbcUserName;

    @Value("${ks.third.jdbcPassword:1qaz@WSX}")
    private String jdbcPassword;

    @Value("${ks.third.selectSql:SELECT * FROM ts_kv_dictionary;}")
    private String selectSql;

    @Value("${ks.third.selectSql1:SELECT * FROM ts_kv_dictionary;}")
    private String selectSql1;

    @Value("${ks.third.broker:117.71.99.102:21883}")
    private String broker;
    @Value("${ks.third.username:yunpower}")
    private String username;
    @Value("${ks.third.password:P1o2w3er}")
    private String password;
    @Value("${ks.third.topic:storage0}")
    private String topic;
    @Value("${ks.third.clientId:2020672015207559171}")
    private String clientId;
    @Value("${ks.third.channelSn:B049Z974}")
    private String channelSn;

    public void deviceElectricity() {
        log.info("开始执行凯盛电表设备属性上报任务");
        try {
            List<Object> needSyncDataList = initConnectAndSelectData("E");
            if (CollectionUtils.isEmpty(needSyncDataList)) {
                log.info("电表没有需要上报的数据,任务结束");
                return;
            }

            log.info("开始处理{}条电表设备数据并进行上报", needSyncDataList.size());
            int successCount = 0;
            int failCount = 0;
            List<String> failedDevices = new ArrayList<>();
            Map<Object, String> map = new HashMap<>();
            map.put(1,"Ep");
            map.put(2,"Ua");
            map.put(3,"Ia");

            for (int i = 0; i < needSyncDataList.size(); i++) {
                Object needSyncData = needSyncDataList.get(i);
                try {
                    List<Object> dataList = (ArrayList) needSyncData;
                    String sn = dataList.get(0).toString();

                    // 1. 获取当前时间
                    LocalDateTime now = LocalDateTime.now();
                    // 2. 定义间隔分钟数 (例如 5 分钟)
                    int intervalMinutes = 5;
                    // 3. 计算向下取整的时间
                    // 逻辑:获取当前的分钟数,减去 (分钟数 % 间隔),秒和纳秒设为 0
                    LocalDateTime roundedTime = now
                            .withMinute(now.getMinute() - (now.getMinute() % intervalMinutes))
                            .withSecond(0)
                            .withNano(0);

                    // 4. 格式化输出
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

                    for (int j = 1; j < dataList.size(); j++) {
                        Map<String, Object> properties = new HashMap<>(5);
                        Object o = dataList.get(j);
                        if (o == null) {
                            continue;
                        }
                        String deviceVarSn = "xml_" + sn + map.get(j);
                        log.info("===deviceVarSn==" + deviceVarSn);
                        properties.put("channelSn", channelSn);
                        properties.put("deviceVarSn", deviceVarSn);
                        properties.put("value", Double.parseDouble(o.toString().trim()));
                        properties.put("saveTime", roundedTime.format(formatter));
                        properties.put("collectTime", roundedTime.format(formatter));
                        try {
                            log.debug("电表开始MQTT发布: sn={}", sn);
                            MqttUtils.publish(broker, topic, username, password, clientId, JSON.toJSONString(properties));
                            successCount++;

                        } catch (Exception e) {
                            failCount++;
                            failedDevices.add(sn);
                            log.error("电表设备上报失败: sn={}, clientId={}, topic={}, 错误信息: {}",
                                    sn, clientId, topic, e.getMessage());
                            log.error("电表详细异常信息:", e);
                        }

                        // 可选:添加延迟,避免发送过快
                        try {
                            Thread.sleep(10); // 10毫秒延迟
                        } catch (InterruptedException e) {
                            log.warn("电表线程休眠被中断", e);
                            Thread.currentThread().interrupt();
                        }
                    }
                } catch (Exception e) {
                    failCount++;
                    log.error("电表处理第{}条数据时发生异常,数据内容: {}", i + 1, needSyncData, e);
                }
            }

            log.info("sn电表设备属性上报任务完成,成功: {}条,失败: {}条,总计: {}条",
                    successCount, failCount, needSyncDataList.size());

        } catch (Exception e) {
            log.error("sn执行电表设备属性上报任务时发生异常", e);
        } finally {
            log.info("凯盛电表设备属性上报任务执行结束");
        }
    }

    public void deviceWater() {
        log.info("开始执行凯盛水表设备属性上报任务");
        try {
            List<Object> needSyncDataList = initConnectAndSelectData("W");
            if (CollectionUtils.isEmpty(needSyncDataList)) {
                log.info("水表没有需要上报的数据,任务结束");
                return;
            }

            log.info("开始处理{}条水表设备数据并进行上报", needSyncDataList.size());
            int successCount = 0;
            int failCount = 0;
            List<String> failedDevices = new ArrayList<>();
            Map<Object, String> map = new HashMap<>();
            map.put(1,"zll");
            map.put(2,"ssll");


            for (int i = 0; i < needSyncDataList.size(); i++) {
                Object needSyncData = needSyncDataList.get(i);
                try {
                    List<Object> dataList = (ArrayList) needSyncData;
                    String sn = dataList.get(0).toString();

                    // 1. 获取当前时间
                    LocalDateTime now = LocalDateTime.now();
                    // 2. 定义间隔分钟数 (例如 5 分钟)
                    int intervalMinutes = 5;
                    // 3. 计算向下取整的时间
                    // 逻辑:获取当前的分钟数,减去 (分钟数 % 间隔),秒和纳秒设为 0
                    LocalDateTime roundedTime = now
                            .withMinute(now.getMinute() - (now.getMinute() % intervalMinutes))
                            .withSecond(0)
                            .withNano(0);

                    // 4. 格式化输出
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

                    for (int j = 1; j < dataList.size(); j++) {
                        Map<String, Object> properties = new HashMap<>(5);
                        Object o = dataList.get(j);
                        if (o == null) {
                            continue;
                        }
                        String deviceVarSn = "xml_" + sn + map.get(j);
                        log.info("===deviceVarSn==" + deviceVarSn);
                        properties.put("channelSn", channelSn);
                        properties.put("deviceVarSn", deviceVarSn);
                        properties.put("value", Double.parseDouble(o.toString().trim()));
                        properties.put("saveTime", roundedTime.format(formatter));
                        properties.put("collectTime", roundedTime.format(formatter));
                        try {
                            log.debug("水表开始MQTT发布: sn={}", sn);
                            MqttUtils.publish(broker, topic, username, password, clientId, JSON.toJSONString(properties));
                            successCount++;

                        } catch (Exception e) {
                            failCount++;
                            failedDevices.add(sn);
                            log.error("水表设备上报失败: sn={}, clientId={}, topic={}, 错误信息: {}",
                                    sn, clientId, topic, e.getMessage());
                            log.error("水表详细异常信息:", e);
                        }

                        // 可选:添加延迟,避免发送过快
                        try {
                            Thread.sleep(10); // 10毫秒延迟
                        } catch (InterruptedException e) {
                            log.warn("水表线程休眠被中断", e);
                            Thread.currentThread().interrupt();
                        }
                    }
                } catch (Exception e) {
                    failCount++;
                    log.error("水表处理第{}条数据时发生异常,数据内容: {}", i + 1, needSyncData, e);
                }
            }

            log.info("水表设备属性上报任务完成,成功: {}条,失败: {}条,总计: {}条",
                    successCount, failCount, needSyncDataList.size());

        } catch (Exception e) {
            log.error("水表执行设备属性上报任务时发生异常", e);
        } finally {
            log.info("凯盛水表设备属性上报任务执行结束");
        }
    }

    private List<Object> initConnectAndSelectData(String type) {
        Connection connection = null;
        PreparedStatement statement = null;
        ResultSet resultSet = null;
        HikariDataSource dataSource = null;
        List<Object> resultList = new ArrayList<>();

        log.info("开始连接数据库,URL: {}", jdbcUrl);

        try {
            HikariConfig config = new HikariConfig();
            config.setJdbcUrl(jdbcUrl);
            config.setUsername(jdbcUserName);
            config.setPassword(jdbcPassword);
            config.setDriverClassName("org.postgresql.Driver");
            config.setMaximumPoolSize(5);
            config.setMinimumIdle(5);
            config.setConnectionTimeout(60000);
            config.setConnectionTestQuery("SELECT 1");

            dataSource = new HikariDataSource(config);
            log.info("Hikari连接池配置完成");

            connection = dataSource.getConnection();
            log.info("数据库连接成功");
            if ("E".equals(type)) {
                statement = connection.prepareStatement(selectSql);
                log.info("执行SQL查询: {}", selectSql);
            } else {
                statement = connection.prepareStatement(selectSql1);
                log.info("执行SQL查询: {}", selectSql1);
            }

            resultSet = statement.executeQuery();
            ResultSetMetaData metaData = resultSet.getMetaData();
            int columnCount = metaData.getColumnCount();
            log.info("查询结果集元数据获取成功,共{}列", columnCount);

            int rowCount = 0;
            while (resultSet.next()) {
                List<Object> result = new ArrayList<>(columnCount);
                for (int index = 1; index <= columnCount; index++) {
                    int columnType = metaData.getColumnType(index);
                    Object value = getTypedValue(resultSet, index, columnType);
                    result.add(value);
                }
                resultList.add(result);
                rowCount++;

                // 每处理1000行记录一次日志
                if (rowCount % 1000 == 0) {
                    log.info("已处理{}行数据", rowCount);
                }
            }

            log.info("数据查询完成,共获取{}行数据", rowCount);

        } catch (SQLException e) {
            log.error("数据库操作异常,URL: {}, 用户名: {}", jdbcUrl, jdbcUserName, e);
        } catch (Exception e) {
            log.error("初始化数据库连接或查询数据时发生异常", e);
        } finally {
            // 释放资源
            try {
                if (resultSet != null) resultSet.close();
                if (statement != null) statement.close();
                if (connection != null) connection.close();
                log.info("数据库连接资源已释放");
            } catch (SQLException e) {
                log.error("关闭数据库资源时发生异常", e);
            }

            if (dataSource != null) {
                try {
                    dataSource.close();
                    log.info("HikariDataSource连接池已关闭");
                } catch (Exception e) {
                    log.error("关闭HikariDataSource连接池时发生异常", e);
                }
            }
        }

        log.info("数据库操作完成,返回{}条记录", resultList.size());
        return resultList;
    }

    private Object getTypedValue(ResultSet rs, int index, int sqlType) throws SQLException {
        Object value;

        try {
            switch (sqlType) {
                case Types.BIT:
                case Types.BOOLEAN:
                    value = rs.getBoolean(index);
                    return rs.wasNull() ? null : value;

                case Types.TINYINT:
                case Types.SMALLINT:
                case Types.INTEGER:
                    value = rs.getInt(index);
                    return rs.wasNull() ? null : value;

                case Types.BIGINT:
                    value = rs.getLong(index);
                    return rs.wasNull() ? null : value;

                case Types.FLOAT:
                case Types.REAL:
                    value = rs.getFloat(index);
                    return rs.wasNull() ? null : value;

                case Types.DOUBLE:
                    value = rs.getDouble(index);
                    return rs.wasNull() ? null : value;

                case Types.NUMERIC:
                case Types.DECIMAL:
                    value = rs.getBigDecimal(index);
                    return rs.wasNull() ? null : value;

                case Types.CHAR:
                case Types.VARCHAR:
                case Types.LONGVARCHAR:
                case Types.NCHAR:
                case Types.NVARCHAR:
                case Types.LONGNVARCHAR:
                    value = rs.getString(index);
                    return rs.wasNull() ? null : value;

                case Types.DATE:
                    Date date = rs.getDate(index);
                    return date != null ? date.toLocalDate() : null;

                case Types.TIME:
                    Time time = rs.getTime(index);
                    return time != null ? time.toLocalTime() : null;

                case Types.TIMESTAMP:
                    Timestamp timestamp = rs.getTimestamp(index);
                    return timestamp != null ? timestamp.toLocalDateTime() : null;

                case Types.BINARY:
                case Types.VARBINARY:
                case Types.LONGVARBINARY:
                    value = rs.getBytes(index);
                    return rs.wasNull() ? null : value;

                case Types.BLOB:
                    value = rs.getBlob(index);
                    return rs.wasNull() ? null : value;

                case Types.CLOB:
                    value = rs.getClob(index);
                    return rs.wasNull() ? null : value;

                default:
                    value = rs.getObject(index);
                    return rs.wasNull() ? null : value;
            }
        } catch (SQLException e) {
            log.error("获取结果集第{}列数据时发生异常,数据类型: {}", index, sqlType, e);
            throw e;
        }
    }
}