// HnDeviceReportService.java 14.4 KB
package com.iot.scheduler.service;

import com.alibaba.fastjson.JSON;
import com.iot.scheduler.model.ReportDevice;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.http.Consts;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 怀宁设备属性上报
 */
@Slf4j
@Service
public class HnDeviceReportService {

    @Value("${hn.third.domain:http://220.180.204.38:1803/report-property}")
    private String reportUrl;

    @Value("${hn.third.jdbcUrl:jdbc:postgresql://106.15.73.210:5433/thingskit}")
    private String jdbcUrl;

    @Value("${hn.third.jdbcUserName:postgres}")
    private String jdbcUserName;

    @Value("${hn.third.jdbcPassword:postgres}")
    private String jdbcPassword;

    @Value("${hn.third.selectSql:SELECT * FROM ts_kv_dictionary;}")
    private String selectSql;

    public void deviceReport() {
        log.info("开始执行怀宁设备属性上报任务");
        try {
            List<Object> needSyncDataList = initConnectAndSelectData();
            log.info("数据库查询完成,共获取{}条数据", needSyncDataList != null ? needSyncDataList.size() : 0);

            if (CollectionUtils.isEmpty(needSyncDataList)) {
                log.info("没有需要上报的数据,任务结束");
                return;
            }

            log.info("开始处理{}条设备数据并进行上报", needSyncDataList.size());
            int successCount = 0;
            int failCount = 0;

            for (int i = 0; i < needSyncDataList.size(); i++) {
                Object needSyncData = needSyncDataList.get(i);
                try {
                    List<Object> dataList = (ArrayList) needSyncData;
                    String deviceId = dataList.get(0).toString();

                    log.info("开始处理设备[{}]的数据,进度: {}/{}", deviceId, i + 1, needSyncDataList.size());

                    ReportDevice reportDevice = new ReportDevice();
                    reportDevice.setDeviceId(deviceId);
                    Map<String, Object> propertiesMap = new HashMap<>(4);

                    String status = dataList.get(1).toString();
                    if ("RUN".equals(status)) {
                        propertiesMap.put("State_Run", 1);
                        propertiesMap.put("State_PowerOn", 1);
                        propertiesMap.put("State_Alarm", 0);
                        log.info("设备[{}]状态为: RUN", deviceId);
                    } else if ("STAND".equals(status)) {
                        propertiesMap.put("State_Run", 0);
                        propertiesMap.put("State_PowerOn", 1);
                        propertiesMap.put("State_Alarm", 0);
                        log.info("设备[{}]状态为: STAND", deviceId);
                    } else if ("ERROR".equals(status)) {
                        propertiesMap.put("State_Run", 0);
                        propertiesMap.put("State_PowerOn", 1);
                        propertiesMap.put("State_Alarm", 1);
                        log.info("设备[{}]状态为: ERROR", deviceId);
                    } else {
                        propertiesMap.put("State_Run", 0);
                        propertiesMap.put("State_PowerOn", 0);
                        propertiesMap.put("State_Alarm", 0);
                        log.info("设备[{}]状态为: 未知[{}],设置为离线", deviceId, status);
                    }

                    Object outputValue = dataList.get(2);
                    long outputLong = 0L;

                    if (outputValue != null) {
                        if (outputValue instanceof Double) {
                            double doubleValue = (Double) outputValue;
                            outputLong = (long) doubleValue; // 直接转型,会丢失小数部分
                            log.info("设备[{}]输出值: {} (double转换为long: {})", deviceId, doubleValue, outputLong);
                        } else if (outputValue instanceof Number) {
                            outputLong = ((Number) outputValue).longValue();
                            log.info("设备[{}]输出值: {} (Number转换为long: {})", deviceId, outputValue, outputLong);
                        } else {
                            // 处理字符串或其他类型
                            outputLong = Long.parseLong(outputValue.toString());
                            log.info("设备[{}]输出值: {} (字符串转换为long: {})", deviceId, outputValue, outputLong);
                        }
                    } else {
                        log.info("设备[{}]输出值为null,使用默认值0", deviceId);
                    }

                    propertiesMap.put("ProInfo_Output", outputLong);
                    reportDevice.setProperties(propertiesMap);

                    boolean sendResult = sendReportDevice(reportDevice);
                    if (sendResult) {
                        successCount++;
//                        log.info("设备[{}]数据上报成功", deviceId);
                    } else {
                        failCount++;
//                        log.error("设备[{}]数据上报失败", deviceId);
                    }

                } catch (Exception e) {
                    failCount++;
                    log.error("处理第{}条数据时发生异常,数据内容: {}", i + 1, needSyncData, e);
                }
            }

            log.info("设备属性上报任务完成,成功: {}条,失败: {}条,总计: {}条",
                    successCount, failCount, needSyncDataList.size());

        } catch (Exception e) {
            log.error("执行设备属性上报任务时发生异常", e);
        } finally {
            log.info("怀宁设备属性上报任务执行结束");
        }
    }

    private boolean sendReportDevice(ReportDevice reportDevice) {
        String deviceId = reportDevice.getDeviceId();
        log.info("开始上报设备[{}]的数据,请求URL: {}", deviceId, reportUrl);

        HttpPost httpPost = new HttpPost(reportUrl);
        CloseableHttpClient httpClient = HttpClients.createDefault();
        String requestBody = JSON.toJSONString(reportDevice);

        log.info("设备[{}]上报请求体: {}", deviceId, requestBody);

        StringEntity entity = new StringEntity(requestBody, ContentType.create("application/json", Consts.UTF_8));
        httpPost.setEntity(entity);
        httpPost.setHeader("Content-Type", "application/json;charset=UTF-8");

        try (CloseableHttpResponse execute = httpClient.execute(httpPost)) {
            int statusCode = execute.getStatusLine().getStatusCode();

            HttpEntity res = execute.getEntity();
            String responseBody = "";
            if (res != null) {
                try (InputStream is = res.getContent();
                     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
                    byte[] buf = new byte[128];
                    int len;
                    while ((len = is.read(buf)) != -1) {
                        byteArrayOutputStream.write(buf, 0, len);
                    }
                    responseBody = byteArrayOutputStream.toString("UTF-8");
                }
            }

            log.info("设备[{}]上报响应,状态码: {},响应内容: {}", deviceId, statusCode, responseBody);

            if (statusCode >= 200 && statusCode < 300) {
                return true;
            } else {
                log.error("设备[{}]上报失败,HTTP状态码: {},响应: {}", deviceId, statusCode, responseBody);
                return false;
            }

        } catch (IOException e) {
            log.error("设备[{}]上报请求发生IO异常", deviceId, e);
            return false;
        } finally {
            try {
                httpClient.close();
            } catch (IOException e) {
                log.error("关闭HTTP客户端时发生异常", e);
            }
        }
    }

    private List<Object> initConnectAndSelectData() {
        Connection connection = null;
        PreparedStatement statement = null;
        ResultSet resultSet = null;
        HikariDataSource dataSource = null;
        List<Object> resultList = new ArrayList<>();

        log.info("开始连接数据库,URL: {}", jdbcUrl);

        try {
            HikariConfig config = new HikariConfig();
            config.setJdbcUrl(jdbcUrl);
            config.setUsername(jdbcUserName);
            config.setPassword(jdbcPassword);
            config.setDriverClassName("org.postgresql.Driver");
            config.setMaximumPoolSize(5);
            config.setMinimumIdle(5);
            config.setConnectionTimeout(60000);
            config.setConnectionTestQuery("SELECT 1");

            dataSource = new HikariDataSource(config);
            log.info("Hikari连接池配置完成");

            connection = dataSource.getConnection();
            log.info("数据库连接成功");

            statement = connection.prepareStatement(selectSql);
            log.info("执行SQL查询: {}", selectSql);

            resultSet = statement.executeQuery();
            ResultSetMetaData metaData = resultSet.getMetaData();
            int columnCount = metaData.getColumnCount();
            log.info("查询结果集元数据获取成功,共{}列", columnCount);

            int rowCount = 0;
            while (resultSet.next()) {
                List<Object> result = new ArrayList<>(columnCount);
                for (int index = 1; index <= columnCount; index++) {
                    int columnType = metaData.getColumnType(index);
                    Object value = getTypedValue(resultSet, index, columnType);
                    result.add(value);
                }
                resultList.add(result);
                rowCount++;

                // 每处理1000行记录一次日志
                if (rowCount % 1000 == 0) {
                    log.info("已处理{}行数据", rowCount);
                }
            }

            log.info("数据查询完成,共获取{}行数据", rowCount);

        } catch (SQLException e) {
            log.error("数据库操作异常,URL: {}, 用户名: {}", jdbcUrl, jdbcUserName, e);
        } catch (Exception e) {
            log.error("初始化数据库连接或查询数据时发生异常", e);
        } finally {
            // 释放资源
            try {
                if (resultSet != null) resultSet.close();
                if (statement != null) statement.close();
                if (connection != null) connection.close();
                log.info("数据库连接资源已释放");
            } catch (SQLException e) {
                log.error("关闭数据库资源时发生异常", e);
            }

            if (dataSource != null) {
                try {
                    dataSource.close();
                    log.info("HikariDataSource连接池已关闭");
                } catch (Exception e) {
                    log.error("关闭HikariDataSource连接池时发生异常", e);
                }
            }
        }

        log.info("数据库操作完成,返回{}条记录", resultList.size());
        return resultList;
    }

    private Object getTypedValue(ResultSet rs, int index, int sqlType) throws SQLException {
        Object value;

        try {
            switch (sqlType) {
                case Types.BIT:
                case Types.BOOLEAN:
                    value = rs.getBoolean(index);
                    return rs.wasNull() ? null : value;

                case Types.TINYINT:
                case Types.SMALLINT:
                case Types.INTEGER:
                    value = rs.getInt(index);
                    return rs.wasNull() ? null : value;

                case Types.BIGINT:
                    value = rs.getLong(index);
                    return rs.wasNull() ? null : value;

                case Types.FLOAT:
                case Types.REAL:
                    value = rs.getFloat(index);
                    return rs.wasNull() ? null : value;

                case Types.DOUBLE:
                    value = rs.getDouble(index);
                    return rs.wasNull() ? null : value;

                case Types.NUMERIC:
                case Types.DECIMAL:
                    value = rs.getBigDecimal(index);
                    return rs.wasNull() ? null : value;

                case Types.CHAR:
                case Types.VARCHAR:
                case Types.LONGVARCHAR:
                case Types.NCHAR:
                case Types.NVARCHAR:
                case Types.LONGNVARCHAR:
                    value = rs.getString(index);
                    return rs.wasNull() ? null : value;

                case Types.DATE:
                    Date date = rs.getDate(index);
                    return date != null ? date.toLocalDate() : null;

                case Types.TIME:
                    Time time = rs.getTime(index);
                    return time != null ? time.toLocalTime() : null;

                case Types.TIMESTAMP:
                    Timestamp timestamp = rs.getTimestamp(index);
                    return timestamp != null ? timestamp.toLocalDateTime() : null;

                case Types.BINARY:
                case Types.VARBINARY:
                case Types.LONGVARBINARY:
                    value = rs.getBytes(index);
                    return rs.wasNull() ? null : value;

                case Types.BLOB:
                    value = rs.getBlob(index);
                    return rs.wasNull() ? null : value;

                case Types.CLOB:
                    value = rs.getClob(index);
                    return rs.wasNull() ? null : value;

                default:
                    value = rs.getObject(index);
                    return rs.wasNull() ? null : value;
            }
        } catch (SQLException e) {
            log.error("获取结果集第{}列数据时发生异常,数据类型: {}", index, sqlType, e);
            throw e;
        }
    }
}