Dunwu Blog

大道至简,知易行难

HBase 数据模型

HBase 是一个面向 的数据库管理系统,这里更为确切的而说,HBase 是一个面向 列族 的数据库管理系统。表 schema 仅定义列族,表具有多个列族,每个列族可以包含任意数量的列,列由多个单元格(cell)组成,单元格可以存储多个版本的数据,多个版本数据以时间戳进行区分。

HBase 逻辑存储结构

  • **Table**:Table 由 Row 和 Column 组成。
  • **Row**:Row 是列族(Column Family)的集合。
  • Row KeyRow Key 是用来检索记录的主键
    • Row Key 是未解释的字节数组,所以理论上,任何数据都可以通过序列化表示成字符串或二进制,从而存为 HBase 的键值。
    • 表中的行,是按照 Row Key 的字典序进行排序。这里需要注意以下两点:
      • 因为字典序对 Int 排序的结果是 1,10,100,11,12,13,14,15,16,17,18,19,2,20,21,…,9,91,92,93,94,95,96,97,98,99。如果你使用整型的字符串作为行键,那么为了保持整型的自然序,行键必须用 0 作左填充。
      • 行的一次读写操作是原子性的 (不论一次读写多少列)。
    • 所有对表的访问都要通过 Row Key,有以下三种方式:
      • 通过指定的 Row Key 进行访问;
      • 通过 Row Key 的 range 进行访问,即访问指定范围内的行;
      • 进行全表扫描。
  • **Column Family**:即列族。HBase 表中的每个列,都归属于某个列族。列族是表的 Schema 的一部分,所以列族需要在创建表时进行定义。
    • 一个表的列族必须作为表模式定义的一部分预先给出,但是新的列族成员可以随后按需加入。
    • 同一个列族的所有成员具有相同的前缀,例如 info:formatinfo:geo 都属于 info 这个列族。
  • **Column Qualifier**:列限定符。可以理解为是具体的列名,例如 info:formatinfo:geo 都属于 info 这个列族,它们的列限定符分别是 formatgeo。列族和列限定符之间始终以冒号分隔。需要注意的是列限定符不是表 Schema 的一部分,你可以在插入数据的过程中动态创建列。
  • **Column**:HBase 中的列由列族和列限定符组成,由 :(冒号) 进行分隔,即一个完整的列名应该表述为 列族名 :列限定符
  • **Cell**:Cell 是行,列族和列限定符的组合,并包含值和时间戳。HBase 中通过 row keycolumn 确定的为一个存储单元称为 Cell,你可以等价理解为关系型数据库中由指定行和指定列确定的一个单元格,但不同的是 HBase 中的一个单元格是由多个版本的数据组成的,每个版本的数据用时间戳进行区分。
    - Cell 由行和列的坐标交叉决定,是有版本的。默认情况下,版本号是自动分配的,为 HBase 插入 Cell 时的时间戳。Cell 的内容是未解释的字节数组。

  • **Timestamp**:Cell 的版本通过时间戳来索引,时间戳的类型是 64 位整型,时间戳可以由 HBase 在数据写入时自动赋值,也可以由客户显式指定。每个 Cell 中,不同版本的数据按照时间戳倒序排列,即最新的数据排在最前面。

img

HBase 物理存储结构

HBase 自动将表水平划分成区域(Region)。每个 Region 由表中 Row 的子集构成。每个 Region 由它所属的表的起始范围来表示(包含的第一行和最后一行)。初始时,一个表只有一个 Region,随着 Region 膨胀,当超过一定阈值时,会在某行的边界上分裂成两个大小基本相同的新 Region。在第一次划分之前,所有加载的数据都放在原始 Region 所在的那台服务器上。随着表变大,Region 个数也会逐渐增加。Region 是在 HBase 集群上分布数据的最小单位。

HBase 数据模型示例

下图为 HBase 中一张表的:

  • RowKey 为行的唯一标识,所有行按照 RowKey 的字典序进行排序;
  • 该表具有两个列族,分别是 personal 和 office;
  • 其中列族 personal 拥有 name、city、phone 三个列,列族 office 拥有 tel、addres 两个列。

img

图片引用自 : HBase 是列式存储数据库吗 https://www.iteblog.com/archives/2498.html

HBase 表特性

Hbase 的表具有以下特点:

  • 容量大:一个表可以有数十亿行,上百万列;
  • 面向列:数据是按照列存储,每一列都单独存放,数据即索引,在查询时可以只访问指定列的数据,有效地降低了系统的 I/O 负担;
  • 稀疏性:空 (null) 列并不占用存储空间,表可以设计的非常稀疏 ;
  • 数据多版本:每个单元中的数据可以有多个版本,按照时间戳排序,新的数据在最上面;
  • 存储类型:所有数据的底层存储格式都是字节数组 (byte[])。

参考资料

HBase 协处理器

简述

在使用 HBase 时,如果你的数据量达到了数十亿行或数百万列,此时能否在查询中返回大量数据将受制于网络的带宽,即便网络状况允许,但是客户端的计算处理也未必能够满足要求。在这种情况下,协处理器(Coprocessors)应运而生。它允许你将业务计算代码放入在 RegionServer 的协处理器中,将处理好的数据再返回给客户端,这可以极大地降低需要传输的数据量,从而获得性能上的提升。同时协处理器也允许用户扩展实现 HBase 目前所不具备的功能,如权限校验、二级索引、完整性约束等。

协处理器类型

2.1 Observer 协处理器

1. 功能

Observer 协处理器类似于关系型数据库中的触发器,当发生某些事件的时候这类协处理器会被 Server 端调用。通常可以用来实现下面功能:

  • 权限校验:在执行 GetPut 操作之前,您可以使用 preGetprePut 方法检查权限;
  • 完整性约束: HBase 不支持关系型数据库中的外键功能,可以通过触发器在插入或者删除数据的时候,对关联的数据进行检查;
  • 二级索引: 可以使用协处理器来维护二级索引。

2. 类型

当前 Observer 协处理器有以下四种类型:

  • RegionObserver : 允许您观察 Region 上的事件,例如 Get 和 Put 操作。
  • RegionServerObserver : 允许您观察与 RegionServer 操作相关的事件,例如启动,停止或执行合并,提交或回滚。
  • MasterObserver : 允许您观察与 HBase Master 相关的事件,例如表创建,删除或 schema 修改。
  • WalObserver : 允许您观察与预写日志(WAL)相关的事件。

3. 接口

以上四种类型的 Observer 协处理器均继承自 Coprocessor 接口,这四个接口中分别定义了所有可用的钩子方法,以便在对应方法前后执行特定的操作。通常情况下,我们并不会直接实现上面接口,而是继承其 Base 实现类,Base 实现类只是简单空实现了接口中的方法,这样我们在实现自定义的协处理器时,就不必实现所有方法,只需要重写必要方法即可。

img

这里以 RegionObservers 为例,其接口类中定义了所有可用的钩子方法,下面截取了部分方法的定义,多数方法都是成对出现的,有 pre 就有 post

img

4. 执行流程

img

  • 客户端发出 put 请求
  • 该请求被分派给合适的 RegionServer 和 region
  • coprocessorHost 拦截该请求,然后在该表的每个 RegionObserver 上调用 prePut()
  • 如果没有被 prePut() 拦截,该请求继续送到 region,然后进行处理
  • region 产生的结果再次被 CoprocessorHost 拦截,调用 postPut()
  • 假如没有 postPut() 拦截该响应,最终结果被返回给客户端

如果大家了解 Spring,可以将这种执行方式类比于其 AOP 的执行原理即可,官方文档当中也是这样类比的:

If you are familiar with Aspect Oriented Programming (AOP), you can think of a coprocessor as applying advice by intercepting a request and then running some custom code,before passing the request on to its final destination (or even changing the destination).

如果您熟悉面向切面编程(AOP),您可以将协处理器视为通过拦截请求然后运行一些自定义代码来使用 Advice,然后将请求传递到其最终目标(或者更改目标)。

2.2 Endpoint 协处理器

Endpoint 协处理器类似于关系型数据库中的存储过程。客户端可以调用 Endpoint 协处理器在服务端对数据进行处理,然后再返回。

以聚集操作为例,如果没有协处理器,当用户需要找出一张表中的最大数据,即 max 聚合操作,就必须进行全表扫描,然后在客户端上遍历扫描结果,这必然会加重了客户端处理数据的压力。利用 Coprocessor,用户可以将求最大值的代码部署到 HBase Server 端,HBase 将利用底层 cluster 的多个节点并发执行求最大值的操作。即在每个 Region 范围内执行求最大值的代码,将每个 Region 的最大值在 Region Server 端计算出来,仅仅将该 max 值返回给客户端。之后客户端只需要将每个 Region 的最大值进行比较而找到其中最大的值即可。

协处理的加载方式

要使用我们自己开发的协处理器,必须通过静态(使用 HBase 配置)或动态(使用 HBase Shell 或 Java API)加载它。

  • 静态加载的协处理器称之为 System Coprocessor(系统级协处理器),作用范围是整个 HBase 上的所有表,需要重启 HBase 服务;
  • 动态加载的协处理器称之为 Table Coprocessor(表处理器),作用于指定的表,不需要重启 HBase 服务。

其加载和卸载方式分别介绍如下。

静态加载与卸载

4.1 静态加载

静态加载分以下三步:

  1. hbase-site.xml 定义需要加载的协处理器。
1
2
3
4
<property>
<name>hbase.coprocessor.region.classes</name>
<value>org.myname.hbase.coprocessor.endpoint.SumEndPoint</value>
</property>

<name> 标签的值必须是下面其中之一:

  • RegionObservers 和 Endpoints 协处理器:hbase.coprocessor.region.classes
  • WALObservers 协处理器: hbase.coprocessor.wal.classes
  • MasterObservers 协处理器:hbase.coprocessor.master.classes

<value> 必须是协处理器实现类的全限定类名。如果为加载指定了多个类,则类名必须以逗号分隔。

  1. 将 jar(包含代码和所有依赖项) 放入 HBase 安装目录中的 lib 目录下;
  2. 重启 HBase。

4.2 静态卸载

  1. 从 hbase-site.xml 中删除配置的协处理器的元素及其子元素;
  2. 从类路径或 HBase 的 lib 目录中删除协处理器的 JAR 文件(可选);
  3. 重启 HBase。

动态加载与卸载

使用动态加载协处理器,不需要重新启动 HBase。但动态加载的协处理器是基于每个表加载的,只能用于所指定的表。 此外,在使用动态加载必须使表脱机(disable)以加载协处理器。动态加载通常有两种方式:Shell 和 Java API 。

以下示例基于两个前提:

  1. coprocessor.jar 包含协处理器实现及其所有依赖项。
  2. JAR 包存放在 HDFS 上的路径为:hdfs:// / user / /coprocessor.jar

5.1 HBase Shell 动态加载

  1. 使用 HBase Shell 禁用表
1
hbase > disable 'tableName'
  1. 使用如下命令加载协处理器
1
2
3
hbase > alter 'tableName', METHOD => 'table_att', 'Coprocessor'=>'hdfs://<namenode>:<port>/
user/<hadoop-user>/coprocessor.jar| org.myname.hbase.Coprocessor.RegionObserverExample|1073741823|
arg1=1,arg2=2'

Coprocessor 包含由管道(|)字符分隔的四个参数,按顺序解释如下:

  • JAR 包路径:通常为 JAR 包在 HDFS 上的路径。关于路径以下两点需要注意:
  • 允许使用通配符,例如:hdfs://<namenode>:<port>/user/<hadoop-user>/*.jar 来添加指定的 JAR 包;
  • 可以使指定目录,例如:hdfs://<namenode>:<port>/user/<hadoop-user>/ ,这会添加目录中的所有 JAR 包,但不会搜索子目录中的 JAR 包。
  • 类名:协处理器的完整类名。
  • 优先级:协处理器的优先级,遵循数字的自然序,即值越小优先级越高。可以为空,在这种情况下,将分配默认优先级值。
  • 可选参数 :传递的协处理器的可选参数。
  1. 启用表
1
hbase > enable 'tableName'
  1. 验证协处理器是否已加载
1
hbase > describe 'tableName'

协处理器出现在 TABLE_ATTRIBUTES 属性中则代表加载成功。

5.2 HBase Shell 动态卸载

  1. 禁用表
1
hbase> disable 'tableName'
  1. 移除表协处理器
1
hbase> alter 'tableName', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
  1. 启用表
1
hbase> enable 'tableName'

5.3 Java API 动态加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
TableName tableName = TableName.valueOf("users");
String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
hTableDescriptor.setValue("COPROCESSOR$1", path + "|"
+ RegionObserverExample.class.getCanonicalName() + "|"
+ Coprocessor.PRIORITY_USER);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);

在 HBase 0.96 及其以后版本中,HTableDescriptor 的 addCoprocessor() 方法提供了一种更为简便的加载方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
TableName tableName = TableName.valueOf("users");
Path path = new Path("hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar");
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
hTableDescriptor.addCoprocessor(RegionObserverExample.class.getCanonicalName(), path,
Coprocessor.PRIORITY_USER, null);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);

5.4 Java API 动态卸载

卸载其实就是重新定义表但不设置协处理器。这会删除所有表上的协处理器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
TableName tableName = TableName.valueOf("users");
String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);

协处理器案例

这里给出一个简单的案例,实现一个类似于 Redis 中 append 命令的协处理器,当我们对已有列执行 put 操作时候,HBase 默认执行的是 update 操作,这里我们修改为执行 append 操作。

1
2
3
4
5
6
7
8
9
# redis append 命令示例
redis> EXISTS mykey
(integer) 0
redis> APPEND mykey "Hello"
(integer) 5
redis> APPEND mykey " World"
(integer) 11
redis> GET mykey
"Hello World"

6.1 创建测试表

1
2
# 创建一张杂志表 有文章和图片两个列族
hbase > create 'magazine','article','picture'

6.2 协处理器编程

完整代码可见本仓库:hbase-observer-coprocessor

新建 Maven 工程,导入下面依赖:

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.0</version>
</dependency>

继承 BaseRegionObserver 实现我们自定义的 RegionObserver,对相同的 article:content 执行 put 命令时,将新插入的内容添加到原有内容的末尾,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class AppendRegionObserver extends BaseRegionObserver {

private byte[] columnFamily = Bytes.toBytes("article");
private byte[] qualifier = Bytes.toBytes("content");

@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
Durability durability) throws IOException {
if (put.has(columnFamily, qualifier)) {
// 遍历查询结果,获取指定列的原值
Result rs = e.getEnvironment().getRegion().get(new Get(put.getRow()));
String oldValue = "";
for (Cell cell : rs.rawCells())
if (CellUtil.matchingColumn(cell, columnFamily, qualifier)) {
oldValue = Bytes.toString(CellUtil.cloneValue(cell));
}

// 获取指定列新插入的值
List<Cell> cells = put.get(columnFamily, qualifier);
String newValue = "";
for (Cell cell : cells) {
if (CellUtil.matchingColumn(cell, columnFamily, qualifier)) {
newValue = Bytes.toString(CellUtil.cloneValue(cell));
}
}

// Append 操作
put.addColumn(columnFamily, qualifier, Bytes.toBytes(oldValue + newValue));
}
}
}

6.3 打包项目

使用 maven 命令进行打包,打包后的文件名为 hbase-observer-coprocessor-1.0-SNAPSHOT.jar

1
# mvn clean package

6.4 上传 JAR 包到 HDFS

1
2
3
4
# 上传项目到HDFS上的hbase目录
hadoop fs -put /usr/app/hbase-observer-coprocessor-1.0-SNAPSHOT.jar /hbase
# 查看上传是否成功
hadoop fs -ls /hbase

img

6.5 加载协处理器

  1. 加载协处理器前需要先禁用表
1
hbase >  disable 'magazine'
  1. 加载协处理器
1
hbase >   alter 'magazine', METHOD => 'table_att', 'Coprocessor'=>'hdfs://hadoop001:8020/hbase/hbase-observer-coprocessor-1.0-SNAPSHOT.jar|com.heibaiying.AppendRegionObserver|1001|'
  1. 启用表
1
hbase >  enable 'magazine'
  1. 查看协处理器是否加载成功
1
hbase >  desc 'magazine'

协处理器出现在 TABLE_ATTRIBUTES 属性中则代表加载成功,如下图:

img

6.6 测试加载结果

插入一组测试数据:

1
2
3
4
hbase > put 'magazine', 'rowkey1','article:content','Hello'
hbase > get 'magazine','rowkey1','article:content'
hbase > put 'magazine', 'rowkey1','article:content','World'
hbase > get 'magazine','rowkey1','article:content'

可以看到对于指定列的值已经执行了 append 操作:

img

插入一组对照数据:

1
2
3
4
hbase > put 'magazine', 'rowkey1','article:author','zhangsan'
hbase > get 'magazine','rowkey1','article:author'
hbase > put 'magazine', 'rowkey1','article:author','lisi'
hbase > get 'magazine','rowkey1','article:author'

可以看到对于正常的列还是执行 update 操作:

img

6.7 卸载协处理器

  1. 卸载协处理器前需要先禁用表
1
hbase >  disable 'magazine'
  1. 卸载协处理器
1
hbase > alter 'magazine', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
  1. 启用表
1
hbase >  enable 'magazine'
  1. 查看协处理器是否卸载成功
1
hbase >  desc 'magazine'

img

6.8 测试卸载结果

依次执行下面命令可以测试卸载是否成功

1
2
3
hbase > get 'magazine','rowkey1','article:content'
hbase > put 'magazine', 'rowkey1','article:content','Hello'
hbase > get 'magazine','rowkey1','article:content'

img

HBase 过滤器

HBase 过滤器简介

Hbase 提供了种类丰富的过滤器(filter)来提高数据处理的效率,用户可以通过内置或自定义的过滤器来对数据进行过滤,所有的过滤器都在服务端生效,即谓词下推(predicate push down)。这样可以保证过滤掉的数据不会被传送到客户端,从而减轻网络传输和客户端处理的压力。

过滤器基础

Filter 接口和 FilterBase 抽象类

Filter 接口中定义了过滤器的基本方法,FilterBase 抽象类实现了 Filter 接口。所有内置的过滤器则直接或者间接继承自 FilterBase 抽象类。用户只需要将定义好的过滤器通过 setFilter 方法传递给 Scanput 的实例即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
setFilter(Filter filter)
// Scan 中定义的 setFilter
@Override
public Scan setFilter(Filter filter) {
super.setFilter(filter);
return this;
}
// Get 中定义的 setFilter
@Override
public Get setFilter(Filter filter) {
super.setFilter(filter);
return this;
}

过滤器分类

HBase 内置过滤器可以分为三类:分别是比较过滤器,专用过滤器和包装过滤器。分别在下面的三个小节中做详细的介绍。

比较过滤器

所有比较过滤器均继承自 CompareFilter。创建一个比较过滤器需要两个参数,分别是比较运算符比较器实例

1
2
3
4
public CompareFilter(final CompareOp compareOp,final ByteArrayComparable comparator) {
this.compareOp = compareOp;
this.comparator = comparator;
}

比较运算符

  • LESS (<)
  • LESS_OR_EQUAL (<=)
  • EQUAL (=)
  • NOT_EQUAL (!=)
  • GREATER_OR_EQUAL (>=)
  • GREATER (>)
  • NO_OP (排除所有符合条件的值)

比较运算符均定义在枚举类 CompareOperator

1
2
3
4
5
6
7
8
9
10
@InterfaceAudience.Public
public enum CompareOperator {
LESS,
LESS_OR_EQUAL,
EQUAL,
NOT_EQUAL,
GREATER_OR_EQUAL,
GREATER,
NO_OP,
}

注意:在 1.x 版本的 HBase 中,比较运算符定义在 CompareFilter.CompareOp 枚举类中,但在 2.0 之后这个类就被标识为 @deprecated ,并会在 3.0 移除。所以 2.0 之后版本的 HBase 需要使用 CompareOperator 这个枚举类。

比较器

所有比较器均继承自 ByteArrayComparable 抽象类,常用的有以下几种:

  • BinaryComparator : 使用 Bytes.compareTo(byte [],byte []) 按字典序比较指定的字节数组。
  • BinaryPrefixComparator : 按字典序与指定的字节数组进行比较,但只比较到这个字节数组的长度。
  • RegexStringComparator : 使用给定的正则表达式与指定的字节数组进行比较。仅支持 EQUALNOT_EQUAL 操作。
  • SubStringComparator : 测试给定的子字符串是否出现在指定的字节数组中,比较不区分大小写。仅支持 EQUALNOT_EQUAL 操作。
  • NullComparator :判断给定的值是否为空。
  • BitComparator :按位进行比较。

BinaryPrefixComparatorBinaryComparator 的区别不是很好理解,这里举例说明一下:

在进行 EQUAL 的比较时,如果比较器传入的是 abcd 的字节数组,但是待比较数据是 abcdefgh

  • 如果使用的是 BinaryPrefixComparator 比较器,则比较以 abcd 字节数组的长度为准,即 efgh 不会参与比较,这时候认为 abcdabcdefgh 是满足 EQUAL 条件的;
  • 如果使用的是 BinaryComparator 比较器,则认为其是不相等的。

比较过滤器种类

比较过滤器共有五个(Hbase 1.x 版本和 2.x 版本相同):

  • RowFilter :基于行键来过滤数据;
  • FamilyFilterr :基于列族来过滤数据;
  • QualifierFilterr :基于列限定符(列名)来过滤数据;
  • ValueFilterr :基于单元格 (cell) 的值来过滤数据;
  • DependentColumnFilter :指定一个参考列来过滤其他列的过滤器,过滤的原则是基于参考列的时间戳来进行筛选 。

前四种过滤器的使用方法相同,均只要传递比较运算符和运算器实例即可构建,然后通过 setFilter 方法传递给 scan

1
2
3
Filter filter  = new RowFilter(CompareOperator.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("xxx")));
scan.setFilter(filter);

DependentColumnFilter 的使用稍微复杂一点,这里单独做下说明。

DependentColumnFilter

可以把 DependentColumnFilter 理解为一个 valueFilter 和一个时间戳过滤器的组合DependentColumnFilter 有三个带参构造器,这里选择一个参数最全的进行说明:

1
2
3
DependentColumnFilter(final byte [] family, final byte[] qualifier,
final boolean dropDependentColumn, final CompareOperator op,
final ByteArrayComparable valueComparator)
  • family :列族
  • qualifier :列限定符(列名)
  • dropDependentColumn :决定参考列是否被包含在返回结果内,为 true 时表示参考列被返回,为 false 时表示被丢弃
  • op :比较运算符
  • valueComparator :比较器

这里举例进行说明:

1
2
3
4
5
6
DependentColumnFilter dependentColumnFilter = new DependentColumnFilter(
Bytes.toBytes("student"),
Bytes.toBytes("name"),
false,
CompareOperator.EQUAL,
new BinaryPrefixComparator(Bytes.toBytes("xiaolan")));
  • 首先会去查找 student:name 中值以 xiaolan 开头的所有数据获得 参考数据集,这一步等同于 valueFilter 过滤器;
  • 其次再用参考数据集中所有数据的时间戳去检索其他列,获得时间戳相同的其他列的数据作为 结果数据集,这一步等同于时间戳过滤器;
  • 最后如果 dropDependentColumn 为 true,则返回 参考数据集+结果数据集,若为 false,则抛弃参考数据集,只返回 结果数据集

专用过滤器

专用过滤器通常直接继承自 FilterBase,适用于范围更小的筛选规则。

单列列值过滤器 (SingleColumnValueFilter)

基于某列(参考列)的值决定某行数据是否被过滤。其实例有以下方法:

  • setFilterIfMissing(boolean filterIfMissing) :默认值为 false,即如果该行数据不包含参考列,其依然被包含在最后的结果中;设置为 true 时,则不包含;
  • setLatestVersionOnly(boolean latestVersionOnly) :默认为 true,即只检索参考列的最新版本数据;设置为 false,则检索所有版本数据。
1
2
3
4
5
6
7
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
"student".getBytes(),
"name".getBytes(),
CompareOperator.EQUAL,
new SubstringComparator("xiaolan"));
singleColumnValueFilter.setFilterIfMissing(true);
scan.setFilter(singleColumnValueFilter);

单列列值排除器 (SingleColumnValueExcludeFilter)

SingleColumnValueExcludeFilter 继承自上面的 SingleColumnValueFilter,过滤行为与其相反。

行键前缀过滤器 (PrefixFilter)

基于 RowKey 值决定某行数据是否被过滤。

1
2
PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("xxx"));
scan.setFilter(prefixFilter);

列名前缀过滤器 (ColumnPrefixFilter)

基于列限定符(列名)决定某行数据是否被过滤。

1
2
ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(Bytes.toBytes("xxx"));
scan.setFilter(columnPrefixFilter);

分页过滤器 (PageFilter)

可以使用这个过滤器实现对结果按行进行分页,创建 PageFilter 实例的时候需要传入每页的行数。

1
2
3
4
public PageFilter(final long pageSize) {
Preconditions.checkArgument(pageSize >= 0, "must be positive %s", pageSize);
this.pageSize = pageSize;
}

下面的代码体现了客户端实现分页查询的主要逻辑,这里对其进行一下解释说明:

客户端进行分页查询,需要传递 startRow(起始 RowKey),知道起始 startRow 后,就可以返回对应的 pageSize 行数据。这里唯一的问题就是,对于第一次查询,显然 startRow 就是表格的第一行数据,但是之后第二次、第三次查询我们并不知道 startRow,只能知道上一次查询的最后一条数据的 RowKey(简单称之为 lastRow)。

我们不能将 lastRow 作为新一次查询的 startRow 传入,因为 scan 的查询区间是[startRow,endRow) ,即前开后闭区间,这样 startRow 在新的查询也会被返回,这条数据就重复了。

同时在不使用第三方数据库存储 RowKey 的情况下,我们是无法通过知道 lastRow 的下一个 RowKey 的,因为 RowKey 的设计可能是连续的也有可能是不连续的。

由于 Hbase 的 RowKey 是按照字典序进行排序的。这种情况下,就可以在 lastRow 后面加上 0 ,作为 startRow 传入,因为按照字典序的规则,某个值加上 0 后的新值,在字典序上一定是这个值的下一个值,对于 HBase 来说下一个 RowKey 在字典序上一定也是等于或者大于这个新值的。

所以最后传入 lastRow+0,如果等于这个值的 RowKey 存在就从这个值开始 scan,否则从字典序的下一个 RowKey 开始 scan。

25 个字母以及数字字符,字典排序如下:

1
'0' < '1' < '2' < ... < '9' < 'a' < 'b' < ... < 'z'

分页查询主要实现逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
byte[] POSTFIX = new byte[] { 0x00 };
Filter filter = new PageFilter(15);

int totalRows = 0;
byte[] lastRow = null;
while (true) {
Scan scan = new Scan();
scan.setFilter(filter);
if (lastRow != null) {
// 如果不是首行 则 lastRow + 0
byte[] startRow = Bytes.add(lastRow, POSTFIX);
System.out.println("start row: " +
Bytes.toStringBinary(startRow));
scan.withStartRow(startRow);
}
ResultScanner scanner = table.getScanner(scan);
int localRows = 0;
Result result;
while ((result = scanner.next()) != null) {
System.out.println(localRows++ + ": " + result);
totalRows++;
lastRow = result.getRow();
}
scanner.close();
//最后一页,查询结束
if (localRows == 0) break;
}
System.out.println("total rows: " + totalRows);

需要注意的是在多台 Regin Services 上执行分页过滤的时候,由于并行执行的过滤器不能共享它们的状态和边界,所以有可能每个过滤器都会在完成扫描前获取了 PageCount 行的结果,这种情况下会返回比分页条数更多的数据,分页过滤器就有失效的可能。

时间戳过滤器 (TimestampsFilter)

1
2
3
4
List<Long> list = new ArrayList<>();
list.add(1554975573000L);
TimestampsFilter timestampsFilter = new TimestampsFilter(list);
scan.setFilter(timestampsFilter);

首次行键过滤器 (FirstKeyOnlyFilter)

FirstKeyOnlyFilter 只扫描每行的第一列,扫描完第一列后就结束对当前行的扫描,并跳转到下一行。相比于全表扫描,其性能更好,通常用于行数统计的场景,因为如果某一行存在,则行中必然至少有一列。

1
2
FirstKeyOnlyFilter firstKeyOnlyFilter = new FirstKeyOnlyFilter();
scan.set(firstKeyOnlyFilter);

包装过滤器

包装过滤器就是通过包装其他过滤器以实现某些拓展的功能。

SkipFilter 过滤器

SkipFilter 包装一个过滤器,当被包装的过滤器遇到一个需要过滤的 KeyValue 实例时,则拓展过滤整行数据。下面是一个使用示例:

1
2
3
4
5
// 定义 ValueFilter 过滤器
Filter filter1 = new ValueFilter(CompareOperator.NOT_EQUAL,
new BinaryComparator(Bytes.toBytes("xxx")));
// 使用 SkipFilter 进行包装
Filter filter2 = new SkipFilter(filter1);

WhileMatchFilter 过滤器

WhileMatchFilter 包装一个过滤器,当被包装的过滤器遇到一个需要过滤的 KeyValue 实例时,WhileMatchFilter 则结束本次扫描,返回已经扫描到的结果。下面是其使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Filter filter1 = new RowFilter(CompareOperator.NOT_EQUAL,
new BinaryComparator(Bytes.toBytes("rowKey4")));

Scan scan = new Scan();
scan.setFilter(filter1);
ResultScanner scanner1 = table.getScanner(scan);
for (Result result : scanner1) {
for (Cell cell : result.listCells()) {
System.out.println(cell);
}
}
scanner1.close();

System.out.println("--------------------");

// 使用 WhileMatchFilter 进行包装
Filter filter2 = new WhileMatchFilter(filter1);

scan.setFilter(filter2);
ResultScanner scanner2 = table.getScanner(scan);
for (Result result : scanner1) {
for (Cell cell : result.listCells()) {
System.out.println(cell);
}
}
scanner2.close();
rowKey0/student:name/1555035006994/Put/vlen=8/seqid=0
rowKey1/student:name/1555035007019/Put/vlen=8/seqid=0
rowKey2/student:name/1555035007025/Put/vlen=8/seqid=0
rowKey3/student:name/1555035007037/Put/vlen=8/seqid=0
rowKey5/student:name/1555035007051/Put/vlen=8/seqid=0
rowKey6/student:name/1555035007057/Put/vlen=8/seqid=0
rowKey7/student:name/1555035007062/Put/vlen=8/seqid=0
rowKey8/student:name/1555035007068/Put/vlen=8/seqid=0
rowKey9/student:name/1555035007073/Put/vlen=8/seqid=0
--------------------
rowKey0/student:name/1555035006994/Put/vlen=8/seqid=0
rowKey1/student:name/1555035007019/Put/vlen=8/seqid=0
rowKey2/student:name/1555035007025/Put/vlen=8/seqid=0
rowKey3/student:name/1555035007037/Put/vlen=8/seqid=0

可以看到被包装后,只返回了 rowKey4 之前的数据。

FilterList

以上都是讲解单个过滤器的作用,当需要多个过滤器共同作用于一次查询的时候,就需要使用 FilterListFilterList 支持通过构造器或者 addFilter 方法传入多个过滤器。

1
2
3
4
5
6
7
8
// 构造器传入
public FilterList(final Operator operator, final List<Filter> filters)
public FilterList(final List<Filter> filters)
public FilterList(final Filter... filters)

// 方法传入
public void addFilter(List<Filter> filters)
public void addFilter(Filter filter)

多个过滤器组合的结果由 operator 参数定义 ,其可选参数定义在 Operator 枚举类中。只有 MUST_PASS_ALLMUST_PASS_ONE 两个可选的值:

  • MUST_PASS_ALL :相当于 AND,必须所有的过滤器都通过才认为通过;
  • MUST_PASS_ONE :相当于 OR,只有要一个过滤器通过则认为通过。
1
2
3
4
5
6
7
@InterfaceAudience.Public
public enum Operator {
/** !AND */
MUST_PASS_ALL,
/** !OR */
MUST_PASS_ONE
}

使用示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
List<Filter> filters = new ArrayList<Filter>();

Filter filter1 = new RowFilter(CompareOperator.GREATER_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("XXX")));
filters.add(filter1);

Filter filter2 = new RowFilter(CompareOperator.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("YYY")));
filters.add(filter2);

Filter filter3 = new QualifierFilter(CompareOperator.EQUAL,
new RegexStringComparator("ZZZ"));
filters.add(filter3);

FilterList filterList = new FilterList(filters);

Scan scan = new Scan();
scan.setFilter(filterList);

参考资料

HBase Schema 设计

Hbase 设计时要考虑的因素

  • 这个表应该有多少 Column Family
  • Column Family 使用什么数据
  • 每个 Column Family 有有多少列
  • 列名是什么,尽管列名不必在建表时定义,但读写数据是要知道的
  • 单元应该存放什么数据
  • 每个单元存储多少时间版本
  • 行健(rowKey)结构是什么,应该包含什么信息

HBase Schema 设计规则

Column Family 设计

HBase 不能很好处理 2 ~ 3 个以上的 Column Family,所以 HBase 表应尽可能减少 Column Family 数。如果可以,请只使用一个列族,只有需要经常执行 Column 范围查询时,才引入多列族。也就是说,尽量避免同时查询多个列族。

  • Column Family 数量多,会影响数据刷新。HBase 的数据刷新是在每个 Region 的基础上完成的。因此,如果一个 Column Family 携带大量导致刷新的数据,那么相邻的列族即使携带的数据量很小,也会被刷新。当存在许多 Column Family 时,刷新交互会导致一堆不必要的 IO。 此外,在表/区域级别的压缩操作也会在每个存储中发生。
  • Column Family 数量多,会影响查找效率。如:Column Family A 有 100 万行,Column Family B 有 10 亿行,那么 Column Family A 的数据可能会分布在很多很多区域(和 RegionServers)。 这会降低 Column Family A 的批量扫描效率。

Column Family 名尽量简短,最好是一个字符。Column Family 会在列限定符中被频繁使用,缩短长度有利于节省空间并提升效率。

Row 设计

HBase 中的 Row 按 Row Key 的字典顺序排序

  • 不要将 Row Key 设计为单调递增的,例如:递增的整数或时间戳

    • 问题:因为 Hbase 的 Row Key 是就近存储的,这样会导致一段时间内大部分写入集中在某一个 Region 上,即所谓热点问题。

    • 解决方法一、加盐:这里的不是指密码学的加盐,而是指将随机分配的前缀添加到行键的开头。这么做是为了避免相同前缀的 Row Key 数据被存储在相邻位置,从而导致热点问题。示例如下:

      • ```
        foo0001
        foo0002
        foo0003
        foo0004

        改为

        a-foo0003
        b-foo0001
        c-foo0003
        c-foo0004
        d-foo0002

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29

        - 解决方法二、Hash:Row Key 的前缀使用 Hash

        - **尽量减少行和列的长度**

        - **反向时间戳**:反向时间戳可以极大地帮助快速找到值的最新版本。

        - **行健不能改变**:唯一可以改变的方式是先删除后插入。

        - **Row Key 和 Column Family**:Row Key 从属于 Column Family,因此,相同的 Row Key 可以存在每一个 Column Family 中而不会出现冲突。

        ### Version 设计

        最大、最小 Row 版本号:表示 HBase 会保留的版本号数的上下限。均可以通过 HColumnDescriptor 对每个列族进行配置

        Row 版本号过大,会大大增加 StoreFile 的大小;所以,最大 Row 版本号应按需设置。HBase 会在主要压缩时,删除多余的版本。

        ### TTL 设计

        Column Family 会设置一个以秒为单位的 TTL,一旦达到 TTL 时,HBase 会自动删除行记录。

        仅包含过期行的存储文件在次要压缩时被删除。 将 hbase.store.delete.expired.storefile 设置为 false 会禁用此功能。将最小版本数设置为 0 以外的值也会禁用此功能。

        在较新版本的 HBase 上,还支持在 Cell 上设置 TTL,与 Column Family 的 TTL 不同的是,单位是毫秒。

        ### Column Family 属性配置

        - HFile 数据块,默认是 64KB,数据库的大小影响数据块索引的大小。数据块大的话一次加载进内存的数据越多,扫描查询效果越好。但是数据块小的话,随机查询性能更好

create ‘mytable’,{NAME => ‘cf1’, BLOCKSIZE => ‘65536’}
复制代码

1
2
3

- 数据块缓存,数据块缓存默认是打开的,如果一些比较少访问的数据可以选择关闭缓存

create ‘mytable’,{NAME => ‘cf1’, BLOCKCACHE => ‘FALSE’}
复制代码

1
2
3

- 数据压缩,压缩会提高磁盘利用率,但是会增加 CPU 的负载,看情况进行控制

create ‘mytable’,{NAME => ‘cf1’, COMPRESSION => ‘SNAPPY’}
复制代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

Hbase 表设计是和需求相关的,但是遵守表设计的一些硬性指标对性能的提升还是很有帮助的,这里整理了一些设计时用到的要点。

## Schema 设计案例

### 案例:日志数据和时序数据

假设采集以下数据

- Hostname
- Timestamp
- Log event
- Value/message

应该如何设计 Row Key?

1TimestampRow Key 头部

如果 Row Key 设计为 `[timestamp][hostname][log-event]` 形式,会出现热点问题。

如果针对时间的扫描很重要,可以采用时间戳分桶策略,即

bucket = timestamp % bucketNum

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

计算出桶号后,将 Row Key 指定为:`[bucket][timestamp][hostname][log-event]`

如上所述,要为特定时间范围选择数据,需要对每个桶执行扫描。 例如,100 个桶将在键空间中提供广泛的分布,但需要 100 次扫描才能获取单个时间戳的数据,因此需要权衡取舍。

2)Hostname 在 Row Key 头部

如果主机样本量很大,将 Row Key 设计为 `[hostname][log-event][timestamp]`,这样有利于扫描 hostname。

3Timestamp 还是反向 Timestamp

如果数据访问以查找最近的数据为主,可以将时间戳存储为反向时间戳(例如: `timestamp = Long.MAX_VALUE – timestamp`),这样有利于扫描最近的数据。

4Row Key 是可变长度还是固定长度

拼接 Row Key 的关键字长度不一定是固定的,例如 hostname 有可能很长,也有可能很短。如果想要统一长度,可以参考以下做法:

- 将关键字 Hash 编码:使用某种 Hash 算法计算关键字,并取固定长度的值(例如:8 位或 16 位)。
- 使用数字替代关键字:例如:使用事件类型 Code 替换事件类型;hostname 如果是 IP,可以转换为 long
- 截取关键字:截取后的关键字需要有足够的辨识度,长度大小根据具体情况权衡。

5)时间分片

[hostname][log-event][timestamp1]
[hostname][log-event][timestamp2]
[hostname][log-event][timestamp3]

1
2
3

上面的例子中,每个详细事件都有单独的行键,可以重写如下,即每个时间段存储一次:

[hostname][log-event][timerange]

```

参考资料

HBase Java Api

HBase Client API

HBase Java API 示例

引入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.4</version>
</dependency>

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
public class HBaseUtils {

private static Connection connection;

static {
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.property.clientPort", "2181");
// 如果是集群 则主机名用逗号分隔
configuration.set("hbase.zookeeper.quorum", "hadoop001");
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 创建 HBase 表
*
* @param tableName 表名
* @param columnFamilies 列族的数组
*/
public static boolean createTable(String tableName, List<String> columnFamilies) {
try {
HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
if (admin.tableExists(TableName.valueOf(tableName))) {
return false;
}
TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
columnFamilies.forEach(columnFamily -> {
ColumnFamilyDescriptorBuilder cfDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
cfDescriptorBuilder.setMaxVersions(1);
ColumnFamilyDescriptor familyDescriptor = cfDescriptorBuilder.build();
tableDescriptor.setColumnFamily(familyDescriptor);
});
admin.createTable(tableDescriptor.build());
} catch (IOException e) {
e.printStackTrace();
}
return true;
}


/**
* 删除 hBase 表
*
* @param tableName 表名
*/
public static boolean deleteTable(String tableName) {
try {
HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
// 删除表前需要先禁用表
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
} catch (Exception e) {
e.printStackTrace();
}
return true;
}

/**
* 插入数据
*
* @param tableName 表名
* @param rowKey 唯一标识
* @param columnFamilyName 列族名
* @param qualifier 列标识
* @param value 数据
*/
public static boolean putRow(String tableName, String rowKey, String columnFamilyName, String qualifier,
String value) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
table.put(put);
table.close();
} catch (IOException e) {
e.printStackTrace();
}
return true;
}


/**
* 插入数据
*
* @param tableName 表名
* @param rowKey 唯一标识
* @param columnFamilyName 列族名
* @param pairList 列标识和值的集合
*/
public static boolean putRow(String tableName, String rowKey, String columnFamilyName, List<Pair<String, String>> pairList) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
pairList.forEach(pair -> put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(pair.getKey()), Bytes.toBytes(pair.getValue())));
table.put(put);
table.close();
} catch (IOException e) {
e.printStackTrace();
}
return true;
}


/**
* 根据 rowKey 获取指定行的数据
*
* @param tableName 表名
* @param rowKey 唯一标识
*/
public static Result getRow(String tableName, String rowKey) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
return table.get(get);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}


/**
* 获取指定行指定列 (cell) 的最新版本的数据
*
* @param tableName 表名
* @param rowKey 唯一标识
* @param columnFamily 列族
* @param qualifier 列标识
*/
public static String getCell(String tableName, String rowKey, String columnFamily, String qualifier) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
if (!get.isCheckExistenceOnly()) {
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
Result result = table.get(get);
byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
return Bytes.toString(resultValue);
} else {
return null;
}

} catch (IOException e) {
e.printStackTrace();
}
return null;
}


/**
* 检索全表
*
* @param tableName 表名
*/
public static ResultScanner getScanner(String tableName) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
return table.getScanner(scan);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}


/**
* 检索表中指定数据
*
* @param tableName 表名
* @param filterList 过滤器
*/

public static ResultScanner getScanner(String tableName, FilterList filterList) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scan.setFilter(filterList);
return table.getScanner(scan);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

/**
* 检索表中指定数据
*
* @param tableName 表名
* @param startRowKey 起始 RowKey
* @param endRowKey 终止 RowKey
* @param filterList 过滤器
*/

public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey,
FilterList filterList) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scan.withStartRow(Bytes.toBytes(startRowKey));
scan.withStopRow(Bytes.toBytes(endRowKey));
scan.setFilter(filterList);
return table.getScanner(scan);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

/**
* 删除指定行记录
*
* @param tableName 表名
* @param rowKey 唯一标识
*/
public static boolean deleteRow(String tableName, String rowKey) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
table.delete(delete);
} catch (IOException e) {
e.printStackTrace();
}
return true;
}


/**
* 删除指定行指定列
*
* @param tableName 表名
* @param rowKey 唯一标识
* @param familyName 列族
* @param qualifier 列标识
*/
public static boolean deleteColumn(String tableName, String rowKey, String familyName,
String qualifier) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier));
table.delete(delete);
table.close();
} catch (IOException e) {
e.printStackTrace();
}
return true;
}

}

数据库连接

在上面的代码中,在类加载时就初始化了 Connection 连接,并且之后的方法都是复用这个 Connection,这时我们可能会考虑是否可以使用自定义连接池来获取更好的性能表现?实际上这是没有必要的。

首先官方对于 Connection 的使用说明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Connection Pooling For applications which require high-end multithreaded
access (e.g., web-servers or application servers that may serve many
application threads in a single JVM), you can pre-create a Connection,
as shown in the following example:

对于高并发多线程访问的应用程序(例如,在单个 JVM 中存在的为多个线程服务的 Web 服务器或应用程序服务器),
您只需要预先创建一个 Connection。例子如下:

// Create a connection to the cluster.
Configuration conf = HBaseConfiguration.create();
try (Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf(tablename))) {
// use table as needed, the table returned is lightweight
}

之所以能这样使用,这是因为 Connection 并不是一个简单的 socket 连接,接口文档 中对 Connection 的表述是:

1
2
3
4
5
6
7
A cluster connection encapsulating lower level individual connections to actual servers and a
connection to zookeeper. Connections are instantiated through the ConnectionFactory class.
The lifecycle of the connection is managed by the caller, who has to close() the connection
to release the resources.

Connection 是一个集群连接,封装了与多台服务器(Matser/Region Server)的底层连接以及与 zookeeper 的连接。
连接通过 ConnectionFactory 类实例化。连接的生命周期由调用者管理,调用者必须使用 close() 关闭连接以释放资源。

之所以封装这些连接,是因为 HBase 客户端需要连接三个不同的服务角色:

  • Zookeeper :主要用于获取 meta 表的位置信息,Master 的信息;
  • HBase Master :主要用于执行 HBaseAdmin 接口的一些操作,例如建表等;
  • HBase RegionServer :用于读、写数据。

Connection 对象和实际的 Socket 连接之间的对应关系如下图:

在 HBase 客户端代码中,真正对应 Socket 连接的是 RpcConnection 对象。HBase 使用 PoolMap 这种数据结构来存储客户端到 HBase 服务器之间的连接。PoolMap 的内部有一个 ConcurrentHashMap 实例,其 key 是 ConnectionId(封装了服务器地址和用户 ticket),value 是一个 RpcConnection 对象的资源池。当 HBase 需要连接一个服务器时,首先会根据 ConnectionId 找到对应的连接池,然后从连接池中取出一个连接对象。

1
2
3
4
5
6
7
8
9
10
11
12
@InterfaceAudience.Private
public class PoolMap<K, V> implements Map<K, V> {
private PoolType poolType;

private int poolMaxSize;

private Map<K, Pool<V>> pools = new ConcurrentHashMap<>();

public PoolMap(PoolType poolType) {
this.poolType = poolType;
}
.....

HBase 中提供了三种资源池的实现,分别是 ReusableRoundRobinThreadLocal。具体实现可以通 hbase.client.ipc.pool.type 配置项指定,默认为 Reusable。连接池的大小也可以通过 hbase.client.ipc.pool.size 配置项指定,默认为 1,即每个 Server 1 个连接。也可以通过修改配置实现:

1
2
3
config.set("hbase.client.ipc.pool.type",...);
config.set("hbase.client.ipc.pool.size",...);
connection = ConnectionFactory.createConnection(config);

由此可以看出 HBase 中 Connection 类已经实现了对连接的管理功能,所以我们不必在 Connection 上在做额外的管理。

另外,Connection 是线程安全的,但 Table 和 Admin 却不是线程安全的,因此正确的做法是一个进程共用一个 Connection 对象,而在不同的线程中使用单独的 Table 和 Admin 对象。Table 和 Admin 的获取操作 getTable()getAdmin() 都是轻量级,所以不必担心性能的消耗,同时建议在使用完成后显示的调用 close() 方法来关闭它们。

DDL

创建、更新 schema

使用 HBase 命令HBase Admin API 都可以创建或修改 HBase 的 schema。

修改 ColumnFamily 时必须禁用表。这些更改将在下一次的压缩操作和重写 StoreFiles 时生效。

1
2
3
4
5
6
7
8
9
10
11
12
Configuration config = HBaseConfiguration.create();
Admin admin = new Admin(conf);
TableName table = TableName.valueOf("myTable");

admin.disableTable(table);

HColumnDescriptor cf1 = ...;
admin.addColumn(table, cf1); // adding new ColumnFamily
HColumnDescriptor cf2 = ...;
admin.modifyColumn(table, cf2); // modifying existing ColumnFamily

admin.enableTable(table);

DML

《大规模数据处理实战》笔记

00 丨开篇词丨从这里开始,带你走上硅谷一线系统架构师之路

01 丨为什么 MapReduce 会被硅谷一线公司淘汰?

高昂的维护成本

时间性能“达不到”用户的期待

02 | MapReduce 后谁主沉浮:怎样设计下一代数据处理技术?

03 | 大规模数据处理初体验:怎样实现大型电商热销榜?

不同量级 TOP K 算法的解决方案不同:

小规模:Hash 即可

大规模:由于单机的处理量不足以处理全量数据,势必分而治之:分片统计,然后聚合(即先 map 后 reduce)

04 丨分布式系统(上):学会用服务等级协议 SLA 来评估你的系统

SLA(Service-Level Agreement),也就是服务等级协议,指的是系统服务提供者(Provider)对客户(Customer)的一个服务承诺。

可用性:大厂一般要求可用性至少达到四个 9(即 99.99%)

准确性:准确率= 正确的有效请求数 / 有效的总请求数

系统容量:通常通过 QPS (Queries Per Second)来衡量

延迟:请求和响应的时间间隔

05 丨分布式系统(下):架构师不得不知的三大指标

  • 可扩展性(Scalability)
    • 水平扩展(Horizontal Scaling)
    • 垂直扩展(Vertical Scaling)
  • 一致性(Consistency)
    • 强一致性(Strong Consistency):系统中的某个数据被成功更新后,后续任何对该数据的读取操作都将得到更新
      后的值。所以在任意时刻,同一系统所有节点中的数据是一样的。
    • 弱一致性(Weak Consistency):系统中的某个数据被更新后,后续对该数据的读取操作可能得到更新后的值,
      也可能是更改前的值。但经过“不一致时间窗口”这段时间后,后续对该数据的读取都是更新后的值。
    • 最终一致性(Eventual Consistency):是弱一致性的特殊形式。存储系统保证,在没有新的更新的条件下,最终所有的访问都是最后更新的值。
  • 持久性(Data Durability):意味着数据一旦被成功存储就可以一直继续使用,即使系统中的节点下线、宕机或数据损坏也是如。

06 | 如何区分批处理还是流处理?

  • 无边界数据(Unbounded Data):是一种不断增长,可以说是无限的数据集。
  • 有边界数据(Bounded Data):是一种有限的数据集。
  • 事件时间(Event Time):指的是一个数据实际产生的时间点。
  • 处理时间(Precessing Time):指的是处理数据的系统架构实际接收到这个数据的时间点。
  • 批处理:绝大部分情况下,批处理的输入数据都是有边界数据,同样的,输出结果也一样是有边界数据。所以在批处理中,我们所关心的更多会是数据的事件时间。
    • 应用场景:
      • 日志分析:日志系统是在一定时间段(日,周或年)内收集的,而日志的数据处理分析是在不同的时间内执行,以得出有关系统的一些关键性能指标。
      • 计费应用程序:计费应用程序会计算出一段时间内一项服务的使用程度,并生成计费信息,例如银行在每个月末生成的信用卡还款单。
      • 数据仓库:数据仓库的主要目标是根据收集好的数据事件时间,将数据信息合并为静态快照 (static snapshot),并将它们聚合为每周、每月、每季度的报告等。
  • 流处理:流处理的输入数据基本上都是无边界数据。
    • 应用场景
      • 实时监控:捕获和分析各种来源发布的数据,如传感器,新闻源,点击网页等。
      • 实时商业智能:智能汽车,智能家居,智能病人护理等。
      • 销售终端(POS)系统:像是股票价格的更新,允许用户实时完成付款的系统等。

07 | Workflow 设计模式:让你在大规模数据世界中君临天下

08 | 发布/订阅模式:流处理架构中的瑞士军刀

09 丨 CAP 定理:三选二,架构师必须学会的取舍

10 丨 Lambda 架构:Twitter 亿级实时数据分析架构背后的倚天剑

Lambda 架构总共由三层系统组成:批处理层(Batch Layer),速度处理层(Speed Layer),以及用于响应查询的服务层(Serving Layer)。

11 丨 Kappa 架构:利用 Kafka 锻造的屠龙刀

12 | 我们为什么需要 Spark?

MapReduce 的缺点:

  • 高昂的维护成本
  • 时间性能“达不到”用户的期待
  • MapReduce 模型的抽象层次低
  • 只提供 Map 和 Reduce 两个操作
  • 在 Hadoop 中,每一个 Job 的计算结果都会存储在 HDFS 文件存储系统中,所以每一步计算都要进行硬盘的读取和写入,大大增加了系统的延迟。
  • 只支持批处理

Spark 的优点

  • 性能比 MapReduce 高很多
  • Spark 提供了很多对 RDD 的操作,如 Map、Filter、flatMap、groupByKey 和 Union 等等,极大地提升了对各种复杂场景的支持

13 丨弹性分布式数据集:Spark 大厦的地基(上)

Spark 最基本的数据抽象是弹性分布式数据集(Resilient Distributed Dataset)

RDD 表示已被分区、不可变的,并能够被并行操作的数据集合。

14 丨弹性分布式数据集:Spark 大厦的地基(下)

15 丨 SparkSQL:Spark 数据查询的利器

16 | Spark Streaming:Spark 的实时流计算 API

Spark Streaming 用时间片拆分了无限的数据流,然后对每一个数据片用类似于批处理的方法进行处理,输出的数据也是一块一块的

17 | Structured Streaming:如何用 DataFrame API 进行实时数据分析?

18 丨 WordCount:从零开始运行你的第一个 Spark 应用

19 丨综合案例实战:处理加州房屋信息,构建线性回归模型

20 丨流处理案例实战:分析纽约市出租车载客信息


读到此处,感觉收获甚少,暂时搁置阅读。

参考资料

《从 0 开始学大数据》笔记

预习模块

01 丨预习 01 丨大数据技术发展史:大数据的前世今生

大数据技术,起源于 Google 在 2004 年前后发表的三篇论文:

Doug Cutting 根据 Google 论文开发了 Hadoop。

大数据处理的主要应用场景包括数据分析、数据挖掘与机器学习。

数据分析主要使用 Hive、Spark SQL 等 SQL 引擎完成;

数据挖掘与机器学习则有专门的机器学习框架 TensorFlow、Mahout 以及 MLlib 等,内置了主要的机器学习和数据挖掘算法。

大数据要存入分布式文件系统(HDFS),要有序调度 MapReduce 和 Spark 作业执行,并能把执行结果写入到各个应用系统的数据库中,还需要有一个大数据平台整合所有这些大数据组件和企业应用系统。

02 丨预习 02 丨大数据应用发展史:从搜索引擎到人工智能

大数据的应用领域:

  • 搜索引擎:GFS 和 MapReduce 开启了超大规模的分布式存储和分布式计算应用。
  • 数据仓库:Hive 实现了用更低廉的价格获得比以往多得多的数据存储与计算能力。
  • 数据挖掘:基于海量数据进行关联分析。应用有:关联推荐、用户画像、关系图谱
  • 机器学习:有了大数据,可以把全部的历史数据都收集起来,统计其规律,进而预测正在发生的事情。

03 丨预习 03 丨大数据应用领域:数据驱动一切

大数据的行业应用:

  • 医疗健康领域
    • 医学影像智能识别
    • 病历大数据智能诊疗
  • 教育领域
    • AI 外语老师
    • 智能解题
  • 社交媒体领域:舆情监控与分析
  • 金融领域:大数据风控
  • 新零售领域:全链路管理
  • 交通领域
    • 实时采集监控数据
    • 判断道路拥堵状态
    • 无人驾驶技术

模块一、Hadoop 大数据原理与架构

04 | 移动计算比移动数据更划算

传统计算模型:输入 -> 计算 -> 输出,面对海量数据(TB 级甚至 PB 级),无法应对。

移动计算将程序分发到数据所在的地方进行计算,也就是所谓的移动计算比移动数据更划算。

移动计算步骤:

  1. 将待处理的大规模数据存储在服务器集群的所有服务器上,主要使用 HDFS 分布式文件存储系统,将文件分成很多块(Block),以块为单位存储在集群的服务器上。
  2. 大数据引擎根据集群里不同服务器的计算能力,在每台服务器上启动若干分布式任务执行进程,这些进程会等待给它们分配执行任务。
  3. 使用大数据计算框架支持的编程模型进行编程,比如 Hadoop 的 MapReduce 编程模型,或者 Spark 的 RDD 编程模型。应用程序编写好以后,将其打包,MapReduce 和 Spark 都是在 JVM 环境中运行,所以打包出来的是一个 Java 的 JAR 包。
  4. 用 Hadoop 或者 Spark 的启动命令执行这个应用程序的 JAR 包,首先执行引擎会解析程序要处理的数据输入路径,根据输入数据量的大小,将数据分成若干片(Split),每一个数据片都分配给一个任务执行进程去处理。
  5. 任务执行进程收到分配的任务后,检查自己是否有任务对应的程序包,如果没有就去下载程序包,下载以后通过反射的方式加载程序。走到这里,最重要的一步,也就是移动计算就完成了。
  6. 加载程序后,任务执行进程根据分配的数据片的文件地址和数据在文件内的偏移量读取数据,并把数据输入给应用程序相应的方法

05 | 从 RAID 看垂直伸缩到水平伸缩的演化

海量数据存储核心问题

  • 数据存储容量的问题。既然大数据要解决的是数以 PB 计的数据计算问题,而一般的服务器磁盘容量通常 1 ~ 2TB,那么如何存储这么大规模的数据呢?
  • 数据读写速度的问题。一般磁盘的连续读写速度为几十 MB,以这样的速度,几十 PB 的数据恐怕要读写到天荒地老。
  • 数据可靠性的问题。磁盘大约是计算机设备中最易损坏的硬件了,通常情况一块磁盘使用寿命大概是一年,如果磁盘损坏了,数据怎么办?

解决方式是 RAID 技术:

  • 数据存储容量的问题。RAID 使用了 N 块磁盘构成一个存储阵列,如果使用 RAID 5,数据就可以存储在 N-1 块磁盘上,这样将存储空间扩大了 N-1 倍。
  • 数据读写速度的问题。RAID 根据可以使用的磁盘数量,将待写入的数据分成多片,并发同时向多块磁盘进行写入,显然写入的速度可以得到明显提高;同理,读取速度也可以得到明显提高。不过,需要注意的是,由于传统机械磁盘的访问延迟主要来自于寻址时间,数据真正进行读写的时间可能只占据整个数据访问时间的一小部分,所以数据分片后对 N 块磁盘进行并发读写操作并不能将访问速度提高 N 倍。
  • 数据可靠性的问题。使用 RAID 10、RAID 5 或者 RAID 6 方案的时候,由于数据有冗余存储,或者存储校验信息,所以当某块磁盘损坏的时候,可以通过其他磁盘上的数据和校验数据将丢失磁盘上的数据还原。

实现更强的计算能力和更大规模的数据存储有两种思路

  • 垂直伸缩(scaling up),即硬件升级
  • 水平伸缩(scaling out),即分布式系统

注:RAID 技术就是采用了垂直伸缩的方式。

06 | 新技术层出不穷,HDFS 依然是存储的王者

img

HDFS 有两个关键组件:DataNode 和 NameNode:

  • DataNode 负责文件数据的存储和读写操作,HDFS 将文件数据分割成若干数据块(Block),每个 DataNode 存储一部分数据块,这样文件就分布存储在整个 HDFS 服务器集群中。应用程序客户端(Client)可以并行对这些数据块进行访问,从而使得 HDFS 可以在服务器集群规模上实现数据并行访问,极大地提高了访问速度。
  • NameNode 负责整个分布式文件系统的元数据(MetaData)管理,也就是文件路径名、数据块的 ID 以及存储位置等信息,相当于操作系统中文件分配表(FAT)的角色。

为保证高可用,HDFS 会,会将一个数据块复制为多份(默认为 3 份),并将多份相同的数据块存储在不同的服务器上,甚至不同的机架上。

img

HDFS 故障容错:

  • 数据存储故障容错
    • 对于 DataNode 上的数据块进行计算并存储校验和(CheckSum)
    • 读取数据的时候,重新计算读取出来的数据的校验和,校验和不正确,则抛出异常
    • 发现异常后,从其他 DataNode 读取备份
  • 磁盘故障容错
    • 如果 DataNode 监测到本机磁盘损坏,将该磁盘的所有数据块 ID 报告给 NameNode
    • NameNode 检查这些数据块在哪些 DataNode 上有备份,复制一份到其他 DataNode 上
  • DataNode 故障容错
    • DataNode 会通过心跳和 NameNode 保持通信
    • 如果 DataNode 超时未发送心跳,NameNode 视其为宕机
    • NameNode 查找这个 DataNode 存储的所有数据块,复制一份到其他 DataNode 上
  • NameNode 故障容错
    • 基于 ZooKeeper 实现主从备份
    • 争夺 znode 锁

07 | 为什么说 MapReduce 既是编程模型又是计算框架?

MapReduce 既是编程模型,又是计算框架

MapReduce 编程模型只包含 Map 和 Reduce 两个过程,map 的主要输入是一对 <Key, Value> 值,经过 map 计算后输出一对 <Key, Value> 值;然后将相同 Key 合并,形成 <Key, Value 集合>;再将这个 <Key, Value 集合> 输入 reduce,经过计算输出零个或多个 <Key, Value> 对。

08 | MapReduce 如何让数据完成一次旅行?

MapReduce 作业启动和运行机制

大数据应用进程。这类进程是启动 MapReduce 程序的主入口,主要是指定 Map 和 Reduce 类、输入输出文件路径等,并提交作业给 Hadoop 集群,也就是下面提到的 JobTracker 进程。这是由用户启动的 MapReduce 程序进程,比如我们上期提到的 WordCount 程序。

JobTracker 进程。这类进程根据要处理的输入数据量,命令下面提到的 TaskTracker 进程启动相应数量的 Map 和 Reduce 进程任务,并管理整个作业生命周期的任务调度和监控。这是 Hadoop 集群的常驻进程,需要注意的是,JobTracker 进程在整个 Hadoop 集群全局唯一。

TaskTracker 进程。这个进程负责启动和管理 Map 进程以及 Reduce 进程。因为需要每个数据块都有对应的 map 函数,TaskTracker 进程通常和 HDFS 的 DataNode 进程启动在同一个服务器。也就是说,Hadoop 集群中绝大多数服务器同时运行 DataNode 进程和 TaskTracker 进程。

MapReduce 数据合并与连接机制

在 map 输出与 reduce 输入之间,MapReduce 计算框架处理数据合并与连接操作,这个操作有个专门的词汇叫 shuffle。分布式计算需要将不同服务器上的相关数据合并到一起进行下一步计算,这就是 shuffle。

09 | 为什么我们管 Yarn 叫作资源调度框架?

服务器集群资源调度管理和 MapReduce 执行过程耦合在一起,如果想在当前集群中运行其他计算任务,比如 Spark 或者 Storm,就无法统一使用集群中的资源了。

Yarn 包括两个部分:

ResourceManager 进程负责整个集群的资源调度管理,通常部署在独立的服务器上;

NodeManager 进程负责具体服务器上的资源和任务管理,在集群的每一台计算服务器上都会启动,基本上跟 HDFS 的 DataNode 进程一起出现。

Yarn 的工作流程

  1. 我们向 Yarn 提交应用程序,包括 MapReduce ApplicationMaster、我们的 MapReduce 程序,以及 MapReduce Application 启动命令。
  2. ResourceManager 进程和 NodeManager 进程通信,根据集群资源,为用户程序分配第一个容器,并将 MapReduce ApplicationMaster 分发到这个容器上面,并在容器里面启动 MapReduce ApplicationMaster。
  3. MapReduce ApplicationMaster 启动后立即向 ResourceManager 进程注册,并为自己的应用程序申请容器资源。
  4. MapReduce ApplicationMaster 申请到需要的容器后,立即和相应的 NodeManager 进程通信,将用户 MapReduce 程序分发到 NodeManager 进程所在服务器,并在容器中运行,运行的就是 Map 或者 Reduce 任务。
  5. Map 或者 Reduce 任务在运行期和 MapReduce ApplicationMaster 通信,汇报自己的运行状态,如果运行结束,MapReduce ApplicationMaster 向 ResourceManager 进程注销并释放所有的容器资源。

10 | 模块答疑:我们能从 Hadoop 学到什么?

Hadoop 几个主要产品的架构设计,就会发现它们都有相似性,都是一主多从的架构方案。

  • HDFS,一个 NameNode,多个 DataNode;
  • MapReduce,一个 JobTracker,多个 TaskTracker;
  • Yarn,一个 ResourceManager,多个 NodeManager。

事实上,很多大数据产品都是这样的架构方案:

Storm,一个 Nimbus,多个 Supervisor;

Spark,一个 Master,多个 Slave。

大数据因为要对数据和计算任务进行统一管理,所以和互联网在线应用不同,需要一个全局管理者。一言以蔽之:集中管理,分布存储与计算

模块二、大数据生态体系主要产品原理与架构

11 | Hive 是如何让 MapReduce 实现 SQL 操作的?

Hive 能够直接处理我们输入的 SQL 语句(Hive 的 SQL 语法和数据库标准 SQL 略有不同),调用 MapReduce 计算框架完成数据分析操作。

我们通过 Hive 的 Client(Hive 的命令行工具,JDBC 等)向 Hive 提交 SQL 命令。如果是创建数据表的 DDL(数据定义语言),Hive 就会通过执行引擎 Driver 将数据表的信息记录在 Metastore 元数据组件中,这个组件通常用一个关系数据库实现,记录表名、字段名、字段类型、关联 HDFS 文件路径等这些数据库的 Meta 信息(元信息)。

如果我们提交的是查询分析数据的 DQL(数据查询语句),Driver 就会将该语句提交给自己的编译器 Compiler 进行语法分析、语法解析、语法优化等一系列操作,最后生成一个 MapReduce 执行计划。然后根据执行计划生成一个 MapReduce 的作业,提交给 Hadoop MapReduce 计算框架处理。

12 | 我们并没有觉得 MapReduce 速度慢,直到 Spark 出现

RDD 是 Spark 的核心概念,是弹性数据集(Resilient Distributed Datasets)的缩写。RDD 既是 Spark 面向开发者的编程模型,又是 Spark 自身架构的核心元素。

Spark 上编写 WordCount 程序,主要代码只需要三行

1
2
3
4
5
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

MapReduce 针对输入数据,将计算过程分为两个阶段,一个 Map 阶段,一个 Reduce 阶段,可以理解成是面向过程的大数据计算。

而 Spark 则直接针对数据进行编程,将大规模数据集合抽象成一个 RDD 对象,然后在这个 RDD 上进行各种计算处理,得到一个新的 RDD,继续计算处理,直到得到最后的结果数据。所以 Spark 可以理解成是面向对象的大数据计算。

RDD 上定义的函数分两种,一种是转换(transformation)函数,这种函数的返回值还是 RDD;另一种是执行(action)函数,这种函数不再返回 RDD。

RDD 定义了很多转换操作函数,比如有计算 map(func)、过滤 filter(func)、合并数据集 union(otherDataset)、根据 Key 聚合 reduceByKey(func, [numPartitions])、连接数据集 join(otherDataset, [numPartitions])、分组 groupByKey([numPartitions]) 等十几个函数。

13 | 同样的本质,为何 Spark 可以更高效?

Spark 有三个主要特性:RDD 的编程模型更简单,DAG 切分的多阶段计算过程更快速,使用内存存储中间计算结果更高效。这三个特性使得 Spark 相对 Hadoop MapReduce 可以有更快的执行速度,以及更简单的编程实现。

14 | BigTable 的开源实现:HBase

HBase 可伸缩架构

HBase 的伸缩性主要依赖其可分裂的 HRegion 及可伸缩的分布式文件系统 HDFS 实现。

HRegion 是 HBase 负责数据存储的主要进程,应用程序对数据的读写操作都是通过和 HRegion 通信完成。上面是 HBase 架构图,我们可以看到在 HBase 中,数据以 HRegion 为单位进行管理,也就是说应用程序如果想要访问一个数据,必须先找到 HRegion,然后将数据读写操作提交给 HRegion,由 HRegion 完成存储层面的数据操作。

HRegionServer 是物理服务器,每个 HRegionServer 上可以启动多个 HRegion 实例。当一个 HRegion 中写入的数据太多,达到配置的阈值时,一个 HRegion 会分裂成两个 HRegion,并将 HRegion 在整个集群中进行迁移,以使 HRegionServer 的负载均衡。

每个 HRegion 中存储一段 Key 值区间 [key1, key2) 的数据,所有 HRegion 的信息,包括存储的 Key 值区间、所在 HRegionServer 地址、访问端口号等,都记录在 HMaster 服务器上。为了保证 HMaster 的高可用,HBase 会启动多个 HMaster,并通过 ZooKeeper 选举出一个主服务器。

应用程序通过 ZooKeeper 获得主 HMaster 的地址,输入 Key 值获得这个 Key 所在的 HRegionServer 地址,然后请求 HRegionServer 上的 HRegion,获得所需要的数据。

HRegion 会把数据存储在若干个 HFile 格式的文件中,这些文件使用 HDFS 分布式文件系统存储,在整个集群内分布并高可用。当一个 HRegion 中数据量太多时,这个 HRegion 连同 HFile 会分裂成两个 HRegion,并根据集群中服务器负载进行迁移。如果集群中有新加入的服务器,也就是说有了新的 HRegionServer,由于其负载较低,也会把 HRegion 迁移过去并记录到 HMaster,从而实现 HBase 的线性伸缩。

HBase 可扩展数据模型

支持列族结构的 NoSQL 数据库,在创建表的时候,只需要指定列族的名字,无需指定字段(Column)。那什么时候指定字段呢?可以在数据写入时再指定。通过这种方式, 数据表可以包含数百万的字段,这样就可以随意扩展应用程序的数据结构了。并且这种数据库在查询时也很方便,可以通过指定任意字段名称和值进行查询。

HBase 这种列族的数据结构设计,实际上是把字段的名称和字段的值,以 Key-Value 的方式一起存储在 HBase 中。实际写入的时候,可以随意指定字段名称,即使有几百万个字段也能轻松应对。

HBase 的高性能存储

HBase 使用了一种叫作 LSM 树(Log 结构合并树)的数据结构进行数据存储。数据写入的时候以 Log 方式连续写入,然后异步对磁盘上的多个 LSM 树进行合并。

LSM 树可以看作是一个 N 阶合并树。数据写操作(包括插入、修改、删除)都在内存中进行,并且都会创建一个新记录(修改会记录新的数据值,而删除会记录一个删除标志)。这些数据在内存中仍然还是一棵排序树,当数据量超过设定的内存阈值后,会将这棵排序树和磁盘上最新的排序树合并。当这棵排序树的数据量也超过设定阈值后,会和磁盘上下一级的排序树合并。合并过程中,会用最新更新的数据覆盖旧的数据(或者记录为不同版本)。

15 | 流式计算的代表:Storm、Flink、Spark Streaming

16 | ZooKeeper 是如何保证数据一致性的?

分布式系统中的“脑裂”是指一个系统中的节点被分隔成两个或多个独立的部分,这些部分无法互相通信,导致系统出现不一致性和数据丢失的问题。通常情况下,“脑裂”是由于网络故障、硬件故障或者软件故障等因素导致的。

包括 HDFS 在内的很多大数据技术都选择了使用 ZooKeeper 来解决多台服务器的状态一致性问题。

ZooKeeper 使用了一种叫 ZAB 算法来解决一致性问题。ZAB 可视为 Paxos 算法的一种简化方案。

17 丨模块答疑:这么多技术,到底都能用在什么场景里?

大数据技术在实际部署的时候,通常会部署在同一个集群中,也就是说,在由很多台服务器组成的服务器集群中,某台服务器可能运行着 HDFS 的 DataNode 进程,负责 HDFS 的数据存储;同时也运行着 Yarn 的 NodeManager,负责计算资源的调度管理;而 MapReduce、Spark、Storm、Flink 这些批处理或者流处理大数据计算引擎则通过 Yarn 的调度,运行在 NodeManager 的容器(container)里面。至于 Hive、Spark SQL 这些运行在 MapReduce 或者 Spark 基础上的大数据仓库引擎,在经过自身的执行引擎将 SQL 语句解析成 MapReduce 或者 Spark 的执行计划以后,一样提交给 Yarn 去调度执行。

模块三、大数据开发实践

18 | 如何自己开发一个大数据 SQL 引擎?

19 | Spark 的性能优化案例分析(上)

性能指标:

  • 响应时间:完成一次任务(请求)花费的时间。
  • 并发数:同时处理的任务数(请求数)。
  • 吞吐量:单位时间完成的任务数(请求数、事务数、查询数……)。
  • 性能计数器:System Load,线程数,进程数,CPU、内存、磁盘、网络使用率等。

Spark 性能优化可以分解为下面几步。

  1. 性能测试,观察 Spark 性能特性和资源(CPU、Memory、Disk、Net)利用情况。
  2. 分析、寻找资源瓶颈。
  3. 分析系统架构、代码,发现资源利用关键所在,思考优化策略。
  4. 代码、架构、基础设施调优,优化、平衡资源利用。
  5. 性能测试,观察系统性能特性,是否达到优化目的,以及寻找下一个瓶颈点。

20 | Spark 的性能优化案例分析(下)

21 | 从阿里内部产品看海量数据处理系统的设计(上):Doris 的立项

22 | 从阿里内部产品看海量数据处理系统的设计(下):架构与创新

23 | 大数据基准测试可以带来什么好处?

大数据基准测试工具:

HiBench

24 丨从大数据性能测试工具 Dew 看如何快速开发大数据系统

25 | 模块答疑:我能从大厂的大数据开发实践中学到什么?

学习层次

  1. 练习
  2. 应用
  3. 开发

模块四、大数据平台与系统集成

26 | 互联网产品 + 大数据产品 = 大数据平台

  • 数据采集:数据库同步通常用 Sqoop,日志同步可以选择 Flume,打点采集的数据经过格式化转换后通过 Kafka 等消息队列进行传递
  • 数据处理:离线计算:MapReduce、Hive、Spark;实时计算:Storm、Spark Streaming、Flink
  • 数据展示:Lambda 架构

27 | 大数据从哪里来?

大数据平台的数据来源主要有数据库、日志、前端程序埋点、爬虫系统。

  • 数据库导入
    • Sqoop:Sqoop 是一个数据库批量导入导出工具,可以将关系数据库的数据批量导入到 Hadoop,也可以将 Hadoop 的数据导出到关系数据库。
    • Canal:Canal 是阿里巴巴开源的一个 MySQL binlog 获取工具,binlog 是 MySQL 的事务日志,可用于 MySQL 数据库主从复制,Canal 将自己伪装成 MySQL 从库,从 MySQL 获取 binlog。
  • 日志文件导入
    • Flume:Flume 是大数据日志收集常用的工具。
  • 前端程序埋点
    • 手动埋点
    • 自动埋点
  • 爬虫

28 | 知名大厂如何搭建大数据平台?

淘宝大数据平台

美团大数据平台

滴滴大数据平台

29 | 盘点可供中小企业参考的商业大数据平台

大数据解决方案提供商

CDH、TDH

CDH 是一个大数据集成平台,将主流大数据产品都集成到这个平台中,企业可以使用 CDH 一站式部署整个大数据技术栈。从架构分层角度,CDH 可以分为 4 层:系统集成,大数据存储,统一服务,过程、分析与计算。

  1. 系统集成:数据库导入导出用 Sqoop,日志导入导出用 Flume,其他实时数据导入导出用 Kafka。
  2. 大数据存储:文件系统用 HDFS,结构化数据用 Kudu,NoSQL 存储用 HBase,其他还有对象存储。
  3. 统一服务:资源管理用 Yarn,安全管理用 Sentry 和 RecordService 细粒度地管理不同用户数据的访问权限。
  4. 过程、分析与计算:批处理计算用 MapReduce、Spark、Hive、Pig,流计算用 SparkStreaming,快速 SQL 分析用 Impala,搜索服务用 Solr。

大数据云计算服务商

阿里云、亚马逊

大数据 SaaS 服务商

友盟、神策、百度统计

大数据开放平台

30 | 当大数据遇上物联网

  1. 智能网关通过消息队列将数据上传到物联网大数据平台,Storm 等流式计算引擎从消息队列获取数据,对数据的处理分三个方面。数据进行清理转换后写入到大数据存储系统。调用规则和机器学习模型,对上传数据进行计算,如果触发了某种执行规则,就将控制信息通过设备管理服务器下发给智能网关,并进一步控制终端智能设备。
  2. Spark 等离线计算引擎定时对写入存储系统的数据进行批量计算处理,进行全量统计分析和机器学习,并更新机器学习模型。
  3. 应用程序也可以通过设备管理服务器直接发送控制指令给智能网关,控制终端智能设备。

31 | 模块答疑:为什么大数据平台至关重要?

模块五、大数据分析与运营

32 | 互联网运营数据指标与可视化监控

运营常用数据指标

  • 新增用户数
  • 用户留存率
  • 活跃用户数
  • PV(Page View)
  • GMV(Gross Merchandise Volume),即成交总金额
  • 转化率 = 付费用户数 / 总用户数

33 丨一个电商网站订单下降的数据分析案例

34 丨 A-B 测试与灰度发布必知必会

A/B 测试的过程

A/B 测试的系统架构

A/B 测试系统最重要的是能够根据用户 ID(或者设备 ID)将实验配置参数分发给应用程序,应用程序根据配置参数决定给用户展示的界面和执行的业务逻辑

灰度发布

35 丨如何利用大数据成为“增长黑客”?

AARRR 用户增长模型:它描述了用户增长的 5 个关键环节,分别是:获取用户(Acquisition)、提高活跃度(Activation)、提高留存率(Retention)、获取收入(Revenue)和自传播(Refer)。

  • 获取用户:通过各种推广手段,使产品触达用户并吸引用户,让用户访问我们的产品。
  • 提高活跃度:用户访问我们的产品后,如果发现没意思、体验差,就很难再次打开,产品的价值也就无法实现。因此需要结合产品内容、运营活动各种手段吸引用户,提升产品的活跃度。
  • 提高留存率:留住一个老用户的成本远低于获取一个新用户,而真正为产品带来营收利润的通常是老用户,因此需要提高留存率。提高留存率的常用手段有:针对老用户推出各种优惠和活动;建立会员等级体系,注册时间越长等级越高;对于一段时间没有访问的疑似流失用户进行消息短信推送以实现用户挽回等。
  • 获取收入:做企业不是做慈善,开发、运营互联网产品的最终目的还是为了赚钱,即获取收入。互联网产品收入主要有用户付费和广告收入,有些互联网产品看起来是用户付费,但其实主要营收是广告收入,比如淘宝。
  • 自传播:让用户利用利用自己的社交网络进行产品推广就是自传播,几乎所有的互联网产品都有“分享到”这样一个功能按钮,促进用户社交传播。有些产品还会利用“帮我砍价”“帮我抢票”等产品功能推动用户进行分享,实现产品的裂变式传播、病毒式营销。

增长用户的手段主要有:

利用用户画像定位用户群体

  • 通过用户分析挽回用户
  • A/B 测试决定产品功能
  • 大数据反欺诈、反羊毛
  • 用户生命周期管理

36 丨模块答疑:为什么说数据驱动运营?

模块六、大数据算法

37 丨如何对数据进行分类和预测?

KNN 算法:KNN 算法,即 K 近邻(K Nearest Neighbour)算法,是一种基本的分类算法。其主要原理是:对于一个需要分类的数据,将其和一组已经分类标注好的样本集合进行比较,得到距离最近的 K 个样本,K 个样本最多归属的类别,就是这个需要分类数据的类别。

数据的距离:

  • 欧氏距离:计算空间距离
  • 余弦相似度:计算向量的夹角。更关注数据的相似性

文本的特征值:

  • TF-IDF 算法:TF 与 IDF 的乘积就是 TF-IDF。
    • TF 是词频(Term Frequency),表示某个单词在文档中出现的频率,一个单词在一个文档中出现的越频繁,TF 值越高。
    • IDF 是逆文档频率(Inverse Document Frequency),表示这个单词在所有文档中的稀缺程度,越少文档出现这个词,IDF 值越高。

贝叶斯分类:贝叶斯公式是一种基于条件概率的分类算法,如果我们已经知道 A 和 B 的发生概率,并且知道了 B 发生情况下 A 发生的概率,可以用贝叶斯公式计算 A 发生的情况下 B 发生的概率。

38 丨如何发掘数据之间的关系?

搜索排序:Google PageRank 算法

关联分析:

  • Apriori 算法:Apriori 算法极大地降低了需要计算的商品组合数目,这个算法的原理是,如果一个商品组合不满足最小支持度,那么所有包含这个商品组合的其他商品组合也不满足最小支持度。所以从最小商品组合,也就是一件商品开始计算最小支持度,逐渐迭代,进而筛选出所有满足最小支持度的频繁模式。其步骤如下:
    1. 设置最小支持度阈值。
    2. 寻找满足最小支持度的单件商品,也就是单件商品出现在所有订单中的概率不低于最小支持度。
    3. 从第 2 步找到的所有满足最小支持度的单件商品中,进行两两组合,寻找满足最小支持度的两件商品组合,也就是两件商品出现在同一个订单中概率不低于最小支持度。
    4. 从第 3 步找到的所有满足最小支持度的两件商品,以及第 2 步找到的满足最小支持度的单件商品进行组合,寻找满足最小支持度的三件商品组合。
    5. 以此类推,找到所有满足最小支持度的商品组合。

聚类:聚类就是对一批数据进行自动归类。

K-means 算法

39 丨如何预测用户的喜好?

基于人口统计的推荐:基于人口统计的推荐是相对比较简单的一种推荐算法,根据用户的基本信息进行分类,然后将商品推荐给同类用户。

基于产品属性的推荐:基于用户的属性进行分类,然后根据同类用户的行为进行推荐。而基于商品属性的推荐则是将商品的属性进行分类,然后根据用户的历史行为进行推荐。

基于用户的协同过滤推荐:基于用户的协同过滤推荐是根据用户的喜好进行用户分类,常用的就是 KNN 算法,寻找和当前用户喜好最相近的 K 个用户,然后根据这些用户的喜好为当前用户进行推荐。

基于商品的协同过滤推荐:根据用户的喜好对商品进行分类,如果两个商品,喜欢它们的用户具有较高的重叠性,就认为它们的距离相近,划分为同类商品,然后进行推荐

40 丨机器学习的数学原理是什么?

样本:样本就是通常我们常说的“训练数据”,包括输入和结果两部分。

模型:模型就是映射样本输入与样本结果的函数,可能是一个条件概率分布,也可能是一个决策函数。

算法:算法就是要从模型的假设空间中寻找一个最优的函数,使得样本空间的输入 X 经过该函数的映射得到的 f(X),和真实的 Y 值之间的距离最小。这个最优的函数通常没办法直接计算得到,即没有解析解,需要用数值计算的方法不断迭代求解。因此如何寻找到 f 函数的全局最优解,以及使寻找过程尽量高效,就构成了机器学习的算法。

41 丨从感知机到神经网络算法

42 丨模块答疑:软件工程师如何进入人工智能领域?

斯坦福大学的机器学习公开课

参考资料

《机器学习 40 讲》笔记

开篇词 | 打通修炼机器学习的任督二脉

“机器学习”分为 3 个模块

  • 机器学习概观:介绍机器学习中超脱于具体模型和方法之上的一些共性问题
  • 统计学习(频率学派):利用不同的模型去拟合数据背后的规律;用拟合出的规律去推断和预测未知的结果
  • 符号学习(贝叶斯学派):即概率图模型,它计算的是变量间的相关关系,每个遍历的先验分布和大量复杂的积分技巧。

01 丨频率视角下的机器学习

频率学派认为概率是随机事件发生频率的极限值;

频率学派执行参数估计时,视参数为确定取值,视数据为随机变量;

频率学派主要使用最大似然估计法,让数据在给定参数下的似然概率最大化;

频率学派对应机器学习中的统计学习,以经验风险最小化作为模型选择的准则。

02 | 贝叶斯视角下的机器学习

贝叶斯学派认为概率是事件的可信程度或主体对事件的信任程度;

贝叶斯学派执行参数估计时,视参数为随机变量,视数据为确定取值;

贝叶斯学派主要使用最大后验概率法,让参数在先验信息和给定数据下的后验概率最大化;

贝叶斯学派对应机器学习中的概率图模型,可以在模型预测和选择中提供更加完整的信息。

03 丨学什么与怎么学

什么样的问题才能通过机器学习来解决呢?

首先,问题不能是完全随机的,需要具备一定的模式;

其次,问题本身不能通过纯计算的方法解决;

再次,有大量的数据可供使用。

机器学习的任务,就是使用数据计算出与目标函数最接近的假设,或者说拟合出最精确的模型 。

输入特征类型

  • 具体特征(concrete feature)
  • 原始特征(raw feature)
  • 抽象特征(abstract feature)

机器学习方法类型

  • 分类算法(classification)
  • 回归算法(regression)
  • 标注算法(tagging)

如果训练数据中的每组输入都有其对应的输出结果,这类学习任务就是监督学习(supervised learning),对没有输出的数据进行学习则是无监督学习(unsupervised learning)。监督学习具有更好的预测精度,无监督学习则可以发现数据中隐含的结构特性,起到的也是分类的作用,只不过没有给每个类别赋予标签而已。无监督学习可以用于对数据进行聚类或者密度估计,也可以完成异常检测这类监督学习中的预处理操作。直观地看,监督学习适用于预测任务,无监督学习适用于描述任务。

04 丨计算学习理论

Spring Data 综合

Spring Data Repository 抽象的目标是显著减少各种访问持久化存储的样板式代码。

核心概念

Repository 是 Spring Data 的核心接口。此接口主要用作标记接口,以捕获要使用的类型并帮助您发现扩展此接口的接口。CrudRepositoryListCrudRepository 接口为被管理的实体类提供复杂的 CRUD 功能。ListCrudRepository 提供等效方法,但它们返回 List,而 CrudRepository 方法返回 Iterable

CrudRepository 接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface CrudRepository<T, ID> extends Repository<T, ID> {

<S extends T> S save(S entity);

Optional<T> findById(ID primaryKey);

Iterable<T> findAll();

long count();

void delete(T entity);

boolean existsById(ID primaryKey);

// … more functionality omitted.
}

Spring Data 项目也提供了一些特定持久化技术的抽象接口,如:JpaRepository 或 MongoRepository。这些接口扩展了 CrudRepository 并暴露了一些持久化技术的底层功能。

除了 CrudRepository 之外,还有一个 PagingAndSortingRepository 接口,它添加了额外的方法来简化对实体的分页访问:

1
2
3
4
5
6
public interface PagingAndSortingRepository<T, ID>  {

Iterable<T> findAll(Sort sort);

Page<T> findAll(Pageable pageable);
}

【示例】要按页面大小 20 访问 User 的第二页,可以执行如下操作

1
2
PagingAndSortingRepository<User, Long> repository = // … get access to a bean
Page<User> users = repository.findAll(PageRequest.of(1, 20));

除了查询方法之外,计数和删除时的查询也是可用的。

【示例】根据姓氏计数

1
2
3
interface UserRepository extends CrudRepository<User, Long> {
long countByLastname(String lastname);
}

【示例】根据姓氏删除

1
2
3
4
5
6
interface UserRepository extends CrudRepository<User, Long> {

long deleteByLastname(String lastname);

List<User> removeByLastname(String lastname);
}

查询方法

使用 Spring Data 对数据库进行查询有以下四步:

  1. 声明一个扩展 Repository 或其子接口的接口,并指定泛型类型(实体类和 ID 类型),如以下示例所示:

    1
    interface PersonRepository extends Repository<Person, Long> { … }
  2. 在接口中声明查询方法

    1
    2
    3
    interface PersonRepository extends Repository<Person, Long> {
    List<Person> findByLastname(String lastname);
    }
  3. 使用 JavaConfigXML 配置为这些接口创建代理实例

    1
    2
    @EnableJpaRepositories
    class Config { … }
  4. 注入 Repository 实例并使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    class SomeClient {

    private final PersonRepository repository;

    SomeClient(PersonRepository repository) {
    this.repository = repository;
    }

    void doSomething() {
    List<Person> persons = repository.findByLastname("Matthews");
    }
    }

定义 Repository

首先需要定义一个 Repository 接口,该接口必须扩展 Repository 并且指定泛型类型(实体类和 ID 类型)。如果想为该实体暴露 CRUD 方法,可以扩展 CrudRepository 接口。

微调 Repository 定义

Spring Data 提供了很多种 Repository 以应对不同的需求场景。

CrudRepository 提供了 CRUD 功能。

ListCrudRepositoryCrudRepository 类似,但对于那些返回多个实体的方法,它返回一个 List 而不是 Iterable,这样使用可能更方便。

如果使用响应式框架,可以使用 ReactiveCrudRepositoryRxJava3CrudRepository

CoroutineCrudRepository 支持 Kotlin 的协程特性。

PagingAndSortingRepository 提供了分页、排序功能。

如果不想扩展 Spring Data 接口,还可以使用 @RepositoryDefinition 注释您的 Repository 接口。 扩展一个 CRUD Repository 接口,需要暴露一组完整的方法来操作实体。如果希望对暴露的方法有选择性,可以将要暴露的方法从 CRUD Repository 复制到自定义的 Repository 中。 这样做时,可以更改方法的返回类型。 如果可能,Spring Data 将遵循返回类型。 例如,对于返回多个实体的方法,可以选择 Iterable<T>List<T>Collection<T>VAVR 列表。

自定义基础 Repository 接口,必须用 @NoRepositoryBean 标记。 这可以防止 Spring Data 尝试直接创建它的实例并失败,因为它无法确定该 Repository 的实体,因为它仍然包含一个通用类型变量。

以下示例显示了如何有选择地暴露 CRUD 方法(在本例中为 findById 和 save):

1
2
3
4
5
6
7
8
9
10
11
@NoRepositoryBean
interface MyBaseRepository<T, ID> extends Repository<T, ID> {

Optional<T> findById(ID id);

<S extends T> S save(S entity);
}

interface UserRepository extends MyBaseRepository<User, Long> {
User findByEmailAddress(EmailAddress emailAddress);
}

使用多个 Spring 数据模块

有时,程序中需要使用多个 Spring Data 模块。在这种情况下,必须区分持久化技术。当检测到类路径上有多个 Repository 工厂时,Spring Data 进入严格的配置模式。

如果定义的 Repository 扩展了特定模块中的 Repository,则它是特定 Spring Data 模块的有效候选者。

如果实体类使用了特定模块的类型注解,则它是特定 Spring Data 模块的有效候选者。 Spring Data 模块接受第三方注解(例如 JPA 的 @Entity)或提供自己的注解(例如用于 Spring Data MongoDB 和 Spring Data Elasticsearch 的 @Document)。

以下示例显示了一个使用模块特定接口(在本例中为 JPA)的 Repository:

1
2
3
4
5
6
interface MyRepository extends JpaRepository<User, Long> { }

@NoRepositoryBean
interface MyBaseRepository<T, ID> extends JpaRepository<T, ID> { … }

interface UserRepository extends MyBaseRepository<User, Long> { … }

MyRepository 和 UserRepository 扩展了 JpaRepository。它们是 Spring Data JPA 模块的有效候选者。

以下示例显示了一个使用通用接口的 Repository

1
2
3
4
5
6
interface AmbiguousRepository extends Repository<User, Long> { … }

@NoRepositoryBean
interface MyBaseRepository<T, ID> extends CrudRepository<T, ID> { … }

interface AmbiguousUserRepository extends MyBaseRepository<User, Long> { … }

AmbiguousRepository 和 AmbiguousUserRepository 仅扩展了 Repository 和 CrudRepository。 虽然这在使用唯一的 Spring Data 模块时很好,但是存在多个模块时,无法区分这些 Repository 应该绑定到哪个特定的 Spring Data。

以下示例显示了一个使用带注解的实体类的 Repository

1
2
3
4
5
6
7
8
9
interface PersonRepository extends Repository<Person, Long> { … }

@Entity
class Person { … }

interface UserRepository extends Repository<User, Long> { … }

@Document
class User { … }

PersonRepository 引用 Person,它使用 JPA @Entity 注解进行标记,因此这个 Repository 显然属于 Spring Data JPA。 UserRepository 引用 User,它使用 Spring Data MongoDB 的 @Document 注解进行标记。

以下错误示例显示了一个使用带有混合注解的实体类的 Repository

1
2
3
4
5
6
7
interface JpaPersonRepository extends Repository<Person, Long> { … }

interface MongoDBPersonRepository extends Repository<Person, Long> { … }

@Entity
@Document
class Person { … }

此示例中的实体类同时使用了 JPA 和 Spring Data MongoDB 的注解。示例中定义了两个 Repository:JpaPersonRepository 和 MongoDBPersonRepository。 一个用于 JPA,另一个用于 MongoDB。 Spring Data 不再能够区分 Repository,这会导致未定义的行为。

区分 Repository 的最后一种方法是确定 Repository 扫描 package 的范围。

1
2
3
@EnableJpaRepositories(basePackages = "com.acme.repositories.jpa")
@EnableMongoRepositories(basePackages = "com.acme.repositories.mongo")
class Configuration { … }

定义查询方法

Repository 代理有两种方法可以从方法名称派生特定于存储的查询:

  • 通过直接从方法名称派生查询。
  • 通过使用手动定义的查询。

可用选项取决于实际存储。但是,必须有一个策略来决定创建什么实际查询。

查询策略

以下策略可用于Repository 基础结构来解析查询。 对于 Java 配置,您可以使用 EnableJpaRepositories 注释的 queryLookupStrategy 属性。 特定数据存储可能不支持某些策略。

  • CREATE 尝试从查询方法名称构造特定存储的查询。
  • USE_DECLARED_QUERY 尝试查找已声明的查询,如果找不到则抛出异常。
  • CREATE_IF_NOT_FOUND (默认)结合了 CREATEUSE_DECLARED_QUERY

查询创建

Spring Data 中有一套内置的查询构建器机制,可以自动映射符合命名和参数规则的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
interface PersonRepository extends Repository<Person, Long> {

List<Person> findByEmailAddressAndLastname(EmailAddress emailAddress, String lastname);

// Enables the distinct flag for the query
List<Person> findDistinctPeopleByLastnameOrFirstname(String lastname, String firstname);
List<Person> findPeopleDistinctByLastnameOrFirstname(String lastname, String firstname);

// Enabling ignoring case for an individual property
List<Person> findByLastnameIgnoreCase(String lastname);
// Enabling ignoring case for all suitable properties
List<Person> findByLastnameAndFirstnameAllIgnoreCase(String lastname, String firstname);

// Enabling static ORDER BY for a query
List<Person> findByLastnameOrderByFirstnameAsc(String lastname);
List<Person> findByLastnameOrderByFirstnameDesc(String lastname);
}

解析查询方法名称分为主语和谓语。第一部分 (find…By, exists…By) 定义查询的主语,第二部分构成谓词。 主语可以包含更多的表达。 find(或其他引入关键字)和 By 之间的任何文本都被认为是描述性的,除非使用其中一个结果限制关键字,例如 Distinct 在要创建的查询上设置不同的标志或 Top/First 限制查询结果。

参考:

Spring Data 支持的查询主语关键词

Spring Data 支持的查询谓语关键词

创建 Repository 实例

自定义 Repository 实现

Spring Data 扩展

参考资料

Spring 访问 Redis

简介

Redis 是一个被数百万开发人员用作数据库、缓存、流引擎和消息代理的开源内存数据库。

在 Spring 中,spring-data-redis 项目对访问 Redis 进行了 API 封装,提供了便捷的访问方式。 spring-data-redis

spring-boot 项目中的子模块 spring-boot-starter-data-redis 基于 spring-data-redis 项目,做了二次封装,大大简化了 Redis 的相关配置。

Spring Boot 快速入门

引入依赖

在 pom.xml 中引入依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

数据源配置

1
2
3
4
spring.redis.database = 0
spring.redis.host = localhost
spring.redis.port = 6379
spring.redis.password =

定义实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.io.Serializable;

@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {

private static final long serialVersionUID = 4142994984277644695L;

private Long id;
private String name;
private Integer age;
private String address;
private String email;

}

定义 CRUD 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
import java.util.Map;

public interface UserService {

void batchSetUsers(Map<String, User> users);

long count();

User getUser(Long id);

void setUser(User user);

}

创建 CRUD 接口实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

import cn.hutool.core.bean.BeanUtil;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Map;

@Service
public class UserServiceImpl implements UserService {

public static final String DEFAULT_KEY = "spring:tutorial:user";

private final RedisTemplate<String, Object> redisTemplate;

public UserServiceImpl(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}

@Override
public void batchSetUsers(Map<String, User> users) {
redisTemplate.opsForHash().putAll(DEFAULT_KEY, users);
}

@Override
public long count() {
return redisTemplate.opsForHash().size(DEFAULT_KEY);
}

@Override
public User getUser(Long id) {
Object obj = redisTemplate.opsForHash().get(DEFAULT_KEY, id.toString());
return BeanUtil.toBean(obj, User.class);
}

@Override
public void setUser(User user) {
redisTemplate.opsForHash().put(DEFAULT_KEY, user.getId().toString(), user);
}

}

创建 Application

创建 Application,实例化一个 RedisTemplate 对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Slf4j
@SpringBootApplication
public class RedisQuickstartApplication {

@Autowired
private ObjectMapper objectMapper;

@Bean
@Primary
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {

// 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
// objectMapper.activateDefaultTyping(new DefaultBaseTypeLimitingValidator(),
// ObjectMapper.DefaultTyping.NON_FINAL);

// 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
serializer.setObjectMapper(objectMapper);

RedisTemplate<String, Object> template = new RedisTemplate<>();
// 配置连接工厂
template.setConnectionFactory(factory);
// 值采用json序列化
template.setValueSerializer(serializer);
// 使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
// 设置hash key 和value序列化模式
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(serializer);
template.afterPropertiesSet();

return template;
}

public static void main(String[] args) {
SpringApplication.run(RedisQuickstartApplication.class, args);
}

}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
@SpringBootTest(classes = { RedisQuickstartApplication.class })
public class RedisQuickstartTests {

@Autowired
private UserService userService;

@Test
public void test() {
final long SIZE = 1000L;
Map<String, User> map = new HashMap<>();
for (long i = 0; i < SIZE; i++) {
User user = new User(i, RandomUtil.randomChineseName(),
RandomUtil.randomInt(1, 100),
RandomUtil.randomEnum(Location.class).name(),
RandomUtil.randomEmail());
map.put(String.valueOf(i), user);
}
userService.batchSetUsers(map);
long count = userService.count();
Assertions.assertThat(count).isEqualTo(SIZE);

for (int i = 0; i < 100; i++) {
long id = RandomUtil.randomLong(0, 1000);
User user = userService.getUser(id);
log.info("user-{}: {}", id, user.toString());
}
}

}

示例源码

更多 Spring 访问 Redis 示例请参考:Redis 示例源码

参考资料