Dunwu Blog

大道至简,知易行难

Flink Table API & SQL

Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。Table API 是用于 Scala 和 Java 语言的查询 API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。无论输入是连续的(流式)还是有界的(批处理),在两个接口中指定的查询都具有相同的语义,并指定相同的结果。

Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream API。你可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先用 CEP 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 Gelly 图算法 来处理已经预处理好的数据。

jar 依赖

必要依赖:

1
2
3
4
5
6
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.14.4</version>
<scope>provided</scope>
</dependency>

除此之外,如果你想在 IDE 本地运行你的程序,你需要添加下面的模块,具体用哪个取决于你使用哪个 Planner:

1
2
3
4
5
6
7
8
9
10
11
12
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.14.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.14.4</version>
<scope>provided</scope>
</dependency>

如果你想实现自定义格式或连接器 用于(反)序列化行或一组用户定义的函数,下面的依赖就足够了,编译出来的 jar 文件可以直接给 SQL Client 使用:

1
2
3
4
5
6
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.14.4</version>
<scope>provided</scope>
</dependency>

概念与通用 API

Table API 和 SQL 集成在同一套 API 中。 这套 API 的核心概念是Table,用作查询的输入和输出。

Table API 和 SQL 程序的结构

所有用于批处理和流处理的 Table API 和 SQL 程序都遵循相同的模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.apache.flink.table.api.*;
import org.apache.flink.connector.datagen.table.DataGenOptions;

// Create a TableEnvironment for batch or streaming execution.
// See the "Create a TableEnvironment" section for details.
TableEnvironment tableEnv = TableEnvironment.create(/*…*/);

// Create a source table
tableEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build())
.option(DataGenOptions.ROWS_PER_SECOND, 100)
.build())

// Create a sink table (using SQL DDL)
tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable");

// Create a Table object from a Table API query
Table table2 = tableEnv.from("SourceTable");

// Create a Table object from a SQL query
Table table3 = tableEnv.sqlQuery("SELECT * FROM SourceTable");

// Emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = table2.executeInsert("SinkTable");

创建 TableEnvironment

TableEnvironment 是 Table API 和 SQL 的核心概念。它负责:

  • 在内部的 catalog 中注册 Table
  • 注册外部的 catalog
  • 加载可插拔模块
  • 执行 SQL 查询
  • 注册自定义函数 (scalar、table 或 aggregation)
  • DataStreamTable 之间的转换(面向 StreamTableEnvironment )

在 Catalog 中创建表

TableEnvironment 维护着一个由标识符(identifier)创建的表 catalog 的映射。标识符由三个部分组成:catalog 名称、数据库名称以及对象名称。

Table 可以是虚拟的(视图 VIEWS)也可以是常规的(表 TABLES)。视图 VIEWS可以从已经存在的Table中创建,一般是 Table API 或者 SQL 的查询结果。 表TABLES描述的是外部数据,例如文件、数据库表或者消息队列。

查询表

Table API 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// scan registered Orders table
Table orders = tableEnv.from("Orders");
// compute revenue for all customers from France
Table revenue = orders
.filter($("cCountry").isEqual("FRANCE"))
.groupBy($("cID"), $("cName"))
.select($("cID"), $("cName"), $("revenue").sum().as("revSum"));

// emit or convert Table
// execute query

SQL 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);

// emit or convert Table
// execute query

输出表

Table 通过写入 TableSink 输出。TableSink 是一个通用接口,用于支持多种文件格式(如 CSV、Apache Parquet、Apache Avro)、存储系统(如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息队列系统(如 Apache Kafka、RabbitMQ)。

批处理 Table 只能写入 BatchTableSink,而流处理 Table 需要指定写入 AppendStreamTableSinkRetractStreamTableSink 或者 UpsertStreamTableSink

数据类型

通用类型与(嵌套的)复合类型 (如:POJO、tuples、rows、Scala case 类) 都可以作为行的字段。

复合类型的字段任意的嵌套可被 值访问函数 访问。

通用类型将会被视为一个黑箱,且可以被 用户自定义函数 传递或引用。

SQL

Flink 支持以下语句:

参考资料

LSM 树

什么是 LSM 树

LSM 树具有以下 3 个特点:

  1. 将索引分为内存和磁盘两部分,并在内存达到阈值时启动树合并(Merge Trees);
  2. 用批量写入代替随机写入,并且用预写日志 WAL 技术(Write AheadLog,预写日志技术)保证内存数据,在系统崩溃后可以被恢复;
  3. 数据采取类似日志追加写的方式写入(Log Structured)磁盘,以顺序写的方式提高写
    入效率。

LSM 树的这些特点,使得它相对于 B+ 树,在写入性能上有大幅提升。所以,许多 NoSQL 系统都使用 LSM 树作为检索引擎,而且还对 LSM 树进行了优化以提升检索性能。

LSM 树就是根据这个思路设计了这样一个机制:当数据写入时,延迟写磁盘,将数据先存放在内存中的树里,进行常规的存储和查询。当内存中的树持续变大达到阈值时,再批量地以块为单位写入磁盘的树中。因此,LSM 树至少需要由两棵树组成,一棵是存储在内存中较小的 C0 树,另一棵是存储在磁盘中较大的 C1 树。

如何将内存数据与磁盘数据合并

可以参考两个有序链表归并排序的过程,将 C0 树和 C1 树的所有叶子节点中存储的数据,看作是两个有序链表,那滚动合并问题就变成了我们熟悉的两个有序链表的归并问题。不过由于涉及磁盘操作,那为了提高写入效率和检索效率,我们还需要针对磁盘的特性,在一些归并细节上进行优化。

img

由于磁盘具有顺序读写效率高的特性,因此,为了提高 C1 树中节点的读写性能,除了根节点以外的节点都要尽可能地存放到连续的块中,让它们能作为一个整体单位来读写。这种包含多个节点的块就叫作多页块(Multi-Pages Block)。

第一步,以多页块为单位,将 C1 树的当前叶子节点从前往后读入内存。读入内存的多页块,叫作清空块(Emptying Block),意思是处理完以后会被清空。

第二步,将 C0 树的叶子节点和清空块中的数据进行归并排序,把归并的结果写入内存的一个新块中,叫作填充块(Filling Block)。

第三步,如果填充块写满了,我们就要将填充块作为新的叶节点集合顺序写入磁盘。这个时候,如果 C0 树的叶子节点和清空块都没有遍历完,我们就继续遍历归并,将数据写入新的填充块。如果清空块遍历完了,我们就去 C1 树中顺序读取新的多页块,加载到清空块中。

第四步,重复第三步,直到遍历完 C0 树和 C1 树的所有叶子节点,并将所有的归并结果写入到磁盘。这个时候,我们就可以同时删除 C0 树和 C1 树中被处理过的叶子节点。这样就完成了滚动归并的过程。

img

LSM 树是如何检索

因为同时存在 C0 和 C1 树,所以要查询一个 key 时,我们会先到 C0 树中查询。如果查询到了则直接返回;如过没有查询到,则查询 C1 树。

需要注意一种特殊情况:删除操作。假设某数据在 C0 树中被删除了,但是在 C1 树中仍存在。这此时查询时,可以在 C1 树中查到这个 key,这其实是过期数据了,如何应对这种情况呢?对于被删除的数据,可以将这些数据的 key 插入到 C0 树中,并标记一个删除标志。如果查到了一个带着删除标志的 key,就直接返回查询失败。

为什么需要 LSM 树

在关系型数据库中,通常使用 B+ 树作为索引。B+ 树的数据都存储在叶子节点中,而叶子节点一般都存储在磁盘中。因此,每次插入的新数据都需要随机写入磁盘,而随机写入的性能非常慢。如果是一个日志系统,每秒钟要写入上千条甚至上万条数据,这样的磁盘操作代价会使得系统性能急剧下降,甚至无法使用。

操作系统对磁盘的读写是以块为单位的,我们能否以块为单位写入,而不是每次插入一个数据都要随机写入磁盘呢?这样是不是就可以大幅度减少写入操作了呢?解决方案就是:LSM 树(Log Structured Merge Trees)。

WAL 技术

LSM 树至少需要由两棵树组成,一棵是存储在内存中较小的 C0 树,另一棵是存储在磁盘中较大的 C1 树。

如果机器断电或系统崩溃了,那内存中还未写入磁盘的数据岂不就永远丢失了?这种情况我们该如何解决呢?

为了保证内存中的数据在系统崩溃后能恢复,可以使用 WAL 技术(Write Ahead Log,预写日志技术)将数据第一时间高效写入磁盘进行备份。

WAL 技术保存和恢复数据的具体步骤如下:

  1. 内存中的程序在处理数据时,会先将对数据的修改作为一条记录,顺序写入磁盘的 log 文件作为备份。由于磁盘文件的顺序追加写入效率很高,因此许多应用场景都可以接受这种备份处理。
  2. 在数据写入 log 文件后,备份就成功了。接下来,该数据就可以长期驻留在内存中了。
  3. 系统会周期性地检查内存中的数据是否都被处理完了(比如,被删除或者写入磁盘),并且生成对应的检查点(Check Point)记录在磁盘中。然后,我们就可以随时删除被处理完的数据了。这样一来,log 文件就不会无限增长了。
  4. 系统崩溃重启,我们只需要从磁盘中读取检查点,就能知道最后一次成功处理的数据在 log 文件中的位置。接下来,我们就可以把这个位置之后未被处理的数据,从 log 文件中读出,然后重新加载到内存中。

img

参考资料

B+树

什么是 B+树

B+树是在二叉查找树的基础上进行了改造:树中的节点并不存储数据本身,而是只是作为索引。每个叶子节点串在一条链表上,链表中的数据是从小到大有序的。

img

改造之后,如果我们要求某个区间的数据。我们只需要拿区间的起始值,在树中进行查找,当查找到某个叶子节点之后,我们再顺着链表往后遍历,直到链表中的结点数据值大于区间的终止值为止。所有遍历到的数据,就是符合区间值的所有数据。

img

但是,我们要为几千万、上亿的数据构建索引,如果将索引存储在内存中,尽管内存访问的速度非常快,查询的效率非常高,但是,占用的内存会非常多。

比如,我们给一亿个数据构建二叉查找树索引,那索引中会包含大约 1 亿个节点,每个节点假设占用 16 个字节,那就需要大约 1GB 的内存空间。给一张表建立索引,我们需要 1GB 的内存空间。如果我们要给 10 张表建立索引,那对内存的需求是无法满足的。如何解决这个索引占用太多内存的问题呢?

我们可以借助时间换空间的思路,把索引存储在硬盘中,而非内存中。我们都知道,硬盘是一个非常慢速的存储设备。通常内存的访问速度是纳秒级别的,而磁盘访问的速度是毫秒级别的。读取同样大小的数据,从磁盘中读取花费的时间,是从内存中读取所花费时间的上万倍,甚至几十万倍。

这种将索引存储在硬盘中的方案,尽管减少了内存消耗,但是在数据查找的过程中,需要读取磁盘中的索引,因此数据查询效率就相应降低很多。

二叉查找树,经过改造之后,支持区间查找的功能就实现了。不过,为了节省内存,如果把树存储在硬盘中,那么每个节点的读取(或者访问),都对应一次磁盘 IO 操作。树的高度就等于每次查询数据时磁盘 IO 操作的次数。

我们前面讲到,比起内存读写操作,磁盘 IO 操作非常耗时,所以我们优化的重点就是尽量减少磁盘 IO 操作,也就是,尽量降低树的高度。那如何降低树的高度呢?

我们来看下,如果我们把索引构建成 m 叉树,高度是不是比二叉树要小呢?如图所示,给 16 个数据构建二叉树索引,树的高度是 4,查找一个数据,就需要 4 个磁盘 IO 操作(如果根节点存储在内存中,其他结点存储在磁盘中),如果对 16 个数据构建五叉树索引,那高度只有 2,查找一个数据,对应只需要 2 次磁盘操作。如果 m 叉树中的 m 是 100,那对一亿个数据构建索引,树的高度也只是 3,最多只要 3 次磁盘 IO 就能获取到数据。磁盘 IO 变少了,查找数据的效率也就提高了。

为什么需要 B+树

关系型数据库中常用 B+ 树作为索引,这是为什么呢?

思考以下经典应用场景

  • 根据某个值查找数据,比如 select * from user where id=1234
  • 根据区间值来查找某些数据,比如 select * from user where id > 1234 and id < 2345

为了提高查询效率,需要使用索引。而对于索引的性能要求,主要考察执行效率和存储空间。如果让你选择一种数据结构去存储索引,你会如何考虑?

以一些常见数据结构为例:

  • 哈希表:哈希表的查询性能很好,时间复杂度是 O(1)。但是,哈希表不能支持按照区间快速查找数据。所以,哈希表不能满足我们的需求。
  • 平衡二叉查找树:尽管平衡二叉查找树查询的性能也很高,时间复杂度是 O(logn)。而且,对树进行中序遍历,我们还可以得到一个从小到大有序的数据序列,但这仍然不足以支持按照区间快速查找数据。
  • 跳表:跳表是在链表之上加上多层索引构成的。它支持快速地插入、查找、删除数据,对应的时间复杂度是 O(logn)。并且,跳表也支持按照区间快速地查找数据。我们只需要定位到区间起点值对应在链表中的结点,然后从这个结点开始,顺序遍历链表,直到区间终点对应的结点为止,这期间遍历得到的数据就是满足区间值的数据。

实际上,数据库索引所用到的数据结构跟跳表非常相似,叫作 B+ 树。不过,它是通过二叉查找树演化过来的,而非跳表。B+树的应用场景

参考资料

字典树

什么是字典树

Trie 树(又叫“前缀树”或“字典树”)是一种用于快速查询“某个字符串/字符前缀”是否存在的数据结构。

  • 根节点(Root)不包含字符,除根节点外的每一个节点都仅包含一个字符;
  • 从根节点到某一节点路径上所经过的字符连接起来,即为该节点对应的字符串;
  • 任意节点的所有子节点所包含的字符都不相同;

img

字典树的构造

img

img

构建 Trie 树的过程,需要扫描所有的字符串,时间复杂度是 O(n)(n 表示所有字符串的长度和)。

字典树非常耗费内存

用数组来存储一个节点的子节点的指针。如果字符串中包含从 a 到 z 这 26 个字符,那每个节点都要存储一个长度为 26 的数组,并且每个数组存储一个 8 字节指针(或者是 4 字节,这个大小跟 CPU、操作系统、编译器等有关)。而且,即便一个节点只有很少的子节点,远小于 26 个,比如 3、4 个,我们也要维护一个长度为 26 的数组。

用数组来存储一个节点的子节点的指针。如果字符串中包含从 a 到 z 这 26 个字符,那每个节点都要存储一个长度为 26 的数组,并且每个数组存储一个 8 字节指针(或者是 4 字节,这个大小跟 CPU、操作系统、编译器等有关)。而且,即便一个节点只有很少的子节点,远小于 26 个,比如 3、4 个,我们也要维护一个长度为 26 的数组。

用数组来存储一个节点的子节点的指针。如果字符串中包含从 a 到 z 这 26 个字符,那每个节点都要存储一个长度为 26 的数组,并且每个数组存储一个 8 字节指针(或者是 4 字节,这个大小跟 CPU、操作系统、编译器等有关)。而且,即便一个节点只有很少的子节点,远小于 26 个,比如 3、4 个,我们也要维护一个长度为 26 的数组。

字典树的查找

  1. 每次从根结点开始搜索;
  2. 获取关键词的第一个字符,根据该字符选择对应的子节点,转到该子节点继续检索;
  3. 在相应的子节点上,获取关键词的第二个字符,进一步选择对应的子节点进行检索;
  4. 以此类推,进行迭代过程;
  5. 在某个节点处,关键词的所有字母已被取出,则读取附在该节点上的信息,查找完成。

img

每次查询时,如果要查询的字符串长度是 k,那我们只需要比对大约 k 个节点,就能完成查询操作。跟原本那组字符串的长度和个数没有任何关系。所以说,构建好 Trie 树后,在其中查找字符串的时间复杂度是 O(k),k 表示要查找的字符串的长度。

字典树的应用场景

在一组字符串中查找字符串,Trie 树实际上表现得并不好。它对要处理的字符串有及其严苛的要求。

第一,字符串中包含的字符集不能太大。我们前面讲到,如果字符集太大,那存储空间可能就会浪费很多。即便可以优化,但也要付出牺牲查询、插入效率的代价。

第二,要求字符串的前缀重合比较多,不然空间消耗会变大很多。

第三,如果要用 Trie 树解决问题,那我们就要自己从零开始实现一个 Trie 树,还要保证没有 bug,这个在工程上是将简单问题复杂化,除非必须,一般不建议这样做。

第四,我们知道,通过指针串起来的数据块是不连续的,而 Trie 树中用到了指针,所以,对缓存并不友好,性能上会打个折扣。

在一组字符串中查找字符串,Trie 树实际上表现得并不好。它对要处理的字符串有及其严苛的要求。

在一组字符串中查找字符串,Trie 树实际上表现得并不好。它对要处理的字符串有及其严苛的要求。

(1)自动补全

img

(2)拼写检查

img

(3)IP 路由 (最长前缀匹配)

img

图 3. 使用 Trie 树的最长前缀匹配算法,Internet 协议(IP)路由中利用转发表选择路径。

(4)T9 (九宫格) 打字预测

img

(5)单词游戏

img

Trie 树可通过剪枝搜索空间来高效解决 Boggle 单词游戏

参考资料

《检索技术核心 20 讲》笔记

伸缩性架构是指不需要改变系统的软硬件设计,仅通过改变部署服务器数量就可以扩大或缩小系统的服务处理能力。

线性结构检索

检索的核心思想:合理组织数据,尽可能快速减少查询范围,可以提升检索效率。

数组和链表的比较

  • 存储方式
    • 数组用 连续 的内存空间来存储数据。
    • 链表用 不连续 的内存空间来存储数据;并通过一个指针按顺序将这些空间串起来,形成一条链。
  • 访问方式
    • 数组支持随机访问。根据下标随机访问的时间复杂度为 O(1)
    • 链表不支持随机访问,只能顺序访问。
  • 空间大小
    • 数组空间大小固定,扩容只能采用复制数组的方式。
    • 链表空间大小不固定,扩容灵活。
  • 效率比较
    • 数组的 查找 效率高于链表。
    • 链表的 添加删除 效率高于数组。

非线性结构检索

  • 对于无序数组,只能顺序查找,其时间复杂度为 O(n)
  • 对于有序数组,可以应用二分查找法,其时间复杂度为 O(log n)

显然,二分查找法很高效,但是它有限制条件:数据有序。为了保证数据有序,添加、删除数组数据时,必须要进行数据调整,来保证其有序。

首先,对于数据频繁变化的应用场景,有序数组并不是最适合的解决方案。我们一般要考虑采用非连续存储的数据结构来灵活调整。同时,为了提高检索效率,我们还要采取合理的组织方式,让这些非连续存储的数据结构能够使用二分查找算法。

数据组织的方式有两种,一种是二叉检索树。一个平衡的二叉检索树使用二分查找的检索效率是 O(log n),但如果我们不做额外的平衡控制的话,二叉检索树的检索性能最差会退化到 O(n),也就和单链表一样了。所以,AVL 树和红黑树这样平衡性更强的二叉检索树,在实际工作中应用更多。

除了树结构以外,另一种数据组织方式是跳表。跳表也具备二分查找的能力,理想跳表的检索效率是 O(log n)。为了保证跳表的检索空间平衡,跳表为每个节点随机生成层级,这样的实现方式比 AVL 树和红黑树更简单。

无论是二叉检索树还是跳表,它们都是通过将数据进行合理组织,然后尽可能地平衡划分检索空间,使得我们能采用二分查找的思路快速地缩减查找范围,达到 O(log n) 的检索效率。

哈希检索

散列表的思路是:使用 Hash 函数将 Key 转换为数组下标。

哈希表的本质是一个数组,它通过 Hash 函数将查询的 Key 转为数组下标,利用数组的随机访问特性,使得我们能在 O(1) 的时间代价内完成检索。

尽管哈希检索没有使用二分查找,但无论是设计理想的哈希函数,还是保证哈希表有足够的空闲位置,包括解决冲突的“二次探查”和“双散列”方案,本质上都是希望数据插入哈希表的时候,分布能均衡,这样检索才能更高效。从这个角度来看,其实哈希检索提高检索效率的原理,和二叉检索树需要平衡左右子树深度的原理是一样的,也就是说,高效的检索需要均匀划分检索空间。

状态检索

在海量数据中,快速判断一个对象是否存在。相比于有序数组、二叉检索树和哈希表这三种方案,位图和布隆过滤器其实更适合解决这类状态检索的问题。这是因为,在不要求 100% 判断正确的情况下,使用位图和布隆过滤器可以达到 O(1) 时间代价的检索效率,同时空间使用率也非常高效。

为了判断一个很大的数据范围中,某数值是否存在,可以将这个范围的数据存为数组,其数组值为布尔型(true 或 false)。由于很多语言中,布尔类型需要 1 个字节,而二进制位(bit)的值 0 或 1 也可以表示 true 或 false,并且占用空间更小,所以更加合适。而这种基于位运算的哈希结构,即为位图。

布隆过滤器最大的特点,就是对一个对象使用多个哈希函数。如果我们使用了 k 个哈希函数,就会得到 k 个哈希值,也就是 k 个下标,我们会把数组中对应下标位置的值都置为 1。布隆过滤器和位图最大的区别就在于,我们不再使用一位来表示一个对象,而是使用 k 位来表示一个对象。这样两个对象的 k 位都相同的概率就会大大降低,从而能够解决哈希冲突的问题了。

布隆过滤器的误判有一个特点,那就是,它只会对存在的情况有误判。如果某个数字经过布隆过滤器判断不存在,那说明这个数字真的不存在,不会发生误判;如果某个数字经过布隆过滤器判断存在,这个时候才会有可能误判,有可能并不存在。不过,只要我们调整哈希函数的个数、位图大小跟要存储数字的个数之间的比例,那就可以将这种误判的概率降到非常低。

布隆过滤器过滤器适用于对误判有一定容忍度的场景。

倒排索引

倒排索引的核心其实并不复杂,它的具体实现其实是哈希表,只是它不是将文档 ID 或者题目作为 key,而是反过来,通过将内容或者属性作为 key 来存储对应的文档列表,使得我们能在 O(1) 的时间代价内完成查询。

尽管原理并不复杂,但是倒排索引是许多检索引擎的核心。比如说,数据库的全文索引功能、搜索引擎的索引、广告引擎和推荐引擎,都使用了倒排索引技术来实现检索功能。

B+ 树检索

内存是半导体元件。对于内存而言,只要给出了内存地址,我们就可以直接访问该地址取出数据。这个过程具有高效的随机访问特性,因此内存也叫随机访问存储器(Random Access Memory,即 RAM)。内存的访问速度很快,但是价格相对较昂贵,因此一般的计算机内存空间都相对较小。

而磁盘是机械器件。磁盘访问数据时,需要等磁盘盘片旋转到磁头下,才能读取相应的数据。尽管磁盘的旋转速度很快,但是和内存的随机访问相比,性能差距非常大。一般来说,如果是随机读写,会有 10 万到 100 万倍左右的差距。但如果是顺序访问大批量数据的话,磁盘的性能和内存就是一个数量级的。

磁盘的最小读写单位是扇区,较早期的磁盘一个扇区是 512 字节。随着磁盘技术的发展,目前常见的磁盘扇区是 4K 个字节。操作系统一次会读写多个扇区,所以操作系统的最小读写单位是块(Block),也叫作簇(Cluster)。当我们要从磁盘中读取一个数据时,操作系统会一次性将整个块都读出来。因此,对于大批量的顺序读写来说,磁盘的效率会比随机读写高许多。

假设有一个有序数组存储在硬盘中,如果它足够大,那么它会存储在多个块中。当我们要对这个数组使用二分查找时,需要先找到中间元素所在的块,将这个块从磁盘中读到内存里,然后在内存中进行二分查找。如果下一步要读的元素在其他块中,则需要再将相应块从磁盘中读入内存。直到查询结束,这个过程可能会多次访问磁盘。我们可以看到,这样的检索性能非常低。

由于磁盘相对于内存而言访问速度实在太慢,因此,对于磁盘上数据的高效检索,我们有一个极其重要的原则:对磁盘的访问次数要尽可能的少!

将索引和数据分离就是一种常见的设计思路。在数据频繁变化的场景中,有序数组并不是一个最好的选择,二叉检索树或者哈希表往往更有普适性。但是,哈希表由于缺乏范围检索的能力,在一些场合也不适用。因此,二叉检索树这种树形结构是许多常见检索系统的实施方案。

随着索引数据越来越大,直到无法完全加载到内存中,这是需要将索引数据也存入磁盘中。B+ 树给出了将树形索引的所有节点都存在磁盘上的高效检索方案。操作系统对磁盘数据的访问是以块为单位的。因此,如果我们想将树型索引的一个节点从磁盘中读出,即使该节点的数据量很小(比如说只有几个字节),但磁盘依然会将整个块的数据全部读出来,而不是只读这一小部分数据,这会让有效读取效率很低。B+ 树的一个关键设计,就是让一个节点的大小等于一个块的大小。节点内存储的数据,不是一个元素,而是一个可以装 m 个元素的有序数组。这样一来,我们就可以将磁盘一次读取的数据全部利用起来,使得读取效率最大化。

B+ 树还有另一个设计,就是将所有的节点分为内部节点和叶子节点。内部节点仅存储 key 和维持树形结构的指针,并不存储 key 对应的数据(无论是具体数据还是文件位置信息)。这样内部节点就能存储更多的索引数据,我们也就可以使用最少的内部节点,将所有数据组织起来了。而叶子节点仅存储 key 和对应数据,不存储维持树形结构的指针。通过这样的设计,B+ 树就能做到节点的空间利用率最大化。此外,B+ 树还将同一层的所有节点串成了有序的双向链表,这样一来,B+ 树就同时具备了良好的范围查询能力和灵活调整的能力了。

因此,B+ 树是一棵完全平衡的 m 阶多叉树。所谓的 m 阶,指的是每个节点最多有 m 个子节点,并且每个节点里都存了一个紧凑的可包含 m 个元素的数组。

即使是复杂的 B+ 树,我们将它拆解开来,其实也是由简单的数组、链表和树组成的,而且 B+ 树的检索过程其实也是二分查找。因此,如果 B+ 树完全加载在内存中的话,它的检索效率其实并不会比有序数组或者二叉检索树更
高,也还是二分查找的 log(n) 的效率。并且,它还比数组和二叉检索树更加复杂,还会带来额外的开销。

另外,这一节还有一个很重要的设计思想需要你掌握,那就是将索引和数据分离。通过这样的方式,我们能将索引的数组大小保持在一个较小的范围内,让它能加载在内存中。在许多大规模系统中,都是使用这个设计思想来精简索引的。而且,B+ 树的内部节点和叶子节点的区分,其实也是索引和数据分离的一次实践。

MySQL 中的 B+ 树实现其实有两种,一种是 MyISAM 引擎,另一种是 InnoDB 引擎。它们的核心区别就在于,数据和索引是否是分离的。

在 MyISAM 引擎中,B+ 树的叶子节点仅存储了数据的位置指针,这是一种索引和数据分离的设计方案,叫作非聚集索引。如果要保证 MyISAM 的数据一致性,那我们需要在表级别上进行加锁处理。

在 InnoDB 中,B+ 树的叶子节点直接存储了具体数据,这是一种索引和数据一体的方案。叫作聚集索引。由于数据直接就存在索引的叶子节点中,因此 InnoDB 不需要给全表加锁来保证一致性,它只需要支持行级的锁就可以了。

LSM 树检索

B+ 树的数据都存储在叶子节点中,而叶子节点一般都存储在磁盘中。因此,每次插入的新数据都需要随机写入磁盘,而随机写入的性能非常慢。如果是一个日志系统,每秒钟要写入上千条甚至上万条数据,这样的磁盘操作代价会使得系统性能急剧下降,甚至无法使用。

操作系统对磁盘的读写是以块为单位的,我们能否以块为单位写入,而不是每次插入一个数据都要随机写入磁盘呢?这样是不是就可以大幅度减少写入操作了呢?解决方案就是:LSM 树(Log Structured Merge Trees)。

LSM 树就是根据这个思路设计了这样一个机制:当数据写入时,延迟写磁盘,将数据先存放在内存中的树里,进行常规的存储和查询。当内存中的树持续变大达到阈值时,再批量地以块为单位写入磁盘的树中。因此,LSM 树至少需要由两棵树组成,一棵是存储在内存中较小的 C0 树,另一棵是存储在磁盘中较大的 C1 树。

LSM 树具有以下 3 个特点:

  1. 将索引分为内存和磁盘两部分,并在内存达到阈值时启动树合并(Merge Trees);
  2. 用批量写入代替随机写入,并且用预写日志 WAL 技术(Write AheadLog,预写日志技术)保证内存数据,在系统崩溃后可以被恢复;
  3. 数据采取类似日志追加写的方式写入(Log Structured)磁盘,以顺序写的方式提高写
    入效率。

LSM 树的这些特点,使得它相对于 B+ 树,在写入性能上有大幅提升。所以,许多 NoSQL 系统都使用 LSM 树作为检索引擎,而且还对 LSM 树进行了优化以提升检索性能。

索引构建

  • 数据压缩:一个是尽可能地将数据加载到内存中,因为内存的检索效率大大高于磁盘。那为了将数据更多地加载到内存中,索引压缩是一个重要的研究方向。
  • 分支处理:另一个是将大数据集合拆成多个小数据集合来处理。这其实就是分布式系统的核心思想。

索引更新

Double Buffer(双缓冲)机制

就是在内存中同时保存两份一样的索引,一个是索引 A,一个是索引 B。两个索引保持一个读、一个写,并且来回切换,最终完成高性能的索引更新。

优点:简单高效

缺点:达到一定数据量级后,会带来翻倍的内存开销,甚至有些索引存储在磁盘上的情况下,更是无法使用此机制。

全量索引和增量索引

将新接收到的数据单独建立一个可以存在内存中的倒排索引,也就是增量索引。当查询发生的时候,我们会同时查询全量索引和增量索引,将合并的结果作为总的结果输出。

因为增量索引相对全量索引而言会小很多,内存资源消耗在可承受范围,所以我们可以使用 Double Buffer 机制
对增量索引进行索引更新。这样一来,增量索引就可以做到无锁访问。而全量索引本身就是只读的,也不需要加锁。因此,整个检索过程都可以做到无锁访问,也就提高了系统的检索效率。

如何处理增量索引空间的持续增长

完全重建法

如果增量索引的增长速度不算很快,或者全量索引重建的代价不大,那么我们完全可以在增量索引写满内存空间之前,完全重建一次全量索引,然后将系统查询切换到新的全量索引上。

再合并法

直接归并全量索引和增量索引,生成一个新的全量索引,这也就避免了从头处理所有文档的重复开销。

滚动合并法

先生成多个不同层级的索引,然后逐层合并。

比如说,一个检索系统在磁盘中保存了全量索引、周级索引和天级索引。所谓周级索引,就
是根据本周的新数据生成的一份索引,那天级索引就是根据每天的新数据生成的一份索引。
在滚动合并法中,当内存中的增量索引增长到一定体量时,我们会用再合并法将它合并到磁
盘上当天的天级索引文件中。

img

索引拆分

水平拆分和垂直拆分

TOP K 检索

TF-IDF 算法

TF-IDF 算法的公式是:相关性 = TF*IDF。其中,TF 是词频(Term Frequency),IDF 是逆文档频率(Inverse Document Frequency)。

  • 词频定义的就是一个词项在文档中出现的次数。换一句话说就是,如果一个词项出现了越多次,那这个词在文档中就越重要。
  • 文档频率(Document Frequency),指的是这个词项出现在了多少个文档中。你也可以理解为,如果一个词出现在越多的文档中,那这个词就越普遍,越没有区分度。一个极端的例子,比如“的”字,它基本上在每个文档中都会出现,所以它的区分度就非常低。
  • 逆文档频率是对文档频率取倒数,它的值越大,这个词的的区分度就越大。

BM25 算法

BM25 算法的一个重要的设计思想是,它认为词频和相关性的关系并不是线性的。也就是说,随着词频的增加,相关性的增加会越来越不明显,并且还会有一个阈值上限。当词频达到阈值以后,那相关性就不会再增长了。

总结来说,BM25 算法就是一个对查询词和文档的相关性进行打分的概率模型算法。BM25 算法考虑了四个因子,分别为 IDF、文档长度、文档中的词频以及查询词中的词频。并且,公式中还加入了 3 个可以人工调整大小的参数,分别是 :k1、k2 和 b。

机器学习打分

机器学习可以更大规模地引入更多的打分因子,并且可以自动学习出各个打分因子的权重。所以,利用机器学习进行相关性打分,已经成了目前大规模检索引擎的标配。

根据打分结果快速 TOP K 检索

完成打分阶段之后,排序阶段我们要重视排序的效率。对于精准 Top K 检索,我们可以使用堆排序来代替全排序,只返回我们认为最重要的 k 个结果。这样,时间代价就是 O(n) + O(k log n) ,在数据量级非常大的情况下,它比 O(n log n) 的检索性能会高得多。

非精准 TOP K 检索

高质量的检索结果并不一定要非常精准,我们只需要保证质量足够高的结果,被包含在最终的 Top K 个结果中就够了。这就是非精准 Top K 检索的思路。

空间检索

通过将二维空间在水平和垂直方向上不停二分,可以生成一维的区域编码,然后我们可以使用一维空间的检索技术对区域编码做好索引。

在需要动态调整查询范围的场景下,对于二进制编码的二维空间的最近邻检索问题,我们可以通过四叉树来完成。四叉树可以很好地快速划分查询空间,并通过递归的方式高效地扩大查询范围。但是满四叉树经常会造成无谓的空间浪费,为了避免这个问题,在实际应用的时候,我们会选择使用非满四叉树来存储和索引编码。对于 GeoHash 编码的二维空间最近邻检索问题,我们也能通过类似的前缀树来提高检索效率。

最近邻检索

如何计算两篇文章的相似性

最常见的方式就是使用向量空间模型(Vector Space Model)。所谓向量空间模型,就是将所有文档中出现过的所有关键词都提取出来。如果一共有 n 个关键词,那每个关键词就是一个维度,这就组成了一个 n 维的向量空间。

存储系统

LevelDB 是由 Google 开源的存储系统。

LevelDB 是基于 LSM 树优化而来的存储系统。LSM 树会将索引分为内存和磁盘两部分,并在内存达到阈值时启动树合并。但是,这里面存在着大量的细节问题。

数据在内存中如何高效检索?

首先,对内存中索引的高效检索,我们可以用很多检索技术,如红黑树、跳表等,这些数据结构会比 B+ 树更高效。LevelDB 对于 LSM 树的第一个改进,就是使用跳表代替 B+ 树来实现内存中的 C0 树。

数据是如何高效地从内存转移到磁盘的?

LevelDB 做了读写分离的设计。它将内存中的数据分为两块,一块叫作 MemTable,它是可读可写的。另一块叫作 Immutable MemTable,它是只读的。这两块数据的数据结构完全一样,都是跳表。

当 MemTable 的存储数据达到上限时,我们直接将它切换为只读的 Immutable MemTable,然后重新生成一个新的 MemTable,来支持新数据的写入和查询。这时,将内存索引存储到磁盘的问题,就变成了将 Immutable MemTable 写入磁盘的问题。而且,由于 Immutable MemTable 是只读的,因此,它不需要加锁就可以高效
地写入磁盘中。

数据如何合并

在原始 LSM 树的设计中,内存索引写入磁盘时是直接和磁盘中的 C1 树进行归并的。但如果工程中也这么
实现的话,会有两个很严重的问题:

  • 合并代价很高,因为 C1 树很大,而 C0 树很小,这会导致它们在合并时产生大量的磁盘 IO;
  • 合并频率会很频繁,由于 C0 树很小,很容易被写满,因此系统会频繁进行 C0 树和 C1 树的合并,这样频繁合并会带来的大量磁盘 IO,这更是系统无法承受的。

LevelDB 采用了延迟合并的设计来优化。具体来说就是,先将 Immutable MemTable 顺序快速写入磁盘,直接变成一个个 SSTable(Sorted String Table)文件,之后再对这些 SSTable 文件进行合并。这样就避免了 C0 树和 C1 树昂贵的 合并代价。

而在管理多个 SSTable 文件的环节,LevelDB 使用分层和滚动合并的设计来组织多个 SSTable 文件,避免了 C0 树和 C1 树的合并带来的大量数据被复制的问题。

数据如何检索

先在 MemTable 中查找,如果查找不到再去 Immutable MemTable 中查找。如果 Immutable MemTable 也查询不到,才会到磁盘中去查找。

在磁盘中检索数据的环节,因为 SSTable 文件是有序的,所以我们通过多层二分查找的方式,就能快速定位到需要查询的 SSTable 文件。接着,在 SSTable 文件内查找元素时,LevelDB 先是使用索引与数据分离的设计,减少磁盘 IO,又使用 BloomFilter 和二分查找来完成检索加速。加速检索的过程中,LevelDB 又使用缓存技术,将会被反复读取的数据缓存在内存中,从而避免了磁盘开销。

搜索系统

搜索流程:

  • 先对查询内容分词,搜索引擎还会纠错和相似推荐,得到检索词
  • 根据检索词在倒排索引中进行短语检索。然后,根据相关性打分,将得分高的结果保留。

广告系统

广告引擎处理一个广告请求的过程,本质上就是根据用户的广告请求信息,找出标签匹配的广告设置,并将广告进行排序返回的过程。

  • 在标签检索引擎中,我们通过合理地将标签使用在树形检索 + 倒排索引 + 结果过滤这三个环节,来提高检索效率。
  • 在向量检索引擎中,我们可以使用聚类 + 倒排索引 + 乘积量化的技术来加速检索。
  • 在打分排序环节,增加一个非精准打分环节,这样我们就可以大幅降低使用深度学习模型带来的开销。
  • 在索引构建环节,我们还可以将一些过滤条件前置,仅将当前有效的广告设置加入索引,然后通过全量索引 + 增量索引的更新方式,来保证过滤逻辑的有效。

推荐引擎

相比于搜索引擎和广告引擎,推荐引擎具有更灵活的检索能力,也就是可以使用更灵活的检索技术,来进行文章的召回服务。

参考资料

Elasticsearch 集群

集群

空集群

如果我们启动了一个单独的节点,里面不包含任何的数据和索引,那我们的集群看起来就是一个包含空内容节点的集群。

Figure 1. 包含空内容节点的集群

包含空内容节点的集群

图 1:只有一个空节点的集群

一个运行中的 Elasticsearch 实例称为一个节点,而集群是由一个或者多个拥有相同 cluster.name 配置的节点组成, 它们共同承担数据和负载的压力。当有节点加入集群中或者从集群中移除节点时,集群将会重新平均分布所有的数据。

当一个节点被选举成为主节点时, 它将负责管理集群范围内的所有变更,例如增加、删除索引,或者增加、删除节点等。 而主节点并不需要涉及到文档级别的变更和搜索等操作,所以当集群只拥有一个主节点的情况下,即使流量的增加它也不会成为瓶颈。 任何节点都可以成为主节点。我们的示例集群就只有一个节点,所以它同时也成为了主节点。

作为用户,我们可以将请求发送到集群中的任何节点,包括主节点。 每个节点都知道任意文档所处的位置,并且能够将我们的请求直接转发到存储我们所需文档的节点。 无论我们将请求发送到哪个节点,它都能负责从各个包含我们所需文档的节点收集回数据,并将最终结果返回給客户端。 Elasticsearch 对这一切的管理都是透明的。

集群健康

Elasticsearch 的集群监控信息中包含了许多的统计数据,其中最为重要的一项就是 集群健康 , 它在 status 字段中展示为 greenyellow 或者 red

1
GET /_cluster/health

在一个不包含任何索引的空集群中,它将会有一个类似于如下所示的返回内容:

1
2
3
4
5
6
7
8
9
10
11
12
{
"cluster_name": "elasticsearch",
"status": "green",
"timed_out": false,
"number_of_nodes": 1,
"number_of_data_nodes": 1,
"active_primary_shards": 0,
"active_shards": 0,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 0
}

status 字段指示着当前集群在总体上是否工作正常。它的三种颜色含义如下:

  • **green**:所有的主分片和副本分片都正常运行。
  • **yellow**:所有的主分片都正常运行,但不是所有的副本分片都正常运行。
  • **red**:有主分片没能正常运行。

添加索引

我们往 Elasticsearch 添加数据时需要用到 索引 —— 保存相关数据的地方。索引实际上是指向一个或者多个物理分片的逻辑命名空间 。

一个 分片 是一个底层的 工作单元 ,它仅保存了全部数据中的一部分。现在我们只需知道一个分片是一个 Lucene 的实例,以及它本身就是一个完整的搜索引擎。 我们的文档被存储和索引到分片内,但是应用程序是直接与索引而不是与分片进行交互。

Elasticsearch 是利用分片将数据分发到集群内各处的。分片是数据的容器,文档保存在分片内,分片又被分配到集群内的各个节点里。 当你的集群规模扩大或者缩小时, Elasticsearch 会自动的在各节点中迁移分片,使得数据仍然均匀分布在集群里。

一个分片可以是 分片或者 副本 分片。 索引内任意一个文档都归属于一个主分片,所以主分片的数目决定着索引能够保存的最大数据量。

技术上来说,一个主分片最大能够存储 Integer.MAX_VALUE - 128 个文档,但是实际最大值还需要参考你的使用场景:包括你使用的硬件, 文档的大小和复杂程度,索引和查询文档的方式以及你期望的响应时长。

一个副本分片只是一个主分片的拷贝。副本分片作为硬件故障时保护数据不丢失的冗余备份,并为搜索和返回文档等读操作提供服务。

在索引建立的时候就已经确定了主分片数,但是副本分片数可以随时修改。

让我们在包含一个空节点的集群内创建名为 blogs 的索引。 索引在默认情况下会被分配 5 个主分片, 但是为了演示目的,我们将分配 3 个主分片和一份副本(每个主分片拥有一个副本分片):

1
2
3
4
5
6
7
PUT /blogs
{
"settings" : {
"number_of_shards" : 3,
"number_of_replicas" : 1
}
}

我们的集群现在是 _拥有一个索引的单节点集群_。所有 3 个主分片都被分配在 Node 1

Figure 2. 拥有一个索引的单节点集群

拥有一个索引的单节点集群

如果我们现在查看集群健康,我们将看到如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"cluster_name": "elasticsearch",
"status": "yellow",
"timed_out": false,
"number_of_nodes": 1,
"number_of_data_nodes": 1,
"active_primary_shards": 3,
"active_shards": 3,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 3,
"delayed_unassigned_shards": 0,
"number_of_pending_tasks": 0,
"number_of_in_flight_fetch": 0,
"task_max_waiting_in_queue_millis": 0,
"active_shards_percent_as_number": 50
}
  • 集群 status 值为 yellow
  • 没有被分配到任何节点的副本数

集群的健康状况为 yellow 则表示全部 分片都正常运行(集群可以正常服务所有请求),但是 副本 分片没有全部处在正常状态。 实际上,所有 3 个副本分片都是 unassigned —— 它们都没有被分配到任何节点。 在同一个节点上既保存原始数据又保存副本是没有意义的,因为一旦失去了那个节点,我们也将丢失该节点上的所有副本数据。

当前我们的集群是正常运行的,但是在硬件故障时有丢失数据的风险。

添加故障转移

当集群中只有一个节点在运行时,意味着会有一个单点故障问题——没有冗余。 幸运的是,我们只需再启动一个节点即可防止数据丢失。

为了测试第二个节点启动后的情况,你可以在同一个目录内,完全依照启动第一个节点的方式来启动一个新节点(参考安装并运行 Elasticsearch)。多个节点可以共享同一个目录。

当你在同一台机器上启动了第二个节点时,只要它和第一个节点有同样的 cluster.name 配置,它就会自动发现集群并加入到其中。 但是在不同机器上启动节点的时候,为了加入到同一集群,你需要配置一个可连接到的单播主机列表。

如果启动了第二个节点,我们的集群将会拥有两个节点的集群——所有主分片和副本分片都已被分配。

Figure 3. 拥有两个节点的集群——所有主分片和副本分片都已被分配

拥有两个节点的集群

当第二个节点加入到集群后,3 个 副本分片 将会分配到这个节点上——每个主分片对应一个副本分片。 这意味着当集群内任何一个节点出现问题时,我们的数据都完好无损。

所有新近被索引的文档都将会保存在主分片上,然后被并行的复制到对应的副本分片上。这就保证了我们既可以从主分片又可以从副本分片上获得文档。

cluster-health 现在展示的状态为 green ,这表示所有 6 个分片(包括 3 个主分片和 3 个副本分片)都在正常运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"cluster_name": "elasticsearch",
"status": "green",
"timed_out": false,
"number_of_nodes": 2,
"number_of_data_nodes": 2,
"active_primary_shards": 3,
"active_shards": 6,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 0,
"delayed_unassigned_shards": 0,
"number_of_pending_tasks": 0,
"number_of_in_flight_fetch": 0,
"task_max_waiting_in_queue_millis": 0,
"active_shards_percent_as_number": 100
}
  • 集群 status 值为 green

我们的集群现在不仅仅是正常运行的,并且还处于 始终可用 的状态。

水平扩容

怎样为我们的正在增长中的应用程序按需扩容呢? 当启动了第三个节点,我们的集群将拥有三个节点的集群——为了分散负载而对分片进行重新分配。

Figure 4. 拥有三个节点的集群——为了分散负载而对分片进行重新分配

拥有三个节点的集群

Node 1Node 2 上各有一个分片被迁移到了新的 Node 3 节点,现在每个节点上都拥有 2 个分片,而不是之前的 3 个。 这表示每个节点的硬件资源(CPU, RAM, I/O)将被更少的分片所共享,每个分片的性能将会得到提升。

分片是一个功能完整的搜索引擎,它拥有使用一个节点上的所有资源的能力。 我们这个拥有 6 个分片(3 个主分片和 3 个副本分片)的索引可以最大扩容到 6 个节点,每个节点上存在一个分片,并且每个分片拥有所在节点的全部资源。

更多的扩容

但是如果我们想要扩容超过 6 个节点怎么办呢?

主分片的数目在索引创建时就已经确定了下来。实际上,这个数目定义了这个索引能够 存储 的最大数据量。(实际大小取决于你的数据、硬件和使用场景。) 但是,读操作——搜索和返回数据——可以同时被主分片 副本分片所处理,所以当你拥有越多的副本分片时,也将拥有越高的吞吐量。

在运行中的集群上是可以动态调整副本分片数目的,我们可以按需伸缩集群。让我们把副本数从默认的 1 增加到 2

1
2
3
4
PUT /blogs/_settings
{
"number_of_replicas" : 2
}

blogs 索引现在拥有 9 个分片:3 个主分片和 6 个副本分片。 这意味着我们可以将集群扩容到 9 个节点,每个节点上一个分片。相比原来 3 个节点时,集群搜索性能可以提升 3 倍。

Figure 5. 将参数 number_of_replicas 调大到 2

拥有2份副本分片3个节点的集群

当然,如果只是在相同节点数目的集群上增加更多的副本分片并不能提高性能,因为每个分片从节点上获得的资源会变少。 你需要增加更多的硬件资源来提升吞吐量。

但是更多的副本分片数提高了数据冗余量:按照上面的节点配置,我们可以在失去 2 个节点的情况下不丢失任何数据。

应对故障

我们之前说过 Elasticsearch 可以应对节点故障,接下来让我们尝试下这个功能。 如果我们关闭第一个节点,这时集群的状态为关闭了一个节点后的集群。

Figure 6. 关闭了一个节点后的集群

关闭了一个节点后的集群

我们关闭的节点是一个主节点。而集群必须拥有一个主节点来保证正常工作,所以发生的第一件事情就是选举一个新的主节点: Node 2

在我们关闭 Node 1 的同时也失去了主分片 12 ,并且在缺失主分片的时候索引也不能正常工作。 如果此时来检查集群的状况,我们看到的状态将会为 red :不是所有主分片都在正常工作。

幸运的是,在其它节点上存在着这两个主分片的完整副本, 所以新的主节点立即将这些分片在 Node 2Node 3 上对应的副本分片提升为主分片, 此时集群的状态将会为 yellow 。 这个提升主分片的过程是瞬间发生的,如同按下一个开关一般。

为什么我们集群状态是 yellow 而不是 green 呢? 虽然我们拥有所有的三个主分片,但是同时设置了每个主分片需要对应 2 份副本分片,而此时只存在一份副本分片。 所以集群不能为 green 的状态,不过我们不必过于担心:如果我们同样关闭了 Node 2 ,我们的程序 依然 可以保持在不丢任何数据的情况下运行,因为 Node 3 为每一个分片都保留着一份副本。

如果我们重新启动 Node 1 ,集群可以将缺失的副本分片再次进行分配,那么集群的状态也将如 Figure 5. 将参数 number_of_replicas 调大到 2 所示。 如果 Node 1 依然拥有着之前的分片,它将尝试去重用它们,同时仅从主分片复制发生了修改的数据文件。

到目前为止,你应该对分片如何使得 Elasticsearch 进行水平扩容以及数据保障等知识有了一定了解。 接下来我们将讲述关于分片生命周期的更多细节。

分片

  • 为什么搜索是 实时的?
  • 为什么文档的 CRUD (创建-读取-更新-删除) 操作是 实时 的?
  • Elasticsearch 是怎样保证更新被持久化在断电时也不丢失数据?
  • 为什么删除文档不会立刻释放空间?
  • refresh, flush, 和 optimize API 都做了什么, 你什么情况下应该使用他们?

使文本可被搜索

必须解决的第一个挑战是如何使文本可被搜索。 传统的数据库每个字段存储单个值,但这对全文检索并不够。文本字段中的每个单词需要被搜索,对数据库意味着需要单个字段有索引多值(这里指单词)的能力。

最好的支持 一个字段多个值 需求的数据结构是我们在 倒排索引 章节中介绍过的 倒排索引 。 倒排索引包含一个有序列表,列表包含所有文档出现过的不重复个体,或称为 词项 ,对于每一个词项,包含了它所有曾出现过文档的列表。

1
2
3
4
5
6
Term  | Doc 1 | Doc 2 | Doc 3 | ...
------------------------------------
brown | X | | X | ...
fox | X | X | X | ...
quick | X | X | | ...
the | X | | X | ...

当讨论倒排索引时,我们会谈到 文档 标引,因为历史原因,倒排索引被用来对整个非结构化文本文档进行标引。 Elasticsearch 中的 文档 是有字段和值的结构化 JSON 文档。事实上,在 JSON 文档中, 每个被索引的字段都有自己的倒排索引。

这个倒排索引相比特定词项出现过的文档列表,会包含更多其它信息。它会保存每一个词项出现过的文档总数, 在对应的文档中一个具体词项出现的总次数,词项在文档中的顺序,每个文档的长度,所有文档的平均长度,等等。这些统计信息允许 Elasticsearch 决定哪些词比其它词更重要,哪些文档比其它文档更重要,这些内容在 什么是相关性? 中有描述。

为了能够实现预期功能,倒排索引需要知道集合中的 所有 文档,这是需要认识到的关键问题。

早期的全文检索会为整个文档集合建立一个很大的倒排索引并将其写入到磁盘。 一旦新的索引就绪,旧的就会被其替换,这样最近的变化便可以被检索到。

不变性

倒排索引被写入磁盘后是 不可改变 的:它永远不会修改。 不变性有重要的价值:

  • 不需要锁。如果你从来不更新索引,你就不需要担心多进程同时修改数据的问题。
  • 一旦索引被读入内核的文件系统缓存,便会留在哪里,由于其不变性。只要文件系统缓存中还有足够的空间,那么大部分读请求会直接请求内存,而不会命中磁盘。这提供了很大的性能提升。
  • 其它缓存(像 filter 缓存),在索引的生命周期内始终有效。它们不需要在每次数据改变时被重建,因为数据不会变化。
  • 写入单个大的倒排索引允许数据被压缩,减少磁盘 I/O 和 需要被缓存到内存的索引的使用量。

当然,一个不变的索引也有不好的地方。主要事实是它是不可变的! 你不能修改它。如果你需要让一个新的文档 可被搜索,你需要重建整个索引。这要么对一个索引所能包含的数据量造成了很大的限制,要么对索引可被更新的频率造成了很大的限制。

动态更新索引

下一个需要被解决的问题是怎样在保留不变性的前提下实现倒排索引的更新?答案是: 用更多的索引。

通过增加新的补充索引来反映新近的修改,而不是直接重写整个倒排索引。每一个倒排索引都会被轮流查询到—从最早的开始—查询完后再对结果进行合并。

Elasticsearch 基于 Lucene, 这个 java 库引入了 按段搜索 的概念。 每一 段 本身都是一个倒排索引, 但 索引 在 Lucene 中除表示所有 段 的集合外, 还增加了 提交点 的概念 — 一个列出了所有已知段的文件,就像在 Figure 16, “一个 Lucene 索引包含一个提交点和三个段” 中描绘的那样。 如 Figure 17, “一个在内存缓存中包含新文档的 Lucene 索引” 所示,新的文档首先被添加到内存索引缓存中,然后写入到一个基于磁盘的段,如 Figure 18, “在一次提交后,一个新的段被添加到提交点而且缓存被清空。” 所示。

Figure 16. 一个 Lucene 索引包含一个提交点和三个段

A Lucene index with a commit point and three segments

被混淆的概念是,一个 Lucene 索引 我们在 Elasticsearch 称作 分片 。 一个 Elasticsearch 索引 是分片的集合。 当 Elasticsearch 在索引中搜索的时候, 他发送查询到每一个属于索引的分片(Lucene 索引),然后像 执行分布式检索 提到的那样,合并每个分片的结果到一个全局的结果集。

逐段搜索会以如下流程进行工作:

  1. 新文档被收集到内存索引缓存, 见 Figure 17, “一个在内存缓存中包含新文档的 Lucene 索引” 。
  2. 不时地, 缓存被 提交
    • 一个新的段—一个追加的倒排索引—被写入磁盘。
    • 一个新的包含新段名字的 提交点 被写入磁盘。
    • 磁盘进行 同步 — 所有在文件系统缓存中等待的写入都刷新到磁盘,以确保它们被写入物理文件。
  3. 新的段被开启,让它包含的文档可见以被搜索。
  4. 内存缓存被清空,等待接收新的文档。

Figure 17. 一个在内存缓存中包含新文档的 Lucene 索引

A Lucene index with new documents in the in-memory buffer, ready to commit

Figure 18. 在一次提交后,一个新的段被添加到提交点而且缓存被清空。

After a commit, a new segment is added to the index and the buffer is cleared

当一个查询被触发,所有已知的段按顺序被查询。词项统计会对所有段的结果进行聚合,以保证每个词和每个文档的关联都被准确计算。 这种方式可以用相对较低的成本将新文档添加到索引。

删除和更新

段是不可改变的,所以既不能从把文档从旧的段中移除,也不能修改旧的段来进行反映文档的更新。 取而代之的是,每个提交点会包含一个 .del 文件,文件中会列出这些被删除文档的段信息。

当一个文档被 “删除” 时,它实际上只是在 .del 文件中被 标记 删除。一个被标记删除的文档仍然可以被查询匹配到, 但它会在最终结果被返回前从结果集中移除。

文档更新也是类似的操作方式:当一个文档被更新时,旧版本文档被标记删除,文档的新版本被索引到一个新的段中。 可能两个版本的文档都会被一个查询匹配到,但被删除的那个旧版本文档在结果集返回前就已经被移除。

段合并 , 我们展示了一个被删除的文档是怎样被文件系统移除的。

近实时搜索

随着按段(per-segment)搜索的发展,一个新的文档从索引到可被搜索的延迟显著降低了。新文档在几分钟之内即可被检索,但这样还是不够快。

磁盘在这里成为了瓶颈。提交(Commiting)一个新的段到磁盘需要一个 fsync 来确保段被物理性地写入磁盘,这样在断电的时候就不会丢失数据。 但是 fsync 操作代价很大; 如果每次索引一个文档都去执行一次的话会造成很大的性能问题。

我们需要的是一个更轻量的方式来使一个文档可被搜索,这意味着 fsync 要从整个过程中被移除。

在 Elasticsearch 和磁盘之间是文件系统缓存。 像之前描述的一样, 在内存索引缓冲区( Figure 19, “在内存缓冲区中包含了新文档的 Lucene 索引” )中的文档会被写入到一个新的段中( Figure 20, “缓冲区的内容已经被写入一个可被搜索的段中,但还没有进行提交” )。 但是这里新段会被先写入到文件系统缓存—这一步代价会比较低,稍后再被刷新到磁盘—这一步代价比较高。不过只要文件已经在缓存中, 就可以像其它文件一样被打开和读取了。

Figure 19. 在内存缓冲区中包含了新文档的 Lucene 索引

A Lucene index with new documents in the in-memory buffer

Lucene 允许新段被写入和打开—使其包含的文档在未进行一次完整提交时便对搜索可见。 这种方式比进行一次提交代价要小得多,并且在不影响性能的前提下可以被频繁地执行。

Figure 20. 缓冲区的内容已经被写入一个可被搜索的段中,但还没有进行提交

The buffer contents have been written to a segment, which is searchable, but is not yet commited

refresh API

在 Elasticsearch 中,写入和打开一个新段的轻量的过程叫做 refresh 。 默认情况下每个分片会每秒自动刷新一次。这就是为什么我们说 Elasticsearch 是 实时搜索: 文档的变化并不是立即对搜索可见,但会在一秒之内变为可见。

这些行为可能会对新用户造成困惑: 他们索引了一个文档然后尝试搜索它,但却没有搜到。这个问题的解决办法是用 refresh API 执行一次手动刷新:

1
2
POST /_refresh
POST /blogs/_refresh

刷新(Refresh)所有的索引

只刷新(Refresh) blogs 索引

尽管刷新是比提交轻量很多的操作,它还是会有性能开销。当写测试的时候, 手动刷新很有用,但是不要在生产环境下每次索引一个文档都去手动刷新。 相反,你的应用需要意识到 Elasticsearch 的近实时的性质,并接受它的不足。

并不是所有的情况都需要每秒刷新。可能你正在使用 Elasticsearch 索引大量的日志文件, 你可能想优化索引速度而不是近实时搜索, 可以通过设置 refresh_interval , 降低每个索引的刷新频率:

1
2
3
4
5
6
PUT /my_logs
{
"settings": {
"refresh_interval": "30s"
}
}

每 30 秒刷新 my_logs 索引。

refresh_interval 可以在既存索引上进行动态更新。 在生产环境中,当你正在建立一个大的新索引时,可以先关闭自动刷新,待开始使用该索引时,再把它们调回来:

1
2
3
4
5
PUT /my_logs/_settings
{ "refresh_interval": -1 }

PUT /my_logs/_settings
{ "refresh_interval": "1s" }
  • 关闭自动刷新。

  • 每秒自动刷新。

refresh_interval 需要一个 持续时间 值, 例如 1s (1 秒) 或 2m (2 分钟)。 一个绝对值 1 表示的是 1 毫秒 –无疑会使你的集群陷入瘫痪。

持久化变更

如果没有用 fsync 把数据从文件系统缓存刷(flush)到硬盘,我们不能保证数据在断电甚至是程序正常退出之后依然存在。为了保证 Elasticsearch 的可靠性,需要确保数据变化被持久化到磁盘。

动态更新索引,我们说一次完整的提交会将段刷到磁盘,并写入一个包含所有段列表的提交点。Elasticsearch 在启动或重新打开一个索引的过程中使用这个提交点来判断哪些段隶属于当前分片。

即使通过每秒刷新(refresh)实现了近实时搜索,我们仍然需要经常进行完整提交来确保能从失败中恢复。但在两次提交之间发生变化的文档怎么办?我们也不希望丢失掉这些数据。

Elasticsearch 增加了一个 translog ,或者叫事务日志,在每一次对 Elasticsearch 进行操作时均进行了日志记录。通过 translog ,整个流程看起来是下面这样:

一个文档被索引之后,就会被添加到内存缓冲区,并且 追加到了 translog ,正如 Figure 21, “新的文档被添加到内存缓冲区并且被追加到了事务日志” 描述的一样。

Figure 21. 新的文档被添加到内存缓冲区并且被追加到了事务日志

New documents are added to the in-memory buffer and appended to the transaction log

刷新(refresh)使分片处于 Figure 22, “刷新(refresh)完成后, 缓存被清空但是事务日志不会” 描述的状态,分片每秒被刷新(refresh)一次:

  • 这些在内存缓冲区的文档被写入到一个新的段中,且没有进行 fsync 操作。
  • 这个段被打开,使其可被搜索。
  • 内存缓冲区被清空。

Figure 22. 刷新(refresh)完成后, 缓存被清空但是事务日志不会

After a refresh, the buffer is cleared but the transaction log is not

这个进程继续工作,更多的文档被添加到内存缓冲区和追加到事务日志(见 Figure 23, “事务日志不断积累文档” )。

Figure 23. 事务日志不断积累文档

The transaction log keeps accumulating documents

  1. 每隔一段时间—例如 translog 变得越来越大—索引被刷新(flush);一个新的 translog 被创建,并且一个全量提交被执行(见 Figure 24, “在刷新(flush)之后,段被全量提交,并且事务日志被清空” ):
    • 所有在内存缓冲区的文档都被写入一个新的段。
    • 缓冲区被清空。
    • 一个提交点被写入硬盘。
    • 文件系统缓存通过 fsync 被刷新(flush)。
    • 老的 translog 被删除。

translog 提供所有还没有被刷到磁盘的操作的一个持久化纪录。当 Elasticsearch 启动的时候, 它会从磁盘中使用最后一个提交点去恢复已知的段,并且会重放 translog 中所有在最后一次提交后发生的变更操作。

translog 也被用来提供实时 CRUD 。当你试着通过 ID 查询、更新、删除一个文档,它会在尝试从相应的段中检索之前, 首先检查 translog 任何最近的变更。这意味着它总是能够实时地获取到文档的最新版本。

Figure 24. 在刷新(flush)之后,段被全量提交,并且事务日志被清空

After a flush, the segments are fully commited and the transaction log is cleared

flush API

这个执行一个提交并且截断 translog 的行为在 Elasticsearch 被称作一次 flush 。 分片每 30 分钟被自动刷新(flush),或者在 translog 太大的时候也会刷新。请查看 translog 文档 来设置,它可以用来 控制这些阈值:

flush API 可以被用来执行一个手工的刷新(flush):

1
2
POST /blogs/_flush
POST /_flush?wait_for_ongoing
  • 刷新(flush) blogs 索引。
  • 刷新(flush)所有的索引并且并且等待所有刷新在返回前完成。

你很少需要自己手动执行 flush 操作;通常情况下,自动刷新就足够了。

这就是说,在重启节点或关闭索引之前执行 flush 有益于你的索引。当 Elasticsearch 尝试恢复或重新打开一个索引, 它需要重放 translog 中所有的操作,所以如果日志越短,恢复越快。

translog 的目的是保证操作不会丢失。这引出了这个问题: Translog 有多安全?

在文件被 fsync 到磁盘前,被写入的文件在重启之后就会丢失。默认 translog 是每 5 秒被 fsync 刷新到硬盘, 或者在每次写请求完成之后执行(e.g. index, delete, update, bulk)。这个过程在主分片和复制分片都会发生。最终, 基本上,这意味着在整个请求被 fsync 到主分片和复制分片的 translog 之前,你的客户端不会得到一个 200 OK 响应。

在每次请求后都执行一个 fsync 会带来一些性能损失,尽管实践表明这种损失相对较小(特别是 bulk 导入,它在一次请求中平摊了大量文档的开销)。

但是对于一些大容量的偶尔丢失几秒数据问题也并不严重的集群,使用异步的 fsync 还是比较有益的。比如,写入的数据被缓存到内存中,再每 5 秒执行一次 fsync

这个行为可以通过设置 durability 参数为 async 来启用:

1
2
3
4
5
PUT /my_index/_settings
{
"index.translog.durability": "async",
"index.translog.sync_interval": "5s"
}

这个选项可以针对索引单独设置,并且可以动态进行修改。如果你决定使用异步 translog 的话,你需要 保证 在发生 crash 时,丢失掉 sync_interval 时间段的数据也无所谓。请在决定前知晓这个特性。

如果你不确定这个行为的后果,最好是使用默认的参数( "index.translog.durability": "request" )来避免数据丢失。

段合并

由于自动刷新流程每秒会创建一个新的段 ,这样会导致短时间内的段数量暴增。而段数目太多会带来较大的麻烦。 每一个段都会消耗文件句柄、内存和 cpu 运行周期。更重要的是,每个搜索请求都必须轮流检查每个段;所以段越多,搜索也就越慢。

Elasticsearch 通过在后台进行段合并来解决这个问题。小的段被合并到大的段,然后这些大的段再被合并到更大的段。

段合并的时候会将那些旧的已删除文档从文件系统中清除。被删除的文档(或被更新文档的旧版本)不会被拷贝到新的大段中。

启动段合并不需要你做任何事。进行索引和搜索时会自动进行。这个流程像在 Figure 25, “两个提交了的段和一个未提交的段正在被合并到一个更大的段” 中提到的一样工作:

1、 当索引的时候,刷新(refresh)操作会创建新的段并将段打开以供搜索使用。

2、 合并进程选择一小部分大小相似的段,并且在后台将它们合并到更大的段中。这并不会中断索引和搜索。

Figure 25. 两个提交了的段和一个未提交的段正在被合并到一个更大的段

Two commited segments and one uncommited segment in the process of being merged into a bigger segment

Figure 26, “一旦合并结束,老的段被删除” 说明合并完成时的活动:

  • 新的段被刷新(flush)到了磁盘。 ** 写入一个包含新段且排除旧的和较小的段的新提交点。
  • 新的段被打开用来搜索。
  • 老的段被删除。

Figure 26. 一旦合并结束,老的段被删除

一旦合并结束,老的段被删除

合并大的段需要消耗大量的 I/O 和 CPU 资源,如果任其发展会影响搜索性能。Elasticsearch 在默认情况下会对合并流程进行资源限制,所以搜索仍然 有足够的资源很好地执行。

optimize API

optimize API 大可看做是 强制合并 API。它会将一个分片强制合并到 max_num_segments 参数指定大小的段数目。 这样做的意图是减少段的数量(通常减少到一个),来提升搜索性能。

optimize API 不应该 被用在一个活跃的索引————一个正积极更新的索引。后台合并流程已经可以很好地完成工作。 optimizing 会阻碍这个进程。不要干扰它!

在特定情况下,使用 optimize API 颇有益处。例如在日志这种用例下,每天、每周、每月的日志被存储在一个索引中。 老的索引实质上是只读的;它们也并不太可能会发生变化。

在这种情况下,使用 optimize 优化老的索引,将每一个分片合并为一个单独的段就很有用了;这样既可以节省资源,也可以使搜索更加快速:

1
POST /logstash-2014-10/_optimize?max_num_segments=1

合并索引中的每个分片为一个单独的段

请注意,使用 optimize API 触发段合并的操作不会受到任何资源上的限制。这可能会消耗掉你节点上全部的 I/O 资源, 使其没有余裕来处理搜索请求,从而有可能使集群失去响应。 如果你想要对索引执行 optimize,你需要先使用分片分配(查看 迁移旧索引)把索引移到一个安全的节点,再执行。

参考资料

ElasticSearch API 之 HighLevelRestClient

Elasticsearch 官方的 High Level REST Client 在 7.1.5.0 版本废弃。所以本文中的 API 不推荐使用。

快速开始

引入依赖

在 pom.xml 中引入以下依赖:

1
2
3
4
5
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.17.1</version>
</dependency>

创建连接和关闭

1
2
3
4
5
6
7
8
// 创建连接
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")));

// 关闭
client.close();

索引 API

测试准备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static final String INDEX = "mytest";
public static final String INDEX_ALIAS = "mytest_alias";
/**
* {@link User} 的 mapping 结构(json形式)
*/
public static final String MAPPING_JSON =
"{\n" + " \"properties\": {\n" + " \"_class\": {\n" + " \"type\": \"keyword\",\n"
+ " \"index\": false,\n" + " \"doc_values\": false\n" + " },\n" + " \"description\": {\n"
+ " \"type\": \"text\",\n" + " \"fielddata\": true\n" + " },\n" + " \"enabled\": {\n"
+ " \"type\": \"boolean\"\n" + " },\n" + " \"name\": {\n" + " \"type\": \"text\",\n"
+ " \"fielddata\": true\n" + " }\n" + " }\n" + "}";

@Autowired
private RestHighLevelClient client;

创建索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 创建索引
CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX);

// 设置索引的 settings
createIndexRequest.settings(
Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 2));

// 设置索引的 mapping
createIndexRequest.mapping(MAPPING_JSON, XContentType.JSON);

// 设置索引的别名
createIndexRequest.alias(new Alias(INDEX_ALIAS));

AcknowledgedResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
Assertions.assertTrue(createIndexResponse.isAcknowledged());

删除索引

1
2
3
4
// 删除索引
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(INDEX);
AcknowledgedResponse deleteResponse = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
Assertions.assertTrue(deleteResponse.isAcknowledged());

判断索引是否存在

1
2
3
4
GetIndexRequest getIndexRequest = new GetIndexRequest(INDEX);
Assertions.assertTrue(client.indices().exists(getIndexRequest, RequestOptions.DEFAULT));
GetIndexRequest getIndexAliasRequest = new GetIndexRequest(INDEX_ALIAS);
Assertions.assertTrue(client.indices().exists(getIndexAliasRequest, RequestOptions.DEFAULT));

文档 API

文档测试准备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public static final String INDEX = "mytest";
public static final String INDEX_ALIAS = "mytest_alias";
/**
* {@link User} 的 mapping 结构(json形式)
*/
public static final String MAPPING_JSON =
"{\n" + " \"properties\": {\n" + " \"_class\": {\n" + " \"type\": \"keyword\",\n"
+ " \"index\": false,\n" + " \"doc_values\": false\n" + " },\n" + " \"description\": {\n"
+ " \"type\": \"text\",\n" + " \"fielddata\": true\n" + " },\n" + " \"enabled\": {\n"
+ " \"type\": \"boolean\"\n" + " },\n" + " \"name\": {\n" + " \"type\": \"text\",\n"
+ " \"fielddata\": true\n" + " }\n" + " }\n" + "}";

@Autowired
private RestHighLevelClient client;

@BeforeEach
public void init() throws IOException {

// 创建索引
CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX);

// 设置索引的 settings
createIndexRequest.settings(
Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 2));

// 设置索引的 mapping
createIndexRequest.mapping(MAPPING_JSON, XContentType.JSON);

// 设置索引的别名
createIndexRequest.alias(new Alias(INDEX_ALIAS));

AcknowledgedResponse response = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
Assertions.assertTrue(response.isAcknowledged());

// 判断索引是否存在
GetIndexRequest getIndexRequest = new GetIndexRequest(INDEX_ALIAS);
Assertions.assertTrue(client.indices().exists(getIndexRequest, RequestOptions.DEFAULT));
GetIndexRequest getIndexAliasRequest = new GetIndexRequest(INDEX_ALIAS);
Assertions.assertTrue(client.indices().exists(getIndexAliasRequest, RequestOptions.DEFAULT));
}

@AfterEach
public void destroy() throws IOException {
// 删除索引
DeleteIndexRequest request = new DeleteIndexRequest(INDEX);
AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
Assertions.assertTrue(response.isAcknowledged());
}

创建文档

RestHighLevelClient Api 使用 IndexRequest 来构建创建文档的请求参数。

【示例】创建 id 为 1 的文档

1
2
3
4
5
6
7
8
IndexRequest request = new IndexRequest("product");
request.id("1");
Product product = new Product();
product.setName("机器人");
product.setDescription("人工智能机器人");
product.setEnabled(true);
String jsonString = JSONUtil.toJsonStr(product);
request.source(jsonString, XContentType.JSON);

同步执行

1
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);

异步执行

1
2
3
4
5
6
7
8
9
10
11
12
// 异步执行
client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
System.out.println(indexResponse);
}

@Override
public void onFailure(Exception e) {
System.out.println("执行失败");
}
});

删除文档

RestHighLevelClient Api 使用 DeleteRequest 来构建删除文档的请求参数。

【示例】删除 id 为 1 的文档

1
DeleteRequest deleteRequest = new DeleteRequest(INDEX_ALIAS, "1");

同步执行

1
2
DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
System.out.println(deleteResponse);

异步执行

1
2
3
4
5
6
7
8
9
10
11
client.deleteAsync(deleteRequest, RequestOptions.DEFAULT, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
System.out.println(deleteResponse);
}

@Override
public void onFailure(Exception e) {
System.out.println("执行失败");
}
});

更新文档

RestHighLevelClient Api 使用 UpdateRequest 来构建更新文档的请求参数。

【示例】更新 id 为 1 的文档

1
2
3
4
5
6
7
UpdateRequest updateRequest = new UpdateRequest(INDEX_ALIAS, "1");
Product product3 = new Product();
product3.setName("扫地机器人");
product3.setDescription("人工智能扫地机器人");
product3.setEnabled(true);
String jsonString2 = JSONUtil.toJsonStr(product3);
updateRequest.doc(jsonString2, XContentType.JSON);

同步执行

1
2
UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
System.out.println(updateResponse);

异步执行

1
2
3
4
5
6
7
8
9
10
11
client.updateAsync(updateRequest, RequestOptions.DEFAULT, new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
System.out.println(updateResponse);
}

@Override
public void onFailure(Exception e) {
System.out.println("执行失败");
}
});

查看文档

RestHighLevelClient Api 使用 GetRequest 来构建查看文档的请求参数。

【示例】查看 id 为 1 的文档

1
GetRequest getRequest = new GetRequest(INDEX_ALIAS, "1");

同步执行

1
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

异步执行

1
2
3
4
5
6
7
8
9
10
11
client.getAsync(getRequest, RequestOptions.DEFAULT, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
System.out.println(getResponse);
}

@Override
public void onFailure(Exception e) {
System.out.println("执行失败");
}
});

获取匹配条件的记录总数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
@DisplayName("获取匹配条件的记录总数")
public void count() throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchPhraseQuery("customer_gender", "MALE"));
sourceBuilder.trackTotalHits(true);

CountRequest countRequest = new CountRequest(INDEX);
countRequest.source(sourceBuilder);

CountResponse countResponse = client.count(countRequest, RequestOptions.DEFAULT);
long count = countResponse.getCount();
System.out.println("命中记录数:" + count);
}

分页查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@ParameterizedTest
@ValueSource(ints = {0, 1, 2, 3})
@DisplayName("分页查询测试")
public void pageTest(int page) throws IOException {

int size = 10;
int offset = page * size;
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchPhraseQuery("customer_gender", "MALE"));
sourceBuilder.from(offset);
sourceBuilder.size(size);
sourceBuilder.trackTotalHits(true);

SearchRequest searchRequest = new SearchRequest(INDEX);
searchRequest.source(sourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
KibanaSampleDataEcommerceBean bean =
BeanUtil.mapToBean(hit.getSourceAsMap(), KibanaSampleDataEcommerceBean.class, true,
CopyOptions.create());
System.out.println(bean);
}
}

条件查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
@DisplayName("条件查询")
public void matchPhraseQuery() throws IOException {
SearchRequest searchRequest = new SearchRequest(INDEX);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.matchPhraseQuery("customer_last_name", "Jensen"));
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.trackTotalHits(true);
searchRequest.source(sourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
KibanaSampleDataEcommerceBean bean =
BeanUtil.mapToBean(hit.getSourceAsMap(), KibanaSampleDataEcommerceBean.class, true,
CopyOptions.create());
System.out.println(bean);
}
}

参考资料

Elasticsearch 文本分析

文本分析是将非结构化文本转换为针对搜索优化的结构化格式的过程。

文本分析简介

文本分析使 Elasticsearch 能够执行全文搜索,其中搜索返回所有相关结果,而不仅仅是完全匹配。

文本分析可以分为两个方面:

  • Tokenization(分词化) - 分析通过分词化使全文搜索成为可能:将文本分解成更小的块,称为分词。在大多数情况下,这些标记是单独的 term(词项)。
  • Normalizeation(标准化) - 经过分词后的文本只能进行词项匹配,但是无法进行同义词匹配。为解决这个问题,可以将文本进行标准化处理。例如:将 foxes 标准化为 fox

Analyzer(分析器)

文本分析由 analyzer(分析器) 执行,分析器是一组控制整个过程的规则。无论是索引还是搜索,都需要使用分析器。

analyzer(分析器) 由三个组件组成:零个或多个 Character Filters(字符过滤器)、有且仅有一个 Tokenizer(分词器)、零个或多个 Token Filters(分词过滤器)

它的执行顺序如下:

1
character filters -> tokenizer -> token filters

Elasticsearch 内置的分析器:

  • standard - 根据单词边界将文本划分为多个 term,如 Unicode 文本分割算法所定义。它删除了大多数标点符号、小写 term,并支持删除停用词。
  • simple - 遇到非字母字符时将文本划分为多个 term,并将其转为小写。
  • whitespace - 遇到任何空格时将文本划分为多个 term,不转换为小写。
  • stop - 与 simple 相似,同时支持删除停用词(如:the、a、is)。
  • keyword - 部分词,直接将输入当做输出。
  • pattern - 使用正则表达式将文本拆分为 term。它支持小写和非索引字。
  • fingerprint - 可创建用于重复检测的指纹。
  • 语言分析器 - 提供了 30 多种常见语言的分词器。

默认情况下,Elasticsearch 使用 standard analyzer(标准分析器),它开箱即用,适用于大多数使用场景。Elasticsearch 也允许定制分析器。

测试分析器

_analyze API 是查看分析器如何分词的工具。

::: details 【示例】直接指定 analyzer 进行测试

查看不同的 analyzer 的效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
GET _analyze
{
"analyzer": "standard",
"text": "2 running Quick brown-foxes leap over lazy dogs in the summer evening."
}

GET _analyze
{
"analyzer": "simple",
"text": "2 running Quick brown-foxes leap over lazy dogs in the summer evening."
}

GET _analyze
{
"analyzer": "stop",
"text": "2 running Quick brown-foxes leap over lazy dogs in the summer evening."
}

GET _analyze
{
"analyzer": "whitespace",
"text": "2 running Quick brown-foxes leap over lazy dogs in the summer evening."
}

GET _analyze
{
"analyzer": "keyword",
"text": "2 running Quick brown-foxes leap over lazy dogs in the summer evening."
}

GET _analyze
{
"analyzer": "pattern",
"text": "2 running Quick brown-foxes leap over lazy dogs in the summer evening."
}

:::

::: details 【示例】自由组合分析器组件进行测试

1
2
3
4
5
6
POST _analyze
{
"tokenizer": "standard",
"filter": [ "lowercase", "asciifolding" ],
"text": "Is this déja vu?"
}

:::

指定分析器

内置分析器可以直接使用,无需任何配置。但是,其中一些支持配置选项来更改其行为。

在搜索时,Elasticsearch 通过按顺序检查以下参数来确定要使用的分析器:

  1. 搜索查询中的 analyzer 参数。请参阅 指定查询的搜索分析器
  2. 字段的 search_analyzer 映射参数。请参阅 为字段指定搜索分析器
  3. analysis.analyzer.default_search 索引设置。请参阅 指定索引的默认搜索分析器
  4. 字段的 analyzer mapping 参数。请参阅 为字段指定分析器

如果未指定这些参数,则使用 standard 分析器。

::: details 【示例】设置索引的默认分析器

将 std_english 分析器定义为基于标准分析器,但配置为删除预定义的英语停用词列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
PUT my-index-000001
{
"settings": {
"analysis": {
"analyzer": {
"std_english": {
"type": "standard",
"stopwords": "_english_"
}
}
}
},
"mappings": {
"properties": {
"my_text": {
"type": "text",
"analyzer": "standard",
"fields": {
"english": {
"type": "text",
"analyzer": "std_english"
}
}
}
}
}
}

:::

::: details 【示例】设置字段的分析器

将字段 title 的分析器设为 whitespace

1
2
3
4
5
6
7
8
9
10
11
PUT my-index-000001
{
"mappings": {
"properties": {
"title": {
"type": "text",
"analyzer": "whitespace"
}
}
}
}

:::

::: details 【示例】指定查询的搜索分析器

1
2
3
4
5
6
7
8
9
10
11
GET my-index-000001/_search
{
"query": {
"match": {
"message": {
"query": "Quick foxes",
"analyzer": "stop"
}
}
}
}

:::

::: details 【示例】指定字段的搜索分析器

1
2
3
4
5
6
7
8
9
10
11
12
PUT my-index-000001
{
"mappings": {
"properties": {
"title": {
"type": "text",
"analyzer": "whitespace",
"search_analyzer": "simple"
}
}
}
}

:::

::: details 【示例】指定索引的默认搜索分析器

创建索引时,可以使用该 analysis.analyzer.default_search 设置设置默认搜索分析器。如果提供了搜索分析器,则还必须使用 analysis.analyzer.default 设置指定默认索引分析器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
PUT my-index-000001
{
"settings": {
"analysis": {
"analyzer": {
"default": {
"type": "simple"
},
"default_search": {
"type": "whitespace"
}
}
}
}
}

:::

自定义分析器

自定义分析器,需要指定 type 为 custom 类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
PUT my-index-000001
{
"settings": {
"analysis": {
"analyzer": {
"my_custom_analyzer": {
"type": "custom",
"tokenizer": "standard",
"char_filter": [
"html_strip"
],
"filter": [
"lowercase",
"asciifolding"
]
}
}
}
}
}

中文分词

在英文中,单词有自然的空格作为分隔。

在中文中,分词有以下难点:

  • 中文不能根据一个个汉字进行分词
  • 不同于英文可以根据自然的空格进行分词;中文中一般不会有空格。
  • 同一句话,在不同的上下文中,有不同个理解。例如:这个苹果,不大好吃;这个苹果,不大,好吃!

可以使用一些插件来获得对中文更好的分析能力:

  • analysis-icu - 添加了扩展的 Unicode 支持,包括更好地分析亚洲语言、Unicode 规范化、Unicode 感知大小写折叠、排序规则支持和音译。
  • elasticsearch-analysis-ik - 支持自定义词库,支持热更新分词字典
  • elasticsearch-thulac-plugin - 清华大学自然语言处理和社会人文计算实验室的一套中文分词器。

Character Filters(字符过滤器)

Character Filters(字符过滤器) 将原始文本作为字符流接收,并可以通过添加、删除或更改字符来转换文本。

分析器可以有零个或多个 Character Filters(字符过滤器),如果配置了多个,它会按照配置的顺序执行。

Elasticsearch 内置的字符过滤器:

  • html_strip - html_strip字符过滤器用于去除 HTML 元素(如 <b>)并转义 HTML 实体(如 &amp;)。
  • mapping - mapping 字符过滤器用于将指定字符串的任何匹配项替换为指定的替换项。
  • pattern_replace - pattern_replace 字符筛选器将匹配正则表达式的任何字符替换为指定的替换。

Tokenizer(分词器)

Tokenizer(分词器) 接收字符流,将其分解为分词(通常是单个单词),并输出一个分词流。

分词器还负责记录每个 term 的顺序或位置,以及该 term 所代表的原始单词的开始和结束字符偏移量。``

分析器有且仅有一个 Tokenizer(分词器)

Elasticsearch 内置的分词器:

  • 面向单词的分词器
    • standard - 将文本划分为单词边界上的 term,如 Unicode 文本分割算法所定义。它会删除大多数标点符号。它是大多数语言的最佳选择。
    • letter - 遇到非字母字符时将文本划分为多个 term。
    • lowercase - 到非字母字符时将文本划分为多个 term,并将其转为小写。
    • whitespace - 遇到任何空格时将文本划分为多个 term。
    • uax_url_email - 与 standard 相似,不同之处在于它将 URL 和电子邮件地址识别为单个分词。
    • classic - 基于语法的英语分词器。
    • thai - 将泰语文本分割为单词。
  • 部分单词分词器
    • n-gram - 遇到指定字符列表(例如空格或标点符号)中的任何一个时,将文本分解为单词,然后返回每个单词的 n-gram:一个连续字母的滑动窗口,例如 quick[qu, ui, ic, ck]
    • edge_n-gram - 遇到指定字符列表(例如空格或标点符号)中的任何一个时,将文本分解为单词,然后返回锚定到单词开头的每个单词的 n 元语法,例如 quick[q, qu, qui, quic, quick]
  • 结构化文本分词器
    • keyword - 接受给定的任何文本,并输出与单个 term 完全相同的文本。它可以与 lowercase 等分词过滤器结合使用,以规范化分析的 term。
    • pattern - 使用正则表达式在文本与单词分隔符匹配时将文本拆分为 term,或者将匹配的文本捕获为 term。
    • simple_pattern - 使用正则表达式将匹配的文本捕获为 term。它使用正则表达式特征的受限子集,并且通常比 pattern 更快。
    • char_group - 可以通过要拆分的字符集进行配置,这通常比运行正则表达式代价更小。
    • simple_pattern_split - 使用与 simple_pattern 分词器相同的受限正则表达式子集,但在匹配项处拆分输入,而不是将匹配项作为 term 返回。
    • path_hierarchy - 基于文件系统的路径分隔符,进行拆分,例如 /foo/bar/baz[/foo, /foo/bar, /foo/bar/baz ]

Token Filters(分词过滤器)

Token Filters(分词过滤器) 接收分词流,并可以添加、删除或更改分词。常用的分词过滤器有: lowercase(小写转换)stop(停用词处理)synonym(同义词处理) 等等。

分析器可以有零个或多个 Token Filters(分词过滤器),如果配置了多个,它会按照配置的顺序执行。

Elasticsearch 内置了很多分词过滤器,这里列举几个常见的:

参考资料

Elasticsearch 存储

逻辑存储设计

Elasticsearch 的逻辑存储被设计为层级结构,自上而下为:

1
index -> type -> mapping -> document -> field

各层级结构的说明如下:

Document(文档)

Elasticsearch 是面向文档的,这意味着读写数据的最小单位是文档。Elasticsearch 以 JSON 文档的形式序列化和存储数据。文档是一组字段,这些字段是包含数据的键值对。每个文档都有一个唯一的 ID。

一个简单的 Elasticsearch 文档可能如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"_index": "my-first-elasticsearch-index",
"_id": "DyFpo5EBxE8fzbb95DOa",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"found": true,
"_source": {
"email": "john@smith.com",
"first_name": "John",
"last_name": "Smith",
"info": {
"bio": "Eco-warrior and defender of the weak",
"age": 25,
"interests": ["dolphins", "whales"]
},
"join_date": "2024/05/01"
}
}

Elasticsearch 中的 document 是无模式的,也就是并非所有 document 都必须拥有完全相同的字段,它们不受限于同一个模式。

Field(字段)

field 包含数据的键值对。默认情况下,Elasticsearch 对每个字段中的所有数据建立索引,并且每个索引字段都具有专用的优化数据结构。

document 包含数据和元数据。Metadata Field(元数据字段) 是存储有关文档信息的系统字段。在 Elasticsearch 中,元数据字段都以 _ 开头。常见的元数据字段有:

  • _index - 文档所属的索引
  • _id - 文档的 ID
  • _source - 表示文档原文的 JSON

Type(类型)

在 Elasticsearch 中,type 是 document 的逻辑分类。每个 index 里可以有一个或多个 type。

不同的 type 应该有相似的结构(schema)。举例来说,id字段不能在这个组是字符串,在另一个组是数值。

注意:Elasticsearch 7.x 版已彻底移除 type。

Index(索引)

在 Elasticsearch 中,可以将 index 视为 document 的集合。每个索引存储在磁盘上的同组文件中;索引存储了所有映射类型的字段,还有一些设置。

Elasticsearch 会为所有字段建立索引,经过处理后写入一个倒排索引(Inverted Index)。查找数据的时候,直接查找该索引。

所以,Elasticsearch 数据管理的顶层单位就叫做 Index。它是单个数据库的同义词。每个 Index 的名字必须是小写。

Elasticsearch 概念和 RDBM 概念

Elasticsearch 概念 vs. RDBM 概念

Elasticsearch 概念 RDBM 概念
索引(index) 数据库(database)
类型(type,6.0 废弃,7.0 移除) 数据表(table)
文档(docuemnt) 行(row)
字符(field) 列(column)
映射(mapping) 表结构(schema)

物理存储设计

Elasticsearch 的物理存储,天然使用了分布式设计。

每个 Elasticsearch 进程都从属于一个 cluster,一个 cluster 可以有一个或多个 node(即 Elasticsearch 进程)。

Elasticsearch 存储会将每个 index 分为多个 shard,而 shard 可以分布在集群中不同节点上。正是由于这个机制,使得 Elasticsearch 有了水平扩展的能力。shard 也是 Elasticsearch 将数据从一个节点迁移到拎一个节点的最小单位。

Elasticsearch 的每个 shard 对应一个 Lucene index(一个包含倒排索引的文件目录)。Lucene index 又会被分解为多个 segment。segment 是索引中的内部存储元素,由于写入效率的考虑,所以被设计为不可变更的。segment 会定期 合并 较大的 segment,以保持索引大小。简单来说,Lucene 就是一个 jar 包,里面包含了封装好的构建、管理倒排索引的算法代码。

倒排索引

在搜索引擎中,每个文档都有一个对应的文档 ID,文档内容被表示为一系列关键词的集合。例如,文档 1 经过分词,提取了 20 个关键词,每个关键词都会记录它在文档中出现的次数和出现位置。

那么,倒排索引就是关键词到文档 ID 的映射,每个关键词都对应着一系列的文件,这些文件中都出现了关键词。

举个例子,有以下文档:

DocId Doc
1 谷歌地图之父跳槽 Facebook
2 谷歌地图之父加盟 Facebook
3 谷歌地图创始人拉斯离开谷歌加盟 Facebook
4 谷歌地图之父跳槽 Facebook 与 Wave 项目取消有关
5 谷歌地图之父拉斯加盟社交网站 Facebook

对文档进行分词之后,得到以下倒排索引

WordId Word DocIds
1 谷歌 1,2,3,4,5
2 地图 1,2,3,4,5
3 之父 1,2,4,5
4 跳槽 1,4
5 Facebook 1,2,3,4,5
6 加盟 2,3,5
7 创始人 3
8 拉斯 3,5
9 离开 3
10 4
.. .. ..

另外,实用的倒排索引还可以记录更多的信息,比如文档频率信息,表示在文档集合中有多少个文档包含某个单词。

那么,有了倒排索引,搜索引擎可以很方便地响应用户的查询。比如用户输入查询 Facebook,搜索系统查找倒排索引,从中读出包含这个单词的文档,这些文档就是提供给用户的搜索结果。

要注意倒排索引的两个重要细节:

  • 倒排索引中的所有词项对应一个或多个文档;
  • 倒排索引中的词项根据字典顺序升序排列

上面只是一个简单的栗子,并没有严格按照字典顺序升序排列。

Setting

Elasticsearch 索引的配置项主要分为静态配置属性动态配置属性,静态配置属性是索引创建后不能修改,而动态配置属性则可以随时修改。

Elasticsearch 索引设置的 api 为 **settings**,完整的示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
PUT /my_index
{
"settings": {
"index": {
"number_of_shards": "1",
"number_of_replicas": "1",
"refresh_interval": "60s",
"analysis": {
"filter": {
"tsconvert": {
"type": "stconvert",
"convert_type": "t2s",
"delimiter": ","
},
"synonym": {
"type": "synonym",
"synonyms_path": "analysis/synonyms.txt"
}
},
"analyzer": {
"ik_max_word_synonym": {
"filter": [
"synonym",
"tsconvert",
"standard",
"lowercase",
"stop"
],
"tokenizer": "ik_max_word"
},
"ik_smart_synonym": {
"filter": [
"synonym",
"standard",
"lowercase",
"stop"
],
"tokenizer": "ik_smart"
}
},
"mapping": {
"coerce": "false",
"ignore_malformed": "false"
},
"indexing": {
"slowlog": {
"threshold": {
"index": {
"warn": "2s",
"info": "1s"
}
}
}
},
"provided_name": "hospital_202101070533",
"query": {
"default_field": "timestamp",
"parse": {
"allow_unmapped_fields": "false"
}
},
"requests": {
"cache": {
"enable": "true"
}
},
"search": {
"slowlog": {
"threshold": {
"fetch": {
"warn": "1s",
"info": "200ms"
},
"query": {
"warn": "1s",
"info": "500ms"
}
}
}
}
}
}
}

固定属性

  • **_index.creation_date_**:顾名思义索引的创建时间戳。
  • **_index.uuid_**:索引的 uuid 信息。
  • **_index.version.created_**:索引的版本号。

索引静态配置

  • **_index.number_of_shards**:索引的主分片数,默认值是 **5**。这个配置在索引创建后不能修改;在 Elasticsearch 层面,可以通过 **es.index.max_number_of_shards** 属性设置索引最大的分片数,默认为 **1024_**。
  • **_index.codec**:数据存储的压缩算法,默认值为 **LZ4**,可选择值还有 **best_compression_**,它比 LZ4 可以获得更好的压缩比(即占据较小的磁盘空间,但存储性能比 LZ4 低)。
  • **_index.routing_partition_size_**:路由分区数,如果设置了该参数,其路由算法为:( hash(_routing) + hash(_id) % index.routing_parttion_size ) % number_of_shards。如果该值不设置,则路由算法为 hash(_routing) % number_of_shardings_routing 默认值为 _id

静态配置里,有重要的部分是配置分析器(config analyzers)。

  • index.analysis

    :分析器最外层的配置项,内部主要分为 char_filter、tokenizer、filter 和 analyzer。

    • **_char_filter_**:定义新的字符过滤器件。
    • **_tokenizer_**:定义新的分词器。
    • **_filter_**:定义新的 token filter,如同义词 filter。
    • **_analyzer_**:配置新的分析器,一般是 char_filter、tokenizer 和一些 token filter 的组合。

索引动态配置

  • **_index.number_of_replicas**:索引主分片的副本数,默认值是 **1_**,该值必须大于等于 0,这个配置可以随时修改。
  • **_index.refresh_interval**:执行新索引数据的刷新操作频率,该操作使对索引的最新更改对搜索可见,默认为 **1s**。也可以设置为 **-1_** 以禁用刷新。更详细信息参考 Elasticsearch 动态修改 refresh_interval 刷新间隔设置

Mapping

在 Elasticsearch 中,**Mapping**(映射),用来定义一个文档以及其所包含的字段如何被存储和索引,可以在映射中事先定义字段的数据类型、字段的权重、分词器等属性,就如同在关系型数据库中创建数据表时会设置字段的类型。

Mapping 会把 json 文档映射成 Lucene 所需要的扁平格式

一个 Mapping 属于一个索引的 Type

  • 每个文档都属于一个 Type
  • 一个 Type 有一个 Mapping 定义
  • 7.0 开始,不需要在 Mapping 定义中指定 type 信息

每个 document 都是 field 的集合,每个 field 都有自己的数据类型。映射数据时,可以创建一个 mapping,其中包含与 document 相关的 field 列表。映射定义还包括元数据 field,例如 _source ,它自定义如何处理 document 的关联元数据。

映射分类

在 Elasticsearch 中,映射可分为静态映射和动态映射。在关系型数据库中写入数据之前首先要建表,在建表语句中声明字段的属性,在 Elasticsearch 中,则不必如此,Elasticsearch 最重要的功能之一就是让你尽可能快地开始探索数据,文档写入 Elasticsearch 中,它会根据字段的类型自动识别,这种机制称为动态映射,而静态映射则是写入数据之前对字段的属性进行手工设置。

静态映射

Elasticsearch 官方将静态映射称为显式映射(Explicit mapping静态映射是在创建索引时手工指定索引映射。静态映射和 SQL 中在建表语句中指定字段属性类似。相比动态映射,通过静态映射可以添加更详细、更精准的配置信息。

例如:

  • 哪些字符串字段应被视为全文字段。
  • 哪些字段包含数字、日期或地理位置。
  • 日期值的格式。
  • 用于控制动态添加字段的自定义规则。

【示例】创建索引时,显示指定 mapping

1
2
3
4
5
6
7
8
9
10
PUT /my-index-000001
{
"mappings": {
"properties": {
"age": { "type": "integer" },
"email": { "type": "keyword" },
"name": { "type": "text" }
}
}
}

【示例】在已存在的索引中,指定一个 field 的属性

1
2
3
4
5
6
7
8
9
PUT /my-index-000001/_mapping
{
"properties": {
"employee-id": {
"type": "keyword",
"index": false
}
}
}

【示例】查看 mapping

1
GET /my-index-000001/_mapping

【示例】查看指定 field 的 mapping

1
GET /my-index-000001/_mapping/field/employee-id

动态映射

动态映射机制,允许用户不手动定义映射,Elasticsearch 会自动识别字段类型。在实际项目中,如果遇到的业务在导入数据之前不确定有哪些字段,也不清楚字段的类型是什么,使用动态映射非常合适。当 Elasticsearch 在文档中碰到一个以前没见过的字段时,它会利用动态映射来决定该字段的类型,并自动把该字段添加到映射中。

示例:创建一个名为 data 的索引、其 mapping 类型为 _doc,并且有一个类型为 long 的字段 count

1
2
PUT data/_doc/1
{ "count": 5 }

动态字段映射

动态字段映射(Dynamic field mappings)是用于管理动态字段检测的规则。当 Elasticsearch 在文档中检测到新字段时,默认情况下会动态将该字段添加到类型映射中。

在 mapping 中可以通过将 dynamic 参数设置为 trueruntime 来开启动态映射。

dynamic 不同设置的作用:

可选值 说明
true 新字段被添加到 mapping 中。mapping 的默认设置。
runtime 新字段被添加到 mapping 中并作为运行时字段——这些字段不会被索引,但是可以在查询时出现在 _source 中。
false 新字段不会被索引或搜索,但仍会出现在返回匹配的 _source 字段中。这些字段不会添加到映射中,并且必须显式添加新字段。
strict 如果检测到新字段,则会抛出异常并拒绝文档。必须将新字段显式添加到映射中。

需要注意的是:对已有字段,一旦已经有数据写入,就不再支持修改字段定义。如果希望改变字段类型,必须重建索引。这是由于 Lucene 实现的倒排索引,一旦生成后,就不允许修改。如果修改了字段的数据类型,会导致已被索引的字段无法被搜索。

启用动态字段映射后,Elasticsearch 使用内置规则来确定如何映射每个字段的数据类型。规则如下:

JSON 数据类型 "dynamic":"true" "dynamic":"runtime"
null 没有字段被添加 没有字段被添加
true or false boolean 类型 boolean 类型
浮点型数字 float 类型 double 类型
数字 数字型 long 类型
JSON 对象 object 类型 没有字段被添加
数组 由数组中第一个非空值决定 由数组中第一个非空值决定
开启 日期检测 的字符串 date 类型 date 类型
开启 数字检测 的字符串 float 类型或 long类型 double 类型或 long 类型
什么也没开启的字符串 带有 .keyword 子 field 的 text 类型 keyword 类型

下面举一个例子认识动态 mapping,在 Elasticsearch 中创建一个新的索引并查看它的 mapping,命令如下:

1
2
PUT books
GET books/_mapping

此时 books 索引的 mapping 是空的,返回结果如下:

1
2
3
4
5
{
"books": {
"mappings": {}
}
}

再往 books 索引中写入一条文档,命令如下:

1
2
3
4
5
6
PUT books/it/1
{
"id": 1,
"publish_date": "2019-11-10",
"name": "master Elasticsearch"
}

文档写入完成之后,再次查看 mapping,返回结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
"books": {
"mappings": {
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"publish_date": {
"type": "date"
}
}
}
}
}

动态映射有时可能会错误的识别字段类型,这种情况下,可能会导致一些功能无法正常使用,如 Range 查询。所以,使用动态 mapping 要结合实际业务需求来综合考虑,如果将 Elasticsearch 当作主要的数据存储使用,并且希望出现未知字段时抛出异常来提醒你注意这一问题,那么开启动态 mapping 并不适用。

动态模板

动态模板(dynamic templates是用于给 mapping 动态添加字段的自定义规则。

动态模板可以设置匹配条件,只有匹配的情况下才使用动态模板:

  • match_mapping_type 对 Elasticsearch 检测到的数据类型进行操作
  • matchunmatch 使用模式匹配字段名称
  • path_matchpath_unmatch 对字段的完整虚线路径进行操作
  • 如果动态模板没有定义 match_mapping_typematchpath_match,则不会匹配任何字段。您仍然可以在批量请求的 dynamic_templates 部分按名称引用模板。

【示例】当设置 'dynamic':'true' 时,Elasticsearch 会将字符串字段映射为带有关键字子字段的文本字段。如果只是索引结构化内容并且对全文搜索不感兴趣,可以让 Elasticsearch 仅将字段映射为关键字字段。这种情况下,只有完全匹配才能搜索到这些字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
PUT my-index-000001
{
"mappings": {
"dynamic_templates": [
{
"strings_as_keywords": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
}
]
}
}

运行时字段

运行时字段是在查询时评估的字段。运行时字段有以下作用:

  • 在不重新索引数据的情况下,向现有文档添加字段
  • 在不了解数据结构的情况下,也可以处理数据
  • 在查询时覆盖从索引字段返回的值
  • 为特定用途定义字段而不修改底层架构

检索 Elasticsearch 时,运行时字段和其他字段并没有什么不同。

需要注意的是:使用 _search API 上的 fields 参数来检索运行时字段的值。运行时字段不会显示在 _source 中,但 fields API 适用于所有字段,即使是那些未作为原始 _source 的一部分发送的字段。

运行时字段在处理日志数据时很有用,尤其是当日志是不确定的数据结构时:这种情况下,会降低搜索速度,但您的索引大小要小得多,您可以更快地处理日志,而无需为它们设置索引。

因为运行时字段没有被索引,所以添加运行时字段不会增加索引大小。用户可以直接在 mapping 中定义运行时字段,从而节省存储成本并提高采集数据的速度。定义了运行时字段后,可以立即在搜索请求、聚合、过滤和排序中使用它。

如果将运行时字段设为索引字段,则无需修改任何引用运行时字段的查询。更好的是,您可以引用字段是运行时字段的一些索引,以及字段是索引字段的其他索引。您可以灵活地选择要索引哪些字段以及保留哪些字段作为运行时字段。

就其核心而言,运行时字段最重要的好处是能够在您提取字段后将字段添加到文档中。此功能简化了映射决策,因为您不必预先决定如何解析数据,并且可以使用运行时字段随时修改映射。使用运行时字段允许更小的索引和更快的摄取时间,这结合使用更少的资源并降低您的运营成本。

字段数据类型

在 Elasticsearch 中,每个字段都有一个字段数据类型或字段类型,用于指示字段包含的数据类型(例如字符串或布尔值)及其预期用途。字段类型按系列分组。同一族中的类型具有完全相同的搜索行为,但可能具有不同的空间使用或性能特征。

Elasticsearch 提供了非常丰富的数据类型,官方将其分为以下几类:

  • 普通类型

    • binary:编码为 Base64 字符串的二进制值。
    • boolean:布尔类型,值为 true 或 false。
    • Keywords:keyword 族类型,包括 keywordconstant_keywordwildcard
    • Numbers:数字类型,如 longdouble
    • Dates:日期类型,包括 datedate_nanos
    • alias:用于定义存在字段的别名。
  • 对象类型

    • object:JSON 对象
    • flattened:整个 JSON 对象作为单个字段值。
    • nested:保留其子字段之间关系的 JSON 对象。
    • join:为同一索引中的文档定义父/子关系。
  • 结构化数据类型

    • Range:范围类型,例如:long_rangedouble_rangedate_rangeip_range
    • ip:IPv4 和 IPv6 地址。
    • version:版本号。支持 Semantic Versioning 优先规则。
    • murmur3:计算并存储 hash 值。
  • 聚合数据类型

  • 文本搜索类型

  • 文档排名类型

    • dense_vector:记录浮点数的密集向量。
    • rank_feature:记录一个数字特征,为了在查询时提高命中率。
    • rank_features:记录多个数字特征,为了在查询时提高命中率。
  • 空间数据类型

  • 其他类型

元数据字段

一个文档中,不仅仅包含数据 ,也包含元数据。元数据是用于描述文档的信息。

  • 标识元数据字段
    • _index:文档所属的索引。
    • _id:文档的 ID。
  • 文档 source 元数据字段
    • _source:文档正文的原始 JSON。
    • _size_source 字段的大小(以字节为单位),由 mapper-size 插件提供。
  • 文档计数元数据字段
    • _doc_count:当文档表示预聚合数据时,用于存储文档计数的自定义字段。
  • 索引元数据字段
  • 路由元数据字段
    • _routing:将文档路由到特定分片的自定义路由值。
  • 其他元数据字段
    • _meta:应用程序特定的元数据。
    • _tier:文档所属索引的当前数据层首选项。

映射参数

Elasticsearch 提供了以下映射参数:

  • analyzer:指定在索引或搜索文本字段时用于文本分析的分析器。
  • coerce:如果开启,Elasticsearch 将尝试清理脏数据以适应字段的数据类型。
  • copy_to:允许将多个字段的值复制到一个组字段中,然后可以将其作为单个字段进行查询。
  • doc_values:默认情况下,所有字段都是被
  • dynamic:是否开启动态映射。
  • eager_global_ordinals:当在 global ordinals 的时候,refresh 以后下一次查询字典就需要重新构建,在追求查询的场景下很影响查询性能。可以使用 eager_global_ordinals,即在每次 refresh 以后即可更新字典,字典常驻内存,减少了查询的时候构建字典的耗时。
  • enabled:只能应用于顶级 mapping 定义和 object 字段。设置为 false 后,Elasticsearch 解析时,会完全跳过该字段。
  • fielddata:默认情况下, text 字段是可搜索的,但不可用于聚合、排序或脚本。如果为字段设置 fielddata=true,就会通过反转倒排索引将 fielddata 加载到内存中。请注意,这可能会占用大量内存。如果想对 text 字段进行聚合、排序或脚本操作,fielddata 是唯一方法。
  • fields:有时候,同一个字段需要以不同目的进行索引,此时可以通过 fields 进行配置。
  • format:用于格式化日期类型。
  • ignore_above:字符串长度大于 ignore_above 所设,则不会被索引或存储。
  • ignore_malformed:有时候,同一个字段,可能会存储不同的数据类型。默认情况下,Elasticsearch 解析字段数据类型失败时,会引发异常,并拒绝整个文档。 如果设置 ignore_malformedtrue,则允许忽略异常。这种情况下,格式错误的字段不会被索引,但文档中的其他字段可以正常处理。
  • index_options 用于控制将哪些信息添加到倒排索引以进行搜索和突出显示。只有 textkeyword 等基于术语(term)的字段类型支持此配置。
  • index_phrases:如果启用,两个词的组合(shingles)将被索引到一个单独的字段中。这允许以更大的索引为代价,更有效地运行精确的短语查询(无 slop)。请注意,当停用词未被删除时,此方法效果最佳,因为包含停用词的短语将不使用辅助字段,并将回退到标准短语查询。接受真或假(默认)。
  • index_prefixes:index_prefixes 参数启用 term 前缀索引以加快前缀搜索。
  • indexindex 选项控制字段值是否被索引。默认为 true。
  • meta:附加到字段的元数据。此元数据对 Elasticsearch 是不透明的,它仅适用于多个应用共享相同索引的元数据信息,例如:单位。
  • normalizerkeyword 字段的 normalizer 属性类似于 analyzer ,只是它保证分析链只产生单个标记。 normalizer 在索引 keyword 之前应用,以及在搜索时通过查询解析器(例如匹配查询)或通过术语级别查询(例如术语查询)搜索关键字字段时应用。
  • normsnorms 存储在查询时使用的各种规范化因子,以便计算文档的相关性评分。
  • null_value:null 值无法被索引和搜索。当一个字段被设为 null,则被视为没有值。null_value 允许将空值替换为指定值,以便对其进行索引和搜索。
  • position_increment_gap:分析的文本字段会考虑术语位置,以便能够支持邻近或短语查询。当索引具有多个值的文本字段时,值之间会添加一个“假”间隙,以防止大多数短语查询在值之间匹配。此间隙的大小使用 position_increment_gap 配置,默认为 100。
  • properties:类型映射、对象字段和嵌套字段包含的子字段,都称为属性。这些属性可以是任何数据类型,包括对象和嵌套。
  • search_analyzer:通常,在索引时和搜索时应使用相同的分析器,以确保查询中的术语与倒排索引中的术语格式相同。但是,有时在搜索时使用不同的分析器可能是有意义的,例如使用 edge_ngram 标记器实现自动补全或使用同义词搜索时。
  • similarity:Elasticsearch 允许为每个字段配置文本评分算法或相似度。相似度设置提供了一种选择文本相似度算法的简单方法,而不是默认的 BM25,例如布尔值。只有 textkeyword 等基于文本的字段类型支持此配置。
  • store:默认情况下,对字段值进行索引以使其可搜索,但不会存储它们。这意味着可以查询该字段,但无法检索原始字段值。通常这不重要,字段值已经是默认存储的 _source 字段的一部分。如果您只想检索单个字段或几个字段的值,而不是整个 _source,则可以通过 source filtering 来实现。
  • term_vector:term_vector 包含有关分析过程产生的术语的信息,包括:
    • 术语列表
    • 每个 term 的位置(或顺序)
    • 起始和结束字符偏移量,用于将 term 和原始字符串进行映射
    • 有效负载(如果可用) - 用户定义的,与 term 位置相关的二进制数据

映射配置

  • index.mapping.total_fields.limit:索引中的最大字段数。字段和对象映射以及字段别名计入此限制。默认值为 1000
  • index.mapping.depth.limit:字段的最大深度,以内部对象的数量来衡量。例如,如果所有字段都在根对象级别定义,则深度为 1。如果有一个对象映射,则深度为 2,以此类推。默认值为 20
  • index.mapping.nested_fields.limit:索引中不同 nested 映射的最大数量。 nested 类型只应在特殊情况下使用,即需要相互独立地查询对象数组。为了防止设计不佳的映射,此设置限制了每个索引的唯一 nested 类型的数量。默认值为 50
  • index.mapping.nested_objects.limit:单个文档中,所有 nested 类型中包含的最大嵌套 JSON 对象数。当文档包含太多 nested 对象时,此限制有助于防止出现内存溢出。默认值为 10000
  • index.mapping.field_name_length.limit:设置字段名称的最大长度。默认为 Long.MAX_VALUE(无限制)。

参考资料

Flink 运维

(1)使用 docker 命令拉取镜像

1
docker pull flink

(2)编写 docker-compose.yml,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
version: '2.1'
services:
jobmanager:
image: flink
expose:
- '6123'
ports:
- '8081:8081'
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager

taskmanager:
image: flink
expose:
- '6121'
- '6122'
depends_on:
- jobmanager
command: taskmanager
links:
- 'jobmanager:jobmanager'
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager

(3)执行 docker-compose,命令如下:

1
docker-compose up -d

(4)打开浏览器,访问 http://127.0.0.1:8081

基础配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# jobManager 的IP地址
jobmanager.rpc.address: localhost

# JobManager 的端口号
jobmanager.rpc.port: 6123

# JobManager JVM heap 内存大小
jobmanager.heap.size: 1024m

# TaskManager JVM heap 内存大小
taskmanager.heap.size: 1024m

# 每个 TaskManager 提供的任务 slots 数量大小
taskmanager.numberOfTaskSlots: 1

# 程序默认并行计算的个数
parallelism.default: 1
# 文件系统来源
# fs.default-scheme

高可用配置

1
2
3
4
5
6
7
8
9
10
11
# 可以选择 'NONE' 或者 'zookeeper'.
# high-availability: zookeeper

# 文件系统路径,让 Flink 在高可用性设置中持久保存元数据
# high-availability.storageDir: hdfs:///flink/ha/

# zookeeper 集群中仲裁者的机器 ip 和 port 端口号
# high-availability.zookeeper.quorum: localhost:2181

# 默认是 open,如果 zookeeper security 启用了该值会更改成 creator
# high-availability.zookeeper.client.acl: open

容错和 checkpoint 配置

1
2
3
4
5
6
7
8
9
10
11
# 用于存储和检查点状态
# state.backend: filesystem

# 存储检查点的数据文件和元数据的默认目录
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# savepoints 的默认目标目录(可选)
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# 用于启用/禁用增量 checkpoints 的标志
# state.backend.incremental: false

Web UI 配置

1
2
3
4
5
6
7
# 基于 Web 的运行时监视器侦听的地址.
#jobmanager.web.address: 0.0.0.0

# Web 的运行时监视器端口
rest.port: 8081
# 是否从基于 Web 的 jobmanager 启用作业提交
# jobmanager.web.submit.enable: false

高级配置

1
2
3
4
5
6
7
8
9
10
11
12
13
# io.tmp.dirs: /tmp

# 是否应在 TaskManager 启动时预先分配 TaskManager 管理的内存
# taskmanager.memory.preallocate: false

# 类加载解析顺序,是先检查用户代码 jar(“child-first”)还是应用程序类路径(“parent-first”)。 默认设置指示首先从用户代码 jar 加载类
# classloader.resolve-order: child-first

# 用于网络缓冲区的 JVM 内存的分数。 这决定了 TaskManager 可以同时拥有多少流数据交换通道以及通道缓冲的程度。 如果作业被拒绝或者您收到系统没有足够缓冲区的警告,请增加此值或下面的最小/最大值。 另请注意,“taskmanager.network.memory.min”和“taskmanager.network.memory.max”可能会覆盖此分数

# taskmanager.network.memory.fraction: 0.1
# taskmanager.network.memory.min: 67108864
# taskmanager.network.memory.max: 1073741824
1
2
3
4
5
6
7
8
9
10
11
# 指示是否从 Kerberos ticket 缓存中读取
# security.kerberos.login.use-ticket-cache: true

# 包含用户凭据的 Kerberos 密钥表文件的绝对路径
# security.kerberos.login.keytab: /path/to/kerberos/keytab

# 与 keytab 关联的 Kerberos 主体名称
# security.kerberos.login.principal: flink-user

# 以逗号分隔的登录上下文列表,用于提供 Kerberos 凭据(例如,`Client,KafkaClient`使用凭证进行 ZooKeeper 身份验证和 Kafka 身份验证)
# security.kerberos.login.contexts: Client,KafkaClient

Zookeeper 安全配置

1
2
3
4
5
# 覆盖以下配置以提供自定义 ZK 服务名称
# zookeeper.sasl.service-name: zookeeper

# 该配置必须匹配 "security.kerberos.login.contexts" 中的列表(含有一个)
# zookeeper.sasl.login-context-name: Client

参考资料