Spark基础入门
Apache Spark是一个分布式计算框架,旨在简化运行于计算机集群上的并行程序的编写。该框架对资源调度,任务的提交、执行和跟踪,节点间的通信以及数据并行处理的内在底层操作都进行了抽象。它提供了一个更高级别的API用于处理分布式数据。
Spark的发展
第一代大数据生态系统Hadoop已经非常成功,其采用HDFS实现分布式存储,使用MapReduce进行分布式计算。MapReduce是一个简单通用和自动容错的批处理计算模型,不仅极大地简化了并行程序的开发过程,而且提高了程序执行的效率。然而在 MapReduce任务内部,为了防止 Reduce任务的失败,Map通常会把结果存储在磁盘上。由于MapReduce每次都需要将中间数据写回磁盘,导致网络通信、磁盘I/O等消耗了大量的系统资源,尤其是在处理具有较多迭代次数的计算任务时,这一缺点尤为突出。因此 MapReduce的运算性能仍难以满足面向大数据的交互式查询、迭代计算、流式计算等方面的需求。为了弥补MapReduce这一不足,涌现出了一系列专用的数据处理模型,例如 Storm、Impala、GraphLab等。随着新模型的不断出现,不同类型的作业需要一系列不同的处理框架才能很好地完成,然而这无形中又增加了系统在部署、测试、运维等方面的成本。
针对MapReduce及各种专用数据处理模型在计算性能、集成性、部署运维等方面的问题,2009年美国加洲大学伯克利分校开始研发全新的大数据处理框架,即Spark。2010年,Spark实现开源。自从2013年Spark进入Apache的孵化器项目后,发生了翻天覆地的变化。2014年初,Spark成为了Apache排名第三的顶级项目,其发展势头更加迅猛,一个多月左右就会发布一个小版本,两三个月左右会发布一个大版本,2015年6月份发布了1.4.0,2015年9月份发布了1.5.0,至本书作者执笔时已经发布到2.0。
Spark的迅猛发展和其突出的计算性能引起了大数据处理相关领域的广泛关注,使得其迅速成为了业内一门具有强劲竞争力的热门技术。在短短的三年时间里,世界各地开设了多次 Spark主题峰会,反映出Spark技术的前沿性与火爆的发展势头。在2014年的Spark峰会上,Hadoop三大发行商均声称未来将会把精力投入到Spark的研究中;Yahoo有全世界最强大的Hadoop集群(Hadoop的70%是由Yahoo贡献的),但早在几年前Yahoo便已经开启了Spark的研发工作;Intel、IBM等大公司纷纷宣布其产品支持Spark;亚马逊完全基于Spark搭建了云服务平台;Google、Facebook也陆续开展了转向Spark框架的研发工作。
同样也是在2014年Spark在中国的发展达到了一个前所未有的火爆状态。国内许多重量级的数据企业纷纷搭建了自己的Spark集群。例如2015年百度搭建了一个大规模的Spark集群,其中最大单集群规模达上千台节点,包含了数万个核心和上百 TB的内存,与此同时公司内部还运行着大量的小型Spark集群。淘宝的推荐系统已经用Spark取代了部分的MapReduce。腾讯通过Spark对数据实时采集、算法实时训练、系统实时预测,进而实现了大数据背景下的精准推荐。目前国内最大规模的Spark集群来自于腾讯,其中包含了8000个节点,而最大的单个Job则来自于阿里巴巴。虽然上述集群中节点数目看似不多,但是1000个节点的Spark集群,其性能相当于包含5000个节点的Hadoop集群。
什么是Spark
Apache官网对Spark定义如下:
“Apache Spark is a fast and general engine for large-scale data processing”
由此可见,Spark 是一个快速通用的大规模数据处理引擎。Spark的功能看似与Hadoop相同,但是两者却有着明显的区别。Hadoop是一个开源分布式计算平台,是第一代大数据生态系统,也是目前应用最为广泛的大数据生态系统。Hadoop以分布式文件系统 HDFS和分布式计算 MapReduce为核心,为用户提供了系统底层细节透明的分布式基础框架。HDFS的高容错性、高伸缩性等优点允许用户将Hadoop部署在低廉的硬件上;M apReduce分布式编程模型允许用户在不了解分布式系统底层细节的情况下开发并行应用程序。因此用户利用Hadoop能够轻松地组织计算机资源,从而搭建自己的分布式计算平台,并且可以充分利用集群的计算和存储能力完成海量数据的处理。Spark作为大数据处理引擎与Hadoop有着密切的联系,但是两者并非是一个层面的概念。Hadoop不仅有数据的处理还有数据的存储,而Spark仅仅是大数据处理框架。因此Spark其实与Hadoop上的MapReduce是一个层面的概念。目前的Hadoop已经趋于完善,其中的Yarn与HDFS非常经典,已经成为了业内大数据存储和分布式资源管理的标准,在未来短时间内难以被轻易取代。很多资料均预言 Spark将取代Hadoop,其实是指取代Hadoop中的MapReduce。
Spark主要特征
Spark是一种基于内存的、分布式的、大数据处理框架,它与Hadoop上的MapReduce是一个层面上的概念,这意味着两者在诸多方面存在着竞争与可比性。本节将通过与MapReduce的对比分析来介绍Spark的主要特征。
-
快速 面向磁盘的MapReduce受限于磁盘读/写性能和网络I/O性能的约束,在处理迭代计算、实时计算、交互式数据查询等方面并不高效,但是这些却在图计算、数据挖掘和机器学习等相关应用领域中非常常见。针对这一不足,将数据存储在内存中并基于内存进行计算是一个有效的解决途径。Spark是面向内存的大数据处理引擎,这使得 Spark能够为多个不同数据源的数据提供近乎实时的处理性能,适用于需要多次操作特定数据集的应用场景。 Spark与MapReduce相比在计算性能上有如此显著的提升,主要得益于以下两方面。 1).Spark是基于内存的大数据处理框架 Spark既可以在内存中处理一切数据,也可以使用磁盘来处理未全部装入到内存中的数据。由于内存与磁盘在读/写性能上存在巨大的差距,因此CPU基于内存对数据进行处理的速度要快于磁盘数倍。然而MapReduce对数据的处理是基于磁盘展开的。 2).Spark具有优秀的作业调度策略 Spark中使用了有向无环图(Directed Acyclic Graph,DAG)这一概念。一个Spark应用由若干个作业构成,首先 Spark将每个作业抽象成一个图,图中的节点是数据集,图中的边是数据集之间的转换关系;然后Spark基于相应的策略将DAG划分出若干个子图,每个子图称为一个阶段,而每个阶段对应一组任务;最后每个任务交由集群中的执行器进行计算。
-
简洁易用 Spark的易用性还体现在其针对数据处理提供了丰富的操作。Spark提供了80多个针对数据处理的基本操作,如 map、flatMap、reduceByKey、filter、cache、collect、textFile等,这使得用户基于Spark进行应用程序开发非常简洁高效。
-
通用 Spark框架包含了多个紧密集成的组件。位于底层的是Spark Core,其实现了Spark的作业调度、内存管理、容错、与存储系统交互等基本功能,并针对弹性分布式数据集提供了丰富的操作。在Spark Core的基础上,Spark提供了一系列面向不同应用需求的组件,主要有Spark SQL、Spark Streaming、MLlib、GraphX。
Spark组件
Spark SQL
Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。Spark SQL支持多种数据源类型,例如Hive表、Parquet以及JSON等。Spark SQL不仅为Spark提供了一个SQL接口,还支持开发者将SQL语句融入到Spark应用程序开发过程中,无论是使用Python、Java还是Scala,用户可以在单个的应用中同时进行SQL查询和复杂的数据分析。由于能够与Spark所提供的丰富的计算环境紧密结合,Spark SQL得以从其他开源数据仓库工具中脱颖而出。 Spark SQL在Spark l.0中被首次引入。在Spark SQL之前,美国加州大学伯克利分校曾经尝试修改Apache Hive以使其运行在Spark上,进而提出了组件Shark。然而随着Spark SQL的提出与发展,其与Spark引擎和API结合得更加紧密,使得Shark已经被Spark SQL所取代。
Spark Streaming
众多应用领域对实时数据的流式计算有着强烈的需求,例如网络环境中的网页服务器日志或是由用户提交的状态更新组成的消息队列等,这些都是实时数据流。Spark Streaming是Spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API。由于这些API与Spark Core中的基本操作相对应,因此开发者在熟知Spark核心概念与编程方法之后,编写Spark Streaming应用程序会更加得心应手。从底层设计来看,Spark Streaming支持与Spark Core同级别的容错性、吞吐量以及可伸缩性。
MLlib
MLlib是Spark提供的一个机器学习算法库,其中包含了多种经典、常见的机器学习算法,主要有分类、回归、聚类、协同过滤等。MLlib不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语,包括一个通用的梯度下降优化基础算法。所有这些方法都被设计为可以在集群上轻松伸缩的架构。
GraphX
GraphX是Spark面向图计算提供的框架与算法库。GraphX中提出了弹性分布式属性图的概念,并在此基础上实现了图视图与表视图的有机结合与统一;同时针对图数据处理提供了丰富的操作,例如取子图操作subgraph、顶点属性操作mapVertices、边属性操作mapEdges等。GraphX还实现了与Pregel的结合,可以直接使用一些常用图算法,如PageRank、三角形计数等。
上述这些 Spark核心组件都以 jar包的形式提供给用户,这意味着在使用这些组件时,与MapReduce上的Hive、Mahout、Pig等组件不同,无需进行复杂烦琐的学习、部署、维护和测试等一系列工作,用户只要搭建好 Spark平台便可以直接使用这些组件,从而节省了大量的系统开发与运维成本。将这些组件放在一起,就构成了一个Spark软件栈。基于这个软件栈,Spark提出并实现了大数据处理的一种理念——“一栈式解决方案(one stack to rule them all)”,即Spark可同时对大数据进行批处理、流式处理和交互式查询。
多种运行模式
Spark支持多种运行模式:本地单机模式、集群单机模式、基于Mesos、基于YARN。
- 本地单机模式 所有Spark进程都运行在同一个Java虚拟机(Java Vitural Machine,JVM)中。
- Spark Standalone模式 该模式是不借助于第三方资源管理框架的完全分布式模式。Spark使用自己的Master进程对应用程序运行过程中所需的资源进行调度和管理。对于中小规模的Spark集群首选Standalone模式。
- Spark on Yarn模式 在这一模式下,Spark作为一个提交程序的客户端将Spark任务提交到Yarn上,然后通过Yarn来调度和管理Spark任务执行过程中所需的资源。在搭建此模式的Spark集群过程中,需要先搭建Yarn集群,然后将Spark作为Hadoop中的一个组件纳入到Yarn的调度管理下,这样将更有利于系统资源的共享。
- Spark on Mesoes模式 Spark和资源管理框架Mesos相结合的运行模式。Apache Mesos与Yarn类似,能够将CPU、内存、存储等资源从计算机的物理硬件中抽象地隔离出来,搭建了一个高容错、弹性配置的分布式系统。Mesos同样也采用Master/Slave架构,并支持粗粒度模式和细粒度模式两种调度模式。
Spark本地环境开发
Spark能通过内置的单机集群调度器来在本地运行。此时,所有的Spark进程运行在同一个Java虚拟机中。这实际上构造了一个独立、多线程版本的Spark环境。本地模式很适合程序的原型设计、开发、调试及测试。同样,它也适应于在单机上进行多核并行计算的实际场景。
Spark的本地模式与集群模式完全兼容,本地编写和测试过的程序仅需增加少许设置便能在集群上运行。
开发前需要准备好JDK、Scala、Hadoop、Spark环境。步骤如下:
- 安装JDK JRE(Java运行时环境)或JDK(Java开发套件)是要安装的
- 安装Scala Spark的运行依赖Scala编程语言。好在预编译的二进制包中已包含Scala运行环境,我们不需要另外安装Scala便可运行Spark。
- 配置Spark环境变量 本地构建Spark环境的第一步是下载其最新的版本包。各个版本的版本包及源代码的GitHub地址可从Spark项目的下载页面找到:http://spark.apache.org/downloads.html 。 为了访问HDFS (Hadoop Distributed File System,Hadoop分布式文件系统)以及标准或定制的Hadoop输入源,Spark的编译需要与Hadoop的版本对应。
测试开发环境
下载完上述版本包后,解压,并在终端进入解压时新建的主目录:
>tar xfvz spark-1.2.0-bin-hadoop2.4.tgz
>cd spark-1.2.0-bin-hadoop2.4
# 用户运行Spark的脚本在该目录的bin目录下。我们可以运行Spark附带的一个示例程序来测试是否一切正常:
>./bin/run-example org.apache.spark.examples.SparkPi
该命令将在本地单机模式下执行SparkPi这个示例。在该模式下,所有的Spark进程均运行于同一个JVM中,而并行处理则通过多线程来实现。默认情况下,该示例会启用与本地系统的CPU核心数目相同的线程。示例运行完,应可在输出的结尾看到类似如下的提示:
Job 0 finished: reduce at SparkPi.scala:38, took 0.802186 s
Pi is roughly 3.146995734978675
要在本地模式下设置并行的级别,以local[N]
的格式来指定一个master 变量即可。上述参数中的N 表示要使用的线程数目。比如只使用两个线程时,可输入如下命令:
MASTER=local[2] ./bin/run-example org.apache.spark.examples.SparkPi
开始Java语言进行Spark开发,local模式应用程序开发 Maven依赖
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.2</version>
</dependency>
</dependencies>
编写测试代码
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class LocalFileDemo {
public static void main(String[] args) {
String logFile = "file:///D:/Hello.txt";
SparkConf conf = new SparkConf().setAppName("LocalFileDemo").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> fileRdd = sc.textFile(logFile).cache();
long numAs = fileRdd.filter(x->x.contains("a")).count();
long numBs = fileRdd.filter(x->x.contains("b")).count();
System.out.println("Lines with a: " + numAs + ",lines with b: " + numBs);
}
}
Spark集群
Spark集群由两类程序构成:一个驱动程序和多个执行程序。本地模式时所有的处理都运行在同一个JVM内,而在集群模式时它们通常运行在不同的节点上。
举例来说,一个采用单机模式的Spark集群(即使用Spark内置的集群管理模块)通常包括:
- 一个运行Spark单机主进程和驱动程序的主节点;
- 各自运行一个执行程序进程的多个工作节点。
Spark安装和配置
基于docker安装单节点spark测试环境
docker pull sequenceiq/spark:1.6.0
Spark编程模型
SparkContext
任何Spark程序的编写都是从SparkContext (或用Java编写时的JavaSparkContext )开始的。SparkContext 的初始化需要一个SparkConf 对象,后者包含了Spark集群配置的各种参数(比如主节点的URL)。
初始化后,我们便可用SparkContext 对象所包含的各种方法来创建和操作分布式数据集和共享变量。Spark shell(在Scala和Python下可以,但不支持Java)能自动完成上述初始化。
若要用Scala代码来实现的话,可参照下面的代码:
val conf = new SparkConf()
.setAppName("Test Spark App")
.setMaster("local[4]")
val sc = new SparkContext(conf)
Spark shell
Spark支持用Scala或Python REPL(Read-Eval-Print-Loop,即交互式shell)来进行交互式的程序编写。由于输入的代码会被立即计算,shell能在输入代码时给出实时反馈。在Scala shell里,命令执行结果的值与类型在代码执行完后也会显示出来。