Commit ee287de1cab794ac2f91429e1dd706dc52c9d47b

Authored by 杨鸣坤
1 parent d17ae9c0

池州经开区数据上报,凯盛

  1 +package com.iot.scheduler.service.chizhou;
  2 +
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.iot.scheduler.utils.MqttUtils;
  5 +import com.iot.scheduler.utils.SqlTypedValueUtils;
  6 +import com.zaxxer.hikari.HikariConfig;
  7 +import com.zaxxer.hikari.HikariDataSource;
  8 +import lombok.extern.slf4j.Slf4j;
  9 +import org.springframework.stereotype.Service;
  10 +
  11 +import java.sql.*;
  12 +import java.text.MessageFormat;
  13 +import java.text.SimpleDateFormat;
  14 +import java.util.*;
  15 +import java.util.Date;
  16 +
  17 +/**
  18 + * 池州经开区数据上传
  19 + */
  20 +@Slf4j
  21 +@Service
  22 +public class CzDeviceReportService {
  23 +
  24 + String broker = "112.30.143.137:5007";
  25 + String username = "admin";
  26 + String password = "ly@666yc";
  27 +
  28 + String jdbcUrl = "postgresql://192.168.0.249:5432/iot";
  29 + String jdbcUserName = "postgres";
  30 + String jdbcPassword = "1qaz@WSX";
  31 + String selectSql = " SELECT\n" +
  32 + " de.name AS 设备名称,\n" +
  33 + " CASE\n" +
  34 + " WHEN ak2.long_v IS NULL OR (ak.bool_v = FALSE AND ak2.long_v IS NOT NULL) THEN\n" +
  35 + " 'OFF'\n" +
  36 + " WHEN tkl.long_v = 1 THEN\n" +
  37 + " 'ERROR'\n" +
  38 + " WHEN tkl2.long_v = 1 THEN\n" +
  39 + " 'STAND'\n" +
  40 + " WHEN tkl3.long_v = 1 THEN\n" +
  41 + " 'RUN'\n" +
  42 + " ELSE\n" +
  43 + " 'OFF'\n" +
  44 + " END AS 设备状态,\n" +
  45 + " dc.credentials_id AS sn,\n" +
  46 + " de.organization_id\n" +
  47 + " FROM\n" +
  48 + " device de\n" +
  49 + " LEFT JOIN ts_kv_latest tkl ON de.id = tkl.entity_id\n" +
  50 + " AND tkl.KEY = '60'\n" +
  51 + " LEFT JOIN ts_kv_latest tkl2 ON de.id = tkl2.entity_id\n" +
  52 + " AND tkl2.KEY = '59'\n" +
  53 + " LEFT JOIN ts_kv_latest tkl3 ON de.id = tkl3.entity_id\n" +
  54 + " AND tkl3.KEY = '57'\n" +
  55 + " LEFT JOIN attribute_kv ak ON de.id = ak.entity_id\n" +
  56 + " AND ak.attribute_type = 2\n" +
  57 + " AND ak.attribute_key = 41\n" +
  58 + " LEFT JOIN attribute_kv ak2 ON de.id = ak2.entity_id\n" +
  59 + " AND ak2.attribute_type = 2\n" +
  60 + " AND ak2.attribute_key = 43\n" +
  61 + " LEFT JOIN device_credentials dc ON dc.device_id = de.id\n" +
  62 + " WHERE\n" +
  63 + " de.device_profile_id = '5f39ebe0-f6cf-11f0-bebc-e5e3c6471a90' UNION ALL\n" +
  64 + " SELECT\n" +
  65 + " de.name AS 设备名称,\n" +
  66 + " CASE\n" +
  67 + " WHEN ak2.long_v IS NULL\n" +
  68 + " OR (ak.bool_v = FALSE AND ak2.long_v IS NOT NULL) THEN\n" +
  69 + " 'OFF'\n" +
  70 + " ELSE\n" +
  71 + " 'RUN'\n" +
  72 + " END AS 设备状态,\n" +
  73 + " dc.credentials_id AS sn,\n" +
  74 + " de.organization_id\n" +
  75 + " FROM\n" +
  76 + " device de\n" +
  77 + " LEFT JOIN attribute_kv ak ON de.id = ak.entity_id\n" +
  78 + " AND ak.attribute_type = 2\n" +
  79 + " AND ak.attribute_key = 41\n" +
  80 + " LEFT JOIN attribute_kv ak2 ON de.id = ak2.entity_id\n" +
  81 + " AND ak2.attribute_type = 2\n" +
  82 + " AND ak2.attribute_key = 43\n" +
  83 + " LEFT JOIN device_credentials dc ON dc.device_id = de.id\n" +
  84 + " WHERE\n" +
  85 + " de.device_profile_id = 'c2401630-ffec-11f0-926f-2f3182abc65f' UNION ALL\n" +
  86 + " SELECT\n" +
  87 + " de.name AS 设备名称,\n" +
  88 + " CASE\n" +
  89 + " WHEN ak2.long_v IS NULL\n" +
  90 + " OR (ak.bool_v = FALSE AND ak2.long_v IS NOT NULL) THEN\n" +
  91 + " 'OFF'\n" +
  92 + " ELSE\n" +
  93 + " 'RUN'\n" +
  94 + " END AS 设备状态,\n" +
  95 + " dc.credentials_id AS sn,\n" +
  96 + " de.organization_id\n" +
  97 + " FROM\n" +
  98 + " device de\n" +
  99 + " LEFT JOIN attribute_kv ak ON de.id = ak.entity_id\n" +
  100 + " AND ak.attribute_type = 2\n" +
  101 + " AND ak.attribute_key = 41\n" +
  102 + " LEFT JOIN attribute_kv ak2 ON de.id = ak2.entity_id\n" +
  103 + " AND ak2.attribute_type = 2\n" +
  104 + " AND ak2.attribute_key = 43\n" +
  105 + " LEFT JOIN device_credentials dc ON dc.device_id = de.id\n" +
  106 + " WHERE\n" +
  107 + " de.device_profile_id = '4e404b10-ffe7-11f0-926f-2f3182abc65f'";
  108 +
  109 + public void deviceReport() {
  110 +// List<String> deviceIdList = Arrays.asList("TCKJ-001", "TCKJ-002", "TCKJ-003", "TCKJ-004", "TCKJ-005", "TCKJ-006",
  111 +// "TCKJ-007", "TCKJ-008", "TCKJ-009", "TCKJ-010", "TCKJ-011", "TCKJ-012", "TCKJ-013", "TCKJ-014", "TCKJ-015",
  112 +// "TCKJ-016", "TCKJ-017", "TCKJ-018", "TCKJ-019", "TCKJ-020", "TCKJ-021", "TCKJ-022", "TCKJ-023", "TCKJ-024",
  113 +// "TCKJ-025", "TCKJ-026", "TCKJ-027", "TCKJ-028", "TCKJ-029", "TCKJ-030", "TCKJ-031", "TCKJ-032", "TCKJ-033",
  114 +// "TCKJ-034", "TCKJ-035", "TCKJ-036", "TCKJ-037", "TCKJ-038", "TCKJ-039", "TCKJ-040", "TCKJ-041",
  115 +// "vJAzZWk1Q2I8cdq7kx3z", "LmiW3ljI4KP487CeMmol", "ceK7YSNLrTjVSSPaFmn3", "6KePBYyCiEgo10iJXIrF",
  116 +// "9vrxIQhbaZ542DvCVne0", "XMLN-001", "XMLN-002", "XMLN-003", "XMLN-004", "XMLN-005", "XMLN-006", "XMLN-007",
  117 +// "XMLN-008", "XMLN-009", "XMLN-010", "XMLN-011", "XMLN-012", "XMLN-013", "XMLN-014", "XMLN-015", "XMLN-016",
  118 +// "XMLN-017", "XMLN-018", "XMLN-019", "XMLN-020", "XMLN-021", "XMLN-022", "XMLN-023", "XMLN-024", "XMLN-025",
  119 +// "XMLN-026", "XMLN-027", "XMLN-028", "XMLN-029", "XMLN-030", "XMLN-031", "XMLN-032", "XMLN-033", "XMLN-034",
  120 +// "XMLN-035", "XMLN-036", "XMLN-037", "XMLN-038", "XMLN-039", "XMLN-040", "XMLN-041", "XMLN-042", "XMLN-043",
  121 +// "XMLN-044", "XMLN-045", "XMLN-046", "XMLN-047", "XMLN-048", "XMLN-049", "XMLN-050", "XMLN-051", "XMLN-052",
  122 +// "GYDPF-014", "GYDPF-006", "KBS-1-1", "KBS-1-2", "KBS-3-1", "KBS-3-5", "KBS-3-3", "CS-001", "uzGf4mjzkOPwGO4aCOf3",
  123 +// "GYDPF-001", "GYDPF-010", "KBS-1-5", "KBS-2-2", "KBS-3-7", "CS-003", "LBKXrYxLQN9W7qmmqzp5", "rba7VggnqGdNyPvVNs1n",
  124 +// "GYDPF-002", "GYDPF-016", "GYDPF-017", "KBS-1-6", "EqFMlJarb2aDwpwvUi6J", "x2173A2HKOiPqeRsKtWY", "GYDPF-005",
  125 +// "KBS-1-3", "KBS-2-1", "KBS-3-4", "1fzR6X57k2aafbbkH4po", "lbqMr5pmkNJLWEPSa0yp", "mYz1kUajPOROg0Ewcfhr",
  126 +// "6TLlfKBbTMaUZGgHtvuH", "XTE8PKwLSYp29gE1umfW", "MqYu4y0OycHQkzZhothe", "IFeN0lAXlAVQ1fD7gwk1", "hZb9NsRCqtVBSagFqnPS",
  127 +// "r22r53PNgY7TeqCK5y14", "KCGSwfTvGCtBfd0qsH63", "4KzFQCncmwFh7UT4vbHp", "GYDPF-007", "KBS-3-2", "t2kkUTZYqxmKLN4X4VPB",
  128 +// "9vWKALGoat3XalEtQaRB", "vYtwLMkppPbM3CtB3U9U", "4vwTfMqi8oEtWxdEUrE8", "vVvK2cTLvjYtZX1dksyd", "GYDPF-003",
  129 +// "KBS-3-8", "2E6eUu66qxl5Llr6ib5v", "iWbeBfwg36lIpBRMDnvO", "GYDPF-015", "KBS-2-4", "OSbA7X7FGETF0TXub4OS",
  130 +// "GYDPF-008", "KBS-2-3", "KBS-2-7", "pRhGGECbXu6FBshyg2yg", "zrVVNRo8QIbjeYjdoCUq", "fjPj3oHW7cL5m0AE1t3y",
  131 +// "8dJsgqjmxIIduP4Sr2rL", "GYDPF-012", "KBS-1-4", "JZ8Tbzye8rKotYTbHUJw", "dwToSYEWr04sPtBUxaOH", "GYDPF-009",
  132 +// "oe3OmagW5Wi9yw9LfUr3", "2mDiH94D7uSh4CtD2bi0", "ceGyDxlK7xzMoUacIwCv", "gxECiKjMEsVkYyowKIMd", "irAqF3fmzaM5GIo8ZLAx",
  133 +// "GYDPF-013", "KBS-2-5", "KBS-3-6", "dxPdbg2UqqYADPMy9TVe", "cjFUpeVPTOh4Uf6wnVH1", "VR5XgpqhFWGMcwdy8yPQ",
  134 +// "GYDPF-004", "KBS-1-7", "KBS-2-8", "HEXmiSz0sXsahRHV2bQm", "0lIIGWP49Xuo91TnTvVx", "C99WsNGFSGQSV4n3YmrW",
  135 +// "3paJSiHK32wvVM6f1tO8", "GYDPF-011", "KBS-1-8", "KBS-2-6", "yrk5Cxa6EJncWX2kTywa", "xOxHHscmrEBhLTlmyRrF",
  136 +// "e3HzLoIl0p3iEM860EdK");
  137 +
  138 + List<Object> needSyncDataList = initConnectAndSelectData();
  139 + log.info("总设备数量: {}台", needSyncDataList.size());
  140 +
  141 + Map<String, String> organizeIdAndClientIdMap = new HashMap<>(3);
  142 + organizeIdAndClientIdMap.put("63934b6f-1e02-4d29-ac14-1a64649e2231", "2020672119054331904"); // 安徽鑫米兰电子科技有限公司
  143 + organizeIdAndClientIdMap.put("35bcdb94-31ec-4750-9ee9-cc855aa66e17", "2020672015207559169"); // 安徽同池科技有限公司
  144 + organizeIdAndClientIdMap.put("1697500a-dc11-45cc-88f5-2ad47472a9bb", "2020672228886376448"); // 凯盛信息显示材料(池州)有限公司
  145 +
  146 + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  147 + String formattedDate = sdf.format(new Date());
  148 + log.info("上报时间统一为: {}", formattedDate);
  149 +
  150 + // 用于统计上报结果
  151 + int successCount = 0;
  152 + int failCount = 0;
  153 + List<String> failedDevices = new ArrayList<>();
  154 + for (int index = 0; index < needSyncDataList.size(); index++) {
  155 + Object needSyncData = needSyncDataList.get(index);
  156 + List<Object> dataList = (ArrayList) needSyncData;
  157 + String deviceId = dataList.get(2).toString();
  158 + String status = dataList.get(1).toString();
  159 + String organizeId = dataList.get(3).toString();
  160 +
  161 + // 根据index确定clientId
  162 + String clientId = organizeIdAndClientIdMap.get(organizeId);
  163 + String topic = MessageFormat.format("/{0}/{1}/properties/report", clientId, deviceId);
  164 +
  165 + // 获取设备状态
  166 + int deviceState = getDeviceState(status);
  167 +
  168 + Map<String, Object> properties = new HashMap<>(5);
  169 + properties.put("type", 1);
  170 + properties.put("state", deviceState);
  171 + properties.put("time", formattedDate);
  172 + properties.put("output", 0); // 当日产量,还未确定,暂定为0
  173 + properties.put("energy", 0); // 当日能耗,还未确定,暂定为0
  174 +
  175 + Map<String, Object> deviceMap = new HashMap<>(2);
  176 + deviceMap.put("deviceId", deviceId);
  177 + deviceMap.put("properties", properties);
  178 +
  179 + // 记录调试信息
  180 +// if (log.isDebugEnabled()) {
  181 +// log.debug("准备上报设备信息: deviceId={}, clientId={}, state={}, topic={}",
  182 +// deviceId, clientId, deviceState, topic);
  183 +// }
  184 +
  185 + try {
  186 + log.debug("开始MQTT发布: deviceId={}", deviceId);
  187 + MqttUtils.quickPublish(broker, topic, username, password, clientId, JSON.toJSONString(deviceMap));
  188 + successCount++;
  189 +
  190 + // 每10个设备记录一次进度
  191 +// if ((index + 1) % 10 == 0) {
  192 +// log.info("已处理 {} 个设备,当前处理: {}", index + 1, deviceId);
  193 +// }
  194 +
  195 + } catch (Exception e) {
  196 + failCount++;
  197 + failedDevices.add(deviceId);
  198 + log.error("设备上报失败: deviceId={}, clientId={}, topic={}, 错误信息: {}",
  199 + deviceId, clientId, topic, e.getMessage());
  200 + log.error("详细异常信息:", e);
  201 + }
  202 +
  203 + // 可选:添加延迟,避免发送过快
  204 + try {
  205 + Thread.sleep(10); // 10毫秒延迟
  206 + } catch (InterruptedException e) {
  207 + log.warn("线程休眠被中断", e);
  208 + Thread.currentThread().interrupt();
  209 + }
  210 + }
  211 +
  212 + // 任务完成,输出统计信息
  213 + log.info("========== 设备状态上报任务完成 ==========");
  214 + log.info("成功上报: {}台", successCount);
  215 + log.info("失败上报: {}台", failCount);
  216 +
  217 + if (failCount > 0) {
  218 + log.warn("失败设备列表: {}", failedDevices);
  219 + }
  220 +
  221 + // 任务执行耗时
  222 + log.info("========== 任务执行结束 ==========");
  223 + }
  224 +
  225 + private int getDeviceState(String status) {
  226 + int deviceState;
  227 + if ("RUN".equals(status)) {
  228 + deviceState = 1;
  229 + } else if ("STAND".equals(status)) {
  230 + deviceState = 2;
  231 + } else if ("OFF".equals(status)) {
  232 + deviceState = 3;
  233 + } else {
  234 + deviceState = 4;
  235 + }
  236 +
  237 + return deviceState;
  238 + }
  239 +
  240 + private List<Object> initConnectAndSelectData() {
  241 + Connection connection = null;
  242 + PreparedStatement statement = null;
  243 + ResultSet resultSet = null;
  244 + HikariDataSource dataSource = null;
  245 + List<Object> resultList = new ArrayList<>();
  246 +
  247 + log.info("开始连接数据库,URL: {}", jdbcUrl);
  248 +
  249 + try {
  250 + HikariConfig config = new HikariConfig();
  251 + config.setJdbcUrl(jdbcUrl);
  252 + config.setUsername(jdbcUserName);
  253 + config.setPassword(jdbcPassword);
  254 + config.setDriverClassName("org.postgresql.Driver");
  255 + config.setMaximumPoolSize(5);
  256 + config.setMinimumIdle(5);
  257 + config.setConnectionTimeout(60000);
  258 + config.setConnectionTestQuery("SELECT 1");
  259 +
  260 + dataSource = new HikariDataSource(config);
  261 + log.info("Hikari连接池配置完成");
  262 +
  263 + connection = dataSource.getConnection();
  264 + log.info("数据库连接成功");
  265 +
  266 + statement = connection.prepareStatement(selectSql);
  267 + log.info("执行SQL查询: {}", selectSql);
  268 +
  269 + resultSet = statement.executeQuery();
  270 + ResultSetMetaData metaData = resultSet.getMetaData();
  271 + int columnCount = metaData.getColumnCount();
  272 + log.info("查询结果集元数据获取成功,共{}列", columnCount);
  273 +
  274 + int rowCount = 0;
  275 + while (resultSet.next()) {
  276 + List<Object> result = new ArrayList<>(columnCount);
  277 + for (int index = 1; index <= columnCount; index++) {
  278 + int columnType = metaData.getColumnType(index);
  279 + Object value = SqlTypedValueUtils.getTypedValue(resultSet, index, columnType);
  280 + result.add(value);
  281 + }
  282 + resultList.add(result);
  283 + rowCount++;
  284 +
  285 + // 每处理1000行记录一次日志
  286 + if (rowCount % 1000 == 0) {
  287 + log.info("已处理{}行数据", rowCount);
  288 + }
  289 + }
  290 +
  291 + log.info("数据查询完成,共获取{}行数据", rowCount);
  292 +
  293 + } catch (SQLException e) {
  294 + log.error("数据库操作异常,URL: {}, 用户名: {}", jdbcUrl, jdbcUserName, e);
  295 + } catch (Exception e) {
  296 + log.error("初始化数据库连接或查询数据时发生异常", e);
  297 + } finally {
  298 + // 释放资源
  299 + try {
  300 + if (resultSet != null) resultSet.close();
  301 + if (statement != null) statement.close();
  302 + if (connection != null) connection.close();
  303 + log.info("数据库连接资源已释放");
  304 + } catch (SQLException e) {
  305 + log.error("关闭数据库资源时发生异常", e);
  306 + }
  307 +
  308 + if (dataSource != null) {
  309 + try {
  310 + dataSource.close();
  311 + log.info("HikariDataSource连接池已关闭");
  312 + } catch (Exception e) {
  313 + log.error("关闭HikariDataSource连接池时发生异常", e);
  314 + }
  315 + }
  316 + }
  317 +
  318 + log.info("数据库操作完成,返回{}条记录", resultList.size());
  319 + return resultList;
  320 + }
  321 +}
\ No newline at end of file
... ...
1 1 package com.iot.scheduler.zone;
2 2
  3 +import com.iot.scheduler.service.chizhou.CzDeviceReportService;
3 4 import com.iot.scheduler.task.AbstractZoneScheduler;
  5 +import jakarta.annotation.Resource;
4 6 import lombok.extern.slf4j.Slf4j;
5 7 import org.springframework.scheduling.annotation.Scheduled;
6 8 import org.springframework.stereotype.Component;
... ... @@ -9,26 +11,14 @@ import org.springframework.stereotype.Component;
9 11 @Component
10 12 public class ChizhouZoneScheduler extends AbstractZoneScheduler {
11 13
  14 + @Resource
  15 + private CzDeviceReportService czDeviceReportService;
  16 +
12 17 @Override
13 18 protected String getZoneName() {
14 19 return "Chizhou (池州经开区)";
15 20 }
16 21
17   - @Scheduled(cron = "${scheduler.chizhou.pull:0 0/10 * * * ?}")
18   - public void pullDevicesFromThirdParty() {
19   - String taskName = "Pull Devices (3rd Party -> IoT)";
20   - logStart(taskName);
21   - try {
22   - // TODO: Implement actual logic
23   - log.info("[{}] Simulating pulling devices...", getZoneName());
24   - Thread.sleep(1000);
25   - } catch (Exception e) {
26   - logError(taskName, e);
27   - } finally {
28   - logEnd(taskName);
29   - }
30   - }
31   -
32 22 @Scheduled(cron = "${scheduler.chizhou.push:0 0/15 * * * ?}")
33 23 public void pushDevicesToThirdParty() {
34 24 String taskName = "Push Devices (IoT -> 3rd Party)";
... ... @@ -36,6 +26,7 @@ public class ChizhouZoneScheduler extends AbstractZoneScheduler {
36 26 try {
37 27 // TODO: Implement actual logic
38 28 log.info("[{}] Simulating pushing devices...", getZoneName());
  29 + czDeviceReportService.deviceReport();
39 30 Thread.sleep(1000);
40 31 } catch (Exception e) {
41 32 logError(taskName, e);
... ...