Flink部署与应用

2021/04/11 Flink

Flink部署与应用

本章首先会重点讲解如何将前面章节中不同API编写的Flink应用部署在Flink实际集群环境,以及Flink所支持的不同部署环境与模式,其中包括Standalone cluster,Flink On Yarn以及Flink On Kubernetes三种部署模式。然后介绍Flink在集群环境中如何进行高可用等配置,其中重点包括如何实现JobManager的高可用。除此之外,本章也将介绍Flink集群安全认证管理,帮助用户进行整个Flink集群的安全管理,实现与大数据集群认证系统对接。最后还将介绍Flink集群在升级运维过程中,如何通过Savepoint技术实现数据一致性的保障。

Flink集群部署

作为通用的分布式数据处理框架,Flink可以基于Standalone集群进行分布式计算,也可以借助于第三方组件提供的计算资源完成分布式计算。目前比较熟知的资源管理器有Hadoop Yarn、Apache Mesos、Kubernetes等,目前Flink已经能够非常良好地支持这些资源管理器。以下将分别介绍Flink基于Standalone Cluster、Yarn Cluster、Kubunetes Cluster等资源管理器上的部署与应用。

Standalone Cluster部署

前面已经知道Flink是Master-Slave架构的分布式处理框架,因此构建Standalone Cluster需要同时配置这两种服务角色,其中Master角色对应的是JobManager,Slave角色对应的是TaskManager。在构建Flink Standalone集群之前,以下服务器基础环境参数必须要符合要求。

  • 环境要求 首先Flink集群需要构建在Unix系统之上,例如Linux和Mac OS等系统,如果是Windows系统则需要安装Cygwin构建Linux执行环境,不同的网络和硬件环境需要不同的环境配置。
  • JDK环境 其次需要在每台主机上安装JDK,且版本需要保持在1.8及以上,可以设定环境变量JAVA_HOME,也可以在配置文件中指定JDK安装路径,Flink配置文件在安装路径中的conf目录下,修改flink-conf.yaml中的env.java.home参数,指定Java安装路径即可。
  • SSH环境 最后需要在集群节点之间配置互信,可以使用SSH对其他节点进行免密登录,因为Flink需要通过提供的脚本对其他节点进行远程管理和监控,同时也需要每台节点的安装路径结构保持一致。

集群安装

Flink Standalone集群安装相对比较简单,只需要两步就能够安装成功,具体安装步骤如下:

  • 下载Flink对应版本安装包 如果环境能连接到外部网络,可以直接通过wget命令下载,或在官方镜像仓库中下载,然后上传到系统中的安装路径中,官方安装包下载地址为https://flink.apache.org/downloads.html。注意,如果仅是安装Standalone集群,下载Apache Flink x.x. only对应的包即可;如果需要Hadoop的环境支持,则下载相应Hadoop对应的安装包,这里建议用户使用基于Hadoop编译的安装包。可以通过如下命令从官网镜像仓库下载安装包:
    wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.7.2/flink-1.7.0-bin-hadoop27-scala_2.11.tgz
    

    通过tar命令解压Flink安装包,并进入到Flink安装路径。至此,Flink Standalone Cluster就基本安装完毕。

    tar -xvf flink-1.7.0-bin-hadoop27-scala_2.11.tgz
    

    进入到Flink安装目录后,所有Flink的配置文件均在conf路径中,启动脚本在bin路径中,依赖包都放置在lib路径中。

集群配置

安装完Standalone集群之后,下一步就需要对Flink集群的参数进行配置,配置文件都在Flink根目录中的conf路径下。 首先配置集群的Master和Slave节点,在master文件中配置Flink JobManager的hostname以及端口。然后在conf/slaves文件中对Slave节点进行配置,如果需要将多个TaskManager添加在Flink Standalone集群中,如以下只需在conf/slaves文件中添加对应的IP地址即可。

10.0.0.1
10.0.0.2

通过配置以上参数,将10.0.0.1和10.0.0.2节点添加到Flink集群之中,且每台节点的环境参数必须保持一致且符合Flink集群要求,否则会出现集群无法启动的问题。 同时在conf/flink-conf.yaml文件中可以配置Flink集群中JobManager以及TaskManager组件的优化配置项,主要的配置项如以下所示:

  • jobmanager.rpc.address 表示Flink Cluster集群的JobManager RPC通信地址,一般需要配置指定的JobManager的IP地址,默认localhost不适合多节点集群模式。
  • jobmanager.heap.mb 对JobManager的JVM堆内存大小进行配置,默认为1024M,可以根据集群规模适当增加。
  • taskmanager.heap.mb 对TaskManager的JVM堆内存大小进行配置,默认为1024M,可根据数据计算规模以及状态大小进行调整。
  • taskmanager.numberOfTaskSlots 配置每个TaskManager能够贡献出来的Slot数量,根据TaskManager所在的机器能够提供给Flink的CPU数量决定。
  • parallelism.default Flink任务默认并行度,与整个集群的CPU数量有关,增加parallelism可以提高任务并行的计算的实例数,提升数据处理效率,但也会占用更多Slot。
  • taskmanager.tmp.dirs 集群临时文件夹地址,Flink会将中间计算数据放置在相应路径中。

这些默认配置项会在集群启动的时候加载到Flink集群中,当用户提交任务时,可以通过-D符号来动态设定系统参数,此时flink-conf.yaml配置文件中的参数就会被覆盖掉,例如使用-Dfs.overwrite-files=true动态参数。

Flink Standalone集群配置完成后,然后在Master节点,通过bin/start-cluster.sh脚本直接启动Flink集群,Flink会自动通过Scp的方式将安装包和配置同步到Slaves节点。启动过程中如果未出现异常,就表示Flink Standalone Cluster启动成功,可以通过https://{JopManagerHost:Port} 访问Flink集群并提交任务,其中JopManagerHost和Port是前面配置JopManager的IP和端口。

动态添加JobManager&TaskManager

对于已经启动的集群,可以动态地添加或删除集群中的JobManager和TaskManager,该操作通过在集群节点上执行jobmanager.sh和taskmanager.sh脚本来完成。

  • 向集群中动态添加或删除JobManager
    bin/jobmanager.sh ((start|start-foreground) [host]
    [webui-port])|stop|stop-all
    
  • 向集群中动态添加TaskManager
    bin/taskmanager.sh start|start-foreground|stop|stop-all
    

Yarn Cluster部署

Hadoop Yarn是 Hadoop 2.x提出的统一资源管理器,也是目前相对比较流行的大数据资源管理平台,Spark和MapReduce等分布式处理框架都可以兼容并运行在Yarn上,Flink也是如此。 需要注意Flink应用提交到Yarn上目前支持两种模式,一种是Yarn Session Model,这种模式中Flink会向Hadoop Yarn申请足够多的资源,并在Yarn上启动长时间运行的Flink Session集群,用户可以通过RestAPI或Web页面将Flink任务提交到Flink Session集群上运行。另外一种为Single Job Model和大多数计算框架的使用方式类似,每个Flink任务单独向Yarn提交一个Application,并且每个任务都有自己的JobManager和TaskManager,当任务结束后对应的组件也会随任务释放。

环境依赖

将Flink任务部署在Yarn Cluster之前,需要确认Hadoop环境是否满足以下两点要求:

  • Hadoop版本至少保证在2.2以上,并且集群中安装有HDFS服务。
  • 主机中已经配置HADOOP_CONF_DIR变量指定Hadoop客户端配置文件目录,并在对应的路径中含有Hadoop配置文件,其中主要包括hdfs-default.xml、hdfs-site.xml以及yarn-site.xml等。在启动Flink集群的过程中,Flink会通过识别HADOOP_CONF_DIR环境变量读取Hadoop配置参数。

集群安装

通过从Flink官方下载地址下载Flink安装包,选择一台具有Hadoop客户端配置的主机,解压并进入到安装路径中,基本完成Flink集群的安装,下面介绍每种模式对应环境的启动方式。

tar xvzf flink-1.7.0-bin-hadoop2.tgz
cd flink-1.7.0/

Yarn Session模式

前面已经提到Yarn Session模式其实是在Yarn上启动一个Flink Session集群,其中包括JobManager和TaskManager组件。Session集群会一直运行在Hadoop Yarn之上,底层对应的其实是Hadoop的一个Yarn Application应用。当Yarn Session Cluster启动后,用户就能够通过命令行或RestAPI等方式向Yarn Session集群中提交Flink任务,从而不需要再与Yarn进行交互,这样其实也是让Flink应用在相同的集群环境运行,从而屏蔽底层不同的运行环境。

  • 启动Yarn Session Cluster 首先启动Yarn Session Cluster之前Flink需要使用Hadoop客户端参数,Flink默认使用YARN_CONF_DIR或者HADOOP_CONF_DIR环境变量获取Hadoop客户端配置文件。如果启动的节点中没有相应的环境变量和配置文件,则可能导致Flink启动过程无法正常连接到Hadoop Yarn集群。 如果节点中没有相应的环境变量,则建议用户在每次启动Yarn Session之前通过手动的方式对环境变量进行赋值。如果启动节点中没有Hadoop客户端配置,用户可以将配置从Hadoop集群中获取出来,然后放置在指定路径中,再通过上述步骤进行环境变量配置。 如下通过yarn-session.sh命令启动Flink Yarn Session集群,其中-n参数配置启动4个Yarn Container,-jm参数配置JobManager的JVM内存大小,-tm参数配置Task-Manager的内存大小,-s表示集群中共启动16个slots来提供给应用以启动task实例。
    ./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m -s 16
    

    集群启动完毕之后就可以在Yarn的任务管理页面查看Flink Session集群状况,并点击ApplicationMaster对应的URL,进入Flink Session Cluster集群中,注意在On YARN的模式中,每次启动JobManager的地址和端口都不是固定的。

Yarn Session独立模式

通过以上方式启动Yarn Session集群,集群的运行与管理依赖于于本地Yarn Session集群的本地启动进程,一旦进程关闭,则整个Session集群也会终止。此时可以通过,在启动Session过程中指定参数–d或–detached,将启动的Session集群交给Yarn集群管理,与本地进程脱离。通过这种方式启动Flink集群时,如果停止Flink Session Cluster,需要通过Yarn Application -kill [appid]来终止Flink Session集群。

  • 本地进程绑定已有的Session 与Detached Yarn Session相反,如果用户想将本地进程绑定到Yarn上已经提交的Session,可以通过以下命令Attach本地的进程到Yarn集群对应的Application,然后Yarn集群上ApplicationID对应的Session就能绑定到本地进程中。此时用户就能够对Session进行本地操作,包括执行停止命令等,例如执行Ctrl+C命令或者输入stop命令,就能将Flink Session Cluster停止。
    ./bin/yarn-session.sh -id [applicationid]
    
  • 提交任务到Session 当Flink Yarn Session集群构建好之后,就可以向Session集群中提交Flink任务,可以通过命令行或者RestAPI的方式提交Flink应用到Session集群中。例如通过以下命令将Flink任务提交到Session中,正常情况下,用户就能够直接进入到Flink监控页面查看已经提交的Flink任务。
    ./bin/flink run ./windowsWordCountApp.jar
    

容错配置

在Yarn上执行的Flink Session集群通常情况下需要进行对应的任务恢复策略配置,以防止因为某些系统问题导致整个集群出现异常。针对在Yarn上的容错配置,Flink单独提供了如下几个相关的参数,用户可以根据实际情况进行配置使用。

  • yarn.reallocate-failed 该参数表示集群中TaskManager失败后是否被重新拉起,设定为True表示重新分配资源并拉起失败的TaskManager,默认为True,其本质是Yarn是否重新分配TaskManager的Container。
  • yarn.maximum-failed-containers 该参数表示集群所容忍失败Container数量的最大值,如果超过该参数,则会直接导致整个Session集群失败并停止,参数默认值为TaskManager数量,也就是用户启动集群提交任务时-n参数对应的值。
  • yarn.application-attempts 该参数表示整个Session集群所在的Yarn Application失败重启的次数,如果Session集群所在的整个应用失败,则在该参数范围内,Yarn也会重新拉起相应的Application,但如果重启次数超过该参数,Yarn不会再重启应用,此时整个Flink Session会失败,与此同时Session上提交的任务也会全部停止。 以上三个参数从不同层面保证了Flink任务在Yarn集群上的正常运行,且这些参数可以在conf/flink-default.yaml文件中配置,也可以在启动Session集群时通过-D动态参数指定,如-Dyarn.application-attempts=10。

防火墙配置

在生产环境中的集群,一般都具有非常高的安全管控,且网络基本都是通过防火墙进行隔离,有些情况下,用户想要实现在集群之外的机器上远程提交Flink作业,这种在Standalone集群中比较容易实现,因为JopManager的Rpc端口和Rest端口都可以通过防火墙配置打开。但在Yarn Session Cluster模式下用户每启动一次Session集群,Yarn都会给相应的Application分配一个随机端口,这使得Flink Session中的JobManager的Rest和Rpc端口都会发生变化,客户端无法感知远程Session Cluster端口的变化,同时端口也可能被防火墙隔离掉,无法连接到Session Cluster集群,进而导致不能正常提交任务到集群。针对这种情况,Flink提供了相应的解决策略,就是通过端口段实现。

Single Job模式

在Single Job模式中,Flink任务可以直接以单个应用提交到Yarn上,不需要使用Session集群提交任务,每次提交的Flink任务一个独立的Yarn Application,且在每个任务中都会有自己的JobManager和TaskManager组件,且应用所有的资源都独立使用,这种模式比较适合批处理应用,任务运行完便释放资源。 可以通过以下命令直接将任务提交到Hadoop Yarn集群,生成对应的Flink Application。注意需要在参数中指定-m yarn-cluster,表示使用Yarn集群提交Flink任务,-yn表示任务需要的TaskManager数量。

./bin/flink run -m yarn-cluster -yn 2 ./windowsWordCountApp.jar

Kubernetes Cluster部署

容器化部署是目前业界非常流行的一项技术,基于Docker镜像运行能够让用户更加方便地对应用进行管理和运维。随着Docker容器编排工具Kubernetes近几年逐渐流行起来,大多数企业也逐渐使用Kubernetes来管理集群容器资源。Flink也在最近的版本中支持了Kubernetes部署模式,让用户能够基于Kubernetes来构建Flink Session Cluster,也可以通过Docker镜像的方式向Kuernetes集群中提交独立的Flink任务。下面将介绍如何基于Kubernetes部署Flink Session集群,如果用户已经有部署好的Kubernetes集群,则可以直接使用,否则需要搭建Kubernetes集群,具体的搭建步骤可以参考Kubernetes的官方网站,这里不再赘述。

Yaml配置

在Kubernetes上构建Flink Session Cluster,需要将Flink集群中的组件对应的Docker镜像分别在Kubernetes集群中启动,其中包括JobManager、TaskManager、JobManager-Services三个镜像服务,其中每个镜像服务都可以从中央镜像仓库中获取,用户也可以构建本地的镜像仓库,针对每个组件所相应Kubernetes Yamb配置如下:

JobManager yaml配置

主要提供运行JobManager组件镜像的参数配置,包含JobManager自身的参数,例如RPC端口等配置信息,JobManager yaml的配置文件如下:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        args:
        - jobmanager
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 6125
          name: query
        - containerPort: 8081
          name: ui
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager

TaskManager Yaml配置

主要提供运行TaskManager组件的参数配置,以及TaskManager自身的参数,例如RPC端口等配置信息。

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:latest
        args:
        - taskmanager
        ports:
        - containerPort: 6121
          name: data
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager

JobManagerServices配置

主要为Flink Session集群提供对外的RestApi和UI地址,使得用户可以通过Flink UI的方式访问集群并获取任务和监控信息。JobManagerServices Yaml配置文件如下:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: query
    port: 6125
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager

当各个组件服务配置文件定义完毕后,就可以通过使用以下Kubectl命令创建Flink Session Cluster,集群启动完成后就可以通过JobJobManagerServices中配置的WebUI端口访问FlinkWeb页面。

//启动jobmanager-service服务
kubectl create -f jobmanager-service.yaml
//启动jobmanager-deployment服务
kubectl create -f jobmanager-deployment.yaml
//启动taskmanager-deployment服务
kubectl create -f taskmanager-deployment.yaml

集群启动后就可以通过kubectl proxy方式访问Flink UI,需要保证kubectl proxy在终端中运行,并在浏览器里输入以下地址,就能够访问FlinkUI,其中JobManagerHost和port是在JobManagerYaml文件中配置的相应参数。

http://{JobManagerHost:Port}/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy

可以通过kubectl delete命令行来停止Flink Session Cluster。

//停止jobmanager-deployment服务
kubectl delete -f jobmanager-deployment.yaml
//停止taskmanager-deployment服务
kubectl delete -f taskmanager-deployment.yaml
//停止jobmanager-service服务
kubectl delete -f jobmanager-service.yaml

Flink高可用配置

目前Flink高可用配置仅支持Standalone Cluster和Yarn Cluster两种集群模式,同时Flink高可用配置中主要针对JobManager的高可用保证,因为JobManager是整个集群的管理节点,负责整个集群的任务调度和资源管理,如果JobManager出现问题将会导致新Job无法提交,并且已经执行的Job也将全部失败,因此对JobManager的高可用保证尤为重要。 Flink集群默认是不开启JobManager高可用保证的,需要用户自行配置,下面将分别介绍Flink在Standalone Cluster和Yarn Session Cluster两种集群模式下如何进行JobManager高可用配置,以保证集群安全稳定运行。

Standalone集群高可用配置

Standalone集群中的JobManager高可用主要借助Zookeeper来完成,Zookeeper作为大数据生态中的分布式协调管理者,主要功能是为分布式系统提供一致性协调(Coordination)服务。在Flink Standalone集群中,JobManager的服务信息会被注册到Zookeeper中,并通过Zookeeper完成JobManager Leader的选举。Standalone集群会同时存在多个JobManager,但只有一个处于工作状态,其他处于Standby状态,当Active JobManager失去连接后(如系统宕机),Zookeeper会自动从Standby中选举新的 JobManager来接管Flink集群。 如果用户并没有在环境中安装Zookeeper,也可以使用Flink中自带的Zookeeper服务,如以下代码所示需要通过在conf/zoo.cfg文件中配置需要启动的Zookeeper主机,然后Flink就能够在启动的过程中在相应的主机上启动Zookeeper服务,因此不再需要用户独立安装Zookeeper服务。

server.X=addressX:peerPort:leaderPort
server.Y=addressY:peerPort:leaderPort

在上述配置中,server.X和server.Y分别为Zookeeper服务所在主机对应的唯一ID,配置完成后通过执行bin/start-zookeeper-quorum.sh脚本来启动Zookeeper服务。需要注意的是,Flink方不建议用户在生产环境中使用这种方式,应尽可能使用独立安装的Zookeeper服务,保证生产系统的安全与稳定。 在Standalone Cluster高可用配置中,还需要对masters和flink-conf.yaml两个配置文件进行修改,以下分别介绍如何对每个配置文件的参数进行修改。

  • 首先,在conf/masters文件中添加以下配置,用于指定jobManagerAddress信息,分别为主备节点的JobManager的Web地址和端口。
    jobManagerAddress1:webUIPort1
    jobManagerAddress2:webUIPort2
    

    然后在conf/flink-conf.yaml文件中配置如下Zookeeper的相关配置: 高可用模式,通过Zookeeper支持系统高可用,默认集群不会开启高可用状态。

    high-availability: zookeeper
    
  • Zookeeper服务地址,多个IP地址可以使用逗号分隔。
    high-availability.zookeeper.quorum: zkHost:2181[,...],zkAdress:2181
    
  • Zookeeper中Flink服务对应的root路径。
    high-availability.zookeeper.path.root: /flink
    
  • 在Zookeeper中配置集群的唯一ID,用以区分不同的Flink集群。
    high-availability.cluster-id: /default_ns
    
  • 用于存放JobManager元数据的文件系统地址。
    high-availability.storageDir: hdfs:///flink/recovery
    

    以下通过实例来说明如何配置两个JobManager的Standalone集群。

  • 首先在conf/flink-conf.yaml文件中配置high-availability:zookeeper以及Zookeeper服务的地址和cluster-id信息等。
    high-availability: zookeeper
    high-availability.zookeeper.quorum: zkHost1:2181, zkHost2:2181, zkHost3:2181
    high-availability.zookeeper.path.root: /flink
    high-availability.cluster-id: /standalone_cluster_1
    high-availability.storageDir: hdfs://namenode:8020/flink/ha
    
  • 其次在JobManager节点的conf/masters文件中配置Masters信息,且分别使用8081和8082端口作为JobManager Rest服务端口。
    localhost:8081
    localhost:8082
    
  • 如果系统中没有Zookeeper集群,则需要配置conf/zoo.cfg以启用Flink自带的Zookeeper服务。
    server.0=localhost:2888:3888
    
  • 通过start-zookeeper-quorum.sh启动Flink自带的Zookeeper集群。
    bin/start-zookeeper-quorum.sh
    Starting zookeeper daemon on host localhost.
    
  • 最后启动Standalone HA-cluster集群。
    $ bin/start-cluster.sh
    Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
    Starting jobmanager daemon on host localhost.
    Starting jobmanager daemon on host localhost.
    Starting taskmanager daemon on host localhost.
    
  • 可以通过如下命令停止Flink Standalone HA集群以及ZooKeeper服务。
    ./bin/stop-cluster.sh
    Stopping taskmanager daemon (pid: 7647) on localhost.
    Stopping jobmanager daemon (pid: 7495) on host localhost.
    Stopping jobmanager daemon (pid: 7349) on host localhost.
    ./bin/stop-zookeeper-quorum.sh
    Stopping zookeeper daemon (pid: 7101) on host localhost.
    

拉取镜像

docker pull flink

创建文件夹,编辑yml文件

mkdir -p docker/flink
vim /docker/flink/docker-compose.yml

docker-compose.yaml文件内容

version: "2.1"
services:
  jobmanager:
    image: flink
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

在文件夹路径下执行

docker-compose up -d 

执行失败的话,先查看一下docker-compose版本、或者有没有安装

// 查看版本
docker-compose -version

访问Flink地址: localhost:8081

Search

    微信好友

    博士的沙漏

    Table of Contents