Flink作业链和资源组

2021/04/06 Flink

Flink作业链和资源组

在Flink作业中,用户可以指定相应的链条将相关性非常强的转换操作绑定在一起,这样能够让转换过程上下游的Task在同一个Pipeline中执行,进而避免因为数据在网络或者线程间传输导致的开销。一般情况下Flink在例如Map类型的操作中默认开启TaskChain,以提高Flink作业的整体性能。Flink同时也为用户提供细粒度的链条控制,用户能够根据自己的需要创建作业链或禁止作业链。

作业链

禁用全局链条

用户能够通过禁止全局作业链的操作来关闭整个Flink作业的链条,需要注意这个操作会影响整个作业的执行情况,在选择关闭之前,用户需要非常清楚作业的执行过程,否则可能会出现一些意想不到的情况。`

StreamExecutionEnvironment.disableOperatorChaining()

关闭全局作业链后,创建对应Opecator的链条,需要用户事先指定操作符,然后再通过使用startNewChain()方法来创建,且创建的链条只对当前的操作符和之后的操作符有效,不影响其他操作,例如:

someStream.filter(...).map(...).startNewChain().map(...)

在上述过程中,新创建的作业链只针对两个map操作进行链条绑定,对前面的filter操作无效,如果用户需要创建,则可以在filter操作和map操作之间进行startNewChain()方法即可。

禁用局部链条

如果用户不想关闭整体作业算子上的链条,而是只想关闭某些操作符上的链条,可以通过disableChaining()方法来禁用当前操作符上的链条,例如:

someStream.map(...).disableChaining();

在上述过程中,只会禁用map操作上的链条,且不会对其他操作符产生影响。

Slots资源组

Slot是整个Flink系统中所提供的资源最小单元,其概念上和Yarn中的Container类似,Flink在TaskManager启动后,会自动管理当前TaskManager上所能提供的Slot,在作业提交到Flink集群后,由JobManager进行统一分配Slots数量。在这里需要介绍的是Slot资源共享的问题,即多个Task计算过程中在同一个Slot中进行,这样能够对特定过程中的Tasks进行物理隔离,使其能够在同一个Slot中执行,对数据转换操作进行隔离。另外,如果当前操作符(operator)中所有input操作均具有相同的slot group,则该操作符会继承前面操作符的slot group,然后在同一个Slot中进行数据处理,如果不是则当前的操作符会选择默认的slot group(“default”),然后经作业发送到对应的slot上执行。如果用户不显示指定slot group,则所有的操作符均在default slot group中执行操作,而默认情况slot相互之间没有隔离,故Task上的操作符执行可能会在不同的slot上转换执行。可以通过如下方法指定slot group:

someStream.filter(...).slotSharingGroup("name");

这样就可以创建一个名为name的slot group,将filter操作指定在slot group中对应的slot上执行计算操作。

Asynchronous I/O异步操作

在使用Flink处理流式数据的过程中,会经常和外部系统进行数据交互。通常情况下在Flink中可以通过RichMapFunction来创建外部数据库系统的Client连接,然后通过Client连接将数据元素写入外部存储系统中或者从外部存储系统中读取数据。考虑到连接外部系统的网络等因素,这种同步查询和操作数据库的方式往往会影响整个函数的处理效率,用户如果想提升应用的处理效率,就必须考虑增加算子的并行度,这将导致大量的资源开销。Flink在1.2版本中引入了Asynchronous I/O,能够支持通过异步方式连接外部存储系统,以提升Flink系统与外部数据库交互的性能及吞吐量,但前提是数据库本身需要支持异步客户端。

自定义实现AsyncFunction,创建数据库异步客户端DBClient,从数据库中异步获取数据,然后通过调用callback方法将结果返回给ResultFuture对象,完成从数据库中查询数据并创建下游DataStream数据集的操作。

//通过实例化AsyncFunction接口,实现数据库数据异步查询
class AsyncDBFunction extends AsyncFunction[String, (String, String)] {
  //创建数据连接的异步客户端,能够支持数据的异步查询
  lazy val dblient: DBClient = new DBClient(host, post)
  //用于future callbacks的context
  implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
    //复写asyncInvoke方法,实现AsyncFunction触发数据库数据查询
    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
      //通过client查询数据传入字符串,返回Future对象
      val resultFutureRequested: Future[String] = dblient.query(str)
      //当客户端查询完请求后,通过调用callback将查询结果返回给resultFuture
      resultFutureRequested.onSuccess {
        case result: String => resultFuture.complete(Iterable((str, result)))
    }}}
//创建DataStream数据集
val stream: DataStream[String] = ...
// 在现有的DataStream上应用创建好的AsyncDatabaseRequest方法,返回查询结果后回应的数据集
val resultStream: DataStream[(String, String)] =
  AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

通过使用Asynchronous I/O的方式会在很大程度上提升Flink系统的性能和吞吐量,主要原因是在异步函数中可以尽可能异步并发地查询外部数据库。在异步IO中需要考虑对数据库查询超时以及并发线程数控制两个因素。Asynchronous I/O提供了Timeout和Capacity两个参数来配置异步数据IO操作,其中Timeout是定义Asynchronous请求最长等待时间,超过该时间Flink将认为查询数据超时而请求失败,避免因请求无法按时返回导致系统长时间等待的情况,对于超时的请求可以通过复写AsyncFunction中的timeout方法来处理。Capacity定义在同一时间点异步请求并发数,通过Capacity参数来控制总的请求数,一旦Capacity定义的请求数被耗尽,Flink会直接触发反压机制来抑制上游数据的接入,从而保证Flink系统的正常运行。

使用异步IO方式进行数据输出,其输出结果的先后顺序有可能并不是按照之前原有数据的顺序进行排序,因此在Flink异步IO中,需要用户显式指定是否对结果排序输出,而是否排序同样影响着结果的顺序和系统性能,下面针对结果是否进行排序输出进行对比:

  • 乱序模式:异步查询结果输出中,原本数据元素的顺序可能会发生变化,请求一旦完成就会输出结果,可以使用AsyncDataStream.unorderedWait(…)方法应用这种模式。如果系统同时选择使用Process Time特征具有最低的延时和负载。
  • 顺序模式:异步查询结果将按照输入数据元素的顺序输出,原本Stream数据元素的顺序保持不变,这种情况下具有较高的时延和负载,因为结果数据需要在Operator的Buffer中进行缓存,直到所有异步请求处理完毕,将按照原来元素顺序输出结果,这也将对Checkpointing过程造成额外的延时和性能损耗。可以使用AsyncDataStream.orderedWait(…)方法使用这种模式。

另外在使用Event-Time时间概念处理流数据的过程中,Asynchronous I/O Operator总能够正确保持Watermark的顺序,即使使用乱序模式,输出Watermark也会保持原有顺序,但对于在Watermark之间的数据元素则不保持原来的顺序,也就是说如果使用了Watermark,将会对异步IO造成一定的时延和开销,具体取决于Watermark的频率,频率越高时延越高同时开销越大。

Search

    微信好友

    博士的沙漏

    Table of Contents