OpenJDK / jdk9 / jdk9 / jdk
changeset 15087:f3af17da360b
8157522: Performance improvements to CompletableFuture
Reviewed-by: martin, psandoz, rriggs, plevart, dfuchs
author | dl |
---|---|
date | Fri, 15 Jul 2016 13:59:58 -0700 |
parents | fd4819ec5afd |
children | 955eab36f5da |
files | src/java.base/share/classes/java/util/concurrent/CompletableFuture.java test/java/util/concurrent/tck/CompletableFutureTest.java |
diffstat | 2 files changed, 1007 insertions(+), 638 deletions(-) [+] |
line wrap: on
line diff
--- a/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java Fri Jul 15 13:55:51 2016 -0700 +++ b/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java Fri Jul 15 13:59:58 2016 -0700 @@ -35,6 +35,8 @@ package java.util.concurrent; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.util.concurrent.locks.LockSupport; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -149,26 +151,29 @@ * applies across normal vs exceptional outcomes, sync vs async * actions, binary triggers, and various forms of completions. * - * Non-nullness of field result (set via CAS) indicates done. An - * AltResult is used to box null as a result, as well as to hold - * exceptions. Using a single field makes completion simple to - * detect and trigger. Encoding and decoding is straightforward - * but adds to the sprawl of trapping and associating exceptions - * with targets. Minor simplifications rely on (static) NIL (to - * box null results) being the only AltResult with a null - * exception field, so we don't usually need explicit comparisons. - * Even though some of the generics casts are unchecked (see - * SuppressWarnings annotations), they are placed to be - * appropriate even if checked. + * Non-nullness of volatile field "result" indicates done. It may + * be set directly if known to be thread-confined, else via CAS. + * An AltResult is used to box null as a result, as well as to + * hold exceptions. Using a single field makes completion simple + * to detect and trigger. Result encoding and decoding is + * straightforward but tedious and adds to the sprawl of trapping + * and associating exceptions with targets. Minor simplifications + * rely on (static) NIL (to box null results) being the only + * AltResult with a null exception field, so we don't usually need + * explicit comparisons. Even though some of the generics casts + * are unchecked (see SuppressWarnings annotations), they are + * placed to be appropriate even if checked. * * Dependent actions are represented by Completion objects linked * as Treiber stacks headed by field "stack". There are Completion - * classes for each kind of action, grouped into single-input - * (UniCompletion), two-input (BiCompletion), projected - * (BiCompletions using either (not both) of two inputs), shared - * (CoCompletion, used by the second of two sources), zero-input - * source actions, and Signallers that unblock waiters. Class - * Completion extends ForkJoinTask to enable async execution + * classes for each kind of action, grouped into: + * - single-input (UniCompletion), + * - two-input (BiCompletion), + * - projected (BiCompletions using exactly one of two inputs), + * - shared (CoCompletion, used by the second of two sources), + * - zero-input source actions, + * - Signallers that unblock waiters. + * Class Completion extends ForkJoinTask to enable async execution * (adding no space overhead because we exploit its "tag" methods * to maintain claims). It is also declared as Runnable to allow * usage with arbitrary executors. @@ -184,7 +189,7 @@ * encounter layers of adapters in common usages. * * * Boolean CompletableFuture method x(...) (for example - * uniApply) takes all of the arguments needed to check that an + * biApply) takes all of the arguments needed to check that an * action is triggerable, and then either runs the action or * arranges its async execution by executing its Completion * argument, if present. The method returns true if known to be @@ -194,24 +199,29 @@ * method with its held arguments, and on success cleans up. * The mode argument allows tryFire to be called twice (SYNC, * then ASYNC); the first to screen and trap exceptions while - * arranging to execute, and the second when called from a - * task. (A few classes are not used async so take slightly - * different forms.) The claim() callback suppresses function - * invocation if already claimed by another thread. + * arranging to execute, and the second when called from a task. + * (A few classes are not used async so take slightly different + * forms.) The claim() callback suppresses function invocation + * if already claimed by another thread. + * + * * Some classes (for example UniApply) have separate handling + * code for when known to be thread-confined ("now" methods) and + * for when shared (in tryFire), for efficiency. * * * CompletableFuture method xStage(...) is called from a public - * stage method of CompletableFuture x. It screens user + * stage method of CompletableFuture f. It screens user * arguments and invokes and/or creates the stage object. If - * not async and x is already complete, the action is run - * immediately. Otherwise a Completion c is created, pushed to - * x's stack (unless done), and started or triggered via - * c.tryFire. This also covers races possible if x completes - * while pushing. Classes with two inputs (for example BiApply) - * deal with races across both while pushing actions. The - * second completion is a CoCompletion pointing to the first, - * shared so that at most one performs the action. The - * multiple-arity methods allOf and anyOf do this pairwise to - * form trees of completions. + * not async and already triggerable, the action is run + * immediately. Otherwise a Completion c is created, and + * submitted to the executor if triggerable, or pushed onto f's + * stack if not. Completion actions are started via c.tryFire. + * We recheck after pushing to a source future's stack to cover + * possible races if the source completes while pushing. + * Classes with two inputs (for example BiApply) deal with races + * across both while pushing actions. The second completion is + * a CoCompletion pointing to the first, shared so that at most + * one performs the action. The multiple-arity methods allOf + * and anyOf do this pairwise to form trees of completions. * * Note that the generic type parameters of methods vary according * to whether "this" is a source, dependent, or completion. @@ -236,29 +246,30 @@ * pointing back to its sources. So we null out fields as soon as * possible. The screening checks needed anyway harmlessly ignore * null arguments that may have been obtained during races with - * threads nulling out fields. We also try to unlink fired - * Completions from stacks that might never be popped (see method - * postFire). Completion fields need not be declared as final or - * volatile because they are only visible to other threads upon - * safe publication. + * threads nulling out fields. We also try to unlink non-isLive + * (fired or cancelled) Completions from stacks that might + * otherwise never be popped: Method cleanStack always unlinks non + * isLive completions from the head of stack; others may + * occasionally remain if racing with other cancellations or + * removals. + * + * Completion fields need not be declared as final or volatile + * because they are only visible to other threads upon safe + * publication. */ volatile Object result; // Either the result or boxed AltResult volatile Completion stack; // Top of Treiber stack of dependent actions final boolean internalComplete(Object r) { // CAS from null to r - return U.compareAndSwapObject(this, RESULT, null, r); - } - - final boolean casStack(Completion cmp, Completion val) { - return U.compareAndSwapObject(this, STACK, cmp, val); + return RESULT.compareAndSet(this, null, r); } /** Returns true if successfully pushed c onto stack. */ final boolean tryPushStack(Completion c) { Completion h = stack; - lazySetNext(c, h); - return U.compareAndSwapObject(this, STACK, h, c); + NEXT.set(c, h); // CAS piggyback + return STACK.compareAndSet(this, h, c); } /** Unconditionally pushes c onto stack, retrying if necessary. */ @@ -278,8 +289,7 @@ /** Completes with the null value, unless already completed. */ final boolean completeNull() { - return U.compareAndSwapObject(this, RESULT, null, - NIL); + return RESULT.compareAndSet(this, null, NIL); } /** Returns the encoding of the given non-exceptional value. */ @@ -289,8 +299,7 @@ /** Completes with a non-exceptional result, unless already completed. */ final boolean completeValue(T t) { - return U.compareAndSwapObject(this, RESULT, null, - (t == null) ? NIL : t); + return RESULT.compareAndSet(this, null, (t == null) ? NIL : t); } /** @@ -304,8 +313,7 @@ /** Completes with an exceptional result, unless already completed. */ final boolean completeThrowable(Throwable x) { - return U.compareAndSwapObject(this, RESULT, null, - encodeThrowable(x)); + return RESULT.compareAndSet(this, null, encodeThrowable(x)); } /** @@ -332,8 +340,7 @@ * existing CompletionException. */ final boolean completeThrowable(Throwable x, Object r) { - return U.compareAndSwapObject(this, RESULT, null, - encodeThrowable(x, r)); + return RESULT.compareAndSet(this, null, encodeThrowable(x, r)); } /** @@ -351,10 +358,11 @@ */ static Object encodeRelay(Object r) { Throwable x; - return (((r instanceof AltResult) && - (x = ((AltResult)r).ex) != null && - !(x instanceof CompletionException)) ? - new AltResult(new CompletionException(x)) : r); + if (r instanceof AltResult + && (x = ((AltResult)r).ex) != null + && !(x instanceof CompletionException)) + r = new AltResult(new CompletionException(x)); + return r; } /** @@ -362,14 +370,13 @@ * If exceptional, r is first coerced to a CompletionException. */ final boolean completeRelay(Object r) { - return U.compareAndSwapObject(this, RESULT, null, - encodeRelay(r)); + return RESULT.compareAndSet(this, null, encodeRelay(r)); } /** * Reports result using Future.get conventions. */ - private static <T> T reportGet(Object r) + private static Object reportGet(Object r) throws InterruptedException, ExecutionException { if (r == null) // by convention below, null means interrupted throw new InterruptedException(); @@ -384,14 +391,13 @@ x = cause; throw new ExecutionException(x); } - @SuppressWarnings("unchecked") T t = (T) r; - return t; + return r; } /** * Decodes outcome to return result or throw unchecked exception. */ - private static <T> T reportJoin(Object r) { + private static Object reportJoin(Object r) { if (r instanceof AltResult) { Throwable x; if ((x = ((AltResult)r).ex) == null) @@ -402,8 +408,7 @@ throw (CompletionException)x; throw new CompletionException(x); } - @SuppressWarnings("unchecked") T t = (T) r; - return t; + return r; } /* ------------- Async task preliminaries -------------- */ @@ -449,12 +454,6 @@ static final int ASYNC = 1; static final int NESTED = -1; - /** - * Spins before blocking in waitingGet - */ - static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ? - 1 << 8 : 0); - /* ------------- Base Completion classes and operations -------------- */ @SuppressWarnings("serial") @@ -479,10 +478,6 @@ public final void setRawResult(Void v) {} } - static void lazySetNext(Completion c, Completion next) { - U.putObjectRelease(c, NEXT, next); - } - /** * Pops and tries to trigger all reachable dependents. Call only * when known to be done. @@ -497,40 +492,47 @@ while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { CompletableFuture<?> d; Completion t; - if (f.casStack(h, t = h.next)) { + if (STACK.compareAndSet(f, h, t = h.next)) { if (t != null) { if (f != this) { pushStack(h); continue; } - h.next = null; // detach + NEXT.compareAndSet(h, t, null); // try to detach } f = (d = h.tryFire(NESTED)) == null ? this : d; } } } - /** Traverses stack and unlinks dead Completions. */ + /** Traverses stack and unlinks one or more dead Completions, if found. */ final void cleanStack() { - for (Completion p = null, q = stack; q != null;) { + Completion p = stack; + // ensure head of stack live + for (boolean unlinked = false;;) { + if (p == null) + return; + else if (p.isLive()) { + if (unlinked) + return; + else + break; + } + else if (STACK.weakCompareAndSetVolatile(this, p, (p = p.next))) + unlinked = true; + else + p = stack; + } + // try to unlink first non-live + for (Completion q = p.next; q != null;) { Completion s = q.next; if (q.isLive()) { p = q; q = s; - } - else if (p == null) { - casStack(q, s); - q = stack; - } - else { - p.next = s; - if (p.isLive()) - q = s; - else { - p = null; // restart - q = stack; - } - } + } else if (NEXT.weakCompareAndSetVolatile(p, q, s)) + break; + else + q = p.next; } } @@ -568,11 +570,20 @@ final boolean isLive() { return dep != null; } } - /** Pushes the given completion (if it exists) unless done. */ - final void push(UniCompletion<?,?> c) { + /** + * Pushes the given completion unless it completes while trying. + * Caller should first check that result is null. + */ + final void unipush(Completion c) { if (c != null) { - while (result == null && !tryPushStack(c)) - lazySetNext(c, null); // clear on failure + while (!tryPushStack(c)) { + if (result != null) { + NEXT.set(c, null); + break; + } + } + if (result != null) + c.tryFire(SYNC); } } @@ -583,9 +594,10 @@ */ final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) { if (a != null && a.stack != null) { - if (a.result == null) + Object r; + if ((r = a.result) == null) a.cleanStack(); - else if (mode >= 0) + if (mode >= 0 && (r != null || a.result != null)) a.postComplete(); } if (result != null && stack != null) { @@ -607,48 +619,65 @@ } final CompletableFuture<V> tryFire(int mode) { CompletableFuture<V> d; CompletableFuture<T> a; - if ((d = dep) == null || - !d.uniApply(a = src, fn, mode > 0 ? null : this)) + Object r; Throwable x; Function<? super T,? extends V> f; + if ((d = dep) == null || (f = fn) == null + || (a = src) == null || (r = a.result) == null) return null; + tryComplete: if (d.result == null) { + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + d.completeThrowable(x, r); + break tryComplete; + } + r = null; + } + try { + if (mode <= 0 && !claim()) + return null; + else { + @SuppressWarnings("unchecked") T t = (T) r; + d.completeValue(f.apply(t)); + } + } catch (Throwable ex) { + d.completeThrowable(ex); + } + } dep = null; src = null; fn = null; return d.postFire(a, mode); } } - final <S> boolean uniApply(CompletableFuture<S> a, - Function<? super S,? extends T> f, - UniApply<S,T> c) { - Object r; Throwable x; - if (a == null || (r = a.result) == null || f == null) - return false; - tryComplete: if (result == null) { - if (r instanceof AltResult) { - if ((x = ((AltResult)r).ex) != null) { - completeThrowable(x, r); - break tryComplete; - } - r = null; - } - try { - if (c != null && !c.claim()) - return false; - @SuppressWarnings("unchecked") S s = (S) r; - completeValue(f.apply(s)); - } catch (Throwable ex) { - completeThrowable(ex); - } - } - return true; - } - private <V> CompletableFuture<V> uniApplyStage( Executor e, Function<? super T,? extends V> f) { if (f == null) throw new NullPointerException(); + Object r; + if ((r = result) != null) + return uniApplyNow(r, e, f); CompletableFuture<V> d = newIncompleteFuture(); - if (e != null || !d.uniApply(this, f, null)) { - UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); - push(c); - c.tryFire(SYNC); + unipush(new UniApply<T,V>(e, d, this, f)); + return d; + } + + private <V> CompletableFuture<V> uniApplyNow( + Object r, Executor e, Function<? super T,? extends V> f) { + Throwable x; + CompletableFuture<V> d = newIncompleteFuture(); + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + d.result = encodeThrowable(x, r); + return d; + } + r = null; + } + try { + if (e != null) { + e.execute(new UniApply<T,V>(null, d, this, f)); + } else { + @SuppressWarnings("unchecked") T t = (T) r; + d.result = d.encodeValue(f.apply(t)); + } + } catch (Throwable ex) { + d.result = encodeThrowable(ex); } return d; } @@ -662,48 +691,67 @@ } final CompletableFuture<Void> tryFire(int mode) { CompletableFuture<Void> d; CompletableFuture<T> a; - if ((d = dep) == null || - !d.uniAccept(a = src, fn, mode > 0 ? null : this)) + Object r; Throwable x; Consumer<? super T> f; + if ((d = dep) == null || (f = fn) == null + || (a = src) == null || (r = a.result) == null) return null; + tryComplete: if (d.result == null) { + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + d.completeThrowable(x, r); + break tryComplete; + } + r = null; + } + try { + if (mode <= 0 && !claim()) + return null; + else { + @SuppressWarnings("unchecked") T t = (T) r; + f.accept(t); + d.completeNull(); + } + } catch (Throwable ex) { + d.completeThrowable(ex); + } + } dep = null; src = null; fn = null; return d.postFire(a, mode); } } - final <S> boolean uniAccept(CompletableFuture<S> a, - Consumer<? super S> f, UniAccept<S> c) { - Object r; Throwable x; - if (a == null || (r = a.result) == null || f == null) - return false; - tryComplete: if (result == null) { - if (r instanceof AltResult) { - if ((x = ((AltResult)r).ex) != null) { - completeThrowable(x, r); - break tryComplete; - } - r = null; - } - try { - if (c != null && !c.claim()) - return false; - @SuppressWarnings("unchecked") S s = (S) r; - f.accept(s); - completeNull(); - } catch (Throwable ex) { - completeThrowable(ex); - } - } - return true; - } - private CompletableFuture<Void> uniAcceptStage(Executor e, Consumer<? super T> f) { if (f == null) throw new NullPointerException(); + Object r; + if ((r = result) != null) + return uniAcceptNow(r, e, f); CompletableFuture<Void> d = newIncompleteFuture(); - if (e != null || !d.uniAccept(this, f, null)) { - UniAccept<T> c = new UniAccept<T>(e, d, this, f); - push(c); - c.tryFire(SYNC); + unipush(new UniAccept<T>(e, d, this, f)); + return d; + } + + private CompletableFuture<Void> uniAcceptNow( + Object r, Executor e, Consumer<? super T> f) { + Throwable x; + CompletableFuture<Void> d = newIncompleteFuture(); + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + d.result = encodeThrowable(x, r); + return d; + } + r = null; + } + try { + if (e != null) { + e.execute(new UniAccept<T>(null, d, this, f)); + } else { + @SuppressWarnings("unchecked") T t = (T) r; + f.accept(t); + d.result = NIL; + } + } catch (Throwable ex) { + d.result = encodeThrowable(ex); } return d; } @@ -717,42 +765,56 @@ } final CompletableFuture<Void> tryFire(int mode) { CompletableFuture<Void> d; CompletableFuture<T> a; - if ((d = dep) == null || - !d.uniRun(a = src, fn, mode > 0 ? null : this)) + Object r; Throwable x; Runnable f; + if ((d = dep) == null || (f = fn) == null + || (a = src) == null || (r = a.result) == null) return null; + if (d.result == null) { + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) + d.completeThrowable(x, r); + else + try { + if (mode <= 0 && !claim()) + return null; + else { + f.run(); + d.completeNull(); + } + } catch (Throwable ex) { + d.completeThrowable(ex); + } + } dep = null; src = null; fn = null; return d.postFire(a, mode); } } - final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) { - Object r; Throwable x; - if (a == null || (r = a.result) == null || f == null) - return false; - if (result == null) { - if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) - completeThrowable(x, r); - else - try { - if (c != null && !c.claim()) - return false; - f.run(); - completeNull(); - } catch (Throwable ex) { - completeThrowable(ex); - } - } - return true; + private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) { + if (f == null) throw new NullPointerException(); + Object r; + if ((r = result) != null) + return uniRunNow(r, e, f); + CompletableFuture<Void> d = newIncompleteFuture(); + unipush(new UniRun<T>(e, d, this, f)); + return d; } - private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) { - if (f == null) throw new NullPointerException(); + private CompletableFuture<Void> uniRunNow(Object r, Executor e, Runnable f) { + Throwable x; CompletableFuture<Void> d = newIncompleteFuture(); - if (e != null || !d.uniRun(this, f, null)) { - UniRun<T> c = new UniRun<T>(e, d, this, f); - push(c); - c.tryFire(SYNC); - } + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) + d.result = encodeThrowable(x, r); + else + try { + if (e != null) { + e.execute(new UniRun<T>(null, d, this, f)); + } else { + f.run(); + d.result = NIL; + } + } catch (Throwable ex) { + d.result = encodeThrowable(ex); + } return d; } @@ -766,20 +828,20 @@ } final CompletableFuture<T> tryFire(int mode) { CompletableFuture<T> d; CompletableFuture<T> a; - if ((d = dep) == null || - !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this)) + Object r; BiConsumer<? super T, ? super Throwable> f; + if ((d = dep) == null || (f = fn) == null + || (a = src) == null || (r = a.result) == null + || !d.uniWhenComplete(r, f, mode > 0 ? null : this)) return null; dep = null; src = null; fn = null; return d.postFire(a, mode); } } - final boolean uniWhenComplete(CompletableFuture<T> a, + final boolean uniWhenComplete(Object r, BiConsumer<? super T,? super Throwable> f, UniWhenComplete<T> c) { - Object r; T t; Throwable x = null; - if (a == null || (r = a.result) == null || f == null) - return false; + T t; Throwable x = null; if (result == null) { try { if (c != null && !c.claim()) @@ -811,10 +873,17 @@ Executor e, BiConsumer<? super T, ? super Throwable> f) { if (f == null) throw new NullPointerException(); CompletableFuture<T> d = newIncompleteFuture(); - if (e != null || !d.uniWhenComplete(this, f, null)) { - UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); - push(c); - c.tryFire(SYNC); + Object r; + if ((r = result) == null) + unipush(new UniWhenComplete<T>(e, d, this, f)); + else if (e == null) + d.uniWhenComplete(r, f, null); + else { + try { + e.execute(new UniWhenComplete<T>(null, d, this, f)); + } catch (Throwable ex) { + d.result = encodeThrowable(ex); + } } return d; } @@ -829,20 +898,20 @@ } final CompletableFuture<V> tryFire(int mode) { CompletableFuture<V> d; CompletableFuture<T> a; - if ((d = dep) == null || - !d.uniHandle(a = src, fn, mode > 0 ? null : this)) + Object r; BiFunction<? super T, Throwable, ? extends V> f; + if ((d = dep) == null || (f = fn) == null + || (a = src) == null || (r = a.result) == null + || !d.uniHandle(r, f, mode > 0 ? null : this)) return null; dep = null; src = null; fn = null; return d.postFire(a, mode); } } - final <S> boolean uniHandle(CompletableFuture<S> a, + final <S> boolean uniHandle(Object r, BiFunction<? super S, Throwable, ? extends T> f, UniHandle<S,T> c) { - Object r; S s; Throwable x; - if (a == null || (r = a.result) == null || f == null) - return false; + S s; Throwable x; if (result == null) { try { if (c != null && !c.claim()) @@ -867,10 +936,17 @@ Executor e, BiFunction<? super T, Throwable, ? extends V> f) { if (f == null) throw new NullPointerException(); CompletableFuture<V> d = newIncompleteFuture(); - if (e != null || !d.uniHandle(this, f, null)) { - UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f); - push(c); - c.tryFire(SYNC); + Object r; + if ((r = result) == null) + unipush(new UniHandle<T,V>(e, d, this, f)); + else if (e == null) + d.uniHandle(r, f, null); + else { + try { + e.execute(new UniHandle<T,V>(null, d, this, f)); + } catch (Throwable ex) { + d.result = encodeThrowable(ex); + } } return d; } @@ -885,19 +961,20 @@ final CompletableFuture<T> tryFire(int mode) { // never ASYNC // assert mode != ASYNC; CompletableFuture<T> d; CompletableFuture<T> a; - if ((d = dep) == null || !d.uniExceptionally(a = src, fn, this)) + Object r; Function<? super Throwable, ? extends T> f; + if ((d = dep) == null || (f = fn) == null + || (a = src) == null || (r = a.result) == null + || !d.uniExceptionally(r, f, this)) return null; dep = null; src = null; fn = null; return d.postFire(a, mode); } } - final boolean uniExceptionally(CompletableFuture<T> a, + final boolean uniExceptionally(Object r, Function<? super Throwable, ? extends T> f, UniExceptionally<T> c) { - Object r; Throwable x; - if (a == null || (r = a.result) == null || f == null) - return false; + Throwable x; if (result == null) { try { if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) { @@ -917,47 +994,38 @@ Function<Throwable, ? extends T> f) { if (f == null) throw new NullPointerException(); CompletableFuture<T> d = newIncompleteFuture(); - if (!d.uniExceptionally(this, f, null)) { - UniExceptionally<T> c = new UniExceptionally<T>(d, this, f); - push(c); - c.tryFire(SYNC); - } + Object r; + if ((r = result) == null) + unipush(new UniExceptionally<T>(d, this, f)); + else + d.uniExceptionally(r, f, null); return d; } @SuppressWarnings("serial") - static final class UniRelay<T> extends UniCompletion<T,T> { // for Compose + static final class UniRelay<T> extends UniCompletion<T,T> { UniRelay(CompletableFuture<T> dep, CompletableFuture<T> src) { super(null, dep, src); } final CompletableFuture<T> tryFire(int mode) { - CompletableFuture<T> d; CompletableFuture<T> a; - if ((d = dep) == null || !d.uniRelay(a = src)) + CompletableFuture<T> d; CompletableFuture<T> a; Object r; + if ((d = dep) == null + || (a = src) == null || (r = a.result) == null) return null; + if (d.result == null) + d.completeRelay(r); src = null; dep = null; return d.postFire(a, mode); } } - final boolean uniRelay(CompletableFuture<T> a) { - Object r; - if (a == null || (r = a.result) == null) - return false; - if (result == null) // no need to claim - completeRelay(r); - return true; - } - private CompletableFuture<T> uniCopyStage() { Object r; CompletableFuture<T> d = newIncompleteFuture(); if ((r = result) != null) - d.completeRelay(r); - else { - UniRelay<T> c = new UniRelay<T>(d, this); - push(c); - c.tryFire(SYNC); - } + d.result = encodeRelay(r); + else + unipush(new UniRelay<T>(d, this)); return d; } @@ -966,9 +1034,7 @@ if ((r = result) != null) return new MinimalStage<T>(encodeRelay(r)); MinimalStage<T> d = new MinimalStage<T>(); - UniRelay<T> c = new UniRelay<T>(d, this); - push(c); - c.tryFire(SYNC); + unipush(new UniRelay<T>(d, this)); return d; } @@ -982,54 +1048,48 @@ } final CompletableFuture<V> tryFire(int mode) { CompletableFuture<V> d; CompletableFuture<T> a; - if ((d = dep) == null || - !d.uniCompose(a = src, fn, mode > 0 ? null : this)) + Function<? super T, ? extends CompletionStage<V>> f; + Object r; Throwable x; + if ((d = dep) == null || (f = fn) == null + || (a = src) == null || (r = a.result) == null) return null; + tryComplete: if (d.result == null) { + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + d.completeThrowable(x, r); + break tryComplete; + } + r = null; + } + try { + if (mode <= 0 && !claim()) + return null; + @SuppressWarnings("unchecked") T t = (T) r; + CompletableFuture<V> g = f.apply(t).toCompletableFuture(); + if ((r = g.result) != null) + d.completeRelay(r); + else { + g.unipush(new UniRelay<V>(d, g)); + if (d.result == null) + return null; + } + } catch (Throwable ex) { + d.completeThrowable(ex); + } + } dep = null; src = null; fn = null; return d.postFire(a, mode); } } - final <S> boolean uniCompose( - CompletableFuture<S> a, - Function<? super S, ? extends CompletionStage<T>> f, - UniCompose<S,T> c) { - Object r; Throwable x; - if (a == null || (r = a.result) == null || f == null) - return false; - tryComplete: if (result == null) { - if (r instanceof AltResult) { - if ((x = ((AltResult)r).ex) != null) { - completeThrowable(x, r); - break tryComplete; - } - r = null; - } - try { - if (c != null && !c.claim()) - return false; - @SuppressWarnings("unchecked") S s = (S) r; - CompletableFuture<T> g = f.apply(s).toCompletableFuture(); - if (g.result == null || !uniRelay(g)) { - UniRelay<T> copy = new UniRelay<T>(this, g); - g.push(copy); - copy.tryFire(SYNC); - if (result == null) - return false; - } - } catch (Throwable ex) { - completeThrowable(ex); - } - } - return true; - } - private <V> CompletableFuture<V> uniComposeStage( Executor e, Function<? super T, ? extends CompletionStage<V>> f) { if (f == null) throw new NullPointerException(); + CompletableFuture<V> d = newIncompleteFuture(); Object r, s; Throwable x; - CompletableFuture<V> d = newIncompleteFuture(); - if (e == null && (r = result) != null) { + if ((r = result) == null) + unipush(new UniCompose<T,V>(e, d, this, f)); + else if (e == null) { if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { d.result = encodeThrowable(x, r); @@ -1041,21 +1101,20 @@ @SuppressWarnings("unchecked") T t = (T) r; CompletableFuture<V> g = f.apply(t).toCompletableFuture(); if ((s = g.result) != null) - d.completeRelay(s); + d.result = encodeRelay(s); else { - UniRelay<V> c = new UniRelay<V>(d, g); - g.push(c); - c.tryFire(SYNC); + g.unipush(new UniRelay<V>(d, g)); } - return d; } catch (Throwable ex) { d.result = encodeThrowable(ex); - return d; } } - UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f); - push(c); - c.tryFire(SYNC); + else + try { + e.execute(new UniCompose<T,V>(null, d, this, f)); + } catch (Throwable ex) { + d.result = encodeThrowable(ex); + } return d; } @@ -1085,21 +1144,28 @@ } final boolean isLive() { BiCompletion<?,?,?> c; - return (c = base) != null && c.dep != null; + return (c = base) != null + // && c.isLive() + && c.dep != null; } } - /** Pushes completion to this and b unless both done. */ + /** + * Pushes completion to this and b unless both done. + * Caller should first check that either result or b.result is null. + */ final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { if (c != null) { - Object r; - while ((r = result) == null && !tryPushStack(c)) - lazySetNext(c, null); // clear on failure - if (b != null && b != this && b.result == null) { - Completion q = (r != null) ? c : new CoCompletion(c); - while (b.result == null && !b.tryPushStack(q)) - lazySetNext(q, null); // clear on failure + while (result == null) { + if (tryPushStack(c)) { + if (b.result == null) + b.unipush(new CoCompletion(c)); + else if (result != null) + c.tryFire(SYNC); + return; + } } + b.unipush(c); } } @@ -1107,9 +1173,10 @@ final CompletableFuture<T> postFire(CompletableFuture<?> a, CompletableFuture<?> b, int mode) { if (b != null && b.stack != null) { // clean second source - if (b.result == null) + Object r; + if ((r = b.result) == null) b.cleanStack(); - else if (mode >= 0) + if (mode >= 0 && (r != null || b.result != null)) b.postComplete(); } return postFire(a, mode); @@ -1127,22 +1194,21 @@ CompletableFuture<V> d; CompletableFuture<T> a; CompletableFuture<U> b; - if ((d = dep) == null || - !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this)) + Object r, s; BiFunction<? super T,? super U,? extends V> f; + if ((d = dep) == null || (f = fn) == null + || (a = src) == null || (r = a.result) == null + || (b = snd) == null || (s = b.result) == null + || !d.biApply(r, s, f, mode > 0 ? null : this)) return null; dep = null; src = null; snd = null; fn = null; return d.postFire(a, b, mode); } } - final <R,S> boolean biApply(CompletableFuture<R> a, - CompletableFuture<S> b, + final <R,S> boolean biApply(Object r, Object s, BiFunction<? super R,? super S,? extends T> f, BiApply<R,S,T> c) { - Object r, s; Throwable x; - if (a == null || (r = a.result) == null || - b == null || (s = b.result) == null || f == null) - return false; + Throwable x; tryComplete: if (result == null) { if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { @@ -1174,15 +1240,20 @@ private <U,V> CompletableFuture<V> biApplyStage( Executor e, CompletionStage<U> o, BiFunction<? super T,? super U,? extends V> f) { - CompletableFuture<U> b; + CompletableFuture<U> b; Object r, s; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); CompletableFuture<V> d = newIncompleteFuture(); - if (e != null || !d.biApply(this, b, f, null)) { - BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f); - bipush(b, c); - c.tryFire(SYNC); - } + if ((r = result) == null || (s = b.result) == null) + bipush(b, new BiApply<T,U,V>(e, d, this, b, f)); + else if (e == null) + d.biApply(r, s, f, null); + else + try { + e.execute(new BiApply<T,U,V>(null, d, this, b, f)); + } catch (Throwable ex) { + d.result = encodeThrowable(ex); + } return d; } @@ -1198,22 +1269,21 @@ CompletableFuture<Void> d; CompletableFuture<T> a; CompletableFuture<U> b; - if ((d = dep) == null || - !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this)) + Object r, s; BiConsumer<? super T,? super U> f; + if ((d = dep) == null || (f = fn) == null + || (a = src) == null || (r = a.result) == null + || (b = snd) == null || (s = b.result) == null + || !d.biAccept(r, s, f, mode > 0 ? null : this)) return null; dep = null; src = null; snd = null; fn = null; return d.postFire(a, b, mode); } } - final <R,S> boolean biAccept(CompletableFuture<R> a, - CompletableFuture<S> b, + final <R,S> boolean biAccept(Object r, Object s, BiConsumer<? super R,? super S> f, BiAccept<R,S> c) { - Object r, s; Throwable x; - if (a == null || (r = a.result) == null || - b == null || (s = b.result) == null || f == null) - return false; + Throwable x; tryComplete: if (result == null) { if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { @@ -1246,15 +1316,20 @@ private <U> CompletableFuture<Void> biAcceptStage( Executor e, CompletionStage<U> o, BiConsumer<? super T,? super U> f) { - CompletableFuture<U> b; + CompletableFuture<U> b; Object r, s; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); CompletableFuture<Void> d = newIncompleteFuture(); - if (e != null || !d.biAccept(this, b, f, null)) { - BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f); - bipush(b, c); - c.tryFire(SYNC); - } + if ((r = result) == null || (s = b.result) == null) + bipush(b, new BiAccept<T,U>(e, d, this, b, f)); + else if (e == null) + d.biAccept(r, s, f, null); + else + try { + e.execute(new BiAccept<T,U>(null, d, this, b, f)); + } catch (Throwable ex) { + d.result = encodeThrowable(ex); + } return d; } @@ -1262,8 +1337,7 @@ static final class BiRun<T,U> extends BiCompletion<T,U,Void> { Runnable fn; BiRun(Executor executor, CompletableFuture<Void> dep, - CompletableFuture<T> src, - CompletableFuture<U> snd, + CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn) { super(executor, dep, src, snd); this.fn = fn; } @@ -1271,25 +1345,25 @@ CompletableFuture<Void> d; CompletableFuture<T> a; CompletableFuture<U> b; - if ((d = dep) == null || - !d.biRun(a = src, b = snd, fn, mode > 0 ? null : this)) + Object r, s; Runnable f; + if ((d = dep) == null || (f = fn) == null + || (a = src) == null || (r = a.result) == null + || (b = snd) == null || (s = b.result) == null + || !d.biRun(r, s, f, mode > 0 ? null : this)) return null; dep = null; src = null; snd = null; fn = null; return d.postFire(a, b, mode); } } - final boolean biRun(CompletableFuture<?> a, CompletableFuture<?> b, - Runnable f, BiRun<?,?> c) { - Object r, s; Throwable x; - if (a == null || (r = a.result) == null || - b == null || (s = b.result) == null || f == null) - return false; + final boolean biRun(Object r, Object s, Runnable f, BiRun<?,?> c) { + Throwable x; Object z; if (result == null) { - if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) - completeThrowable(x, r); - else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) - completeThrowable(x, s); + if ((r instanceof AltResult + && (x = ((AltResult)(z = r)).ex) != null) || + (s instanceof AltResult + && (x = ((AltResult)(z = s)).ex) != null)) + completeThrowable(x, z); else try { if (c != null && !c.claim()) @@ -1305,52 +1379,52 @@ private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o, Runnable f) { - CompletableFuture<?> b; + CompletableFuture<?> b; Object r, s; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); CompletableFuture<Void> d = newIncompleteFuture(); - if (e != null || !d.biRun(this, b, f, null)) { - BiRun<T,?> c = new BiRun<>(e, d, this, b, f); - bipush(b, c); - c.tryFire(SYNC); - } + if ((r = result) == null || (s = b.result) == null) + bipush(b, new BiRun<>(e, d, this, b, f)); + else if (e == null) + d.biRun(r, s, f, null); + else + try { + e.execute(new BiRun<>(null, d, this, b, f)); + } catch (Throwable ex) { + d.result = encodeThrowable(ex); + } return d; } @SuppressWarnings("serial") static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And BiRelay(CompletableFuture<Void> dep, - CompletableFuture<T> src, - CompletableFuture<U> snd) { + CompletableFuture<T> src, CompletableFuture<U> snd) { super(null, dep, src, snd); } final CompletableFuture<Void> tryFire(int mode) { CompletableFuture<Void> d; CompletableFuture<T> a; CompletableFuture<U> b; - if ((d = dep) == null || !d.biRelay(a = src, b = snd)) + Object r, s, z; Throwable x; + if ((d = dep) == null + || (a = src) == null || (r = a.result) == null + || (b = snd) == null || (s = b.result) == null) return null; + if (d.result == null) { + if ((r instanceof AltResult + && (x = ((AltResult)(z = r)).ex) != null) || + (s instanceof AltResult + && (x = ((AltResult)(z = s)).ex) != null)) + d.completeThrowable(x, z); + else + d.completeNull(); + } src = null; snd = null; dep = null; return d.postFire(a, b, mode); } } - boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) { - Object r, s; Throwable x; - if (a == null || (r = a.result) == null || - b == null || (s = b.result) == null) - return false; - if (result == null) { - if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) - completeThrowable(x, r); - else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) - completeThrowable(x, s); - else - completeNull(); - } - return true; - } - /** Recursively constructs a tree of completions. */ static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, int lo, int hi) { @@ -1358,39 +1432,44 @@ if (lo > hi) // empty d.result = NIL; else { - CompletableFuture<?> a, b; + CompletableFuture<?> a, b; Object r, s, z; Throwable x; int mid = (lo + hi) >>> 1; if ((a = (lo == mid ? cfs[lo] : andTree(cfs, lo, mid))) == null || (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : andTree(cfs, mid+1, hi))) == null) throw new NullPointerException(); - if (!d.biRelay(a, b)) { - BiRelay<?,?> c = new BiRelay<>(d, a, b); - a.bipush(b, c); - c.tryFire(SYNC); - } + if ((r = a.result) == null || (s = b.result) == null) + a.bipush(b, new BiRelay<>(d, a, b)); + else if ((r instanceof AltResult + && (x = ((AltResult)(z = r)).ex) != null) || + (s instanceof AltResult + && (x = ((AltResult)(z = s)).ex) != null)) + d.result = encodeThrowable(x, z); + else + d.result = NIL; } return d; } /* ------------- Projected (Ored) BiCompletions -------------- */ - /** Pushes completion to this and b unless either done. */ + /** + * Pushes completion to this and b unless either done. + * Caller should first check that result and b.result are both null. + */ final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { if (c != null) { - while ((b == null || b.result == null) && result == null) { - if (tryPushStack(c)) { - if (b != null && b != this && b.result == null) { - Completion q = new CoCompletion(c); - while (result == null && b.result == null && - !b.tryPushStack(q)) - lazySetNext(q, null); // clear on failure - } + while (!tryPushStack(c)) { + if (result != null) { + NEXT.set(c, null); break; } - lazySetNext(c, null); // clear on failure } + if (result != null) + c.tryFire(SYNC); + else + b.unipush(new CoCompletion(c)); } } @@ -1398,8 +1477,7 @@ static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> { Function<? super T,? extends V> fn; OrApply(Executor executor, CompletableFuture<V> dep, - CompletableFuture<T> src, - CompletableFuture<U> snd, + CompletableFuture<T> src, CompletableFuture<U> snd, Function<? super T,? extends V> fn) { super(executor, dep, src, snd); this.fn = fn; } @@ -1407,54 +1485,46 @@ CompletableFuture<V> d; CompletableFuture<T> a; CompletableFuture<U> b; - if ((d = dep) == null || - !d.orApply(a = src, b = snd, fn, mode > 0 ? null : this)) + Object r; Throwable x; Function<? super T,? extends V> f; + if ((d = dep) == null || (f = fn) == null + || (a = src) == null || (b = snd) == null + || ((r = a.result) == null && (r = b.result) == null)) return null; + tryComplete: if (d.result == null) { + try { + if (mode <= 0 && !claim()) + return null; + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + d.completeThrowable(x, r); + break tryComplete; + } + r = null; + } + @SuppressWarnings("unchecked") T t = (T) r; + d.completeValue(f.apply(t)); + } catch (Throwable ex) { + d.completeThrowable(ex); + } + } dep = null; src = null; snd = null; fn = null; return d.postFire(a, b, mode); } } - final <R,S extends R> boolean orApply(CompletableFuture<R> a, - CompletableFuture<S> b, - Function<? super R, ? extends T> f, - OrApply<R,S,T> c) { - Object r; Throwable x; - if (a == null || b == null || - ((r = a.result) == null && (r = b.result) == null) || f == null) - return false; - tryComplete: if (result == null) { - try { - if (c != null && !c.claim()) - return false; - if (r instanceof AltResult) { - if ((x = ((AltResult)r).ex) != null) { - completeThrowable(x, r); - break tryComplete; - } - r = null; - } - @SuppressWarnings("unchecked") R rr = (R) r; - completeValue(f.apply(rr)); - } catch (Throwable ex) { - completeThrowable(ex); - } - } - return true; - } - private <U extends T,V> CompletableFuture<V> orApplyStage( - Executor e, CompletionStage<U> o, - Function<? super T, ? extends V> f) { + Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f) { CompletableFuture<U> b; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); + + Object r; CompletableFuture<? extends T> z; + if ((r = (z = this).result) != null || + (r = (z = b).result) != null) + return z.uniApplyNow(r, e, f); + CompletableFuture<V> d = newIncompleteFuture(); - if (e != null || !d.orApply(this, b, f, null)) { - OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f); - orpush(b, c); - c.tryFire(SYNC); - } + orpush(b, new OrApply<T,U,V>(e, d, this, b, f)); return d; } @@ -1462,8 +1532,7 @@ static final class OrAccept<T,U extends T> extends BiCompletion<T,U,Void> { Consumer<? super T> fn; OrAccept(Executor executor, CompletableFuture<Void> dep, - CompletableFuture<T> src, - CompletableFuture<U> snd, + CompletableFuture<T> src, CompletableFuture<U> snd, Consumer<? super T> fn) { super(executor, dep, src, snd); this.fn = fn; } @@ -1471,54 +1540,47 @@ CompletableFuture<Void> d; CompletableFuture<T> a; CompletableFuture<U> b; - if ((d = dep) == null || - !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this)) + Object r; Throwable x; Consumer<? super T> f; + if ((d = dep) == null || (f = fn) == null + || (a = src) == null || (b = snd) == null + || ((r = a.result) == null && (r = b.result) == null)) return null; + tryComplete: if (d.result == null) { + try { + if (mode <= 0 && !claim()) + return null; + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + d.completeThrowable(x, r); + break tryComplete; + } + r = null; + } + @SuppressWarnings("unchecked") T t = (T) r; + f.accept(t); + d.completeNull(); + } catch (Throwable ex) { + d.completeThrowable(ex); + } + } dep = null; src = null; snd = null; fn = null; return d.postFire(a, b, mode); } } - final <R,S extends R> boolean orAccept(CompletableFuture<R> a, - CompletableFuture<S> b, - Consumer<? super R> f, - OrAccept<R,S> c) { - Object r; Throwable x; - if (a == null || b == null || - ((r = a.result) == null && (r = b.result) == null) || f == null) - return false; - tryComplete: if (result == null) { - try { - if (c != null && !c.claim()) - return false; - if (r instanceof AltResult) { - if ((x = ((AltResult)r).ex) != null) { - completeThrowable(x, r); - break tryComplete; - } - r = null; - } - @SuppressWarnings("unchecked") R rr = (R) r; - f.accept(rr); - completeNull(); - } catch (Throwable ex) { - completeThrowable(ex); - } - } - return true; - } - private <U extends T> CompletableFuture<Void> orAcceptStage( Executor e, CompletionStage<U> o, Consumer<? super T> f) { CompletableFuture<U> b; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); + + Object r; CompletableFuture<? extends T> z; + if ((r = (z = this).result) != null || + (r = (z = b).result) != null) + return z.uniAcceptNow(r, e, f); + CompletableFuture<Void> d = newIncompleteFuture(); - if (e != null || !d.orAccept(this, b, f, null)) { - OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f); - orpush(b, c); - c.tryFire(SYNC); - } + orpush(b, new OrAccept<T,U>(e, d, this, b, f)); return d; } @@ -1526,8 +1588,7 @@ static final class OrRun<T,U> extends BiCompletion<T,U,Void> { Runnable fn; OrRun(Executor executor, CompletableFuture<Void> dep, - CompletableFuture<T> src, - CompletableFuture<U> snd, + CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn) { super(executor, dep, src, snd); this.fn = fn; } @@ -1535,95 +1596,84 @@ CompletableFuture<Void> d; CompletableFuture<T> a; CompletableFuture<U> b; - if ((d = dep) == null || - !d.orRun(a = src, b = snd, fn, mode > 0 ? null : this)) + Object r; Throwable x; Runnable f; + if ((d = dep) == null || (f = fn) == null + || (a = src) == null || (b = snd) == null + || ((r = a.result) == null && (r = b.result) == null)) return null; + if (d.result == null) { + try { + if (mode <= 0 && !claim()) + return null; + else if (r instanceof AltResult + && (x = ((AltResult)r).ex) != null) + d.completeThrowable(x, r); + else { + f.run(); + d.completeNull(); + } + } catch (Throwable ex) { + d.completeThrowable(ex); + } + } dep = null; src = null; snd = null; fn = null; return d.postFire(a, b, mode); } } - final boolean orRun(CompletableFuture<?> a, CompletableFuture<?> b, - Runnable f, OrRun<?,?> c) { - Object r; Throwable x; - if (a == null || b == null || - ((r = a.result) == null && (r = b.result) == null) || f == null) - return false; - if (result == null) { - try { - if (c != null && !c.claim()) - return false; - if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) - completeThrowable(x, r); - else { - f.run(); - completeNull(); - } - } catch (Throwable ex) { - completeThrowable(ex); - } - } - return true; - } - private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o, Runnable f) { CompletableFuture<?> b; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); + + Object r; CompletableFuture<?> z; + if ((r = (z = this).result) != null || + (r = (z = b).result) != null) + return z.uniRunNow(r, e, f); + CompletableFuture<Void> d = newIncompleteFuture(); - if (e != null || !d.orRun(this, b, f, null)) { - OrRun<T,?> c = new OrRun<>(e, d, this, b, f); - orpush(b, c); - c.tryFire(SYNC); - } + orpush(b, new OrRun<>(e, d, this, b, f)); return d; } @SuppressWarnings("serial") static final class OrRelay<T,U> extends BiCompletion<T,U,Object> { // for Or - OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src, - CompletableFuture<U> snd) { + OrRelay(CompletableFuture<Object> dep, + CompletableFuture<T> src, CompletableFuture<U> snd) { super(null, dep, src, snd); } final CompletableFuture<Object> tryFire(int mode) { CompletableFuture<Object> d; CompletableFuture<T> a; CompletableFuture<U> b; - if ((d = dep) == null || !d.orRelay(a = src, b = snd)) + Object r; + if ((d = dep) == null + || (a = src) == null || (b = snd) == null + || ((r = a.result) == null && (r = b.result) == null)) return null; + d.completeRelay(r); src = null; snd = null; dep = null; return d.postFire(a, b, mode); } } - final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) { - Object r; - if (a == null || b == null || - ((r = a.result) == null && (r = b.result) == null)) - return false; - if (result == null) - completeRelay(r); - return true; - } - /** Recursively constructs a tree of completions. */ static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs, int lo, int hi) { CompletableFuture<Object> d = new CompletableFuture<Object>(); if (lo <= hi) { - CompletableFuture<?> a, b; + CompletableFuture<?> a, b; Object r; int mid = (lo + hi) >>> 1; if ((a = (lo == mid ? cfs[lo] : orTree(cfs, lo, mid))) == null || (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : orTree(cfs, mid+1, hi))) == null) throw new NullPointerException(); - if (!d.orRelay(a, b)) { - OrRelay<?,?> c = new OrRelay<>(d, a, b); - a.orpush(b, c); - c.tryFire(SYNC); - } + if ((r = a.result) != null && (r = b.result) != null) + d.result = encodeRelay(r); + else + a.orpush(b, new OrRelay<>(d, a, b)); } return d; } @@ -1640,7 +1690,7 @@ public final Void getRawResult() { return null; } public final void setRawResult(Void v) {} - public final boolean exec() { run(); return true; } + public final boolean exec() { run(); return false; } public void run() { CompletableFuture<T> d; Supplier<? extends T> f; @@ -1676,7 +1726,7 @@ public final Void getRawResult() { return null; } public final void setRawResult(Void v) {} - public final boolean exec() { run(); return true; } + public final boolean exec() { run(); return false; } public void run() { CompletableFuture<Void> d; Runnable f; @@ -1760,15 +1810,13 @@ private Object waitingGet(boolean interruptible) { Signaller q = null; boolean queued = false; - int spins = SPINS; Object r; while ((r = result) == null) { - if (spins > 0) { - if (ThreadLocalRandom.nextSecondarySeed() >= 0) - --spins; + if (q == null) { + q = new Signaller(interruptible, 0L, 0L); + if (Thread.currentThread() instanceof ForkJoinWorkerThread) + ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q); } - else if (q == null) - q = new Signaller(interruptible, 0L, 0L); else if (!queued) queued = tryPushStack(q); else { @@ -1781,16 +1829,14 @@ break; } } - if (q != null) { + if (q != null && queued) { q.thread = null; - if (q.interrupted) { - if (interruptible) - cleanStack(); - else - Thread.currentThread().interrupt(); - } + if (!interruptible && q.interrupted) + Thread.currentThread().interrupt(); + if (r == null) + cleanStack(); } - if (r != null) + if (r != null || (r = result) != null) postComplete(); return r; } @@ -1808,9 +1854,12 @@ Signaller q = null; boolean queued = false; Object r; - while ((r = result) == null) { // similar to untimed, without spins - if (q == null) + while ((r = result) == null) { // similar to untimed + if (q == null) { q = new Signaller(true, nanos, deadline); + if (Thread.currentThread() instanceof ForkJoinWorkerThread) + ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q); + } else if (!queued) queued = tryPushStack(q); else if (q.nanos <= 0L) @@ -1825,12 +1874,13 @@ break; } } - if (q != null) + if (q != null && queued) { q.thread = null; - if (r != null) + if (r == null) + cleanStack(); + } + if (r != null || (r = result) != null) postComplete(); - else - cleanStack(); if (r != null || (q != null && q.interrupted)) return r; } @@ -1942,9 +1992,12 @@ * @throws InterruptedException if the current thread was interrupted * while waiting */ + @SuppressWarnings("unchecked") public T get() throws InterruptedException, ExecutionException { Object r; - return reportGet((r = result) == null ? waitingGet(true) : r); + if ((r = result) == null) + r = waitingGet(true); + return (T) reportGet(r); } /** @@ -1960,11 +2013,14 @@ * while waiting * @throws TimeoutException if the wait timed out */ + @SuppressWarnings("unchecked") public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + long nanos = unit.toNanos(timeout); Object r; - long nanos = unit.toNanos(timeout); - return reportGet((r = result) == null ? timedGet(nanos) : r); + if ((r = result) == null) + r = timedGet(nanos); + return (T) reportGet(r); } /** @@ -1981,9 +2037,12 @@ * @throws CompletionException if this future completed * exceptionally or a completion computation threw an exception */ + @SuppressWarnings("unchecked") public T join() { Object r; - return reportJoin((r = result) == null ? waitingGet(false) : r); + if ((r = result) == null) + r = waitingGet(false); + return (T) reportJoin(r); } /** @@ -1996,9 +2055,10 @@ * @throws CompletionException if this future completed * exceptionally or a completion computation threw an exception */ + @SuppressWarnings("unchecked") public T getNow(T valueIfAbsent) { Object r; - return ((r = result) == null) ? valueIfAbsent : reportJoin(r); + return ((r = result) == null) ? valueIfAbsent : (T) reportJoin(r); } /** @@ -2775,19 +2835,16 @@ throw new UnsupportedOperationException(); } } - // Unsafe mechanics - private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe(); - private static final long RESULT; - private static final long STACK; - private static final long NEXT; + // VarHandle mechanics + private static final VarHandle RESULT; + private static final VarHandle STACK; + private static final VarHandle NEXT; static { try { - RESULT = U.objectFieldOffset - (CompletableFuture.class.getDeclaredField("result")); - STACK = U.objectFieldOffset - (CompletableFuture.class.getDeclaredField("stack")); - NEXT = U.objectFieldOffset - (Completion.class.getDeclaredField("next")); + MethodHandles.Lookup l = MethodHandles.lookup(); + RESULT = l.findVarHandle(CompletableFuture.class, "result", Object.class); + STACK = l.findVarHandle(CompletableFuture.class, "stack", Completion.class); + NEXT = l.findVarHandle(Completion.class, "next", Completion.class); } catch (ReflectiveOperationException e) { throw new Error(e); }
--- a/test/java/util/concurrent/tck/CompletableFutureTest.java Fri Jul 15 13:55:51 2016 -0700 +++ b/test/java/util/concurrent/tck/CompletableFutureTest.java Fri Jul 15 13:59:58 2016 -0700 @@ -57,6 +57,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -486,62 +487,68 @@ class FailingSupplier extends CheckedAction implements Supplier<Integer> { - FailingSupplier(ExecutionMode m) { super(m); } + final CFException ex; + FailingSupplier(ExecutionMode m) { super(m); ex = new CFException(); } public Integer get() { invoked(); - throw new CFException(); + throw ex; } } class FailingConsumer extends CheckedIntegerAction implements Consumer<Integer> { - FailingConsumer(ExecutionMode m) { super(m); } + final CFException ex; + FailingConsumer(ExecutionMode m) { super(m); ex = new CFException(); } public void accept(Integer x) { invoked(); value = x; - throw new CFException(); + throw ex; } } class FailingBiConsumer extends CheckedIntegerAction implements BiConsumer<Integer, Integer> { - FailingBiConsumer(ExecutionMode m) { super(m); } + final CFException ex; + FailingBiConsumer(ExecutionMode m) { super(m); ex = new CFException(); } public void accept(Integer x, Integer y) { invoked(); value = subtract(x, y); - throw new CFException(); + throw ex; } } class FailingFunction extends CheckedIntegerAction implements Function<Integer, Integer> { - FailingFunction(ExecutionMode m) { super(m); } + final CFException ex; + FailingFunction(ExecutionMode m) { super(m); ex = new CFException(); } public Integer apply(Integer x) { invoked(); value = x; - throw new CFException(); + throw ex; } } class FailingBiFunction extends CheckedIntegerAction implements BiFunction<Integer, Integer, Integer> { - FailingBiFunction(ExecutionMode m) { super(m); } + final CFException ex; + FailingBiFunction(ExecutionMode m) { super(m); ex = new CFException(); } public Integer apply(Integer x, Integer y) { invoked(); value = subtract(x, y); - throw new CFException(); + throw ex; } } class FailingRunnable extends CheckedAction implements Runnable { - FailingRunnable(ExecutionMode m) { super(m); } + final CFException ex; + FailingRunnable(ExecutionMode m) { super(m); ex = new CFException(); } public void run() { invoked(); - throw new CFException(); + throw ex; } } @@ -561,11 +568,21 @@ class FailingCompletableFutureFunction extends CheckedIntegerAction implements Function<Integer, CompletableFuture<Integer>> { - FailingCompletableFutureFunction(ExecutionMode m) { super(m); } + final CFException ex; + FailingCompletableFutureFunction(ExecutionMode m) { super(m); ex = new CFException(); } public CompletableFuture<Integer> apply(Integer x) { invoked(); value = x; - throw new CFException(); + throw ex; + } + } + + static class CountingRejectingExecutor implements Executor { + final RejectedExecutionException ex = new RejectedExecutionException(); + final AtomicInteger count = new AtomicInteger(0); + public void execute(Runnable r) { + count.getAndIncrement(); + throw ex; } } @@ -1249,10 +1266,22 @@ { final FailingRunnable r = new FailingRunnable(m); final CompletableFuture<Void> f = m.runAsync(r); - checkCompletedWithWrappedCFException(f); + checkCompletedWithWrappedException(f, r.ex); r.assertInvoked(); }} + public void testRunAsync_rejectingExecutor() { + CountingRejectingExecutor e = new CountingRejectingExecutor(); + try { + CompletableFuture.runAsync(() -> {}, e); + shouldThrow(); + } catch (Throwable t) { + assertSame(e.ex, t); + } + + assertEquals(1, e.count.get()); + } + /** * supplyAsync completes with result of supplier */ @@ -1283,10 +1312,22 @@ { FailingSupplier r = new FailingSupplier(m); CompletableFuture<Integer> f = m.supplyAsync(r); - checkCompletedWithWrappedCFException(f); + checkCompletedWithWrappedException(f, r.ex); r.assertInvoked(); }} + public void testSupplyAsync_rejectingExecutor() { + CountingRejectingExecutor e = new CountingRejectingExecutor(); + try { + CompletableFuture.supplyAsync(() -> null, e); + shouldThrow(); + } catch (Throwable t) { + assertSame(e.ex, t); + } + + assertEquals(1, e.count.get()); + } + // seq completion methods /** @@ -1405,12 +1446,12 @@ final CompletableFuture<Void> h4 = m.runAfterBoth(f, f, rs[4]); final CompletableFuture<Void> h5 = m.runAfterEither(f, f, rs[5]); - checkCompletedWithWrappedCFException(h0); - checkCompletedWithWrappedCFException(h1); - checkCompletedWithWrappedCFException(h2); - checkCompletedWithWrappedCFException(h3); - checkCompletedWithWrappedCFException(h4); - checkCompletedWithWrappedCFException(h5); + checkCompletedWithWrappedException(h0, rs[0].ex); + checkCompletedWithWrappedException(h1, rs[1].ex); + checkCompletedWithWrappedException(h2, rs[2].ex); + checkCompletedWithWrappedException(h3, rs[3].ex); + checkCompletedWithWrappedException(h4, rs[4].ex); + checkCompletedWithWrappedException(h5, rs[5].ex); checkCompletedNormally(f, v1); }} @@ -1509,10 +1550,10 @@ final CompletableFuture<Integer> h2 = m.thenApply(f, rs[2]); final CompletableFuture<Integer> h3 = m.applyToEither(f, f, rs[3]); - checkCompletedWithWrappedCFException(h0); - checkCompletedWithWrappedCFException(h1); - checkCompletedWithWrappedCFException(h2); - checkCompletedWithWrappedCFException(h3); + checkCompletedWithWrappedException(h0, rs[0].ex); + checkCompletedWithWrappedException(h1, rs[1].ex); + checkCompletedWithWrappedException(h2, rs[2].ex); + checkCompletedWithWrappedException(h3, rs[3].ex); checkCompletedNormally(f, v1); }} @@ -1611,10 +1652,10 @@ final CompletableFuture<Void> h2 = m.thenAccept(f, rs[2]); final CompletableFuture<Void> h3 = m.acceptEither(f, f, rs[3]); - checkCompletedWithWrappedCFException(h0); - checkCompletedWithWrappedCFException(h1); - checkCompletedWithWrappedCFException(h2); - checkCompletedWithWrappedCFException(h3); + checkCompletedWithWrappedException(h0, rs[0].ex); + checkCompletedWithWrappedException(h1, rs[1].ex); + checkCompletedWithWrappedException(h2, rs[2].ex); + checkCompletedWithWrappedException(h3, rs[3].ex); checkCompletedNormally(f, v1); }} @@ -1776,9 +1817,9 @@ assertTrue(snd.complete(w2)); final CompletableFuture<Integer> h3 = m.thenCombine(f, g, r3); - checkCompletedWithWrappedCFException(h1); - checkCompletedWithWrappedCFException(h2); - checkCompletedWithWrappedCFException(h3); + checkCompletedWithWrappedException(h1, r1.ex); + checkCompletedWithWrappedException(h2, r2.ex); + checkCompletedWithWrappedException(h3, r3.ex); r1.assertInvoked(); r2.assertInvoked(); r3.assertInvoked(); @@ -1940,9 +1981,9 @@ assertTrue(snd.complete(w2)); final CompletableFuture<Void> h3 = m.thenAcceptBoth(f, g, r3); - checkCompletedWithWrappedCFException(h1); - checkCompletedWithWrappedCFException(h2); - checkCompletedWithWrappedCFException(h3); + checkCompletedWithWrappedException(h1, r1.ex); + checkCompletedWithWrappedException(h2, r2.ex); + checkCompletedWithWrappedException(h3, r3.ex); r1.assertInvoked(); r2.assertInvoked(); r3.assertInvoked(); @@ -2104,9 +2145,9 @@ assertTrue(snd.complete(w2)); final CompletableFuture<Void> h3 = m.runAfterBoth(f, g, r3); - checkCompletedWithWrappedCFException(h1); - checkCompletedWithWrappedCFException(h2); - checkCompletedWithWrappedCFException(h3); + checkCompletedWithWrappedException(h1, r1.ex); + checkCompletedWithWrappedException(h2, r2.ex); + checkCompletedWithWrappedException(h3, r3.ex); r1.assertInvoked(); r2.assertInvoked(); r3.assertInvoked(); @@ -2396,10 +2437,10 @@ f.complete(v1); final CompletableFuture<Integer> h2 = m.applyToEither(f, g, rs[2]); final CompletableFuture<Integer> h3 = m.applyToEither(g, f, rs[3]); - checkCompletedWithWrappedCFException(h0); - checkCompletedWithWrappedCFException(h1); - checkCompletedWithWrappedCFException(h2); - checkCompletedWithWrappedCFException(h3); + checkCompletedWithWrappedException(h0, rs[0].ex); + checkCompletedWithWrappedException(h1, rs[1].ex); + checkCompletedWithWrappedException(h2, rs[2].ex); + checkCompletedWithWrappedException(h3, rs[3].ex); for (int i = 0; i < 4; i++) rs[i].assertValue(v1); g.complete(v2); @@ -2408,10 +2449,10 @@ final CompletableFuture<Integer> h4 = m.applyToEither(f, g, rs[4]); final CompletableFuture<Integer> h5 = m.applyToEither(g, f, rs[5]); - checkCompletedWithWrappedCFException(h4); + checkCompletedWithWrappedException(h4, rs[4].ex); assertTrue(Objects.equals(v1, rs[4].value) || Objects.equals(v2, rs[4].value)); - checkCompletedWithWrappedCFException(h5); + checkCompletedWithWrappedException(h5, rs[5].ex); assertTrue(Objects.equals(v1, rs[5].value) || Objects.equals(v2, rs[5].value)); @@ -2655,10 +2696,10 @@ f.complete(v1); final CompletableFuture<Void> h2 = m.acceptEither(f, g, rs[2]); final CompletableFuture<Void> h3 = m.acceptEither(g, f, rs[3]); - checkCompletedWithWrappedCFException(h0); - checkCompletedWithWrappedCFException(h1); - checkCompletedWithWrappedCFException(h2); - checkCompletedWithWrappedCFException(h3); + checkCompletedWithWrappedException(h0, rs[0].ex); + checkCompletedWithWrappedException(h1, rs[1].ex); + checkCompletedWithWrappedException(h2, rs[2].ex); + checkCompletedWithWrappedException(h3, rs[3].ex); for (int i = 0; i < 4; i++) rs[i].assertValue(v1); g.complete(v2); @@ -2667,10 +2708,10 @@ final CompletableFuture<Void> h4 = m.acceptEither(f, g, rs[4]); final CompletableFuture<Void> h5 = m.acceptEither(g, f, rs[5]); - checkCompletedWithWrappedCFException(h4); + checkCompletedWithWrappedException(h4, rs[4].ex); assertTrue(Objects.equals(v1, rs[4].value) || Objects.equals(v2, rs[4].value)); - checkCompletedWithWrappedCFException(h5); + checkCompletedWithWrappedException(h5, rs[5].ex); assertTrue(Objects.equals(v1, rs[5].value) || Objects.equals(v2, rs[5].value)); @@ -2686,6 +2727,7 @@ for (ExecutionMode m : ExecutionMode.values()) for (Integer v1 : new Integer[] { 1, null }) for (Integer v2 : new Integer[] { 2, null }) + for (boolean pushNop : new boolean[] { true, false }) { final CompletableFuture<Integer> f = new CompletableFuture<>(); final CompletableFuture<Integer> g = new CompletableFuture<>(); @@ -2698,6 +2740,10 @@ checkIncomplete(h1); rs[0].assertNotInvoked(); rs[1].assertNotInvoked(); + if (pushNop) { // ad hoc test of intra-completion interference + m.thenRun(f, () -> {}); + m.thenRun(g, () -> {}); + } f.complete(v1); checkCompletedNormally(h0, null); checkCompletedNormally(h1, null); @@ -2910,16 +2956,16 @@ assertTrue(f.complete(v1)); final CompletableFuture<Void> h2 = m.runAfterEither(f, g, rs[2]); final CompletableFuture<Void> h3 = m.runAfterEither(g, f, rs[3]); - checkCompletedWithWrappedCFException(h0); - checkCompletedWithWrappedCFException(h1); - checkCompletedWithWrappedCFException(h2); - checkCompletedWithWrappedCFException(h3); + checkCompletedWithWrappedException(h0, rs[0].ex); + checkCompletedWithWrappedException(h1, rs[1].ex); + checkCompletedWithWrappedException(h2, rs[2].ex); + checkCompletedWithWrappedException(h3, rs[3].ex); for (int i = 0; i < 4; i++) rs[i].assertInvoked(); assertTrue(g.complete(v2)); final CompletableFuture<Void> h4 = m.runAfterEither(f, g, rs[4]); final CompletableFuture<Void> h5 = m.runAfterEither(g, f, rs[5]); - checkCompletedWithWrappedCFException(h4); - checkCompletedWithWrappedCFException(h5); + checkCompletedWithWrappedException(h4, rs[4].ex); + checkCompletedWithWrappedException(h5, rs[5].ex); checkCompletedNormally(f, v1); checkCompletedNormally(g, v2); @@ -2980,7 +3026,7 @@ final CompletableFuture<Integer> g = m.thenCompose(f, r); if (createIncomplete) assertTrue(f.complete(v1)); - checkCompletedWithWrappedCFException(g); + checkCompletedWithWrappedException(g, r.ex); checkCompletedNormally(f, v1); }} @@ -3089,7 +3135,7 @@ } } - public void testAllOf_backwards() throws Exception { + public void testAllOf_normal_backwards() throws Exception { for (int k = 1; k < 10; k++) { CompletableFuture<Integer>[] fs = (CompletableFuture<Integer>[]) new CompletableFuture[k]; @@ -3337,6 +3383,151 @@ } /** + * Test submissions to an executor that rejects all tasks. + */ + public void testRejectingExecutor() { + for (Integer v : new Integer[] { 1, null }) + { + final CountingRejectingExecutor e = new CountingRejectingExecutor(); + + final CompletableFuture<Integer> complete = CompletableFuture.completedFuture(v); + final CompletableFuture<Integer> incomplete = new CompletableFuture<>(); + + List<CompletableFuture<?>> futures = new ArrayList<>(); + + List<CompletableFuture<Integer>> srcs = new ArrayList<>(); + srcs.add(complete); + srcs.add(incomplete); + + for (CompletableFuture<Integer> src : srcs) { + List<CompletableFuture<?>> fs = new ArrayList<>(); + fs.add(src.thenRunAsync(() -> {}, e)); + fs.add(src.thenAcceptAsync((z) -> {}, e)); + fs.add(src.thenApplyAsync((z) -> z, e)); + + fs.add(src.thenCombineAsync(src, (x, y) -> x, e)); + fs.add(src.thenAcceptBothAsync(src, (x, y) -> {}, e)); + fs.add(src.runAfterBothAsync(src, () -> {}, e)); + + fs.add(src.applyToEitherAsync(src, (z) -> z, e)); + fs.add(src.acceptEitherAsync(src, (z) -> {}, e)); + fs.add(src.runAfterEitherAsync(src, () -> {}, e)); + + fs.add(src.thenComposeAsync((z) -> null, e)); + fs.add(src.whenCompleteAsync((z, t) -> {}, e)); + fs.add(src.handleAsync((z, t) -> null, e)); + + for (CompletableFuture<?> future : fs) { + if (src.isDone()) + checkCompletedWithWrappedException(future, e.ex); + else + checkIncomplete(future); + } + futures.addAll(fs); + } + + { + List<CompletableFuture<?>> fs = new ArrayList<>(); + + fs.add(complete.thenCombineAsync(incomplete, (x, y) -> x, e)); + fs.add(incomplete.thenCombineAsync(complete, (x, y) -> x, e)); + + fs.add(complete.thenAcceptBothAsync(incomplete, (x, y) -> {}, e)); + fs.add(incomplete.thenAcceptBothAsync(complete, (x, y) -> {}, e)); + + fs.add(complete.runAfterBothAsync(incomplete, () -> {}, e)); + fs.add(incomplete.runAfterBothAsync(complete, () -> {}, e)); + + for (CompletableFuture<?> future : fs) + checkIncomplete(future); + futures.addAll(fs); + } + + { + List<CompletableFuture<?>> fs = new ArrayList<>(); + + fs.add(complete.applyToEitherAsync(incomplete, (z) -> z, e)); + fs.add(incomplete.applyToEitherAsync(complete, (z) -> z, e)); + + fs.add(complete.acceptEitherAsync(incomplete, (z) -> {}, e)); + fs.add(incomplete.acceptEitherAsync(complete, (z) -> {}, e)); + + fs.add(complete.runAfterEitherAsync(incomplete, () -> {}, e)); + fs.add(incomplete.runAfterEitherAsync(complete, () -> {}, e)); + + for (CompletableFuture<?> future : fs) + checkCompletedWithWrappedException(future, e.ex); + futures.addAll(fs); + } + + incomplete.complete(v); + + for (CompletableFuture<?> future : futures) + checkCompletedWithWrappedException(future, e.ex); + + assertEquals(futures.size(), e.count.get()); + }} + + /** + * Test submissions to an executor that rejects all tasks, but + * should never be invoked because the dependent future is + * explicitly completed. + */ + public void testRejectingExecutorNeverInvoked() { + for (Integer v : new Integer[] { 1, null }) + { + final CountingRejectingExecutor e = new CountingRejectingExecutor(); + + final CompletableFuture<Integer> complete = CompletableFuture.completedFuture(v); + final CompletableFuture<Integer> incomplete = new CompletableFuture<>(); + + List<CompletableFuture<?>> futures = new ArrayList<>(); + + List<CompletableFuture<Integer>> srcs = new ArrayList<>(); + srcs.add(complete); + srcs.add(incomplete); + + List<CompletableFuture<?>> fs = new ArrayList<>(); + fs.add(incomplete.thenRunAsync(() -> {}, e)); + fs.add(incomplete.thenAcceptAsync((z) -> {}, e)); + fs.add(incomplete.thenApplyAsync((z) -> z, e)); + + fs.add(incomplete.thenCombineAsync(incomplete, (x, y) -> x, e)); + fs.add(incomplete.thenAcceptBothAsync(incomplete, (x, y) -> {}, e)); + fs.add(incomplete.runAfterBothAsync(incomplete, () -> {}, e)); + + fs.add(incomplete.applyToEitherAsync(incomplete, (z) -> z, e)); + fs.add(incomplete.acceptEitherAsync(incomplete, (z) -> {}, e)); + fs.add(incomplete.runAfterEitherAsync(incomplete, () -> {}, e)); + + fs.add(incomplete.thenComposeAsync((z) -> null, e)); + fs.add(incomplete.whenCompleteAsync((z, t) -> {}, e)); + fs.add(incomplete.handleAsync((z, t) -> null, e)); + + fs.add(complete.thenCombineAsync(incomplete, (x, y) -> x, e)); + fs.add(incomplete.thenCombineAsync(complete, (x, y) -> x, e)); + + fs.add(complete.thenAcceptBothAsync(incomplete, (x, y) -> {}, e)); + fs.add(incomplete.thenAcceptBothAsync(complete, (x, y) -> {}, e)); + + fs.add(complete.runAfterBothAsync(incomplete, () -> {}, e)); + fs.add(incomplete.runAfterBothAsync(complete, () -> {}, e)); + + for (CompletableFuture<?> future : fs) + checkIncomplete(future); + + for (CompletableFuture<?> future : fs) + future.complete(null); + + incomplete.complete(v); + + for (CompletableFuture<?> future : fs) + checkCompletedNormally(future, null); + + assertEquals(0, e.count.get()); + }} + + /** * toCompletableFuture returns this CompletableFuture. */ public void testToCompletableFuture() { @@ -3659,12 +3850,25 @@ //--- tests of implementation details; not part of official tck --- Object resultOf(CompletableFuture<?> f) { + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + try { + System.setSecurityManager(null); + } catch (SecurityException giveUp) { + return "Reflection not available"; + } + } + try { java.lang.reflect.Field resultField = CompletableFuture.class.getDeclaredField("result"); resultField.setAccessible(true); return resultField.get(f); - } catch (Throwable t) { throw new AssertionError(t); } + } catch (Throwable t) { + throw new AssertionError(t); + } finally { + if (sm != null) System.setSecurityManager(sm); + } } public void testExceptionPropagationReusesResultObject() { @@ -3675,33 +3879,44 @@ final CompletableFuture<Integer> v42 = CompletableFuture.completedFuture(42); final CompletableFuture<Integer> incomplete = new CompletableFuture<>(); + final Runnable noopRunnable = new Noop(m); + final Consumer<Integer> noopConsumer = new NoopConsumer(m); + final Function<Integer, Integer> incFunction = new IncFunction(m); + List<Function<CompletableFuture<Integer>, CompletableFuture<?>>> funs = new ArrayList<>(); - funs.add((y) -> m.thenRun(y, new Noop(m))); - funs.add((y) -> m.thenAccept(y, new NoopConsumer(m))); - funs.add((y) -> m.thenApply(y, new IncFunction(m))); - - funs.add((y) -> m.runAfterEither(y, incomplete, new Noop(m))); - funs.add((y) -> m.acceptEither(y, incomplete, new NoopConsumer(m))); - funs.add((y) -> m.applyToEither(y, incomplete, new IncFunction(m))); - - funs.add((y) -> m.runAfterBoth(y, v42, new Noop(m))); + funs.add((y) -> m.thenRun(y, noopRunnable)); + funs.add((y) -> m.thenAccept(y, noopConsumer)); + funs.add((y) -> m.thenApply(y, incFunction)); + + funs.add((y) -> m.runAfterEither(y, incomplete, noopRunnable)); + funs.add((y) -> m.acceptEither(y, incomplete, noopConsumer)); + funs.add((y) -> m.applyToEither(y, incomplete, incFunction)); + + funs.add((y) -> m.runAfterBoth(y, v42, noopRunnable)); + funs.add((y) -> m.runAfterBoth(v42, y, noopRunnable)); funs.add((y) -> m.thenAcceptBoth(y, v42, new SubtractAction(m))); + funs.add((y) -> m.thenAcceptBoth(v42, y, new SubtractAction(m))); funs.add((y) -> m.thenCombine(y, v42, new SubtractFunction(m))); + funs.add((y) -> m.thenCombine(v42, y, new SubtractFunction(m))); funs.add((y) -> m.whenComplete(y, (Integer r, Throwable t) -> {})); funs.add((y) -> m.thenCompose(y, new CompletableFutureInc(m))); - funs.add((y) -> CompletableFuture.allOf(new CompletableFuture<?>[] {y, v42})); - funs.add((y) -> CompletableFuture.anyOf(new CompletableFuture<?>[] {y, incomplete})); + funs.add((y) -> CompletableFuture.allOf(y)); + funs.add((y) -> CompletableFuture.allOf(y, v42)); + funs.add((y) -> CompletableFuture.allOf(v42, y)); + funs.add((y) -> CompletableFuture.anyOf(y)); + funs.add((y) -> CompletableFuture.anyOf(y, incomplete)); + funs.add((y) -> CompletableFuture.anyOf(incomplete, y)); for (Function<CompletableFuture<Integer>, CompletableFuture<?>> fun : funs) { CompletableFuture<Integer> f = new CompletableFuture<>(); f.completeExceptionally(ex); - CompletableFuture<Integer> src = m.thenApply(f, new IncFunction(m)); + CompletableFuture<Integer> src = m.thenApply(f, incFunction); checkCompletedWithWrappedException(src, ex); CompletableFuture<?> dep = fun.apply(src); checkCompletedWithWrappedException(dep, ex); @@ -3711,7 +3926,7 @@ for (Function<CompletableFuture<Integer>, CompletableFuture<?>> fun : funs) { CompletableFuture<Integer> f = new CompletableFuture<>(); - CompletableFuture<Integer> src = m.thenApply(f, new IncFunction(m)); + CompletableFuture<Integer> src = m.thenApply(f, incFunction); CompletableFuture<?> dep = fun.apply(src); f.completeExceptionally(ex); checkCompletedWithWrappedException(src, ex); @@ -3725,7 +3940,7 @@ CompletableFuture<Integer> f = new CompletableFuture<>(); f.cancel(mayInterruptIfRunning); checkCancelled(f); - CompletableFuture<Integer> src = m.thenApply(f, new IncFunction(m)); + CompletableFuture<Integer> src = m.thenApply(f, incFunction); checkCompletedWithWrappedCancellationException(src); CompletableFuture<?> dep = fun.apply(src); checkCompletedWithWrappedCancellationException(dep); @@ -3736,7 +3951,7 @@ for (Function<CompletableFuture<Integer>, CompletableFuture<?>> fun : funs) { CompletableFuture<Integer> f = new CompletableFuture<>(); - CompletableFuture<Integer> src = m.thenApply(f, new IncFunction(m)); + CompletableFuture<Integer> src = m.thenApply(f, incFunction); CompletableFuture<?> dep = fun.apply(src); f.cancel(mayInterruptIfRunning); checkCancelled(f); @@ -3747,7 +3962,7 @@ }} /** - * Minimal completion stages throw UOE for all non-CompletionStage methods + * Minimal completion stages throw UOE for most non-CompletionStage methods */ public void testMinimalCompletionStage_minimality() { if (!testImplementationDetails) return; @@ -3776,8 +3991,10 @@ .filter((method) -> !permittedMethodSignatures.contains(toSignature.apply(method))) .collect(Collectors.toList()); - CompletionStage<Integer> minimalStage = - new CompletableFuture<Integer>().minimalCompletionStage(); + List<CompletionStage<Integer>> stages = new ArrayList<>(); + stages.add(new CompletableFuture<Integer>().minimalCompletionStage()); + stages.add(CompletableFuture.completedStage(1)); + stages.add(CompletableFuture.failedStage(new CFException())); List<Method> bugs = new ArrayList<>(); for (Method method : allMethods) { @@ -3793,20 +4010,22 @@ else if (parameterTypes[i] == long.class) args[i] = 0L; } - try { - method.invoke(minimalStage, args); - bugs.add(method); + for (CompletionStage<Integer> stage : stages) { + try { + method.invoke(stage, args); + bugs.add(method); + } + catch (java.lang.reflect.InvocationTargetException expected) { + if (! (expected.getCause() instanceof UnsupportedOperationException)) { + bugs.add(method); + // expected.getCause().printStackTrace(); + } + } + catch (ReflectiveOperationException bad) { throw new Error(bad); } } - catch (java.lang.reflect.InvocationTargetException expected) { - if (! (expected.getCause() instanceof UnsupportedOperationException)) { - bugs.add(method); - // expected.getCause().printStackTrace(); - } - } - catch (ReflectiveOperationException bad) { throw new Error(bad); } } if (!bugs.isEmpty()) - throw new Error("Methods did not throw UOE: " + bugs.toString()); + throw new Error("Methods did not throw UOE: " + bugs); } static class Monad { @@ -3955,12 +4174,33 @@ Monad.plus(godot, Monad.unit(5L))); } + /** Test long recursive chains of CompletableFutures with cascading completions */ + public void testRecursiveChains() throws Throwable { + for (ExecutionMode m : ExecutionMode.values()) + for (boolean addDeadEnds : new boolean[] { true, false }) + { + final int val = 42; + final int n = expensiveTests ? 1_000 : 2; + CompletableFuture<Integer> head = new CompletableFuture<>(); + CompletableFuture<Integer> tail = head; + for (int i = 0; i < n; i++) { + if (addDeadEnds) m.thenApply(tail, v -> v + 1); + tail = m.thenApply(tail, v -> v + 1); + if (addDeadEnds) m.applyToEither(tail, tail, v -> v + 1); + tail = m.applyToEither(tail, tail, v -> v + 1); + if (addDeadEnds) m.thenCombine(tail, tail, (v, w) -> v + 1); + tail = m.thenCombine(tail, tail, (v, w) -> v + 1); + } + head.complete(val); + assertEquals(val + 3 * n, (int) tail.join()); + }} + /** * A single CompletableFuture with many dependents. * A demo of scalability - runtime is O(n). */ public void testManyDependents() throws Throwable { - final int n = 1_000; + final int n = expensiveTests ? 1_000_000 : 10; final CompletableFuture<Void> head = new CompletableFuture<>(); final CompletableFuture<Void> complete = CompletableFuture.completedFuture((Void)null); final AtomicInteger count = new AtomicInteger(0); @@ -3987,6 +4227,78 @@ assertEquals(5 * 3 * n, count.get()); } + /** ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest tck */ + public void testCoCompletionGarbageRetention() throws Throwable { + final int n = expensiveTests ? 1_000_000 : 10; + final CompletableFuture<Integer> incomplete = new CompletableFuture<>(); + CompletableFuture<Integer> f; + for (int i = 0; i < n; i++) { + f = new CompletableFuture<>(); + f.runAfterEither(incomplete, () -> {}); + f.complete(null); + + f = new CompletableFuture<>(); + f.acceptEither(incomplete, (x) -> {}); + f.complete(null); + + f = new CompletableFuture<>(); + f.applyToEither(incomplete, (x) -> x); + f.complete(null); + + f = new CompletableFuture<>(); + CompletableFuture.anyOf(new CompletableFuture<?>[] { f, incomplete }); + f.complete(null); + } + + for (int i = 0; i < n; i++) { + f = new CompletableFuture<>(); + incomplete.runAfterEither(f, () -> {}); + f.complete(null); + + f = new CompletableFuture<>(); + incomplete.acceptEither(f, (x) -> {}); + f.complete(null); + + f = new CompletableFuture<>(); + incomplete.applyToEither(f, (x) -> x); + f.complete(null); + + f = new CompletableFuture<>(); + CompletableFuture.anyOf(new CompletableFuture<?>[] { incomplete, f }); + f.complete(null); + } + } + + /* + * Tests below currently fail in stress mode due to memory retention. + * ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest tck + */ + + /** Checks for garbage retention with anyOf. */ + public void testAnyOfGarbageRetention() throws Throwable { + for (Integer v : new Integer[] { 1, null }) + { + final int n = expensiveTests ? 100_000 : 10; + CompletableFuture<Integer>[] fs + = (CompletableFuture<Integer>[]) new CompletableFuture<?>[100]; + for (int i = 0; i < fs.length; i++) + fs[i] = new CompletableFuture<>(); + fs[fs.length - 1].complete(v); + for (int i = 0; i < n; i++) + checkCompletedNormally(CompletableFuture.anyOf(fs), v); + }} + + /** Checks for garbage retention with allOf. */ + public void testCancelledAllOfGarbageRetention() throws Throwable { + final int n = expensiveTests ? 100_000 : 10; + CompletableFuture<Integer>[] fs + = (CompletableFuture<Integer>[]) new CompletableFuture<?>[100]; + for (int i = 0; i < fs.length; i++) + fs[i] = new CompletableFuture<>(); + for (int i = 0; i < n; i++) + assertTrue(CompletableFuture.allOf(fs).cancel(false)); + } + // static <U> U join(CompletionStage<U> stage) { // CompletableFuture<U> f = new CompletableFuture<>(); // stage.whenComplete((v, ex) -> {