Dunwu Blog

大道至简,知易行难

Kafka 运维

环境要求:

  • JDK8
  • ZooKeeper

Kafka 单点部署

下载解压

进入官方下载地址:http://kafka.apache.org/downloads,选择合适版本。

解压到本地:

1
2
tar -xzf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0

现在您已经在您的机器上下载了最新版本的 Kafka。

启动服务器

由于 Kafka 依赖于 ZooKeeper,所以运行前需要先启动 ZooKeeper

1
2
3
$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

然后,启动 Kafka

1
2
3
4
$ bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

停止服务器

执行所有操作后,可以使用以下命令停止服务器

1
bin/kafka-server-stop.sh config/server.properties

Kafka 集群部署

修改配置

复制配置为多份(Windows 使用 copy 命令代理):

1
2
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

修改配置:

1
2
3
4
5
6
7
8
9
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2

其中,broker.id 这个参数必须是唯一的。

端口故意配置的不一致,是为了可以在一台机器启动多个应用节点。

启动

根据这两份配置启动三个服务器节点:

1
2
3
4
5
6
$ bin/kafka-server-start.sh config/server.properties &
...
$ bin/kafka-server-start.sh config/server-1.properties &
...
$ bin/kafka-server-start.sh config/server-2.properties &
...

创建一个新的 Topic 使用 三个备份:

1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

查看主题:

1
2
3
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
  • leader - 负责指定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。
  • replicas - 是复制此分区日志的节点列表,无论它们是否为领导者,或者即使它们当前处于活动状态。
  • isr - 是“同步”复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领导者捕获。

Kafka 命令

主题(Topic)

创建 Topic

1
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic

查看 Topic 列表

1
kafka-topics --list --zookeeper localhost:2181

添加 Partition

1
kafka-topics --zookeeper localhost:2181 --alter --topic my-topic --partitions 16

删除 Topic

1
kafka-topics --zookeeper localhost:2181 --delete --topic my-topic

查看 Topic 详细信息

1
kafka-topics --zookeeper localhost:2181/kafka-cluster --describe

查看备份分区

1
kafka-topics --zookeeper localhost:2181/kafka-cluster --describe --under-replicated-partitions

生产者(Producers)

通过控制台输入生产消息

1
kafka-console-producer --broker-list localhost:9092 --topic my-topic

通过文件输入生产消息

1
kafka-console-producer --broker-list localhost:9092 --topic test < messages.txt

通过控制台输入 Avro 生产消息

1
kafka-avro-console-producer --broker-list localhost:9092 --topic my.Topic --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' --property schema.registry.url=http://localhost:8081

然后,可以选择输入部分 json key:

1
{ "f1": "value1" }

生成消息性能测试

1
kafka-producer-perf-test --topic position-reports --throughput 10000 --record-size 300 --num-records 20000 --producer-props bootstrap.servers="localhost:9092"

消费者(Consumers)

消费所有未消费的消息

1
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic --from-beginning

消费一条消息

1
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic  --max-messages 1

从指定的 offset 消费一条消息

从指定的 offset __consumer_offsets 消费一条消息:

1
kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter' --max-messages 1

从指定 Group 消费消息

1
kafka-console-consumer --topic my-topic --new-consumer --bootstrap-server localhost:9092 --consumer-property group.id=my-group

消费 avro 消息

1
kafka-avro-console-consumer --topic position-reports --new-consumer --bootstrap-server localhost:9092 --from-beginning --property schema.registry.url=localhost:8081 --max-messages 10
1
kafka-avro-console-consumer --topic position-reports --new-consumer --bootstrap-server localhost:9092 --from-beginning --property schema.registry.url=localhost:8081

查看消费者 Group 列表

1
kafka-consumer-groups --new-consumer --list --bootstrap-server localhost:9092

查看消费者 Group 详细信息

1
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testgroup

配置(Config)

设置 Topic 的保留时间

1
kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=3600000

查看 Topic 的所有配置

1
kafka-configs --zookeeper localhost:2181 --describe --entity-type topics --entity-name my-topic

修改 Topic 的配置

1
kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --delete-config retention.ms

ACL

查看指定 Topic 的 ACL

1
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --list --topic topicA

添加 ACL

1
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic topicA --group groupA
1
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic topicA

ZooKeeper

1
zookeeper-shell localhost:2182 ls /

Kafka 工具

Kafka 核心配置

Broker 级别配置

存储配置

首先 Broker 是需要配置存储信息的,即 Broker 使用哪些磁盘。那么针对存储信息的重要参数有以下这么几个:

  • log.dirs:指定了 Broker 需要使用的若干个文件目录路径。这个参数是没有默认值的,必须由使用者亲自指定。
  • log.dir:注意这是 dir,结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的。

log.dirs 具体格式是一个 CSV 格式,也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3这样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。这样做有两个好处:

  • 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
  • 能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新引入的强大功能。要知道在以前,只要 Kafka Broker 使用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。但是自 1.1 开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。

zookeeper 配置

Kafka 与 ZooKeeper 相关的最重要的参数当属 zookeeper.connect。这也是一个 CSV 格式的参数,比如我可以指定它的值为zk1:2181,zk2:2181,zk3:2181。2181 是 ZooKeeper 的默认端口。

现在问题来了,如果我让多个 Kafka 集群使用同一套 ZooKeeper 集群,那么这个参数应该怎么设置呢?这时候 chroot 就派上用场了。这个 chroot 是 ZooKeeper 的概念,类似于别名。

如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2,那么两套集群的zookeeper.connect参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1zk1:2181,zk2:2181,zk3:2181/kafka2。切记 chroot 只需要写一次,而且是加到最后的。我经常碰到有人这样指定:zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3,这样的格式是不对的。

Broker 连接配置

  • listeners:告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。
  • advertised.listeners:和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。
  • host.name/port:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。

我们具体说说监听器的概念,从构成上来说,它是若干个逗号分隔的三元组,每个三元组的格式为<协议名称,主机名,端口号>。这里的协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输、SSL 表示使用 SSL 或 TLS 加密传输等;也可能是你自己定义的协议名字,比如CONTROLLER: //localhost:9092

最好全部使用主机名,即 Broker 端和 Client 端应用配置中全部填写主机名。

Topic 管理

  • auto.create.topics.enable:是否允许自动创建 Topic。一般设为 false,由运维把控创建 Topic。
  • unclean.leader.election.enable:是否允许 Unclean Leader 选举。
  • auto.leader.rebalance.enable:是否允许定期进行 Leader 选举。

第二个参数unclean.leader.election.enable是关闭 Unclean Leader 选举的。何谓 Unclean?还记得 Kafka 有多个副本这件事吗?每个分区都有多个副本来提供高可用。在这些副本中只能有一个副本对外提供服务,即所谓的 Leader 副本。

那么问题来了,这些副本都有资格竞争 Leader 吗?显然不是,只有保存数据比较多的那些副本才有资格竞选,那些落后进度太多的副本没资格做这件事。

好了,现在出现这种情况了:假设那些保存数据比较多的副本都挂了怎么办?我们还要不要进行 Leader 选举了?此时这个参数就派上用场了。

如果设置成 false,那么就坚持之前的原则,坚决不能让那些落后太多的副本竞选 Leader。这样做的后果是这个分区就不可用了,因为没有 Leader 了。反之如果是 true,那么 Kafka 允许你从那些“跑得慢”的副本中选一个出来当 Leader。这样做的后果是数据有可能就丢失了,因为这些副本保存的数据本来就不全,当了 Leader 之后它本人就变得膨胀了,认为自己的数据才是权威的。

这个参数在最新版的 Kafka 中默认就是 false,本来不需要我特意提的,但是比较搞笑的是社区对这个参数的默认值来来回回改了好几版了,鉴于我不知道你用的是哪个版本的 Kafka,所以建议你还是显式地把它设置成 false 吧。

第三个参数auto.leader.rebalance.enable的影响貌似没什么人提,但其实对生产环境影响非常大。设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。严格来说它与上一个参数中 Leader 选举的最大不同在于,它不是选 Leader,而是换 Leader!比如 Leader A 一直表现得很好,但若auto.leader.rebalance.enable=true,那么有可能一段时间后 Leader A 就要被强行卸任换成 Leader B。

你要知道换一次 Leader 代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益,因此我建议你在生产环境中把这个参数设置成 false。

数据留存

  • log.retention.{hour|minutes|ms}:都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hour 最低。通常情况下我们还是设置 hour 级别的多一些,比如log.retention.hour=168表示默认保存 7 天的数据,自动删除 7 天前的数据。很多公司把 Kafka 当做存储来使用,那么这个值就要相应地调大。
  • log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。这个值默认是 -1,表明你想在这台 Broker 上保存多少数据都可以,至少在容量方面 Broker 绝对为你开绿灯,不会做任何阻拦。这个参数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集群:设想你要做一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间,为了避免有个“恶意”租户使用过多的磁盘空间,设置这个参数就显得至关重要了。
  • message.max.bytes:控制 Broker 能够接收的最大消息大小。默认的 1000012 太少了,还不到 1MB。实际场景中突破 1MB 的消息都是屡见不鲜的,因此在线上环境中设置一个比较大的值还是比较保险的做法。毕竟它只是一个标尺而已,仅仅衡量 Broker 能够处理的最大消息大小,即使设置大一点也不会耗费什么磁盘空间的。

Topic 级别配置

  • retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。
  • retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。

操作系统参数

  • 文件描述符限制
  • 文件系统类型
  • Swappiness
  • 提交时间

文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响。通常情况下将它设置成一个超大的值是合理的做法,比如ulimit -n 1000000。其实设置这个参数一点都不重要,但不设置的话后果很严重,比如你会经常看到“Too many open files”的错误。

其次是文件系统类型的选择。这里所说的文件系统指的是如 ext3、ext4 或 XFS 这样的日志型文件系统。根据官网的测试报告,XFS 的性能要强于 ext4,所以生产环境最好还是使用 XFS。对了,最近有个 Kafka 使用 ZFS 的数据报告,貌似性能更加强劲,有条件的话不妨一试。

第三是 swap 的调优。网上很多文章都提到设置其为 0,将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间。我个人反倒觉得还是不要设置成 0 比较好,我们可以设置成一个较小的值。为什么呢?因为一旦设置成 0,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件,它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑,我个人建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1。

最后是提交时间或者说是 Flush 落盘时间。向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于 Kafka 在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。

Kafka 集群规划

操作系统

部署生产环境的 Kafka,强烈建议操作系统选用 Linux。

在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的快速数据传输特性。

Windows 平台上部署 Kafka 只适合于个人测试或用于功能验证,千万不要应用于生产环境。

磁盘

Kafka 集群部署选择普通的机械磁盘还是固态硬盘?前者成本低且容量大,但易损坏;后者性能优势大,不过单价高。

结论是:使用普通机械硬盘即可

Kafka 采用顺序读写操作,一定程度上规避了机械磁盘最大的劣势,即随机读写操作慢。从这一点上来说,使用 SSD 似乎并没有太大的性能优势,毕竟从性价比上来说,机械磁盘物美价廉,而它因易损坏而造成的可靠性差等缺陷,又由 Kafka 在软件层面提供机制来保证,故使用普通机械磁盘是很划算的。

带宽

大部分公司使用普通的以太网络,千兆网络(1Gbps)应该是网络的标准配置。

通常情况下你只能假设 Kafka 会用到 70% 的带宽资源,因为总要为其他应用或进程留一些资源。此外,通常要再额外预留出 2/3 的资源,因为不能让带宽资源总是保持在峰值。

基于以上原因,一个 Kafka 集群数量的大致推算公式如下:

1
Kafka 机器数 = 单位时间需要处理的总数据量 / 单机所占用带宽

参考资料

ZooKeeper 原理

ZooKeeper 是 Apache 的顶级项目。ZooKeeper 为分布式应用提供了高效且可靠的分布式协调服务,提供了诸如统一命名服务、配置管理和分布式锁等分布式的基础服务。在解决分布式数据一致性方面,ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议

ZooKeeper 主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储。但是 ZooKeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控存储数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理

很多大名鼎鼎的框架都基于 ZooKeeper 来实现分布式高可用,如:Dubbo、Kafka 等。

ZooKeeper 官方支持 Java 和 C 的 Client API。ZooKeeper 社区为大多数语言(.NET,python 等)提供非官方 API。

ZooKeeper 简介

ZooKeeper 是什么

ZooKeeper 是 Apache 的顶级项目。ZooKeeper 为分布式应用提供了高效且可靠的分布式协调服务,提供了诸如统一命名服务、配置管理和分布式锁等分布式的基础服务。在解决分布式数据一致性方面,ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议

ZooKeeper 主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储。但是 ZooKeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控存储数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理

很多大名鼎鼎的框架都基于 ZooKeeper 来实现分布式高可用,如:Dubbo、Kafka 等。

ZooKeeper 的应用场景

  • 配置管理
    • 集群节点可以通过中心源获取启动配置
    • 更简单的部署
  • 分布式集群管理
    • 节点加入/离开
    • 节点的实时状态
  • 命名服务,如:DNS
  • 分布式同步:如锁、栅栏、队列
  • 分布式系统的选主
  • 中心化和高可靠的数据注册

ZooKeeper 的特性

ZooKeeper 具有以下特性:

  • 顺序一致性:所有客户端看到的服务端数据模型都是一致的;从一个客户端发起的事务请求,最终都会严格按照其发起顺序被应用到 ZooKeeper 中。具体的实现可见:原子广播
  • 原子性 - 所有事务请求的处理结果在整个集群中所有机器上的应用情况是一致的,即整个集群要么都成功应用了某个事务,要么都没有应用。 实现方式可见:事务
  • 单一视图 - 无论客户端连接的是哪个 Zookeeper 服务器,其看到的服务端数据模型都是一致的。
  • 高性能 - ZooKeeper 将数据全量存储在内存中,所以其性能很高。需要注意的是:由于 ZooKeeper 的所有更新和删除都是基于事务的,因此 ZooKeeper 在读多写少的应用场景中有性能表现较好,如果写操作频繁,性能会大大下滑
  • 高可用 - ZooKeeper 的高可用是基于副本机制实现的,此外 ZooKeeper 支持故障恢复,可见:选举 Leader

ZooKeeper 的设计目标

  • 简单的数据模型:ZooKeeper 的数据模型是一个树形结构的文件系统,树中的节点被称为 **znode**。
  • 可以构建集群:ZooKeeper 支持集群模式,可以通过伸缩性,来控制集群的吞吐量。需要注意的是:由于 ZooKeeper 采用一主多从架构,所以其写性能是有上限的,比较适合于读多写少的场景。
  • 顺序访问:对于来自客户端的每个更新请求,Zookeeper 都会分配一个全局唯一的递增 ID,这个 ID 反映了所有事务请求的先后顺序。
  • 高性能、高可用:ZooKeeper 将数据存全量储在内存中以保持高性能,并通过服务集群来实现高可用,由于 Zookeeper 的所有更新和删除都是基于事务的,所以其在读多写少的应用场景中有着很高的性能表现。

ZooKeeper 核心概念

服务

Zookeeper 服务是一个基于主从复制的高可用集群,集群中每个节点都存储了一份数据副本(内存中)。

客户端只会连接一个 ZooKeeper 服务器节点,并维持 TCP 连接。

数据模型

ZooKeeper 的数据模型是一个树形结构的文件系统

树中的节点被称为 znode,其中根节点为 /,每个节点上都会保存自己的数据和节点信息。znode 可以用于存储数据,并且有一个与之相关联的 ACL(详情可见 ACL)。ZooKeeper 的设计目标是实现协调服务,而不是真的作为一个文件存储,因此 znode 存储数据的大小被限制在 1MB 以内

img

ZooKeeper 的数据访问具有原子性。其读写操作都是要么全部成功,要么全部失败。

znode 通过路径被引用。znode 节点路径必须是绝对路径

znode 有两种类型:

  • 临时的( EPHEMERAL - 户端会话结束时,ZooKeeper 就会删除临时的 znode。不允许有子节点。
  • 持久的(PERSISTENT - 除非客户端主动执行删除操作,否则 ZooKeeper 不会删除持久的 znode。

节点信息

znode 上有一个顺序标志( SEQUENTIAL。如果在创建 znode 时,设置了顺序标志( SEQUENTIAL,那么 ZooKeeper 会使用计数器为 znode 添加一个单调递增的数值,即 zxid。ZooKeeper 正是利用 zxid 实现了严格的顺序访问控制能力。

每个 znode 节点在存储数据的同时,都会维护一个叫做 Stat 的数据结构,里面存储了关于该节点的全部状态信息。如下:

状态属性 说明
czxid 数据节点创建时的事务 ID
ctime 数据节点创建时的时间
mzxid 数据节点最后一次更新时的事务 ID
mtime 数据节点最后一次更新时的时间
pzxid 数据节点的子节点最后一次被修改时的事务 ID
cversion 子节点的更改次数
version 节点数据的更改次数
aversion 节点的 ACL 的更改次数
ephemeralOwner 如果节点是临时节点,则表示创建该节点的会话的 SessionID;如果节点是持久节点,则该属性值为 0
dataLength 数据内容的长度
numChildren 数据节点当前的子节点个数

集群角色

Zookeeper 集群是一个基于主从复制的高可用集群,集群中每个节点都存储了一份数据副本(内存中)。此外,每个服务器节点承担如下三种角色中的一种:

  • Leader - 它负责 发起并维护与各 Follwer 及 Observer 间的心跳。所有的写操作必须要通过 Leader 完成再由 Leader 将写操作广播给其它服务器。一个 Zookeeper 集群同一时间只会有一个实际工作的 Leader。
  • Follower - 它会响应 Leader 的心跳。Follower 可直接处理并返回客户端的读请求,同时会将写请求转发给 Leader 处理,并且负责在 Leader 处理写请求时对请求进行投票。一个 Zookeeper 集群可能同时存在多个 Follower。
  • Observer - 角色与 Follower 类似,但是无投票权。

客户端可以从任意 ZooKeeper 服务器节点读取数据,但只能通过 Leader 服务写数据并需要半数以上 Follower 的 ACK,才算写入成功。记住这个重要的知识点,下文会详细讲述。

ACL

ZooKeeper 采用 ACL(Access Control Lists)策略来进行权限控制

每个 znode 创建时都会带有一个 ACL 列表,用于决定谁可以对它执行何种操作。

ACL 依赖于 ZooKeeper 的客户端认证机制。ZooKeeper 提供了以下几种认证方式:

  • digest - 用户名和密码 来识别客户端
  • sasl - 通过 kerberos 来识别客户端
  • ip - 通过 IP 来识别客户端

ZooKeeper 定义了如下五种权限:

  • CREATE - 允许创建子节点;
  • READ - 允许从节点获取数据并列出其子节点;
  • WRITE - 允许为节点设置数据;
  • DELETE - 允许删除子节点;
  • ADMIN - 允许为节点设置权限。

ZooKeeper 工作原理

读操作

Leader/Follower/Observer 都可直接处理读请求,从本地内存中读取数据并返回给客户端即可

由于处理读请求不需要服务器之间的交互,Follower/Observer 越多,整体系统的读请求吞吐量越大,也即读性能越好。

img

写操作

所有的写请求实际上都要交给 Leader 处理。Leader 将写请求以事务形式发给所有 Follower 并等待 ACK,一旦收到半数以上 Follower 的 ACK,即认为写操作成功。

写 Leader

img

由上图可见,通过 Leader 进行写操作,主要分为五步:

  1. 客户端向 Leader 发起写请求
  2. Leader 将写请求以事务 Proposal 的形式发给所有 Follower 并等待 ACK
  3. Follower 收到 Leader 的事务 Proposal 后返回 ACK
  4. Leader 得到过半数的 ACK(Leader 对自己默认有一个 ACK)后向所有的 Follower 和 Observer 发送 Commmit
  5. Leader 将处理结果返回给客户端

注意

  • Leader 不需要得到 Observer 的 ACK,即 Observer 无投票权。
  • Leader 不需要得到所有 Follower 的 ACK,只要收到过半的 ACK 即可,同时 Leader 本身对自己有一个 ACK。上图中有 4 个 Follower,只需其中两个返回 ACK 即可,因为 $$(2+1) / (4+1) > 1/2$$ 。
  • Observer 虽然无投票权,但仍须同步 Leader 的数据从而在处理读请求时可以返回尽可能新的数据。

写 Follower/Observer

img

  • Follower/Observer 均可接受写请求,但不能直接处理,而需要将写请求转发给 Leader 处理。
  • 除了多了一步请求转发,其它流程与直接写 Leader 无任何区别。

事务

对于来自客户端的每个更新请求,ZooKeeper 具备严格的顺序访问控制能力。

为了保证事务的顺序一致性,ZooKeeper 采用了递增的事务 id 号(zxid)来标识事务

Leader 服务会为每一个 Follower 服务器分配一个单独的队列,然后将事务 Proposal 依次放入队列中,并根据 FIFO(先进先出) 的策略进行消息发送。Follower 服务在接收到 Proposal 后,会将其以事务日志的形式写入本地磁盘中,并在写入成功后反馈给 Leader 一个 Ack 响应。当 Leader 接收到超过半数 Follower 的 Ack 响应后,就会广播一个 Commit 消息给所有的 Follower 以通知其进行事务提交,之后 Leader 自身也会完成对事务的提交。而每一个 Follower 则在接收到 Commit 消息后,完成事务的提交。

所有的提议(**proposal**)都在被提出的时候加上了 zxid。zxid 是一个 64 位的数字,它的高 32 位是 epoch 用来标识 Leader 关系是否改变,每次一个 Leader 被选出来,它都会有一个新的 epoch,标识当前属于那个 leader 的统治时期。低 32 位用于递增计数。

详细过程如下:

  1. Leader 等待 Server 连接;
  2. Follower 连接 Leader,将最大的 zxid 发送给 Leader;
  3. Leader 根据 Follower 的 zxid 确定同步点;
  4. 完成同步后通知 follower 已经成为 uptodate 状态;
  5. Follower 收到 uptodate 消息后,又可以重新接受 client 的请求进行服务了。

观察

ZooKeeper 允许客户端监听它关心的 znode,当 znode 状态发生变化(数据变化、子节点增减变化)时,ZooKeeper 服务会通知客户端

客户端和服务端保持连接一般有两种形式:

  • 客户端向服务端不断轮询
  • 服务端向客户端推送状态

Zookeeper 的选择是服务端主动推送状态,也就是观察机制( Watch )。

ZooKeeper 的观察机制允许用户在指定节点上针对感兴趣的事件注册监听,当事件发生时,监听器会被触发,并将事件信息推送到客户端。

  • 监听器实时触发
  • 监听器总是有序的
  • 创建新的 znode 数据前,客户端就能收到监听事件。

客户端使用 getData 等接口获取 znode 状态时传入了一个用于处理节点变更的回调,那么服务端就会主动向客户端推送节点的变更:

1
public byte[] getData(final String path, Watcher watcher, Stat stat)

从这个方法中传入的 Watcher 对象实现了相应的 process 方法,每次对应节点出现了状态的改变,WatchManager 都会通过以下的方式调用传入 Watcher 的方法:

1
2
3
4
5
6
7
8
9
10
11
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
Set<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
}
for (Watcher w : watchers) {
w.process(e);
}
return watchers;
}

Zookeeper 中的所有数据其实都是由一个名为 DataTree 的数据结构管理的,所有的读写数据的请求最终都会改变这颗树的内容,在发出读请求时可能会传入 Watcher 注册一个回调函数,而写请求就可能会触发相应的回调,由 WatchManager 通知客户端数据的变化。

通知机制的实现其实还是比较简单的,通过读请求设置 Watcher 监听事件,写请求在触发事件时就能将通知发送给指定的客户端。

会话

ZooKeeper 客户端通过 TCP 长连接连接到 ZooKeeper 服务集群会话 (Session) 从第一次连接开始就已经建立,之后通过心跳检测机制来保持有效的会话状态。通过这个连接,客户端可以发送请求并接收响应,同时也可以接收到 Watch 事件的通知。

每个 ZooKeeper 客户端配置中都配置了 ZooKeeper 服务器集群列表。启动时,客户端会遍历列表去尝试建立连接。如果失败,它会尝试连接下一个服务器,依次类推。

一旦一台客户端与一台服务器建立连接,这台服务器会为这个客户端创建一个新的会话。每个会话都会有一个超时时间,若服务器在超时时间内没有收到任何请求,则相应会话被视为过期。一旦会话过期,就无法再重新打开,且任何与该会话相关的临时 znode 都会被删除。

通常来说,会话应该长期存在,而这需要由客户端来保证。客户端可以通过心跳方式(ping)来保持会话不过期。

img

ZooKeeper 的会话具有四个属性:

  • sessionID - 会话 ID,唯一标识一个会话,每次客户端创建新的会话时,Zookeeper 都会为其分配一个全局唯一的 sessionID。
  • TimeOut - 会话超时时间,客户端在构造 Zookeeper 实例时,会配置 sessionTimeout 参数用于指定会话的超时时间,Zookeeper 客户端向服务端发送这个超时时间后,服务端会根据自己的超时时间限制最终确定会话的超时时间。
  • TickTime - 下次会话超时时间点,为了便于 Zookeeper 对会话实行”分桶策略”管理,同时为了高效低耗地实现会话的超时检查与清理,Zookeeper 会为每个会话标记一个下次会话超时时间点,其值大致等于当前时间加上 TimeOut。
  • isClosing - 标记一个会话是否已经被关闭,当服务端检测到会话已经超时失效时,会将该会话的 isClosing 标记为”已关闭”,这样就能确保不再处理来自该会话的心情求了。

Zookeeper 的会话管理主要是通过 SessionTracker 来负责,其采用了分桶策略(将类似的会话放在同一区块中进行管理)进行管理,以便 Zookeeper 对会话进行不同区块的隔离处理以及同一区块的统一处理。

ZAB 协议

ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议。**ZAB 协议不是 Paxos 算法**,只是比较类似,二者在操作上并不相同。Multi-Paxos 实现的是一系列值的共识,不关心最终达成共识的值是什么,不关心各值的顺序。而 ZooKeeper 需要确保操作的顺序性。

ZAB 协议是 Zookeeper 专门设计的一种支持崩溃恢复的原子广播协议

ZAB 协议是 ZooKeeper 的数据一致性和高可用解决方案。

ZAB 协议定义了两个可以无限循环的流程:

  • 选举 Leader - 用于故障恢复,从而保证高可用。
  • 原子广播 - 用于主从同步,从而保证数据一致性。

选举 Leader

ZooKeeper 的故障恢复

ZooKeeper 集群采用一主(称为 Leader)多从(称为 Follower)模式,主从节点通过副本机制保证数据一致。

  • 如果 Follower 节点挂了 - ZooKeeper 集群中的每个节点都会单独在内存中维护自身的状态,并且各节点之间都保持着通讯,只要集群中有半数机器能够正常工作,那么整个集群就可以正常提供服务
  • 如果 Leader 节点挂了 - 如果 Leader 节点挂了,系统就不能正常工作了。此时,需要通过 ZAB 协议的选举 Leader 机制来进行故障恢复。

ZAB 协议的选举 Leader 机制简单来说,就是:基于过半选举机制产生新的 Leader,之后其他机器将从新的 Leader 上同步状态,当有过半机器完成状态同步后,就退出选举 Leader 模式,进入原子广播模式。

术语

  • myid - 每个 Zookeeper 服务器,都需要在数据文件夹下创建一个名为 myid 的文件,该文件包含整个 Zookeeper 集群唯一的 ID(整数)。
  • zxid - 类似于 RDBMS 中的事务 ID,用于标识一次更新操作的 Proposal ID。为了保证顺序性,该 zxid 必须单调递增。因此 Zookeeper 使用一个 64 位的数来表示,高 32 位是 Leader 的 epoch,从 1 开始,每次选出新的 Leader,epoch 加一。低 32 位为该 epoch 内的序号,每次 epoch 变化,都将低 32 位的序号重置。这样保证了 zxid 的全局递增性。

服务器状态

  • LOOKING - 不确定 Leader 状态。该状态下的服务器认为当前集群中没有 Leader,会发起 Leader 选举
  • FOLLOWING - 跟随者状态。表明当前服务器角色是 Follower,并且它知道 Leader 是谁
  • LEADING - 领导者状态。表明当前服务器角色是 Leader,它会维护与 Follower 间的心跳
  • OBSERVING - 观察者状态。表明当前服务器角色是 Observer,与 Folower 唯一的不同在于不参与选举,也不参与集群写操作时的投票

选票数据结构

每个服务器在进行领导选举时,会发送如下关键信息

  • logicClock - 每个服务器会维护一个自增的整数,名为 logicClock,它表示这是该服务器发起的第多少轮投票
  • state - 当前服务器的状态
  • self_id - 当前服务器的 myid
  • self_zxid - 当前服务器上所保存的数据的最大 zxid
  • vote_id - 被推举的服务器的 myid
  • vote_zxid - 被推举的服务器上所保存的数据的最大 zxid

投票流程

(1)自增选举轮次 - Zookeeper 规定所有有效的投票都必须在同一轮次中。每个服务器在开始新一轮投票时,会先对自己维护的 logicClock 进行自增操作。

(2)初始化选票 - 每个服务器在广播自己的选票前,会将自己的投票箱清空。该投票箱记录了所收到的选票。例:服务器 2 投票给服务器 3,服务器 3 投票给服务器 1,则服务器 1 的投票箱为(2, 3), (3, 1), (1, 1)。票箱中只会记录每一投票者的最后一票,如投票者更新自己的选票,则其它服务器收到该新选票后会在自己票箱中更新该服务器的选票。

(3)发送初始化选票 - 每个服务器最开始都是通过广播把票投给自己。

(4)接收外部投票 - 服务器会尝试从其它服务器获取投票,并记入自己的投票箱内。如果无法获取任何外部投票,则会确认自己是否与集群中其它服务器保持着有效连接。如果是,则再次发送自己的投票;如果否,则马上与之建立连接。

(5)判断选举轮次 - 收到外部投票后,首先会根据投票信息中所包含的 logicClock 来进行不同处理

  • 外部投票的 logicClock 大于自己的 logicClock。说明该服务器的选举轮次落后于其它服务器的选举轮次,立即清空自己的投票箱并将自己的 logicClock 更新为收到的 logicClock,然后再对比自己之前的投票与收到的投票以确定是否需要变更自己的投票,最终再次将自己的投票广播出去。
  • 外部投票的 logicClock 小于自己的 logicClock。当前服务器直接忽略该投票,继续处理下一个投票。
  • 外部投票的 logickClock 与自己的相等。当时进行选票 PK。

(6)选票 PK - 选票 PK 是基于(self_id, self_zxid)(vote_id, vote_zxid) 的对比

  • 外部投票的 logicClock 大于自己的 logicClock,则将自己的 logicClock 及自己的选票的 logicClock 变更为收到的 logicClock
  • 若 logicClock 一致,则对比二者的 vote_zxid,若外部投票的 vote_zxid 比较大,则将自己的票中的 vote_zxid 与 vote_myid 更新为收到的票中的 vote_zxid 与 vote_myid 并广播出去,另外将收到的票及自己更新后的票放入自己的票箱。如果票箱内已存在(self_myid, self_zxid)相同的选票,则直接覆盖
  • 若二者 vote_zxid 一致,则比较二者的 vote_myid,若外部投票的 vote_myid 比较大,则将自己的票中的 vote_myid 更新为收到的票中的 vote_myid 并广播出去,另外将收到的票及自己更新后的票放入自己的票箱

(7)统计选票 - 如果已经确定有过半服务器认可了自己的投票(可能是更新后的投票),则终止投票。否则继续接收其它服务器的投票。

(8)更新服务器状态 - 投票终止后,服务器开始更新自身状态。若过半的票投给了自己,则将自己的服务器状态更新为 LEADING,否则将自己的状态更新为 FOLLOWING

通过以上流程分析,我们不难看出:要使 Leader 获得多数 Server 的支持,则 ZooKeeper 集群节点数必须是奇数。且存活的节点数目不得少于 N + 1

每个 Server 启动后都会重复以上流程。在恢复模式下,如果是刚从崩溃状态恢复的或者刚启动的 server 还会从磁盘快照中恢复数据和会话信息,zk 会记录事务日志并定期进行快照,方便在恢复时进行状态恢复。

原子广播(Atomic Broadcast)

ZooKeeper 通过副本机制来实现高可用

那么,ZooKeeper 是如何实现副本机制的呢?答案是:ZAB 协议的原子广播。

img

ZAB 协议的原子广播要求:

**所有的写请求都会被转发给 Leader,Leader 会以原子广播的方式通知 Follow。当半数以上的 Follow 已经更新状态持久化后,Leader 才会提交这个更新,然后客户端才会收到一个更新成功的响应**。这有些类似数据库中的两阶段提交协议。

在整个消息的广播过程中,Leader 服务器会每个事务请求生成对应的 Proposal,并为其分配一个全局唯一的递增的事务 ID(ZXID),之后再对其进行广播。

ZAB 是通过“一切以领导者为准”的强领导者模型和严格按照顺序提交日志,来实现操作的顺序性的,这一点和 Raft 是一样的。

ZooKeeper 应用

ZooKeeper 可以用于发布/订阅、负载均衡、命令服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能

命名服务

在分布式系统中,通常需要一个全局唯一的名字,如生成全局唯一的订单号等,ZooKeeper 可以通过顺序节点的特性来生成全局唯一 ID,从而可以对分布式系统提供命名服务。

img

配置管理

利用 ZooKeeper 的观察机制,可以将其作为一个高可用的配置存储器,允许分布式应用的参与者检索和更新配置文件。

分布式锁

可以通过 ZooKeeper 的临时节点和 Watcher 机制来实现分布式排它锁。

举例来说,有一个分布式系统,有三个节点 A、B、C,试图通过 ZooKeeper 获取分布式锁。

(1)访问 /lock (这个目录路径由程序自己决定),创建 带序列号的临时节点(EPHEMERAL)

img

(2)每个节点尝试获取锁时,拿到 /locks节点下的所有子节点(id_0000,id_0001,id_0002),判断自己创建的节点是不是序列号最小的

  • 如果序列号是最小的,则成功获取到锁。
    • 释放锁:执行完操作后,把创建的节点给删掉。
  • 如果不是,则监听比自己要小 1 的节点变化。

img

(3)释放锁,即删除自己创建的节点。

img

图中,NodeA 删除自己创建的节点 id_0000,NodeB 监听到变化,发现自己的节点已经是最小节点,即可获取到锁。

集群管理

ZooKeeper 还能解决大多数分布式系统中的问题:

  • 如可以通过创建临时节点来建立心跳检测机制。如果分布式系统的某个服务节点宕机了,则其持有的会话会超时,此时该临时节点会被删除,相应的监听事件就会被触发。
  • 分布式系统的每个服务节点还可以将自己的节点状态写入临时节点,从而完成状态报告或节点工作进度汇报。
  • 通过数据的订阅和发布功能,ZooKeeper 还能对分布式系统进行模块的解耦和任务的调度。
  • 通过监听机制,还能对分布式系统的服务节点进行动态上下线,从而实现服务的动态扩容。

选举 Leader 节点

分布式系统一个重要的模式就是主从模式 (Master/Salves),ZooKeeper 可以用于该模式下的 Matser 选举。可以让所有服务节点去竞争性地创建同一个 ZNode,由于 ZooKeeper 不能有路径相同的 ZNode,必然只有一个服务节点能够创建成功,这样该服务节点就可以成为 Master 节点。

队列管理

ZooKeeper 可以处理两种类型的队列:

  1. 当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达,这种是同步队列。
  2. 队列按照 FIFO 方式进行入队和出队操作,例如实现生产者和消费者模型。

同步队列用 ZooKeeper 实现的实现思路如下:

创建一个父目录 /synchronizing,每个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /synchronizing/member_i 的临时目录节点,然后每个成员获取 /synchronizing 目录的所有目录节点,也就是 member_i。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /synchronizing/start 的出现,如果已经相等就创建 /synchronizing/start

ZooKeeper 的缺点

ZooKeeper 的监听是一次性的。

ZooKeeper 不是为高可用性设计的

生产环境中常常需要通过多机房部署来容灾。出于成本考虑,一般多机房都是同时提供服务的,即一个机房撑不住所有流量。ZooKeeper 集群只能有一个 Leader,一旦机房之间连接出现故障,那么只有 Leader 所在的机房可以正常工作,其他机房只能停摆。于是所有流量集中到 Leader 所在的机房,由于处理不过来而导致崩溃。

即使是在同一个机房里面,由于网段的不同,在调整机房交换机的时候偶尔也会发生网段隔离的情况。实际上机房每个月基本上都会发生短暂的网络隔离之类的子网段调整。在那个时刻 ZooKeeper 将处于不可用状态。如果业务系统重度依赖 ZooKeeper(比如用 Dubbo 作为 RPC,且使用 ZooKeeper 作为注册中心),则系统的可用性将非常脆弱。

由于 ZooKeeper 对于网络隔离的极度敏感,导致 ZooKeeper 对于网络的任何风吹草动都会做出激烈反应。这使得 ZooKeeper 的不可用时间比较多。我们不能让 ZooKeeper 的不可用,变成系统的不可用

ZooKeeper 的选举过程速度很慢

互联网环境中,网络不稳定几乎是必然的,而 ZooKeeper 网络隔离非常敏感。一旦出现网络隔离,zookeeper 就要发起选举流程。

ZooKeeper 的选举流程通常耗时 30 到 120 秒,期间 ZooKeeper 由于没有 Leader,都是不可用的。

对于网络里面偶尔出现的,比如半秒一秒的网络隔离,ZooKeeper 会由于选举过程,而把不可用时间放大几十倍。

ZooKeeper 的性能是有限的

典型的 ZooKeeper 的 TPS 大概是一万多,无法支撑每天动辄几十亿次的调用。因此,每次请求都去 ZooKeeper 获取业务系统信息是不可能的。

为此,ZooKeeper 的 client 必须自己缓存业务系统的信息。这就导致 ZooKeeper 提供的强一致性实际上是做不到的。如果我们需要强一致性,还需要其他机制来进行保障:比如用自动化脚本把业务系统的 old master 给 kill 掉,但是这可能会引发很多其他问题。

ZooKeeper 无法进行有效的权限控制

ZooKeeper 的权限控制非常弱。在大型的复杂系统里面,使用 ZooKeeper 必须自己再额外的开发一套权限控制系统,通过那套权限控制系统再访问 ZooKeeper。

额外的权限控制系统不但增加了系统复杂性和维护成本,而且降低了系统的总体性能。

即使有了 ZooKeeper 也很难避免业务系统的数据不一致

由于 ZooKeeper 的性能限制,我们无法让每次系统内部调用都走 ZooKeeper,因此总有某些时刻,业务系统会存在两份数据(业务系统 client 那边缓存的业务系统信息是定时从 ZooKeeper 更新的,因此会有更新不同步的问题)。

如果要保持数据的强一致性,唯一的方法是“先 kill 掉当前 Leader,再在 ZooKeeper 上更新 Leader 信息”。是否要 kill 掉当前 Leader 这个问题上,程序是无法完全自动决定的(因为网络隔离的时候 ZooKeeper 已经不可用了,自动脚本没有全局信息,不管怎么做都可能是错的,什么都不做也可能是错的。当网络故障的时候,只有运维人员才有全局信息,程序是无法得知其他机房的情况的)。因此系统无法自动的保障数据一致性,必须要人工介入。而人工介入的典型时间是半个小时以上,我们不能让系统这么长时间不可用。因此我们必须在某个方向上进行妥协,最常见的妥协方式是放弃强一致性,而接受最终一致性

如果我们需要人工介入才能保证可靠的强一致性,那么 ZooKeeper 的价值就大打折扣。

参考资料

ZooKeeper 运维指南

单点服务部署

在安装 ZooKeeper 之前,请确保你的系统是在以下任一操作系统上运行:

  • 任意 Linux OS - 支持开发和部署。适合演示应用程序。
  • Windows OS - 仅支持开发。
  • Mac OS - 仅支持开发。

安装步骤如下:

下载解压

进入官方下载地址:http://zookeeper.apache.org/releases.html#download ,选择合适版本。

解压到本地:

1
2
tar -zxf zookeeper-3.4.6.tar.gz
cd zookeeper-3.4.6

环境变量

执行 vim /etc/profile,添加环境变量:

1
2
export ZOOKEEPER_HOME=/usr/app/zookeeper-3.4.14
export PATH=$ZOOKEEPER_HOME/bin:$PATH

再执行 source /etc/profile , 使得配置的环境变量生效。

修改配置

你必须创建 conf/zoo.cfg 文件,否则启动时会提示你没有此文件。

初次尝试,不妨直接使用 Kafka 提供的模板配置文件 conf/zoo_sample.cfg

1
cp conf/zoo_sample.cfg conf/zoo.cfg

修改后完整配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/log
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

配置参数说明:

  • tickTime:用于计算的基础时间单元。比如 session 超时:N*tickTime;
  • initLimit:用于集群,允许从节点连接并同步到 master 节点的初始化连接时间,以 tickTime 的倍数来表示;
  • syncLimit:用于集群, master 主节点与从节点之间发送消息,请求和应答时间长度(心跳机制);
  • dataDir:数据存储位置;
  • dataLogDir:日志目录;
  • clientPort:用于客户端连接的端口,默认 2181

启动服务

执行以下命令

1
bin/zkServer.sh start

执行此命令后,你将收到以下响应

1
2
3
JMX enabled by default
Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

停止服务

可以使用以下命令停止 zookeeper 服务器。

1
bin/zkServer.sh stop

集群服务部署

分布式系统节点数一般都要求是奇数,且最少为 3 个节点,Zookeeper 也不例外。

这里,规划一个含 3 个节点的最小 ZooKeeper 集群,主机名分别为 hadoop001,hadoop002,hadoop003 。

修改配置

修改配置文件 zoo.cfg,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-cluster/data/
dataLogDir=/usr/local/zookeeper-cluster/log/
clientPort=2181

# server.1 这个1是服务器的标识,可以是任意有效数字,标识这是第几个服务器节点,这个标识要写到dataDir目录下面myid文件里
# 指名集群间通讯端口和选举端口
server.1=hadoop001:2287:3387
server.2=hadoop002:2287:3387
server.3=hadoop003:2287:3387

标识节点

分别在三台主机的 dataDir 目录下新建 myid 文件,并写入对应的节点标识。Zookeeper 集群通过 myid 文件识别集群节点,并通过上文配置的节点通信端口和选举端口来进行节点通信,选举出 Leader 节点。

创建存储目录:

1
2
# 三台主机均执行该命令
mkdir -vp /usr/local/zookeeper-cluster/data/

创建并写入节点标识到 myid 文件:

1
2
3
4
5
6
# hadoop001主机
echo "1" > /usr/local/zookeeper-cluster/data/myid
# hadoop002主机
echo "2" > /usr/local/zookeeper-cluster/data/myid
# hadoop003主机
echo "3" > /usr/local/zookeeper-cluster/data/myid

启动集群

分别在三台主机上,执行如下命令启动服务:

1
/usr/app/zookeeper-cluster/zookeeper/bin/zkServer.sh start

集群验证

启动后使用 zkServer.sh status 查看集群各个节点状态。

参考资料

HBase 命令

进入 HBase Shell 控制台:./bin/hbase shell

如果有 kerberos 认证,需要事先使用相应的 keytab 进行一下认证(使用 kinit 命令),认证成功之后再使用 hbase shell 进入可以使用 whoami 命令可查看当前用户.

基本命令

  • 获取帮助信息:help
  • 获取命令的详细帮助信息:help 'status'
  • 查看服务器状态:status
  • 查看版本信息:version
  • 查看当前登录用户:whoami

DDL

创建表

【语法】create '表名称','列族名称 1','列族名称 2','列名称 N'

【示例】

1
2
# 创建一张名为 test 的表,columnFamliy1、columnFamliy2 是 table1 表的列族。
create 'test','columnFamliy1','columnFamliy2'

启用、禁用表

  • 启用表:enable 'test'
  • 禁用表:disable 'test'
  • 检查表是否被启用:is_enabled 'test'
  • 检查表是否被禁用:is_disabled 'test'

删除表

注意:删除表前需要先禁用表

1
2
disable 'test'
drop 'test'

修改表

添加列族

命令格式: alter ‘表名’, ‘列族名’

1
alter 'test', 'teacherInfo'

删除列族

命令格式:alter ‘表名’, {NAME => ‘列族名’, METHOD => ‘delete’}

1
alter 'test', {NAME => 'teacherInfo', METHOD => 'delete'}

更改列族存储版本的限制

默认情况下,列族只存储一个版本的数据,如果需要存储多个版本的数据,则需要修改列族的属性。修改后可通过 desc 命令查看。

1
alter 'test',{NAME=>'columnFamliy1',VERSIONS=>3}

查看表

  • 查看所有表:list
  • 查看表的详细信息:describe 'test'
  • 检查表是否存在:exists 'test'

增删改

插入数据

命令格式put '表名', '行键','列族:列','值'

注意:如果新增数据的行键值、列族名、列名与原有数据完全相同,则相当于更新操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
put 'test', 'rowkey1', 'columnFamliy1:a', 'valueA'
put 'test', 'rowkey1', 'columnFamliy1:b', 'valueB'
put 'test', 'rowkey1', 'columnFamliy1:c', 'valueC'

put 'test', 'rowkey2', 'columnFamliy1:a', 'valueA'
put 'test', 'rowkey2', 'columnFamliy1:b', 'valueB'
put 'test', 'rowkey2', 'columnFamliy1:c', 'valueC'

put 'test', 'rowkey3', 'columnFamliy1:a', 'valueA'
put 'test', 'rowkey3', 'columnFamliy1:b', 'valueB'
put 'test', 'rowkey3', 'columnFamliy1:c', 'valueC'

put 'test', 'rowkey1', 'columnFamliy2:a', 'valueA'
put 'test', 'rowkey1', 'columnFamliy2:b', 'valueB'
put 'test', 'rowkey1', 'columnFamliy2:c', 'valueC'

获取指定行、列族、列

  • 获取指定行中所有列的数据信息:get 'test','rowkey2'
  • 获取指定行中指定列族下所有列的数据信息:get 'test','rowkey2','columnFamliy1'
  • 获取指定行中指定列的数据信息:get 'test','rowkey2','columnFamliy1:a'

删除指定行、列

  • 删除指定行:delete 'test','rowkey2'
  • 删除指定行中指定列的数据:delete 'test','rowkey2','columnFamliy1:a'

查询

hbase 中访问数据有两种基本的方式:

  • 按指定 rowkey 获取数据:get 方法;
  • 按指定条件获取数据:scan 方法。

scan 可以设置 begin 和 end 参数来访问一个范围内所有的数据。get 本质上就是 begin 和 end 相等的一种特殊的 scan。

get 查询

  • 获取指定行中所有列的数据信息:get 'test','rowkey2'
  • 获取指定行中指定列族下所有列的数据信息:get 'test','rowkey2','columnFamliy1'
  • 获取指定行中指定列的数据信息:get 'test','rowkey2','columnFamliy1:a'

scan 查询

查询整表数据

1
scan 'test'

查询指定列簇的数据

1
scan 'test', {COLUMN=>'columnFamliy1'}

条件查询

1
2
# 查询指定列的数据
scan 'test', {COLUMNS=> 'columnFamliy1:a'}

除了列 (COLUMNS) 修饰词外,HBase 还支持 Limit(限制查询结果行数),STARTROWROWKEY 起始行,会先根据这个 key 定位到 region,再向后扫描)、STOPROW(结束行)、TIMERANGE(限定时间戳范围)、VERSIONS(版本数)、和 FILTER(按条件过滤行)等。

如下代表从 rowkey2 这个 rowkey 开始,查找下两个行的最新 3 个版本的 name 列的数据:

1
scan 'test', {COLUMNS=> 'columnFamliy1:a',STARTROW => 'rowkey2',STOPROW => 'rowkey3',LIMIT=>2, VERSIONS=>3}

条件过滤

Filter 可以设定一系列条件来进行过滤。如我们要查询值等于 24 的所有数据:

1
scan 'test', FILTER=>"ValueFilter(=,'binary:24')"

值包含 valueA 的所有数据:

1
scan 'test', FILTER=>"ValueFilter(=,'substring:valueA')"

列名中的前缀为 b 的:

1
scan 'test', FILTER=>"ColumnPrefixFilter('b')"

FILTER 中支持多个过滤条件通过括号、AND 和 OR 进行组合:

1
2
# 列名中的前缀为 b 且列值中包含1998的数据
scan 'test', FILTER=>"ColumnPrefixFilter('b') AND ValueFilter ValueFilter(=,'substring:A')"

PrefixFilter 用于对 Rowkey 的前缀进行判断:

1
scan 'test', FILTER=>"PrefixFilter('wr')"

参考资料

Zipkin 快速入门

Zipkin 是一个基于 Java 开发的、开源的、分布式实时数据跟踪系统(Distributed Tracking System)。它采集有助于解决服务架构中延迟问题的实时数据。

Zipkin 主要功能是聚集来自各个异构系统的实时监控数据。分布式跟踪系统还有其他比较成熟的实现,例如:Naver 的 Pinpoint、Apache 的 HTrace、阿里的鹰眼 Tracing、京东的 Hydra、新浪的 Watchman,美团点评的 CAT,skywalking 等。

Zipkin 基于 Google Dapper 的论文设计而来,由 Twitter 公司开发贡献。

一、Zipkin 简介

特性

如果日志文件中有跟踪 ID,则可以直接跳至该跟踪 ID。 否则,您可以基于属性进行查询,例如服务,操作名称,标签和持续时间。 将为您总结一些有趣的数据,例如在服务中花费的时间百分比以及操作是否失败。

Zipkin UI 还提供了一个依赖关系图,该关系图显示了每个应用程序中跟踪了多少个请求。这对于识别聚合行为(包括错误路径或对不赞成使用的服务的调用)很有帮助。

Zipkin UI

多平台

Zipkin 官方支持 C#、Go、Java、JavaScript、Ruby、Scala、PHP 语言。

除此以外,社区还贡献了多种其他语言的支持,详情可以参考官方文档:Tracers and Instrumentation

数据

Zipkin 服务器捆绑了用于采集和存储数据的扩展。

默认情况下,数据可以通过 HttpKafkaRabbitMQ 或 RPC 传输。

并存储在内存中或 MySQLCassandraElasticsearch 中。

数据以 json 形式存储,可以参考:Zipkin 官方的 Swagger API

Zipkin Swagger API

二、Zipkin 安装

Docker

Docker 启动方式:

1
docker run -d -p 9411:9411 openzipkin/zipkin

Java

注意:必须运行在 JDK8+ 环境

Java 启动方式:

1
2
curl -sSL https://zipkin.io/quickstart.sh | bash -s
java -jar zipkin.jar

编译方式

适用于需要订制化的场景。

1
2
3
4
5
6
7
# get the latest source
git clone https://github.com/openzipkin/zipkin
cd zipkin
# Build the server and also make its dependencies
./mvnw -DskipTests --also-make -pl zipkin-server clean install
# Run the server
java -jar ./zipkin-server/target/zipkin-server-*exec.jar

三、Zipkin 架构

ZipKin 可以分为两部分,

  • 一部分是 Zipkin server,用来作为数据的采集存储、数据分析与展示;
  • 另一部分是 Zipkin client 是 Zipkin 基于不同的语言及框架封装的一些列客户端工具,这些工具完成了追踪数据的生成与上报功能。

架构如下:

Zipkin 架构

Zipkin Server

Zipkin Server 主要包括四个模块:

  • Collector - 负责采集客户端传输的数据。
  • Storage - 负责存储采集的数据。当前支持 Memory,MySQL,Cassandra,ElasticSearch 等,默认存储在内存中。
  • API(Query) - 负责查询 Storage 中存储的数据。提供简单的 JSON API 获取数据,主要提供给 web UI 使用。
  • UI - 提供简单的 web 界面。

Instrumented Client 和 Instrumented Server,是指分布式架构中使用了 Trace 工具的两个应用,Client 会调用 Server 提供的服务,两者都会向 Zipkin 上报 Trace 相关信息。在 Client 和 Server 通过 Transport 上报 Trace 信息后,由 Zipkin 的 Collector 模块接收,并由 Storage 模块将数据存储在对应的存储介质中,然后 Zipkin 提供 API 供 UI 界面查询 Trace 跟踪信息。Non-Instrumented Server,指的是未使用 Trace 工具的 Server,显然它不会上报 Trace 信息。

Zipkin Client

  • Tracer - Tracer 存在于你的应用中,它负责采集关于已发生操作的实时元数据。它们通常会检测库,因此对于用户是透明的。例如,已检测的 Web 服务器记录它何时接收到请求,以及何时发送响应。收集的跟踪数据称为跨度(Span)。
  • Instrumentation - Instrumentation 保证了生产环境的安全性和很少的开销。因此,它们仅在内部传播 ID,以告知接收方正在进行追踪。完成的 Span 将通过外部通信告知 Zipkin,类似于应用程序异步报告指标的方式。例如,当跟踪某个操作并且需要发出 http 请求时,会添加一些 header 来传播 ID。header 不用于发送详细信息,例如操作名称。
  • Reporter - 能够将数据发送到 Zipkin 的检测应用程序中的组件,被称为 Reporter。Reporter 有多种传输方式,可以将跟踪数据发送到 Zipkin 采集器,后者将跟踪数据持久化保存到存储中。稍后,API 会查询存储以向 UI 提供渲染数据。

以下是 Zipkin 的一个示例工作流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
┌─────────────┐ ┌───────────────────────┐  ┌─────────────┐  ┌──────────────────┐
│ User Code │ │ Trace Instrumentation │ │ Http Client │ │ Zipkin Collector │
└─────────────┘ └───────────────────────┘ └─────────────┘ └──────────────────┘
│ │ │ │
┌─────────┐
│ ──┤GET /foo ├─▶ │ ────┐ │ │
└─────────┘ │ record tags
│ │ ◀───┘ │ │
────┐
│ │ │ add trace headers │ │
◀───┘
│ │ ────┐ │ │
│ record timestamp
│ │ ◀───┘ │ │
┌─────────────────┐
│ │ ──┤GET /foo ├─▶ │ │
│X-B3-TraceId: aa │ ────┐
│ │ │X-B3-SpanId: 6b │ │ │ │
└─────────────────┘ │ invoke
│ │ │ │ request │

│ │ │ │ │
┌────────┐ ◀───┘
│ │ ◀─────┤200 OK ├─────── │ │
────┐ └────────┘
│ │ │ record duration │ │
┌────────┐ ◀───┘
│ ◀──┤200 OK ├── │ │ │
└────────┘ ┌────────────────────────────────┐
│ │ ──┤ asynchronously report span ├────▶ │
│ │
│{ │
│ "traceId": "aa", │
│ "id": "6b", │
│ "name": "get", │
│ "timestamp": 1483945573944000,│
│ "duration": 386000, │
│ "annotations": [ │
│--snip-- │
└────────────────────────────────┘

Instrumented client 和 server 是分别使用了 ZipKin Client 的服务,Zipkin Client 会根据配置将追踪数据发送到 Zipkin Server 中进行数据存储、分析和展示。

四、Zipkin 客户端

Brave 是 Java 版的 zipkin 客户端。

一般不会手动编写 Trace 相关的代码,Brave 提供可一些开箱即用的库,帮助我们追踪一些特定的请求。比如:dubbo、grpc、servlet、mysql、httpClient、kafka、springMVC 等。

参考资料

Spring AOP

AOP 概念

什么是 AOP

AOP(Aspect-Oriented Programming,即 面向切面编程)与 OOP( Object-Oriented Programming,面向对象编程) 相辅相成,提供了与 OOP 不同的抽象软件结构的视角。

在 OOP 中,我们以类(class)作为我们的基本单元,而 AOP 中的基本单元是 Aspect(切面)

术语

Aspect(切面)

aspectpointcountadvice 组成, 它既包含了横切逻辑的定义, 也包括了连接点的定义. Spring AOP 就是负责实施切面的框架, 它将切面所定义的横切逻辑织入到切面所指定的连接点中.
AOP 的工作重心在于如何将增强织入目标对象的连接点上, 这里包含两个工作:

  1. 如何通过 pointcut 和 advice 定位到特定的 joinpoint 上
  2. 如何在 advice 中编写切面代码.

可以简单地认为, 使用 @Aspect 注解的类就是切面.

advice(增强)

由 aspect 添加到特定的 join point(即满足 point cut 规则的 join point) 的一段代码.
许多 AOP 框架, 包括 Spring AOP, 会将 advice 模拟为一个拦截器(interceptor), 并且在 join point 上维护多个 advice, 进行层层拦截.
例如 HTTP 鉴权的实现, 我们可以为每个使用 RequestMapping 标注的方法织入 advice, 当 HTTP 请求到来时, 首先进入到 advice 代码中, 在这里我们可以分析这个 HTTP 请求是否有相应的权限, 如果有, 则执行 Controller, 如果没有, 则抛出异常. 这里的 advice 就扮演着鉴权拦截器的角色了.

连接点(join point)

a point during the execution of a program, such as the execution of a method or the handling of an exception. In Spring AOP, a join point always represents a method execution.

程序运行中的一些时间点, 例如一个方法的执行, 或者是一个异常的处理.
在 Spring AOP 中, join point 总是方法的执行点, 即只有方法连接点.

切点(point cut)

匹配 join point 的谓词(a predicate that matches join points).
Advice 是和特定的 point cut 关联的, 并且在 point cut 相匹配的 join point 中执行.
在 Spring 中, 所有的方法都可以认为是 joinpoint, 但是我们并不希望在所有的方法上都添加 Advice, 而 pointcut 的作用就是提供一组规则(使用 AspectJ pointcut expression language 来描述) 来匹配joinpoint, 给满足规则的 joinpoint 添加 Advice.

关于 join point 和 point cut 的区别

在 Spring AOP 中, 所有的方法执行都是 join point. 而 point cut 是一个描述信息, 它修饰的是 join point, 通过 point cut, 我们就可以确定哪些 join point 可以被织入 Advice. 因此 join point 和 point cut 本质上就是两个不同纬度上的东西.
advice 是在 join point 上执行的, 而 point cut 规定了哪些 join point 可以执行哪些 advice

introduction

为一个类型添加额外的方法或字段. Spring AOP 允许我们为 目标对象 引入新的接口(和对应的实现). 例如我们可以使用 introduction 来为一个 bean 实现 IsModified 接口, 并以此来简化 caching 的实现.

目标对象(Target)

织入 advice 的目标对象. 目标对象也被称为 advised object.
因为 Spring AOP 使用运行时代理的方式来实现 aspect, 因此 adviced object 总是一个代理对象(proxied object)
注意, adviced object 指的不是原来的类, 而是织入 advice 后所产生的代理类.

AOP proxy

一个类被 AOP 织入 advice, 就会产生一个结果类, 它是融合了原类和增强逻辑的代理类.
在 Spring AOP 中, 一个 AOP 代理是一个 JDK 动态代理对象或 CGLIB 代理对象.

织入(Weaving)

将 aspect 和其他对象连接起来, 并创建 adviced object 的过程.
根据不同的实现技术, AOP 织入有三种方式:

  • 编译器织入, 这要求有特殊的 Java 编译器.
  • 类装载期织入, 这需要有特殊的类装载器.
  • 动态代理织入, 在运行期为目标类添加增强(Advice)生成子类的方式.
    Spring 采用动态代理织入, 而 AspectJ 采用编译器织入和类装载期织入.

advice 的类型

  • before advice, 在 join point 前被执行的 advice. 虽然 before advice 是在 join point 前被执行, 但是它并不能够阻止 join point 的执行, 除非发生了异常(即我们在 before advice 代码中, 不能人为地决定是否继续执行 join point 中的代码)
  • after return advice, 在一个 join point 正常返回后执行的 advice
  • after throwing advice, 当一个 join point 抛出异常后执行的 advice
  • after(final) advice, 无论一个 join point 是正常退出还是发生了异常, 都会被执行的 advice.
  • around advice, 在 join point 前和 joint point 退出后都执行的 advice. 这个是最常用的 advice.

关于 AOP Proxy

Spring AOP 默认使用标准的 JDK 动态代理(dynamic proxy)技术来实现 AOP 代理, 通过它, 我们可以为任意的接口实现代理.
如果需要为一个类实现代理, 那么可以使用 CGLIB 代理. 当一个业务逻辑对象没有实现接口时, 那么 Spring AOP 就默认使用 CGLIB 来作为 AOP 代理了. 即如果我们需要为一个方法织入 advice, 但是这个方法不是一个接口所提供的方法, 则此时 Spring AOP 会使用 CGLIB 来实现动态代理. 鉴于此, Spring AOP 建议基于接口编程, 对接口进行 AOP 而不是类.

彻底理解 aspect, join point, point cut, advice

看完了上面的理论部分知识, 我相信还是会有不少朋友感觉到 AOP 的概念还是很模糊, 对 AOP 中的各种概念理解的还不是很透彻. 其实这很正常, 因为 AOP 中的概念是在是太多了, 我当时也是花了老大劲才梳理清楚的.
下面我以一个简单的例子来比喻一下 AOP 中 aspect, jointpoint, pointcut 与 advice 之间的关系.

让我们来假设一下, 从前有一个叫爪哇的小县城, 在一个月黑风高的晚上, 这个县城中发生了命案. 作案的凶手十分狡猾, 现场没有留下什么有价值的线索. 不过万幸的是, 刚从隔壁回来的老王恰好在这时候无意中发现了凶手行凶的过程, 但是由于天色已晚, 加上凶手蒙着面, 老王并没有看清凶手的面目, 只知道凶手是个男性, 身高约七尺五寸. 爪哇县的县令根据老王的描述, 对守门的士兵下命令说: 凡是发现有身高七尺五寸的男性, 都要抓过来审问. 士兵当然不敢违背县令的命令, 只好把进出城的所有符合条件的人都抓了起来.

来让我们看一下上面的一个小故事和 AOP 到底有什么对应关系.
首先我们知道, 在 Spring AOP 中 join point 指代的是所有方法的执行点, 而 point cut 是一个描述信息, 它修饰的是 join point, 通过 point cut, 我们就可以确定哪些 join point 可以被织入 Advice. 对应到我们在上面举的例子, 我们可以做一个简单的类比, join point 就相当于 爪哇的小县城里的百姓, point cut 就相当于 老王所做的指控, 即凶手是个男性, 身高约七尺五寸, 而 advice 则是施加在符合老王所描述的嫌疑人的动作: 抓过来审问.
为什么可以这样类比呢?

  • join point –> 爪哇的小县城里的百姓: 因为根据定义, join point 是所有可能被织入 advice 的候选的点, 在 Spring AOP 中, 则可以认为所有方法执行点都是 join point. 而在我们上面的例子中, 命案发生在小县城中, 按理说在此县城中的所有人都有可能是嫌疑人.
  • point cut –> 男性, 身高约七尺五寸: 我们知道, 所有的方法(joint point) 都可以织入 advice, 但是我们并不希望在所有方法上都织入 advice, 而 pointcut 的作用就是提供一组规则来匹配 joinpoint, 给满足规则的 joinpoint 添加 advice. 同理, 对于县令来说, 他再昏庸, 也知道不能把县城中的所有百姓都抓起来审问, 而是根据凶手是个男性, 身高约七尺五寸, 把符合条件的人抓起来. 在这里凶手是个男性, 身高约七尺五寸 就是一个修饰谓语, 它限定了凶手的范围, 满足此修饰规则的百姓都是嫌疑人, 都需要抓起来审问.
  • advice –> 抓过来审问, advice 是一个动作, 即一段 Java 代码, 这段 Java 代码是作用于 point cut 所限定的那些 join point 上的. 同理, 对比到我们的例子中, 抓过来审问 这个动作就是对作用于那些满足 男性, 身高约七尺五寸爪哇的小县城里的百姓.
  • aspect: aspect 是 point cut 与 advice 的组合, 因此在这里我们就可以类比: “根据老王的线索, 凡是发现有身高七尺五寸的男性, 都要抓过来审问” 这一整个动作可以被认为是一个 aspect.

或则我们也可以从语法的角度来简单类比一下. 我们在学英语时, 经常会接触什么 定语, 被动句 之类的概念, 那么可以做一个不严谨的类比, 即 joinpoint 可以认为是一个 宾语, 而 pointcut 则可以类比为修饰 joinpoint 的定语, 那么整个 aspect 就可以描述为: 满足 pointcut 规则的 joinpoint 会被添加相应的 advice 操作.

@AspectJ 支持

@AspectJ 是一种使用 Java 注解来实现 AOP 的编码风格。

@AspectJ 风格的 AOP 是 AspectJ Project 在 AspectJ 5 中引入的, 并且 Spring 也支持 @AspectJ 的 AOP 风格.

使能 @AspectJ 支持

@AspectJ 可以以 XML 的方式或以注解的方式来使能, 并且不论以哪种方式使能@ASpectJ, 我们都必须保证 aspectjweaver.jar 在 classpath 中.

使用 Java Configuration 方式使能@AspectJ

1
2
3
4
@Configuration
@EnableAspectJAutoProxy
public class AppConfig {
}

使用 XML 方式使能@AspectJ

1
<aop:aspectj-autoproxy/>

定义 aspect(切面)

当使用注解 @Aspect 标注一个 Bean 后, 那么 Spring 框架会自动收集这些 Bean, 并添加到 Spring AOP 中, 例如:

1
2
3
4
@Component
@Aspect
public class MyTest {
}

注意, 仅仅使用@Aspect 注解, 并不能将一个 Java 对象转换为 Bean, 因此我们还需要使用类似 @Component 之类的注解.
注意, 如果一个 类被@Aspect 标注, 则这个类就不能是其他 aspect 的 **advised object** 了, 因为使用 @Aspect 后, 这个类就会被排除在 auto-proxying 机制之外.

声明 pointcut

一个 pointcut 的声明由两部分组成:

  • 一个方法签名, 包括方法名和相关参数
  • 一个 pointcut 表达式, 用来指定哪些方法执行是我们感兴趣的(即因此可以织入 advice).

在@AspectJ 风格的 AOP 中, 我们使用一个方法来描述 pointcut, 即:

1
2
@Pointcut("execution(* com.xys.service.UserService.*(..))") // 切点表达式
private void dataAccessOperation() {} // 切点前面

这个方法必须无返回值.
这个方法本身就是 pointcut signature, pointcut 表达式使用@Pointcut 注解指定.
上面我们简单地定义了一个 pointcut, 这个 pointcut 所描述的是: 匹配所有在包 com.xys.service.UserService 下的所有方法的执行.

切点标志符(designator)

AspectJ5 的切点表达式由标志符(designator)和操作参数组成. 如 “execution(* greetTo(..))” 的切点表达式, **execution** 就是 标志符, 而圆括号里的 *****greetTo(..) 就是操作参数

execution

匹配 join point 的执行, 例如 “execution(* hello(..))” 表示匹配所有目标类中的 hello() 方法. 这个是最基本的 pointcut 标志符.

within

匹配特定包下的所有 join point, 例如 within(com.xys.*) 表示 com.xys 包中的所有连接点, 即包中的所有类的所有方法. 而within(com.xys.service.*Service) 表示在 com.xys.service 包中所有以 Service 结尾的类的所有的连接点.

this 与 target

this 的作用是匹配一个 bean, 这个 bean(Spring AOP proxy) 是一个给定类型的实例(instance of). 而 target 匹配的是一个目标对象(target object, 即需要织入 advice 的原始的类), 此对象是一个给定类型的实例(instance of).

bean

匹配 bean 名字为指定值的 bean 下的所有方法, 例如:

1
2
bean(*Service) // 匹配名字后缀为 Service 的 bean 下的所有方法
bean(myService) // 匹配名字为 myService 的 bean 下的所有方法
args

匹配参数满足要求的的方法.
例如:

1
2
3
4
5
6
7
8
@Pointcut("within(com.xys.demo2.*)")
public void pointcut2() {
}

@Before(value = "pointcut2() && args(name)")
public void doSomething(String name) {
logger.info("---page: {}---", name);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class NormalService {
private Logger logger = LoggerFactory.getLogger(getClass());

public void someMethod() {
logger.info("---NormalService: someMethod invoked---");
}

public String test(String name) {
logger.info("---NormalService: test invoked---");
return "服务一切正常";
}
}

当 NormalService.test 执行时, 则 advice doSomething 就会执行, test 方法的参数 name 就会传递到 doSomething 中.

常用例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 匹配只有一个参数 name 的方法
@Before(value = "aspectMethod() && args(name)")
public void doSomething(String name) {
}

// 匹配第一个参数为 name 的方法
@Before(value = "aspectMethod() && args(name, ..)")
public void doSomething(String name) {
}

// 匹配第二个参数为 name 的方法
Before(value = "aspectMethod() && args(*, name, ..)")
public void doSomething(String name) {
}
@annotation

匹配由指定注解所标注的方法, 例如:

1
2
3
@Pointcut("@annotation(com.xys.demo1.AuthChecker)")
public void pointcut() {
}

则匹配由注解 AuthChecker 所标注的方法.

常见的切点表达式

匹配方法签名
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 匹配指定包中的所有的方法
execution(* com.xys.service.*(..))

// 匹配当前包中的指定类的所有方法
execution(* UserService.*(..))

// 匹配指定包中的所有 public 方法
execution(public * com.xys.service.*(..))

// 匹配指定包中的所有 public 方法, 并且返回值是 int 类型的方法
execution(public int com.xys.service.*(..))

// 匹配指定包中的所有 public 方法, 并且第一个参数是 String, 返回值是 int 类型的方法
execution(public int com.xys.service.*(String name, ..))
匹配类型签名
1
2
3
4
5
6
7
8
9
10
11
12
// 匹配指定包中的所有的方法, 但不包括子包
within(com.xys.service.*)

// 匹配指定包中的所有的方法, 包括子包
within(com.xys.service..*)

// 匹配当前包中的指定类中的方法
within(UserService)


// 匹配一个接口的所有实现类中的实现的方法
within(UserDao+)
匹配 Bean 名字
1
2
// 匹配以指定名字结尾的 Bean 中的所有方法
bean(*Service)
切点表达式组合
1
2
3
4
5
// 匹配以 Service 或 ServiceImpl 结尾的 bean
bean(*Service || *ServiceImpl)

// 匹配名字以 Service 结尾, 并且在包 com.xys.service 中的 bean
bean(*Service) && within(com.xys.service.*)

声明 advice

advice 是和一个 pointcut 表达式关联在一起的, 并且会在匹配的 join point 的方法执行的前/后/周围 运行. pointcut 表达式可以是简单的一个 pointcut 名字的引用, 或者是完整的 pointcut 表达式.
下面我们以几个简单的 advice 为例子, 来看一下一个 advice 是如何声明的.

Before advice

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* @author xiongyongshun
* @version 1.0
* @created 16/9/9 13:13
*/
@Component
@Aspect
public class BeforeAspectTest {
// 定义一个 Pointcut, 使用 切点表达式函数 来描述对哪些 Join point 使用 advise.
@Pointcut("execution(* com.xys.service.UserService.*(..))")
public void dataAccessOperation() {
}
}
1
2
3
4
5
6
7
8
9
@Component
@Aspect
public class AdviseDefine {
// 定义 advise
@Before("com.xys.aspect.PointcutDefine.dataAccessOperation()")
public void doBeforeAccessCheck(JoinPoint joinPoint) {
System.out.println("*****Before advise, method: " + joinPoint.getSignature().toShortString() + " *****");
}
}

这里, @Before 引用了一个 pointcut, 即 “com.xys.aspect.PointcutDefine.dataAccessOperation()” 是一个 pointcut 的名字.
如果我们在 advice 在内置 pointcut, 则可以:

1
2
3
4
5
6
7
8
9
@Component
@Aspect
public class AdviseDefine {
// 将 pointcut 和 advice 同时定义
@Before("within(com.xys.service..*)")
public void doAccessCheck(JoinPoint joinPoint) {
System.out.println("*****doAccessCheck, Before advise, method: " + joinPoint.getSignature().toShortString() + " *****");
}
}

around advice

around advice 比较特别, 它可以在一个方法的之前之前和之后添加不同的操作, 并且甚至可以决定何时, 如何, 是否调用匹配到的方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
@Aspect
public class AdviseDefine {
// 定义 advise
@Around("com.xys.aspect.PointcutDefine.dataAccessOperation()")
public Object doAroundAccessCheck(ProceedingJoinPoint pjp) throws Throwable {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
// 开始
Object retVal = pjp.proceed();
stopWatch.stop();
// 结束
System.out.println("invoke method: " + pjp.getSignature().getName() + ", elapsed time: " + stopWatch.getTotalTimeMillis());
return retVal;
}
}

around advice 和前面的 before advice 差不多, 只是我们把注解 @Before 改为了 @Around 了.

参考资料

Hive 入门

简介

Hive 是一个构建在 Hadoop 之上的数据仓库,它可以将结构化的数据文件映射成表,并提供类 SQL 查询功能,用于查询的 SQL 语句会被转化为 MapReduce 作业,然后提交到 Hadoop 上运行。

特点

  1. 简单、容易上手 (提供了类似 sql 的查询语言 hql),使得精通 sql 但是不了解 Java 编程的人也能很好地进行大数据分析;
  2. 灵活性高,可以自定义用户函数 (UDF) 和存储格式;
  3. 为超大的数据集设计的计算和存储能力,集群扩展容易;
  4. 统一的元数据管理,可与 presto/impala/sparksql 等共享数据;
  5. 执行延迟高,不适合做数据的实时处理,但适合做海量数据的离线处理。

Hive 的体系架构

img

command-line shell & thrift/jdbc

可以用 command-line shell 和 thrift/jdbc 两种方式来操作数据:

  • command-line shell:通过 hive 命令行的的方式来操作数据;
  • thrift/jdbc:通过 thrift 协议按照标准的 JDBC 的方式操作数据。

Metastore

在 Hive 中,表名、表结构、字段名、字段类型、表的分隔符等统一被称为元数据。所有的元数据默认存储在 Hive 内置的 derby 数据库中,但由于 derby 只能有一个实例,也就是说不能有多个命令行客户端同时访问,所以在实际生产环境中,通常使用 MySQL 代替 derby。

Hive 进行的是统一的元数据管理,就是说你在 Hive 上创建了一张表,然后在 presto/impala/sparksql 中都是可以直接使用的,它们会从 Metastore 中获取统一的元数据信息,同样的你在 presto/impala/sparksql 中创建一张表,在 Hive 中也可以直接使用。

HQL 的执行流程

Hive 在执行一条 HQL 的时候,会经过以下步骤:

  1. 语法解析:Antlr 定义 SQL 的语法规则,完成 SQL 词法,语法解析,将 SQL 转化为抽象 语法树 AST Tree;
  2. 语义解析:遍历 AST Tree,抽象出查询的基本组成单元 QueryBlock;
  3. 生成逻辑执行计划:遍历 QueryBlock,翻译为执行操作树 OperatorTree;
  4. 优化逻辑执行计划:逻辑层优化器进行 OperatorTree 变换,合并不必要的 ReduceSinkOperator,减少 shuffle 数据量;
  5. 生成物理执行计划:遍历 OperatorTree,翻译为 MapReduce 任务;
  6. 优化物理执行计划:物理层优化器进行 MapReduce 任务的变换,生成最终的执行计划。

关于 Hive SQL 的详细执行流程可以参考美团技术团队的文章:Hive SQL 的编译过程

数据类型

基本数据类型

Hive 表中的列支持以下基本数据类型:

大类 类型
Integers(整型) TINYINT—1 字节的有符号整数
SMALLINT—2 字节的有符号整数
INT—4 字节的有符号整数
BIGINT—8 字节的有符号整数
Boolean(布尔型) BOOLEAN—TRUE/FALSE
Floating point numbers(浮点型) FLOAT— 单精度浮点型
DOUBLE—双精度浮点型
Fixed point numbers(定点数) DECIMAL—用户自定义精度定点数,比如 DECIMAL(7,2)
String types(字符串) STRING—指定字符集的字符序列
VARCHAR—具有最大长度限制的字符序列
CHAR—固定长度的字符序列
Date and time types(日期时间类型) TIMESTAMP — 时间戳
TIMESTAMP WITH LOCAL TIME ZONE — 时间戳,纳秒精度
DATE—日期类型
Binary types(二进制类型) BINARY—字节序列

TIMESTAMP 和 TIMESTAMP WITH LOCAL TIME ZONE 的区别如下:

  • TIMESTAMP WITH LOCAL TIME ZONE:用户提交时间给数据库时,会被转换成数据库所在的时区来保存。查询时则按照查询客户端的不同,转换为查询客户端所在时区的时间。
  • TIMESTAMP :提交什么时间就保存什么时间,查询时也不做任何转换。

隐式转换

Hive 中基本数据类型遵循以下的层次结构,按照这个层次结构,子类型到祖先类型允许隐式转换。例如 INT 类型的数据允许隐式转换为 BIGINT 类型。额外注意的是:按照类型层次结构允许将 STRING 类型隐式转换为 DOUBLE 类型。

img

复杂类型

类型 描述 示例
STRUCT 类似于对象,是字段的集合,字段的类型可以不同,可以使用 名称.字段名 方式进行访问 STRUCT (‘xiaoming’, 12 , ‘2018-12-12’)
MAP 键值对的集合,可以使用 名称[key] 的方式访问对应的值 map(‘a’, 1, ‘b’, 2)
ARRAY 数组是一组具有相同类型和名称的变量的集合,可以使用 名称[index] 访问对应的值 ARRAY(‘a’, ‘b’, ‘c’, ‘d’)

示例

如下给出一个基本数据类型和复杂数据类型的使用示例:

1
2
3
4
5
6
7
CREATE TABLE students(
name STRING, -- 姓名
age INT, -- 年龄
subject ARRAY<STRING>, --学科
score MAP<STRING,FLOAT>, --各个学科考试成绩
address STRUCT<houseNumber:int, street:STRING, city:STRING, province:STRING> --家庭居住地址
) ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

内容格式

当数据存储在文本文件中,必须按照一定格式区别行和列,如使用逗号作为分隔符的 CSV 文件 (Comma-Separated Values) 或者使用制表符作为分隔值的 TSV 文件 (Tab-Separated Values)。但此时也存在一个缺点,就是正常的文件内容中也可能出现逗号或者制表符。

所以 Hive 默认使用了几个平时很少出现的字符,这些字符一般不会作为内容出现在文件中。Hive 默认的行和列分隔符如下表所示。

分隔符 描述
\n 对于文本文件来说,每行是一条记录,所以可以使用换行符来分割记录
^A (Ctrl+A) 分割字段 (列),在 CREATE TABLE 语句中也可以使用八进制编码 \001 来表示
^B 用于分割 ARRAY 或者 STRUCT 中的元素,或者用于 MAP 中键值对之间的分割,
在 CREATE TABLE 语句中也可以使用八进制编码 \002 表示
^C 用于 MAP 中键和值之间的分割,在 CREATE TABLE 语句中也可以使用八进制编码 \003 表示

使用示例如下:

1
2
3
4
5
6
CREATE TABLE page_view(viewTime INT, userid BIGINT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
STORED AS SEQUENCEFILE;

存储格式

支持的存储格式

Hive 会在 HDFS 为每个数据库上创建一个目录,数据库中的表是该目录的子目录,表中的数据会以文件的形式存储在对应的表目录下。Hive 支持以下几种文件存储格式:

格式 说明
TextFile 存储为纯文本文件。 这是 Hive 默认的文件存储格式。这种存储方式数据不做压缩,磁盘开销大,数据解析开销大。
SequenceFile SequenceFile 是 Hadoop API 提供的一种二进制文件,它将数据以<key,value>的形式序列化到文件中。这种二进制文件内部使用 Hadoop 的标准的 Writable 接口实现序列化和反序列化。它与 Hadoop API 中的 MapFile 是互相兼容的。Hive 中的 SequenceFile 继承自 Hadoop API 的 SequenceFile,不过它的 key 为空,使用 value 存放实际的值,这样是为了避免 MR 在运行 map 阶段进行额外的排序操作。
RCFile RCFile 文件格式是 FaceBook 开源的一种 Hive 的文件存储格式,首先将表分为几个行组,对每个行组内的数据按列存储,每一列的数据都是分开存储。
ORC Files ORC 是在一定程度上扩展了 RCFile,是对 RCFile 的优化。
Avro Files Avro 是一个数据序列化系统,设计用于支持大批量数据交换的应用。它的主要特点有:支持二进制序列化方式,可以便捷,快速地处理大量数据;动态语言友好,Avro 提供的机制使动态语言可以方便地处理 Avro 数据。
Parquet Parquet 是基于 Dremel 的数据模型和算法实现的,面向分析型业务的列式存储格式。它通过按列进行高效压缩和特殊的编码技术,从而在降低存储空间的同时提高了 IO 效率。

以上压缩格式中 ORC 和 Parquet 的综合性能突出,使用较为广泛,推荐使用这两种格式。

指定存储格式

通常在创建表的时候使用 STORED AS 参数指定:

1
2
3
4
5
6
CREATE TABLE page_view(viewTime INT, userid BIGINT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
STORED AS SEQUENCEFILE;

各个存储文件类型指定方式如下:

  • STORED AS TEXTFILE
  • STORED AS SEQUENCEFILE
  • STORED AS ORC
  • STORED AS PARQUET
  • STORED AS AVRO
  • STORED AS RCFILE

内部表和外部表

内部表又叫做管理表 (Managed/Internal Table),创建表时不做任何指定,默认创建的就是内部表。想要创建外部表 (External Table),则需要使用 External 进行修饰。 内部表和外部表主要区别如下:

内部表 外部表
数据存储位置 内部表数据存储的位置由 hive.metastore.warehouse.dir 参数指定,默认情况下表的数据存储在 HDFS 的 /user/hive/warehouse/数据库名.db/表名/ 目录下 外部表数据的存储位置创建表时由 Location 参数指定;
导入数据 在导入数据到内部表,内部表将数据移动到自己的数据仓库目录下,数据的生命周期由 Hive 来进行管理 外部表不会将数据移动到自己的数据仓库目录下,只是在元数据中存储了数据的位置
删除表 删除元数据(metadata)和文件 只删除元数据(metadata)

参考资料

Hive 分区表和分桶表

分区表

概念

Hive 中的表对应为 HDFS 上的指定目录,在查询数据时候,默认会对全表进行扫描,这样时间和性能的消耗都非常大。

分区为 HDFS 上表目录的子目录,数据按照分区存储在子目录中。如果查询的 where 子句中包含分区条件,则直接从该分区去查找,而不是扫描整个表目录,合理的分区设计可以极大提高查询速度和性能。

这里说明一下分区表并非 Hive 独有的概念,实际上这个概念非常常见。比如在我们常用的 Oracle 数据库中,当表中的数据量不断增大,查询数据的速度就会下降,这时也可以对表进行分区。表进行分区后,逻辑上表仍然是一张完整的表,只是将表中的数据存放到多个表空间(物理文件上),这样查询数据时,就不必要每次都扫描整张表,从而提升查询性能。

使用场景

通常,在管理大规模数据集的时候都需要进行分区,比如将日志文件按天进行分区,从而保证数据细粒度的划分,使得查询性能得到提升。

创建分区表

在 Hive 中可以使用 PARTITIONED BY 子句创建分区表。表可以包含一个或多个分区列,程序会为分区列中的每个不同值组合创建单独的数据目录。下面的我们创建一张雇员表作为测试:

1
2
3
4
5
6
7
8
9
10
11
12
CREATE EXTERNAL TABLE emp_partition(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
PARTITIONED BY (deptno INT) -- 按照部门编号进行分区
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_partition';

加载数据到分区表

加载数据到分区表时候必须要指定数据所处的分区:

1
2
3
4
# 加载部门编号为20的数据到表中
LOAD DATA LOCAL INPATH "/usr/file/emp20.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=20)
# 加载部门编号为30的数据到表中
LOAD DATA LOCAL INPATH "/usr/file/emp30.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=30)

查看分区目录

这时候我们直接查看表目录,可以看到表目录下存在两个子目录,分别是 deptno=20deptno=30,这就是分区目录,分区目录下才是我们加载的数据文件。

1
# hadoop fs -ls  hdfs://hadoop001:8020/hive/emp_partition/

这时候当你的查询语句的 where 包含 deptno=20,则就去对应的分区目录下进行查找,而不用扫描全表。

img

分桶表

简介

分区提供了一个隔离数据和优化查询的可行方案,但是并非所有的数据集都可以形成合理的分区,分区的数量也不是越多越好,过多的分区条件可能会导致很多分区上没有数据。同时 Hive 会限制动态分区可以创建的最大分区数,用来避免过多分区文件对文件系统产生负担。鉴于以上原因,Hive 还提供了一种更加细粒度的数据拆分方案:分桶表 (bucket Table)。

分桶表会将指定列的值进行哈希散列,并对 bucket(桶数量)取余,然后存储到对应的 bucket(桶)中。

理解分桶表

单从概念上理解分桶表可能会比较晦涩,其实和分区一样,分桶这个概念同样不是 Hive 独有的,对于 Java 开发人员而言,这可能是一个每天都会用到的概念,因为 Hive 中的分桶概念和 Java 数据结构中的 HashMap 的分桶概念是一致的。

当调用 HashMap 的 put() 方法存储数据时,程序会先对 key 值调用 hashCode() 方法计算出 hashcode,然后对数组长度取模计算出 index,最后将数据存储在数组 index 位置的链表上,链表达到一定阈值后会转换为红黑树 (JDK1.8+)。下图为 HashMap 的数据结构图:

img

图片引用自:HashMap vs. Hashtable

创建分桶表

在 Hive 中,我们可以通过 CLUSTERED BY 指定分桶列,并通过 SORTED BY 指定桶中数据的排序参考列。下面为分桶表建表语句示例:

1
2
3
4
5
6
7
8
9
10
11
12
CREATE EXTERNAL TABLE emp_bucket(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
CLUSTERED BY(empno) SORTED BY(empno ASC) INTO 4 BUCKETS --按照员工编号散列到四个 bucket 中
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_bucket';

加载数据到分桶表

这里直接使用 Load 语句向分桶表加载数据,数据时可以加载成功的,但是数据并不会分桶。

这是由于分桶的实质是对指定字段做了 hash 散列然后存放到对应文件中,这意味着向分桶表中插入数据是必然要通过 MapReduce,且 Reducer 的数量必须等于分桶的数量。由于以上原因,分桶表的数据通常只能使用 CTAS(CREATE TABLE AS SELECT) 方式插入,因为 CTAS 操作会触发 MapReduce。加载数据步骤如下:

设置强制分桶

1
set hive.enforce.bucketing = true; --Hive 2.x 不需要这一步

在 Hive 0.x and 1.x 版本,必须使用设置 hive.enforce.bucketing = true,表示强制分桶,允许程序根据表结构自动选择正确数量的 Reducer 和 cluster by column 来进行分桶。

CTAS 导入数据

1
INSERT INTO TABLE emp_bucket SELECT *  FROM emp;  --这里的 emp 表就是一张普通的雇员表

可以从执行日志看到 CTAS 触发 MapReduce 操作,且 Reducer 数量和建表时候指定 bucket 数量一致:

img

查看分桶文件

bucket(桶) 本质上就是表目录下的具体文件:

img

分区表和分桶表结合使用

分区表和分桶表的本质都是将数据按照不同粒度进行拆分,从而使得在查询时候不必扫描全表,只需要扫描对应的分区或分桶,从而提升查询效率。两者可以结合起来使用,从而保证表数据在不同粒度上都能得到合理的拆分。下面是 Hive 官方给出的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE page_view_bucketed(
viewTime INT,
userid BIGINT,
page_url STRING,
referrer_url STRING,
ip STRING )
PARTITIONED BY(dt STRING)
CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
STORED AS SEQUENCEFILE;

此时导入数据时需要指定分区:

1
2
3
INSERT OVERWRITE page_view_bucketed
PARTITION (dt='2009-02-25')
SELECT * FROM page_view WHERE dt='2009-02-25';

参考资料

Hive 视图和索引

视图

简介

Hive 中的视图和 RDBMS 中视图的概念一致,都是一组数据的逻辑表示,本质上就是一条 SELECT 语句的结果集。视图是纯粹的逻辑对象,没有关联的存储 (Hive 3.0.0 引入的物化视图除外),当查询引用视图时,Hive 可以将视图的定义与查询结合起来,例如将查询中的过滤器推送到视图中。

创建视图

1
2
3
4
5
CREATE VIEW [IF NOT EXISTS] [db_name.]view_name   -- 视图名称
[(column_name [COMMENT column_comment], ...) ] --列名
[COMMENT view_comment] --视图注释
[TBLPROPERTIES (property_name = property_value, ...)] --额外信息
AS SELECT ...;

在 Hive 中可以使用 CREATE VIEW 创建视图,如果已存在具有相同名称的表或视图,则会抛出异常,建议使用 IF NOT EXISTS 预做判断。在使用视图时候需要注意以下事项:

  • 视图是只读的,不能用作 LOAD / INSERT / ALTER 的目标;

  • 在创建视图时候视图就已经固定,对基表的后续更改(如添加列)将不会反映在视图;

  • 删除基表并不会删除视图,需要手动删除视图;

  • 视图可能包含 ORDER BYLIMIT 子句。如果引用视图的查询语句也包含这类子句,其执行优先级低于视图对应字句。例如,视图 custom_view 指定 LIMIT 5,查询语句为 select * from custom_view LIMIT 10,此时结果最多返回 5 行。

  • 创建视图时,如果未提供列名,则将从 SELECT 语句中自动派生列名;

  • 创建视图时,如果 SELECT 语句中包含其他表达式,例如 x + y,则列名称将以_C0,_C1 等形式生成;

    1
    CREATE VIEW  IF NOT EXISTS custom_view AS SELECT empno, empno+deptno , 1+2 FROM emp;

img

查看视图

1
2
3
4
5
6
-- 查看所有视图: 没有单独查看视图列表的语句,只能使用 show tables
show tables;
-- 查看某个视图
desc view_name;
-- 查看某个视图详细信息
desc formatted view_name;

删除视图

1
DROP VIEW [IF EXISTS] [db_name.]view_name;

删除视图时,如果被删除的视图被其他视图所引用,这时候程序不会发出警告,但是引用该视图其他视图已经失效,需要进行重建或者删除。

修改视图

1
ALTER VIEW [db_name.]view_name AS select_statement;

被更改的视图必须存在,且视图不能具有分区,如果视图具有分区,则修改失败。

修改视图属性

语法:

1
2
3
4
ALTER VIEW [db_name.]view_name SET TBLPROPERTIES table_properties;

table_properties:
: (property_name = property_value, property_name = property_value, ...)

示例:

1
ALTER VIEW custom_view SET TBLPROPERTIES ('create'='heibaiying','date'='2019-05-05');

img

索引

简介

Hive 在 0.7.0 引入了索引的功能,索引的设计目标是提高表某些列的查询速度。如果没有索引,带有谓词的查询(如’WHERE table1.column = 10’)会加载整个表或分区并处理所有行。但是如果 column 存在索引,则只需要加载和处理文件的一部分。

索引原理

在指定列上建立索引,会产生一张索引表(表结构如下),里面的字段包括:索引列的值、该值对应的 HDFS 文件路径、该值在文件中的偏移量。在查询涉及到索引字段时,首先到索引表查找索引列值对应的 HDFS 文件路径及偏移量,这样就避免了全表扫描。

1
2
3
4
5
6
7
+--------------+----------------+----------+--+
| col_name | data_type | comment |
+--------------+----------------+----------+--+
| empno | int | 建立索引的列 |
| _bucketname | string | HDFS 文件路径 |
| _offsets | array<bigint> | 偏移量 |
+--------------+----------------+----------+--+

创建索引

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE INDEX index_name     --索引名称
ON TABLE base_table_name (col_name, ...) --建立索引的列
AS index_type --索引类型
[WITH DEFERRED REBUILD] --重建索引
[IDXPROPERTIES (property_name=property_value, ...)] --索引额外属性
[IN TABLE index_table_name] --索引表的名字
[
[ ROW FORMAT ...] STORED AS ...
| STORED BY ...
] --索引表行分隔符 、 存储格式
[LOCATION hdfs_path] --索引表存储位置
[TBLPROPERTIES (...)] --索引表表属性
[COMMENT "index comment"]; --索引注释

查看索引

1
2
--显示表上所有列的索引
SHOW FORMATTED INDEX ON table_name;

删除索引

删除索引会删除对应的索引表。

1
DROP INDEX [IF EXISTS] index_name ON table_name;

如果存在索引的表被删除了,其对应的索引和索引表都会被删除。如果被索引表的某个分区被删除了,那么分区对应的分区索引也会被删除。

重建索引

1
ALTER INDEX index_name ON table_name [PARTITION partition_spec] REBUILD;

重建索引。如果指定了 PARTITION,则仅重建该分区的索引。

索引案例

创建索引

在 emp 表上针对 empno 字段创建名为 emp_index,索引数据存储在 emp_index_table 索引表中

1
2
3
4
create index emp_index on table emp(empno) as
'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
with deferred rebuild
in table emp_index_table ;

此时索引表中是没有数据的,需要重建索引才会有索引的数据。

重建索引

1
alter index emp_index on emp rebuild;

Hive 会启动 MapReduce 作业去建立索引,建立好后查看索引表数据如下。三个表字段分别代表:索引列的值、该值对应的 HDFS 文件路径、该值在文件中的偏移量。

img

自动使用索引

默认情况下,虽然建立了索引,但是 Hive 在查询时候是不会自动去使用索引的,需要开启相关配置。开启配置后,涉及到索引列的查询就会使用索引功能去优化查询。

1
2
3
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
SET hive.optimize.index.filter=true;
SET hive.optimize.index.filter.compact.minsize=0;

查看索引

1
SHOW INDEX ON emp;

img

索引的缺陷

索引表最主要的一个缺陷在于:索引表无法自动 rebuild,这也就意味着如果表中有数据新增或删除,则必须手动 rebuild,重新执行 MapReduce 作业,生成索引表数据。

同时按照官方文档 的说明,Hive 会从 3.0 开始移除索引功能,主要基于以下两个原因:

  • 具有自动重写的物化视图 (Materialized View) 可以产生与索引相似的效果(Hive 2.3.0 增加了对物化视图的支持,在 3.0 之后正式引入)。
  • 使用列式存储文件格式(Parquet,ORC)进行存储时,这些格式支持选择性扫描,可以跳过不需要的文件或块。

ORC 内置的索引功能可以参阅这篇文章:Hive 性能优化之 ORC 索引–Row Group Index vs Bloom Filter Index

参考资料

Hive 数据查询详解

数据准备

为了演示查询操作,这里需要预先创建三张表,并加载测试数据。

数据文件 emp.txt 和 dept.txt 可以从本仓库的resources 目录下载。

员工表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
 -- 建表语句
CREATE TABLE emp(
empno INT, -- 员工表编号
ename STRING, -- 员工姓名
job STRING, -- 职位类型
mgr INT,
hiredate TIMESTAMP, --雇佣日期
sal DECIMAL(7,2), --工资
comm DECIMAL(7,2),
deptno INT) --部门编号
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

--加载数据
LOAD DATA LOCAL INPATH "/usr/file/emp.txt" OVERWRITE INTO TABLE emp;

部门表

1
2
3
4
5
6
7
8
9
10
-- 建表语句
CREATE TABLE dept(
deptno INT, --部门编号
dname STRING, --部门名称
loc STRING --部门所在的城市
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

--加载数据
LOAD DATA LOCAL INPATH "/usr/file/dept.txt" OVERWRITE INTO TABLE dept;

分区表

这里需要额外创建一张分区表,主要是为了演示分区查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CREATE EXTERNAL TABLE emp_ptn(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
PARTITIONED BY (deptno INT) -- 按照部门编号进行分区
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";


--加载数据
LOAD DATA LOCAL INPATH "/usr/file/emp.txt" OVERWRITE INTO TABLE emp_ptn PARTITION (deptno=20)
LOAD DATA LOCAL INPATH "/usr/file/emp.txt" OVERWRITE INTO TABLE emp_ptn PARTITION (deptno=30)
LOAD DATA LOCAL INPATH "/usr/file/emp.txt" OVERWRITE INTO TABLE emp_ptn PARTITION (deptno=40)
LOAD DATA LOCAL INPATH "/usr/file/emp.txt" OVERWRITE INTO TABLE emp_ptn PARTITION (deptno=50)

单表查询

SELECT

1
2
-- 查询表中全部数据
SELECT * FROM emp;

WHERE

1
2
-- 查询 10 号部门中员工编号大于 7782 的员工信息
SELECT * FROM emp WHERE empno > 7782 AND deptno = 10;

DISTINCT

Hive 支持使用 DISTINCT 关键字去重。

1
2
-- 查询所有工作类型
SELECT DISTINCT job FROM emp;

分区查询

分区查询 (Partition Based Queries),可以指定某个分区或者分区范围。

1
2
3
-- 查询分区表中部门编号在[20,40]之间的员工
SELECT emp_ptn.* FROM emp_ptn
WHERE emp_ptn.deptno >= 20 AND emp_ptn.deptno <= 40;

LIMIT

1
2
-- 查询薪资最高的 5 名员工
SELECT * FROM emp ORDER BY sal DESC LIMIT 5;

GROUP BY

Hive 支持使用 GROUP BY 进行分组聚合操作。

1
2
3
4
set hive.map.aggr=true;

-- 查询各个部门薪酬综合
SELECT deptno,SUM(sal) FROM emp GROUP BY deptno;

hive.map.aggr 控制程序如何进行聚合。默认值为 false。如果设置为 true,Hive 会在 map 阶段就执行一次聚合。这可以提高聚合效率,但需要消耗更多内存。

ORDER AND SORT

可以使用 ORDER BY 或者 Sort BY 对查询结果进行排序,排序字段可以是整型也可以是字符串:如果是整型,则按照大小排序;如果是字符串,则按照字典序排序。ORDER BY 和 SORT BY 的区别如下:

  • 使用 ORDER BY 时会有一个 Reducer 对全部查询结果进行排序,可以保证数据的全局有序性;
  • 使用 SORT BY 时只会在每个 Reducer 中进行排序,这可以保证每个 Reducer 的输出数据是有序的,但不能保证全局有序。

由于 ORDER BY 的时间可能很长,如果你设置了严格模式 (hive.mapred.mode = strict),则其后面必须再跟一个 limit 子句。

注 :hive.mapred.mode 默认值是 nonstrict ,也就是非严格模式。

1
2
-- 查询员工工资,结果按照部门升序,按照工资降序排列
SELECT empno, deptno, sal FROM emp ORDER BY deptno ASC, sal DESC;

HAVING

可以使用 HAVING 对分组数据进行过滤。

1
2
-- 查询工资总和大于 9000 的所有部门
SELECT deptno,SUM(sal) FROM emp GROUP BY deptno HAVING SUM(sal)>9000;

DISTRIBUTE BY

默认情况下,MapReduce 程序会对 Map 输出结果的 Key 值进行散列,并均匀分发到所有 Reducer 上。如果想要把具有相同 Key 值的数据分发到同一个 Reducer 进行处理,这就需要使用 DISTRIBUTE BY 字句。

需要注意的是,DISTRIBUTE BY 虽然能保证具有相同 Key 值的数据分发到同一个 Reducer,但是不能保证数据在 Reducer 上是有序的。情况如下:

把以下 5 个数据发送到两个 Reducer 上进行处理:

1
2
3
4
5
k1
k2
k4
k3
k1

Reducer1 得到如下乱序数据:

1
2
3
k1
k2
k1

Reducer2 得到数据如下:

1
2
k4
k3

如果想让 Reducer 上的数据时有序的,可以结合 SORT BY 使用 (示例如下),或者使用下面我们将要介绍的 CLUSTER BY。

1
2
-- 将数据按照部门分发到对应的 Reducer 上处理
SELECT empno, deptno, sal FROM emp DISTRIBUTE BY deptno SORT BY deptno ASC;

CLUSTER BY

如果 SORT BYDISTRIBUTE BY 指定的是相同字段,且 SORT BY 排序规则是 ASC,此时可以使用 CLUSTER BY 进行替换,同时 CLUSTER BY 可以保证数据在全局是有序的。

1
SELECT empno, deptno, sal FROM emp CLUSTER  BY deptno ;

多表联结查询

Hive 支持内连接,外连接,左外连接,右外连接,笛卡尔连接,这和传统数据库中的概念是一致的,可以参见下图。

需要特别强调:JOIN 语句的关联条件必须用 ON 指定,不能用 WHERE 指定,否则就会先做笛卡尔积,再过滤,这会导致你得不到预期的结果 (下面的演示会有说明)。

img

INNER JOIN

1
2
3
4
5
6
7
8
-- 查询员工编号为 7369 的员工的详细信息
SELECT e.*,d.* FROM
emp e JOIN dept d
ON e.deptno = d.deptno
WHERE empno=7369;

--如果是三表或者更多表连接,语法如下
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)

LEFT OUTER JOIN

LEFT OUTER JOIN 和 LEFT JOIN 是等价的。

1
2
3
4
-- 左连接
SELECT e.*,d.*
FROM emp e LEFT OUTER JOIN dept d
ON e.deptno = d.deptno;

RIGHT OUTER JOIN

1
2
3
4
--右连接
SELECT e.*,d.*
FROM emp e RIGHT OUTER JOIN dept d
ON e.deptno = d.deptno;

执行右连接后,由于 40 号部门下没有任何员工,所以此时员工信息为 NULL。这个查询可以很好的复述上面提到的——JOIN 语句的关联条件必须用 ON 指定,不能用 WHERE 指定。你可以把 ON 改成 WHERE,你会发现无论如何都查不出 40 号部门这条数据,因为笛卡尔运算不会有 (NULL, 40) 这种情况。

img

FULL OUTER JOIN

1
2
3
SELECT e.*,d.*
FROM emp e FULL OUTER JOIN dept d
ON e.deptno = d.deptno;

LEFT SEMI JOIN

LEFT SEMI JOIN (左半连接)是 IN/EXISTS 子查询的一种更高效的实现。

  • JOIN 子句中右边的表只能在 ON 子句中设置过滤条件;
  • 查询结果只包含左边表的数据,所以只能 SELECT 左表中的列。
1
2
3
4
5
6
7
8
-- 查询在纽约办公的所有员工信息
SELECT emp.*
FROM emp LEFT SEMI JOIN dept
ON emp.deptno = dept.deptno AND dept.loc="NEW YORK";

--上面的语句就等价于
SELECT emp.* FROM emp
WHERE emp.deptno IN (SELECT deptno FROM dept WHERE loc="NEW YORK");

JOIN

笛卡尔积连接,这个连接日常的开发中可能很少遇到,且性能消耗比较大,基于这个原因,如果在严格模式下 (hive.mapred.mode = strict),Hive 会阻止用户执行此操作。

1
SELECT * FROM emp JOIN dept;

JOIN 优化

STREAMTABLE

在多表进行联结的时候,如果每个 ON 字句都使用到共同的列(如下面的 b.key),此时 Hive 会进行优化,将多表 JOIN 在同一个 map / reduce 作业上进行。同时假定查询的最后一个表(如下面的 c 表)是最大的一个表,在对每行记录进行 JOIN 操作时,它将尝试将其他的表缓存起来,然后扫描最后那个表进行计算。因此用户需要保证查询的表的大小从左到右是依次增加的。

1
`SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key) JOIN c ON (c.key = b.key)`

然后,用户并非需要总是把最大的表放在查询语句的最后面,Hive 提供了 /*+ STREAMTABLE() */ 标志,用于标识最大的表,示例如下:

1
2
3
4
SELECT /*+ STREAMTABLE(d) */  e.*,d.*
FROM emp e JOIN dept d
ON e.deptno = d.deptno
WHERE job='CLERK';

MAPJOIN

如果所有表中只有一张表是小表,那么 Hive 把这张小表加载到内存中。这时候程序会在 map 阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在 map 就进行了 JOIN 操作,从而可以省略 reduce 过程,这样效率可以提升很多。Hive 中提供了 /*+ MAPJOIN() */ 来标记小表,示例如下:

1
2
3
4
SELECT /*+ MAPJOIN(d) */ e.*,d.*
FROM emp e JOIN dept d
ON e.deptno = d.deptno
WHERE job='CLERK';

SELECT 的其他用途

查看当前数据库:

1
SELECT current_database()

本地模式

在上面演示的语句中,大多数都会触发 MapReduce, 少部分不会触发,比如 select * from emp limit 5 就不会触发 MR,此时 Hive 只是简单的读取数据文件中的内容,然后格式化后进行输出。在需要执行 MapReduce 的查询中,你会发现执行时间可能会很长,这时候你可以选择开启本地模式。

1
2
--本地模式默认关闭,需要手动开启此功能
SET hive.exec.mode.local.auto=true;

启用后,Hive 将分析查询中每个 map-reduce 作业的大小,如果满足以下条件,则可以在本地运行它:

  • 作业的总输入大小低于:hive.exec.mode.local.auto.inputbytes.max(默认为 128MB);
  • map-tasks 的总数小于:hive.exec.mode.local.auto.tasks.max(默认为 4);
  • 所需的 reduce 任务总数为 1 或 0。

因为我们测试的数据集很小,所以你再次去执行上面涉及 MR 操作的查询,你会发现速度会有显著的提升。

参考资料