OpenJDK / lambda / lambda / jdk
changeset 7806:2ffb91b59273
Migrate Collector from having multiple boolean properties to a single Set<Characteristics>; update Collector docs
author | briangoetz |
---|---|
date | Wed, 03 Apr 2013 16:12:46 -0400 |
parents | f7fb4e3d6a20 |
children | 68138df9be76 |
files | src/share/classes/java/util/stream/Collector.java src/share/classes/java/util/stream/Collectors.java src/share/classes/java/util/stream/ReferencePipeline.java |
diffstat | 3 files changed, 160 insertions(+), 106 deletions(-) [+] |
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/Collector.java Wed Apr 03 21:51:55 2013 +0200 +++ b/src/share/classes/java/util/stream/Collector.java Wed Apr 03 16:12:46 2013 -0400 @@ -24,72 +24,87 @@ */ package java.util.stream; +import java.util.Collections; +import java.util.Set; import java.util.function.BiFunction; import java.util.function.BinaryOperator; import java.util.function.Supplier; /** - * A reduction operation that supports folding input elements into a cumulative result. The result may be a value - * or may be a mutable result container. Examples of operations accumulating results into a mutable result - * container include: accumulating input elements into a {@code Collection}; concatenating strings into - * a {@code StringBuilder}; computing summary information about elements such as sum, min, max, or average; - * computing "pivot table" summaries such as "maximum valued transaction by seller", etc. Reduction operations - * can be performed either sequentially or in parallel. + * A <a href="package-summary.html#Reduction">reduction operation</a> that supports folding + * input elements into a cumulative result. The result may be a value or may be a mutable + * result container. Examples of operations accumulating results into a mutable result + * container include: accumulating input elements into a {@code Collection}; concatenating + * strings into a {@code StringBuilder}; computing summary information about elements such as + * sum, min, max, or average; computing "pivot table" summaries such as "maximum valued + * transaction by seller", etc. Reduction operations can be performed either sequentially or + * in parallel. * - * <p>A {@code Collector} has three functions that perform: creation of an initial result, - * incorporating a new data element into a result, and combining two result into one. 
- * The last function -- combining two results into one -- is used during parallel operations, where we - * collect subsets of the input in parallel, and the merge the subresults into a combined result. - * The result may be a mutable container or a value. If the result is mutable, the accumulation and combination functions - * may either mutate their left argument and return that (such as adding elements to a collection), - * or return a new result (in which case it should not mutate anything). + * <p>A {@code Collector} is specified by three functions that work together to manage + * a result or result container. They are: creation of an initial result, incorporating + * a new data element into a result, and combining two results into one. The last function + * -- combining two results into one -- is used during parallel operations, where subsets + * of the input are collected in parallel, and then the subresults merged into a combined + * result. The result may be a mutable container or a value. If the result is mutable, + * the accumulation and combination functions may either mutate their left argument and return + * that (such as adding elements to a collection), or return a new result, in which case it + * should not perform any mutation. + * + * <p>Collectors also have a set of characteristics, including {@link Characteristics#CONCURRENT} + * and {@link Characteristics#STRICTLY_MUTATIVE}. These characteristics provide hints that + * can be used by a reduction implementation to provide better performance. 
* - * <p>Libraries that implement reduction based on {@code Collector}, such as the {@link Stream#collect(Collector)} - * must adhere to the following constraints: + * <p>Libraries that implement reduction based on {@code Collector}, such as the + * {@link Stream#collect(Collector)} must adhere to the following constraints: * <ul> - * <li>The first argument passed to the accumulator function, and both arguments passed to the combiner - * function, must be the result of of a previous invocation of {@link #resultSupplier()}, {@link #accumulator()}, or - * {@link #combiner()}.</li> - * <li>The implementation should not do anything with the result of any of the result supplier, accumulator, - * or combiner functions other than to pass them again to the accumulator or combiner functions, - * or return them to the caller of the reduction operation.</li> - * <li>If a result is passed to the accumulator or combiner function, and the same object is not returned - * from that function, it is never used again.</li> - * <li>Once a result is passed to the combiner function, it is never passed to the accumulator function again.</li> - * <li>For non-concurrent collectors, any result returned from the result supplier, accumulator, or combiner - * functions must be serially thread-confined.</li> + * <li>The first argument passed to the accumulator function, and both arguments passed + * to the combiner function, must be the result of of a previous invocation of + * {@link #resultSupplier()}, {@link #accumulator()}, or {@link #combiner()}.</li> + * <li>The implementation should not do anything with the result of any of the result + * supplier, accumulator, or combiner functions other than to pass them again to the + * accumulator or combiner functions, or return them to the caller of the reduction + * operation.</li> + * <li>If a result is passed to the accumulator or combiner function, and the same object + * is not returned from that function, it is never used again.</li> + * 
<li>Once a result is passed to the combiner function, it is never passed to the + * accumulator function again.</li> + * <li>For non-concurrent collectors, any result returned from the result supplier, + * accumulator, or combiner functions must be serially thread-confined. This enables + * collection to occur in parallel without the {@code Collector} needing to implement + * any additional synchronization. The reduction implementation must manage that + * the input is properly partitioned, that partitions are processed in isolation, + * and combining happens only after accumulation is complete.</li> + * <li>For concurrent collectors, an implementation is free to (but not required to) + * implement reduction concurrently. A concurrent collection is one where the + * accumulator function is called concurrently from multiple threads, rather than + * keeping the result isolated during accumulation.</li> * </ul> * - * <p>Additionally, - * collectors have two properties, {@code isConcurrent} and {@code isStable}. A concurrent collector is one - * for which it is safe to invoke the accumulator function on the same result container concurrently; a stable - * collector is one where the accumulator function always returns the result container that was passed to it. - * Concurrent collectors are always stable. - * * @apiNote - * <p>Performing a reduction operation with a {@code Collector} should produce a result equivalent to: + * <p>Performing a reduction operation with a {@code Collector} should produce a result + * equivalent to: * <pre> * BiFunction<R,T,R> accumulator = collector.accumulator(); * R result = collector.resultSupplier().get(); - * for (T t : inputSource) + * for (T t : data) * result = accumulator.apply(result, t); * return result; * </pre> * - * However, the library is free to partition the input, perform the reduction on the partitions, and then use the - * combiner function to combine the partial results to achieve a parallel reduction. 
Depending on the specific - * reduction operation, this may perform better or worse, depending on the relative cost of the accumulator - * and combiner functions. + * However, the library is free to partition the input, perform the reduction on the partitions, + * and then use the combiner function to combine the partial results to achieve a parallel + * reduction. Depending on the specific reduction operation, this may perform better or worse, + * depending on the relative cost of the accumulator and combiner functions. * - * <p>An example of an operation that can be easily modeled by {@code Collector} is accumulating elements into a - * {@code TreeSet}. In this case, the @{code resultSupplier()} function is {@code new Treeset<T>()}, the - * {@code accumulator} function is {@code (set, element) -> { set.add(element); return set; }}, and the combiner - * function is {@code (left, right) -> { left.addAll(right); return left; }}. (This behavior is implemented by - * the method {@code Collectors.toCollection(TreeSet::new)}). + * <p>An example of an operation that can be easily modeled by {@code Collector} is accumulating + * elements into a {@code TreeSet}. In this case, the {@code resultSupplier()} function is + * {@code () -> new TreeSet<T>()}, the {@code accumulator} function is + * {@code (set, element) -> { set.add(element); return set; }}, and the combiner function is + * {@code (left, right) -> { left.addAll(right); return left; }}. (This behavior is + * implemented by the method {@code Collectors.toCollection(TreeSet::new)}). * - * <p>The {@code Collector} - * - * @@@ Document concurrent behavior and interaction with ordering + * TODO Document concurrent behavior and interaction with ordering + * TODO Associativity and commutativity * * @see Stream#collect(Collector) * @see Stream#collectUnordered(Collector) @@ -101,46 +116,75 @@ */ public interface Collector<T, R> { /** - * A function that creates and return a new result that represents "no values". 
If the accumulator or combiner - * functions may mutate their arguments, this must be a new, empty result container. + * A function that creates and returns a new result that represents "no values". + * If the accumulator or combiner functions may mutate their arguments, this must + * be a new, empty result container. * * @return A function which, when invoked, returns a result representing "no values" */ Supplier<R> resultSupplier(); /** - * @@@ needs update @@@ - * @@@ Interaction with stability - * @@@ associativity @@@ - * A function that accepts a result container and a value and incorporates the value into the container. - * @return A function which, when invoked with a result container and a value, modifies the state of the result - * container to reflect incorporation of the new value + * A function that folds a new value into a cumulative result. The result may be a + * mutable result container or a value. The accumulator function may modify a mutable + * container and return it, or create a new result and return that, but it should not + * modify a provided mutable container and return a different object. + * + * <p>If the collector has the {@link Characteristics#STRICTLY_MUTATIVE} characteristic, + * then the accumulator function <em>must</em> always return its first argument, after + * possibly mutating its state. + * + * @return A function which folds a new value into a cumulative result * * MUST either mutate and return same, or NOT mutate */ BiFunction<R, T, R> accumulator(); /** - * A function that accepts two result containers and combines their states. It may return a new container, - * or may merge the state of one container into the other, and return the modified container. - * @return A function which, when invoked with two result containers and combines their states. + * A function that accepts two partial results and merges them. 
It may fold state from one + * argument into the other and return that, or may return a new result, but if it returns + * a new result, should not modify the state of either of its arguments. + * + * <p>If the collector has the {@link Characteristics#STRICTLY_MUTATIVE} characteristic, + * then the combiner function <em>must</em> always return its first argument, after + * possibly mutating its state. + * + * @return A function which combines two partial results into a cumulative result */ BinaryOperator<R> combiner(); /** - * Indicates that this collector is <em>concurrent</em>, meaning that the result container can support the - * accumulator function being called concurrently with the same result container from multiple threads. - * Concurrent collectors must always be stable. - * - * @return True if this collector supports concurrent accumulation + * Returns a {@code Set} of {@code Collector.Characteristics}. This set + * should be immutable. + * @return An immutable set of collector characteristics */ - default boolean isConcurrent() { return false; } + default Set<Characteristics> characteristics() { + return Collections.emptySet(); + } - /** - * Indicates that this collector is <em>stable</em>, meaning that the accumulator function always returns - * the result container passed in as its first argument. - * - * @return True if this collector is stable - */ - default boolean isStable() { return false; } + enum Characteristics { + /** + * Indicates that this collector is <em>concurrent</em>, meaning that the result + * container can support the accumulator function being called concurrently with + * the same result container from multiple threads. Concurrent collectors must also + * always have the STRICTLY_MUTATIVE characteristic. 
+ * + * <p>Because a concurrent collection cannot guarantee that the elements will be + * presented to the accumulator function in encounter order, a concurrent collector + * must represent a combining operation that is not only + * <a href="package-summary.html#Associativity">associative</a>, but also commutative. + */ + CONCURRENT, + /** + * Indicates that the result container has no intrinsic order, such as a {@link Set}. + */ + UNORDERED, + /** + * Indicates that this collector operates by strict mutation of its result container. + * This means that the {@link #accumulator()} and {@link #combiner()} functions will + * always modify the state of and return their first argument, rather than returning + * a different result container. + */ + STRICTLY_MUTATIVE + } }
--- a/src/share/classes/java/util/stream/Collectors.java Wed Apr 03 21:51:55 2013 +0200 +++ b/src/share/classes/java/util/stream/Collectors.java Wed Apr 03 16:12:46 2013 -0400 @@ -30,6 +30,7 @@ import java.util.Collection; import java.util.Collections; import java.util.DoubleSummaryStatistics; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.IntSummaryStatistics; @@ -60,6 +61,15 @@ */ public final class Collectors { + private static final Set<Collector.Characteristics> CH_CONCURRENT + = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.CONCURRENT, + Collector.Characteristics.STRICTLY_MUTATIVE)); + private static final Set<Collector.Characteristics> CH_STRICT + = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.STRICTLY_MUTATIVE)); + private static final Set<Collector.Characteristics> CH_STRICT_UNORDERED + = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.STRICTLY_MUTATIVE, + Collector.Characteristics.UNORDERED)); + private Collectors() {} /** @@ -78,19 +88,22 @@ private final Supplier<R> resultSupplier; private final BiFunction<R, T, R> accumulator; private final BinaryOperator<R> combiner; - private final boolean isConcurrent; - private final boolean isStable; + private final Set<Characteristics> characteristics; CollectorImpl(Supplier<R> resultSupplier, BiFunction<R, T, R> accumulator, BinaryOperator<R> combiner, - boolean isConcurrent, - boolean isStable) { + Set<Characteristics> characteristics) { this.resultSupplier = resultSupplier; this.accumulator = accumulator; this.combiner = combiner; - this.isConcurrent = isConcurrent; - this.isStable = isStable; + this.characteristics = characteristics; + } + + CollectorImpl(Supplier<R> resultSupplier, + BiFunction<R, T, R> accumulator, + BinaryOperator<R> combiner) { + this(resultSupplier, accumulator, combiner, Collections.emptySet()); } @Override @@ -109,13 +122,8 @@ } @Override - public boolean isConcurrent() { - return 
isConcurrent; - } - - @Override - public boolean isStable() { - return isStable; + public Set<Characteristics> characteristics() { + return characteristics; } } @@ -135,7 +143,7 @@ return new CollectorImpl<>(collectionFactory, (r, t) -> { r.add(t); return r; }, (r1, r2) -> { r1.addAll(r2); return r1; }, - false, true); + CH_STRICT); } /** @@ -177,7 +185,7 @@ return left; } }; - return new CollectorImpl<>(Collections::emptyList, accumulator, combiner, false, false); + return new CollectorImpl<>(Collections::emptyList, accumulator, combiner); } /** @@ -190,9 +198,10 @@ */ public static<T> Collector<T, Set<T>> toSet() { - // @@@ Declare that the collector is NOT_ORDERED so the reduce op can declare NOT_ORDERED in - // the terminal op flags - return toCollection(HashSet::new); + return new CollectorImpl<>((Supplier<Set<T>>) HashSet::new, + (r, t) -> { r.add(t); return r; }, + (r1, r2) -> { r1.addAll(r2); return r1; }, + CH_STRICT_UNORDERED); } /** @@ -204,7 +213,8 @@ public static Collector<String, StringBuilder> toStringBuilder() { return new CollectorImpl<>(StringBuilder::new, (r, t) -> { r.append(t); return r; }, - (r1, r2) -> { r1.append(r2); return r1; }, false, true); + (r1, r2) -> { r1.append(r2); return r1; }, + CH_STRICT); } /** @@ -221,7 +231,7 @@ return sj; }; return new CollectorImpl<>(() -> new StringJoiner(separator), (r, t) -> { r.add(t); return r; }, - merger, false, true); + merger, CH_STRICT); } private static<K, V, M extends Map<K,V>> BinaryOperator<M> mapMerger(BinaryOperator<V> mergeFunction) { @@ -261,7 +271,7 @@ BiFunction<R, U, R> downstreamAccumulator = downstream.accumulator(); return new CollectorImpl<>(downstream.resultSupplier(), (r, t) -> downstreamAccumulator.apply(r, mapper.apply(t)), - downstream.combiner(), downstream.isConcurrent(), downstream.isStable()); + downstream.combiner(), downstream.characteristics()); } /** @@ -286,7 +296,7 @@ */ public static <T> Collector<T, T> reducing(BinaryOperator<T> op) { - return new 
CollectorImpl<>(() -> null, (r, t) -> (r == null ? t : op.apply(r, t)), op, false, false); + return new CollectorImpl<>(() -> null, (r, t) -> (r == null ? t : op.apply(r, t)), op); } /** @@ -321,7 +331,7 @@ BinaryOperator<U> op) { return new CollectorImpl<>(() -> null, (r, t) -> (r == null ? mapper.apply(t) : op.apply(r, mapper.apply(t))), - op, false, false); + op); } /** @@ -430,7 +440,7 @@ m.put(key, newContainer); return m; }; - return new CollectorImpl<>(mapFactory, accumulator, mapMerger(downstream.combiner()), false, true); + return new CollectorImpl<>(mapFactory, accumulator, mapMerger(downstream.combiner()), CH_STRICT); } public static<T, K> @@ -457,15 +467,15 @@ Supplier<D> downstreamSupplier = downstream.resultSupplier(); BiFunction<D, T, D> downstreamAccumulator = downstream.accumulator(); BinaryOperator<M> combiner = mapMerger(downstream.combiner()); - if (downstream.isConcurrent()) { + if (downstream.characteristics().contains(Collector.Characteristics.CONCURRENT)) { BiFunction<M, T, M> accumulator = (m, t) -> { K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key"); downstreamAccumulator.apply(m.computeIfAbsent(key, k -> downstreamSupplier.get()), t); return m; }; - return new CollectorImpl<>(mapFactory, accumulator, combiner, true, true); + return new CollectorImpl<>(mapFactory, accumulator, combiner, CH_CONCURRENT); } - else if (downstream.isStable()) { + else if (downstream.characteristics().contains(Collector.Characteristics.STRICTLY_MUTATIVE)) { BiFunction<M, T, M> accumulator = (m, t) -> { K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key"); D resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get()); @@ -474,7 +484,7 @@ } return m; }; - return new CollectorImpl<>(mapFactory, accumulator, combiner, true, true); + return new CollectorImpl<>(mapFactory, accumulator, combiner, CH_CONCURRENT); } else { BiFunction<M, T, M> accumulator = (m, t) -> { @@ 
-497,7 +507,7 @@ } } while (true); }; - return new CollectorImpl<>(mapFactory, accumulator, combiner, true, true); + return new CollectorImpl<>(mapFactory, accumulator, combiner, CH_CONCURRENT); } } @@ -549,7 +559,7 @@ }; return new CollectorImpl<>(() -> new Partition<>(downstream.resultSupplier().get(), downstream.resultSupplier().get()), - accumulator, partitionMerger(downstream.combiner()), false, true); + accumulator, partitionMerger(downstream.combiner()), CH_STRICT); } private static<D> BinaryOperator<Map<Boolean, D>> partitionMerger(BinaryOperator<D> op) { @@ -607,7 +617,7 @@ BinaryOperator<U> mergeFunction) { BiFunction<M, T, M> accumulator = (map, value) -> { map.merge(value, mapper.apply(value), mergeFunction); return map; }; - return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), false, true); + return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_STRICT); } /** @@ -655,7 +665,7 @@ BiFunction<M, T, M> accumulator = (map, value) -> { map.merge(value, mapper.apply(value), mergeFunction); return map; }; - return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), true, true); + return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_CONCURRENT); } /** @@ -670,7 +680,7 @@ Collector<T, IntSummaryStatistics> toIntSummaryStatistics(ToIntFunction<? super T> mapper) { return new CollectorImpl<>(IntSummaryStatistics::new, (r, t) -> { r.accept(mapper.applyAsInt(t)); return r; }, - (l, r) -> { l.combine(r); return l; }, false, true); + (l, r) -> { l.combine(r); return l; }, CH_STRICT); } /** @@ -685,7 +695,7 @@ Collector<T, LongSummaryStatistics> toLongSummaryStatistics(ToLongFunction<? 
super T> mapper) { return new CollectorImpl<>(LongSummaryStatistics::new, (r, t) -> { r.accept(mapper.applyAsLong(t)); return r; }, - (l, r) -> { l.combine(r); return l; }, false, true); + (l, r) -> { l.combine(r); return l; }, CH_STRICT); } /** @@ -700,7 +710,7 @@ Collector<T, DoubleSummaryStatistics> toDoubleSummaryStatistics(ToDoubleFunction<? super T> mapper) { return new CollectorImpl<>(DoubleSummaryStatistics::new, (r, t) -> { r.accept(mapper.applyAsDouble(t)); return r; }, - (l, r) -> { l.combine(r); return l; }, false, true); + (l, r) -> { l.combine(r); return l; }, CH_STRICT); } private static final class Partition<T> extends AbstractMap<Boolean, T> implements Map<Boolean, T> {
--- a/src/share/classes/java/util/stream/ReferencePipeline.java Wed Apr 03 21:51:55 2013 +0200 +++ b/src/share/classes/java/util/stream/ReferencePipeline.java Wed Apr 03 16:12:46 2013 -0400 @@ -440,7 +440,7 @@ @Override public final <R> R collectUnordered(Collector<? super U, R> collector) { - if (collector.isConcurrent()) { + if (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) { R container = collector.resultSupplier().get(); BiFunction<R, ? super U, R> accumulator = collector.accumulator(); forEach(u -> accumulator.apply(container, u));