PriorityBlockingQueue简介
PriorityBlockingQueue是一种优先级,无界堵塞,线程安全的队列,每次出队返回优先级高或低的元素,内部通过堆 实现。
源码详解 属性分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 //队列默认大小 private static final int DEFAULT_INITIAL_CAPACITY = 11; //数组最大容量 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //存放队列元素 private transient Object[] queue; //队列元素个数 private transient int size; //比较器 private transient Comparator<? super E> comparator; //独占锁 private final ReentrantLock lock; //当队列为空时,进行出队操作的线程会被放进这个条件队列等待 private final Condition notEmpty;
构造方法分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 //无参构造方法,设置队列大小为默认11,比较器为null public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } //比较器为null public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } //主要构造方法,初始化各组件 public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
入队操作 offer() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public boolean offer(E e) { if (e == null) throw new NullPointerException();//抛出NPE异常 final ReentrantLock lock = this.lock; lock.lock();//上锁 int n, cap; Object[] array; //如果当前元素个数大于等于队列大小则扩展 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array);//调用对象的compareTo方法 else siftUpUsingComparator(n, e, array, cmp);//使用比较器 size = n + 1; notEmpty.signal();//唤醒因出队操作而休眠的线程 } finally { lock.unlock(); } return true; }
tryGrow() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private void tryGrow(Object[] array, int oldCap) { lock.unlock(); //释放获得的锁 Object[] newArray = null; //尝试获得扩容的锁把 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { //如果队列大小小于64则扩大oldCap+2,否则扩大成原来的50%,最大值为MAX_ARRAY_SIZE try { int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } if (newArray == null) //如果newArray为空则代表尝试获得扩容的锁失败了,需要把自己尝试挂起,让给扩容线程工作。 Thread.yield(); lock.lock();//获得锁 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap);//真正扩容 } }
这里比较难理解的地方主要是为什么要释放锁。。。这里主要是因为扩容队列需要时间,比较耗时,因为锁的原因,扩容时无法进行入队操作和出队操作。所以为了提高并发性能就要释放锁拉,小编我也感到很惊讶。
put() 1 2 3 public void put(E e) { offer(e); // 直接调用offer()。。两个入队方法一样的 }
siftUpComparable() 1 2 3 4 5 6 7 8 9 10 11 12 private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1;//其实就是(k-1)/2,获得父类节点 Object e = array[parent]; if (key.compareTo((T) e) >= 0)//如果入队元素大于等于父类节点则直接跳出循环 break; array[k] = e; k = parent; } array[k] = key; }
出队操作 poll() 1 2 3 4 5 6 7 8 9 10 //这方法没啥就是个壳,作用就上锁。。 public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return dequeue(); } finally { lock.unlock(); } }
dequeue() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private E dequeue() { int n = size - 1; if (n < 0)//队列为空直接返回null return null; else { Object[] array = queue; E result = (E) array[0];//获得首部元素(出队元素) E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; //开始向下调整 if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
siftDownComparable() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { //判断队列长度是否大于0 if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1//获得队列长度的一半; while (k < half) { int child = (k << 1) + 1; //获得k*2+1(也就是获得左子节点) Object c = array[child];//获得左子节点的值 int right = child + 1;//获得右子节点 if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right];//右子节点比较小 if (key.compareTo((T) c) <= 0)//如果key值比选出来的节点还小就直接跳出循环 break; array[k] = c;//赋值 k = child;//获得当前最小节点 } array[k] = key; } }
take() 1 2 3 4 5 6 7 8 9 10 11 12 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null)//如果当前队列为空则堵塞 notEmpty.await(); } finally { lock.unlock(); } return result; }
其他操作 size() 1 2 3 4 5 6 7 8 9 public int size() { final ReentrantLock lock = this.lock; lock.lock();//上锁,保证获取队列长度的时候没有入队出队操作。 try { return size; } finally { lock.unlock(); } }
remove() 1 2 3 4 5 6 7 8 9 10 11 12 13 public boolean remove(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { int i = indexOf(o);//获得移除元素的下标 if (i == -1) return false;//为空直接返回 removeAt(i); return true; } finally { lock.unlock(); } }
removeat() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private void removeAt(int i) { Object[] array = queue; int n = size - 1; if (n == i) // 如果移除的是最后的元素则直接设为null array[i] = null; else { E moved = (E) array[n];//队列最后一个元素 array[n] = null; Comparator<? super E> cmp = comparator; //向下调整 if (cmp == null) siftDownComparable(i, moved, array, n); else siftDownUsingComparator(i, moved, array, n, cmp); //如果向下调整不了则向上调整 if (array[i] == moved) { if (cmp == null) siftUpComparable(i, moved, array); else siftUpUsingComparator(i, moved, array, cmp); } } size = n; }