当系统启动时,首先做的便是zookeeper的连接,这一步在ZookeeperRegistryCenter的init方法中完成,源码精简为:
@Override
public void init() {
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(zkConfig.getServerLists())
.namespace(zkConfig.getNamespace());
client = builder.build();
client.start();
int maxWaitTime = zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries();
if (!client.blockUntilConnected(maxWaitTime, TimeUnit.MILLISECONDS)) {
client.close();
throw new KeeperException.OperationTimeoutException();
}
}
代码很简单,这里使用了Apache Curator。
JobScheduler是elastic-job-lite中核心的对象,负责job的注册、启动等。代码分布在其构造器和init方法中,下面根据功能点进行说明。
private JobScheduler(final CoordinatorRegistryCenter regCenter,
final LiteJobConfiguration liteJobConfig,
final JobEventBus jobEventBus,
final ElasticJobListener... elasticJobListeners) {
JobRegistry.getInstance().addJobInstance(
liteJobConfig.getJobName(), new JobInstance()
);
}
这货是负责本地各种任务相关的映射信息,下面是其类图:
addJobInstance便是将初始化的任务信息放到jobInstanceMap中。JobInstance中包含了任务的String类型的唯一ID,其生成方法:
public JobInstance() {
jobInstanceId = IpUtils.getIp() + DELIMITER +
ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
}
所以:ID由IP + 进程号的方式组成。
elastic-job留下的允许我们监控任务的启动和结束的接口:
前面说的Job实例创建只是本地的对象操作,尚未与zookeeper有交互。GuaranteeService从作者的角度来说叫做
保证分布式任务全部开始和结束状态的服务.
但是个人觉着更准确来说是封装了任务节点的数据访问和配置。如下:
JobScheduler构造器通过调用setGuaranteeServiceForElasticJobListeners方法创建了一个此对象:
private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter,
final List<ElasticJobListener> elasticJobListeners) {
GuaranteeService guaranteeService = new GuaranteeService(regCenter, liteJobConfig.getJobName());
for (ElasticJobListener each : elasticJobListeners) {
if (each instanceof AbstractDistributeOnceElasticJobListener) {
((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService);
}
}
}
此对象最终只能被AbstractDistributeOnceElasticJobListener(只执行一次的监听器)引用到。
即SchedulerFacade类,其构造器创建了一大坨Service:
public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners) {
this.jobName = jobName;
configService = new ConfigurationService(regCenter, jobName);
leaderService = new LeaderService(regCenter, jobName);
serverService = new ServerService(regCenter, jobName);
instanceService = new InstanceService(regCenter, jobName);
shardingService = new ShardingService(regCenter, jobName);
executionService = new ExecutionService(regCenter, jobName);
monitorService = new MonitorService(regCenter, jobName);
reconcileService = new ReconcileService(regCenter, jobName);
listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners);
}
估计后续的操作都要依赖于这些Service完成。
即LiteJobFacade,也有一坨Service:
public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners,
final JobEventBus jobEventBus) {
configService = new ConfigurationService(regCenter, jobName);
shardingService = new ShardingService(regCenter, jobName);
executionContextService = new ExecutionContextService(regCenter, jobName);
executionService = new ExecutionService(regCenter, jobName);
failoverService = new FailoverService(regCenter, jobName);
this.elasticJobListeners = elasticJobListeners;
this.jobEventBus = jobEventBus;
}
从这里便开始与zookeeper有交互了。
即:
public void init() {
LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
}
这里的逻辑可以概括为:
- 如果ZK上此任务已经存在(比如重启)并且没有启用覆盖,那么将以ZK的配置为准。
- 否则以本地为准。
SchedulerFacade.updateJobConfiguration:
public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
configService.persist(liteJobConfig);
return configService.load(false);
}
persist方法实现:
public void persist(final LiteJobConfiguration liteJobConfig) {
checkConflictJob(liteJobConfig);
if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
}
}
zookeeper的存储结构如下:
config节点保存的数据其实就是LiteJobConfiguration对象序列化后得到的JSON串,如下图:
elastic-job本地基于Quartz实现,这一步便是创建一个Quartz调度器,实现位于JobScheduler的init方法:
JobScheduleController jobScheduleController = new JobScheduleController(
createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()),
liteJobConfigFromRegCenter.getJobName()
);
核心为createScheduler方法:
private Scheduler createScheduler() {
Scheduler result;
try {
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(getBaseQuartzProperties());
result = factory.getScheduler();
result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
return result;
}
没啥好说的。
这一步是启动对任务根路径的监听,在这里就是/elastic-job/simpleElasticJob:
@Override
public void addCacheData(final String cachePath) {
TreeCache cache = new TreeCache(client, cachePath);
cache.start();
}
SchedulerFacade.registerStartUpInfo:
public void registerStartUpInfo(final boolean enabled) {
listenerManager.startAllListeners();
leaderService.electLeader();
serverService.persistOnline(enabled);
instanceService.persistOnline();
shardingService.setReshardingFlag();
monitorService.listen();
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
}
这里的信息量略大。
public void startAllListeners() {
electionListenerManager.start();
shardingListenerManager.start();
failoverListenerManager.start();
monitorExecutionListenerManager.start();
shutdownListenerManager.start();
triggerListenerManager.start();
rescheduleListenerManager.start();
guaranteeListenerManager.start();
jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
}
这些监听器负责对zookeeper节点数据变化的处理,后面再对这一部分详细展开。
利用Curator选主的代码如下:
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
latch.start();
latch.await();
callback.execute();
} catch (final Exception ex) {
handleException(ex);
}
}
选主利用的是/elastic-job/simpleElasticJob/leader/election/latch
节点,如果当前节点成为主节点,那么将任务ID保存到/elastic-job/simpleElasticJob/leader/election/instance
节点,如下图:
这一步是标记当前节点处于上线状态,其实就是建立一个如下节点,节点值为空串:
这一步建立了一个如下临时节点:
作用感觉和上面略有重叠,有待后续确认。
指的是持久化节点/elastic-job/simpleElasticJob/leader/sharding/necessary
,值为空,标志需要重新分片,具体作用后面确认。
指的是ReconcileService会启动对节点/elastic-job/simpleElasticJob/config
的监听,前面提到过,此节点保存的是任务的配置JSON,调解的原理后面再详细介绍。
位于JobScheduler.init:
public void init() {
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
这里其实就是在本地启动了Quartz去执行:
public void scheduleJob(final String cron) {
if (!scheduler.checkExists(jobDetail.getKey())) {
scheduler.scheduleJob(jobDetail, createTrigger(cron));
}
scheduler.start();
}
前面讲任务的启动当作了一个黑盒子,其实里面包含了相当多的信息。Quartz其实根据我们指定的任务类去执行相应的回调方法,然而这里设置的任务类并不是我们前面自定义的SimpleElasticJob,而是:
Job是Quartz提供的任务抽象接口,显然LiteJob的execute方法便是入口了:
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
}
所以创建任务执行器的逻辑也就可以想象的到了:根据任务的类型去创建对应的执行器。
当我们设置了本地时间和ZK时间差检查时,在每次任务执行前会进行此检查,开启的方法如下:
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration)
.overwrite(true)
.maxTimeDiffSeconds(10)
.build();
这里只需要弄清楚一点,系统是如何获取ZK时间的。
@Override
public long getRegistryCenterTime(final String key) {
long result = 0L;
persist(key, "");
result = client.checkExists().forPath(key).getMtime();
return result;
}
Zookeeper中mtime属性表示节点的最后修改时间,所以先创建一个节点,然后获取其最后修改时间,也就是Zookeeper的时间了。
这里被创建的节点的路径是:/elasticjob/simpleElasticJob/simpleElasticJob/systemTime/current
Elastic-job本地的任务调度实际上只由一个线程完成,JobScheduler.createScheduler:
private Scheduler createScheduler() {
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(getBaseQuartzProperties());
Scheduler result = factory.getScheduler();
result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
return result;
}
getBaseQuartzProperties中有如下的属性:
result.put("org.quartz.threadPool.threadCount", "1");
这就意味着,如果任务在配置的间隔内没有完成,那么下一次任务将不会执行。
为每个分片创建临时节点:
/elasticjob/simpleElasticJob/simpleElasticJob/sharding/0/running
,其中0是分片号。
源码位于AbstractElasticJobExecutor.process:
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
final CountDownLatch latch = new CountDownLatch(items.size());
for (final int each : items) {
final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
if (executorService.isShutdown()) {
return;
}
executorService.submit(new Runnable() {
@Override
public void run() {
try {
// 这里便是调用我们自己job的process方法
process(shardingContexts, each, jobExecutionEvent);
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
可以看出:
- 如果有多个分片,那么将会启动对应的线程分别执行每个分片。
- 主线程会等待所有分片执行完毕。
- 这里直接在本地启动了所有分片,那么节点之间的并行/负载均衡如何实现?
即删除启动记录时创建的临时节点: /elasticjob/simpleElasticJob/simpleElasticJob/sharding/0/running
.
以ShardingService的setReshardingFlag方法作为切入点,观察此方法的调用点,如下:
可以得出结论,触发重新分片的时间点为:
- 任务启动时
- 节点发生变化时:有新节点上线或者有节点宕机
- 分片数发生变化,这一点通过在页面上更新任务配置达到
重新分片标志设置后什么时候对其作出响应呢?答案是在本地进行调度执行时,入口位于AbstractElasticJobExecutor的executor方法,如下:
public final void execute() {
// ...
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
// ...
}
具体的源码不再展开,其逻辑概括为:
-
重新分片其实就是调用我们设置的分片策略类,然后将分片的结果保存到zookeeper中
-
只有主节点才可以进行分片,如何判断当前节点是否是主节点呢,其实zookeeper路径:
/elastic-job/simpleElasticJob/leader/election/instance
内保存着主节点的唯一ID -
如果当前正有任务在执行,那么将等待任务执行完毕再进行重新分片工作
这其中用到了Zookeeper的事物特性。
Elastic-job支持自动failover机制,当一个节点宕机时,组件会自动进行重试。这里监听的是临时节点:
/elastic-job/simpleElasticJob/instances/172.23.128.233@-@27194
也只能是临时节点。
事件监听由JobCrashedJobListener完成,其核心逻辑如下:
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
// 如果是当前节点,直接返回,话说这怎么可能? 除非手动删除
if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
return;
}
// 得到被删除节点的分片,准备failover
for (int each : shardingService.getShardingItems(jobInstanceId)) {
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
}
}
这一步将在Zookeeper中创建如下节点:
/elastic-job/simpleElasticJob/leader/failover/items/0
核心的逻辑位于FailoverService.failoverIfNecessary:
public void failoverIfNecessary() {
if (needFailover()) {
jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
}
}
executeInLeader将导致failover在主节点执行。
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
latch.start();
latch.await();
callback.execute();
} catch (final Exception ex) {
handleException(ex);
}
}
注意,这里的"主节点"并不是elastic-job的主节点,而是重新选举的另一个主节点(因为选举用的节点不同)。同时因为这里用的是自动释放的语法,所以在逻辑执行完成之后将会自动释放领导权,故所有节点均可以进入callback的execute方法执行。
FailoverLeaderExecutionCallback:
@Override
public void execute() {
// 判断任务是否已被关闭,或者当此节点成为leader时,failover已经执行完毕,此时直接返回即可
if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
return;
}
// failover的分片
int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
// 删除此分片的failver标记
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
// 手动触发
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (null != jobScheduleController) {
jobScheduleController.triggerJob();
}
}
一目了然。
首先明确一下,elastic-job的misfire其实就是对quartz同名机制的包装。理解了Quartz的misfire机制elastic-job也就迎刃而解了。
Quartz的misfire可能由两种情况导致:
- 业务逻辑在Quartz执行线程中执行,同时执行时间太长导致以至于在下一个执行时间点到来时还没有完成,这就是一次misfire。
- 到达执行时间点时没有空闲的线程资源去执行。
Quartz有如下参数:
org.quartz.jobStore.misfireThreshold=1
下面是其官方文档的定义:
The the number of milliseconds the scheduler will ‘tolerate’ a trigger to pass its next-fire-time by, before being considered “misfired”. The default value (if you don’t make an entry of this property in your configuration) is 60000 (60 seconds).
Elastic-job在JobScheduler中对此参数进行了配置:
private Properties getBaseQuartzProperties() {
Properties result = new Properties();
result.put("org.quartz.jobStore.misfireThreshold", "1");
return result;
}
即超过既定时间点1毫秒就认为是misfire,可见是相当严格的。
Quartz提供了监听器TriggerListener对misfire进行监控,elastic-job定义了自己的监听器对其进行记录。JobScheduler.createScheduler:
private Scheduler createScheduler() {
Scheduler result;
try {
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(getBaseQuartzProperties());
result = factory.getScheduler();
result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
return result;
}
监听器JobTriggerListener的核心逻辑:
@Override
public void triggerMisfired(final Trigger trigger) {
if (null != trigger.getPreviousFireTime()) {
executionService.setMisfire(shardingService.getLocalShardingItems());
}
}
其实就是为当前节点拥有的每一个分片创建一个misfire标志节点:
/elasticjob/simpleElasticJob/simpleElasticJob/sharding/0/misfile
此部分源码位于AbstractElasticJobExecutor.execute:
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}