public static ExecutorService newSingleThreadExecutor();
public static ExecutorService newFixedThreadPool(int nThreads);
public static ExecutorService newCachedThreadPool();
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
public static ExecutorService newWorkStealingPool();
创建单一线程的线程池
故名思意,这个线程池只有一个线程。若多个任务被提交到此线程池,那么会被缓存到队列(队列长度为Integer.MAX_VALUE)。当线程空闲的时候,按照FIFO的方式进行处理。
创建固定数量的线程池
和创建单一线程的线程池类似,只是这儿可以并行处理任务的线程数更多一些罢了。若多个任务被提交到此线程池,会有下面的处理过程。
如果线程的数量未达到指定数量,则创建线程来执行任务
如果线程池的数量达到了指定数量,并且有线程是空闲的,则取出空闲线程执行任务
如果没有线程是空闲的,则将任务缓存到队列(队列长度为Integer.MAX_VALUE)。当线程空闲的时候,按照FIFO的方式进行处理.
创建带缓存的线程池
这种方式创建的线程池,核心线程池的长度为0,线程池最大长度为Integer.MAX_VALUE。由于本身使用SynchronousQueue作为等待队列的缘故,导致往队列里面每插入一个元素,必须等待另一个线程从这个队列删除一个元素。
创建定时调度的线程池
和上面3个工厂方法返回的线程池类型有所不同,它返回的是ScheduledThreadPoolExecutor类型的线程池。平时我们实现定时调度功能的时候,可能更多的是使用第三方类库,比如:quartz等。但是对于更底层的功能,我们仍然需要了解
手动创建线程池
ThreadPoolExecutor
源码:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
ThreadPoolExecutor构造方法有7个参数
corePoolSize:线程池中的核心线程数。
maximumPoolSize:线程池中的最大线程数。
keepAliveTime:空闲时间,当线程池数量超过核心线程数时,多余的空闲线程存活的时间,即:这些线程多久被销毁。
unit:空闲时间的单位,可以是毫秒、秒、分钟、小时和天,等等。
workQueue:等待队列,线程池中的线程数超过核心线程数时,任务将放在等待队列,它是一个BlockingQueue类型的对象。
threadFactory:线程工厂,我们可以使用它来创建一个线程。
handler:拒绝策略,当线程池和等待队列都满了之后,需要通过该对象的回调函数进行回调处理。
为什么阿里Java规约禁止使用Java内置Executors创建线程池?
阿里巴巴Java规约中让我们手动创建线程池效果更好哦!
其实可以从ThreadPoolExecutor构造方法的7个参数出发。
规约中的原话:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
等待队列-workQueue
等待队列是BlockingQueue类型的,理论上只要是它的子类,都可以用来作为等待队列。
JDK中自带的一些阻塞队列
ArrayBlockingQueue:队列是有界的,基于数组实现的阻塞队列。
LinkedBlockingQueue:队列可以有界,也可以无界。基于链表实现的阻塞队列。
SynchronousQueue:不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作将一直处于阻塞状态。该队列也是Executors.newCachedThreadPool()的默认队列。
PriorityBlockingQueue:带优先级的无界阻塞队列。
通常情况下,我们需要指定阻塞队列的上界(比如1024)。另外,如果执行的任务很多,我们可能需要将任务进行分类,然后将不同分类的任务放到不同的线程池中执行。
线程工厂-threadFactory
ThreadFactory接口
ThreadFactory是一个接口,只有一个方法。
Executors的实现使用了默认的线程工厂-DefaultThreadFactory。它的实现主要用于创建一个线程,线程的名字为pool-{poolNum}-thread-{threadNum}。
源代码:
* The default thread factory
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
自定义线程名称就是实现ThreadFactory
* 带有名称的线程工厂
* <p>为什么需要定义线程的名称?
* 因为,如果在线程很多的时候,定义线程的名称有助于我们调试和定位问题。
class MyThreadFactory implements ThreadFactory {
* 线程名称
private final String threadName;
* 构造器:传入线程名称,设置线程名称
MyThreadFactory(String threadName) {
this.threadName = threadName;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, threadName);
t.setDaemon(true);
return t;
拒绝策略/线程池饱和策略-handler
什么是拒绝策略?
就是当线程池满了、队列也满了的时候,我们对任务采取的措施。或者丢弃、或者执行、或者其他…
JDK有哪些拒绝策略?
JDK自带4种拒绝策略
JDK自带4种拒绝策略,分别是:
1.CallerRunsPolicy:在调用者线程执行。
自实现CallerRunsPolicy
类似:
* 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。
class MyCallerRunsPolicy implements RejectedExecutionHandler {
public MyCallerRunsPolicy() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
r.run();
2.AbortPolicy:直接抛出RejectedExecutionException异常。
自实现AbortPolicy
类似:
* 线程池拒绝策略:AbortPolicy => ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy。直接抛出异常。
* 很简单粗暴,直接抛出个RejectedExecutionException异常,也不执行这个任务了。
class MyAbortPolicy implements RejectedExecutionHandler {
public MyAbortPolicy() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
3.DiscardPolicy:任务直接丢弃,不做任何处理。
* 线程池拒绝策略:DiscardPolicy => 啥都不干,对于线程池的任务不抛弃也不会执行。
class MyDiscardPolicy implements RejectedExecutionHandler {
public MyDiscardPolicy() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
4.DiscardOldestPolicy:丢弃队列里最旧的那个任务,再尝试执行当前任务。
* 线程池拒绝策略:DiscardOldestPolicy => 抛弃线程池中老的任务,再把新的任务加进去
class MyDiscardOldestPolicy implements RejectedExecutionHandler {
public MyDiscardOldestPolicy() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
executor.getQueue().poll();
executor.execute(r);
如何使用?
executor.setRejectedExecutionHandler(new MyRejectedExecutionHandler());
提交任务的两种方式
提及任务的方式有两种,分别是:submit和execute
这两个方法的区别:
submit:submit()用于提交一个需要返回果的任务。该方法返回一个Future对象,通过调用这个对象的get()方法,我们就能获得返回结果。get()方法会一直阻塞,直到返回结果返回。另外,我们也可以使用它的重载方法get(long timeout, TimeUnit unit),这个方法也会阻塞,但是在超时时间内仍然没有返回结果时,将抛出异常TimeoutException。
submit(Runnable task)源代码: public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
execute:execute()用于提交不需要返回结果的任务。
execute源代码: public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
else if (!addWorker(command, false))
reject(command);
关闭线程池的两种方式
可以调用线程池对象的shutdown()和shutdownNow()方法来关闭线程池。
这两个方法的区别:
shutdown()会将线程池状态置为SHUTDOWN,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。
shutdown
源代码: public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
tryTerminate();
shutdownNow()会将线程池状态置为STOP,对所有线程执行interrupt()操作,清空队列,并将队列中的任务返回回来。
shutdownNow
源代码: public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
tryTerminate();
return tasks;
另外,关闭线程池涉及到两个返回boolean的方法,isShutdown()和isTerminated,分别表示是否关闭和是否终止。
如何正确配置线程池的参数?
任务的性质:CPU密集型、IO密集型和混杂型。
任务的优先级:高中低。
任务执行的时间:长中短。
任务的依赖性:是否依赖数据库或者其他系统资源。
通常来说,如果任务属于CPU密集型,那么我们可以将线程池数量设置成CPU的个数,以减少线程切换带来的开销。如果任务属于IO密集型,我们可以将线程池数量设置得更多一些,比如CPU个数*2。
可以通过Runtime.getRuntime().availableProcessors()来获取CPU的个数。
线程池监控
如果系统中大量用到了线程池,那么我们是不是有必要对线程池进行监控。
这样子有助于我们定位出现的问题。
ThreadPoolExecutor自带了一些方法:
long getTaskCount():获取已经执行或正在执行的任务数。
long getCompletedTaskCount():获取已经执行的任务数。
int getLargestPoolSize():获取线程池曾经创建过的最大线程数,根据这个参数,我们可以知道线程池是否满过。
int getPoolSize():获取线程池线程数。
int getActiveCount():获取活跃线程数(正在执行任务的线程数)。
protected void beforeExecute(Thread t, Runnable r):任务执行之前调用。
protected void afterExecute(Runnable r, Throwable t):任务执行之后调用。
protected void terminated():线程池结束之后调用。
遇到的一个问题
package com.lzhpo.threadpool.demo3;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
* 遇到的一个问题
* @author lzhpo
public class AProblem {
static class DivTask implements Runnable {
int a,b;
public DivTask(int a, int b) {
this.a = a;
this.b = b;
@Override
public void run() {
double result = a / b;
System.out.println(result);
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executor.submit(new DivTask(100, i));
运行结果:
100.0
我明明第一次的时候除数为0,为什么不报错?
按理论来说,应该是有5次输出的,为什么只有三次?
解决办法:
对submit的返回值进行处理。
因为submit是一个非阻塞的方法,就是不管你发生什么错误,我都会执行下去。
尽量使用手动的方式创建线程池,避免使用Executors工厂类。
根据场景,合理设置线程池的各个参数,包括线程池数量、队列、线程工厂和拒绝策略。
在调线程池submit()方法的时候,一定要尽量避免任务执行异常被吞掉的问题。
HandCreateThreadPoolDemo1
:
package com.lzhpo.threadpool.demo3;
import java.util.Random;
import java.util.concurrent.*;
* 手动创建线程池
* @author lzhpo
public class HandCreateThreadPoolDemo1 {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(10),
new MyThreadFactory("HandCreateThreadPoolDemo1")) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("I'm beforeExecute.");
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("I'm afterExecute.");
@Override
protected void terminated() {
System.out.println("I'm terminated.");
* 线程池拒绝策略
executor.setRejectedExecutionHandler(new MyRejectedExecutionHandler());
* 提交任务
executor.submit(() -> {
System.out.println("This is a task.");
System.out.println(Thread.currentThread().getName());
* 关闭线程池
executor.shutdown();
* 带有名称的线程工厂
* <p>为什么需要定义线程的名称?
* 因为,如果在线程很多的时候,定义线程的名称有助于我们调试和定位问题。
class MyThreadFactory implements ThreadFactory {
* 线程名称
private final String threadName;
* 构造器:传入线程名称,设置线程名称
MyThreadFactory(String threadName) {
this.threadName = threadName;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, threadName);
t.setDaemon(true);
return t;
* 线程池拒绝策略:AbortPolicy => ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy。直接抛出异常。
* 很简单粗暴,直接抛出个RejectedExecutionException异常,也不执行这个任务了。
class MyAbortPolicy implements RejectedExecutionHandler {
public MyAbortPolicy() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
* 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。
class MyCallerRunsPolicy implements RejectedExecutionHandler {
public MyCallerRunsPolicy() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
r.run();
* 线程池拒绝策略:DiscardPolicy => 啥都不干,对于线程池的任务不抛弃也不会执行。
class MyDiscardPolicy implements RejectedExecutionHandler {
public MyDiscardPolicy() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
* 线程池拒绝策略:DiscardOldestPolicy => 抛弃线程池中老的任务,再把新的任务加进去
class MyDiscardOldestPolicy implements RejectedExecutionHandler {
public MyDiscardOldestPolicy() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
executor.getQueue().poll();
executor.execute(r);
* 自定义线程池拒绝策略:比如现在想让被拒绝的任务在一个新的线程中执行。
class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
new Thread(r, "新线程" + new Random().nextInt(10)).start();