流式编程思考
各语言流式API的现状
支持流式API的语言其实不多,比较典型的代表是Java的Stream与Kotlin的Sequence(其实是我对这俩最熟。示例代码如下 Java流的示例
Stream.of(1, 2, 3, 4, 5)
.limit(4)
.map(i -> i * 2)
.filter(i -> i % 3 > 0)
.map(Object::toString)
.collect(Collectors.joining(","));
以上代码流程为
- 首先生成一个1~5的流 -> 1, 2, 3, 4, 5
- 截取前4个 -> 1, 2, 3, 4
- 每个元素映射为原来的2倍 -> 2, 4, 6, 8
- 过滤掉能被3整除的数 -> 2, 4, 8
- 每个元素转为String -> “2”, “4”, “8”
- 合并所有元素 -> “2,4,8”
Kotlin的流式API与Java类似,只是部分名称稍有不同
sequenceOf(1, 2, 3, 4, 5)
.take(4)
.map { it * 2 }
.filter { it % 3 > 0 }
.map { it.toString() }
.joinToString(",")
事实上,Java对流的实现依赖的是Spliterator,是一种特殊的Iterator,可以提供并发的额外好处。相比之下,Kotlin的实现是直接基于Iterator,要简单优雅很多。 为方便演示,后续的示例我主要还是用Java或者一些伪代码展示。
不妨换个思路,从forEach入手
基本上大多数支持了闭包的语言,都会对其集合类型list或者array提供一个for循环,更高级一点的,还有一个大家通常称之为forEach的函数式接口。该接口接受一个consumer作为入参:对于集合中的每一个元素,都进行某种特定处理。即
a.forEach(x -> println(x))
等价于
for x in a
println(x)
现在不妨假设我们有[1,2,3,4]这样一个列表,使用forEach挨个打印它们将会打印出4行,分别是1,2,3,4。如果我们想打印成2,3,4,5,或者说,每个元素先分别+1再打印,该如何操作呢? 答案很容易,只需要打印的时候转换一下就行
forEach(i -> println(i + 1));
这就够了,以上就是咱这个新式流机制的基本原理。为了更严谨的说明,这里我们需要引入一个流的定义,或者说接口
public interface Seq<T> {
void forEach(Consumer<T> consumer);
}
Seq是我对Sequence的简写,意味着序列操作。这里值得注意的是,Java里的Iterable是天然实现了这个接口的。
回到之前的例子。如果我们有一个代表[1,2,3,4]的oldSeq,现在想要得到一个代表[2,3,4,5]的新的newSeq ,根据上述的转换方式,利用Java的匿名类机制,可以很容易实现
Seq<Integer> newSeq = new Seq<Integer>() {
@Override
public void forEach(Consumer<Integer> consumer) {
oldSeq.forEach(i -> consumer.accept(i + 1));
}
};
以上代码的含义为,对于任何一个操作consumer,都是在原有的元素上+1后再操作,这个操作可以是打印,也可以是别的任何行为。进一步的,借用Java 8的lambda函数,我们可以将其更简洁的写为
Seq<Integer> newSeq = c -> oldSeq.forEach(i -> c.accept(i + 1));
至此,聪明的你可能会发现,我们基于平平无奇的forEach接口,推导实现出了第一个具有里程碑意义的函数式接口,那就是伟大的map! 于是我们有了
public interface Seq<T> {
void forEach(Consumer<T> consumer);
default <E> Seq<E> map(Function<T, E> function) {
return c -> forEach(t -> c.accept(function.apply(t)));
}
}
顺理成章,我们还可以依样画葫芦,写出filter的实现
public interface Seq<T> {
void forEach(Consumer<T> consumer);
default <E> Seq<E> map(Function<T, E> function) {
return c -> forEach(t -> c.accept(function.apply(t)));
}
default Seq<T> filter(Predicate<T> predicate) {
return c -> forEach(t -> {
if (predicate.test(t)) {
c.accept(t);
}
});
}
}
到这里,我搞出来的这个新的流式API的定义就算讲清楚了。它的后续的一切强大接口和有趣功能,都是基于这样一个简单的forEach 而衍生出来的。
public interface Seq<T> {
void forEach(Consumer<T> consumer);
}
这个API是一切的基础,是梦开始的地方。它将带领大家一步步渐入佳境,沿途把橄榄枝抛向几乎所有主流非主流语言,并贯穿整个专栏始终。
流的下标与中断,实现take/drop/takeWhile/dropWhile
回顾上一篇实现出来的map/filter接口,我们可以顺着相同的思路继续实现新的功能。当然,一切还是要从初始的forEach接口出发
public interface Seq<T> {
void forEach(Consumer<T> consumer);
}
功能实现
forEachIndexed
有的时候,我们在处理流时,需要顺带获取元素的下标。要实现这一点,只需要在forEach的同时加上一个计数器即可。而要注意的是,由于Java不允许在闭包里修改变量,所以需要将变量包在数组里,于是有
interface IndexedConsumer<T> {
void accept(int i, T t);
}
default int forEachIndexed(IndexedConsumer<T> consumer) {
int[] index = new int[]{0};
forEach(t -> consumer.accept(index[0]++, t));
return index[0];
}
take
take意思是获取流中的前n个元素,后面的全部丢弃。它等价于Java Stream里的limit和Kotlin Sequence里的take。它和map/filter最核心的区别是它通常不需要遍历完整个流,在达到数量限制后就将停止遍历,后续的所有元素全部丢弃。
Java和Kotlin都是基于Iterator实现的流,因而可以很自然的实现该功能,只需要在达到上限后让iterator.hasNext()=false即可,整个流自然结束。而对于我们这个基于forEach的机制,它可不像Iterator那样,每一个元素都清清楚楚,始终清楚了解自己迭代到哪儿了,自然也就不方便中断结束。对此,我们必须借助异常捕获机制来跳出当前运行。
通常情况下,脑子正常的程序员不会利用异常抛出来控制程序调度,这种方式不仅显得有点蠢,最大的弊端是抛出异常的开销很大。除了异常对象本身实例化的开销外,更严重的是它们会自动捕获当前的调用栈,而捕获调用栈的开销可比异常对象的开销大了太多,不管是时间上还是空间上。不过好在幸运的是我们有办法避免这种开销。
首先我们定义一个专用于流中断的异常,并将它做成全局单例。这一来不管程序怎么运行,它的开销最多只有一次。而后,在此基础上,对于条件允许的语言,可以取消掉它初始化时对调用栈的捕获。以Java为例,全局单例和取消调用栈的代码如下。
class StopException extends RuntimeException {
static final StopException INSTANCE = new StopException() {
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
};
}
为此我们再额外添加一个stop函数,方便后边使用。并加入一个带catch功能的tillStop,用以封装装原本的forEach,顺带把forEachIndexed也改造下,也加入这个封装。然后,我们就可以实现take了。
static <E> E stop() throws StopException {
throw StopException.INSTANCE;
}
default void tillStop(Consumer<T> consumer) {
try {
forEach(consumer);
} catch (StopException ignore) {}
}
default int forEachIndexed(IndexedConsumer<T> consumer) {
int[] index = new int[]{0};
tillStop(t -> consumer.accept(index[0]++, t));
return index[0];
}
default Seq<T> take(int n) {
return c -> forEachIndexed((i, t) -> {
if (i < n) {
c.accept(t);
} else {
stop();
}
});
}
大概流程就是,当判断当前元素的下标i小于n时,执行当前consumer,否则直接中止流。
drop
drop与take含义相反,丢掉前面n个元素,保留剩下的全部。它不需要中断流,因而实现更简单。
default Seq<T> drop(int n) {
return c -> forEachIndexed((i, t) -> {
if (i >= n) {
c.accept(t);
}
});
}
takeWhile
takeWhile与take的主要区别是它不限制保留的元素个数,而是只要条件满足的都保留下来,直到遇见不满足的情况,直接退出遍历。
default Seq<T> takeWhile(Predicate<T> predicate) {
return c -> tillStop(t -> {
if (predicate.test(t)) {
c.accept(t);
} else {
stop();
}
});
}
dropWhile
dropwhile与takewhile类似,丢弃所有满足条件的元素,直到遇到不满足的。哪怕后面再次遇到满足条件的元素,也不会再丢弃了。
default Seq<T> dropWhile(Predicate<T> predicate) {
return c -> {
boolean[] done = new boolean[]{false};
forEach(t -> {
if (done[0]) {
c.accept(t);
} else if (!predicate.test(t)) {
c.accept(t);
done[0] = true;
}
});
};
}
它的实现稍微复杂一点,需要一个状态done来记录是否已经找到第一个不满足条件的元素,从而关闭丢弃功能。
流的组合,实现flatMap/zip
flatMap 当一个流的每个元素都能映射为新的流时,有时候需要将它拍平,这个操作就叫做flatMap
default <E> Seq<E> flatMap(Function<T, Seq<E>> function) {
return c -> forEach(t -> function.apply(t).forEach(c));
}
zip 与Python里的zip一样,有时候需要将两个流进行合并。然而基于forEach的流实在是没法两两合并,好在对于iterable还是可以的
default <E, R> Seq<R> zip(Iterable<E> iterable, BiFunction<T, E, R> function) {
return c -> {
Iterator<E> iterator = iterable.iterator();
tillStop(t -> {
if (iterator.hasNext()) {
c.accept(function.apply(t, iterator.next()));
} else {
stop();
}
});
};
}
default <E> Seq<Pair<T, E>> zip(Iterable<E> iterable) {
return zip(iterable, Pair::new);
}
这里的Pair 就是一个普通的二元,可以很容易的自己实现,我就不赘述了。除了二元值,三元也是类似的
default <B, C> Seq<Triple<T, B, C>> zip(Iterable<B> bs, Iterable<C> cs) {
return c -> {
Iterator<B> bi = bs.iterator();
Iterator<C> ci = cs.iterator();
tillStop(t -> {
if (bi.hasNext() && ci.hasNext()) {
c.accept(new Triple<>(t, bi.next(), ci.next()));
} else {
stop();
}
});
};
}
流的收集,基于fold/reduce实现toList/toSet/toMap/join
流的收集是流的高频使用场景,它的含义是将流里的元素挨个收集到某个容器中去。在Java里的容器通常就是List, Set, Map这三种。除此之外,分组操作groupBy也是一种收集,同样经常使用,我下期会讨论。从广义上讲,流的收集是流的终端操作的其中一类,另外还有流的归一,以及不带返回值的纯粹消费,后面都会讲。 基于流本身的函数式特性,它的收集自然也可以通过一些标准的接口实现,首先要介绍的就是fold。fold是函数式语言里高频出镜的一个概念,它需要你提供一个初始值,由这个初始值和流里的元素依次作用,产生一个新的值,当最终收集完成后再返回这个值。对于一些没有for循环的纯函数式语言来说,它是极有必要的,不然你很难实现哪怕像是列表求和这种非常基础的操作。fold的接口定义如下
<E> E fold(E init, BiFunction<E, T, E> function);
其中T是流里元素的类型,E 则是目标返回值的类型,只要用户能提供出类型为E的初始值以及(E, T) -> E的函数,就能完成流的收集。所以从这个思路来看,求和其实也是一种收集,相当于把所有元素收集到一个数字里。 为了实现fold,我们首先按照上述思路,写出以下Java代码
default <E> E fold(E init, BiFunction<E, T, E> function) {
E acc = init;
forEach(t -> acc = function.apply(acc, t));
return acc;
}
可惜的是,Java里的lambda表达式不允许传入非final的可变变量, 为了让这段代码能够正常编译。我们需要将这个累加器acc塞到一个实例里,这样就能通过间接引用的方式每次修改了。这个实例我们不妨称之为Mutable
public class Mutable<T> {
public T it;
public Mutable(T it) {
this.it = it;
}
}
有了Mutable之后,我们就能对fold的初版代码进行改造, 于是有
default <E> E fold(E init, BiFunction<E, T, E> function) {
Mutable<E> acc = new Mutable<>(init);
forEach(t -> acc.it = function.apply(acc.it, t));
return acc.it;
}