Commit 0e3b1774feb5e26ca3d71aa5d93fe3b6c7cae675

Authored by xp.Huang
2 parents 116b40ac fad2ae65

Merge branch '20230704' into 'master_dev'

fix(DEFECT-1365): 微服务模式下定时任务执行问题修复。

See merge request yunteng/thingskit!202
1 1 package org.thingsboard.server.config.yunteng;
2 2
  3 +import java.util.Properties;
  4 +import javax.sql.DataSource;
3 5 import lombok.AllArgsConstructor;
4 6 import org.springframework.context.annotation.Bean;
5 7 import org.springframework.context.annotation.Configuration;
6 8 import org.springframework.scheduling.quartz.SchedulerFactoryBean;
  9 +import org.thingsboard.server.queue.util.TbCoreComponent;
7 10
8   -import javax.sql.DataSource;
9   -import java.util.Properties;
10   -
11   -/**
12   - * 定时任务配置(单机部署建议默认走内存,如需集群需要创建qrtz数据库表/打开类注释)
13   - */
  11 +/** 定时任务配置(单机部署建议默认走内存,如需集群需要创建qrtz数据库表/打开类注释) */
14 12 @Configuration
15 13 @AllArgsConstructor
16   -public class ScheduleConfig
17   -{
  14 +@TbCoreComponent
  15 +public class ScheduleConfig {
18 16 private final DataSource dataSource;
  17 +
19 18 @Bean
20   - public SchedulerFactoryBean schedulerFactoryBean()
21   - {
  19 + public SchedulerFactoryBean schedulerFactoryBean() {
22 20 SchedulerFactoryBean factory = new SchedulerFactoryBean();
23 21 factory.setDataSource(dataSource);
24 22
... ... @@ -31,16 +29,21 @@ public class ScheduleConfig
31 29 prop.put("org.quartz.threadPool.threadCount", "50");
32 30 prop.put("org.quartz.threadPool.threadPriority", "5");
33 31 // JobStore配置
34   - prop.put("org.quartz.jobStore.class", "org.springframework.scheduling.quartz.LocalDataSourceJobStore");
  32 + prop.put(
  33 + "org.quartz.jobStore.class",
  34 + "org.springframework.scheduling.quartz.LocalDataSourceJobStore");
35 35 // 集群配置
36 36 prop.put("org.quartz.jobStore.isClustered", "true");
37 37 prop.put("org.quartz.jobStore.clusterCheckinInterval", "15000");
38 38 prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
39 39
40 40 // sqlserver 启用
41   - // prop.put("org.quartz.jobStore.selectWithLockSQL", "SELECT * FROM {0}LOCKS UPDLOCK WHERE LOCK_NAME = ?");
  41 + // prop.put("org.quartz.jobStore.selectWithLockSQL", "SELECT * FROM {0}LOCKS UPDLOCK WHERE
  42 + // LOCK_NAME = ?");
42 43 // PostgreSQL 启用
43   - prop.put("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.PostgreSQLDelegate");
  44 + prop.put(
  45 + "org.quartz.jobStore.driverDelegateClass",
  46 + "org.quartz.impl.jdbcjobstore.PostgreSQLDelegate");
44 47 prop.put("org.quartz.jobStore.misfireThreshold", "12000");
45 48 prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
46 49 factory.setQuartzProperties(prop);
... ...
1 1 package org.thingsboard.server.controller.yunteng;
2 2
  3 +import static org.thingsboard.server.common.data.yunteng.constant.QueryConstant.*;
  4 +import static org.thingsboard.server.common.data.yunteng.constant.QueryConstant.ORDER_TYPE;
  5 +
3 6 import io.swagger.annotations.Api;
4 7 import io.swagger.annotations.ApiOperation;
  8 +import java.util.HashMap;
  9 +import java.util.List;
  10 +import javax.annotation.PostConstruct;
5 11 import lombok.RequiredArgsConstructor;
6 12 import org.apache.commons.lang3.StringUtils;
7 13 import org.quartz.SchedulerException;
... ... @@ -19,16 +25,12 @@ import org.thingsboard.server.common.data.yunteng.dto.SysJobDTO;
19 25 import org.thingsboard.server.common.data.yunteng.enums.OrderTypeEnum;
20 26 import org.thingsboard.server.common.data.yunteng.utils.tools.ResponseResult;
21 27 import org.thingsboard.server.common.data.yunteng.utils.tools.TkPageData;
  28 +import org.thingsboard.server.common.msg.queue.ServiceType;
22 29 import org.thingsboard.server.controller.BaseController;
23 30 import org.thingsboard.server.dao.util.yunteng.CronUtils;
24 31 import org.thingsboard.server.dao.util.yunteng.ScheduleUtils;
25 32 import org.thingsboard.server.dao.yunteng.service.TkSysJobService;
26   -
27   -import java.util.HashMap;
28   -import java.util.List;
29   -
30   -import static org.thingsboard.server.common.data.yunteng.constant.QueryConstant.*;
31   -import static org.thingsboard.server.common.data.yunteng.constant.QueryConstant.ORDER_TYPE;
  33 +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
32 34
33 35 /** 调度任务信息操作处理 */
34 36 @RestController
... ... @@ -38,6 +40,7 @@ import static org.thingsboard.server.common.data.yunteng.constant.QueryConstant.
38 40 @PreAuthorize("@check.checkPermissions({'SYS_ADMIN','PLATFORM_ADMIN'},{})")
39 41 public class SysJobController extends BaseController {
40 42 private final TkSysJobService jobService;
  43 + private final TbServiceInfoProvider serviceInfoProvider;
41 44
42 45 @GetMapping(
43 46 path = "/page",
... ... @@ -90,7 +93,7 @@ public class SysJobController extends BaseController {
90 93 @ApiOperation(value = "任务调度立即执行一次")
91 94 public ResponseResult<Boolean> run(@PathVariable("id") String id) throws SchedulerException {
92 95 SysJobDTO jobDTO = jobService.selectJobById(id);
93   - if(null == jobDTO){
  96 + if (null == jobDTO) {
94 97 throw new TkDataValidationException(ErrorMessage.INTERNAL_ERROR.getMessage());
95 98 }
96 99 boolean result = jobService.run(jobDTO);
... ... @@ -142,8 +145,7 @@ public class SysJobController extends BaseController {
142 145 return ResponseResult.success(dateList);
143 146 }
144 147
145   - private ResponseResult<SysJobDTO> saveOrUpdate(SysJobDTO job)
146   - throws SchedulerException{
  148 + private ResponseResult<SysJobDTO> saveOrUpdate(SysJobDTO job) throws SchedulerException {
147 149 String message = StringUtils.isEmpty(job.getId()) ? "新增任务'" : "修改任务'";
148 150 if (!CronUtils.isValid(job.getCronExpression())) {
149 151 throw new TkDataValidationException(message + job.getJobName() + "'失败,Cron表达式不正确");
... ... @@ -171,4 +173,13 @@ public class SysJobController extends BaseController {
171 173 }
172 174 return ResponseResult.success(jobService.saveOrUpdateJob(job));
173 175 }
  176 +
  177 + /** 项目启动时,初始化定时器 主要是防止手动修改数据库导致未同步到定时任务处理(注:不能手动修改数据库ID和任务组名,否则会导致脏数据) */
  178 + @PostConstruct
  179 + public void init() throws SchedulerException {
  180 + if (!serviceInfoProvider.isService(ServiceType.TB_CORE)) {
  181 + return;
  182 + }
  183 + jobService.init();
  184 + }
174 185 }
... ...
... ... @@ -44,8 +44,8 @@ public class SysJobServiceImpl extends AbstractBaseService<SysJobMapper, SysJobE
44 44 private final CacheUtils cacheUtils;
45 45 private final TkDeviceTaskCenterService tkDeviceTaskCenterService;
46 46 private static final String cacheName = FastIotConstants.CacheConfigKey.TASK_CENTER_INFOS;
47   - /** 项目启动时,初始化定时器 主要是防止手动修改数据库导致未同步到定时任务处理(注:不能手动修改数据库ID和任务组名,否则会导致脏数据) */
48   - @PostConstruct
  47 +
  48 + @Override
49 49 public void init() throws SchedulerException {
50 50 scheduler.clear();
51 51 try {
... ...
... ... @@ -114,4 +114,6 @@ public interface TkSysJobService {
114 114 * @return 定时任务信息
115 115 */
116 116 SysJobDTO findSysJobBySourceId(String sourceId);
  117 +
  118 + void init() throws SchedulerException;
117 119 }
... ...