Flink编程模型

2021/04/03 Flink

Flink编程模型

本章将重点介绍Flink编程模型中的基本概念和编写Flink应用程序所遵循的基本模式。其中,包括Flink支持的数据集类型,有界数据集和无界数据集的区别,以及有界数据集和无界数据集之间的转换。同时针对无界和有界数据集的处理,将介绍Flink分别提供对应的开发接口DataStream API和DataSet API的使用。然后介绍Flink程序结构,包括基本的Flink应用所包含的组成模块等。最后介绍Flink所支持的数据类型,包括常用的POJOs、Tuples等数据类型。

数据集类型

现实世界中,所有的数据都是以流式的形态产生的,不管是哪里产生的数据,在产生的过程中都是一条条地生成,最后经过了存储和转换处理,形成了各种类型的数据集。根据现实的数据产生方式和数据产生是否含有边界(具有起始点和终止点)角度,将数据分为两种类型的数据集,一种是有界数据集,另外一种是无界数据集。

有界数据集

有界数据集具有时间边界,在处理过程中数据一定会在某个时间范围内起始和结束,有可能是一分钟,也有可能是一天内的交易数据。对有界数据集的数据处理方式被称为批计算(Batch Processing),例如将数据从RDBMS或文件系统等系统中读取出来,然后在分布式系统内处理,最后再将处理结果写入存储介质中,整个过程就被称为批处理过程。而针对批数据处理,目前业界比较流行的分布式批处理框架有Apache Hadoop和Apache Spark等。

无界数据集

对于无界数据集,数据从开始生成就一直持续不断地产生新的数据,因此数据是没有边界的,例如服务器的日志、传感器信号数据等。和批量数据处理方式对应,对无界数据集的数据处理方式被称为流式数据处理,简称为流处理(Streaming Process)。可以看出,流式数据处理过程实现复杂度会更高,因为需要考虑处理过程中数据的顺序错乱,以及系统容错等方面的问题,因此流处理需要借助专门的流数据处理技术。目前业界的Apache Storm、Spark Streaming、Apache Flink等分布式计算引擎都能不同程度地支持处理流式数据。

统一数据处理

有界数据集和无界数据集只是一个相对的概念,主要根据时间的范围而定,可以认为一段时间内的无界数据集其实就是有界数据集,同时有界数据也可以通过一些方法转换为无界数据。例如系统一年的订单交易数据,其本质上应该是有界的数据集,可是当我们把它一条一条按照产生的顺序发送到流式系统,通过流式系统对数据进行处理,在这种情况下可以认为数据是相对无界的。对于无界数据也可以拆分成有界数据进行处理,例如将系统产生的数据接入到存储系统,按照年或月进行切割,切分成不同时间长度的有界数据集,然后就可以通过批处理方式对数据进行处理。从以上分析我们可以得出结论:有界数据和无界数据其实是可以相互转换的。有了这样的理论基础,对于不同的数据类型,业界也提出了不同的能够统一数据处理的计算框架。

目前在业界比较熟知的开源大数据处理框架中,能够同时支持流式计算和批量计算,比较典型的代表分别为Apache Spark和Apache Flink两套框架。其中Spark通过批处理模式来统一处理不同类型的数据集,对于流数据是将数据按照批次切分成微批(有界数据集)来进行处理。Flink则从另外一个角度出发,通过流处理模式来统一处理不同类型的数据集。Flink用比较符合数据产生的规律方式处理流式数据,对于有界数据可以转换成无界数据统一进行流式,最终将批处理和流处理统一在一套流式引擎中,这样用户就可以使用一套引擎进行批计算和流计算的任务。

前面已经提到用户可能需要通过将多种计算框架并行使用来解决不同类型的数据处理,例如用户可能使用Flink作为流计算的引擎,使用Spark或者MapReduce作为批计算的引擎,这样不仅增加了系统的复杂度,也增加了用户学习和运维的成本。而Flink作为一套新兴的分布式计算引擎,能够在统一平台中很好地处理流式任务和批量任务,同时使用流计算模式更符合数据产生的规律,相信Flink会在未来成为众多大数据处理引擎的一颗明星。

Flink编程接口

Flink根据数据集类型的不同将核心数据处理接口分为两大类,一类是支持批计算的接口DataSet API,另外一类是支持流计算的接口DataStream API。同时Flink将数据处理接口抽象成四层,由上向下分别为SQL API、Table API、DataStream /DataSet API以及Stateful Stream Processing API,用户可以根据需要选择任意一层抽象接口来开发Flink应用。

  • Flink SQL Flink提供了统一的SQL API完成对批计算和流计算的处理,目前SQL API也是社区重点发展的接口层,对SQL API也正在逐步完善中,其主要因为SQL语言具有比较低的学习成本,能够让数据分析人员和开发人员更快速地上手,帮助其更加专注于业务本身而不是受限于复杂的编程接口。
  • Table API Table API将内存中的DataStream和DataSet数据集在原有的基础之上增加Schema信息,将数据类型统一抽象成表结构,然后通过Table API提供的接口处理对应的数据集。SQL API则可以直接查询Table API中注册表中的数据表。Table API构建在DataStream和DataSet之上的同时,提供了大量面向领域语言的编程接口,例如GroupByKey、Join等操作符,提供给用户一种更加友好的处理数据集的方式。除此之外,Table API在转换为DataStream和DataSet的数据处理过程中,也应用了大量的优化规则对处理逻辑进行了优化。同时Table API中的Table可以和DataStream及DataSet之间进行相互转换。
  • DataStream API和DataSet API DataStream API和DataSet API主要面向具有开发经验的用户,用户可以使用DataStream API处理无界流数据,使用DataSet API处理批量数据。DataStream API和DataSet API接口同时提供了各种数据处理接口,例如map,filter、oins、aggregations、window等方法,同时每种接口都支持了Java、Scala及Python等多种开发语言的SDK。
  • Stateful Stream Process API Stateful Stream Process API是Flink中处理Stateful Stream最底层的接口,用户可以使用Stateful Stream Process接口操作状态、时间等底层数据。使用Stream Process API接口开发应用的灵活性非常强,可以实现非常复杂的流式计算逻辑,但是相对用户使用成本也比较高,一般企业在使用Flink进行二次开发或深度封装的时候会用到这层接口。

Flink程序结构

和其他分布式处理引擎一样,Flink应用程序也遵循着一定的编程模式。不管是使用DataStream API还是DataSet API基本具有相同的程序结构。 通过流式计算的方式实现对文本文件中的单词数量进行统计,然后将结果输出在给定路径中。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SteamWordCount {

  public static void main(String[] args) throws Exception {
    // 创建流处理执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
    // 指定数据源地址,读取输入数据
    DataStream<String> inputDataStream = env.readTextFile("D:\\flink\\helloworld.txt");

    // 对数据集指定转换操作逻辑
    DataStream<Tuple2<String, Integer>> resultStream =
            inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                      @Override
                      public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] words = s.split(" ");
                        for (String word : words) {
                          collector.collect(new Tuple2<>(word, 1));
                        }
                      }
                    })
                    .keyBy(tuple -> tuple.f0)
                    .sum(1);
    // 指定计算结果输出位置
    resultStream.print();
    // 指定名称并触发流式任务
    env.execute("Stream WordCount");
  }

}

整个Flink程序一共分为设定Flink执行环境、创建和加载数据集、对数据集指定转换操作逻辑、指定计算结果输出位置、调用execute方法触发程序执行。下面将详细介绍每个步骤。

设定Flink执行环境

运行Flink程序的第一步就是获取相应的执行环境,执行环境决定了程序执行在什么环境(例如本地运行环境或者集群运行环境)中。同时不同的运行环境决定了应用的类型,批量处理作业和流式处理作业分别使用的是不同的Execution Environment。例如StreamExecutionEnvironment是用来做流式数据处理环境,ExecutionEnvironment是批量数据处理环境。 执行环境是整个flink程序执行的上下文,记录了相关配置(如并行度等),并提供了一系列方法,如读取输入流的方法,以及真正开始运行整个代码的execute方法等。

   public void createStreamExecutionEnvironment() {
        //设定Flink运行环境,如果在本地启动则创建本地环境,如果是在集群上启动,则创建集群环境
        StreamExecutionEnvironment.getExecutionEnvironment();
//指定并行度创建本地执行环境
        StreamExecutionEnvironment.createLocalEnvironment(5);
//指定远程JobManagerIP和RPC端口以及运行程序所在jar包及其依赖包
        StreamExecutionEnvironment.createRemoteEnvironment("JobManagerHost", 6021, 5, "/user/application.jar");
        }

其中第三种方式可以直接从本地代码中创建与远程集群的Flink JobManager的RPC连接,通过指定应用程序所在的Jar包,将运行程序远程拷贝到JobManager节点上,然后将Flink应用程序运行在远程的环境中,本地程序相当于一个客户端。

开发批量应用需要获取Execution-Environment来构建批量应用开发环境,如以下代码实例通过调用ExecutionEnvironment的静态方法来获取批计算环境。

   public void createExecutionEnvironment() {
        //设定Flink运行环境,如果在本地启动则创建本地环境,如果是在集群上启动,则创建集群环境
        ExecutionEnvironment.getExecutionEnvironment();
//指定并行度创建本地执行环境
        ExecutionEnvironment.createLocalEnvironment(5);
//指定远程JobManagerIP和RPC端口以及运行程序所在jar包及其依赖包
        ExecutionEnvironment.createRemoteEnvironment("JobManagerHost",6021,5,"/user/application.jar");
    }

针对Scala和Java不同的编程语言环境,Flink分别制定了不同的语言同时分别定义了不同的Execution Environment接口。StreamExecutionEnvironment Scala开发接口在org.apache.flink.streaming.api.scala包中,Java开发接口在org.apache.flink.streaming.api.java包中;ExecutionEnvironment Scala接口在org.apache.flink.api.scala包中,Java开发接口则在org.apache.flink.api.java包中。用户使用不同语言开发Flink应用时需要引入不同环境对应的执行环境。

初始化数据

创建完成ExecutionEnvironment后,需要将数据引入到Flink系统中。ExecutionEnvironment提供不同的数据接入接口完成数据的初始化,将外部数据转换成DataStream或DataSet数据集。 如以下代码所示,通过调用readTextFile()方法读取file:///pathfile路径中的数据并转换成DataStream数据集。

  DataStream<String> inputDataStream = env.readTextFile("D:\\flink\\helloworld.txt");

通过读取文件并转换为DataStream[String]数据集,这样就完成了从本地文件到分布式数据集的转换,同时在Flink中提供了多种从外部读取数据的连接器,包括批量和实时的数据连接器,能够将Flink系统和其他第三方系统连接,直接获取外部数据。

执行转换操作

数据从外部系统读取并转换成DataStream或者DataSet数据集后,下一步就将对数据集进行各种转换操作。Flink中的Transformation操作都是通过不同的Operator来实现,每个Operator内部通过实现Function接口完成数据处理逻辑的定义。在DataStream API和DataSet API提供了大量的转换算子,例如map、flatMap、filter、keyBy等,用户只需要定义每种算子执行的函数逻辑,然后应用在数据转换操作Dperator接口中即可。 如下代码实现了对输入的文本数据集通过FlatMap算子转换成数组,然后过滤非空字段,将每个单词进行统计,得到最后的词频统计结果。

        // 对数据集指定转换操作逻辑
        DataStream<Tuple2<String, Integer>> resultStream =
        inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
        String[] words = s.split(" ");
        for (String word : words) {
        collector.collect(new Tuple2<>(word, 1));
        }
        }
        })
        .keyBy(tuple -> tuple.f0)
        .sum(1);

通过接口处理数据,极大地简化数据处理逻辑的定义,只需要通过传入相应Lambada计算表达式,就能完成Function定义。特殊情况下用户也可以通过实现Function接口来完成定义数据处理逻辑。然后将定义好的Function应用在对应的算子中即可。

Flink中定义Funciton的计算逻辑可以通过如下几种方式完成定义。

通过创建Class实现Funciton接口

Flink中提供了大量的函数供用户使用,例如以下代码通过定义MyMapFunction Class实现MapFunction接口,然后调用DataStream的map()方法将MyMapFunction实现类传入,完成对实现将数据集中字符串记录转换成大写的数据处理。

  public  static  class myFlatMap implements FlatMapFunction<String, Tuple2<String,Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String [] words=  s.split(" ");
            for ( String word: words) {
                collector.collect(new Tuple2<>(word,1));
            }
        }
    }

通过创建匿名类实现Funciton接口

除了以上单独定义Class来实现Function接口之处,也可以直接在map()方法中创建匿名实现类的方式定义函数计算逻辑。

inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                            @Override
                            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                                String[] words = s.split(" ");
                                for (String word : words) {
                                    collector.collect(new Tuple2<>(word, 1));
                                }
                            }
                        })

通过实现RichFunciton接口

前面提到的转换操作都实现了Function接口,例如MapFunction和FlatMap-Function接口,在Flink中同时提供了RichFunction接口,主要用于比较高级的数据处理场景,RichFunction接口中有open、close、getRuntimeContext和setRuntimeContext等方法来获取状态,缓存等系统内部数据。和MapFunction相似,RichFunction子类中也有RichMap-Function, 如下代码通过实现RichMapFunction定义数据处理逻辑。

//定义匿名类实现RichMapFunction接口,完成对字符串到整形数字的转换
data.map (new RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
})

分区Key指定

在DataStream数据经过不同的算子转换过程中,某些算子需要根据指定的key进行转换,常见的有join、coGroup、groupBy类算子,需要先将DataStream或DataSet数据集转换成对应的KeyedStream和GroupedDataSet,主要目的是将相同key值的数据路由到相同的Pipeline中,然后进行下一步的计算操作。需要注意的是,在Flink中这种操作并不是真正意义上将数据集转换成Key-Value结构,而是一种虚拟的key,目的仅仅是帮助后面的基于Key的算子使用,分区Key可以通过两种方式指定:

根据字段位置指定

在DataStream API中通过keyBy()方法将DataStream数据集根据指定的key转换成重新分区的KeyedStream,如以下代码所示,对数据集按照相同key进行sum()聚合操作。

val dataStream: DataStream[(String, Int)] = env.fromElements(("a", 1), ("c",
  2))
//根据第一个字段重新分区,然后对第二个字段进行求和运算
Val result = dataStream.keyBy(0).sum(1)

在DataSet API中,如果对数据根据某一条件聚合数据,对数据进行聚合时候,也需要对数据进行重新分区。 如以下代码所示,使用DataSet API对数据集根据第一个字段作为GroupBy的key,然后对第二个字段进行求和运算。

val dataSet = env.fromElements(("hello", 1), ("flink", 3))
  //根据第一个字段进行数据重分区
  val groupedDataSet:GroupedDataSet[(String,Int)] = dataSet.groupBy(0)
  //求取相同key值下第二个字段的最大值
  groupedDataSet.max(1)

根据字段名称指定

KeyBy和GroupBy的Key除了能够通过字段位置来指定之外,也可以根据字段的名称来指定。使用字段名称需要DataStream中的数据结构类型必须是Tuple类或者POJOs类的。 如以下代码所示,通过指定name字段名称来确定groupby的key字段。

val personDataSet = env.fromElements(new Persion("Alex", 18),new 
Persion("Peter", 43))
//指定name字段名称来确定groupby字段
personDataSet.groupBy("name").max(1)

如果程序中使用Tuple数据类型,通常情况下字段名称从1开始计算,字段位置索引从0开始计算,以下代码中两种方式是等价的。

val personDataStream = env.fromElements(("Alex", 18),("Peter", 43))
//通过名称指定第一个字段名称
personDataStream.keyBy("_1")
//通过位置指定第一个字段
personDataStream.keyBy(0)

如果在Flink中使用嵌套的复杂数据结构,可以通过字段名称指定Key,例如:

class CompelexClass(var nested: NestedClass, var tag: String) {
  def this() { this(null, "") }
}
class NestedClass (
    var id: Int,
    tuple: (Long, Long, String)){
  def this() { this(0, (0, 0, "")) }
}

通过调用“nested”获取整个NestedClass对象里所有的字段,调用“tag”获取CompelexClass中tag字段,调用“nested.id”获取NestedClass中的id字段,调用“nested.tuple._1”获取NestedClass中tuple元祖的第一个字段。由此可以看出,Flink能够支持在复杂数据结构中灵活地获取字段信息,这也是非 Key-Value的数据结构所具有的优势。

通过Key选择器指定

另外一种方式是通过定义Key Selector来选择数据集中的Key,如下代码所示,定义KeySelector,然后复写getKey方法,从Person对象中获取name为指定的Key。

case class Person(name: String, age: Int)
val person= env.fromElements(Person("hello",1), Person("flink",4))
//定义KeySelector,实现getKey方法从case class中获取Key
val keyed: KeyedStream[WC]= person.keyBy(new KeySelector[Person, String]() {
  override def getKey(person: Person): String = person.word
})

输出结果

数据集经过转换操作之后,形成最终的结果数据集,一般需要将数据集输出在外部系统中或者输出在控制台之上。在Flink DataStream和DataSet接口中定义了基本的数据输出方法,例如基于文件输出writeAsText(),基于控制台输出print()等。同时Flink在系统中定义了大量的Connector,方便用户和外部系统交互,用户可以直接通过调用addSink()添加输出系统定义的DataSink类算子,这样就能将数据输出到外部系统。 以下实例调用DataStream API中的writeAsText()和print()方法将数据集输出在文件和客户端中。

//将数据输出到文件中
counts.writeAsText("file://path/to/savefile")
//将数据输出控制台
counts.print()

程序触发

所有的计算逻辑全部操作定义好之后,需要调用ExecutionEnvironment的execute()方法来触发应用程序的执行,其中execute()方法返回的结果类型为JobExecutionResult,里面包含了程序执行的时间和累加器等指标。需要注意的是,execute方法调用会因为应用的类型有所不同,DataStream流式应用需要显性地指定execute()方法运行程序,如果不调用则Flink流式程序不会执行,但对于DataSet API输出算子中已经包含对execute()方法的调用,则不需要显性调用execute()方法,否则会出现程序异常。

//调用StreamExecutionEnvironment的execute方法执行流式应用程序
env.execute("App Name");

使用DataSet批处理数据

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class DataSetWordCount {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        String path = "D:\\flink\\helloworld.txt";
        DataSet<String> stringDataSource = executionEnvironment
                .readTextFile(path);
        DataSet<Tuple2<String, Integer>> dataSet =
                stringDataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                            @Override
                            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                                String[] words = s.split(" ");
                                for (String word : words) {
                                    collector.collect(new Tuple2<>(word, 1));
                                }
                            }
                        })
                        .groupBy(0)
                        .sum(1);
        dataSet.print();
    }
}

Flink数据类型

Flink支持非常完善的数据类型,数据类型的描述信息都是由TypeInformation定义,比较常用的TypeInformation有BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo类等。TypeInformation主要作用是为了在Flink系统内有效地对数据结构类型进行管理,能够在分布式计算过程中对数据的类型进行管理和推断。同时基于对数据的类型信息管理,Flink内部对数据存储也进行了相应的性能优化。Flink能够支持任意的Java或Scala的数据类型,不用像Hadoop中的org.apache.hadoop.io.Writable而实现特定的序列化和反序列化接口,从而让用户能够更加容易使用已有的数据结构类型。另外使用TypeInformation管理数据类型信息,能够在数据处理之前将数据类型推断出来,而不是真正在触发计算后才识别出,这样能够及时有效地避免用户在使用Flink编写应用的过程中的数据类型问题。

原生数据类型

Flink通过实现BasicTypeInfo数据类型,能够支持任意Java原生基本类型(装箱)或 String类型,例如Integer、String、Double等,如以下代码所示,通过从给定的元素集中创建DataStream数据集。

//创建Int类型的数据集
val intStream:DataStream[Int] = env.fromElements(3, 1, 2, 1, 5)
//创建String类型的数据集
val dataStream: DataStream[String] = env.fromElements("hello", "flink")

Flink实现另外一种TypeInfomation是BasicArrayTypeInfo,对应的是Java基本类型数组(装箱)或 String对象的数组,如下代码通过使用Array数组和List集合创建DataStream数据集。

//通过从数组中创建数据集
val dataStream: DataStream[Int] = env.fromCollection(Array(3, 1, 2, 1, 5))
//通过List集合创建数据集
val dataStream: DataStream[Int] = env.fromCollection(List(3, 1, 2, 1, 5))

Java Tuples类型

通过定义TupleTypeInfo来描述Tuple类型数据,Flink在Java接口中定义了元祖类(Tuple)供用户使用。Flink Tuples是固定长度固定类型的Java Tuple实现,不支持空值存储。目前支持任意的Flink Java Tuple类型字段数量上限为25,如果字段数量超过上限,可以通过继承Tuple类的方式进行拓展。如下代码所示,创建Tuple数据类型数据集。

//通过实例化Tuple2创建具有两个元素的数据集
val tupleStream2: DataStream[Tuple2[String, Int]] = env.fromElements(new Tuple2("a",1), new Tuple2("c", 2))

Scala Case Class类型

Flink通过实现CaseClassTypeInfo支持任意的Scala Case Class,包括Scala tuples类型,支持的字段数量上限为22,支持通过字段名称和位置索引获取指标,不支持存储空值。如下代码实例所示,定义WordCount Case Class数据类型,然后通过fromElements方法创建input数据集,调用keyBy()方法对数据集根据word字段重新分区

/定义WordCount Case Class数据结构
case class WordCount(word: String, count: Int)
//通过fromElements方法创建数据集
val input = env.fromElements(WordCount("hello", 1), WordCount("world", 2))
val keyStream1 = input.keyBy("word") // 根据word字段为分区字段,
val keyStream2 = input.keyBy(0) //也可以通过指定position分区

通过使用Scala Tuple创建DataStream数据集,其他的使用方式和Case Class相似。需要注意的是,如果根据名称获取字段,可以使用Tuple中的默认字段名称。

//通过scala Tuple创建具有两个元素的数据集
val tupleStream: DataStream[Tuple2[String, Int]] = env.fromElements(("a", 1),
  ("c", 2))
//使用默认字段名称获取字段,其中_1表示tuple这种第一个字段
tupleStream.keyBy("_1")

POJOs类型

POJOs类可以完成复杂数据结构的定义,Flink通过实现PojoTypeInfo来描述任意的POJOs,包括Java和Scala类。在Flink中使用POJOs类可以通过字段名称获取字段,例如dataStream.join(otherStream).where(“name”).equalTo(“personName”),对于用户做数据处理则非常透明和简单,如代码清单3-2所示。如果在Flink中使用POJOs数据类型,需要遵循以下要求:

  • POJOs类必须是Public修饰且必须独立定义,不能是内部类;
  • POJOs类中必须含有默认空构造器;
  • POJOs类中所有的Fields必须是Public或者具有Public修饰的getter和setter方法;
  • POJOs类中的字段类型必须是Flink支持的。
//定义Java Person类,具有public修饰符
public class Person {
  //字段具有public修饰符
    public String name;
    public int age;
  //具有默认空构造器
    public Person() {
    }
    public Person(String name, int age) {
      this.name = name;
      this.age = age;   
    }
}

定义好POJOs Class后,就可以在Flink环境中使用了,如下代码所示,使用fromElements接口构建Person类的数据集。POJOs类仅支持字段名称指定字段,如代码中通过Person name来指定Keyby字段。

val persionStream = env.fromElements(new Person("Peter",14),new 
Person("Linda",25))
//通过Person.name来指定Keyby字段
persionStream.keyBy("name")
Scala POJOs数据结构定义如下使用方式与Java POJOs相同
class Person(var name: String, var age: Int) {
    //默认空构造器
      def this() {
        this(null, -1)
  }
}

Value数据类型实现了org.apache.flink.types.Value,其中包括read()和write()两个方法完成序列化和反序列化操作,相对于通用的序列化工具会有着比较高效的性能。目前Flink提供了內建的Value类型有IntValue、DoubleValue以及StringValue等,用户可以结合原生数据类型和Value类型使用。

特殊数据类型

在Flink中也支持一些比较特殊的数据数据类型,例如Scala中的List、Map、Either、Option、Try数据类型,以及Java中Either数据类型,还有Hadoop的Writable数据类型。如下代码所示,创建Map和List类型数据集。这种数据类型使用场景不是特别广泛,主要原因是数据中的操作相对不像POJOs类那样方便和透明,用户无法根据字段位置或者名称获取字段信息,同时要借助Types Hint帮助Flink推断数据类型信息。

//创建Map类型数据集
val mapStream = 
env.fromElements(Map("name"->"Peter","age"->18),Map("name"->"Linda",
"age"->25))
//创建List类型数据集
val listStream = env.fromElements(List(1,2,3,5),List(2,4,3,2))

TypeInformation信息获取

通常情况下Flink都能正常进行数据类型推断,并选择合适的serializers以及comparators。但在某些情况下却无法直接做到,例如定义函数时如果使用到了泛型,JVM就会出现类型擦除的问题,使得Flink并不能很容易地获取到数据集中的数据类型信息。同时在Scala API和Java API中,Flink分别使用了不同的方式重构了数据类型信息。

Scala API类型信息

Scala API通过使用Manifest和类标签,在编译器运行时获取类型信息,即使是在函数定义中使用了泛型,也不会像Java API出现类型擦除的问题,这使得Scala API具有非常精密的类型管理机制。同时在Flink中使用到Scala Macros框架,在编译代码的过程中推断函数输入参数和返回值的类型信息,同时在Flink中注册成TypeInformation以支持上层计算算子使用。 当使用Scala API开发Flink应用,如果使用到Flink已经通过TypeInformation定义的数据类型,TypeInformation类不会自动创建,而是使用隐式参数的方式引入,代码不会直接抛出编码异常,但是当启动Flink应用程序时就会报“could not find implicit value for evidence parameter of type TypeInformation”的错误。 这时需要将TypeInformation类隐式参数引入到当前程序环境中,代码实例如下:

import org.apache.flink.api.scala._

Java API类型信息

由于Java的泛型会出现类型擦除问题,Flink通过Java反射机制尽可能重构类型信息,例如使用函数签名以及子类的信息等。同时类型推断在当输出类型依赖于输入参数类型时相对比较容易做到,但是如果函数的输出类型不依赖于输入参数的类型信息,这个时候就需要借助于类型提示(Ctype Himts)来告诉系统函数中传入的参数类型信息和输出参数信息。 通过在returns方法中传入TypeHint实例指定输出参数类型,帮助Flink系统对输出类型进行数据类型参数的推断和收集。

DataStream<Integer> typeStream = input
  .flatMap(new MyMapFunction<String, Integer>())
  .returns(new TypeHint<Integer>() {//通过returns方法指定返回参数类型
    });
//定义泛型函数,输入参数类型为<T,O>,输出参数类型为O
  class MyMapFunction<T, O> implements MapFunction<T, O> {
    public void flatMap(T value, Collector<O> out) {
      //定义计算逻辑
    }
  }

在使用Java API定义POJOs类型数据时,PojoTypeInformation为POJOs类中的所有字段创建序列化器,对于标准的类型,例如Integer、String、Long等类型是通过Flink自带的序列化器进行数据序列化,对于其他类型数据都是直接调用Kryo序列化工具来进行序列化。

通常情况下,如果Kryo序列化工具无法对POJOs类序列化时,可以使用Avro对POJOs类进行序列化,如下代码通过在ExecutionConfig中调用enableForceAvro()来开启Avro序列化。

ExecutionEnvironment env =
        ExecutionEnvironment.getExecutionEnvironment();
//开启Avro序列化方式
        env.getConfig().enableForceAvro();

如果用户想使用Kryo序列化工具来序列化POJOs所有字段,则在ExecutionConfig中调用enableForceKryo()来开启Kryo序列化。

final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceKryo();

如果默认的Kryo序列化类不能序列化POJOs对象,通过调用ExecutionConfig的addDefault-KryoSerializer()方法向Kryo中添加自定义的序列化器。

env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends 
Serializer<?>> serializerClass)

自定义TypeInformation

除了使用已有的TypeInformation所定义的数据格式类型之外,用户也可以自定义实现TypeInformation,来满足的不同的数据类型定义需求。Flink提供了可插拔的Type Information Factory让用户将自定义的TypeInformation注册到Flink类型系统中。 如下代码所示只需要通过实现org.apache.flink.api.common.typeinfo.TypeInfoFactory接口,返回相应的类型信息。 通过@TypeInfo注解创建数据类型,定义CustomTuple数据类型。

@TypeInfo(CustomTypeInfoFactory.class)
public class CustomTuple<T0, T1> {
  public T0 field0;
  public T1 field1;
}

然后定义CustomTypeInfoFactory类继承于TypeInfoFactory,参数类型指定CustomTuple。最后重写createTypeInfo方法,创建的CustomTupleTypeInfo就是CustomTuple数据类型TypeInformation。

public class CustomTypeInfoFactory extends TypeInfoFactory<CustomTuple> {
  @Override
  public TypeInformation<CustomTuple> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
    return new CustomTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
  }
}

Search

    微信好友

    博士的沙漏

    Table of Contents