Flink组件栈

2021/04/10 Flink

Flink编程SQL

通过对前面章节的学习,我们对Flink的基本编程接口有了一定的认识和了解,在本章将重点介绍Flink在不同的应用领域中所提供的组件栈,其中包括构建复杂事件处理应用的FlinkCEP组件栈,构建机器学习应用的FlinkML组件栈,以及构建图计算应用的Gelly组件栈。这些组件栈本质上都是构建在DataSet或DataStream接口之上的,其主要目的就是方便用户构建不同应用领域的应用。

Flink复杂事件处理

复杂事件处理(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的事件关系序列库,并利用过滤、关联、聚合等技术,最终由简单事件产生高级事件,并通过模式规则的方式对重要信息进行跟踪和分析,从实时数据中发掘有价值的信息。复杂事件处理主要应用于防范网络欺诈、设备故障检测、风险规避和智能营销等领域。目前主流的CEP工具有Esper、Jboss Drools和商业版的MicroSoft StreamInsight等,Flink基于DataStrem API提供了FlinkCEP组件栈,专门用于对复杂事件的处理,帮助用户从流式数据中发掘有价值的信息。

基础概念

环境准备

在使用FlinkCEP组件之前,需要将FlinkCEP的依赖库引入项目工程中。与FlinkCEP对应的Maven Dependence如下(将如下配置添加到本地Maven项目工程的Pom.xml文件中即可)

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep-scala_2.11</artifactId>
  <version>1.7.2</version>
</dependency>

基本概念

事件定义

  • 简单事件 简单事件存在于现实场景中,主要的特点为处理单一事件,事件的定义可以直接观察出来,处理过程中无须关注多个事件之间的关系,能够通过简单的数据处理手段将结果计算出来。例如通过对当天的订单总额按照用户维度进行汇总统计,超过一定数量之后进行报告。这种情况只需要计算每个用户每天的订单金额累加值,达到条件进行输出即可。
  • 复杂事件 相对于简单事件,复杂事件处理的不仅是单一的事件,也处理由多个事件组成的复合事件。复杂事件处理监测分析事件流(Event Streaming),当特定事件发生时来触发某些动作。

事件关系

复杂事件中事件与事件之间包含多种类型关系,常见的有时序关系、聚合关系、层次关系、依赖关系及因果关系等。

  • 时序关系 动作事件和动作事件之间,动作事件和状态变化事件之间,都存在时间顺序。事件和事件的时序关系决定了大部分的时序规则,例如A事件状态持续为1的同时B事件状态变为0等。
  • 聚合关系 动作事件和动作事件之间,状态事件和状态事件之间都存在聚合关系,即个体聚合形成整体集合。例如A事件状态为1的次数为10触发预警。
  • 层次关系 动作事件和动作事件之间,状态事件和状态事件之间都存在层次关系,即父类事件和子类事件的层次关系,从父类到子类是具体化的,从子类到父类是泛化的。
  • 依赖关系 事物的状态属性之间彼此的依赖关系和约束关系。例如A事件状态触发的条件前提是B事件触发,则A与B事件之间就形成了依赖关系。
  • 因果关系 对于完整的动作过程,结果状态为果,初始状态和动作都可以视为原因。例如A事件状态的改变导致了B事件的触发,则A事件就是因,而B事件就是果。

事件处理

复杂事件处理的目的是通过相应的规则对实时数据执行相应的处理策略,这些策略包括了推断、查因、决策、预测等方面的应用。

  • 事件推断 主要利用事物状态之间的约束关系,从一部分状态属性值可以推断出另一部分的状态属性值。例如由三角形一个角为90度及另一个角为45度,可以推断出第三个角为45度。
  • 事件查因 当出现结果状态,并且知道初始状态,可以查明某个动作是原因;同样当出现结果状态,并且知道之前发生了什么动作,可以查明初始状态是原因。当然反向的推断要求原因对结果来说必须是必要条件。
  • 事件决策 想得到某个结果状态,知道初始状态,决定执行什么动作。该过程和规则引擎相似,例如某个规则符合条件后触发行动,然后执行报警等操作。
  • 事件预测 该种情况知道事件初始状态,以及将要做的动作,预测未发生的结果状态。例如气象局根据气象相关的数据预测未来的天气情况等。

Pattern API

FlinkCEP中提供了Pattern API用于对输入流数据的复杂事件规则定义,并从事件流中抽取事件结果。通过使用Pattern API构建CEP应用程序,其中包括输入事件流的创建,以及Pattern接口的定义,然后通过CEP.pattern方法将定义的Pattern应用在输入的Stream上,最后使用PatternStream.select方法获取触发事件结果。

以下实例是将温度大于35度的信号事件抽取出来,并产生事件报警,最后将结果输出到外部数据集中。

//创建输入事件流
val inputStream: DataStream[Event] = ...
//定义Pattern接口
val pattern = Pattern
  .begin[Event]("start") 
  .where(_.getType == "temperature")
  .next("middle")
  .subtype(classOf[TempEvent])
  .where(_.getTemp >= 35.0)
  .followedBy("end")
  .where(_.getName == "end")
//将创建好的Pattern应用在输入事件流上
val patternStream = CEP.pattern(inputStream, pattern)
//获取触发事件结果
val result: DataStream[Result] = patternStream.select(getResult(_))

模式定义

个体Pattern可以是单次执行模式,也可以是循环执行模式。单词执行模式一次只接受一个事件,循环执行模式可以接收一个或者多个事件。通常情况下,可以通过指定循环次数将单次执行模式变为循环执行模式。每种模式能够将多个条件组合应用到同一事件之上,条件组合可以通过where方法进行叠加。

个体Pattern都是通过begin方法定义的,例如以下通过Pattern.begin方法定义基于Event事件类型的Pattern,其中是指定的PatternName象。

val start = Pattern.begin[Event]("start_pattern")

下一步通过Pattern.where()方法在Pattern上指定Condition,只有当Condition满足之后,当前的Pattern才会接受事件。

start.where(event => event.getType == "temperature")

指定循环次数

对于已经创建好的Pattern,可以指定循环次数,形成循环执行的Pattern,且有3种方式来指定循环方式。

  • times:可以通过times指定固定的循环执行次数。
    //指定循环触发4次
    start.times(4);
    //可以执行触发次数范围,让循环执行次数在该范围之内
    start.times(2, 4);
    
  • optional:也可以通过optional关键字指定要么不触发要么触发指定的次数。
    start.times(4).optional();
    start.times(2, 4).optional();
    
  • greedy:可以通过greedy将Pattern标记为贪婪模式,在Pattern匹配成功的前提下,会尽可能多地触发。
    //触发2、3、4次,尽可能重复执行
    start.times(2, 4).greedy();
    //触发0、2、3、4次,尽可能重复执行
    start.times(2, 4).optional().greedy();
    
  • oneOrMore:可以通过oneOrMore方法指定触发一次或多次。
    // 触发一次或者多次
    start.oneOrMore();
    //触发一次或者多次,尽可能重复执行
    start.oneOrMore().greedy();
    // 触发0次或者多次
    start.oneOrMore().optional();
    // 触发0次或者多次,尽可能重复执行
    start.oneOrMore().optional().greedy();
    
  • timesOrMor:通过timesOrMore方法可以指定触发固定次数以上,例如执行两次以上。
    // 触发两次或者多次
    start.timesOrMore(2);
    // 触发两次或者多次,尽可能重复执行
    start.timesOrMore(2).greedy();
    // 不触发或者触发两次以上,尽可能重复执行
    start.timesOrMore(2).optional().greedy();
    

定义模式条件

每个模式都需要指定触发条件,作为事件进入到该模式是否接受的判断依据,当事件中的数值满足了条件时,便进行下一步操作。在FlinkCFP中通过pattern.where()、pattern.or()及pattern.until()方法来为Pattern指定条件,且Pattern条件有Iterative Conditions、Simple Conditions及Combining Conditions三种类型。

  • 迭代条件:Iterative Conditions能够对前面模式所有接收的事件进行处理,根据接收的事件集合统计出计算指标,并作为本次模式匹配中的条件输入参数。如代码清单8-2所示,通过subtype将Event事件转换为TempEvent,然后在where条件中通过使用ctx.getEventsForPattern(…)方法获取“middle”模式所有接收的Event记录,并基于这些Event数据之上对温度求取平均值,然后判断当前事件的温度是否小于平均值。
    middle.oneOrMore()
    .subtype(classOf[TempEvent])
    .where(
      (value, ctx) => {
        lazy val avg = ctx.getEventsForPattern("middle").map(_.getValue).avg
        value.getName.startsWith("condition") && value.getPrice < avg
          }
      )
    
  • 简单条件:Simple Condition继承于Iterative Condition类,其主要根据事件中的字段信息进行判断,决定是否接受该事件。如以下代码将Sensor事件中Type为temperature的事件筛选出来。
    start.where(event => event.getType == "temperature"))
    

可以通过subtype对事件进行子类类型转换,然后在where方法中针对子类定义模式条件。

start.subtype(classOf[TempEvent]).where(tempEvent => event.getValue > 10)
  • 组合条件:组合条件是将简单条件进行合并,通常情况下也可以使用where方法进行条件的组合,默认每个条件通过AND逻辑相连。如果需要使用OR逻辑,如以下代码直接使用or方法连接条件即可。
    pattern.where(event => event.getName.startWith("foo").or(event => 
    event.getType == "temperature")
    
  • 终止条件 如果程序中使用了oneOrMore或者oneOrMore().optional()方法,则必须指定终止条件,否则模式中的规则会一直循环下去,如下终止条件通过until()方法指定。
    pattern.oneOrMore().until(event => event.getName == "end")
    

    需要注意的是,在上述迭代条件中通过调用ctx.getEventsForPattern(“middle”)的过程中,成本相对较高,会产生比较大的性能开销,因此建议用户尽可能少地使用该方式。

模式序列

Flink Gelly图计算应用

早在2010年,Google就推出了著名的分布式图计算框架Pregel,之后Apache Spark社区也推出GraphX等图计算组件库,以帮助用户有效满足图计算领域的需求。Flink则通过封装DataSet API,形成图计算引擎Flink Gelly。同时Gelly中的Graph API,基本涵盖了从图创建,图转换到图校验等多个方面的图操作接口,让用户能够更加简便高效地开发图计算应用。本节将重点介绍如何通过Flink Gelly组件库来构建图计算应用。

基本概念

在使用Flink Gelly组件库之前,需要将Flink Gelly依赖库添加到工程中。用户如果使用Maven作为项目管理工具,需要在本地工程的Pom.xml文件中添加如下Maven Dependency配置。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-gelly_2.11</artifactId>
  <version>1.7.0</version>
</dependency>

对于使用Scala语言开发Flink Gelly应用的用户,需要添加如下Maven Dependency配置。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-gelly-scala_2.11</artifactId>
  <version>1.7.0</version>
</dependency>

数据结构

FlinkML机器学习应用

机器学习是一门多领域交叉学科,涵盖设计概率论、统计学、逼近轮等多门学科。机器学习主要是用来设计和分析一些可以让机器自动学习的算法,这些算法是从数据中自动分析获取规律,并利用已经学习的规律对未知数据进行预测,产生预测结果。而机器学习共分为几大种类型,分别是监督学习、半监督学习、无监督学习以及强化学习,每种学习类型具有相应的算法集。例如对于无监督学习来说,对应的是聚类算法,有监督学习则对应的是分类与回归算法等。

和其他分布式处理框架一样,Flink基于DataSet API之上,封装出针对机器学习领域的组件栈FlinkML。在FlinkML中提供了诸如分类、聚类、推荐等常用的算法,用户可以直接使用这些算法构建相应的机器学习应用。虽然目前FlinkML中集成的算法相对较少,但是Flink社区会在未来的版本中陆续集成更多的算法。

基本概念

环境准备

FlinkML组件库作为Flink的应用库,并没有直接集成在集群环境中,因此需要在工程中单独引入对应的Maven依赖配置,可以将代码清单8-9的配置引入到工程Pom.xml中。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-ml_2.11</artifactId>
  <version>1.7.0</version>
</dependency>

数据结构

和SparkMLib一样,FlinkML也是通过使用Breeze库实现了底层的向量和矩阵等数据结构,用以辅助构建算法模型,其中Breeze库提供了Vector/Matrix的实现以及对应的计算接口。在FlinkML中则使用了自己定义的Vector和Matrix,只是在具体的计算过程中通过转换为Breeze的形式进行运算。

在FlinkML中可以通过两种方式来构建Vector数据,分别是读取LibSVM数据或读取文件数据转换成DataSet[String]数据集,然后再通过Map算子将数据集转换为DataSet[LabelVector]数据集。

通过读取LibSVM数据

FlinkML中提供了MLUtils类的readLibSVM方法,用于读取LIBSVM格式类型的数据,同时提供writeLibSVM方法将Flink中的数据以LIBSVM格式输出到外部文件系统中。

import org.apache.flink.ml.MLUtils
//读取LibSVM数据
val trainData: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "/path/svmfile1")
val predictData: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "/path/svmfile2")
//写出LibSVM数据
val svmData: DataSet[LabeledVector] = ...
val dataSink:DataSink[String] = MLUtils. writeLibSVM("/path/svmfile2", svmData)

读取CSV文件转换

可以通过DataSet API中提供读取文件数据的方式,将外部数据读取到Flink系统中,并通过Map函数将数据集转换成LabelVector类型。如以下代码通过readCsvFile方法将数据集读取进来并转换为LabeledVector数据结构。

val trainCsvData = env.readCsvFile[(String, String, String, 
String)]("/path/svm_train.data")
val trainData:DataSet[LabeledVector] = trainCsvData.map { data =>
  val numList = data.productIterator.toList.map(_.asInstanceOf[String].toDouble)
  LabeledVector(numList(3), DenseVector(numList.take(3).toArray))
}

有监督学习算子

目前在Flink有监督算子分类算法中,FlinkML已经支持了SVM算法、多元线性回归算法以及K-Nearest算法等常用分类算法。

SVM算法

在机器学习中,支持向量机(SVM)是在分类与回归分析中分析数据的监督式学习模型。SVM模型是将实例表示为空间中的点,通过映射使得单独类别的实例被尽可能宽的明显间隔分开,然后将新的实例映射到同一空间中,并基于它们落在间隔的哪一侧来预测所属类别。除了进行线性分类外,SVM还可以使用所谓的核技巧有效地进行非线性分类,并将其输入隐式映射到高维特征空间中。

在Flink中SVM是一个预测函数,含有fit和predict两个方法,其中fit方法是基于训练数据集进行转化和训练,生成SVM模型;而Predict方法使用已经训练好的模型进行预测,并生成LabelVector类型预测结果。

参数配置

应用举例

通过给定训练和测试数据集,使用SVM算法构建分类模型并使predict进行预测。

val env = ExecutionEnvironment.getExecutionEnvironment
//指定训练数据集和测试数据集
val trainLibSvmFile: String = ...
val testLibSvmFile: String = ...
// 读取训练LibSVM数据集
val trainingDS: DataSet[LabeledVector] = env.readLibSVM(trainLibSvmFile)
// 创建SVM算子,并制定Blocks数量为10
val svm = SVM().setBlocks(10)
// 训练SVM模型
svm.fit(trainingDS)
// 读取SVM测试数据集,对模型进行评估
val testingDS= env.readLibSVM(pathToTestingFile).map(_.vector)
// 通过predict方法对测试数据集进行预测,产生预测结果
val predictionDS: DataSet[(Vector, Double)] = svm.predict(testingDS)

多元线性回归

多元线性回归模型主要用回归方程描述一个因变量与多个自变量的依存关系。和其他预测算法一样,在Flink ML中多元线性回归算法中包含fit和predict两个方法,分别对用户训练模型和基于模型进行预测,其中fit方法传入的是LablePoint数据结构的dataset数据集,并返回模型结果。predict方法中可以有所有Vector接口的实现类,方法结果返回的是含有输入参数和Double类型的打分结果。

参数配置

基于给定的训练和测试数据集,通过使用多元线性回归完成模型的构建和预测。

// 创建多元线性回归学习器
val mlr = MultipleLinearRegression()
.setIterations(10)
.setStepsize(0.5)
.setConvergenceThreshold(0.001)
// 创建训练集和测试集
val trainingDS: DataSet[LabeledVector] = ...
val testingDS: DataSet[Vector] = ...
// 将定义好的模型适配到数据集上进行模型训练
mlr.fit(trainingDS)
// 使用测试数据集进行模型预测,产生预测结果
val predictions = mlr.predict(testingDS)

数据预处理

在FlinkML中实现了基本的数据预处理方法,其中包括多项式特征、标准化、区间化等常用方法,这些算子都继承于Transformer接口,并使用fit方法从训练集学习模型(例如,归一化的平均值和标准偏差)。

特征加工的过程中,通过增加一些输入数据的非线性特征来增加模型的复杂度通常是非常有效的,可以获得特征的更高维度和相互间的关系项。当多项式特征比较少的时候,可以对很少的特征进行多项式变化,产生更多的特征。多项式变换就是把现有的特征排列组合相乘,例如如果是degree为2的变换,则会把现有的特征中,抽取两个相乘,并获得所有组合的结果。

val env = ExecutionEnvironment.getExecutionEnvironment
  // 获取训练数据集
  val trainingDS: DataSet[LabeledVector] = env.fromElements(LabeledVector(1.2,Vector()))
  // 设定多项式转换维度为3
  val polyFeatures = PolynomialFeatures()
    .setDegree(3)
  //使用PolynomialFeatures进行特征转换
  polyFeatures.fit(trainingDS)

标准化处理函数的主要目的是根据用户统计出来的均值和方差对数据集进行标准化处理,防止因为度量单位不同而导致计算出现的偏差。标准化处理函数通过使用均值来对某个特征进行中心化,然后除以非常量特征(non-constant features)的标准差进行缩放。

FlinkML中提供了StandardScaler类帮助用户对数据进行标准化处理,其中包含fit和transform两个方法,其中fit方法通过基于给定数据集中学习平均值和标准偏差fit方法定义如下:

fit[T <: Vector]: DataSet[T] => Unit
fit: DataSet[LabeledVector] => Unit

transform方法的定义如下,其主要目的是完成对数据集标准化处理。

transform[T <: Vector]: DataSet[T] => DataSet[T]
transform: DataSet[LabeledVector] => DataSet[LabeledVector]

标准化处理的StandardScaler类主要包含2个参数:其中Mean表示缩放数据集的期望值,默认值为0.0;Std表示缩放数据集的标准偏差,默认值为1.0。

/ 创建标准化函数,并设定平均值为5.0,标准偏差为2.0
val scaler = StandardScaler()
.setMean(5.0)
.setStd(2.0)
// 读取数据集
val dataSet: DataSet[Vector] = ...
// 从训练数据集中学习平均值和标准偏差值
scaler.fit(dataSet)
// 对给定数据集进行缩放,使其平均值mean=10.0标准偏差std=2.0
val result = scaler.transform(dataSet)

区间缩放是将某一列向量根据最大值和最小值进行区间缩放,将指标转换到指定范围的区间内,从而尽可能地使特征的度量标准保持一致,避免因为某些指标比较大的特征在训练模型过程中占有太大的权重,影响整个模型的效果。

FlinkML中通过MinMaxScaler类实现区间缩放功能,其也实现了Transformer接口,包含了fit方法和transform方法,MinMaxScaler通过fit方法进行训练,训练数据可以是Vector的子类型或者是LabeledVector类型。然后通过transform方法对数据集进行区间缩放操作,并产生新的区间缩放后的数据集。

// 创建MinMax缩放器
val minMaxscaler = MinMaxScaler()
  .setMin(-1.0)
// 创建DataSet输入数据集
val dataSet: DataSet[Vector] = ...
// 学习给定数据集中的最大值和最小值
minMaxscaler.fit(dataSet)
// 将给定的数据集转换成-1到1之间的集合
val scaledDS = minMaxscaler.transform(dataSet)

推荐算法

Alternating Least Squares (ALS)

ALS算法也被称为为交替最小二乘算法,是目前业界使用相对广泛协同过滤算法。ALS算法通过观察用户对商品的打分,来判断每个用户的喜好并向用户推荐合适的商品。从协同过滤的角度分析,ALS算法属于User-Item CF,即同时考虑了User和Item两个方面的内容,用户和商品的数据,可以抽象成三元组<User,Item,Rating>。

目前ALS算法已经集成到FlinkML库中,ALS是一个Predictor函数类,具有fit和predict方法。

基于给定数据集,通过使用ALS算法构建推荐模型并基于模型进行结果预测。

// 读取训练数据集
val trainingDS: DataSet[(Int, Int, Double)] = ...
// 读取测试数据集
val testingDS: DataSet[(Int, Int)] = ...
// 设定ALS学习器
val als = ALS()
.setIterations(100)
.setNumFactors(10)
.setBlocks(100)
.setTemporaryPath("hdfs://temporary/Path")
// 通过ParameterMap计算额外参数
val parameters = ParameterMap()
.add(ALS.Lambda, 0.9)
.add(ALS.Seed, 42L)
// 计算隐式分解
als.fit(trainingDS, parameters)
// 根据测试数据集,计算推荐结果
val result = als.predict(testingDS)

Pipelines In FlinkML

机器学习已经被成功应用到多个领域,如智能推荐、自然语言处理、模式识别等。但是不管是什么类型的机器学习应用,其实都基本遵循着相似的流程,包括数据源接入、数据预处理、特征抽取、模型训练、模型预估、模型可视化等步骤。如果能够将这些步骤有效连接并形成流水线式的数据处理模式,将极大地提升机器学习应用构建的效率。受Scikit-Learn开源项目的启发,在FlinkML库中提供了Pipelines的设计,将数据处理和模型预测算子进行连接,以提升整体机器学习应用的构建效率。同时,FlinkML中的算法基本上都实现于Transformers和Predictors接口,主要目的就是为了能够提供一整套的ML Pipelines,帮助用户构建复杂且高效的机器学习应用。

// 获取训练数据集
val trainingDS: DataSet[LabeledVector] = ...
// 获取测试数据集,无Label标签
val testingDS: DataSet[Vector] = ...
val polyFeatures = PolynomialFeatures()
val mlr = MultipleLinearRegression()
// 构建Pipelines,将polyFeatures和mlr进行连接
val pipeline = polyFeatures
  .chainPredictor(mlr)
// 将创建的Pipeline应用到训练数据集上
pipeline.fit(trainingDS)
// 对测试数据集进行预测打分,产生预测结果
val result: DataSet[LabeledVector] = pipeline.predict(testingDS) 

除了可以使用FlinkML中已经定义的Transformers和Predictors之外,用户也可以自定义算子并应用在Pipelines中,完成整个机器学习任务链路的构建。FlinkML中实现对数据的转换操作的接口为Transformer,用于模型训练及预测的接口为Predictor。在整个Pipelines中,Transformer类型算子后面可以接其他算子,而Predictor类型算子后面则不能接入任何类型算子,也就是说,Predictor算子是整个Pipelines的终点。

Estimator接口是Transformer接口和Predictor接口的父类,如代码清单8-16所示,在Estimator中定义了fit方法,主要负责调用具体的算法实现逻辑,该方法中含有两个参数,分别为传入的训练数据集和Estimator所用到的参数。在FitOperation中定义了fit方法具体计算逻辑,Estimator中的fit方法借由包装类将计算逻辑通过调用隐式方法转换到FitOperation中的fit方法。同理Transformers类和Predictors类也是通过这种方式进行,因此用户在定义相关实现逻辑时需要有Scala隐式转换相关的知识。

trait Estimator[Self] extends WithParameters with Serializable {
  that: Self =>
  def fit[Training](
      training: DataSet[Training],
      fitParameters: ParameterMap = ParameterMap.Empty)
      (implicit fitOperation: FitOperation[Self, Training]): Unit = {
    FlinkMLTools.registerFlinkMLTypes(training.getExecutionEnvironment)
    fitOperation.fit(this, fitParameters, training)
  }
}

Search

    微信好友

    博士的沙漏

    Table of Contents