Dunwu Blog

大道至简,知易行难

HBase Java API 高级特性之过滤器

HBase 中两种主要的数据读取方法是 get()scan(),它们都支持直接访问数据和通过指定起止 row key 访问数据。此外,可以指定列族、列、时间戳和版本号来进行条件查询。它们的缺点是不支持细粒度的筛选功能。为了弥补这种不足,GetScan 支持通过过滤器(Filter)对 row key、列或列值进行过滤。

HBase 提供了一些内置过滤器,也允许用户通过继承 Filter 类来自定义过滤器。所有的过滤器都在服务端生效,称为 谓词下推。这样可以保证被过滤掉的数据不会被传到客户端。

图片来自 HBase 权威指南

HBase 过滤器层次结构的最底层是 Filter 接口和 FilterBase 抽象类。大部分过滤器都直接继承自 FilterBase

比较过滤器

所有比较过滤器均继承自 CompareFilterCompareFilterFilterBase 多了一个 compare() 方法,它需要传入参数定义比较操作的过程:比较运算符和比较器。

创建一个比较过滤器需要两个参数,分别是比较运算符比较器实例

1
2
3
4
public CompareFilter(final CompareOp compareOp,final ByteArrayComparable comparator) {
this.compareOp = compareOp;
this.comparator = comparator;
}

比较运算符

  • LESS (<)
  • LESS_OR_EQUAL (<=)
  • EQUAL (=)
  • NOT_EQUAL (!=)
  • GREATER_OR_EQUAL (>=)
  • GREATER (>)
  • NO_OP (排除所有符合条件的值)

比较运算符均定义在枚举类 CompareOperator

1
2
3
4
5
6
7
8
9
10
@InterfaceAudience.Public
public enum CompareOperator {
LESS,
LESS_OR_EQUAL,
EQUAL,
NOT_EQUAL,
GREATER_OR_EQUAL,
GREATER,
NO_OP,
}

注意:在 1.x 版本的 HBase 中,比较运算符定义在 CompareFilter.CompareOp 枚举类中,但在 2.0 之后这个类就被标识为 @deprecated ,并会在 3.0 移除。所以 2.0 之后版本的 HBase 需要使用 CompareOperator 这个枚举类。

比较器

所有比较器均继承自 ByteArrayComparable 抽象类,常用的有以下几种:

  • BinaryComparator : 使用 Bytes.compareTo(byte [],byte []) 按字典序比较指定的字节数组。
  • BinaryPrefixComparator : 按字典序与指定的字节数组进行比较,但只比较到这个字节数组的长度。
  • RegexStringComparator : 使用给定的正则表达式与指定的字节数组进行比较。仅支持 EQUALNOT_EQUAL 操作。
  • SubStringComparator : 测试给定的子字符串是否出现在指定的字节数组中,比较不区分大小写。仅支持 EQUALNOT_EQUAL 操作。
  • NullComparator :判断给定的值是否为空。
  • BitComparator :按位进行比较。

BinaryPrefixComparatorBinaryComparator 的区别不是很好理解,这里举例说明一下:

在进行 EQUAL 的比较时,如果比较器传入的是 abcd 的字节数组,但是待比较数据是 abcdefgh

  • 如果使用的是 BinaryPrefixComparator 比较器,则比较以 abcd 字节数组的长度为准,即 efgh 不会参与比较,这时候认为 abcdabcdefgh 是满足 EQUAL 条件的;
  • 如果使用的是 BinaryComparator 比较器,则认为其是不相等的。

比较过滤器种类

比较过滤器共有五个(Hbase 1.x 版本和 2.x 版本相同):

  • RowFilter :基于行键来过滤数据;
  • FamilyFilterr :基于列族来过滤数据;
  • QualifierFilterr :基于列限定符(列名)来过滤数据;
  • ValueFilterr :基于单元格 (cell) 的值来过滤数据;
  • DependentColumnFilter :指定一个参考列来过滤其他列的过滤器,过滤的原则是基于参考列的时间戳来进行筛选 。

前四种过滤器的使用方法相同,均只要传递比较运算符和运算器实例即可构建,然后通过 setFilter 方法传递给 scan

1
2
3
Filter filter  = new RowFilter(CompareOperator.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("xxx")));
scan.setFilter(filter);

DependentColumnFilter 的使用稍微复杂一点,这里单独做下说明。

DependentColumnFilter

可以把 DependentColumnFilter 理解为一个 valueFilter 和一个时间戳过滤器的组合DependentColumnFilter 有三个带参构造器,这里选择一个参数最全的进行说明:

1
2
3
DependentColumnFilter(final byte [] family, final byte[] qualifier,
final boolean dropDependentColumn, final CompareOperator op,
final ByteArrayComparable valueComparator)
  • family :列族
  • qualifier :列限定符(列名)
  • dropDependentColumn :决定参考列是否被包含在返回结果内,为 true 时表示参考列被返回,为 false 时表示被丢弃
  • op :比较运算符
  • valueComparator :比较器

这里举例进行说明:

1
2
3
4
5
6
DependentColumnFilter dependentColumnFilter = new DependentColumnFilter(
Bytes.toBytes("student"),
Bytes.toBytes("name"),
false,
CompareOperator.EQUAL,
new BinaryPrefixComparator(Bytes.toBytes("xiaolan")));
  • 首先会去查找 student:name 中值以 xiaolan 开头的所有数据获得 参考数据集,这一步等同于 valueFilter 过滤器;
  • 其次再用参考数据集中所有数据的时间戳去检索其他列,获得时间戳相同的其他列的数据作为 结果数据集,这一步等同于时间戳过滤器;
  • 最后如果 dropDependentColumn 为 true,则返回 参考数据集+结果数据集,若为 false,则抛弃参考数据集,只返回 结果数据集

专用过滤器

专用过滤器通常直接继承自 FilterBase,用于更特定的场景。

单列列值过滤器 (SingleColumnValueFilter)

基于某列(参考列)的值决定某行数据是否被过滤。其实例有以下方法:

  • setFilterIfMissing(boolean filterIfMissing) :默认值为 false,即如果该行数据不包含参考列,其依然被包含在最后的结果中;设置为 true 时,则不包含;
  • setLatestVersionOnly(boolean latestVersionOnly) :默认为 true,即只检索参考列的最新版本数据;设置为 false,则检索所有版本数据。
1
2
3
4
5
6
7
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
"student".getBytes(),
"name".getBytes(),
CompareOperator.EQUAL,
new SubstringComparator("xiaolan"));
singleColumnValueFilter.setFilterIfMissing(true);
scan.setFilter(singleColumnValueFilter);

单列列值排除器 (SingleColumnValueExcludeFilter)

SingleColumnValueExcludeFilter 继承自上面的 SingleColumnValueFilter,过滤行为与其相反。

行键前缀过滤器 (PrefixFilter)

基于 RowKey 值决定某行数据是否被过滤。

1
2
PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("xxx"));
scan.setFilter(prefixFilter);

列名前缀过滤器 (ColumnPrefixFilter)

基于列限定符(列名)决定某行数据是否被过滤。

1
2
ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(Bytes.toBytes("xxx"));
scan.setFilter(columnPrefixFilter);

分页过滤器 (PageFilter)

可以使用这个过滤器实现对结果按行进行分页,创建 PageFilter 实例的时候需要传入每页的行数。

1
2
3
4
public PageFilter(final long pageSize) {
Preconditions.checkArgument(pageSize >= 0, "must be positive %s", pageSize);
this.pageSize = pageSize;
}

下面的代码体现了客户端实现分页查询的主要逻辑,这里对其进行一下解释说明:

客户端进行分页查询,需要传递 startRow(起始 RowKey),知道起始 startRow 后,就可以返回对应的 pageSize 行数据。这里唯一的问题就是,对于第一次查询,显然 startRow 就是表格的第一行数据,但是之后第二次、第三次查询我们并不知道 startRow,只能知道上一次查询的最后一条数据的 RowKey(简单称之为 lastRow)。

我们不能将 lastRow 作为新一次查询的 startRow 传入,因为 scan 的查询区间是[startRow,endRow) ,即前开后闭区间,这样 startRow 在新的查询也会被返回,这条数据就重复了。

同时在不使用第三方数据库存储 RowKey 的情况下,我们是无法通过知道 lastRow 的下一个 RowKey 的,因为 RowKey 的设计可能是连续的也有可能是不连续的。

由于 Hbase 的 RowKey 是按照字典序进行排序的。这种情况下,就可以在 lastRow 后面加上 0 ,作为 startRow 传入,因为按照字典序的规则,某个值加上 0 后的新值,在字典序上一定是这个值的下一个值,对于 HBase 来说下一个 RowKey 在字典序上一定也是等于或者大于这个新值的。

所以最后传入 lastRow+0,如果等于这个值的 RowKey 存在就从这个值开始 scan,否则从字典序的下一个 RowKey 开始 scan。

25 个字母以及数字字符,字典排序如下:

1
'0' < '1' < '2' < ... < '9' < 'a' < 'b' < ... < 'z'

分页查询主要实现逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
byte[] POSTFIX = new byte[] { 0x00 };
Filter filter = new PageFilter(15);

int totalRows = 0;
byte[] lastRow = null;
while (true) {
Scan scan = new Scan();
scan.setFilter(filter);
if (lastRow != null) {
// 如果不是首行 则 lastRow + 0
byte[] startRow = Bytes.add(lastRow, POSTFIX);
System.out.println("start row: " +
Bytes.toStringBinary(startRow));
scan.withStartRow(startRow);
}
ResultScanner scanner = table.getScanner(scan);
int localRows = 0;
Result result;
while ((result = scanner.next()) != null) {
System.out.println(localRows++ + ": " + result);
totalRows++;
lastRow = result.getRow();
}
scanner.close();
//最后一页,查询结束
if (localRows == 0) break;
}
System.out.println("total rows: " + totalRows);

需要注意的是在多台 Regin Services 上执行分页过滤的时候,由于并行执行的过滤器不能共享它们的状态和边界,所以有可能每个过滤器都会在完成扫描前获取了 PageCount 行的结果,这种情况下会返回比分页条数更多的数据,分页过滤器就有失效的可能。

时间戳过滤器 (TimestampsFilter)

1
2
3
4
List<Long> list = new ArrayList<>();
list.add(1554975573000L);
TimestampsFilter timestampsFilter = new TimestampsFilter(list);
scan.setFilter(timestampsFilter);

首次行键过滤器 (FirstKeyOnlyFilter)

FirstKeyOnlyFilter 只扫描每行的第一列,扫描完第一列后就结束对当前行的扫描,并跳转到下一行。相比于全表扫描,其性能更好,通常用于行数统计的场景,因为如果某一行存在,则行中必然至少有一列。

1
2
FirstKeyOnlyFilter firstKeyOnlyFilter = new FirstKeyOnlyFilter();
scan.set(firstKeyOnlyFilter);

包装过滤器

包装过滤器就是通过包装其他过滤器以实现某些拓展的功能。

SkipFilter 过滤器

SkipFilter 包装一个过滤器,当被包装的过滤器遇到一个需要过滤的 KeyValue 实例时,则拓展过滤整行数据。下面是一个使用示例:

1
2
3
4
5
// 定义 ValueFilter 过滤器
Filter filter1 = new ValueFilter(CompareOperator.NOT_EQUAL,
new BinaryComparator(Bytes.toBytes("xxx")));
// 使用 SkipFilter 进行包装
Filter filter2 = new SkipFilter(filter1);

WhileMatchFilter 过滤器

WhileMatchFilter 包装一个过滤器,当被包装的过滤器遇到一个需要过滤的 KeyValue 实例时,WhileMatchFilter 则结束本次扫描,返回已经扫描到的结果。下面是其使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Filter filter1 = new RowFilter(CompareOperator.NOT_EQUAL,
new BinaryComparator(Bytes.toBytes("rowKey4")));

Scan scan = new Scan();
scan.setFilter(filter1);
ResultScanner scanner1 = table.getScanner(scan);
for (Result result : scanner1) {
for (Cell cell : result.listCells()) {
System.out.println(cell);
}
}
scanner1.close();

System.out.println("--------------------");

// 使用 WhileMatchFilter 进行包装
Filter filter2 = new WhileMatchFilter(filter1);

scan.setFilter(filter2);
ResultScanner scanner2 = table.getScanner(scan);
for (Result result : scanner1) {
for (Cell cell : result.listCells()) {
System.out.println(cell);
}
}
scanner2.close();
rowKey0/student:name/1555035006994/Put/vlen=8/seqid=0
rowKey1/student:name/1555035007019/Put/vlen=8/seqid=0
rowKey2/student:name/1555035007025/Put/vlen=8/seqid=0
rowKey3/student:name/1555035007037/Put/vlen=8/seqid=0
rowKey5/student:name/1555035007051/Put/vlen=8/seqid=0
rowKey6/student:name/1555035007057/Put/vlen=8/seqid=0
rowKey7/student:name/1555035007062/Put/vlen=8/seqid=0
rowKey8/student:name/1555035007068/Put/vlen=8/seqid=0
rowKey9/student:name/1555035007073/Put/vlen=8/seqid=0
--------------------
rowKey0/student:name/1555035006994/Put/vlen=8/seqid=0
rowKey1/student:name/1555035007019/Put/vlen=8/seqid=0
rowKey2/student:name/1555035007025/Put/vlen=8/seqid=0
rowKey3/student:name/1555035007037/Put/vlen=8/seqid=0

可以看到被包装后,只返回了 rowKey4 之前的数据。

FilterList

以上都是讲解单个过滤器的作用,当需要多个过滤器共同作用于一次查询的时候,就需要使用 FilterListFilterList 支持通过构造器或者 addFilter 方法传入多个过滤器。

1
2
3
4
5
6
7
8
// 构造器传入
public FilterList(final Operator operator, final List<Filter> filters)
public FilterList(final List<Filter> filters)
public FilterList(final Filter... filters)

// 方法传入
public void addFilter(List<Filter> filters)
public void addFilter(Filter filter)

多个过滤器组合的结果由 operator 参数定义 ,其可选参数定义在 Operator 枚举类中。只有 MUST_PASS_ALLMUST_PASS_ONE 两个可选的值:

  • MUST_PASS_ALL :相当于 AND,必须所有的过滤器都通过才认为通过;
  • MUST_PASS_ONE :相当于 OR,只有要一个过滤器通过则认为通过。
1
2
3
4
5
6
7
@InterfaceAudience.Public
public enum Operator {
/** !AND */
MUST_PASS_ALL,
/** !OR */
MUST_PASS_ONE
}

使用示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
List<Filter> filters = new ArrayList<Filter>();

Filter filter1 = new RowFilter(CompareOperator.GREATER_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("XXX")));
filters.add(filter1);

Filter filter2 = new RowFilter(CompareOperator.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("YYY")));
filters.add(filter2);

Filter filter3 = new QualifierFilter(CompareOperator.EQUAL,
new RegexStringComparator("ZZZ"));
filters.add(filter3);

FilterList filterList = new FilterList(filters);

Scan scan = new Scan();
scan.setFilter(filterList);

参考资料

HBase Schema 设计

HBase Schema 设计要素

  • 这个表应该有多少 Column Family
  • Column Family 使用什么数据
  • 每个 Column Family 有有多少列
  • 列名是什么,尽管列名不必在建表时定义,但读写数据是要知道的
  • 单元应该存放什么数据
  • 每个单元存储多少时间版本
  • 行健(rowKey)结构是什么,应该包含什么信息

Row Key 设计

Row Key 的作用

在 HBase 中,所有对表的访问都要通过 Row Key,有三种访问方式:

  • 使用 get 命令,查询指定的 Row Key,即精确查找。
  • 使用 scan 命令,根据 Row Key 进行范围查找。
  • 全表扫描,即直接扫描表中所有行记录。

此外,在 HBase 中,表中的行,是按照 Row Key 的字典序进行排序的。

由此,可见,Row Key 的良好设计对于 HBase CRUD 的性能至关重要。

Row Key 的设计原则

长度原则

RowKey 是一个二进制码流,可以是任意字符串,最大长度为 64kb,实际应用中一般为 10-100byte,以 byte[]形式保存,一般设计成定长。建议越短越好,不要超过 16 个字节,原因如下:

  1. 数据的持久化文件 HFile 中时按照 Key-Value 存储的,如果 RowKey 过长,例如超过 100byte,那么 1000w 行的记录,仅 RowKey 就需占用近 1GB 的空间。这样会极大影响 HFile 的存储效率。
  2. MemStore 会缓存部分数据到内存中,若 RowKey 字段过长,内存的有效利用率就会降低,就不能缓存更多的数据,从而降低检索效率。
  3. 目前操作系统都是 64 位系统,内存 8 字节对齐,控制在 16 字节,8 字节的整数倍利用了操作系统的最佳特性。

唯一原则

必须在设计上保证 RowKey 的唯一性。由于在 HBase 中数据存储是 Key-Value 形式,若向 HBase 中同一张表插入相同 RowKey 的数据,则原先存在的数据会被新的数据覆盖。

排序原则

HBase 的 RowKey 是按照 ASCII 有序排序的,因此我们在设计 RowKey 的时候要充分利用这点。

散列原则

设计的 RowKey 应均匀的分布在各个 HBase 节点上。

热点问题

Region 是在 HBase 集群上分布数据的最小单位。每个 Region 由它所属的表的起始范围来表示(即起始 Row Key 和结束 Row Key)。

如果,Row Key 使用单调递增的整数或时间戳,就会产生一个问题:因为 Hbase 的 Row Key 是就近存储的,这会导致一段时间内大部分读写集中在某一个 Region 或少数 Region 上(根据二八原则,最近产生的数据,往往是读写频率最高的数据),即所谓 热点问题

反转(Reversing)

第一种咱们要分析的方法是反转,顾名思义它就是把固定长度或者数字格式的 RowKey 进行反转,反转分为一般数据反转和时间戳反转,其中以时间戳反转较常见。

  • 反转固定格式的数值 - 以手机号为例,手机号的前缀变化比较少(如 152、185 等),但后半部分变化很多。如果将它反转过来,可以有效地避免热点。不过其缺点就是失去了有序性。
  • 反转时间 - 如果数据访问以查找最近的数据为主,可以将时间戳存储为反向时间戳(例如: timestamp = Long.MAX_VALUE – timestamp),这样有利于扫描最近的数据。

加盐(Salting)

这里的“加盐”与密码学中的“加盐”不是一回事。它是指在 RowKey 的前面增加一些前缀,加盐的前缀种类越多,RowKey 就被打得越散。

需要注意的是分配的随机前缀的种类数量应该和我们想把数据分散到的那些 region 的数量一致。只有这样,加盐之后的 rowkey 才会根据随机生成的前缀分散到各个 region 中,避免了热点现象。

哈希(Hashing)

其实哈希和加盐的适用场景类似,但我们前缀不可以是随机的,因为必须要让客户端能够完整地重构 RowKey。所以一般会拿原 RowKey 或其一部分计算 Hash 值,然后再对 Hash 值做运算作为前缀。

HBase Schema 设计规则

Column Family 设计

HBase 不能很好处理 2 ~ 3 个以上的 Column Family,所以 HBase 表应尽可能减少 Column Family 数。如果可以,请只使用一个列族,只有需要经常执行 Column 范围查询时,才引入多列族。也就是说,尽量避免同时查询多个列族。

  • Column Family 数量多,会影响数据刷新。HBase 的数据刷新是在每个 Region 的基础上完成的。因此,如果一个 Column Family 携带大量导致刷新的数据,那么相邻的列族即使携带的数据量很小,也会被刷新。当存在许多 Column Family 时,刷新交互会导致一堆不必要的 IO。 此外,在表/区域级别的压缩操作也会在每个存储中发生。
  • Column Family 数量多,会影响查找效率。如:Column Family A 有 100 万行,Column Family B 有 10 亿行,那么 Column Family A 的数据可能会分布在很多很多区域(和 RegionServers)。 这会降低 Column Family A 的批量扫描效率。

Column Family 名尽量简短,最好是一个字符。Column Family 会在列限定符中被频繁使用,缩短长度有利于节省空间并提升效率。

Row 设计

HBase 中的 Row 按 Row Key 的字典顺序排序

  • 不要将 Row Key 设计为单调递增的,例如:递增的整数或时间戳

    • 问题:因为 Hbase 的 Row Key 是就近存储的,这样会导致一段时间内大部分写入集中在某一个 Region 上,即所谓热点问题。

    • 解决方法一、加盐:这里的不是指密码学的加盐,而是指将随机分配的前缀添加到行键的开头。这么做是为了避免相同前缀的 Row Key 数据被存储在相邻位置,从而导致热点问题。示例如下:

      • foo0001
        foo0002
        foo0003
        foo0004
        
        改为
        
        a-foo0003
        b-foo0001
        c-foo0003
        c-foo0004
        d-foo0002
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29

        - 解决方法二、Hash:Row Key 的前缀使用 Hash

        - **尽量减少行和列的长度**

        - **反向时间戳**:反向时间戳可以极大地帮助快速找到值的最新版本。

        - **行健不能改变**:唯一可以改变的方式是先删除后插入。

        - **Row Key 和 Column Family**:Row Key 从属于 Column Family,因此,相同的 Row Key 可以存在每一个 Column Family 中而不会出现冲突。

        ### Version 设计

        最大、最小 Row 版本号:表示 HBase 会保留的版本号数的上下限。均可以通过 HColumnDescriptor 对每个列族进行配置

        Row 版本号过大,会大大增加 StoreFile 的大小;所以,最大 Row 版本号应按需设置。HBase 会在主要压缩时,删除多余的版本。

        ### TTL 设计

        Column Family 会设置一个以秒为单位的 TTL,一旦达到 TTL 时,HBase 会自动删除行记录。

        仅包含过期行的存储文件在次要压缩时被删除。 将 hbase.store.delete.expired.storefile 设置为 false 会禁用此功能。将最小版本数设置为 0 以外的值也会禁用此功能。

        在较新版本的 HBase 上,还支持在 Cell 上设置 TTL,与 Column Family 的 TTL 不同的是,单位是毫秒。

        ### Column Family 属性配置

        - HFile 数据块,默认是 64KB,数据库的大小影响数据块索引的大小。数据块大的话一次加载进内存的数据越多,扫描查询效果越好。但是数据块小的话,随机查询性能更好

create ‘mytable’,{NAME => ‘cf1’, BLOCKSIZE => ‘65536’}
复制代码

1
2
3

- 数据块缓存,数据块缓存默认是打开的,如果一些比较少访问的数据可以选择关闭缓存

create ‘mytable’,{NAME => ‘cf1’, BLOCKCACHE => ‘FALSE’}
复制代码

1
2
3

- 数据压缩,压缩会提高磁盘利用率,但是会增加 CPU 的负载,看情况进行控制

create ‘mytable’,{NAME => ‘cf1’, COMPRESSION => ‘SNAPPY’}
复制代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

Hbase 表设计是和需求相关的,但是遵守表设计的一些硬性指标对性能的提升还是很有帮助的,这里整理了一些设计时用到的要点。

## Schema 设计案例

### 案例:日志数据和时序数据

假设采集以下数据

- Hostname
- Timestamp
- Log event
- Value/message

应该如何设计 Row Key?

1TimestampRow Key 头部

如果 Row Key 设计为 `[timestamp][hostname][log-event]` 形式,会出现热点问题。

如果针对时间的扫描很重要,可以采用时间戳分桶策略,即

bucket = timestamp % bucketNum

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

计算出桶号后,将 Row Key 指定为:`[bucket][timestamp][hostname][log-event]`

如上所述,要为特定时间范围选择数据,需要对每个桶执行扫描。 例如,100 个桶将在键空间中提供广泛的分布,但需要 100 次扫描才能获取单个时间戳的数据,因此需要权衡取舍。

2)Hostname 在 Row Key 头部

如果主机样本量很大,将 Row Key 设计为 `[hostname][log-event][timestamp]`,这样有利于扫描 hostname。

3Timestamp 还是反向 Timestamp

如果数据访问以查找最近的数据为主,可以将时间戳存储为反向时间戳(例如: `timestamp = Long.MAX_VALUE – timestamp`),这样有利于扫描最近的数据。

4Row Key 是可变长度还是固定长度

拼接 Row Key 的关键字长度不一定是固定的,例如 hostname 有可能很长,也有可能很短。如果想要统一长度,可以参考以下做法:

- 将关键字 Hash 编码:使用某种 Hash 算法计算关键字,并取固定长度的值(例如:8 位或 16 位)。
- 使用数字替代关键字:例如:使用事件类型 Code 替换事件类型;hostname 如果是 IP,可以转换为 long
- 截取关键字:截取后的关键字需要有足够的辨识度,长度大小根据具体情况权衡。

5)时间分片

[hostname][log-event][timestamp1]
[hostname][log-event][timestamp2]
[hostname][log-event][timestamp3]

1
2
3

上面的例子中,每个详细事件都有单独的行键,可以重写如下,即每个时间段存储一次:

[hostname][log-event][timerange]


## 参考资料

- [HBase 官方文档之 HBase and Schema Design](https://hbase.apache.org/book.html#schema)

HBase Java API 基础特性

HBase Client API

HBase Java API 示例

引入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.4</version>
</dependency>

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
public class HBaseUtils {

private static Connection connection;

static {
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.property.clientPort", "2181");
// 如果是集群 则主机名用逗号分隔
configuration.set("hbase.zookeeper.quorum", "hadoop001");
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 创建 HBase 表
*
* @param tableName 表名
* @param columnFamilies 列族的数组
*/
public static boolean createTable(String tableName, List<String> columnFamilies) {
try {
HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
if (admin.tableExists(TableName.valueOf(tableName))) {
return false;
}
TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
columnFamilies.forEach(columnFamily -> {
ColumnFamilyDescriptorBuilder cfDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
cfDescriptorBuilder.setMaxVersions(1);
ColumnFamilyDescriptor familyDescriptor = cfDescriptorBuilder.build();
tableDescriptor.setColumnFamily(familyDescriptor);
});
admin.createTable(tableDescriptor.build());
} catch (IOException e) {
e.printStackTrace();
}
return true;
}


/**
* 删除 hBase 表
*
* @param tableName 表名
*/
public static boolean deleteTable(String tableName) {
try {
HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
// 删除表前需要先禁用表
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
} catch (Exception e) {
e.printStackTrace();
}
return true;
}

/**
* 插入数据
*
* @param tableName 表名
* @param rowKey 唯一标识
* @param columnFamilyName 列族名
* @param qualifier 列标识
* @param value 数据
*/
public static boolean putRow(String tableName, String rowKey, String columnFamilyName, String qualifier,
String value) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
table.put(put);
table.close();
} catch (IOException e) {
e.printStackTrace();
}
return true;
}


/**
* 插入数据
*
* @param tableName 表名
* @param rowKey 唯一标识
* @param columnFamilyName 列族名
* @param pairList 列标识和值的集合
*/
public static boolean putRow(String tableName, String rowKey, String columnFamilyName, List<Pair<String, String>> pairList) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
pairList.forEach(pair -> put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(pair.getKey()), Bytes.toBytes(pair.getValue())));
table.put(put);
table.close();
} catch (IOException e) {
e.printStackTrace();
}
return true;
}


/**
* 根据 rowKey 获取指定行的数据
*
* @param tableName 表名
* @param rowKey 唯一标识
*/
public static Result getRow(String tableName, String rowKey) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
return table.get(get);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}


/**
* 获取指定行指定列 (cell) 的最新版本的数据
*
* @param tableName 表名
* @param rowKey 唯一标识
* @param columnFamily 列族
* @param qualifier 列标识
*/
public static String getCell(String tableName, String rowKey, String columnFamily, String qualifier) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
if (!get.isCheckExistenceOnly()) {
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
Result result = table.get(get);
byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
return Bytes.toString(resultValue);
} else {
return null;
}

} catch (IOException e) {
e.printStackTrace();
}
return null;
}


/**
* 检索全表
*
* @param tableName 表名
*/
public static ResultScanner getScanner(String tableName) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
return table.getScanner(scan);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}


/**
* 检索表中指定数据
*
* @param tableName 表名
* @param filterList 过滤器
*/

public static ResultScanner getScanner(String tableName, FilterList filterList) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scan.setFilter(filterList);
return table.getScanner(scan);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

/**
* 检索表中指定数据
*
* @param tableName 表名
* @param startRowKey 起始 RowKey
* @param endRowKey 终止 RowKey
* @param filterList 过滤器
*/

public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey,
FilterList filterList) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scan.withStartRow(Bytes.toBytes(startRowKey));
scan.withStopRow(Bytes.toBytes(endRowKey));
scan.setFilter(filterList);
return table.getScanner(scan);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

/**
* 删除指定行记录
*
* @param tableName 表名
* @param rowKey 唯一标识
*/
public static boolean deleteRow(String tableName, String rowKey) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
table.delete(delete);
} catch (IOException e) {
e.printStackTrace();
}
return true;
}


/**
* 删除指定行指定列
*
* @param tableName 表名
* @param rowKey 唯一标识
* @param familyName 列族
* @param qualifier 列标识
*/
public static boolean deleteColumn(String tableName, String rowKey, String familyName,
String qualifier) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier));
table.delete(delete);
table.close();
} catch (IOException e) {
e.printStackTrace();
}
return true;
}

}

数据库连接

在上面的代码中,在类加载时就初始化了 Connection 连接,并且之后的方法都是复用这个 Connection,这时我们可能会考虑是否可以使用自定义连接池来获取更好的性能表现?实际上这是没有必要的。

首先官方对于 Connection 的使用说明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Connection Pooling For applications which require high-end multithreaded
access (e.g., web-servers or application servers that may serve many
application threads in a single JVM), you can pre-create a Connection,
as shown in the following example:

对于高并发多线程访问的应用程序(例如,在单个 JVM 中存在的为多个线程服务的 Web 服务器或应用程序服务器),
您只需要预先创建一个 Connection。例子如下:

// Create a connection to the cluster.
Configuration conf = HBaseConfiguration.create();
try (Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf(tablename))) {
// use table as needed, the table returned is lightweight
}

之所以能这样使用,这是因为 Connection 并不是一个简单的 socket 连接,接口文档 中对 Connection 的表述是:

1
2
3
4
5
6
7
A cluster connection encapsulating lower level individual connections to actual servers and a
connection to zookeeper. Connections are instantiated through the ConnectionFactory class.
The lifecycle of the connection is managed by the caller, who has to close() the connection
to release the resources.

Connection 是一个集群连接,封装了与多台服务器(Matser/Region Server)的底层连接以及与 zookeeper 的连接。
连接通过 ConnectionFactory 类实例化。连接的生命周期由调用者管理,调用者必须使用 close() 关闭连接以释放资源。

之所以封装这些连接,是因为 HBase 客户端需要连接三个不同的服务角色:

  • Zookeeper :主要用于获取 meta 表的位置信息,Master 的信息;
  • HBase Master :主要用于执行 HBaseAdmin 接口的一些操作,例如建表等;
  • HBase RegionServer :用于读、写数据。

Connection 对象和实际的 Socket 连接之间的对应关系如下图:

在 HBase 客户端代码中,真正对应 Socket 连接的是 RpcConnection 对象。HBase 使用 PoolMap 这种数据结构来存储客户端到 HBase 服务器之间的连接。PoolMap 的内部有一个 ConcurrentHashMap 实例,其 key 是 ConnectionId(封装了服务器地址和用户 ticket),value 是一个 RpcConnection 对象的资源池。当 HBase 需要连接一个服务器时,首先会根据 ConnectionId 找到对应的连接池,然后从连接池中取出一个连接对象。

1
2
3
4
5
6
7
8
9
10
11
12
@InterfaceAudience.Private
public class PoolMap<K, V> implements Map<K, V> {
private PoolType poolType;

private int poolMaxSize;

private Map<K, Pool<V>> pools = new ConcurrentHashMap<>();

public PoolMap(PoolType poolType) {
this.poolType = poolType;
}
.....

HBase 中提供了三种资源池的实现,分别是 ReusableRoundRobinThreadLocal。具体实现可以通 hbase.client.ipc.pool.type 配置项指定,默认为 Reusable。连接池的大小也可以通过 hbase.client.ipc.pool.size 配置项指定,默认为 1,即每个 Server 1 个连接。也可以通过修改配置实现:

1
2
3
config.set("hbase.client.ipc.pool.type",...);
config.set("hbase.client.ipc.pool.size",...);
connection = ConnectionFactory.createConnection(config);

由此可以看出 HBase 中 Connection 类已经实现了对连接的管理功能,所以我们不必在 Connection 上在做额外的管理。

另外,Connection 是线程安全的,但 Table 和 Admin 却不是线程安全的,因此正确的做法是一个进程共用一个 Connection 对象,而在不同的线程中使用单独的 Table 和 Admin 对象。Table 和 Admin 的获取操作 getTable()getAdmin() 都是轻量级,所以不必担心性能的消耗,同时建议在使用完成后显示的调用 close() 方法来关闭它们。

概述

HBase 的主要客户端操作是由 org.apache.hadoop.hbase.client.HTable 提供的。创建 HTable 实例非常耗时,所以,建议每个线程只创建一次 HTable 实例。

HBase 所有修改数据的操作都保证了行级别的原子性。要么读到最新的修改,要么等待系统允许写入改行修改

用户要尽量使用批处理(batch)更新来减少单独操作同一行数据的次数

写操作中设计的列的数目并不会影响该行数据的原子性,行原子性会同时保护到所有列

创建 HTable 实例(指的是在 java 中新建该类),每个实例都要扫描.META. 表,以检查该表是否存在,推荐用户只创建一次 HTable 实例,而且是每个线程创建一个

如果用户需要多个 HTable 实例,建议使用 HTablePool 类(类似连接池)

CRUD 操作

put

Table 接口提供了两个 put 方法

1
2
3
4
// 写入单行 put
void put(Put put) throws IOException;
// 批量写入 put
void put(List<Put> puts) throws IOException;

Put 类提供了多种构造器方法用来初始化实例。

Put 类还提供了一系列有用的方法:

多个 add 方法:用于添加指定的列数据。

has 方法:用于检查是否存在特定的单元格,而不需要遍历整个集合

getFamilyMap 方法:可以遍历 Put 实例中每一个可用的 KeyValue 实例

getRow 方法:用于获取 rowkey
Put.heapSize() 可以计算当前 Put 实例所需的堆大小,既包含其中的数据,也包含内部数据结构所需的空间

KeyValue 类

特定单元格的数据以及坐标,坐标包括行键、列族名、列限定符以及时间戳
KeyValue(byte[] row, int roffset, int rlength, byte[] family, int foffoset, int flength, byte[] qualifier, int qoffset, int qlength, long timestamp, Type type, byte[] value, int voffset, int vlength)
每一个字节数组都有一个 offset 参数和一个 length 参数,允许用户提交一个已经存在的字节数组进行字节级别操作。
行目前来说指的是行键,即 Put 构造器里的 row 参数。

客户端的写缓冲区

每一个 put 操作实际上都是一个 RPC 操作,它将客户端数据传送到服务器然后返回。

HBase 的 API 配备了一个客户端的写缓冲区,缓冲区负责收集 put 操作,然后调用 RPC 操作一次性将 put 送往服务器。

1
2
void setAutoFlush(boolean autoFlush)
boolean isAutoFlush()

默认情况下,客户端缓冲区是禁用的。可以通过 table.setAutoFlush(false) 来激活缓冲区。

Put 列表

批量提交 put 列表:

1
void put(List<Put> puts) throws IOException

注意:批量提交可能会有部分修改失败。

原子性操作 compare-and-set

checkAndPut 方法提供了 CAS 机制来保证 put 操作的原子性。

get

1
Result get(Get get) throws IOException
1
2
3
4
Get(byte[] row)
Get(byte[] row, RowLock rowLock)
Get addColumn(byte[] family, byte[] qualifier)
Get addFamily(byte[] family)

Result 类

当用户使用 get() 方法获取数据,HBase 返回的结果包含所有匹配的单元格数据,这些数据被封装在一个 Result 实例中返回给用户。

Result 类提供的方法如下:

1
2
3
4
5
6
7
byte[] getValue(byte[] family, byte[] qualifier)
byte[] value()
byte[] getRow()
int size()
boolean isEmpty()
KeyValue[] raw()
List<KeyValue> list()

delete

1
void delete(Delete delete) throws IOException
1
2
Delte(byte[] row)
Delete(byte[] row, long timestamp, RowLock rowLock)
1
2
3
4
Delete deleteFamily(byte[] family)
Delete deleteFamily(byte[] family, long timestamp)
Delete deleteColumns(byte[] family, byte[] qualifier)
Delete deleteColumn(byte[] family, byte[] qualifier) // 只删除最新版本

批处理操作

Row 是 Put、Get、Delete 的父类。

1
2
void batch(List<Row> actions, Object[] results) throws IOException, InterruptedException
Object batch(List<Row> actions) throws IOException, InterruptedException

行锁

region 服务器提供了行锁特性,这个特性保证了只有一个客户端能获取一行数据相应的锁,同时对该行进行修改。

如果不显示指定锁,服务器会隐式加锁。

扫描

scan,类似数据库系统中的 cursor,利用了 HBase 提供的底层顺序存储的数据结构。

调用 HTable 的 getScanner 就可以返回扫描器

1
2
ResultScanner getScanner(Scan scan) throws IOException
ResultScanner getScanner(byte[] family) throws IOException

Scan 类构造器可以有 startRow,区间一般为 [startRow, stopRow)

1
2
Scan(byte[] startRow, Filter filter)
Scan(byte[] startRow)

ResultScanner

以行为单位进行返回

1
2
3
Result next() throws IOException
Result[] next(int nbRows) throws IOException
void close()

缓存与批量处理

每一个 next()调用都会为每行数据生成一个单独的 RPC 请求

可以设置扫描器缓存

1
2
void setScannerCaching(itn scannerCaching)
int getScannerCaching()

缓存是面向行一级操作,批量是面向列一级操作

1
2
void setBatch(int batch)
int getBatch

RPC 请求的次数=(行数*每行列数)/Min(每行的列数,批量大小)/扫描器缓存

各种特性

Bytes 类提供了一系列将原生 Java 类型和字节数组互转的方法。

参考资料

《大规模数据处理实战》笔记

00 丨开篇词丨从这里开始,带你走上硅谷一线系统架构师之路

01 丨为什么 MapReduce 会被硅谷一线公司淘汰?

高昂的维护成本

时间性能“达不到”用户的期待

02 | MapReduce 后谁主沉浮:怎样设计下一代数据处理技术?

03 | 大规模数据处理初体验:怎样实现大型电商热销榜?

不同量级 TOP K 算法的解决方案不同:

小规模:Hash 即可

大规模:由于单机的处理量不足以处理全量数据,势必分而治之:分片统计,然后聚合(即先 map 后 reduce)

04 丨分布式系统(上):学会用服务等级协议 SLA 来评估你的系统

SLA(Service-Level Agreement),也就是服务等级协议,指的是系统服务提供者(Provider)对客户(Customer)的一个服务承诺。

可用性:大厂一般要求可用性至少达到四个 9(即 99.99%)

准确性:准确率= 正确的有效请求数 / 有效的总请求数

系统容量:通常通过 QPS (Queries Per Second)来衡量

延迟:请求和响应的时间间隔

05 丨分布式系统(下):架构师不得不知的三大指标

  • 可扩展性(Scalability)
    • 水平扩展(Horizontal Scaling)
    • 垂直扩展(Vertical Scaling)
  • 一致性(Consistency)
    • 强一致性(Strong Consistency):系统中的某个数据被成功更新后,后续任何对该数据的读取操作都将得到更新
      后的值。所以在任意时刻,同一系统所有节点中的数据是一样的。
    • 弱一致性(Weak Consistency):系统中的某个数据被更新后,后续对该数据的读取操作可能得到更新后的值,
      也可能是更改前的值。但经过“不一致时间窗口”这段时间后,后续对该数据的读取都是更新后的值。
    • 最终一致性(Eventual Consistency):是弱一致性的特殊形式。存储系统保证,在没有新的更新的条件下,最终所有的访问都是最后更新的值。
  • 持久性(Data Durability):意味着数据一旦被成功存储就可以一直继续使用,即使系统中的节点下线、宕机或数据损坏也是如。

06 | 如何区分批处理还是流处理?

  • 无边界数据(Unbounded Data):是一种不断增长,可以说是无限的数据集。
  • 有边界数据(Bounded Data):是一种有限的数据集。
  • 事件时间(Event Time):指的是一个数据实际产生的时间点。
  • 处理时间(Precessing Time):指的是处理数据的系统架构实际接收到这个数据的时间点。
  • 批处理:绝大部分情况下,批处理的输入数据都是有边界数据,同样的,输出结果也一样是有边界数据。所以在批处理中,我们所关心的更多会是数据的事件时间。
    • 应用场景:
      • 日志分析:日志系统是在一定时间段(日,周或年)内收集的,而日志的数据处理分析是在不同的时间内执行,以得出有关系统的一些关键性能指标。
      • 计费应用程序:计费应用程序会计算出一段时间内一项服务的使用程度,并生成计费信息,例如银行在每个月末生成的信用卡还款单。
      • 数据仓库:数据仓库的主要目标是根据收集好的数据事件时间,将数据信息合并为静态快照 (static snapshot),并将它们聚合为每周、每月、每季度的报告等。
  • 流处理:流处理的输入数据基本上都是无边界数据。
    • 应用场景
      • 实时监控:捕获和分析各种来源发布的数据,如传感器,新闻源,点击网页等。
      • 实时商业智能:智能汽车,智能家居,智能病人护理等。
      • 销售终端(POS)系统:像是股票价格的更新,允许用户实时完成付款的系统等。

07 | Workflow 设计模式:让你在大规模数据世界中君临天下

08 | 发布/订阅模式:流处理架构中的瑞士军刀

09 丨 CAP 定理:三选二,架构师必须学会的取舍

10 丨 Lambda 架构:Twitter 亿级实时数据分析架构背后的倚天剑

Lambda 架构总共由三层系统组成:批处理层(Batch Layer),速度处理层(Speed Layer),以及用于响应查询的服务层(Serving Layer)。

11 丨 Kappa 架构:利用 Kafka 锻造的屠龙刀

12 | 我们为什么需要 Spark?

MapReduce 的缺点:

  • 高昂的维护成本
  • 时间性能“达不到”用户的期待
  • MapReduce 模型的抽象层次低
  • 只提供 Map 和 Reduce 两个操作
  • 在 Hadoop 中,每一个 Job 的计算结果都会存储在 HDFS 文件存储系统中,所以每一步计算都要进行硬盘的读取和写入,大大增加了系统的延迟。
  • 只支持批处理

Spark 的优点

  • 性能比 MapReduce 高很多
  • Spark 提供了很多对 RDD 的操作,如 Map、Filter、flatMap、groupByKey 和 Union 等等,极大地提升了对各种复杂场景的支持

13 丨弹性分布式数据集:Spark 大厦的地基(上)

Spark 最基本的数据抽象是弹性分布式数据集(Resilient Distributed Dataset)

RDD 表示已被分区、不可变的,并能够被并行操作的数据集合。

14 丨弹性分布式数据集:Spark 大厦的地基(下)

15 丨 SparkSQL:Spark 数据查询的利器

16 | Spark Streaming:Spark 的实时流计算 API

Spark Streaming 用时间片拆分了无限的数据流,然后对每一个数据片用类似于批处理的方法进行处理,输出的数据也是一块一块的

17 | Structured Streaming:如何用 DataFrame API 进行实时数据分析?

18 丨 WordCount:从零开始运行你的第一个 Spark 应用

19 丨综合案例实战:处理加州房屋信息,构建线性回归模型

20 丨流处理案例实战:分析纽约市出租车载客信息


读到此处,感觉收获甚少,暂时搁置阅读。

参考资料

《从 0 开始学大数据》笔记

预习模块

01 丨预习 01 丨大数据技术发展史:大数据的前世今生

大数据技术,起源于 Google 在 2004 年前后发表的三篇论文:

Doug Cutting 根据 Google 论文开发了 Hadoop。

大数据处理的主要应用场景包括数据分析、数据挖掘与机器学习。

数据分析主要使用 Hive、Spark SQL 等 SQL 引擎完成;

数据挖掘与机器学习则有专门的机器学习框架 TensorFlow、Mahout 以及 MLlib 等,内置了主要的机器学习和数据挖掘算法。

大数据要存入分布式文件系统(HDFS),要有序调度 MapReduce 和 Spark 作业执行,并能把执行结果写入到各个应用系统的数据库中,还需要有一个大数据平台整合所有这些大数据组件和企业应用系统。

02 丨预习 02 丨大数据应用发展史:从搜索引擎到人工智能

大数据的应用领域:

  • 搜索引擎:GFS 和 MapReduce 开启了超大规模的分布式存储和分布式计算应用。
  • 数据仓库:Hive 实现了用更低廉的价格获得比以往多得多的数据存储与计算能力。
  • 数据挖掘:基于海量数据进行关联分析。应用有:关联推荐、用户画像、关系图谱
  • 机器学习:有了大数据,可以把全部的历史数据都收集起来,统计其规律,进而预测正在发生的事情。

03 丨预习 03 丨大数据应用领域:数据驱动一切

大数据的行业应用:

  • 医疗健康领域
    • 医学影像智能识别
    • 病历大数据智能诊疗
  • 教育领域
    • AI 外语老师
    • 智能解题
  • 社交媒体领域:舆情监控与分析
  • 金融领域:大数据风控
  • 新零售领域:全链路管理
  • 交通领域
    • 实时采集监控数据
    • 判断道路拥堵状态
    • 无人驾驶技术

模块一、Hadoop 大数据原理与架构

04 | 移动计算比移动数据更划算

传统计算模型:输入 -> 计算 -> 输出,面对海量数据(TB 级甚至 PB 级),无法应对。

移动计算将程序分发到数据所在的地方进行计算,也就是所谓的移动计算比移动数据更划算。

移动计算步骤:

  1. 将待处理的大规模数据存储在服务器集群的所有服务器上,主要使用 HDFS 分布式文件存储系统,将文件分成很多块(Block),以块为单位存储在集群的服务器上。
  2. 大数据引擎根据集群里不同服务器的计算能力,在每台服务器上启动若干分布式任务执行进程,这些进程会等待给它们分配执行任务。
  3. 使用大数据计算框架支持的编程模型进行编程,比如 Hadoop 的 MapReduce 编程模型,或者 Spark 的 RDD 编程模型。应用程序编写好以后,将其打包,MapReduce 和 Spark 都是在 JVM 环境中运行,所以打包出来的是一个 Java 的 JAR 包。
  4. 用 Hadoop 或者 Spark 的启动命令执行这个应用程序的 JAR 包,首先执行引擎会解析程序要处理的数据输入路径,根据输入数据量的大小,将数据分成若干片(Split),每一个数据片都分配给一个任务执行进程去处理。
  5. 任务执行进程收到分配的任务后,检查自己是否有任务对应的程序包,如果没有就去下载程序包,下载以后通过反射的方式加载程序。走到这里,最重要的一步,也就是移动计算就完成了。
  6. 加载程序后,任务执行进程根据分配的数据片的文件地址和数据在文件内的偏移量读取数据,并把数据输入给应用程序相应的方法

05 | 从 RAID 看垂直伸缩到水平伸缩的演化

海量数据存储核心问题

  • 数据存储容量的问题。既然大数据要解决的是数以 PB 计的数据计算问题,而一般的服务器磁盘容量通常 1 ~ 2TB,那么如何存储这么大规模的数据呢?
  • 数据读写速度的问题。一般磁盘的连续读写速度为几十 MB,以这样的速度,几十 PB 的数据恐怕要读写到天荒地老。
  • 数据可靠性的问题。磁盘大约是计算机设备中最易损坏的硬件了,通常情况一块磁盘使用寿命大概是一年,如果磁盘损坏了,数据怎么办?

解决方式是 RAID 技术:

  • 数据存储容量的问题。RAID 使用了 N 块磁盘构成一个存储阵列,如果使用 RAID 5,数据就可以存储在 N-1 块磁盘上,这样将存储空间扩大了 N-1 倍。
  • 数据读写速度的问题。RAID 根据可以使用的磁盘数量,将待写入的数据分成多片,并发同时向多块磁盘进行写入,显然写入的速度可以得到明显提高;同理,读取速度也可以得到明显提高。不过,需要注意的是,由于传统机械磁盘的访问延迟主要来自于寻址时间,数据真正进行读写的时间可能只占据整个数据访问时间的一小部分,所以数据分片后对 N 块磁盘进行并发读写操作并不能将访问速度提高 N 倍。
  • 数据可靠性的问题。使用 RAID 10、RAID 5 或者 RAID 6 方案的时候,由于数据有冗余存储,或者存储校验信息,所以当某块磁盘损坏的时候,可以通过其他磁盘上的数据和校验数据将丢失磁盘上的数据还原。

实现更强的计算能力和更大规模的数据存储有两种思路

  • 垂直伸缩(scaling up),即硬件升级
  • 水平伸缩(scaling out),即分布式系统

注:RAID 技术就是采用了垂直伸缩的方式。

06 | 新技术层出不穷,HDFS 依然是存储的王者

img

HDFS 有两个关键组件:DataNode 和 NameNode:

  • DataNode 负责文件数据的存储和读写操作,HDFS 将文件数据分割成若干数据块(Block),每个 DataNode 存储一部分数据块,这样文件就分布存储在整个 HDFS 服务器集群中。应用程序客户端(Client)可以并行对这些数据块进行访问,从而使得 HDFS 可以在服务器集群规模上实现数据并行访问,极大地提高了访问速度。
  • NameNode 负责整个分布式文件系统的元数据(MetaData)管理,也就是文件路径名、数据块的 ID 以及存储位置等信息,相当于操作系统中文件分配表(FAT)的角色。

为保证高可用,HDFS 会,会将一个数据块复制为多份(默认为 3 份),并将多份相同的数据块存储在不同的服务器上,甚至不同的机架上。

img

HDFS 故障容错:

  • 数据存储故障容错
    • 对于 DataNode 上的数据块进行计算并存储校验和(CheckSum)
    • 读取数据的时候,重新计算读取出来的数据的校验和,校验和不正确,则抛出异常
    • 发现异常后,从其他 DataNode 读取备份
  • 磁盘故障容错
    • 如果 DataNode 监测到本机磁盘损坏,将该磁盘的所有数据块 ID 报告给 NameNode
    • NameNode 检查这些数据块在哪些 DataNode 上有备份,复制一份到其他 DataNode 上
  • DataNode 故障容错
    • DataNode 会通过心跳和 NameNode 保持通信
    • 如果 DataNode 超时未发送心跳,NameNode 视其为宕机
    • NameNode 查找这个 DataNode 存储的所有数据块,复制一份到其他 DataNode 上
  • NameNode 故障容错
    • 基于 ZooKeeper 实现主从备份
    • 争夺 znode 锁

07 | 为什么说 MapReduce 既是编程模型又是计算框架?

MapReduce 既是编程模型,又是计算框架

MapReduce 编程模型只包含 Map 和 Reduce 两个过程,map 的主要输入是一对 <Key, Value> 值,经过 map 计算后输出一对 <Key, Value> 值;然后将相同 Key 合并,形成 <Key, Value 集合>;再将这个 <Key, Value 集合> 输入 reduce,经过计算输出零个或多个 <Key, Value> 对。

08 | MapReduce 如何让数据完成一次旅行?

MapReduce 作业启动和运行机制

大数据应用进程。这类进程是启动 MapReduce 程序的主入口,主要是指定 Map 和 Reduce 类、输入输出文件路径等,并提交作业给 Hadoop 集群,也就是下面提到的 JobTracker 进程。这是由用户启动的 MapReduce 程序进程,比如我们上期提到的 WordCount 程序。

JobTracker 进程。这类进程根据要处理的输入数据量,命令下面提到的 TaskTracker 进程启动相应数量的 Map 和 Reduce 进程任务,并管理整个作业生命周期的任务调度和监控。这是 Hadoop 集群的常驻进程,需要注意的是,JobTracker 进程在整个 Hadoop 集群全局唯一。

TaskTracker 进程。这个进程负责启动和管理 Map 进程以及 Reduce 进程。因为需要每个数据块都有对应的 map 函数,TaskTracker 进程通常和 HDFS 的 DataNode 进程启动在同一个服务器。也就是说,Hadoop 集群中绝大多数服务器同时运行 DataNode 进程和 TaskTracker 进程。

MapReduce 数据合并与连接机制

在 map 输出与 reduce 输入之间,MapReduce 计算框架处理数据合并与连接操作,这个操作有个专门的词汇叫 shuffle。分布式计算需要将不同服务器上的相关数据合并到一起进行下一步计算,这就是 shuffle。

09 | 为什么我们管 Yarn 叫作资源调度框架?

服务器集群资源调度管理和 MapReduce 执行过程耦合在一起,如果想在当前集群中运行其他计算任务,比如 Spark 或者 Storm,就无法统一使用集群中的资源了。

Yarn 包括两个部分:

ResourceManager 进程负责整个集群的资源调度管理,通常部署在独立的服务器上;

NodeManager 进程负责具体服务器上的资源和任务管理,在集群的每一台计算服务器上都会启动,基本上跟 HDFS 的 DataNode 进程一起出现。

Yarn 的工作流程

  1. 我们向 Yarn 提交应用程序,包括 MapReduce ApplicationMaster、我们的 MapReduce 程序,以及 MapReduce Application 启动命令。
  2. ResourceManager 进程和 NodeManager 进程通信,根据集群资源,为用户程序分配第一个容器,并将 MapReduce ApplicationMaster 分发到这个容器上面,并在容器里面启动 MapReduce ApplicationMaster。
  3. MapReduce ApplicationMaster 启动后立即向 ResourceManager 进程注册,并为自己的应用程序申请容器资源。
  4. MapReduce ApplicationMaster 申请到需要的容器后,立即和相应的 NodeManager 进程通信,将用户 MapReduce 程序分发到 NodeManager 进程所在服务器,并在容器中运行,运行的就是 Map 或者 Reduce 任务。
  5. Map 或者 Reduce 任务在运行期和 MapReduce ApplicationMaster 通信,汇报自己的运行状态,如果运行结束,MapReduce ApplicationMaster 向 ResourceManager 进程注销并释放所有的容器资源。

10 | 模块答疑:我们能从 Hadoop 学到什么?

Hadoop 几个主要产品的架构设计,就会发现它们都有相似性,都是一主多从的架构方案。

  • HDFS,一个 NameNode,多个 DataNode;
  • MapReduce,一个 JobTracker,多个 TaskTracker;
  • Yarn,一个 ResourceManager,多个 NodeManager。

事实上,很多大数据产品都是这样的架构方案:

Storm,一个 Nimbus,多个 Supervisor;

Spark,一个 Master,多个 Slave。

大数据因为要对数据和计算任务进行统一管理,所以和互联网在线应用不同,需要一个全局管理者。一言以蔽之:集中管理,分布存储与计算

模块二、大数据生态体系主要产品原理与架构

11 | Hive 是如何让 MapReduce 实现 SQL 操作的?

Hive 能够直接处理我们输入的 SQL 语句(Hive 的 SQL 语法和数据库标准 SQL 略有不同),调用 MapReduce 计算框架完成数据分析操作。

我们通过 Hive 的 Client(Hive 的命令行工具,JDBC 等)向 Hive 提交 SQL 命令。如果是创建数据表的 DDL(数据定义语言),Hive 就会通过执行引擎 Driver 将数据表的信息记录在 Metastore 元数据组件中,这个组件通常用一个关系数据库实现,记录表名、字段名、字段类型、关联 HDFS 文件路径等这些数据库的 Meta 信息(元信息)。

如果我们提交的是查询分析数据的 DQL(数据查询语句),Driver 就会将该语句提交给自己的编译器 Compiler 进行语法分析、语法解析、语法优化等一系列操作,最后生成一个 MapReduce 执行计划。然后根据执行计划生成一个 MapReduce 的作业,提交给 Hadoop MapReduce 计算框架处理。

12 | 我们并没有觉得 MapReduce 速度慢,直到 Spark 出现

RDD 是 Spark 的核心概念,是弹性数据集(Resilient Distributed Datasets)的缩写。RDD 既是 Spark 面向开发者的编程模型,又是 Spark 自身架构的核心元素。

Spark 上编写 WordCount 程序,主要代码只需要三行

1
2
3
4
5
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

MapReduce 针对输入数据,将计算过程分为两个阶段,一个 Map 阶段,一个 Reduce 阶段,可以理解成是面向过程的大数据计算。

而 Spark 则直接针对数据进行编程,将大规模数据集合抽象成一个 RDD 对象,然后在这个 RDD 上进行各种计算处理,得到一个新的 RDD,继续计算处理,直到得到最后的结果数据。所以 Spark 可以理解成是面向对象的大数据计算。

RDD 上定义的函数分两种,一种是转换(transformation)函数,这种函数的返回值还是 RDD;另一种是执行(action)函数,这种函数不再返回 RDD。

RDD 定义了很多转换操作函数,比如有计算 map(func)、过滤 filter(func)、合并数据集 union(otherDataset)、根据 Key 聚合 reduceByKey(func, [numPartitions])、连接数据集 join(otherDataset, [numPartitions])、分组 groupByKey([numPartitions]) 等十几个函数。

13 | 同样的本质,为何 Spark 可以更高效?

Spark 有三个主要特性:RDD 的编程模型更简单,DAG 切分的多阶段计算过程更快速,使用内存存储中间计算结果更高效。这三个特性使得 Spark 相对 Hadoop MapReduce 可以有更快的执行速度,以及更简单的编程实现。

14 | BigTable 的开源实现:HBase

HBase 可伸缩架构

HBase 的伸缩性主要依赖其可分裂的 HRegion 及可伸缩的分布式文件系统 HDFS 实现。

HRegion 是 HBase 负责数据存储的主要进程,应用程序对数据的读写操作都是通过和 HRegion 通信完成。上面是 HBase 架构图,我们可以看到在 HBase 中,数据以 HRegion 为单位进行管理,也就是说应用程序如果想要访问一个数据,必须先找到 HRegion,然后将数据读写操作提交给 HRegion,由 HRegion 完成存储层面的数据操作。

HRegionServer 是物理服务器,每个 HRegionServer 上可以启动多个 HRegion 实例。当一个 HRegion 中写入的数据太多,达到配置的阈值时,一个 HRegion 会分裂成两个 HRegion,并将 HRegion 在整个集群中进行迁移,以使 HRegionServer 的负载均衡。

每个 HRegion 中存储一段 Key 值区间 [key1, key2) 的数据,所有 HRegion 的信息,包括存储的 Key 值区间、所在 HRegionServer 地址、访问端口号等,都记录在 HMaster 服务器上。为了保证 HMaster 的高可用,HBase 会启动多个 HMaster,并通过 ZooKeeper 选举出一个主服务器。

应用程序通过 ZooKeeper 获得主 HMaster 的地址,输入 Key 值获得这个 Key 所在的 HRegionServer 地址,然后请求 HRegionServer 上的 HRegion,获得所需要的数据。

HRegion 会把数据存储在若干个 HFile 格式的文件中,这些文件使用 HDFS 分布式文件系统存储,在整个集群内分布并高可用。当一个 HRegion 中数据量太多时,这个 HRegion 连同 HFile 会分裂成两个 HRegion,并根据集群中服务器负载进行迁移。如果集群中有新加入的服务器,也就是说有了新的 HRegionServer,由于其负载较低,也会把 HRegion 迁移过去并记录到 HMaster,从而实现 HBase 的线性伸缩。

HBase 可扩展数据模型

支持列族结构的 NoSQL 数据库,在创建表的时候,只需要指定列族的名字,无需指定字段(Column)。那什么时候指定字段呢?可以在数据写入时再指定。通过这种方式, 数据表可以包含数百万的字段,这样就可以随意扩展应用程序的数据结构了。并且这种数据库在查询时也很方便,可以通过指定任意字段名称和值进行查询。

HBase 这种列族的数据结构设计,实际上是把字段的名称和字段的值,以 Key-Value 的方式一起存储在 HBase 中。实际写入的时候,可以随意指定字段名称,即使有几百万个字段也能轻松应对。

HBase 的高性能存储

HBase 使用了一种叫作 LSM 树(Log 结构合并树)的数据结构进行数据存储。数据写入的时候以 Log 方式连续写入,然后异步对磁盘上的多个 LSM 树进行合并。

LSM 树可以看作是一个 N 阶合并树。数据写操作(包括插入、修改、删除)都在内存中进行,并且都会创建一个新记录(修改会记录新的数据值,而删除会记录一个删除标志)。这些数据在内存中仍然还是一棵排序树,当数据量超过设定的内存阈值后,会将这棵排序树和磁盘上最新的排序树合并。当这棵排序树的数据量也超过设定阈值后,会和磁盘上下一级的排序树合并。合并过程中,会用最新更新的数据覆盖旧的数据(或者记录为不同版本)。

15 | 流式计算的代表:Storm、Flink、Spark Streaming

16 | ZooKeeper 是如何保证数据一致性的?

分布式系统中的“脑裂”是指一个系统中的节点被分隔成两个或多个独立的部分,这些部分无法互相通信,导致系统出现不一致性和数据丢失的问题。通常情况下,“脑裂”是由于网络故障、硬件故障或者软件故障等因素导致的。

包括 HDFS 在内的很多大数据技术都选择了使用 ZooKeeper 来解决多台服务器的状态一致性问题。

ZooKeeper 使用了一种叫 ZAB 算法来解决一致性问题。ZAB 可视为 Paxos 算法的一种简化方案。

17 丨模块答疑:这么多技术,到底都能用在什么场景里?

大数据技术在实际部署的时候,通常会部署在同一个集群中,也就是说,在由很多台服务器组成的服务器集群中,某台服务器可能运行着 HDFS 的 DataNode 进程,负责 HDFS 的数据存储;同时也运行着 Yarn 的 NodeManager,负责计算资源的调度管理;而 MapReduce、Spark、Storm、Flink 这些批处理或者流处理大数据计算引擎则通过 Yarn 的调度,运行在 NodeManager 的容器(container)里面。至于 Hive、Spark SQL 这些运行在 MapReduce 或者 Spark 基础上的大数据仓库引擎,在经过自身的执行引擎将 SQL 语句解析成 MapReduce 或者 Spark 的执行计划以后,一样提交给 Yarn 去调度执行。

模块三、大数据开发实践

18 | 如何自己开发一个大数据 SQL 引擎?

19 | Spark 的性能优化案例分析(上)

性能指标:

  • 响应时间:完成一次任务(请求)花费的时间。
  • 并发数:同时处理的任务数(请求数)。
  • 吞吐量:单位时间完成的任务数(请求数、事务数、查询数……)。
  • 性能计数器:System Load,线程数,进程数,CPU、内存、磁盘、网络使用率等。

Spark 性能优化可以分解为下面几步。

  1. 性能测试,观察 Spark 性能特性和资源(CPU、Memory、Disk、Net)利用情况。
  2. 分析、寻找资源瓶颈。
  3. 分析系统架构、代码,发现资源利用关键所在,思考优化策略。
  4. 代码、架构、基础设施调优,优化、平衡资源利用。
  5. 性能测试,观察系统性能特性,是否达到优化目的,以及寻找下一个瓶颈点。

20 | Spark 的性能优化案例分析(下)

21 | 从阿里内部产品看海量数据处理系统的设计(上):Doris 的立项

22 | 从阿里内部产品看海量数据处理系统的设计(下):架构与创新

23 | 大数据基准测试可以带来什么好处?

大数据基准测试工具:

HiBench

24 丨从大数据性能测试工具 Dew 看如何快速开发大数据系统

25 | 模块答疑:我能从大厂的大数据开发实践中学到什么?

学习层次

  1. 练习
  2. 应用
  3. 开发

模块四、大数据平台与系统集成

26 | 互联网产品 + 大数据产品 = 大数据平台

  • 数据采集:数据库同步通常用 Sqoop,日志同步可以选择 Flume,打点采集的数据经过格式化转换后通过 Kafka 等消息队列进行传递
  • 数据处理:离线计算:MapReduce、Hive、Spark;实时计算:Storm、Spark Streaming、Flink
  • 数据展示:Lambda 架构

27 | 大数据从哪里来?

大数据平台的数据来源主要有数据库、日志、前端程序埋点、爬虫系统。

  • 数据库导入
    • Sqoop:Sqoop 是一个数据库批量导入导出工具,可以将关系数据库的数据批量导入到 Hadoop,也可以将 Hadoop 的数据导出到关系数据库。
    • Canal:Canal 是阿里巴巴开源的一个 MySQL binlog 获取工具,binlog 是 MySQL 的事务日志,可用于 MySQL 数据库主从复制,Canal 将自己伪装成 MySQL 从库,从 MySQL 获取 binlog。
  • 日志文件导入
    • Flume:Flume 是大数据日志收集常用的工具。
  • 前端程序埋点
    • 手动埋点
    • 自动埋点
  • 爬虫

28 | 知名大厂如何搭建大数据平台?

淘宝大数据平台

美团大数据平台

滴滴大数据平台

29 | 盘点可供中小企业参考的商业大数据平台

大数据解决方案提供商

CDH、TDH

CDH 是一个大数据集成平台,将主流大数据产品都集成到这个平台中,企业可以使用 CDH 一站式部署整个大数据技术栈。从架构分层角度,CDH 可以分为 4 层:系统集成,大数据存储,统一服务,过程、分析与计算。

  1. 系统集成:数据库导入导出用 Sqoop,日志导入导出用 Flume,其他实时数据导入导出用 Kafka。
  2. 大数据存储:文件系统用 HDFS,结构化数据用 Kudu,NoSQL 存储用 HBase,其他还有对象存储。
  3. 统一服务:资源管理用 Yarn,安全管理用 Sentry 和 RecordService 细粒度地管理不同用户数据的访问权限。
  4. 过程、分析与计算:批处理计算用 MapReduce、Spark、Hive、Pig,流计算用 SparkStreaming,快速 SQL 分析用 Impala,搜索服务用 Solr。

大数据云计算服务商

阿里云、亚马逊

大数据 SaaS 服务商

友盟、神策、百度统计

大数据开放平台

30 | 当大数据遇上物联网

  1. 智能网关通过消息队列将数据上传到物联网大数据平台,Storm 等流式计算引擎从消息队列获取数据,对数据的处理分三个方面。数据进行清理转换后写入到大数据存储系统。调用规则和机器学习模型,对上传数据进行计算,如果触发了某种执行规则,就将控制信息通过设备管理服务器下发给智能网关,并进一步控制终端智能设备。
  2. Spark 等离线计算引擎定时对写入存储系统的数据进行批量计算处理,进行全量统计分析和机器学习,并更新机器学习模型。
  3. 应用程序也可以通过设备管理服务器直接发送控制指令给智能网关,控制终端智能设备。

31 | 模块答疑:为什么大数据平台至关重要?

模块五、大数据分析与运营

32 | 互联网运营数据指标与可视化监控

运营常用数据指标

  • 新增用户数
  • 用户留存率
  • 活跃用户数
  • PV(Page View)
  • GMV(Gross Merchandise Volume),即成交总金额
  • 转化率 = 付费用户数 / 总用户数

33 丨一个电商网站订单下降的数据分析案例

34 丨 A-B 测试与灰度发布必知必会

A/B 测试的过程

A/B 测试的系统架构

A/B 测试系统最重要的是能够根据用户 ID(或者设备 ID)将实验配置参数分发给应用程序,应用程序根据配置参数决定给用户展示的界面和执行的业务逻辑

灰度发布

35 丨如何利用大数据成为“增长黑客”?

AARRR 用户增长模型:它描述了用户增长的 5 个关键环节,分别是:获取用户(Acquisition)、提高活跃度(Activation)、提高留存率(Retention)、获取收入(Revenue)和自传播(Refer)。

  • 获取用户:通过各种推广手段,使产品触达用户并吸引用户,让用户访问我们的产品。
  • 提高活跃度:用户访问我们的产品后,如果发现没意思、体验差,就很难再次打开,产品的价值也就无法实现。因此需要结合产品内容、运营活动各种手段吸引用户,提升产品的活跃度。
  • 提高留存率:留住一个老用户的成本远低于获取一个新用户,而真正为产品带来营收利润的通常是老用户,因此需要提高留存率。提高留存率的常用手段有:针对老用户推出各种优惠和活动;建立会员等级体系,注册时间越长等级越高;对于一段时间没有访问的疑似流失用户进行消息短信推送以实现用户挽回等。
  • 获取收入:做企业不是做慈善,开发、运营互联网产品的最终目的还是为了赚钱,即获取收入。互联网产品收入主要有用户付费和广告收入,有些互联网产品看起来是用户付费,但其实主要营收是广告收入,比如淘宝。
  • 自传播:让用户利用利用自己的社交网络进行产品推广就是自传播,几乎所有的互联网产品都有“分享到”这样一个功能按钮,促进用户社交传播。有些产品还会利用“帮我砍价”“帮我抢票”等产品功能推动用户进行分享,实现产品的裂变式传播、病毒式营销。

增长用户的手段主要有:

利用用户画像定位用户群体

  • 通过用户分析挽回用户
  • A/B 测试决定产品功能
  • 大数据反欺诈、反羊毛
  • 用户生命周期管理

36 丨模块答疑:为什么说数据驱动运营?

模块六、大数据算法

37 丨如何对数据进行分类和预测?

KNN 算法:KNN 算法,即 K 近邻(K Nearest Neighbour)算法,是一种基本的分类算法。其主要原理是:对于一个需要分类的数据,将其和一组已经分类标注好的样本集合进行比较,得到距离最近的 K 个样本,K 个样本最多归属的类别,就是这个需要分类数据的类别。

数据的距离:

  • 欧氏距离:计算空间距离
  • 余弦相似度:计算向量的夹角。更关注数据的相似性

文本的特征值:

  • TF-IDF 算法:TF 与 IDF 的乘积就是 TF-IDF。
    • TF 是词频(Term Frequency),表示某个单词在文档中出现的频率,一个单词在一个文档中出现的越频繁,TF 值越高。
    • IDF 是逆文档频率(Inverse Document Frequency),表示这个单词在所有文档中的稀缺程度,越少文档出现这个词,IDF 值越高。

贝叶斯分类:贝叶斯公式是一种基于条件概率的分类算法,如果我们已经知道 A 和 B 的发生概率,并且知道了 B 发生情况下 A 发生的概率,可以用贝叶斯公式计算 A 发生的情况下 B 发生的概率。

38 丨如何发掘数据之间的关系?

搜索排序:Google PageRank 算法

关联分析:

  • Apriori 算法:Apriori 算法极大地降低了需要计算的商品组合数目,这个算法的原理是,如果一个商品组合不满足最小支持度,那么所有包含这个商品组合的其他商品组合也不满足最小支持度。所以从最小商品组合,也就是一件商品开始计算最小支持度,逐渐迭代,进而筛选出所有满足最小支持度的频繁模式。其步骤如下:
    1. 设置最小支持度阈值。
    2. 寻找满足最小支持度的单件商品,也就是单件商品出现在所有订单中的概率不低于最小支持度。
    3. 从第 2 步找到的所有满足最小支持度的单件商品中,进行两两组合,寻找满足最小支持度的两件商品组合,也就是两件商品出现在同一个订单中概率不低于最小支持度。
    4. 从第 3 步找到的所有满足最小支持度的两件商品,以及第 2 步找到的满足最小支持度的单件商品进行组合,寻找满足最小支持度的三件商品组合。
    5. 以此类推,找到所有满足最小支持度的商品组合。

聚类:聚类就是对一批数据进行自动归类。

K-means 算法

39 丨如何预测用户的喜好?

基于人口统计的推荐:基于人口统计的推荐是相对比较简单的一种推荐算法,根据用户的基本信息进行分类,然后将商品推荐给同类用户。

基于产品属性的推荐:基于用户的属性进行分类,然后根据同类用户的行为进行推荐。而基于商品属性的推荐则是将商品的属性进行分类,然后根据用户的历史行为进行推荐。

基于用户的协同过滤推荐:基于用户的协同过滤推荐是根据用户的喜好进行用户分类,常用的就是 KNN 算法,寻找和当前用户喜好最相近的 K 个用户,然后根据这些用户的喜好为当前用户进行推荐。

基于商品的协同过滤推荐:根据用户的喜好对商品进行分类,如果两个商品,喜欢它们的用户具有较高的重叠性,就认为它们的距离相近,划分为同类商品,然后进行推荐

40 丨机器学习的数学原理是什么?

样本:样本就是通常我们常说的“训练数据”,包括输入和结果两部分。

模型:模型就是映射样本输入与样本结果的函数,可能是一个条件概率分布,也可能是一个决策函数。

算法:算法就是要从模型的假设空间中寻找一个最优的函数,使得样本空间的输入 X 经过该函数的映射得到的 f(X),和真实的 Y 值之间的距离最小。这个最优的函数通常没办法直接计算得到,即没有解析解,需要用数值计算的方法不断迭代求解。因此如何寻找到 f 函数的全局最优解,以及使寻找过程尽量高效,就构成了机器学习的算法。

41 丨从感知机到神经网络算法

42 丨模块答疑:软件工程师如何进入人工智能领域?

斯坦福大学的机器学习公开课

参考资料

《机器学习 40 讲》笔记

开篇词 | 打通修炼机器学习的任督二脉

“机器学习”分为 3 个模块

  • 机器学习概观:介绍机器学习中超脱于具体模型和方法之上的一些共性问题
  • 统计学习(频率学派):利用不同的模型去拟合数据背后的规律;用拟合出的规律去推断和预测未知的结果
  • 符号学习(贝叶斯学派):即概率图模型,它计算的是变量间的相关关系,每个遍历的先验分布和大量复杂的积分技巧。

01 丨频率视角下的机器学习

频率学派认为概率是随机事件发生频率的极限值;

频率学派执行参数估计时,视参数为确定取值,视数据为随机变量;

频率学派主要使用最大似然估计法,让数据在给定参数下的似然概率最大化;

频率学派对应机器学习中的统计学习,以经验风险最小化作为模型选择的准则。

02 | 贝叶斯视角下的机器学习

贝叶斯学派认为概率是事件的可信程度或主体对事件的信任程度;

贝叶斯学派执行参数估计时,视参数为随机变量,视数据为确定取值;

贝叶斯学派主要使用最大后验概率法,让参数在先验信息和给定数据下的后验概率最大化;

贝叶斯学派对应机器学习中的概率图模型,可以在模型预测和选择中提供更加完整的信息。

03 丨学什么与怎么学

什么样的问题才能通过机器学习来解决呢?

首先,问题不能是完全随机的,需要具备一定的模式;

其次,问题本身不能通过纯计算的方法解决;

再次,有大量的数据可供使用。

机器学习的任务,就是使用数据计算出与目标函数最接近的假设,或者说拟合出最精确的模型 。

输入特征类型

  • 具体特征(concrete feature)
  • 原始特征(raw feature)
  • 抽象特征(abstract feature)

机器学习方法类型

  • 分类算法(classification)
  • 回归算法(regression)
  • 标注算法(tagging)

如果训练数据中的每组输入都有其对应的输出结果,这类学习任务就是监督学习(supervised learning),对没有输出的数据进行学习则是无监督学习(unsupervised learning)。监督学习具有更好的预测精度,无监督学习则可以发现数据中隐含的结构特性,起到的也是分类的作用,只不过没有给每个类别赋予标签而已。无监督学习可以用于对数据进行聚类或者密度估计,也可以完成异常检测这类监督学习中的预处理操作。直观地看,监督学习适用于预测任务,无监督学习适用于描述任务。

04 丨计算学习理论

Spring Data 综合

Spring Data Repository 抽象的目标是显著减少各种访问持久化存储的样板式代码。

核心概念

Repository 是 Spring Data 的核心接口。此接口主要用作标记接口,以捕获要使用的类型并帮助您发现扩展此接口的接口。CrudRepositoryListCrudRepository 接口为被管理的实体类提供复杂的 CRUD 功能。ListCrudRepository 提供等效方法,但它们返回 List,而 CrudRepository 方法返回 Iterable

CrudRepository 接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface CrudRepository<T, ID> extends Repository<T, ID> {

<S extends T> S save(S entity);

Optional<T> findById(ID primaryKey);

Iterable<T> findAll();

long count();

void delete(T entity);

boolean existsById(ID primaryKey);

// … more functionality omitted.
}

Spring Data 项目也提供了一些特定持久化技术的抽象接口,如:JpaRepository 或 MongoRepository。这些接口扩展了 CrudRepository 并暴露了一些持久化技术的底层功能。

除了 CrudRepository 之外,还有一个 PagingAndSortingRepository 接口,它添加了额外的方法来简化对实体的分页访问:

1
2
3
4
5
6
public interface PagingAndSortingRepository<T, ID>  {

Iterable<T> findAll(Sort sort);

Page<T> findAll(Pageable pageable);
}

【示例】要按页面大小 20 访问 User 的第二页,可以执行如下操作

1
2
PagingAndSortingRepository<User, Long> repository = // … get access to a bean
Page<User> users = repository.findAll(PageRequest.of(1, 20));

除了查询方法之外,计数和删除时的查询也是可用的。

【示例】根据姓氏计数

1
2
3
interface UserRepository extends CrudRepository<User, Long> {
long countByLastname(String lastname);
}

【示例】根据姓氏删除

1
2
3
4
5
6
interface UserRepository extends CrudRepository<User, Long> {

long deleteByLastname(String lastname);

List<User> removeByLastname(String lastname);
}

查询方法

使用 Spring Data 对数据库进行查询有以下四步:

  1. 声明一个扩展 Repository 或其子接口的接口,并指定泛型类型(实体类和 ID 类型),如以下示例所示:

    1
    interface PersonRepository extends Repository<Person, Long> { … }
  2. 在接口中声明查询方法

    1
    2
    3
    interface PersonRepository extends Repository<Person, Long> {
    List<Person> findByLastname(String lastname);
    }
  3. 使用 JavaConfigXML 配置为这些接口创建代理实例

    1
    2
    @EnableJpaRepositories
    class Config { … }
  4. 注入 Repository 实例并使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    class SomeClient {

    private final PersonRepository repository;

    SomeClient(PersonRepository repository) {
    this.repository = repository;
    }

    void doSomething() {
    List<Person> persons = repository.findByLastname("Matthews");
    }
    }

定义 Repository

首先需要定义一个 Repository 接口,该接口必须扩展 Repository 并且指定泛型类型(实体类和 ID 类型)。如果想为该实体暴露 CRUD 方法,可以扩展 CrudRepository 接口。

微调 Repository 定义

Spring Data 提供了很多种 Repository 以应对不同的需求场景。

CrudRepository 提供了 CRUD 功能。

ListCrudRepositoryCrudRepository 类似,但对于那些返回多个实体的方法,它返回一个 List 而不是 Iterable,这样使用可能更方便。

如果使用响应式框架,可以使用 ReactiveCrudRepositoryRxJava3CrudRepository

CoroutineCrudRepository 支持 Kotlin 的协程特性。

PagingAndSortingRepository 提供了分页、排序功能。

如果不想扩展 Spring Data 接口,还可以使用 @RepositoryDefinition 注释您的 Repository 接口。 扩展一个 CRUD Repository 接口,需要暴露一组完整的方法来操作实体。如果希望对暴露的方法有选择性,可以将要暴露的方法从 CRUD Repository 复制到自定义的 Repository 中。 这样做时,可以更改方法的返回类型。 如果可能,Spring Data 将遵循返回类型。 例如,对于返回多个实体的方法,可以选择 Iterable<T>List<T>Collection<T>VAVR 列表。

自定义基础 Repository 接口,必须用 @NoRepositoryBean 标记。 这可以防止 Spring Data 尝试直接创建它的实例并失败,因为它无法确定该 Repository 的实体,因为它仍然包含一个通用类型变量。

以下示例显示了如何有选择地暴露 CRUD 方法(在本例中为 findById 和 save):

1
2
3
4
5
6
7
8
9
10
11
@NoRepositoryBean
interface MyBaseRepository<T, ID> extends Repository<T, ID> {

Optional<T> findById(ID id);

<S extends T> S save(S entity);
}

interface UserRepository extends MyBaseRepository<User, Long> {
User findByEmailAddress(EmailAddress emailAddress);
}

使用多个 Spring 数据模块

有时,程序中需要使用多个 Spring Data 模块。在这种情况下,必须区分持久化技术。当检测到类路径上有多个 Repository 工厂时,Spring Data 进入严格的配置模式。

如果定义的 Repository 扩展了特定模块中的 Repository,则它是特定 Spring Data 模块的有效候选者。

如果实体类使用了特定模块的类型注解,则它是特定 Spring Data 模块的有效候选者。 Spring Data 模块接受第三方注解(例如 JPA 的 @Entity)或提供自己的注解(例如用于 Spring Data MongoDB 和 Spring Data Elasticsearch 的 @Document)。

以下示例显示了一个使用模块特定接口(在本例中为 JPA)的 Repository:

1
2
3
4
5
6
interface MyRepository extends JpaRepository<User, Long> { }

@NoRepositoryBean
interface MyBaseRepository<T, ID> extends JpaRepository<T, ID> { … }

interface UserRepository extends MyBaseRepository<User, Long> { … }

MyRepository 和 UserRepository 扩展了 JpaRepository。它们是 Spring Data JPA 模块的有效候选者。

以下示例显示了一个使用通用接口的 Repository

1
2
3
4
5
6
interface AmbiguousRepository extends Repository<User, Long> { … }

@NoRepositoryBean
interface MyBaseRepository<T, ID> extends CrudRepository<T, ID> { … }

interface AmbiguousUserRepository extends MyBaseRepository<User, Long> { … }

AmbiguousRepository 和 AmbiguousUserRepository 仅扩展了 Repository 和 CrudRepository。 虽然这在使用唯一的 Spring Data 模块时很好,但是存在多个模块时,无法区分这些 Repository 应该绑定到哪个特定的 Spring Data。

以下示例显示了一个使用带注解的实体类的 Repository

1
2
3
4
5
6
7
8
9
interface PersonRepository extends Repository<Person, Long> { … }

@Entity
class Person { … }

interface UserRepository extends Repository<User, Long> { … }

@Document
class User { … }

PersonRepository 引用 Person,它使用 JPA @Entity 注解进行标记,因此这个 Repository 显然属于 Spring Data JPA。 UserRepository 引用 User,它使用 Spring Data MongoDB 的 @Document 注解进行标记。

以下错误示例显示了一个使用带有混合注解的实体类的 Repository

1
2
3
4
5
6
7
interface JpaPersonRepository extends Repository<Person, Long> { … }

interface MongoDBPersonRepository extends Repository<Person, Long> { … }

@Entity
@Document
class Person { … }

此示例中的实体类同时使用了 JPA 和 Spring Data MongoDB 的注解。示例中定义了两个 Repository:JpaPersonRepository 和 MongoDBPersonRepository。 一个用于 JPA,另一个用于 MongoDB。 Spring Data 不再能够区分 Repository,这会导致未定义的行为。

区分 Repository 的最后一种方法是确定 Repository 扫描 package 的范围。

1
2
3
@EnableJpaRepositories(basePackages = "com.acme.repositories.jpa")
@EnableMongoRepositories(basePackages = "com.acme.repositories.mongo")
class Configuration { … }

定义查询方法

Repository 代理有两种方法可以从方法名称派生特定于存储的查询:

  • 通过直接从方法名称派生查询。
  • 通过使用手动定义的查询。

可用选项取决于实际存储。但是,必须有一个策略来决定创建什么实际查询。

查询策略

以下策略可用于Repository 基础结构来解析查询。 对于 Java 配置,您可以使用 EnableJpaRepositories 注释的 queryLookupStrategy 属性。 特定数据存储可能不支持某些策略。

  • CREATE 尝试从查询方法名称构造特定存储的查询。
  • USE_DECLARED_QUERY 尝试查找已声明的查询,如果找不到则抛出异常。
  • CREATE_IF_NOT_FOUND (默认)结合了 CREATEUSE_DECLARED_QUERY

查询创建

Spring Data 中有一套内置的查询构建器机制,可以自动映射符合命名和参数规则的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
interface PersonRepository extends Repository<Person, Long> {

List<Person> findByEmailAddressAndLastname(EmailAddress emailAddress, String lastname);

// Enables the distinct flag for the query
List<Person> findDistinctPeopleByLastnameOrFirstname(String lastname, String firstname);
List<Person> findPeopleDistinctByLastnameOrFirstname(String lastname, String firstname);

// Enabling ignoring case for an individual property
List<Person> findByLastnameIgnoreCase(String lastname);
// Enabling ignoring case for all suitable properties
List<Person> findByLastnameAndFirstnameAllIgnoreCase(String lastname, String firstname);

// Enabling static ORDER BY for a query
List<Person> findByLastnameOrderByFirstnameAsc(String lastname);
List<Person> findByLastnameOrderByFirstnameDesc(String lastname);
}

解析查询方法名称分为主语和谓语。第一部分 (find…By, exists…By) 定义查询的主语,第二部分构成谓词。 主语可以包含更多的表达。 find(或其他引入关键字)和 By 之间的任何文本都被认为是描述性的,除非使用其中一个结果限制关键字,例如 Distinct 在要创建的查询上设置不同的标志或 Top/First 限制查询结果。

参考:

Spring Data 支持的查询主语关键词

Spring Data 支持的查询谓语关键词

创建 Repository 实例

自定义 Repository 实现

Spring Data 扩展

参考资料

Spring 访问 Redis

简介

Redis 是一个被数百万开发人员用作数据库、缓存、流引擎和消息代理的开源内存数据库。

在 Spring 中,spring-data-redis 项目对访问 Redis 进行了 API 封装,提供了便捷的访问方式。 spring-data-redis

spring-boot 项目中的子模块 spring-boot-starter-data-redis 基于 spring-data-redis 项目,做了二次封装,大大简化了 Redis 的相关配置。

Spring Boot 快速入门

引入依赖

在 pom.xml 中引入依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

数据源配置

1
2
3
4
spring.redis.database = 0
spring.redis.host = localhost
spring.redis.port = 6379
spring.redis.password =

定义实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.io.Serializable;

@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {

private static final long serialVersionUID = 4142994984277644695L;

private Long id;
private String name;
private Integer age;
private String address;
private String email;

}

定义 CRUD 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
import java.util.Map;

public interface UserService {

void batchSetUsers(Map<String, User> users);

long count();

User getUser(Long id);

void setUser(User user);

}

创建 CRUD 接口实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

import cn.hutool.core.bean.BeanUtil;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Map;

@Service
public class UserServiceImpl implements UserService {

public static final String DEFAULT_KEY = "spring:tutorial:user";

private final RedisTemplate<String, Object> redisTemplate;

public UserServiceImpl(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}

@Override
public void batchSetUsers(Map<String, User> users) {
redisTemplate.opsForHash().putAll(DEFAULT_KEY, users);
}

@Override
public long count() {
return redisTemplate.opsForHash().size(DEFAULT_KEY);
}

@Override
public User getUser(Long id) {
Object obj = redisTemplate.opsForHash().get(DEFAULT_KEY, id.toString());
return BeanUtil.toBean(obj, User.class);
}

@Override
public void setUser(User user) {
redisTemplate.opsForHash().put(DEFAULT_KEY, user.getId().toString(), user);
}

}

创建 Application

创建 Application,实例化一个 RedisTemplate 对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Slf4j
@SpringBootApplication
public class RedisQuickstartApplication {

@Autowired
private ObjectMapper objectMapper;

@Bean
@Primary
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {

// 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
// objectMapper.activateDefaultTyping(new DefaultBaseTypeLimitingValidator(),
// ObjectMapper.DefaultTyping.NON_FINAL);

// 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
serializer.setObjectMapper(objectMapper);

RedisTemplate<String, Object> template = new RedisTemplate<>();
// 配置连接工厂
template.setConnectionFactory(factory);
// 值采用json序列化
template.setValueSerializer(serializer);
// 使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
// 设置hash key 和value序列化模式
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(serializer);
template.afterPropertiesSet();

return template;
}

public static void main(String[] args) {
SpringApplication.run(RedisQuickstartApplication.class, args);
}

}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
@SpringBootTest(classes = { RedisQuickstartApplication.class })
public class RedisQuickstartTests {

@Autowired
private UserService userService;

@Test
public void test() {
final long SIZE = 1000L;
Map<String, User> map = new HashMap<>();
for (long i = 0; i < SIZE; i++) {
User user = new User(i, RandomUtil.randomChineseName(),
RandomUtil.randomInt(1, 100),
RandomUtil.randomEnum(Location.class).name(),
RandomUtil.randomEmail());
map.put(String.valueOf(i), user);
}
userService.batchSetUsers(map);
long count = userService.count();
Assertions.assertThat(count).isEqualTo(SIZE);

for (int i = 0; i < 100; i++) {
long id = RandomUtil.randomLong(0, 1000);
User user = userService.getUser(id);
log.info("user-{}: {}", id, user.toString());
}
}

}

示例源码

更多 Spring 访问 Redis 示例请参考:Redis 示例源码

参考资料

Spring 应用上下文生命周期

Spring 应用上下文启动准备阶段

AbstractApplicationContext#prepareRefresh() 方法

  • 启动时间 - startupDate
  • 状态标识 - closed(false)、active(true)
  • 初始化 PropertySources - initPropertySources()
  • 检验 Environment 中必须属性
  • 初始化事件监听器集合
  • 初始化早期 Spring 事件集合

BeanFactory 创建阶段

AbstractApplicationContext#obtainFreshBeanFactory() 方法

  • 刷新 Spring 应用上下文底层 BeanFactory - refreshBeanFactory()
    • 销毁或关闭 BeanFactory,如果已存在的话
    • 创建 BeanFactory - createBeanFactory()
    • 设置 BeanFactory Id
    • 设置“是否允许 BeanDefinition 重复定义” - customizeBeanFactory(DefaultListableBeanFactory)
    • 设置“是否允许循环引用(依赖)” - customizeBeanFactory(DefaultListableBeanFactory)
    • 加载 BeanDefinition - loadBeanDefinitions(DefaultListableBeanFactory) 方法
    • 关联新建 BeanFactory 到 Spring 应用上下文
  • 返回 Spring 应用上下文底层 BeanFactory - getBeanFactory()

BeanFactory 准备阶段

AbstractApplicationContext#prepareBeanFactory(ConfigurableListableBeanFactory) 方法

  • 关联 ClassLoader
  • 设置 Bean 表达式处理器
  • 添加 PropertyEditorRegistrar 实现 - ResourceEditorRegistrar
  • 添加 Aware 回调接口 BeanPostProcessor 实现 - ApplicationContextAwareProcessor
  • 忽略 Aware 回调接口作为依赖注入接口
  • 注册 ResolvableDependency 对象 - BeanFactory、ResourceLoader、ApplicationEventPublisher 以及 ApplicationContext
  • 注册 ApplicationListenerDetector 对象
  • 注册 LoadTimeWeaverAwareProcessor 对象
  • 注册单例对象 - Environment、Java System Properties 以及 OS 环境变量

BeanFactory 后置处理阶段

  • AbstractApplicationContext#postProcessBeanFactory(ConfigurableListableBeanFactory) 方法
    • 由子类覆盖该方法
  • AbstractApplicationContext#invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory 方法
    • 调用 BeanFactoryPostProcessor 或 BeanDefinitionRegistry 后置处理方法
    • 注册 LoadTimeWeaverAwareProcessor 对象

BeanFactory 注册 BeanPostProcessor 阶段

AbstractApplicationContext#registerBeanPostProcessors(ConfigurableListableBeanFactory) 方法

  • 注册 PriorityOrdered 类型的 BeanPostProcessor Beans
  • 注册 Ordered 类型的 BeanPostProcessor Beans
  • 注册普通 BeanPostProcessor Beans
  • 注册 MergedBeanDefinitionPostProcessor Beans
  • 注册 ApplicationListenerDetector 对象

初始化內建 Bean:MessageSource

AbstractApplicationContext#initMessageSource() 方法

初始化內建 Bean:Spring 事件广播器

AbstractApplicationContext#initApplicationEventMulticaster() 方法

Spring 应用上下文刷新阶段

AbstractApplicationContext#onRefresh() 方法

子类覆盖该方法

  • org.springframework.web.context.support.AbstractRefreshableWebApplicationContext#onRefresh()
  • org.springframework.web.context.support.GenericWebApplicationContext#onRefresh()
  • org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#onRefresh()
  • org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext#onRefresh()
  • org.springframework.web.context.support.StaticWebApplicationContext#onRefresh()

Spring 事件监听器注册阶段

AbstractApplicationContext#registerListeners() 方法

  • 添加当前应用上下文所关联的 ApplicationListener 对象(集合)
  • 添加 BeanFactory 所注册 ApplicationListener Beans
  • 广播早期 Spring 事件

BeanFactory 初始化完成阶段

AbstractApplicationContext#finishBeanFactoryInitialization(ConfigurableListableBeanFactory) 方法

  • BeanFactory 关联 ConversionService Bean,如果存在
  • 添加 StringValueResolver 对象
  • 依赖查找 LoadTimeWeaverAware Bean
  • BeanFactory 临时 ClassLoader 置为 null
  • BeanFactory 冻结配置
  • BeanFactory 初始化非延迟单例 Beans

Spring 应用上下刷新完成阶段

AbstractApplicationContext#finishRefresh() 方法

  • 清除 ResourceLoader 缓存 - clearResourceCaches() @since 5.0
  • 初始化 LifecycleProcessor 对象 - initLifecycleProcessor()
  • 调用 LifecycleProcessor#onRefresh() 方法
  • 发布 Spring 应用上下文已刷新事件 - ContextRefreshedEvent
  • 向 MBeanServer 托管 Live Beans

Spring 应用上下文启动阶段

AbstractApplicationContext#start() 方法

  • 启动 LifecycleProcessor
    • 依赖查找 Lifecycle Beans
    • 启动 Lifecycle Beans
  • 发布 Spring 应用上下文已启动事件 - ContextStartedEvent

Spring 应用上下文停止阶段

AbstractApplicationContext#stop() 方法

  • 停止 LifecycleProcessor
    • 依赖查找 Lifecycle Beans
    • 停止 Lifecycle Beans
  • 发布 Spring 应用上下文已停止事件 - ContextStoppedEvent

Spring 应用上下文关闭阶段

AbstractApplicationContext#close() 方法

  • 状态标识:active(false)、closed(true)
  • Live Beans JMX 撤销托管
    • LiveBeansView.unregisterApplicationContext(ConfigurableApplicationContext)
  • 发布 Spring 应用上下文已关闭事件 - ContextClosedEvent
  • 关闭 LifecycleProcessor
    • 依赖查找 Lifecycle Beans
    • 停止 Lifecycle Beans
  • 销毁 Spring Beans
  • 关闭 BeanFactory
  • 回调 onClose()
  • 注册 Shutdown Hook 线程(如果曾注册)

问题

Spring 应用上下文生命周期有哪些阶段

  • 刷新阶段 - ConfigurableApplicationContext#refresh()
  • 启动阶段 - ConfigurableApplicationContext#start()
  • 停止阶段 - ConfigurableApplicationContext#stop()
  • 关闭阶段 - ConfigurableApplicationContext#close()

参考资料