添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
1
2
3
4
5
6
7
// nThreads是线程池中线程的数量,核心线程数和最大线程数一样
ExecutorService executor = Executors.newFixedThreadPool(int nThreads);

// Executors中的newFixedThreadPool方法实现
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
- 优点:具有固定数量的线程,可确保线程数始终保持在指定的数量上。适用于需要控制并发线程数的场景,可以避免线程数量过多导致系统资源耗尽。
- 缺点:任务队列无界限制,如果任务提交速度超过线程处理速度,可能导致队列积压过多任务,最终可能导致内存溢出。不适合处理大量长时间运行的任务。

CachedThreadPool(缓存线程池):该线程池不固定线程数量,可以根据需要自动创建新线程,也会自动回收闲置的线程。适用于执行大量短期的任务。

1
2
3
4
5
6
7
ExecutorService executor = Executors.newCachedThreadPool();


// Executors中的newCachedThreadPool方法实现
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
- 优点:线程数量不固定,根据任务的提交情况动态创建和回收线程。适用于短期、异步的任务处理,能够灵活调配线程资源。
- 缺点:由于线程数量不受限制,如果任务提交速度过快,可能导致创建过多的线程,进而消耗过多的系统资源,甚至导致系统崩溃。

SingleThreadExecutor(单线程池):该线程池只包含一个线程,用于顺序执行任务。如果该线程因异常而终止,会创建一个新的线程来替代。

1
2
3
4
5
6
ExecutorService executor = Executors.newSingleThreadExecutor();

// Executors中的newSingleThreadExecutor方法实现
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
- 优点:只有一个工作线程,保证任务按照指定顺序执行。适用于需要顺序执行任务的场景,例如需要按照任务的提交顺序进行处理。
- 缺点:由于只有一个线程,如果该线程因为异常而终止,线程池将会创建一个新线程代替,可能会带来额外的开销。不适合处理大量耗时的任务。

ScheduledThreadPool(调度线程池):该线程池用于定时或周期性执行任务。可以指定任务的延迟时间或执行周期。

1
2
3
4
5
6
7
8
9
10
11
12
// corePoolSize是线程池中核心线程的数量
ScheduledExecutorService executor = Executors.newScheduledThreadPool(int corePoolSize);

// // Executors中的newScheduledThreadPool方法实现
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

// ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
- 优点:用于定时或周期性执行任务,可以指定任务的延迟时间或执行周期。适用于需要定时执行任务的场景。
- 缺点:线程数量固定,如果任务过多或任务执行时间过长,可能会导致任务堆积,影响调度的准确性。

为什么不推荐使用这些内置线程池?

任务队列没有限制:内置线程池的任务队列默认是无界的,如果任务提交速度过快,可能会导致队列积压过多任务,最终导致内存溢出或系统资源耗尽。

默认的线程拒绝策略:内置线程池的默认线程拒绝策略是抛出RejectedExecutionException,当任务提交超过线程池的处理能力时,会导致任务被拒绝执行。这可能会导致任务丢失或需要手动处理拒绝的任务。

配置限制有限:内置线程池提供了一些参数来配置线程池的行为,例如核心线程数、最大线程数、任务队列等。然而,这些参数可能不足以满足复杂的业务需求。对于更复杂的场景,可能需要更高级的线程池实现或手动创建自定义线程池。

缺乏监控和扩展功能:内置线程池的功能相对简单,缺乏对线程池的监控和扩展能力。在一些需要对线程池进行监控、统计或动态调整的场景下,内置线程池可能无法满足需求。

鉴于上述原因,对于复杂的应用程序和具有特定需求的场景,建议使用更高级的线程池实现,例如ThreadPoolExecutor类,它提供了更多的配置选项和灵活性,以满足各种需求。此外,还可以考虑使用第三方的线程池库,如Guava或Apache Commons等,它们提供了更多功能和扩展性。自定义线程池能够更好地适应特定的业务需求,并提供更好的控制和可扩展性。

ThreadPoolExecutor介绍

ThreadPoolExecutor是Java中的一个灵活且强大的线程池实现,它提供了很多配置选项,你可以将任务提交给线程池执行,并根据需要动态调整线程池的大小和配置。它是并发编程中常用的工具,适用于各种需要处理异步任务的场景,如服务器端应用程序、多线程数据处理和并行计算等。

ThreadPoolExecutor的一些关键特点:

  • 1.线程池大小控制:你可以通过设置核心线程池大小(corePoolSize)和最大线程池大小(maximumPoolSize)来控制线程池中的线程数量。核心线程池大小是线程池中一直保持活动的线程数,而最大线程池大小是线程池中允许存在的最大线程数。
  • 2.任务排队:ThreadPoolExecutor提供了多种任务排队策略,例如无界队列(Unbounded Queue)、有界队列(Bounded Queue)和同步移交(Synchronous Transfer)。你可以根据需要选择适合的任务排队策略,以控制任务的提交和执行。
  • 3.线程生命周期管理:ThreadPoolExecutor负责管理线程的生命周期,包括线程的创建、执行任务和销毁。它会根据线程池的配置自动创建和回收线程,以及处理线程的异常和空闲状态。
  • 4.拒绝策略:当线程池无法接受新的任务时,ThreadPoolExecutor提供了多种拒绝策略来处理这种情况。例如,你可以选择丢弃任务、抛出异常或在调用者线程中执行任务。
  • 5.统计和监控:ThreadPoolExecutor提供了一些方法来获取线程池的状态和统计信息,比如活动线程数、已完成任务数、任务队列大小等。这些信息可以帮助你监控和调优线程池的性能。
  • ThreadPoolExecutor类的一些常用API

    ThreadPoolExecutor的创建与配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    int corePoolSize = 5; // 核心线程池大小
    int maxPoolSize = 10; // 最大线程池大小
    long keepAliveTime = 5000; // 非核心线程的空闲时间
    TimeUnit unit = TimeUnit.MILLISECONDS; // 空闲时间的单位

    // ThreadPoolExecutor使用任务队列来存储待执行的任务。你可以选择使用不同类型的BlockingQueue实现,比如LinkedBlockingQueue、ArrayBlockingQueue等。
    BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(1000); // 任务队列

    // 实例化ThreadPoolExecutor类
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, taskQueue);

    // 设置拒绝策略,ThreadPoolExecutor提供了四种内置的拒绝策略:
    // 1.AbortPolicy,默认策略,即当线程池无法接受新任务时,会抛出RejectedExecutionException。
    // 2.CallerRunsPolicy,即当线程池无法接受新任务时,会在调用者线程中执行该任务。
    // 3.DiscardPolicy,即当线程池无法接受新任务时,新任务会被直接丢弃,不会抛出异常。
    // 4.DiscardOldestPolicy,会丢弃线程池中最早提交的一个任务,然后尝试重新提交被拒绝的任务。
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    ThreadPoolExecutor的调用

    没有返回值

    1
    2
    // 通过调用execute()方法,将任务提交给ThreadPoolExecutor执行,这里的MyTask是实现了Runnable接口的自定义任务类。
    executor.execute(new MyTask());
    1
    2
    3
    // 通过调用submit()方法,将任务提交给ThreadPoolExecutor执行,并返回了一个Future对象,用于获取任务的执行结果。
    // 这里的CallableTask是实现了Runnable接口或者Callable<T>接口的自定义任务类。
    Future<String> future = executor.submit(new CallableTask());

    需要注意的是,submit()方法可以接受不同类型的任务(Runnable或Callable),并返回一个Future对象。对于Runnable类型的任务,submit()方法返回的Future对象的get()方法将始终返回null。

    执行流程图

    提交一个Runnable时,不管当前线程池中的线程是否空闲,只要数量小于核心线程数就会创建新线程。

    ThreadPoolExecutor是非公平的,比如队列满了之后提交的Runnable可能会比正在排队的Runnable先执行。

    ThreadPoolExecutor的关闭

    1
    2
    3
    4
    5
    6
    // 不再接受新的任务,但是正在处理的任务和队列中尚未处理的任务会继续执行完毕
    executor.shutdown();

    // 不再接受新的任务,也不再执行队列中的任务,并且会中断正在处理的任务
    // 线程池会尽力停止正在执行的任务,但无法保证任务会立即停止。因此,在调用shutdownNow()后,你可以通过检查返回的任务列表来获取所有尚未处理完成的任务,并根据需要进行处理。
    List<Runnable> list = executor.shutdownNow();

    调用shutdownNow()也并不意味着线程池立刻就关闭了,可以通过如下方式判断线程池是否已经终止

    (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()触发,如果先调shutdown()紧着调shutdownNow(),就会发生SHUTDOWN -> STOP

    SHUTDOWN -> TIDYING:队列为空并且线程池中没有线程时自动转换

    STOP -> TIDYING:线程池中没有线程时自动转换(队列中可能还有任务,但是永远不会被执行)

    TIDYING -> TERMINATEDterminated()执行完后就会自动转换

    线程池为什么一定得是阻塞队列?

    线程池中的线程在运行过程中,执行完创建线程时绑定的第一个任务后,就会不断的从队列中获取任务并执行,那么如果队列中没有任务了,线程为了不自然消亡,就会阻塞在获取队列任务时,等着队列中有任务过来就会拿到任务从而去执行任务。通过这种方法能最终确保,线程池中能保留指定个数的核心线程数。

    线程发生异常,会被移出线程池吗?

    会。但为了保证维持住固定的核心线程数,会再创建一个新的线程。

    单个任务的异常情况,不会直接影响线程池中的其他线程,线程池会继续执行其他任务,除非遇到无法处理的异常,例如线程池被关闭或发生了无法恢复的错误。

    然而,如果某个任务的异常没有被正确处理,可能会导致整个线程池无法正常工作。例如,如果异常被忽略或没有适当的错误日志记录,可能会导致问题的隐患积累或任务无法正确完成。

    因此,在使用线程池时,建议为任务提供适当的异常处理逻辑,以确保及时捕获和处理异常,以及记录错误信息。这有助于提高线程池的可靠性和稳定性。

    也可以为线程池配置全局异常处理逻辑,如果线程执行过程中发生了未捕获的异常,可以通过下面的方式处理异常:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 500, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200));

    ThreadFactory threadFactory = runnable -> {
    Thread t = new Thread(runnable);
    t.setUncaughtExceptionHandler((thread, error) -> System.out.println(thread.getName() + ":错误信息:" + error.getMessage()));
    return t;
    };

    executor.setThreadFactory(threadFactory);

    线程池的核心线程数、最大线程数该如何设置?

    线程池中的核心线程数计算方法:

    CPU核心数[逻辑核] = Runtime.getRuntime().availableProcessors();
    线程等待时间[阻塞时间]:指的就是线程没有使用CPU的时间,比如阻塞在了IO
    线程运行总时间:指的是线程执行完某个任务的总时间
    阻塞系数 = 线程等待时间[阻塞时间] / 线程运行总时间

    PS: 可以在压测时使用JVM提供的jvisualvm得到对应线程运行的总时间总时间(CPU),通过计算得到:
    线程等待时间 = 总时间 - 总时间(CPU)
    线程运行总时间 = 总时间

    1.计算密集型:内存运算,尽可能避免发生线程上下文切换
    核心线程数 = CPU核心数 + 1

    2.IO密集型:一般文件读写、数据库读写、网络接口调用等都属于IO密集型

    - 核心线程数 = CPU核心数 * (1 + 阻塞系数)
    - 该方法下,通常设置为 CPU核心数 * 2,所以: 4C服务器,线程数为8个左右

    方法2 [推荐]:
    - 核心线程数 = CPU核心数 / (1 - 阻塞系数)
    - 该方法下,经验上一般阻塞系数取值为0.8~0.9,所以: 4C服务器,线程数为20 ~ 40个为宜

    PS: 经验上来讲,方法2更为准确,但以上只是理论,实际工作中情况会更复杂,比如一个应用中,可能有多个线程池,除开线程池中的线程可能还有很多其他线程,或者除开这个应用还是一些其他应用也在运行,所以实际工作中如果要确定准确的线程数,最好是压测。

    CPU密集型任务:CPU核心数+1,这样既能充分利用CPU,也不至于有太多的上下文切换成本

    IO密集型任务:建议压测,或者先用公式计算出一个理论值(理论值通常都比较小)

    对于核心业务(访问频率高),可以把核心线程数设置为我们压测出来的结果,最大线程数可以等于核心线程数,或者大一点点,比如我们压测时可能会发现500个线程最佳,但是600个线程时也还行,此时600就可以为最大线程数

    对于非核心业务(访问频率不高),核心线程数可以比较小,避免操作系统去维护不必要的线程,最大线程数可以设置为我们计算或压测出来的结果。

    Tomcat中的线程池ThreadPoolExecutor

  • Tomcat中有一个与JUC包下同名的ThreadPoolExecutor,即org.apache.tomcat.util.threads.ThreadPoolExecutor,它扩展了Java标准库中的java.util.concurrent.ThreadPoolExecutor,具有一些特定的功能和行为:
  • 入队时,如果线程池的线程个数等于最大线程池数才入队,如果线程池的线程个数小于最大线程池数,会返回false,表示入队失败
  • 提交任务时,会先判断线程个数是否小于核心线程数,如果小于则创建线程,如果等于核心线程数,会入队,但是线程个数小于最大线程数会入队失败,从而会去创建线程
  • 随着任务的提交,会优先创建线程,直到线程个数等于最大线程数才会入队
  • 另外,提交任务时,如果正在处理的任务数小于线程池中的线程个数,那么也会直接入队,而不会去创建线程
  •