Commit b064769ea002caed4d88474e72914d3b693a0072

Authored by 杨鸣坤
1 parent 3abc7065

feat: 新增能耗数据同步与查询功能

@@ -2,6 +2,8 @@ package com.iot.scheduler.controller; @@ -2,6 +2,8 @@ package com.iot.scheduler.controller;
2 2
3 import com.iot.scheduler.service.DevicePullService; 3 import com.iot.scheduler.service.DevicePullService;
4 import com.iot.scheduler.service.DeviceSearchService; 4 import com.iot.scheduler.service.DeviceSearchService;
  5 +import com.iot.scheduler.service.EnergyPullService;
  6 +import com.iot.scheduler.service.EnergySearchService;
5 import jakarta.annotation.Resource; 7 import jakarta.annotation.Resource;
6 import org.springframework.web.bind.annotation.GetMapping; 8 import org.springframework.web.bind.annotation.GetMapping;
7 import org.springframework.web.bind.annotation.RequestParam; 9 import org.springframework.web.bind.annotation.RequestParam;
@@ -16,6 +18,10 @@ public class HealthController { @@ -16,6 +18,10 @@ public class HealthController {
16 private DevicePullService devicePullService; 18 private DevicePullService devicePullService;
17 @Resource 19 @Resource
18 private DeviceSearchService deviceSearchService; 20 private DeviceSearchService deviceSearchService;
  21 + @Resource
  22 + private EnergySearchService energySearchService;
  23 + @Resource
  24 + private EnergyPullService energyPullService;
19 25
20 @GetMapping("/health") 26 @GetMapping("/health")
21 public String health() throws Exception { 27 public String health() throws Exception {
@@ -110,4 +116,31 @@ public class HealthController { @@ -110,4 +116,31 @@ public class HealthController {
110 @RequestParam String endDate) { 116 @RequestParam String endDate) {
111 return deviceSearchService.queryBootRate(startDate, endDate); 117 return deviceSearchService.queryBootRate(startDate, endDate);
112 } 118 }
  119 +
  120 + @GetMapping("/energy/history")
  121 + public void energyHistory() {
  122 + energyPullService.pullEnergyHistoryAndSave();
  123 + }
  124 +
  125 + /**
  126 + * 分页查询能耗设备信息
  127 + *
  128 + * @param deviceName 设备名称(模糊匹配)
  129 + * @param runStatus 状态(0:离线,1:停机,2:待机,3:运行)
  130 + * @param pageNo 页码,默认1
  131 + * @param pageSize 每页条数,默认10
  132 + */
  133 + @GetMapping("/energy/list")
  134 + public Map<String, Object> energyList(
  135 + @RequestParam(required = false) String deviceName,
  136 + @RequestParam(required = false) String runStatus,
  137 + @RequestParam(defaultValue = "1") Integer pageNo,
  138 + @RequestParam(defaultValue = "10") Integer pageSize) {
  139 + return energySearchService.queryEnergyList(deviceName, runStatus, pageNo, pageSize);
  140 + }
  141 +
  142 + @GetMapping("/energy/stats")
  143 + public Map<String, Object> energyStats() {
  144 + return energySearchService.queryEnergyStats();
  145 + }
113 } 146 }
  1 +package com.iot.scheduler.service;
  2 +
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.alibaba.fastjson.JSONArray;
  5 +import com.alibaba.fastjson.JSONObject;
  6 +import com.alibaba.fastjson.TypeReference;
  7 +import jakarta.annotation.Resource;
  8 +import lombok.extern.slf4j.Slf4j;
  9 +import org.apache.commons.lang3.StringUtils;
  10 +import org.apache.http.Consts;
  11 +import org.apache.http.HttpEntity;
  12 +import org.apache.http.client.methods.CloseableHttpResponse;
  13 +import org.apache.http.client.methods.HttpGet;
  14 +import org.apache.http.client.methods.HttpPost;
  15 +import org.apache.http.entity.ContentType;
  16 +import org.apache.http.entity.StringEntity;
  17 +import org.apache.http.impl.client.CloseableHttpClient;
  18 +import org.apache.http.impl.client.HttpClients;
  19 +import org.apache.http.util.EntityUtils;
  20 +import org.springframework.beans.factory.annotation.Value;
  21 +import org.springframework.data.redis.core.RedisTemplate;
  22 +import org.springframework.jdbc.core.JdbcTemplate;
  23 +import org.springframework.scheduling.annotation.Scheduled;
  24 +import org.springframework.stereotype.Service;
  25 +import org.springframework.util.CollectionUtils;
  26 +import org.springframework.util.LinkedMultiValueMap;
  27 +import org.springframework.util.MultiValueMap;
  28 +import org.springframework.web.util.UriComponentsBuilder;
  29 +
  30 +import java.io.ByteArrayOutputStream;
  31 +import java.io.IOException;
  32 +import java.io.InputStream;
  33 +import java.text.SimpleDateFormat;
  34 +import java.util.*;
  35 +import java.util.concurrent.TimeUnit;
  36 +
  37 +@Slf4j
  38 +@Service
  39 +public class EnergyPullService {
  40 +
  41 + @Value("${energy.token.url}")
  42 + private String energyTokenUrl;
  43 + @Value("${energy.token.userName}")
  44 + private String energyUserName;
  45 + @Value("${energy.token.password}")
  46 + private String energyPassword;
  47 + @Value("${energy.deviceList.url}")
  48 + private String energyDeviceListUrl;
  49 + @Value("${energy.detail.url}")
  50 + private String energyDetailUrl;
  51 + @Value("${energy.db.corpCode}")
  52 + private String energyCorpCode;
  53 + @Value("${energy.db.tableName}")
  54 + private String energyTableName;
  55 + @Value("${energy.db.eqKwhTableName}")
  56 + private String eqKwhTableName;
  57 + @Value("${energy.db.eRunDtlTableName}")
  58 + private String eRunDtlTableName;
  59 + @Value("${energy.runStatus.url}")
  60 + private String energyRunStatusUrl;
  61 +
  62 + @Resource
  63 + private RedisTemplate<String, String> redisTemplate;
  64 + @Resource
  65 + private JdbcTemplate jdbcTemplate;
  66 +
  67 + /**
  68 + * 定时任务:同步能耗设备数据
  69 + */
  70 + @Scheduled(cron = "${scheduler.energy.cron:0 0/5 * * ?}")
  71 + public void pullEnergyDeviceAndSave() {
  72 + log.info("【能耗数据同步】开始同步能耗设备数据");
  73 + try {
  74 + String energyResult = getEnergyDeviceList();
  75 + if (StringUtils.isBlank(energyResult)) {
  76 + log.error("【能耗数据同步】获取能耗设备列表失败");
  77 + return;
  78 + }
  79 +
  80 + Map<String, Object> energyInfos = JSON.parseObject(energyResult, new TypeReference<>() {});
  81 + Integer code = (Integer) energyInfos.get("code");
  82 + if (code == null || code != 200) {
  83 + log.error("【能耗数据同步】获取能耗设备列表返回code不为200, code:{}, response:{}", code, energyResult);
  84 + return;
  85 + }
  86 +
  87 + JSONArray dataList = (JSONArray) energyInfos.get("data");
  88 + if (CollectionUtils.isEmpty(dataList)) {
  89 + log.warn("【能耗数据同步】能耗设备列表为空");
  90 + return;
  91 + }
  92 +
  93 + int successCount = 0;
  94 + for (Object o : dataList) {
  95 + JSONObject deviceJson = (JSONObject) o;
  96 +
  97 + String projectState = deviceJson.getString("projectState");
  98 + String projectType = deviceJson.getString("projectType");
  99 + String deviceName = deviceJson.getString("deviceName");
  100 + String dtuId = deviceJson.getString("dtuId");
  101 + String deviceId = deviceJson.getString("deviceId");
  102 + String dtuSn = deviceJson.getString("dtuSn");
  103 +
  104 + log.info("【能耗数据处理】deviceName:{}, dtuSn:{}, projectState:{}, projectType:{}",
  105 + deviceName, dtuSn, projectState, projectType);
  106 +
  107 + // 获取今日用电量
  108 + String todayEvalue = getTodayEnergyValue(dtuSn);
  109 +
  110 + // 获取能耗OEE历史数据(最后一条的duration和runStatus)
  111 + Map<String, String> runStatusMap = getEnergyRunStatus(dtuSn);
  112 + String duration = runStatusMap.get("duration");
  113 + String runStatus = runStatusMap.get("runStatus");
  114 +
  115 + saveOrUpdateEnergyDevice(projectState, projectType, deviceName, dtuId, deviceId, dtuSn,
  116 + todayEvalue, duration, runStatus);
  117 + successCount++;
  118 + }
  119 +
  120 + log.info("【能耗数据同步】同步完成,共处理 {} 条设备数据", successCount);
  121 + } catch (Exception e) {
  122 + log.error("【能耗数据同步】同步过程发生异常", e);
  123 + }
  124 + }
  125 +
  126 + /**
  127 + * 获取能耗设备列表
  128 + */
  129 + public String getEnergyDeviceList() {
  130 + String accessToken = getEnergyAccessToken();
  131 + Map<String, String> headerMap = new HashMap<>(1);
  132 + headerMap.put("Authorization", "Bearer " + accessToken);
  133 +
  134 + Map<String, String> paramsMap = new HashMap<>();
  135 + paramsMap.put("groupName", "SHC");
  136 +
  137 + // 第一次请求
  138 + String result = sendRequestGet(energyDeviceListUrl, paramsMap, headerMap);
  139 + if (StringUtils.isBlank(result)) {
  140 + return null;
  141 + }
  142 +
  143 + Map<String, Object> resultMap = JSON.parseObject(result, new TypeReference<>() {});
  144 + Integer resultCode = (Integer) resultMap.get("code");
  145 +
  146 + // Token失效,重试一次
  147 + if (resultCode == null || resultCode != 200) {
  148 + log.warn("【能耗设备列表】第一次请求失败,重新获取Token重试");
  149 + accessToken = getEnergyAccessToken();
  150 + if (StringUtils.isEmpty(accessToken)) {
  151 + return null;
  152 + }
  153 + headerMap.put("Authorization", "Bearer " + accessToken);
  154 +
  155 + result = sendRequestGet(energyDeviceListUrl, paramsMap, headerMap);
  156 + if (StringUtils.isBlank(result)) {
  157 + return null;
  158 + }
  159 +
  160 + resultMap = JSON.parseObject(result, new TypeReference<>() {});
  161 + resultCode = (Integer) resultMap.get("code");
  162 + if (resultCode == null || resultCode != 200) {
  163 + log.error("【能耗设备列表】重试后仍然失败, code:{}", resultCode);
  164 + return null;
  165 + }
  166 + }
  167 +
  168 + return result;
  169 + }
  170 +
  171 + /**
  172 + * 获取能耗Token
  173 + */
  174 + private String getEnergyAccessToken() {
  175 + String redisKey = "hnyssl_energy_token";
  176 +
  177 + // 检查Redis中是否有有效Token
  178 + String cachedToken = redisTemplate.opsForValue().get(redisKey);
  179 + if (StringUtils.isNotBlank(cachedToken) && redisTemplate.getExpire(redisKey) > 0) {
  180 + return cachedToken;
  181 + }
  182 +
  183 + // 请求新Token
  184 + Map<String, String> param = new HashMap<>(2);
  185 + param.put("username", energyUserName);
  186 + param.put("password", energyPassword);
  187 +
  188 + HttpPost httpPost = new HttpPost(energyTokenUrl);
  189 + String result = sendPost(httpPost, JSON.toJSONString(param));
  190 + if (StringUtils.isBlank(result)) {
  191 + log.error("【能耗Token】请求Token失败,响应为空");
  192 + return "";
  193 + }
  194 +
  195 + Map<String, Object> res = JSON.parseObject(result, new TypeReference<>() {});
  196 + Integer code = (Integer) res.get("code");
  197 + if (code == null || code != 200) {
  198 + log.error("【能耗Token】请求Token返回异常, code:{}", code);
  199 + return "";
  200 + }
  201 +
  202 + JSONObject data = (JSONObject) res.get("data");
  203 + String token = data.getString("token");
  204 + if (StringUtils.isBlank(token)) {
  205 + log.error("【能耗Token】响应中没有token");
  206 + return "";
  207 + }
  208 +
  209 + // 缓存Token,有效期1小时(Token实际有效期2小时)
  210 + redisTemplate.opsForValue().set(redisKey, token, 3600, TimeUnit.SECONDS);
  211 + log.info("【能耗Token】获取Token成功并缓存");
  212 +
  213 + return token;
  214 + }
  215 +
  216 + /**
  217 + * 保存或更新能耗设备数据(含用电量evalue、duration、runStatus)
  218 + */
  219 + private void saveOrUpdateEnergyDevice(String projectState, String projectType, String deviceName,
  220 + String dtuId, String deviceId, String dtuSn,
  221 + String evalue, String duration, String runStatus) {
  222 + // 查询是否已存在(按公司+dtuSn判断)
  223 + List<Map<String, Object>> existList = jdbcTemplate.queryForList(
  224 + "SELECT id FROM " + energyTableName + " WHERE corp_code = ? AND dtuSn = ?", energyCorpCode, dtuSn);
  225 +
  226 + Date now = new Date();
  227 + if (!existList.isEmpty()) {
  228 + // 更新
  229 + jdbcTemplate.update(
  230 + "UPDATE " + energyTableName +
  231 + " SET projectState = ?, projectType = ?, deviceName = ?, dtuId = ?, deviceId = ?, evalue = ?, duration = ?, runStatus = ?, updated_at = ?" +
  232 + " WHERE corp_code = ? AND dtuSn = ?",
  233 + projectState, projectType, deviceName, dtuId, deviceId, evalue, duration, runStatus, now, energyCorpCode, dtuSn);
  234 + log.debug("【能耗数据】更新成功 - dtuSn:{}, evalue:{}, runStatus:{}", dtuSn, evalue, runStatus);
  235 + } else {
  236 + // 新增
  237 + String id = UUID.randomUUID().toString().replace("-", "");
  238 + jdbcTemplate.update(
  239 + "INSERT INTO " + energyTableName +
  240 + " (id, corp_code, created_at, created_by, updated_at, updated_by, deviceName, projectType, projectState, dtuSn, dtuId, deviceId, evalue, duration, runStatus)" +
  241 + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
  242 + id, energyCorpCode, now, "system", now, "system",
  243 + deviceName, projectType, projectState, dtuSn, dtuId, deviceId, evalue, duration, runStatus);
  244 + log.debug("【能耗数据】新增成功 - dtuSn:{}, evalue:{}", dtuSn, evalue);
  245 + }
  246 + }
  247 +
  248 + /**
  249 + * 获取能耗OEE历史数据
  250 + * 调用 /api/energy/runStatus?dtuSn=xxx&date=today
  251 + * @return Map包含 lastDuration(最后一条的duration)和 lastRunStatus(最后一条的runStatus),失败返回空Map
  252 + */
  253 + private Map<String, String> getEnergyRunStatus(String dtuSn) {
  254 + Map<String, String> result = new HashMap<>();
  255 + result.put("duration", null);
  256 + result.put("runStatus", null);
  257 + try {
  258 + String accessToken = getEnergyAccessToken();
  259 + Map<String, String> headerMap = new HashMap<>(1);
  260 + headerMap.put("Authorization", "Bearer " + accessToken);
  261 +
  262 + Map<String, String> paramsMap = new HashMap<>();
  263 + paramsMap.put("dtuSn", dtuSn);
  264 + paramsMap.put("date", new SimpleDateFormat("yyyy-MM-dd").format(new Date()));
  265 +
  266 + String response = sendRequestGet(energyRunStatusUrl, paramsMap, headerMap);
  267 + if (StringUtils.isBlank(response)) {
  268 + return result;
  269 + }
  270 +
  271 + Map<String, Object> resultMap = JSON.parseObject(response, new TypeReference<>() {});
  272 + Integer resultCode = (Integer) resultMap.get("code");
  273 + if (resultCode == null || resultCode != 200) {
  274 + // Token失效重试
  275 + accessToken = getEnergyAccessToken();
  276 + if (StringUtils.isEmpty(accessToken)) {
  277 + return result;
  278 + }
  279 + headerMap.put("Authorization", "Bearer " + accessToken);
  280 + response = sendRequestGet(energyRunStatusUrl, paramsMap, headerMap);
  281 + if (StringUtils.isBlank(response)) {
  282 + return result;
  283 + }
  284 + resultMap = JSON.parseObject(response, new TypeReference<>() {});
  285 + resultCode = (Integer) resultMap.get("code");
  286 + if (resultCode == null || resultCode != 200) {
  287 + return result;
  288 + }
  289 + }
  290 +
  291 + JSONArray dataList = (JSONArray) resultMap.get("data");
  292 + // 完整data保存到 t_auto_ymk_iot_e_run_dtl 表
  293 + String description = dataList != null ? dataList.toJSONString() : "[]";
  294 + saveOrUpdateERunDtl(dtuSn, description, new SimpleDateFormat("yyyy-MM-dd 00:00:00").format(new Date()));
  295 +
  296 + // 取最后一条数据的 duration 和 runStatus
  297 + if (!CollectionUtils.isEmpty(dataList)) {
  298 + JSONObject lastItem = (JSONObject) dataList.get(dataList.size() - 1);
  299 + Long durVal = lastItem.getLong("duration");
  300 + Integer statusVal = lastItem.getInteger("runStatus");
  301 + result.put("duration", durVal != null ? String.valueOf(durVal) : null);
  302 + result.put("runStatus", statusVal != null ? String.valueOf(statusVal) : null);
  303 + log.info("【能耗OEE】dtuSn:{}, 最后一条 - duration:{}, runStatus:{}",
  304 + dtuSn, result.get("duration"), result.get("runStatus"));
  305 + }
  306 + } catch (Exception e) {
  307 + log.error("【能耗OEE】获取异常 - dtuSn:{}", dtuSn, e);
  308 + }
  309 + return result;
  310 + }
  311 +
  312 + /**
  313 + * 保存或更新能耗OEE运行状态明细到 t_auto_ymk_iot_e_run_dtl 表
  314 + * 超过60000字符拆分存入runStatus2
  315 + */
  316 + private void saveOrUpdateERunDtl(String dtuSn, String description, String useDate) {
  317 + int maxLen = 60000;
  318 + String rs1 = "";
  319 + String rs2 = "";
  320 +
  321 + if (description.length() > maxLen) {
  322 + rs1 = description.substring(0, maxLen);
  323 + rs2 = description.substring(maxLen);
  324 + } else {
  325 + rs1 = description;
  326 + }
  327 +
  328 + List<Map<String, Object>> existList = jdbcTemplate.queryForList(
  329 + "SELECT id FROM " + eRunDtlTableName + " WHERE corp_code = ? AND dtuSn = ? AND use_date = ?",
  330 + energyCorpCode, dtuSn, useDate);
  331 +
  332 + Date now = new Date();
  333 + if (!existList.isEmpty()) {
  334 + jdbcTemplate.update(
  335 + "UPDATE " + eRunDtlTableName + " SET runStatus1=?, runStatus2=?, updated_at=? WHERE corp_code=? AND dtuSn=? AND use_date=?",
  336 + rs1, rs2, now, energyCorpCode, dtuSn, useDate);
  337 + } else {
  338 + String id = UUID.randomUUID().toString().replace("-", "");
  339 + jdbcTemplate.update(
  340 + "INSERT INTO " + eRunDtlTableName +
  341 + " (id,corp_code,created_at,created_by,updated_at,updated_by,dtuSn,use_date,runStatus1,runStatus2)" +
  342 + " VALUES (?,?,?,?,?,?,?,?,?,?)",
  343 + id, energyCorpCode, now, "system", now, "system", dtuSn, useDate, rs1, rs2);
  344 + }
  345 + }
  346 +
  347 + // ==================== 能耗历史数据全量/增量同步 ====================
  348 +
  349 + /**
  350 + * 首次全量同步:本年1月1日 ~ 昨天(手动触发一次)
  351 + */
  352 + public void pullEnergyHistoryAndSave() {
  353 + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
  354 + Calendar cal = Calendar.getInstance();
  355 + cal.add(Calendar.DAY_OF_MONTH, -1); // 昨天
  356 + String endDate = sdf.format(cal.getTime());
  357 + cal.set(cal.get(Calendar.YEAR), Calendar.JANUARY, 1);
  358 + String startDate = sdf.format(cal.getTime());
  359 +
  360 + log.info("【能耗历史-全量】开始同步,日期范围: {} ~ {}", startDate, endDate);
  361 + doSyncEnergyHistory(startDate, endDate);
  362 + }
  363 +//
  364 +// /**
  365 +// * 每日增量同步:仅同步昨天的数据(定时任务自动触发)
  366 +// */
  367 +// @Scheduled(cron = "${scheduler.energyHistory.cron:0 40 2 * * ?}")
  368 +// public void pullEnergyHistoryDaily() {
  369 +// SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
  370 +// Calendar cal = Calendar.getInstance();
  371 +// cal.add(Calendar.DAY_OF_MONTH, -1);
  372 +// String yesterday = sdf.format(cal.getTime());
  373 +//
  374 +// log.info("【能耗历史-每日增量】同步,日期: {}", yesterday);
  375 +// doSyncEnergyHistory(yesterday, yesterday);
  376 +// }
  377 +
  378 + /**
  379 + * 核心同步逻辑:获取能耗设备列表 → 每个设备每天调用用电量+OEE接口 → 写入 eq_kwh / e_run_dtl 表
  380 + */
  381 + private void doSyncEnergyHistory(String startDate, String endDate) {
  382 + String energyResult = getEnergyDeviceList();
  383 + if (StringUtils.isBlank(energyResult)) {
  384 + log.error("【能耗历史】获取设备列表失败");
  385 + return;
  386 + }
  387 +
  388 + Map<String, Object> energyInfos = JSON.parseObject(energyResult, new TypeReference<>() {});
  389 + Integer code = (Integer) energyInfos.get("code");
  390 + if (code == null || code != 200) {
  391 + log.error("【能耗历史】获取设备列表返回code异常, code:{}", code);
  392 + return;
  393 + }
  394 +
  395 + JSONArray deviceList = (JSONArray) energyInfos.get("data");
  396 + if (CollectionUtils.isEmpty(deviceList)) {
  397 + log.warn("【能耗历史】设备列表为空");
  398 + return;
  399 + }
  400 +
  401 + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
  402 + List<String> dateList = new ArrayList<>();
  403 + try {
  404 + Date start = sdf.parse(startDate);
  405 + Date end = sdf.parse(endDate);
  406 + Calendar cur = Calendar.getInstance();
  407 + cur.setTime(start);
  408 + while (!cur.getTime().after(end)) {
  409 + dateList.add(sdf.format(cur.getTime()));
  410 + cur.add(Calendar.DAY_OF_MONTH, 1);
  411 + }
  412 + } catch (Exception e) {
  413 + log.error("【能耗历史】日期解析失败: {} ~ {}", startDate, endDate, e);
  414 + return;
  415 + }
  416 +
  417 + log.info("【能耗历史】开始同步,设备数:{}, 天数:{}, 总计 {} 条记录待处理",
  418 + deviceList.size(), dateList.size(), deviceList.size() * dateList.size());
  419 +
  420 + int totalSaved = 0;
  421 + for (Object o : deviceList) {
  422 + JSONObject deviceJson = (JSONObject) o;
  423 + String dtuSn = deviceJson.getString("dtuSn");
  424 +
  425 + for (String dateStr : dateList) {
  426 + try {
  427 + // 1. 同步当日用电量数据到 t_auto_ymk_iot_eq_kwh
  428 + syncEnergyValueForDate(dtuSn, dateStr);
  429 +
  430 + // 2. 同步当日OEE时序数据到 t_auto_ymk_iot_e_run_dtl
  431 + syncRunStatusForDate(dtuSn, dateStr);
  432 +
  433 + totalSaved++;
  434 + } catch (Exception e) {
  435 + log.error("【能耗历史】处理异常 - dtuSn:{}, date:{}", dtuSn, dateStr, e);
  436 + }
  437 + }
  438 + }
  439 +
  440 + log.info("【能耗历史】同步完成,共保存 {} 条记录", totalSaved);
  441 + }
  442 +
  443 + /**
  444 + * 同步指定日期的设备用电量数据到 t_auto_ymk_iot_eq_kwh 表
  445 + */
  446 + private void syncEnergyValueForDate(String dtuSn, String dateStr) {
  447 + String accessToken = getEnergyAccessToken();
  448 + Map<String, String> headerMap = new HashMap<>(1);
  449 + headerMap.put("Authorization", "Bearer " + accessToken);
  450 +
  451 + Map<String, String> paramsMap = new HashMap<>();
  452 + paramsMap.put("dtuSn", dtuSn);
  453 + paramsMap.put("startDate", dateStr);
  454 + paramsMap.put("type", "0");
  455 +
  456 + String result = sendRequestGet(energyDetailUrl, paramsMap, headerMap);
  457 + if (StringUtils.isBlank(result)) {
  458 + return;
  459 + }
  460 +
  461 + Map<String, Object> resultMap = JSON.parseObject(result, new TypeReference<>() {});
  462 + Integer resultCode = (Integer) resultMap.get("code");
  463 + if (resultCode == null || resultCode != 200) {
  464 + accessToken = getEnergyAccessToken();
  465 + if (StringUtils.isEmpty(accessToken)) {
  466 + return;
  467 + }
  468 + headerMap.put("Authorization", "Bearer " + accessToken);
  469 + result = sendRequestGet(energyDetailUrl, paramsMap, headerMap);
  470 + if (StringUtils.isBlank(result)) {
  471 + return;
  472 + }
  473 + resultMap = JSON.parseObject(result, new TypeReference<>() {});
  474 + resultCode = (Integer) resultMap.get("code");
  475 + if (resultCode == null || resultCode != 200) {
  476 + return;
  477 + }
  478 + }
  479 +
  480 + JSONArray dataList = (JSONArray) resultMap.get("data");
  481 + String description = dataList != null ? dataList.toJSONString() : "[]";
  482 + saveOrUpdateEqKwh(dtuSn, description, dateStr + " 00:00:00");
  483 + }
  484 +
  485 + /**
  486 + * 同步指定日期的设备OEE时序数据到 t_auto_ymk_iot_e_run_dtl 表
  487 + */
  488 + private void syncRunStatusForDate(String dtuSn, String dateStr) {
  489 + String accessToken = getEnergyAccessToken();
  490 + Map<String, String> headerMap = new HashMap<>(1);
  491 + headerMap.put("Authorization", "Bearer " + accessToken);
  492 +
  493 + Map<String, String> paramsMap = new HashMap<>();
  494 + paramsMap.put("dtuSn", dtuSn);
  495 + paramsMap.put("date", dateStr);
  496 +
  497 + String response = sendRequestGet(energyRunStatusUrl, paramsMap, headerMap);
  498 + if (StringUtils.isBlank(response)) {
  499 + return;
  500 + }
  501 +
  502 + Map<String, Object> resultMap = JSON.parseObject(response, new TypeReference<>() {});
  503 + Integer resultCode = (Integer) resultMap.get("code");
  504 + if (resultCode == null || resultCode != 200) {
  505 + accessToken = getEnergyAccessToken();
  506 + if (StringUtils.isEmpty(accessToken)) {
  507 + return;
  508 + }
  509 + headerMap.put("Authorization", "Bearer " + accessToken);
  510 + response = sendRequestGet(energyRunStatusUrl, paramsMap, headerMap);
  511 + if (StringUtils.isBlank(response)) {
  512 + return;
  513 + }
  514 + resultMap = JSON.parseObject(response, new TypeReference<>() {});
  515 + resultCode = (Integer) resultMap.get("code");
  516 + if (resultCode == null || resultCode != 200) {
  517 + return;
  518 + }
  519 + }
  520 +
  521 + JSONArray dataList = (JSONArray) resultMap.get("data");
  522 + String description = dataList != null ? dataList.toJSONString() : "[]";
  523 + saveOrUpdateERunDtl(dtuSn, description, dateStr + " 00:00:00");
  524 + }
  525 +
  526 + /**
  527 + * 获取今日设备用电量 (type=0 按小时查询)
  528 + * 同时将完整响应data保存到 t_auto_ymk_iot_eq_kwh 表
  529 + * @return 今日用电量value,失败返回null
  530 + */
  531 + private String getTodayEnergyValue(String dtuSn) {
  532 + try {
  533 + String accessToken = getEnergyAccessToken();
  534 + Map<String, String> headerMap = new HashMap<>(1);
  535 + headerMap.put("Authorization", "Bearer " + accessToken);
  536 +
  537 + Map<String, String> paramsMap = new HashMap<>();
  538 + paramsMap.put("dtuSn", dtuSn);
  539 + paramsMap.put("startDate", new SimpleDateFormat("yyyy-MM-dd").format(new Date()));
  540 + paramsMap.put("type", "0");
  541 +
  542 + String result = sendRequestGet(energyDetailUrl, paramsMap, headerMap);
  543 + if (StringUtils.isBlank(result)) {
  544 + return null;
  545 + }
  546 +
  547 + Map<String, Object> resultMap = JSON.parseObject(result, new TypeReference<>() {});
  548 + Integer resultCode = (Integer) resultMap.get("code");
  549 + if (resultCode == null || resultCode != 200) {
  550 + accessToken = getEnergyAccessToken();
  551 + if (StringUtils.isEmpty(accessToken)) {
  552 + return null;
  553 + }
  554 + headerMap.put("Authorization", "Bearer " + accessToken);
  555 + result = sendRequestGet(energyDetailUrl, paramsMap, headerMap);
  556 + if (StringUtils.isBlank(result)) {
  557 + return null;
  558 + }
  559 + resultMap = JSON.parseObject(result, new TypeReference<>() {});
  560 + resultCode = (Integer) resultMap.get("code");
  561 + if (resultCode == null || resultCode != 200) {
  562 + return null;
  563 + }
  564 + }
  565 +
  566 + JSONArray dataList = (JSONArray) resultMap.get("data");
  567 + // 将完整data保存到eq_kwh表
  568 + String description = dataList != null ? dataList.toJSONString() : "[]";
  569 + saveOrUpdateEqKwh(dtuSn, description, new SimpleDateFormat("yyyy-MM-dd 00:00:00").format(new Date()));
  570 +
  571 + // 累加所有小时的value作为今日总用电量
  572 + double totalValue = 0;
  573 + if (!CollectionUtils.isEmpty(dataList)) {
  574 + for (Object o : dataList) {
  575 + JSONObject item = (JSONObject) o;
  576 + totalValue += item.getDoubleValue("value");
  577 + }
  578 + }
  579 +
  580 + String evalueStr = String.valueOf(totalValue);
  581 + log.info("【能耗用电量】dtuSn:{}, 今日用电量:{}", dtuSn, evalueStr);
  582 + return evalueStr;
  583 +
  584 + } catch (Exception e) {
  585 + log.error("【能耗用电量】获取异常 - dtuSn:{}", dtuSn, e);
  586 + return null;
  587 + }
  588 + }
  589 +
  590 + /**
  591 + * 保存或更新用电量数据到 t_auto_ymk_iot_eq_kwh 表
  592 + */
  593 + private void saveOrUpdateEqKwh(String dtuSn, String description, String useDate) {
  594 + List<Map<String, Object>> existList = jdbcTemplate.queryForList(
  595 + "SELECT id FROM " + eqKwhTableName + " WHERE corp_code = ? AND dtuSn = ? AND use_date = ?",
  596 + energyCorpCode, dtuSn, useDate);
  597 +
  598 + Date now = new Date();
  599 + if (!existList.isEmpty()) {
  600 + jdbcTemplate.update(
  601 + "UPDATE " + eqKwhTableName + " SET description = ?, updated_at = ? WHERE corp_code = ? AND dtuSn = ? AND use_date = ?",
  602 + description, now, energyCorpCode, dtuSn, useDate);
  603 + } else {
  604 + String id = UUID.randomUUID().toString().replace("-", "");
  605 + jdbcTemplate.update(
  606 + "INSERT INTO " + eqKwhTableName +
  607 + " (id, corp_code, created_at, created_by, updated_at, updated_by, dtuSn, use_date, description)" +
  608 + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
  609 + id, energyCorpCode, now, "system", now, "system", dtuSn, useDate, description);
  610 + }
  611 + }
  612 +
  613 + /**
  614 + * 发送GET请求
  615 + */
  616 + public static String sendRequestGet(String url, Map<String, String> params, Map<String, String> header) {
  617 + CloseableHttpClient httpclient = HttpClients.createDefault();
  618 + url = builderUrl(url, params);
  619 + String content = "";
  620 + HttpGet httpget = new HttpGet(url);
  621 + if (!CollectionUtils.isEmpty(header)) {
  622 + for (Map.Entry<String, String> entry : header.entrySet()) {
  623 + httpget.setHeader(entry.getKey(), entry.getValue());
  624 + }
  625 + }
  626 +
  627 + try (CloseableHttpResponse response = httpclient.execute(httpget)) {
  628 + if (response.getStatusLine().getStatusCode() == 200) {
  629 + content = EntityUtils.toString(response.getEntity(), "UTF-8");
  630 + }
  631 + } catch (IOException e) {
  632 + log.error("sendRequest---GET Error!", e);
  633 + }
  634 + return content;
  635 + }
  636 +
  637 + /**
  638 + * 构建带参数的URL
  639 + */
  640 + private static String builderUrl(String url, Map<String, String> params) {
  641 + UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromHttpUrl(url);
  642 + if (!CollectionUtils.isEmpty(params)) {
  643 + MultiValueMap<String, String> paramsValue = new LinkedMultiValueMap<>();
  644 + for (Map.Entry<String, String> entry : params.entrySet()) {
  645 + paramsValue.add(entry.getKey(), entry.getValue());
  646 + }
  647 + uriBuilder = uriBuilder.queryParams(paramsValue);
  648 + }
  649 + return uriBuilder.toUriString();
  650 + }
  651 +
  652 + /**
  653 + * 发送POST请求
  654 + */
  655 + private String sendPost(HttpPost httpPost, String jsonData) {
  656 + CloseableHttpClient httpClient = HttpClients.createDefault();
  657 + StringEntity entity = new StringEntity(jsonData, ContentType.create("application/json", Consts.UTF_8));
  658 + httpPost.setEntity(entity);
  659 + String result = null;
  660 + try {
  661 + CloseableHttpResponse execute = httpClient.execute(httpPost);
  662 + HttpEntity res = execute.getEntity();
  663 + InputStream is = res.getContent();
  664 + int len;
  665 + byte[] buf = new byte[128];
  666 + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
  667 + while ((len = is.read(buf)) != -1) {
  668 + byteArrayOutputStream.write(buf, 0, len);
  669 + }
  670 + result = byteArrayOutputStream.toString();
  671 + } catch (IOException e) {
  672 + log.error("sendPost error!", e);
  673 + }
  674 + return result;
  675 + }
  676 +}
  1 +package com.iot.scheduler.service;
  2 +
  3 +import lombok.extern.slf4j.Slf4j;
  4 +import org.springframework.beans.factory.annotation.Value;
  5 +import org.springframework.jdbc.core.JdbcTemplate;
  6 +import org.springframework.stereotype.Service;
  7 +import org.springframework.util.StringUtils;
  8 +
  9 +import jakarta.annotation.Resource;
  10 +
  11 +import java.util.*;
  12 +
  13 +@Slf4j
  14 +@Service
  15 +public class EnergySearchService {
  16 +
  17 + @Value("${energy.db.corpCode}")
  18 + private String energyCorpCode;
  19 + @Value("${energy.db.tableName}")
  20 + private String energyTableName;
  21 +
  22 + @Resource
  23 + private JdbcTemplate jdbcTemplate;
  24 +
  25 + /**
  26 + * 分页查询能耗设备信息
  27 + * 查询表 t_auto_ymk_iot_energy,按设备名称排序
  28 + *
  29 + * @param deviceName 设备名称(模糊匹配)
  30 + * @param runStatus 状态(0:离线,1:停机,2:待机,3:运行)
  31 + * @param pageNo 页码,默认1
  32 + * @param pageSize 每页条数,默认10
  33 + */
  34 + public Map<String, Object> queryEnergyList(String deviceName, String runStatus,
  35 + Integer pageNo, Integer pageSize) {
  36 + StringBuilder countSql = new StringBuilder("SELECT COUNT(*) FROM " + energyTableName + " WHERE corp_code = ?");
  37 + StringBuilder querySql = new StringBuilder(
  38 + "SELECT id, deviceName, projectType, projectState, dtuSn, dtuId, deviceId, " +
  39 + "evalue, duration, runStatus, created_at, updated_at " +
  40 + "FROM " + energyTableName + " WHERE corp_code = ?");
  41 + List<Object> params = new ArrayList<>();
  42 + params.add(energyCorpCode);
  43 +
  44 + // 设备名称模糊查询
  45 + if (StringUtils.hasText(deviceName)) {
  46 + countSql.append(" AND deviceName LIKE ?");
  47 + querySql.append(" AND deviceName LIKE ?");
  48 + params.add("%" + deviceName + "%");
  49 + }
  50 +
  51 + // 设备状态精确匹配
  52 + if (StringUtils.hasText(runStatus)) {
  53 + countSql.append(" AND runStatus = ?");
  54 + querySql.append(" AND runStatus = ?");
  55 + params.add(runStatus);
  56 + }
  57 +
  58 + Long total = jdbcTemplate.queryForObject(countSql.toString(), Long.class, params.toArray());
  59 + int offset = (pageNo - 1) * pageSize;
  60 +
  61 + querySql.append(" ORDER BY deviceName ASC LIMIT ?, ?");
  62 + params.add(offset);
  63 + params.add(pageSize);
  64 +
  65 + List<Map<String, Object>> list = jdbcTemplate.queryForList(querySql.toString(), params.toArray());
  66 + list.forEach(row -> {
  67 + if (row.get("duration") != null) {
  68 + long seconds = Long.parseLong(String.valueOf(row.get("duration")));
  69 + long h = seconds / 3600;
  70 + long m = (seconds % 3600) / 60;
  71 + long s = seconds % 60;
  72 + StringBuilder sb = new StringBuilder();
  73 + if (h > 0) sb.append(h).append("时");
  74 + if (h > 0 || m > 0) sb.append(m).append("分");
  75 + sb.append(s).append("秒");
  76 + row.put("duration", sb.toString());
  77 + }
  78 + });
  79 +
  80 + return Map.of(
  81 + "code", 200,
  82 + "msg", "请求成功",
  83 + "total", total != null ? total : 0,
  84 + "pageNo", pageNo,
  85 + "pageSize", pageSize,
  86 + "list", list
  87 + );
  88 + }
  89 +
  90 + /**
  91 + * 统计能耗设备各runStatus数量及总数量
  92 + * runStatus: 0-离线, 1-停机, 2-待机, 3-运行
  93 + */
  94 + public Map<String, Object> queryEnergyStats() {
  95 + String sql = "SELECT runStatus, COUNT(*) AS cnt FROM " + energyTableName
  96 + + " WHERE corp_code = ? GROUP BY runStatus";
  97 + List<Map<String, Object>> rows = jdbcTemplate.queryForList(sql, energyCorpCode);
  98 +
  99 + int total = 0;
  100 + Map<String, Integer> statusMap = new LinkedHashMap<>();
  101 + statusMap.put("0", 0);
  102 + statusMap.put("1", 0);
  103 + statusMap.put("2", 0);
  104 + statusMap.put("3", 0);
  105 +
  106 + for (Map<String, Object> row : rows) {
  107 + String key = String.valueOf(row.get("runStatus"));
  108 + int cnt = ((Number) row.get("cnt")).intValue();
  109 + total += cnt;
  110 + statusMap.merge(key, cnt, Integer::sum);
  111 + }
  112 +
  113 + return Map.of(
  114 + "code", 200,
  115 + "msg", "请求成功",
  116 + "total", total,
  117 + "0", statusMap.get("0"),
  118 + "1", statusMap.get("1"),
  119 + "2", statusMap.get("2"),
  120 + "3", statusMap.get("3")
  121 + );
  122 + }
  123 +}
@@ -42,6 +42,11 @@ scheduler: @@ -42,6 +42,11 @@ scheduler:
42 cron: "0 30 9 * * ?" # 每日凌晨 2:30 增量同步昨天数据 42 cron: "0 30 9 * * ?" # 每日凌晨 2:30 增量同步昨天数据
43 oee: 43 oee:
44 cron: "0 30 9 * * ?" # 每日凌晨 2:35 增量同步昨天OEE数据 44 cron: "0 30 9 * * ?" # 每日凌晨 2:35 增量同步昨天OEE数据
  45 + energy:
  46 + cron: "0 0/5 * * * ?" # 每5分钟同步一次能耗数据
  47 + energyHistory:
  48 + cron: "0 40 2 * * ?" # 每日凌晨 2:40 增量同步昨天能耗历史数据
  49 +
45 50
46 device: 51 device:
47 token: 52 token:
@@ -65,3 +70,22 @@ device: @@ -65,3 +70,22 @@ device:
65 oeeTableName: "t_auto_ymk_iot_dev_oee" 70 oeeTableName: "t_auto_ymk_iot_dev_oee"
66 lamp: 71 lamp:
67 url: "https://iotgc.cniot.vip/triColorLamp/dtuSn" 72 url: "https://iotgc.cniot.vip/triColorLamp/dtuSn"
  73 +
  74 +energy:
  75 + token:
  76 + url: "https://iotgc.cniot.vip/auth/token"
  77 + userName: "guests"
  78 + password: "Lingzhi"
  79 + userGroup:
  80 + url: "https://iotgc.cniot.vip/api/energy/userGroupDtuSns"
  81 + deviceList:
  82 + url: "https://iotgc.cniot.vip/api/energy/userGroupDtuSns"
  83 + detail:
  84 + url: "https://iotgc.cniot.vip/api/energy/dtuSnRateEnergy"
  85 + db:
  86 + corpCode: "ymk"
  87 + tableName: "t_auto_ymk_iot_energy"
  88 + eqKwhTableName: "t_auto_ymk_iot_eq_kwh"
  89 + eRunDtlTableName: "t_auto_ymk_iot_e_run_dtl"
  90 + runStatus:
  91 + url: "https://iotgc.cniot.vip/api/energy/runStatus"