导图社区 分布式协调组件:ZooKeeper
从Zookeeper的介绍、使用、原理到应用,让你只要通过一个思维导图掌握Zookeeper。
编辑于2020-12-19 14:43:05分布式协调组件:ZooKeeper
Zookeeper简介
Zookeeper:☑️ 分布式的小文件存储系统☑️ 监控
⭐️ Zookeeper 是一个分布式【协调服务】的开源框架,主要用来解决分布式集群中应用系统的【⚠️ 一致性】问题 ▷ 例如:怎样避免同时操作同一数据造成脏读的问题 ( ⚠️ 分布式系统数据存在一致性的问题 )⭐️ ZooKeeper 本质上是一个【☑️ 分布式的小文件存储系统】 ▷ 提供基于类似于文件系统的【⭐️ 目录树】方式的数据存储,并且可以对树中的节点进行有效管理⭐️ ZooKeeper:提供给客户端【☑️ 监控】存储在zk内部数据的功能,从而可以达到基于数据的集群管理 即,提供典型应用策略,诸如: ▷ 统一命名服务,e.g. dubbo:JavaWeb分布式协调服务框架:注册中心:提供各服务器节点信息、供客户端访问 ▷ 分布式应用配置管理,e.g. solr的配置集中管理:各节点通过zk实现配置文件拉取、同步更新 ▷ 负载均衡、状态同步服务、集群管理、分布式消息队列(sub/pub)、分布式锁、分布式协调等功能
一致性问题:▷ 单机程序,多线程操作同一个共享变量,有可能出现线程安全的问题,引入同步、加锁的机制来解决▷ 分布式系统:▷ 分布式系统中如果多个进程同时操作某个数据,有可能导致该数据出现安全问题 ▷ 多个【进程】操作同一个数据,再无法采用同步、加锁的机制来解决
Zookeeper架构组成
Master/slave架构zk集群中的Leader不是指定而来,而是通过选举产生
Leader(一个)
▷ Zookeeper 集群工作的核心角色,集群内部各个服务器的调度者▷ 事务请求(写操作) 的唯一调度和处理者,保证集群事务处理的顺序性▷ 负责进行投票的发起和决议,更新系统状态
Follower(多个)
▷ 处理客户端非事务(读操作) 请求,并向客户端返回结果▷ 转发事务请求给 Leader▷ 参与集群 Leader 选举投票,2n-1台可以做集群投票(参与选举投票耗费资源)
⭐️ Leader进程和所有的Follower进程之间都通过心跳检测机制来感知彼此的情况对于 create,setData, delete 等有写操作的请求,需要统一转发给leader 处理:▷ leader 需要决定编号、执行操作,这个过程称为一个事务
*** Observer(多个,非必要)
** 针对访问量比较大的 zookeeper 集群, 还可新增观察者角色 Observer▷ 观察 Zookeeper 集群的最新状态变化并将这些状态同步过来▷ 对于非事务请求(读操作)进行独立处理▷ 对于事务请求,转发给 Leader服务器进行处理❌ 不会参与任何形式的投票,只提供非事务服务⭐️ 通常用于在不影响集群事务处理能力的前提下提升集群的【非事务】处理能力,增加了集群增加并发的【读请求】
Zookeeper 特点
▷ 集群中只要有【半数以上】 节点存活,Zookeeper集群就能正常服务▷ 全局数据一致:每个server(Follower)保存一份Leader数据的副本,Client无论连接到哪个server,数据都是一致的▷ 更新请求顺序进行(内部原理) ▷ 数据更新原子性,一次数据更新要么成功,要么失败
Zookeeper环境搭建
Zookeeper的搭建方式(共三种)
单机模式
Zookeeper只运行在一台服务器上,适合测试环境
伪集群模式
就是在一台服务器上运行多个Zookeeper 实例
集群模式
Zookeeper运行于一个集群上,适合生产环境,这个计算机集群被称为一个“集合体” ⭐️ 生产使用
Zookeeper集群搭建
▷ 下载稳定版本的zookeeper
zookeeper-3.4.14.tar.gz
▷ 上传、解压
scp /Users/kathryn/LG_DataDev/software/zookeeper-3.4.14.tar.gz root@linux121:/opt/lagou/software[root@linux121 ~]# tar -zxvf /opt/lagou/software/zookeeper-3.4.14.tar.gz -C /opt/lagou/servers
▷ 修改配置文件▷ 创建data与log目录
#创建zk存储数据目录 datamkdir -p /opt/lagou/servers/zookeeper-3.4.14/data#创建zk日志文件目录mkdir -p /opt/lagou/servers/zookeeper-3.4.14/data/logs#修改zk配置文件cd /opt/lagou/servers/zookeeper-3.4.14/conf#文件改名mv zoo_sample.cfg zoo.cfg▷ vi /opt/lagou/servers/zookeeper-3.4.14/conf/zoo.cfg#更新datadirdataDir=/opt/lagou/servers/zookeeper-3.4.14/data#增加logdirdataLogDir=/opt/lagou/servers/zookeeper-3.4.14/data/logs#打开注释#ZK提供了自动清理事务日志和快照文件的功能,这个参数指定了清理频率,单位是小时autopurge.purgeInterval=1#增加集群配置(放在配置文件最后)##server.服务器ID=服务器IP地址:服务器之间通信端口:服务器之间投票选举端口server.1=linux121:2888:3888server.2=linux122:2888:3888server.3=linux123:2888:3888
⭐️ 服务器之间通信端口 2888⭐️ 服务器之间投票选举端口 3888
▷ 添加myid配置
▷ 在zookeeper的 data 目录下创建一个 myid 文件,内容为1,这个文件就是记录每个服务器的IDecho -n 1 > /opt/lagou/servers/zookeeper-3.4.14/data/myid▷ 安装包分发rsync-script /opt/lagou/servers/zookeeper-3.4.14▷ 修改myid值 linux122echo -n 2 >/opt/lagou/servers/zookeeper-3.4.14/data/myid▷ 修改myid值 linux123echo -n 3 >/opt/lagou/servers/zookeeper-3.4.14/data/myid
集群内每个服务器节点配置本机ID(集群内唯一),提供选举流程使用
▷ 依次启动三个zk实例▷ 查看zk启动情况
/opt/lagou/servers/zookeeper-3.4.14/bin/zkServer.sh start/opt/lagou/servers/zookeeper-3.4.14/bin/zkServer.sh status
单节点启动命令,三个节点都要执行
▷ 集群启动停止脚本
[root@linux121 ~]# vi /zk.sh#!/bin/sh echo "start zookeeper server..." if(($#==0));thenecho "no params";exit;fihosts="linux121 linux122 linux123"for host in $hostsdossh $host "source /etc/profile; /opt/lagou/servers/zookeeper-3.4.14/bin/zkServer.sh $1"done
▷ (linux121)执行 zk.sh 启停集群
▷ 启动 sh /zk.sh start▷ 停止 sh /zk.sh stop▷ 状态 sh /zk.sh status
▷ zk.sh 脚本没有x权限,可以通过指定其shell脚本的解释器,来实现脚本执行▷ jps ⇨ QuorumPeerMain = zk进程
集群支持动态添加机器(水平扩容)
方式一:全部重启:关闭所有 Zookeeper 服务,修改配置之后启动。不影响之前客户端的会话
方式二:逐个重启:在过半存活即可用的原则下,一台机器重启不影响整个集群对外提供服务。这是比较常用的方式
3.5 版本开始支持动态扩容
Zookeeper数据结构与监听机制
ZooKeeper数据模型Znode
▷ 在ZooKeeper中,数据信息被保存在一个个数据节点上,这些节点被称为znode ▷ ZNode 是Zookeeper 中最小数据单位 ▷ 在 ZNode 下面又可以再挂 ZNode▷ 形成了一个层次化命名空间 ZNode 树,我们称为 ZNode Tree,它采用了类似文件系统的层级树状结构进行管理▷ 左图根目录 / 下有两个节点,分别是:app1 和app2,其中 app1 下面又有三个子节点▷ 所有ZNode按层次化进行组织,形成这么一颗树▷ ZNode的节点路径标识方式和Unix文件系统路径非常相似,都是由一系列使用斜杠(/)进行分割的路径表示▷ 开发人员可以向这个节点写入数据,也可以在这个节点下面创建子节点
ZNode 的类型
持久性节点(Persistent)
临时性节点(Ephemeral)
顺序性节点(Sequential)
节点组合生成四种节点类型
持久节点
是Zookeeper中最常见的一种节点类型,所谓持久节点,就是指节点被创建后会一直存在服务器,直到删除操作主动清除
持久顺序节点
▷ 有顺序的持久节点,节点特性和持久节点是一样的,只是额外特性表现在顺序上▷ 顺序特性实质是在创建节点的时候,节点名后边会追加一个由父节点维护的自增整形数字,来表示其顺序
临时节点
▷ 是会被自动清理掉的节点,它的生命周期和客户端会话绑在一起,客户端会话结束(客户端与zookeeper连接断开不一定会话失效),这个客户端创建的所有临时节点都会被移除▷ 与持久性节点不同的是,临时节点不能创建子节点
临时顺序节点
是有顺序的临时节点,和持久顺序节点相同,在其创建的时候,节点名后边会追加一个由父节点维护的自增整形数字
事务ID
▷ 事务是对物理和抽象的应用状态上的操作集合▷ 狭义上的事务指的是数据库事务,一般包含了一系列对数据库有序的读写操作,这些数据库事务具有所谓的ACID特性,即原子性(Atomic)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)
▷ 在ZooKeeper中,事务是指能够改变ZooKeeper服务器状态的操作▷ 也称之为事务操作或更新操作,一般包括数据节点创建create、删除数据节点,节点数据内容更新等操作▷ 对于每一个事务请求,ZooKeeper都会为其分配一个全局唯一的事务ID,用 ZXID 来表示▷ 每一个 ZXID 对应一次更新操作,通常是一个 64 位的数字▷ 从这些ZXID中可以间接地识别出ZooKeeper处理这些更新操作请求的全局顺序
ZNode 的状态信息
▷ 打开zk的shell客户端,连接到zk集群
[root@linux121 ~]# /opt/lagou/servers/zookeeper-3.4.14/bin/zkCli.sh
▷ 连接上当前节点(linux121:follower)
▷ 查看根目录下的 zookeeper 节点内容
[zk: localhost:2181(CONNECTED) 0] get /zookeepercZxid = 0x0ctime = Thu Jan 01 08:00:00 CST 1970mZxid = 0x0mtime = Thu Jan 01 08:00:00 CST 1970pZxid = 0x0cversion = -1dataVersion = 0aclVersion = 0ephemeralOwner = 0x0dataLength = 0numChildren = 1[zk: localhost:2181(CONNECTED) 1] ls /zookeeper[quota]
▷ get /zookeeper 根目录下的zookeeper ZNode 节点内容包括两部分: ① 节点数据内容(紧跟一行空行:表示数据内容是空) ② 节点状态信息(每个节点,zk都会维护一个状态信息) ▷ cZxid 就是 Create ZXID,表示节点被创建时的事务ID ▷ ctime 就是 Create Time,表示节点创建时间 ▷ mZxid 就是 Modified ZXID,表示节点最后一次被修改时的事务ID ▷ mtime 就是 Modified Time,表示节点最后一次被修改的时间 ▷ pZxid 表示该节点的子节点列表最后一次被修改时的事务 ID ⇨ 只有子节点列表变更才会更新 pZxid,子 节点内容变更不会更新 ▷ cversion 表示子节点的版本号 ▷ dataVersion 表示内容版本号 ▷ aclVersion 标识acl版本(acl:zk权限控制) ▷ ephemeralOwner 表示创建该临时节点时的会话 sessionID,如果是持久性节点那么值为 0 ▷ dataLength 表示数据长度 ▷ numChildren 表示直系子节点数▷ ls /zookeeper:查看子节点
Watcher 机制
▶︎ Zookeeper使用Watcher机制实现分布式数据的发布/订阅功能▷ 一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象▷ 当这个主题对象自身状态变化时,会通知所有订阅者,使它们能够做出相应的处理▶︎ 在 ZooKeeper 中,引入了 Watcher 机制来实现这种分布式的通知功能▷ ZooKeeper 允许客户端向服务端注册一个 Watcher 监听▷ 当服务端的一些指定事件触发了这个 Watcher,那么Zk就会向指定客户端发送一个事件通知来实现分布式的通知功能
发布/订阅系统一般有两种设计模式,分别是推(Push)模式和拉(Pull)模式▷ 推模式:服务端主动将数据更新发送给所有订阅的客户端▷ 拉模式:客户端主动请求获取最新数据,通常客户端都采用定时进行轮询拉取的方式
Zookeeper的Watcher机制主要包括:① 客户端线程② 客户端WatcherManager③ Zookeeper服务器三部分
① 客户端注册 watcher
① 客户端 ▷ 在向Zookeeper服务器注册(监听某个目录)的同时 ▷ 将Watcher对象存储在客户端WatcherManager当中
② 服务端处理 watcher
② 当Zookeeper服务器触发Watcher事件后,会向客户端发送通知
③ 客户端回调 watcher
③ 客户端线程从WatcherManager中取出对应的Watcher对象来执行回调逻辑,主动到服务端获取最新的数据
Zookeeper的基本使用
ZooKeeper命令行操作
通过zkClient进入zookeeper客户端命令行
/opt/lagou/servers/zookeeper-3.4.14/bin/zkCli.sh 连接本地的zookeeper服务器/opt/lagou/servers/zookeeper-3.4.14/bin/zkCli.sh -server ip:port(2181) 连接指定的服务器
⭐️ 客户端与zk集群通信的端口是2181
查看可用的Zookeeper命令
help
[zk: localhost:2181(CONNECTED) 0] helpZooKeeper -server host:port cmd args stat path [watch] set path data [version] ls path [watch] delquota [-n|-b] path ls2 path [watch] setAcl path acl setquota -n|-b val path history redo cmdno printwatches on|off delete path [version] sync path listquota path rmr path get path [watch] create [-s] [-e] path data acl addauth scheme auth quit getAcl path close connect host:port
常见的zk命令:▷ set path data [version] 给path路径设置新数据 [,可以指定该数据的版本信息]▷ ls path / ls2 path 展示路径信息 [,可以加上watch对象:watcher 监听器]▷ rmr path 删除▷ get path [watch] 获取节点下对应的存储数据▷ create [-s] [-e] path data acl 创建(持久化/临时)节点,指定数据,权限校验▷ close 关闭▷ connect host:port 连接到其他节点上
创建节点
create [-s][-e] path data
-s 顺序节点-e 临时节点不指定,持久节点
① 创建顺序节点
[zk: localhost:2181(CONNECTED) 1] create -s /zk-test 123Created /zk-test0000000000
▷ 查看根目录
[zk: localhost:2181(CONNECTED) 7] ls /[zk-test0000000000, zookeeper]
▷ 查看新建节点的节点内容
[zk: localhost:2181(CONNECTED) 9] get /zk-test0000000000123cZxid = 0x700000003ctime = Mon Nov 30 15:27:49 CST 2020mZxid = 0x700000003mtime = Mon Nov 30 15:27:49 CST 2020pZxid = 0x700000003cversion = 0dataVersion = 0aclVersion = 0ephemeralOwner = 0x0dataLength = 3numChildren = 0
▷ 查看子节点
[zk: localhost:2181(CONNECTED) 10] ls /zk-test0000000000[]
▷ 执行动作:在根节点下创建了一个叫做zk-test0000000000的节点,该节点内容就是123▷ 执行结果:创建的zk-test节点后面添加了一串数字(顺序节点的数字后缀)
② 创建临时节点
[zk: localhost:2181(CONNECTED) 11] create -e /zk-tmp 123WATCHER::WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/Created /zk-tmp
▷ 查看根目录
[zk: localhost:2181(CONNECTED) 12] ls /[zk-tmp, zk-test0000000000, zookeeper]
▷ 查看新建节点的节点内容
[zk: localhost:2181(CONNECTED) 13] get /zk-tmp123cZxid = 0x700000004ctime = Mon Nov 30 15:42:43 CST 2020mZxid = 0x700000004mtime = Mon Nov 30 15:42:43 CST 2020pZxid = 0x700000004cversion = 0dataVersion = 0aclVersion = 0ephemeralOwner = 0x1000007718b0000dataLength = 3numChildren = 0
▷ 测试临时节点特性
[zk: localhost:2181(CONNECTED) 10] quit[zk: localhost:2181(CONNECTED) 0] ls /[zk-test0000000000, zookeeper]
▷ 执行动作:在根节点下创建了一个叫做/zk-tmp的临时节点,该节点内容就是123⭐️ 临时节点特性:临时节点在客户端会话结束后,就会自动删除
③ 创建永久节点
[zk: localhost:2181(CONNECTED) 1] create /zk-persist 123Created /zk-persist
▷ 查看根目录
[zk: localhost:2181(CONNECTED) 2] ls /[zk-test0000000000, zookeeper, zk-persist]
▷ 查看新建节点的节点内容
[zk: localhost:2181(CONNECTED) 1] get /zk-persist123cZxid = 0x700000009ctime = Mon Nov 30 15:51:53 CST 2020mZxid = 0x700000009mtime = Mon Nov 30 15:51:53 CST 2020pZxid = 0x700000009cversion = 0dataVersion = 0aclVersion = 0ephemeralOwner = 0x0dataLength = 3numChildren = 0
▷ 测试永久节点特性
[zk: localhost:2181(CONNECTED) 10] quit[zk: localhost:2181(CONNECTED) 0] ls /[zk-test0000000000, zookeeper, zk-persist]
▷ 执行动作:在根节点下创建了一个叫做/zk-persist的永久节点,该节点内容就是123▷ 执行结果:创建的zk-persist节点后面不会增加数字后缀,永久节点在客户端会话结束后不会自动删除
读取节点
ls path
查看指定节点下的第一级的所有子节点
get path
获取指定节点的数据内容和属性信息
效果举例
更新节点
set path data [version]
更新指定节点的数据内容▷ data就是要更新的新内容,version表示数据版本
[zk: localhost:2181(CONNECTED) 3] set /zk-persist 111cZxid = 0x700000009ctime = Mon Nov 30 15:51:53 CST 2020mZxid = 0x70000000cmtime = Mon Nov 30 16:13:37 CST 2020pZxid = 0x700000009cversion = 0dataVersion = 1aclVersion = 0ephemeralOwner = 0x0dataLength = 3numChildren = 0
▷ mZxid:由原来0x700000009变为0x70000000c▷ dataVersion:由原来的0变为1,表示进行了更新
删除节点
delete path [version]
删除Zookeeper上的指定节点
[zk: localhost:2181(CONNECTED) 0] ls /[zk-test0000000000, zookeeper, zk-persist][zk: localhost:2181(CONNECTED) 1] delete /zk-persist[zk: localhost:2181(CONNECTED) 2] ls /[zk-test0000000000, zookeeper]
⭐️ 若删除节点存在子节点,那么无法删除该节点,必须先删除子节点,再删除父节点
Zookeeper-开源客户端
ZkClient
ZkClient是Github上一个开源的zookeeper客户端,在Zookeeper原生API接口之上进行了包装,是一个更易用的Zookeeper客户端,同时,zkClient在内部还实现了诸如Session超时重连、Watcher反复注册等功能
实现步骤
▷ 创建Maven工程(zkclient-demo),添加依赖
pom.xml
<dependencies> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.14</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.2</version> </dependency></dependencies>
▷ 创建会话
new ZkClient()
com.lagou.zk.demo.ZkDemo
import org.I0Itec.zkclient.ZkClient;public class ZkDemo { public static void main(String[] args) { /** * 获取到zkClient对象,建立到zk集群的会话 * client与zk集群通信端口是2181 */ ZkClient zkClient = new ZkClient("linux121:2181"); System.out.println("zkClient is ready"); }}
▷ 创建节点
create...(String path, boolean createParent)
com.lagou.zk.demo.ZkDemo
public class ZkDemo { public static void main(String[] args) { /** * 获取到zkClient对象,建立到zk集群的会话 * client与zk集群通信端口是2181 */ ZkClient zkClient = new ZkClient("linux121:2181"); System.out.println("zkClient is ready"); /** * 创建节点 * 如果需要级联创建,第二个参数设置为true */ zkClient.createPersistent("/la-client/lg-cl",true); System.out.println("path is created"); }}
▷ 删除节点
delete[Recursive](String path)
com.lagou.zk.demo.ZkDemo
public class ZkDemo { public static void main(String[] args) { /** * 获取到zkClient对象,建立到zk集群的会话 * client与zk集群通信端口是2181 */ ZkClient zkClient = new ZkClient("linux121:2181"); System.out.println("zkClient is ready"); /** * 删除节点 * delete(path) 删除空节点 * deleteRecursive(path) 递归删除非空节点,先删除子节点然后删除父节点 */ zkClient.deleteRecursive("/la-client"); System.out.println("delete path is successful"); }}
▷ 监听节点变化
subscribeChildChanges(String path, IZkDataListener listen)
com.lagou.zk.demo.GetChildChange
public class GetChildChange { public static void main(String[] args) throws InterruptedException { // 1. 获取zkClient ZkClient zkClient = new ZkClient("linux121:2181"); // 2. zkClient对指定目录进行监听(不存在目录:/lg-client),指定收到通知之后的逻辑 // 2.1 对lg-client注册了监听器,监听器是一直监听 zkClient.subscribeChildChanges("/lg-client", new IZkChildListener() { // 2.2 匿名内部类:接收到通知之后的执行逻辑定义 public void handleChildChange(String path, List<String> childNodes) throws Exception { System.out.println("childNodes of " + path + " have changed to " + childNodes); } }); // 3. 使用zkClient创建节点,删除节点,验证监听器是否运行 zkClient.createPersistent("/lg-client"); Thread.sleep(1000); // 为了方便观察结果数据 zkClient.createPersistent("/lg-client/c1"); Thread.sleep(1000); zkClient.delete("/lg-client/c1"); Thread.sleep(1000); zkClient.delete("/lg-client"); Thread.sleep(Integer.MAX_VALUE); }}
▷ 运行结果childNodes of /lg-client have changed to []childNodes of /lg-client have changed to [c1]childNodes of /lg-client have changed to []childNodes of /lg-client have changed to null
[zk: localhost:2181(CONNECTED) 2] create /lg-client 123Created /lg-client[zk: localhost:2181(CONNECTED) 3] create /lg-client/lg 123Created /lg-client/lg[zk: localhost:2181(CONNECTED) 4] delete /lg-client/lg[zk: localhost:2181(CONNECTED) 5] delete /lg-client
▷ 运行结果childNodes of /lg-client have changed to []childNodes of /lg-client have changed to [lg]childNodes of /lg-client have changed to []childNodes of /lg-client have changed to null
⭐️ 一旦客户端对一个节点注册了子节点列表变更监听: ▷ 会一直监听下去 ▷ 客户端可以对一个不存在的节点进行子节点变更的监听 ▷ 当该节点的子节点列表发生变更时,服务端都会通知客户端,并将最新的子节点列表发送给客户端 ▷ 该节点本身的创建或删除也会通知到客户端
▷ 获取数据
subscribeDataChanges(String path, IZkDataListener listen)
com.lagou.zk.demo.GetDataChange
/** * 使用监听器监听节点数据的变化 */public class GetDataChange { public static void main(String[] args) throws InterruptedException { // 获取zkClient对象 ZkClient zkClient = new ZkClient("linux121:2181"); /** * 设置自定义的序列化类型,否则会报错 * ⭐ setZkSerializer * public void setZkSerializer(ZkSerializer zkSerializer) { * this._zkSerializer = zkSerializer; * } * * ⭐️ ZkSerializer * * public interface ZkSerializer { * * byte[] serialize(Object var1) throws ZkMarshallingError; // 序列化方法,数据转成二进制数组 * * Object deserialize(byte[] var1) throws ZkMarshallingError; // 反序列化方法,二进制数组转成数据 * * } * * * ZkSerializer 的两个 implementation: * * * SerializableSerializer 默认使用,但是会报错 * * * BytesPushThroughSerializer */ zkClient.setZkSerializer(new ZkStrSerializer()); // 判断节点是否存在,不存在创建节点并赋值 boolean exists = zkClient.exists("/lg-client"); if (! exists) { zkClient.createEphemeral("/lg-client","123"); } // 注册监听器,节点数据改变的类型,接收通知后的处理逻辑定义 zkClient.subscribeDataChanges("/lg-client", new IZkDataListener() { public void handleDataChange(String path, Object data) throws Exception { System.out.println("data of " + path + " has changed to " + data); } public void handleDataDeleted(String path) throws Exception { System.out.println(path + " is deleted"); } }); // 更新节点的数据,删除节点,验证监听器是否正常运行 Object data = zkClient.readData("/lg-client"); System.out.println(data); zkClient.writeData("/lg-client", "new data"); Thread.sleep(1000); zkClient.delete("/lg-client"); Thread.sleep(Integer.MAX_VALUE); }}
▷ 运行结果123data of /lg-client has changed to new data/lg-client is deleted
⭐️ 可以成功监听节点数据变化或删除事件
com.lagou.zk.demo.ZkStrSerializer
public class ZkStrSerializer implements ZkSerializer { // 序列化,数据 --> byte[] public byte[] serialize(Object o) throws ZkMarshallingError { return String.valueOf(o).getBytes(); } // 反序列化,byte[] --> 数据 public Object deserialize(byte[] bytes) throws ZkMarshallingError { return new String(bytes); }}
Zookeeper内部原理
Leader选举
选举机制
⭐️ 半数机制:集群中半数以上机器存活,集群可用 ▷ 所以Zookeeper适合安装【奇数,2N+1,N>0】台服务器▷ Zookeeper虽然在配置文件中并没有指定Master和Slave▷ 但是,Zookeeper工作时,是有一个节点为Leader,其它为Follower,Leader是通过内部的选举机制产生的
Zookeeper的选举机制
选举机制情景假设:有五台服务器组成的Zookeeper集群,它们的id从1-5(myid=1~5,分别唯一),同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的
集群初始化(首次)启动
▷ Zxid值(还没有)▶︎ myid值▶︎ 超过半数
① 服务器1启动,此时只有它一台服务器启动了,它发出去的报文没有任何响应,所以它的选举状态一直是LOOKING状态
② 服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出(server1从投自己转为投2),但是由于没有达到超过半数以上(5/2=2.5)的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1、2还是继续保持LOOKING状态
③ 服务器3启动,根据前面的理论分析,服务器3成为服务器1、2、3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的Leader
④ 服务器4启动,根据前面的分析,理论上服务器4应该是服务器1、2、3、4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能成为Follower
⑤ 服务器5启动,同4一样成为follower
集群非首次启动
▶︎ Zxid值
每个节点在选举时都会参考自身节点的zxid值(事务ID);优先选择zxid值大的节点称为Leader
ZAB一致性协议
存在背景
分布式数据存在一致性问题
产生原因
需要副本
① 将数据复制到分布式部署的多台机器中,可以消除单点故障,防止系统由于某台(些)机器宕机导致的不可用② 通过负载均衡技术,能够让分布在不同地方的数据副本全都对外提供服务。有效提高系统性能
复制需要网络
在分布式系统中引入数据复制机制后,多台数据节点之间进行数据复制(形成副本)一定需要通过网络传输,由于网络这个不可抗力因素很容易产生数据不一致的情况e.g. 当客户端Client1将系统中的一个值K1由V1更新为V2,但是客户端Client2读取的是一个还没有同步更新的副本,K1的值依然是V1,这就导致了数据的不一致性。其中,常见的就是主从数据库之间的复制延时问题
ZAB协议
⭐️ ZK就是分布式一致性问题的工业解决方案⭐️ paxos是其底层理论算法(晦涩难懂著名),其中zab,raft和众多开源算法是对paxos的工业级实现⭐️ ZK没有完全采用paxos算法,而是使用了一种称为Zookeeper Atomic Broadcast(ZAB,Zookeeper原子消息广播协议)的协议作为其数据一致性的核心算法
ZAB 协议:是为分布式协调服务 Zookeeper 专门设计的一种支持① 崩溃恢复 ② 原子广播的协议
主备模式保证一致性 ⭐️▷ 所有客户端写入数据都是写入Leader中,然后由 Leader 复制到 Follower 中(前提)▷ ZAB会将服务器数据的状态变更以事务Proposal的形式广播到所有的副本进程上▷ ZAB协议能够保证了事务操作的一个全局的变更序号(ZXID) ⇨ 保证事务有序执行
广播消息
⓪ ZK集群为了保证任何事务操作能够有序的顺序执行,只能是 Leader 服务器接受写请求,即使是 Follower 服务器接受到客户端的请求,也会转发到 Leader 服务器进行处理
① Leader接收到Client请求之后,会将这个请求封装成一个事务,并给这个事务分配一个全局递增的唯一 ID,称为事务ID(ZXID),ZAB 协议要求保证事务的顺序,因此必须将每一个事务按照 ZXID进行先后排序然后处理
② 发送Proposal到Follower
③ Leader接收Follower的ACK
超过半数Folower反馈ACK,Proposal被允许
④ 超过半数ACK,发送Commit到Follower,同时自己Commit
▷ 恢复正常前,会自我检测发现与Leader通信不成功,自行拒绝对外提供服务▷ 不能正常反馈Follower恢复正常后会进入数据同步阶段最终与Leader保持一致
⭐️ zk提供的应该是最终一致性的标准zk所有节点接收写请求之后可以在【一定时间内】保证所有节点都能看到该条数据(一般来说时间非常短)
Leader崩溃恢复
▷ Leader宕机后,ZK集群无法正常工作▷ ZAB协议提供了一个【高效且可靠】的leader选举算法
Leader宕机后,被选举的新Leader需要解决的问题:① ZAB 协议确保那些已经在 Leader 提交的事务最终会被所有服务器提交② ZAB 协议确保丢弃那些只在 Leader 提出/复制,但没有提交的事务
ZAB协议设计了一个选举算法,能够确保:① 已经被Leader提交的事务被集群接受 ② 丢弃还没有提交的事务⭐️ 选举算法的关键点:保证选举出的新Leader拥有集群中所有节点最大编号(ZXID)的事务
ZAB 协议:是为分布式协调服务 Zookeeper 专门设计的一种支持① 崩溃恢复 ② 原子广播的协议
消息广播
ZAB 协议的消息广播过程类似【二阶段提交过程】
⓪ 对于客户端发送的【写请求】,全部由 Leader 接收
▷ ZK集群为了保证任何事务操作能够有序的顺序执行,只能是 Leader 服务器接受写请求▷ 即使是 Follower 服务器接受到客户端的请求,也会转发到 Leader 服务器进行处理
① Leader 将请求封装成一个事务 Proposal(提议)
▷ Leader接收到Client请求之后,会将这个请求封装成一个事务▷ 并给这个事务分配一个全局递增的唯一 ID,称为事务ID(ZXID)▷ ZAB 协议要求保证事务的顺序,因此必须将每一个事务按照 ZXID进行先后排序然后处理
② Leader 将 Proposal 发送给所有 Follwer
Follower 校验ZXID 不能小于上次请求的事务ID,以保证严格有序
③ Leader接收Follower的ACK
Follwer对刚才的提议是否通过
④ 超过半数ACK,发送Commit到Follower,同时自己Commit
▷ Leader 以及 接收到Commit的Follower 将写操作写入到自己的文件中▷ 不能正常反馈的Follower,会自我检测发现与Leader通信不成功,自行拒绝对外提供服务▷ 不能正常反馈Follower恢复正常后会进入数据同步阶段最终与Leader保持一致
⭐️ zk提供的应该是最终一致性的标准zk所有节点接收写请求之后可以在【一定时间内】保证所有节点都能看到该条数据(一般来说该时间非常短)
Leader崩溃恢复
▷ Leader宕机后,ZK集群无法正常工作▷ ZAB协议提供了一个【高效且可靠】的leader选举算法⭐️ 选举算法的关键点:保证选举出的新Leader拥有集群中所有节点最大编号(ZXID)的事务
Leader宕机后,被选举的新Leader需要解决的问题:① ZAB 协议确保那些已经在 Leader 提交的事务最终会被所有服务器提交 ▷ 已经被Leader提交的事务被集群接受② ZAB 协议确保丢弃那些只在 Leader 提出/复制,但没有提交的事务 ▷ 丢弃还没有提交的事务
Zookeeper应用实践
▷ ZooKeeper是一个典型的发布/订阅模式的分布式数据管理与协调框架,可以使用它来进行分布式数据的发布与订阅▷ 通过对ZooKeeper中丰富的数据节点类型进行交叉使用,配合Watcher事件通知机制,可以非常方便地构建一系列分布式应用中都会涉及的核心功能,如数据发布/订阅、命名服务、集群管理、Master选举、分布式锁和分布式队列等
Zookeeper的两大特性
① 客户端如果对Zookeeper的数据节点注册Watcher监听 ▷ 那么当该数据节点的内容或是其子节点列表发生变更时,Zookeeper服务器就会向订阅的客户端发送变更通知
② 对在Zookeeper上创建的临时节点,一旦客户端与服务器之间的会话失效,那么临时节点也会被自动删除
【应用】利用其两大特性,可以实现集群机器存活监控系统:▷ 监控系统在/clusterServers节点上注册一个Watcher监听 ▷ 但凡进行动态添加机器的操作,就会在/clusterServers节点下创建一个临时节点:/clusterServers/[Hostname] ▷ 监控系统就能够实时监测机器的变动情况
ZK应用实践
① 集群管理(服务器动态上下线监听)
需求:分布式系统中,主节点会有多台,主节点可能因为任何原因出现宕机或者下线,而任意一台客户端都要能实时感知到主节点服务器的上下线
流程分析:▷ 服务器server提供时间查询服务,有两台,端口一个是3333,一个是4444,线程模拟时间查询服务▷ Client不直接连接server,否则如果server宕机,请求会被拒绝▷ zk对server统一进行管理,每个server启动的时候都去zk的servers目录下创建一个临时带顺序的节点▷ zk监听servers节点,获取所有节点,获取节点存储的服务器的ip地址+port信息,此机器证明是健康的机器▷ Client:维护一个list结构来储存上线的服务器,list<ip:port>▷ Client从维护的list中挑选一个服务器建立socket连接,建立网络通信,请求服务器时间▷ 服务器宕机,会通知zk集群,Client会感知,此时再去拉取当前可用的节点信息,重新建立网络通信
▷ 服务端
com.lagou.zk.onoffline.Server
/** * 1. 服务端主要提供了client需要的时间查询服务 * 2. 服务端向zk建立临时节点 * 3. 服务端每次启动都告知zk自己的 ip:port 信息 */public class Server { // 2.1 获取zkClient private ZkClient zkClient = null; private void connectZk(){ // 2.1 初始化zkClient (可以写多个节点,第一个节点不可用就会尝试第二个) zkClient = new ZkClient("linux121:2181,linux122:2181"); // 2.2 创建服务端建立临时节点的目录 /servers if (! zkClient.exists("/servers")) { zkClient.createPersistent("/servers"); } } // 3.0 告知zk服务器相关信息 private void saveServerInfo(String ip, String port){ // 3.1 服务器建立临时节点,数据内容为服务器的 ip:port 信息,临时节点创建成功返回刚新建的节点路径信息 String sequencePath = zkClient.createEphemeralSequential("/servers/server", ip + ":" + port); // 3.2 标识信息 System.out.println("--> 服务器 " + ip + ":" + port + " ,向zk保存信息成功,成功上线,可以接收client查询"); } // 1.0 启动服务端 public static void main(String[] args) { // 1.1 准备两个服务端启动上线(多线程模拟,一个线程代表一个服务器) Server server = new Server(); server.connectZk(); server.saveServerInfo(args[0], args[1]); // 1.2 提供时间服务的线程还没有启动,创建一个线程类,可以接收socket的请求 new TimeService(Integer.parseInt(args[1])).start(); }}
▷ 时间查询服务
com.lagou.zk.onoffline.TimeService
/** * 提供时间查询服务 */public class TimeService extends Thread{ // 3. 通过属性,进行port参数传递 private int port = 0; public TimeService(int port) { this.port = port; } @Override public void run() { // 1. 通过socket与client进行交流,启动serverSocket监听请求 try { // 2. 指定监听的端口 ServerSocket serverSocket = new ServerSocket(port); // 6. 通过死循环,保证服务端一致运行 while(true){ // 4. 等待服务请求(阻塞) Socket socket = serverSocket.accept(); // 5. 不关心client发送内容,server只考虑发送一个时间值 OutputStream out = socket.getOutputStream(); out.write(new Date().toString().getBytes()); } } catch (IOException e) { e.printStackTrace(); } }}
▷ 客户端
com.lagou.zk.onoffline.Client
/** * 1. 注册监听zk执行目录 /servers * 2. 同时维护自己本地一个list结构来储存servers信息 * 3. 收到通知要进行更新 * 4. 发送时间查询请求并接收服务端返回的数据 */public class Client { // 1.1 获取zkClient private ZkClient zkClient = null; // 2.1 维护一个servers 信息集合 private List<String> infos = new ArrayList<String>(); private void connectZk() { // 1.2 初始化zkClient (可以连接多个节点,前面的不可用就会尝试后面的) zkClient = new ZkClient("linux121:2181,linux122:2181"); // 1.3 第一次获取服务器信息,所有的子节点(的相对路径) List<String> children = zkClient.getChildren("/servers"); for (String child : children) { // 2.2 从子节点中获取存储着的 ip + port 数据内容 Object o = zkClient.readData("/servers/" + child); infos.add(String.valueOf(o)); } // 3.1 对servers目录进行监听 zkClient.subscribeChildChanges("/servers", new IZkChildListener() { public void handleChildChange(String s, List<String> children) throws Exception { // 3.2 接收到通知,说明节点发生了变化,client需要更新infos集合中的数据 List<String> list = new ArrayList<String>(); // 3.3 遍历更新过后的所有节点信息 for (String path : children) { Object o = zkClient.readData("/servers/" + path); list.add(String.valueOf(o)); } // 3.4 最新数据覆盖老数据 infos = list; System.out.println("--> 接收到通知,最新服务器信息为:" + infos); } }); } // 4.1 发送时间查询的请求 public void sendRequest() throws IOException { // 4.2 目标服务器地址 Random random = new Random(); int i = random.nextInt(infos.size()); String ipPort = infos.get(i); String[] arr = ipPort.split(":"); // 4.3 建立socket连接、输入输出流 Socket socket = new Socket(arr[0], Integer.parseInt(arr[1])); OutputStream out = socket.getOutputStream(); InputStream in = socket.getInputStream(); // 4.4 发送数据 out.write("query time".getBytes()); out.flush(); // 4.5 接收返回结果 byte[] b = new byte[1024]; in.read(b); //读取服务端返回数据 System.out.println("client端接收到server:" + ipPort + "返回结果:" + new String(b)); // 4.6 释放资源 in.close(); out.close(); socket.close(); } // 4.7 让Client持续发送时间查询请求 public static void main(String[] args) throws IOException, InterruptedException { final Client client = new Client(); client.connectZk(); // 监听器逻辑 while (true) { client.sendRequest(); // 发送请求 // 每隔几秒中发送一次请求到服务端 Thread.sleep(2000); } }}
▷ 测试
配置IDEA的Configuration,运行两个Server程序 + 一个Client程序
Server1:--> 服务器 localhost:3333 ,向zk保存信息成功,成功上线,可以接收client查询Server2:--> 服务器 localhost:4444 ,向zk保存信息成功,成功上线,可以接收client查询Client:client端接收到server:localhost:4444返回结果:Tue Dec 01 16:34:40 CST 2020 client端接收到server:localhost:3333返回结果:Tue Dec 01 16:34:42 CST 2020 client端接收到server:localhost:4444返回结果:Tue Dec 01 16:34:44 CST 2020 client端接收到server:localhost:4444返回结果:Tue Dec 01 16:34:46 CST 2020 client端接收到server:localhost:4444返回结果:Tue Dec 01 16:34:48 CST 2020 client端接收到server:localhost:4444返回结果:Tue Dec 01 16:34:50 CST 2020 client端接收到server:localhost:4444返回结果:Tue Dec 01 16:34:52 CST 2020 client端接收到server:localhost:3333返回结果:Tue Dec 01 16:34:54 CST 2020 client端接收到server:localhost:4444返回结果:Tue Dec 01 16:34:56 CST 2020 client端接收到server:localhost:3333返回结果:Tue Dec 01 16:34:58 CST 2020 client端接收到server:localhost:4444返回结果:Tue Dec 01 16:35:00 CST 2020 client端接收到server:localhost:4444返回结果:Tue Dec 01 16:35:02 CST 2020 client端接收到server:localhost:3333返回结果:Tue Dec 01 16:35:04 CST 2020 client端接收到server:localhost:3333返回结果:Tue Dec 01 16:35:06 CST 2020 client端接收到server:localhost:4444返回结果:Tue Dec 01 16:35:08 CST 2020
② 分布式锁
锁的概念
▷ 在单机程序中,当存在多个线程可以同时改变某个变量(可变共享变量)时,为了保证线程安全(数据不能出现脏数据)就需要对变量或代码块做同步,使其在修改这种变量时能够串行执行消除并发修改变量▷ 对变量或者堆代码码块做同步本质上就是加锁。目的就是实现多个线程在一个时刻同一个代码块只能有一个线程可执行
e.g. 电商平台用户下单流程
① 用户下单② 判断商品缓存数量:用户下订单之前一定要去检查库存,确保库存足够了才会给用户下单③ 商品库存表-购买件数④ 更新缓存商品数量:每台机器都会预先将商品的库存保存在Redis中,用户下单的时候会更新Redis的库存▷ 单线程解决线程安全问题: ②~④ 的步骤锁住(增加的锁只对自己当前 JVM 里面的线程有效)▷ 高并发多节点上的多个线程分别执行 ②~④ 的步骤:分布式锁 【⭐️ 分布式锁的作用:在整个系统提供一个全局、唯一的锁,在分布式系统中每个系统在进行相关操作的时候需要获取到该锁,才能执行相应操作】
zk实现分布式锁
▷ 利用Zookeeper可以创建临时带序号节点的特性来实现一个分布式锁实现思路▷ 锁就是zk指定目录下序号最小的临时序列节点,多个系统的多个线程都要在此目录下创建临时的顺序节点,因为Zk会为我们保证节点的顺序性,所以可以利用节点的顺序进行锁的判断▷ 每个线程都先创建临时顺序节点,然后获取当前目录下最小的节点(序号),判断最小节点是不是当前节点(是不是我这个线程创建的),如果是那么获取锁成功,如果不是那么获取锁失败▷ 获取到锁的进程运行结束,相当于客户端与服务器之间的会话失效,临时节点会被自动删除▷ 获取锁失败的线程获取当前节点上一个临时顺序节点(我创建的节点序号-1的节点),并对此节点进行监听,当该节点删除的时候(上一个线程执行结束删除或者是掉线zk删除临时节点)这个线程会获取到通知,代表获取到了锁
com.lagou.zk.disLock.DistributeLockTest
/** * zk 实现分布式锁 */public class DistributeLockTest { public static void main(String[] args) { // 1. 使用10个线程模拟分布式环境 for (int i = 0; i < 10; i++) { new Thread(new DisLockRunnable()).start(); // 启动线程 } } // 2. 准备一个线程类 static class DisLockRunnable implements Runnable { public void run() { // 2.1 每个线程具体的任务,每个线程就是抢锁 DisClient client = new DisClient(); client.getDisLock(); // 2.2 模拟获取锁之后的其他动作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } // 2.3 释放锁 client.deleteLock(); } }}
com.lagou.zk.disLock.DisClient
/** * 分布式客户端:实现抢锁 * a. 去zk创建临时序列节点,并获取到序号 * b. 判断自己创建节点序号是否是当前节点最小序号,如果是则获取锁,执行相关操作,最后释放锁 * c. 如果不是最小节点,当前线程需要等待,等待前一个序号的节点被删除,然后再次判断自己是否是最小节点... */public class DisClient { // 5. 初始化zk的 /dislock 节点 (注意:会出现线程安全的问题) public DisClient() { synchronized (DisClient.class) { if (! zkClient.exists("/dislock")) { zkClient.createPersistent("/dislock"); } } } // 0. 获取到zkClient private ZkClient zkClient = new ZkClient("linux121:2181, linux122:2181"); // 1.1.1 当前节点,用于判断是否第一次创建当前节点,变量提升到全局变量 private String currentNodePath; // 1.4.2.3 前一个节点 private String beforeNodePath; // 2.3.4 线程同步计数器,为了让countDownLatch被获取到,将变量提升到全局 private CountDownLatch countDownLatch; /** * 把抢锁过程分为两部分 * 1. 一部分是创建节点,比较序号 * 2. 另一部分是等待锁 */ // 3. 完整获取锁方法 public void getDisLock(){ // 3.1 获取到当前线程的名称 String threadName = Thread.currentThread().getName(); // 3.2 首先调用tryGetLock if (tryGetLock()){ // 调用 ==> 1. // 3.2.1 说明获取到锁 System.out.println(threadName + " : 获取到了锁"); } else { // 3.2.2 说明没有获取到锁 System.out.println(threadName + " : 获取锁失败,进入等待状态"); waitForLock(); // 调用 ==> 2. // 3.2.2 递归获取锁 getDisLock(); // 调用 ==> 3. } } // 1. 尝试获取锁 public boolean tryGetLock(){ // 1.1 创建临时顺序节点 (/dislock/序号) // 1.1.1 判断是否第一次创建当前节点,变量提升到全局变量 if (currentNodePath == null || "".equals(currentNodePath)){ currentNodePath = zkClient.createEphemeralSequential("/dislock/", "lock"); } // 1.2 获取到 /dislock 下所有的子节点 List<String> children = zkClient.getChildren("/dislock"); // 1.3 对节点信息进行排序 Collections.sort(children); // 默认是升序 String minNode = children.get(0); // 1.4 判断自己创建节点是否与最小序号一致 if (currentNodePath.equals("/dislock/" + minNode)){ // 1.4.1 说明当前线程创建的就是序号最小节点 return true; } else { // 1.4.2 说明最小节点不是自己创建的 // 1.4.2.1 要监控自己当前节点序号前一个节点 int i = Collections.binarySearch(children, currentNodePath.substring("/dislock/".length())); // 1.4.2.2 获取前一个节点的相对路径 (lastNodeChild路径信息中不包括父节点的信息) String lastNodeChild = children.get(i - 1); // 1.4.2.3 获取前一个节点的绝对路径,并提升为全局变量 beforeNodePath = "/dislock/" + lastNodeChild; } // 1.4.2.4 没有获取到锁的,都返回false return false; } // 2. 等待之前节点释放锁,如何判断锁被释放,需要唤醒线程,继续尝试tryGetLock public void waitForLock() { // 2.2 准备一个监听器 (方便订阅 & 取消订阅) IZkDataListener iZkDataListener = new IZkDataListener() { public void handleDataChange(String s, Object o) throws Exception { } // 2.2.1 监听前一个节点被删除后 public void handleDataDeleted(String s) throws Exception { // 2.2.2 提醒当前线程再次获取锁 // 2.3.3 每执行一次则countDownLatch值-1,直到countDownLatch值变为0,唤醒之前await线程 // 2.3.4 为了让countDownLatch被获取到,将变量提升到全局 countDownLatch.countDown(); } }; // 2.1 监控前一个节点 zkClient.subscribeDataChanges(beforeNodePath, iZkDataListener); // 2.3 在监听的通知没来之前,该线程应该是等待状态,可以再次判断上一个节点是否还存在 if (zkClient.exists(beforeNodePath)) { // 2.3.1 开始等待,CountDownLatch 线程同步计数器 countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); // 2.3.2 阻塞,直到countDownLatch值变为0 } catch (InterruptedException e) { e.printStackTrace(); } } // 2.4 解除监听 zkClient.unsubscribeDataChanges(beforeNodePath, iZkDataListener); } // 4. 释放锁 public void deleteLock(){ if (zkClient != null) { zkClient.delete(currentNodePath); zkClient.close(); } }}
▷ 测试结果
Thread-6 : 获取锁失败,进入等待状态Thread-2 : 获取锁失败,进入等待状态Thread-4 : 获取到了锁Thread-7 : 获取锁失败,进入等待状态Thread-9 : 获取锁失败,进入等待状态Thread-3 : 获取锁失败,进入等待状态Thread-8 : 获取锁失败,进入等待状态Thread-0 : 获取锁失败,进入等待状态Thread-5 : 获取锁失败,进入等待状态Thread-1 : 获取锁失败,进入等待状态Thread-7 : 获取到了锁Thread-2 : 获取到了锁Thread-6 : 获取到了锁Thread-3 : 获取到了锁Thread-9 : 获取到了锁Thread-8 : 获取到了锁Thread-0 : 获取到了锁Thread-5 : 获取到了锁Thread-1 : 获取到了锁
▷ 分布式锁的实现可以是 Redis、Zookeeper⭐️ 相对来说生产环境如果使用分布式锁可以考虑使用Redis实现而非Zk(Redis性能更高)
Hadoop HA
HA 概述
▷ 所谓HA(High Available),即高可用(7*24小时不中断服务),实现高可用最关键的策略是消除单点故障▷ Hadoop-HA严格来说应该分成各个组件的HA机制:HDFS的HA和YARN的HA ▷ Hadoop2.0之前,在HDFS集群中NameNode存在单点故障(SPOF)▷ NameNode主要在以下两个方面影响HDFS集群 ① NameNode机器发生意外,如宕机,集群将无法使用,直到管理员重启 ② NameNode机器需要升级,包括软件、硬件升级,此时集群也将无法使用▷ HDFS HA功能通过配置 Active/Standby 两个NameNodes实现在集群中对NameNode的热备来解决上述问题▷ 如果出现故障,如机器崩溃或机器需要升级维护,这时可通过此种方式将NameNode很快的切换到另外一台机器
HDFS-HA 工作机制
通过双NameNode消除单点故障(Active/Standby)
HDFS-HA工作要点
① 元数据管理方式需要改变
▷ 内存中各自保存一份元数据▷ Edits日志只有Active状态的NameNode节点可以做【写操作】▷ 两个NameNode都可以【读取】Edits▷ 共享的Edits放在一个共享存储中管理(qjournal和NFS两个主流实现)
② 需要一个状态管理功能模块
实现了一个zkfailover,常驻在每一个namenode所在的节点,每一个zkfailover负责监控自己所在NameNode节点,利用zk进行状态标识,当需要进行状态切换时,由zkfailover来负责切换,切换时需要防止brain split现象的发生(集群中出现两个Active的Namenode)
③ 必须保证两个NameNode之间能够ssh无密码登录
④ 隔离(Fence),即同一时刻仅仅有一个NameNode对外提供服务
HDFS-HA工作机制
配置部署HDFS-HA进行自动故障转移
元数据管理方式
▷ 两个NameNode,一个active,一个standby。正常情况下,对外提供服务的是active NameNode▷ 两个NN内部都有一份元数据、Edits文件、FsImage文件▷ ZKFC,由它来监控NameNode的健康状况▷ ZK保证两个NameNode中的元数据一致: 构建一个基于ZK(开源)实现的小的分布式文件系统(qjournal)管理edits文件,便于数据读取和写入此文件系统▷ Active NameNode 对外服务时,当有记录写入Edits文件,同时会写一份到qjournal,接收activeNN写入的数据▷ Standby NameNode 可以访问qjournal,拉取Edits文件▷ 配合单个NameNode节点的元数据维护方式,保证activeNN和standbyNN的元数据一致
双NameNode自动故障转移
▷ (简单理解)两个NameNode都在ZK上注册,在某个节点下创建节点,同一个节点只有一个NN会创建成功,创建成功的NN成为activeNN,创建不成功则监听该节点,如果activeNN宕机,则standby会被通知到▷ (实际实现)ZKFC负责检测activeNN和standbyNN在ZK上的状态;如果发现activeNN的ZKFC进程在ZK上的角色不太正常,会在本机进行检测;如果确认已经死亡或者宕机,会通知给standbyNN上的ZKFC,通知standbyNN转正,即去往ZK上告知自己成为activeNN;为了确保万无一失(避免出现brain split脑裂问题),ZKFC会使用ssh远程免密登陆的方式登陆activeNN所在的节点,直接执行kill -9 的命令,杀死原来的activeNN(因为有的NN会出现假死现象)
自动故障转移为HDFS部署增加了两个新组件
① ZooKeeper
⭐️ ZooKeeper是维护少量协调数据,通知客户端这些数据的改变和监视客户端故障的高可用服务HA的自动故障转移依赖于ZooKeeper的以下功能:▷ 故障检测:集群中的每个NameNode在ZooKeeper中维护了一个临时会话,如果机器崩溃,ZooKeeper中的会话将终止,ZooKeeper通知另一个NameNode需要触发故障转移▷ 现役NameNode选择:ZooKeeper提供了一个简单的机制用于唯一的选择一个节点为active状态。如果目前现役NameNode崩溃,另一个节点可能从ZooKeeper获得特殊的排外锁以表明它应该成为现役NameNode
② ZKFailoverController(ZKFC)进程
⭐️ ZKFC是自动故障转移中的另一个新组件,是ZooKeeper的客户端,也监视和管理NameNode的状态每个运行NameNode的主机也运行了一个ZKFC进程,ZKFC负责:▷ 健康监测:ZKFC使用一个健康检查命令定期地ping与之在相同主机的NameNode,只要该NameNode及时地回复健康状态,ZKFC认为该节点是健康的。如果该节点崩溃,冻结或进入不健康状态,健康监测器标识该节点为非健康的▷ ZooKeeper会话管理:当本地NameNode是健康的,ZKFC保持一个在ZooKeeper中打开的会话。如果本地NameNode处于active状态,ZKFC也保持一个特殊的znode锁,该锁使用了ZooKeeper对短暂节点的支持,如果会话终止,锁节点将自动删除▷ 基于ZooKeeper的选择:如果本地NameNode是健康的,且ZKFC发现没有其它的节点当前持有znode锁,它将为自己获取该锁。如果成功,则它已被成功选择,并负责运行故障转移进程以使它的本地NameNode为Active。故障转移进程与前面描述的手动故障转移相似,首先如果必要保护之前的现役NameNode,然后本地NameNode转换为Active状态
HDFS-HA集群配置(官方文档:
集群规划
linux121
☑️
☑️
☑️
☑️
☑️
linux122
☑️
☑️
☑️
☑️
☑️
☑️
linux123
☑️
☑️
☑️
☑️
▷ 环境准备
1. 修改IP2. 修改主机名及主机名和IP地址的映射3. 关闭防火墙4. ssh免密登录5. 安装JDK,配置环境变量等
▷ 启动Zookeeper集群,查看状态
sh /zk.sh start
sh /zk.sh status
▷ 配置HDFS-HA集群
▷ 停止原先HDFS集群
stop-dfs.sh
▷ 在所有节点,/opt/lagou/servers目录下创建一个ha文件夹
mkdir /opt/lagou/servers/ha
▷ 将/opt/lagou/servers/目录下的 hadoop-2.9.2拷贝到ha目录下
cp -r /opt/lagou/servers/hadoop-2.9.2 /opt/lagou/servers/ha
▷ 删除原集群data目录
rm -rf /opt/lagou/servers/ha/hadoop-2.9.2/data
▷ 配置hdfs-site.xml
vi /opt/lagou/servers/ha/hadoop-2.9.2/etc/hadoop/hdfs-site.xml
<property> <name>dfs.nameservices</name> <value>lagoucluster</value> </property> <property> <name>dfs.ha.namenodes.lagoucluster</name> <value>nn1,nn2</value> </property> <property> <name>dfs.namenode.rpc-address.lagoucluster.nn1</name> <value>linux121:9000</value> </property> <property> <name>dfs.namenode.rpc-address.lagoucluster.nn2</name> <value>linux122:9000</value> </property> <property> <name>dfs.namenode.http-address.lagoucluster.nn1</name> <value>linux121:50070</value> </property> <property> <name>dfs.namenode.http-address.lagoucluster.nn2</name> <value>linux122:50070</value> </property> <property> <name>dfs.namenode.shared.edits.dir</name> <value>qjournal://linux121:8485;linux122:8485;linux123:8485/lagou</value> </property> <property> <name>dfs.client.failover.proxy.provider.lagoucluster</name> <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProv ider</value> </property> <property> <name>dfs.ha.fencing.methods</name> <value>sshfence</value> </property> <property> <name>dfs.ha.fencing.ssh.private-key-files</name> <value>/root/.ssh/id_rsa</value> </property> <property> <name>dfs.journalnode.edits.dir</name> <value>/opt/journalnode</value> </property> <property> <name>dfs.ha.automatic-failover.enabled</name> <value>true</value> </property>
qjournal通信端口:8485
▷ 配置core-site.xml
vi /opt/lagou/servers/ha/hadoop-2.9.2/etc/hadoop/core-site.xml
<property> <name>fs.defaultFS</name> <value>hdfs://lagoucluster</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/opt/lagou/servers/ha/hadoop-2.9.2/data/tmp</value> </property> <property> <name>ha.zookeeper.quorum</name> <value>linux121:2181,linux122:2181,linux123:2181</value> </property>
qjournal通信端口:8485
▷ 分发配置好的hadoop环境到其他节点
rsync-script /opt/lagou/servers/ha/hadoop-2.9.2/etc/hadoop/hdfs-site.xmlrsync-script /opt/lagou/servers/ha/hadoop-2.9.2/etc/hadoop/core-site.xml
▷ 启动HDFS-HA集群
① JournalNode(三个节点都启动)
/opt/lagou/servers/ha/hadoop-2.9.2/sbin/hadoop-daemon.sh start journalnode
② 在[nn1]上,对其进行格式化,并启动NameNode
/opt/lagou/servers/ha/hadoop-2.9.2/bin/hdfs namenode -format/opt/lagou/servers/ha/hadoop-2.9.2/sbin/hadoop-daemon.sh start namenode
③ 在[nn2]上,同步nn1的元数据信息
/opt/lagou/servers/ha/hadoop-2.9.2/bin/hdfs namenode -bootstrapStandby
④ 在[nn1]上初始化zkfc
/opt/lagou/servers/ha/hadoop-2.9.2/bin/hdfs zkfc -formatZK
⑤ 在[nn1]上,启动集群
/opt/lagou/servers/ha/hadoop-2.9.2/sbin/start-dfs.sh
[root@linux121 ~]# /opt/lagou/servers/ha/hadoop-2.9.2/sbin/start-dfs.shStarting namenodes on [linux121 linux122]linux121: namenode running as process 10269. Stop it first.linux122: starting namenode, logging to /opt/lagou/servers/ha/hadoop-2.9.2/logs/hadoop-root-namenode-linux122.outlinux121: starting datanode, logging to /opt/lagou/servers/ha/hadoop-2.9.2/logs/hadoop-root-datanode-linux121.outlinux123: starting datanode, logging to /opt/lagou/servers/ha/hadoop-2.9.2/logs/hadoop-root-datanode-linux123.outlinux122: starting datanode, logging to /opt/lagou/servers/ha/hadoop-2.9.2/logs/hadoop-root-datanode-linux122.outStarting journal nodes [linux121 linux122 linux123]linux123: journalnode running as process 10231. Stop it first.linux122: journalnode running as process 10074. Stop it first.linux121: journalnode running as process 10135. Stop it first.Starting ZK Failover Controllers on NN hosts [linux121 linux122]linux121: starting zkfc, logging to /opt/lagou/servers/ha/hadoop-2.9.2/logs/hadoop-root-zkfc-linux121.outlinux122: starting zkfc, logging to /opt/lagou/servers/ha/hadoop-2.9.2/logs/hadoop-root-zkfc-linux122.out
⑥ 验证
'linux121:9000' (active)
'linux122:9000' (standby)
▷ 将Active NameNode进程kill
kill -9 activeNN的PID
(dead)
'linux122:9000' (active)
▷ 启动[nn1]上的NameNode
/opt/lagou/servers/ha/hadoop-2.9.2/sbin/hadoop-daemon.sh start namenode
'linux121:9000' (standby)
'linux122:9000' (active)
[root@linux121 ~]# jpsxxxx JournalNodexxxx DFSZKFailoverControllerxxxx QuorumPeerMainxxxx NameNodexxxx DataNode
[root@linux122 ~]# jpsxxxx JournalNodexxxx DFSZKFailoverControllerxxxx QuorumPeerMainxxxx NameNodexxxx DataNode
[root@linux123 ~]# jpsxxxx JournalNodexxxx QuorumPeerMainxxxx DataNode
YARN-HA工作机制(实现故障转移即可)
YARN-HA配置(官方文档:
集群规划
linux121
☑️
☑️
☑️
☑️
☑️
linux122
☑️
☑️
☑️
☑️
☑️
☑️
linux123
☑️
☑️
☑️
☑️
☑️(⭐️)
▷ 环境准备
1. 修改IP2. 修改主机名及主机名和IP地址的映射3. 关闭防火墙4. ssh免密登录5. 安装JDK,配置环境变量等6. 配置Zookeeper集群
▷ 配置yarn-site.xml
vi /opt/lagou/servers/ha/hadoop-2.9.2/etc/hadoop/yarn-site.xml
<property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <!--启用resourcemanager ha--> <property> <name>yarn.resourcemanager.ha.enabled</name> <value>true</value> </property> <!--声明两台resourcemanager的地址--> <property> <name>yarn.resourcemanager.cluster-id</name> <value>cluster-yarn</value> </property><property> <name>yarn.resourcemanager.ha.rm-ids</name> <value>rm1,rm2</value> </property><property> <name>yarn.resourcemanager.hostname.rm1</name> <value>linux122</value> </property><property> <name>yarn.resourcemanager.hostname.rm2</name> <value>linux123</value> </property> <!--指定zookeeper集群的地址--> <property> <name>yarn.resourcemanager.zk-address</name> <value>linux121:2181,linux122:2181,linux123:2181</value> </property> <!--启用自动恢复--> <property> <name>yarn.resourcemanager.recovery.enabled</name> <value>true</value> </property> <!--指定resourcemanager的状态信息存储在zookeeper集群--> <property> <name>yarn.resourcemanager.store.class</name> <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value> </property>
▷ 分发配置信息
rsync-script /opt/lagou/servers/ha/hadoop-2.9.2/etc/hadoop/yarn-site.xml
▷ 启动yarn集群
[root@linux123 ~]# /opt/lagou/servers/ha/hadoop-2.9.2/sbin/start-yarn.sh[root@linux122 ~]# /opt/lagou/servers/ha/hadoop-2.9.2/sbin/yarn-daemon.sh start resourcemanager
▷ 验证
linux122自动跳转到linux123上
http://linux123:8088/cluster
▷ 将Active RM进程kill
kill -9 activeRM的PID
http://linux122:8088/cluster
(dead)
▷ 启动[nn1]上的NameNode
/opt/lagou/servers/ha/hadoop-2.9.2/sbin/yarn-daemon.sh start resourcemanager
http://linux122:8088/cluster
linux123自动跳转到linux122上