博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ArrayBlockingQueue源码阅读(1.8)
阅读量:5933 次
发布时间:2019-06-19

本文共 9026 字,大约阅读时间需要 30 分钟。

ArrayBlockingQueue源码阅读


1、ArrayBlockingQueue类结构

  public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable。ArrayBlockingQueue是BlockingQueue接口的一种实现,要了解它就必须清楚BlockingQueue的相关知识;

2、BlockingQueue接口介绍

  在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue接口!,BlockingQueue的类继承关系如下:

ArrayBlockingQueue源码阅读(1.8)

BlockingQueue接口重要方法如下:


  • offer(anObject): 表示如果可能的话, 将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳, 则返回true, 否则返回false.(本方法不阻塞当前执行方法的线程)。
  • offer(E o, long timeout, TimeUnit unit), 可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
  • put(anObject): 把anObject加到BlockingQueue里, 如果BlockQueue没有空间, 则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续。
  • poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败,如果不指定超时时间,在没有数据时立即返回失败。
  • take(): 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入。
  • drainTo(): 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

3、源码分析

3.1、类属性查看

/** The queued items */ 以数组作为数据结构final Object[] items;/** items index for next take, poll, peek or remove */ 队列中下一个将被取出值的下标int takeIndex;/** items index for next put, offer, or add */  队列中下一个将被放入值的下标int putIndex;/** Number of elements in the queue */ 数组元素数量int count;/* * Concurrency control uses the classic two-condition algorithm  使用双条件算法 * found in any textbook.  *//** Main lock guarding all access */ 使用重入锁(独占锁)final ReentrantLock lock;/** Condition for waiting takes */ take时候用于等待的条件private final Condition notEmpty;/** Condition for waiting puts */ put时候用于等待的条件private final Condition notFull;    transient Itrs itrs = null;

3.2、构造函数分析

/**

  • Creates an {@code ArrayBlockingQueue} with the given (fixed)
  • capacity and default access policy.
  • @param capacity the capacity of this queue
  • @throws IllegalArgumentException if {@code capacity < 1}

    */

    public ArrayBlockingQueue(int capacity) {    this(capacity, false);  //调用public ArrayBlockingQueue(int capacity, boolean fair)构造方法,默认使用非公平锁}

    /**

  • Creates an {@code ArrayBlockingQueue} with the given (fixed)
  • capacity and the specified access policy.
  • @param capacity the capacity of this queue
  • @param fair if {@code true} then queue accesses for threads blocked
  • on insertion or removal, are processed in FIFO order; //如果传入的值为true即公平锁,则需要维护一个有序队列,保证先进先出的原则
  • if {@code false} the access order is unspecified.
  • @throws IllegalArgumentException if {@code capacity < 1}
    */
    public ArrayBlockingQueue(int capacity, boolean fair) {    if (capacity <= 0)        throw new IllegalArgumentException();    this.items = new Object[capacity]; //创建指定容量的数组    lock = new ReentrantLock(fair); //默认使用非公平锁    notEmpty = lock.newCondition();    notFull =  lock.newCondition();}

    /**

  • Creates an {@code ArrayBlockingQueue} with the given (fixed)
  • capacity, the specified access policy and initially containing the
  • elements of the given collection,
  • added in traversal order of the collection's iterator.
  • @param capacity the capacity of this queue
  • @param fair if {@code true} then queue accesses for threads blocked
  • on insertion or removal, are processed in FIFO order;
  • if {@code false} the access order is unspecified.
  • @param c the collection of elements to initially contain 使用指定集合初始化队列
  • @throws IllegalArgumentException if {@code capacity} is less than
  • {@code c.size()}, or less than 1.
  • @throws NullPointerException if the specified collection or any
  • of its elements are null

    */
    //这个构造函数的核心就是c.size()与capacity的大小关系对比了
    //如果c.size()>capacity那就会报错,所以在初始化的时候要注意

    public ArrayBlockingQueue(int capacity, boolean fair,                          Collection
    c) { this(capacity, fair); //先创建指定容量的数组,以便集合中的元素存放 //这种写法我们很常见,使用final表示引用不能改变,但又避免了直接使用成员变量 final ReentrantLock lock = this.lock; //对队列直接修改操作,需要先获取独占锁 lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; //下标从0开始存放 } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; //将数组元素个数返回给全局变量 putIndex = (i == capacity) ? 0 : i; 初始化入队索引 } finally { lock.unlock(); //解锁 }}

    3.3、入队列方法

    • add(E e) 方法,源码如下:
      //调用父类AbstractQueue的方法
      //在队列末尾(数组)插入指定的元素,前提是队列有空余空间,且指定元素不为空
      public boolean add(E e) {return super.add(e);}

      //父类AbstractQueue的方法

      public boolean add(E e) {if (offer(e)) //调用offer方法添加元素,不阻塞当前线程(等待)    return true;else    throw new IllegalStateException("Queue full");}
    • offer(E e)方法,源码如下:

      //如果队列可以容纳则立即返回成功,否则失败,不阻塞当前线程,是官方推荐使用的方法

      public boolean offer(E e) {checkNotNull(e); //检查元素是否为空final ReentrantLock lock = this.lock;lock.lock();//获取独占锁try {    if (count == items.length) //当前数组已满,立即返回失败        return false;    else {        enqueue(e);        return true;     }} finally {    lock.unlock(); //解锁}}

      //插入元素

      private void enqueue(E x) {// assert lock.getHoldCount() == 1;  当前线程调用lock()的次数// assert items[putIndex] == null; 当前位置没值final Object[] items = this.items;items[putIndex] = x; //在指定的位置插入元素if (++putIndex == items.length)    putIndex = 0;count++;  //更新数组元素个数notEmpty.signal();//通知被take方法读取元素阻塞等待的线程(前提是该线程持有锁)}

      put(E e)方法,如果BlockQueue没有空间, 则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续,其源码如下:

      public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly(); //申请锁的过程可被外界打断(中断响应,不会无限制等待)try {    while (count == items.length)        notFull.await(); //队列已经满了,则使用put的条件等待(此方法与object.wait()方法不同,不释放锁),之道有空闲空间继续执行下一步enqueue(e);    enqueue(e);} finally {    lock.unlock(); //解锁}}

      //offer(E,long,TimeUnit)会在等待一段时间后返回,但是等待的过程中是可以响应中断的

      public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly(); //响应中断try {    while (count == items.length) {        if (nanos <= 0)            return false;        nanos = notFull.awaitNanos(nanos);    }    enqueue(e);    return true;} finally {    lock.unlock();}}

3.4、出队列方法

3.4.1、 E poll方法
public E poll() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            return (count == 0) ? null : dequeue(); //数组不为空,调用dequeue取出takeIndex下标位置上的元素,并且会唤醒等待notFull条件的线程        } finally {            lock.unlock();        }    }
3.4.2 E take() 方法
public E take() throws InterruptedException {        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == 0)                notEmpty.await(); //队列为空,则阻塞当前队列            return dequeue(); //队列有元素,则调用dequeue        } finally {            lock.unlock();        }    }

E dequeue()阅读:

private E dequeue() {        // assert lock.getHoldCount() == 1;        // assert items[takeIndex] != null;        final Object[] items = this.items;        @SuppressWarnings("unchecked")        E x = (E) items[takeIndex]; //获取takeIndex下标处的元素        items[takeIndex] = null; //将当前位置的元素设置为null                //这里可以看出这个数组是个环形数组,其实取元素,总是从队列头部开始,即items[0]        if (++takeIndex == items.length)            takeIndex = 0;        count--; //更新数组元素个数        if (itrs != null)            itrs.elementDequeued(); //修改迭代器参数        notFull.signal(); //通知阻塞在putIndex操作的线程        return x;    }
3.4.3、 drainTo一次性返回多个(默认Integer.MAX_VALUE)元素(放到集合中)
public int drainTo(Collection
c) { return drainTo(c, Integer.MAX_VALUE); }
public int drainTo(Collection
c, int maxElements) { checkNotNull(c); if (c == this) //存放返回元素的集合不能使当前队列 throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int n = Math.min(maxElements, count); //要取出的元素不能大于数组元素个数 int take = takeIndex; int i = 0; try { while (i < n) { @SuppressWarnings("unchecked") E x = (E) items[take]; c.add(x); //将takerInex下标处的元素放入集合中 items[take] = null; //对应位置的元素设置为null if (++take == items.length) take = 0; // 环形数组 i++; } return n; //返回获取到的元素个数 } finally { // Restore invariants even if c.add() threw //更新状态,即使操作失败也要还原 if (i > 0) {// count -= i; takeIndex = take; if (itrs != null) { if (count == 0) itrs.queueIsEmpty(); else if (i > take) itrs.takeIndexWrapped(); } for (; i > 0 && lock.hasWaiters(notFull); i--) notFull.signal(); //唤醒阻塞在此条件(putIndex)的线程 } } } finally { lock.unlock(); } }

转载于:https://blog.51cto.com/3265857/2321570

你可能感兴趣的文章
HTTP状态码详解
查看>>
简单团队-爬取豆瓣电影TOP250-代码设计规范
查看>>
简单几何(向量旋转+凸包+多边形面积) UVA 10652 Board Wrapping
查看>>
并查集(加权) LA 4487 Exclusive-OR
查看>>
mysql
查看>>
软工概论第十四周总结
查看>>
asp.net 同时执行js事件和代码事件 导出 excel
查看>>
Android混合应用开发之页面跳转
查看>>
java继承后构造函数的继承机制
查看>>
ASP.NET编译执行常见错误及解决方法汇总之一
查看>>
ORACLE 12C RMAN 功能增强
查看>>
JAVA对象的序列化和反序列化
查看>>
NYOJ477A+B Problem III
查看>>
C# 自动运行代码 (创建windows 服务的形式 )
查看>>
Python的几个常用模块
查看>>
WPF柱状图(支持数据库动态更新)
查看>>
对于这次宋老师布置的团队项目所使用的开发模式的想法
查看>>
Android中运行的错误:java.lang.UnsatisfiedLinkError: Couldn't load locSDK3: findLibrary returned null....
查看>>
react传递方法 警告
查看>>
linux FTP配置详解
查看>>