OpenJDK / lambda / lambda / jdk
changeset 7793:47076955dac1
Separate lazy and eager parallel evaluation in IntermediateOp
author | briangoetz |
---|---|
date | Mon, 01 Apr 2013 18:36:28 -0400 |
parents | 2c96c1334dac |
children | 467e9e7b1fe0 |
files | src/share/classes/java/util/stream/AbstractPipeline.java src/share/classes/java/util/stream/IntermediateOp.java src/share/classes/java/util/stream/PipelineHelper.java test-ng/bootlib/java/util/stream/CollectorOps.java |
diffstat | 4 files changed, 72 insertions(+), 26 deletions(-) [+] |
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractPipeline.java Mon Apr 01 18:31:31 2013 -0400 +++ b/src/share/classes/java/util/stream/AbstractPipeline.java Mon Apr 01 18:36:28 2013 -0400 @@ -95,13 +95,13 @@ /** * The source spliterator. Only valid for the head pipeline. - * Before the pipeline is consumed if non-null then {@code sourceSupplier} is be null. + * Before the pipeline is consumed if non-null then {@code sourceSupplier} must be null. * After the pipeline is consumed if non-null then is set to null. */ private Spliterator<?> sourceSpliterator; /** * The source supplier. Only valid for the head pipeline. - * Before the pipeline is consumed if non-null then {@code sourceSpliterator} is be null. + * Before the pipeline is consumed if non-null then {@code sourceSpliterator} must be null. * After the pipeline is consumed if non-null then is set to null. */ private Supplier<? extends Spliterator<?>> sourceSupplier; @@ -187,30 +187,39 @@ * @return the new stream. */ @SuppressWarnings("unchecked") - public <E_NEXT, S_NEXT extends BaseStream<E_NEXT, S_NEXT>> S_NEXT pipeline(IntermediateOp<E_OUT, E_NEXT> newOp) { + public <E_NEXT, S_NEXT extends BaseStream<E_NEXT, S_NEXT>> + S_NEXT pipeline(IntermediateOp<E_OUT, E_NEXT> newOp) { return (S_NEXT) chain(this, newOp); } /** Specialized version of pipeline for stateless reference-bearing intermediate ops */ - protected<V> Stream<V> chainedToRef(int opFlags, StreamShape inputShape, SinkWrapper<E_OUT> sinkWrapper) { + protected<V> Stream<V> chainedToRef(int opFlags, + StreamShape inputShape, + SinkWrapper<E_OUT> sinkWrapper) { return new ReferencePipeline<>(this, new StatelessOp<>(opFlags, inputShape, StreamShape.REFERENCE, sinkWrapper)); } /** Specialized version of pipeline for stateless int-bearing intermediate ops */ - protected IntStream chainedToInt(int opFlags, StreamShape inputShape, SinkWrapper<E_OUT> sinkWrapper) { + protected IntStream chainedToInt(int opFlags, + StreamShape inputShape, + SinkWrapper<E_OUT> sinkWrapper) { return new IntPipeline<>(this, new StatelessOp<>(opFlags, inputShape, StreamShape.INT_VALUE, sinkWrapper)); } /** Specialized version of pipeline for stateless long-bearing intermediate ops */ - protected LongStream chainedToLong(int opFlags, StreamShape inputShape, SinkWrapper<E_OUT> sinkWrapper) { + protected LongStream chainedToLong(int opFlags, + StreamShape inputShape, + SinkWrapper<E_OUT> sinkWrapper) { return new LongPipeline<>(this, new StatelessOp<>(opFlags, inputShape, StreamShape.LONG_VALUE, sinkWrapper)); } /** Specialized version of pipeline for stateless double-bearing intermediate ops */ - protected DoubleStream chainedToDouble(int opFlags, StreamShape inputShape, SinkWrapper<E_OUT> sinkWrapper) { + protected DoubleStream chainedToDouble(int opFlags, + StreamShape inputShape, + SinkWrapper<E_OUT> sinkWrapper) { return new DoublePipeline<>(this, new StatelessOp<>(opFlags, inputShape, StreamShape.DOUBLE_VALUE, sinkWrapper)); } @@ -276,9 +285,10 @@ } else { if (sourceStage.sourceAnyParChange) { - for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage; p != null; u = p, p = p.nextStage) { + for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage; + p != null; + u = p, p = p.nextStage) { p.combinedFlags = StreamOpFlag.combineOpFlags(p.op.getOpFlags(), u.combinedFlags); - p.depth = depth++; } } } @@ -343,7 +353,8 @@ public S sequential() { if (StreamOpFlag.PARALLEL.isKnown(sourceStage.combinedFlags)) { sourceStage.sourceAnyParChange = true; - sourceStage.combinedFlags = StreamOpFlag.combineOpFlags(StreamOpFlag.NOT_PARALLEL, sourceStage.combinedFlags); + sourceStage.combinedFlags = StreamOpFlag.combineOpFlags(StreamOpFlag.NOT_PARALLEL, + sourceStage.combinedFlags); } return (S) this; } @@ -352,7 +363,8 @@ public S parallel() { if (!StreamOpFlag.PARALLEL.isKnown(sourceStage.combinedFlags)) { sourceStage.sourceAnyParChange = true; - sourceStage.combinedFlags = StreamOpFlag.combineOpFlags(StreamOpFlag.IS_PARALLEL, sourceStage.combinedFlags); + sourceStage.combinedFlags = StreamOpFlag.combineOpFlags(StreamOpFlag.IS_PARALLEL, + sourceStage.combinedFlags); } return (S) this; } @@ -375,7 +387,8 @@ * @return the node holding elements output from the pipeline. */ protected abstract<P_IN> Node<E_OUT> collect(PipelineHelper<P_IN, E_OUT> helper, - boolean flattenTree, IntFunction<E_OUT[]> generator); + boolean flattenTree, + IntFunction<E_OUT[]> generator); /** * Flatten a node. @@ -425,7 +438,8 @@ * implementations supporting primitive nodes then this parameter may be ignored. * @return the node builder. */ - protected abstract Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<E_OUT[]> generator); + protected abstract Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown, + IntFunction<E_OUT[]> generator); /** * Create a new pipeline by chaining an intermediate operation to an upstream pipeline. @@ -439,7 +453,8 @@ * @param <V> the type of elements output from the new pipeline. * @return a the new pipeline. */ - static <U, V> AbstractPipeline<U, V, ?> chain(AbstractPipeline<?, U, ?> upstream, IntermediateOp<U, V> op) { + static <U, V> AbstractPipeline<U, V, ?> chain(AbstractPipeline<?, U, ?> upstream, + IntermediateOp<U, V> op) { switch (op.outputShape()) { case REFERENCE: return new ReferencePipeline<>(upstream, op); case INT_VALUE: return new IntPipeline(upstream, op); @@ -506,7 +521,10 @@ private final StreamShape inputShape, outputShape; private final SinkWrapper<T> sinkWrapper; - StatelessOp(int opFlags, StreamShape inputShape, StreamShape outputShape, SinkWrapper<T> wrapper) { + StatelessOp(int opFlags, + StreamShape inputShape, + StreamShape outputShape, + SinkWrapper<T> wrapper) { this.opFlags = opFlags; this.inputShape = inputShape; this.outputShape = outputShape; @@ -579,12 +597,11 @@ if (isParallel()) { // Find the last stateful op AbstractPipeline p = AbstractPipeline.this; - while (p.op != null && !p.op.isStateful()) { + while (p.depth > 0) p = p.previousStage; - } if (p.op != null) { PipelineHelperImpl helper = p.previousStage.new PipelineHelperImpl(); - return p.op.evaluateParallel(helper, objectArrayGenerator()).spliterator(); + return p.op.evaluateParallelLazy(helper); } } @@ -614,7 +631,8 @@ } @Override - public Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<E_OUT[]> generator) { + public Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown, + IntFunction<E_OUT[]> generator) { return AbstractPipeline.this.makeNodeBuilder(exactSizeIfKnown, generator); } @@ -634,7 +652,8 @@ @Override public Node<E_OUT> evaluateSequential(IntermediateOp<E_OUT, E_OUT> op, - Spliterator<P_IN> sourceSpliterator, IntFunction<E_OUT[]> generator) { + Spliterator<P_IN> sourceSpliterator, + IntFunction<E_OUT[]> generator) { long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.getOpFlags()) ? exactOutputSizeIfKnown(sourceSpliterator) : -1;
--- a/src/share/classes/java/util/stream/IntermediateOp.java Mon Apr 01 18:31:31 2013 -0400 +++ b/src/share/classes/java/util/stream/IntermediateOp.java Mon Apr 01 18:36:28 2013 -0400 @@ -24,6 +24,7 @@ */ package java.util.stream; +import java.util.Spliterator; import java.util.function.IntFunction; /** @@ -152,7 +153,7 @@ /** * Performs a parallel evaluation of the operation using the specified - * {@code PipelineHelper}, which describes the stream source and upstream + * {@code PipelineHelper} which describes the stream source and upstream * intermediate operations. Only called on stateful operations. If * {@link #isStateful()} returns true then implementations must override the * default implementation. @@ -164,7 +165,29 @@ * @param generator * @return a {@code Node} describing the result of the evaluation */ - default <P_IN> Node<E_OUT> evaluateParallel(PipelineHelper<P_IN, E_OUT> helper, IntFunction<E_OUT[]> generator) { + default <P_IN> Node<E_OUT> evaluateParallel(PipelineHelper<P_IN, E_OUT> helper, + IntFunction<E_OUT[]> generator) { throw new UnsupportedOperationException("Parallel evaluation is not supported"); } + + /** + * Returns a {@code Spliterator} describing a parallel evaluation of the operation using + * the specified {@code PipelineHelper} which describes the stream source and upstream + * intermediate operations. Only called on stateful operations. It is not necessary + * (though acceptable) to do a full computation of the result here; it is preferable, if + * possible, to describe the result via a lazily evaluated spliterator. + * + * @implSpec The default implementation behaves as if: + * <pre> + * return evaluateParallel(helper, i -> (E_OUT[]) new Object[i]).spliterator(); + * </pre> + * and is suitable for implementations that cannot do better than a full synchronous + * evaluation. + * + * @param helper the pipeline helper + * @return a {@code Spliterator} describing the result of the evaluation + */ + default <P_IN> Spliterator<E_OUT> evaluateParallelLazy(PipelineHelper<P_IN, E_OUT> helper) { + return evaluateParallel(helper, i -> (E_OUT[]) new Object[i]).spliterator(); + } }
--- a/src/share/classes/java/util/stream/PipelineHelper.java Mon Apr 01 18:31:31 2013 -0400 +++ b/src/share/classes/java/util/stream/PipelineHelper.java Mon Apr 01 18:36:28 2013 -0400 @@ -170,7 +170,8 @@ * @return A {@code Node.Builder} compatible with the output shape of this * {@code PipelineHelper} */ - Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator); + Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, + IntFunction<P_OUT[]> generator); /** * Collects all output elements resulting from applying the pipeline stages @@ -224,5 +225,6 @@ * @return A {@code Node} containing the output of the stream pipeline */ Node<P_OUT> evaluateSequential(IntermediateOp<P_OUT, P_OUT> op, - Spliterator<P_IN> sourceSpliterator, IntFunction<P_OUT[]> generator); + Spliterator<P_IN> sourceSpliterator, + IntFunction<P_OUT[]> generator); }
--- a/test-ng/bootlib/java/util/stream/CollectorOps.java Mon Apr 01 18:31:31 2013 -0400 +++ b/test-ng/bootlib/java/util/stream/CollectorOps.java Mon Apr 01 18:36:28 2013 -0400 @@ -69,7 +69,8 @@ } @Override - public <P_IN> Node<E_IN> evaluateParallel(PipelineHelper<P_IN, E_IN> helper, IntFunction<E_IN[]> generator) { + public <P_IN> Node<E_IN> evaluateParallel(PipelineHelper<P_IN, E_IN> helper, + IntFunction<E_IN[]> generator) { return helper.collectOutput(false, generator); } } @@ -84,7 +85,8 @@ } @Override - public <P_IN> Node<T> evaluateParallel(PipelineHelper<P_IN, T> helper, IntFunction<T[]> generator) { + public <P_IN> Node<T> evaluateParallel(PipelineHelper<P_IN, T> helper, + IntFunction<T[]> generator) { int flags = helper.getStreamAndOpFlags(); Assert.assertTrue(StreamOpFlag.SIZED.isKnown(flags));