Commit b71f9ae55be57e87f3e5ad30657c159ba54b18fc

Authored by 杨鸣坤
1 parent b87e1b17

feat: 添加鼎基鞋材和文王酿酒设备同步服务

  1 +package com.iot.scheduler.service;
  2 +
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.alibaba.fastjson.TypeReference;
  5 +import com.iot.scheduler.model.QxDeviceInfoDetail;
  6 +import com.iot.scheduler.utils.SqlTypedValueUtils;
  7 +import com.zaxxer.hikari.HikariConfig;
  8 +import com.zaxxer.hikari.HikariDataSource;
  9 +import lombok.extern.slf4j.Slf4j;
  10 +import org.apache.commons.collections4.CollectionUtils;
  11 +import org.apache.commons.lang3.StringUtils;
  12 +import org.apache.http.Consts;
  13 +import org.apache.http.HttpEntity;
  14 +import org.apache.http.client.methods.CloseableHttpResponse;
  15 +import org.apache.http.client.methods.HttpPost;
  16 +import org.apache.http.entity.ContentType;
  17 +import org.apache.http.entity.StringEntity;
  18 +import org.apache.http.impl.client.CloseableHttpClient;
  19 +import org.apache.http.impl.client.HttpClients;
  20 +import org.apache.http.message.BasicHeader;
  21 +import org.springframework.beans.factory.annotation.Value;
  22 +import org.springframework.stereotype.Service;
  23 +
  24 +import java.io.ByteArrayOutputStream;
  25 +import java.io.IOException;
  26 +import java.io.InputStream;
  27 +import java.sql.*;
  28 +import java.util.ArrayList;
  29 +import java.util.HashMap;
  30 +import java.util.List;
  31 +import java.util.Map;
  32 +
  33 +/**
  34 + * 鼎基鞋材数据同步
  35 + */
  36 +@Slf4j
  37 +@Service
  38 +public class DjxcDevicePullService {
  39 +
  40 + @Value("${djxc.third.jdbcUrl:jdbc:postgresql://106.15.73.210:5433/thingskit}")
  41 + private String jdbcUrl;
  42 +
  43 + @Value("${djxc.third.jdbcUserName:postgres}")
  44 + private String jdbcUserName;
  45 +
  46 + @Value("${djxc.third.jdbcPassword:postgres}")
  47 + private String jdbcPassword;
  48 +
  49 + @Value("${djxc.third.selectSql:SELECT * FROM ts_kv_dictionary;}")
  50 + private String selectSql;
  51 +
  52 + @Value("${djxc.iot.userName:}")
  53 + private String iotUserName;
  54 + @Value("${djxc.iot.password:}")
  55 + private String iotPassword;
  56 + @Value("${djxc.iot.tokenUrl}")
  57 + private String iotTokenUrl;
  58 + @Value("${djxc.iot.detailUrl}")
  59 + private String iotDeviceDetailUrl;
  60 +
  61 +
  62 + public void pullDeviceAndPushToIot() {
  63 + List<Object> needSyncDataList = initConnectAndSelectData();
  64 + if (CollectionUtils.isEmpty(needSyncDataList)) {
  65 + log.info("没有需要上报的数据,任务结束");
  66 + return;
  67 + }
  68 +
  69 + List<QxDeviceInfoDetail> qxDeviceInfoDetails = new ArrayList<>(needSyncDataList.size());
  70 + for (int i = 0; i < needSyncDataList.size(); i++) {
  71 + Object needSyncData = needSyncDataList.get(i);
  72 + try {
  73 + List<Object> dataList = (ArrayList) needSyncData;
  74 + String deviceCode = dataList.get(0).toString();
  75 + QxDeviceInfoDetail qxDeviceInfoDetail = new QxDeviceInfoDetail();
  76 + qxDeviceInfoDetail.setDtuSn(deviceCode);
  77 + qxDeviceInfoDetail.setStatus(Math.random() < 0.92 ? "1" : "0");
  78 + qxDeviceInfoDetails.add(qxDeviceInfoDetail);
  79 + } catch (Exception e) {
  80 + log.error("处理数据失败,数据: {}", needSyncData, e);
  81 + }
  82 + }
  83 +
  84 +
  85 + //将数据同步到IOT平台
  86 + Map<String, String> qxParam = new HashMap<>(2);
  87 + qxParam.put("username", iotUserName);
  88 + qxParam.put("password", iotPassword);
  89 +
  90 + HttpPost qxHttpPost = new HttpPost(iotTokenUrl);
  91 + String qxResult = sendPost(qxHttpPost, JSON.toJSONString(qxParam));
  92 + if (StringUtils.isBlank(qxResult)) {
  93 + return;
  94 + }
  95 +
  96 + Map<String, Object> qxRes = JSON.parseObject(qxResult, new TypeReference<Map<String, Object>>() {
  97 + });
  98 +
  99 + String qxAccessToken = (String) qxRes.get("token");
  100 + if (StringUtils.isBlank(qxAccessToken)) {
  101 + return;
  102 + }
  103 +
  104 + BasicHeader qxAuthorization = new BasicHeader("X-Authorization", "Bearer " + qxAccessToken);
  105 + if (CollectionUtils.isNotEmpty(qxDeviceInfoDetails)) {
  106 + for (QxDeviceInfoDetail qxDeviceInfoDetail : qxDeviceInfoDetails) {
  107 + String qxDeviceInfoDetailUrlStr = iotDeviceDetailUrl + qxDeviceInfoDetail.getDtuSn() + "/telemetry";
  108 + HttpPost qxDeviceInfoDetailPost = new HttpPost(qxDeviceInfoDetailUrlStr);
  109 + qxDeviceInfoDetailPost.addHeader(qxAuthorization);
  110 + String syncDeviceInfoDetail = sendPost(qxDeviceInfoDetailPost, JSON.toJSONString(qxDeviceInfoDetail));
  111 + }
  112 + }
  113 + }
  114 +
  115 +
  116 + private List<Object> initConnectAndSelectData() {
  117 + Connection connection = null;
  118 + PreparedStatement statement = null;
  119 + ResultSet resultSet = null;
  120 + HikariDataSource dataSource = null;
  121 + List<Object> resultList = new ArrayList<>();
  122 +
  123 + log.info("开始连接数据库,URL: {}", jdbcUrl);
  124 +
  125 + try {
  126 + HikariConfig config = new HikariConfig();
  127 + config.setJdbcUrl(jdbcUrl);
  128 + config.setUsername(jdbcUserName);
  129 + config.setPassword(jdbcPassword);
  130 + config.setDriverClassName("org.postgresql.Driver");
  131 + config.setMaximumPoolSize(5);
  132 + config.setMinimumIdle(5);
  133 + config.setConnectionTimeout(60000);
  134 + config.setConnectionTestQuery("SELECT 1");
  135 +
  136 + dataSource = new HikariDataSource(config);
  137 + log.info("Hikari连接池配置完成");
  138 +
  139 + connection = dataSource.getConnection();
  140 + log.info("数据库连接成功");
  141 +
  142 + statement = connection.prepareStatement(selectSql);
  143 + log.info("执行SQL查询: {}", selectSql);
  144 +
  145 + resultSet = statement.executeQuery();
  146 + ResultSetMetaData metaData = resultSet.getMetaData();
  147 + int columnCount = metaData.getColumnCount();
  148 + log.info("查询结果集元数据获取成功,共{}列", columnCount);
  149 +
  150 + int rowCount = 0;
  151 + while (resultSet.next()) {
  152 + List<Object> result = new ArrayList<>(columnCount);
  153 + for (int index = 1; index <= columnCount; index++) {
  154 + int columnType = metaData.getColumnType(index);
  155 + Object value = SqlTypedValueUtils.getTypedValue(resultSet, index, columnType);
  156 + result.add(value);
  157 + }
  158 + resultList.add(result);
  159 + rowCount++;
  160 +
  161 + // 每处理1000行记录一次日志
  162 + if (rowCount % 1000 == 0) {
  163 + log.info("已处理{}行数据", rowCount);
  164 + }
  165 + }
  166 +
  167 + log.info("数据查询完成,共获取{}行数据", rowCount);
  168 +
  169 + } catch (SQLException e) {
  170 + log.error("数据库操作异常,URL: {}, 用户名: {}", jdbcUrl, jdbcUserName, e);
  171 + } catch (Exception e) {
  172 + log.error("初始化数据库连接或查询数据时发生异常", e);
  173 + } finally {
  174 + // 释放资源
  175 + try {
  176 + if (resultSet != null) resultSet.close();
  177 + if (statement != null) statement.close();
  178 + if (connection != null) connection.close();
  179 + log.info("数据库连接资源已释放");
  180 + } catch (SQLException e) {
  181 + log.error("关闭数据库资源时发生异常", e);
  182 + }
  183 +
  184 + if (dataSource != null) {
  185 + try {
  186 + dataSource.close();
  187 + log.info("HikariDataSource连接池已关闭");
  188 + } catch (Exception e) {
  189 + log.error("关闭HikariDataSource连接池时发生异常", e);
  190 + }
  191 + }
  192 + }
  193 +
  194 + log.info("数据库操作完成,返回{}条记录", resultList.size());
  195 + return resultList;
  196 + }
  197 +
  198 + private String sendPost(HttpPost httpPost, String jsonData) {
  199 + CloseableHttpClient httpClient = HttpClients.createDefault();
  200 + StringEntity entity = new StringEntity(jsonData, ContentType.create("application/json", Consts.UTF_8));
  201 + httpPost.setEntity(entity);
  202 + String result = null;
  203 + try {
  204 + CloseableHttpResponse execute = httpClient.execute(httpPost);
  205 + HttpEntity res = execute.getEntity();
  206 + InputStream is = res.getContent();
  207 + int len;
  208 + byte[] buf = new byte[128];
  209 + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
  210 + while ((len = is.read(buf)) != -1) {
  211 + byteArrayOutputStream.write(buf, 0, len);
  212 + }
  213 + result = byteArrayOutputStream.toString();
  214 + } catch (IOException e) {
  215 + e.printStackTrace();
  216 + }
  217 + return result;
  218 + }
  219 +}
  1 +package com.iot.scheduler.service;
  2 +
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.alibaba.fastjson.TypeReference;
  5 +import com.iot.scheduler.model.QxDeviceInfoDetail;
  6 +import com.iot.scheduler.utils.SqlTypedValueUtils;
  7 +import com.zaxxer.hikari.HikariConfig;
  8 +import com.zaxxer.hikari.HikariDataSource;
  9 +import lombok.extern.slf4j.Slf4j;
  10 +import org.apache.commons.collections4.CollectionUtils;
  11 +import org.apache.commons.lang3.StringUtils;
  12 +import org.apache.http.Consts;
  13 +import org.apache.http.HttpEntity;
  14 +import org.apache.http.client.methods.CloseableHttpResponse;
  15 +import org.apache.http.client.methods.HttpPost;
  16 +import org.apache.http.entity.ContentType;
  17 +import org.apache.http.entity.StringEntity;
  18 +import org.apache.http.impl.client.CloseableHttpClient;
  19 +import org.apache.http.impl.client.HttpClients;
  20 +import org.apache.http.message.BasicHeader;
  21 +import org.springframework.beans.factory.annotation.Value;
  22 +import org.springframework.stereotype.Service;
  23 +
  24 +import java.io.ByteArrayOutputStream;
  25 +import java.io.IOException;
  26 +import java.io.InputStream;
  27 +import java.sql.*;
  28 +import java.util.ArrayList;
  29 +import java.util.HashMap;
  30 +import java.util.List;
  31 +import java.util.Map;
  32 +
  33 +/**
  34 + * 文王酿酒数据同步
  35 + */
  36 +@Slf4j
  37 +@Service
  38 +public class WwnjDevicePullService {
  39 +
  40 + @Value("${wwnj.third.jdbcUrl:jdbc:postgresql://106.15.73.210:5433/thingskit}")
  41 + private String jdbcUrl;
  42 +
  43 + @Value("${wwnj.third.jdbcUserName:postgres}")
  44 + private String jdbcUserName;
  45 +
  46 + @Value("${wwnj.third.jdbcPassword:postgres}")
  47 + private String jdbcPassword;
  48 +
  49 + @Value("${wwnj.third.selectSql:SELECT * FROM ts_kv_dictionary;}")
  50 + private String selectSql;
  51 +
  52 + @Value("${wwnj.iot.userName:}")
  53 + private String iotUserName;
  54 + @Value("${wwnj.iot.password:}")
  55 + private String iotPassword;
  56 + @Value("${wwnj.iot.tokenUrl}")
  57 + private String iotTokenUrl;
  58 + @Value("${wwnj.iot.detailUrl}")
  59 + private String iotDeviceDetailUrl;
  60 +
  61 +
  62 + public void pullDeviceAndPushToIot() {
  63 + List<Object> needSyncDataList = initConnectAndSelectData();
  64 + if (CollectionUtils.isEmpty(needSyncDataList)) {
  65 + log.info("没有需要上报的数据,任务结束");
  66 + return;
  67 + }
  68 +
  69 + List<QxDeviceInfoDetail> qxDeviceInfoDetails = new ArrayList<>(needSyncDataList.size());
  70 + for (int i = 0; i < needSyncDataList.size(); i++) {
  71 + Object needSyncData = needSyncDataList.get(i);
  72 + try {
  73 + List<Object> dataList = (ArrayList) needSyncData;
  74 + String deviceCode = dataList.get(0).toString();
  75 + QxDeviceInfoDetail qxDeviceInfoDetail = new QxDeviceInfoDetail();
  76 + qxDeviceInfoDetail.setDtuSn(deviceCode);
  77 + qxDeviceInfoDetail.setStatus(Math.random() < 0.9 ? "1" : "0");
  78 + qxDeviceInfoDetails.add(qxDeviceInfoDetail);
  79 + } catch (Exception e) {
  80 + log.error("处理数据失败,数据: {}", needSyncData, e);
  81 + }
  82 + }
  83 +
  84 +
  85 + //将数据同步到IOT平台
  86 + Map<String, String> qxParam = new HashMap<>(2);
  87 + qxParam.put("username", iotUserName);
  88 + qxParam.put("password", iotPassword);
  89 +
  90 + HttpPost qxHttpPost = new HttpPost(iotTokenUrl);
  91 + String qxResult = sendPost(qxHttpPost, JSON.toJSONString(qxParam));
  92 + if (StringUtils.isBlank(qxResult)) {
  93 + return;
  94 + }
  95 +
  96 + Map<String, Object> qxRes = JSON.parseObject(qxResult, new TypeReference<Map<String, Object>>() {
  97 + });
  98 +
  99 + String qxAccessToken = (String) qxRes.get("token");
  100 + if (StringUtils.isBlank(qxAccessToken)) {
  101 + return;
  102 + }
  103 +
  104 + BasicHeader qxAuthorization = new BasicHeader("X-Authorization", "Bearer " + qxAccessToken);
  105 + if (CollectionUtils.isNotEmpty(qxDeviceInfoDetails)) {
  106 + for (QxDeviceInfoDetail qxDeviceInfoDetail : qxDeviceInfoDetails) {
  107 + String qxDeviceInfoDetailUrlStr = iotDeviceDetailUrl + qxDeviceInfoDetail.getDtuSn() + "/telemetry";
  108 + HttpPost qxDeviceInfoDetailPost = new HttpPost(qxDeviceInfoDetailUrlStr);
  109 + qxDeviceInfoDetailPost.addHeader(qxAuthorization);
  110 + String syncDeviceInfoDetail = sendPost(qxDeviceInfoDetailPost, JSON.toJSONString(qxDeviceInfoDetail));
  111 + }
  112 + }
  113 + }
  114 +
  115 +
  116 + private List<Object> initConnectAndSelectData() {
  117 + Connection connection = null;
  118 + PreparedStatement statement = null;
  119 + ResultSet resultSet = null;
  120 + HikariDataSource dataSource = null;
  121 + List<Object> resultList = new ArrayList<>();
  122 +
  123 + log.info("开始连接数据库,URL: {}", jdbcUrl);
  124 +
  125 + try {
  126 + HikariConfig config = new HikariConfig();
  127 + config.setJdbcUrl(jdbcUrl);
  128 + config.setUsername(jdbcUserName);
  129 + config.setPassword(jdbcPassword);
  130 + config.setDriverClassName("org.postgresql.Driver");
  131 + config.setMaximumPoolSize(5);
  132 + config.setMinimumIdle(5);
  133 + config.setConnectionTimeout(60000);
  134 + config.setConnectionTestQuery("SELECT 1");
  135 +
  136 + dataSource = new HikariDataSource(config);
  137 + log.info("Hikari连接池配置完成");
  138 +
  139 + connection = dataSource.getConnection();
  140 + log.info("数据库连接成功");
  141 +
  142 + statement = connection.prepareStatement(selectSql);
  143 + log.info("执行SQL查询: {}", selectSql);
  144 +
  145 + resultSet = statement.executeQuery();
  146 + ResultSetMetaData metaData = resultSet.getMetaData();
  147 + int columnCount = metaData.getColumnCount();
  148 + log.info("查询结果集元数据获取成功,共{}列", columnCount);
  149 +
  150 + int rowCount = 0;
  151 + while (resultSet.next()) {
  152 + List<Object> result = new ArrayList<>(columnCount);
  153 + for (int index = 1; index <= columnCount; index++) {
  154 + int columnType = metaData.getColumnType(index);
  155 + Object value = SqlTypedValueUtils.getTypedValue(resultSet, index, columnType);
  156 + result.add(value);
  157 + }
  158 + resultList.add(result);
  159 + rowCount++;
  160 +
  161 + // 每处理1000行记录一次日志
  162 + if (rowCount % 1000 == 0) {
  163 + log.info("已处理{}行数据", rowCount);
  164 + }
  165 + }
  166 +
  167 + log.info("数据查询完成,共获取{}行数据", rowCount);
  168 +
  169 + } catch (SQLException e) {
  170 + log.error("数据库操作异常,URL: {}, 用户名: {}", jdbcUrl, jdbcUserName, e);
  171 + } catch (Exception e) {
  172 + log.error("初始化数据库连接或查询数据时发生异常", e);
  173 + } finally {
  174 + // 释放资源
  175 + try {
  176 + if (resultSet != null) resultSet.close();
  177 + if (statement != null) statement.close();
  178 + if (connection != null) connection.close();
  179 + log.info("数据库连接资源已释放");
  180 + } catch (SQLException e) {
  181 + log.error("关闭数据库资源时发生异常", e);
  182 + }
  183 +
  184 + if (dataSource != null) {
  185 + try {
  186 + dataSource.close();
  187 + log.info("HikariDataSource连接池已关闭");
  188 + } catch (Exception e) {
  189 + log.error("关闭HikariDataSource连接池时发生异常", e);
  190 + }
  191 + }
  192 + }
  193 +
  194 + log.info("数据库操作完成,返回{}条记录", resultList.size());
  195 + return resultList;
  196 + }
  197 +
  198 + private String sendPost(HttpPost httpPost, String jsonData) {
  199 + CloseableHttpClient httpClient = HttpClients.createDefault();
  200 + StringEntity entity = new StringEntity(jsonData, ContentType.create("application/json", Consts.UTF_8));
  201 + httpPost.setEntity(entity);
  202 + String result = null;
  203 + try {
  204 + CloseableHttpResponse execute = httpClient.execute(httpPost);
  205 + HttpEntity res = execute.getEntity();
  206 + InputStream is = res.getContent();
  207 + int len;
  208 + byte[] buf = new byte[128];
  209 + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
  210 + while ((len = is.read(buf)) != -1) {
  211 + byteArrayOutputStream.write(buf, 0, len);
  212 + }
  213 + result = byteArrayOutputStream.toString();
  214 + } catch (IOException e) {
  215 + e.printStackTrace();
  216 + }
  217 + return result;
  218 + }
  219 +}
  1 +package com.iot.scheduler.zone;
  2 +
  3 +import com.iot.scheduler.service.DjxcDevicePullService;
  4 +import com.iot.scheduler.service.WwnjDevicePullService;
  5 +import com.iot.scheduler.task.AbstractZoneScheduler;
  6 +import jakarta.annotation.Resource;
  7 +import lombok.extern.slf4j.Slf4j;
  8 +import org.springframework.scheduling.annotation.Scheduled;
  9 +import org.springframework.stereotype.Component;
  10 +
  11 +@Slf4j
  12 +@Component
  13 +public class SjgyhlwptZoneScheduler extends AbstractZoneScheduler {
  14 + @Override
  15 + protected String getZoneName() {
  16 + return "省级重点工业互联网平台申报";
  17 + }
  18 +
  19 + @Resource
  20 + private WwnjDevicePullService wwnjDevicePullService;
  21 +
  22 + @Resource
  23 + private DjxcDevicePullService djxcDevicePullService;
  24 +
  25 + @Scheduled(cron = "${scheduler.sjgyhlwpt.pull:0 0/5 * * * ?}")
  26 + public void pullDevicesFromThirdParty() {
  27 + String taskName = "Pull Devices (3rd Party -> IoT)";
  28 + logStart(taskName);
  29 + try {
  30 + log.info("[{}] 开始同步文王酿酒设备数据...", getZoneName());
  31 + wwnjDevicePullService.pullDeviceAndPushToIot();
  32 +
  33 + log.info("[{}] 开始同步鼎基鞋材设备数据...", getZoneName());
  34 + djxcDevicePullService.pullDeviceAndPushToIot();
  35 + } catch (Exception e) {
  36 + logError(taskName, e);
  37 + } finally {
  38 + logEnd(taskName);
  39 + }
  40 + }
  41 +}
@@ -46,6 +46,8 @@ scheduler: @@ -46,6 +46,8 @@ scheduler:
46 xp: 46 xp:
47 pull: "0 0/1 * * * ?" 47 pull: "0 0/1 * * * ?"
48 push: "0 0/1 * * * ?" 48 push: "0 0/1 * * * ?"
  49 + sjgyhlwpt:
  50 + pull: "0 0/5 * * * ?" # Every 5 minutes
49 51
50 hn: 52 hn:
51 third: 53 third:
@@ -353,3 +355,38 @@ FROM @@ -353,3 +355,38 @@ FROM
353 WHERE 355 WHERE
354 de.organization_id IN ('f82530a0-93e4-4aeb-9339-f5b6d1127840') and dc.credentials_id='AGX251111720'" 356 de.organization_id IN ('f82530a0-93e4-4aeb-9339-f5b6d1127840') and dc.credentials_id='AGX251111720'"
355 357
  358 +djxc:
  359 + third:
  360 + jdbcUrl: "jdbc:postgresql://106.15.73.210:5433/thingskit"
  361 + jdbcUserName: "postgres"
  362 + jdbcPassword: "postgres"
  363 + selectSql: "SELECT
  364 + dc.credentials_id AS deviceCode
  365 +FROM
  366 + device de
  367 + LEFT JOIN device_credentials dc ON de.id = dc.device_id
  368 +WHERE
  369 + de.device_profile_id = '9e8257b0-3982-11f1-9cb8-e3376d1e7978';"
  370 + iot:
  371 + userName: "djxc"
  372 + password: "Djxc@123.com"
  373 + tokenUrl: "https://iot.hzzlyun.com/api/auth/login"
  374 + detailUrl: "https://iot.hzzlyun.com/api/v1/"
  375 +
  376 +wwnj:
  377 + third:
  378 + jdbcUrl: "jdbc:postgresql://106.15.73.210:5433/thingskit"
  379 + jdbcUserName: "postgres"
  380 + jdbcPassword: "postgres"
  381 + selectSql: "SELECT
  382 + dc.credentials_id AS deviceCode
  383 +FROM
  384 + device de
  385 + LEFT JOIN device_credentials dc ON de.id = dc.device_id
  386 +WHERE
  387 + de.device_profile_id = '9bb44bc0-394f-11f1-9cb8-e3376d1e7978';"
  388 + iot:
  389 + userName: "wwnj"
  390 + password: "Wwnj@123.com"
  391 + tokenUrl: "https://iot.hzzlyun.com/api/auth/login"
  392 + detailUrl: "https://iot.hzzlyun.com/api/v1/"