参数说明

Java.util.concurrent.ThreadPoolExecutor,其构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  • corePoolSize: 线程池维护线程的最少数量
  • maximumPoolSize:线程池维护线程的最大数量
  • keepAliveTime: 线程池维护线程所允许的空闲时间
  • unit: 线程池维护线程所允许的空闲时间的单位
  • workQueue: 线程池所使用的缓冲队列
  • handler: 线程池对拒绝任务的处理策略

一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。

当一个任务通过execute(Runnable)方法欲添加到线程池时:

  1. 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
  2. 如果此时线程池中的数量等于corePoolSize,但是缓冲队列workQueue未满,那么任务被放入缓冲队列。
  3. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
  4. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
  5. 当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

workQueue常用的是:java.util.concurrent.ArrayBlockingQueue

handler有四个选择:

  1. ThreadPoolExecutor.AbortPolicy()              —   抛出java.util.concurrent.RejectedExecutionException异常
  2. ThreadPoolExecutor.CallerRunsPolicy()       —   重试添加当前的任务,他会自动重复调用execute()方法
  3. ThreadPoolExecutor.DiscardOldestPolicy()   —   抛弃旧的任务
  4. ThreadPoolExecutor.DiscardPolicy()            —   抛弃当前的任务

keepAliveTime的含义

先引用一句我觉得相对说的比较明白的含义:当线程空闲时间达到keepAliveTime,该线程会退出,有两个疑问:

  1. 线程为什么会空闲
  2. 线程为什么要退出

举例说明:
核心线程数10,最大线程数30,keepAliveTime是3秒
随着任务数量不断上升,线程池会不断的创建线程,直到到达核心线程数10,就不创建线程了,
这时多余的任务通过加入阻塞队列来运行,
当超出阻塞队列长度+核心线程数时,
这时不得不扩大线程个数来满足当前任务的运行,这时就需要创建新的线程了(最大线程数起作用),上限是最大线程数30
那么超出核心线程数10并小于最大线程数30的可能新创建的这20个线程相当于是“借”的,如果这20个线程空闲时间超过keepAliveTime,就会被退出

我们来看开头提到的两个问题:

  1. 线程为什么会空闲
  2. 线程为什么要退出
    答:
  3. 没有任务时线程就会空闲下来,在线程池中任务是任务(Runnale)线程是线程(Worker
  4. 通常超出核心线程的线程是“借”的,也就是说超出核心线程的情况算是一种能够预见的异常情况,并且这种情况并不常常发生(如果常常发生,那我想你应该调整你的核心线程数了),所以这种不经常发生而创建的线程为了避免资源浪费就应该要退出

看一下java.util.concurrent.ThreadPoolExecutor#getTask源码来验证上面一段话的含义:

1
2
3
4
5
6
7
8
9
10
11
int wc = workerCountOf(c);
// Are workers subject to culling? 是否屠宰workers
//当allowCoreThreadTimeOut为true或者当前任务数超过核心线程数时,timed为true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
……
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://如果timed为true说明worker有可能要被关闭,这里调用的代码含义:如果超过keepAliveTime纳秒还没取到任务,就返回null,后面会调用processWorkerExit把worker关闭
workQueue.take();//否则任务队列为空就阻塞在这里,直到任务队列再有任务
if (r != null)
return r;

10个核心线程会不会退出,由下面的参数决定:
allowCoreThreadTimeout:是否允许核心线程空闲退出,默认值为false

当keepAliveTime设置为0时到底是空闲线程直接退出还是不退出

通过实验证明和上文代码workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)证明,是直接不等待退出,而不是永远不退出.
需要注意了,这个值设置为0 并不是个很好的做法(除非场景中任务数量极少能超出核心线程数),如果任务数频繁超出核心线程数,这个值需要评估设定为合理值尽量避免线程开启关闭的动作

工作原理

  1. 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。

  2. 当调用 execute() 方法添加一个任务时,线程池会做如下判断:

    • 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
    • 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
    • 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
    • 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。

  4. 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

这样的过程说明,并不是先加入任务就一定会先执行。
假设队列大小为 10,corePoolSize 为 3,maximumPoolSize 为 6,
那么当加入 20 个任务时,执行的顺序就是这样的:

  1. 首先执行任务 1、2、3,然后任务 4~13 被放入队列。
  2. 这时候队列(默认队列)满了,任务 14、15、16 会被马上执行,而任务 17~20 则会抛出异常。
  3. 最终顺序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。

队列

所有 BlockingQueue 都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:

  • 如果运行的线程少于corePoolSize,则Executor始终首选添加新的线程,而不进行排队。
  • 如果运行的线程等于或多于corePoolSize,则Executor始终首选将请求加入队列,而不添加新的线程。
  • 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出maximumPoolSize,在这种情况下,任务将被拒绝。

排队有三种通用策略:

  • 直接提交。工作队列的默认选项是SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界maximumPoolSizes以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。由于corePoolSize0所以任务会放入SynchronousQueue队列中,SynchronousQueue只能存放大小为1,所以会立刻新起线程,由于maxumumPoolSizeInteger.MAX_VALUE所以可以认为大小为2147483647。受内存大小限制。任务队列是SynchronousQueue这个队列的特点是,它并不能放置任何任务在其队列中,当有任务被提交时,使用SynchronousQueue的线程池会立即为该任务创建一个线程(如果 线程数量没有达到最大时,如果达到了最大,那么该任务会被拒绝)。这种队列适合于当任务数量较小时采用。也就是说,在使用这种队列时,未被执行的任务没有 一个容器来暂时储存。

  • 无界队列。使用无界队列(例如,不具有预定义容量的LinkedBlockingQueue)将导致在所有corePoolSize线程都忙时新任务在队列中等待。这样,创建的线程就不会超过corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

  • 有界队列。当使用有限的maximumPoolSizes时,有界队列(如ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

拒绝策略

Executor已经关闭,并且Executor将有限边界用于最大线程和工作队列容量,且已经饱和时,在方法execute( 中提交的新任务将被拒绝。在以上两种情况下,execute方法都将调用其RejectedExecutionHandlerRejectedExecutionHandler.rejectedExecution,  方法。下面提供了四种预定义的处理程序策略:

  1. 在默认的ThreadPoolExecutor.AbortPolicy中,处理程序遭到拒绝将抛出运行时RejectedExecutionException
  2. ThreadPoolExecutor.CallerRunsPolicy中,线程调用运行该任务的execute本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
  3. ThreadPoolExecutor.DiscardPolicy中,不能执行的任务将被删除。
  4. ThreadPoolExecutor.DiscardOldestPolicy中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。