1.计算机的基础知识
位逻辑运算符:
&:
位与运算符,只有两个操作数都是true,结果才是true。
|:
位或运算符,只有两个操作数都是false,结果才是false。
~:
位非运算符:如果位为0,结果是1,如果位为1,结果是0.
^:
位异或运算:两个数转为二进制,然后从高位开始比较,如果相同则为0,不相同则为1。
位移运算:
无符号左移
无符号右移
:带符号右移(没有带符号左移这种操作)
二进制:
二进制都是以补码的形式表示的
正数的原码,反码,补码都一样;
要得到负数的补码,必须先求负数的反码,负数的反码;负数的反码按位1改成0,0改成1;负数的补码等于反码+1
分享一个,有很多干货,包含netty,spring,线程,spring cloud等详细讲解,也有详细的学习规划图,面试题整理等,我感觉在面试这块讲的非常清楚:获取面试资料只需:
点击这里领取!!!
暗号:CSDN
2.ThreadPoolExecutor简单示例
public class ThreadPoolExecutorTest {
public static void main ( String[ ] args) {
BlockingQueue b = new ArrayBlockingQueue ( 100 ) ;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor ( 2 , 10 , 500000 , TimeUnit. SECONDS,
b, new RejectedExecutionHandler ( ) {
@Override
public void rejectedExecution ( Runnable r, ThreadPoolExecutor executor) {
}
} ) ;
threadPoolExecutor. execute ( ( ) - > {
try {
Thread. sleep ( 10000 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
System. out. println ( 111111 ) ;
} ) ;
threadPoolExecutor. execute ( ( ) - > {
try {
Thread. sleep ( 10000 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
System. out. println ( 2222222 ) ;
} ) ;
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
3.ThreadPoolExecutor属性分析
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger ( ctlOf ( RUNNING, 0 ) ) ;
private static final int COUNT_BITS = Integer. SIZE - 3 ;
private static final int CAPACITY = ( 1 << COUNT_BITS) - 1 ;
private static final int RUNNING = - 1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf ( int c) { return c & ~ CAPACITY; }
private static int workerCountOf ( int c) { return c & CAPACITY; }
private static int ctlOf ( int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan ( int c, int s) {
return c < s;
}
private static boolean runStateAtLeast ( int c, int s) {
return c >= s;
}
private static boolean isRunning ( int c) {
return c < SHUTDOWN;
}
private boolean compareAndIncrementWorkerCount ( int expect) {
return ctl. compareAndSet ( expect, expect + 1 ) ;
}
private boolean compareAndDecrementWorkerCount ( int expect) {
return ctl. compareAndSet ( expect, expect - 1 ) ;
}
private void decrementWorkerCount ( ) {
do { } while ( ! compareAndDecrementWorkerCount ( ctl. get ( ) ) ) ;
}
private final BlockingQueue< Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock ( ) ;
private final HashSet< Worker> workers = new HashSet < Worker> ( ) ;
private final Condition termination = mainLock. newCondition ( ) ;
private int largestPoolSize;
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy ( ) ;
private static final RuntimePermission shutdownPerm =
new RuntimePermission ( "modifyThread" ) ;
private final AccessControlContext acc;
private volatile boolean allowCoreThreadTimeOut;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
4.ThreadPoolExecutor构造方法分析
public ThreadPoolExecutor ( int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue< Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if ( corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0 )
throw new IllegalArgumentException ( ) ;
if ( workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException ( ) ;
this . acc = System. getSecurityManager ( ) == null ?
null :
AccessController. getContext ( ) ;
this . corePoolSize = corePoolSize;
this . maximumPoolSize = maximumPoolSize;
this . workQueue = workQueue;
this . keepAliveTime = unit. toNanos ( keepAliveTime) ;
this . threadFactory = threadFactory;
this . handler = handler;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
5.线程池创建线程顺序分析
当我们执行execute方法提交一个线程的时候,首先会判断当前线程池线程数是否超过核心线程数corePoolSize,若是没有超过,则创建新线程,若是超过,则尝试将Runnable提交到工作队列workQueue中。
如果工作队列workQueue没有超过容量,则Runnable提交到工作队列中,如果超过了workQueue的容量,则尝试创建线程。如果此时创建的线程小于最大线程数maximumPoolSize,则创建线程,如果超过了maximumPoolSize,则执行拒绝策列。
6.ThreadPoolExecutor.execute方法分析
public void execute ( Runnable command) {
if ( command == null)
throw new NullPointerException ( ) ;
int c = ctl. get ( ) ;
if ( workerCountOf ( c) < corePoolSize) {
if ( addWorker ( command, true ) )
return ;
c = ctl. get ( ) ;
}
if ( isRunning ( c) && workQueue. offer ( command) ) {
int recheck = ctl. get ( ) ;
if ( ! isRunning ( recheck) && remove ( command) )
reject ( command) ;
else if ( workerCountOf ( recheck) == 0 )
addWorker ( null, false ) ;
}
else if ( ! addWorker ( command, false ) )
reject ( command) ;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
6.1.Worker类分析
每个线程的创建都离不开Worker类,该类完全包装了线程运行的所需要的属性,并提供了初始化线程和从阻塞队列中获取被阻塞的线程并执行的一系列方法。
观察该类代码发现,该类继承了AbstractQueuedSynchronizer并实现了Runnable接口,在创建线程的时候,实际Thread类的构造方法包装的就是Worker类自己(我们知道一般Runnable需要被传入到Thread里面的,如:Thread t = new Thread(runnable), t.start()启动线程)。
而从execute方法传过来的Runnable实现只是被封装到了firstTask中,创建出来的Thread在执行的时候,调用的start方法也只是启动了该类的runWorker方法,而真正封装我们执行逻辑的firstTask这个Runnable类在后续调用中也只是执行自己的run方法而已,并不再被Thread封装。
worker为什么要继承AbstractQueuedSynchronizer呢?
因为在runWork的方法内,在调用firstTask处理业务逻辑前,会给代码加上独占锁,加这个独占锁的目的是什么呢?因为我们在调用shutDown方法的时候,并不会终止处于运行中的线程。shutDown方法会根据独占锁来判断当前worker是否正在工作。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833 L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker ( Runnable firstTask) {
setState ( - 1 ) ;
this . firstTask = firstTask;
this . thread = getThreadFactory ( ) . newThread ( this ) ;
}
public void run ( ) {
runWorker ( this ) ;
}
protected boolean isHeldExclusively ( ) {
return getState ( ) != 0 ;
}
protected boolean tryAcquire ( int unused) {
if ( compareAndSetState ( 0 , 1 ) ) {
setExclusiveOwnerThread ( Thread. currentThread ( ) ) ;
return true ;
}
return false ;
}
protected boolean tryRelease ( int unused) {
setExclusiveOwnerThread ( null) ;
setState ( 0 ) ;
return true ;
}
public void lock ( ) { acquire ( 1 ) ; }
public boolean tryLock ( ) { return tryAcquire ( 1 ) ; }
public void unlock ( ) { release ( 1 ) ; }
public boolean isLocked ( ) { return isHeldExclusively ( ) ; }
void interruptIfStarted ( ) {
Thread t;
if ( getState ( ) >= 0 && ( t = thread) != null && ! t. isInterrupted ( ) ) {
try {
t. interrupt ( ) ;
} catch ( SecurityException ignore) {
}
}
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
6.2.addWorker方法分析
addWorker是创建线程的核心方法,一个worker代表一个线程,而workers这个全局变量可以代表线程池,只有向workers里面添加worker成功的时候,才能代表创建线程成功了。
addWorker在执行过程中,会根据线程池状态和线程池数量判断是否能创建线程,创建线程成功会将记录线程池状态和数量的ctl值+1,并将worker加入到workers里面,更新线程池生命周期内线程池线程的最大数量,然后启动线程执行任务。
addWorker的core参数代表是否是在创建核心线程,core为true代表创建核心线程,false代表阻塞队列已满,创建非核心线程。
返回值: true代表创建线程并启动成功,false代表创建线程失败。
什么时候返回false呢? CASE1.线程池状态rs>SHUTDOWN;
CASE2.线程池状态为SHUTDOWN的时候,阻塞队列没有任务了,为空;
CASE3.线程池状态为SHUTDOWN的时候,execute提交的Runnable(被封装到firstTask里面)不为空;
CASE4.如果是创建核心线程,此时已经超过核心线程数;如果是创建非核心线程,此时已经超过最大线程数;
CASE5.ThreadFactory创建线程为空,这里一般是我们自定义线程工厂的时候出的问题;
private boolean addWorker ( Runnable firstTask, boolean core) {
retry:
for ( ; ; ) {
int c = ctl. get ( ) ;
int rs = runStateOf ( c) ;
if ( rs >= SHUTDOWN &&
! ( rs == SHUTDOWN &&
firstTask == null &&
! workQueue. isEmpty ( ) ) )
return false ;
for ( ; ; ) {
int wc = workerCountOf ( c) ;
if ( wc >= CAPACITY ||
wc >= ( core ? corePoolSize : maximumPoolSize) )
return false ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
if ( compareAndIncrementWorkerCount ( c) )
break retry;
c = ctl. get ( ) ;
if ( runStateOf ( c) != rs)
continue retry;
}
}
boolean workerStarted = false ;
boolean workerAdded = false ;
Worker w = null;
try {
w = new Worker ( firstTask) ;
final Thread t = w. thread;
if ( t != null) {
final ReentrantLock mainLock = this . mainLock;
mainLock. lock ( ) ;
try {
int rs = runStateOf ( ctl. get ( ) ) ;
if ( rs < SHUTDOWN ||
( rs == SHUTDOWN && firstTask == null) ) {
if ( t. isAlive ( ) )
throw new IllegalThreadStateException ( ) ;
workers. add ( w) ;
int s = workers. size ( ) ;
if ( s > largestPoolSize)
largestPoolSize = s;
workerAdded = true ;
}
} finally {
mainLock. unlock ( ) ;
}
if ( workerAdded) {
t. start ( ) ;
workerStarted = true ;
}
}
} finally {
if ( ! workerStarted)
addWorkerFailed ( w) ;
}
return workerStarted;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
6.3.runWorker方法分析
addWorker方法创建线程成功,Worker类的Thread会调用start方法启动自己的run方法,因为Worker类实现了Runnable接口,run方法里面调用了runWorker方法。实际我们execute方法传入的Runnable被封装到了Worker类的firstTask属性里面。然后在runWorker里面调用run方法启动具体的逻辑,注意这里并没用再用Thrad封装Runnable了。线程启动后,会一直运行While循环,循环第一次运行自己传入的Runnable,第二次及之后则通过getTask方法从任务队列种获取具体的Runnable任务了。一旦While循环内发生异常或者getTask返回空,则会调用processWorkerExit执行线程销毁逻辑。getTask方法获取不到具体任务的线程都可被认为是空闲线程。
final void runWorker ( Worker w) {
Thread wt = Thread. currentThread ( ) ;
Runnable task = w. firstTask;
w. firstTask = null;
w. unlock ( ) ;
boolean completedAbruptly = true ;
try {
while ( task != null || ( task = getTask ( ) ) != null) {
w. lock ( ) ;
if ( ( runStateAtLeast ( ctl. get ( ) , STOP) ||
( Thread. interrupted ( ) &&
runStateAtLeast ( ctl. get ( ) , STOP) ) ) &&
! wt. isInterrupted ( ) )
wt. interrupt ( ) ;
try {
beforeExecute ( wt, task) ;
Throwable thrown = null;
try {
task. run ( ) ;
} catch ( RuntimeException x) {
thrown = x; throw x;
} catch ( Error x) {
thrown = x; throw x;
} catch ( Throwable x) {
thrown = x; throw new Error ( x) ;
} finally {
afterExecute ( task, thrown) ;
}
} finally {
task = null;
w. completedTasks++ ;
w. unlock ( ) ;
}
}
completedAbruptly = false ;
} finally {
processWorkerExit ( w, completedAbruptly) ;
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
6.4.getTask方法分析
当getTask返回空的时候,线程可以执行销毁逻辑了。
getTask什么时候返回空?
1.线程池处于SHUTDOWN状态,工作队列为空的时候;
2.线程池处于STOP状态以上的时候,将线程池线程数-1并返回空;
3.当工作队列为空的时候;
注意,线程池的allowCoreThreadTimeOut属性会影响getTask方法,导致getTask方法一直阻塞在workQueue.take()这里的,这样就不会销毁线程。
1.allowCoreThreadTimeOut=true,使用非阻塞方法从队列获取任务
2.allowCoreThreadTimeOut=false,线程池线程数还未达到核心线程数上限,使用阻塞方法获取任务,这样就可以使得核心线程不会被销毁,getTask方法一直阻塞等待获取队列种的任务。
3.allowCoreThreadTimeOut=false,线程池线程数达到核心线程数,使用非阻塞方法获取任务
private Runnable getTask ( ) {
boolean timedOut = false ;
for ( ; ; ) {
int c = ctl. get ( ) ;
int rs = runStateOf ( c) ;
if ( rs >= SHUTDOWN && ( rs >= STOP || workQueue. isEmpty ( ) ) ) {
decrementWorkerCount ( ) ;
return null;
}
int wc = workerCountOf ( c) ;
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ( ( wc > maximumPoolSize || ( timed && timedOut) )
&& ( wc > 1 || workQueue. isEmpty ( ) ) ) {
if ( compareAndDecrementWorkerCount ( c) )
return null;
continue ;
}
try {
Runnable r = timed ?
workQueue. poll ( keepAliveTime, TimeUnit. NANOSECONDS) :
workQueue. take ( ) ;
if ( r != null)
return r;
timedOut = true ;
} catch ( InterruptedException retry) {
timedOut = false ;
}
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
6.5.processWorkerExit方法分析
该方法用于执行线程销毁的逻辑。
1.加锁,从workers队列中移除一个worker,将线程池完成任务总数量+1, 释放锁;
2.判断是否需要关闭线程池;
3.如果线程正常完成并退出,根据allowCoreThreadTimeOut判断是否需要回收核心线程,
若allowCoreThreadTimeOut=true,只需要保证线程池最少有一个线程即可,也就是说超过1的空闲线程一定会被销毁。
若allowCoreThreadTimeOut=false,在线程数未达到核心线程数上限的情况下,由于getTask方法的阻塞,不会执行线程销毁的逻辑;当线程数达到核心线程数上限的情况,且队列也达到上限数,这之后创建的任何线程在getTask方法获取不到具体任务的情况下都会销毁
private void processWorkerExit ( Worker w, boolean completedAbruptly) {
if ( completedAbruptly)
decrementWorkerCount ( ) ;
final ReentrantLock mainLock = this . mainLock;
mainLock. lock ( ) ;
try {
completedTaskCount += w. completedTasks;
workers. remove ( w) ;
} finally {
mainLock. unlock ( ) ;
}
tryTerminate ( ) ;
int c = ctl. get ( ) ;
if ( runStateLessThan ( c, STOP) ) {
if ( ! completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if ( min == 0 && ! workQueue. isEmpty ( ) )
min = 1 ;
if ( workerCountOf ( c) >= min)
return ;
}
addWorker ( null, false ) ;
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
6.6.tryTerminate方法分析
尝试关闭线程池方法并处理空闲线程,interruptIdleWorkers方法处理空闲线程,设置中断状态。每个线程退出都会单独调用该方法。
final void tryTerminate ( ) {
for ( ; ; ) {
int c = ctl. get ( ) ;
if ( isRunning ( c) ||
runStateAtLeast ( c, TIDYING) ||
( runStateOf ( c) == SHUTDOWN && ! workQueue. isEmpty ( ) ) )
return ;
if ( workerCountOf ( c) != 0 ) {
interruptIdleWorkers ( ONLY_ONE) ;
return ;
}
final ReentrantLock mainLock = this . mainLock;
mainLock. lock ( ) ;
try {
if ( ctl. compareAndSet ( c, ctlOf ( TIDYING, 0 ) ) ) {
try {
terminated ( ) ;
} finally {
ctl. set ( ctlOf ( TERMINATED, 0 ) ) ;
termination. signalAll ( ) ;
}
return ;
}
} finally {
mainLock. unlock ( ) ;
}
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
6.7.interruptIdleWorkers方法分析
处理一个空闲线程方法。所有处于执行中的线程都会加锁(w.lock())。上面我们提过,核心线程被take方法阻塞的时候,我们这里设置线程t.interrupt(), 会解除take的阻塞。
private void interruptIdleWorkers ( boolean onlyOne) {
final ReentrantLock mainLock = this . mainLock;
mainLock. lock ( ) ;
try {
for ( Worker w : workers) {
Thread t = w. thread;
if ( ! t. isInterrupted ( ) && w. tryLock ( ) ) {
try {
t. interrupt ( ) ;
} catch ( SecurityException ignore) {
} finally {
w. unlock ( ) ;
}
}
if ( onlyOne)
break ;
}
} finally {
mainLock. unlock ( ) ;
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
6.8.awaitTermination方法分析
该方法是判断线程池状态状态是否是TERMINATED,如果是则直接返回true,否则会await挂起当前线程指定的时间
public boolean awaitTermination ( long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit. toNanos ( timeout) ;
final ReentrantLock mainLock = this . mainLock;
mainLock. lock ( ) ;
try {
for ( ; ; ) {
if ( runStateAtLeast ( ctl. get ( ) , TERMINATED) )
return true ;
if ( nanos <= 0 )
return false ;
nanos = termination. awaitNanos ( nanos) ;
}
} finally {
mainLock. unlock ( ) ;
}
}
## 6.9 . shutDown和shutDownNow方法分析
shutDown方法会优雅的关闭线程池,设置线程池状态为SHUTDOWN,已经处于队列中的任务会继续等待执行完。
shutDownNow方法会立即关闭线程池,设置线程池状态为STOP。
public void shutdown ( ) {
final ReentrantLock mainLock = this . mainLock;
mainLock. lock ( ) ;
try {
checkShutdownAccess ( ) ;
advanceRunState ( SHUTDOWN) ;
interruptIdleWorkers ( ) ;
onShutdown ( ) ;
} finally {
mainLock. unlock ( ) ;
}
tryTerminate ( ) ;
}
public List< Runnable> shutdownNow ( ) {
List< Runnable> tasks;
final ReentrantLock mainLock = this . mainLock;
mainLock. lock ( ) ;
try {
checkShutdownAccess ( ) ;
advanceRunState ( STOP) ;
interruptWorkers ( ) ;
tasks = drainQueue ( ) ;
} finally {
mainLock. unlock ( ) ;
}
tryTerminate ( ) ;
return tasks;
}
#7. ThreadPoolExecutor拒绝策列
默认有以下4 中拒绝策列,用户也可以实现RejectedExecutionHandler接口自定义。
CallerRunsPolicy将任务交给调用者执行
AbortPolicy抛出异常
DiscardPolicy什么都不做,直接丢弃
DiscardOldestPolicy丢弃老的,执行新的
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy ( ) { }
public void rejectedExecution ( Runnable r, ThreadPoolExecutor e) {
if ( ! e. isShutdown ( ) ) {
r. run ( ) ;
}
}
}
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy ( ) { }
public void rejectedExecution ( Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException ( "Task " + r. toString ( ) +
" rejected from " +
e. toString ( ) ) ;
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy ( ) { }
public void rejectedExecution ( Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy ( ) { }
public void rejectedExecution ( Runnable r, ThreadPoolExecutor e) {
if ( ! e. isShutdown ( ) ) {
e. getQueue ( ) . poll ( ) ;
e. execute ( r) ;
}
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
8.扩展:改变线程池的初始化过程
如果我们想让线程按核心线程,最大线程,最后再进队列的方式初始化,应该怎么做?
public void execute ( Runnable command) {
if ( command == null)
throw new NullPointerException ( ) ;
int c = ctl. get ( ) ;
if ( workerCountOf ( c) < corePoolSize) {
if ( addWorker ( command, true ) )
return ;
c = ctl. get ( ) ;
}
if ( isRunning ( c) && workQueue. offer ( command) ) {
int recheck = ctl. get ( ) ;
if ( ! isRunning ( recheck) && remove ( command) )
reject ( command) ;
else if ( workerCountOf ( recheck) == 0 )
addWorker ( null, false ) ;
}
else if ( ! addWorker ( command, false ) )
reject ( command) ;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
我们在说execute方法初始化线程池过程中。CASE2:workQueue.offer(command)会将任务加入到队列。所以,我们这里只需要自定义BlockingQueue,改造offer方法,在里面判断,当线程池线程数还未达到最大线程数的时候返回false即可。
Dubbo的EagerThreadPool自定义了一个BlockingQueue,在offer()方法中,如果当前线程池数量小于最大线程池时,直接返回false,这里就达到了调节线程池执行顺序的目的。
9.推荐
分享一个,有很多干货,包含netty,spring,线程,spring cloud等详细讲解,也有详细的学习规划图,面试题整理等,我感觉在面试这块讲的非常清楚:获取面试资料只需:
点击这里领取!!!
暗号:CSDN