Flink环境准备
本章主要介绍Flink在使用前的环境安装准备,包括必须依赖的环境以及相应的参数,首先从不同运行环境进行介绍,包括本地调试环境、Standalone集群环境,以及在On Yarn环境上。另外介绍Flink自带的Template模板,如何通过该项目模板本地运行代码环境的直接生成,而不需要用户进行配置进行大量的开发环境配置,节省了开发的时间成本。最后介绍Flink源码编译相关的事项,通过对源码进行编译,从而对整个Flink计算引擎有更深入的理解。
运行环境介绍
Flink执行环境主要分为本地环境和集群环境,本地环境主要为了方便用户编写和调试代码使用,而集群环境则被用于正式环境中,可以借助Hadoop Yarn或Mesos等不同的资源管理器部署自己的应用。
JDK环境
Flink核心模块均使用Java开发,所以运行环境需要依赖JDK,本书暂不详细介绍JDK安装过程,用户可以根据官方教程自行安装,其中包括Windows和Linux环境安装,需要注意的是JDK版本需要保证在1.8以上。
Scala环境
如果用户选择使用Scala作为Flink应用开发语言,则需要安装Scala执行环境,Scala环境可以通过本地安装Scala执行环境,也可以通过Maven依赖Scala-lib来引入。
Maven编译环境
Flink的源代码目前仅支持通过Maven进行编译,所以如果需要对源代码进行编译,或通过IDE开发Flink Application,则建议使用Maven作为项目工程编译方式。Maven的具体安装方法这里不再赘述。
需要注意的是,Flink程序需要Maven的版本在3.0.4及以上,否则项目编译可能会出问题,建议用户根据要求进行环境的搭建。
Hadoop环境
对于执行在Hadoop Yarn资源管理器的Flink应用,则需要配置对应的Hadoop环境参数。目前Flink官方提供的版本支持hadoop2.4、2.6、2.7、2.8等主要版本,所以用户可以在这些版本的Hadoop Yarn中直接运行自己的Flink应用,而不需要考虑兼容性的问题。
Flink项目模板
Flink为了对用户使用Flink进行应用开发进行简化,提供了相应的项目模板来创建开发项目,用户不需要自己引入相应的依赖库,就能够轻松搭建开发环境,前提是在JDK(1.8及以上)和Maven(3.0.4及以上)的环境已经安装好且能正常执行。在Flink项目模板中,Flink提供了分别基于Java和Scala实现的模板,下面就两套项目模板分别进行介绍和应用。
基于Java实现的项目模板
创建项目
创建模板项目的方式有两种,一种方式是通过Maven archetype命令进行创建,另一种方式是通过Flink提供的Quickstart Shell脚本进行创建,具体实例说明如下。
- 通过Maven Archetype进行创建:
```
$ mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeCatalog=https://repository.apache.org/
content/repositories/snapshots/
-DarchetypeVersion=1.7.0
通过以上Maven命令进行项目创建的过程中,命令会交互式地提示用户对项目的groupId、artifactId、version、package等信息进行定义,且部分选项具有默认值。我们创建了实例项目成功之后,客户端会提示用户项目创建成功,且在当前路径中具有相应创建的Maven项目。
- 通过quickstart脚本创建:
$ curl https://flink.apache.org/q/quickstart-SNAPSHOT.sh | bash -s 1.6.0
通过以上脚本可以比较简单地创建项目,执行后项目会自动生成,但是项目的名称和一些GAV信息都是自动生成的,用户不能进行交互式重新定义,其中的项目名称为quickstart,gourpid为org.myorg.quickstart,version为0.1。这种方式对于Flink入门相对比较适合,其他有一定基础的情况下,则不建议使用这种方式进行项目创建。
在Maven 3.0以上的版本中,DarchetypeCatalog配置已经从命令行中移除,需要用户在Maven Settings中进行配置,或者直接将该选项移除,否则可能造成不能生成Project的错误。
### 检查项目
对于使用quickstart curl命令创建的项目,我们可以看到的项目结构如下所示,如果用户使用Maven Archetype,则可以自己定义对应的artifactId等信息。
quickstart/ ├── pom.xml └── src └── main ├── java │ └── org │ └── myorg │ └── quickstart │ ├── BatchJob.java │ └── StreamingJob.java └── resources └── log4j.properties
从上述项目结构可以看出,该项目已经是一个相对比较完善的Maven项目,其中创建出来对应的Java实例代码,分别是BatchJob.java和Streaming.java两个文件,分别对应Flink批量接口DataSet的实例代码和流式接口DataStream的实例代码。
通过Maven创建Java应用,用户可以在Pom中指定Main Class,这样提交执行过程中就具有默认的入口Main Class,否则需要用户在执行的Flink App的Jar应用中指定Main Class。
### 开发应用
在项目创建和检测完成后,用户可以选择在模板项目中的代码上编写应用,也可以定义Class调用DataSet API或DataStream API进行Flink应用的开发,然后通过编译打包,上传并提交到集群上运行。具体应用的开发读者可以参考后续章节。
## 基于Scala实现的项目模板
Flink在开发接口中同样提供了Scala的接口,用户可以借助Scala高效简洁的特性进行Flink App的开发。在创建项目的过程中,也可以像上述Java一样创建Scala模板项目,而在Scala项目中唯一的区别就是可以支持使用SBT进行项目的创建和编译,以下实例,将从SBT和Maven两种方式进行介绍。
### 创建项目
通过Maven archetype命令创建Flink Scala版本的模板项目,其中项目相关的参数同创建Java项目一样,需要通过交互式的方式进行输入,用户可以指定对应的项目名称、groupid、artifactid以及version等信息。
mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-scala
-DarchetypeCatalog=https://repository.apache.org/
content/repositories/snapshots/
-DarchetypeVersion=1.7.0
执行完上述命令之后,表示项目创建成功,可以进行后续操作。同时可以在同级目录中看到已经创建好的Scala项目模板,其中包括了两个Scala后缀的文件。
### 使用quickstart curl脚本创建
在创建Scala项目模板的过程中,也可以通过quickstart curl脚本进行创建,这种方式相对比较简单,只要执行以下命令即可:
curl https://flink.apache.org/q/quickstart-scala-SNAPSHOT.sh | bash -s 1.7.0
## 创建SBT项目
在使用Scala接口开发Flink应用中,不仅可以使用Maven进行项目的编译,也可以使用SBT(Simple Build Tools)进行项目的编译和管理,其项目结构和Maven创建的项目结构有一定的区别。可以通过SBT命令或者quickstart脚本进行创建SBT项目,具体实现方式如下:
### 使用SBT命令创建项目
sbt new path/flink-project.g8
执行上述命令后,会在客户端输出创建成功的信息,表示项目创建成功,同时在同级目录中生成创建的项目,其中包含两个Scala的实例代码供用户参考。
### 使用quickstart curl脚本创建项目
可以通过使用以下指令进行项目创建Scala项目:
bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
如果项目编译方式选择SBT,则需要在环境中提前安装SBT编译器,同时版本需要在0.13.13以上,否则无法通过上述方式进行模板项目的创建,具体的安装教程可以参考SBT官方网站https://www.scala-sbt.org/download.html进行下载和安装。
### 检查项目
对于使用Maven archetype创建的Scala项目模板,其结构和Java类似,在项目中增加了Scala的文件夹,且包含两个Scala实例代码,其中一个是实现DataSet接口的批量应用实例BatchJob,另外一个是实现DataStream接口的流式应用实例StreamingJob
quickstart/ ├── pom.xml └── src └── main ├── resources │ └── log4j.properties └── scala └── org └── myorg └── quickstart ├── BatchJob.scala └── StreamingJob.scala
### 开发应用
在项目创建和检测完成后,用户可以选择在Scala项目模板的代码上编写应用,也可以定义Class调用DataSet API或DataStream API进行Flink应用的开发,然后通过编译打包,上传并提交到集群上运行。
## Flink开发环境配置
对于通过项目模板生成的项目,项目中的主要参数配置已被初始化,所以无须额外进行配置,如果用户通过手工进行项目的创建,则需要创建Flink项目并进行相应的基础配置,包括Maven Dependences、Scala的Version等配置信息。
### Flink基础依赖库
对于Java版本,需要在项目的pom.xml文件中配置的依赖库,其中flink-java和flink-streaming-java分别是批量计算DataSet API和流式计算DataStream API的依赖库,{flink.version}是官方的发布的版本号,用户可根据自身需要进行选择。
```xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>{flink.version}</version> <!—指定flink版本-->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>{flink.version}</version> <!—指定flink版本-->
<scope>provided</scope>
</dependency>
创建Scala版本Flink项目依赖库配置如下,和Java相比需要指定scala的版本信息,目前官方建议的是使用Scala 2.11,如果需要使用特定版本的Scala,则要将源码下载进行指定Scala版本编译,否则Scala各大版本之间兼容性较弱会导致应用程序在实际环境中无法运行的问题。Flink基于Scala语言项目依赖配置库
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>{flink.version} </version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>{flink.version} </version>
<scope>provided</scope>
</dependency>
另外在上述Maven Dependences配置中,核心的依赖库配置的Scope为provided,主要目的是在编译阶段能够将依赖的Flink基础库排除在项目之外,当用户提交应用到Flink集群的时候,就避免因为引入Flink基础库而导致Jar包太大或类冲突等问题。而对于Scope配置成provided的项目可能出现本地IDE中无法运行的问题,可以在Maven中通过配置Profile的方式,动态指定编译部署包的scope为provided,本地运行过程中的scope为compile,从而解决本地和集群环境编译部署的问题。 由于Flink在最新版本中已经不再支持scala 2.10的版本,建议读者使用scala 2.11,同时Flink将在未来的新版本中逐渐支持Scala 2.12。
Flink Connector和Lib依赖库
除了上述Flink项目中应用开发必须依赖的基础库之外,如果用户需要添加其他依赖,例如Flink中內建的Connector,或者其他第三方依赖库,需要在项目中添加相应的Maven Dependences,并将这些Dependence的Scope需要配置成compile。
如果项目中需要引入Hadoop相关依赖包,和基础库一样,在打包编译的时候将Scope注明为provided,因为Flink集群中已经将Hadoop依赖包添加在集群的环境中,用户不需要再将相应的Jar包打入应用中,否则容易造成Jar包冲突。
对于有些常用的依赖库,为了不必每次都要上传依赖包到集群上,用户可以将依赖的包可以直接上传到Flink安装部署路径中的lib目录中,这样在集群启动的时候就能够将依赖库加载到集群的ClassPath中,无须每次在提交任务的时候上传依赖的Jar包。
Flink源码编译
对于想深入了解Flink源码结构和实现原理的读者,可以按照本节的内容进行Flink源码编译环境的搭建,完成Flink源码的编译。 Flink源码可以从官方 Git Repository上通过git clone命令下载:
git clone https://github.com/apache/flink
也可以通过官方镜像库手动下载,下载地址为https://archive.apache.org/dist/flink/。用户根据需要选择需要编译的版本号,下载代码放置在本地路径中,然后通过如下Maven命令进行编译,需要注意的是,Flink源码编译依赖于JDK和Maven的环境,且JDK必须在1.8版本以上,Maven必须在3.0版本以上,否则会导致编译出错。
mvn clean install -DskipTests
Flink的Maven依赖
<properties>
<hadoop.hdfs.version>2.10.1</hadoop.hdfs.version>
<!-- <hadoop.hdfs.version>2.7.1</hadoop.hdfs.version>-->
<flink.version>1.11.2</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
<!-- 需要去掉其中的低版本依赖,部分api的使用方法返回值不一致,需要版本一致-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink相关包-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>6.0.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.6.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.74</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<!--指定打的jar包使用的jdk版本-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>