什么是线程池 线程池顾名思义,就是有很多线程的一个池子,这里面有多少线程,是要根据你要业务需求来确定;它方便你线程的创建和使用,不需要频繁的创建线程资源使得,线程资源充分的得到利用。所以和数据库链接池类似,线程池的作用就是充分利用资源,提高相应速度,增加系统的吞吐率同时方便管理和监控线程池中线程使用情况,实现对程序的优化 。当添加的到线程池中的任务超过它的容量时,会有一部分任务阻塞等待。当等待任务超过阻塞队列大小,线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。
线程池解决什么问题 多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。
一个线程池包括以下四个基本组成部分 :
1、线程池管理器 (ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
2、工作线程 (PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
3、任务接口 (Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
4、任务队列 (taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子: 假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。
线程池如何设计的
Core and maximum pool sizes (ThreadPoolExecutor会根据corePoolSize以及maximumPoolSize的边界自动的调整线程池的大小。)
1、当通过execute(Runnable)提交任务时,而且正在运行的线程数少于corePoolSize ,即使其他线程处于空闲状态,也会创建一个新的线程执行这个任务;
2、如果有大于corePoolSize但是小于maximumPoolSize 数量的线程正在运行,则新提交的任务会放进workQueue进行任务缓存,但是如果workQueue已满 ,则会直接创建线程执行 ,但是如果创建的线程数大于maximum pool sizes的时候将拒绝任务 。 3、**当corePoolSize和maximumPoolSize **相等时则会创建固定数量的线程池
4、将maximumPoolSize 设置为无边界的 ,比如整数的最大值,则意味着线程数和任务数量一致,也就没有等待的任务 5、corePoolSize、maximumPoolSize可以根据实际需求通过构造器设置,也可以动态的在运行时设置。
On-demand construction (按照需求构造线程) 1、默认情况下,每一个核心线程只有当有新任务到来时才会初始化创建,并执行 2、但是可以在运行时可以通过prestartCoreThread(一个coreThread)或者prestartAllCoreThreads(全部coreThread)来提前创建并运行指定的核心线程,这种需求适用于初始化线程池时,任务队列初始不为空的情况下。
Creating new threads (创建一个新的线程)
1、创建线程是通过ThreadFactory 。除非特别的设定,否则默认使用Executors.defaultThreadFactory作为线程池,这个线程池创建的所有线程都有相同的线程组,线程优先级,非守护线程的标志 2、通过应用不同的线程池,可以更改线程的名字,线程组,优先级,守护标志等等 3、当通过**newThread()**调用线程池创建线程池失败时,返回null,此时执行器会继续运行,但是可能处理不了任何任务
4、线程需要处理”modifyThread” RuntimePermission,对线程修改进行运行时权限检查。如果使用这个线程池的工作线程或者其他线程没有处理这个认证”permission”则会使服务降级:对于线程池的所有设置都不会及时的生效,一个已经关闭的线程池可能还会处于一种线程池终止没有完成的状态
Keep-alive times (空闲的线程存活时间)
1、当这个线程池此时含有多余corePoolSize的线程存在,则多余的线程在空闲了超过keepAliveTime的时间将会被终止 2、这提供了一种减少空闲线程从而降低系统线程资源损耗的方法,还可以通过setKeepAliveTime进行动态设置 3、默认情况下,keep-alive policy只对超出corePoolSize的线程起作用 ,但是可以通过方法**allowCoreThreadTimeOut(boolean)**将空闲超时策略同样应用于coreThread,但是要保证超时时间不为0值。
Queue (阻塞队列,任何BlockingQueue都可以被用来容纳和传递提交的任务)
1、如果正在运行的线程小于corePoolSize,则executor会新增一个线程而不是将任务入队 2、如果正在运行的线程大于corePoolSize但是小于maximumPoolSize,executor则会将任务入队,而不是创建一个线程 3、如果任务不能入队(队列已满),则在没有超出maximumPoolSize的情况下创建一个新的线程,否则某种拒绝策略拒绝这个任务。
three general strategies for queuing (三种入队策略) 1、Direct handoffs:直接传递 。比如 synchronousQueue ,这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。 2、Unbounded queues:无界队列。 比如 没有指定容量的LinkedBlockingQueue ,这将会使coreThread一直工作,而且由于任务总能入队,所以也不会创建其他超过corePoolSize的线程。用于所有任务完全独立,不相关,比如平滑瞬间高并发web页面的请求等,其实相当于异步框架了 3、Bounded queues:有界队列。 比如ArrayBlockingQueue ,有助于在设置有限大的maximumPoolSizes时,阻止造成系统资源的枯竭。队列大小和最大池大小可能需要相互折衷:使用大队列和小池最大限度地减少CPU的使用,操作系统资源,和上下文切换开销,但可能会导致人为的低吞吐量 。如果任务经常被阻塞(例如,如果它们是I/O绑定),系统可能比你允许的时间安排更多线程的时间 。使用**小队列通常需要更大的池大小,这使得CPU繁忙,但可能会遇到不可接受的调度开销,这也降低吞吐量 **
Rejected tasks (拒绝任务)
当提交一个新任务时,如果Executor已经关闭或者有限的workQueue,maximumPoolSizes,并且他们已经饱和了,只要出现其中一种情况都会被拒绝。有四种已经定义的处理策略。也可以继承RejectedExecutionHandler自定义实现
Hook methods (钩子方法,提供在每个任务执行时不同阶段执行不同的处理函数)
1、protected void beforeExecute(Thread t, Runnable r): 优先使用指定的线程处理给定的任务,并在任务执行前做一些处理(如设置ThreadLocal变量或者记录一些日志等),t为执行r任务的线程,r为提交的任务。
2、protected void afterExecute(Runnable r, Throwable t): 任务执行完成时处理。r为执行完的任务,t为指定的造成任务终止的异常,如果设置为null则执行会正常完成,不会抛出异常
3、**protected void terminated()**当Executor终止时,被调用一次
以上三个方法都为空方法 ,使用者自行实现。在进行多层嵌套时都要显示调用 super.method() 完成上层的处理函数。如果在调用方法时发生异常,则内部的工作线程可能会依次失败,突然终止。
可以继承ThreadPoolExecute,并实现上述几个Hook方法来检测线程池的状态,自定义自己的线程池,如监控任务的平均、最大、最小执行时间,来发现有没有一致阻塞的线程任务。
线程池是如何实现的 在Java中线程池使用ThreadPoolExecutor这类去实现了,它里面封装了线程池的相关属性和创建线程池的基本方法。下面我们就来简单看一下源代码,简答的分析一波
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 )); private static final int COUNT_BITS = Integer.SIZE - 3 ; private static final int CAPACITY = (1 << COUNT_BITS) - 1 ; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private static int runStateOf (int c) { return c & ~CAPACITY; } private static int workerCountOf (int c) { return c & CAPACITY; } private static int ctlOf (int rs, int wc) { return rs | wc; } private static boolean runStateLessThan (int c, int s) { return c < s; } private static boolean runStateAtLeast (int c, int s) { return c >= s; } private static boolean isRunning (int c) { return c < SHUTDOWN; }
线程池中使用AtomicInteger 的CAS机制来实现对 运行时状态以及工作线程计数的并发一致性操作 ,低29位(32-3)用来保存workerCount,所以workerCount的最大为2^29 -1 。高3位用来保存runState,这样实现具有较高效率,不用单独两次存储。
RUNNING -> SHUTDOWN :手动调用shutdown方法,或者ThreadPoolExecutor要被GC回收的时候调用finalize方法,finalize方法内部也会调用shutdown方法
(RUNNING or SHUTDOWN) -> STOP :调用shutdownNow方法
SHUTDOWN -> TIDYING :当队列和线程池都为空的时候
STOP -> TIDYING :当线程池为空的时候
TIDYING -> TERMINATED :terminated方法调用完成之后,ThreadPoolExecutor内部还保存着线程池的有效线程个数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 private boolean compareAndIncrementWorkerCount (int expect) { return ctl.compareAndSet(expect, expect + 1 ); } private boolean compareAndDecrementWorkerCount (int expect) { return ctl.compareAndSet(expect, expect - 1 ); } private void decrementWorkerCount () { do {} while (! compareAndDecrementWorkerCount(ctl.get())); } private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet<Worker> workers = new HashSet<Worker>(); private final Condition termination = mainLock.newCondition(); private int largestPoolSize; private long completedTaskCount; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize; private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread" );
线程池的几个构造函数,线程池中一共有7个参数。每个参数都代表这个不同的意义。corePoolSize核心线程的数量;maximumPoolSize线程池的最多线程数;keepAliveTime当一个线程在空闲时的存活时间(核心线程要看allowCoreThreadTimeOut参数);TimeUnit时间单位多数是秒,也可以设置毫秒;workQueue缓冲队列(即线程池没有运行的任务队列);threadFactory用于创建新线程的线程工厂;handler用于线程池在满负荷下的拒绝策略。可以使抛异常,也可是放弃任务等。 拒绝策略指的是由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
线程池的几个内部类,主要是Worker类,和几个拒绝策略的实现类。Worker是一个AQS的实现类(为何设计成一个AQS在闲置Worker里会说明),同时也是一个实现Runnable的类,实现独占锁(非重入的互斥锁),它的构造函数只接受一个Runnable参数,内部保存着这个Runnable属性,还有一个thread线程属性用于包装这个Runnable(这个thread属性使用ThreadFactory构造。在构造函数内完成thread线程的构造),实现互斥锁主要目的是为了中断的时候判断线程是在空闲还是运行(判断是否是闲置线程,是否可以被强制中断。 一般有锁闲置的工作线程,因为在执行runWorker的时候会去掉锁),可以看后面 shutdown 和 shutdownNow 方法的分析。另外还有一个completedTasks计数器表示这个Worker完成的任务数。Worker类复写了run方法,使用ThreadPoolExecutor的runWorker方法(在addWorker方法里调用),直接启动Worker的话,会调用ThreadPoolExecutor的runWork方法。需要特别注意的是这个Worker是实现了Runnable接口的,thread线程属性使用ThreadFactory构造Thread的时候,构造的Thread中使用的Runnable其实就是Worker。 在前面还有一个HashSet的Worker 的集合workers,线程池通过管理线程池里的线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 · private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L ; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } protected boolean isHeldExclusively () { return getState() != 0 ; } protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock () { acquire(1 ); } public boolean tryLock () { return tryAcquire(1 ); } public void unlock () { release(1 ); } public boolean isLocked () { return isHeldExclusively(); } void interruptIfStarted () { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } } public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { } } public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
ThreadPoolExecutor执行任务。首先通过submit或者excute方法把任务放到线程池中(这里如果线程池空闲会直接执行,否则会进入到缓冲队列中去),然后线程池从缓冲队列中获取任务(getTask),然后添加Work,最后执行要执行任务内容(runWorker)。
1 2 3 4 5 6 7 8 9 10 public <T> Future<T> submit (Runnable task, T result) { if (task == null ) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
由于submit方法内部调用execute方法,所以execute方法就是执行任务的方法,来看一下execute方法,execute方法内部分3个步骤进行处理。
如果当前正在执行的Worker数量比corePoolSize(基本大小,核心线程数)要小。直接创建一个新的Worker执行任务,会调用addWorker方法
如果当前正在执行的Worker数量大于等于corePoolSize(基本大小,核心线程数)。将任务放到阻塞队列里,如果阻塞队列没满并且状态是RUNNING的话,直接丢到阻塞队列,否则执行第3步。丢到阻塞队列之后,还需要再做一次验证(丢到阻塞队列之后可能另外一个线程关闭了线程池或者刚刚加入到队列的线程死了)。如果这个时候线程池不在RUNNING状态,把刚刚丢入队列的任务remove掉,调用reject方法,否则查看Worker数量,如果Worker数量为0,起一个新的Worker去阻塞队列里拿任务执行
丢到阻塞失败的话,会调用addWorker方法尝试起一个新的Worker去阻塞队列拿任务并执行任务,如果这个新的Worker创建失败,调用reject方法
上面说的Worker可以暂时理解为一个执行任务的线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
如何添加一个Worker。
在外循环对运行状态进行判断 ,内循环通过CAS机制对workerCount进行增加 ,当设置成功,则跳出外循环,否则进行进行内循环重试
外循环之后,获取全局锁,再次对运行状态进行判断,符合条件则添加新的工作线程,并启动工作线程 ,如果在最后对添加线程没有开始运行(可能发生内存溢出 ,操作系统无法分配线程 等等)则对添加操作进行回滚 ,移除之前添加的线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } private void addWorkerFailed (Worker w) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (w != null ) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
运行Worker中的任务。
线程池中的这个基本大小指的是Worker的数量。一个Worker是一个Runnable的实现类,会被当做一个线程进行启动。Worker内部带有一个Runnable属性firstTask,这个firstTask可以为null,为null的话Worker会去阻塞队列拿任务执行,否则会先执行这个任务,执行完毕之后再去阻塞队列继续拿任务执行。
所以说如果Worker数量超过了基本大小,那么任务都会在阻塞队列里,当Worker执行完了它的第一个任务之后,就会去阻塞队列里拿其他任务继续执行。
Worker在执行的时候会根据一些参数进行调节,比如Worker数量超过了线程池基本大小或者超时时间到了等因素,这个时候Worker会被线程池回收,线程池会尽量保持内部的Worker数量不超过基本大小
Worker执行任务的时候调用的是Runnable的run方法,而不是start方法,调用了start方法就相当于另外再起一个线程了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } } private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
获取任务getTask,一般会在runWorker的时候去调用。
通过死循环来对线程池状态进行判断,并获取任务,在超时发生之前发生中断则重置超时标志位false并进行重试 ,如果获取到任务则返回任务
主要来看一下是如何实现移除空闲keepAliveTime线程的 :workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
方法从任务队列中定时获取任务 ,如果超时,则说明线程已经在等待了keepAliveTime都没有获得任务 ,则将超时标志设为true ,在下一次循环时进行判断,如果发现上一次获取任务发生超时,则立刻返回null,这时worker线程主循环将正常结束,并移除结束的worker。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
Worker在回收的时候会尝试终止线程池也就是tryTerminate方法。尝试关闭线程池的时候,会检查是否还有Worker在工作,检查线程池的状态,没问题的话会将状态过度到TIDYING状态,之后调用terminated方法,terminated方法调用完成之后将线程池状态更新到TERMINATED。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 final void tryTerminate () { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return ; if (workerCountOf(c) != 0 ) { interruptIdleWorkers(ONLY_ONE); return ; } final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0 ))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0 )); termination.signalAll(); } return ; } } finally { mainLock.unlock(); } } }
线程池的关闭主要是两个方法,shutdown和shutdownNow方法。
shutdown方法会更新状态到SHUTDOWN,不会影响阻塞队列里任务的执行,但是不会执行新进来的任务。同时也会回收闲置的Worker,闲置Worker的定义上面已经说过了。
shutdownNow方法会更新状态到STOP,会影响阻塞队列的任务执行,也不会执行新进来的任务。同时会回收所有的Worker。
线程池的结束相关的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); } public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } public boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (;;) {, if (runStateAtLeast(ctl.get(), TERMINATED)) return true ; if (nanos <= 0 ) return false ; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } } private void interruptIdleWorkers (boolean onlyOne) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break ; } } finally { mainLock.unlock(); } } private void interruptWorkers () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } protected void finalize () { shutdown(); }
interruptIdleWorkers方法,注意,这个方法打断的是闲置Worker,打断闲置Worker之后,getTask方法会返回null,然后Worker会被回收。怎么判断Worker是闲置呢?
闲置Worker是这样解释的:Worker运行的时候会去阻塞队列拿数据(getTask方法),拿的时候如果没有设置超时时间,那么会一直阻塞等待阻塞队列进数据,这样的Worker就被称为闲置Worker。由于Worker也是一个AQS,在runWorker方法里会有一对lock和unlock操作,这对lock操作是为了确保Worker不是一个闲置Worker。所以Worker被设计成一个AQS是为了根据Worker的锁来判断是否是闲置线程,是否可以被强制中断 (而且这个锁还是一个不可重入锁,即独占锁)。
Java提供了那几种线程池 在Java 的Executors类中有很多创建好的线程池。FixedThreadPool固定大小的线程池,SingleThreadExecutor只有一个线程的线程池,CachedThreadPool带有缓存功能的线程池,ScheduledThreadPool可以定时的线程池,SingleThreadScheduledExecutor只有一个线程并且可以定时的线程池,WorkStealingPool使用ForkJoin方式的线程池,unconfigurableExecutorService不可配置的线程池。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool (int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute (Runnable command) { e.execute(command); } public void shutdown () { e.shutdown(); } public List<Runnable> shutdownNow () { return e.shutdownNow(); } public boolean isShutdown () { return e.isShutdown(); } public boolean isTerminated () { return e.isTerminated(); } public boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future<?> submit(Runnable task) { return e.submit(task); } public <T> Future<T> submit (Callable<T> task) { return e.submit(task); } public <T> Future<T> submit (Runnable task, T result) { return e.submit(task, result); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return e.invokeAll(tasks); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } } static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super (executor); } protected void finalize () { super .shutdown(); } }
1 2 3 4 5 6 7 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
1 2 3 4 5 6 7 8 9 10 public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor (int corePoolSize) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue()); }
SingleThreadScheduledExecutor
1 2 3 4 5 6 public static ScheduledExecutorService newSingleThreadScheduledExecutor () { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1 )); }
1 2 3 4 5 6 7 8 public static ExecutorService newWorkStealingPool () { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null , true ); }
unconfigurableExecutorService
1 2 3 4 5 6 7 public static ExecutorService unconfigurableExecutorService (ExecutorService executor) { if (executor == null ) throw new NullPointerException(); return new DelegatedExecutorService(executor); }
线程池使用的注意问题 1、建议使用有界队列,有界队列能增加系统的稳定性和预警能力,防止资源过度消耗,撑爆内存,使得系统崩溃不可用。 2、提交到线程池的task之间要尽量保证相互独立,不能存在相互依赖,否则可能会造成死锁等其他影响线程池执行的原因。 3、提交到的线程池的task不要又创建一个子线程执行别的任务,然后又将这个子线程任务提交到线程池,这样会造成混乱的依赖,最终导致线程池崩溃,最好将一个task用一个线程执行。
4、一般需要根据任务的类型来配置线程池大小:
如果是CPU密集型任务 ,就需要尽量压榨CPU,参考值可以设为 Num(CPU+1)
如果是IO密集型任务 ,参考值可以设置为2Num(CPU) *
当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值 ,再观察任务运行情况和系统负载、资源利用率来进行适当调整 。
Java线程底层映射到操作系统原生线程,而且Java在windows和linux平台下,一个Java线程映射为一个内核线程,而内核线程和CPU物理核心数一样,所以Java线程和CPU核心是一对一的关系,将线程池的工作线程设置为与物理核心相等能做到真正的线程并发,如果设置线程数多于核心则会在核心线程之间不停的切换。
参考 Java线程池ThreadPoolExecutor源码分析
线程池-ThreadPoolExecute源码分析