Spark操作sql

2021/03/04 Spark

Spark SQL

Spark SQL是 Spark框架上用来处理结构化和半结构化数据的接口。用户借助于Spark SQL可以方便、高效地构建Spark大数据平台上的数据仓库,为Spark带来了通用、高效、多元一体的结构化数据处理能力。

Spark SQL优点

  • 内存列存储 Spark SQL的表数据在内存中存储不是采用原生态的JVM对象存储方式,而是采用内存列存储方式。该存储方式无论在空间占用量还是读取吞吐率上都占有很大优势。
  • 字节码生成技术 Spark 1.1.0增加了codegen模块,使用动态字节码生成技术对匹配的表达式采用特定的代码动态编译。
  • Scala代码优化 Spark SQL在使用Scala编写代码的时候,能够尽量避免低效的、容易引起GC(垃圾回收)的代码。

Spark SQL作为Spark平台上的交互式查询工具,主要具有如下特点:

  • 面向多种语言(Java、Scala、Python和R),能够将SQL语句集成到Spark应用程序中
  • 具有统一的数据访问方式,不仅兼容Hive,还可以从各种结构化数据源(例如JSON、Hive、Parquet等)中读取数据
  • 不仅能与Spark上的其他组件无缝衔接,还支持从类似商业智能软件(例如Tableau等)的外部工具中通过标准数据库连接器(JDBC/ODBC)连接Spark SQL进行查询
  • 重用了Hive的前端与MetaStore(元数据存储),完全兼容Hive数据的查询与UDFs

Spark SQL逻辑架构

与关系型数据库类似,Spark SQL执行的SQL语句由Projection、Data Source和Filter组成,分别对应查询过程中的Result、Data Source和Operation。Spark SQL执行一个SQL语句的完整过程通常涉及以下4个步骤:解析(Parse)、绑定(Bind)、优化(Optimize)和执行(Execute)。

SQL解析

Spark SQL有两个解析器:一个是Hive的SQL解析器,另一个是内置的简单SQL解析器。解析器对SQL语句的语法结构进行解析,提取出SQL语句中的关键词(如SELECT、FROM、Where等)、表达式、Projection和Data Source等,从而推断SQL语句是否规范。

逻辑计划元数据绑定和语义分析

这个阶段需要参照数据表的元数据信息,主要涉及表中各列的名称、数据类型等。在此基础上将SQL语句和数据库的数据字典进行绑定,如果Projection、Data Source等描述的内容均存在且符合相关的类型约束,则表示该SQL语句是可以执行的。

逻辑计划优化

Spark SQL以规则的方式针对各种优化条件进行映射,从而实现优化逻辑计划。一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划。

物理计划生成与执行

Spark SQL基于规则选取不同的物理执行算子把优化后的逻辑计划转换成物理计划,并最终提交给执行引擎以Operation→Data Source→Result的次序执行物理计划。在执行过程中,有时甚至不需要读取物理表就可以返回结果,例如重复执行刚提交运行过的SQL语句,可直接从数据库的缓冲池中获取查询结果。

Spark SQL CLI

CLI(Command-Line Interface,命令行界面)是指可在提示符下输入可执行指令的界面,用户通过键盘输入指令,计算机接收到指令后予以执行。Spark SQL CLI是指使用命令行界面直接输入SQL指令,然后提交给Spark集群执行,并在界面中显示运行过程和最终的结果。

DataFrame编程模型

DataFrame简介

相对于传统的MapReduce API,Spark通过RDD和函数式编程模式把分布式数据处理转换成分布式数据集的处理,提供了更高抽象层次的、更丰富的 API,极大地简化了并行程序的开发过程,提高了开发效率。然而,对于没有MapReduce和函数式编程经验的新手来说,熟练掌握Spark中的API仍然存在着一定的门槛。另一方面,数据科学家们所熟悉的 R、Pandas等传统数据处理工具虽然提供了直观的API,但却局限于单机处理,无法胜任大数据场景。为了降低Spark的使用门槛,推进其广泛应用,Spark 1.3.0引入了DataFrames。DataFrames同时支持Scala、Java与Python等多种语言,方便更多的数据分析师基于Spark开展数据分析工作。

DataFrame是一个以命名列方式组织的分布式数据集,与传统的关系型数据库中的表结构类似。与一般的RDD不同,DataFrame带有结构信息(即数据字段),即DataFrame所表示的表数据集的每一列都带有名称和类型,对于数据的内部结构具有很强的描述能力。因此Spark SQL可以对隐藏在DataFrame背后的数据源以及作用于DataFrame之上的变换进行有针对性的优化,从而大幅提升SQL查询的效率。

在Spark 1.3.0版本之前,Spark SQL使用SchemaRDD表示和存储结构化的数据,并借此实现对数据的一系列操作。SchemaRDD是存放Row对象的RDD,每个Row对象代表一行记录。SchemaRDD借助于记录中的结构信息实现了高效的数据存储。例如对于包含Name、Age、Height三个成员属性的Person类型数据集,分别采用RDD和DataFrame进行表示,逻辑框架如图6.9所示。区别于先前版本中的SchemaRDD,DataFrame不再直接自RDD继承,但可以通过相关方法实现到RDD的转换。DataFrame对SchemaRDD的功能进行了扩展,更加适用于处理分布式大数据场景。

创建DataFrames

在实际应用场景中通常会涉及多种类型的数据源。Spark SQL不仅支持从多种不同类型数据源加载文件直接创建DataFrame,还支持将DataFrame保存为多种不同类型的数据文件。

基于Parquet类型文件创建DataFrame

加载与保存DataFrame的最简单的方式是使用默认的数据源类型, Spark SQL默认的数据源类型为Parquet。Parquet是一种流行的列式文件存储格式,可以高效地存储具有嵌套字段的记录。Parquet格式在 Hadoop中被广泛使用。Spark SQL提供了直接读取和存储 Parquet格式文件的方法。基于Parquet数据源创建DataFrame可使用parquetFile方法实现。

基于JSON类型文件创建DataFrame

若一个JSON文件中的数据遵循相同的结构信息,那么Spark SQL可以通过扫描该文件推测出该结构信息,并允许用户直接使用列名访问列下的各项数据。对于一个包含大量JSON文件的目录,Spark SQL的结构信息推断功能能够让用户非常高效地对结构化的数据进行操作,而无须编写专门的代码来读取不同结构的文件。基于JSON数据源创建DataFrame可使用jsonFile方法实现。

基于RDD创建DataFrame

Spark SQL支持两种方法将存在的RDD转换为DataFrame。

  • 通过反射机制创建DataFrame Spark SQL的Scala接口支持将包含样本类的RDD自动转换为DataFrame。首先通过样本类定义数据的模式;然后Spark SQL通过反射读出样本类中的参数名称,并作为表中字段的名称,样本类可以嵌套或者包含复杂的类型(如序列、数组等);最后注册为一个表以供后续的SQL查询使用。

保存DataFrames

保存为临时表

使用registerTempTable可创建一个临时表,并将DataFrame保存至该表中。临时表的生命周期包含于其所在的sqlContext或hiveContext实例中,即在应用退出后临时表会自动销毁。需要注意,一个sqlContext(或hiveContext)中的临时表不能同时被另一个sqlContext(或hiveContext)使用。

保存为持久化表

相对于registerTempTable,采用saveAsTable方法将数据以表的形式持久化到存储系统上,因此只要连接存在,Spark重启后表中数据仍然存在。

缓存表

缓存表是将数据存储到内存中,是Spark SQL性能调优的一种重要方式。Spark SQL可以通过sqlContext调用cacheTable方法将表以列优先的形式存储到内存。当Spark进行相关SQL查询时,将会仅仅浏览需要的列并且自动地压缩数据,以减少内存的使用以及垃圾回收的压力。可以通过sqlContext调用 uncacheTable方法删除缓存在内存中的表。需要注意,如果使用先前版本中的schemaRDD调用 cache方法,表将不会以列优先形式存储。为了提高 SQL查询效率,推荐使用DataFrame调用cacheTable方法。

Search

    微信好友

    博士的沙漏

    Table of Contents