Commit fad2ae65d743f7e54a500669422cda7c6bb40110

Authored by 芯火源
1 parent 6bd5ae32

fix(DEFECT-1365): 微服务模式下定时任务执行问题修复。

部分定时任务执行成功
1 package org.thingsboard.server.config.yunteng; 1 package org.thingsboard.server.config.yunteng;
2 2
  3 +import java.util.Properties;
  4 +import javax.sql.DataSource;
3 import lombok.AllArgsConstructor; 5 import lombok.AllArgsConstructor;
4 import org.springframework.context.annotation.Bean; 6 import org.springframework.context.annotation.Bean;
5 import org.springframework.context.annotation.Configuration; 7 import org.springframework.context.annotation.Configuration;
6 import org.springframework.scheduling.quartz.SchedulerFactoryBean; 8 import org.springframework.scheduling.quartz.SchedulerFactoryBean;
  9 +import org.thingsboard.server.queue.util.TbCoreComponent;
7 10
8 -import javax.sql.DataSource;  
9 -import java.util.Properties;  
10 -  
11 -/**  
12 - * 定时任务配置(单机部署建议默认走内存,如需集群需要创建qrtz数据库表/打开类注释)  
13 - */ 11 +/** 定时任务配置(单机部署建议默认走内存,如需集群需要创建qrtz数据库表/打开类注释) */
14 @Configuration 12 @Configuration
15 @AllArgsConstructor 13 @AllArgsConstructor
16 -public class ScheduleConfig  
17 -{ 14 +@TbCoreComponent
  15 +public class ScheduleConfig {
18 private final DataSource dataSource; 16 private final DataSource dataSource;
  17 +
19 @Bean 18 @Bean
20 - public SchedulerFactoryBean schedulerFactoryBean()  
21 - { 19 + public SchedulerFactoryBean schedulerFactoryBean() {
22 SchedulerFactoryBean factory = new SchedulerFactoryBean(); 20 SchedulerFactoryBean factory = new SchedulerFactoryBean();
23 factory.setDataSource(dataSource); 21 factory.setDataSource(dataSource);
24 22
@@ -31,16 +29,21 @@ public class ScheduleConfig @@ -31,16 +29,21 @@ public class ScheduleConfig
31 prop.put("org.quartz.threadPool.threadCount", "50"); 29 prop.put("org.quartz.threadPool.threadCount", "50");
32 prop.put("org.quartz.threadPool.threadPriority", "5"); 30 prop.put("org.quartz.threadPool.threadPriority", "5");
33 // JobStore配置 31 // JobStore配置
34 - prop.put("org.quartz.jobStore.class", "org.springframework.scheduling.quartz.LocalDataSourceJobStore"); 32 + prop.put(
  33 + "org.quartz.jobStore.class",
  34 + "org.springframework.scheduling.quartz.LocalDataSourceJobStore");
35 // 集群配置 35 // 集群配置
36 prop.put("org.quartz.jobStore.isClustered", "true"); 36 prop.put("org.quartz.jobStore.isClustered", "true");
37 prop.put("org.quartz.jobStore.clusterCheckinInterval", "15000"); 37 prop.put("org.quartz.jobStore.clusterCheckinInterval", "15000");
38 prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1"); 38 prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
39 39
40 // sqlserver 启用 40 // sqlserver 启用
41 - // prop.put("org.quartz.jobStore.selectWithLockSQL", "SELECT * FROM {0}LOCKS UPDLOCK WHERE LOCK_NAME = ?"); 41 + // prop.put("org.quartz.jobStore.selectWithLockSQL", "SELECT * FROM {0}LOCKS UPDLOCK WHERE
  42 + // LOCK_NAME = ?");
42 // PostgreSQL 启用 43 // PostgreSQL 启用
43 - prop.put("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.PostgreSQLDelegate"); 44 + prop.put(
  45 + "org.quartz.jobStore.driverDelegateClass",
  46 + "org.quartz.impl.jdbcjobstore.PostgreSQLDelegate");
44 prop.put("org.quartz.jobStore.misfireThreshold", "12000"); 47 prop.put("org.quartz.jobStore.misfireThreshold", "12000");
45 prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_"); 48 prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
46 factory.setQuartzProperties(prop); 49 factory.setQuartzProperties(prop);
1 package org.thingsboard.server.controller.yunteng; 1 package org.thingsboard.server.controller.yunteng;
2 2
  3 +import static org.thingsboard.server.common.data.yunteng.constant.QueryConstant.*;
  4 +import static org.thingsboard.server.common.data.yunteng.constant.QueryConstant.ORDER_TYPE;
  5 +
3 import io.swagger.annotations.Api; 6 import io.swagger.annotations.Api;
4 import io.swagger.annotations.ApiOperation; 7 import io.swagger.annotations.ApiOperation;
  8 +import java.util.HashMap;
  9 +import java.util.List;
  10 +import javax.annotation.PostConstruct;
5 import lombok.RequiredArgsConstructor; 11 import lombok.RequiredArgsConstructor;
6 import org.apache.commons.lang3.StringUtils; 12 import org.apache.commons.lang3.StringUtils;
7 import org.quartz.SchedulerException; 13 import org.quartz.SchedulerException;
@@ -19,16 +25,12 @@ import org.thingsboard.server.common.data.yunteng.dto.SysJobDTO; @@ -19,16 +25,12 @@ import org.thingsboard.server.common.data.yunteng.dto.SysJobDTO;
19 import org.thingsboard.server.common.data.yunteng.enums.OrderTypeEnum; 25 import org.thingsboard.server.common.data.yunteng.enums.OrderTypeEnum;
20 import org.thingsboard.server.common.data.yunteng.utils.tools.ResponseResult; 26 import org.thingsboard.server.common.data.yunteng.utils.tools.ResponseResult;
21 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData; 27 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData;
  28 +import org.thingsboard.server.common.msg.queue.ServiceType;
22 import org.thingsboard.server.controller.BaseController; 29 import org.thingsboard.server.controller.BaseController;
23 import org.thingsboard.server.dao.util.yunteng.CronUtils; 30 import org.thingsboard.server.dao.util.yunteng.CronUtils;
24 import org.thingsboard.server.dao.util.yunteng.ScheduleUtils; 31 import org.thingsboard.server.dao.util.yunteng.ScheduleUtils;
25 import org.thingsboard.server.dao.yunteng.service.TkSysJobService; 32 import org.thingsboard.server.dao.yunteng.service.TkSysJobService;
26 -  
27 -import java.util.HashMap;  
28 -import java.util.List;  
29 -  
30 -import static org.thingsboard.server.common.data.yunteng.constant.QueryConstant.*;  
31 -import static org.thingsboard.server.common.data.yunteng.constant.QueryConstant.ORDER_TYPE; 33 +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
32 34
33 /** 调度任务信息操作处理 */ 35 /** 调度任务信息操作处理 */
34 @RestController 36 @RestController
@@ -38,6 +40,7 @@ import static org.thingsboard.server.common.data.yunteng.constant.QueryConstant. @@ -38,6 +40,7 @@ import static org.thingsboard.server.common.data.yunteng.constant.QueryConstant.
38 @PreAuthorize("@check.checkPermissions({'SYS_ADMIN','PLATFORM_ADMIN'},{})") 40 @PreAuthorize("@check.checkPermissions({'SYS_ADMIN','PLATFORM_ADMIN'},{})")
39 public class SysJobController extends BaseController { 41 public class SysJobController extends BaseController {
40 private final TkSysJobService jobService; 42 private final TkSysJobService jobService;
  43 + private final TbServiceInfoProvider serviceInfoProvider;
41 44
42 @GetMapping( 45 @GetMapping(
43 path = "/page", 46 path = "/page",
@@ -90,7 +93,7 @@ public class SysJobController extends BaseController { @@ -90,7 +93,7 @@ public class SysJobController extends BaseController {
90 @ApiOperation(value = "任务调度立即执行一次") 93 @ApiOperation(value = "任务调度立即执行一次")
91 public ResponseResult<Boolean> run(@PathVariable("id") String id) throws SchedulerException { 94 public ResponseResult<Boolean> run(@PathVariable("id") String id) throws SchedulerException {
92 SysJobDTO jobDTO = jobService.selectJobById(id); 95 SysJobDTO jobDTO = jobService.selectJobById(id);
93 - if(null == jobDTO){ 96 + if (null == jobDTO) {
94 throw new TkDataValidationException(ErrorMessage.INTERNAL_ERROR.getMessage()); 97 throw new TkDataValidationException(ErrorMessage.INTERNAL_ERROR.getMessage());
95 } 98 }
96 boolean result = jobService.run(jobDTO); 99 boolean result = jobService.run(jobDTO);
@@ -142,8 +145,7 @@ public class SysJobController extends BaseController { @@ -142,8 +145,7 @@ public class SysJobController extends BaseController {
142 return ResponseResult.success(dateList); 145 return ResponseResult.success(dateList);
143 } 146 }
144 147
145 - private ResponseResult<SysJobDTO> saveOrUpdate(SysJobDTO job)  
146 - throws SchedulerException{ 148 + private ResponseResult<SysJobDTO> saveOrUpdate(SysJobDTO job) throws SchedulerException {
147 String message = StringUtils.isEmpty(job.getId()) ? "新增任务'" : "修改任务'"; 149 String message = StringUtils.isEmpty(job.getId()) ? "新增任务'" : "修改任务'";
148 if (!CronUtils.isValid(job.getCronExpression())) { 150 if (!CronUtils.isValid(job.getCronExpression())) {
149 throw new TkDataValidationException(message + job.getJobName() + "'失败,Cron表达式不正确"); 151 throw new TkDataValidationException(message + job.getJobName() + "'失败,Cron表达式不正确");
@@ -171,4 +173,13 @@ public class SysJobController extends BaseController { @@ -171,4 +173,13 @@ public class SysJobController extends BaseController {
171 } 173 }
172 return ResponseResult.success(jobService.saveOrUpdateJob(job)); 174 return ResponseResult.success(jobService.saveOrUpdateJob(job));
173 } 175 }
  176 +
  177 + /** 项目启动时,初始化定时器 主要是防止手动修改数据库导致未同步到定时任务处理(注:不能手动修改数据库ID和任务组名,否则会导致脏数据) */
  178 + @PostConstruct
  179 + public void init() throws SchedulerException {
  180 + if (!serviceInfoProvider.isService(ServiceType.TB_CORE)) {
  181 + return;
  182 + }
  183 + jobService.init();
  184 + }
174 } 185 }
@@ -44,8 +44,8 @@ public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobE @@ -44,8 +44,8 @@ public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobE
44 private final CacheUtils cacheUtils; 44 private final CacheUtils cacheUtils;
45 private final TkDeviceTaskCenterService tkDeviceTaskCenterService; 45 private final TkDeviceTaskCenterService tkDeviceTaskCenterService;
46 private static final String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS; 46 private static final String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
47 - /** 项目启动时,初始化定时器 主要是防止手动修改数据库导致未同步到定时任务处理(注:不能手动修改数据库ID和任务组名,否则会导致脏数据) */  
48 - @PostConstruct 47 +
  48 + @Override
49 public void init() throws SchedulerException { 49 public void init() throws SchedulerException {
50 scheduler.clear(); 50 scheduler.clear();
51 try { 51 try {
@@ -114,4 +114,6 @@ public interface TkSysJobService { @@ -114,4 +114,6 @@ public interface TkSysJobService {
114 * @return 定时任务信息 114 * @return 定时任务信息
115 */ 115 */
116 SysJobDTO findSysJobBySourceId(String sourceId); 116 SysJobDTO findSysJobBySourceId(String sourceId);
  117 +
  118 + void init() throws SchedulerException;
117 } 119 }