任务调度

前言


在实现实验的算法的时候,要想在一个固定间隔时间对代码中某些变量有一个监控或者固定间隔时间处理一些事务逻辑,必不可少的就是定时的任务调度了。同样的,在做项目的过程中,这种定时的任务调度就更加广泛了,比如说间隔一定的时间给用户发送消息,定时清理掉一些数据的任务等等。这边就介绍一下之前用到的三种任务调度方式Timer、ScheduledExecutorService以及Quartz。

一、Timer

1.简单介绍

timer是一个JDK原生的定时工具,其可以根据给定的时间间隔定时调度任务。主要有以下几个核心函数。

1
2
3
4
5
6
schedule(TimerTask task, long delay)
schedule(TimerTask task, Date time)
schedule(TimerTask task, long delay, long period)
schedule(TimerTask task, Date firstTime, long period)
scheduleAtFixedRate(TimerTask task, long delay, long period)
scheduleAtFixedRate(TimerTask task, Date firstTime, long period)

1.TimerTask是其定时间隔执行的任务,要继承TimerTask并重写run方法。

2.time/firstTime:首次执行任务的时间

3.period:周期性执行Task的时间间隔,单位是毫秒

4.delay:执行task任务前的延时时间,单位是毫秒

这边再简单说一下schedulescheduleAtFixedRate两个函数的区别:scheduleAtFixedRate每次执行时间为上一个任务开始起向后推一个period时间间隔。也就是下一次开始的时间是上一次任务开始的时间点加上间隔时间,因此不会存在延后开始执行的情况,但是有可能会存在并发执行的问题,因为上一个可能在下一个任务开始的时候上一个任务还没有结束。schedule每次执行时间为上一个任务结束之后向后推一个peroid时间,若上一个任务结束的晚,那么就会存在延后的情况。若上一个任务结束的早,那么就会存在提前开始的情况。

2.简单使用

比如我这边要实现的功能是:实现求和并按照时间间隔监控此时的sum是多少。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class testTimer {
public static Timer timer = new Timer();
Integer sum = 0;

public void run(){
MyTask myTask = new MyTask();
myTask.run();
}

class MyTask implements Runnable {
int a = 0;
@Override
public void run() {
timer.scheduleAtFixedRate(new CalcuTask(), 0, 5);
System.out.println("执行任务");
for (int i = 0 ; i < 1000000 ; ++i){
a++;
sum+=a;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class CalcuTask extends TimerTask {
@Override
public void run() {
System.out.println("统计时间");
System.out.println(sum);
System.out.println(System.currentTimeMillis());

long i = 0;
while (i < 10000){
i++;
}

}
}

public static void main(String[] args) {
testScheduledExecutorService ti = new testScheduledExecutorService();
ti.run();
}
}

使用其实非常简单,只要定义好定时执行的类CalcuTask并将其继承于TimerTask,然后根据需求判断调用scheduleAtFixedRate函数还是schedule函数即可。主要来看下源码是怎么实现的。

3.源码实现

在没有看源码实现之前,我能想到的实现方式会是这样:Timer也就是一个线程每隔一定的时间去监控另外一个线程的状态,也就是每隔一定的时间去起一个线程来执行监控。看一下Timer的实现方式

主要有四个类

1
2
3
4
Timer : 创建定时任务调度的主类  
TimerThread:Timer的一个内部类,继承Thread,负责线程任务的调度
TaskQueue: Timer的一个内部类,调度的任务的队列
TimerTask: 实现Runnable接口的抽象类,负责具体任务的执行逻辑

看过源码后发现,其实Timer实现背后是基于一个单线程,所有任务都是基于同一个线程来调度的和串行执行,当监控任务的执行中比较复杂的时候就会影响到下一次的监控,造成延迟。

在Timer类中创建一个TimerThread对象和TaskQueue对象,其中TaskQueue中可以存放多个TimerTask,本质就是一个TimerTask类型的优先级队列,注解中写的是balanced binary heap,也可以理解为一个小顶堆,是以nextExecutionTime参数进行排序的,每次出队列的是第一个TimerTask,也就是当前nextExecutionTime最小的一个定时任务。

Timer类中的两个参数,第一个参数是一个队列的类,队列中存放的是类型是TimerTask的实例。第二个参数是一个线程,里面存放的是TaskQueue实例。

1
2
3
4
5
6
7
8
9
10
11
12
/**
* The timer task queue. This data structure is shared with the timer
* thread. The timer produces tasks, via its various schedule calls,
* and the timer thread consumes, executing timer tasks as appropriate,
* and removing them from the queue when they're obsolete.
*/
private final TaskQueue queue = new TaskQueue();

/**
* The timer thread.
*/
private final TimerThread thread = new TimerThread(queue);

四个构造函数。可以看成只有两个 ,构造函数中设置计时器的名字,设置是否为守护线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public Timer() {
this("Timer-" + serialNumber());
}

public Timer(boolean isDaemon) {
this("Timer-" + serialNumber(), isDaemon);
}

public Timer(String name) {
thread.setName(name);
thread.start();
}

public Timer(String name, boolean isDaemon) {
thread.setName(name);
thread.setDaemon(isDaemon);
thread.start();
}

六种定时方法及其解释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//调用后延迟delay秒执行一次。
public void schedule(TimerTask task, long delay) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
sched(task, System.currentTimeMillis()+delay, 0);
}
//调用后在某个时间执行需求,只执行一次
public void schedule(TimerTask task, Date time) {
sched(task, time.getTime(), 0);
}
//调用后延迟delay秒执行,每隔period秒执行一次
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}
//调用后在某个时间开始执行,每隔period秒执行一次
public void schedule(TimerTask task, Date firstTime, long period) {
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, firstTime.getTime(), -period);
}
//调用后延迟delay秒执行,每隔period秒执行一次
public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, period);
}
//调用后在某个时间开始执行,每隔period秒执行一次
public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) {
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, firstTime.getTime(), period);
}

sched方法。主要就是将当前task放入队列中,执行的过程中对其加锁,防止其他线程的影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void sched(TimerTask task, long time, long period) {
if (time < 0)
throw new IllegalArgumentException("Illegal execution time.");

// Constrain value of period sufficiently to prevent numeric
// overflow while still being effectively infinitely large.
if (Math.abs(period) > (Long.MAX_VALUE >> 1))
period >>= 1;

synchronized(queue) {
if (!thread.newTasksMayBeScheduled)
throw new IllegalStateException("Timer already cancelled.");

synchronized(task.lock) {
if (task.state != TimerTask.VIRGIN)
throw new IllegalStateException(
"Task already scheduled or cancelled");
task.nextExecutionTime = time;
task.period = period;
task.state = TimerTask.SCHEDULED;
}
//将task放入队列
queue.add(task);
//这里的意思是如果当前调度的这个任务排在队列的头部,就唤醒当前队列wait的线程。
if (queue.getMin() == task)
queue.notify();
}
}

其中queue的add方法是这样的。

1
2
3
4
5
6
7
8
9
void add(TimerTask task) {
// Grow backing store if necessary
//如果新增后达到长度的限制,则扩容为原来的两倍。
if (size + 1 == queue.length)
queue = Arrays.copyOf(queue, 2*queue.length);

queue[++size] = task;
fixUp(size);
}

下面来看一下TimerThread类的run方法是如何执行的。

1
2
3
4
5
6
7
8
9
10
11
12
public void run() {
try {
mainLoop();
} finally {
// Someone killed this Thread, behave as if Timer cancelled
//这里表示如果有一个任务异常,整个过程就终止了。
synchronized(queue) {
newTasksMayBeScheduled = false;
queue.clear(); // Eliminate obsolete references
}
}
}

关键的mainLoop函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* The main timer loop. (See class comment.)
*/
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
//锁住当前的队列
synchronized(queue) {
// Wait for queue to become non-empty
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
if (queue.isEmpty())
break; // Queue is empty and will forever remain; die

// Queue nonempty; look at first evt and do the right thing
long currentTime, executionTime;
//得到距离当前时间最近的一个任务,并锁住任务,防止其他线程的访问。
task = queue.getMin();
synchronized(task.lock) {
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue; // No action required, poll queue again
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
//根据两个时间来判断是否执行该任务
if (taskFired = (executionTime<=currentTime)) {
//如果period为0,为单次任务,从队列中移除,并设置任务状态为已执行。
if (task.period == 0) { // Non-repeating, remove
// 一个要注意的点是queue队列的索引值是从1开始的,而不是0。
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
//如果是周期性任务,重新排列任务列表。
//调用的是schedule还是scheduleAtFixedRate其实是在这边进行判断的。
//从这两类函数的调用就可以看出来,schedule给period赋值了-period,是个小于0的 值。scheduleAtFixedRate函数的period是个大于0的数。所以相当于如果是schedule那么 就用当前时间加上时间间隔,如果是scheduleAtFixedRate,就用执行时间加上时间间隔。所 以在这边就很容易的看出在单线程的情况下,schedule会造成时间的延误, scheduleAtFixedRate则不会。
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
//如果任务还没有到出发时间则等待,等待的时间差自然就是executionTime - currentTime。
if (!taskFired) // Task hasn't yet fired; wait
queue.wait(executionTime - currentTime);
}
//如果任务触发,则用run方法去执行timer的task,在主线程中执行。
if (taskFired) // Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
}
}
}

Timer的源码分析到这里基本就完了,总结一下:第一,Timer是一个单线程的定时器,从大方面来说主要能解决两种定时的问题,一种是到某个时间执行一次或者按照间隔执行定时任务,另一种是调用后延迟delay秒开始执行,这个延迟可以自己设定。第二,schedule函数和scheduleAtFixedRate函数的区别。第三,由于Timer是基于单线程的,当定时任务很复杂的时候,就不太适合用这个来做了,容易造成任务的延迟。这时候就需要用到ScheduledExecutorService这种线程池的方式来进行定时的任务调度。

二、ScheduledExecutorService

1.简单介绍

ScheduledExecutorService能克服Timer的不足,因为ScheduledExecutorService是基于线程池的,所以能支持多个线程并发执行,之间不受影响。至于原理是什么,可以看我的这篇博客 线程与线程池

ScheduledExecutorService主要有四个函数

1
2
3
4
5
6
7
8
//这个方法将在给定的延迟时间后执行callable。
schedule(Callable<V> callable, long delay, TimeUnit unit);
//这个方法类似于用Callable作为参数的版本(上面的方法)
schedule(Runnable command, long delay, TimeUnit unit);
//这个方法可以周期性的执行任务。任务在initialDelay时间后第一次执行,然后每次周期循环执行。
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
//这个方法与scheduleAtFixedRate()方法类似,只是对period理解是不同的。scheduleAtFixedRate()方法的period指的是上一个任务开始执行到下一个任务开始执行的时间间隔。然而,这个方法的period指的是上一个任务执行完到下一个任务开始执行之间的时间间隔。
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

这边就用ScheduledExecutorService简单的实现上面Timer一样的功能。

2.简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class testScheduledExecutorService {
public static ScheduledExecutorService mScheduledExecutorService = Executors.newScheduledThreadPool(16);
Integer sum = 0;

public void run(){
MyTask myTask = new MyTask();
myTask.run();
}

class MyTask implements Runnable {
int a = 0;
@Override
public void run() {
mScheduledExecutorService.scheduleAtFixedRate(new CalcuTask(), 0, 5, TimeUnit.MILLISECONDS);
System.out.println("执行任务");
for (int i = 0 ; i < 1000000 ; ++i){
a++;
sum+=a;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class CalcuTask implements Runnable{
@Override
public void run() {
System.out.println("统计时间");
System.out.println(sum);
System.out.println(System.currentTimeMillis());
long i = 0;
while (i < 10000){
i++;
}

}
}

public static void main(String[] args) {
testScheduledExecutorService ti = new testScheduledExecutorService();
ti.run();
}
}

三、Quartz

1.简单介绍

虽然ScheduledExecutorService对Timer进行了线程池的改进,但是依然无法满足复杂的定时任务调度场景。因此OpenSymphony提供了强大的开源任务调度框架:Quartz。Quartz是纯Java实现,而且作为Spring的默认调度框架,由于Quartz的强大的调度功能、灵活的使用方式、还具有分布式集群能力,可以说Quartz出马,可以搞定一切定时任务调度。比方说火车订票之后超过三十分钟未支付为判断为无效,邮件的定时发送等等场景。

Quartz主要由三个基础部分组成

  • 调度器:Scheduler
  • 任务:JobDetail
  • 触发器:Trigger,包括SimpleTrigger和CronTrigger

其中任务就是最基本的要实现的部分,比方说上述的定时发送邮件,定时监控有没有支付成功等等。其次,需要有一个触发器去触发执行,指定运行的时间、执行的间隔和执行的次数等等。最后用调度器将任务和触发器结合起来,做到用指定的触发器去触发指定的任务。

2.简单使用

比如我要实现一个定时一秒钟输出一句简单的内容,其中这句话是由外部传入Job的,可以这么实现。

定时的实现类testQuartz

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package test.Quartz;

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import java.util.concurrent.TimeUnit;

/**
* @Author: hqf
* @description:
* @Data: Create in 19:54 2019/9/9
* @Modified By:
*/
public class testQuartz {
public static void main(String[] args) throws SchedulerException, InterruptedException {
// 1、创建调度器Scheduler
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
// 2、创建JobDetail实例,并与MyJob类绑定(Job执行内容)
JobDetail jobDetail = JobBuilder.newJob(MyJob.class)
.withIdentity("job1", "group1").build();
// 这边可以向Job传递信息
JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap.put("Value", "HelloWorld");
// 3、构建Trigger实例,每隔1s执行一次
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "triggerGroup1")
.startNow()//立即生效
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(1)//每隔1s执行一次
.repeatForever()).build();//一直执行
//4、执行
scheduler.scheduleJob(jobDetail, trigger);
System.out.println("--------scheduler start ! ------------");
scheduler.start();

//睡眠
TimeUnit.MINUTES.sleep(1);
scheduler.shutdown();
System.out.println("--------scheduler shutdown ! ------------");
}
}

Job类MyJob

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package test.Quartz;

import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
* @Author: hqf
* @description:
* @Data: Create in 19:59 2019/9/9
* @Modified By:
*/
public class MyJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
JobDataMap mergedJobDataMap = jobExecutionContext.getMergedJobDataMap();
String value =(String) mergedJobDataMap.get("Value");
System.out.println(value);
}
}

在控制台就能每隔一秒钟输出一次内容。也就是每次都是新的一个Job去执行这个任务,那么现在如果我要一秒钟对一个int值累加1该如何实现,在Job中给我们提供了@PersistJobDataAfterExecution,就能将上一次的结果拿到下一次来用,并且为了避免并发可能造成数据混乱的错误,还应该加上@DisallowConcurrentExecution。

Job类改为如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package test.Quartz;
import org.quartz.*;

/**
* @Author: hqf
* @description:
* @Data: Create in 19:59 2019/9/9
* @Modified By:
*/

@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public class MyJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
JobDetail jobDetail = jobExecutionContext.getJobDetail();
int value =(Integer) jobDetail.getJobDataMap().get("Value");
System.out.println(value);
value++;
jobExecutionContext.getJobDetail().getJobDataMap().put("Value", value);
}
}

参考:定时任务框架Quartz-(一)Quartz入门与Demo搭建