Commit 75e0131c1ffdfe207ed1b78ca976557262434fd9

Authored by 杨鸣坤
1 parent e77b9da2

潘集经开区园区设备上传

  1 +package com.iot.scheduler.service;
  2 +
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.alibaba.fastjson.JSONObject;
  5 +import com.zaxxer.hikari.HikariConfig;
  6 +import com.zaxxer.hikari.HikariDataSource;
  7 +import jakarta.annotation.Resource;
  8 +import lombok.extern.slf4j.Slf4j;
  9 +import org.apache.commons.collections4.CollectionUtils;
  10 +import org.apache.commons.lang3.StringUtils;
  11 +import org.apache.http.HttpEntity;
  12 +import org.apache.http.NameValuePair;
  13 +import org.apache.http.client.entity.UrlEncodedFormEntity;
  14 +import org.apache.http.client.methods.CloseableHttpResponse;
  15 +import org.apache.http.client.methods.HttpPost;
  16 +import org.apache.http.entity.StringEntity;
  17 +import org.apache.http.impl.client.CloseableHttpClient;
  18 +import org.apache.http.impl.client.HttpClients;
  19 +import org.apache.http.message.BasicNameValuePair;
  20 +import org.apache.http.util.EntityUtils;
  21 +import org.springframework.beans.factory.annotation.Value;
  22 +import org.springframework.data.redis.core.RedisTemplate;
  23 +import org.springframework.stereotype.Service;
  24 +
  25 +import java.nio.charset.StandardCharsets;
  26 +import java.sql.*;
  27 +import java.sql.Date;
  28 +import java.text.SimpleDateFormat;
  29 +import java.util.*;
  30 +import java.util.concurrent.TimeUnit;
  31 +
  32 +/**
  33 + * 潘集设备上报
  34 + */
  35 +@Slf4j
  36 +@Service
  37 +public class PjDeviceReportService {
  38 +
  39 +
  40 + @Value("${pj.third.reportUrl:http://111.39.171.168:12280/mainApi/formEngine/formData/batchAddOrUpate}")
  41 + private String reportUrl;
  42 +
  43 + @Value("${pj.third.tokenUrl:http://111.39.171.168:15555/auth/oauth/token}")
  44 + private String tokenUrl;
  45 +
  46 + @Value("${pj.third.username:adminhnjt}")
  47 + private String username;
  48 +
  49 + @Value("${pj.third.password:admin@1234}")
  50 + private String password;
  51 +
  52 + @Value("${pj.third.formId:t6937bc62e4332f00072a0849}")
  53 + private String formId;
  54 +
  55 + @Value("${pj.third.uniqueKeys:entName,deviceCode}")
  56 + private String uniqueKeys;
  57 +
  58 + @Value("${pj.third.jdbcUrl:jdbc:postgresql://106.15.73.210:5433/thingskit}")
  59 + private String jdbcUrl;
  60 +
  61 + @Value("${pj.third.jdbcUserName:postgres}")
  62 + private String jdbcUserName;
  63 +
  64 + @Value("${pj.third.jdbcPassword:postgres}")
  65 + private String jdbcPassword;
  66 +
  67 + @Value("${pj.third.selectSql:SELECT * FROM ts_kv_dictionary;}")
  68 + private String selectSql;
  69 +
  70 + @Resource
  71 + private RedisTemplate<String, String> redisTemplate;
  72 +
  73 + /**
  74 + * 企业设备上报
  75 + */
  76 + public void batchReportEnterprise() {
  77 + log.info("开始执行企业设备上报任务");
  78 + long startTime = System.currentTimeMillis();
  79 +
  80 + try {
  81 + // 1. 查询需要同步的数据
  82 + log.info("步骤1: 开始查询需要同步的设备数据");
  83 + List<Object> needSyncDataList = initConnectAndSelectData();
  84 +
  85 + if (CollectionUtils.isEmpty(needSyncDataList)) {
  86 + log.info("未查询到需要同步的设备数据,任务结束");
  87 + return;
  88 + }
  89 +
  90 + log.info("共查询到 {} 条设备数据需要同步", needSyncDataList.size());
  91 +
  92 + // 2. 准备上报数据
  93 + log.info("步骤2: 开始准备上报数据");
  94 + SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  95 + String nowTime = simpleDateFormat.format(new java.util.Date());
  96 + log.info("上报时间: {}", nowTime);
  97 +
  98 + CloseableHttpClient httpclient = HttpClients.createDefault();
  99 + HttpPost httpPost = new HttpPost(reportUrl);
  100 +
  101 + // 3. 获取并设置Token
  102 + log.info("步骤3: 获取访问令牌");
  103 + String token = getToken();
  104 + if (StringUtils.isBlank(token)) {
  105 + log.error("获取访问令牌失败,无法进行数据上报");
  106 + return;
  107 + }
  108 +
  109 + httpPost.setHeader("Authorization", "Bearer " + token);
  110 + httpPost.setHeader("Content-Type", "application/json; charset=utf-8");
  111 + log.info("请求URL: {}, Token已设置", reportUrl);
  112 +
  113 + // 4. 构建上报数据
  114 + log.info("步骤4: 构建上报数据结构");
  115 + List<Map<String, Object>> dataList = new ArrayList<>(needSyncDataList.size());
  116 + int successCount = 0;
  117 + int errorCount = 0;
  118 +
  119 + for (int i = 0; i < needSyncDataList.size(); i++) {
  120 + try {
  121 + Object needSyncData = needSyncDataList.get(i);
  122 + List<Object> dataObject = (ArrayList) needSyncData;
  123 +
  124 + if (dataObject.size() < 4) {
  125 + log.warn("第 {} 条数据格式不正确,期望至少4个字段,实际: {}", i + 1, dataObject.size());
  126 + errorCount++;
  127 + continue;
  128 + }
  129 +
  130 + Map<String, Object> data = new HashMap<>();
  131 + data.put("entName", dataObject.get(0));
  132 + data.put("deviceName", dataObject.get(1));
  133 + data.put("deviceCode", dataObject.get(2));
  134 + data.put("reportTime", nowTime);
  135 +
  136 + // 处理状态字段
  137 + Object statusObj = dataObject.get(3);
  138 + String statusStr = "在线";
  139 + try {
  140 + if (statusObj != null) {
  141 + long status = Long.parseLong(statusObj.toString());
  142 + if (status == 0) {
  143 + statusStr = "离线";
  144 + }
  145 + }
  146 + } catch (NumberFormatException e) {
  147 + log.warn("第 {} 条数据状态字段格式错误: {}", i + 1, statusObj);
  148 + statusStr = "未知";
  149 + }
  150 +
  151 + data.put("status", statusStr);
  152 + dataList.add(data);
  153 + successCount++;
  154 +
  155 + // 每100条记录输出一次进度
  156 + if (successCount % 100 == 0) {
  157 + log.info("已成功构建 {} 条上报数据", successCount);
  158 + }
  159 +
  160 + } catch (Exception e) {
  161 + log.error("处理第 {} 条数据时发生异常", i + 1, e);
  162 + errorCount++;
  163 + }
  164 + }
  165 +
  166 + log.info("数据构建完成,成功: {} 条,失败: {} 条", successCount, errorCount);
  167 +
  168 + if (CollectionUtils.isEmpty(dataList)) {
  169 + log.error("没有成功构建任何上报数据,任务结束");
  170 + return;
  171 + }
  172 +
  173 + // 5. 构建请求体
  174 + log.info("步骤5: 构建请求体");
  175 + Map<String, Object> sendData = new HashMap<>(3);
  176 + sendData.put("formId", formId);
  177 + List<String> uniqueKeyList = Arrays.asList(uniqueKeys.split(","));
  178 + sendData.put("uniqueKeys", uniqueKeyList);
  179 + sendData.put("datas", dataList);
  180 +
  181 + String json = JSON.toJSONString(sendData);
  182 + log.debug("请求体JSON数据: {}", json);
  183 + log.info("请求体大小: {} 字符", json.length());
  184 +
  185 + httpPost.setEntity(new StringEntity(json, StandardCharsets.UTF_8));
  186 +
  187 + // 6. 发送请求
  188 + log.info("步骤6: 发送HTTP请求到第三方平台");
  189 + try (CloseableHttpResponse response = httpclient.execute(httpPost)) {
  190 + int statusCode = response.getStatusLine().getStatusCode();
  191 + String result = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
  192 +
  193 + log.info("HTTP响应状态码: {}", statusCode);
  194 + log.info("HTTP响应内容: {}", result);
  195 +
  196 + if (statusCode == 200) {
  197 + JSONObject jsonResult = JSON.parseObject(result);
  198 + if (jsonResult != null && "true".equalsIgnoreCase(jsonResult.getString("success"))) {
  199 + log.info("数据上报成功!共上报 {} 条设备数据", dataList.size());
  200 + } else {
  201 + log.error("数据上报失败!响应状态异常: {}", result);
  202 + }
  203 + } else {
  204 + log.error("HTTP请求失败,状态码: {}", statusCode);
  205 + }
  206 +
  207 + } catch (Exception e) {
  208 + log.error("发送HTTP请求时发生异常", e);
  209 + }
  210 +
  211 + } finally {
  212 + long endTime = System.currentTimeMillis();
  213 + log.info("企业设备上报任务执行完成,总耗时: {} 毫秒", (endTime - startTime));
  214 + }
  215 + }
  216 +
  217 + /**
  218 + * 获取token
  219 + *
  220 + * @return
  221 + */
  222 + private String getToken() {
  223 + log.info("开始获取访问令牌");
  224 + long startTime = System.currentTimeMillis();
  225 + String key = "pjjkq_report_token";
  226 +
  227 + try {
  228 + // 1. 尝试从Redis获取缓存token
  229 + log.info("检查Redis中是否有缓存的Token,key: {}", key);
  230 + String token = redisTemplate.opsForValue().get(key);
  231 + Long expire = redisTemplate.getExpire(key);
  232 +
  233 + if (StringUtils.isNotBlank(token) && expire != null && expire > 0) {
  234 + log.info("从Redis缓存中获取Token成功,剩余有效期: {} 秒", expire);
  235 + return token;
  236 + }
  237 +
  238 + log.info("Redis中没有有效的Token缓存,开始请求新的Token");
  239 +
  240 + // 2. 请求新的token
  241 + CloseableHttpClient httpclient = HttpClients.createDefault();
  242 + HttpPost httpPost = new HttpPost(tokenUrl);
  243 + httpPost.setHeader("Authorization", "Basic bWljcm8tcG9ydGFsOlBAc3MxMjM0");
  244 + httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
  245 +
  246 + List<NameValuePair> params = new ArrayList<>();
  247 + params.add(new BasicNameValuePair("grant_type", "password"));
  248 + params.add(new BasicNameValuePair("username", username));
  249 + params.add(new BasicNameValuePair("password", password)); // 密码脱敏
  250 +
  251 + log.info("Token请求URL: {}", tokenUrl);
  252 + log.info("请求参数 - grant_type: password, username: {}", username);
  253 +
  254 + httpPost.setEntity(new UrlEncodedFormEntity(params, StandardCharsets.UTF_8));
  255 +
  256 + try (CloseableHttpResponse response = httpclient.execute(httpPost)) {
  257 + int statusCode = response.getStatusLine().getStatusCode();
  258 + log.info("Token接口响应状态码: {}", statusCode);
  259 +
  260 + HttpEntity responseEntity = response.getEntity();
  261 + String result = EntityUtils.toString(responseEntity);
  262 + log.debug("Token接口原始响应: {}", result);
  263 +
  264 + EntityUtils.consume(responseEntity);
  265 +
  266 + if (statusCode == 200) {
  267 + JSONObject json = JSON.parseObject(result);
  268 + token = json.getString("access_token");
  269 + int expiresIn = json.getIntValue("expires_in");
  270 +
  271 + log.info("成功获取Token,有效期: {} 秒", expiresIn);
  272 +
  273 + // 3. 缓存token到Redis
  274 + redisTemplate.opsForValue().set(key, token);
  275 + redisTemplate.expire(key, expiresIn, TimeUnit.SECONDS);
  276 +
  277 + log.info("Token已缓存到Redis,key: {},有效期: {} 秒", key, expiresIn);
  278 +
  279 + return token;
  280 + } else {
  281 + log.error("获取Token失败,状态码: {}, 响应: {}", statusCode, result);
  282 + return null;
  283 + }
  284 +
  285 + } catch (Exception e) {
  286 + log.error("调用Token接口时发生异常", e);
  287 + return null;
  288 + }
  289 +
  290 + } finally {
  291 + long endTime = System.currentTimeMillis();
  292 + log.info("获取Token操作完成,耗时: {} 毫秒", (endTime - startTime));
  293 + }
  294 + }
  295 +
  296 + private List<Object> initConnectAndSelectData() {
  297 + Connection connection = null;
  298 + PreparedStatement statement = null;
  299 + ResultSet resultSet = null;
  300 + HikariDataSource dataSource = null;
  301 + List<Object> resultList = new ArrayList<>();
  302 +
  303 + log.info("开始连接数据库,URL: {}", jdbcUrl);
  304 +
  305 + try {
  306 + HikariConfig config = new HikariConfig();
  307 + config.setJdbcUrl(jdbcUrl);
  308 + config.setUsername(jdbcUserName);
  309 + config.setPassword(jdbcPassword);
  310 + config.setDriverClassName("org.postgresql.Driver");
  311 + config.setMaximumPoolSize(5);
  312 + config.setMinimumIdle(5);
  313 + config.setConnectionTimeout(60000);
  314 + config.setConnectionTestQuery("SELECT 1");
  315 +
  316 + dataSource = new HikariDataSource(config);
  317 + log.info("Hikari连接池配置完成");
  318 +
  319 + connection = dataSource.getConnection();
  320 + log.info("数据库连接成功");
  321 +
  322 + statement = connection.prepareStatement(selectSql);
  323 + log.info("执行SQL查询: {}", selectSql);
  324 +
  325 + resultSet = statement.executeQuery();
  326 + ResultSetMetaData metaData = resultSet.getMetaData();
  327 + int columnCount = metaData.getColumnCount();
  328 + log.info("查询结果集元数据获取成功,共{}列", columnCount);
  329 +
  330 + int rowCount = 0;
  331 + while (resultSet.next()) {
  332 + List<Object> result = new ArrayList<>(columnCount);
  333 + for (int index = 1; index <= columnCount; index++) {
  334 + int columnType = metaData.getColumnType(index);
  335 + Object value = getTypedValue(resultSet, index, columnType);
  336 + result.add(value);
  337 + }
  338 + resultList.add(result);
  339 + rowCount++;
  340 +
  341 + // 每处理1000行记录一次日志
  342 + if (rowCount % 1000 == 0) {
  343 + log.info("已处理{}行数据", rowCount);
  344 + }
  345 + }
  346 +
  347 + log.info("数据查询完成,共获取{}行数据", rowCount);
  348 +
  349 + } catch (SQLException e) {
  350 + log.error("数据库操作异常,URL: {}, 用户名: {}", jdbcUrl, jdbcUserName, e);
  351 + } catch (Exception e) {
  352 + log.error("初始化数据库连接或查询数据时发生异常", e);
  353 + } finally {
  354 + // 释放资源
  355 + try {
  356 + if (resultSet != null) resultSet.close();
  357 + if (statement != null) statement.close();
  358 + if (connection != null) connection.close();
  359 + log.info("数据库连接资源已释放");
  360 + } catch (SQLException e) {
  361 + log.error("关闭数据库资源时发生异常", e);
  362 + }
  363 +
  364 + if (dataSource != null) {
  365 + try {
  366 + dataSource.close();
  367 + log.info("HikariDataSource连接池已关闭");
  368 + } catch (Exception e) {
  369 + log.error("关闭HikariDataSource连接池时发生异常", e);
  370 + }
  371 + }
  372 + }
  373 +
  374 + log.info("数据库操作完成,返回{}条记录", resultList.size());
  375 + return resultList;
  376 + }
  377 +
  378 + private Object getTypedValue(ResultSet rs, int index, int sqlType) throws SQLException {
  379 + Object value;
  380 +
  381 + try {
  382 + switch (sqlType) {
  383 + case Types.BIT:
  384 + case Types.BOOLEAN:
  385 + value = rs.getBoolean(index);
  386 + return rs.wasNull() ? null : value;
  387 +
  388 + case Types.TINYINT:
  389 + case Types.SMALLINT:
  390 + case Types.INTEGER:
  391 + value = rs.getInt(index);
  392 + return rs.wasNull() ? null : value;
  393 +
  394 + case Types.BIGINT:
  395 + value = rs.getLong(index);
  396 + return rs.wasNull() ? null : value;
  397 +
  398 + case Types.FLOAT:
  399 + case Types.REAL:
  400 + value = rs.getFloat(index);
  401 + return rs.wasNull() ? null : value;
  402 +
  403 + case Types.DOUBLE:
  404 + value = rs.getDouble(index);
  405 + return rs.wasNull() ? null : value;
  406 +
  407 + case Types.NUMERIC:
  408 + case Types.DECIMAL:
  409 + value = rs.getBigDecimal(index);
  410 + return rs.wasNull() ? null : value;
  411 +
  412 + case Types.CHAR:
  413 + case Types.VARCHAR:
  414 + case Types.LONGVARCHAR:
  415 + case Types.NCHAR:
  416 + case Types.NVARCHAR:
  417 + case Types.LONGNVARCHAR:
  418 + value = rs.getString(index);
  419 + return rs.wasNull() ? null : value;
  420 +
  421 + case Types.DATE:
  422 + Date date = rs.getDate(index);
  423 + return date != null ? date.toLocalDate() : null;
  424 +
  425 + case Types.TIME:
  426 + Time time = rs.getTime(index);
  427 + return time != null ? time.toLocalTime() : null;
  428 +
  429 + case Types.TIMESTAMP:
  430 + Timestamp timestamp = rs.getTimestamp(index);
  431 + return timestamp != null ? timestamp.toLocalDateTime() : null;
  432 +
  433 + case Types.BINARY:
  434 + case Types.VARBINARY:
  435 + case Types.LONGVARBINARY:
  436 + value = rs.getBytes(index);
  437 + return rs.wasNull() ? null : value;
  438 +
  439 + case Types.BLOB:
  440 + value = rs.getBlob(index);
  441 + return rs.wasNull() ? null : value;
  442 +
  443 + case Types.CLOB:
  444 + value = rs.getClob(index);
  445 + return rs.wasNull() ? null : value;
  446 +
  447 + default:
  448 + value = rs.getObject(index);
  449 + return rs.wasNull() ? null : value;
  450 + }
  451 + } catch (SQLException e) {
  452 + log.error("获取结果集第{}列数据时发生异常,数据类型: {}", index, sqlType, e);
  453 + throw e;
  454 + }
  455 + }
  456 +}
@@ -52,7 +52,7 @@ public class ChizhouZoneScheduler extends AbstractZoneScheduler { @@ -52,7 +52,7 @@ public class ChizhouZoneScheduler extends AbstractZoneScheduler {
52 logStart(taskName); 52 logStart(taskName);
53 try { 53 try {
54 log.info("[{}] Simulating pushing devices...", getZoneName()); 54 log.info("[{}] Simulating pushing devices...", getZoneName());
55 - hnDeviceReportService.deviceReport(); 55 +// hnDeviceReportService.deviceReport();
56 Thread.sleep(1000); 56 Thread.sleep(1000);
57 } catch (Exception e) { 57 } catch (Exception e) {
58 logError(taskName, e); 58 logError(taskName, e);
  1 +package com.iot.scheduler.zone;
  2 +
  3 +import com.iot.scheduler.service.PjDeviceReportService;
  4 +import com.iot.scheduler.task.AbstractZoneScheduler;
  5 +import jakarta.annotation.Resource;
  6 +import lombok.extern.slf4j.Slf4j;
  7 +import org.springframework.scheduling.annotation.Scheduled;
  8 +import org.springframework.stereotype.Component;
  9 +
  10 +@Slf4j
  11 +@Component
  12 +public class PanJiZoneScheduler extends AbstractZoneScheduler {
  13 + @Resource
  14 + private PjDeviceReportService pjDeviceReportService;
  15 +
  16 + @Override
  17 + protected String getZoneName() {
  18 + return "PanJi (潘集经开区)";
  19 + }
  20 +
  21 + @Scheduled(cron = "${scheduler.pj.push:0 0/1 * * * ?}")
  22 + public void pushHsqDevicesToThirdParty() {
  23 + String taskName = "PJ Devices (IoT -> 3rd Party)";
  24 + logStart(taskName);
  25 + try {
  26 + log.info("[{}] PJ pushing devices...", getZoneName());
  27 + pjDeviceReportService.batchReportEnterprise();
  28 + Thread.sleep(1000);
  29 + } catch (Exception e) {
  30 + logError(taskName, e);
  31 + } finally {
  32 + logEnd(taskName);
  33 + }
  34 + }
  35 +}
@@ -35,6 +35,8 @@ scheduler: @@ -35,6 +35,8 @@ scheduler:
35 push: "0 0/10 * * * ?" 35 push: "0 0/10 * * * ?"
36 hsq: 36 hsq:
37 push: "0 0 0/1 * * ?" 37 push: "0 0 0/1 * * ?"
  38 + pj:
  39 + push: "0 0/1 * * * ?"
38 40
39 hn: 41 hn:
40 third: 42 third:
@@ -72,6 +74,28 @@ hnhsq: @@ -72,6 +74,28 @@ hnhsq:
72 from ts_kv kv 74 from ts_kv kv
73 INNER JOIN device d on d.id = kv.entity_id and kv.key=61 75 INNER JOIN device d on d.id = kv.entity_id and kv.key=61
74 where tenant_id='5b6a05a0-f020-11f0-9cb8-e3376d1e7978'" 76 where tenant_id='5b6a05a0-f020-11f0-9cb8-e3376d1e7978'"
  77 +pj:
  78 + third:
  79 + reportUrl: "http://111.39.171.168:12280/mainApi/formEngine/formData/batchAddOrUpate"
  80 + tokenUrl: "http://111.39.171.168:15555/auth/oauth/token"
  81 + username: "adminhnjt"
  82 + password: "admin@1234"
  83 + formId: "t6937bc62e4332f00072a0849"
  84 + uniqueKeys: "entName,deviceCode"
  85 + jdbcUrl: "jdbc:postgresql://106.15.73.210:5433/thingskit"
  86 + jdbcUserName: "postgres"
  87 + jdbcPassword: "postgres"
  88 + selectSql: "SELECT
  89 + tko.name AS entName,
  90 + de.name AS deviceName,
  91 + de.sn AS deviceCode,
  92 + tkl.long_v AS status
  93 + FROM
  94 + device de
  95 + LEFT JOIN tk_organization tko ON de.organization_id = tko.id
  96 + LEFT JOIN ts_kv_latest tkl ON de.id = tkl.entity_id AND tkl.key = 61
  97 + WHERE de.tenant_id = '0414df80-f01d-11f0-9cb8-e3376d1e7978'
  98 + AND de.device_profile_id = '0418b010-f01d-11f0-9cb8-e3376d1e7978';"
75 99
76 sh: 100 sh:
77 iot: 101 iot: