学习大纲

要想深入理解 JUC (Java Util Concurrent) 的线程池实现,不能只看 ThreadPoolExecutor 一个类,而应该顺藤摸瓜,从接口定义核心实现辅助组件三个维度入手。

以下是按“阅读优先级”排序的源码研读路径:

第一阶段:核心骨架(必看)

这是线程池的灵魂,90% 的面试题和生产环境故障都源于对这个类的误解。

  1. java.util.concurrent.ThreadPoolExecutor (核心)

    • 地位:绝对的核心。
    • 看什么
      • ctl 变量:高 3 位存状态,低 29 位存线程数。这是位运算设计的典范。
      • execute(Runnable command):入口方法,蕴含了“三级缓冲”逻辑(核心线程 -> 队列 -> 最大线程 -> 拒绝)。
      • addWorker(Runnable firstTask, boolean core):如何创建一个线程并启动它。
      • 内部类 Worker重点!它继承了 AQS。为什么要继承 AQS?为了实现独占锁,判断线程是否空闲(中断时使用)
      • runWorker(Worker w):线程启动后真正运行的死循环。重点看它如何从队列 getTask() 以及如何处理异常。
  2. java.util.concurrent.AbstractExecutorService (骨架)

    • 地位ThreadPoolExecutor 的父类。
    • 看什么
      • submit() 系列方法:它展示了如何将 Runnable/Callable 包装成 FutureTask。这是理解“为什么线程池能拿到返回值”的关键。

第二阶段:顶层抽象(理解设计模式)

了解这些接口,有助于你理解 Java 开发者是如何把“任务提交”和“任务执行”解耦的。

  1. java.util.concurrent.Executor

    • 地位:顶层接口,只有一个 execute 方法。
    • 意义:定义了“执行者”的最基本行为。
  2. java.util.concurrent.ExecutorService

    • 地位:扩展了 Executor
    • 看什么:定义了生命周期管理方法(shutdown, isTerminated)和异步任务提交方法(submit, invokeAll)。

第三阶段:关键零件(组装机器的螺丝)

线程池之所以灵活,是因为它把“存放任务”和“拒绝任务”的逻辑外包出去了。

  1. java.util.concurrent.BlockingQueue 及其实现类

    • 地位:决定了任务的缓冲策略。
    • 看什么
      • ArrayBlockingQueue:有界,基于数组,一把锁。
      • LinkedBlockingQueue:可选有界,基于链表,两把锁(putLock, takeLock),吞吐量通常更高。
      • SynchronousQueue:不存储元素的队列,这对理解 Executors.newCachedThreadPool 至关重要。
  2. java.util.concurrent.RejectedExecutionHandler

    • 地位:拒绝策略接口。
    • 看什么:看看 ThreadPoolExecutor 内部的四个默认实现类(AbortPolicy, CallerRunsPolicy 等)。特别是 CallerRunsPolicy,它会让主线程去执行任务,这在生产环境中既是保命符也可能是隐患。

第四阶段:异步结果(Future 模式)

如果你想知道 threadPool.submit() 拿到的 Future.get() 是如何阻塞直到任务完成的。

  1. java.util.concurrent.FutureTask

    • 地位RunnableFuture 的结合体。
    • 看什么
      • 内部的状态机(NEW -> COMPLETING -> NORMAL…)。
      • get() 方法:如何使用 LockSupport.park() 挂起调用线程。
      • run() 方法:任务执行完后,如何通过 CAS 修改状态并 finishCompletion() 唤醒等待线程。

第五阶段:特殊实现(进阶)

  1. java.util.concurrent.ScheduledThreadPoolExecutor

    • 用途:定时任务。
    • 看点:它内部使用的 DelayedWorkQueue(也就是堆结构)是如何实现延时获取任务的。
  2. java.util.concurrent.ForkJoinPool

    • 用途:Java 8 并行流 (parallelStream) 的底层。
    • 看点工作窃取算法 (Work-Stealing)。每个线程有自己的双端队列,而不是共享一个大队列。这与 ThreadPoolExecutor 完全不同。

建议上手顺序

  1. 先看 ExecutorExecutorService 接口,扫一眼方法名。
  2. 死磕 ThreadPoolExecutor,特别是 execute() 流程和 Worker 内部类。
  3. 带着问题看 LinkedBlockingQueue(数据怎么存的)和 FutureTask(结果怎么拿的)。

你现在的 Java 基础很好(懂 AQS,懂 ConcurrentHashMap),建议在看 ThreadPoolExecutor.Worker 源码时,特别留意它是如何利用 AQS 的 state 变量来实现“不可重入锁”的,这非常有意思。

Executor / ExecutorService

了解一下借口,有助于理解 Java 开发者是如何把“任务提交”和“任务执行”解耦的。

  1. java.util.concurrent.Executor

    • 地位:顶层接口,只有一个 execute 方法。
    • 意义:定义了“执行者”的最基本行为。
  2. java.util.concurrent.ExecutorService

    • 地位:扩展了 Executor
    • 看什么:定义了生命周期管理方法(shutdown, isTerminated)和异步任务提交方法(submit, invokeAll)。

这里是关于 ExecutorExecutorService 接口的解释,附带了中文版的 Excalidraw 风格插图,以帮助理解它们的内部结构和状态变化。

1. Executor 接口

核心来说,Executor 是一个用于执行已提交的 Runnable 任务的简单接口。它将任务的提交与任务如何运行的细节(包括线程使用、调度等)解耦。一个常见的实现是线程池。

下图展示了一个基本的 Executor,它包含一个任务队列和一个工作线程池。任务被提交到队列中,工作线程会拾取并执行它们。
img

  • 任务队列 (·BlockingQueue<Runnable): 一个线程安全的队列,用于保存等待执行的任务。
  • 工作线程: 一个线程池,持续从队列中提取任务并执行它们。

2. ExecutorService 接口及其状态

ExecutorService 扩展了 Executor,提供了一个更完整的异步任务执行框架。它增加了用于管理执行器生命周期(包括关闭它)的方法。ExecutorService 可以处于几种状态之一,如下图所示。
img

  • 运行中 (RUNNING): 初始状态。接受并处理新任务。
  • 关闭中 (SHUTDOWN): 通过调用 shutdown() 启动。不再接受新任务,但会处理之前提交的任务。
  • 停止中 (STOP): 通过调用 shutdownNow() 启动。不再接受新任务,并尝试停止正在执行的任务。
  • 整理中 (TIDYING): 所有任务已完成,工作线程池为空。会调用 terminated() 钩子方法。
  • 已终止 (TERMINATED): terminated() 完成后的最终状态。

3. 转换到 SHUTDOWN 状态

当调用 shutdown() 时,ExecutorService 会从 运行中 (RUNNING) 状态转换到 关闭中 (SHUTDOWN) 状态。这是一种优雅的关闭方式。如下图所示,关键的变化是不再接受新任务。
img

“提交任务”箭头上的红色“X”表示任何提交新任务的尝试都将被拒绝(通常抛出 RejectedExecutionException)。但是,已经在队列中的任务将继续被工作线程处理。

4. 最终的 TERMINATED 状态

一旦所有任务完成并且工作线程已关闭,ExecutorService 就会进入其最终状态:已终止 (TERMINATED)

img
如最终图像所示,在 已终止 (TERMINATED) 状态下,任务队列为空,并且没有活动的工作线程。服务已完成其生命周期,并且所有相关资源已释放。此时调用 isTerminated() 方法将返回 true

ThreadPoolExecutor

img

这是一张非常直观且经典的 Java ThreadPoolExecutor(线程池执行器)工作原理和内部结构流程图。它清晰地描绘了一个新任务提交到线程池后,线程池是如何根据当前状态决定如何处理这个任务的。

下面我将分模块详细解释这张图片涉及的原理流程。


一、 核心组件介绍(图中的关键名词)

在理解流程之前,我们需要先认识图中的几个核心概念:

  1. Incoming Task(新任务):图最左侧的黄色箭头,代表外部提交给线程池需要执行的一个工作单元(通常是一个实现了 RunnableCallable 接口的对象)。
  2. ThreadPoolExecutor(线程池执行器):最外层的米色大框,代表整个线程池的管理容器。
  3. ctl (state + workerCount):位于顶部。这是线程池内部一个非常关键的原子变量(AtomicInteger)。它把线程池的运行状态(如 RUNNING, SHUTDOWN 等)和当前工作线程的数量打包存储在一个整型变量里,用于高效地控制并发。
  4. Worker(工作线程):图中的绿色小人。它们是线程池中真正执行任务的线程封装。
  5. corePoolSize(核心线程数):线程池试图长期维护的线程数量标准。即使它们处于空闲状态,通常也不会被回收(除非设置了允许核心线程超时)。
  6. maximumPoolSize(最大线程数):线程池允许创建的最大线程数量极限。
  7. workQueue (BlockingQueue)(工作队列/阻塞队列):图中的灰色长条框,里面排着黄色小方块(任务)。当没有空闲核心线程时,新任务会暂时存储在这里等待被执行。它是一个先进先出(FIFO)的队列。
  8. workers (HashSet<Worker>)(工作线程集合):图中的绿色云朵区域。它是一个集合,用来持有当前所有存活的工作线程引用。
  9. RejectedExecutionHandler(拒绝策略处理器):底部的红色圆角框。当线程池忙不过来且队列也满了时,用来处理新提交任务的后备机制。

二、 任务提交流程详解(核心路径)

当一个 Incoming Task 到达时,会进入紫色的菱形判断框 Execute?。线程池会按照以下四个步骤的顺序进行判断和处理:

步骤 1:判断核心线程池是否已满

  • 条件poolSize < corePoolSize (当前线程数 < 核心线程数)
  • 动作:对应图中上方指向第一个绿色小人的箭头。如果当前工作线程数量少于设定的核心线程数,线程池会立即创建一个新的 Worker(核心线程)来执行这个新任务。即使此时有其他空闲的核心线程,也会优先创建新线程,直到达到核心线程数

步骤 2:判断工作队列是否已满

  • 条件poolSize >= corePoolSizequeue not full (当前线程数 ≥ 核心线程数,并且工作队列没满)
  • 动作:对应图中指向灰色 workQueue 的箭头。如果核心线程数已达到标准,线程池不会立即创建新线程,而是尝试把新任务放入工作队列(workQueue)中排队,等待现有的工作线程空闲下来去取它。

步骤 3:判断线程池是否达到最大极限

  • 条件workQueue fullpoolSize < maximumPoolSize (工作队列满了,并且当前线程数 < 最大线程数)
  • 动作:对应图中指向第二个绿色小人的箭头。这属于应对突发流量的场景。如果队列满了,说明现有的核心线程处理不过来了,但总线程数还没达到极限。此时,线程池会创建一个新的 Worker(非核心线程)来协助处理这个新任务
    • 注意:这些非核心线程在空闲一段时间后(keepAliveTime),如果拿不到新任务,通常会被回收.

步骤 4:执行拒绝策略

  • 条件workQueue fullpoolSize >= maximumPoolSize (工作队列满了,并且当前线程数 ≥ 最大线程数)
  • 动作:对应图中指向红色框 RejectedExecutionHandler 的箭头。这是最糟糕的情况,线程池已经超负荷运作(队列满了,线程数也达到极限)。线程池无法再接受新任务,只能把任务交给拒绝策略处理器来处理。常见的处理方式包括抛出异常、丢弃任务、由提交任务的线程自己执行等。

三、 工作线程的运行循环(Worker Loop)

图中绿色云朵区域(workers 集合)展示了工作线程在创建之后的行为模式:

  1. Pull Tasks(拉取任务):云朵里的小人有向上的箭头指向 workQueue。这表示工作线程在执行完手头的任务后,不会立即销毁,而是会尝试从工作队列中拉取(take/poll)下一个等待的任务来执行。这是一个循环不断的过程。
  2. Completed(完成任务):右侧巨大的绿色箭头。表示线程成功执行完任务。执行完毕后,它又会回到“拉取任务”的状态。

总结

这张图完美地诠释了 Java 线程池的三级缓冲策略

  1. 第一级:优先使用核心线程
  2. 第二级:核心线程忙,则使用队列缓冲。
  3. 第三级:队列满,则临时创建非核心线程救急。

只有当这三级都无法处理时,才会触发拒绝策略。这种设计既保证了系统资源的合理利用(避免创建过多线程),又具备了一定的应对突发高并发的能力。

代码实践

创建线程池的方式有如下常见的 4 种方式。但是我们只推荐使用一种

1 . 使用 Exeutors 工具类

这属于 JDK 提供的工厂类,预设了几种配置

  • newFixedThreadPool (int nThreads)
    • 创建固定大小的线程池
    • 核心线程数 = 最大线程数
    • 使用无界队列 LinkedBlockingQueue 作为同步队列,若任务提交速度 > 任务处理速度,队列会无限增长,导致 OOM.
  • newCachedThreadPool ()
    • 描述:执行大量、短期的异步任务。
    • 特点:核心线程数为 0,最大线程数为 Integer.MAX_VALUE。如果有空闲线程就复用,没有就创建新线程。
    • 隐患:允许创建的线程数量几乎无限。如果任务过多,会创建大量线程,导致 CPU 100% 或 OOM
  • newSingleThreadExecutor ()
    • 描述:单线程池。
    • 特点:只有一个核心线程,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
    • 隐患:同样使用无界队列 LinkedBlockingQueue,存在 OOM 风险。
  • newScheduledThreadPool (int corePoolSize)
    • 支持定时和周期性任务执行的线程池
    • 替代 Timer
  • newWorkStealingPool(int parallelism)
    • 这是 Java 7 引入的,专门为了解决分治算法(Divide-and-Conquer)或大数据并行计算而设计。
    • Work-Stealing(工作窃取)算法:每个线程都有自己的双端队列。如果某个线程干完活了,它会去“偷”其他忙碌线程队列末尾的任务来执行,极大提高了 CPU 利用率。
    • 场景:递归任务、大量计算型任务。
    • 应用:Java 8 的 Stream.parallel() 底层默认使用的就是公共的 ForkJoinPool
特性 newFixedThreadPool newWorkStealingPool
底层实现 ThreadPoolExecutor ForkJoinPool
队列结构 一个共享的 LinkedBlockingQueue 每个线程一个独立的 Deque (双端队列)
任务分配 抢占式 (锁竞争大) 工作窃取 (锁竞争小,自动负载均衡)
线程类型 User Thread (用户线程)


主程序会等待线程池跑完才退出。
Daemon Thread (守护线程)


主程序一旦结束,线程池立马被杀掉。
适用场景 任务耗时比较均匀,且需要保证任务必须执行完。 任务耗时差异大(有的快有的慢),希望利用高并发榨干 CPU。

2. 使用 ThreadPoolExecutor

这是最原始、最灵活、也是大厂规范(如阿里巴巴 Java 开发手册)强制要求的使用方式。通过手动 new 这个类,你可以明确掌控线程池的每一个行为。

构造函数与 7 大核心参数

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(
int corePoolSize, // 1. 核心线程数(正式员工)
int maximumPoolSize, // 2. 最大线程数(正式员工 + 临时工)
long keepAliveTime, // 3. 临时工空闲存活时间
TimeUnit unit, // 4. 时间单位
BlockingQueue<Runnable> workQueue, // 5. 任务阻塞队列(候客区)
ThreadFactory threadFactory, // 6. 线程工厂(给线程起名,便于排查问题)
RejectedExecutionHandler handler // 7. 拒绝策略(人满且队满时的处理方案)
)

关键:任务提交的执行流程

理解这个流程是调优线程池的基础:

  1. Core 满了吗? 没满 -> 创建核心线程执行。
  2. Queue 满了吗? 没满 -> 放入队列等待。
  3. Max 满了吗? 没满 -> 创建非核心线程(临时工)执行。
  4. 都满了 -> 触发 Reject (拒绝策略)

常见的拒绝策略

  • AbortPolicy (默认):直接抛出异常 RejectedExecutionException
  • CallerRunsPolicy:调用者所在线程自己去执行该任务(“谁派活谁自己干”),这是一种简单的反馈控制机制,可以减缓提交速度。
  • DiscardPolicy:直接丢弃任务,不予处理。
  • DiscardOldestPolicy:丢弃队列中最老的一个任务,尝试再次提交当前任务。

3. Spring 框架中的 ThreadPoolTaskExecutor

  • 本质:它是 Spring 对 JDK ThreadPoolExecutor 的一层包装(Wrapper)。
  • 优势
    • 可以通过 application.yaml 或 Java Config 配置 Bean。
    • 支持 Spring 的生命周期管理(优雅关闭)。
    • 配合 @Async 注解实现方法级的异步调用。

配置示例:

1
2
3
4
5
6
7
8
9
10
11
12
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("async-service-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}

源码解读

ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void execute(Runnable command) {  
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

第一步:尝试使用核心线程 (Step 1)

1
2
3
4
5
6
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 判断当前工作线程数是否小于核心线程数
if (addWorker(command, true)) // 尝试启动一个新线程作为核心线程,并立即执行该任务
return; // 如果成功,方法结束
c = ctl.get(); // 如果失败(可能是并发导致线程数变了,或者线程池状态变了),重新获取状态
}
  • 逻辑:如果当前线程数少于核心线程数(corePoolSize),线程池会优先创建一个新线程来处理这个任务,而不是放入队列。
  • addWorker(command, true):第二个参数 true 表示这次创建是受限于 corePoolSize 的(即创建的是核心线程)。

第二步:尝试放入队列 & 二次检查 (Step 2)

如果核心线程已满,代码进入这一块:

1
2
3
4
5
6
7
8
9
10
11
if (isRunning(c) && workQueue.offer(command)) { // 1. 线程池是运行状态 且 2. 尝试把任务放入队列成功
int recheck = ctl.get(); // 获取最新的状态,进行“双重检查”

// 检查点 A:线程池是否还在运行?
if (! isRunning(recheck) && remove(command)) // 如果线程池被关闭了(不再运行),且能从队列移除刚才放进去的任务
reject(command); // 执行拒绝策略

// 检查点 B:当前依然有存活线程吗?
else if (workerCountOf(recheck) == 0) // 如果线程池虽然在运行,但工作线程数为 0(例如核心线程被允许超时回收了,或者意外死掉了)
addWorker(null, false); // 创建一个非核心线程(空任务),让它去队列里取任务执行
}
  • 逻辑:核心线程满了,尝试把任务丢进阻塞队列(workQueue)。
  • 为什么要 Double-Check (二次检查)?
    • 在“判断运行状态”和“入队”之间,或者“入队”之后,线程池的状态可能发生了变化(比如被别的线程调用了 shutdown())。
    • 情况 A:如果不检查,线程池关闭了但任务还在队列里,这个任务可能永远不会被执行。所以需要回滚(移除任务)并拒绝。
    • 情况 B:如果核心线程数允许为 0,或者所有线程在入队的一瞬间都死掉了(发生异常或超时),那么队列里有任务却没人干活。所以必须创建一个新线程(不带初始任务,专门去捞队列)来激活处理流程。

Remove 调用的核心目的: 为了回滚(Rollback)刚才的入队操作

  • !isRunning:确实关了?
  • remove(command):尝试从队列里删掉这个任务。
    • 如果返回 true(删除成功):说明任务确实还在队列里,我现在把它删了,然后执行 reject
    • 如果返回 false(删除失败):说明任务已经不在队列里了
      • 可能原因:虽然线程池关了,但刚才那一瞬间,某个饥饿的工作线程(Worker)眼疾手快,已经从队列里把这个任务取走开始执行了。
      • 结果:既然已经在跑了,就不需要拒绝了,也不需要回滚。

第三步:尝试使用非核心线程 (Step 3)

如果队列也满了(offer 返回 false),代码进入这一块:

1
2
else if (!addWorker(command, false)) // 尝试启动一个新线程(非核心线程)
reject(command); // 如果失败(说明达到 maximumPoolSize 或线程池已关闭),则执行拒绝策略
  • 逻辑:队列满了,说明负载很高。此时尝试开启“急救”模式,创建非核心线程来处理任务。
  • addWorker(command, false):第二个参数 false 表示这次创建是受限于 maximumPoolSize 的。
  • 拒绝:如果连最大线程数都到了,或者线程池关闭了,就彻底没办法了,调用 reject(command) 抛出异常或执行自定义的拒绝逻辑(如 AbortPolicy)。

总结流程图

为了方便理解,整个逻辑可以概括为以下流程:

  1. 小于 Core? -> 创建核心线程执行。
  2. 否则,队列没满? -> 扔进队列(并检查是否需要补线程或回滚)。
  3. 否则,小于 Max? -> 创建非核心线程执行。
  4. 否则 -> 拒绝 (Reject)。

为什么这段代码写得这么复杂?

这段代码是并发编程的经典案例,它极度追求无锁(Lock-free)或低锁的高性能设计。

  • 它大量使用 ctl (AtomicInteger) 来避免使用重量级的锁。
  • 严格的 if-else 顺序和 recheck 逻辑是为了处理多线程环境下瞬息万变的状态(例如:刚决定入队,线程池就关了;刚决定入队,所有消费线程就挂了)。

AbstractExecutorService

AbstractExecutorService 是 Java 并发包(java.util.concurrent)中一个非常经典的类。它是 ExecutorService 接口的骨架实现(Skeletal Implementation)

一、 主要职责

AbstractExecutorService 位于接口 ExecutorService 和具体实现类(如 ThreadPoolExecutor, ForkJoinPool)之间。

它的核心职责是:将任务的提交(submit)与任务的执行(execute)解耦,并统一处理任务的返回值(Future)。

具体来说,它实现了接口中所有“复杂”的方法,只留下最核心的 execute(Runnable command) 供子类实现。

  1. 实现 submit 系列方法

    • 用户调用 submit(Runnable)submit(Callable)
    • 它负责将这些不同类型的任务包装成统一的 FutureTask(即 RunnableFuture)。
    • 然后调用 execute(Runnable) 将包装好的任务交给具体的线程池去跑。
      img
  2. 实现 invokeAll / invokeAny

    • 实现了批量执行任务的逻辑。它负责遍历任务列表、提交任务、等待任务完成、收集结果或处理超时。
      img

二、 值得学习的点(源码赏析)

1. 典型的模板方法模式(Template Method Pattern)

这是它最大的设计亮点。它假设子类已经有了“执行一个 Runnable”的能力(即 execute 方法),以此为基础构建出了更高级的功能。

  • 变与不变
    • 不变:将任务包装成 Future 并返回给用户的流程是不变的。
    • :任务具体怎么被线程调度、是用线程池还是单线程,这是变化的(由 ThreadPoolExecutor 等子类通过实现 execute 方法决定)。
1
2
3
4
5
6
7
8
9
10
// AbstractExecutorService 源码片段
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 1. 统一包装:将 Runnable 包装成 RunnableFuture (通常是 FutureTask)
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 2. 核心执行:调用抽象的 execute(),具体逻辑由子类决定
execute(ftask);
// 3. 返回凭证:将 Future 返回给调用者
return ftask;
}
2. 工厂方法模式的应用 (newTaskFor)

你可能会问:为什么不直接 new FutureTask(...),而是调用 newTaskFor(...)

这是为了提供扩展性newTaskFor 是一个 protected 方法。如果通过继承 AbstractExecutorService 自定义线程池,你可以重写这个方法,返回你自己实现的 RunnableFuture(例如,带有监控功能的 Future,或者能够处理特定异常的 Future),而不需要修改 submit 的逻辑。

1
2
3
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
3. 适配器思想(统一 Callable 和 Runnable)

Java 中 Runnable 没有返回值,Callable 有返回值。AbstractExecutorService 通过 newTaskFor 抹平了这种差异。 无论是哪种任务,最终都被适配成了 RunnableFuture,这使得底层的执行器(如 ThreadPoolExecutor)只需要关注怎么执行 Runnable 即可,不需要关心任务是否有返回值。

4. 严谨的异常处理与资源清理 (invokeAll 的实现)

invokeAll 方法非常值得一读,它展示了在并发环境下如何编写健壮的代码。

如果不看源码,我们很容易写出简单的 for 循环提交。但源码考虑了:“如果在批量提交过程中,第 5 个任务抛出异常了,前 4 个已经提交的任务怎么办?”

源码逻辑简述: 它使用了一个 finally 块来保证原子性:要么所有任务都成功等待完成,要么一旦出事,取消所有已提交但未完成的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 伪代码逻辑展示 invokeAll 的健壮性
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
List<Future<T>> futures = new ArrayList<>();
boolean done = false;
try {
// 1. 遍历提交所有任务
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
// 2. 遍历等待所有任务完成 (f.get)
for (Future<T> f : futures) {
if (!f.isDone()) {
try { f.get(); } catch (Exception ignore) {}
}
}
done = true; // 标记全部成功
return futures;
} finally {
// 3. 兜底策略:如果上面 try 块中途抛出异常(比如内存溢出、被中断),
// done 就会是 false。此时必须取消所有还在运行的任务,防止资源泄漏。
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}

总结

AbstractExecutorService 并没有涉及复杂的线程调度算法(那是 ThreadPoolExecutor 的事),它主要教给了我们:

  1. 如何设计中间层:用模板模式屏蔽接口的复杂性。
  2. 如何统一模型:将不同类型的输入(Runnable/Callable)转化为统一的内部对象(RunnableFuture)。
  3. 如何写健壮的并发代码:在批量操作中,利用 finally 块确保“要么全成功,要么全清理”,防止僵尸任务。

展示了具体的子类(如 ThreadPoolExecutor)如何实现抽象的 execute 方法,真正地调度和运行任务。

img

BlockingQueue

img

这幅 Excalidraw 风格的图片生动地展示了 Java java.util.concurrent 包中四种最核心 BlockingQueue 实现的底层数据结构并发控制模型

图片分为四个板块,每个板块对应一种队列。以下是逐一详解:


1. 左上角:ArrayBlockingQueue (基于数组的单锁模型)

  • 视觉核心:一个圆环形的结构(Ring Buffer / 循环数组)。
  • 关键元素
    • 一把大红锁 (One Big Lock):这是最核心的特征。你可以看到生产者(Producer)和消费者(Consumer)手里拿的钥匙都需要去开中间这把大锁。
    • 互斥竞争:这意味着,无论放数据还是取数据,都必须争抢同一把锁。虽然简单,但在高并发下,两边会互相排斥,导致性能瓶颈。
    • putIndex / takeIndex:圆环上的箭头代表读写指针,它们在固定大小的数组上循环移动。
  • 一句话总结“死板但省心”。固定容量,一把锁管所有,没有对象创建开销(GC友好),但并发度低。

2. 右上角:LinkedBlockingQueue (基于链表的双锁模型)

  • 视觉核心:一条长长的链条(Linked List),中间断开,两头各管各的。
  • 关键元素
    • 两把不同的锁 (Split Locks)
      • 左边是蓝色的 takeLock(控制头部 Head,给消费者用)。
      • 右边是红色的 putLock(控制尾部 Tail,给生产者用)。
    • 独立操作:图片展示了消费者在拆队头的盒子,生产者在装队尾的盒子,两人互不干扰
    • 虚线框:代表中间的节点。只要队列既不空也不满,两头可以同时干活,并发性能极高。
  • 一句话总结“各司其职”。读写分离,并发度高,但每个数据都要包装成 Node 对象,内存占用和 GC 压力略大。

3. 左下角:SynchronousQueue (同步移交/零容量)

  • 视觉核心:中间是的,没有盒子,只有一个虚线的圈(Rendezvous Point / 汇合点)。
  • 关键元素
    • 握手 (Handshake):两只手紧紧握在一起。这代表数据不经过中间存储,直接从生产者的线程栈拷贝到消费者的线程栈。
    • 思考气泡:”Must wait for partner!”(必须等待伙伴)。
    • 阻塞含义:如果生产者来了(左边拿着 Data),但消费者没来(右边空手),生产者必须在那个虚线圈里等着,不能走,直到消费者出现接走数据。
  • 一句话总结“不见不散”。容量为 0,不做存储只做传递。适合线程池(如 CachedThreadPool)这种需要快速扩容、立即响应的场景。

4. 右下角:DelayQueue (基于堆的优先级模型)

  • 视觉核心:一个树形结构(Binary Heap / 二叉堆),类似于金字塔。
  • 关键元素
    • 时钟 (Clocks)
      • 塔尖(堆顶)的时钟显示 0s(Ready,时间到了)。
      • 下面的节点显示 5s, 10s(Future,时间未到)。
    • 可见性:消费者只能看到塔尖那个时间到了的数据。下面的数据虽然在队里,但拿不出来。
    • 睡觉的小人 (Leader):这是 DelayQueue 的核心优化——Leader-Follower 模式
      • 为了避免所有消费者线程都频繁醒来检查时间(惊群效应),队列会让一个线程(Leader)睡特定的时间(比如睡到塔尖元素过期),其他线程则无限期等待。
  • 一句话总结“定时引爆”。内部基于 PriorityQueue 排序,只有倒计时结束的元素才能出队。常用于缓存过期清理、任务超时处理。

总结对比

图示 队列类型 核心隐喻 锁的策略 适用场景
左上 ArrayBQ 圆环 独占锁 (争抢激烈) 只有少量线程,或者追求内存极致稳定时。
右上 LinkedBQ 链条 分拆锁 (各管各的) 通用的高并发生产者-消费者模型(最常用)。
左下 SyncQueue 握手 无锁/CAS (直接传递) 吞吐量极高,但需要消费者处理速度跟得上生产者。
右下 DelayQueue 金字塔 一把锁 + 优先级 延时任务,如订单超时取消、缓存失效。

面试题 / Checklist

  • ThreadPoolExecutorctl 为什么要把状态与 workerCount 打包?
  • execute() 任务提交流程的三段逻辑分别是什么?
  • corePoolSizemaximumPoolSizeworkQueue 三者如何联动决定吞吐与延迟?
  • 为什么不推荐直接用 Executors.newFixedThreadPool/newCachedThreadPool(典型风险点)?
  • Worker 为什么要继承 AQS?它在中断/空闲判断里起什么作用?
  • shutdown() vs shutdownNow() 的差异与正确关闭模板?
  • FutureTask.get() 为何会阻塞?内部靠什么机制唤醒等待线程?
  • SynchronousQueue 的语义是什么?为什么适合 cached thread pool?