导图社区 Spark 核心技术
spark核心技术学习,希望能够给同学入门提供帮助,一起学习
编辑于2020-07-07 23:12:46Spark 核心技术
环境搭建linux
安装scala
下载 http://www.scala-lang.org/download/2.11.8.html 版本为2.11.8(scala-2.11.8.tgz) 上传:上传到master虚拟机上的目录/usr/local/lib/scala中(可以用FileZilla等ftp工具上传) 用root用户解压: tar -xvf scala-2.11.8.tgz 在root用户下,将解压后的JDK目录拷贝到slave1和slave2: scp -r scala-2.11.8 root@slave1:/usr/local/lib scp -r scala-2.11.8 root@slave2:/usr/local/lib 分别在三台虚拟机上切换到hadoop-twq用户修改环境变量: vi ~/.bash_profile export SCALA_HOME=/usr/local/lib/scala-2.11.8 PATH=$PATH:$HOME/bin:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SCALA_HOME/bin source ~/.bash_profile which scala查看scala的安装目录是不是我们想要的 测试是否安装成功: scala
安装spark
Spark2.2 需要的环境: Java 8+, Scala 2.11.X 下载 http://spark.apache.org/downloads.html 上传:上传到master机器节点的目录~/bigdata下(可以用FileZilla等ftp工具上传) 需要预先在每一个节点中的hadoop-twq用户下的根目录下创建bigdata目录 解压 tar -xf spark-2.2.0-bin-hadoop2.7.tgz 配置slaves cd spark-2.2.0-bin-hadoop2.7/conf cp slaves.template slaves vi slaves,写入如下内容 slave1 slave2 配置spark-env.sh cp spark-env.sh.template spark-env.sh vi spark-env.sh写入如下内容 export JAVA_HOME=/usr/local/lib/jdk1.8.0_151 ● 将配置好的spark拷贝到slave1和slave2节点上: scp -r ~/bigdata/spark-2.2.0-bin-hadoop2.7 hadoop-twq@slave1:~/bigdata scp -r ~/bigdata/spark-2.2.0-bin-hadoop2.7 hadoop-twq@slave2:~/bigdata ● 在master上配置环境变量: cd ~ vi ~/.bash_profile export SPARK_HOME=~/bigdata/spark-2.2.0-bin-hadoop2.7 source ~/.bash_profile
启动Spark服务
启动 sh ~/bigdata/spark-2.2.0-bin-hadoop2.7/sbin/start-all.sh http://47.92.157.10:8080/ 查看是否成功 ● 使用spark-shell --master spark://master:7077 测试spark代码
spark-submit应用的提交
将spark应用打成jar包,用filezilla工具将jar包上传至服务器 用xshell连上服务器,用下面的命令进行提交scala/java spark应用 hadoop fs -rm -r hdfs://master:9999/user/hadoop-twq/wordcount spark-submit \ --class com.twq.WordCount \ --master spark://master:7077 \ --deploy-mode client \ --driver-memory 1g \ --executor-memory 1g \ --num-executors 2 \ spark-wordcount-1.0-SNAPSHOT.jar
安装MySql
Mysql的安装(用root账号安装) yum install mysql-server, 如果报右边的错,则执行下面两个命令: wget http://repo.mysql.com/mysql-community-release-el7-5.noarch.rpm rpm -ivh mysql-community-release-el7-5.noarch.rpm service mysqld start mysqladmin -u root password 'root' mysql -u root -p 输入密码,进入到mysql,并执行以下语句(使得客户端可以以root账号连上mysql服务): GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'root' WITH GRANT OPTION; flush privileges;
Spark基于分布式的概念
分布式存储的原理
分布式存储的特点
1:数据分块存储在多台机器上
2:每一数据块都可以冗余存储在多台机器上,以提高数据块的高可用性
每个数据块可以冗余的分布存储在2台机器上
机器节点如何管理的
分布式内存
概念
什么是分布式内存? 从集群计算资源的管理上,和分布式磁盘管理类似,分布式内存是管理整个集群中每台机器的内存的大小 从单个应用计算master对计算资源的管理上来说,每个应用先申请需要的计算资源,然后在申请到资源的节点上启动计算服务,这个服务同时负责对在这个节点上申请到的资源进行管理,并且使用资源的时候需要向计算matser汇报
分布式内存的计算
疑问
Shuffle过程spark是基于内存的? 而MapReduce是基于磁盘的? 是真的这样吗? 不是的, MapReduce是基于磁盘的,没错 但是spark不完全是基于内存的, spark的shuffle中间结果也是需要写文件的, 只是对内存的利用比较充分而已
大数据中间结果数据的复用场景
一种是迭代式计算应用
一种是交互型数据挖掘应用
各种框架对中间结果复用的处理
1: MapReduce以及Dryad等 将中间结果写到分布式文件系统中,需磁盘IO
2: Pregel和HaLoop等 将一些特定的中间结果数据隐式的存储在分布式内存中
3: spark可以将任何类型的中间结果数据显示的调用api存储在分布式内存中
spark对中间结果的缓存方式
spark调用RDD中的rdd.persist(StorageLevel.MEMORY_ONLY) 方法来缓存这个中间rdd结果数据 val points = spark.textFile(...).map(parsePoint).persist() var w = // random initial vector for (i <- 1 to ITERATIONS) { val gradient = points.map{ p => p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y }.reduce((a,b) => a+b) w -= gradient }
Spark分布式计算的几个疑问
计算是怎么并行计算的?
答: 每一个block数据块就是一个分区计算的输入数据集,对每一个block计算都是可以同时进行的, 这样就达到了并行计算的目的,对于按照相同key来聚合(相同key必须在同一个分区中)的步骤, 可以根据数据的特点对数据进行重新分区
每一步的计算怎么理解?
答: 计算之前我们会给每一步定义 一个计算函数,每一步计算都是将 这个自定义函数应用到每一条数据中, 然后得到计算的结果数据
如果在计算第4步的时候,有某个计算任务因为网络原因等挂掉了,怎么办?
答:重新从其依赖的第一步和第二步以及第三步计算得出第四步需要的数据
数据的分区方式是什么样的?
答:对于一开计算的时候,读取文件的时候,文件的每一个block就是一个分区, 当然我们也可以设置成2个block一个分区,对于key-value类型的数据的分区 我们可以根据key按照某种规则来进行分区,比如按照key的hash值来分区
在计算伊始读取分区数据的时候,会发生从其他机器节点通过网络传输读取数据吗?
答:可能会发生,但是需要尽量避免,我们需要遵循移动计算而不移动数据的原则。 每一个数据块都包含了它所在的机器的信息,我们需要根据这个数据块所在的机器, 然后将计算任务发送到对应的机器上来执行,这个就是计算任务的本地性
每一步出现的数据都是一份存储吗?
答: 不是,数据的存储只有一份,就是一开始的数据存储,在shuffle的时候会有中间临时数据的存储
Spark通过抽象出RDD来解决实现上面的6个问题,所以每个RDD都会有以下6个特性:
1 : 一个分区列表,用于并行计算,每个分区对应这个一个原子数据集,作为这个分区计算的数据输入 2 : 计算这个RDD某个分区数据(这个分区数据是由父亲RDD对应分区计算出来的)函数 3 : 一个依赖列表,这个rdd依赖的父rdd是哪些(在计算的时候可以通过这个依赖来容错) 4 : 这个rdd的分区元数据信息,其实就是该RDD怎么分区的,比如某个rdd是通过hash partitioner得到的 5 : 分区数据的存储地址,用来实现计算任务的本地性 6: spark的计算是“流”式计算
Spark的组件
Spark Core (RDD(Resilient Distributed Datasets))
Spark Core概念详解
工作流程及组件介绍
Spark core要做的事情
☛ 初始化spark集群以及提交spark计算应用
☛ 对spark应用进行计算任务(task)的分解调度以及执行
☛ 执行task的时候需要的资源的管理
1.deploy负责spark分布式计算集群的初始化以及每次分布式计算任务的提交 2.RDD是所有计算的基础,如果想构建一个业务计算的话,先构建RDD链,这个链可以用RDD中的api在计算master中来构建 3.有了RDD链后,可以通过scheduler来分解task, 并且将这些task分发到相应的机器上执行 4.执行task是需要集群的资源的,计算master是需要从资源master中申请资源的,这部分会有一个集群资源管理模块来干这个活 5.每一个task都是在某台机器上的某个Executor中执行的 6.在task的运行过程中,可能会涉及到中间数据的存储,这个由storage组件来完成 7.task的运行需要内存,内存的管理由memory来完成 8.当一个task需要其他机器上的数据作为输入的时候就需要shuffle来完成
Security是安全组件
Serializer是序列化组件
rpc是远程调用组件
这些组件都是基础的组件
RDD
概念
定义
弹性分布式数据集,一个只读且分区的数据集
优势
高效容错 可以控制数据的分区来优化计算性能 并行处理 提供了丰富的操作数据的api 可以显示的将任何类型的中间结果存储在内存中
RDD API计算具有Lazy特性
比如:在调用inputRdd.flatMap(_._2.toString.split(" "))创建新的rdd的时候是不会真正出发计算_._2.toString.split(" ")这个函数
RDD的5个抽象api
package org.apache.spark.rdd
compute
在一个任务的上下文中,计算某一个分区的数据,得到计算结果 def compute(split: Partition, context: TaskContext): Iterator[T]
getPartitions
获取分区列表,用于并行计算 protected def getPartitions: Array[Partition]
getDependencies
获取依赖列表 protected def getDependencies: Seq[Dependency[_]] = deps
getPreferredLocations
获取RDD的某一个分区的数据存储在哪一个机器上 protected def getPreferredLocations(split: Partition): Seq[String] = Nil
partitioner
分区器,分区的元数据信息 @transient val partitioner: Option[Partitioner] = None
RDD Dependence 依赖
窄依赖
父亲RDD的一个分区数据只能被子RDD的一个分区消费,但是RDD的一个子分区可以对应父亲RDD的多个分区 父分区->子分区 是一对一或多对一关系
宽依赖
父亲RDD的一个分区的数据同时被子RDD多个分区消费 父分区->子分区 是一对多的关系
RDD Partitioner分区器
哪些时候需要分区
从存储系统创建RDD需要分区
key-value类型进行数据shuffle的时候分区
非key-value RDD分区
分区的种类
hashPartitioner(默认)
规则
key的hash值%2 进行分区
代码实现分区
rdd.partitionBy(new HashPartitioner(2)) //通过hashPartitioner将分区重新划分为2个,没有进行聚合计算
rangePartitioner
规则
范围进行partition ,按照字母顺序划分范围 假设原来有4个分区,每个分区上数据分别有50条,20条,305条,25条 实现步骤如下: 1: 对每一个分区进行数据采样并计算每一个采样到的数据的权重 rdd1.partitionBy(new RangePartitioner(3, rdd1)) 输出样本大小samples:20(分区数据量) * 3(分区数) = 60 选择最小分区数据量计算 fraction(分数): 60 / 400= 0.15 每个分区采样大小: 3.0 * samples/ 4 = 45 50条 (400, (1, 50, size为45的samples)) fraction * 50 = 7.5 < 45 权重:50 / 45 = 1.11 20条 (400, (2, 20, size为20的samples)) fraction * 20 = 3 < 45 权重:20 / 20 = 1 305条 (400, (3, 305, size为45的samples)) fraction * 305 = 45.6 > 45(重新取样) 权重:1 / fraction = 6.67 25条 (400, (4, 25, size为25的samples)) fraction * 25 = 3.75 < 45 权重:25 / 25 = 1 2: 根据采样到的数据和权重计算每一个分区的最大的key值 (2, 1.11),(3, 1.11),(3, 6.67),(4, 1),(4,6.67),(5, 1),(6, 1.11),(7, 1),(10, 1),(11, 1),(14, 6.67),(22, 6.67) 权重之和计算出sumWeight=35.01 step = sumWeight / partitions = 35.01 / 3 = 11 1.11+1.11+6.67+1+6.67刚好大于11(第一个step),所以第一个key=4 (这个数据) 同理,第二个key=14 3: 用需要分区的key和上面计算到的每一个分区最大的key值对比决定这个key所在的分区
代码实现分区
rdd.partitionBy(new RangePartitionner[String,Int](2,rdd)) //参数2为分区数,rdd为需要进行分区的rdd ,String为key的类型,Int为value的类型
用途不同
HashPartitioner(能解决大部分场景) 不支持类型为Array(_)的key 可能会导致分区数据倾斜 分区后的数据不会排序 RangePartitioner 不支持不能排序的key 可以解决分区数据倾斜的问题 分区后分区之间的数据是排序的
自定义分区
用途
如果key为url,我们希望域名相同的key进入到同一个分区
代码实现
1.继承Partitioner 抽象类 class DomainNamePartitioner(val numParts: Int) extends Partitioner 2.覆写两个方法 override def getPartition(key: Any): Int= { val domain = new URL(key.toString).getHost val code = (domain.hashCode % numParts) if (code < 0) { code + numParts } else { code } } override def equals(obj: scala.Any): Boolean = obj match { case dnp: DomainNamePartitioner => dnp.numParts == numParts case _ => false }
合理分区有助于提高性能
 分区前后性能的对比,消除了宽依赖
问题
是否保留父RDD的分区器
不保留,因为子RDD的key可能会发生改变,保留父RDD的分区没有意义
改变RDD分区数
coalesce方法
rdd.coalesce(10,true) 第一个参数为新的分区数,第二个为是否进行shuffle 返回一个经过简化到numPartitions个分区的新RDD。这会导致一个窄依赖,例如:你将1000个分区转换成100个分区,这个过程不会发生shuffle,相反如果10个分区转换成100个分区将会发生shuffle。然而如果你想大幅度合并分区,例如合并成一个分区,这会导致你的计算在少数几个集群节点上计算(言外之意:并行度不够)。为了避免这种情况,你可以将第二个shuffle参数传递一个true,这样会在重新分区过程中多一步shuffle,这意味着上游的分区可以并行运行。 第二个参数shuffle=true,将会产生多于之前的分区数目,例如你有一个个数较少的分区,假如是100,调用coalesce(1000, shuffle = true)将会使用一个 HashPartitioner产生1000个分区分布在集群节点上。这个(对于提高并行度)是非常有用的
repartition方法
repartition(10)和coalesce(10,true)一样都要将数据进行shuffle
RDD的操作
spark 服务启起来之后master机器上会出现SparkSubmit服务,这个服务通常被称为driver,他的作用是创建RDD,slaves机器上会出现CoarseGrainedExcutorBackend服务,被称作excutor,作用是执行task。
创建RDD的三种方式
在非spark-shell中创建RDD 首先要创建SparkContext实例,sc 创建方法: 先创建SparkConf实例 val conf=new SparkConf().setApp("word count") 然后通过把conf传入SparkContext的构造方法创建 val sc=new SparkContext(conf)
1.从存储在存储系统中的数据上来创建
val inputRdd: RDD[(LongWritable, Text)] = sc.newAPIHadoopFile(”hdfs://master:9999/word.txt", classOf[TextInputFormat], classOf[LongWritable], //从hdfs中读取text def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] 从本地读取text
2.可以基于一个已经存在的RDD来创建一个RDD
val words: RDD[String] = inputRdd.flatMap(_._2.toString.split(" ")) 就是从已经存在的inputRdd上创建一个新的RDD source: def glom(): RDD[Array[T]] //将每一个分区的RDD合并成一个数组RDD
3.可以基于一个已经在spark内存中的列表数据来创建一个RDD
val words: RDD[Int] = sc. parallelize[Int](Seq(1,2,3,4,5,6,7,8,9)) val words=sc.mkRDD(Seq(1,2,3,4,5,6,7,8,9)) //上边方法的别名 sc.makeRDD(Seq((Seq(1, 2), Seq("172.26.232.93")), (Seq(3, 3, 4), Seq("172.26.232.93")))) //指定数据存储的分区位置
RDD主要API
transformation转换API
复习 Iterator迭代器,迭代一次即丢失 迭代器有hasNext()方法和next()方法用于遍历 map方法,对应形成新的值 flatMap方法,将每个值打平,拆分元素 filter方法 foreach方法 zipWithIndex方法,以键和下标进行组合成键值对 zip方法,参数为一个迭代器,将这个迭代器的键和参数迭代器的键一一对应组合成为新的值
map
flatMap
filter
mapPartition
mapPartitionWithIndex
def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] f中Int是partition的下标 Iterator是对应的是每个分区的rdd数据 Iterator为生成新的rdd
glom
将数据按照分区转换成数组,进而将所有分区转换成一个二维数组
其他api
外部程序调用API(pipe)
def pipe( command: Seq[String], //所需要调用的外部命令 env: Map[String, String] = Map(), //环境变量 printPipeContext: (String => Unit) => Unit = null, //在管道化元素之前,将此函数称为一个机会管道上下文数据。 printRDDElement: (T, String => Unit) => Unit = null, //使用此函数可自定义如何管道元素 separateWorkingDir: Boolean = false, //是否为每个任务使用单独的工作目录 bufferSize: Int = 8192, //用于管道处理的stdin写入器的缓冲区大小。 encoding: String = Codec.defaultCharsetCodec.name): RDD[String]
缓存API
作用: 不论什么系统,其性能瓶颈最后还是在IO,也就体现在输入和输出。所以在java企业级系统我们就经常使用缓存,我的场景是要对同一张大表进行很多不同的计算,我发现每次计算都会对大表重新加载数据,哪怕是用的同一个RDD,当我使用persist后,发现只有第一次计算会将mysql源表的数据加载到RDD,后面的计算会直接使用内存中的数据进行计算,而使用内存计算是秒级的。这不仅减轻了我数据库的压力,也加快了我计算的速度。
cache()
cache是将RDD保存到Spark分布式内存中,的默认级别是MEMORY_ONLY,底层是调用persist实现的
persist()
把RDD缓存到内存中,方便快速使用 有如下几种缓存级别: MEMORY_ONLY 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。 MEMORY_AND_DISK 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。 MEMORY_ONLY_SER 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。 MEMORY_AND_DISK_SER 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。 DISK_ONLY 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。 MEMORY_ONLY_2, MEMORY_AND_DISK_2 制备一个副本 Spark中cache和persist的使用建议 默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。 如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。 如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。 通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。
unpersist
删除缓存api
localCheckPoint
本质上是调用persist(MEM_AND_DISK)或者persist(DISK)
checkPoint
将RDD保存到HDFS分布式文件系统中 作用: 1.将数据持久化保存 2.切断RDD之间的依赖关系
broadcast
为什么要broadcast? 各个Exuctor端都需要同一个数据,并且只有读取操作 broadcast类型变量和传输一个可序列化的变量的区别 ? 1.broadcast类型变量可以保证只在executor的内存中存在一份 2.将要传输的变量不需要实现Serializable接口 3.可以高效地传输较大的数据集 如何使用? 创建 val brRDD=sc.broadcast(rdd) 读取 brRDD.value.get(x) 销毁 可重建的销毁: unpersist 不可重建的销毁: destroy 实现机制 driver端: 目前spark中只有一种实现 TorrentBroadcast.scala executor端: executor首先从自己的BlockManager去拿,如果有就直接用,如果没有执行下一步 从driver/其它executor端拉取对象的小块放入自己的BlockManager 供自己和其它的executor使用/拉取 这样的好处是避免了driver端因为发送数据给每个executor而造成热点问题
action行为API
简单的action api
foreach
foreachPartition
collect
rdd.collect() 将内存中的RDD展示 出来,仅用于数据量较少时使用
take
takeOrdered
按照顺序取数据,默认升序.前几个.可自定义排序方式 自定义排序需要继承Ordering类,同时覆写compare方法
first
top
max
min
reduce
依次对RDD两个连续的数据进行处理 如rdd.reduce(x,y=>x+y)
reduceLeft
只能返回Seq的类型及子类型的数据
fold
和reduce类似,但是fold有初始值 fold(0)((x,y)=>x+y) 相当于0+x+y
foldLeft
可以返回任意类型的数据
treeReduce
aggregate
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U 大致的意思是aggregate接收两个函数,和一个初始化值。 seqOp函数用于聚集每一个分区,combOp用于聚集所有分区聚集后的结果。每一个分区的聚集,和最后所有分区的聚集都需要初始化值的参与。
treeAggregate
比较
1: aggregate在combine上的操作,时间复杂度为O(n). treeAggregate的时间复杂度为O(lg n)。n表示分区数 2: aggregate把数据全部拿到driver端,存在内存溢出的风险。treeAggregate则不会。 3: aggregate 比 treeAggregate在最后结果的reduce操作时,多使用了一次初始值
zip
zip只能压缩在每个分区中具有相同数量元素的RDDs,从而将RDDs组合成一对一的键值对 不能压缩分区数量不相等的RDDs
combineByKeyWithClassTag
combineByKey是基于combineByKeyWithClassTag实现的
combineByKey
combineByKey是Spark中一个比较核心的高级函数,其他一些高阶键值对函数底层都是用它实现的,诸如 groupByKey,reduceByKey等等. 这个方法的作用是对相同K,把V合并成一个集合. def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope { combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null) } 如下解释下3个重要的函数参数: createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作) mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行) mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行) 如下看一个使用combineByKey来求解平均数的例子 val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0)) val d1 = sc.parallelize(initialScores) type MVType = (Int, Double) //定义一个元组类型(科目计数器,分数) d1.combineByKey( score => (1, score), (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore), (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2) ).map { case (name, (num, socre)) => (name, socre / num) }.collect 参数含义的解释 a 、score => (1, score),我们把分数作为参数,并返回了附加的元组类型。 以"Fred"为列,当前其分数为88.0 =>(1,88.0) 1表示当前科目的计数器,此时只有一个科目 b、(c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),注意这里的c1就是createCombiner初始化得到的(1,88.0)。在一个分区内,我们又碰到了"Fred"的一个新的分数91.0。当然我们要把之前的科目分数和当前的分数加起来即c1._2 + newScore,然后把科目计算器加1即c1._1 + 1 c、 (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2),注意"Fred"可能是个学霸,他选修的科目可能过多而分散在不同的分区中。所有的分区都进行mergeValue后,接下来就是对分区间进行合并了,分区间科目数和科目数相加分数和分数相加就得到了总分和总科目数
aggregateByKey
底层也是调用了combineByKey,两者作用差不多 def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] 第一个参数是, 每个key的初始值 第二个是个函数, SeqOp, 经测试这个函数就是用来先对每个分区内的数据按照key分别进行定义进行函数定义的操作 第三个是个函数, CombOp, 对经过 SeqOp 处理过的数据按照key分别进行进行函数定义的操作
groupByKey
groupBy
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] T是rdd中的元素,f的返回值是Key
reduceByKey
distinct
去重
两者的区别
☛ 对同一个key下的所有value值求总和或者平均值,我们用reduceByKey ☛ 对同一个key下的所有value进行排序,我们用groupByKey ☛ 对于一个key对应的value有很多数据的话,groupByKey可能发生OOM
foldByKey
combineByKey的实际使用案例
统计rdd中值为负的元素个数 1.初始化这个rdd val pairRDD=sc.parallelize[(String,Double)](Seq(("a",1),("a",-2),("b",2),("c",-4))) 2.声明一个LabelCounter类,这个类必须继承自Serializable class LabelCounter (var positiveNum:Long=0L,var negtiveNum:Long=0L) extends Serializable{ def +=(label:Double):LabelCounter={ if(label>=0){ positiveNum+=1L }else{ negtiveNum+=1L } this } def +=(other:LabelCounter):LabelCounter={ positiveNum+=other.positiveNum negtiveNum+=other.positiveNum this } } 3.定义combineByKey的三个方法参数 def createCombiner=(label:Double)=>new LabelCounter()+=label //每当遇到新Key调用这个方法 def mergeValue=(c:LabelCounter,label:Double)=>c+=label //当在同一个分区遇到相同的Key,则调用此方法 def mergeCombiner=(c1:LabelCounter,c2:LabelCounter)=>c1+= c2 //对分区相同的Key进行合并计算
combineByKey的重载版本
第四个参数为Partitioner 如果这个参数不传入,那么会存在以下三种情况 1: 如果父亲RDD有partitioner的话则取父亲的partitioner 2: 父亲RDD没有的话,则取分区数为配置spark.default.parallelism的HashPartitioner --conf spark.default.parallelism=4 3: 如果没有配置上面的配置,则分区数取spark应用所有executor的所有core数量的HashPartitioner
cogroup API
cogroup
对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合. 与reduceByKey的区别,reduceByKey是一个RDD进行聚合,而cogroup是两个rdd
groupWith
和cogroup一样的功能,是它的别名
join
rightOuterJoin
leftOuterJoin
fullOuterJoin
subtractByKey
rdd.subtractByKey(otherRDD) 用rdd中的Key减掉otherRDD的结果返回
采样API
sample
sample(withReplacement: boolean, fraction: double, seed: Long) 第一个参数设置是否为有放回采样, 如为true则是有放回采样,采用泊松抽样算法实现. 为false表示无放回采样,采用伯努利抽样算法实现
takeSample
从所有分区里采样我所需要的样本 takeSample(false, 5)
randomSplit
把每个分区按照权重划分至分区
分层采样
分层采样:将数据根据不同的特征组成不同的组,然后按特定条件从不同的组中获取样本并重新组成新的数组 对于一个键值RDD, key用于分类,value可以是任意的值。 然后我们通过fractions参数定义分类条件和采样几率, 因此fractions参数定义成一个Map[K, Double]类型,其中key是键值的分层条件,Double是 满足条件的key条件的采样比例
sampleByKey方法
并不对过滤全量数据,因此只得到近似值
sampleByKeyExact
会对全量数据做采样计算,因此耗费大量的计算资源,但是结果会更准确。
1: spark sql将Dataset的api翻译成RDD api来达到计算目的 2: spark ml是利用Dataset的api和RDD api来达到计算目的 3: spark mllib是利用RDD api来达到计算目的 4: spark Streaming将DStream的api翻译成RDD api来 达到计算目的 5: spark graphx是利用RDD api以及扩展RDD来达到计算目的

Spark Sql (Dataset)
概述
产生的目的
主要解决结构化数据处理的问题以达到关系型数据处理 使得sql使用者可以很简单调用spark中复杂的算法包,比如机器学习等算法
什么是结构化数据?
定义:结构化数据就是带有schema的数据 
带上schema的好处
1: 让编程者可以用sql语句来处理数据 2: 让编程者可以利用schema信息来优化数据的存储 3: 使的spark sql可以根据schema信息对数据处理进行优化,比如code generation
所支持的查询方式
sql形式
DataFrame api code的形式
示例
有一个json文件,名称为person.json, 里面的数据如下: {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} 分别用sql的形式和DataFrame api的形式来获取年龄大于21岁的人,如右: import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().appName("Spark SQL basic example").getOrCreate() val df = spark.read.json("hdfs://master:9999/users/hadoop-twq/person.json") //用sql的方式 df.createOrReplaceTempView(”person") val sqlDF = spark.sql("SELECT age, name FROM person where age > 21") sqlDF.show() //用dataFrame api的方式 val filterAgeDf = df.filter($"age" > 21) filterAgeDf.show()
大数据应用的特点
1: 需要从各种各样的数据源抽取转换数据
Spark sql通过catalyst解决多数据源抽取数据的问题
catalyst定义扩展
数据源
很简单就可以实现半结构化数据的schema的推测,比如JSON
可以使spark sql联合查询多个数据源,然后处理数据
数据类型的自定义,使的很容易为机器学习领域中的向量自定义数据类型
2: 关系型数据的查询,比如用sql
3: 对查询出来的数据做复杂的机器学习或者图计算
4: 在实际情况中,一个应用中关系型数据的查询处理与复杂的程序算法一般都是结合起来使用的
DataFrame
1: 通过DataFrame的api使的关系型处理和程序性处理的集成更加紧密 2: 包含了一个扩展性很强的优化器-catalyst。可以控制code generation,还可以定义扩展点等
DataFrame的操作
屏蔽日志
代码方式
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
配置文件方式
在resours中添加log4j.properties文件,文件内容复制如下 # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Set everything to be logged to the console log4j.rootCategory=ERROR, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Set the default spark-shell log level to WARN. When running the spark-shell, the # log level for this class is used to overwrite the root logger's log level, so that # the user can have different defaults for the shell and regular Spark apps. log4j.logger.org.apache.spark.repl.Main=ERROR # Settings to quiet third party logs that are too verbose log4j.logger.org.spark_project.jetty=ERROR log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR log4j.logger.org.apache.parquet=ERROR log4j.logger.parquet=ERROR # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
子主题
子主题
DataSet
1: 强类型 2: 可以支持自定义强大的lambda函数 3: DataFrame是类型为Row的Dataset,即Dataset[Row] 4: 可以将Dataset理解成带有schema的RDD
Spark ml(Dataset-base api) Spark mllib(RDD-base api)
概念
spark机器学习的好处
1: 支持数据量非常大的机器学习,分布式计算 2: 一般的大规模数据的机器学习的算法都是迭代式计算,而spark初衷就是为了迭代式计算而设计的,可以将迭代需要的中间结果缓存在分布式内存中 3: spark活跃的开源社区,使的对机器学习库的贡献者也越来越多 4: 构建在spark之上的高层的库不止只有机器学习,同时还有spark streaming, spark sql,spark graphx等库,使的这些库之间的调用融合非常的简单
Spark Streaming (DStream)
概念
定义
对DStream实施map,filter等操作操作,会转换成另外一个DStream,也就是说DStream也有依赖关系的,DStream是一组连续的RDD序列,实际上就是在时间维度上对RDD集合的封装,DStream与RDD的关系就是随着时间流逝不断的产生RDD
特点
Streaming实时接收到的数据是存储在spark的分布式内存中 streaming的处理流程和RDD批处理非常类似,只是数据的输入形式不一样而已 streaming的底层数据的计算最终还是调用RDD的api来实现的 Streaming除了支持处理每隔一段时间的实时数据,还支持处理每隔一段时间一个window的数据
容错
某个节点失败的恢复机制 某个节点计算很慢的恢复机制
replication
即每一个计算节点都有两份拷贝(数据和计算逻辑)且两个节点之间需要保证接收到的消息的顺序是一致的 特点: 虽然恢复很快,但成本太高
upstream backup
即每一个节点保留接收到数据的一份拷贝当这个节点失败的时候,一个备用机器来代替失败的节点,集群将失败节点上的数据发送给备用机器进行恢复 特点: 虽然降低了存储成本但恢复的时间太长
只能解决某个节点失败的容错,解决不了某个计算节点很慢的问题
spark streaming解决容错
RDD根据其依赖可以重新并行计算失败的节点的任务 可以利用spark的task调度过程中task推测机制来解决
Spark graphx
概念
定义
图(Graph)是用于表示对象之间关联关系的一种抽象数据结构,使用顶点(Vertex)和边(Edge)进行描述:顶点表示对象,边表示对象之间的关系。可抽象成用图描述的数据即为图数据。图计算,便是以图作为数据模型来表达问题并予以解决的这一过程。  
Vertex

Edge

简单代码示例
利用spark的工具spark-shell import org.apache.spark.graphx.VertexId import org.apache.spark.rdd.RDD val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("小明", "学生")), (7L, ("老王", "博士后")),(5L, ("老汤", "教授")), (2L, ("老李", "教授")))) import org.apache.spark.graphx.Edge val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "合作者"),Edge(5L, 3L, "指导者"),Edge(2L, 5L, "同事"), Edge(5L, 7L, "领导"))) val defaultUser = ("spark", "默认") import org.apache.spark.graphx.Graph val graph = Graph(users, relationships, defaultUser) // 看看博士后的有多少人 graph.vertices.filter { case (id, (name, pos)) => pos == "博士后" }.count // 源顶点id大于目标顶点id的数量 graph.edges.filter(e => e.srcId > e.dstId).count
EdgeTriplet
它是一个三元组视图,这个视图逻辑上将顶点和边的属性保存为一个RDD[EdgeTriplet[VD, ED]] 
sql表示方法
SELECT src.id, dst.id, src.attr, e.attr, dst.attr FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst ON e.srcId = src.Id AND e.dstId = dst.Id
源代码
EdgeTriplet的源代码如下所示: class EdgeTriplet[VD, ED] extends Edge[ED] { //源顶点属性 var srcAttr: VD = _ // nullValue[VD] //目标顶点属性 var dstAttr: VD = _ // nullValue[VD] protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD, ED] = { srcId = other.srcId dstId = other.dstId attr = other.attr this } EdgeTriplet类继承自Edge类,我们来看看这个父类: case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] ( var srcId: VertexId = 0, var dstId: VertexId = 0, var attr: ED = null.asInstanceOf[ED]) extends Serializable Edge类中包含源顶点id,目标顶点id以及边的属性。所以从源代码中我们可以知道,triplets既包含了边属性也包含了源顶点的id和属性、目标顶点的id和属性。
简单使用代码
val facts: RDD[String] = graph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1) facts.collect.foreach(println(_))
图数据分块
Edge cut:

优点
保证了节点均匀分布在整个集群中
缺点
1: 数据存储量很大 2: 计算节点与计算节点之间的通讯次数会变多
Vertex cut:
目前顶点切割使用比较多 
优点
1: 数据存储量不大 2: 计算节点与计算节点之间的通讯次数会相对不多
缺点
会引发数据同步的问题