Java线程池实现原理与ThreadPoolExector详解
线程池(Thread Pool)是一种基于池化思想管理线程的工具。
为什么要使用线程池?
- 降低资源消耗:通过池化技术重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
线程池参数
核心构造方法如下:
1 | public ThreadPoolExecutor(int corePoolSize, |
ThreadPoolExecutor
7 大参数:
- corePoolSize :核心线程数。当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的核心线程能够执行新任务也会创建线程,直到需要执行的任务数大于核心线程数时就不再创建。这些线程创建后并不会销毁,而是一种常驻线程。
- maximumPoolSize:线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。但线程池内的总线程数不会超过最大线程数,且如果使用了无界任务队列则该参数不起效果。
- workQueue:任务队列。用于存放等待执行的任务的阻塞队列。如果核心线程都在执行任务,并且任务队列没有满,则将新任务存储在这个任务队列。
- ArrayBlockingQueue:一个基于数组结构的有界阻塞队列,按 FIFO 排序任务。
- LinkedBlockingQueue: 一个基于链表结构的阻塞队列,可有界也可无界,按 FIFO 排序任务,吞吐量通常要高于
ArrayBlockingQueue
。静态工厂方法Executors.newFixedThreadPool()
使用了这个队列。 - SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于
LinkedBlockingQueue
。静态工厂方法Executors.newCachedThreadPool()
使用了这个队列。 - PriorityBlockingQueue:一个具有优先级的无界阻塞队列。
keepAliveTime:表示核心线程外的线程的空闲存活时间,也就是工作线程空闲后,核心线程外的线程不会立即销毁,而是会等到时间超过
keepAliveTime
时才会被销毁。unit :
keepAliveTime
参数的时间单位。threadFactory:为线程池提供创建新线程的线程工厂,可以用来设置线程名、是否为守护线程等等。
handler :拒绝 / 饱和策略。当队列和线程池都满了,则采取饱和策略处理提交的新任务。当
ThreadPoolExector
已经关闭时,execute()
方法会调用 Handler。这个策略默认情况下是AbortPolicy
,表示无法处理新任务时抛出异常。AbortPolicy :直接抛出异常
RejectedExecutionException
来拒绝新任务的处理。CallerRunsPolicy :调用执行自己的线程运行任务,也就是直接在调用
execute
方法的线程中运行被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。这种策略会降低对于新任务提交速度,影响程序的整体性能。DiscardPolicy :不处理新任务,直接丢弃掉。
DiscardOldestPolicy : 丢弃队列里头部(最早)的任务,并执行当前任务。
线程池中阻塞队列的作用
- 普通队列只能保证作为一个有限长度的缓冲区,如果超出了缓冲长度,就无法保留当前要入队的任务;而阻塞队列可以通过阻塞保留住当前想要继续入队的任务。
- 阻塞队列自带阻塞和唤醒的功能,当任务队列中没有任务时,阻塞队列可以阻塞要获取任务的线程,线程池利用阻塞队列的take方法将线程挂起,让其进入wait状态,让核心线程不占用cpu资源。
线程池工作流程
当向线程池提交一个任务后的处理流程:
- 如果当前运行的线程数小于 corePoolSize,无论线程是否空闲,都会新建一个核心线程来执行任务。(注意,执行这一步需要获得全局锁)。
- 如果当前运行的线程数 >= corePoolSize 时,则将任务加入任务队列。
- 当任务队列已满,则创建新的线程(非核心线程)来处理任务(注意,执行这一步需要获得全局锁)。
- 当任务队列已满, 且当前运行总线程数达到了 maximumPoolSize,则会采取拒绝策略进行处理。
以下是 ThreadPoolExecute
的 execute
方法的执行流程的源码:
1 | public void execute(Runnable command) { |
线程复用原理
线程池创建线程时,会调用 addWorker
方法将线程封装成工作线程 Worker,Worker 在执行完任务后,还会循环获取任务队列里的任务来执行,从而达到线程复用的目的。
为什么先添加队列而不是先创建最大线程?
为了在执行 execute 方法时,尽可能地避免获取全局锁。因为创建新线程时,需要获取全局锁,会阻塞其他的线程,十分耗费资源,影响了整体的效率。在 ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于 corePoolSize),几乎所有的 execute 方法调用都是执行加入任务队列,而这一步不需要获取全局锁。
常见问题
execute()方法和submit()方法的区别
execute()
方法只能接收Runnable
对象。submit()
方法可以接收Runnable
和Callable
类型的对象。submit()
方法可以返回持有计算结果的Future
对象,可以判断任务是否执行成功,而execute()
方法不可以。
换句话说就是,**execute()
方法用于提交不需要返回值的任务,submit()
方法用于需要提交返回值的任务**。
shutdown()和shutdownNow()
shutdown()
和shutdownNow()
的原理是遍历线程池中的工作线程,逐个调用线程的 interrupt 方法去中断线程。
但不同之处在于shutdown()
只是将线程池状态设置成 SHUTDOWN 状态,中断没有在执行任务的线程;shutdownNow()
则会将线程池状态设置成 STOP 状态,尝试停止所有正在执行或暂停任务的线程。
isTerminated()和isShutdown()
isTerminated()
:当线程池中所有任务都已经关闭时,返回 true。isShutdown()
:当线程池调用shutdown()
或shutdownNow()
方法时,返回true。
Exector 框架
由 3 大部分组成:
- 任务:包括被执行任务需要实现的接口
Runnable
或Callable
- 任务的执行:包括任务执行机制的核心接口
Executor
,以及继承自Executor
接口的ExecutorService
接口。ThreadPoolExecutor
和ScheduledThreadPoolExecutor
这两个关键类实现了 ExecutorService 接口。 - 异步计算的结果:**
Future
** 接口以及Future
接口的实现类FutureTask
类都可以代表异步计算的结果。
Executor 框架的使用示意图
- 主线程首先要创建实现
Runnable
或者Callable
接口的任务对象。 - 把创建完成的实现
Runnable
/Callable
接口的对象直接交给ExecutorService
执行:ExecutorService.execute(Runnable command)
)或者也可以把Runnable
对象或Callable
对象提交给ExecutorService
执行(ExecutorService.submit(Runnable task)
或ExecutorService.submit(Callable <T> task)
)。 - 如果执行
ExecutorService.submit(…)
,ExecutorService
将返回一个实现Future
接口的对象(返回的是FutureTask
对象)。由于FutureTask
实现了Runnable
,我们也可以创建FutureTask
,然后直接交给ExecutorService
执行。 - 最后,主线程可以执行
FutureTask.get()
方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)
来取消此任务的执行。
ThreadPoolExector
Java中的线程池核心实现类是ThreadPoolExecutor,UML 类图如下:
ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行分离开来进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。
ExecutorService接口增加了一些能力:
- 扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;
- 提供了管控线程池的方法,比如停止线程池的运行,如
shutdown()
。
AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
主要用来在给定的延迟后运行任务,或者定期执行任务。使用 DelayQueue 无界队列作为任务队列。
常见线程池
通过 Executor 框架的工具类 Exectors,可以常见 3 种类型的 ThreadPoolExector。
- FixedThreadPool
- SingleThreadExector
- CachedThreadPool
FixedThreadPool
FixedThreadPool
被称为可重用固定线程数的线程池。
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
FixedThreadPool
的 corePoolSize
和 maximumPoolSize
都被设置为 nThreads,这个 nThreads 参数是我们使用时指定的线程数。keepAliveTime
设置为 0L,意味着多余的空闲线程会被立即终止。使用无界队列 LinkedBlockingQueue
(队列的容量为 Integer.MAX_VALUE)作为线程池的工作队列。
FixedThreadPool 的 execute() 运行流程如下:
- 如果当前运行的线程数小于 corePoolSize,如果再来新任务的话,就创建新线程来执行任务;
- 当前运行的线程数等于 corePoolSize 后, 如果再来新任务的话,会将任务加入
LinkedBlockingQueue
; - 线程执行完任务后,会在循环中反复从
LinkedBlockingQueue
中获取任务来执行;
SingleThreadExecutor
SingleThreadExecutor
是使用单个 worker 线程的线程池。
1 | public static ExecutorService newSingleThreadExecutor() { |
SingleThreadExecutor
的 corePoolSize
和 maximumPoolSize
都被设置为 1。其他参数和 FixedThreadPool
相同。使用无界队列 LinkedBlockingQueue
(队列的容量为 Integer.MAX_VALUE)作为线程池的工作队列。
CachedThreadPool
CachedThreadPool
是一个会根据需要创建新线程的线程池。
1 | public static ExecutorService newCachedThreadPool() { |
CachedThreadPool
的corePoolSize
被设置为 0,即 corePool 为空;maximumPoolSize
被设置为 Integer.MAX.VALUE
,即它是无界的,这意味着如果主线程提交任务的速度高于 maximumPool
中线程处理任务的速度时,CachedThreadPool
会不断创建新线程。极端情况下,这样会导致耗尽 CPU 和内存资源。
CachedThreadPool 的 execute() 运行流程如下:
- 首先执行
SynchronousQueue.offer(Runnable task)
提交任务到任务队列。如果当前maximumPool
中有闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
,那么主线程执行 offer 操作与空闲线程执行的poll
操作配对成功,主线程把任务交给空闲线程执行,execute()
方法执行完成,否则执行下面的步骤 2; - 当初始
maximumPool
为空,或者maximumPool
中没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
。这种情况下,步骤 1 将失败,此时CachedThreadPool
会创建新线程执行任务,execute 方法执行完成; - 新创建的线程将任务执行完后,会继续执行 poll 操作,这个 poll 操作会让空闲线程最多在
SynchronousQueue
中等待 60 秒。
ScheduledThreadPool
创建一个支持可延迟或定期执行任务的线程池。
推荐使用 ThreadPoolExecutor 构造函数创建线程池
通过 ThreadPoolExecutor
构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
Executors
返回线程池对象的弊端如下:
FixedThreadPool
和SingleThreadExecutor
: 允许请求的队列长度为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM。CachedThreadPool
和ScheduledThreadPool
: 允许创建的线程数量为Integer.MAX_VALUE
,可能会创建大量线程,从而导致 OOM。
参考资料
《Java 并发编程的艺术》