/* * @param src the source array.源数组。 * @param srcPos starting position in the source array.要复制的起始位置,包含此位置的元素。 * @param dest the destination array.目标数组。 * @param destPos starting position in the destination data.目标数组放置复制元素的其实位置。 * @param length the number of array elements to be copied.要从源数组复制的元素数量。 * @exception IndexOutOfBoundsException if copying would cause * access of data outside array bounds. * @exception ArrayStoreException if an element in the <code>src</code> * array could not be stored into the <code>dest</code> array * because of a type mismatch. * @exception NullPointerException if either <code>src</code> or * <code>dest</code> is <code>null</code>. */ publicstaticnativevoidarraycopy(Object src, int srcPos, Object dest, int destPos, int length);
publicclassArrayBlockingQueue<E> extendsAbstractQueue<E> implementsBlockingQueue<E>, java.io.Serializable{ privatestaticfinallong serialVersionUID = -817911632652898426L; /** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count;
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ privatefinal Condition notEmpty; /** Condition for waiting puts */ privatefinal Condition notFull; transient Itrs itrs = null;
/** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and the specified access policy. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @throws IllegalArgumentException if {@code capacity < 1} */ publicArrayBlockingQueue(int capacity, boolean fair){ if (capacity <= 0) thrownew IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } }
publicclassLinkedBlockingQueue<E> extendsAbstractQueue<E> implementsBlockingQueue<E>, java.io.Serializable{ privatestaticfinallong serialVersionUID = -6903933977591709194L; /** * Linked list node class */ staticclassNode<E> { E item;
/** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next;
Node(E x) { item = x; } }
/** The capacity bound, or Integer.MAX_VALUE if none */ privatefinalint capacity;
/** Current number of elements */ privatefinal AtomicInteger count = new AtomicInteger();
/** * Head of linked list. * Invariant: head.item == null */ transient Node<E> head;
/* * The implementation uses an array-based binary heap, with public * operations protected with a single lock. However, allocation * during resizing uses a simple spinlock (used only while not * holding main lock) in order to allow takes to operate * concurrently with allocation. This avoids repeated * postponement of waiting consumers and consequent element * build-up. The need to back away from lock during allocation * makes it impossible to simply wrap delegated * java.util.PriorityQueue operations within a lock, as was done * in a previous version of this class. To maintain * interoperability, a plain PriorityQueue is still used during * serialization, which maintains compatibility at the expense of * transiently doubling overhead. */
/** * The maximum size of array to allocate. * Some VMs reserve some header words in an array. * Attempts to allocate larger arrays may result in * OutOfMemoryError: Requested array size exceeds VM limit */ privatestaticfinalint MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/** * Priority queue represented as a balanced binary heap: the two * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The * priority queue is ordered by comparator, or by the elements' * natural ordering, if comparator is null: For each node n in the * heap and each descendant d of n, n <= d. The element with the * lowest value is in queue[0], assuming the queue is nonempty. */ privatetransient Object[] queue;
/** * The number of elements in the priority queue. */ privatetransientint size;
/** * The comparator, or null if priority queue uses elements' * natural ordering. */ privatetransient Comparator<? super E> comparator;
/** * Lock used for all public operations */ privatefinal ReentrantLock lock;
/** * Condition for blocking when empty */ privatefinal Condition notEmpty;
/** * Spinlock for allocation, acquired via CAS. */ privatetransientvolatileint allocationSpinLock;
/** * A plain PriorityQueue used only for serialization, * to maintain compatibility with previous versions * of this class. Non-null only during serialization/deserialization. */ private PriorityQueue<E> q; //... }
E transfer(E e, boolean timed, long nanos){ /* Basic algorithm is to loop trying to take either of * two actions: * * 1. 如果队列是空的或新入队的线程与等待线程队列的操作相同(都是put线程或者都是 * take线程),则尝试将此线程加入到等待队列中,等待有相反操作的线程到来或取消等待(被中断),操作成功会返回node的item值。 * * 2. 如果队列不为空,并且入队的线程为等待线程队列的相反操作线程,尝试执行CAS操作,并将第* 一个入队的线程弹出,并返回put线程的item值。 * * 在上面的操作中,每次操作成功都会尝试变更队列的head和tail的位置。 * * 在循环一开始,检查head或tail为null,如果为null则重置循环. */ QNode s = null; // constructed/reused as needed boolean isData = (e != null);
for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue; // spin
if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; if (t != tail) // inconsistent read continue; if (tn != null) { // lagging tail advanceTail(t, tn); continue; } if (timed && nanos <= 0) // can't wait returnnull; if (s == null) s = new QNode(e, isData); if (!t.casNext(null, s)) // failed to link in continue;
advanceTail(t, s); // swing tail and wait Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // wait was cancelled clean(t, s); returnnull; }
if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e;
} else { // complementary-mode QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read
Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; }
staticfinalclassQNode{ volatile QNode next; // next node in queue volatile Object item; // put的值,CAS'ed to or from null volatile Thread waiter; // 调用put或take的线程,to control park/unpark finalboolean isData; //是否有值,有为put操作,没有为take操作,区分互补的2种操作线程
/** * Tries to cancel by CAS'ing ref to this as item. */ voidtryCancel(Object cmp){ UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); }
booleanisCancelled(){ return item == this; }
/** * Returns true if this node is known to be off the queue * because its next pointer has been forgotten due to * an advanceHead operation. */ booleanisOffList(){ return next == this; }
E transfer(E e, boolean timed, long nanos){ /* * Basic algorithm is to loop trying one of three actions: * * 1. If apparently empty or already containing nodes of same * mode, try to push node on stack and wait for a match, * returning it, or null if cancelled. * * 2. If apparently containing node of complementary mode, * try to push a fulfilling node on to stack, match * with corresponding waiting node, pop both from * stack, and return matched item. The matching or * unlinking might not actually be necessary because of * other threads performing action 3: * * 3. If top of stack already holds another fulfilling node, * help it out by doing its match and/or pop * operations, and then continue. The code for helping * is essentially the same as for fulfilling, except * that it doesn't return the item. */
SNode s = null; // constructed/reused as needed int mode = (e == null) ? REQUEST : DATA;
for (;;) { SNode h = head; if (h == null || h.mode == mode) { // empty or same-mode if (timed && nanos <= 0) { // can't wait if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else returnnull; } elseif (casHead(h, s = snode(s, e, h, mode))) { SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled clean(s); returnnull; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller return (E) ((mode == REQUEST) ? m.item : s.item); } } elseif (!isFulfilling(h.mode)) { // try to fulfill if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry elseif (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else// lost match s.casNext(m, mn); // help unlink } } } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else// lost match h.casNext(m, mn); // help unlink } } } }
staticfinalclassSNode{ volatile SNode next; // next node in stack volatile SNode match; // the node matched to this volatile Thread waiter; // to control park/unpark Object item; // data; or null for REQUESTs int mode; // Note: item and mode fields don't need to be volatile // since they are always written before, and read after, // other volatile/atomic operations.
/** * Tries to match node s to this node, if so, waking up thread. * Fulfillers call tryMatch to identify their waiters. * Waiters block until they have been matched. * * @param s the node to match * @return true if successfully matched to s */ booleantryMatch(SNode s){ if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { // waiters need at most one unpark waiter = null; LockSupport.unpark(w); } returntrue; } return match == s; }
/** * Tries to cancel a wait by matching node to itself. */ voidtryCancel(){ UNSAFE.compareAndSwapObject(this, matchOffset, null, this); }
/** * Implements all queuing methods. See above for explanation. * * @param e the item or null for take * @param haveData true if this is a put, else a take * @param how NOW, ASYNC, SYNC, or TIMED * @param nanos timeout in nanosecs, used only if mode is TIMED * @return an item if matched, else e * @throws NullPointerException if haveData mode but e is null */ private E xfer(E e, boolean haveData, int how, long nanos){ if (haveData && (e == null))//操作类型和数据是否匹配,匹配才往下执行 thrownew NullPointerException(); Node s = null; // the node to append, if needed
retry: for (;;) { // restart on append race
for (Node h = head, p = h; p != null;) { // find & match first node boolean isData = p.isData; Object item = p.item; if (item != p && (item != null) == isData) { // unmatched if (isData == haveData) // 操作类型相同,中断互补查找 break; if (p.casItem(item, e)) { // 操作类型不同,使用cas操作尝试将找到的节点操作状态变更,如果成功则继续 for (Node q = p; q != h;) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); return LinkedTransferQueue.<E>cast(item); } } //进行目标节点后移和头节点修正 Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist }
if (how != NOW) { //没有找到互补节点的操作,将调用线程和数据生成新node入队 if (s == null) s = new Node(e, haveData); Node pred = tryAppend(s, haveData); if (pred == null) continue retry; // lost race vs opposite mode if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting } }
/** * Condition signalled when a newer element becomes available * at the head of the queue or a new thread may need to * become leader. */ privatefinal Condition available = lock.newCondition(); }
publicvoidaddFirst(E e){ if (e == null) thrownew NullPointerException(); //head从高位开始,每次从头入队 elements[head = (head - 1) & (elements.length - 1)] = e; if (head == tail) doubleCapacity(); }
addLast==offerLast==add
1 2 3 4 5 6 7 8 9
publicvoidaddLast(E e){ if (e == null) thrownew NullPointerException(); elements[tail] = e; //每次tail存放后会检查下次入队是否会使head==tail,如果会就扩容,也就是说每次从尾部入队至少会保证有一个位置是空的,使从头部入队不会覆盖元素 //并且更新tail=tail+1,即真实的tail在数组中的位置为tail-1,tail处并没有元素 if ( (tail = (tail + 1) & (elements.length - 1)) == head) doubleCapacity(); }
扩容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
privatevoiddoubleCapacity(){ assert head == tail; int p = head; int n = elements.length; int r = n - p; // number of elements to the right of p int newCapacity = n << 1; if (newCapacity < 0) thrownew IllegalStateException("Sorry, deque too big"); Object[] a = new Object[newCapacity]; //将原来数组从head到数组结束的元素拷贝到新数组 System.arraycopy(elements, p, a, 0, r); //将原来数组从tail倒数到下标0处的元素拷贝到新数组 System.arraycopy(elements, 0, a, r, p); //原来的队列是分离的,首尾在物理上并不保持由首到尾的关系,copy后,在新的数组靠前的位置形成物理上由首到尾关系的队列,相当于原来tail所领导的元素划拨给了head领导 elements = a; head = 0;//下次head位置会变到新数组的末尾 tail = n;//修正尾部 }
public E pollFirst(){ int h = head; @SuppressWarnings("unchecked") E result = (E) elements[h]; // Element is null if deque empty if (result == null) returnnull; elements[h] = null; // Must null out slot head = (h + 1) & (elements.length - 1); return result; }
pollLast==removeLast
1 2 3 4 5 6 7 8 9 10
public E pollLast(){ int t = (tail - 1) & (elements.length - 1); @SuppressWarnings("unchecked") E result = (E) elements[t]; if (result == null) returnnull; elements[t] = null; tail = t; return result; }
Node() { // default constructor for NEXT_TERMINATOR, PREV_TERMINATOR }
/** * Constructs a new node. Uses relaxed write because item can * only be seen after publication via casNext or casPrev. */ Node(E item) { UNSAFE.putObject(this, itemOffset, item); }
booleancasItem(E cmp, E val){ return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); }