导图社区 MapReduce框架原理、Hadoop序列化知识点笔记
MapReduce框架原理 数据切片 CombineTextInputFormat切片机制 Shuffle机制 Partition分区 WritableComparable排序 Combiner合并 OutputFormat Reduce Join 由于Java的序列化是一个.
编辑于2022-11-07 10:18:09 广东MapReduce框架原理、Hadoop序列化知识点笔记
MapReduce概述
MapReduce定义
MapReduce是分布式运算程序的编程框架
核心功能是将用户编写的业务逻辑代码和自带默认组建整合成一个完整的分布式运算程序
MapReduce特点
优点
易于编程,利用接口
基于HDFS良好的扩展性
高容错性
适合大数据的离线处理
缺点
不擅长实时计算
不擅长流式计算
Spark是MapReduce的改进版,它可以进行流计算
不擅长DAG(有向图)计算
MapReduce核心编程思想
总思维导图
整个过程切分为Map(切片,(K,V)对应工作),和Reduce(计算统计阶段)
MapReduce进程
MRAppMaster
负责程序过程调度和状态协调
MapTask
负责Map阶段的整个数据处理流程
ReduceTask
负责Reduce阶段的整个数据处理流程
WordCount基本三阶段
Map阶段
继承父类Mapper规定泛型<K,V>
重写map方法,规定入参格式
利用write方法写出排序归类后的<K,V>
Reducer阶段
继承父类Reduer规定泛型<K,V>
重写reduce方法,规定入参格式(其中有迭代器用作计数)
利用write方法写出统计后的<K,V>
Driver阶段
统合架构,启动MapReduce的工具
获取实例
设置驱动路径
设置Mapper和Reducer的路径
设置输出类型
MapOutput的类型和最终输出的类型
设置输入输出路径
提交任务
Hadoop序列化
为何不用Java的序列化
Java序列化附带其他信息,内容较多,不便于传输
自定义对象实现序列化接口
写好自定义对象的Bean文件
Fields、Getter&Setter、toString,
序列化方法(write)
入参DataOutput out
out.writexxx
反序列化方法(readFields)
入参 DataInput in
in.readxxx
Mapper类
set k值,set v值,v值不止一个,则多设置
Reducer类
若v值不只1个,则迭代器中累加多个值
封装时将两个v值都封装入Bean中
Driver类
MapReduce框架原理
总思维导图
MapReduce总流程概述
MapReduce在整个Hadoop过程中负责计算部分,它分为两个部分,MapTask和ReduceTask,其中Shuffle介于其间,它的整体运作过程是,HDFS中的数据通过RecordReader,从输入的InputSplit中产生Key/Value,写入环形缓冲区,这里依据keyhash算法算出了各个Split的Partition,与此同时依照k进行排序,当环形缓冲区达到80%后,开始Spill(溢写)(此处可进行combiner),当整个文件Spill结束后,进行归并(Merge)并对同样的分区(Partition)依照K进行再排序(此处可进行combiner),最终将数据写到磁盘,Reduce方法开始,以K分组,对同组K中V进行计算,利用Reduce方法写出。
Map阶段
InputFormat数据输入(MapTask)
数据切片
切片的实质
HDFS的BLK是物理数据切块
MapTask的数据切片是逻辑分片,不涉及物理
如何控制切片大小
默认等于blocksize
如何判断是否切片
针对每一个文件单独切片,若剩下的部分是否大于Blocksize的1.1倍(设定这个数字防止切片过小)
汇总切片信息到切片规划文件
CombineTextInputFormat切片机制
目的:防止大量小切片产生大量MapTask降低效率
应用:Driver中进行设定
CombineTextInputFormat.setMaxInputSplitSize(job, 2000000);
数据读取
FileInputFormat实现类
类型
TextInputFormat(逐行读取的String类型读取格式)
KeyValueTextInputFormat(键值对读取)
NLineInputFormat(多行读取,指定行数为N)
自定义InputFormat(根据需求,应用举例时我们用小文件举例)
应用:在Driver中进行设定
通过Configuration属性的set方法设定切割符
通过job实例的方法setInputFormatClass设置输入格式
自定义InputFormat的应用
需求:将多个小文件合并成一个SequenceFile
步骤
自定义类继承FileInputFormat
重写isSplitable方法
重写createRecordReader方法
返回一个new RecordReader对象
自定义RecordReader类,继承RecordReader
作用:将记录文档读出作为<k,v>值
initialize
作用:开流
FileSplit fs
FSDataInputStream fis
nextKeyValue
getCurrentKey
getCurrentValue
getProgress
MapTask工作机制
总思维导图
Read阶段
MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value
Map阶段
该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value
Collect收集阶段
在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中
Spill阶段
即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
步骤
利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
Combine
当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件
Shuffle机制
总思维导图
Partition分区
默认Partitioner分区
默认Partitioner分区为key的hashCode和Integer.max_value进行与运算,并以numReduceTasks取余
自定义Partitioner
自定义类继承Patitioner
重写getPartition方法
Driver
setPartitionerClass()-设置自定义的Partitioner类
setNumReduceTasks()-设定相应数量的ReduceTasks
总结
分区号从0开始
如果分区数大于ReduceTasks数,会产生空文件
如果ReduceTasks数小于分区数,则溢出
WritableComparable排序
默认依靠字典顺序快排
排序类型
部分排序
全排序
只设置一个ReduceTask,处理大型文件效率很低
输出结果只有一个文件,文件内部有序
辅助排序(GroupingComparator)
在Reduce端对key分组
二次排序
自定义排序中compareTo判断条件为两个时为二次排序
自定义WritableComparable
全排序
如果是bean对象,则在该类中实现(implements)WritableComparable接口,并在此接口中重写compareTo方法
区内排序(Partition内)
实操需求:不同省份输出不同的手机号,在不同的省份内又用流量内部排序
实操步骤
自定义Partition,重写getPartition方法
Driver类中
setPartitionerClass
setNumReduceTasks
GroupingComparator
需求:求每个订单中最贵的商品
实现步骤
Bean类实现WritableComparable接口
Fields,Method,Getter & Setter
compareTo
Comparator类
super(Bean.class,true)
Driver类,设置reduce的分组
setGroupingComparatorClass
Combiner
相当于Map阶段的Reduce
自定义Combiner实现的步骤
继承Reducer,重写reduce方法
使用一个count计数,遍历相同key的值进行累加
int count = 0;
for(IntWritable v :values){
count += v.get();
}
写出
context.write(key, new IntWritable(count));
Driver类中setCombinerClass
Reduce阶段
ReduceTask工作机制
总思维导图
Copy阶段
ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
Merge阶段
在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
Sort阶段
各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
Reduce阶段
reduce()函数将计算结果写到HDFS上。
OutputFormat
目的:控制最终文件的输出路径和输出格式
自定义OutputFormat
详细参考MapReduce代码文件
MapTask与ReduceTask小结
MapTask与ReduceTask的数量之谜
MapTask究竟有多少个
默认MapTask如何决定
如果不进行任何设置,默认的map个数是和blcok_size相关的。
default_num = total_size / block_size;
如何设置MapTask个数
1.如果想增加map个数,则设置mapred.map.tasks 为一个较大的值。
2.如果想减小map个数,则设置mapred.min.split.size 为一个较大的值。
3.如果输入中有很多小文件,依然想减少map个数,则需要将小文件merge为大文件,然后使用准则2。
ReduceTask究竟设置多少个
默认ReduceTask如何决定
直接默认了1
如何设置ReduceTask值
job.setNumReduceTasks()
如何设置ReduceTask才合适
Hadoop Documents
The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>).
通过可用节点数,和每个节点的最大的Containers的数量乘积得出一个确定值,在此定值的0.95~1.75倍游动
Join机制
Reduce Join
实现思路(主要依靠reduce方法)
Map端负责以连接字段做key,将需要的内容作为value,进行输出
Reduce端负责计算合并
缺点
Map资源利用率不高,只进行分类工作
Reduce阶段压力过大,且容易产生数据倾斜
解决方式
Map Join
Map Join
使用场景
Map Join适用于一张表十分小、一张表很大的场景。
思路
采用DistributedCache
在Mapper的setup阶段,将文件读取到缓存集合中
在驱动函数中加载缓存
计数器应用与数据清洗(ETL)
数据清洗过程中,利用计数器记录各种数据