Commit 2af32686a5ef082a599d638044ff40e5498f205b

Authored by 胡翰林
1 parent 7f2e7d1b

潘集设备上报

... ... @@ -2,11 +2,13 @@ package com.iot.scheduler.service;
2 2
3 3 import com.alibaba.fastjson.JSON;
4 4 import com.alibaba.fastjson.JSONObject;
  5 +import com.iot.scheduler.utils.DeviceDataStorage;
5 6 import com.zaxxer.hikari.HikariConfig;
6 7 import com.zaxxer.hikari.HikariDataSource;
7 8 import jakarta.annotation.Resource;
8 9 import lombok.extern.slf4j.Slf4j;
9 10 import org.apache.commons.collections4.CollectionUtils;
  11 +import org.apache.commons.lang3.SerializationUtils;
10 12 import org.apache.commons.lang3.StringUtils;
11 13 import org.apache.http.HttpEntity;
12 14 import org.apache.http.NameValuePair;
... ... @@ -22,10 +24,15 @@ import org.springframework.beans.factory.annotation.Value;
22 24 import org.springframework.data.redis.core.RedisTemplate;
23 25 import org.springframework.stereotype.Service;
24 26
  27 +import java.io.Serializable;
25 28 import java.nio.charset.StandardCharsets;
26 29 import java.sql.*;
27 30 import java.sql.Date;
28 31 import java.text.SimpleDateFormat;
  32 +import java.time.Instant;
  33 +import java.time.LocalDateTime;
  34 +import java.time.ZoneId;
  35 +import java.time.format.DateTimeFormatter;
29 36 import java.util.*;
30 37 import java.util.concurrent.TimeUnit;
31 38
... ... @@ -79,6 +86,25 @@ public class PjDeviceReportService {
79 86 @Resource
80 87 private RedisTemplate<String, String> redisTemplate;
81 88
  89 + private static final DateTimeFormatter YYYY_MM_DD = DateTimeFormatter.ofPattern("yyyy-MM-dd");
  90 + private static final DateTimeFormatter YYYY_MM = DateTimeFormatter.ofPattern("yyyy-MM");
  91 +
  92 + /**
  93 + * 时间戳(毫秒) -> yyyy-MM-dd
  94 + */
  95 + private static String toYyyyMmDd(long timestampMillis) {
  96 + return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), ZoneId.systemDefault())
  97 + .format(YYYY_MM_DD);
  98 + }
  99 +
  100 + /**
  101 + * 时间戳(毫秒) -> yyyy-MM
  102 + */
  103 + private static String toYyyyMm(long timestampMillis) {
  104 + return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), ZoneId.systemDefault())
  105 + .format(YYYY_MM);
  106 + }
  107 +
82 108 /**
83 109 * 企业设备上报
84 110 */
... ... @@ -244,16 +270,13 @@ public class PjDeviceReportService {
244 270
245 271 // 2. 准备上报数据
246 272 log.info("步骤2: 开始准备上报数据");
247   - SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
248   - String nowTime = simpleDateFormat.format(new java.util.Date());
249   - log.info("上报时间: {}", nowTime);
250 273
251 274 CloseableHttpClient httpclient = HttpClients.createDefault();
252 275 HttpPost httpPost = new HttpPost(reportUrl);
253 276
254 277 // 3. 获取并设置Token
255 278 log.info("步骤3: 获取访问令牌");
256   - String token = getToken();
  279 + String token = getTokenNoCache();
257 280 if (StringUtils.isBlank(token)) {
258 281 log.error("获取访问令牌失败,无法进行数据上报");
259 282 return;
... ... @@ -266,9 +289,10 @@ public class PjDeviceReportService {
266 289 // 4. 构建上报数据
267 290 log.info("步骤4: 构建上报数据结构");
268 291 List<Map<String, Object>> dataList = new ArrayList<>(needSyncDataList.size());
  292 + List<Map<String, Object>> saveDataList = new ArrayList<>(needSyncDataList.size());
269 293 int successCount = 0;
270 294 int errorCount = 0;
271   -
  295 + DeviceDataStorage storage = new DeviceDataStorage();
272 296 for (int i = 0; i < needSyncDataList.size(); i++) {
273 297 try {
274 298 Object needSyncData = needSyncDataList.get(i);
... ... @@ -281,14 +305,51 @@ public class PjDeviceReportService {
281 305 }
282 306
283 307 Map<String, Object> data = new HashMap<>();
  308 + String deviceCode = (String) dataObject.get(2);
  309 + String energyType = (String) dataObject.get(3);
  310 +
284 311 data.put("entName", dataObject.get(0));
285 312 data.put("deviceName", dataObject.get(1));
286   - data.put("deviceCode", dataObject.get(2));
287   - data.put("energyType", dataObject.get(3));
288   - data.put("energyValue", dataObject.get(4));
289   - data.put("timeSpan", "日");
290   - data.put("collectTime", nowTime);
  313 + data.put("deviceCode", deviceCode);
  314 + data.put("energyType", energyType);
  315 + Long ts = (Long) dataObject.get(5);
  316 + DeviceDataStorage.DeviceRecord lastRecord = storage.getLastRecord(deviceCode);
  317 + if (lastRecord == null) {
  318 + log.warn("第 {} 条数据,设备:{} 未找到上一期上报数据", i + 1, deviceCode);
  319 + continue;
  320 + }
  321 + if (lastRecord.getTimestamp() >= ts) {
  322 + log.warn("第 {} 条数据,设备:{} 数据未更新无需上报", i + 1, deviceCode);
  323 + continue;
  324 + }
  325 + Double energyValue = toDouble(dataObject.get(4));
  326 +
  327 + if ("电".equals(energyType) || "天然气".equals(energyType)) {
  328 + data.put("energyValue", energyValue);
  329 + data.put("timeSpan", "日");
  330 + data.put("collectTime", toYyyyMmDd(ts));
  331 + } else if ("水".equals(energyType)) {
  332 + Double reportValue = energyValue - lastRecord.getValue();
  333 + if (reportValue <= 0) {
  334 + log.warn("第 {} 条数据,设备:{} 水表抄表记录异常", i + 1, deviceCode);
  335 + }
  336 + data.put("energyValue", reportValue);
  337 + data.put("timeSpan", "月");
  338 + data.put("collectTime", toYyyyMm(ts));
  339 + } else {
  340 + log.warn("第 {} 条数据,设备:{} 物模型数据错误!", i + 1, deviceCode);
  341 + continue;
  342 + }
  343 +
291 344 dataList.add(data);
  345 + Map<String, Object> saveData = (Map<String, Object>) SerializationUtils.clone((Serializable) data);
  346 + saveData.put("ts",ts);
  347 + saveData.put("orignValue",energyValue);
  348 + saveDataList.add(saveData);
  349 +
  350 + //处理历史数据放开
  351 +// saveReportData(Collections.singletonList(saveData));
  352 +
292 353 successCount++;
293 354
294 355 // 每100条记录输出一次进度
... ... @@ -336,6 +397,8 @@ public class PjDeviceReportService {
336 397 JSONObject jsonResult = JSON.parseObject(result);
337 398 if (jsonResult != null && "true".equalsIgnoreCase(jsonResult.getString("success"))) {
338 399 log.info("数据上报成功!共上报 {} 条设备数据", dataList.size());
  400 + //记录上报数据
  401 + saveReportData(saveDataList);
339 402 } else {
340 403 log.error("数据上报失败!响应状态异常: {}", result);
341 404 }
... ... @@ -353,6 +416,19 @@ public class PjDeviceReportService {
353 416 }
354 417 }
355 418
  419 + private void saveReportData(List<Map<String, Object>> dataList) {
  420 + if (CollectionUtils.isEmpty(dataList)) {
  421 + return;
  422 + }
  423 + DeviceDataStorage storage = new DeviceDataStorage();
  424 + for (Map<String, Object> data : dataList) {
  425 + String deviceCode = (String) data.get("deviceCode");
  426 + Long ts = (Long) data.get("ts");
  427 + Double orignValue = (Double) data.get("orignValue");
  428 + storage.updateData(deviceCode, ts, orignValue);
  429 + }
  430 + }
  431 +
356 432 /**
357 433 * 获取token
358 434 *
... ... @@ -432,6 +508,70 @@ public class PjDeviceReportService {
432 508 }
433 509 }
434 510
  511 + /**
  512 + * 获取token
  513 + *
  514 + * @return
  515 + */
  516 + private String getTokenNoCache() {
  517 + log.info("开始获取访问令牌");
  518 + long startTime = System.currentTimeMillis();
  519 + String key = "pjjkq_report_token";
  520 +
  521 + try {
  522 + String token;
  523 + // 2. 请求新的token
  524 + CloseableHttpClient httpclient = HttpClients.createDefault();
  525 + HttpPost httpPost = new HttpPost(tokenUrl);
  526 + httpPost.setHeader("Authorization", "Basic bWljcm8tcG9ydGFsOlBAc3MxMjM0");
  527 + httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
  528 +
  529 + List<NameValuePair> params = new ArrayList<>();
  530 + params.add(new BasicNameValuePair("grant_type", "password"));
  531 + params.add(new BasicNameValuePair("username", username));
  532 + params.add(new BasicNameValuePair("password", password)); // 密码脱敏
  533 +
  534 + log.info("Token请求URL: {}", tokenUrl);
  535 + log.info("请求参数 - grant_type: password, username: {}", username);
  536 +
  537 + httpPost.setEntity(new UrlEncodedFormEntity(params, StandardCharsets.UTF_8));
  538 +
  539 + try (CloseableHttpResponse response = httpclient.execute(httpPost)) {
  540 + int statusCode = response.getStatusLine().getStatusCode();
  541 + log.info("Token接口响应状态码: {}", statusCode);
  542 +
  543 + HttpEntity responseEntity = response.getEntity();
  544 + String result = EntityUtils.toString(responseEntity);
  545 + log.debug("Token接口原始响应: {}", result);
  546 +
  547 + EntityUtils.consume(responseEntity);
  548 +
  549 + if (statusCode == 200) {
  550 + JSONObject json = JSON.parseObject(result);
  551 + token = json.getString("access_token");
  552 + int expiresIn = json.getIntValue("expires_in");
  553 +
  554 + log.info("成功获取Token,有效期: {} 秒", expiresIn);
  555 +
  556 + log.info("Token已缓存到Redis,key: {},有效期: {} 秒", key, expiresIn);
  557 +
  558 + return token;
  559 + } else {
  560 + log.error("获取Token失败,状态码: {}, 响应: {}", statusCode, result);
  561 + return null;
  562 + }
  563 +
  564 + } catch (Exception e) {
  565 + log.error("调用Token接口时发生异常", e);
  566 + return null;
  567 + }
  568 +
  569 + } finally {
  570 + long endTime = System.currentTimeMillis();
  571 + log.info("获取Token操作完成,耗时: {} 毫秒", (endTime - startTime));
  572 + }
  573 + }
  574 +
435 575 private List<Object> initConnectAndSelectData() {
436 576 Connection connection = null;
437 577 PreparedStatement statement = null;
... ... @@ -674,4 +814,24 @@ public class PjDeviceReportService {
674 814 throw e;
675 815 }
676 816 }
  817 +
  818 + public static Double toDouble(Object obj) {
  819 + if (obj == null) return null;
  820 + if (obj instanceof Double) {
  821 + return (Double) obj;
  822 + } else if (obj instanceof Long) {
  823 + return ((Long) obj).doubleValue();
  824 + } else if (obj instanceof Integer) {
  825 + return ((Integer) obj).doubleValue();
  826 + } else if (obj instanceof Number) {
  827 + return ((Number) obj).doubleValue();
  828 + } else if (obj instanceof String) {
  829 + try {
  830 + return Double.parseDouble((String) obj);
  831 + } catch (NumberFormatException e) {
  832 + return null;
  833 + }
  834 + }
  835 + return null;
  836 + }
677 837 }
... ...
  1 +package com.iot.scheduler.utils;
  2 +
  3 +import java.io.*;
  4 +import java.nio.file.*;
  5 +import java.time.LocalDateTime;
  6 +import java.time.ZoneId;
  7 +import java.time.format.DateTimeFormatter;
  8 +
  9 +public class DeviceDataStorage {
  10 +
  11 + private static final String DATA_DIR = "/thingskit/iot-scheduler/reportData"; // 存储目录
  12 + private static final DateTimeFormatter TIMESTAMP_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  13 +
  14 + static {
  15 + // 确保存储目录存在
  16 + try {
  17 + Files.createDirectories(Paths.get(DATA_DIR));
  18 + } catch (IOException e) {
  19 + e.printStackTrace();
  20 + }
  21 + }
  22 +
  23 + /**
  24 + * 更新设备数据(追加一条记录)
  25 + *
  26 + * @param deviceCode 设备编码
  27 + * @param timestamp 时间戳(毫秒)
  28 + * @param value 实时值
  29 + */
  30 + public synchronized void updateData(String deviceCode, long timestamp, double value) {
  31 + String fileName = getFileName(deviceCode);
  32 + String line = formatDataLine(timestamp, value) + System.lineSeparator();
  33 + try (FileWriter fw = new FileWriter(fileName, true);
  34 + BufferedWriter bw = new BufferedWriter(fw)) {
  35 + bw.write(line);
  36 + } catch (IOException e) {
  37 + System.err.println("写入设备数据失败: " + deviceCode);
  38 + e.printStackTrace();
  39 + }
  40 + }
  41 +
  42 + /**
  43 + * 查询设备上次存储的数据(最新一条)
  44 + *
  45 + * @param deviceCode 设备编码
  46 + * @return DeviceRecord 对象,若无数据则返回 null
  47 + */
  48 + public DeviceRecord getLastRecord(String deviceCode) {
  49 + String fileName = getFileName(deviceCode);
  50 + File file = new File(fileName);
  51 + if (!file.exists()) {
  52 + return null;
  53 + }
  54 +
  55 + String lastLine = null;
  56 + try (BufferedReader br = new BufferedReader(new FileReader(file))) {
  57 + String line;
  58 + while ((line = br.readLine()) != null) {
  59 + if (!line.trim().isEmpty()) {
  60 + lastLine = line;
  61 + }
  62 + }
  63 + } catch (IOException e) {
  64 + System.err.println("读取设备数据失败: " + deviceCode);
  65 + e.printStackTrace();
  66 + return null;
  67 + }
  68 +
  69 + if (lastLine == null) {
  70 + return null;
  71 + }
  72 + return parseRecord(lastLine);
  73 + }
  74 +
  75 + /**
  76 + * 根据设备编码和时间戳查询指定时间的数据(精确匹配)
  77 + *
  78 + * @param deviceCode 设备编码
  79 + * @param timestamp 时间戳(毫秒)
  80 + * @return DeviceRecord 对象,若不存在则返回 null
  81 + */
  82 + public DeviceRecord getRecordByTimestamp(String deviceCode, long timestamp) {
  83 + String fileName = getFileName(deviceCode);
  84 + File file = new File(fileName);
  85 + if (!file.exists()) {
  86 + return null;
  87 + }
  88 +
  89 + try (BufferedReader br = new BufferedReader(new FileReader(file))) {
  90 + String line;
  91 + while ((line = br.readLine()) != null) {
  92 + if (line.trim().isEmpty()) continue;
  93 + DeviceRecord record = parseRecord(line);
  94 + if (record != null && record.getTimestamp() == timestamp) {
  95 + return record;
  96 + }
  97 + }
  98 + } catch (IOException e) {
  99 + System.err.println("读取设备数据失败: " + deviceCode);
  100 + e.printStackTrace();
  101 + }
  102 + return null;
  103 + }
  104 +
  105 + // 获取设备文件路径
  106 + private String getFileName(String deviceCode) {
  107 + return DATA_DIR + File.separator + deviceCode + ".txt";
  108 + }
  109 +
  110 + // 格式化存储行: 时间戳 | 数值
  111 + private String formatDataLine(long timestamp, double value) {
  112 + String timeStr = formatTimestamp(timestamp);
  113 + return timeStr + " | " + value + " | " + timestamp;
  114 + }
  115 +
  116 + // 将时间戳转为可读格式
  117 + private String formatTimestamp(long timestamp) {
  118 + LocalDateTime dateTime = LocalDateTime.ofInstant(
  119 + java.time.Instant.ofEpochMilli(timestamp),
  120 + ZoneId.systemDefault()
  121 + );
  122 + return dateTime.format(TIMESTAMP_FORMAT);
  123 + }
  124 +
  125 + // 解析记录行
  126 + private DeviceRecord parseRecord(String line) {
  127 + String[] parts = line.split("\\|");
  128 + if (parts.length < 3) {
  129 + // 兼容旧格式
  130 + if (parts.length == 2) {
  131 + try {
  132 + String timestampStr = parts[0].trim();
  133 + double value = Double.parseDouble(parts[1].trim());
  134 + LocalDateTime ldt = LocalDateTime.parse(timestampStr, TIMESTAMP_FORMAT);
  135 + long timestamp = ldt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
  136 + return new DeviceRecord(timestamp, value);
  137 + } catch (Exception e) {
  138 + return null;
  139 + }
  140 + }
  141 + return null;
  142 + }
  143 +
  144 + try {
  145 + long timestamp = Long.parseLong(parts[2].trim());
  146 + double value = Double.parseDouble(parts[1].trim());
  147 + return new DeviceRecord(timestamp, value);
  148 + } catch (NumberFormatException e) {
  149 + return null;
  150 + }
  151 + }
  152 +
  153 + // 记录实体类
  154 + public static class DeviceRecord {
  155 + private final long timestamp; // 毫秒时间戳
  156 + private final double value;
  157 +
  158 + public DeviceRecord(long timestamp, double value) {
  159 + this.timestamp = timestamp;
  160 + this.value = value;
  161 + }
  162 +
  163 + public long getTimestamp() {
  164 + return timestamp;
  165 + }
  166 +
  167 + public double getValue() {
  168 + return value;
  169 + }
  170 +
  171 + @Override
  172 + public String toString() {
  173 + LocalDateTime dateTime = LocalDateTime.ofInstant(
  174 + java.time.Instant.ofEpochMilli(timestamp),
  175 + ZoneId.systemDefault()
  176 + );
  177 + return "DeviceRecord{" +
  178 + "timestamp=" + timestamp +
  179 + " (" + dateTime.format(TIMESTAMP_FORMAT) + ")" +
  180 + ", value=" + value +
  181 + '}';
  182 + }
  183 + }
  184 +
  185 +
  186 + public static void main(String[] args) {
  187 + DeviceDataStorage storage = new DeviceDataStorage();
  188 +
  189 + // 模拟不同时间点的数据(时间戳作为参数传入)
  190 + long time1 = 1718352000000L; // 2024-06-14 16:00:00
  191 + long time2 = 1718438400000L; // 2024-06-15 16:00:00
  192 + long time3 = 1718524800000L; // 2024-06-16 16:00:00
  193 +
  194 + // 更新设备数据(传入时间戳)
  195 + storage.updateData("device_001", time1, 23.5);
  196 + storage.updateData("device_001", time2, 24.1);
  197 + storage.updateData("device_001", time3, 25.3);
  198 +
  199 + storage.updateData("device_002", time1, 68.3);
  200 + storage.updateData("device_002", time2, 69.1);
  201 +
  202 + // 查询上次存储的数据
  203 + DeviceDataStorage.DeviceRecord lastRecord = storage.getLastRecord("device_001");
  204 + System.out.println("device_001 最后一条数据: " + lastRecord);
  205 +
  206 + // 根据具体时间戳查询
  207 + DeviceDataStorage.DeviceRecord record = storage.getRecordByTimestamp("device_001", time2);
  208 + System.out.println("device_001 指定时间的数据: " + record);
  209 +
  210 + // 查询不存在的设备
  211 + DeviceDataStorage.DeviceRecord notExist = storage.getLastRecord("device_999");
  212 + System.out.println("device_999: " + (notExist == null ? "无数据" : notExist));
  213 + }
  214 +
  215 +}
\ No newline at end of file
... ...
... ... @@ -165,7 +165,8 @@ pj:
165 165 when 544 then '电'
166 166 when 545 then '水'
167 167 when 546 then '天然气' end AS energyType,
168   - COALESCE (tkl.dbl_v, tkl.long_v, 0) AS deviceValue
  168 + COALESCE (tkl.dbl_v, tkl.long_v, 0) AS deviceValue,
  169 + tkl.ts
169 170 FROM
170 171 device de
171 172 LEFT JOIN tk_organization tko ON de.organization_id = tko.id
... ...