流式编程思考

2022/10/16 Stream

流式编程思考

各语言流式API的现状

支持流式API的语言其实不多,比较典型的代表是Java的Stream与Kotlin的Sequence(其实是我对这俩最熟。示例代码如下 Java流的示例

Stream.of(1, 2, 3, 4, 5)
    .limit(4)
    .map(i -> i * 2)
    .filter(i -> i % 3 > 0)
    .map(Object::toString)
    .collect(Collectors.joining(","));

以上代码流程为

  • 首先生成一个1~5的流 -> 1, 2, 3, 4, 5
  • 截取前4个 -> 1, 2, 3, 4
  • 每个元素映射为原来的2倍 -> 2, 4, 6, 8
  • 过滤掉能被3整除的数 -> 2, 4, 8
  • 每个元素转为String -> “2”, “4”, “8”
  • 合并所有元素 -> “2,4,8”

Kotlin的流式API与Java类似,只是部分名称稍有不同

sequenceOf(1, 2, 3, 4, 5)
    .take(4)
    .map { it * 2 }
    .filter { it % 3 > 0 }
    .map { it.toString() }
    .joinToString(",")

事实上,Java对流的实现依赖的是Spliterator,是一种特殊的Iterator,可以提供并发的额外好处。相比之下,Kotlin的实现是直接基于Iterator,要简单优雅很多。 为方便演示,后续的示例我主要还是用Java或者一些伪代码展示。

不妨换个思路,从forEach入手

基本上大多数支持了闭包的语言,都会对其集合类型list或者array提供一个for循环,更高级一点的,还有一个大家通常称之为forEach的函数式接口。该接口接受一个consumer作为入参:对于集合中的每一个元素,都进行某种特定处理。即

a.forEach(x -> println(x))

等价于

for x in a
    println(x)

现在不妨假设我们有[1,2,3,4]这样一个列表,使用forEach挨个打印它们将会打印出4行,分别是1,2,3,4。如果我们想打印成2,3,4,5,或者说,每个元素先分别+1再打印,该如何操作呢? 答案很容易,只需要打印的时候转换一下就行

forEach(i -> println(i + 1));

这就够了,以上就是咱这个新式流机制的基本原理。为了更严谨的说明,这里我们需要引入一个流的定义,或者说接口

public interface Seq<T> {
    void forEach(Consumer<T> consumer);
}

Seq是我对Sequence的简写,意味着序列操作。这里值得注意的是,Java里的Iterable是天然实现了这个接口的。

回到之前的例子。如果我们有一个代表[1,2,3,4]的oldSeq,现在想要得到一个代表[2,3,4,5]的新的newSeq ,根据上述的转换方式,利用Java的匿名类机制,可以很容易实现

Seq<Integer> newSeq = new Seq<Integer>() {
    @Override
    public void forEach(Consumer<Integer> consumer) {
        oldSeq.forEach(i -> consumer.accept(i + 1));
    }
};

以上代码的含义为,对于任何一个操作consumer,都是在原有的元素上+1后再操作,这个操作可以是打印,也可以是别的任何行为。进一步的,借用Java 8的lambda函数,我们可以将其更简洁的写为

Seq<Integer> newSeq = c -> oldSeq.forEach(i -> c.accept(i + 1));

至此,聪明的你可能会发现,我们基于平平无奇的forEach接口,推导实现出了第一个具有里程碑意义的函数式接口,那就是伟大的map! 于是我们有了

public interface Seq<T> {
    void forEach(Consumer<T> consumer);

    default <E> Seq<E> map(Function<T, E> function) {
        return c -> forEach(t -> c.accept(function.apply(t)));
    }
}

顺理成章,我们还可以依样画葫芦,写出filter的实现

public interface Seq<T> {
    void forEach(Consumer<T> consumer);

    default <E> Seq<E> map(Function<T, E> function) {
        return c -> forEach(t -> c.accept(function.apply(t)));
    }

    default Seq<T> filter(Predicate<T> predicate) {
        return c -> forEach(t -> {
            if (predicate.test(t)) {
                c.accept(t);
            }
        });
    }
}

到这里,我搞出来的这个新的流式API的定义就算讲清楚了。它的后续的一切强大接口和有趣功能,都是基于这样一个简单的forEach 而衍生出来的。

public interface Seq<T> {
    void forEach(Consumer<T> consumer);
}

这个API是一切的基础,是梦开始的地方。它将带领大家一步步渐入佳境,沿途把橄榄枝抛向几乎所有主流非主流语言,并贯穿整个专栏始终。

流的下标与中断,实现take/drop/takeWhile/dropWhile

回顾上一篇实现出来的map/filter接口,我们可以顺着相同的思路继续实现新的功能。当然,一切还是要从初始的forEach接口出发

public interface Seq<T> {
    void forEach(Consumer<T> consumer);
}

功能实现

forEachIndexed

有的时候,我们在处理流时,需要顺带获取元素的下标。要实现这一点,只需要在forEach的同时加上一个计数器即可。而要注意的是,由于Java不允许在闭包里修改变量,所以需要将变量包在数组里,于是有

interface IndexedConsumer<T> {
    void accept(int i, T t);
}

default int forEachIndexed(IndexedConsumer<T> consumer) {
    int[] index = new int[]{0};
    forEach(t -> consumer.accept(index[0]++, t));
    return index[0];
}

take

take意思是获取流中的前n个元素,后面的全部丢弃。它等价于Java Stream里的limit和Kotlin Sequence里的take。它和map/filter最核心的区别是它通常不需要遍历完整个流,在达到数量限制后就将停止遍历,后续的所有元素全部丢弃。

Java和Kotlin都是基于Iterator实现的流,因而可以很自然的实现该功能,只需要在达到上限后让iterator.hasNext()=false即可,整个流自然结束。而对于我们这个基于forEach的机制,它可不像Iterator那样,每一个元素都清清楚楚,始终清楚了解自己迭代到哪儿了,自然也就不方便中断结束。对此,我们必须借助异常捕获机制来跳出当前运行。

通常情况下,脑子正常的程序员不会利用异常抛出来控制程序调度,这种方式不仅显得有点蠢,最大的弊端是抛出异常的开销很大。除了异常对象本身实例化的开销外,更严重的是它们会自动捕获当前的调用栈,而捕获调用栈的开销可比异常对象的开销大了太多,不管是时间上还是空间上。不过好在幸运的是我们有办法避免这种开销。

首先我们定义一个专用于流中断的异常,并将它做成全局单例。这一来不管程序怎么运行,它的开销最多只有一次。而后,在此基础上,对于条件允许的语言,可以取消掉它初始化时对调用栈的捕获。以Java为例,全局单例和取消调用栈的代码如下。

class StopException extends RuntimeException {
    static final StopException INSTANCE = new StopException() {
        @Override
        public synchronized Throwable fillInStackTrace() {
            return this;
        }
    };
}

为此我们再额外添加一个stop函数,方便后边使用。并加入一个带catch功能的tillStop,用以封装装原本的forEach,顺带把forEachIndexed也改造下,也加入这个封装。然后,我们就可以实现take了。

static <E> E stop() throws StopException {
    throw StopException.INSTANCE;
}

default void tillStop(Consumer<T> consumer) {
    try {
        forEach(consumer);
    } catch (StopException ignore) {}
}

default int forEachIndexed(IndexedConsumer<T> consumer) {
    int[] index = new int[]{0};
    tillStop(t -> consumer.accept(index[0]++, t));
    return index[0];
}

default Seq<T> take(int n) {
    return c -> forEachIndexed((i, t) -> {
        if (i < n) {
            c.accept(t);
        } else {
            stop();
        }
    });
}

大概流程就是,当判断当前元素的下标i小于n时,执行当前consumer,否则直接中止流。

drop

drop与take含义相反,丢掉前面n个元素,保留剩下的全部。它不需要中断流,因而实现更简单。

default Seq<T> drop(int n) {
    return c -> forEachIndexed((i, t) -> {
        if (i >= n) {
            c.accept(t);
        }
    });
}

takeWhile

takeWhile与take的主要区别是它不限制保留的元素个数,而是只要条件满足的都保留下来,直到遇见不满足的情况,直接退出遍历。

default Seq<T> takeWhile(Predicate<T> predicate) {
    return c -> tillStop(t -> {
        if (predicate.test(t)) {
            c.accept(t);
        } else {
            stop();
        }
    });
}

dropWhile

dropwhile与takewhile类似,丢弃所有满足条件的元素,直到遇到不满足的。哪怕后面再次遇到满足条件的元素,也不会再丢弃了。

default Seq<T> dropWhile(Predicate<T> predicate) {
    return c -> {
        boolean[] done = new boolean[]{false};
        forEach(t -> {
            if (done[0]) {
                c.accept(t);
            } else if (!predicate.test(t)) {
                c.accept(t);
                done[0] = true;
            }
        });
    };
}

它的实现稍微复杂一点,需要一个状态done来记录是否已经找到第一个不满足条件的元素,从而关闭丢弃功能。

流的组合,实现flatMap/zip

flatMap 当一个流的每个元素都能映射为新的流时,有时候需要将它拍平,这个操作就叫做flatMap

default <E> Seq<E> flatMap(Function<T, Seq<E>> function) {
    return c -> forEach(t -> function.apply(t).forEach(c));
}

zip 与Python里的zip一样,有时候需要将两个流进行合并。然而基于forEach的流实在是没法两两合并,好在对于iterable还是可以的

default <E, R> Seq<R> zip(Iterable<E> iterable, BiFunction<T, E, R> function) {
    return c -> {
        Iterator<E> iterator = iterable.iterator();
        tillStop(t -> {
            if (iterator.hasNext()) {
                c.accept(function.apply(t, iterator.next()));
            } else {
                stop();
            }
        });
    };
}

default <E> Seq<Pair<T, E>> zip(Iterable<E> iterable) {
    return zip(iterable, Pair::new);
}

这里的Pair 就是一个普通的二元,可以很容易的自己实现,我就不赘述了。除了二元值,三元也是类似的

default <B, C> Seq<Triple<T, B, C>> zip(Iterable<B> bs, Iterable<C> cs) {
    return c -> {
        Iterator<B> bi = bs.iterator();
        Iterator<C> ci = cs.iterator();
        tillStop(t -> {
            if (bi.hasNext() && ci.hasNext()) {
                c.accept(new Triple<>(t, bi.next(), ci.next()));
            } else {
                stop();
            }
        });
    };
} 

流的收集,基于fold/reduce实现toList/toSet/toMap/join

流的收集是流的高频使用场景,它的含义是将流里的元素挨个收集到某个容器中去。在Java里的容器通常就是List, Set, Map这三种。除此之外,分组操作groupBy也是一种收集,同样经常使用,我下期会讨论。从广义上讲,流的收集是流的终端操作的其中一类,另外还有流的归一,以及不带返回值的纯粹消费,后面都会讲。 基于流本身的函数式特性,它的收集自然也可以通过一些标准的接口实现,首先要介绍的就是fold。fold是函数式语言里高频出镜的一个概念,它需要你提供一个初始值,由这个初始值和流里的元素依次作用,产生一个新的值,当最终收集完成后再返回这个值。对于一些没有for循环的纯函数式语言来说,它是极有必要的,不然你很难实现哪怕像是列表求和这种非常基础的操作。fold的接口定义如下

<E> E fold(E init, BiFunction<E, T, E> function);

其中T是流里元素的类型,E 则是目标返回值的类型,只要用户能提供出类型为E的初始值以及(E, T) -> E的函数,就能完成流的收集。所以从这个思路来看,求和其实也是一种收集,相当于把所有元素收集到一个数字里。 为了实现fold,我们首先按照上述思路,写出以下Java代码

default <E> E fold(E init, BiFunction<E, T, E> function) {
    E acc = init;
    forEach(t -> acc = function.apply(acc, t));
    return acc;
}

可惜的是,Java里的lambda表达式不允许传入非final的可变变量, 为了让这段代码能够正常编译。我们需要将这个累加器acc塞到一个实例里,这样就能通过间接引用的方式每次修改了。这个实例我们不妨称之为Mutable

public class Mutable<T> {
    public T it;

    public Mutable(T it) {
        this.it = it;
    }
}

有了Mutable之后,我们就能对fold的初版代码进行改造, 于是有

default <E> E fold(E init, BiFunction<E, T, E> function) {
    Mutable<E> acc = new Mutable<>(init);
    forEach(t -> acc.it = function.apply(acc.it, t));
    return acc.it;
}

Search

    微信好友

    博士的沙漏

    Table of Contents