OpenJDK / lambda / lambda / jdk
changeset 5862:d1a94d8f7b46 it2-bootstrap
Merge
author | mduigou |
---|---|
date | Wed, 22 Aug 2012 18:22:03 -0700 |
parents | 32604983b70c 6b67e68ac62e |
children | 78975c11b889 |
files | |
diffstat | 9 files changed, 2838 insertions(+), 2314 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/concurrent/CountedCompleter.java Wed Aug 22 18:22:03 2012 -0700 @@ -0,0 +1,480 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package java.util.concurrent; + +/** + * A resultless {@link ForkJoinTask} with a completion action + * performed when triggered and there are no remaining pending + * actions. Uses of CountedCompleter are similar to those of other + * completion based components (such as {@link + * java.nio.channels.CompletionHandler}) except that multiple + * <em>pending</em> completions may be necessary to trigger the {@link + * #onCompletion} action, not just one. Unless initialized otherwise, + * the {@link #getPendingCount pending count} starts at zero, but may + * be (atomically) changed using methods {@link #setPendingCount}, + * {@link #addToPendingCount}, and {@link + * #compareAndSetPendingCount}. Upon invocation of {@link + * #tryComplete}, if the pending action count is nonzero, it is + * decremented; otherwise, the completion action is performed, and if + * this completer itself has a completer, the process is continued + * with its completer. As is the case with related synchronization + * components such as {@link Phaser} and {@link + * java.util.concurrent.Semaphore} these methods affect only internal + * counts; they do not establish any further internal bookkeeping. In + * particular, the identities of pending tasks are not maintained. As + * illustrated below, you can create subclasses that do record some or + * all pended tasks or their results when needed. + * + * <p>A concrete CountedCompleter class must define method {@link + * #compute}, that should, in almost all use cases, invoke {@code + * tryComplete()} once before returning. The class may also optionally + * override method {@link #onCompletion} to perform an action upon + * normal completion, and method {@link #onExceptionalCompletion} to + * perform an action upon any exception. + * + * <p>A CountedCompleter that does not itself have a completer (i.e., + * one for which {@link #getCompleter} returns {@code null}) can be + * used as a regular ForkJoinTask with this added functionality. + * However, any completer that in turn has another completer serves + * only as an internal helper for other computations, so its own task + * status (as reported in methods such as {@link ForkJoinTask#isDone}) + * is arbitrary; this status changes only upon explicit invocations of + * {@link #complete}, {@link ForkJoinTask#cancel}, {@link + * ForkJoinTask#completeExceptionally} or upon exceptional completion + * of method {@code compute}. Upon any exceptional completion, the + * exception may be relayed to a task's completer (and its completer, + * and so on), if one exists and it has not otherwise already + * completed. + * + * <p><b>Sample Usages.</b> + * + * <p><b>Parallel recursive decomposition.</b> CountedCompleters may + * be arranged in trees similar to those often used with {@link + * RecursiveAction}s, although the constructions involved in setting + * them up typically vary. Even though they entail a bit more + * bookkeeping, CountedCompleters may be better choices when applying + * a possibly time-consuming operation (that cannot be further + * subdivided) to each element of an array or collection; especially + * when the operation takes a significantly different amount of time + * to complete for some elements than others, either because of + * intrinsic variation (for example IO) or auxiliary effects such as + * garbage collection. Because CountedCompleters provide their own + * continuations, other threads need not block waiting to perform + * them. + * + * <p> For example, here is an initial version of a class that uses + * divide-by-two recursive decomposition to divide work into single + * pieces (leaf tasks). Even when work is split into individual calls, + * tree-based techniques are usually preferable to directly forking + * leaf tasks, because they reduce inter-thread communication and + * improve load balancing. In the recursive case, the second of each + * pair of subtasks to finish triggers completion of its parent + * (because no result combination is performed, the default no-op + * implementation of method {@code onCompletion} is not overridden). A + * static utility method sets up the base task and invokes it: + * + * <pre> {@code + * class MyOperation<E> { void apply(E e) { ... } } + * + * class ForEach<E> extends CountedCompleter { + * + * public static <E> void forEach(ForkJoinPool pool, E[] array, MyOperation<E> op) { + * pool.invoke(new ForEach<E>(null, array, op, 0, array.length)); + * } + * + * final E[] array; final MyOperation<E> op; final int lo, hi; + * ForEach(CountedCompleter p, E[] array, MyOperation<E> op, int lo, int hi) { + * super(p); + * this.array = array; this.op = op; this.lo = lo; this.hi = hi; + * } + * + * public void compute() { // version 1 + * if (hi - lo >= 2) { + * int mid = (lo + hi) >>> 1; + * setPendingCount(2); // must set pending count before fork + * new ForEach(this, array, op, mid, hi).fork(); // right child + * new ForEach(this, array, op, lo, mid).fork(); // left child + * } + * else if (hi > lo) + * op.apply(array[lo]); + * tryComplete(); + * } + * } }</pre> + * + * This design can be improved by noticing that in the recursive case, + * the task has nothing to do after forking its right task, so can + * directly invoke its left task before returning. (This is an analog + * of tail recursion removal.) Also, because the task returns upon + * executing its left task (rather than falling through to invoke + * tryComplete) the pending count is set to one: + * + * <pre> {@code + * class ForEach<E> ... + * public void compute() { // version 2 + * if (hi - lo >= 2) { + * int mid = (lo + hi) >>> 1; + * setPendingCount(1); // only one pending + * new ForEach(this, array, op, mid, hi).fork(); // right child + * new ForEach(this, array, op, lo, mid).compute(); // direct invoke + * } + * else { + * if (hi > lo) + * op.apply(array[lo]); + * tryComplete(); + * } + * } + * }</pre> + * + * As a further improvement, notice that the left task need not even + * exist. Instead of creating a new one, we can iterate using the + * original task, and add a pending count for each fork: + * + * <pre> {@code + * class ForEach<E> ... + * public void compute() { // version 3 + * int l = lo, h = hi; + * while (h - l >= 2) { + * int mid = (l + h) >>> 1; + * addToPendingCount(1); + * new ForEach(this, array, op, mid, h).fork(); // right child + * h = mid; + * } + * if (h > l) + * op.apply(array[l]); + * tryComplete(); + * } + * }</pre> + * + * Additional improvements of such classes might entail precomputing + * pending counts so that they can be established in constructors, + * specializing classes for leaf steps, subdividing by say, four, + * instead of two per iteration, and using an adaptive threshold + * instead of always subdividing down to single elements. + * + * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine + * results of multiple subtasks usually need to access these results + * in method {@link #onCompletion}. As illustrated in the following + * class (that performs a simplified form of map-reduce where mappings + * and reductions are all of type {@code E}), one way to do this in + * divide and conquer designs is to have each subtask record its + * sibling, so that it can be accessed in method {@code onCompletion}. + * For clarity, this class uses explicit left and right subtasks, but + * variants of other streamlinings seen in the above example may also + * apply. + * + * <pre> {@code + * class MyMapper<E> { E apply(E v) { ... } } + * class MyReducer<E> { E apply(E x, E y) { ... } } + * class MapReducer<E> extends CountedCompleter { + * final E[] array; final MyMapper<E> mapper; + * final MyReducer<E> reducer; final int lo, hi; + * MapReducer sibling; + * E result; + * MapReducer(CountedCompleter p, E[] array, MyMapper<E> mapper, + * MyReducer<E> reducer, int lo, int hi) { + * super(p); + * this.array = array; this.mapper = mapper; + * this.reducer = reducer; this.lo = lo; this.hi = hi; + * } + * public void compute() { + * if (hi - lo >= 2) { + * int mid = (lo + hi) >>> 1; + * MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid); + * MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi); + * left.sibling = right; + * right.sibling = left; + * setPendingCount(1); // only right is pending + * right.fork(); + * left.compute(); // directly execute left + * } + * else { + * if (hi > lo) + * result = mapper.apply(array[lo]); + * tryComplete(); + * } + * } + * public void onCompletion(CountedCompleter caller) { + * if (caller != this) { + * MapReducer<E> child = (MapReducer<E>)caller; + * MapReducer<E> sib = child.sibling; + * if (sib == null || sib.result == null) + * result = child.result; + * else + * result = reducer.apply(child.result, sib.result); + * } + * } + * + * public static <E> E mapReduce(ForkJoinPool pool, E[] array, + * MyMapper<E> mapper, MyReducer<E> reducer) { + * MapReducer<E> mr = new MapReducer<E>(null, array, mapper, + * reducer, 0, array.length); + * pool.invoke(mr); + * return mr.result; + * } + * } }</pre> + * + * <p><b>Triggers.</b> Some CountedCompleters are themselves never + * forked, but instead serve as bits of plumbing in other designs; + * including those in which the completion of one of more async tasks + * triggers another async task. For example: + * + * <pre> {@code + * class HeaderBuilder extends CountedCompleter { ... } + * class BodyBuilder extends CountedCompleter { ... } + * class PacketSender extends CountedCompleter { + * PacketSender(...) { super(null, 1); ... } // trigger on second completion + * public void compute() { } // never called + * public void onCompletion(CountedCompleter caller) { sendPacket(); } + * } + * // sample use: + * PacketSender p = new PacketSender(); + * new HeaderBuilder(p, ...).fork(); + * new BodyBuilder(p, ...).fork(); + * }</pre> + * + * @since 1.8 + * @author Doug Lea + */ +public abstract class CountedCompleter extends ForkJoinTask<Void> { + private static final long serialVersionUID = 5232453752276485070L; + + /** This task's completer, or null if none */ + final CountedCompleter completer; + /** The number of pending tasks until completion */ + volatile int pending; + + /** + * Creates a new CountedCompleter with the given completer + * and initial pending count. + * + * @param completer this tasks completer, or {@code null} if none + * @param initialPendingCount the initial pending count + */ + protected CountedCompleter(CountedCompleter completer, + int initialPendingCount) { + this.completer = completer; + this.pending = initialPendingCount; + } + + /** + * Creates a new CountedCompleter with the given completer + * and an initial pending count of zero. + * + * @param completer this tasks completer, or {@code null} if none + */ + protected CountedCompleter(CountedCompleter completer) { + this.completer = completer; + } + + /** + * Creates a new CountedCompleter with no completer + * and an initial pending count of zero. + */ + protected CountedCompleter() { + this.completer = null; + } + + /** + * The main computation performed by this task. + */ + public abstract void compute(); + + /** + * Performs an action when method {@link #tryComplete} is invoked + * and there are no pending counts, or when the unconditional + * method {@link #complete} is invoked. By default, this method + * does nothing. + * + * @param caller the task invoking this method (which may + * be this task itself). + */ + public void onCompletion(CountedCompleter caller) { + } + + /** + * Performs an action when method {@link #completeExceptionally} + * is invoked or method {@link #compute} throws an exception, and + * this task has not otherwise already completed normally. On + * entry to this method, this task {@link + * ForkJoinTask#isCompletedAbnormally}. The return value of this + * method controls further propagation: If {@code true} and this + * task has a completer, then this completer is also completed + * exceptionally. The default implementation of this method does + * nothing except return {@code true}. + * + * @param ex the exception + * @param caller the task invoking this method (which may + * be this task itself). + * @return true if this exception should be propagated to this + * tasks completer, if one exists. + */ + public boolean onExceptionalCompletion(Throwable ex, CountedCompleter caller) { + return true; + } + + /** + * Returns the completer established in this task's constructor, + * or {@code null} if none. + * + * @return the completer + */ + public final CountedCompleter getCompleter() { + return completer; + } + + /** + * Returns the current pending count. + * + * @return the current pending count + */ + public final int getPendingCount() { + return pending; + } + + /** + * Sets the pending count to the given value. + * + * @param count the count + */ + public final void setPendingCount(int count) { + pending = count; + } + + /** + * Adds (atomically) the given value to the pending count. + * + * @param delta the value to add + */ + public final void addToPendingCount(int delta) { + int c; // note: can replace with intrinsic in jdk8 + do {} while (!U.compareAndSwapInt(this, PENDING, c = pending, c+delta)); + } + + /** + * Sets (atomically) the pending count to the given count only if + * it currently holds the given expected value. + * + * @param expected the expected value + * @param count the new value + * @return true is successful + */ + public final boolean compareAndSetPendingCount(int expected, int count) { + return U.compareAndSwapInt(this, PENDING, expected, count); + } + + /** + * If the pending count is nonzero, decrements the count; + * otherwise invokes {@link #onCompletion} and then similarly + * tries to complete this task's completer, if one exists, + * else marks this task as complete. + */ + public final void tryComplete() { + CountedCompleter a = this, s = a; + for (int c;;) { + if ((c = a.pending) == 0) { + a.onCompletion(s); + if ((a = (s = a).completer) == null) { + s.quietlyComplete(); + return; + } + } + else if (U.compareAndSwapInt(a, PENDING, c, c - 1)) + return; + } + } + + /** + * Regardless of pending count, invokes {@link #onCompletion}, + * marks this task as complete with a {@code null} return value, + * and further triggers {@link #tryComplete} on this task's + * completer, if one exists. This method may be useful when + * forcing completion as soon as any one (versus all) of several + * subtask results are obtained. + * + * @param mustBeNull the {@code null} completion value + */ + public void complete(Void mustBeNull) { + CountedCompleter p; + onCompletion(this); + quietlyComplete(); + if ((p = completer) != null) + p.tryComplete(); + } + + /** + * Support for FJT exception propagation + */ + void internalPropagateException(Throwable ex) { + CountedCompleter a = this, s = a; + while (a.onExceptionalCompletion(ex, s) && + (a = (s = a).completer) != null && a.status >= 0) + a.recordExceptionalCompletion(ex); + } + + /** + * Implements execution conventions for CountedCompleters + */ + protected final boolean exec() { + compute(); + return false; + } + + /** + * Always returns {@code null}. + * + * @return {@code null} always + */ + public final Void getRawResult() { return null; } + + /** + * Requires null completion value. + */ + protected final void setRawResult(Void mustBeNull) { } + + + // Unsafe mechanics + private static final sun.misc.Unsafe U; + private static final long PENDING; + static { + try { + U = sun.misc.Unsafe.getUnsafe(); + PENDING = U.objectFieldOffset + (CountedCompleter.class.getDeclaredField("pending")); + } catch (Exception e) { + throw new Error(e); + } + } +}
--- a/src/share/classes/java/util/concurrent/ForkJoinPool.java Mon Aug 20 17:14:26 2012 -0700 +++ b/src/share/classes/java/util/concurrent/ForkJoinPool.java Wed Aug 22 18:22:03 2012 -0700 @@ -48,10 +48,9 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.LockSupport; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; /** @@ -63,12 +62,14 @@ * <p>A {@code ForkJoinPool} differs from other kinds of {@link * ExecutorService} mainly by virtue of employing * <em>work-stealing</em>: all threads in the pool attempt to find and - * execute subtasks created by other active tasks (eventually blocking - * waiting for work if none exist). This enables efficient processing - * when most tasks spawn other subtasks (as do most {@code - * ForkJoinTask}s). When setting <em>asyncMode</em> to true in - * constructors, {@code ForkJoinPool}s may also be appropriate for use - * with event-style tasks that are never joined. + * execute tasks submitted to the pool and/or created by other active + * tasks (eventually blocking waiting for work if none exist). This + * enables efficient processing when most tasks spawn other subtasks + * (as do most {@code ForkJoinTask}s), as well as when many small + * tasks are submitted to the pool from external clients. Especially + * when setting <em>asyncMode</em> to true in constructors, {@code + * ForkJoinPool}s may also be appropriate for use with event-style + * tasks that are never joined. * * <p>A {@code ForkJoinPool} is constructed with a given target * parallelism level; by default, equal to the number of available @@ -88,15 +89,16 @@ * convenient form for informal monitoring. * * <p> As is the case with other ExecutorServices, there are three - * main task execution methods summarized in the following - * table. These are designed to be used by clients not already engaged - * in fork/join computations in the current pool. The main forms of - * these methods accept instances of {@code ForkJoinTask}, but - * overloaded forms also allow mixed execution of plain {@code + * main task execution methods summarized in the following table. + * These are designed to be used primarily by clients not already + * engaged in fork/join computations in the current pool. The main + * forms of these methods accept instances of {@code ForkJoinTask}, + * but overloaded forms also allow mixed execution of plain {@code * Runnable}- or {@code Callable}- based activities as well. However, - * tasks that are already executing in a pool should normally - * <em>NOT</em> use these pool execution methods, but instead use the - * within-computation forms listed in the table. + * tasks that are already executing in a pool should normally instead + * use the within-computation forms listed in the table unless using + * async event-style tasks that are not usually joined, in which case + * there is little difference among choice of methods. * * <table BORDER CELLPADDING=3 CELLSPACING=1> * <tr> @@ -131,13 +133,12 @@ * daemon} mode, there is typically no need to explicitly {@link * #shutdown} such a pool upon program exit. * - * <pre> + * <pre> {@code * static final ForkJoinPool mainPool = new ForkJoinPool(); * ... * public void sort(long[] array) { * mainPool.invoke(new SortTask(array, 0, array.length)); - * } - * </pre> + * }}</pre> * * <p><b>Implementation notes</b>: This implementation restricts the * maximum number of running threads to 32767. Attempts to create @@ -156,108 +157,196 @@ /* * Implementation Overview * - * This class provides the central bookkeeping and control for a - * set of worker threads: Submissions from non-FJ threads enter - * into a submission queue. Workers take these tasks and typically - * split them into subtasks that may be stolen by other workers. - * Preference rules give first priority to processing tasks from - * their own queues (LIFO or FIFO, depending on mode), then to - * randomized FIFO steals of tasks in other worker queues, and - * lastly to new submissions. + * This class and its nested classes provide the main + * functionality and control for a set of worker threads: + * Submissions from non-FJ threads enter into submission queues. + * Workers take these tasks and typically split them into subtasks + * that may be stolen by other workers. Preference rules give + * first priority to processing tasks from their own queues (LIFO + * or FIFO, depending on mode), then to randomized FIFO steals of + * tasks in other queues. + * + * WorkQueues + * ========== + * + * Most operations occur within work-stealing queues (in nested + * class WorkQueue). These are special forms of Deques that + * support only three of the four possible end-operations -- push, + * pop, and poll (aka steal), under the further constraints that + * push and pop are called only from the owning thread (or, as + * extended here, under a lock), while poll may be called from + * other threads. (If you are unfamiliar with them, you probably + * want to read Herlihy and Shavit's book "The Art of + * Multiprocessor programming", chapter 16 describing these in + * more detail before proceeding.) The main work-stealing queue + * design is roughly similar to those in the papers "Dynamic + * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005 + * (http://research.sun.com/scalable/pubs/index.html) and + * "Idempotent work stealing" by Michael, Saraswat, and Vechev, + * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186). + * The main differences ultimately stem from GC requirements that + * we null out taken slots as soon as we can, to maintain as small + * a footprint as possible even in programs generating huge + * numbers of tasks. To accomplish this, we shift the CAS + * arbitrating pop vs poll (steal) from being on the indices + * ("base" and "top") to the slots themselves. So, both a + * successful pop and poll mainly entail a CAS of a slot from + * non-null to null. Because we rely on CASes of references, we + * do not need tag bits on base or top. They are simple ints as + * used in any circular array-based queue (see for example + * ArrayDeque). Updates to the indices must still be ordered in a + * way that guarantees that top == base means the queue is empty, + * but otherwise may err on the side of possibly making the queue + * appear nonempty when a push, pop, or poll have not fully + * committed. Note that this means that the poll operation, + * considered individually, is not wait-free. One thief cannot + * successfully continue until another in-progress one (or, if + * previously empty, a push) completes. However, in the + * aggregate, we ensure at least probabilistic non-blockingness. + * If an attempted steal fails, a thief always chooses a different + * random victim target to try next. So, in order for one thief to + * progress, it suffices for any in-progress poll or new push on + * any empty queue to complete. (This is why we normally use + * method pollAt and its variants that try once at the apparent + * base index, else consider alternative actions, rather than + * method poll.) + * + * This approach also enables support of a user mode in which local + * task processing is in FIFO, not LIFO order, simply by using + * poll rather than pop. This can be useful in message-passing + * frameworks in which tasks are never joined. However neither + * mode considers affinities, loads, cache localities, etc, so + * rarely provide the best possible performance on a given + * machine, but portably provide good throughput by averaging over + * these factors. (Further, even if we did try to use such + * information, we do not usually have a basis for exploiting it. + * For example, some sets of tasks profit from cache affinities, + * but others are harmed by cache pollution effects.) + * + * WorkQueues are also used in a similar way for tasks submitted + * to the pool. We cannot mix these tasks in the same queues used + * for work-stealing (this would contaminate lifo/fifo + * processing). Instead, we loosely associate submission queues + * with submitting threads, using a form of hashing. The + * ThreadLocal Submitter class contains a value initially used as + * a hash code for choosing existing queues, but may be randomly + * repositioned upon contention with other submitters. In + * essence, submitters act like workers except that they never + * take tasks, and they are multiplexed on to a finite number of + * shared work queues. However, classes are set up so that future + * extensions could allow submitters to optionally help perform + * tasks as well. Insertion of tasks in shared mode requires a + * lock (mainly to protect in the case of resizing) but we use + * only a simple spinlock (using bits in field runState), because + * submitters encountering a busy queue move on to try or create + * other queues -- they block only when creating and registering + * new queues. + * + * Management + * ========== * * The main throughput advantages of work-stealing stem from * decentralized control -- workers mostly take tasks from * themselves or each other. We cannot negate this in the * implementation of other management responsibilities. The main * tactic for avoiding bottlenecks is packing nearly all - * essentially atomic control state into a single 64bit volatile - * variable ("ctl"). This variable is read on the order of 10-100 - * times as often as it is modified (always via CAS). (There is - * some additional control state, for example variable "shutdown" - * for which we can cope with uncoordinated updates.) This - * streamlines synchronization and control at the expense of messy - * constructions needed to repack status bits upon updates. - * Updates tend not to contend with each other except during - * bursts while submitted tasks begin or end. In some cases when - * they do contend, threads can instead do something else - * (usually, scan for tasks) until contention subsides. + * essentially atomic control state into two volatile variables + * that are by far most often read (not written) as status and + * consistency checks. + * + * Field "ctl" contains 64 bits holding all the information needed + * to atomically decide to add, inactivate, enqueue (on an event + * queue), dequeue, and/or re-activate workers. To enable this + * packing, we restrict maximum parallelism to (1<<15)-1 (which is + * far in excess of normal operating range) to allow ids, counts, + * and their negations (used for thresholding) to fit into 16bit + * fields. * - * To enable packing, we restrict maximum parallelism to (1<<15)-1 - * (which is far in excess of normal operating range) to allow - * ids, counts, and their negations (used for thresholding) to fit - * into 16bit fields. + * Field "runState" contains 32 bits needed to register and + * deregister WorkQueues, as well as to enable shutdown. It is + * only modified under a lock (normally briefly held, but + * occasionally protecting allocations and resizings) but even + * when locked remains available to check consistency. * - * Recording Workers. Workers are recorded in the "workers" array - * that is created upon pool construction and expanded if (rarely) - * necessary. This is an array as opposed to some other data - * structure to support index-based random steals by workers. - * Updates to the array recording new workers and unrecording - * terminated ones are protected from each other by a seqLock - * (scanGuard) but the array is otherwise concurrently readable, - * and accessed directly by workers. To simplify index-based + * Recording WorkQueues. WorkQueues are recorded in the + * "workQueues" array that is created upon pool construction and + * expanded if necessary. Updates to the array while recording + * new workers and unrecording terminated ones are protected from + * each other by a lock but the array is otherwise concurrently + * readable, and accessed directly. To simplify index-based * operations, the array size is always a power of two, and all - * readers must tolerate null slots. To avoid flailing during - * start-up, the array is presized to hold twice #parallelism - * workers (which is unlikely to need further resizing during - * execution). But to avoid dealing with so many null slots, - * variable scanGuard includes a mask for the nearest power of two - * that contains all current workers. All worker thread creation - * is on-demand, triggered by task submissions, replacement of - * terminated workers, and/or compensation for blocked - * workers. However, all other support code is set up to work with - * other policies. To ensure that we do not hold on to worker - * references that would prevent GC, ALL accesses to workers are - * via indices into the workers array (which is one source of some - * of the messy code constructions here). In essence, the workers - * array serves as a weak reference mechanism. Thus for example - * the wait queue field of ctl stores worker indices, not worker - * references. Access to the workers in associated methods (for - * example signalWork) must both index-check and null-check the - * IDs. All such accesses ignore bad IDs by returning out early - * from what they are doing, since this can only be associated - * with termination, in which case it is OK to give up. + * readers must tolerate null slots. Shared (submission) queues + * are at even indices, worker queues at odd indices. Grouping + * them together in this way simplifies and speeds up task + * scanning. * - * All uses of the workers array, as well as queue arrays, check - * that the array is non-null (even if previously non-null). This - * allows nulling during termination, which is currently not - * necessary, but remains an option for resource-revocation-based - * shutdown schemes. + * All worker thread creation is on-demand, triggered by task + * submissions, replacement of terminated workers, and/or + * compensation for blocked workers. However, all other support + * code is set up to work with other policies. To ensure that we + * do not hold on to worker references that would prevent GC, ALL + * accesses to workQueues are via indices into the workQueues + * array (which is one source of some of the messy code + * constructions here). In essence, the workQueues array serves as + * a weak reference mechanism. Thus for example the wait queue + * field of ctl stores indices, not references. Access to the + * workQueues in associated methods (for example signalWork) must + * both index-check and null-check the IDs. All such accesses + * ignore bad IDs by returning out early from what they are doing, + * since this can only be associated with termination, in which + * case it is OK to give up. All uses of the workQueues array + * also check that it is non-null (even if previously + * non-null). This allows nulling during termination, which is + * currently not necessary, but remains an option for + * resource-revocation-based shutdown schemes. It also helps + * reduce JIT issuance of uncommon-trap code, which tends to + * unnecessarily complicate control flow in some methods. * - * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot + * Event Queuing. Unlike HPC work-stealing frameworks, we cannot * let workers spin indefinitely scanning for tasks when none can * be found immediately, and we cannot start/resume workers unless * there appear to be tasks available. On the other hand, we must * quickly prod them into action when new tasks are submitted or - * generated. We park/unpark workers after placing in an event - * wait queue when they cannot find work. This "queue" is actually - * a simple Treiber stack, headed by the "id" field of ctl, plus a - * 15bit counter value to both wake up waiters (by advancing their - * count) and avoid ABA effects. Successors are held in worker - * field "nextWait". Queuing deals with several intrinsic races, - * mainly that a task-producing thread can miss seeing (and + * generated. In many usages, ramp-up time to activate workers is + * the main limiting factor in overall performance (this is + * compounded at program start-up by JIT compilation and + * allocation). So we try to streamline this as much as possible. + * We park/unpark workers after placing in an event wait queue + * when they cannot find work. This "queue" is actually a simple + * Treiber stack, headed by the "id" field of ctl, plus a 15bit + * counter value (that reflects the number of times a worker has + * been inactivated) to avoid ABA effects (we need only as many + * version numbers as worker threads). Successors are held in + * field WorkQueue.nextWait. Queuing deals with several intrinsic + * races, mainly that a task-producing thread can miss seeing (and * signalling) another thread that gave up looking for work but * has not yet entered the wait queue. We solve this by requiring - * a full sweep of all workers both before (in scan()) and after - * (in tryAwaitWork()) a newly waiting worker is added to the wait - * queue. During a rescan, the worker might release some other - * queued worker rather than itself, which has the same net - * effect. Because enqueued workers may actually be rescanning - * rather than waiting, we set and clear the "parked" field of - * ForkJoinWorkerThread to reduce unnecessary calls to unpark. - * (Use of the parked field requires a secondary recheck to avoid - * missed signals.) + * a full sweep of all workers (via repeated calls to method + * scan()) both before and after a newly waiting worker is added + * to the wait queue. During a rescan, the worker might release + * some other queued worker rather than itself, which has the same + * net effect. Because enqueued workers may actually be rescanning + * rather than waiting, we set and clear the "parker" field of + * WorkQueues to reduce unnecessary calls to unpark. (This + * requires a secondary recheck to avoid missed signals.) Note + * the unusual conventions about Thread.interrupts surrounding + * parking and other blocking: Because interrupts are used solely + * to alert threads to check termination, which is checked anyway + * upon blocking, we clear status (using Thread.interrupted) + * before any call to park, so that park does not immediately + * return due to status being set via some other unrelated call to + * interrupt in user code. * * Signalling. We create or wake up workers only when there * appears to be at least one task they might be able to find and * execute. When a submission is added or another worker adds a - * task to a queue that previously had two or fewer tasks, they + * task to a queue that previously had fewer than two tasks, they * signal waiting workers (or trigger creation of new ones if * fewer than the given parallelism level -- see signalWork). - * These primary signals are buttressed by signals during rescans - * as well as those performed when a worker steals a task and - * notices that there are more tasks too; together these cover the - * signals needed in cases when more than two tasks are pushed - * but untaken. + * These primary signals are buttressed by signals during rescans; + * together these cover the signals needed in cases when more + * tasks are pushed but untaken, and improve performance compared + * to having one thread wake up all workers. * * Trimming workers. To release resources after periods of lack of * use, a worker starting to wait when the pool is quiescent will @@ -265,105 +354,158 @@ * SHRINK_RATE nanosecs. This will slowly propagate, eventually * terminating all workers after long periods of non-use. * - * Submissions. External submissions are maintained in an - * array-based queue that is structured identically to - * ForkJoinWorkerThread queues except for the use of - * submissionLock in method addSubmission. Unlike the case for - * worker queues, multiple external threads can add new - * submissions, so adding requires a lock. + * Shutdown and Termination. A call to shutdownNow atomically sets + * a runState bit and then (non-atomically) sets each worker's + * runState status, cancels all unprocessed tasks, and wakes up + * all waiting workers. Detecting whether termination should + * commence after a non-abrupt shutdown() call requires more work + * and bookkeeping. We need consensus about quiescence (i.e., that + * there is no more work). The active count provides a primary + * indication but non-abrupt shutdown still requires a rechecking + * scan for any workers that are inactive but not queued. * - * Compensation. Beyond work-stealing support and lifecycle - * control, the main responsibility of this framework is to take - * actions when one worker is waiting to join a task stolen (or - * always held by) another. Because we are multiplexing many - * tasks on to a pool of workers, we can't just let them block (as - * in Thread.join). We also cannot just reassign the joiner's - * run-time stack with another and replace it later, which would - * be a form of "continuation", that even if possible is not - * necessarily a good idea since we sometimes need both an - * unblocked task and its continuation to progress. Instead we - * combine two tactics: + * Joining Tasks + * ============= + * + * Any of several actions may be taken when one worker is waiting + * to join a task stolen (or always held) by another. Because we + * are multiplexing many tasks on to a pool of workers, we can't + * just let them block (as in Thread.join). We also cannot just + * reassign the joiner's run-time stack with another and replace + * it later, which would be a form of "continuation", that even if + * possible is not necessarily a good idea since we sometimes need + * both an unblocked task and its continuation to progress. + * Instead we combine two tactics: * * Helping: Arranging for the joiner to execute some task that it - * would be running if the steal had not occurred. Method - * ForkJoinWorkerThread.joinTask tracks joining->stealing - * links to try to find such a task. + * would be running if the steal had not occurred. * * Compensating: Unless there are already enough live threads, - * method tryPreBlock() may create or re-activate a spare - * thread to compensate for blocked joiners until they - * unblock. + * method tryCompensate() may create or re-activate a spare + * thread to compensate for blocked joiners until they unblock. + * + * A third form (implemented in tryRemoveAndExec and + * tryPollForAndExec) amounts to helping a hypothetical + * compensator: If we can readily tell that a possible action of a + * compensator is to steal and execute the task being joined, the + * joining thread can do so directly, without the need for a + * compensation thread (although at the expense of larger run-time + * stacks, but the tradeoff is typically worthwhile). * * The ManagedBlocker extension API can't use helping so relies * only on compensation in method awaitBlocker. * + * The algorithm in tryHelpStealer entails a form of "linear" + * helping: Each worker records (in field currentSteal) the most + * recent task it stole from some other worker. Plus, it records + * (in field currentJoin) the task it is currently actively + * joining. Method tryHelpStealer uses these markers to try to + * find a worker to help (i.e., steal back a task from and execute + * it) that could hasten completion of the actively joined task. + * In essence, the joiner executes a task that would be on its own + * local deque had the to-be-joined task not been stolen. This may + * be seen as a conservative variant of the approach in Wagner & + * Calder "Leapfrogging: a portable technique for implementing + * efficient futures" SIGPLAN Notices, 1993 + * (http://portal.acm.org/citation.cfm?id=155354). It differs in + * that: (1) We only maintain dependency links across workers upon + * steals, rather than use per-task bookkeeping. This sometimes + * requires a linear scan of workQueues array to locate stealers, + * but often doesn't because stealers leave hints (that may become + * stale/wrong) of where to locate them. A stealHint is only a + * hint because a worker might have had multiple steals and the + * hint records only one of them (usually the most current). + * Hinting isolates cost to when it is needed, rather than adding + * to per-task overhead. (2) It is "shallow", ignoring nesting + * and potentially cyclic mutual steals. (3) It is intentionally + * racy: field currentJoin is updated only while actively joining, + * which means that we miss links in the chain during long-lived + * tasks, GC stalls etc (which is OK since blocking in such cases + * is usually a good idea). (4) We bound the number of attempts + * to find work (see MAX_HELP) and fall back to suspending the + * worker and if necessary replacing it with another. + * * It is impossible to keep exactly the target parallelism number * of threads running at any given time. Determining the * existence of conservatively safe helping targets, the * availability of already-created spares, and the apparent need - * to create new spares are all racy and require heuristic - * guidance, so we rely on multiple retries of each. Currently, - * in keeping with on-demand signalling policy, we compensate only - * if blocking would leave less than one active (non-waiting, - * non-blocked) worker. Additionally, to avoid some false alarms - * due to GC, lagging counters, system activity, etc, compensated - * blocking for joins is only attempted after rechecks stabilize - * (retries are interspersed with Thread.yield, for good - * citizenship). The variable blockedCount, incremented before - * blocking and decremented after, is sometimes needed to - * distinguish cases of waiting for work vs blocking on joins or - * other managed sync. Both cases are equivalent for most pool - * control, so we can update non-atomically. (Additionally, - * contention on blockedCount alleviates some contention on ctl). - * - * Shutdown and Termination. A call to shutdownNow atomically sets - * the ctl stop bit and then (non-atomically) sets each workers - * "terminate" status, cancels all unprocessed tasks, and wakes up - * all waiting workers. Detecting whether termination should - * commence after a non-abrupt shutdown() call requires more work - * and bookkeeping. We need consensus about quiesence (i.e., that - * there is no more work) which is reflected in active counts so - * long as there are no current blockers, as well as possible - * re-evaluations during independent changes in blocking or - * quiescing workers. + * to create new spares are all racy, so we rely on multiple + * retries of each. Compensation in the apparent absence of + * helping opportunities is challenging to control on JVMs, where + * GC and other activities can stall progress of tasks that in + * turn stall out many other dependent tasks, without us being + * able to determine whether they will ever require compensation. + * Even though work-stealing otherwise encounters little + * degradation in the presence of more threads than cores, + * aggressively adding new threads in such cases entails risk of + * unwanted positive feedback control loops in which more threads + * cause more dependent stalls (as well as delayed progress of + * unblocked threads to the point that we know they are available) + * leading to more situations requiring more threads, and so + * on. This aspect of control can be seen as an (analytically + * intractable) game with an opponent that may choose the worst + * (for us) active thread to stall at any time. We take several + * precautions to bound losses (and thus bound gains), mainly in + * methods tryCompensate and awaitJoin: (1) We only try + * compensation after attempting enough helping steps (measured + * via counting and timing) that we have already consumed the + * estimated cost of creating and activating a new thread. (2) We + * allow up to 50% of threads to be blocked before initially + * adding any others, and unless completely saturated, check that + * some work is available for a new worker before adding. Also, we + * create up to only 50% more threads until entering a mode that + * only adds a thread if all others are possibly blocked. All + * together, this means that we might be half as fast to react, + * and create half as many threads as possible in the ideal case, + * but present vastly fewer anomalies in all other cases compared + * to both more aggressive and more conservative alternatives. * * Style notes: There is a lot of representation-level coupling * among classes ForkJoinPool, ForkJoinWorkerThread, and - * ForkJoinTask. Most fields of ForkJoinWorkerThread maintain - * data structures managed by ForkJoinPool, so are directly - * accessed. Conversely we allow access to "workers" array by - * workers, and direct access to ForkJoinTask.status by both - * ForkJoinPool and ForkJoinWorkerThread. There is little point - * trying to reduce this, since any associated future changes in - * representations will need to be accompanied by algorithmic - * changes anyway. All together, these low-level implementation - * choices produce as much as a factor of 4 performance - * improvement compared to naive implementations, and enable the - * processing of billions of tasks per second, at the expense of - * some ugliness. + * ForkJoinTask. The fields of WorkQueue maintain data structures + * managed by ForkJoinPool, so are directly accessed. There is + * little point trying to reduce this, since any associated future + * changes in representations will need to be accompanied by + * algorithmic changes anyway. Several methods intrinsically + * sprawl because they must accumulate sets of consistent reads of + * volatiles held in local variables. Methods signalWork() and + * scan() are the main bottlenecks, so are especially heavily + * micro-optimized/mangled. There are lots of inline assignments + * (of form "while ((local = field) != 0)") which are usually the + * simplest way to ensure the required read orderings (which are + * sometimes critical). This leads to a "C"-like style of listing + * declarations of these locals at the heads of methods or blocks. + * There are several occurrences of the unusual "do {} while + * (!cas...)" which is the simplest way to force an update of a + * CAS'ed variable. There are also other coding oddities that help + * some methods perform reasonably even when interpreted (not + * compiled). * - * Methods signalWork() and scan() are the main bottlenecks so are - * especially heavily micro-optimized/mangled. There are lots of - * inline assignments (of form "while ((local = field) != 0)") - * which are usually the simplest way to ensure the required read - * orderings (which are sometimes critical). This leads to a - * "C"-like style of listing declarations of these locals at the - * heads of methods or blocks. There are several occurrences of - * the unusual "do {} while (!cas...)" which is the simplest way - * to force an update of a CAS'ed variable. There are also other - * coding oddities that help some methods perform reasonably even - * when interpreted (not compiled). - * - * The order of declarations in this file is: (1) declarations of - * statics (2) fields (along with constants used when unpacking - * some of them), listed in an order that tends to reduce - * contention among them a bit under most JVMs. (3) internal - * control methods (4) callbacks and other support for - * ForkJoinTask and ForkJoinWorkerThread classes, (5) exported - * methods (plus a few little helpers). (6) static block - * initializing all statics in a minimally dependent order. + * The order of declarations in this file is: + * (1) Static utility functions + * (2) Nested (static) classes + * (3) Static fields + * (4) Fields, along with constants used when unpacking some of them + * (5) Internal control methods + * (6) Callbacks and other support for ForkJoinTask methods + * (7) Exported methods + * (8) Static block initializing statics in minimally dependent order */ + // Static utilities + + /** + * If there is a security manager, makes sure caller has + * permission to modify threads. + */ + private static void checkPermission() { + SecurityManager security = System.getSecurityManager(); + if (security != null) + security.checkPermission(modifyThreadPermission); + } + + // Nested classes + /** * Factory for creating new {@link ForkJoinWorkerThread}s. * A {@code ForkJoinWorkerThreadFactory} must be defined and used @@ -392,6 +534,582 @@ } /** + * A simple non-reentrant lock used for exclusion when managing + * queues and workers. We use a custom lock so that we can readily + * probe lock state in constructions that check among alternative + * actions. The lock is normally only very briefly held, and + * sometimes treated as a spinlock, but other usages block to + * reduce overall contention in those cases where locked code + * bodies perform allocation/resizing. + */ + static final class Mutex extends AbstractQueuedSynchronizer { + public final boolean tryAcquire(int ignore) { + return compareAndSetState(0, 1); + } + public final boolean tryRelease(int ignore) { + setState(0); + return true; + } + public final void lock() { acquire(0); } + public final void unlock() { release(0); } + public final boolean isHeldExclusively() { return getState() == 1; } + public final Condition newCondition() { return new ConditionObject(); } + } + + /** + * Class for artificial tasks that are used to replace the target + * of local joins if they are removed from an interior queue slot + * in WorkQueue.tryRemoveAndExec. We don't need the proxy to + * actually do anything beyond having a unique identity. + */ + static final class EmptyTask extends ForkJoinTask<Void> { + EmptyTask() { status = ForkJoinTask.NORMAL; } // force done + public final Void getRawResult() { return null; } + public final void setRawResult(Void x) {} + public final boolean exec() { return true; } + } + + /** + * Queues supporting work-stealing as well as external task + * submission. See above for main rationale and algorithms. + * Implementation relies heavily on "Unsafe" intrinsics + * and selective use of "volatile": + * + * Field "base" is the index (mod array.length) of the least valid + * queue slot, which is always the next position to steal (poll) + * from if nonempty. Reads and writes require volatile orderings + * but not CAS, because updates are only performed after slot + * CASes. + * + * Field "top" is the index (mod array.length) of the next queue + * slot to push to or pop from. It is written only by owner thread + * for push, or under lock for trySharedPush, and accessed by + * other threads only after reading (volatile) base. Both top and + * base are allowed to wrap around on overflow, but (top - base) + * (or more commonly -(base - top) to force volatile read of base + * before top) still estimates size. + * + * The array slots are read and written using the emulation of + * volatiles/atomics provided by Unsafe. Insertions must in + * general use putOrderedObject as a form of releasing store to + * ensure that all writes to the task object are ordered before + * its publication in the queue. (Although we can avoid one case + * of this when locked in trySharedPush.) All removals entail a + * CAS to null. The array is always a power of two. To ensure + * safety of Unsafe array operations, all accesses perform + * explicit null checks and implicit bounds checks via + * power-of-two masking. + * + * In addition to basic queuing support, this class contains + * fields described elsewhere to control execution. It turns out + * to work better memory-layout-wise to include them in this + * class rather than a separate class. + * + * Performance on most platforms is very sensitive to placement of + * instances of both WorkQueues and their arrays -- we absolutely + * do not want multiple WorkQueue instances or multiple queue + * arrays sharing cache lines. (It would be best for queue objects + * and their arrays to share, but there is nothing available to + * help arrange that). Unfortunately, because they are recorded + * in a common array, WorkQueue instances are often moved to be + * adjacent by garbage collectors. To reduce impact, we use field + * padding that works OK on common platforms; this effectively + * trades off slightly slower average field access for the sake of + * avoiding really bad worst-case access. (Until better JVM + * support is in place, this padding is dependent on transient + * properties of JVM field layout rules.) We also take care in + * allocating, sizing and resizing the array. Non-shared queue + * arrays are initialized (via method growArray) by workers before + * use. Others are allocated on first use. + */ + static final class WorkQueue { + /** + * Capacity of work-stealing queue array upon initialization. + * Must be a power of two; at least 4, but should be larger to + * reduce or eliminate cacheline sharing among queues. + * Currently, it is much larger, as a partial workaround for + * the fact that JVMs often place arrays in locations that + * share GC bookkeeping (especially cardmarks) such that + * per-write accesses encounter serious memory contention. + */ + static final int INITIAL_QUEUE_CAPACITY = 1 << 13; + + /** + * Maximum size for queue arrays. Must be a power of two less + * than or equal to 1 << (31 - width of array entry) to ensure + * lack of wraparound of index calculations, but defined to a + * value a bit less than this to help users trap runaway + * programs before saturating systems. + */ + static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M + + volatile long totalSteals; // cumulative number of steals + int seed; // for random scanning; initialize nonzero + volatile int eventCount; // encoded inactivation count; < 0 if inactive + int nextWait; // encoded record of next event waiter + int rescans; // remaining scans until block + int nsteals; // top-level task executions since last idle + final int mode; // lifo, fifo, or shared + int poolIndex; // index of this queue in pool (or 0) + int stealHint; // index of most recent known stealer + volatile int runState; // 1: locked, -1: terminate; else 0 + volatile int base; // index of next slot for poll + int top; // index of next slot for push + ForkJoinTask<?>[] array; // the elements (initially unallocated) + final ForkJoinPool pool; // the containing pool (may be null) + final ForkJoinWorkerThread owner; // owning thread or null if shared + volatile Thread parker; // == owner during call to park; else null + volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin + ForkJoinTask<?> currentSteal; // current non-local task being executed + // Heuristic padding to ameliorate unfortunate memory placements + Object p00, p01, p02, p03, p04, p05, p06, p07; + Object p08, p09, p0a, p0b, p0c, p0d, p0e; + + WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode) { + this.mode = mode; + this.pool = pool; + this.owner = owner; + // Place indices in the center of array (that is not yet allocated) + base = top = INITIAL_QUEUE_CAPACITY >>> 1; + } + + /** + * Returns the approximate number of tasks in the queue. + */ + final int queueSize() { + int n = base - top; // non-owner callers must read base first + return (n >= 0) ? 0 : -n; // ignore transient negative + } + + /** + * Provides a more accurate estimate of whether this queue has + * any tasks than does queueSize, by checking whether a + * near-empty queue has at least one unclaimed task. + */ + final boolean isEmpty() { + ForkJoinTask<?>[] a; int m, s; + int n = base - (s = top); + return (n >= 0 || + (n == -1 && + ((a = array) == null || + (m = a.length - 1) < 0 || + U.getObjectVolatile + (a, ((m & (s - 1)) << ASHIFT) + ABASE) == null))); + } + + /** + * Pushes a task. Call only by owner in unshared queues. + * + * @param task the task. Caller must ensure non-null. + * @throw RejectedExecutionException if array cannot be resized + */ + final void push(ForkJoinTask<?> task) { + ForkJoinTask<?>[] a; ForkJoinPool p; + int s = top, m, n; + if ((a = array) != null) { // ignore if queue removed + U.putOrderedObject + (a, (((m = a.length - 1) & s) << ASHIFT) + ABASE, task); + if ((n = (top = s + 1) - base) <= 2) { + if ((p = pool) != null) + p.signalWork(); + } + else if (n >= m) + growArray(true); + } + } + + /** + * Pushes a task if lock is free and array is either big + * enough or can be resized to be big enough. + * + * @param task the task. Caller must ensure non-null. + * @return true if submitted + */ + final boolean trySharedPush(ForkJoinTask<?> task) { + boolean submitted = false; + if (runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) { + ForkJoinTask<?>[] a = array; + int s = top; + try { + if ((a != null && a.length > s + 1 - base) || + (a = growArray(false)) != null) { // must presize + int j = (((a.length - 1) & s) << ASHIFT) + ABASE; + U.putObject(a, (long)j, task); // don't need "ordered" + top = s + 1; + submitted = true; + } + } finally { + runState = 0; // unlock + } + } + return submitted; + } + + /** + * Takes next task, if one exists, in LIFO order. Call only + * by owner in unshared queues. (We do not have a shared + * version of this method because it is never needed.) + */ + final ForkJoinTask<?> pop() { + ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m; + if ((a = array) != null && (m = a.length - 1) >= 0) { + for (int s; (s = top - 1) - base >= 0;) { + long j = ((m & s) << ASHIFT) + ABASE; + if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null) + break; + if (U.compareAndSwapObject(a, j, t, null)) { + top = s; + return t; + } + } + } + return null; + } + + /** + * Takes a task in FIFO order if b is base of queue and a task + * can be claimed without contention. Specialized versions + * appear in ForkJoinPool methods scan and tryHelpStealer. + */ + final ForkJoinTask<?> pollAt(int b) { + ForkJoinTask<?> t; ForkJoinTask<?>[] a; + if ((a = array) != null) { + int j = (((a.length - 1) & b) << ASHIFT) + ABASE; + if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null && + base == b && + U.compareAndSwapObject(a, j, t, null)) { + base = b + 1; + return t; + } + } + return null; + } + + /** + * Takes next task, if one exists, in FIFO order. + */ + final ForkJoinTask<?> poll() { + ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t; + while ((b = base) - top < 0 && (a = array) != null) { + int j = (((a.length - 1) & b) << ASHIFT) + ABASE; + t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); + if (t != null) { + if (base == b && + U.compareAndSwapObject(a, j, t, null)) { + base = b + 1; + return t; + } + } + else if (base == b) { + if (b + 1 == top) + break; + Thread.yield(); // wait for lagging update + } + } + return null; + } + + /** + * Takes next task, if one exists, in order specified by mode. + */ + final ForkJoinTask<?> nextLocalTask() { + return mode == 0 ? pop() : poll(); + } + + /** + * Returns next task, if one exists, in order specified by mode. + */ + final ForkJoinTask<?> peek() { + ForkJoinTask<?>[] a = array; int m; + if (a == null || (m = a.length - 1) < 0) + return null; + int i = mode == 0 ? top - 1 : base; + int j = ((i & m) << ASHIFT) + ABASE; + return (ForkJoinTask<?>)U.getObjectVolatile(a, j); + } + + /** + * Pops the given task only if it is at the current top. + */ + final boolean tryUnpush(ForkJoinTask<?> t) { + ForkJoinTask<?>[] a; int s; + if ((a = array) != null && (s = top) != base && + U.compareAndSwapObject + (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) { + top = s; + return true; + } + return false; + } + + /** + * Polls the given task only if it is at the current base. + */ + final boolean pollFor(ForkJoinTask<?> task) { + ForkJoinTask<?>[] a; int b; + if ((b = base) - top < 0 && (a = array) != null) { + int j = (((a.length - 1) & b) << ASHIFT) + ABASE; + if (U.getObjectVolatile(a, j) == task && base == b && + U.compareAndSwapObject(a, j, task, null)) { + base = b + 1; + return true; + } + } + return false; + } + + /** + * Initializes or doubles the capacity of array. Call either + * by owner or with lock held -- it is OK for base, but not + * top, to move while resizings are in progress. + * + * @param rejectOnFailure if true, throw exception if capacity + * exceeded (relayed ultimately to user); else return null. + */ + final ForkJoinTask<?>[] growArray(boolean rejectOnFailure) { + ForkJoinTask<?>[] oldA = array; + int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY; + if (size <= MAXIMUM_QUEUE_CAPACITY) { + int oldMask, t, b; + ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size]; + if (oldA != null && (oldMask = oldA.length - 1) >= 0 && + (t = top) - (b = base) > 0) { + int mask = size - 1; + do { + ForkJoinTask<?> x; + int oldj = ((b & oldMask) << ASHIFT) + ABASE; + int j = ((b & mask) << ASHIFT) + ABASE; + x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj); + if (x != null && + U.compareAndSwapObject(oldA, oldj, x, null)) + U.putObjectVolatile(a, j, x); + } while (++b != t); + } + return a; + } + else if (!rejectOnFailure) + return null; + else + throw new RejectedExecutionException("Queue capacity exceeded"); + } + + /** + * Removes and cancels all known tasks, ignoring any exceptions. + */ + final void cancelAll() { + ForkJoinTask.cancelIgnoringExceptions(currentJoin); + ForkJoinTask.cancelIgnoringExceptions(currentSteal); + for (ForkJoinTask<?> t; (t = poll()) != null; ) + ForkJoinTask.cancelIgnoringExceptions(t); + } + + /** + * Computes next value for random probes. Scans don't require + * a very high quality generator, but also not a crummy one. + * Marsaglia xor-shift is cheap and works well enough. Note: + * This is manually inlined in its usages in ForkJoinPool to + * avoid writes inside busy scan loops. + */ + final int nextSeed() { + int r = seed; + r ^= r << 13; + r ^= r >>> 17; + return seed = r ^= r << 5; + } + + // Execution methods + + /** + * Pops and runs tasks until empty. + */ + private void popAndExecAll() { + // A bit faster than repeated pop calls + ForkJoinTask<?>[] a; int m, s; long j; ForkJoinTask<?> t; + while ((a = array) != null && (m = a.length - 1) >= 0 && + (s = top - 1) - base >= 0 && + (t = ((ForkJoinTask<?>) + U.getObject(a, j = ((m & s) << ASHIFT) + ABASE))) + != null) { + if (U.compareAndSwapObject(a, j, t, null)) { + top = s; + t.doExec(); + } + } + } + + /** + * Polls and runs tasks until empty. + */ + private void pollAndExecAll() { + for (ForkJoinTask<?> t; (t = poll()) != null;) + t.doExec(); + } + + /** + * If present, removes from queue and executes the given task, or + * any other cancelled task. Returns (true) immediately on any CAS + * or consistency check failure so caller can retry. + * + * @return 0 if no progress can be made, else positive + * (this unusual convention simplifies use with tryHelpStealer.) + */ + final int tryRemoveAndExec(ForkJoinTask<?> task) { + int stat = 1; + boolean removed = false, empty = true; + ForkJoinTask<?>[] a; int m, s, b, n; + if ((a = array) != null && (m = a.length - 1) >= 0 && + (n = (s = top) - (b = base)) > 0) { + for (ForkJoinTask<?> t;;) { // traverse from s to b + int j = ((--s & m) << ASHIFT) + ABASE; + t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); + if (t == null) // inconsistent length + break; + else if (t == task) { + if (s + 1 == top) { // pop + if (!U.compareAndSwapObject(a, j, task, null)) + break; + top = s; + removed = true; + } + else if (base == b) // replace with proxy + removed = U.compareAndSwapObject(a, j, task, + new EmptyTask()); + break; + } + else if (t.status >= 0) + empty = false; + else if (s + 1 == top) { // pop and throw away + if (U.compareAndSwapObject(a, j, t, null)) + top = s; + break; + } + if (--n == 0) { + if (!empty && base == b) + stat = 0; + break; + } + } + } + if (removed) + task.doExec(); + return stat; + } + + /** + * Executes a top-level task and any local tasks remaining + * after execution. + */ + final void runTask(ForkJoinTask<?> t) { + if (t != null) { + currentSteal = t; + t.doExec(); + if (top != base) { // process remaining local tasks + if (mode == 0) + popAndExecAll(); + else + pollAndExecAll(); + } + ++nsteals; + currentSteal = null; + } + } + + /** + * Executes a non-top-level (stolen) task. + */ + final void runSubtask(ForkJoinTask<?> t) { + if (t != null) { + ForkJoinTask<?> ps = currentSteal; + currentSteal = t; + t.doExec(); + currentSteal = ps; + } + } + + /** + * Returns true if owned and not known to be blocked. + */ + final boolean isApparentlyUnblocked() { + Thread wt; Thread.State s; + return (eventCount >= 0 && + (wt = owner) != null && + (s = wt.getState()) != Thread.State.BLOCKED && + s != Thread.State.WAITING && + s != Thread.State.TIMED_WAITING); + } + + /** + * If this owned and is not already interrupted, try to + * interrupt and/or unpark, ignoring exceptions. + */ + final void interruptOwner() { + Thread wt, p; + if ((wt = owner) != null && !wt.isInterrupted()) { + try { + wt.interrupt(); + } catch (SecurityException ignore) { + } + } + if ((p = parker) != null) + U.unpark(p); + } + + // Unsafe mechanics + private static final sun.misc.Unsafe U; + private static final long RUNSTATE; + private static final int ABASE; + private static final int ASHIFT; + static { + int s; + try { + U = sun.misc.Unsafe.getUnsafe(); + Class<?> k = WorkQueue.class; + Class<?> ak = ForkJoinTask[].class; + RUNSTATE = U.objectFieldOffset + (k.getDeclaredField("runState")); + ABASE = U.arrayBaseOffset(ak); + s = U.arrayIndexScale(ak); + } catch (Exception e) { + throw new Error(e); + } + if ((s & (s-1)) != 0) + throw new Error("data type scale not a power of two"); + ASHIFT = 31 - Integer.numberOfLeadingZeros(s); + } + } + + /** + * Per-thread records for threads that submit to pools. Currently + * holds only pseudo-random seed / index that is used to choose + * submission queues in method doSubmit. In the future, this may + * also incorporate a means to implement different task rejection + * and resubmission policies. + * + * Seeds for submitters and workers/workQueues work in basically + * the same way but are initialized and updated using slightly + * different mechanics. Both are initialized using the same + * approach as in class ThreadLocal, where successive values are + * unlikely to collide with previous values. This is done during + * registration for workers, but requires a separate AtomicInteger + * for submitters. Seeds are then randomly modified upon + * collisions using xorshifts, which requires a non-zero seed. + */ + static final class Submitter { + int seed; + Submitter() { + int s = nextSubmitterSeed.getAndAdd(SEED_INCREMENT); + seed = (s == 0) ? 1 : s; // ensure non-zero + } + } + + /** ThreadLocal class for Submitters */ + static final class ThreadSubmitter extends ThreadLocal<Submitter> { + public Submitter initialValue() { return new Submitter(); } + } + + // static fields (initialized in static initializer below) + + /** * Creates a new ForkJoinWorkerThread. This factory is used unless * overridden in ForkJoinPool constructors. */ @@ -399,107 +1117,86 @@ defaultForkJoinWorkerThreadFactory; /** + * Generator for assigning sequence numbers as pool names. + */ + private static final AtomicInteger poolNumberGenerator; + + /** + * Generator for initial hashes/seeds for submitters. Accessed by + * Submitter class constructor. + */ + static final AtomicInteger nextSubmitterSeed; + + /** * Permission required for callers of methods that may start or * kill threads. */ private static final RuntimePermission modifyThreadPermission; /** - * If there is a security manager, makes sure caller has - * permission to modify threads. + * Per-thread submission bookeeping. Shared across all pools + * to reduce ThreadLocal pollution and because random motion + * to avoid contention in one pool is likely to hold for others. */ - private static void checkPermission() { - SecurityManager security = System.getSecurityManager(); - if (security != null) - security.checkPermission(modifyThreadPermission); - } + private static final ThreadSubmitter submitters; - /** - * Generator for assigning sequence numbers as pool names. - */ - private static final AtomicInteger poolNumberGenerator; + // static constants /** - * Generator for initial random seeds for worker victim - * selection. This is used only to create initial seeds. Random - * steals use a cheaper xorshift generator per steal attempt. We - * don't expect much contention on seedGenerator, so just use a - * plain Random. + * The wakeup interval (in nanoseconds) for a worker waiting for a + * task when the pool is quiescent to instead try to shrink the + * number of workers. The exact value does not matter too + * much. It must be short enough to release resources during + * sustained periods of idleness, but not so short that threads + * are continually re-created. */ - static final Random workerSeedGenerator; + private static final long SHRINK_RATE = + 4L * 1000L * 1000L * 1000L; // 4 seconds /** - * Array holding all worker threads in the pool. Initialized upon - * construction. Array size must be a power of two. Updates and - * replacements are protected by scanGuard, but the array is - * always kept in a consistent enough state to be randomly - * accessed without locking by workers performing work-stealing, - * as well as other traversal-based methods in this class, so long - * as reads memory-acquire by first reading ctl. All readers must - * tolerate that some array slots may be null. + * The timeout value for attempted shrinkage, includes + * some slop to cope with system timer imprecision. */ - ForkJoinWorkerThread[] workers; - - /** - * Initial size for submission queue array. Must be a power of - * two. In many applications, these always stay small so we use a - * small initial cap. - */ - private static final int INITIAL_QUEUE_CAPACITY = 8; + private static final long SHRINK_TIMEOUT = SHRINK_RATE - (SHRINK_RATE / 10); /** - * Maximum size for submission queue array. Must be a power of two - * less than or equal to 1 << (31 - width of array entry) to - * ensure lack of index wraparound, but is capped at a lower - * value to help users trap runaway computations. - */ - private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M - - /** - * Array serving as submission queue. Initialized upon construction. + * The maximum stolen->joining link depth allowed in method + * tryHelpStealer. Must be a power of two. This value also + * controls the maximum number of times to try to help join a task + * without any apparent progress or change in pool state before + * giving up and blocking (see awaitJoin). Depths for legitimate + * chains are unbounded, but we use a fixed constant to avoid + * (otherwise unchecked) cycles and to bound staleness of + * traversal parameters at the expense of sometimes blocking when + * we could be helping. */ - private ForkJoinTask<?>[] submissionQueue; - - /** - * Lock protecting submissions array for addSubmission - */ - private final ReentrantLock submissionLock; - - /** - * Condition for awaitTermination, using submissionLock for - * convenience. - */ - private final Condition termination; + private static final int MAX_HELP = 64; /** - * Creation factory for worker threads. + * Secondary time-based bound (in nanosecs) for helping attempts + * before trying compensated blocking in awaitJoin. Used in + * conjunction with MAX_HELP to reduce variance due to different + * polling rates associated with different helping options. The + * value should roughly approximate the time required to create + * and/or activate a worker thread. */ - private final ForkJoinWorkerThreadFactory factory; - - /** - * The uncaught exception handler used when any worker abruptly - * terminates. - */ - final Thread.UncaughtExceptionHandler ueh; + private static final long COMPENSATION_DELAY = 1L << 18; // ~0.25 millisec /** - * Prefix for assigning names to worker threads + * Increment for seed generators. See class ThreadLocal for + * explanation. */ - private final String workerNamePrefix; + private static final int SEED_INCREMENT = 0x61c88647; /** - * Sum of per-thread steal counts, updated only when threads are - * idle or terminating. - */ - private volatile long stealCount; - - /** - * Main pool control -- a long packed with: + * Bits and masks for control variables + * + * Field ctl is a long packed with: * AC: Number of active running workers minus target parallelism (16 bits) - * TC: Number of total workers minus target parallelism (16bits) + * TC: Number of total workers minus target parallelism (16 bits) * ST: true if pool is terminating (1 bit) * EC: the wait count of top waiting thread (15 bits) - * ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits) + * ID: poolIndex of top of Treiber stack of waiters (16 bits) * * When convenient, we can extract the upper 32 bits of counts and * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e = @@ -508,13 +1205,26 @@ * parallelism and the positionings of fields makes it possible to * perform the most common checks via sign tests of fields: When * ac is negative, there are not enough active workers, when tc is - * negative, there are not enough total workers, when id is - * negative, there is at least one waiting worker, and when e is + * negative, there are not enough total workers, and when e is * negative, the pool is terminating. To deal with these possibly * negative fields, we use casts in and out of "short" and/or * signed shifts to maintain signedness. + * + * When a thread is queued (inactivated), its eventCount field is + * set negative, which is the only way to tell if a worker is + * prevented from executing tasks, even though it must continue to + * scan for them to avoid queuing races. Note however that + * eventCount updates lag releases so usage requires care. + * + * Field runState is an int packed with: + * SHUTDOWN: true if shutdown is enabled (1 bit) + * SEQ: a sequence number updated upon (de)registering workers (30 bits) + * INIT: set true after workQueues array construction (1 bit) + * + * The sequence number enables simple consistency checks: + * Staleness of read-only operations on the workQueues array can + * be checked by comparing runState before vs after the reads. */ - volatile long ctl; // bit positions/shifts for fields private static final int AC_SHIFT = 48; @@ -523,8 +1233,9 @@ private static final int EC_SHIFT = 16; // bounds - private static final int MAX_ID = 0x7fff; // max poolIndex - private static final int SMASK = 0xffff; // mask short bits + private static final int SMASK = 0xffff; // short bits + private static final int MAX_CAP = 0x7fff; // max #workers - 1 + private static final int SQMASK = 0xfffe; // even short bits private static final int SHORT_SIGN = 1 << 15; private static final int INT_SIGN = 1 << 31; @@ -546,843 +1257,757 @@ private static final int UTC_UNIT = 1 << UTC_SHIFT; // masks and units for dealing with e = (int)ctl - private static final int E_MASK = 0x7fffffff; // no STOP_BIT - private static final int EC_UNIT = 1 << EC_SHIFT; - - /** - * The target parallelism level. - */ - final int parallelism; + private static final int E_MASK = 0x7fffffff; // no STOP_BIT + private static final int E_SEQ = 1 << EC_SHIFT; - /** - * Index (mod submission queue length) of next element to take - * from submission queue. Usage is identical to that for - * per-worker queues -- see ForkJoinWorkerThread internal - * documentation. - */ - volatile int queueBase; + // runState bits + private static final int SHUTDOWN = 1 << 31; - /** - * Index (mod submission queue length) of next element to add - * in submission queue. Usage is identical to that for - * per-worker queues -- see ForkJoinWorkerThread internal - * documentation. - */ - int queueTop; + // access mode for WorkQueue + static final int LIFO_QUEUE = 0; + static final int FIFO_QUEUE = 1; + static final int SHARED_QUEUE = -1; - /** - * True when shutdown() has been called. - */ - volatile boolean shutdown; - - /** - * True if use local fifo, not default lifo, for local polling - * Read by, and replicated by ForkJoinWorkerThreads - */ - final boolean locallyFifo; + // Instance fields - /** - * The number of threads in ForkJoinWorkerThreads.helpQuiescePool. - * When non-zero, suppresses automatic shutdown when active - * counts become zero. + /* + * Field layout order in this class tends to matter more than one + * would like. Runtime layout order is only loosely related to + * declaration order and may differ across JVMs, but the following + * empirically works OK on current JVMs. */ - volatile int quiescerCount; - - /** - * The number of threads blocked in join. - */ - volatile int blockedCount; - - /** - * Counter for worker Thread names (unrelated to their poolIndex) - */ - private volatile int nextWorkerNumber; - /** - * The index for the next created worker. Accessed under scanGuard. - */ - private int nextWorkerIndex; + volatile long ctl; // main pool control + final int parallelism; // parallelism level + final int localMode; // per-worker scheduling mode + final int submitMask; // submit queue index bound + int nextSeed; // for initializing worker seeds + volatile int runState; // shutdown status and seq + WorkQueue[] workQueues; // main registry + final Mutex lock; // for registration + final Condition termination; // for awaitTermination + final ForkJoinWorkerThreadFactory factory; // factory for new workers + final Thread.UncaughtExceptionHandler ueh; // per-worker UEH + final AtomicLong stealCount; // collect counts when terminated + final AtomicInteger nextWorkerNumber; // to create worker name string + final String workerNamePrefix; // to create worker name string - /** - * SeqLock and index masking for updates to workers array. Locked - * when SG_UNIT is set. Unlocking clears bit by adding - * SG_UNIT. Staleness of read-only operations can be checked by - * comparing scanGuard to value before the reads. The low 16 bits - * (i.e, anding with SMASK) hold (the smallest power of two - * covering all worker indices, minus one, and is used to avoid - * dealing with large numbers of null slots when the workers array - * is overallocated. - */ - volatile int scanGuard; - - private static final int SG_UNIT = 1 << 16; + // Creating, registering, and deregistering workers /** - * The wakeup interval (in nanoseconds) for a worker waiting for a - * task when the pool is quiescent to instead try to shrink the - * number of workers. The exact value does not matter too - * much. It must be short enough to release resources during - * sustained periods of idleness, but not so short that threads - * are continually re-created. + * Tries to create and start a worker */ - private static final long SHRINK_RATE = - 4L * 1000L * 1000L * 1000L; // 4 seconds + private void addWorker() { + Throwable ex = null; + ForkJoinWorkerThread wt = null; + try { + if ((wt = factory.newThread(this)) != null) { + wt.start(); + return; + } + } catch (Throwable e) { + ex = e; + } + deregisterWorker(wt, ex); // adjust counts etc on failure + } /** - * Top-level loop for worker threads: On each step: if the - * previous step swept through all queues and found no tasks, or - * there are excess threads, then possibly blocks. Otherwise, - * scans for and, if found, executes a task. Returns when pool - * and/or worker terminate. - * - * @param w the worker + * Callback from ForkJoinWorkerThread constructor to assign a + * public name. This must be separate from registerWorker because + * it is called during the "super" constructor call in + * ForkJoinWorkerThread. */ - final void work(ForkJoinWorkerThread w) { - boolean swept = false; // true on empty scans - long c; - while (!w.terminate && (int)(c = ctl) >= 0) { - int a; // active count - if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0) - swept = scan(w, a); - else if (tryAwaitWork(w, c)) - swept = false; - } + final String nextWorkerName() { + return workerNamePrefix.concat + (Integer.toString(nextWorkerNumber.addAndGet(1))); } - // Signalling - /** - * Wakes up or creates a worker. + * Callback from ForkJoinWorkerThread constructor to establish its + * poolIndex and record its WorkQueue. To avoid scanning bias due + * to packing entries in front of the workQueues array, we treat + * the array as a simple power-of-two hash table using per-thread + * seed as hash, expanding as needed. + * + * @param w the worker's queue */ - final void signalWork() { - /* - * The while condition is true if: (there is are too few total - * workers OR there is at least one waiter) AND (there are too - * few active workers OR the pool is terminating). The value - * of e distinguishes the remaining cases: zero (no waiters) - * for create, negative if terminating (in which case do - * nothing), else release a waiter. The secondary checks for - * release (non-null array etc) can fail if the pool begins - * terminating after the test, and don't impose any added cost - * because JVMs must perform null and bounds checks anyway. - */ - long c; int e, u; - while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) & - (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) { - if (e > 0) { // release a waiting worker - int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws; - if ((ws = workers) == null || - (i = ~e & SMASK) >= ws.length || - (w = ws[i]) == null) - break; - long nc = (((long)(w.nextWait & E_MASK)) | - ((long)(u + UAC_UNIT) << 32)); - if (w.eventCount == e && - UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { - w.eventCount = (e + EC_UNIT) & E_MASK; - if (w.parked) - UNSAFE.unpark(w); - break; + + final void registerWorker(WorkQueue w) { + Mutex lock = this.lock; + lock.lock(); + try { + WorkQueue[] ws = workQueues; + if (w != null && ws != null) { // skip on shutdown/failure + int rs, n = ws.length, m = n - 1; + int s = nextSeed += SEED_INCREMENT; // rarely-colliding sequence + w.seed = (s == 0) ? 1 : s; // ensure non-zero seed + int r = (s << 1) | 1; // use odd-numbered indices + if (ws[r &= m] != null) { // collision + int probes = 0; // step by approx half size + int step = (n <= 4) ? 2 : ((n >>> 1) & SQMASK) + 2; + while (ws[r = (r + step) & m] != null) { + if (++probes >= n) { + workQueues = ws = Arrays.copyOf(ws, n <<= 1); + m = n - 1; + probes = 0; + } + } } + w.eventCount = w.poolIndex = r; // establish before recording + ws[r] = w; // also update seq + runState = ((rs = runState) & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN); } - else if (UNSAFE.compareAndSwapLong - (this, ctlOffset, c, - (long)(((u + UTC_UNIT) & UTC_MASK) | - ((u + UAC_UNIT) & UAC_MASK)) << 32)) { - addWorker(); - break; - } + } finally { + lock.unlock(); } } /** - * Variant of signalWork to help release waiters on rescans. - * Tries once to release a waiter if active count < 0. + * Final callback from terminating worker, as well as upon failure + * to construct or start a worker in addWorker. Removes record of + * worker from array, and adjusts counts. If pool is shutting + * down, tries to complete termination. * - * @return false if failed due to contention, else true + * @param wt the worker thread or null if addWorker failed + * @param ex the exception causing failure, or null if none + */ + final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { + Mutex lock = this.lock; + WorkQueue w = null; + if (wt != null && (w = wt.workQueue) != null) { + w.runState = -1; // ensure runState is set + stealCount.getAndAdd(w.totalSteals + w.nsteals); + int idx = w.poolIndex; + lock.lock(); + try { // remove record from array + WorkQueue[] ws = workQueues; + if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w) + ws[idx] = null; + } finally { + lock.unlock(); + } + } + + long c; // adjust ctl counts + do {} while (!U.compareAndSwapLong + (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) | + ((c - TC_UNIT) & TC_MASK) | + (c & ~(AC_MASK|TC_MASK))))); + + if (!tryTerminate(false, false) && w != null) { + w.cancelAll(); // cancel remaining tasks + if (w.array != null) // suppress signal if never ran + signalWork(); // wake up or create replacement + if (ex == null) // help clean refs on way out + ForkJoinTask.helpExpungeStaleExceptions(); + } + + if (ex != null) // rethrow + U.throwException(ex); + } + + + // Submissions + + /** + * Unless shutting down, adds the given task to a submission queue + * at submitter's current queue index (modulo submission + * range). If no queue exists at the index, one is created. If + * the queue is busy, another index is randomly chosen. The + * submitMask bounds the effective number of queues to the + * (nearest power of two for) parallelism level. + * + * @param task the task. Caller must ensure non-null. */ - private boolean tryReleaseWaiter() { - long c; int e, i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws; - if ((e = (int)(c = ctl)) > 0 && - (int)(c >> AC_SHIFT) < 0 && - (ws = workers) != null && - (i = ~e & SMASK) < ws.length && - (w = ws[i]) != null) { - long nc = ((long)(w.nextWait & E_MASK) | - ((c + AC_UNIT) & (AC_MASK|TC_MASK))); - if (w.eventCount != e || - !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) - return false; - w.eventCount = (e + EC_UNIT) & E_MASK; - if (w.parked) - UNSAFE.unpark(w); + private void doSubmit(ForkJoinTask<?> task) { + Submitter s = submitters.get(); + for (int r = s.seed, m = submitMask;;) { + WorkQueue[] ws; WorkQueue q; + int k = r & m & SQMASK; // use only even indices + if (runState < 0 || (ws = workQueues) == null || ws.length <= k) + throw new RejectedExecutionException(); // shutting down + else if ((q = ws[k]) == null) { // create new queue + WorkQueue nq = new WorkQueue(this, null, SHARED_QUEUE); + Mutex lock = this.lock; // construct outside lock + lock.lock(); + try { // recheck under lock + int rs = runState; // to update seq + if (ws == workQueues && ws[k] == null) { + ws[k] = nq; + runState = ((rs & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN)); + } + } finally { + lock.unlock(); + } + } + else if (q.trySharedPush(task)) { + signalWork(); + return; + } + else if (m > 1) { // move to a different index + r ^= r << 13; // same xorshift as WorkQueues + r ^= r >>> 17; + s.seed = r ^= r << 5; + } + else + Thread.yield(); // yield if no alternatives } - return true; + } + + // Maintaining ctl counts + + /** + * Increments active count; mainly called upon return from blocking. + */ + final void incrementActiveCount() { + long c; + do {} while (!U.compareAndSwapLong(this, CTL, c = ctl, c + AC_UNIT)); + } + + /** + * Tries to activate or create a worker if too few are active. + */ + final void signalWork() { + long c; int u; + while ((u = (int)((c = ctl) >>> 32)) < 0) { // too few active + WorkQueue[] ws = workQueues; int e, i; WorkQueue w; Thread p; + if ((e = (int)c) > 0) { // at least one waiting + if (ws != null && (i = e & SMASK) < ws.length && + (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) { + long nc = (((long)(w.nextWait & E_MASK)) | + ((long)(u + UAC_UNIT) << 32)); + if (U.compareAndSwapLong(this, CTL, c, nc)) { + w.eventCount = (e + E_SEQ) & E_MASK; + if ((p = w.parker) != null) + U.unpark(p); // activate and release + break; + } + } + else + break; + } + else if (e == 0 && (u & SHORT_SIGN) != 0) { // too few total + long nc = (long)(((u + UTC_UNIT) & UTC_MASK) | + ((u + UAC_UNIT) & UAC_MASK)) << 32; + if (U.compareAndSwapLong(this, CTL, c, nc)) { + addWorker(); + break; + } + } + else + break; + } } // Scanning for tasks /** - * Scans for and, if found, executes one task. Scans start at a - * random index of workers array, and randomly select the first - * (2*#workers)-1 probes, and then, if all empty, resort to 2 - * circular sweeps, which is necessary to check quiescence. and - * taking a submission only if no stealable tasks were found. The - * steal code inside the loop is a specialized form of - * ForkJoinWorkerThread.deqTask, followed bookkeeping to support - * helpJoinTask and signal propagation. The code for submission - * queues is almost identical. On each steal, the worker completes - * not only the task, but also all local tasks that this task may - * have generated. On detecting staleness or contention when - * trying to take a task, this method returns without finishing - * sweep, which allows global state rechecks before retry. - * - * @param w the worker - * @param a the number of active workers - * @return true if swept all queues without finding a task + * Top-level runloop for workers, called by ForkJoinWorkerThread.run. */ - private boolean scan(ForkJoinWorkerThread w, int a) { - int g = scanGuard; // mask 0 avoids useless scans if only one active - int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK; - ForkJoinWorkerThread[] ws = workers; - if (ws == null || ws.length <= m) // staleness check - return false; - for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) { - ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; - ForkJoinWorkerThread v = ws[k & m]; - if (v != null && (b = v.queueBase) != v.queueTop && - (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) { - long u = (i << ASHIFT) + ABASE; - if ((t = q[i]) != null && v.queueBase == b && - UNSAFE.compareAndSwapObject(q, u, t, null)) { - int d = (v.queueBase = b + 1) - v.queueTop; - v.stealHint = w.poolIndex; - if (d != 0) - signalWork(); // propagate if nonempty - w.execTask(t); - } - r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5); - return false; // store next seed - } - else if (j < 0) { // xorshift - r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5; - } - else - ++k; - } - if (scanGuard != g) // staleness check - return false; - else { // try to take submission - ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; - if ((b = queueBase) != queueTop && - (q = submissionQueue) != null && - (i = (q.length - 1) & b) >= 0) { - long u = (i << ASHIFT) + ABASE; - if ((t = q[i]) != null && queueBase == b && - UNSAFE.compareAndSwapObject(q, u, t, null)) { - queueBase = b + 1; - w.execTask(t); - } - return false; - } - return true; // all queues empty - } + final void runWorker(WorkQueue w) { + w.growArray(false); // initialize queue array in this thread + do { w.runTask(scan(w)); } while (w.runState >= 0); } /** - * Tries to enqueue worker w in wait queue and await change in - * worker's eventCount. If the pool is quiescent and there is - * more than one worker, possibly terminates worker upon exit. - * Otherwise, before blocking, rescans queues to avoid missed - * signals. Upon finding work, releases at least one worker - * (which may be the current worker). Rescans restart upon - * detected staleness or failure to release due to - * contention. Note the unusual conventions about Thread.interrupt - * here and elsewhere: Because interrupts are used solely to alert - * threads to check termination, which is checked here anyway, we - * clear status (using Thread.interrupted) before any call to - * park, so that park does not immediately return due to status - * being set via some other unrelated call to interrupt in user - * code. + * Scans for and, if found, returns one task, else possibly + * inactivates the worker. This method operates on single reads of + * volatile state and is designed to be re-invoked continuously, + * in part because it returns upon detecting inconsistencies, + * contention, or state changes that indicate possible success on + * re-invocation. + * + * The scan searches for tasks across a random permutation of + * queues (starting at a random index and stepping by a random + * relative prime, checking each at least once). The scan + * terminates upon either finding a non-empty queue, or completing + * the sweep. If the worker is not inactivated, it takes and + * returns a task from this queue. On failure to find a task, we + * take one of the following actions, after which the caller will + * retry calling this method unless terminated. + * + * * If pool is terminating, terminate the worker. * - * @param w the calling worker - * @param c the ctl value on entry - * @return true if waited or another thread was released upon enq + * * If not a complete sweep, try to release a waiting worker. If + * the scan terminated because the worker is inactivated, then the + * released worker will often be the calling worker, and it can + * succeed obtaining a task on the next call. Or maybe it is + * another worker, but with same net effect. Releasing in other + * cases as well ensures that we have enough workers running. + * + * * If not already enqueued, try to inactivate and enqueue the + * worker on wait queue. Or, if inactivating has caused the pool + * to be quiescent, relay to idleAwaitWork to check for + * termination and possibly shrink pool. + * + * * If already inactive, and the caller has run a task since the + * last empty scan, return (to allow rescan) unless others are + * also inactivated. Field WorkQueue.rescans counts down on each + * scan to ensure eventual inactivation and blocking. + * + * * If already enqueued and none of the above apply, park + * awaiting signal, + * + * @param w the worker (via its WorkQueue) + * @return a task or null of none found */ - private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) { - int v = w.eventCount; - w.nextWait = (int)c; // w's successor record - long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK)); - if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { - long d = ctl; // return true if lost to a deq, to force scan - return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L; - } - for (int sc = w.stealCount; sc != 0;) { // accumulate stealCount - long s = stealCount; - if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc)) - sc = w.stealCount = 0; - else if (w.eventCount != v) - return true; // update next time - } - if ((!shutdown || !tryTerminate(false)) && - (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 && - blockedCount == 0 && quiescerCount == 0) - idleAwaitWork(w, nc, c, v); // quiescent - for (boolean rescanned = false;;) { - if (w.eventCount != v) - return true; - if (!rescanned) { - int g = scanGuard, m = g & SMASK; - ForkJoinWorkerThread[] ws = workers; - if (ws != null && m < ws.length) { - rescanned = true; - for (int i = 0; i <= m; ++i) { - ForkJoinWorkerThread u = ws[i]; - if (u != null) { - if (u.queueBase != u.queueTop && - !tryReleaseWaiter()) - rescanned = false; // contended - if (w.eventCount != v) - return true; - } + private final ForkJoinTask<?> scan(WorkQueue w) { + WorkQueue[] ws; // first update random seed + int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5; + int rs = runState, m; // volatile read order matters + if ((ws = workQueues) != null && (m = ws.length - 1) > 0) { + int ec = w.eventCount; // ec is negative if inactive + int step = (r >>> 16) | 1; // relative prime + for (int j = (m + 1) << 2; ; r += step) { + WorkQueue q; ForkJoinTask<?> t; ForkJoinTask<?>[] a; int b; + if ((q = ws[r & m]) != null && (b = q.base) - q.top < 0 && + (a = q.array) != null) { // probably nonempty + int i = (((a.length - 1) & b) << ASHIFT) + ABASE; + t = (ForkJoinTask<?>)U.getObjectVolatile(a, i); + if (q.base == b && ec >= 0 && t != null && + U.compareAndSwapObject(a, i, t, null)) { + if (q.top - (q.base = b + 1) > 1) + signalWork(); // help pushes signal + return t; + } + else if (ec < 0 || j <= m) { + rs = 0; // mark scan as imcomplete + break; // caller can retry after release } } - if (scanGuard != g || // stale - (queueBase != queueTop && !tryReleaseWaiter())) - rescanned = false; - if (!rescanned) - Thread.yield(); // reduce contention - else - Thread.interrupted(); // clear before park + if (--j < 0) + break; + } + + long c = ctl; int e = (int)c, a = (int)(c >> AC_SHIFT), nr, ns; + if (e < 0) // decode ctl on empty scan + w.runState = -1; // pool is terminating + else if (rs == 0 || rs != runState) { // incomplete scan + WorkQueue v; Thread p; // try to release a waiter + if (e > 0 && a < 0 && w.eventCount == ec && + (v = ws[e & m]) != null && v.eventCount == (e | INT_SIGN)) { + long nc = ((long)(v.nextWait & E_MASK) | + ((c + AC_UNIT) & (AC_MASK|TC_MASK))); + if (ctl == c && U.compareAndSwapLong(this, CTL, c, nc)) { + v.eventCount = (e + E_SEQ) & E_MASK; + if ((p = v.parker) != null) + U.unpark(p); + } + } } - else { - w.parked = true; // must recheck - if (w.eventCount != v) { - w.parked = false; - return true; + else if (ec >= 0) { // try to enqueue/inactivate + long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK)); + w.nextWait = e; + w.eventCount = ec | INT_SIGN; // mark as inactive + if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc)) + w.eventCount = ec; // unmark on CAS failure + else { + if ((ns = w.nsteals) != 0) { + w.nsteals = 0; // set rescans if ran task + w.rescans = (a > 0) ? 0 : a + parallelism; + w.totalSteals += ns; + } + if (a == 1 - parallelism) // quiescent + idleAwaitWork(w, nc, c); } - LockSupport.park(this); - rescanned = w.parked = false; + } + else if (w.eventCount < 0) { // already queued + if ((nr = w.rescans) > 0) { // continue rescanning + int ac = a + parallelism; + if (((w.rescans = (ac < nr) ? ac : nr - 1) & 3) == 0) + Thread.yield(); // yield before block + } + else { + Thread.interrupted(); // clear status + Thread wt = Thread.currentThread(); + U.putObject(wt, PARKBLOCKER, this); + w.parker = wt; // emulate LockSupport.park + if (w.eventCount < 0) // recheck + U.park(false, 0L); + w.parker = null; + U.putObject(wt, PARKBLOCKER, null); + } } } + return null; } /** - * If inactivating worker w has caused pool to become - * quiescent, check for pool termination, and wait for event - * for up to SHRINK_RATE nanosecs (rescans are unnecessary in - * this case because quiescence reflects consensus about lack - * of work). On timeout, if ctl has not changed, terminate the - * worker. Upon its termination (see deregisterWorker), it may - * wake up another worker to possibly repeat this process. + * If inactivating worker w has caused the pool to become + * quiescent, checks for pool termination, and, so long as this is + * not the only worker, waits for event for up to SHRINK_RATE + * nanosecs. On timeout, if ctl has not changed, terminates the + * worker, which will in turn wake up another worker to possibly + * repeat this process. * * @param w the calling worker - * @param currentCtl the ctl value after enqueuing w - * @param prevCtl the ctl value if w terminated - * @param v the eventCount w awaits change + * @param currentCtl the ctl value triggering possible quiescence + * @param prevCtl the ctl value to restore if thread is terminated */ - private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl, - long prevCtl, int v) { - if (w.eventCount == v) { - if (shutdown) - tryTerminate(false); - ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs + private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) { + if (w.eventCount < 0 && !tryTerminate(false, false) && + (int)prevCtl != 0 && !hasQueuedSubmissions() && ctl == currentCtl) { + Thread wt = Thread.currentThread(); + Thread.yield(); // yield before block while (ctl == currentCtl) { long startTime = System.nanoTime(); - w.parked = true; - if (w.eventCount == v) // must recheck - LockSupport.parkNanos(this, SHRINK_RATE); - w.parked = false; - if (w.eventCount != v) + Thread.interrupted(); // timed variant of version in scan() + U.putObject(wt, PARKBLOCKER, this); + w.parker = wt; + if (ctl == currentCtl) + U.park(false, SHRINK_RATE); + w.parker = null; + U.putObject(wt, PARKBLOCKER, null); + if (ctl != currentCtl) break; - else if (System.nanoTime() - startTime < - SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop - Thread.interrupted(); // spurious wakeup - else if (UNSAFE.compareAndSwapLong(this, ctlOffset, - currentCtl, prevCtl)) { - w.terminate = true; // restore previous - w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK; + if (System.nanoTime() - startTime >= SHRINK_TIMEOUT && + U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) { + w.eventCount = (w.eventCount + E_SEQ) | E_MASK; + w.runState = -1; // shrink break; } } } } - // Submissions - /** - * Enqueues the given task in the submissionQueue. Same idea as - * ForkJoinWorkerThread.pushTask except for use of submissionLock. + * Tries to locate and execute tasks for a stealer of the given + * task, or in turn one of its stealers, Traces currentSteal -> + * currentJoin links looking for a thread working on a descendant + * of the given task and with a non-empty queue to steal back and + * execute tasks from. The first call to this method upon a + * waiting join will often entail scanning/search, (which is OK + * because the joiner has nothing better to do), but this method + * leaves hints in workers to speed up subsequent calls. The + * implementation is very branchy to cope with potential + * inconsistencies or loops encountering chains that are stale, + * unknown, or so long that they are likely cyclic. * - * @param t the task + * @param joiner the joining worker + * @param task the task to join + * @return 0 if no progress can be made, negative if task + * known complete, else positive */ - private void addSubmission(ForkJoinTask<?> t) { - final ReentrantLock lock = this.submissionLock; - lock.lock(); - try { - ForkJoinTask<?>[] q; int s, m; - if ((q = submissionQueue) != null) { // ignore if queue removed - long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE; - UNSAFE.putOrderedObject(q, u, t); - queueTop = s + 1; - if (s - queueBase == m) - growSubmissionQueue(); - } - } finally { - lock.unlock(); - } - signalWork(); - } - - // (pollSubmission is defined below with exported methods) - - /** - * Creates or doubles submissionQueue array. - * Basically identical to ForkJoinWorkerThread version. - */ - private void growSubmissionQueue() { - ForkJoinTask<?>[] oldQ = submissionQueue; - int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY; - if (size > MAXIMUM_QUEUE_CAPACITY) - throw new RejectedExecutionException("Queue capacity exceeded"); - if (size < INITIAL_QUEUE_CAPACITY) - size = INITIAL_QUEUE_CAPACITY; - ForkJoinTask<?>[] q = submissionQueue = new ForkJoinTask<?>[size]; - int mask = size - 1; - int top = queueTop; - int oldMask; - if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) { - for (int b = queueBase; b != top; ++b) { - long u = ((b & oldMask) << ASHIFT) + ABASE; - Object x = UNSAFE.getObjectVolatile(oldQ, u); - if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null)) - UNSAFE.putObjectVolatile - (q, ((b & mask) << ASHIFT) + ABASE, x); + private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) { + int stat = 0, steps = 0; // bound to avoid cycles + if (joiner != null && task != null) { // hoist null checks + restart: for (;;) { + ForkJoinTask<?> subtask = task; // current target + for (WorkQueue j = joiner, v;;) { // v is stealer of subtask + WorkQueue[] ws; int m, s, h; + if ((s = task.status) < 0) { + stat = s; + break restart; + } + if ((ws = workQueues) == null || (m = ws.length - 1) <= 0) + break restart; // shutting down + if ((v = ws[h = (j.stealHint | 1) & m]) == null || + v.currentSteal != subtask) { + for (int origin = h;;) { // find stealer + if (((h = (h + 2) & m) & 15) == 1 && + (subtask.status < 0 || j.currentJoin != subtask)) + continue restart; // occasional staleness check + if ((v = ws[h]) != null && + v.currentSteal == subtask) { + j.stealHint = h; // save hint + break; + } + if (h == origin) + break restart; // cannot find stealer + } + } + for (;;) { // help stealer or descend to its stealer + ForkJoinTask[] a; int b; + if (subtask.status < 0) // surround probes with + continue restart; // consistency checks + if ((b = v.base) - v.top < 0 && (a = v.array) != null) { + int i = (((a.length - 1) & b) << ASHIFT) + ABASE; + ForkJoinTask<?> t = + (ForkJoinTask<?>)U.getObjectVolatile(a, i); + if (subtask.status < 0 || j.currentJoin != subtask || + v.currentSteal != subtask) + continue restart; // stale + stat = 1; // apparent progress + if (t != null && v.base == b && + U.compareAndSwapObject(a, i, t, null)) { + v.base = b + 1; // help stealer + joiner.runSubtask(t); + } + else if (v.base == b && ++steps == MAX_HELP) + break restart; // v apparently stalled + } + else { // empty -- try to descend + ForkJoinTask<?> next = v.currentJoin; + if (subtask.status < 0 || j.currentJoin != subtask || + v.currentSteal != subtask) + continue restart; // stale + else if (next == null || ++steps == MAX_HELP) + break restart; // dead-end or maybe cyclic + else { + subtask = next; + j = v; + break; + } + } + } + } } } - } - - // Blocking support - - /** - * Tries to increment blockedCount, decrement active count - * (sometimes implicitly) and possibly release or create a - * compensating worker in preparation for blocking. Fails - * on contention or termination. - * - * @return true if the caller can block, else should recheck and retry - */ - private boolean tryPreBlock() { - int b = blockedCount; - if (UNSAFE.compareAndSwapInt(this, blockedCountOffset, b, b + 1)) { - int pc = parallelism; - do { - ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w; - int e, ac, tc, rc, i; - long c = ctl; - int u = (int)(c >>> 32); - if ((e = (int)c) < 0) { - // skip -- terminating - } - else if ((ac = (u >> UAC_SHIFT)) <= 0 && e != 0 && - (ws = workers) != null && - (i = ~e & SMASK) < ws.length && - (w = ws[i]) != null) { - long nc = ((long)(w.nextWait & E_MASK) | - (c & (AC_MASK|TC_MASK))); - if (w.eventCount == e && - UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { - w.eventCount = (e + EC_UNIT) & E_MASK; - if (w.parked) - UNSAFE.unpark(w); - return true; // release an idle worker - } - } - else if ((tc = (short)(u >>> UTC_SHIFT)) >= 0 && ac + pc > 1) { - long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK); - if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) - return true; // no compensation needed - } - else if (tc + pc < MAX_ID) { - long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); - if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { - addWorker(); - return true; // create a replacement - } - } - // try to back out on any failure and let caller retry - } while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset, - b = blockedCount, b - 1)); - } - return false; - } - - /** - * Decrements blockedCount and increments active count - */ - private void postBlock() { - long c; - do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset, // no mask - c = ctl, c + AC_UNIT)); - int b; - do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset, - b = blockedCount, b - 1)); - } - - /** - * Possibly blocks waiting for the given task to complete, or - * cancels the task if terminating. Fails to wait if contended. - * - * @param joinMe the task - */ - final void tryAwaitJoin(ForkJoinTask<?> joinMe) { - int s; - Thread.interrupted(); // clear interrupts before checking termination - if (joinMe.status >= 0) { - if (tryPreBlock()) { - joinMe.tryAwaitDone(0L); - postBlock(); - } - else if ((ctl & STOP_BIT) != 0L) - joinMe.cancelIgnoringExceptions(); - } + return stat; } /** - * Possibly blocks the given worker waiting for joinMe to - * complete or timeout + * If task is at base of some steal queue, steals and executes it. * - * @param joinMe the task - * @param millis the wait time for underlying Object.wait - */ - final void timedAwaitJoin(ForkJoinTask<?> joinMe, long nanos) { - while (joinMe.status >= 0) { - Thread.interrupted(); - if ((ctl & STOP_BIT) != 0L) { - joinMe.cancelIgnoringExceptions(); - break; - } - if (tryPreBlock()) { - long last = System.nanoTime(); - while (joinMe.status >= 0) { - long millis = TimeUnit.NANOSECONDS.toMillis(nanos); - if (millis <= 0) - break; - joinMe.tryAwaitDone(millis); - if (joinMe.status < 0) - break; - if ((ctl & STOP_BIT) != 0L) { - joinMe.cancelIgnoringExceptions(); - break; - } - long now = System.nanoTime(); - nanos -= now - last; - last = now; - } - postBlock(); - break; - } - } - } - - /** - * If necessary, compensates for blocker, and blocks - */ - private void awaitBlocker(ManagedBlocker blocker) - throws InterruptedException { - while (!blocker.isReleasable()) { - if (tryPreBlock()) { - try { - do {} while (!blocker.isReleasable() && !blocker.block()); - } finally { - postBlock(); - } - break; - } - } - } - - // Creating, registering and deregistring workers - - /** - * Tries to create and start a worker; minimally rolls back counts - * on failure. + * @param joiner the joining worker + * @param task the task */ - private void addWorker() { - Throwable ex = null; - ForkJoinWorkerThread t = null; - try { - t = factory.newThread(this); - } catch (Throwable e) { - ex = e; - } - if (t == null) { // null or exceptional factory return - long c; // adjust counts - do {} while (!UNSAFE.compareAndSwapLong - (this, ctlOffset, c = ctl, - (((c - AC_UNIT) & AC_MASK) | - ((c - TC_UNIT) & TC_MASK) | - (c & ~(AC_MASK|TC_MASK))))); - // Propagate exception if originating from an external caller - if (!tryTerminate(false) && ex != null && - !(Thread.currentThread() instanceof ForkJoinWorkerThread)) - UNSAFE.throwException(ex); - } - else - t.start(); - } - - /** - * Callback from ForkJoinWorkerThread constructor to assign a - * public name - */ - final String nextWorkerName() { - for (int n;;) { - if (UNSAFE.compareAndSwapInt(this, nextWorkerNumberOffset, - n = nextWorkerNumber, ++n)) - return workerNamePrefix + n; - } - } - - /** - * Callback from ForkJoinWorkerThread constructor to - * determine its poolIndex and record in workers array. - * - * @param w the worker - * @return the worker's pool index - */ - final int registerWorker(ForkJoinWorkerThread w) { - /* - * In the typical case, a new worker acquires the lock, uses - * next available index and returns quickly. Since we should - * not block callers (ultimately from signalWork or - * tryPreBlock) waiting for the lock needed to do this, we - * instead help release other workers while waiting for the - * lock. - */ - for (int g;;) { - ForkJoinWorkerThread[] ws; - if (((g = scanGuard) & SG_UNIT) == 0 && - UNSAFE.compareAndSwapInt(this, scanGuardOffset, - g, g | SG_UNIT)) { - int k = nextWorkerIndex; - try { - if ((ws = workers) != null) { // ignore on shutdown - int n = ws.length; - if (k < 0 || k >= n || ws[k] != null) { - for (k = 0; k < n && ws[k] != null; ++k) - ; - if (k == n) - ws = workers = Arrays.copyOf(ws, n << 1); - } - ws[k] = w; - nextWorkerIndex = k + 1; - int m = g & SMASK; - g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1); - } - } finally { - scanGuard = g; - } - return k; - } - else if ((ws = workers) != null) { // help release others - for (ForkJoinWorkerThread u : ws) { - if (u != null && u.queueBase != u.queueTop) { - if (tryReleaseWaiter()) - break; - } + private void tryPollForAndExec(WorkQueue joiner, ForkJoinTask<?> task) { + WorkQueue[] ws; + if ((ws = workQueues) != null) { + for (int j = 1; j < ws.length && task.status >= 0; j += 2) { + WorkQueue q = ws[j]; + if (q != null && q.pollFor(task)) { + joiner.runSubtask(task); + break; } } } } /** - * Final callback from terminating worker. Removes record of - * worker from array, and adjusts counts. If pool is shutting - * down, tries to complete termination. + * Tries to decrement active count (sometimes implicitly) and + * possibly release or create a compensating worker in preparation + * for blocking. Fails on contention or termination. Otherwise, + * adds a new thread if no idle workers are available and either + * pool would become completely starved or: (at least half + * starved, and fewer than 50% spares exist, and there is at least + * one task apparently available). Even though the availability + * check requires a full scan, it is worthwhile in reducing false + * alarms. * - * @param w the worker + * @param task if non-null, a task being waited for + * @param blocker if non-null, a blocker being waited for + * @return true if the caller can block, else should recheck and retry */ - final void deregisterWorker(ForkJoinWorkerThread w, Throwable ex) { - int idx = w.poolIndex; - int sc = w.stealCount; - int steps = 0; - // Remove from array, adjust worker counts and collect steal count. - // We can intermix failed removes or adjusts with steal updates - do { - long s, c; - int g; - if (steps == 0 && ((g = scanGuard) & SG_UNIT) == 0 && - UNSAFE.compareAndSwapInt(this, scanGuardOffset, - g, g |= SG_UNIT)) { - ForkJoinWorkerThread[] ws = workers; - if (ws != null && idx >= 0 && - idx < ws.length && ws[idx] == w) - ws[idx] = null; // verify - nextWorkerIndex = idx; - scanGuard = g + SG_UNIT; - steps = 1; - } - if (steps == 1 && - UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl, - (((c - AC_UNIT) & AC_MASK) | - ((c - TC_UNIT) & TC_MASK) | - (c & ~(AC_MASK|TC_MASK))))) - steps = 2; - if (sc != 0 && - UNSAFE.compareAndSwapLong(this, stealCountOffset, - s = stealCount, s + sc)) - sc = 0; - } while (steps != 2 || sc != 0); - if (!tryTerminate(false)) { - if (ex != null) // possibly replace if died abnormally - signalWork(); - else - tryReleaseWaiter(); - } - } - - // Shutdown and termination - - /** - * Possibly initiates and/or completes termination. - * - * @param now if true, unconditionally terminate, else only - * if shutdown and empty queue and no active workers - * @return true if now terminating or terminated - */ - private boolean tryTerminate(boolean now) { - long c; - while (((c = ctl) & STOP_BIT) == 0) { - if (!now) { - if ((int)(c >> AC_SHIFT) != -parallelism) - return false; - if (!shutdown || blockedCount != 0 || quiescerCount != 0 || - queueBase != queueTop) { - if (ctl == c) // staleness check - return false; - continue; + final boolean tryCompensate(ForkJoinTask<?> task, ManagedBlocker blocker) { + int pc = parallelism, e; + long c = ctl; + WorkQueue[] ws = workQueues; + if ((e = (int)c) >= 0 && ws != null) { + int u, a, ac, hc; + int tc = (short)((u = (int)(c >>> 32)) >>> UTC_SHIFT) + pc; + boolean replace = false; + if ((a = u >> UAC_SHIFT) <= 0) { + if ((ac = a + pc) <= 1) + replace = true; + else if ((e > 0 || (task != null && + ac <= (hc = pc >>> 1) && tc < pc + hc))) { + WorkQueue w; + for (int j = 0; j < ws.length; ++j) { + if ((w = ws[j]) != null && !w.isEmpty()) { + replace = true; + break; // in compensation range and tasks available + } + } } } - if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT)) - startTerminating(); - } - if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers - final ReentrantLock lock = this.submissionLock; - lock.lock(); - try { - termination.signalAll(); - } finally { - lock.unlock(); + if ((task == null || task.status >= 0) && // recheck need to block + (blocker == null || !blocker.isReleasable()) && ctl == c) { + if (!replace) { // no compensation + long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK); + if (U.compareAndSwapLong(this, CTL, c, nc)) + return true; + } + else if (e != 0) { // release an idle worker + WorkQueue w; Thread p; int i; + if ((i = e & SMASK) < ws.length && (w = ws[i]) != null) { + long nc = ((long)(w.nextWait & E_MASK) | + (c & (AC_MASK|TC_MASK))); + if (w.eventCount == (e | INT_SIGN) && + U.compareAndSwapLong(this, CTL, c, nc)) { + w.eventCount = (e + E_SEQ) & E_MASK; + if ((p = w.parker) != null) + U.unpark(p); + return true; + } + } + } + else if (tc < MAX_CAP) { // create replacement + long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); + if (U.compareAndSwapLong(this, CTL, c, nc)) { + addWorker(); + return true; + } + } } } - return true; + return false; } /** - * Runs up to three passes through workers: (0) Setting - * termination status for each worker, followed by wakeups up to - * queued workers; (1) helping cancel tasks; (2) interrupting - * lagging threads (likely in external tasks, but possibly also - * blocked in joins). Each pass repeats previous steps because of - * potential lagging thread creation. + * Helps and/or blocks until the given task is done. + * + * @param joiner the joining worker + * @param task the task + * @return task status on exit */ - private void startTerminating() { - cancelSubmissions(); - for (int pass = 0; pass < 3; ++pass) { - ForkJoinWorkerThread[] ws = workers; - if (ws != null) { - for (ForkJoinWorkerThread w : ws) { - if (w != null) { - w.terminate = true; - if (pass > 0) { - w.cancelTasks(); - if (pass > 1 && !w.isInterrupted()) { - try { - w.interrupt(); - } catch (SecurityException ignore) { + final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) { + int s; + if ((s = task.status) >= 0) { + ForkJoinTask<?> prevJoin = joiner.currentJoin; + joiner.currentJoin = task; + long startTime = 0L; + for (int k = 0;;) { + if ((s = (joiner.isEmpty() ? // try to help + tryHelpStealer(joiner, task) : + joiner.tryRemoveAndExec(task))) == 0 && + (s = task.status) >= 0) { + if (k == 0) { + startTime = System.nanoTime(); + tryPollForAndExec(joiner, task); // check uncommon case + } + else if ((k & (MAX_HELP - 1)) == 0 && + System.nanoTime() - startTime >= + COMPENSATION_DELAY && + tryCompensate(task, null)) { + if (task.trySetSignal()) { + synchronized (task) { + if (task.status >= 0) { + try { // see ForkJoinTask + task.wait(); // for explanation + } catch (InterruptedException ie) { + } } + else + task.notifyAll(); } } + long c; // re-activate + do {} while (!U.compareAndSwapLong + (this, CTL, c = ctl, c + AC_UNIT)); } } - terminateWaiters(); + if (s < 0 || (s = task.status) < 0) { + joiner.currentJoin = prevJoin; + break; + } + else if ((k++ & (MAX_HELP - 1)) == MAX_HELP >>> 1) + Thread.yield(); // for politeness + } + } + return s; + } + + /** + * Stripped-down variant of awaitJoin used by timed joins. Tries + * to help join only while there is continuous progress. (Caller + * will then enter a timed wait.) + * + * @param joiner the joining worker + * @param task the task + * @return task status on exit + */ + final int helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task) { + int s; + while ((s = task.status) >= 0 && + (joiner.isEmpty() ? + tryHelpStealer(joiner, task) : + joiner.tryRemoveAndExec(task)) != 0) + ; + return s; + } + + /** + * Returns a (probably) non-empty steal queue, if one is found + * during a random, then cyclic scan, else null. This method must + * be retried by caller if, by the time it tries to use the queue, + * it is empty. + */ + private WorkQueue findNonEmptyStealQueue(WorkQueue w) { + // Similar to loop in scan(), but ignoring submissions + int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5; + int step = (r >>> 16) | 1; + for (WorkQueue[] ws;;) { + int rs = runState, m; + if ((ws = workQueues) == null || (m = ws.length - 1) < 1) + return null; + for (int j = (m + 1) << 2; ; r += step) { + WorkQueue q = ws[((r << 1) | 1) & m]; + if (q != null && !q.isEmpty()) + return q; + else if (--j < 0) { + if (runState == rs) + return null; + break; + } } } } + /** - * Polls and cancels all submissions. Called only during termination. + * Runs tasks until {@code isQuiescent()}. We piggyback on + * active count ctl maintenance, but rather than blocking + * when tasks cannot be found, we rescan until all others cannot + * find tasks either. */ - private void cancelSubmissions() { - while (queueBase != queueTop) { - ForkJoinTask<?> task = pollSubmission(); - if (task != null) { - try { - task.cancel(false); - } catch (Throwable ignore) { + final void helpQuiescePool(WorkQueue w) { + for (boolean active = true;;) { + ForkJoinTask<?> localTask; // exhaust local queue + while ((localTask = w.nextLocalTask()) != null) + localTask.doExec(); + WorkQueue q = findNonEmptyStealQueue(w); + if (q != null) { + ForkJoinTask<?> t; int b; + if (!active) { // re-establish active count + long c; + active = true; + do {} while (!U.compareAndSwapLong + (this, CTL, c = ctl, c + AC_UNIT)); + } + if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) + w.runSubtask(t); + } + else { + long c; + if (active) { // decrement active count without queuing + active = false; + do {} while (!U.compareAndSwapLong + (this, CTL, c = ctl, c -= AC_UNIT)); + } + else + c = ctl; // re-increment on exit + if ((int)(c >> AC_SHIFT) + parallelism == 0) { + do {} while (!U.compareAndSwapLong + (this, CTL, c = ctl, c + AC_UNIT)); + break; } } } } /** - * Tries to set the termination status of waiting workers, and - * then wakes them up (after which they will terminate). + * Gets and removes a local or stolen task for the given worker. + * + * @return a task, if available */ - private void terminateWaiters() { - ForkJoinWorkerThread[] ws = workers; - if (ws != null) { - ForkJoinWorkerThread w; long c; int i, e; - int n = ws.length; - while ((i = ~(e = (int)(c = ctl)) & SMASK) < n && - (w = ws[i]) != null && w.eventCount == (e & E_MASK)) { - if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, - (long)(w.nextWait & E_MASK) | - ((c + AC_UNIT) & AC_MASK) | - (c & (TC_MASK|STOP_BIT)))) { - w.terminate = true; - w.eventCount = e + EC_UNIT; - if (w.parked) - UNSAFE.unpark(w); - } - } + final ForkJoinTask<?> nextTaskFor(WorkQueue w) { + for (ForkJoinTask<?> t;;) { + WorkQueue q; int b; + if ((t = w.nextLocalTask()) != null) + return t; + if ((q = findNonEmptyStealQueue(w)) == null) + return null; + if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) + return t; } } - // misc ForkJoinWorkerThread support - - /** - * Increment or decrement quiescerCount. Needed only to prevent - * triggering shutdown if a worker is transiently inactive while - * checking quiescence. - * - * @param delta 1 for increment, -1 for decrement - */ - final void addQuiescerCount(int delta) { - int c; - do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset, - c = quiescerCount, c + delta)); - } - - /** - * Directly increment or decrement active count without - * queuing. This method is used to transiently assert inactivation - * while checking quiescence. - * - * @param delta 1 for increment, -1 for decrement - */ - final void addActiveCount(int delta) { - long d = delta < 0 ? -AC_UNIT : AC_UNIT; - long c; - do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl, - ((c + d) & AC_MASK) | - (c & ~AC_MASK))); - } - /** * Returns the approximate (non-atomic) number of idle threads per - * active thread. + * active thread to offset steal queue size for method + * ForkJoinTask.getSurplusQueuedTaskCount(). */ final int idlePerActive() { // Approximate at powers of two for small values, saturate past 4 @@ -1395,6 +2020,91 @@ 8); } + // Termination + + /** + * Possibly initiates and/or completes termination. The caller + * triggering termination runs three passes through workQueues: + * (0) Setting termination status, followed by wakeups of queued + * workers; (1) cancelling all tasks; (2) interrupting lagging + * threads (likely in external tasks, but possibly also blocked in + * joins). Each pass repeats previous steps because of potential + * lagging thread creation. + * + * @param now if true, unconditionally terminate, else only + * if no work and no active workers + * @param enable if true, enable shutdown when next possible + * @return true if now terminating or terminated + */ + private boolean tryTerminate(boolean now, boolean enable) { + Mutex lock = this.lock; + for (long c;;) { + if (((c = ctl) & STOP_BIT) != 0) { // already terminating + if ((short)(c >>> TC_SHIFT) == -parallelism) { + lock.lock(); // don't need try/finally + termination.signalAll(); // signal when 0 workers + lock.unlock(); + } + return true; + } + if (runState >= 0) { // not yet enabled + if (!enable) + return false; + lock.lock(); + runState |= SHUTDOWN; + lock.unlock(); + } + if (!now) { // check if idle & no tasks + if ((int)(c >> AC_SHIFT) != -parallelism || + hasQueuedSubmissions()) + return false; + // Check for unqueued inactive workers. One pass suffices. + WorkQueue[] ws = workQueues; WorkQueue w; + if (ws != null) { + for (int i = 1; i < ws.length; i += 2) { + if ((w = ws[i]) != null && w.eventCount >= 0) + return false; + } + } + } + if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) { + for (int pass = 0; pass < 3; ++pass) { + WorkQueue[] ws = workQueues; + if (ws != null) { + WorkQueue w; + int n = ws.length; + for (int i = 0; i < n; ++i) { + if ((w = ws[i]) != null) { + w.runState = -1; + if (pass > 0) { + w.cancelAll(); + if (pass > 1) + w.interruptOwner(); + } + } + } + // Wake up workers parked on event queue + int i, e; long cc; Thread p; + while ((e = (int)(cc = ctl) & E_MASK) != 0 && + (i = e & SMASK) < n && + (w = ws[i]) != null) { + long nc = ((long)(w.nextWait & E_MASK) | + ((cc + AC_UNIT) & AC_MASK) | + (cc & (TC_MASK|STOP_BIT))); + if (w.eventCount == (e | INT_SIGN) && + U.compareAndSwapLong(this, CTL, cc, nc)) { + w.eventCount = (e + E_SEQ) & E_MASK; + w.runState = -1; + if ((p = w.parker) != null) + U.unpark(p); + } + } + } + } + } + } + } + // Exported methods // Constructors @@ -1464,29 +2174,31 @@ checkPermission(); if (factory == null) throw new NullPointerException(); - if (parallelism <= 0 || parallelism > MAX_ID) + if (parallelism <= 0 || parallelism > MAX_CAP) throw new IllegalArgumentException(); this.parallelism = parallelism; this.factory = factory; this.ueh = handler; - this.locallyFifo = asyncMode; + this.localMode = asyncMode ? FIFO_QUEUE : LIFO_QUEUE; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); - this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; - // initialize workers array with room for 2*parallelism if possible - int n = parallelism << 1; - if (n >= MAX_ID) - n = MAX_ID; - else { // See Hackers Delight, sec 3.2, where n < (1 << 16) - n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; - } - workers = new ForkJoinWorkerThread[n + 1]; - this.submissionLock = new ReentrantLock(); - this.termination = submissionLock.newCondition(); + // Use nearest power 2 for workQueues size. See Hackers Delight sec 3.2. + int n = parallelism - 1; + n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; + int size = (n + 1) << 1; // #slots = 2*#workers + this.submitMask = size - 1; // room for max # of submit queues + this.workQueues = new WorkQueue[size]; + this.termination = (this.lock = new Mutex()).newCondition(); + this.stealCount = new AtomicLong(); + this.nextWorkerNumber = new AtomicInteger(); + int pn = poolNumberGenerator.incrementAndGet(); StringBuilder sb = new StringBuilder("ForkJoinPool-"); - sb.append(poolNumberGenerator.incrementAndGet()); + sb.append(Integer.toString(pn)); sb.append("-worker-"); this.workerNamePrefix = sb.toString(); + lock.lock(); + this.runState = 1; // set init flag + lock.unlock(); } // Execution methods @@ -1508,34 +2220,10 @@ * scheduled for execution */ public <T> T invoke(ForkJoinTask<T> task) { - Thread t = Thread.currentThread(); if (task == null) throw new NullPointerException(); - if (shutdown) - throw new RejectedExecutionException(); - if ((t instanceof ForkJoinWorkerThread) && - ((ForkJoinWorkerThread)t).pool == this) - return task.invoke(); // bypass submit if in same pool - else { - addSubmission(task); - return task.join(); - } - } - - /** - * Unless terminating, forks task if within an ongoing FJ - * computation in the current pool, else submits as external task. - */ - private <T> void forkOrSubmit(ForkJoinTask<T> task) { - ForkJoinWorkerThread w; - Thread t = Thread.currentThread(); - if (shutdown) - throw new RejectedExecutionException(); - if ((t instanceof ForkJoinWorkerThread) && - (w = (ForkJoinWorkerThread)t).pool == this) - w.pushTask(task); - else - addSubmission(task); + doSubmit(task); + return task.join(); } /** @@ -1549,7 +2237,7 @@ public void execute(ForkJoinTask<?> task) { if (task == null) throw new NullPointerException(); - forkOrSubmit(task); + doSubmit(task); } // AbstractExecutorService methods @@ -1566,8 +2254,8 @@ if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else - job = ForkJoinTask.adapt(task, null); - forkOrSubmit(job); + job = new ForkJoinTask.AdaptedRunnableAction(task); + doSubmit(job); } /** @@ -1582,7 +2270,7 @@ public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); - forkOrSubmit(task); + doSubmit(task); return task; } @@ -1592,10 +2280,8 @@ * scheduled for execution */ public <T> ForkJoinTask<T> submit(Callable<T> task) { - if (task == null) - throw new NullPointerException(); - ForkJoinTask<T> job = ForkJoinTask.adapt(task); - forkOrSubmit(job); + ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task); + doSubmit(job); return job; } @@ -1605,10 +2291,8 @@ * scheduled for execution */ public <T> ForkJoinTask<T> submit(Runnable task, T result) { - if (task == null) - throw new NullPointerException(); - ForkJoinTask<T> job = ForkJoinTask.adapt(task, result); - forkOrSubmit(job); + ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result); + doSubmit(job); return job; } @@ -1624,8 +2308,8 @@ if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else - job = ForkJoinTask.adapt(task, null); - forkOrSubmit(job); + job = new ForkJoinTask.AdaptedRunnableAction(task); + doSubmit(job); return job; } @@ -1634,25 +2318,31 @@ * @throws RejectedExecutionException {@inheritDoc} */ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { - ArrayList<ForkJoinTask<T>> forkJoinTasks = - new ArrayList<ForkJoinTask<T>>(tasks.size()); - for (Callable<T> task : tasks) - forkJoinTasks.add(ForkJoinTask.adapt(task)); - invoke(new InvokeAll<T>(forkJoinTasks)); - + // In previous versions of this class, this method constructed + // a task to run ForkJoinTask.invokeAll, but now external + // invocation of multiple tasks is at least as efficient. + List<ForkJoinTask<T>> fs = new ArrayList<ForkJoinTask<T>>(tasks.size()); + // Workaround needed because method wasn't declared with + // wildcards in return type but should have been. @SuppressWarnings({"unchecked", "rawtypes"}) - List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks; - return futures; - } + List<Future<T>> futures = (List<Future<T>>) (List) fs; - static final class InvokeAll<T> extends RecursiveAction { - final ArrayList<ForkJoinTask<T>> tasks; - InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; } - public void compute() { - try { invokeAll(tasks); } - catch (Exception ignore) {} + boolean done = false; + try { + for (Callable<T> t : tasks) { + ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t); + doSubmit(f); + fs.add(f); + } + for (ForkJoinTask<T> f : fs) + f.quietlyJoin(); + done = true; + return futures; + } finally { + if (!done) + for (ForkJoinTask<T> f : fs) + f.cancel(false); } - private static final long serialVersionUID = -7914297376763021607L; } /** @@ -1702,7 +2392,7 @@ * @return {@code true} if this pool uses async mode */ public boolean getAsyncMode() { - return locallyFifo; + return localMode != 0; } /** @@ -1714,8 +2404,15 @@ * @return the number of worker threads */ public int getRunningThreadCount() { - int r = parallelism + (int)(ctl >> AC_SHIFT); - return (r <= 0) ? 0 : r; // suppress momentarily negative values + int rc = 0; + WorkQueue[] ws; WorkQueue w; + if ((ws = workQueues) != null) { + for (int i = 1; i < ws.length; i += 2) { + if ((w = ws[i]) != null && w.isApparentlyUnblocked()) + ++rc; + } + } + return rc; } /** @@ -1726,7 +2423,7 @@ * @return the number of active threads */ public int getActiveThreadCount() { - int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount; + int r = parallelism + (int)(ctl >> AC_SHIFT); return (r <= 0) ? 0 : r; // suppress momentarily negative values } @@ -1742,7 +2439,7 @@ * @return {@code true} if all threads are currently idle */ public boolean isQuiescent() { - return parallelism + (int)(ctl >> AC_SHIFT) + blockedCount == 0; + return (int)(ctl >> AC_SHIFT) + parallelism == 0; } /** @@ -1757,7 +2454,15 @@ * @return the number of steals */ public long getStealCount() { - return stealCount; + long count = stealCount.get(); + WorkQueue[] ws; WorkQueue w; + if ((ws = workQueues) != null) { + for (int i = 1; i < ws.length; i += 2) { + if ((w = ws[i]) != null) + count += w.totalSteals; + } + } + return count; } /** @@ -1772,12 +2477,12 @@ */ public long getQueuedTaskCount() { long count = 0; - ForkJoinWorkerThread[] ws; - if ((short)(ctl >>> TC_SHIFT) > -parallelism && - (ws = workers) != null) { - for (ForkJoinWorkerThread w : ws) - if (w != null) - count -= w.queueBase - w.queueTop; // must read base first + WorkQueue[] ws; WorkQueue w; + if ((ws = workQueues) != null) { + for (int i = 1; i < ws.length; i += 2) { + if ((w = ws[i]) != null) + count += w.queueSize(); + } } return count; } @@ -1790,7 +2495,15 @@ * @return the number of queued submissions */ public int getQueuedSubmissionCount() { - return -queueBase + queueTop; + int count = 0; + WorkQueue[] ws; WorkQueue w; + if ((ws = workQueues) != null) { + for (int i = 0; i < ws.length; i += 2) { + if ((w = ws[i]) != null) + count += w.queueSize(); + } + } + return count; } /** @@ -1800,7 +2513,14 @@ * @return {@code true} if there are any queued submissions */ public boolean hasQueuedSubmissions() { - return queueBase != queueTop; + WorkQueue[] ws; WorkQueue w; + if ((ws = workQueues) != null) { + for (int i = 0; i < ws.length; i += 2) { + if ((w = ws[i]) != null && !w.isEmpty()) + return true; + } + } + return false; } /** @@ -1811,16 +2531,11 @@ * @return the next submission, or {@code null} if none */ protected ForkJoinTask<?> pollSubmission() { - ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; - while ((b = queueBase) != queueTop && - (q = submissionQueue) != null && - (i = (q.length - 1) & b) >= 0) { - long u = (i << ASHIFT) + ABASE; - if ((t = q[i]) != null && - queueBase == b && - UNSAFE.compareAndSwapObject(q, u, t, null)) { - queueBase = b + 1; - return t; + WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t; + if ((ws = workQueues) != null) { + for (int i = 0; i < ws.length; i += 2) { + if ((w = ws[i]) != null && (t = w.poll()) != null) + return t; } } return null; @@ -1845,20 +2560,17 @@ */ protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) { int count = 0; - while (queueBase != queueTop) { - ForkJoinTask<?> t = pollSubmission(); - if (t != null) { - c.add(t); - ++count; + WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t; + if ((ws = workQueues) != null) { + for (int i = 0; i < ws.length; ++i) { + if ((w = ws[i]) != null) { + while ((t = w.poll()) != null) { + c.add(t); + ++count; + } + } } } - ForkJoinWorkerThread[] ws; - if ((short)(ctl >>> TC_SHIFT) > -parallelism && - (ws = workers) != null) { - for (ForkJoinWorkerThread w : ws) - if (w != null) - count += w.drainTasksTo(c); - } return count; } @@ -1870,21 +2582,36 @@ * @return a string identifying this pool, as well as its state */ public String toString() { - long st = getStealCount(); - long qt = getQueuedTaskCount(); - long qs = getQueuedSubmissionCount(); - int pc = parallelism; + // Use a single pass through workQueues to collect counts + long qt = 0L, qs = 0L; int rc = 0; + long st = stealCount.get(); long c = ctl; + WorkQueue[] ws; WorkQueue w; + if ((ws = workQueues) != null) { + for (int i = 0; i < ws.length; ++i) { + if ((w = ws[i]) != null) { + int size = w.queueSize(); + if ((i & 1) == 0) + qs += size; + else { + qt += size; + st += w.totalSteals; + if (w.isApparentlyUnblocked()) + ++rc; + } + } + } + } + int pc = parallelism; int tc = pc + (short)(c >>> TC_SHIFT); - int rc = pc + (int)(c >> AC_SHIFT); - if (rc < 0) // ignore transient negative - rc = 0; - int ac = rc + blockedCount; + int ac = pc + (int)(c >> AC_SHIFT); + if (ac < 0) // ignore transient negative + ac = 0; String level; if ((c & STOP_BIT) != 0) level = (tc == 0) ? "Terminated" : "Terminating"; else - level = shutdown ? "Shutting down" : "Running"; + level = runState < 0 ? "Shutting down" : "Running"; return super.toString() + "[" + level + ", parallelism = " + pc + @@ -1911,8 +2638,7 @@ */ public void shutdown() { checkPermission(); - shutdown = true; - tryTerminate(false); + tryTerminate(false, true); } /** @@ -1933,8 +2659,7 @@ */ public List<Runnable> shutdownNow() { checkPermission(); - shutdown = true; - tryTerminate(true); + tryTerminate(true, true); return Collections.emptyList(); } @@ -1969,19 +2694,12 @@ } /** - * Returns true if terminating or terminated. Used by ForkJoinWorkerThread. - */ - final boolean isAtLeastTerminating() { - return (ctl & STOP_BIT) != 0L; - } - - /** * Returns {@code true} if this pool has been shut down. * * @return {@code true} if this pool has been shut down */ public boolean isShutdown() { - return shutdown; + return runState < 0; } /** @@ -1998,7 +2716,7 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); - final ReentrantLock lock = this.submissionLock; + final Mutex lock = this.lock; lock.lock(); try { for (;;) { @@ -2109,12 +2827,18 @@ public static void managedBlock(ManagedBlocker blocker) throws InterruptedException { Thread t = Thread.currentThread(); - if (t instanceof ForkJoinWorkerThread) { - ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; - w.pool.awaitBlocker(blocker); - } - else { - do {} while (!blocker.isReleasable() && !blocker.block()); + ForkJoinPool p = ((t instanceof ForkJoinWorkerThread) ? + ((ForkJoinWorkerThread)t).pool : null); + while (!blocker.isReleasable()) { + if (p == null || p.tryCompensate(null, blocker)) { + try { + do {} while (!blocker.isReleasable() && !blocker.block()); + } finally { + if (p != null) + p.incrementActiveCount(); + } + break; + } } } @@ -2123,49 +2847,39 @@ // implement RunnableFuture. protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { - return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value); + return new ForkJoinTask.AdaptedRunnable<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { - return (RunnableFuture<T>) ForkJoinTask.adapt(callable); + return new ForkJoinTask.AdaptedCallable<T>(callable); } // Unsafe mechanics - private static final sun.misc.Unsafe UNSAFE; - private static final long ctlOffset; - private static final long stealCountOffset; - private static final long blockedCountOffset; - private static final long quiescerCountOffset; - private static final long scanGuardOffset; - private static final long nextWorkerNumberOffset; - private static final long ABASE; + private static final sun.misc.Unsafe U; + private static final long CTL; + private static final long PARKBLOCKER; + private static final int ABASE; private static final int ASHIFT; static { poolNumberGenerator = new AtomicInteger(); - workerSeedGenerator = new Random(); + nextSubmitterSeed = new AtomicInteger(0x55555555); modifyThreadPermission = new RuntimePermission("modifyThread"); defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); + submitters = new ThreadSubmitter(); int s; try { - UNSAFE = sun.misc.Unsafe.getUnsafe(); + U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ForkJoinPool.class; - ctlOffset = UNSAFE.objectFieldOffset + Class<?> ak = ForkJoinTask[].class; + CTL = U.objectFieldOffset (k.getDeclaredField("ctl")); - stealCountOffset = UNSAFE.objectFieldOffset - (k.getDeclaredField("stealCount")); - blockedCountOffset = UNSAFE.objectFieldOffset - (k.getDeclaredField("blockedCount")); - quiescerCountOffset = UNSAFE.objectFieldOffset - (k.getDeclaredField("quiescerCount")); - scanGuardOffset = UNSAFE.objectFieldOffset - (k.getDeclaredField("scanGuard")); - nextWorkerNumberOffset = UNSAFE.objectFieldOffset - (k.getDeclaredField("nextWorkerNumber")); - Class<?> a = ForkJoinTask[].class; - ABASE = UNSAFE.arrayBaseOffset(a); - s = UNSAFE.arrayIndexScale(a); + Class<?> tk = Thread.class; + PARKBLOCKER = U.objectFieldOffset + (tk.getDeclaredField("parkBlocker")); + ABASE = U.arrayBaseOffset(ak); + s = U.arrayIndexScale(ak); } catch (Exception e) { throw new Error(e); }
--- a/src/share/classes/java/util/concurrent/ForkJoinTask.java Mon Aug 20 17:14:26 2012 -0700 +++ b/src/share/classes/java/util/concurrent/ForkJoinTask.java Wed Aug 22 18:22:03 2012 -0700 @@ -37,17 +37,13 @@ import java.io.Serializable; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.RandomAccess; -import java.util.Map; import java.lang.ref.WeakReference; import java.lang.ref.ReferenceQueue; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RunnableFuture; @@ -76,33 +72,43 @@ * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}. * The efficiency of {@code ForkJoinTask}s stems from a set of * restrictions (that are only partially statically enforceable) - * reflecting their intended use as computational tasks calculating - * pure functions or operating on purely isolated objects. The - * primary coordination mechanisms are {@link #fork}, that arranges + * reflecting their main use as computational tasks calculating pure + * functions or operating on purely isolated objects. The primary + * coordination mechanisms are {@link #fork}, that arranges * asynchronous execution, and {@link #join}, that doesn't proceed * until the task's result has been computed. Computations should - * avoid {@code synchronized} methods or blocks, and should minimize - * other blocking synchronization apart from joining other tasks or - * using synchronizers such as Phasers that are advertised to - * cooperate with fork/join scheduling. Tasks should also not perform - * blocking IO, and should ideally access variables that are - * completely independent of those accessed by other running - * tasks. Minor breaches of these restrictions, for example using - * shared output streams, may be tolerable in practice, but frequent - * use may result in poor performance, and the potential to - * indefinitely stall if the number of threads not waiting for IO or - * other external synchronization becomes exhausted. This usage - * restriction is in part enforced by not permitting checked - * exceptions such as {@code IOExceptions} to be thrown. However, - * computations may still encounter unchecked exceptions, that are - * rethrown to callers attempting to join them. These exceptions may - * additionally include {@link RejectedExecutionException} stemming - * from internal resource exhaustion, such as failure to allocate - * internal task queues. Rethrown exceptions behave in the same way as - * regular exceptions, but, when possible, contain stack traces (as - * displayed for example using {@code ex.printStackTrace()}) of both - * the thread that initiated the computation as well as the thread - * actually encountering the exception; minimally only the latter. + * ideally avoid {@code synchronized} methods or blocks, and should + * minimize other blocking synchronization apart from joining other + * tasks or using synchronizers such as Phasers that are advertised to + * cooperate with fork/join scheduling. Subdividable tasks should also + * not perform blocking IO, and should ideally access variables that + * are completely independent of those accessed by other running + * tasks. These guidelines are loosely enforced by not permitting + * checked exceptions such as {@code IOExceptions} to be + * thrown. However, computations may still encounter unchecked + * exceptions, that are rethrown to callers attempting to join + * them. These exceptions may additionally include {@link + * RejectedExecutionException} stemming from internal resource + * exhaustion, such as failure to allocate internal task + * queues. Rethrown exceptions behave in the same way as regular + * exceptions, but, when possible, contain stack traces (as displayed + * for example using {@code ex.printStackTrace()}) of both the thread + * that initiated the computation as well as the thread actually + * encountering the exception; minimally only the latter. + * + * <p>It is possible to define and use ForkJoinTasks that may block, + * but doing do requires three further considerations: (1) Completion + * of few if any <em>other</em> tasks should be dependent on a task + * that blocks on external synchronization or IO. Event-style async + * tasks that are never joined (for example, those subclassing {@link + * CountedCompleter}) often fall into this category. (2) To minimize + * resource impact, tasks should be small; ideally performing only the + * (possibly) blocking action. (3) Unless the {@link + * ForkJoinPool.ManagedBlocker} API is used, or the number of possibly + * blocked tasks is known to be less than the pool's {@link + * ForkJoinPool#getParallelism} level, the pool cannot guarantee that + * enough threads will be available to ensure progress or good + * performance. * * <p>The primary method for awaiting completion and extracting * results of a task is {@link #join}, but there are several variants: @@ -118,6 +124,13 @@ * performs the most common form of parallel invocation: forking a set * of tasks and joining them all. * + * <p>In the most typical usages, a fork-join pair act like a call + * (fork) and return (join) from a parallel recursive function. As is + * the case with other forms of recursive calls, returns (joins) + * should be performed innermost-first. For example, {@code a.fork(); + * b.fork(); b.join(); a.join();} is likely to be substantially more + * efficient than joining {@code a} before {@code b}. + * * <p>The execution status of tasks may be queried at several levels * of detail: {@link #isDone} is true if a task completed in any way * (including the case where a task was cancelled without executing); @@ -133,18 +146,19 @@ * <p>The ForkJoinTask class is not usually directly subclassed. * Instead, you subclass one of the abstract classes that support a * particular style of fork/join processing, typically {@link - * RecursiveAction} for computations that do not return results, or - * {@link RecursiveTask} for those that do. Normally, a concrete - * ForkJoinTask subclass declares fields comprising its parameters, - * established in a constructor, and then defines a {@code compute} - * method that somehow uses the control methods supplied by this base - * class. While these methods have {@code public} access (to allow - * instances of different task subclasses to call each other's - * methods), some of them may only be called from within other - * ForkJoinTasks (as may be determined using method {@link - * #inForkJoinPool}). Attempts to invoke them in other contexts - * result in exceptions or errors, possibly including - * {@code ClassCastException}. + * RecursiveAction} for most computations that do not return results, + * {@link RecursiveTask} for those that do, and {@link + * CountedCompleter} for those in which completed actions trigger + * other actions. Normally, a concrete ForkJoinTask subclass declares + * fields comprising its parameters, established in a constructor, and + * then defines a {@code compute} method that somehow uses the control + * methods supplied by this base class. While these methods have + * {@code public} access (to allow instances of different task + * subclasses to call each other's methods), some of them may only be + * called from within other ForkJoinTasks (as may be determined using + * method {@link #inForkJoinPool}). Attempts to invoke them in other + * contexts result in exceptions or errors, possibly including {@code + * ClassCastException}. * * <p>Method {@link #join} and its variants are appropriate for use * only when completion dependencies are acyclic; that is, the @@ -154,7 +168,17 @@ * supports other methods and techniques (for example the use of * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that * may be of use in constructing custom subclasses for problems that - * are not statically structured as DAGs. + * are not statically structured as DAGs. To support such usages a + * ForkJoinTask may be atomically <em>tagged</em> with a {@code short} + * value using {@link #setForkJoinTaskTag} or {@link + * #compareAndSetForkJoinTaskTag} and checked using {@link + * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use + * these {@code protected} methods or tags for any purpose, but they + * may be of use in the construction of specialized subclasses. For + * example, parallel graph traversals can use the supplied methods to + * avoid revisiting nodes/tasks that have already been processed. + * (Method names for tagging are bulky in part to encourage definition + * of methods that reflect their usage patterns.) * * <p>Most base support methods are {@code final}, to prevent * overriding of implementations that are intrinsically tied to the @@ -194,41 +218,50 @@ * See the internal documentation of class ForkJoinPool for a * general implementation overview. ForkJoinTasks are mainly * responsible for maintaining their "status" field amidst relays - * to methods in ForkJoinWorkerThread and ForkJoinPool. The - * methods of this class are more-or-less layered into (1) basic - * status maintenance (2) execution and awaiting completion (3) - * user-level methods that additionally report results. This is - * sometimes hard to see because this file orders exported methods - * in a way that flows well in javadocs. + * to methods in ForkJoinWorkerThread and ForkJoinPool. + * + * The methods of this class are more-or-less layered into + * (1) basic status maintenance + * (2) execution and awaiting completion + * (3) user-level methods that additionally report results. + * This is sometimes hard to see because this file orders exported + * methods in a way that flows well in javadocs. */ /* * The status field holds run control status bits packed into a * single int to minimize footprint and to ensure atomicity (via * CAS). Status is initially zero, and takes on nonnegative - * values until completed, upon which status holds value - * NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking - * waits by other threads have the SIGNAL bit set. Completion of - * a stolen task with SIGNAL set awakens any waiters via - * notifyAll. Even though suboptimal for some purposes, we use - * basic builtin wait/notify to take advantage of "monitor - * inflation" in JVMs that we would otherwise need to emulate to - * avoid adding further per-task bookkeeping overhead. We want - * these monitors to be "fat", i.e., not use biasing or thin-lock - * techniques, so use some odd coding idioms that tend to avoid - * them. + * values until completed, upon which status (anded with + * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks + * undergoing blocking waits by other threads have the SIGNAL bit + * set. Completion of a stolen task with SIGNAL set awakens any + * waiters via notifyAll. Even though suboptimal for some + * purposes, we use basic builtin wait/notify to take advantage of + * "monitor inflation" in JVMs that we would otherwise need to + * emulate to avoid adding further per-task bookkeeping overhead. + * We want these monitors to be "fat", i.e., not use biasing or + * thin-lock techniques, so use some odd coding idioms that tend + * to avoid them, mainly by arranging that every synchronized + * block performs a wait, notifyAll or both. + * + * These control bits occupy only (some of) the upper half (16 + * bits) of status field. The lower bits are used for user-defined + * tags. */ /** The run status of this task */ volatile int status; // accessed directly by pool and workers - private static final int NORMAL = -1; - private static final int CANCELLED = -2; - private static final int EXCEPTIONAL = -3; - private static final int SIGNAL = 1; + static final int DONE_MASK = 0xf0000000; // mask out non-completion bits + static final int NORMAL = 0xf0000000; // must be negative + static final int CANCELLED = 0xc0000000; // must be < NORMAL + static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED + static final int SIGNAL = 0x00010000; // must be >= 1 << 16 + static final int SMASK = 0x0000ffff; // short bits for tags /** - * Marks completion and wakes up threads waiting to join this task, - * also clearing signal request bits. + * Marks completion and wakes up threads waiting to join this + * task. * * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL * @return completion status on exit @@ -237,8 +270,8 @@ for (int s;;) { if ((s = status) < 0) return s; - if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) { - if (s != 0) + if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { + if ((s >>> 16) != 0) synchronized (this) { notifyAll(); } return completion; } @@ -246,27 +279,36 @@ } /** - * Tries to block a worker thread until completed or timed out. - * Uses Object.wait time argument conventions. - * May fail on contention or interrupt. + * Primary execution method for stolen tasks. Unless done, calls + * exec and records status if completed, but doesn't wait for + * completion otherwise. * - * @param millis if > 0, wait time. + * @return status on exit from this method */ - final void tryAwaitDone(long millis) { - int s; - try { - if (((s = status) > 0 || - (s == 0 && - UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) && - status > 0) { - synchronized (this) { - if (status > 0) - wait(millis); - } + final int doExec() { + int s; boolean completed; + if ((s = status) >= 0) { + try { + completed = exec(); + } catch (Throwable rex) { + return setExceptionalCompletion(rex); } - } catch (InterruptedException ie) { - // caller must check termination + if (completed) + s = setCompletion(NORMAL); } + return s; + } + + /** + * Tries to set SIGNAL status unless already completed. Used by + * ForkJoinPool. Other variants are directly incorporated into + * externalAwaitDone etc. + * + * @return true if successful + */ + final boolean trySetSignal() { + int s = status; + return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL); } /** @@ -274,48 +316,42 @@ * @return status upon completion */ private int externalAwaitDone() { + boolean interrupted = false; int s; - if ((s = status) >= 0) { - boolean interrupted = false; - synchronized (this) { - while ((s = status) >= 0) { - if (s == 0) - UNSAFE.compareAndSwapInt(this, statusOffset, - 0, SIGNAL); - else { + while ((s = status) >= 0) { + if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) { try { wait(); } catch (InterruptedException ie) { interrupted = true; } } + else + notifyAll(); } } - if (interrupted) - Thread.currentThread().interrupt(); } + if (interrupted) + Thread.currentThread().interrupt(); return s; } /** - * Blocks a non-worker-thread until completion or interruption or timeout. + * Blocks a non-worker-thread until completion or interruption. */ - private int externalInterruptibleAwaitDone(long millis) - throws InterruptedException { + private int externalInterruptibleAwaitDone() throws InterruptedException { int s; if (Thread.interrupted()) throw new InterruptedException(); - if ((s = status) >= 0) { - synchronized (this) { - while ((s = status) >= 0) { - if (s == 0) - UNSAFE.compareAndSwapInt(this, statusOffset, - 0, SIGNAL); - else { - wait(millis); - if (millis > 0L) - break; - } + while ((s = status) >= 0) { + if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) + wait(); + else + notifyAll(); } } } @@ -323,65 +359,41 @@ } /** - * Primary execution method for stolen tasks. Unless done, calls - * exec and records status if completed, but doesn't wait for - * completion otherwise. - */ - final void doExec() { - if (status >= 0) { - boolean completed; - try { - completed = exec(); - } catch (Throwable rex) { - setExceptionalCompletion(rex); - return; - } - if (completed) - setCompletion(NORMAL); // must be outside try block - } - } - - /** - * Primary mechanics for join, get, quietlyJoin. + * Implementation for join, get, quietlyJoin. Directly handles + * only cases of already-completed, external wait, and + * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin. + * * @return status upon completion */ private int doJoin() { - Thread t; ForkJoinWorkerThread w; int s; boolean completed; - if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { - if ((s = status) < 0) - return s; - if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) { - try { - completed = exec(); - } catch (Throwable rex) { - return setExceptionalCompletion(rex); - } - if (completed) - return setCompletion(NORMAL); + int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; + if ((s = status) >= 0) { + if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) { + if (!(w = (wt = (ForkJoinWorkerThread)t).workQueue). + tryUnpush(this) || (s = doExec()) >= 0) + s = wt.pool.awaitJoin(w, this); } - return w.joinTask(this); + else + s = externalAwaitDone(); } - else - return externalAwaitDone(); + return s; } /** - * Primary mechanics for invoke, quietlyInvoke. + * Implementation for invoke, quietlyInvoke. + * * @return status upon completion */ private int doInvoke() { - int s; boolean completed; - if ((s = status) < 0) - return s; - try { - completed = exec(); - } catch (Throwable rex) { - return setExceptionalCompletion(rex); + int s; Thread t; ForkJoinWorkerThread wt; + if ((s = doExec()) >= 0) { + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) + s = (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, + this); + else + s = externalAwaitDone(); } - if (completed) - return setCompletion(NORMAL); - else - return doJoin(); + return s; } // Exception table support @@ -416,7 +428,7 @@ * any ForkJoinPool will call helpExpungeStaleExceptions when its * pool becomes isQuiescent. */ - static final class ExceptionNode extends WeakReference<ForkJoinTask<?>>{ + static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> { final Throwable ex; ExceptionNode next; final long thrower; // use id not ref to avoid weak cycles @@ -429,30 +441,67 @@ } /** - * Records exception and sets exceptional completion. + * Records exception and sets status. + * + * @return status on exit + */ + final int recordExceptionalCompletion(Throwable ex) { + int s; + if ((s = status) >= 0) { + int h = System.identityHashCode(this); + final ReentrantLock lock = exceptionTableLock; + lock.lock(); + try { + expungeStaleExceptions(); + ExceptionNode[] t = exceptionTable; + int i = h & (t.length - 1); + for (ExceptionNode e = t[i]; ; e = e.next) { + if (e == null) { + t[i] = new ExceptionNode(this, ex, t[i]); + break; + } + if (e.get() == this) // already present + break; + } + } finally { + lock.unlock(); + } + s = setCompletion(EXCEPTIONAL); + } + return s; + } + + /** + * Records exception and possibly propagates * * @return status on exit */ private int setExceptionalCompletion(Throwable ex) { - int h = System.identityHashCode(this); - final ReentrantLock lock = exceptionTableLock; - lock.lock(); - try { - expungeStaleExceptions(); - ExceptionNode[] t = exceptionTable; - int i = h & (t.length - 1); - for (ExceptionNode e = t[i]; ; e = e.next) { - if (e == null) { - t[i] = new ExceptionNode(this, ex, t[i]); - break; - } - if (e.get() == this) // already present - break; + int s = recordExceptionalCompletion(ex); + if ((s & DONE_MASK) == EXCEPTIONAL) + internalPropagateException(ex); + return s; + } + + /** + * Hook for exception propagation support for tasks with completers. + */ + void internalPropagateException(Throwable ex) { + } + + /** + * Cancels, ignoring any exceptions thrown by cancel. Used during + * worker and pool shutdown. Cancel is spec'ed not to throw any + * exceptions, but if it does anyway, we have no recourse during + * shutdown, so guard against this case. + */ + static final void cancelIgnoringExceptions(ForkJoinTask<?> t) { + if (t != null && t.status >= 0) { + try { + t.cancel(false); + } catch (Throwable ignore) { } - } finally { - lock.unlock(); } - return setCompletion(EXCEPTIONAL); } /** @@ -501,7 +550,7 @@ * @return the exception, or null if none */ private Throwable getThrowableException() { - if (status != EXCEPTIONAL) + if ((status & DONE_MASK) != EXCEPTIONAL) return null; int h = System.identityHashCode(this); ExceptionNode e; @@ -519,7 +568,7 @@ Throwable ex; if (e == null || (ex = e.ex) == null) return null; - if (e.thrower != Thread.currentThread().getId()) { + if (false && e.thrower != Thread.currentThread().getId()) { Class<? extends Throwable> ec = ex.getClass(); try { Constructor<?> noArgCtor = null; @@ -586,16 +635,14 @@ } /** - * Report the result of invoke or join; called only upon - * non-normal return of internal versions. + * Throws exception, if any, associated with the given status. */ - private V reportResult() { - int s; Throwable ex; - if ((s = status) == CANCELLED) - throw new CancellationException(); - if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) - UNSAFE.throwException(ex); - return getRawResult(); + private void reportException(int s) { + Throwable ex = ((s == CANCELLED) ? new CancellationException() : + (s == EXCEPTIONAL) ? getThrowableException() : + null); + if (ex != null) + U.throwException(ex); } // public methods @@ -619,8 +666,7 @@ * @return {@code this}, to simplify usage */ public final ForkJoinTask<V> fork() { - ((ForkJoinWorkerThread) Thread.currentThread()) - .pushTask(this); + ((ForkJoinWorkerThread)Thread.currentThread()).workQueue.push(this); return this; } @@ -636,10 +682,10 @@ * @return the computed result */ public final V join() { - if (doJoin() != NORMAL) - return reportResult(); - else - return getRawResult(); + int s; + if ((s = doJoin() & DONE_MASK) != NORMAL) + reportException(s); + return getRawResult(); } /** @@ -651,10 +697,10 @@ * @return the computed result */ public final V invoke() { - if (doInvoke() != NORMAL) - return reportResult(); - else - return getRawResult(); + int s; + if ((s = doInvoke() & DONE_MASK) != NORMAL) + reportException(s); + return getRawResult(); } /** @@ -681,9 +727,12 @@ * @throws NullPointerException if any task is null */ public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) { + int s1, s2; t2.fork(); - t1.invoke(); - t2.join(); + if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) + t1.reportException(s1); + if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) + t2.reportException(s2); } /** @@ -726,12 +775,12 @@ if (t != null) { if (ex != null) t.cancel(false); - else if (t.doJoin() < NORMAL && ex == null) + else if (t.doJoin() < NORMAL) ex = t.getException(); } } if (ex != null) - UNSAFE.throwException(ex); + U.throwException(ex); } /** @@ -783,12 +832,12 @@ if (t != null) { if (ex != null) t.cancel(false); - else if (t.doJoin() < NORMAL && ex == null) + else if (t.doJoin() < NORMAL) ex = t.getException(); } } if (ex != null) - UNSAFE.throwException(ex); + U.throwException(ex); return tasks; } @@ -820,20 +869,7 @@ * @return {@code true} if this task is now cancelled */ public boolean cancel(boolean mayInterruptIfRunning) { - return setCompletion(CANCELLED) == CANCELLED; - } - - /** - * Cancels, ignoring any exceptions thrown by cancel. Used during - * worker and pool shutdown. Cancel is spec'ed not to throw any - * exceptions, but if it does anyway, we have no recourse during - * shutdown, so guard against this case. - */ - final void cancelIgnoringExceptions() { - try { - cancel(false); - } catch (Throwable ignore) { - } + return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED; } public final boolean isDone() { @@ -841,7 +877,7 @@ } public final boolean isCancelled() { - return status == CANCELLED; + return (status & DONE_MASK) == CANCELLED; } /** @@ -861,7 +897,7 @@ * exception and was not cancelled */ public final boolean isCompletedNormally() { - return status == NORMAL; + return (status & DONE_MASK) == NORMAL; } /** @@ -872,7 +908,7 @@ * @return the exception, or {@code null} if none */ public final Throwable getException() { - int s = status; + int s = status & DONE_MASK; return ((s >= NORMAL) ? null : (s == CANCELLED) ? new CancellationException() : getThrowableException()); @@ -922,6 +958,18 @@ } /** + * Completes this task normally without setting a value. The most + * recent value established by {@link #setRawResult} (or {@code + * null} by default) will be returned as the result of subsequent + * invocations of {@code join} and related operations. + * + * @since 1.8 + */ + public final void quietlyComplete() { + setCompletion(NORMAL); + } + + /** * Waits if necessary for the computation to complete, and then * retrieves its result. * @@ -934,9 +982,9 @@ */ public final V get() throws InterruptedException, ExecutionException { int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? - doJoin() : externalInterruptibleAwaitDone(0L); + doJoin() : externalInterruptibleAwaitDone(); Throwable ex; - if (s == CANCELLED) + if ((s &= DONE_MASK) == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) throw new ExecutionException(ex); @@ -959,32 +1007,60 @@ */ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - Thread t = Thread.currentThread(); - if (t instanceof ForkJoinWorkerThread) { - ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; - long nanos = unit.toNanos(timeout); - if (status >= 0) { - boolean completed = false; - if (w.unpushTask(this)) { - try { - completed = exec(); - } catch (Throwable rex) { - setExceptionalCompletion(rex); + if (Thread.interrupted()) + throw new InterruptedException(); + // Messy in part because we measure in nanosecs, but wait in millisecs + int s; long ns, ms; + if ((s = status) >= 0 && (ns = unit.toNanos(timeout)) > 0L) { + long deadline = System.nanoTime() + ns; + ForkJoinPool p = null; + ForkJoinPool.WorkQueue w = null; + Thread t = Thread.currentThread(); + if (t instanceof ForkJoinWorkerThread) { + ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; + p = wt.pool; + w = wt.workQueue; + s = p.helpJoinOnce(w, this); // no retries on failure + } + boolean canBlock = false; + boolean interrupted = false; + try { + while ((s = status) >= 0) { + if (w != null && w.runState < 0) + cancelIgnoringExceptions(this); + else if (!canBlock) { + if (p == null || p.tryCompensate(this, null)) + canBlock = true; + } + else { + if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && + U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) { + try { + wait(ms); + } catch (InterruptedException ie) { + if (p == null) + interrupted = true; + } + } + else + notifyAll(); + } + } + if ((s = status) < 0 || interrupted || + (ns = deadline - System.nanoTime()) <= 0L) + break; } } - if (completed) - setCompletion(NORMAL); - else if (status >= 0 && nanos > 0) - w.pool.timedAwaitJoin(this, nanos); + } finally { + if (p != null && canBlock) + p.incrementActiveCount(); } + if (interrupted) + throw new InterruptedException(); } - else { - long millis = unit.toMillis(timeout); - if (millis > 0) - externalInterruptibleAwaitDone(millis); - } - int s = status; - if (s != NORMAL) { + if ((s &= DONE_MASK) != NORMAL) { Throwable ex; if (s == CANCELLED) throw new CancellationException(); @@ -1029,8 +1105,9 @@ * ClassCastException}. */ public static void helpQuiesce() { - ((ForkJoinWorkerThread) Thread.currentThread()) - .helpQuiescePool(); + ForkJoinWorkerThread wt = + (ForkJoinWorkerThread)Thread.currentThread(); + wt.pool.helpQuiescePool(wt.workQueue); } /** @@ -1050,7 +1127,7 @@ * setRawResult(null)}. */ public void reinitialize() { - if (status == EXCEPTIONAL) + if ((status & DONE_MASK) == EXCEPTIONAL) clearExceptionalCompletion(); else status = 0; @@ -1098,8 +1175,8 @@ * @return {@code true} if unforked */ public boolean tryUnfork() { - return ((ForkJoinWorkerThread) Thread.currentThread()) - .unpushTask(this); + return ((ForkJoinWorkerThread)Thread.currentThread()) + .workQueue.tryUnpush(this); } /** @@ -1118,7 +1195,7 @@ */ public static int getQueuedTaskCount() { return ((ForkJoinWorkerThread) Thread.currentThread()) - .getQueueSize(); + .workQueue.queueSize(); } /** @@ -1140,8 +1217,52 @@ * @return the surplus number of tasks, which may be negative */ public static int getSurplusQueuedTaskCount() { - return ((ForkJoinWorkerThread) Thread.currentThread()) - .getEstimatedSurplusTaskCount(); + /* + * The aim of this method is to return a cheap heuristic guide + * for task partitioning when programmers, frameworks, tools, + * or languages have little or no idea about task granularity. + * In essence by offering this method, we ask users only about + * tradeoffs in overhead vs expected throughput and its + * variance, rather than how finely to partition tasks. + * + * In a steady state strict (tree-structured) computation, + * each thread makes available for stealing enough tasks for + * other threads to remain active. Inductively, if all threads + * play by the same rules, each thread should make available + * only a constant number of tasks. + * + * The minimum useful constant is just 1. But using a value of + * 1 would require immediate replenishment upon each steal to + * maintain enough tasks, which is infeasible. Further, + * partitionings/granularities of offered tasks should + * minimize steal rates, which in general means that threads + * nearer the top of computation tree should generate more + * than those nearer the bottom. In perfect steady state, each + * thread is at approximately the same level of computation + * tree. However, producing extra tasks amortizes the + * uncertainty of progress and diffusion assumptions. + * + * So, users will want to use values larger, but not much + * larger than 1 to both smooth over transient shortages and + * hedge against uneven progress; as traded off against the + * cost of extra task overhead. We leave the user to pick a + * threshold value to compare with the results of this call to + * guide decisions, but recommend values such as 3. + * + * When all threads are active, it is on average OK to + * estimate surplus strictly locally. In steady-state, if one + * thread is maintaining say 2 surplus tasks, then so are + * others. So we can just use estimated queue length. + * However, this strategy alone leads to serious mis-estimates + * in some non-steady-state conditions (ramp-up, ramp-down, + * other stalls). We can detect many of these by further + * considering the number of "idle" threads, that are known to + * have zero queued tasks, so compensate by a factor of + * (#idle/#active) threads. + */ + ForkJoinWorkerThread wt = + (ForkJoinWorkerThread)Thread.currentThread(); + return wt.workQueue.queueSize() - wt.pool.idlePerActive(); } // Extension methods @@ -1167,15 +1288,18 @@ protected abstract void setRawResult(V value); /** - * Immediately performs the base action of this task. This method - * is designed to support extensions, and should not in general be - * called otherwise. The return value controls whether this task - * is considered to be done normally. It may return false in + * Immediately performs the base action of this task and returns + * true if, upon return from this method, this task is guaranteed + * to have completed normally. This method may return false + * otherwise, to indicate that this task is not necessarily + * complete (or is not known to be complete), for example in * asynchronous actions that require explicit invocations of - * {@link #complete} to become joinable. It may also throw an - * (unchecked) exception to indicate abnormal exit. + * completion methods. This method may also throw an (unchecked) + * exception to indicate abnormal exit. This method is designed to + * support extensions, and should not in general be called + * otherwise. * - * @return {@code true} if completed normally + * @return {@code true} if this task is known to have completed normally */ protected abstract boolean exec(); @@ -1198,8 +1322,7 @@ * @return the next task, or {@code null} if none are available */ protected static ForkJoinTask<?> peekNextLocalTask() { - return ((ForkJoinWorkerThread) Thread.currentThread()) - .peekTask(); + return ((ForkJoinWorkerThread) Thread.currentThread()).workQueue.peek(); } /** @@ -1218,7 +1341,7 @@ */ protected static ForkJoinTask<?> pollNextLocalTask() { return ((ForkJoinWorkerThread) Thread.currentThread()) - .pollLocalTask(); + .workQueue.nextLocalTask(); } /** @@ -1240,8 +1363,60 @@ * @return a task, or {@code null} if none are available */ protected static ForkJoinTask<?> pollTask() { - return ((ForkJoinWorkerThread) Thread.currentThread()) - .pollTask(); + ForkJoinWorkerThread wt = + (ForkJoinWorkerThread)Thread.currentThread(); + return wt.pool.nextTaskFor(wt.workQueue); + } + + // tag operations + + /** + * Returns the tag for this task. + * + * @return the tag for this task + * @since 1.8 + */ + public final short getForkJoinTaskTag() { + return (short)status; + } + + /** + * Atomically sets the tag value for this task. + * + * @param tag the tag value + * @return the previous value of the tag + * @since 1.8 + */ + public final short setForkJoinTaskTag(short tag) { + for (int s;;) { + if (U.compareAndSwapInt(this, STATUS, s = status, + (s & ~SMASK) | (tag & SMASK))) + return (short)s; + } + } + + /** + * Atomically conditionally sets the tag value for this task. + * Among other applications, tags can be used as visit markers + * in tasks operating on graphs, as in methods that check: {@code + * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))} + * before processing, otherwise exiting because the node has + * already been visited. + * + * @param e the expected tag value + * @param tag the new tag value + * @return true if successful; i.e., the current value was + * equal to e and is now tag. + * @since 1.8 + */ + public final boolean compareAndSetForkJoinTaskTag(short e, short tag) { + for (int s;;) { + if ((short)(s = status) != e) + return false; + if (U.compareAndSwapInt(this, STATUS, s, + (s & ~SMASK) | (tag & SMASK))) + return true; + } } /** @@ -1252,21 +1427,33 @@ static final class AdaptedRunnable<T> extends ForkJoinTask<T> implements RunnableFuture<T> { final Runnable runnable; - final T resultOnCompletion; T result; AdaptedRunnable(Runnable runnable, T result) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; - this.resultOnCompletion = result; + this.result = result; // OK to set this even before completion } - public T getRawResult() { return result; } - public void setRawResult(T v) { result = v; } - public boolean exec() { - runnable.run(); - result = resultOnCompletion; - return true; + public final T getRawResult() { return result; } + public final void setRawResult(T v) { result = v; } + public final boolean exec() { runnable.run(); return true; } + public final void run() { invoke(); } + private static final long serialVersionUID = 5232453952276885070L; + } + + /** + * Adaptor for Runnables without results + */ + static final class AdaptedRunnableAction extends ForkJoinTask<Void> + implements RunnableFuture<Void> { + final Runnable runnable; + AdaptedRunnableAction(Runnable runnable) { + if (runnable == null) throw new NullPointerException(); + this.runnable = runnable; } - public void run() { invoke(); } + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) { } + public final boolean exec() { runnable.run(); return true; } + public final void run() { invoke(); } private static final long serialVersionUID = 5232453952276885070L; } @@ -1281,9 +1468,9 @@ if (callable == null) throw new NullPointerException(); this.callable = callable; } - public T getRawResult() { return result; } - public void setRawResult(T v) { result = v; } - public boolean exec() { + public final T getRawResult() { return result; } + public final void setRawResult(T v) { result = v; } + public final boolean exec() { try { result = callable.call(); return true; @@ -1295,7 +1482,7 @@ throw new RuntimeException(ex); } } - public void run() { invoke(); } + public final void run() { invoke(); } private static final long serialVersionUID = 2838392045355241008L; } @@ -1308,7 +1495,7 @@ * @return the task */ public static ForkJoinTask<?> adapt(Runnable runnable) { - return new AdaptedRunnable<Void>(runnable, null); + return new AdaptedRunnableAction(runnable); } /** @@ -1342,11 +1529,10 @@ private static final long serialVersionUID = -7721805057305804111L; /** - * Saves the state to a stream (that is, serializes it). + * Saves this task to a stream (that is, serializes it). * * @serialData the current run status and the exception thrown * during execution, or {@code null} if none - * @param s the stream */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { @@ -1355,9 +1541,7 @@ } /** - * Reconstitutes the instance from a stream (that is, deserializes it). - * - * @param s the stream + * Reconstitutes this task from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { @@ -1368,15 +1552,15 @@ } // Unsafe mechanics - private static final sun.misc.Unsafe UNSAFE; - private static final long statusOffset; + private static final sun.misc.Unsafe U; + private static final long STATUS; static { exceptionTableLock = new ReentrantLock(); exceptionTableRefQueue = new ReferenceQueue<Object>(); exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY]; try { - UNSAFE = sun.misc.Unsafe.getUnsafe(); - statusOffset = UNSAFE.objectFieldOffset + U = sun.misc.Unsafe.getUnsafe(); + STATUS = U.objectFieldOffset (ForkJoinTask.class.getDeclaredField("status")); } catch (Exception e) { throw new Error(e);
--- a/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java Mon Aug 20 17:14:26 2012 -0700 +++ b/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java Wed Aug 22 18:22:03 2012 -0700 @@ -35,9 +35,6 @@ package java.util.concurrent; -import java.util.Collection; -import java.util.concurrent.RejectedExecutionException; - /** * A thread managed by a {@link ForkJoinPool}, which executes * {@link ForkJoinTask}s. @@ -54,238 +51,13 @@ */ public class ForkJoinWorkerThread extends Thread { /* - * Overview: - * * ForkJoinWorkerThreads are managed by ForkJoinPools and perform - * ForkJoinTasks. This class includes bookkeeping in support of - * worker activation, suspension, and lifecycle control described - * in more detail in the internal documentation of class - * ForkJoinPool. And as described further below, this class also - * includes special-cased support for some ForkJoinTask - * methods. But the main mechanics involve work-stealing: - * - * Work-stealing queues are special forms of Deques that support - * only three of the four possible end-operations -- push, pop, - * and deq (aka steal), under the further constraints that push - * and pop are called only from the owning thread, while deq may - * be called from other threads. (If you are unfamiliar with - * them, you probably want to read Herlihy and Shavit's book "The - * Art of Multiprocessor programming", chapter 16 describing these - * in more detail before proceeding.) The main work-stealing - * queue design is roughly similar to those in the papers "Dynamic - * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005 - * (http://research.sun.com/scalable/pubs/index.html) and - * "Idempotent work stealing" by Michael, Saraswat, and Vechev, - * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186). - * The main differences ultimately stem from gc requirements that - * we null out taken slots as soon as we can, to maintain as small - * a footprint as possible even in programs generating huge - * numbers of tasks. To accomplish this, we shift the CAS - * arbitrating pop vs deq (steal) from being on the indices - * ("queueBase" and "queueTop") to the slots themselves (mainly - * via method "casSlotNull()"). So, both a successful pop and deq - * mainly entail a CAS of a slot from non-null to null. Because - * we rely on CASes of references, we do not need tag bits on - * queueBase or queueTop. They are simple ints as used in any - * circular array-based queue (see for example ArrayDeque). - * Updates to the indices must still be ordered in a way that - * guarantees that queueTop == queueBase means the queue is empty, - * but otherwise may err on the side of possibly making the queue - * appear nonempty when a push, pop, or deq have not fully - * committed. Note that this means that the deq operation, - * considered individually, is not wait-free. One thief cannot - * successfully continue until another in-progress one (or, if - * previously empty, a push) completes. However, in the - * aggregate, we ensure at least probabilistic non-blockingness. - * If an attempted steal fails, a thief always chooses a different - * random victim target to try next. So, in order for one thief to - * progress, it suffices for any in-progress deq or new push on - * any empty queue to complete. - * - * This approach also enables support for "async mode" where local - * task processing is in FIFO, not LIFO order; simply by using a - * version of deq rather than pop when locallyFifo is true (as set - * by the ForkJoinPool). This allows use in message-passing - * frameworks in which tasks are never joined. However neither - * mode considers affinities, loads, cache localities, etc, so - * rarely provide the best possible performance on a given - * machine, but portably provide good throughput by averaging over - * these factors. (Further, even if we did try to use such - * information, we do not usually have a basis for exploiting - * it. For example, some sets of tasks profit from cache - * affinities, but others are harmed by cache pollution effects.) - * - * When a worker would otherwise be blocked waiting to join a - * task, it first tries a form of linear helping: Each worker - * records (in field currentSteal) the most recent task it stole - * from some other worker. Plus, it records (in field currentJoin) - * the task it is currently actively joining. Method joinTask uses - * these markers to try to find a worker to help (i.e., steal back - * a task from and execute it) that could hasten completion of the - * actively joined task. In essence, the joiner executes a task - * that would be on its own local deque had the to-be-joined task - * not been stolen. This may be seen as a conservative variant of - * the approach in Wagner & Calder "Leapfrogging: a portable - * technique for implementing efficient futures" SIGPLAN Notices, - * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs - * in that: (1) We only maintain dependency links across workers - * upon steals, rather than use per-task bookkeeping. This may - * require a linear scan of workers array to locate stealers, but - * usually doesn't because stealers leave hints (that may become - * stale/wrong) of where to locate them. This isolates cost to - * when it is needed, rather than adding to per-task overhead. - * (2) It is "shallow", ignoring nesting and potentially cyclic - * mutual steals. (3) It is intentionally racy: field currentJoin - * is updated only while actively joining, which means that we - * miss links in the chain during long-lived tasks, GC stalls etc - * (which is OK since blocking in such cases is usually a good - * idea). (4) We bound the number of attempts to find work (see - * MAX_HELP) and fall back to suspending the worker and if - * necessary replacing it with another. - * - * Efficient implementation of these algorithms currently relies - * on an uncomfortable amount of "Unsafe" mechanics. To maintain - * correct orderings, reads and writes of variable queueBase - * require volatile ordering. Variable queueTop need not be - * volatile because non-local reads always follow those of - * queueBase. Similarly, because they are protected by volatile - * queueBase reads, reads of the queue array and its slots by - * other threads do not need volatile load semantics, but writes - * (in push) require store order and CASes (in pop and deq) - * require (volatile) CAS semantics. (Michael, Saraswat, and - * Vechev's algorithm has similar properties, but without support - * for nulling slots.) Since these combinations aren't supported - * using ordinary volatiles, the only way to accomplish these - * efficiently is to use direct Unsafe calls. (Using external - * AtomicIntegers and AtomicReferenceArrays for the indices and - * array is significantly slower because of memory locality and - * indirection effects.) - * - * Further, performance on most platforms is very sensitive to - * placement and sizing of the (resizable) queue array. Even - * though these queues don't usually become all that big, the - * initial size must be large enough to counteract cache - * contention effects across multiple queues (especially in the - * presence of GC cardmarking). Also, to improve thread-locality, - * queues are initialized after starting. + * ForkJoinTasks. For explanation, see the internal documentation + * of class ForkJoinPool. */ - /** - * Mask for pool indices encoded as shorts - */ - private static final int SMASK = 0xffff; - - /** - * Capacity of work-stealing queue array upon initialization. - * Must be a power of two. Initial size must be at least 4, but is - * padded to minimize cache effects. - */ - private static final int INITIAL_QUEUE_CAPACITY = 1 << 13; - - /** - * Maximum size for queue array. Must be a power of two - * less than or equal to 1 << (31 - width of array entry) to - * ensure lack of index wraparound, but is capped at a lower - * value to help users trap runaway computations. - */ - private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M - - /** - * The work-stealing queue array. Size must be a power of two. - * Initialized when started (as opposed to when constructed), to - * improve memory locality. - */ - ForkJoinTask<?>[] queue; - - /** - * The pool this thread works in. Accessed directly by ForkJoinTask. - */ - final ForkJoinPool pool; - - /** - * Index (mod queue.length) of next queue slot to push to or pop - * from. It is written only by owner thread, and accessed by other - * threads only after reading (volatile) queueBase. Both queueTop - * and queueBase are allowed to wrap around on overflow, but - * (queueTop - queueBase) still estimates size. - */ - int queueTop; - - /** - * Index (mod queue.length) of least valid queue slot, which is - * always the next position to steal from if nonempty. - */ - volatile int queueBase; - - /** - * The index of most recent stealer, used as a hint to avoid - * traversal in method helpJoinTask. This is only a hint because a - * worker might have had multiple steals and this only holds one - * of them (usually the most current). Declared non-volatile, - * relying on other prevailing sync to keep reasonably current. - */ - int stealHint; - - /** - * Index of this worker in pool array. Set once by pool before - * running, and accessed directly by pool to locate this worker in - * its workers array. - */ - final int poolIndex; - - /** - * Encoded record for pool task waits. Usages are always - * surrounded by volatile reads/writes - */ - int nextWait; - - /** - * Complement of poolIndex, offset by count of entries of task - * waits. Accessed by ForkJoinPool to manage event waiters. - */ - volatile int eventCount; - - /** - * Seed for random number generator for choosing steal victims. - * Uses Marsaglia xorshift. Must be initialized as nonzero. - */ - int seed; - - /** - * Number of steals. Directly accessed (and reset) by pool when - * idle. - */ - int stealCount; - - /** - * True if this worker should or did terminate - */ - volatile boolean terminate; - - /** - * Set to true before LockSupport.park; false on return - */ - volatile boolean parked; - - /** - * True if use local fifo, not default lifo, for local polling. - * Shadows value from ForkJoinPool. - */ - final boolean locallyFifo; - - /** - * The task most recently stolen from another worker (or - * submission queue). All uses are surrounded by enough volatile - * reads/writes to maintain as non-volatile. - */ - ForkJoinTask<?> currentSteal; - - /** - * The task currently being joined, set only when actively trying - * to help other stealers in helpJoinTask. All uses are surrounded - * by enough volatile reads/writes to maintain as non-volatile. - */ - ForkJoinTask<?> currentJoin; + final ForkJoinPool.WorkQueue workQueue; // Work-stealing mechanics + final ForkJoinPool pool; // the pool this thread works in /** * Creates a ForkJoinWorkerThread operating in the given pool. @@ -295,19 +67,15 @@ */ protected ForkJoinWorkerThread(ForkJoinPool pool) { super(pool.nextWorkerName()); - this.pool = pool; - int k = pool.registerWorker(this); - poolIndex = k; - eventCount = ~k & SMASK; // clear wait count - locallyFifo = pool.locallyFifo; + setDaemon(true); Thread.UncaughtExceptionHandler ueh = pool.ueh; if (ueh != null) setUncaughtExceptionHandler(ueh); - setDaemon(true); + this.pool = pool; + pool.registerWorker(this.workQueue = new ForkJoinPool.WorkQueue + (pool, this, pool.localMode)); } - // Public methods - /** * Returns the pool hosting this thread. * @@ -327,28 +95,9 @@ * @return the index number */ public int getPoolIndex() { - return poolIndex; + return workQueue.poolIndex; } - // Randomization - - /** - * Computes next value for random victim probes and backoffs. - * Scans don't require a very high quality generator, but also not - * a crummy one. Marsaglia xor-shift is cheap and works well - * enough. Note: This is manually inlined in FJP.scan() to avoid - * writes inside busy loops. - */ - private int nextSeed() { - int r = seed; - r ^= r << 13; - r ^= r >>> 17; - r ^= r << 5; - return seed = r; - } - - // Run State management - /** * Initializes internal state after construction but before * processing any tasks. If you override this method, you must @@ -359,9 +108,6 @@ * processing tasks. */ protected void onStart() { - queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; - int r = ForkJoinPool.workerSeedGenerator.nextInt(); - seed = (r == 0) ? 1 : r; // must be nonzero } /** @@ -373,17 +119,6 @@ * to an unrecoverable error, or {@code null} if completed normally */ protected void onTermination(Throwable exception) { - try { - terminate = true; - cancelTasks(); - pool.deregisterWorker(this, exception); - } catch (Throwable ex) { // Shouldn't ever happen - if (exception == null) // but if so, at least rethrown - exception = ex; - } finally { - if (exception != null) - UNSAFE.throwException(exception); - } } /** @@ -395,604 +130,18 @@ Throwable exception = null; try { onStart(); - pool.work(this); + pool.runWorker(workQueue); } catch (Throwable ex) { exception = ex; } finally { - onTermination(exception); - } - } - - /* - * Intrinsics-based atomic writes for queue slots. These are - * basically the same as methods in AtomicReferenceArray, but - * specialized for (1) ForkJoinTask elements (2) requirement that - * nullness and bounds checks have already been performed by - * callers and (3) effective offsets are known not to overflow - * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't - * need corresponding version for reads: plain array reads are OK - * because they are protected by other volatile reads and are - * confirmed by CASes. - * - * Most uses don't actually call these methods, but instead - * contain inlined forms that enable more predictable - * optimization. We don't define the version of write used in - * pushTask at all, but instead inline there a store-fenced array - * slot write. - * - * Also in most methods, as a performance (not correctness) issue, - * we'd like to encourage compilers not to arbitrarily postpone - * setting queueTop after writing slot. Currently there is no - * intrinsic for arranging this, but using Unsafe putOrderedInt - * may be a preferable strategy on some compilers even though its - * main effect is a pre-, not post- fence. To simplify possible - * changes, the option is left in comments next to the associated - * assignments. - */ - - /** - * CASes slot i of array q from t to null. Caller must ensure q is - * non-null and index is in range. - */ - private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i, - ForkJoinTask<?> t) { - return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null); - } - - /** - * Performs a volatile write of the given task at given slot of - * array q. Caller must ensure q is non-null and index is in - * range. This method is used only during resets and backouts. - */ - private static final void writeSlot(ForkJoinTask<?>[] q, int i, - ForkJoinTask<?> t) { - UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t); - } - - // queue methods - - /** - * Pushes a task. Call only from this thread. - * - * @param t the task. Caller must ensure non-null. - */ - final void pushTask(ForkJoinTask<?> t) { - ForkJoinTask<?>[] q; int s, m; - if ((q = queue) != null) { // ignore if queue removed - long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE; - UNSAFE.putOrderedObject(q, u, t); - queueTop = s + 1; // or use putOrderedInt - if ((s -= queueBase) <= 2) - pool.signalWork(); - else if (s == m) - growQueue(); - } - } - - /** - * Creates or doubles queue array. Transfers elements by - * emulating steals (deqs) from old array and placing, oldest - * first, into new array. - */ - private void growQueue() { - ForkJoinTask<?>[] oldQ = queue; - int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY; - if (size > MAXIMUM_QUEUE_CAPACITY) - throw new RejectedExecutionException("Queue capacity exceeded"); - if (size < INITIAL_QUEUE_CAPACITY) - size = INITIAL_QUEUE_CAPACITY; - ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size]; - int mask = size - 1; - int top = queueTop; - int oldMask; - if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) { - for (int b = queueBase; b != top; ++b) { - long u = ((b & oldMask) << ASHIFT) + ABASE; - Object x = UNSAFE.getObjectVolatile(oldQ, u); - if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null)) - UNSAFE.putObjectVolatile - (q, ((b & mask) << ASHIFT) + ABASE, x); + try { + onTermination(exception); + } catch (Throwable ex) { + if (exception == null) + exception = ex; + } finally { + pool.deregisterWorker(this, exception); } } } - - /** - * Tries to take a task from the base of the queue, failing if - * empty or contended. Note: Specializations of this code appear - * in locallyDeqTask and elsewhere. - * - * @return a task, or null if none or contended - */ - final ForkJoinTask<?> deqTask() { - ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; - if (queueTop != (b = queueBase) && - (q = queue) != null && // must read q after b - (i = (q.length - 1) & b) >= 0 && - (t = q[i]) != null && queueBase == b && - UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null)) { - queueBase = b + 1; - return t; - } - return null; - } - - /** - * Tries to take a task from the base of own queue. Called only - * by this thread. - * - * @return a task, or null if none - */ - final ForkJoinTask<?> locallyDeqTask() { - ForkJoinTask<?> t; int m, b, i; - ForkJoinTask<?>[] q = queue; - if (q != null && (m = q.length - 1) >= 0) { - while (queueTop != (b = queueBase)) { - if ((t = q[i = m & b]) != null && - queueBase == b && - UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, - t, null)) { - queueBase = b + 1; - return t; - } - } - } - return null; - } - - /** - * Returns a popped task, or null if empty. - * Called only by this thread. - */ - private ForkJoinTask<?> popTask() { - int m; - ForkJoinTask<?>[] q = queue; - if (q != null && (m = q.length - 1) >= 0) { - for (int s; (s = queueTop) != queueBase;) { - int i = m & --s; - long u = (i << ASHIFT) + ABASE; // raw offset - ForkJoinTask<?> t = q[i]; - if (t == null) // lost to stealer - break; - if (UNSAFE.compareAndSwapObject(q, u, t, null)) { - queueTop = s; // or putOrderedInt - return t; - } - } - } - return null; - } - - /** - * Specialized version of popTask to pop only if topmost element - * is the given task. Called only by this thread. - * - * @param t the task. Caller must ensure non-null. - */ - final boolean unpushTask(ForkJoinTask<?> t) { - ForkJoinTask<?>[] q; - int s; - if ((q = queue) != null && (s = queueTop) != queueBase && - UNSAFE.compareAndSwapObject - (q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) { - queueTop = s; // or putOrderedInt - return true; - } - return false; - } - - /** - * Returns next task, or null if empty or contended. - */ - final ForkJoinTask<?> peekTask() { - int m; - ForkJoinTask<?>[] q = queue; - if (q == null || (m = q.length - 1) < 0) - return null; - int i = locallyFifo ? queueBase : (queueTop - 1); - return q[i & m]; - } - - // Support methods for ForkJoinPool - - /** - * Runs the given task, plus any local tasks until queue is empty - */ - final void execTask(ForkJoinTask<?> t) { - currentSteal = t; - for (;;) { - if (t != null) - t.doExec(); - if (queueTop == queueBase) - break; - t = locallyFifo ? locallyDeqTask() : popTask(); - } - ++stealCount; - currentSteal = null; - } - - /** - * Removes and cancels all tasks in queue. Can be called from any - * thread. - */ - final void cancelTasks() { - ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks - if (cj != null && cj.status >= 0) - cj.cancelIgnoringExceptions(); - ForkJoinTask<?> cs = currentSteal; - if (cs != null && cs.status >= 0) - cs.cancelIgnoringExceptions(); - while (queueBase != queueTop) { - ForkJoinTask<?> t = deqTask(); - if (t != null) - t.cancelIgnoringExceptions(); - } - } - - /** - * Drains tasks to given collection c. - * - * @return the number of tasks drained - */ - final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) { - int n = 0; - while (queueBase != queueTop) { - ForkJoinTask<?> t = deqTask(); - if (t != null) { - c.add(t); - ++n; - } - } - return n; - } - - // Support methods for ForkJoinTask - - /** - * Returns an estimate of the number of tasks in the queue. - */ - final int getQueueSize() { - return queueTop - queueBase; - } - - /** - * Gets and removes a local task. - * - * @return a task, if available - */ - final ForkJoinTask<?> pollLocalTask() { - return locallyFifo ? locallyDeqTask() : popTask(); - } - - /** - * Gets and removes a local or stolen task. - * - * @return a task, if available - */ - final ForkJoinTask<?> pollTask() { - ForkJoinWorkerThread[] ws; - ForkJoinTask<?> t = pollLocalTask(); - if (t != null || (ws = pool.workers) == null) - return t; - int n = ws.length; // cheap version of FJP.scan - int steps = n << 1; - int r = nextSeed(); - int i = 0; - while (i < steps) { - ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)]; - if (w != null && w.queueBase != w.queueTop && w.queue != null) { - if ((t = w.deqTask()) != null) - return t; - i = 0; - } - } - return null; - } - - /** - * The maximum stolen->joining link depth allowed in helpJoinTask, - * as well as the maximum number of retries (allowing on average - * one staleness retry per level) per attempt to instead try - * compensation. Depths for legitimate chains are unbounded, but - * we use a fixed constant to avoid (otherwise unchecked) cycles - * and bound staleness of traversal parameters at the expense of - * sometimes blocking when we could be helping. - */ - private static final int MAX_HELP = 16; - - /** - * Possibly runs some tasks and/or blocks, until joinMe is done. - * - * @param joinMe the task to join - * @return completion status on exit - */ - final int joinTask(ForkJoinTask<?> joinMe) { - ForkJoinTask<?> prevJoin = currentJoin; - currentJoin = joinMe; - for (int s, retries = MAX_HELP;;) { - if ((s = joinMe.status) < 0) { - currentJoin = prevJoin; - return s; - } - if (retries > 0) { - if (queueTop != queueBase) { - if (!localHelpJoinTask(joinMe)) - retries = 0; // cannot help - } - else if (retries == MAX_HELP >>> 1) { - --retries; // check uncommon case - if (tryDeqAndExec(joinMe) >= 0) - Thread.yield(); // for politeness - } - else - retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1; - } - else { - retries = MAX_HELP; // restart if not done - pool.tryAwaitJoin(joinMe); - } - } - } - - /** - * If present, pops and executes the given task, or any other - * cancelled task - * - * @return false if any other non-cancelled task exists in local queue - */ - private boolean localHelpJoinTask(ForkJoinTask<?> joinMe) { - int s, i; ForkJoinTask<?>[] q; ForkJoinTask<?> t; - if ((s = queueTop) != queueBase && (q = queue) != null && - (i = (q.length - 1) & --s) >= 0 && - (t = q[i]) != null) { - if (t != joinMe && t.status >= 0) - return false; - if (UNSAFE.compareAndSwapObject - (q, (i << ASHIFT) + ABASE, t, null)) { - queueTop = s; // or putOrderedInt - t.doExec(); - } - } - return true; - } - - /** - * Tries to locate and execute tasks for a stealer of the given - * task, or in turn one of its stealers, Traces - * currentSteal->currentJoin links looking for a thread working on - * a descendant of the given task and with a non-empty queue to - * steal back and execute tasks from. The implementation is very - * branchy to cope with potential inconsistencies or loops - * encountering chains that are stale, unknown, or of length - * greater than MAX_HELP links. All of these cases are dealt with - * by just retrying by caller. - * - * @param joinMe the task to join - * @param canSteal true if local queue is empty - * @return true if ran a task - */ - private boolean helpJoinTask(ForkJoinTask<?> joinMe) { - boolean helped = false; - int m = pool.scanGuard & SMASK; - ForkJoinWorkerThread[] ws = pool.workers; - if (ws != null && ws.length > m && joinMe.status >= 0) { - int levels = MAX_HELP; // remaining chain length - ForkJoinTask<?> task = joinMe; // base of chain - outer:for (ForkJoinWorkerThread thread = this;;) { - // Try to find v, the stealer of task, by first using hint - ForkJoinWorkerThread v = ws[thread.stealHint & m]; - if (v == null || v.currentSteal != task) { - for (int j = 0; ;) { // search array - if ((v = ws[j]) != null && v.currentSteal == task) { - thread.stealHint = j; - break; // save hint for next time - } - if (++j > m) - break outer; // can't find stealer - } - } - // Try to help v, using specialized form of deqTask - for (;;) { - ForkJoinTask<?>[] q; int b, i; - if (joinMe.status < 0) - break outer; - if ((b = v.queueBase) == v.queueTop || - (q = v.queue) == null || - (i = (q.length-1) & b) < 0) - break; // empty - long u = (i << ASHIFT) + ABASE; - ForkJoinTask<?> t = q[i]; - if (task.status < 0) - break outer; // stale - if (t != null && v.queueBase == b && - UNSAFE.compareAndSwapObject(q, u, t, null)) { - v.queueBase = b + 1; - v.stealHint = poolIndex; - ForkJoinTask<?> ps = currentSteal; - currentSteal = t; - t.doExec(); - currentSteal = ps; - helped = true; - } - } - // Try to descend to find v's stealer - ForkJoinTask<?> next = v.currentJoin; - if (--levels > 0 && task.status >= 0 && - next != null && next != task) { - task = next; - thread = v; - } - else - break; // max levels, stale, dead-end, or cyclic - } - } - return helped; - } - - /** - * Performs an uncommon case for joinTask: If task t is at base of - * some workers queue, steals and executes it. - * - * @param t the task - * @return t's status - */ - private int tryDeqAndExec(ForkJoinTask<?> t) { - int m = pool.scanGuard & SMASK; - ForkJoinWorkerThread[] ws = pool.workers; - if (ws != null && ws.length > m && t.status >= 0) { - for (int j = 0; j <= m; ++j) { - ForkJoinTask<?>[] q; int b, i; - ForkJoinWorkerThread v = ws[j]; - if (v != null && - (b = v.queueBase) != v.queueTop && - (q = v.queue) != null && - (i = (q.length - 1) & b) >= 0 && - q[i] == t) { - long u = (i << ASHIFT) + ABASE; - if (v.queueBase == b && - UNSAFE.compareAndSwapObject(q, u, t, null)) { - v.queueBase = b + 1; - v.stealHint = poolIndex; - ForkJoinTask<?> ps = currentSteal; - currentSteal = t; - t.doExec(); - currentSteal = ps; - } - break; - } - } - } - return t.status; - } - - /** - * Implements ForkJoinTask.getSurplusQueuedTaskCount(). Returns - * an estimate of the number of tasks, offset by a function of - * number of idle workers. - * - * This method provides a cheap heuristic guide for task - * partitioning when programmers, frameworks, tools, or languages - * have little or no idea about task granularity. In essence by - * offering this method, we ask users only about tradeoffs in - * overhead vs expected throughput and its variance, rather than - * how finely to partition tasks. - * - * In a steady state strict (tree-structured) computation, each - * thread makes available for stealing enough tasks for other - * threads to remain active. Inductively, if all threads play by - * the same rules, each thread should make available only a - * constant number of tasks. - * - * The minimum useful constant is just 1. But using a value of 1 - * would require immediate replenishment upon each steal to - * maintain enough tasks, which is infeasible. Further, - * partitionings/granularities of offered tasks should minimize - * steal rates, which in general means that threads nearer the top - * of computation tree should generate more than those nearer the - * bottom. In perfect steady state, each thread is at - * approximately the same level of computation tree. However, - * producing extra tasks amortizes the uncertainty of progress and - * diffusion assumptions. - * - * So, users will want to use values larger, but not much larger - * than 1 to both smooth over transient shortages and hedge - * against uneven progress; as traded off against the cost of - * extra task overhead. We leave the user to pick a threshold - * value to compare with the results of this call to guide - * decisions, but recommend values such as 3. - * - * When all threads are active, it is on average OK to estimate - * surplus strictly locally. In steady-state, if one thread is - * maintaining say 2 surplus tasks, then so are others. So we can - * just use estimated queue length (although note that (queueTop - - * queueBase) can be an overestimate because of stealers lagging - * increments of queueBase). However, this strategy alone leads - * to serious mis-estimates in some non-steady-state conditions - * (ramp-up, ramp-down, other stalls). We can detect many of these - * by further considering the number of "idle" threads, that are - * known to have zero queued tasks, so compensate by a factor of - * (#idle/#active) threads. - */ - final int getEstimatedSurplusTaskCount() { - return queueTop - queueBase - pool.idlePerActive(); - } - - /** - * Runs tasks until {@code pool.isQuiescent()}. We piggyback on - * pool's active count ctl maintenance, but rather than blocking - * when tasks cannot be found, we rescan until all others cannot - * find tasks either. The bracketing by pool quiescerCounts - * updates suppresses pool auto-shutdown mechanics that could - * otherwise prematurely terminate the pool because all threads - * appear to be inactive. - */ - final void helpQuiescePool() { - boolean active = true; - ForkJoinTask<?> ps = currentSteal; // to restore below - ForkJoinPool p = pool; - p.addQuiescerCount(1); - for (;;) { - ForkJoinWorkerThread[] ws = p.workers; - ForkJoinWorkerThread v = null; - int n; - if (queueTop != queueBase) - v = this; - else if (ws != null && (n = ws.length) > 1) { - ForkJoinWorkerThread w; - int r = nextSeed(); // cheap version of FJP.scan - int steps = n << 1; - for (int i = 0; i < steps; ++i) { - if ((w = ws[(i + r) & (n - 1)]) != null && - w.queueBase != w.queueTop) { - v = w; - break; - } - } - } - if (v != null) { - ForkJoinTask<?> t; - if (!active) { - active = true; - p.addActiveCount(1); - } - if ((t = (v != this) ? v.deqTask() : - locallyFifo ? locallyDeqTask() : popTask()) != null) { - currentSteal = t; - t.doExec(); - currentSteal = ps; - } - } - else { - if (active) { - active = false; - p.addActiveCount(-1); - } - if (p.isQuiescent()) { - p.addActiveCount(1); - p.addQuiescerCount(-1); - break; - } - } - } - } - - // Unsafe mechanics - private static final sun.misc.Unsafe UNSAFE; - private static final long ABASE; - private static final int ASHIFT; - - static { - int s; - try { - UNSAFE = sun.misc.Unsafe.getUnsafe(); - Class<?> a = ForkJoinTask[].class; - ABASE = UNSAFE.arrayBaseOffset(a); - s = UNSAFE.arrayIndexScale(a); - } catch (Exception e) { - throw new Error(e); - } - if ((s & (s-1)) != 0) - throw new Error("data type scale not a power of two"); - ASHIFT = 31 - Integer.numberOfLeadingZeros(s); - } - }
--- a/test-ng/tests/org/openjdk/tests/java/lang/ThreadLocalFactoryTest.java Mon Aug 20 17:14:26 2012 -0700 +++ b/test-ng/tests/org/openjdk/tests/java/lang/ThreadLocalFactoryTest.java Wed Aug 22 18:22:03 2012 -0700 @@ -87,11 +87,11 @@ th[i] = new Thread() { @Override public void run() { - int threadId = ((Integer) (threadLocal.get())).intValue(); + int threadId = threadLocal.get().intValue(); assertFalse(visited[threadId], "visited[" + threadId + "]=" + visited[threadId]); visited[threadId] = true; // check the get() again - int secondCheckThreadId = ((Integer) (threadLocal.get())).intValue(); + int secondCheckThreadId = threadLocal.get().intValue(); assertEquals( secondCheckThreadId, threadId ); Thread.yield(); }
--- a/test-ng/tests/org/openjdk/tests/java/lang/ThreadLocalTest.java Mon Aug 20 17:14:26 2012 -0700 +++ b/test-ng/tests/org/openjdk/tests/java/lang/ThreadLocalTest.java Wed Aug 22 18:22:03 2012 -0700 @@ -43,7 +43,7 @@ * Test of get method, of class ThreadLocal. */ public void testGet() { - ThreadLocal instance = new ThreadLocal(); + ThreadLocal<Object> instance = new ThreadLocal<>(); Object expResult = null; Object result = instance.get(); assertEquals(result, expResult); @@ -54,7 +54,7 @@ */ public void testSet() { String initialValue = "initial value"; - ThreadLocal instance = new ThreadLocal(); + ThreadLocal<String> instance = new ThreadLocal<>(); instance.set(initialValue); assertEquals( instance.get(), initialValue ); } @@ -64,7 +64,7 @@ */ public void testRemove() { String putThisIn = "value was set"; - ThreadLocal instance = new ThreadLocal(); + ThreadLocal<String> instance = new ThreadLocal<>(); instance.set( putThisIn ); instance.remove(); assertNull( instance.get() ); @@ -75,7 +75,7 @@ */ public void testInitWithFactory() { String whatDoYouExpect = "OneWithEverything"; - ThreadLocal<String> hotdogForTheMonk = new ThreadLocal( new StringFactory( whatDoYouExpect )); + ThreadLocal<String> hotdogForTheMonk = new ThreadLocal<>( new StringFactory( whatDoYouExpect )); assertEquals( hotdogForTheMonk.get(), hotdogForTheMonk.get() ); }
--- a/test-ng/tests/org/openjdk/tests/java/util/concurrent/AtomicReferenceTest.java Mon Aug 20 17:14:26 2012 -0700 +++ b/test-ng/tests/org/openjdk/tests/java/util/concurrent/AtomicReferenceTest.java Wed Aug 22 18:22:03 2012 -0700 @@ -24,9 +24,7 @@ */ package org.openjdk.tests.java.util.concurrent; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.functions.IntUnaryOperator; import java.util.functions.UnaryOperator; import org.testng.annotations.Test; import static org.testng.Assert.*; @@ -47,7 +45,7 @@ public void testUpdateAndGet() { System.out.println("getAndUpdate"); UnaryOperator<Integer> op = (x -> x + 2); - AtomicReference instance = new AtomicReference(new Integer(3)); + AtomicReference<Integer> instance = new AtomicReference<>(new Integer(3)); Object expResult = 5; Object result = instance.updateAndGet( op ); assertEquals(result, expResult); @@ -59,7 +57,7 @@ public void testGetAndUpdate() { System.out.println("getAndUpdate"); UnaryOperator<Integer> op = (x -> x + 3); - AtomicReference instance = new AtomicReference(new Integer(3)); + AtomicReference<Integer> instance = new AtomicReference<>(new Integer(3)); Object expResult = 3; Object result = instance.getAndUpdate( op ); assertEquals(result, expResult);
--- a/test-ng/tests/org/openjdk/tests/java/util/functions/UnaryOperatorTest.java Mon Aug 20 17:14:26 2012 -0700 +++ b/test-ng/tests/org/openjdk/tests/java/util/functions/UnaryOperatorTest.java Wed Aug 22 18:22:03 2012 -0700 @@ -25,10 +25,6 @@ package org.openjdk.tests.java.util.functions; import java.util.functions.UnaryOperator; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.testng.Assert.*; @@ -38,6 +34,7 @@ * @since 1.8 */ @Test +@SuppressWarnings({"rawtypes", "unchecked"}) public class UnaryOperatorTest { public UnaryOperatorTest() {
--- a/test-ng/tests/org/openjdk/tests/javac/MethodReferenceTestFDCCE.java Mon Aug 20 17:14:26 2012 -0700 +++ b/test-ng/tests/org/openjdk/tests/javac/MethodReferenceTestFDCCE.java Wed Aug 22 18:22:03 2012 -0700 @@ -32,10 +32,12 @@ import static org.testng.Assert.fail; /** + * Method references and raw types. * @author Robert Field */ @Test +@SuppressWarnings({"rawtypes", "unchecked"}) public class MethodReferenceTestFDCCE { static void assertCCE(Throwable t) {