抱歉,您的瀏覽器無法訪問本站
本頁面需要瀏覽器支持(啟用)JavaScript
了解詳情 >

PriorityBlockingQueue简介

PriorityBlockingQueue是一种优先级,无界堵塞,线程安全的队列,每次出队返回优先级高或低的元素,内部通过实现。

upload

源码详解

属性分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

//队列默认大小
private static final int DEFAULT_INITIAL_CAPACITY = 11;

//数组最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

//存放队列元素
private transient Object[] queue;

//队列元素个数
private transient int size;

//比较器
private transient Comparator<? super E> comparator;

//独占锁
private final ReentrantLock lock;

//当队列为空时,进行出队操作的线程会被放进这个条件队列等待
private final Condition notEmpty;

构造方法分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//无参构造方法,设置队列大小为默认11,比较器为null
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
//比较器为null
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}

//主要构造方法,初始化各组件
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}

入队操作

offer()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();//抛出NPE异常
final ReentrantLock lock = this.lock;
lock.lock();//上锁
int n, cap;
Object[] array;
//如果当前元素个数大于等于队列大小则扩展
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);//调用对象的compareTo方法
else
siftUpUsingComparator(n, e, array, cmp);//使用比较器
size = n + 1;
notEmpty.signal();//唤醒因出队操作而休眠的线程
} finally {
lock.unlock();
}
return true;
}
tryGrow()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); //释放获得的锁
Object[] newArray = null;

//尝试获得扩容的锁把
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
//如果队列大小小于64则扩大oldCap+2,否则扩大成原来的50%,最大值为MAX_ARRAY_SIZE
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) //如果newArray为空则代表尝试获得扩容的锁失败了,需要把自己尝试挂起,让给扩容线程工作。
Thread.yield();
lock.lock();//获得锁
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);//真正扩容
}
}

这里比较难理解的地方主要是为什么要释放锁。。。这里主要是因为扩容队列需要时间,比较耗时,因为锁的原因,扩容时无法进行入队操作和出队操作。所以为了提高并发性能就要释放锁拉,小编我也感到很惊讶。

put()
1
2
3
public void put(E e) {
offer(e); // 直接调用offer()。。两个入队方法一样的
}

siftUpComparable()

1
2
3
4
5
6
7
8
9
10
11
12
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;//其实就是(k-1)/2,获得父类节点
Object e = array[parent];
if (key.compareTo((T) e) >= 0)//如果入队元素大于等于父类节点则直接跳出循环
break;
array[k] = e;
k = parent;
}
array[k] = key;
}

出队操作

poll()
1
2
3
4
5
6
7
8
9
10
//这方法没啥就是个壳,作用就上锁。。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
dequeue()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private E dequeue() {
int n = size - 1;
if (n < 0)//队列为空直接返回null
return null;
else {
Object[] array = queue;
E result = (E) array[0];//获得首部元素(出队元素)
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
//开始向下调整
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
siftDownComparable()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static <T> void siftDownComparable(int k, T x, Object[] array,                                       int n) {
//判断队列长度是否大于0
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1//获得队列长度的一半;
while (k < half) {
int child = (k << 1) + 1; //获得k*2+1(也就是获得左子节点)
Object c = array[child];//获得左子节点的值
int right = child + 1;//获得右子节点
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];//右子节点比较小
if (key.compareTo((T) c) <= 0)//如果key值比选出来的节点还小就直接跳出循环
break;
array[k] = c;//赋值
k = child;//获得当前最小节点
}
array[k] = key;
}
}
take()
1
2
3
4
5
6
7
8
9
10
11
12
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)//如果当前队列为空则堵塞
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}

其他操作

size()
1
2
3
4
5
6
7
8
9
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();//上锁,保证获取队列长度的时候没有入队出队操作。
try {
return size;
} finally {
lock.unlock();
}
}
remove()
1
2
3
4
5
6
7
8
9
10
11
12
13
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(o);//获得移除元素的下标
if (i == -1)
return false;//为空直接返回
removeAt(i);
return true;
} finally {
lock.unlock();
}
}
removeat()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void removeAt(int i) {
Object[] array = queue;
int n = size - 1;
if (n == i) // 如果移除的是最后的元素则直接设为null
array[i] = null;
else {
E moved = (E) array[n];//队列最后一个元素
array[n] = null;
Comparator<? super E> cmp = comparator;
//向下调整
if (cmp == null)
siftDownComparable(i, moved, array, n);
else
siftDownUsingComparator(i, moved, array, n, cmp);
//如果向下调整不了则向上调整
if (array[i] == moved) {
if (cmp == null)
siftUpComparable(i, moved, array);
else
siftUpUsingComparator(i, moved, array, cmp);
}
}
size = n;
}