
阿里一面:说说看线程池的执行流程和原理?线程池的拒绝策略有哪些?如何确定线程池的核心线程数?如何优雅的关闭线程池?
面试概览:
-
请说说看什么是线程池?以及为什么使用线程池开发而非单纯的多线程开发?
-
请说说看线程池的实现原理和执行流程?
-
如果让你设计一个线程池,那么这个线程池应该有哪些核心参数?
-
你刚刚有提到线程池的拒绝策略,能说说看具体有哪几种拒绝策略以及它们的适用场景吗?
-
在实际开发过程中你会如何确定线程池的核心线程数?
-
如果关闭线程池的时候任务没执行完,如何优雅地关闭线程池?
-
请简单的实现一个线程池?
面试官:请说说看什么是线程池?以及为什么使用线程池开发而非单纯的多线程开发?
线程池的定义
线程池是一种线程复用技术,它维护着多个线程等待监督管理者分配可并发执行的任务。处理过程中,将任务添加到队列,然后在线程创建后自动启动这些任务。
使用线程池而非单纯多线程开发的原因
- 降低资源消耗 :
* 线程池通过重复利用已创建的线程来执行任务,避免了线程的频繁创建和销毁所带来的开销。相比之下,单纯的多线程开发在每次需要执行任务时都会创建新的线程,这会导致资源的浪费和性能的下降。
- 提高响应速度 :
* 当任务到达时,线程池可以立即分配一个空闲线程来执行任务,而无需等待线程的创建。这提高了系统的响应速度,尤其是在处理大量并发请求时。
- 提高线程的可管理性 :
* 线程是稀缺资源,如果无限制地创建线程,不仅会消耗系统资源,还会降低系统的稳定性。线程池提供了一种统一分配、调优和监控线程的方式,使得线程的管理更加简单和高效。
- 避免大量线程的创建导致的内存问题 :
* 在多线程开发中,如果短时间内创建大量线程,可能会使内存到达极限,并出现“OutOfMemory”错误。而线程池通过限制最大线程数来避免这种情况的发生,当线程数达到最大值时,新的任务会被放入队列中等待执行。
面试官:请说说看线程池的实现原理和执行流程?
一、线程池的主要组件
-
工作线程 :在线程池中创建的线程,等待并处理任务。
-
任务队列 :待处理的任务队列,当有新任务时,任务会被放入队列中等待执行。
-
线程池管理器 :管理线程池中线程的生命周期,处理任务调度、线程创建和销毁等。
二、线程池的实现原理

-
线程复用 :线程池通过复用线程来避免频繁创建和销毁线程的开销。当有新的任务到来时,线程池会判断当前是否有空闲线程,如果有,则直接将任务分配给空闲线程执行;如果没有,则根据线程池的配置策略(如核心线程数、最大线程数等)来决定是否创建新的线程。
-
任务调度 :线程池中的任务调度器负责将任务从任务队列中取出,并分配给空闲的工作线程执行。任务调度器通常会采用先进先出(FIFO)的策略来确保任务的顺序执行。
-
线程管理 :线程池管理器负责线程的创建、销毁、状态监控以及任务调度等工作。它会根据线程池的配置参数(如核心线程数、最大线程数、空闲线程存活时间等)来动态调整线程池的规模,以确保系统的稳定性和性能。
-
任务执行 :工作线程从任务队列中取出任务后,会执行相应的任务逻辑。任务执行完毕后,工作线程会回到线程池中等待下一个任务的到来。
-
线程回收 :当线程池中的工作线程空闲时间超过设定的阈值时,线程池管理器会将其回收,以减少系统资源的占用。同时,当系统不再需要线程池时,可以调用线程池的关闭方法来释放所有资源。
三、线程池的状态转换
线程池的状态转换是其实现原理中的重要部分。线程池通常有以下五种状态:

-
RUNNING :能接收新提交的任务,也能处理阻塞队列中的任务。
-
SHUTDOWN :关闭状态,不再接收新提交的任务,但可以继续处理阻塞队列中已保存的任务。
-
STOP :不接受任务,也不处理阻塞队列中的任务,会中断正在执行的任务。
-
TIDYING :所有的任务已终止,工作线程数量为0,线程池将转化到TIDYING状态,即将要执行terminated()钩子方法。
-
TERMINATED :terminated()方法已经执行结束,线程池已完全终止。
四、线程池的执行流程
线程池的执行流程主要涉及核心线程、工作队列、非核心线程以及拒绝策略等关键要素。以下是线程池执行流程的详细解释:

- 任务提交 :
* 当出现一个新的线程任务时,线程池会首先尝试在线程池中分配一个空闲线程来执行这个任务。
- 核心线程判断 :
* 如果线程池中没有空闲线程,线程池会判断当前“存活线程数”是否小于核心线程数(corePoolSize)。
* 如果“存活线程数”小于核心线程数,线程池会创建一个新的线程来处理新任务。
- 工作队列判断 :
* 如果“存活线程数”等于核心线程数,线程池会判断工作队列是否已满。
* 如果工作队列未满(即还有位置可以存放任务),线程池会将新任务放入工作队列中等待,等线程池中出现空闲线程时,再按照“先进先出”的规则分配执行。
- 非核心线程判断 :
* 如果工作队列已满,线程池会判断当前存活线程数是否已经达到最大线程数(maximumPoolSize)。
* 如果“存活线程数”小于“最大线程数”,线程池会创建一个新线程来执行新任务。
- 拒绝策略 :
* AbortPolicy :直接抛出RejectedExecutionException异常,阻止系统正常运行(这是默认策略)。
* CallerRunsPolicy :用调用者所在线程来执行任务,不会抛出异常。
* DiscardPolicy :不处理该任务,直接丢弃掉。
* DiscardOldestPolicy :丢弃最老的未处理的任务请求。
* RejectExecutionHandler :自定义拒绝策略,通过实现RejectedExecutionHandler接口来定义。
* 如果“存活线程数”已经达到或超过最大线程数,且工作队列也已满,线程池会采用拒绝策略来处理新任务。常见的拒绝策略包括:
- 任务执行 :
* 一旦线程池中有空闲线程或新线程被创建,线程池就会从工作队列中取出任务,并调度给空闲线程执行。
- 线程回收 :
* 对于非核心线程,如果它们在指定的存活时间(keepAliveTime)内没有执行任务,那么这些线程将会被回收。
* 当线程池处于SHUTDOWN状态时,它会停止接收新任务,但会继续处理已添加的任务。此时,空闲线程会被中断,线程池会逐步回收这些线程。
* 当所有任务都已终止,且线程池中的线程数量已经减少到0时,线程池会进入TIDYING状态,并执行钩子函数terminated()。之后,线程池会进入TERMINATED状态,表示线程池已经彻底关闭。
面试官:如果让你设计一个线程池,那么这个线程池应该有哪些核心参数?
以下是一些线程池设计中通常需要考虑的核心参数:
- 核心线程数(Core Pool Size) :
* 这是线程池中始终保持的线程数量,即使某些线程处于空闲状态。核心线程数通常根据系统的需求和可用资源来确定。
- 最大线程数(Maximum Pool Size) :
* 这是线程池中允许的最大线程数量。当任务队列满时,线程池会尝试创建新的线程,直到达到最大线程数。
- 任务队列容量(Work Queue Capacity) :
* 任务队列用于存储等待执行的任务。队列的容量决定了在达到最大线程数之前,线程池可以容纳多少待处理的任务。
- 线程存活时间(Keep-Alive Time) :
* 当线程数量超过核心线程数时,多余的空闲线程在终止前等待新任务的最长时间。这个参数有助于在负载降低时释放资源。
- 时间单位(Time Unit) :
* 线程存活时间的时间单位,如毫秒、秒等。
- 线程工厂(Thread Factory) :
* 用于创建新线程的工厂类。通过自定义线程工厂,可以设置线程的优先级、是否为守护线程、线程名称等属性。
- 拒绝策略(Rejected Execution Handler) :
* 当线程池无法处理新任务时(因为线程数已达到最大且任务队列已满),将执行拒绝策略。常见的拒绝策略包括抛出异常、运行拒绝任务的回调、丢弃最旧的任务或丢弃最新的任务等。
- 是否允许核心线程超时(Allow Core Threads Time-Out) :
* 一个布尔值,指定核心线程是否也受线程存活时间的影响。如果设置为true,则核心线程在空闲时间超过线程存活时间后也会被终止。
- 任务优先级队列(Optional: Priority Work Queue) :
* 如果任务有不同的优先级,可以使用优先级队列来存储任务。这样,线程池会按照任务的优先级顺序来执行任务。
面试官:你刚刚有提到线程池的拒绝策略,能说说看具体有哪几种拒绝策略以及它们的适用场景吗?
首先,线程池的拒绝策略用于在任务队列已满且线程池无法处理新任务时,决定如何拒绝新的任务请求。
以下是几种Java线程池中常见的拒绝策略及其适用场景:
一、AbortPolicy(默认策略)
-
作用 :当任务无法被线程池执行时,会抛出一个
RejectedExecutionException异常。 -
适用场景 :适用于对任务丢失敏感的场景,当线程池无法接受新任务时,希望立即知道并处理该异常。这种策略适合于那些不能容忍任务被丢弃或延迟执行的业务场景,因为它会立即通知调用者任务被拒绝,从而可以采取相应的措施,比如增加线程池大小、优化任务执行效率或者通知用户等待。
二、CallerRunsPolicy
-
作用 :当任务无法被线程池执行时,会直接在调用者线程中运行这个任务。如果调用者线程正在执行一个任务,则会创建一个新线程来执行被拒绝的任务。
-
适用场景 :适用于希望调用者自己处理被拒绝的任务的场景,通常是由调用者自身的线程来执行被拒绝的任务。
三、DiscardPolicy
-
作用 :当任务无法被线程池执行时,任务将被丢弃,不抛出异常,也不执行任务。
-
适用场景 :适用于对任务丢失不敏感的场景。例如,如果任务是一些可以重复执行或不太重要的操作,那么可以使用这种策略。但是需要注意的是,使用这种策略可能会导致无法执行的任务被丢弃而不会得到通知,因此需要谨慎选择。
四、DiscardOldestPolicy
-
作用 :当任务无法被线程池执行时,线程池会丢弃队列中最旧的任务(即等待时间最长的任务),然后尝试再次提交当前任务。
-
适用场景 :适用于对新任务优先级较高的场景。例如,如果新任务比旧任务更重要或更紧急,那么可以使用这种策略来确保新任务能够被执行。但是需要注意的是,这种策略也可能导致一些旧任务被丢弃而不会得到执行。
除了以上四种常见的拒绝策略外,还可以根据实际需求自定义拒绝策略。自定义拒绝策略可以通过实现 RejectedExecutionHandler
接口并覆盖其 rejectedExecution
方法来实现。在自定义拒绝策略中,可以根据具体的业务逻辑来处理被拒绝的任务,比如记录日志、抛出自定义异常、执行备选方案等。
面试官:在实际开发过程中你会 ** ** 如何确定线程池的核心线程数?
以下是一些常见的线程池核心线程数的考虑因素和确定方法:
考虑因素
- 任务特性 :
* CPU密集型任务 :这类任务主要耗费CPU资源,因此核心线程数通常设置为CPU可用核心的数量,以使CPU的利用率最大化。
* IO密集型任务 :这类任务大部分时间都在等待IO操作完成,因此核心线程数可以设置得更高,以充分利用CPU在等待IO操作时的空闲时间。通常,线程数会多于CPU核心数。
* 混合型任务 :这类任务既有计算又有IO操作,需要综合考虑两种任务的比例来决定核心线程数。
- 系统资源 :
* CPU核心数 :系统的CPU核心数是限制线程数的重要因素。过多的线程可能导致CPU资源过度竞争,降低系统性能。
* 内存容量 :内存资源也是需要考虑的因素。过多的线程会占用更多的内存,可能导致内存不足的问题。
- 应用性能需求 :
* 吞吐量 :如果应用需要处理大量的并发任务,那么可能需要更多的线程来提高吞吐量。
* 响应速度 :对于需要快速响应的应用,合理的线程数可以确保任务被及时处理。
- 任务执行时间和到达频率 :
* 如果任务执行时间较长,那么可能需要较少的线程数,因为每个线程会占用较长时间。
* 如果任务到达频率较高,那么可能需要更多的线程来确保任务能够及时被处理。
确定方法
- 理论预估 :
* 对于CPU密集型任务,可以使用公式“核心线程数 = CPU核心数”或“核心线程数 = CPU核心数 + 1”进行预估。
* 对于IO密集型任务,可以使用公式“核心线程数 = CPU核心数 / (1 - 阻塞系数)”或简单地设置为CPU核心数的两倍或多一点进行预估。
* 对于混合型任务,需要根据具体情况综合考虑两种任务的比例,并使用相应的公式进行预估。
- 压测验证 :
* 通过负载测试和压力测试来观察系统的性能表现,如吞吐量、响应时间等。
* 根据测试结果调整核心线程数,直到找到最佳的性能平衡点。
- 监控动态调整 :
* 在实际应用中,可以使用性能监控工具来监控系统的CPU使用率、内存使用情况和响应时间等指标。
* 根据监控结果动态调整核心线程数,以适应负载的变化。
面试官:如果关闭线程池的时候任务没执行完,如何优雅地关闭线程池?
一、使用 shutdown() 方法
shutdown() 方法是 ExecutorService 接口提供的一个用于关闭线程池的方法。它会启动线程池的关闭序列,执行以下操作:
-
设置线程池的状态为
SHUTDOWN,此时线程池不再接受新任务。 -
中断空闲的线程,但不会中断正在执行的任务。
-
等待已提交的任务执行完成。
调用 shutdown() 方法后,线程池会等待所有已提交的任务执行完毕,然后再关闭。如果需要等待线程池关闭完成,可以调用 awaitTermination() 方法。
二、使用 shutdownNow() 方法
shutdownNow() 方法也是 ExecutorService 接口提供的一个用于关闭线程池的方法。与 shutdown()
方法不同,它会尝试立即关闭线程池,执行以下操作:
-
设置线程池的状态为
STOP。 -
尝试停止所有正在执行的任务,通过中断线程的方式实现。
-
返回等待执行的任务列表,这些任务在关闭时还未来得及执行。
-
清空任务队列。
需要注意的是, shutdownNow()
方法并不保证能停止所有正在执行的任务,因为中断线程只是设置了一个中断状态,具体能否立即生效还取决于线程内部的实现。因此,在调用 shutdownNow() 方法后,可能需要结合 awaitTermination() 方法等待一段时间,以确保尽可能多的任务能够停止。
三、优雅关闭线程池的步骤
-
调用
shutdown()方法 :首先调用shutdown()方法,拒绝新提交的任务,并等待已提交的任务执行完成。 -
调用
awaitTermination()方法 :使用awaitTermination()方法等待线程池中的所有任务都完成执行。该方法会阻塞当前线程,直到所有任务都完成执行、超时发生或当前线程被中断。 -
处理未完成的任务 :如果在超时时间内所有任务没有完成,可以调用
shutdownNow()方法强制停止未执行完的任务。但需要注意,这可能会导致任务数据丢失或其他问题。 -
捕获和处理异常 :在关闭线程池的过程中,需要注意捕获和处理可能出现的异常,以确保程序的健壮性。
以下是一个优雅关闭线程池的示例代码:
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 提交任务给线程池 for (int i = 0; i < 100; i++) { executorService.submit(() -> { // 模拟任务执行 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println(“任务执行完成”); }); }
// 关闭线程池 try { // 拒绝新任务,等待已提交任务执行完成 executorService.shutdown(); // 等待一段时间,确保所有任务都执行完成 if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { // 如果超时仍未完成,则强制关闭线程池 List
在这个示例中,我们首先使用 shutdown() 方法拒绝新任务,并等待已提交的任务执行完成。然后,使用 awaitTermination() 方法等待一段时间,以确保所有任务都执行完成。如果超时仍未完成,则使用 shutdownNow()
方法强制关闭线程池,并处理未执行的任务。在关闭线程池的过程中,我们还捕获了 InterruptedException 异常,以确保程序的健壮性。
面试官:请简单的实现一个线程池?
实现一个线程池需要处理线程管理、任务队列、任务调度、线程生命周期控制等多个方面。
以下是一个基本的Java线程池实现示例:
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.List; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicInteger;
public class SimpleThreadPool { private final BlockingQueue<Runnable> taskQueue; private final List<Worker> workers; private final AtomicInteger workerCount; private final int corePoolSize; private final int maximumPoolSize; private volatile boolean isShutdown;
// Worker class that extends Thread private class Worker extends Thread { private Runnable currentTask;
public Worker() { workerCount.incrementAndGet(); }
public void runTask(Runnable task) { currentTask = task; currentTask.run(); currentTask = null; }
@Override public void run() { while (!isShutdown || !taskQueue.isEmpty()) { try { Runnable task = taskQueue.take(); runTask(task); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // Restore interrupt status } } workerCount.decrementAndGet(); } }
public SimpleThreadPool(int corePoolSize, int maximumPoolSize) { this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.taskQueue = new LinkedBlockingQueue<>(); this.workers = new ArrayList<>(); this.workerCount = new AtomicInteger(0); this.isShutdown = false;
// Pre-start core number of workers for (int i = 0; i < corePoolSize; i++) { Worker worker = new Worker(); workers.add(worker); worker.start(); } }
public void execute(Runnable task) { if (isShutdown) { throw new IllegalStateException("ThreadPool is shut down"); }
if (workerCount.get() < maximumPoolSize) { // If less than maximum pool size, start a new worker Worker worker = new Worker(); workers.add(worker); worker.start(); worker.runTask(task); // Run the task immediately if the worker is just started } else { // Otherwise, put the task in the queue try { taskQueue.put(task); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // Restore interrupt status throw new RuntimeException(e); } } }
public void shutdown() { isShutdown = true; // Optionally, interrupt all workers to terminate them immediately // for (Worker worker : workers) { // worker.interrupt(); // }
// Alternatively, wait for all tasks to complete by not interrupting workers // and letting them terminate naturally as the queue empties }
// Additional methods like shutdownNow(), awaitTermination(), etc., can be added for a more complete implementation
public static void main(String[] args) throws InterruptedException { SimpleThreadPool pool = new SimpleThreadPool(2, 4);
for (int i = 0; i < 10; i++) { int taskId = i; pool.execute(() -> { System.out.println("Executing task " + taskId + " by " + Thread.currentThread().getName()); try { Thread.sleep(1000); // Simulate some work } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); }
// Shutdown the pool after some time to let tasks complete Thread.sleep(5000); pool.shutdown();
// Optionally, wait for all workers to terminate // while (pool.workerCount.get() > 0) { // Thread.sleep(100); // } } }
解释
SimpleThreadPool类 :
* ` taskQueue ` : 一个阻塞队列,用于存储待执行的任务。
* ` workers ` : 一个列表,存储所有工作线程。
* ` workerCount ` : 一个原子整数,用于跟踪当前活动的工作线程数。
* ` corePoolSize ` 和 ` maximumPoolSize ` : 分别表示线程池的核心大小和最大大小。
* ` isShutdown ` : 一个标志,表示线程池是否已关闭。
Worker内部类 :
* 继承自 ` Thread ` ,表示一个工作线程。
* ` currentTask ` : 当前正在执行的任务。
* ` runTask(Runnable task) ` : 运行任务的方法。
* ` run() ` : 工作线程的主循环,不断从任务队列中取任务并执行,直到线程池关闭或任务队列为空。
- 构造方法 :
* 初始化线程池的参数,并预启动核心数量的工作线程。
execute(Runnable task)方法 :
* 如果线程池已关闭,则抛出异常。
* 如果当前活动的工作线程数小于最大池大小,则启动一个新的工作线程并立即运行任务。
* 否则,将任务放入任务队列中等待执行。
shutdown()方法 :
* 设置关闭标志,可以选择立即中断所有工作线程,或者等待任务队列中的任务完成。
main方法 :
* 创建一个线程池并提交一些任务。
* 等待一段时间后关闭线程池。
注意
-
这个实现没有处理线程池的一些高级特性,如拒绝策略、线程工厂、线程保持活动时间等。
-
在实际应用中,应该使用Java标准库中的
ThreadPoolExecutor类,它提供了更完整和健壮的线程池实现。 -
这个实现中的
shutdown()方法只是设置了关闭标志,并没有实际中断工作线程。在实际应用中,你可能需要更复杂的关闭逻辑,比如等待所有任务完成或者立即中断所有工作线程。
下面再给出 Python 实现的线程池。
import threading import queue import time from typing import Callable, Any
class ThreadPool: def __init__(self, num_threads: int): self.tasks = queue.Queue() self.threads = [] self.shutdown_flag = threading.Event() self.num_threads = num_threads
# Create and start worker threads for _ in range(num_threads): thread = threading.Thread(target=self.worker) thread.start() self.threads.append(thread)
def worker(self): while not self.shutdown_flag.is_set(): try:
# Get a task from the queue (blocks if empty) func, args, kwargs = self.tasks.get(timeout=1)
# Execute the task try: func(*args, **kwargs) except Exception as e: print(f"Error executing task: {e}")
# Task done, mark it as processed self.tasks.task_done() except queue.Empty:
# If the queue is empty and we're shutting down, exit the loop if self.shutdown_flag.is_set() and self.tasks.empty(): break
def submit(self, func: Callable, *args: Any, **kwargs: Any):
# Add a task to the queue self.tasks.put((func, args, kwargs))
def shutdown(self, wait: bool = True):
# Set the shutdown flag self.shutdown_flag.set()
# Optionally wait for all tasks to be completed if wait: self.tasks.join()
# Join all threads for thread in self.threads: thread.join()
# Example usage: def example_task(n): print(f"Task {n} is running") time.sleep(2) print(f"Task {n} is complete")
if __name__ == "__main__":
# Create a thread pool with 3 threads pool = ThreadPool(num_threads=3)
# Submit some tasks to the pool for i in range(10): pool.submit(example_task, i)
# Shutdown the pool after all tasks are submitted time.sleep(5)
# Give some time for tasks to run pool.shutdown(wait=True)
欢迎在评论区留言表达看法,阿沛会一一作出回复。
如果本文对大家有帮助,麻烦大家动动小手点个免费的“赞”或“在看”,大家的鼓励就是阿沛持续更新的动力~

-- 往期精彩 –
面试题:说说看进程、线程和协程的区别是什么?协程能够并行吗?Goroutine和Coroutine的区别是什么?
浏览器输入一个网址发生了什么(二) TCP模块封装和传输机制