ZooKeeper Java Api
ZooKeeper 是 Apache 的顶级项目。ZooKeeper 为分布式应用提供了高效且可靠的分布式协调服务,提供了诸如统一命名服务、配置管理和分布式锁等分布式的基础服务。在解决分布式数据一致性方面,ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议。
ZooKeeper 主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储。但是 ZooKeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控存储数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理。
很多大名鼎鼎的框架都基于 ZooKeeper 来实现分布式高可用,如:Dubbo、Kafka 等。
ZooKeeper 官方支持 Java 和 C 的 Client API。ZooKeeper 社区为大多数语言(.NET,python 等)提供非官方 API。
ZooKeeper 官方客户端
ZooKeeper 客户端简介
客户端和服务端交互遵循以下基本步骤:
- 客户端连接 ZooKeeper 服务端集群任意工作节点,该节点为客户端分配会话 ID。
- 为了保持通信,客户端需要和服务端保持心跳(实质上就是 ping )。否则,ZooKeeper 服务会话超时时间内未收到客户端请求,会将会话视为过期。这种情况下,客户端如果要通信,就需要重新连接。
- 只要会话 ID 处于活动状态,就可以执行读写 znode 操作。
- 所有任务完成后,客户端断开与 ZooKeeper 服务端集群的连接。如果客户端长时间不活动,则 ZooKeeper 集合将自动断开客户端。
ZooKeeper 官方客户端的核心是 ZooKeeper
类。它在其构造函数中提供了连接 ZooKeeper 服务的配置选项,并提供了访问 ZooKeeper 数据的方法。
其主要操作如下:
connect
- 连接 ZooKeeper 服务
create
- 创建 znode
exists
- 检查 znode 是否存在及其信息
getACL
/ setACL
- 获取/设置一个 znode 的 ACL
getData
/ setData
- 获取/设置一个 znode 的数据
getChildren
- 获取特定 znode 中的所有子节点
delete
- 删除特定的 znode 及其所有子项
close
- 关闭连接
ZooKeeper 官方客户端的使用方法是在 maven 项目的 pom.xml 中添加:
1 2 3 4 5
| <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.7.0</version> </dependency>
|
创建连接
ZooKeeper 类通过其构造函数提供连接 ZooKeeper 服务的功能。其构造函数的定义如下:
1
| ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
|
参数说明:
connectionString
- ZooKeeper 集群的主机列表。
sessionTimeout
- 会话超时时间(以毫秒为单位)。
- watcher - 实现监视机制的回调。当被监控的 znode 状态发生变化时,ZooKeeper 服务端的
WatcherManager
会主动调用传入的 Watcher ,推送状态变化给客户端。
【示例】连接 ZooKeeper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.junit.jupiter.api.*;
import java.io.IOException; import java.util.concurrent.CountDownLatch;
@DisplayName("ZooKeeper 官方客户端测试例") public class ZooKeeperTest {
private static ZooKeeper zk;
@BeforeAll public static void init() throws IOException, InterruptedException { final String HOST = "localhost:2181"; CountDownLatch latch = new CountDownLatch(1); zk = new ZooKeeper(HOST, 5000, watcher -> { if (watcher.getState() == Watcher.Event.KeeperState.SyncConnected) { latch.countDown(); } }); latch.await(); }
@AfterAll public static void destroy() throws InterruptedException { if (zk != null) { zk.close(); } }
@Test public void getState() { ZooKeeper.States state = zk.getState(); Assertions.assertTrue(state.isAlive()); }
}
|
说明:
添加一个 connect
方法,用于创建一个 ZooKeeper
对象,用于连接到 ZooKeeper 服务。
这里 CountDownLatch
用于停止(等待)主进程,直到客户端与 ZooKeeper 集合连接。
ZooKeeper
对象通过监听器回调来监听连接状态。一旦客户端与 ZooKeeper 建立连接,监听器回调就会被调用;并且监听器回调函数调用 CountDownLatch
的 countDown
方法来释放锁,在主进程中 await
。
节点增删改查
判断节点是否存在
ZooKeeper 类提供了 exists
方法来检查 znode 的存在。如果指定的 znode 存在,则返回一个 znode 的元数据。
exists
方法的签名如下:
1
| exists(String path, boolean watcher)
|
- path- Znode 路径
- watcher - 布尔值,用于指定是否监视指定的 znode
【示例】
1 2
| Stat stat = zk.exists("/", true); Assertions.assertNotNull(stat);
|
创建节点
ZooKeeper
类的 create
方法用于在 ZooKeeper 中创建一个新节点(znode)。
create
方法的签名如下:
1
| create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
|
- path - Znode 路径。例如,/myapp1,/myapp2,/myapp1/mydata1,myapp2/mydata1/myanothersubdata
- data - 要存储在指定 znode 路径中的数据
- acl - 要创建的节点的访问控制列表。ZooKeeper API 提供了一个静态接口 ZooDefs.Ids 来获取一些基本的 acl 列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE 返回打开 znode 的 acl 列表。
- createMode - 节点的类型,即临时,顺序或两者。这是一个枚举。
【示例】
1 2 3 4 5 6
| private static final String path = "/mytest";
String text = "My first zookeeper app"; zk.create(path, text.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Stat stat = zk.exists(path, true); Assertions.assertNotNull(stat);
|
删除节点
ZooKeeper 类提供了 delete
方法来删除指定的 znode。
delete
方法的签名如下:
1
| delete(String path, int version)
|
- path - Znode 路径。
- version - znode 的当前版本。
让我们创建一个新的 Java 应用程序来了解 ZooKeeper API 的 delete 功能。创建文件 ZKDelete.java 。在 main 方法中,使用 ZooKeeperConnection 对象创建一个 ZooKeeper 对象 zk 。然后,使用指定的路径和版本号调用 zk 对象的 delete 方法。
删除 znode 的完整程序代码如下:
【示例】
1 2 3
| zk.delete(path, zk.exists(path, true).getVersion()); Stat stat = zk.exists(path, true); Assertions.assertNull(stat);
|
获取节点数据
ZooKeeper 类提供 getData 方法来获取附加在指定 znode 中的数据及其状态。 getData 方法的签名如下:
1
| getData(String path, Watcher watcher, Stat stat)
|
- path - Znode 路径。
- watcher - 监听器类型的回调函数。当指定的 znode 的数据改变时,ZooKeeper 集合将通过监听器回调进行通知。这是一次性通知。
- stat - 返回 znode 的元数据。
【示例】
1 2 3 4
| byte[] data = zk.getData(path, false, null); String text1 = new String(data); Assertions.assertEquals(text, text1); System.out.println(text1);
|
设置节点数据
ZooKeeper 类提供 setData 方法来修改指定 znode 中附加的数据。 setData 方法的签名如下:
1
| setData(String path, byte[] data, int version)
|
- path- Znode 路径
- data - 要存储在指定 znode 路径中的数据。
- version- znode 的当前版本。每当数据更改时,ZooKeeper 会更新 znode 的版本号。
【示例】
1 2 3 4 5 6 7 8
| String text = "含子节点的节点"; zk.create(path, text.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.create(path + "/1", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.create(path + "/2", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); List<String> actualList = zk.getChildren(path, false); for (String child : actualList) { System.out.println(child); }
|
获取子节点
ZooKeeper 类提供 getChildren 方法来获取特定 znode 的所有子节点。 getChildren 方法的签名如下:
1
| getChildren(String path, Watcher watcher)
|
- path - Znode 路径。
- watcher - 监听器类型的回调函数。当指定的 znode 被删除或 znode 下的子节点被创建/删除时,ZooKeeper 集合将进行通知。这是一次性通知。
【示例】
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Test public void getChildren() throws InterruptedException, KeeperException { byte[] data = "My first zookeeper app".getBytes(); zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.create(path + "/1", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.create(path + "/2", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); List<String> actualList = zk.getChildren(path, false); List<String> expectedList = CollectionUtil.newArrayList("1", "2"); Assertions.assertTrue(CollectionUtil.containsAll(expectedList, actualList)); for (String child : actualList) { System.out.println(child); } }
|
Curator 客户端
Curator 客户端简介
Curator 客户端的使用方法是在 maven 项目的 pom.xml 中添加:
1 2 3 4 5
| <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.1.0</version> </dependency>
|
创建连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.retry.RetryNTimes; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.junit.jupiter.api.*;
import java.nio.charset.StandardCharsets;
public class CuratorTest {
private static CuratorFramework client = null; private static final String path = "/mytest";
@BeforeAll public static void init() { RetryPolicy retryPolicy = new RetryNTimes(3, 5000); client = CuratorFrameworkFactory.builder() .connectString("localhost:2181") .sessionTimeoutMs(10000).retryPolicy(retryPolicy) .namespace("workspace").build(); client.start(); }
@AfterAll public static void destroy() { if (client != null) { client.close(); } }
}
|
节点增删改查
判断节点是否存在
1 2
| Stat stat = client.checkExists().forPath(path); Assertions.assertNull(stat);
|
判读服务状态
1 2
| CuratorFrameworkState state = client.getState(); Assertions.assertEquals(CuratorFrameworkState.STARTED, state);
|
创建节点
1 2 3 4 5 6
| String text = "Hello World"; client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(path, text.getBytes(StandardCharsets.UTF_8));
|
删除节点
1 2 3 4 5
| client.delete() .guaranteed() .deletingChildrenIfNeeded() .withVersion(stat.getVersion()) .forPath(path);
|
获取节点数据
1 2 3
| byte[] data = client.getData().forPath(path); Assertions.assertEquals(text, new String(data)); System.out.println("修改前的节点数据:" + new String(data));
|
设置节点数据
1 2 3 4
| String text2 = "try again"; client.setData() .withVersion(client.checkExists().forPath(path).getVersion()) .forPath(path, text2.getBytes(StandardCharsets.UTF_8));
|
获取子节点
1 2 3 4 5 6
| List<String> children = client.getChildren().forPath(path); for (String s : children) { System.out.println(s); } List<String> expectedList = CollectionUtil.newArrayList("1", "2"); Assertions.assertTrue(CollectionUtil.containsAll(expectedList, children));
|
监听事件
创建一次性监听
和 Zookeeper 原生监听一样,使用 usingWatcher
注册的监听是一次性的,即监听只会触发一次,触发后就销毁。
【示例】
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| client.getData().usingWatcher(new CuratorWatcher() { public void process(WatchedEvent event) { System.out.println("节点 " + event.getPath() + " 发生了事件:" + event.getType()); } }).forPath(path);
client.setData() .withVersion(client.checkExists().forPath(path).getVersion()) .forPath(path, "第一次修改".getBytes(StandardCharsets.UTF_8));
client.setData() .withVersion(client.checkExists().forPath(path).getVersion()) .forPath(path, "第二次修改".getBytes(StandardCharsets.UTF_8));
|
输出
1
| 节点 /mytest 发生了事件:NodeDataChanged
|
说明
修改两次数据,但是监听器只会监听第一次修改。
创建永久监听
Curator 还提供了创建永久监听的 API,其使用方式如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| CuratorCache curatorCache = CuratorCache.builder(client, path).build(); PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework framework, PathChildrenCacheEvent event) throws Exception { System.out.println("节点 " + event.getData().getPath() + " 发生了事件:" + event.getType()); } }; CuratorCacheListener listener = CuratorCacheListener.builder() .forPathChildrenCache(path, client, pathChildrenCacheListener) .build(); curatorCache.listenable().addListener(listener); curatorCache.start();
client.setData() .withVersion(client.checkExists().forPath(path).getVersion()) .forPath(path, "第一次修改".getBytes(StandardCharsets.UTF_8));
client.setData() .withVersion(client.checkExists().forPath(path).getVersion()) .forPath(path, "第二次修改".getBytes(StandardCharsets.UTF_8));
|
监听子节点
这里以监听 /hadoop
下所有子节点为例,实现方式如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| String text = "Hello World"; client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(path, text.getBytes(StandardCharsets.UTF_8)); client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(path + "/1", text.getBytes(StandardCharsets.UTF_8)); client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(path + "/2", text.getBytes(StandardCharsets.UTF_8));
PathChildrenCache childrenCache = new PathChildrenCache(client, path, true);
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
List<ChildData> childDataList = childrenCache.getCurrentData(); System.out.println("当前数据节点的子节点列表:"); childDataList.forEach(x -> System.out.println(x.getPath()));
childrenCache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { switch (event.getType()) { case INITIALIZED: System.out.println("childrenCache 初始化完成"); break; case CHILD_ADDED: System.out.println("增加子节点:" + event.getData().getPath()); break; case CHILD_REMOVED: System.out.println("删除子节点:" + event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("被修改的子节点的路径:" + event.getData().getPath()); System.out.println("修改后的数据:" + new String(event.getData().getData())); break; } } });
client.setData() .forPath(path + "/1", "第一次修改".getBytes(StandardCharsets.UTF_8));
client.setData() .forPath(path + "/1", "第二次修改".getBytes(StandardCharsets.UTF_8));
|
ACL 权限管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| public class AclOperation {
private CuratorFramework client = null; private static final String zkServerPath = "192.168.0.226:2181"; private static final String nodePath = "/mytest/hdfs";
@Before public void prepare() { RetryPolicy retryPolicy = new RetryNTimes(3, 5000); client = CuratorFrameworkFactory.builder() .authorization("digest", "heibai:123456".getBytes()) .connectString(zkServerPath) .sessionTimeoutMs(10000).retryPolicy(retryPolicy) .namespace("workspace").build(); client.start(); }
@Test public void createNodesWithAcl() throws Exception { List<ACL> aclList = new ArrayList<>(); String digest1 = DigestAuthenticationProvider.generateDigest("heibai:123456"); String digest2 = DigestAuthenticationProvider.generateDigest("ying:123456"); Id user01 = new Id("digest", digest1); Id user02 = new Id("digest", digest2); aclList.add(new ACL(Perms.ALL, user01)); aclList.add(new ACL(Perms.DELETE | Perms.CREATE, user02));
byte[] data = "abc".getBytes(); client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(aclList, true) .forPath(nodePath, data); }
@Test public void SetAcl() throws Exception { String digest = DigestAuthenticationProvider.generateDigest("admin:admin"); Id user = new Id("digest", digest); client.setACL() .withACL(Collections.singletonList(new ACL(Perms.READ | Perms.DELETE, user))) .forPath(nodePath); }
@Test public void getAcl() throws Exception { List<ACL> aclList = client.getACL().forPath(nodePath); ACL acl = aclList.get(0); System.out.println(acl.getId().getId() + "是否有删读权限:" + (acl.getPerms() == (Perms.READ | Perms.DELETE))); }
@After public void destroy() { if (client != null) { client.close(); } } }
|
参考资料