RunnerPool.java
6.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package com.github.ltsopensource.tasktracker.runner;
import com.github.ltsopensource.core.constant.EcTopic;
import com.github.ltsopensource.core.domain.JobMeta;
import com.github.ltsopensource.core.factory.NamedThreadFactory;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.ec.EventInfo;
import com.github.ltsopensource.ec.EventSubscriber;
import com.github.ltsopensource.ec.Observer;
import com.github.ltsopensource.tasktracker.domain.TaskTrackerAppContext;
import com.github.ltsopensource.tasktracker.expcetion.NoAvailableJobRunnerException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* @author Robert HG (254963746@qq.com) on 8/14/14.
* 线程池管理
*/
public class RunnerPool {
private final Logger LOGGER = LoggerFactory.getLogger(RunnerPool.class);
private ThreadPoolExecutor threadPoolExecutor = null;
private RunnerFactory runnerFactory;
private TaskTrackerAppContext appContext;
private RunningJobManager runningJobManager;
public RunnerPool(final TaskTrackerAppContext appContext) {
this.appContext = appContext;
this.runningJobManager = new RunningJobManager();
threadPoolExecutor = initThreadPoolExecutor();
runnerFactory = appContext.getRunnerFactory();
if (runnerFactory == null) {
runnerFactory = new DefaultRunnerFactory(appContext);
}
// 向事件中心注册事件, 改变工作线程大小
appContext.getEventCenter().subscribe(
new EventSubscriber(appContext.getConfig().getIdentity(), new Observer() {
@Override
public void onObserved(EventInfo eventInfo) {
setWorkThread(appContext.getConfig().getWorkThreads());
}
}), EcTopic.WORK_THREAD_CHANGE);
}
private ThreadPoolExecutor initThreadPoolExecutor() {
int workThreads = appContext.getConfig().getWorkThreads();
return new ThreadPoolExecutor(workThreads, workThreads, 30, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), // 直接提交给线程而不保持它们
new NamedThreadFactory("JobRunnerPool"),
new ThreadPoolExecutor.AbortPolicy());
}
public void execute(JobMeta jobMeta, RunnerCallback callback) throws NoAvailableJobRunnerException {
try {
threadPoolExecutor.execute(
new JobRunnerDelegate(appContext, jobMeta, callback));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Receive job success ! " + jobMeta);
}
} catch (RejectedExecutionException e) {
LOGGER.warn("No more thread to run job .");
throw new NoAvailableJobRunnerException(e);
}
}
/**
* 得到当前可用的线程数
*/
public int getAvailablePoolSize() {
return threadPoolExecutor.getMaximumPoolSize() - threadPoolExecutor.getActiveCount();
}
public void setWorkThread(int workThread) {
if (workThread == 0) {
throw new IllegalArgumentException("workThread can not be zero!");
}
threadPoolExecutor.setMaximumPoolSize(workThread);
threadPoolExecutor.setCorePoolSize(workThread);
LOGGER.info("workThread update to {}", workThread);
}
/**
* 得到最大线程数
*/
public int getWorkThread() {
return threadPoolExecutor.getCorePoolSize();
}
public RunnerFactory getRunnerFactory() {
return runnerFactory;
}
/**
* 执行该方法,线程池的状态立刻变成STOP状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。
* 它试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,
* 如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。
* 所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。
* 特殊的时候可以通过使用{@link InterruptibleJobRunner}来解决
*/
public void stopWorking() {
try {
threadPoolExecutor.shutdownNow();
Thread.sleep(1000);
threadPoolExecutor = initThreadPoolExecutor();
LOGGER.info("stop working succeed ");
} catch (Throwable t) {
LOGGER.error("stop working failed ", t);
}
}
public void shutDown() {
try {
threadPoolExecutor.shutdownNow();
LOGGER.info("stop working succeed ");
} catch (Throwable t) {
LOGGER.error("stop working failed ", t);
}
}
/**
* 用来管理正在执行的任务
*/
public class RunningJobManager {
private final ConcurrentMap<String/*jobId*/, JobRunnerDelegate> JOBS = new ConcurrentHashMap<String, JobRunnerDelegate>();
public void in(String jobId, JobRunnerDelegate jobRunnerDelegate) {
JOBS.putIfAbsent(jobId, jobRunnerDelegate);
}
public void out(String jobId) {
JOBS.remove(jobId);
}
public boolean running(String jobId) {
return JOBS.containsKey(jobId);
}
/**
* 返回给定list中不存在的jobId
*/
public List<String> getNotExists(List<String> jobIds) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ask jobs: " + jobIds + " Running jobs :" + JOBS.keySet());
}
List<String> notExistList = new ArrayList<String>();
for (String jobId : jobIds) {
if (!running(jobId)) {
notExistList.add(jobId);
}
}
return notExistList;
}
public void terminateJob(String jobId) {
JobRunnerDelegate jobRunnerDelegate = JOBS.get(jobId);
if (jobRunnerDelegate != null) {
try {
jobRunnerDelegate.currentThread().interrupt();
} catch (Throwable e) {
LOGGER.error("terminateJob [" + jobId + "] error", e);
}
}
}
}
public RunningJobManager getRunningJobManager() {
return runningJobManager;
}
}