OpenJDK / jdk / hs
changeset 39778:5cda06c52cdd
8160402: Garbage retention with CompletableFuture.anyOf
Reviewed-by: martin, psandoz, plevart
author | dl |
---|---|
date | Tue, 26 Jul 2016 09:53:38 -0700 |
parents | a9b80432c521 |
children | 4666307d3155 |
files | jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java jdk/test/java/util/concurrent/tck/CompletableFutureTest.java |
diffstat | 2 files changed, 93 insertions(+), 54 deletions(-) [+] |
line wrap: on
line diff
--- a/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java Tue Jul 26 09:49:25 2016 -0700 +++ b/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java Tue Jul 26 09:53:38 2016 -0700 @@ -221,7 +221,10 @@ * across both while pushing actions. The second completion is * a CoCompletion pointing to the first, shared so that at most * one performs the action. The multiple-arity methods allOf - * and anyOf do this pairwise to form trees of completions. + * does this pairwise to form trees of completions. Method + * anyOf is handled differently from allOf because completion of + * any source should trigger a cleanStack of other sources. + * Each AnyOf completion can reach others via a shared array. * * Note that the generic type parameters of methods vary according * to whether "this" is a source, dependent, or completion. @@ -588,9 +591,9 @@ } /** - * Post-processing by dependent after successful UniCompletion - * tryFire. Tries to clean stack of source a, and then either runs - * postComplete or returns this to caller, depending on mode. + * Post-processing by dependent after successful UniCompletion tryFire. + * Tries to clean stack of source a, and then either runs postComplete + * or returns this to caller, depending on mode. */ final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) { if (a != null && a.stack != null) { @@ -1003,12 +1006,12 @@ } @SuppressWarnings("serial") - static final class UniRelay<T> extends UniCompletion<T,T> { - UniRelay(CompletableFuture<T> dep, CompletableFuture<T> src) { + static final class UniRelay<U, T extends U> extends UniCompletion<T,U> { + UniRelay(CompletableFuture<U> dep, CompletableFuture<T> src) { super(null, dep, src); } - final CompletableFuture<T> tryFire(int mode) { - CompletableFuture<T> d; CompletableFuture<T> a; Object r; + final CompletableFuture<U> tryFire(int mode) { + CompletableFuture<U> d; CompletableFuture<T> a; Object r; if ((d = dep) == null || (a = src) == null || (r = a.result) == null) return null; @@ -1019,13 +1022,14 @@ } } - private CompletableFuture<T> uniCopyStage() { + private static <U, T extends U> CompletableFuture<U> uniCopyStage( + CompletableFuture<T> src) { Object r; - CompletableFuture<T> d = newIncompleteFuture(); - if ((r = result) != null) + CompletableFuture<U> d = src.newIncompleteFuture(); + if ((r = src.result) != null) d.result = encodeRelay(r); else - unipush(new UniRelay<T>(d, this)); + src.unipush(new UniRelay<U,T>(d, src)); return d; } @@ -1034,7 +1038,7 @@ if ((r = result) != null) return new MinimalStage<T>(encodeRelay(r)); MinimalStage<T> d = new MinimalStage<T>(); - unipush(new UniRelay<T>(d, this)); + unipush(new UniRelay<T,T>(d, this)); return d; } @@ -1069,7 +1073,7 @@ if ((r = g.result) != null) d.completeRelay(r); else { - g.unipush(new UniRelay<V>(d, g)); + g.unipush(new UniRelay<V,V>(d, g)); if (d.result == null) return null; } @@ -1103,7 +1107,7 @@ if ((s = g.result) != null) d.result = encodeRelay(s); else { - g.unipush(new UniRelay<V>(d, g)); + g.unipush(new UniRelay<V,V>(d, g)); } } catch (Throwable ex) { d.result = encodeThrowable(ex); @@ -1637,45 +1641,40 @@ return d; } + /** Completion for an anyOf input future. */ @SuppressWarnings("serial") - static final class OrRelay<T,U> extends BiCompletion<T,U,Object> { // for Or - OrRelay(CompletableFuture<Object> dep, - CompletableFuture<T> src, CompletableFuture<U> snd) { - super(null, dep, src, snd); + static class AnyOf extends Completion { + CompletableFuture<Object> dep; CompletableFuture<?> src; + CompletableFuture<?>[] srcs; + AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src, + CompletableFuture<?>[] srcs) { + this.dep = dep; this.src = src; this.srcs = srcs; } final CompletableFuture<Object> tryFire(int mode) { - CompletableFuture<Object> d; - CompletableFuture<T> a; - CompletableFuture<U> b; + // assert mode != ASYNC; + CompletableFuture<Object> d; CompletableFuture<?> a; + CompletableFuture<?>[] as; Object r; if ((d = dep) == null - || (a = src) == null || (b = snd) == null - || ((r = a.result) == null && (r = b.result) == null)) + || (a = src) == null || (r = a.result) == null + || (as = srcs) == null) return null; - d.completeRelay(r); - src = null; snd = null; dep = null; - return d.postFire(a, b, mode); + dep = null; src = null; srcs = null; + if (d.completeRelay(r)) { + for (CompletableFuture<?> b : as) + if (b != a) + b.cleanStack(); + if (mode < 0) + return d; + else + d.postComplete(); + } + return null; } - } - - /** Recursively constructs a tree of completions. */ - static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs, - int lo, int hi) { - CompletableFuture<Object> d = new CompletableFuture<Object>(); - if (lo <= hi) { - CompletableFuture<?> a, b; Object r; - int mid = (lo + hi) >>> 1; - if ((a = (lo == mid ? cfs[lo] : - orTree(cfs, lo, mid))) == null || - (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : - orTree(cfs, mid+1, hi))) == null) - throw new NullPointerException(); - if ((r = a.result) != null && (r = b.result) != null) - d.result = encodeRelay(r); - else - a.orpush(b, new OrRelay<>(d, a, b)); + final boolean isLive() { + CompletableFuture<Object> d; + return (d = dep) != null && d.result == null; } - return d; } /* ------------- Zero-input Async forms -------------- */ @@ -2354,7 +2353,28 @@ * {@code null} */ public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { - return orTree(cfs, 0, cfs.length - 1); + int n; Object r; + if ((n = cfs.length) <= 1) + return (n == 0) + ? new CompletableFuture<Object>() + : uniCopyStage(cfs[0]); + for (CompletableFuture<?> cf : cfs) + if ((r = cf.result) != null) + return new CompletableFuture<Object>(encodeRelay(r)); + cfs = cfs.clone(); + CompletableFuture<Object> d = new CompletableFuture<>(); + for (CompletableFuture<?> cf : cfs) + cf.unipush(new AnyOf(d, cf, cfs)); + // If d was completed while we were adding completions, we should + // clean the stack of any sources that may have had completions + // pushed on their stack after d was completed. + if (d.result != null) + for (int i = 0, len = cfs.length; i < len; i++) + if (cfs[i].result != null) + for (i++; i < len; i++) + if (cfs[i].result == null) + cfs[i].cleanStack(); + return d; } /* ------------- Control and status methods -------------- */ @@ -2526,7 +2546,7 @@ * @since 9 */ public CompletableFuture<T> copy() { - return uniCopyStage(); + return uniCopyStage(this); } /**
--- a/jdk/test/java/util/concurrent/tck/CompletableFutureTest.java Tue Jul 26 09:49:25 2016 -0700 +++ b/jdk/test/java/util/concurrent/tck/CompletableFutureTest.java Tue Jul 26 09:53:38 2016 -0700 @@ -4269,12 +4269,11 @@ } } - /* - * Tests below currently fail in stress mode due to memory retention. - * ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest tck + /** + * Reproduction recipe for: + * 8160402: Garbage retention with CompletableFuture.anyOf + * cvs update -D '2016-05-01' ./src/main/java/util/concurrent/CompletableFuture.java && ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest -Djsr166.methodFilter=testAnyOfGarbageRetention tck; cvs update -A */ - - /** Checks for garbage retention with anyOf. */ public void testAnyOfGarbageRetention() throws Throwable { for (Integer v : new Integer[] { 1, null }) { @@ -4288,7 +4287,12 @@ checkCompletedNormally(CompletableFuture.anyOf(fs), v); }} - /** Checks for garbage retention with allOf. */ + /** + * Checks for garbage retention with allOf. + * + * As of 2016-07, fails with OOME: + * ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest -Djsr166.methodFilter=testCancelledAllOfGarbageRetention tck + */ public void testCancelledAllOfGarbageRetention() throws Throwable { final int n = expensiveTests ? 100_000 : 10; CompletableFuture<Integer>[] fs @@ -4299,6 +4303,21 @@ assertTrue(CompletableFuture.allOf(fs).cancel(false)); } + /** + * Checks for garbage retention when a dependent future is + * cancelled and garbage-collected. + * 8161600: Garbage retention when source CompletableFutures are never completed + * + * As of 2016-07, fails with OOME: + * ant -Dvmoptions=-Xmx8m -Djsr166.expensiveTests=true -Djsr166.tckTestClass=CompletableFutureTest -Djsr166.methodFilter=testCancelledGarbageRetention tck + */ + public void testCancelledGarbageRetention() throws Throwable { + final int n = expensiveTests ? 100_000 : 10; + CompletableFuture<Integer> neverCompleted = new CompletableFuture<>(); + for (int i = 0; i < n; i++) + assertTrue(neverCompleted.thenRun(() -> {}).cancel(true)); + } + // static <U> U join(CompletionStage<U> stage) { // CompletableFuture<U> f = new CompletableFuture<>(); // stage.whenComplete((v, ex) -> {