//队列数量最大值默认为0x7FFFFFFF 对应 Integer.MAX_VALUE 即 2147483647 private final int capacity;
//原子性变量计算队列元素个数 private final AtomicInteger count = new AtomicInteger(); //头部节点 transient Node<E> head;
//尾部节点 private transient Node<E> last;
//出队操作需要获得该锁 private final ReentrantLock takeLock = new ReentrantLock();
//当队列为空时,进行出队操作的线程会被放进这个条件队列等待 private final Condition notEmpty = takeLock.newCondition();
//入队操作需要获得该锁 private final ReentrantLock putLock = new ReentrantLock();
//当队列已满,进行入队操作的线程会被放进该这个条件队列等待 private final Condition notFull = putLock.newCondition();
构造方法分析
1 2 3 4 5 6 7 8 9 10
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //初始化首尾节点都指向哨兵节点 last = head = new Node<E>(null); }
public boolean offer(E e) { if (e == null) throw new NullPointerException();//判空 final AtomicInteger count = this.count;//获得队列个数 if (count.get() == capacity)//判断队列个数是否等于边界值 return false; int c = -1; Node<E> node = new Node<E>(e);//新建节点 final ReentrantLock putLock = this.putLock; putLock.lock();//获得入队操作的独占锁 try { if (count.get() < capacity)//判断队列个数是否小于边界值{ enqueue(node);//添加进队列尾部 c = count.getAndIncrement();//自增 if (c + 1 < capacity) notFull.signal();//唤醒因队列满而休眠的线程 } } finally { putLock.unlock();//放锁 } if (c == 0) signalNotEmpty();//唤醒因队列为空而休眠的线程 return c >= 0; }
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException();//判空 int c = -1; Node<E> node = new Node<E>(e);//新建节点 final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly();//获得可以被中断的独占锁 try { while (count.get() == capacity){ notFull.await();//因队列个数已满把自己挂起 } enqueue(node);//入队 c = count.getAndIncrement();//自增 if (c + 1 < capacity) notFull.signal();//唤醒因队列满而休眠的线程 } finally { putLock.unlock(); } if (c == 0) signalNotEmpty();//唤醒因队列为空而休眠的线程 }
public E poll() { final AtomicInteger count = this.count;//获得总数 if (count.get() == 0)//如果队列为空则提取退出 return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock();//获得出队操作的独占锁 try { if (count.get() > 0){ x = dequeue();出队 c = count.getAndDecrement();递减 if (c > 1) notEmpty.signal();//唤醒因队列为空而休眠的线程 } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull();//唤醒因队列满而休眠的线程 return x; }
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count;//获得元素个数 final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();//获得出队操作的可以被中断的独占锁 try { while (count.get() == 0){ notEmpty.await();//队列为空则休眠 } x = dequeue();//出队 c = count.getAndDecrement();//递减 if (c > 1) notEmpty.signal();//唤醒因队列为空而休眠的线程 } finally { takeLock.unlock(); } if (c == capacity) signalNotFull();//唤醒因队列为满而休眠的线程 return x; }