概述
例子
在文章的开头我们先通过一个简单的例子来了解一下如何使用ScheduledThreadPoolExecutor
1 |
package com.example.demo; |
上面的例子分别通过delayRunnable、delayCallable、fixedDelayRunnable、fixedRateRunnable这四个方法来演示了如何通过ScheduledThreadPoolExecutor来对任务进行调度,大家可以将上面的代码拷贝下来观察一下控制台的输出,理解一下这四个方法对应的调度机制。下面我们将深入研究这些调度方法背后实现的原理。
调度方法
1 |
package java.util.concurrent; |
下面我们对这些方法的含义解释一下
schedule(Runnable command,long delay, TimeUnit unit);
这个方法的作用是在指定的delay之后执行command任务,这是一个一次性操作,command只会被执行一次
schedule(Callable
callable, long delay, TimeUnit unit);
与第一个调度方法作用相似,不同之处在于执行的是一个Callable
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
scheduleWithFixedDelay(Runnable command,long initialDelay,long delay, TimeUnit unit);
在ScheduledExecutorService定义的调度方法中我们可以看到每个调度方法返回的都是ScheduledFuture对象,顾名思义这是一个调度任务对象。下面我们就来看一下这个ScheduledFuture对象是什么。
调度任务
执行完调度方法后会返回一个调度任务对象,而这个调度任务对象则是以ScheduledFuture的方式所定义。
1 |
public interface ScheduledFuture<V> extends Delayed, Future<V> { |
ScheduledFuture继承了Delayed和Future接口,Future接口在之前的文章中我们知道它提供了一些对任务的取消、任务结果获取以及任务状态等判断的定义,而Delayed这个接口从名字上我们大概也能猜到它应该是提供一些对任务延迟属性的定义。
Delayed
1 |
package java.util.concurrent; |
Delayed接口的定义很简单,仅仅定义了一个以给定的时间单位获取任务剩余延迟的方法。 即任务还剩多久时间就可以开始执行了。 同时Delayed接口也继承了Comparable接口代表两个延迟对象是能够比较的,不难猜出剩多时间多的任务一般是比剩余时间少的任务大的。(即晚执行)
ScheduledThreadPoolExecutor
构造器
ScheduledThreadPoolExecutor一共定义了四个构造器:
1 |
public ScheduledThreadPoolExecutor(int corePoolSize) { |
从上面的构造器参数我们可以发现,最终的ScheduledThreadPoolExecutor对象的maximumPoolSize值都是Integer.MAX_VALUE,BlockingQueue都是一个DelayedWorkQueue。对ThreadPoolExecutor内部原理了解的同学都知道只有当阻塞队列任务已满的情况下线程池才会去创建一个新的线程直到超过maximumPoolSize然后执行拒绝策略,而在文章的开头处我提到了ScheduledThreadPoolExecutor内部是通过一个最小堆无界队列来保存任务的,所以任务队列是永远不会满的,线程池中存活的只有核心线程, 从侧面告诉我们执行调度任务的线程就是核心线程。能够同时执行调度任务的数量取决于核心线程数。
重写的execute和submit方法
1 |
public void execute(Runnable command) { |
可以发现,execute和submit方法都调用了ScheduledExecutorService中两个一次性的调度方法,也就是说 通过ScheduledThreadPoolExecutor方法执行execute和submit方法的结果就相当于在延迟0秒后执行这个任务。 下面我们就来看一下ScheduledThreadPoolExecutor内部对这些调度方法的具体实现。
调度方法的实现
ScheduledFutureTask
1 |
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { |
ScheduledFutureTask继承了FutureTask实现了RunnableScheduledFuture接口,不了解FutureTask的同学可以看我之前写的文章。这个RunnableScheduledFuture是什么东西呢?
1 |
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> { |
RunnableScheduledFuture接口继承了RunnableFuture接口使自身成为了一个Runnable对象,同时也继承了ScheduledFuture对象使自身成为了一个能够调度的任务对象。内部仅仅定义了一个isPeriodic方法,判断任务本身是否是一个周期性任务。
我不是很明白这个接口的意义,为什么不直接让ScheduledFuture继承Runnable然后将这个方法定义在ScheduledFuture内部呢?
现在我们知道内部类ScheduledFutureTask是一个继承FutureTask的能够运行的Runnable,同时本身也是一个延迟对象,能够通过Delayed的getDelay方法来获取剩余执行的时间和RunnableScheduledFuture的isPeriodic方法来表明自身是否是一个周期任务。下面我们详细看一下这个ScheduledFutureTask类内部的定义:
1 |
private class ScheduledFutureTask<V> |
在熟悉了调度任务的具体实现之后,接下来我们再来看一下ScheduledThreadPoolExecutor是如何将Runnable进行调度的。
任务调度
1 |
|
在上面的四个调度实现中我们发现其内部的实现大体一致主要分为如下几个步骤
triggerTime
1 |
private long triggerTime(long delay, TimeUnit unit) { |
将延迟执行的时间转换为纳秒然后调用triggerTime方法计算任务下次执行的具体时间
1 |
long triggerTime(long delay) { |
任务下次执行的时间为当前时间加上入参delay,这里对用户传递的参数delay进行了一个溢出操作的判断,如果传递的参数超过了
Long.MAX_VALUE / 2
的话则调用overflowFree方法矫正延迟时间。(为什么是Long最大值的一半?)
overflowFree
1 |
private long overflowFree(long delay) { |
说实话我也没看懂这个方法的的意图,不过作者在方法注释上面写道: 将队列中所有延迟的值限制在Long.MAX_VALUE之内,以避免compareTo中溢出。 当添加某些延迟为Long.MAX_VALUE的任务时,如果任务有资格出队,但尚未出队,则可能会发生这种情况。
意思是如果线程池中没有空闲的线程时,此刻往线程池中提交一个延迟时间为Long.MAX_VALUE的任务时,在任务入队时进行compareTo方法比较会产生溢出。我还是不能完全理解这段话的意思,有理解的同学麻烦留言告诉我一下。
计算出任务延迟执行的时间后,接下来则是调用delayedExecute方法对任务的执行进行一些提前的操作。
delayedExecute
1 |
private void delayedExecute(RunnableScheduledFuture<?> task) { |
线程池已经关闭的话则拒绝任务
没关闭则将任务提交到队列
再次判断线程池是否已经关闭(),如果关闭的话继续判断是否应该在线程池关闭之后取消周期和非周期任务
如果线程池没关闭的话则调用ensurePrestart方法确保当前线程池中至少有一个线程
小结
现在我们已经熟悉了ScheduledThreadPoolExecutor的基本执行流程,而ScheduledThreadPoolExecutor最为核心的点在于其内部的延迟队列。下面我们就来分析一下这个延迟队列的实现。
DelayedWorkQueue
1 |
// 队列初始化容量为16 |
DelayedWorkQueue是一个基于数组实现的最小堆数据结构,在上面的成员变量中有一个leader属性比较特殊,它是Leader-Follower线程模型的一种变体形式,当前正在队列的头部获取元素的线程就是一个Leader,当这个线程获取元素时,其它也尝试从头部获取元素的线程就会阻塞,就相当于Follower,然后当头部节点线程将节点拿出之后就会唤醒下一个Follower,这个Follower此刻就成为了新的Leader,循环往复,这样的好处就是永远只有一个头节点线程是处于定时等待状态,而其他的Follower线程都是处于永久等待状态,避免了多个线程处于定时等待再唤醒再等待的情况。
现在我们已经对DelayedWorkQueue的内部结构和线程如何从队列头部获取任务的机制有了简单的了解,接着我们看一下ThreadPoolExecutor中的线程是如何从队列中获取任务的
1 |
private Runnable getTask() { |
可以看到通过poll方法来获取带超时时间的任务,而阻塞的获取则是通过take方法来执行的。
我们继续回到ScheduledThreadPoolExecutor的delayedExecute方法中
1 |
private void delayedExecute(RunnableScheduledFuture<?> task) { |
看一看到任务入队是通过队列的add方法来实现的,那接下来我们就通过add和take方法来看一下DelayedWorkQueue内部的实现
add
1 |
public boolean add(Runnable e) { |
内部是基于offer方法实现的
1 |
public boolean offer(Runnable x) { |
offer方法中最重要的点是非根节点入队时通过siftUp方法来为其找到合适的位置
siftUp
1 |
private void siftUp(int k, RunnableScheduledFuture<?> key) { |
siftUp方法的作用是从索引k位置开始递归为元素在堆中找到合适的位置,如果父节点的值比新加入节点的值大的话,由于最小堆的特性,需要将新节点与父节点替换然后递归直到找到比自身值小的父节点为止,整个过程中节点呈现的是上升趋势。整个siftUp方法的执行流程可以通过下图来表示:
take
1 |
public RunnableScheduledFuture<?> take() throws InterruptedException { |
take方法中最重要的点是leader节点从队列头部取出任务时的finishPoll方法
finishPoll
1 |
private void siftDown(int k, RunnableScheduledFuture<?> key) { |
siftDown方法的作用是从索引k位置开始往下最多递归能够拥有子节点的次数,只要找到索引k下的最小的那个子节点值比key还要大,说明key节点是能够成为索引k的子节点的,否则的话将索引k赋值为值最小的那个子节点,然后继续从该最小子节点的索引处往下递归直到将key放在合适的位置位置,整个过程中key节点呈现的是下降趋势。整个siftDown方法的执行流程可以通过下图来表示: