抱歉,您的瀏覽器無法訪問本站
本頁面需要瀏覽器支持(啟用)JavaScript
了解詳情 >

ArrayBlockingQueue简介

ArrayBlockingQueue是一个有界,堵塞/非堵塞线程安全的队列。它是基于数组实现的,内部通过ReentrantLock独占锁来控制入队出队操作,通过两个条件变量控制队列为空为满的情况。

ArrayBlockingQueue

源码详解

成员分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//存放元素的数组
final Object[] items;

//出队操作的下标
int takeIndex;

//入队操作的下标
int putIndex;

//元素个数
int count;

//入队操作和出队操作的独占锁
final ReentrantLock lock;

//当队列为空时,进行出队操作的线程会被放进这个条件队列等待
private final Condition notEmpty;

//当队列已满,进行入队操作的线程会被放进该这个条件队列等待
private final Condition notFull;

构造方法分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];//创建指定大小的数组
lock = new ReentrantLock(fair);//创建锁
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
//根据指定集合创建数组
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);

final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion//上锁
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);//判空
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}

三种构造方法都要指定队列大小,另外fair参数控制独占锁是否为公平非公平锁。

入队操作

offer()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean offer(E e) {
checkNotNull(e);//判空
final ReentrantLock lock = this.lock;
lock.lock();//上锁
try {
if (count == items.length)//如果队列已满直接返回false
return false;
else {
enqueue(e);//把值添加到队列尾部
return true;
}
} finally {
lock.unlock();
}
}
enqueue(E x)
1
2
3
4
5
6
7
8
9
10
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;//赋值
if (++putIndex == items.length)//如果入队下标等于队列边界则把入队下标设置为0
putIndex = 0;
count++;
notEmpty.signal();
}
1
2
3
4
5
6
7
8
9
10
11
12
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//获得可以中断的独占锁
try {
while (count == items.length)//如果队列已满则休眠
notFull.await();
enqueue(e);入队
} finally {
lock.unlock();
}
}

offer()和put()都可以用来添加新的元素,但offer()入队时发现队列已满会直接返回false,put()则会堵塞等待队列不为满在添加。

出队操作

poll()
1
2
3
4
5
6
7
8
9
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();//上锁
try {
return (count == 0) ? null : dequeue();//如果队列个数为空则直接返回null否则调用dequeue()
} finally {
lock.unlock();
}
}
dequeue()
1
2
3
4
5
6
7
8
9
10
11
12
private E dequeue() {
final Object[] items = this.items;
E x = (E) items[takeIndex];//根据出队下标获得值
items[takeIndex] = null;
if (++takeIndex == items.length)//计算出队下标的值,如果等于队列长度则设置为0
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
take()
1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//获得可以被中断的独占锁
try {
while (count == 0)//如果队列为空则休眠
notEmpty.await();
return dequeue();//进行出队操作
} finally {
lock.unlock();
}
}

poll()和take()都可以进行出队操作,区别是运行poll()时如果队列为空则直接返回null,take()则会堵塞当前线程直到队列不为空。