Commit 1d30326a68a23e09ad89cb77bbeaf7b3204ea96d

Authored by 胡翰林
1 parent c1dc2b6a

兴鹏设备属性上报

  1 +package com.iot.scheduler.service;
  2 +
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.iot.scheduler.model.ReportDevice;
  5 +import com.zaxxer.hikari.HikariConfig;
  6 +import com.zaxxer.hikari.HikariDataSource;
  7 +import lombok.extern.slf4j.Slf4j;
  8 +import org.apache.commons.collections4.CollectionUtils;
  9 +import org.apache.http.Consts;
  10 +import org.apache.http.HttpEntity;
  11 +import org.apache.http.client.methods.CloseableHttpResponse;
  12 +import org.apache.http.client.methods.HttpPost;
  13 +import org.apache.http.entity.ContentType;
  14 +import org.apache.http.entity.StringEntity;
  15 +import org.apache.http.impl.client.CloseableHttpClient;
  16 +import org.apache.http.impl.client.HttpClients;
  17 +import org.springframework.beans.factory.annotation.Value;
  18 +import org.springframework.stereotype.Service;
  19 +
  20 +import java.io.ByteArrayOutputStream;
  21 +import java.io.IOException;
  22 +import java.io.InputStream;
  23 +import java.sql.*;
  24 +import java.util.ArrayList;
  25 +import java.util.HashMap;
  26 +import java.util.List;
  27 +import java.util.Map;
  28 +
  29 +/**
  30 + * 兴鹏设备属性上报
  31 + */
  32 +@Slf4j
  33 +@Service
  34 +public class XpDeviceReportService {
  35 +
  36 + @Value("${xp.third.domain:http://220.180.204.38:1803/report-property}")
  37 + private String reportUrl;
  38 +
  39 + @Value("${xp.third.jdbcUrl:jdbc:postgresql://106.15.73.210:5433/thingskit}")
  40 + private String jdbcUrl;
  41 +
  42 + @Value("${xp.third.jdbcUserName:postgres}")
  43 + private String jdbcUserName;
  44 +
  45 + @Value("${xp.third.jdbcPassword:postgres}")
  46 + private String jdbcPassword;
  47 +
  48 + @Value("${xp.third.selectSql:SELECT * FROM ts_kv_dictionary;}")
  49 + private String selectSql;
  50 +
  51 + public void deviceReport() {
  52 + log.info("开始执行兴鹏设备属性上报任务");
  53 + try {
  54 + List<Object> needSyncDataList = initConnectAndSelectData();
  55 + if (CollectionUtils.isEmpty(needSyncDataList)) {
  56 + log.info("没有需要上报的数据,任务结束");
  57 + return;
  58 + }
  59 +
  60 + log.info("开始处理兴鹏{}条设备数据并进行上报", needSyncDataList.size());
  61 + int successCount = 0;
  62 + int failCount = 0;
  63 +
  64 + for (int i = 0; i < needSyncDataList.size(); i++) {
  65 + Object needSyncData = needSyncDataList.get(i);
  66 + try {
  67 + List<Object> dataList = (ArrayList) needSyncData;
  68 + String deviceId = dataList.get(0).toString();
  69 +
  70 + ReportDevice reportDevice = new ReportDevice();
  71 + reportDevice.setDeviceId(deviceId);
  72 + Map<String, Object> propertiesMap = new HashMap<>(4);
  73 +
  74 + String status = dataList.get(1).toString();
  75 + if ("RUN".equals(status)) {
  76 + propertiesMap.put("State_Run", 1);
  77 + propertiesMap.put("State_PowerOn", 1);
  78 + propertiesMap.put("State_Alarm", 0);
  79 + } else if ("STAND".equals(status)) {
  80 + propertiesMap.put("State_Run", 0);
  81 + propertiesMap.put("State_PowerOn", 1);
  82 + propertiesMap.put("State_Alarm", 0);
  83 + } else if ("ERROR".equals(status)) {
  84 + propertiesMap.put("State_Run", 0);
  85 + propertiesMap.put("State_PowerOn", 1);
  86 + propertiesMap.put("State_Alarm", 1);
  87 + } else {
  88 + propertiesMap.put("State_Run", 0);
  89 + propertiesMap.put("State_PowerOn", 0);
  90 + propertiesMap.put("State_Alarm", 0);
  91 + }
  92 +
  93 + Object outputValue = dataList.get(2);
  94 + long outputLong = 0L;
  95 +
  96 + if (outputValue != null) {
  97 + if (outputValue instanceof Double) {
  98 + double doubleValue = (Double) outputValue;
  99 + outputLong = (long) doubleValue; // 直接转型,会丢失小数部分
  100 + } else if (outputValue instanceof Number) {
  101 + outputLong = ((Number) outputValue).longValue();
  102 + } else {
  103 + // 处理字符串或其他类型
  104 + outputLong = Long.parseLong(outputValue.toString());
  105 + }
  106 + } else {
  107 + log.info("设备[{}]输出值为null,使用默认值0", deviceId);
  108 + }
  109 +
  110 + propertiesMap.put("ProInfo_Output", outputLong);
  111 + reportDevice.setProperties(propertiesMap);
  112 +
  113 + boolean sendResult = sendReportDevice(reportDevice);
  114 + if (sendResult) {
  115 + successCount++;
  116 + } else {
  117 + failCount++;
  118 + }
  119 +
  120 + } catch (Exception e) {
  121 + failCount++;
  122 + log.error("处理第{}条数据时发生异常,数据内容: {}", i + 1, needSyncData, e);
  123 + }
  124 + }
  125 +
  126 + log.info("兴鹏设备属性上报任务完成,成功: {}条,失败: {}条,总计: {}条",
  127 + successCount, failCount, needSyncDataList.size());
  128 +
  129 + } catch (Exception e) {
  130 + log.error("兴鹏执行设备属性上报任务时发生异常", e);
  131 + } finally {
  132 + log.info("兴鹏设备属性上报任务执行结束");
  133 + }
  134 + }
  135 +
  136 + private boolean sendReportDevice(ReportDevice reportDevice) {
  137 + String deviceId = reportDevice.getDeviceId();
  138 + HttpPost httpPost = new HttpPost(reportUrl);
  139 + CloseableHttpClient httpClient = HttpClients.createDefault();
  140 + String requestBody = JSON.toJSONString(reportDevice);
  141 +
  142 + log.info("设备[{}]上报请求体: {}", deviceId, requestBody);
  143 +
  144 + StringEntity entity = new StringEntity(requestBody, ContentType.create("application/json", Consts.UTF_8));
  145 + httpPost.setEntity(entity);
  146 + httpPost.setHeader("Content-Type", "application/json;charset=UTF-8");
  147 +
  148 + try (CloseableHttpResponse execute = httpClient.execute(httpPost)) {
  149 + int statusCode = execute.getStatusLine().getStatusCode();
  150 +
  151 + HttpEntity res = execute.getEntity();
  152 + String responseBody = "";
  153 + if (res != null) {
  154 + try (InputStream is = res.getContent();
  155 + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
  156 + byte[] buf = new byte[128];
  157 + int len;
  158 + while ((len = is.read(buf)) != -1) {
  159 + byteArrayOutputStream.write(buf, 0, len);
  160 + }
  161 + responseBody = byteArrayOutputStream.toString("UTF-8");
  162 + }
  163 + }
  164 +
  165 + log.info("设备[{}]上报响应,状态码: {},响应内容: {}", deviceId, statusCode, responseBody);
  166 +
  167 + if (statusCode >= 200 && statusCode < 300) {
  168 + return true;
  169 + } else {
  170 + log.error("设备[{}]上报失败,HTTP状态码: {},响应: {}", deviceId, statusCode, responseBody);
  171 + return false;
  172 + }
  173 +
  174 + } catch (IOException e) {
  175 + log.error("设备[{}]上报请求发生IO异常", deviceId, e);
  176 + return false;
  177 + } finally {
  178 + try {
  179 + httpClient.close();
  180 + } catch (IOException e) {
  181 + log.error("关闭HTTP客户端时发生异常", e);
  182 + }
  183 + }
  184 + }
  185 +
  186 + private List<Object> initConnectAndSelectData() {
  187 + Connection connection = null;
  188 + PreparedStatement statement = null;
  189 + ResultSet resultSet = null;
  190 + HikariDataSource dataSource = null;
  191 + List<Object> resultList = new ArrayList<>();
  192 +
  193 + log.info("开始连接数据库,URL: {}", jdbcUrl);
  194 +
  195 + try {
  196 + HikariConfig config = new HikariConfig();
  197 + config.setJdbcUrl(jdbcUrl);
  198 + config.setUsername(jdbcUserName);
  199 + config.setPassword(jdbcPassword);
  200 + config.setDriverClassName("org.postgresql.Driver");
  201 + config.setMaximumPoolSize(5);
  202 + config.setMinimumIdle(5);
  203 + config.setConnectionTimeout(60000);
  204 + config.setConnectionTestQuery("SELECT 1");
  205 +
  206 + dataSource = new HikariDataSource(config);
  207 + log.info("Hikari连接池配置完成");
  208 +
  209 + connection = dataSource.getConnection();
  210 + log.info("数据库连接成功");
  211 +
  212 + statement = connection.prepareStatement(selectSql);
  213 + log.info("执行SQL查询: {}", selectSql);
  214 +
  215 + resultSet = statement.executeQuery();
  216 + ResultSetMetaData metaData = resultSet.getMetaData();
  217 + int columnCount = metaData.getColumnCount();
  218 + log.info("查询结果集元数据获取成功,共{}列", columnCount);
  219 +
  220 + int rowCount = 0;
  221 + while (resultSet.next()) {
  222 + List<Object> result = new ArrayList<>(columnCount);
  223 + for (int index = 1; index <= columnCount; index++) {
  224 + int columnType = metaData.getColumnType(index);
  225 + Object value = getTypedValue(resultSet, index, columnType);
  226 + result.add(value);
  227 + }
  228 + resultList.add(result);
  229 + rowCount++;
  230 +
  231 + // 每处理1000行记录一次日志
  232 + if (rowCount % 1000 == 0) {
  233 + log.info("已处理{}行数据", rowCount);
  234 + }
  235 + }
  236 +
  237 + log.info("数据查询完成,共获取{}行数据", rowCount);
  238 +
  239 + } catch (SQLException e) {
  240 + log.error("数据库操作异常,URL: {}, 用户名: {}", jdbcUrl, jdbcUserName, e);
  241 + } catch (Exception e) {
  242 + log.error("初始化数据库连接或查询数据时发生异常", e);
  243 + } finally {
  244 + // 释放资源
  245 + try {
  246 + if (resultSet != null) resultSet.close();
  247 + if (statement != null) statement.close();
  248 + if (connection != null) connection.close();
  249 + log.info("数据库连接资源已释放");
  250 + } catch (SQLException e) {
  251 + log.error("关闭数据库资源时发生异常", e);
  252 + }
  253 +
  254 + if (dataSource != null) {
  255 + try {
  256 + dataSource.close();
  257 + log.info("HikariDataSource连接池已关闭");
  258 + } catch (Exception e) {
  259 + log.error("关闭HikariDataSource连接池时发生异常", e);
  260 + }
  261 + }
  262 + }
  263 +
  264 + log.info("数据库操作完成,返回{}条记录", resultList.size());
  265 + return resultList;
  266 + }
  267 +
  268 + private Object getTypedValue(ResultSet rs, int index, int sqlType) throws SQLException {
  269 + Object value;
  270 +
  271 + try {
  272 + switch (sqlType) {
  273 + case Types.BIT:
  274 + case Types.BOOLEAN:
  275 + value = rs.getBoolean(index);
  276 + return rs.wasNull() ? null : value;
  277 +
  278 + case Types.TINYINT:
  279 + case Types.SMALLINT:
  280 + case Types.INTEGER:
  281 + value = rs.getInt(index);
  282 + return rs.wasNull() ? null : value;
  283 +
  284 + case Types.BIGINT:
  285 + value = rs.getLong(index);
  286 + return rs.wasNull() ? null : value;
  287 +
  288 + case Types.FLOAT:
  289 + case Types.REAL:
  290 + value = rs.getFloat(index);
  291 + return rs.wasNull() ? null : value;
  292 +
  293 + case Types.DOUBLE:
  294 + value = rs.getDouble(index);
  295 + return rs.wasNull() ? null : value;
  296 +
  297 + case Types.NUMERIC:
  298 + case Types.DECIMAL:
  299 + value = rs.getBigDecimal(index);
  300 + return rs.wasNull() ? null : value;
  301 +
  302 + case Types.CHAR:
  303 + case Types.VARCHAR:
  304 + case Types.LONGVARCHAR:
  305 + case Types.NCHAR:
  306 + case Types.NVARCHAR:
  307 + case Types.LONGNVARCHAR:
  308 + value = rs.getString(index);
  309 + return rs.wasNull() ? null : value;
  310 +
  311 + case Types.DATE:
  312 + Date date = rs.getDate(index);
  313 + return date != null ? date.toLocalDate() : null;
  314 +
  315 + case Types.TIME:
  316 + Time time = rs.getTime(index);
  317 + return time != null ? time.toLocalTime() : null;
  318 +
  319 + case Types.TIMESTAMP:
  320 + Timestamp timestamp = rs.getTimestamp(index);
  321 + return timestamp != null ? timestamp.toLocalDateTime() : null;
  322 +
  323 + case Types.BINARY:
  324 + case Types.VARBINARY:
  325 + case Types.LONGVARBINARY:
  326 + value = rs.getBytes(index);
  327 + return rs.wasNull() ? null : value;
  328 +
  329 + case Types.BLOB:
  330 + value = rs.getBlob(index);
  331 + return rs.wasNull() ? null : value;
  332 +
  333 + case Types.CLOB:
  334 + value = rs.getClob(index);
  335 + return rs.wasNull() ? null : value;
  336 +
  337 + default:
  338 + value = rs.getObject(index);
  339 + return rs.wasNull() ? null : value;
  340 + }
  341 + } catch (SQLException e) {
  342 + log.error("获取结果集第{}列数据时发生异常,数据类型: {}", index, sqlType, e);
  343 + throw e;
  344 + }
  345 + }
  346 +}
@@ -25,6 +25,8 @@ public class ChizhouZoneScheduler extends AbstractZoneScheduler { @@ -25,6 +25,8 @@ public class ChizhouZoneScheduler extends AbstractZoneScheduler {
25 private KjzsDevicePullService kjzsDevicePullService; 25 private KjzsDevicePullService kjzsDevicePullService;
26 @Resource 26 @Resource
27 private YjhbDevicePullService yjhbDevicePullService; 27 private YjhbDevicePullService yjhbDevicePullService;
  28 + @Resource
  29 + private XpDeviceReportService xpDeviceReportService;
28 30
29 @Override 31 @Override
30 protected String getZoneName() { 32 protected String getZoneName() {
@@ -66,4 +68,19 @@ public class ChizhouZoneScheduler extends AbstractZoneScheduler { @@ -66,4 +68,19 @@ public class ChizhouZoneScheduler extends AbstractZoneScheduler {
66 logEnd(taskName); 68 logEnd(taskName);
67 } 69 }
68 } 70 }
  71 +
  72 + @Scheduled(cron = "${scheduler.xp.push:0 0/15 * * * ?}")
  73 + public void xpPushDevicesToThirdParty() {
  74 + String taskName = "Push Devices (IoT -> 3rd Party)";
  75 + logStart(taskName);
  76 + try {
  77 + log.info("[{}] Simulating pushing devices...", getZoneName());
  78 + xpDeviceReportService.deviceReport();
  79 + Thread.sleep(1000);
  80 + } catch (Exception e) {
  81 + logError(taskName, e);
  82 + } finally {
  83 + logEnd(taskName);
  84 + }
  85 + }
69 } 86 }
@@ -43,6 +43,9 @@ scheduler: @@ -43,6 +43,9 @@ scheduler:
43 nanqiao: 43 nanqiao:
44 pull: "0 0/1 * * * ?" 44 pull: "0 0/1 * * * ?"
45 push: "0 0/1 * * * ?" 45 push: "0 0/1 * * * ?"
  46 + xp:
  47 + pull: "0 0/1 * * * ?"
  48 + push: "0 0/1 * * * ?"
46 49
47 hn: 50 hn:
48 third: 51 third:
@@ -294,3 +297,23 @@ device: @@ -294,3 +297,23 @@ device:
294 detail: 297 detail:
295 url: "https://iotgc.cniot.vip/triColorLamp/dtuSnState" 298 url: "https://iotgc.cniot.vip/triColorLamp/dtuSnState"
296 299
  300 +xp:
  301 + third:
  302 + domain: "http://220.180.204.38:1803/report-property"
  303 + jdbcUrl: "jdbc:postgresql://106.15.73.210:5433/thingskit"
  304 + jdbcUserName: "postgres"
  305 + jdbcPassword: "postgres"
  306 + selectSql: "SELECT
  307 + 'XP251111720' AS sn,
  308 + tkl1.str_v AS STATUS,
  309 + tkl2.dbl_v AS cumulativeOutput
  310 +FROM
  311 + device de
  312 + LEFT JOIN device_credentials dc ON de.id = dc.device_id
  313 + LEFT JOIN ts_kv_latest tkl1 ON de.id = tkl1.entity_id
  314 + AND tkl1.KEY = 61
  315 + LEFT JOIN ts_kv_latest tkl2 ON de.id = tkl2.entity_id
  316 + AND tkl2.KEY = 175
  317 +WHERE
  318 + de.organization_id IN ('f82530a0-93e4-4aeb-9339-f5b6d1127840') and dc.credentials_id='AGX251111720'"
  319 +