// HnHsqDeviceReportService.java 12.8 KB
package com.iot.scheduler.service;

import com.alibaba.fastjson.JSONObject;
import com.iot.scheduler.utils.HttpClientUtils;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.*;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@Service
/**
 * 淮南红三七设备上报
 */
public class HnHsqDeviceReportService {

    @Value("${hnhsq.third.tokenUrl:http://58.243.79.51:32210/auth/oauth/token}")
    private String tokenUrl;

    @Value("${hnhsq.third.tokenUser:hnsq}")
    private String tokenUser;

    @Value("${hnhsq.third.tokenPwd:abc@1234}")
    private String tokenPwd;

    @Value("${hnhsq.third.reportUrl:http://58.243.79.51:31357/mainApi/formEngine/formData/batchAddOrUpate}")
    private String reportUrl;

    @Value("${hnhsq.third.jdbcUrl:jdbc:postgresql://106.15.73.210:5433/thingskit}")
    private String jdbcUrl;

    @Value("${hnhsq.third.jdbcUserName:postgres}")
    private String jdbcUserName;

    @Value("${hnhsq.third.jdbcPassword:postgres}")
    private String jdbcPassword;

    @Value("${hnhsq.third.selectSql:SELECT * FROM ts_kv_dictionary;}")
    private String selectSql;

    private static final String companyName = "安徽红三七药业有限公司";

    public static final String lastTimeFilePath = "/thingskit/iot-scheduler/lastts/hsq.txt";
//    public static final String lastTimeFilePath = "D:/data/lastts/hsq.txt";

    public void deviceReport() {
        log.info("开始执行淮南红三七设备属性上报任务");
        try {
            List<Object> needSyncDataList = initConnectAndSelectData();
            log.info("数据库查询完成,共获取{}条数据", needSyncDataList.size());
            if(CollectionUtils.isNotEmpty(needSyncDataList)){
                List<Map<String, Object>> paramsList = switchData(needSyncDataList);
                String token = getToken();
                log.info("token:"+token);
                batchAddOrUpate(token, paramsList);
            }
        } catch (Exception e) {
            log.error("执行设备属性上报任务时发生异常", e);
        } finally {
            log.info("淮南红三七设备属性上报任务执行结束");
        }
    }

    public List<Map<String, Object>> switchData(List<Object> hcDeviceList) {
        List<Map<String, Object>> result = new ArrayList<>();
        if (CollectionUtils.isEmpty(hcDeviceList)) {
            return new ArrayList<>();
        }
        for (Object device : hcDeviceList) {
            Map<String, Object> data = new HashMap<>();
            List<Object> dataList = (ArrayList) device;
            String deviceName = dataList.get(0).toString();
            String deviceCode = dataList.get(1).toString();
            Long ts = Long.parseLong(dataList.get(2).toString());
            long status = Long.parseLong(dataList.get(3).toString());
            String statusStr = "在线";
            if (status == 0) {
                statusStr = "离线";
            }


            data.put("entName", companyName);
            data.put("deviceName",deviceName);
            data.put("deviceCode", deviceCode);
            data.put("reportTime", tsFormart(ts));
            data.put("status", statusStr);

            result.add(data);
        }

        return result;
    }

    public static String tsFormart(Long ts) {
        Instant instant = Instant.ofEpochMilli(ts);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        String formattedDate = localDateTime.format(formatter);
        return formattedDate;
    }

    private List<Object> initConnectAndSelectData() {
        Connection connection = null;
        PreparedStatement statement = null;
        ResultSet resultSet = null;
        HikariDataSource dataSource = null;
        List<Object> resultList = new ArrayList<>();
        long time = new java.util.Date().getTime();
        Long lastExecTs = getLastExecTs();
        String currentSelSql = selectSql + " and kv.ts >=" + lastExecTs + " and kv.ts<" + time;
        log.info("开始连接数据库,URL: {}", jdbcUrl);

        try {
            HikariConfig config = new HikariConfig();
            config.setJdbcUrl(jdbcUrl);
            config.setUsername(jdbcUserName);
            config.setPassword(jdbcPassword);
            config.setDriverClassName("org.postgresql.Driver");
            config.setMaximumPoolSize(5);
            config.setMinimumIdle(5);
            config.setConnectionTimeout(60000);
            config.setConnectionTestQuery("SELECT 1");

            dataSource = new HikariDataSource(config);
            log.info("Hikari连接池配置完成");

            connection = dataSource.getConnection();
            log.info("数据库连接成功");

            statement = connection.prepareStatement(currentSelSql);
            log.info("执行SQL查询: {}", currentSelSql);

            resultSet = statement.executeQuery();
            ResultSetMetaData metaData = resultSet.getMetaData();
            int columnCount = metaData.getColumnCount();
            log.info("查询结果集元数据获取成功,共{}列", columnCount);

            int rowCount = 0;
            while (resultSet.next()) {
                List<Object> result = new ArrayList<>(columnCount);
                for (int index = 1; index <= columnCount; index++) {
                    int columnType = metaData.getColumnType(index);
                    Object value = getTypedValue(resultSet, index, columnType);
                    result.add(value);
                }
                resultList.add(result);
                rowCount++;

                // 每处理1000行记录一次日志
                if (rowCount % 1000 == 0) {
                    log.info("已处理{}行数据", rowCount);
                }
            }

            log.info("数据查询完成,共获取{}行数据", rowCount);

        } catch (SQLException e) {
            log.error("数据库操作异常,URL: {}, 用户名: {}", jdbcUrl, jdbcUserName, e);
        } catch (Exception e) {
            log.error("初始化数据库连接或查询数据时发生异常", e);
        } finally {
            // 释放资源
            try {
                if (resultSet != null) resultSet.close();
                if (statement != null) statement.close();
                if (connection != null) connection.close();
                log.info("数据库连接资源已释放");
            } catch (SQLException e) {
                log.error("关闭数据库资源时发生异常", e);
            }

            if (dataSource != null) {
                try {
                    dataSource.close();
                    log.info("HikariDataSource连接池已关闭");
                } catch (Exception e) {
                    log.error("关闭HikariDataSource连接池时发生异常", e);
                }
            }
        }
        setLastExecTs(time);
        log.info("数据库操作完成,返回{}条记录", resultList.size());
        return resultList;
    }

    private Object getTypedValue(ResultSet rs, int index, int sqlType) throws SQLException {
        Object value;

        try {
            switch (sqlType) {
                case Types.BIT:
                case Types.BOOLEAN:
                    value = rs.getBoolean(index);
                    return rs.wasNull() ? null : value;

                case Types.TINYINT:
                case Types.SMALLINT:
                case Types.INTEGER:
                    value = rs.getInt(index);
                    return rs.wasNull() ? null : value;

                case Types.BIGINT:
                    value = rs.getLong(index);
                    return rs.wasNull() ? null : value;

                case Types.FLOAT:
                case Types.REAL:
                    value = rs.getFloat(index);
                    return rs.wasNull() ? null : value;

                case Types.DOUBLE:
                    value = rs.getDouble(index);
                    return rs.wasNull() ? null : value;

                case Types.NUMERIC:
                case Types.DECIMAL:
                    value = rs.getBigDecimal(index);
                    return rs.wasNull() ? null : value;

                case Types.CHAR:
                case Types.VARCHAR:
                case Types.LONGVARCHAR:
                case Types.NCHAR:
                case Types.NVARCHAR:
                case Types.LONGNVARCHAR:
                    value = rs.getString(index);
                    return rs.wasNull() ? null : value;

                case Types.DATE:
                    Date date = rs.getDate(index);
                    return date != null ? date.toLocalDate() : null;

                case Types.TIME:
                    Time time = rs.getTime(index);
                    return time != null ? time.toLocalTime() : null;

                case Types.TIMESTAMP:
                    Timestamp timestamp = rs.getTimestamp(index);
                    return timestamp != null ? timestamp.toLocalDateTime() : null;

                case Types.BINARY:
                case Types.VARBINARY:
                case Types.LONGVARBINARY:
                    value = rs.getBytes(index);
                    return rs.wasNull() ? null : value;

                case Types.BLOB:
                    value = rs.getBlob(index);
                    return rs.wasNull() ? null : value;

                case Types.CLOB:
                    value = rs.getClob(index);
                    return rs.wasNull() ? null : value;

                default:
                    value = rs.getObject(index);
                    return rs.wasNull() ? null : value;
            }
        } catch (SQLException e) {
            log.error("获取结果集第{}列数据时发生异常,数据类型: {}", index, sqlType, e);
            throw e;
        }
    }

    private void setLastExecTs(Long ts) {
        try {
            File file = new File(lastTimeFilePath);
            FileWriter fw = new FileWriter(file);
            fw.write(ts.toString());
            fw.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private Long getLastExecTs() {
        try {
            File file = new File(lastTimeFilePath);
            BufferedReader br = new BufferedReader(new FileReader(file));
            try {
                String line;
                while ((line = br.readLine()) != null) {
                    if (StringUtils.isNotBlank(line)) {
                        return Long.parseLong(line);
                    }
                }

            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                br.close();
            }


        } catch (IOException e) {
            e.printStackTrace();
        }

        //2026-01-27 00:00:00
        return 1769443200000L;
    }

    private String getToken() {

        Map<String, String> header = new HashMap<>();
        Map<String, String> paramsMap = new HashMap<>();
        paramsMap.put("grant_type", "password");
        paramsMap.put("username", tokenUser);
        paramsMap.put("password", tokenPwd);
        header.put("Authorization", "Basic bWljcm8tcG9ydGFsOlBAc3MxMjM0");

        String sResult = HttpClientUtils.doPostRequest(tokenUrl, header, paramsMap, null);
        if (StringUtils.isNotBlank(sResult)) {
            JSONObject jsonObject = JSONObject.parseObject(sResult);
            return jsonObject.getString("access_token");
        } else {
            return null;
        }

    }

    private void batchAddOrUpate(String token, List<Map<String, Object>> dataList) {

        Map<String, String> header = new HashMap<>();

        Map<String, Object> paramsMap = new HashMap<>();
        paramsMap.put("formId", "t67c1212b01915800070e7712");
        List<String> uniqueKeys = new ArrayList<>();
        uniqueKeys.add("entName");
        uniqueKeys.add("deviceCode");
        paramsMap.put("uniqueKeys", uniqueKeys);

        paramsMap.put("datas", dataList);

        header.put("Authorization", "Bearer" + token);

        String toJson = JSONObject.toJSONString(paramsMap);
        StringEntity myEntity = new StringEntity(toJson, ContentType.APPLICATION_JSON);
        String sResult = HttpClientUtils.doPostRequest(reportUrl, header, null, myEntity);
        log.info("淮南红三七上报结果:" + sResult);
    }
}