导图社区 互联网-大数据-Spark
spark 知识整理,持续更新,本人自学的大数据,目前在BAT做大数据开发工程师。
编辑于2020-05-13 17:56:16Spark
基础
概述
Spark 是一个快速(基于内存), 通用, 可扩展的集群计算引擎
历史
2009 年,诞生
2010 年, 开源
2013 年, 进入 Apache
2014 年,成为顶级项目
特点
快速
Spark 实现了高效的 DAG 执行引擎, 可以通过基于内存来高效处理数据流。
易用
支持 Scala, Java, Python, R 和 SQL。 支持交互式的 Python 和 Scala 的 shell。
通用
Spark 提供了大量的类库, 包括 SQL 和 DataFrames, 机器学习(MLlib), 图计算(GraphicX), 实时流处理(Spark Streaming)。
可融合性
Spark 可以使用 Hadoop 的 YARN 和 Appache Mesos 作为它的资源管理和调度器, 并且可以处理所有 Hadoop 支持的数据, 包括 HDFS, HBase等。
内置模块
Cluster Manager
Hadoop YARN(在国内使用最广泛)。 Apache Mesos(国内使用较少, 国外使用较多)。 Standalone(Spark 自带的资源调度器, 需要在集群中的每台节点上配置 Spark)。 Kubernetes(用于管理云平台中多个主机上的容器化的应用)。
SparkCore
实现了 Spark 的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。 SparkCore 中还包含了对弹性分布式数据集(Resilient Distributed DataSet,简称RDD)的API定义。
Spark SQL
是 Spark 用来操作结构化数据的程序包。 通过SparkSql,我们可以使用 SQL或者Apache Hive 版本的 SQL 方言(HQL)来查询数据。 Spark SQL 支持多种数据源,比如 Hive 表、Parquet 以及 JSON 等。
Spark Streaming
是 Spark 提供的对实时数据进行流式计算的组件。 提供了用来操作数据流的 API,并且与 Spark Core 中的 RDD API 高度对应。
Spark MLlib
提供常见的机器学习 (ML) 功能的程序库。 包含分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外的支持功能。
集群角色
Master
Spark 特有资源调度系统的 Leader。掌管着整个集群的资源信息,类似于 Yarn 框架中的 ResourceManager。
监听 Worker 健康状态
Master 对 Worker、Application 等的管理
接收 Worker 的注册并管理所有的Worker,接收 Client 提交的 Application,调度等待的 Application 并向Worker 提交。
Worker
Spark 特有资源调度系统的 Slave,有多个。每个 Slave 掌管着所在节点的资源信息,类似于 Yarn 框架中的 NodeManager。
通过 RegisterWorker 注册到 Master
定时发送心跳给 Master
根据 Master 发送的 Application 配置进程环境
启动 ExecutorBackend
执行 Task 所需的临时进程。
1. Master 和 Worker 是 Spark 的守护进程 2. Spark 在特定模式下正常运行所必须的进程
Driver
Spark 的驱动器是执行开发程序中的 main 方法的线程。 它负责开发人员编写的用来创建SparkContext、创建RDD,以及进行RDD的转化操作和行动操作代码的执行。 如果用Spark Shell,那么当你启动Spark shell的时候,系统后台自启了一个Spark驱动器程序,就是在Spark shell中预加载的一个叫作 sc 的SparkContext对象。 如果驱动器程序终止,那么Spark应用也就结束了。
将用户程序转化为作业(Job)
在 Executor 之间调度任务(Task)
跟踪 Executor 的执行情况
通过 UI 展示查询运行情况
Executor
Spark Executor是一个工作节点,负责在 Spark 作业中运行任务,任务间相互独立。 Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。 如果有Executor节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行。
负责运行组成 Spark 应用的任务并将状态信息返回给驱动器程序
通过自身的 Block Manager 为用户程序中要求缓存的 RDD 提供内存式存储
RDD 是直接缓存在Executor内的,因此任务可以在运行时充分利用缓存数据加速运算。
1. Driver 和 Executor 是临时程序 2. 当有具体任务提交到 Spark 集群才会开启的程序
运行模式
任务提交方式
spark-submit
./bin/spark-submit \ --class <main-class> \ --master <master-url> \ --deploy-mode <deploy-mode> \ --conf <key>=<value> \ ... # other options <application-jar> \ [application-arguments]
--master 指定 master 的地址
local
local[K]
local[*]
yarn
spark://HOST:PORT
Spark standalone cluster。
mesos://HOST:PORT
--class 你的应用的启动类
--deploy-mode
是否发布你的驱动到 worker节点(cluster 模式) 或者作为一个本地客户端 (client 模式) (default: client)。
client 模式
Driver 程序运行在客户端,适用于交互、调试,希望立即看到 app 的输出。
cluster 模式
Driver 程序运行在由 RM(ResourceManager)启动的 AM(AplicationMaster)上, 适用于生产环境。
--conf 任意的 Spark 配置属性
格式key=value,如果值包含空格,可以加引号"key=value"。
--executor-memory 指定每个executor可用内存
--total-executor-cores 指定所有executor使用的cpu核数
--executor-cores 表示每个executor使用的 cpu 的核数
application-jar 打包好的应用 jar,包含依赖
这个 URL 在集群中全局可见。 比如hdfs:// 共享存储系统, 如果是 file:// path, 那么所有的节点的path都包含同样的jar。
application-arguments 传给main()方法的参数
Spark-shell
bin/spark-shell
wordcount 程序
核心概念介绍
driver program 驱动程序
每个 Spark 应用程序都包含一个驱动程序, 驱动程序负责把并行操作发布到集群上。 驱动程序包含 Spark 应用程序中的主函数, 定义了分布式数据集以应用在集群中。
SparkContext
驱动程序通过 SparkContext 对象来访问 Spark, SparkContext 对象相当于一个到 Spark 集群的连接。
sc
在 spark-shell 中, 会自动创建一个SparkContext对象, 并把这个对象命名为sc。
RDDs(Resilient Distributed Dataset) 弹性分布式数据集
一旦拥有了SparkContext对象, 就可以使用它来创建 RDD 了。 例如我们调用sc.textFile(...)来创建了一个 RDD, 表示文件中的每一行文本。
cluster managers 集群管理器
为了在一个 Spark 集群上运行计算, SparkContext对象可以连接到几种集群管理器。 集群管理器负责跨应用程序分配资源。
Standalone
Mesos
YARN
Kubernetes(k8s)
executor 执行器
SparkContext 对象一旦成功连接到集群管理器, 就可以获取到集群中每个节点上的执行器(executor)。 执行器是一个进程(进程名: ExecutorBackend, 运行在 Worker 上), 用来执行计算和为应用程序存储数据。 Spark 会发送应用程序代码(比如:jar包)到每个执行器。 SparkContext 对象发送任务到执行器开始执行程序。
任务运行模式
Local 模式
Local 模式就是指的只在一台计算机上来运行 Spark。
Standalone 模式
构建一个由 Master + Slave 构成的 Spark 集群,Spark 运行在集群中。 Standalone 是指只用 Spark 来搭建一个集群, 不需要借助其他的框架,是相对于 Yarn 和 Mesos 来说的。
HA 配置
由于 master 只有一个, 所以也有单点故障问题。
工作模式图解
Yarn 模式
Spark 客户端可以直接连接 Yarn,不需要额外构建 Spark 集群。 有 yarn-client 和 yarn-cluster 两种模式,主要区别在于:Driver 程序的运行节点不同。
Yarn 配置
spark-env.sh
# 注释掉如下内容: # SPARK_MASTER_HOST=hadoop201 # SPARK_MASTER_PORT=7077 # 添加上如下内容: YARN_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop
spark-default.conf
# spark 的日志服务 spark.yarn.historyServer.address=hadoop201:18080 spark.history.ui.port=18080
工作模式图解
Mesos 模式
Spark客户端直接连接 Mesos,不需要额外构建 Spark 集群。
运行模式对比
WordCount
Spark Core
RDD
概述
一个 RDD 可以简单的理解为一个分布式的元素集合。 RDD 表示只读的分区的数据集,对 RDD 进行改动,只能通过 RDD 的转换操作, 然后得到新的 RDD, 并不会对原 RDD 有任何的影响。 在 Spark 中, 所有的工作要么是创建 RDD, 要么是转换已经存在 RDD 成为新的 RDD, 要么在 RDD 上去执行一些操作来得到一些计算结果。 每个 RDD 被切分成多个分区(partition), 每个分区可能会在集群中不同的节点上进行计算。
定义
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据抽象。 在代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
主要属性
⼀组分区
多个分区,分区可以看成是数据集的基本组成单位。 对于 RDD 来说, 每个分区都会被一个计算任务处理, 并决定了并行计算的粒度。 用户可以在创建 RDD 时指定 RDD 的分区数, 如果没有指定, 那么就会采用默认值。默认值就是程序所分配到的 CPU Coure 的数目。 每个分区的存储是由 BlockManager 实现的,每个分区都会被逻辑映射成 BlockManager 的一个 Block, 而这个 Block 会被一个 Task 负责计算。
RDD 上的⼀组依赖
与其他 RDD 之间的依赖关系。 RDD 的每次转换都会生成一个新的 RDD, 所以 RDD 之间会形成类似于流水线一样的前后依赖关系。 在部分分区数据丢失时, Spark 可以通过这个依赖关系重新计算丢失的分区数据, 而不是对 RDD 的所有分区进行重新计算。
计算每⼀个数据分⽚的函数
计算每个切片(分区)的函数。 Spark 中 RDD 的计算是以分片为单位的, 每个 RDD 都会实现 compute 函数以达到这个目的。
可选,⼀组 Preferred location 信息
存储每个切片优先(preferred location)位置的列表 比如对于一个 HDFS 文件来说, 这个列表保存的就是每个 Partition 所在文件块的位置。 按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候, 会尽可能地将计算任务分配到其所要处理数据块的存储位置。
可选,对于键值对 RDD,有⼀个 Partitioner
对存储键值对的 RDD, 还有一个可选的分区器。 只有对于 key-value 的 RDD, 才会有 Partitioner, 非 key-value 的 RDD 的 Partitioner 的值是 None。 Partitiner 不但决定了 RDD 的本区数量, 也决定了 parent RDD Shuffle 输出时的分区数量。
特点
弹性
存储的弹性:内存与磁盘的自动切换
容错的弹性:数据丢失可以自动恢复
计算的弹性:计算出错重试机制
分片的弹性:可根据需要重新分片
分区
RDD 逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个 compute 函数得到每个分区的数据。 如果 RDD 是通过已有的文件系统构建,则 compute 函数是读取指定文件系统中的数据,如果 RDD 是通过其他 RDD 转换而来,则 compute 函数是执行转换逻辑将其他 RDD 的数据进行转换。
只读
RDD 是只读的,要想改变 RDD 中的数据,只能在现有 RDD 基础上创建新的 RDD。 由一个 RDD 转换到另一个 RDD,可以通过丰富的转换算子实现,不再像 MapReduce 那样只能写 map 和 reduce 了。
缓存
如果在应用程序中多次使用同一个 RDD,可以将该 RDD 缓存起来,该 RDD 只有在第一次计算的时候会根据血缘关系得到分区的数据。 在后续其他地方用到该 RDD 的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。
依赖(血缘)
RDDs 通过操作算子进行转换,转换得到的新 RDD 包含了从其他 RDDs 衍生所必需的信息,RDDs 之间维护着这种血缘关系,也称之为依赖。
窄依赖
窄依赖时, 子 RDD 中的分区要么只依赖一个父 RDD 中的一个分区(比如map, filter操作), 要么在设计时候就能确定子 RDD 是父 RDD 的一个子集(比如: coalesce)。 所以窄依赖的转换可以在任何的的一个分区上单独执行, 而不需要其他分区的任何信息。
父 RDD 的每个分区最多被一个 RDD 的分区使用
宽依赖
宽依赖工作的时候, 不能随意在某些记录上运行, 而是需要使用特殊的方式(比如按照 key)来获取分区中的所有数据。 如在排序(sort)的时候, 数据必须被分区, 同样范围的 key 必须在同一个分区内。 具有宽依赖的 transformations 包括: sort, reduceByKey, groupByKey, join, 和调用rePartition函数的任何操作。
父 RDD 的每个分区被不止一个子 RDD 的分区依赖
依赖关系查看
血缘关系:rdd.toDebugString
依赖关系:rdd.dependencies
checkpoint
虽然 RDD 的血缘关系天然地可以实现容错,当 RDD 的某个分区数据计算失败或丢失,可以通过血缘关系重建。 但是对于长时间迭代型应用来说,随着迭代的进行,RDDs 之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。 为此,RDD 支持checkpoint 将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint 后的 RDD 不需要知道它的父 RDDs 了,它可以从 checkpoint 处拿到数据。
编程
编程模型
在 Spark 中,RDD 被表示为对象,通过对象上的方法调用来对 RDD 进行转换。 经过一系列的 transformations 定义 RDD 之后,就可以调用 actions 触发 RDD 的计算 action 可以是向应用程序返回结果( count, collect 等),或者是向存储系统保存数据( saveAsTextFile 等)。 在 Spark 中,只有遇到 action,才会执行 RDD 的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。 要使用 Spark,开发者需要编写一个 Driver 程序,它被提交到集群以调度运行 Worker,Driver 中定义了一个或多个 RDD,并调用 RDD 上的 action,Worker 则执行 RDD 分区计算任务。
创建方式
从集合中创建
sc.parallelize(collection)
sc.makeRDD(collection)
说明
一旦 RDD 创建成功, 就可以通过并行的方式去操作这个分布式的数据集了。 parallelize 和 makeRDD 还有一个重要的参数就是把数据集切分成的分区数。 Spark 会为每个分区运行一个任务(task). 正常情况下, Spark 会自动的根据你的集群来设置分区数。 当调用parallelize()方法的时候,不指定分区数的时候,使用系统给出的分区数。而调用makeRDD()方法的时候,会为每个集合对象创建最佳分区,对后续的调用优化很有帮助。
从外部存储创建
Spark 也可以从任意 Hadoop 支持的存储数据源来创建分布式数据集。 可以是本地文件系统, HDFS, Cassandra, HBase, Amazon S3 等等。 Spark 支持文本文件, SequenceFiles, 和其他所有的 Hadoop InputFormat。
远程文件系统
本地文件系统
sc.textFile(path)
url 可以是远程文件系统文件, hdfs://..., s3n://...等等。 如果是使用的本地文件系统的路径, 则必须每个节点都要存在这个路径。 所有基于文件的方法, 都支持目录, 压缩文件, 和通配符(*)。例如: textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz")。 textFile 还可以有第二个参数, 表示分区数。默认情况下, 每个块对应一个分区(对 HDFS 来说, 块大小默认是 128M),可以传递一个大于块数的分区数, 但是不能传递一个比块数小的分区数。
从其他 RDD 转换
就是通过 RDD 的各种转换算子来得到新的 RDD。
算子
transformation
用来将 RDD 进行转化,构建 RDD 的血缘关系。
Value 类型
map(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
flatMap(func)
glom()
groupBy(func)
filter(func)
sample(withReplacement, fraction, seed)
distinct([numTasks]))
coalesce(numPartitions)
repartition(numPartitions)
sortBy(func,[ascending], [numTasks])
pipe(command, [envVars])
union(otherDataset)
subtract (otherDataset)
intersection(otherDataset)
cartesian(otherDataset)
zip(otherDataset)
双 Value 类型交互
Key-Value 类型
大多数的 Spark 操作可以用在任意类型的 RDD 上, 但是有一些比较特殊的操作只能用在 key-value 类型的 RDD 上。 这些特殊操作大多都涉及到 shuffle 操作, 比如: 按照 key 分组(group), 聚集(aggregate)等。 在 Spark 中, 这些操作在包含对偶类型(Tuple2)的 RDD 上自动可用(通过隐式转换)。
partitionBy(Partitioner)
对 pairRDD 进行分区操作,如果原有的 partionRDD 的分区器和传入的分区器相同, 则返回原 pairRDD,否则会生成 ShuffleRDD,即会产生 shuffle 过程。
reduceByKey(func, [numTasks])
在一个(K,V)的 RDD 上调用,返回一个(K,V)的 RDD,使用指定的 reduce 函数,将相同 key 的 value 聚合到一起,reduce 任务的个数可以通过第二个可选的参数来设置。
groupByKey()
按照key进行分组。 基于当前的实现, groupByKey 必须在内存中持有所有的键值对,如果一个 key 有太多的 value, 则会导致内存溢出(OutOfMemoryError)。 所以这操作非常耗资源, 如果分组的目的是为了在每个 key 上执行聚合操作(比如: sum 和 average), 则应该使 用PairRDDFunctions.aggregateByKey 或者 PairRDDFunctions.reduceByKey, 因为他们有更好的性能(会先在分区进行预聚合)。
区别
reduceByKey:按照key进行聚合,在 shuffle 之前有 combine(预聚合)操作,返回结果是 RDD[k,v]。 groupByKey:按照 key 进行分组,直接进行 shuffle。 reduceByKey 比 groupByKey 性能更好,建议使用。但是需要注意是否会影响业务逻辑。
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
使用给定的 combine 函数和一个初始化的 zero value, 对每个 key 的 value 进行聚合。 zeroValue 给每一个分区中的每一个 key 一个初始值。 seqOp 函数用于在每一个分区中用初始值逐步迭代 value。 combOp 函数用于合并每个分区中的结果。
foldByKey()
aggregateByKey() 的简化操作,seqop 和 combop 相同。
combineByKey[C](createCombiner,mergeValue,mergeCombiners)
针对每个 K, 将 V 进行合并成 C, 得到 RDD[(K,C)]。 createCombiner: combineByKey 会遍历分区中的每个 key-value 对,如果第一次碰到这个 key, 则调用createCombiner 函数,传入 value, 得到一个 C 类型的值(如果不是第一次碰到这个 key, 则不会调用这个方法)。 mergeValue: 如果不是第一个遇到这个 key, 则调用这个函数进行合并操作,分区内合并。 mergeCombiners: 跨分区合并相同的 key 的值 (C),跨分区合并。
sortByKey()
在一个(K,V)的 RDD 上调用, K必须实现 Ordered[K] 接口(或者有一个隐式值: Ordering[K]), 返回一个按照key进行排序的(K,V)的 RDD。
mapValues()
针对 (K,V) 形式的类型只对 V 进行操作。
join(otherDataset, [numTasks])
在类型为 (K,V) 和 (K,W) 的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的 (K,(V,W)) 的 RDD。 如果某一个 RDD 有重复的 Key, 则会分别与另外一个 RDD 的相同的 Key 进行组合。 也支持外连接: leftOuterJoin, rightOuterJoin, fullOuterJoin。
cogroup(otherDataset, [numTasks])
在类型为 (K,V) 和 (K,W) 的 RDD 上调用,返回一个 (K,(Iterable<V>,Iterable<W>)) 类型的 RDD。
案例
统计出每一个省份广告被点击次数的 TOP3
action
用来触发 RDD 进行计算,得到 RDD 的相关计算结果或者 保存 RDD 数据到文件系统中。
reduce(func)
通过func函数聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。
collect()
以数组的形式返回 RDD 中的所有元素,所有的数据都会被拉到 driver 端, 所以要慎用。
count()
返回 RDD 中元素的个数。
take(n)
返回 RDD 中前 n 个元素组成的数组,take 的数据也会拉到 driver 端, 应该只对小数据集使用。
first()
返回 RDD 中的第一个元素,类似于take(1)。
takeOrdered(n, [ordering])
返回排序后的前 n 个元素, 默认是升序排列,数据也会拉到 driver 端。
aggregate(zeroValue)(seqOp, combOp)
aggregate 函数将每个分区里面的元素通过 seqOp 和初始值进行聚合,然后用 combine 函数将每个分区的结果和初始值 (zeroValue) 进行 combine 操作。这个函数最终返回的类型不需要和 RDD 中元素类型一致。 zeroValue 分区内聚合和分区间聚合的时候各会使用一次。
fold()
折叠操作,aggregate() 的简化操作,seqop 和 combop 一样的时候,可以使用 fold()。
saveAsTextFile(path)
将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用toString 方法,将它装换为文件中的文本。
saveAsSequenceFile(path)
将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统。
saveAsObjectFile(path)
用于将 RDD 中的元素序列化成对象,存储到文件中。
countByKey()
针对(K,V)类型的 RDD,返回一个 (K,Int) 的 map,表示每一个 key 对应的元素个数。 可以用来查看数据是否倾斜。
foreach(func)
针对 RDD 中的每个元素都执行一次 func 每个函数是在 Executor 上执行的, 不是在 driver 端执行的。
函数与变量传递
我们进行 Spark 进行编程的时候, 初始化工作是在 driver端完成的, 而实际的运行程序是在executor端进行的,所以就涉及到了进程间的通讯, 数据是需要序列化的。
传递函数
传递变量
方案:传递局部变量而不是属性
方案:类实现序列化接口 Serializable
kryo 序列化框架
Spark 处于性能的考虑, 支持另外一种序列化机制: kryo (2.0开始支持),kryo 比较快和简洁(速度是Serializable的10倍)。 从2.0开始, Spark 内部已经在使用 kryo 序列化机制: 当 RDD 在 Shuffle 数据的时候, 简单数据类型, 简单数据类型的数组和字符串类型已经在使用 kryo 来序列化。 有一点需要注意的是: 即使使用 kryo 序列化, 也要继承 Serializable 接口。
Job
调度
一个 Spark 应用包含一个驱动进程(driver process, 在这个进程中写 spark 的逻辑代码)和多个执行器进程(executor process, 跨越集群中的多个节点)。 Spark 程序自己是运行在驱动节点, 然后发送指令到执行器节点。 一个 Spark 集群可以同时运行多个 Spark 应用, 这些应用是由集群管理器(cluster manager)来调度。 Spark 应用可以并发的运行多个 job。job 对应着给定的应用内的在 RDD 上的每个 action 操作。
划分
针对每个 action, Spark 调度器就创建一个执行图(execution graph)和启动一个 Spark job。
应用
一个 Spark 应用可以包含多个 Spark job, Spark job 是在驱动程序中由 SparkContext 来定义的。 当启动一个 SparkContext 的时候, 就开启了一个 Spark 应用。 一个驱动程序被启动了, 多个执行器在集群中的多个工作节点(worker nodes)也被启动了。 一个执行器就是一个 JVM, 一个执行器不能跨越多个节点, 但是一个节点可以包括多个执行器。 一个 RDD 会跨多个执行器被并行计算. 每个执行器可以有这个 RDD 的多个分区, 但是一个分区不能跨越多个执行器。
DAG
Spark 的顶层调度层使用 RDD 的依赖为每个 job 创建一个由 stages 组成的 DAG(有向无环图),在 Spark API 中, 这被称作 DAG 调度器(DAG Scheduler)。 有些错误, 比如: 连接集群的错误, 配置参数错误, 启动一个 Spark job 的错误, 这些错误必须处理, 并且都表现为 DAG Scheduler 错误,这是因为一个 Spark job 的执行是被 DAG 来处理。 DAG 为每个 job 构建一个 stages 组成的图表, 从而确定运行每个 task 的位置, 然后传递这些信息给 TaskSheduler,TaskSheduler 负责在集群中运行任务。
Jobs
Spark job 处于 Spark 执行层级结构中的最高层,每个 Spark job 对应一个 action, 每个 action 被 Spark 应用中的驱动所程序调用。 可以把 Action 理解成把数据从 RDD 的数据带到其他存储系统的组件(通常是带到驱动程序所在的位置或者写到稳定的存储系统中)。 只要一个 action 被调用, Spark 就不会再向这个 job 增加新的东西。
stages
RDD 的转换是懒执行的, 直到调用一个 action 才开始执行 RDD 的转换。 一个 job 是由调用一个 action 来定义的,一个 action 可能会包含一个或多个转换( transformation ), Spark 根据宽依赖把 job 分解成 stage。 从整体来看, 一个 stage 可以任务是“计算(task)”的集合, 这些每个“计算”在各自的 Executor 中进行运算, 而不需要同其他的执行器或者驱动进行网络通讯,数据只需要传递一次, 每个执行器就可以顺序的执行这些操作。 当任何两个 workers 之间开始需要网络通讯的时候, 这时候一个新的 stage 就产生了, 例如: shuffle 的时候。 这些创建 stage 边界的依赖称为 ShuffleDependencies,shuffle 是由宽依赖所引起的, 比如: sort, groupBy, 因为他们需要在分区中重新分发数据,那些窄依赖的转换会被分到同一个 stage 中。 因为边界 stage 需要同驱动进行通讯, 所以与 job 有关的 stage 通常必须顺序执行而不能并行执行。
Tasks
stage 由 tasks 组成. 在执行层级中, task 是最小的执行单位,每一个 task 表现为一个本地计算,在一个 executor 上完成一个特定的事情。 一个 stage 中的所有 tasks 会对不同的数据执行相同的代码(程序代码一样, 只是作用在了不同的数据上)。 一个 task 不能被多个执行器来执行, 但是, 每个执行器会动态的分配多个 slots 来执行 tasks, 并且在整个生命周期内会并行的运行多个 task。 每个 stage 的 task 的数量对应着分区的数量, 即每个 Partition 都被分配一个 Task。
持久化
Spark 一个重要能力就是可以持久化数据集在内存中。 当我们持久化一个 RDD 时, 每个节点都会存储他在内存中计算的那些分区, 然后在其他的 action 中可以重用这些数据。 Spark 也会自动的对一些 shuffle 操作的中间数据做持久化操作(比如: reduceByKey)。这样做的目的是为了当一个节点 shuffle 失败了避免重新计算整个输入。 在实际使用的时候, 如果想重用数据, 仍然建议调用 persist() 或 cache()。
背景
每碰到一个 Action 就会产生一个 job, 每个 job 开始计算的时候总是从这个 job 最开始的 RDD 开始计算。 rdd记录了整个计算过程. 如果计算的过程中出现哪个分区的数据损坏或丢失, 则可以从头开始计算来达到容错的目的。 可以使用方法persist()或者cache()来持久化一个 RDD。在第一个 action 会计算这个 RDD, 然后把结果的存储到他的节点的内存中。 Spark 的 Cache 也是容错: 如果 RDD 的任何一个分区的数据丢失了, Spark 会自动的重新计算。 RDD 的各个 Partition 是相对独立的, 因此只需要计算丢失的部分即可, 并不需要重算全部 Partition。
方法
cache()
cache() 方法是使用默认存储级别(StorageLevel.MEMORY_ONLY)的简写方法。cache() 等价于 persist(StorageLevel.MEMORY_ONLY)。
persist(StorageLevel)
MEMORY_ONLY
MEMORY_AND_DISK
MEMORY_ONLY_SER (Java and Scala)
MEMORY_AND_DISK_SER (Java and Scala)
DISK_ONLY
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc
OFF_HEAP (experimental)
Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.
检查点
Spark 中对于数据的保存除了持久化操作之外,还提供了一种检查点的机制,检查点(本质是通过将RDD写入Disk做检查点)是为了通过 Lineage 做容错的辅助。 Lineage 过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的 RDD 开始重做 Lineage,就会减少开销。 检查点通过将数据写入到 HDFS 文件系统实现了 RDD 的检查点功能。 为当前 RDD 设置检查点。该函数将会创建一个二进制的文件,并存储到 checkpoint 目录中,该目录是用 SparkContext.setCheckpointDir()设置的。 在 checkpoint 的过程中,该RDD 的所有依赖于父 RDD中 的信息将全部被移除。 对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发, 在触发的时候需要对这个 RDD 重新计算。
sc.setCheckpointDir(path)
如果spark运行在集群上, 则路径必须是 hdfs 目录
区别
持久化只是将数据保存在 BlockManager 中,而 RDD 的 Lineage 是不变的。但是checkpoint 执行完后,RDD 已经没有之前所谓的依赖 RDD 了,而只有一个强行为其设置的checkpointRDD,RDD 的 Lineage 改变了。 持久化的数据丢失可能性更大,磁盘、内存都可能会存在数据丢失的情况。但是 checkpoint 的数据通常是存储在如 HDFS 等容错、高可用的文件系统,数据丢失可能性较小。 默认情况下,如果某个 RDD 没有持久化,但是设置了checkpoint,会存在问题。本来这个 job 都执行结束了,但是由于中间 RDD 没有持久化,checkpoint job 想要将 RDD 的数据写入外部文件系统的话,需要全部重新计算一次,再将计算出来的 RDD 数据 checkpoint到外部文件系统。 所以,建议对 checkpoint()的 RDD 使用持久化, 这样 RDD 只需要计算一次就可以了。
分区
对于只存储 value的 RDD, 不需要分区器,只有存储 Key-Value 类型的才会需要分区器。 Spark 目前支持 Hash 分区和 Range 分区,用户也可以自定义分区。 Hash 分区为当前的默认分区,Spark 中分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 过程属于哪个分区和 Reduce 的个数。
查看分区:rdd.partitioner
使用分区:rdd.partitionBy(new HashPartitioner(num))
HashPartitioner
对于给定的 key,计算其 hashCode,并除以分区的个数取余,如果余数小于 0,则用余数+分区的个数(否则加0),最后返回的值就是这个 key 所属的分区 ID。 弊端: 可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有 RDD 的全部数据。
RangePartitioner
将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。 简单的说就是将一定范围内的数映射到某一个分区内。实现过程为: 第一步:先从整个 RDD 中抽取出样本数据,将样本数据排序,计算出每个分区的最大 key 值,形成一个 Array[KEY] 类型的数组变量 rangeBounds(边界数组); 第二步:判断 key 在 rangeBounds 中所处的范围,给出该 key 值在下一个 RDD 中的分区 id 下标;该分区器要求 RDD 中的 KEY 类型必须是可以排序的。 比如[1,100,200,300,400],然后对比传进来的key,返回对应的分区 id。
自定义分区器
继承类:org.apache.spark.Partitioner
实现方法
numPartitions:返回分区数, 必须要大于0
getPartition(key):返回指定键的分区编号(0到numPartitions-1)
equals:Java 判断相等性的标准方法
hashCode:若覆盖了equals, 则也应该覆写这个方法
案例
数据读取和保存
读写 Text 文件
读取 Json 文件
如果 JSON 文件中每一行就是一个 JSON 记录,那么可以通过将 JSON 文件当做文本文件来读取,然后利用相关的 JSON 库对每一条数据进行 JSON 解析。 注意:使用 RDD 读取 JSON 文件处理很复杂,同时 SparkSQL 集成了很好的处理 JSON 文件的方式,所以实际应用中多是采用 SparkSQL 处理 JSON 文件。
读写 SequenceFile 文件
SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(Flat File)。 Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以调用 sequenceFile[ keyClass, valueClass](path)。 注意:SequenceFile 文件只针对 PairRDD。
读写 objectFile 文件
对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。 可以通过 objectFile[k,v](path) 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用saveAsObjectFile() 实现对对象文件的输出。
读写 HDFS 文件
Spark 的整个生态系统与 Hadoop 完全兼容的,所以对于 Hadoop 所支持的文件类型或者数据库类型,Spark 也同样支持。 另外,由于 Hadoop 的 API 有新旧两个版本,所以 Spark 为了能够兼容 Hadoop 所有的版本,也提供了两套创建操作接口。 对于外部存储创建操作而言,HadoopRDD 和 newHadoopRDD 是最为抽象的两个函数接口。 其他创建操作的API接口都是为了方便最终的Spark程序开发者而设置的,是这两个接口的高效实现版本。例如, 对于 textFile 而言,只有 path 这个指定文件路径的参数, 其他参数在系统内部指定了默认值。 在 Hadoop 中以压缩形式存储的数据, 不需要指定解压方式就能够进行读取, 因为 Hadoop 本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压。 如果用 Spark 从 Hadoop 中读取某种类型的数据不知道怎么读取的时候, 上网查找一个使用 map-reduce 的时候是怎么读取这种这种数据的, 然后再将对应的读取方式改写成上面的 hadoopRDD 和 newAPIHadoopRDD 两个类就行了。
hadoopRDD
textFile()
hadoopFile()
hadoopRDD()
sequenceFile()
objectFile()
saveAsTextFile()
saveAsHadoopFile()
只能将RDD存到HDFS中
saveAsHadoopDataset()
参数类型是 JobConf, 所以其不仅能够将 RDD 存储到 HDFS 中, 也可以将 RDD 存储到其他数据库中, 如Hbase, MangoDB, Cassandra 等。
newHadoopRDD
newAPIHadoopFile()
saveAsNewAPIHadoopFile()
saveAsNewAPIHadoopDataset()
比较灵活,新版的 API 没有 codec 的参数, 所以要压缩存储文件到 HDFS 中,需要使用 hadoopConfiguration 参数, 设置对应 mapreduce.map.output.compress.codec 参数和 mapreduce.map.output.compress 参数。
主要参数
输入格式
制定数据输入的类型,如 TextInputFormat 等,新旧两个版本所引用的版本分别是 org.apache.hadoop.mapred.InputFormat 和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)。
键类型
指定 [K,V] 键值对中 K 的类型。
值类型
指定 [K,V] 键值对中 V 的类型。
分区值
指定由外部存储生成的 RDD 的 partition 数量的最小值,如果没有指定,系统会使用默认值 defaultMinSplits。
读写 Mysql 数据文件
从 Mysql 读取数据
向 Mysql 写入数据
读写 Hbase 文件
由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat 类的实现,Spark 可以通过Hadoop输入格式访问 HBase。 这个输入格式会返回键值对数据,其中键的类型为org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的类型为 org.apache.hadoop.hbase.client.Result。
从 HBase 读取数据
向 HBase 写入数据
共享变量
传递给 Spark 算子(比如: map, reduce 等)的函数都是在远程的集群节点上执行, 函数中用到的所有变量都是独立的拷贝。 这些变量被拷贝到集群上的每个节点上, 都这些变量的更改不会传递回驱动程序。 支持跨 task 之间共享变量通常是低效的, 但是 Spark 对共享变量也提供了两种支持: 累加器和广播变量。
累加器(Accumulator)
累加器是一种变量, 仅仅支持“add”, 支持并发。累加器用于去实现计数器或者求和,Spark 内部已经支持数字类型的累加器, 开发者可以添加其他类型的支持.
内置累加器
在驱动程序中通过 sc.longAccumulator 得到 Long 类型的累加器, 还有 Double 类型的。 可以通过 value 来访问累加器的值(与sum等价),avg 得到平均值。 只能通过 add 来添加值。 累加器的更新操作最好放在 action 中, Spark 可以保证每个 task 只执行一次。如果放在 transformations 操作中则不能保证只更新一次,有可能会被重复执行。
自定义累加器
通过继承类 AccumulatorV2 来自定义累加器。
广播变量
广播变量在每个节点上保存一个只读的变量的缓存, 而不用给每个 task 来传送一个 copy。 例如, 给每个节点一个比较大的输入数据集是一个比较高效的方法,Spark 也会用该对象的广播逻辑去分发广播变量来降低通讯的成本。 广播变量通过调用 SparkContext.broadcast(v) 来创建,广播变量是对 v 的包装, 通过调用广播变量的 value 方法可以访问。 通过对一个类型 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。任何可序列化的类型都可以这么实现。 通过 value 属性访问该对象的值(在Java中为value()方法)。 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。
项目实战
Top10 热门品类
按照每个品类的 点击、下单、支付 的量来统计热门品类。
Top10 热门品类中每个品类的 Top10 活跃 Session 统计
对于排名前 10 的品类,分别获取每个品类点击次数排名前 10 的 sessionId。(注意: 这里我们只关注点击次数, 不关心下单和支付次数)。
页面单跳转化率统计
计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率。 比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率。
Spark Sql
概述
定义:Spark SQL 是 Spark 用于结构化数据(structured data)处理的 Spark 模块
特点
Integrated(易整合)
无缝的整合了 SQL 查询和 Spark 编程。
Uniform Data Access(统一的数据访问方式)
使用相同的方式连接不同的数据源。
Hive Integration(集成 Hive)
在已有的仓库上直接运行 SQL 或者 HiveQL。
Standard Connectivity(标准的连接方式)
通过 JDBC 或者 ODBC 来连接。
DataFrame
懒执行
分布式数据容器
定制化内存管理
优化的执行计划
性能比 RDD 要高
支持嵌套数据类型
提供高层的关系操作
记录数据的结构信息 schema
DataSet
支持编解码器
具有类型安全检查
具有 DataFrame 的查询优化特性
样例类被用来在 DataSet 中定义数据的结构信息
DataFrame 是 DataSet 的特列,DataFrame=DataSet[Row]
编程
SparkSession
Spark 最新的 SQL 查询起始点
实质上是SQLContext和HiveContext的组合
内部封装了SparkContext 实际计算由其完成
DataFrame 进行编程
Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。 DataFrame API 既有 transformation 操作也有 action 操作,DataFrame的转换从本质上来说更具有关系, 而 DataSet API 提供了更加函数式的 API。
创建 DataFrame
With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.
通过 Spark 的数据源创建
通过已知的 RDD 来创建
通过查询一个 Hive 表来创建
DataFrame 语法风格
SQL 语法风格(主要)
SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助。 临时视图只能在当前 Session 有效, 在新的 Session 中无效。
临时视图
全局视图
访问全局视图需要全路径: 如 global_temp.xxx。
DSL 语法风格(了解)
DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。 可以在 Scala, Java, Python 和 R 中使用 DSL。 使用 DSL 语法风格不必去创建临时视图了。
RDD 和 DataFrame 的交互
从 RDD 到 DataFrame
涉及到 RDD, DataFrame, DataSet 之间的操作时, 需要导入: import spark.implicits._ 。 这里的 spark 不是包名, 而是表示 SparkSession 的那个对象,所以必须先创建 SparkSession 对象再导入, implicits 是一个内部 object。
手动转换
通过样例类反射转换(最常用)
通过 API 的方式转换(了解)
从 DataFrame到 RDD
DataSet 进行编程
DataSet 和 RDD 类似, 但是 DataSet 没有使用 Java 序列化或者 Kryo 序列化, 而是使用一种专门的编码器去序列化对象, 然后在网络上处理或者传输。 虽然编码器和标准序列化都负责将对象转换成字节,但编码器是动态生成的代码,使用的格式允许 Spark 执行许多操作,如过滤、排序和哈希,而无需将字节反序列化回对象。 DataSet 是具有强类型的数据集合,需要提供对应的类型信息。
创建DataSet
在实际使用的时候, 很少用到把序列转换成 DataSet, 更多的是通过 RDD 来得到 DataSet。
使用样例类的序列得到DataSet
使用基本类型的序列得到 DataSet
RDD 和 DataSet 的交互
从 RDD 到 DataSet
使用反射来推断包含特定类型对象的 RDD 的 schema 。 这种基于反射的方法可以生成更简洁的代码,并且当您在编写Spark应用程序时已经知道模式时,这种方法可以很好地工作。 为 Spark SQL 设计的 Scala API 可以自动的把包含样例类的 RDD 转换成 DataSet。 样例类定义了表结构: 样例类参数名通过反射被读到, 然后成为列名。样例类可以被嵌套, 也可以包含复杂类型: 像 Seq 或者 Array。
从 DataSet 到 RDD
DataFrame 和 DataSet 之间的交互
从 DataFrame 到 DataSet
从 DataSet 到 DataFrame
RDD,DataFrame 和 DataSet 之间的关系
RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)。 如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。 在后期的 Spark 版本中,DataSet会逐步取代RDD和DataFrame成为唯一的 API 接口。
三者的共性
分布式弹性数据集
惰性机制
具有 partition 的概念
具有有许多共同的函数
针对内存自动缓存运算
DataFrame 和 Dataset
拥有完全相同的成员函数
一般不与 spark mlib 同时使用
DataFrame 也可以叫 Dataset[Row]
相互转换大多需要 import spark.implicits._
均可使用模式匹配获取各个字段的值和类型
均支持 SparkSQL 的操作,能注册临时表/视窗,进行 sql 语句操作
支持一些特别方便的保存方式,比如保存成 csv,可以带上表头
三者的区别
RDD
不支持 sparksql 操作
一般和 spark mlib 同时使用
DataFrame
每一行的类型固定为 Row
列的值没法直接访问,只有通过解析才能获取各个字段的值
使用 getAS 方法或者模式匹配拿出特定字段
DataSet
DataFrame 是 DataSet 的一个特例
定义了 case class 之后可以很自由的获得每一行的信息
每一行是什么类型是不定的
DataFrame 也可以叫 Dataset[Row] (行固定) , Dataset 中每一行是什么类型是不定的
三者的互相转换
SparkSQL 程序
自定义 SparkSQL 函数
在 Shell 窗口中可以通过 spark.udf 功能用户可以自定义函数。
自定义 UDF 函数
用户自定会聚合函数
数据源
Spark SQL 的 DataFrame 接口支持操作多种数据源,一个 DataFrame 类型的对象可以像 RDD 那样操作(比如各种转换), 也可以用来创建临时表。 把 DataFrame 注册为一个临时表之后, 就可以在它的数据上面执行 SQL 查询。
通用加载和保存函数
文件保存选项(SaveMode)
保存操作可以使用 SaveMode, 用来指明如何处理数据. 使用mode()方法来设置。 这些 SaveMode 都是没有加锁的, 也不是原子操作, 如果你执行的是 Overwrite 操作, 在写入新的数据之前会先删除旧的数据。
SaveMode.Append 如果文件已经存在则追加
SaveMode.Overwrite 如果文件已经存在则覆盖
SaveMode.Ignore 如果文件已经存在则忽略
SaveMode.ErrorIfExists(default) 如果文件已经存在则抛出异常
spark.read.load 是加载数据的通用方法
df.write.save 是保存数据的通用方法
JSON 文件
Spark SQL 能够自动推测 JSON 数据集的结构,并将它加载为一个 Dataset[Row]。 可以通过 SparkSession.read.json() 去加载一个 JSON 文件。 也可以通过 SparkSession.read.format("json").load() 来加载。 这个 JSON 文件不是一个传统的 JSON 文件,每一行都得是一个完整的 JSON 串。
Parquet 文件
Parquet 是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。 Parquet 格式经常在 Hadoop 生态圈中被使用,它也支持 Spark SQL 的全部数据类型。 Spark SQL 提供了直接读取和存储 Parquet 格式文件的方法。 Parquet 格式的文件是 Spark 默认格式的数据源,当使用通用的方式时可以直接保存和读取,不需要使用format,spark.sql.sources.default 这个配置可以修改默认数据源。
JDBC 数据源
Spark SQL 也支持使用 JDBC 从其他的数据库中读取数据,JDBC 数据源比使用 JdbcRDD 更爽一些,这是因为返回的结果直接就是一个 DataFrame, DataFrame 更加容易被处理或者与其他的数据源进行 join。 Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对 DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。 注意: 如果想在 spark-shell 操作 jdbc, 需要把相关的 jdbc 驱动 copy 到 jars 目录下。
从 jdbc 读数据
使用 read.load 方法加载
使用 read.jdbc 方法加载
向 jdbc 写入数据
使用 write.save 方法写入
使用 write.jdbc 方法写入
Hive 数据库
Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQL 编译时可以包含 Hive 支持,也可以不包含。 包含 Hive 支持的 Spark SQL 可以支持 Hive 表访问、UDF (用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。 如果要在 Spark SQL 中包含 Hive 的库,并不需要事先安装 Hive。一般来说,最好还是在编译 Spark SQL 时引入 Hive 支持,这样就可以使用这些特性了。如果你下载的是二进制版本的 Spark,它应该已经在编译时添加了 Hive 支持。 若要把 Spark SQL 连接到一个部署好的 Hive 上,你必须把 hive-site.xml 复制到 Spark的配置文件目录中($SPARK_HOME/conf)。 即使没有部署好 Hive,Spark SQL 也可以运行。 需要注意的是,如果你没有部署好 Hive,Spark SQL 会在当前的工作目录中创建出自己的 Hive 元数据仓库,叫作 metastore_db。 如果你尝试使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。
使用内嵌的 Hive
如果使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可,Hive 的元数据存储在 derby 中, 仓库地址: $SPARK_HOME/spark-warehouse。
使用外置的 Hive
Spark 要接管 Hive 需要把 hive-site.xml copy 到conf/目录下。 把 Mysql 的驱动 copy 到 jars/目录下。 如果访问不到hdfs, 则需要把 core-site.xml 和 hdfs-site.xml 拷贝到conf/目录下。
使用 spark-shell
启动: bin/spark-shell 查询: spark.sql("show tables").show
使用 spark-sql cli
在spark-shell执行 hive 方面的查询比较麻烦:spark.sql("").show。 Spark 专门给我们提供了书写 HiveQL 的工具: spark-sql cli。
使用 hiveserver2 + beeline
spark-sql 得到的结果不够友好, 所以可以使用 hiveserver2 + beeline。 启动 thrift服务器: sbin/start-thriftserver.sh --master yarn --hiveconf hive.server2.thrift.bind.host=hadoop201 --hiveconf hive.server2.thrift.port=10000 启动beeline客户端: bin/beeline 然后输入(按照提示输入用户名和密码): !connect jdbc:hive2://hadoop201:10000
在代码中访问 Hive
拷贝 hive-site.xml 到 resources 目录下。 添加依赖 spark-hive_2.11。 在开发工具中创建数据库默认是在本地仓库。 通过参数修改数据库仓库的地址: config("spark.sql.warehouse.dir", "hdfs://hadoop201:9000/user/hive/warehouse")
项目实战
需求:各区域热门商品 Top3
数据
思路:使用 sql 来完成. 碰到复杂的需求, 可以使用 udf 或 udaf
步骤
step1:查询出来所有的点击记录, 并与 city_info 表连接, 得到每个城市所在的地区
step2:与 Product_info 表连接得到产品名称
step3:按照地区和商品 id 分组, 统计出每个商品在每个地区的总点击次数
step4:每个地区内按照点击次数降序排列
step5:只取前三名,并把结果保存在数据库中
step6:城市备注需要自定义 UDAF 函数
代码
udaf 函数定义
具体实现
Spark Streaming
概述
定义
Spark Streaming 是 Spark 核心 API 的扩展, 用于构建弹性, 高吞吐量, 容错的在线数据流的流式处理程序,用于流式数据的处理。 数据可以来源于多种数据源: Kafka, Flume, Kinesis, 或者 TCP 套接字. 接收到的数据可以使用 Spark 的负责元语来处理, 尤其是那些高阶函数像: map, reduce, join, 和 window。 最终, 被处理的数据可以发布到 FS, 数据库或者在线 dashboards,另外 Spark Streaming 也能和 MLlib(机器学习)以及 Graphx 完美融合。 在 Spark Streaming 中,处理数据的单位是一批而不是单条,而数据采集却是逐条进行的,因此 Spark Streaming 系统需要设置间隔使得数据汇总到一定的量后再一并操作,这个间隔就是批处理间隔。 批处理间隔是 Spark Streaming 的核心概念和关键参数,它决定了 Spark Streaming 提交作业的频率和数据处理的延迟,同时也影响着数据处理的吞吐量和性能。 Spark Streaming 提供了一个高级抽象: discretized stream(SStream), DStream 表示一个连续的数据流。 DStream 可以由来自数据源的输入数据流来创建, 也可以通过在其他的 DStream 上应用一些高阶操作来得到,在内部, 一个 DSteam 是由一个 RDD 序列来表示的。
特点
易用:通过高阶函数来构建应用
容错
易整合到 Spark 体系中
缺点:“微量批处理”架构延迟会相对高一些
架构
背压机制
根据 JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率
通过属性 spark.streaming.backpressure.enabled 来控制是否启用 backpressure 机制
DStream 入门
编程
wordcount 案例
注意
一旦 StreamingContext 已经启动, 则不能再添加添加新的 streaming computations
一旦一个 StreamingContext 已经停止(StreamingContext.stop()), 他也不能再重启
在一个 JVM 内, 同一时间只能启动一个 StreamingContext
stop() 的方式停止 StreamingContext, 也会把 SparkContext 停掉
如果仅仅想停止 StreamingContext, 则应该这样: stop(false)
一个 SparkContext 可以重用去创建多个 StreamingContext 前提是以前 StreamingContext 停掉
wordcount 解析
一个 DSteam 用连续的一系列的 RDD 来表示
对 DStream 的任何操作都会转换成对他里面的 RDD 的操作
对这些 RDD 的转换是有 Spark 引擎来计算的,DStream 操作隐藏的大多数的细节
wordcount 数据流程解析
创建
Spark Streaming 原生支持一些不同的数据源。一些“核心”数据源已经被打包到 Spark Streaming 的 Maven 工件中,而其他的一些则可以通过 spark-streaming-kafka 等附加工件获取。 每个接收器都以 Spark 执行器程序中一个长期运行的任务的形式运行,因此会占据分配给应用的 CPU 核心。 此外,我们还需要有可用的 CPU 核心来处理数据。这意味着如果要运行多个接收器,就必须至少有和接收器数目相同的核心数,还要加上用来完成计算所需要的核心数。 例如,如果我们想要在流计算应用中运行 10 个接收器,那么至少需要为应用分配 11 个 CPU 核心。所以如果在本地模式运行,不要使用 local 或者 local[1]。
RDD 队列
可以通过使用 ssc.queueStream(queueOfRDDs) 来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个 DStream 处理。
自定义数据源
其实就是自定义接收器,需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。
定义数据源
使用自定义数据源
Kafka 数据源
在工程中需要引入 Maven 依赖 spark-streaming-kafka_2.11来使用它。 包内提供的 KafkaUtils 对象可以在 StreamingContext和JavaStreamingContext 中以你的 Kafka 消息创建出 DStream。 两个核心类:KafkaUtils、KafkaCluster。
高级 API
低级 API
转换
DStream 上的原语与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种 Window 相关的原语。
无状态转换操作
尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。例如,reduceByKey() 会化简每个时间区间中的数据,但不会化简不同区间之间的数据。 举个例子,在之前的wordcount程序中,我们只会统计几秒内接收到的数据的单词个数,而不会累加。 无状态转化操作也能在多个 DStream 间整合数据,不过也是在各个时间区间内。例如,键值对 DStream 拥有和 RDD 一样的与连接相关的转化操作,也就是 cogroup()、join()、leftOuterJoin() 等。 我们可以在 DStream 上使用这些操作,这样就对每个批次分别执行了对应的 RDD 操作。我们还可以像在常规的 Spark 中一样使用 DStream的union() 操作将它和另一个DStream 的内容合并起来,也可以使用 StreamingContext.union() 来合并多个流。
transform操作
transform 原语允许 DStream上执行任意的RDD-to-RDD函数。 可以用来执行一些 RDD 操作, 即使这些操作并没有在 SparkStreaming 中暴露出来。 该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。
有状态转换操作
updateStateByKey
updateStateByKey 操作允许在使用新信息不断更新状态的同时能够保留他的状态。 需要做两件事情: 定义状态,状态可以是任意数据类型 定义状态更新函数,指定一个函数, 这个函数负责使用以前的状态和新值来更新状态 在每个阶段, Spark 都会在所有已经存在的 key 上使用状态更新函数, 而不管是否有新的数据在。
window 操作
Spark Streaming 也提供了窗口计算, 允许执行转换操作作用在一个窗口内的数据。 默认情况下, 计算只对一个时间段内的 RDD 进行, 有了窗口之后, 可以把计算应用到一个指定的窗口内的所有 RDD 上。 一个窗口可以包含多个时间段,基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。 窗口操作需要 2 个参数(这两个参数必须是源 DStream 的 interval 的倍数): 窗口长度 – 窗口的持久时间(执行一次持续多少个时间单位) 滑动步长 – 窗口操作被执行的间隔(每多少个时间单位执行一次)
reduceByKeyAndWindow(reduceFunc, windowDuration. slideDuration)
reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
比没有invReduceFunc高效,会利用旧值来进行计算。 invReduceFunc: (V, V) => V 窗口移动了, 上一个窗口和新的窗口会有重叠部分, 重叠部分的值可以不用重复计算了,第一个参数就是新的值, 第二个参数是旧的值。 即 ReduceFunc 是新的值进来需要的操作,invReduceFunc 是旧的值离开要进行的操作。
window(windowLength, slideInterval)
基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream。
countByWindow(windowLength, slideInterval)
返回一个滑动窗口计数流中的元素的个数。
countByValueAndWindow(windowLength, slideInterval, [numTasks])
对(K,V)对的DStream调用,返回 (K,Long) 对的新 DStream,其中每个 ke y的的对象的 v 是其在滑动窗口中频率。如上,可配置reduce任务数量。
输出
与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。如果 StreamingContext 中没有设定输出操作,整个 context 就都不会启动。 注意: 连接不能写在driver层面(序列化) 如果写在 foreach 则每个 RDD 中的每一条数据都创建,得不偿失 增加 foreachPartition,在分区创建(获取)
print()
saveAsTextFiles(prefix, [suffix])
saveAsObjectFiles(prefix, [suffix])
saveAsHadoopFiles(prefix, [suffix])
foreachRDD(func)
编程进阶
累加器和广播变量
和 RDD 中的累加器和广播变量的用法完全一样。
DataFrame ans SQL Operations
你可以很容易地在流数据上使用 DataFrames 和 SQL。你必须使用 SparkContext 来创建 StreamingContext 要用的 SQLContext。 此外,这一过程可以在驱动失效后重启。我们通过创建一个实例化的 SQLContext 单实例来实现这个工作。
Caching / Persistence
和 RDDs 类似,DStreams 同样允许开发者将流数据保存在内存中。也就是说,在 DStream 上使用 persist() 方法将会自动把 DStreams 中的每个 RDD 保存在内存中。 当 DStream 中的数据要被多次计算时,这个非常有用(如在同样数据上的多次操作)。对于像 reduceByWindow 和 reduceByKeyAndWindow 以及基于状态的 (updateStateByKey) 这种操作,保存是隐含默认的。 因此,即使开发者没有调用 persist(),由基于窗操作产生的 DStreams 会自动保存在内存中。
Structured Streaming
概述
spark2.0 新的流式计算模型
进一步降低了处理数据的延迟时间
实现了“有且仅有一次(Exectly Once)” 语义
Structured Streaming 基于 Spark SQl 引擎,具有弹性和容错的流式处理引擎
使用批处理计算静态数据(表中的数据)的方式相同,可以使用 Dataset/DataFrame API
通过 chekcpoin 和 WALs(Write-Ahead Logs), 系统保证 end-to-end exactly-once
Structured Streaming 查询使用微批处理引擎 (micro-batch processing engine) 处理
Spark2.3, 引入了一个新的低延迟处理模型 Continuous Processing, 延迟 1 毫秒
编程
wordcount
DataFrame lines 表示一个“无界表(unbounded table)”, 存储着流中所有的文本数据。这个无界表包含列名为 value 的一列数据, 数据的类型为 String, 而且在流式文本数据中的每一行(line)就变成了无界表中的的一行(row)。 注意, 这时我们仅仅设置了转换操作, 还没有启动它, 所以现在还没有收到任何数据。 紧接着我们把 DateFrame 通过 .as[String] 变成了 DataSet, 所以我们可以切割每行为多个单词,得到的 words DataSet 包含了所有的单词。 最后, 我们通过 value (每个唯一的单词)进行分组得到 wordCounts DataFrame, 并且统计每个单词的个数。注意, wordCounts 是一个流式 DataFrame, 它表示流中正在运行的单词数(the running word counts of the stream)。 我们必须在流式数据(streaming data)上启动查询,剩下的实际就是开始接收数据和计算个数。为此, 当数据更新的时候, 我们通过 outputMode("complete") 来打印完整的计数集到控制台, 然后通过.start来启动流式计算。 代码执行之后, 流式计算将会在后台启动。查询对象 (query: StreamingQuery) 可以激活流式查询 (streaming query), 然后通过 awaitTermination() 来等待查询的终止,从而阻止查询激活之后进程退出。
编程模型
Structured Streaming 的核心思想是: 把持续不断的流式数据当做一个不断追加的表。 这使得新的流式处理模型同批处理模型非常相像。我们可以表示我们的流式计算类似于作用在静态数表上的标准批处理查询, spark 在一个无界表上以增量查询的方式来运行。
基本概念
输入表:把输入数据流当做输入表(Input Table)
到达流中的每个数据项(data item)类似于被追加到输入表中的一行。
结果表:作用在输入表上的查询将会产生“结果表(Result Table)”
每个触发间隔(trigger interval, 例如 1s), 新行被追加到输入表, 最终会更新结果表。无论何时更新结果表, 我们都希望将更改的结果行写入到外部接收器(external sink)。
输出
Complete Mode:整个更新的结果表会被写入到外部存储
存储连接器负责决定如何处理整个表的写出(类似于 spark streaming 中的有转态的转换)。
Append Mode:从上次触发结束开始算起, 仅仅把那些新追加到结果表中的行写到外部存储
2.(类似于无状态的转换). 该模式仅适用于不会更改结果表中行的那些查询. (如果有聚合操作, 则必须添加 wartemark, 否则不支持此种模式)
Update Mode:从上次触发结束开始算起, 仅仅在结果表中更新的行会写入到外部存储
子主题