OpenJDK / lambda / lambda / jdk
changeset 7820:e473df6aaa89
Cleanups in files that are part of JDK-8008682, in preparation for final review
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractShortCircuitTask.java Thu Apr 04 20:18:30 2013 -0400 +++ b/src/share/classes/java/util/stream/AbstractShortCircuitTask.java Thu Apr 04 22:28:40 2013 -0400 @@ -32,14 +32,15 @@ * stream ops, which can produce a result without processing all elements of the * stream. * - * @param <P_IN> Type of elements input to the pipeline - * @param <P_OUT> Type of elements output from the pipeline + * @param <P_IN> Type of input elements to the pipeline + * @param <P_OUT> Type of output elements from the pipeline * @param <R> Type of intermediate result, may be different from operation * result type * @param <T> Type of child and sibling tasks * @since 1.8 */ -abstract class AbstractShortCircuitTask<P_IN, P_OUT, R, T extends AbstractShortCircuitTask<P_IN, P_OUT, R, T>> +abstract class AbstractShortCircuitTask<P_IN, P_OUT, R, + T extends AbstractShortCircuitTask<P_IN, P_OUT, R, T>> extends AbstractTask<P_IN, P_OUT, R, T> { /** * The result for this computation; this is shared among all tasks and set @@ -55,14 +56,25 @@ */ protected volatile boolean canceled; - /** Constructor for root nodes */ + /** + * Constructor for root nodes. + * @param helper The {@code PipelineHelper} describing the stream pipeline + * up to this operation + * @param spliterator The {@code Spliterator} describing the source for this + * pipeline + */ protected AbstractShortCircuitTask(PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator) { super(helper, spliterator); sharedResult = new AtomicReference<>(null); } - /** Constructor for non-root nodes */ + /** + * Constructor for non-root nodes. + * @param parent Parent task in the computation tree + * @param spliterator The {@code Spliterator} for the portion of the + * computation tree described by this task + */ protected AbstractShortCircuitTask(T parent, Spliterator<P_IN> spliterator) { super(parent, spliterator); @@ -73,6 +85,8 @@ * Returns the value indicating the computation completed with no task * finding a short-circuitable result. For example, for a "find" operation, * this might be null or an empty {@code Optional}. + * + * @return the result to return when no task finds a result */ protected abstract R getEmptyResult(); @@ -98,6 +112,7 @@ * {@code sharedResult}. The {@code compute()} method will check * {@code sharedResult} before proceeding with computation, so this causes * the computation to terminate early. + * @param result The result found */ protected void shortCircuit(R result) { if (result != null) @@ -107,6 +122,7 @@ /** * Sets a local result for this task. If this task is the root, set the * shared result instead (if not already set). + * @param localResult The result to set for this task */ @Override protected void setLocalResult(R localResult) { @@ -138,7 +154,7 @@ return super.getLocalResult(); } - /** Set this node as canceled */ + /** Mark this node as canceled */ protected void cancel() { canceled = true; }
--- a/src/share/classes/java/util/stream/AbstractTask.java Thu Apr 04 20:18:30 2013 -0400 +++ b/src/share/classes/java/util/stream/AbstractTask.java Thu Apr 04 22:28:40 2013 -0400 @@ -37,6 +37,7 @@ * the {@code Spliterator}) or internal nodes (which split the * {@code Spliterator} into multiple child tasks). * + * @implNote * <p>This class is based on {@link CountedCompleter}, a form of fork-join task * where each task has a semaphore-like count of uncompleted children, and the * task is implicitly completed and notified when its last child completes. @@ -79,11 +80,12 @@ * @param <T> Type of parent, child and sibling tasks * @since 1.8 */ -abstract class AbstractTask<P_IN, P_OUT, R, T extends AbstractTask<P_IN, P_OUT, R, T>> +abstract class AbstractTask<P_IN, P_OUT, R, + T extends AbstractTask<P_IN, P_OUT, R, T>> extends CountedCompleter<R> { /** - * Default target number of leaf tasks for parallel decomposition. + * Default target factor of leaf tasks for parallel decomposition. * To allow load balancing, we over-partition, currently to approximately * four tasks per processor, which enables others to help out * if leaf tasks are uneven or some processors are otherwise busy. @@ -99,7 +101,7 @@ */ protected Spliterator<P_IN> spliterator; - /** Target leaf size */ + /** Target leaf size, common to all tasks in a computation */ protected final long targetSize; /** @@ -116,6 +118,10 @@ /** * Constructor for root nodes. + * @param helper The {@code PipelineHelper} describing the stream pipeline + * up to this operation + * @param spliterator The {@code Spliterator} describing the source for this + * pipeline */ protected AbstractTask(PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator) { @@ -126,19 +132,6 @@ } /** - * Alternate constructor for root nodes that have already gotten the - * Spliterator from the helper. - */ - protected AbstractTask(PipelineHelper<P_OUT> helper, - Spliterator<P_IN> spliterator, - long targetSize) { - super(null); - this.helper = helper; - this.spliterator = spliterator; - this.targetSize = targetSize; - } - - /** * Constructor for non-root nodes * * @param parent This node's parent task @@ -176,12 +169,11 @@ * Suggests whether it is adviseable to split the provided spliterator based * on target size and other considerations, such as pool state */ - public static<P_IN, P_OUT> boolean suggestSplit(PipelineHelper<P_OUT> helper, - Spliterator spliterator, - long targetSize) { + public static boolean suggestSplit(Spliterator spliterator, + long targetSize) { long remaining = spliterator.estimateSize(); return (remaining > targetSize); - // @@@ May want to fold in pool characteristics such as surplus task count + // @@@ May additionally want to fold in pool characteristics such as surplus task count } /** @@ -189,11 +181,11 @@ * and other considerations */ public boolean suggestSplit() { - return suggestSplit(helper, spliterator, targetSize); + return suggestSplit(spliterator, targetSize); } /** - * Returns the local result, if any. Subclasses should use + * Returns the local result, if any. Subclasses should use * {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage * results. This returns the local result so that calls from within the * fork-join framework will return the correct result. @@ -235,7 +227,7 @@ } /** - * Determines if this this task a leaf node. (Only valid after + * Indicates whether this this task a leaf node. (Only valid after * {@link #compute} has been called on this node). If the node is not a * leaf node, then children will be non-null and numChildren will be * positive. @@ -245,7 +237,7 @@ } /** - * Determines if this task is a root node + * Indicates whether this task is the root node */ protected boolean isRoot() { return getParent() == null; @@ -302,6 +294,7 @@ /** * {@inheritDoc} + * @implNote * Clears spliterator and children fields. Overriders MUST call * {@code super.onCompletion} as the last thing they do if they want these * cleared @@ -314,6 +307,7 @@ /** * Determines if the task can be computed. + * @implSpec The default always returns true * * @return true if this task can be computed to either calculate the leaf * via {@link #doLeaf()} or split, otherwise false if this task
--- a/src/share/classes/java/util/stream/FindOps.java Thu Apr 04 20:18:30 2013 -0400 +++ b/src/share/classes/java/util/stream/FindOps.java Thu Apr 04 22:28:40 2013 -0400 @@ -30,9 +30,9 @@ import java.util.function.Supplier; /** - * Factory for creating instances of a short-circuiting {@code TerminalOp} - * that searches for an element in a stream pipeline, and terminates when it - * finds one. The search supports find-first (find the first element in the + * Factory for instances of a short-circuiting {@code TerminalOp} that searches + * for an element in a stream pipeline, and terminates when it finds one. + * Supported variants include find-first (find the first element in the * encounter order) and find-any (find any element, may not be the first in * encounter order.) *
--- a/src/share/classes/java/util/stream/ForEachOps.java Thu Apr 04 20:18:30 2013 -0400 +++ b/src/share/classes/java/util/stream/ForEachOps.java Thu Apr 04 22:28:40 2013 -0400 @@ -34,15 +34,16 @@ import java.util.Objects; /** - * Factory for creating instances of {@code TerminalOp} that implement - * {@code forEach} traversal over elements of a stream. + * Factory for creating instances of {@code TerminalOp} that perform an + * action for every element of a stream. Supported variants include unordered + * traversal (elements are provided to the {@code Consumer} as soon as they are + * available), and ordered traversal (elements are provided to the + * {@code Consumer} in encounter order.) * - * <p>{@code forEach} traverses all elements of a stream and sends those - * elements to a {@code Consumer}. - * - * <p>For either type of traversal elements will be sent to the {@code Consumer} - * on whatever thread and whatever order they become available, independent of - * the stream's encounter order. + * <p>Elements are provided to the {@code Consumer} on whatever thread and + * whatever order they become available. For ordered traversals, it is + * guaranteed that processing an element <em>happens-before</em> processing + * subsequent elements in the encounter order. * * <p>Exceptions occurring as a result of sending an element to the * {@code Consumer} will be relayed to the caller and traversal will be @@ -55,80 +56,76 @@ private ForEachOps() { } /** - * Constructs a {@code TerminalOp} that implements {@code forEach} - * traversal, which traverses all elements of a {@code Stream} and sends - * those elements the provided {@code Consumer}. + * Constructs a {@code TerminalOp} that perform an action for every element + * of a stream. * - * @param consumer The {@code Consumer} that receives all elements of a + * @param action The {@code Consumer} that receives all elements of a * stream + * @param ordered Whether an ordered traversal is requested * @param <T> The type of the stream elements * @return the {@code TerminalOp} instance */ - public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> consumer, boolean ordered) { - Objects.requireNonNull(consumer); - return new ForEachOp.OfRef<>(consumer, ordered); + public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action, boolean ordered) { + Objects.requireNonNull(action); + return new ForEachOp.OfRef<>(action, ordered); } /** - * Constructs a {@code TerminalOp} that implements {@code forEach} - * traversal, which traverses all {@code int} elements of a - * {@code IntStream} and sends those elements the provided - * {@code IntConsumer}. + * Constructs a {@code TerminalOp} that perform an action for every element + * of an {@code IntStream}. * - * @param consumer The {@code IntConsumer} that receives all elements of a + * @param action The {@code IntConsumer} that receives all elements of a * stream + * @param ordered Whether an ordered traversal is requested * @return the {@code TerminalOp} instance */ - public static TerminalOp<Integer, Void> makeInt(IntConsumer consumer, boolean ordered) { - Objects.requireNonNull(consumer); - return new ForEachOp.OfInt(consumer, ordered); + public static TerminalOp<Integer, Void> makeInt(IntConsumer action, boolean ordered) { + Objects.requireNonNull(action); + return new ForEachOp.OfInt(action, ordered); } /** - * Constructs a {@code TerminalOp} that implements {@code forEach} - * traversal, which traverses all {@code long} elements of a - * {@code LongStream} and sends those elements the provided - * {@code LongConsumer}. + * Constructs a {@code TerminalOp} that perform an action for every element + * of an {@code LongStream}. * - * @param consumer The {@code LongConsumer} that receives all elements of a + * @param action The {@code LongConsumer} that receives all elements of a * stream + * @param ordered Whether an ordered traversal is requested * @return the {@code TerminalOp} instance */ - public static TerminalOp<Long, Void> makeLong(LongConsumer consumer, boolean ordered) { - Objects.requireNonNull(consumer); - return new ForEachOp.OfLong(consumer, ordered); + public static TerminalOp<Long, Void> makeLong(LongConsumer action, boolean ordered) { + Objects.requireNonNull(action); + return new ForEachOp.OfLong(action, ordered); } /** - * Constructs a {@code TerminalOp} that implements {@code forEach} - * traversal, which traverses all {@code double} elements of a - * {@code DoubleStream} and sends those elements the provided - * {@code DoubleConsumer}. + * Constructs a {@code TerminalOp} that perform an action for every element + * of an {@code DoubleStream}. * - * @param consumer The {@code DoubleConsumer} that receives all elements of + * @param action The {@code DoubleConsumer} that receives all elements of * a stream + * @param ordered Whether an ordered traversal is requested * @return the {@code TerminalOp} instance */ - public static TerminalOp<Double, Void> makeDouble(DoubleConsumer consumer, boolean ordered) { - Objects.requireNonNull(consumer); - return new ForEachOp.OfDouble(consumer, ordered); + public static TerminalOp<Double, Void> makeDouble(DoubleConsumer action, boolean ordered) { + Objects.requireNonNull(action); + return new ForEachOp.OfDouble(action, ordered); } /** * A {@code TerminalOp} that evaluates a stream pipeline and sends the * output to itself as a {@code TerminalSink}. Elements will be sent in - * whatever thread and whatever order they become available, independent of - * the stream's encounter order. + * whatever thread they become available. If the traversal is unordered, + * they will be sent independent of the stream's encounter order. * - * <p>This terminal operation is stateless. For parallel evaluation each + * <p>This terminal operation is stateless. For parallel evaluation, each * leaf instance of a {@code ForEachTask} will send elements to the same - * {@code TerminalSink} reference that is an instance of this class. State - * management, if any, is deferred to the consumer, held by the concrete - * sub-classes, that is the final receiver elements. + * {@code TerminalSink} reference that is an instance of this class. * * @param <T> The output type of the stream pipeline */ - private static abstract class ForEachOp<T> implements TerminalOp<T, Void>, TerminalSink<T, Void> { + private static abstract class ForEachOp<T> + implements TerminalOp<T, Void>, TerminalSink<T, Void> { private final boolean ordered; protected ForEachOp(boolean ordered) { @@ -143,12 +140,14 @@ } @Override - public <S> Void evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) { + public <S> Void evaluateSequential(PipelineHelper<T> helper, + Spliterator<S> spliterator) { return helper.wrapAndCopyInto(this, spliterator).get(); } @Override - public <S> Void evaluateParallel(PipelineHelper<T> helper, Spliterator<S> spliterator) { + public <S> Void evaluateParallel(PipelineHelper<T> helper, + Spliterator<S> spliterator) { if (ordered) new ForEachOrderedTask<>(helper, spliterator, this).invoke(); else @@ -165,7 +164,7 @@ // Implementations - /** {@code forEach} with {@code Stream} */ + /** Implementation class for reference streams */ private static class OfRef<T> extends ForEachOp<T> { final Consumer<? super T> consumer; @@ -180,7 +179,7 @@ } } - /** {@code forEach} with {@code IntStream} */ + /** Implementation class for {@code IntStream} */ private static class OfInt extends ForEachOp<Integer> implements Sink.OfInt { final IntConsumer consumer; @@ -200,7 +199,7 @@ } } - /** {@code forEach} with {@code LongStream} */ + /** Implementation class for {@code LongStream} */ private static class OfLong extends ForEachOp<Long> implements Sink.OfLong { final LongConsumer consumer; @@ -220,7 +219,7 @@ } } - /** {@code forEach} with {@code DoubleStream} */ + /** Implementation class for {@code DoubleStream} */ private static class OfDouble extends ForEachOp<Double> implements Sink.OfDouble { final DoubleConsumer consumer; @@ -274,7 +273,7 @@ } Spliterator<S> split; - if (!AbstractTask.suggestSplit(helper, spliterator, targetSize) + if (!AbstractTask.suggestSplit(spliterator, targetSize) || (split = spliterator.trySplit()) == null) { helper.copyInto(sink, spliterator); propagateCompletion(); @@ -289,7 +288,10 @@ } } - /** A {@code ForkJoinTask} for performing a parallel ordered for-each operation */ + /** + * A {@code ForkJoinTask} for performing a parallel for-each operation + * which visits the elements in encounter order + */ private static class ForEachOrderedTask<S, T> extends CountedCompleter<Void> { private final PipelineHelper<T> helper; private Spliterator<S> spliterator; @@ -300,7 +302,9 @@ private final ForEachOrderedTask<S, T> leftPredecessor; private Node<T> node; - protected ForEachOrderedTask(PipelineHelper<T> helper, Spliterator<S> spliterator, Sink<T> action) { + protected ForEachOrderedTask(PipelineHelper<T> helper, + Spliterator<S> spliterator, + Sink<T> action) { super(null); this.helper = helper; this.spliterator = spliterator; @@ -332,7 +336,7 @@ private static<S, T> void doCompute(ForEachOrderedTask<S, T> task) { while (true) { Spliterator<S> split; - if (!AbstractTask.suggestSplit(task.helper, task.spliterator, task.targetSize) + if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize) || (split = task.spliterator.trySplit()) == null) { if (task.getPendingCount() == 0) { task.helper.wrapAndCopyInto(task.action, task.spliterator);
--- a/src/share/classes/java/util/stream/MatchOps.java Thu Apr 04 20:18:30 2013 -0400 +++ b/src/share/classes/java/util/stream/MatchOps.java Thu Apr 04 22:28:40 2013 -0400 @@ -33,9 +33,9 @@ import java.util.function.Supplier; /** - * Factory for creating instances of a short-circuiting {@code TerminalOp} - * that evaluates a predicate on the elements of a stream and determines whether - * all, any or none of those elements match the predicate. + * Factory for instances of a short-circuiting {@code TerminalOp} that + * implement quantified predicate matching on the elements of a stream. + * Supported variants include match-all, match-any, and match-none. * * @since 1.8 */ @@ -67,8 +67,7 @@ } /** - * Constructs a {@code TerminalOp} for the given predicate and quantified - * match criteria + * Constructs a quantified predicate matcher for a Stream * * @param predicate The {@code Predicate} to apply to stream elements * @param matchKind The kind of quantified match (all, any, none) @@ -86,8 +85,6 @@ @Override public void accept(T t) { - // @@@ assert !stop when SortedOp supports short-circuit on Sink.end - // for sequential operations if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) { stop = true; value = matchKind.shortCircuitResult; @@ -95,7 +92,7 @@ } } - // @@@ Change to return MatchSink::new when compiler and runtime bugs are fixed + // @@@ Workaround for JDK-8011591 -- when fixed, replace s with constructor ref Supplier<BooleanTerminalSink<T>> s = new Supplier<BooleanTerminalSink<T>>() { @Override public BooleanTerminalSink<T> get() {return new MatchSink();} @@ -104,8 +101,7 @@ } /** - * Constructs a {@code TerminalOp} for the given predicate and quantified - * match criteria for an {@code IntStream} + * Constructs a quantified predicate matcher for an {@code IntStream} * * @param predicate The {@code Predicate} to apply to stream elements * @param matchKind The kind of quantified match (all, any, none) @@ -129,6 +125,7 @@ } } + // @@@ Workaround for JDK-8011591 -- when fixed, replace s with constructor ref Supplier<BooleanTerminalSink<Integer>> s = new Supplier<BooleanTerminalSink<Integer>>() { @Override public BooleanTerminalSink<Integer> get() {return new MatchSink();} @@ -137,8 +134,7 @@ } /** - * Constructs a {@code TerminalOp} for the given predicate and quantified - * match criteria for a {@code LongStream} + * Constructs a quantified predicate matcher for a {@code LongStream} * * @param predicate The {@code Predicate} to apply to stream elements * @param matchKind The kind of quantified match (all, any, none) @@ -163,6 +159,7 @@ } } + // @@@ Workaround for JDK-8011591 -- when fixed, replace s with constructor ref Supplier<BooleanTerminalSink<Long>> s = new Supplier<BooleanTerminalSink<Long>>() { @Override public BooleanTerminalSink<Long> get() {return new MatchSink();} @@ -171,8 +168,7 @@ } /** - * Constructs a {@code TerminalOp} for the given predicate and quantified - * match criteria for a {@code DoubleStream} + * Constructs a quantified predicate matcher for a {@code DoubleStream} * * @param predicate The {@code Predicate} to apply to stream elements * @param matchKind The kind of quantified match (all, any, none) @@ -197,6 +193,7 @@ } } + // @@@ Workaround for JDK-8011591 -- when fixed, replace s with constructor ref Supplier<BooleanTerminalSink<Double>> s = new Supplier<BooleanTerminalSink<Double>>() { @Override public BooleanTerminalSink<Double> get() {return new MatchSink();} @@ -288,14 +285,17 @@ * @param <S> The type of source elements for the pipeline * @param <T> The type of output elements for the pipeline */ - private static final class MatchTask<S, T> extends AbstractShortCircuitTask<S, T, Boolean, MatchTask<S, T>> { + private static final class MatchTask<S, T> + extends AbstractShortCircuitTask<S, T, Boolean, MatchTask<S, T>> { private final MatchOp<T> op; + /** Constructor for root node */ MatchTask(MatchOp<T> op, PipelineHelper<T> helper, Spliterator<S> spliterator) { super(helper, spliterator); this.op = op; } + /** Constructor for non-root node */ MatchTask(MatchTask<S, T> parent, Spliterator<S> spliterator) { super(parent, spliterator); this.op = parent.op;
--- a/src/share/classes/java/util/stream/Node.java Thu Apr 04 20:18:30 2013 -0400 +++ b/src/share/classes/java/util/stream/Node.java Thu Apr 04 22:28:40 2013 -0400 @@ -48,12 +48,11 @@ * <p>A {@code Node} typically does not store the elements directly, but instead * mediates access to one or more existing (effectively immutable) data * structures such as a {@code Collection}, array, or a set of other - * {@code Node}s. {@code Node}s directly representing existing data structures - * are considered <em>flat</em> (have no children); commonly {@code Node}s are - * formed into a tree whose shape corresponds to the computation tree that - * produced the elements that are contained in the leaf nodes. The use of - * {@code Node} within the stream framework is largely to avoid copying data - * unnecessarily during parallel operations. + * {@code Node}s. Commonly {@code Node}s are formed into a tree whose shape + * corresponds to the computation tree that produced the elements that are + * contained in the leaf nodes. The use of {@code Node} within the stream + * framework is largely to avoid copying data unnecessarily during parallel + * operations. * * @param <T> the type of elements. * @since 1.8 @@ -65,7 +64,7 @@ * {@code Node}. * * @return a {@code Spliterator} describing the elements contained in this - * {@code Node}. + * {@code Node} */ Spliterator<T> spliterator(); @@ -92,24 +91,23 @@ /** * Retrieves the child {@code Node} at a given index. * - * @implSpec The default implementation throws + * @implSpec The default implementation always throws * {@code IndexOutOfBoundsException} * @param i the index to the child node * @return the child node * @throws IndexOutOfBoundsException if the index is less than 0 or greater - * than or equal to the - * number of child nodes. + * than or equal to the number of child nodes. */ default Node<T> getChild(int i) { throw new IndexOutOfBoundsException(); } /** - * Provide an array view of the contents of this node. + * Provides an array view of the contents of this node. * * <p>Depending on the underlying implementation, this may return a * reference to an internal array rather than a copy. Since the returned - * array may be shared, the resulting array should not be modified. The + * array may be shared, the returned array should not be modified. The * {@code generator} function may be consulted to create the array if a new * array needs to be created. * @@ -160,8 +158,8 @@ interface Builder<T> extends Sink<T> { /** - * Builds the node. Should be called after all elements have been pushed - * and signalled with an invocation of {@link Sink#end()}. + * Builds the node. Should be called after all elements have been + * pushed and signalled with an invocation of {@link Sink#end()}. * * @return the resulting {@code Node} */ @@ -280,7 +278,8 @@ * at a given offset into the array. It is the caller's responsibility * to ensure there is sufficient room in the array. * - * @param array the array into which to copy the contents of this {@code Node} + * @param array the array into which to copy the contents of this + * {@code Node} * @param offset the starting offset within the array * @throws IndexOutOfBoundsException if copying would cause access of * data outside array bounds
--- a/src/share/classes/java/util/stream/NodeUtils.java Thu Apr 04 20:18:30 2013 -0400 +++ b/src/share/classes/java/util/stream/NodeUtils.java Thu Apr 04 22:28:40 2013 -0400 @@ -305,7 +305,7 @@ private final IntFunction<U[]> generator; CollectorTask(PipelineHelper<U> helper, IntFunction<U[]> generator, Spliterator<T> spliterator) { - super(helper, spliterator, AbstractTask.suggestTargetSize(spliterator.estimateSize())); + super(helper, spliterator); this.helper = helper; this.generator = generator; } @@ -388,7 +388,7 @@ private static <T, U> void doCompute(SizedCollectorTask<T, U> task) { while (true) { Spliterator<T> leftSplit; - if (!AbstractTask.suggestSplit(task.helper, task.spliterator, task.targetSize) + if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize) || ((leftSplit = task.spliterator.trySplit()) == null)) { if (task.offset + task.length >= Streams.MAX_ARRAY_SIZE) throw new IllegalArgumentException("Stream size exceeds max array size"); @@ -494,7 +494,7 @@ private final PipelineHelper<Integer> helper; IntCollectorTask(PipelineHelper<Integer> helper, Spliterator<T> spliterator) { - super(helper, spliterator, AbstractTask.suggestTargetSize(spliterator.estimateSize())); + super(helper, spliterator); this.helper = helper; } @@ -574,7 +574,7 @@ private static <T> void doCompute(IntSizedCollectorTask<T> task) { while (true) { Spliterator<T> leftSplit; - if (!AbstractTask.suggestSplit(task.helper, task.spliterator, task.targetSize) + if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize) || ((leftSplit = task.spliterator.trySplit()) == null)) { if (task.offset + task.length >= Streams.MAX_ARRAY_SIZE) throw new IllegalArgumentException("Stream size exceeds max array size"); @@ -677,7 +677,7 @@ private final PipelineHelper<Long> helper; LongCollectorTask(PipelineHelper<Long> helper, Spliterator<T> spliterator) { - super(helper, spliterator, AbstractTask.suggestTargetSize(spliterator.estimateSize())); + super(helper, spliterator); this.helper = helper; } @@ -757,7 +757,7 @@ private static <T> void doCompute(LongSizedCollectorTask<T> task) { while (true) { Spliterator<T> leftSplit; - if (!AbstractTask.suggestSplit(task.helper, task.spliterator, task.targetSize) + if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize) || ((leftSplit = task.spliterator.trySplit()) == null)) { if (task.offset + task.length >= Streams.MAX_ARRAY_SIZE) throw new IllegalArgumentException("Stream size exceeds max array size"); @@ -859,7 +859,7 @@ private final PipelineHelper<Double> helper; DoubleCollectorTask(PipelineHelper<Double> helper, Spliterator<T> spliterator) { - super(helper, spliterator, AbstractTask.suggestTargetSize(spliterator.estimateSize())); + super(helper, spliterator); this.helper = helper; } @@ -939,7 +939,7 @@ private static <T> void doCompute(DoubleSizedCollectorTask<T> task) { while (true) { Spliterator<T> leftSplit; - if (!AbstractTask.suggestSplit(task.helper, task.spliterator, task.targetSize) + if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize) || ((leftSplit = task.spliterator.trySplit()) == null)) { if (task.offset + task.length >= Streams.MAX_ARRAY_SIZE) throw new IllegalArgumentException("Stream size exceeds max array size");
--- a/src/share/classes/java/util/stream/PipelineHelper.java Thu Apr 04 20:18:30 2013 -0400 +++ b/src/share/classes/java/util/stream/PipelineHelper.java Thu Apr 04 22:28:40 2013 -0400 @@ -28,27 +28,26 @@ import java.util.function.IntFunction; /** - * Helper class for executing - * <a href="package-summary.html#StreamPipelines">stream pipelines</a>, - * capturing all of the information about a stream pipeline (source, output - * shape, stream flags, parallelism, etc) in one place. + * Helper class for executing <a href="package-summary.html#StreamPipelines"> + * stream pipelines</a>, capturing all of the information about a stream + * pipeline (output shape, intermediate operations, stream flags, parallelism, + * etc) in one place. * * @apiNote - * A stream pipeline consists of a source, zero or more intermediate operations, - * and a terminal operation. Execution of the stream pipeline begins when the - * terminal operation is executed. A {@code PipelineHelper} describes the - * portion of a stream pipeline including its source, some or all of its - * intermediate operations, and certain information about the terminal (or - * stateful) operation which follows the last intermediate operation described - * by this {@code PipelineHelper}. The {@code PipelineHelper} is passed to the + * A {@code PipelineHelper} describes the initial segment of a stream pipeline, + * including its source, intermediate operations, and may additionally + * incorporate information about the terminal (or stateful) operation which + * follows the last intermediate operation described by this + * {@code PipelineHelper}. The {@code PipelineHelper} is passed to the * {@link TerminalOp#evaluateParallel(PipelineHelper, java.util.Spliterator)}, - * {@link TerminalOp#evaluateSequential(PipelineHelper, java.util.Spliterator)}, and - * {@link AbstractPipeline#opEvaluateParallel(PipelineHelper, java.util.Spliterator, - * java.util.function.IntFunction)}, methods, which can use the {@code PipelineHelper} - * to access the source {@code Spliterator} for the pipeline, information about the pipeline - * such as input shape, output shape, stream flags, and size, and use the helper methods - * such as {@link #wrapAndCopyInto(Sink, Spliterator)}, {@link #copyInto(Sink, Spliterator)}, - * and {@link #wrapSink(Sink)} to execute pipeline operations. + * {@link TerminalOp#evaluateSequential(PipelineHelper, java.util.Spliterator)}, + * and {@link AbstractPipeline#opEvaluateParallel(PipelineHelper, java.util.Spliterator, + * java.util.function.IntFunction)}, methods, which can use the + * {@code PipelineHelper} to access information about the pipeline such as + * input shape, output shape, stream flags, and size, and use the helper methods + * such as {@link #wrapAndCopyInto(Sink, Spliterator)}, + * {@link #copyInto(Sink, Spliterator)}, and {@link #wrapSink(Sink)} to execute + * pipeline operations. * * @param <P_OUT> Type of output elements from the pipeline * @since 1.8 @@ -56,12 +55,11 @@ abstract class PipelineHelper<P_OUT> { /** - * Gets the combined stream and operation flags for the output of the + * Gets the combined stream and operation flags for the output of the described * pipeline. This will incorporate stream flags from the stream source, all * the intermediate operations and the terminal operation. * - * @return the combined stream and operation flags for the output of the - * pipeline + * @return the combined stream and operation flags * @see StreamOpFlag */ abstract int getStreamAndOpFlags(); @@ -97,8 +95,7 @@ * </pre> * * @param sink the {@code Sink} to receive the results - * @param spliterator the spliterator describing the portion of the source - * input to process + * @param spliterator the spliterator describing the source input to process */ abstract<P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator); @@ -155,8 +152,8 @@ * @param exactSizeIfKnown if >=0 then a builder will be created that has a * fixed capacity of exactly sizeIfKnown elements; if < 0 then the * builder has variable capacity. A fixed capacity builder will fail - * if an element is added and the builder has reached capacity. - * @param generator the array generator + * if an element is added after the builder has reached capacity. + * @param generator a factory function for array instances * @return A {@code Node.Builder} compatible with the output shape of this * {@code PipelineHelper} */ @@ -167,10 +164,10 @@ * Collects all output elements resulting from applying the pipeline stages * to the source {@code Spliterator} into a {@code Node}. * - * @implSpec + * @implNote * If the pipeline has no intermediate operations and the source is backed - * by a {@code Node} then that {@code Node} will be returned or flattened - * and then returned. This reduces copying for a pipeline consisting of a + * by a {@code Node} then that {@code Node} will be returned (or flattened + * and then returned). This reduces copying for a pipeline consisting of a * stateful operation followed by a terminal operation that returns an * array, such as: * <pre>{@code @@ -182,7 +179,7 @@ * {@code Node} returned will contain no children, otherwise the * {@code Node} may represent the root in a tree that reflects the * shape of the computation tree. - * @param generator the array generator + * @param generator a factory function for array instances * @return the {@code Node} containing all output elements */ abstract<P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> spliterator,
--- a/src/share/classes/java/util/stream/Sink.java Thu Apr 04 20:18:30 2013 -0400 +++ b/src/share/classes/java/util/stream/Sink.java Thu Apr 04 22:28:40 2013 -0400 @@ -105,21 +105,21 @@ * }; * </pre> * - * <p>Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect to - * receive elements of type {@code U} as input, and pass the downstream sink to - * the constructor. Because the next stage expects to receive integers, we must - * call the {@code accept(int)} method when emitting values to the downstream. + * <p>Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect + * to receive elements of type {@code U} as input, and pass the downstream sink + * to the constructor. Because the next stage expects to receive integers, we + * must call the {@code accept(int)} method when emitting values to the downstream. * The {@code accept()} method applies the mapping function from {@code U} to * {@code int} and passes the resulting value to the downstream {@code Sink}. * * @param <T> Type of elements for value streams * @since 1.8 */ -@FunctionalInterface interface Sink<T> extends Consumer<T> { /** - * Resets the sink state to receive a fresh data set. This is used when a - * {@code Sink} is being reused by multiple calculations. + * Resets the sink state to receive a fresh data set. This must be called + * before sending any data to the sink. After calling {@link #end()}, + * you may call this method to reset the sink for another calculation. * @param size The exact size of the data to be pushed downstream, if * known or {@code Long.MAX_VALUE} if unknown or infinite. * @@ -131,8 +131,7 @@ /** * Indicates that all elements have been pushed. If the {@code Sink} is * stateful, it should send any stored state downstream at this time, and - * should clear any accumulated state (and associated resources) so that the - * sink may be reused for another computation. + * should clear any accumulated state (and associated resources). * * <p>Prior to this call, the sink must be in the active state, and after * this call it is returned to the initial state. @@ -140,8 +139,7 @@ default void end() {} /** - * Communicates to upstream sources that this {@code Sink} does not - * wish to receive any more data + * Indicates that this {@code Sink} does not wish to receive any more data. * * @implSpec The default implementation always returns false * @@ -152,7 +150,7 @@ } /** - * Accepts an int value + * Accepts an int value. * * @implSpec The default implementation throws IllegalStateException * @@ -163,7 +161,7 @@ } /** - * Accepts a long value + * Accepts a long value. * @implSpec The default implementation throws IllegalStateException * * @throws IllegalStateException If this sink does not accept long values @@ -173,7 +171,7 @@ } /** - * Accepts a double value + * Accepts a double value. * @implSpec The default implementation throws IllegalStateException * * @throws IllegalStateException If this sink does not accept double values @@ -187,7 +185,6 @@ * {@code accept(int)}, and wires {@code accept(Integer)} to bridge to * {@code accept(int)}. */ - @FunctionalInterface interface OfInt extends Sink<Integer>, IntConsumer { @Override void accept(int value); @@ -205,7 +202,6 @@ * {@code accept(long)}, and wires {@code accept(Long)} to bridge to * {@code accept(long)}. */ - @FunctionalInterface interface OfLong extends Sink<Long>, LongConsumer { @Override void accept(long value); @@ -223,7 +219,6 @@ * {@code accept(double)}, and wires {@code accept(Double)} to bridge to * {@code accept(double)}. */ - @FunctionalInterface interface OfDouble extends Sink<Double>, DoubleConsumer { @Override void accept(double value); @@ -237,8 +232,8 @@ } /** - * Abstract {@code Sink} implementation designed for creating chains of - * sinks. The {@code begin} and {@code end}, and + * Abstract {@code Sink} implementation for creating chains of + * sinks. The {@code begin}, {@code end}, and * {@code cancellationRequested} methods are wired to chain to the * downstream {@code Sink}. This implementation takes a downstream * {@code Sink} of unknown input shape and produces a {@code Sink<T>}. The @@ -270,7 +265,7 @@ /** * Abstract {@code Sink} implementation designed for creating chains of - * sinks. The {@code begin} and {@code end}, and + * sinks. The {@code begin}, {@code end}, and * {@code cancellationRequested} methods are wired to chain to the * downstream {@code Sink}. This implementation takes a downstream * {@code Sink} of unknown input shape and produces a {@code Sink.OfInt}. @@ -302,7 +297,7 @@ /** * Abstract {@code Sink} implementation designed for creating chains of - * sinks. The {@code begin} and {@code end}, and + * sinks. The {@code begin}, {@code end}, and * {@code cancellationRequested} methods are wired to chain to the * downstream {@code Sink}. This implementation takes a downstream * {@code Sink} of unknown input shape and produces a {@code Sink.OfLong}. @@ -334,7 +329,7 @@ /** * Abstract {@code Sink} implementation designed for creating chains of - * sinks. The {@code begin} and {@code end}, and + * sinks. The {@code begin}, {@code end}, and * {@code cancellationRequested} methods are wired to chain to the * downstream {@code Sink}. This implementation takes a downstream * {@code Sink} of unknown input shape and produces a {@code Sink.OfDouble}.
--- a/src/share/classes/java/util/stream/SliceOps.java Thu Apr 04 20:18:30 2013 -0400 +++ b/src/share/classes/java/util/stream/SliceOps.java Thu Apr 04 22:28:40 2013 -0400 @@ -31,7 +31,8 @@ import java.util.function.IntFunction; /** - * Factory methods for transforming a stream into a subsequence of itself. + * Factory for instances of a short-circuiting stateful intermediate operations + * that produce subsequences of their input stream. * * @since 1.8 */
--- a/src/share/classes/java/util/stream/StreamShape.java Thu Apr 04 20:18:30 2013 -0400 +++ b/src/share/classes/java/util/stream/StreamShape.java Thu Apr 04 22:28:40 2013 -0400 @@ -37,18 +37,14 @@ * @apiNote * This enum is used by implementations to determine compatibility between * streams and operations (i.e., if the output shape of a stream is compatible - * with the input shape of the next operation). It is also used to reduce the - * code bloat associated with having multiple specialized stream types for - * primitives by allowing some code to be largely shape-independent. + * with the input shape of the next operation). * * <p>Some APIs require you to specify both a generic type and a stream shape - * for input or output elements, such as {@link IntermediateOp} which has both - * generic type parameters for its input and output types, and getters for the - * input and output shape. When representing primitive streams in this way, the + * for input or output elements, such as {@link TerminalOp} which has both + * generic type parameters for its input types, and a getter for the + * input shape. When representing primitive streams in this way, the * generic type parameter should correspond to the wrapper type for that - * primitive type. Accordingly, the {@code IntermediateOp} implementing - * {@link Stream#mapToInt(ToIntFunction)} would have an output type parameter of - * {@code Integer} and an output shape of @{code INT_VALUE}. + * primitive type. * @since 1.8 */ enum StreamShape {
--- a/src/share/classes/java/util/stream/TerminalOp.java Thu Apr 04 20:18:30 2013 -0400 +++ b/src/share/classes/java/util/stream/TerminalOp.java Thu Apr 04 22:28:40 2013 -0400 @@ -52,21 +52,21 @@ default StreamShape inputShape() { return StreamShape.REFERENCE; } /** - * Gets the properties of the operation. Terminal operations may set a + * Gets the stream flags of the operation. Terminal operations may set a * limited subset of the stream flags defined in {@link StreamOpFlag}, and * these flags are combined with the previously combined stream and * intermediate operation flags for the pipeline. * * @implSpec The default implementation returns zero - * @return the properties of the operation - * @see {@link StreamOpFlag} + * @return the stream flags for this operation + * @see StreamOpFlag */ default int getOpFlags() { return 0; } /** * Performs a parallel evaluation of the operation using the specified - * {@code PipelineHelper}, which describes the stream source and upstream - * intermediate operations. + * {@code PipelineHelper}, which describes the upstream intermediate + * operations. * * @implSpec The default performs a sequential evaluation of the operation * using the specified {@code PipelineHelper} @@ -84,8 +84,8 @@ /** * Performs a sequential evaluation of the operation using the specified - * {@code PipelineHelper}, which describes the stream source and upstream - * intermediate operations. + * {@code PipelineHelper}, which describes the upstream intermediate + * operations. * * @param helper the pipeline helper * @param spliterator the source spliterator
--- a/src/share/classes/java/util/stream/TerminalSink.java Thu Apr 04 20:18:30 2013 -0400 +++ b/src/share/classes/java/util/stream/TerminalSink.java Thu Apr 04 22:28:40 2013 -0400 @@ -27,8 +27,8 @@ import java.util.function.Supplier; /** - * A Sink which accumulates state as elements are accepted, and allows a result - * to be retrieved after the computation is finished. + * A {@link Sink} which accumulates state as elements are accepted, and allows + * a result to be retrieved after the computation is finished. * * @param <T> The type of elements to be accepted * @param <R> The type of the result