导图社区 spark面试知识点
整理了下spark知识脑图,有需要的童鞋可以看看,干货满满
编辑于2019-12-15 06:40:53spark
任务调度
资源调度流程(yarn-client)
1、通过spark-submit提交任务
2、本地启动driver
3、Driver发请求给RM--启动AM
4、RM分配资源启动AM
5、AM向RM申请资源启动Excutor
6、AM分配资源启动Executor
7、Excutor反向注册给driver
8、开始任务调度(action算子触发)
任务调度流程
1、遇到action算子,触发任务,开始调度
2、构建DAG有向无环图
3、DAG调度器根据宽窄依赖切分stage
4、将stage以taskSet的形式发送taskScheduler
5、Task调度器根据本地化算法将task发送到Excutor中执行
6、task调度器接收task执行情况
如果task失败,task调度器负责重试,默认重试3次,如果失败原因为shuffle file 农田found 任务调度器不再重试,由DAG调度器重试上一个stage
DAG调度器默认重试stage4次
调度器
DAG调度器
~~~基于Stage构建DAG,决定每个任务的最佳位置
记录哪个RDD或者Stage输出被物化
将taskset传给底层调度器TaskScheduler
重新提交shuffle输出丢失的stage
Task调度器
~~~~负责发送task到excutor中执行
提交taskset(一组并行task)到集群运行并汇报结果
出现shuffle输出lost要报告fetchfailed错误
碰到straggle任务需要放到别的节点上重试
为每一个TaskSet维护一个TaskSetManager(追踪本地性及错误信息)
资源调度粒度
粗粒度
一次性申请所有资源,后期不申请,所有task执行完才释放资源,如spark
执行速度快,不能充分利用资源
细粒度
每个task执行需要单独申请资源,task结束释放,如mr
充分利用资源
角色
Application
基于Spark的应用程序,包括driver程序和集群上的executor
DriverProgram
运行main函数并且新建SparkContext的程序
ClusterManager
在集群上获取资源的外部服务(如standalone,Mesos,Yarn)
WorkerNode
集群中任何可以运行应用代码的节点
Executor
WorkNode上为某个应用启动的一个进程,负责运行任务,并且负责将数据存在内存或磁盘上。每个应用有各自独立的executors
Task
被送到executor上的执行单元
job
包含很多任务的并行计算的task,可以看作和Spark的action对应,每个action都会触发一个job任务
stage
一个job会被拆分很多组任务,每组任务被成为stage(就像MapReduce分map任务和reduce任务一样)
BlockManager
每个excutor中都有一个块管理器
管理Excutor中的数据
rdd缓存数据
累加器和广播变量
shuffle文件
组件
ConnectionManager
和其他Excutor中的BlockManager创建连接
BlockTransferService
拉取数据
MemoryStore
管理内存里面的数据
DiskStore
管理磁盘上的数据
driver端的叫BlockManagerMaster
shuffle
算子
reduceByKey
groupByKey
sortByKey
distinct
join
shuffle write
上一个stage的每个map task就必须保证将自己处理的当前分区中的数据相同的key写入同一分区文件中。
shuffle read
reduce task会从上一个stage的所有task所在的机器上寻找属于自己的分区文件,这样可保证每一个key所对应的value都会汇聚到同一节点
产生问题
oom,读写文件缓存过多
每个task端都会产生多个文件,产生小文件过多,耗时低效的I/O操作
shuffle manager
HashShuffleManager
1、一个executor会执行多个task,task输出到全局文件(executor全局)
2、reduce端从每个executor拉取数据,而不是从每个task进行拉取
SortShuffleManager
普通运行机制
map端缓冲区达到阈值进行溢写时会对数据进行分区排序
byPass运行机制
触发条件:shuffle reduce task数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值
map端缓冲区达到阈值进行溢写时会对数据进行分区不会进行排序
shuffle流程
1、maptask执行,产生磁盘小文件(以mapstatus形式保存),发送给DAGScheduler,DAGScheduler把小文件地址保存在MapOutputTreaker(Driver端)里
2、reduceTask执行前向Driver端的MapOutputTreaker请求磁盘小文件地址
3、reduceTask所在的Executor中的blockmanager会根据地址向MapTask所在的Executor中的blockmanager拉取数据(BlockManager中的ConnectionManager建立通信 => BlockManager中的BlockTransFerservice负责拉取数据)
4、万一shuffle file拉取过程失败,默认重试三次,每次重试5s,如果依然失败则报shuffle file not found,task执行失败,TaskScheduler不负责重试,由DAGScheduler负责重试stage
运行模式
local
standalone
yarn
yarn-client
本地启动driver,spark执行日志会本地打印,方便调试。可能导致网络I/O打满
Driver在本地启动,资源调度由AM负责
yarn-cluster
在集群中启动driver(AM)资源调度和任务调度都有driver负责
本地不会打印详细日志,线上使用
Driver在集群启动 资源调度和任务调度都是由AM负责
优化
文件写入缓冲区大小调整
spark.shuffle.file.buffer 默认值:32k 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升
shuffle调优
reduce拉去数据缓冲区大小调整
spark.reducer.maxSizeInFlight 默认值:48m 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
reduce oom
reduce task去map拉数据,reduce 一边拉数据一边聚合 reduce段有一块聚合内存(executor memory * 0.2) 解决办法:1、增加reduce 聚合的内存的比例 设置spark.shuffle.memoryFraction 2、 增加executor memory的大小 --executor-memory 5G 3、减少reduce task每次拉取的数据量 设置spark.reducer.maxSizeInFlight 24m
对于耗时shuffle拉取数据
修改最大重试次数
spark.shuffle.io.maxRetries 默认值:3 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。 shuffle file not find taskScheduler不负责重试task,由DAGScheduler负责重试stage
修改重试时间
spark.shuffle.io.retryWait 默认值:5s 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性
实时流任务来不及处理,导致任务积累
增加资源
优化代码
本地化级别
进程本地化PROCESS_LOCAL
数据位于同一进程
节点本地化NODE_LOCAL
数据位于同一节点的不同进程
NODE_PREF
如spark连接数据库
机架本地化RACK_LOCAL
数据位于同一机架的不同节点
所有ANY
数据倾斜
双重聚合
第一次是局部聚合,先给每个key 都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着 对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会 变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次 进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。
使用Hive ETL预处理数据
过滤少数导致数据倾斜的key
提高shuffle操作的并行度
reduce join转为map join
采样key并拆分join操作
使用随机前缀和扩容RDD进行join
调节Executor堆外内存
Spark底层shuffle的传输方式是使用netty传输,netty在进行网络传输的过程会申请堆外 内存(netty是零拷贝),所以使用了堆外内存。
调节场景
shuffle file cannot find (DAGScheduler,resubmitting task)
executor lost
task lost
out of memory
Executor由于内存不足或者对外内存不足了,挂掉了,对应的Executor上面的block manager也挂掉了,找不到对应的shuffle map output文件,Reducer端不能够拉取数 据,Executor并没有挂掉,而是在拉取数据的过程出现了问题
解决办法
在spark-submit时添加--config
yarn下
conf spark.yarn.executor.memoryOverhead=2048 单位M
standlone下
conf spark.executor.memoryOverhead=2048单位M
堆外内存上限默认是每一个executor的内存大小的10%
组件
Spark Core
包含用于任务调度,故障恢复,与存储系统和内存管理交互的组件
Spark Streaming
支持流数据的可伸缩和容错处理。
接受小批量数据并对数据执行RDD转换。
MLlib
一个机器学习库,包含各种机器学习算法。
它包括相关性和假设检验,分类和回归,聚类和主成分分析。
Spark SQL
它允许通过SQL(结构化查询语言)以及SQL的Apache Hive变体(称为HQL(Hive查询语言))查询数据。
它支持JDBC和ODBC连接,这些连接建立Java对象与现有数据库,数据仓库和商业智能工具之间的关系。
图计算
为什么比mr快
spark会将数据尽量放到内存中进行计算
粗粒度资源调度
DAG有向无环图
可复用RDD,对RDD进行缓存
sql
DataFrame
概念
DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表 格,除了数据以外,还掌握数据的结构信息,即schema。
与RDD的区别
RDD不支持sparksql操作
DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值
支持一些特别方便的保存方式,比如保存成csv,可以带上表头
DF是经过封装的RDD
与RDD的共性
都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利
都有惰性机制,下游有action算子才会执行
根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
都有partition的概念
代码使用
在代码中通过sqlContext编写sql 上线运行
spark shell (repl)测试使用
spark-sql 直接写sql
整合hive,使用hive元数据
1、开启hive元数据服务
2、将hive-site.xml文件赋值到spark conf目录下
3、将mysql的驱动jar复制到spark lib目录下
streaming
微批处理
经过一段时间过来的数据会被封装为一个RDD
经过一段时间会进行一次任务调度
DataStream
底层为RDD,每隔一段时间封装一个RDD
有状态算子
updateStateByKey
窗口操作
reduceByKeyAndWindow
滑动窗口
滚动窗口
有界流&无界流
有界流
数据大小固定,有开始有结束
无界流
数据有开始无结束
HA高可用
spark stream24小时运行,driver存在单点故障
实现方法
1、在代码里面创建StreamingContext
2、StreamContext.getOrCreate(checkpointPath,createContext)
3、提交任务加上 --supervise(如果Driver挂了,yarn会再启动一个driver)
kafka
消息队列
作用
缓冲减压
异步通信
生产者
javaAPI打入数据
如flume
发送消息
生产者自己决定哪个分区
轮询(默认),可能造成数据倾斜问题
消费者
java API
spark stream
偏移量
偏移量默认存放在ZK,也可使用自定义存放使用偏移量
默认60秒更新一次偏移量
删除数据策略
kafka数据存储在ZK,通过时间策略删除(默认7天)
架构
producer
消息生产者
消息不经过内存缓冲,直接写入文件
自己决定往哪个partition写消息
轮询
hash分区
等等
consumer
消息消费者
自己管理偏移量
每个consumer都有一个group
broker
kafka集群server
去中心化集群
基于zookeeper
负责消息读,写请求,存储消息
topic
一个消息队列为一个topic
partition
一个topic分成多个partition
一个partition只对应一个broker,一个broker可以管多个partition
zookeeper
存储元信息
特点
消费者生产者模型,FIFO
高性能
单节点支持上千个客户端
持久性
消息直接持久化在普通磁盘上且性能好(顺序写入)
分布式
数据副本冗余、流量负载均衡、可扩展
灵活
消息长时间持久化,消费者自己维护偏移量
kafka写磁盘还快的原因
kafka写磁盘为顺序写入
kafka用了sendFile的0拷贝技术,提高速度
零拷贝指计算机操作过程中,cpu不需要为数据在内存直接的拷贝消耗资源
通常指计算机在网络上发送文件时,不需要将文件内容拷贝到用户空间(User Space)而直接在内核空间(Kernel Space)中传输到网络的方式
批量读写,64K为单位,100K为单位,每次网络传输量小,提高读写性能
如何保证数据不丢失
生产者
数据生产后不回调
数据生产后,第一个副本完成后回调(默认一个分区一个副本)
数据生产后,所有副本完成后回调
消费者
保存消费偏移量
kafka to spark
receiver模式
使用kafka的消费者模型
当偏移量更新之后,计算过程如果出现异常导致程序结束,当前batch的数据丢失
开启WAL预写日志,可防止数据丢失
direct模式
直接连接broker拉取数据
job开始计算的时候才区拉数据
自定义管理偏移量
hbase
redis
mysql
rdd
五大特性
RDD由一组分区组成
函数实际上是作用在每个分区上
每个分区都会由一个task处理
RDD之间有一系列的依赖关系
分区类算子必须作用在kv格式的RDD上
窄依赖
一对一
宽依赖
分区对应关系:一对多,多对一
有partition
切分stage每一个stage里面是一组并行计算的task
spark为task运行提供了最佳计算位置
移动计算而不是移动数据,任务执行本地化
常用算子
transformations
map
fliter
flatmap
sample
reduceByKey
union
join
mapValues
sort
partitionBy
actions
count
collect
reduce
lookup
save
每个RDD都会记录自己依赖与哪个或哪些RDD,万一某个 RDD的某些partition挂了,可以通过其它RDD并行计算迅速 恢复出来
弹性分布式数据集
缓存
磁盘
内存
内存与磁盘
压缩。。。
副本_2
任务执行流程
基本流程
加载数据集
spark没有读取数据的方法,使用mr的方法inputFormat(split)
transformations延迟执行
action触发执行
广播变量&累加器
累加器
特性
局部累加
全局汇总
使用流程
1、在driver端定义
2、在excutor端累加
3、在driver端读取
广播变量
变量副本数量
使用广播变量
等于Excutor
不使用广播变量
等于task数量
特性
只能在driver端定义
只能在excutor端读取,不能改变值
可以实现mapjoin
副本减少,task线程对象减少,提升效率
使用流程
1、task需要用到广播变量时找excutor读取
2、excutor有则返回,没有则向driver端请求
3、driver端返回