Node LinkedBlockingQueue链表节点,单向节点
//Node节点类,单向节点
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
继任节点
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
next为空意味当前节点为链尾
*/
Node<E> next;
Node(E x) { item = x; }
}
构造方法
//默认无界队列
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
LinkedBlockingQueue指定容量,链头和链尾都非空对象但item为空
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
全局变量
/** The capacity bound, or Integer.MAX_VALUE if none */
//链表容量,默认Integer.MAX_VALUE,即无界队列
private final int capacity;
/** Current number of elements */
//当前队列元素总量
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list. 链头
* Invariant: head.item == null //不变形:链头元素永为空
*/
transient Node<E> head;
/**
* Tail of linked list. 链尾
* Invariant: last.next == null //不变形:链尾元素后再无元素
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
//拿锁,在 take, poll等方法时会请求
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
//队列非空条件,以便通知队列进行取元素
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
//插入锁,在 put, offer等方法时会请求
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
//队列非空条件,以便同意队列进行插入元素
private final Condition notFull = putLock.newCondition();
signalNotEmpty链表非空,然后signal(通知) takeLock进行获取元素
//仅在put/offer后调用
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();//强制拿锁
try {
notEmpty.signal();//触发signal
} finally {
takeLock.unlock();
}
}
signalNotFull链表非满,然后signal(通知) putLock进行插入元素
//仅在take/poll后调用
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
enqueue插入元素
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
//putLock必须获取当前锁
// assert last.next == null;
//队列尾必须为空
//新node指向原队列尾元素的next(链下个对象),然后再指向last(链尾)
last = last.next = node;
}
dequeue取元素
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
//当前线程必须持有takeLock
// assert head.item == null;
//当前链头元素必须为空
//备份链头
Node<E> h = head;
//备份链次元素(实际要取出的元素,定义为first)
Node<E> first = h.next;
h.next = h; // help GC //原链头h已经无作用
head = first;//first指向head,first成为新链头
E x = first.item;//取出目标元素
first.item = null;//置空(LinkBlockingQueue规范)
return x;
}
fullyLock&fullyUnlock全局拿锁和放锁
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
Collection插入到队列
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);//默认无界队列
//获取pulLock,构造函数中pulLock从无竞争,但需要保证可见性
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
//不支持空对象
if (e == null)
throw new NullPointerException();
//不允许超过限度
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);//原子更新
} finally {
putLock.unlock();
}
}
put放入对象
public void put(E e) throws InterruptedException {
//不支持空对象
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
//预创建操作标记(-1即无操作)
int c = -1;
//创建节点
Node<E> node = new Node<E>(e);
//备份(副本)putLock和当前总量
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//请求putLock锁(可打断)
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
//容量已满则等待
while (count.get() == capacity) {
notFull.await();
}
//执行过插入至链尾
enqueue(node);
//原子自增一位,返回旧值
c = count.getAndIncrement();
//链表还没有满,通知其他线程执行插入
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据
signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
}
offer带时间限制的插入,返回操作结果
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//严禁非空对象
if (e == null) throw new NullPointerException();
//获取等待时间
long nanos = unit.toNanos(timeout);
//预创建操作标记(-1即无操作)
int c = -1;
//获取putLock锁和当前链表容量
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//可打断的请求锁
putLock.lockInterruptibly();
try {
//如果已满载,并且
while (count.get() == capacity) {
if (nanos <= 0)
return false;
//等待nanos(毫秒)秒,期间收到signal则返回(nanos-等待时间),否则在等待结束后返回0或负数
//可打断并返回InterruptedException
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));//插入元素
c = count.getAndIncrement();//获取最新容量高
if (c + 1 < capacity)//未满载则通知其他线程进行put/offer
notFull.signal();
} finally {
putLock.unlock();
}
//存在takeLock和putlock,takeLock可能在消费,count会变化,c == 0表示队列有一条数据待消费
if (c == 0)//takeLock的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
signalNotEmpty();
return true;
}
offer尝试插入(一旦尝试插入则一直等待直至成功)
public boolean offer(E e) {
//元素不能为空
if (e == null) throw new NullPointerException();
//当前链表容量
final AtomicInteger count = this.count;
//满的话则返回false
if (count.get() == capacity)
return false;
//预创建操作标记(-1即无操作)
int c = -1;
Node<E> node = new Node<E>(e);
//获取putlock
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//未满载
if (count.get() < capacity) {
enqueue(node);//插入元素
c = count.getAndIncrement();
//队列未满则继续通知插入
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)//存在takeLock和putlock,takeLock可能在消费,count会变化,c == 0表示队列有一条数据待消费
signalNotEmpty();//takeLock的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
return c >= 0;//链表有元素则表示插入成功
}
take取元素(可打断)
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//如果当前链表空,则等待
while (count.get() == 0) {
notEmpty.await();
}
//取出首元素(first) E
x = dequeue();
c = count.getAndDecrement();//链表容量(原子)减一,并返回旧值
//链表非空则继续通知其他线程来取
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();//放锁
}
//如果链满则通知其他putLock等待的线程进行取元素
//意思是,takeLock和putLock同时进行时,putLock一直在放元素,true表示有一条线程在等待插入元素
if (c == capacity)
signalNotFull();
return x;
}
poll取元素,有时间
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;//定义待取得元素
int c = -1;//操作标记
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//可打断请求
try {
//如果链表无元素,则执行等待,直至耗时完毕
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();//取元素
c = count.getAndDecrement();//获取原来链表长度然后长度减一
if (c > 1)//如果链长度大于1,证明还有链还有元素,通知其他等待的takeLock执行取元素
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)//原链长度达至最大长度,即在取完元素后还可以放一个元素,所以执行通知putLock进行放元素
signalNotFull();
return x;
}
poll取元素
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)//当前链无元素,直接返回
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();//获取锁,否则一直等待
try {
if (count.get() > 0) {//获取锁成功,并链有元素
x = dequeue();//取元素
c = count.getAndDecrement();//获取原链表长度然后实际长度减一
if (c > 1)//原链表长度还有元素则通知继续进行通知其他线程取元素
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
//原链表长度达至满,则表示刚取完还可以放一个元素,所以执行通知
if (c == capacity)
signalNotFull();
return x;
}
peek取元素,但不拆链
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
unlink 拆链,将trail的下个元素p从链中拆除
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();//必须获takeLock和putLock,合称fullyLock();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;//p元素item置null
trail.next = p.next;//trail和(p.next)建立链关系
//如果原p就是尾元素,则置trail为尾元素
if (last == p)
last = trail;
//获取原链长度并减一,原链长度等于限额则表示链未满,则通知进行插入
if (count.getAndDecrement() == capacity)
notFull.signal();
}
remove拆除item所在的链
public boolean remove(Object o) {
//LinkedBlockingQueue允许null的item,除了head和last
if (o == null) return false;
fullyLock();//全局锁
try {
//迭代链表,发现首元素则拆除链
//
for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
contains o是否存在链表中
public boolean contains(Object o) {
if (o == null) return false;
fullyLock();
try {
//p为空则表示到达链尾,否则原本就是空链
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
fullyUnlock();
}
}
toArray迭代元素返回数组
public Object[] toArray() {
fullyLock();//获取全局锁
try {
int size = count.get();
Object[] a = new Object[size];
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = p.item;
return a;
} finally {
fullyUnlock();
}
}
toArray迭代元素,然后插入数组a
public <T> T[] toArray(T[] a) {
fullyLock();//获取全锁
try {
int size = count.get();//当前链表长度
if (a.length < size)//参数数组a长度小于当前链表长度,则进行扩容
a = (T[])java.lang.reflect.Array.newInstance
(a.getClass().getComponentType(), size);
int k = 0;//迭代下标
//迭代链表并且将item存入数组a
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = (T)p.item;
//如果参数数组a长度长于链长度,则a下标的元素置空,但(k+1)往后的元素呢?又不置空?
if (a.length > k)
a[k] = null;
return a;
} finally {
fullyUnlock();
}
}
toString
public String toString() {
fullyLock();//全局锁
try {
Node<E> p = head.next;//获取首元素,为空则直接返回[]
if (p == null)
return "[]";
StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) {
E e = p.item;//获取元素,然后组装,如果item为当前链表则返回this Collection
sb.append(e == this ? "(this Collection)" : e);
p = p.next;//为空则到达链尾
if (p == null)
return sb.append(']').toString();
sb.append(',').append(' ');
}
} finally {
fullyUnlock();
}
}
clear清空链
public void clear() {
fullyLock();//全局锁
try {
//1,取出实际头元素p,备份原head至h 4,将p定于为新head
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
h.next = h;//2,原head的next指向原head,造成循坏链,并item为空
p.item = null;//3,实际头元素item置空,help gc
}
head = last;
// assert head.item == null && head.next == null;
if (count.getAndSet(0) == capacity)
notFull.signal();
} finally {
fullyUnlock();
}
}
drainTo转换item至Collection
public int drainTo(Collection<? super E> c, int maxElements) {
//集合不能为空并不能为当前元素
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
boolean signalNotFull = false;//是否未满,以通知来插入
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();//请求tackLock
try {
//maxElements和count,取小的一方
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
//迭代链
while (i < n) {
//取出节点head次元素
Node<E> p = h.next;
//添加至collections
c.add(p.item);
//然后置空
p.item = null;
//原head自关联
h.next = h;
//定义新head
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
//恢复链原有状态,即使c.add()抛异常
if (i > 0) {//如果已经迭代过元素将迭代中的h作为新head
//assert h.item == null;
head = h;
//如果迭代完链的原长度的为capacity(链容量最大值)则表示此链没有满,通知putlock进行插入
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}
Itr迭代器
private class Itr implements Iterator<E> {
/*
* Basic weakly-consistent iterator. At all times hold the next
* item to hand out so that if hasNext() reports true, we will
* still have it to return even if lost race with a take etc.
*/
private Node<E> current;
private Node<E> lastRet;
private E currentElement;
//构造函数
Itr() {
fullyLock();
try {//下个元素
current = head.next;
//下个元素item
if (current != null)
currentElement = current.item;
} finally {
fullyUnlock();
}
}
//下个月元素是否为空
public boolean hasNext() {
return current != null;
}
/**
* Returns the next live successor of p, or null if no such.
*
* Unlike other traversal methods, iterators need to handle both:
* - dequeued nodes (p.next == p)
* - (possibly multiple) interior removed nodes (p.item == null)
*/
private Node<E> nextNode(Node<E> p) {
for (;;) {
Node<E> s = p.next;
//如果自连链,则证明链头(clear()和drainTo()回导致此情况)
if (s == p)
return head.next;
//否则返回p.next(s != null && s.item == null仅在链头时发生)
if (s == null || s.item != null)
return s;
p = s;
}
}
public E next() {
fullyLock();
try {
//hasNext已经判断了不为空
if (current == null)
throw new NoSuchElementException();
E x = currentElement;
lastRet = current;
current = nextNode(current);
//current == null表示已经达到链尾
currentElement = (current == null) ? null : current.item;
return x;
} finally {
fullyUnlock();
}
}
public void remove() {
//hasNext已经判断了不为空
if (lastRet == null)
throw new IllegalStateException();
fullyLock();
try {
Node<E> node = lastRet;
lastRet = null;
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//迭代链,找到p然后拆链
if (p == node) {
unlink(p, trail);
break;
}
}
} finally {
fullyUnlock();
}
}
}
时间: 2024-11-22 20:38:19