Commit eaebe64a500bb229b92b75d89bed98a8206908d6

Authored by 胡翰林
1 parent d1e65862

红三七设备数据同步

... ... @@ -18,6 +18,8 @@ public class HealthController {
18 18 private GhDevicePullService ghDevicePullService;
19 19 @Resource
20 20 private ShzzDevicePullService shzzDevicePullService;
  21 + @Resource
  22 + private HnHsqDeviceReportService hnHsqDeviceReportService;
21 23
22 24 @GetMapping("/health")
23 25 public String health() {
... ... @@ -26,9 +28,15 @@ public class HealthController {
26 28
27 29 @GetMapping("/manualSynchronization")
28 30 public void manualSynchronization() {
29   - hnDeviceReportService.deviceReport();
  31 + hnHsqDeviceReportService.deviceReport();
  32 + }
  33 +
  34 + @GetMapping("/hsqDeviceReport")
  35 + public void hsqDeviceReport() {
  36 + hnHsqDeviceReportService.deviceReport();
30 37 }
31 38
  39 +
32 40 @GetMapping("/pullSynchronization")
33 41 public void pullSynchronization() {
34 42 shDevicePullService.pullDeviceAndPushToIot();
... ...
  1 +package com.iot.scheduler.service;
  2 +
  3 +import com.alibaba.fastjson.JSONObject;
  4 +import com.iot.scheduler.utils.HttpClientUtils;
  5 +import com.zaxxer.hikari.HikariConfig;
  6 +import com.zaxxer.hikari.HikariDataSource;
  7 +import lombok.extern.slf4j.Slf4j;
  8 +import org.apache.commons.collections4.CollectionUtils;
  9 +import org.apache.commons.lang3.StringUtils;
  10 +import org.apache.http.entity.ContentType;
  11 +import org.apache.http.entity.StringEntity;
  12 +import org.springframework.beans.factory.annotation.Value;
  13 +import org.springframework.stereotype.Service;
  14 +
  15 +import java.io.*;
  16 +import java.sql.*;
  17 +import java.time.Instant;
  18 +import java.time.LocalDateTime;
  19 +import java.time.ZoneId;
  20 +import java.time.format.DateTimeFormatter;
  21 +import java.util.ArrayList;
  22 +import java.util.HashMap;
  23 +import java.util.List;
  24 +import java.util.Map;
  25 +
  26 +@Slf4j
  27 +@Service
  28 +/**
  29 + * 淮南红三七设备上报
  30 + */
  31 +public class HnHsqDeviceReportService {
  32 +
  33 + @Value("${hnhsq.third.tokenUrl:http://58.243.79.51:32210/auth/oauth/token}")
  34 + private String tokenUrl;
  35 +
  36 + @Value("${hnhsq.third.tokenUser:hnsq}")
  37 + private String tokenUser;
  38 +
  39 + @Value("${hnhsq.third.tokenPwd:abc@1234}")
  40 + private String tokenPwd;
  41 +
  42 + @Value("${hnhsq.third.reportUrl:http://58.243.79.51:31357/mainApi/formEngine/formData/batchAddOrUpate}")
  43 + private String reportUrl;
  44 +
  45 + @Value("${hnhsq.third.jdbcUrl:jdbc:postgresql://106.15.73.210:5433/thingskit}")
  46 + private String jdbcUrl;
  47 +
  48 + @Value("${hnhsq.third.jdbcUserName:postgres}")
  49 + private String jdbcUserName;
  50 +
  51 + @Value("${hnhsq.third.jdbcPassword:postgres}")
  52 + private String jdbcPassword;
  53 +
  54 + @Value("${hnhsq.third.selectSql:SELECT * FROM ts_kv_dictionary;}")
  55 + private String selectSql;
  56 +
  57 + private static final String companyName = "安徽红三七药业有限公司";
  58 +
  59 + public static final String lastTimeFilePath = "/thingskit/iot-scheduler/lastts/hsq.txt";
  60 +// public static final String lastTimeFilePath = "D:/data/lastts/hsq.txt";
  61 +
  62 + public void deviceReport() {
  63 + log.info("开始执行淮南红三七设备属性上报任务");
  64 + try {
  65 + List<Object> needSyncDataList = initConnectAndSelectData();
  66 + log.info("数据库查询完成,共获取{}条数据", needSyncDataList.size());
  67 + if(CollectionUtils.isNotEmpty(needSyncDataList)){
  68 + List<Map<String, Object>> paramsList = switchData(needSyncDataList);
  69 + String token = getToken();
  70 + log.info("token:"+token);
  71 + batchAddOrUpate(token, paramsList);
  72 + }
  73 + } catch (Exception e) {
  74 + log.error("执行设备属性上报任务时发生异常", e);
  75 + } finally {
  76 + log.info("淮南红三七设备属性上报任务执行结束");
  77 + }
  78 + }
  79 +
  80 + public List<Map<String, Object>> switchData(List<Object> hcDeviceList) {
  81 + List<Map<String, Object>> result = new ArrayList<>();
  82 + if (CollectionUtils.isEmpty(hcDeviceList)) {
  83 + return new ArrayList<>();
  84 + }
  85 + for (Object device : hcDeviceList) {
  86 + Map<String, Object> data = new HashMap<>();
  87 + List<Object> dataList = (ArrayList) device;
  88 + String deviceName = dataList.get(0).toString();
  89 + String deviceCode = dataList.get(1).toString();
  90 + Long ts = Long.parseLong(dataList.get(2).toString());
  91 + long status = Long.parseLong(dataList.get(3).toString());
  92 + String statusStr = "在线";
  93 + if (status == 0) {
  94 + statusStr = "离线";
  95 + }
  96 +
  97 +
  98 + data.put("entName", companyName);
  99 + data.put("deviceName",deviceName);
  100 + data.put("deviceCode", deviceCode);
  101 + data.put("reportTime", tsFormart(ts));
  102 + data.put("status", statusStr);
  103 +
  104 + result.add(data);
  105 + }
  106 +
  107 + return result;
  108 + }
  109 +
  110 + public static String tsFormart(Long ts) {
  111 + Instant instant = Instant.ofEpochMilli(ts);
  112 + LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
  113 + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  114 + String formattedDate = localDateTime.format(formatter);
  115 + return formattedDate;
  116 + }
  117 +
  118 + private List<Object> initConnectAndSelectData() {
  119 + Connection connection = null;
  120 + PreparedStatement statement = null;
  121 + ResultSet resultSet = null;
  122 + HikariDataSource dataSource = null;
  123 + List<Object> resultList = new ArrayList<>();
  124 + long time = new java.util.Date().getTime();
  125 + Long lastExecTs = getLastExecTs();
  126 + String currentSelSql = selectSql + " and kv.ts >=" + lastExecTs + " and kv.ts<" + time;
  127 + log.info("开始连接数据库,URL: {}", jdbcUrl);
  128 +
  129 + try {
  130 + HikariConfig config = new HikariConfig();
  131 + config.setJdbcUrl(jdbcUrl);
  132 + config.setUsername(jdbcUserName);
  133 + config.setPassword(jdbcPassword);
  134 + config.setDriverClassName("org.postgresql.Driver");
  135 + config.setMaximumPoolSize(5);
  136 + config.setMinimumIdle(5);
  137 + config.setConnectionTimeout(60000);
  138 + config.setConnectionTestQuery("SELECT 1");
  139 +
  140 + dataSource = new HikariDataSource(config);
  141 + log.info("Hikari连接池配置完成");
  142 +
  143 + connection = dataSource.getConnection();
  144 + log.info("数据库连接成功");
  145 +
  146 + statement = connection.prepareStatement(currentSelSql);
  147 + log.info("执行SQL查询: {}", currentSelSql);
  148 +
  149 + resultSet = statement.executeQuery();
  150 + ResultSetMetaData metaData = resultSet.getMetaData();
  151 + int columnCount = metaData.getColumnCount();
  152 + log.info("查询结果集元数据获取成功,共{}列", columnCount);
  153 +
  154 + int rowCount = 0;
  155 + while (resultSet.next()) {
  156 + List<Object> result = new ArrayList<>(columnCount);
  157 + for (int index = 1; index <= columnCount; index++) {
  158 + int columnType = metaData.getColumnType(index);
  159 + Object value = getTypedValue(resultSet, index, columnType);
  160 + result.add(value);
  161 + }
  162 + resultList.add(result);
  163 + rowCount++;
  164 +
  165 + // 每处理1000行记录一次日志
  166 + if (rowCount % 1000 == 0) {
  167 + log.info("已处理{}行数据", rowCount);
  168 + }
  169 + }
  170 +
  171 + log.info("数据查询完成,共获取{}行数据", rowCount);
  172 +
  173 + } catch (SQLException e) {
  174 + log.error("数据库操作异常,URL: {}, 用户名: {}", jdbcUrl, jdbcUserName, e);
  175 + } catch (Exception e) {
  176 + log.error("初始化数据库连接或查询数据时发生异常", e);
  177 + } finally {
  178 + // 释放资源
  179 + try {
  180 + if (resultSet != null) resultSet.close();
  181 + if (statement != null) statement.close();
  182 + if (connection != null) connection.close();
  183 + log.info("数据库连接资源已释放");
  184 + } catch (SQLException e) {
  185 + log.error("关闭数据库资源时发生异常", e);
  186 + }
  187 +
  188 + if (dataSource != null) {
  189 + try {
  190 + dataSource.close();
  191 + log.info("HikariDataSource连接池已关闭");
  192 + } catch (Exception e) {
  193 + log.error("关闭HikariDataSource连接池时发生异常", e);
  194 + }
  195 + }
  196 + }
  197 + setLastExecTs(time);
  198 + log.info("数据库操作完成,返回{}条记录", resultList.size());
  199 + return resultList;
  200 + }
  201 +
  202 + private Object getTypedValue(ResultSet rs, int index, int sqlType) throws SQLException {
  203 + Object value;
  204 +
  205 + try {
  206 + switch (sqlType) {
  207 + case Types.BIT:
  208 + case Types.BOOLEAN:
  209 + value = rs.getBoolean(index);
  210 + return rs.wasNull() ? null : value;
  211 +
  212 + case Types.TINYINT:
  213 + case Types.SMALLINT:
  214 + case Types.INTEGER:
  215 + value = rs.getInt(index);
  216 + return rs.wasNull() ? null : value;
  217 +
  218 + case Types.BIGINT:
  219 + value = rs.getLong(index);
  220 + return rs.wasNull() ? null : value;
  221 +
  222 + case Types.FLOAT:
  223 + case Types.REAL:
  224 + value = rs.getFloat(index);
  225 + return rs.wasNull() ? null : value;
  226 +
  227 + case Types.DOUBLE:
  228 + value = rs.getDouble(index);
  229 + return rs.wasNull() ? null : value;
  230 +
  231 + case Types.NUMERIC:
  232 + case Types.DECIMAL:
  233 + value = rs.getBigDecimal(index);
  234 + return rs.wasNull() ? null : value;
  235 +
  236 + case Types.CHAR:
  237 + case Types.VARCHAR:
  238 + case Types.LONGVARCHAR:
  239 + case Types.NCHAR:
  240 + case Types.NVARCHAR:
  241 + case Types.LONGNVARCHAR:
  242 + value = rs.getString(index);
  243 + return rs.wasNull() ? null : value;
  244 +
  245 + case Types.DATE:
  246 + Date date = rs.getDate(index);
  247 + return date != null ? date.toLocalDate() : null;
  248 +
  249 + case Types.TIME:
  250 + Time time = rs.getTime(index);
  251 + return time != null ? time.toLocalTime() : null;
  252 +
  253 + case Types.TIMESTAMP:
  254 + Timestamp timestamp = rs.getTimestamp(index);
  255 + return timestamp != null ? timestamp.toLocalDateTime() : null;
  256 +
  257 + case Types.BINARY:
  258 + case Types.VARBINARY:
  259 + case Types.LONGVARBINARY:
  260 + value = rs.getBytes(index);
  261 + return rs.wasNull() ? null : value;
  262 +
  263 + case Types.BLOB:
  264 + value = rs.getBlob(index);
  265 + return rs.wasNull() ? null : value;
  266 +
  267 + case Types.CLOB:
  268 + value = rs.getClob(index);
  269 + return rs.wasNull() ? null : value;
  270 +
  271 + default:
  272 + value = rs.getObject(index);
  273 + return rs.wasNull() ? null : value;
  274 + }
  275 + } catch (SQLException e) {
  276 + log.error("获取结果集第{}列数据时发生异常,数据类型: {}", index, sqlType, e);
  277 + throw e;
  278 + }
  279 + }
  280 +
  281 + private void setLastExecTs(Long ts) {
  282 + try {
  283 + File file = new File(lastTimeFilePath);
  284 + FileWriter fw = new FileWriter(file);
  285 + fw.write(ts.toString());
  286 + fw.close();
  287 + } catch (IOException e) {
  288 + e.printStackTrace();
  289 + }
  290 + }
  291 +
  292 + private Long getLastExecTs() {
  293 + try {
  294 + File file = new File(lastTimeFilePath);
  295 + BufferedReader br = new BufferedReader(new FileReader(file));
  296 + try {
  297 + String line;
  298 + while ((line = br.readLine()) != null) {
  299 + if (StringUtils.isNotBlank(line)) {
  300 + return Long.parseLong(line);
  301 + }
  302 + }
  303 +
  304 + } catch (Exception ex) {
  305 + ex.printStackTrace();
  306 + } finally {
  307 + br.close();
  308 + }
  309 +
  310 +
  311 + } catch (IOException e) {
  312 + e.printStackTrace();
  313 + }
  314 +
  315 + //2026-01-27 00:00:00
  316 + return 1769443200000L;
  317 + }
  318 +
  319 + private String getToken() {
  320 +
  321 + Map<String, String> header = new HashMap<>();
  322 + Map<String, String> paramsMap = new HashMap<>();
  323 + paramsMap.put("grant_type", "password");
  324 + paramsMap.put("username", tokenUser);
  325 + paramsMap.put("password", tokenPwd);
  326 + header.put("Authorization", "Basic bWljcm8tcG9ydGFsOlBAc3MxMjM0");
  327 +
  328 + String sResult = HttpClientUtils.doPostRequest(tokenUrl, header, paramsMap, null);
  329 + if (StringUtils.isNotBlank(sResult)) {
  330 + JSONObject jsonObject = JSONObject.parseObject(sResult);
  331 + return jsonObject.getString("access_token");
  332 + } else {
  333 + return null;
  334 + }
  335 +
  336 + }
  337 +
  338 + private void batchAddOrUpate(String token, List<Map<String, Object>> dataList) {
  339 +
  340 + Map<String, String> header = new HashMap<>();
  341 +
  342 + Map<String, Object> paramsMap = new HashMap<>();
  343 + paramsMap.put("formId", "t67c1212b01915800070e7712");
  344 + List<String> uniqueKeys = new ArrayList<>();
  345 + uniqueKeys.add("entName");
  346 + uniqueKeys.add("deviceCode");
  347 + paramsMap.put("uniqueKeys", uniqueKeys);
  348 +
  349 + paramsMap.put("datas", dataList);
  350 +
  351 + header.put("Authorization", "Bearer" + token);
  352 +
  353 + String toJson = JSONObject.toJSONString(paramsMap);
  354 + StringEntity myEntity = new StringEntity(toJson, ContentType.APPLICATION_JSON);
  355 + String sResult = HttpClientUtils.doPostRequest(reportUrl, header, null, myEntity);
  356 + log.info("淮南红三七上报结果:" + sResult);
  357 + }
  358 +}
... ...
  1 +package com.iot.scheduler.utils;
  2 +
  3 +import org.apache.commons.collections4.MapUtils;
  4 +import org.apache.commons.lang3.StringUtils;
  5 +import org.apache.http.*;
  6 +import org.apache.http.client.ClientProtocolException;
  7 +import org.apache.http.client.config.RequestConfig;
  8 +import org.apache.http.client.entity.UrlEncodedFormEntity;
  9 +import org.apache.http.client.methods.CloseableHttpResponse;
  10 +import org.apache.http.client.methods.HttpGet;
  11 +import org.apache.http.client.methods.HttpPost;
  12 +import org.apache.http.client.methods.HttpPut;
  13 +import org.apache.http.entity.StringEntity;
  14 +import org.apache.http.impl.client.CloseableHttpClient;
  15 +import org.apache.http.message.BasicNameValuePair;
  16 +import org.apache.http.util.EntityUtils;
  17 +import org.slf4j.Logger;
  18 +import org.slf4j.LoggerFactory;
  19 +
  20 +import java.io.IOException;
  21 +import java.util.ArrayList;
  22 +import java.util.List;
  23 +import java.util.Map;
  24 +
  25 +public class HttpClientUtils {
  26 +
  27 + private static final Logger log = LoggerFactory.getLogger(HttpClientUtils.class);
  28 + /**
  29 + * 发送post请求
  30 + *
  31 + * @param url:请求地址
  32 + * @param header:请求头参数
  33 + * @param params:表单参数 form提交
  34 + * @param httpEntity json/xml参数
  35 + * @return
  36 + */
  37 + public static String doPostRequest(String url, Map<String, String> header, Map<String, String> params, HttpEntity httpEntity) {
  38 + String resultStr = "";
  39 + if (StringUtils.isEmpty(url)) {
  40 + return resultStr;
  41 + }
  42 +
  43 + CloseableHttpClient httpClient = null;
  44 + CloseableHttpResponse httpResponse = null;
  45 + try {
  46 + httpClient = SSLClientCustom.getHttpClinet();
  47 + HttpPost httpPost = new HttpPost(url);
  48 + //请求头header信息
  49 + if (MapUtils.isNotEmpty(header)) {
  50 + for (Map.Entry<String, String> stringStringEntry : header.entrySet()) {
  51 + httpPost.setHeader(stringStringEntry.getKey(), stringStringEntry.getValue());
  52 + log.info("请求header信息,key:{},value:{}", stringStringEntry.getKey(), stringStringEntry.getValue());
  53 + }
  54 + }
  55 + //请求参数信息
  56 + if (MapUtils.isNotEmpty(params)) {
  57 + List<NameValuePair> paramList = new ArrayList<NameValuePair>();
  58 + for (Map.Entry<String, String> stringStringEntry : params.entrySet()) {
  59 + paramList.add(new BasicNameValuePair(stringStringEntry.getKey(), stringStringEntry.getValue()));
  60 + log.info("请求参数信息,key:{},value:{}", stringStringEntry.getKey(), stringStringEntry.getValue());
  61 + }
  62 + UrlEncodedFormEntity urlEncodedFormEntity = new UrlEncodedFormEntity(paramList, Consts.UTF_8);
  63 + httpPost.setEntity(urlEncodedFormEntity);
  64 + }
  65 + //实体设置
  66 + if (httpEntity != null) {
  67 + String json = EntityUtils.toString(httpEntity);
  68 + log.info("请求参数信息{}", json);
  69 + httpPost.setEntity(httpEntity);
  70 + }
  71 +// httpPost.setHeader("Content-type", "application/x-www-form-urlencoded");
  72 + //发起请求
  73 + httpResponse = httpClient.execute(httpPost);
  74 + int statusCode = httpResponse.getStatusLine().getStatusCode();
  75 + if (statusCode == HttpStatus.SC_OK||statusCode == HttpStatus.SC_UNAUTHORIZED) {
  76 + HttpEntity httpResponseEntity = httpResponse.getEntity();
  77 + resultStr = EntityUtils.toString(httpResponseEntity);
  78 + //log.info("请求正常,请求地址:{},响应结果:{}", url, statusCode+";"+resultStr);
  79 + } else {
  80 + StringBuffer stringBuffer = new StringBuffer();
  81 + HeaderIterator headerIterator = httpResponse.headerIterator();
  82 + while (headerIterator.hasNext()) {
  83 + stringBuffer.append("\t" + headerIterator.next());
  84 + }
  85 + log.info("异常信息:请求地址:{},响应状态和结果:{}",url,statusCode+";"+stringBuffer);
  86 + }
  87 +
  88 + } catch (Exception e) {
  89 + log.info("请求地址:{}", url);
  90 + e.printStackTrace();
  91 + } finally {
  92 + HttpClientUtils.closeConnection(httpClient, httpResponse);
  93 + }
  94 + return resultStr;
  95 + }
  96 +
  97 + public static String doGetRequest(String url, Map<String, String> header, Map<String, Object> params) {
  98 + String resultStr = "";
  99 + if (StringUtils.isEmpty(url)) {
  100 + return resultStr;
  101 + }
  102 + CloseableHttpClient httpClient = null;
  103 + CloseableHttpResponse httpResponse = null;
  104 + try {
  105 + httpClient = SSLClientCustom.getHttpClinet();
  106 + //请求参数信息
  107 + if (MapUtils.isNotEmpty(params)) {
  108 + url = url + buildUrl(params);
  109 + }
  110 + HttpGet httpGet = new HttpGet(url);
  111 + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(5000)//连接超时
  112 + .setConnectionRequestTimeout(25000)//请求超时
  113 + .setSocketTimeout(25000)//套接字连接超时
  114 + .setRedirectsEnabled(true).build();//允许重定向
  115 + httpGet.setConfig(requestConfig);
  116 + if (MapUtils.isNotEmpty(header)) {
  117 + for (Map.Entry<String, String> stringStringEntry : header.entrySet()) {
  118 + httpGet.setHeader(stringStringEntry.getKey(), stringStringEntry.getValue());
  119 + }
  120 + }
  121 + //发起请求
  122 + httpResponse = httpClient.execute(httpGet);
  123 + int statusCode = httpResponse.getStatusLine().getStatusCode();
  124 + if (statusCode == HttpStatus.SC_OK) {
  125 + resultStr = EntityUtils.toString(httpResponse.getEntity(), Consts.UTF_8);
  126 + log.info("请求地址:{},响应结果:{}", url, resultStr);
  127 + } else {
  128 + StringBuffer stringBuffer = new StringBuffer();
  129 + HeaderIterator headerIterator = httpResponse.headerIterator();
  130 + while (headerIterator.hasNext()) {
  131 + stringBuffer.append("\t" + headerIterator.next());
  132 + }
  133 + resultStr = EntityUtils.toString(httpResponse.getEntity(), Consts.UTF_8);
  134 + log.info("异常信息请求地址:{}", url, resultStr);
  135 + log.info("异常信息:请求响应状态:{},请求返回结果:{}", httpResponse.getStatusLine().getStatusCode(),StringUtils.isBlank(resultStr)?stringBuffer:resultStr);
  136 + }
  137 +
  138 + } catch (Exception e) {
  139 + log.info("请求地址:{}", url);
  140 + e.printStackTrace();
  141 +
  142 + } finally {
  143 + HttpClientUtils.closeConnection(httpClient, httpResponse);
  144 + }
  145 + return resultStr;
  146 + }
  147 +
  148 + public static String doGetRequest(String url, Map<String, String> header, Map<String, Object> params,StringBuffer tracecode) {
  149 + String resultStr = "";
  150 + if (StringUtils.isEmpty(url)) {
  151 + return resultStr;
  152 + }
  153 + CloseableHttpClient httpClient = null;
  154 + CloseableHttpResponse httpResponse = null;
  155 + try {
  156 + httpClient = SSLClientCustom.getHttpClinet();
  157 + //请求参数信息
  158 + if (MapUtils.isNotEmpty(params)) {
  159 + url = url + buildUrl(params);
  160 + }
  161 + HttpGet httpGet = new HttpGet(url);
  162 + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(30000)//连接超时
  163 + .setConnectionRequestTimeout(30000)//请求超时
  164 + .setSocketTimeout(30000)//套接字连接超时
  165 + .setRedirectsEnabled(true).build();//允许重定向
  166 + httpGet.setConfig(requestConfig);
  167 + if (MapUtils.isNotEmpty(header)) {
  168 + for (Map.Entry<String, String> stringStringEntry : header.entrySet()) {
  169 + httpGet.setHeader(stringStringEntry.getKey(), stringStringEntry.getValue());
  170 + }
  171 + }
  172 + //发起请求
  173 + httpResponse = httpClient.execute(httpGet);
  174 + int statusCode = httpResponse.getStatusLine().getStatusCode();
  175 + if (statusCode == HttpStatus.SC_OK) {
  176 + Header[] tracecodes = httpResponse.getHeaders("Tracecode");
  177 + tracecode.append(tracecodes[0].getValue());
  178 + resultStr = EntityUtils.toString(httpResponse.getEntity(), Consts.UTF_8);
  179 + log.info("请求地址:{},响应结果:{}", url, resultStr);
  180 + } else {
  181 + StringBuffer stringBuffer = new StringBuffer();
  182 + HeaderIterator headerIterator = httpResponse.headerIterator();
  183 + while (headerIterator.hasNext()) {
  184 + stringBuffer.append("\t" + headerIterator.next());
  185 + }
  186 + log.info("异常信息:请求地址:{},请求响应状态:{},请求返回结果:{}",url,httpResponse.getStatusLine().getStatusCode(), stringBuffer);
  187 + }
  188 +
  189 + } catch (Exception e) {
  190 + e.printStackTrace();
  191 + } finally {
  192 + HttpClientUtils.closeConnection(httpClient, httpResponse);
  193 + }
  194 + return resultStr;
  195 + }
  196 +
  197 +
  198 + public static String doPutRequest(String url, Map<String, String> header,StringEntity entity) {
  199 +
  200 + CloseableHttpClient closeableHttpClient = null;
  201 + CloseableHttpResponse closeableHttpResponse=null;
  202 + try {
  203 + closeableHttpClient = SSLClientCustom.getHttpClinet();
  204 + } catch (Exception e) {
  205 + e.printStackTrace();
  206 + }
  207 +
  208 +
  209 + RequestConfig requestConfig = RequestConfig.custom()
  210 + .setConnectTimeout(5000)
  211 + .setConnectionRequestTimeout(5000)
  212 + .setRedirectsEnabled(true)
  213 + .build();
  214 +
  215 + HttpPut httpPost = new HttpPut(url);
  216 +
  217 + httpPost.setConfig(requestConfig);
  218 + httpPost.setHeader("Content-Type", "application/json");
  219 +
  220 + if (MapUtils.isNotEmpty(header)) {
  221 + for (Map.Entry<String, String> stringStringEntry : header.entrySet()) {
  222 + httpPost.setHeader(stringStringEntry.getKey(), stringStringEntry.getValue());
  223 + log.info("请求header信息,key:{},value:{}", stringStringEntry.getKey(), stringStringEntry.getValue());
  224 + }
  225 + }
  226 +
  227 + String strRequest = "";
  228 + try {
  229 +// StringEntity entity = new StringEntity(jsonObject.toString(), "utf-8");
  230 + entity.setContentEncoding("utf-8");
  231 + entity.setContentType("application/json");
  232 + httpPost.setEntity(entity);
  233 +
  234 +
  235 + closeableHttpResponse = closeableHttpClient.execute(httpPost);
  236 +
  237 + if (null != closeableHttpResponse && !"".equals(closeableHttpResponse)) {
  238 + int statusCode = closeableHttpResponse.getStatusLine().getStatusCode();
  239 + if (closeableHttpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
  240 + HttpEntity httpEntity = closeableHttpResponse.getEntity();
  241 + strRequest = EntityUtils.toString(httpEntity);
  242 + log.info("请求正常,请求地址:{},响应结果:{}", url, statusCode+";"+strRequest);
  243 + } else {
  244 + StringBuffer stringBuffer = new StringBuffer();
  245 + HeaderIterator headerIterator = closeableHttpResponse.headerIterator();
  246 + while (headerIterator.hasNext()) {
  247 + stringBuffer.append("\t" + headerIterator.next());
  248 + }
  249 + strRequest = "Error Response" + statusCode;
  250 + log.info("异常信息:请求地址:{},响应状态和结果:{}",url,strRequest+";"+stringBuffer);
  251 + }
  252 + }
  253 +
  254 + } catch (ClientProtocolException e) {
  255 + e.printStackTrace();
  256 + } catch (ParseException e) {
  257 + e.printStackTrace();
  258 + } catch (IOException e) {
  259 + e.printStackTrace();
  260 + } finally {
  261 + try {
  262 + if (closeableHttpClient != null) {
  263 + closeableHttpClient.close();
  264 + }
  265 +
  266 + if(closeableHttpResponse!=null){
  267 + closeableHttpResponse.close();
  268 + }
  269 + } catch (IOException e) {
  270 + e.printStackTrace();
  271 + }
  272 + }
  273 +
  274 + return strRequest;
  275 + }
  276 +
  277 +
  278 +
  279 + /**
  280 + * 关掉连接释放资源
  281 + */
  282 + private static void closeConnection(CloseableHttpClient httpClient, CloseableHttpResponse httpResponse) {
  283 + if (httpClient != null) {
  284 + try {
  285 + httpClient.close();
  286 + } catch (IOException e) {
  287 + e.printStackTrace();
  288 + }
  289 + }
  290 + if (httpResponse != null) {
  291 + try {
  292 + httpResponse.close();
  293 + } catch (IOException e) {
  294 + e.printStackTrace();
  295 + }
  296 + }
  297 +
  298 + }
  299 +
  300 + /**
  301 + * 构造get请求的参数
  302 + *
  303 + * @return
  304 + */
  305 + private static String buildUrl(Map<String, Object> map) {
  306 + if (MapUtils.isEmpty(map)) {
  307 + return "";
  308 + }
  309 + StringBuffer stringBuffer = new StringBuffer("?");
  310 + for (Map.Entry<String, Object> stringStringEntry : map.entrySet()) {
  311 + stringBuffer.append(stringStringEntry.getKey()).append("=").append(stringStringEntry.getValue()).append("&");
  312 + }
  313 + String result = stringBuffer.toString();
  314 + if (StringUtils.isNotEmpty(result)) {
  315 + result = result.substring(0, result.length() - 1);//去掉结尾的&连接符
  316 + }
  317 + return result;
  318 + }
  319 +
  320 +}
... ...
  1 +package com.iot.scheduler.utils;
  2 +
  3 +import org.apache.http.config.Registry;
  4 +import org.apache.http.config.RegistryBuilder;
  5 +import org.apache.http.conn.socket.ConnectionSocketFactory;
  6 +import org.apache.http.conn.socket.PlainConnectionSocketFactory;
  7 +import org.apache.http.conn.ssl.NoopHostnameVerifier;
  8 +import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
  9 +import org.apache.http.conn.ssl.TrustStrategy;
  10 +import org.apache.http.impl.client.CloseableHttpClient;
  11 +import org.apache.http.impl.client.HttpClients;
  12 +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
  13 +import org.apache.http.ssl.SSLContextBuilder;
  14 +
  15 +import java.security.KeyManagementException;
  16 +import java.security.KeyStoreException;
  17 +import java.security.NoSuchAlgorithmException;
  18 +import java.security.cert.CertificateException;
  19 +import java.security.cert.X509Certificate;
  20 +
  21 +public class SSLClientCustom {
  22 + private static final String HTTP = "http";
  23 + private static final String HTTPS = "https";
  24 + private static SSLConnectionSocketFactory sslConnectionSocketFactory = null;
  25 + private static PoolingHttpClientConnectionManager poolingHttpClientConnectionManager = null;//连接池管理类
  26 + private static SSLContextBuilder sslContextBuilder = null;//管理Https连接的上下文类
  27 +
  28 + static {
  29 + try {
  30 + sslContextBuilder = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
  31 + @Override
  32 + public boolean isTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
  33 +// 信任所有站点 直接返回true
  34 + return true;
  35 + }
  36 + });
  37 + sslConnectionSocketFactory = new SSLConnectionSocketFactory(sslContextBuilder.build(), new String[]{"SSLv2Hello", "SSLv3", "TLSv1", "TLSv1.2"}, null, NoopHostnameVerifier.INSTANCE);
  38 + Registry<ConnectionSocketFactory> registryBuilder = RegistryBuilder.<ConnectionSocketFactory>create()
  39 + .register(HTTP, new PlainConnectionSocketFactory())
  40 + .register(HTTPS, sslConnectionSocketFactory)
  41 + .build();
  42 + poolingHttpClientConnectionManager = new PoolingHttpClientConnectionManager(registryBuilder);
  43 + poolingHttpClientConnectionManager.setMaxTotal(200);
  44 + } catch (NoSuchAlgorithmException e) {
  45 + e.printStackTrace();
  46 + } catch (KeyStoreException e) {
  47 + e.printStackTrace();
  48 + } catch (KeyManagementException e) {
  49 + e.printStackTrace();
  50 + }
  51 +
  52 + }
  53 +
  54 + /**
  55 + * 获取连接
  56 + *
  57 + * @return
  58 + * @throws Exception
  59 + */
  60 + public static CloseableHttpClient getHttpClinet() throws Exception {
  61 + CloseableHttpClient httpClient = HttpClients.custom().setSSLSocketFactory(sslConnectionSocketFactory)
  62 + .setConnectionManager(poolingHttpClientConnectionManager)
  63 + .setConnectionManagerShared(true)
  64 + .build();
  65 + return httpClient;
  66 + }
  67 +}
... ...
  1 +package com.iot.scheduler.zone;
  2 +
  3 +import com.iot.scheduler.service.HnHsqDeviceReportService;
  4 +import com.iot.scheduler.task.AbstractZoneScheduler;
  5 +import jakarta.annotation.Resource;
  6 +import lombok.extern.slf4j.Slf4j;
  7 +import org.springframework.scheduling.annotation.Scheduled;
  8 +import org.springframework.stereotype.Component;
  9 +
  10 +@Slf4j
  11 +@Component
  12 +public class HuaiNanZoneScheduler extends AbstractZoneScheduler {
  13 + @Resource
  14 + private HnHsqDeviceReportService hnHsqDeviceReportService;
  15 +
  16 + @Override
  17 + protected String getZoneName() {
  18 + return "HuaiNan (淮南经开区)";
  19 + }
  20 +
  21 + @Scheduled(cron = "${scheduler.hsq.push:0 0 0/1 * * ?}")
  22 + public void pushHsqDevicesToThirdParty() {
  23 + String taskName = "hsq Push Devices (IoT -> 3rd Party)";
  24 + logStart(taskName);
  25 + try {
  26 + log.info("[{}] hsq pushing devices...", getZoneName());
  27 + hnHsqDeviceReportService.deviceReport();
  28 + Thread.sleep(1000);
  29 + } catch (Exception e) {
  30 + logError(taskName, e);
  31 + } finally {
  32 + logEnd(taskName);
  33 + }
  34 + }
  35 +}
... ...
... ... @@ -33,6 +33,8 @@ scheduler:
33 33 panji:
34 34 pull: "0 0/5 * * * ?"
35 35 push: "0 0/10 * * * ?"
  36 + hsq:
  37 + push: "0 0 0/1 * * ?"
36 38
37 39 hn:
38 40 third:
... ... @@ -57,6 +59,19 @@ hn:
57 59 '3304ebd5-71ae-448a-91a2-e3b4d6ae258a',
58 60 '875a4841-c7f2-4e2c-88a2-ea62d4642132'
59 61 );"
  62 +hnhsq:
  63 + third:
  64 + reportUrl: "http://58.243.79.51:31357/mainApi/formEngine/formData/batchAddOrUpate"
  65 + tokenUrl: "http://58.243.79.51:32210/auth/oauth/token"
  66 + tokenUser: "hnsq"
  67 + tokenPwd: "abc@1234"
  68 + jdbcUrl: "jdbc:postgresql://106.15.73.210:5433/thingskit"
  69 + jdbcUserName: "postgres"
  70 + jdbcPassword: "postgres"
  71 + selectSql: "SELECT d.name,d.sn,kv.ts,kv.long_v as value
  72 + from ts_kv kv
  73 + INNER JOIN device d on d.id = kv.entity_id and kv.key=61
  74 + where tenant_id='5b6a05a0-f020-11f0-9cb8-e3376d1e7978'"
60 75
61 76 sh:
62 77 iot:
... ...