添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
  • 降低资源消耗
    可以重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度
    当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性
    线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
  • 不使用线程池的坏处

  • 频繁的线程创建和销毁会占用更多的CPU和内存。
  • 频繁的线程创建和销毁会对GC产生比较大的压力。
  • 线程太多,线程切换带来的开销将不可忽视。
  • 线程太少,多核CPU得不到充分利用,是一种浪费。
  • 线程池的工作原理

    当一个新的任务提交到线程池之后:

  • 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。
  • 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步。
  • 线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
    线程池的工作流程.png
  • ThreadPoolExecutor的处理流程

    Executors

    Executors是一个线程池工厂,提供了很多的工厂方法,我们来看看它大概能创建哪些线程池。

    // 创建单一线程的线程池
    public static ExecutorService newSingleThreadExecutor();
    // 创建固定数量的线程池
    public static ExecutorService newFixedThreadPool(int nThreads);
    // 创建带缓存的线程池
    public static ExecutorService newCachedThreadPool();
    // 创建定时调度的线程池
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
    // 创建流式(fork-join)线程池
    public static ExecutorService newWorkStealingPool();
    

    创建单一线程的线程池

    故名思意,这个线程池只有一个线程。若多个任务被提交到此线程池,那么会被缓存到队列(队列长度为Integer.MAX_VALUE)。当线程空闲的时候,按照FIFO的方式进行处理。

    创建固定数量的线程池

    和创建单一线程的线程池类似,只是这儿可以并行处理任务的线程数更多一些罢了。若多个任务被提交到此线程池,会有下面的处理过程。

    如果线程的数量未达到指定数量,则创建线程来执行任务
    如果线程池的数量达到了指定数量,并且有线程是空闲的,则取出空闲线程执行任务
    如果没有线程是空闲的,则将任务缓存到队列(队列长度为Integer.MAX_VALUE)。当线程空闲的时候,按照FIFO的方式进行处理.

    创建带缓存的线程池

    这种方式创建的线程池,核心线程池的长度为0,线程池最大长度为Integer.MAX_VALUE。由于本身使用SynchronousQueue作为等待队列的缘故,导致往队列里面每插入一个元素,必须等待另一个线程从这个队列删除一个元素。

    创建定时调度的线程池

    和上面3个工厂方法返回的线程池类型有所不同,它返回的是ScheduledThreadPoolExecutor类型的线程池。平时我们实现定时调度功能的时候,可能更多的是使用第三方类库,比如:quartz等。但是对于更底层的功能,我们仍然需要了解

    手动创建线程池

    ThreadPoolExecutor源码:

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
    

    ThreadPoolExecutor构造方法有7个参数

  • corePoolSize:线程池中的核心线程数。
  • maximumPoolSize:线程池中的最大线程数。
  • keepAliveTime:空闲时间,当线程池数量超过核心线程数时,多余的空闲线程存活的时间,即:这些线程多久被销毁。
  • unit:空闲时间的单位,可以是毫秒、秒、分钟、小时和天,等等。
  • workQueue:等待队列,线程池中的线程数超过核心线程数时,任务将放在等待队列,它是一个BlockingQueue类型的对象。
  • threadFactory:线程工厂,我们可以使用它来创建一个线程。
  • handler:拒绝策略,当线程池和等待队列都满了之后,需要通过该对象的回调函数进行回调处理。
  • 为什么阿里Java规约禁止使用Java内置Executors创建线程池?

    阿里巴巴Java规约中让我们手动创建线程池效果更好哦!
    其实可以从ThreadPoolExecutor构造方法的7个参数出发。
    规约中的原话:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

    等待队列-workQueue

    等待队列是BlockingQueue类型的,理论上只要是它的子类,都可以用来作为等待队列。

    JDK中自带的一些阻塞队列

  • ArrayBlockingQueue:队列是有界的,基于数组实现的阻塞队列。
  • LinkedBlockingQueue:队列可以有界,也可以无界。基于链表实现的阻塞队列。
  • SynchronousQueue:不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作将一直处于阻塞状态。该队列也是Executors.newCachedThreadPool()的默认队列。
  • PriorityBlockingQueue:带优先级的无界阻塞队列。
  • 通常情况下,我们需要指定阻塞队列的上界(比如1024)。另外,如果执行的任务很多,我们可能需要将任务进行分类,然后将不同分类的任务放到不同的线程池中执行。

    线程工厂-threadFactory

    ThreadFactory接口

    ThreadFactory是一个接口,只有一个方法。
    ThreadFactory是一个接口,只有一个方法。.png

    Executors的实现使用了默认的线程工厂-DefaultThreadFactory。它的实现主要用于创建一个线程,线程的名字为pool-{poolNum}-thread-{threadNum}。
    Executors采用了默认的DefaultThreadFactory线程工厂.png
    源代码:

    * The default thread factory static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t;

    自定义线程名称就是实现ThreadFactory

    * 带有名称的线程工厂 * <p>为什么需要定义线程的名称? * 因为,如果在线程很多的时候,定义线程的名称有助于我们调试和定位问题。 class MyThreadFactory implements ThreadFactory { * 线程名称 private final String threadName; * 构造器:传入线程名称,设置线程名称 MyThreadFactory(String threadName) { this.threadName = threadName; @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, threadName); t.setDaemon(true); return t;

    拒绝策略/线程池饱和策略-handler

    什么是拒绝策略?

    就是当线程池满了、队列也满了的时候,我们对任务采取的措施。或者丢弃、或者执行、或者其他…

    JDK有哪些拒绝策略?

    JDK自带4种拒绝策略

    JDK自带4种拒绝策略,分别是:
    1.CallerRunsPolicy:在调用者线程执行。
    自实现CallerRunsPolicy类似:

    * 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。 class MyCallerRunsPolicy implements RejectedExecutionHandler { public MyCallerRunsPolicy() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()) { r.run();

    2.AbortPolicy:直接抛出RejectedExecutionException异常。
    自实现AbortPolicy类似:

    * 线程池拒绝策略:AbortPolicy => ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy。直接抛出异常。 * 很简单粗暴,直接抛出个RejectedExecutionException异常,也不执行这个任务了。 class MyAbortPolicy implements RejectedExecutionHandler { public MyAbortPolicy() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());

    3.DiscardPolicy:任务直接丢弃,不做任何处理。

    * 线程池拒绝策略:DiscardPolicy => 啥都不干,对于线程池的任务不抛弃也不会执行。 class MyDiscardPolicy implements RejectedExecutionHandler { public MyDiscardPolicy() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

    4.DiscardOldestPolicy:丢弃队列里最旧的那个任务,再尝试执行当前任务。

    * 线程池拒绝策略:DiscardOldestPolicy => 抛弃线程池中老的任务,再把新的任务加进去 class MyDiscardOldestPolicy implements RejectedExecutionHandler { public MyDiscardOldestPolicy() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()) { executor.getQueue().poll(); executor.execute(r);

    如何使用?

            // 线程池拒绝策略:DiscardPolicy => 直接丢弃
    //        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            // 自实现DiscardPolicy
    //        executor.setRejectedExecutionHandler(new MyDiscardPolicy());
            // 线程池拒绝策略:AbortPolicy => 直接抛异常
    //        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
            // 自实现MyAbortPolicy
    //        executor.setRejectedExecutionHandler(new MyAbortPolicy());
            // 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。
    //        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            // 自实现MyCallerRunsPolicy
    //        executor.setRejectedExecutionHandler(new MyCallerRunsPolicy());
            // 线程池拒绝策略:DiscardOldestPolicy => 对于线程池中的任务不抛弃也不拒绝,啥也不干
    //        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
            // 自实现MyDiscardOldestPolicy
    //        executor.setRejectedExecutionHandler(new MyDiscardOldestPolicy());
            // 自定义线程池拒绝策略:比如现在想让被拒绝的任务在一个新的线程中执行。
            executor.setRejectedExecutionHandler(new MyRejectedExecutionHandler());
    

    提交任务的两种方式

    提及任务的方式有两种,分别是:submit和execute

    这两个方法的区别:

  • submit:submit()用于提交一个需要返回果的任务。该方法返回一个Future对象,通过调用这个对象的get()方法,我们就能获得返回结果。get()方法会一直阻塞,直到返回结果返回。另外,我们也可以使用它的重载方法get(long timeout, TimeUnit unit),这个方法也会阻塞,但是在超时时间内仍然没有返回结果时,将抛出异常TimeoutException。
    submit(Runnable task)源代码:
      public Future<?> submit(Runnable task) {
          if (task == null) throw new NullPointerException();
          RunnableFuture<Void> ftask = newTaskFor(task, null);
          execute(ftask);
          return ftask;
    
  • execute:execute()用于提交不需要返回结果的任务。
    execute源代码:
      public void execute(Runnable command) {
          if (command == null)
              throw new NullPointerException();
          int c = ctl.get();
          if (workerCountOf(c) < corePoolSize) {
              if (addWorker(command, true))
                  return;
              c = ctl.get();
          if (isRunning(c) && workQueue.offer(command)) {
              int recheck = ctl.get();
              if (! isRunning(recheck) && remove(command))
                  reject(command);
              else if (workerCountOf(recheck) == 0)
                  addWorker(null, false);
          else if (!addWorker(command, false))
              reject(command);
    

    关闭线程池的两种方式

    可以调用线程池对象的shutdown()和shutdownNow()方法来关闭线程池。

    这两个方法的区别:

  • shutdown()会将线程池状态置为SHUTDOWN,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。
    shutdown源代码:
      public void shutdown() {
          final ReentrantLock mainLock = this.mainLock;
          mainLock.lock();
          try {
              // 确保安全关闭
              checkShutdownAccess();
              // 将线程池状态置为SHUTDOWN
              advanceRunState(SHUTDOWN);
              // 不再接受新任务
              interruptIdleWorkers();
              onShutdown(); // hook for ScheduledThreadPoolExecutor
          } finally {
              mainLock.unlock();
          tryTerminate();
    
  • shutdownNow()会将线程池状态置为STOP,对所有线程执行interrupt()操作,清空队列,并将队列中的任务返回回来。
    shutdownNow源代码:
      public List<Runnable> shutdownNow() {
          List<Runnable> tasks;
          final ReentrantLock mainLock = this.mainLock;
          mainLock.lock();
          try {
              // 确保安全关闭
              checkShutdownAccess();
              // 将线程池状态置为STOP
              advanceRunState(STOP);
              // 打断所有线程
              interruptWorkers();
              // 清空队列
              tasks = drainQueue();
          } finally {
              mainLock.unlock();
          tryTerminate();
          // 并将队列中的任务返回回来
          return tasks;
    

    另外,关闭线程池涉及到两个返回boolean的方法,isShutdown()和isTerminated,分别表示是否关闭和是否终止。

    如何正确配置线程池的参数?

  • 任务的性质:CPU密集型、IO密集型和混杂型。
  • 任务的优先级:高中低。
  • 任务执行的时间:长中短。
  • 任务的依赖性:是否依赖数据库或者其他系统资源。
  • 通常来说,如果任务属于CPU密集型,那么我们可以将线程池数量设置成CPU的个数,以减少线程切换带来的开销。如果任务属于IO密集型,我们可以将线程池数量设置得更多一些,比如CPU个数*2。

    可以通过Runtime.getRuntime().availableProcessors()来获取CPU的个数。

    线程池监控

    如果系统中大量用到了线程池,那么我们是不是有必要对线程池进行监控。
    这样子有助于我们定位出现的问题。

    ThreadPoolExecutor自带了一些方法:

  • long getTaskCount():获取已经执行或正在执行的任务数。
  • long getCompletedTaskCount():获取已经执行的任务数。
  • int getLargestPoolSize():获取线程池曾经创建过的最大线程数,根据这个参数,我们可以知道线程池是否满过。
  • int getPoolSize():获取线程池线程数。
  • int getActiveCount():获取活跃线程数(正在执行任务的线程数)。
  • protected void beforeExecute(Thread t, Runnable r):任务执行之前调用。
  • protected void afterExecute(Runnable r, Throwable t):任务执行之后调用。
  • protected void terminated():线程池结束之后调用。
  • 遇到的一个问题

    package com.lzhpo.threadpool.demo3;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
     * 遇到的一个问题
     * @author lzhpo
    public class AProblem {
        static class DivTask implements Runnable {
            int a,b;
            public DivTask(int a, int b) {
                this.a = a;
                this.b = b;
            @Override
            public void run() {
                double result = a / b;
                System.out.println(result);
        public static void main(String[] args) {
            ExecutorService executor = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 5; i++) {
                executor.submit(new DivTask(100, i));
    

    运行结果:

    100.0
    
  • 我明明第一次的时候除数为0,为什么不报错?
  • 按理论来说,应该是有5次输出的,为什么只有三次?
    线程池submit的问题.png
  • 解决办法:
    对submit的返回值进行处理。
    因为submit是一个非阻塞的方法,就是不管你发生什么错误,我都会执行下去。
    线程池submit的问题-解决办法.png

  • 尽量使用手动的方式创建线程池,避免使用Executors工厂类。
  • 根据场景,合理设置线程池的各个参数,包括线程池数量、队列、线程工厂和拒绝策略。
  • 在调线程池submit()方法的时候,一定要尽量避免任务执行异常被吞掉的问题。
  • HandCreateThreadPoolDemo1

    package com.lzhpo.threadpool.demo3;
    import java.util.Random;
    import java.util.concurrent.*;
     * 手动创建线程池
     * @author lzhpo
    public class HandCreateThreadPoolDemo1 {
        public static void main(String[] args) {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    TimeUnit.SECONDS,
                    // 线程池缓冲队列
                    new LinkedBlockingDeque<>(10),
                    // 自定义ThreadFactory线程工厂
                    new MyThreadFactory("HandCreateThreadPoolDemo1")) {
                @Override
                protected void beforeExecute(Thread t, Runnable r) {
                    System.out.println("I'm beforeExecute.");
                @Override
                protected void afterExecute(Runnable r, Throwable t) {
                    System.out.println("I'm afterExecute.");
                @Override
                protected void terminated() {
                    System.out.println("I'm terminated.");
             * 线程池拒绝策略
            // 线程池拒绝策略:DiscardPolicy => 直接丢弃
    //        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            // 自实现DiscardPolicy
    //        executor.setRejectedExecutionHandler(new MyDiscardPolicy());
            // 线程池拒绝策略:AbortPolicy => 直接抛异常
    //        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
            // 自实现MyAbortPolicy
    //        executor.setRejectedExecutionHandler(new MyAbortPolicy());
            // 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。
    //        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            // 自实现MyCallerRunsPolicy
    //        executor.setRejectedExecutionHandler(new MyCallerRunsPolicy());
            // 线程池拒绝策略:DiscardOldestPolicy => 对于线程池中的任务不抛弃也不拒绝,啥也不干
    //        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
            // 自实现MyDiscardOldestPolicy
    //        executor.setRejectedExecutionHandler(new MyDiscardOldestPolicy());
            // 自定义线程池拒绝策略:比如现在想让被拒绝的任务在一个新的线程中执行。
            executor.setRejectedExecutionHandler(new MyRejectedExecutionHandler());
             * 提交任务
            // 方法1:submit,非阻塞方法,有返回结果,也就是Future对象。
            executor.submit(() -> {
                System.out.println("This is a task.");
                System.out.println(Thread.currentThread().getName());
            // 方法2:execute。没有返回结果。
    //        executor.execute(() -> {
    //            System.out.println("This is a task.");
    //        });
             * 关闭线程池
            // 方法1:shutdown。shutdown()会将线程池状态置为SHUTDOWN,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。
            executor.shutdown();
            // 方法2:立马结束,并且清空任务队列
    //        executor.shutdownNow();
    //--------------------自定义线程名称------------------------
     * 带有名称的线程工厂
     * <p>为什么需要定义线程的名称?
     * 因为,如果在线程很多的时候,定义线程的名称有助于我们调试和定位问题。
    class MyThreadFactory implements ThreadFactory {
         * 线程名称
        private final String threadName;
         * 构造器:传入线程名称,设置线程名称
        MyThreadFactory(String threadName) {
            this.threadName = threadName;
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            return t;
    //--------------------线程池拒绝策略------------------------
     * 线程池拒绝策略:AbortPolicy => ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy。直接抛出异常。
     * 很简单粗暴,直接抛出个RejectedExecutionException异常,也不执行这个任务了。
    class MyAbortPolicy implements RejectedExecutionHandler {
        public MyAbortPolicy() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                    " rejected from " +
                    e.toString());
     * 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。
    class MyCallerRunsPolicy implements RejectedExecutionHandler {
        public MyCallerRunsPolicy() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (!executor.isShutdown()) {
                r.run();
     * 线程池拒绝策略:DiscardPolicy => 啥都不干,对于线程池的任务不抛弃也不会执行。
    class MyDiscardPolicy implements RejectedExecutionHandler {
        public MyDiscardPolicy() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     * 线程池拒绝策略:DiscardOldestPolicy => 抛弃线程池中老的任务,再把新的任务加进去
    class MyDiscardOldestPolicy implements RejectedExecutionHandler {
        public MyDiscardOldestPolicy() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (!executor.isShutdown()) {
                executor.getQueue().poll();
                executor.execute(r);
     * 自定义线程池拒绝策略:比如现在想让被拒绝的任务在一个新的线程中执行。
    class MyRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            new Thread(r, "新线程" + new Random().nextInt(10)).start();
    
  •