导图社区 zookeeper
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
编辑于2021-08-18 17:55:00zookeeper
使用时机
配置中心
推拉结合,解决到处修改配置,一次就行
注册中心
解决单点问题
分布式锁
解决分布式同步
分布式队列
削峰填谷
负载均衡
减轻单点压力
安装
官网下载
解压到没空格和中文的路径下
修改配置文件
zookeeper下创建data和log文件
conf下复制zoo_sample.cfg并改为zoo.cfg
指定保存的目录,zoo.cfg修改
data
dataDir=../data
log
dataLogDir=../log
启动
bin
zkServer.cmd
zkCli.cmd
右键选择属性,取消快速编辑和插入模式
节点分类
第一类
持久性
临时性
第二类,顺序性
持久顺序性
临时顺序性
客户端命令
查询所有
help
跟踪路径下的节点
ls /zookeeper
创建永久节点
带序号
create -s /hello "xxx"
普遍
create /app1 "xxx"
创建临时节点
普通
create -e /app3 'app3'
带序号
create -e -s /app 'app'
查询
get /app1
修改
set /app1 'hello'
删除
普通
delete /hello
递归
rmr /hello
查看节点状态
stat /zookeeper
常用java API
导入依赖
<!--zookeeper的依赖--> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.7</version> </dependency> <!-- zookeeper CuratorFramework 是Netflix公司开发一款连接zookeeper服务的框架,通过封装的一套高级API 简化了ZooKeeper的操作,提供了比较全面的功能,除了基础的节点的操作,节点的监听,还有集群的连接以及重试。--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.1</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.1</version> </dependency> <!--测试--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency>
创建节点
/** * RetryPolicy: 失败的重试策略的公共接口 * ExponentialBackoffRetry是 公共接口的其中一个实现类 * 参数1: 初始化sleep的时间,用于计算之后的每次重试的sleep时间 * 参数2:最大重试次数 参数3(可以省略):最大sleep时间,如果上述的当前sleep计算出来比这个大,那么sleep用这个时间 */ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3,10); //创建客户端 /** * 参数1:连接的ip地址和端口号 * 参数2:会话超时时间,单位毫秒 * 参数3:连接超时时间,单位毫秒 * 参数4:失败重试策略 */ CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",3000,1000,retryPolicy); //开启客户端(会阻塞到会话连接成功为止) client.start(); /** * 创建节点 */ //1. 创建一个空节点(a)(只能创建一层节点) // client.create().forPath("/a"); //2. 创建一个有内容的b节点(只能创建一层节点) // client.create().forPath("/b", "这是b节点的内容".getBytes()); //3. 创建多层节点 // (creatingParentsIfNeeded)是否需要递归创建节点 // withMode(CreateMode.PERSISTENT) 创建持久性 b节点 // client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/g"); //4. 创建带有的序号的节点 // client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/e"); //5. 创建临时节点(客户端关闭,节点消失),设置延时5秒关闭 // client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/f"); //6. 创建临时带序号节点(客户端关闭,节点消失),设置延时5秒关闭 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/f"); Thread.sleep(5000); //关闭客户端 client.close();
修改节点
//创建失败策略对象 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1); // CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",1000,1000,retryPolicy); client.start(); //修改节点 client.setData().forPath("/a/b", "abc".getBytes()); client.close();
节点数据查询
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1",1000,1000, retryPolicy); client.start(); // 查询节点数据 byte[] bytes = client.getData().forPath("/a/b"); System.out.println(new String(bytes));
删除节点
//重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1); //创建客户端 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1",1000,1000, retryPolicy); //启动客户端 client.start(); //删除一个子节点 client.delete().forPath("/a"); // 删除节点并递归删除其子节点 client.delete().deletingChildrenIfNeeded().forPath("/a"); //强制保证删除一个节点 //只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到节点删除成功。 // 比如遇到一些网络异常的情况,此guaranteed的强制删除就会很有效果。 client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/a"); //关闭客户端 client.close();
watch机制
NodeCache
//创建重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1); //创建客户端 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy); //开启客户端 client.start(); System.out.println("连接成功"); //创建节点数据监听对象 final NodeCache nodeCache = new NodeCache(client, "/hello"); //开始缓存 /** * 参数为true:可以直接获取监听的节点,System.out.println(nodeCache.getCurrentData());为ChildData{path='/aa', stat=607,765,1580205779732,1580973376268,2,1,0,0,5,1,608 , data=[97, 98, 99, 100, 101]} * 参数为false:不可以获取监听的节点,System.out.println(nodeCache.getCurrentData());为null */ nodeCache.start(true); System.out.println(nodeCache.getCurrentData()); //添加监听对象 nodeCache.getListenable().addListener(new NodeCacheListener() { //如果节点数据有变化,会回调该方法 public void nodeChanged() throws Exception { String data = new String(nodeCache.getCurrentData().getData()); System.out.println("数据Watcher:路径=" + nodeCache.getCurrentData().getPath() + ":data=" + data); } }); System.in.read();
PathChildrenCache
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy); client.start(); //监听指定节点的子节点变化情况包括新增子节点 子节点数据变更 和子节点删除 //true表示用于配置是否把节点内容缓存起来,如果配置为true,客户端在接收到节点列表变更的同时,也能够获取到节点的数据内容(即:event.getData().getData())ͺ如果为false 则无法取到数据内容(即:event.getData().getData()) PathChildrenCache childrenCache = new PathChildrenCache(client,"/hello",true); /** * NORMAL: 普通启动方式, 在启动时缓存子节点数据 * POST_INITIALIZED_EVENT:在启动时缓存子节点数据,提示初始化 * BUILD_INITIAL_CACHE: 在启动时什么都不会输出 * 在官方解释中说是因为这种模式会在start执行执行之前先执行rebuild的方法,而rebuild的方法不会发出任何事件通知。 */ childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); System.out.println(childrenCache.getCurrentData()); //添加监听 childrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { if(event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED){ System.out.println("子节点更新"); System.out.println("节点:"+event.getData().getPath()); System.out.println("数据" + new String(event.getData().getData())); }else if(event.getType() == PathChildrenCacheEvent.Type.INITIALIZED ){ System.out.println("初始化操作"); }else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED ){ System.out.println("删除子节点"); System.out.println("节点:"+event.getData().getPath()); System.out.println("数据" + new String(event.getData().getData())); }else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ){ System.out.println("添加子节点"); System.out.println("节点:"+event.getData().getPath()); System.out.println("数据" + new String(event.getData().getData())); }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED ){ System.out.println("连接失效"); }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED ){ System.out.println("重新连接"); }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST ){ System.out.println("连接失效后稍等一会儿执行"); } } }); System.in.read(); // 使线程阻塞
TreeCache
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy); client.start(); TreeCache treeCache = new TreeCache(client,"/hello"); treeCache.start(); System.out.println(treeCache.getCurrentData("/hello")); treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { if(event.getType() == TreeCacheEvent.Type.NODE_ADDED){ System.out.println(event.getData().getPath() + "节点添加"); }else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED){ System.out.println(event.getData().getPath() + "节点移除"); }else if(event.getType() == TreeCacheEvent.Type.NODE_UPDATED){ System.out.println(event.getData().getPath() + "节点修改"); }else if(event.getType() == TreeCacheEvent.Type.INITIALIZED){ System.out.println("初始化完成"); }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_SUSPENDED){ System.out.println("连接过时"); }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_RECONNECTED){ System.out.println("重新连接"); }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_LOST){ System.out.println("连接过时一段时间"); } } }); System.in.read();