Java线程池

2015/03/20 JUC

Java线程池(ThreadPoolExecutor)

创建Java线程需要给线程分配堆栈内存以及初始化内存,还需要进行系统调用,频繁地创建和销毁线程会大大降低系统的运行效率。出于线程管理的需要,线程池应运而生。线程池是一种多线程处理形式,处理过程中将任务提交到线程池,任务的执行交由线程池来管理。

什么是线程池

创建线程去处理业务,可能创建线程的时间比处理业务的时间还长一些,如果系统能够提前为我们创建好线程,我们需要的时候直接拿来使用,用完之后不是直接将其关闭,而是将其返回到线程中,给其他需要这使用,这样直接节省了创建和销毁的时间,提升了系统的性能。 简单的说,在使用了线程池之后,创建线程变成了从线程池中获取一个空闲的线程,然后使用,关闭线程变成了将线程归还到线程池。

为什么要有线程池

使用线程池的好处在于

  • 降低资源消耗:线程池通常会维护一些线程(数量为 corePoolSize),这些线程被重复使用来执行不同的任务,任务完成后不会销毁。在待处理任务量很大的时候,通过对线程资源的复用,避免了线程的频繁创建与销毁,从而降低了系统资源消耗。
  • 提高响应速度:由于线程池维护了一批 alive 状态的线程,当任务到达时,不需要再创建线程,而是直接由这些线程去执行任务,从而减少了任务的等待时间。
  • 提高线程的可管理性:使用线程池可以对线程进行统一的分配,调优和监控。

线程池实现原理

当向线程池提交一个任务之后,线程池的处理流程如下:

  1. 判断是否达到核心线程数,若未达到,则直接创建新的线程处理当前传入的任务,否则进入下个流程
  2. 线程池中的工作队列是否已满,若未满,则将任务丢入工作队列中先存着等待处理,否则进入下个流程
  3. 是否达到最大线程数,若未达到,则创建新的线程处理当前传入的任务,否则交给线程池中的饱和策略进行处理。

举个例子,加深理解:

咱们作为开发者,上面都有开发主管,主管下面带领几个小弟干活,CTO给主管授权说,你可以招聘5个小弟干活,新来任务,如果小弟还不到5个,立即去招聘一个来干这个新来的任务,当5个小弟都招来了,再来任务之后,将任务记录到一个表格中,表格中最多记录100个,小弟们会主动去表格中获取任务执行,如果5个小弟都在干活,并且表格中也记录满了,那你可以将小弟扩充到20个,如果20个小弟都在干活,并且存放任务的表也满了,产品经理再来任务后,是直接拒绝,还是让产品自己干,这个由你自己决定,小弟们都尽心尽力在干活,任务都被处理完了,突然公司业绩下滑,几个员工没事干,打酱油,为了节约成本,CTO主管把小弟控制到5人,其他15个人直接被干掉了。所以作为小弟们,别让自己闲着,多干活。

原理: 先找几个人干活,大家都忙于干活,任务太多可以排期,排期的任务太多了,再招一些人来干活,最后干活的和排期都达到上层领导要求的上限了,那需要采取一些其他策略进行处理了。对于长时间不干活的人,考虑将其开掉,节约资源和成本。

JUC线程池架构

在Java中,线程池是由Executor框架实现的,Executor是最顶层的接口定义,其子类和实现类包括:ExecutorServiceScheduledExecutorServiceThreadPoolExecutorScheduledThreadPoolExecutorForkJoinPool等。

Executor

Executor是一个接口,只定义了一个execute()方法(void execute(Runnable command);),只能提交Runnable形式的任务,不支持提交Callable带有返回值的任务。

/**
 * An object that executes submitted {@link Runnable} tasks. This
 * interface provides a way of decoupling task submission from the
 * mechanics of how each task will be run, including details of thread
 * use, scheduling, etc.  An {@code Executor} is normally used
 * instead of explicitly creating threads. For example, rather than
 * invoking {@code new Thread(new(RunnableTask())).start()} for each
 * of a set of tasks, you might use:
 *
 * <pre>
 * Executor executor = <em>anExecutor</em>;
 * executor.execute(new RunnableTask1());
 * executor.execute(new RunnableTask2());
 * ...
 * </pre>
 *
 * However, the {@code Executor} interface does not strictly
 * require that execution be asynchronous. In the simplest case, an
 * executor can run the submitted task immediately in the caller's
 * thread:
 *
 *  <pre> {@code
 * class DirectExecutor implements Executor {
 *   public void execute(Runnable r) {
 *     r.run();
 *   }
 * }}</pre>
 *
 * More typically, tasks are executed in some thread other
 * than the caller's thread.  The executor below spawns a new thread
 * for each task.
 *
 *  <pre> {@code
 * class ThreadPerTaskExecutor implements Executor {
 *   public void execute(Runnable r) {
 *     new Thread(r).start();
 *   }
 * }}</pre>
 *
 * Many {@code Executor} implementations impose some sort of
 * limitation on how and when tasks are scheduled.  The executor below
 * serializes the submission of tasks to a second executor,
 * illustrating a composite executor.
 *
 *  <pre> {@code
 * class SerialExecutor implements Executor {
 *   final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
 *   final Executor executor;
 *   Runnable active;
 *
 *   SerialExecutor(Executor executor) {
 *     this.executor = executor;
 *   }
 *
 *   public synchronized void execute(final Runnable r) {
 *     tasks.offer(new Runnable() {
 *       public void run() {
 *         try {
 *           r.run();
 *         } finally {
 *           scheduleNext();
 *         }
 *       }
 *     });
 *     if (active == null) {
 *       scheduleNext();
 *     }
 *   }
 *
 *   protected synchronized void scheduleNext() {
 *     if ((active = tasks.poll()) != null) {
 *       executor.execute(active);
 *     }
 *   }
 * }}</pre>
 *
 * The {@code Executor} implementations provided in this package
 * implement {@link ExecutorService}, which is a more extensive
 * interface.  The {@link ThreadPoolExecutor} class provides an
 * extensible thread pool implementation. The {@link Executors} class
 * provides convenient factory methods for these Executors.
 *
 * <p>Memory consistency effects: Actions in a thread prior to
 * submitting a {@code Runnable} object to an {@code Executor}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * its execution begins, perhaps in another thread.
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

ExecutorService

ExecutorService在Executor的基础上加入了线程池的生命周期管理,可以通过shutdown或者shutdownNow方法来关闭线程池。 ExecutorService支持提交Callable形式的任务,提交完Callable任务后拿到一个Future(代表一个异步任务执行的结果)。

/**
 * An {@link Executor} that provides methods to manage termination and
 * methods that can produce a {@link Future} for tracking progress of
 * one or more asynchronous tasks.
 *
 * <p>An {@code ExecutorService} can be shut down, which will cause
 * it to reject new tasks.  Two different methods are provided for
 * shutting down an {@code ExecutorService}. The {@link #shutdown}
 * method will allow previously submitted tasks to execute before
 * terminating, while the {@link #shutdownNow} method prevents waiting
 * tasks from starting and attempts to stop currently executing tasks.
 * Upon termination, an executor has no tasks actively executing, no
 * tasks awaiting execution, and no new tasks can be submitted.  An
 * unused {@code ExecutorService} should be shut down to allow
 * reclamation of its resources.
 *
 * <p>Method {@code submit} extends base method {@link
 * Executor#execute(Runnable)} by creating and returning a {@link Future}
 * that can be used to cancel execution and/or wait for completion.
 * Methods {@code invokeAny} and {@code invokeAll} perform the most
 * commonly useful forms of bulk execution, executing a collection of
 * tasks and then waiting for at least one, or all, to
 * complete. (Class {@link ExecutorCompletionService} can be used to
 * write customized variants of these methods.)
 */
public interface ExecutorService extends Executor {

AbstractExecutorService

抽象类,实现了ExecutorService

ThreadPoolExecutor

线程池实现类,继承于AbstractExecutorService,JUC线程池的核心实现类

ScheduledExecutorService

继承于ExecutorService。它是一个可以完成“延时”和“周期性”任务的调度线程池接口

ScheduledThreadPoolExecutor

继承于ThreadPoolExecutor,实现了ExecutorService中延时执行和周期执行等抽象方法

Executors

静态工厂类,它通过静态工厂方法返回ExecutorService、ScheduledExecutorService等线程池示例对象

ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系:

  • ThreadPoolExecutor继承了AbstractExecutorService,AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。
  • ExecutorService又是继承了Executor接口,Executor是顶层接口。

线程池的创建

Executors中提供了一系列静态方法创建线程池:

newSingleThreadExecutor

newSingleThreadExecutor创建“单线程化线程池”。一个单线程的线程池。如果因异常结束,会再创建一个新的,保证按照提交顺序执行。


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class NewSingleThreadExecutorDemo {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 3; i++) {
            pool.execute(new newSingleThreadExecutorRunnable());
            pool.submit(new newSingleThreadExecutorRunnable());
        }
        pool.shutdown();
    }

    static class newSingleThreadExecutorRunnable implements Runnable {

        static AtomicInteger taskNo = new AtomicInteger(1);

        private String taskName;

        public newSingleThreadExecutorRunnable() {
            taskName = "task-" + taskNo;
            taskNo.incrementAndGet();
        }

        @Override
        public void run() {
            System.out.println("task:" + taskName + " start...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task:" + taskName + " end...");
        }
    }

}

特点:

  • 单线程化的线程池中的任务是按照提交的次序顺序执行的
  • 只有一个线程的线程池
  • 池中的唯一线程的存活时间是无限的
  • 当池中的唯一线程正繁忙时,新提交的任务实例会进入内部的阻塞队列中,并且其阻塞队列是无界的
  • 适用场景:任务按照提交次序,一个任务一个任务地逐个执行的场景

newFixedThreadPool

newFixedThreadPool创建“固定数量的线程池。创建固定大小的线程池。根据提交的任务逐个增加线程,直到最大值保持不变。如果因异常结束,会新创建一个线程补充。


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class NewFixedThreadPoolDemo {


    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            pool.execute(new NewFixedThreadPoolRunnable());
            pool.submit(new NewFixedThreadPoolRunnable());
        }
        pool.shutdown();
    }

    static class NewFixedThreadPoolRunnable implements Runnable {

        static AtomicInteger taskNo = new AtomicInteger(1);

        private String taskName;

        public NewFixedThreadPoolRunnable() {
            taskName = "task-" + taskNo;
            taskNo.incrementAndGet();
        }

        @Override
        public void run() {
            System.out.println("task:" + taskName + " start...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task:" + taskName + " end...");
        }
    }
}

特点:

  • 如果线程数没有达到“固定数量”,每次提交一个任务线程池内就创建一个新线程,直到线程达到线程池固定的数量
  • 线程池的大小一旦达到“固定数量”就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程
  • 在接收异步任务的执行目标实例时,如果池中的所有线程均在繁忙状态,新任务会进入阻塞队列中(无界的阻塞队列)

适用场景:

  • 需要任务长期执行的场景
  • CPU密集型任务

缺点:

  • 内部使用无界队列来存放排队任务,当大量任务超过线程池最大容量需要处理时,队列无限增大,使服务器资源迅速耗尽

newCachedThreadPool

newCachedThreadPool创建“可缓存线程池” newCachedThreadPool:创建一个可缓存的线程池。会根据任务自动新增或回收线程。


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class NewCachedThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 4; i++) {
            pool.execute(new NewCachedThreadPoolRunnable());
            pool.submit(new NewCachedThreadPoolRunnable());
        }
        pool.shutdown();
    }

    static class NewCachedThreadPoolRunnable implements Runnable {

        static AtomicInteger taskNo = new AtomicInteger(1);

        private String taskName;

        public NewCachedThreadPoolRunnable() {
            taskName = "task-" + taskNo;
            taskNo.incrementAndGet();
        }

        @Override
        public void run() {
            System.out.println("task:" + taskName + " start...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task:" + taskName + " end...");
        }
    }
}

特点:

  • 在接收新的异步任务target执行目标实例时,如果池内所有线程繁忙,此线程池就会添加新线程来处理任务
  • 线程池不会对线程池大小进行限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小
  • 如果部分线程空闲,也就是存量线程的数量超过了处理任务数量,就会回收空闲(60秒不执行任务)线程

适用场景:

  • 需要快速处理突发性强、耗时较短的任务场景,如Netty的NIO处理场景、REST API接口的瞬时削峰场景

缺点:

  • 线程池没有最大线程数量限制,如果大量的异步任务执行目标实例同时提交,可能会因创建线程过多而导致资源耗尽

newScheduledThreadPool

newScheduledThreadPool创建“可调度线程池”。支持定时以及周期性执行任务的需求。


import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NewScheduledThreadPoolDemo {

    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
        for (int i = 0; i < 4; i++) {
            pool.scheduleAtFixedRate(new NewScheduledThreadPoolRunnable(), 0, 500, TimeUnit.MILLISECONDS);

        }
        pool.shutdown();
    }

    static class NewScheduledThreadPoolRunnable implements Runnable {

        static AtomicInteger taskNo = new AtomicInteger(1);

        private String taskName;

        public NewScheduledThreadPoolRunnable() {
            taskName = "task-" + taskNo;
            taskNo.incrementAndGet();
        }

        @Override
        public void run() {
            System.out.println("task:" + taskName + " start...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task:" + taskName + " end...");
        }
    }
}

Executors创建线程池的4种方法十分方便,但是构造器创建普通线程池、可调度线程池比较复杂,这些构造器会涉及大量的复杂参数,已经较少使用。

newWorkStealingPool

JDK8新增,根据所需的并行层次来动态创建和关闭线程,通过使用多个队列减少竞争,底层使用ForkJoinPool来实现。优势在于可以充分利用多CPU,把一个任务拆分成多个“小任务”,放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

Executors创建线程池存在的问题

  • 创建固定数量线程池的问题
 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

阻塞队列无界,队列很大,很有可能导致JVM出现OOM(Out Of Memory)异常,即内存资源耗尽

  • 创建单线程线程池的问题
     public static ExecutorService newSingleThreadExecutor() {
          return new FinalizableDelegatedExecutorService
              (new ThreadPoolExecutor(1, 1,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>()));
      }
    

    问题和固定数量线程池一样,阻塞队列无界

  • 创建缓存线程池的问题
 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

问题存在于其最大线程数量不设限上。由于其maximumPoolSize的值为Integer.MAX_VALUE(非常大),可以认为可以无限创建线程,如果任务提交较多,就会造成大量的线程被启动,很有可能造成OOM异常,甚至导致CPU线程资源耗尽

  • 创建可调度线程存在的问题 ```java public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }

public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }

主要问题在于线程数不设上限

总结:
- newFixedThreadPool和newSingleThreadExecutor: 阻塞队列无界,会堆积大量任务导致OOM(内存耗尽)
- newCachedThreadPool和newScheduledThreadPool: 线程数量无上界,会导致创建大量的线程,从而导致OOM
- 建议直接使用线程池ThreadPoolExecutor的构造器


## 线程池的实现

jdk中提供了线程池的具体实现,实现类是:`java.util.concurrent.ThreadPoolExecutor`。

一开始,我们先用一个通俗的例子来帮助我们理解线程池的运行机制:

假如我们建了一家加工厂,那么第一个问题来了:工人编制规模是多少?(这个数字就对应线程池的 corePoolSize,即线程池核心线程数量)

接下来,假定工厂满编20个工人,那么第二个问题就是工人怎么招?(也就是线程池的线程初始化策略)根据老板的豪气程度无非有三种方式:

- 第一种,老板抠到极致,不见兔子不撒鹰,接一部分活儿招一个工人,直到满编(也就是不进行线程池的线程初始化)
- 第二种,老板精打细算,先招一个人充充门面(即调用 prestartCoreThread() 初始化一个核心线程)
- 第三种,老板豪气万丈,不管有没有活干,先把工人都安排到位(即调用prestartAllCoreThreads(),初始化所有核心线程)
不管怎么样,工人总是要来的,那么工人的劳动合同该如何签呢?是终身劳动合同,厂子不倒,人员不散?还是铁打的营盘流水的兵?

- 前者就是 allowCoreThreadTimeOut 设置为 false,即核心线程不设置存活时间
- 后者就是 allowCoreThreadTimeOut 设置为 true,即核心线程设置存活时间,存活时间的长度就是 keepAliveTime
接下来,工厂的生产线也建设起来了(在线程池里称之为 HashSet<Worker> workers),工人进入这条生产线进行生产。

完事俱备,工厂开始接受订单。为了更好地调度生产,一个调度员入职,英文名字execute。在拿到订单后,调度员execute按既定流程开始工作:

清点一下当前的工人人数(即线程池的 poolSize),发现人员没满编,于是立马招一个工人来接下这个工作任务。
工人人数满编了,于是调度室把待加工的构建放置到工厂仓库(即任务队列BlockingQueue<Runnable> workQueue),等待有干完活儿的工人来处理,当然工人是没这个主动性的,所以又一个调度员 getTask 入职了,他的任务就是实时将仓库的待加工任务分配给空闲下来的工人。
繁忙的时候,调度员execute发现工人满负荷工作,仓库也堆满了,而订单还在雪花般飞来,为了把这些订单消化掉,execute 赶紧招了一批临时工,把工厂工人规模临时扩大到极致(即 maximumPoolSize,线程池最大容量)。当然当生产任务没那么繁忙时,这些临时工就要被裁撤了,毕竟临时工是有成本的。
当临时工都到位后,订单仍然源源不断,老板也只能忍痛割爱,拒绝后续订单了(即线程池的拒绝策略)。
工厂当然不能稀里糊涂地一门心思生产,毕竟工厂业绩老板是很关心的,于是生产总量要被统计(即 completedTaskCount,线程池已完成的任务数)。工厂最多有过多少工人也被顺手统计了(即 largestPoolSize,线程池出现过的最大线程数)

当有一天,工厂因为某原因关闭时,会有两种情形:
- 工厂宣布关闭,不再接受订单,但会把已经接受的订单做完,然后遣散工人(即 调用 shutdown()关闭线程池,比较柔和的关闭方式)
- 工厂宣布立即关闭,不仅不再接受订单,而且把仓库里的待加工组件清空,工人停止手头的工作并遣散(即调用 shutdownNow(),比较激进的关闭方式)。


## 构造方法
在ThreadPoolExecutor类中提供了四个构造方法:
```java
public class ThreadPoolExecutor extends AbstractExecutorService {
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
}
  • corePoolSize 线程池中的核心线程数。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法(从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程)。当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。

  • maximumPoolSize 线程池最大线程数。它表示在线程池中最多能创建多少个线程。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;当阻塞队列是无界队列, 则maximumPoolSize则不起作用, 因为无法提交至核心线程池的线程会一直持续地放入workQueue。

  • keepAliveTime:线程空闲时的存活时间。表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。

  • unit 参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性。类型是一个枚举java.util.concurrent.TimeUnit
      1. 天:TimeUnit.DAYS;
      1. 小时:TimeUnit.HOURS;
      1. 分钟:TimeUnit.MINUTES;
      1. 秒:TimeUnit.SECONDS;
      1. 毫秒:TimeUnit.MILLISECONDS;
      1. 微秒:TimeUnit.MICROSECONDS;
      1. 纳秒:TimeUnit.NANOSECONDS;
  • workQueue 工作队列,用于缓存待处理任务的阻塞队列。在JDK中提供了如下阻塞队列:

    • ArrayBlockingQueue: 基于数组结构的有界阻塞队列,按FIFO排序任务;

    • LinkedBlockingQuene: 基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;

    • SynchronousQuene: 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;

    • PriorityBlockingQuene: 具有优先级的无界阻塞队列

LinkedBlockingQueue比ArrayBlockingQueue在插入删除节点性能方面更优,但是二者在put(), take()任务的时均需要加锁,SynchronousQueue使用无锁算法,根据节点的状态判断执行,而不需要用到锁,其核心是Transfer.transfer()

  • threadFactory 创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为DefaultThreadFactory

  • handler 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略

    • AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。默认策略;
    • CallerRunsPolicy: 由调用线程处理该任务
    • DiscardOldestPolicy: 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    • DiscardPolicy: 也是丢弃任务,但是不抛出异常。

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

重要成员变量

接下来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:

// 任务缓存队列,用来存放等待执行的任务
private final BlockingQueue<Runnable> workQueue;
//线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
private final ReentrantLock mainLock = new ReentrantLock(); 
//用来存放工作集
private final HashSet<Worker> workers = new HashSet<Worker>();  
//线程存活时间
private volatile long  keepAliveTime; 
//是否允许为核心线程设置存活时间
private volatile boolean allowCoreThreadTimeOut; 
//核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int   corePoolSize; 
//线程池最大能容忍的线程数
private volatile int   maximumPoolSize; 
//线程池中当前的线程数  
private volatile int   poolSize;
//任务拒绝策略      
private volatile RejectedExecutionHandler handler; 
//线程工厂,用来创建线程
private volatile ThreadFactory threadFactory;  
//用来记录线程池中曾经出现过的最大线程数 
private int largestPoolSize; 
//用来记录已经执行完毕的任务个数  
private long completedTaskCount;  

向线程池提交任务的两种方式

  • execute方法 void execute(Runnable command): Executor接口中的方法
  • submit方法 Future submit(Callable task); Future submit(Runnable task, T result); Future<?> submit(Runnable task); 这3个submit方法都是ExecutorService接口中的方法

两种方法的区别:

  • execute()方法只能接收Runnable类型的参数,而submit()方法可以接收Callable、Runnable两种类型的参数
  • Callable类型的任务是可以返回执行结果的,而Runnable类型的任务不可以返回执行结果
  • submit()提交任务后会有返回值,而execute()没有
  • submit()方便Exception处理

通过submit()返回的Future对象获取结果

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

public class CreateThreadPollDemo {
    public static void main(String[] args) throws InterruptedException {
       ScheduledExecutorService pool=Executors.newScheduledThreadPool(2);
       Future<Integer> future=pool.submit(new Callable<Integer>() {

        @Override
        public Integer call() throws Exception {
            return 123;
        }
           
       });
       try {
        Integer result=future.get();
        System.out.println("result:"+result);//123
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    Thread.sleep(1000);
    pool.shutdown();

    }
  
}

通过submit()返回的Future对象捕获异常

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

import javax.management.RuntimeErrorException;

public class CreateThreadPollDemo {
    public static final int SLEEP_GAP=1000;
    static class TargetTask implements Runnable{
        static AtomicInteger taskNo=new AtomicInteger(1);
        String taskName;
        public TargetTask()
        {
            taskName="task-"+taskNo;
            taskNo.incrementAndGet();
        }
        public void run()
        {
            System.out.println(taskName+" is doing...");
            try {
                Thread.sleep(SLEEP_GAP);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(taskName+" end...");
        }
    }
    static class TargetTaskWithError extends TargetTask{
        public void run()
        {
            super.run();//执行父类的run方法
            throw new RuntimeException("Error from "+taskName);
        }
    }

    public static void main(String[] args) throws InterruptedException {
       ScheduledExecutorService pool=Executors.newScheduledThreadPool(2);
      pool.execute(new TargetTaskWithError());
      Future future=pool.submit(new TargetTaskWithError());
      try {
        if(future.get()==null)
          {
              System.out.println("No Exception");
          }
    } catch (ExecutionException e) {
      
        e.printStackTrace();
    }
    Thread.sleep(1000);
    pool.shutdown();

    }
  
}

execute()方法在启动任务执行后,任务执行过程中可能发生的异常调用者并不关心。而通过submit()方法返回的Future对象(异步执行实例),可以进行异步执行过程中的异常捕获

线程池的任务调度流程

  1. 如果当前工作线程数量小于核心线程数量,执行器总是优先创建一个任务线程,而不是从线程队列中获取一个空闲线程
  2. 如果线程池中总的任务数量大于核心线程池数量,新接收的任务将被加入阻塞队列中,一直到阻塞队列已满。
  3. 当完成一个任务的执行时,执行器总是优先从阻塞队列中获取下一个任务,并开始执行,一直到阻塞队列为空
  4. 在核心线程池数量已经用完、阻塞队列也已经满了的场景下,如果线程池接收到新的任务,将会为新任务创建一个线程(非核心线程),并且立即开始执行新任务
  5. 在核心线程都用完、阻塞队列已满的情况下,一直会创建新线程去执行新任务,直到池内的线程总数超出maximumPoolSize。如果线程池的线程总数超过maximumPoolSize,线程池就会拒绝接收任务,当新任务过来时,会为新任务执行拒绝策略

注意点:

  • 核心和最大线程数量、BlockingQueue队列等参数如果配置得不合理,可能会造成异步任务得不到预期的并发执行,造成严重的排队等待现象
  • 线程池的调度器创建线程的一条重要的规则是:在corePoolSize已满之后,还需要等阻塞队列已满,才会去创建新的线程

例如: 设置核心线程数量为1,阻塞队列为100,有5个任务待执行(假设极端情况下任务一直执行不接受),则只有1个任务可以被执行,其他4个任务在阻塞队列中,而不是创建新线程进行处理(阻塞队列未满)

ThreadFactory(线程工厂)

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

ThreadFactory是Java线程工厂接口,只有1个方法,调用ThreadFactory的唯一方法newThread()创建新线程时,可以更改所创建的新线程的名称、线程组、优先级、守护进程状态等 使用Executors创建新的线程池时,可以指定工厂,未指定是默认使用线程池时,也可以基于ThreadFactory(线程工厂)创建,在创建新线程池时可

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;


public class CreateThreadPollDemo {
    public static final int SLEEP_GAP=1000;
    static class TargetTask implements Runnable{
        static AtomicInteger taskNo=new AtomicInteger(1);
        String taskName;
        public TargetTask()
        {
            taskName="task-"+taskNo;
            taskNo.incrementAndGet();
        }
        public void run()
        {   
            
            System.out.println(Thread.currentThread().getName()+": "+taskName+" is doing...");

            try {
                Thread.sleep(SLEEP_GAP);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(taskName+" end...");
        }
    }
    static class SimpleThreadFactory implements ThreadFactory{
        static AtomicInteger threadNo=new AtomicInteger(1);
        public Thread newThread(Runnable task) {
           String threadName="simpleThread-"+threadNo;
           System.out.println("创建一条线程,名字是:"+threadName);
           threadNo.incrementAndGet();
           Thread thread=new Thread(task,threadName);
           thread.setDaemon(true);
           return thread;
    }
  

    public static void main(String[] args) throws InterruptedException {
     ExecutorService pool=Executors.newFixedThreadPool(2,new SimpleThreadFactory());
     // ExecutorService pool=Executors.newFixedThreadPool(2);
      for(int i=0;i<5;i++)
      {
          pool.submit(new TargetTask());
      }
     
    Thread.sleep(5000);
    pool.shutdown();

    }
  
}
}

线程工厂和线程池工厂: Executors为线程池工厂类,用于快捷创建线程池(Thread Pool);ThreadFactory为线程工厂类,用于创建线程(Thread)

任务阻塞队列

特点:在一个线程从一个空的阻塞队列中获取元素时线程会被阻塞,直到阻塞队列中有了元素;当队列中有元素后,被阻塞的线程会自动被唤醒

常见的几种阻塞队列的实现:

  • ArrayBlockingQueue:是一个数组实现的有界阻塞队列(有界队列),队列中的元素按FIFO排序,ArrayBlockingQueue在创建时必须设置大小
  • LinkedBlockingQueue:是一个基于链表实现的阻塞队列,按FIFO排序任务,可以设置容量(有界队列),不设置容量则默认使用Integer.Max_VALUE作为容量(无界队列)
  • PriorityBlockingQueue:是具有优先级的无界队列

调度器的钩子方法

三个钩子方法存在于ThreadPoolExecutor类,这3个方法都是空方法,一般会在子类中重写 protected void beforeExecute(Thread t, Runnable r) { }: 任务执行之前的钩子方法 protected void afterExecute(Runnable r, Throwable t) { }: 任务执行之后的钩子方法 protected void terminated() { }: 线程池终止时的钩子方法

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;


public class CreateThreadPollDemo {
    public static final int SLEEP_GAP=1000;
    static class TargetTask implements Runnable{
        static AtomicInteger taskNo=new AtomicInteger(1);
        String taskName;
        public TargetTask()
        {
            taskName="task-"+taskNo;
            taskNo.incrementAndGet();
        }
        public void run()
        {   
            
            System.out.println(Thread.currentThread().getName()+": "+taskName+" is doing...");

            try {
                Thread.sleep(SLEEP_GAP);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(taskName+" end...");
        }
    }
    static class SimpleThreadFactory implements ThreadFactory{
        static AtomicInteger threadNo=new AtomicInteger(1);
        public Thread newThread(Runnable task) {
           String threadName="simpleThread-"+threadNo;
           System.out.println("创建一条线程,名字是:"+threadName);
           threadNo.incrementAndGet();
           Thread thread=new Thread(task,threadName);
           thread.setDaemon(true);
           return thread;
    }
  

    public static void main(String[] args) throws InterruptedException {
   
    ExecutorService pool=new ThreadPoolExecutor(2, 4, 60,TimeUnit.SECONDS, new LinkedBlockingQueue<>(2)){
        @Override
        protected void terminated()
        {
            System.out.println("调度器已停止...");
        }
        @Override
        protected void beforeExecute(Thread t,Runnable target)
        {
            System.out.println("前钩执行...");
            super.beforeExecute(t, target);
        }
        @Override
        protected void afterExecute(Runnable target,Throwable t)
        {
            System.out.println("后钩执行...");
            super.afterExecute(target, t);
        }
    };
    for(int i=0;i<5;i++)
        pool.execute(new TargetTask());
    Thread.sleep(5000);
    pool.shutdown();

    }
  
}
}

线程池的拒绝策略

拒绝情况:

  • 线程池已经被关闭
  • 工作队列已满且maximumPoolSize已满
  1. AbortPolicy:拒绝策略 新任务就会被拒绝,并且抛出RejectedExecutionException异常。该策略是线程池默认的拒绝策略
  2. DiscardPolicy:抛弃策略 新任务就会直接被丢掉,并且不会有任何异常抛出
  3. DiscardOldestPolicy:抛弃最老任务策略 将最早进入队列的任务抛弃,从队列中腾出空间,再尝试加入队列(一般队头元素最老)
  4. CallerRunsPolicy:调用者执行策略 新任务被添加到线程池时,如果添加失败,那么提交任务线程会自己去执行该任务,不会使用线程池中的线程去执行新任务
  5. 自定义策略 ```java import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;

public class CreateThreadPollDemo { public static final int SLEEP_GAP=1000; static class TargetTask implements Runnable{ static AtomicInteger taskNo=new AtomicInteger(1); String taskName; public TargetTask() { taskName=”task-“+taskNo; taskNo.incrementAndGet(); } public void run() {

        System.out.println(Thread.currentThread().getName()+": "+taskName+" is doing...");

        try {
            Thread.sleep(SLEEP_GAP);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(taskName+" end...");
    }
}
static class SimpleThreadFactory implements ThreadFactory{
    static AtomicInteger threadNo=new AtomicInteger(1);
    public Thread newThread(Runnable task) {
       String threadName="simpleThread-"+threadNo;
       System.out.println("创建一条线程,名字是:"+threadName);
       threadNo.incrementAndGet();
       Thread thread=new Thread(task,threadName);
       thread.setDaemon(true);
       return thread;
}
static class CustomerIgnorePolicy implements RejectedExecutionHandler{

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(Thread.currentThread().getName()+"-rejected;  taskCount-"+executor.getTaskCount());
        
    }
    
}
  

public static void main(String[] args) throws InterruptedException {
    int corePoolSize=2;//核心线程数
    int maximumPoolSize=4;//最大线程数
    long keepAlive=10;//空闲时间
    TimeUnit unit=TimeUnit.SECONDS;//时间单位
    BlockingQueue<Runnable> workQueue=new LinkedBlockingQueue<>(2);//阻塞队列
    ThreadFactory factory=new SimpleThreadFactory();//自定义线程工厂
    RejectedExecutionHandler policy=new CustomerIgnorePolicy();//自定义拒绝策略
    ThreadPoolExecutor pool=new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAlive,unit,workQueue,factory,policy);
   
 pool.prestartAllCoreThreads();
for(int i=0;i<11;i++)
    pool.execute(new TargetTask());
Thread.sleep(5000);
pool.shutdown();

}

} }


## 线程池状态

在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:
```java
// runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

runState表示当前线程池的状态,它是一个 volatile 变量用来保证线程之间的可见性。

  • RUNNING: 线程池创建之后的初始状态,这种状态下可以执行任务
  • SHUTDOWN:该状态下线程池不再接受新任务,但是会将工作队列中的任务执行完毕
  • STOP:该状态下线程池不再接受新任务,也不会处理工作队列中的剩余任务,并且将会中断所有工作线程
  • TIDYING:该状态下所有任务都已终止或者处理完成,将会执行terminated()钩子方法
  • TERMINATED:执行完terminated()钩子方法之后的状态

RUNNING

(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。 (2) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!

SHUTDOWN

(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。 (2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

STOP

(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。 (2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

TIDYING

(1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。 (2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

TERMINATED

(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。 (2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

几种关闭线程池的方法:

  • shutdown()方法 等待当前工作队列中的剩余任务全部执行完成之后,才会执行关闭,但是此方法被调用之后线程池的状态转为SHUTDOWN,线程池不会再接收新的任务
 public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();//检测权限
            advanceRunState(SHUTDOWN);//设置线程池状态
            interruptIdleWorkers();//中断空闲线程
            onShutdown(); // 钩子函数,用于清理一些资源
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
  • shutdownNow()方法 立即关闭线程池的方法,此方法会打断正在执行的工作线程,并且会清空当前工作队列中的剩余任务,返回的是尚未执行的任务
    public List<Runnable> shutdownNow() {
          List<Runnable> tasks;
          final ReentrantLock mainLock = this.mainLock;
          mainLock.lock();
          try {
              checkShutdownAccess();//检测权限
              advanceRunState(STOP);//设置线程池状态
              interruptWorkers();//中断所有线程(工作线程以及空闲线程)
              tasks = drainQueue();//丢弃工作队列中的剩余任务
          } finally {
              mainLock.unlock();
          }
          tryTerminate();
          return tasks;
      }
    
  • awaitTermination()方法 等待线程池完成关闭, shutdown()与shutdownNow()方法之后,用户程序都不会主动等待线程池关闭完成
    public boolean awaitTermination(long timeout, TimeUnit unit)
          throws InterruptedException {
          long nanos = unit.toNanos(timeout);
          final ReentrantLock mainLock = this.mainLock;
          mainLock.lock();
          try {
              for (;;) {
                  if (runStateAtLeast(ctl.get(), TERMINATED))
                      return true;
                  if (nanos <= 0)
                      return false;
                  nanos = termination.awaitNanos(nanos);
              }
          } finally {
              mainLock.unlock();
          }
      }
    

    在设置的时间timeout内如果线程池完成关闭,返回true, 否则返回false

合理地配置线程池

要想合理的配置线程池,需要先分析任务的特性,可以冲一下几个角度分析:

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务
  • 任务的优先级:高、中、低
  • 任务的执行时间:长、中、短
  • 任务的依赖性:是否依赖其他的系统资源,如数据库连接。

性质不同任务可以用不同规模的线程池分开处理。CPU密集型任务应该尽可能小的线程,如配置cpu数量+1个线程的线程池。由于IO密集型任务并不是一直在执行任务,不能让cpu闲着,则应配置尽可能多的线程,如:cup数量*2。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这2个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。可以通过Runtime.getRuntime().availableProcessors()方法获取cpu数量。优先级不同任务可以对线程池采用优先级队列来处理,让优先级高的先执行。

使用队列的时候建议使用有界队列,有界队列增加了系统的稳定性,如果采用无解队列,任务太多的时候可能导致系统OOM,直接让系统宕机。

线程池中线程数量的配置

线程池汇总线程大小对系统的性能有一定的影响,我们的目标是希望系统能够发挥最好的性能,过多或者过小的线程数量无法有消息的使用机器的性能。Java Concurrency inPractice书中给出了估算线程池大小的公式:

Ncpu = CUP的数量
Ucpu = 目标CPU的使用率0<=Ucpu<=1
W/C = 等待时间与计算时间的比例
为保存处理器达到期望的使用率最有的线程池的大小等于
Nthreads = Ncpu × Ucpu × (1+W/C)

一些使用建议

在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,但都有其局限性,不够灵活;另外由于前面几种方法内部也是通过ThreadPoolExecutor方式实现,使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。

ThreadPoolTaskExecutor 其他知识点汇总(待补充)

  1. 线程池中的所有线程超过了空闲时间都会被销毁么?

    如果allowCoreThreadTimeOut为true,超过了空闲时间的所有线程都会被回收,不过这个值默认是false,系统会保留核心线程,其他的会被回收

  2. 空闲线程是如何被销毁的?

    所有运行的工作线程会尝试从队列中获取任务去执行,超过一定时间(keepAliveTime)还没有拿到任务,自己主动退出

  3. 核心线程在线程池创建的时候会初始化好么?

    默认情况下,核心线程不会进行初始化,在刚开始调用线程池执行任务的时候,传入一个任务会创建一个线程,直到达到核心线程数。不过可以在创建线程池之后,调用其prestartAllCoreThreads提前将核心线程创建好。

ThreadPoolExecutor 源码分析

Java中的线程池核心实现类是ThreadPoolExecutor,我们基于JDK 1.8的源码来分析Java线程池的核心设计与实现。

线程池中的 ctl 属性分析

  • 将线程池状态与线程个数合二为一存储在一个原子变量 ctl 中,目的是只用一次 cas 原子操作就可以进行赋值更新两个信息
  • 即 ctl 中的高3位表示线程状态,低29位表示线程个数 ```java // 将线程池状态与线程个数合二为一存储在一个原子变量 ctl 中,目的是只用一次 cas 原子操作就可以进行赋值更新两个信息 // 初始是 RUNNING 状态,线程数为 0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 根据线程池状态和工作线程数量进行或运算,得到ctl的值 // rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,通过或运算合并它们,存放在 ctl 中 private static int ctlOf(int rs, int wc) { return rs | wc; }

// Integer.SIZE=32表示Integer的长度,32-3=29,低 29 位代表线程个数 private static final int COUNT_BITS = Integer.SIZE - 3; // 1 « COUNT_BITS 表示 1左移29位,即 00100000 00000000 00000000 00000000 // (1 « COUNT_BITS) - 1 即 00100000 00000000 00000000 00000000 - 1 = 00011111 11111111 11111111 11111111 (十进制是2^29 - 1) // 线程池允许最大线程个数 2^29 - 1(即低 29 位全为 1) private static final int CAPACITY = (1 « COUNT_BITS) - 1;

// 从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING // 线程池的5种状态需要3位二进制才能表示 // -1 的二进制是 11111111 11111111 11111111 11111111 左移29位后为 11100000 00000000 00000000 00000000 即高3位为111 private static final int RUNNING = -1 « COUNT_BITS; // 高3位:111,接受新任务并处理排队任务 private static final int SHUTDOWN = 0 « COUNT_BITS; // 高3位:000,不接受新任务,但处理排队任务 private static final int STOP = 1 « COUNT_BITS; // 高3位:001,不接受新任务,不处理排队任务,并中断正在进行的任务 private static final int TIDYING = 2 « COUNT_BITS; // 高3位:010,所有任务都已终止,workerCount 为零,转换到状态 TIDYING 的线程将运行 terminate() 钩子方法,terminate()是个空方法,需要自己实现逻辑 private static final int TERMINATED = 3 « COUNT_BITS; // 高3位:011,terminate() 已执行完

// 计算线程池的运行状态,c & ~CAPACITY 的结果是 c的高3位 private static int runStateOf(int c) { return c & ~CAPACITY; } // 计算工作线程的数量,c & CAPACITY 的结果是 c的低29位 private static int workerCountOf(int c) { return c & CAPACITY; }


## 线程初始化

默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

- prestartCoreThread():boolean prestartCoreThread(),初始化一个核心线程
- prestartAllCoreThreads():int prestartAllCoreThreads(),初始化所有核心线程,并返回初始化的线程数

```java
public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null); //注意传进去的参数是null
}

public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))//注意传进去的参数是null
        ++n;
    return n;
}

注意上面传进去的参数是null,知如果传进去的参数为null,则最后执行线程会阻塞在getTask方法中的

r = workQueue.take(); 即等待任务队列中有任务。

execute()

方法说明:在ThreadPoolExecutor类中,任务提交方法的入口是execute(Runnable command) 方法(submit()方法也是调用了execute()),该方法其实只在尝试做一件事:经过各种校验之后,调用 addWorker(Runnable command,boolean core) 方法为线程池创建一个线程并执行任务,与之相对应,execute() 的结果有两个:

  • 成功调用了addWorker()(剩下的执行任务要交给后续方法去完成了)
  • 未能调用addWorker并拒绝本次任务,返回null。

参数说明:

  • Runnable command:待执行的任务

执行流程:

  1. 通过 ctl.get() 得到线程池的当前线程数,如果线程数小于corePoolSize,则调用 addWorker(commond,true) 方法创建新的线程执行任务,否则执行步骤2;
  2. 步骤1失败,说明已经无法再创建新线程,那么考虑将任务放入阻塞队列,等待执行完任务的线程来处理。基于此,判断线程池是否处于Running状态(只有Running状态的线程池可以接受新任务),如果任务添加到任务队列成功则进入步骤3,失败则进入步骤4
  3. 来到这一步需要说明任务已经加入任务队列,这时要二次校验线程池的状态,会有以下情形:
    • 线程池不再是Running状态了,需要将任务从任务队列中移除,如果移除成功则拒绝本次任务
    • 线程池是Running状态,则判断线程池工作线程是否为0,是则调用 addWorker(commond,true) 添加一个没有初始任务的线程(这个线程将去获取已经加入任务队列的本次任务并执行),否则进入步骤4;
    • 线程池不是Running状态,但从任务队列移除任务失败(可能已被某线程获取?),进入步骤4
  4. 将线程池扩容至maximumPoolSize并调用 addWorker(commond,false) 方法创建新的线程执行任务,失败则拒绝本次任务
/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 * 在将来的某个时候执行给定的任务。任务可以在新线程中执行,也可以在现有的池线程中执行。
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 * 如果由于此执行器已关闭或已达到其容量而无法提交任务以供执行,则由当前的{@code RejectedExecutionHandler}处理该任务。
 * @param command the task to execute  待执行的任务命令
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     * 如果运行的线程少于corePoolSize,请尝试以给定的命令作为第一个任务启动新线程。
     * 对addWorker的调用以原子方式检查runState和workerCount,
     * 因此可以通过返回false来防止在不应该添加线程时出现错误警报。
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * 如果一个任务可以成功排队,那么我们仍然需要仔细检查两点,其一,我们是否应该添加一个线程
     * (因为自从上次检查至今,一些存在的线程已经死亡),其二,线程池状态此时已改变成非运行态。因此,我们重新检查状态,如果检查不通过,则移除已经入列的任务,如果检查通过且线程池线程数为0,则启动新线程。
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     * 3. 如果无法将任务加入任务队列,则将线程池扩容到极限容量并尝试创建一个新线程,
     * 如果失败则拒绝任务。
     */
    int c = ctl.get();
   
    // 步骤1:判断线程池当前线程数是否小于线程池大小
    if (workerCountOf(c) < corePoolSize) {
        // 增加一个工作线程并添加任务,成功则返回,否则进行步骤2
        // true代表使用coreSize作为边界约束,否则使用maximumPoolSize
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 步骤2:不满足workerCountOf(c) < corePoolSize或addWorker失败,进入步骤2
    // 校验线程池是否是Running状态且任务是否成功放入workQueue(阻塞队列)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次校验,如果线程池非Running且从任务队列中移除任务成功,则拒绝该任务
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果线程池工作线程数量为0,则新建一个空任务的线程
        else if (workerCountOf(recheck) == 0)
            // 如果线程池不是Running状态,是加入不进去的
            addWorker(null, false);
    }
    // 步骤3:如果线程池不是Running状态或任务入列失败,尝试扩容maxPoolSize后再次addWorker,失败则拒绝任务
    else if (!addWorker(command, false))
        reject(command);
}

addWorker()

方法说明: addWorker(Runnable firstTask, boolean core) 方法,顾名思义,向线程池添加一个带有任务的工作线程。 参数说明:

  • Runnable firstTask:新创建的线程应该首先运行的任务(如果没有,则为空)。
  • boolean core:该参数决定了线程池容量的约束条件,即当前线程数量以何值为极限值。参数为 true 则使用corePollSize 作为约束值,否则使用maximumPoolSize。

执行流程:

  1. 外层循环判断线程池的状态是否可以新增工作线程。这层校验基于下面两个原则:
    • 线程池为Running状态时,既可以接受新任务也可以处理任务
    • 线程池为关闭状态时只能新增空任务的工作线程(worker)处理任务队列(workQueue)中的任务不能接受新任务
  2. 内层循环向线程池添加工作线程并返回是否添加成功的结果。
    • 首先校验线程数是否已经超限制,是则返回false,否则进入下一步
    • 通过CAS使工作线程数+1,成功则进入步骤3,失败则再次校验线程池是否是运行状态,是则继续内层循环,不是则返回外层循环
  3. 核心线程数量+1成功的后续操作:添加到工作线程集合,并启动工作线程
    • 首先获取锁之后,再次校验线程池状态(具体校验规则见代码注解),通过则进入下一步,未通过则添加线程失败
    • 线程池状态校验通过后,再检查线程是否已经启动,是则抛出异常,否则尝试将线程加入线程池
    • 检查线程是否启动成功,成功则返回true,失败则进入 addWorkerFailed
private boolean addWorker(Runnable firstTask, boolean core) {
    // 外层循环:判断线程池状态
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /** 
         * 1.线程池为非Running状态(Running状态则既可以新增核心线程也可以接受任务)
         * 2.线程为shutdown状态且firstTask为空且队列不为空
         * 3.满足条件1且条件2不满足,则返回false
         * 4.条件2解读:线程池为shutdown状态时且任务队列不为空时,可以新增空任务的线程来处理队列中的任务
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

		// 内层循环:线程池添加核心线程并返回是否添加成功的结果
        for (;;) {
            int wc = workerCountOf(c);
            // 校验线程池已有线程数量是否超限:
            // 1.线程池最大上限CAPACITY 
            // 2.corePoolSize或maximumPoolSize(取决于入参core)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize)) 
                return false;
            // 通过CAS操作使工作线程数+1,跳出外层循环
            if (compareAndIncrementWorkerCount(c)) 
                break retry;
            // 线程+1失败,重读ctl
            c = ctl.get();   // Re-read ctl
            // 如果此时线程池状态不再是running,则重新进行外层循环
            if (runStateOf(c) != rs)
                continue retry;
            // 其他 CAS 失败是因为工作线程数量改变了,继续内层循环尝试CAS对线程数+1
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    /**
     * 核心线程数量+1成功的后续操作:添加到工作线程集合,并启动工作线程
     */
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 下面代码需要加锁:线程池主锁
            mainLock.lock(); 
            try {
                // Recheck while holding lock.  
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // 持锁期间重新检查,线程工厂创建线程失败或获取锁之前关闭的情况发生时,退出
                int c = ctl.get();
                int rs = runStateOf(c);

				// 再次检验线程池是否是running状态或线程池shutdown但线程任务为空
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 线程已经启动,则抛出非法线程状态异常
                    // 为什么会存在这种状态呢?未解决
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w); //加入线程池
                    int s = workers.size();
                    // 如果当前工作线程数超过线程池曾经出现过的最大线程数,刷新后者值
                    if (s > largestPoolSize)
                        largestPoolSize = s; 
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();  // 释放锁
            }
            if (workerAdded) { // 工作线程添加成功,启动该线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //线程启动失败,则进入addWorkerFailed
        if (! workerStarted) 
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker类

Worker类是内部类,既实现了Runnable,又继承了AbstractQueuedSynchronizer(以下简称AQS),所以其既是一个可执行的任务,又可以达到锁的效果。

Worker类主要维护正在运行任务的线程的中断控制状态,以及其他次要的记录。这个类适时地继承了AbstractQueuedSynchronizer类,以简化获取和释放锁(该锁作用于每个任务执行代码)的过程。这样可以防止去中断正在运行中的任务,只会中断在等待从任务队列中获取任务的线程。

我们实现了一个简单的不可重入互斥锁,而不是使用可重入锁,因为我们不希望工作任务在调用setCorePoolSize之类的池控制方法时能够重新获取锁。另外,为了在线程真正开始运行任务之前禁止中断,我们将锁状态初始化为负值,并在启动时清除它(在runWorker中)。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;
 
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread; 
     
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
     
    /** Per-thread task counter */
    volatile long completedTasks;
 
    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    // 通过构造函数初始化,
    Worker(Runnable firstTask) {
        //设置AQS的同步状态
        // state:锁状态,-1为初始值,0为unlock状态,1为lock状态
        setState(-1); // inhibit interrupts until runWorker  在调用runWorker前,禁止中断
       
        this.firstTask = firstTask;
        // 线程工厂创建一个线程
        this.thread = getThreadFactory().newThread(this); 
    }
 
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this); //runWorker()是ThreadPoolExecutor的方法
    }
 
    // Lock methods
    // The value 0 represents the unlocked state. 0代表“没被锁定”状态
    // The value 1 represents the locked state. 1代表“锁定”状态
 
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
 
    /**
     * 尝试获取锁的方法
     * 重写AQS的tryAcquire(),AQS本来就是让子类来实现的
     */
    protected boolean tryAcquire(int unused) {
        // 判断原值为0,且重置为1,所以state为-1时,锁无法获取。
        // 每次都是0->1,保证了锁的不可重入性
        if (compareAndSetState(0, 1)) {
            // 设置exclusiveOwnerThread=当前线程
            setExclusiveOwnerThread(Thread.currentThread()); 
            return true;
        }
        return false;
    }
 
    /**
     * 尝试释放锁
     * 不是state-1,而是置为0
     */
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null); 
        setState(0);
        return true;
    }
 
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
 
    /**
     * 中断(如果运行)
     * shutdownNow时会循环对worker线程执行
     * 且不需要获取worker锁,即使在worker运行时也可以中断
     */
    void interruptIfStarted() {
        Thread t;
        //如果state>=0、t!=null、且t没有被中断
        //new Worker()时state==-1,说明不能中断
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

runWorker()

方法说明:可以说,runWorker(Worker w) 是线程池中真正处理任务的方法,前面的execute() 和 addWorker() 都是在为该方法做准备和铺垫。

参数说明:

  • Worker w:封装的Worker,携带了工作线程的诸多要素,包括 Runnable(待处理任务)、lock(锁)、completedTasks(记录线程池已完成任务数)

执行流程:

  1. 判断当前任务或者从任务队列中获取的任务是否不为空,都为空则进入步骤2,否则进入步骤3
  2. 任务为空,则将completedAbruptly置为false(即线程不是突然终止),并执行processWorkerExit(w,completedAbruptly) 方法进入线程退出程序
  3. 任务不为空,则进入循环,并加锁
  4. 判断是否为线程添加中断标识,以下两个条件满足其一则添加中断标识:线程池状态>=STOP,即STOP或TERMINATED一开始判断线程池状态<STOP,接下来检查发现Thread.interrupted()为true,即线程已经被中断,再次检查线程池状态是否>=STOP(以消除该瞬间shutdown方法生效,使线程池处于STOP或TERMINATED)执行前置方法 beforeExecute(wt, task)(该方法为空方法,由子类实现)后执行task.run() 方法执行任务(执行不成功抛出相应异常)执行后置方法 afterExecute(task, thrown)(该方法为空方法,由子类实现)后将线程池已完成的任务数+1,并释放锁。再次进行循环条件判断。
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // allow interrupts
    // new Worker()是state==-1,此处是调用Worker类的tryRelease()方法,将state置为0,而interruptIfStarted()中只有state>=0才允许调用中断
    w.unlock(); 
            
    // 线程退出的原因,true是任务导致,false是线程正常退出
    boolean completedAbruptly = true; 
    try {
        // 当前任务和从任务队列中获取的任务都为空,方停止循环
        while (task != null || (task = getTask()) != null) {
            //上锁可以防止在shutdown()时终止正在运行的worker,而不是应对并发
            w.lock(); 
             
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            /**
             * 判断1:确保只有在线程处于stop状态且wt未中断时,wt才会被设置中断标识
             * 条件1:线程池状态>=STOP,即STOP或TERMINATED
             * 条件2:一开始判断线程池状态<STOP,接下来检查发现Thread.interrupted()为true,即线程已经被中断,再次检查线程池状态是否>=STOP(以消除该瞬间shutdown方法生效,使线程池处于STOP或TERMINATED),
             * 条件1与条件2任意满意一个,且wt不是中断状态,则中断wt,否则进入下一步
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt(); //当前线程调用interrupt()中断
             
            try {
                //执行前(空方法,由子类重写实现)
                beforeExecute(wt, task);
                 
                Throwable thrown = null;
                try {
                    task.run();
                } 
                catch (RuntimeException x) {
                    thrown = x; throw x;
                } 
                catch (Error x) {
                    thrown = x; throw x;
                } 
                catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } 
                finally {
                    //执行后(空方法,由子类重写实现)
                    afterExecute(task, thrown); 
                }
            } 
            finally {
                task = null; 
                w.completedTasks++; //完成任务数+1
                w.unlock(); //释放锁
            }
        }
        // 
        completedAbruptly = false;
    } 
    finally {
        //处理worker的退出
        processWorkerExit(w, completedAbruptly);
    }
}

getTask()

方法说明:由函数调用关系图可知,在ThreadPoolExecutor类的实现中,Runnable getTask() 方法是为 void runWorker(Worker w)方法服务的,它的作用就是在任务队列(workQueue)中获取 task(Runnable)。 参数说明:无参数 执行流程:

  • 将timedOut(上次获取任务是否超时)置为false(首次执行方法,无上次,自然为false),进入一个无限循环
  • 如果线程池为Shutdown状态且任务队列为空(线程池shutdown状态可以处理任务队列中的任务,不再接受新任务,这个是重点)或者线程池为STOP或TERMINATED状态,则意味着线程池不必再获取任务了,当前工作线程数量-1并返回null,否则进入步骤3
  • 如果线程池数量超限制或者时间超限且(任务队列为空或当前线程数>1),则进入步骤4,否则进入步骤5。
  • 移除工作线程,成功则返回null,不成功则进入下轮循环。
  • 尝试用poll() 或者 take()(具体用哪个取决于timed的值)获取任务,如果任务不为空,则返回该任务。如果为空,则将timeOut 置为 true进入下一轮循环。如果获取任务过程发生异常,则将 timeOut置为 false 后进入下一轮循环。
private Runnable getTask() {
    // 最新一次poll是否超时
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /**
         * 条件1:线程池状态SHUTDOWN、STOP、TERMINATED状态
         * 条件2:线程池STOP、TERMINATED状态或workQueue为空
         * 条件1与条件2同时为true,则workerCount-1,并且返回null
         * 注:条件2是考虑到SHUTDOWN状态的线程池不会接受任务,但仍会处理任务
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        /**
         * 下列两个条件满足任意一个,则给当前正在尝试获取任务的工作线程设置阻塞时间限制(超时会被销毁?不太确定这点),否则线程可以一直保持活跃状态
         * 1.allowCoreThreadTimeOut:当前线程是否以keepAliveTime为超时时限等待任务
         * 2.当前线程数量已经超越了核心线程数
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
        // 两个条件全部为true,则通过CAS使工作线程数-1,即剔除工作线程
        // 条件1:工作线程数大于maximumPoolSize,或(工作线程阻塞时间受限且上次在任务队列拉取任务超时)
        // 条件2:wc > 1或任务队列为空
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 移除工作线程,成功则返回null,不成功则进入下轮循环
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

	    // 执行到这里,说明已经经过前面重重校验,开始真正获取task了
        try {
            // 如果工作线程阻塞时间受限,则使用poll(),否则使用take()
            // poll()设定阻塞时间,而take()无时间限制,直到拿到结果为止
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // r不为空,则返回该Runnable
            if (r != null)
                return r;
            // 没能获取到Runable,则将最近获取任务是否超时设置为true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 响应中断,进入下一次循环前将最近获取任务超时状态置为false
            timedOut = false;
        }
    }
}

processWorkerExit()

方法说明:processWorkerExit(Worker w, boolean completedAbruptly),执行线程退出的方法 参数说明:

  • Worker w:要结束的工作线程。
  • boolean completedAbruptly: 是否突然完成(异常导致),如果工作线程因为用户异常死亡,则completedAbruptly参数为 true。

执行流程:

  1. 如果 completedAbruptly 为 true,即工作线程因为异常突然死亡,则执行工作线程-1操作。
  2. 主线程获取锁后,线程池已经完成的任务数追加 w(当前工作线程) 完成的任务数,并从worker的set集合中移除当前worker。
  3. 根据线程池状态进行判断是否执行tryTerminate()结束线程池。
  4. 是否需要增加工作线程,如果线程池还没有完全终止,仍需要保持一定数量的线程。
  5. 如果当前线程是突然终止的,调用addWorker()创建工作线程
  6. 当前线程不是突然终止,但当前工作线程数量小于线程池需要维护的线程数量,则创建工作线程。需要维护的线程数量为corePoolSize(取决于成员变量 allowCoreThreadTimeOut是否为 false)或1。
/**
 * Performs cleanup and bookkeeping for a dying worker. Called
 * only from worker threads. Unless completedAbruptly is set,
 * assumes that workerCount has already been adjusted to account
 * for exit.  This method removes thread from worker set, and
 * possibly terminates the pool or replaces the worker if either
 * it exited due to user task exception or if fewer than
 * corePoolSize workers are running or queue is non-empty but
 * there are no workers.
 *
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /**
     * 1.工作线程-1操作
     * 1)如果completedAbruptly 为true,说明工作线程发生异常,那么将正在工作的线程数量-1
     * 2)如果completedAbruptly 为false,说明工作线程无任务可以执行,由getTask()执行worker-1操作
     */
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    // 2.从线程set集合中移除工作线程,该过程需要加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 将该worker已完成的任务数追加到线程池已完成的任务数
        completedTaskCount += w.completedTasks;
        // HashSet<Worker>中移除该worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    
	// 3.根据线程池状态进行判断是否结束线程池
    tryTerminate();
	
	/**
     * 4.是否需要增加工作线程
     * 线程池状态是running 或 shutdown
     * 如果当前线程是突然终止的,addWorker()
     * 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
     * 故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程
     */
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
       if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

Search

    微信好友

    博士的沙漏

    Table of Contents