JobFinishHandler.java
7.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package com.github.ltsopensource.jobtracker.complete;
import com.github.ltsopensource.biz.logger.domain.JobLogPo;
import com.github.ltsopensource.biz.logger.domain.LogType;
import com.github.ltsopensource.core.commons.utils.CollectionUtils;
import com.github.ltsopensource.core.constant.Constants;
import com.github.ltsopensource.core.constant.Level;
import com.github.ltsopensource.core.domain.JobMeta;
import com.github.ltsopensource.core.domain.JobRunResult;
import com.github.ltsopensource.core.json.JSON;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.core.support.CronExpressionUtils;
import com.github.ltsopensource.core.support.JobDomainConverter;
import com.github.ltsopensource.core.support.JobUtils;
import com.github.ltsopensource.core.support.SystemClock;
import com.github.ltsopensource.jobtracker.domain.JobTrackerAppContext;
import com.github.ltsopensource.queue.domain.JobPo;
import com.github.ltsopensource.store.jdbc.exception.DupEntryException;
import java.util.Date;
import java.util.List;
/**
 * Handles jobs whose execution has finished: depending on whether the job is a
 * cron job or a repeatable job it either schedules the next execution, or removes
 * the job from its source queue and writes a removal log entry. In every case the
 * job is finally removed from the executing-job queue.
 *
 * @author Robert HG (254963746@qq.com) on 11/11/15.
 */
public class JobFinishHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(JobFinishHandler.class);

    /** Application context giving access to the job queues and the job logger. */
    private final JobTrackerAppContext appContext;

    /**
     * @param appContext context used to reach the cron / repeat / executable /
     *                   executing job queues and the job logger
     */
    public JobFinishHandler(JobTrackerAppContext appContext) {
        this.appContext = appContext;
    }

    /**
     * Processes a batch of finished job results.
     *
     * <p>For each result the job is dispatched to the matching finish routine
     * (cron or repeatable, "once" or regular) and then removed from the
     * executing-job queue. Jobs that are neither cron nor repeatable are only
     * removed from the executing-job queue.
     *
     * @param results finished job results; an empty (or, presumably, {@code null} —
     *                depends on {@code CollectionUtils.isEmpty}) list is a no-op
     */
    public void onComplete(List<JobRunResult> results) {
        if (CollectionUtils.isEmpty(results)) {
            return;
        }

        for (JobRunResult result : results) {

            JobMeta jobMeta = result.getJobMeta();

            // Whether the job that just finished was a retry execution.
            boolean isRetryForThisTime = Boolean.TRUE.toString().equals(jobMeta.getInternalExtParam(Constants.IS_RETRY_JOB));
            // Whether the job carries the ONCE flag; such runs are finished through the
            // "NoReplyPrev" routines, which never schedule a follow-up execution.
            boolean isOnce = Boolean.TRUE.toString().equals(jobMeta.getInternalExtParam(Constants.ONCE));

            if (jobMeta.getJob().isCron()) {
                // Cron job.
                if (isOnce) {
                    finishNoReplyPrevCronJob(jobMeta);
                } else {
                    finishCronJob(jobMeta.getJobId());
                }
            } else if (jobMeta.getJob().isRepeatable()) {
                // Repeatable job.
                if (isOnce) {
                    finishNoReplyPrevRepeatJob(jobMeta, isRetryForThisTime);
                } else {
                    finishRepeatJob(jobMeta.getJobId(), isRetryForThisTime);
                }
            }

            // Remove the job from the executing queue.
            appContext.getExecutingJobQueue().remove(jobMeta.getJobId());
        }
    }

    /**
     * Finishes a regular cron job execution: if the cron expression yields another
     * trigger time the job is re-added to the executable queue with that trigger
     * time, otherwise it is removed from the cron queue and a removal log is written.
     *
     * @param jobId id of the job in the cron job queue
     */
    private void finishCronJob(String jobId) {
        JobPo jobPo = appContext.getCronJobQueue().getJob(jobId);
        if (jobPo == null) {
            // The record may already have been deleted from the job queue.
            return;
        }
        Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(jobPo.getCronExpression());
        if (nextTriggerTime == null) {
            // No further trigger time: remove from the cron job queue.
            appContext.getCronJobQueue().remove(jobId);
            jobRemoveLog(jobPo, "Cron");
            return;
        }
        // There is a next execution: put the job back into the executable queue.
        try {
            jobPo.setTaskTrackerIdentity(null);
            jobPo.setIsRunning(false);
            jobPo.setTriggerTime(nextTriggerTime.getTime());
            jobPo.setGmtModified(SystemClock.now());
            jobPo.setInternalExtParam(Constants.EXE_SEQ_ID, JobUtils.generateExeSeqId(jobPo));
            appContext.getExecutableJobQueue().add(jobPo);
        } catch (DupEntryException e) {
            // An equal entry is already queued; nothing more to do than report it.
            LOGGER.warn("ExecutableJobQueue already exist:" + JSON.toJSONString(jobPo));
        }
    }

    /**
     * Finishes a "once" execution of a cron job. No follow-up execution is
     * scheduled here; the cron job is only removed (and logged) when its cron
     * expression has no further trigger time and the executable queue holds no
     * more entries for the same task.
     *
     * @param jobMeta metadata of the finished job; its task tracker node group and
     *                real task id are used to look up the cron job
     */
    private void finishNoReplyPrevCronJob(JobMeta jobMeta) {
        JobPo jobPo = appContext.getCronJobQueue().getJob(jobMeta.getJob().getTaskTrackerNodeGroup(), jobMeta.getRealTaskId());
        if (jobPo == null) {
            // The record may already have been deleted from the job queue.
            return;
        }
        Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(jobPo.getCronExpression());
        if (nextTriggerTime == null) {
            // Check whether the executable queue still holds entries for this task.
            if (appContext.getExecutableJobQueue().countJob(jobPo.getRealTaskId(), jobPo.getTaskTrackerNodeGroup()) == 0) {
                // TODO check whether the executing queue still holds entries as well
                // Remove from the cron job queue.
                appContext.getCronJobQueue().remove(jobPo.getJobId());
                jobRemoveLog(jobPo, "Cron");
            }
        }
    }

    /**
     * Finishes a "once" execution of a repeatable job. No follow-up execution is
     * scheduled here; the repeated counter is advanced (unless this run was a
     * retry) and the job is removed from the repeat queue once the counter reaches
     * the configured repeat count.
     *
     * <p>A repeat count of {@code -1} is never treated as "reached" — the guards in
     * this class exempt it from the limit comparison (presumably it means
     * "repeat without limit"; confirm against the job model).
     *
     * @param jobMeta            metadata of the finished job; its task tracker node
     *                           group and real task id are used to look up the job
     * @param isRetryForThisTime {@code true} if the finished execution was a retry,
     *                           in which case the repeated counter is not advanced
     */
    private void finishNoReplyPrevRepeatJob(JobMeta jobMeta, boolean isRetryForThisTime) {
        JobPo jobPo = appContext.getRepeatJobQueue().getJob(jobMeta.getJob().getTaskTrackerNodeGroup(), jobMeta.getRealTaskId());
        if (jobPo == null) {
            // The record may already have been deleted from the job queue.
            return;
        }
        // On the last execution repeatedCount + 1 == repeatCount.
        if (jobPo.getRepeatCount() != -1 && (jobPo.getRepeatedCount() + 1) >= jobPo.getRepeatCount()) {
            // All repetitions are done, so delete. There is no need to check the
            // executable queue here because this decision relies on the counter.
            appContext.getRepeatJobQueue().remove(jobPo.getJobId());
            jobRemoveLog(jobPo, "Repeat");
            return;
        }

        // If the finished job was a retry, do not increase repeatedCount.
        if (!isRetryForThisTime) {
            // Advance the repeated counter stored in the repeat job queue.
            final int jobQueueRepeatedCount = appContext.getRepeatJobQueue().incRepeatedCount(jobPo.getJobId());
            // The -1 guard mirrors the check above: without it a repeat count of -1
            // would compare as "reached" for every counter value and the job would
            // be removed after a single run.
            if (jobPo.getRepeatCount() != -1 && jobQueueRepeatedCount >= jobPo.getRepeatCount()) {
                appContext.getRepeatJobQueue().remove(jobPo.getJobId());
                jobRemoveLog(jobPo, "Repeat");
            }
        }
    }

    /**
     * Finishes a regular execution of a repeatable job: removes the job when its
     * repeat count has been reached, otherwise advances the repeated counter
     * (unless this run was a retry) and re-adds the job to the executable queue
     * with its next trigger time.
     *
     * @param jobId              id of the job in the repeat job queue
     * @param isRetryForThisTime {@code true} if the finished execution was a retry,
     *                           in which case the repeated counter is not advanced
     */
    private void finishRepeatJob(String jobId, boolean isRetryForThisTime) {
        JobPo jobPo = appContext.getRepeatJobQueue().getJob(jobId);
        if (jobPo == null) {
            // The record may already have been deleted from the job queue.
            return;
        }
        // On the last execution repeatedCount + 1 == repeatCount.
        if (jobPo.getRepeatCount() != -1 && (jobPo.getRepeatedCount() + 1) >= jobPo.getRepeatCount()) {
            // All repetitions are done, so delete.
            appContext.getRepeatJobQueue().remove(jobId);
            jobRemoveLog(jobPo, "Repeat");
            return;
        }

        int repeatedCount = jobPo.getRepeatedCount();
        // If the finished job was a retry, do not increase repeatedCount.
        if (!isRetryForThisTime) {
            // Advance the repeated counter stored in the repeat job queue.
            repeatedCount = appContext.getRepeatJobQueue().incRepeatedCount(jobId);
        }
        if (repeatedCount == -1) {
            // -1 signals that the job has already been deleted.
            return;
        }
        long nextTriggerTime = JobUtils.getRepeatNextTriggerTime(jobPo);
        try {
            // The new executable job carries the repeat queue's updated counter plus one.
            jobPo.setRepeatedCount(repeatedCount + 1);
            jobPo.setTaskTrackerIdentity(null);
            jobPo.setIsRunning(false);
            jobPo.setTriggerTime(nextTriggerTime);
            jobPo.setGmtModified(SystemClock.now());
            jobPo.setInternalExtParam(Constants.EXE_SEQ_ID, JobUtils.generateExeSeqId(jobPo));
            appContext.getExecutableJobQueue().add(jobPo);
        } catch (DupEntryException e) {
            // An equal entry is already queued; nothing more to do than report it.
            LOGGER.warn("ExecutableJobQueue already exist:" + JSON.toJSONString(jobPo));
        }
    }

    /**
     * Writes an INFO-level, successful {@code DEL} job log entry stating that the
     * job of the given type has finished.
     *
     * @param jobPo the job that was removed
     * @param type  job type label used as the message prefix (e.g. "Cron", "Repeat")
     */
    private void jobRemoveLog(JobPo jobPo, String type) {
        JobLogPo jobLogPo = JobDomainConverter.convertJobLog(jobPo);
        jobLogPo.setSuccess(true);
        jobLogPo.setLogType(LogType.DEL);
        jobLogPo.setLogTime(SystemClock.now());
        jobLogPo.setLevel(Level.INFO);
        jobLogPo.setMsg(type + " Job Finished");
        appContext.getJobLogger().log(jobLogPo);
    }
}