RunnerPoolTest.java
4.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package com.github.ltsopensource.tasktracker.runner;
import com.github.ltsopensource.core.cluster.Config;
import com.github.ltsopensource.core.cluster.LTSConfig;
import com.github.ltsopensource.core.constant.Environment;
import com.github.ltsopensource.core.domain.Job;
import com.github.ltsopensource.core.domain.JobMeta;
import com.github.ltsopensource.core.factory.NamedThreadFactory;
import com.github.ltsopensource.core.json.JSON;
import com.github.ltsopensource.ec.injvm.InjvmEventCenter;
import com.github.ltsopensource.tasktracker.domain.Response;
import com.github.ltsopensource.tasktracker.domain.TaskTrackerAppContext;
import com.github.ltsopensource.tasktracker.expcetion.NoAvailableJobRunnerException;
import com.github.ltsopensource.tasktracker.monitor.TaskTrackerMStatReporter;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Robert HG (254963746@qq.com) on 2/21/16.
*/
public class RunnerPoolTest {
@Test
public void testInterruptor() throws NoAvailableJobRunnerException {
LTSConfig.setEnvironment(Environment.UNIT_TEST);
Config config = new Config();
config.setWorkThreads(10);
config.setIdentity("fjdaslfjlasj");
TaskTrackerAppContext appContext = new TaskTrackerAppContext();
appContext.setConfig(config);
appContext.setEventCenter(new InjvmEventCenter());
appContext.setJobRunnerClass(TestInterruptorJobRunner.class);
// appContext.setJobRunnerClass(NormalJobRunner.class);
RunnerPool runnerPool = new RunnerPool(appContext);
appContext.setRunnerPool(runnerPool);
TaskTrackerMStatReporter monitor = new TaskTrackerMStatReporter(appContext);
appContext.setMStatReporter(monitor);
RunnerCallback callback = new RunnerCallback() {
@Override
public JobMeta runComplete(Response response) {
System.out.println("complete:" + JSON.toJSONString(response));
return null;
}
};
Job job = new Job();
job.setTaskId("fdsafas");
JobMeta jobMeta = new JobMeta();
jobMeta.setJobId("111111");
jobMeta.setJob(job);
runnerPool.execute(jobMeta, callback);
System.out.println(runnerPool.getAvailablePoolSize());
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 5s之后停止
runnerPool.stopWorking();
while (true) {
try {
// 如果这个数字还在增长,表示线程还在执行,测试发现 NormalJobRunner 确实还在执行 TestInterruptorJobRunner 会停止
System.out.println(NormalJobRunner.l);
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(runnerPool.getAvailablePoolSize());
}
}
@Test
public void test() throws NoAvailableJobRunnerException {
int workThreads = 5;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(workThreads, workThreads, 30, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), // 直接提交给线程而不保持它们
new NamedThreadFactory("test"),
new ThreadPoolExecutor.AbortPolicy());
final List<Thread> list = new CopyOnWriteArrayList<Thread>();
for (int i = 0; i < 12; i++) {
submitJob(threadPoolExecutor, list);
}
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
list.get(0).interrupt();
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
submitJob(threadPoolExecutor, list);
submitJob(threadPoolExecutor, list);
try {
Thread.sleep(5000000000000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void submitJob(ThreadPoolExecutor threadPoolExecutor, final List<Thread> list) {
try {
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
list.add(Thread.currentThread());
try {
while (true) {
Thread.sleep(2000L);
System.out.println("=====" + Thread.currentThread().getName());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
} catch (Exception e) {
System.out.println("-- " + e.getMessage());
}
}
}