LinkedBlockingQueue源码解读

Node LinkedBlockingQueue链表节点,单向节点

//Node节点类,单向节点
  static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         继任节点
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
            next为空意味当前节点为链尾
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

构造方法

//默认无界队列
 public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

LinkedBlockingQueue指定容量,链头和链尾都非空对象但item为空

 public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

全局变量

 /** The capacity bound, or Integer.MAX_VALUE if none */
    //链表容量,默认Integer.MAX_VALUE,即无界队列
    private final int capacity;

    /** Current number of elements */
    //当前队列元素总量
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list. 链头
     * Invariant: head.item == null //不变形:链头元素永为空
     */
    transient Node<E> head;

    /**
     * Tail of linked list. 链尾
     * Invariant: last.next == null //不变形:链尾元素后再无元素
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    //拿锁,在 take, poll等方法时会请求
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    //队列非空条件,以便通知队列进行取元素
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    //插入锁,在  put, offer等方法时会请求
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    //队列非空条件,以便同意队列进行插入元素
    private final Condition notFull = putLock.newCondition();

signalNotEmpty链表非空,然后signal(通知) takeLock进行获取元素

//仅在put/offer后调用
 private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();//强制拿锁
        try {
            notEmpty.signal();//触发signal
        } finally {
            takeLock.unlock();
        }
    }

signalNotFull链表非满,然后signal(通知) putLock进行插入元素

//仅在take/poll后调用
 private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

enqueue插入元素

 private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        //putLock必须获取当前锁
        // assert last.next == null;
        //队列尾必须为空
        //新node指向原队列尾元素的next(链下个对象),然后再指向last(链尾)
        last = last.next = node;
    }

dequeue取元素

 private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        //当前线程必须持有takeLock
        // assert head.item == null;
        //当前链头元素必须为空
        //备份链头
        Node<E> h = head;
        //备份链次元素(实际要取出的元素,定义为first)
        Node<E> first = h.next;
        h.next = h; // help GC //原链头h已经无作用
        head = first;//first指向head,first成为新链头
        E x = first.item;//取出目标元素
        first.item = null;//置空(LinkBlockingQueue规范)
        return x;
    }

fullyLock&fullyUnlock全局拿锁和放锁

  void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

Collection插入到队列

 public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);//默认无界队列
        //获取pulLock,构造函数中pulLock从无竞争,但需要保证可见性
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                //不支持空对象
                if (e == null)
                    throw new NullPointerException();
                //不允许超过限度
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);//原子更新
        } finally {
            putLock.unlock();
        }
    }

put放入对象

 public void put(E e) throws InterruptedException {
        //不支持空对象
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        //预创建操作标记(-1即无操作)
        int c = -1;
        //创建节点
        Node<E> node = new Node<E>(e);
        //备份(副本)putLock和当前总量
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //请求putLock锁(可打断)
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
             //容量已满则等待
            while (count.get() == capacity) {
                notFull.await();
            }
            //执行过插入至链尾
            enqueue(node);
            //原子自增一位,返回旧值
            c = count.getAndIncrement();
            //链表还没有满,通知其他线程执行插入
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //释放锁
            putLock.unlock();
        }
    if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据
        signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
    }

offer带时间限制的插入,返回操作结果

  public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        //严禁非空对象
        if (e == null) throw new NullPointerException();
        //获取等待时间
        long nanos = unit.toNanos(timeout);
        //预创建操作标记(-1即无操作)
        int c = -1;
        //获取putLock锁和当前链表容量
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //可打断的请求锁
        putLock.lockInterruptibly();
        try {
            //如果已满载,并且
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                //等待nanos(毫秒)秒,期间收到signal则返回(nanos-等待时间),否则在等待结束后返回0或负数
                //可打断并返回InterruptedException
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));//插入元素
            c = count.getAndIncrement();//获取最新容量高
            if (c + 1 < capacity)//未满载则通知其他线程进行put/offer
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        //存在takeLock和putlock,takeLock可能在消费,count会变化,c == 0表示队列有一条数据待消费
        if (c == 0)//takeLock的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
            signalNotEmpty();
        return true;
    }

offer尝试插入(一旦尝试插入则一直等待直至成功)

   public boolean offer(E e) {
        //元素不能为空
        if (e == null) throw new NullPointerException();
        //当前链表容量
        final AtomicInteger count = this.count;
        //满的话则返回false
        if (count.get() == capacity)
            return false;
        //预创建操作标记(-1即无操作)
        int c = -1;
        Node<E> node = new Node<E>(e);
        //获取putlock
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            //未满载
            if (count.get() < capacity) {
                enqueue(node);//插入元素
                c = count.getAndIncrement();
                //队列未满则继续通知插入
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)//存在takeLock和putlock,takeLock可能在消费,count会变化,c == 0表示队列有一条数据待消费
            signalNotEmpty();//takeLock的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
        return c >= 0;//链表有元素则表示插入成功
    }

take取元素(可打断)

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            //如果当前链表空,则等待
            while (count.get() == 0) {
                notEmpty.await();
            }
            //取出首元素(first) E
            x = dequeue();
            c = count.getAndDecrement();//链表容量(原子)减一,并返回旧值
            //链表非空则继续通知其他线程来取
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();//放锁
        }
        //如果链满则通知其他putLock等待的线程进行取元素
        //意思是,takeLock和putLock同时进行时,putLock一直在放元素,true表示有一条线程在等待插入元素
        if (c == capacity)
            signalNotFull();
        return x;
    }

poll取元素,有时间

   public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;//定义待取得元素
        int c = -1;//操作标记
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//可打断请求
        try {
            //如果链表无元素,则执行等待,直至耗时完毕
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();//取元素
            c = count.getAndDecrement();//获取原来链表长度然后长度减一
            if (c > 1)//如果链长度大于1,证明还有链还有元素,通知其他等待的takeLock执行取元素
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)//原链长度达至最大长度,即在取完元素后还可以放一个元素,所以执行通知putLock进行放元素
            signalNotFull();
        return x;
    }

poll取元素

  public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)//当前链无元素,直接返回
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();//获取锁,否则一直等待
        try {
            if (count.get() > 0) {//获取锁成功,并链有元素
                x = dequeue();//取元素
                c = count.getAndDecrement();//获取原链表长度然后实际长度减一
                if (c > 1)//原链表长度还有元素则通知继续进行通知其他线程取元素
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        //原链表长度达至满,则表示刚取完还可以放一个元素,所以执行通知
        if (c == capacity)
            signalNotFull();
        return x;
    }

peek取元素,但不拆链

  public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

unlink 拆链,将trail的下个元素p从链中拆除

    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();//必须获takeLock和putLock,合称fullyLock();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.

        p.item = null;//p元素item置null
        trail.next = p.next;//trail和(p.next)建立链关系
        //如果原p就是尾元素,则置trail为尾元素
        if (last == p)
            last = trail;
        //获取原链长度并减一,原链长度等于限额则表示链未满,则通知进行插入
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

remove拆除item所在的链

   public boolean remove(Object o) {
        //LinkedBlockingQueue允许null的item,除了head和last
        if (o == null) return false;
        fullyLock();//全局锁
        try {
            //迭代链表,发现首元素则拆除链
            //
            for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

contains o是否存在链表中

   public boolean contains(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            //p为空则表示到达链尾,否则原本就是空链
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }

toArray迭代元素返回数组

 public Object[] toArray() {
        fullyLock();//获取全局锁
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

toArray迭代元素,然后插入数组a

 public <T> T[] toArray(T[] a) {
        fullyLock();//获取全锁
        try {
            int size = count.get();//当前链表长度
            if (a.length < size)//参数数组a长度小于当前链表长度,则进行扩容
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;//迭代下标
            //迭代链表并且将item存入数组a
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            //如果参数数组a长度长于链长度,则a下标的元素置空,但(k+1)往后的元素呢?又不置空?
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

toString

public String toString() {
        fullyLock();//全局锁
        try {
            Node<E> p = head.next;//获取首元素,为空则直接返回[]
            if (p == null)
                return "[]";

            StringBuilder sb = new StringBuilder();
            sb.append('[');
            for (;;) {
                E e = p.item;//获取元素,然后组装,如果item为当前链表则返回this Collection
                sb.append(e == this ? "(this Collection)" : e);
                p = p.next;//为空则到达链尾
                if (p == null)
                    return sb.append(']').toString();
                sb.append(',').append(' ');
            }
        } finally {
            fullyUnlock();
        }
    }

clear清空链

 public void clear() {
        fullyLock();//全局锁
        try {
            //1,取出实际头元素p,备份原head至h               4,将p定于为新head
            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
                h.next = h;//2,原head的next指向原head,造成循坏链,并item为空
                p.item = null;//3,实际头元素item置空,help gc
            }
            head = last;
            // assert head.item == null && head.next == null;
            if (count.getAndSet(0) == capacity)
                notFull.signal();
        } finally {
            fullyUnlock();
        }
    }

drainTo转换item至Collection

public int drainTo(Collection<? super E> c, int maxElements) {
        //集合不能为空并不能为当前元素
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;//是否未满,以通知来插入
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();//请求tackLock
        try {
            //maxElements和count,取小的一方
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                //迭代链
                while (i < n) {
                    //取出节点head次元素
                    Node<E> p = h.next;
                    //添加至collections
                    c.add(p.item);
                    //然后置空
                    p.item = null;
                    //原head自关联
                    h.next = h;
                    //定义新head
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                //恢复链原有状态,即使c.add()抛异常
                if (i > 0) {//如果已经迭代过元素将迭代中的h作为新head
                    //assert h.item == null;
                    head = h;
                    //如果迭代完链的原长度的为capacity(链容量最大值)则表示此链没有满,通知putlock进行插入
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (signalNotFull)
                signalNotFull();
        }
    }

Itr迭代器

    private class Itr implements Iterator<E> {
        /*
         * Basic weakly-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */

        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;
        //构造函数
        Itr() {
            fullyLock();
            try {//下个元素
                current = head.next;
                //下个元素item
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        //下个月元素是否为空
        public boolean hasNext() {
            return current != null;
        }

        /**
         * Returns the next live successor of p, or null if no such.
         *
         * Unlike other traversal methods, iterators need to handle both:
         * - dequeued nodes (p.next == p)
         * - (possibly multiple) interior removed nodes (p.item == null)
         */
        private Node<E> nextNode(Node<E> p) {
            for (;;) {
                Node<E> s = p.next;
                //如果自连链,则证明链头(clear()和drainTo()回导致此情况)
                if (s == p)
                    return head.next;
                //否则返回p.next(s != null && s.item == null仅在链头时发生)
                if (s == null || s.item != null)
                    return s;
                p = s;
            }
        }

        public E next() {
            fullyLock();
            try {
                //hasNext已经判断了不为空
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = nextNode(current);
                //current == null表示已经达到链尾
                currentElement = (current == null) ? null : current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            //hasNext已经判断了不为空
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                for (Node<E> trail = head, p = trail.next;
                     p != null;
                     trail = p, p = p.next) {
                     //迭代链,找到p然后拆链
                    if (p == node) {
                        unlink(p, trail);
                        break;
                    }
                }
            } finally {
                fullyUnlock();
            }
        }
    }

时间: 2024-11-22 20:38:19

LinkedBlockingQueue源码解读的相关文章

jQuery源码解读之removeAttr()方法分析

 这篇文章主要介绍了jQuery源码解读之removeAttr()方法分析,较为详细的分析了removeAttr方法的实现技巧,非常具有实用价值,需要的朋友可以参考下     本文较为详细的分析了jQuery源码解读之removeAttr()方法.分享给大家供大家参考.具体分析如下: 扩展jQuery原型对象的方法: 代码如下: jQuery.fn.extend({ //name,传入要DOM元素要移除的属性名. removeAttr: function( name ) {   //使用jQue

jQuery源码解读之hasClass()方法分析

 这篇文章主要介绍了jQuery源码解读之hasClass()方法,以注释形式较为详细的分析了hasClass()方法的实现技巧,具有一定参考借鉴价值,需要的朋友可以参考下     本文较为详细的分析了jQuery源码解读之hasClass()方法.分享给大家供大家参考.具体分析如下:   代码如下: jQuery.fn.extend({ hasClass: function( selector ) { //将要检查的类名selector赋值给className, l为选择器选择的当前要检查的j

jQuery源码解读之addClass()方法分析

 这篇文章主要介绍了jQuery源码解读之addClass()方法,注释形式较为详细的分析了addClass()方法的实现技巧与相关注意事项,具有一定参考借鉴价值,需要的朋友可以参考下     本文较为详细的分析了jQuery源码解读之addClass()方法.分享给大家供大家参考.具体分析如下: 给jQuery原型对象扩展addClass功能,jQuery.fn就是jQuery.prototype 代码如下: jQuery.fn.extend({ /* 可以看出这是一个函数名叫addClass

jQuery源码解读之removeClass()方法分析

 这篇文章主要介绍了jQuery源码解读之removeClass()方法,以注释形式较为详细的分析了removeClass()方法的实现技巧与使用注意事项,需要的朋友可以参考下     本文较为详细的分析了jQuery源码解读之removeClass()方法.分享给大家供大家参考.具体分析如下: removeClass()方法和addClass()差别不大.这就来看看: 代码如下: jQuery.fn.extend({ removeClass: function( value ) { var c

Apache OFbiz entity engine源码解读

简介 最近一直在看Apache OFbiz entity engine的源码.为了能够更透彻得理解,也因为之前没有看人别人写过分析它的文章,所以决定自己来写一篇. 首先,我提出一个问题,如果你有兴趣可以想一下它的答案: JDBC真的给数据访问提供了足够的抽象,以至于你可以在多个支持jdbc访问的数据库之间任意切换而完全不需要担心你的数据访问代码吗? 我曾经在微博上有过关于该问题的思考: 其实这个感慨正是来自于我之前在看的一篇关于jdbc的文章,里面提到了jdbc中的一些设计模式(工厂方法),提供

Apache Beam WordCount编程实战及源码解读

概述:Apache Beam WordCount编程实战及源码解读,并通过intellij IDEA和terminal两种方式调试运行WordCount程序,Apache Beam对大数据的批处理和流处理,提供一套先进的统一的编程模型,并可以运行大数据处理引擎上.完整项目Github源码 负责公司大数据处理相关架构,但是具有多样性,极大的增加了开发成本,急需统一编程处理,Apache Beam,一处编程,处处运行,故将折腾成果分享出来. 1.Apache Beam编程实战–前言,Apache B

基于Docker的TensorFlow机器学习框架搭建和实例源码解读

概述:基于Docker的TensorFlow机器学习框架搭建和实例源码解读,TensorFlow作为最火热的机器学习框架之一,Docker是的容器,可以很好的结合起来,为机器学习或者科研人员提供便捷的机器学习开发环境,探索人工智能的奥秘,容器随开随用方便快捷.源码解析TensorFlow容器创建和示例程序运行,为热爱机器学者降低学习难度. 默认机器已经装好了Docker(Docker安装和使用可以看我另一篇博文:Ubuntu16.04安装Docker1.12+开发实例+hello world+w

Spark jdbc postgresql数据库连接和写入操作源码解读

概述:Spark postgresql jdbc 数据库连接和写入操作源码解读,详细记录了SparkSQL对数据库的操作,通过java程序,在本地开发和运行.整体为,Spark建立数据库连接,读取数据,将DataFrame数据写入另一个数据库表中.附带完整项目源码(完整项目源码github). 1.首先在postgreSQL中创建一张测试表,并插入数据.(完整项目源码Github) 1.1. 在postgreSQL中的postgres用户下,创建 products CREATE TABLE pr

spring-session源码解读-2

启用redis session spring通过EnableRedisHttpSession注解来启用redid session @Import(RedisHttpSessionConfiguration.class) @Configuration public @interface EnableRedisHttpSession { int maxInactiveIntervalInSeconds() default 1800; } 该注解有两个元注解,一个是Configuration, 一个是