Pyspark实战

2022/11/08 Python

PySpark实战

Python PySpark是Spark官方提供的一个Python类库,其中内置了完全的Spark API,使得Python用户在导入这个类库后,可以使用自己熟悉的Python语言来编写Spark应用程序,并最终将程序提交到Spark集群运行。

window本地环境PySpark安装

PySpark是基于Python语言开发的类库,仅支持在单机环境下供Python用户开发调试使用,需要将程序提交到Spark集群上才能使用Spark集群分布式的能力处理大规模的数据处理任务。

PySpark就是官方为了让Python用户更方便地使用Spark而开发出来的类库。Python用户不需要编写复杂的底层逻辑,只需要调用PySpark的API即可。

本文基于Windows 64位操作系统进行PySpark的安装演示。 现在主流的方式都是通过Anaconda来管理自己的Python环境了。安装PySpark

pip  pyspark 

搭建Spark环境。我们从官网或者国内镜像下载最新版本的Spark安装包,这里下载的是spark-3.3.2-bin-hadoop3.tgz,然后将其解压缩到合适的位置,这里比如是D:\spark。

https://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-3.3.2/

然后我们需要新增一个环境变量:

SPARK_HOME=D:\spark\spark-3.3.2-bin-hadoop3

再将其添加到Path环境变量中:

%SPARK_HOME%\bin

然后我们在命令行尝试运行spark\bin下面的pyspark:

> pyspark
Python 3.10.9 | packaged by Anaconda, Inc. | (main, Mar  1 2023, 18:18:15) [MSC v.1916 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
23/04/19 15:31:10 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/19 15:31:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/

Using Python version 3.10.9 (main, Mar  1 2023 18:18:15)
Spark context Web UI available at http://DESKTOP-DFA24F11:4040
Spark context available as 'sc' (master = local[*], app id = local-1681889472795).
SparkSession available as 'spark'.
>>> 23/04/19 15:31:28 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped

Hadoop环境搭建。由于Spark的运行需要Hadoop作为底层支持,所以我们还需要从官网或者国内镜像下载最新的Hadoop安装包,这里现在的是hadoop-3.3.4.tar.gz,然后将其加压缩到合适的目录,假设这里为D:\hadoop。


然后我们需要配置Hadoop的环境变量,新建:

HADOOP_HOME=D:\hadoop\hadoop-3.3.4

然后将其添加到Path环境变量中:

%HADOOP_HOME%\bin
%HADOOP_HOME%\sbin

然后我们进入到D:\hadoop\hadoop-3.3.4\etc\hadoop目录下,修改hadoop-env.cmd,设置Java Home:

set JAVA_HOME=C:\PROGRA~1\Java\jdk1.8.0_341

我本机的Java环境在C:\Program Files\Java,因为Program Files中间有空格,按照这样填写的话Hadoop启动会报错,无法找到Java的地址,因此我们使用PROGRA~1来代替Program Files就能解决这个问题。

然后我们从github的winutils仓库中下载hadoop-3.0.0的压缩包,将其中的内容全部覆盖复制到D:\hadoop\hadoop-3.3.4\bin下面,然后将hadoop.dll拷贝到C:\Windows\System32下面,然后我们就可以验证Hadoop了:

(base)PS C:\Users\zhangxun> hadoop version
Hadoop 3.3.4
Source code repository https://github.com/apache/hadoop.git -r a585a73c3e02ac62350c136643a5e7f6095a3dbb
Compiled by stevel on 2022-07-29T12:32Z
Compiled with protoc 3.7.1
From source with checksum fb9dd8918a7b8a5b430d61af858f6ec
This command was run using /D:/hadoop/hadoop-3.3.4/share/hadoop/common/hadoop-common-3.3.4.jar

至此,Hadoop配置完成。

HelloWorld测试

我们在合适的地方新建一个文件夹,然后用vscode打开这个文件夹,后面的示例代码程序都在这个文件夹里面完成了。

from pyspark import SparkConf
from pyspark import SparkContext

if __name__ == '__main__':
    conf=SparkConf().setAppName("test1").setMaster("local")
    sc=SparkContext(conf=conf)

    rdd=sc.parallelize([1,2,3,4,5])

    print("rdd是:",rdd)
    print("rdd.collect是:",rdd.collect())

执行该程序:

>python hello.py
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
rdd是: ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274
rdd.collect是: [1, 2, 3, 4, 5]

如此证明pyspark程序可以提交到本地的spark环境正常运行。

PySpark程序运行机制分析

在没有学习PySpark执行原理之前,很多人可能会认为PySpark编写的程序会被翻译为JVM能识别的字节码,然后提交给Spark运行,其实不是这样的。

Spark工作原理如下: Driver负责总体的调度,Executor负责具体Task的运行,它们都是运行在JVM进程之中的,而这些JVM进程则是可以部署在多种的资源管理系统中的,比如Yarn、Mesos或者是K8s等;用户提交的Spark程序交给Driver进行管理,Driver将程序分解为一个个的Task交给Executor执行。

为了不影响现有Spark的工作架构,Spark在外围包装了一层Python的API,借助Py4j实现Python和Java的交互,进而实现通过Python代码来编写Spark应用程序并提交给Spark集群运行。

在Driver端,Python通过Py4j来调用Java方法,将用户使用Python写的程序映射到JVM中,比如,用户在PySpark中实例化一个Python的SparkContext对象,最终会在JVM中实例化Scala的SparkContext对象。

在Executor端,都启动一个Python守护进程,当Task收到任务请求后,交给底层的Python进程去执行。

所以,Pyspark的运行机制和我们预想的并不一样,这种方式可以不破坏现有的Spark执行架构,同时也方便多种语言支持的扩展,但是也很明显,使用PySpark运行Spark任务肯定比使用Java或者Scala要有一些额外的性能损耗。

Docker下安装PySpark

Search

    微信好友

    博士的沙漏

    Table of Contents