Dunwu Blog

大道至简,知易行难

微服务之注册和发现

微服务的服务定义

想要构建微服务,首先要解决的问题是,服务提供者如何发布一个服务,服务消费者如何引用这个服务。具体来说,就是这个服务的接口名是什么?调用这个服务需要传递哪些参数?接口的返回值是什么类型?以及一些其他接口描述信息。

最常见的服务定义的方式有以下几种:

  • REST API
    • REST API 方式主要被用作 HTTP 或者 HTTPS 协议的接口定义,即使在非微服务架构体系下,也被广泛采用。由于 HTTP 本身就是公开标准网络协议,所以几乎没有什么额外学习成本。
    • 代表 RPC 框架:Spring Cloud Eureka
  • XML 配置
    • XML 配置方式通过在服务提供者和服务消费者之间维持一份对等的 XML 配置文件,来保证服务消费者按照服务提供者的约定来进行服务调用。在这种方式下,如果服务提供者变更了接口定义,不仅需要更新服务提供者加载的接口描述文件 server.xml,还需要同时更新服务消费者加载的接口描述文件 client.xml。但这种方式对业务代码侵入性比较高,XML 配置有变更的时候,服务消费者和服务提供者都要更新,所以适合公司内部联系比较紧密的业务之间采用。
    • 代表 RPC 框架:阿里的 Dubbo(XML 配置示例:基于 Spring XML 开发微服务应用)、微博的 Motan
  • IDL 文件
    • IDL 就是接口描述语言(interface description language)的缩写,IDL 主要用于跨语言的服务之间的调用
    • 代表 RPC 框架:阿里的 Dubbo(XML 配置示例:IDL 定义跨语言服务),Facebook 的 Thrift,Google 的 gRPC
  • SDK 配置

具体采用哪种服务定义方式是根据实际情况决定的:

  • 如果只是企业内部之间的服务调用,并且都是 Java 语言的话,选择 SDK 配置、XML 配置方式是最简单的;
  • 如果企业内部存在多个跨语言服务,建议使用 IDL 文件方式进行描述服务;
  • 如果还存在对外开放服务调用的情形的话,使用 REST API 方式则更加通用。

服务注册和发现的工作原理

服务定义是服务提供者和服务消费者之间的约定,但是在微服务架构中,如何达成这个约定呢?这就依赖于服务注册和发现机制。

在微服务架构下,服务注册和发现机制中主要有三种角色:

  • 服务提供者(RPC Server / Provider)
  • 服务消费者(RPC Client / Consumer)
  • 服务注册中心(Registry)

一般来讲,注册中心的工作流程是:

  • 服务提供者在启动时,根据服务发布文件中配置的发布信息向注册中心注册自己的服务。
  • 服务消费者在启动时,根据消费者配置文件中配置的服务信息向注册中心订阅自己所需要的服务。
  • 注册中心返回服务提供者地址列表给服务消费者。
  • 当服务提供者发生变化,比如有节点新增或者销毁,注册中心将变更通知给服务消费者。

注册中心的核心功能

注册中心本质上是一个用于保存元数据的分布式存储。你如果明白了这一点,就会了解实现一个注册中心的所有要点都是围绕这个目标去构建的。

  • 元数据存储
    • 关键信息 - 注册中心主要用于存储服务信息,而服务信息一般包含三部分内容:
      • 服务名
      • 分组 - 服务一般会分成多个不同的分组,每个分组的目的不同。一般来说有下面几种分组方式:
        • 核心与非核心 - 从业务的核心程度来分。
        • 机房 - 从机房的维度来分。
        • 不同环境 - 从业务场景维度来区分。
      • 节点信息 - 节点信息又包括节点地址和其他节点信息。
    • 层次化的结构 - 注册中心存储服务信息一般采用层次化的结构。以 ZooKeeper 为例:
      • 在 ZooKeeper 中,数据按目录层级存储,每个目录叫作 znode,并且其有一个唯一的路径标识。
      • znode 可以包含数据和子 znode。
      • znode 中的数据可以有多个版本,比如某一个 znode 下存有多个数据版本,那么查询这个路径下的数据需带上版本信息。
  • 注册中心 API - 既然是分布式存储,势必要提供支持读写数据的接口,也就是 API,一般来说,需要支持以下功能:
    • 服务注册 - 服务提供者通过调用服务注册接口来完成服务注册。
    • 服务反注册 - 服务提供者通过调用服务反注册接口来完成服务注销。
    • 心跳汇报 - 服务提供者通过调用心跳汇报接口完成节点存活状态上报。
    • 服务订阅 - 服务消费者通过调用服务订阅接口完成服务订阅,获取可用的服务提供者节点列表。
    • 服务变更查询 - 服务消费者通过调用服务变更查询接口,获取最新的可用服务节点列表。
    • 服务查询 - 查询注册中心当前注册了哪些服务信息。
    • 服务修改 - 修改注册中心中某一服务的信息。
  • 服务健康检查 - 注册中心需要一种机制,来判断注册的服务是否还“活着”,即使用长连接或心跳探测方式来检查服务健康状态——这也是分布式应用常见的做法。以 ZooKeeper 注册中心为例:
    • ZooKeeper 客户端和服务端维持的是一个长连接。连接成功后,会生成一个全局唯一的 Session ID,客户端定期发送心跳消息,服务端收到后重置会话超时时间。如果超时,则认为连接结束。
    • 如果一个服务将 ZooKeeper 作为服务注册中心,一旦连接超时,ZooKeeper 会认为这个服务节点已经不可用,就会将其信息删除。
  • 服务状态订阅 - 当服务状态变更,如服务提供者某节点 IP 宕机,需要让服务消费者感知变化,可以基于订阅者模式实现:服务消费者可以监听服务提供者的节点信息,一旦服务提供者的节点信息变化,就可以获取到变更状态。
  • 集群部署 - 注册中心一般都是采用集群部署来保证高可用性,并通过分布式一致性协议来确保集群中不同节点之间的数据保持一致。根据 CAP 理论,三种特性无法同时达成,必须在可用性和一致性之间做取舍。于是,根据不同侧重点,注册中心可以分为 CP 和 AP 两个阵营:
    • CP 型注册中心 - 牺牲可用性来换取数据强一致性,最典型的例子就是 ZooKeeper,etcd,Consul 了。ZooKeeper 集群内只有一个 Leader,而且在 Leader 无法使用的时候通过 Paxos 算法选举出一个新的 Leader。这个 Leader 的目的就是保证写信息的时候只向这个 Leader 写入,Leader 会同步信息到 Followers,这个过程就可以保证数据的强一致性。但如果多个 ZooKeeper 之间网络出现问题,造成出现多个 Leader,发生脑裂的话,注册中心就不可用了。而 etcd 和 Consul 集群内都是通过 Raft 协议来保证强一致性,如果出现脑裂的话, 注册中心也不可用。
    • AP 型注册中心 - 牺牲一致性(只保证最终一致性)来换取可用性,最典型的例子就是 Eureka 了。对比下 Zookeeper,Eureka 不用选举一个 Leader,每个 Eureka 服务器单独保存服务注册地址,因此有可能出现数据信息不一致的情况。但是当网络出现问题的时候,每台服务器都可以完成独立的服务。

注册中心的扩展功能

多注册中心

对于服务消费者来说,要能够同时从多个注册中心订阅服务;

对于服务提供者来说,要能够同时向多个注册中心注册服务。

并行订阅服务

如果只支持串行订阅,如果服务消费者订阅的服务较多,并且某些服务节点的初始化连接过程中出现连接超时的情况,则后续所有的服务节点的初始化连接都需要等待它完成,这就会导致消费者启动非常慢。

可以每订阅一个服务就单独用一个线程来处理,这样的话即使遇到个别服务节点连接超时,其他服务节点的初始化连接也不受影响,最慢也就是这个服务节点的初始化连接耗费的时间,最终所有服务节点的初始化连接耗时控制在了 30 秒以内。

批量注销服务

在与注册中心的多次交互中,可能由于网络抖动、注册中心集群异常等原因,导致个别调用失败。对于注册中心来说,偶发的注册调用失败对服务调用基本没有影响,其结果顶多就是某一个服务少了一个可用的节点。但偶发的反注册调用失败会导致不可用的节点残留在注册中心中,变成“僵尸节点”。

需要定时去清理注册中心中的“僵尸节点”,如果支持批量注销服务,就可以一次调用就把该节点上提供的所有服务同时注销掉。

服务变更信息增量更新

为了减少服务消费者从注册中心中拉取的服务可用节点信息的数据量,这个时候可以通过增量更新的方式,注册中心只返回变化的那部分节点信息。尤其在只有少数节点信息变更时,此举可以大大减少服务消费者从注册中心拉取的数据量,从而最大程度避免产生网络风暴。

心跳开关保护机制

在网络频繁抖动的情况下,注册中心中可用的节点会不断变化,这时候服务消费者会频繁收到服务提供者节点变更的信息,于是就不断地请求注册中心来拉取最新的可用服务节点信息。当有成百上千个服务消费者,同时请求注册中心获取最新的服务提供者的节点信息时,可能会把注册中心的带宽给占满,尤其是注册中心是百兆网卡的情况下。

所以针对这种情况,需要一种保护机制,即使在网络频繁抖动的时候,服务消费者也不至于同时去请求注册中心获取最新的服务节点信息

我曾经就遇到过这种情况,一个可行的解决方案就是给注册中心设置一个开关,当开关打开时,即使网络频繁抖动,注册中心也不会通知所有的服务消费者有服务节点信息变更,比如只给 10% 的服务消费者返回变更,这样的话就能将注册中心的请求量减少到原来的 1/10。

当然打开这个开关也是有一定代价的,它会导致服务消费者感知最新的服务节点信息延迟,原先可能在 10s 内就能感知到服务提供者节点信息的变更,现在可能会延迟到几分钟,所以在网络正常的情况下,开关并不适合打开;可以作为一个紧急措施,在网络频繁抖动的时候,才打开这个开关。

服务节点摘除保护机制

服务提供者在进程启动时,会注册服务到注册中心,并每隔一段时间,汇报心跳给注册中心,以标识自己的存活状态。如果隔了一段固定时间后,服务提供者仍然没有汇报心跳给注册中心,注册中心就会认为该节点已经处于“dead”状态,于是从服务的可用节点信息中移除出去。

如果遇到网络问题,大批服务提供者节点汇报给注册中心的心跳信息都可能会传达失败,注册中心就会把它们都从可用节点列表中移除出去,造成剩下的可用节点难以承受所有的调用,引起“雪崩”。但是这种情况下,可能大部分服务提供者节点是可用的,仅仅因为网络原因无法汇报心跳给注册中心就被“无情”的摘除了。

这个时候就需要根据实际业务的情况,设定一个阈值比例,即使遇到刚才说的这种情况,注册中心也不能摘除超过这个阈值比例的节点

这个阈值比例可以根据实际业务的冗余度来确定,我通常会把这个比例设定在 20%,就是说注册中心不能摘除超过 20% 的节点。因为大部分情况下,节点的变化不会这么频繁,只有在网络抖动或者业务明确要下线大批量节点的情况下才有可能发生。而业务明确要下线大批量节点的情况是可以预知的,这种情况下可以关闭阈值保护;而正常情况下,应该打开阈值保护,以防止网络抖动时,大批量可用的服务节点被摘除。

黑/白名单机制

通常注册中心会有多套环境,区分开发、测试、线上等环境。如果弄错了,会出现意想不到的后果,为此需要引入黑、白名单保护机制。只有添加到注册中心白名单内的 RPC Server,才能够调用注册中心的注册接口,这样的话可以避免测试环境中的节点意外跑到线上环境中去。

注册中心的实现模式

应用内注册和发现

采用应用内注册与发现的方式,最典型的案例要属 Netflix 开源的 Eureka,官方架构图如下。

img

对着这张图,我来介绍下 Eureka 的架构,它主要由三个重要的组件组成:

  • Eureka Server:注册中心的服务端,实现了服务信息注册、存储以及查询等功能。
  • 服务端的 Eureka Client:集成在服务端的注册中心 SDK,服务提供者通过调用 SDK,实现服务注册、反注册等功能。
  • 客户端的 Eureka Client:集成在客户端的注册中心 SDK,服务消费者通过调用 SDK,实现服务订阅、服务更新等功能。

应用外注册和发现

img

通过这张架构图,可以看出来使用 Consul 实现应用外服务注册和发现主要依靠三个重要的组件:

  • Consul:注册中心的服务端,实现服务注册信息的存储,并提供注册和发现服务。
  • Registrator:一个开源的第三方服务管理器项目,它通过监听服务部署的 Docker 实例是否存活,来负责服务提供者的注册和销毁。
  • Consul Template:定时从注册中心服务端获取最新的服务提供者节点列表并刷新 LB 配置(比如 Nginx 的 upstream),这样服务消费者就通过访问 Nginx 就可以获取最新的服务提供者信息。

参考资料

微服务之服务调用

通过注册中心,服务消费者和服务提供者就可以感知彼此。但是,要实现交互还必须解决通信问题。

在单体应用中,一次服务调用发生在同一台机器上的同一个进程内部,因此也被称为本地方法调用。在微服务应用中,由于服务提供者和服务消费者运行在不同物理机器上的不同进程内,因此也被称为远程方法调用,简称 RPC(Remote Procedure Call)。

RPC 是如何像本地方法调用一样,完成一次请求处理的呢?我们不妨推导一二。首先,服务消费者和服务提供者通常位于网络上两个不同地址,要想交换信息,必须先建立网络连接;建立网络连接后,如果要想识别彼此的信息,必须遵循相同的通信协议;服务提供者和服务消费者,需要采用某种方式数据传输;为了减少传输数据量,还要对数据进行压缩,即序列化。

通过以上分析,我们由此可以得知 RPC 调用需要解决四个问题:

  • 如何进行网络传输?
  • 数据传输采用什么协议?
  • 数据该如何序列化和反序列化?

网络传输方式

网络传输方式即服务提供者和服务消费者之间的数据传输采用哪种方式。是同步还是异步?是在单连接上传输,还是多路复用。

常见的网络传输方式有三种:

同步阻塞方式(BIO) - 客户端每发一次请求,服务端就生成一个线程去处理。当客户端同时发起的请求很多时,服务端需要创建很多的线程去处理每一个请求,如果达到了系统最大的线程数瓶颈,新来的请求就没法处理了。

BIO 适用于连接数比较小的业务场景,这样的话不至于系统中没有可用线程去处理请求。这种方式写的程序也比较简单直观,易于理解。

img

同步非阻塞方式 (NIO) - 客户端每发一次请求,服务端并不是每次都创建一个新线程来处理,而是通过 I/O 多路复用技术进行处理。就是把多个 I/O 的阻塞复用到同一个 select 的阻塞上,从而使系统在单线程的情况下可以同时处理多个客户端请求。这种方式的优势是开销小,不用为每个请求创建一个线程,可以节省系统开销。

NIO 适用于连接数比较多并且请求消耗比较轻的业务场景,比如聊天服务器。这种方式相比 BIO,相对来说编程比较复杂。

BIO 与 NIO 最重要的区别是数据打包和传输的方式:BIO 以流的方式处理数据,而 NIO 以块的方式处理数据

  • 面向流的 BIO 一次处理一个字节数据:一个输入流产生一个字节数据,一个输出流消费一个字节数据。为流式数据创建过滤器非常容易,链接几个过滤器,以便每个过滤器只负责复杂处理机制的一部分。不利的一面是,面向流的 I/O 通常相当慢。
  • 面向块的 NIO 一次处理一个数据块,按块处理数据比按流处理数据要快得多。但是面向块的 NIO 缺少一些面向流的 BIO 所具有的优雅性和简单性。

img

异步非阻塞方式 (AIO) - 客户端只需要发起一个 I/O 操作然后立即返回,等 I/O 操作真正完成以后,客户端会得到 I/O 操作完成的通知,此时客户端只需要对数据进行处理就好了,不需要进行实际的 I/O 读写操作,因为真正的 I/O 读取或者写入操作已经由内核完成了。这种方式的优势是客户端无需等待,不存在阻塞等待问题。

AIO 适用于连接数比较多而且请求消耗比较重的业务场景,比如涉及 I/O 操作的相册服务器。这种方式相比另外两种,编程难度最大,程序也不易于理解。

在实际的 Java 网络通信开发中,可以使用一些成熟的网络通信框架,如:Netty 等。

通信协议

通信协议要解决的是:服务提供者和服务消费者之间以什么样的协议进行网络通信。说白了,是要解决客户端和服务端如何建立连接、管理连接以及服务端如何处理请求的问题。是采用四层 TCP、UDP 协议,还是采用七层 HTTP 协议,还是采用其他协议?例如:Dubbo 基于 TCP 通信;而 Spring Cloud 基于 HTTP REST 通信。TCP 通信方式,传输效率更高;但是,HTTP 通信方式,天然支持对外服务。

HTTP 通信是基于应用层 HTTP 协议的,而 HTTP 协议又是基于传输层 TCP 协议的。一次 HTTP 通信过程就是发起一次 HTTP 调用,而一次 HTTP 调用就会建立一个 TCP 连接,经历三次握手的过程来建立连接。完成请求后,再经历一次四次挥手的过程来断开连接。

TCP 通信的过程分为四个步骤:服务器监听客户端请求连接确认数据传输。当客户端和服务端建立网络连接后,就可以发起请求了。但网络不一定总是可靠的,经常会遇到网络闪断、连接超时、服务端宕机等各种异常,通常的处理手段有两种:链路存活检测断连重试

通过两种通信方式的对比,不难看出:HTTP 通信由于每次都要建立 TCP 连接,而建立连接又较为耗时,所以 HTTP 通信性能是不如 TCP 通信的

除了 HTTP 协议和 TCP 协议以外,一些 RPC 框架还扩展支持了一些私有协议,如 Dubbo 支持的 Dubbo 协议,这里不一一列举。

序列化方式

在微服务架构中,序列化和反序列化主要解决客户端和服务端采用哪种数据编/解码的问题。常见的序列化方式包括:XML、JSON;二进制类如:thriftprotobufhessian、JDK。

具体采用哪种序列化方式,主要取决于三个方面的因素:

  • 支持数据结构类型的丰富度 - 数据结构种类支持的自然是越多越好,这样使用者的开发体验比较友好。
  • 跨语言支持 - 序列化方式是否支持跨语言也是一个很重要的因素,否则就无法应对异构服务场景。
  • 性能 - 主要看两点,一个是序列化后的压缩比,一个是序列化的速度。以常用的 PB 序列化和 JSON 序列化协议为例来对比分析,PB 序列化的压缩比和速度都要比 JSON 序列化高很多,所以对性能和存储空间要求比较高的系统选用 PB 序列化更合适;而 JSON 序列化虽然性能要差一些,但可读性更好,更适合对外部提供服务。

更多序列化的内容可以参考:Java 序列化

微服务框架对比:

RPC REST
耦合性 强耦合 松散耦合
协议 Tcp Http、Http2
序列化 二进制(Thrift、Protobuf、Hessian、Avro、JDK 等) Xml、Json
性能
客户端 对编程语言有限制 跨语言支持更好(支持 Http 即可)
代表技术 Dubbo、Motan、Tars、gRpc、Thrift Spring Cloud

参考资料

HBase Java API 管理功能

初始化 Admin 实例

1
2
3
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();

管理命名空间

查看命名空间

1
2
3
4
TableName[] tableNames = admin.listTableNamesByNamespace("test");
for (TableName tableName : tableNames) {
System.out.println(tableName.getName());
}

创建命名空间

1
2
NamespaceDescriptor namespace = NamespaceDescriptor.create("test").build();
admin.createNamespace(namespace);

修改命名空间

1
2
3
4
NamespaceDescriptor namespace = NamespaceDescriptor.create("test")
.addConfiguration("Description", "Test Namespace")
.build();
admin.modifyNamespace(namespace);

删除命名空间

1
admin.deleteNamespace("test");

管理表

创建表

1
2
3
4
5
TableName tableName = TableName.valueOf("test:test");
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnDescriptor = new HColumnDescriptor(Bytes.toBytes("cf"));
tableDescriptor.addFamily(columnDescriptor);
admin.createTable(tableDescriptor);

删除表

1
admin.deleteTable(TableName.valueOf("test:test"));

修改表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 原始表
TableName tableName = TableName.valueOf("test:test");
HColumnDescriptor columnDescriptor = new HColumnDescriptor("cf1");
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName)
.addFamily(columnDescriptor)
.setValue("Description", "Original Table");
admin.createTable(tableDescriptor, Bytes.toBytes(1L), Bytes.toBytes(10000L), 50);

// 修改表
HTableDescriptor newTableDescriptor = admin.getTableDescriptor(tableName);
HColumnDescriptor newColumnDescriptor = new HColumnDescriptor("cf2");
newTableDescriptor.addFamily(newColumnDescriptor)
.setMaxFileSize(1024 * 1024 * 1024L)
.setValue("Description", "Modified Table");

// 修改表必须先禁用再想修改
admin.disableTable(tableName);
admin.modifyTable(tableName, newTableDescriptor);

禁用表

需要注意:HBase 表在删除前,必须先禁用。

1
admin.disableTable(TableName.valueOf("test:test"));

启用表

1
admin.enableTable(TableName.valueOf("test:test"));

查看表是否有效

1
2
boolean isOk = admin.isTableAvailable(tableName);
System.out.println("Table available: " + isOk);

参考资料

HBase Java API 其他高级特性

计数器

HBase 提供了一种高级功能:计数器(counter)。HBase 计数器可以用于实时统计,无需延时较高的批量处理操作。HBase 有一种机制可以将列当作计数器:即读取并修改(其实就是一种 CAS 模式),其保证了在一次操作中的原子性。否则,用户需要对一行数据加锁,然后读取数据,再对当前数据做加法,最后写回 HBase 并释放行锁,这一系列操作会引起大量的资源竞争问题。

早期的 HBase 版本会在每次计数器更新操作调用一次 RPC 请求,新版本中可以在一次 RPC 请求中完成多个计数器的更新操作,但是多个计数器必须在同一行。

计数器使用 Shell 命令行

计数器不需要初始化,创建一个新列时初始值为 0,第一次 incr 操作返回 1。

计数器使用 incr 命令,增量可以是正数也可以是负数,但是必须是长整数 Long:

1
incr '<table>','<row>','<column>',['<increment-value>']

计数器使用的例子:

1
2
3
4
5
6
7
8
9
10
11
hbase(main):001:0> create 'counters','daily','weekly','monthly'
0 row(s) in 1.2260 seconds

hbase(main):002:0> incr 'counters','20190301','daily:hites',1
COUNTER VALUE = 1

hbase(main):003:0> incr'counters','20190301','daily:hites',1
COUNTER VALUE = 2

hbase(main):004:0> get_counter 'counters','20190301','daily:hites'
COUNTER VALUE = 2

需要注意的是,增加的参数必须是长整型 Long,如果按照错误的格式更新了计数器(如字符串格式),下次调用 incr 会得到错误的结果:

1
2
3
4
5
hbase(main):005:0> put 'counters','20190301','daily:clicks','1'
0 row(s) in 1.3250 seconds

hbase(main):006:0> incr'counters','20190301','daily:clicks',1
COUNTER VALUE = 3530822107858468865

单计数器

操作一个计数器,类似 shell 命令 incr

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
HTable table  = new HTable(conf, "counters");

long cnt1 = table.incrementColumnValue(Bytes.toBytes("20190301"),
Bytes.toBytes("daily"),
Bytes.toBytes("hits"),
1L);

long cnt2 = table.incrementColumnValue(Bytes.toBytes("20190301"),
Bytes.toBytes("daily"),
Bytes.toBytes("hits"),
1L);

long current = table.incrementColumnValue(Bytes.toBytes("20190301"),
Bytes.toBytes("daily"),
Bytes.toBytes("hits"),
0);

多计数器

使用 Tableincrement() 方法可以操作一行的多个计数器,需要构建 Increment 实例,并且指定行键:

1
2
3
4
5
6
7
8
9
10
11
12
HTable table  = new HTable(conf, "counters");

Increment incr1 = new Increment(Bytes.toBytes("20190301"));
incr1.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("clicks"),1);
incr1.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);
incr1.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("clicks"), 2);
incr1.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("hits"), 2);

Result result = table.increment(incr1);
for(Cell cell : result.rawCells()) {
// ...
}

Increment 类还有一种构造器:

1
Increment(byte[] row, RowLock rowLock)

rowLock 参数可选,可以设置用户自定义锁,可以限制其他写程序操作此行,但是不保证读的操作性。

连接管理

连接管理简介

在 HBase Java API 中,Connection 类代表了一个集群连接,封装了与多台服务器(Matser/Region Server)的底层连接以及与 zookeeper 的连接。Connection 通过 ConnectionFactory 类实例化,而连接的生命周期则由调用者管理,调用者必须显示调用 close() 来释放连接。Connection 是线程安全的。创建 Connection 实例的开销很高,因此一个进程只需要实例化一个 Connection 即可。

Table 接口用于对指定的 HBase 表进行 CRUD 操作。一般,通过 Connection 获取 Table 实例,用完后,调用 close() 释放连接。

Admin 接口主要用于创建、删除、查看、启用/禁用 HBase 表,以及一些其他管理操作。一般,通过 Connection 获取 Admin 实例,用完后,调用 close() 释放连接。

TableAdmin 实例都是轻量级且并非线程安全的。建议每个线程只实例化一个 TableAdmin 实例。

连接池

问题:HBase 为什么没有提供 Connection 的连接池来获取更好的性能?是否需要自定义 Connection 连接池?

答:不需要。官方对于 Connection 的使用说明中,明确指出:对于高并发多线程访问的应用程序,一个进程中只需要预先创建一个 Connection

问题:HBase 老版本中 HTablePool 为什么废弃?是否需要自定义 Table 的连接池?

答:不需要。Table 和 Admin 的连接本质上是复用 Connection,实例化是一个较为轻量级的操作,因此,并不需要缓存或池化。实际上,HBase Java API 官方就是这么建议的。

下面是管理 HBase 连接的一个正确编程模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 所有进程共用一个 connection 对象
connection = ConnectionFactory.createConnection(config);

// 每个线程使用单独的 table 对象
Table table = connection.getTable(TableName.valueOf("tableName"));
try {
...
} finally {
table.close();
}

Admin admin = connection.getAdmin();
try {
...
} finally {
admin.close();
}

参考资料

HBase 数据模型

HBase 是一个面向 的数据库管理系统,这里更为确切的而说,HBase 是一个面向 列族 的数据库管理系统。表 schema 仅定义列族,表具有多个列族,每个列族可以包含任意数量的列,列由多个单元格(cell)组成,单元格可以存储多个版本的数据,多个版本数据以时间戳进行区分。

HBase 逻辑存储结构

  • **Table**:Table 由 Row 和 Column 组成。
  • **Row**:Row 是列族(Column Family)的集合。
  • Row KeyRow Key 是用来检索记录的主键
    • Row Key 是未解释的字节数组,所以理论上,任何数据都可以通过序列化表示成字符串或二进制,从而存为 HBase 的键值。
    • 表中的行,是按照 Row Key 的字典序进行排序。这里需要注意以下两点:
      • 因为字典序对 Int 排序的结果是 1,10,100,11,12,13,14,15,16,17,18,19,2,20,21,…,9,91,92,93,94,95,96,97,98,99。如果你使用整型的字符串作为行键,那么为了保持整型的自然序,行键必须用 0 作左填充。
      • 行的一次读写操作是原子性的 (不论一次读写多少列)。
    • 所有对表的访问都要通过 Row Key,有以下三种方式:
      • 通过指定的 Row Key 进行访问;
      • 通过 Row Key 的 range 进行访问,即访问指定范围内的行;
      • 进行全表扫描。
  • **Column Family**:即列族。HBase 表中的每个列,都归属于某个列族。列族是表的 Schema 的一部分,所以列族需要在创建表时进行定义。
    • 一个表的列族必须作为表模式定义的一部分预先给出,但是新的列族成员可以随后按需加入。
    • 同一个列族的所有成员具有相同的前缀,例如 info:formatinfo:geo 都属于 info 这个列族。
  • **Column Qualifier**:列限定符。可以理解为是具体的列名,例如 info:formatinfo:geo 都属于 info 这个列族,它们的列限定符分别是 formatgeo。列族和列限定符之间始终以冒号分隔。需要注意的是列限定符不是表 Schema 的一部分,你可以在插入数据的过程中动态创建列。
  • **Column**:HBase 中的列由列族和列限定符组成,由 :(冒号) 进行分隔,即一个完整的列名应该表述为 列族名 :列限定符
  • **Cell**:Cell 是行,列族和列限定符的组合,并包含值和时间戳。HBase 中通过 row keycolumn 确定的为一个存储单元称为 Cell,你可以等价理解为关系型数据库中由指定行和指定列确定的一个单元格,但不同的是 HBase 中的一个单元格是由多个版本的数据组成的,每个版本的数据用时间戳进行区分。
    - Cell 由行和列的坐标交叉决定,是有版本的。默认情况下,版本号是自动分配的,为 HBase 插入 Cell 时的时间戳。Cell 的内容是未解释的字节数组。

  • **Timestamp**:Cell 的版本通过时间戳来索引,时间戳的类型是 64 位整型,时间戳可以由 HBase 在数据写入时自动赋值,也可以由客户显式指定。每个 Cell 中,不同版本的数据按照时间戳倒序排列,即最新的数据排在最前面。

img

HBase 物理存储结构

HBase 自动将表水平划分成区域(Region)。每个 Region 由表中 Row 的子集构成。每个 Region 由它所属的表的起始范围来表示(包含的第一行和最后一行)。初始时,一个表只有一个 Region,随着 Region 膨胀,当超过一定阈值时,会在某行的边界上分裂成两个大小基本相同的新 Region。在第一次划分之前,所有加载的数据都放在原始 Region 所在的那台服务器上。随着表变大,Region 个数也会逐渐增加。Region 是在 HBase 集群上分布数据的最小单位。

HBase 数据模型示例

下图为 HBase 中一张表的:

  • RowKey 为行的唯一标识,所有行按照 RowKey 的字典序进行排序;
  • 该表具有两个列族,分别是 personal 和 office;
  • 其中列族 personal 拥有 name、city、phone 三个列,列族 office 拥有 tel、addres 两个列。

img

图片引用自 : HBase 是列式存储数据库吗 https://www.iteblog.com/archives/2498.html

HBase 表特性

Hbase 的表具有以下特点:

  • 容量大:一个表可以有数十亿行,上百万列;
  • 面向列:数据是按照列存储,每一列都单独存放,数据即索引,在查询时可以只访问指定列的数据,有效地降低了系统的 I/O 负担;
  • 稀疏性:空 (null) 列并不占用存储空间,表可以设计的非常稀疏 ;
  • 数据多版本:每个单元中的数据可以有多个版本,按照时间戳排序,新的数据在最上面;
  • 存储类型:所有数据的底层存储格式都是字节数组 (byte[])。

参考资料

HBase Java API 高级特性之协处理器

简述

在使用 HBase 时,如果你的数据量达到了数十亿行或数百万列,此时能否在查询中返回大量数据将受制于网络的带宽,即便网络状况允许,但是客户端的计算处理也未必能够满足要求。在这种情况下,协处理器(Coprocessors)应运而生。它允许你将业务计算代码放入在 RegionServer 的协处理器中,将处理好的数据再返回给客户端,这可以极大地降低需要传输的数据量,从而获得性能上的提升。同时协处理器也允许用户扩展实现 HBase 目前所不具备的功能,如权限校验、二级索引、完整性约束等。

参考资料

HBase Java API 高级特性之过滤器

HBase 中两种主要的数据读取方法是 get()scan(),它们都支持直接访问数据和通过指定起止 row key 访问数据。此外,可以指定列族、列、时间戳和版本号来进行条件查询。它们的缺点是不支持细粒度的筛选功能。为了弥补这种不足,GetScan 支持通过过滤器(Filter)对 row key、列或列值进行过滤。

HBase 提供了一些内置过滤器,也允许用户通过继承 Filter 类来自定义过滤器。所有的过滤器都在服务端生效,称为 谓词下推。这样可以保证被过滤掉的数据不会被传到客户端。

图片来自 HBase 权威指南

HBase 过滤器层次结构的最底层是 Filter 接口和 FilterBase 抽象类。大部分过滤器都直接继承自 FilterBase

比较过滤器

所有比较过滤器均继承自 CompareFilterCompareFilterFilterBase 多了一个 compare() 方法,它需要传入参数定义比较操作的过程:比较运算符和比较器。

创建一个比较过滤器需要两个参数,分别是比较运算符比较器实例

1
2
3
4
public CompareFilter(final CompareOp compareOp,final ByteArrayComparable comparator) {
this.compareOp = compareOp;
this.comparator = comparator;
}

比较运算符

  • LESS (<)
  • LESS_OR_EQUAL (<=)
  • EQUAL (=)
  • NOT_EQUAL (!=)
  • GREATER_OR_EQUAL (>=)
  • GREATER (>)
  • NO_OP (排除所有符合条件的值)

比较运算符均定义在枚举类 CompareOperator

1
2
3
4
5
6
7
8
9
10
@InterfaceAudience.Public
public enum CompareOperator {
LESS,
LESS_OR_EQUAL,
EQUAL,
NOT_EQUAL,
GREATER_OR_EQUAL,
GREATER,
NO_OP,
}

注意:在 1.x 版本的 HBase 中,比较运算符定义在 CompareFilter.CompareOp 枚举类中,但在 2.0 之后这个类就被标识为 @deprecated ,并会在 3.0 移除。所以 2.0 之后版本的 HBase 需要使用 CompareOperator 这个枚举类。

比较器

所有比较器均继承自 ByteArrayComparable 抽象类,常用的有以下几种:

  • BinaryComparator : 使用 Bytes.compareTo(byte [],byte []) 按字典序比较指定的字节数组。
  • BinaryPrefixComparator : 按字典序与指定的字节数组进行比较,但只比较到这个字节数组的长度。
  • RegexStringComparator : 使用给定的正则表达式与指定的字节数组进行比较。仅支持 EQUALNOT_EQUAL 操作。
  • SubStringComparator : 测试给定的子字符串是否出现在指定的字节数组中,比较不区分大小写。仅支持 EQUALNOT_EQUAL 操作。
  • NullComparator :判断给定的值是否为空。
  • BitComparator :按位进行比较。

BinaryPrefixComparatorBinaryComparator 的区别不是很好理解,这里举例说明一下:

在进行 EQUAL 的比较时,如果比较器传入的是 abcd 的字节数组,但是待比较数据是 abcdefgh

  • 如果使用的是 BinaryPrefixComparator 比较器,则比较以 abcd 字节数组的长度为准,即 efgh 不会参与比较,这时候认为 abcdabcdefgh 是满足 EQUAL 条件的;
  • 如果使用的是 BinaryComparator 比较器,则认为其是不相等的。

比较过滤器种类

比较过滤器共有五个(Hbase 1.x 版本和 2.x 版本相同):

  • RowFilter :基于行键来过滤数据;
  • FamilyFilterr :基于列族来过滤数据;
  • QualifierFilterr :基于列限定符(列名)来过滤数据;
  • ValueFilterr :基于单元格 (cell) 的值来过滤数据;
  • DependentColumnFilter :指定一个参考列来过滤其他列的过滤器,过滤的原则是基于参考列的时间戳来进行筛选 。

前四种过滤器的使用方法相同,均只要传递比较运算符和运算器实例即可构建,然后通过 setFilter 方法传递给 scan

1
2
3
Filter filter  = new RowFilter(CompareOperator.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("xxx")));
scan.setFilter(filter);

DependentColumnFilter 的使用稍微复杂一点,这里单独做下说明。

DependentColumnFilter

可以把 DependentColumnFilter 理解为一个 valueFilter 和一个时间戳过滤器的组合DependentColumnFilter 有三个带参构造器,这里选择一个参数最全的进行说明:

1
2
3
DependentColumnFilter(final byte [] family, final byte[] qualifier,
final boolean dropDependentColumn, final CompareOperator op,
final ByteArrayComparable valueComparator)
  • family :列族
  • qualifier :列限定符(列名)
  • dropDependentColumn :决定参考列是否被包含在返回结果内,为 true 时表示参考列被返回,为 false 时表示被丢弃
  • op :比较运算符
  • valueComparator :比较器

这里举例进行说明:

1
2
3
4
5
6
DependentColumnFilter dependentColumnFilter = new DependentColumnFilter(
Bytes.toBytes("student"),
Bytes.toBytes("name"),
false,
CompareOperator.EQUAL,
new BinaryPrefixComparator(Bytes.toBytes("xiaolan")));
  • 首先会去查找 student:name 中值以 xiaolan 开头的所有数据获得 参考数据集,这一步等同于 valueFilter 过滤器;
  • 其次再用参考数据集中所有数据的时间戳去检索其他列,获得时间戳相同的其他列的数据作为 结果数据集,这一步等同于时间戳过滤器;
  • 最后如果 dropDependentColumn 为 true,则返回 参考数据集+结果数据集,若为 false,则抛弃参考数据集,只返回 结果数据集

专用过滤器

专用过滤器通常直接继承自 FilterBase,用于更特定的场景。

单列列值过滤器 (SingleColumnValueFilter)

基于某列(参考列)的值决定某行数据是否被过滤。其实例有以下方法:

  • setFilterIfMissing(boolean filterIfMissing) :默认值为 false,即如果该行数据不包含参考列,其依然被包含在最后的结果中;设置为 true 时,则不包含;
  • setLatestVersionOnly(boolean latestVersionOnly) :默认为 true,即只检索参考列的最新版本数据;设置为 false,则检索所有版本数据。
1
2
3
4
5
6
7
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
"student".getBytes(),
"name".getBytes(),
CompareOperator.EQUAL,
new SubstringComparator("xiaolan"));
singleColumnValueFilter.setFilterIfMissing(true);
scan.setFilter(singleColumnValueFilter);

单列列值排除器 (SingleColumnValueExcludeFilter)

SingleColumnValueExcludeFilter 继承自上面的 SingleColumnValueFilter,过滤行为与其相反。

行键前缀过滤器 (PrefixFilter)

基于 RowKey 值决定某行数据是否被过滤。

1
2
PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("xxx"));
scan.setFilter(prefixFilter);

列名前缀过滤器 (ColumnPrefixFilter)

基于列限定符(列名)决定某行数据是否被过滤。

1
2
ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(Bytes.toBytes("xxx"));
scan.setFilter(columnPrefixFilter);

分页过滤器 (PageFilter)

可以使用这个过滤器实现对结果按行进行分页,创建 PageFilter 实例的时候需要传入每页的行数。

1
2
3
4
public PageFilter(final long pageSize) {
Preconditions.checkArgument(pageSize >= 0, "must be positive %s", pageSize);
this.pageSize = pageSize;
}

下面的代码体现了客户端实现分页查询的主要逻辑,这里对其进行一下解释说明:

客户端进行分页查询,需要传递 startRow(起始 RowKey),知道起始 startRow 后,就可以返回对应的 pageSize 行数据。这里唯一的问题就是,对于第一次查询,显然 startRow 就是表格的第一行数据,但是之后第二次、第三次查询我们并不知道 startRow,只能知道上一次查询的最后一条数据的 RowKey(简单称之为 lastRow)。

我们不能将 lastRow 作为新一次查询的 startRow 传入,因为 scan 的查询区间是[startRow,endRow) ,即前开后闭区间,这样 startRow 在新的查询也会被返回,这条数据就重复了。

同时在不使用第三方数据库存储 RowKey 的情况下,我们是无法通过知道 lastRow 的下一个 RowKey 的,因为 RowKey 的设计可能是连续的也有可能是不连续的。

由于 Hbase 的 RowKey 是按照字典序进行排序的。这种情况下,就可以在 lastRow 后面加上 0 ,作为 startRow 传入,因为按照字典序的规则,某个值加上 0 后的新值,在字典序上一定是这个值的下一个值,对于 HBase 来说下一个 RowKey 在字典序上一定也是等于或者大于这个新值的。

所以最后传入 lastRow+0,如果等于这个值的 RowKey 存在就从这个值开始 scan,否则从字典序的下一个 RowKey 开始 scan。

25 个字母以及数字字符,字典排序如下:

1
'0' < '1' < '2' < ... < '9' < 'a' < 'b' < ... < 'z'

分页查询主要实现逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
byte[] POSTFIX = new byte[] { 0x00 };
Filter filter = new PageFilter(15);

int totalRows = 0;
byte[] lastRow = null;
while (true) {
Scan scan = new Scan();
scan.setFilter(filter);
if (lastRow != null) {
// 如果不是首行 则 lastRow + 0
byte[] startRow = Bytes.add(lastRow, POSTFIX);
System.out.println("start row: " +
Bytes.toStringBinary(startRow));
scan.withStartRow(startRow);
}
ResultScanner scanner = table.getScanner(scan);
int localRows = 0;
Result result;
while ((result = scanner.next()) != null) {
System.out.println(localRows++ + ": " + result);
totalRows++;
lastRow = result.getRow();
}
scanner.close();
//最后一页,查询结束
if (localRows == 0) break;
}
System.out.println("total rows: " + totalRows);

需要注意的是在多台 Regin Services 上执行分页过滤的时候,由于并行执行的过滤器不能共享它们的状态和边界,所以有可能每个过滤器都会在完成扫描前获取了 PageCount 行的结果,这种情况下会返回比分页条数更多的数据,分页过滤器就有失效的可能。

时间戳过滤器 (TimestampsFilter)

1
2
3
4
List<Long> list = new ArrayList<>();
list.add(1554975573000L);
TimestampsFilter timestampsFilter = new TimestampsFilter(list);
scan.setFilter(timestampsFilter);

首次行键过滤器 (FirstKeyOnlyFilter)

FirstKeyOnlyFilter 只扫描每行的第一列,扫描完第一列后就结束对当前行的扫描,并跳转到下一行。相比于全表扫描,其性能更好,通常用于行数统计的场景,因为如果某一行存在,则行中必然至少有一列。

1
2
FirstKeyOnlyFilter firstKeyOnlyFilter = new FirstKeyOnlyFilter();
scan.set(firstKeyOnlyFilter);

包装过滤器

包装过滤器就是通过包装其他过滤器以实现某些拓展的功能。

SkipFilter 过滤器

SkipFilter 包装一个过滤器,当被包装的过滤器遇到一个需要过滤的 KeyValue 实例时,则拓展过滤整行数据。下面是一个使用示例:

1
2
3
4
5
// 定义 ValueFilter 过滤器
Filter filter1 = new ValueFilter(CompareOperator.NOT_EQUAL,
new BinaryComparator(Bytes.toBytes("xxx")));
// 使用 SkipFilter 进行包装
Filter filter2 = new SkipFilter(filter1);

WhileMatchFilter 过滤器

WhileMatchFilter 包装一个过滤器,当被包装的过滤器遇到一个需要过滤的 KeyValue 实例时,WhileMatchFilter 则结束本次扫描,返回已经扫描到的结果。下面是其使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Filter filter1 = new RowFilter(CompareOperator.NOT_EQUAL,
new BinaryComparator(Bytes.toBytes("rowKey4")));

Scan scan = new Scan();
scan.setFilter(filter1);
ResultScanner scanner1 = table.getScanner(scan);
for (Result result : scanner1) {
for (Cell cell : result.listCells()) {
System.out.println(cell);
}
}
scanner1.close();

System.out.println("--------------------");

// 使用 WhileMatchFilter 进行包装
Filter filter2 = new WhileMatchFilter(filter1);

scan.setFilter(filter2);
ResultScanner scanner2 = table.getScanner(scan);
for (Result result : scanner1) {
for (Cell cell : result.listCells()) {
System.out.println(cell);
}
}
scanner2.close();
rowKey0/student:name/1555035006994/Put/vlen=8/seqid=0
rowKey1/student:name/1555035007019/Put/vlen=8/seqid=0
rowKey2/student:name/1555035007025/Put/vlen=8/seqid=0
rowKey3/student:name/1555035007037/Put/vlen=8/seqid=0
rowKey5/student:name/1555035007051/Put/vlen=8/seqid=0
rowKey6/student:name/1555035007057/Put/vlen=8/seqid=0
rowKey7/student:name/1555035007062/Put/vlen=8/seqid=0
rowKey8/student:name/1555035007068/Put/vlen=8/seqid=0
rowKey9/student:name/1555035007073/Put/vlen=8/seqid=0
--------------------
rowKey0/student:name/1555035006994/Put/vlen=8/seqid=0
rowKey1/student:name/1555035007019/Put/vlen=8/seqid=0
rowKey2/student:name/1555035007025/Put/vlen=8/seqid=0
rowKey3/student:name/1555035007037/Put/vlen=8/seqid=0

可以看到被包装后,只返回了 rowKey4 之前的数据。

FilterList

以上都是讲解单个过滤器的作用,当需要多个过滤器共同作用于一次查询的时候,就需要使用 FilterListFilterList 支持通过构造器或者 addFilter 方法传入多个过滤器。

1
2
3
4
5
6
7
8
// 构造器传入
public FilterList(final Operator operator, final List<Filter> filters)
public FilterList(final List<Filter> filters)
public FilterList(final Filter... filters)

// 方法传入
public void addFilter(List<Filter> filters)
public void addFilter(Filter filter)

多个过滤器组合的结果由 operator 参数定义 ,其可选参数定义在 Operator 枚举类中。只有 MUST_PASS_ALLMUST_PASS_ONE 两个可选的值:

  • MUST_PASS_ALL :相当于 AND,必须所有的过滤器都通过才认为通过;
  • MUST_PASS_ONE :相当于 OR,只有要一个过滤器通过则认为通过。
1
2
3
4
5
6
7
@InterfaceAudience.Public
public enum Operator {
/** !AND */
MUST_PASS_ALL,
/** !OR */
MUST_PASS_ONE
}

使用示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
List<Filter> filters = new ArrayList<Filter>();

Filter filter1 = new RowFilter(CompareOperator.GREATER_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("XXX")));
filters.add(filter1);

Filter filter2 = new RowFilter(CompareOperator.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("YYY")));
filters.add(filter2);

Filter filter3 = new QualifierFilter(CompareOperator.EQUAL,
new RegexStringComparator("ZZZ"));
filters.add(filter3);

FilterList filterList = new FilterList(filters);

Scan scan = new Scan();
scan.setFilter(filterList);

参考资料

HBase Schema 设计

HBase Schema 设计要素

  • 这个表应该有多少 Column Family
  • Column Family 使用什么数据
  • 每个 Column Family 有有多少列
  • 列名是什么,尽管列名不必在建表时定义,但读写数据是要知道的
  • 单元应该存放什么数据
  • 每个单元存储多少时间版本
  • 行健(rowKey)结构是什么,应该包含什么信息

Row Key 设计

Row Key 的作用

在 HBase 中,所有对表的访问都要通过 Row Key,有三种访问方式:

  • 使用 get 命令,查询指定的 Row Key,即精确查找。
  • 使用 scan 命令,根据 Row Key 进行范围查找。
  • 全表扫描,即直接扫描表中所有行记录。

此外,在 HBase 中,表中的行,是按照 Row Key 的字典序进行排序的。

由此,可见,Row Key 的良好设计对于 HBase CRUD 的性能至关重要。

Row Key 的设计原则

长度原则

RowKey 是一个二进制码流,可以是任意字符串,最大长度为 64kb,实际应用中一般为 10-100byte,以 byte[]形式保存,一般设计成定长。建议越短越好,不要超过 16 个字节,原因如下:

  1. 数据的持久化文件 HFile 中时按照 Key-Value 存储的,如果 RowKey 过长,例如超过 100byte,那么 1000w 行的记录,仅 RowKey 就需占用近 1GB 的空间。这样会极大影响 HFile 的存储效率。
  2. MemStore 会缓存部分数据到内存中,若 RowKey 字段过长,内存的有效利用率就会降低,就不能缓存更多的数据,从而降低检索效率。
  3. 目前操作系统都是 64 位系统,内存 8 字节对齐,控制在 16 字节,8 字节的整数倍利用了操作系统的最佳特性。

唯一原则

必须在设计上保证 RowKey 的唯一性。由于在 HBase 中数据存储是 Key-Value 形式,若向 HBase 中同一张表插入相同 RowKey 的数据,则原先存在的数据会被新的数据覆盖。

排序原则

HBase 的 RowKey 是按照 ASCII 有序排序的,因此我们在设计 RowKey 的时候要充分利用这点。

散列原则

设计的 RowKey 应均匀的分布在各个 HBase 节点上。

热点问题

Region 是在 HBase 集群上分布数据的最小单位。每个 Region 由它所属的表的起始范围来表示(即起始 Row Key 和结束 Row Key)。

如果,Row Key 使用单调递增的整数或时间戳,就会产生一个问题:因为 Hbase 的 Row Key 是就近存储的,这会导致一段时间内大部分读写集中在某一个 Region 或少数 Region 上(根据二八原则,最近产生的数据,往往是读写频率最高的数据),即所谓 热点问题

反转(Reversing)

第一种咱们要分析的方法是反转,顾名思义它就是把固定长度或者数字格式的 RowKey 进行反转,反转分为一般数据反转和时间戳反转,其中以时间戳反转较常见。

  • 反转固定格式的数值 - 以手机号为例,手机号的前缀变化比较少(如 152、185 等),但后半部分变化很多。如果将它反转过来,可以有效地避免热点。不过其缺点就是失去了有序性。
  • 反转时间 - 如果数据访问以查找最近的数据为主,可以将时间戳存储为反向时间戳(例如: timestamp = Long.MAX_VALUE – timestamp),这样有利于扫描最近的数据。

加盐(Salting)

这里的“加盐”与密码学中的“加盐”不是一回事。它是指在 RowKey 的前面增加一些前缀,加盐的前缀种类越多,RowKey 就被打得越散。

需要注意的是分配的随机前缀的种类数量应该和我们想把数据分散到的那些 region 的数量一致。只有这样,加盐之后的 rowkey 才会根据随机生成的前缀分散到各个 region 中,避免了热点现象。

哈希(Hashing)

其实哈希和加盐的适用场景类似,但我们前缀不可以是随机的,因为必须要让客户端能够完整地重构 RowKey。所以一般会拿原 RowKey 或其一部分计算 Hash 值,然后再对 Hash 值做运算作为前缀。

HBase Schema 设计规则

Column Family 设计

HBase 不能很好处理 2 ~ 3 个以上的 Column Family,所以 HBase 表应尽可能减少 Column Family 数。如果可以,请只使用一个列族,只有需要经常执行 Column 范围查询时,才引入多列族。也就是说,尽量避免同时查询多个列族。

  • Column Family 数量多,会影响数据刷新。HBase 的数据刷新是在每个 Region 的基础上完成的。因此,如果一个 Column Family 携带大量导致刷新的数据,那么相邻的列族即使携带的数据量很小,也会被刷新。当存在许多 Column Family 时,刷新交互会导致一堆不必要的 IO。 此外,在表/区域级别的压缩操作也会在每个存储中发生。
  • Column Family 数量多,会影响查找效率。如:Column Family A 有 100 万行,Column Family B 有 10 亿行,那么 Column Family A 的数据可能会分布在很多很多区域(和 RegionServers)。 这会降低 Column Family A 的批量扫描效率。

Column Family 名尽量简短,最好是一个字符。Column Family 会在列限定符中被频繁使用,缩短长度有利于节省空间并提升效率。

Row 设计

HBase 中的 Row 按 Row Key 的字典顺序排序

  • 不要将 Row Key 设计为单调递增的,例如:递增的整数或时间戳

    • 问题:因为 Hbase 的 Row Key 是就近存储的,这样会导致一段时间内大部分写入集中在某一个 Region 上,即所谓热点问题。

    • 解决方法一、加盐:这里的不是指密码学的加盐,而是指将随机分配的前缀添加到行键的开头。这么做是为了避免相同前缀的 Row Key 数据被存储在相邻位置,从而导致热点问题。示例如下:

      • foo0001
        foo0002
        foo0003
        foo0004
        
        改为
        
        a-foo0003
        b-foo0001
        c-foo0003
        c-foo0004
        d-foo0002
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29

        - 解决方法二、Hash:Row Key 的前缀使用 Hash

        - **尽量减少行和列的长度**

        - **反向时间戳**:反向时间戳可以极大地帮助快速找到值的最新版本。

        - **行健不能改变**:唯一可以改变的方式是先删除后插入。

        - **Row Key 和 Column Family**:Row Key 从属于 Column Family,因此,相同的 Row Key 可以存在每一个 Column Family 中而不会出现冲突。

        ### Version 设计

        最大、最小 Row 版本号:表示 HBase 会保留的版本号数的上下限。均可以通过 HColumnDescriptor 对每个列族进行配置

        Row 版本号过大,会大大增加 StoreFile 的大小;所以,最大 Row 版本号应按需设置。HBase 会在主要压缩时,删除多余的版本。

        ### TTL 设计

        Column Family 会设置一个以秒为单位的 TTL,一旦达到 TTL 时,HBase 会自动删除行记录。

        仅包含过期行的存储文件在次要压缩时被删除。 将 hbase.store.delete.expired.storefile 设置为 false 会禁用此功能。将最小版本数设置为 0 以外的值也会禁用此功能。

        在较新版本的 HBase 上,还支持在 Cell 上设置 TTL,与 Column Family 的 TTL 不同的是,单位是毫秒。

        ### Column Family 属性配置

        - HFile 数据块,默认是 64KB,数据库的大小影响数据块索引的大小。数据块大的话一次加载进内存的数据越多,扫描查询效果越好。但是数据块小的话,随机查询性能更好

create ‘mytable’,{NAME => ‘cf1’, BLOCKSIZE => ‘65536’}
复制代码

1
2
3

- 数据块缓存,数据块缓存默认是打开的,如果一些比较少访问的数据可以选择关闭缓存

create ‘mytable’,{NAME => ‘cf1’, BLOCKCACHE => ‘FALSE’}
复制代码

1
2
3

- 数据压缩,压缩会提高磁盘利用率,但是会增加 CPU 的负载,看情况进行控制

create ‘mytable’,{NAME => ‘cf1’, COMPRESSION => ‘SNAPPY’}
复制代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

Hbase 表设计是和需求相关的,但是遵守表设计的一些硬性指标对性能的提升还是很有帮助的,这里整理了一些设计时用到的要点。

## Schema 设计案例

### 案例:日志数据和时序数据

假设采集以下数据

- Hostname
- Timestamp
- Log event
- Value/message

应该如何设计 Row Key?

1TimestampRow Key 头部

如果 Row Key 设计为 `[timestamp][hostname][log-event]` 形式,会出现热点问题。

如果针对时间的扫描很重要,可以采用时间戳分桶策略,即

bucket = timestamp % bucketNum

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

计算出桶号后,将 Row Key 指定为:`[bucket][timestamp][hostname][log-event]`

如上所述,要为特定时间范围选择数据,需要对每个桶执行扫描。 例如,100 个桶将在键空间中提供广泛的分布,但需要 100 次扫描才能获取单个时间戳的数据,因此需要权衡取舍。

2)Hostname 在 Row Key 头部

如果主机样本量很大,将 Row Key 设计为 `[hostname][log-event][timestamp]`,这样有利于扫描 hostname。

3Timestamp 还是反向 Timestamp

如果数据访问以查找最近的数据为主,可以将时间戳存储为反向时间戳(例如: `timestamp = Long.MAX_VALUE – timestamp`),这样有利于扫描最近的数据。

4Row Key 是可变长度还是固定长度

拼接 Row Key 的关键字长度不一定是固定的,例如 hostname 有可能很长,也有可能很短。如果想要统一长度,可以参考以下做法:

- 将关键字 Hash 编码:使用某种 Hash 算法计算关键字,并取固定长度的值(例如:8 位或 16 位)。
- 使用数字替代关键字:例如:使用事件类型 Code 替换事件类型;hostname 如果是 IP,可以转换为 long
- 截取关键字:截取后的关键字需要有足够的辨识度,长度大小根据具体情况权衡。

5)时间分片

[hostname][log-event][timestamp1]
[hostname][log-event][timestamp2]
[hostname][log-event][timestamp3]

1
2
3

上面的例子中,每个详细事件都有单独的行键,可以重写如下,即每个时间段存储一次:

[hostname][log-event][timerange]


## 参考资料

- [HBase 官方文档之 HBase and Schema Design](https://hbase.apache.org/book.html#schema)

HBase Java API 基础特性

HBase Client API

HBase Java API 示例

引入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.4</version>
</dependency>

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
public class HBaseUtils {

private static Connection connection;

static {
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.property.clientPort", "2181");
// 如果是集群 则主机名用逗号分隔
configuration.set("hbase.zookeeper.quorum", "hadoop001");
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 创建 HBase 表
*
* @param tableName 表名
* @param columnFamilies 列族的数组
*/
public static boolean createTable(String tableName, List<String> columnFamilies) {
try {
HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
if (admin.tableExists(TableName.valueOf(tableName))) {
return false;
}
TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
columnFamilies.forEach(columnFamily -> {
ColumnFamilyDescriptorBuilder cfDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
cfDescriptorBuilder.setMaxVersions(1);
ColumnFamilyDescriptor familyDescriptor = cfDescriptorBuilder.build();
tableDescriptor.setColumnFamily(familyDescriptor);
});
admin.createTable(tableDescriptor.build());
} catch (IOException e) {
e.printStackTrace();
}
return true;
}


/**
* 删除 hBase 表
*
* @param tableName 表名
*/
public static boolean deleteTable(String tableName) {
try {
HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
// 删除表前需要先禁用表
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
} catch (Exception e) {
e.printStackTrace();
}
return true;
}

/**
* 插入数据
*
* @param tableName 表名
* @param rowKey 唯一标识
* @param columnFamilyName 列族名
* @param qualifier 列标识
* @param value 数据
*/
public static boolean putRow(String tableName, String rowKey, String columnFamilyName, String qualifier,
String value) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
table.put(put);
table.close();
} catch (IOException e) {
e.printStackTrace();
}
return true;
}


/**
* 插入数据
*
* @param tableName 表名
* @param rowKey 唯一标识
* @param columnFamilyName 列族名
* @param pairList 列标识和值的集合
*/
public static boolean putRow(String tableName, String rowKey, String columnFamilyName, List<Pair<String, String>> pairList) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
pairList.forEach(pair -> put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(pair.getKey()), Bytes.toBytes(pair.getValue())));
table.put(put);
table.close();
} catch (IOException e) {
e.printStackTrace();
}
return true;
}


/**
* 根据 rowKey 获取指定行的数据
*
* @param tableName 表名
* @param rowKey 唯一标识
*/
public static Result getRow(String tableName, String rowKey) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
return table.get(get);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}


/**
* 获取指定行指定列 (cell) 的最新版本的数据
*
* @param tableName 表名
* @param rowKey 唯一标识
* @param columnFamily 列族
* @param qualifier 列标识
*/
public static String getCell(String tableName, String rowKey, String columnFamily, String qualifier) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
if (!get.isCheckExistenceOnly()) {
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
Result result = table.get(get);
byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
return Bytes.toString(resultValue);
} else {
return null;
}

} catch (IOException e) {
e.printStackTrace();
}
return null;
}


/**
* 检索全表
*
* @param tableName 表名
*/
public static ResultScanner getScanner(String tableName) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
return table.getScanner(scan);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}


/**
* 检索表中指定数据
*
* @param tableName 表名
* @param filterList 过滤器
*/

public static ResultScanner getScanner(String tableName, FilterList filterList) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scan.setFilter(filterList);
return table.getScanner(scan);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

/**
* 检索表中指定数据
*
* @param tableName 表名
* @param startRowKey 起始 RowKey
* @param endRowKey 终止 RowKey
* @param filterList 过滤器
*/

public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey,
FilterList filterList) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scan.withStartRow(Bytes.toBytes(startRowKey));
scan.withStopRow(Bytes.toBytes(endRowKey));
scan.setFilter(filterList);
return table.getScanner(scan);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

/**
* 删除指定行记录
*
* @param tableName 表名
* @param rowKey 唯一标识
*/
public static boolean deleteRow(String tableName, String rowKey) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
table.delete(delete);
} catch (IOException e) {
e.printStackTrace();
}
return true;
}


/**
* 删除指定行指定列
*
* @param tableName 表名
* @param rowKey 唯一标识
* @param familyName 列族
* @param qualifier 列标识
*/
public static boolean deleteColumn(String tableName, String rowKey, String familyName,
String qualifier) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier));
table.delete(delete);
table.close();
} catch (IOException e) {
e.printStackTrace();
}
return true;
}

}

数据库连接

在上面的代码中,在类加载时就初始化了 Connection 连接,并且之后的方法都是复用这个 Connection,这时我们可能会考虑是否可以使用自定义连接池来获取更好的性能表现?实际上这是没有必要的。

首先官方对于 Connection 的使用说明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Connection Pooling For applications which require high-end multithreaded
access (e.g., web-servers or application servers that may serve many
application threads in a single JVM), you can pre-create a Connection,
as shown in the following example:

对于高并发多线程访问的应用程序(例如,在单个 JVM 中存在的为多个线程服务的 Web 服务器或应用程序服务器),
您只需要预先创建一个 Connection。例子如下:

// Create a connection to the cluster.
Configuration conf = HBaseConfiguration.create();
try (Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf(tablename))) {
// use table as needed, the table returned is lightweight
}

之所以能这样使用,这是因为 Connection 并不是一个简单的 socket 连接,接口文档 中对 Connection 的表述是:

1
2
3
4
5
6
7
A cluster connection encapsulating lower level individual connections to actual servers and a
connection to zookeeper. Connections are instantiated through the ConnectionFactory class.
The lifecycle of the connection is managed by the caller, who has to close() the connection
to release the resources.

Connection 是一个集群连接,封装了与多台服务器(Matser/Region Server)的底层连接以及与 zookeeper 的连接。
连接通过 ConnectionFactory 类实例化。连接的生命周期由调用者管理,调用者必须使用 close() 关闭连接以释放资源。

之所以封装这些连接,是因为 HBase 客户端需要连接三个不同的服务角色:

  • Zookeeper :主要用于获取 meta 表的位置信息,Master 的信息;
  • HBase Master :主要用于执行 HBaseAdmin 接口的一些操作,例如建表等;
  • HBase RegionServer :用于读、写数据。

Connection 对象和实际的 Socket 连接之间的对应关系如下图:

在 HBase 客户端代码中,真正对应 Socket 连接的是 RpcConnection 对象。HBase 使用 PoolMap 这种数据结构来存储客户端到 HBase 服务器之间的连接。PoolMap 的内部有一个 ConcurrentHashMap 实例,其 key 是 ConnectionId(封装了服务器地址和用户 ticket),value 是一个 RpcConnection 对象的资源池。当 HBase 需要连接一个服务器时,首先会根据 ConnectionId 找到对应的连接池,然后从连接池中取出一个连接对象。

1
2
3
4
5
6
7
8
9
10
11
12
@InterfaceAudience.Private
public class PoolMap<K, V> implements Map<K, V> {
private PoolType poolType;

private int poolMaxSize;

private Map<K, Pool<V>> pools = new ConcurrentHashMap<>();

public PoolMap(PoolType poolType) {
this.poolType = poolType;
}
.....

HBase 中提供了三种资源池的实现,分别是 ReusableRoundRobinThreadLocal。具体实现可以通 hbase.client.ipc.pool.type 配置项指定,默认为 Reusable。连接池的大小也可以通过 hbase.client.ipc.pool.size 配置项指定,默认为 1,即每个 Server 1 个连接。也可以通过修改配置实现:

1
2
3
config.set("hbase.client.ipc.pool.type",...);
config.set("hbase.client.ipc.pool.size",...);
connection = ConnectionFactory.createConnection(config);

由此可以看出 HBase 中 Connection 类已经实现了对连接的管理功能,所以我们不必在 Connection 上在做额外的管理。

另外,Connection 是线程安全的,但 Table 和 Admin 却不是线程安全的,因此正确的做法是一个进程共用一个 Connection 对象,而在不同的线程中使用单独的 Table 和 Admin 对象。Table 和 Admin 的获取操作 getTable()getAdmin() 都是轻量级,所以不必担心性能的消耗,同时建议在使用完成后显示的调用 close() 方法来关闭它们。

概述

HBase 的主要客户端操作是由 org.apache.hadoop.hbase.client.HTable 提供的。创建 HTable 实例非常耗时,所以,建议每个线程只创建一次 HTable 实例。

HBase 所有修改数据的操作都保证了行级别的原子性。要么读到最新的修改,要么等待系统允许写入改行修改

用户要尽量使用批处理(batch)更新来减少单独操作同一行数据的次数

写操作中设计的列的数目并不会影响该行数据的原子性,行原子性会同时保护到所有列

创建 HTable 实例(指的是在 java 中新建该类),每个实例都要扫描.META. 表,以检查该表是否存在,推荐用户只创建一次 HTable 实例,而且是每个线程创建一个

如果用户需要多个 HTable 实例,建议使用 HTablePool 类(类似连接池)

CRUD 操作

put

Table 接口提供了两个 put 方法

1
2
3
4
// 写入单行 put
void put(Put put) throws IOException;
// 批量写入 put
void put(List<Put> puts) throws IOException;

Put 类提供了多种构造器方法用来初始化实例。

Put 类还提供了一系列有用的方法:

多个 add 方法:用于添加指定的列数据。

has 方法:用于检查是否存在特定的单元格,而不需要遍历整个集合

getFamilyMap 方法:可以遍历 Put 实例中每一个可用的 KeyValue 实例

getRow 方法:用于获取 rowkey
Put.heapSize() 可以计算当前 Put 实例所需的堆大小,既包含其中的数据,也包含内部数据结构所需的空间

KeyValue 类

特定单元格的数据以及坐标,坐标包括行键、列族名、列限定符以及时间戳
KeyValue(byte[] row, int roffset, int rlength, byte[] family, int foffoset, int flength, byte[] qualifier, int qoffset, int qlength, long timestamp, Type type, byte[] value, int voffset, int vlength)
每一个字节数组都有一个 offset 参数和一个 length 参数,允许用户提交一个已经存在的字节数组进行字节级别操作。
行目前来说指的是行键,即 Put 构造器里的 row 参数。

客户端的写缓冲区

每一个 put 操作实际上都是一个 RPC 操作,它将客户端数据传送到服务器然后返回。

HBase 的 API 配备了一个客户端的写缓冲区,缓冲区负责收集 put 操作,然后调用 RPC 操作一次性将 put 送往服务器。

1
2
void setAutoFlush(boolean autoFlush)
boolean isAutoFlush()

默认情况下,客户端缓冲区是禁用的。可以通过 table.setAutoFlush(false) 来激活缓冲区。

Put 列表

批量提交 put 列表:

1
void put(List<Put> puts) throws IOException

注意:批量提交可能会有部分修改失败。

原子性操作 compare-and-set

checkAndPut 方法提供了 CAS 机制来保证 put 操作的原子性。

get

1
Result get(Get get) throws IOException
1
2
3
4
Get(byte[] row)
Get(byte[] row, RowLock rowLock)
Get addColumn(byte[] family, byte[] qualifier)
Get addFamily(byte[] family)

Result 类

当用户使用 get() 方法获取数据,HBase 返回的结果包含所有匹配的单元格数据,这些数据被封装在一个 Result 实例中返回给用户。

Result 类提供的方法如下:

1
2
3
4
5
6
7
byte[] getValue(byte[] family, byte[] qualifier)
byte[] value()
byte[] getRow()
int size()
boolean isEmpty()
KeyValue[] raw()
List<KeyValue> list()

delete

1
void delete(Delete delete) throws IOException
1
2
Delte(byte[] row)
Delete(byte[] row, long timestamp, RowLock rowLock)
1
2
3
4
Delete deleteFamily(byte[] family)
Delete deleteFamily(byte[] family, long timestamp)
Delete deleteColumns(byte[] family, byte[] qualifier)
Delete deleteColumn(byte[] family, byte[] qualifier) // 只删除最新版本

批处理操作

Row 是 Put、Get、Delete 的父类。

1
2
void batch(List<Row> actions, Object[] results) throws IOException, InterruptedException
Object batch(List<Row> actions) throws IOException, InterruptedException

行锁

region 服务器提供了行锁特性,这个特性保证了只有一个客户端能获取一行数据相应的锁,同时对该行进行修改。

如果不显示指定锁,服务器会隐式加锁。

扫描

scan,类似数据库系统中的 cursor,利用了 HBase 提供的底层顺序存储的数据结构。

调用 HTable 的 getScanner 就可以返回扫描器

1
2
ResultScanner getScanner(Scan scan) throws IOException
ResultScanner getScanner(byte[] family) throws IOException

Scan 类构造器可以有 startRow,区间一般为 [startRow, stopRow)

1
2
Scan(byte[] startRow, Filter filter)
Scan(byte[] startRow)

ResultScanner

以行为单位进行返回

1
2
3
Result next() throws IOException
Result[] next(int nbRows) throws IOException
void close()

缓存与批量处理

每一个 next()调用都会为每行数据生成一个单独的 RPC 请求

可以设置扫描器缓存

1
2
void setScannerCaching(itn scannerCaching)
int getScannerCaching()

缓存是面向行一级操作,批量是面向列一级操作

1
2
void setBatch(int batch)
int getBatch

RPC 请求的次数=(行数*每行列数)/Min(每行的列数,批量大小)/扫描器缓存

各种特性

Bytes 类提供了一系列将原生 Java 类型和字节数组互转的方法。

参考资料

《大规模数据处理实战》笔记

00 丨开篇词丨从这里开始,带你走上硅谷一线系统架构师之路

01 丨为什么 MapReduce 会被硅谷一线公司淘汰?

高昂的维护成本

时间性能“达不到”用户的期待

02 | MapReduce 后谁主沉浮:怎样设计下一代数据处理技术?

03 | 大规模数据处理初体验:怎样实现大型电商热销榜?

不同量级 TOP K 算法的解决方案不同:

小规模:Hash 即可

大规模:由于单机的处理量不足以处理全量数据,势必分而治之:分片统计,然后聚合(即先 map 后 reduce)

04 丨分布式系统(上):学会用服务等级协议 SLA 来评估你的系统

SLA(Service-Level Agreement),也就是服务等级协议,指的是系统服务提供者(Provider)对客户(Customer)的一个服务承诺。

可用性:大厂一般要求可用性至少达到四个 9(即 99.99%)

准确性:准确率= 正确的有效请求数 / 有效的总请求数

系统容量:通常通过 QPS (Queries Per Second)来衡量

延迟:请求和响应的时间间隔

05 丨分布式系统(下):架构师不得不知的三大指标

  • 可扩展性(Scalability)
    • 水平扩展(Horizontal Scaling)
    • 垂直扩展(Vertical Scaling)
  • 一致性(Consistency)
    • 强一致性(Strong Consistency):系统中的某个数据被成功更新后,后续任何对该数据的读取操作都将得到更新
      后的值。所以在任意时刻,同一系统所有节点中的数据是一样的。
    • 弱一致性(Weak Consistency):系统中的某个数据被更新后,后续对该数据的读取操作可能得到更新后的值,
      也可能是更改前的值。但经过“不一致时间窗口”这段时间后,后续对该数据的读取都是更新后的值。
    • 最终一致性(Eventual Consistency):是弱一致性的特殊形式。存储系统保证,在没有新的更新的条件下,最终所有的访问都是最后更新的值。
  • 持久性(Data Durability):意味着数据一旦被成功存储就可以一直继续使用,即使系统中的节点下线、宕机或数据损坏也是如。

06 | 如何区分批处理还是流处理?

  • 无边界数据(Unbounded Data):是一种不断增长,可以说是无限的数据集。
  • 有边界数据(Bounded Data):是一种有限的数据集。
  • 事件时间(Event Time):指的是一个数据实际产生的时间点。
  • 处理时间(Precessing Time):指的是处理数据的系统架构实际接收到这个数据的时间点。
  • 批处理:绝大部分情况下,批处理的输入数据都是有边界数据,同样的,输出结果也一样是有边界数据。所以在批处理中,我们所关心的更多会是数据的事件时间。
    • 应用场景:
      • 日志分析:日志系统是在一定时间段(日,周或年)内收集的,而日志的数据处理分析是在不同的时间内执行,以得出有关系统的一些关键性能指标。
      • 计费应用程序:计费应用程序会计算出一段时间内一项服务的使用程度,并生成计费信息,例如银行在每个月末生成的信用卡还款单。
      • 数据仓库:数据仓库的主要目标是根据收集好的数据事件时间,将数据信息合并为静态快照 (static snapshot),并将它们聚合为每周、每月、每季度的报告等。
  • 流处理:流处理的输入数据基本上都是无边界数据。
    • 应用场景
      • 实时监控:捕获和分析各种来源发布的数据,如传感器,新闻源,点击网页等。
      • 实时商业智能:智能汽车,智能家居,智能病人护理等。
      • 销售终端(POS)系统:像是股票价格的更新,允许用户实时完成付款的系统等。

07 | Workflow 设计模式:让你在大规模数据世界中君临天下

08 | 发布/订阅模式:流处理架构中的瑞士军刀

09 丨 CAP 定理:三选二,架构师必须学会的取舍

10 丨 Lambda 架构:Twitter 亿级实时数据分析架构背后的倚天剑

Lambda 架构总共由三层系统组成:批处理层(Batch Layer),速度处理层(Speed Layer),以及用于响应查询的服务层(Serving Layer)。

11 丨 Kappa 架构:利用 Kafka 锻造的屠龙刀

12 | 我们为什么需要 Spark?

MapReduce 的缺点:

  • 高昂的维护成本
  • 时间性能“达不到”用户的期待
  • MapReduce 模型的抽象层次低
  • 只提供 Map 和 Reduce 两个操作
  • 在 Hadoop 中,每一个 Job 的计算结果都会存储在 HDFS 文件存储系统中,所以每一步计算都要进行硬盘的读取和写入,大大增加了系统的延迟。
  • 只支持批处理

Spark 的优点

  • 性能比 MapReduce 高很多
  • Spark 提供了很多对 RDD 的操作,如 Map、Filter、flatMap、groupByKey 和 Union 等等,极大地提升了对各种复杂场景的支持

13 丨弹性分布式数据集:Spark 大厦的地基(上)

Spark 最基本的数据抽象是弹性分布式数据集(Resilient Distributed Dataset)

RDD 表示已被分区、不可变的,并能够被并行操作的数据集合。

14 丨弹性分布式数据集:Spark 大厦的地基(下)

15 丨 SparkSQL:Spark 数据查询的利器

16 | Spark Streaming:Spark 的实时流计算 API

Spark Streaming 用时间片拆分了无限的数据流,然后对每一个数据片用类似于批处理的方法进行处理,输出的数据也是一块一块的

17 | Structured Streaming:如何用 DataFrame API 进行实时数据分析?

18 丨 WordCount:从零开始运行你的第一个 Spark 应用

19 丨综合案例实战:处理加州房屋信息,构建线性回归模型

20 丨流处理案例实战:分析纽约市出租车载客信息


读到此处,感觉收获甚少,暂时搁置阅读。

参考资料