导图社区 Spark基础
小白必看《Spark基础》,主要从spark on YARN 调度模式、sprk关键概念、pyspark架构、python的OS模块、standalone HA高可用集群模式等多个方面作了阐述。
编辑于2021-12-04 20:52:46PySpark基础环境
Spark on YARN 调度模式
Spark on yarn 的本质
Spark on yarn 的本质
将Spark任务的pyspark文件,经过Py4J转换,提交到Yarn的JVM中去运行
Spark on YARN 的两种部署模式
两种模式的区别:Driver程序运行在哪里
Client 本地模式(测试环境中使用)
特征: 1)Driver运行在Client上的SparkSubmit进程中(客户端上) 2)应用程序运行结果会在客户端显示
流程步骤: 1)Driver在任务提交的本地机器上运行,Driver启动后会和ResourceManager通讯申请启动ApplicationMaster; 2)随后ResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster的功能相当于一个ExecutorLaucher,只负责向ResourceManager申请Executor内存; 3)ResourceManager接到ApplicationMaster的资源申请后会分配Container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程; 4)Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数; 5)之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分Stage,每个Stage生成对应的TaskSet,之后将Task分发到各个Executor上执行。
Cluster 集群模式(生产环境中使用)
特征: 1)Driver运行在ApplicationMaster中,合二为一(集群空闲节点) 2)应用的运行结果不能在客户端client显示
1)任务提交后会和ResourceManager通讯申请启动ApplicationMaster; 2)随后ResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver; 3)Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配Container,然后在合适的NodeManager上启动Executor进程; 4)Executor进程启动后会向Driver反向注册; 5)Executor全部注册完成后Driver开始执行main函数,之后执行到Action算子时,触发一个job,并根据宽依赖开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行;
Spark关键概念
Spark集群角色
1)Driver:是一个JVM Process 进程,编写的Spark应用程序就运行在Driver上,由Driver进程执行; 2)Master(ResourceManager):是一个JVM Process 进程,主要负责资源的调度和分配,并进行集群的监控等职责; 3)Worker(NodeManager):是一个JVM Process 进程,一个Worker运行在集群中的一台服务器上,主要负责两个职责,一个是用自己的内存存储RDD的某个或某些partition;另一个是启动其他进程和线程(Executor),对RDD上的partition进行并行的处理和计算。 4)Executor:是一个JVM Process 进程,一个Worker(NodeManager)上可以运行多个Executor,Executor通过启动多个线程(task)来执行对RDD的partition进行并行计算,也就是执行我们对RDD定义的例如flatMap、map、reduce等算子操作。
Master URL
spark-shell 交互式编程
示例: spark-shell 可以携带参数 spark-shell --master local[N] 数字N表示在本地模拟N个线程来运行当前任务 spark-shell --master local[*] *表示使用当前机器上所有可用的资源 默认不携带参数就是--master local[*] spark-shell --master spark://node01:7077,node02:7077 表示运行在集群上
spark-submit 提交圆周率作业
Local 示例: ${SPARK_HOME}/bin/spark-submit \ --master local[2] \ ${SPARK_HOME}/examples/src/main/python/pi.py \ 10
Standalone 示例: ${SPARK_HOME}/bin/spark-submit \ --master spark://node1.itcast.cn:7077 \ --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \ --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \ ${SPARK_HOME}/examples/src/main/python/pi.py \ 10
spark on yarn(cluster)示例: ${SPARK_HOME}/bin/spark-submit \ --master yarn --deploy-mode cluster --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \ --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \ ${SPARK_HOME}/examples/src/main/python/pi.py \ 10
提交Spark Application
应用提交语法:
spark-submit [options] <app jar | python file> [app arguments]
查看帮助,灵活设置就好:
-submit --help
公共参数配置
提交运行Spark Application时,有些基本参数需要传递值,如下所示:
公共参数: - --master :Spark使用什么资源管理器来分配【CPU】和【内存】资源,后面可以跟【spark://host:7077】,【yarn】 ,【local[n]】等,企业中用的最多的是【yarn】。如果不指定默认是local[*] - --deploy-mode :只有【2】个值,【client】和【cluster】,区别是【Driver】进程运行在哪里 - --name :给Spark程序起一个名字,后期可以在WEBUI(端口是【4040】)页面上看到
动态加载Spark Applicaiton运行时的参数,通过--conf进行指定,如下使用方式:
Driver Program 参数配置
每个Spark Application运行时都有一个Driver Program,属于一个JVM Process进程,可以设置内存Memory和CPU Core核数
Driver进程相关参数: - --driver-memory 申请【Driver】进程的【内存】有大多。 - --driver-cores :仅当是以--deploy-mode 是【cluster】是才有意义。表示【Driver】进程申请几个【core】
Executor 参数配置
每个Spark Application运行时,需要启动Executor运行任务Task,需要指定Executor个数及每个Executor资源信息(内存Memory和CPU Core核数)
Executor进程相关参数: - --total-executor-cores :仅当用Spark 的【standalone】方式才有意义,表示所有的executor一共申请多少个【核core】 - --executor-cores :仅当Spark 的【standalone】方式以及【yarn】方式才有意义,表示每个executor进程申请几个【核core】。yarn默认是1个,standalone默认是Worker的全部的core。企业中一般指定为3-5个,通常是4. - --num-executors :仅当是【yarn】时才有意义,表示一共申请多少个【executor】。 - --executor-memory 申请的【每个Executor】的【内存】有多大。Executor进程中的一个core可以支配3-5G的内存,如果是3G,executor-memory 最好设为executor-cores*3 - --queue 就是用来隔离CPU和内存资源的。每个队列中都包含了指定容量的CPU和内存
官网的提交应用案例
# Run application locally on 8 cores ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master local[8] \ /path/to/examples.jar \ 100 # Run on a [Spark standalone cluster] in [client deploy mode] ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://207.184.161.138:7077 \ --deploy-mode client \ --executor-memory 20G \ --total-executor-cores 100 \ /path/to/examples.jar \ 1000 # Run on a [Spark standalone cluster] in [cluster deploy mode] with supervise ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://207.184.161.138:7077 \ --deploy-mode cluster \ --supervise \ --executor-memory 20G \ --total-executor-cores 100 \ /path/to/examples.jar \ 1000 # Run on a [YARN cluster] in [cluster deploy mode] export HADOOP_CONF_DIR=XXX ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ # can be client for client mode --executor-memory 20G \ --num-executors 50 \ /path/to/examples.jar \ 1000 # Run a Python application on a [Spark standalone cluster] ./bin/spark-submit \ --master spark://207.184.161.138:7077 \ examples/src/main/python/pi.py \ 1000 # Run on a [Mesos cluster] in [cluster deploy mode] with supervise ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master mesos://207.184.161.138:7077 \ --deploy-mode cluster \ --supervise \ --executor-memory 20G \ --total-executor-cores 100 \ http://path/to/examples.jar \ 1000 # Run on a [Kubernetes cluster] in [cluster deploy mode] ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master k8s://xx.yy.zz.ww:443 \ --deploy-mode cluster \ --executor-memory 20G \ --num-executors 50 \ http://path/to/examples.jar \ 1000
PySpark架构
(1)在Driver端,通过Py4j实现在Python中调用Java的方法,即将用户写的PySpark程序”映射”到JVM中,例如,用户在PySpark中实例化一个Python的SparkContext对象,最终会在JVM中实例化Scala的SparkContext对象; (2)在Executor端,则不需要借助Py4j,因为Executor端运行的Task逻辑是由Driver发过来的,那是序列化后的字节码,虽然里面可能包含有用户定义的Python函数或Lambda表达式,Py4j并不能实现在Java里调用Python的方法,为了能在Executor端运行用户定义的Python函数或Lambda表达式,则需要为每个Task单独启一个Python进程,通过socket通信方式将Python函数或Lambda表达式发给Python进程执行。 相关角色: master:主节点进程,在整个集群中,最多只有一个Master处于Active状态。在使用spark-shell等交互式运行或者使用官方提供的run-example实例时,Driver运行在Master节点中;若是使用spark-submit工具进行任务的提交或者IDEA等工具开发运行任务时,Driver是运行在本地客户端的。 worker:从节点进程,类似于yarn中的NodeManager,在整个集群中,可以有多个Worker(>0)。负责当前WorkerNode上的资源汇报、监督当前节点运行的Executor。并通过心跳机制来保持和Master的存活性连接。Executor受到Worker掌控,一个Worker启动Executor的个数受限于 机器中CPU核数。每个Worker节点存在一个多个CoarseGrainedExecutorBackend进程,每个进程包含一个Executor对象,该对象持有一个线程池,每个线程执行一个Task。 ---------------------------------------------------------------------------------------- 相关组件: (1)Application:指的是用户编写的Spark应用程序,包含了含有一个Driver功能的代码和分布在集群中多个节点上运行的Executor代码。 (2)Driver:运行Application的main函数并创建SparkContext,SparkContext的目的是为了准备Spark应用程序的运行环境。SparkContext负责资源的申请、任务分配和监控等。当Executor运行结束后,Driver负责关闭SparkContext; (3)Job:一个Application可以产生多个Job,其中Job由Spark Action触发产生。每个Job包含多个Task组成的并行计算。 (4)Stage:每个Job会拆分为多个Task,作为一个TaskSet,称为Stage;Stage的划分和调度是由DAGScheduler负责的。Stage分为Result Stage和Shuffle Map Stage; (5)Task:Application的运行基本单位,Executor上的工作单元。其调度和 管理由TaskScheduler负责。 (6)RDD:Spark基本计算单元,是Spark最核心的东西。表示已被分区、被序列化、不可变的、有容错机制的、能被并行操作的数据集合。 (7)DAGScheduler:根据Job构建基于Stage的DAG,划分Stage依据是RDD之间的依赖关系。 (8)TaskScheduler:将TaskSet提交给Worker运行,每个Worker运行了什么Task于此处分配。同时还负责监控、汇报任务运行情况等。
Python的OS模块
import os 1)设置系统环境变量 os.environ['环境变量名称']='环境变量值' #其中key和value均为string类型 os.putenv('环境变量名称', '环境变量值') os.environ.setdefault('环境变量名称', '环境变量值') 2)修改系统环境变量 os.environ['环境变量名称']='新环境变量值' 3)获取系统环境变量 os.environ['环境变量名称'] os.getenv('环境变量名称') os.environ.get('环境变量名称', '默认值') #默认值可给可不给,环境变量不存在返回默认值 4)删除系统环境变量 del os.environ['环境变量名称'] del(os.environ['环境变量名称']) 5)判断系统环境变量是否存在 '环境变量值' in os.environ # 存在返回 True,不存在返回 False
import os #指定环境变量,避免在yarn模式下找不到解释器 os.environ['SPARK_HOME'] = '/export/server/spark' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python'
Python语言开发Spark程序代码
WordCount 单词记数(案例)
PySpark代码(上图) 然后在终端提交: spark-submit --master spark://node1:7077 /export/pyworkspace/pyspark_sz27/pyspark_base/main/wordcount_standalone.py
Pycharm远程执行示意图
Standalone HA 高可用集群模式
基本原理
原理: ZooKeeper提供了一个Leader Election选举机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。 解决了什么问题? Master的单点故障问题
实现方案
运行2个或多个Master进程. 其中一个是Active状态, 正常工作. 其余的为Standby状态, 待命中, 一旦Active Master出现问题, 立刻接上.

Standalone 自带的调度模式
搭建步骤
Standalone主要组件
主节点Master
管理整个集群资源,接收提交应用,分配资源给每个应用,运行Task任务
从节点Workers
管理每个机器的资源,分配对应的资源来运行Task; 每个从节点分配资源信息给Worker管理,资源信息包含内存Memory和CPU Cores核数
历史服务器HistoryServer(可选)
Spark Application运行完成以后,保存事件日志数据至HDFS,启动HistoryServer可以查看应用运行相关信息。
standalone的client模式的应用执行和调度过程
standalone的cluster模式的应用执行和调度过程
常用命令
/export/server/spark/sbin/start-all.sh
sbin/start-history-server.sh
Spark Application 运行到集群时
由两部分组成
Driver Program(用户编写的数据处理逻辑): 1)相当于AppMaster,整个应用管理者,负责应用中所有Job的调度执行; 2)运行JVM Process,运行程序的MAIN函数,必须创建SparkContext上下文对象; 3)一个SparkApplication仅有一个;
SparkContext
用户逻辑与Spark集群主要的交互接口,它会和Cluster Manager交互,包括向它申请计算资源等
Executor(线程池执行器): 1)相当于一个线程池,运行JVM Process,其中有很多线程,每个线程运行一个Task任务,一个Task任务运行需要1 Core CPU,所以可以认为Executor中线程数就等于CPU Core核数; 2)一个Spark Application可以有多个,可以设置个数和资源信息;
Driver Program是用户编写的数据处理逻辑,这个逻辑中包含用户创建的SparkContext。SparkContext 是用户逻辑与Spark集群主要的交互接口,它会和Cluster Manager交互,包括向它申请计算资源等。 Cluster Manager负责集群的资源管理和调度,现在支持Standalone、Apache Mesos和Hadoop的 YARN。Worker Node是集群中可以执行计算任务的节点。 Executor是在一个Worker Node上为某应用启动的一个进程,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。Task 是被送到某个Executor上的计算单元,每个应用都有各自独立的 Executor,计算最终在计算节点的 Executor中执行。
用户程序从最开始的提交到最终的计算执行,需要经历以下几个阶段: 1)、用户程序创建 SparkContext 时,新创建的 SparkContext 实例会连接到 ClusterManager。 Cluster Manager 会根据用户提交时设置的 CPU 和内存等信息为本次提交分配计算资源,启动 Executor。 2)、Driver会将用户程序划分为不同的执行阶段Stage,每个执行阶段Stage由一组完全相同Task组成,这些Task分别作用于待处理数据的不同分区。在阶段划分完成和Task创建后, Driver会向Executor发送 Task; 3)、Executor在接收到Task后,会下载Task的运行时依赖,在准备好Task的执行环境后,会开始执行Task,并且将Task的运行状态汇报给Driver; 4)、Driver会根据收到的Task的运行状态来处理不同的状态更新。 Task分为两种:一种是Shuffle Map Task,它实现数据的重新洗牌,洗牌的结果保存到Executor 所在节点的文件系统中;另外一种是Result Task,它负责生成结果数据; 5)、Driver 会不断地调用Task,将Task发送到Executor执行,在所有的Task 都正确执行或者超过执行次数的限制仍然没有执行成功时停止;
三个核心概念
Spark Application程序运行时三个核心概念:Job、Stage、Task,说明如下: 1)Task:被分配到各个 Executor 的单位工作内容,它是 Spark 中的最小执行单位,一般来说有多少个 Paritition(物理层面的概念,即分支可以理解为将数据划分成不同 部分并行处理),就会有多少个 Task,每个 Task 只会处理单一分支上的数据。 2)Job:由多个 Task 的并行计算部分,一般 Spark 中的 action 操作(如 save、collect,后面进一步说明),会生成一个 Job。 3)Stage:Job 的组成单位,一个 Job 会切分成多个 Stage,Stage 彼此之间相互依赖顺序执行,而每个 Stage 是多个 Task 的集合,类似 map 和 reduce stage。
PySpark
安装
pip install pyspark
PySpark只是Spark的一个库
单机模式命令
bin/pyspark --master local[*]

语法参数
bin/spark-submit \ --class <main-class> --class: 你的应用的启动类 --master <master-url> \ --master 指定Master的地址 --executor-memory 1G \ --指定每个executor可用内存为1G --total-executor-cores 2 \ --指定每个executor使用的cup核数为2个 --deploy-mode <deploy-mode> \ --deploy-mode: 是否发布你的驱动到worker节点 --py-files XXXX.zip \ -- .py、.egg或者.zip文件的逗号分隔列表,包括Python应用程序,这些文件将被交付给每一个执行器来使用。 --conf <key>=<value> \ --conf: 任意的Spark配置属性, 格式key=value. ... # other options <application-jar> \ --打包好的应用jar,包含依赖. [application-arguments] --传给main()方法的参数
Local 本地模式
搭建步骤
原理
在本地使用单机多线程模拟Spark集群中的各个角色
Local[N]模式
【本质】启动一个JVM Process进程(一个进程里面有多个线程),执行任务Task
【用途】是用单机的多个线程来模拟Spark分布式计算,直接运行在本地,便于调试,通常用来验证开发出来的应用程序逻辑上有没有问题。
【语法】N代表可以使用N个线程,每个线程拥有一个cpu core。如果不指定N,则默认是1个线程(该线程有1个core)
spark目录结构
bin 可执行脚本 conf 配置文件 data 示例程序使用数据 examples 示例程序 jars 依赖 jar 包 python pythonAPI sbin 集群管理命令 yarn 整合yarn需要的东东
Spark 框架概述

Spark概述
1)Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎 2)Spark 借鉴了 MapReduce 思想发展而来,保留了其分布式并行计算的优点并改进了其明显的缺陷。让中间数据存储在内存中提高了运行速度、并提供丰富的操作数据的API提高了开发速度。
Spark 对比 Hadoop
与MapReduce不同点
Spark中的Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS
Spark框架为什么这么快?
1)数据结构(编程模型):Spark框架核心 RDD:弹性分布式数据集,认为是列表List Spark 框架将要处理的数据封装到集合RDD中,调用RDD中函数处理数据类比Scala语言中实现WordCountList -> flatMap、map、groupBy、reduce RDD 数据可以放到内存中,内存不足可以放到磁盘中
2)Task任务运行方式:以线程Thread方式运行 MapReduce中Task是以进程Process方式运行,但时Spark Task以线程Thread方式运行。 线程Thread运行在进程Process中,启动和销毁是很快的(相对于进程来说)。
Spark 四大特点
速度快
1、MR会多次读写磁盘,有多次IO序列化与反序列化,耗时久 2、Spark的中间结果保存在内存,很少使用磁盘IO 3、MR的MapTask或ReduceTask是以进程存在的,而Spark的Task是以线程存在的。进程的启动和销毁比线程的要慢。

易用使用
编程的API简洁,一句代码就能搞定:加载 -> 计算 -> 输出
通用性强
一站式自带大数据所有场景的组件
运行方式多样
1)Spark应用支持多种资源管理器 比如自带的standalone,或者yarn,meso 2-可以加载访问所有主流的数据源 比如mysql,HDFS,json,csv,hive
Spark 框架模块

Spark Core
Spark核心:包含RDD、任务调度、内存管理、错误恢复、与存储系统交互等模块。数据结构:RDD 弹性分布式数据集
Spark SQL
Spark 用来离线操作结构化数据的程序包。通过 Spark SQL,我们可以使用 SQL操作数据。数据结构:Dataset/DataFrame = RDD + Schema
Spark Streaming
Spark 提供的对实时数据进行流式计算的组件。提供了用来操作数据流的 API。 数据结构:DStream = Seq[RDD],DStream离散化流
Spark MLlib
提供常见的机器学习(ML)功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外的支持功能。 数据结构:RDD或者DataFrame
Spark GraphX
Spark中用于图计算的API,性能良好,拥有丰富的功能和运算符,能在海量数据上自如地运行复杂的图算法。数据结构:RDPG-弹性分布式属性图
Structured Streaming

Structured Streaming是建立在SparkSQL引擎之上的可伸缩和高容错的流式处理引擎,可以像操作静态数据的批量计算一样来执行流式计算。当流式数据不断的到达的过程中Spark SQL的引擎会连续不断的执行计算并更新最终结果。
Spark通信框架
Netty
Spark 运行模式

本地模式(Local Mode)
local
分配1Core CPU运行
local[k]
分配K Core CPU,同时运行K个Task
local[*]
获取物理机器CPU Core核数
集群模式(Cluster Mode)
Spark Standalone集群模式(开发测试及生成环境使用)
使用Zookeeper搭建高可用,避免Master出现单点故障
Hadoop YARN集群模式(生产环境使用)
云服务(Cloud)
Spark SQL
SparkSQL概述
Spark SQL 是什么?
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用 Hive是将SQL转为MapReduce,SparkSQL可以理解成是将SQL解析成RDD + 优化再执行
Spark SQL模块官方定义:针对结构化数据处理Spark Module模块。
1)结构化数据处理(一般指数据有固定的Schema约束) 2)抽象数据结构:DataFrame = RDD - 泛型 + Schema + 方便的SQL操作 + 优化 3)分布式SQL引擎
SparkSession 应用入口(取代了原本的SQLContext与HiveContext)
DataFrame
DataFrame 是什么?

1)DataFrame ==> RDD - 泛型 + Schema + 方便的SQL操作 + 优化 2)DataFrame是特殊的RDD 3)DataFrame是一个分布式的表
【详细】在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。
DataFrame 的 Schema
Schema信息封装在StructType中,包含很多StructField对象
Row
DataFrame中每条数据封装在Row中,Row表示每行数据
构建row对象
创建 DataFrame 注:文中的spark其实是SparkSession的实例对象
RDD 转换成 DataFrame 本质:给RDD加上Schema信息
1)反射机制 df=spark.createDataFrame(Row1) 利用反射机制,推导包含某种类型的RDD,通过反射将其转换为指定类型的DataFrame,适用于提前知道RDD的schema。 使用反射方法Schema模式,Spark SQL 可以将 Row 对象的 RDD 转换为 DataFrame,从而推断数据类型。
2)自定义Schema df=spark.createDataFrame(RDD1,schema1) 通过编程接口与RDD进行交互获取schema,并动态创建DataFrame,在运行时决定列及其类型。 1)创建元组或列表的RDD 2)使用StruType方法创建schema对象 3)使用createDataFrame方法,传入RDD元组和schema对象参数,创建DataFrame
3)直接toDF df=rdd1.toDF(['name', 'age']) 注意toDF( )里面传的是容器
Pandas 构建 DataFrame 前提:有Pandas版的DataFrame
df = spark.createDataFrame(pandas_df)
读取外部文件 转化为 DataFrame
df = spark.read.json('文件路径')
df = spark.read.parquet('文件路径')
df2 = spark.read.format("csv") \ .option("sep", ";") \ #以逗号分割 .option("inferSchema", "true") \ #推断模式 .option("header", "true") \ #首行为列字段 .load("file:///export/pyfolder1/pyspark-chapter03_3.8/data/sql/resources/people.csv") df2.show()
DataFrame常用操作

DSL编程 调用DataFrame/Dataset API(函数)

类似SQL语法函数:调用Dataset中API进行数据分析,Dataset中涵盖很多函数,大致分类如下: 1)选择函数 select:选取某些列的值 2)过滤函数 filter/where:设置过滤条件,类似SQL中WHERE语句 3)分组函数 groupBy/rollup/cube:对某些字段分组,在进行聚合统计 4)聚合函数 agg:通常与分组函数连用,使用一些count、max、sum等聚合函数操作 5)排序函数 sort/orderBy:按照某写列的值进行排序(升序ASC或者降序DESC) 6)限制函数 limit:获取前几条数据,类似RDD中take函数 7)重命名函数 withColumnRenamed:将某列的名称重新命名 8)删除函数 drop:删除某些列 9)增加列函数 withColumn:当某列存在时替换值,不存在时添加此列 上述函数在实际项目中经常使用,尤其数据分析处理的时候,其中要注意,调用函数时,通常指定某个列名称,传递Column对象,通过隐式转换转换字符串String类型为Column对象。
单词记数 DSL编程 案例

电影评分 DSL 编程 案例

SQL编程 将DataFrame/Dataset注册为临时视图或表,编写SQL语句

第1步:注册为临时视图 df.createOrReplaceTempView('视图名') 第2步:编写SQL spark.sql('select * from 表名').show()
单词记数 SQL编程 案例

电影评分 DSL 编程 案例

Shuffle 分区数量
External DataSource
SparkSQL模块内置支持数据源
总结起来三种类型数据,也是实际开发中常用的: 1)文件格式数据 文本文件text、csv文件和json文件 2)列式存储数据 Parquet格式、ORC格式 3)数据库表 关系型数据库RDBMS:MySQL、DB2、Oracle和MSSQL、Hive仓库表
数据源与格式
1)结构化数据(Structured) 2)非结构化数据(UnStructured) 3)半结构化数据(Semi-Structured)
加载、保存数据
加载数据 read() load()
保存数据 write() save()
当将数据DataFrame保存到Hive表时,可以设置分区和分桶:
保存模式 SaveMode
1)Append 追加模式,当数据存在时,继续追加; 2)Overwrite 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据; 3)ErrorIfExists 存在及报错; 4)Ignore 忽略,数据存在时不做任何操作;
jdbc 读、写
SparkSQL的窗口函数
--窗口函数案例: --创建数据集 --(1,'cc',70), create or replace temporary view tab(cid,name,score) as values (1,'zs',60), (1,'ls',70), (1,'ww',80), (1,'zl',90), (2,'aa',80), (2,'bb',90); --排序 select *, row_number() over(partition by cid order by score) as rn, row_number() over(partition by cid order by score desc) as rn2, rank() over(partition by cid order by score) as rk, dense_rank() over(partition by cid order by score) as drk, --聚合 sum(score) over(partition by cid ) as sum1, sum(score) over(partition by cid order by score) as sum1, sum(score) over(partition by cid order by score rows between 1 preceding and 1 following ) as sum1, --lag/lead lag(score,2,0) over(partition by cid order by score) lag1, lead(score,2,0) over(partition by cid order by score) lead1, --ntile ntile(2) over(partition by cid order by score) nt1, --first_value/last_value first_value(score) over(partition by cid order by score) first, first_value(score) over(partition by cid order by score desc) last1, last_value(score) over(partition by cid order by score rows between unbounded preceding and unbounded following ) last from tab;
参考 HiveSQL 思维导图
Spark自定义函数
三种类型
1)UDF(User-Defined-Function) 函数 一对一的关系,输入一个值经过函数以后输出一个值; 在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法; 2)UDAF(User-Defined Aggregation Function) 聚合函数 多对一的关系,输入多个值输出一个值,通常与groupBy联合使用; 3)UDTF(User-Defined Table-Generating Functions) 函数 一对多的关系,输入一个值输出多个值(一行变为多行); 用户自定义生成函数,有点像flatMap;
自定义注册一个UDF
udf(lambda 匿名函数,返回数据类型) udf1= udf(lambda x:x**2 , IntegerType() ) udf(lambda 有名函数,返回数据类型) udf2=udf( lambda x:square(x) , IntegerType() )
# -*- coding:utf-8 -*- # Desc:This is Code Desc import string from pyspark.sql import SparkSession import os import pandas as pd from pyspark.sql.functions import udf from pyspark.sql.types import IntegerType, FloatType, ArrayType, StructType, StringType os.environ['SPARK_HOME'] = '/export/server/spark' PYSPARK_PYTHON = "/root/anaconda3/bin/python" # 当存在多个版本时,不指定很可能会导致出错 os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON if __name__ == '__main__': # 1-创建上下文对象 spark = SparkSession.builder.appName('test').master('local[*]').config('spark.sql.shuffle.partitions', '6').getOrCreate() # 2-创建数据集 pdf = pd.DataFrame(data={'integers': [1, 2, 3], 'floats': [-1.0, 0.6, 2.6], 'integer_arrays': [[1, 2], [3, 4], [5, 6, 8, 9]]}) df = spark.createDataFrame(pdf) # 3-定义方式1- udf(lambda 匿名函数,返回数据类型) 整数->平方 udf1= udf(lambda x:x**2 , IntegerType() ) # 4-使用udf1 df.select( '*', udf1('integers').alias('square1') ).show() def square(x):return x**2 # 5-定义方式2-udf(lambda 有名函数,返回数据类型) 整数->平方 udf2=udf( lambda x:square(x) , IntegerType() ) # 6-使用udf2 df.select( '*', udf2('integers').alias('square2') ).show() # 7 定义方式3-udf3 udf3=udf(square,IntegerType()) # 8-使用udf3 df.select( '*', udf3('integers').alias('square3') ).show() # 9 ,验证1-如果实际的结果,不是定义的返回值类型IntegerType(),则显示为null df.select( '*', udf1('integers').alias('square_integer'), udf1('floats').alias('square_float') ).show() # 10 ,验证2-如果实际的结果,不是定义的-FloatType(),则显示为null udf1_1 = udf(lambda x: x ** 2, FloatType()) df.select( '*', udf1_1('integers').alias('square_integer'), udf1_1('floats').alias('square_float') ).show() # 11-定义udf4,返回值类型是数组类型 udf4=udf( lambda arr: [x**2 for x in arr] , ArrayType( IntegerType() )) df.select( 'integers', 'floats', 'integer_arrays', udf4('integer_arrays').alias('arr2') ).show() # 12-udf的返回值类型是Tuple或混合输出类型 # 如下:有一个函数,输入一个数字,返回数字以及该数字对应字母表中的字母。 # 13-定义udf5,返回类型用自定义的schema schema=StructType().add('num',IntegerType()).add('letter',StringType()) udf5=udf( lambda x:(x,string.ascii_letters[x]), schema ) # 14-使用udf5 df.select( '*', udf5('integers').alias('tup') ).show()
用装饰器 @udf(returnType=返回数据类型)
# -*- coding:utf-8 -*- # Desc:This is Code Desc from pyspark.sql import SparkSession import os import pandas as pd from pyspark.sql.functions import udf from pyspark.sql.types import IntegerType, FloatType, ArrayType, StructType, StringType os.environ['SPARK_HOME'] = '/export/server/spark' PYSPARK_PYTHON = "/root/anaconda3/bin/python" # 当存在多个版本时,不指定很可能会导致出错 os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON if __name__ == '__main__': # 1-创建上下文对象 spark = SparkSession.builder.appName('test').master('local[*]').config('spark.sql.shuffle.partitions', '6').getOrCreate() df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age")) #模拟数据集实现用户名字长度,用户名字转为大写,以及age年龄字段增加1岁 #2 -定义函数实现-用户名字长度 name_len_udf=udf(lambda name:len(name),IntegerType()) #3 -@装饰器方式定义函数实现-用户名字转为大写 @udf def to_upper(name): return name.upper() #4 -@装饰器方式定义函数实现-age年龄字段增加1岁 @udf(returnType=IntegerType()) def add_one(age):return age+1 #5用DSL风格计算 print('用DSL风格计算') df.select( '*', name_len_udf('name').alias('name_len'), to_upper('name').alias('upper'), add_one('age').alias('new_age') ).show() #5用SQL风格计算 #将python版的udf,注册成SQL版的函数 spark.udf.register( 'name_len_udf1' , name_len_udf) spark.udf.register( 'to_upper1' , to_upper) spark.udf.register( 'add_one1' , add_one) print('用SQL风格计算') df.createOrReplaceTempView('people') spark.sql(''' select *, name_len_udf1 (name) as name_len, to_upper1(name) as upper, add_one1(age) as new_age from people ''').show()
列表输出类型,则把注册类型换成ArrayType() udf3=udf( lambda x:square(x) , ArrayType(IntegerType()) )

Tuple或混合输出类型 udf4 = udf(lambda x: convert_ascii(x), array_schema)

SparkSQL底层如何执行
RDD 和 SparkSQL 运行时的区别
RDD 的运行流程
Spark 的运行流程
和 RDD 不同, SparkSQL 的 Dataset 和 SQL 并不是直接生成计划交给集群执行, 而是经过了一个叫做 Catalyst 的优化器, 这个优化器能够自动帮助开发者优化代码
Catalyst
执行策略的两个优化方向: 1)基于规则优化/Rule Based Optimizer/RBO; 2)基于代价优化/Cost Based Optimizer/CBO;
Catalyst 的主要运行原理分为三步: 1)先对 SQL 或者 Dataset 的代码解析, 生成逻辑计划 2)然后对逻辑计划进行优化, 再生成物理计划 3)最后生成代码到集群中以 RDD 的形式运行
Catalyst, 整个 SparkSQL 的架构大致如下:
1)API 层简单的说就是 Spark 会通过一些 API 接受 SQL 语句 2)收到 SQL 语句以后, 将其交给 Catalyst, Catalyst 负责解析 SQL, 生成执行计划等 3)Catalyst 的输出应该是 RDD 的执行计划 4)最终交由集群运行
具体流程:
具体流程
第一步:解析 SQL, 并且生成 AST (抽象语法树)
第二步:在 AST 中加入元数据信息, 做这一步主要是为了一些优化, 例如 col = col 这样的条件, 下图是一个简略图, 便于理解
第三步:对已经加入元数据的 AST, 输入优化器, 进行优化, 从两种常见的优化开始, 简单介绍: 谓词下推 Predicate Pushdown, 将 Filter 这种可以减小数据集的操作下推, 放在 Scan 的位置, 这样可以减少操作时候的数据量。
列值裁剪 Column Pruning, 在谓词下推后, people 表之上的操作只用到了 id 列, 所以可以把其它列裁剪掉, 这样可以减少处理的数据量, 从而优化处理速度
第四步:上面的过程生成的 AST 其实最终还没办法直接运行, 这个 AST 叫做 逻辑计划, 结束后, 需要生成 物理计划, 从而生成 RDD 来运行
使用 explain 方法查看物理执行计划

使用 Spark WebUI 进行查看

生成逻辑物理执行计划示例
spark.sql("select * from people where age >= 20").show()
生成的逻辑和物理实行计划,右边的是依据QueryExecution的 toString 要领,获得的对应效果   源码分析: 
SparkSQL与Hive整合
原理
Hive表的元数据库中,描述了有哪些database、table、以及表有多少列,每一列是什么类型,以及表的数据保存在hdfs的什么位置。 执行HQL时,先到MySQL元数据库中查找描述信息,然后解析HQL并根据描述信息生成MR任务,简单来说Hive就是将SQL根据MySQL中元数据信息转成MapReduce执行,但是速度慢。 使用SparkSQL整合Hive其实就是让SparkSQL去加载Hive 的元数据库,然后通过SparkSQL执行引擎去操作Hive表。 所以首先需要开启Hive的元数据库服务,让SparkSQL能够加载元数据。
使用SparkSQL操作集群Hive表

分布式SQL引擎
方式1:交互式命令行: SparkSQL CLI

方式2:启动服务HiveServer2: ThriftServer JDBC/ODBC Server

PySpark Core
RDD 详解

为什么需要RDD?
因为MapReduce的迭代计算效率太慢了
MR中的迭代:  Spark中的迭代: 
RDD的定义
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。
RDD核心三大要点
不可变 immutable
分区的 partitioned
并行计算 parallel
RDD五大特性
计算函数
依赖关系
数据集怎么来的
分区列表
分区函数
最佳位置
数据集在哪,在哪计算合适,如何分区
RDD特点
分区

RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。
只读

RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。
依赖

RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。
缓存

如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,
检查点 checkpoint
迭代太多,血缘关系太长,一旦后续迭代出错则需需按照超长的依赖关系重建,影响性能,这时候可以吧RDD存到HDFS,即checkpoint,该操作会切断血缘
RDD 的创建
并行化方式创建RDD

data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)

通过外部数据创建RDD
rdd = sc.textFile('../data/words.txt')

小文件读取创建RDD
rdd = sc.wholeTextFiles('../data/words.txt')

RDD 的算子
Transformer算子
Transformer算子
值类型 valueType
map
>>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3) >>> rdd2 = rdd1.map(lambda x: x+1) >>> rdd2.collect() [2, 3, 4, 5, 6, 7, 8, 9, 10]
将func函数作用到数据集的每一个元素上,生成一个新的RDD返回。
flatmap
>>>rdd1 = sc.parallelize(["a b c","d e f","h i j"]) >>>rdd2 = rdd1.flatMap(lambda x:x.split(" ")) >>>rdd2.collect() ['a', 'b', 'c', 'd', 'e', 'f', 'h', 'i', 'j']
flatMap会先执行map的操作,再将所有对象合并为一个对象
groupBy
>>> x = sc.parallelize([1,2,3]) >>> y = x.groupBy(lambda x: 'A' if (x%2 == 1) else 'B' ) >>> print(y.mapValues(list).collect()) [('A', [1, 3]), ('B', [2])]
filter
>>>rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3) >>>rdd.filter(lambda x:x>4).collect() [5, 6, 7, 8, 9]
filter(func) 选出所有func返回值为true的元素,生成一个新的RDD返回
distinct
>>> rdd1 = sc.parallelize([1,2,3,3,3,5,5,6]) >>> rdd1.distinct().collect() [2, 6, 1, 3, 5]
对指定RDD的数据进行去重
双值类型 DoubleValueType
Union
>>> rdd1 = sc.parallelize([("a",1),("b",2)]) >>> rdd2 = sc.parallelize([("c",1),("b",3)]) >>> rdd3 = rdd1.union(rdd2) >>> rdd3.collect() [('a', 1), ('b', 2), ('c', 1), ('b', 3)]
对两个RDD求并集
Intersection
>>> rdd1 = sc.parallelize([("a",1),("b",2)]) >>> rdd2 = sc.parallelize([("a",1),("b",3)]) >>> rdd1.intersection(rdd2).collect() [('a', 1)]
对两个RDD求交集
键值对类型 Key-Value
groupByKey
>>> rdd = sc.parallelize([("a",1),("b",2),("c",3),("d",4)]) >>> result = rdd.groupByKey().collect() >>> result [('d', <pyspark.resultiterable.ResultIterable object at 0x7f67c2d6f520>), ('b', <pyspark.resultiterable.ResultIterable object at 0x7f67c359cb80>), ('a', <pyspark.resultiterable.ResultIterable object at 0x7f67c359ce20>), ('c', <pyspark.resultiterable.ResultIterable object at 0x7f67c359c3d0>)] >>> result[1] ('b', <pyspark.resultiterable.ResultIterable object at 0x7f67c2d8e9d0>) >>> result[1][1] <pyspark.resultiterable.ResultIterable object at 0x7f67c2d8e9d0> >>> list(result[1][1]) [2]
以元组中的第0个元素作为key,进行分组,返回一个新的RDD
groupByKey之后的结果中 value是一个Iterable迭代对象
reduceByKey
>>>rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>>rdd.reduceByKey(lambda x,y:x+y).collect() [('b', 1), ('a', 2)]
将key相同的键值对,按照Function进行计算
sortByKey
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] >>> sc.parallelize(tmp).sortByKey().collect() [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] # 降序排序 >>> sc.parallelize(tmp).sortByKey(False).collect() [('d', 4), ('b', 2), ('a', 1), ('2', 5), ('1', 3)] # 多分区排序 >>> sc.parallelize(tmp).sortByKey(True, 2).glom().collect() [[('1', 3), ('2', 5), ('a', 1)], [('b', 2), ('d', 4)]] # Key处理函数排序 >>>tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5), ('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)] >>>>>> sc.parallelize(tmp2).sortByKey(True, 1).collect() [('Mary', 1), ('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('was', 8), ('white', 9), ('whose', 6)] >>> sc.parallelize(tmp2).sortByKey(True, 1, keyfunc=lambda k: k.upper()).collect() [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
根据key排序
【语法】sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>)
Action算子
Action算子
countByValue
>>> x = sc.parallelize([1,3,1,2,3]) >>> y = x.countByValue() >>> print(x.collect()) [1, 3, 1, 2, 3] >>> print(y) defaultdict(<class 'int'>, {1: 2, 3: 2, 2: 1}) >>> sc.parallelize([])
统计值出现的次数
collect
>>> rdd = sc.parallelize([1,3,5,2,6,7,11,9,10],3) >>> rdd.map(lambda x: x + 1).collect() [2, 4, 6, 3, 7, 8, 12, 10, 11] 
返回一个list,list中包含 RDD中的所有元素
reduce
>>>rdd1 = sc.parallelize([1,2,3,4,5]) >>>rdd1.reduce(lambda x,y : x+y) 15 reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。 
fold
>>> rdd1 = sc.parallelize([1,2,3,4,5], 3) >>> rdd1.fold(10, lambda x,y : x+y) 55 flod可以接收一个初始值, 其聚合逻辑和reduce相同 不过不同的是, 这个初始值每个分区内聚合的时候 都会存在 同时分区之间结果聚合的时候也会存在
flod可以接收一个初始值, 其聚合逻辑和reduce相同 不过不同的是, 这个初始值每个分区内聚合的时候 都会存在 同时分区之间结果聚合的时候也会存在
first
>>>sc.parallelize([2, 3, 4]).first() 2
返回RDD的第一个元素
take
>>>sc.parallelize([2, 3, 4, 5, 6]).take(2) [2, 3] >>>sc.parallelize([2, 3, 4, 5, 6]).take(10) [2, 3, 4, 5, 6] >>>sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3) [91, 92, 93] #注意, 不是排序取 而是数据在加载的时候就这个顺序, 如下, 可以看到 不是排序哦 >>> sc.parallelize([5,3,1,1,6]).take(2) [5, 3]
返回RDD的前N个元素
top
>>> x = sc.parallelize([1,3,1,2,3]) >>> y = x.top(num = 3) >>> print(x.collect()) [1, 3, 1, 2, 3] >>> print(y) [3, 3, 2]
排序取前几个从大到小
count
>>>sc.parallelize([2, 3, 4]).count() 3
返回RDD中元素的个数
takeSample
>>> rdd = sc.parallelize(range(0, 10)) >>> rdd.takeSample(True, 20, 1) [0, 6, 3, 4, 3, 1, 3, 7, 3, 5, 3, 0, 0, 9, 6, 5, 7, 9, 4, 7] >>> rdd.takeSample(True, 5, 1) [8, 8, 0, 3, 6] >>> rdd.takeSample(True, 5, 1) [8, 8, 0, 3, 6] >>> rdd.takeSample(False, 5, 2) [5, 9, 3, 4, 6] >>> rdd.takeSample(False, 5, 2) [5, 9, 3, 4, 6]
takeSample(参数1: True or False, 参数2:采样数, 参数3:随机数种子)
takeOrdered
>>> rdd = sc.parallelize([7,9,3,5,1,10,13,6]) >>> rdd.takeOrdered(3) [1, 3, 5] >>> rdd.takeOrdered(5) [1, 3, 5, 6, 7] # 降序 >>> rdd.takeOrdered(5, key=lambda x:-x) [13, 10, 9, 7, 6]
使用自然顺序或自定义比较器返回 RDD 的前 n 个元素。
Executor都会将执行的结果统一发送回Driver
foreach
>>> words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark and python"] ) >>> def f(x): print(x) >>> fore = words.foreach(f) 结果截图: 
仅返回满足foreach内函数条件元素。
saveAsTextFile
>>> data = sc.parallelize([1,2,3], 2) >>> data.saveAsTextFile("hdfs://node1:8020/output/file1")  # 注意, 有几个分区就保存几个文件哦 # 注意, 如果使用集群模式不建议保存到本地, 因为文件会零散存入各个Executor所在的服务器, 并且极有可能失败. # Local模式下可以保存为本地, 写入本地使用file:// 协议 >>> sc.parallelize([1,2,3]).saveAsTextFile("file:///root/aaa") # 集群模式下, 建议写入HDFS
将数据保存为文本文件, 可以保存到本地也可以保存到HDFS
不会统一发送结果回Driver
重要算子
分区操作算子
mapPartitions
rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9], 3) rdd1.glom().collect() [[1, 2, 3], [4, 5, 6], [7, 8, 9]] def it(iter): for i in iter: yield i + 1 rdd1.mapPartitions(it).collect() [2, 3, 4, 5, 6, 7, 8, 9, 10] 
一次处理一整个分区的数据
foreachPartition
>>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9], 1) >>> rdd1.foreachPartition(lambda iter: print(sum(iter))) 45
一次处理一整个分区的数据
重分区算子
repartition
>>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4) >>> sorted(rdd.glom().collect()) [[1], [2, 3], [4, 5], [6, 7]] >>> len(rdd.repartition(2).glom().collect()) 2 >>> len(rdd.repartition(10).glom().collect()) 10 >>> rdd.glom().collect() [[1], [2, 3], [4, 5], [6, 7]]
增加分区数,会产生shuffle
coalesce
>>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect() [[1], [2, 3], [4, 5]] >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() [[1, 2, 3, 4, 5]] # 不会增加分区 >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(4).glom().collect() [[1], [2, 3], [4, 5]] # 增加True选项才会增加分区 >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(4,True).glom().collect() [[4, 5], [2, 3], [], [1]]
减少分区数,不会产生shuffle
partitionBy
def par(k): if k == 'hadoop': return 1 if k == 'spark': return 0 if k == 'flink': return 2 return 0 print(sc.parallelize([('hadoop', 1), ('spark', 1), ('flink', 1), ('spark', 1)]).partitionBy(3, partitionFunc=par).glom().collect()) 结果截图: 
根据key的映射值,自动放进对应下标的分区
聚合算子
reduce(上面有)
fold(上面有)
KeyValue型map - mapValues算子
# 将所有的value都+10 >>> rdd = sc.parallelize([('hadoop', 1), ('spark', 1), ('flink', 1), ('spark', 1)]) >>> rdd.mapValues(lambda v: v + 10).collect() [('hadoop', 11), ('spark', 11), ('flink', 11), ('spark', 11)]
mapValues算子可以针对 KV型(二元元组)数据进行map map逻辑仅针对value
KeyValue型聚合
groupByKey
rdd = sc.parallelize([('hadoop', 1), ('spark', 1), ('flink', 1), ('spark', 1)]) rdd.groupByKey().collect() [('hadoop', <pyspark.resultiterable.ResultIterable object at 0x7f2e41ef11f0>), ('flink', <pyspark.resultiterable.ResultIterable object at 0x7f2e41ef1190>), ('spark', <pyspark.resultiterable.ResultIterable object at 0x7f2e41ef1820>)] # 结果和groupBy是一样的, 只是自动按照key来分组了 比较方便 # 如果要求value的和, 还要继续写 >>> rdd.groupByKey().mapValues(lambda x:sum(list(x))).collect() [('flink', 1), ('spark', 2), ('hadoop', 1)]
普通group需要手动给出分组条件, groupByKey自动按照key来进行分组
reduceByKey
rdd = sc.parallelize([('hadoop', 1), ('spark', 1), ('flink', 1), ('spark', 1)]) rdd.reduceByKey(lambda a, b: a + b).collect() [('hadoop', 1), ('flink', 1), ('spark', 2)]
自动按照key进行分组, 同时可以完成value的聚合计算
foldByKey
>>> rdd = sc.parallelize([('hadoop', 1), ('spark', 1), ('flink', 1), ('spark', 1), ('hadoop', 1), ('spark', 1), ('flink', 1), ('spark', 1)], 3) >>> rdd.glom().collect() [[('hadoop', 1), ('spark', 1)], [('flink', 1), ('spark', 1)], [('hadoop', 1), ('spark', 1), ('flink', 1), ('spark', 1)]] >>> rdd.foldByKey(10, lambda a, b: a + b).collect() [('hadoop', 22), ('spark', 34), ('flink', 22)]
同普通的fold基本相同, 就是会按照key进行分组 在组内进行: 1)分区内带初始值聚合 2)分区间不带初始值聚合
aggregateByKey
>>> from operator import add >>> rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1), ('b', 1), ('a', 1)], 2) >>> rdd.glom().collect() [[('a', 1), ('b', 1)], [('a', 1), ('b', 1), ('a', 1)]] >>> rdd.aggregateByKey(0, add, add).collect() [('b', 2), ('a', 3)] >>> rdd.aggregateByKey(5, add, add).collect() [('b', 12), ('a', 13)]
aggregateByKey 可以按照key分组, 并自定义: 1)分区内聚合逻辑, 并可以带有初始值 2)分区间聚合逻辑, 不带初始值
Join算子
(说明)当两个RDD的数据类型为二元组Key/Value对时,可以依据Key进行关联Join。
内连接 join
>>> x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")]) >>> y = sc.parallelize([(1001, "sales"), (1002, "tech")]) >>> x.join(y).collect() [(1002, ('lisi', 'tech')), (1001, ('zhangsan', 'sales'))]
左外连接 leftOuterJoin
>>> x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")]) >>> y = sc.parallelize([(1001, "sales"), (1002, "tech")]) >>> x.leftOuterJoin(y).collect() [(1002, ('lisi', 'tech')), (1003, ('wangwu', None)), (1004, ('zhangliu', None)), (1001, ('zhangsan', 'sales'))]
右外连接rightOuterJoin
>>> x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")]) >>> y = sc.parallelize([(1001, "sales"), (1002, "tech")]) >>> x.rightOuterJoin(y).collect() [(1002, ('lisi', 'tech')), (1001, ('zhangsan', 'sales'))]
RDD 缓存(持久化)
缓存函数
缓存级别
实际项目中缓存数据时,往往选择如下两种级别:
缓存函数与Transformation函数一样,都是Lazy操作,需要Action函数触发,通常使用count函数触发。
释放缓存
此函数属于eager,立即执行
RDD Checkpoint 检查点机制
检查点机制案例

持久化 和 Checkpoint 的区别
存储位置
1) Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存); 2)Checkpoint 可以保存数据到本地硬盘以及HDFS这类可靠的存储上;
生命周期
1)Cache和Persist的RDD会在程序结束后会被清除或者手动调用unpersist方法; 2)Checkpoint的RDD在程序结束后依然存在,不会被删除;程序在失败后重试的时候可以再次读取
Lineage(血统、依赖链、依赖关系)
1)Persist和Cache,不会丢掉RDD间的依赖链/依赖关系,因为这种缓存是不可靠的,如果出现了一些错误(例如 Executor 宕机),需要通过回溯依赖链重新计算出来; 2)Checkpoint会斩断依赖链,因为Checkpoint会把结果保存在HDFS这类存储中,更加的安全可靠,一般不需要回溯依赖链;
Spark 案例练习
针对SougoQ用户查询日志数据中不同字段,不同业务进行统计分析

共享变量
广播变量 Broadcast Variables

【原理】广播变量允许开发人员在每个节点(Worker or Executor)缓存只读变量,而不是在Task之间传递这些变量。使用广播变量能够高效地在集群每个节点创建大数据集的副本。同时Spark还使用高效的广播算法分发这些变量,从而减少通信的开销。
【优点】广播变量的好处,不是每个task一份变量副本,而是变成每个节点的executor才一份副本。这样的话,就可以让变量产生的副本大大减少。
累加器 Accumulators
累加器支持在所有不同节点之间进行累加计算(比如计数或者求和);
三种类型的Accumulator
累加整数型 LongAccumulator
累加浮点型 DoubleAccumulator
累加集合元素 CollectionAccumulator
示例代码
>>> accum = sc.accumulator(0) >>> accum Accumulator<id=0, value=0> >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) >>> accum.value 10
累加器注意事项

PySpark累加器和广播变量案例演示

Spark 内核调度
Spark on yarn 的Cluster调度执行过程
Spark Scheduler 调度器
作用是任务调度:如何组织任务去处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,将每个Stage中的任务发到指定节点运行。
DAG有向无环图 和 Stage阶段
什么是DAG有向无环图?
1)如果一个有向图无法从任意顶点出发经过若干条边回到该点,则这个图是一个有向无环图(DAG图) 2)每一个定点就是一个任务,每一条边代表一种限制约束(Spark中的依赖关系)
依赖类型
窄依赖
RDD之间的数据不需要进行Shuffle,多个数据处理可以在同一台机器的内存中完成,所以窄依赖在Spark中被划分为同一个Stage
宽依赖
由于Shuffle的存在,必须等到父RDD的Shuffle处理完成后,才能开始接下来的计算,所以会在此处进行Stage的切分
Stage阶段
Stage简介:把DAG划分成互相依赖的多个Stage,划分依据是RDD之间的宽依赖,Stage是由一组并行的Task组成。 Stage切割规则:从后往前,遇到宽依赖就切割Stage。 Stage计算模式:pipeline管道计算模式,一个task处理一串分区的数据,整个计算逻辑全部走完
pipeline管道计算模式
为什么要划分Stage? 1)方便并行计算 2)一个复杂的业务逻辑如果有shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照shuffle进行划分(也就是按照宽依赖就行划分),就可以将一个DAG划分成多个Stage/阶段,在同一个Stage中,会有多个算子操作,可以形成一个pipeline流水线,流水线内的多个平行的分区可以并行执行。 Pipeline:HDFS----textRDD----splitRDD-----tupleRDD 如何划分DAG的stage? 1)窄依赖不划分,放在同一个Stage阶段上,形成pipeline流水线,方便并行执行 2)宽依赖要划分,由于有shuffle的存在,只能在父RDD处理完成后,才能开始接下来的计算,也就是说需要要划分stage
WordCount 中的 RDD
RDD的五大特性: 1)分区列表 分区可提高并行度 2)计算函数 用函数来操作各个分区中的数据 3)依赖列表 子RDD依赖父RDD 4)分区器(可选) 键值RDDs的分区器(例如,reduceByKey中的默认的Hash分区器) 5)最佳位置(可选) 计算每个分区的首选位置列表/最佳位置(例如HDFS文件),移动计算比移动数据更划算!
一个JOB一个DAG
DAG有向无环图: 1)黑色圆圈表示一个RDD 上图中有5个黑色圆圈,说明整个Job中有个5个RDD 【1号】RDD类型:HadoopRDD,从HDFS或LocalFS读取文件数据; 【2号、3号和4号】RDD类型:MapPartitionsRDD,从一个RDD转换而来,没有经过shuffle操作; 【5号】RDD类型:ShuffledRDD,从一个RDD转换而来,经过Shuffle重分区操作,Spark Shuffle类似MapReduce流程中Map Phase和Reduce Phase中的Shuffle; 2)浅蓝色矩形框表示调用RDD函数 上图中【5号】RDD所在在蓝色矩形框上的函数【reduceByKey】,表明【5号】RDD是【4号】RDD调用reduceByKey函数得到; 3)查看ShuffleRDD源码,实现RDD的5个特性(右图) ----------------------------------------------------------------------- RDD 设计的一个重要优势是能够记录 RDD 间的依赖关系,即所谓血统(lineage)。 通过丰富的转移操作(Transformation),可以构建一个复杂的有向无环图,并通过这 个图来一步步进行计算。
RDD 依赖
RDD的容错机制
RDD 的容错机制是通过将 RDD 间转移操作构建成有向无环图来实现。从抽象的角度看,RDD 间存在着血统继承关系,其本质上是 RDD之间的依赖(Dependency)关系。
为什么要设计宽窄依赖?
1)窄依赖 · 方便Spark进行并行计算 · 如果有一个分区数据丢失,只需要从父RDD的对应个分区重新计算即可,不需要重新计算整个任务,提高容错 2)宽依赖 划分Stage阶段的依据,产生Shuffle。 3)构建Lineage血缘关系 将创建RDD的元数据信息和转换行为作为Lineage(血统)记录下来,以便丢失分区数据时重新运算进行恢复
区分宽窄依赖
区分: 窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖; 宽依赖:父RDD的一个分区会被子RDD的多个分区依赖,涉及Shuffle;
窄依赖(Narrow Dependency)
【定义】父 RDD 与子 RDD 间的分区是一对一的。父RDD中的分区内数据不能被分割,只能被子RDD中的一个分区整个利用
【注意】下图的map和union操作都是窄依赖。join操作比较特殊,可能同时存在宽、窄依赖
宽依赖 - Shuffle依赖(Wide Dependency)
【定义】Shuffle 有“洗牌、搅乱”的意思,Shuffle依赖即打乱原 RDD 结构的操作。即父 RDD 中的分区可能会被多个子 RDD 分区使用。
Spark Shuffle 洗牌
MapReduce框架的Shuffle过程
Spark的Shuffle简介
Spark的Shuffle简介: Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。 Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等。
Spark的Shuffle分为Write和Read两个阶段,隶属不同的Stage: 1)ShuffleWrite是Parent Stage的最后一步 2)ShuffleRead是Child Stage的第一步
执行Shuffle的主体是Stage中的并发任务,这些任务分ShuffleMapTask和ResultTask两种,ShuffleMapTask要进行Shuffle,ResultTask负责返回计算结果,一个Job中只有最后的Stage采用ResultTask,其他的均为ShuffleMapTask。
Hash Shuffle Manager(Spark1.1以前)
【弊端】每个Task进行shuffle操作后,产生了大量的临时磁盘文件,从而因大量的磁盘IO操作而影响了性能
Shuffle阶段划分: shuffle write:mapper阶段,上一个stage得到最后的结果写出 shuffle read :reduce阶段,下一个stage拉取上一个stage进行合并
1. 未经优化的hashShuffleManager
未经优化的hashShuffleManager: HashShuffle是根据task的计算结果的key值的hashcode%ReduceTask来决定放入哪一个区分,这样保证相同的数据一定放入一个分区,Hash Shuffle过程如下: 根据下游的task决定生成几个文件,先生成缓冲区文件在写入磁盘文件,再将block文件进行合并。 未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。
未经优化: 上游的task数量:m 下游的task数量:n 上游的executor数量:k (m>=k) 总共的磁盘文件:m*n
2. 优化后的hashShuffleManager
优化后的hashShuffleManager: 开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup磁盘文件的数量与下游stage的task数量是相同的。 这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。
优化之后的: 上游的task数量:m 下游的task数量:n 上游的executor数量:k (m>=k) 总共的磁盘文件:k*n
Soft Shuffle Manager(Spark1.2之后)
【优化】每个shuffle write task进行shuffle操作后,虽然也会产生较多的临时磁盘文件,但最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个shuffle write task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。
Soft Shuffle 的两种运行机制
1)普通机制
SortShuffle的普通机制
SortShuffle的普通机制: 1)定义数据结构 该模式下,数据会先写入一个内存数据结构中(默认5M),此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。 2)申请内存 接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。 注意:shuffle中的定时器:定时器会检查内存数据结构的大小,如果内存数据结构空间不够,那么会申请额外的内存,申请的大小满足如下公式:申请内存数据结构存储的数据*2-内存数据结构的设定值的内存大小空间。如果申请到了就不需要溢写,否则会发生溢写。 3)排序 在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。 4)溢写 排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。 5)merge 一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并成1个磁盘文件,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为Reduce端的stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。 SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。
2)bypass机制
SortShuffle的bypass机制
SortShuffle 的 bypass运行机制: bypass触发条件: 1)shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。 2)不是聚合类的shuffle算子(比如reduceByKey)。 此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash,然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。 该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。 bypass与普通SortShuffleManager运行机制的不同在于: 第一,磁盘写机制不同; 第二,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。 总结:SortShuffle也分为普通机制和bypass机制,普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件。而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值,或者算子不是聚合类的shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序,极大的提高了其性能。
当shuffle write task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。(比如groupByKey)
Shuffle的配置选项
下面列出Shuffle类不同分类算子: 去重 def distinct() def distinct(numPartitions: Int) 聚合 def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])] def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)] def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)] 排序 def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 重分区 def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null) 集合或者表操作 def intersection(other: RDD[T]): RDD[T] def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] def intersection(other: RDD[T], numPartitions: Int): RDD[T] def subtract(other: RDD[T], numPartitions: Int): RDD[T] def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
spark 的shuffle调优: 主要是调整缓冲的大小,拉取次数重试重试次数与等待时间,内存比例分配,是否进行排序操作等等 spark.shuffle.file.buffer 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小(默认是32K)。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。 spark.reducer.maxSizeInFlight 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。(默认48M) 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。 spark.shuffle.io.maxRetries shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。(默认是3次) spark.shuffle.io.retryWait:该参数代表了每次重试拉取数据的等待间隔。(默认为5s) 调优建议:一般的调优都是将重试次数调高,不调整时间间隔。 spark.shuffle.memoryFraction 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作内存比例。 spark.shuffle.manager 参数说明:该参数用于设置shufflemanager的类型(默认为sort).Spark1.5x以后有三个可选项: Hash:spark1.x版本的默认值,HashShuffleManager Sort:spark2.x版本的默认值,普通机制,当shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数,自动开启bypass 机制 spark.shuffle.sort.bypassMergeThreshold 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作。 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些
Job 调度流程
图解: 当启动Spark Application的时候,运行MAIN函数,首先创建SparkContext对象(构建DAGScheduler和TaskScheduler)。 1)DAGScheduler实例对象 将每个Job的DAG图划分为Stage,依据RDD之间依赖为宽依赖(产生Shuffle) 2)TaskScheduler实例对象 调度每个Stage中所有Task:TaskSet,发送到Executor上执行
当RDD调用Action函数(比如count、saveTextFile或foreachPartition)时,触发一个Job执行,调度中流程如下图所示:
图解: Spark RDD通过其Transactions操作,形成了RDD血缘关系图,即DAG,最后通过Action的调用,触发Job并调度执行。 1)DAGScheduler负责Stage级的调度,主要是将DAG切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度。 2)TaskScheduler负责Task级的调度,将DAGScheduler给过来的TaskSet按照指定的调度策略分发到Executor上执行,调度过程中SchedulerBackend负责提供可用资源,其中SchedulerBackend有多种实现,分别对接不同的资源管理系统。 Spark的任务调度总体来说分两路进行,一路是Stage级的调度,一路是Task级的调度。
一个Spark应用程序包括Job、Stage及Task: 1)Job是以Action方法为界,遇到一个Action方法则触发一个Job; 2)Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分; 3)Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task。
Spark 基本概念
官方文档:http://spark.apache.org/docs/2.4.5/cluster-overview.html#glossary Application:应用,指的是用户编写的Spark应用程序/代码,包含了Driver功能代码和分布在集群中多个节点上运行的Executor代码; Driver:驱动,Spark中的Driver即运行上述Application的Main()函数并且创建SparkContext,SparkContext负责和ClusterManager通信,进行资源的申请、任务的分配和监控等; Cluster Manager:集群管理器,指的是在集群上获取资源的外部服务,Standalone模式下由Master负责,Yarn模式下ResourceManager负责; Executor:执行器/线程池,运行在Worker Node中的JVM进程,负责执行task任务,并为应用程序存储数据,是执行分区计算任务的进程; RDD:Resilient Distributed Dataset弹性分布式数据集,是分布式内存的一个抽象概念; DAG:Directed Acyclic Graph有向无环图,反映RDD之间的依赖关系和执行流程; Job:作业,按照DAG执行就是一个作业,Job==DAG; Stage:阶段,是作业的基本调度单位,同一个Stage中的Task可以并行执行,多个Task组成TaskSet任务集; Task:任务,运行在Executor上的工作单元,1个Task计算1个分区,包括pipline上的一系列操作; TaskSet:任务集,就是同一个Stage中的task们组成的集合
Spark 并行度
资源并行度与数据并行度: 资源的并行度(运行时无法改变) 指的就是提交时的申请的总core数。 如果提交到的是local[k]: 资源的并行度就是【k】 如果提交到的是standalone :资源的并行度就是 【--total-executor-cores】 如果提交到的是yarn:资源的并行度就是:【--num-executors * --executor-cores】 数据的并行度(运行时可以手动改变) task的个数,即partition个数 task又分为map时的task和reduce(shuffle)时的task; task的数目和很多因素有关: 申请的资源的总core数, (如果是SparkCore)spark.default.parallelism参数, (如果是SparkSQL)spark.sql.shuffle.partitions参数, 读取数据源的类型, shuffle方法的第二个参数, 如reduceByKey(lambda, num ) repartition的数目等等。rdd.repartition(num)
设置Task数量
最好将RDD的分区数设置为:资源的并行度*(2~3倍)
partition个数 = task个数
设置Application的并行度
SparkConf.set("spark.defalut.parallelism", 4)
案例
注意: 若每个executor 只分配到2个task,那每个executor 剩下的一个cpu core 就浪费掉了!
注意: 就算executor分配充裕了,但还要注意task数量要足够多,否则无法充分利用并行来节约集群资源(建议把Application总CPU Core数量的2~3倍)