Commit 04752003d20e45701b4bf0df9fddd6525147d7fe

Authored by 房远帅
1 parent 3bd04977

凯盛数据推送能源管理

@@ -117,7 +117,7 @@ public class KSDeviceReportService { @@ -117,7 +117,7 @@ public class KSDeviceReportService {
117 properties.put("collectTime", roundedTime.format(formatter)); 117 properties.put("collectTime", roundedTime.format(formatter));
118 try { 118 try {
119 log.debug("电表开始MQTT发布: sn={}", sn); 119 log.debug("电表开始MQTT发布: sn={}", sn);
120 - MqttUtils.quickPublish(broker, topic, username, password, clientId, JSON.toJSONString(properties)); 120 + MqttUtils.publish(broker, topic, username, password, clientId, JSON.toJSONString(properties));
121 successCount++; 121 successCount++;
122 122
123 } catch (Exception e) { 123 } catch (Exception e) {
@@ -206,7 +206,7 @@ public class KSDeviceReportService { @@ -206,7 +206,7 @@ public class KSDeviceReportService {
206 properties.put("collectTime", roundedTime.format(formatter)); 206 properties.put("collectTime", roundedTime.format(formatter));
207 try { 207 try {
208 log.debug("水表开始MQTT发布: sn={}", sn); 208 log.debug("水表开始MQTT发布: sn={}", sn);
209 - MqttUtils.quickPublish(broker, topic, username, password, clientId, JSON.toJSONString(properties)); 209 + MqttUtils.publish(broker, topic, username, password, clientId, JSON.toJSONString(properties));
210 successCount++; 210 successCount++;
211 211
212 } catch (Exception e) { 212 } catch (Exception e) {
@@ -481,6 +481,32 @@ public class MqttUtils { @@ -481,6 +481,32 @@ public class MqttUtils {
481 } 481 }
482 } 482 }
483 483
  484 + public static void publish(String broker, String topic, String username,
  485 + String password, String clientId, String message) throws Exception {
  486 + MqttUtils mqttUtils = new MqttUtils();
  487 + MqttUtils.MqttConfig config = new MqttUtils.MqttConfig(broker)
  488 + .setClientId(clientId)
  489 + .setUsername(username)
  490 + .setPassword(password)
  491 + .setConnectionTimeout(15)
  492 + .setKeepAliveInterval(30)
  493 + .setQos(1);
  494 +
  495 + try {
  496 + // 连接
  497 + mqttUtils.connect(config);
  498 +
  499 + // 发布消息
  500 + mqttUtils.publish(topic, message);
  501 +
  502 + } catch (MqttException e) {
  503 + e.printStackTrace();
  504 + } finally {
  505 + // 断开连接
  506 + mqttUtils.disconnect();
  507 + }
  508 + }
  509 +
484 510
485 public static void main(String[] args) { 511 public static void main(String[] args) {
486 512
@@ -18,14 +18,14 @@ public class KaiShengZoneScheduler extends AbstractZoneScheduler { @@ -18,14 +18,14 @@ public class KaiShengZoneScheduler extends AbstractZoneScheduler {
18 return "KaiSheng (凯盛)"; 18 return "KaiSheng (凯盛)";
19 } 19 }
20 20
21 - @Scheduled(cron = "${scheduler.ks.push:0 0/2 * * * ?}") 21 + @Scheduled(cron = "${scheduler.ks.push:0 0/5 * * * ?}")
22 public void pushKsDevicesToThirdParty() { 22 public void pushKsDevicesToThirdParty() {
23 String taskName = "ks Push Devices (IoT -> 3rd Party)"; 23 String taskName = "ks Push Devices (IoT -> 3rd Party)";
24 logStart(taskName); 24 logStart(taskName);
25 try { 25 try {
26 log.info("[{}] ks pushing devices...", getZoneName()); 26 log.info("[{}] ks pushing devices...", getZoneName());
27 ksDeviceReportService.deviceElectricity(); 27 ksDeviceReportService.deviceElectricity();
28 -// ksDeviceReportService.deviceWater(); 28 + ksDeviceReportService.deviceWater();
29 Thread.sleep(1000); 29 Thread.sleep(1000);
30 } catch (Exception e) { 30 } catch (Exception e) {
31 logError(taskName, e); 31 logError(taskName, e);
@@ -25,8 +25,8 @@ scheduler: @@ -25,8 +25,8 @@ scheduler:
25 pull: "0 0/5 * * * ?" 25 pull: "0 0/5 * * * ?"
26 push: "0 0/10 * * * ?" 26 push: "0 0/10 * * * ?"
27 ks: 27 ks:
28 - pull: "0 0/2 * * * ?"  
29 - push: "0 0/2 * * * ?" 28 + pull: "0 0/5 * * * ?"
  29 + push: "0 0/5 * * * ?"
30 ks: 30 ks:
31 third: 31 third:
32 jdbcUrl: "jdbc:postgresql://192.168.0.249:5432/iot" 32 jdbcUrl: "jdbc:postgresql://192.168.0.249:5432/iot"