changeset 5991:75930f7b49dc it2-bootstrap

Merge
author briangoetz
date Wed, 12 Sep 2012 16:04:04 -0400
parents 8a8c359c3b06 402a8d1a781b
children c94321c10623
files
diffstat 13 files changed, 403 insertions(+), 155 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/Arrays.java	Tue Sep 11 15:33:33 2012 -0700
+++ b/src/share/classes/java/util/Arrays.java	Wed Sep 12 16:04:04 2012 -0400
@@ -3717,6 +3717,8 @@
     }
 
     private static class ArraySpliterator<T> extends ArrayIterator<T> implements Spliterator<T> {
+        boolean iterating = false;
+
         ArraySpliterator(T[] array) {
             this(array, 0, array.length);
         }
@@ -3738,8 +3740,8 @@
         @Override
         public void into(Sink<T, ?, ?> sink) {
             sink.begin(getRemainingSizeIfKnown());
-            // Strange-looking way to iterate; reduce heap write traffic
             if (offset < endOffset) {
+                // Strange-looking way to iterate; reduce heap write traffic
                 int wasOffset = offset;
                 offset = endOffset;
                 for (int i=wasOffset; i<endOffset; i++)
@@ -3749,7 +3751,14 @@
         }
 
         @Override
+        public int getNaturalSplitArity() {
+            return (endOffset - offset > 1) ? 1 : 0;
+        }
+
+        @Override
         public Spliterator<T> split() {
+            if (iterating)
+                throw new IllegalStateException("split after iterate");
             int t = (endOffset - offset) / 2;
             Spliterator<T> ret = new ArraySpliterator<>(array, offset, t);
             offset += t;
@@ -3758,6 +3767,7 @@
 
         @Override
         public Iterator<T> iterator() {
+            iterating = true;
             return this;
         }
     }
@@ -3771,8 +3781,8 @@
         @Override
         public void into(Sink<T, ?, ?> sink) {
             sink.begin(endOffset-offset);
-            // Strange-looking way to iterate; reduce heap write traffic
             if (offset < endOffset) {
+                // Strange-looking way to iterate; reduce heap write traffic
                 int wasOffset = offset;
                 offset = endOffset;
                 for (int i=wasOffset; i<endOffset; i++)
--- a/src/share/classes/java/util/Optional.java	Tue Sep 11 15:33:33 2012 -0700
+++ b/src/share/classes/java/util/Optional.java	Wed Sep 12 16:04:04 2012 -0400
@@ -51,7 +51,7 @@
     private final boolean present;
 
     public Optional(T value) {
-        this.value = value;
+        this.value = Objects.requireNonNull(value);
         this.present = true;
     }
 
@@ -187,4 +187,9 @@
         result = 31 * result + (present ? 1 : 0);
         return result;
     }
+
+    @Override
+    public String toString() {
+        return present ? String.format("Optional[%s]", value) : "Optional.empty";
+    }
 }
--- a/src/share/classes/java/util/concurrent/ForkJoinUtils.java	Tue Sep 11 15:33:33 2012 -0700
+++ b/src/share/classes/java/util/concurrent/ForkJoinUtils.java	Wed Sep 12 16:04:04 2012 -0400
@@ -56,7 +56,7 @@
     private ForkJoinUtils() {}; // no construction
 
     public static int suggestDepth(long s) {
-        long leafSize = 1 + ((s + 7) >>> 3) / defaultFJPool().getParallelism();
+        long leafSize = suggestTargetSize(s);
         int d = 0;
         while (s > leafSize) {
             s /= 2;
@@ -65,6 +65,10 @@
         return d;
     }
 
+    public static long suggestTargetSize(long s) {
+        return 1 + ((s + 7) >>> 3) / defaultFJPool().getParallelism();
+    }
+
     /**
      * The exception thrown if initialization of the 
      * {@link #defaultFJPool default ForkJoinPool}
--- a/src/share/classes/java/util/streams/AbstractPipeline.java	Tue Sep 11 15:33:33 2012 -0700
+++ b/src/share/classes/java/util/streams/AbstractPipeline.java	Wed Sep 12 16:04:04 2012 -0400
@@ -172,6 +172,14 @@
             }
 
             @Override
+            public long suggestTargetSize() {
+                int size = source.getSizeOrEstimate();
+                if (size < 0)
+                    size = -size;
+                return ForkJoinUtils.suggestTargetSize(size);
+            }
+
+            @Override
             public Spliterator<T> spliterator() {
                 return source.spliterator();
             }
--- a/src/share/classes/java/util/streams/Spliterator.java	Tue Sep 11 15:33:33 2012 -0700
+++ b/src/share/classes/java/util/streams/Spliterator.java	Wed Sep 12 16:04:04 2012 -0400
@@ -34,7 +34,13 @@
  */
 public interface Spliterator<T> {
     /**
-     * Returns a Spliterator covering approximately half of the
+     * Return the number of calls to split() that will naturally divide the
+     * data structure.
+     */
+    int getNaturalSplitArity();
+
+    /**
+     * Returns a Spliterator covering some portion of the
      * elements, guaranteed not to overlap with those subsequently
      * returned by this spliterator.  After invoking this method,
      * the current Spliterator will <em>not</em> cover the
--- a/src/share/classes/java/util/streams/Streams.java	Tue Sep 11 15:33:33 2012 -0700
+++ b/src/share/classes/java/util/streams/Streams.java	Wed Sep 12 16:04:04 2012 -0400
@@ -86,6 +86,11 @@
     public static<T> Spliterator<T> emptySpliterator() {
         return new Spliterator<T>() {
             @Override
+            public int getNaturalSplitArity() {
+                return 0;
+            }
+
+            @Override
             public Spliterator<T> split() {
                 return emptySpliterator();
             }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/share/classes/java/util/streams/ops/AbstractTask.java	Wed Sep 12 16:04:04 2012 -0400
@@ -0,0 +1,137 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.streams.ops;
+
+import java.util.concurrent.CountedCompleter;
+import java.util.streams.Sink;
+import java.util.streams.Spliterator;
+
+/**
+ * AbstractTask
+ *
+ * @author Brian Goetz
+ */
+abstract class AbstractTask<E, R, T extends AbstractTask<E,R,T>> extends CountedCompleter {
+    protected final Spliterator<E> spliterator;
+    protected final long targetSize;
+
+    protected int numChildren;
+    protected T children;
+    protected T nextSibling;
+    protected boolean isLeaf;
+    protected R result;
+
+    protected AbstractTask(Spliterator<E> spliterator, long targetSize) {
+        super(null);
+        this.spliterator = spliterator;
+        this.targetSize = targetSize;
+    }
+
+    protected AbstractTask(T parent, Spliterator<E> spliterator) {
+        super(parent);
+        this.spliterator = spliterator;
+        this.targetSize = parent.targetSize;
+    }
+
+    protected abstract T makeChild(Spliterator<E> spliterator);
+
+    protected abstract R doLeaf();
+
+    protected T getParent() {
+        return (T) getCompleter();
+    }
+
+    @Override
+    public void compute() {
+        int remaining = spliterator.getRemainingSizeIfKnown();
+        int naturalSplits = spliterator.getNaturalSplitArity();
+        isLeaf = ((remaining <= targetSize) && (remaining >= 0)) || (naturalSplits == 0);
+        if (isLeaf) {
+            result = doLeaf();
+            tryComplete();
+        }
+        else {
+            T curChild = null;
+            setPendingCount(naturalSplits);
+            numChildren = naturalSplits + 1;
+            for (int i=0; i<naturalSplits; i++) {
+                T newChild = makeChild(spliterator.split());
+                if (curChild == null)
+                    children = newChild;
+                else
+                    curChild.nextSibling = newChild;
+                curChild = newChild;
+                newChild.fork();
+            }
+
+            T lastChild = makeChild(spliterator);
+            curChild.nextSibling = lastChild;
+            lastChild.compute();
+        }
+    }
+}
+
+abstract class ComparableTask<E, R, T extends ComparableTask<E,R,T>> extends AbstractTask<E,R,T> implements Comparable<T> {
+    protected final int depth;
+
+    protected ComparableTask(Spliterator<E> spliterator, int targetSize) {
+        super(spliterator, targetSize);
+        depth = 0;
+    }
+
+    protected ComparableTask(T parent, Spliterator<E> spliterator) {
+        super(parent, spliterator);
+        depth = parent.depth + 1;
+    }
+
+    @Override
+    public int compareTo(T o) {
+        if (this == o)
+            return 0;
+        ComparableTask<E,R,T> me = this;
+        T other = o;
+        if (me.depth < other.depth) {
+            while (me.depth < other.depth)
+                other = other.getParent();
+        }
+        else if (other.depth < me.depth) {
+            while (other.depth < me.depth)
+                me = (T) me.getParent();
+        }
+        while (me.getCompleter() != other.getCompleter()) {
+            me = me.getParent();
+            other = other.getParent();
+        }
+        ComparableTask<E,R,T> child = me.getParent().children;
+        for (; child != null; child = child.nextSibling) {
+            if (child == me)
+                return -1;
+            else if (child == other)
+                return 1;
+        }
+        throw new IllegalStateException();
+    }
+}
+
--- a/src/share/classes/java/util/streams/ops/FoldOp.java	Tue Sep 11 15:33:33 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/FoldOp.java	Wed Sep 12 16:04:04 2012 -0400
@@ -25,9 +25,11 @@
 package java.util.streams.ops;
 
 import java.util.Iterator;
+import java.util.concurrent.CountedCompleter;
+import java.util.functions.BinaryOperator;
+import java.util.functions.Combiner;
+import java.util.functions.Factory;
 import java.util.streams.Spliterator;
-import java.util.concurrent.RecursiveTask;
-import java.util.functions.*;
 import java.util.streams.TerminalSink;
 
 /**
@@ -87,36 +89,47 @@
 
     @Override
     public <V> U computeParallel(ParallelOpHelper<T, V> helper) {
-        return helper.invoke(new ReduceTask<>(helper.suggestDepth(), helper.spliterator(), this, helper));
+        ReduceTask<T, U, V> task = new ReduceTask<>(helper.spliterator(), helper.suggestTargetSize(), helper, this);
+        helper.invoke(task);
+        return task.result;
     }
 
-    private static class ReduceTask<T, U, V> extends RecursiveTask<U> {
+    private static class ReduceTask<T, U, V> extends AbstractTask<V, U, ReduceTask<T,U,V>> {
         private final ParallelOpHelper<T, V> helper;
         private final FoldOp<T,U> op;
-        private final int depth;
-        private final Spliterator<V> source;
 
-        private ReduceTask(int depth, Spliterator<V> source, FoldOp<T,U> op, ParallelOpHelper<T, V> helper) {
-            this.depth = depth;
-            this.source = source;
+        private ReduceTask(Spliterator<V> spliterator, long targetSize, ParallelOpHelper<T, V> helper, FoldOp<T, U> op) {
+            super(spliterator, targetSize);
             this.helper = helper;
             this.op = op;
         }
 
+        private ReduceTask(ReduceTask<T, U, V> parent, Spliterator<V> spliterator) {
+            super(parent, spliterator);
+            this.helper = parent.helper;
+            this.op = parent.op;
+        }
+
         @Override
-        protected U compute() {
-            if (depth != 0) {
-                ReduceTask<T, U, V> left = new ReduceTask<>(depth - 1, source.split(), op, helper);
-                ReduceTask<T, U, V> right = new ReduceTask<>(depth - 1, source, op, helper);
-                right.fork();
-                U leftResult = left.compute();
-                U rightResult = right.join();
-                return op.combiner.operate(leftResult, rightResult);
-            }
-            else {
-                final TerminalSink<T, U> reduceStage = op.sink();
-                source.into(helper.sink(reduceStage));
-                return reduceStage.getAndClearState();
+        protected ReduceTask<T, U, V> makeChild(Spliterator<V> spliterator) {
+            return new ReduceTask<>(this, spliterator);
+        }
+
+        @Override
+        protected U doLeaf() {
+            final TerminalSink<T, U> reduceStage = op.sink();
+            spliterator.into(helper.sink(reduceStage));
+            return reduceStage.getAndClearState();
+        }
+
+        @Override
+        public void onCompletion(CountedCompleter caller) {
+            if (!isLeaf) {
+                ReduceTask<T,U,V> child = children;
+                result = child.result;
+                child = child.nextSibling;
+                for (; child != null; child = child.nextSibling)
+                    result = op.combiner.operate(result, child.result);
             }
         }
     }
--- a/src/share/classes/java/util/streams/ops/ForEachOp.java	Tue Sep 11 15:33:33 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/ForEachOp.java	Wed Sep 12 16:04:04 2012 -0400
@@ -26,10 +26,12 @@
 
 import java.util.Mapping;
 import java.util.Objects;
-import java.util.streams.*;
-import java.util.concurrent.RecursiveAction;
 import java.util.functions.BiBlock;
 import java.util.functions.Block;
+import java.util.streams.Sink;
+import java.util.streams.Spliterator;
+import java.util.streams.StreamShape;
+import java.util.streams.TerminalSink;
 
 /**
  * ForEachOp
@@ -94,39 +96,39 @@
 
     @Override
     public <V> Void computeParallel(ParallelOpHelper<T, V> helper) {
-        int depth = helper.suggestDepth();
+        long targetSize = helper.suggestTargetSize();
         Sink<V, ?, ?> compoundSink = helper.sink(sink);
         Spliterator<V> spliterator = helper.spliterator();
-        if (depth == 0) {
+        if (helper.suggestDepth() == 0) {
             spliterator.into(compoundSink);
         } else {
-            helper.invoke(new ForEachTask<>(depth, spliterator, compoundSink));
+            helper.invoke(new ForEachTask<>(spliterator, targetSize, compoundSink));
         }
         return null;
     }
 
-    private static class ForEachTask<T> extends RecursiveAction {
-        private final int depth;
-        private final Spliterator<T> spliterator;
+    private static class ForEachTask<T> extends AbstractTask<T, Void, ForEachTask<T>> {
         private final Sink<T, ?, ?> sink;
 
-        private ForEachTask(int depth, Spliterator<T> spliterator, Sink<T, ?, ?> sink) {
-            this.depth = depth;
-            this.spliterator = spliterator;
+        private ForEachTask(Spliterator<T> spliterator, long targetSize, Sink<T, ?, ?> sink) {
+            super(spliterator, targetSize);
+            this.sink = sink;
+        }
+
+        private ForEachTask(ForEachTask<T> parent, Spliterator<T> spliterator, Sink<T, ?, ?> sink) {
+            super(parent, spliterator);
             this.sink = sink;
         }
 
         @Override
-        protected void compute() {
-            if (depth == 0) {
-                spliterator.into(sink);
-            } else {
-                ForEachTask<T> left = new ForEachTask<>(depth - 1, spliterator.split(), sink);
-                ForEachTask<T> right = new ForEachTask<>(depth - 1, spliterator, sink);
-                right.fork();
-                left.compute();
-                right.join();
-            }
+        protected ForEachTask<T> makeChild(Spliterator<T> spliterator) {
+            return new ForEachTask<>(this, spliterator, sink);
+        }
+
+        @Override
+        protected Void doLeaf() {
+            spliterator.into(sink);
+            return null;
         }
     }
 }
--- a/src/share/classes/java/util/streams/ops/ParallelOp.java	Tue Sep 11 15:33:33 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/ParallelOp.java	Wed Sep 12 16:04:04 2012 -0400
@@ -41,6 +41,7 @@
 
     public interface ParallelOpHelper<T, V> {
         public int suggestDepth();
+        public long suggestTargetSize();
         public Spliterator<V> spliterator();
         public Sink<V, ?, ?> sink(Sink sink);
         public Iterator<T> iterator();
--- a/src/share/classes/java/util/streams/ops/SeedlessFoldOp.java	Tue Sep 11 15:33:33 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/SeedlessFoldOp.java	Wed Sep 12 16:04:04 2012 -0400
@@ -44,16 +44,13 @@
     }
 
     private static<T> BinaryOperator<Optional<T>> combinerFor(final BinaryOperator<T> operator) {
-        return new BinaryOperator<Optional<T>>() {
-            @Override
-            public Optional<T> operate(Optional<T> left, Optional<T> right) {
-                if (!left.isPresent())
-                    return right;
-                else if (!right.isPresent())
-                    return left;
-                else
-                    return new Optional<>(operator.operate(left.get(), right.get()));
-            }
+        return (left, right) -> {
+            if (!left.isPresent())
+                return right;
+            else if (!right.isPresent())
+                return left;
+            else
+                return new Optional<>(operator.operate(left.get(), right.get()));
         };
     }
 
@@ -82,9 +79,12 @@
 
             @Override
             public Optional<T> getAndClearState() {
-                Optional<T> result = first ? Optional.<T>empty() : new Optional<>(state);
-                state = null;
-                return result;
+                try {
+                    return first ? Optional.<T>empty() : new Optional<>(state);
+                }
+                finally {
+                    state = null;
+                }
             }
 
             @Override
--- a/src/share/classes/java/util/streams/ops/TreeUtils.java	Tue Sep 11 15:33:33 2012 -0700
+++ b/src/share/classes/java/util/streams/ops/TreeUtils.java	Wed Sep 12 16:04:04 2012 -0400
@@ -25,8 +25,7 @@
 package java.util.streams.ops;
 
 import java.util.*;
-import java.util.concurrent.RecursiveAction;
-import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.CountedCompleter;
 import java.util.functions.Block;
 import java.util.streams.*;
 
@@ -41,8 +40,8 @@
     }
 
     public static<T, U> Node<T> collect(ParallelOp.ParallelOpHelper<T, U> helper,
-                                       boolean flattenLeaves,
-                                       boolean flattenTree) {
+                                        boolean flattenLeaves,
+                                        boolean flattenTree) {
         int depth = helper.suggestDepth();
         Spliterator<U> spliterator = helper.spliterator();
         int size = spliterator.getRemainingSizeIfKnown();
@@ -67,11 +66,13 @@
             // Need to account for SIZED flag from pipeline
             if (size != -1 && spliterator.isPredictableSplits() && splitSizesKnown) {
                 T[] array = (T[]) new Object[size];
-                helper.invoke(new SizedCollectorTask<>(depth, spliterator, helper, array, 0, size));
+                helper.invoke(new SizedCollectorTask<>(spliterator, helper, helper.suggestTargetSize(), array, 0, size));
                 return node(array);
             }
             else {
-                Node<T> node = helper.invoke(new CollectorTask<>(depth, spliterator, helper));
+                CollectorTask<U, T> task = new CollectorTask<>(spliterator, helper.suggestTargetSize(), helper);
+                helper.invoke(task);
+                Node<T> node = task.result;
                 if (flattenTree) {
                     T[] array = (T[]) new Object[node.size()];
                     helper.invoke(new ToArrayTask<>(node, array, 0));
@@ -100,72 +101,101 @@
         }
     }
 
-    private static class CollectorTask<T, U> extends RecursiveTask<Node<U>> {
-        private final int depth;
-        private final Spliterator<T> spliterator;
-        private ParallelOp.ParallelOpHelper<U, T> helper;
+    private static class CollectorTask<T, U> extends AbstractTask<T, Node<U>, CollectorTask<T,U>> {
+        private final ParallelOp.ParallelOpHelper<U, T> helper;
 
-        public CollectorTask(int depth, Spliterator<T> spliterator, ParallelOp.ParallelOpHelper<U, T> helper) {
-            this.depth = depth;
-            this.spliterator = spliterator;
+        private CollectorTask(Spliterator<T> spliterator, long targetSize, ParallelOp.ParallelOpHelper<U, T> helper) {
+            super(spliterator, targetSize);
             this.helper = helper;
         }
 
+        private CollectorTask(CollectorTask<T, U> parent, Spliterator<T> spliterator) {
+            super(parent, spliterator);
+            helper = parent.helper;
+        }
+
         @Override
-        protected Node<U> compute() {
-            if (depth == 0) {
-                // @@@ Usual comment about using fixed stream builders if we know enough to do so
-                StreamBuilder<U> builder = StreamBuilders.make();
-                spliterator.into(helper.sink(builder));
-                return node(builder);
-            }
-            else {
-                CollectorTask<T, U> left = new CollectorTask<>(depth-1, spliterator.split(), helper);
-                CollectorTask<T, U> right = new CollectorTask<>(depth-1, spliterator, helper);
-                right.fork();
-                Node<U> leftResult = left.compute();
-                Node<U> rightResult = right.join();
-                return node(leftResult, rightResult);
+        protected CollectorTask<T, U> makeChild(Spliterator<T> spliterator) {
+            return new CollectorTask<>(this, spliterator);
+        }
+
+        @Override
+        protected Node<U> doLeaf() {
+            // @@@ Usual comment about using fixed stream builders if we know enough to do so
+            StreamBuilder<U> builder = StreamBuilders.make();
+            spliterator.into(helper.sink(builder));
+            return node(builder);
+        }
+
+        @Override
+        public void onCompletion(CountedCompleter caller) {
+            if (!isLeaf) {
+                @SuppressWarnings("unchecked")
+                Node<U>[] nodes = (Node<U>[]) new Node[numChildren];
+                int idx = 0;
+                for (CollectorTask<T, U> cur = children; cur != null; cur = cur.nextSibling)
+                    nodes[idx++] = cur.result;
+                result = node(nodes);
             }
         }
     }
 
-    private static class SizedCollectorTask<T, U> extends RecursiveAction {
-        private final int depth;
+    private static class SizedCollectorTask<T, U> extends CountedCompleter {
         private final Spliterator<T> spliterator;
-        private ParallelOp.ParallelOpHelper<U, T> helper;
+        private final ParallelOp.ParallelOpHelper<U, T> helper;
+        private final long targetSize;
         private final U[] array;
         private int offset;
         private int length;
 
-        private SizedCollectorTask(int depth, Spliterator<T> spliterator, ParallelOp.ParallelOpHelper<U, T> helper, U[] array, int offset, int length) {
-            this.depth = depth;
+        private SizedCollectorTask(Spliterator<T> spliterator, ParallelOp.ParallelOpHelper<U, T> helper, long targetSize, U[] array, int offset, int length) {
             this.spliterator = spliterator;
             this.helper = helper;
+            this.targetSize = targetSize;
             this.array = array;
             this.offset = offset;
             this.length = length;
         }
 
+        private SizedCollectorTask(SizedCollectorTask<T, U> parent, Spliterator<T> spliterator, int offset, int length) {
+            super(parent);
+            this.spliterator = spliterator;
+            this.helper = parent.helper;
+            this.targetSize = parent.targetSize;
+            this.array = parent.array;
+            this.offset = offset;
+            this.length = length;
+        }
+
         @Override
-        protected void compute() {
-            if (depth == 0) {
+        public void compute() {
+            int remaining = spliterator.getRemainingSizeIfKnown();
+            int naturalSplits = spliterator.getNaturalSplitArity();
+            boolean isLeaf = ((remaining <= targetSize) && (remaining >= 0)) || (naturalSplits == 0);
+            if (isLeaf) {
                 spliterator.into(helper.sink(Arrays.sink(array, offset, length)));
+                tryComplete();
             }
             else {
-                Spliterator<T> split = spliterator.split();
-                int leftSize = split.getRemainingSizeIfKnown();
-                assert(leftSize != -1);
-                SizedCollectorTask<T, U> left = new SizedCollectorTask<>(depth-1, split, helper, array, offset, leftSize);
-                SizedCollectorTask<T, U> right = new SizedCollectorTask<>(depth-1, spliterator, helper, array, offset+leftSize, length-leftSize);
-                right.fork();
-                left.compute();
-                right.join();
+                setPendingCount(naturalSplits);
+                int s = 0;
+                for (int i=0; i<naturalSplits; i++) {
+                    Spliterator<T> split = spliterator.split();
+                    SizedCollectorTask<T, U> task = new SizedCollectorTask<>(this, split, offset + s, length - s);
+                    task.fork();
+                    int thisSplitSize = split.getRemainingSizeIfKnown();
+                    if (thisSplitSize == -1)
+                        throw new IllegalStateException("split size must be known");
+                    s += thisSplitSize;
+                }
+
+                SizedCollectorTask<T, U> task = new SizedCollectorTask<>(this, spliterator, offset + s, length - s);
+                task.compute();
             }
         }
     }
 
-    private static class ToArrayTask<T> extends RecursiveAction {
+    private static class ToArrayTask<T> extends CountedCompleter {
         private final T[] array;
         private final Node<T> node;
         private final int offset;
@@ -177,18 +207,22 @@
         }
 
         @Override
-        protected void compute() {
+        public void compute() {
             if (node instanceof InternalNode) {
-                InternalNode<T> n = (InternalNode<T>) node;
-                Node<T> left = n.left();
-                ToArrayTask<T> leftTask = new ToArrayTask<>(left, array, offset);
-                ToArrayTask<T> rightTask = new ToArrayTask<>(n.right(), array, offset + left.size());
-                rightTask.fork();
-                leftTask.compute();
-                rightTask.join();
+                Node<T>[] nodes = ((InternalNode<T>) node).nodes();
+                setPendingCount(nodes.length - 1);
+                ToArrayTask<T> firstTask = new ToArrayTask<>(nodes[0], array, offset);
+                int size = nodes[0].size();
+                for (int i=1; i<nodes.length; i++) {
+                    ToArrayTask<T> task = new ToArrayTask<>(nodes[i], array, offset + size);
+                    size += nodes[i].size();
+                    task.fork();
+                }
+                firstTask.compute();
             }
             else {
                 node.copyTo(array, offset);
+                tryComplete();
             }
         }
     }
@@ -200,8 +234,7 @@
     }
 
     public static interface InternalNode<T> extends Node<T> {
-        Node<T> left();
-        Node<T> right();
+        Node<T>[] nodes();
     }
 
     public static<T> Node<T> node(final T[] array) {
@@ -299,45 +332,37 @@
         };
     }
 
-    public static<T> Node<T> node(Node<T> left, Node<T> right) {
-        return new InternalNodeImpl<>(left, right);
+    @SafeVarargs
+    public static<T> Node<T> node(Node<T>... nodes) {
+        return new InternalNodeImpl<>(nodes);
     }
 
     private static class InternalNodeImpl<T> implements InternalNode<T> {
-        private final Node<T> left, right;
+        private final Node<T>[] nodes;
         int size = 0;
 
-        private InternalNodeImpl(Node<T> left, Node<T> right) {
-            this.left = left;
-            this.right = right;
-        }
-
-        @Override
-        public Node<T> left() {
-            return left;
-        }
-
-        @Override
-        public Node<T> right() {
-            return right;
+        private InternalNodeImpl(Node<T>[] nodes) {
+            this.nodes = nodes;
         }
 
         @Override
         public int size() {
-            if (size == 0)
-                size = left.size() + right.size();
+            if (size == 0) {
+                for (Node<T> n : nodes)
+                    size += n.size();
+            }
             return size;
         }
 
         @Override
         public void forEach(Block<? super T> block) {
-            left.forEach(block);
-            right.forEach(block);
+            for (Node<T> n : nodes)
+                n.forEach(block);
         }
 
         @Override
         public Iterator<T> iterator() {
-            return concatenate(left.iterator(), right.iterator());
+            return concatenate(Arrays.stream(nodes).map(n -> n.iterator()).iterator());
         }
 
         @Override
@@ -357,21 +382,32 @@
 
         @Override
         public void copyTo(T[] array, int offset) {
-            left.copyTo(array, offset);
-            right.copyTo(array, offset+left.size());
+            int s = 0;
+            for (Node<T> n : nodes) {
+                n.copyTo(array, offset+s);
+                s += n.size();
+            }
         }
 
         @Override
         public String toString() {
-            return String.format("IntNode[%s,%s]", left.toString(), right.toString());
+            return String.format("IntNode[%s]", Arrays.stream(nodes).map(n -> n.toString()).into(new StringJoiner(",")).toString());
+        }
+
+        @Override
+        public Node<T>[] nodes() {
+            return nodes;
         }
 
         private static class InternalNodeSpliterator<T> implements Spliterator<T>, Iterator<T> {
             private Node<T> cur;
+            private int splitsLeft;
+            private int nextSplitIndex;
             private Iterator<T> iterator;
 
             private InternalNodeSpliterator(InternalNodeImpl<T> cur) {
                 this.cur = cur;
+                splitsLeft = cur.nodes.length - 1;
             }
 
             public Iterator<T> iterator() {
@@ -381,17 +417,33 @@
             }
 
             @Override
+            public int getNaturalSplitArity() {
+                return splitsLeft;
+            }
+
+            @Override
             public Spliterator<T> split() {
                 if (iterator != null)
-                    throw new IllegalStateException();
-                if (cur instanceof InternalNodeImpl) {
-                    InternalNodeImpl<T> internalNode = (InternalNodeImpl<T>) cur;
-                    Spliterator<T> ret = internalNode.left.spliterator();
-                    cur = internalNode.right;
+                    throw new IllegalStateException("split after iterate");
+                else if (splitsLeft == 0 || !(cur instanceof InternalNode))
+                    return Streams.emptySpliterator();
+                else {
+                    Node<T>[] nodes = ((InternalNode<T>) cur).nodes();
+                    Spliterator<T> ret = nodes[nextSplitIndex++].spliterator();
+                    if (--splitsLeft == 0) {
+                        cur = nodes[nextSplitIndex];
+                        if (cur instanceof InternalNode) {
+                            Node<T>[] newNodes  = ((InternalNode<T>) cur).nodes();
+                            splitsLeft = newNodes.length - 1;
+                            nextSplitIndex = 0;
+                        }
+                        else {
+                            splitsLeft = 0;
+                            nextSplitIndex = -1;
+                        }
+                    }
                     return ret;
                 }
-                else
-                    return Streams.emptySpliterator();
             }
 
             @Override
@@ -430,21 +482,21 @@
         }
     }
 
-    public static<T> Iterator<T> concatenate(final Iterator<? extends T> left, final Iterator<? extends T> right) {
+    public static<T> Iterator<T> concatenate(final Iterator<Iterator<T>> iterators) {
+        if (!iterators.hasNext())
+            return Collections.emptyIterator();
         return new Iterator<T>() {
-            private Iterator<? extends T> it = left;
-            private boolean switched = false;
+            private Iterator<? extends T> it = iterators.next();
 
             @Override
             public boolean hasNext() {
                 if (it.hasNext()) {
                     return true;
-                } else if (switched) {
+                } else if (!iterators.hasNext()) {
                     return false;
                 } else {
-                    switched = true;
-                    it = right;
-                    return it.hasNext();
+                    it = iterators.next();
+                    return hasNext();
                 }
             }
 
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Tue Sep 11 15:33:33 2012 -0700
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Wed Sep 12 16:04:04 2012 -0400
@@ -165,16 +165,16 @@
             data.forEach(sink2);
             sink2.end();
             U answer2 = terminalSink2.getAndClearState();
-            assertTrue(equalator.test(answer, answer2));
+            assertTrue(equalator.test(answer, answer2), String.format("%s != %s", answer, answer2));
         }
 
         // Third pass -- wrap with SequentialPipeline.op
         U answer3 = data.seq(terminalOp, ops);
-        assertTrue(equalator.test(answer, answer3));
+        assertTrue(equalator.test(answer, answer3), String.format("%s != %s", answer, answer3));
 
         // Fourth pass -- wrap with ParallelPipeline.op
         U answer4 = data.par(terminalOp, ops);
-        assertTrue(equalator.test(answer, answer4));
+        assertTrue(equalator.test(answer, answer4), String.format("%s != %s", answer, answer4));
 
         return answer;
     }
@@ -310,6 +310,11 @@
         }
 
         @Override
+        public long suggestTargetSize() {
+            return ForkJoinUtils.suggestTargetSize(data.size());
+        }
+
+        @Override
         public Spliterator<T> spliterator() {
             return data.spliterator();
         }