JobTrackerFactoryBean.java
4.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package com.github.ltsopensource.spring;
import com.github.ltsopensource.autoconfigure.PropertiesConfigurationFactory;
import com.github.ltsopensource.core.commons.utils.CollectionUtils;
import com.github.ltsopensource.core.listener.MasterChangeListener;
import com.github.ltsopensource.jobtracker.JobTracker;
import com.github.ltsopensource.jobtracker.JobTrackerBuilder;
import com.github.ltsopensource.core.properties.JobTrackerProperties;
import com.github.ltsopensource.jobtracker.support.OldDataHandler;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import java.util.Properties;
/**
* JobTracker Spring Bean 工厂类
*
* @author Robert HG (254963746@qq.com) on 8/4/15.
*/
public class JobTrackerFactoryBean implements FactoryBean<JobTracker>,
InitializingBean, DisposableBean {
private JobTracker jobTracker;
private boolean started;
/**
* 集群名称
*/
private String clusterName;
/**
* zookeeper地址
*/
private String registryAddress;
/**
* master节点变化监听器
*/
private MasterChangeListener[] masterChangeListeners;
/**
* 额外参数配置
*/
private Properties configs = new Properties();
/**
* 监听端口
*/
private Integer listenPort;
private String identity;
private String bindIp;
/**
* 老数据处理接口
*/
private OldDataHandler oldDataHandler;
private String[] locations;
@Override
public JobTracker getObject() throws Exception {
return jobTracker;
}
@Override
public Class<?> getObjectType() {
return JobTracker.class;
}
@Override
public boolean isSingleton() {
return true;
}
@Override
public void afterPropertiesSet() throws Exception {
JobTrackerProperties properties = null;
if (locations == null || locations.length == 0) {
properties = new JobTrackerProperties();
properties.setListenPort(listenPort);
properties.setClusterName(clusterName);
properties.setRegistryAddress(registryAddress);
properties.setBindIp(bindIp);
properties.setIdentity(identity);
properties.setConfigs(CollectionUtils.toMap(configs));
} else {
properties = PropertiesConfigurationFactory.createPropertiesConfiguration(JobTrackerProperties.class, locations);
}
jobTracker = JobTrackerBuilder.buildByProperties(properties);
if (oldDataHandler != null) {
jobTracker.setOldDataHandler(oldDataHandler);
}
if (masterChangeListeners != null) {
for (MasterChangeListener masterChangeListener : masterChangeListeners) {
jobTracker.addMasterChangeListener(masterChangeListener);
}
}
}
/**
* 可以自己得到JobTracker对象后调用,也可以直接使用spring配置中的init属性指定该方法
*/
public void start() {
if (!started) {
jobTracker.start();
started = true;
}
}
@Override
public void destroy() throws Exception {
jobTracker.stop();
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public void setRegistryAddress(String registryAddress) {
this.registryAddress = registryAddress;
}
public void setMasterChangeListeners(MasterChangeListener... masterChangeListeners) {
this.masterChangeListeners = masterChangeListeners;
}
public void setConfigs(Properties configs) {
this.configs = configs;
}
public void setOldDataHandler(OldDataHandler oldDataHandler) {
this.oldDataHandler = oldDataHandler;
}
public void setListenPort(Integer listenPort) {
this.listenPort = listenPort;
}
public void setIdentity(String identity) {
this.identity = identity;
}
public void setBindIp(String bindIp) {
this.bindIp = bindIp;
}
public void setLocations(String... locations) {
this.locations = locations;
}
}