添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

[toc]

Executor 简介

在 Java 中,更偏向于使用 Executor 而非 Thread 来执行任务。 Executor 接口定义如下:

1
2
3
public interface Executor {
void execute(Runnable command);
}

类似于 Thread ,它可以用来在线程中执行一个 Runnable (任务),具体的执行策略取决于具体实现。比如以下实现就是每个任务都新建一个线程来执行:

1
Executor executor = command -> new Thread(command).start()

在标准库中, Executors 类提供了若干 static 方法可用于构造不同类型的 Executor ,它们在线程池中取出线程来执行任务,对于 已提交(调用 execute submit 方法执行) 的任务,一般有三种状态:

  • 已经完成:任务已经由池中的某个线程执行完毕,对应线程已返回给池中
  • 运行中:已经分配了线程执行任务
  • 等待执行:由于线程分配策略限制(比如限制了同时运行的线程数量上限),任务被缓存到内部队列,等待被执行。
  • ExecutorService

    Executor 的生命周期

    JVM 会在所有非守护线程全部终止后才会退出,比如以下代码:

    1
    2
    final Executor executor = Executors.newSingleThreadExecutor(); // 该线程池仅包含一个可用线程
    executor.execute(() -> System.out.println(Thread.currentThread().getName() + " done"))

    打印 pool-1-thread-1 done 后会卡住,查看线程栈会看到:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    "pool-1-thread-1" #11 prio=5 os_prio=31 tid=0x00007fa04392a000 nid=0xa603 waiting on condition [0x0000700004bd6000]
    java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x000000076b1778a0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

    进入 ThreadPoolExecutor#getTask 内部看到相应代码:

    1
    2
    3
    Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take(); // 卡在这里

    workQueue 也就是前文所说的保存任务的工作队列:

    1
    private final BlockingQueue<Runnable> workQueue;

    可见线程池 ThreadPoolExecutor 都会尝试从 workQueue 中取出任务然后分配给线程来执行,见 runWorker 方法:

    1
    2
    3
    4
    final void runWorker(Worker w) {
    /* ... */
    try {
    while (task != null || (task = getTask()) != null) {

    Executors 类创建的实际上是 ExecutorService 类型,它继承自 Executor 接口,提供了对 Executor 生命周期的管理。当然,此外还提供了 submit 接口来基于 Future 对任务进行管理。

    shutdown

    1
    void shutdown();

    该方法会使 executor 等待所有已提交的任务运行完成。包括在工作队列中的任务。注意该方法并不会等待。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    ExecutorService executor = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 3; i++) {
    final int index = i;
    executor.execute(() -> {
    println(index + " started...");
    sleep(1000);
    println(index + " stopped.");
    });
    }
    executor.shutdown();
    println("Executor shutdown: " + executor.isShutdown());
    try {
    executor.execute(() -> {});
    } catch (RejectedExecutionException e) {
    println("Failed to execute task after shutdown: " + e.getMessage());
    }

    注:上述代码使用了我自省添加的 println 方法来打印时间戳和线程名,以及 sleep 方法吞掉 InterruptedException

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    private static void println(String x) {
    System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName() + " | " + x);
    }

    private static void sleep(long millis) {
    try {
    Thread.sleep(millis);
    } catch (InterruptedException ignored) {
    }
    }

    运行结果:

    1
    2
    3
    4
    5
    6
    7
    8
    1644147267909 pool-1-thread-1 | 0 started...
    1644147267909 main | Executor shutdown: true
    1644147267911 main | Failed to execute task after shutdown: Task jcip.Main$$Lambda$2/598446861@619a5dff rejected from java.util.concurrent.ThreadPoolExecutor@1ed6993a[Shutting down, pool size = 1, active threads = 1, queued tasks = 2, completed tasks = 0]
    1644147268913 pool-1-thread-1 | 0 stopped.
    1644147268913 pool-1-thread-1 | 1 started...
    1644147269915 pool-1-thread-1 | 1 stopped.
    1644147269916 pool-1-thread-1 | 2 started...
    1644147270916 pool-1-thread-1 | 2 stopped.

    可以看到 shutdown() 立刻返回了,但是 JVM 进程是等待所有线程退出后才结束。但由于 shutdown 方法被调用,executor 会拒绝接受新的任务,因此在调用 execute 时会抛出 RejectedExecutionException 异常,包含了线程池 ThreadPoolExecutor 的具体信息:

    1
    Shutting down, pool size = 1, active threads = 1, queued tasks = 2, completed tasks = 0

    线程池已经关闭,池的大小为 1,活跃线程数为 1,排队的任务为 2,已经完成的任务数量为 0。

    awaitTermination

    由于 shutdown 并不会等待 executor 关闭,因此 ExecutorService 还提供了 awaitTermination 方法进行等待。

    1
    2
    boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException;

    若在 timeout 范围内 executor 已经停止,则返回 true。否则返回 false,即等待超时。因此可以轮询调用该方法来等待 executor 停止。这里修改前一节调用 shutdown 方法之后的代码如下所示:

    1
    2
    3
    4
    5
    println("Executor terminated: " + executor.isTerminated());
    while (!executor.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
    // No ops
    }
    println("Executor terminated: " + executor.isTerminated());

    相关打印信息:

    1
    2
    1644148189276 main | Executor terminated: false
    1644148192288 main | Executor terminated: true

    从时间戳之差(3008 毫秒)可见等待不到 10 秒(timeout)就完成了。注意这里还调用了 isTerminated 方法,当所有任务都结束时该方法会返回 true。

    shutdownNow

    shutdown 是优雅的关闭,如果担心有的任务是有 bug 的,会一直卡住,导致 JVM 进程无法终止,此时需要用一种粗暴的关闭方式,也就是 shutdownNow

    1
    List<Runnable> shutdownNow();

    我初看这个方法时比较迷惑,以为 shutdownNow 是异步关闭,而 shutdown 是同步关闭。 实际上, shutdown 会等待所有任务完成,只不过不再接受新的任务。而 shutdownNow 则是 取消 所有运行中的任务,并且不再启动等待中的任务。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    ExecutorService executor = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 3; i++) {
    final int index = i;
    executor.execute(() -> {
    println(index + " started...");
    try {
    Thread.sleep(1000);
    println(index + " stopped.");
    } catch (InterruptedException e) {
    println(index + " is cancelled");
    }
    });
    }
    executor.shutdownNow();
    executor.awaitTermination(1, TimeUnit.MINUTES);

    运行结果:

    1
    2
    1644148971844 pool-1-thread-1 | 0 started...
    1644148971844 pool-1-thread-1 | 0 is cancelled

    中断状态的线程会被取消,因此抛出 InterruptedException ,而排队的两个任务则干脆没执行。

    线程池 ThreadPoolExecutor

    前面一直使用了单线程的线程池,它创建的 executor 类型实际上是 ThreadPoolExecutor

    1
    2
    3
    4
    5
    6
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService // 仅仅是个 wrapper,finalize() 方法会调用 shutdown()
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }
    1
    2
    3
    4
    5
    6
    7
    8
    public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    Executors.defaultThreadFactory(), defaultHandler);
    }

    线程池的配置

    包括前两节提到的 ThreadFactory RejectedExecutionHandler ThreadPoolExecutor 构造参数及其作用如下所示:

    corePoolSize 核心线程数,即保持运行的线程数,即使处于空闲状态,除非 allowCoreThreadTimeout 被设置 maximumPoolSize 线程池最大允许创建的线程数 keepAliveTime 当线程数超过核心线程数数量时,闲置线程在终止前等待新任务的最大时间 TimeUnit keepAliveTime 对应的时间单位 workQueue BlockingQueue<Runnable> 持有待执行的任务的队列 threadFactory ThreadFactory 提供 newThread 接口,可在通过 Runnable 创建线程时设置线程的一些信息(比如名字) handler RejectedExecutionHandler 当线程池到达上限时,新任务到来的处理策略。默认是抛出 RejectedExecutionException

    常见的几种快速创建线程池(用静态方法去掉 new 前缀)的参数:

    上述几种典型的配置各有缺陷,比如 CachedThreadPool 无法限制线程数量,容易导致线程创建太多而 OOM。而另外两种配置则限制死了线程的最大数量。

    最核心的参数是 corePoolSize maximumPoolSize 。当线程池的线程数不大于 corePoolSize 时,对于新的任务都会创建线程。但是如果正在运行的线程数量达到了 corePoolSize ,新任务到来时则会根据 maximumPoolSize workQueue 来决定行为:

  • workQueue 未满:加入队列
  • workQueue 已满:
  •