ZooKeeperJavaApi

ZooKeeper Java Api

ZooKeeper 是 Apache 的顶级项目。ZooKeeper 为分布式应用提供了高效且可靠的分布式协调服务,提供了诸如统一命名服务、配置管理和分布式锁等分布式的基础服务。在解决分布式数据一致性方面,ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议

ZooKeeper 主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储。但是 ZooKeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控存储数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理

很多大名鼎鼎的框架都基于 ZooKeeper 来实现分布式高可用,如:Dubbo、Kafka 等。

ZooKeeper 官方支持 Java 和 C 的 Client API。ZooKeeper 社区为大多数语言(.NET,python 等)提供非官方 API。

ZooKeeper 官方客户端

ZooKeeper 客户端简介

客户端和服务端交互遵循以下基本步骤:

  1. 客户端连接 ZooKeeper 服务端集群任意工作节点,该节点为客户端分配会话 ID。
  2. 为了保持通信,客户端需要和服务端保持心跳(实质上就是 ping )。否则,ZooKeeper 服务会话超时时间内未收到客户端请求,会将会话视为过期。这种情况下,客户端如果要通信,就需要重新连接。
  3. 只要会话 ID 处于活动状态,就可以执行读写 znode 操作。
  4. 所有任务完成后,客户端断开与 ZooKeeper 服务端集群的连接。如果客户端长时间不活动,则 ZooKeeper 集合将自动断开客户端。

ZooKeeper 官方客户端的核心是 ZooKeeper。它在其构造函数中提供了连接 ZooKeeper 服务的配置选项,并提供了访问 ZooKeeper 数据的方法。

其主要操作如下:

  • connect - 连接 ZooKeeper 服务
  • create - 创建 znode
  • exists - 检查 znode 是否存在及其信息
  • getACL / setACL- 获取/设置一个 znode 的 ACL
  • getData / setData- 获取/设置一个 znode 的数据
  • getChildren - 获取特定 znode 中的所有子节点
  • delete - 删除特定的 znode 及其所有子项
  • close - 关闭连接

ZooKeeper 官方客户端的使用方法是在 maven 项目的 pom.xml 中添加:

1
2
3
4
5
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.0</version>
</dependency>

创建连接

ZooKeeper 类通过其构造函数提供连接 ZooKeeper 服务的功能。其构造函数的定义如下:

1
ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)

参数说明:

  • connectionString - ZooKeeper 集群的主机列表。
  • sessionTimeout - 会话超时时间(以毫秒为单位)。
  • watcher - 实现监视机制的回调。当被监控的 znode 状态发生变化时,ZooKeeper 服务端的 WatcherManager 会主动调用传入的 Watcher ,推送状态变化给客户端。

【示例】连接 ZooKeeper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.jupiter.api.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
* ZooKeeper 官方客户端测试例
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @since 2022-02-19
*/
@DisplayName("ZooKeeper 官方客户端测试例")
public class ZooKeeperTest {

/**
* ZooKeeper 连接实例
*/
private static ZooKeeper zk;

/**
* 创建 ZooKeeper 连接
*/
@BeforeAll
public static void init() throws IOException, InterruptedException {
final String HOST = "localhost:2181";
CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper(HOST, 5000, watcher -> {
if (watcher.getState() == Watcher.Event.KeeperState.SyncConnected) {
latch.countDown();
}
});
latch.await();
}

/**
* 关闭 ZooKeeper 连接
*/
@AfterAll
public static void destroy() throws InterruptedException {
if (zk != null) {
zk.close();
}
}

/**
* 建立连接
*/
@Test
public void getState() {
ZooKeeper.States state = zk.getState();
Assertions.assertTrue(state.isAlive());
}

}

说明:

添加一个 connect 方法,用于创建一个 ZooKeeper 对象,用于连接到 ZooKeeper 服务。

这里 CountDownLatch 用于停止(等待)主进程,直到客户端与 ZooKeeper 集合连接。

ZooKeeper 对象通过监听器回调来监听连接状态。一旦客户端与 ZooKeeper 建立连接,监听器回调就会被调用;并且监听器回调函数调用 CountDownLatchcountDown 方法来释放锁,在主进程中 await

节点增删改查

判断节点是否存在

ZooKeeper 类提供了 exists 方法来检查 znode 的存在。如果指定的 znode 存在,则返回一个 znode 的元数据。

exists 方法的签名如下:

1
exists(String path, boolean watcher)
  • path- Znode 路径
  • watcher - 布尔值,用于指定是否监视指定的 znode

【示例】

1
2
Stat stat = zk.exists("/", true);
Assertions.assertNotNull(stat);

创建节点

ZooKeeper 类的 create 方法用于在 ZooKeeper 中创建一个新节点(znode)。

create 方法的签名如下:

1
create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
  • path - Znode 路径。例如,/myapp1,/myapp2,/myapp1/mydata1,myapp2/mydata1/myanothersubdata
  • data - 要存储在指定 znode 路径中的数据
  • acl - 要创建的节点的访问控制列表。ZooKeeper API 提供了一个静态接口 ZooDefs.Ids 来获取一些基本的 acl 列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE 返回打开 znode 的 acl 列表。
  • createMode - 节点的类型,即临时,顺序或两者。这是一个枚举

【示例】

1
2
3
4
5
6
private static final String path = "/mytest";

String text = "My first zookeeper app";
zk.create(path, text.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Stat stat = zk.exists(path, true);
Assertions.assertNotNull(stat);

删除节点

ZooKeeper 类提供了 delete 方法来删除指定的 znode。

delete 方法的签名如下:

1
delete(String path, int version)
  • path - Znode 路径。
  • version - znode 的当前版本。

让我们创建一个新的 Java 应用程序来了解 ZooKeeper API 的 delete 功能。创建文件 ZKDelete.java 。在 main 方法中,使用 ZooKeeperConnection 对象创建一个 ZooKeeper 对象 zk 。然后,使用指定的路径和版本号调用 zk 对象的 delete 方法。

删除 znode 的完整程序代码如下:

【示例】

1
2
3
zk.delete(path, zk.exists(path, true).getVersion());
Stat stat = zk.exists(path, true);
Assertions.assertNull(stat);

获取节点数据

ZooKeeper 类提供 getData 方法来获取附加在指定 znode 中的数据及其状态。 getData 方法的签名如下:

1
getData(String path, Watcher watcher, Stat stat)
  • path - Znode 路径。
  • watcher - 监听器类型的回调函数。当指定的 znode 的数据改变时,ZooKeeper 集合将通过监听器回调进行通知。这是一次性通知。
  • stat - 返回 znode 的元数据。

【示例】

1
2
3
4
byte[] data = zk.getData(path, false, null);
String text1 = new String(data);
Assertions.assertEquals(text, text1);
System.out.println(text1);

设置节点数据

ZooKeeper 类提供 setData 方法来修改指定 znode 中附加的数据。 setData 方法的签名如下:

1
setData(String path, byte[] data, int version)
  • path- Znode 路径
  • data - 要存储在指定 znode 路径中的数据。
  • version- znode 的当前版本。每当数据更改时,ZooKeeper 会更新 znode 的版本号。

【示例】

1
2
3
4
5
6
7
8
String text = "含子节点的节点";
zk.create(path, text.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(path + "/1", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(path + "/2", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
List<String> actualList = zk.getChildren(path, false);
for (String child : actualList) {
System.out.println(child);
}

获取子节点

ZooKeeper 类提供 getChildren 方法来获取特定 znode 的所有子节点。 getChildren 方法的签名如下:

1
getChildren(String path, Watcher watcher)
  • path - Znode 路径。
  • watcher - 监听器类型的回调函数。当指定的 znode 被删除或 znode 下的子节点被创建/删除时,ZooKeeper 集合将进行通知。这是一次性通知。

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void getChildren() throws InterruptedException, KeeperException {
byte[] data = "My first zookeeper app".getBytes();
zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(path + "/1", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(path + "/2", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
List<String> actualList = zk.getChildren(path, false);
List<String> expectedList = CollectionUtil.newArrayList("1", "2");
Assertions.assertTrue(CollectionUtil.containsAll(expectedList, actualList));
for (String child : actualList) {
System.out.println(child);
}
}

Curator 客户端

Curator 客户端简介

Curator 客户端的使用方法是在 maven 项目的 pom.xml 中添加:

1
2
3
4
5
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>

创建连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.junit.jupiter.api.*;

import java.nio.charset.StandardCharsets;

public class CuratorTest {

/**
* Curator ZooKeeper 连接实例
*/
private static CuratorFramework client = null;
private static final String path = "/mytest";

/**
* 创建连接
*/
@BeforeAll
public static void init() {
// 重试策略
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build(); //指定命名空间后,client 的所有路径操作都会以 /workspace 开头
client.start();
}

/**
* 关闭连接
*/
@AfterAll
public static void destroy() {
if (client != null) {
client.close();
}
}

}

节点增删改查

判断节点是否存在

1
2
Stat stat = client.checkExists().forPath(path);
Assertions.assertNull(stat);

判读服务状态

1
2
CuratorFrameworkState state = client.getState();
Assertions.assertEquals(CuratorFrameworkState.STARTED, state);

创建节点

1
2
3
4
5
6
// 创建节点
String text = "Hello World";
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) //节点类型
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path, text.getBytes(StandardCharsets.UTF_8));

删除节点

1
2
3
4
5
client.delete()
.guaranteed() // 如果删除失败,会继续执行,直到成功
.deletingChildrenIfNeeded() // 如果有子节点,则递归删除
.withVersion(stat.getVersion()) // 传入版本号,如果版本号错误则拒绝删除操作,并抛出 BadVersion 异常
.forPath(path);

获取节点数据

1
2
3
byte[] data = client.getData().forPath(path);
Assertions.assertEquals(text, new String(data));
System.out.println("修改前的节点数据:" + new String(data));

设置节点数据

1
2
3
4
String text2 = "try again";
client.setData()
.withVersion(client.checkExists().forPath(path).getVersion())
.forPath(path, text2.getBytes(StandardCharsets.UTF_8));

获取子节点

1
2
3
4
5
6
List<String> children = client.getChildren().forPath(path);
for (String s : children) {
System.out.println(s);
}
List<String> expectedList = CollectionUtil.newArrayList("1", "2");
Assertions.assertTrue(CollectionUtil.containsAll(expectedList, children));

监听事件

创建一次性监听

和 Zookeeper 原生监听一样,使用 usingWatcher 注册的监听是一次性的,即监听只会触发一次,触发后就销毁。

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 设置监听器
client.getData().usingWatcher(new CuratorWatcher() {
public void process(WatchedEvent event) {
System.out.println("节点 " + event.getPath() + " 发生了事件:" + event.getType());
}
}).forPath(path);

// 第一次修改
client.setData()
.withVersion(client.checkExists().forPath(path).getVersion())
.forPath(path, "第一次修改".getBytes(StandardCharsets.UTF_8));

// 第二次修改
client.setData()
.withVersion(client.checkExists().forPath(path).getVersion())
.forPath(path, "第二次修改".getBytes(StandardCharsets.UTF_8));

输出

1
节点 /mytest 发生了事件:NodeDataChanged

说明

修改两次数据,但是监听器只会监听第一次修改。

创建永久监听

Curator 还提供了创建永久监听的 API,其使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 设置监听器
CuratorCache curatorCache = CuratorCache.builder(client, path).build();
PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework framework, PathChildrenCacheEvent event) throws Exception {
System.out.println("节点 " + event.getData().getPath() + " 发生了事件:" + event.getType());
}
};
CuratorCacheListener listener = CuratorCacheListener.builder()
.forPathChildrenCache(path, client,
pathChildrenCacheListener)
.build();
curatorCache.listenable().addListener(listener);
curatorCache.start();

// 第一次修改
client.setData()
.withVersion(client.checkExists().forPath(path).getVersion())
.forPath(path, "第一次修改".getBytes(StandardCharsets.UTF_8));

// 第二次修改
client.setData()
.withVersion(client.checkExists().forPath(path).getVersion())
.forPath(path, "第二次修改".getBytes(StandardCharsets.UTF_8));

监听子节点

这里以监听 /hadoop 下所有子节点为例,实现方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 创建节点
String text = "Hello World";
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) //节点类型
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path, text.getBytes(StandardCharsets.UTF_8));
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) //节点类型
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path + "/1", text.getBytes(StandardCharsets.UTF_8));
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) //节点类型
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path + "/2", text.getBytes(StandardCharsets.UTF_8));

// 设置监听器
// 第三个参数代表除了节点状态外,是否还缓存节点内容
PathChildrenCache childrenCache = new PathChildrenCache(client, path, true);
/*
* StartMode 代表初始化方式:
* NORMAL: 异步初始化
* BUILD_INITIAL_CACHE: 同步初始化
* POST_INITIALIZED_EVENT: 异步并通知,初始化之后会触发 INITIALIZED 事件
*/
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

List<ChildData> childDataList = childrenCache.getCurrentData();
System.out.println("当前数据节点的子节点列表:");
childDataList.forEach(x -> System.out.println(x.getPath()));

childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
switch (event.getType()) {
case INITIALIZED:
System.out.println("childrenCache 初始化完成");
break;
case CHILD_ADDED:
// 需要注意的是: 即使是之前已经存在的子节点,也会触发该监听,因为会把该子节点加入 childrenCache 缓存中
System.out.println("增加子节点:" + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("删除子节点:" + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("被修改的子节点的路径:" + event.getData().getPath());
System.out.println("修改后的数据:" + new String(event.getData().getData()));
break;
}
}
});

// 第一次修改
client.setData()
.forPath(path + "/1", "第一次修改".getBytes(StandardCharsets.UTF_8));

// 第二次修改
client.setData()
.forPath(path + "/1", "第二次修改".getBytes(StandardCharsets.UTF_8));

ACL 权限管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class AclOperation {

private CuratorFramework client = null;
private static final String zkServerPath = "192.168.0.226:2181";
private static final String nodePath = "/mytest/hdfs";

@Before
public void prepare() {
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.authorization("digest", "heibai:123456".getBytes()) //等价于 addauth 命令
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
}

/**
* 新建节点并赋予权限
*/
@Test
public void createNodesWithAcl() throws Exception {
List<ACL> aclList = new ArrayList<>();
// 对密码进行加密
String digest1 = DigestAuthenticationProvider.generateDigest("heibai:123456");
String digest2 = DigestAuthenticationProvider.generateDigest("ying:123456");
Id user01 = new Id("digest", digest1);
Id user02 = new Id("digest", digest2);
// 指定所有权限
aclList.add(new ACL(Perms.ALL, user01));
// 如果想要指定权限的组合,中间需要使用 | ,这里的|代表的是位运算中的 按位或
aclList.add(new ACL(Perms.DELETE | Perms.CREATE, user02));

// 创建节点
byte[] data = "abc".getBytes();
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(aclList, true)
.forPath(nodePath, data);
}


/**
* 给已有节点设置权限,注意这会删除所有原来节点上已有的权限设置
*/
@Test
public void SetAcl() throws Exception {
String digest = DigestAuthenticationProvider.generateDigest("admin:admin");
Id user = new Id("digest", digest);
client.setACL()
.withACL(Collections.singletonList(new ACL(Perms.READ | Perms.DELETE, user)))
.forPath(nodePath);
}

/**
* 获取权限
*/
@Test
public void getAcl() throws Exception {
List<ACL> aclList = client.getACL().forPath(nodePath);
ACL acl = aclList.get(0);
System.out.println(acl.getId().getId()
+ "是否有删读权限:" + (acl.getPerms() == (Perms.READ | Perms.DELETE)));
}

@After
public void destroy() {
if (client != null) {
client.close();
}
}
}

参考资料