抱歉,您的瀏覽器無法訪問本站
本頁面需要瀏覽器支持(啟用)JavaScript
了解詳情 >

ThreadPoolExecutor线程池简介

在阿里的开发手册里,注明了不能在项目中显示创建线程,而是要通过线程池来管理提供。这样做是因为可以减少线程创建和销毁的内存开销,合理控制项目中的线程数。

ThreadPoolExecutor

ThreadPoolExecutor源码初探

线程池状态成员字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 高3位表示线程池状态,低29位表示线程池个数(默认是RUNNING状态线程个数为0)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程个数掩码位数
private static final int COUNT_BITS = Integer.SIZE - 3;

// 线程最大个数29个二进制1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;


// ***线程池状态***:

// 运行状态(高三位)111
private static final int RUNNING = -1 << COUNT_BITS;

// 暂停状态(高三位)000
private static final int SHUTDOWN = 0 << COUNT_BITS;

// 停止状态(高三位)001
private static final int STOP = 1 << COUNT_BITS;

// 完成状态(高三位)010
private static final int TIDYING = 2 << COUNT_BITS;

// 终止状态(高三位)011
private static final int TERMINATED = 3 << COUNT_BITS;

// 获得高三位(运行状态)
private static int runStateOf(int c) { return c & ~CAPACITY; }

// 获得低29位(线程个数)
private static int workerCountOf(int c) { return c & CAPACITY; }

计算新值
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池状态
  • RUNNING : 接受新的任务并且处理堵塞队列的任务
  • SHUTDOWN : 不接受新的任务但是处理堵塞队列的任务
  • STOP : 不接受新的任务而且抛弃堵塞队列的任务,中断正在运行的任务
  • TIDYING :所有任务都执行完毕,活动线程个数为0,且准备调用terminated方法
  • TERMINATED : 终止状态,调用完terminated后的状态

线程池构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}


public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}


public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

从上面线程池的四个构造方法可以看出其实最后一个7参数才是真正的构造方法。

线程池构造参数
  • corePoolSize : 线程池核心线程数
  • maxinumPoolSize : 线程池最大线程数,如果大于等于就会执行拒绝策略
  • keepAliveTime : 空闲线程存活时间,如果当前线程大于核心线程数且是空闲状态的最长休眠时间。
  • unit : 时间单位
  • workQueue :堵塞队列
  • threadFactory : 线程工厂
  • handler : 拒绝策略,当线程已满且到达maxinumPoolSize个数,采取的拒绝策略

execute(Runnable command)方法

execute方法负责将任务command提交到线程池中运行,ThreadPoolExecutor线程池其实是生产消费模型,当用户提交任务到队列中,相当于生产。当workers线程执行队列里的任务,相当于消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void execute(Runnable command) {

if (command == null)
throw new NullPointerException();
//获得当前线程池的状态和线程的个数组合值
int c = ctl.get();
//如果当前工作线程数小于核心线程数则创建线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果线程池是运行状态,则入队
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//二次检测,如果不是运行状态了就移除该线程,并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//如果当前线程池为空,则添加一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果队列满,则新增线程,新增线程则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}

addWorker(Runnable firstTask, boolean core)方法

addWorker()尝试把新增任务添加到任务队列中,这里代码比较长,但是其实主要分为两部分。

  1. 双重循环通过CAS操作自增线程数。
  2. 加锁把新增任务加入到workers里。
    下面我们分段来看这段代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

//这里比较绕下面会分析
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
//如果当前运行线程超过了限制就会返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//同一时间只会有一个线程操作成功
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//如果当前线程池状态被改变则回到上层循环重新获得线程池状态
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

判断那里在三种情况下会返回false

  1. 当前线程池状态为STOP,TIDYING,TERMINATED
  2. 当前线程池状态为SHUTDOWN而且有第一个任务
  3. 当前线程池状态为SHUTDOWN而且任务队列为空

任务队列和拒绝策略

workQueue任务队列

任务队列一般分为4种,直接提交队列,有界队列,无界队列,优先队列。

SynchronousQueue(直接提交队列)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

public class TestThreadPoolExecutor {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,2,100, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
for (int i=0;i<5;i++){
threadPoolExecutor.execute(new Thread(new HelloTask(),"线程"+i));
}
}
public static class HelloTask implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" Hello World! ");
}
}

}

输出结果为:

pool-1-thread-1 Hello World!
pool-1-thread-2 Hello World!
Exception in thread “main” java.util.concurrent.RejectedExecutionException: Task Thread[线程2,5,main] rejected from java.util.concurrent.ThreadPoolExecutor@14ae5a5[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.czj.example01.TestThreadPoolExceutor.main(TestThreadPoolExceutor.java:12)

可以看到当队列为SynchronousQueue时,当创建的线程数超过了maxinumPoolSize时就会直接执行拒绝策略。
使用SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略;

ArrayBlockingQueue(有界任务队列)

1
2
3
4
5
6
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,2,100, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(16));
for (int i=0;i<5;i++){
threadPoolExecutor.execute(new Thread(new HelloTask(),"线程"+i));
}
}

使用ArrayBlockingQueue有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。

LinkedBlockingQueue(无界任务队列)

1
2
3
4
5
6
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,2,100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
for (int i=0;i<5;i++){
threadPoolExecutor.execute(new Thread(new HelloTask(),"线程"+i));
}
}

使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。

PriorityBlockingQueue(优先队列)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class TestThreadPoolExecutor {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,10,100, TimeUnit.SECONDS,new PriorityBlockingQueue<>());
for (int i=0;i<10;i++){
threadPoolExecutor.execute(new HelloTask("线程"+i,i));
}
}
public static class HelloTask implements Runnable,Comparable<HelloTask>{

private int priority;

private String name;
public HelloTask(String name,int priority) {
this.name = name;
this.priority = priority;
}


@Override
public void run() {
System.out.println(name+" Start Hello World! ");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name+" End Goodbye ");

}

@Override
public int compareTo(HelloTask o) {
if (this==o){
return 0;
}
int num = this.priority-o.priority;
if (num==0){
return 0;
}
//PriorityBlockingQueue底层实现是最小堆,所以比较越小越优先。
return num>0?-1:1;
}
}

}

输出结果

线程0 Start Hello World!
线程0 End Goodbye
线程9 Start Hello World!
线程9 End Goodbye
线程8 Start Hello World!
线程8 End Goodbye
线程7 Start Hello World!
线程7 End Goodbye
线程6 Start Hello World!
线程6 End Goodbye
线程5 Start Hello World!
线程5 End Goodbye
线程4 Start Hello World!
线程4 End Goodbye
线程3 Start Hello World!
线程3 End Goodbye
线程2 Start Hello World!
线程2 End Goodbye
线程1 Start Hello World!
线程1 End Goodbye

大家可以看到除了第一个任务直接创建线程执行外,其他的任务都被放入了优先任务队列,按优先级进行了重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。
通过运行的代码我们可以看出PriorityBlockingQueue它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。