Flink监控与性能优化

2021/04/12 Flink

Flink监控与性能优化

对于构建好的Flink集群,如何能够有效地进行集群以及任务方面的监控与优化是非常重要的,尤其对于7*24小时运行的生产环境。本章将从多个方面介绍Flink在监控和性能优化方面的内容,其中监控部分包含了Flink系统内部提供的常用监控指标的获取,以及用户如何自定义实现指标的采集和监控。同时,也会重点介绍Checkpointing以及任务反压的监控。然后通过分析各种监控指标帮助用户更好地对Flink应用进行性能优化,以提高Flink任务执行的数据处理性能和效率。

监控指标

Flink任务提交到集群后,接下来就是对任务进行有效的监控。Flink将任务监控指标主要分为系统指标和用户指标两种:系统指标主要包括Flink集群层面的指标,例如CPU负载,各组件内存使用情况等;用户指标主要包括用户在任务中自定义注册的监控指标,用于获取用户的业务状况等信息。Flink中的监控指标可以通过多种方式获取,例如可以从Flink UI中直接查看,也可以通过Rest Api或Reporter获取。

系统监控指标

在FlinkUI Overview页签中,包含对系统中的TaskManager、TaskSlots以及Jobs的相关监控指标,用户可通过该页面获取正在执行或取消的任务数,以及任务的启动时间、截止时间、执行周期、JobID、Task实例等指标。

在Task Manager页签中可以获取到每个TaskManager的系统监控指标,例如JVM内存,包括堆外和堆内存的使用情况,以及NetWork中内存的切片数、垃圾回收的次数和持续时间等。

另外除了可以在FlinkUI中获取指标之外,用户也可以使用Flink提供的RestAPI获取监控指标。通过使用http://hostname:8081拼接需要查询的指标以及维度信息,就可以将不同服务和任务中的Metric信息查询出来,其中hostname为JobManager对应的主机名。

例如访问http://hostname:8081/jobmanager/metrics来获取所有JobManager的监控指标名称。

GET http://hostname: 8081/jobmanager/metrics
查询结果:
[{
     "id": "Status.JVM.GarbageCollector.PS_MarkSweep.Time"
    },{
     "id": "Status.JVM.Memory.NonHeap.Committed"
    },
    ...
]

同时可以在URL中添加get=metric1,metric2参数获取指定Metric的监控指标。例如,获取Jobanager内存CPU占用时间,就可以通过拼接get=Status.JVM.CPU.Time获取,其他的监控指标查询方法相似。

GET /jobmanager/metrics?get=Status.JVM.CPU.Time
 查询结果:
[
    {
        "id": "Status.JVM.CPU.Time",
        "value": "7052900000000"
    }
]

获取taskmanagers中metric1和metric2对应的Value。

GET /taskmanagers/metrics?get=metric1,metric2

获取taskmanagers中metric1和metric2对应的Value,以及所有taskmanagers的指标统计的最大值和最小值。

GET /taskmanagers/metrics?get=metric1,metric2&agg=max,min

获取指定taskmanagerid对应的TaskManager上的全部监控指标的Metric名称。

GET /taskmanagers/<taskmanagerid>/metrics

获取指定JobID对应的监控指标对应的Metric名称。

GET /jobs/<jobid>/metrics

获取指定JobID对应的任务中的metric1和metric2指标。

GET /jobs/<jobid>/metrics?get=metric1,metric2

获取指定JobID以及指定顶点的subtask监控指标。

GET /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>

监控指标注册

除了使用Flink系统自带的监控指标之外,用户也可以自定义监控指标。可以通过在RichFunction中调用getRuntimeContext().getMetricGroup()获取MetricGroup对象,然后将需要监控的指标记录在MetricGroup所支持的Metric中,然后就可以将自定义指标注册到Flink系统中。目前Flink支持Counters、Gauges、Histograms以及Meters四种类型的监控指标的注册和获取。

Counters指标

Counters指标主要为了对指标进行计数类型的统计,且仅支持Int和Long数据类型。在代码实现了map方法中对进入到算子中的正整数进行计数统计,得到MyCounter监控指标。 实现RichMapFunction定义Counters指标

class MyMapper extends RichMapFunction[Long, Long] {
  @transient private var counter: Counter = _
  //在Open方法中获取Counter实例化对象
  override def open(parameters: Configuration): Unit = {
    counter = getRuntimeContext()
      .getMetricGroup()
      .counter("MyCounter")}
  override def map(input : Long): Long = {
    if (input> 0) { counter.inc()//如果value>0,则counter自增}
    value
  }}

Gauges指标

Gauges相对于Counters指标更加通用,可以支持任何类型的数据记录和统计,且不限制返回的结果类型。通过使用Gauges指标对输入Map函数中的数据量进行累加统计,得到MyGauge统计指标。 实现RichMapFunction定义Gauges指标

class gaugeMapper extends RichMapFunction[String,String] {
  @transient private var countValue = 0
  //在Open方法中获取gauge实例化对象
  override def open(parameters: Configuration): Unit = {
    getRuntimeContext()
      .getMetricGroup()
      .gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => countValue ) )
  }
  override def map(value: String): String = {
    countValue += 1 //累加countValue值
    value
  }
}

Histograms指标

Histograms指标主要为了计算Long类型监控指标的分布情况,并以直方图的形式展示。Flink中没有默认的Histograms实现类,可以通过引入Codahale/DropWizard Histograms来完成数据分布指标的获取。注意,DropwizardHistogramWrapper包装类并不在Flink默认依赖库中,需要单独引入相关的Maven Dependency。定义histogramMapper类实现RichMapFunction接口,使用DropwizardHistogram-Wrapper包装类转换Codahale/DropWizard Histograms,统计Map函数中输入数据的分布情况。 实现RichMapFunction定义Histogram指标

class histogramMapper extends RichMapFunction[Long, Long] {
  @transient private var histogram: Histogram = _
 //在Open方法中获取Histogram实例化对象
  override def open(config: Configuration): Unit = {
//使用DropwizardHistogramWrapper包装类转换Codahale/DropWizard Histograms
    val dropwizardHistogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
    histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new
      DropwizardHistogramWrapper(dropwizardHistogram))
  }
  override def map(value: Long): Long = {
    histogram.update(value) //更新指标
    value
  }}

Meters指标

Meters指标是为了获取平均吞吐量方面的统计,与Histograms指标相同,Flink中也没有提供默认的Meters收集器,需要借助Codahale/DropWizard meters实现,并通过DropwizardMeterWrapper包装类转换成Flink系统内部的Meter。在实现的RichMapFunction中定义Meter指标,并在Meter中使用markEvent()标记进入到函数中的数据。 实现RichMapFunction定义Meter指标

class meterMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter = _
  //在Open方法中获取Meter实例化对象
  override def open(config: Configuration): Unit = {
//使用DropwizardMeterWrapper包装类转换Codahale/DropWizard Meter
    val dropwizardMeter = new com.codahale.metrics.Meter()
    meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
  }
  override def map(value: Long): Long = {
    meter.markEvent() //更新指标
    value
  }
}

当自定义监控指标定义完毕之后,就可以通过使用RestAPI获取相应的监控指标,具体的使用方式读者可参考上一小节内容。

监控指标报表

Flink提供了非常丰富的监控指标Reporter,可以将采集到的监控指标推送到外部系统中。通过在conf/flink-conf.yaml中配置外部系统的Reporter,在Flink集群启动的过程中就会将Reporter配置加载到集群环境中,然后就可以把Flink系统中的监控指标输出到Reporter对应的外部监控系统中。目前Flink支持的Reporter有JMX、Graphite、Prometheus、StatsD、Datadog、Slf4j等系统,且每种系统对应的Reporter均已在Flink中实现,用户可以直接配置使用。

在conf/flink-conf.yaml中Reporter的配置包含以下几项内容,其中reporter-name是用户自定义的报表名称,同时reporter-name用以区分不同的reporter。

  • metrics.reporter..:配置reporter-name对应的报表的参数信息,可以通过指定config名称将参数传递给报表系统,例如配置服务器端口.port:9090。
  • metrics.reporter..class:配置reporter-name对应的class名称,对应类依赖库需要已经加载至Flink环境中,例如JMX Reporter对应的是org.apache.flink.metrics.jmx.JMXReporter。
  • metrics.reporter..interval:配置reporter指标汇报的时间间隔,单位为秒。
  • metrics.reporter..scope.delimiter:配置reporter监控指标中的范围分隔符,默认为metrics.scope.delimiter对应的分隔符。
  • metrics.reporters:默认开户使用的reporter,通过逗号分隔多个reporter,例如reporter1和reporter2。

如下通过介绍Jmx以及Prometheus两种Reporter来说明如何使用Reporter完成对监控指标的输出。

JMX Reporter配置

JMX可以跨越一系列异构操作系统平台、系统体系结构和网络传输协议,灵活地开发无缝集成的系统、网络和服务管理应用。目前大多数的应用都支持JMX,主要因为JMX可以为运维监控提供简单可靠的数据接口。Flink在系统内部已经实现JMX Reporter,并通过配置就可以使用JMX Reporter输出监控指标到JMX系统中。

metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789

其中class配置对应的org.apache.flink.metrics.jmx.JMXReporter类已经集成在Flink系统中,用户可以直接配置使用,metrics.reporter.jmx.port配置是JMX服务的监听端口。

Prometheus Reporter配置

对于使用Prometheus Reporter将监控指标发送到Prometheus中,首先需要在启动集群前将/opt/flink-metrics-prometheus_2.11-1.7.2.jar移到/lib路径下,并在conf/flink-conf.yaml中配置Prometheus Reporter相关信息。Prometheus Reporter有两种形式,一种方式是通过配置Prometheus监听端口将监控指标输出到对应端口中,也可以不设定端口信息,默认使用9249,对于多个Prometheus Reporter实例,可以使用端口段来设定。

metrics.reporter.prometheus.class:
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prometheus.port: 9249

另外一种方式是使用PrometheusPushGateway,将监控指标发送到指定网关中,然后Prometheus从该网关中拉取数据,对应的Reporter Class为PrometheusPushGateway-Reporter,另外需要指定Pushgateway的Host、端口以及JobName等信息,通过配置deleteOnShutdown来设定Pushgateway是否在关机情况下删除metrics指标。

metrics.reporter.promgateway.class:
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false

Backpressure监控与优化

反压在流式系统中是一种非常重要的机制,主要作用是当系统中下游算子的处理速度下降,导致数据处理速率低于数据接入的速率时,通过反向背压的方式让数据接入的速率下降,从而避免大量数据积压在Flink系统中,最后系统无法正常运行。Flink具有天然的反压机制,不需要通过额外的配置就能够完成反压处理。

Backpressure进程抽样

当在FlinkUI中切换到Backpressure页签时,Flink才会对整个Job触发反压数据的采集,反压过程对系统有一定的影响,主要因为JVM进程采样成本较高。如图10-2所示,Flink通过在TaskManager中采样LocalBufferPool内存块上的每个Task的stackTrace实现。默认情况下,TaskManager会触发100次采样,然后将采样的结果汇报给JobManager,最终通过JobManager进行汇总计算,得出反压比例并在页面中展示,反压比例等于反压出现次数/采样次数。

Backpressure页面监控

通过触发JVM进程采样的方式获取到反压监控数据,同时Flink会将反压状态分为三个级别,分别为OK、LOW、HIGH级别,其中OK对应的反压比例为大于0小于10%,LOW对应的反压比例大于10%小于50%,HIGH对应的反压比例大于50%小于100%。

Backpressure配置

针对反压的优化,用户可以调整以下参数:

  • web.backpressure.cleanup-interval:当启动反压数据采集后,需要等待页面并获取反压数据的时间长度,默认60s。
  • web.backpressure.delay-between-samples:Stack Trace抽样到确认反压状态之间的时延,默认为50ms。
  • web.backpressure.num-samples:设定Stack Trace抽样数以确定反压状态,默认为100。

Checkpointing监控与优化

Checkpointing页面监控

Flink Web页面中也提供了针对Job Checkpointing相关的监控信息,Checkpointing监控页面中共有Overview、History、Summary和Configuration四个页签,分别对Checkpointing从不同的角度进行了监控,每个页面中都包含了与Checkpointing相关的指标。

  • Overview页签 Overview页签中宏观地记录了Flink应用中Checkpoints的数量以及Checkpoint的最新记录,包括失败和完成的Checkpoints记录。
    • Checkpoint Counts:包含了触发、进行中、完成、失败、重置等Checkpoint状态数量统计。
    • Latest Completed Checkpoint:记录了最近一次完成的Checkpoint信息,包括结束时间,端到端时长,状态大小等。
    • Latest Failed Checkpoint:记录了最近一次失败的Checkpoint信息。
    • Latest Savepoint:记录了最近一次Savepoint触发的信息。
    • Latest Restore:记录了最近一次重置操作的信息,包括从Checkpoint和Savepoint两种数据中重置恢复任务。
  • History页签 History页面中记录了历史触发Checkpoint的详情,包括Checkpoint的ID、状态、触发时间,最后一次Acknowledgement信息等,通过点击More details对应的链接可以查看子Task对应的Checkpoint数据。
  • Summary页签 Summary页面中记录了所有完成的Checkpoint统计指标的最大值、最小值,以及平均值等,指标中包括端到端的持续时间、状态大小,以及分配过程中缓冲的数据大小。
  • Configuration页签 Configuration页签中包含Checkpoints中所有的基本配置,具体的配置解释如下:
  • Checkpointing Mode:标记Checkpointing是Exactly Once还是At Least Once的模式。
  • Interval: Checkpointing触发的时间间隔,时间间隔越小意味着越频繁的Checkpointing。
  • Timeout: Checkpointing触发超时时间,超过指定时间JobManager会取消当次Checkpointing,并重新启动新的Checkpointing。
  • Minimum Pause Between Checkpoints:配置两个Checkpoints之间最短时间间隔,当上一次Checkpointing结束后,需要等待该时间间隔才能触发下一次Checkpoints,避免触发过多的Checkpoints导致系统资源被消耗。
  • Persist Checkpoints Externally:如果开启Checkpoints,数据将同时写到外部持久化存储中。

Search

    微信好友

    博士的沙漏

    Table of Contents