Spark核心编程

2021/03/02 Spark

Spark核心编程

Spark是基于内存的大数据综合处理框架,为大数据处理提供了一个一体化解决方案,而该方案的设计与实现都是基于一个核心概念展开的,即弹性分布式数据集(Resilient Distributed Dataset,RDD)。

RDD定义

RDD可以理解为由若干个元素构成的分布式集合以及其上的操作,它是Spark中数据的重要组织形式。与MapReduce不同,Spark针对RDD提供了更加丰富的操作,而不只局限于Map和Reduce,用户利用这些操作可以非常方便地编写出复杂的业务逻辑,然后Spark会自动将RDD中的数据与相关任务分发到集群上,并行化地去执行。

Apache将RDD定义为弹性分布式数据集,它是Spark应用程序中数据的基本组织形式。弹性意味着RDD能够自动地进行内存和磁盘数据存储的切换,并且具有非常高的容错性;分布式说明RDD是一个存储在多个节点上的海量数据集合。RDD是一种高度受限的共享内存模型,即RDD是只读的记录分区的集合。RDD具有自动容错、位置感知调度和可伸缩性等数据流模型的特点。

RDD (Resilient Distributed Dataset,弹性分布式数据集)是Spark的核心概念之一。一个RDD代表一系列的“记录”(严格来说,某种类型的对象)。这些记录被分配或分区到一个集群的多个节点上(在本地模式下,可以类似地理解为单个进程里的多个线程上)。Spark中的RDD具备容错性,即当某个节点或任务失败时(因非用户代码错误的原因而引起,如硬件故障、网络不通等),RDD会在余下的节点上自动重建,以便任务能最终完成。

创建RDD

RDD可从现有的集合创建。比如在Scala shell中:

val collection = List("a", "b", "c", "d", "e")
val rddFromCollection = sc.parallelize(collection)

RDD也可以基于Hadoop的输入源创建,比如本地文件系统、HDFS和Amazon S3。基于Hadoop的RDD可以使用任何实现了Hadoop InputFormat 接口的输入格式,包括文本文件、其他Hadoop标准格式、HBase、Cassandra等。以下举例说明如何用一个本地文件系统里的文件创建RDD:

val rddFromTextFile = sc.textFile("LICENSE")

上述代码中的textFile 函数(方法)会返回一个RDD对象。该对象的每一条记录都是一个表示文本文件中某一行文字的String (字符串)对象。

Spark操作

创建RDD后,我们便有了一个可供操作的分布式记录集。在Spark编程模式下,所有的操作被分为转换(transformation)和执行(action)两种。一般来说,转换操作是对一个数据集里的所有记录执行某种函数,从而使记录发生改变;而执行通常是运行某些计算或聚合操作,并将结果返回运行SparkContext 的那个驱动程序。

Spark程序中最常用的转换操作便是map 操作。该操作对一个RDD里的每一条记录都执行某个函数,从而将输入映射成为新的输出。比如,下面这段代码便对一个从本地文本文件创建的RDD进行操作。它对该RDD中的每一条记录都执行size 函数。之前我们曾创建过一个这样的由若干String 构成的RDD对象。通过map 函数,我们将每一个字符串都转换为一个整数,从而返回一个由若干Int 构成的RDD对象。

val intsFromStringsRDD = rddFromTextFile.map(line => line.size)

其输出应与如下类似,其中也提示了RDD的类型:

intsFromStringsRDD: org.apache.spark.rdd.RDD[Int] = MappedRDD[5] at map at <console>:14

现在我们可以调用一个常见的执行操作count ,来返回RDD中的记录数目。

intsFromStringsRDD.count

Spark的大多数操作都会返回一个新RDD,但多数的执行操作则是返回计算的结果。这就意味着多个操作可以很自然地前后连接,从而让代码更为简洁明了。 举例来说,用下面的一行代码可以得到和上面例子相同的结果:

val aveLengthOfRecordChained = rddFromTextFile.map(line => line.size).sum / rddFromTextFile.count

值得注意的一点是,Spark中的转换操作是延后的。也就是说,在RDD上调用一个转换操作并不会立即触发相应的计算。相反,这些转换操作会链接起来,并只在有执行操作被调用时才被高效地计算。这样,大部分操作可以在集群上并行执行,只有必要时才计算结果并将其返回给驱动程序,从而提高了Spark的效率。

这就意味着,如果我们的Spark程序从未调用一个执行操作,就不会触发实际的计算,也不会得到任何结果。比如下面的代码就只是返回一个表示一系列转换操作的新RDD:

val transformedRDD = rddFromTextFile.map(line => line.size).filter(size => size > 10).map(size => size * 2)

注意,这里实际上没有触发任何计算,也没有结果被返回。如果我们现在在新的RDD上调用一个执行操作,比如sum ,该计算将会被触发:

val computation = transformedRDD.sum

RDD缓存策略

Spark最为强大的功能之一便是能够把数据缓存在集群的内存里。这通过调用RDD的cache 函数来实现:

rddFromTextFile.cache

调用一个RDD的cache 函数将会告诉Spark将这个RDD缓存在内存中。在RDD首次调用一个执行操作时,这个操作对应的计算会立即执行,数据会从数据源里读出并保存到内存。因此,首次调用cache 函数所需要的时间会部分取决于Spark从输入源读取数据所需要的时间。但是,当下一次访问该数据集的时候,数据可以直接从内存中读出从而减少低效的I/O操作,加快计算。多数情况下,这会取得数倍的速度提升。

如果现在在已缓存了的RDD上调用count 或sum 函数,应该可以感觉到RDD的确已经载入到了内存中:

val aveLengthOfRecordChained = rddFromTextFile.map(line => line.size).
sum / rddFromTextFile.count

广播变量和累加器

Spark的另一个核心功能是能创建两种特殊类型的变量:广播变量和累加器。

广播变量 (broadcast variable)为只读 变量,它由运行SparkContext 的驱动程序创建后发送给会参与计算的节点。对那些需要让各工作节点高效地访问相同数据的应用场景,比如机器学习,这非常有用。Spark下创建广播变量只需在SparkContext 上调用一个方法即可:

val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))

累加器 (accumulator)也是一种被广播到工作节点的变量。累加器与广播变量的关键不同,是后者只能读取而前者却可累加。但支持的累加操作有一定的限制。具体来说,这种累加必须是一种有关联的操作,即它得能保证在全局范围内累加起来的值能被正确地并行计算以及返回驱动程序。每一个工作节点只能访问和操作其自己本地的累加器,全局累加器则只允许驱动程序访问。累加器同样可以在Spark代码中通过value 访问。

Search

    微信好友

    博士的沙漏

    Table of Contents