Dunwu Blog

大道至简,知易行难

ZooKeeper 原理

ZooKeeper 是 Apache 的顶级项目。ZooKeeper 为分布式应用提供了高效且可靠的分布式协调服务,提供了诸如统一命名服务、配置管理和分布式锁等分布式的基础服务。在解决分布式数据一致性方面,ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议

ZooKeeper 主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储。但是 ZooKeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控存储数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理

很多大名鼎鼎的框架都基于 ZooKeeper 来实现分布式高可用,如:Dubbo、Kafka 等。

ZooKeeper 官方支持 Java 和 C 的 Client API。ZooKeeper 社区为大多数语言(.NET,python 等)提供非官方 API。

ZooKeeper 简介

ZooKeeper 是什么

ZooKeeper 是 Apache 的顶级项目。ZooKeeper 为分布式应用提供了高效且可靠的分布式协调服务,提供了诸如统一命名服务、配置管理和分布式锁等分布式的基础服务。在解决分布式数据一致性方面,ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议

ZooKeeper 主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储。但是 ZooKeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控存储数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理

很多大名鼎鼎的框架都基于 ZooKeeper 来实现分布式高可用,如:Dubbo、Kafka 等。

ZooKeeper 的特性

ZooKeeper 具有以下特性:

  • 顺序一致性:所有客户端看到的服务端数据模型都是一致的;从一个客户端发起的事务请求,最终都会严格按照其发起顺序被应用到 ZooKeeper 中。具体的实现可见:原子广播
  • 原子性 - 所有事务请求的处理结果在整个集群中所有机器上的应用情况是一致的,即整个集群要么都成功应用了某个事务,要么都没有应用。 实现方式可见:事务
  • 单一视图 - 无论客户端连接的是哪个 Zookeeper 服务器,其看到的服务端数据模型都是一致的。
  • 高性能 - ZooKeeper 将数据全量存储在内存中,所以其性能很高。需要注意的是:由于 ZooKeeper 的所有更新和删除都是基于事务的,因此 ZooKeeper 在读多写少的应用场景中有性能表现较好,如果写操作频繁,性能会大大下滑
  • 高可用 - ZooKeeper 的高可用是基于副本机制实现的,此外 ZooKeeper 支持故障恢复,可见:选举 Leader

ZooKeeper 的应用场景

  • 配置管理
    • 集群节点可以通过中心源获取启动配置
    • 更简单的部署
  • 分布式集群管理
    • 节点加入/离开
    • 节点的实时状态
  • 命名服务,如:DNS
  • 分布式同步:如锁、栅栏、队列
  • 分布式系统的选主
  • 中心化和高可靠的数据注册

ZooKeeper 的设计目标

  • 简单的数据模型:ZooKeeper 的数据模型是一个树形结构的文件系统,树中的节点被称为 **znode**。
  • 可以构建集群:ZooKeeper 支持集群模式,可以通过伸缩性,来控制集群的吞吐量。需要注意的是:由于 ZooKeeper 采用一主多从架构,所以其写性能是有上限的,比较适合于读多写少的场景。
  • 顺序访问:对于来自客户端的每个更新请求,Zookeeper 都会分配一个全局唯一的递增 ID,这个 ID 反映了所有事务请求的先后顺序。
  • 高性能、高可用:ZooKeeper 将数据存全量储在内存中以保持高性能,并通过服务集群来实现高可用,由于 Zookeeper 的所有更新和删除都是基于事务的,所以其在读多写少的应用场景中有着很高的性能表现。

ZooKeeper 核心概念

服务

Zookeeper 服务是一个基于主从复制的高可用集群,集群中每个节点都存储了一份数据副本(内存中)。

客户端只会连接一个 ZooKeeper 服务器节点,并维持 TCP 连接。

数据模型

ZooKeeper 的数据模型是一个树形结构的文件系统

树中的节点被称为 znode,其中根节点为 /,每个节点上都会保存自己的数据和节点信息。znode 可以用于存储数据,并且有一个与之相关联的 ACL(详情可见 ACL)。ZooKeeper 的设计目标是实现协调服务,而不是真的作为一个文件存储,因此 znode 存储数据的大小被限制在 1MB 以内

ZooKeeper 的数据访问具有原子性。其读写操作都是要么全部成功,要么全部失败。

znode 通过路径被引用。znode 节点路径必须是绝对路径

znode 有两种类型:

  • 临时的( EPHEMERAL - 户端会话结束时,ZooKeeper 就会删除临时的 znode。不允许有子节点。
  • 持久的(PERSISTENT - 除非客户端主动执行删除操作,否则 ZooKeeper 不会删除持久的 znode。

节点信息

znode 上有一个顺序标志( SEQUENTIAL。如果在创建 znode 时,设置了顺序标志( SEQUENTIAL,那么 ZooKeeper 会使用计数器为 znode 添加一个单调递增的数值,即 zxid。ZooKeeper 正是利用 zxid 实现了严格的顺序访问控制能力。

每个 znode 节点在存储数据的同时,都会维护一个叫做 Stat 的数据结构,里面存储了关于该节点的全部状态信息。如下:

状态属性 说明
czxid 数据节点创建时的事务 ID
ctime 数据节点创建时的时间
mzxid 数据节点最后一次更新时的事务 ID
mtime 数据节点最后一次更新时的时间
pzxid 数据节点的子节点最后一次被修改时的事务 ID
cversion 子节点的更改次数
version 节点数据的更改次数
aversion 节点的 ACL 的更改次数
ephemeralOwner 如果节点是临时节点,则表示创建该节点的会话的 SessionID;如果节点是持久节点,则该属性值为 0
dataLength 数据内容的长度
numChildren 数据节点当前的子节点个数

集群角色

Zookeeper 集群是一个基于主从复制的高可用集群,集群中每个节点都存储了一份数据副本(内存中)。此外,每个服务器节点承担如下三种角色中的一种:

  • Leader - 它负责 发起并维护与各 Follwer 及 Observer 间的心跳。所有的写操作必须要通过 Leader 完成再由 Leader 将写操作广播给其它服务器。一个 Zookeeper 集群同一时间只会有一个实际工作的 Leader。
  • Follower - 它会响应 Leader 的心跳。Follower 可直接处理并返回客户端的读请求,同时会将写请求转发给 Leader 处理,并且负责在 Leader 处理写请求时对请求进行投票。一个 Zookeeper 集群可能同时存在多个 Follower。
  • Observer - 角色与 Follower 类似,但是无投票权。

客户端可以从任意 ZooKeeper 服务器节点读取数据,但只能通过 Leader 服务写数据并需要半数以上 Follower 的 ACK,才算写入成功。记住这个重要的知识点,下文会详细讲述。

ACL

ZooKeeper 采用 ACL(Access Control Lists)策略来进行权限控制

每个 znode 创建时都会带有一个 ACL 列表,用于决定谁可以对它执行何种操作。

ACL 依赖于 ZooKeeper 的客户端认证机制。ZooKeeper 提供了以下几种认证方式:

  • digest - 用户名和密码 来识别客户端
  • sasl - 通过 kerberos 来识别客户端
  • ip - 通过 IP 来识别客户端

ZooKeeper 定义了如下五种权限:

  • CREATE - 允许创建子节点;
  • READ - 允许从节点获取数据并列出其子节点;
  • WRITE - 允许为节点设置数据;
  • DELETE - 允许删除子节点;
  • ADMIN - 允许为节点设置权限。

ZooKeeper 工作原理

读操作

Leader/Follower/Observer 都可直接处理读请求,从本地内存中读取数据并返回给客户端即可

由于处理读请求不需要服务器之间的交互,Follower/Observer 越多,整体系统的读请求吞吐量越大,也即读性能越好。

写操作

所有的写请求实际上都要交给 Leader 处理。Leader 将写请求以事务形式发给所有 Follower 并等待 ACK,一旦收到半数以上 Follower 的 ACK,即认为写操作成功。

写 Leader

由上图可见,通过 Leader 进行写操作,主要分为五步:

  1. 客户端向 Leader 发起写请求
  2. Leader 将写请求以事务 Proposal 的形式发给所有 Follower 并等待 ACK
  3. Follower 收到 Leader 的事务 Proposal 后返回 ACK
  4. Leader 得到过半数的 ACK(Leader 对自己默认有一个 ACK)后向所有的 Follower 和 Observer 发送 Commmit
  5. Leader 将处理结果返回给客户端

注意

  • Leader 不需要得到 Observer 的 ACK,即 Observer 无投票权。
  • Leader 不需要得到所有 Follower 的 ACK,只要收到过半的 ACK 即可,同时 Leader 本身对自己有一个 ACK。上图中有 4 个 Follower,只需其中两个返回 ACK 即可,因为 $$(2+1) / (4+1) > 1/2$$ 。
  • Observer 虽然无投票权,但仍须同步 Leader 的数据从而在处理读请求时可以返回尽可能新的数据。

写 Follower/Observer

  • Follower/Observer 均可接受写请求,但不能直接处理,而需要将写请求转发给 Leader 处理。
  • 除了多了一步请求转发,其它流程与直接写 Leader 无任何区别。

事务

对于来自客户端的每个更新请求,ZooKeeper 具备严格的顺序访问控制能力。

为了保证事务的顺序一致性,ZooKeeper 采用了递增的事务 id 号(zxid)来标识事务

Leader 服务会为每一个 Follower 服务器分配一个单独的队列,然后将事务 Proposal 依次放入队列中,并根据 FIFO(先进先出) 的策略进行消息发送。Follower 服务在接收到 Proposal 后,会将其以事务日志的形式写入本地磁盘中,并在写入成功后反馈给 Leader 一个 Ack 响应。当 Leader 接收到超过半数 Follower 的 Ack 响应后,就会广播一个 Commit 消息给所有的 Follower 以通知其进行事务提交,之后 Leader 自身也会完成对事务的提交。而每一个 Follower 则在接收到 Commit 消息后,完成事务的提交。

所有的提议(**proposal**)都在被提出的时候加上了 zxid。zxid 是一个 64 位的数字,它的高 32 位是 epoch 用来标识 Leader 关系是否改变,每次一个 Leader 被选出来,它都会有一个新的 epoch,标识当前属于那个 leader 的统治时期。低 32 位用于递增计数。

详细过程如下:

  1. Leader 等待 Server 连接;
  2. Follower 连接 Leader,将最大的 zxid 发送给 Leader;
  3. Leader 根据 Follower 的 zxid 确定同步点;
  4. 完成同步后通知 follower 已经成为 uptodate 状态;
  5. Follower 收到 uptodate 消息后,又可以重新接受 client 的请求进行服务了。

观察

ZooKeeper 允许客户端监听它关心的 znode,当 znode 状态发生变化(数据变化、子节点增减变化)时,ZooKeeper 服务会通知客户端

客户端和服务端保持连接一般有两种形式:

  • 客户端向服务端不断轮询
  • 服务端向客户端推送状态

Zookeeper 的选择是服务端主动推送状态,也就是观察机制( Watch )。

ZooKeeper 的观察机制允许用户在指定节点上针对感兴趣的事件注册监听,当事件发生时,监听器会被触发,并将事件信息推送到客户端。

  • 监听器实时触发
  • 监听器总是有序的
  • 创建新的 znode 数据前,客户端就能收到监听事件。

客户端使用 getData 等接口获取 znode 状态时传入了一个用于处理节点变更的回调,那么服务端就会主动向客户端推送节点的变更:

1
public byte[] getData(final String path, Watcher watcher, Stat stat)

从这个方法中传入的 Watcher 对象实现了相应的 process 方法,每次对应节点出现了状态的改变,WatchManager 都会通过以下的方式调用传入 Watcher 的方法:

1
2
3
4
5
6
7
8
9
10
11
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
Set<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
}
for (Watcher w : watchers) {
w.process(e);
}
return watchers;
}

Zookeeper 中的所有数据其实都是由一个名为 DataTree 的数据结构管理的,所有的读写数据的请求最终都会改变这颗树的内容,在发出读请求时可能会传入 Watcher 注册一个回调函数,而写请求就可能会触发相应的回调,由 WatchManager 通知客户端数据的变化。

通知机制的实现其实还是比较简单的,通过读请求设置 Watcher 监听事件,写请求在触发事件时就能将通知发送给指定的客户端。

会话

ZooKeeper 客户端通过 TCP 长连接连接到 ZooKeeper 服务集群会话 (Session) 从第一次连接开始就已经建立,之后通过心跳检测机制来保持有效的会话状态。通过这个连接,客户端可以发送请求并接收响应,同时也可以接收到 Watch 事件的通知。

每个 ZooKeeper 客户端配置中都配置了 ZooKeeper 服务器集群列表。启动时,客户端会遍历列表去尝试建立连接。如果失败,它会尝试连接下一个服务器,依次类推。

一旦一台客户端与一台服务器建立连接,这台服务器会为这个客户端创建一个新的会话。每个会话都会有一个超时时间,若服务器在超时时间内没有收到任何请求,则相应会话被视为过期。一旦会话过期,就无法再重新打开,且任何与该会话相关的临时 znode 都会被删除。

通常来说,会话应该长期存在,而这需要由客户端来保证。客户端可以通过心跳方式(ping)来保持会话不过期。

ZooKeeper 的会话具有四个属性:

  • sessionID - 会话 ID,唯一标识一个会话,每次客户端创建新的会话时,Zookeeper 都会为其分配一个全局唯一的 sessionID。
  • TimeOut - 会话超时时间,客户端在构造 Zookeeper 实例时,会配置 sessionTimeout 参数用于指定会话的超时时间,Zookeeper 客户端向服务端发送这个超时时间后,服务端会根据自己的超时时间限制最终确定会话的超时时间。
  • TickTime - 下次会话超时时间点,为了便于 Zookeeper 对会话实行”分桶策略”管理,同时为了高效低耗地实现会话的超时检查与清理,Zookeeper 会为每个会话标记一个下次会话超时时间点,其值大致等于当前时间加上 TimeOut。
  • isClosing - 标记一个会话是否已经被关闭,当服务端检测到会话已经超时失效时,会将该会话的 isClosing 标记为”已关闭”,这样就能确保不再处理来自该会话的心情求了。

Zookeeper 的会话管理主要是通过 SessionTracker 来负责,其采用了分桶策略(将类似的会话放在同一区块中进行管理)进行管理,以便 Zookeeper 对会话进行不同区块的隔离处理以及同一区块的统一处理。

ZAB 协议

ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议。**ZAB 协议不是 Paxos 算法**,只是比较类似,二者在操作上并不相同。Multi-Paxos 实现的是一系列值的共识,不关心最终达成共识的值是什么,不关心各值的顺序。而 ZooKeeper 需要确保操作的顺序性。

ZAB 协议是 Zookeeper 专门设计的一种支持崩溃恢复的原子广播协议

ZAB 协议是 ZooKeeper 的数据一致性和高可用解决方案。

ZAB 协议定义了两个可以无限循环的流程:

  • 选举 Leader - 用于故障恢复,从而保证高可用。
  • 原子广播 - 用于主从同步,从而保证数据一致性。

选举 Leader

ZooKeeper 的故障恢复

ZooKeeper 集群采用一主(称为 Leader)多从(称为 Follower)模式,主从节点通过副本机制保证数据一致。

  • 如果 Follower 节点挂了 - ZooKeeper 集群中的每个节点都会单独在内存中维护自身的状态,并且各节点之间都保持着通讯,只要集群中有半数机器能够正常工作,那么整个集群就可以正常提供服务
  • 如果 Leader 节点挂了 - 如果 Leader 节点挂了,系统就不能正常工作了。此时,需要通过 ZAB 协议的选举 Leader 机制来进行故障恢复。

ZAB 协议的选举 Leader 机制简单来说,就是:基于过半选举机制产生新的 Leader,之后其他机器将从新的 Leader 上同步状态,当有过半机器完成状态同步后,就退出选举 Leader 模式,进入原子广播模式。

术语

  • myid - 每个 Zookeeper 服务器,都需要在数据文件夹下创建一个名为 myid 的文件,该文件包含整个 Zookeeper 集群唯一的 ID(整数)。
  • zxid - 类似于 RDBMS 中的事务 ID,用于标识一次更新操作的 Proposal ID。为了保证顺序性,该 zxid 必须单调递增。因此 Zookeeper 使用一个 64 位的数来表示,高 32 位是 Leader 的 epoch,从 1 开始,每次选出新的 Leader,epoch 加一。低 32 位为该 epoch 内的序号,每次 epoch 变化,都将低 32 位的序号重置。这样保证了 zxid 的全局递增性。

服务器状态

  • LOOKING - 不确定 Leader 状态。该状态下的服务器认为当前集群中没有 Leader,会发起 Leader 选举
  • FOLLOWING - 跟随者状态。表明当前服务器角色是 Follower,并且它知道 Leader 是谁
  • LEADING - 领导者状态。表明当前服务器角色是 Leader,它会维护与 Follower 间的心跳
  • OBSERVING - 观察者状态。表明当前服务器角色是 Observer,与 Folower 唯一的不同在于不参与选举,也不参与集群写操作时的投票

选票数据结构

每个服务器在进行领导选举时,会发送如下关键信息

  • logicClock - 每个服务器会维护一个自增的整数,名为 logicClock,它表示这是该服务器发起的第多少轮投票
  • state - 当前服务器的状态
  • self_id - 当前服务器的 myid
  • self_zxid - 当前服务器上所保存的数据的最大 zxid
  • vote_id - 被推举的服务器的 myid
  • vote_zxid - 被推举的服务器上所保存的数据的最大 zxid

投票流程

(1)自增选举轮次 - Zookeeper 规定所有有效的投票都必须在同一轮次中。每个服务器在开始新一轮投票时,会先对自己维护的 logicClock 进行自增操作。

(2)初始化选票 - 每个服务器在广播自己的选票前,会将自己的投票箱清空。该投票箱记录了所收到的选票。例:服务器 2 投票给服务器 3,服务器 3 投票给服务器 1,则服务器 1 的投票箱为(2, 3), (3, 1), (1, 1)。票箱中只会记录每一投票者的最后一票,如投票者更新自己的选票,则其它服务器收到该新选票后会在自己票箱中更新该服务器的选票。

(3)发送初始化选票 - 每个服务器最开始都是通过广播把票投给自己。

(4)接收外部投票 - 服务器会尝试从其它服务器获取投票,并记入自己的投票箱内。如果无法获取任何外部投票,则会确认自己是否与集群中其它服务器保持着有效连接。如果是,则再次发送自己的投票;如果否,则马上与之建立连接。

(5)判断选举轮次 - 收到外部投票后,首先会根据投票信息中所包含的 logicClock 来进行不同处理

  • 外部投票的 logicClock 大于自己的 logicClock。说明该服务器的选举轮次落后于其它服务器的选举轮次,立即清空自己的投票箱并将自己的 logicClock 更新为收到的 logicClock,然后再对比自己之前的投票与收到的投票以确定是否需要变更自己的投票,最终再次将自己的投票广播出去。
  • 外部投票的 logicClock 小于自己的 logicClock。当前服务器直接忽略该投票,继续处理下一个投票。
  • 外部投票的 logickClock 与自己的相等。当时进行选票 PK。

(6)选票 PK - 选票 PK 是基于(self_id, self_zxid)(vote_id, vote_zxid) 的对比

  • 外部投票的 logicClock 大于自己的 logicClock,则将自己的 logicClock 及自己的选票的 logicClock 变更为收到的 logicClock
  • 若 logicClock 一致,则对比二者的 vote_zxid,若外部投票的 vote_zxid 比较大,则将自己的票中的 vote_zxid 与 vote_myid 更新为收到的票中的 vote_zxid 与 vote_myid 并广播出去,另外将收到的票及自己更新后的票放入自己的票箱。如果票箱内已存在(self_myid, self_zxid)相同的选票,则直接覆盖
  • 若二者 vote_zxid 一致,则比较二者的 vote_myid,若外部投票的 vote_myid 比较大,则将自己的票中的 vote_myid 更新为收到的票中的 vote_myid 并广播出去,另外将收到的票及自己更新后的票放入自己的票箱

(7)统计选票 - 如果已经确定有过半服务器认可了自己的投票(可能是更新后的投票),则终止投票。否则继续接收其它服务器的投票。

(8)更新服务器状态 - 投票终止后,服务器开始更新自身状态。若过半的票投给了自己,则将自己的服务器状态更新为 LEADING,否则将自己的状态更新为 FOLLOWING

通过以上流程分析,我们不难看出:要使 Leader 获得多数 Server 的支持,则 ZooKeeper 集群节点数必须是奇数。且存活的节点数目不得少于 N + 1

每个 Server 启动后都会重复以上流程。在恢复模式下,如果是刚从崩溃状态恢复的或者刚启动的 server 还会从磁盘快照中恢复数据和会话信息,zk 会记录事务日志并定期进行快照,方便在恢复时进行状态恢复。

原子广播(Atomic Broadcast)

ZooKeeper 通过副本机制来实现高可用

那么,ZooKeeper 是如何实现副本机制的呢?答案是:ZAB 协议的原子广播。

ZAB 协议的原子广播要求:

**所有的写请求都会被转发给 Leader,Leader 会以原子广播的方式通知 Follow。当半数以上的 Follow 已经更新状态持久化后,Leader 才会提交这个更新,然后客户端才会收到一个更新成功的响应**。这有些类似数据库中的两阶段提交协议。

在整个消息的广播过程中,Leader 服务器会每个事务请求生成对应的 Proposal,并为其分配一个全局唯一的递增的事务 ID(ZXID),之后再对其进行广播。

ZAB 是通过“一切以领导者为准”的强领导者模型和严格按照顺序提交日志,来实现操作的顺序性的,这一点和 Raft 是一样的。

ZooKeeper 应用

ZooKeeper 可以用于发布/订阅、负载均衡、命令服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能

命名服务

在分布式系统中,通常需要一个全局唯一的名字,如生成全局唯一的订单号等,ZooKeeper 可以通过顺序节点的特性来生成全局唯一 ID,从而可以对分布式系统提供命名服务。

配置管理

利用 ZooKeeper 的观察机制,可以将其作为一个高可用的配置存储器,允许分布式应用的参与者检索和更新配置文件。

分布式锁

可以通过 ZooKeeper 的临时节点和 Watcher 机制来实现分布式排它锁。

举例来说,有一个分布式系统,有三个节点 A、B、C,试图通过 ZooKeeper 获取分布式锁。

(1)访问 /lock (这个目录路径由程序自己决定),创建 带序列号的临时节点(EPHEMERAL)

(2)每个节点尝试获取锁时,拿到 /locks节点下的所有子节点(id_0000,id_0001,id_0002),判断自己创建的节点是不是序列号最小的

  • 如果序列号是最小的,则成功获取到锁。
    • 释放锁:执行完操作后,把创建的节点给删掉。
  • 如果不是,则监听比自己要小 1 的节点变化。

(3)释放锁,即删除自己创建的节点。

图中,NodeA 删除自己创建的节点 id_0000,NodeB 监听到变化,发现自己的节点已经是最小节点,即可获取到锁。

集群管理

ZooKeeper 还能解决大多数分布式系统中的问题:

  • 如可以通过创建临时节点来建立心跳检测机制。如果分布式系统的某个服务节点宕机了,则其持有的会话会超时,此时该临时节点会被删除,相应的监听事件就会被触发。
  • 分布式系统的每个服务节点还可以将自己的节点状态写入临时节点,从而完成状态报告或节点工作进度汇报。
  • 通过数据的订阅和发布功能,ZooKeeper 还能对分布式系统进行模块的解耦和任务的调度。
  • 通过监听机制,还能对分布式系统的服务节点进行动态上下线,从而实现服务的动态扩容。

选举 Leader 节点

分布式系统一个重要的模式就是主从模式 (Master/Salves),ZooKeeper 可以用于该模式下的 Matser 选举。可以让所有服务节点去竞争性地创建同一个 ZNode,由于 ZooKeeper 不能有路径相同的 ZNode,必然只有一个服务节点能够创建成功,这样该服务节点就可以成为 Master 节点。

队列管理

ZooKeeper 可以处理两种类型的队列:

  1. 当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达,这种是同步队列。
  2. 队列按照 FIFO 方式进行入队和出队操作,例如实现生产者和消费者模型。

同步队列用 ZooKeeper 实现的实现思路如下:

创建一个父目录 /synchronizing,每个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /synchronizing/member_i 的临时目录节点,然后每个成员获取 /synchronizing 目录的所有目录节点,也就是 member_i。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /synchronizing/start 的出现,如果已经相等就创建 /synchronizing/start

ZooKeeper 的缺点

ZooKeeper 不是为高可用性设计的

生产环境中常常需要通过多机房部署来容灾。出于成本考虑,一般多机房都是同时提供服务的,即一个机房撑不住所有流量。ZooKeeper 集群只能有一个 Leader,一旦机房之间连接出现故障,那么只有 Leader 所在的机房可以正常工作,其他机房只能停摆。于是所有流量集中到 Leader 所在的机房,由于处理不过来而导致崩溃。

即使是在同一个机房里面,由于网段的不同,在调整机房交换机的时候偶尔也会发生网段隔离的情况。实际上机房每个月基本上都会发生短暂的网络隔离之类的子网段调整。在那个时刻 ZooKeeper 将处于不可用状态。如果业务系统重度依赖 ZooKeeper(比如用 Dubbo 作为 RPC,且使用 ZooKeeper 作为注册中心),则系统的可用性将非常脆弱。

由于 ZooKeeper 对于网络隔离的极度敏感,导致 ZooKeeper 对于网络的任何风吹草动都会做出激烈反应。这使得 ZooKeeper 的不可用时间比较多。我们不能让 ZooKeeper 的不可用,变成系统的不可用

ZooKeeper 的选举过程速度很慢

互联网环境中,网络不稳定几乎是必然的,而 ZooKeeper 网络隔离非常敏感。一旦出现网络隔离,zookeeper 就要发起选举流程。

ZooKeeper 的选举流程通常耗时 30 到 120 秒,期间 ZooKeeper 由于没有 Leader,都是不可用的。

对于网络里面偶尔出现的,比如半秒一秒的网络隔离,ZooKeeper 会由于选举过程,而把不可用时间放大几十倍。

ZooKeeper 的性能是有限的

典型的 ZooKeeper 的 TPS 大概是一万多,无法支撑每天动辄几十亿次的调用。因此,每次请求都去 ZooKeeper 获取业务系统信息是不可能的。

为此,ZooKeeper 的 client 必须自己缓存业务系统的信息。这就导致 ZooKeeper 提供的强一致性实际上是做不到的。如果我们需要强一致性,还需要其他机制来进行保障:比如用自动化脚本把业务系统的 old master 给 kill 掉,但是这可能会引发很多其他问题。

ZooKeeper 无法进行有效的权限控制

ZooKeeper 的权限控制非常弱。在大型的复杂系统里面,使用 ZooKeeper 必须自己再额外的开发一套权限控制系统,通过那套权限控制系统再访问 ZooKeeper。

额外的权限控制系统不但增加了系统复杂性和维护成本,而且降低了系统的总体性能。

即使有了 ZooKeeper 也很难避免业务系统的数据不一致

由于 ZooKeeper 的性能限制,我们无法让每次系统内部调用都走 ZooKeeper,因此总有某些时刻,业务系统会存在两份数据(业务系统 client 那边缓存的业务系统信息是定时从 ZooKeeper 更新的,因此会有更新不同步的问题)。

如果要保持数据的强一致性,唯一的方法是“先 kill 掉当前 Leader,再在 ZooKeeper 上更新 Leader 信息”。是否要 kill 掉当前 Leader 这个问题上,程序是无法完全自动决定的(因为网络隔离的时候 ZooKeeper 已经不可用了,自动脚本没有全局信息,不管怎么做都可能是错的,什么都不做也可能是错的。当网络故障的时候,只有运维人员才有全局信息,程序是无法得知其他机房的情况的)。因此系统无法自动的保障数据一致性,必须要人工介入。而人工介入的典型时间是半个小时以上,我们不能让系统这么长时间不可用。因此我们必须在某个方向上进行妥协,最常见的妥协方式是放弃强一致性,而接受最终一致性

如果我们需要人工介入才能保证可靠的强一致性,那么 ZooKeeper 的价值就大打折扣。

参考资料

ZooKeeper 面试

ZooKeeper 简介

【基础】什么是 ZooKeeper?

:::details 要点

Zookeeper 是一个开源的分布式协调服务,目前由 Apache 进行维护。Zookeeper 可以用于实现分布式系统中常见的发布/订阅、负载均衡、命令服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。

Zookeeper 具有以下特性:

  • 顺序一致性:从一个客户端发起的事务请求,最终都会严格按照其发起顺序被应用到 Zookeeper 中;
  • 原子性:所有事务请求的处理结果在整个集群中所有机器上都是一致的;不存在部分机器应用了该事务,而另一部分没有应用的情况;
  • 单一视图:所有客户端看到的服务端数据模型都是一致的;
  • 可靠性:一旦服务端成功应用了一个事务,则其引起的改变会一直保留,直到被另外一个事务所更改;
  • 实时性:一旦一个事务被成功应用后,Zookeeper 可以保证客户端立即可以读取到这个事务变更后的最新状态的数据。

:::

【基础】ZooKeeper 中有哪些应用场景?

:::details 要点

ZooKeeper 可以用于发布/订阅、负载均衡、命令服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能

发布订阅

通过 Zookeeper 进行数据的发布与订阅其实可以说是它提供的最基本功能,它能够允许多个客户端同时订阅某一个节点的变更并在变更发生时执行我们预先设置好的回调函数,在运行时改变服务的配置和行为:

1
2
3
4
5
6
7
8
9
ZooKeeper zk = new ZooKeeper("localhost", 3000, null);
zk.getData("/config", new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent.toString());
}
}, null);
zk.setData("/config", "draven".getBytes(), 0);

// WatchedEvent state:SyncConnected type:NodeDataChanged path:/config

发布与订阅是 Zookeeper 提供的一个最基本的功能,它的使用非常的简单,我们可以在 getData 中传入实现 process 方法的 Watcher 对象,在每次改变节点的状态时,process 方法都会被调用,在这个方法中就可以对变更进行响应动态修改一些行为。

zookeeper-pubsub

通过 Zookeeper 这个中枢,每一个客户端对节点状态的改变都能够推送给节点的订阅者,在发布订阅模型中,Zookeeper 的每一个节点都可以被理解成一个主题,每一个客户端都可以向这个主题推送详细,同时也可以订阅这个主题中的消息;只是 Zookeeper 引入了文件系统的父子层级的概念将发布订阅功能实现得更加复杂。

1
2
3
4
5
6
7
public static enum EventType {
None(-1),
NodeCreated(1),
NodeDeleted(2),
NodeDataChanged(3),
NodeChildrenChanged(4);
}

如果我们订阅了一个节点的变更信息,那么该节点的子节点出现数量变更时就会调用 process 方法通知观察者,这也意味着更复杂的实现,同时和专门做发布订阅的中间件相比也没有性能优势,在海量推送的应用场景下,消息队列更能胜任,而 Zookeeper 更适合做一些类似服务配置的动态下发的工作。

命名服务

在分布式系统中,通常需要一个全局唯一的名字,如生成全局唯一的订单号等,ZooKeeper 可以通过顺序节点的特性来生成全局唯一 ID,从而可以对分布式系统提供命名服务。

配置管理

利用 ZooKeeper 的观察机制,可以将其作为一个高可用的配置存储器,允许分布式应用的参与者检索和更新配置文件。

分布式锁

可以通过 ZooKeeper 的临时节点和 Watcher 机制来实现分布式排它锁。

举例来说,有一个分布式系统,有三个节点 A、B、C,试图通过 ZooKeeper 获取分布式锁。

(1)访问 /lock (这个目录路径由程序自己决定),创建 带序列号的临时节点(EPHEMERAL)

(2)每个节点尝试获取锁时,拿到 /locks节点下的所有子节点(id_0000,id_0001,id_0002),判断自己创建的节点是不是序列号最小的

  • 如果序列号是最小的,则成功获取到锁。
    • 释放锁:执行完操作后,把创建的节点给删掉。
  • 如果不是,则监听比自己要小 1 的节点变化。

(3)释放锁,即删除自己创建的节点。

图中,NodeA 删除自己创建的节点 id_0000,NodeB 监听到变化,发现自己的节点已经是最小节点,即可获取到锁。

集群管理

ZooKeeper 还能解决大多数分布式系统中的协调问题:

  • 可以通过创建临时节点来建立心跳检测机制。如果分布式系统的某个服务节点宕机了,则其持有的会话会超时,此时该临时节点会被删除,相应的监听事件就会被触发。
  • 分布式系统的每个服务节点还可以将自己的节点状态写入临时节点,从而完成状态报告或节点工作进度汇报
  • 通过数据的订阅和发布功能,ZooKeeper 还能对分布式系统进行模块的解耦和任务的调度
  • 通过监听机制,还能对分布式系统的服务节点进行动态上下线,从而实现服务的动态扩容。

选举 Leader 节点

分布式系统一个重要的模式就是主从模式 (Leader/Followers),ZooKeeper 可以用于该模式下的 Leader 选举。可以让所有服务节点去竞争性地创建同一个 ZNode,由于 ZooKeeper 不能有路径相同的 ZNode,必然只有一个服务节点能够创建成功,这样该服务节点就可以成为 Leader 节点。

队列管理

ZooKeeper 可以处理两种类型的队列:

  1. 当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达,这种是同步队列。
  2. 队列按照 FIFO 方式进行入队和出队操作,例如实现生产者和消费者模型。

同步队列用 ZooKeeper 实现的实现思路如下:

创建一个父目录 /synchronizing,每个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /synchronizing/member_i 的临时目录节点,然后每个成员获取 /synchronizing 目录的所有目录节点,也就是 member_i。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /synchronizing/start 的出现,如果已经相等就创建 /synchronizing/start

:::

ZooKeeper 存储

【基础】ZooKeeper 如何存储数据?

:::details 要点

ZooKeeper 采用类似于文件系统的层级结构存储数据

树中的节点被称为 znode,其中根节点为 /,每个节点上都会保存自己的数据和节点信息。znode 可以用于存储数据,并且有一个与之相关联的 ACL(详情可见 ACL)。ZooKeeper 的设计目标是实现协调服务,而不是真的作为一个文件存储,因此 znode 存储数据的大小被限制在 1MB 以内

ZooKeeper 的数据访问具有原子性。其读写操作都是要么全部成功,要么全部失败。

znode 通过路径被引用。znode 节点路径必须是绝对路径

:::

【基础】ZooKeeper 有几种节点类型?

:::details 要点

znode 其实有 PERSISTENTPERSISTENT_SEQUENTIALEPHEMERALEPHEMERAL_SEQUENTIAL 四种类型,它们是临时与持久、顺序与非顺序两个不同的方向组合成的四种类型。

临时节点是客户端在连接 Zookeeper 时才会保持存在的节点,一旦客户端和服务端之间的连接中断,当前连接持有的所有节点都会被删除,而持久的节点不会随着会话连接的中断而删除,它们需要被客户端主动删除;Zookeeper 中另一种节点的特性就是顺序和非顺序,如果我们使用 Zookeeper 创建了顺序的节点,那么所有节点就会在名字的末尾附加一个序列号,序列号是一个由父节点维护的单调递增计数器。

:::

ZooKeeper 架构

【中级】ZooKeeper 的设计目标是什么?

:::details 要点

Zookeeper 致力于为那些高吞吐的大型分布式系统提供一个高性能、高可用、且具有严格顺序访问控制能力的分布式协调服务。它具有以下四个目标:

目标一:简单的数据模型

Zookeeper 通过树形结构来存储数据,它由一系列被称为 znode 的数据节点组成,类似于常见的文件系统。不过和常见的文件系统不同,Zookeeper 将数据全量存储在内存中,以此来实现高吞吐,减少访问延迟。

img

目标二:构建集群

可以由一组 Zookeeper 服务构成 Zookeeper 集群,集群中每台机器都会单独在内存中维护自身的状态,并且每台机器之间都保持着通讯,只要集群中有半数机器能够正常工作,那么整个集群就可以正常提供服务。

img

目标三:顺序访问

对于来自客户端的每个更新请求,Zookeeper 都会分配一个全局唯一的递增 ID,这个 ID 反映了所有事务请求的先后顺序。

目标四:高性能高可用

ZooKeeper 将数据存全量储在内存中以保持高性能,并通过服务集群来实现高可用,由于 Zookeeper 的所有更新和删除都是基于事务的,所以其在读多写少的应用场景中有着很高的性能表现。

:::

【中级】ZooKeeper 集群有几种角色?

:::details 要点

Zookeeper 集群是一个基于主从复制的高可用集群,集群中每个节点都存储了一份数据副本(内存中)。此外,每个服务器节点承担如下三种角色中的一种:

  • Leader - 它负责 发起并维护与各 Follwer 及 Observer 间的心跳。所有的写操作必须要通过 Leader 完成再由 Leader 将写操作广播给其它服务器。一个 Zookeeper 集群同一时间只会有一个实际工作的 Leader。
  • Follower - 它会响应 Leader 的心跳。Follower 可直接处理并返回客户端的读请求,同时会将写请求转发给 Leader 处理,并且负责在 Leader 处理写请求时对请求进行投票。一个 Zookeeper 集群可能同时存在多个 Follower。
  • Observer - 角色与 Follower 类似,但是无投票权。

客户端可以从任意 ZooKeeper 服务器节点读取数据,但只能通过 Leader 服务写数据并需要半数以上 Follower 的 ACK,才算写入成功。记住这个重要的知识点,下文会详细讲述。

:::

【中级】ZooKeeper 的权限控制如何设计的?

:::details 要点

ZooKeeper 采用 ACL(Access Control Lists)策略来进行权限控制

每个 znode 创建时都会带有一个 ACL 列表,用于决定谁可以对它执行何种操作。

ACL 依赖于 ZooKeeper 的客户端认证机制。ZooKeeper 提供了以下几种认证方式:

  • digest - 用户名和密码 来识别客户端
  • sasl - 通过 kerberos 来识别客户端
  • ip - 通过 IP 来识别客户端

ZooKeeper 定义了如下五种权限:

  • CREATE - 允许创建子节点;
  • READ - 允许从节点获取数据并列出其子节点;
  • WRITE - 允许为节点设置数据;
  • DELETE - 允许删除子节点;
  • ADMIN - 允许为节点设置权限。

:::

【高级】ZooKeeper 的架构有什么缺点?

:::details 要点

ZooKeeper 不是为高可用性设计的

生产环境中常常需要通过多机房部署来容灾。出于成本考虑,一般多机房都是同时提供服务的,即一个机房撑不住所有流量。ZooKeeper 集群只能有一个 Leader,一旦机房之间连接出现故障,那么只有 Leader 所在的机房可以正常工作,其他机房只能停摆。于是所有流量集中到 Leader 所在的机房,由于处理不过来而导致崩溃。

即使是在同一个机房里面,由于网段的不同,在调整机房交换机的时候偶尔也会发生网段隔离的情况。实际上机房每个月基本上都会发生短暂的网络隔离之类的子网段调整。在那个时刻 ZooKeeper 将处于不可用状态。如果业务系统重度依赖 ZooKeeper(比如用 Dubbo 作为 RPC,且使用 ZooKeeper 作为注册中心),则系统的可用性将非常脆弱。

由于 ZooKeeper 对于网络隔离的极度敏感,导致 ZooKeeper 对于网络的任何风吹草动都会做出激烈反应。这使得 ZooKeeper 的不可用时间比较多。我们不能让 ZooKeeper 的不可用,变成系统的不可用

ZooKeeper 的选举过程速度很慢

互联网环境中,网络不稳定几乎是必然的,而 ZooKeeper 网络隔离非常敏感。一旦出现网络隔离,zookeeper 就要发起选举流程。

ZooKeeper 的选举流程通常耗时 30 到 120 秒,期间 ZooKeeper 由于没有 Leader,都是不可用的

对于网络里面偶尔出现的,比如半秒一秒的网络隔离,ZooKeeper 会由于选举过程,而把不可用时间放大几十倍。

ZooKeeper 的性能是有限的

典型的 ZooKeeper 的 TPS 大概是一万多,无法支撑每天动辄几十亿次的调用。因此,每次请求都去 ZooKeeper 获取业务系统信息是不可能的。

为此,ZooKeeper 的 client 必须自己缓存业务系统的信息。这就导致 ZooKeeper 提供的强一致性实际上是做不到的。如果我们需要强一致性,还需要其他机制来进行保障:比如用自动化脚本把业务系统的 old master 给 kill 掉,但是这可能会引发很多其他问题。

ZooKeeper 无法进行有效的权限控制

ZooKeeper 的权限控制非常弱。在大型的复杂系统里面,使用 ZooKeeper 必须自己再额外的开发一套权限控制系统,通过那套权限控制系统再访问 ZooKeeper。

额外的权限控制系统不但增加了系统复杂性和维护成本,而且降低了系统的总体性能。

即使有了 ZooKeeper 也很难避免业务系统的数据不一致

由于 ZooKeeper 的性能限制,我们无法让每次系统内部调用都走 ZooKeeper,因此总有某些时刻,业务系统会存在两份数据(业务系统 client 那边缓存的业务系统信息是定时从 ZooKeeper 更新的,因此会有更新不同步的问题)。

如果要保持数据的强一致性,唯一的方法是“先 kill 掉当前 Leader,再在 ZooKeeper 上更新 Leader 信息”。是否要 kill 掉当前 Leader 这个问题上,程序是无法完全自动决定的(因为网络隔离的时候 ZooKeeper 已经不可用了,自动脚本没有全局信息,不管怎么做都可能是错的,什么都不做也可能是错的。当网络故障的时候,只有运维人员才有全局信息,程序是无法得知其他机房的情况的)。因此系统无法自动的保障数据一致性,必须要人工介入。而人工介入的典型时间是半个小时以上,我们不能让系统这么长时间不可用。因此我们必须在某个方向上进行妥协,最常见的妥协方式是放弃强一致性,而接受最终一致性

如果我们需要人工介入才能保证可靠的强一致性,那么 ZooKeeper 的价值就大打折扣。

:::

ZooKeeper 工作流

【中级】ZooKeeper 读操作工作流程是怎样的?

:::details 要点

Leader/Follower/Observer 都可直接处理读请求,从本地内存中读取数据并返回给客户端即可

由于处理读请求不需要服务器之间的交互,Follower/Observer 越多,整体系统的读请求吞吐量越大,也即读性能越好。

:::

【中级】ZooKeeper 写操作工作流程是怎样的?

:::details 要点

所有的写请求实际上都要交给 Leader 处理。Leader 将写请求以事务形式发给所有 Follower 并等待 ACK,一旦收到半数以上 Follower 的 ACK,即认为写操作成功。

写 Leader

由上图可见,通过 Leader 进行写操作,主要分为五步:

  1. 客户端向 Leader 发起写请求
  2. Leader 将写请求以事务 Proposal 的形式发给所有 Follower 并等待 ACK
  3. Follower 收到 Leader 的事务 Proposal 后返回 ACK
  4. Leader 得到过半数的 ACK(Leader 对自己默认有一个 ACK)后向所有的 Follower 和 Observer 发送 Commmit
  5. Leader 将处理结果返回给客户端

注意

  • Leader 不需要得到 Observer 的 ACK,即 Observer 无投票权。
  • Leader 不需要得到所有 Follower 的 ACK,只要收到过半的 ACK 即可,同时 Leader 本身对自己有一个 ACK。上图中有 4 个 Follower,只需其中两个返回 ACK 即可,因为 $$(2+1) / (4+1) > 1/2$$ 。
  • Observer 虽然无投票权,但仍须同步 Leader 的数据从而在处理读请求时可以返回尽可能新的数据。

写 Follower/Observer

  • Follower/Observer 均可接受写请求,但不能直接处理,而需要将写请求转发给 Leader 处理。
  • 除了多了一步请求转发,其它流程与直接写 Leader 无任何区别。

:::

【中级】ZooKeeper 事务机制是怎样的?

:::details 要点

对于来自客户端的每个更新请求,ZooKeeper 具备严格的顺序访问控制能力。

为了保证事务的顺序一致性,ZooKeeper 采用了递增的事务 id 号(zxid)来标识事务

Leader 服务会为每一个 Follower 服务器分配一个单独的队列,然后将事务 Proposal 依次放入队列中,并根据 FIFO(先进先出) 的策略进行消息发送。Follower 服务在接收到 Proposal 后,会将其以事务日志的形式写入本地磁盘中,并在写入成功后反馈给 Leader 一个 Ack 响应。当 Leader 接收到超过半数 Follower 的 Ack 响应后,就会广播一个 Commit 消息给所有的 Follower 以通知其进行事务提交,之后 Leader 自身也会完成对事务的提交。而每一个 Follower 则在接收到 Commit 消息后,完成事务的提交。

所有的提议(**proposal**)都在被提出的时候加上了 zxid。zxid 是一个 64 位的数字,它的高 32 位是 epoch 用来标识 Leader 关系是否改变,每次一个 Leader 被选出来,它都会有一个新的 epoch,标识当前属于那个 leader 的统治时期。低 32 位用于递增计数。

详细过程如下:

  1. Leader 等待 Server 连接;
  2. Follower 连接 Leader,将最大的 zxid 发送给 Leader;
  3. Leader 根据 Follower 的 zxid 确定同步点;
  4. 完成同步后通知 follower 已经成为 uptodate 状态;
  5. Follower 收到 uptodate 消息后,又可以重新接受 client 的请求进行服务了。

:::

【中级】ZooKeeper 监听机制是怎样的?

:::details 要点

ZooKeeper 允许客户端监听它关心的 znode,当 znode 状态发生变化(数据变化、子节点增减变化)时,ZooKeeper 服务会通知客户端

需要注意的是:ZooKeeper 的监听通知是一次性的。无论是服务端还是客户端,一旦一个 Watcher 被触发,Zookeeper 都会将其从相应的存储中移除。这样的设计有效的减轻了服务端的压力,不然对于更新非常频繁的节点,服务端会不断的向客户端发送事件通知,无论对于网络还是服务端的压力都非常大。

客户端和服务端保持连接一般有两种形式:

  • 客户端向服务端不断轮询
  • 服务端向客户端推送状态

Zookeeper 的选择是服务端主动推送状态,也就是观察机制( Watch

ZooKeeper 的观察机制允许用户在指定节点上针对感兴趣的事件注册监听,当事件发生时,监听器会被触发,并将事件信息推送到客户端。

  • 监听器实时触发
  • 监听器总是有序的
  • 创建新的 znode 数据前,客户端就能收到监听事件。

客户端使用 getData 等接口获取 znode 状态时传入了一个用于处理节点变更的回调,那么服务端就会主动向客户端推送节点的变更:

1
public byte[] getData(final String path, Watcher watcher, Stat stat)

从这个方法中传入的 Watcher 对象实现了相应的 process 方法,每次对应节点出现了状态的改变,WatchManager 都会通过以下的方式调用传入 Watcher 的方法:

1
2
3
4
5
6
7
8
9
10
11
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
Set<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
}
for (Watcher w : watchers) {
w.process(e);
}
return watchers;
}

Zookeeper 中的所有数据其实都是由一个名为 DataTree 的数据结构管理的,所有的读写数据的请求最终都会改变这颗树的内容,在发出读请求时可能会传入 Watcher 注册一个回调函数,而写请求就可能会触发相应的回调,由 WatchManager 通知客户端数据的变化。

通知机制的实现其实还是比较简单的,通过读请求设置 Watcher 监听事件,写请求在触发事件时就能将通知发送给指定的客户端。

:::

【中级】ZooKeeper 会话机制是怎样的?

:::details 要点

ZooKeeper 客户端通过 TCP 长连接连接到 ZooKeeper 服务集群会话 (Session) 从第一次连接开始就已经建立,之后通过心跳检测机制来保持有效的会话状态。通过这个连接,客户端可以发送请求并接收响应,同时也可以接收到 Watch 事件的通知。

每个 ZooKeeper 客户端配置中都配置了 ZooKeeper 服务器集群列表。启动时,客户端会遍历列表去尝试建立连接。如果失败,它会尝试连接下一个服务器,依次类推。

一旦一台客户端与一台服务器建立连接,这台服务器会为这个客户端创建一个新的会话。每个会话都会有一个超时时间,若服务器在超时时间内没有收到任何请求,则相应会话被视为过期。一旦会话过期,就无法再重新打开,且任何与该会话相关的临时 znode 都会被删除。

通常来说,会话应该长期存在,而这需要由客户端来保证。客户端可以通过心跳方式(ping)来保持会话不过期。

ZooKeeper 的会话具有四个属性:

  • sessionID - 会话 ID,唯一标识一个会话,每次客户端创建新的会话时,Zookeeper 都会为其分配一个全局唯一的 sessionID。
  • TimeOut - 会话超时时间,客户端在构造 Zookeeper 实例时,会配置 sessionTimeout 参数用于指定会话的超时时间,Zookeeper 客户端向服务端发送这个超时时间后,服务端会根据自己的超时时间限制最终确定会话的超时时间。
  • TickTime - 下次会话超时时间点,为了便于 Zookeeper 对会话实行”分桶策略”管理,同时为了高效低耗地实现会话的超时检查与清理,Zookeeper 会为每个会话标记一个下次会话超时时间点,其值大致等于当前时间加上 TimeOut。
  • isClosing - 标记一个会话是否已经被关闭,当服务端检测到会话已经超时失效时,会将该会话的 isClosing 标记为”已关闭”,这样就能确保不再处理来自该会话的心情求了。

Zookeeper 的会话管理主要是通过 SessionTracker 来负责,其采用了分桶策略(将类似的会话放在同一区块中进行管理)进行管理,以便 Zookeeper 对会话进行不同区块的隔离处理以及同一区块的统一处理。

:::

Zab 协议

【中级】什么是 Zab 协议?

:::details 要点

ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议。**ZAB 协议不是 Paxos 算法**,只是比较类似,二者在操作上并不相同。Multi-Paxos 实现的是一系列值的共识,不关心最终达成共识的值是什么,不关心各值的顺序。而 ZooKeeper 需要确保操作的顺序性。

ZAB 协议是 Zookeeper 专门设计的一种支持故障恢复的原子广播协议。ZAB 协议是 ZooKeeper 的数据一致性和高可用解决方案。

ZAB 协议定义了两个可以无限循环的流程:

  • 选举 Leader - 用于故障恢复,从而保证高可用。
  • 原子广播 - 用于主从同步,从而保证数据一致性。

:::

【高级】Zab 协议中故障恢复的流程是怎样的?

:::details 要点

故障恢复

ZooKeeper 集群采用一主多从模式,主从节点通过副本机制保证数据一致

  • 如果 Follower 节点挂了 - ZooKeeper 集群中的每个节点都会单独在内存中维护自身的状态,并且各节点之间都保持着通讯,只要集群中有半数机器能够正常工作,那么整个集群就可以正常提供服务
  • 如果 Leader 节点挂了 - 如果 Leader 节点挂了,系统就不能正常工作了。此时,需要通过 ZAB 协议的选举 Leader 机制来进行故障恢复。

ZAB 协议的选举 Leader 机制简单来说,就是:基于过半选举机制产生新的 Leader,之后其他机器将从新的 Leader 上同步状态,当有过半机器完成状态同步后,就退出选举 Leader 模式,进入原子广播模式。

术语

  • myid - 每个 Zookeeper 服务器,都需要在数据文件夹下创建一个名为 myid 的文件,该文件包含整个 Zookeeper 集群唯一的 ID(整数)
  • zxid - 类似于 RDBMS 中的事务 ID,用于标识一次更新操作的 Proposal ID。为了保证顺序性,该 zxid 必须单调递增。因此 Zookeeper 使用一个 64 位的数来表示,高 32 位是 Leader 的 epoch,从 1 开始,每次选出新的 Leader,epoch 加一。低 32 位为该 epoch 内的序号,每次 epoch 变化,都将低 32 位的序号重置。这样保证了 zxid 的全局递增性。

服务器状态

  • LOOKING - 不确定 Leader 状态。该状态下的服务器认为当前集群中没有 Leader,会发起 Leader 选举
  • FOLLOWING - 跟随者状态。表明当前服务器角色是 Follower,并且它知道 Leader 是谁
  • LEADING - 领导者状态。表明当前服务器角色是 Leader,它会维护与 Follower 间的心跳
  • OBSERVING - 观察者状态。表明当前服务器角色是 Observer,与 Folower 唯一的不同在于不参与选举,也不参与集群写操作时的投票

选票数据结构

每个服务器在进行领导选举时,会发送如下关键信息

  • logicClock - 每个服务器会维护一个自增的整数,名为 logicClock,它表示这是该服务器发起的第多少轮投票
  • state - 当前服务器的状态
  • self_id - 当前服务器的 myid
  • self_zxid - 当前服务器上所保存的数据的最大 zxid
  • vote_id - 被推举的服务器的 myid
  • vote_zxid - 被推举的服务器上所保存的数据的最大 zxid

投票流程

(1)自增选举轮次 - Zookeeper 规定所有有效的投票都必须在同一轮次中。每个服务器在开始新一轮投票时,会先对自己维护的 logicClock 进行自增操作。

(2)初始化选票 - 每个服务器在广播自己的选票前,会将自己的投票箱清空。该投票箱记录了所收到的选票。例:服务器 2 投票给服务器 3,服务器 3 投票给服务器 1,则服务器 1 的投票箱为 (2, 3), (3, 1), (1, 1)。票箱中只会记录每一投票者的最后一票,如投票者更新自己的选票,则其它服务器收到该新选票后会在自己票箱中更新该服务器的选票。

(3)发送初始化选票 - 每个服务器最开始都是通过广播把票投给自己。

(4)接收外部投票 - 服务器会尝试从其它服务器获取投票,并记入自己的投票箱内。如果无法获取任何外部投票,则会确认自己是否与集群中其它服务器保持着有效连接。如果是,则再次发送自己的投票;如果否,则马上与之建立连接。

(5)判断选举轮次 - 收到外部投票后,首先会根据投票信息中所包含的 logicClock 来进行不同处理

  • 外部投票的 logicClock 大于自己的 logicClock。说明该服务器的选举轮次落后于其它服务器的选举轮次,立即清空自己的投票箱并将自己的 logicClock 更新为收到的 logicClock,然后再对比自己之前的投票与收到的投票以确定是否需要变更自己的投票,最终再次将自己的投票广播出去。
  • 外部投票的 logicClock 小于自己的 logicClock。当前服务器直接忽略该投票,继续处理下一个投票。
  • 外部投票的 logickClock 与自己的相等。当时进行选票 PK。

(6)选票 PK - 选票 PK 是基于(self_id, self_zxid)(vote_id, vote_zxid) 的对比

  • 外部投票的 logicClock 大于自己的 logicClock,则将自己的 logicClock 及自己的选票的 logicClock 变更为收到的 logicClock
  • 若 logicClock 一致,则对比二者的 vote_zxid,若外部投票的 vote_zxid 比较大,则将自己的票中的 vote_zxid 与 vote_myid 更新为收到的票中的 vote_zxid 与 vote_myid 并广播出去,另外将收到的票及自己更新后的票放入自己的票箱。如果票箱内已存在 (self_myid, self_zxid) 相同的选票,则直接覆盖
  • 若二者 vote_zxid 一致,则比较二者的 vote_myid,若外部投票的 vote_myid 比较大,则将自己的票中的 vote_myid 更新为收到的票中的 vote_myid 并广播出去,另外将收到的票及自己更新后的票放入自己的票箱

(7)统计选票 - 如果已经确定有过半服务器认可了自己的投票(可能是更新后的投票),则终止投票。否则继续接收其它服务器的投票。

(8)更新服务器状态 - 投票终止后,服务器开始更新自身状态。若过半的票投给了自己,则将自己的服务器状态更新为 LEADING,否则将自己的状态更新为 FOLLOWING

通过以上流程分析,我们不难看出:要使 Leader 获得多数 Server 的支持,则 ZooKeeper 集群节点数必须是奇数。且存活的节点数目不得少于 N + 1

每个 Server 启动后都会重复以上流程。在恢复模式下,如果是刚从崩溃状态恢复的或者刚启动的 server 还会从磁盘快照中恢复数据和会话信息,zk 会记录事务日志并定期进行快照,方便在恢复时进行状态恢复。

:::

【高级】Zab 协议中原子广播的流程是怎样的?

:::details 要点

ZooKeeper 通过副本机制来实现高可用

那么,ZooKeeper 是如何实现副本机制的呢?答案是:ZAB 协议的原子广播。

ZAB 协议的原子广播要求:

**所有的写请求都会被转发给 Leader,Leader 会以原子广播的方式通知 Follow。当半数以上的 Follow 已经更新状态持久化后,Leader 才会提交这个更新,然后客户端才会收到一个更新成功的响应**。这有些类似数据库中的两阶段提交协议。

在整个消息的广播过程中,Leader 服务器会每个事务请求生成对应的 Proposal,并为其分配一个全局唯一的递增的事务 ID(ZXID),之后再对其进行广播。

ZAB 是通过“一切以领导者为准”的强领导者模型和严格按照顺序提交日志,来实现操作的顺序性的,这一点和 Raft 是一样的。

:::

【中级】Zab 和 Paxos 有什么区别?

:::details 要点

Zab 和 Paxos 协议在实现上其实有非常多的相似点,例如:

  • 主节点会向所有的从节点发出提案;
  • 主节点在接收到一组从节点中一半以上节点的确认后,才会认为当前提案被提交了;
  • Zab 协议中的每一个提案都包含一个 epoch 值,与 Paxos 中的 Ballot 非常相似;

因为它们有一些相同的特点,所以有的观点会认为 Zab 是 Paxos 的一个简化版本,但是 Zab 和 Paxos 在设计理念上就有着比较大的不同,两者的主要区别就在于 Zab 主要是为构建高可用的主备系统设计的,而 Paxos 能够帮助工程师搭建具有一致性的状态机系统。

作为一个一致性状态机系统,它能够保证集群中任意一个状态机副本都按照客户端的请求执行了相同顺序的请求,即使来自客户端请求是异步的并且不同客户端的接收同一个请求的顺序不同,集群中的这些副本就是会使用 Paxos 或者它的变种对提案达成一致;在集群运行的过程中,如果主节点出现了错误导致宕机,其他的节点会重新开始进行选举并处理未提交的请求。

但是在类似 Zookeeper 的高可用主备系统中,所有的副本都需要对增量的状态更新顺序达成一致,这些状态更新的变量都是由主节点创建并发送给其他的从节点的,每一个从节点都会严格按照顺序逐一的执行主节点生成的状态更新请求,如果 Zookeeper 集群中的主节点发生了宕机,新的主节点也必须严格按照顺序对请求进行恢复。

总的来说,使用状态更新节点数据的主备系统相比根据客户端请求改变状态的状态机系统对于请求的执行顺序有着更严格的要求。

这一节对于 Zab 和 Paxos 区别的介绍大都来自于 Zab vs. Paxos ,有兴趣的读者可以阅读相关的内容。

:::

参考资料

ZooKeeper 运维指南

单点服务部署

在安装 ZooKeeper 之前,请确保你的系统是在以下任一操作系统上运行:

  • 任意 Linux OS - 支持开发和部署。适合演示应用程序。
  • Windows OS - 仅支持开发。
  • Mac OS - 仅支持开发。

安装步骤如下:

下载解压

进入官方下载地址:http://zookeeper.apache.org/releases.html#download ,选择合适版本。

解压到本地:

1
2
tar -zxf zookeeper-3.4.6.tar.gz
cd zookeeper-3.4.6

环境变量

执行 vim /etc/profile,添加环境变量:

1
2
export ZOOKEEPER_HOME=/usr/app/zookeeper-3.4.14
export PATH=$ZOOKEEPER_HOME/bin:$PATH

再执行 source /etc/profile , 使得配置的环境变量生效。

修改配置

你必须创建 conf/zoo.cfg 文件,否则启动时会提示你没有此文件。

初次尝试,不妨直接使用 Kafka 提供的模板配置文件 conf/zoo_sample.cfg

1
cp conf/zoo_sample.cfg conf/zoo.cfg

修改后完整配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/log
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

配置参数说明:

  • tickTime:用于计算的基础时间单元。比如 session 超时:N*tickTime;
  • initLimit:用于集群,允许从节点连接并同步到 master 节点的初始化连接时间,以 tickTime 的倍数来表示;
  • syncLimit:用于集群, master 主节点与从节点之间发送消息,请求和应答时间长度(心跳机制);
  • dataDir:数据存储位置;
  • dataLogDir:日志目录;
  • clientPort:用于客户端连接的端口,默认 2181

启动服务

执行以下命令

1
bin/zkServer.sh start

执行此命令后,你将收到以下响应

1
2
3
JMX enabled by default
Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

停止服务

可以使用以下命令停止 zookeeper 服务器。

1
bin/zkServer.sh stop

集群服务部署

分布式系统节点数一般都要求是奇数,且最少为 3 个节点,Zookeeper 也不例外。

这里,规划一个含 3 个节点的最小 ZooKeeper 集群,主机名分别为 hadoop001,hadoop002,hadoop003 。

修改配置

修改配置文件 zoo.cfg,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-cluster/data/
dataLogDir=/usr/local/zookeeper-cluster/log/
clientPort=2181

# server.1 这个1是服务器的标识,可以是任意有效数字,标识这是第几个服务器节点,这个标识要写到dataDir目录下面myid文件里
# 指名集群间通讯端口和选举端口
server.1=hadoop001:2287:3387
server.2=hadoop002:2287:3387
server.3=hadoop003:2287:3387

标识节点

分别在三台主机的 dataDir 目录下新建 myid 文件,并写入对应的节点标识。Zookeeper 集群通过 myid 文件识别集群节点,并通过上文配置的节点通信端口和选举端口来进行节点通信,选举出 Leader 节点。

创建存储目录:

1
2
# 三台主机均执行该命令
mkdir -vp /usr/local/zookeeper-cluster/data/

创建并写入节点标识到 myid 文件:

1
2
3
4
5
6
# hadoop001主机
echo "1" > /usr/local/zookeeper-cluster/data/myid
# hadoop002主机
echo "2" > /usr/local/zookeeper-cluster/data/myid
# hadoop003主机
echo "3" > /usr/local/zookeeper-cluster/data/myid

启动集群

分别在三台主机上,执行如下命令启动服务:

1
/usr/app/zookeeper-cluster/zookeeper/bin/zkServer.sh start

集群验证

启动后使用 zkServer.sh status 查看集群各个节点状态。

参考资料

HBase 命令

进入 HBase Shell 控制台:./bin/hbase shell

如果有 kerberos 认证,需要事先使用相应的 keytab 进行一下认证(使用 kinit 命令),认证成功之后再使用 hbase shell 进入可以使用 whoami 命令可查看当前用户.

基本命令

  • 获取帮助信息:help
  • 获取命令的详细帮助信息:help 'status'
  • 查看服务器状态:status
  • 查看版本信息:version
  • 查看当前登录用户:whoami

DDL

创建表

【语法】create '表名称','列族名称 1','列族名称 2','列名称 N'

【示例】

1
2
# 创建一张名为 test 的表,columnFamliy1、columnFamliy2 是 table1 表的列族。
create 'test','columnFamliy1','columnFamliy2'

启用、禁用表

  • 启用表:enable 'test'
  • 禁用表:disable 'test'
  • 检查表是否被启用:is_enabled 'test'
  • 检查表是否被禁用:is_disabled 'test'

删除表

注意:删除表前需要先禁用表

1
2
disable 'test'
drop 'test'

修改表

添加列族

命令格式: alter ‘表名’, ‘列族名’

1
alter 'test', 'teacherInfo'

删除列族

命令格式:alter ‘表名’, {NAME => ‘列族名’, METHOD => ‘delete’}

1
alter 'test', {NAME => 'teacherInfo', METHOD => 'delete'}

更改列族存储版本的限制

默认情况下,列族只存储一个版本的数据,如果需要存储多个版本的数据,则需要修改列族的属性。修改后可通过 desc 命令查看。

1
alter 'test',{NAME=>'columnFamliy1',VERSIONS=>3}

查看表

  • 查看所有表:list
  • 查看表的详细信息:describe 'test'
  • 检查表是否存在:exists 'test'

增删改

插入数据

命令格式put '表名', '行键','列族:列','值'

注意:如果新增数据的行键值、列族名、列名与原有数据完全相同,则相当于更新操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
put 'test', 'rowkey1', 'columnFamliy1:a', 'valueA'
put 'test', 'rowkey1', 'columnFamliy1:b', 'valueB'
put 'test', 'rowkey1', 'columnFamliy1:c', 'valueC'

put 'test', 'rowkey2', 'columnFamliy1:a', 'valueA'
put 'test', 'rowkey2', 'columnFamliy1:b', 'valueB'
put 'test', 'rowkey2', 'columnFamliy1:c', 'valueC'

put 'test', 'rowkey3', 'columnFamliy1:a', 'valueA'
put 'test', 'rowkey3', 'columnFamliy1:b', 'valueB'
put 'test', 'rowkey3', 'columnFamliy1:c', 'valueC'

put 'test', 'rowkey1', 'columnFamliy2:a', 'valueA'
put 'test', 'rowkey1', 'columnFamliy2:b', 'valueB'
put 'test', 'rowkey1', 'columnFamliy2:c', 'valueC'

获取指定行、列族、列

  • 获取指定行中所有列的数据信息:get 'test','rowkey2'
  • 获取指定行中指定列族下所有列的数据信息:get 'test','rowkey2','columnFamliy1'
  • 获取指定行中指定列的数据信息:get 'test','rowkey2','columnFamliy1:a'

删除指定行、列

  • 删除指定行:delete 'test','rowkey2'
  • 删除指定行中指定列的数据:delete 'test','rowkey2','columnFamliy1:a'

查询

hbase 中访问数据有两种基本的方式:

  • 按指定 rowkey 获取数据:get 方法;
  • 按指定条件获取数据:scan 方法。

scan 可以设置 begin 和 end 参数来访问一个范围内所有的数据。get 本质上就是 begin 和 end 相等的一种特殊的 scan。

get 查询

  • 获取指定行中所有列的数据信息:get 'test','rowkey2'
  • 获取指定行中指定列族下所有列的数据信息:get 'test','rowkey2','columnFamliy1'
  • 获取指定行中指定列的数据信息:get 'test','rowkey2','columnFamliy1:a'

scan 查询

查询整表数据

1
scan 'test'

查询指定列簇的数据

1
scan 'test', {COLUMN=>'columnFamliy1'}

条件查询

1
2
# 查询指定列的数据
scan 'test', {COLUMNS=> 'columnFamliy1:a'}

除了列 (COLUMNS) 修饰词外,HBase 还支持 Limit(限制查询结果行数),STARTROWROWKEY 起始行,会先根据这个 key 定位到 region,再向后扫描)、STOPROW(结束行)、TIMERANGE(限定时间戳范围)、VERSIONS(版本数)、和 FILTER(按条件过滤行)等。

如下代表从 rowkey2 这个 rowkey 开始,查找下两个行的最新 3 个版本的 name 列的数据:

1
scan 'test', {COLUMNS=> 'columnFamliy1:a',STARTROW => 'rowkey2',STOPROW => 'rowkey3',LIMIT=>2, VERSIONS=>3}

条件过滤

Filter 可以设定一系列条件来进行过滤。如我们要查询值等于 24 的所有数据:

1
scan 'test', FILTER=>"ValueFilter(=,'binary:24')"

值包含 valueA 的所有数据:

1
scan 'test', FILTER=>"ValueFilter(=,'substring:valueA')"

列名中的前缀为 b 的:

1
scan 'test', FILTER=>"ColumnPrefixFilter('b')"

FILTER 中支持多个过滤条件通过括号、AND 和 OR 进行组合:

1
2
# 列名中的前缀为 b 且列值中包含1998的数据
scan 'test', FILTER=>"ColumnPrefixFilter('b') AND ValueFilter ValueFilter(=,'substring:A')"

PrefixFilter 用于对 Rowkey 的前缀进行判断:

1
scan 'test', FILTER=>"PrefixFilter('wr')"

参考资料

Zipkin 快速入门

Zipkin 是一个基于 Java 开发的、开源的、分布式实时数据跟踪系统(Distributed Tracking System)。它采集有助于解决服务架构中延迟问题的实时数据。

Zipkin 主要功能是聚集来自各个异构系统的实时监控数据。分布式跟踪系统还有其他比较成熟的实现,例如:Naver 的 Pinpoint、Apache 的 HTrace、阿里的鹰眼 Tracing、京东的 Hydra、新浪的 Watchman,美团点评的 CAT,skywalking 等。

Zipkin 基于 Google Dapper 的论文设计而来,由 Twitter 公司开发贡献。

一、Zipkin 简介

特性

如果日志文件中有跟踪 ID,则可以直接跳至该跟踪 ID。 否则,您可以基于属性进行查询,例如服务,操作名称,标签和持续时间。 将为您总结一些有趣的数据,例如在服务中花费的时间百分比以及操作是否失败。

Zipkin UI 还提供了一个依赖关系图,该关系图显示了每个应用程序中跟踪了多少个请求。这对于识别聚合行为(包括错误路径或对不赞成使用的服务的调用)很有帮助。

Zipkin UI

多平台

Zipkin 官方支持 C#、Go、Java、JavaScript、Ruby、Scala、PHP 语言。

除此以外,社区还贡献了多种其他语言的支持,详情可以参考官方文档:Tracers and Instrumentation

数据

Zipkin 服务器捆绑了用于采集和存储数据的扩展。

默认情况下,数据可以通过 HttpKafkaRabbitMQ 或 RPC 传输。

并存储在内存中或 MySQLCassandraElasticsearch 中。

数据以 json 形式存储,可以参考:Zipkin 官方的 Swagger API

Zipkin Swagger API

二、Zipkin 安装

Docker

Docker 启动方式:

1
docker run -d -p 9411:9411 openzipkin/zipkin

Java

注意:必须运行在 JDK8+ 环境

Java 启动方式:

1
2
curl -sSL https://zipkin.io/quickstart.sh | bash -s
java -jar zipkin.jar

编译方式

适用于需要订制化的场景。

1
2
3
4
5
6
7
# get the latest source
git clone https://github.com/openzipkin/zipkin
cd zipkin
# Build the server and also make its dependencies
./mvnw -DskipTests --also-make -pl zipkin-server clean install
# Run the server
java -jar ./zipkin-server/target/zipkin-server-*exec.jar

三、Zipkin 架构

ZipKin 可以分为两部分,

  • 一部分是 Zipkin server,用来作为数据的采集存储、数据分析与展示;
  • 另一部分是 Zipkin client 是 Zipkin 基于不同的语言及框架封装的一些列客户端工具,这些工具完成了追踪数据的生成与上报功能。

架构如下:

Zipkin 架构

Zipkin Server

Zipkin Server 主要包括四个模块:

  • Collector - 负责采集客户端传输的数据。
  • Storage - 负责存储采集的数据。当前支持 Memory,MySQL,Cassandra,ElasticSearch 等,默认存储在内存中。
  • API(Query) - 负责查询 Storage 中存储的数据。提供简单的 JSON API 获取数据,主要提供给 web UI 使用。
  • UI - 提供简单的 web 界面。

Instrumented Client 和 Instrumented Server,是指分布式架构中使用了 Trace 工具的两个应用,Client 会调用 Server 提供的服务,两者都会向 Zipkin 上报 Trace 相关信息。在 Client 和 Server 通过 Transport 上报 Trace 信息后,由 Zipkin 的 Collector 模块接收,并由 Storage 模块将数据存储在对应的存储介质中,然后 Zipkin 提供 API 供 UI 界面查询 Trace 跟踪信息。Non-Instrumented Server,指的是未使用 Trace 工具的 Server,显然它不会上报 Trace 信息。

Zipkin Client

  • Tracer - Tracer 存在于你的应用中,它负责采集关于已发生操作的实时元数据。它们通常会检测库,因此对于用户是透明的。例如,已检测的 Web 服务器记录它何时接收到请求,以及何时发送响应。收集的跟踪数据称为跨度(Span)。
  • Instrumentation - Instrumentation 保证了生产环境的安全性和很少的开销。因此,它们仅在内部传播 ID,以告知接收方正在进行追踪。完成的 Span 将通过外部通信告知 Zipkin,类似于应用程序异步报告指标的方式。例如,当跟踪某个操作并且需要发出 http 请求时,会添加一些 header 来传播 ID。header 不用于发送详细信息,例如操作名称。
  • Reporter - 能够将数据发送到 Zipkin 的检测应用程序中的组件,被称为 Reporter。Reporter 有多种传输方式,可以将跟踪数据发送到 Zipkin 采集器,后者将跟踪数据持久化保存到存储中。稍后,API 会查询存储以向 UI 提供渲染数据。

以下是 Zipkin 的一个示例工作流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
┌─────────────┐ ┌───────────────────────┐  ┌─────────────┐  ┌──────────────────┐
│ User Code │ │ Trace Instrumentation │ │ Http Client │ │ Zipkin Collector │
└─────────────┘ └───────────────────────┘ └─────────────┘ └──────────────────┘
│ │ │ │
┌─────────┐
│ ──┤GET /foo ├─▶ │ ────┐ │ │
└─────────┘ │ record tags
│ │ ◀───┘ │ │
────┐
│ │ │ add trace headers │ │
◀───┘
│ │ ────┐ │ │
│ record timestamp
│ │ ◀───┘ │ │
┌─────────────────┐
│ │ ──┤GET /foo ├─▶ │ │
│X-B3-TraceId: aa │ ────┐
│ │ │X-B3-SpanId: 6b │ │ │ │
└─────────────────┘ │ invoke
│ │ │ │ request │

│ │ │ │ │
┌────────┐ ◀───┘
│ │ ◀─────┤200 OK ├─────── │ │
────┐ └────────┘
│ │ │ record duration │ │
┌────────┐ ◀───┘
│ ◀──┤200 OK ├── │ │ │
└────────┘ ┌────────────────────────────────┐
│ │ ──┤ asynchronously report span ├────▶ │
│ │
│{ │
│ "traceId": "aa", │
│ "id": "6b", │
│ "name": "get", │
│ "timestamp": 1483945573944000,│
│ "duration": 386000, │
│ "annotations": [ │
│--snip-- │
└────────────────────────────────┘

Instrumented client 和 server 是分别使用了 ZipKin Client 的服务,Zipkin Client 会根据配置将追踪数据发送到 Zipkin Server 中进行数据存储、分析和展示。

四、Zipkin 客户端

Brave 是 Java 版的 zipkin 客户端。

一般不会手动编写 Trace 相关的代码,Brave 提供可一些开箱即用的库,帮助我们追踪一些特定的请求。比如:dubbo、grpc、servlet、mysql、httpClient、kafka、springMVC 等。

参考资料

Spring AOP

AOP 概念

什么是 AOP

AOP(Aspect-Oriented Programming,即 面向切面编程)与 OOP( Object-Oriented Programming,面向对象编程) 相辅相成,提供了与 OOP 不同的抽象软件结构的视角。

在 OOP 中,我们以类(class)作为我们的基本单元,而 AOP 中的基本单元是 Aspect(切面)

术语

Aspect(切面)

aspectpointcountadvice 组成, 它既包含了横切逻辑的定义, 也包括了连接点的定义. Spring AOP 就是负责实施切面的框架, 它将切面所定义的横切逻辑织入到切面所指定的连接点中.
AOP 的工作重心在于如何将增强织入目标对象的连接点上, 这里包含两个工作:

  1. 如何通过 pointcut 和 advice 定位到特定的 joinpoint 上
  2. 如何在 advice 中编写切面代码.

可以简单地认为, 使用 @Aspect 注解的类就是切面.

advice(增强)

由 aspect 添加到特定的 join point(即满足 point cut 规则的 join point) 的一段代码.
许多 AOP 框架, 包括 Spring AOP, 会将 advice 模拟为一个拦截器(interceptor), 并且在 join point 上维护多个 advice, 进行层层拦截.
例如 HTTP 鉴权的实现, 我们可以为每个使用 RequestMapping 标注的方法织入 advice, 当 HTTP 请求到来时, 首先进入到 advice 代码中, 在这里我们可以分析这个 HTTP 请求是否有相应的权限, 如果有, 则执行 Controller, 如果没有, 则抛出异常. 这里的 advice 就扮演着鉴权拦截器的角色了.

连接点(join point)

a point during the execution of a program, such as the execution of a method or the handling of an exception. In Spring AOP, a join point always represents a method execution.

程序运行中的一些时间点, 例如一个方法的执行, 或者是一个异常的处理.
在 Spring AOP 中, join point 总是方法的执行点, 即只有方法连接点.

切点(point cut)

匹配 join point 的谓词(a predicate that matches join points).
Advice 是和特定的 point cut 关联的, 并且在 point cut 相匹配的 join point 中执行.
在 Spring 中, 所有的方法都可以认为是 joinpoint, 但是我们并不希望在所有的方法上都添加 Advice, 而 pointcut 的作用就是提供一组规则(使用 AspectJ pointcut expression language 来描述) 来匹配joinpoint, 给满足规则的 joinpoint 添加 Advice.

关于 join point 和 point cut 的区别

在 Spring AOP 中, 所有的方法执行都是 join point. 而 point cut 是一个描述信息, 它修饰的是 join point, 通过 point cut, 我们就可以确定哪些 join point 可以被织入 Advice. 因此 join point 和 point cut 本质上就是两个不同纬度上的东西.
advice 是在 join point 上执行的, 而 point cut 规定了哪些 join point 可以执行哪些 advice

introduction

为一个类型添加额外的方法或字段. Spring AOP 允许我们为 目标对象 引入新的接口(和对应的实现). 例如我们可以使用 introduction 来为一个 bean 实现 IsModified 接口, 并以此来简化 caching 的实现.

目标对象(Target)

织入 advice 的目标对象. 目标对象也被称为 advised object.
因为 Spring AOP 使用运行时代理的方式来实现 aspect, 因此 adviced object 总是一个代理对象(proxied object)
注意, adviced object 指的不是原来的类, 而是织入 advice 后所产生的代理类.

AOP proxy

一个类被 AOP 织入 advice, 就会产生一个结果类, 它是融合了原类和增强逻辑的代理类.
在 Spring AOP 中, 一个 AOP 代理是一个 JDK 动态代理对象或 CGLIB 代理对象.

织入(Weaving)

将 aspect 和其他对象连接起来, 并创建 adviced object 的过程.
根据不同的实现技术, AOP 织入有三种方式:

  • 编译器织入, 这要求有特殊的 Java 编译器.
  • 类装载期织入, 这需要有特殊的类装载器.
  • 动态代理织入, 在运行期为目标类添加增强(Advice)生成子类的方式.
    Spring 采用动态代理织入, 而 AspectJ 采用编译器织入和类装载期织入.

advice 的类型

  • before advice, 在 join point 前被执行的 advice. 虽然 before advice 是在 join point 前被执行, 但是它并不能够阻止 join point 的执行, 除非发生了异常(即我们在 before advice 代码中, 不能人为地决定是否继续执行 join point 中的代码)
  • after return advice, 在一个 join point 正常返回后执行的 advice
  • after throwing advice, 当一个 join point 抛出异常后执行的 advice
  • after(final) advice, 无论一个 join point 是正常退出还是发生了异常, 都会被执行的 advice.
  • around advice, 在 join point 前和 joint point 退出后都执行的 advice. 这个是最常用的 advice.

关于 AOP Proxy

Spring AOP 默认使用标准的 JDK 动态代理(dynamic proxy)技术来实现 AOP 代理, 通过它, 我们可以为任意的接口实现代理.
如果需要为一个类实现代理, 那么可以使用 CGLIB 代理. 当一个业务逻辑对象没有实现接口时, 那么 Spring AOP 就默认使用 CGLIB 来作为 AOP 代理了. 即如果我们需要为一个方法织入 advice, 但是这个方法不是一个接口所提供的方法, 则此时 Spring AOP 会使用 CGLIB 来实现动态代理. 鉴于此, Spring AOP 建议基于接口编程, 对接口进行 AOP 而不是类.

彻底理解 aspect, join point, point cut, advice

看完了上面的理论部分知识, 我相信还是会有不少朋友感觉到 AOP 的概念还是很模糊, 对 AOP 中的各种概念理解的还不是很透彻. 其实这很正常, 因为 AOP 中的概念是在是太多了, 我当时也是花了老大劲才梳理清楚的.
下面我以一个简单的例子来比喻一下 AOP 中 aspect, jointpoint, pointcut 与 advice 之间的关系.

让我们来假设一下, 从前有一个叫爪哇的小县城, 在一个月黑风高的晚上, 这个县城中发生了命案. 作案的凶手十分狡猾, 现场没有留下什么有价值的线索. 不过万幸的是, 刚从隔壁回来的老王恰好在这时候无意中发现了凶手行凶的过程, 但是由于天色已晚, 加上凶手蒙着面, 老王并没有看清凶手的面目, 只知道凶手是个男性, 身高约七尺五寸. 爪哇县的县令根据老王的描述, 对守门的士兵下命令说: 凡是发现有身高七尺五寸的男性, 都要抓过来审问. 士兵当然不敢违背县令的命令, 只好把进出城的所有符合条件的人都抓了起来.

来让我们看一下上面的一个小故事和 AOP 到底有什么对应关系.
首先我们知道, 在 Spring AOP 中 join point 指代的是所有方法的执行点, 而 point cut 是一个描述信息, 它修饰的是 join point, 通过 point cut, 我们就可以确定哪些 join point 可以被织入 Advice. 对应到我们在上面举的例子, 我们可以做一个简单的类比, join point 就相当于 爪哇的小县城里的百姓, point cut 就相当于 老王所做的指控, 即凶手是个男性, 身高约七尺五寸, 而 advice 则是施加在符合老王所描述的嫌疑人的动作: 抓过来审问.
为什么可以这样类比呢?

  • join point –> 爪哇的小县城里的百姓: 因为根据定义, join point 是所有可能被织入 advice 的候选的点, 在 Spring AOP 中, 则可以认为所有方法执行点都是 join point. 而在我们上面的例子中, 命案发生在小县城中, 按理说在此县城中的所有人都有可能是嫌疑人.
  • point cut –> 男性, 身高约七尺五寸: 我们知道, 所有的方法(joint point) 都可以织入 advice, 但是我们并不希望在所有方法上都织入 advice, 而 pointcut 的作用就是提供一组规则来匹配 joinpoint, 给满足规则的 joinpoint 添加 advice. 同理, 对于县令来说, 他再昏庸, 也知道不能把县城中的所有百姓都抓起来审问, 而是根据凶手是个男性, 身高约七尺五寸, 把符合条件的人抓起来. 在这里凶手是个男性, 身高约七尺五寸 就是一个修饰谓语, 它限定了凶手的范围, 满足此修饰规则的百姓都是嫌疑人, 都需要抓起来审问.
  • advice –> 抓过来审问, advice 是一个动作, 即一段 Java 代码, 这段 Java 代码是作用于 point cut 所限定的那些 join point 上的. 同理, 对比到我们的例子中, 抓过来审问 这个动作就是对作用于那些满足 男性, 身高约七尺五寸爪哇的小县城里的百姓.
  • aspect: aspect 是 point cut 与 advice 的组合, 因此在这里我们就可以类比: “根据老王的线索, 凡是发现有身高七尺五寸的男性, 都要抓过来审问” 这一整个动作可以被认为是一个 aspect.

或则我们也可以从语法的角度来简单类比一下. 我们在学英语时, 经常会接触什么 定语, 被动句 之类的概念, 那么可以做一个不严谨的类比, 即 joinpoint 可以认为是一个 宾语, 而 pointcut 则可以类比为修饰 joinpoint 的定语, 那么整个 aspect 就可以描述为: 满足 pointcut 规则的 joinpoint 会被添加相应的 advice 操作.

@AspectJ 支持

@AspectJ 是一种使用 Java 注解来实现 AOP 的编码风格。

@AspectJ 风格的 AOP 是 AspectJ Project 在 AspectJ 5 中引入的, 并且 Spring 也支持 @AspectJ 的 AOP 风格.

使能 @AspectJ 支持

@AspectJ 可以以 XML 的方式或以注解的方式来使能, 并且不论以哪种方式使能@ASpectJ, 我们都必须保证 aspectjweaver.jar 在 classpath 中.

使用 Java Configuration 方式使能@AspectJ

1
2
3
4
@Configuration
@EnableAspectJAutoProxy
public class AppConfig {
}

使用 XML 方式使能@AspectJ

1
<aop:aspectj-autoproxy/>

定义 aspect(切面)

当使用注解 @Aspect 标注一个 Bean 后, 那么 Spring 框架会自动收集这些 Bean, 并添加到 Spring AOP 中, 例如:

1
2
3
4
@Component
@Aspect
public class MyTest {
}

注意, 仅仅使用@Aspect 注解, 并不能将一个 Java 对象转换为 Bean, 因此我们还需要使用类似 @Component 之类的注解.
注意, 如果一个 类被@Aspect 标注, 则这个类就不能是其他 aspect 的 **advised object** 了, 因为使用 @Aspect 后, 这个类就会被排除在 auto-proxying 机制之外.

声明 pointcut

一个 pointcut 的声明由两部分组成:

  • 一个方法签名, 包括方法名和相关参数
  • 一个 pointcut 表达式, 用来指定哪些方法执行是我们感兴趣的(即因此可以织入 advice).

在@AspectJ 风格的 AOP 中, 我们使用一个方法来描述 pointcut, 即:

1
2
@Pointcut("execution(* com.xys.service.UserService.*(..))") // 切点表达式
private void dataAccessOperation() {} // 切点前面

这个方法必须无返回值.
这个方法本身就是 pointcut signature, pointcut 表达式使用@Pointcut 注解指定.
上面我们简单地定义了一个 pointcut, 这个 pointcut 所描述的是: 匹配所有在包 com.xys.service.UserService 下的所有方法的执行.

切点标志符(designator)

AspectJ5 的切点表达式由标志符(designator)和操作参数组成. 如 “execution(* greetTo(..))” 的切点表达式, **execution** 就是 标志符, 而圆括号里的 *****greetTo(..) 就是操作参数

execution

匹配 join point 的执行, 例如 “execution(* hello(..))” 表示匹配所有目标类中的 hello() 方法. 这个是最基本的 pointcut 标志符.

within

匹配特定包下的所有 join point, 例如 within(com.xys.*) 表示 com.xys 包中的所有连接点, 即包中的所有类的所有方法. 而within(com.xys.service.*Service) 表示在 com.xys.service 包中所有以 Service 结尾的类的所有的连接点.

this 与 target

this 的作用是匹配一个 bean, 这个 bean(Spring AOP proxy) 是一个给定类型的实例(instance of). 而 target 匹配的是一个目标对象(target object, 即需要织入 advice 的原始的类), 此对象是一个给定类型的实例(instance of).

bean

匹配 bean 名字为指定值的 bean 下的所有方法, 例如:

1
2
bean(*Service) // 匹配名字后缀为 Service 的 bean 下的所有方法
bean(myService) // 匹配名字为 myService 的 bean 下的所有方法
args

匹配参数满足要求的的方法.
例如:

1
2
3
4
5
6
7
8
@Pointcut("within(com.xys.demo2.*)")
public void pointcut2() {
}

@Before(value = "pointcut2() && args(name)")
public void doSomething(String name) {
logger.info("---page: {}---", name);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class NormalService {
private Logger logger = LoggerFactory.getLogger(getClass());

public void someMethod() {
logger.info("---NormalService: someMethod invoked---");
}

public String test(String name) {
logger.info("---NormalService: test invoked---");
return "服务一切正常";
}
}

当 NormalService.test 执行时, 则 advice doSomething 就会执行, test 方法的参数 name 就会传递到 doSomething 中.

常用例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 匹配只有一个参数 name 的方法
@Before(value = "aspectMethod() && args(name)")
public void doSomething(String name) {
}

// 匹配第一个参数为 name 的方法
@Before(value = "aspectMethod() && args(name, ..)")
public void doSomething(String name) {
}

// 匹配第二个参数为 name 的方法
Before(value = "aspectMethod() && args(*, name, ..)")
public void doSomething(String name) {
}
@annotation

匹配由指定注解所标注的方法, 例如:

1
2
3
@Pointcut("@annotation(com.xys.demo1.AuthChecker)")
public void pointcut() {
}

则匹配由注解 AuthChecker 所标注的方法.

常见的切点表达式

匹配方法签名
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 匹配指定包中的所有的方法
execution(* com.xys.service.*(..))

// 匹配当前包中的指定类的所有方法
execution(* UserService.*(..))

// 匹配指定包中的所有 public 方法
execution(public * com.xys.service.*(..))

// 匹配指定包中的所有 public 方法, 并且返回值是 int 类型的方法
execution(public int com.xys.service.*(..))

// 匹配指定包中的所有 public 方法, 并且第一个参数是 String, 返回值是 int 类型的方法
execution(public int com.xys.service.*(String name, ..))
匹配类型签名
1
2
3
4
5
6
7
8
9
10
11
12
// 匹配指定包中的所有的方法, 但不包括子包
within(com.xys.service.*)

// 匹配指定包中的所有的方法, 包括子包
within(com.xys.service..*)

// 匹配当前包中的指定类中的方法
within(UserService)


// 匹配一个接口的所有实现类中的实现的方法
within(UserDao+)
匹配 Bean 名字
1
2
// 匹配以指定名字结尾的 Bean 中的所有方法
bean(*Service)
切点表达式组合
1
2
3
4
5
// 匹配以 Service 或 ServiceImpl 结尾的 bean
bean(*Service || *ServiceImpl)

// 匹配名字以 Service 结尾, 并且在包 com.xys.service 中的 bean
bean(*Service) && within(com.xys.service.*)

声明 advice

advice 是和一个 pointcut 表达式关联在一起的, 并且会在匹配的 join point 的方法执行的前/后/周围 运行. pointcut 表达式可以是简单的一个 pointcut 名字的引用, 或者是完整的 pointcut 表达式.
下面我们以几个简单的 advice 为例子, 来看一下一个 advice 是如何声明的.

Before advice

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* @author xiongyongshun
* @version 1.0
* @created 16/9/9 13:13
*/
@Component
@Aspect
public class BeforeAspectTest {
// 定义一个 Pointcut, 使用 切点表达式函数 来描述对哪些 Join point 使用 advise.
@Pointcut("execution(* com.xys.service.UserService.*(..))")
public void dataAccessOperation() {
}
}
1
2
3
4
5
6
7
8
9
@Component
@Aspect
public class AdviseDefine {
// 定义 advise
@Before("com.xys.aspect.PointcutDefine.dataAccessOperation()")
public void doBeforeAccessCheck(JoinPoint joinPoint) {
System.out.println("*****Before advise, method: " + joinPoint.getSignature().toShortString() + " *****");
}
}

这里, @Before 引用了一个 pointcut, 即 “com.xys.aspect.PointcutDefine.dataAccessOperation()” 是一个 pointcut 的名字.
如果我们在 advice 在内置 pointcut, 则可以:

1
2
3
4
5
6
7
8
9
@Component
@Aspect
public class AdviseDefine {
// 将 pointcut 和 advice 同时定义
@Before("within(com.xys.service..*)")
public void doAccessCheck(JoinPoint joinPoint) {
System.out.println("*****doAccessCheck, Before advise, method: " + joinPoint.getSignature().toShortString() + " *****");
}
}

around advice

around advice 比较特别, 它可以在一个方法的之前之前和之后添加不同的操作, 并且甚至可以决定何时, 如何, 是否调用匹配到的方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
@Aspect
public class AdviseDefine {
// 定义 advise
@Around("com.xys.aspect.PointcutDefine.dataAccessOperation()")
public Object doAroundAccessCheck(ProceedingJoinPoint pjp) throws Throwable {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
// 开始
Object retVal = pjp.proceed();
stopWatch.stop();
// 结束
System.out.println("invoke method: " + pjp.getSignature().getName() + ", elapsed time: " + stopWatch.getTotalTimeMillis());
return retVal;
}
}

around advice 和前面的 before advice 差不多, 只是我们把注解 @Before 改为了 @Around 了.

参考资料

HDFS 应用

HDFSHadoop Distributed File System 的缩写,即 Hadoop 的分布式文件系统。

HDFS 是一种用于存储具有流数据访问模式的超大文件的文件系统,它运行在廉价的机器集群上。

HDFS 的设计目标是管理数以千计的服务器、数以万计的磁盘,将这么大规模的服务器计算资源当作一个单一的存储系统进行管理,对应用程序提供 PB 级的存储容量,让应用程序像使用普通文件系统一样存储大规模的文件数据。

HDFS 是在一个大规模分布式服务器集群上,对数据分片后进行并行读写及冗余存储。因为 HDFS 可以部署在一个比较大的服务器集群上,集群中所有服务器的磁盘都可供 HDFS 使用,所以整个 HDFS 的存储空间可以达到 PB 级容量。

HDFS 命令

显示当前目录结构

1
2
3
4
5
6
# 显示当前目录结构
hdfs dfs -ls <path>
# 递归显示当前目录结构
hdfs dfs -ls -R <path>
# 显示根目录下内容
hdfs dfs -ls /

创建目录

1
2
3
4
# 创建目录
hdfs dfs -mkdir <path>
# 递归创建目录
hdfs dfs -mkdir -p <path>

删除操作

1
2
3
4
# 删除文件
hdfs dfs -rm <path>
# 递归删除目录和文件
hdfs dfs -rm -R <path>

导入文件到 HDFS

1
2
3
# 二选一执行即可
hdfs dfs -put [localsrc] [dst]
hdfs dfs -copyFromLocal [localsrc] [dst]

从 HDFS 导出文件

1
2
3
# 二选一执行即可
hdfs dfs -get [dst] [localsrc]
hdfs dfs -copyToLocal [dst] [localsrc]

查看文件内容

1
2
3
# 二选一执行即可
hdfs dfs -text <path>
hdfs dfs -cat <path>

显示文件的最后一千字节

1
2
3
hdfs dfs -tail <path>
# 和 Linux 下一样,会持续监听文件内容变化 并显示文件的最后一千字节
hdfs dfs -tail -f <path>

拷贝文件

1
hdfs dfs -cp [src] [dst]

移动文件

1
hdfs dfs -mv [src] [dst]

统计当前目录下各文件大小

  • 默认单位字节
  • -s : 显示所有文件大小总和,
  • -h : 将以更友好的方式显示文件大小(例如 64.0m 而不是 67108864)
1
hdfs dfs -du <path>

合并下载多个文件

  • -nl 在每个文件的末尾添加换行符(LF)
  • -skip-empty-file 跳过空文件
1
2
3
hdfs dfs -getmerge
# 示例 将 HDFS 上的 hbase-policy.xml 和 hbase-site.xml 文件合并后下载到本地的/usr/test.xml
hdfs dfs -getmerge -nl /test/hbase-policy.xml /test/hbase-site.xml /usr/test.xml

统计文件系统的可用空间信息

1
hdfs dfs -df -h /

更改文件复制因子

1
hdfs dfs -setrep [-R] [-w] <numReplicas> <path>
  • 更改文件的复制因子。如果 path 是目录,则更改其下所有文件的复制因子
  • -w : 请求命令是否等待复制完成
1
2
# 示例
hdfs dfs -setrep -w 3 /user/hadoop/dir1

权限控制

1
2
3
4
5
6
7
# 权限控制和 Linux 上使用方式一致
# 变更文件或目录的所属群组。 用户必须是文件的所有者或超级用户。
hdfs dfs -chgrp [-R] GROUP URI [URI ...]
# 修改文件或目录的访问权限 用户必须是文件的所有者或超级用户。
hdfs dfs -chmod [-R] <MODE[,MODE]... | OCTALMODE> URI [URI ...]
# 修改文件的拥有者 用户必须是超级用户。
hdfs dfs -chown [-R] [OWNER][:[GROUP]] URI [URI ]

文件检测

1
hdfs dfs -test - [defsz]  URI

可选选项:

  • -d:如果路径是目录,返回 0。
  • -e:如果路径存在,则返回 0。
  • -f:如果路径是文件,则返回 0。
  • -s:如果路径不为空,则返回 0。
  • -r:如果路径存在且授予读权限,则返回 0。
  • -w:如果路径存在且授予写入权限,则返回 0。
  • -z:如果文件长度为零,则返回 0。
1
2
# 示例
hdfs dfs -test -e filename

HDFS API

简介

想要使用 HDFS API,需要导入依赖 hadoop-client。如果是 CDH 版本的 Hadoop,还需要额外指明其仓库地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.heibaiying</groupId>
<artifactId>hdfs-java-api</artifactId>
<version>1.0</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>2.6.0-cdh5.15.2</hadoop.version>
</properties>

<!---配置 CDH 仓库地址-->
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>

<dependencies>
<!--Hadoop-client-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>

FileSystem

FileSystem 是所有 HDFS 操作的主入口。由于之后的每个单元测试都需要用到它,这里使用 @Before 注解进行标注。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static final String HDFS_PATH = "hdfs://192.168.0.106:8020";
private static final String HDFS_USER = "root";
private static FileSystem fileSystem;

@Before
public void prepare() {
try {
Configuration configuration = new Configuration();
// 这里我启动的是单节点的 Hadoop, 所以副本系数设置为 1, 默认值为 3
configuration.set("dfs.replication", "1");
fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, HDFS_USER);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
}
}

@After
public void destroy() {
fileSystem = null;
}

FileSystem 官方 Java API 文档

创建目录

支持递归创建目录:

1
2
3
4
@Test
public void mkDir() throws Exception {
fileSystem.mkdirs(new Path("/hdfs-api/test0/"));
}

创建指定权限的目录

FsPermission(FsAction u, FsAction g, FsAction o) 的三个参数分别对应:创建者权限,同组其他用户权限,其他用户权限,权限值定义在 FsAction 枚举类中。

1
2
3
4
5
@Test
public void mkDirWithPermission() throws Exception {
fileSystem.mkdirs(new Path("/hdfs-api/test1/"),
new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.READ));
}

创建文件,并写入内容

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void create() throws Exception {
// 如果文件存在,默认会覆盖,可以通过第二个参数进行控制。第三个参数可以控制使用缓冲区的大小
FSDataOutputStream out = fileSystem.create(new Path("/hdfs-api/test/a.txt"),
true, 4096);
out.write("hello hadoop!".getBytes());
out.write("hello spark!".getBytes());
out.write("hello flink!".getBytes());
// 强制将缓冲区中内容刷出
out.flush();
out.close();
}

判断文件是否存在

1
2
3
4
5
@Test
public void exist() throws Exception {
boolean exists = fileSystem.exists(new Path("/hdfs-api/test/a.txt"));
System.out.println(exists);
}

查看文件内容

查看小文本文件的内容,直接转换成字符串后输出:

1
2
3
4
5
6
@Test
public void readToString() throws Exception {
FSDataInputStream inputStream = fileSystem.open(new Path("/hdfs-api/test/a.txt"));
String context = inputStreamToString(inputStream, "utf-8");
System.out.println(context);
}

inputStreamToString 是一个自定义方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 把输入流转换为指定编码的字符
*
* @param inputStream 输入流
* @param encode 指定编码类型
*/
private static String inputStreamToString(InputStream inputStream, String encode) {
try {
if (encode == null || ("".equals(encode))) {
encode = "utf-8";
}
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encode));
StringBuilder builder = new StringBuilder();
String str = "";
while ((str = reader.readLine()) != null) {
builder.append(str).append("\n");
}
return builder.toString();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

文件重命名

1
2
3
4
5
6
7
@Test
public void rename() throws Exception {
Path oldPath = new Path("/hdfs-api/test/a.txt");
Path newPath = new Path("/hdfs-api/test/b.txt");
boolean result = fileSystem.rename(oldPath, newPath);
System.out.println(result);
}

删除目录或文件

1
2
3
4
5
6
7
8
9
public void delete() throws Exception {
/*
* 第二个参数代表是否递归删除
* + 如果 path 是一个目录且递归删除为 true, 则删除该目录及其中所有文件;
* + 如果 path 是一个目录但递归删除为 false, 则会则抛出异常。
*/
boolean result = fileSystem.delete(new Path("/hdfs-api/test/b.txt"), true);
System.out.println(result);
}

上传文件到 HDFS

1
2
3
4
5
6
7
@Test
public void copyFromLocalFile() throws Exception {
// 如果指定的是目录,则会把目录及其中的文件都复制到指定目录下
Path src = new Path("D:\\BigData-Notes\\notes\\installation");
Path dst = new Path("/hdfs-api/test/");
fileSystem.copyFromLocalFile(src, dst);
}

上传大文件并显示上传进度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void copyFromLocalBigFile() throws Exception {

File file = new File("D:\\kafka.tgz");
final float fileSize = file.length();
InputStream in = new BufferedInputStream(new FileInputStream(file));

FSDataOutputStream out = fileSystem.create(new Path("/hdfs-api/test/kafka5.tgz"),
new Progressable() {
long fileCount = 0;

public void progress() {
fileCount++;
// progress 方法每上传大约 64KB 的数据后就会被调用一次
System.out.println("上传进度:" + (fileCount * 64 * 1024 / fileSize) * 100 + " %");
}
});

IOUtils.copyBytes(in, out, 4096);

}

从 HDFS 上下载文件

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void copyToLocalFile() throws Exception {
Path src = new Path("/hdfs-api/test/kafka.tgz");
Path dst = new Path("D:\\app\\");
/*
* 第一个参数控制下载完成后是否删除源文件,默认是 true, 即删除;
* 最后一个参数表示是否将 RawLocalFileSystem 用作本地文件系统;
* RawLocalFileSystem 默认为 false, 通常情况下可以不设置,
* 但如果你在执行时候抛出 NullPointerException 异常,则代表你的文件系统与程序可能存在不兼容的情况 (window 下常见),
* 此时可以将 RawLocalFileSystem 设置为 true
*/
fileSystem.copyToLocalFile(false, src, dst, true);
}

查看指定目录下所有文件的信息

1
2
3
4
5
6
7
public void listFiles() throws Exception {
FileStatus[] statuses = fileSystem.listStatus(new Path("/hdfs-api"));
for (FileStatus fileStatus : statuses) {
//fileStatus 的 toString 方法被重写过,直接打印可以看到所有信息
System.out.println(fileStatus.toString());
}
}

FileStatus 中包含了文件的基本信息,比如文件路径,是否是文件夹,修改时间,访问时间,所有者,所属组,文件权限,是否是符号链接等,输出内容示例如下:

1
2
3
4
5
6
7
8
9
10
FileStatus{
path=hdfs://192.168.0.106:8020/hdfs-api/test;
isDirectory=true;
modification_time=1556680796191;
access_time=0;
owner=root;
group=supergroup;
permission=rwxr-xr-x;
isSymlink=false
}

递归查看指定目录下所有文件的信息

1
2
3
4
5
6
7
@Test
public void listFilesRecursive() throws Exception {
RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path("/hbase"), true);
while (files.hasNext()) {
System.out.println(files.next());
}
}

和上面输出类似,只是多了文本大小,副本系数,块大小信息。

1
2
3
4
5
6
7
8
9
10
11
LocatedFileStatus{
path=hdfs://192.168.0.106:8020/hbase/hbase.version;
isDirectory=false;
length=7;
replication=1;
blocksize=134217728;
modification_time=1554129052916;
access_time=1554902661455;
owner=root; group=supergroup;
permission=rw-r--r--;
isSymlink=false}

查看文件的块信息

1
2
3
4
5
6
7
8
9
@Test
public void getFileBlockLocations() throws Exception {

FileStatus fileStatus = fileSystem.getFileStatus(new Path("/hdfs-api/test/kafka.tgz"));
BlockLocation[] blocks = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
for (BlockLocation block : blocks) {
System.out.println(block);
}
}

块输出信息有三个值,分别是文件的起始偏移量 (offset),文件大小 (length),块所在的主机名 (hosts)。

1
0,57028557,hadoop001

这里我上传的文件只有 57M(小于 128M),且程序中设置了副本系数为 1,所有只有一个块信息。

参考资料

Hadoop 面试

简介

【初级】简介一下大数据技术生态?

:::details 要点

  • 数据采集:Flume、Sqoop、Logstash、Filebeat
  • 分布式文件存储:Hadoop HDFS
  • NoSql
    • 文档数据库:Mongodb
    • 列式数据库:HBase
    • 搜索引擎:Solr、Elasticsearch
  • 分布式计算
    • 批处理:Hadoop MapReduce
    • 流处理:Storm、Kafka
    • 混合处理:Spark、Flink
  • 查询分析:Hive、Spark SQL、Flink SQL、Pig、Phoenix
  • 集群资源管理:Hadoop YARN
  • 分布式协调:Zookeeper
  • 任务调度:Azkaban、Oozie
  • 集群部署和监控:Ambari、Cloudera Manager

:::

【初级】什么是 HDFS?

:::details 要点

HDFSHadoop Distributed File System 的缩写,即 Hadoop 的分布式文件系统。

HDFS 是一种用于存储具有流数据访问模式的超大文件的文件系统,它运行在廉价的机器集群上。

HDFS 的设计目标是管理数以千计的服务器、数以万计的磁盘,将这么大规模的服务器计算资源当作一个单一的存储系统进行管理,对应用程序提供 PB 级的存储容量,让应用程序像使用普通文件系统一样存储大规模的文件数据。

HDFS 是在一个大规模分布式服务器集群上,对数据分片后进行并行读写及冗余存储。因为 HDFS 可以部署在一个比较大的服务器集群上,集群中所有服务器的磁盘都可供 HDFS 使用,所以整个 HDFS 的存储空间可以达到 PB 级容量。

HDFS 的常见使用场景:

  • 大数据存储 - HDFS 能够存储 PB 级甚至 EB 级的数据,适合存储日志数据、传感器数据、社交媒体数据等。
  • 批处理与分析 - HDFS 是 Hadoop MapReduce 的默认存储系统,MapReduce 作业直接从 HDFS 读取数据并进行分布式计算。
  • 数据仓库 - HDFS 可以作为数据仓库的底层存储,支持大规模数据的离线分析。
  • 数据冷备 - 由于 HDFS 的高可靠和低成本,适用于存储访问频率较低的冷数据(如历史数据、备份数据)。
  • 多媒体数据存储:HDFS 适合存储大规模的多媒体数据(如图像、视频、音频)。

:::

【初级】HDFS 有什么特性(优缺点)?

:::details 要点

HDFS 的优点

  • 高可用 - 冗余数据副本,支持自动故障恢复;支持 NameNode HA、安全模式
  • 易扩展 - 能够处理 10K 节点的规模;处理数据达到 GB、TB、甚至 PB 级别的数据;能够处理百万规模以上的文件数量,数量相当之大。
  • 批处理 - 流式数据访问;数据位置暴露给计算框架
  • 低成本 - HDFS 构建在廉价的商用机器上。

HDFS 的缺点

  • 不适合低延迟数据访问 - 适合高吞吐率的场景,就是在某一时间内写入大量的数据。但是它在低延时的情况下是不行的,比如毫秒级以内读取数据,它是很难做到的。
  • 不适合大量小文件存储
    • 存储大量小文件(这里的小文件是指小于 HDFS 系统的 Block 大小的文件(默认 64M)) 的话,它会占用 NameNode 大量的内存来存储文件、目录和块信息。这样是不可取的,因为 NameNode 的内存总是有限的。
    • 磁盘寻道时间超过读取时间
  • 不支持并发写入 - 一个文件同时只能有一个写入者
  • 不支持文件随机修改 - 仅支持追加写入

:::

【初级】什么是 YARN?

:::details 要点

YARN(Yet Another Resource Negotiator,即另一种资源调度器) 是 Hadoop 的集群资源管理系统。YARN 负责资源管理和调度。用户可以将各种服务框架部署在 YARN 上,由 YARN 进行统一地管理和资源分配。

在 Hadoop 1.x 版本,MapReduce 中的 jobTracker 担负了太多的责任,接收任务是它,资源调度是它,监控 TaskTracker 运行情况还是它。这样实现的好处是比较简单,但相对的,就容易出现一些问题,比如常见的单点故障问题。要解决这些问题,只能将 jobTracker 进行拆分,将其中部分功能拆解出来。沿着这个思路,于是有了 YARN。

:::

【初级】什么是 MapReduce?

:::details 要点

MapReduce 是 Hadoop 项目中的分布式计算框架。它降低了分布式计算的门槛,可以让用户轻松编写程序,让其以可靠、容错的方式运行在大型集群上并行处理海量数据(TB 级)。

MapReduce 的设计思路是:

  • 分而治之,并行计算
  • 移动计算,而非移动数据

MapReduce 作业通过将输入的数据集拆分为独立的块,这些块由 map 任务以并行的方式处理。框架对 map 的输出进行排序,然后将其输入到 reduce 任务中。作业的输入和输出都存储在文件系统中。该框架负责调度任务、监控任务并重新执行失败的任务。

通常,计算节点和存储节点是相同的,即 MapReduce 框架和 HDFS 在同一组节点上运行。此配置允许框架在已存在数据的节点上有效地调度任务,从而在整个集群中实现非常高的聚合带宽。

MapReduce 框架由一个主 ResourceManager、每个集群节点一个工作程序 NodeManager 和每个应用程序的 MRAppMaster (YARN 组件) 组成。

MapReduce 框架仅对 <key、value> 对进行作,也就是说,框架将作业的输入视为一组 <key、value> 对,并生成一组 <key、value> 对作为作业的输出,可以想象是不同的类型。类必须可由框架序列化,因此需要实现 Writable 接口。此外,关键类必须实现 WritableComparable 接口,以便于按框架进行排序。

MapReduce 作业的 Input 和 Output 类型:

1
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

MapReduce 适用场景:

  • 数据统计,如:网站的 PV、UV 统计
  • 搜索引擎构建索引
  • 海量数据查询

MapReduce 不适用场景:

  • OLAP - 要求毫秒或秒级返回结果
  • 流计算 - 流计算的输入数据集是动态的,而 MapReduce 是静态的
  • DAG 计算
    • 多个作业存在依赖关系,后一个的输入是前一个的输出,构成有向无环图 DAG
    • 每个 MapReduce 作业的输出结果都会落盘,造成大量磁盘 IO,导致性能非常低下

:::

【初级】MapReduce 有什么特性(优缺点)?

:::details 要点

MapReduce 有以下特性:

  • 移动计算,而非移动数据
  • 良好的扩展性:计算能力随着节点数增加,近似线性递增
  • 高可用
  • 适合海量数据的离线批处理
  • 降低了分布式编程的门槛

:::

架构

【高级】HDFS 的架构是怎样设计的?

:::details 要点

HDFS 架构有以下几个核心要点:

  • 主从架构
  • 按块分区
  • 数据副本
  • 命名空间

(1)HDFS 主从架构

HDFS 采用 master/slave 架构。一个 HDFS 集群是由一个 NameNode 和一定数目的 DataNode 组成。NameNode 是一个中心服务器,负责管理文件系统的命名空间 (namespace) 以及客户端对文件的访问。集群中的 DataNode 一般是一个节点一个,负责管理它所在节点上的存储。HDFS 暴露了文件系统的命名空间,用户能够以文件的形式在上面存储数据。从内部看,一个文件其实被分成一个或多个数据块,这些块存储在一组 DataNode 上。NameNode 执行文件系统的命名空间操作,比如打开、关闭、重命名文件或目录。它也负责确定数据块到具体 DataNode 节点的映射。DataNode 负责处理文件系统客户端的读写请求。在 NameNode 的统一调度下进行数据块的创建、删除和复制。

  • NameNode - 负责 HDFS 集群的管理、协调。具体来说,主要有以下职责:
    • 管理命名空间 - 执行有关命名空间的操作,例如打开,关闭、重命名文件和目录等。
    • 管理元数据 - 维护文件的位置、所有者、权限、数据块等。
    • 管理 Block 副本策略 - 默认 3 个副本
    • 客户端读写请求寻址
  • DataNode:负责提供来自文件系统客户端的读写请求,执行块的创建,删除等操作。具体来说,主要有以下职责:
    • 执行客户端发送的读写操作
    • 存储 Block 和数据校验和
    • 定期向 NameNode 发送心跳以续活
    • 定期向 NameNode 上报 Block 信息

(2)按块分区

HDFS 将文件数据分割成若干数据块(Block),每个 DataNode 存储一部分数据块,这样文件就分布存储在整个 HDFS 服务器集群中。

将大文件分割成 Block 的主要目的是为了优化网络传输和数据处理的效率。这种分割机制使得文件的不同部分可以并行处理,大大提高了数据处理的速度。

HDFS Block 有以下要点:

  • Block 是 HDFS 最小存储单元
  • 文件写入 HDFS 会被切分成若干个 Block
  • Block 大小固定,默认为 128MB,可通过 dfs.blocksize 参数修改
  • 若一个 Block 的大小小于设定值,不会占用整个块空间
  • 默认情况下每个 Block 有 3 个副本

这实际上是典型的分布式分区思想,使得 HDFS 具备了扩展能力。

(3)数据复制

HDFS 被设计成能够在一个大集群中跨机器可靠地存储超大文件。它将每个文件存储成一系列的数据块,除了最后一个,所有的数据块都是同样大小的。为了容错,文件的所有数据块都会有副本。每个文件的数据块大小和副本系数都是可配置的。应用程序可以指定某个文件的副本数目。副本系数可以在文件创建的时候指定,也可以在之后改变。HDFS 中的文件都是一次性写入的,并且严格要求在任何时候只能有一个写入者。

NameNode 全权管理数据块的复制,它周期性地从集群中的每个 DataNode 接收心跳信号和块状态报告 (Blockreport)。接收到心跳信号意味着该 DataNode 节点工作正常。块状态报告包含了一个该 DataNode 上所有数据块的列表。

(4)命名空间

HDFS 支持传统的层次型文件组织结构。用户或者应用程序可以创建目录,然后将文件保存在这些目录里。文件系统命名空间的层次结构和大多数现有的文件系统类似:用户可以创建、删除、移动或重命名文件。HDFS 不支持用户磁盘配额和访问权限控制,也不支持硬链接和软链接。但是 HDFS 架构并不妨碍实现这些特性。

NameNode 负责维护文件系统的命名空间,任何对文件系统命名空间或属性的修改都将被 NameNode 记录下来。应用程序可以设置 HDFS 保存的文件的副本数目。文件副本的数目称为文件的副本系数,这个信息也是由 NameNode 保存的。

:::

【中级】HDFS 使用 NameNode 的好处 ?

:::details 要点

HDFS 使用 NameNode 的好处主要体现在以下几个方面:

  • 中心化的元数据管理 - NameNode 在 HDFS 中负责存储整个文件系统的元数据,包括文件和目录的结构、每个文件的数据块信息及其在 DataNode 上的位置等。这种中心化的管理,使得文件系统的组织和管理变得更加简洁高效,并且可以确保整个文件系统的一致性。
  • 易扩展 - 由于实际的数据存储在 DataNode 上,而 NameNode 只存储元数据,这样的架构设计使得 HDFS 可以轻松扩展到处理 PB 级别甚至更大规模的数据集。
  • 快速的文件访问:用户或应用程序在访问文件时,首先与 NameNode 交互以获得数据块的位置信息,然后直接从 DataNode 读取数据。这种方式可以快速定位数据,提高文件访问的效率。
  • 容错和恢复机制:NameNode 可以监控 DataNode 的状态,实现系统的容错。在 DataNode 发生故障时,NameNode 可以指导其它 DataNode 复制丢失的数据块,保证数据的可靠性。
  • 简化数据管理:NameNode 的存在简化了数据的管理和维护。例如,在进行数据备份、系统升级或扩展时,管理员只需要关注 NameNode 上的元数据,而不是每个节点上存储的实际数据。

然而,由于 NameNode 是中心节点,它也成为了系统的一个潜在瓶颈和单点故障。因此,HDFS 后来引入了主备 NameNode 机制来保证 NameNode 自身的可用性。

:::

【中级】HDFS 使用 Block 的好处 ?

:::details 要点

HDFS 采用文件分块(Block)进行存储管理,主要是基于以下几个原因:

  • 提高可靠性和容错性 - 通过将文件分成多个块,并在不同的 DataNode 上存储这些块的副本,HDFS 可以提高数据的可靠性。即使某些 DataNode 出现故障,其他节点上的副本仍然可以用于数据恢复。
  • 提高数据处理效率:在处理大规模数据集时,将大文件分割成小块可以提高数据处理的效率。这样,可以并行地在多个节点上处理不同的块,从而加速数据处理和分析。
  • 提高网络传输效率:分块存储还有利于网络传输。当处理或传输一个大文件的部分数据时,只需处理或传输相关的几个块,而不是整个文件,这减少了网络传输负担。
  • 易于扩展:分块机制使得 HDFS 易于扩展。可以简单地通过增加更多的 DataNode 来扩大存储容量和处理能力,而不需要对现有的数据块进行任何修改。
  • 负载均衡:分块存储还有助于在集群中实现负载均衡。不同的数据块可以分布在不同的节点上,从而均衡各个节点的存储和处理负载。

:::

【中级】NameNode 与 SecondaryNameNode 的区别与联系 ?

:::details 要点

NameNode 和 SecondaryNameNode 的区别

  • NameNode 是 HDFS 的主要节点,负责管理文件系统的命名空间。它维护着整个文件系统的目录和文件结构,以及所有文件的元数据,包括文件的数据块(block)信息、数据块的位置等。
  • SecondaryNameNode 是 NameNode 的辅助节点。
    • SecondaryNameNode 不是 NameNode 的备份,不能在 NameNode 故障时接管其功能。
    • HDFS 在运行过程中,所有的事务(如文件创建、删除等)都会首先记录在 NameNode 的内存和 EditLog 中。SecondaryNameNode 定期从 NameNode 获取这些日志文件,与文件系统的命名空间镜像(FsImage)合并,然后把新的 FsImage 送回给 NameNode,以帮助减少 NameNode 的内存压力。

NameNode 和 SecondaryNameNode 的联系

  • 共同目标:二者共同目的是维护 HDFS 的稳定和高效运作。NameNode 作为核心,负责实时的元数据管理;而 SecondaryNameNode 辅助 NameNode,通过定期处理 FsImage 和 EditLog,减轻 NameNode 的负担。
  • 数据交互:SecondaryNameNode 的工作依赖于与 NameNode 的交互,从 NameNode 获取元数据的状态和编辑日志。

:::

【中级】什么是 FsImage 和 EditLog?

:::details 要点

HDFS 中,FsImageEditLog是两个关键的文件,用于存储和管理文件系统的元数据。它们的主要区别如下:

FsImage(文件系统镜像)

  • 内容FsImage包含 HDFS 元数据的完整快照,例如文件系统的目录树、文件和目录的属性等。
  • 静态性:它是在特定时间点上的静态快照。一旦创建,除非进行新的快照操作,否则内容不会改变。
  • 使用场景:在 NameNode 启动时使用,用于加载文件系统的最初状态。此外,在进行系统备份时也会生成新的FsImage
  • 更新频率:不是实时更新的。通常在系统进行 checkpoint 操作时才会更新。

EditLog(编辑日志)

  • 内容EditLog记录了自上一个FsImage快照以来所有对文件系统所做的增量更改。这些更改包括文件和目录的创建、删除、重命名等操作。
  • 动态性:它是一个动态更新的日志文件。每次对文件系统进行更改时,这个更改就会记录在EditLog中。
  • 使用场景:用于记录所有的文件系统更改操作。在 NameNode 重启时,FsImage将与EditLog结合使用,以重建文件系统的最新状态。
  • 更新频率:实时更新。每次对文件系统的更改都会迅速反映在EditLog中。

结合使用

在 HDFS 中,FsImageEditLog一起工作,以确保文件系统的元数据既能够被可靠地存储,又能够反映最新的更改。定期进行 checkpoint 操作(由 Secondary NameNode 或 Standby NameNode 执行)会将EditLog中的更改应用到FsImage中,创建一个新的、更新的快照。这样可以保证在系统重启或恢复时,可以快速加载最新的文件系统状态。

:::

【中级】YARN 有哪些核心组件?

:::details 要点

YARN Architecture

YARN 有以下核心组件:

  • ResourceManager - ResourceManager 是管理资源和安排在 YARN 上运行的中央调度器。整个系统有且只有一个 ResourceManager,因为号令发布都来自一处,因此不存在调度不一致的情况(很多分布式系统都是通过经典的一主多从模式来解决一致性问题的)。它也包含了两个主要的子组件:
    • 定时调度器(Scheduler) - 从本质上来说,定时调度器就是一种策略,或者说一种算法。当 Client 提交一个任务的时候,它会根据所需要的资源以及当前集群的资源状况进行分配。注意,它只负责向应用程序分配资源,并不做监控以及应用程序的状态跟踪。
    • 应用管理器(ApplicationManager) - 应用管理器就是负责管理 Client 提交的应用。上面不是说到定时调度器(Scheduler)不对用户提交的程序监控嘛,其实啊,监控应用的工作正是由应用管理器(ApplicationManager)完成的。
  • NodeManager - NodeManager 是 ResourceManager 在每台机器的上代理,负责容器的管理,并监控他们的资源使用情况(cpu、内存、磁盘及网络等),以及向 ResourceManager/Scheduler 提供这些资源使用报告。
  • ApplicationMaster - 每当 Client 提交一个 Application 时候,就会新建一个 ApplicationMaster 。由这个 ApplicationMaster 去与 ResourceManager 申请容器资源,获得资源后会将要运行的程序发送到容器上启动,然后进行分布式计算。这么设计的原因在于,数据量大的时候,移动数据成本太高,耗时太久,改为移动计算代价较小。
  • Container - Container 是 YARN 对资源的抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等。当 AM 向 RM 申请资源时,RM 为 AM 返回的资源是用 Container 表示的。
    • YARN 会为每个任务分配一个 Container,该任务只能使用该 Container 中描述的资源。
    • ApplicationMaster 可在 Container 内运行任何类型的任务。例如,MapReduce ApplicationMaster 请求一个容器来启动 map 或 reduce 任务,而 Giraph ApplicationMaster 请求一个容器来运行 Giraph 任务。
    • 容器由 NodeManager 启动和管理,并被它所监控。
    • 容器被 ResourceManager 所调度。

:::

【中级】MapReduce 有哪些核心组件?

:::details 要点

MapReduce 有以下核心组件:

  • Job - Job 表示 MapReduce 作业配置。Job 通常用于指定 Mapper、combiner(如果有)、PartitionerReducerInputFormatOutputFormat 实现。
  • Mapper - Mapper 负责将输入键值对映射到一组中间键值对。转换的中间记录不需要与输入记录具有相同的类型。一个给定的输入键值对可能映射到零个或多个输出键值对。
  • Combiner - combinermap 运算后的可选操作,它实际上是一个本地化的 reduce 操作。它执行中间输出的本地聚合,这有助于减少从 Mapper 传输到 Reducer 的数据量。
  • Reducer - Reducer 将共享一个 key 的一组中间值归并为一个小的数值集。Reducer 有 3 个主要子阶段:shuffle,sort 和 reduce。
    • shuffle - Reducer 的输入就是 mapper 的排序输出。在这个阶段,框架通过 HTTP 获取所有 mapper 输出的相关分区。
    • sort - 在这个阶段中,框架将按照 key (因为不同 mapper 的输出中可能会有相同的 key) 对 Reducer 的输入进行分组。shuffle 和 sort 两个阶段是同时发生的。
    • reduce - 对按键分组的数据进行聚合统计。
  • Partitioner - Partitioner 负责控制 map 中间输出结果的键的分区。
    • 键(或者键的子集)用于产生分区,通常通过一个散列函数。
    • 分区总数与作业的 reduce 任务数是一样的。因此,它控制中间输出结果(也就是这条记录)的键发送给 m 个 reduce 任务中的哪一个来进行 reduce 操作。
  • InputFormat - InputFormat 描述 MapReduce 作业的输入规范。MapReduce 框架依赖作业的 InputFormat 来完成以下工作:
    • 确认作业的输入规范。
    • 把输入文件分割成多个逻辑的 InputSplit 实例,然后将每个实例分配给一个单独的 Mapper。InputSplit 表示要由单个 Mapper 处理的数据。
    • 提供 RecordReader 的实现。RecordReaderInputSplit 中读取 <key, value> 对,并提供给 Mapper 实现进行处理。
  • OutputFormat - OutputFormat 描述 MapReduce 作业的输出规范。MapReduce 框架依赖作业的 OutputFormat 来完成以下工作:
    • 确认作业的输出规范,例如检查输出路径是否已经存在。
    • 提供 RecordWriter 实现。RecordWriter 将输出 <key, value> 对到文件系统。

:::

工作流

【中级】HDFS 的写数据流程是怎样的?

:::details 要点

HDFS 写数据流程大致为:

  1. 按 Block 大小分割数据
  2. 通过 NameNode 寻址 DataNode
  3. 向 DataNode 写数据
  4. 完成后通知 NameNode

扩展:下面的漫画生动的展示了 HDFS 的写入流程,图片引用自博客:翻译经典 HDFS 原理讲解漫画

HDFS 写数据的源码流程:

  1. 客户端通过对 DistributedFileSystem 对象调用 create() 函数来新建文件
  2. 分布式文件系统对 NameNode 创建一个 RPC 调用,在文件系统的命名空间中新建一个文件
  3. NameNode 对新建文件进行检查无误后,分布式文件系统返回给客户端一个 FSDataOutputStream 对象,FSDataOutputStream 对象封装一个 DFSoutPutstream 对象,负责处理 NameNode 和 DataNode 之间的通信,客户端开始写入数据
  4. FSDataOutputStream数据分成一个一个的数据包,写入内部数据队列,DataStreamer 负责将数据包依次流式传输到由一组 NameNode 构成的管道中。
  5. DFSOutputStream 维护着确认队列来等待 DataNode 收到确认回执,收到管道中所有 DataNode 确认后,数据包从确认队列删除
  6. 客户端完成数据的写入,调用 close() 方法关闭传输通道。
  7. NameNode 确认完成

:::

【中级】HDFS 的读数据流程是怎样的?

:::details 要点

HDFS 读数据流程大致为:

  1. 客户端向 NameNode 查询文件信息
  2. NameNode 返回相关信息
    • 该文件的所有数据块
    • 每个数据块对应的 DataNode(按距离客户端的远近排序)
  3. 客户端向 DataNode 读数据

HDFS 读数据的源码流程:

  1. 客户端调用 FileSystem 对象的 open() 方法在 HDFS 中打开要读取的文件
  2. HDFS 通过使用 RPC(远程过程调用)来调用 NameNode,确定文件起始块(Block)的位置
  3. DistributedFileSystem 类返回一个支持文件定位的输入流 FSDataInputStream 对象,FSDataInputStream 对象接着封装 DFSInputStream 对象(存储着文件起始几个块的 DataNode 地址),客户端对这个输入流调用 read() 方法。
  4. DFSInputStream 连接距离最近的 DataNode,通过反复调用 read 方法,将数据从 DataNode 传输到客户端
  5. 到达块的末端时,DFSInputStream 关闭与该 DataNode 的连接,寻找下一个块的最佳 DataNode
  6. 客户端完成读取,对 FSDataInputStream 调用 close() 方法关闭连接

:::

【中级】MapReduce 是如何工作的?

:::details 要点

MapReduce 任务过程分为两个处理阶段:map 极端和 reduce 阶段。每阶段都以键值对作为输入和输出,其类型由程序员来选择。程序员还需要写两个函数:map 函数和 reduce 函数。

以词频统计为例,其工作流再细分一下,可以划分为以下阶段:

  1. input - 读取文本文件;
  2. splitting - 将文件按照行进行拆分,此时得到的 K1 行数,V1 表示对应行的文本内容;
  3. mapping - 并行将每一行按照空格进行拆分,拆分得到的 List(K2,V2),其中 K2 代表每一个单词,由于是做词频统计,所以 V2 的值为 1,代表出现 1 次;
  4. shuffling - 由于 Mapping 操作可能是在不同的机器上并行处理的,所以需要通过 shuffling 将相同 key 值的数据分发到同一个节点上去合并,这样才能统计出最终的结果,此时得到 K2 为每一个单词,List(V2) 为可迭代集合,V2 就是 Mapping 中的 V2;
  5. reducing - 这里的案例是统计单词出现的总次数,所以 ReducingList(V2) 进行归约求和操作,最终输出。

MapReduce 编程模型中 splittingshuffing 操作都是由框架实现的,需要我们自己编程实现的只有 mappingreducing,这也就是 MapReduce 这个称呼的来源。

MapReduce 工作流

:::

【中级】YARN 是如何工作的?

:::details 要点

这张图简单地标明了提交一个程序所经历的流程,接下来我们来具体说说每一步的过程。

  1. Client 向 ResourceManager 申请运行一个 Application 进程,这里我们假设是一个 MapReduce 作业。
  2. ResourceManager 向 NodeManager 通信,为该 Application 进程分配第一个容器。并在这个容器中运行这个应用程序对应的 ApplicationMaster。
  3. ApplicationMaster 启动以后,对作业(也就是 Application) 进行拆分,拆分 task 出来,这些 task 可以运行在一个或多个容器中。然后向 ResourceManager 申请要运行程序的容器,并定时向 ResourceManager 发送心跳。
  4. 申请到容器后,ApplicationMaster 会去和容器对应的 NodeManager 通信,而后将作业分发到对应的 NodeManager 中的容器去运行,这里会将拆分后的 MapReduce 进行分发,对应容器中运行的可能是 Map 任务,也可能是 Reduce 任务。
  5. 容器中运行的任务会向 ApplicationMaster 发送心跳,汇报自身情况。当程序运行完成后, ApplicationMaster 再向 ResourceManager 注销并释放容器资源。

:::

复制

复制主要指通过网络在多台机器上保存相同数据的副本

复制数据,可能出于各种各样的原因:

  • 提高可用性 - 当部分组件出现位障,系统依然可以继续工作,系统依然可以继续工作。
  • 降低访问延迟 - 使数据在地理位置上更接近用户。
  • 提高读吞吐量 - 扩展至多台机器以同时提供数据访问服务。

所有分布式系统都需要支持复制。

【中级】HDFS 的副本机制是怎样的?

:::details 要点

基于块的副本

由于 Hadoop 被设计运行在廉价的机器上,这意味着硬件是不可靠的,为了保证容错性,HDFS 提供了副本机制。HDFS 将文件分解为若干 Block,Block 是 HDFS 最小存储单元,每个 Block 有多个副本。

HDFS 的默认副本数为 3,更多的副本意味着更高的数据安全性,但同时也会带来更高的额外开销(存储成本和带宽成本)。3 个副本是在保障数据可靠性和系统成本之间的一个较好的平衡点。

副本数可以通过以下方式修改:

  • 在 HDFS 的配置文件 hdfs-site.xml 中,有一个名为 dfs.replication 的属性,可以设置全局的默认副本数。修改这个值后,需要重启 HDFS 使配置生效。
  • 针对单个文件或目录修改副本数:如果只想改变某个特定文件或目录的副本数,而不影响整个系统的默认设置,可以使用 HDFS 的命令行工具。例如,使用命令hdfs dfs -setrep -w <副本数> <文件/目录路径> 来修改特定文件或目录的副本数。

NameNode 全权管理数据块的复制,它周期性地从集群中的每个 DataNode 接收心跳信号和块状态报告 (BlockReport)。接收到心跳信号意味着该 DataNode 节点工作正常。块状态报告包含了一个该 DataNode 上所有数据块的列表。

副本分布策略

副本分布策略是 HDFS 可靠性和性能的关键。优化的副本存放策略是 HDFS 区分于其他大部分分布式文件系统的重要特性。HDFS 采用一种称为机架感知 (rack-aware) 的策略来改进数据的可靠性、可用性和网络带宽的利用率。大型 HDFS 实例一般运行在跨越多个机架的计算机组成的集群上,不同机架上的两台机器之间的通信需要经过交换机。在大多数情况下,同一个机架内的两台机器间的带宽会比不同机架的两台机器间的带宽大。

通过一个机架感知的过程,NameNode 可以确定每个 DataNode 所属的机架 id。一个简单但没有优化的策略就是将副本存放在不同的机架上。这样可以有效防止当整个机架失效时数据的丢失,并且允许读数据的时候充分利用多个机架的带宽。这种策略设置可以将副本均匀分布在集群中,有利于当组件失效情况下的负载均衡。但是,因为这种策略的一个写操作需要传输数据块到多个机架,这增加了写的代价。

HDFS 默认的副本数为 3,此时 HDFS 的副本分布策略是:

  • 副本 1 - 放在 Client 所在节点;对于远程 Client,系统会随机选择节点
  • 副本 2 - 放在不同机架的节点上
  • 副本 3 - 放在与第二个副本同一机架的不同节点上
  • 副本 N - 在满足以下条件的节点中随机选择
    • 每个节点只存储一份副本
    • 每个机架最多存储两份副本
  • 优选 - 同等条件下优先选择空闲节点。
    • 如果某个 DataNode 节点上的空闲空间低于特定的临界点,按照均衡策略系统就会自动地将数据从这个 DataNode 移动到其他空闲的 DataNode。

副本选择

为了降低整体的带宽消耗和读取延时,HDFS 会尽量让客户端程序读取离它最近的副本。如果在客户端程序的同一个机架上有一个副本,那么就读取该副本。如果一个 HDFS 集群跨越多个数据中心,那么客户端也将首先读本地数据中心的副本。

为了最大限度地减少带宽消耗和读取延迟,HDFS 在执行读取请求时,优先读取距离读取器最近的副本。如果在与读取器节点相同的机架上存在副本,则优先选择该副本。如果 HDFS 群集跨越多个数据中心,则优先选择本地数据中心上的副本。

:::

【中级】HDFS 如何保证数据一致性?

:::details 要点

HDFS 的数据一致性主要依赖以下机制来保证:

  • NameNode 的中心化管理 - NameNode 在 HDFS 中负责存储整个文件系统的元数据,包括文件和目录的结构、每个文件的数据块信息及其在 DataNode 上的位置等。这种中心化的管理,使得文件系统的组织和管理变得更加简洁高效,并且可以确保整个文件系统的一致性。
  • 数据块的复制(Replication) - HDFS 采用副本来保证数据的可靠性。一旦数据写入完成,副本就会分散存储在不同的 DataNodes 上。尽管这种方法不是强一致性模型,但通过足够数量的副本和及时的副本替换策略,HDFS 能够提供较高水平的数据一致性和可靠性。
  • 写入和复制的原子性保证 - 在 HDFS 中,文件一旦创建,其内容就不能被更新,只能被追加或重写。这种方式简化了并发控制,因为写操作在文件级别上是原子的。在复制数据块时,HDFS 保证原子性复制,即一个数据块的所有副本在任何时间点上都是相同的。如果复制过程中出现错误,那么不完整的副本会被删除,系统会重新尝试复制直到成功。
  • 客户端的一致性协议 - 客户端在与 HDFS 交互时,遵循特定的协议。例如,客户端在完成文件写入之后,需要向 NameNode 通知,以确保 NameNode 更新文件的元数据。这样可以保证 NameNode 的元数据与实际存储的数据保持一致。
  • 定期检查和错误恢复
    • 心跳和健康检查 - DataNodes 定期向 NameNode 发送心跳和 Block 健康状况报告。NameNode 利用这些信息来检查和维护系统的整体一致性。例如,如果某个 DataNode 失败,NameNode 会重新组织数据块的副本。
    • 校验 - HDFS 在存储和传输数据时,会计算数据的校验和。在读取数据时,会验证这些校验和,确保数据的完整性。

通过这些机制,HDFS 确保了系统中的数据在正常操作和故障情况下的一致性和可靠性。虽然 HDFS 不提供像传统数据库那样的强一致性保证,但它的设计和实现确保了在大规模数据处理场景中的有效性和健壮性。

:::

容错

【中级】HDFS 有哪些故障类型?如何检测故障?

:::details 要点

HDFS 常见故障及检测方法:

  • 节点故障
    • DataNode 每 3 秒向 NameNode 发送心跳
    • 超时未收到心跳,NameNode 判定 DataNode 宕机
  • 通信故障
    • 客户端请求 DataNode 会收到 ACK
  • 数据损坏
    • 磁盘介质在存储过程中受环境或者老化影响,其存储的数据可能会出现错乱。HDFS 的应对措施是,对于存储在 DataNode 上的数据块,计算并存储校验和(CheckSum)。在读取数据的时候,重新计算读取出来的数据的校验和,如果校验不正确就抛出异常,应用程序捕获异常后就到其他 DataNode 上读取备份数据。
    • 如果 DataNode 监测到本机的某块磁盘损坏,就将该块磁盘上存储的所有 BlockID 报告给 NameNode,NameNode 检查这些数据块还在哪些 DataNode 上有备份,通知相应的 DataNode 服务器将对应的数据块复制到其他服务器上,以保证数据块的备份数满足要求。

:::

【中级】HDFS 读写故障如何处理?

:::details 要点

写入故障处理

  • 写入数据通过数据包传输
  • DataNode 接收数据后,返回 ACK
  • 如果客户端没有收到 ACK,就判定 DataNode 宕机,跳过节点
  • 没有充分备份的数据块信息通知到 NameNode

读取故障处理

  • 读数据先要通过 NameNode 寻址该数据块的所有 DataNode
  • 如果某 DataNode 宕机,则读取其他节点

:::

【中级】DataNode 故障如何处理?

:::details 要点

DataNode 每 3 秒会向 NameNode 发送心跳消息,以证明自身正常工作。如果 DataNode 超时未发送心跳,NameNode 就会认为该 DataNode 已经宕机。

NameNode 会立即查找该 DataNode 上存储的数据块有哪些,以及这些数据块还存储在哪些其他 DataNode 上。

随后,NameNode 通知这些 DataNode 再复制一份数据块到其他 DataNode 上,保证 HDFS 存储的数据块副本数符合配置数。即使再出现服务器宕机,也不会丢失数据。

:::

【中级】NameNode 故障如何处理?

:::details 要点

NameNode 是整个 HDFS 的核心,记录着 HDFS 文件分配表信息,所有的文件路径和数据块存储信息都保存在 NameNode,如果 NameNode 故障,整个 HDFS 系统集群都无法使用。如果 NameNode 上记录的数据丢失,整个集群所有 DataNode 存储的数据也就没用了。

NameNode 通过主备架构实现故障转移。

  • Active NameNode - 是正在工作的 NameNode;
  • Standby NameNode - 是备份的 NameNode。

Active NameNode 宕机后,Standby NameNode 快速升级为新的 Active NameNode。Standby NameNode 周期性同步 edits 编辑日志,定期合并 FsImageedits 到本地磁盘。

注:Hadoop 3.0 允许配置多个 Standby NameNode。

元数据文件

  • edits(编辑日志文件) - 保存了自最新检查点(Checkpoint)之后的所有文件更新操作。
  • FsImage(元数据检查点镜像文件) - 保存了文件系统中所有的目录和文件信息,如:某个目录下有哪些子目录和文件,以及文件名、文件副本数、文件由哪些 Block 组成等。

Active NameNode 内存中有一份最新的元数据(= FsImage + edits)。

Standby NameNode 在检查点定期将内存中的元数据保存到 FsImage 文件中。

利用 QJM 实现元数据高可用

基于 Paxos 算法

QJM 机制(Quorum Journal Manager)

只要保证 Quorum(法定人数)数量的操作成功,就认为这是一次最终成功的操作

QJM 共享存储系统

  • 部署奇数(2N+1)个 JournalNode
  • JournalNode 负责存储 edits 编辑日志
  • 写 edits 的时候,只要超过半数(N+1)的 JournalNode 返回成功,就代表本次写入成功
  • 最多可容忍 N 个 JournalNode 宕机

利用 ZooKeeper 实现 Active 节点选举。

:::

【中级】HDFS 安全模式有什么作用?

:::details 要点

在启动过程中,NameNode 进入安全模式。在这个模式下,它会检查数据块的健康状况和副本数量。只有在足够数量的数据块可用时,NameNode 才会退出安全模式,开始正常的操作。

:::

HA

【高级】HDFS 如何实现高可用?

:::details 要点

HDFS 高可用架构如下:

HDFS 高可用架构主要由以下组件所构成:

  • Active NameNode 和 Standby NameNode:两台 NameNode 形成互备,一台处于 Active 状态,为主 NameNode,另外一台处于 Standby 状态,为备 NameNode,只有主 NameNode 才能对外提供读写服务。
  • 主备切换控制器 ZKFailoverController:ZKFailoverController 作为独立的进程运行,对 NameNode 的主备切换进行总体控制。ZKFailoverController 能及时检测到 NameNode 的健康状况,在主 NameNode 故障时借助 Zookeeper 实现自动的主备选举和切换,当然 NameNode 目前也支持不依赖于 Zookeeper 的手动主备切换。
  • Zookeeper 集群:为主备切换控制器提供主备选举支持。
  • 共享存储系统:共享存储系统是实现 NameNode 的高可用最为关键的部分,共享存储系统保存了 NameNode 在运行过程中所产生的 HDFS 的元数据。主 NameNode 和 NameNode 通过共享存储系统实现元数据同步。在进行主备切换的时候,新的主 NameNode 在确认元数据完全同步之后才能继续对外提供服务。
  • DataNode 节点:除了通过共享存储系统共享 HDFS 的元数据信息之外,主 NameNode 和备 NameNode 还需要共享 HDFS 的数据块和 DataNode 之间的映射关系。DataNode 会同时向主 NameNode 和备 NameNode 上报数据块的位置信息。

目前 Hadoop 支持使用 Quorum Journal Manager (QJM) 或 Network File System (NFS) 作为共享的存储系统,这里以 QJM 集群为例进行说明:Active NameNode 首先把 EditLog 提交到 JournalNode 集群,然后 Standby NameNode 再从 JournalNode 集群定时同步 EditLog,当 Active NameNode 宕机后, Standby NameNode 在确认元数据完全同步之后就可以对外提供服务。

需要说明的是向 JournalNode 集群写入 EditLog 是遵循 “过半写入则成功” 的策略,所以你至少要有 3 个 JournalNode 节点,当然你也可以继续增加节点数量,但是应该保证节点总数是奇数。同时如果有 2N+1 台 JournalNode,那么根据过半写的原则,最多可以容忍有 N 台 JournalNode 节点挂掉。

:::

【高级】NameNode 如何实现主备切换?

:::details 要点

NameNode 实现主备切换的流程下图所示:

工作流程说明:

  1. HealthMonitor 初始化完成之后会启动内部的线程来定时调用对应 NameNode 的 HAServiceProtocol RPC 接口的方法,对 NameNode 的健康状态进行检测。
  2. HealthMonitor 如果检测到 NameNode 的健康状态发生变化,会回调 ZKFailoverController 注册的相应方法进行处理。
  3. 如果 ZKFailoverController 判断需要进行主备切换,会首先使用 ActiveStandbyElector 来进行自动的主备选举。
  4. ActiveStandbyElector 与 Zookeeper 进行交互完成自动的主备选举。
  5. ActiveStandbyElector 在主备选举完成后,会回调 ZKFailoverController 的相应方法来通知当前的 NameNode 成为主 NameNode 或备 NameNode。
  6. ZKFailoverController 调用对应 NameNode 的 HAServiceProtocol RPC 接口的方法将 NameNode 转换为 Active 状态或 Standby 状态。

主备选举过程:

NameNode 在选举成功后,会在 zk 上创建了一个 /hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 节点,而没有选举成功的备 NameNode 会监控这个节点,通过 Watcher 来监听这个节点的状态变化事件,ZKFC 的 ActiveStandbyElector 主要关注这个节点的 NodeDeleted 事件(这部分实现跟 Kafka 中 Controller 的选举一样)。

如果 Active NameNode 对应的 HealthMonitor 检测到 NameNode 的状态异常时, ZKFailoverController 会主动删除当前在 Zookeeper 上建立的临时节点 /hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock,这样处于 Standby 状态的 NameNode 的 ActiveStandbyElector 注册的监听器就会收到这个节点的 NodeDeleted 事件。收到这个事件之后,会马上再次进入到创建 /hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 节点的流程,如果创建成功,这个本来处于 Standby 状态的 NameNode 就选举为主 NameNode 并随后开始切换为 Active 状态。

当然,如果是 Active 状态的 NameNode 所在的机器整个宕掉的话,那么根据 Zookeeper 的临时节点特性,/hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 节点会自动被删除,从而也会自动进行一次主备切换。

:::

【高级】如何应对 HDFS 脑裂问题?

:::details 要点

在实际中,NameNode 可能会出现这种情况,NameNode 在垃圾回收(GC)时,可能会在长时间内整个系统无响应,因此,也就无法向 zk 写入心跳信息,这样的话可能会导致临时节点掉线,备 NameNode 会切换到 Active 状态,这种情况,可能会导致整个集群会有同时有两个 NameNode,这就是脑裂问题。

脑裂问题的解决方案是隔离(Fencing),主要是在以下三处采用隔离措施:

  • 第三方共享存储:任一时刻,只有一个 NN 可以写入;
  • DataNode:需要保证只有一个 NN 发出与管理数据副本有关的删除命令;
  • Client:需要保证同一时刻只有一个 NN 能够对 Client 的请求发出正确的响应。

关于这个问题目前解决方案的实现如下:

  • ActiveStandbyElector 为了实现隔离,会在成功创建 Zookeeper 节点 hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 从而成为 Active NameNode 之后,创建另外一个路径为 /hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb 的持久节点,这个节点里面保存了这个 Active NameNode 的地址信息;
  • Active NameNode 的 ActiveStandbyElector 在正常的状态下关闭 Zookeeper Session 的时候,会一起删除这个持久节点;
  • 但如果 ActiveStandbyElector 在异常的状态下 Zookeeper Session 关闭 (比如前述的 Zookeeper 假死),那么由于 /hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb 是持久节点,会一直保留下来,后面当另一个 NameNode 选主成功之后,会注意到上一个 Active NameNode 遗留下来的这个节点,从而会回调 ZKFailoverController 的方法对旧的 Active NameNode 进行 fencing。

在进行隔离的时候,会执行以下的操作:

首先尝试调用这个旧 Active NameNode 的 HAServiceProtocol RPC 接口的 transitionToStandby 方法,看能不能把它转换为 Standby 状态; 如果 transitionToStandby 方法调用失败,那么就执行 Hadoop 配置文件之中预定义的隔离措施。

Hadoop 目前主要提供两种隔离措施,通常会选择第一种:sshfence:通过 SSH 登录到目标机器上,执行命令 fuser 将对应的进程杀死; shellfence:执行一个用户自定义的 shell 脚本来将对应的进程隔离。 只有在成功地执行完成 fencing 之后,选主成功的 ActiveStandbyElector 才会回调 ZKFailoverController 的 becomeActive 方法将对应的 NameNode 转换为 Active 状态,开始对外提供服务。

NameNode 选举的实现机制与 Kafka 的 Controller 类似,那么 Kafka 是如何避免脑裂问题的呢?

Controller 给 Broker 发送的请求中,都会携带 controller epoch 信息,如果 broker 发现当前请求的 epoch 小于缓存中的值,那么就证明这是来自旧 Controller 的请求,就会决绝这个请求,正常情况下是没什么问题的; 但是异常情况下呢?如果 Broker 先收到异常 Controller 的请求进行处理呢?现在看 Kafka 在这一部分并没有适合的方案; 正常情况下,Kafka 新的 Controller 选举出来之后,Controller 会向全局所有 broker 发送一个 metadata 请求,这样全局所有 Broker 都可以知道当前最新的 controller epoch,但是并不能保证可以完全避免上面这个问题,还是有出现这个问题的几率的,只不过非常小,而且即使出现了由于 Kafka 的高可靠架构,影响也非常有限,至少从目前看,这个问题并不是严重的问题。

通过标识每次选举的版本号,并以最新版本选举结果为准,是分布式选举避免脑裂的常见做法。在其他分布式系统中,epoch 可能会被称为 term、version 等。

:::

【高级】YARN 如何实现高可用?

:::details 要点

YARN ResourceManager 的高可用与 HDFS NameNode 的高可用类似,但是 ResourceManager 不像 NameNode ,没有那么多的元数据信息需要维护,所以它的状态信息可以直接写到 Zookeeper 上,并依赖 Zookeeper 来进行主备选举。

:::

参考资料

Hive 常用 DDL 操作

Database

查看数据列表

1
show databases;

使用数据库

1
USE database_name;

新建数据库

语法:

1
2
3
4
CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name   --DATABASE|SCHEMA 是等价的
[COMMENT database_comment] --数据库注释
[LOCATION hdfs_path] --存储在 HDFS 上的位置
[WITH DBPROPERTIES (property_name=property_value, ...)]; --指定额外属性

示例:

1
2
3
CREATE DATABASE IF NOT EXISTS hive_test
COMMENT 'hive database for test'
WITH DBPROPERTIES ('create'='heibaiying');

查看数据库信息

语法:

1
DESC DATABASE [EXTENDED] db_name; --EXTENDED 表示是否显示额外属性

示例:

1
DESC DATABASE  EXTENDED hive_test;

删除数据库

语法:

1
DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT|CASCADE];
  • 默认行为是 RESTRICT,如果数据库中存在表则删除失败。
  • 要想删除库及其中的表,可以使用 CASCADE 级联删除。

示例:

1
DROP DATABASE IF EXISTS hive_test CASCADE;

创建表

建表语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name     --表名
[(col_name data_type [COMMENT col_comment],
... [constraint_specification])] --列名 列数据类型
[COMMENT table_comment] --表描述
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] --分区表分区规则
[
CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS
] --分桶表分桶规则
[SKEWED BY (col_name, col_name, ...) ON ((col_value, col_value, ...), (col_value, col_value, ...), ...)
[STORED AS DIRECTORIES]
] --指定倾斜列和值
[
[ROW FORMAT row_format]
[STORED AS file_format]
| STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)]
] -- 指定行分隔符、存储文件格式或采用自定义存储格式
[LOCATION hdfs_path] -- 指定表的存储位置
[TBLPROPERTIES (property_name=property_value, ...)] --指定表的属性
[AS select_statement]; --从查询结果创建表

内部表

1
2
3
4
5
6
7
8
9
10
CREATE TABLE emp(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

外部表

1
2
3
4
5
6
7
8
9
10
11
CREATE EXTERNAL TABLE emp_external(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_external';

使用 desc format emp_external 命令可以查看表的详细信息如下:

分区表

1
2
3
4
5
6
7
8
9
10
11
12
CREATE EXTERNAL TABLE emp_partition(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
PARTITIONED BY (deptno INT) -- 按照部门编号进行分区
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_partition';

分桶表

1
2
3
4
5
6
7
8
9
10
11
12
CREATE EXTERNAL TABLE emp_bucket(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
CLUSTERED BY(empno) SORTED BY(empno ASC) INTO 4 BUCKETS --按照员工编号散列到四个 bucket 中
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_bucket';

倾斜表

通过指定一个或者多个列经常出现的值(严重偏斜),Hive 会自动将涉及到这些值的数据拆分为单独的文件。在查询时,如果涉及到倾斜值,它就直接从独立文件中获取数据,而不是扫描所有文件,这使得性能得到提升。

1
2
3
4
5
6
7
8
9
10
11
12
CREATE EXTERNAL TABLE emp_skewed(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
SKEWED BY (empno) ON (66,88,100) --指定 empno 的倾斜值 66,88,100
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_skewed';

临时表

临时表仅对当前 session 可见,临时表的数据将存储在用户的暂存目录中,并在会话结束后删除。如果临时表与永久表表名相同,则对该表名的任何引用都将解析为临时表,而不是永久表。临时表还具有以下两个限制:

  • 不支持分区列;
  • 不支持创建索引。
1
2
3
4
5
6
7
8
9
10
CREATE TEMPORARY TABLE emp_temp(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

CTAS 创建表

支持从查询语句的结果创建表:

1
CREATE TABLE emp_copy AS SELECT * FROM emp WHERE deptno='20';

复制表结构

语法:

1
2
3
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name  --创建表表名
LIKE existing_table_or_view_name --被复制表的表名
[LOCATION hdfs_path]; --存储位置

示例:

1
CREATE TEMPORARY EXTERNAL TABLE  IF NOT EXISTS  emp_co  LIKE emp

加载数据到表

加载数据到表中属于 DML 操作,这里为了方便大家测试,先简单介绍一下加载本地数据到表中:

1
2
-- 加载数据到 emp 表中
load data local inpath "/usr/file/emp.txt" into table emp;

其中 emp.txt 的内容如下,你可以直接复制使用,也可以到本仓库的resources 目录下载:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
7369	SMITH	CLERK	7902	1980-12-17 00:00:00	800.00		20
7499 ALLEN SALESMAN 7698 1981-02-20 00:00:00 1600.00 300.00 30
7521 WARD SALESMAN 7698 1981-02-22 00:00:00 1250.00 500.00 30
7566 JONES MANAGER 7839 1981-04-02 00:00:00 2975.00 20
7654 MARTIN SALESMAN 7698 1981-09-28 00:00:00 1250.00 1400.00 30
7698 BLAKE MANAGER 7839 1981-05-01 00:00:00 2850.00 30
7782 CLARK MANAGER 7839 1981-06-09 00:00:00 2450.00 10
7788 SCOTT ANALYST 7566 1987-04-19 00:00:00 1500.00 20
7839 KING PRESIDENT 1981-11-17 00:00:00 5000.00 10
7844 TURNER SALESMAN 7698 1981-09-08 00:00:00 1500.00 0.00 30
7876 ADAMS CLERK 7788 1987-05-23 00:00:00 1100.00 20
7900 JAMES CLERK 7698 1981-12-03 00:00:00 950.00 30
7902 FORD ANALYST 7566 1981-12-03 00:00:00 3000.00 20
7934 MILLER CLERK 7782 1982-01-23 00:00:00 1300.00 10

加载后可查询表中数据

修改表

重命名表

语法:

1
ALTER TABLE table_name RENAME TO new_table_name;

示例:

1
ALTER TABLE emp_temp RENAME TO new_emp; --把 emp_temp 表重命名为 new_emp

修改列

语法:

1
2
ALTER TABLE table_name [PARTITION partition_spec] CHANGE [COLUMN] col_old_name col_new_name column_type
[COMMENT col_comment] [FIRST|AFTER column_name] [CASCADE|RESTRICT];

示例:

1
2
3
4
5
6
7
8
-- 修改字段名和类型
ALTER TABLE emp_temp CHANGE empno empno_new INT;

-- 修改字段 sal 的名称 并将其放置到 empno 字段后
ALTER TABLE emp_temp CHANGE sal sal_new decimal(7,2) AFTER ename;

-- 为字段增加注释
ALTER TABLE emp_temp CHANGE mgr mgr_new INT COMMENT 'this is column mgr';

新增列

示例:

1
ALTER TABLE emp_temp ADD COLUMNS (address STRING COMMENT 'home address');

清空表/删除表

清空表 hive-ddl.md

语法:

1
2
-- 清空整个表或表指定分区中的数据
TRUNCATE TABLE table_name [PARTITION (partition_column = partition_col_value, ...)];
  • 目前只有内部表才能执行 TRUNCATE 操作,外部表执行时会抛出异常 Cannot truncate non-managed table XXXX

示例:

1
TRUNCATE TABLE emp_mgt_ptn PARTITION (deptno=20);

删除表

语法:

1
DROP TABLE [IF EXISTS] table_name [PURGE];
  • 内部表:不仅会删除表的元数据,同时会删除 HDFS 上的数据;
  • 外部表:只会删除表的元数据,不会删除 HDFS 上的数据;
  • 删除视图引用的表时,不会给出警告(但视图已经无效了,必须由用户删除或重新创建)。

其他命令

Describe

查看数据库:

1
DESCRIBE|Desc DATABASE [EXTENDED] db_name;  --EXTENDED 是否显示额外属性

查看表:

1
DESCRIBE|Desc [EXTENDED|FORMATTED] table_name --FORMATTED 以友好的展现方式查看表详情

Show

1. 查看数据库列表

1
2
3
4
5
-- 语法
SHOW (DATABASES|SCHEMAS) [LIKE 'identifier_with_wildcards'];

-- 示例:
SHOW DATABASES like 'hive*';

LIKE 子句允许使用正则表达式进行过滤,但是 SHOW 语句当中的 LIKE 子句只支持 *(通配符)和 |(条件或)两个符号。例如 employeesemp *emp * | * ees,所有这些都将匹配名为 employees 的数据库。

2. 查看表的列表

1
2
3
4
5
-- 语法
SHOW TABLES [IN database_name] ['identifier_with_wildcards'];

-- 示例
SHOW TABLES IN default;

3. 查看视图列表

1
SHOW VIEWS [IN/FROM database_name] [LIKE 'pattern_with_wildcards'];   --仅支持 Hive 2.2.0

4. 查看表的分区列表

1
SHOW PARTITIONS table_name;

5. 查看表/视图的创建语句

1
SHOW CREATE TABLE ([db_name.]table_name|view_name);

参考资料

Hive 常用 DML 操作

加载文件数据到表

语法

1
2
LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE]
INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
  • LOCAL 关键字代表从本地文件系统加载文件,省略则代表从 HDFS 上加载文件:
  • 从本地文件系统加载文件时, filepath 可以是绝对路径也可以是相对路径 (建议使用绝对路径);
  • 从 HDFS 加载文件时候,filepath 为文件完整的 URL 地址:如 hdfs://namenode:port/user/hive/project/ data1
  • filepath 可以是文件路径 (在这种情况下 Hive 会将文件移动到表中),也可以目录路径 (在这种情况下,Hive 会将该目录中的所有文件移动到表中);
  • 如果使用 OVERWRITE 关键字,则将删除目标表(或分区)的内容,使用新的数据填充;不使用此关键字,则数据以追加的方式加入;
  • 加载的目标可以是表或分区。如果是分区表,则必须指定加载数据的分区;
  • 加载文件的格式必须与建表时使用 STORED AS 指定的存储格式相同。

使用建议:

不论是本地路径还是 URL 都建议使用完整的。虽然可以使用不完整的 URL 地址,此时 Hive 将使用 hadoop 中的 fs.default.name 配置来推断地址,但是为避免不必要的错误,建议使用完整的本地路径或 URL 地址;

加载对象是分区表时建议显示指定分区。在 Hive 3.0 之后,内部将加载 (LOAD) 重写为 INSERT AS SELECT,此时如果不指定分区,INSERT AS SELECT 将假设最后一组列是分区列,如果该列不是表定义的分区,它将抛出错误。为避免错误,还是建议显示指定分区。

示例

新建分区表:

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE emp_ptn(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
PARTITIONED BY (deptno INT) -- 按照部门编号进行分区
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

从 HDFS 上加载数据到分区表:

1
LOAD DATA  INPATH "hdfs://hadoop001:8020/mydir/emp.txt" OVERWRITE INTO TABLE emp_ptn PARTITION (deptno=20);

emp.txt 文件可在本仓库的 resources 目录中下载

加载后表中数据如下,分区列 deptno 全部赋值成 20:

查询结果插入到表

语法

1
2
3
4
5
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]]
select_statement1 FROM from_statement;

INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)]
select_statement1 FROM from_statement;
  • Hive 0.13.0 开始,建表时可以通过使用 TBLPROPERTIES(“immutable”=“true”)来创建不可变表 (immutable table) ,如果不可以变表中存在数据,则 INSERT INTO 失败。(注:INSERT OVERWRITE 的语句不受 immutable 属性的影响);

  • 可以对表或分区执行插入操作。如果表已分区,则必须通过指定所有分区列的值来指定表的特定分区;

  • 从 Hive 1.1.0 开始,TABLE 关键字是可选的;

  • 从 Hive 1.2.0 开始 ,可以采用 INSERT INTO tablename(z,x,c1) 指明插入列;

  • 可以将 SELECT 语句的查询结果插入多个表(或分区),称为多表插入。语法如下:

    1
    2
    3
    4
    5
    FROM from_statement
    INSERT OVERWRITE TABLE tablename1
    [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1
    [INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2]
    [INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...;

动态插入分区

1
2
3
4
5
INSERT OVERWRITE TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...)
select_statement FROM from_statement;

INSERT INTO TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...)
select_statement FROM from_statement;

在向分区表插入数据时候,分区列名是必须的,但是列值是可选的。如果给出了分区列值,我们将其称为静态分区,否则它是动态分区。动态分区列必须在 SELECT 语句的列中最后指定,并且与它们在 PARTITION() 子句中出现的顺序相同。

注意:Hive 0.9.0 之前的版本动态分区插入是默认禁用的,而 0.9.0 之后的版本则默认启用。以下是动态分区的相关配置:

配置 默认值 说明
hive.exec.dynamic.partition true 需要设置为 true 才能启用动态分区插入
hive.exec.dynamic.partition.mode strict 在严格模式 (strict) 下,用户必须至少指定一个静态分区,以防用户意外覆盖所有分区,在非严格模式下,允许所有分区都是动态的
hive.exec.max.dynamic.partitions.pernode 100 允许在每个 mapper/reducer 节点中创建的最大动态分区数
hive.exec.max.dynamic.partitions 1000 允许总共创建的最大动态分区数
hive.exec.max.created.files 100000 作业中所有 mapper/reducer 创建的 HDFS 文件的最大数量
hive.error.on.empty.partition false 如果动态分区插入生成空结果,是否抛出异常

示例

(1)新建 emp 表,作为查询对象表

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE emp(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

-- 加载数据到 emp 表中 这里直接从本地加载
load data local inpath "/usr/file/emp.txt" into table emp;

完成后 emp 表中数据如下:

(2)为清晰演示,先清空 emp_ptn 表中加载的数据:

1
TRUNCATE TABLE emp_ptn;

(3)静态分区演示:从 emp 表中查询部门编号为 20 的员工数据,并插入 emp_ptn 表中,语句如下:

1
2
INSERT OVERWRITE TABLE emp_ptn PARTITION (deptno=20)
SELECT empno,ename,job,mgr,hiredate,sal,comm FROM emp WHERE deptno=20;

完成后 emp_ptn 表中数据如下:

(4)接着演示动态分区:

1
2
3
4
5
6
-- 由于我们只有一个分区,且还是动态分区,所以需要关闭严格默认。因为在严格模式下,用户必须至少指定一个静态分区
set hive.exec.dynamic.partition.mode=nonstrict;

-- 动态分区 此时查询语句的最后一列为动态分区列,即 deptno
INSERT OVERWRITE TABLE emp_ptn PARTITION (deptno)
SELECT empno,ename,job,mgr,hiredate,sal,comm,deptno FROM emp WHERE deptno=30;

完成后 emp_ptn 表中数据如下:

使用 SQL 语句插入值

1
2
INSERT INTO TABLE tablename [PARTITION (partcol1[=val1], partcol2[=val2] ...)]
VALUES ( value [, value ...] )
  • 使用时必须为表中的每个列都提供值。不支持只向部分列插入值(可以为缺省值的列提供空值来消除这个弊端);
  • 如果目标表表支持 ACID 及其事务管理器,则插入后自动提交;
  • 不支持支持复杂类型 (array, map, struct, union) 的插入。

更新和删除数据

语法

更新和删除的语法比较简单,和关系型数据库一致。需要注意的是这两个操作都只能在支持 ACID 的表,也就是事务表上才能执行。

1
2
3
4
5
-- 更新
UPDATE tablename SET column = value [, column = value ...] [WHERE expression]

--删除
DELETE FROM tablename [WHERE expression]

示例

1. 修改配置

首先需要更改 hive-site.xml,添加如下配置,开启事务支持,配置完成后需要重启 Hive 服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.enforce.bucketing</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.in.test</name>
<value>true</value>
</property>

2. 创建测试表

创建用于测试的事务表,建表时候指定属性 transactional = true 则代表该表是事务表。需要注意的是,按照官方文档 的说明,目前 Hive 中的事务表有以下限制:

  • 必须是 buckets Table;
  • 仅支持 ORC 文件格式;
  • 不支持 LOAD DATA …语句。
1
2
3
4
5
6
CREATE TABLE emp_ts(
empno int,
ename String
)
CLUSTERED BY (empno) INTO 2 BUCKETS STORED AS ORC
TBLPROPERTIES ("transactional"="true");

3. 插入测试数据

1
INSERT INTO TABLE emp_ts  VALUES (1,"ming"),(2,"hong");

插入数据依靠的是 MapReduce 作业,执行成功后数据如下:

4. 测试更新和删除

1
2
3
4
5
--更新数据
UPDATE emp_ts SET ename = "lan" WHERE empno=1;

--删除数据
DELETE FROM emp_ts WHERE empno=2;

更新和删除数据依靠的也是 MapReduce 作业,执行成功后数据如下:

查询结果写出到文件系统

语法

1
2
3
INSERT OVERWRITE [LOCAL] DIRECTORY directory1
[ROW FORMAT row_format] [STORED AS file_format]
SELECT ... FROM ...
  • OVERWRITE 关键字表示输出文件存在时,先删除后再重新写入;

  • 和 Load 语句一样,建议无论是本地路径还是 URL 地址都使用完整的;

  • 写入文件系统的数据被序列化为文本,其中列默认由^A 分隔,行由换行符分隔。如果列不是基本类型,则将其序列化为 JSON 格式。其中行分隔符不允许自定义,但列分隔符可以自定义,如下:

    1
    2
    3
    4
    5
    6
    7
    -- 定义列分隔符为'\t'
    insert overwrite local directory './test-04'
    row format delimited
    FIELDS TERMINATED BY '\t'
    COLLECTION ITEMS TERMINATED BY ','
    MAP KEYS TERMINATED BY ':'
    select * from src;

示例

这里我们将上面创建的 emp_ptn 表导出到本地文件系统,语句如下:

1
2
3
4
INSERT OVERWRITE LOCAL DIRECTORY '/usr/file/ouput'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
SELECT * FROM emp_ptn;

导出结果如下:

参考资料