Commit 33ce0055faa50bd79e8cd005c7541bbc64a0820b

Authored by 杨鸣坤
1 parent f4b9cc4b

三环朝晖数据拉取上报

@@ -35,6 +35,55 @@ @@ -35,6 +35,55 @@
35 <artifactId>spring-boot-starter-test</artifactId> 35 <artifactId>spring-boot-starter-test</artifactId>
36 <scope>test</scope> 36 <scope>test</scope>
37 </dependency> 37 </dependency>
  38 + <dependency>
  39 + <groupId>com.zaxxer</groupId>
  40 + <artifactId>HikariCP</artifactId>
  41 + <version>4.0.3</version>
  42 + </dependency>
  43 + <dependency>
  44 + <groupId>com.zaxxer</groupId>
  45 + <artifactId>HikariCP-java7</artifactId>
  46 + <version>2.4.13</version>
  47 + </dependency>
  48 + <dependency>
  49 + <groupId>org.postgresql</groupId>
  50 + <artifactId>postgresql</artifactId>
  51 + <version>42.7.3</version>
  52 + </dependency>
  53 + <dependency>
  54 + <groupId>org.apache.httpcomponents</groupId>
  55 + <artifactId>httpclient</artifactId>
  56 + <version>4.5.8</version>
  57 + </dependency>
  58 + <!--工具类-->
  59 + <dependency>
  60 + <groupId>org.apache.commons</groupId>
  61 + <artifactId>commons-lang3</artifactId>
  62 + <version>3.6</version>
  63 + </dependency>
  64 +
  65 + <!--工具类-->
  66 + <dependency>
  67 + <groupId>org.apache.commons</groupId>
  68 + <artifactId>commons-collections4</artifactId>
  69 + <version>4.2</version>
  70 + </dependency>
  71 +
  72 + <dependency>
  73 + <groupId>com.alibaba</groupId>
  74 + <artifactId>fastjson</artifactId>
  75 + <version>1.2.60</version>
  76 + </dependency>
  77 + <dependency>
  78 + <groupId>org.springframework.boot</groupId>
  79 + <artifactId>spring-boot-starter-data-redis</artifactId>
  80 + <version>2.0.9.RELEASE</version>
  81 + </dependency>
  82 + <dependency>
  83 + <groupId>redis.clients</groupId>
  84 + <artifactId>jedis</artifactId>
  85 + <version>2.9.0</version>
  86 + </dependency>
38 </dependencies> 87 </dependencies>
39 88
40 <build> 89 <build>
1 package com.iot.scheduler.controller; 1 package com.iot.scheduler.controller;
2 2
  3 +import com.iot.scheduler.service.HnDeviceReportService;
  4 +import com.iot.scheduler.service.ShDevicePullService;
  5 +import jakarta.annotation.Resource;
3 import org.springframework.web.bind.annotation.GetMapping; 6 import org.springframework.web.bind.annotation.GetMapping;
4 import org.springframework.web.bind.annotation.RestController; 7 import org.springframework.web.bind.annotation.RestController;
5 8
6 @RestController 9 @RestController
7 public class HealthController { 10 public class HealthController {
8 11
  12 + @Resource
  13 + private HnDeviceReportService hnDeviceReportService;
  14 + @Resource
  15 + private ShDevicePullService shDevicePullService;
  16 +
9 @GetMapping("/health") 17 @GetMapping("/health")
10 public String health() { 18 public String health() {
11 return "IoT Scheduler is running..."; 19 return "IoT Scheduler is running...";
12 } 20 }
  21 +
  22 + @GetMapping("/manualSynchronization")
  23 + public void manualSynchronization() {
  24 + hnDeviceReportService.deviceReport();
  25 + }
  26 +
  27 + @GetMapping("/pullSynchronization")
  28 + public void pullSynchronization() {
  29 + shDevicePullService.pullDeviceAndPushToIot();
  30 + }
  31 +
13 } 32 }
  1 +package com.iot.scheduler.model;
  2 +
  3 +import lombok.Data;
  4 +
  5 +/**
  6 + * IOT平台token
  7 + */
  8 +@Data
  9 +public class DeviceToken {
  10 + //设备凭证类型
  11 + private String credentialsType;
  12 + //设备凭证id
  13 + private String credentialsId;
  14 + //凭证值
  15 + private String credentialsValue;
  16 +
  17 +}
  1 +package com.iot.scheduler.model;
  2 +
  3 +import lombok.Data;
  4 +
  5 +/**
  6 + * IOT平台:设备实体
  7 + */
  8 +@Data
  9 +public class QxDeviceInfo {
  10 + //设备厂商(必填)
  11 + private String brand;
  12 +
  13 + //别名(选填)
  14 + private String alias;
  15 +
  16 + //名称(必填)
  17 + private String name;
  18 +
  19 + //设备类型(
  20 + //网关:GATEWAY, 直连:DIRECT_CONNECTION, 网关子:SENSOR)
  21 + //默认填写:DIRECT_CONNECTION
  22 + private String deviceType;
  23 +
  24 + //协议类型
  25 + //(DEFAULT,TCP,MQTT,COAP,GB
  26 + //T28281)默认填写:DEFAULT
  27 + private String transportType;
  28 +
  29 + //平台设备配置 ID
  30 + private String deviceProfileId;
  31 +
  32 + //设备配置 ID
  33 + private String profileId;
  34 +
  35 + //组织 ID
  36 + private String organizationId;
  37 +
  38 + //设备标签
  39 + private String label;
  40 +
  41 + //设备描述
  42 + private String description;
  43 +
  44 + //设备编号
  45 + private String sn;
  46 +
  47 + //设备凭证信息
  48 + private DeviceToken deviceToken;
  49 +
  50 +}
  1 +package com.iot.scheduler.model;
  2 +
  3 +import lombok.Data;
  4 +
  5 +import java.util.Date;
  6 +
  7 +/**
  8 + * 三色灯状态详情
  9 + */
  10 +@Data
  11 +public class QxDeviceInfoDetail {
  12 + //设备状态
  13 + // RUN(运/行)/OFF(关机)/ERROR(故障)/STAND(待机)
  14 + private String status;
  15 +
  16 + //是否报警 1/0
  17 + private boolean alarm;
  18 +
  19 + //报警类型
  20 + private String alarmType;
  21 +
  22 + //设备序列号
  23 + private String dtuSn;
  24 +
  25 + //设备时间
  26 + private Date startTime;
  27 +
  28 +
  29 + //以下为非必填
  30 + //报警详情
  31 + private String alarmInfo;
  32 +
  33 + //用电能耗
  34 + private double eleEnergy;
  35 +
  36 + //用水量
  37 + private double waterEnergy;
  38 +
  39 + //用气量
  40 + private double gasEnergy;
  41 +
  42 + //产能
  43 + private double capacity;
  44 +
  45 + /**
  46 + * 累计产量
  47 + */
  48 + private double cumulativeOutput;
  49 +
  50 +}
  1 +package com.iot.scheduler.model;
  2 +
  3 +import lombok.Data;
  4 +
  5 +import java.util.Map;
  6 +
  7 +/**
  8 + * 上报设备状态
  9 + */
  10 +@Data
  11 +public class ReportDevice {
  12 +
  13 + // 设备ID
  14 + private String deviceId;
  15 +
  16 + // 上报设备属性
  17 + private Map<String, Object> properties;
  18 +}
  1 +package com.iot.scheduler.service;
  2 +
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.iot.scheduler.model.ReportDevice;
  5 +import com.zaxxer.hikari.HikariConfig;
  6 +import com.zaxxer.hikari.HikariDataSource;
  7 +import lombok.extern.slf4j.Slf4j;
  8 +import org.apache.commons.collections4.CollectionUtils;
  9 +import org.apache.http.Consts;
  10 +import org.apache.http.HttpEntity;
  11 +import org.apache.http.client.methods.CloseableHttpResponse;
  12 +import org.apache.http.client.methods.HttpPost;
  13 +import org.apache.http.entity.ContentType;
  14 +import org.apache.http.entity.StringEntity;
  15 +import org.apache.http.impl.client.CloseableHttpClient;
  16 +import org.apache.http.impl.client.HttpClients;
  17 +import org.springframework.beans.factory.annotation.Value;
  18 +import org.springframework.stereotype.Service;
  19 +
  20 +import java.io.ByteArrayOutputStream;
  21 +import java.io.IOException;
  22 +import java.io.InputStream;
  23 +import java.sql.*;
  24 +import java.util.ArrayList;
  25 +import java.util.HashMap;
  26 +import java.util.List;
  27 +import java.util.Map;
  28 +
  29 +/**
  30 + * 怀宁设备属性上报
  31 + */
  32 +@Slf4j
  33 +@Service
  34 +public class HnDeviceReportService {
  35 +
  36 + @Value("${hn.third.domain:http://220.180.204.38:1803/report-property}")
  37 + private String reportUrl;
  38 +
  39 + @Value("${hn.third.jdbcUrl:jdbc:postgresql://106.15.73.210:5433/thingskit}")
  40 + private String jdbcUrl;
  41 +
  42 + @Value("${hn.third.jdbcUserName:postgres}")
  43 + private String jdbcUserName;
  44 +
  45 + @Value("${hn.third.jdbcPassword:postgres}")
  46 + private String jdbcPassword;
  47 +
  48 + @Value("${hn.third.selectSql:SELECT * FROM ts_kv_dictionary;}")
  49 + private String selectSql;
  50 +
  51 + public void deviceReport() {
  52 + log.info("开始执行怀宁设备属性上报任务");
  53 + try {
  54 + List<Object> needSyncDataList = initConnectAndSelectData();
  55 + log.info("数据库查询完成,共获取{}条数据", needSyncDataList != null ? needSyncDataList.size() : 0);
  56 +
  57 + if (CollectionUtils.isEmpty(needSyncDataList)) {
  58 + log.info("没有需要上报的数据,任务结束");
  59 + return;
  60 + }
  61 +
  62 + log.info("开始处理{}条设备数据并进行上报", needSyncDataList.size());
  63 + int successCount = 0;
  64 + int failCount = 0;
  65 +
  66 + for (int i = 0; i < needSyncDataList.size(); i++) {
  67 + Object needSyncData = needSyncDataList.get(i);
  68 + try {
  69 + List<Object> dataList = (ArrayList) needSyncData;
  70 + String deviceId = dataList.get(0).toString();
  71 +
  72 + log.info("开始处理设备[{}]的数据,进度: {}/{}", deviceId, i + 1, needSyncDataList.size());
  73 +
  74 + ReportDevice reportDevice = new ReportDevice();
  75 + reportDevice.setDeviceId(deviceId);
  76 + Map<String, Object> propertiesMap = new HashMap<>(4);
  77 +
  78 + String status = dataList.get(1).toString();
  79 + if ("RUN".equals(status)) {
  80 + propertiesMap.put("State_Run", 1);
  81 + propertiesMap.put("State_PowerOn", 1);
  82 + propertiesMap.put("State_Alarm", 0);
  83 + log.info("设备[{}]状态为: RUN", deviceId);
  84 + } else if ("STAND".equals(status)) {
  85 + propertiesMap.put("State_Run", 0);
  86 + propertiesMap.put("State_PowerOn", 1);
  87 + propertiesMap.put("State_Alarm", 0);
  88 + log.info("设备[{}]状态为: STAND", deviceId);
  89 + } else if ("ERROR".equals(status)) {
  90 + propertiesMap.put("State_Run", 0);
  91 + propertiesMap.put("State_PowerOn", 1);
  92 + propertiesMap.put("State_Alarm", 1);
  93 + log.info("设备[{}]状态为: ERROR", deviceId);
  94 + } else {
  95 + propertiesMap.put("State_Run", 0);
  96 + propertiesMap.put("State_PowerOn", 0);
  97 + propertiesMap.put("State_Alarm", 0);
  98 + log.info("设备[{}]状态为: 未知[{}],设置为离线", deviceId, status);
  99 + }
  100 +
  101 + Object outputValue = dataList.get(2);
  102 + long outputLong = 0L;
  103 +
  104 + if (outputValue != null) {
  105 + if (outputValue instanceof Double) {
  106 + double doubleValue = (Double) outputValue;
  107 + outputLong = (long) doubleValue; // 直接转型,会丢失小数部分
  108 + log.info("设备[{}]输出值: {} (double转换为long: {})", deviceId, doubleValue, outputLong);
  109 + } else if (outputValue instanceof Number) {
  110 + outputLong = ((Number) outputValue).longValue();
  111 + log.info("设备[{}]输出值: {} (Number转换为long: {})", deviceId, outputValue, outputLong);
  112 + } else {
  113 + // 处理字符串或其他类型
  114 + outputLong = Long.parseLong(outputValue.toString());
  115 + log.info("设备[{}]输出值: {} (字符串转换为long: {})", deviceId, outputValue, outputLong);
  116 + }
  117 + } else {
  118 + log.info("设备[{}]输出值为null,使用默认值0", deviceId);
  119 + }
  120 +
  121 + propertiesMap.put("ProInfo_Output", outputLong);
  122 + reportDevice.setProperties(propertiesMap);
  123 +
  124 + boolean sendResult = sendReportDevice(reportDevice);
  125 + if (sendResult) {
  126 + successCount++;
  127 + log.info("设备[{}]数据上报成功", deviceId);
  128 + } else {
  129 + failCount++;
  130 + log.error("设备[{}]数据上报失败", deviceId);
  131 + }
  132 +
  133 + } catch (Exception e) {
  134 + failCount++;
  135 + log.error("处理第{}条数据时发生异常,数据内容: {}", i + 1, needSyncData, e);
  136 + }
  137 + }
  138 +
  139 + log.info("设备属性上报任务完成,成功: {}条,失败: {}条,总计: {}条",
  140 + successCount, failCount, needSyncDataList.size());
  141 +
  142 + } catch (Exception e) {
  143 + log.error("执行设备属性上报任务时发生异常", e);
  144 + } finally {
  145 + log.info("怀宁设备属性上报任务执行结束");
  146 + }
  147 + }
  148 +
  149 + private boolean sendReportDevice(ReportDevice reportDevice) {
  150 + String deviceId = reportDevice.getDeviceId();
  151 + log.info("开始上报设备[{}]的数据,请求URL: {}", deviceId, reportUrl);
  152 +
  153 + HttpPost httpPost = new HttpPost(reportUrl);
  154 + CloseableHttpClient httpClient = HttpClients.createDefault();
  155 + String requestBody = JSON.toJSONString(reportDevice);
  156 +
  157 + log.info("设备[{}]上报请求体: {}", deviceId, requestBody);
  158 +
  159 + StringEntity entity = new StringEntity(requestBody, ContentType.create("application/json", Consts.UTF_8));
  160 + httpPost.setEntity(entity);
  161 + httpPost.setHeader("Content-Type", "application/json;charset=UTF-8");
  162 +
  163 + try (CloseableHttpResponse execute = httpClient.execute(httpPost)) {
  164 + int statusCode = execute.getStatusLine().getStatusCode();
  165 +
  166 + HttpEntity res = execute.getEntity();
  167 + String responseBody = "";
  168 + if (res != null) {
  169 + try (InputStream is = res.getContent();
  170 + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
  171 + byte[] buf = new byte[128];
  172 + int len;
  173 + while ((len = is.read(buf)) != -1) {
  174 + byteArrayOutputStream.write(buf, 0, len);
  175 + }
  176 + responseBody = byteArrayOutputStream.toString("UTF-8");
  177 + }
  178 + }
  179 +
  180 + log.info("设备[{}]上报响应,状态码: {},响应内容: {}", deviceId, statusCode, responseBody);
  181 +
  182 + if (statusCode >= 200 && statusCode < 300) {
  183 + return true;
  184 + } else {
  185 + log.error("设备[{}]上报失败,HTTP状态码: {},响应: {}", deviceId, statusCode, responseBody);
  186 + return false;
  187 + }
  188 +
  189 + } catch (IOException e) {
  190 + log.error("设备[{}]上报请求发生IO异常", deviceId, e);
  191 + return false;
  192 + } finally {
  193 + try {
  194 + httpClient.close();
  195 + } catch (IOException e) {
  196 + log.error("关闭HTTP客户端时发生异常", e);
  197 + }
  198 + }
  199 + }
  200 +
  201 + private List<Object> initConnectAndSelectData() {
  202 + Connection connection = null;
  203 + PreparedStatement statement = null;
  204 + ResultSet resultSet = null;
  205 + HikariDataSource dataSource = null;
  206 + List<Object> resultList = new ArrayList<>();
  207 +
  208 + log.info("开始连接数据库,URL: {}", jdbcUrl);
  209 +
  210 + try {
  211 + HikariConfig config = new HikariConfig();
  212 + config.setJdbcUrl(jdbcUrl);
  213 + config.setUsername(jdbcUserName);
  214 + config.setPassword(jdbcPassword);
  215 + config.setDriverClassName("org.postgresql.Driver");
  216 + config.setMaximumPoolSize(5);
  217 + config.setMinimumIdle(5);
  218 + config.setConnectionTimeout(60000);
  219 + config.setConnectionTestQuery("SELECT 1");
  220 +
  221 + dataSource = new HikariDataSource(config);
  222 + log.info("Hikari连接池配置完成");
  223 +
  224 + connection = dataSource.getConnection();
  225 + log.info("数据库连接成功");
  226 +
  227 + statement = connection.prepareStatement(selectSql);
  228 + log.info("执行SQL查询: {}", selectSql);
  229 +
  230 + resultSet = statement.executeQuery();
  231 + ResultSetMetaData metaData = resultSet.getMetaData();
  232 + int columnCount = metaData.getColumnCount();
  233 + log.info("查询结果集元数据获取成功,共{}列", columnCount);
  234 +
  235 + int rowCount = 0;
  236 + while (resultSet.next()) {
  237 + List<Object> result = new ArrayList<>(columnCount);
  238 + for (int index = 1; index <= columnCount; index++) {
  239 + int columnType = metaData.getColumnType(index);
  240 + Object value = getTypedValue(resultSet, index, columnType);
  241 + result.add(value);
  242 + }
  243 + resultList.add(result);
  244 + rowCount++;
  245 +
  246 + // 每处理1000行记录一次日志
  247 + if (rowCount % 1000 == 0) {
  248 + log.info("已处理{}行数据", rowCount);
  249 + }
  250 + }
  251 +
  252 + log.info("数据查询完成,共获取{}行数据", rowCount);
  253 +
  254 + } catch (SQLException e) {
  255 + log.error("数据库操作异常,URL: {}, 用户名: {}", jdbcUrl, jdbcUserName, e);
  256 + } catch (Exception e) {
  257 + log.error("初始化数据库连接或查询数据时发生异常", e);
  258 + } finally {
  259 + // 释放资源
  260 + try {
  261 + if (resultSet != null) resultSet.close();
  262 + if (statement != null) statement.close();
  263 + if (connection != null) connection.close();
  264 + log.info("数据库连接资源已释放");
  265 + } catch (SQLException e) {
  266 + log.error("关闭数据库资源时发生异常", e);
  267 + }
  268 +
  269 + if (dataSource != null) {
  270 + try {
  271 + dataSource.close();
  272 + log.info("HikariDataSource连接池已关闭");
  273 + } catch (Exception e) {
  274 + log.error("关闭HikariDataSource连接池时发生异常", e);
  275 + }
  276 + }
  277 + }
  278 +
  279 + log.info("数据库操作完成,返回{}条记录", resultList.size());
  280 + return resultList;
  281 + }
  282 +
  283 + private Object getTypedValue(ResultSet rs, int index, int sqlType) throws SQLException {
  284 + Object value;
  285 +
  286 + try {
  287 + switch (sqlType) {
  288 + case Types.BIT:
  289 + case Types.BOOLEAN:
  290 + value = rs.getBoolean(index);
  291 + return rs.wasNull() ? null : value;
  292 +
  293 + case Types.TINYINT:
  294 + case Types.SMALLINT:
  295 + case Types.INTEGER:
  296 + value = rs.getInt(index);
  297 + return rs.wasNull() ? null : value;
  298 +
  299 + case Types.BIGINT:
  300 + value = rs.getLong(index);
  301 + return rs.wasNull() ? null : value;
  302 +
  303 + case Types.FLOAT:
  304 + case Types.REAL:
  305 + value = rs.getFloat(index);
  306 + return rs.wasNull() ? null : value;
  307 +
  308 + case Types.DOUBLE:
  309 + value = rs.getDouble(index);
  310 + return rs.wasNull() ? null : value;
  311 +
  312 + case Types.NUMERIC:
  313 + case Types.DECIMAL:
  314 + value = rs.getBigDecimal(index);
  315 + return rs.wasNull() ? null : value;
  316 +
  317 + case Types.CHAR:
  318 + case Types.VARCHAR:
  319 + case Types.LONGVARCHAR:
  320 + case Types.NCHAR:
  321 + case Types.NVARCHAR:
  322 + case Types.LONGNVARCHAR:
  323 + value = rs.getString(index);
  324 + return rs.wasNull() ? null : value;
  325 +
  326 + case Types.DATE:
  327 + Date date = rs.getDate(index);
  328 + return date != null ? date.toLocalDate() : null;
  329 +
  330 + case Types.TIME:
  331 + Time time = rs.getTime(index);
  332 + return time != null ? time.toLocalTime() : null;
  333 +
  334 + case Types.TIMESTAMP:
  335 + Timestamp timestamp = rs.getTimestamp(index);
  336 + return timestamp != null ? timestamp.toLocalDateTime() : null;
  337 +
  338 + case Types.BINARY:
  339 + case Types.VARBINARY:
  340 + case Types.LONGVARBINARY:
  341 + value = rs.getBytes(index);
  342 + return rs.wasNull() ? null : value;
  343 +
  344 + case Types.BLOB:
  345 + value = rs.getBlob(index);
  346 + return rs.wasNull() ? null : value;
  347 +
  348 + case Types.CLOB:
  349 + value = rs.getClob(index);
  350 + return rs.wasNull() ? null : value;
  351 +
  352 + default:
  353 + value = rs.getObject(index);
  354 + return rs.wasNull() ? null : value;
  355 + }
  356 + } catch (SQLException e) {
  357 + log.error("获取结果集第{}列数据时发生异常,数据类型: {}", index, sqlType, e);
  358 + throw e;
  359 + }
  360 + }
  361 +}
  1 +package com.iot.scheduler.service;
  2 +
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.alibaba.fastjson.JSONArray;
  5 +import com.alibaba.fastjson.JSONObject;
  6 +import com.alibaba.fastjson.TypeReference;
  7 +import com.iot.scheduler.model.DeviceToken;
  8 +import com.iot.scheduler.model.QxDeviceInfo;
  9 +import com.iot.scheduler.model.QxDeviceInfoDetail;
  10 +import jakarta.annotation.Resource;
  11 +import lombok.extern.slf4j.Slf4j;
  12 +import org.apache.commons.lang3.StringUtils;
  13 +import org.apache.http.Consts;
  14 +import org.apache.http.HttpEntity;
  15 +import org.apache.http.client.methods.CloseableHttpResponse;
  16 +import org.apache.http.client.methods.HttpGet;
  17 +import org.apache.http.client.methods.HttpPost;
  18 +import org.apache.http.entity.ContentType;
  19 +import org.apache.http.entity.StringEntity;
  20 +import org.apache.http.impl.client.CloseableHttpClient;
  21 +import org.apache.http.impl.client.HttpClients;
  22 +import org.apache.http.message.BasicHeader;
  23 +import org.apache.http.util.EntityUtils;
  24 +import org.springframework.beans.factory.annotation.Value;
  25 +import org.springframework.data.redis.core.RedisTemplate;
  26 +import org.springframework.stereotype.Service;
  27 +import org.springframework.util.CollectionUtils;
  28 +import org.springframework.util.LinkedMultiValueMap;
  29 +import org.springframework.util.MultiValueMap;
  30 +import org.springframework.web.util.UriComponentsBuilder;
  31 +
  32 +import java.io.ByteArrayOutputStream;
  33 +import java.io.IOException;
  34 +import java.io.InputStream;
  35 +import java.text.SimpleDateFormat;
  36 +import java.time.LocalTime;
  37 +import java.util.*;
  38 +import java.util.concurrent.TimeUnit;
  39 +
  40 +/**
  41 + * 三环朝晖应用数据同步
  42 + */
  43 +@Slf4j
  44 +@Service
  45 +public class ShDevicePullService {
  46 +
  47 + @Value("${sh.iot.organizeId:}")
  48 + private String iotOrganizeId;
  49 + @Value("${sh.iot.profileId}")
  50 + private String iotProfileId;
  51 + @Value("${sh.iot.deviceProfileId}")
  52 + private String iotDeviceProfileId;
  53 + @Value("${sh.iot.userName:}")
  54 + private String iotUserName;
  55 + @Value("${sh.iot.password:}")
  56 + private String iotPassword;
  57 + @Value("${sh.iot.tokenUrl}")
  58 + private String iotTokenUrl;
  59 + @Value("${sh.iot.infoUrl}")
  60 + private String iotDeviceInfoUrl;
  61 + @Value("${sh.iot.detailUrl}")
  62 + private String iotDeviceDetailUrl;
  63 +
  64 + @Value("${device.token.url}")
  65 + private String deviceTokenUrl;
  66 + @Value("${device.token.userName}")
  67 + private String deviceUserName;
  68 + @Value("${device.token.password}")
  69 + private String devicePassword;
  70 + @Value("${device.info.url}")
  71 + private String deviceInfoUrl;
  72 + @Value("${device.detail.url}")
  73 + private String deviceDetailUrl;
  74 +
  75 + @Resource
  76 + private RedisTemplate<String, String> redisTemplate;
  77 +
  78 + final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  79 +
  80 + public void pullDeviceAndPushToIot() {
  81 + String deviceResult = getDeviceInfo();
  82 + Map<String, Object> deviceInfos = JSON.parseObject(deviceResult, new TypeReference<>() {
  83 + });
  84 +
  85 + JSONArray deviceInfoList = (JSONArray) deviceInfos.get("data");
  86 + List<QxDeviceInfo> qxDeviceInfos = new ArrayList<>();
  87 + List<QxDeviceInfoDetail> qxAddDeviceInfoDetails = new ArrayList<>();
  88 + Map<String, QxDeviceInfoDetail> qxDeviceInfoDetailMap = new HashMap<>();
  89 + for (Object o : deviceInfoList) {
  90 + JSONObject deviceInfoJson = (JSONObject) o;
  91 + QxDeviceInfo qxDeviceInfo = new QxDeviceInfo();
  92 + qxDeviceInfo.setDeviceType("DIRECT_CONNECTION");
  93 + qxDeviceInfo.setTransportType("DEFAULT");
  94 + qxDeviceInfo.setOrganizationId(iotOrganizeId);
  95 + qxDeviceInfo.setDeviceProfileId(iotDeviceProfileId);
  96 + qxDeviceInfo.setProfileId(iotProfileId);
  97 +// //项目状态(1:在线,2:离线,3:报警)
  98 +// Integer projectState = deviceInfoJson.getInteger("projectState");
  99 +// if (projectState != null) {
  100 +// qxDeviceInfo.setDescription(String.valueOf(projectState));
  101 +// }
  102 +
  103 + //项目类型
  104 + qxDeviceInfo.setLabel("生产设备");
  105 + //设备名称
  106 + String deviceName = deviceInfoJson.getString("deviceName");
  107 +
  108 + qxDeviceInfo.setName(deviceName);
  109 + qxDeviceInfo.setBrand(deviceName);
  110 + //序列号
  111 + String dtuSn = deviceInfoJson.getString("dtuSn");
  112 + qxDeviceInfo.setSn(dtuSn);
  113 + //高速/中速
  114 + List<String> highSpeedDeviceList = Arrays.asList("AGX251111720", "AGY251111266", "AGM251111722", "AGF251111329",
  115 + "AGM251111301", "AGM251111746", "AGW251111837", "AGW251111403", "AGW251111450");
  116 + if (highSpeedDeviceList.contains(dtuSn)) {
  117 + qxDeviceInfo.setDescription("高速机");
  118 + } else {
  119 + qxDeviceInfo.setDescription("中速机");
  120 + }
  121 +
  122 + DeviceToken deviceToken = new DeviceToken();
  123 + deviceToken.setCredentialsType("ACCESS_TOKEN");
  124 + deviceToken.setCredentialsId(dtuSn);
  125 + deviceToken.setCredentialsValue(dtuSn);
  126 + qxDeviceInfo.setDeviceToken(deviceToken);
  127 + qxDeviceInfos.add(qxDeviceInfo);
  128 + //有序列号直接获取灯信息
  129 + if (StringUtils.isNotBlank(dtuSn)) {
  130 + String deviceInfoDetails = getDeviceInfoDetail(dtuSn);
  131 + if (StringUtils.isBlank(deviceInfoDetails)) {
  132 + continue;
  133 + }
  134 +
  135 + Map<String, Object> deviceInfoDetailMap = JSON.parseObject(deviceInfoDetails, new TypeReference<>() {
  136 + });
  137 +
  138 + JSONArray deviceInfoDetailList = (JSONArray) deviceInfoDetailMap.get("data");
  139 + if (CollectionUtils.isEmpty(deviceInfoDetailList)) {
  140 + continue;
  141 + }
  142 +
  143 + JSONObject deviceInfoDetailJson = (JSONObject) deviceInfoDetailList.get(0);
  144 + //灯详情数据
  145 + //序列号
  146 + String dtuSnDetail = deviceInfoDetailJson.getString("dtuSn");
  147 + //开始时间
  148 + String startTime = deviceInfoDetailJson.getString("startTime");
  149 + QxDeviceInfoDetail qxDeviceInfoDetail = new QxDeviceInfoDetail();
  150 + qxDeviceInfoDetail.setAlarm(false);
  151 + //灯状态(0:灭灯,1:红,2:黄,3:绿,4:蓝)
  152 + Integer lampState = deviceInfoDetailJson.getInteger("lampState");
  153 + switch (lampState) {
  154 + case 0:
  155 + qxDeviceInfoDetail.setStatus("OFF");
  156 + break;
  157 + case 1:
  158 + qxDeviceInfoDetail.setStatus("ERROR");
  159 + qxDeviceInfoDetail.setAlarm(true);
  160 + break;
  161 + case 2:
  162 + qxDeviceInfoDetail.setStatus("STAND");
  163 + break;
  164 + case 3:
  165 + qxDeviceInfoDetail.setStatus("RUN");
  166 + //先从缓存里面拿token信息
  167 + String totalCapacity = getTotalCapacity("total_capacity_" + dtuSn, highSpeedDeviceList.contains(dtuSn));
  168 + qxDeviceInfoDetail.setCumulativeOutput(Double.valueOf(totalCapacity));
  169 + break;
  170 + default:
  171 + continue;
  172 + }
  173 +
  174 + if (StringUtils.isNotBlank(startTime)) {
  175 + try {
  176 + qxDeviceInfoDetail.setStartTime(dateFormat.parse(startTime));
  177 + } catch (Exception e) {
  178 + log.error("时间格式出错");
  179 + qxDeviceInfoDetail.setStartTime(new Date());
  180 + }
  181 + } else {
  182 + qxDeviceInfoDetail.setStartTime(new Date());
  183 + }
  184 +
  185 + qxDeviceInfoDetail.setDtuSn(dtuSn);
  186 + qxDeviceInfoDetailMap.put(dtuSn, qxDeviceInfoDetail);
  187 + qxAddDeviceInfoDetails.add(qxDeviceInfoDetail);
  188 + }
  189 + }
  190 +
  191 + //将数据同步到IOT平台
  192 + Map<String, String> qxParam = new HashMap<>(2);
  193 + qxParam.put("username", iotUserName);
  194 + qxParam.put("password", iotPassword);
  195 +
  196 + HttpPost qxHttpPost = new HttpPost(iotTokenUrl);
  197 + String qxResult = sendPost(qxHttpPost, JSON.toJSONString(qxParam));
  198 + if (StringUtils.isBlank(qxResult)) {
  199 + return;
  200 + }
  201 + Map<String, Object> qxRes = JSON.parseObject(qxResult, new TypeReference<Map<String, Object>>() {
  202 + });
  203 +
  204 + String qxAccessToken = (String) qxRes.get("token");
  205 + if (StringUtils.isBlank(qxAccessToken)) {
  206 + return;
  207 + }
  208 +
  209 + BasicHeader qxAuthorization = new BasicHeader("X-Authorization", "Bearer " + qxAccessToken);
  210 + if (!CollectionUtils.isEmpty(qxDeviceInfos)) {
  211 + HttpPost qxDeviceInfoPost = new HttpPost(iotDeviceInfoUrl);
  212 + qxDeviceInfoPost.addHeader(qxAuthorization);
  213 + for (QxDeviceInfo qxDeviceInfo : qxDeviceInfos) {
  214 + // todo
  215 + String syncDeviceInfo = sendPost(qxDeviceInfoPost, JSON.toJSONString(qxDeviceInfo));
  216 + //log.info("同步设备信息 syncDeviceInfo:{}", syncDeviceInfo);
  217 + }
  218 + }
  219 +
  220 + if (!CollectionUtils.isEmpty(qxAddDeviceInfoDetails)) {
  221 + for (QxDeviceInfoDetail qxDeviceInfoDetail : qxAddDeviceInfoDetails) {
  222 + String qxDeviceInfoDetailUrlStr = iotDeviceDetailUrl + qxDeviceInfoDetail.getDtuSn() + "/telemetry";
  223 + HttpPost qxDeviceInfoDetailPost = new HttpPost(qxDeviceInfoDetailUrlStr);
  224 + qxDeviceInfoDetailPost.addHeader(qxAuthorization);
  225 + String syncDeviceInfoDetail = sendPost(qxDeviceInfoDetailPost, JSON.toJSONString(qxDeviceInfoDetail));
  226 + }
  227 + }
  228 + }
  229 +
  230 + private String getTotalCapacity(String key, boolean highSpeedDevice) {
  231 + String totalCapacity = redisTemplate.opsForValue().get(key);
  232 + Double production = highSpeedDevice ? 120D : 90D;
  233 + Double totalCapacityD = 0D;
  234 + if (StringUtils.isEmpty(totalCapacity)) {
  235 + totalCapacityD = production;
  236 + } else {
  237 + totalCapacityD = Double.parseDouble(totalCapacity) + production;
  238 + }
  239 +
  240 + redisTemplate.opsForValue().set(key, String.valueOf(totalCapacityD));
  241 + return String.valueOf(totalCapacityD);
  242 + }
  243 +
  244 + public String getDeviceInfoDetail(String dtuSn) {
  245 + String accessToken = getAccessToken();
  246 + Map<String, String> dtuSnOb = new HashMap<>(1);
  247 + dtuSnOb.put("dtuSn", dtuSn);
  248 + Map<String, String> headerMap = new HashMap<>(1);
  249 + headerMap.put("Authorization", "Bearer " + accessToken);
  250 + String deviceInfoDetail = sendRequestGet(deviceDetailUrl, dtuSnOb, headerMap);
  251 + if (StringUtils.isBlank(deviceInfoDetail)) {
  252 + return null;
  253 + }
  254 +
  255 + Map<String, Object> deviceInfoDetailMap = JSON.parseObject(deviceInfoDetail,
  256 + new TypeReference<>() {
  257 + });
  258 + Integer deviceInfoDetailCode = (Integer) deviceInfoDetailMap.get("code");
  259 + if (deviceInfoDetailCode != 200) {
  260 + return null;
  261 + }
  262 +
  263 + JSONArray deviceInfoDetailList = (JSONArray) deviceInfoDetailMap.get("data");
  264 + if (CollectionUtils.isEmpty(deviceInfoDetailList)) {
  265 + return null;
  266 + }
  267 +
  268 + JSONObject deviceInfoDetailJson = (JSONObject) deviceInfoDetailList.get(0);
  269 + //灯详情数据
  270 + //灯状态(0:灭灯,1:红,2:黄,3:绿,4:蓝)
  271 + Integer lampState = deviceInfoDetailJson.getInteger("lampState");
  272 + if (lampState == null) {
  273 + return null;
  274 + }
  275 +
  276 + return deviceInfoDetail;
  277 + }
  278 +
  279 + public String getDeviceInfo() {
  280 + String accessToken = getAccessToken();
  281 + // 初始化headerMap并设置Authorization
  282 + Map<String, String> headerMap = new HashMap<>(1);
  283 + headerMap.put("Authorization", "Bearer " + accessToken);
  284 +
  285 + Map<String, String> paramsMap = new HashMap<>();
  286 + paramsMap.put("groupName", "三环朝晖");
  287 +
  288 + // 第一次请求设备信息
  289 + String deviceResult = sendRequestGet(deviceInfoUrl, paramsMap, headerMap);
  290 +
  291 + // 检查设备信息是否为空
  292 + if (StringUtils.isBlank(deviceResult)) {
  293 + return null;
  294 + }
  295 +
  296 + // 解析设备信息
  297 + Map<String, Object> deviceInfos = JSON.parseObject(deviceResult, new TypeReference<Map<String, Object>>() {
  298 + });
  299 + Integer deviceInfoCode = (Integer) deviceInfos.get("code");
  300 +
  301 + // 如果code不为200,可能是accessToken失效,重新获取token并重试
  302 + if (deviceInfoCode != 200) {
  303 + accessToken = getAccessToken();
  304 + if (StringUtils.isEmpty(accessToken)) {
  305 + return null;
  306 + }
  307 +
  308 + // 更新headerMap中的Authorization
  309 + headerMap.put("Authorization", "Bearer " + accessToken);
  310 +
  311 + // 第二次请求设备信息
  312 + deviceResult = sendRequestGet(deviceInfoUrl, paramsMap, headerMap);
  313 + if (StringUtils.isBlank(deviceResult)) {
  314 + return null;
  315 + }
  316 +
  317 + // 重新解析设备信息
  318 + deviceInfos = JSON.parseObject(deviceResult, new TypeReference<Map<String, Object>>() {
  319 + });
  320 + deviceInfoCode = (Integer) deviceInfos.get("code");
  321 +
  322 + // 如果第二次请求仍然失败,返回错误信息
  323 + if (deviceInfoCode != 200) {
  324 + return null;
  325 + }
  326 + }
  327 +
  328 + // 返回成功的设备信息
  329 + return deviceResult;
  330 + }
  331 +
  332 + private String getAccessToken() {
  333 + String accessToken = "";
  334 + String redisKey = "device_token";
  335 + if (StringUtils.isNotBlank(redisTemplate.opsForValue().get(redisKey)) && redisTemplate.getExpire(redisKey) > 0) {
  336 + return redisTemplate.opsForValue().get(redisKey);
  337 + }
  338 +
  339 + Map<String, String> param = new HashMap<>(2);
  340 + param.put("username", deviceUserName);
  341 + param.put("password", devicePassword);
  342 + HttpPost httpPost = new HttpPost(deviceTokenUrl);
  343 + String result = sendPost(httpPost, JSON.toJSONString(param));
  344 + if (StringUtils.isBlank(result)) {
  345 + return accessToken;
  346 + }
  347 +
  348 + Map<String, Object> res = JSON.parseObject(result, new TypeReference<>() {
  349 + });
  350 +
  351 + Integer code = (Integer) res.get("code");
  352 + if (code == 200) {
  353 + JSONObject data = (JSONObject) res.get("data");
  354 + accessToken = (String) data.get("token");
  355 + redisTemplate.opsForValue().set(redisKey, accessToken, 3600, TimeUnit.SECONDS); // 一小时过期
  356 + }
  357 +
  358 + return accessToken;
  359 + }
  360 +
  361 + public static String sendRequestGet(String url, Map<String, String> params, Map<String, String> header) {
  362 + //实例化httpclient
  363 + CloseableHttpClient httpclient = HttpClients.createDefault();
  364 + url = builderUrl(url, params);
  365 + //请求结果
  366 + String content = "";
  367 + //实例化get方法
  368 + HttpGet httpget = new HttpGet(url);
  369 + if (!CollectionUtils.isEmpty(header)) {
  370 + for (Map.Entry<String, String> entry : header.entrySet()) {
  371 + httpget.setHeader(entry.getKey(), entry.getValue());
  372 + }
  373 + }
  374 +
  375 + try (CloseableHttpResponse response = httpclient.execute(httpget)) {
  376 +
  377 + //执行get方法
  378 + if (response.getStatusLine().getStatusCode() == 200) {
  379 + content = EntityUtils.toString(response.getEntity(), "UTF-8");
  380 + }
  381 + } catch (IOException e) {
  382 + log.error("sendRequest---GET Error!", e);
  383 + }
  384 + return content;
  385 + }
  386 +
  387 + private static String builderUrl(String url, Map<String, String> params) {
  388 + UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromHttpUrl(url);
  389 + if (!CollectionUtils.isEmpty(params)) {
  390 + MultiValueMap<String, String> paramsValue = new LinkedMultiValueMap<>();
  391 + for (Map.Entry<String, String> entry : params.entrySet()) {
  392 + paramsValue.add(entry.getKey(), entry.getValue());
  393 + }
  394 +
  395 + uriBuilder = uriBuilder.queryParams(paramsValue);
  396 + }
  397 +
  398 + return uriBuilder.toUriString();
  399 + }
  400 +
  401 + private String sendPost(HttpPost httpPost, String jsonData) {
  402 + CloseableHttpClient httpClient = HttpClients.createDefault();
  403 + StringEntity entity = new StringEntity(jsonData, ContentType.create("application/json", Consts.UTF_8));
  404 + httpPost.setEntity(entity);
  405 + String result = null;
  406 + try {
  407 + CloseableHttpResponse execute = httpClient.execute(httpPost);
  408 + HttpEntity res = execute.getEntity();
  409 + InputStream is = res.getContent();
  410 + int len;
  411 + byte[] buf = new byte[128];
  412 + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
  413 + while ((len = is.read(buf)) != -1) {
  414 + byteArrayOutputStream.write(buf, 0, len);
  415 + }
  416 + result = byteArrayOutputStream.toString();
  417 + } catch (IOException e) {
  418 + e.printStackTrace();
  419 + }
  420 + return result;
  421 + }
  422 +}
1 package com.iot.scheduler.zone; 1 package com.iot.scheduler.zone;
2 2
  3 +import com.iot.scheduler.service.HnDeviceReportService;
3 import com.iot.scheduler.task.AbstractZoneScheduler; 4 import com.iot.scheduler.task.AbstractZoneScheduler;
  5 +import jakarta.annotation.Resource;
4 import lombok.extern.slf4j.Slf4j; 6 import lombok.extern.slf4j.Slf4j;
5 import org.springframework.scheduling.annotation.Scheduled; 7 import org.springframework.scheduling.annotation.Scheduled;
6 import org.springframework.stereotype.Component; 8 import org.springframework.stereotype.Component;
@@ -9,6 +11,9 @@ import org.springframework.stereotype.Component; @@ -9,6 +11,9 @@ import org.springframework.stereotype.Component;
9 @Component 11 @Component
10 public class ChizhouZoneScheduler extends AbstractZoneScheduler { 12 public class ChizhouZoneScheduler extends AbstractZoneScheduler {
11 13
  14 + @Resource
  15 + private HnDeviceReportService hnDeviceReportService;
  16 +
12 @Override 17 @Override
13 protected String getZoneName() { 18 protected String getZoneName() {
14 return "Chizhou (池州经开区)"; 19 return "Chizhou (池州经开区)";
@@ -3,6 +3,15 @@ spring: @@ -3,6 +3,15 @@ spring:
3 name: iot-scheduler 3 name: iot-scheduler
4 main: 4 main:
5 banner-mode: off 5 banner-mode: off
  6 + data:
  7 + redis:
  8 + cluster:
  9 + nodes: # 集群节点列表
  10 + - 10.9.1.252:16380
  11 + - 10.9.1.252:16381
  12 + - 10.9.1.252:16382
  13 + max-redirects: 3 # 最大重定向次数
  14 + password: "Qixiao@redis20240410.com" # 如果有密码
6 15
7 server: 16 server:
8 port: 8080 17 port: 8080
@@ -24,3 +33,44 @@ scheduler: @@ -24,3 +33,44 @@ scheduler:
24 panji: 33 panji:
25 pull: "0 0/5 * * * ?" 34 pull: "0 0/5 * * * ?"
26 push: "0 0/10 * * * ?" 35 push: "0 0/10 * * * ?"
  36 +
  37 +hn:
  38 + third:
  39 + domain: "http://220.180.204.38:1803/report-property"
  40 + jdbcUrl: "jdbc:postgresql://106.15.73.210:5433/thingskit"
  41 + jdbcUserName: "postgres"
  42 + jdbcPassword: "postgres"
  43 + selectSql: "SELECT
  44 + de.sn,
  45 + tkl1.str_v,
  46 + tkl2.dbl_v
  47 + FROM
  48 + device de
  49 + LEFT JOIN ts_kv_latest tkl1 ON de.id = tkl1.entity_id
  50 + AND tkl1.key = 61
  51 + LEFT JOIN ts_kv_latest tkl2 ON de.id = tkl2.entity_id
  52 + AND tkl2.key = 175
  53 + WHERE
  54 + de.organization_id = 'f82530a0-93e4-4aeb-9339-f5b6d1127840';"
  55 +
  56 +sh:
  57 + iot:
  58 + organizeId: "f82530a0-93e4-4aeb-9339-f5b6d1127840"
  59 + profileId: "f2292f60-f738-11f0-9cb8-e3376d1e7978"
  60 + deviceProfileId: "f2292f60-f738-11f0-9cb8-e3376d1e7978"
  61 + userName: "shzh"
  62 + password: "123456"
  63 + tokenUrl: "https://iot.hzzlyun.com/api/auth/login"
  64 + infoUrl: "https://iot.hzzlyun.com/api/yt/device"
  65 + detailUrl: "https://iot.hzzlyun.com/api/v1/"
  66 +
  67 +device:
  68 + token:
  69 + url: "https://iotgc.cniot.vip/auth/token"
  70 + userName: "chuzhoult"
  71 + password: "111111"
  72 + info:
  73 + url: "https://iotgc.cniot.vip/triColorLamp/userGroupDtuSns"
  74 + detail:
  75 + url: "https://iotgc.cniot.vip/triColorLamp/dtuSnState"
  76 +