Flink时间和窗口

2021/04/05 Flink

Flink时间和窗口

Flink是一个流式计算框架,在流处理应用中,数据是连续不断的;但有时候的业务需求,需要我们在在流的基础上做一定的聚合处理,比如两个元素计算一次结果,或者五秒钟计算一次结果…应用到实际中的场景则为过去一分钟用户点击量、过去一小时订单成交额度…等等

故此,Flink引入了窗口这个概念,窗口(Window)是Flink程序中算子之一,是处理无限的核心。窗口将流分成有限大小的“存储块”,我们可以在其上应用计算。

如同生活中所见的窗口一样,我们可以将Flink中的窗口算子理解为一种数据通道,比如我们规定,这个窗子打开后,向里边丢小球,每当窗口另一边有了十个小球,便做什么事情(比如计算所有小球重量,后者比较十个小球中最大或最小的数据…)

如果按照上方说的示例(过去一分钟用户点击量),我们则须定义一个窗口(window),用来收集最近1分钟内的数据,并对这个窗口内的数据进行计算。

时间概念类型

对于流式数据处理,最大的特点是数据上具有时间的属性特征,Flink根据时间产生的位置不同,将时间区分为三种时间概念,分别为事件生成时间(Event Time)、事件接入时间(Ingestion Time)和事件处理时间(Processing Time)。 数据从终端产生,或者从系统中产生的过程中生成的时间为事件生成时间,当数据经过消息中间件传入到Flink系统中,在DataSource中接入的时候会生成事件接入时间,当数据在Flink系统中通过各个算子实例执行转换操作的过程中,算子实例所在系统的时间为数据处理时间。Flink已经支持这三种类型时间概念,用户能够根据需要选择时间类型作为对流式数据的依据,这种情况极大地增强了对事件数据处理的灵活性和准确性。

事件时间(Event Time)

事件时间(Event Time)是每个独立事件在产生它的设备上发生的时间,这个时间通常在事件进入Flink之前就已经嵌入到事件中,时间顺序取决于事件产生的地方,和下游数据处理系统的时间无关。事件数据具有不变的事件时间属性,该时间自事件元素产生就不会改变。通常情况下可以在Flink系统中指定事件时间属性或者设定时间提取器来提取事件时间。

所有进入到Flink流式系统处理的事件,其时间都是在外部系统中产生,经过网络进入到Flink系统内处理的,在理论情况下(所有系统都具有相同系统时钟),事件时间对应的时间戳一定会早于在Flink系统中处理的时间戳,但在实际情况中往往会出现数据记录乱序、延迟到达等问题。基于EventTime的时间概念,数据处理过程依赖于数据本身产生的时间,而不是Flink系统中Operator所在主机节点的系统时间,这样能够借助于事件产生时的时间信息来还原事件的先后关系。

接入时间(Ingestion Time)

接入时间(Ingestion Time)是数据进入Flink系统的时间,Ingestion Time依赖于Source Operator所在主机的系统时钟。Ingestion Time介于Event Time和Process Time之间,相对于Process Time,Ingestion Time生成的代价相对较高,Ingestion Time具有一定的可预见性,主要因为Ingestion Time在数据接入过程生成后,时间戳就不再发生变化,和后续数据处理Operator所在机器的时钟没有关系,从而不会因为某台机器时钟不同步或网络时延而导致计算结果不准确的问题。但是需要注意的是相比于Event Time,Ingestion Time不能处理乱序事件,所以也就不用生成对应的Watermarks。

处理时间(Processing Time)

处理时间(Processing Time)是指数据在操作算子计算过程中获取到的所在主机时间。当用户选择使用Processing Time时,所有和时间相关的计算算子,例如Windows计算,在当前的任务中所有的算子将直接使用其所在主机的系统时间。Processing Time是Flink系统中最简单的一种时间概念,基于Processing Time时间概念,Flink的程序性能相对较高,延时也相对较低,对接入到系统中的数据时间相关的计算完全交给算子内部决定,时间窗口计算依赖的时间都是在具体算子运行的过程中产生,不需要做任何时间上的对比和协调。但Processing Time时间概念虽然在性能和易用性的角度上具有优势,但考虑到对数据乱序处理的情况,Processing Time就不是最优的选择。同时在分布式系统中,数据本身不乱序,但每台机器的时间如果不同步,也可能导致数据处理过程中数据乱序的问题,从而影响计算结果。总之,Processing Time概念适用于时间计算精度要求不是特别高的计算场景,例如统计某些延时非常高的日志数据等。

时间概念指定

1.12版本默认是根据EventTime,之前版本默认是采用ProcessingTime。渐渐的,摄入时间(IngestionTime)越来越不推荐使用了,实际上就生产而言,我们更多选择是根据业务采用不同的窗口分配器,选择根据事件时间还是处理时间进行计算。 如果用户选择使用Event Time或者Ingestion Time概念,则需要在创建的StreamExecutionEnvironment中调用setStream-TimeCharacteristic()方法设定系统的时间概念,如下代码使用TimeCharacteristic.EventTime作为系统的时间概念,这样对当前的StreamExecutionEnvironment会全局生效。对应的,如果使用Ingestion Time概念,则通过传入TimeCharacteristic. IngestionTime参数指定。

val env = StreamExecutionEnvironment.getExecutionEnvironment()
//在系统中指定EventTime概念
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

EventTime和Watermark(水位线)

通常情况下,由于网络或系统等外部因素影响,事件数据往往不能及时传输至Flink系统中,导致数据乱序到达或者延迟到达等问题,因此,需要有一种机制能够控制数据处理的过程和进度,比如基于事件时间的Window创建后,具体该如何确定属于该Window的数据元素已经全部到达。如果确定全部到达,就可以对Window的所有数据做窗口计算操作(如汇总、分组等),如果数据没有全部到达,则继续等待该窗口中的数据全部到达才开始处理。这种情况下就需要用到水位线(WaterMarks)机制,它能够衡量数据处理进度(表达数据到达的完整性),保证事件数据(全部)到达Flink系统,或者在乱序及延迟到达时,也能够像预期一样计算出正确并且连续的结果。Flink会将用读取进入系统的最新事件时间减去固定的时间间隔作为Watermark,该时间间隔为用户外部配置的支持最大延迟到达的时间长度,也就是说理论上认为不会有事件超过该间隔到达,否则就认为是迟到事件或异常事件。

简单来讲,当事件接入到Flink系统时,会在Sources Operator中根据当前最新事件时间产生Watermarks时间戳,记为X,进入到Flink系统中的数据事件时间,记为Y,如果Y < X,则代表Watermark X时间戳之前的所有事件均已到达,同时Window的End Time大于Watermark,则触发窗口计算结果并输出。从另一个角度讲,如果想触发对Window内的数据元素的计算,就必须保证对所有进入到窗口的数据元素满足其事件时间Y >= X,否则窗口会继续等待Watermark大于窗口结束时间的条件满足。可以看出当有了Watermarks机制后,对基于事件时间的流数据处理会变得特别灵活,可以有效地处理乱序事件的问题,保证数据在流式统计中的结果的正确性。

watermark基本概念

流处理中,事件产生的时间在实际生产环境中,由于网络原因,数据实际到达数据中心的顺序是乱序的,如果只根据eventTime进行window计算,那么就无法确定所有的数据都到达了,为了解决这个问题引入了Watermark. 就是为了尽可能数据准确,但是又不至于一直等下去,就引入了Watermark。

Watermark的特点

  • Watermark是一种衡量Event Time进展的机制。
  • Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。
  • 数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经到达了,因此,window的执行也是由Watermark触发的。
  • Watermark可以理解成一个延迟触发机制,我们可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime

t的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行。

顺序事件中的Watermarks

如果数据元素的事件时间是有序的,Watermark时间戳会随着数据元素的事件时间按顺序生成,此时水位线的变化和事件时间保持一直,也就是理想状态下的水位线。当Watermark时间大于Windows结束时间就会触发对Windows的数据计算,并创建另一个新的Windows将事件时间Y < X的数据元素分配到新的Window中。事件按照其原本的顺序进入系统中,Watermark跟随着事件时间之后生成,可以看出Watermarks其实只是对Stream简单地进行周期性地标记,并没有特别大的意义,也就是说在顺序事件的数据处理过程中,Watermarks并不能发挥太大的价值,反而会因为设定了超期时间而导致延迟输出计算结果。

乱序事件中的Watermarks

现实情况下数据元素往往并不是按照其产生顺序接入到Flink系统中进行处理,而频繁出现乱序或迟到的情况,这种情况就需要使用Watermarks来应对。事件11和事件17进入到系统中,Flink系统根据设定的延时值分别计算出Watermark W(11)和W(17),这两个Watermark到达一个Operator中后,便立即调整算子基于事件时间的虚拟时钟与当前的Watermark的值匹配,然后再触发相应的计算以及输出操作。

并行数据流中的Watermarks

Watermark在Source Operator中生成,并且在每个Source Operator的子Task中都会独立生成Watermark。在Source Operator的子任务中生成后就会更新该Task的Watermark,且会逐步更新下游算子中的Watermark水位线,随后一致保持在该并发之中,直到下一次Watermarks的生成,并对前面的Watermarks进行覆盖。W(17)水位线已经将Source算子和Map算子的子任务时钟的时间全部更新为值17,并且一直会随着事件向后移动更新下游算子中的事件时间。如果多个Watermark同时更新一个算子Task的当前事件时间,Flink会选择最小的水位线来更新,当一个Window算子Task中水位线大于了Window结束时间,就会立即触发窗口计算。

指定Timestamps与生成Watermarks

如果使用Event Time时间概念处理流式数据,除了在StreamExecationEviromment中指定TimeCharacteristic外,还需要在Flink程序中指定Event Time时间戳在数据中的字段信息,在Flink程序运行过程中会通过指定字段抽取出对应的事件时间,该过程叫作Timestamps Assigning。简单来讲,就是告诉系统需要用哪个字段作为事件时间的数据来源。另外Timestamps指定完毕后,下面就需要制定创建相应的Watermarks,需要用户定义根据Timestamps计算出Watermarks的生成策略。目前Flink支持两种方式指定Timestamps和生成Watermarks,一种方式在DataStream Source算子接口的Source Function中定义,另外一种方式是通过自定义Timestamp Assigner和Watermark Generator生成。

在Source Function中直接定义Timestamps和Watermarks

在DataStream Source算子中指定EventTime Timestamps,也就是说在数据进入到Flink系统中就直接指定分配EventTime和Watermark。用户需要复写SourceFunciton接口中run()方法实现数据生成逻辑,同时需要调用SourceContext的collectWithTimestamp()方法生成EventTime时间戳,调用emitWatermark()方法生成Watermarks。在addSource中通过匿名类实现SourceFunction接口,将本地集合数据读取到系统中,并且分别调用collectWithTimestamp和emitWatermark方法指定EventTime和生成Watermark。

//创建数组数据集
val input = List(("a", 1L, 1), ("b", 1L, 1), ("b", 3L, 1))
//添加DataSource数据源,实例化SourceFunction接口
val source: DataStream[(String, Long, Int)] = env.addSource(
  new SourceFunction[(String, Long, Int)]() {
    //复写run方法,调用SourceContext接口
    override def run(ctx: SourceContext[(String, Long, Int)]): Unit = {
      input.foreach(value => {
        //调用collectWithTimestamp增加Event Time抽取
        ctx.collectWithTimestamp(value, value._2)
        //调用emitWatermark,创建Watermark,最大延时设定为1
        ctx.emitWatermark(new Watermark(value._2 - 1))
      })
      //设定默认Watermark
      ctx.emitWatermark(new Watermark(Long.MaxValue))
    }
    override def cancel(): Unit = {}
  })

通过Flink自带的Timestamp Assigner指定Timestamp和生成Watermark

如果用户使用了Flink已经定义的外部数据源连接器,就不能再实现SourceFuncton接口来生成流式数据以及相应的Event Time和Watermark,这种情况下就需要借助Timestamp Assigner来管理数据流中的Timestamp元素和Watermark。Timestamp Assigner一般是跟在Data Source算子后面指定,也可以在后续的算子中指定,只要保证Timestamp Assigner在第一个时间相关的Operator之前即可。如果用户已经在SourceFunciton中定义Timestamps和Watermarks的生成逻辑,同时又使用了Timestamp Assigner,此时Assigner会覆盖Source Function中定义的逻辑。

Flink将Watermarks根据生成形式分为两种类型,分别是Periodic Watermarks和后者。Periodic Watermarks是根据设定时间间隔周期性地生成Watermarks,Punctuated Watermarks是根据接入数据的数量生成,例如数据流中特定数据元素的数量满足条件后触发生成Watermark。在Flink中两种生成Watermarks的逻辑分别借助于AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks接口定义。

在Flink系统中实现了两种Periodic Watermark Assigner,一种为升序模式,会将数据中的Timestamp根据指定字段提取,并用当前的Timestamp作为最新的Watermark,这种Timestamp Assigner比较适合于事件按顺序生成,没有乱序事件的情况;另外一种是通过设定固定的时间间隔来指定Watermark落后于Timestamp的区间长度,也就是最长容忍迟到多长时间内的数据到达系统。

使用Ascending Timestamp Assigner指定Timestamps和Watermarks 如下代码所示,通过调用DataStream API中的assignAscendingTimestamps来指定Timestamp字段,不需要显示地指定Watermark,因为已经在系统中默认使用Timestamp创建Watermark。

//指定系统时间概念为EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val input = env.fromCollection(List(("a", 1L, 1), ("b", 1L, 1), ("b", 3L, 1)))
//使用系统默认Ascending分配时间信息和Watermark
val withTimestampsAndWatermarks = input.assignAscendingTimestamps(t => t._3)
//对数据集进行窗口运算
val result = withTimestampsAndWatermarks.keyBy(0).timeWindow(Time.seconds(10)).sum("_2")

使用固定时延间隔的Timestamp Assigner指定Timestamps和Watermarks 如下代码所示,通过创建BoundedOutOfOrdernessTimestampExtractor实现类来定义Timestamp Assigner,其中第一个参数Time.seconds(10)代表了最长的时延为10s,第二个参数为extractTimestamp抽取逻辑。在代码中选择使用input数据集中第三个元素作为Event Timestamp,其中Watermarks的创建是根据Timestamp减去固定时间长度生成,如果当前数据中的时间大于Watermarks的时间,则会被认为是迟到事件,具体迟到事件处理策略可以参考后续章节。

val withTimestampsAndWatermarks = input.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor[(String,Long,Int)](Time.seconds(10)){
//定义抽取EventTime Timestamp逻辑
  override def extractTimestamp(t: (String, Long, Int)): Long = t._2
})

自定义Timestamp Assigner和Watermark Generator

前面使用Flink系统中已经定义好的两种Timestamp Assigner,用户也可以自定义实现AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks两个接口来分别生成Periodic Watermarks和Punctuated Watermarks。

Periodic Watermarks自定义生成

Periodic Watermarks根据固定的时间间隔,周期性地在Flink系统中分配Timestamps和生成Watermarks,在定义和实现AssignerWithPeriodicWatermarks接口之前,需要先在ExecutionConfig中调用setAutoWatermarkInterval()方法设定Watermarks产生的时间周期。

ExecutionConfig.setAutoWatermarkInterval(...)

通过创建Class实现AssignerWithPeriodicWatermarks接口,复写extractTimestamp和getCurrentWatermark两个方法,其中extractTimestamp定义了抽取TimeStamps的逻辑,getCurrentWatermark定义了生成Watermark的逻辑。其中getCurrentWatermark生成Watermark依赖于currentMaxTimestamp,getCurrentWatermark()方法每次都会被调用时,如果新产生的Watermark比现在的大,就会覆盖掉现有的Watermark,从而实现对Watermarks数据的更新。

通过实现AssignerWithPeriodicWatermarks接口自定义生成Watermark

class PeriodicAssigner extends 
      AssignerWithPeriodicWatermarks[(String,Long,Int)] {
  val maxOutOfOrderness = 1000L // 1秒时延设定,表示在1秒以内的数据延时有效,超过一秒的数据被认定为迟到事件
  var currentMaxTimestamp: Long = _
  override def extractTimestamp(event: (String,Long,Int), previousEventTimestamp: Long): Long = {
    //复写currentTimestamp方法,获取当前事件时间
    val currentTimestamp = event._2
    //对比当前的事件时间和历史最大事件时间,将最新的时间赋值给currentMaxTimestamp变量
    currentMaxTimestamp = max(currentTimestamp, currentMaxTimestamp)
    currentTimestamp
  }
    //复写getCurrentWatermark方法,生成Watermark
  override def getCurrentWatermark(): Watermark = {
    // 根据最大事件时间减去最大的乱序时延长度,然后得到Watermark
    new Watermark(currentMaxTimestamp - maxOutOfOrderness)
  }
}

Punctuated Watermarks自定义生成

除了根据时间周期生成Periodic Watermark,用户也可以根据某些特殊条件生成Punctuated Watermarks,例如判断某个数据元素的当前状态,如果接入事件中状态为0则触发生成Watermarks,如果状态不为0,则不触发生成Watermarks的逻辑。生成Punctuated Watermark的逻辑需要通过实现AssignerWithPunctuatedWatermarks接口定义,然后分别复写extractTimestamp方法和checkAndGetNextWatermark方法,完成抽取Event Time和生成Watermark逻辑的定义。 通过实现AssignerWithPunctuatedWatermarks接口自定义生成Watermark

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[(String, 
Long, Int)] {
  //复写extractTimestamp方法,定义抽取Timestamp逻辑
  override def extractTimestamp(element: (String, Long, Int), previousElementTimestamp: Long): Long = {
    element._2
  }
  //复写checkAndGetNextWatermark方法,定义Watermark生成逻辑
  override def checkAndGetNextWatermark(lastElement: (String, Long, Int), extractedTimestamp: Long): Watermark = {
    //根据元素中第三位字段状态是否为0生成Watermark
    if (lastElement._3 == 0) new Watermark(extractedTimestamp) else null
  }
}

Flink窗口

Windows窗口计算

Windows计算是流式计算中非常常用的数据计算方式之一,通过按照固定时间或长度将数据流切分成不同的窗口,然后对数据进行相应的聚合运算,从而得到一定时间范围内的统计结果。例如统计最近5分钟内某网站的点击数,此时点击的数据在不断地产生,但是通过5分钟的窗口将数据限定在固定时间范围内,就可以对该范围内的有界数据执行聚合处理,得出最近5分钟的网站点击数。

Flink DataStream API将窗口抽象成独立的Operator,且在Flink DataStream API中已经內建了大多数窗口算子。如下代码展示了如何定义Keyed Windows算子,在每个窗口算子中包含了Windows Assigner、Windows Trigger(窗口触发器)、Evictor(数据剔除器)、Lateness(时延设定)、Output Tag(输出标签)以及Windows Funciton等组成部分,其中Windows Assigner和Windows Funciton是所有窗口算子必须指定的属性,其余的属性都是根据实际情况选择指定。

stream.keyBy(...) // 是Keyed类型数据集
.window(...)  //指定窗口分配器类型
[.trigger(...)] //指定触发器类型(可选)
[.evictor(...)]    //指定evictor或者不指定(可选)
[.allowedLateness(...)]   //指定是否延迟处理数据(可选)
[.sideOutputLateData(...)] //指定Output Lag(可选)
.reduce/aggregate/fold/apply()   //指定窗口计算函数
[.getSideOutput(...)]    //根据Tag输出数据(可选)
  • Windows Assigner:指定窗口的类型,定义如何将数据流分配到一个或多个窗口;
  • Windows Trigger:指定窗口触发的时机,定义窗口满足什么样的条件触发计算;
  • Evictor:用于数据剔除;
  • Lateness:标记是否处理迟到数据,当迟到数据到达窗口中是否触发计算;
  • Output Tag:标记输出标签,然后在通过getSideOutput将窗口中的数据根据标签输出;
  • Windows Funciton:定义窗口上数据处理的逻辑,例如对数据进行sum操作。

Windows Assigner

Keyed和Non-Keyed窗口

在运用窗口计算时,Flink根据上游数据集是否为KeyedStream类型(将数据集按照Key分区),对应的Windows Assigner也会有所不同。上游数据集如果是KeyedStream类型,则调用DataStream API的window()方法指定Windows Assigner,数据会根据Key在不同的Task实例中并行分别计算,最后得出针对每个Key统计的结果。如果是Non-Keyed类型,则调用 WindowsAll()方法来指定Windows Assigner,所有的数据都会在窗口算子中路由到一个Task中计算,并得到全局统计结果。

从业务层面讲,如果用户选择对Key进行分区,就能够将相同key的数据分配在同一个分区,例如统计同一个用户在五分钟内不同的登录IP地址数。如果用户没有根据指定Key,此时需要对窗口上的数据进行去全局统计计算,这种窗口被称为Global Windows,例如统计某一段时间内某网站所有的请求数。

如下代码所示,不同类型的DataStream调用不同的Windows Assigner指定方法。具体的Windows Assigner定义可参考后续章节。

val inputStream:DataStream[T] = ...;
//调用KeyBy创建KeyedStream,然后调用window方法指定Windows Assigner
inputStream.keyBy(input=> input.id).window(new MyWindowsAssigner())
//对于DataStream数据集,直接调用windowALL指定Windows Assigner
imputstream.windowAll(new MyAllWindowsAssigner())

Windows Assigner

Flink支持两种类型的窗口,一种是基于时间的窗口,窗口基于起始时间戳(闭区间)和终止时间戳(开区间)来决定窗口的大小,数据根据时间戳被分配到不同的窗口中完成计算。Flink使用TimeWindow类来获取窗口的起始时间和终止时间,以及该窗口允许进入的最新时间戳信息等元数据。另一种是基于数量的窗口,根据固定的数量定义窗口的大小,例如每5000条数据形成一个窗口,窗口中接入的数据依赖于数据接入到算子中的顺序,如果数据出现乱序情况,将导致窗口的计算结果不确定。在Flink中可以通过调用DataSteam API中的countWindows()来定义基于数量的窗口。接下来我们重点介绍基于时间窗口的使用,基于数量的窗口读者参考官网资料。

在Flink流式计算中,通过Windows Assigner将接入数据分配到不同的窗口,根据Windows Assigner数据分配方式的不同将Windows分为4大类,分别是滚动窗口(Tumbling Windows)、滑动窗口(Sliding Windows)、会话窗口(Session Windows)和全局窗口(Global Windows)。并且这些Windows Assigner已经在Flink中实现,用户调用DataStream API的windows或windowsAll方法来指定Windows Assigner即可。

滚动窗口(Tumbling Windows)

滚动窗口是根据固定时间或大小进行切分,且窗口和窗口之间的元素互不重叠。这种类型的窗口的最大特点是比较简单,但可能会导致某些有前后关系的数据计算结果不正确,而对于按照固定大小和周期统计某一指标的这种类型的窗口计算就比较适合,同时实现起来也比较方便。

DataStream API中提供了基于Event Time和Process Time两种时间类型的Tumbling窗口,对应的Assigner分别为TumblingEventTimeWindows和TumblingProcessTimeWindows。调用DataStream API的Window方法来指定相应的Assigner,并使用每种Assigner的of()方法来定义窗口的大小,其中时间单位可以是Time.milliseconds(x)、Time.seconds(x)或Time.minutes(x),也可以是不同时间单位的组合。

定义Event Time和Process Time类型的滚动窗口,窗口时间按照10s进行切分,窗口的时间是[1:00:00.000-1:00:09.999]到[1:00:10.000-1:00:19.999]的等固定时间范围。

val inputStream:DataStream[T] = ...
//定义Event Time Tumbling Windows
val tumblingEventTimeWindows = inputStream
.keyBy(_.id)
//通过使用TumblingEventTimeWindows定义Event Time 滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(...)//定义窗口函数

//定义Process Time Tumbling Windows
val tumblingProcessingTimeWindows = inputStream
  .keyBy(_.id)
//通过使用TumblingProcessTimeWindows定义Event Time 滚动窗口
.window(TumblingProcessTimeWindows.of(Time.seconds(10)))
.process(...)//定义窗口函数

上述对滚动窗口定义相对比较常规,用户还可以直接使用DataStream API中timeWindow()快捷方法、定义TumblingEventTimeWindows或TumblingProcessTimeWindows,时间的类型根据用户事先设定的时间概念确定。如下代码使用timeWindow方法来定义滚动窗口:

val inputStream:DataStream[T] = ...;
inputStream.keyBy(_.id)
//通过使用timeWindow方式定义滚动窗口,窗口时间类型根据time characteristic确定
.timeWindow(Time.seconds(1))
.process(...)//定义窗口函数

默认窗口时间的时区是UTC-0,因此UTC-0以外的其他地区均需要通过设定时间偏移量调整时区,在国内需要指定Time.hours(-8)的偏移量。

滑动窗口

滑动窗口也是一种比较常见的窗口类型,其特点是在滚动窗口基础之上增加了窗口滑动时间(Slide Time),且允许窗口数据发生重叠。当Windows size固定之后,窗口并不像滚动窗口按照Windows Size向前移动,而是根据设定的Slide Time向前滑动。窗口之间的数据重叠大小根据Windows size和Slide time决定,当Slide time小于Windows size便会发生窗口重叠,Slide size大于Windows size就会出现窗口不连续,数据可能不能在任何一个窗口内计算,Slide size和Windows size相等时,Sliding Windows其实就是Tumbling Windows。滑动窗口能够帮助用户根据设定的统计频率计算指定窗口大小的统计指标,例如每隔30s统计最近10min内活跃用户数等。

DataStream API针对Sliding Windows也提供了不同时间类型的Assigner,其中包括基于Event Time的SlidingEventTimeWindows和基于Process Time的SlidingProcessingTime-Windows。代码清单4-10中分别创建了两种时间类型的Sliding Windows,并指定Windows size为1h,Slide time为10s。

val inputStream:DataStream[T] = ...
//定义Event Time Sliding Windows
val slidingEventTimeWindows  = inputStream
.keyBy(_.id)
//通过使用SlidingEventTimeWindows定义Event Time 滚动窗口
.window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(10)))
.process(...)//定义窗口函数

//定义Process Time Sliding Windows
val slidingProcessingTimeWindows= inputStream
.keyBy(_.id)
//通过使用SlidingProcessingTimeWindows定义Event Time 滚动窗口
.window(SlidingProcessingTimeWindows.of(Time.hours(1),Time.minutes(10)))
.process(...)//定义窗口计算函数

和滚动窗口一样,Flink DataStream API中也提供了创建两种窗口的快捷方式,通过调用DataStream API的timeWindow方法就能够创建对应的窗口。如下代码所示,通过DataStream API timeWindow方法定义滑动窗口,窗口的时间类型根据用户在Execation Enviroment中设定的Time characteristic确定,指定的参数分别Windows Size、Slide Time还有时区偏移量,如果是国内时区则设定为Time.hours(-8)。

val inputStream:DataStream[T] = ...;
val slidingEventTimeWindows = inputStream
.keyBy(_.id)
//通过使用timeWindow方式定义滑动窗口,窗口时间类型根据time characteristic确定
.timeWindow(Time.seconds(10),Time.seconds(1),Time.hours(-8))
.process(...)//定义窗口函数

会话窗口(Session Windows)

会话窗口(Session Windows)主要是将某段时间内活跃度较高的数据聚合成一个窗口进行计算,窗口的触发的条件是Session Gap,是指在规定的时间内如果没有数据活跃接入,则认为窗口结束,然后触发窗口计算结果。需要注意的是如果数据一直不间断地进入窗口,也会导致窗口始终不触发的情况。与滑动窗口、滚动窗口不同的是,Session Windows不需要有固定windows size和slide time,只需要定义session gap,来规定不活跃数据的时间上限即可。如图4-13所示,通过session gap来判断数据是否属于同一活跃数据集,从而将数据切分成不同的窗口进行计算。

Session Windows窗口类型比较适合非连续型数据处理或周期性产生数据的场景,根据用户在线上某段时间内的活跃度对用户行为数据进行统计。和前面两个窗口一样,DataStream API中可以创建基于Event Time和Process Time的Session Windows,对应的Assigner分别为EventTimeSessionWindows和ProcessTimeSessionWindows,如代码清单4-11所示,用户需要调用withGap()方法来指定Session Gap,来规定不活跃数据的时间周期。

val inputStream:DataStream[T] = ...
//定义Event Time Session Windows
val eventTimeSessionWindows  = inputStream
.keyBy(_.id)
//通过使用EventTimeSessionWindows定义Event Time 滚动窗口
.window(EventTimeSessionWindows.withGap(Time.milliseconds(10)))
.process(...)

//定义Process Time Session Windows
val processingTimeSessionWindows = inputStream
.keyBy(_.id)
//通过使用ProcessingTimeSessionWindows定义Event Time 滚动窗口
.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(10)))
.process(...)

在创建Session Windows的过程中,除了调用withGap方法输入固定的Session Gap,Flink也能支持动态的调整Session Gap。如代码清单4-12所示,只需要实现SessionWindowTimeGapExtractor接口,并复写extract方法,完成动态Session Gap的抽取,然后将创建好的Session Gap抽取器传入ProcessingTimeSessionWindows.withDynamicGap()方法中即可。

val inputStream:DataStream[T] = ...;
//定义Event Time Session Windows
val eventTimeSessionWindows  = inputStream
.keyBy(_.id)
//通过使用EventTimeSessionWindows定义Event Time 滚动窗口
.window(EventTimeSessionWindows.withDynamicGap(
  //实例化SessionWindowTimeGapExtractor接口
  new SessionWindowTimeGapExtractor[String] {
  override def extract(element: String): Long = {
    // 动态指定并返回Session Gap
}}))
.process(...)

//定义Process Time Session Windows
val processingTimeSessionWindows  = inputStream
.keyBy(_.id)
//通过使用ProcessingTimeSessionWindows定义Event Time 滚动窗口
.window(ProcessingTimeSessionWindows.withDynamicGap(
    //实例化SessionWindowTimeGapExtractor接口
  new SessionWindowTimeGapExtractor[String] {
  override def extract(element: String): Long = {
    // 动态指定并返回Session Gap
}}))
.process(...)

由于Session Windows本质上没有固定的起止时间点,因此底层计算逻辑和Tumbliing窗口及Sliding窗口有一定的区别。Session Windows为每个进入的数据都创建了一个窗口,最后再将距离Session Gap最近的窗口进行合并,然后计算窗口结果。因此对于Session Windows来说需要能够合并的Trigger和Windows Funciton,比如ReduceFunction、AggregateFunction、ProcessWindowFunction等。

全局窗口(Global Windows)

全局窗口(Global Windows)将所有相同的key的数据分配到单个窗口中计算结果,窗口没有起始和结束时间,窗口需要借助于Triger来触发计算,如果不对Global Windows指定Triger,窗口是不会触发计算的。因此,使用Global Windows需要非常慎重,用户需要非常明确自己在整个窗口中统计出的结果是什么,并指定对应的触发器,同时还需要有指定相应的数据清理机制,否则数据将一直留在内存中。 如代码清单4-13所示,定义Global Windows相对比较简单,可以通过Global-Windows创建Global Windows的分配器。后面我们会讲到对窗口Trigger的定义,读者可以结合在本节内容学习。

Val inputStream:DataStream[T] = ...;
val globalWindows = inputStream
    .keyBy(_.id)
    .window(GlobalWindows.create())//通过GlobalWindows定义Global Windows
    .process(...)

Windows Function

在上一节的学习我们已经了解Flink支持了不同类型窗口的Assigner,对数据集定义了Window Assigner之后,下一步就可以定义窗口内数据的计算逻辑,也就是Window Function的定义。Flink中提供了四种类型的Window Function,分别为ReduceFunction、AggregateFunction、FoldFunction以及ProcessWindowFunction。

四种类型的Window Fucntion按照计算原理的不同可以分为两大类,一类是增量聚合函数,对应有ReduceFunction、AggregateFunction和FoldFunction;另一类是全量窗口函数,对应有ProcessWindowFunction。增量聚合函数计算性能较高,占用存储空间少,主要因为基于中间状态的计算结果,窗口中只维护中间结果状态值,不需要缓存原始数据。而全量窗口函数使用的代价相对较高,性能比较弱,主要因为此时算子需要对所有属于该窗口的接入数据进行缓存,然后等到窗口触发的时候,对所有的原始数据进行汇总计算。如果接入数据量比较大或窗口时间比较长,就比较有可能导致计算性能的下降。下面将分别对每种Window Function在Flink中的使用进行解释和说明。

ReduceFunction

ReduceFunction定义了对输入的两个相同类型的数据元素按照指定的计算方法进行聚合的逻辑,然后输出类型相同的一个结果元素。如代码清单4-14所示,创建好Window Assigner之后通过在reduce()方法中指定ReduceFunciton逻辑,可以使用Scala lambada表达式定义计算逻辑。

Search

    微信好友

    博士的沙漏

    Table of Contents