全局变量
/** The queued items */
//存放元素的数组
final Object[] items;
/** items index for next take, poll, peek or remove */
//下次拿元素的下标 take, poll, peek or remove中使用
int takeIndex;
/** items index for next put, offer, or add */
//下次放元素的下标 put, offer, or add中使用
int putIndex;
/** Number of elements in the queue */
//队列元素总数
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
//队列锁
final ReentrantLock lock;
/** Condition for waiting takes */
//非空条件
private final Condition notEmpty;
/** Condition for waiting puts */
//非满条件
private final Condition notFull;
/**
* Shared state for currently active iterators, or null if there
* are known not to be any. Allows queue operations to update
* iterator state.
*/
//迭代器
transient Itrs itrs = null
构造函数
//初始化容量限额和默认非公平锁
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
构造函数
//初始化容量限额和默认非公平锁,和将Collection中插入ArrayBlockingQueue中
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);//元素非空,否则NullPointerException
items[i++] = e;
}
//如果Collection.size() > capacity会报下标越界
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
dec循环减一
//如果i是0则当前数组长度减1,否则i减1
final int dec(int i) {
return ((i == 0) ? items.length : i) - 1;
}
itemAt返回指定下标元素
final E itemAt(int i) {
return (E) items[i];
}
enqueue插入元素
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
//当前线程只获取一次锁
// assert items[putIndex] == null;
//下标takeIndex所在的元素为空
final Object[] items = this.items;
//插入元素
items[putIndex] = x;
//如果数组已经满了,则下次插入到数组首位
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
dequeue取出元素
private E dequeue() {
// assert lock.getHoldCount() == 1;
//当前线程只获取一次锁
// assert items[takeIndex] != null;
//下标takeIndex所在的元素不为空
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//取出元素
E x = (E) items[takeIndex];
//置空
items[takeIndex] = null;
//当前数组取元素已经至数组尾,则下次从头再取
if (++takeIndex == items.length)
takeIndex = 0;
count--;
//迭代器相关,后面补
if (itrs != null)
itrs.elementDequeued();
//通知notFull await的线程进行取元素
notFull.signal();
return x;
}
removeAt移除指定下标元素
/**
* Deletes item at array index removeIndex.
* Utility for remove(Object) and iterator.remove.
* Call only when holding lock.
*/
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
//当前线程仅持锁1次,即并无重入
// assert items[removeIndex] != null;
//下标removeIndex的元素不能为空
// assert removeIndex >= 0 && removeIndex < items.length;
//下标removeIndex不能越界
//获取当前列表
final Object[] items = this.items;
//待移除下标等于待取元素下标
if (removeIndex == takeIndex) {
// removing front item; just advance
//直接置空
items[takeIndex] = null;
//当前数组取元素已经至数组尾,则下次从头再取
if (++takeIndex == items.length)
takeIndex = 0;
count--;//数组长度减一
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove
//否则,这是个内部移除
// slide over all others up through putIndex.
//通过putIndex偏移所有元素
final int putIndex = this.putIndex;
//开始迭代
for (int i = removeIndex;;) {
int next = i + 1;
//如果迭代至尾,则从头再来
if (next == items.length)
next = 0;
//下个操作下标不等于待插入下标
if (next != putIndex) {
//则next元素向左移一位
items[i] = items[next];
//备份最新removeIndex为i
i = next;
} else {
////下个操作下标等于待插入下标,即i(removeIndex)置空
items[i] = null;
this.putIndex = i;//下次插入元素下标为i(removeIndex),跳出循环
break;
}
}
count--;//数组长度减一
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
add直接添加元素,失败则抛异常高
//
public boolean add(E e) {
return super.add(e);
//super.add(e);如下,如果插入失败则抛异常提示数组已满
/*
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
*/
}
offer添加元素(直接返回结果)
//
public boolean offer(E e) {
checkNotNull(e);//元素必须非空
final ReentrantLock lock = this.lock;
lock.lock();
try {
//已满则直接返回false
if (count == items.length)
return false;
else {
//否则执行插入
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
put添加元素(可中断)
/**
public void put(E e) throws InterruptedException {
checkNotNull(e);//插入元素非空
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//已满则等待并释放锁
while (count == items.length)
notFull.await();
enqueue(e);//插入元素
} finally {
lock.unlock();
}
}
offer添加元素(带时间限制,可中断)
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);//必须非空
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果容量已满则等待
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
offer添加元素(带时间限制,可中断)
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);//必须非空
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//等待获取锁,可中断
try {
//如果容量已满则等待
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);//插入
return true;
} finally {
lock.unlock();
}
}
poll取元素(会一直等待)
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
take取元素(会一直等待,可中断,无元素则一直等待)
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
poll取元素(取锁会一直等待(可中断),取锁后无元素会限时等待)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
peek取元素(取锁会一直等待,如果队列为空则返回null)
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
获取size(取锁会一直等待)
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
remainingCapacity获取剩余容量(取锁会一直等待)
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}
remove移除元素(锁一直等待)
public boolean remove(Object o) {
//null元素直接返回
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();//锁一直等待
try {
if (count > 0) {
//获取放下标putIndex
final int putIndex = this.putIndex;
int i = takeIndex;
do {
//如果获取下标是此元素则直接移除并返回结果
if (o.equals(items[i])) {
removeAt(i);
return true;
}
//否则i加1,然后判断是否达至队列尾,是则重头再来
if (++i == items.length)
i = 0;
} while (i != putIndex);//迭代条件为取下标takeIndex不等于putIndex
}
return false;
} finally {
lock.unlock();
}
}
contains是否包含此元素
public boolean contains(Object o) {
//非空约束
if (o == null) return false;
//备份数组(为什么不是先等到lock,在再备份?)
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
//从取下标开始迭代
if (o.equals(items[i]))
return true;
//达到数组尾则从头再来
if (++i == items.length)
i = 0;
//取下标等于放下标时,表示数组已经迭代完成
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
toArray,ArrayBlockingQueue转数组
public Object[] toArray() {
Object[] a;
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int count = this.count;
a = new Object[count];
int n = items.length - takeIndex;
//ArrayBlockingQueue表示取下标在数组中间
/*null null null 3 4 5 null null null
*null 1 2 3 4
*null null 2 3 4 5
*/
if (count <= n)
//从takeIndex拷贝count位数然后复制到a数组中(a是从头开始复制至count)
System.arraycopy(items, takeIndex, a, 0, count);
else {
//表示取下标为0
//0 null null null 4 5
System.arraycopy(items, takeIndex, a, 0, n);
System.arraycopy(items, 0, a, n, count - n);
}
} finally {
lock.unlock();
}
return a;
}
toArray,ArrayBlockingQueue转数组
public <T> T[] toArray(T[] a) {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int count = this.count;
final int len = a.length;
//参数数组a长度比ArrayBlockingQueue断则扩容
if (len < count)
a = (T[])java.lang.reflect.Array.newInstance(
a.getClass().getComponentType(), count);
int n = items.length - takeIndex;
//ArrayBlockingQueue表示取下标在数组中间
/*null null null 3 4 5 null null null
*null 1 2 3 4
*null null 2 3 4 5
*/
if (count <= n)
//从takeIndex拷贝count位数然后复制到a数组中(a是从头开始复制至count)
System.arraycopy(items, takeIndex, a, 0, count);
else {
//表示取下标为0
//0 null null null 4 5
System.arraycopy(items, takeIndex, a, 0, n);
System.arraycopy(items, 0, a, n, count - n);
}
if (len > count)
//参数数组a下标null置空,然后count+1以后的就不管了?
a[count] = null;
} finally {
lock.unlock();
}
return a;
}
toString
public String toString() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//空数组返回[]
int k = count;
if (k == 0)
return "[]";
final Object[] items = this.items;
StringBuilder sb = new StringBuilder();
sb.append('[');
for (int i = takeIndex; ; ) {
Object e = items[i];
sb.append(e == this ? "(this Collection)" : e);
//迭代结束,直接返回
if (--k == 0)
return sb.append(']').toString();
sb.append(',').append(' ');
//到数组尾则从头再来
if (++i == items.length)
i = 0;
}
} finally {
lock.unlock();
}
}
clear清空ArrayBlockingQueue
public void clear() {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
//ArrayBlockingQueue非空
if (k > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
//从取下标开始迭代
do {
items[i] = null;
//达到数组尾则从来再来
if (++i == items.length)
i = 0;
//putIndex == takeIndex 表示数组已经遍历完
} while (i != putIndex);
takeIndex = putIndex;
count = 0;
if (itrs != null)
itrs.queueIsEmpty();
//ArrayBlockingQueue非空并且有待插入的线程等待中,则通知
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
}
} finally {
lock.unlock();
}
}
ArrayBlockingQueue元素迁移至Collection
public int drainTo(Collection<? super E> c, int maxElements){
//Collection非空,且不是本身
checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
//maxElements再多迁移对象不能小于0
if (maxElements <= 0)
return 0;
//
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//maxElements,count两者取小
int n = Math.min(maxElements, count);
int take = takeIndex;
int i = 0;
try {
while (i < n) {
@SuppressWarnings("unchecked")
//从takeIndex开始取元素
E x = (E) items[take];
c.add(x);
//添加完以后置空
items[take] = null;
//取下标自增一位,若果已达至数组尾则从头再取
if (++take == items.length)
take = 0;
i++;
}
return n;
} finally {
// Restore invariants even if c.add() threw
//i>0证明有取过元素
if (i > 0) {
//从新统计ArrayBlockingQueue总数
count -= i;
//更新取下标
takeIndex = take;
if (itrs != null) {
if (count == 0)
itrs.queueIsEmpty();
else if (i > take)
itrs.takeIndexWrapped();
}
//ArrayBlockingQueue非空并且有待插入的线程等待中,则通知
for (; i > 0 && lock.hasWaiters(notFull); i--)
notFull.signal();
}
}
} finally {
lock.unlock();
}
}
时间: 2024-09-24 00:49:03