程序员阿沛
发布于 2026-06-27 / 0 阅读
0
0

阿里一面说说看线程池的执行流程和原理线程池的拒绝策略有哪些如何确定线程池的核心线程数如何优雅的关闭线

阿里一面:说说看线程池的执行流程和原理?线程池的拒绝策略有哪些?如何确定线程池的核心线程数?如何优雅的关闭线程池?

面试概览:

  • 请说说看什么是线程池?以及为什么使用线程池开发而非单纯的多线程开发?

  • 请说说看线程池的实现原理和执行流程?

  • 如果让你设计一个线程池,那么这个线程池应该有哪些核心参数?

  • 你刚刚有提到线程池的拒绝策略,能说说看具体有哪几种拒绝策略以及它们的适用场景吗?

  • 在实际开发过程中你会如何确定线程池的核心线程数?

  • 如果关闭线程池的时候任务没执行完,如何优雅地关闭线程池?

  • 请简单的实现一个线程池?

面试官:请说说看什么是线程池?以及为什么使用线程池开发而非单纯的多线程开发?

线程池的定义

线程池是一种线程复用技术,它维护着多个线程等待监督管理者分配可并发执行的任务。处理过程中,将任务添加到队列,然后在线程创建后自动启动这些任务。

使用线程池而非单纯多线程开发的原因

  1. 降低资源消耗 :
* 线程池通过重复利用已创建的线程来执行任务,避免了线程的频繁创建和销毁所带来的开销。相比之下,单纯的多线程开发在每次需要执行任务时都会创建新的线程,这会导致资源的浪费和性能的下降。
  1. 提高响应速度 :
* 当任务到达时,线程池可以立即分配一个空闲线程来执行任务,而无需等待线程的创建。这提高了系统的响应速度,尤其是在处理大量并发请求时。
  1. 提高线程的可管理性 :
* 线程是稀缺资源,如果无限制地创建线程,不仅会消耗系统资源,还会降低系统的稳定性。线程池提供了一种统一分配、调优和监控线程的方式,使得线程的管理更加简单和高效。
  1. 避免大量线程的创建导致的内存问题 :
* 在多线程开发中,如果短时间内创建大量线程,可能会使内存到达极限,并出现“OutOfMemory”错误。而线程池通过限制最大线程数来避免这种情况的发生,当线程数达到最大值时,新的任务会被放入队列中等待执行。

面试官:请说说看线程池的实现原理和执行流程?

一、线程池的主要组件

  1. 工作线程 :在线程池中创建的线程,等待并处理任务。

  2. 任务队列 :待处理的任务队列,当有新任务时,任务会被放入队列中等待执行。

  3. 线程池管理器 :管理线程池中线程的生命周期,处理任务调度、线程创建和销毁等。

二、线程池的实现原理

  1. 线程复用 :线程池通过复用线程来避免频繁创建和销毁线程的开销。当有新的任务到来时,线程池会判断当前是否有空闲线程,如果有,则直接将任务分配给空闲线程执行;如果没有,则根据线程池的配置策略(如核心线程数、最大线程数等)来决定是否创建新的线程。

  2. 任务调度 :线程池中的任务调度器负责将任务从任务队列中取出,并分配给空闲的工作线程执行。任务调度器通常会采用先进先出(FIFO)的策略来确保任务的顺序执行。

  3. 线程管理 :线程池管理器负责线程的创建、销毁、状态监控以及任务调度等工作。它会根据线程池的配置参数(如核心线程数、最大线程数、空闲线程存活时间等)来动态调整线程池的规模,以确保系统的稳定性和性能。

  4. 任务执行 :工作线程从任务队列中取出任务后,会执行相应的任务逻辑。任务执行完毕后,工作线程会回到线程池中等待下一个任务的到来。

  5. 线程回收 :当线程池中的工作线程空闲时间超过设定的阈值时,线程池管理器会将其回收,以减少系统资源的占用。同时,当系统不再需要线程池时,可以调用线程池的关闭方法来释放所有资源。

三、线程池的状态转换

线程池的状态转换是其实现原理中的重要部分。线程池通常有以下五种状态:

  1. RUNNING :能接收新提交的任务,也能处理阻塞队列中的任务。

  2. SHUTDOWN :关闭状态,不再接收新提交的任务,但可以继续处理阻塞队列中已保存的任务。

  3. STOP :不接受任务,也不处理阻塞队列中的任务,会中断正在执行的任务。

  4. TIDYING :所有的任务已终止,工作线程数量为0,线程池将转化到TIDYING状态,即将要执行terminated()钩子方法。

  5. TERMINATED :terminated()方法已经执行结束,线程池已完全终止。

四、线程池的执行流程

线程池的执行流程主要涉及核心线程、工作队列、非核心线程以及拒绝策略等关键要素。以下是线程池执行流程的详细解释:

  1. 任务提交 :
* 当出现一个新的线程任务时,线程池会首先尝试在线程池中分配一个空闲线程来执行这个任务。
  1. 核心线程判断 :
* 如果线程池中没有空闲线程,线程池会判断当前“存活线程数”是否小于核心线程数(corePoolSize)。

* 如果“存活线程数”小于核心线程数,线程池会创建一个新的线程来处理新任务。
  1. 工作队列判断 :
* 如果“存活线程数”等于核心线程数,线程池会判断工作队列是否已满。

* 如果工作队列未满(即还有位置可以存放任务),线程池会将新任务放入工作队列中等待,等线程池中出现空闲线程时,再按照“先进先出”的规则分配执行。
  1. 非核心线程判断 :
* 如果工作队列已满,线程池会判断当前存活线程数是否已经达到最大线程数(maximumPoolSize)。

* 如果“存活线程数”小于“最大线程数”,线程池会创建一个新线程来执行新任务。
  1. 拒绝策略 :
* AbortPolicy  :直接抛出RejectedExecutionException异常,阻止系统正常运行(这是默认策略)。

* CallerRunsPolicy  :用调用者所在线程来执行任务,不会抛出异常。

* DiscardPolicy  :不处理该任务,直接丢弃掉。

* DiscardOldestPolicy  :丢弃最老的未处理的任务请求。

* RejectExecutionHandler  :自定义拒绝策略,通过实现RejectedExecutionHandler接口来定义。

* 如果“存活线程数”已经达到或超过最大线程数,且工作队列也已满,线程池会采用拒绝策略来处理新任务。常见的拒绝策略包括:
  1. 任务执行 :
* 一旦线程池中有空闲线程或新线程被创建,线程池就会从工作队列中取出任务,并调度给空闲线程执行。
  1. 线程回收 :
* 对于非核心线程,如果它们在指定的存活时间(keepAliveTime)内没有执行任务,那么这些线程将会被回收。

* 当线程池处于SHUTDOWN状态时,它会停止接收新任务,但会继续处理已添加的任务。此时,空闲线程会被中断,线程池会逐步回收这些线程。

* 当所有任务都已终止,且线程池中的线程数量已经减少到0时,线程池会进入TIDYING状态,并执行钩子函数terminated()。之后,线程池会进入TERMINATED状态,表示线程池已经彻底关闭。

面试官:如果让你设计一个线程池,那么这个线程池应该有哪些核心参数?

以下是一些线程池设计中通常需要考虑的核心参数:

  1. 核心线程数(Core Pool Size) :
* 这是线程池中始终保持的线程数量,即使某些线程处于空闲状态。核心线程数通常根据系统的需求和可用资源来确定。
  1. 最大线程数(Maximum Pool Size) :
* 这是线程池中允许的最大线程数量。当任务队列满时,线程池会尝试创建新的线程,直到达到最大线程数。
  1. 任务队列容量(Work Queue Capacity) :
* 任务队列用于存储等待执行的任务。队列的容量决定了在达到最大线程数之前,线程池可以容纳多少待处理的任务。
  1. 线程存活时间(Keep-Alive Time) :
* 当线程数量超过核心线程数时,多余的空闲线程在终止前等待新任务的最长时间。这个参数有助于在负载降低时释放资源。
  1. 时间单位(Time Unit) :
* 线程存活时间的时间单位,如毫秒、秒等。
  1. 线程工厂(Thread Factory) :
* 用于创建新线程的工厂类。通过自定义线程工厂,可以设置线程的优先级、是否为守护线程、线程名称等属性。
  1. 拒绝策略(Rejected Execution Handler) :
* 当线程池无法处理新任务时(因为线程数已达到最大且任务队列已满),将执行拒绝策略。常见的拒绝策略包括抛出异常、运行拒绝任务的回调、丢弃最旧的任务或丢弃最新的任务等。
  1. 是否允许核心线程超时(Allow Core Threads Time-Out) :
* 一个布尔值,指定核心线程是否也受线程存活时间的影响。如果设置为true,则核心线程在空闲时间超过线程存活时间后也会被终止。
  1. 任务优先级队列(Optional: Priority Work Queue) :
* 如果任务有不同的优先级,可以使用优先级队列来存储任务。这样,线程池会按照任务的优先级顺序来执行任务。

面试官:你刚刚有提到线程池的拒绝策略,能说说看具体有哪几种拒绝策略以及它们的适用场景吗?

首先,线程池的拒绝策略用于在任务队列已满且线程池无法处理新任务时,决定如何拒绝新的任务请求。

以下是几种Java线程池中常见的拒绝策略及其适用场景:

一、AbortPolicy(默认策略)

  • 作用 :当任务无法被线程池执行时,会抛出一个 RejectedExecutionException 异常。

  • 适用场景 :适用于对任务丢失敏感的场景,当线程池无法接受新任务时,希望立即知道并处理该异常。这种策略适合于那些不能容忍任务被丢弃或延迟执行的业务场景,因为它会立即通知调用者任务被拒绝,从而可以采取相应的措施,比如增加线程池大小、优化任务执行效率或者通知用户等待。

二、CallerRunsPolicy

  • 作用 :当任务无法被线程池执行时,会直接在调用者线程中运行这个任务。如果调用者线程正在执行一个任务,则会创建一个新线程来执行被拒绝的任务。

  • 适用场景 :适用于希望调用者自己处理被拒绝的任务的场景,通常是由调用者自身的线程来执行被拒绝的任务。

三、DiscardPolicy

  • 作用 :当任务无法被线程池执行时,任务将被丢弃,不抛出异常,也不执行任务。

  • 适用场景 :适用于对任务丢失不敏感的场景。例如,如果任务是一些可以重复执行或不太重要的操作,那么可以使用这种策略。但是需要注意的是,使用这种策略可能会导致无法执行的任务被丢弃而不会得到通知,因此需要谨慎选择。

四、DiscardOldestPolicy

  • 作用 :当任务无法被线程池执行时,线程池会丢弃队列中最旧的任务(即等待时间最长的任务),然后尝试再次提交当前任务。

  • 适用场景 :适用于对新任务优先级较高的场景。例如,如果新任务比旧任务更重要或更紧急,那么可以使用这种策略来确保新任务能够被执行。但是需要注意的是,这种策略也可能导致一些旧任务被丢弃而不会得到执行。

除了以上四种常见的拒绝策略外,还可以根据实际需求自定义拒绝策略。自定义拒绝策略可以通过实现 RejectedExecutionHandler
接口并覆盖其 rejectedExecution
方法来实现。在自定义拒绝策略中,可以根据具体的业务逻辑来处理被拒绝的任务,比如记录日志、抛出自定义异常、执行备选方案等。

面试官:在实际开发过程中你会 ** ** 如何确定线程池的核心线程数?

以下是一些常见的线程池核心线程数的考虑因素和确定方法:

考虑因素

  1. 任务特性 :
* CPU密集型任务  :这类任务主要耗费CPU资源,因此核心线程数通常设置为CPU可用核心的数量,以使CPU的利用率最大化。

* IO密集型任务  :这类任务大部分时间都在等待IO操作完成,因此核心线程数可以设置得更高,以充分利用CPU在等待IO操作时的空闲时间。通常,线程数会多于CPU核心数。

* 混合型任务  :这类任务既有计算又有IO操作,需要综合考虑两种任务的比例来决定核心线程数。
  1. 系统资源 :
* CPU核心数  :系统的CPU核心数是限制线程数的重要因素。过多的线程可能导致CPU资源过度竞争,降低系统性能。

* 内存容量  :内存资源也是需要考虑的因素。过多的线程会占用更多的内存,可能导致内存不足的问题。
  1. 应用性能需求 :
* 吞吐量  :如果应用需要处理大量的并发任务,那么可能需要更多的线程来提高吞吐量。

* 响应速度  :对于需要快速响应的应用,合理的线程数可以确保任务被及时处理。
  1. 任务执行时间和到达频率 :
* 如果任务执行时间较长,那么可能需要较少的线程数,因为每个线程会占用较长时间。

* 如果任务到达频率较高,那么可能需要更多的线程来确保任务能够及时被处理。

确定方法

  1. 理论预估 :
* 对于CPU密集型任务,可以使用公式“核心线程数 = CPU核心数”或“核心线程数 = CPU核心数 + 1”进行预估。

* 对于IO密集型任务,可以使用公式“核心线程数 = CPU核心数 / (1 - 阻塞系数)”或简单地设置为CPU核心数的两倍或多一点进行预估。

* 对于混合型任务,需要根据具体情况综合考虑两种任务的比例,并使用相应的公式进行预估。
  1. 压测验证 :
* 通过负载测试和压力测试来观察系统的性能表现,如吞吐量、响应时间等。

* 根据测试结果调整核心线程数,直到找到最佳的性能平衡点。
  1. 监控动态调整 :
* 在实际应用中,可以使用性能监控工具来监控系统的CPU使用率、内存使用情况和响应时间等指标。

* 根据监控结果动态调整核心线程数,以适应负载的变化。

面试官:如果关闭线程池的时候任务没执行完,如何优雅地关闭线程池?

一、使用 shutdown() 方法

shutdown() 方法是 ExecutorService 接口提供的一个用于关闭线程池的方法。它会启动线程池的关闭序列,执行以下操作:

  • 设置线程池的状态为 SHUTDOWN ,此时线程池不再接受新任务。

  • 中断空闲的线程,但不会中断正在执行的任务。

  • 等待已提交的任务执行完成。

调用 shutdown() 方法后,线程池会等待所有已提交的任务执行完毕,然后再关闭。如果需要等待线程池关闭完成,可以调用 awaitTermination() 方法。

二、使用 shutdownNow() 方法

shutdownNow() 方法也是 ExecutorService 接口提供的一个用于关闭线程池的方法。与 shutdown()
方法不同,它会尝试立即关闭线程池,执行以下操作:

  • 设置线程池的状态为 STOP

  • 尝试停止所有正在执行的任务,通过中断线程的方式实现。

  • 返回等待执行的任务列表,这些任务在关闭时还未来得及执行。

  • 清空任务队列。

需要注意的是, shutdownNow()
方法并不保证能停止所有正在执行的任务,因为中断线程只是设置了一个中断状态,具体能否立即生效还取决于线程内部的实现。因此,在调用 shutdownNow() 方法后,可能需要结合 awaitTermination() 方法等待一段时间,以确保尽可能多的任务能够停止。

三、优雅关闭线程池的步骤

  1. 调用 shutdown() 方法 :首先调用 shutdown() 方法,拒绝新提交的任务,并等待已提交的任务执行完成。

  2. 调用 awaitTermination() 方法 :使用 awaitTermination() 方法等待线程池中的所有任务都完成执行。该方法会阻塞当前线程,直到所有任务都完成执行、超时发生或当前线程被中断。

  3. 处理未完成的任务 :如果在超时时间内所有任务没有完成,可以调用 shutdownNow() 方法强制停止未执行完的任务。但需要注意,这可能会导致任务数据丢失或其他问题。

  4. 捕获和处理异常 :在关闭线程池的过程中,需要注意捕获和处理可能出现的异常,以确保程序的健壮性。

以下是一个优雅关闭线程池的示例代码:

ExecutorService executorService = Executors.newFixedThreadPool(10);

// 提交任务给线程池 for (int i = 0; i < 100; i++) { executorService.submit(() -> { // 模拟任务执行 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println(“任务执行完成”); }); }
// 关闭线程池 try { // 拒绝新任务,等待已提交任务执行完成 executorService.shutdown(); // 等待一段时间,确保所有任务都执行完成 if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { // 如果超时仍未完成,则强制关闭线程池 List notExecutedTasks = executorService.shutdownNow(); System.out.println(“有 " + notExecutedTasks.size() + " 个任务未执行完成”); } } catch (InterruptedException e) { // 当前线程被中断,需要重新关闭线程池 executorService.shutdownNow(); Thread.currentThread().interrupt(); }

在这个示例中,我们首先使用 shutdown() 方法拒绝新任务,并等待已提交的任务执行完成。然后,使用 awaitTermination() 方法等待一段时间,以确保所有任务都执行完成。如果超时仍未完成,则使用 shutdownNow()
方法强制关闭线程池,并处理未执行的任务。在关闭线程池的过程中,我们还捕获了 InterruptedException 异常,以确保程序的健壮性。

面试官:请简单的实现一个线程池?

实现一个线程池需要处理线程管理、任务队列、任务调度、线程生命周期控制等多个方面。

以下是一个基本的Java线程池实现示例:

import java.util.concurrent.BlockingQueue;  import java.util.concurrent.LinkedBlockingQueue;  import java.util.List;  import java.util.ArrayList;  import java.util.concurrent.atomic.AtomicInteger;
public class SimpleThreadPool {      private final BlockingQueue<Runnable> taskQueue;      private final List<Worker> workers;      private final AtomicInteger workerCount;      private final int corePoolSize;      private final int maximumPoolSize;      private volatile boolean isShutdown;
    // Worker class that extends Thread      private class Worker extends Thread {          private Runnable currentTask;
        public Worker() {              workerCount.incrementAndGet();          }
        public void runTask(Runnable task) {              currentTask = task;              currentTask.run();              currentTask = null;          }
        @Override          public void run() {              while (!isShutdown || !taskQueue.isEmpty()) {                  try {                      Runnable task = taskQueue.take();                      runTask(task);                  } catch (InterruptedException e) {                      Thread.currentThread().interrupt(); // Restore interrupt status                  }              }              workerCount.decrementAndGet();          }      }
    public SimpleThreadPool(int corePoolSize, int maximumPoolSize) {          this.corePoolSize = corePoolSize;          this.maximumPoolSize = maximumPoolSize;          this.taskQueue = new LinkedBlockingQueue<>();          this.workers = new ArrayList<>();          this.workerCount = new AtomicInteger(0);          this.isShutdown = false;
        // Pre-start core number of workers          for (int i = 0; i < corePoolSize; i++) {              Worker worker = new Worker();              workers.add(worker);              worker.start();          }      }
    public void execute(Runnable task) {          if (isShutdown) {              throw new IllegalStateException("ThreadPool is shut down");          }
        if (workerCount.get() < maximumPoolSize) {              // If less than maximum pool size, start a new worker              Worker worker = new Worker();              workers.add(worker);              worker.start();              worker.runTask(task); // Run the task immediately if the worker is just started          } else {              // Otherwise, put the task in the queue              try {                  taskQueue.put(task);              } catch (InterruptedException e) {                  Thread.currentThread().interrupt(); // Restore interrupt status                  throw new RuntimeException(e);              }          }      }
    public void shutdown() {          isShutdown = true;          // Optionally, interrupt all workers to terminate them immediately          // for (Worker worker : workers) {          //     worker.interrupt();          // }
        // Alternatively, wait for all tasks to complete by not interrupting workers          // and letting them terminate naturally as the queue empties      }
    // Additional methods like shutdownNow(), awaitTermination(), etc., can be added for a more complete implementation
    public static void main(String[] args) throws InterruptedException {          SimpleThreadPool pool = new SimpleThreadPool(2, 4);
        for (int i = 0; i < 10; i++) {              int taskId = i;              pool.execute(() -> {                  System.out.println("Executing task " + taskId + " by " + Thread.currentThread().getName());                  try {                      Thread.sleep(1000); // Simulate some work                  } catch (InterruptedException e) {                      Thread.currentThread().interrupt();                  }              });          }
        // Shutdown the pool after some time to let tasks complete          Thread.sleep(5000);          pool.shutdown();
        // Optionally, wait for all workers to terminate          // while (pool.workerCount.get() > 0) {          //     Thread.sleep(100);          // }      }  }

解释

  1. SimpleThreadPool 类 :
* ` taskQueue ` : 一个阻塞队列,用于存储待执行的任务。

* ` workers ` : 一个列表,存储所有工作线程。

* ` workerCount ` : 一个原子整数,用于跟踪当前活动的工作线程数。

* ` corePoolSize ` 和 ` maximumPoolSize ` : 分别表示线程池的核心大小和最大大小。

* ` isShutdown ` : 一个标志,表示线程池是否已关闭。
  1. Worker 内部类 :
* 继承自 ` Thread ` ,表示一个工作线程。

* ` currentTask ` : 当前正在执行的任务。

* ` runTask(Runnable task) ` : 运行任务的方法。

* ` run() ` : 工作线程的主循环,不断从任务队列中取任务并执行,直到线程池关闭或任务队列为空。
  1. 构造方法 :
* 初始化线程池的参数,并预启动核心数量的工作线程。
  1. execute(Runnable task) 方法 :
* 如果线程池已关闭,则抛出异常。

* 如果当前活动的工作线程数小于最大池大小,则启动一个新的工作线程并立即运行任务。

* 否则,将任务放入任务队列中等待执行。
  1. shutdown() 方法 :
* 设置关闭标志,可以选择立即中断所有工作线程,或者等待任务队列中的任务完成。
  1. main 方法 :
* 创建一个线程池并提交一些任务。

* 等待一段时间后关闭线程池。

注意

  • 这个实现没有处理线程池的一些高级特性,如拒绝策略、线程工厂、线程保持活动时间等。

  • 在实际应用中,应该使用Java标准库中的 ThreadPoolExecutor 类,它提供了更完整和健壮的线程池实现。

  • 这个实现中的 shutdown() 方法只是设置了关闭标志,并没有实际中断工作线程。在实际应用中,你可能需要更复杂的关闭逻辑,比如等待所有任务完成或者立即中断所有工作线程。

下面再给出 Python 实现的线程池。

import threading  import queue  import time  from typing import Callable, Any
class ThreadPool:      def __init__(self, num_threads: int):          self.tasks = queue.Queue()          self.threads = []          self.shutdown_flag = threading.Event()          self.num_threads = num_threads
        
# Create and start worker threads          for _ in range(num_threads):              thread = threading.Thread(target=self.worker)              thread.start()              self.threads.append(thread)
    def worker(self):          while not self.shutdown_flag.is_set():              try:                  
# Get a task from the queue (blocks if empty)                  func, args, kwargs = self.tasks.get(timeout=1)
                
# Execute the task                  try:                      func(*args, **kwargs)                  except Exception as e:                      print(f"Error executing task: {e}")
                
# Task done, mark it as processed                  self.tasks.task_done()              except queue.Empty:                  
# If the queue is empty and we're shutting down, exit the loop                  if self.shutdown_flag.is_set() and self.tasks.empty():                      break
    def submit(self, func: Callable, *args: Any, **kwargs: Any):          
# Add a task to the queue          self.tasks.put((func, args, kwargs))
    def shutdown(self, wait: bool = True):          
# Set the shutdown flag          self.shutdown_flag.set()
        
# Optionally wait for all tasks to be completed          if wait:              self.tasks.join()
        
# Join all threads          for thread in self.threads:              thread.join()
# Example usage:  def example_task(n):      print(f"Task {n} is running")      time.sleep(2)      print(f"Task {n} is complete")
if __name__ == "__main__":      
# Create a thread pool with 3 threads      pool = ThreadPool(num_threads=3)
    
# Submit some tasks to the pool      for i in range(10):          pool.submit(example_task, i)
    
# Shutdown the pool after all tasks are submitted      time.sleep(5)  
# Give some time for tasks to run      pool.shutdown(wait=True)

欢迎在评论区留言表达看法,阿沛会一一作出回复。

如果本文对大家有帮助,麻烦大家动动小手点个免费的“赞”或“在看”,大家的鼓励就是阿沛持续更新的动力~

-- 往期精彩 –

面试题:说说看进程、线程和协程的区别是什么?协程能够并行吗?Goroutine和Coroutine的区别是什么?

浏览器输入一个网址发生了什么(二) TCP模块封装和传输机制

华为二面:什么是零拷贝技术?mmap和sendfile如何实现零拷贝,它们是否真正实现了零拷贝?

阿里二面:在高并发场景下如何保证消息只被消费一次?

操作系统入门(三)万字长文细说进程间关系:互斥、同步和通信

操作系统入门(一)操作系统概述之并发、共享、虚拟、异步、中断和系统调用


评论