OpenJDK / lambda / lambda / jdk
changeset 5991:75930f7b49dc it2-bootstrap
Merge
author | briangoetz |
---|---|
date | Wed, 12 Sep 2012 16:04:04 -0400 |
parents | 8a8c359c3b06 402a8d1a781b |
children | c94321c10623 |
files | |
diffstat | 13 files changed, 403 insertions(+), 155 deletions(-) [+] |
line wrap: on
line diff
--- a/src/share/classes/java/util/Arrays.java Tue Sep 11 15:33:33 2012 -0700 +++ b/src/share/classes/java/util/Arrays.java Wed Sep 12 16:04:04 2012 -0400 @@ -3717,6 +3717,8 @@ } private static class ArraySpliterator<T> extends ArrayIterator<T> implements Spliterator<T> { + boolean iterating = false; + ArraySpliterator(T[] array) { this(array, 0, array.length); } @@ -3738,8 +3740,8 @@ @Override public void into(Sink<T, ?, ?> sink) { sink.begin(getRemainingSizeIfKnown()); - // Strange-looking way to iterate; reduce heap write traffic if (offset < endOffset) { + // Strange-looking way to iterate; reduce heap write traffic int wasOffset = offset; offset = endOffset; for (int i=wasOffset; i<endOffset; i++) @@ -3749,7 +3751,14 @@ } @Override + public int getNaturalSplitArity() { + return (endOffset - offset > 1) ? 1 : 0; + } + + @Override public Spliterator<T> split() { + if (iterating) + throw new IllegalStateException("split after iterate"); int t = (endOffset - offset) / 2; Spliterator<T> ret = new ArraySpliterator<>(array, offset, t); offset += t; @@ -3758,6 +3767,7 @@ @Override public Iterator<T> iterator() { + iterating = true; return this; } } @@ -3771,8 +3781,8 @@ @Override public void into(Sink<T, ?, ?> sink) { sink.begin(endOffset-offset); - // Strange-looking way to iterate; reduce heap write traffic if (offset < endOffset) { + // Strange-looking way to iterate; reduce heap write traffic int wasOffset = offset; offset = endOffset; for (int i=wasOffset; i<endOffset; i++)
--- a/src/share/classes/java/util/Optional.java Tue Sep 11 15:33:33 2012 -0700 +++ b/src/share/classes/java/util/Optional.java Wed Sep 12 16:04:04 2012 -0400 @@ -51,7 +51,7 @@ private final boolean present; public Optional(T value) { - this.value = value; + this.value = Objects.requireNonNull(value); this.present = true; } @@ -187,4 +187,9 @@ result = 31 * result + (present ? 1 : 0); return result; } + + @Override + public String toString() { + return present ? String.format("Optional[%s]", value) : "Optional.empty"; + } }
--- a/src/share/classes/java/util/concurrent/ForkJoinUtils.java Tue Sep 11 15:33:33 2012 -0700 +++ b/src/share/classes/java/util/concurrent/ForkJoinUtils.java Wed Sep 12 16:04:04 2012 -0400 @@ -56,7 +56,7 @@ private ForkJoinUtils() {}; // no construction public static int suggestDepth(long s) { - long leafSize = 1 + ((s + 7) >>> 3) / defaultFJPool().getParallelism(); + long leafSize = suggestTargetSize(s); int d = 0; while (s > leafSize) { s /= 2; @@ -65,6 +65,10 @@ return d; } + public static long suggestTargetSize(long s) { + return 1 + ((s + 7) >>> 3) / defaultFJPool().getParallelism(); + } + /** * The exception thrown if initialization of the * {@link #defaultFJPool default ForkJoinPool}
--- a/src/share/classes/java/util/streams/AbstractPipeline.java Tue Sep 11 15:33:33 2012 -0700 +++ b/src/share/classes/java/util/streams/AbstractPipeline.java Wed Sep 12 16:04:04 2012 -0400 @@ -172,6 +172,14 @@ } @Override + public long suggestTargetSize() { + int size = source.getSizeOrEstimate(); + if (size < 0) + size = -size; + return ForkJoinUtils.suggestTargetSize(size); + } + + @Override public Spliterator<T> spliterator() { return source.spliterator(); }
--- a/src/share/classes/java/util/streams/Spliterator.java Tue Sep 11 15:33:33 2012 -0700 +++ b/src/share/classes/java/util/streams/Spliterator.java Wed Sep 12 16:04:04 2012 -0400 @@ -34,7 +34,13 @@ */ public interface Spliterator<T> { /** - * Returns a Spliterator covering approximately half of the + * Return the number of calls to split() that will naturally divide the + * data structure. + */ + int getNaturalSplitArity(); + + /** + * Returns a Spliterator covering some portion of the * elements, guaranteed not to overlap with those subsequently * returned by this spliterator. After invoking this method, * the current Spliterator will <em>not</em> cover the
--- a/src/share/classes/java/util/streams/Streams.java Tue Sep 11 15:33:33 2012 -0700 +++ b/src/share/classes/java/util/streams/Streams.java Wed Sep 12 16:04:04 2012 -0400 @@ -86,6 +86,11 @@ public static<T> Spliterator<T> emptySpliterator() { return new Spliterator<T>() { @Override + public int getNaturalSplitArity() { + return 0; + } + + @Override public Spliterator<T> split() { return emptySpliterator(); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/streams/ops/AbstractTask.java Wed Sep 12 16:04:04 2012 -0400 @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.streams.ops; + +import java.util.concurrent.CountedCompleter; +import java.util.streams.Sink; +import java.util.streams.Spliterator; + +/** + * AbstractTask + * + * @author Brian Goetz + */ +abstract class AbstractTask<E, R, T extends AbstractTask<E,R,T>> extends CountedCompleter { + protected final Spliterator<E> spliterator; + protected final long targetSize; + + protected int numChildren; + protected T children; + protected T nextSibling; + protected boolean isLeaf; + protected R result; + + protected AbstractTask(Spliterator<E> spliterator, long targetSize) { + super(null); + this.spliterator = spliterator; + this.targetSize = targetSize; + } + + protected AbstractTask(T parent, Spliterator<E> spliterator) { + super(parent); + this.spliterator = spliterator; + this.targetSize = parent.targetSize; + } + + protected abstract T makeChild(Spliterator<E> spliterator); + + protected abstract R doLeaf(); + + protected T getParent() { + return (T) getCompleter(); + } + + @Override + public void compute() { + int remaining = spliterator.getRemainingSizeIfKnown(); + int naturalSplits = spliterator.getNaturalSplitArity(); + isLeaf = ((remaining <= targetSize) && (remaining >= 0)) || (naturalSplits == 0); + if (isLeaf) { + result = doLeaf(); + tryComplete(); + } + else { + T curChild = null; + setPendingCount(naturalSplits); + numChildren = naturalSplits + 1; + for (int i=0; i<naturalSplits; i++) { + T newChild = makeChild(spliterator.split()); + if (curChild == null) + children = newChild; + else + curChild.nextSibling = newChild; + curChild = newChild; + newChild.fork(); + } + + T lastChild = makeChild(spliterator); + curChild.nextSibling = lastChild; + lastChild.compute(); + } + } +} + +abstract class ComparableTask<E, R, T extends ComparableTask<E,R,T>> extends AbstractTask<E,R,T> implements Comparable<T> { + protected final int depth; + + protected ComparableTask(Spliterator<E> spliterator, int targetSize) { + super(spliterator, targetSize); + depth = 0; + } + + protected ComparableTask(T parent, Spliterator<E> spliterator) { + super(parent, spliterator); + depth = parent.depth + 1; + } + + @Override + public int compareTo(T o) { + if (this == o) + return 0; + ComparableTask<E,R,T> me = this; + T other = o; + if (me.depth < other.depth) { + while (me.depth < other.depth) + other = other.getParent(); + } + else if (other.depth < me.depth) { + while (other.depth < me.depth) + me = (T) me.getParent(); + } + while (me.getCompleter() != other.getCompleter()) { + me = me.getParent(); + other = other.getParent(); + } + ComparableTask<E,R,T> child = me.getParent().children; + for (; child != null; child = child.nextSibling) { + if (child == me) + return -1; + else if (child == other) + return 1; + } + throw new IllegalStateException(); + } +} +
--- a/src/share/classes/java/util/streams/ops/FoldOp.java Tue Sep 11 15:33:33 2012 -0700 +++ b/src/share/classes/java/util/streams/ops/FoldOp.java Wed Sep 12 16:04:04 2012 -0400 @@ -25,9 +25,11 @@ package java.util.streams.ops; import java.util.Iterator; +import java.util.concurrent.CountedCompleter; +import java.util.functions.BinaryOperator; +import java.util.functions.Combiner; +import java.util.functions.Factory; import java.util.streams.Spliterator; -import java.util.concurrent.RecursiveTask; -import java.util.functions.*; import java.util.streams.TerminalSink; /** @@ -87,36 +89,47 @@ @Override public <V> U computeParallel(ParallelOpHelper<T, V> helper) { - return helper.invoke(new ReduceTask<>(helper.suggestDepth(), helper.spliterator(), this, helper)); + ReduceTask<T, U, V> task = new ReduceTask<>(helper.spliterator(), helper.suggestTargetSize(), helper, this); + helper.invoke(task); + return task.result; } - private static class ReduceTask<T, U, V> extends RecursiveTask<U> { + private static class ReduceTask<T, U, V> extends AbstractTask<V, U, ReduceTask<T,U,V>> { private final ParallelOpHelper<T, V> helper; private final FoldOp<T,U> op; - private final int depth; - private final Spliterator<V> source; - private ReduceTask(int depth, Spliterator<V> source, FoldOp<T,U> op, ParallelOpHelper<T, V> helper) { - this.depth = depth; - this.source = source; + private ReduceTask(Spliterator<V> spliterator, long targetSize, ParallelOpHelper<T, V> helper, FoldOp<T, U> op) { + super(spliterator, targetSize); this.helper = helper; this.op = op; } + private ReduceTask(ReduceTask<T, U, V> parent, Spliterator<V> spliterator) { + super(parent, spliterator); + this.helper = parent.helper; + this.op = parent.op; + } + @Override - protected U compute() { - if (depth != 0) { - ReduceTask<T, U, V> left = new ReduceTask<>(depth - 1, source.split(), op, helper); - ReduceTask<T, U, V> right = new ReduceTask<>(depth - 1, source, op, helper); - right.fork(); - U leftResult = left.compute(); - U rightResult = right.join(); - return op.combiner.operate(leftResult, rightResult); - } - else { - final TerminalSink<T, U> reduceStage = op.sink(); - source.into(helper.sink(reduceStage)); - return reduceStage.getAndClearState(); + protected ReduceTask<T, U, V> makeChild(Spliterator<V> spliterator) { + return new ReduceTask<>(this, spliterator); + } + + @Override + protected U doLeaf() { + final TerminalSink<T, U> reduceStage = op.sink(); + spliterator.into(helper.sink(reduceStage)); + return reduceStage.getAndClearState(); + } + + @Override + public void onCompletion(CountedCompleter caller) { + if (!isLeaf) { + ReduceTask<T,U,V> child = children; + result = child.result; + child = child.nextSibling; + for (; child != null; child = child.nextSibling) + result = op.combiner.operate(result, child.result); } } }
--- a/src/share/classes/java/util/streams/ops/ForEachOp.java Tue Sep 11 15:33:33 2012 -0700 +++ b/src/share/classes/java/util/streams/ops/ForEachOp.java Wed Sep 12 16:04:04 2012 -0400 @@ -26,10 +26,12 @@ import java.util.Mapping; import java.util.Objects; -import java.util.streams.*; -import java.util.concurrent.RecursiveAction; import java.util.functions.BiBlock; import java.util.functions.Block; +import java.util.streams.Sink; +import java.util.streams.Spliterator; +import java.util.streams.StreamShape; +import java.util.streams.TerminalSink; /** * ForEachOp @@ -94,39 +96,39 @@ @Override public <V> Void computeParallel(ParallelOpHelper<T, V> helper) { - int depth = helper.suggestDepth(); + long targetSize = helper.suggestTargetSize(); Sink<V, ?, ?> compoundSink = helper.sink(sink); Spliterator<V> spliterator = helper.spliterator(); - if (depth == 0) { + if (helper.suggestDepth() == 0) { spliterator.into(compoundSink); } else { - helper.invoke(new ForEachTask<>(depth, spliterator, compoundSink)); + helper.invoke(new ForEachTask<>(spliterator, targetSize, compoundSink)); } return null; } - private static class ForEachTask<T> extends RecursiveAction { - private final int depth; - private final Spliterator<T> spliterator; + private static class ForEachTask<T> extends AbstractTask<T, Void, ForEachTask<T>> { private final Sink<T, ?, ?> sink; - private ForEachTask(int depth, Spliterator<T> spliterator, Sink<T, ?, ?> sink) { - this.depth = depth; - this.spliterator = spliterator; + private ForEachTask(Spliterator<T> spliterator, long targetSize, Sink<T, ?, ?> sink) { + super(spliterator, targetSize); + this.sink = sink; + } + + private ForEachTask(ForEachTask<T> parent, Spliterator<T> spliterator, Sink<T, ?, ?> sink) { + super(parent, spliterator); this.sink = sink; } @Override - protected void compute() { - if (depth == 0) { - spliterator.into(sink); - } else { - ForEachTask<T> left = new ForEachTask<>(depth - 1, spliterator.split(), sink); - ForEachTask<T> right = new ForEachTask<>(depth - 1, spliterator, sink); - right.fork(); - left.compute(); - right.join(); - } + protected ForEachTask<T> makeChild(Spliterator<T> spliterator) { + return new ForEachTask<>(this, spliterator, sink); + } + + @Override + protected Void doLeaf() { + spliterator.into(sink); + return null; } } }
--- a/src/share/classes/java/util/streams/ops/ParallelOp.java Tue Sep 11 15:33:33 2012 -0700 +++ b/src/share/classes/java/util/streams/ops/ParallelOp.java Wed Sep 12 16:04:04 2012 -0400 @@ -41,6 +41,7 @@ public interface ParallelOpHelper<T, V> { public int suggestDepth(); + public long suggestTargetSize(); public Spliterator<V> spliterator(); public Sink<V, ?, ?> sink(Sink sink); public Iterator<T> iterator();
--- a/src/share/classes/java/util/streams/ops/SeedlessFoldOp.java Tue Sep 11 15:33:33 2012 -0700 +++ b/src/share/classes/java/util/streams/ops/SeedlessFoldOp.java Wed Sep 12 16:04:04 2012 -0400 @@ -44,16 +44,13 @@ } private static<T> BinaryOperator<Optional<T>> combinerFor(final BinaryOperator<T> operator) { - return new BinaryOperator<Optional<T>>() { - @Override - public Optional<T> operate(Optional<T> left, Optional<T> right) { - if (!left.isPresent()) - return right; - else if (!right.isPresent()) - return left; - else - return new Optional<>(operator.operate(left.get(), right.get())); - } + return (left, right) -> { + if (!left.isPresent()) + return right; + else if (!right.isPresent()) + return left; + else + return new Optional<>(operator.operate(left.get(), right.get())); }; } @@ -82,9 +79,12 @@ @Override public Optional<T> getAndClearState() { - Optional<T> result = first ? Optional.<T>empty() : new Optional<>(state); - state = null; - return result; + try { + return first ? Optional.<T>empty() : new Optional<>(state); + } + finally { + state = null; + } } @Override
--- a/src/share/classes/java/util/streams/ops/TreeUtils.java Tue Sep 11 15:33:33 2012 -0700 +++ b/src/share/classes/java/util/streams/ops/TreeUtils.java Wed Sep 12 16:04:04 2012 -0400 @@ -25,8 +25,7 @@ package java.util.streams.ops; import java.util.*; -import java.util.concurrent.RecursiveAction; -import java.util.concurrent.RecursiveTask; +import java.util.concurrent.CountedCompleter; import java.util.functions.Block; import java.util.streams.*; @@ -41,8 +40,8 @@ } public static<T, U> Node<T> collect(ParallelOp.ParallelOpHelper<T, U> helper, - boolean flattenLeaves, - boolean flattenTree) { + boolean flattenLeaves, + boolean flattenTree) { int depth = helper.suggestDepth(); Spliterator<U> spliterator = helper.spliterator(); int size = spliterator.getRemainingSizeIfKnown(); @@ -67,11 +66,13 @@ // Need to account for SIZED flag from pipeline if (size != -1 && spliterator.isPredictableSplits() && splitSizesKnown) { T[] array = (T[]) new Object[size]; - helper.invoke(new SizedCollectorTask<>(depth, spliterator, helper, array, 0, size)); + helper.invoke(new SizedCollectorTask<>(spliterator, helper, helper.suggestTargetSize(), array, 0, size)); return node(array); } else { - Node<T> node = helper.invoke(new CollectorTask<>(depth, spliterator, helper)); + CollectorTask<U, T> task = new CollectorTask<>(spliterator, helper.suggestTargetSize(), helper); + helper.invoke(task); + Node<T> node = task.result; if (flattenTree) { T[] array = (T[]) new Object[node.size()]; helper.invoke(new ToArrayTask<>(node, array, 0)); @@ -100,72 +101,101 @@ } } - private static class CollectorTask<T, U> extends RecursiveTask<Node<U>> { - private final int depth; - private final Spliterator<T> spliterator; - private ParallelOp.ParallelOpHelper<U, T> helper; + private static class CollectorTask<T, U> extends AbstractTask<T, Node<U>, CollectorTask<T,U>> { + private final ParallelOp.ParallelOpHelper<U, T> helper; - public CollectorTask(int depth, Spliterator<T> spliterator, ParallelOp.ParallelOpHelper<U, T> helper) { - this.depth = depth; - this.spliterator = spliterator; + private CollectorTask(Spliterator<T> spliterator, long targetSize, ParallelOp.ParallelOpHelper<U, T> helper) { + super(spliterator, targetSize); this.helper = helper; } + private CollectorTask(CollectorTask<T, U> parent, Spliterator<T> spliterator) { + super(parent, spliterator); + helper = parent.helper; + } + @Override - protected Node<U> compute() { - if (depth == 0) { - // @@@ Usual comment about using fixed stream builders if we know enough to do so - StreamBuilder<U> builder = StreamBuilders.make(); - spliterator.into(helper.sink(builder)); - return node(builder); - } - else { - CollectorTask<T, U> left = new CollectorTask<>(depth-1, spliterator.split(), helper); - CollectorTask<T, U> right = new CollectorTask<>(depth-1, spliterator, helper); - right.fork(); - Node<U> leftResult = left.compute(); - Node<U> rightResult = right.join(); - return node(leftResult, rightResult); + protected CollectorTask<T, U> makeChild(Spliterator<T> spliterator) { + return new CollectorTask<>(this, spliterator); + } + + @Override + protected Node<U> doLeaf() { + // @@@ Usual comment about using fixed stream builders if we know enough to do so + StreamBuilder<U> builder = StreamBuilders.make(); + spliterator.into(helper.sink(builder)); + return node(builder); + } + + @Override + public void onCompletion(CountedCompleter caller) { + if (!isLeaf) { + @SuppressWarnings("unchecked") + Node<U>[] nodes = (Node<U>[]) new Node[numChildren]; + int idx = 0; + for (CollectorTask<T, U> cur = children; cur != null; cur = cur.nextSibling) + nodes[idx++] = cur.result; + result = node(nodes); } } } - private static class SizedCollectorTask<T, U> extends RecursiveAction { - private final int depth; + private static class SizedCollectorTask<T, U> extends CountedCompleter { private final Spliterator<T> spliterator; - private ParallelOp.ParallelOpHelper<U, T> helper; + private final ParallelOp.ParallelOpHelper<U, T> helper; + private final long targetSize; private final U[] array; private int offset; private int length; - private SizedCollectorTask(int depth, Spliterator<T> spliterator, ParallelOp.ParallelOpHelper<U, T> helper, U[] array, int offset, int length) { - this.depth = depth; + private SizedCollectorTask(Spliterator<T> spliterator, ParallelOp.ParallelOpHelper<U, T> helper, long targetSize, U[] array, int offset, int length) { this.spliterator = spliterator; this.helper = helper; + this.targetSize = targetSize; this.array = array; this.offset = offset; this.length = length; } + private SizedCollectorTask(SizedCollectorTask<T, U> parent, Spliterator<T> spliterator, int offset, int length) { + super(parent); + this.spliterator = spliterator; + this.helper = parent.helper; + this.targetSize = parent.targetSize; + this.array = parent.array; + this.offset = offset; + this.length = length; + } + @Override - protected void compute() { - if (depth == 0) { + public void compute() { + int remaining = spliterator.getRemainingSizeIfKnown(); + int naturalSplits = spliterator.getNaturalSplitArity(); + boolean isLeaf = ((remaining <= targetSize) && (remaining >= 0)) || (naturalSplits == 0); + if (isLeaf) { spliterator.into(helper.sink(Arrays.sink(array, offset, length))); + tryComplete(); } else { - Spliterator<T> split = spliterator.split(); - int leftSize = split.getRemainingSizeIfKnown(); - assert(leftSize != -1); - SizedCollectorTask<T, U> left = new SizedCollectorTask<>(depth-1, split, helper, array, offset, leftSize); - SizedCollectorTask<T, U> right = new SizedCollectorTask<>(depth-1, spliterator, helper, array, offset+leftSize, length-leftSize); - right.fork(); - left.compute(); - right.join(); + setPendingCount(naturalSplits); + int s = 0; + for (int i=0; i<naturalSplits; i++) { + Spliterator<T> split = spliterator.split(); + SizedCollectorTask<T, U> task = new SizedCollectorTask<>(this, split, offset + s, length - s); + task.fork(); + int thisSplitSize = split.getRemainingSizeIfKnown(); + if (thisSplitSize == -1) + throw new IllegalStateException("split size must be known"); + s += thisSplitSize; + } + + SizedCollectorTask<T, U> task = new SizedCollectorTask<>(this, spliterator, offset + s, length - s); + task.compute(); } } } - private static class ToArrayTask<T> extends RecursiveAction { + private static class ToArrayTask<T> extends CountedCompleter { private final T[] array; private final Node<T> node; private final int offset; @@ -177,18 +207,22 @@ } @Override - protected void compute() { + public void compute() { if (node instanceof InternalNode) { - InternalNode<T> n = (InternalNode<T>) node; - Node<T> left = n.left(); - ToArrayTask<T> leftTask = new ToArrayTask<>(left, array, offset); - ToArrayTask<T> rightTask = new ToArrayTask<>(n.right(), array, offset + left.size()); - rightTask.fork(); - leftTask.compute(); - rightTask.join(); + Node<T>[] nodes = ((InternalNode<T>) node).nodes(); + setPendingCount(nodes.length - 1); + ToArrayTask<T> firstTask = new ToArrayTask<>(nodes[0], array, offset); + int size = nodes[0].size(); + for (int i=1; i<nodes.length; i++) { + ToArrayTask<T> task = new ToArrayTask<>(nodes[i], array, offset + size); + size += nodes[i].size(); + task.fork(); + } + firstTask.compute(); } else { node.copyTo(array, offset); + tryComplete(); } } } @@ -200,8 +234,7 @@ } public static interface InternalNode<T> extends Node<T> { - Node<T> left(); - Node<T> right(); + Node<T>[] nodes(); } public static<T> Node<T> node(final T[] array) { @@ -299,45 +332,37 @@ }; } - public static<T> Node<T> node(Node<T> left, Node<T> right) { - return new InternalNodeImpl<>(left, right); + @SafeVarargs + public static<T> Node<T> node(Node<T>... nodes) { + return new InternalNodeImpl<>(nodes); } private static class InternalNodeImpl<T> implements InternalNode<T> { - private final Node<T> left, right; + private final Node<T>[] nodes; int size = 0; - private InternalNodeImpl(Node<T> left, Node<T> right) { - this.left = left; - this.right = right; - } - - @Override - public Node<T> left() { - return left; - } - - @Override - public Node<T> right() { - return right; + private InternalNodeImpl(Node<T>[] nodes) { + this.nodes = nodes; } @Override public int size() { - if (size == 0) - size = left.size() + right.size(); + if (size == 0) { + for (Node<T> n : nodes) + size += n.size(); + } return size; } @Override public void forEach(Block<? super T> block) { - left.forEach(block); - right.forEach(block); + for (Node<T> n : nodes) + n.forEach(block); } @Override public Iterator<T> iterator() { - return concatenate(left.iterator(), right.iterator()); + return concatenate(Arrays.stream(nodes).map(n -> n.iterator()).iterator()); } @Override @@ -357,21 +382,32 @@ @Override public void copyTo(T[] array, int offset) { - left.copyTo(array, offset); - right.copyTo(array, offset+left.size()); + int s = 0; + for (Node<T> n : nodes) { + n.copyTo(array, offset+s); + s += n.size(); + } } @Override public String toString() { - return String.format("IntNode[%s,%s]", left.toString(), right.toString()); + return String.format("IntNode[%s]", Arrays.stream(nodes).map(n -> n.toString()).into(new StringJoiner(",")).toString()); + } + + @Override + public Node<T>[] nodes() { + return nodes; } private static class InternalNodeSpliterator<T> implements Spliterator<T>, Iterator<T> { private Node<T> cur; + private int splitsLeft; + private int nextSplitIndex; private Iterator<T> iterator; private InternalNodeSpliterator(InternalNodeImpl<T> cur) { this.cur = cur; + splitsLeft = cur.nodes.length - 1; } public Iterator<T> iterator() { @@ -381,17 +417,33 @@ } @Override + public int getNaturalSplitArity() { + return splitsLeft; + } + + @Override public Spliterator<T> split() { if (iterator != null) - throw new IllegalStateException(); - if (cur instanceof InternalNodeImpl) { - InternalNodeImpl<T> internalNode = (InternalNodeImpl<T>) cur; - Spliterator<T> ret = internalNode.left.spliterator(); - cur = internalNode.right; + throw new IllegalStateException("split after iterate"); + else if (splitsLeft == 0 || !(cur instanceof InternalNode)) + return Streams.emptySpliterator(); + else { + Node<T>[] nodes = ((InternalNode<T>) cur).nodes(); + Spliterator<T> ret = nodes[nextSplitIndex++].spliterator(); + if (--splitsLeft == 0) { + cur = nodes[nextSplitIndex]; + if (cur instanceof InternalNode) { + Node<T>[] newNodes = ((InternalNode<T>) cur).nodes(); + splitsLeft = newNodes.length - 1; + nextSplitIndex = 0; + } + else { + splitsLeft = 0; + nextSplitIndex = -1; + } + } return ret; } - else - return Streams.emptySpliterator(); } @Override @@ -430,21 +482,21 @@ } } - public static<T> Iterator<T> concatenate(final Iterator<? extends T> left, final Iterator<? extends T> right) { + public static<T> Iterator<T> concatenate(final Iterator<Iterator<T>> iterators) { + if (!iterators.hasNext()) + return Collections.emptyIterator(); return new Iterator<T>() { - private Iterator<? extends T> it = left; - private boolean switched = false; + private Iterator<? extends T> it = iterators.next(); @Override public boolean hasNext() { if (it.hasNext()) { return true; - } else if (switched) { + } else if (!iterators.hasNext()) { return false; } else { - switched = true; - it = right; - return it.hasNext(); + it = iterators.next(); + return hasNext(); } }
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java Tue Sep 11 15:33:33 2012 -0700 +++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java Wed Sep 12 16:04:04 2012 -0400 @@ -165,16 +165,16 @@ data.forEach(sink2); sink2.end(); U answer2 = terminalSink2.getAndClearState(); - assertTrue(equalator.test(answer, answer2)); + assertTrue(equalator.test(answer, answer2), String.format("%s != %s", answer, answer2)); } // Third pass -- wrap with SequentialPipeline.op U answer3 = data.seq(terminalOp, ops); - assertTrue(equalator.test(answer, answer3)); + assertTrue(equalator.test(answer, answer3), String.format("%s != %s", answer, answer3)); // Fourth pass -- wrap with ParallelPipeline.op U answer4 = data.par(terminalOp, ops); - assertTrue(equalator.test(answer, answer4)); + assertTrue(equalator.test(answer, answer4), String.format("%s != %s", answer, answer4)); return answer; } @@ -310,6 +310,11 @@ } @Override + public long suggestTargetSize() { + return ForkJoinUtils.suggestTargetSize(data.size()); + } + + @Override public Spliterator<T> spliterator() { return data.spliterator(); }