Flink编程SQL
Flink提供了关系型编程接口Table API以及基于Table API的SQL API,让用户能够通过使用结构化编程接口高效地构建Flink应用。同时Table API以及SQL能够统一处理批量和实时计算业务,无须切换修改任何应用代码就能够基于同一套API编写流式应用和批量应用,从而达到真正意义的批流统一。
本章将重点介绍如何使用Flink Table & SQL API来构建流式应用和批量应用。
Flink Table模块
Flink的Table模块包括 Table API和 SQL:
- Table API: 是一种类SQL的API,通过Table API,用户可以像操作表一样操作数据,非常直观和方便;
- SQL:作为一种声明式语言,有着标准的语法和规范,用户可以不用关心底层实现即可进行数据的处理,非常易于上手。 Flink Table API 和 SQL 的实现上有80%左右的代码是公用的。作为一个流批统一的计算引擎,Flink 的 Runtime 层是统一的。
Table API & SQL特点
Flink之所以选择将 Table API & SQL作为未来的核心 API,是因为其具有一些非常重要的特点:
- 声明式:用户只关心做什么,不用关心怎么做
- 高性能:支持查询优化,可以获取更好的执行性能
- 批流统一:相同的统计逻辑,既可以流模式运行,也可以批模式运行
- 标准稳定:语义遵循SQL标准,不易变动
- 易理解:语义明确,所见即所得
Table API& SQL发展历程
自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是Blink。 Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL的实现! 查询处理器的选择 : Flink1.11之后Blink Query Processor查询处理器已经是默认的了。
开发准备
开发环境构建
在使用Table API和SQL开发Flink应用之前,通过添加Maven配置到项目中,在本地工程中引入相应的flink-table_2.11依赖库,库中包含了Table API和SQL接口。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.7.0</version>
</dependency>
然后需要分别引入开发批量应用和流式应用对应的库,对于批量应用,需要引入以下flink-scala_2.11依赖库:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.7.0</version>
</dependency>
对于构建实时应用,需要引入以下flink-streaming-scala_2.11依赖库:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.7.0</version>
</dependency>
由于Flink Table接口中引入了Apache Calcite第三方库,会阻止Java虚拟机对用户的Classloaders进行垃圾回收,因此不建议用户在构建Flink应用时将flink-table依赖库打包进fat-jar中,可以在集群环境中将{FLINK_HOME}/opt的对应的flink-table jar复制到{FLINK_HOME}/lib中来解决此类问题,其中FLINK_HOME为Flink安装路径。
程序结构
// create a TableEnvironment for specific planner batch or streaming
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create a Table
tableEnv.connect(...).createTemporaryTable("table1");
// register an output Table
tableEnv.connect(...).createTemporaryTable("outputTable");
// create a Table object from a Table API query
Table tapiResult = tableEnv.from("table1").select(...);
// create a Table object from a SQL query
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = tapiResult.executeInsert("outputTable");
tableResult...
获取环境
// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
Table API和SQL中具有相同的基本编程模型。首先需要构建对应的TableEnviroment创建关系型编程环境,才能够在程序中使用Table API和SQL来编写应用程序,另外Table API和SQL接口可以在应用中同时使用,Flink SQL基于Apache Calcite框架实现了SQL标准协议,是构建在Table API之上的更高级接口。
创建表
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// table is the result of a simple projection query
Table projTable = tableEnv.from("X").select(...);
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")
查询表
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// scan registered Orders table
Table orders = tableEnv.from("Orders");// compute revenue for all customers from France
Table revenue = orders
.filter($("cCountry")
.isEqual("FRANCE"))
.groupBy($("cID"), $("cName")
.select($("cID"), $("cName"), $("revenue")
.sum()
.as("revSum"));
// emit or convert Table
// execute query
数据查询和过滤
对于已经在TableEnvironment中注册的数据表,可以通过scan方法查询已经在CataLog中注册的表并转换为Table结构,然后在Table上使用select操作符查询需要获取的指定字段。 如以下代码所示从TableEnvironment中查询已经注册的Sensors表。
val sensors: Table = tableEnv.scan("Sensors")
//可以通过在Table结构上使用select方法查询指定字段,并通过as进行字段重命名
val result = tableEnv.scan("Sensors").select('id, 'var1 as 'myvar1)
//使用select(*)将所有的字段查询出来
val result = tableEnv.scan("Sensors").select('*)
Table API中可以使用类似SQL中的as方法对字段进行重命名,例如将Sensors表中的字段按照位置分别命名为a、b、c、d。
val sensors: Table = tableEnv.scan("Sensors").as('a, 'b, 'c, 'd)
可以使用filter或where方法过滤字段和检索条件,将需要的数据检索出来。注意,在Table API语法中进行相等判断时需要三个等号连接表示。
//使用filter方法进行数据筛选
val result = sensors.filter('var1%2 === 0)
//使用where方法进行数据筛选
val result = sensors.where('id === "1001")
窗口操作
Flink Table API要将窗口分为GroupBy Window和OverWindow两种类型,具体的说明如下:
GroupBy Window
GroupBy Window和DataStream API、DataSet API中提供的窗口一致,都是将流式数据集根据窗口类型切分成有界数据集,然后在有界数据集之上进行聚合类运算。如下代码所示,在Table API中使用window()方法对窗口进行定义和调用,且必须通过as()方法指定窗口别名以在后面的算子中使用。在window()方法指定窗口类型之后,需要紧跟groupBy()方法来指定创建的窗口名称以窗口数据聚合的Key,然后使用Select()方法来指定需要查询的字段名称以及窗口聚合数据进行统计的函数,如以下代码指定为varl.sum,其他还可以为mm.max等。
//获取TableEnvironment
val tStreamEnv = TableEnvironment.getTableEnvironment(env)
// 通过scan方法在CataLog中找到Sensors表
val sensors:Table = tStreamEnv.scan("Sensors")
val result = sensors
.window([w: Window] as 'window) // 指定窗口类型并对窗口重命名为window
.groupBy('window) // 根据窗口进行聚合,窗口数据会分配到单个Task算子中
.select('var1.sum) // 指定对var1字段进行sum求和
在流式计算任务中,GroupBy聚合条件中可以以上实例选择使用Window名称,也可是一个(或多个)Key值与Window的组合。如果仅指定Window名称,则和Global Window相似,窗口中的数据都会被汇合到一个Task线程中处理,统计窗口全局的结果;如果指定Key和Window名称的组合,则窗口中的数据会分布到并行的算子实例中计算结果。如下实例所示,GroupBy中指定除窗口名称以外的Key,完成对指定Key在窗口上的数据聚合统计。
//获取TableEnvironment
val tStreamEnv = TableEnvironment.getTableEnvironment(env)
// 通过scan方法在CataLog中找到Sensors表
val sensors:Table = tStreamEnv.scan("Sensors")
val result = sensors
.window([w: Window] as 'window) // 指定窗口类型并将窗口重命名为window
.groupBy('window, 'id) // 根据窗口进行聚合,窗口数据会分配到单个Task算子中
.select('id, 'var1.sum) // 指定对var1字段进行sum求和
在select语句中除了可以获取数据元素外,还可以获取窗口的元数据信息,例如可以通过window.start获取当前窗口的起始时间,通过window.end获取当前窗口的截止时间(含窗口区间上界),以及通过window.rowtime获取当前窗口截止时间(不含窗口区间上界)。
// 通过scan方法在CataLog中找到Sensors表
val sensors:Table = tStreamEnv.scan("Sensors")
val result = sensors
.window([w: Window] as 'window) // 指定窗口类型并将窗口重命名为window
.groupBy('window, 'id) // 根据窗口进行聚合,窗口数据会分配到单个Task算子中
// 指定对var1字段进行sum求和,并指定窗口起始时间、结束时间及rowtime等元数据信息
.select('id, 'var1.sum,'window.start, 'window.end, 'window.rowtime)
需要注意的是,在以上window()方法中需要指定的是不同的窗口类型,以确定数据元素被分配到窗口的逻辑。在Table API中支持Tumble、Sliding及Session Windows三种窗口类型,并分别通过不同的Window对象来完成定义。例如Tumbling Windows对应Tumble对象,Sliding Windows对应Slide对象,Session Windows对应Session对象,同时每种对象分别具有和自身窗口类型相关的参数。
Tumbling Windows
前面已经提到滚动窗口的窗口长度是固定的,窗口和窗口之间的数据不会重合,例如每5min统计最近5min内的用户登录次数。滚动窗口可以基于Event Time、Process Time以及Row-Count来定义。 如以下代码实例,Table API中的滚动窗口使用Tumble Class来创建,且分别基于EventTime、ProcessTime以及Row-Count来定义窗口。
// 通过scan方法在CataLog中查询Sensors表
val sensors:Table = tStreamEnv.scan("Sensors")
// 基于EventTime时间概念创建滚动窗口,窗口长度为1h
sensors.window(Tumble over 1.hour on 'rowtime as 'window)
// 基于ProcessTime时间概念创建滚动窗口,窗口长度为1h
sensors.window(Tumble over 1.hour on 'proctime as 'window)
//基于元素数量创建滚动窗口,窗口长度为100条记录('proctime没有实际意义)
sensors.window(Tumble over 100.rows on 'proctime as 'window)
其中over操作符指定窗口的长度,例如over 10.minutes代表10min创建一个窗口,over 10.rows代表10条数据创建一个窗口。on操作符定义了窗口基于的时间概念类型为EventTime还是ProcessTime,EventTime对应着rowtime,ProcessTime对应着proctime。最后通过as操作符将创建的窗口进行重命名,同时窗口名称需要在后续的算子中使用。
Sliding Windows
滑动窗口的窗口长度也是固定的,但窗口和窗口之间的数据能够重合,例如每隔10s统计最近5min的用户登录次数。滑动窗口也可以基于EventTime、ProcessTime以及Row-Count来定义。 如下代码实例所示,Table API中的滑动窗口使用Slide Class来创建,且分别基于EventTime、ProcessTime以及Row-Count来定义窗口。
// 通过scan方法在CataLog中查询Sensors表
val sensors:Table = tStreamEnv.scan("Sensors")
// 基于EventTime时间概念创建滑动窗口,窗口长度为10分钟,每隔5s统计一次
sensors.window(Slide over 10.minutes every 5.millis on 'rowtime as 'window)
// 基于ProcessTime时间概念创建滑动窗口,窗口长度为10分钟,每隔5s统计一次
sensors.window(Slide over 10.minutes every 5.millis on 'procetime as 'window)
//基于元素数量创建滑动窗口,指定10条记录创建一个窗口,窗口每5条记录移动一次
// 注意,'proctime没有实际意义
sensors.window(Slide over 100.rows every 5.rows on 'procetime as 'window)
上述代码中的over、on操作符与Tumpling窗口中的一样,都是指定窗口的固定长度和窗口的时间概念类型。和Tumpling窗口相比,Sliding Windows增加了every操作符,通过该操作符指定窗口的移动频率,例如every 5.millis表示窗口每隔5s移动一次。同时在最后创建Sliding窗口也需要使用as操作符对窗口进行重命名,并在后续操作中通过窗口名称调用该窗口。
Session Windows
与Tumpling、Sliding窗口不同的是,Session窗口不需要指定固定的窗口时间,而是通过判断固定时间内数据的活跃性来切分窗口,例如10min内数据不接入则切分窗口并触发计算。Session窗口只能基于EventTime和ProcessTime时间概念来定义,通过withGap操作符指定数据不活跃的时间Gap,表示超过该时间数据不接入,则切分窗口并触发计算。 如以下代码,通过指定EventTime和ProcessTime时间概念来创建Session Window。
// 通过scan方法在CataLog中查询Sensors表
val sensors:Table = tStreamEnv.scan("Sensors")
// 基于EventTime时间概念创建会话窗口,Session Gap为10min
sensors.window(Session withGap 10.minutes on 'rowtime as 'window)
// 基于ProcessTime时间概念创建会话窗口,Session Gap为10min
sensors.window(Session withGap 10.minutes on 'proctime as 'window)
Over Window
Over Window和标准SQL中提供的OVER语法功能类似,也是一种数据聚合计算的方式,但和Group Window不同的是,Over Window不需要对输入数据按照窗口大小进行堆叠。Over Window是基于当前数据和其周围邻近范围内的数据进行聚合统计的,例如基于当前记录前面的20条数据,然后基于这些数据统计某一指标的聚合结果。
在Table API中,Over Window也是在window方法中指定,但后面不需要和groupby操作符绑定,后面直接接select操作符,并在select操作符中指定需要查询的字段和聚合指标。如以下代码使用Over Class创建Over Window并命名为window,通过select操作符指定聚合指标var1.sum和var2.max。
// 通过scan方法在CataLog中查询Sensors表
val sensors:Table = tStreamEnv.scan("Sensors")
val table = sensors
//指定OverWindow并重命名为window
sensors.window(Over partitionBy 'id orderBy 'rowtime preceding
UNBOUNDED_RANGE as 'window)
// 通过在Select操作符中指定查询字段、窗口上var1求和值和var2最大值
.select('id, 'var1.sum over 'window, 'var2.max over 'window)
上述Over Window的创建需要依赖于partitionBy、orderBy、preceding及following四个参数:
- partitionBy操作符中指定了一个或多个分区字段,Table中的数据会根据指定字段进行分区处理,并各自运行窗口上的聚合算子求取统计结果。需要注意,partitionBy是一个可选项,如果用户不使用partitionBy操作,则数据会在一个Task实例中完成计算,不会并行到多个Tasks中处理。
- orderBy操作符指定了数据排序的字段,通常情况下使用EventTime或ProcessTime进行时间排序。
- preceding操作符指定了基于当前数据需要向前纳入多少数据作为窗口的范围。preceding中具有两种类型的时间范围,其中一种为Bounded类型,例如指定100.rows表示基于当前数据之前的100条数据;也可以指定10.minutes,表示向前推10min,计算在该时间范围以内的所有数据。另外一种为UnBounded类型,表示从进入系统的第一条数据开始,且UnBounded类型可以使用静态变量UNBOUNDED_RANGE指定,表示以时间为单位的数据范围;也可以使用UNBOUNDED_ROW指定,表示以数据量为单位的数据范围。
- following操作符和preceding相反,following指定了从当前记录开始向后纳入多少数据作为计算的范围。目前Table API还不支持从当前记录开始向后指定多行数据进行窗口统计,可以使用静态变量CURRENT_ROW和CURRENT_RANGE来设定仅包含当前行,默认情况下Flink会根据用户使用窗口间隔是时间还是数量来指定following参数。需要注意的是,preceding和following指定的间隔单位必须一致,也就说二者必须是时间和数量中的一种类型。
如以下实例定义了Unbounded类型的Over Window,其中包括了UNBOUNDED_RANGE和UNBOUNDED_ROW两种preceding参数类型。
//创建UNBOUNDED_RANGE类型的OverWindow,指定分区字段为id,并根据rowtime排序 .window([w: OverWindow] as 'window) //创建UNBOUNDED_RANGE类型的OverWindow,指定分区字段为id,并根据proctime排序 sensors.window(Over partitionBy 'id orderBy 'proctime preceding UNBOUNDED_RANGE as 'window) //创建UNBOUNDED_ROW类型的OverWindow,指定分区字段为id,并根据rowtime排序 sensors.window(Over partitionBy 'id orderBy 'rowtime preceding UNBOUNDED_ROW as 'window) //创建UNBOUNDED_ROW类型的OverWindow,指定分区字段为id,并根据proctime排序 sensors.window(Over partitionBy 'id orderBy 'proctime preceding UNBOUNDED_ROW as 'window)
如下实例定义了Unbounded类型的Over Window,其中包括UNBOUNDED_RANGE和UNBOUNDED_ROW两种preceding参数类型。
//创建BOUNDED类型的OverWindow,窗口大小为向前10min,并根据rowtime排序 sensors.window(Over partitionBy 'id orderBy 'rowtime preceding 10.minutes as 'window) //创建BOUNDED类型的OverWindow,窗口大小为向前10min,并根据proctime排序 sensors.window(Over partitionBy 'id orderBy 'proctime preceding 10.minutes as 'window) //创建BOUNDED类型的OverWindow,窗口大小为向前100条,并根据rowtime排序 sensors.window(Over partitionBy 'id orderBy 'rowtime preceding 100.rows as 'window) //创建BOUNDED类型的OverWindow,窗口大小为向前100条,并根据rowtime排序 sensors.window(Over partitionBy 'id orderBy 'proctime preceding 100.rows as 'window)
聚合操作
在Flink Table API中提供了基于窗口以及不基于窗口的聚合类操作符基本涵盖了数据处理的绝大多数场景,和SQL中Group By语句相似,都是对相同的key值的数据进行聚合,然后基于聚合数据集之上统计例如sum、count、avg等类型的聚合指标。
GroupBy Aggregation
在全量数据集上根据指定字段聚合,首先将相同的key的数据聚合在一起,然后在聚合的数据集上计算统计指标。需要注意的是,这种聚合统计计算依赖状态数据,如果没有时间范围,在流式应用中状态数据根据不同的key及统计方法,将会在计算过程中不断地存储状态数据,所以建议用户尽可能限定统计时间范围避免因为状态体过大导致系统压力过大。
val sensors: Table = tStreamEnv.scan("Sensors")
//根据id进行聚合,求取Var1字段的sum结果
val groupResult = sensors.groupBy('id).select('id, 'var1.sum as 'var1Sum)
GroupBy Window Aggregation
该类聚合运算是构建在GroupBy Window之上然后根据指定字段聚合并统计结果。与非窗口统计相比,GroupBy Window可以将数据限定在一定范围内,这样能够有效控制状态数据的存储大小。如下代码实例所示,通过window操作符指定GroupBy Window类型之后,紧接着就是使用groupBy操作符指定需要根据哪些key进行数据的聚合,最后在select操作符中查询相关的指标。
val sensors: Table = tStreamEnv.scan("Sensors")
val groupWindowResult: Table = orders
// 定义窗口类型为滚动窗口
.window(Tumble over 1.hour on 'rowtime as 'window)
// 根据id和window进行聚合
.groupBy('id, 'window)
//获取字段id,窗口属性start、end、rowtime,以及聚合指标var1Sum
.select('id, 'window.start, 'window.end, 'window.rowtime, 'var1.sum as 'var1Sum)
Over Window Aggregation
和GroupBy Window Aggregation类似,但Over Window Aggregation是构建在Over Window之上,同时不需要在window操作符之后接groupby操作符。如以下代码实例所示在select操作符中通过“var1.avg over ‘window”来指定需要聚合的字段及聚合方法。需要注意的是,在select操作符中只能使用一个相同的Window,且Over Window Aggregation仅支持preceding定义的UNBOUNDED和BOUNDED类型窗口,对于following定义的窗口目前不支持。同时Over Window Aggregation仅支持流式计算场景。
val sensors: Table = tStreamEnv.scan("Sensors")
val overWindowResult: Table = sensors
// 定义UNBOUNDED_RANGE类型的OverWindow
.window(Over partitionBy 'id orderBy 'rowtime preceding UNBOUNDED_RANGE
as 'window)
//获取字段id以及每种指标的聚合结果
.select('id, 'var1.avg over 'window, 'var2.max over 'window, 'var3.min over 'window)
Distinct Aggregation
Distinct Aggregation和标准SQL中的COUNT(DISTINCT a)语法相似,主要作用是将Aggregation Function应用在不重复的输入元素上,对于重复的指标不再纳入计算范围内。Distinct Aggregation可以与GroupBy Aggregation、GroupBy Window Aggregation及Over Window Aggregation结合使用。
val sensors: Table = tStreamEnv.scan("Sensors")
// 基于GroupBy Aggregation,对不同的var1指标进行求和
val groupByDistinctResult = sensors
.groupBy('id)
.select('id, 'var1.sum.distinct as 'var1Sum)
// 基于GroupBy Window Aggregation,对不同的var1指标进行求和
val groupByWindowDistinctResult = sensors
.window(Tumble over 1.minutes on 'rowtime as 'window).groupBy('id, 'window)
.select('id, 'var1.sum.distinct as 'var1Sum)
// 基于GroupBy Window Aggregation,对不同的var1求平均值,并获取var2的最小值
val overWindowDistinctResult = sensors
.window(Over partitionBy 'id orderBy 'rowtime
preceding UNBOUNDED_RANGE as 'window)
.select('id, 'var1.avg.distinct over 'window, 'var2.min over 'window)
Distinct
单个Distinct操作符和标准SQL中的DISTINCT功能一样,用于返回唯一不同的记录。Distinct操作符可以直接应用在Table上,但是需要注意的是,Distinct操作是非常消耗资源的,且仅支持在批量计算场景中使用。
val sensors: Table = tStreamEnv.scan("Sensors")
//返回sensors表中唯一不同的记录
val distinctResult = sensors.distinct()
多表关联
Inner Join
Inner Join和标准SQL的JOIN语句功能一样,根据指定条件内关联两张表,并且只返回两个表中具有相同关联字段的记录,同时两张表中不能具有相同的字段名称。
val t1 = tStreamEnv.fromDataStream(stream1, 'id1, 'var1, 'var2)
val t2 = tStreamEnv.fromDataStream(stream2, 'id2, 'var3, 'var4)
val innerJoinresult = t1.join(t2).where('id1 === 'id2).select('id1, 'var1, 'var3)
Outer Join
Outer Join操作符和标准SQL中的LEFT/RIGHT/FULL OUTER JOIN功能一样,且根据指定条件外关联两张表中不能有相同的字段名称,同时必须至少指定一个关联条件。如以下代码案例,分别对t1和t2表进行三种外关联操作。
//从DataSet数据集中创建Table
val t1 = tBatchEnv.fromDataSet(dataset1, 'id1, 'var1, 'var2)
val t2 = tBatchEnv.fromDataSet(dataset2, 'id2, 'var3, 'var4)
//左外关联两张表
val leftOuterResult = t1.leftOuterJoin(t2, 'id1 === 'id2).select('id1, 'var1, 'var3)
//右外关联两张表
val rightOuterResult = t1. rightOuterJoin (t2, 'id1 === 'id2).select('id1, 'var1, 'var3)
//全外关联两张表
val fullOuterResult = t1. fullOuterJoin (t2, 'id1 === 'id2).select('id1, 'var1, 'var3)
Time-windowed Join
Time-windowed Join是Inner Join的子集,在Inner Join的基础上增加了时间条件,因此在使用Time-windowed Join关联两张表时,需要至少指定一个关联条件以及两张表中的关联时间,且两张表中的时间属性对应的时间概念必须一致(EventTime或者ProcessTime),时间属性对比使用Table API提供的比较符号(<, <=, >=, >),同时可以在条件中增加或者减少时间大小,例如rtime - 5.minutes,表示右表中的时间减去5分钟。
//从DataSet数据集中创建Table,并且两张表都使用EventTime时间概念
val t1= tBatchEnv.fromDataSet(dataset1, 'id1, 'var1, 'var2,'time1.rowtime)
val t2= tBatchEnv.fromDataSet(dataset2, 'id2, 'var3, 'var4,'time2.rowtime)
//将t1和t2表关联,并在where操作符中指定时间关联条件
val result = t1.join(t2)
//指定关联条件
.where('id1 === 'id2 && 'time1 >= 'time2 - 10.minutes && 'time1 < 'time2 + 10.minutes)
//查询并输出结果
.select('id1, 'var1, 'var2, 'time1)
Join with Table Function
在Inner Join中可以将Table与自定义的Table Funciton进行关联,Table中的数据记录与Table Fuction输出的数据进行内关联,其中如果Table Function返回空值,则不输出结果。
val table = tBatchEnv.fromDataSet(dataset, 'id, 'var1, 'var2,'time.rowtime)
// 初始化自定义的Table Function
val upper: TableFunction[_] = new MyUpperUDTF()
// 通过Inner Join关联经过MyUpperUDTF处理的Table,然后形成新的表
val result: Table = table
.join(upper('var1) as 'upperVar1)
.select('id, 'var1, 'upperVar1, 'var2)
在Left Outer Join中使用Table Funciton与使用Inner Join类似,区别在于如果Table Funciton返回的结果是空值,则在输出结果中对应的记录将会保留且Table Function输出的值为Null。
Join with Temporal Table
val tempTable = tEnv.scan("TempTable")
val temps = tempTable.createTemporalTableFunction('t_proctime, 't_id)
val table = tEnv.scan("Table")
val result = table.join(temps('o_rowtime), 'table_key == 'temp_key)
集合操作
当两张Table都具有相同的Schema结构,则这两张表就可以进行类似于Union类型的集合操作。注意,以下除了UnionAll和In两个操作符同时支持流计算场景和批量计算场景之外,其余的操作符都仅支持批量计算场景。
//从DataSet数据集中创建Table
val t1= tBatchEnv.fromDataSet(dataset1, 'id1, 'var1, 'var2)
val t2= tBatchEnv.fromDataSet(dataset2, 'id2, 'var3, 'var4)
- Union:和标准SQL中的UNION语句功能相似,用于合并两张表并去除相同的记录。
val unionTable = t1.union(t2)
- UnionAll:和标准SQL中的UNIONALL语句功能相似,用于合并两张表但不去除相同的记录。
val unionAllTable = t1.unionAll(t2)
- Intersect:和标准SQL中的INTERSECT语句功能相似,合并两张变且仅返回两张表中的交集数据,如果记录重复则只返回一条记录。
val intersectTable = t1.intersect(t2)
- IntersectAll:和标准SQL中的INTERSECT ALL语句功能相似,合并两张表且仅返回两张表中的交集数据,如果记录重复则返回所有重复的记录。
val intersectAllTable = t1.intersectAll(t2)
- Minus:和标准SQL中EXCEPT语句功能相似,合并两张表且仅返回左表中有但是右表没有的数据差集,如果左表记录重复则只返回一条记录。
val minusTable = t1.minus(t2)
- MinusAll:和标准SQL中EXCEPT ALL语句功能相似,合并两张表仅返回左表有但是右表没有的数据差集,如果左表中记录重复n次,右表中相同记录出现m次,则返回n-m条记录。
val minusAllTable = t1.minusAll(t2)
- In:和标准SQL中IN语句功能相似,通过子查询判断左表中记录的某一列是否在右表中或给定的列表中,如果存在则返回True,如果不存在则返回False,where操作符根据返回条件判断是否返回记录。
val stream1: DataStream[(Long, String)] = ... val stream2: DataStream[Long] = ... val left: Table = tStreamEnv.fromDataStream(stream1,'id,'name) val right: Table = tStreamEnv.fromDataStream(stream2,'id) //使用in语句判断left表中记录的id是否在右表中,如过在则返回记录 val result1 = left.where('id in(right)) //使用in语句判断left表中记录的id是否在给定列表中,如过在则返回记录 val result2 = left.where('id in("92","11"))
排序操作
- Orderby:和标准SQL中ORDER BY语句功能相似,Orderby操作符根据指定的字段对Table进行全局排序,支持顺序(asc)和逆序(desc)两种方式。可以使用Offset操作符来控制排序结果输出的偏移量,使用fetch操作符来控制排序结果输出的条数。需要注意,该操作符仅支持批量计算场景。
val table: Table= ds.toTable(tBatchEnv, 'id,'var1,'var2) //根据var1对table按顺序方式排序 val result = table.orderBy('var1.asc) //根据id对table按逆序方式排序 val result1 = table.orderBy('var1.desc) // 返回排序结果中的前5条 val result2: Table = in.orderBy('var1.asc).fetch(5) // 忽略排序结果中的前10条数据,然后返回剩余全部数据 val result3: Table = in.orderBy('var1.asc).offset(10) // 忽略排序结果中的前10条数据,然后返回剩余数据中的前5条数据 val result4: Table = in.orderBy('var1.asc).offset(10).fetch(5)
数据写入
通过Insert Into操作符将查询出来的Table写入注册在TableEnvironment的表中,从而完成数据的输出。注意,目标表的Schema结构必须和查询出来的Table的Schema结构一致。
val sensors: Table = tableEnv.scan("Sensors")
//将查询出来的表写入OutSensors表中,OutSensors必须已经在TableEnvironment中注册
sensors.insertInto("OutSensors")
Flink SQL使用
SQL作为Flink中提供的接口之一,占据着非常重要的地位,主要是因为SQL具有灵活和丰富的语法,能够应用于大部分的计算场景。Flink SQL底层使用Apache Calcite框架,将标准的Flink SQL语句解析并转换成底层的算子处理逻辑,并在转换过程中基于语法规则层面进行性能优化,比如谓词下推等。另外用户在使用SQL编写Flink应用时,能够屏蔽底层技术细节,能够更加方便且高效地通过SQL语句来构建Flink应用。Flink SQL构建在Table API之上,并含盖了大部分的Table API功能特性。同时Flink SQL可以和Table API混用,Flink最终会在整体上将代码合并在同一套代码逻辑中,另外构建一套SQL代码可以同时应用在相同数据结构的流式计算场景和批量计算场景上,不需要用户对SQL语句做任何调整,最终达到实现批流统一的目的。
Flink SQL实例
以下通过实例来了解Flink SQL整体的使用方式。在前面小节中我们知道如何在Flink TableEnvironment中注册和定义数据库或者表结构,最终是能够让用户方便地使用Table API或者SQL处理不同类型数据的,然后调用TableEnvironment SqlQuery方法执行Flink SQL语句,完成数据处理
// 获取StreamTableEnvironment对象
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 表结构schema (id, type,timestamp, var1, var2)
tableEnv.register("sensors", sensors_table)
val csvTableSink = new CsvTableSink("/path/csvfile", ...)
//定义字段名称
val fieldNames: Array[String] = Array("id", "type")
//定义字段类型
val fieldTypes: Array[TypeInformation[_]] = Array(Types.LONG, Types.STRING)
//通过registerTableSink将CsvTableSink注册成table
tableEnv.registerTableSink("csv_output_table", fieldNames, fieldTypes, csvSink)
// 计算与传感器类型为A的每个传感器id对应的var1指标的和
val result: Table = tEnv.sqlQuery(
"select id,
sum(var1)as sumvar1
from sensors_table
where type='speed'
group by sensor_id)")
// 通过SqlUpdate方法,将类型为温度的数据筛选出来并输出到外部表中
tableEnv.sqlUpdate(
"INSERT INTO csv_output_table SELECT product, amount FROM Sensors WHERE type = 'temperature'")
以上实例使用SQL语句筛选出sensor_type为“speed”的记录,并根据id对var1指标进行聚合并求和,执行完sqlQuery()方法后生成result Table,并且后续计算中可以直接使用result Table进行。在实例代码中可以看出,整个应用包括了从数据源的注册到具体数据处理SQL转换,以及结果数据的输出。结果输出使用了Flink自带的CSVSink,然后将SQL执行的结果用INSERT INTO的方式写入对应CSV文件中,完成结果数据的存储落地。
执行SQL
Flink SQL可以借助于TableEnvironment的SqlQuery和SqlUpdate两种操作符使用,前者主要是从执行的Table中查询并处理数据生成新的Table,后者是通过SQL语句将查询的结果写入到注册的表中。其中SqlQuery方法中可以直接通过$符号引用Table,也可以事先在TableEnvironment中注册Table,然后在SQL中使用表名引用Table。
在SQL中引用Table
如以下代码实例所示,创建好Table对象之后,可以在SqlQuery方法中直接使用$符号来引用创建好的Table,Flink会自动将被引用的Table注册到TableEnvironment中,从代码层面将Table API和SQL进行融合。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 从外部数据源中转换形成DataStream数据集
val inputSteam: DataStream[(Long, String, Integer)] = ...
//将DataStream数据集转换成Table
val sensor_table = inputSteam.toTable(tableEnv, 'id, 'type, 'var1)
//在SqlQuery中直接使用$符号来引用创建好的Table
val result = tableEnv.sqlQuery(
s"SELECT SUM(var1) FROM $sensor_table WHERE product === 'temperature’")
在SQL中引用注册表
如以下代码实例所示,事先调用registerDataStream方法将DataStream数据集在TableEnvironment中注册成Table,然后在sqlQuery()方法中SQL语句就直接可以通过Table名称来引用Table。
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// 在SQL中直接引用注册好的Table名称,完成数据的处理,并输出结果
val result = tableEnv.sqlQuery(
"SELECT product, amount FROM Sensors WHERE typeLIKE 'temperature’")
在SQL中数据输出
如以下代码实例所示,可以调用sqlUpdate()方法将查询出来的数据输出到外部系统中,首先通过实现TableSink接口创建外部系统对应的TableSink,然后将创建好的TableSink实例注册在TableEnvironment中。最后使用sqlUpdate方法指定Insert Into语句将Table中的数据写入CSV文件TableSink对应的Table中,最终完成将Table数据的输出到CSO文件中。
val csvTableSink = new CsvTableSink("/path/csvfile", ...)
//定义字段名称
val fieldNames: Array[String] = Array("id", "type")
//定义字段类型
val fieldTypes: Array[TypeInformation[_]] = Array(Types.LONG, Types.STRING)
//通过registerTableSink将CsvTableSink注册成Table
tableEnv.registerTableSink("csv_output_table", fieldNames, fieldTypes, csvSink)
// 通过SqlUpdate方法,将类型为温度的数据筛选出并输出到外部表中
tableEnv.sqlUpdate(
"INSERT INTO csv_output_table SELECT id, type FROM Sensors WHERE type = 'temperature'")
数据查询与过滤
可以通过Select语句查询表中的数据,并使用Where语句设定过滤条件,将符合条件的数据筛选出来。
//查询Persons表全部数据
SELECT * FROM Sensors
//查询Persons表中name、age字段数据,并将age命名为d
SELECT id, type AS t FROM Sensors
//将信号类型为'temperature'的数据查询出来
SELECT * FROM Sensors WHERE type = 'temperature'
//将id为偶数的信号信息查询出来
SELECT * FROM Sensors WHERE id % 2 = 0
Group Windows窗口操作
Group Window是和GroupBy语句绑定使用的窗口,和Table API一样,Flink SQL也支持三种窗口类型,分别为Tumble Windows、HOP Windows和Session Windows,其中HOP Windows对应Table API中的Sliding Window,同时每种窗口分别有相应的使用场景和方法。
Tumble Windows
滚动窗口的窗口长度是固定的,且窗口和窗口之间的数据不会重合。SQL中通过TUMBLE(time_attr, interval)关键字来定义滚动窗口,其中参数time_attr用于指定时间属性,参数interval用于指定窗口的固定长度。滚动窗口可以应用在基于EventTime的批量计算和流式计算场景中,和基于ProcessTime的流式计算场景中。窗口元数据信息可以通过在Select语句中使用相关的函数获取,且窗口元数据信息可用于后续的SQL操作,例如可以通过TUMBLE_START获取窗口起始时间,TUMBLE_END获取窗口结束时间,TUMBLE_ROWTIME获取窗口事件时间,TUMBLE_PROCTIME获取窗口数据中的ProcessTime。如以下实例所示,分别创建基于不同时间属性的Tumble窗口。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 创建数据集
val ds: DataStream[(Long, String, Int)] = ...
// 注册表名信息并定义字段proctime为Process Time,定义字段rowtime为rowtime,
tableEnv.registerDataStream("Sensors", ds, 'id, 'type, 'var1, 'proctime.proctime, 'rowtime.rowtime)
//基于proctime创建TUMBLE窗口,并指定10min切分为一个窗口,根据id进行聚合求取var1的和
tableEnv.sqlQuery(SELECT id, SUM(var1) FROM Sensors GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTE), id"
//基于rowtime创建TUMBLE窗口,并指定5min切分为一个窗口,根据id进行聚合求取var1的和
tableEnv.sqlQuery(SELECT id, SUM(var1) FROM Sensors GROUP BY TUMBLE(proctime, INTERVAL '5’ MINUTE), id"
HOP Windows
滑动窗口的窗口长度固定,且窗口和窗口之间的数据可以重合。在Flink SQL中通过HOP(time_attr, interval1, interval2)关键字来定义HOP Windows,其中参数time_attr用于指定使用的时间属性,参数interval1用于指定窗口滑动的时间间隔,参数interval2用于指定窗口的固定大小。其中如果interval1小于interval2,窗口就会发生重叠。HOP Windows可以应用在基于EventTime的批量计算场景和流式计算场景中,以及基于ProcessTime的流式计算场景中。HOP窗口的元数据信息获取的方法和Tumble的相似,例如可以通过HOP_START获取窗口起始时间,通过HOP_END获取窗口结束时间,通过HOP_ROWTIME获取窗口事件时间,通过HOP_PROCTIME获取窗口数据中的ProcessTime。
如以下代码所示,分别创建基于不同时间概念的HOP窗口,并通过相应方法获取窗口云数据。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 创建数据集
val ds: DataStream[(Long, String, Int)] = ...
// 注册表名信息并定义字段proctime为ProcessTime,定义字段rowtime为rowtime,
tableEnv.registerDataStream("Sensors", ds, 'id, 'type, 'var1, 'proctime.proctime, 'rowtime.rowtime)
//基于proctime创建HOP窗口,并指定窗口长度为10min,每1min滑动一次窗口
//然后根据id进行聚合求取var1的和
tableEnv.sqlQuery(SELECT id, SUM(var1) FROM Sensors GROUP BY HOP(proctime, INTERVAL '1' MINUTE,INTERVAL '10' MINUTE), id"
//基于rowtime创建HOP窗口,并指定窗口长度为10min,每5min滑动一次窗口
//根据id进行聚合求取var1的和
tableEnv.sqlQuery(SELECT id, SUM(var1) FROM Sensors GROUP BY HOP(rowtime, INTERVAL '5' MINUTE,INTERVAL '10' MINUTE), id"
//基于rowtime创建HOP窗口,并指定5min切分为一个窗口,根据id进行聚合求取var1的和
tableEnv.sqlQuery(SELECT id,
//获取窗口起始时间并记为wStart字段
HOP_START(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) as wStart,
//获取窗口起始时间并记为wEnd字段
HOP_START(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) as wEnd,
SUM(var1)
FROM Sensors GROUP BY HOP(rowtime, INTERVAL '5’ MINUTE, INTERVAL '10’ MINUTE), id"
Session Windows
Session窗口没有固定的窗口长度,而是根据指定时间间隔内数据的活跃性来切分窗口,例如当10min内数据不接入Flink系统则切分窗口并触发计算。在SQL中通过SESSION(time_attr, interval)关键字来定义会话窗口,其中参数time_attr用于指定时间属性,参数interval用于指定Session Gap。Session Windows可以应用在基于EventTime的批量计算场景和流式计算场景中,以及基于ProcessTime的流式计算场景中。
Session窗口的元数据信息获取与Tumble窗口和HOP窗口相似,通过SESSION_START获取窗口起始时间,SESSION_END获取窗口结束时间,SESSION_ROWTIME获取窗口数据元素事件时间,SESSION_PROCTIME获取窗口数据元素处理时间。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 创建数据集
val ds: DataStream[(Long, String, Int)] = ...
// 注册表名信息并定义字段proctime为ProcessTime,定义字段rowtime为rowtime,
tableEnv.registerDataStream("Sensors", ds, 'id, 'type, 'var1, 'proctime.proctime, 'rowtime.rowtime)
//基于proctime创建SESSION窗口,指定Session Gap为1h
//然后根据id进行聚合求取var1的和
tableEnv.sqlQuery(SELECT id, SUM(var1) FROM Sensors GROUP BY SESSION(proctime, INTERVAL '1' HOUR), id"
//基于rowtime创建SESSION窗口,指定Session Gap为1h
tableEnv.sqlQuery(SELECT id, SUM(var1) FROM Sensors GROUP BY SESSION(proctime, INTERVAL '5' HOUR), id"
//基于rowtime创建SESSION窗口,指定Session Gap为1h,
tableEnv.sqlQuery(SELECT id,
//获取窗口起始时间并记为wStart字段
SESSION_START(proctime, INTERVAL '5' HOUR) as wStart,
//获取窗口起始时间并记为wStart字段
SESSION_END(rowtime, INTERVAL '5' HOUR) as wEnd,
SUM(var1) FROM Sensors
GROUP BY SESSION(rowtime, INTERVAL '5' HOUR), id"
//基于rowtime创建SESSION窗口,指定Session Gap为1h
tableEnv.sqlQuery(SELECT id, SUM(var1) FROM Sensors GROUP BY SESSION(rowtime, INTERVAL '5’ HOUR), id"
Scalar Function
Scalar Function也被称为标量函数,表示对单个输入或者多个输入字段计算后返回一个确定类型的标量值,其返回值类型可以为除TEXT、NTEXT、IMAGE、CURSOR、 TIMESTAMP和TABLE类型外的其他所有数据类型。例如Flink常见的内置标量函数有DATE()、UPPER()、LTRIM()等,同时在自定义标量函数中,用户需要确认Flink内部是否已经实现相应的Scalar Fuction,如果已经实现则可以直接使用;如果没有实现,则在注册自定义函数过程中,需要和内置的其他Scalar Function名称区分,否则会导致注册函数失败,影响应用的正常执行。
定义Scalar Function需要继承org.apache.flink.table.functions包中的ScalarFunction类,同时实现类中的evaluation方法,自定义函数计算逻辑需要在该方法中定义,同时该方法必须声明为public且将方法名称定义为eval。同时在一个ScalarFunction实现类中可以定义多个evaluation方法,只需要保证传递进来的参数不相同即可。
通过定义Add Class并继承ScalarFunction接口,实现对两个数值相加的功能。然后在Table Select操作符和SQL语句中使用。
// 注册输入数据源
tStreamEnv.registerTableSource("InputTable", new InputEventSource)
//在窗口中使用输入数据源,并基于TableSource中定义的EventTime字段创建窗口
val table: Table = tStreamEnv.scan("InputTable")
// 在Object或者静态环境中创建自定义函数
class Add extends ScalarFunction {
def eval(a: Int, b: Int): Int = {//整型数据相加
if (a == null || b == null) null
a + b}
def eval(a: Double, b: Double): Double = {//Double类型数据相加
if (a == null || b == null) null
a + b}
}
// 实例化ADD函数
val add = new Add
// 在Scala Table API中使用自定义函数
val result = table.select('a, 'b, add('a, 'b))
// 在Table Environment中注册自定义函数
tStreamEnv.registerFunction("add", new Add)
//在SQL中使用ADD Scalar函数
tStreamEnv.sqlQuery("SELECT a,b, ADD(a,b) FROM InputTable")
在自定义标量函数过程中,函数的返回值类型必须为标量值,尽管Flink内部已经定义了大部分的基本数据类型以及POJOs类型等,但有些比较复杂的数据类型如果Flink不支持获取,此时需要用户通过继承并实现ScalarFunction类中的getResultType实现getResult-Type方法对数据类型进行转换。例如在Table API和SQL中可能需要使用Types.TIMESTAMP数据类型,但是基于ScalarFunction得出的只能是Long类型,因此可以通过实现getResultType方法对结果数据进行类型转换,从而返回Timestamp类型。
object LongToTimestamp extends ScalarFunction {
def eval(t: Long): Long = { t % 1000}
override def getResultType(signature: Array[Class[_]]): TypeInformation[_]
= {
Types.TIMESTAMP
}}
Table Function
和Scalar Function不同,Table Function将一个或多个标量字段作为输入参数,且经过计算和处理后返回的是任意数量的记录,不再是单独的一个标量指标,且返回结果中可以含有一列或多列指标,根据自定义Table Funciton函数返回值确定,因此从形式上看更像是Table结构数据。
定义Table Function需要继承org.apache.flink.table.functions包中的TableFunction类,并实现类中的evaluation方法,且所有的自定义函数计算逻辑均在该方法中定义,需要注意方法必须声明为public且名称必须定义为eval。另外在一个TableFunction实现类中可以实现多个evaluation方法,只需要保证参数不相同即可。
在Scala语言Table API中,Table Function可以用在Join、LeftOuterJoin算子中,Table Function相当于产生一张被关联的表,主表中的数据会与Table Function所有产生的数据进行交叉关联。其中LeftOuterJoin算子当Table Function产生结果为空时,Table Function产生的字段会被填为空值。
在应用Table Function之前,需要事先在TableEnvironment中注册Table Function,然后结合LATERAL TABLE关键字使用,根据语句结尾是否增加ON TRUE关键字来区分是Join还是leftOuterJoin操作。如代码清单7-13所示,通过自定义SplitFunction Class继承TableFunction接口,实现根据指定切割符来切分输入字符串,并获取每个字符的长度和HashCode的功能,然后在Table Select操作符和SQL语句中使用定义的SplitFunction。
// 注册输入数据源
tStreamEnv.registerTableSource("InputTable", new InputEventSource)
// 在Scala Table API中使用自定义函数
val split = new SplitFunction(",")
//在join函数中调用Table Function,将string字符串切分成不同的Row,并通过as指定字段名称为str,length,hashcode
table.join(split('origin as('string, 'length, 'hashcode)))
.select('origin, 'str, 'length, 'hashcode)
table.leftOuterJoin(split('origin as('string, 'length, 'hashcode)))
.select('origin, 'str, 'length, 'hashcode)
// 在Table Environment中注册自定义函数,并在SQL中使用
tStreamEnv.registerFunction("split", new SplitFunction(","))
//在SQL中和LATERAL TABLE关键字一起使用Table Function
//和Table API的JOIN一样,产生笛卡儿积结果
tStreamEnv.sqlQuery("SELECT origin, str, length FROM InputTable, LATERAL TABLE(split(origin)) as T(str, length,hashcode)")
//和Table API中的LEFT OUTER JOIN一样,产生左外关联结果
tStreamEnv.sqlQuery("SELECT origin, str, length FROM InputTable, LATERAL TABLE(split(origin)) as T(str, length,hashcode) ON TRUE")
和Scalar Function一样,对于不支持的输出结果类型,可以通过实现TableFunction接口中的getResultType()对输出结果的数据类型进行转换,具体可以参考ScalarFunciton定义。
Aggregation Function
Flink Table API中提供了User-Defined Aggregate Functions (UDAGGs),其主要功能是将一行或多行数据进行聚合然后输出一个标量值,例如在数据集中根据Key求取指定Value的最大值或最小值。
自定义Aggregation Function需要创建Class实现org.apache.flink.table.functions包中的AggregateFunction类。关于AggregateFunction的接口定义如代码清单7-14所示可以看出AggregateFunction定义相对比较复杂。
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
// 创建Accumulator(强制)
public ACC createAccumulator();
// 累加数据元素到ACC中(强制)
public void accumulate(ACC accumulator, [user defined inputs]);
// 从ACC中去除数据元素(可选)
public void retract(ACC accumulator, [user defined inputs]);
// 合并多个ACC(可选)
public void merge(ACC accumulator, java.lang.Iterable<ACC> its);
//获取聚合结果(强制)
public T getValue(ACC accumulator);
//重置ACC(可选)
public void resetAccumulator(ACC accumulator);
//如果只能被用于Over Window则返回True(预定义)
public Boolean requiresOver = false;
//指定统计结果类型(预定义)
public TypeInformation<T> getResultType = null;
//指定ACC数据类型(预定义)
public TypeInformation<T> getAccumulatorType = null;
}
在AggregateFunction抽象类中包含了必须实现的方法createAccumulator()、accumulate()、getValue()。其中,createAccumulator()方法主要用于创建Accumulator,以用于存储计算过程中读取的中间数据,同时在Accumulator中完成数据的累加操作;accumulate()方法将每次接入的数据元素累加到定义的accumulator中,另外accumulate()方法也可以通过方法复载的方式处理不同类型的数据;当完成所有的数据累加操作结束后,最后通过getValue()方法返回函数的统计结果,最终完成整个AggregateFunction的计算流程。
除了以上三个必须要实现的方法之外,在Aggregation Function中还有根据具体使用场景选择性实现的方法,如retract()、merge()、resetAccumulator()等方法。其中,retract()方法是在基于Bouded Over Windows的自定义聚合算子中使用;merge()方法是在多批聚合和Session Window场景中使用;resetAccumulator()方法是在批量计算中多批聚合的场景中使用,主要对accumulator计数器进行重置操作。
因为目前在Flink中对Scala的类型参数提取效率相对较低,因此Flink建议用户尽可能实现Java语言的Aggregation Function,同时应尽可能使用原始数据类型,例如Int、Long等,避免使用复合数据类型,如自定义POJOs等,这样做的主要原因是在Aggregation Function计算过程中,期间会有大量对象被创建和销毁,将对整个系统的性能造成一定的影响。
自定义数据源
Flink Table API可以支持很多种数据源的接入,除了能够使用已经定义好的TableSource数据源之外,用户也可以通过自定义TableSource完成从其他外部数据介质(数据库,消息中间件等)中接入流式或批量类型的数据。Table Source在TableEnviroment中定义好后,就能够在Table API和SQL中直接使用。
与Table Source相似的在Table API中提供通过TableSink接口定义对Flink中数据的输出操作,包括将数据输出到外部的存储系统中,例如常见的数据库、消息中间件及文件系统等。用户实现TableSink接口并在TableEnvironment中注册,就能够在Table API和SQL中获取TableSink对应的Table,然后将数据输出到TableSink对应的存储介质中。
TableSource定义
TableSource是在Table API中专门针对获取外部数据提出的通用数据源接口。TableSource定义中将数据源分为两类,一种为 StreamTableSource,主要对应流式数据源的数据接入;另外一种BatchTableSource,主要对应批量数据源的数据接入。
TableSource<T> {
public TableSchema getTableSchema();
public TypeInformation<T> getReturnType();
public String explainSource();
}
可以看出在TableSource接口中,共有getTableSchema()、getReturnType()和explainSource()三个方法需要实现。其中,getTableSchema()方法用于指定数据源的Table Schema信息,例如字段名称和类型等;getReturnType()方法用于返回数据源中的字段数据类型信息,所有的返回字段必须是Flink TypeInformation支持的类型;explainSource()方法用于返回TableSource的描述信息,功能类似于SQL中的explain方法。
在Table API中,将TableSource分为主要针对流式数据接入的StreamTableSource和主要针对批量数据接入的BatchTableSource。其中StreamTableSource是从DataStream数据集中将数据转换成Table,BatchTableSource是从DataSet数据集中将数据转换成Table,以下分别介绍每种TableSource的定义和使用。
StreamTableSource
如以下代码所示StreamTableSource是TableSource的子接口,在StreamTableSource中可以通过getDataStream方法将数据源从外部介质中抽取出来并转换成DataStream数据集,且对应的数据类型必须是TableSource接口中getReturnType方法中返回的数据类型。StreamTableSource可以看成是对DataStream API中SourceFunciton的封装,并且在转换成Table的过程中增加了Schema信息。
StreamTableSource[T] extends TableSource[T] {
//定义获取DataStream的逻辑
def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
通过实现StreamTableSource接口,完成了从外部数据源中获取DataStream数据集,并在getTableSchema方法中指定数据源对应的Table的Schema结构信息。
// 定义InputEventSource
class InputEventSource extends StreamTableSource[Row] {
override def getReturnType = {
val names = Array[String]("id", "value")
val types = Array[TypeInformation[_]](Types.STRING, Types.LONG)
Types.ROW(names, types)
}
//实现getDataStream方法,创建输入数据集
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
// 定义获取DataStream数据集
val inputStream: DataStream[(String, Long)] = execEnv.addSource(...)
//将数据集转换成指定数据类型
val stream: DataStream[Row] = inputStream.map(t => Row.of(t._1, t._2))
stream
}
//定义TableSchema信息
override def getTableSchema: TableSchema = {
val names = Array[String]("id", "value")
val types = Array[TypeInformation[_]](Types.STRING, Types.LONG)
new TableSchema(names, types)
}
}
定义好的StreamTableSource之后就可以在Table API和SQL中使用,在Table API中通过registerTableSource方法将定义好的TableSource注册到TableEnvironment中,然后就可以使用scan操作符从TableEnvironment中获取Table在SQL中则直接通过表名引用注册好的Table即可。
// 注册输入数据源
tStreamEnv.registerTableSource("InputTable", new InputEventSource)
//在窗口中使用输入数据源,并基于TableSource中定义的EventTime字段创建窗口
val table: Table = tStreamEnv.scan("InputTable")
BatchTableSource
和StreamTableSource相似,BatchTableSource接口具有了getDataSet()方法,主要将外部系统中的数据读取并转换成DataSet数据集,然后基于对DataSet数据集进行处理和转换,生成BatchTableSource需要的数据类型。其中DataSet数据集中的数据格式也必须要和TableSource中getReturnType返回的数据类型一致。BatchTableSource接口定义如下。
BatchTableSource<T> implements TableSource<T> {
public DataSet<T> getDataSet(ExecutionEnvironment execEnv);
}
其中,BatchTableSource本质上也是实现DataSet底层的SourceFunction,通过实例化BatchTableSource完成对外部批量数据的接入,然后在Table API中应用之后定义好的BatchTableSource。
// 创建InputEventSource
class InputBatchSource extends BatchTableSource[Row] {
//定义结果类型信息
override def getReturnType = {
val names = Array[String]("id", "value")
val types = Array[TypeInformation[_]](Types.STRING, Types.LONG)
Types.ROW(names, types)
}
//获取DataSet数据集
override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
//从外部系统中读取数据
val inputDataSet = execEnv.createInput(...)
val dataSet: DataSet[Row] = inputDataSet.map(t => Row.of(t._1, t._2))
dataSet
}
//定义TableSchema信息
override def getTableSchema: TableSchema = {
val names = Array[String]("id", "value")
val types = Array[TypeInformation[_]](Types.STRING, Types.LONG)
new TableSchema(names, types)
}}}
BathTableSource和StreamTableSource的使用方式一样,也需要在TableEnvironment中注册已经创建好的TableSource信息,然后就可以在Table API或SQL中使用TableSource对应的表结构。
TableSink定义
与TableSource接口定义相似,TableSink接口的主要功能是将Table中的数据输出到外部系统中。如代码清单7-18所示TableSink接口中主要包含getOutputType、getFieldNames、getFieldTypes和configure四个方法。其中,getOutputType定义了输出的数据类型,且类型必须是TypeInformation所支持的类型;getFieldNames方法定义了当前Table中的字段名称;getFieldTypes方法和getFieldNames对应,返回了Table中字段的数据类型;configure方法则定义了输出配置,其中fieldNames定义了输出字段名称,fieldTypes定义了输出数据类型。
TableSink<T> {
public TypeInformation<T> getOutputType();
public String[] getFieldNames();
public TypeInformation[] getFieldTypes();
public TableSink<T> configure(String[] fieldNames, TypeInformation[] fieldTypes);
}
在TableSink接口中,分别通过BatchTableSink和StreamingTableSink两个子接口定义和实现对批数据和流数据的输出功能。
StreamTableSink
可以通过实现StreamTableSink接口将Table中的数据以流的形式输出。在Stream-TableSink接口中emitDataStream方法定义了Table中输出数据的逻辑,实际是将Data-Stream数据集发送到对应的存储系统中。另外根据Table中数据记录更新的方式不同,将StreamTableSink分为AppendStreamTableSink、RetractStreamTableSink以及UpsertStreamTableSink三种类型。
AppendStreamTableSink
AppendStreamTableSink只输出在Table中所有由于INSERT操作所更新的记录,对于类似于DELTE操作更新的记录则不输出。如果用户同时输出了INSERT和DELETE操作的数据,则系统会抛出TableException异常信息。AppendStreamTableSink接口定义如下。
AppendStreamTableSink<T> implements TableSink<T> {
public void emitDataStream(DataStream<T> dataStream);
}
RetractStreamTableSink
RetractStreamTableSink同时输出INSERT和DELETE操作更新的记录,输出结果会被转换为Tuple2< Boolean, T>的格式。其中,Boolean类型字段用于对结果进行标记,如果是INSERT操作更新的记录则标记为true,反之DELETE操作更新的记录则标记为false;第二个字段为具体的输出数据。和AppendStreamTableSink相比RetractStreamTableSink则更加灵活,可以将全部操作更新的数据输出,并把筛选和处理的逻辑交给用户控制。RetractStreamTableSink接口定义如以下代码所示,接口中包括getRecordType和emitDataStream两个方法,getRecordType主要返回输出数据集对应的数据类型,emitDataStream定义数据输出到外部系统的逻辑。
RetractStreamTableSink<T> implements TableSink<Tuple2<Boolean, T>> {
public TypeInformation<T> getRecordType();
public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
UpsertStreamTableSink
和RetractStreamTableSink相比,UpsertStreamTableSink增加了对与UPDATE操作对应的记录输出的支持。该接口能够输出INSERT、UPDATE、DELETE三种操作更新的记录。使用UpsertStreamTableSink接口,需要指定输出相应的唯一主键keyFields,可以是单个字段的或者多个字段的组合,如果KeyFields不唯一且AppendOnly为false时,该接口中的方法会抛出TableException。
UpsertStreamTableSink<T> implements TableSink<Tuple2<Boolean, T>> {
//指定对应的keyFields,需要用户保持唯一性
public void setKeyFields(String[] keys);
// 设定Table输出模式是否为AppendOnly
public void setIsAppendOnly(boolean isAppendOnly);
//指定Table中的数据类型
public TypeInformation<T> getRecordType();
//定义数据输出逻辑
public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
以上是UpsertStreamTableSink的接口定义,UpsertStreamTableSink接口中emitDataStream方法中的输入数据集的格式为Tuple2<Boolean,T>类型、其中,第一个Boolean字段标记UNPSERT更新为true,DELETE字段更新为false;第二个字段为类型T的输出数据记录。
BatchTableSink
BatchTableSink接口主要用于对批量数据的输出,和StreamTableSink不同的是,该接口底层操作的是DataSet数据集。BatchTableSink中没有区分是INSERT还是DELETE等操作更新的数据,而是全部都统一输出。BatchTableSink接口定义如下。在BatchTableSink接口中通过实现emitDataSet方法定义DataSet
BatchTableSink<T> implements TableSink<T> {
public void emitDataSet(DataSet<T> dataSet);
}
TableFactory定义
TableFactory主要作用是将事先定义好的TableSource和TableSink实现类封装成不同的Factory,然后通过字符参数进行配置,这样在Table API或SQL中就可以使用配置参数来引用定义好的数据源。TableFactory使用Java的SPI机制为TableFactory来寻找接口实现类,因此需要保证在META-INF/services/资源目录中包含所有与TableFactory实现类对应的配置列表,所有的TableFactory实现类都将被加载到Classpath中,然后应用中就能够通过使用TableFactory来读取输出数据集在Flink集群启动过程。
TableFactory接口定义中包含requiredContext和supportedProperties两个方法,其中requiredContext定义了当前实现的TableFactory中的Context上下文,同时通过Key-Value的方式来标记TableFactory,例如connector.type=dev-system,Flink应用中配置参数需要和Context具有相同的Key-Value参数才能够匹配到TableSource并使用,否则不会匹配相应的TableFactory实现类。在supportedProperties方法中定义了当前TableFactory中需要使用到的参数,如果Flink应用中配置的参数不属于当前的TableFactory,便会抛出异常。需要注意的是,Context参数中的Key不能和supportedProperties参数名称相同。如代码清单7-19所示,通过定义SocketTableSourceFactory实现类,完成从Socket端口中接入数据,并在Table API或SQL Client使用。
class SocketTableSourceFactory extends StreamTableSourceFactory[Row] {
//指定TableFactory上下文参数
override def requiredContext(): util.Map[String, String] = {
val context = new util.HashMap[String, String]()
context.put("update-mode", "append")
context.put("connector.type", "dev-system")
context
}
//指定TableFactory用于处理的参数集合
override def supportedProperties(): util.List[String] = {
val properties = new util.ArrayList[String]()
properties.add("connector.host")
properties.add("connector.port")
properties
}
//创建SocketTableSource实例
override def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[Row] = {
val socketHost = properties.get("connector.host")
val socketPort = properties.get("connector.port")
new SocketTableSource(socketHost, socketPort)
}
}
在SQL Client中使用TableFactory
SQL Client能够支持用户在客户端中使用SQL编写Flink应用,从而可以在SQL Client中查询记录实现和定义好的SocketTableSource中数据的数据源
tables:
- name: SocketTable
type: source //指定Table类型是source还是sink
update-mode: append
connector:
type: dev-system
host: localhost
port: 10000
配置文件通过Yaml文件格式进行配置,需要放置在SQL Client environment文件中,文件中的配置项将直接被转换为扁平的字符配置,然后传输给相应的TableFactory。注意,需要将对应的TableFactory事先注册至Flink执行环境中,然后才能将配置文件项传递给对应的TableFactory,进而完成数据源的构建。
在Table & SQL API中使用TableFactory
如果用户想在Table & SQL API中使用TableFactory定义的数据源,也需要将对应的配置项传递给对应的TableFactory。为了安全起见,Flink提供了ConnectorDescriptor接口让用户定义连接参数,然后转换成字符配置项
class MySocketConnector(host:String,port:String) extends
ConnectorDescriptor("dev-system", 1, false) {
override protected def toConnectorProperties(): Map[String, String] = {
val properties = new HashMap[String, String]
properties.put("connector.host", host)
properties.put("connector.port", port)
properties
}}
创建MySocketConnector之后,在Table & SQL API中通过ConnectorDescriptor连接Connector,然后调用registerTableSource将对应的TableSource注册到TableEnvironment中。接下来就可以在Table & SQL API中正式使用注册的Table,以完成后续的数据处理了。
val tableEnv: StreamTableEnvironment = // ...
tableEnv.connect(new MySocketConnector("localhost","10000"))
.inAppendMode()
.registerTableSource("MySocketTable")
flink开发环境执行sql及生产环境提交sql文件
flink提供了sql-client.sh工具可直接操作sql,该工具一般在开发环境用于调试,在生产环境还是要打成jar文件。为了避免在java文件中写大量sql,我们可以将sql提取出来放到一个后缀是.sql的文件中,自己编辑java代码读取该sql文件。然后将java代码与sql文件一块打成jar,部署到flink环境中。
开发环境使用sql-client.sh
- 启动flink
start-cluster.sh
- 启动sqlclient
sql-client.sh
- 测试sqlclient
- 启动kafka,并创建生产者与消费者
kafka-console-producer.sh --broker-list 192.168.23.190:9092 --topic topic_source kafka-console-consumer.sh --bootstrap-server 192.168.23.190:9092 --topic topic_sink
- 在sqlclient中创建表source_sensor,获取topic_source的数据
create table source_sensor (id string,ts bigint,vc int) with('connector' = 'kafka', 'topic' = 'topic_source', 'properties.bootstrap.servers' = '192.168.23.190:9092', 'scan.startup.mode' = 'latest-offset', 'format' = 'json');
- 在sqlclient中创建表sink_sensor,将数据传送给topic_sink
create table sink_sensor (id string,ts bigint,vc int) with('connector' = 'kafka', 'topic' = 'topic_sink', 'properties.bootstrap.servers' = '192.168.23.190:9092', 'format' = 'json');
- 将source_sensor的数据存储到sink_sensor
insert into sink_sensor select * from source_sensor;
生产环境将sql打成jar提交
public class Main { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env); tabEnv.executeSql("create table source_sensor (id string,ts bigint,vc int)\n" + "with('connector' = 'kafka',\n" + " 'topic' = 'topic_source',\n" + " 'properties.bootstrap.servers' = '192.168.23.190:9092',\n" + " 'scan.startup.mode' = 'latest-offset',\n" + " 'format' = 'json')"); tabEnv.executeSql("create table sink_sensor (id string,ts bigint,vc int)\n" + "with('connector' = 'kafka',\n" + " 'topic' = 'topic_sink',\n" + " 'properties.bootstrap.servers' = '192.168.23.190:9092',\n" + " 'format' = 'json')"); tabEnv.executeSql("insert into sink_sensor select * from source_sensor"); } }