导图社区 知识点整理
SparkCore,系统构架,原理,流程,pySpark,实际操作用法,简介,开发方式,使用场景,与Hadoop的对比,看过哪些源码,SparkSteaming,SparkSQL,Spark优化。
编辑于2021-04-11 10:57:53知识点整理
Spark 是使用 scala 实现的基于内存计算的大数据开源集群计算环境.提供了 java,scala, python,R 等语言的调用接口
Spark Core
系统架构
运行模式
Local
调试
Standalone
集群,Spark原生的CM
Mesos
集群
Yarn
集群, Yarn ResourceManager
概念组成
Application应用程序
基于Spark的用户程序,包含了一个Driver Program 和集群中多个的Executor
用户写的
Driver驱动
在yarn模式中,driver在MRAppMstr中运行 spark.driver.memory=1g默认 spark.driver.cores=1(默认) 就是MRAppMstr的核数 1、driver需要申请的内存 = 基本内存 + 额外内存 基本内存: spark.driver.memory=1g(默认) 额外内存: 1、有参数配置则直接等于参数配置 spark.yarn.driver.memoryOverhead 2、无参数配置(即默认) max(driver基本内存 * 0.1,384M)
运行Application的main()函数,并且创建SparkContext;
new SparkContext(conf): appName;Master;conf
用户写的RDD DAG
通常SparkContext代表了Driver
可能本地运行,也可能在某worker上
根据SparkContext向cluster Manager(或者Yarn)申请资源
RDD 的 DAG,逐层RDD及其依赖关系
将Application分解成多个stage,stage中包含多个tasks
Cluster Manager
在集群上获取资源的外部服务
Standalone|ResourceManager(Yarn)|Mesos
taskscheuler分配资源给executor
Executor
默认executor使用的vcore spark.executor.cores=1(默认) yarn中每个executor进程默认用一个vcore 默认能起的executor数 spark.executor.instances=2(默认) 4、executor内每个task使用的核数: spark.task.cpus=1(默认) 5、每个executor内能够并行运行的task数: spark.executor.cores / spark.task.cpus 3、executor需要申请的内存 = 基本内存 + 额外内存 基本内存: spark.executor.memory=1g(默认) 额外内存: 1、有参数配置则直接等于参数配置 spark.yarn.executor.memoryOverhead 2、无参数配置(即默认) max(executor基本内存 * 0.1,384M)
某Application运行在Worker Node上的一个进程
executor是一个进程 task是一个线 executor数,默认为2,通过spark.executor.instances配置参数
Heartbeat向CM|Yarn汇报状态
集群架构
Master
Master进程
Driver进程
Worker
Worker进程
executor进程
执行task,内存20%
shuffle copy缓存 20%
spark.shuffle.memoryFraction
RDD持久化 60%
spark.storage.memoryFraction
原理、流程
RDD: 内存共享模型,弹性分布式数据集
Spark 的核心
对象的集合,只保存元数据信息, 有各种方法,operation()
分布式
数据分布在磁盘/HDFS上,不在RDD里面
只读
静态的每次transform都生成一个新的RDD
迭代式算法
迭代式,出错找父RDD
容错性
数据丢失时,可以进行重建;可对RDD缓存
惰性调用
Transform不做运算,只记录过程;Action才进行计算
生成方法
parallelize(,partition)
rdd=sc.parallelize(list,partition)
文件加载
spark.sparkContext.textFile
spark.read.csv
spark.read.json
option(multipleline=true)
InputFormat
HDFS, Hive, Hbase
转换生成新RDD
算子
Transform
RDD间的转换;得到另外一个RDD, 不会马上提交spark运行
map, flatMap, filter, union, sample, join, groupByKey, cogroup, ReduceByKey, cros, sortByKey, mapValues
宽依赖 Shuffle
repartition,join,cogroup,sortBy,reduceByKey, groupByKey,*ByKey等算子
join还不一定
子 RDD 的各个分片会依赖于父RDD 的多个分片
会触发Shuffle,产生的RDD叫做ShuffleRDD
窄依赖
map,filter,mapPartitions,mapValues,union
子 RDD的各个分片(partition)不依赖于其他父RDD分片
不需要Shuffle
Action
collect, reduce, count, save, lookupKey,take,countbykey,countbyvalue
提交spark运行(生成从头开始的job),且返回结果,不是返回RDD
collect等返回结构到driver端,内存消耗
Shuffle
Shuffle是分区间的行为
RDD中的数据,从不同的分区去到不同的分区;还会跨node传输
需要写入到磁盘并通过网络传输,有时还需要对数据进行排序,消耗网络以及磁盘的I/O
可以再shuffle阶段,写入Combiner
Shuffle write
类Map,将ShuffleMapTask的结果写入到内存
Shuffle fetch
类reduce过程,获取ShuffleMapTask的结果给ShuffleReduceTask
DAG
Directed Acycle graph,反应RDD之间的依赖关系
从后往前回溯,遇到窄依赖加入本stage,遇见宽依赖进行Stage切分
Stage
DAG中,Transform环节,根据宽依赖的Transform来分stage;stage边缘会触发Shuffle
Taskset: task以数据分区做划分
1. MR里面的task,map task和reducetask的,maptask是以inputsplit划分的,task里面数据再继续分区 继续partition/sort/combine ,到了reduce端task再sort/combine/reduce 一个node同时只能跑一个task(默认) 2. Spark的task概念,stage层下面的概念,stage里是一个taskset,以数据分区,每个分区一个task处理。 每个task以core为对应,一个core运行一个task。
一个分区一个task
Partition分区
分区的意义,目的是计算时候的key,保证至少在同一个node上 不要有node间的shuffle 原则上,分区应该等于不小于集群的total cores。甚至是total cores的2-3倍,以实现资源利用最大化
优先考虑数据本地化
数据从HDFS读入,分区是默认的,不需要partitioner
shuffleMap分区
数据读入时的分区inputSplit,并行度分区; 到后面的k,v整理好的hashpartition
sc.defaultParallelism = spark.default.parallelism sc.defaultMinPartitions = min(spark.default.parallelism,2) b,从hdfs分布式文件系统hdfs://生成的rdd,操作时如果没有指定分区数,则默认分区数规则为: rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions) k,v的分区才是有意义的,为了算子服务
file.blocksize|defaul.parallelism
shuffleReduce分区
reduce算子决定的分区数,默认也是由spark.default.parallelism决定
分区片大小上限
spark.files.maxPartitionBytes
Task
和MR里面的task概念有本质区别
executor中的线程,每个task使用一个vcore(spark.task.cpus)
一个partition由一个task运行,partition数决定task数
流程
1. Spark-submit提交
2.启动Driver进程,
在APPMaster的container中
2.1运行Application运行环境
2.2创建SparkContext
sparkConf spark.executor.instances spark.executor.cores
2.3向Yarn等集群资源管理器获取资源,运行executor
2.4.ClusterMaster分配Executor
3. SparkContext将应用程序代码分成stages,一个stage包含多个tasks
3.1SparkContext 根据RDD 的依赖关系构建DAG图
3.2 DAGScheduler 将DAG解析成stages
3.3 Stages发送给TaskScheduler
4.ClusterManager|Yarn分配资源,启动excutor,到各个worker上执行excutor
4.1 Executor向SparkCatext申请task
4.2 TaskScheduler分配Task给executor
4.3 SparkContext将应用程序发送给executor
5.Executor的并发度由CPU Core决定,每个core上同时一个task
pySpark,实际操作用法
SparkSession
Spark2.0
2.0之前SparkContext,SparkSession封装了SparkContext
ss.read.[option(header,true).]text()生成sql.DataFrame
.rdd生成pySpark.rdd.RDD
.map生成PipelineRDD
如果是sc, sc.textFile(file,partitions),生成的是RDD
封装了SparkConf
SparkSession.conf()
封装了sqlConext
封装了sqlContext?
SparkSession.sql(select * from)
ss.close()
只要不close,rdd一直在内存中,一直可以复用
SparkConf()
SparkConf().get(spark.executor.cores)
SparkConf().getAll()
RDD生成
sparkContext.parallelize(list,partition)
生成ParallelCollectionRDD
第二个参数可以制定partition个数,必须>=2
read.csv|text|json
读取压缩文件需要repartition
ss.read.option(header,true).text(,partition) 读取文件头
分区,受defaultPartitions以及HDFS文件blocks,以及total cores等影响
Hive|Hbase|Mysql
Transform生成新的RDD
RDD转换Transform
dir(RDD)来获取所有的方法
join()
窄: 当两个RDD都使用hashpartition且分区数据一样,数据不需要shuffle
宽: 两个RDD的分区不一致,这时候就是宽依赖
提供join, leftOuterJoin,rightOuterJoin,但是只针对k-v pair
窄依赖
map(func) | 窄
mapPartitions每次处理一个分区,map每次一条
flatMap(func) | 窄
map后对rdd再进行一次flatten,迭代器,扁平化输出
mapValues(func)
相当于map手动去k,v后,只对v操作
只用在k,v上,把所有v用func处理一遍,k不变
wjoin1.mapValues(lambda x:x if int(x)>999 else 0).sortByKey(lambda x:x[1]).take(50)
flatMapValues(func)
可以将(k,(v1,v2))展开成(k,v1),(k,v2)
a.union(b) | 窄
不会去重
filter(func) | 窄
filter(lambda x:条件)
keys,values | 窄
sample(withReplacement,fraction=0.01)
sample算子时用来抽样用的,其有3个参数 withReplacement:表示抽出样本后是否在放回去,true表示会放回去,这也就意味着抽出的样本可能有重复 fraction :抽出多少,这是一个double类型的参数,0-1之间,eg:0.3表示抽出30% seed:表示一个种子,根据这个seed随机抽取,一般情况下只用前两个参数就可以,那么这个参数是干嘛的呢,这个参数一般用于调试,有时候不知道是程序出问题还是数据出了问题,就可以将这个参数设置为定值
取样1%, 检查数据倾斜
takeSample(withReplacement,num)
mapPartitions
function一次接收所有的partition数据,吐给mysql等只需要一个par一个连接
容易造成OOM
宽依赖 Shuffle
distinct() | 宽
无参数,返回rdd,未排序
instersection | 宽
需要,相当于join了
substract 宽
需要shuffle
sortByKey(ascending,NumPartition)
只能用于k,v, 且只能排k
sortByKey()不写默认以asc排序,以k为排序
不是k,v对,就会报错 task lost
sortBy(lambda x:x[1],ascending,NumPartition)
sortBy底层是sortByKey(),默认升序;
不只用于(k,v),既可以用于(k),也可以用于(x,x,x,x,x);比sortByKey多一个参数
groupBy(func)
接受函数分组,将key分组,同时所有key转成迭代器
x = sc.parallelize([1,2,3]) y = x.groupBy(lambda x: 'A' if (x % 2 == 1) else 'B') print(x.collect()) print([(j[0],[i for i in j[1]]) for j in y.collect()])
groupByKey(numPartition,partitioner)
只能用于k,v
默认不用参数,输出结果类似groupBy(func);以key分组,同时把v转成迭代器
groupByKey(func)返回的是(key,iteratebal迭代器)
纯生成迭代器,所以只有shuffle,没有reduce功能
reduceByKey(func)
vs reduce(): 后者是action,reduceByKey是transformation vs groupByKey(): 后者在reduce端生成迭代器,再聚合;reduceByKey()先在map端聚合,再shuffle,消耗资源少,高效
只能用于k,v,其他会报错
至少有1个函数参数,shuffle前聚合; 区别于reduce,后者是action
可以当作是spark中的combiner来用,combine逻辑自定义,一样不能avg
*ByKey | 宽
底层基本上都是combineByKey x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)]) def to_list(a): #单个v 转c return [a] def append(a, b): #c 与v 合并 a.append(b) return a def extend(a, b): #c 与 c 合并 a.extend(b) return a sorted(x.combineByKey(to_list, append, extend).collect()) [('a', [1, 2]), ('b', [1])] ————————————————
reduceByKey先聚合再shuffle,groupByKey先shuffle再聚合
aggregate(初始值,分区内聚合func,分区聚合func),也是比groupByKey高效
stage边缘,shuffle需要; 判断宽窄的唯一标准就是需不需要shuffle。子rdd一个分区中的数据是否依赖于多个父rdd分区
RDD动作Action
countByKey()
>>> wsj.map(lambda x:x[0]).countByValue() defaultdict(<type 'int'>, {u'mobile': 32618, u'tablet': 18719, u'desktop': 32053}) >>> wsj.map(lambda x:x[0]).countByKey() defaultdict(<type 'int'>, {u'm': 32618, u'd': 32053, u't': 18719}) >>> 没毛用 还有坑
注意这个是个action,返回一个dict
如果对象是字符串,首字母作为key;如果是k,v,以k作为key来count
countByValue()
>>> wsj.map(lambda x:x[0]).countByValue() defaultdict(<type 'int'>, {u'mobile': 32618, u'tablet': 18719, u'desktop': 32053}) >>> wsj.map(lambda x:x[0]).countByKey() defaultdict(<type 'int'>, {u'm': 32618, u'd': 32053, u't': 18719}) >>>
不管对象传进来什么,全部拿来做keyCount;适合做word Count
count()|min|max|sum|mean()
返回int
collect()
stdout,将rdd转换成list
all into Drivers memory!!! attention!
first()
取值都是action
take(10)
默认顺序,比较快
生成list
takeOrdered()和top相反,默认升序
top()
数字按大小排序
lookup()
rdd.lookup(someKey)
reduce(func)
并行整合,返回一个值,汇总或最大最小
.reduce(lambda x,y:x+y), 只是代表其中元素相加
输出类型和输入一样
区别于reduceByKey,这个是一个action
rdd.saveAs*
生成一个目录
foreach, foreachPartitions
foreachParttion可以减少连接数
RDD进阶
broadcast
将小 RDD 复制广播到每个 Executor 的内存里
注意是executor 比放在tasklevel。更省内存
persist|cache
cache=persist Memory ONly
将RDD计算写入内存(或者磁盘),根据缓存级别不同
unpersist
cache = persist(memoryonly)
checkPoint
vs Persist: checkpoint是截断;且写入HDFS persist/cache不截断;根据缓存级别不同写入内存、磁盘
写入hdfs,rdd关系截断,后续计算都从磁盘二进制文件读取
RDD分区
.getNumPartitions()
获取partition个数
.glom
按照分区显示输出
repartition
重新分区,且重新分布
强制shuffle的coalesce,repartition=coalesce(n,shuffle=True)
增加分区,不shuffle则无意义。rdd.repartition(100)
coalesce
repartition是coalesce的强shuffle版本 减少分区,部分分区无数据,可以用coalesce,无shuffle减少分区数
减少分区的时候,建议用coalesce,可以shuffle=false
分区
分区函数
HashPartition
RangePartition
每个分区分配一个task
分区就决定了stage task的并行度
shuffle之前的分区数,不会变的,直到shffufle发生之后,分区数变化
shuffle之后的分区(并行度),和算子有关,部分算子可以指定分区groupByKey,cogroup,distinct,reduceByKey,sortByKEY
简介
伯克利开源项目,由scala开发
迭代式应用的高性能需求而设计
Hadoop生态圈,Yarn框架支持
开发方式
spark-submit
spark-submit test_pySpark.py
参数
解释器交互
spark-shell
scala,兼容java
pyspark
python
sparkR
R
yarn apllication
-list
-kill appid
使用场景
Spark [Core]
批处理 ,RDD
类 MapReduce、Hive
Spark SQL
类SQL处理,交互式,返回Spark-DataFrame
类 Impala
依赖于Hive MetaStore
将HiveQL转换成了Spark的RDD操作
由SparkCore封装
Spark Streaming
流式处理;线上实时时序数据
类Storm
MLlib
机器学习
类Mahout
GraphX
图形算法 pagerank
与Hadoop的比对
Spark特点
(1)提供 Cache 机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的 IO 开销;
(2)提供了一套支持 DAG 图的分布式并行计算的编程框架,减少多次计算之间中间结果写到 Hdfs 的开销;
(3)使用多线程池模型减少 Task 启动开稍, shuffle 过程中避免不必要的 sort 操作并减少磁盘 IO 操作
利用数据集内存缓存以及启动任务时的低延迟和低系统开销
支持多种语言编程 scala|python|java,spark-shell解释器
vs Hadoop
Map+Reduce有限的处理方式 vs RDD, RDD灵活,多种数据集操作提供更多转换,如sort,join
中间过程写入磁盘,二次处理,磁盘IO开销大 vs 内存中处理,同时支持溢出
Reduce需等待Map vs 同一个partition中支持流水线处理
高延迟 vs 小batch处理,stream流处理
优势
低延时
高并发
低磁盘IO
不保存 少保存到磁盘
低依赖
stage内部并发,支持到core的高并发
分区内相同的转换,内部处理不需shuffle
容错性高
RDD重算,惰性操作,RDD可复用共享
cache持久化
抽象程度高
代码简洁,语义清晰;不需要在意具体实现细节,丰富api
逻辑清晰度高
DAG
灵活度高
多种操作转换api
看过哪些源码?
Spark Streaming
StreamingContext
应用案例
# -*- coding: utf-8 -*- from pyspark import SparkContext from pyspark.streaming import StreamingContext # 创建 sc对象, local[2]表示“本地2个核”, NetworkWordCount为应用名称 sc = SparkContext("local[2]", "NetworkWordCount") # 设置日志级别 sc.setLogLevel("WARN") # 创建Spark Streaming Context对象, 此处的5表示5秒钟为一个单位接收数据流 ssc = StreamingContext(sc, 5) # 从一个端口为9999的服务器接收TCP socket数据流 lines = ssc.socketTextStream("localhost", 9999) # 下面是实时处理的过程,单词计数 words = lines.flatMap(lambda line: line.split(" ")) pairs = words.map(lambda word: (word, 1)) wordCounts = pairs.reduceByKey(lambda x, y: x + y) wordCounts.map(lambda x: (x[0].encode('utf-8'), x[1])).pprint() # 启动应用 ssc.start() # 等待应用结束
实质是micro batch的形式通过Spark处理
整合Kafka
DirectDStream
Receiver
SparkSQL
简介
前身: Shark (DataBricks)
即Hive on Spark.
旨在hive不局限于mapreduce计算框架,提升速度
2014.7停止,转向SparkSQL
Shark对于hive过多依赖
是一个项目,一个模块
将SQL转化为对RDD/DataFrame的操作
支持基于DF的类SQL的DSL处理
两个分支
HiveContext
hive语法解析器
sqlContext
SQL语法解析器,支持比hive多的语法
主要组成
Core(数据处理)、Catalyst(执行计划解析优化)、Hive(hive支持)、Hive-ThriftServer
支持SQL需要: Parser+Optimizer+Execution
特点
高度整合
SparkSQL可以和Spark无缝结合,同时支持多种语言开发python scala java R
多数据源整合
多数据源数据(hive,parquet,json,s3,hdfs)最终DF落地成table、view,然后join union等
与Hive相容
reuse Hive Metastore以及UDF等,支持HiveQL语法(HiveContext)
与 Hive
Hive依赖于MapReduce,SparkSQL基于Spark更快
两者都不负责计算,实际的计算引擎是MR or Spark
SparkSQL还依赖于Hive Metastore
SparkSQL不止可以通过Hive,还可以通过File/RDBMS/NoSQL,RDD获取数据
运行方式
sqlContext
spark.sql(), ss.sql()当前运行环境
DataFrame API
spark1.3开始有,此前是schemaRDD
DataSet API
spark1.6开始
DataFrame
DataFrame是SparkSQL的灵魂 就像RDD之于Spark
DataFrame前身SchemaRDD(Spark1.3之前)
Spark1.3之后,不再继承于RDD,独立实现方法
DataFrame底层使用同一个优化器,跟用python,scalar等语言开发无关
生成
spark.sql生成
spark.createDataFrame(list)
spark.createDataFrame(k-v)
spark.createDataFrame(someRDD,[schema])
createDataFrame(rdd, StructType)
用structType可以指定数据类型
df=RDD.toDF(schema)
基本操作DSL
df.printSchema()
df.columns
df.count()
df.head(3)
跟df.take(3)结果一样
df.show()
默认20行
df.filter(isnull(col)
wtfDF.filter(BrowsChrome).show()
df.select().distinct()
select(col.alias(newname))
df.select().display() 简单图表
df.selectExpr()
df.where(1=1).show()
df.dropDuplicates
df.drop(col)
删除一列
df1.join(df2,df1.key==df2.key,inner)
等同df1.join(df2,key,inner)
df.orderby(df.key.desc()).show(10)
.orderBy(wtfDF.Sessions.desc(),Bounce Rate) 如果用.desc(), 需要引用df.col
.orderBy(desc(Sessions),Bounce Rate).show()
这种用法要用到from pyspark.sql.functions import *
df.sort(,ascending=False)
居然比orderBy还好用 >>> plDF.groupBy('Club').agg(max('Overall').alias('maxrate'),avg('Overall').alias('avgrate')).sort('avgrate',ascending=False).show() 比Rdd.sortBy()少了
df.union
df.explode一行拆多行
df.withColumn()列改动
df.withColumnRenamed(old,new)列改名
df.intersect()
df.subtract()
df.foreach(); df.foreachPartition()
df.sample()
>>> plDF.count() 18207 >>> plDF2=plDF.sample(0.01) >>> plDF2.count() 178
聚合运算
df.groupBy(col).agg({col1:avg,col2:max})
df.groupBy().agg()
spark.sql('select Club,count(1) as count,avg(Overall) as avg_rate,max(Overall) max_rate,avg(Age) as avg_age from ftpl group by Club order by avg_rate desc').show() 等于 >>>from pyspark.sql import functions >>> plDF.groupBy('Club').agg(functions.max('Overall'),functions.avg('Overall')).show() 等于 >>> from pyspark.sql.functions import * >>> plDF.groupBy('Club').agg(max('Overall'),avg('Overall')).show() 注意:如果是import *的话,命名空间是本地,可以直接引用函数
同 max,min,avg,sum,count
默认生成字段名: avg(col1),max(clo2), 需要进一步.withColumnRenamed()
开窗函数
df2.withColumn(rid,row_number().over(Window.partitionBy(height).orderBy(name))).show()
生成表view
df.registerTempTable(newTable)
注意:只在当前的ss里面有效,spark.sql()引用不到,ss.sql()可以找到表
ss.sql("select Browser,max(`Bounce Rate`) as max_br from v_wtf group by browser").show() 带空格的列名,引用和mysql一样,用反引号
df.createOrReplaceTempView(viewName)
同样,只在ss.sql()中有效,在spark.sql()找不到
RDD DataFrame Dataset
与RDD
df.rdd转化成RDD, rdd.toDF(schema)
DF是Row的集合,包含了数据的结构信息(列名,类型)
RDD是java对象的合集 ; DF是分布式Row的合集,比RDD多了schema的信息
Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row
DF支持表的操作:df.select(), spark.sql(select ...)
RDD对于join合并等支持不够
DF提供比RDD更高层更丰富的api。 比如开窗函数,列名更改
RDD需要自己进行优化,DF有自动优化器处理Catalyst
通过Spark SQL Catalyst优化器(树变换框架)的最先进的优化和代码生成。
DF可以输出Table/View进行sql操作
python不支持dataset api
调优
broadcast
spark.sql.autoBroadcastJoinThreshold
基表不能被broadcast
?
Spark优化
spark-sumbit参数调优
./bin/spark-submit \ --master yarn-cluster \ --num-executors 100 \ --executor-memory 6G \ --executor-cores 4 \ --driver-memory 1G \ --conf spark.default.parallelism=1000 \ --conf spark.storage.memoryFraction=0.5 \ --conf spark.shuffle.memoryFraction=0.3 \
driver-memory
Driver进程使用的内存
collect()使用driver的内存
Memory for driver (e.g. 1000M, 2G) (Default: 1024M)
Executor-memory
每个Executor进程申请的最大内存,num-Executor*Executor-memory决定了Job申请的最大内存
Memory per executor (e.g. 1000M, 2G) (Default: 1G).
spark.executor.instances
指定Job总共最多用到多少个Executor来处理
经验上由total cores/spark.executor.cores确定
Number of executors to launch (Default: 2).
executor的数量,在sparksession声明的时候,由executor.cores确定,后面怎么ss.conf.set()都没用
默认是2; 在yarn上,container=executor+driver
spark.executor.cores
每个executor申请的core数,决定了task在executor上的多线程上限
executor.instances * executor.cores 决定了total cores,允许并行跑的上限
理论上可以配置超过集群的vCore总数,有啥risk?
Number of cores per executor. (Default: 1 in YARN mode,or all available cores on the worker in standalone mode)
--conf spark.storage.memoryFraction
RDD持久化所需要的内存占比,默认0.6;根据持久化策略,以及shuffle内存的使用情况,可以调整
--conf spark.shuffle.memoryFraction
shuffle内存使用上限,默认0.2
--conf spark.default.parallelism
默认的每个stage里task同时运行的上限
建议是total cores的2-3倍
RDD的默认分区数,影响了sc.read的partition(28->56)
决定了shuffleRDD的默认分区数,如果算子么有指定的话
提高并行度,有利于加快job运行,但是也会增加磁盘IO和cpu压力
--conf spark.sql.shuffle.partitions
和spark.default.parallelism一样,都是决定分区并行度的
区别于spark.default.parallelism,这个是DataFrame操作中决定join/aggregation操作shuffle端的分区数
spark.defaul.parallelism对DF不起作用,spark.sql.shuffle.partiitons对RDD,以及非join/agg的DF操作不起作用
spark.sql.autoBroadcastJoinThreshold
集群调优
代码优化
RDD持久化
60%内存分配?
复用RDD, 且有必要进行cache
关联到spark.storage.memoryFraction 0.6占比
cache
=persist MemoryOnly
内存不够就不会进行持久化
persist
Memory Only
Memory And Disk
优先内存,不够了就存在磁盘上; 子RDD需要部分从磁盘读取
Memory Only Ser
内存only,但是需要序列化处理,更省内存
Memory And Disk Ser
需要序列化,省内存?
反序列化消耗时间,还有CPU
Disk Only
MemoryOnly_2 MemoryAndDisk_2
2代表在其他节点上保留一个副本,容错
unpersist
需要注意,不需要的RDD,要及时释放内存
broadcast
对于broadcast join模式,会将小于spark.sql.autoBroadcastJoinThreshold值(默认为10M)的表广播到其他计算节点,不走shuffle过程,所以会更加高效。
将小的RDD,broadcast(bc的不是RDD本身)到各个excutor内存中; 适用于大表join小表,小表进行broadcast,类似hive中的mapjoin
到executor级别,相比到task级别,节省了内存开销; 减小了大表的数据传输IO
算子选择
foreachPartitions vs foreach
减少连接数
mapPartitions优于map
reduceByKey() 优先于 groupByKey().mapValues(sum/len); groupByKey会全部先shuffle
思路:选择较优方案,避免shuffle后再汇总,先combine
处理数据倾斜
1.初始状态下,文件分区后,各个分区中的数据是杂乱的,并没有key的概念 2.数据倾斜的概念,是等到shuffle要发生,发现groupByKey/reduceByKey/join等的key,在某个分区上的分布的远远多于其他partition。 造成了这个partition的task需要处理、shuffle比其他分区多的情况 3.大多数shuffle发生的时候,RDD已经被map/filter操作成了 k,v对的格式。这时候就可以针对key重新分区了。 只有k,v pair上才有partitioner。 所以谈到数据倾斜的时候,基本也是针对k,v pair来讲的。 4.针对k,v的join,可以把小表broadcast到executor中,注意是executor level,这样各个task的partition的大表分区,可以直接和小表进行join,而不需要进行shuffle。 5. 如果是两个大表(k,v)的join,这个时候可以考虑,对k运行partitioner,类似hashpartition,重新分区,且分区数一致。可以保证两边的数据都在同一个节点,其实同一个分区上。减少了shuffle
重新分区
减少分区:filter之后coalesce重新分区(减少task,减少启动消耗,可以shuffle=False)
增加分区:如果只是部分分区数据过多,可以尝试repartition增加分区数,扩大并行度
针对某个hash取余,过度集中
reduceByKey,distinct,cogroup,groupByKEY,sortByKey,都有partition参数,来调整stage的并行度
某key过多
针对join,可以用broadcast小表,减少shuffle
针对聚合运算(max/sum),某key过多,可以加前缀,打散并行处理后,去前缀再和其他汇总
针对join,某key过多,可以独立这些数据出来,加前缀,重新分区(两个表分区一致);另一个均匀的表,扩容n倍;join完后和剩下的其他数据合并
.sample()
sample算子时用来抽样用的,其有3个参数 withReplacement:表示抽出样本后是否在放回去,true表示会放回去,这也就意味着抽出的样本可能有重复 fraction :抽出多少,这是一个double类型的参数,0-1之间,eg:0.3表示抽出30% seed:表示一个种子,根据这个seed随机抽取,一般情况下只用前两个参数就可以,那么这个参数是干嘛的呢,这个参数一般用于调试,有时候不知道是程序出问题还是数据出了问题,就可以将这个参数设置为定值
.sample(要不要放回去重新抽,抽样比例).coungByKey
数据本地化
taskIDtoLocaltion
getPreferLocation
数据存储格式
ORC相比textFile等,拥有更好的查询性能和压缩比