导图社区 Zookeeper入门知识点学习方案
Zookeeper入门知识点学习方案:Zoopkeeper提供了一套很好的分布式集群管理的机制,就是它这种基于层次型的目录树的数据结构,树的数据结构,为分布式系统的沟通调度桥梁。
编辑于2022-11-08 09:32:22 广东Zookeeper入门知识点学习方案
入门简介
是什么
官方版
ZooKeeper顾名思义:动物园管理员  它是拿来管大象(Hadoop)、蜜蜂(Hive)、小猪(Pig)的管理员, Apache Hbase和Apache Solr以及阿里的Dubbo等项目中都采用到了Zookeeper。 一句话:ZooKeeper是一个分布式的、高性能的,开源的分布式系统的协调(Coordination)服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件。
Google的三篇论文 一直是分布式领域 传阅的经典
MapReduce:Hadoop
GFS:HDFS
BigTable:HBase
这三篇论文里都提及Google的一个 lock service---Chubby,于是有了Zookeeper。
设计模式来理解
是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据, 然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在 Zookeeper上注册的那些观察者做出相应的反应,从而实现集群中类似Master/Slave管理模式
一句话
zookeeper=文件系统+通知机制
能干嘛
命名服务
配置维护
集群管理
分布式消息同步和协调机制
负载均衡
对Dubbo的支持
备注
Zoopkeeper提供了一套很好的分布式集群管理的机制,就是它这种基于层次型的目录树的数据结构, 并对树中的节点进行有效管理,从而可以设计出多种多样的分布式的数据管理模型,作为分布式系统 的沟通调度桥梁
去哪下
官网
首页
https://zookeeper.apache.org/
下载
https://zookeeper.apache.org/releases.html#download 
怎么玩
统一命名服务(Name Service如Dubbo服务注册中心)
Dubbo是一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,是阿里巴巴SOA服务化治理方案的核心框架,每天为2,000+个服务提供3,000,000,000+次访问量支持,并被广泛应用于阿里巴巴集团的各成员站点。 在Dubbo实现中: 服务提供者在启动的时候,向ZK上的指定节点/dubbo/${serviceName}/providers目录下写入自己的URL地址, 这个操作就完成了服务的发布。 服务消费者启动的时候,订阅/dubbo/${serviceName}/providers目录下的提供者URL地址, 并向/dubbo/${serviceName} /consumers目录下写入自己的URL地址。 注意,所有向ZK上注册的地址都是临时节点,这样就能够保证服务提供者和消费者能够自动感应资源的变化。 另外,Dubbo还有针对服务粒度的监控,方法是订阅/dubbo/${serviceName}目录下所有提供者和消费者的信息。
配置管理(Configuration Management如淘宝开源配置管理框架Diamond)
在大型的分布式系统中,为了服务海量的请求,同一个应用常常需要多个实例。如果存在配置更新的需求,常常需要逐台更新,给运维增加了很大的负担同时带来一定的风险(配置会存在不一致的窗口期,或者个别节点忘记更新)。zookeeper可以用来做集中的配置管理,存储在zookeeper集群中的配置,如果发生变更会主动推送到连接配置中心的应用节点,实现一处更新处处更新的效果  现在把这些配置全部放到zookeeper上去,保存在 Zookeeper 的某个目录节点中,然后所有相关应用程序对这个目录节点进行监听,一旦配置信息发生变化,每个应用程序就会收到 Zookeeper 的通知,然后从 Zookeeper 获取新的配置信息应用到系统中就好。
集群管理(Group Membership 如Hadoop分布式集群管理)
Java操作API
安装配置
Linux下安装
1. 官网下载安装包,本次版本zookeeper-3.4.9.tar.gz
2. 拷贝进入到/opt目录下并解压

3. 新建专属zookeeper目录,mkdir /myzookeeper, 随后将上一步解压的zookeeper内容拷贝进/myzookeeper目录内

4. 进入conf文件夹,拷贝zoo_sample.cfg改为zoo.cfg
 
5. zoo.cfg解读
tickTime
tickTime:通信心跳数,Zookeeper服务器心跳时间,单位毫秒 Zookeeper使用的基本时间, 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳,时间单位为毫秒。 它用于心跳机制,并且设置最小的session超时时间为两倍心跳时间.(session的最小超时时间是2*tickTime。)
initLimit
LF初始通信时限 集群中的follower跟随者服务器(F)与leader领导者服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量),用它来限定集群中的Zookeeper服务器连接到Leader的时限。 投票选举新leader的初始化时间 Follower在启动过程中,会从Leader同步所有最新数据,然后确定自己能够对外服务的起始状态。 Leader允许F在 initLimit 时间内完成这个工作。
syncLimit
syncLimit:LF同步通信时限 集群中Leader与Follower之间的最大响应时间单位,假如响应超过syncLimit * tickTime, Leader认为Follwer死掉,从服务器列表中删除Follwer。 # 在运行过程中,Leader负责与ZK集群中所有机器进行通信,例如通过一些心跳检测机制,来检测机器的存活状态。 如果L发出心跳包在syncLimit之后,还没有从F那收到响应,那么就认为这个F已经不在线了。
dataDir
dataDir:数据文件目录+数据持久化路径 保存内存数据库快照信息的位置,如果没有其他说明,更新的事务日志也保存到数据库。
clientPort
clientPort:客户端连接端口 监听客户端连接的端口。
6. 开启服务+客户端连接
启动+关闭服务
/myzookeeper/zookeeper-3.4.9/bin路径下 
echo ruok | nc 127.0.0.1 2181
客户端连接
连接:./zkCli.sh 退出:quit 
退出quit
永远的HelloWorld
查看+获得zookeeper服务器上的数据存储信息

文件系统
Zookeeper维护一个类似文件系统的数据结构
所使用的数据模型风格很像文件系统的目录树结构,简单来说,有点类似windows中注册表的结构, 有名称, 有树节点, 有Key(键)/Value(值)对的关系, 可以看做一个树形结构的数据库,分布在不同的机器上做名称管理。
初识znode节点
ZooKeeper数据模型的结构与Unix文件系统很类似,整体上可以看作是一棵树,每个节点称做一个ZNode 很显然zookeeper集群自身维护了一套数据结构。这个存储结构是一个树形结构,其上的每一个节点,我们称之为"znode",每一个znode默认能够存储1MB的数据,每个ZNode都可以通过其路径唯一标识 
数据模型/znode节点深入
Znode的数据模型
是什么
Znode维护了一个stat结构,这个stat包含数据变化的版本号、访问控制列表变化、还有时间戳。版本号和时间戳一起,可让Zookeeper验证缓存和协调更新。每次znode的数据发生了变化,版本号就增加。 例如,无论何时客户端检索数据,它也一起检索数据的版本号。并且当客户端执行更新或删除时,客户端必须提供他正在改变的znode的版本号。如果它提供的版本号和真实的数据版本号不一致,更新将会失败。
ZooKeeper的Stat结构体

czxid- 引起这个znode创建的zxid,创建节点的事务的zxid(ZooKeeper Transaction Id)
每次修改ZooKeeper状态都会收到一个zxid形式的时间戳,也就是ZooKeeper事务ID。 事务ID是ZooKeeper中所有修改总的次序。每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。
ctime - znode被创建的毫秒数(从1970年开始)
mzxid - znode最后更新的zxid
mtime - znode最后修改的毫秒数(从1970年开始)
pZxid-znode最后更新的子节点zxid
cversion - znode子节点变化号,znode子节点修改次数
dataversion - znode数据变化号
aclVersion - znode访问控制列表的变化号
ephemeralOwner- 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。
dataLength- znode的数据长度
numChildren - znode子节点数量
小总结
zookeeper内部维护了一套类型UNIX的树形数据结构: 由znode构成的集合, znode的集合又是一个树形结构, 每一个znode又有很多属性进行描述。 Znode = nodeValue + Stat
znode中的存在类型
PERSISTENT-持久化目录节点
客户端与zookeeper断开连接后,该节点依旧存在
PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点
客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号
EPHEMERAL-临时目录节点
客户端与zookeeper断开连接后,该节点被删除
EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点
客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号
持久/临时
 znode是由客户端创建的,它和创建它的客户端的内在联系,决定了它的存在性: PERSISTENT-持久化节点:创建这个节点的客户端在与zookeeper服务的连接断开后,这个节点也不会被删除(除非您使用API强制删除)。 PERSISTENT_SEQUENTIAL-持久化顺序编号节点:当客户端请求创建这个节点A后,zookeeper会根据parent-znode的zxid状态,为这个A节点编写一个全目录唯一的编号(这个编号只会一直增长)。当客户端与zookeeper服务的连接断开后,这个节点也不会被删除。 EPHEMERAL-临时目录节点:创建这个节点的客户端在与zookeeper服务的连接断开后,这个节点(还有涉及到的子节点)就会被删除。 EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点:当客户端请求创建这个节点A后,zookeeper会根据parent-znode的zxid状态,为这个A节点编写一个全目录唯一的编号(这个编号只会一直增长)。当创建这个节点的客户端与zookeeper服务的连接断开后,这个节点被删除。 另外,无论是EPHEMERAL还是EPHEMERAL_SEQUENTIAL节点类型,在zookeeper的client异常终止后,节点也会被删除
基础命令和Java客户端操作
zkCli的常用命令操作
一句话:和redis的KV键值对类似,只不过key变成了一个路径节点值,v就是data
Zookeeper表现为一个分层的文件系统目录树结构 不同于文件系统之处在于:zk节点可以有自己的数据,而unix文件系统中的目录节点只有子节点  一个节点对应一个应用,节点存储的数据就是应用需要的配置信息。
常用命令
help

ls
使用 ls 命令来查看当前znode中所包含的内容
ls2
查看当前节点数据并能看到更新次数等数据
stat
查看节点状态
set
设置节点的具体值

set 节点 value值
get
获得节点的值
get 节点
create
普通创建
-s
含有序列
-e
临时(重启或者超时消失)
delete
rmr
四字命令
是什么
zookeeper支持某些特定的四字命令,他们大多是用来查询ZK服务的当前状态及相关信息的, 通过telnet或nc向zookeeper提交相应命令,如:echo ruok | nc 127.0.0.1 2181 运行公式:echo 四字命令 | nc 主机IP zookeeper端口 
常用
ruok:测试服务是否处于正确状态。如果确实如此,那么服务返回“imok ”,否则不做任何相应
stat:输出关于性能和连接的客户端的列表
conf:输出相关服务配置的详细信息
cons:列出所有连接到服务器的客户端的完全的连接 /会话的详细信息。包括“接受 / 发送”的包数量、会话id 、操作延迟、最后的操作执行等等信息
dump:列出未经处理的会话和临时节点
envi:输出关于服务环境的详细信息(区别于conf命令)
reqs:列出未经处理的请求
wchs:列出服务器watch的详细信息
wchc:通过session列出服务器watch的详细信息,它的输出是一个与watch相关的会话的列表
wchp:通过路径列出服务器 watch的详细信息。它输出一个与 session相关的路径
小总结

Java客户端操作
Maven工程和配置POM
projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> modelVersion>4.0.0modelVersion> groupId>com.atguigu.mallgroupId> artifactId>zk0919artifactId> version>0.0.1-SNAPSHOTversion> packaging>jarpackaging> name>zk0919name> url>http://maven.apache.orgurl> properties> project.build.sourceEncoding>UTF-8project.build.sourceEncoding> properties> dependencies> dependency> groupId>com.101tecgroupId> artifactId>zkclientartifactId> version>0.10version> dependency> dependency> groupId>org.apache.zookeepergroupId> artifactId>zookeeperartifactId> version>3.4.9version> dependency> dependency> groupId>log4jgroupId> artifactId>log4jartifactId> version>1.2.17version> dependency> dependency> groupId>junitgroupId> artifactId>junitartifactId> version>4.9version> scope>testscope> dependency> dependencies> project>
log4j.xml
xmlversion="1.0"encoding="UTF-8"?> DOCTYPElog4j:configurationSYSTEM"log4j.dtd"> log4j:configurationxmlns:log4j="http://jakarta.apache.org/log4j/"> appendername="log.console"class="org.apache.log4j.ConsoleAppender"> layoutclass="org.apache.log4j.PatternLayout"> paramname="ConversionPattern"value="%d{HH:mm:ss,SSS} %5p (%C{1}:%M) - %m%n"/> layout> appender> appendername="log.file"class="org.apache.log4j.DailyRollingFileAppender"> paramname="File"value="D:\\atguigu4XML.log"/> paramname="Append"value="true"/> paramname="DatePattern"value="'.'yyyy-MM-dd"/> layoutclass="org.apache.log4j.PatternLayout"> paramname="ConversionPattern"value="%d{HH:mm:ss,SSS} %5p (%C{1}:%M) - %m%n"/> layout> appender> root> levelvalue="info"/> appender-refref="log.console"/> appender-refref="log.file"/> root> log4j:configuration>
Code
package com.atguigu.zk3; import java.io.IOException; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class HelloZK { /** * Logger for this class */ private static final Logger logger = Logger.getLogger(HelloZK.class); private static final String CONNECTSTRING = "192.168.67.167:2181"; private static final String PATH = "/atguigu"; private static final int SESSION_TIMEOUT = 50*1000; public ZooKeeper startZK() throws IOException { return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { } }); } public void stopZK(ZooKeeper zk) throws InterruptedException { if(zk != null) { zk.close(); } } public void createZNode(ZooKeeper zk,String path,String nodeValue) throws KeeperException, InterruptedException { zk.create(path,nodeValue.getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } public String getZNode(ZooKeeper zk,String path) throws KeeperException, InterruptedException { byte[] byteArray = zk.getData(path, false, new Stat()); return new String(byteArray); } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { HelloZK hello = new HelloZK(); ZooKeeper zk = hello.startZK(); Stat stat = zk.exists(PATH, false); if(stat == null) { hello.createZNode(zk, PATH, "zk1014"); String result = hello.getZNode(zk, PATH); System.out.println("**********result: "+result); }else{ System.out.println("***********znode has already ok***********"); } hello.stopZK(zk); } }
通知机制
Session
是什么
客户端使用某种语言绑定创建一个到服务的句柄时,就建立了一个ZooKeeper会话。会话创建后,句柄处于CONNECTING状态,客户端库会试图连接到组成ZooKeeper服务的某个服务器;连接成功则进入到CONNECTED状态。通常操作中句柄将处于这两个状态之一。如果发生不可恢复的错误,如会话过期、身份鉴定失败,或者应用显式关闭,则句柄进入到CLOSED状态。下图显式了ZooKeeper客户端可能的状态转换: 
watch
通知机制
客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时,zookeeper会通知客户端。
是什么
观察者的功能
ZooKeeper 支持watch(观察)的概念。客户端可以在每个znode结点上设置一个观察。如果被观察服务端的znode结点有变更,那么watch就会被触发,这个watch所属的客户端将接收到一个通知包被告知结点已经发生变化,把相应的事件通知给设置Watcher的Client端。 Zookeeper里的所有读取操作:getData(),getChildren()和exists()都有设置watch的选项
一句话:异步回调的触发机制
watch事件理解

1. 一次触发
当数据有了变化时zkserver向客户端发送一个watch,它是一次性的动作,即触发一次就不再有效,类似一次性纸杯。 只监控一次 如果想继续Watch的话,需要客户端重新设置Watcher。因此如果你得到一个watch事件且想在将来的变化得到通知,必须新设置另一个watch。
2. 发往客户端
Watches是异步发往客户端的,Zookeeper提供一个顺序保证:在看到watch事件之前绝不会看到变化,这样不同客户端看到的是一致性的顺序。 在(导致观察事件被触发的)修改操作的成功返回码到达客户端之前,事件可能在去往客户端的路上,但是可能不会到达客户端。观察事件是异步地发送给观察者(客户端)的。ZooKeeper会保证次序:在收到观察事件之前,客户端不会看到已经为之设置观察的节点的改动。网络延迟或者其他因素可能会让不同的客户端在不同的时间收到观察事件和更新操作的返回码。这里的要点是:不同客户端看到的事情都有一致的次序。
3. 为数据设置watch
节点有不同的改动方式。可以认为ZooKeeper维护两个观察列表:数据观察和子节点观察。getData()和exists()设置数据观察。getChildren()设置子节点观察。此外,还可以认为不同的返回数据有不同的观察。getData()和exists()返回节点的数据,而getChildren()返回子节点列表。所以,setData()将为znode触发数据观察。成功的create()将为新创建的节点触发数据观察,为其父节点触发子节点观察。成功的delete()将会为被删除的节点触发数据观察以及子节点观察(因为节点不能再有子节点了),为其父节点触发子节点观察。 观察维护在客户端连接到的ZooKeeper服 务器中。这让观察的设置、维护和分发是轻量级的。客户端连接到新的服务器时,所有会话事件将被触发。同服务器断开连接期间不会收到观察。客户端重新连接 时,如果需要,先前已经注册的观察将被重新注册和触发。通常这都是透明的。有一种情况下观察事件将丢失:对还没有创建的节点设置存在观察,而在断开连接期 间创建节点,然后删除。
4. 时序性和一致性
Watches是在client连接到Zookeeper服务端的本地维护,这可让watches成为轻量的,可维护的和派发的。当一个client连接到新server,watch将会触发任何session事件,断开连接后不能接收到。当客户端重连,先前注册的watches将会被重新注册并触发。 关于watches,Zookeeper维护这些保证: (1)Watches和其他事件、watches和异步恢复都是有序的。Zookeeper客户端保证每件事都是有序派发 (2)客户端在看到新数据之前先看到watch事件 (3)对应更新顺序的watches事件顺序由Zookeeper服务所见
5. 变化备注
data watches and child watches.
watch之数据变化
watch之节点变化
code
一次性
package com.atguigu.zk3; import java.io.IOException; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; public class WatchOne { /** * Logger for this class */ private static final Logger logger = Logger.getLogger(HelloZK.class); //定义常量 private static final String CONNECTSTRING = "192.168.67.167:2181"; private static final String PATH = "/atguigu"; private static final int SESSION_TIMEOUT = 50*1000; //定义实例变量 private ZooKeeper zk = null; //以下为业务方法 public ZooKeeper startZK() throws IOException { return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { } }); } public void stopZK() throws InterruptedException { if(zk != null) { zk.close(); } } public void createZNode(String path,String nodeValue) throws KeeperException, InterruptedException { zk.create(path,nodeValue.getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } public String getZNode(String path) throws KeeperException, InterruptedException { byte[] byteArray = zk.getData(path,new Watcher() { @Override public void process(WatchedEvent event) { try { triggerValue(path); }catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }, new Stat()); return new String(byteArray); } public String triggerValue(String path) throws KeeperException, InterruptedException { byte[] byteArray = zk.getData(path,false, new Stat()); String retValue = new String(byteArray); System.out.println("**************triggerValue: "+retValue); return retValue; } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { WatchOne watchOne = new WatchOne(); watchOne.setZk(watchOne.startZK()); if(watchOne.getZk().exists(PATH, false) == null) { watchOne.createZNode(PATH,"BBB"); System.out.println("**********************>: "+watchOne.getZNode(PATH)); Thread.sleep(Long.MAX_VALUE); }else{ System.out.println("i have znode"); } } //setter---getter public ZooKeeper getZk() { return zk; } public void setZk(ZooKeeper zk) { this.zk = zk; } }
多次(命名服务)
package com.atguigu.zk3; import java.io.IOException; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; public class WatchMoreTest { /** * Logger for this class */ private static final Logger logger = Logger.getLogger(HelloZK.class); //定义常量 private static final String CONNECTSTRING = "192.168.67.167:2181"; private static final String PATH = "/atguigu"; private static final int SESSION_TIMEOUT = 50*1000; //定义实例变量 private ZooKeeper zk = null; private String lastValue = ""; //以下为业务方法 public ZooKeeper startZK() throws IOException { return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) {} }); } public void stopZK() throws InterruptedException { if(zk != null) { zk.close(); } } public void createZNode(String path,String nodeValue) throws KeeperException, InterruptedException { zk.create(path,nodeValue.getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } public String getZNode(String path) throws KeeperException, InterruptedException { byte[] byteArray = zk.getData(path,new Watcher() { @Override public void process(WatchedEvent event) { try { triggerValue(path); }catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }, new Stat()); return new String(byteArray); } public boolean triggerValue(String path) throws KeeperException, InterruptedException { byte[] byteArray = zk.getData(path,new Watcher() { @Override public void process(WatchedEvent event) { try { triggerValue(path); }catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }, new Stat()); String newValue = new String(byteArray); if(lastValue.equals(newValue)) { System.out.println("there is no change~~~~~~~~"); return false; }else{ System.out.println("lastValue: "+lastValue+"\t"+"newValue: "+newValue); this.lastValue = newValue; return true; } } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { WatchMoreTest watch = new WatchMoreTest(); watch.setZk(watch.startZK()); if(watch.getZk().exists(PATH, false) == null) { String initValue = "0000"; watch.setLastValue(initValue); watch.createZNode(PATH,initValue); System.out.println("**********************>: "+watch.getZNode(PATH)); Thread.sleep(Long.MAX_VALUE); }else{ System.out.println("i have znode"); } } //setter---getter public ZooKeeper getZk() { return zk; } public void setZk(ZooKeeper zk) { this.zk = zk; } public String getLastValue() { return lastValue; } public void setLastValue(String lastValue) { this.lastValue = lastValue; } }
子节点
package com.atguigu.zk3; import java.io.IOException; import java.util.List; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class WatchChildNode { /** * Logger for this class */ private static final Logger logger = Logger.getLogger(HelloZK.class); //定义常量 private static final String CONNECTSTRING = "192.168.67.167:2181"; private static final String PATH = "/atguigu"; private static final int SESSION_TIMEOUT = 50*1000; //定义实例变量 private ZooKeeper zk = null; //以下为业务方法 public ZooKeeper startZK() throws IOException { return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { if(event.getType() == EventType.NodeChildrenChanged && event.getPath().equals(PATH)) { showChildLists(PATH); }else{ showChildLists(PATH); } } }); } public void stopZK() throws InterruptedException { if(zk != null) { zk.close(); } } public void showChildLists(String path) { List list = null; try { list = zk.getChildren(PATH, true); }catch (KeeperException | InterruptedException e) { e.printStackTrace(); } System.out.println("**********showChildLists: "+list); } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { WatchChildNode watch = new WatchChildNode(); watch.setZk(watch.startZK()); Thread.sleep(Long.MAX_VALUE); } //setter---getter public ZooKeeper getZk() { return zk; } public void setZk(ZooKeeper zk) { this.zk = zk; } }
Zookeeper集群
是什么
Zookeeper分为2个部分:服务器端和客户端,客户端只连接到整个ZooKeeper服务的某个服务器上。客户端使用并维护一个TCP连接,通过这个连接发送请求、接受响应、获取观察的事件以及发送心跳。如果这个TCP连接中断,客户端将尝试连接到另外的ZooKeeper服务器。客户端第一次连接到ZooKeeper服务时,接受这个连接的 ZooKeeper服务器会为这个客户端建立一个会话。当这个客户端连接到另外的服务器时,这个会话会被新的服务器重新建立。
伪分布式单机配置
说明
服务器名称与地址:集群信息(服务器编号,服务器地址,LF通信端口,选举端口) 这个配置项的书写格式比较特殊,规则如下:server.N=YYY:A:B 其中, N表示服务器编号, YYY表示服务器的IP地址, A为LF通信端口,表示该服务器与集群中的leader交换的信息的端口。 B为选举端口,表示选举新leader时服务器间相互通信的端口(当leader挂掉时,其余服务器会相互通信,选择出新的leader) 一般来说,集群中每个服务器的A端口都是一样,每个服务器的B端口也是一样。 下面是一个集群的例子: server.0=233.34.9.144:2008:6008 server.1=233.34.9.145:2008:6008 server.2=233.34.9.146:2008:6008 server.3=233.34.9.147:2008:6008 但是当所采用的为伪集群时,IP地址都一样,只能时A端口和B端口不一样。 下面是一个伪集群的例子: server.0=127.0.0.1:2008:6008 server.1=127.0.0.1:2007:6007 server.2=127.0.0.1:2006:6006 server.3=127.0.0.1:2005:6005
initLimit 是Zookeeper用它来限定集群中的Zookeeper服务器连接到Leader的时限
syncLimit 限制了follower服务器与leader服务器之间请求和应答之间的时限
配置步骤
1. zookeeper-3.4.9.tar.gz解压后拷贝到/myzookeeper目录下并重新名为zk01,再复制zk01形成zk02、zk03,共计3份
2. 进入zk01/02/03分别新建文件夹

mydata
mylog
3. 分别进入zk01-zk03各自的conf文件夹
新建zoo.cfg

4. 编辑zoo.cfg
zk01  zk02  zk03...... server.1=127.0.0.1:2991:3991 server.2=127.0.0.1:2992:3992 server.3=127.0.0.1:2993:3993
设置自己的数据和log路径
dataDir=/myzookeeper/zk01/mydata
dataLogDir=/myzookeeper/zk01/mylog
修改各自的clientPort
在最后面添加server的列表
5. 在各自mydata下面创建myid的文件,在里面写入server的数字

6. 分别启动三个服务器

7. zkCli连接server,带参数指定-server
 2191/2192/2193任意用客户端链接一台,会发现只需要有一个改变了,整个集群的内容自动一致性同步。
常见应用案例
命名服务(NameService)
前面所演示的每个nodepath就是一个命名服务
一句话path就是命名服务,客户端能够根据指定节点的名字来获取资源或者服务地址,然后进行下一步操作
软负载均衡
package com.atguigu.zk3; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class BalanceTest { /** * Logger for this class */ private static final Logger logger = Logger.getLogger(HelloZK.class); //定义常量 private static final String CONNECTSTRING = "192.168.67.167:2181"; private static final int SESSION_TIMEOUT = 50*1000; private static final String PATH = "/atguigu"; private static final String SUB_PREFIX = "sub"; //定义实例变量 private ZooKeeper zk = null; private int subCount = 5; private List serviceNodeLists = new ArrayList(); private int serviceNum = 0; //以下为业务方法 public ZooKeeper startZK() throws IOException { return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { try { serviceNodeLists = zk.getChildren(PATH, true); }catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }); } public String dealRequest() throws KeeperException, InterruptedException { serviceNum = serviceNum +1; for (int i = serviceNum; i { if(serviceNodeLists.contains(SUB_PREFIX+serviceNum)) { return new String(zk.getData(PATH+"/"+SUB_PREFIX+serviceNum, false, new Stat())); }else{ serviceNum = serviceNum +1; } } for (int i = 1; i { if(serviceNodeLists.contains(SUB_PREFIX+i)) { serviceNum = i; return new String(zk.getData(PATH+"/"+SUB_PREFIX+serviceNum, false, new Stat())); } } return "null node~~~~~"; } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { BalanceTest test = new BalanceTest(); test.setZk(test.startZK()); Thread.sleep(3000); String result = null; //以轮询的方式访问15次,共计5个节点来应付实现负载均衡 for (int i = 1; i { result = test.dealRequest(); System.out.println("****loop:"+i+"\t"+test.serviceNum+"\t"+result); Thread.sleep(2000); } } //setter---getter public ZooKeeper getZk() { return zk; } public void setZk(ZooKeeper zk) { this.zk = zk; } }
分布式通知和协调作用
A
package com.atguigu.zk3; import java.io.IOException; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; public class ClientA { /** * Logger for this class */ private static final Logger logger = Logger.getLogger(HelloZK.class); //定义常量 private static final String CONNECTSTRING = "192.168.67.167:2181"; private static final String PATH = "/atguigu/distributed"; private static final int SESSION_TIMEOUT = 50*1000; //定义实例变量 private ZooKeeper zk = null; //以下为业务方法 public ZooKeeper startZK() throws IOException { return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) {} }); } public String getZNode(String path) throws KeeperException, InterruptedException { byte[] byteArray = zk.getData(path,new Watcher() { @Override public void process(WatchedEvent event) { try { triggerValue(path); }catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }, new Stat()); return new String(byteArray); } public boolean triggerValue(String path) throws KeeperException, InterruptedException { byte[] byteArray = zk.getData(path,new Watcher() { @Override public void process(WatchedEvent event) { try { triggerValue(path); }catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }, new Stat()); String newValue = new String(byteArray); if("ClientA".equals(newValue)) { System.out.println("AAA System is run......"); } return false; } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { ClientA test = new ClientA(); test.setZk(test.startZK()); System.out.println("************: "+test.getZNode(PATH)); Thread.sleep(Long.MAX_VALUE); } //setter---getter public ZooKeeper getZk() { return zk; } public void setZk(ZooKeeper zk) { this.zk = zk; } }
B
package com.atguigu.zk3; import java.io.IOException; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; public class ClientB { /** * Logger for this class */ private static final Logger logger = Logger.getLogger(HelloZK.class); //定义常量 private static final String CONNECTSTRING = "192.168.67.167:2181"; private static final String PATH = "/atguigu/distributed"; private static final int SESSION_TIMEOUT = 50*1000; //定义实例变量 private ZooKeeper zk = null; //以下为业务方法 public ZooKeeper startZK() throws IOException { return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) {} }); } public String getZNode(String path) throws KeeperException, InterruptedException { byte[] byteArray = zk.getData(path,new Watcher() { @Override public void process(WatchedEvent event) { try { triggerValue(path); }catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }, new Stat()); return new String(byteArray); } public boolean triggerValue(String path) throws KeeperException, InterruptedException { byte[] byteArray = zk.getData(path,new Watcher() { @Override public void process(WatchedEvent event) { try { triggerValue(path); }catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }, new Stat()); String newValue = new String(byteArray); if("ClientB".equals(newValue)) { System.out.println("BBBBBBBBBB System is run......"); } return false; } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { ClientB test = new ClientB(); test.setZk(test.startZK()); System.out.println("************: "+test.getZNode(PATH)); Thread.sleep(Long.MAX_VALUE); } //setter---getter public ZooKeeper getZk() { return zk; } public void setZk(ZooKeeper zk) { this.zk = zk; } }