Dunwu Blog

大道至简,知易行难

《后端存储实战课》笔记

课前加餐丨电商系统是如何设计的?

创建和更新订单时,如何保证数据准确无误?

流量大、数据多的商品详情页系统该如何设计?

复杂而又重要的购物车系统,应该如何设计?

事务:账户余额总是对不上账,怎么办?

分布式事务:如何保证多个系统间的数据是一致的?

分布式事务常见解决方案:

  • 2PC
  • 3PC
  • TCC
  • Saga
  • 本地消息表

个人以前总结:分布式事务

如何用 Elasticsearch 构建商品搜索系统?

搜索领域的核心问题是进行全文匹配。一般的关系型数据库,如 Mysql 的索引(InnoDB 为 B 树索引)不适用于全文检索,导致查询时只能全表扫描,性能很差。

搜索引擎(典型代表:Elasticsearch)通过倒排索引技术,很好的支持了全文检索。但是,倒排索引的写入和更新性能相较于 B 树索引较差,因此不适用于更新频繁的数据。

MySQL HA:如何将“删库跑路”的损失降到最低?

Mysql 复制(略)

一个几乎每个系统必踩的坑儿:访问数据库超时

数据库超时分析经验:

  • 根据故障时段在系统忙时,推断出故障是跟支持用户访问的功能有关。
  • 根据系统能在流量峰值过后自动恢复这一现象,排除后台服务被大量请求打死的可能性。
  • 根据 CPU 利用率的变化曲线,如果满足一定的周期性波动,可推断出大概率和定时任务有关。这些定时任务负责刷新数据缓存。如果确实是因为刷新缓存定时任务导致的,需要针对性优化。
  • 如果 Mysql CPU 过高,大概率是慢 SQL 导致的,优先排查慢 SQL 日志,找出查询特别慢的表。看看该表是不是需要加缓存。

避免访问数据库超时的注意点:

  • 开发时,考虑 SQL 相关表的数据规模,查询性能,是否匹配索引等等,避免出现慢 SQL
  • 设计上,考虑减少查询次数,如使用缓存
  • 系统支持自动杀慢 SQL
  • 支持熔断、降级,减少故障影响范围

怎么能避免写出慢 SQL?

数据表不宜过大,一般不要超过千万条数据。

根据实际情况,尽量设计好索引,以提高查询、排序效率。

如果出现慢 SQL,需要改造索引时,可以通过执行计划进行分析。

走进黑盒:SQL 是如何在数据库中执行的?

MySQL 如何应对高并发(一):使用缓存保护 MySQL

MySQL 如何应对高并发(二):读写分离

img

MySQL 主从数据库同步是如何实现的?

基于 binlog 进行数据同步

订单数据越来越多,数据库越来越慢该怎么办?

针对大表,为了优化其查询性能,可以将历史数据归档。一般可以考虑归档到列式数据库,如:Hive

MySQL 存储海量数据的最后一招:分库分表

分库分表

用 Redis 构建缓存集群的最佳实践有哪些

Redis 3.0 后,官方提供 Redis Cluster 来解决数据量大、高可用和高并发问题。

相关文章:Redis 集群

大厂都是怎么做 MySQL to Redis 同步的?

缓存穿透:把全量数据都放在 Redis 集群,服务通过接受 MQ 消息,去触发更新缓存数据。

使用 Binlog 实时更新 Redis 缓存,如 Canal

分布式存储:你知道对象存储是如何保存图片文件的吗?

保存图片、音频、视频这种相对较大的文件,一般使用对象存储。如:HDFS 等。

元数据管理:ZooKeeper、etcd、Nacos

对象如何拆分和保存:将大文件分块(block),提升 IO 效率并方便维护。

跨系统实时同步数据,分布式事务是唯一的解决方案吗?

跨系统实时同步数据:

  • 早期方案:使用 ETL 定时同步数据,在 T+1 时刻去同步上一周期的数据,然后进行计算和分析。
  • 使用 Binlog 和 MQ 构建实时数据同步系统

如何保证数据同步的实时性

  • 为了能够支撑众多下游数据库实时同步的需求,可以通过 MQ 解耦上下游,Binlog 先发送到 MQ 中,下游各业务方可以消费 MQ 中的消息再写入各自的数据库。
  • 如果下游处理能力不能满足要求,可以增加 MQ 中的分区数量实现并发同步,但需要结合同步的业务数据特点,把具有因果关系的数据哈希到相同分区上,才能避免因为并发乱序而出现数据同步错误的问题。

如何在不停机的情况下,安全地更换数据库?

  • 停机迁移/扩容
    • 优点:简单粗暴;没有数据一致性问题
    • 缺点:需要停机
  • 双写迁移
    • 优点:不需要停机
    • 缺点:方案较复杂
  • 主从升级
    • 优点:不需要停机;无需数据迁移
    • 缺点:需要冗余的从库

类似“点击流”这样的海量数据应该如何存储?

使用 Kafka 暂存海量原始数据,然后再使用大数据计算框架(Spark、Flink)进行计算。

其他方案:

分布式流数据存储,如:Pravega、Pulsar 的存储引擎 BookKeeper

时序数据库,如:InfluxDB、OpenTSDB 等。

面对海量数据,如何才能查得更快

实时计算:Flink、Storm

批处理计算:Map-Reduce、Spark

海量数据存储:

  • 列式数据库(在正确使用的前提下,10GB 量级的数据查询基本上可以做到秒级返回):HBase、Cassandra
  • 搜索引擎(对于 TB 量级以下的数据,如果可以接受相对比较贵的硬件成本):Elasticsearch

MySQL 经常遇到的高可用、分片问题,NewSQL 是如何解决的?

安利 CockroachDB、RocksDB、OceanBase

RocksDB:不丢数据的高性能 KV 存储、

越来越多的新生代数据库,都选择 RocksDB 作为它们的存储引擎。

Redis 是一个内存数据库,所以它很快。

RocksDB 是一个持久化的 KV 存储,它需要保证每条数据都要安全地写到磁盘上。磁盘的读写性能和内
存读写性能差着一两个数量级,读写磁盘的 RocksDB,能和读写内存的 Redis 做到相近的性能,这就是 RocksDB 的价值所在了。

RocksDB 性能好,是由于使用了 LSM 树结构。

LSM-Tree 的全称是:The Log-Structured Merge-Tree,是一种非常复杂的复合数据结构,它包含了 WAL(Write Ahead Log)、跳表(SkipList)和一个分层的有序表(SSTable,Sorted String Table)。

参考资料

  • 后端存储实战课 - 极客教程【入门】:讲解存储在电商领域的种种应用和一些基本特性

数据结构与数据库索引

关键词:链表、数组、散列表、红黑树、B+ 树、LSM 树、跳表

引言

数据库是“按照 数据结构 来组织、存储和管理数据的仓库”。是一个长期存储在计算机内的、有组织的、可共享的、统一管理的大量数据的集合。

——上面这句定义对数据库的定义来自百度百科。通过这个定义,我们也能明显看出数据结构是实现数据库的基石。

从本质来看,数据库只负责两件事:读数据、写数据;而数据结构研究的是如何合理组织数据,尽可能提升读、写数据的效率,这恰好是数据库的核心问题。因此,数据结构与数据库这两个领域有非常多的交集。其中,数据库索引最能体现二者的紧密关联。

索引是数据库为了提高查找效率的一种数据结构。索引基于原始数据衍生而来,它的主要作用是缩小检索的数据范围,提升查询性能。通俗来说,索引在数据库中的作用就像是一本书的目录索引。索引对于良好的性能非常关键,在数据量小且负载较低时,不恰当的索引对于性能的影响可能还不明显;但随着数据量逐渐增大,性能则会急剧下降。因此,索引优化应该是查询性能优化的最有效手段

很多数据库允许单独添加和删除索引,而不影响数据库的内容,它只会影响查询性能。维护额外的结构势必会引入开销,特别是在新数据写入时。对于写入,它很难超过简单地追加文件方式的性能,因为那已经是最简单的写操作了。由于每次写数据时,需要更新索引,因此任何类型的索引通常都会降低写的速度。

本文以一些常见的数据库为例,分析它们的索引采用了什么样的数据结构,有什么利弊,为何如此设计。

数组和链表

数组和链表分别代表了连续空间和不连续空间的存储方式,它们是线性表(Linear List)的典型代表。其他所有的数据结构,比如栈、队列、二叉树、B+ 树等,实际上都是这两者的结合和变化。

数组用连续的内存空间来存储数据。数组**支持随机访问,根据下标随机访问的时间复杂度为 O(1)**。但这并不代表数组的查找时间复杂度也是 O(1)

  • **对于无序数组,只能顺序查找,其时间复杂度为 O(n)**。
  • **对于有序数组,可以应用二分查找法,其时间复杂度为 O(log n)**。

在有序数组上应用二分查找法如此高效,为什么几乎没有数据库直接使用数组作为索引?这是因为它的限制条件:数据有序——为了保证数据有序,每次添加、删除数组数据时,都必须要进行数据调整,来保证其有序,而 **数组的插入/删除操作,时间复杂度为 O(n)**。此外,由于数组空间大小固定,每次扩容只能采用复制数组的方式。数组的这些特性,决定了它不适合用于数据频繁变化的应用场景。

img

链表用不连续的内存空间来存储数据;并通过一个指针按顺序将这些空间串起来,形成一条链

区别于数组,链表中的元素不是存储在内存中连续的一片区域,链表中的数据存储在每一个称之为“结点”复合区域里,在每一个结点除了存储数据以外,还保存了到下一个节点的指针(Pointer)。由于不必按顺序存储,**链表的插入/删除操作,时间复杂度为 O(1)**,但是,链表只支持顺序访问,其 **查找时间复杂度为 O(n)**。其低效的查找方式,决定了链表不适合作为索引。

img

哈希索引

哈希表是一种以键 - 值(key-value)对形式存储数据的结构,我们只要输入待查找的值即 key,就可以找到其对应的值即 Value。

哈希表 使用 哈希函数 组织数据,以支持快速插入和搜索的数据结构。哈希表的本质是一个数组,其思路是:使用 Hash 函数将 Key 转换为数组下标,利用数组的随机访问特性,使得我们能在 O(1) 的时间代价内完成检索。

img

有两种不同类型的哈希表:哈希集合哈希映射

  • 哈希集合 是集合数据结构的实现之一,用于存储非重复值。
  • 哈希映射 是映射 数据结构的实现之一,用于存储键值对。

哈希索引基于哈希表实现,只适用于等值查询。对于每一行数据,哈希索引都会将所有的索引列计算一个哈希码(hashcode),哈希码是一个较小的值。哈希索引将所有的哈希码存储在索引中,同时在哈希表中保存指向每个数据行的指针。

✔️️️ 哈希索引的优点

  • 因为索引数据结构紧凑,所以查询速度非常快

❌ 哈希索引的缺点

  • 哈希索引值包含哈希值和行指针,而不存储字段值,所以不能使用索引中的值来避免读取行。不过,访问内存中的行的速度很快,所以大部分情况下这一点对性能影响不大。
  • 哈希索引数据不是按照索引值顺序存储的,所以无法用于排序
  • 哈希索引不支持部分索引匹配查找,因为哈希索引时使用索引列的全部内容来进行哈希计算的。如,在数据列 (A,B) 上建立哈希索引,如果查询只有数据列 A,无法使用该索引。
  • 哈希索引只支持等值比较查询,包括 =IN()<=>;不支持任何范围查询,如 WHERE price > 100
  • 哈希索引有可能出现哈希冲突
    • 出现哈希冲突时,必须遍历链表中所有的行指针,逐行比较,直到找到符合条件的行。
    • 如果哈希冲突多的话,维护索引的代价会很高。

因为种种限制,所以哈希索引只适用于特定的场合。而一旦使用哈希索引,则它带来的性能提升会非常显著。例如,Mysql 中的 Memory 存储引擎就显示的支持哈希索引。

B-Tree 索引

通常我们所说的 B 树索引是指 B-Tree 索引,它是目前关系型数据库中查找数据最为常用和有效的索引,大多数存储引擎都支持这种索引。使用 B-Tree 这个术语,是因为 MySQL 在 CREATE TABLE 或其它语句中使用了这个关键字,但实际上不同的存储引擎可能使用不同的数据结构,比如 InnoDB 使用的是 B+Tree索引;而 MyISAM 使用的是 B-Tree索引。

B-Tree 索引中的 B 是指 balance,意为平衡。需要注意的是,B-Tree 索引并不能找到一个给定键值的具体行,它找到的只是被查找数据行所在的页,接着数据库会把页读入到内存,再在内存中进行查找,最后得到要查找的数据。

二叉搜索树

二叉搜索树的特点是:每个节点的左儿子小于父节点,父节点又小于右儿子。其查询时间复杂度是 O(log n)

当然为了维持 O(log n) 的查询复杂度,你就需要保持这棵树是平衡二叉树。为了做这个保证,更新的时间复杂度也是 O(log n)

随着数据库中数据的增加,索引本身大小随之增加,不可能全部存储在内存中,因此索引往往以索引文件的形式存储的磁盘上。这样的话,索引查找过程中就要产生磁盘 I/O 消耗,相对于内存存取,I/O 存取的消耗要高几个数量级。可以想象一下一棵几百万节点的二叉树的深度是多少?如果将这么大深度的一颗二叉树放磁盘上,每读取一个节点,需要一次磁盘的 I/O 读取,整个查找的耗时显然是不能够接受的。那么如何减少查找过程中的 I/O 存取次数?

一种行之有效的解决方法是减少树的深度,将二叉树变为 N 叉树(多路搜索树),而 B+ 树就是一种多路搜索树

B+Tree 索引

B+ 树索引适用于全键值查找键值范围查找键前缀查找,其中键前缀查找只适用于最左前缀查找。

理解 B+Tree,只需要理解其最重要的两个特征即可:

  • 第一,所有的关键字(可以理解为数据)都存储在叶子节点,非叶子节点并不存储真正的数据,所有记录节点都是按键值大小顺序存放在同一层叶子节点上。
  • 其次,所有的叶子节点由指针连接。如下图为简化了的B+Tree

img

根据叶子节点的内容,索引类型分为主键索引和非主键索引。

  • 聚簇索引(clustered):又称为主键索引,其叶子节点存的是整行数据。因为无法同时把数据行存放在两个不同的地方,所以一个表只能有一个聚簇索引InnoDB 的聚簇索引实际是在同一个结构中保存了 B 树的索引和数据行
  • 非主键索引的叶子节点内容是主键的值。在 InnoDB 里,非主键索引也被称为二级索引(secondary)。数据存储在一个位置,索引存储在另一个位置,索引中包含指向数据存储位置的指针。可以有多个,小于 249 个。

聚簇表示数据行和相邻的键值紧凑地存储在一起,因为数据紧凑,所以访问快。因为无法同时把数据行存放在两个不同的地方,所以一个表只能有一个聚簇索引

聚簇索引和非聚簇索引的查询有什么区别

  • 如果语句是 select * from T where ID=500,即聚簇索引查询方式,则只需要搜索 ID 这棵 B+ 树;
  • 如果语句是 select * from T where k=5,即非聚簇索引查询方式,则需要先搜索 k 索引树,得到 ID 的值为 500,再到 ID 索引树搜索一次。这个过程称为回表

也就是说,基于非聚簇索引的查询需要多扫描一棵索引树。因此,我们在应用中应该尽量使用主键查询。

显然,主键长度越小,非聚簇索引的叶子节点就越小,非聚簇索引占用的空间也就越小。

自增主键是指自增列上定义的主键,在建表语句中一般是这么定义的: NOT NULL PRIMARY KEY AUTO_INCREMENT。从性能和存储空间方面考量,自增主键往往是更合理的选择。有没有什么场景适合用业务字段直接做主键的呢?还是有的。比如,有些业务的场景需求是这样的:

  • 只有一个索引;
  • 该索引必须是唯一索引。

由于没有其他索引,所以也就不用考虑其他索引的叶子节点大小的问题。这时候我们就要优先考虑上一段提到的“尽量使用主键查询”原则,直接将这个索引设置为主键,可以避免每次查询需要搜索两棵树。


内存是半导体元件。对于内存而言,只要给出了内存地址,我们就可以直接访问该地址取出数据。这个过程具有高效的随机访问特性,因此内存也叫随机访问存储器(Random Access Memory,即 RAM)。内存的访问速度很快,但是价格相对较昂贵,因此一般的计算机内存空间都相对较小。

而磁盘是机械器件。磁盘访问数据时,需要等磁盘盘片旋转到磁头下,才能读取相应的数据。尽管磁盘的旋转速度很快,但是和内存的随机访问相比,性能差距非常大。一般来说,如果是随机读写,会有 10 万到 100 万倍左右的差距。但如果是顺序访问大批量数据的话,磁盘的性能和内存就是一个数量级的。

磁盘的最小读写单位是扇区,较早期的磁盘一个扇区是 512 字节。随着磁盘技术的发展,目前常见的磁盘扇区是 4K 个字节。操作系统一次会读写多个扇区,所以操作系统的最小读写单位是块(Block),也叫作簇(Cluster)。当我们要从磁盘中读取一个数据时,操作系统会一次性将整个块都读出来。因此,对于大批量的顺序读写来说,磁盘的效率会比随机读写高许多。

假设有一个有序数组存储在硬盘中,如果它足够大,那么它会存储在多个块中。当我们要对这个数组使用二分查找时,需要先找到中间元素所在的块,将这个块从磁盘中读到内存里,然后在内存中进行二分查找。如果下一步要读的元素在其他块中,则需要再将相应块从磁盘中读入内存。直到查询结束,这个过程可能会多次访问磁盘。我们可以看到,这样的检索性能非常低。

由于磁盘相对于内存而言访问速度实在太慢,因此,对于磁盘上数据的高效检索,我们有一个极其重要的原则:对磁盘的访问次数要尽可能的少!

将索引和数据分离就是一种常见的设计思路。在数据频繁变化的场景中,有序数组并不是一个最好的选择,二叉检索树或者哈希表往往更有普适性。但是,哈希表由于缺乏范围检索的能力,在一些场合也不适用。因此,二叉检索树这种树形结构是许多常见检索系统的实施方案。

随着索引数据越来越大,直到无法完全加载到内存中,这是需要将索引数据也存入磁盘中。B+ 树给出了将树形索引的所有节点都存在磁盘上的高效检索方案。操作系统对磁盘数据的访问是以块为单位的。因此,如果我们想将树型索引的一个节点从磁盘中读出,即使该节点的数据量很小(比如说只有几个字节),但磁盘依然会将整个块的数据全部读出来,而不是只读这一小部分数据,这会让有效读取效率很低。B+ 树的一个关键设计,就是让一个节点的大小等于一个块的大小。节点内存储的数据,不是一个元素,而是一个可以装 m 个元素的有序数组。这样一来,我们就可以将磁盘一次读取的数据全部利用起来,使得读取效率最大化。

B+ 树还有另一个设计,就是将所有的节点分为内部节点和叶子节点。内部节点仅存储 key 和维持树形结构的指针,并不存储 key 对应的数据(无论是具体数据还是文件位置信息)。这样内部节点就能存储更多的索引数据,我们也就可以使用最少的内部节点,将所有数据组织起来了。而叶子节点仅存储 key 和对应数据,不存储维持树形结构的指针。通过这样的设计,B+ 树就能做到节点的空间利用率最大化。此外,B+ 树还将同一层的所有节点串成了有序的双向链表,这样一来,B+ 树就同时具备了良好的范围查询能力和灵活调整的能力了。

因此,B+ 树是一棵完全平衡的 m 阶多叉树。所谓的 m 阶,指的是每个节点最多有 m 个子节点,并且每个节点里都存了一个紧凑的可包含 m 个元素的数组。

即使是复杂的 B+ 树,我们将它拆解开来,其实也是由简单的数组、链表和树组成的,而且 B+ 树的检索过程其实也是二分查找。因此,如果 B+ 树完全加载在内存中的话,它的检索效率其实并不会比有序数组或者二叉检索树更
高,也还是二分查找的 log(n) 的效率。并且,它还比数组和二叉检索树更加复杂,还会带来额外的开销。

另外,这一节还有一个很重要的设计思想需要你掌握,那就是将索引和数据分离。通过这样的方式,我们能将索引的数组大小保持在一个较小的范围内,让它能加载在内存中。在许多大规模系统中,都是使用这个设计思想来精简索引的。而且,B+ 树的内部节点和叶子节点的区分,其实也是索引和数据分离的一次实践。

MySQL 中的 B+ 树实现其实有两种,一种是 MyISAM 引擎,另一种是 InnoDB 引擎。它们的核心区别就在于,数据和索引是否是分离的。

在 MyISAM 引擎中,B+ 树的叶子节点仅存储了数据的位置指针,这是一种索引和数据分离的设计方案,叫作非聚集索引。如果要保证 MyISAM 的数据一致性,那我们需要在表级别上进行加锁处理。

在 InnoDB 中,B+ 树的叶子节点直接存储了具体数据,这是一种索引和数据一体的方案。叫作聚集索引。由于数据直接就存在索引的叶子节点中,因此 InnoDB 不需要给全表加锁来保证一致性,它只需要支持行级的锁就可以了。

LSM 树

B+ 树的数据都存储在叶子节点中,而叶子节点一般都存储在磁盘中。因此,每次插入的新数据都需要随机写入磁盘,而随机写入的性能非常慢。如果是一个日志系统,每秒钟要写入上千条甚至上万条数据,这样的磁盘操作代价会使得系统性能急剧下降,甚至无法使用。

操作系统对磁盘的读写是以块为单位的,我们能否以块为单位写入,而不是每次插入一个数据都要随机写入磁盘呢?这样是不是就可以大幅度减少写入操作了呢?解决方案就是:LSM 树(Log Structured Merge Trees)。

LSM 树就是根据这个思路设计了这样一个机制:当数据写入时,延迟写磁盘,将数据先存放在内存中的树里,进行常规的存储和查询。当内存中的树持续变大达到阈值时,再批量地以块为单位写入磁盘的树中。因此,LSM 树至少需要由两棵树组成,一棵是存储在内存中较小的 C0 树,另一棵是存储在磁盘中较大的 C1 树。

LSM 树具有以下 3 个特点:

  1. 将索引分为内存和磁盘两部分,并在内存达到阈值时启动树合并(Merge Trees);
  2. 用批量写入代替随机写入,并且用预写日志 WAL 技术(Write AheadLog,预写日志技术)保证内存数据,在系统崩溃后可以被恢复;
  3. 数据采取类似日志追加写的方式写入(Log Structured)磁盘,以顺序写的方式提高写
    入效率。

LSM 树的这些特点,使得它相对于 B+ 树,在写入性能上有大幅提升。所以,许多 NoSQL 系统都使用 LSM 树作为检索引擎,而且还对 LSM 树进行了优化以提升检索性能。

倒排索引

倒排索引的核心其实并不复杂,它的具体实现其实是哈希表,只是它不是将文档 ID 或者题目作为 key,而是反过来,通过将内容或者属性作为 key 来存储对应的文档列表,使得我们能在 O(1) 的时间代价内完成查询。

尽管原理并不复杂,但是倒排索引是许多检索引擎的核心。比如说,数据库的全文索引功能、搜索引擎的索引、广告引擎和推荐引擎,都使用了倒排索引技术来实现检索功能。

索引的维护

创建索引

  • 数据压缩:一个是尽可能地将数据加载到内存中,因为内存的检索效率大大高于磁盘。那为了将数据更多地加载到内存中,索引压缩是一个重要的研究方向。
  • 分支处理:另一个是将大数据集合拆成多个小数据集合来处理。这其实就是分布式系统的核心思想。

更新索引

(1)Double Buffer(双缓冲)机制

就是在内存中同时保存两份一样的索引,一个是索引 A,一个是索引 B。两个索引保持一个读、一个写,并且来回切换,最终完成高性能的索引更新。

优点:简单高效

缺点:达到一定数据量级后,会带来翻倍的内存开销,甚至有些索引存储在磁盘上的情况下,更是无法使用此机制。

(2)全量索引和增量索引

将新接收到的数据单独建立一个可以存在内存中的倒排索引,也就是增量索引。当查询发生的时候,我们会同时查询全量索引和增量索引,将合并的结果作为总的结果输出。

因为增量索引相对全量索引而言会小很多,内存资源消耗在可承受范围,所以我们可以使用 Double Buffer 机制
对增量索引进行索引更新。这样一来,增量索引就可以做到无锁访问。而全量索引本身就是只读的,也不需要加锁。因此,整个检索过程都可以做到无锁访问,也就提高了系统的检索效率。

参考资料

复杂度分析

为什么需要复杂度分析

衡量算法的优劣,有两种评估方式:事前估计和后期测试。

后期测试有性能测试、基准测试(Benchmark)等手段。

但是,后期测试有以下限制:

  • 测试结果非常依赖测试环境。如:不同机型、不同编译器版本、不同硬件配置等等,都会影响测试结果。
  • 测试结果受数据规模的影响很大

所以,需要一种方法,可以不受环境或数据规模的影响,粗略地估计算法的执行效率。这种方法就是复杂度分析。

时间复杂度分析

大 O 表示法

假设问题的规模为 n,则程序的时间复杂度表示为 T(n)代码的执行时间 T(n) 与每行代码的执行次数 n 成正比

当 n 增大时,T(n) 也随之增大,想要准确估计其变化比较困难。所以,可以采用大 O 时间复杂度来粗略估计其复杂度,其表达式为:**T(n) = O(f(n))**。

大 O 表示法实际上并不具体表示代码真正的执行时间,而是表示代码执行时间随数据规模增长的变化趋势,所以,也叫作渐进时间复杂度(asymptotic time complexity),简称时间复杂度

时间复杂度分析的要点

  • 只关注循环执行次数最多的一段代码
  • 加法法则:总复杂度等于量级最大的那段代码的复杂度
  • 乘法法则:嵌套代码的复杂度等于嵌套内外代码复杂度的乘积

最好、最坏和平均情况

  • 最好情况时间复杂度(best case time complexity):在最理想的情况下,执行代码的时间复杂度。例如:在最理想的情况下,要查找的变量 x 正好是数组的第一个元素,此时最好情况时间复杂度为 1。
  • 最坏情况时间复杂度(worst case time complexity):在最糟糕的情况下,执行代码的时间复杂度。例如:在最理想的情况下,要查找的变量 x 正好是数组的最后个元素,此时最好情况时间复杂度为 n。
  • 平均情况时间复杂度(average case time complexity):平均时间复杂度的全称应该叫加权平均时间复杂度或者期望时间复杂度

时间复杂度分析示例

【示例】从 1 累加到 100 的时间复杂度是多少?

1
2
3
4
5
int sum = 0;
int N = 100;
for (int i = 1; i <= N; i++) {
sum = sum + i;
}

时间复杂度计算:显然,这段代码执行了 100 次加法,其时间复杂度和 N 的大小完全一致

1
T(n) = O(n)

【示例】嵌套循环的时间复杂度是多少?

1
2
3
4
5
6
7
int M = 10;
int N = 20;
for (int i = 1; i < M; i++) {
for (int j = 1; j < N; j++) {
System.out.println("i = " + i + ", j = " + j);
}
}

时间复杂度计算:

1
T(n) = (M-1)(N-1) = O(M*N) ≈ O(N^2)

【示例】递归函数的时间复杂度是多少?思考一下斐波那契数列 f(n) = f(n-1) + f(n-2) 的时间复杂度是多少?

img

1
T(n) = O(2^N)

空间复杂度分析

时间复杂度的全称是渐进时间复杂度表示算法的执行时间与数据规模之间的增长关系

类比一下,空间复杂度全称就是渐进空间复杂度(asymptotic space complexity),表示算法的存储空间与数据规模之间的增长关系

复杂度量级

复杂度有以下量级:

  • **O(1)**:常数复杂度
  • **O(log n)**:对数复杂度
  • **O(n)**:线性复杂度
  • **O(nlog n)**:线性对数阶复杂度
  • **O(n^2)**:平方复杂度
  • **O(n^3)**:立方复杂度
  • **O(n^k)**:K 次方复杂度
  • **O(2^n)**:指数复杂度
  • **O(n!)**:阶乘复杂度

在数据量比较小的时候,复杂度量级差异并不明显;但是,随着数据规模大小的变化,差异会逐渐突出。

img

O(1) 复杂度示例:

1
2
int num = 100;
System.out.println("num = " + num);

O(log n) 对数复杂度示例:

1
2
3
4
int max = 100;
for (int i = 1; i < max; i = i * 2) {
System.out.println("i = " + i);
}

O(n) 复杂度示例:

1
2
3
4
int max = 100;
for (int i = 1; i < max; i++) {
System.out.println("i = " + i);
}

O(n^2) 复杂度示例:

1
2
3
4
5
6
7
int M = 10;
int N = 20;
for (int i = 1; i < M; i++) {
for (int j = 1; j < N; j++) {
System.out.println("i = " + i + ", j = " + j);
}
}

O(k^n) 复杂度示例:

1
2
3
4
int max = 10;
for (int i = 1; i <= Math.pow(2, max); i++) {
System.out.println("i = " + i);
}

常见数据结构的复杂度

img

参考资料

Flink Table API & SQL

Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。Table API 是用于 Scala 和 Java 语言的查询 API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。无论输入是连续的(流式)还是有界的(批处理),在两个接口中指定的查询都具有相同的语义,并指定相同的结果。

Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream API。你可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先用 CEP 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 Gelly 图算法 来处理已经预处理好的数据。

jar 依赖

必要依赖:

1
2
3
4
5
6
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.14.4</version>
<scope>provided</scope>
</dependency>

除此之外,如果你想在 IDE 本地运行你的程序,你需要添加下面的模块,具体用哪个取决于你使用哪个 Planner:

1
2
3
4
5
6
7
8
9
10
11
12
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.14.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.14.4</version>
<scope>provided</scope>
</dependency>

如果你想实现自定义格式或连接器 用于(反)序列化行或一组用户定义的函数,下面的依赖就足够了,编译出来的 jar 文件可以直接给 SQL Client 使用:

1
2
3
4
5
6
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.14.4</version>
<scope>provided</scope>
</dependency>

概念与通用 API

Table API 和 SQL 集成在同一套 API 中。 这套 API 的核心概念是Table,用作查询的输入和输出。

Table API 和 SQL 程序的结构

所有用于批处理和流处理的 Table API 和 SQL 程序都遵循相同的模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.apache.flink.table.api.*;
import org.apache.flink.connector.datagen.table.DataGenOptions;

// Create a TableEnvironment for batch or streaming execution.
// See the "Create a TableEnvironment" section for details.
TableEnvironment tableEnv = TableEnvironment.create(/*…*/);

// Create a source table
tableEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build())
.option(DataGenOptions.ROWS_PER_SECOND, 100)
.build())

// Create a sink table (using SQL DDL)
tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable");

// Create a Table object from a Table API query
Table table2 = tableEnv.from("SourceTable");

// Create a Table object from a SQL query
Table table3 = tableEnv.sqlQuery("SELECT * FROM SourceTable");

// Emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = table2.executeInsert("SinkTable");

创建 TableEnvironment

TableEnvironment 是 Table API 和 SQL 的核心概念。它负责:

  • 在内部的 catalog 中注册 Table
  • 注册外部的 catalog
  • 加载可插拔模块
  • 执行 SQL 查询
  • 注册自定义函数 (scalar、table 或 aggregation)
  • DataStreamTable 之间的转换(面向 StreamTableEnvironment )

在 Catalog 中创建表

TableEnvironment 维护着一个由标识符(identifier)创建的表 catalog 的映射。标识符由三个部分组成:catalog 名称、数据库名称以及对象名称。

Table 可以是虚拟的(视图 VIEWS)也可以是常规的(表 TABLES)。视图 VIEWS可以从已经存在的Table中创建,一般是 Table API 或者 SQL 的查询结果。 表TABLES描述的是外部数据,例如文件、数据库表或者消息队列。

查询表

Table API 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// scan registered Orders table
Table orders = tableEnv.from("Orders");
// compute revenue for all customers from France
Table revenue = orders
.filter($("cCountry").isEqual("FRANCE"))
.groupBy($("cID"), $("cName"))
.select($("cID"), $("cName"), $("revenue").sum().as("revSum"));

// emit or convert Table
// execute query

SQL 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);

// emit or convert Table
// execute query

输出表

Table 通过写入 TableSink 输出。TableSink 是一个通用接口,用于支持多种文件格式(如 CSV、Apache Parquet、Apache Avro)、存储系统(如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息队列系统(如 Apache Kafka、RabbitMQ)。

批处理 Table 只能写入 BatchTableSink,而流处理 Table 需要指定写入 AppendStreamTableSinkRetractStreamTableSink 或者 UpsertStreamTableSink

数据类型

通用类型与(嵌套的)复合类型 (如:POJO、tuples、rows、Scala case 类) 都可以作为行的字段。

复合类型的字段任意的嵌套可被 值访问函数 访问。

通用类型将会被视为一个黑箱,且可以被 用户自定义函数 传递或引用。

SQL

Flink 支持以下语句:

参考资料

LSM 树

什么是 LSM 树

LSM 树具有以下 3 个特点:

  1. 将索引分为内存和磁盘两部分,并在内存达到阈值时启动树合并(Merge Trees);
  2. 用批量写入代替随机写入,并且用预写日志 WAL 技术(Write AheadLog,预写日志技术)保证内存数据,在系统崩溃后可以被恢复;
  3. 数据采取类似日志追加写的方式写入(Log Structured)磁盘,以顺序写的方式提高写
    入效率。

LSM 树的这些特点,使得它相对于 B+ 树,在写入性能上有大幅提升。所以,许多 NoSQL 系统都使用 LSM 树作为检索引擎,而且还对 LSM 树进行了优化以提升检索性能。

LSM 树就是根据这个思路设计了这样一个机制:当数据写入时,延迟写磁盘,将数据先存放在内存中的树里,进行常规的存储和查询。当内存中的树持续变大达到阈值时,再批量地以块为单位写入磁盘的树中。因此,LSM 树至少需要由两棵树组成,一棵是存储在内存中较小的 C0 树,另一棵是存储在磁盘中较大的 C1 树。

如何将内存数据与磁盘数据合并

可以参考两个有序链表归并排序的过程,将 C0 树和 C1 树的所有叶子节点中存储的数据,看作是两个有序链表,那滚动合并问题就变成了我们熟悉的两个有序链表的归并问题。不过由于涉及磁盘操作,那为了提高写入效率和检索效率,我们还需要针对磁盘的特性,在一些归并细节上进行优化。

img

由于磁盘具有顺序读写效率高的特性,因此,为了提高 C1 树中节点的读写性能,除了根节点以外的节点都要尽可能地存放到连续的块中,让它们能作为一个整体单位来读写。这种包含多个节点的块就叫作多页块(Multi-Pages Block)。

第一步,以多页块为单位,将 C1 树的当前叶子节点从前往后读入内存。读入内存的多页块,叫作清空块(Emptying Block),意思是处理完以后会被清空。

第二步,将 C0 树的叶子节点和清空块中的数据进行归并排序,把归并的结果写入内存的一个新块中,叫作填充块(Filling Block)。

第三步,如果填充块写满了,我们就要将填充块作为新的叶节点集合顺序写入磁盘。这个时候,如果 C0 树的叶子节点和清空块都没有遍历完,我们就继续遍历归并,将数据写入新的填充块。如果清空块遍历完了,我们就去 C1 树中顺序读取新的多页块,加载到清空块中。

第四步,重复第三步,直到遍历完 C0 树和 C1 树的所有叶子节点,并将所有的归并结果写入到磁盘。这个时候,我们就可以同时删除 C0 树和 C1 树中被处理过的叶子节点。这样就完成了滚动归并的过程。

img

LSM 树是如何检索

因为同时存在 C0 和 C1 树,所以要查询一个 key 时,我们会先到 C0 树中查询。如果查询到了则直接返回;如过没有查询到,则查询 C1 树。

需要注意一种特殊情况:删除操作。假设某数据在 C0 树中被删除了,但是在 C1 树中仍存在。这此时查询时,可以在 C1 树中查到这个 key,这其实是过期数据了,如何应对这种情况呢?对于被删除的数据,可以将这些数据的 key 插入到 C0 树中,并标记一个删除标志。如果查到了一个带着删除标志的 key,就直接返回查询失败。

为什么需要 LSM 树

在关系型数据库中,通常使用 B+ 树作为索引。B+ 树的数据都存储在叶子节点中,而叶子节点一般都存储在磁盘中。因此,每次插入的新数据都需要随机写入磁盘,而随机写入的性能非常慢。如果是一个日志系统,每秒钟要写入上千条甚至上万条数据,这样的磁盘操作代价会使得系统性能急剧下降,甚至无法使用。

操作系统对磁盘的读写是以块为单位的,我们能否以块为单位写入,而不是每次插入一个数据都要随机写入磁盘呢?这样是不是就可以大幅度减少写入操作了呢?解决方案就是:LSM 树(Log Structured Merge Trees)。

WAL 技术

LSM 树至少需要由两棵树组成,一棵是存储在内存中较小的 C0 树,另一棵是存储在磁盘中较大的 C1 树。

如果机器断电或系统崩溃了,那内存中还未写入磁盘的数据岂不就永远丢失了?这种情况我们该如何解决呢?

为了保证内存中的数据在系统崩溃后能恢复,可以使用 WAL 技术(Write Ahead Log,预写日志技术)将数据第一时间高效写入磁盘进行备份。

WAL 技术保存和恢复数据的具体步骤如下:

  1. 内存中的程序在处理数据时,会先将对数据的修改作为一条记录,顺序写入磁盘的 log 文件作为备份。由于磁盘文件的顺序追加写入效率很高,因此许多应用场景都可以接受这种备份处理。
  2. 在数据写入 log 文件后,备份就成功了。接下来,该数据就可以长期驻留在内存中了。
  3. 系统会周期性地检查内存中的数据是否都被处理完了(比如,被删除或者写入磁盘),并且生成对应的检查点(Check Point)记录在磁盘中。然后,我们就可以随时删除被处理完的数据了。这样一来,log 文件就不会无限增长了。
  4. 系统崩溃重启,我们只需要从磁盘中读取检查点,就能知道最后一次成功处理的数据在 log 文件中的位置。接下来,我们就可以把这个位置之后未被处理的数据,从 log 文件中读出,然后重新加载到内存中。

img

参考资料

B+树

什么是 B+树

B+树是在二叉查找树的基础上进行了改造:树中的节点并不存储数据本身,而是只是作为索引。每个叶子节点串在一条链表上,链表中的数据是从小到大有序的。

img

改造之后,如果我们要求某个区间的数据。我们只需要拿区间的起始值,在树中进行查找,当查找到某个叶子节点之后,我们再顺着链表往后遍历,直到链表中的结点数据值大于区间的终止值为止。所有遍历到的数据,就是符合区间值的所有数据。

img

但是,我们要为几千万、上亿的数据构建索引,如果将索引存储在内存中,尽管内存访问的速度非常快,查询的效率非常高,但是,占用的内存会非常多。

比如,我们给一亿个数据构建二叉查找树索引,那索引中会包含大约 1 亿个节点,每个节点假设占用 16 个字节,那就需要大约 1GB 的内存空间。给一张表建立索引,我们需要 1GB 的内存空间。如果我们要给 10 张表建立索引,那对内存的需求是无法满足的。如何解决这个索引占用太多内存的问题呢?

我们可以借助时间换空间的思路,把索引存储在硬盘中,而非内存中。我们都知道,硬盘是一个非常慢速的存储设备。通常内存的访问速度是纳秒级别的,而磁盘访问的速度是毫秒级别的。读取同样大小的数据,从磁盘中读取花费的时间,是从内存中读取所花费时间的上万倍,甚至几十万倍。

这种将索引存储在硬盘中的方案,尽管减少了内存消耗,但是在数据查找的过程中,需要读取磁盘中的索引,因此数据查询效率就相应降低很多。

二叉查找树,经过改造之后,支持区间查找的功能就实现了。不过,为了节省内存,如果把树存储在硬盘中,那么每个节点的读取(或者访问),都对应一次磁盘 IO 操作。树的高度就等于每次查询数据时磁盘 IO 操作的次数。

我们前面讲到,比起内存读写操作,磁盘 IO 操作非常耗时,所以我们优化的重点就是尽量减少磁盘 IO 操作,也就是,尽量降低树的高度。那如何降低树的高度呢?

我们来看下,如果我们把索引构建成 m 叉树,高度是不是比二叉树要小呢?如图所示,给 16 个数据构建二叉树索引,树的高度是 4,查找一个数据,就需要 4 个磁盘 IO 操作(如果根节点存储在内存中,其他结点存储在磁盘中),如果对 16 个数据构建五叉树索引,那高度只有 2,查找一个数据,对应只需要 2 次磁盘操作。如果 m 叉树中的 m 是 100,那对一亿个数据构建索引,树的高度也只是 3,最多只要 3 次磁盘 IO 就能获取到数据。磁盘 IO 变少了,查找数据的效率也就提高了。

为什么需要 B+树

关系型数据库中常用 B+ 树作为索引,这是为什么呢?

思考以下经典应用场景

  • 根据某个值查找数据,比如 select * from user where id=1234
  • 根据区间值来查找某些数据,比如 select * from user where id > 1234 and id < 2345

为了提高查询效率,需要使用索引。而对于索引的性能要求,主要考察执行效率和存储空间。如果让你选择一种数据结构去存储索引,你会如何考虑?

以一些常见数据结构为例:

  • 哈希表:哈希表的查询性能很好,时间复杂度是 O(1)。但是,哈希表不能支持按照区间快速查找数据。所以,哈希表不能满足我们的需求。
  • 平衡二叉查找树:尽管平衡二叉查找树查询的性能也很高,时间复杂度是 O(logn)。而且,对树进行中序遍历,我们还可以得到一个从小到大有序的数据序列,但这仍然不足以支持按照区间快速查找数据。
  • 跳表:跳表是在链表之上加上多层索引构成的。它支持快速地插入、查找、删除数据,对应的时间复杂度是 O(logn)。并且,跳表也支持按照区间快速地查找数据。我们只需要定位到区间起点值对应在链表中的结点,然后从这个结点开始,顺序遍历链表,直到区间终点对应的结点为止,这期间遍历得到的数据就是满足区间值的数据。

实际上,数据库索引所用到的数据结构跟跳表非常相似,叫作 B+ 树。不过,它是通过二叉查找树演化过来的,而非跳表。B+树的应用场景

参考资料

字典树

什么是字典树

Trie 树(又叫“前缀树”或“字典树”)是一种用于快速查询“某个字符串/字符前缀”是否存在的数据结构。

  • 根节点(Root)不包含字符,除根节点外的每一个节点都仅包含一个字符;
  • 从根节点到某一节点路径上所经过的字符连接起来,即为该节点对应的字符串;
  • 任意节点的所有子节点所包含的字符都不相同;

img

字典树的构造

img

img

构建 Trie 树的过程,需要扫描所有的字符串,时间复杂度是 O(n)(n 表示所有字符串的长度和)。

字典树非常耗费内存

用数组来存储一个节点的子节点的指针。如果字符串中包含从 a 到 z 这 26 个字符,那每个节点都要存储一个长度为 26 的数组,并且每个数组存储一个 8 字节指针(或者是 4 字节,这个大小跟 CPU、操作系统、编译器等有关)。而且,即便一个节点只有很少的子节点,远小于 26 个,比如 3、4 个,我们也要维护一个长度为 26 的数组。

用数组来存储一个节点的子节点的指针。如果字符串中包含从 a 到 z 这 26 个字符,那每个节点都要存储一个长度为 26 的数组,并且每个数组存储一个 8 字节指针(或者是 4 字节,这个大小跟 CPU、操作系统、编译器等有关)。而且,即便一个节点只有很少的子节点,远小于 26 个,比如 3、4 个,我们也要维护一个长度为 26 的数组。

用数组来存储一个节点的子节点的指针。如果字符串中包含从 a 到 z 这 26 个字符,那每个节点都要存储一个长度为 26 的数组,并且每个数组存储一个 8 字节指针(或者是 4 字节,这个大小跟 CPU、操作系统、编译器等有关)。而且,即便一个节点只有很少的子节点,远小于 26 个,比如 3、4 个,我们也要维护一个长度为 26 的数组。

字典树的查找

  1. 每次从根结点开始搜索;
  2. 获取关键词的第一个字符,根据该字符选择对应的子节点,转到该子节点继续检索;
  3. 在相应的子节点上,获取关键词的第二个字符,进一步选择对应的子节点进行检索;
  4. 以此类推,进行迭代过程;
  5. 在某个节点处,关键词的所有字母已被取出,则读取附在该节点上的信息,查找完成。

img

每次查询时,如果要查询的字符串长度是 k,那我们只需要比对大约 k 个节点,就能完成查询操作。跟原本那组字符串的长度和个数没有任何关系。所以说,构建好 Trie 树后,在其中查找字符串的时间复杂度是 O(k),k 表示要查找的字符串的长度。

字典树的应用场景

在一组字符串中查找字符串,Trie 树实际上表现得并不好。它对要处理的字符串有及其严苛的要求。

第一,字符串中包含的字符集不能太大。我们前面讲到,如果字符集太大,那存储空间可能就会浪费很多。即便可以优化,但也要付出牺牲查询、插入效率的代价。

第二,要求字符串的前缀重合比较多,不然空间消耗会变大很多。

第三,如果要用 Trie 树解决问题,那我们就要自己从零开始实现一个 Trie 树,还要保证没有 bug,这个在工程上是将简单问题复杂化,除非必须,一般不建议这样做。

第四,我们知道,通过指针串起来的数据块是不连续的,而 Trie 树中用到了指针,所以,对缓存并不友好,性能上会打个折扣。

在一组字符串中查找字符串,Trie 树实际上表现得并不好。它对要处理的字符串有及其严苛的要求。

在一组字符串中查找字符串,Trie 树实际上表现得并不好。它对要处理的字符串有及其严苛的要求。

(1)自动补全

img

(2)拼写检查

img

(3)IP 路由 (最长前缀匹配)

img

图 3. 使用 Trie 树的最长前缀匹配算法,Internet 协议(IP)路由中利用转发表选择路径。

(4)T9 (九宫格) 打字预测

img

(5)单词游戏

img

Trie 树可通过剪枝搜索空间来高效解决 Boggle 单词游戏

参考资料

《检索技术核心 20 讲》笔记

伸缩性架构是指不需要改变系统的软硬件设计,仅通过改变部署服务器数量就可以扩大或缩小系统的服务处理能力。

线性结构检索

检索的核心思想:合理组织数据,尽可能快速减少查询范围,可以提升检索效率。

数组和链表的比较

  • 存储方式
    • 数组用 连续 的内存空间来存储数据。
    • 链表用 不连续 的内存空间来存储数据;并通过一个指针按顺序将这些空间串起来,形成一条链。
  • 访问方式
    • 数组支持随机访问。根据下标随机访问的时间复杂度为 O(1)
    • 链表不支持随机访问,只能顺序访问。
  • 空间大小
    • 数组空间大小固定,扩容只能采用复制数组的方式。
    • 链表空间大小不固定,扩容灵活。
  • 效率比较
    • 数组的 查找 效率高于链表。
    • 链表的 添加删除 效率高于数组。

非线性结构检索

  • 对于无序数组,只能顺序查找,其时间复杂度为 O(n)
  • 对于有序数组,可以应用二分查找法,其时间复杂度为 O(log n)

显然,二分查找法很高效,但是它有限制条件:数据有序。为了保证数据有序,添加、删除数组数据时,必须要进行数据调整,来保证其有序。

首先,对于数据频繁变化的应用场景,有序数组并不是最适合的解决方案。我们一般要考虑采用非连续存储的数据结构来灵活调整。同时,为了提高检索效率,我们还要采取合理的组织方式,让这些非连续存储的数据结构能够使用二分查找算法。

数据组织的方式有两种,一种是二叉检索树。一个平衡的二叉检索树使用二分查找的检索效率是 O(log n),但如果我们不做额外的平衡控制的话,二叉检索树的检索性能最差会退化到 O(n),也就和单链表一样了。所以,AVL 树和红黑树这样平衡性更强的二叉检索树,在实际工作中应用更多。

除了树结构以外,另一种数据组织方式是跳表。跳表也具备二分查找的能力,理想跳表的检索效率是 O(log n)。为了保证跳表的检索空间平衡,跳表为每个节点随机生成层级,这样的实现方式比 AVL 树和红黑树更简单。

无论是二叉检索树还是跳表,它们都是通过将数据进行合理组织,然后尽可能地平衡划分检索空间,使得我们能采用二分查找的思路快速地缩减查找范围,达到 O(log n) 的检索效率。

哈希检索

散列表的思路是:使用 Hash 函数将 Key 转换为数组下标。

哈希表的本质是一个数组,它通过 Hash 函数将查询的 Key 转为数组下标,利用数组的随机访问特性,使得我们能在 O(1) 的时间代价内完成检索。

尽管哈希检索没有使用二分查找,但无论是设计理想的哈希函数,还是保证哈希表有足够的空闲位置,包括解决冲突的“二次探查”和“双散列”方案,本质上都是希望数据插入哈希表的时候,分布能均衡,这样检索才能更高效。从这个角度来看,其实哈希检索提高检索效率的原理,和二叉检索树需要平衡左右子树深度的原理是一样的,也就是说,高效的检索需要均匀划分检索空间。

状态检索

在海量数据中,快速判断一个对象是否存在。相比于有序数组、二叉检索树和哈希表这三种方案,位图和布隆过滤器其实更适合解决这类状态检索的问题。这是因为,在不要求 100% 判断正确的情况下,使用位图和布隆过滤器可以达到 O(1) 时间代价的检索效率,同时空间使用率也非常高效。

为了判断一个很大的数据范围中,某数值是否存在,可以将这个范围的数据存为数组,其数组值为布尔型(true 或 false)。由于很多语言中,布尔类型需要 1 个字节,而二进制位(bit)的值 0 或 1 也可以表示 true 或 false,并且占用空间更小,所以更加合适。而这种基于位运算的哈希结构,即为位图。

布隆过滤器最大的特点,就是对一个对象使用多个哈希函数。如果我们使用了 k 个哈希函数,就会得到 k 个哈希值,也就是 k 个下标,我们会把数组中对应下标位置的值都置为 1。布隆过滤器和位图最大的区别就在于,我们不再使用一位来表示一个对象,而是使用 k 位来表示一个对象。这样两个对象的 k 位都相同的概率就会大大降低,从而能够解决哈希冲突的问题了。

布隆过滤器的误判有一个特点,那就是,它只会对存在的情况有误判。如果某个数字经过布隆过滤器判断不存在,那说明这个数字真的不存在,不会发生误判;如果某个数字经过布隆过滤器判断存在,这个时候才会有可能误判,有可能并不存在。不过,只要我们调整哈希函数的个数、位图大小跟要存储数字的个数之间的比例,那就可以将这种误判的概率降到非常低。

布隆过滤器过滤器适用于对误判有一定容忍度的场景。

倒排索引

倒排索引的核心其实并不复杂,它的具体实现其实是哈希表,只是它不是将文档 ID 或者题目作为 key,而是反过来,通过将内容或者属性作为 key 来存储对应的文档列表,使得我们能在 O(1) 的时间代价内完成查询。

尽管原理并不复杂,但是倒排索引是许多检索引擎的核心。比如说,数据库的全文索引功能、搜索引擎的索引、广告引擎和推荐引擎,都使用了倒排索引技术来实现检索功能。

B+ 树检索

内存是半导体元件。对于内存而言,只要给出了内存地址,我们就可以直接访问该地址取出数据。这个过程具有高效的随机访问特性,因此内存也叫随机访问存储器(Random Access Memory,即 RAM)。内存的访问速度很快,但是价格相对较昂贵,因此一般的计算机内存空间都相对较小。

而磁盘是机械器件。磁盘访问数据时,需要等磁盘盘片旋转到磁头下,才能读取相应的数据。尽管磁盘的旋转速度很快,但是和内存的随机访问相比,性能差距非常大。一般来说,如果是随机读写,会有 10 万到 100 万倍左右的差距。但如果是顺序访问大批量数据的话,磁盘的性能和内存就是一个数量级的。

磁盘的最小读写单位是扇区,较早期的磁盘一个扇区是 512 字节。随着磁盘技术的发展,目前常见的磁盘扇区是 4K 个字节。操作系统一次会读写多个扇区,所以操作系统的最小读写单位是块(Block),也叫作簇(Cluster)。当我们要从磁盘中读取一个数据时,操作系统会一次性将整个块都读出来。因此,对于大批量的顺序读写来说,磁盘的效率会比随机读写高许多。

假设有一个有序数组存储在硬盘中,如果它足够大,那么它会存储在多个块中。当我们要对这个数组使用二分查找时,需要先找到中间元素所在的块,将这个块从磁盘中读到内存里,然后在内存中进行二分查找。如果下一步要读的元素在其他块中,则需要再将相应块从磁盘中读入内存。直到查询结束,这个过程可能会多次访问磁盘。我们可以看到,这样的检索性能非常低。

由于磁盘相对于内存而言访问速度实在太慢,因此,对于磁盘上数据的高效检索,我们有一个极其重要的原则:对磁盘的访问次数要尽可能的少!

将索引和数据分离就是一种常见的设计思路。在数据频繁变化的场景中,有序数组并不是一个最好的选择,二叉检索树或者哈希表往往更有普适性。但是,哈希表由于缺乏范围检索的能力,在一些场合也不适用。因此,二叉检索树这种树形结构是许多常见检索系统的实施方案。

随着索引数据越来越大,直到无法完全加载到内存中,这是需要将索引数据也存入磁盘中。B+ 树给出了将树形索引的所有节点都存在磁盘上的高效检索方案。操作系统对磁盘数据的访问是以块为单位的。因此,如果我们想将树型索引的一个节点从磁盘中读出,即使该节点的数据量很小(比如说只有几个字节),但磁盘依然会将整个块的数据全部读出来,而不是只读这一小部分数据,这会让有效读取效率很低。B+ 树的一个关键设计,就是让一个节点的大小等于一个块的大小。节点内存储的数据,不是一个元素,而是一个可以装 m 个元素的有序数组。这样一来,我们就可以将磁盘一次读取的数据全部利用起来,使得读取效率最大化。

B+ 树还有另一个设计,就是将所有的节点分为内部节点和叶子节点。内部节点仅存储 key 和维持树形结构的指针,并不存储 key 对应的数据(无论是具体数据还是文件位置信息)。这样内部节点就能存储更多的索引数据,我们也就可以使用最少的内部节点,将所有数据组织起来了。而叶子节点仅存储 key 和对应数据,不存储维持树形结构的指针。通过这样的设计,B+ 树就能做到节点的空间利用率最大化。此外,B+ 树还将同一层的所有节点串成了有序的双向链表,这样一来,B+ 树就同时具备了良好的范围查询能力和灵活调整的能力了。

因此,B+ 树是一棵完全平衡的 m 阶多叉树。所谓的 m 阶,指的是每个节点最多有 m 个子节点,并且每个节点里都存了一个紧凑的可包含 m 个元素的数组。

即使是复杂的 B+ 树,我们将它拆解开来,其实也是由简单的数组、链表和树组成的,而且 B+ 树的检索过程其实也是二分查找。因此,如果 B+ 树完全加载在内存中的话,它的检索效率其实并不会比有序数组或者二叉检索树更
高,也还是二分查找的 log(n) 的效率。并且,它还比数组和二叉检索树更加复杂,还会带来额外的开销。

另外,这一节还有一个很重要的设计思想需要你掌握,那就是将索引和数据分离。通过这样的方式,我们能将索引的数组大小保持在一个较小的范围内,让它能加载在内存中。在许多大规模系统中,都是使用这个设计思想来精简索引的。而且,B+ 树的内部节点和叶子节点的区分,其实也是索引和数据分离的一次实践。

MySQL 中的 B+ 树实现其实有两种,一种是 MyISAM 引擎,另一种是 InnoDB 引擎。它们的核心区别就在于,数据和索引是否是分离的。

在 MyISAM 引擎中,B+ 树的叶子节点仅存储了数据的位置指针,这是一种索引和数据分离的设计方案,叫作非聚集索引。如果要保证 MyISAM 的数据一致性,那我们需要在表级别上进行加锁处理。

在 InnoDB 中,B+ 树的叶子节点直接存储了具体数据,这是一种索引和数据一体的方案。叫作聚集索引。由于数据直接就存在索引的叶子节点中,因此 InnoDB 不需要给全表加锁来保证一致性,它只需要支持行级的锁就可以了。

LSM 树检索

B+ 树的数据都存储在叶子节点中,而叶子节点一般都存储在磁盘中。因此,每次插入的新数据都需要随机写入磁盘,而随机写入的性能非常慢。如果是一个日志系统,每秒钟要写入上千条甚至上万条数据,这样的磁盘操作代价会使得系统性能急剧下降,甚至无法使用。

操作系统对磁盘的读写是以块为单位的,我们能否以块为单位写入,而不是每次插入一个数据都要随机写入磁盘呢?这样是不是就可以大幅度减少写入操作了呢?解决方案就是:LSM 树(Log Structured Merge Trees)。

LSM 树就是根据这个思路设计了这样一个机制:当数据写入时,延迟写磁盘,将数据先存放在内存中的树里,进行常规的存储和查询。当内存中的树持续变大达到阈值时,再批量地以块为单位写入磁盘的树中。因此,LSM 树至少需要由两棵树组成,一棵是存储在内存中较小的 C0 树,另一棵是存储在磁盘中较大的 C1 树。

LSM 树具有以下 3 个特点:

  1. 将索引分为内存和磁盘两部分,并在内存达到阈值时启动树合并(Merge Trees);
  2. 用批量写入代替随机写入,并且用预写日志 WAL 技术(Write AheadLog,预写日志技术)保证内存数据,在系统崩溃后可以被恢复;
  3. 数据采取类似日志追加写的方式写入(Log Structured)磁盘,以顺序写的方式提高写
    入效率。

LSM 树的这些特点,使得它相对于 B+ 树,在写入性能上有大幅提升。所以,许多 NoSQL 系统都使用 LSM 树作为检索引擎,而且还对 LSM 树进行了优化以提升检索性能。

索引构建

  • 数据压缩:一个是尽可能地将数据加载到内存中,因为内存的检索效率大大高于磁盘。那为了将数据更多地加载到内存中,索引压缩是一个重要的研究方向。
  • 分支处理:另一个是将大数据集合拆成多个小数据集合来处理。这其实就是分布式系统的核心思想。

索引更新

Double Buffer(双缓冲)机制

就是在内存中同时保存两份一样的索引,一个是索引 A,一个是索引 B。两个索引保持一个读、一个写,并且来回切换,最终完成高性能的索引更新。

优点:简单高效

缺点:达到一定数据量级后,会带来翻倍的内存开销,甚至有些索引存储在磁盘上的情况下,更是无法使用此机制。

全量索引和增量索引

将新接收到的数据单独建立一个可以存在内存中的倒排索引,也就是增量索引。当查询发生的时候,我们会同时查询全量索引和增量索引,将合并的结果作为总的结果输出。

因为增量索引相对全量索引而言会小很多,内存资源消耗在可承受范围,所以我们可以使用 Double Buffer 机制
对增量索引进行索引更新。这样一来,增量索引就可以做到无锁访问。而全量索引本身就是只读的,也不需要加锁。因此,整个检索过程都可以做到无锁访问,也就提高了系统的检索效率。

如何处理增量索引空间的持续增长

完全重建法

如果增量索引的增长速度不算很快,或者全量索引重建的代价不大,那么我们完全可以在增量索引写满内存空间之前,完全重建一次全量索引,然后将系统查询切换到新的全量索引上。

再合并法

直接归并全量索引和增量索引,生成一个新的全量索引,这也就避免了从头处理所有文档的重复开销。

滚动合并法

先生成多个不同层级的索引,然后逐层合并。

比如说,一个检索系统在磁盘中保存了全量索引、周级索引和天级索引。所谓周级索引,就
是根据本周的新数据生成的一份索引,那天级索引就是根据每天的新数据生成的一份索引。
在滚动合并法中,当内存中的增量索引增长到一定体量时,我们会用再合并法将它合并到磁
盘上当天的天级索引文件中。

img

索引拆分

水平拆分和垂直拆分

TOP K 检索

TF-IDF 算法

TF-IDF 算法的公式是:相关性 = TF*IDF。其中,TF 是词频(Term Frequency),IDF 是逆文档频率(Inverse Document Frequency)。

  • 词频定义的就是一个词项在文档中出现的次数。换一句话说就是,如果一个词项出现了越多次,那这个词在文档中就越重要。
  • 文档频率(Document Frequency),指的是这个词项出现在了多少个文档中。你也可以理解为,如果一个词出现在越多的文档中,那这个词就越普遍,越没有区分度。一个极端的例子,比如“的”字,它基本上在每个文档中都会出现,所以它的区分度就非常低。
  • 逆文档频率是对文档频率取倒数,它的值越大,这个词的的区分度就越大。

BM25 算法

BM25 算法的一个重要的设计思想是,它认为词频和相关性的关系并不是线性的。也就是说,随着词频的增加,相关性的增加会越来越不明显,并且还会有一个阈值上限。当词频达到阈值以后,那相关性就不会再增长了。

总结来说,BM25 算法就是一个对查询词和文档的相关性进行打分的概率模型算法。BM25 算法考虑了四个因子,分别为 IDF、文档长度、文档中的词频以及查询词中的词频。并且,公式中还加入了 3 个可以人工调整大小的参数,分别是 :k1、k2 和 b。

机器学习打分

机器学习可以更大规模地引入更多的打分因子,并且可以自动学习出各个打分因子的权重。所以,利用机器学习进行相关性打分,已经成了目前大规模检索引擎的标配。

根据打分结果快速 TOP K 检索

完成打分阶段之后,排序阶段我们要重视排序的效率。对于精准 Top K 检索,我们可以使用堆排序来代替全排序,只返回我们认为最重要的 k 个结果。这样,时间代价就是 O(n) + O(k log n) ,在数据量级非常大的情况下,它比 O(n log n) 的检索性能会高得多。

非精准 TOP K 检索

高质量的检索结果并不一定要非常精准,我们只需要保证质量足够高的结果,被包含在最终的 Top K 个结果中就够了。这就是非精准 Top K 检索的思路。

空间检索

通过将二维空间在水平和垂直方向上不停二分,可以生成一维的区域编码,然后我们可以使用一维空间的检索技术对区域编码做好索引。

在需要动态调整查询范围的场景下,对于二进制编码的二维空间的最近邻检索问题,我们可以通过四叉树来完成。四叉树可以很好地快速划分查询空间,并通过递归的方式高效地扩大查询范围。但是满四叉树经常会造成无谓的空间浪费,为了避免这个问题,在实际应用的时候,我们会选择使用非满四叉树来存储和索引编码。对于 GeoHash 编码的二维空间最近邻检索问题,我们也能通过类似的前缀树来提高检索效率。

最近邻检索

如何计算两篇文章的相似性

最常见的方式就是使用向量空间模型(Vector Space Model)。所谓向量空间模型,就是将所有文档中出现过的所有关键词都提取出来。如果一共有 n 个关键词,那每个关键词就是一个维度,这就组成了一个 n 维的向量空间。

存储系统

LevelDB 是由 Google 开源的存储系统。

LevelDB 是基于 LSM 树优化而来的存储系统。LSM 树会将索引分为内存和磁盘两部分,并在内存达到阈值时启动树合并。但是,这里面存在着大量的细节问题。

数据在内存中如何高效检索?

首先,对内存中索引的高效检索,我们可以用很多检索技术,如红黑树、跳表等,这些数据结构会比 B+ 树更高效。LevelDB 对于 LSM 树的第一个改进,就是使用跳表代替 B+ 树来实现内存中的 C0 树。

数据是如何高效地从内存转移到磁盘的?

LevelDB 做了读写分离的设计。它将内存中的数据分为两块,一块叫作 MemTable,它是可读可写的。另一块叫作 Immutable MemTable,它是只读的。这两块数据的数据结构完全一样,都是跳表。

当 MemTable 的存储数据达到上限时,我们直接将它切换为只读的 Immutable MemTable,然后重新生成一个新的 MemTable,来支持新数据的写入和查询。这时,将内存索引存储到磁盘的问题,就变成了将 Immutable MemTable 写入磁盘的问题。而且,由于 Immutable MemTable 是只读的,因此,它不需要加锁就可以高效
地写入磁盘中。

数据如何合并

在原始 LSM 树的设计中,内存索引写入磁盘时是直接和磁盘中的 C1 树进行归并的。但如果工程中也这么
实现的话,会有两个很严重的问题:

  • 合并代价很高,因为 C1 树很大,而 C0 树很小,这会导致它们在合并时产生大量的磁盘 IO;
  • 合并频率会很频繁,由于 C0 树很小,很容易被写满,因此系统会频繁进行 C0 树和 C1 树的合并,这样频繁合并会带来的大量磁盘 IO,这更是系统无法承受的。

LevelDB 采用了延迟合并的设计来优化。具体来说就是,先将 Immutable MemTable 顺序快速写入磁盘,直接变成一个个 SSTable(Sorted String Table)文件,之后再对这些 SSTable 文件进行合并。这样就避免了 C0 树和 C1 树昂贵的 合并代价。

而在管理多个 SSTable 文件的环节,LevelDB 使用分层和滚动合并的设计来组织多个 SSTable 文件,避免了 C0 树和 C1 树的合并带来的大量数据被复制的问题。

数据如何检索

先在 MemTable 中查找,如果查找不到再去 Immutable MemTable 中查找。如果 Immutable MemTable 也查询不到,才会到磁盘中去查找。

在磁盘中检索数据的环节,因为 SSTable 文件是有序的,所以我们通过多层二分查找的方式,就能快速定位到需要查询的 SSTable 文件。接着,在 SSTable 文件内查找元素时,LevelDB 先是使用索引与数据分离的设计,减少磁盘 IO,又使用 BloomFilter 和二分查找来完成检索加速。加速检索的过程中,LevelDB 又使用缓存技术,将会被反复读取的数据缓存在内存中,从而避免了磁盘开销。

搜索系统

搜索流程:

  • 先对查询内容分词,搜索引擎还会纠错和相似推荐,得到检索词
  • 根据检索词在倒排索引中进行短语检索。然后,根据相关性打分,将得分高的结果保留。

广告系统

广告引擎处理一个广告请求的过程,本质上就是根据用户的广告请求信息,找出标签匹配的广告设置,并将广告进行排序返回的过程。

  • 在标签检索引擎中,我们通过合理地将标签使用在树形检索 + 倒排索引 + 结果过滤这三个环节,来提高检索效率。
  • 在向量检索引擎中,我们可以使用聚类 + 倒排索引 + 乘积量化的技术来加速检索。
  • 在打分排序环节,增加一个非精准打分环节,这样我们就可以大幅降低使用深度学习模型带来的开销。
  • 在索引构建环节,我们还可以将一些过滤条件前置,仅将当前有效的广告设置加入索引,然后通过全量索引 + 增量索引的更新方式,来保证过滤逻辑的有效。

推荐引擎

相比于搜索引擎和广告引擎,推荐引擎具有更灵活的检索能力,也就是可以使用更灵活的检索技术,来进行文章的召回服务。

参考资料

Elasticsearch 集群和分片

集群

空集群

如果我们启动了一个单独的节点,里面不包含任何的数据和索引,那我们的集群看起来就是一个包含空内容节点的集群。

Figure 1. 包含空内容节点的集群

包含空内容节点的集群

图 1:只有一个空节点的集群

一个运行中的 Elasticsearch 实例称为一个节点,而集群是由一个或者多个拥有相同 cluster.name 配置的节点组成, 它们共同承担数据和负载的压力。当有节点加入集群中或者从集群中移除节点时,集群将会重新平均分布所有的数据。

当一个节点被选举成为主节点时, 它将负责管理集群范围内的所有变更,例如增加、删除索引,或者增加、删除节点等。 而主节点并不需要涉及到文档级别的变更和搜索等操作,所以当集群只拥有一个主节点的情况下,即使流量的增加它也不会成为瓶颈。 任何节点都可以成为主节点。我们的示例集群就只有一个节点,所以它同时也成为了主节点。

作为用户,我们可以将请求发送到集群中的任何节点,包括主节点。 每个节点都知道任意文档所处的位置,并且能够将我们的请求直接转发到存储我们所需文档的节点。 无论我们将请求发送到哪个节点,它都能负责从各个包含我们所需文档的节点收集回数据,并将最终结果返回給客户端。 Elasticsearch 对这一切的管理都是透明的。

集群健康

Elasticsearch 的集群监控信息中包含了许多的统计数据,其中最为重要的一项就是 集群健康 , 它在 status 字段中展示为 greenyellow 或者 red

1
GET /_cluster/health

在一个不包含任何索引的空集群中,它将会有一个类似于如下所示的返回内容:

1
2
3
4
5
6
7
8
9
10
11
12
{
"cluster_name": "elasticsearch",
"status": "green",
"timed_out": false,
"number_of_nodes": 1,
"number_of_data_nodes": 1,
"active_primary_shards": 0,
"active_shards": 0,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 0
}

status 字段指示着当前集群在总体上是否工作正常。它的三种颜色含义如下:

  • **green**:所有的主分片和副本分片都正常运行。
  • **yellow**:所有的主分片都正常运行,但不是所有的副本分片都正常运行。
  • **red**:有主分片没能正常运行。

添加索引

我们往 Elasticsearch 添加数据时需要用到 索引 —— 保存相关数据的地方。索引实际上是指向一个或者多个物理分片的逻辑命名空间 。

一个 分片 是一个底层的 工作单元 ,它仅保存了全部数据中的一部分。现在我们只需知道一个分片是一个 Lucene 的实例,以及它本身就是一个完整的搜索引擎。 我们的文档被存储和索引到分片内,但是应用程序是直接与索引而不是与分片进行交互。

Elasticsearch 是利用分片将数据分发到集群内各处的。分片是数据的容器,文档保存在分片内,分片又被分配到集群内的各个节点里。 当你的集群规模扩大或者缩小时, Elasticsearch 会自动的在各节点中迁移分片,使得数据仍然均匀分布在集群里。

一个分片可以是 分片或者 副本 分片。 索引内任意一个文档都归属于一个主分片,所以主分片的数目决定着索引能够保存的最大数据量。

技术上来说,一个主分片最大能够存储 Integer.MAX_VALUE - 128 个文档,但是实际最大值还需要参考你的使用场景:包括你使用的硬件, 文档的大小和复杂程度,索引和查询文档的方式以及你期望的响应时长。

一个副本分片只是一个主分片的拷贝。副本分片作为硬件故障时保护数据不丢失的冗余备份,并为搜索和返回文档等读操作提供服务。

在索引建立的时候就已经确定了主分片数,但是副本分片数可以随时修改。

让我们在包含一个空节点的集群内创建名为 blogs 的索引。 索引在默认情况下会被分配 5 个主分片, 但是为了演示目的,我们将分配 3 个主分片和一份副本(每个主分片拥有一个副本分片):

1
2
3
4
5
6
7
PUT /blogs
{
"settings" : {
"number_of_shards" : 3,
"number_of_replicas" : 1
}
}

我们的集群现在是 _拥有一个索引的单节点集群_。所有 3 个主分片都被分配在 Node 1

Figure 2. 拥有一个索引的单节点集群

拥有一个索引的单节点集群

如果我们现在查看集群健康,我们将看到如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"cluster_name": "elasticsearch",
"status": "yellow",
"timed_out": false,
"number_of_nodes": 1,
"number_of_data_nodes": 1,
"active_primary_shards": 3,
"active_shards": 3,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 3,
"delayed_unassigned_shards": 0,
"number_of_pending_tasks": 0,
"number_of_in_flight_fetch": 0,
"task_max_waiting_in_queue_millis": 0,
"active_shards_percent_as_number": 50
}
  • 集群 status 值为 yellow
  • 没有被分配到任何节点的副本数

集群的健康状况为 yellow 则表示全部 分片都正常运行(集群可以正常服务所有请求),但是 副本 分片没有全部处在正常状态。 实际上,所有 3 个副本分片都是 unassigned —— 它们都没有被分配到任何节点。 在同一个节点上既保存原始数据又保存副本是没有意义的,因为一旦失去了那个节点,我们也将丢失该节点上的所有副本数据。

当前我们的集群是正常运行的,但是在硬件故障时有丢失数据的风险。

添加故障转移

当集群中只有一个节点在运行时,意味着会有一个单点故障问题——没有冗余。 幸运的是,我们只需再启动一个节点即可防止数据丢失。

为了测试第二个节点启动后的情况,你可以在同一个目录内,完全依照启动第一个节点的方式来启动一个新节点(参考安装并运行 Elasticsearch)。多个节点可以共享同一个目录。

当你在同一台机器上启动了第二个节点时,只要它和第一个节点有同样的 cluster.name 配置,它就会自动发现集群并加入到其中。 但是在不同机器上启动节点的时候,为了加入到同一集群,你需要配置一个可连接到的单播主机列表。

如果启动了第二个节点,我们的集群将会拥有两个节点的集群——所有主分片和副本分片都已被分配。

Figure 3. 拥有两个节点的集群——所有主分片和副本分片都已被分配

拥有两个节点的集群

当第二个节点加入到集群后,3 个 副本分片 将会分配到这个节点上——每个主分片对应一个副本分片。 这意味着当集群内任何一个节点出现问题时,我们的数据都完好无损。

所有新近被索引的文档都将会保存在主分片上,然后被并行的复制到对应的副本分片上。这就保证了我们既可以从主分片又可以从副本分片上获得文档。

cluster-health 现在展示的状态为 green ,这表示所有 6 个分片(包括 3 个主分片和 3 个副本分片)都在正常运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"cluster_name": "elasticsearch",
"status": "green",
"timed_out": false,
"number_of_nodes": 2,
"number_of_data_nodes": 2,
"active_primary_shards": 3,
"active_shards": 6,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 0,
"delayed_unassigned_shards": 0,
"number_of_pending_tasks": 0,
"number_of_in_flight_fetch": 0,
"task_max_waiting_in_queue_millis": 0,
"active_shards_percent_as_number": 100
}
  • 集群 status 值为 green

我们的集群现在不仅仅是正常运行的,并且还处于 始终可用 的状态。

水平扩容

怎样为我们的正在增长中的应用程序按需扩容呢? 当启动了第三个节点,我们的集群将拥有三个节点的集群——为了分散负载而对分片进行重新分配。

Figure 4. 拥有三个节点的集群——为了分散负载而对分片进行重新分配

拥有三个节点的集群

Node 1Node 2 上各有一个分片被迁移到了新的 Node 3 节点,现在每个节点上都拥有 2 个分片,而不是之前的 3 个。 这表示每个节点的硬件资源(CPU, RAM, I/O)将被更少的分片所共享,每个分片的性能将会得到提升。

分片是一个功能完整的搜索引擎,它拥有使用一个节点上的所有资源的能力。 我们这个拥有 6 个分片(3 个主分片和 3 个副本分片)的索引可以最大扩容到 6 个节点,每个节点上存在一个分片,并且每个分片拥有所在节点的全部资源。

更多的扩容

但是如果我们想要扩容超过 6 个节点怎么办呢?

主分片的数目在索引创建时就已经确定了下来。实际上,这个数目定义了这个索引能够 存储 的最大数据量。(实际大小取决于你的数据、硬件和使用场景。) 但是,读操作——搜索和返回数据——可以同时被主分片 副本分片所处理,所以当你拥有越多的副本分片时,也将拥有越高的吞吐量。

在运行中的集群上是可以动态调整副本分片数目的,我们可以按需伸缩集群。让我们把副本数从默认的 1 增加到 2

1
2
3
4
PUT /blogs/_settings
{
"number_of_replicas" : 2
}

blogs 索引现在拥有 9 个分片:3 个主分片和 6 个副本分片。 这意味着我们可以将集群扩容到 9 个节点,每个节点上一个分片。相比原来 3 个节点时,集群搜索性能可以提升 3 倍。

Figure 5. 将参数 number_of_replicas 调大到 2

拥有2份副本分片3个节点的集群

当然,如果只是在相同节点数目的集群上增加更多的副本分片并不能提高性能,因为每个分片从节点上获得的资源会变少。 你需要增加更多的硬件资源来提升吞吐量。

但是更多的副本分片数提高了数据冗余量:按照上面的节点配置,我们可以在失去 2 个节点的情况下不丢失任何数据。

应对故障

我们之前说过 Elasticsearch 可以应对节点故障,接下来让我们尝试下这个功能。 如果我们关闭第一个节点,这时集群的状态为关闭了一个节点后的集群。

Figure 6. 关闭了一个节点后的集群

关闭了一个节点后的集群

我们关闭的节点是一个主节点。而集群必须拥有一个主节点来保证正常工作,所以发生的第一件事情就是选举一个新的主节点: Node 2

在我们关闭 Node 1 的同时也失去了主分片 12 ,并且在缺失主分片的时候索引也不能正常工作。 如果此时来检查集群的状况,我们看到的状态将会为 red :不是所有主分片都在正常工作。

幸运的是,在其它节点上存在着这两个主分片的完整副本, 所以新的主节点立即将这些分片在 Node 2Node 3 上对应的副本分片提升为主分片, 此时集群的状态将会为 yellow 。 这个提升主分片的过程是瞬间发生的,如同按下一个开关一般。

为什么我们集群状态是 yellow 而不是 green 呢? 虽然我们拥有所有的三个主分片,但是同时设置了每个主分片需要对应 2 份副本分片,而此时只存在一份副本分片。 所以集群不能为 green 的状态,不过我们不必过于担心:如果我们同样关闭了 Node 2 ,我们的程序 依然 可以保持在不丢任何数据的情况下运行,因为 Node 3 为每一个分片都保留着一份副本。

如果我们重新启动 Node 1 ,集群可以将缺失的副本分片再次进行分配,那么集群的状态也将如 Figure 5. 将参数 number_of_replicas 调大到 2 所示。 如果 Node 1 依然拥有着之前的分片,它将尝试去重用它们,同时仅从主分片复制发生了修改的数据文件。

到目前为止,你应该对分片如何使得 Elasticsearch 进行水平扩容以及数据保障等知识有了一定了解。 接下来我们将讲述关于分片生命周期的更多细节。

分片

  • 为什么搜索是 实时的?
  • 为什么文档的 CRUD (创建-读取-更新-删除) 操作是 实时 的?
  • Elasticsearch 是怎样保证更新被持久化在断电时也不丢失数据?
  • 为什么删除文档不会立刻释放空间?
  • refresh, flush, 和 optimize API 都做了什么, 你什么情况下应该使用他们?

使文本可被搜索

必须解决的第一个挑战是如何使文本可被搜索。 传统的数据库每个字段存储单个值,但这对全文检索并不够。文本字段中的每个单词需要被搜索,对数据库意味着需要单个字段有索引多值(这里指单词)的能力。

最好的支持 一个字段多个值 需求的数据结构是我们在 倒排索引 章节中介绍过的 倒排索引 。 倒排索引包含一个有序列表,列表包含所有文档出现过的不重复个体,或称为 词项 ,对于每一个词项,包含了它所有曾出现过文档的列表。

1
2
3
4
5
6
Term  | Doc 1 | Doc 2 | Doc 3 | ...
------------------------------------
brown | X | | X | ...
fox | X | X | X | ...
quick | X | X | | ...
the | X | | X | ...

当讨论倒排索引时,我们会谈到 文档 标引,因为历史原因,倒排索引被用来对整个非结构化文本文档进行标引。 Elasticsearch 中的 文档 是有字段和值的结构化 JSON 文档。事实上,在 JSON 文档中, 每个被索引的字段都有自己的倒排索引。

这个倒排索引相比特定词项出现过的文档列表,会包含更多其它信息。它会保存每一个词项出现过的文档总数, 在对应的文档中一个具体词项出现的总次数,词项在文档中的顺序,每个文档的长度,所有文档的平均长度,等等。这些统计信息允许 Elasticsearch 决定哪些词比其它词更重要,哪些文档比其它文档更重要,这些内容在 什么是相关性? 中有描述。

为了能够实现预期功能,倒排索引需要知道集合中的 所有 文档,这是需要认识到的关键问题。

早期的全文检索会为整个文档集合建立一个很大的倒排索引并将其写入到磁盘。 一旦新的索引就绪,旧的就会被其替换,这样最近的变化便可以被检索到。

不变性

倒排索引被写入磁盘后是 不可改变 的:它永远不会修改。 不变性有重要的价值:

  • 不需要锁。如果你从来不更新索引,你就不需要担心多进程同时修改数据的问题。
  • 一旦索引被读入内核的文件系统缓存,便会留在哪里,由于其不变性。只要文件系统缓存中还有足够的空间,那么大部分读请求会直接请求内存,而不会命中磁盘。这提供了很大的性能提升。
  • 其它缓存(像 filter 缓存),在索引的生命周期内始终有效。它们不需要在每次数据改变时被重建,因为数据不会变化。
  • 写入单个大的倒排索引允许数据被压缩,减少磁盘 I/O 和 需要被缓存到内存的索引的使用量。

当然,一个不变的索引也有不好的地方。主要事实是它是不可变的! 你不能修改它。如果你需要让一个新的文档 可被搜索,你需要重建整个索引。这要么对一个索引所能包含的数据量造成了很大的限制,要么对索引可被更新的频率造成了很大的限制。

动态更新索引

下一个需要被解决的问题是怎样在保留不变性的前提下实现倒排索引的更新?答案是: 用更多的索引。

通过增加新的补充索引来反映新近的修改,而不是直接重写整个倒排索引。每一个倒排索引都会被轮流查询到—从最早的开始—查询完后再对结果进行合并。

Elasticsearch 基于 Lucene, 这个 java 库引入了 按段搜索 的概念。 每一 段 本身都是一个倒排索引, 但 索引 在 Lucene 中除表示所有 段 的集合外, 还增加了 提交点 的概念 — 一个列出了所有已知段的文件,就像在 Figure 16, “一个 Lucene 索引包含一个提交点和三个段” 中描绘的那样。 如 Figure 17, “一个在内存缓存中包含新文档的 Lucene 索引” 所示,新的文档首先被添加到内存索引缓存中,然后写入到一个基于磁盘的段,如 Figure 18, “在一次提交后,一个新的段被添加到提交点而且缓存被清空。” 所示。

Figure 16. 一个 Lucene 索引包含一个提交点和三个段

A Lucene index with a commit point and three segments

被混淆的概念是,一个 Lucene 索引 我们在 Elasticsearch 称作 分片 。 一个 Elasticsearch 索引 是分片的集合。 当 Elasticsearch 在索引中搜索的时候, 他发送查询到每一个属于索引的分片(Lucene 索引),然后像 执行分布式检索 提到的那样,合并每个分片的结果到一个全局的结果集。

逐段搜索会以如下流程进行工作:

  1. 新文档被收集到内存索引缓存, 见 Figure 17, “一个在内存缓存中包含新文档的 Lucene 索引” 。
  2. 不时地, 缓存被 提交
    • 一个新的段—一个追加的倒排索引—被写入磁盘。
    • 一个新的包含新段名字的 提交点 被写入磁盘。
    • 磁盘进行 同步 — 所有在文件系统缓存中等待的写入都刷新到磁盘,以确保它们被写入物理文件。
  3. 新的段被开启,让它包含的文档可见以被搜索。
  4. 内存缓存被清空,等待接收新的文档。

Figure 17. 一个在内存缓存中包含新文档的 Lucene 索引

A Lucene index with new documents in the in-memory buffer, ready to commit

Figure 18. 在一次提交后,一个新的段被添加到提交点而且缓存被清空。

After a commit, a new segment is added to the index and the buffer is cleared

当一个查询被触发,所有已知的段按顺序被查询。词项统计会对所有段的结果进行聚合,以保证每个词和每个文档的关联都被准确计算。 这种方式可以用相对较低的成本将新文档添加到索引。

删除和更新

段是不可改变的,所以既不能从把文档从旧的段中移除,也不能修改旧的段来进行反映文档的更新。 取而代之的是,每个提交点会包含一个 .del 文件,文件中会列出这些被删除文档的段信息。

当一个文档被 “删除” 时,它实际上只是在 .del 文件中被 标记 删除。一个被标记删除的文档仍然可以被查询匹配到, 但它会在最终结果被返回前从结果集中移除。

文档更新也是类似的操作方式:当一个文档被更新时,旧版本文档被标记删除,文档的新版本被索引到一个新的段中。 可能两个版本的文档都会被一个查询匹配到,但被删除的那个旧版本文档在结果集返回前就已经被移除。

段合并 , 我们展示了一个被删除的文档是怎样被文件系统移除的。

近实时搜索

随着按段(per-segment)搜索的发展,一个新的文档从索引到可被搜索的延迟显著降低了。新文档在几分钟之内即可被检索,但这样还是不够快。

磁盘在这里成为了瓶颈。提交(Commiting)一个新的段到磁盘需要一个 fsync 来确保段被物理性地写入磁盘,这样在断电的时候就不会丢失数据。 但是 fsync 操作代价很大; 如果每次索引一个文档都去执行一次的话会造成很大的性能问题。

我们需要的是一个更轻量的方式来使一个文档可被搜索,这意味着 fsync 要从整个过程中被移除。

在 Elasticsearch 和磁盘之间是文件系统缓存。 像之前描述的一样, 在内存索引缓冲区( Figure 19, “在内存缓冲区中包含了新文档的 Lucene 索引” )中的文档会被写入到一个新的段中( Figure 20, “缓冲区的内容已经被写入一个可被搜索的段中,但还没有进行提交” )。 但是这里新段会被先写入到文件系统缓存—这一步代价会比较低,稍后再被刷新到磁盘—这一步代价比较高。不过只要文件已经在缓存中, 就可以像其它文件一样被打开和读取了。

Figure 19. 在内存缓冲区中包含了新文档的 Lucene 索引

A Lucene index with new documents in the in-memory buffer

Lucene 允许新段被写入和打开—使其包含的文档在未进行一次完整提交时便对搜索可见。 这种方式比进行一次提交代价要小得多,并且在不影响性能的前提下可以被频繁地执行。

Figure 20. 缓冲区的内容已经被写入一个可被搜索的段中,但还没有进行提交

The buffer contents have been written to a segment, which is searchable, but is not yet commited

refresh API

在 Elasticsearch 中,写入和打开一个新段的轻量的过程叫做 refresh 。 默认情况下每个分片会每秒自动刷新一次。这就是为什么我们说 Elasticsearch 是 实时搜索: 文档的变化并不是立即对搜索可见,但会在一秒之内变为可见。

这些行为可能会对新用户造成困惑: 他们索引了一个文档然后尝试搜索它,但却没有搜到。这个问题的解决办法是用 refresh API 执行一次手动刷新:

1
2
POST /_refresh
POST /blogs/_refresh

刷新(Refresh)所有的索引

只刷新(Refresh) blogs 索引

尽管刷新是比提交轻量很多的操作,它还是会有性能开销。当写测试的时候, 手动刷新很有用,但是不要在生产环境下每次索引一个文档都去手动刷新。 相反,你的应用需要意识到 Elasticsearch 的近实时的性质,并接受它的不足。

并不是所有的情况都需要每秒刷新。可能你正在使用 Elasticsearch 索引大量的日志文件, 你可能想优化索引速度而不是近实时搜索, 可以通过设置 refresh_interval , 降低每个索引的刷新频率:

1
2
3
4
5
6
PUT /my_logs
{
"settings": {
"refresh_interval": "30s"
}
}

每 30 秒刷新 my_logs 索引。

refresh_interval 可以在既存索引上进行动态更新。 在生产环境中,当你正在建立一个大的新索引时,可以先关闭自动刷新,待开始使用该索引时,再把它们调回来:

1
2
3
4
5
PUT /my_logs/_settings
{ "refresh_interval": -1 }

PUT /my_logs/_settings
{ "refresh_interval": "1s" }
  • 关闭自动刷新。

  • 每秒自动刷新。

refresh_interval 需要一个 持续时间 值, 例如 1s (1 秒) 或 2m (2 分钟)。 一个绝对值 1 表示的是 1 毫秒 –无疑会使你的集群陷入瘫痪。

持久化变更

如果没有用 fsync 把数据从文件系统缓存刷(flush)到硬盘,我们不能保证数据在断电甚至是程序正常退出之后依然存在。为了保证 Elasticsearch 的可靠性,需要确保数据变化被持久化到磁盘。

动态更新索引,我们说一次完整的提交会将段刷到磁盘,并写入一个包含所有段列表的提交点。Elasticsearch 在启动或重新打开一个索引的过程中使用这个提交点来判断哪些段隶属于当前分片。

即使通过每秒刷新(refresh)实现了近实时搜索,我们仍然需要经常进行完整提交来确保能从失败中恢复。但在两次提交之间发生变化的文档怎么办?我们也不希望丢失掉这些数据。

Elasticsearch 增加了一个 translog ,或者叫事务日志,在每一次对 Elasticsearch 进行操作时均进行了日志记录。通过 translog ,整个流程看起来是下面这样:

一个文档被索引之后,就会被添加到内存缓冲区,并且 追加到了 translog ,正如 Figure 21, “新的文档被添加到内存缓冲区并且被追加到了事务日志” 描述的一样。

Figure 21. 新的文档被添加到内存缓冲区并且被追加到了事务日志

New documents are added to the in-memory buffer and appended to the transaction log

刷新(refresh)使分片处于 Figure 22, “刷新(refresh)完成后, 缓存被清空但是事务日志不会” 描述的状态,分片每秒被刷新(refresh)一次:

  • 这些在内存缓冲区的文档被写入到一个新的段中,且没有进行 fsync 操作。
  • 这个段被打开,使其可被搜索。
  • 内存缓冲区被清空。

Figure 22. 刷新(refresh)完成后, 缓存被清空但是事务日志不会

After a refresh, the buffer is cleared but the transaction log is not

这个进程继续工作,更多的文档被添加到内存缓冲区和追加到事务日志(见 Figure 23, “事务日志不断积累文档” )。

Figure 23. 事务日志不断积累文档

The transaction log keeps accumulating documents

  1. 每隔一段时间—例如 translog 变得越来越大—索引被刷新(flush);一个新的 translog 被创建,并且一个全量提交被执行(见 Figure 24, “在刷新(flush)之后,段被全量提交,并且事务日志被清空” ):
    • 所有在内存缓冲区的文档都被写入一个新的段。
    • 缓冲区被清空。
    • 一个提交点被写入硬盘。
    • 文件系统缓存通过 fsync 被刷新(flush)。
    • 老的 translog 被删除。

translog 提供所有还没有被刷到磁盘的操作的一个持久化纪录。当 Elasticsearch 启动的时候, 它会从磁盘中使用最后一个提交点去恢复已知的段,并且会重放 translog 中所有在最后一次提交后发生的变更操作。

translog 也被用来提供实时 CRUD 。当你试着通过 ID 查询、更新、删除一个文档,它会在尝试从相应的段中检索之前, 首先检查 translog 任何最近的变更。这意味着它总是能够实时地获取到文档的最新版本。

Figure 24. 在刷新(flush)之后,段被全量提交,并且事务日志被清空

After a flush, the segments are fully commited and the transaction log is cleared

flush API

这个执行一个提交并且截断 translog 的行为在 Elasticsearch 被称作一次 flush 。 分片每 30 分钟被自动刷新(flush),或者在 translog 太大的时候也会刷新。请查看 translog 文档 来设置,它可以用来 控制这些阈值:

flush API 可以被用来执行一个手工的刷新(flush):

1
2
POST /blogs/_flush
POST /_flush?wait_for_ongoing
  • 刷新(flush) blogs 索引。
  • 刷新(flush)所有的索引并且并且等待所有刷新在返回前完成。

你很少需要自己手动执行 flush 操作;通常情况下,自动刷新就足够了。

这就是说,在重启节点或关闭索引之前执行 flush 有益于你的索引。当 Elasticsearch 尝试恢复或重新打开一个索引, 它需要重放 translog 中所有的操作,所以如果日志越短,恢复越快。

translog 的目的是保证操作不会丢失。这引出了这个问题: Translog 有多安全?

在文件被 fsync 到磁盘前,被写入的文件在重启之后就会丢失。默认 translog 是每 5 秒被 fsync 刷新到硬盘, 或者在每次写请求完成之后执行(e.g. index, delete, update, bulk)。这个过程在主分片和复制分片都会发生。最终, 基本上,这意味着在整个请求被 fsync 到主分片和复制分片的 translog 之前,你的客户端不会得到一个 200 OK 响应。

在每次请求后都执行一个 fsync 会带来一些性能损失,尽管实践表明这种损失相对较小(特别是 bulk 导入,它在一次请求中平摊了大量文档的开销)。

但是对于一些大容量的偶尔丢失几秒数据问题也并不严重的集群,使用异步的 fsync 还是比较有益的。比如,写入的数据被缓存到内存中,再每 5 秒执行一次 fsync

这个行为可以通过设置 durability 参数为 async 来启用:

1
2
3
4
5
PUT /my_index/_settings
{
"index.translog.durability": "async",
"index.translog.sync_interval": "5s"
}

这个选项可以针对索引单独设置,并且可以动态进行修改。如果你决定使用异步 translog 的话,你需要 保证 在发生 crash 时,丢失掉 sync_interval 时间段的数据也无所谓。请在决定前知晓这个特性。

如果你不确定这个行为的后果,最好是使用默认的参数( "index.translog.durability": "request" )来避免数据丢失。

段合并

由于自动刷新流程每秒会创建一个新的段 ,这样会导致短时间内的段数量暴增。而段数目太多会带来较大的麻烦。 每一个段都会消耗文件句柄、内存和 cpu 运行周期。更重要的是,每个搜索请求都必须轮流检查每个段;所以段越多,搜索也就越慢。

Elasticsearch 通过在后台进行段合并来解决这个问题。小的段被合并到大的段,然后这些大的段再被合并到更大的段。

段合并的时候会将那些旧的已删除文档从文件系统中清除。被删除的文档(或被更新文档的旧版本)不会被拷贝到新的大段中。

启动段合并不需要你做任何事。进行索引和搜索时会自动进行。这个流程像在 Figure 25, “两个提交了的段和一个未提交的段正在被合并到一个更大的段” 中提到的一样工作:

1、 当索引的时候,刷新(refresh)操作会创建新的段并将段打开以供搜索使用。

2、 合并进程选择一小部分大小相似的段,并且在后台将它们合并到更大的段中。这并不会中断索引和搜索。

Figure 25. 两个提交了的段和一个未提交的段正在被合并到一个更大的段

Two commited segments and one uncommited segment in the process of being merged into a bigger segment

Figure 26, “一旦合并结束,老的段被删除” 说明合并完成时的活动:

  • 新的段被刷新(flush)到了磁盘。 ** 写入一个包含新段且排除旧的和较小的段的新提交点。
  • 新的段被打开用来搜索。
  • 老的段被删除。

Figure 26. 一旦合并结束,老的段被删除

一旦合并结束,老的段被删除

合并大的段需要消耗大量的 I/O 和 CPU 资源,如果任其发展会影响搜索性能。Elasticsearch 在默认情况下会对合并流程进行资源限制,所以搜索仍然 有足够的资源很好地执行。

optimize API

optimize API 大可看做是 强制合并 API。它会将一个分片强制合并到 max_num_segments 参数指定大小的段数目。 这样做的意图是减少段的数量(通常减少到一个),来提升搜索性能。

optimize API 不应该 被用在一个活跃的索引————一个正积极更新的索引。后台合并流程已经可以很好地完成工作。 optimizing 会阻碍这个进程。不要干扰它!

在特定情况下,使用 optimize API 颇有益处。例如在日志这种用例下,每天、每周、每月的日志被存储在一个索引中。 老的索引实质上是只读的;它们也并不太可能会发生变化。

在这种情况下,使用 optimize 优化老的索引,将每一个分片合并为一个单独的段就很有用了;这样既可以节省资源,也可以使搜索更加快速:

1
POST /logstash-2014-10/_optimize?max_num_segments=1

合并索引中的每个分片为一个单独的段

请注意,使用 optimize API 触发段合并的操作不会受到任何资源上的限制。这可能会消耗掉你节点上全部的 I/O 资源, 使其没有余裕来处理搜索请求,从而有可能使集群失去响应。 如果你想要对索引执行 optimize,你需要先使用分片分配(查看 迁移旧索引)把索引移到一个安全的节点,再执行。

参考资料

ElasticSearch Java API 之 High Level REST Client

Elasticsearch 官方的 High Level REST Client 在 7.1.5.0 版本废弃。所以本文中的 API 不推荐使用。

快速开始

引入依赖

在 pom.xml 中引入以下依赖:

1
2
3
4
5
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.17.1</version>
</dependency>

创建连接和关闭

1
2
3
4
5
6
7
8
// 创建连接
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")));

// 关闭
client.close();

索引 API

测试准备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static final String INDEX = "mytest";
public static final String INDEX_ALIAS = "mytest_alias";
/**
* {@link User} 的 mapping 结构(json形式)
*/
public static final String MAPPING_JSON =
"{\n" + " \"properties\": {\n" + " \"_class\": {\n" + " \"type\": \"keyword\",\n"
+ " \"index\": false,\n" + " \"doc_values\": false\n" + " },\n" + " \"description\": {\n"
+ " \"type\": \"text\",\n" + " \"fielddata\": true\n" + " },\n" + " \"enabled\": {\n"
+ " \"type\": \"boolean\"\n" + " },\n" + " \"name\": {\n" + " \"type\": \"text\",\n"
+ " \"fielddata\": true\n" + " }\n" + " }\n" + "}";

@Autowired
private RestHighLevelClient client;

创建索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 创建索引
CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX);

// 设置索引的 settings
createIndexRequest.settings(
Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 2));

// 设置索引的 mapping
createIndexRequest.mapping(MAPPING_JSON, XContentType.JSON);

// 设置索引的别名
createIndexRequest.alias(new Alias(INDEX_ALIAS));

AcknowledgedResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
Assertions.assertTrue(createIndexResponse.isAcknowledged());

删除索引

1
2
3
4
// 删除索引
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(INDEX);
AcknowledgedResponse deleteResponse = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
Assertions.assertTrue(deleteResponse.isAcknowledged());

判断索引是否存在

1
2
3
4
GetIndexRequest getIndexRequest = new GetIndexRequest(INDEX);
Assertions.assertTrue(client.indices().exists(getIndexRequest, RequestOptions.DEFAULT));
GetIndexRequest getIndexAliasRequest = new GetIndexRequest(INDEX_ALIAS);
Assertions.assertTrue(client.indices().exists(getIndexAliasRequest, RequestOptions.DEFAULT));

文档 API

文档测试准备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public static final String INDEX = "mytest";
public static final String INDEX_ALIAS = "mytest_alias";
/**
* {@link User} 的 mapping 结构(json形式)
*/
public static final String MAPPING_JSON =
"{\n" + " \"properties\": {\n" + " \"_class\": {\n" + " \"type\": \"keyword\",\n"
+ " \"index\": false,\n" + " \"doc_values\": false\n" + " },\n" + " \"description\": {\n"
+ " \"type\": \"text\",\n" + " \"fielddata\": true\n" + " },\n" + " \"enabled\": {\n"
+ " \"type\": \"boolean\"\n" + " },\n" + " \"name\": {\n" + " \"type\": \"text\",\n"
+ " \"fielddata\": true\n" + " }\n" + " }\n" + "}";

@Autowired
private RestHighLevelClient client;

@BeforeEach
public void init() throws IOException {

// 创建索引
CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX);

// 设置索引的 settings
createIndexRequest.settings(
Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 2));

// 设置索引的 mapping
createIndexRequest.mapping(MAPPING_JSON, XContentType.JSON);

// 设置索引的别名
createIndexRequest.alias(new Alias(INDEX_ALIAS));

AcknowledgedResponse response = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
Assertions.assertTrue(response.isAcknowledged());

// 判断索引是否存在
GetIndexRequest getIndexRequest = new GetIndexRequest(INDEX_ALIAS);
Assertions.assertTrue(client.indices().exists(getIndexRequest, RequestOptions.DEFAULT));
GetIndexRequest getIndexAliasRequest = new GetIndexRequest(INDEX_ALIAS);
Assertions.assertTrue(client.indices().exists(getIndexAliasRequest, RequestOptions.DEFAULT));
}

@AfterEach
public void destroy() throws IOException {
// 删除索引
DeleteIndexRequest request = new DeleteIndexRequest(INDEX);
AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
Assertions.assertTrue(response.isAcknowledged());
}

创建文档

RestHighLevelClient Api 使用 IndexRequest 来构建创建文档的请求参数。

【示例】创建 id 为 1 的文档

1
2
3
4
5
6
7
8
IndexRequest request = new IndexRequest("product");
request.id("1");
Product product = new Product();
product.setName("机器人");
product.setDescription("人工智能机器人");
product.setEnabled(true);
String jsonString = JSONUtil.toJsonStr(product);
request.source(jsonString, XContentType.JSON);

同步执行

1
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);

异步执行

1
2
3
4
5
6
7
8
9
10
11
12
// 异步执行
client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
System.out.println(indexResponse);
}

@Override
public void onFailure(Exception e) {
System.out.println("执行失败");
}
});

删除文档

RestHighLevelClient Api 使用 DeleteRequest 来构建删除文档的请求参数。

【示例】删除 id 为 1 的文档

1
DeleteRequest deleteRequest = new DeleteRequest(INDEX_ALIAS, "1");

同步执行

1
2
DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
System.out.println(deleteResponse);

异步执行

1
2
3
4
5
6
7
8
9
10
11
client.deleteAsync(deleteRequest, RequestOptions.DEFAULT, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
System.out.println(deleteResponse);
}

@Override
public void onFailure(Exception e) {
System.out.println("执行失败");
}
});

更新文档

RestHighLevelClient Api 使用 UpdateRequest 来构建更新文档的请求参数。

【示例】更新 id 为 1 的文档

1
2
3
4
5
6
7
UpdateRequest updateRequest = new UpdateRequest(INDEX_ALIAS, "1");
Product product3 = new Product();
product3.setName("扫地机器人");
product3.setDescription("人工智能扫地机器人");
product3.setEnabled(true);
String jsonString2 = JSONUtil.toJsonStr(product3);
updateRequest.doc(jsonString2, XContentType.JSON);

同步执行

1
2
UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
System.out.println(updateResponse);

异步执行

1
2
3
4
5
6
7
8
9
10
11
client.updateAsync(updateRequest, RequestOptions.DEFAULT, new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
System.out.println(updateResponse);
}

@Override
public void onFailure(Exception e) {
System.out.println("执行失败");
}
});

查看文档

RestHighLevelClient Api 使用 GetRequest 来构建查看文档的请求参数。

【示例】查看 id 为 1 的文档

1
GetRequest getRequest = new GetRequest(INDEX_ALIAS, "1");

同步执行

1
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

异步执行

1
2
3
4
5
6
7
8
9
10
11
client.getAsync(getRequest, RequestOptions.DEFAULT, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
System.out.println(getResponse);
}

@Override
public void onFailure(Exception e) {
System.out.println("执行失败");
}
});

获取匹配条件的记录总数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
@DisplayName("获取匹配条件的记录总数")
public void count() throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchPhraseQuery("customer_gender", "MALE"));
sourceBuilder.trackTotalHits(true);

CountRequest countRequest = new CountRequest(INDEX);
countRequest.source(sourceBuilder);

CountResponse countResponse = client.count(countRequest, RequestOptions.DEFAULT);
long count = countResponse.getCount();
System.out.println("命中记录数:" + count);
}

分页查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@ParameterizedTest
@ValueSource(ints = {0, 1, 2, 3})
@DisplayName("分页查询测试")
public void pageTest(int page) throws IOException {

int size = 10;
int offset = page * size;
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchPhraseQuery("customer_gender", "MALE"));
sourceBuilder.from(offset);
sourceBuilder.size(size);
sourceBuilder.trackTotalHits(true);

SearchRequest searchRequest = new SearchRequest(INDEX);
searchRequest.source(sourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
KibanaSampleDataEcommerceBean bean =
BeanUtil.mapToBean(hit.getSourceAsMap(), KibanaSampleDataEcommerceBean.class, true,
CopyOptions.create());
System.out.println(bean);
}
}

条件查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
@DisplayName("条件查询")
public void matchPhraseQuery() throws IOException {
SearchRequest searchRequest = new SearchRequest(INDEX);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.matchPhraseQuery("customer_last_name", "Jensen"));
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.trackTotalHits(true);
searchRequest.source(sourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
KibanaSampleDataEcommerceBean bean =
BeanUtil.mapToBean(hit.getSourceAsMap(), KibanaSampleDataEcommerceBean.class, true,
CopyOptions.create());
System.out.println(bean);
}
}

参考资料