JUC ArrayBlockingQueue

本文首先发表在 码蜂笔记

java.util.concurrent.ArrayBlockingQueue 是一个线程安全的、基于数组、有界的、阻塞的、FIFO 队列。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。

此类基于 java.util.concurrent.locks.ReentrantLock 来实现线程安全,所以提供了 ReentrantLock 所能支持的公平性选择。

属性

队列的操作主要有读、写,所以用了两个 int 类型的属性作为下一个读写位置的的指针。存放元素的数组是 final 修饰的,所以数组的大小是固定的。对于并发控制,是所有的访问都必须加锁,并用两个条件对象用于协调读写操作。

// 队列存放元素的容器
final Object[] items;

// 下一次读取或移除的位置
int takeIndex;

// 存放下一个放入元素的位置
int putIndex;

// 队列里有效元素的数量
int count;

// 所有访问的保护锁
final ReentrantLock lock;

// 等待获取的条件
private final Condition notEmpty;

// 等待放入的条件
private final Condition notFull;

环绕处理

如果指针一直往前增加或一直往后减小,那么总会超出数组的有效索引范围。所以需要进行一些环绕处理。

// 指针前移
final int inc(int i) {
    return (++i == items.length) ? 0 : i;
}

// 指针后移
final int dec(int i) {
    return ((i == 0) ? items.length : i) - 1;
}

注意,上面的处理都是对指针值的直接处理,而不关心是读指针还是写指针,因为是否有可读元素、可写空间的判断是通过对 count 计数来判断的。

这也是 count 的作用,它极大地简化了指针有效性的判断。在下面的 insert 和 extract 方法中根本就不需要对读写指针之间的位置关系进行判断,非常精妙。

通过环绕处理可以把这个数组看成是圆形的缓存。

添加元素

所有添加操作最终都是调用到内部方法 insert

// 在持有锁的前提下调用
private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex); // 指针前移 1
    ++count; // 有效元素数量加 1
    notEmpty.signal(); // 通知在非空条件上等待的读线程
}

读取元素

所有读取操作最终都是调用到内部方法 extract

// 在持有锁的前提下调用
private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeIndex]);
    items[takeIndex] = null; // for GC,避免内存泄露;也用于判断元素是否被移除
    takeIndex = inc(takeIndex); // 指针前移 1
    --count; // 有效元素数量减 1
    notFull.signal(); // 通知在非满条件上等待的写线程
    return x;
}

移除指定位置元素

// 在持有锁的前提下调用
void removeAt(int i) {
    final Object[] items = this.items;
    // 如果要移除是元素就是下一个可读数据,直接移除、修改读指针即可。
    // 这是一种优化,避免数据拷贝。
    if (i == takeIndex) {
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
    } else {
      // 如果要移除元素是在有效数据的中间,那么要把它之后添加的元素后移
      // 注意:这里不能用读写指针的大小关系作为终结条件,也是因为环绕。
        for (;;) {
            int nexti = inc(i);
            if (nexti != putIndex) {
                items[i] = items[nexti];
                i = nexti;
            } else {
                items[i] = null; // for GC
                putIndex = i; // putIndex 不是直接减 1 还是因为环绕。
                break;
            }
        }
    }
    --count;
    notFull.signal(); //
}

方法加锁

作为线程安全的类,ArrayBlockingQueue 的所有公开方法的逻辑都是在加锁的前提下进行的。这里以put方法为例。

通过 put 方法添加元素时,线程会一直等待,直到有空闲空间可以放入元素。

public void put(E e) throws InterruptedException {
    checkNotNull(e); // 不允许存空值,JUC下线程安全的容器都不允许存空值。

    // 在JUC的很多类里,都会看到这种写法:把类的属性赋值给方法内的一个变量。
    // 这是因为类的属性是存放在堆里的,方法内的变量是存放在方法栈上的,访问方法栈比访问堆要快。
    // 在这里,this.lock属性要访问两次,通过赋值给方法的局部变量,就节省了一次堆的访问。
    // 其他的类属性只访问一次就不需要这样处理了。优化无处不在!!
    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();  // 加锁
    try {
      // 放在循环里是避免虚假唤醒
      // 容器满的时候持续等待
        while (count == items.length)
             // await 方法会导致当前线程释放锁,等待其他线程唤醒,唤醒后重新获取锁
            notFull.await();

        insert(e);
    } finally {
        lock.unlock();  // 释放锁
    }
}

迭代

根据DOC文档说明,此类 iterator() 方法每次返回的 Iterator 是一个“弱一致”的迭代器,从不抛出 ConcurrentModificationException,并且确保可遍历迭代器构造时存在的元素,此外还可能(但并不保证)反映构造后的所有修改。

内部的迭代器类:

private class Itr implements Iterator<E> {
    private int remaining; // 剩余可返回元素数量
    private int nextIndex; // 下一个可返回元素的位置
    private E nextItem;    // 下一个可返回的元素
    private E lastItem;    // 上一次返回的元素
    private int lastRet;   // 上次返回的元素的位置,-1 表示没有。

    Itr() {
        final ReentrantLock lock = ArrayBlockingQueue. this.lock;
        lock.lock();
        try {
            lastRet = -1;
            // 初始化的时候记录容器的当前存在元素(通过记录数  count 和读指针 takeIndex)。
            if ((remaining = count) > 0)
                nextItem = itemAt(nextIndex = takeIndex);
        } finally {
            lock.unlock();
        }
    }

    public boolean hasNext() {
        return remaining > 0;
    }

    public E next() {
        final ReentrantLock lock = ArrayBlockingQueue. this.lock;
        lock.lock();
        try {
            if (remaining <= 0)
                throw new NoSuchElementException();

            lastRet = nextIndex;

            E x = itemAt(nextIndex);  // 检查元素是否被更新
            if (x == null) { // 该位置元素被移除了
                x = nextItem;         // 只能被迫返回旧值
                lastItem = null ;      // 用于使移除操作失败
            } else {
                lastItem = x;
            }

            // 跳过空元素(也就是迭代器创建之后被移除的元素)
            while (--remaining > 0 && // skip over nulls
                   (nextItem = itemAt(nextIndex = inc(nextIndex))) == null )
                   // 设置下一次要返回的元素
                ;

            return x;
        } finally {
            lock.unlock();
        }
    }

    public void remove() {
        final ReentrantLock lock = ArrayBlockingQueue. this.lock;
        lock.lock();
        try {
            int i = lastRet;
            if (i == -1)
                throw new IllegalStateException();
            lastRet = -1;
            E x = lastItem;
            lastItem = null ;
            // only remove if item still at index
            if (x != null && x == items[i]) {
                  // 只有在上次调用 next() 方法时候当前遍历位置的元素仍然在那里时才移除
                boolean removingHead = (i == takeIndex);
                removeAt(i);
                if (!removingHead)
                    nextIndex = dec(nextIndex);
            }
        } finally {
            lock.unlock();
        }
    }
}

从迭代器的源码可以看出,它只能感知在它创建时有效元素位置上的变化(删除、被替换),而不能感知新增的元素。

关于线程安全类的使用注意

《Java 并发编程实践》提到:就算代码都是用基于线程安全类构建的程序也不一定就是线程安全的。

static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);

for (Iterator<Integer> iterator = queue.iterator(); iterator.hasNext(); ) {
      iterator.next();
      iterator.remove();
}

在上面的代码片段里,如果 queue 是可以被多个线程访问的,那么上面的代码就不是线程安全的,因为 iterator 创建之后,hasNext、next、remove 这三个方法的调用之间,就有可能被其他线程修改了 queue ,导致抛出异常 NoSuchElementException。

线程安全类只保证了它的单个方法是线程安全的,如果要确保多个方法调用之间还是线程安全的,就必须使用另外的同步进行保证,而且要用同一个锁来保护,比如:

synchronized (queue) {
       for (Iterator<Integer> iterator = queue.iterator(); iterator.hasNext(); ) {
            iterator.next();
            iterator.remove();
      }
}

文章转自 并发编程网-ifeve.com

时间: 2024-10-23 02:30:20

JUC ArrayBlockingQueue的相关文章

Java多线程:“JUC原子类”03之AtomicLongArray原子类

AtomicLongArray介绍和函数列表 在"Java多线程系列--"JUC原子类"02之 AtomicLong原子类"中介绍过, AtomicLong是作用是对长整形进行原子操作.而AtomicLongArray的作用则是对"长整形数组" 进行原子操作. AtomicLongArray函数列表 // 创建给定长度的新 AtomicLongArray. AtomicLongArray(int length) // 创建与给定数组具有相同长度的

Java多线程:“JUC原子类”01之框架

根据修改的数据类型,可以将JUC包中的原子操作类可以分为4类. 1. 基本类型: AtomicInteger, AtomicLong, AtomicBoolean ; 2. 数组类型: AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray ; 3. 引用类型: AtomicReference, AtomicStampedRerence, AtomicMarkableReference ; 4. 对象的属性修改类型: AtomicInt

Java多线程:“JUC锁”04之公平锁(二)

概要 前面一章,我们学习了"公平锁"获取锁的详细流程:这里,我们再来看看"公平锁 "释放锁的过程.内容包括: 参考代码 释放公平锁(基于JDK1.7.0_40) "公平锁"的获取过程请参考"Java多线程系列--"JUC锁"03之 公平锁 (一)",锁的使用示例请参考"Java多线程系列--"JUC锁"02之 互斥锁 ReentrantLock". 注意: (01)

Java多线程:“JUC锁”03之公平锁(一)

基本概念 本章,我们会讲解"线程获取公平锁"的原理:在讲解之前,需要了解几个基本概念.后 面的内容,都是基于这些概念的:这些概念可能比较枯燥,但从这些概念中,能窥见"java锁 "的一些架构,这对我们了解锁是有帮助的. 1. AQS        -- 指AbstractQueuedSynchronizer类. AQS 是java中管理"锁"的抽象类,锁的许多公共方法都是在这个类中实现.AQS是独占锁(例如, ReentrantLock)和共享锁

Java多线程:“JUC锁”01之框架

根据锁的添加到Java中的时间,Java中的锁,可以分为"同步锁"和 "JUC包中的锁". 同步锁 即通过synchronized关键字来进行同步,实现对竞争资源的互斥访问的锁.Java 1.0版本中就已经支 持同步锁了. 同步锁的原理是,对于每一个对象,有且仅有一个同步锁:不同的线程能共同访问该同步锁.但是, 在同一个时间点,该同步锁能且只能被一个线程获取到.这样,获取到同步锁的线程就能进行CPU调度, 从而在CPU上执行:而没有获取到同步锁的线程,必须进行等待,

SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue性能测试(转)

听说JDK6对SynchronousQueue做了性能优化,避免对竞争资源加锁,所以想试试到底平时是选择SynchronousQueue还是其他BlockingQueue.   对于容器类在并发环境下的比较,一是是否线程安全,二是并发性能如何.BlockingQueue的实现都是线程安全的,所以只能比比它们的并发性能了.在不同的应用场景中,对容器的使用情况不同,有的读取操作多修改写入操作少,有的修改写入操作多,这对容器的性能会造成不同的影响.但对于Queue的使用,个人认为是比较一致的,简单点就

java之JUC系列-外部Tools-Executors|Semaphor|Exchanger|CyclicBarrier|CountDownLatch

前面写了两篇JDBC源码的文章,自己都觉得有点枯燥,先插一段JUC系列的文章来换换胃口,前面有文章大概介绍过J U C包含的东西,JUC体系包含的内容也是非常的多,不是一两句可以说清楚的,我这首先列出将会列举的JUC相关的内容,然后介绍本文的版本:Tools部分 J.U.C体系的主要大板块包含内容,如下图所示: 注意这个里面每个部分都包含很多的类和处理器,而且是相互包含,相互引用的,相互实现的. 说到J UC其实就是说java的多线程等和锁,前面说过一些状态转换,中断等,我们今天来用它的tool

JUC中Atomic class之lazySet的一点疑惑

JUC中Atomic class之lazySet的一点疑惑 最近再次翻netty和disrupt的源码, 发现一些地方使用AtomicXXX.lazySet()/unsafe.putOrderedXXX系列, 以前一直没有注意lazySet这个方法, 仔细研究一下发现很有意思.我们拿AtomicReferenceFieldUpdater的set()和lazySet()作比较, 其他AtomicXXX类和这个类似. public void set(T obj, V newValue) { // .

ArrayBlockingQueue源码分析

ArrayBlockingQueue是一个有界的阻塞队列,底层维护的是一个数组. 遵循先进先出FIFO,从尾部插入,从头部取出.如果队列已满,插入操作将阻塞,如果队列是空的,从队列里面取出元素也会阻塞. 构造方法 /* * fair 当多线程同时访问时,采用公平锁,还是非公平锁,默认 false 非公平锁 * * 公平锁:先发出请求的线程将先执行 * 非公平锁:哪个线程的请求先执行的顺序不确定 */ public ArrayBlockingQueue(int capacity, boolean