news 2026/4/23 17:08:15

【Java】线程池源码解析

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【Java】线程池源码解析

hreadPoolExecutor#

首先是ThreadPoolExecutor()里面可以看到线程池核心参数

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler) {

if (corePoolSize < 0 ||

maximumPoolSize <= 0 ||

maximumPoolSize < corePoolSize ||

keepAliveTime < 0)

throw new IllegalArgumentException();

if (workQueue == null || threadFactory == null || handler == null)

throw new NullPointerException();

this.acc = System.getSecurityManager() == null ?

null :

AccessController.getContext();

this.corePoolSize = corePoolSize;

this.maximumPoolSize = maximumPoolSize;

this.workQueue = workQueue;

this.keepAliveTime = unit.toNanos(keepAliveTime);

this.threadFactory = threadFactory;

this.handler = handler;

}

我们首先看下BlockingQueue<Runnable> workQueue点进去看下,发现是

private final BlockingQueue<Runnable> workQueue;队列中存放的是Runnable对象,即可执行对象

还有一个在核心参数中没有体现的:

private final HashSet<Worker> workers = new HashSet<Worker>();

Worker#

这个用存放Worker对象的HashSet其实就是线程池,点Worker进去看下:

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable

{

// 省略部分方法和变量

/** Thread this worker is running in. Null if factory fails. */

final Thread thread;

/** Initial task to run. Possibly null. */

Runnable firstTask;

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this);

}

/** Delegates main run loop to outer runWorker */

public void run() {

runWorker(this);

}

}

HashSet不是线程安全****的。

所以在源码中,凡是涉及对 workers 进行 add(添加线程)或 remove(销毁线程)操作时,都需要获取一把全局锁:mainLock。

这也是为什么在高并发场景下,频繁创建/销毁线程会影响性能的原因之一(因为要争抢 mainLock)。

worker是 extends AQS

思考题: 为什么 Worker 要继承 AQS(实现锁的功能)?它不是已经有 runWorker 在跑了吗?

答案:

Worker 自己实现了一把不可重入锁。

作用:为了区分**“线程是在空闲(等任务)”** 还是 “正在干活(跑任务)”。

在 runWorker 里:

w.lock():开始执行任务前加锁。

w.unlock():任务执行完解锁。

场景:当你想调用 shutdown() 关闭线程池时,线程池会去中断那些没有被锁住(即没有在干活)的线程。如果一个线程正在干活(持有锁),shutdown 就不会中断它,让它把活干完。只有 shutdownNow 才会不管三七二十一去中断正在干活的线程。

可以发现有个重要成员变量:firstTask,这个表明要被执行的任务

看下构造函数:

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this);

}

this.thread = getThreadFactory().newThread(this);这里的newThread()参数是this,也就是Worker本身,当创建 Worker 时,它通过线程工厂创建了一个 Java Thread 对象。

还有个run方法public void run() {runWorker(this);}

runWorker()#

run方法中执行的是runWorker()方法,参数是this,点进去看下

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

while (task != null || (task = getTask()) != null) {

w.lock();

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted. This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

beforeExecute(wt, task);

Throwable thrown = null;

try {

task.run();

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly);

}

}

可以看出里面一个大的while循环,循环条件是task != null || (task = getTask()) != null这就是线程池的线程复用的关键!当任务不为空或者能获取到任务时,线程池都在工作。

getTask()#

这个getTask()方法也可以看下

private Runnable getTask() {

boolean timedOut = false; // Did the last poll() time out?

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

decrementWorkerCount();

return null;

}

int wc = workerCountOf(c);

// Are workers subject to culling?

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))

&& (wc > 1 || workQueue.isEmpty())) {

if (compareAndDecrementWorkerCount(c))

return null;

continue;

}

try {

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

workQueue.take();

if (r != null)

return r;

timedOut = true;

} catch (InterruptedException retry) {

timedOut = false;

}

}

}

wc记录的是工作线程数,timed 标记的是“当前这个线程是否受超时限制”。

再看下里面try内逻辑:

如果 timed == true(通常是因为 wc > corePoolSize),说明当前线程属于“多出来的非核心线程”。

那么它去队列拿任务时,用 poll(keepAliveTime)。

关键点:如果在 keepAliveTime 时间内没拿到任务(返回 null),下一次循环时 timedOut 就会变为 true,进而导致返回 null,最终导致这个 Worker 退出循环被销毁。

总结:是先“等待超时拿到 null”,然后才“被回收”。

execute()#

再看下execute()方法

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

/*

* Proceed in 3 steps:

*

* 1. If fewer than corePoolSize threads are running, try to

* start a new thread with the given command as its first

* task. The call to addWorker atomically checks runState and

* workerCount, and so prevents false alarms that would add

* threads when it shouldn't, by returning false.

*

* 2. If a task can be successfully queued, then we still need

* to double-check whether we should have added a thread

* (because existing ones died since last checking) or that

* the pool shut down since entry into this method. So we

* recheck state and if necessary roll back the enqueuing if

* stopped, or start a new thread if there are none.

*

* 3. If we cannot queue task, then we try to add a new

* thread. If it fails, we know we are shut down or saturated

* and so reject the task.

*/

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true))

return;

c = ctl.get();

}

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

else if (!addWorker(command, false))

reject(command);

}

无论是看注释还是看if-else那块代码都能理解:

如果任务数小于核心线程数,那就创建核心线程

如果线程池正在运行,尝试将任务加入队列(workQueue.offer(command))

成功后需要二次检查:

如果线程池已关闭,移除任务并拒绝

如果没有线程了,创建新线程

你可能会疑惑,为什么任务已经入队了,还要判断 workerCount == 0 并可能创建一个空任务线程?

原因:假设核心线程数(Core)设为 0,或者在任务入队的瞬间,现有的线程刚好都挂了(抛异常)或者都超时销毁了。

后果:如果这里不检查,任务孤零零地躺在队列里,永远没人去取它,导致“死锁”般的假死状态。

作用:兜底策略,确保只要队列里有任务,就至少有一个线程活着去处理它。

如果队列满了

尝试创建非核心线程(addWorker(command, false))

如果失败(达到最大线程数),拒绝任务

再关注一下另外两个的重要的方法;

prestartCoreThread()#

/**

* Starts a core thread, causing it to idly wait for work. This

* overrides the default policy of starting core threads only when

* new tasks are executed. This method will return {@code false}

* if all core threads have already been started.

*

* @return {@code true} if a thread was started

*/

public boolean prestartCoreThread() {

return workerCountOf(ctl.get()) < corePoolSize &&

addWorker(null, true);

}

prestartAllCoreThreads()#

/**

* Starts all core threads, causing them to idly wait for work. This

* overrides the default policy of starting core threads only when

* new tasks are executed.

*

* @return the number of threads started

*/

public int prestartAllCoreThreads() {

int n = 0;

while (addWorker(null, true))

++n;

return n;

}

1.核心机制:addWorker(null, true)#

首先注意到这两个方法内部都调用了:

Java

addWorker(null, true)

null: 这里的 firstTask 是空。

回忆一下之前看的 runWorker 源码:如果 firstTask 为空,线程启动后就不会立即执行任务,而是直接进入 while 循环调用 getTask()。

getTask() 会调用 workQueue.take(),导致该线程在队列上阻塞等待。

true: 表示创建的是核心线程。

结论:addWorker(null, true) 的作用就是——招一个工人,让他没事干先去仓库门口等着,随时准备干活。

2. prestartCoreThread() —— 启动一个#

public boolean prestartCoreThread() {

// 1. 检查当前线程数是否小于核心数

// 2. 尝试创建一个空任务的核心线程

return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true);

}

理解:如果核心线程还没满,就提前启动一个核心线程。

返回值:如果成功启动了一个线程,返回 true;如果核心线程早已满了,返回 false。

3. prestartAllCoreThreads() —— 全部启动#

public int prestartAllCoreThreads() {

int n = 0;

// 只要 addWorker 返回 true(说明还没满),就一直循环创建

while (addWorker(null, true))

++n;

return n;

}

理解:不管现在有多少线程,只要没达到 corePoolSize,就一口气把剩下的坑位全填满,让所有核心线程全部就位待命。

返回值:返回这次一共新启动了多少个线程。

典型应用场景:#

高并发系统的启动时刻:比如“双11”零点,流量瞬间铺天盖地。如果这时候再去创建线程,线程创建的开销可能会导致系统卡顿(Jitter)。使用 prestartAllCoreThreads() 可以在流量到达前先把线程池填满。

低延迟敏感系统:为了避免请求第一次处理时的抖动(Latency Spike),提前预热。

总结#

入口:ThreadPoolExecutor 构造参数(配置)。

调度:execute()(决策:是建线程还是入队)。

载体:Worker(封装了 Thread 和 Runnable)。

引擎:runWorker()(死循环:取任务 -> 执行 -> 统计)。

油箱:getTask()(阻塞队列取货,决定线程生死)。

程池的核心本质:

线程池__不仅仅是 new Thread() 的集合,它更是一个 “生产者-消费者” 模型。

生产者:__execute() 方法,负责把任务“生产”出来并推送到队列或直接交给工人。

消费者:__Worker 线程,负责从队列这个“缓冲区”里不断 getTask() 并消费。

管理者:__ThreadPoolExecutor 持有 ctl 状态,动态控制工人的数量(招人/裁员)。

思考题:如果核心线程数设置为0会发生啥?#

当 corePoolSize = 0 _时,线程池__就像一个“全兼职”_的机构。

任务来了先尝试进队列。

如果进了队列,必须兜底检查是否有线程活着,如果没有,通过 addWorker(null, false) 创建一个临时工(非核心线程)来处理队列里的活。注意这里必须用 false__,因为核心编制(Core)是0,只能招临时工(Max)。

这个临时工线程在 keepAliveTime _超时后会被销毁,_线程池__最终会变回空状态。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/23 14:48:41

刘二大人PyTorch深度学习实践第二讲笔记

个新坑&#xff0c;系统学一遍深度学习好做毕设&#xff0c;能到河工大挺激动的&#xff0c;赶紧给刘二大人投自荐简历&#xff0c;但是已读不回&#xff0c;还是自己太菜了........不过已经到河工大了挺好的&#xff0c;梦校第二讲线性模型image-20251125141224993image-20251…

作者头像 李华
网站建设 2026/4/23 13:10:43

再探二分查找

各位好久不见&#xff0c;不知不觉2025都快要结束了&#xff0c;是时候来再总结一次算法&#xff08;入门&#xff09;的经验了。 最近笔者看标准算法库时&#xff0c;注意到C算法库中只有两种二分查找的方法&#xff1a;lower_bound和upper_bound&#xff0c;分别用来查找第一…

作者头像 李华
网站建设 2026/4/23 14:34:23

自动化运维利器Ansible

前言 在如今的IT环境中&#xff0c;服务器数量越来越多&#xff0c;业务流程也越来越复杂。如果还靠手工登录每台服务器操作&#xff0c;不仅效率低&#xff0c;还容易出错。这时候&#xff0c;自动化运维工具就成了运维工程师的“救星”。 Ansible作为其中的佼佼者&#xff0c…

作者头像 李华
网站建设 2026/4/23 14:44:32

基于SpringBoot+Vue的台球厅管理系统(完整源码+万字论文+精品PPT)

这里写目录标题博主简介源码演示录像论文创作效果图【部分】开发框架以及工具介绍系统运行效果图资源可行性分析数据库表结构设计代码示例获取源码【支持定做】博主简介 &#x1f468;‍&#x1f4bb; 博主简介&#xff1a; 本人是CSDN特邀作者、博客专家、CSDN新星计划导师&a…

作者头像 李华