导图社区 Hadoop基础
小白必看《Hadoop基础》,包括:HDFS分布式文件系统、MapReduce并行计算模型、yarn资源调度平台 、hadoop概述。注释内含大量解释,无他唯手熟尔!
编辑于2021-11-10 15:49:58Hadoop基础
HDFS 分布式文件系统
HDFS 概述
HDFS 是 Hadoop Distribute File System 的简称,意为:Hadoop 分布式文 件系统。是 Hadoop 核心组件之一,作为最底层的分布式存储服务而存在
HDFS 三大组件
NameNode
NameNode: 1) NameNode 是 HDFS 的核心。 2) NameNode 也称为 Master。 c、 NameNode 仅存储 HDFS 的元数据:文件系统中所有文件的目录树,并跟踪整 个集群中的文件。 3) NameNode 不存储实际数据或数据集。数据本身实际存储在 DataNodes 中。 4) NameNode 知道 HDFS 中任何给定文件的块列表及其位置。使用此信息 NameNode 知道如何从块中构建文件。 5) NameNode 并不持久化存储每个文件中各个块所在的 DataNode 的位置信息, 这些信息会在系统启动时从数据节点重建。 6) NameNode 对于 HDFS 至关重要,当 NameNode 关闭时,HDFS / Hadoop 集群无 法访问。 7) NameNode 是 Hadoop 集群中的单点故障。 8) NameNode 所在机器通常会配置有大量内存(RAM)。
DataNode
DataNode: 1)DataNode 负责将实际数据存储在 HDFS 中。 2) DataNode 也称为 Slave。 c、 NameNode 和 DataNode 会保持不断通信。 3) DataNode 启动时,它将自己发布到 NameNode 并汇报自己负责持有的块列表。 4) 当某个 DataNode 关闭时,它不会影响数据或群集的可用性。NameNode 将安 排由其他 DataNode 管理的块进行副本复制。 f、 DataNode 所在机器通常配置有大量的硬盘空间。因为实际数据存储在 DataNode 中。 5) DataNode 会定期(dfs.heartbeat.interval 配置项配置,默认是 3 秒)向 NameNode 发送心跳,如果 NameNode 长时间没有接受到 DataNode 发送的心 跳, NameNode 就会认为该 DataNode 失效。 6) block 汇报时间间隔取参数 dfs.blockreport.intervalMsec,参数未配置的 话默认为 6 小时
SecondaryNamenode
Checkpoint 详细步骤: 1)NameNode 管理着元数据信息,其中有两类持久化元数据文件:edits 操作日志文件和fsimage 元数据镜像文件。新的操作日志不会立即与 fsimage 进行合并,也不会刷到NameNode 的内存中,而是会先写到 edits 中(因为合并需要消耗大量的资源),操作成功之后更新至内存。 2)有 dfs.namenode.checkpoint.period 和 dfs.namenode.checkpoint.txns 两个配置,只要达到这两个条件任何一个,secondarynamenode 就会执行 checkpoint 的操作。 3)当触发 checkpoint 操作时,NameNode 会生成一个新的 edits 即上图中的 edits.new 文件,同时 SecondaryNameNode 会将 edits 文件和 fsimage 复制到本地(HTTP GET 方式)。 4)secondarynamenode 将下载下来的 fsimage 载入到内存,然后一条一条地执行 edits 文件中的各项更新操作,使得内存中的 fsimage 保存最新,这个过程就是 edits 和 fsimage文件合并,生成一个新的 fsimage 文件即上图中的 Fsimage.ckpt 文件。 5)secondarynamenode 将新生成的 Fsimage.ckpt 文件复制到 NameNode 节点。 6)在 NameNode 节点的 edits.new 文件和 Fsimage.ckpt 文件会替换掉原来的 edits 文件和 fsimage 文件,至此刚好是一个轮回,即在 NameNode 中又是 edits 和 fsimage 文件。 7)等待下一次 checkpoint 触发 SecondaryNameNode 进行工作,一直这样循环操作。
HDFS 写数据流程
详细步骤解析: 1、client发起文件上传请求,通过RPC与NameNode建立通讯,NameNode检查目标文件是否已存在,父目录是否存在,返回是否可以上传; 2、client请求第一个 block该传输到哪些DataNode服务器上; 3、NameNode根据配置文件中指定的备份数量及副本放置策略进行文件分配,返回可用的DataNode的地址,如:A,B,C; 4、client请求3台DataNode中的一台A上传数据(本质上是一个RPC调用,建立pipeline),A收到请求会继续调用B,然后B调用C,将整个pipeline建立完成,后逐级返回client; 5、client开始往A上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位(默认64K),A收到一个packet就会传给B,B传给C;A每传一个packet会放入一个应答队列等待应答。 6、数据被分割成一个个packet数据包在pipeline上依次传输,在pipeline反方向上,逐个发送ack(命令正确应答),最终由pipeline中第一个DataNode节点A将pipeline ack发送给client; 7、当一个block传输完成之后,client再次请求NameNode上传第二个block到服务器。
HDFS 读数据流程
详细步骤解析: 1、Client向NameNode发起RPC请求,来确定请求文件block所在的位置; 2、NameNode会视情况返回文件的部分或者全部block列表,对于每个block,NameNode都会返回含有该block副本的DataNode地址; 3、这些返回的DN地址,会按照集群拓扑结构得出DataNode与客户端的距离,然后进行排序,排序两个规则:网络拓扑结构中距离Client近的排靠前;心跳机制中超时汇报的DN状态为STALE,这样的排靠后; 4、Client选取排序靠前的DataNode来读取block,如果客户端本身就是DataNode,那么将从本地直接获取数据;底层上本质是建立Socket Stream(FSDataInputStream),重复的调用父类DataInputStream的read方法,直到这个块上的数据读取完毕; 5、当读完列表的block后,若文件读取还没有结束,客户端会继续向NameNode获取下一批的block列表; 6、读取完一个block都会进行checksum验证,如果读取DataNode时出现错误,客户端会通知NameNode,然后再从下一个拥有该block副本的DataNode继续读。 7、read方法是并行的读取block信息,不是一块一块的读取;NameNode只是返回Client请求包含块的DataNode地址,并不是返回请求块的数据; 最终读取来所有的block会合并成一个完整的最终文件。
HDFS 其它功能
不同集群之间的数据复制
集群内部文件拷贝 scp
cd /export/softwares/ scp -r jdk-8u141-linux-x64.tar.gz root@node2:/export/
跨集群之间的数据拷贝 distcp
bin/hadoop distcp hdfs://node1:8020/jdk-8u141-linux-x64.tar.gz hdfs://cluster2:9000/
Archive 档案的使用
HDFS 安全模式
NameNode 主节点启动时,HDFS 首先进入安全模式,在这种状态下,文件系统只接受读数据请求,而不接受删除、修改等变更请求
手动进入安全模式: hdfs dfsadmin -safemode enter
手动退出安全模式: hdfs dfsadmin -safemode leave
MapReduce 并行计算模型
MapReduce 概述
MapReduce 是一个分布式运算程序的编程框架,核心功能是将用户编写的业 务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在 Hadoop 集群上。
MapReduce 所包含的思想分为两步: Map 负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。 可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。 Reduce 负责“合”,即对 map 阶段的结果进行全局汇总。
MapReduce 基本原理
Map task 工作机制
简单概述: inputFile通过split被逻辑切分为多个split文件,通过 record 按行读取内容给map(用户自己实现)进行处理, 数据被map处理结束之后交给 outputCollctor收集器, 对齐结果key进行分区(默认使用hash分区), 然后写入buffer, 每个map task 都有一个内存缓冲区, 存储着map的输出结果, 当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存到磁盘, 当整个maptask结束后再磁盘中这个maptask产生的所有临时文件合并, 生成最后的正式输出文件, 然后等待 reduce task来拉取数据.
详细步骤: 1、读取数据组件 InputFormat(默认 TextInputformat) 会通过getSplit方法对输入目录中文件进行逻辑切片规划得到 block, 有多少block 就对应启动多少个 MapTask. 2、将输入文件且分为 block 之后, 有 RecordReader 对象(默认为 LineRecordReader) 进行读取, 以 \n 作为分隔符, 读取一行数据, 返回 <key, value>, key 表示们航首字符偏移量, value 表示这一行文本内容. 3、读取block 返回 <key, value>, 进入用户自己继承Mapper类中, 执行用户重写的map函数, RecordReader 读取一行这里调用一次 4、Mapper逻辑结束之后, 将Mapper的每条结果通过 context.write 进行collection数据收集. 在 collect中, 会先对其进行分区处理, 默认使用 HashPartitioner MapReduct 提供 Partitioner 接口, 它的作用就是根据 key 或 value 及 Reducer 的数量来决定当前的这对输出数据最终应该交给那个reduce task 处理, 默认对 key hash后再以 reducer 的数量取模. 默认的取模方式只是为了平均 Reducer的处理能力. 如果用户自己对 Partitioner 有需求, 可以定制并设置到job上. 5、接下来, 会将数据写入内存, 内存中这片区域叫做环形缓冲区, 缓冲区的作用是批量手机Mapper结果, 减少磁盘IO的影响. 我们的key/value 对以及 Partition 的结果都会被写入缓冲区. 当然, key 与 value 值都会被序列化成字节数组. 环形缓冲区其实是一个数组, 数组中存放着key和value的序列化数据 与 key,value 的元数据信息, 包括 Partition、key的起始位置、value的起始位置、以及value的长度. 环形结构是一个抽象概念. 缓冲区是有大小限制, 默认为100MB. 当 Mapper 的输出结果很多时, 就可能会撑爆内存, 所以需要在一定条件下将缓冲区的数据临时写入磁盘, 然后重新利用这块缓冲区.这个从内存往磁盘写数据的过程称为 spill, 中文可翻译为溢写. 这个溢写是有单独线程完成.不影响往缓冲区写Mapper结果的线程. 溢写线程启动时不应该阻止Mapper结果输出, 所以整个缓冲区有个溢写的比例 spill.percent. 这个比例默认为0.8, 也就是当缓冲区的数据已经达到阈值 buffer size * spill percent = 100M * 0.8 = 80M, 溢写线程启动, 锁定80M的内存, 执行溢写过程. Mapper的数据结果还可以往剩下的20MB内存中写,互不影响. 6、当溢写线程启动后, 需要对这 80MB 空间内的 Key 做排序 (Sort). 排序是 MapReduce 模型默认的行为, 这里的排序也是对序列化的字节做的排序 如果 Job 设置过 Combiner, 那么现在就是使用 Combiner 的时候了. 将有相同 Key 的 Key/Value 对的 Value 加起来, 减少溢写到磁盘的数据量. Combiner 会优化 MapReduce 的中间结果, 所以它在整个模型中会多次使用 那哪些场景才能使用 Combiner 呢? 从这里分析, Combiner 的输出是 Reducer 的输入, Combiner 绝不能改变最终的计算结果. Combiner 只应该用于那种 Reduce 的输入 Key/Value 与输出 Key/Value 类型完全一致, 且不影响最终结果的场景. 比如累加, 最大值等. Combiner 的使用一定得慎重, 如果用好, 它对 Job 执行效率有帮助, 反之会影响 Reducer 的最终结果 7、合并溢写文件, 每次溢写会在磁盘上生成一个临时文件 (写之前判断是否有 Combiner), 如果 Mapper 的输出结果真的很大, 有多次这样的溢写发生, 磁盘上相应的就会有多个临时文件存在. 当整个数据处理结束之后开始对磁盘中的临时文件进行 Merge 合并, 因为最终的文件只有一个, 写入磁盘, 并且为这个文件提供了一个索引文件, 以记录每个reduce对应数据的偏移量
Reduce task 工作机制
Reduce工作机制: Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。 详细步骤: 1、Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。 2、Merge阶段,这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。 3、合并排序,把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。 4、对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。
MapReduce 的 Shuffle 过程
map阶段处理的数据如何传递给reduce阶段,是MapReduce框架中最关键的一个流程,这个流程就叫shuffle。 shuffle: 洗牌、发牌 ——(核心机制:数据分区,排序,分组,规约,合并等过程)。 shuffle是Mapreduce的核心,它分布在Mapreduce的map阶段和reduce阶段。一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle。 1)Collect阶段:将MapTask的结果输出到默认大小为100M的环形缓冲区,保存的是key/value,Partition分区信息等。 2)Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了combiner,还会将有相同分区号和key的数据进行排序。 3)Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件。 4)Copy阶段:ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。 5)Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。 6)Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据的最终整体有效性即可。 Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快 缓冲区的大小可以通过参数调整, 参数:mapreduce.task.io.sort.mb 默认100M
yarn 资源调度平台
Yarn 概述
YARN 是一个资源管理、任务调度的框架,主要包含三大模块:ResourceManager(RM)、NodeManager(NM)、ApplicationMaster(AM)。 ResourceManager 负责所有资源的监控、分配和管理; ApplicationMaster 负责每一个具体应用程序的调度和协调; NodeManager 负责每一个节点的维护。 对于所有的 applications,RM 拥有绝对的控制权和对资源的分配权。而每个 AM 则会和 RM 协商资源,同时和 NodeManager 通信来执行和监控 task。 可以把yarn理解为相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运行于操作系统之上的应用程序,Yarn为这些程序提供运算所需的资源(内存、cpu)。
1)yarn 并不清楚用户提交的程序的运行机制 2)yarn 只提供运算资源的调度(用户程序向 yarn 申请资源,yarn 就负责分配资源) 3)yarn 中的主管角色叫 ResourceManager 4)yarn 中具体提供运算资源的角色叫 NodeManager 5)yarn 与运行的用户程序完全解耦,意味着 yarn 上可以运行各种类型的分布式运算程序, 比如 mapreduce、storm,spark,tez …… 6) spark、storm 等运算框架都可以整合在 yarn 上运行,只要他们各自的框架中有符合 yarn 规范的资源请求机制即可 7)yarn 成为一个通用的资源调度平台.企业中以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享
Yarn 三大组件
ResourceManager
1)ResourceManager 负责整个集群的资源管理和分配,是一个全局的资源管理系统。 2)NodeManager 以心跳的方式向 ResourceManager 汇报资源使用情况(目前主要是 CPU 和内存的使用情况)。RM 只接受 NM 的资源回报信息,对于具体的资源处理则交给 NM 自己处理。 3)YARN Scheduler 根据 application 的请求为其分配资源,不负责 application job 的监控、追踪、运行状态反馈、启动等工作。
NodeManager
1)NodeManager 是每个节点上的资源和任务管理器,它是管理这台机器的代理,负责该节点程序的运行,以及该节点资源的管理和监控。YARN 集群每个节点都运行一个 2)NodeManager。 NodeManager 定时向 ResourceManager 汇报本节点资源(CPU、内存)的使用情况和Container 的运行状态。当 ResourceManager 宕机时 NodeManager 自动连接 RM 备用节点。 3)NodeManager 接收并处理来自 ApplicationMaster 的 Container 启动、停止等各种请求。
ApplicationMaster
1)用 户 提 交 的 每 个 应 用 程 序 均 包 含 一 个 ApplicationMaster , 它 可 以 运 行 在ResourceManager 以外的机器上。 2)负责与 RM 调度器协商以获取资源(用 Container 表示)。 3)将得到的任务进一步分配给内部的任务(资源的二次分配)。 与 NM 通信以启动/停止任务。 4)监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。 5)当前 YARN 自带了两个 ApplicationMaster 实现,一个是用于演示 AM 编写方法的实例程序 DistributedShell,它可以申请一定数目的 Container 以并行运行一个 Shell 命令或者 Shell 脚本;另一个是运行 MapReduce 应用程序的 AM—MRAppMaster。 注:RM 只负责监控 AM,并在 AM 运行失败时候启动它。RM 不负责 AM 内部任务的容错,任务的容错由 AM 完成。
Yarn 运行机制
详细原理: 1)client向RM提交应用程序,其中包括启动该应用的ApplicationMaster的必须信息,例如ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。 2)ResourceManager启动一个container用于运行ApplicationMaster。 3)启动中的ApplicationMaster向ResourceManager注册自己,启动成功后与RM保持心跳。 4)ApplicationMaster向ResourceManager发送请求,申请相应数目的container。 5)ResourceManager返回ApplicationMaster的申请的containers信息。申请成功的container,由ApplicationMaster进行初始化。container的启动信息初始化后,AM与对应的NodeManager通信,要求NM启动container。AM与NM保持心跳,从而对NM上运行的任务进行监控和管理。 6)container运行期间,ApplicationMaster对container进行监控。container通过RPC协议向对应的AM汇报自己的进度和状态等信息。 7)应用运行期间,client直接与AM通信获取应用的状态、进度更新等信息。 8)应用运行结束后,ApplicationMaster向ResourceManager注销自己,并允许属于它的container被收回。
hadoop概述
hadoop概述
分布式集群
hadoop生态
hadoop版本
hadoop安装目录结构
解压 hadoop-3.3.0-Centos7-64-with-snappy.tar.gz,目录结构如下: bin:Hadoop 最基本的管理脚本和使用脚本的目录,这些脚本是 sbin 目录 下管理脚本的基础实现,用户可以直接使用这些脚本管理和使用 Hadoop。 etc:Hadoop 配置文件所在的目录,包括 core-site,xml、hdfs-site.xml、 mapred-site.xml 等从 Hadoop1.0 继承而来的配置文件和 yarn-site.xml 等 Hadoop2.0 新增的配置文件。 include:对外提供的编程库头文件(具体动态库和静态库在 lib 目录中), 这些头文件均是用 C++定义的,通常用于 C++程序访问 HDFS 或者编写 MapReduce 程序。 lib:该目录包含了 Hadoop 对外提供的编程动态库和静态库,与 include 目 录中的头文件结合使用。 libexec:各个服务对用的 shell 配置文件所在的目录,可用于配置日志输 出、启动参数(比如 JVM 参数)等基本信息。 sbin:Hadoop 管理脚本所在的目录,主要包含 HDFS 和 YARN 中各类服务的 启动/关闭脚本。 share:Hadoop 各个模块编译后的 jar 包所在的目录,官方自带示例。
hadoop核心配置文件
hadoop-env.sh
core-site.xml
hdfs-site.xml
mapred-site.xml
yarn-site.xml
workers
文件里面记录的是集群主机名
hadoop基本组成
HDFS 分布式文 件系统
master/slave 架构
分块存储 block
名字空间(NameSpace)
元数据管理 Namenode
数据存储 Datanode
副本机制
一次写入,多次读出
HDFS基本操作
Shell命令选项