GhDevicePullService.java 15.2 KB
package com.iot.scheduler.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.iot.scheduler.model.DeviceToken;
import com.iot.scheduler.model.QxDeviceInfo;
import com.iot.scheduler.model.QxDeviceInfoDetail;
import com.iot.scheduler.utils.SqlTypedValueUtils;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Consts;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.*;
import java.util.*;
import java.util.Date;

/**
 * 国宏应用数据同步
 */
@Slf4j
@Service
public class GhDevicePullService {

    @Value("${gh.iot.organizeId:}")
    private String iotOrganizeId;
    @Value("${gh.iot.profileId}")
    private String iotProfileId;
    @Value("${gh.iot.deviceProfileId}")
    private String iotDeviceProfileId;
    @Value("${gh.iot.userName:}")
    private String iotUserName;
    @Value("${gh.iot.password:}")
    private String iotPassword;
    @Value("${gh.iot.tokenUrl}")
    private String iotTokenUrl;
    @Value("${gh.iot.infoUrl}")
    private String iotDeviceInfoUrl;
    @Value("${gh.iot.detailUrl}")
    private String iotDeviceDetailUrl;


    @Value("${hn.third.jdbcUrl:jdbc:postgresql://106.15.73.210:5433/thingskit}")
    private String jdbcUrl;

    @Value("${hn.third.jdbcUserName:postgres}")
    private String jdbcUserName;

    @Value("${hn.third.jdbcPassword:postgres}")
    private String jdbcPassword;


    @Resource
    private RedisTemplate<String, String> redisTemplate;

    public void pullDeviceAndPushToIot() {
        JSONArray deviceInfoList = getDeviceInfo();
        List<QxDeviceInfo> qxDeviceInfos = new ArrayList<>();
        List<QxDeviceInfoDetail> qxAddDeviceInfoDetails = new ArrayList<>();
        Map<String, QxDeviceInfoDetail> qxDeviceInfoDetailMap = new HashMap<>();
        for (Object o : deviceInfoList) {
            JSONObject deviceInfoJson = (JSONObject) o;
            QxDeviceInfo qxDeviceInfo = new QxDeviceInfo();
            qxDeviceInfo.setDeviceType("SENSOR");
            qxDeviceInfo.setTransportType("DEFAULT");
            qxDeviceInfo.setOrganizationId(iotOrganizeId);
            qxDeviceInfo.setDeviceProfileId(iotDeviceProfileId);
            qxDeviceInfo.setProfileId(iotProfileId);
//            //项目状态(1:在线,2:离线,3:报警)
//            Integer projectState = deviceInfoJson.getInteger("projectState");
//            if (projectState != null) {
//                qxDeviceInfo.setDescription(String.valueOf(projectState));
//            }

            //项目类型
            qxDeviceInfo.setLabel("生产设备");
            //设备名称
            String deviceName = deviceInfoJson.getString("deviceName");

            qxDeviceInfo.setName(deviceName);
            qxDeviceInfo.setBrand(deviceName);
            //序列号
            String dtuSn = deviceInfoJson.getString("dtuSn");
            qxDeviceInfo.setSn(dtuSn);
            DeviceToken deviceToken = new DeviceToken();
            deviceToken.setCredentialsType("ACCESS_TOKEN");
            deviceToken.setCredentialsId(dtuSn);
            deviceToken.setCredentialsValue(dtuSn);
            qxDeviceInfo.setDeviceToken(deviceToken);
            qxDeviceInfos.add(qxDeviceInfo);
            QxDeviceInfoDetail qxDeviceInfoDetail = new QxDeviceInfoDetail();
            qxDeviceInfoDetail.setAlarm(false);
            //灯状态(0:灭灯,1:红,2:黄,3:绿,4:蓝)
            Integer lampState = deviceInfoJson.getInteger("lampState");
            switch (lampState) {
                case 0:
                    qxDeviceInfoDetail.setStatus("OFF");
                    break;
                case 1:
                    qxDeviceInfoDetail.setStatus("ERROR");
                    qxDeviceInfoDetail.setAlarm(true);
                    //先从缓存里面拿token信息
                    String totalCapacity = getTotalCapacity("gh_total_capacity_" + dtuSn, 0D);
                    qxDeviceInfoDetail.setCumulativeOutput(Double.valueOf(totalCapacity));
                    break;
                case 2:
                    qxDeviceInfoDetail.setStatus("STAND");
                    //先从缓存里面拿token信息
                    totalCapacity = getTotalCapacity("gh_total_capacity_" + dtuSn, 0D);
                    qxDeviceInfoDetail.setCumulativeOutput(Double.valueOf(totalCapacity));
                    break;
                case 3:
                    qxDeviceInfoDetail.setStatus("RUN");
                    //先从缓存里面拿token信息
                    totalCapacity = getTotalCapacity("gh_total_capacity_" + dtuSn, 200D);
                    qxDeviceInfoDetail.setCumulativeOutput(Double.valueOf(totalCapacity));
                    qxDeviceInfoDetail.setCapacity(200D);
                    break;
                default:
                    continue;
            }

            if (lampState == 0) {
                continue;
            }

            qxDeviceInfoDetail.setStartTime(new Date());
            qxDeviceInfoDetail.setDtuSn(dtuSn);
            qxDeviceInfoDetailMap.put(dtuSn, qxDeviceInfoDetail);
            qxAddDeviceInfoDetails.add(qxDeviceInfoDetail);
        }

        //将数据同步到IOT平台
        Map<String, String> qxParam = new HashMap<>(2);
        qxParam.put("username", iotUserName);
        qxParam.put("password", iotPassword);

        HttpPost qxHttpPost = new HttpPost(iotTokenUrl);
        String qxResult = sendPost(qxHttpPost, JSON.toJSONString(qxParam));
        if (StringUtils.isBlank(qxResult)) {
            return;
        }
        Map<String, Object> qxRes = JSON.parseObject(qxResult, new TypeReference<>() {
        });

        String qxAccessToken = (String) qxRes.get("token");
        if (StringUtils.isBlank(qxAccessToken)) {
            return;
        }

        BasicHeader qxAuthorization = new BasicHeader("X-Authorization", "Bearer " + qxAccessToken);
        if (!CollectionUtils.isEmpty(qxDeviceInfos)) {
            HttpPost qxDeviceInfoPost = new HttpPost(iotDeviceInfoUrl);
            qxDeviceInfoPost.addHeader(qxAuthorization);
            for (QxDeviceInfo qxDeviceInfo : qxDeviceInfos) {
                // todo
                String syncDeviceInfo = sendPost(qxDeviceInfoPost, JSON.toJSONString(qxDeviceInfo));
                //log.info("同步设备信息 syncDeviceInfo:{}", syncDeviceInfo);
            }
        }

        if (!CollectionUtils.isEmpty(qxAddDeviceInfoDetails)) {
            for (QxDeviceInfoDetail qxDeviceInfoDetail : qxAddDeviceInfoDetails) {
                String qxDeviceInfoDetailUrlStr = iotDeviceDetailUrl + qxDeviceInfoDetail.getDtuSn() + "/telemetry";
                HttpPost qxDeviceInfoDetailPost = new HttpPost(qxDeviceInfoDetailUrlStr);
                qxDeviceInfoDetailPost.addHeader(qxAuthorization);
                String syncDeviceInfoDetail = sendPost(qxDeviceInfoDetailPost, JSON.toJSONString(qxDeviceInfoDetail));
            }
        }
    }

    private JSONArray getDeviceInfo() {
        Connection connection = null;
        PreparedStatement statement = null;
        ResultSet resultSet = null;
        HikariDataSource dataSource = null;
        List<Object> resultList = new ArrayList<>();

        log.info("开始连接数据库,URL: {}", jdbcUrl);

        try {
            HikariConfig config = new HikariConfig();
            config.setJdbcUrl(jdbcUrl);
            config.setUsername(jdbcUserName);
            config.setPassword(jdbcPassword);
            config.setDriverClassName("org.postgresql.Driver");
            config.setMaximumPoolSize(5);
            config.setMinimumIdle(5);
            config.setConnectionTimeout(60000);
            config.setConnectionTestQuery("SELECT 1");

            dataSource = new HikariDataSource(config);
            log.info("Hikari连接池配置完成");

            connection = dataSource.getConnection();
            log.info("数据库连接成功");

            String selectSql = "SELECT\n" +
                    "de.name,\n" +
                    "dc.credentials_id,\n" +
                    "CASE\n" +
                    "    WHEN ak2.long_v IS NULL OR (ak.bool_v = FALSE AND ak2.long_v IS NOT NULL) THEN 'OFF'\n" +
                    "    WHEN tkl.long_v = 1 THEN 'ERROR'\n" +
                    "    WHEN tkl2.long_v = 1 THEN 'STAND'\n" +
                    "    WHEN tkl3.long_v = 1 THEN 'RUN'\n" +
                    "    ELSE 'OFF'\n" +
                    "  END AS status \n" +
                    "FROM device de \n" +
                    "LEFT JOIN ts_kv_latest tkl on de.id = tkl.entity_id AND tkl.key = '64'\n" +
                    "LEFT JOIN ts_kv_latest tkl2 on de.id = tkl2.entity_id AND tkl2.key = '535'\n" +
                    "LEFT JOIN ts_kv_latest tkl3 on de.id = tkl3.entity_id AND tkl3.key = '534'\n" +
                    "LEFT JOIN attribute_kv ak ON de.id = ak.entity_id AND ak.entity_type = 'DEVICE' AND ak.attribute_key = 'active'\n" +
                    "LEFT JOIN attribute_kv ak2 ON de.id = ak2.entity_id AND ak2.entity_type = 'DEVICE' AND ak2.attribute_key = 'lastActivityTime'\n" +
                    "LEFT JOIN device_credentials dc ON de.id = dc.device_id\n" +
                    "WHERE de.organization_id = '875a4841-c7f2-4e2c-88a2-ea62d4642132'\n" +
                    "AND de.device_profile_id = '1f2183a0-0562-11f1-9cb8-e3376d1e7978'\n";

            statement = connection.prepareStatement(selectSql);
            log.info("执行SQL查询: {}", selectSql);

            resultSet = statement.executeQuery();
            ResultSetMetaData metaData = resultSet.getMetaData();
            int columnCount = metaData.getColumnCount();
            log.info("查询结果集元数据获取成功,共{}列", columnCount);

            int rowCount = 0;
            while (resultSet.next()) {
                List<Object> result = new ArrayList<>(columnCount);
                for (int index = 1; index <= columnCount; index++) {
                    int columnType = metaData.getColumnType(index);
                    Object value = SqlTypedValueUtils.getTypedValue(resultSet, index, columnType);
                    result.add(value);
                }
                resultList.add(result);
                rowCount++;

                // 每处理1000行记录一次日志
                if (rowCount % 1000 == 0) {
                    log.info("已处理{}行数据", rowCount);
                }
            }

            log.info("数据查询完成,共获取{}行数据", rowCount);

        } catch (SQLException e) {
            log.error("数据库操作异常,URL: {}, 用户名: {}", jdbcUrl, jdbcUserName, e);
        } catch (Exception e) {
            log.error("初始化数据库连接或查询数据时发生异常", e);
        } finally {
            // 释放资源
            try {
                if (resultSet != null) resultSet.close();
                if (statement != null) statement.close();
                if (connection != null) connection.close();
                log.info("数据库连接资源已释放");
            } catch (SQLException e) {
                log.error("关闭数据库资源时发生异常", e);
            }

            if (dataSource != null) {
                try {
                    dataSource.close();
                    log.info("HikariDataSource连接池已关闭");
                } catch (Exception e) {
                    log.error("关闭HikariDataSource连接池时发生异常", e);
                }
            }
        }

        log.info("数据库操作完成,返回{}条记录", resultList.size());
        JSONArray jsonArray = new JSONArray();
        List<JSONObject> jsonObjectList = new ArrayList<>(resultList.size());
        for (Object result : resultList) {
            List<Object> dataList = (ArrayList) result;
            Map<String, Object> map = new HashMap<>(3);
            map.put("deviceName", dataList.get(0));
            map.put("dtuSn", dataList.get(1));
            String status = dataList.get(2).toString();
            if ("OFF".equals(status)) {
                map.put("lampState", 0);
            } else if ("ERROR".equals(status)) {
                map.put("lampState", 1);
            } else if ("STAND".equals(status)) {
                map.put("lampState", 2);
            } else if ("RUN".equals(status)) {
                map.put("lampState", 3);
            }

            JSONObject jsonObject = new JSONObject();
            jsonObject.putAll(map);
            jsonObjectList.add(jsonObject);
        }

        jsonArray.addAll(jsonObjectList);
        return jsonArray;

//        JSONArray jsonArray = new JSONArray();
//        List<JSONObject> jsonObjectList = new ArrayList<>(13);
//        for (int index = 1; index < 14; index++) {
//            Map<String, Object> map = new HashMap<>(3);
//            map.put("deviceName", "无纺布机组" + index);
//            map.put("dtuSn", "AGH20260123" + String.format("%03d", index));
//            map.put("lampState", 3);
//            JSONObject jsonObject = new JSONObject();
//            jsonObject.putAll(map);
//            jsonObjectList.add(jsonObject);
//        }
//
//        jsonArray.addAll(jsonObjectList);
//        return jsonArray;
    }


    private String getTotalCapacity(String key, Double production) {
        String totalCapacity = redisTemplate.opsForValue().get(key);
        Double totalCapacityD = 0D;
        if (StringUtils.isEmpty(totalCapacity)) {
            totalCapacityD = production;
        } else {
            totalCapacityD = Double.parseDouble(totalCapacity) + production;
        }

        redisTemplate.opsForValue().set(key, String.valueOf(totalCapacityD));
        return String.valueOf(totalCapacityD);
    }

    private String sendPost(HttpPost httpPost, String jsonData) {
        CloseableHttpClient httpClient = HttpClients.createDefault();
        StringEntity entity = new StringEntity(jsonData, ContentType.create("application/json", Consts.UTF_8));
        httpPost.setEntity(entity);
        String result = null;
        try {
            CloseableHttpResponse execute = httpClient.execute(httpPost);
            HttpEntity res = execute.getEntity();
            InputStream is = res.getContent();
            int len;
            byte[] buf = new byte[128];
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            while ((len = is.read(buf)) != -1) {
                byteArrayOutputStream.write(buf, 0, len);
            }
            result = byteArrayOutputStream.toString();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }
}