changeset 7793:47076955dac1

Separate lazy and eager parallel evaluation in IntermediateOp
author briangoetz
date Mon, 01 Apr 2013 18:36:28 -0400
parents 2c96c1334dac
children 467e9e7b1fe0
files src/share/classes/java/util/stream/AbstractPipeline.java src/share/classes/java/util/stream/IntermediateOp.java src/share/classes/java/util/stream/PipelineHelper.java test-ng/bootlib/java/util/stream/CollectorOps.java
diffstat 4 files changed, 72 insertions(+), 26 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractPipeline.java	Mon Apr 01 18:31:31 2013 -0400
+++ b/src/share/classes/java/util/stream/AbstractPipeline.java	Mon Apr 01 18:36:28 2013 -0400
@@ -95,13 +95,13 @@
 
     /**
      * The source spliterator. Only valid for the head pipeline.
-     * Before the pipeline is consumed if non-null then {@code sourceSupplier} is be null.
+     * Before the pipeline is consumed if non-null then {@code sourceSupplier} must be null.
      * After the pipeline is consumed if non-null then is set to null.
      */
     private Spliterator<?> sourceSpliterator;
     /**
      * The source supplier. Only valid for the head pipeline.
-     * Before the pipeline is consumed if non-null then {@code sourceSpliterator} is be null.
+     * Before the pipeline is consumed if non-null then {@code sourceSpliterator} must be null.
      * After the pipeline is consumed if non-null then is set to null.
      */
     private Supplier<? extends Spliterator<?>> sourceSupplier;
@@ -187,30 +187,39 @@
      * @return the new stream.
      */
     @SuppressWarnings("unchecked")
-    public <E_NEXT, S_NEXT extends BaseStream<E_NEXT, S_NEXT>> S_NEXT pipeline(IntermediateOp<E_OUT, E_NEXT> newOp) {
+    public <E_NEXT, S_NEXT extends BaseStream<E_NEXT, S_NEXT>>
+    S_NEXT pipeline(IntermediateOp<E_OUT, E_NEXT> newOp) {
         return (S_NEXT) chain(this, newOp);
     }
 
     /** Specialized version of pipeline for stateless reference-bearing intermediate ops */
-    protected<V> Stream<V> chainedToRef(int opFlags, StreamShape inputShape, SinkWrapper<E_OUT> sinkWrapper) {
+    protected<V> Stream<V> chainedToRef(int opFlags,
+                                        StreamShape inputShape,
+                                        SinkWrapper<E_OUT> sinkWrapper) {
         return new ReferencePipeline<>(this, new StatelessOp<>(opFlags, inputShape, StreamShape.REFERENCE,
                                                                sinkWrapper));
     }
 
     /** Specialized version of pipeline for stateless int-bearing intermediate ops */
-    protected IntStream chainedToInt(int opFlags, StreamShape inputShape, SinkWrapper<E_OUT> sinkWrapper) {
+    protected IntStream chainedToInt(int opFlags,
+                                     StreamShape inputShape,
+                                     SinkWrapper<E_OUT> sinkWrapper) {
         return new IntPipeline<>(this, new StatelessOp<>(opFlags, inputShape, StreamShape.INT_VALUE,
                                                          sinkWrapper));
     }
 
     /** Specialized version of pipeline for stateless long-bearing intermediate ops */
-    protected LongStream chainedToLong(int opFlags, StreamShape inputShape, SinkWrapper<E_OUT> sinkWrapper) {
+    protected LongStream chainedToLong(int opFlags,
+                                       StreamShape inputShape,
+                                       SinkWrapper<E_OUT> sinkWrapper) {
         return new LongPipeline<>(this, new StatelessOp<>(opFlags, inputShape, StreamShape.LONG_VALUE,
                                                           sinkWrapper));
     }
 
     /** Specialized version of pipeline for stateless double-bearing intermediate ops */
-    protected DoubleStream chainedToDouble(int opFlags, StreamShape inputShape, SinkWrapper<E_OUT> sinkWrapper) {
+    protected DoubleStream chainedToDouble(int opFlags,
+                                           StreamShape inputShape,
+                                           SinkWrapper<E_OUT> sinkWrapper) {
         return new DoublePipeline<>(this, new StatelessOp<>(opFlags, inputShape,
                                                             StreamShape.DOUBLE_VALUE, sinkWrapper));
     }
@@ -276,9 +285,10 @@
         }
         else {
             if (sourceStage.sourceAnyParChange) {
-                for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage; p != null; u = p, p = p.nextStage) {
+                for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage;
+                     p != null;
+                     u = p, p = p.nextStage) {
                     p.combinedFlags = StreamOpFlag.combineOpFlags(p.op.getOpFlags(), u.combinedFlags);
-                    p.depth = depth++;
                 }
             }
         }
@@ -343,7 +353,8 @@
     public S sequential() {
         if (StreamOpFlag.PARALLEL.isKnown(sourceStage.combinedFlags)) {
             sourceStage.sourceAnyParChange = true;
-            sourceStage.combinedFlags = StreamOpFlag.combineOpFlags(StreamOpFlag.NOT_PARALLEL, sourceStage.combinedFlags);
+            sourceStage.combinedFlags = StreamOpFlag.combineOpFlags(StreamOpFlag.NOT_PARALLEL,
+                                                                    sourceStage.combinedFlags);
         }
         return (S) this;
     }
@@ -352,7 +363,8 @@
     public S parallel() {
         if (!StreamOpFlag.PARALLEL.isKnown(sourceStage.combinedFlags)) {
             sourceStage.sourceAnyParChange = true;
-            sourceStage.combinedFlags = StreamOpFlag.combineOpFlags(StreamOpFlag.IS_PARALLEL, sourceStage.combinedFlags);
+            sourceStage.combinedFlags = StreamOpFlag.combineOpFlags(StreamOpFlag.IS_PARALLEL,
+                                                                    sourceStage.combinedFlags);
         }
         return (S) this;
     }
@@ -375,7 +387,8 @@
      * @return the node holding elements output from the pipeline.
      */
     protected abstract<P_IN> Node<E_OUT> collect(PipelineHelper<P_IN, E_OUT> helper,
-                                                 boolean flattenTree, IntFunction<E_OUT[]> generator);
+                                                 boolean flattenTree,
+                                                 IntFunction<E_OUT[]> generator);
 
     /**
      * Flatten a node.
@@ -425,7 +438,8 @@
      *                  implementations supporting primitive nodes then this parameter may be ignored.
      * @return the node builder.
      */
-    protected abstract Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<E_OUT[]> generator);
+    protected abstract Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown,
+                                                           IntFunction<E_OUT[]> generator);
 
     /**
      * Create a new pipeline by chaining an intermediate operation to an upstream pipeline.
@@ -439,7 +453,8 @@
      * @param <V> the type of elements output from the new pipeline.
      * @return a the new pipeline.
      */
-    static <U, V> AbstractPipeline<U, V, ?> chain(AbstractPipeline<?, U, ?> upstream, IntermediateOp<U, V> op) {
+    static <U, V> AbstractPipeline<U, V, ?> chain(AbstractPipeline<?, U, ?> upstream,
+                                                  IntermediateOp<U, V> op) {
         switch (op.outputShape()) {
             case REFERENCE:    return new ReferencePipeline<>(upstream, op);
             case INT_VALUE:    return new IntPipeline(upstream, op);
@@ -506,7 +521,10 @@
         private final StreamShape inputShape, outputShape;
         private final SinkWrapper<T> sinkWrapper;
 
-        StatelessOp(int opFlags, StreamShape inputShape, StreamShape outputShape, SinkWrapper<T> wrapper) {
+        StatelessOp(int opFlags,
+                    StreamShape inputShape,
+                    StreamShape outputShape,
+                    SinkWrapper<T> wrapper) {
             this.opFlags = opFlags;
             this.inputShape = inputShape;
             this.outputShape = outputShape;
@@ -579,12 +597,11 @@
             if (isParallel()) {
                 // Find the last stateful op
                 AbstractPipeline p = AbstractPipeline.this;
-                while (p.op != null && !p.op.isStateful()) {
+                while (p.depth > 0)
                     p = p.previousStage;
-                }
                 if (p.op != null) {
                     PipelineHelperImpl helper = p.previousStage.new PipelineHelperImpl();
-                    return p.op.evaluateParallel(helper, objectArrayGenerator()).spliterator();
+                    return p.op.evaluateParallelLazy(helper);
                 }
             }
 
@@ -614,7 +631,8 @@
         }
 
         @Override
-        public Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<E_OUT[]> generator) {
+        public Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown,
+                                                   IntFunction<E_OUT[]> generator) {
             return AbstractPipeline.this.makeNodeBuilder(exactSizeIfKnown, generator);
         }
 
@@ -634,7 +652,8 @@
 
         @Override
         public Node<E_OUT> evaluateSequential(IntermediateOp<E_OUT, E_OUT> op,
-                                              Spliterator<P_IN> sourceSpliterator, IntFunction<E_OUT[]> generator) {
+                                              Spliterator<P_IN> sourceSpliterator,
+                                              IntFunction<E_OUT[]> generator) {
             long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.getOpFlags())
                                ? exactOutputSizeIfKnown(sourceSpliterator)
                                : -1;
--- a/src/share/classes/java/util/stream/IntermediateOp.java	Mon Apr 01 18:31:31 2013 -0400
+++ b/src/share/classes/java/util/stream/IntermediateOp.java	Mon Apr 01 18:36:28 2013 -0400
@@ -24,6 +24,7 @@
  */
 package java.util.stream;
 
+import java.util.Spliterator;
 import java.util.function.IntFunction;
 
 /**
@@ -152,7 +153,7 @@
 
     /**
      * Performs a parallel evaluation of the operation using the specified
-     * {@code PipelineHelper}, which describes the stream source and upstream
+     * {@code PipelineHelper} which describes the stream source and upstream
      * intermediate operations.  Only called on stateful operations.  If
      * {@link #isStateful()} returns true then implementations must override the
      * default implementation.
@@ -164,7 +165,29 @@
      * @param generator
      * @return a {@code Node} describing the result of the evaluation
      */
-    default <P_IN> Node<E_OUT> evaluateParallel(PipelineHelper<P_IN, E_OUT> helper, IntFunction<E_OUT[]> generator) {
+    default <P_IN> Node<E_OUT> evaluateParallel(PipelineHelper<P_IN, E_OUT> helper,
+                                                IntFunction<E_OUT[]> generator) {
         throw new UnsupportedOperationException("Parallel evaluation is not supported");
     }
+
+    /**
+     * Returns a {@code Spliterator} describing a parallel evaluation of the operation using
+     * the specified {@code PipelineHelper} which describes the stream source and upstream
+     * intermediate operations.  Only called on stateful operations.  It is not necessary
+     * (though acceptable) to do a full computation of the result here; it is preferable, if
+     * possible, to describe the result via a lazily evaluated spliterator.
+     *
+     * @implSpec The default implementation behaves as if:
+     * <pre>
+     *     return evaluateParallel(helper, i -> (E_OUT[]) new Object[i]).spliterator();
+     * </pre>
+     * and is suitable for implementations that cannot do better than a full synchronous
+     * evaluation.
+     *
+     * @param helper the pipeline helper
+     * @return a {@code Spliterator} describing the result of the evaluation
+     */
+    default <P_IN> Spliterator<E_OUT> evaluateParallelLazy(PipelineHelper<P_IN, E_OUT> helper) {
+        return evaluateParallel(helper, i -> (E_OUT[]) new Object[i]).spliterator();
+    }
 }
--- a/src/share/classes/java/util/stream/PipelineHelper.java	Mon Apr 01 18:31:31 2013 -0400
+++ b/src/share/classes/java/util/stream/PipelineHelper.java	Mon Apr 01 18:36:28 2013 -0400
@@ -170,7 +170,8 @@
      * @return A {@code Node.Builder} compatible with the output shape of this
      *         {@code PipelineHelper}
      */
-    Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator);
+    Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown,
+                                        IntFunction<P_OUT[]> generator);
 
     /**
      * Collects all output elements resulting from applying the pipeline stages
@@ -224,5 +225,6 @@
      * @return A {@code Node} containing the output of the stream pipeline
      */
     Node<P_OUT> evaluateSequential(IntermediateOp<P_OUT, P_OUT> op,
-                                   Spliterator<P_IN> sourceSpliterator, IntFunction<P_OUT[]> generator);
+                                   Spliterator<P_IN> sourceSpliterator,
+                                   IntFunction<P_OUT[]> generator);
 }
--- a/test-ng/bootlib/java/util/stream/CollectorOps.java	Mon Apr 01 18:31:31 2013 -0400
+++ b/test-ng/bootlib/java/util/stream/CollectorOps.java	Mon Apr 01 18:36:28 2013 -0400
@@ -69,7 +69,8 @@
         }
 
         @Override
-        public <P_IN> Node<E_IN> evaluateParallel(PipelineHelper<P_IN, E_IN> helper, IntFunction<E_IN[]> generator) {
+        public <P_IN> Node<E_IN> evaluateParallel(PipelineHelper<P_IN, E_IN> helper,
+                                                  IntFunction<E_IN[]> generator) {
             return helper.collectOutput(false, generator);
         }
     }
@@ -84,7 +85,8 @@
         }
 
         @Override
-        public <P_IN> Node<T> evaluateParallel(PipelineHelper<P_IN, T> helper, IntFunction<T[]> generator) {
+        public <P_IN> Node<T> evaluateParallel(PipelineHelper<P_IN, T> helper,
+                                               IntFunction<T[]> generator) {
             int flags = helper.getStreamAndOpFlags();
 
             Assert.assertTrue(StreamOpFlag.SIZED.isKnown(flags));