Flink编程DataSet
目前Flink在批量计算领域的应用不是特别广泛,但并不代表Flink不擅长处理批量数据,批量数据其实是流式数据的子集,可以通过一套引擎处理批量和流式数据,而Flink在未来也会重点投入更多的资源到批流融合中。
- Flink在批计算领域的应用,包括Flink提出的针对批处理计算的DataSet API的介绍与使用,
- Flink对迭代计算的支持等。
DataSet API
Flink提出DataSet API用于处理批量数据。Flink将接入数据转换成DataSet数据集,并行分布在集群的每个节点上,基于DataSet数据集完成各种转换操作(map,filter等),并通过DataSink操作将结果数据输出到外部系统中。
开发环境配置
在使用Flink DataSet API进行批量应用程序开发之前,需要在工程中引入Flink批量计算相关依赖库,可以在项目工程中的pom.xml文件中添加flink-java对应的Dependency配置,引入DataSet API所需要的依赖库,用户可以根据需要选择Java版本或者Scala版本,也可以将两个依赖库同时引入工程。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.0</version>
</dependency>
// 引入Scala版本的批量计算依赖库
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.7.0</version>
</dependency>
应用实例
DataSet API创建ExecutionEnvironment环境,然后使用ExecutionEnvironment提供的方法读取外部数据,将外部数据转换成DataSet数据集,最后在创建好的数据集上应用DataSet API提供的Transformation操作,对数据进行转换,处理成最终的结果,并对结果进行输出。
//如果使用Scala语言编写DataSet API程序,需要引入相应隐式的方法
import org.apache.flink.api.scala._
object WordCount {
def main(args: Array[String]) {
//创建ExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment
//读取数据集
val text = env.fromElements(
"Who's there?", "Hello World")
//对数据集进行转换操作,形成(Word,value)格式,并进行Group操作,统计词频
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
//输出统计出来的结果
counts.print()
}
}
DataSources数据接入
DataSet API支持从多种数据源中将批量数据集读到Flink系统中,并转换成DataSet数据集。数据接入接口共有三种类型,分别是文件系统类型、Java Collection类型,以及通用类数据源。同时在DataSet API中可以自定义实现InputFormat/RichInputFormat接口,以接入不同数据格式类型的数据源,常见的数据格式有CsvInputFormat、TextInput-Format TextInputFormat等。
文件类数据
readTextFile(path) / TextInputFormat
使用DataSet API中的readTextFile方法读取文本文件,并将文件内容转换成DataSet[String]类型数据集。
/读取本地文件
val textFiles:DataSet[String] = env.readTextFile("file:///path/textfile")
//读取HDFS文件
val hdfsFiles = env.readTextFile("hdfs://nnHost:nnPort/path/textfile")
readTextFileWithValue(path) / TextValueInputFormat
读取文本文件内容,将文件内容转换成DataSet[StringValue]类型数据集。StringValue是一种可变的String类型,通过StringValue存储文本数据可以有效降低String对象创建数量,从而降低系统性能上的开销。
//读取本地文件,指定读取字符格式类型为UTF-8
val ds = env.readTextFileWithValue("file:///path/textfile", "UTF-8")
readCsvFile(path) / CsvInputFormat
读取指定分隔符切割的CSV文件,且可以直接转换成Tuple类型、Case Class对象或者POJOs类。在方法中可以指定行切割符、列切割符、字段等信息。
val csvInput = env.readCsvFile[(String, Double)](
" hdfs://nnHost:nnPort/path/to/csvfile ",
includedFields = Array(0, 3))
readSequenceFile(Key, Value, path) / SequenceFileInputFormat
读取SequenceFileInputFormat类型的文件,在参数中指定Key Class和Value Class类型,返回结果为Tuple2[Key,Value]类型。
val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],"hdfs://nnHost:nnPort/path/to/file")
集合类数据
fromCollection(Seq)
从给定集合中创建DataSet数据集,集合类型可以是数组、List等,也可以从非空Iterable中创建,需要指定数据集的Class类型。
//从Seq中创建DataSet数据集
val dataSet: DataSet[String] = env.fromCollection(Seq("flink",
"hadoop", "spark"))
//从Iterable中创建DataSet数据集
val dataSet: DataSet[String] =
env.fromCollection(Iterable("flink","hadoop", "spark"))
fromElements(elements: _*)
从给定数据元素序列中创建DataSet数据集,且所有的数据对象类型必须一致。
val dataSet: DataSet[String] = env.fromElements("flink", "hadoop","spark")
generateSequence(from, to)
指定from到to范围区间,然后在区间内部生成数字序列数据集。
val numbers: DataSet[Long] = env.generateSequence(1, 10000000)
通用数据接口
DataSet API中提供了Inputformat通用的数据接口,以接入不同数据源和格式类型的数据。InputFormat接口主要分为两种类型:一种是基于文件类型,在DataSet API对应readFile()方法;另外一种是基于通用数据类型的接口,例如读取RDBMS或NoSQL数据库中等,在DataSet API中对应createInput()方法。
readFile(inputFormat, path) / FileInputFormat
自定义文件类型输入源,将指定格式文件读取并转成DataSet数据集。
// 通过自定义PointInFormat,读取指定格式数据
env.readFile(new PointInFormat(),"file:///path/file")
createInput(inputFormat) / InputFormat
自定义通用型数据源,将读取的数据转换为DataSet数据集。如以下实例使用Flink内置的JDBCInputFormat,创建读取mysql数据源的JDBC Input Format,完成从mysql中读取Person表,并转换成DataSet [Row]数据集。
//通过创建JDBCInputFormat读取JDBC数据源
val jdbcDataSet: DataSet[Row] =
env.createInput(
JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/test ")
.setQuery("select id, name from person")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.
STRING_TYPE_INFO))
.finish()
)
第三方文件系统
为简化用户和其他第三方文件系统之间的交互,Flink针对常见类型数据源提出通用的FileSystem抽象类,每种数据源分别继承和实现FileSystem类,将数据从各个系统中读取到Flink中。DataSet API中内置了HDFS数据源、Amazon S3,MapR file system,Alluxio等文件系统的连接器,用户可以参考官方文档说明进行使用。
DataSet转换操作
针对DataSet数据集上的转换,Flink提供了非常丰富的转换操作符,从而实现基于DataSet批量数据集的转换。转换操作实质是将DataSet转换成另外一个新的DataSet,然后将各个DataSet的转换连接成有向无环图,并基于Dag完成对批量数据的处理。
数据处理
Map
完成对数据集Map端的转换,并行将每一条数据转换成新的一条数据,数据分区不发生变化。
val dataSet: DataSet[String] = env.fromElements("flink", "hadoop", "spark")
val transformDS: DataSet[String] = dataSet.map(x => x.toUpperCase)
FlatMap
将接入的每一条数据转换成多条数据输出,包括空值。例如以下实例将文件中每一行文本切割成字符集合。
val dataSet: DataSet[String] = env.fromElements("flink,hadoop,spark")
val words = dataSet.flatMap { _.split(",") }
MapPartition
功能和Map函数相似,只是MapPartition操作是在DataSet中基于分区对数据进行处理,函数调用中会按照分区将数据通过Iteator的形式传入,并返回任意数量的结果值。
val dataSet: DataSet[String] = env.fromElements("flink", "hadoop", "spark")
dataSet.mapPartition { in => in map { (_, 1) } }
Filter
根据条件对传入数据进行过滤,当条件为True后,数据元素才会传输到下游的DataSet数据集中。
val dataSet: DataSet[Long] = env.fromElements(222,12,34,323)
val resultDs = dataSet.filter(x => x > 100)
聚合操作
Reduce
通过两两合并,将数据集中的元素合并成一个元素,可以在整个数据集上使用,也可以和Group Data Set结合使用。
val dataSet: DataSet[Long] = env.fromElements(222,12,34,323)
val result = dataSet.reduce((x, y) => x + y)
ReduceGroup
将一组元素合并成一个或者多个元素,可以在整个数据集上使用,也可以和Group Data Set结合使用。
val dataSet: DataSet[Long] = env.fromElements(222,12,34,323)
dataSet.reduceGroup { collector => collector.sum }
Aggregate
通过Aggregate Function将一组元素值合并成单个值,可以在整个DataSet数据集上使用,也可以和Group Data Set结合使用。如下代码是,在DataSet数据集中根据第一个字段求和,根据第三个字段求最小值。
val dataSet: DataSet[(Int, String, Long)] = env.fromElements((12, "Alice", 34),
(12, "Alice", 34), (12, "Alice", 34))
val result:DataSet[(Int, String, Long)]
= dataSet.aggregate(Aggregations.SUM, 0).aggregate(Aggregations.MIN, 2)
也可以使用Aggregation函数的缩写方法,sum()、min()、max()等。
val result2: DataSet[(Int, String, Long)] = dataSet.sum(0).min(2)
Distinct
求取DataSet数据集中的不同记录,去除所有重复的记录。
val dataSet: DataSet[Long] = env.fromElements(222,12,12,323)
val distinct: DataSet[Long] = dataSet.distinct
多表关联
Join
根据指定的条件关联两个数据集,然后根据选择的字段形成一个数据集。关联的key可以通过key表达式、Key-selector函数、字段位置以及Case Class字段指定。
对于两个Tuple类型的数据集可以通过字段位置进行关联,左边数据集的字段通过where方法指定,右边数据集的字段通过equalTo()方法指定。
val dataSet1: DataSet[(Int, String)] = ...
val dataSet2: DataSet[(Double, Int)] = ...
val result = dataSet1.join(dataSet2).where(0).equalTo(1)
对于Case Class类型的数据集可以直接使用字段名称作为关联Key:
val dataSet1: DataSet[Person] =
env.fromElements(Person(1,"Peter"),Person(2,"Alice"))
val dataSet2: DataSet[(Double, Int)] = env.fromElements((12.3,1),(22.3,3))
val result = dataSet1.join(dataSet2).where("id").equalTo(1)
可以在关联的过程中指定自定义Join Funciton,Funciton的入参为左边数据集中的数据元素和右边数据集的中的数据元素所组成的元祖,并返回一个经过计算处理后的数据,其中Left和right的Key相同。
val result = dataSet1.join(dataSet2).where("id").equalTo(1){
(left,right) => (left.id,left.name,right._1 + 1)
}
FlatMap与Map方法的相似,Join Function中同时提供了FlatJoin Function用来关联两个数据集,FlatJoin函数返回可以是一个或者多个元素,也可以不返回任何结果。
val result = dataSet1.join(dataSet2).where("id").equalTo(1) {
(left, right, collector: Collector[(String, Double)]) =>
collector.collect(left.name, right._1 + 1)
collector.collect("prefix_" + left.name, right._1 + 2)
}
为了能够更好地引导Flink底层去正确地处理数据集,可以在DataSet数据集关联中,通过Size Hint标记数据集的大小,Flink可以根据用户给定的线索调整计算策略,例如可以使用joinWithTiny或joinWithHuge提示第二个数据集的大小。
val dataSet1: DataSet[Person] =
env.fromElements(Person(1,"Peter"),Person(2,"Alice"))
val dataSet2: DataSet[(Double, Int)] = env.fromElements((12.3,1),(22.3,3))
//提示Flink第二个数据集是小数据集
val result = dataSet1. joinWithTiny (dataSet2).where("id").equalTo(1)
//提示Flink第二个数据集是大数据集
val result = dataSet1.joinWithHuge(dataSet2).where("id").equalTo(1)
除了能够使用joinWithTiny或joinWithHuge方法来提示关联数据集的大小之外,Flink还提供了Join算法提示,可以让Flink更加灵活且高效地执行Join操作。
//将第一个数据集广播出去,并转换成HashTable存储,该策略适用于第一个数据集非常小的情况
ds1.join(ds2,JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo(1)
//将第二个数据集广播出去,并转换成HashTable存储,该策略适用于第二个数据集非常小的情况
ds1.join(ds2,JoinHint.BROADCAST_HASH_SECOND).where("id").equalTo(1)
//和不设定Hint相同,将优化的工作交给系统处理
ds1.join(ds2,JoinHint.OPTIMIZER_CHOOSES).where("id").equalTo(1)
//将两个数据集重新分区,并将第一个数据集转换成HashTable存储,该策略适用于第一个数据集比
第二个数据集小,但两个数据集相对都比较大的情况
ds1.join(ds2,JoinHint.REPARTITION_HASH_FIRST).where("id").equalTo(1)
//将两个数据集重新分区,并将第二个数据集转换成HashTable存储,该策略适用于第二个数据集比
第一个数据集小,但两个数据集相对都比较大的情况
ds1.join(ds2,JoinHint.REPARTITION_HASH_SECOND).where("id").equalTo(1)
//将两个数据集重新分区,并将每个分区排序,该策略适用于两个数据集已经排好顺序的情况
ds1.join(ds2,JoinHint.REPARTITION_SORT_MERGE).where("id").equalTo(1)
OuterJoin
OuterJoin对两个数据集进行外关联,包含left、right、full outer join三种关联方式,分别对应DataSet API中的leftOuterJoin、rightOuterJoin以及fullOuterJoin方法。
//左外关联两个数据集,按照相同的key进行关联,如果右边数据集中没有数据则会填充空值
dataSet1.leftOuterJoin(dataSet2).where("id").equalTo(1)
//右外关联两个数据集,按照相同的key进行关联,如果左边数据集中没有数据则会填充空值
dataSet1.rightOuterJoin(dataSet2).where("id").equalTo(1)
和JoinFunciton一样,OuterJoin也可以指定用户自定义的JoinFunciton。
dataSet1.leftOuterJoin(dataSet2).where("id").equalTo(1){
(left, right) =>
if(right == null) {(left.id,1)
} else{ (left.id,right._1)
}
}
对于大数据集,Flink也在OuterJoin操作中提供相应的关联算法提示,可以针对左右数据集的分布情况选择合适的优化策略,以提升整体作业的处理效率。
//将第二个数据集广播出去,并转换成HashTable存储,该策略适用于第一个数据集非常小的情况
ds1.leftOuterJoin(ds2,JoinHint.BROADCAST_HASH_SECOND).where("id").equalTo(
1)
//将两个数据集重新分区,并将第二个数据集转换成HashTable存储,该策略适用于第一个数据集比
第二个数据集小,但两个数据集相对都比较大的情况
ds1.leftOuterJoin(ds2,JoinHint.REPARTITION_HASH_SECOND).where("id").equalTo(1)
和Join操作不同,OuterJoin的操作只能适用于部分关联算法提示。其中leftOuterJoin仅支持OPTIMIZER_CHOOSES、BROADCAST_HASH_SECOND、REPARTITION_HASH_SECOND以及REPARTITION_SORT_MERGE四种策略。rightOuterJoin仅支持OPTIMIZER_CHOOSES、BROADCAST_HASH_FIRST、REPARTITION_HASH_ FIRST以及REPARTITION_SORT_MERGE四种策略。fullOuterJoin仅支持OPTIMIZER_CHOOSES、REPARTITION_SORT_MERGE两种策略。
Cogroup
将两个数据集根据相同的Key记录组合在一起,相同Key的记录会存放在一个Group中,如果指定key仅在一个数据集中有记录,则co-group Function会将这个Group与空的Group关联。
val dataset = dataSet1.coGroup(dataSet2).where("id").equalTo(1)
Cross
将两个数据集合并成一个数据集,返回被连接的两个数据集所有数据行的笛卡儿积,返回的数据行数等于第一个数据集中符合查询条件的数据行数乘以第二个数据集中符合查询条件的数据行数。Cross操作可以通过应用Cross Funciton将关联的数据集合并成目标格式的数据集,如果不指定Cross Funciton则返回Tuple2类型的数据集。
val dataSet1: DataSet[(Int, String)] = env.fromElements((12, "flink"), (22,
"spark"))
val dataSet2: DataSet[String] = env.fromElements("flink")
//不指定Cross Funciton,返回Tuple[T,V],其中T为左边数据集数据类型,V为右边数据集
val crossDataSet: DataSet[((Int, String), String)] = dataSet1.cross(dataSet2)
集合操作
Union
合并两个DataSet数据集,两个数据集的数据元素格式必须相同,多个数据集可以连续合并。
val dataSet1: DataSet[(Long, Int)] = ...
val dataSet2: DataSet[(Long, Int)] = ...
//合并两个数据集
val unioned = dataSet1.union(dataSet2)
Rebalance
对数据集中的数据进行平均分布,使得每个分区上的数据量相同。
val dataSet: DataSet[String] = env.fromElements("flink","spark")
//将DataSet数据集进行重平衡,然后执行map操作
val result = dataSet.rebalance().map {_.toUpperCase}
Hash-Partition
根据给定的Key进行Hash分区,key相同的数据会被放入同一个分区内。
val dataSet: DataSet[(String, Int)] = ...
//根据第一个字段进行数据重分区,然后再执行MapPartition操作处理每个分区的数据
val result = dataSet.partitionByHash(0).mapPartition { ... }
Range-Partition
根据给定的Key进行Range分区,key相同的数据会被放入同一个分区内。
val dataSet: DataSet[(String, Int)] = ...
//根据第一个字段进行数据重分区,然后再执行MapPartition操作处理每个分区的数据
val result = dataSet.partitionByRange(0).mapPartition { ... }
Sort Partition
在本地对DataSet数据集中的所有分区根据指定字段进行重排序,排序方式通过Order.ASCENDING以及Order.DESCENDING关键字指定。
val dataSet: DataSet[(String, Int)] = ...
//本地对根据第二个字段对分区数据进行逆序排序,
val result = dataSet.sortPartition(1, Order. DESCENDING)
//根据第一个字段对分区进行升序排序
.sortPartition(0, Order. ASCENDING)
//然后在排序的分区上执行MapPartition转换操作
.mapPartition { ... }
排序操作
First-n
返回数据集的n条随机结果,可以应用于常规类型数据集、Grouped类型数据集以及排序数据集上。
val dataSet: DataSet[(Int, String)] = ...
// 普通数据集上返回五条记录
val result1 = dataSet.first(5)
// 聚合数据集上返回五条记录
val result2 = dataSet.groupBy(0).first(5)
// Group排序数据集上返回五条记录
val result3 = dataSet.groupBy(0).sortGroup(1, Order.ASCENDING).first(5)
Minby/Maxby
从数据集中返回指定字段或组合对应最小或最大的记录,如果选择的字段具有多个相同值,则在集合中随机选择一条记录返回。
val dataSet: DataSet[(Int, Double, String)] = ...
// 返回数据集中第一个字段和第三个字段最小的记录,并产生新的数据集
val result1: DataSet[(Int, Double, String)] = dataSet.minBy(0, 2)
// 根据第一个字段对数据集进行聚合,并返回每个Group内第二个字段最小对应的记录
val result2: DataSet[(Int, Double, String)] = dataSet.groupBy(1).minBy(1)
DataSinks数据输出
经过各种数据Transformation操作后,最终形成用户需要的结果数据集。通常情况下,用户希望将结果数据输出在外部存储介质或者传输到下游的消息中间件内,在Flink中将DataStream数据输出到外部系统的过程被定义为DataSink操作。在Flink内部定义的第三方外部系统连接器中,支持数据输出的有Apache Kafka、Apache Cassandra、Kinesis、ElasticSearch、Hadoop FileSystem、RabbitMQ、NIFI等,除了Flink内部支持的第三方数据连接器之外,其他例如Apache Bahir框架也支持了相应的数据连接器,其中包括ActiveMQ、Flume、Redis、Akka、Netty等常用第三方系统。用户使用这些第三方Connector将DataStream数据集写入到外部系统中,需要将第三方连接器的依赖库引入到工程中。
基本数据输出
基本数据输出包含了文件输出、客户端输出、Socket网络端口等,这些输出方法已经在Flink DataStream API中完成定义,使用过程不需要依赖其他第三方的库。如下代码所示,实现将DataStream数据集分别输出在本地文件系统和Socket网络端口。
val personStream = env.fromElements(("Alex", 18), ("Peter", 43))
//通过writeAsCsv方法将数据转换成CSV文件输出,并执行输出模式为OVERWRITE
personStream.writeAsCsv("file:///path/to/person.csv",WriteMode.OVERWRITE)
//通过writeAsText方法将数据直接输出到本地文件系统
personStream.writeAsText("file:///path/to/person.txt")
//通过writeToSocket方法将DataStream数据集输出到指定Socket端口
personStream.writeToSocket(outputHost, outputPort, new SimpleStringSchema())
第三方数据输出
通常情况下,基于Flink提供的基本数据输出方式并不能完全地满足现实场景的需要,用户一般都会有自己的存储系统,因此需要将Flink系统中计算完成的结果数据通过第三方连接器输出到外部系统中。Flink中提供了DataSink类操作算子来专门处理数据的输出,所有的数据输出都可以基于实现SinkFunction完成定义。例如在Flink中定义了FlinkKafkaProducer类来完成将数据输出到Kafka的操作,需要根据不同的Kafka版本需要选择不同的FlinkKafkaProducer,目前FlinkKafkaProducer类支持Kafka大于1.0.0的版本,FlinkKafkaProducer11或者010支持Kafka0.10.0.x的版本。 通过使用FlinkKafkaProducer11将DataStream中的数据写入Kafka的Topic中。
val wordStream = env.fromElements("Alex", "Peter", "Linda")
//定义FlinkKafkaProducer011 Sink算子
val kafkaProducer = new FlinkKafkaProducer011[String](
"localhost:9092", // 指定Broker List参数
"kafka-topic", // 指定目标Kafka Topic名称
new SimpleStringSchema) // 设定序列化Schema
/通过addsink添加kafkaProducer到算子拓扑中
wordStream.addSink(kafkaProducer)
在以上代码中使用FlinkKafkaProducer往Kafka中写入数据的操作相对比较基础,还可以配置一些高级选项,例如可以配置自定义properties类,将自定义的参数通过properties类传入FlinkKafkaProducer中。另外还可以自定义Partitioner将DataStream中的数据按照指定分区策略写入Kafka的分区中。也可以使用KeyedSerializationSchema对序列化Schema进行优化,从而能够实现一个Producer往多个Topic中写入数据的操作。
迭代计算
迭代计算在批量数据处理过程中的应用非常广泛,如常用的机器学习算法KMeans、逻辑回归,以及图计算等,都会用到迭代计算。DataSet API对迭代计算功能的支持相对比较完善,在性能上较其他分布式计算框架也具有非常高的优势。目前Flink中的迭代计算种类有两种模式,分别是Bulk Iteration(全量迭代计算)和Delt Iteration(增量迭代计算)。
全量迭代
在数据接入迭代算子过程中,Step Function每次都会处理全量的数据,然后计算下一次迭代的输入,最后根据触发条件输出迭代计算的结果,并将结果通过DataSet API传输到下一个算子中继续进行计算。Flink中迭代的数据和其他计算框架相比,并不是通过在迭代计算过程中不断生成新的数据集完成,而是基于同一份数据集上完成迭代计算操作,因此不需要对数据集进行大量拷贝复制操作,从而避免了数据在复制过程中所导致的性能下降问题。 针对全量迭代计算,一共分为以下几个步骤:
- 首先初始化数据,可以通过从DataSource算子中读取,也可以从其他转换Operators中接入。
- 其次定义Step Function,并在每一步迭代过程使用Step Function,结合数据集以及上一次迭代计算的Solution数据集,进行本次迭代计算。
- 每一次迭代过程中Step Function输出的结果,被称为Next Partital Solution数据集,该结果会作为下一次迭代计算的输入数据集。
- 最后一次迭代计算的结果输出,可通过DataSink输出,或接入到下一个Operators中。迭代终止的条件有两种,分别为达到最大迭代次数或者符合自定义聚合器收敛条件:
- 最大迭代次数:指定迭代的最大次数,当计算次数超过该设定值时,终止迭代。
- 自定义收敛条件:用户自定义的聚合器和收敛条件,例如将终止条件设定为当Sum统计结果小于零则终止,否则继续迭代。
val env = ExecutionEnvironment.getExecutionEnvironment // 创建初始化数据集 val initial = env.fromElements(0) //调用迭代方法,并设定迭代次数为10000次 val count = initial.iterate(10000) { iterationInput: DataSet[Int] => val result = iterationInput.map { i => val x = Math.random() val y = Math.random() i + (if (x * x + y * y < 1) 1 else 0) } result } //输出迭代结果 val result = count map { c => c / 10000.0 * 4 } result.print() env.execute("Iterative Pi Example")
增量迭代
增量迭代是通过部分计算取代全量计算,在计算过程中会将数据集分为热点数据和非热点数据集,每次迭代计算会针对热点数据展开,这种模式适合用于数据量比较大的计算场景,不需要对全部的输入数据集进行计算,所以在性能和速度上都会有很大的提升。
- Iteration Input:初始化数据,可以是DataSource生成,也可以是计算算子生成;
- Step Function:在每一步迭代过程中使用的计算方法,可以是类似于map、reduce、join等方法;
- Next Partial Solution:在每一次迭代过程中,当前的Step Function输出的结果,该结果会作为下一次迭代计算的输入;
- Iteration Result:最后一次迭代计算的输出,可以通过指定DataSink输出,或者接入下一个Operators中。
增量迭代的终止条件可以指定为:
- WorkSet为空:如果下一次迭代输入WorkSet为空,则终止迭代。
- 最大迭代次数:当计算次数超过指定迭代的最大次数,则终止迭代。
// 读取初始化数据集 val initialSolutionSet: DataSet[(Long, Double)] = ... //读取初始化WorkSet数据集 val initialWorkset: DataSet[(Long, Double)] = ... //设定迭代参数 val maxIterations = 100 val keyPosition = 0 //通过iterateDelta应用增量迭代方法 val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) { (solution, workset) => val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges()) val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent()) val nextWorkset = deltas.filter(new FilterByThreshold()) (deltas, nextWorkset) } //输出迭代计算的结果 result.writeAsCsv(outputPath) env.execute()
广播变量
广播变量是分布式计算框架中经常会用到的一种数据共享方式,目的是对小数据集采用网络传输的方式,在每个并行的计算节点的实例内存中存储一份该数据集,所在的计算节点实例均可以在本地内存中直接读取被广播的数据集,这样能够避免在数据计算过程中多次通过远程的方式从其他节点中读取小数据集,从而提升整体任务的计算性能。
在DataSet API中,广播变量通过DataSet的withBroadcastSet(DataSet, String)方法定义,其中第一个参数为所需要广播的DataSet数据集,需要保证DataSet在广播之前已经创建完毕,第二个参数为广播变量的名称,需要在当前应用中保持唯一,如以下代码将broadcastData数据集广播在data数据集所在的每个实例中。
// 创建需要广播的数据集
val broadcastData = env.fromElements(1, 2, 3)
//广播DataSet数据集,指定广播变量名称为broadcastSetName
data.map(...).withBroadcastSet(broadcastData, "broadcastSetName")
DataSet API支持在RichFunction接口中通过RuntimeContext读取到广播变量。首先在RichFunction中实现Open()方法,然后调用getRuntimeContext()方法获取应用的RuntimeContext,接着调用getBroadcastVariable()方法通过广播名称获取广播变量。同时Flink直接通过collect操作将数据集转换为本地Collection。需要注意的是,Collection对象的数据类型必须和定义的数据集的类型保持一致,否则会出现类型转换问题。
如以下代码实例所示,在dataSet2的Map转换中通过withBroadcastSet方法指定dataSet1为广播变量,然后通过实现RichMapFunction接口,在open()方法中调用RuntimeContext对象的getBroadcastVariable()方法,将dataSet1数据集获取到本地并转换成Collection。最后在map方法中访问dataSet1中的数据,完成后续的处理操作。
// 创建需要广播的数据集
val dataSet1:DataSet[Int] = ...
//创建输入数据集
val dataSet2:DataSet[String] = ...
dataSet2.map(new RichMapFunction[String, String]() {
var broadcastSet: Traversable[Int] = null
override def open(config: Configuration): Unit = {
// 获取广播变量数据集,并且转换成Collection对象
broadcastSet = getRuntimeContext().getBroadcastVariable[Int]("broadcastSet-1").asScala
}
def map(input: String): String = {
input + broadcastSet.toList //获取broadcastSet元素信息
}
//广播DataSet数据集,指定广播变量名称为broadcastSetName
}).withBroadcastSet(dataSet1, "broadcastSet-1")
分布式缓存
在批计算中,需要处理的数据集大部分来自于文件,对于某些文件尽管是放在类似于HDFS之上的分布式文件系统中,但由于Flink并不像MapReduce一样让计算随着数据所在位置上进行,因此多数情况下会出现通过网络频繁地复制文件的情况。因此对于有些高频使用的文件可以通过分布式缓存的方式,将其放置在每台计算节点实例的本地task内存中,这样就能够避免因为读取某些文件而必须通过网络远程获取文件的情况,进而提升整个任务的执行效率。
分存式缓存在ExecutionEnvironment中直接注册文件或文件夹,Flink在启动任务的过程中将会把指定的文件同步到task所在计算节点的本地文件系统中,目前支持本地文件、HDFS、S3等文件系统,另外可以通过Boolean参数来指定文件是否可执行,具体使用方式如下:
val env = ExecutionEnvironment.getExecutionEnvironment
// 通过从HDFS文件读取并转换成分布式缓存
env.registerCachedFile("hdfs:///path/file", "hdfsFile")
// 通过从本地文件中读取并注册为分布式缓存,并将可执行设定为True
env.registerCachedFile("file:///path/file", "localFile", true)
获取缓存文件的方式和广播变量相似,也是实现RichFunction接口,并通过RichFunction接口获得RuntimeContext对象,然后通过RuntimeContext提供的接口获取对应的本地缓存文件,使用方式如以下代码所示:
// 定义RichMapFunction获取分布式缓存文件
class FileMapper extends RichMapFunction[String, Int] {
var myFile: File = null
override def open(config: Configuration): Unit = {
// 通过RuntimeContext和DistributedCache获取缓存文件
myFile = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
}
override def map(value: String): Int = {
// 使用读取到的文件内容
val inputFile = new FileInputStream(myFile)
... //定义数据处理逻辑
}
}
通过RuntimeContext和DistributedCache获取缓存文件,且文件为java.io.File类型,然后将文件定义成静态对象中,就可以直接在map方法中读取文件中的内容,进行后续的算子操作,同时使用完缓存文件后Flink会自动将文件从本地文件系统中清除。
语义注解
在Flink批量数据处理过程中,往往传入函数的对象中可能含有很多字段,其中有些字段是Function计算会用到的,但有些字段在进入Funciton后并没有参与到实际计算过程中。针对这种情况,Flink提出了语义注解的功能,将这些字段在Function中通过注解的形式标记出来,区分出哪些是需要参与函数计算的字段,哪些是直接输出的字段。Flink Runtime在执行算子过程中,会对注解的字段进行判别,对于不需要函数处理的字段直接转发到Output对象中,以减少数据传输过程中所消耗的网络IO或者不必要的排序操作等,以提升整体应用的处理效率。
在DataSet API中将语义注解支持的字段分为三种类型,分别为Forwarded Fields、Non-Forward Fields以及Read Fields,下面详细介绍每种语义注解的使用方式。
Forwarded Fileds注解
转发字段(Forwarded Fileds)代表数据从Function进入后,对指定为Forwarded的Fileds不进行修改,且不参与函数的计算逻辑,而是根据设定的规则表达式,将Fields直接推送到Output对象中的相同位置或指定位置上。
转发字段的规则通过表达式进行指定,表达式中可以指定转发字段的源位置和目标位置。例如“f0->f2”,代表将Input的Tuple对象中的第一位字段转发到Output的Tuple对象中的第三位字段的位置上;单个字符“f2”,代表将Input的Tuple对象中的第三位字段转发到Output的Tuple对象中的相同位置,位置不发生变化;“f1->*”表达式代表将Input的Tuple对象中的第二位字段转发为Output整个字段,其他字段不再输出。多个表达式可以同时使用,表达式中间通过分号分隔,例如混合表达式“f1->f2;f3->f1;f0”。在使用表达式的定义字段转发规则的过程中,用户需要非常清楚哪些字段需要转发,保证所有的定义都是正确的,以免在字段转发过程中出现问题。注意,在Scala环境中一般是通过_1表示元祖中第一个参数,依次类推。
转发字段定义方式有两种,首先可以通过在函数类上添加Java注解的方式指定,其次也可以通过在Operator算子对应的Function后调用类似ForwardedFieldsFirst的方法来指定。
函数注解方式
ForwardedFields注解主要用于单输入的Function进行字段转发,例如Map、Reduce等。如下代码所示,定义实现MapFunction接口的MyMap Function Class,完成map方法的定义,最后在MyMap Class上添加ForwardedFields注解,其中f0->f2代表将输入元祖Tuple2中的第一个字段转发到输出元祖Tuple3中的第三个字段的位置上,该字段不参与函数计算,直接转发至输出对象中。
//通过函数注解方式配置转发字段,将输入数据集中的第一个字段转发到输出数据集的第二个字段中
@ForwardedFields("_1->_2")
class MyMapper extends MapFunction[(Int, Double), (Double, Int)]{
def map(t: (Int, Double)): (Double, Int) = {
//map函数中也定义为将t._1输出到output对象的t_2字段中
return (t._2 / 2, t._1)
}
对于多输入函数,如Cogroup、Join等函数,可以使用@ForwardedFieldsFirst以及@ForwardedFieldsSecond注解分别对输入的数据集进行转发配置,而且@Forwarded-FieldsFirst和@ForwardedFieldsSecond也可以在函数定义的过程中同时使用。
算子参数方式
在单输入Operator算子中,可以调用withForwardFields完成函数的转发字段的定义。例如data.map(myMapFnc).withForwardedFields(“f0->f2”),实现数据集data在myMapFnc函数调用中,Input Tuple中的第一个字段转发为Output Tuple中的第三个字段转发定义。针对多输入算子的转发字段定义,例如CoGroup、Join等算子,可以通过withForwardedFieldsSecond方法或withForwardedFieldsSecond方法分别对第一个和第二个输入数据集中的字段进行转发,两个方法也可以同时使用。以下实例就实现了对Join函数中第二个Input对象中第二个字段转发到输出对象中第三个字段逻辑的定义。
//创建数据集
val dataSet1: DataSet[Person] = ...
val dataSet2: DataSet[(Double, Int)] = ...
//指定Join函数,并且在算子尾部通过withForwardedFieldsSecond方法指定字段转发逻辑
val result = dataSet1.join(dataSet2).where("id").equalTo(1) {
(left, right, collector: Collector[(String, Double, Int)]) =>
collector.collect(left.name, right._1 + 1, right._2)
collector.collect("prefix_" + left.name, right._1 + 2, right._2)
}.withForwardedFieldsSecond("_2->_3")//定义转发逻辑
Non-Forwarded Fileds注解
和前面提到的Forwarded Fileds相反,Non-Forwarded Fileds用于指定不转发的字段,也就是说除了某些字段不转发在输出Tuple相应的位置上,其余字段全部放置在输出Tuple中相同的字段位置上,对于被Non-Forwarded Fileds指定的字段将必须参与到函数计算过程中,并产生新的结果进行输出。在使用Non-Forwarded Fields注解时,需要对应的Function具有相同类型的Input和Output对象。例如,表达式“f1;f3”代表输入函数的Input对象中,第二个和第四个字段不需要保留在Output对象中,其余字段全部按照原来位置进行输出,其中第二个和第四个字段需要在函数计算过程中产生,然后在输出结果中完成整个Output Tuple对象的整合。
和Forwarded Fileds一样,非转发字段的定义可以通过函数类注解的方式实现,对于不同的函数输入分别有不同的注解方式可以使用:对于单输入的算子,例如Map、Reduce等,可以通过NonForwardedFields注解进行的定义。对于多输入的算子,例如CoGroup、Join等,可以通过NonForwardedFieldsFirst和NonForwardedFieldsSecond分别对第一个对象和第二个对象中的字段进行转发逻辑定义。
// 不转发第二个,其余字段转发到输出对象相同位置上
@NonForwardedFields("_2")
class MyMapper extends MapFunction[(String, Long, Int), (String, Long, Int)] {
def map(input: (String, Long, Int)): (String, Long, Int) = {
//第一个和第三个字段不参与函数计算,第二个字段参与到函数计算过程中,并产生新的结果
return (input._1, input._2 / 2, input._3)
}
}
Read Fields注解
读取字段(Read Fields)注解用来指定Function中需要读取以及参与函数计算的字段,在注解中被指定的字段将全部参与当前函数结果的运算过程,如条件判断、数值计算等。和前面的字段注解类似,Flink针对读取字段也提供了相应的注解类定义,可以创建Function的Class上部通过使用注解定义转发规则。
对于单输入类型函数,使用@ReadFields完成注解定义,表达式可以是“f0;f2”,表示Input中Tuple的第一个字段和第三个字段参与函数的运算过程,其他字段因为不参与计算不需要读取至函数中,进而减少函数执行过程中数据传输的大小。 如下代码实例所示,其中f0和f3参与了函数计算过程,f0参与了条件判断,f3字段参与了数值运算,指定在@ReadFields(“_1; _2”)函数注解指明,f1虽然在函数中引用过,但其并没有涉及运算,因此无须在注解中指明。
@ReadFields("_1; _2")
class MyMapper extends MapFunction[(Int, Int, Double, Int), (Int, Long)]{
def map(value: (Int, Int, Double, Int)): (Int, Double) = {
if (value._1 == 42) {
return (value._1, value._3)
} else {
return (value._2 + 10, value._3)
}
}}
针对多输入的函数,例如Join、CoGroup等函数,可以使用ReadFieldsFirst和ReadFieldsSecond注解来完成对第一个和第二个输入对象读取字段的定义,具体的定义方式和单数入函数定义类似,并且可以同时使用两个注解。
一旦使用了Read Fields注解,函数中所有参与计算的字段均必须在注解中指明,否则会导致计算函数执行失败等情况,另外如果字段并未参与函数的计算过程,也可以在注解中指定,这种方式不会对程序有太大影响,用户应该尽可能清楚函数中哪些字段参与了计算,哪些字段未参与函数计算过程。