changeset 7806:2ffb91b59273

Migrate Collector from having multiple boolean properties to a single Set<Characteristics>; update Collector docs
author briangoetz
date Wed, 03 Apr 2013 16:12:46 -0400
parents f7fb4e3d6a20
children 68138df9be76
files src/share/classes/java/util/stream/Collector.java src/share/classes/java/util/stream/Collectors.java src/share/classes/java/util/stream/ReferencePipeline.java
diffstat 3 files changed, 160 insertions(+), 106 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/Collector.java	Wed Apr 03 21:51:55 2013 +0200
+++ b/src/share/classes/java/util/stream/Collector.java	Wed Apr 03 16:12:46 2013 -0400
@@ -24,72 +24,87 @@
  */
 package java.util.stream;
 
+import java.util.Collections;
+import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.BinaryOperator;
 import java.util.function.Supplier;
 
 /**
- * A reduction operation that supports folding input elements into a cumulative result.  The result may be a value
- * or may be a mutable result container.  Examples of operations accumulating results into a mutable result
- * container include: accumulating input elements into a {@code Collection}; concatenating strings into
- * a {@code StringBuilder}; computing summary information about elements such as sum, min, max, or average;
- * computing "pivot table" summaries such as "maximum valued transaction by seller", etc.  Reduction operations
- * can be performed either sequentially or in parallel.
+ * A <a href="package-summary.html#Reduction">reduction operation</a> that supports folding
+ * input elements into a cumulative result.  The result may be a value or may be a mutable
+ * result container.  Examples of operations accumulating results into a mutable result
+ * container include: accumulating input elements into a {@code Collection}; concatenating
+ * strings into a {@code StringBuilder}; computing summary information about elements such as
+ * sum, min, max, or average; computing "pivot table" summaries such as "maximum valued
+ * transaction by seller", etc.  Reduction operations can be performed either sequentially or
+ * in parallel.
  *
- * <p>A {@code Collector} has three functions that perform: creation of an initial result,
- * incorporating a new data element into a result, and combining two result into one.
- * The last function -- combining two results into one -- is used during parallel operations, where we
- * collect subsets of the input in parallel, and the merge the subresults into a combined result.
- * The result may be a mutable container or a value.  If the result is mutable, the accumulation and combination functions
- * may either mutate their left argument and return that (such as adding elements to a collection),
- * or return a new result (in which case it should not mutate anything).
+ * <p>A {@code Collector} is specified by three functions that work together to manage
+ * a result or result container.  They are: creation of an initial result, incorporating
+ * a new data element into a result, and combining two result into one. The last function
+ * -- combining two results into one -- is used during parallel operations, where subsets
+ * of the input are collected in parallel, and then the subresults merged into a combined
+ * result. The result may be a mutable container or a value.  If the result is mutable,
+ * the accumulation and combination functions may either mutate their left argument and return
+ * that (such as adding elements to a collection), or return a new result, in which case it
+ * should not perform any mutation.
+ *
+ * <p>Collectors also have a set of characteristics, including {@link Characteristics#CONCURRENT}
+ * and {@link Characteristics#STRICTLY_MUTATIVE}.  These characteristics provide hints that
+ * can be used by a reduction implementation to provide better performance.
  *
- * <p>Libraries that implement reduction based on {@code Collector}, such as the {@link Stream#collect(Collector)}
- * must adhere to the following constraints:
+ * <p>Libraries that implement reduction based on {@code Collector}, such as the
+ * {@link Stream#collect(Collector)} must adhere to the following constraints:
  * <ul>
- *     <li>The first argument passed to the accumulator function, and both arguments passed to the combiner
- *     function, must be the result of of a previous invocation of {@link #resultSupplier()}, {@link #accumulator()}, or
- *     {@link #combiner()}.</li>
- *     <li>The implementation should not do anything with the result of any of the result supplier, accumulator,
- *     or combiner functions other than to pass them again to the accumulator or combiner functions,
- *     or return them to the caller of the reduction operation.</li>
- *     <li>If a result is passed to the accumulator or combiner function, and the same object is not returned
- *     from that function, it is never used again.</li>
- *     <li>Once a result is passed to the combiner function, it is never passed to the accumulator function again.</li>
- *     <li>For non-concurrent collectors, any result returned from the result supplier, accumulator, or combiner
- *     functions must be serially thread-confined.</li>
+ *     <li>The first argument passed to the accumulator function, and both arguments passed
+ *     to the combiner function, must be the result of of a previous invocation of
+ *     {@link #resultSupplier()}, {@link #accumulator()}, or {@link #combiner()}.</li>
+ *     <li>The implementation should not do anything with the result of any of the result
+ *     supplier, accumulator, or combiner functions other than to pass them again to the
+ *     accumulator or combiner functions, or return them to the caller of the reduction
+ *     operation.</li>
+ *     <li>If a result is passed to the accumulator or combiner function, and the same object
+ *     is not returned from that function, it is never used again.</li>
+ *     <li>Once a result is passed to the combiner function, it is never passed to the
+ *     accumulator function again.</li>
+ *     <li>For non-concurrent collectors, any result returned from the result supplier,
+ *     accumulator, or combiner functions must be serially thread-confined.  This enables
+ *     collection to occur in parallel without the {@code Collector} needing to implement
+ *     any additional synchronization.  The reduction implementation must manage that
+ *     the input is properly partitioned, that partitions are processed in isolation,
+ *     and combining happens only after accumulation is complete.</li>
+ *     <li>For concurrent collectors, an implementation is free to (but not required to)
+ *     implement reduction concurrently.  A concurrent collection is one where the
+ *     accumulator function is called concurrently from multiple threads, rather than
+ *     keeping the result isolated during accumulation.</li>
  * </ul>
  *
- * <p>Additionally,
- * collectors have two properties, {@code isConcurrent} and {@code isStable}.  A concurrent collector is one
- * for which it is safe to invoke the accumulator function on the same result container concurrently; a stable
- * collector is one where the accumulator function always returns the result container that was passed to it.
- * Concurrent collectors are always stable.
- *
  * @apiNote
- * <p>Performing a reduction operation with a {@code Collector} should produce a result equivalent to:
+ * <p>Performing a reduction operation with a {@code Collector} should produce a result
+ * equivalent to:
  * <pre>
  *     BiFunction<R,T,R> accumulator = collector.accumulator();
  *     R result = collector.resultSupplier().get();
- *     for (T t : inputSource)
+ *     for (T t : data)
  *         result = accumulator.apply(result, t);
  *     return result;
  * </pre>
  *
- * However, the library is free to partition the input, perform the reduction on the partitions, and then use the
- * combiner function to combine the partial results to achieve a parallel reduction.  Depending on the specific
- * reduction operation, this may perform better or worse, depending on the relative cost of the accumulator
- * and combiner functions.
+ * However, the library is free to partition the input, perform the reduction on the partitions,
+ * and then use the combiner function to combine the partial results to achieve a parallel
+ * reduction.  Depending on the specific reduction operation, this may perform better or worse,
+ * depending on the relative cost of the accumulator and combiner functions.
  *
- * <p>An example of an operation that can be easily modeled by {@code Collector} is accumulating elements into a
- * {@code TreeSet}. In this case, the @{code resultSupplier()} function is {@code new Treeset<T>()}, the
- * {@code accumulator} function is {@code (set, element) -> { set.add(element); return set; }}, and the combiner
- * function is {@code (left, right) -> { left.addAll(right); return left; }}.  (This behavior is implemented by
- * the method {@code Collectors.toCollection(TreeSet::new)}).
+ * <p>An example of an operation that can be easily modeled by {@code Collector} is accumulating
+ * elements into a {@code TreeSet}. In this case, the @{code resultSupplier()} function is
+ * {@code () -> new Treeset<T>()}, the {@code accumulator} function is
+ * {@code (set, element) -> { set.add(element); return set; }}, and the combiner function is
+ * {@code (left, right) -> { left.addAll(right); return left; }}.  (This behavior is
+ * implemented by the method {@code Collectors.toCollection(TreeSet::new)}).
  *
- * <p>The {@code Collector}
- *
- * @@@ Document concurrent behavior and interaction with ordering
+ * TODO Document concurrent behavior and interaction with ordering
+ * TODO Associativity and commutativity
  *
  * @see Stream#collect(Collector)
  * @see Stream#collectUnordered(Collector)
@@ -101,46 +116,75 @@
  */
 public interface Collector<T, R> {
     /**
-     * A function that creates and return a new result that represents "no values".  If the accumulator or combiner
-     * functions may mutate their arguments, this must be a new, empty result container.
+     * A function that creates and return a new result that represents "no values".
+     * If the accumulator or combiner functions may mutate their arguments, this must
+     * be a new, empty result container.
      *
      * @return A function which, when invoked, returns a result representing "no values"
      */
     Supplier<R> resultSupplier();
 
     /**
-     * @@@ needs update @@@
-     * @@@ Interaction with stability
-     * @@@ associativity @@@
-     * A function that accepts a result container and a value and incorporates the value into the container.
-     * @return A function which, when invoked with a result container and a value, modifies the state of the result
-     * container to reflect incorporation of the new value
+     * A function that folds a new value into a cumulative result.  The result may be a
+     * mutable result container or a value.  The accumulator function may modify a mutable
+     * container and return it, or create a new result and return that, but it should not
+     * modify a provided mutable container and return a different object.
+     *
+     * <p>If the collector has the {@link Characteristics#STRICTLY_MUTATIVE} characteristic,
+     * then the accumulator function <em>must</em> always return its first argument, after
+     * possibly  mutating its state.
+     *
+     * @return A function which folds a new value into a cumulative results
      *
      * MUST either mutate and return same, or NOT mutate
      */
     BiFunction<R, T, R> accumulator();
 
     /**
-     * A function that accepts two result containers and combines their states.  It may return a new container,
-     * or may merge the state of one container into the other, and return the modified container.
-     * @return A function which, when invoked with two result containers and combines their states.
+     * A function that accepts two partial results and merges them.  It may fold state from one
+     * argument into the other and return that, or may return a new result, but if it returns
+     * a new result, should not modify the state of either of its arguments.
+     *
+     * <p>If the collector has the {@link Characteristics#STRICTLY_MUTATIVE} characteristic,
+     * then the combiner function <em>must</em> always return its first argument, after
+     * possibly mutating its state.
+     *
+     * @return A function which combines two partial results into a cumulative result
      */
     BinaryOperator<R> combiner();
 
     /**
-     * Indicates that this collector is <em>concurrent</em>, meaning that the result container can support the
-     * accumulator function being called concurrently with the same result container from multiple threads.
-     * Concurrent collectors must always be stable.
-     *
-     * @return True if this collector supports concurrent accumulation
+     * Returns a {@code Set} of {@code Collector.Characteristics}.  This set
+     * should be immutable.
+     * @return An immutable set of collector characteristics
      */
-    default boolean isConcurrent() { return false; }
+    default Set<Characteristics> characteristics() {
+        return Collections.emptySet();
+    }
 
-    /**
-     * Indicates that this collector is <em>stable</em>, meaning that the accumulator function always returns
-     * the result container passed in as its first argument.
-     *
-     * @return True if this collector is stable
-     */
-    default boolean isStable() { return false; }
+    enum Characteristics {
+        /**
+         * Indicates that this collector is <em>concurrent</em>, meaning that the result
+         * container can support the accumulator function being called concurrently with
+         * the same result container from multiple threads. Concurrent collectors must also
+         * always have the STRICTLY_MUTATIVE characteristic.
+         *
+         * <p>Because a concurrent collection cannot guarantee that the elements will be
+         * presented to the accumulator function in encounter order, a concurrent collector
+         * must represent a combining operation that is not only
+         * <a href="package-summary.html#Associativity">associative</a>, but also commutative.
+         */
+        CONCURRENT,
+        /**
+         * Indicates that the result container has no intrinsic order, such as a {@link Set}.
+         */
+        UNORDERED,
+        /**
+         * Indicates that this collector operates by strict mutation of its result container.
+         * This means that the {@link #accumulator()} and {@link #combiner()} functions will
+         * always modify the state of and return their first argument, rather than returning
+         * a different result container.
+         */
+        STRICTLY_MUTATIVE
+    }
 }
--- a/src/share/classes/java/util/stream/Collectors.java	Wed Apr 03 21:51:55 2013 +0200
+++ b/src/share/classes/java/util/stream/Collectors.java	Wed Apr 03 16:12:46 2013 -0400
@@ -30,6 +30,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.DoubleSummaryStatistics;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IntSummaryStatistics;
@@ -60,6 +61,15 @@
  */
 public final class Collectors {
 
+    private static final Set<Collector.Characteristics> CH_CONCURRENT
+            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.CONCURRENT,
+                                                     Collector.Characteristics.STRICTLY_MUTATIVE));
+    private static final Set<Collector.Characteristics> CH_STRICT
+            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.STRICTLY_MUTATIVE));
+    private static final Set<Collector.Characteristics> CH_STRICT_UNORDERED
+            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.STRICTLY_MUTATIVE,
+                                                     Collector.Characteristics.UNORDERED));
+
     private Collectors() {}
 
     /**
@@ -78,19 +88,22 @@
         private final Supplier<R> resultSupplier;
         private final BiFunction<R, T, R> accumulator;
         private final BinaryOperator<R> combiner;
-        private final boolean isConcurrent;
-        private final boolean isStable;
+        private final Set<Characteristics> characteristics;
 
         CollectorImpl(Supplier<R> resultSupplier,
                       BiFunction<R, T, R> accumulator,
                       BinaryOperator<R> combiner,
-                      boolean isConcurrent,
-                      boolean isStable) {
+                      Set<Characteristics> characteristics) {
             this.resultSupplier = resultSupplier;
             this.accumulator = accumulator;
             this.combiner = combiner;
-            this.isConcurrent = isConcurrent;
-            this.isStable = isStable;
+            this.characteristics = characteristics;
+        }
+
+        CollectorImpl(Supplier<R> resultSupplier,
+                      BiFunction<R, T, R> accumulator,
+                      BinaryOperator<R> combiner) {
+            this(resultSupplier, accumulator, combiner, Collections.emptySet());
         }
 
         @Override
@@ -109,13 +122,8 @@
         }
 
         @Override
-        public boolean isConcurrent() {
-            return isConcurrent;
-        }
-
-        @Override
-        public boolean isStable() {
-            return isStable;
+        public Set<Characteristics> characteristics() {
+            return characteristics;
         }
     }
 
@@ -135,7 +143,7 @@
         return new CollectorImpl<>(collectionFactory,
                                    (r, t) -> { r.add(t); return r; },
                                    (r1, r2) -> { r1.addAll(r2); return r1; },
-                                   false, true);
+                                   CH_STRICT);
     }
 
     /**
@@ -177,7 +185,7 @@
                     return left;
             }
         };
-        return new CollectorImpl<>(Collections::emptyList, accumulator, combiner, false, false);
+        return new CollectorImpl<>(Collections::emptyList, accumulator, combiner);
     }
 
     /**
@@ -190,9 +198,10 @@
      */
     public static<T>
     Collector<T, Set<T>> toSet() {
-        // @@@ Declare that the collector is NOT_ORDERED so the reduce op can declare NOT_ORDERED in
-        //     the terminal op flags
-        return toCollection(HashSet::new);
+        return new CollectorImpl<>((Supplier<Set<T>>) HashSet::new,
+                                   (r, t) -> { r.add(t); return r; },
+                                   (r1, r2) -> { r1.addAll(r2); return r1; },
+                                   CH_STRICT_UNORDERED);
     }
 
     /**
@@ -204,7 +213,8 @@
     public static Collector<String, StringBuilder> toStringBuilder() {
         return new CollectorImpl<>(StringBuilder::new,
                                    (r, t) -> { r.append(t); return r; },
-                                   (r1, r2) -> { r1.append(r2); return r1; }, false, true);
+                                   (r1, r2) -> { r1.append(r2); return r1; },
+                                   CH_STRICT);
     }
 
     /**
@@ -221,7 +231,7 @@
             return sj;
         };
         return new CollectorImpl<>(() -> new StringJoiner(separator), (r, t) -> { r.add(t); return r; },
-                                   merger, false, true);
+                                   merger, CH_STRICT);
     }
 
     private static<K, V, M extends Map<K,V>> BinaryOperator<M> mapMerger(BinaryOperator<V> mergeFunction) {
@@ -261,7 +271,7 @@
         BiFunction<R, U, R> downstreamAccumulator = downstream.accumulator();
         return new CollectorImpl<>(downstream.resultSupplier(),
                                    (r, t) -> downstreamAccumulator.apply(r, mapper.apply(t)),
-                                   downstream.combiner(), downstream.isConcurrent(), downstream.isStable());
+                                   downstream.combiner(), downstream.characteristics());
     }
 
     /**
@@ -286,7 +296,7 @@
      */
     public static <T> Collector<T, T>
     reducing(BinaryOperator<T> op) {
-        return new CollectorImpl<>(() -> null, (r, t) -> (r == null ? t : op.apply(r, t)), op, false, false);
+        return new CollectorImpl<>(() -> null, (r, t) -> (r == null ? t : op.apply(r, t)), op);
     }
 
     /**
@@ -321,7 +331,7 @@
                              BinaryOperator<U> op) {
         return new CollectorImpl<>(() -> null,
                                    (r, t) -> (r == null ? mapper.apply(t) : op.apply(r, mapper.apply(t))),
-                                   op, false, false);
+                                   op);
     }
 
     /**
@@ -430,7 +440,7 @@
                 m.put(key, newContainer);
             return m;
         };
-        return new CollectorImpl<>(mapFactory, accumulator, mapMerger(downstream.combiner()), false, true);
+        return new CollectorImpl<>(mapFactory, accumulator, mapMerger(downstream.combiner()), CH_STRICT);
     }
 
     public static<T, K>
@@ -457,15 +467,15 @@
         Supplier<D> downstreamSupplier = downstream.resultSupplier();
         BiFunction<D, T, D> downstreamAccumulator = downstream.accumulator();
         BinaryOperator<M> combiner = mapMerger(downstream.combiner());
-        if (downstream.isConcurrent()) {
+        if (downstream.characteristics().contains(Collector.Characteristics.CONCURRENT)) {
             BiFunction<M, T, M> accumulator = (m, t) -> {
                 K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
                 downstreamAccumulator.apply(m.computeIfAbsent(key, k -> downstreamSupplier.get()), t);
                 return m;
             };
-            return new CollectorImpl<>(mapFactory, accumulator, combiner, true, true);
+            return new CollectorImpl<>(mapFactory, accumulator, combiner, CH_CONCURRENT);
         }
-        else if (downstream.isStable()) {
+        else if (downstream.characteristics().contains(Collector.Characteristics.STRICTLY_MUTATIVE)) {
             BiFunction<M, T, M> accumulator = (m, t) -> {
                 K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
                 D resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get());
@@ -474,7 +484,7 @@
                 }
                 return m;
             };
-            return new CollectorImpl<>(mapFactory, accumulator, combiner, true, true);
+            return new CollectorImpl<>(mapFactory, accumulator, combiner, CH_CONCURRENT);
         }
         else {
             BiFunction<M, T, M> accumulator = (m, t) -> {
@@ -497,7 +507,7 @@
                     }
                 } while (true);
             };
-            return new CollectorImpl<>(mapFactory, accumulator, combiner, true, true);
+            return new CollectorImpl<>(mapFactory, accumulator, combiner, CH_CONCURRENT);
         }
     }
 
@@ -549,7 +559,7 @@
         };
         return new CollectorImpl<>(() -> new Partition<>(downstream.resultSupplier().get(),
                                                          downstream.resultSupplier().get()),
-                                   accumulator, partitionMerger(downstream.combiner()), false, true);
+                                   accumulator, partitionMerger(downstream.combiner()), CH_STRICT);
     }
 
     private static<D> BinaryOperator<Map<Boolean, D>> partitionMerger(BinaryOperator<D> op) {
@@ -607,7 +617,7 @@
                           BinaryOperator<U> mergeFunction) {
         BiFunction<M, T, M> accumulator
                 = (map, value) -> { map.merge(value, mapper.apply(value), mergeFunction); return map; };
-        return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), false, true);
+        return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_STRICT);
     }
 
     /**
@@ -655,7 +665,7 @@
         BiFunction<M, T, M> accumulator = (map, value) -> {
             map.merge(value, mapper.apply(value), mergeFunction); return map;
         };
-        return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), true, true);
+        return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_CONCURRENT);
     }
 
     /**
@@ -670,7 +680,7 @@
     Collector<T, IntSummaryStatistics> toIntSummaryStatistics(ToIntFunction<? super T> mapper) {
         return new CollectorImpl<>(IntSummaryStatistics::new,
                                    (r, t) -> { r.accept(mapper.applyAsInt(t)); return r; },
-                                   (l, r) -> { l.combine(r); return l; }, false, true);
+                                   (l, r) -> { l.combine(r); return l; }, CH_STRICT);
     }
 
     /**
@@ -685,7 +695,7 @@
     Collector<T, LongSummaryStatistics> toLongSummaryStatistics(ToLongFunction<? super T> mapper) {
         return new CollectorImpl<>(LongSummaryStatistics::new,
                                    (r, t) -> { r.accept(mapper.applyAsLong(t)); return r; },
-                                   (l, r) -> { l.combine(r); return l; }, false, true);
+                                   (l, r) -> { l.combine(r); return l; }, CH_STRICT);
     }
 
     /**
@@ -700,7 +710,7 @@
     Collector<T, DoubleSummaryStatistics> toDoubleSummaryStatistics(ToDoubleFunction<? super T> mapper) {
         return new CollectorImpl<>(DoubleSummaryStatistics::new,
                                    (r, t) -> { r.accept(mapper.applyAsDouble(t)); return r; },
-                                   (l, r) -> { l.combine(r); return l; }, false, true);
+                                   (l, r) -> { l.combine(r); return l; }, CH_STRICT);
     }
 
     private static final class Partition<T> extends AbstractMap<Boolean, T> implements Map<Boolean, T> {
--- a/src/share/classes/java/util/stream/ReferencePipeline.java	Wed Apr 03 21:51:55 2013 +0200
+++ b/src/share/classes/java/util/stream/ReferencePipeline.java	Wed Apr 03 16:12:46 2013 -0400
@@ -440,7 +440,7 @@
 
     @Override
     public final <R> R collectUnordered(Collector<? super U, R> collector) {
-        if (collector.isConcurrent()) {
+        if (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) {
             R container = collector.resultSupplier().get();
             BiFunction<R, ? super U, R> accumulator = collector.accumulator();
             forEach(u -> accumulator.apply(container, u));