从集合中获取并行流


问题内容

它是纠正与Java 8,你需要执行下面的代码确实获得从平行流Collection?

private <E> void process(final Collection<E> collection) {
    Stream<E> stream = collection.parallelStream().parallel();
    //processing
}

从CollectionAPI:

默认Stream parallelStream()

返回一个可能与此流作为其源的并行Stream。此方法允许返回顺序流。

从BaseStreamAPI:

S parallel()

返回并行的等效流。可能由于流已经是并行的,或者因为基础流的状态被修改为并行而返回自身。

我需要调用一个可以使流并行化两次的函数,这并不难吗?


问题答案:

基本上,默认实现Collection.parallelStream()不会创建并行流。实现看起来像这样:

default Stream<E> parallelStream() {
    return StreamSupport.stream(spliterator(), true);
}

但这是默认方法,对于某些实现类也提供另一种实现来创建顺序流是完全有效的。例如,假设我创建了一个SequentialArrayList

class MySequentialArrayList extends ArrayList<String> {
    @Override
    public Stream<String> parallelStream() {
        return StreamSupport.stream(spliterator(), false);
    }
}

对于该类的对象,false将按预期输出以下代码:

ArrayList<String> arrayList = new MySequentialArrayList();
System.out.println(arrayList.parallelStream().isParallel());

在这种情况下,调用BaseStream#parallel()方法可确保返回的流始终是并行的。通过将parallel字段设置为,它要么已经并行,要么使其并行true:

public final S parallel() {
    sourceStage.parallel = true;
    return (S) this;
}

这是AbstractPipeline#parallel()方法的实现。

因此,将打印以下针对同一对象的代码true:

System.out.println(arrayList.parallelStream().parallel().isParallel());

但是,如果流已经是并行的,则是的,这是一个额外的方法调用,但这将确保您始终获得并行流。我还没有深入研究流的并行化,因此我无法评论哪种Collection或在什么情况下会parallelStream()为您提供顺序流。