本文共 9026 字,大约阅读时间需要 30 分钟。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable。ArrayBlockingQueue是BlockingQueue接口的一种实现,要了解它就必须清楚BlockingQueue的相关知识;
在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue接口!,BlockingQueue的类继承关系如下:
BlockingQueue接口重要方法如下:
/** The queued items */ 以数组作为数据结构final Object[] items;/** items index for next take, poll, peek or remove */ 队列中下一个将被取出值的下标int takeIndex;/** items index for next put, offer, or add */ 队列中下一个将被放入值的下标int putIndex;/** Number of elements in the queue */ 数组元素数量int count;/* * Concurrency control uses the classic two-condition algorithm 使用双条件算法 * found in any textbook. *//** Main lock guarding all access */ 使用重入锁(独占锁)final ReentrantLock lock;/** Condition for waiting takes */ take时候用于等待的条件private final Condition notEmpty;/** Condition for waiting puts */ put时候用于等待的条件private final Condition notFull; transient Itrs itrs = null;
/**
@throws IllegalArgumentException if {@code capacity < 1}
*/public ArrayBlockingQueue(int capacity) { this(capacity, false); //调用public ArrayBlockingQueue(int capacity, boolean fair)构造方法,默认使用非公平锁}
/**
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; //创建指定容量的数组 lock = new ReentrantLock(fair); //默认使用非公平锁 notEmpty = lock.newCondition(); notFull = lock.newCondition();}
/**
of its elements are null
*///这个构造函数的核心就是c.size()与capacity的大小关系对比了//如果c.size()>capacity那就会报错,所以在初始化的时候要注意public ArrayBlockingQueue(int capacity, boolean fair, Collection c) { this(capacity, fair); //先创建指定容量的数组,以便集合中的元素存放 //这种写法我们很常见,使用final表示引用不能改变,但又避免了直接使用成员变量 final ReentrantLock lock = this.lock; //对队列直接修改操作,需要先获取独占锁 lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; //下标从0开始存放 } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; //将数组元素个数返回给全局变量 putIndex = (i == capacity) ? 0 : i; 初始化入队索引 } finally { lock.unlock(); //解锁 }}
public boolean add(E e) {return super.add(e);}
//父类AbstractQueue的方法
public boolean add(E e) {if (offer(e)) //调用offer方法添加元素,不阻塞当前线程(等待) return true;else throw new IllegalStateException("Queue full");}
offer(E e)方法,源码如下:
//如果队列可以容纳则立即返回成功,否则失败,不阻塞当前线程,是官方推荐使用的方法public boolean offer(E e) {checkNotNull(e); //检查元素是否为空final ReentrantLock lock = this.lock;lock.lock();//获取独占锁try { if (count == items.length) //当前数组已满,立即返回失败 return false; else { enqueue(e); return true; }} finally { lock.unlock(); //解锁}}
//插入元素
private void enqueue(E x) {// assert lock.getHoldCount() == 1; 当前线程调用lock()的次数// assert items[putIndex] == null; 当前位置没值final Object[] items = this.items;items[putIndex] = x; //在指定的位置插入元素if (++putIndex == items.length) putIndex = 0;count++; //更新数组元素个数notEmpty.signal();//通知被take方法读取元素阻塞等待的线程(前提是该线程持有锁)}
put(E e)方法,如果BlockQueue没有空间, 则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续,其源码如下:
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly(); //申请锁的过程可被外界打断(中断响应,不会无限制等待)try { while (count == items.length) notFull.await(); //队列已经满了,则使用put的条件等待(此方法与object.wait()方法不同,不释放锁),之道有空闲空间继续执行下一步enqueue(e); enqueue(e);} finally { lock.unlock(); //解锁}}
//offer(E,long,TimeUnit)会在等待一段时间后返回,但是等待的过程中是可以响应中断的
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly(); //响应中断try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true;} finally { lock.unlock();}}
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); //数组不为空,调用dequeue取出takeIndex下标位置上的元素,并且会唤醒等待notFull条件的线程 } finally { lock.unlock(); } }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); //队列为空,则阻塞当前队列 return dequeue(); //队列有元素,则调用dequeue } finally { lock.unlock(); } }
E dequeue()阅读:
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; //获取takeIndex下标处的元素 items[takeIndex] = null; //将当前位置的元素设置为null //这里可以看出这个数组是个环形数组,其实取元素,总是从队列头部开始,即items[0] if (++takeIndex == items.length) takeIndex = 0; count--; //更新数组元素个数 if (itrs != null) itrs.elementDequeued(); //修改迭代器参数 notFull.signal(); //通知阻塞在putIndex操作的线程 return x; }
public int drainTo(Collection c) { return drainTo(c, Integer.MAX_VALUE); }
public int drainTo(Collection c, int maxElements) { checkNotNull(c); if (c == this) //存放返回元素的集合不能使当前队列 throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int n = Math.min(maxElements, count); //要取出的元素不能大于数组元素个数 int take = takeIndex; int i = 0; try { while (i < n) { @SuppressWarnings("unchecked") E x = (E) items[take]; c.add(x); //将takerInex下标处的元素放入集合中 items[take] = null; //对应位置的元素设置为null if (++take == items.length) take = 0; // 环形数组 i++; } return n; //返回获取到的元素个数 } finally { // Restore invariants even if c.add() threw //更新状态,即使操作失败也要还原 if (i > 0) {// count -= i; takeIndex = take; if (itrs != null) { if (count == 0) itrs.queueIsEmpty(); else if (i > take) itrs.takeIndexWrapped(); } for (; i > 0 && lock.hasWaiters(notFull); i--) notFull.signal(); //唤醒阻塞在此条件(putIndex)的线程 } } } finally { lock.unlock(); } }
转载于:https://blog.51cto.com/3265857/2321570