导图社区 Spark内核部署、架构、任务调度知识框架笔记
Spark内核部署、架构、任务调度知识框架笔记,包括Spark内核概述、Spark部署模式、Spark通讯架构、Spark任务调度机制、SparkShfffle、Spark内存管理(Executor)、Spark核心组件解析。
编辑于2022-11-04 10:55:54 广东Spark内核部署、架构、任务调度知识框架笔记
Spark内核概述
概述
Spark核心组件的运行机制
Spark任务调度机制
Spark内存管理机制
Spark核心功能的运行原理
Spark核心组件
Driver
将用户程序转化为作业(job);
在Executor之间调度任务(task);
跟踪Executor的执行情况;
通过UI展示查询运行情况;
Executor
负责运行组成Spark应用的任务,并将结果返回给驱动器进程;
它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算
Spark通用运行流程
图示
流程
任务提交后,先启动Driver进程
Driver进程向集群管理器(ResourceManager)注册应用程序(Application)
集群管理器根据此任务的配置文件分配Executor并启动
Driver所需的资源全部满足后,Driver开始执行main函数
Spark查询为懒执行,当执行到action算子时开始反向推算,根据宽依赖进行stage的划分
每一个stage对应一个taskset,taskset中有多个task,根据本地化原则,task会被分发到指定的Executor去执行,在任务执行的过程中,Executor也会不断与Driver进行通信,报告任务运行情况
Spark部署模式
Standalone模式
YARN模式
Yarn Client模式
图示
流程
Yarn Cluster模式
图示
流程
Spark通讯架构
Spark通信架构概述
Netty通讯框架
图示
Endpoint(Client/Master/Worker)
1个InBox
N个OutBox
N取决于当前Endpoint与多少其他的Endpoint进行通信
基于Actor模型
图示
Spark通讯架构解析
名词解释
RpcEndpoint
针对每个节点(Client/Master/Worker)都称之为一个Rpc端点,实现RpcEndpoint接口
RpcEnv
RPC上下文环境
Dispatcher
消息分发器
对接收和发送的消息分发到对应的指令收件箱/发件箱
Inbox
指令消息收件箱
存入消息时,都将对应EndpointData加入内部ReceiverQueue
Dispatcher创建时会启动一个单独线程进行轮询ReceiverQueue,进行收件箱消息消费
RpcEndpointRef
对远程RpcEndpoint的一个引用
需要向一个具体的RpcEndpoint发送消息时,一般我们需要获取到该RpcEndpoint的引用,然后通过该应用发送消息
OutBox
指令消息发件箱
当消息放入Outbox后,紧接着通过TransportClient将消息发送出去
消息放入发件箱以及发送过程是在同一个线程中进行
RpcAddress
表示远程的RpcEndpointRef的地址,Host + Port
TransportClient
Netty通信客户端,一个OutBox对应一个TransportClient,TransportClient不断轮询OutBox,根据OutBox消息的receiver信息,请求对应的远程TransportServer
TransportServer
Netty通信服务端,一个RpcEndpoint对应一个TransportServer,接受远程消息后调用Dispatcher分发消息至对应收发件箱
流程图
Spark任务调度机制
Spark任务提交流程
图示
流程
Client向RM申请启动Application,检查是否有足够资源满足需求,满足则准备AM上下文
提交AM上下文,启动AM
启动Driver后台线程
启动后通过本地的RPC连接Driver
向RM申请Container资源运行Executor进程
返回后在对应NodeManager的Container上启动Executor
Executor进程起来后,会向Driver反向注册
注册成功后保持与Driver的心跳,同时等待Driver分发任务,当分发的任务执行完毕后,将任务状态上报给Driver
Spark任务调度概述
概述
Job是以Action方法为界,遇到一个Action方法则触发一个Job
Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分
Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task
Spark任务调度分两路
Stage调度
Task调度
图示
模块交互过程
Job提交和Task拆分
解释说明
Transaction形成了RDD血缘关系图(DAG),最后通过Action的调用,触发Job并调度执行
DAGScheduler负责Stage级的调度,将Job切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler
TaskScheduler负责Task级的调度
将DAGScheduler给过来的TaskSet按照指定的调度策略分发到Executor上执行
调度过程中SchedulerBackend负责提供可用资源,其中SchedulerBackend有多种实现,分别对接不同的资源管理系统
解说
Driver初始化SparkContext过程中
会分别初始化DAGScheduler、TaskScheduler、SchedulerBackend以及HeartbeatReceiver,并启动SchedulerBackend以及HeartbeatReceiver
SchedulerBackend通过ApplicationMaster申请资源,并不断从TaskScheduler中拿到合适的Task分发到Executor执行。HeartbeatReceiver负责接收Executor的心跳信息,监控Executor的存活状况,并通知到TaskScheduler。
Spark Stage级调度
DAGScheduler
Spark任务调度是从DAG切割开始,主要由DAGScheduler完成,遇到Action时触发Job计算,交由DAGScheduler提交
Stage由宽依赖进行划分
从父Stage提交再到子Stage,是一个回溯过程
将Task信息序列化并打包成TaskSet交给TaskScheduler
Spark Task级调度
概述
Spark Task的调度是由TaskScheduler来完成
DAGScheduler将Stage打包到TaskSet交给TaskScheduler
TaskScheduler会将TaskSet封装为TaskSetManager加入到调度队列中
TaskSetManager负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务
SchedulerBackend接受Executor的注册信息,维护Executor的状态,定期询问TaskScheduler运行任务
TaskScheduler的调度策略
FIFO
FAIR
图示
解释
首先对子Pool进行排序,再对子Pool里的TaskManager进行排序,基于Schedulable特质使用相同的排序算法
排序算法
runningTasks
若A(runningTask > minShare),B(runningTasks < minShare),则B先于A
若A,B(runningTask < minShare),则比较runningTasks/minShare,小者优先
minShare
若A,B(runningTask > minShare),则runningTasks/weight,小者优先
如果上述比较均相等,则比较名字
weight
本地化调度
PROCESS_LOCAL
进程本地化,task和数据在同一个Executor中,性能最好
NODE_LOCAL
节点本地化,task和数据在同一个节点中,但是task和数据不在同一个Executor中,数据需要在进程间进行传输
RACK_LOCAL
机架本地化,task和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输
NO_PREF
对于task来说,从哪里获取都一样,没有好坏之分
ANY
task和数据可以在集群的任何地方,而且不在一个机架中,性能最差
失败重试与黑名单机制
失败重试
SchedulerBackend -> TaskScheduler -> TaskManage 记录失败次数
黑名单机制
记录ExecutorId和Host,避免再一次进入同一节点
SparkShuffle
Shuffle的核心要点
ShuffleMapStage与ResultStage
最后一个Stage被称为finalStage,本质上是一个ResultStage对象,对应action算子
在最后一个阶段前的Stage统称为ShuffleMapStage,这类Stage的结束伴随着shuffle文件的写磁盘
Shuffle中的任务个数
map端从HDFS中读取数据,初始的RDD分区个数由该文件的split个数决定
reduce端的stage默认取spark.default.parallelism作为分区数,若无配置,则以map端最后一个RDD的分区数作为分区数
reduce端的数据读取
map task执行后会将计算状态和磁盘小文件位置等信息封装到MapStatus对象中
reduce task会请求此小文件的位置信息,进行数据的拉取
HashShuffle解析
未经优化的HashShuffleManager
会创造过多的磁盘文件,产生过多小文件
图示
优化后的HashShuffleManager
将同key小文件合并到相同的磁盘文件中,减少了磁盘文件的数量
图示
SortShuffle解析
普通运行机制
先在内存内写入,阈值溢写到磁盘,根据key进行排序,最终merge,单独写一份index文件
图示
bypass运行机制
只做一个最终合并,不进行排序
图示
Spark内存管理(Executor)
堆內和堆外内存规划
概述
Executor的内存管理建立在JVM的内存管理上
引入堆内内存和堆外内存的概念
堆内内存受到JVM统一管理,堆外内存直接向操作系统进行内存的申请和释放
堆内内存
大小
–executor-memory
spark.executor.memory
内存分类
Storage内存
缓存RDD数据
缓存广播(Broadcast)变量
Execution内存
执行Shuffle时占用的内存为执行内存
内存存储释放流程
申请内存流程
Spark 在代码中 new 一个对象实例
JVM从堆内内存分配空间,创建对象并返回对象引用
从堆内内存分配空间,创建对象并返回对象引用
释放内存流程
从堆内内存分配空间,创建对象并返回对象引用
等待JVM的垃圾回收机制释放该对象占用的堆内内存。
Spark中非序列化的对象会导致内存计算不准确,进而导致OOM状态
堆外内存
概述
使Spark可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据
使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据
堆外内存与堆内内存对比
JVM对于内存的清理是无法准确指定时间点的,因此无法实现精确的释放),而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差
默认不启用
配置 spark.memory.offHeap.enabled 参数启用
spark.memory.offHeap.size 参数设定堆外空间的大小
内存空间分配
静态内存管理
堆内内存
图示
60%Storage/20%Execution/20%Other
都有预留空间,防止OOM
堆外内存
图示
统一内存管理
存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域
堆内内存
堆外内存
动态占用机制
设定基本的存储内存和执行内存区域
spark.storage.storageFraction
双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间
执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后”归还”借用的空间
存储内存的空间被对方占用后,无法让对方”归还”,因为需要考虑 Shuffle 过程中的很多因素
存储内存管理
RDD的持久化机制
解释
由Spark的Storage负责,实现了RDD与物理存储的解耦
主从式架构
Storage 模块负责管理 Spark 在计算过程中产生的数据,将那些在内存或磁盘、在本地或远程存取数据的功能封装了起来
Driver 端的 BlockManager 为 Master,Executor 端的 BlockManager 为 Slave
Storage 模块在逻辑上以 Block 为基本存储单位,RDD 的每个 Partition 经过处理后唯一对应一个 Block
持久化存储级别
分类
MEMORY_ONLY
以非序列化的Java对象的方式持久化在JVM内存中
没有持久化的partition就会在下一次需要使用它们的时候,重新被计算
MEMORY_AND_DISK
当某些partition无法存储在内存中时,会持久化到磁盘中
MEMORY_ONLY_SER
使用Java序列化方式,将Java对象序列化后进行持久化
MEMORY_AND_DISK_SER
DISK_ONLY
非序列化Java对象方式存储到磁盘上
MEMORY_ONLY_2
副本数量为2
MEMORY_AND_DISK_2
三维度
存储位置
存储形式
副本数量
图示
RDD的缓存过程
Partition中的数据经过Iterator来访问,获取Record(数据项),实际占用了other部分的空间
RDD 在缓存到存储内存之后,Partition (不连续)被转换成 Block(连续),Record 在堆内或堆外存储内存中占用一块连续的空间,这个过程叫(Unroll-展开)
因为不能保证存储空间可以一次容纳 Iterator 中的所有数据,当前的计算任务在 Unroll 时要向 MemoryManager 申请足够的 Unroll 空间来临时占位,空间不足则 Unroll 失败,空间足够时可以继续进行
淘汰与落盘
新Block需要缓存而剩余空间不足时就需要淘汰(Eviction),若能落盘(Drop)则落盘,否则删除
执行内存管理
Shuffle Write
在map段使用普通的排序方法,主要占用堆内执行空间
在map段使用Tungsten排序方式,可以占用堆外或堆内内存
Shuffle Read
reduce端聚合时占用堆内内存
最终结果排序时需要占用堆内执行空间
Tungsten
建立在 MemoryManager 之上,即 Tungsten 对执行内存的使用进行了一步的抽象,这样在 Shuffle 过程中无需关心数据具体存储在堆内还是堆外
每个内存页用一个 MemoryBlock 来定义,并用 Object obj 和 long offset 这两个变量统一标识一个内存页在系统内存中的地址
Spark 用 MemoryBlock 巧妙地将堆内和堆外内存页统一抽象封装,并用页表(pageTable)管理每个 Task 申请到的内存页
Spark核心组件解析
BlockManager数据存储与管理
解释
Driver和Executor的所有数据都由对应的BlockManager进行管理
Driver-BlockManagerMaster(HDFS-Namenode)
Executor-BlockManager
DiskStore
MemoryStore
BlockTransferService
负责建立BlockManager到远程其他节点的BlockManager的连接,负责对远程其他节点的BlockManager的数据进行读写
图示
Spark共享变量底层实现
概述
Broadcast Variable(广播变量)
Accumulator(累加变量)
广播变量
Spark的Broadcast Variable是只读的,并且每个Executor上只会有一个副本
当执行到需要广播变量时才会请求广播变量
图示
累加器
Accumulator是存在于Driver端的,集群上运行的task进行Accumulator的累加,随后把值发到Driver端,在Driver端汇总
task只能对Accumulator进行累加操作,不能读取它的值
图示