Steam流式编程原理

2022/10/15 JDK8 Stream

Stream流式编程原理

什么是Stream

Stream它并不是一个容器,它只是对容器的功能进行了增强,添加了很多便利的操作,例如查找、过滤、分组、排序等一系列的操作。 并且有串行、并行两种执行模式,并行模式充分的利用了多核处理器的优势,使用fork/join框架进行了任务拆分,同时提高了执行速度。 简而言之,Stream就是提供了一种高效且易于使用的处理数据的方式。

Stream特点:

  • Stream自己不会存储元素。
  • Stream操作不会改变源对象。相反,他们会返回一个持有结果的新Stream。
  • Stream操作是延迟执行的。它会等到需要结果的时候才执行。也就是执行终端操作的时候。

一个Stream的操作在一个管道内,分为三个步骤:

  • 第一步是创建Stream,从集合、数组中获取一个流;
  • 第二步是中间操作链,对数据进行处理;
  • 第三步是终端操作,用来执行中间操作链,返回结果;

为什么需要流式操作

集合API是Java API中最重要的部分。基本上每一个java程序都离不开集合。尽管很重要,但是现有的集合处理在很多方面都无法满足需要。 一个原因是,许多其他的语言或者类库以声明的方式来处理特定的数据模型,比如SQL语言,你可以从表中查询,按条件过滤数据,并且以某种形式将数据分组,而不必需要了解查询是如何实现的——数据库帮你做所有的脏活。这样做的好处是你的代码很简洁。很遗憾,Java没有这种好东西,你需要用控制流程自己实现所有数据查询的底层的细节。 其次是你如何有效地处理包含大量数据的集合。理想情况下,为了加快处理过程,你会利用多核架构。但是并发程序不太好写,而且很容易出错。 Stream API很好的解决了这两个问题。它抽象出一种叫做流的东西让你以声明的方式处理数据,更重要的是,它还实现了多线程:帮你处理底层诸如线程、锁、条件变量、易变变量等等。

怎么创建Stream

常用的Stream有三种创建方式:

集合 Collection.stream()
数组 Arrays.stream
静态方法 Stream.of

由集合创建

Java8 中的 Collection 接口被扩展,提供了两个获取流的方法,这两个方法是default方法,也就是说所有实现Collection接口的接口都不需要实现就可以直接使用:

default Stream stream() : 返回一个串行流
default Stream parallelStream() : 返回一个并行流
List<Integer> integerList = new ArrayList<>();
integerList.add(1);
integerList.add(2);
Stream<Integer> stream = integerList.stream();
Stream<Integer> stream1 = integerList.parallelStream();

由数组创建

Java8 中的 Arrays 的静态方法 stream() 可以获取数组流:

int[] array = {1,2,3};
Stream<Integer> stream = Arrays.stream(array);

由静态方法Stream.of创建

可以使用静态方法 Stream.of(), 通过显示值 创建一个流。它可以接收任意数量的参数。

Stream<Integer> integerStream = Stream.of(1, 2, 3, 4, 5, 6, 7, 8);

Stream中间操作

如果Stream只有中间操作是不会执行的,当执行终端操作的时候才会执行中间操作,这种方式称为延迟加载或惰性求值。多个中间操作组成一个中间操作链,只有当执行终端操作的时候才会执行一遍中间操作链,下面看下Stream有哪些中间操作。

distinct

distinct: 对于Stream中包含的元素进行去重操作(去重逻辑依赖元素的equals方法),新生成的Stream中没有重复的元素;

List<String> list = Arrays.asList("b","b","c","a");
list.forEach(System.out::print); //bbca
list.stream().distinct().forEach(System.out::print);//bca

filter

filter: 对于Stream中包含的元素使用给定的过滤函数进行过滤操作,新生成的Stream只包含符合条件的元素;

//筛选1501班的学生
computerClub.stream().filter(e -> e.getClassNum().equals("1501")).forEach(System.out::println);
//筛选年龄大于15的学生
List<Student> collect = computerClub.stream().filter(e -> e.getAge() > 15).collect(Collectors.toList());

map

map: 对于Stream中包含的元素使用给定的转换函数进行转换操作,新生成的Stream只包含转换生成的元素。 这个方法有三个对于原始类型的变种方法,分别是:mapToInt,mapToLong和mapToDouble。这三个方法也比较好理解,比如mapToInt就是把原始Stream转换成一个新的Stream,这个新生成的Stream中的元素都是int类型。之所以会有这样三个变种方法,可以免除自动装箱/拆箱的额外消耗;

//篮球俱乐部所有成员名 + 暂时住上商标^_^,并且获取所有队员名
List<String> collect1 = basketballClub.stream()
        .map(e -> e.getName() + "^_^")
        .collect(Collectors.toList());
        collect1.forEach(System.out::println);

flatMap

flatMap:和map类似,不同的是其每个元素转换得到的是Stream对象,会把子Stream中的元素压缩到父集合中;

//获取年龄大于15的所有俱乐部成员
List<Student> collect2 = Stream.of(basketballClub, computerClub, pingpongClub)
.flatMap(e -> e.stream().filter(s -> s.getAge() > 15))
.collect(Collectors.toList());
collect2.forEach(System.out::println);

//用双层list获取所有年龄大于15的俱乐部成员
List<Student> collect3 = allClubStu.stream()
.flatMap(e -> e.stream().filter(s -> s.getAge() > 15))
.collect(Collectors.toList());
collect3.forEach(System.out::println);

peek

peek: 生成一个包含原Stream的所有元素的新Stream,同时会提供一个消费函数(Consumer实例),新Stream每个元素被消费的时候都会执行给定的消费函数;

//篮球俱乐部所有成员名 + 赞助商商标^_^,并且获取所有队员详细内容
List<Student> collect = basketballClub.stream()
.peek(e -> e.setName(e.getName() + "^_^"))
.collect(Collectors.toList());
collect.forEach(System.out::println);
//Student{idNum='2015134012', name='小c^_^', age=13, classNum='1503'}
//Student{idNum='2015134013', name='小s^_^', age=14, classNum='1503'}
//Student{idNum='2015134015', name='小d^_^', age=15, classNum='1504'}
//Student{idNum='2015134018', name='小y^_^', age=16, classNum='1505'}

limit

limit: 对一个Stream进行截断操作,获取其前N个元素,如果原Stream中包含的元素个数小于N,那就获取其所有的元素;

List<String> list = Arrays.asList("a","b","c");
//获取list中top2即截断取前两个
List<String> collect1 = list.stream().limit(2).collect(Collectors.toList());
collect1.forEach(System.out::print);//ab

skip

skip: 返回一个丢弃原Stream的前N个元素后剩下元素组成的新Stream,如果原Stream中包含的元素个数小于N,那么返回空Stream;

List<String> list = Arrays.asList("a","b","c");
//获取list中top2即截断取前两个
List<String> collect1 = list.stream().skip(2).collect(Collectors.toList());
collect1.forEach(System.out::print);//c

sorted

sorted有两种形式存在: sorted(Comparator): 指定比较规则进行排序。 sorted(): 产生一个新流,按照自然顺序排序。

List<String> list = Arrays.asList("b","c","a");
//获取list中top2即截断取前两个
List<String> collect1 = list.stream().sorted().collect(Collectors.toList());
collect1.forEach(System.out::print);//abc

Stream的终端操作

如果说Stream中间操作返回的是Stream,那么终端操作返回的就是最终转换需要返回的结果。

汇聚操作:

foreach(Consumer c) 遍历操作 collect(Collector) 将流转化为其他形式 其中Collectors具体方法有:

  • toList List 把流中元素收集到List
  • toSet Set 把流中元素收集到Set
  • toCollection Coolection 把流中元素收集到Collection中
  • groupingBy Map<K,List> 根据K属性对流进行分组
  • partitioningBy Map<boolean, List> 根据boolean值进行分组
//此处只是演示 此类需求直接用List构造器即可
List<Student> collect = computerClub.stream().collect(Collectors.toList());
Set<Student> collect1 = pingpongClub.stream().collect(Collectors.toSet());

//注意key必须是唯一的 如果不是唯一的会报错而不是像普通map那样覆盖
        Map<String, String> collect2 = pingpongClub.stream()
        .collect(Collectors.toMap(Student::getIdNum, Student::getName));

//分组 类似于数据库中的group by
        Map<String, List<Student>> collect3 = pingpongClub.stream()
        .collect(Collectors.groupingBy(Student::getClassNum));

//字符串拼接 第一个参数是分隔符 第二个参数是前缀 第三个参数是后缀
        String collect4 = pingpongClub.stream().map(Student::getName).collect(Collectors.joining(",", "【", "】")); //【小u,小i,小m,小n】

//三个俱乐部符合年龄要求的按照班级分组
        Map<String, List<Student>> collect5 = Stream.of(basketballClub, pingpongClub, computerClub)
        .flatMap(e -> e.stream().filter(s -> s.getAge() < 17))
        .collect(Collectors.groupingBy(Student::getClassNum));

//按照是否年龄>16进行分组 key为true和false
        ConcurrentMap<Boolean, List<Student>> collect6 = Stream.of(basketballClub, pingpongClub, computerClub)
        .flatMap(Collection::stream)
        .collect(Collectors.groupingByConcurrent(s -> s.getAge() > 16));


匹配操作

booelan allMatch(Predicate) 都符合 boolean anyMatch(Predicate) 任一元素符合 boolean noneMatch(Predicate) 都不符合 boolean b = basketballClub.stream().allMatch(e -> e.getAge() < 20); boolean b1 = basketballClub.stream().anyMatch(e -> e.getAge() < 20); boolean b2 = basketballClub.stream().noneMatch(e -> e.getAge() < 20);

寻找操作

findFirst——返回第一个元素 findAny——返回当前流中的任意元素

Optional<Student> first = basketballClub.stream().findFirst();
if (first.isPresent()) {
    Student student = first.get();
    System.out.println(student);
}

Optional<Student> any = basketballClub.stream().findAny();
if (any.isPresent()) {
    Student student2 = any.get();
    System.out.println(student2);
}
Optional<Student> any1 = basketballClub.stream().parallel().findAny();
System.out.println(any1);

计数和极值

  • count 返回流中元素的总个数
  • max(Comparator) 返回流中最大值
  • min(Comparator) 返回流中最小值
    long count = basketballClub.stream().count();
    Optional<Student> max = basketballClub.stream().max(Comparator.comparing(Student::getAge));
    if (max.isPresent()) {
      Student student = max.get();
    }
    Optional<Student> min = basketballClub.stream().min(Comparator.comparingInt(Student::getAge));
    if (min.isPresent()) {
      Student student = min.get();
    }
    

    Stream怎么用

    Stream没用之前我们针对集合的便利帅选等操作更多的是for-loop/while-loop,用了Stream后发现原来代码可以如此简洁,并且越发形似SQL语句。甚至可以做很多复杂的动作:

    ap<Integer, List<String>> lowCaloricDishesNameGroup = 
      dishes.parallelStream() // 开启并行处理
            .filter(d -> d.getCalories() < 400) // 按照热量值进行筛选
            .sorted(comparing(Dish::getCalories)) // 按照热量进行排序
            .collect(Collectors.groupingBy( // 将菜品名按照热量进行分组
                Dish::getCalories, 
                Collectors.mapping(Dish::getName, Collectors.toList())
            ));
    

Stream的操作分类

Stream使用一种类似SQL语句的方式,提供对集合运算的高阶抽象,可以将其处理的元素集合看做一种数据流,流在管道中传输,数据在管道节点上进行处理,比如筛选、排序、聚合等。 数据流在管道中经过中间操作(intermediate operation)处理,由终止操作(terminal operation)得到前面处理的结果。这些也在《Stream流式编程知识总结》有相应的说明。

Stream操作分为两类:

  • 中间操作:将流一层层的进行处理,并向下一层进行传递,如 filter map sorted等。

中间操作又分为有状态(stateful)及无状态(stateless)

  • 有状态:必须等上一步操作完拿到全部元素后才可操作,如sorted
  • 无状态:该操作的数据不收上一步操作的影响,如filter map

终止操作:触发数据的流动,并收集结果,如collect findFirst forEach等。终止操作又分为短路操作(short-circuiting)及非短路操作(non-short-circuiting)

  • 短路操作:会在适当的时刻终止遍历,类似于break,如anyMatch findFirst等
  • 非短路操作:会遍历所有元素,如collect max等

Stream的实现过程

我们已经学会如何使用Stream API,用起来真的很爽,但简洁的方法下面似乎隐藏着无尽的秘密,如此强大的API是如何实现的呢? 比如Pipeline是怎么执行的,每次方法调用都会导致一次迭代吗?自动并行又是怎么做到的,线程个数是多少?我们学习Stream流水线的原理,这是Stream实现的关键所在。 首先回顾一下容器执行Lambda表达式的方式,以ArrayList.forEach()方法为例,具体代码如下:

// ArrayList.forEach()
public void forEach(Consumer<? super E> action) {
    ...
    for (int i=0; modCount == expectedModCount && i < size; i++) {
        action.accept(elementData[i]);// 回调方法
    }
    ...
}

我们看到ArrayList.forEach()方法的主要逻辑就是一个for循环,在该for循环里不断调用action.accept()回调方法完成对元素的遍历。

为了更好的理解流的中间操作和终端操作,可以通过下面的两段代码来看他们的执行过程。

IntStream.range(1, 10)
   .peek(x -> System.out.print("\nA" + x))
   .limit(3)
   .peek(x -> System.out.print("B" + x))
   .forEach(x -> System.out.print("C" + x));

输出为:

A1B1C1
A2B2C2
A3B3C3

中间操作是懒惰的,也就是中间操作不会对数据做任何操作,直到遇到了最终操作。而最终操作,都是比较热情的。他们会往前回溯所有的中间操作。也就是当执行到最后的forEach操作的时候,它会回溯到它的上一步中间操作,上一步中间操作,又会回溯到上上一步的中间操作,…,直到最初的第一步。 第一次forEach执行的时候,会回溯peek 操作,然后peek会回溯更上一步的limit操作,然后limit会回溯更上一步的peek操作,顶层没有操作了,开始自上向下开始执行,输出:A1B1C1第二次forEach执行的时候,然后会回溯peek 操作,然后peek会回溯更上一步的limit操作,然后limit会回溯更上一步的peek操作,顶层没有操作了,开始自上向下开始执行,输出:A2B2C2

…当第四次forEach执行的时候,然后会回溯peek 操作,然后peek会回溯更上一步的limit操作,到limit的时候,发现limit(3)这个job已经完成,这里就相当于循环里面的break操作,跳出来终止循环。

如果不使用Stream API我们都知道上述代码该如何在一次迭代中完成,大致是如下形式:

int longest = 0;
for(String str : strings){
    if(str.startsWith("A")){// 1. filter(), 保留以A开头的字符串
        int len = str.length();// 2. mapToInt(), 转换成长度
        longest = Math.max(len, longest);// 3. max(), 保留最长的长度
    }
}

采用这种方式我们不但减少了迭代次数,也避免了存储中间结果,显然这就是流水线,因为我们把三个操作放在了一次迭代当中。只要我们事先知道用户意图,总是能够采用上述方式实现跟Stream API等价的功能,但问题是Stream类库的设计者并不知道用户的意图是什么。

如何在无法假设用户行为的前提下实现流水线,是类库的设计者要考虑的问题。

Stream的实现使用流水线(pipelining)的方式巧妙的避免了多次迭代,其基本思想是在一次迭代中尽可能多的执行用户指定的操作。 Stream采用某种方式记录用户每一步的操作,中间操作会返回流对象,多个操作最终串联成一个管道,管道并不直接操作数据,当用户调用终止操作时将之前记录的操作叠加到一起,尽可能地在一次迭代中全部执行掉,面对如此简洁高效的API不由得使我们有所疑问:

  • 用户的操作如何记录?
  • 操作如何叠加?
  • 叠加后的操作如何执行?
  • 执行后的结果(如果有)在哪里?

操作如何记录

注意这里使用的是“操作(operation)”一词,指的是“Stream中间操作”的操作,很多Stream操作会需要一个回调函数(Lambda表达式),因此一个完整的操作是<数据来源,操作,回调函数>构成的三元组。 Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的PipelineHelper来代表Stage,将各Pipeline按照先后顺序连接到一起,就构成了整个流水线。

  • Head记录Stream起始操作,将包装为Spliterator的原始数据存放在Stage中
  • StatelessOp记录无状态的中间操作
  • StatefulOp记录有状态的中间操作
  • TerminalOp用于触发数据数据在各Stage间的流动及处理,并收集最终数据(如果有)

使用Collection.stream、Arrays.stream或Stream.of等接口会生成Head,其内部均采用StreamSupport.stream方法,将原始数据包装为Spliterator存放在Stage中。

Head、StatelessOp、StatefulOp三个操作实例化会指向其父类AbstractPipeline。 对于Head:

/**
 * Constructor for the head of a stream pipeline.
 *
 * @param source {@code Spliterator} describing the stream source
 * @param sourceFlags the source flags for the stream source, described in
 * {@link StreamOpFlag}
 * @param parallel {@code true} if the pipeline is parallel
 */
AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
    this.previousStage = null;
    this.sourceSpliterator = source;
    this.sourceStage = this;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}

Head操作会将包装为Spliterator的原始数据存放在该Stage中,将自身存放sourceStage中,并把串并行操作也记录在内。Head的前期功能就是记录这些源数据。 对于StatelessOp及StatefulOp:

/**
 * Constructor for appending an intermediate operation stage onto an
 * existing pipeline.
 *
 * @param previousStage the upstream pipeline stage
 * @param opFlags the operation flags for the new stage, described in
 * {@link StreamOpFlag}
 */
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;

    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}

中间操作通过previousStage及nextStage,将各Stage串联为一个双向链表,使得每一步都知道上一步与下一步的操作。

每一个中间操作Stage中的sourceStage都指向前一个Stage的soureStage,如此递归,最终指向Head。卧槽,似乎是不是明白些啥了,接着往下看吧,现在仅仅是第一阶段。

操作如何叠加

以上只是解决了操作记录的问题,要想让流水线起到应有的作用我们需要一种将所有操作叠加到一起的方案。你可能会觉得这很简单,只需要从流水线的head开始依次执行每一步的操作(包括回调函数)就行了。

这听起来似乎是可行的,但是你忽略了前面的Stage并不知道后面Stage到底执行了哪种操作,以及回调函数是哪种形式。换句话说,只有当前Stage本身才知道该如何执行自己包含的动作。这就需要有某种协议来协调相邻Stage之间的调用关系。

上一个问题解决了每一步操作数据源以及内部实现是怎么记录的,此时并没有执行,Stage只是保存了当前的操作,并不能确定下一个Stage需要何种操作,所以想要让pipeline运行起来,需要一种将所有操作叠加到一起的方案。

Stream类库采用了Sink接口来协调各Stage之间的关系:

interface Sink<T> extends Consumer<T> {
    /**
     * Resets the sink state to receive a fresh data set.  This must be called
     * before sending any data to the sink.  After calling {@link #end()},
     * you may call this method to reset the sink for another calculation.
     * @param size The exact size of the data to be pushed downstream, if
     * known or {@code -1} if unknown or infinite.
     *
     * <p>Prior to this call, the sink must be in the initial state, and after
     * this call it is in the active state.
     *
     * 开始遍历前调用,通知Sink做好准备
     */
    default void begin(long size) {}

    /**
     * Indicates that all elements have been pushed.  If the {@code Sink} is
     * stateful, it should send any stored state downstream at this time, and
     * should clear any accumulated state (and associated resources).
     *
     * <p>Prior to this call, the sink must be in the active state, and after
     * this call it is returned to the initial state.
     *
     * 所有元素遍历完成后调用,通知Sink没有更多元素了
     */
    default void end() {}
    
    /**
     * Indicates that this {@code Sink} does not wish to receive any more data.
     *
     * @implSpec The default implementation always returns false.
     *
     * @return true if cancellation is requested
     *
     * 是否可以结束操作,可以让短路操作尽早结束
     */
    default boolean cancellationRequested() {}

    /**
     * Accepts a value.
     *
     * @implSpec The default implementation throws IllegalStateException.
     *
     * @throws IllegalStateException if this sink does not accept values
     *
     * 遍历时调用,接收的一个待处理元素,并对元素进行处理。
     * Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前
     * Stage.accept方法即可
     */
    default void accept(T value) {}
}

其实Stream的各种操作实现的本质,就是如何重载Sink的这四个接口方法,各个操作通过Sink接口accept方法依次向下传递执行。

下面结合具体源码来理解Stage是如何将自身的操作包装成Sink,以及Sink是如何将处理结果转发给下一个Sink的。 无状态Stage(Stream.map):

// Stream.map 将生成一个新Stream
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        // 该方法将回调函数(处理逻辑)包装成Sink
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    // 接收数据,使用当前包装的回调函数处理数据,并传递给下游Sink
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

有状态Stage(Stream.sorted):

private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    // 存放用于排序的元素
    private ArrayList<T> list;

    RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
        super(sink, comparator);
    }

    @Override
    public void begin(long size) {
        if (size >= Nodes.MAX_ARRAY_SIZE)
            throw new IllegalArgumentException(Nodes.BAD_SIZE);
        // 创建用于存放排序元素的列表
        list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
    }

    @Override
    public void end() {
        // 只有在接收到所有元素后才开始排序
        list.sort(comparator);
        downstream.begin(list.size());
        // 排序完成后,将数据传递给下游Sink
        if (!cancellationWasRequested) {
            // 下游Sink不包含短路操作,将数据依次传递给下游Sink
            list.forEach(downstream::accept);
        }
        else {
            // 下游Sink包含短路操作
            for (T t : list) {
                // 对于每一个元素,都要询问是否可以结束处理
                if (downstream.cancellationRequested()) break;
                // 将元素传递给下游Sink
                downstream.accept(t);
            }
        }
        // 告知下游Sink数据传递完毕
        downstream.end();
        list = null;
    }

    @Override
    public void accept(T t) {
        // 依次将需要排序的元素加入到临时列表中
        list.add(t);
    }
}

Stream.sorted会在接收到所有元素之后再进行排序,之后才开始将数据依次传递给下游Sink。

两个操作之间通过Sink接口的accept方法进行挂钩,此时如果从第一个Sink开始执行accept方法便可以把整个管道流动起来,但是这个“如果”怎么实现呢?另外记着每一个操作中的opWrapSink是用于包装Sink的,也就是说只有包装后的Sink才具有条件使得整个管道流动起来。 叠加后的操作如何执行 终止操作(TerminalOp)之后不能再有别的操作,终止操作会创建一个包装了自己操作的Sink,这个Sink只处理数据而不会将数据传递到下游Sink(没有下游了)。

在调用Stream的终止操作时,会执行AbstractPipeline.evaluate:

/**
 * Evaluate the pipeline with a terminal operation to produce a result.
 *
 * @param <R> the type of result
 * @param terminalOp the terminal operation to be applied to the pipeline.
 * @return the result
 */
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp /* 各种终止操作 */) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 并发执行 */
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行执行 */
}

最终会根据并行还是串行执行TerminalOp中不同的的evaluate方法。如果是串行执行,接下来在TerminalOp的evaluate方法中会调用wrapAndCopyInto来包装、串联各层Sink,触发pipeline,并获取最终结果。

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink /* TerminalSink */, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}

其中wrapSink(包装)实现:

final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);

    // AbstractPipeline.this,最后一层Stage
    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        // 从下游向上游遍历,不断包装Sink
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink /* 下一层Stage的Sink */);
    }
    return (Sink<P_IN>) sink;
}

wrapSink方法通过下游Stage的“opWrapSink”方法不断将下游Stage的Sink从下游向上游遍历包装,最终得到上文我说的第一个Sink。 有了第一个Sink,如何执行呢,还记的wrapAndCopyInto中的copyInto吧:

final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        // 不包含短路操作
        
        // 1. begin
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        // 2. 遍历调用 sink.accept
        spliterator.forEachRemaining(wrappedSink);
        // 3. end
        wrappedSink.end();
    }
    else {
        // 包含短路操作
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    @SuppressWarnings({"rawtypes","unchecked"})
    AbstractPipeline p = AbstractPipeline.this;
    while (p.depth > 0) {
        p = p.previousStage;
    }
    // 1. begin
    wrappedSink.begin(spliterator.getExactSizeIfKnown());
    // 2. 遍历调用 sink.accept
    //    每一次遍历都询问cancellationRequested结果
    //    如果cancellationRequested为true,则中断遍历
    p.forEachWithCancel(spliterator, wrappedSink);
    // 3. end
    wrappedSink.end();
}

copyInto会根据不同的情况依次调用:

sink.bigin sink.accept(遍历调用,如果包含短路操作,则每次遍历都需要询问cancellationRequested,适时中断遍历) sink.end 执行结果在哪儿 每一种TerminalSink中均会提供一个获取最终结果的方法: TerminalOp通过调用TerminalSink中的对应方法,获取最终的数据并返回,如ReduceOp中:

@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<P_IN> spliterator) {
    return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}

Stream并行执行原理 使用Collection.parallelStream或Stream.parallel等方法可以将当前的Stream流标记为并行执行。

上文提到在调用Stream的终止操作时,会执行AbstractPipeline.evaluate方法,根据paraller标识是执行并行操作还是串行操作:

...
return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 并发执行 */
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行执行 */

如果被标记为sequential,则会调用TerminalOp.evaluateSequential,evaluateSequential的调用过程上文已经讲述的很清楚。

如果被标记为parallel,则会调用TerminalOp.evaluateParallel,对于该方法不同的TerminalOp会有不同的实现,但都使用了ForkJoin框架,将原始数据不断拆分为更小的单元,对每一个单元做上述evaluateSequential类似的动作,最后将每一个单元计算的结果依次整合,得到最终结果。

Search

    微信好友

    博士的沙漏

    Table of Contents