Dunwu Blog

大道至简,知易行难

MySQL 简介

::: info 概述

MySQL 是一个关系型数据库管理系统,由瑞典 MySQL AB 公司开发,目前属于 Oracle 公司。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL 是最好的 RDBMS 应用软件之一。

本文简单介绍了 MySQL 的功能、特性、发行版本、简史、概念,可以让读者在短时间内对于 MySQL 有一个初步的认识。

:::

阅读全文 »

HBase 面试

HBase 简介

【基础】什么是 HBase?

:::details 要点

HBase 是一个构建在 HDFS(Hadoop 文件系统)之上的列式数据库

HBase 是一种类似于 Google’s Big Table 的数据模型,它是 Hadoop 生态系统的一部分,它将数据存储在 HDFS 上,客户端可以通过 HBase 实现对 HDFS 上数据的随机访问。

img

HBase 的核心特性如下:

  • 分布式
    • 伸缩性:支持通过增减机器进行水平扩展,以提升整体容量和性能
    • 高可用:支持 RegionServers 之间的自动故障转移
    • 自动分区:Region 分散在集群中,当行数增长的时候,Region 也会自动的分区再均衡
  • 超大数据集:HBase 被设计用来读写超大规模的数据集(数十亿行至数百亿行的表)
  • 支持结构化、半结构化和非结构化的数据:由于 HBase 基于 HDFS 构建,所以和 HDFS 一样,支持结构化、半结构化和非结构化的数据
  • 非关系型数据库
    • 不支持标准 SQL 语法
    • 没有真正的索引
    • 不支持复杂的事务:只支持行级事务,即单行数据的读写都是原子性的

HBase 的其他特性

  • 读写操作遵循强一致性
  • 过滤器支持谓词下推
  • 易于使用的 Java 客户端 API
  • 它支持线性和模块化可扩展性。
  • HBase 表支持 Hadoop MapReduce 作业的便捷基类
  • 很容易使用 Java API 进行客户端访问
  • 为实时查询提供块缓存 BlockCache 和布隆过滤器
  • 它通过服务器端过滤器提供查询谓词下推

:::

【基础】为什么需要 HBase?

:::details 要点

在 HBase 诞生之前,Hadoop 可以通过 HDFS 来存储结构化、半结构甚至非结构化的数据,它是传统数据库的补充,是海量数据存储的最佳方法,它针对大文件的存储,批量访问和流式访问都做了优化,同时也通过多副本解决了容灾问题。

Hadoop 的缺陷在于:它只能执行批处理,并且只能以顺序方式访问数据。这意味着即使是最简单的工作,也必须搜索整个数据集,即:Hadoop 无法实现对数据的随机访问。实现数据的随机访问是传统的关系型数据库所擅长的,但它们却不能用于海量数据的存储。在这种情况下,必须有一种新的方案来同时解决海量数据存储和随机访问的问题,HBase 就是其中之一 (HBase,Cassandra,CouchDB,Dynamo 和 MongoDB 都能存储海量数据并支持随机访问)。

注:数据结构分类:

  • 结构化数据:即以关系型数据库表形式管理的数据;
  • 半结构化数据:非关系模型的,有基本固定结构模式的数据,例如日志文件、XML 文档、JSON 文档、Email 等;
  • 非结构化数据:没有固定模式的数据,如 WORD、PDF、PPT、EXL,各种格式的图片、视频等。

:::

【基础】HBase 有哪些应用场景?

:::details 要点

根据上一节对于 HBase 特性的介绍,我们可以梳理出 HBase 适用、不适用的场景:

HBase 不适用场景

  • 需要索引
  • 需要复杂的事务
  • 数据量较小(比如:数据量不足几百万行)

HBase 适用场景

  • 能存储海量数据并支持随机访问(比如:数据量级达到十亿级至百亿级)
  • 存储结构化、半结构化数据
  • 硬件资源充足

一言以蔽之——HBase 适用的场景是:实时地随机访问超大数据集

HBase 的典型应用场景

  • 存储监控数据
  • 存储用户/车辆 GPS 信息
  • 存储用户行为数据
  • 存储各种日志数据,如:访问日志、操作日志、推送日志等。
  • 存储短信、邮件等消息类数据
  • 存储网页数据

:::

【基础】HBase vs. RDBMS?

:::details 要点

HBase 和 RDBMS 的不同之处如下:

RDBMS HBase
RDBMS 有它的模式,描述表的整体结构的约束 HBase 无模式,它不具有固定列模式的概念;仅定义列族
支持的文件系统有 FAT、NTFS 和 EXT 支持的文件系统只有 HDFS
使用提交日志来存储日志 使用预写日志 (WAL) 来存储日志
使用特定的协调系统来协调集群 使用 ZooKeeper 来协调集群
存储的都是中小规模的数据表 存储的是超大规模的数据表,并且适合存储宽表
通常支持复杂的事务 仅支持行级事务
适用于结构化数据 适用于半结构化、结构化数据
使用主键 使用 row key

:::

【基础】HBase vs. HDFS?

:::details 要点

HBase 和 HDFS 的不同之处如下:

HDFS HBase
HDFS 提供了一个用于分布式存储的文件系统。 HBase 提供面向表格列的数据存储。
HDFS 为大文件提供优化存储。 HBase 为表格数据提供了优化。
HDFS 使用块文件。 HBase 使用键值对数据。
HDFS 数据模型不灵活。 HBase 提供了一个灵活的数据模型。
HDFS 使用文件系统和处理框架。 HBase 使用带有内置 Hadoop MapReduce 支持的表格存储。
HDFS 主要针对一次写入多次读取进行了优化。 HBase 针对读/写许多进行了优化。

:::

【基础】行式数据库 vs. 列式数据库?

:::details 要点

行式数据库和列式数据库的不同之处如下:

行式数据库 列式数据库
对于添加/修改操作更高效 对于读取操作更高效
读取整行数据 仅读取必要的列数据
最适合在线事务处理系统(OLTP) 不适合在线事务处理系统(OLTP)
将行数据存储在连续的页内存中 将列数据存储在非连续的页内存中

列式数据库的优点:

  • 支持数据压缩
  • 支持快速数据检索
  • 简化了管理和配置
  • 有利于聚合查询(例如 COUNT、SUM、AVG、MIN 和 MAX)的高性能
  • 分区效率很高,因为它提供了自动分片机制的功能,可以将较大的区域分配给较小的区域

列式数据库的缺点:

  • JOIN 查询和来自多个表的数据未优化
  • 必须为频繁的删除和更新创建拆分,因此降低了存储效率
  • 由于非关系数据库的特性,分区和索引的设计非常困难

:::

HBase 存储

【基础】HBase 表有什么特性?

:::details 要点

Hbase 的表具有以下特点:

  • 容量大:一个表可以有数十亿行,上百万列;
  • 面向列:数据是按照列存储,每一列都单独存放,数据即索引,在查询时可以只访问指定列的数据,有效地降低了系统的 I/O 负担;
  • 稀疏性:空 (null) 列并不占用存储空间,表可以设计的非常稀疏 ;
  • 数据多版本:每个单元中的数据可以有多个版本,按照时间戳排序,新的数据在最上面;
  • 存储类型:所有数据的底层存储格式都是字节数组 (byte[])。

:::

【基础】HBase 的逻辑存储模型是怎样的?

:::details 要点

HBase 是一个面向 的数据库管理系统,这里更为确切的而说,HBase 是一个面向 列族 的数据库管理系统。表 schema 仅定义列族,表具有多个列族,每个列族可以包含任意数量的列,列由多个单元格(cell)组成,单元格可以存储多个版本的数据,多个版本数据以时间戳进行区分。

HBase 数据模型和关系型数据库有所不同。其数据模型的关键术语如下:

  • **Table**:Table 由 Row 和 Column 组成。
  • **Row**:Row 是列族(Column Family)的集合。
  • Row KeyRow Key 是用来检索记录的主键
    • Row Key 是未解释的字节数组,所以理论上,任何数据都可以通过序列化表示成字符串或二进制,从而存为 HBase 的键值。
    • 表中的行,是按照 Row Key 的字典序进行排序。这里需要注意以下两点:
      • 因为字典序对 Int 排序的结果是 1,10,100,11,12,13,14,15,16,17,18,19,2,20,21,…,9,91,92,93,94,95,96,97,98,99。如果你使用整型的字符串作为行键,那么为了保持整型的自然序,行键必须用 0 作左填充。
      • 行的一次读写操作是原子性的 (不论一次读写多少列)。
    • 所有对表的访问都要通过 Row Key,有以下三种方式:
      • 通过指定的 Row Key 进行访问;
      • 通过 Row Key 的 range 进行访问,即访问指定范围内的行;
      • 进行全表扫描。
  • **Column Family**:即列族。HBase 表中的每个列,都归属于某个列族。列族是表的 Schema 的一部分,所以列族需要在创建表时进行定义。
    • 一个表的列族必须作为表模式定义的一部分预先给出,但是新的列族成员可以随后按需加入。
    • 同一个列族的所有成员具有相同的前缀,例如 info:formatinfo:geo 都属于 info 这个列族。
  • **Column Qualifier**:列限定符。可以理解为是具体的列名,例如 info:formatinfo:geo 都属于 info 这个列族,它们的列限定符分别是 formatgeo。列族和列限定符之间始终以冒号分隔。需要注意的是列限定符不是表 Schema 的一部分,你可以在插入数据的过程中动态创建列。
  • **Column**:HBase 中的列由列族和列限定符组成,由 :(冒号) 进行分隔,即一个完整的列名应该表述为 列族名 :列限定符
  • **Cell**:Cell 是行,列族和列限定符的组合,并包含值和时间戳。HBase 中通过 row keycolumn 确定的为一个存储单元称为 Cell,你可以等价理解为关系型数据库中由指定行和指定列确定的一个单元格,但不同的是 HBase 中的一个单元格是由多个版本的数据组成的,每个版本的数据用时间戳进行区分。
    • Cell 由行和列的坐标交叉决定,是有版本的。默认情况下,版本号是自动分配的,为 HBase 插入 Cell 时的时间戳。Cell 的内容是未解释的字节数组。
  • **Timestamp**:Cell 的版本通过时间戳来索引,时间戳的类型是 64 位整型,时间戳可以由 HBase 在数据写入时自动赋值,也可以由客户显式指定。每个 Cell 中,不同版本的数据按照时间戳倒序排列,即最新的数据排在最前面。

img

下图为 HBase 中一张表的:

  • RowKey 为行的唯一标识,所有行按照 RowKey 的字典序进行排序;
  • 该表具有两个列族,分别是 personal 和 office;
  • 其中列族 personal 拥有 name、city、phone 三个列,列族 office 拥有 tel、addres 两个列。

img

图片引用自 : HBase 是列式存储数据库吗 https://www.iteblog.com/archives/2498.html

:::

【基础】HBase 的物理存储模型是怎样的?

:::details 要点

HBase Table 中的所有行按照 Row Key 的字典序排列。HBase Tables 通过行键的范围 (row key range) 被水平切分成多个 Region, 一个 Region 包含了在 start key 和 end key 之间的所有行。

img

每个表一开始只有一个 Region,随着数据不断增加,Region 会不断增大,当增大到一个阀值的时候,Region 就会等分为两个新的 Region。当 Table 中的行不断增多,就会有越来越多的 Region

img

Region 是 HBase 中分布式存储和负载均衡的最小单元。这意味着不同的 Region 可以分布在不同的 Region Server 上。但一个 Region 是不会拆分到多个 Server 上的。

img

:::

HBase 架构

【中级】HBase 读数据流程是怎样的?

:::details 要点

以下是客户端首次读写 HBase 上数据的流程:

  1. 客户端从 Zookeeper 获取 META 表所在的 Region Server;
  2. 客户端访问 META 表所在的 Region Server,从 META 表中查询到访问行键所在的 Region Server,之后客户端将缓存这些信息以及 META 表的位置;
  3. 客户端从行键所在的 Region Server 上获取数据。

如果再次读取,客户端将从缓存中获取行键所在的 Region Server。这样客户端就不需要再次查询 META 表,除非 Region 移动导致缓存失效,这样的话,则将会重新查询并更新缓存。

注:META 表是 HBase 中一张特殊的表,它保存了所有 Region 的位置信息,META 表自己的位置信息则存储在 ZooKeeper 上。

img

更为详细读取数据流程参考:

HBase 原理-数据读取流程解析

HBase 原理-迟到的‘数据读取流程部分细节

:::

【中级】HBase 写数据流程是怎样的?

:::details 要点

  1. Client 向 Region Server 提交写请求;
  2. Region Server 找到目标 Region;
  3. Region 检查数据是否与 Schema 一致;
  4. 如果客户端没有指定版本,则获取当前系统时间作为数据版本;
  5. 将更新写入 WAL Log;
  6. 将更新写入 Memstore;
  7. 判断 Memstore 存储是否已满,如果存储已满则需要 flush 为 Store Hfile 文件。

更为详细写入流程可以参考:HBase - 数据写入流程解析

:::

【中级】HBase 有哪些核心组件?

:::details 要点

HBase 系统遵循 Master/Salve 架构,由三种不同类型的组件组成:

  • Zookeeper
    • 保证任何时候,集群中只有一个 Master;
    • 存储所有 Region 的寻址入口;
    • 实时监控 Region Server 的状态,将 Region Server 的上线和下线信息实时通知给 Master;
    • 存储 HBase 的 Schema,包括有哪些 Table,每个 Table 有哪些 Column Family 等信息。
  • Master
    • 为 Region Server 分配 Region ;
    • 负责 Region Server 的负载均衡 ;
    • 发现失效的 Region Server 并重新分配其上的 Region;
    • GFS 上的垃圾文件回收;
    • 处理 Schema 的更新请求。
  • Region Server
    • Region Server 负责维护 Master 分配给它的 Region ,并处理发送到 Region 上的 IO 请求;
    • Region Server 负责切分在运行过程中变得过大的 Region。

img

HBase 使用 ZooKeeper 作为分布式协调服务来维护集群中的服务器状态。 Zookeeper 负责维护可用服务列表,并提供服务故障通知等服务:

  • 每个 Region Server 都会在 ZooKeeper 上创建一个临时节点,Master 通过 Zookeeper 的 Watcher 机制对节点进行监控,从而可以发现新加入的 Region Server 或故障退出的 Region Server;
  • 所有 Masters 会竞争性地在 Zookeeper 上创建同一个临时节点,由于 Zookeeper 只能有一个同名节点,所以必然只有一个 Master 能够创建成功,此时该 Master 就是主 Master,主 Master 会定期向 Zookeeper 发送心跳。备用 Masters 则通过 Watcher 机制对主 HMaster 所在节点进行监听;
  • 如果主 Master 未能定时发送心跳,则其持有的 Zookeeper 会话会过期,相应的临时节点也会被删除,这会触发定义在该节点上的 Watcher 事件,使得备用的 Master Servers 得到通知。所有备用的 Master Servers 在接到通知后,会再次去竞争性地创建临时节点,完成主 Master 的选举。

img

:::

HBase 高可用

Hive 面试

Hive 简介

【基础】什么是 Hive?

:::details 要点

Apache Hive 是一种分布式、容错数据仓库,支持大规模分析。Hive Metastore (HMS) 提供了一个元数据的中央存储库,可以轻松分析以做出明智的数据驱动决策,因此它是许多数据湖架构的关键组件。Hive 构建在 Apache Hadoop 之上,并通过 hdfs 支持在 S3、adls、gs 等上进行存储。Hive 允许用户使用 SQL 读取、写入和管理 PB 级数据。

Hive 可以将结构化的数据文件映射成表,并提供类 SQL 查询功能。用于查询的 SQL 语句会被转化为 MapReduce 作业,然后提交到 Hadoop 上运行。

特点

  1. 简单、容易上手(提供了类似 sql 的查询语言 hql),使得精通 sql 但是不了解 Java 编程的人也能很好地进行大数据分析;
  2. 灵活性高,可以自定义用户函数 (UDF) 和存储格式;
  3. 为超大的数据集设计的计算和存储能力,集群扩展容易;
  4. 统一的元数据管理,可与 presto/impala/sparksql 等共享数据;
  5. 执行延迟高,不适合做数据的实时处理,但适合做海量数据的离线处理。

:::

【基础】什么是 HMS?

:::details 要点

Hive Metastore (HMS) 是关系数据库中 Hive 表和分区元数据的中央存储库,它使用元存储服务 API 为客户端(包括 Hive、Impala 和 Spark)提供对此信息的访问。它已成为利用各种开源软件(如 Apache Spark 和 Presto)的数据湖的构建块。事实上,整个工具生态系统,无论是开源的还是其他的,都是围绕 Hive Metastore 构建的,下图说明了其中一些。

Apache Software Foundation

:::

Hive 存储

【基础】Hive 支持哪些数据类型?

:::details 要点

Hive 表中的列支持以下基本数据类型:

大类 类型
Integers(整型) TINYINT—1 字节的有符号整数
SMALLINT—2 字节的有符号整数
INT—4 字节的有符号整数
BIGINT—8 字节的有符号整数
Boolean(布尔型) BOOLEAN—TRUE/FALSE
Floating point numbers(浮点型) FLOAT— 单精度浮点型
DOUBLE—双精度浮点型
Fixed point numbers(定点数) DECIMAL—用户自定义精度定点数,比如 DECIMAL(7,2)
String types(字符串) STRING—指定字符集的字符序列
VARCHAR—具有最大长度限制的字符序列
CHAR—固定长度的字符序列
Date and time types(日期时间类型) TIMESTAMP — 时间戳
TIMESTAMP WITH LOCAL TIME ZONE — 时间戳,纳秒精度
DATE—日期类型
Binary types(二进制类型) BINARY—字节序列

TIMESTAMP 和 TIMESTAMP WITH LOCAL TIME ZONE 的区别如下:

  • TIMESTAMP WITH LOCAL TIME ZONE:用户提交时间给数据库时,会被转换成数据库所在的时区来保存。查询时则按照查询客户端的不同,转换为查询客户端所在时区的时间。
  • TIMESTAMP :提交什么时间就保存什么时间,查询时也不做任何转换。

此外,Hive 还支持以下复杂类型:

类型 描述 示例
STRUCT 类似于对象,是字段的集合,字段的类型可以不同,可以使用 名称。字段名 方式进行访问 STRUCT (‘xiaoming’, 12 , ‘2018-12-12’)
MAP 键值对的集合,可以使用 名称 [key] 的方式访问对应的值 map(‘a’, 1, ‘b’, 2)
ARRAY 数组是一组具有相同类型和名称的变量的集合,可以使用 名称 [index] 访问对应的值 ARRAY(‘a’, ‘b’, ‘c’, ‘d’)

:::

【基础】Hive 支持哪些存储格式?

:::details 要点

Hive 会在 HDFS 为每个数据库上创建一个目录,数据库中的表是该目录的子目录,表中的数据会以文件的形式存储在对应的表目录下。Hive 支持以下几种文件存储格式:

格式 说明
TextFile 存储为纯文本文件。 这是 Hive 默认的文件存储格式。这种存储方式数据不做压缩,磁盘开销大,数据解析开销大。
SequenceFile SequenceFile 是 Hadoop API 提供的一种二进制文件,它将数据以<key,value>的形式序列化到文件中。这种二进制文件内部使用 Hadoop 的标准的 Writable 接口实现序列化和反序列化。它与 Hadoop API 中的 MapFile 是互相兼容的。Hive 中的 SequenceFile 继承自 Hadoop API 的 SequenceFile,不过它的 key 为空,使用 value 存放实际的值,这样是为了避免 MR 在运行 map 阶段进行额外的排序操作。
RCFile RCFile 文件格式是 FaceBook 开源的一种 Hive 的文件存储格式,首先将表分为几个行组,对每个行组内的数据按列存储,每一列的数据都是分开存储。
ORC Files ORC 是在一定程度上扩展了 RCFile,是对 RCFile 的优化。
Avro Files Avro 是一个数据序列化系统,设计用于支持大批量数据交换的应用。它的主要特点有:支持二进制序列化方式,可以便捷,快速地处理大量数据;动态语言友好,Avro 提供的机制使动态语言可以方便地处理 Avro 数据。
Parquet Parquet 是基于 Dremel 的数据模型和算法实现的,面向分析型业务的列式存储格式。它通过按列进行高效压缩和特殊的编码技术,从而在降低存储空间的同时提高了 IO 效率。

以上压缩格式中 ORC 和 Parquet 的综合性能突出,使用较为广泛,推荐使用这两种格式。

通常在创建表的时候使用 STORED AS 参数指定:

1
2
3
4
5
6
CREATE TABLE page_view(viewTime INT, userid BIGINT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
STORED AS SEQUENCEFILE;

各个存储文件类型指定方式如下:

  • STORED AS TEXTFILE
  • STORED AS SEQUENCEFILE
  • STORED AS ORC
  • STORED AS PARQUET
  • STORED AS AVRO
  • STORED AS RCFILE

:::

【基础】Hive 中的内部表和外部表有什么区别?

:::details 要点

内部表又叫做管理表 (Managed/Internal Table),创建表时不做任何指定,默认创建的就是内部表。想要创建外部表 (External Table),则需要使用 External 进行修饰。 内部表和外部表主要区别如下:

内部表 外部表
数据存储位置 内部表数据存储的位置由 hive.metastore.warehouse.dir 参数指定,默认情况下表的数据存储在 HDFS 的 /user/hive/warehouse/数据库名。db/表名/ 目录下 外部表数据的存储位置创建表时由 Location 参数指定;
导入数据 在导入数据到内部表,内部表将数据移动到自己的数据仓库目录下,数据的生命周期由 Hive 来进行管理 外部表不会将数据移动到自己的数据仓库目录下,只是在元数据中存储了数据的位置
删除表 删除元数据(metadata)和文件 只删除元数据(metadata)

:::

【基础】什么是分区表?

:::details 要点

Hive 中的表对应为 HDFS 上的指定目录,在查询数据时候,默认会对全表进行扫描,这样时间和性能的消耗都非常大。

分区为 HDFS 上表目录的子目录,数据按照分区存储在子目录中。如果查询的 where 子句中包含分区条件,则直接从该分区去查找,而不是扫描整个表目录,合理的分区设计可以极大提高查询速度和性能。

分区表并非 Hive 独有的概念,实际上这个概念非常常见。通常,在管理大规模数据集的时候都需要进行分区,比如将日志文件按天进行分区,从而保证数据细粒度的划分,使得查询性能得到提升。比如,在我们常用的 Oracle 数据库中,当表中的数据量不断增大,查询数据的速度就会下降,这时也可以对表进行分区。表进行分区后,逻辑上表仍然是一张完整的表,只是将表中的数据存放到多个表空间(物理文件上),这样查询数据时,就不必要每次都扫描整张表,从而提升查询性能。

在 Hive 中可以使用 PARTITIONED BY 子句创建分区表。表可以包含一个或多个分区列,程序会为分区列中的每个不同值组合创建单独的数据目录。下面的我们创建一张雇员表作为测试:

1
2
3
4
5
6
7
8
9
10
11
12
 CREATE EXTERNAL TABLE emp_partition(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
PARTITIONED BY (deptno INT) -- 按照部门编号进行分区
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_partition';

加载数据到分区表时候必须要指定数据所处的分区:

1
2
3
4
# 加载部门编号为 20 的数据到表中
LOAD DATA LOCAL INPATH "/usr/file/emp20.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=20)
# 加载部门编号为 30 的数据到表中
LOAD DATA LOCAL INPATH "/usr/file/emp30.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=30)

这时候我们直接查看表目录,可以看到表目录下存在两个子目录,分别是 deptno=20deptno=30, 这就是分区目录,分区目录下才是我们加载的数据文件。

1
# hadoop fs -ls  hdfs://hadoop001:8020/hive/emp_partition/

这时候当你的查询语句的 where 包含 deptno=20,则就去对应的分区目录下进行查找,而不用扫描全表。

:::

【基础】什么是分桶表?

:::details 要点

分区提供了一个隔离数据和优化查询的可行方案,但是并非所有的数据集都可以形成合理的分区,分区的数量也不是越多越好,过多的分区条件可能会导致很多分区上没有数据。同时 Hive 会限制动态分区可以创建的最大分区数,用来避免过多分区文件对文件系统产生负担。鉴于以上原因,Hive 还提供了一种更加细粒度的数据拆分方案:分桶表 (bucket Table)。

分桶表会将指定列的值进行哈希散列,并对 bucket(桶数量)取余,然后存储到对应的 bucket(桶)中。

单从概念上理解分桶表可能会比较晦涩,其实和分区一样,分桶这个概念同样不是 Hive 独有的,对于 Java 开发人员而言,这可能是一个每天都会用到的概念,因为 Hive 中的分桶概念和 Java 数据结构中的 HashMap 的分桶概念是一致的。

当调用 HashMap 的 put() 方法存储数据时,程序会先对 key 值调用 hashCode() 方法计算出 hashcode,然后对数组长度取模计算出 index,最后将数据存储在数组 index 位置的链表上,链表达到一定阈值后会转换为红黑树 (JDK1.8+)。下图为 HashMap 的数据结构图:

img

图片引用自:HashMap vs. Hashtable

在 Hive 中,我们可以通过 CLUSTERED BY 指定分桶列,并通过 SORTED BY 指定桶中数据的排序参考列。下面为分桶表建表语句示例:

1
2
3
4
5
6
7
8
9
10
11
12
CREATE EXTERNAL TABLE emp_bucket(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
CLUSTERED BY(empno) SORTED BY(empno ASC) INTO 4 BUCKETS --按照员工编号散列到四个 bucket 中
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_bucket';

这里直接使用 Load 语句向分桶表加载数据,数据时可以加载成功的,但是数据并不会分桶。

这是由于分桶的实质是对指定字段做了 hash 散列然后存放到对应文件中,这意味着向分桶表中插入数据是必然要通过 MapReduce,且 Reducer 的数量必须等于分桶的数量。由于以上原因,分桶表的数据通常只能使用 CTAS(CREATE TABLE AS SELECT) 方式插入,因为 CTAS 操作会触发 MapReduce。加载数据步骤如下:

(1)设置强制分桶

1
set hive.enforce.bucketing = true; --Hive 2.x 不需要这一步

在 Hive 0.x and 1.x 版本,必须使用设置 hive.enforce.bucketing = true,表示强制分桶,允许程序根据表结构自动选择正确数量的 Reducer 和 cluster by column 来进行分桶。

(2)CTAS 导入数据

1
INSERT INTO TABLE emp_bucket SELECT *  FROM emp;  --这里的 emp 表就是一张普通的雇员表

可以从执行日志看到 CTAS 触发 MapReduce 操作,且 Reducer 数量和建表时候指定 bucket 数量一致:

img

查看分桶文件

bucket(桶) 本质上就是表目录下的具体文件:

img

:::

【基础】分区和分桶可以一起使用吗?

:::details 要点

分区表和分桶表的本质都是将数据按照不同粒度进行拆分,从而使得在查询时候不必扫描全表,只需要扫描对应的分区或分桶,从而提升查询效率。两者可以结合起来使用,从而保证表数据在不同粒度上都能得到合理的拆分。下面是 Hive 官方给出的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE page_view_bucketed(
viewTime INT,
userid BIGINT,
page_url STRING,
referrer_url STRING,
ip STRING )
PARTITIONED BY(dt STRING)
CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
STORED AS SEQUENCEFILE;

此时导入数据时需要指定分区:

1
2
3
INSERT OVERWRITE page_view_bucketed
PARTITION (dt='2009-02-25')
SELECT * FROM page_view WHERE dt='2009-02-25';

:::

Hive 索引

【中级】Hive 的索引是如何工作的?

:::details 要点

Hive 在 0.7.0 引入了索引的功能,索引的设计目标是提高表某些列的查询速度。如果没有索引,带有谓词的查询(如’WHERE table1.column = 10’)会加载整个表或分区并处理所有行。但是如果 column 存在索引,则只需要加载和处理文件的一部分。

在指定列上建立索引,会产生一张索引表(表结构如下),里面的字段包括:索引列的值、该值对应的 HDFS 文件路径、该值在文件中的偏移量。在查询涉及到索引字段时,首先到索引表查找索引列值对应的 HDFS 文件路径及偏移量,这样就避免了全表扫描。

1
2
3
4
5
6
7
+--------------+----------------+----------+--+
| col_name | data_type | comment |
+--------------+----------------+----------+--+
| empno | int | 建立索引的列 |
| _bucketname | string | HDFS 文件路径 |
| _offsets | array<bigint> | 偏移量 |
+--------------+----------------+----------+--+

创建索引:

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE INDEX index_name     --索引名称
ON TABLE base_table_name (col_name, ...) --建立索引的列
AS index_type --索引类型
[WITH DEFERRED REBUILD] --重建索引
[IDXPROPERTIES (property_name=property_value, ...)] --索引额外属性
[IN TABLE index_table_name] --索引表的名字
[
[ ROW FORMAT ...] STORED AS ...
| STORED BY ...
] --索引表行分隔符 、 存储格式
[LOCATION hdfs_path] --索引表存储位置
[TBLPROPERTIES (...)] --索引表表属性
[COMMENT "index comment"]; --索引注释

查看索引:

1
2
--显示表上所有列的索引
SHOW FORMATTED INDEX ON table_name;

删除索引:

删除索引会删除对应的索引表。

1
DROP INDEX [IF EXISTS] index_name ON table_name;

如果存在索引的表被删除了,其对应的索引和索引表都会被删除。如果被索引表的某个分区被删除了,那么分区对应的分区索引也会被删除。

重建索引:

1
ALTER INDEX index_name ON table_name [PARTITION partition_spec] REBUILD;

重建索引。如果指定了 PARTITION,则仅重建该分区的索引。

:::

【中级】Hive 索引有什么缺陷?

:::details 要点

索引表最主要的一个缺陷在于:索引表无法自动 rebuild,这也就意味着如果表中有数据新增或删除,则必须手动 rebuild,重新执行 MapReduce 作业,生成索引表数据。

同时按照 官方文档 的说明,Hive 会从 3.0 开始移除索引功能,主要基于以下两个原因:

  • 具有自动重写的物化视图 (Materialized View) 可以产生与索引相似的效果(Hive 2.3.0 增加了对物化视图的支持,在 3.0 之后正式引入)。
  • 使用列式存储文件格式(Parquet,ORC)进行存储时,这些格式支持选择性扫描,可以跳过不需要的文件或块。

ORC 内置的索引功能可以参阅这篇文章:Hive 性能优化之 ORC 索引–Row Group Index vs Bloom Filter Index

:::

Hive 架构

【高级】Hive SQL 如何执行的?

:::details 要点

Hive 在执行一条 HQL 的时候,会经过以下步骤:

  1. 语法解析:Antlr 定义 SQL 的语法规则,完成 SQL 词法,语法解析,将 SQL 转化为抽象 语法树 AST Tree;
  2. 语义解析:遍历 AST Tree,抽象出查询的基本组成单元 QueryBlock;
  3. 生成逻辑执行计划:遍历 QueryBlock,翻译为执行操作树 OperatorTree;
  4. 优化逻辑执行计划:逻辑层优化器进行 OperatorTree 变换,合并不必要的 ReduceSinkOperator,减少 shuffle 数据量;
  5. 生成物理执行计划:遍历 OperatorTree,翻译为 MapReduce 任务;
  6. 优化物理执行计划:物理层优化器进行 MapReduce 任务的变换,生成最终的执行计划。

关于 Hive SQL 的详细执行流程可以参考美团技术团队的文章:Hive SQL 的编译过程

:::

参考资料

刷题

经典数据结构

数组

题目 难度 状态
1. 两数之和 简单 通过
167. 两数之和 II - 输入有序数组 中等 通过
剑指 Offer II 006. 排序数组中两个数字之和 简单 通过
剑指 Offer 57. 和为 s 的两个数字 简单 通过
136. 只出现一次的数字 简单 通过
217. 存在重复元素 简单 通过
2073. 买票需要的时间 简单 通过
26. 删除有序数组中的重复项 简单 未通过
27. 移除元素
283. 移动零
344. 反转字符串
5. 最长回文子串
263. 丑数 简单 未通过
264. 丑数 II 中等 未通过
1201. 丑数 III 中等 未通过
313. 超级丑数 中等 未通过
373. 查找和最小的 K 对数字

链表

题目 难度 通关
19. 删除链表的倒数第 N 个结点 中等 通过
21. 合并两个有序链表 简单 通过
23. 合并 K 个升序链表 困难 通过
83. 删除排序链表中的重复元素 简单 通过
82. 删除排序链表中的重复元素 II 中等 半通过
86. 分隔链表 简单 通过
876. 链表的中间结点 简单 通过
剑指 Offer 22. 链表中倒数第 k 个节点 简单 通过
141. 环形链表 简单 通过
142. 环形链表 II 中等 未通过
160. 相交链表 简单 半通过
1836. 从未排序的链表中移除重复元素 中等 半通过

栈和队列

题目 难度
20. 有效的括号 简单
225. 用队列实现栈 简单 通过
232. 用栈实现队列 简单 通过

解题套路

链表双指针技巧

LeetCode 力扣 难度
21. Merge Two Sorted Lists 21. 合并两个有序链表 🟢
86. Partition List 86. 分隔链表 🟠
23. Merge k Sorted Lists 23. 合并K个升序链表 🔴
141. Linked List Cycle 141. 环形链表 🟢
142. Linked List Cycle II 142. 环形链表 II 🟠
876. Middle of the Linked List 876. 链表的中间结点 🟢
19. Remove Nth Node From End of List 19. 删除链表的倒数第 N 个结点 🟠
160. Intersection of Two Linked Lists 160. 相交链表 🟢
264. Ugly Number II 264. 丑数 II 🟠
378. Kth Smallest Element in a Sorted Matrix 378. 有序矩阵中第 K 小的元素 🟠
373. Find K Pairs with Smallest Sums 373. 查找和最小的 K 对数字 🟠
82. Remove Duplicates from Sorted List II 82. 删除排序链表中的重复元素 II 🟠
2. Add Two Numbers 2. 两数相加 🟠
445. Add Two Numbers II 445. 两数相加 II 🟠

递归操作链表

LeetCode 力扣 难度
234. Palindrome Linked List 234. 回文链表 🟢
206. Reverse Linked List 206. 反转链表 🟢
92. Reverse Linked List II 92. 反转链表 II 🟠
25. Reverse Nodes in k-Group 25. K 个一组翻转链表 🔴

数组双指针技巧

LeetCode 力扣 难度
26. Remove Duplicates from Sorted Array 26. 删除有序数组中的重复项 🟢
83. Remove Duplicates from Sorted List 83. 删除排序链表中的重复元素 🟢
27. Remove Element 27. 移除元素 🟢
283. Move Zeroes 283. 移动零 🟢
167. Two Sum II - Input Array Is Sorted 167. 两数之和 II - 输入有序数组 🟠
344. Reverse String 344. 反转字符串 🟢
5. Longest Palindromic Substring 5. 最长回文子串 🟠
80. Remove Duplicates from Sorted Array II 80. 删除有序数组中的重复项 II 🟠
125. Valid Palindrome 125. 验证回文串 🟢
75. Sort Colors 75. 颜色分类 🟠
88. Merge Sorted Array 88. 合并两个有序数组 🟢
977. Squares of a Sorted Array 977. 有序数组的平方 🟢

二维数组操作技巧

LeetCode 力扣 难度
151. Reverse Words in a String 151. 反转字符串中的单词 🟠
48. Rotate Image 48. 旋转图像 🟠
54. Spiral Matrix 54. 螺旋矩阵 🟠
59. Spiral Matrix II 59. 螺旋矩阵 II 🟠
1329. Sort the Matrix Diagonally 1329. 将矩阵按对角线排序 🟠
1260. Shift 2D Grid 1260. 二维网格迁移 🟢
867. Transpose Matrix 867. 转置矩阵 🟢
14. Longest Common Prefix 14. 最长公共前缀 🟢

滑动窗口算法

LeetCode 力扣 难度
76. Minimum Window Substring 76. 最小覆盖子串 🔴
567. Permutation in String 567. 字符串的排列 🟠
438. Find All Anagrams in a String 438. 找到字符串中所有字母异位词 🟠
3. Longest Substring Without Repeating Characters 3. 无重复字符的最长子串 🟠
1658. Minimum Operations to Reduce X to Zero 1658. 将 x 减到 0 的最小操作数 🟠
713. Subarray Product Less Than K 713. 乘积小于 K 的子数组 🟠
219. Contains Duplicate II 219. 存在重复元素 II 🟢
220. Contains Duplicate III 220. 存在重复元素 III 🔴
209. Minimum Size Subarray Sum 209. 长度最小的子数组 🟠

二分搜索算法

LeetCode 力扣 难度
704. Binary Search 704. 二分查找 🟢
34. Find First and Last Position of Element in Sorted Array 34. 在排序数组中查找元素的第一个和最后一个位置 🟠
875. Koko Eating Bananas 875. 爱吃香蕉的珂珂 🟠
1011. Capacity To Ship Packages Within D Days 1011. 在 D 天内送达包裹的能力 🟠
410. Split Array Largest Sum 410. 分割数组的最大值 🔴

前缀和/差分技巧

LeetCode 力扣 难度
303. Range Sum Query - Immutable 303. 区域和检索 - 数组不可变 🟢
304. Range Sum Query 2D - Immutable 304. 二维区域和检索 - 矩阵不可变 🟠
1109. Corporate Flight Bookings 1109. 航班预订统计 🟠
1094. Car Pooling 1094. 拼车 🟠

LeetCode 力扣 难度
71. Simplify Path 71. 简化路径 🟠
143. Reorder List 143. 重排链表 🟠
20. Valid Parentheses 20. 有效的括号 🟢
150. Evaluate Reverse Polish Notation 150. 逆波兰表达式求值 🟠
225. Implement Stack using Queues 225. 用队列实现栈 🟢
388. Longest Absolute File Path 388. 文件的最长绝对路径 🟠

队列

LeetCode 力扣 难度
933. Number of Recent Calls 933. 最近的请求次数 🟢
622. Design Circular Queue 622. 设计循环队列 🟠
641. Design Circular Deque 641. 设计循环双端队列 🟠
1670. Design Front Middle Back Queue 1670. 设计前中后队列 🟠
2073. Time Needed to Buy Tickets 2073. 买票需要的时间 🟢

单调栈技巧

LeetCode 力扣 难度
1019. Next Greater Node In Linked List 1019. 链表中的下一个更大节点 🟠
1944. Number of Visible People in a Queue 1944. 队列中可以看到的人数 🔴
1475. Final Prices With a Special Discount in a Shop 1475. 商品折扣后的最终价格 🟢
901. Online Stock Span 901. 股票价格跨度 🟠
402. Remove K Digits 402. 移掉 K 位数字 🟠
853. Car Fleet 853. 车队 🟠
581. Shortest Unsorted Continuous Subarray 581. 最短无序连续子数组 🟠

单调队列技巧

LeetCode 力扣 难度
239. Sliding Window Maximum 239. 滑动窗口最大值 🔴
1438. Longest Continuous Subarray With Absolute Diff Less Than or Equal to Limit 1438. 绝对差不超过限制的最长连续子数组 🟠
862. Shortest Subarray with Sum at Least K 862. 和至少为 K 的最短子数组 🔴
918. Maximum Sum Circular Subarray 918. 环形子数组的最大和 🟠

二叉树

LeetCode 力扣 难度
226. Invert Binary Tree 226. 翻转二叉树 🟢
114. Flatten Binary Tree to Linked List 114. 二叉树展开为链表 🟠
116. Populating Next Right Pointers in Each Node 116. 填充每个节点的下一个右侧节点指针 🟠
654. Maximum Binary Tree 654. 最大二叉树 🟠
105. Construct Binary Tree from Preorder and Inorder Traversal 105. 从前序与中序遍历序列构造二叉树 🟠
106. Construct Binary Tree from Inorder and Postorder Traversal 106. 从中序与后序遍历序列构造二叉树 🟠
889. Construct Binary Tree from Preorder and Postorder Traversal 889. 根据前序和后序遍历构造二叉树 🟠
297. Serialize and Deserialize Binary Tree 297. 二叉树的序列化与反序列化 🔴
236. Lowest Common Ancestor of a Binary Tree 236. 二叉树的最近公共祖先 🟠
235. Lowest Common Ancestor of a Binary Search Tree 235. 二叉搜索树的最近公共祖先 🟠
222. Count Complete Tree Nodes 222. 完全二叉树的节点个数 🟠

二叉搜索树

LeetCode 力扣 难度
230. Kth Smallest Element in a BST 230. 二叉搜索树中第K小的元素 🟠
538. Convert BST to Greater Tree 538. 把二叉搜索树转换为累加树 🟠
1038. Binary Search Tree to Greater Sum Tree 1038. 从二叉搜索树到更大和树 🟠
450. Delete Node in a BST 450. 删除二叉搜索树中的节点 🟠
701. Insert into a Binary Search Tree 701. 二叉搜索树中的插入操作 🟠
700. Search in a Binary Search Tree 700. 二叉搜索树中的搜索 🟢
98. Validate Binary Search Tree 98. 验证二叉搜索树 🟠
96. Unique Binary Search Trees 96. 不同的二叉搜索树 🟠
95. Unique Binary Search Trees II 95. 不同的二叉搜索树 II 🟠

数据结构设计

LeetCode 力扣 难度
146. LRU Cache 146. LRU 缓存 🟠
460. LFU Cache 460. LFU 缓存 🔴
729. My Calendar I 729. 我的日程安排表 I 🟠
950. Reveal Cards In Increasing Order 950. 按递增顺序显示卡牌 🟠
1700. Number of Students Unable to Eat Lunch 1700. 无法吃午餐的学生数量 🟢
155. Min Stack 155. 最小栈 🟠
1670. Design Front Middle Back Queue 1670. 设计前中后队列 🟠
895. Maximum Frequency Stack 895. 最大频率栈 🔴
224. Basic Calculator 224. 基本计算器 🔴
227. Basic Calculator II 227. 基本计算器 II 🟠

图相关算法

LeetCode 力扣 难度
207. Course Schedule 207. 课程表 🟠
210. Course Schedule II 210. 课程表 II 🟠
990. Satisfiability of Equality Equations 990. 等式方程的可满足性 🟠
684. Redundant Connection 684. 冗余连接 🟠
1584. Min Cost to Connect All Points 1584. 连接所有点的最小费用 🟠
743. Network Delay Time 743. 网络延迟时间 🟠
1631. Path With Minimum Effort 1631. 最小体力消耗路径 🟠
1514. Path with Maximum Probability 1514. 概率最大的路径 🟠

DFS/回溯算法

LeetCode 力扣 难度
78. Subsets 78. 子集 🟠
90. Subsets II 90. 子集 II 🟠
77. Combinations 77. 组合 🟠
39. Combination Sum 39. 组合总和 🟠
40. Combination Sum II 40. 组合总和 II 🟠
216. Combination Sum III 216. 组合总和 III 🟠
46. Permutations 46. 全排列 🟠
47. Permutations II 47. 全排列 II 🟠
37. Sudoku Solver 37. 解数独 🔴
51. N-Queens 51. N 皇后 🔴
52. N-Queens II 52. N皇后 II 🔴
200. Number of Islands 200. 岛屿数量 🟠
1254. Number of Closed Islands 1254. 统计封闭岛屿的数目 🟠
695. Max Area of Island 695. 岛屿的最大面积 🟠
1905. Count Sub Islands 1905. 统计子岛屿 🟠
967. Numbers With Same Consecutive Differences 967. 连续差相同的数字 🟠
491. Non-decreasing Subsequences 491. 递增子序列 🟠
980. Unique Paths III 980. 不同路径 III 🔴
131. Palindrome Partitioning 131. 分割回文串 🟠
93. Restore IP Addresses 93. 复原 IP 地址 🟠
17. Letter Combinations of a Phone Number 17. 电话号码的字母组合 🟠
79. Word Search 79. 单词搜索 🟠

BFS 算法

LeetCode 力扣 难度
752. Open the Lock 752. 打开转盘锁 🟠
773. Sliding Puzzle 773. 滑动谜题 🔴
919. Complete Binary Tree Inserter 919. 完全二叉树插入器 🟠
841. Keys and Rooms 841. 钥匙和房间 🟠
433. Minimum Genetic Mutation 433. 最小基因变化 🟠
1926. Nearest Exit from Entrance in Maze 1926. 迷宫中离入口最近的出口 🟠
1091. Shortest Path in Binary Matrix 1091. 二进制矩阵中的最短路径 🟠
994. Rotting Oranges 994. 腐烂的橘子 🟠
721. Accounts Merge 721. 账户合并 🟠
127. Word Ladder 127. 单词接龙 🔴
365. Water and Jug Problem 365. 水壶问题 🟠

动态规划

LeetCode 力扣 难度
509. Fibonacci Number 509. 斐波那契数 🟢
322. Coin Change 322. 零钱兑换 🟠
300. Longest Increasing Subsequence 300. 最长递增子序列 🟠
354. Russian Doll Envelopes 354. 俄罗斯套娃信封问题 🔴
72. Edit Distance 72. 编辑距离 🔴
53. Maximum Subarray 53. 最大子数组和 🟠
1143. Longest Common Subsequence 1143. 最长公共子序列 🟠
583. Delete Operation for Two Strings 583. 两个字符串的删除操作 🟠
712. Minimum ASCII Delete Sum for Two Strings 712. 两个字符串的最小ASCII删除和 🟠
416. Partition Equal Subset Sum 416. 分割等和子集 🟠
518. Coin Change II 518. 零钱兑换 II 🟠

贪心算法

LeetCode 力扣 难度
55. Jump Game 55. 跳跃游戏 🟠
45. Jump Game II 45. 跳跃游戏 II 🟠
134. Gas Station 134. 加油站 🟠

分治算法

LeetCode 力扣 难度
23. Merge k Sorted Lists 23. 合并K个升序链表 🔴
241. Different Ways to Add Parentheses 241. 为运算表达式设计优先级 🟠

数学算法

LeetCode 力扣 难度
292. Nim Game 292. Nim 游戏 🟢
877. Stone Game 877. 石子游戏 🟠
319. Bulb Switcher 319. 灯泡开关 🟠
382. Linked List Random Node 382. 链表随机节点 🟠
398. Random Pick Index 398. 随机数索引 🟠
384. Shuffle an Array 384. 打乱数组 🟠
204. Count Primes 204. 计数质数 🟠
372. Super Pow 372. 超级次方 🟠

其他经典面试题

LeetCode 力扣 难度
42. Trapping Rain Water 42. 接雨水 🔴
11. Container With Most Water 11. 盛最多水的容器 🟠
263. Ugly Number 263. 丑数 🟢
264. Ugly Number II 264. 丑数 II 🟠
1201. Ugly Number III 1201. 丑数 III 🟠
313. Super Ugly Number 313. 超级丑数 🟠
528. Random Pick with Weight 528. 按权重随机选择 🟠
1. Two Sum 1. 两数之和 🟢
167. Two Sum II - Input Array Is Sorted 167. 两数之和 II - 输入有序数组 🟠
15. 3Sum 15. 三数之和 🟠
18. 4Sum 18. 四数之和 🟠

《Kafka 核心技术与实战》笔记

开篇词 为什么要学习 Kafka?

消息引擎系统 ABC

消息引擎系统的作用:

  • 消息引擎传输的对象是消息;
  • 如何传输消息属于消息引擎设计机制的一部分。

设计消息引擎系统的关键点:

  • 序列化 - 决定了在网络中传输数据的形式。
    • 代表:CSV、XML、JSON、Protocol Buffer、Thrift。
    • kafka 默认使用纯二进制的字节序列。
  • 传输模型:Kafka 同时支持以下两种模型
    • 点对点模型
    • 发布/订阅模型

消息引擎的作用:

  • 异步处理
  • 削峰填谷
  • 系统解耦
  • 系统间通信
  • 数据缓冲
  • 最终一致性

一篇文章带你快速搞定 Kafka 术语

Kafka 术语:

  • 消息 - Record。Kafka 是消息引擎嘛,这里的消息就是指 Kafka 处理的主要对象。
  • 主题 - Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
  • 分区 - Partition。一个有序不变的消息序列。每个主题下可以有多个分区。
  • 消息位移 - Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
  • 副本 - Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
  • 生产者 - Producer。向主题发布新消息的应用程序。
  • 消费者 - Consumer。从主题订阅新消息的应用程序。
  • 消费者位移 - Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。
  • 消费者组 - Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
  • 分区再均衡 - Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

Kafka 的三层消息架构:

  • 第一层是主题层,每个主题可以配置 M 个分区,而每个分区又可以配置 N 个副本。
  • 第二层是分区层,每个分区的 N 个副本中只能有一个充当领导者角色,对外提供服务;其他 N-1 个副本是追随者副本,只是提供数据冗余之用。
  • 第三层是消息层,分区中包含若干条消息,每条消息的位移从 0 开始,依次递增。
  • 最后,客户端程序只能与分区的领导者副本进行交互。

Kafka 只是消息引擎系统吗?

Kafka 在设计之初就旨在提供三个方面的特性:

  • 提供一套 API 实现生产者和消费者;
  • 降低网络传输和磁盘存储开销;
  • 实现高伸缩性架构。

作为流处理平台,Kafka 与其他主流大数据流式计算框架相比,优势在哪里呢?

  • 更容易实现端到端的正确性(Correctness) - 因为所有的数据流转和计算都在 Kafka 内部完成,故 Kafka 可以实现端到端的精确一次处理语义。
  • Kafka 自己对于流式计算的定位 - 官网上明确标识 Kafka Streams 是一个用于搭建实时流处理的客户端库而非是一个完整的功能系统。

我应该选择哪种 Kafka?

  • Apache Kafka,也称社区版 Kafka。优势在于迭代速度快,社区响应度高,使用它可以让你有更高的把控度;缺陷在于仅提供基础核心组件,缺失一些高级的特性。
  • Confluent Kafka,Confluent 公司提供的 Kafka。优势在于集成了很多高级特性且由 Kafka 原班人马打造,质量上有保证;缺陷在于相关文档资料不全,普及率较低,没有太多可供参考的范例。
  • CDH/HDP Kafka,大数据云公司提供的 Kafka,内嵌 Apache Kafka。优势在于操作简单,节省运维成本;缺陷在于把控度低,演进速度较慢。

聊聊 Kafka 的版本号

Kafka 有以下重大版本:

  • 0.7 - 只提供了最基础的消息队列功能
  • 0.8
    • 正式引入了副本机制
    • 至少升级到 0.8.2.2
  • 0.9
    • 增加了基础的安全认证 / 权限功能
    • 用 Java 重写了新版本消费者 API
    • 引入了 Kafka Connect 组件
    • 新版本 Producer API 在这个版本中算比较稳定
  • 0.10
    • 引入了 Kafka Streams,正式升级成分布式流处理平台
    • 至少升级到 0.10.2.2
    • 修复了一个可能导致 Producer 性能降低的 Bug
  • 0.11
    • 提供幂等性 Producer API 以及事务
    • 对 Kafka 消息格式做了重构
    • 至少升级到 0.11.0.3
  • 1.0 和 2.0 - Kafka Streams 的改进

Kafka 线上集群部署方案怎么做?

系统

在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的快速数据传输特性。

磁盘

使用机械磁盘完全能够胜任 Kafka 线上环境。

磁盘容量

假设你所在公司有个业务每天需要向 Kafka 集群发送 1 亿条消息,每条消息保存两份以防止数据丢失,另外消息默认保存两周时间。现在假设消息的平均大小是 1KB,那么你能说出你的 Kafka 集群需要为这个业务预留多少磁盘空间吗?

我们来计算一下:每天 1 亿条 1KB 大小的消息,保存两份且留存两周的时间,那么总的空间大小就等于1 亿 * 1KB * 2 / 1000 / 1000 = 200GB。一般情况下 Kafka 集群除了消息数据还有其他类型的数据,比如索引数据等,故我们再为这些数据预留出 10%的磁盘空间,因此总的存储容量就是 220GB。既然要保存两周,那么整体容量即为 220GB * 14,大约 3TB 左右。Kafka 支持数据的压缩,假设压缩比是 0.75,那么最后你需要规划的存储空间就是 0.75 * 3 = 2.25TB

总之在规划磁盘容量时你需要考虑下面这几个元素:

  • 新增消息数
  • 消息留存时间
  • 平均消息大小
  • 备份数
  • 是否启用压缩

带宽

通常使用的都是普通的以太网络,带宽也主要有两种:1Gbps 的千兆网络和 10Gbps 的万兆网络。

假设你公司的机房环境是千兆网络,即 1Gbps,现在你有个业务,其业务目标或 SLA 是在 1 小时内处理 1TB 的业务数据。那么问题来了,你到底需要多少台 Kafka 服务器来完成这个业务呢?

让我们来计算一下,由于带宽是 1Gbps,即每秒处理 1Gb 的数据,假设每台 Kafka 服务器都是安装在专属的机器上,也就是说每台 Kafka 机器上没有混部其他服务,毕竟真实环境中不建议这么做。通常情况下你只能假设 Kafka 会用到 70%的带宽资源,因为总要为其他应用或进程留一些资源。

根据实际使用经验,超过 70%的阈值就有网络丢包的可能性了,故 70%的设定是一个比较合理的值,也就是说单台 Kafka 服务器最多也就能使用大约 700Mb 的带宽资源。

稍等,这只是它能使用的最大带宽资源,你不能让 Kafka 服务器常规性使用这么多资源,故通常要再额外预留出 2/3 的资源,即单台服务器使用带宽 700Mb / 3 ≈ 240Mbps。需要提示的是,这里的 2/3 其实是相当保守的,你可以结合你自己机器的使用情况酌情减少此值。

好了,有了 240Mbps,我们就可以计算 1 小时内处理 1TB 数据所需的服务器数量了。根据这个目标,我们每秒需要处理 2336Mb 的数据,除以 240,约等于 10 台服务器。如果消息还需要额外复制两份,那么总的服务器台数还要乘以 3,即 30 台。

最最最重要的集群参数配置(上)

与存储信息相关的参数

  • log.dirs:这是非常重要的参数,指定了 Broker 需要使用的若干个文件目录路径。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定。
  • log.dir:注意这是 dir,结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的。

只要设置log.dirs,即第一个参数就好了,不要设置log.dir。而且更重要的是,在线上生产环境中一定要为log.dirs配置多个路径,具体格式是一个 CSV 格式,也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3这样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。这样做有两个好处:

  • 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
  • 能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新引入的强大功能。要知道在以前,只要 Kafka Broker 使用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。但是自 1.1 开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。还记得上一期我们关于 Kafka 是否需要使用 RAID 的讨论吗?这个改进正是我们舍弃 RAID 方案的基础:没有这种 Failover 的话,我们只能依靠 RAID 来提供保障。

与 ZooKeeper 相关的参数

zookeeper.connect。这也是一个 CSV 格式的参数,比如我可以指定它的值为zk1:2181,zk2:2181,zk3:2181。2181 是 ZooKeeper 的默认端口。

如果我让多个 Kafka 集群使用同一套 ZooKeeper 集群,那么这个参数应该怎么设置呢?这时候 chroot 就派上用场了。这个 chroot 是 ZooKeeper 的概念,类似于别名。

如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2,那么两套集群的zookeeper.connect参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1zk1:2181,zk2:2181,zk3:2181/kafka2。切记 chroot 只需要写一次,而且是加到最后的。我经常碰到有人这样指定:zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3,这样的格式是不对的。

与 Broker 连接相关的参数

  • listeners:学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。
  • advertised.listeners:和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。
  • host.name/port:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。

关于 Topic 管理的参数

  • auto.create.topics.enable:是否允许自动创建 Topic。
  • unclean.leader.election.enable:是否允许 Unclean Leader 选举。
  • auto.leader.rebalance.enable:是否允许定期进行 Leader 选举。

关于数据留存的参数

  • log.retention.{hours|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hours 最低。
  • log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。
  • message.max.bytes:控制 Broker 能够接收的最大消息大小。

最最最重要的集群参数配置(下)

Topic 级别参数

  • retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。
  • retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是-1,表示可以无限使用磁盘空间。

JVM 参数

  • KAFKA_HEAP_OPTS:指定堆大小。
  • KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数。

操作系统参数

  • 文件描述符限制 - 通常情况下将它设置成一个超大的值是合理的做法,比如ulimit -n 1000000
  • 文件系统类型 - 生产环境最好还是使用 XFS
  • Swappiness - 建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1。
  • 提交时间

生产者消息分区机制原理剖析

Kafka 的消息组织方式实际上是三级结构:主题-分区-消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。

分区是实现负载均衡以及高吞吐量的关键。

所谓分区策略,就是决定生产者将消息发送到哪个分区的算法。Kafka 提供了默认的分区策略,同时也支持自定义分区策略。

生产者压缩算法面面观

压缩秉承了用时间去换空间的思想。具体来说,就是用 CPU 时间去换磁盘空间或网络 I/O 传输量,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。

Kafka 压缩、解压流程:Producer 端压缩、Broker 端保持、Consumer 端解压缩

每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证。

让 Broker 重新压缩消息的 2 种例外:Broker 端指定了和 Producer 端不同的压缩算法;Broker 发生了消息格式转换。

在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4。从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd)。

对于 Kafka 而言,它们的性能测试结果却出奇得一致:

  • 在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP
  • 在压缩比方面,zstd > LZ4 > GZIP > Snappy

无消息丢失配置怎么实现?

Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。

  • 生产阶段使用异步回调方式发送消息,业务侧做好对于发送失败的容错处理。
    • 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
    • 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
  • 存储阶段需要保证写入数据同步副本,以及可靠的故障恢复。
    • 设置 acks = allacks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
    • 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
    • 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
    • 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
    • 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1
  • 消费阶段确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

客户端都有哪些不常见但是很高级的功能?

拦截器基本思想就是允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑。

Kafka 拦截器分为生产者拦截器和消费者拦截器。生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。指定拦截器类时要指定它们的全限定名

Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景

Java 生产者是如何管理 TCP 连接的?

开发客户端时,能够利用 TCP 本身提供的一些高级功能,比如多路复用请求以及同时轮询多个连接的能力。

对最新版本的 Kafka(2.1.0)而言,Java Producer 端管理 TCP 连接的方式是:

  1. KafkaProducer 实例创建时启动 Sender 线程,从而创建与 bootstrap.servers 中所有 Broker 的 TCP 连接。
    • 不需要把集群中所有的 Broker 信息都配置到 bootstrap.servers 中,通常你指定 3~4 台就足以了。因为 Producer 一旦连接到集群中的任一台 Broker,就能拿到整个集群的 Broker 信息,故没必要为 bootstrap.servers 指定所有的 Broker。
  2. TCP 连接还可能在两个地方被创建:一个是在更新元数据后,另一个是在消息发送时
    1. KafkaProducer 实例首次更新元数据信息之后,还会再次创建与集群中所有 Broker 的 TCP 连接。
    2. 如果 Producer 端发送消息到某台 Broker 时发现没有与该 Broker 的 TCP 连接,那么也会立即创建连接。
  3. Producer 端关闭 TCP 连接的方式有两种:一种是用户主动关闭;一种是 Kafka 自动关闭。如果设置 Producer 端 connections.max.idle.ms 参数大于 0,则步骤 1 中创建的 TCP 连接会被自动关闭;如果设置该参数=-1,那么步骤 1 中创建的 TCP 连接将无法被关闭,从而成为“僵尸”连接。

幂等生产者和事务生产者是一回事吗?

消息可靠性保证有以下几种:

  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

大部分 MQ 都支持 at least once,要实现 exactly once,需要消费方保证,通常是通过幂等性设计来实现。

Kafka 也提供了一些相关的功能:

幂等性 Producer 只能保证单分区上的幂等性,同时也只能实现单会话上的幂等性。

事务型 Producer 能够保证将消息原子性地写入到多个分区中,而且不惧进程的重启。

消费者组到底是什么?

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制

Consumer Group 特性:

  • Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。
  • Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
  • Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。

Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引擎系统的两大模型:如果所有实例都属于同一个 Group,那么它实现的就是消息队列模型;如果所有实例分别属于不同的 Group,那么它实现的就是发布/订阅模型。

理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。

分区再均衡规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区

Rebalance 的触发条件:

  1. 组成员数发生变更。
  2. 订阅主题数发生变更。
  3. 订阅主题的分区数发生变更。

Rebalance 的问题:

  • 在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。
  • Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。
  • Rebalance 实在是太慢了。

最好的解决方案就是避免 Rebalance 的发生吧。

揭开神秘的“位移主题”面纱

consumer_offsets 在 Kafka 源码中有个更为正式的名字,叫位移主题,即 Offsets Topic。

老版本 Consumer 的位移管理是依托于 Apache ZooKeeper 的,它会自动或手动地将位移数据提交到 ZooKeeper 中保存。当 Consumer 重启后,它能自动从 ZooKeeper 中读取位移数据,从而在上次消费截止的地方继续消费。这种设计使得 Kafka Broker 不需要保存位移数据,减少了 Broker 端需要持有的状态空间,因而有利于实现高伸缩性。但是,ZooKeeper 其实并不适用于这种高频的写操作

新版本 Consumer 的位移管理机制其实也很简单,就是将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 consumer_offsets 中。可以这么说,consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。它要求这个提交过程不仅要实现高持久性,还要支持高频的写操作。显然,Kafka 的主题设计天然就满足这两个条件,因此,使用 Kafka 主题来保存位移这件事情,实际上就是一个水到渠成的想法了。

虽说位移主题是一个普通的 Kafka 主题,但它的消息格式却是 Kafka 自己定义的,不能随意地向这个主题写消息。

当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题

Kafka 使用** Compact 策略**来删除位移主题中的过期消息,避免该主题无限期膨胀。

消费者组重平衡能避免吗?

Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程。在 Rebalance 过程中,所有 Consumer 实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配。

Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的

  • 设置 session.timeout.ms = 6s。
  • 设置 heartbeat.interval.ms = 2s。
  • 要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

第二类非必要 Rebalance 是 Consumer 消费时间过长导致的

max.poll.interval.ms 参数值要大于下游最大处理时间。

Kafka 中位移提交那些事儿

Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即** Consumer 需要为分配给它的每个分区提交各自的位移数据**。

位移提交分为自动提交和手动提交,而手动提交又分为同步提交和异步提交。

CommitFailedException 异常怎么处理?

CommitFailedException,就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常

CommitFailedException 最常见的场景:当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常。

多线程开发消费者实例

消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。如下图所示:

消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:

方案对比:

Java 消费者是如何管理 TCP 连接的

和生产者不同的是,构建 KafkaConsumer 实例时是不会创建任何 TCP 连接的

TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的。再细粒度地说,在 poll 方法内部有 3 个时机可以创建 TCP 连接。

  • 发起 FindCoordinator 请求时
  • 连接协调者时
  • 消费数据时

消费者程序会创建 3 类 TCP 连接:

  • 确定协调者和获取集群元数据
  • 连接协调者,令其执行组成员管理操作
  • 执行实际的消息获取

消费者组消费进度监控都怎么实现?

对于 Kafka 消费者来说,最重要的事情就是监控它们的消费进度了,或者说是监控它们消费的滞后程度。所谓滞后程度,就是指消费者当前落后于生产者的程度

监控消费者组以及独立消费者程序消费进度的 3 种方法:

  1. 使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本
  2. 使用 Kafka Java Consumer API 编程
  3. 使用 Kafka 自带的 JMX 监控指标

Kafka 副本机制详解

副本

副本机制好处:

  1. 提供数据冗余。即使系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性。
  2. 提供高伸缩性。支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量。
  3. 改善数据局部性。允许将数据放入与用户地理位置相近的地方,从而降低系统延时。

所谓副本(Replica),本质就是一个只能追加写消息的提交日志

基于领导者的副本机制

在 Kafka 中,副本分成两类:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。

In-sync Replicas(ISR)

追随者副本不提供服务,只是定期地异步拉取领导者副本中的数据而已。

Kafka 引入了 In-sync Replicas(ISR)机制来明确追随者副本到底在什么条件下才算与 Leader 同步。

ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本

Broker 端参数 replica.lag.time.max.ms 用于配置 Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。这就是说,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。

Unclean 领导者选举(Unclean Leader Election)

因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举

开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。

Kafka 把所有不在 ISR 中的存活副本都称为非同步副本

请求是怎么被处理的?

Kafka 所有的请求都是通过 TCP 网络以 Socket 的方式进行通讯的。

Reactor 模式是事件驱动架构的一种实现方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景

Kafka 采用了类 Reactor 架构

Acceptor 线程采用轮询的方式将入站请求公平地发到所有网络线程中,因此,在实际使用过程中,这些线程通常都有相同的几率被分配到待处理请求。

当网络线程拿到请求后,将请求放入到一个共享请求队列中。Broker 端还有个 IO 线程池,负责从该队列中取出请求,执行真正的处理。如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。

Purgatory 是用来缓存延时请求(Delayed Request)的。所谓延时请求,就是那些一时未满足条件不能立刻处理的请求。比如设置了 acks=all 的 PRODUCE 请求,一旦设置了 acks=all,那么该请求就必须等待 ISR 中所有副本都接收了消息后才能返回,此时处理该请求的 IO 线程就必须等待其他 Broker 的写入结果。当请求不能立刻处理时,它就会暂存在 Purgatory 中。稍后一旦满足了完成条件,IO 线程会继续处理该请求,并将 Response 放入对应网络线程的响应队列中。

消费者组重平衡全流程解析

重平衡的 3 个触发条件:

  1. 组成员数量发生变化。
  2. 订阅主题数量发生变化。
  3. 订阅主题的分区数发生变化。

消费者端重平衡流程:

Rebalance 是通过消费者群组中的称为“群主”消费者客户端进行的**。

(1)选择群主

当消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获取群组的活跃成员列表,并负责给每一个消费者分配分区

所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。具体来讲,Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。

(2)消费者通过向被指派为群组协调器(Coordinator)的 Broker 定期发送心跳来维持它们和群组的从属关系以及它们对分区的所有权。

(3)群主从群组协调器获取群组成员列表,然后给每一个消费者进行分配分区 Partition。有两种分配策略:Range 和 RoundRobin。

  • Range 策略,就是把若干个连续的分区分配给消费者,如存在分区 1-5,假设有 3 个消费者,则消费者 1 负责分区 1-2, 消费者 2 负责分区 3-4,消费者 3 负责分区 5。
  • RoundRoin 策略,就是把所有分区逐个分给消费者,如存在分区 1-5,假设有 3 个消费者,则分区 1->消费 1,分区 2->消费者 2,分区 3>消费者 3,分区 4>消费者 1,分区 5->消费者 2。

(4)群主分配完成之后,把分配情况发送给群组协调器。

(5)群组协调器再把这些信息发送给消费者。每个消费者只能看到自己的分配信息,只有群主知道所有消费者的分配信息

你一定不能错过的 Kafka 控制器

控制器组件(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群。每台 Broker 都能充当控制器,第一个成功创建 /controller 节点的 Broker 会被指定为控制器

ZooKeeper 是一个提供高可靠性的分布式协调服务框架。ZooKeeper 常被用来实现集群成员管理、分布式锁、领导者选举等功能。Kafka 控制器大量使用 Watch 功能实现对集群的协调管理。

下图展示了 Kafka 在 ZooKeeper 中创建的 znode 分布:

控制器的职责:

  • 主题管理(创建、删除、增加分区)
  • 分区重分配
  • Preferred 领导者选举
  • 集群成员管理(新增 Broker、Broker 主动关闭、Broker 宕机)
  • 数据服务

控制器保存的数据:

控制器故障转移

故障转移指的是,当运行中的控制器突然宕机或意外终止时,Kafka 能够快速地感知到,并立即启用备用控制器来代替之前失败的控制器。这个过程就被称为 Failover,该过程是自动完成的,无需你手动干预。

关于高水位和 Leader Epoch 的讨论

水位一词多用于流式处理领域,比如,Spark Streaming 或 Flink 框架中都有水位的概念。

水位是一个单调增加且表征最早未完成工作(oldest work not yet completed)的时间戳。

高水位的作用

在 Kafka 中,高水位的作用主要有 2 个。

  • 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。
    • 在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息。消费者只能消费已提交消息。
    • 同一个副本对象,其高水位值不会大于 LEO 值
    • 分区的高水位就是其 Leader 副本的高水位
  • 帮助 Kafka 完成副本同步。

高水位更新机制

Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上仅仅保存了该分区的某个 Follower 副本。Kafka 把 Broker 0 上保存的这些 Follower 副本又称为远程副本(Remote Replica)。Kafka 副本机制在运行过程中,会更新 Broker 1 上 Follower 副本的高水位和 LEO 值,同时也会更新 Broker 0 上 Leader 副本的高水位和 LEO 以及所有远程副本的 LEO,但它不会更新远程副本的高水位值,也就是我在图中标记为灰色的部分。

为什么要在 Broker 0 上保存这些远程副本呢?其实,它们的主要作用是,帮助 Leader 副本确定其高水位,也就是分区高水位

副本同步机制解析

首先是初始状态。下面这张图中的 remote LEO 就是刚才的远程副本的 LEO 值。在初始状态时,所有值都是 0。

当生产者给主题分区发送一条消息后,状态变更为:

此时,Leader 副本成功将消息写入了本地磁盘,故 LEO 值被更新为 1。

Follower 再次尝试从 Leader 拉取消息。和之前不同的是,这次有消息可以拉取了,因此状态进一步变更为:

这时,Follower 副本也成功地更新 LEO 为 1。此时,Leader 和 Follower 副本的 LEO 都是 1,但各自的高水位依然是 0,还没有被更新。它们需要在下一轮的拉取中被更新,如下图所示:

在新一轮的拉取请求中,由于位移值是 0 的消息已经拉取成功,因此 Follower 副本这次请求拉取的是位移值=1 的消息。Leader 副本接收到此请求后,更新远程副本 LEO 为 1,然后更新 Leader 高水位为 1。做完这些之后,它会将当前已更新过的高水位值 1 发送给 Follower 副本。Follower 副本接收到以后,也将自己的高水位值更新成 1。至此,一次完整的消息同步周期就结束了。事实上,Kafka 就是利用这样的机制,实现了 Leader 和 Follower 副本之间的同步。

Leader Epoch

所谓 Leader Epoch,我们大致可以认为是 Leader 版本。它由两部分数据组成。

  1. Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
  2. 起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。

主题管理知多少

Kafka 提供了自带的 kafka-topics 脚本,用于帮助用户创建主题

特殊主题:

  • consumer_offsets
  • transaction_state

Kafka 动态配置了解下?

怎么重设消费者组位移?

常见工具脚本大汇总

KafkaAdminClient:Kafka 的运维利器

Kafka 认证机制用哪家?

云环境下的授权该怎么做?

跨集群备份解决方案 MirrorMaker

你应该怎么监控 Kafka?

主流的 Kafka 监控框架

调优 Kafka,你做到了吗?

从 0 搭建基于 Kafka 的企业级实时日志流处理平台

Kafka Streams 与其他流处理平台的差异在哪里?

Kafka Streams DSL 开发实例

Kafka Streams 在金融领域的应用

参考资料

Kafka 面试

Kafka 简介

【简单】Kafka 是什么?

Apache Kafka 是一款开源的消息引擎系统,也是一个分布式流计算平台,此外,还可以作为数据存储

img

【简单】Kafka 有哪些应用场景?

  • 消息队列:用作高吞吐量的消息系统,将消息从一个系统传递到另一个系统
  • 日志收集:集中收集日志数据,然后通过 Kafka 传递到实时监控系统或存储系统
  • 流计算:处理实时数据流,将数据传递给实时计算系统,如 Apache Storm 或 Apache Flink
  • 指标收集和监控:收集来自不同服务的监控指标,统一存储和处理
  • 事件溯源:记录事件发生的历史,以便稍后进行数据回溯或重新处理

【简单】Kafka 有哪些核心术语?

Kafka 的核心术语如下:

  • 消息 - Record。Kafka 是消息引擎嘛,这里的消息就是指 Kafka 处理的主要对象。
  • 主题 - Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
  • 分区 - Partition。一个有序不变的消息序列。每个主题下可以有多个分区。
  • 消息位移 - Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
  • 副本 - Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
  • 生产者 - Producer。向主题发布新消息的应用程序。
  • 消费者 - Consumer。从主题订阅新消息的应用程序。
  • 消费者位移 - Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。
  • 消费者组 - Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
  • 分区再均衡 - Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

Kafka 存储

【中等】Kafka 是如何存储数据的?

::: tip 关键点

  • 逻辑存储:Topic -> Partition -> Record
  • 物理存储:Log(对应 Partition) -> LogSegment(<offset>.log、.<offset>.index<offset>.timeindex<offset>.txnindex

:::

Kafka 逻辑存储

Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。

Kafka 的三层消息架构:

  • 第一层是主题层,每个主题可以配置 M 个分区,而每个分区又可以配置 N 个副本。
  • 第二层是分区层,每个分区的 N 个副本中只能有一个充当领导者角色,对外提供服务;其他 N-1 个副本是追随者副本,只是提供数据冗余之用。
  • 第三层是消息层,分区中包含若干条消息,每条消息的位移从 0 开始,依次递增。
  • 最后,客户端程序只能与分区的领导者副本进行交互。

在 Kafka 中,任意一个 Topic 维护了一组 Partition 日志,如下所示:

请注意:这里的主题只是一个逻辑上的抽象概念,实际上,Kafka 的基本存储单元是 Partition。Partition 无法在多个 Broker 间进行再细分,也无法在同一个 Broker 的多个磁盘上进行再细分。所以,分区的大小受到单个挂载点可用空间的限制。

Partiton 命名规则为 Topic 名称 + 有序序号,第一个 Partiton 序号从 0 开始,序号最大值为 Partition 数量减 1。

Kafka 物理存储

Log 是 Kafka 用于表示日志文件的组件。每个 Partiton 对应一个 Log 对象,在物理磁盘上则对应一个目录。如:创建一个双分区的主题 test,那么,Kafka 会在磁盘上创建两个子目录:test-0test-1;而在服务器端,这就对应两个 Log 对象。

因为在一个大文件中查找和删除消息是非常耗时且容易出错的。所以,Kafka 将每个 Partition 切割成若干个片段,即日志段(Log Segment)。默认每个 Segment 大小不超过 1G,且只包含 7 天的数据。如果 Segment 的消息量达到 1G,那么该 Segment 会关闭,同时打开一个新的 Segment 进行写入。

Broker 会为 Partition 里的每个 Segment 打开一个文件句柄(包括不活跃的 Segment),因此打开的文件句柄数通常会比较多,这个需要适度调整系统的进程文件句柄参数。正在写入的分片称为活跃片段(active segment),活跃片段永远不会被删除

Segment 文件命名规则:Partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。

Segment 文件可以分为两类:

  • 索引文件
    • 偏移量索引文件( .index
    • 时间戳索引文件( .timeindex
    • 已终止事务的索引文件(.txnindex):如果没有使用 Kafka 事务,则不会创建该文件
  • 日志数据文件(.log

【困难】Kafka 如何检索数据?

  • 动态消费起点
    • 支持从任意有效偏移量开始消费
  • 稀疏索引设计
    • 索引文件(.index)存储 offset→position 映射
    • 采用间隔存储(可配置index.interval.bytes
    • 每个条目包含:
      • 消息偏移量(offset)
      • 物理位置(position)
  • 索引自愈能力
    • 索引无校验和,损坏后自动重建
    • 删除索引文件安全(Kafka 自动重新生成)
  • 文件对应关系
    • 每个日志分段(Segment)对应:
    • 数据文件(.log
    • 索引文件(.index
    • 按起始偏移量命名(如 00000000000000368769.index

下面是 Kafka 中分段的日志数据文件和偏移量索引文件的对应映射关系图(其中也说明了如何按照起始偏移量来定位到日志数据文件中的具体消息)。

【困难】Kafka 如何清理数据?

日志分段结构

  • 干净段:这部分消息之前已经被清理过,每个键只存在一个值。
  • 污浊段:在上一次清理后写入的新消息。

img

如果 Kafka 启用了清理功能(通过 log.cleaner.enabled 配置),每个 Broker 启动清理管理线程 + N 个清理线程(按分区分配)

对于一个段,清理前后的效果如下:

img

Apache Kafka 清理数据主要通过 日志保留策略(Log Retention)压缩策略(Compaction) 实现,以下是核心要点概括:

基于时间的清理

  • 配置参数log.retention.hours(默认 168 小时/7 天)、log.retention.minuteslog.retention.ms
  • 机制:删除超过指定时间的旧日志段(log segments)。
  • 触发条件:由 broker 后台线程定期扫描(默认 5 分钟检查一次,通过log.retention.check.interval.ms调整)。

基于大小的清理

  • 配置参数log.retention.bytes(整个分区的最大字节数)、log.segment.bytes(单个日志段大小,默认 1GB)。
  • 机制:当分区总大小超过限制时,删除最旧的日志段。

日志压缩

  • 适用场景:保留每个 key 的最新值(适用于 key-value 数据,如数据库变更日志)。
  • 配置参数
    • cleanup.policy=compact(启用压缩)。
    • min.cleanable.dirty.ratio(控制压缩触发时机,默认 0.5)。
  • 机制
    1. 保留每个 key 的最后一条有效记录,删除旧版本。
    2. 周期性合并日志段(由log.cleaner线程执行)。

手动清理

  • 删除 Topickafka-topics.sh --delete --topic <topic_name>(需配置delete.topic.enable=true)。
  • 删除数据文件:直接删除日志目录(log.dirs)中的分区文件(需谨慎,可能导致数据不一致)。

关键注意事项

  • 清理延迟:实际清理可能因检查间隔或资源竞争延迟。
  • 磁盘空间监控:依赖清理可能不足,需监控磁盘使用率。
  • 压缩与保留策略冲突:若同时设置cleanup.policy=compact,delete,压缩优先于时间/大小删除。
  • 消费者偏移量影响:删除旧数据可能导致消费者无法回溯(需调整offsets.retention.minutes)。

生产者和消费者

【中等】Kafka 发送消息的工作流程是怎样的?

::: tip 关键点

  1. 序列化
  2. 选择分区
  3. 暂存缓冲区
  4. 批次传输

:::

Kafka 生产者用一个 ProducerRecord 对象来抽象一条要发送的消息, ProducerRecord 对象需要包含目标主题和要发送的内容,还可以指定键或分区。其发送消息流程如下:

(1)序列化 - 生产者要先把键和值序列化成字节数组,这样它们才能够在网络中传输。

(2)分区 - 数据被传给分区器。如果在 ProducerRecord 中已经指定了分区,那么分区器什么也不会做;否则,分区器会根据 ProducerRecord 的键来选择一个分区。选定分区后,生产者就知道该把消息发送给哪个主题的哪个分区。

(3)批次传输 - 接着,这条记录会被添加到一个记录批次中。这个批次中的所有消息都会被发送到相同的主题和分区上。有一个独立的线程负责将这些记录批次发送到相应 Broker 上。

  • 批次,就是一组消息,这些消息属于同一个主题和分区
  • 发送时,会把消息分成批次传输,如果每次只发送一个消息,会占用大量的网路开销。

(4)响应 - 服务器收到消息会返回一个响应。

  • 如果成功,则返回一个 RecordMetaData 对象,它包含了主题、分区、偏移量;
  • 如果失败,则返回一个错误。生产者在收到错误后,可以进行重试,重试次数可以在配置中指定。失败一定次数后,就返回错误消息。

img

生产者向 Broker 发送消息时是怎么确定向哪一个 Broker 发送消息?

  • 生产者会向任意 broker 发送一个元数据请求(MetadataRequest),获取到每一个分区对应的 Leader 信息,并缓存到本地。
  • 生产者在发送消息时,会指定 Partition 或者通过 key 得到到一个 Partition,然后根据 Partition 从缓存中获取相应的 Leader 信息。

img

【简单】Kafka 为什么要支持消费者群组?

::: tip 关键点

  • 消费者群组,以组为维度订阅 Topic,并分摊分区,以均衡负载。
  • 一个分区只能分配给消费者群组中的一个实例。
  • 消费者数量发生变化,或主题分区数发生变化时,会触发分区再均衡。

:::

消费者

每个 Consumer 的唯一元数据是该 Consumer 在日志中消费的位置。这个偏移量是由 Consumer 控制的:Consumer 通常会在读取记录时线性的增加其偏移量。但实际上,由于位置由 Consumer 控制,所以 Consumer 可以采用任何顺序来消费记录。

一条消息只有被提交,才会被消费者获取到。如下图,只能消费 Message0、Message1、Message2:

img

消费者群组

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制

Kafka 的写入数据量很庞大,如果只有一个消费者,消费消息速度很慢,时间长了,就会造成数据积压。为了减少数据积压,Kafka 支持消费者群组,可以让多个消费者并发消费消息,对数据进行分流。

Kafka 消费者从属于消费者群组,一个群组里的 Consumer 订阅同一个 Topic,一个主题有多个 Partition,每一个 Partition 只能隶属于消费者群组中的一个 Consumer

如果超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息。

同一时刻,一条消息只能被同一消费者组中的一个消费者实例消费

不同消费者群组之间互不影响

【中等】如何消费 Kafka 消息?

::: tip 关键点

  • 消费者群组订阅 Topic
  • 消费者轮批次拉取消息
  • 处理完消息后,提交偏移量(Offset)

:::

Kafka 消费者通过 pull 模式来获取消息,但是获取消息时并不是立刻返回结果,需要考虑两个因素:

  • 消费者通过 customer.poll(time) 中设置等待时间
  • Broker 会等待累计一定量数据,然后发送给消费者。这样可以减少网络开销。

pull 除了获取消息外,还有其他作用:

  • 发送心跳信息。消费者通过向被指派为群组协调器的 Broker 发送心跳来维护他和群组的从属关系,当机器宕掉后,群组协调器触发再均衡。

集群

【中等】什么是分区?为什么要分区?

::: tip 关键点

分区的作用就是提供负载均衡的能力,以实现系统的高伸缩性(Scalability)。

:::

Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。

在 Kafka 中,任意一个 Topic 维护了一组 Partition 日志,如下所示:

img

每个 Partition 都是一个单调递增的、不可变的日志记录,以不断追加的方式写入数据。Partition 中的每条记录会被分配一个单调递增的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的每条记录。

为什么 Kafka 的数据结构采用三级结构?

分区的作用就是提供负载均衡的能力,以实现系统的高伸缩性(Scalability)。

不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的机器节点来增加整体系统的吞吐量。

【中等】Kafka 的分区策略是怎样的?

::: tip 关键点

发送消息,未指定 key 时,选择分区采用轮询方式;指定 key 时,选择分区采用哈希方式,固定发往同一分区

:::

所谓分区策略是决定生产者将消息发送到哪个分区的算法,也就是负载均衡算法。

Kafka 生产者发送消息使用的对象 ProducerRecord ,可以选填 Partition 和 Key。不过,大多数应用会用到 key。key 有两个作用:作为消息的附加信息;也可以用来决定消息该被写到 Topic 的哪个 Partition,拥有相同 key 的消息将被写入同一个 Partition。

如果 ProducerRecord 指定了 Partition,则分区器什么也不做,否则分区器会根据 key 选择一个 Partition 。

  • 没有 key 时的分发逻辑:每隔 topic.metadata.refresh.interval.ms 的时间,轮询选择一个 partition。这个时间窗口内的所有记录发送到这个 partition。发送数据出错后会重新选择一个 partition。
  • 根据 key 分发:Kafka 的选择分区策略是:根据 key 求 hash 值,然后将 hash 值对 partition 数量求模。这里的关键点在于,同一个 key 总是被映射到同一个 Partition 上。所以,在选择分区时,Kafka 会使用 Topic 的所有 Partition ,而不仅仅是可用的 Partition。这意味着,如果写入数据的 Partition 是不可用的,那么就会出错

【中等】如何自定义分区策略?

如果 Kafka 的默认分区策略无法满足实际需要,可以自定义分区策略。需要显式地配置生产者端的参数 partitioner.class。这个参数该怎么设定呢?

首先,要实现 org.apache.kafka.clients.producer.Partitioner 接口。这个接口定义了两个方法:partitionclose,通常只需要实现最重要的 partition 方法。我们来看看这个方法的方法签名:

1
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

这里的 topickeykeyBytesvaluevalueBytes 都属于消息数据,cluster 则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。

接着,设置 partitioner.class 参数为自定义类的全限定名,那么生产者程序就会按照你的代码逻辑对消息进行分区。

负载均衡算法常见的有:

  • 随机算法
  • 轮询算法
  • 最小活跃数算法
  • 源地址哈希算法

可以根据实际需要去实现。

【困难】Kafka 如何实现分区再均衡?

什么是分区再均衡

分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为分区再均衡(Rebalance)Rebalance 实现了消费者群组的高可用性和伸缩性

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。

当在群组里面新增/移除消费者或者新增/移除 kafka 集群 broker 节点时,群组协调器 Broker 会触发再均衡,重新为每一个 Partition 分配消费者。Rebalance 期间,消费者无法读取消息,造成整个消费者群组一小段时间的不可用。

何时生分区再均衡

分区再均衡的触发时机有三种:

  • 消费者群组成员数发生变更。比如有新的 Consumer 加入群组或者离开群组,或者是有 Consumer 实例崩溃被“踢出”群组。
    • 新增消费者。consumer 订阅主题之后,第一次执行 poll 方法
    • 移除消费者。执行 consumer.close() 操作或者消费客户端宕机,就不再通过 poll 向群组协调器发送心跳了,当群组协调器检测次消费者没有心跳,就会触发再均衡。
  • 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
  • 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
    • 新增 broker。如重启 broker 节点
    • 移除 broker。如 kill 掉 broker 节点。

分区再均衡的过程

Rebalance 是通过消费者群组中的称为“群主”消费者客户端进行的

(1)选择群主

当消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获取群组的活跃成员列表,并负责给每一个消费者分配分区

所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。具体来讲,Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。

(2)消费者通过向被指派为群组协调器(Coordinator)的 Broker 定期发送心跳来维持它们和群组的从属关系以及它们对分区的所有权。

(3)群主从群组协调器获取群组成员列表,然后给每一个消费者进行分配分区 Partition。有两种分配策略:Range 和 RoundRobin。

  • Range 策略,就是把若干个连续的分区分配给消费者,如存在分区 1-5,假设有 3 个消费者,则消费者 1 负责分区 1-2, 消费者 2 负责分区 3-4,消费者 3 负责分区 5。
  • RoundRoin 策略,就是把所有分区逐个分给消费者,如存在分区 1-5,假设有 3 个消费者,则分区 1->消费 1,分区 2->消费者 2,分区 3>消费者 3,分区 4>消费者 1,分区 5->消费者 2。

(4)群主分配完成之后,把分配情况发送给群组协调器。

(5)群组协调器再把这些信息发送给消费者。每个消费者只能看到自己的分配信息,只有群主知道所有消费者的分配信息

如何判定消费者已经死亡

消费者通过向被指定为群组协调器的 Broker 发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者超时未发送心跳,会话就会过期,群组协调器认定它已经死亡,就会触发一次再均衡。

当一个消费者要离开群组时,会通知协调器,协调器会立即触发一次再均衡,尽量降低处理停顿。

查找协调者

所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。那么,Consumer Group 如何确定为它服务的 Coordinator 在哪台 Broker 上呢?答案就在我们之前说过的 Kafka 内部位移主题 __consumer_offsets 身上。

目前,Kafka 为某个 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 个步骤。

  1. 第 1 步:确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)

  2. 第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。

【困难】分区再均衡存在什么问题?如何避免分区再均衡?

分区再均衡的问题

  • 首先,Rebalance 过程对 Consumer Group 消费过程有极大的影响。在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。
  • 其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。
  • 最后,Rebalance 实在是太慢了。

避免分区再均衡

通过前文,我们已经知道了:分区再均衡的代价很高,应该尽量避免不必要的分区再均衡,以整体提高 Consumer 的吞吐量。

分区再均衡发生的时机有三个:

  • 消费群组成员数量发生变化
  • 订阅主题数量发生变化
  • 订阅主题的分区数发生变化

后面两个通常都是运维的主动操作,所以它们引发的 Rebalance 大都是不可避免的。实际上,大部分情况下,导致分区再均衡的原因是:消费群组成员数量发生变化。

有两种情况,消费者并没有宕机,但也被视为消亡:

  • 未及时发送心跳
  • Consumer 消费时间过长
未及时发送心跳

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。因此,需要合理设置会话超时时间。这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。

  • 设置 session.timeout.ms = 6s。
  • 设置 heartbeat.interval.ms = 2s。
  • 要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms

session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer。毕竟,我们还是希望能尽快揪出那些“尸位素餐”的 Consumer,早日把它们踢出 Group。希望这份配置能够较好地帮助你规避第一类“不必要”的 Rebalance。

Consumer 消费时间过长

第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。此时,**max.poll.interval.ms** 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。

GC 参数

如果你按照上面的推荐数值恰当地设置了这几个参数,却发现还是出现了 Rebalance,那么我建议你去排查一下 Consumer 端的 GC 表现,比如是否出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance。为什么特意说 GC?那是因为在实际场景中,我见过太多因为 GC 设置不合理导致程序频发 Full GC 而引发的非预期 Rebalance 了。

【困难】Kafka 中的分区分配策略有哪些?如何选择合适的策略?

Kafka 通过分区策略平衡吞吐量、延迟与稳定性。

Kafka 分区分配策略

  • Range(范围)

    • 原理:按分区范围顺序分配给消费者。
    • 适用场景:分区数与消费者数接近时。
    • 优势:实现简单,避免单消费者过载。
    • 劣势:分区/消费者数差异大时易负载不均。
  • RoundRobin(轮询)

    • 原理:均匀轮询分配分区。
    • 适用场景:分区和消费者数量均较大的场景。
    • 优势:负载均衡效果佳。
    • 劣势:消费者动态变化时易触发频繁重平衡,增加延迟。
  • Sticky(粘性)

    • 原理:优先保留原有分配,减少变动。
    • 适用场景:消费者频繁动态调整(如扩缩容)。
    • 优势:降低重平衡开销,提升稳定性。
    • 劣势:实现复杂,需调参优化。

策略选择关键考量因素

  • 集群规模:分区与消费者数量比例(接近选 Range,悬殊选 RoundRobin)。
  • 动态性需求:消费者频繁变动时,Sticky 策略更优。
  • 性能要求:低延迟优先:Sticky 减少重平衡;高吞吐优先:RoundRobin 均衡负载。

【困难】在 Kafka 中,如何优化分区的读写性能?有哪些常见的调优策略?

在 Kafka 中,优化分区的读写性能主要可以通过以下几种常见的调优策略实现:

  1. 合理设置分区数(partitions):根据生产者和消费者的能力,以及集群的规模,设置合适的分区数可以在提高写入和读取性能方面产生显著效果。
  2. 增加副本数(replication factor):副本数的增加可以提升数据的可靠性和读取性能,不过需要在性能和数据冗余之间找到平衡点。
  3. 调整 broker 配置参数:通过调优 Kafka broker 的相关配置,如调整 log.retention.hourslog.segment.byteslog.flush.interval.messages 等参数,可以显著提升读写性能。
  4. 调优生产者和消费者的配置:例如调整生产者的批量发送大小(batch.size)、压缩类型(compression.type)、消费者的最大拉取记录数(max.poll.records)等。
  5. 硬件配置优化:选择高 IOPS 的磁盘、足够的内存和计算资源来支撑 Kafka 的高并发读写请求。
  6. 分区和副本分布优化:确保不同主题的分区和副本分布在不同的 broker 上,以避免潜在的读写瓶颈。

【中等】Kafka 如何管理副本?

副本机制是分布式系统实现高可用的不二法门,Kafka 也不例外。

副本机制有哪些好处?

  1. 提供可用性:有句俗语叫:鸡蛋不要放在一个篮子里。副本机制也是一个道理——当部分节点宕机时,系统仍然可以依靠其他正常运转的节点,从整体上对外继续提供服务。
  2. 提供伸缩性:通过增加、减少机器可以控制系统整体的吞吐量。
  3. 改善数据局部性:允许将数据放入与用户地理位置相近的地方,从而降低系统延时。

但是,Kafka 只实现了第一个好处,原因后面会阐述。

  • 每个 Partition 都有一个 Leader,零个或多个 Follower。
  • Leader 处理一切对 Partition (分区)的读写请求;而 Follower 只需被动的同步 Leader 上的数据。
  • 同一个 Topic 的不同 Partition 会分布在多个 Broker 上,而且一个 Partition 还会在其他的 Broker 上面进行备份。

Kafka 副本角色

Kafka 使用 Topic 来组织数据,每个 Topic 被分为若干个 Partition,每个 Partition 有多个副本。每个 Broker 可以保存成百上千个属于不同 Topic 和 Partition 的副本。Kafka 副本的本质是一个只能追加写入的提交日志

Kafka 副本有两种角色:

  • Leader 副本(主):每个 Partition 都有且仅有一个 Leader 副本。为了保证数据一致性,Leader 处理一切对 Partition (分区)的读写请求
  • Follower 副本(从):Leader 副本以外的副本都是 Follower 副本。Follower 唯一的任务就是从 Leader 那里复制消息,保持与 Leader 一致的状态
  • 如果 Leader 宕机,其中一个 Follower 会被选举为新的 Leader。

为了与 Leader 保持同步,Follower 向 Leader 发起获取数据的请求,这种请求与消费者为了读取消息而发送的请求是一样的。请求消息里包含了 Follower 想要获取消息的偏移量,而这些偏移量总是有序的。

Leader 另一个任务是搞清楚哪个 Follower 的状态与自己是一致的。通过查看每个 Follower 请求的最新偏移量,Leader 就会知道每个 Follower 复制的进度。如果跟随者在 10s 内没有请求任何消息,或者虽然在请求消息,但是在 10s 内没有请求最新的数据,那么它就会被认为是不同步的。如果一个副本是不同步的,在 Leader 失效时,就不可能成为新的 Leader——毕竟它没有包含全部的消息。

除了当前首领之外,每个分区都有一个首选首领——创建 Topic 时选定的首领就是分区的首选首领。之所以叫首选 Leader,是因为在创建分区时,需要在 Broker 之间均衡 Leader。

ISR

ISR 即 In-sync Replicas,表示同步副本。Follower 副本不提供服务,只是定期地异步拉取领导者副本中的数据而已。既然是异步的,说明和 Leader 并非数据强一致性的。

判断 Follower 是否与 Leader 同步的标准

Kafka Broker 端参数 replica.lag.time.max.ms 参数,指定了 Follower 副本能够落后 Leader 副本的最长时间间隔,默认为 10s。这意味着:只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。

ISR 是一个动态调整的集合,会不断将同步副本加入集合,将不同步副本移除集合。Leader 副本天然就在 ISR 中。

Unclean 领导者选举

因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader。

Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举

开启 Unclean 领导者选举可能会造成数据丢失,但好处是:它使得 Partition Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。

高可用

【困难】Kafka 的高可用性是如何实现的?当 Broker 宕机时,如何保证服务不受影响?

  • 数据冗余:多副本存储,防止单点数据丢失。
  • 自动容灾:Leader 自动切换 + 分区再均衡,减少人工干预。
  • 灵活一致性:通过 ACK 和 ISR 机制适配不同业务场景(如高吞吐或强一致性)。

核心机制

  • 多副本机制

    • 每个分区(Partition)有多个副本,分布在不同的 Broker 上,确保数据冗余。
    • 副本分为 Leader(处理读写请求)和 Follower(同步数据)。
  • 主从架构

    • 生产者和消费者仅与 Leader 副本交互。
    • 当 Leader 宕机时,从 Follower 副本中选举新 Leader,保证服务连续性。
  • ZooKeeper 协调

    • 管理集群元数据(如 Broker 状态、分区 Leader 信息)。
    • 检测 Broker 故障并触发 Leader 选举。

故障恢复流程

  • 故障检测:ZooKeeper 发现 Broker 宕机。
  • Leader 选举:从 ISR(同步副本集)中选出新 Leader。
  • 分区再均衡:将宕机 Broker 的分区重新分配到其他可用 Broker。

支撑技术

  • ISR(In-Sync Replicas):仅与 Leader 保持同步的副本可参与 Leader 选举,确保数据一致性。
  • ACK 确认机制:生产者可配置不同级别的确认(如 01all),平衡吞吐量与数据可靠性。
  • 控制器(Controller):集群中一个 Broker 担任控制器,负责分区 Leader 选举和状态管理。控制器故障时,ZooKeeper 重新选举新控制器。
  • 惰性故障检测:避免短暂故障导致的频繁 Leader 切换,通过延迟判断减少集群波动。

【困难】Kafka 的优先副本选举机制是如何工作的?如何配置它?

优先副本选举是 Kafka 维持集群健康的核心机制,通过自动/手动结合的方式,确保 Leader 分布合理,兼顾负载均衡与数据可靠性。

优先副本(Preferred Replica) 是分区初始分配时的第一个副本(如分区P0的副本分配为[Broker1, Broker2, Broker3],则Broker1是优先副本)。它的作用是通过选举优先副本为 Leader,实现负载均衡,避免部分 Broker 长期承担过多 Leader 角色。

自动配置方法

server.properties 中配置以下参数:

1
2
3
4
# 启用自动优先副本选举
auto.leader.rebalance.enable=true
# 检查 Leader 不均衡的频率(默认 300 秒)
leader.imbalance.check.interval.seconds=300

生效条件:需重启 Kafka 集群。

手动触发命令

通过脚本强制触发选举:

1
bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181

适用场景:紧急负载均衡或维护后恢复预期状态。

核心价值

优势 说明
负载均衡 分散 Leader 压力,避免单节点过热。
高可用性 优先副本通常数据最新,故障切换时恢复更快。
性能优化 均衡的 Leader 分布可提升整体吞吐量。

【困难】在 Kafka 中,如何实现多集群的数据同步?跨集群复制的实现原理是什么?

MirrorMaker 核心功能

  • 作用:实现 Kafka 集群间的跨集群数据复制(源集群 → 目标集群)。
  • 原理:基于消费者-生产者模型
    • Consumer:从源集群拉取数据。
    • Producer:向目标集群推送数据。
  • 版本差异
    • MirrorMaker 1.0:基础复制功能,配置复杂。
    • MirrorMaker 2.0(Kafka 2.4+):支持多租户、动态拓扑、偏移量同步等高级特性。

快速部署步骤

  1. 准备集群:确保源集群和目标集群正常运行。
  2. 配置 MirrorMaker
  • **consumer.config**:指定源集群地址(bootstrap.servers)、消费者组(group.id)。
  • **producer.config**:指定目标集群地址(bootstrap.servers)。
  • **whitelist**:定义需复制的 Topic(支持正则,如 .* 表示全部)。
  1. 启动命令
    1
    2
    3
    4
    bin/kafka-mirror-maker.sh \
    --consumer.config consumer.config \
    --producer.config producer.config \
    --whitelist "your_topic"

关键特性与注意事项

特性 说明
数据一致性 保证消息顺序,但存在延迟(依赖网络带宽和集群负载)。
容错性 消费者组自动提交偏移量,故障恢复后可继续复制。
性能瓶颈 单线程设计(1.0 版本),高吞吐场景需横向扩展 MirrorMaker 实例。
监控指标 关注 consumer-lagproducer-throughput 等指标。

高级配置与优化

  • MirrorMaker 2.0 优势
    • 支持双向同步(Active-Active 架构)。
    • 自动同步 Topic 配置(如分区数、ACL)。
  • 调优建议
    • 增加 num.streams 参数提升并发度(1.0 版本)。
    • 使用 --abort.on.send.failure true 确保生产失败时快速终止。

替代工具对比

工具 适用场景 特点
Confluent Replicator 企业级需求(如 Schema 同步、监控集成)。 商业工具,功能全面,支持复杂拓扑。
uReplicator 高可用、低延迟场景(如 LinkedIn 生产)。 开源,支持 Controller 层优化,减少延迟。

常见问题解决

  • 数据延迟高
    • 检查网络带宽,增加 MirrorMaker 实例数。
    • 调整 fetch.min.bytes(消费者)和 linger.ms(生产者)。
  • Topic 配置不同步
    • MirrorMaker 2.0 可自动同步,1.0 需手动创建目标 Topic。

总结

  • 基础场景:MirrorMaker 1.0 适合简单单向同步。
  • 复杂需求:优先选择 MirrorMaker 2.0 或 Confluent Replicator。
  • 核心原则:监控延迟、保障网络稳定性、合理规划拓扑。

【困难】ZooKeeper 在 Kafka 中的作用是什么?

ZooKeeper 在 Kafka 中扮演着核心的协调者角色,主要负责集群的元数据管理、Broker 协调和状态维护。

Zookeeper 仍是 Kafka 2.8 之前版本的”大脑”,承担关键协调职能。KRaft 模式将成为标准架构,2023 年后新版本将默认启用。

Zookeeper 的核心作用

功能 说明
管理 Broker 元数据 维护 Broker 注册信息(在线/离线状态);Broker 的 ID、主机名、端口等元数据;Topic/Partition 元数据
Controller 选举 通过临时节点(Ephemeral ZNode)选举集群唯一 Controller,负责分区 Leader 选举
故障恢复 监测节点故障并触发分区 Leader 重选举
消费者组 Offset 旧版本(≤0.8)将消费者 Offset 存储在 Zookeeper,新版本改用内部 主题 _consumer_offsets
配置中心 存储 Kafka 配置和拓扑信息

Zookeeper 的局限性

  • 性能瓶颈:高频元数据操作(如分区重平衡)可能导致 Zookeeper 成为性能瓶颈。
  • 运维复杂度:需单独维护 Zookeeper 集群,增加运维负担。
  • 扩展性差:Zookeeper 的写性能随节点数增加而下降。

去 Zookeeper 化

  • 目标:用 Kafka 自身机制替代 Zookeeper,简化架构。
  • 实现方案
    • Raft 协议:通过内置的 Raft 共识算法管理元数据(类似 ZooKeeper 的 ZAB)。
    • 内部 Topic:将元数据存储在 Kafka 的 __cluster_metadata Topic 中,利用副本机制保证高可用。
  • 优势
    • 减少外部依赖,降低运维成本。
    • 提升元数据操作的吞吐量和延迟。

运维建议

  • Zookeeper 集群配置
    • 至少部署 3/5 个节点(容忍 1/2 个节点故障)。
    • 隔离 Zookeeper 与 Kafka 的磁盘 I/O,避免资源竞争。
  • 监控指标
    • Zookeeper 的 znode 数量、延迟(avg_latency)、活跃连接数。
    • Kafka Controller 的存活状态及切换频率。

总结

  • 现状:Zookeeper 仍是 Kafka 的核心依赖(3.x 版本),负责集群元数据管理。
  • 未来:KIP-500 将逐步移除 Zookeeper,采用自管理的 Raft 元数据服务。
  • 关键措施
    • 保障 Zookeeper 集群的高可用(奇数节点+分散部署)。
    • 关注 Kafka 新版本演进,规划架构升级。

【困难】Kafka 的 Controller Failover 是如何设计的?在 Controller 宕机时如何进行故障恢复?

Kafka 的 Controller 是集群中负责管理各种元数据(如主题创建、分区分配、副本分配等)以及协调领导者选举的关键组件。Controller Failover 是 Kafka 保证高可用性的重要机制。具体来讲,当 Controller 宕机时,Kafka 会通过 Zookeeper 选举出一个新的 Controller,以确保集群可以继续正常运行。

以下是 Kafka Controller Failover 的主要设计和流程:

  1. Zookeeper 作为协调者:每个 Kafka Broker 启动时都会尝试在 Zookeeper 中创建一个特殊的节点(/controller)。因为这个节点使用的是 Ephemeral(临时)节点类型,当创建该节点的 Broker 宕机时,这个节点会自动删除。
  2. 竞争成为 Controller:一旦当前的 Controller 宕机,所有活着的 Broker 都会尝试在 Zookeeper 中创建 /controller 节点。第一个成功创建这个节点的 Broker 会成为新的 Controller,剩下的则会收到失败通知。
  3. 通知机制:新的 Controller 会在 Zookeeper 中写入它的选举结果,并通过监听机制通知所有 Broker。这些 Broker 会更新它们本地的 Controller 缓存,从而指向新的 Controller。
  4. 恢复任务:新当选的 Controller 需要快速完成集群状态的接管,包括重新分配分区副本、添加主题、调整副本同步等等。这些操作通过监听 Zookeeper 节点和操作 Kafka 内部 Topic(如__consumer_offsets)完成。

【困难】Kafka 中的 Controller 是什么角色?它在集群中的作用是什么?

Kafka 中的 Controller 是整个集群的协调者,它是专门负责监控和管理 Kafka 集群中分区(partition)和副本(replica)状态的节点。在整个 Kafka 集群中,Controller 的角色是至关重要的,它帮助集群维持稳定,确保分区和副本的可用性和一致性。

Controller 在集群中的主要作用包括:

  • Leader 选举:确定哪个副本成为分区的 Leader 来处理读写请求。
  • 副本管理:监控和管理副本的状态,确保同步副本集(ISR)的健康状态。
  • 分区迁移:如果某个 broker 出现故障,Controller 负责重新分配其上的分区到其他可用 Broker 上。
  • Topic 创建和删除:管理 Topic 的创建和删除操作,并广播这些信息到集群中的所有 Broker。

为进一步了解 Kafka 中 Controller 的重要性,可阅读以下扩展点:

  • Controller 的选举机制:Kafka 使用 ZooKeeper 来管理 Controller 的选举过程。当 Kafka 集群启动时,第一个向 ZooKeeper 注册的 Broker 被选为 Controller。如果当前 Controller 挂掉,其他 Broker 会竞选成为新的 Controller。
  • 高可用性与容错性:Controller 的设计是为了保证高可用性和容错性。即使当前 Controller 挂了,新选出的 Controller 也会迅速接管,一般不会导致集群不可用。Controller 的状态信息会存储在 ZooKeeper 中,从而保证即使在切换过程中数据也不会丢失。
  • 与 ISR 的关系:Controller 定期与 ISR 中的所有副本保持联系,确保这些副本数据是同步的。如果某个副本落后太多,Controller 会将其从 ISR 中移除,以保证数据的一致性。
  • Controller 负载与性能:虽然 Controller 承担了大量的管理任务,但其负载相对来说还是较小的,瓶颈更多出现在 Kafka 的数据传输和处理流程中。
  • ZooKeeper 对 Kafka 的作用:虽然 Kafka 在未来版本中可能会移除 ZooKeeper 的依赖(计划中的 Kafka Raft),目前仍然依赖 ZooKeeper 来维护集群的元数据和进行 Controller 的选举和管理。

【中等】Kafka 如何保证消息的持久性和高可用性?

消息持久性:Kafka 使用磁盘进行消息存储,确保即使在系统故障的情况下,消息也不会丢失。具体措施包括:

  • 分区:Kafka 将每个主题分成多个分区,每个分区是有序且持久的日志。分区方便了数据的存储和读取。
  • 日志分段和索引:每个分区被分段为多个日志段,分段之后的日志文件会以可配置的方式进行轮转。Kafka 还会为每个消息生成索引,以快速定位消息。
  • 文件系统的强制刷新:Kafka 使用页缓存来提高磁盘 I/O 性能,并定期调用 fsync 系统调用,将数据从页缓存刷新到磁盘,确保数据持久化。

高可用性:Kafka 通过复制机制和分布式架构来实现高可用性,具体包括:

  • 副本(Replica):每个分区有一个主副本(Leader)和若干个从副本(Follower)。主副本处理读写请求并将数据同步到从副本,从副本在主副本失败时能顶上处理。
  • ISR(In-Sync Replica):Kafka 维护一个同步副本集合,只有在 ISR 中的副本才被认为是健康的,从而保证了高可用性。
  • ACK 机制:在生产者发送消息时,可以配置不同的确认级别(acks),例如 acks=all 则需要等待所有 ISR 中的副本确认收到消息,进一步提高可靠性。

支撑技术

  • ZooKeeper:Kafka 通过 ZooKeeper 来管理集群的元数据和协调节点之间的工作。比如,分区 Leader 或者 Follower 的选举等操作都是靠 ZooKeeper 来完成的。这样即便某个 Kafka Broker 挂掉了,ZooKeeper 也能迅速协调恢复。
  • 高效的存储格式:Kafka 的数据存储采用了顺序写入的方式,而非像传统数据库那样频繁的读写操作跳跃性强。顺序写入拥有很高的磁盘写入速度,极大地提升了 Kafka 的性能。
  • Segment 和 Index 文件:Kafka 对每个 Partition 生成多个 Segment 文件和索引文件。Segment 文件是实际存储消息的,而索引文件则是维护消息偏移量和物理位置对照表。这样一来,即使是非常大的数据量,Kafka 也能高效地搜索和读取消息。
  • Min ISR 机制:配置 min.insync.replicas 参数可以设定 ISR 阈值,当 ISR 数量低于这个阈值时,Kafka 会拒绝消息写入请求,以确保数据的足够冗余。

【中等】Kafka 中的 ISR(In-Sync Replica)是什么?

ISR 定义

  • 由与 Leader 保持同步的副本组成
  • 包含数据最新或仅轻微滞后的副本

核心价值:通过动态 ISR 管理实现可靠性与性能的平衡

消息同步流程

  • Producer 发送消息至 Leader 副本
  • Leader 将消息复制给所有 ISR 副本
  • ISR 全部确认后,Leader 返回 ACK

关键机制

  • Leader 选举:仅从 ISR 中选出新 Leader
  • 副本管理
    • 滞后副本会被移出 ISR
    • 恢复同步后重新加入

配置参数

参数 作用 典型值
acks 确认副本数 all
min.insync.replicas 最小同步副本数 2
replica.lag.time.max.ms 最大滞后时间 10000

设计权衡

  • 可靠性acks=all确保数据安全
  • 性能:ISR 副本数越多,写入延迟越高

可靠传输

【中等】在 Kafka 中,如何通过 Acks 配置提高数据可靠性?Acks 的值如何影响性能?

选择原则:根据业务对数据丢失的容忍度进行权衡配置。

参数选项

配置值 可靠性 性能 适用场景
0 最低 最高 实时监控/日志收集
1 中等 中等 普通业务场景
all/-1 最高 最低 金融交易/关键数据

优化建议

  • 可靠性优先

    • 设置acks=all
    • 配合min.insync.replicas=2
    • 禁用unclean.leader.election.enable=false
  • 性能优先

    • 选择acks=01
    • 适当降低replication.factor(如 2)

注意事项

  • 副本数replication.factor建议≥3
  • acks值会增加网络和存储压力
  • 新版 Kafka 优化了高可靠性配置的性能表现

【困难】如何保证 Kafka 消息不丢失?

如何保证消息的可靠性传输,或者说,如何保证消息不丢失?这对于任何 MQ 都是核心问题。

一条消息从生产到消费,可以划分三个阶段:

  • 生产阶段:Producer 创建消息,并通过网络发送给 Broker。
  • 存储阶段:Broker 收到消息并存储,如果是集群,还要同步副本给其他 Broker。
  • 消费阶段:Consumer 向 Broker 请求消息,Broker 通过网络传输给 Consumer。

这三个阶段都可能丢失数据,所以要保证消息丢失,就需要任意一环都保证可靠。通过 ACK+副本+幂等+手动提交 Offset 的组合策略,可系统性解决消息丢失问题。根据业务对可靠性和性能的需求调整配置。

  • 生产者端
    • ACK 机制:设置 acks=all,确保所有副本持久化后才确认发送成功。
    • 幂等生产者:启用 enable.idempotence=true,避免网络重试导致消息重复或丢失。
    • 事务支持:跨分区的原子性写入(producer.initTransactions())。
  • Broker 端
    • 多副本机制:设置 replication.factor≥3,保证高可用。
    • 最小同步副本:配置 min.insync.replicas≥2,防止单点故障导致数据丢失。
  • 消费者端
    • 手动提交 Offset:关闭 enable.auto.commit=false,处理完消息后手动提交偏移量。
    • 持久化 Offset:将 Offset 存储到 Kafka(而非 Zookeeper),避免分区再均衡时丢失。

关键配置:

1
2
3
4
5
6
7
8
# 生产者
acks=all
enable.idempotence=true
# Broker
replication.factor=3
min.insync.replicas=2
# 消费者
enable.auto.commit=false

存储阶段不丢消息

存储阶段指的是 Kafka Server,也就是 Broker 如何保证消息不丢失。

一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证

上面的话可以解读为:

  • 已提交只有当消息被写入分区的若干同步副本时,才被认为是已提交的。为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要 Leader 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。
  • 持久化:Kafka 的数据存储在磁盘上,所以只要写入成功,天然就是持久化的。
  • 只要还有一个副本是存活的,那么已提交的消息就不会丢失
  • 消费者只能读取已提交的消息

Kafka 的副本机制是 kafka 可靠性保证的核心

Kafka 的主题被分为多个分区,分区是基本的数据块。每个分区可以有多个副本,有一个是 Leader(主副本),其他是 Follower(从副本)。所有数据都直接发送给 Leader,或者直接从 Leader 读取事件。Follower 只需要与 Leader 保持同步,并及时复制最新的数据。当 Leader 宕机时,从 Follower 中选举一个成为新的 Leader。

Broker 有 3 个配置参数会影响 Kafka 消息存储的可靠性。

  • 副本数 - replication.factor 的作用是设置每个分区的副本数replication.factor 是主题级别配置; default.replication.factor 是 broker 级别配置。副本数越多,数据可靠性越高;但由于副本数增多,也会增加同步副本的开销,可能会降低集群的可用性。一般,建议设为 3,这也是 Kafka 的默认值。
  • 不完全的选主 - unclean.leader.election.enable 用于控制是否支持不同步的副本参与选举 Leader。unclean.leader.election.enable 是 broker 级别(实际上是集群范围内)配置,默认值为 true。
    • 如果设为 true,代表着允许不同步的副本成为主副本(即不完全的选举),那么将面临丢失消息的风险
    • 如果设为 false,就要等待原先的主副本重新上线,从而降低了可用性。
  • 最少同步副本 - min.insync.replicas 控制的是消息至少要被写入到多少个副本才算是“已提交”min.insync.replicas 是主题级别和 broker 级别配置。尽管可以为一个主题配置 3 个副本,但还是可能会出现只有一个同步副本的情况。如果这个同步副本变为不可用,则必须在可用性和数据一致性之间做出选择。Kafka 中,消息只有被写入到所有的同步副本之后才被认为是已提交的。但如果只有一个同步副本,那么在这个副本不可用时,则数据就会丢失。
    • 如果要确保已经提交的数据被已写入不止一个副本,就需要把最小同步副本的设置为大一点的值。
    • 注意:要确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1

生产阶段不丢消息

在生产消息阶段,消息队列一般通过请求确认机制,来保证消息的可靠传递,Kafka 也不例外。

Kafka 有三种发送方式:同步、异步、异步回调。同步方式能保证消息不丢失,但性能太差;异步方式发送消息,通常会立即返回,但消息可能丢失。

解决生产者丢失消息的方案:

生产者使用异步回调方式 producer.send(msg, callback) 发送消息。callback(回调)能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。

  • 如果是因为那些瞬时错误,那么仅仅让 Producer 重试就可以了;
  • 如果是消息不合格造成的,那么可以调整消息格式后再次发送。

然后,需要基于以下几点来保证 Kafka 生产者的可靠性:

  • ACK - 生产者可选的确认模式有三种:acks=0acks=1acks=all
    • acks=0acks=1 都有丢失数据的风险。
    • acks=all 意味着会等待所有同步副本都收到消息。再结合 min.insync.replicas ,就可以决定在得到确认响应前,至少有多少副本能够收到消息。这是最保险的做法,但也会降低吞吐量。
  • 重试 - 如果 broker 返回的错误可以通过重试来解决,生产者会自动处理这些错误。需要注意的是:有时可能因为网络问题导致没有收到确认,但实际上消息已经写入成功。生产者会认为出现临时故障,重试发送消息,这样就会出现重复记录。所以,尽可能在业务上保证幂等性。设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
    • 可重试错误,如:LEADER_NOT_AVAILABLE,主副本不可用,可能过一段时间,集群就会选举出新的主副本,重试可以解决问题。
    • 不可重试错误,如:INVALID_CONFIG,即使重试,也无法改变配置选项,重试没有意义。
  • 错误处理 - 开发者需要自行处理的错误:
    • 不可重试的 broker 错误,如消息大小错误、认证错误等;
    • 消息发送前发生的错误,如序列化错误;
    • 生产者达到重试次数上限或消息占用的内存达到上限时发生的错误。

消费阶段不丢消息

前文已经提到,消费者只能读取已提交的消息。这就保证了消费者接收到消息时已经具备了数据一致性。

消费者唯一要做的是确保哪些消息是已经读取过的,哪些是没有读取过的(通过提交偏移量给 Broker 来确认)。如果消费者提交了偏移量却未能处理完消息,那么就有可能造成消息丢失,这也是消费者丢失消息的主要原因。

img

消费者的可靠性配置:

  • group.id - 如果希望消费者可以看到主题的所有消息,那么需要为它们设置唯一的 group.id
  • auto.offset.reset - 有两个选项:
    • earliest - 消费者会从分区的开始位置读取数据
    • latest - 消费者会从分区末尾位置读取数据
  • enable.auto.commit - 消费者自动提交偏移量。如果设为 true,处理流程更简单,但无法保证重复处理消息。
  • auto.commit.interval.ms - 自动提交的频率,默认为每 5 秒提交一次。

如果 enable.auto.commit 设为 true,即自动提交,就无需考虑提交偏移量的问题。

如果选择显示提交偏移量,需要考虑以下问题:

  • 必须在处理完消息后再发送确认(提交偏移量),不要收到消息立即确认。
  • 提交频率是性能和重复消息数之间的权衡
  • 分区再均衡
  • 消费可能需要重试机制
  • 超时处理
  • 消费者可能需要维护消费状态,如:处理完消息后,记录在数据库中。
  • 幂等性设计
    • 写数据库:根据主键判断记录是否存在
    • 写 Redis:set 操作天然具有幂等性
    • 复杂的逻辑处理,则可以在消息中加入全局 ID

【困难】如何保证 Kafka 消息不重复?

在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

  • At most once:至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
  • At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
  • Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。

一般解决重复消息的办法是,在消费端,保证消费消息的操作具备幂等性

幂等(idempotent、idempotence)是一个数学与计算机学概念,指的是:一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。

常用的实现幂等操作的方法:

  • 消费者幂等处理
    • 存储已处理消息 ID(如 offset/业务 ID)到 DB/Redis,处理前校验去重。
    • _优点_:实现简单;_缺点_:依赖外部存储性能。
  • Kafka 幂等性与事务
    • 生产者:启用enable.idempotence=true,避免网络重试导致重复。
    • 消费者:配合事务提交 Offset,确保”精确一次”处理。
    • _要求_:需 Kafka 0.11+版本支持。
  • 业务逻辑去重
    • 设计天然幂等操作(如订单状态更新:”SET status=paid”)。
    • _优势_:高性能;_挑战_:需深度理解业务。

【困难】如何保证 Kafka 消息有序?

对消息有序有要求的场景

场景 顺序性要求示例
金融交易 转账指令必须按 开户→存款→转账 顺序执行
日志聚合 错误日志需按时间顺序排列:启动→运行→异常→终止
库存管理 操作顺序必须为 入库→出库→盘点,否则库存数据不一致
流媒体 视频帧需按 I 帧→P 帧→B 帧 顺序传输,否则解码失败

Kafka 提供了有限度的顺序性保证,具体来说:

  • 在同一个分区内,消息是有序的。
  • 靠消息键将相关消息分配到同一分区,可以保证这些消息在同一分区内依然有序。

如何保证消息的严格顺序性

  • 分区:确保生产者将同一类型的消息发送到特定分区。Kafka 保证一个分区内的消息是按顺序存储和消费的。
  • 消息键:使用消息键(Key)来控制消息的分区。相同的 Key 总是被路由到同一个分区,从而保证了具有相同 Key 的消息顺序。
  • 单生产者线程:确保生产者是单线程的或使用有序的发送机制,这样就不会因多线程的并发发送而打乱顺序。
  • 生产者中的分区器:Kafka 的自定义分区器可以确保相同 Key 的消息始终发送到同一个分区。

高并发场景下如何优化顺序消费

  • 并行处理:在消费端,可以通过拆分步骤来并行处理部分无顺序依赖的逻辑,从而提高整体吞吐量。
  • 异步处理:利用异步处理机制处理消息,但需要确保消息的核心逻辑是顺序执行的,从而保证顺序。
  • 多线程消费:在不同消费组中根据分区并行消费,但仍需每个分区内的消费线程按照顺序处理消息。

关键机制

  • 分区机制:在 Kafka 中,每个 Topic 都可以配置为多个分区,每个分区都是一个有序的、不可变的消息日志。生产者在发送消息时,可以指定消息的键(Key),Kafka 根据这个键来进行哈希运算,将消息写入相应的分区。同一键的消息总会被写入到同一个分区,这样就保证了同一键的消息在同一个分区内是有序的。
  • 消息键和分区策略:当生产者发送消息时,可以通过配置分区策略(Partitioner)决定消息去哪个分区。默认的分区策略是基于消息键的哈希值,比如 hash(key) mod partitionNum 。通过这种策略,可以确保相同键的消息被发送到同一个分区,从而保证它们的顺序性。
  • 消费端的顺序保证:消费者在消费消息时,同一个消费者线程只能同时消费一个分区的消息,这样可以保证消费端在处理某个分区内的消息时是按顺序的。如果 Kafka 集群中没有足够的消费者线程,某个消费者线程可能需要同时消费多个分区的消息,但这些分区之间的顺序是无法保证的。
  • 顺序性在高可用环境下的挑战:当 Kafka 分区的 Leader 发生切换时,可能会有短时间的数据不一致。如果处理不当,可能会影响顺序性保证。Kafka 通过保持分区副本(Replica)的一致性,并在重新选举 Leader 时确保新 Leader 从最新的数据点开始处理,尽量减少顺序性的损失。

最佳实践

  • 生产者优化
    • 批量发送:在保证顺序的前提下,尽量使用批量发送来提高吞吐量。
    • 幂等性(Idempotence):Kafka 生产者支持幂等性,确保消息不会因为重试而导致重复。开启幂等性可以进一步保证消息顺序的一致性。
  • 消费者优化
    • 手工提交消费位移:可以选择在消费每一批消息后,手工提交消费位移,这样可以对某些消息进行重试处理,确保按序消费。
    • 事务性消费:使用 Kafka 的事务性支持,消费者可以确保一组消息要么全部处理成功,要么全部回滚,这在处理批量消息时保证顺序性非常有效。
    • 偏移量管理:合理管理和提交偏移量(Offset),确保在出现错误或重启时能继续保持顺序消费。
  • Kafka 配置调优
    • min.insync.replicas:确保最小同步副本数,以提高消息的可靠性和顺序性保障。
    • acks 设置:生产者的 acks 设置为 ‘all’(或 -1),确保所有副本已接收到消息再进行确认,保障消息顺序和持久性。

【困难】如何应对 Kafka 消息积压?

  • 紧急处理
    • 增加消费者实例(不超过分区数)
    • 调整参数:增大 max.poll.records
    • 选择性跳过:重置 offset(仅限非关键数据)
  • 性能优化
    • 采用异步处理:分离消息拉取和处理逻辑
    • 优先处理:确保关键业务消息优先消费
  • 监控预防
    • 实时监控 Lag 指标
    • 配置自动扩缩容机制
  • 极端情况处理
    • 拆分 Topic:分散积压消息
    • 离线处理:导出到 HDFS 批量消费

方案对比

方法 见效速度 影响 适用场景
增加消费者 立即 分区有余量时
调整参数 立即 可能内存压力 资源充足时
重置 offset 立即 数据丢失 非关键消息

处理原则

  • 先扩容消费者
  • 再优化消费逻辑
  • 确保核心业务
  • 建立预防机制

【困难】在 Kafka 中,如何实现幂等性 Producer?它对消息处理的意义是什么?

最佳实践:幂等性+事务+合理重试配置,构建高可靠消息系统

核心配置

1
2
3
4
Properties props = new Properties();
props.put("enable.idempotence", "true"); // 启用幂等性
props.put("acks", "all"); // 确保所有副本确认
props.put("retries", Integer.MAX_VALUE); // 无限重试

关键特性

特性 说明 优势
消息去重 自动过滤重复消息 避免数据重复
顺序保证 单分区内消息有序 维护数据一致性
自动重试 内置安全重试机制 提升可靠性

高级应用

  • 事务支持
1
2
props.put("transactional.id", "txn-1");
producer.initTransactions(); // 初始化事务
  • Exactly-Once 语义
    • 结合幂等性和事务
    • 确保端到端一次性处理

使用建议

  • 适用场景:金融交易、订单处理等关键业务
  • 性能影响:轻微吞吐量下降,换取数据可靠性
  • 版本要求:Kafka 0.11+

事务

【中等】Kafka 是否支持事务?如何支持事务?

Kafka 的事务概念是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务,或者说是是一个原子操作),同时成功或者失败

消息可靠性保障,由低到高为:

  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

Kafka 支持事务功能主要是为了实现精确一次处理语义的,而精确一次处理是实现流处理的基石。

Kafka 自 0.11 版本开始提供了对事务的支持,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息

Kafka 事务机制核心要点:

  • 事务管理器(Transaction Coordinator):协调事务生命周期(开始/提交/中止),跟踪生产者状态。
  • 生产者(Producer):将多条消息绑定为一个事务,提交后生效,失败则回滚。
  • 消费者(Consumer):仅读取已提交事务的消息,避免中间状态数据。
  • 事务日志(Transaction Log):持久化记录事务状态(进行中/已提交/已中止)。
  • 两阶段提交(2PC)
    • 阶段 1:生产者发送消息但不提交。
    • 阶段 2:事务管理器决定提交或中止,通知生产者执行。

总结:Kafka 事务通过协调器、2PC 和日志追踪实现原子消息组,适用于需严格一致的分布式场景。

事务型 Producer

事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。

事务属性实现前提是幂等性,即在配置事务属性 transaction.id 时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。

在事务属性之前先引入了生产者幂等性,它的作用为:

  • 生产者多次发送消息可以封装成一个原子操作,要么都成功,要么失败。
  • consumer-transform-producer 模式下,因为消费者提交偏移量出现问题,导致重复消费。需要将这个模式下消费者提交偏移量操作和生产者一系列生成消息的操作封装成一个原子操作。

消费者提交偏移量导致重复消费消息的场景:消费者在消费消息完成提交便宜量 o2 之前挂掉了(假设它最近提交的偏移量是 o1),此时执行再均衡时,其它消费者会重复消费消息 (o1 到 o2 之间的消息)。

Kafka 事务相关配置

使用 kafka 的事务 api 时的一些注意事项:

  • 需要消费者的自动模式设置为 false,并且不能子再手动的进行执行 consumer#commitSync 或者 consumer#commitAsyc
  • 设置 Producer 端参数 transctional.id。最好为其设置一个有意义的名字。
  • 和幂等性 Producer 一样,开启 enable.idempotence = true。如果配置了 transaction.id,则此时 enable.idempotence 会被设置为 true
  • 消费者需要配置事务隔离级别 isolation.level。在 consume-trnasform-produce 模式下使用事务时,必须设置为 READ_COMMITTED
    • read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
    • read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

【困难】Kafka 的事务机制与幂等性机制如何协同工作?它们在保证消息一致性上有什么作用?

Kafka 的事务机制与幂等性机制结合实现端到端的 Exactly Once,适用于强一致性要求的分布式系统。

核心功能

  • 事务机制:确保消息组的原子性(全部成功或全部失败),支持跨分区的一致性提交。
  • 幂等性机制:防止生产者重复发送导致消息重复(**Exactly Once **语义)。

关键实现

  • 事务流程(生产者端):

    1
    2
    3
    4
    producer.initTransactions();  // 初始化事务
    producer.beginTransaction(); // 开启事务
    producer.send(record); // 发送消息
    producer.commitTransaction();// 提交(或 abortTransaction() 回滚)
  • 幂等性实现

    • 每个生产者分配唯一** PID,消息附带递增序列号**。
    • Broker 通过 PID + 序列号去重,拒绝重复消息。

协同作用

  • 幂等性:解决单条消息重复问题(如网络重试)。
  • 事务:解决多条消息的原子提交问题(如分布式操作)。

典型应用场景

  • 金融交易:转账操作需保证扣款和入账同时成功或失败。
  • 日志处理:确保日志批次完整,且无重复记录。

故障容错:事务机制 + 幂等性 = 故障重试时仍保证数据一致,避免部分成功或重复消费。

【困难】Kafka 的 Exactly Once 语义在分布式系统中是如何实现的?如何处理分布式事务中的异常情况?

Kafka 通过幂等生产+事务+精准 offset 控制,在分布式环境下实现端到端 Exactly Once,适用于金融、计费等强一致性场景。

核心机制

  • 幂等生产者

    • 通过唯一Producer ID和消息序列号实现去重
    • 确保单条消息不重复(网络重试场景)
  • 事务生产者

    • 提供跨分区的原子操作(commitTransaction/abortTransaction
    • 保证一组消息全成功或全失败
  • 消费端去重

    • 基于offset管理 + 消费者组机制
    • 避免消息被重复处理

异常处理

方法 作用 场景示例
事务回滚 撤销未完成的操作,保持原子性 生产者写入部分分区失败时
自动重试 应对临时性故障(如网络抖动) Broker 短暂不可用
幂等消费 通过业务 ID 或状态记录避免重复处理 消费者重启后重复拉取消息

关键扩展

  • CAP 权衡:Kafka 优先保证高可用分区容错(AP),通过事务补充一致性
  • Kafka Streams:利用状态存储和检查点机制实现流处理 Exactly Once
  • 消费者组enable.auto.commit=false时需手动提交 offset 以精准控制消费

架构

【简单】Kafka 的基本架构包括哪些组件?各组件的作用是什么?

Kafka 通过组件分工+副本机制保障高可用,结合批量/压缩/手动提交等优化手段实现高性能。新版本推荐使用 KRaft 模式简化架构。

核心组件

组件 核心功能
Producer 发布数据到 Topic,支持轮询/Key 哈希/自定义分区策略
Consumer 通过消费组实现负载均衡,同一分区仅限单个消费者消费
Broker 存储管理消息,通过分区副本实现高可用,支持故障自动转移
Zookeeper 管理集群元数据、Leader 选举(注:新版本 Kafka 逐步用 KRaft 协议替代 Zookeeper)

工作机制优化

  • Producer 优化
    • 批量发送(linger.ms+batch.size
    • 压缩算法(Snappy/Gzip 降低带宽占用)
    • 异步发送(acks=1/all平衡性能与可靠性)
  • Consumer 优化
    • 动态分区分配(range/round-robin策略)
    • 手动提交 Offset(enable.auto.commit=false避免重复/丢失)
    • 并行消费(分区数≥消费者数,避免闲置)
  • Broker 优化
    • 副本机制(replication.factor≥2保障容错)
    • ISR 列表(同步副本快速选举新 Leader)
    • 磁盘顺序写(高吞吐设计,避免随机 IO)

关键配置建议

场景 推荐配置 说明
高吞吐场景 compression.type=snappy 压缩率与 CPU 开销平衡
数据持久化要求 log.retention.hours=168(7 天) 根据存储容量调整
低延迟场景 num.io.threads=8(默认值翻倍) 提升磁盘 IO 并行度

版本演进注意

  • KRaft 模式:Kafka 3.0+版本内置元数据管理,逐步淘汰 Zookeeper 依赖
  • 性能取舍:分区数并非越多越好(建议单 Broker≤2000 分区,避免元数据膨胀)

【简单】Kafka 的设计目标

  • 高性能
    • 分区、分段、索引:基于分区机制提供并发处理能力。分段、索引提升了数据读写的查询效率。
    • 顺序读写:使用顺序读写提升磁盘 IO 性能。
    • 零拷贝:利用零拷贝技术,提升网络 I/O 效率。
    • 页缓存:利用操作系统的 PageCache 来缓存数据(典型的利用空间换时间)
    • 批量读写:批量读写可以有效提升网络 I/O 效率。
    • 数据压缩:Kafka 支持数据压缩,可以有效提升网络 I/O 效率。
    • pull 模式:Kafka 架构基于 pull 模式,可以自主控制消费策略,提升传输效率。
  • 高可用
    • 持久化:Kafka 所有的消息都存储在磁盘,天然支持持久化。
    • 副本机制:Kafka 的 Broker 集群支持副本机制,可以通过冗余,来保证其整体的可用性。
    • 选举 Leader:Kafka 基于 ZooKeeper 支持选举 Leader,实现了故障转移能力。
  • 伸缩性
    • 分区:Kafka 的分区机制使得其具有良好的伸缩性。

【困难】Kafka 为什么性能高?

Kafka 的数据存储在磁盘上,为什么还能这么快?

说 Kafka 很快时,他们通常指的是 Kafka 高效移动大量数据的能力。Kafka 为了提高传输效率,做了很多精妙的设计。

  • 写:顺序追加 + 零拷贝 + 分段管理
    • 顺序追加:日志追加写入到磁盘(顺序 I/O),提高写入性能。
    • 零拷贝:sendfile 系统调用,数据直接从磁盘→网络,减少 CPU 拷贝开销。
    • 分段管理:
      • Topic→Partitions
      • Partition→有序 Log
      • Log→Segments
  • 读:页缓存 + 双索引
    • 页缓存:缓存热点数据,减少磁盘 IO
    • 双索引:xxx.index(偏移量索引) + xxx.timeindex(时间索引),加速查询
  • 存:压缩 + 日志清理
    • 批量压缩:支持 GZIP、Snappy 等算法
    • 日志清理:默认超出 7 天或 1GB 删除

::: info 零拷贝

:::

Kafka 数据传输是一个从网络到磁盘,再由磁盘到网络的过程。在网络和磁盘之间传输数据时,消除多余的复制是提高效率的关键。Kafka 利用零拷贝技术来消除传输过程中的多余复制

如果不采用零拷贝,Kafka 将数据同步给消费者的大致流程是:

  1. 从磁盘加载数据到 os buffer
  2. 拷贝数据到 app buffer
  3. 再拷贝数据到 socket buffer
  4. 接下来,将数据拷贝到网卡 buffer
  5. 最后,通过网络传输,将数据发送到消费者

采用零拷贝技术,Kafka 使用 sendfile() 系统方法,将数据从 os buffer 直接复制到网卡 buffer。这个过程中,唯一一次复制数据是从 os buffer 到网卡 buffer。这个复制过程是通过 DMA(Direct Memory Access,直接内存访问) 完成的。使用 DMA 时,CPU 不参与,这使得它非常高效。

【中等】Kafka 是如何实现横向扩展的?它如何处理大规模集群中的负载均衡?

Kafka 通过分区+副本机制实现横向扩展与负载均衡,配合动态重平衡与 ISR 选举保障高可用性。合理配置分区/副本数和 ACK 策略是关键。

分区机制

  • 基本概念
    • 每个 Topic 划分为多个 Partition,分布在集群 Broker 上
    • 单个 Partition 内消息有序,不同 Partition 间无序
  • 副本设计
    • 每个 Partition 配置多个 Replica(默认 1 Leader + N Follower)
    • Leader 处理读写请求,Follower 同步数据
    • ISR(In-Sync Replicas)维护同步副本集合,确保快速故障转移

负载均衡实现

角色 策略 作用
Producer 轮询(Round-robin) 均匀分布消息到各分区
按键哈希(Key Hashing) 相同 Key 的消息固定到同一分区
Consumer 消费组(Consumer Group)机制 组内消费者并行消费不同分区
分区分配策略(Range/Round-robin) 控制消费者与分区的映射关系

动态扩展与容错

  • Broker 扩容
    • 新 Broker 加入时自动触发分区重分配(通过kafka-reassign-partitions工具)
    • 支持手动调整副本分布,优化数据均衡
  • 故障恢复
    • Leader 失效时,从 ISR 中选举新 Leader(通常<1 秒)
    • 非同步副本(Out-of-Sync)需完全同步后才能加入 ISR

关键优化点

  • 减少 Rebalance 影响
    • 静态成员资格(group.instance.id)避免消费者短暂离线触发重平衡
    • 增量再平衡(Kafka 2.4+)仅调整变化的分区
  • 性能权衡
    • 分区数↑ → 并行度↑,但元数据开销↑(建议单 Broker≤2000 分区)
    • 副本数↑ → 可靠性↑,但写入延迟↑(通常replication.factor=3

数据一致性保障

生产者 ACK 配置:

  • acks=0:不等待确认(高性能,可能丢失数据)
  • acks=1:Leader 写入即确认(平衡选择)
  • acks=all:所有 ISR 副本确认(高可靠,延迟高)

【中等】Kafka 的日志压缩功能是如何实现的?它在什么场景下使用?

日志压缩通过 Key-Level 去重优化存储效率,适用于状态跟踪类场景,需权衡实时性与资源开销。配置时建议结合业务数据更新频率调整log.cleaner相关参数。

基本概念

  • 功能本质:保留每个键(Key)的最新消息,删除历史重复值
  • 触发条件:需配置log.cleanup.policy=compact
  • 执行主体:后台 Cleaner 线程周期性扫描压缩

工作机制

环节 说明
写入阶段 所有消息(含重复 Key)正常写入日志
压缩阶段 Cleaner 线程扫描日志,对同一 Key 只保留 offset 最大的记录
清理阶段 被标记删除的消息最终被物理清除

典型应用场景

  • 数据库变更日志(CDC):仅保留数据表的最终状态
  • 设备状态监控:存储物联网设备最新上报数据
  • 配置管理中心:记录配置项最新版本
  • 会话持久化:保存用户会话最新信息

与其他机制的对比

特性 日志压缩 日志删除(按时间/大小)
保留策略 按 Key 保留最新值 按时间/文件大小删除旧数据
适用场景 需要 Key 级状态追溯 只需保留近期数据
可共存性 可与删除策略同时配置 -

注意事项

  • 延迟性:压缩非实时,存在数据最终一致性
  • 资源消耗:压缩过程占用 CPU/IO 资源
  • 特殊键处理null键消息不会被压缩保留
  • 监控指标:关注kafka.log:type=LogCleanerManager相关指标

【困难】Kafka 的流量控制是如何实现的?如何通过流量控制避免系统过载?

Kafka 通过参数化限速和自适应背压实现多层级流量控制,需根据业务特点(吞吐/延迟/可靠性需求)组合配置。生产环境建议配合监控系统实现动态调节。

限速控制(Rate Limiting)

组件 关键参数 控制效果
生产者 max.in.flight.requests.per.connection 限制单连接未确认请求数(默认 5)
linger.ms 批量发送等待时间(0-5000ms)
消费者 fetch.min.bytes 单次拉取最小数据量(默认 1B)
fetch.max.wait.ms 拉取请求最长等待时间(默认 500ms)

背压机制(Backpressure)

  • 消费者控制
    • 手动提交偏移量(enable.auto.commit=false
    • 通过处理进度反馈调节消费速率
  • 系统级缓冲
    • 生产者缓冲区(buffer.memory,默认 32MB)
    • 消费者 fetch 队列(queued.max.messages,默认 500)

高级控制策略

  • 动态限流:基于监控指标(如 CPU/网络负载)自动调整生产/消费速率
  • 异步批处理:流处理框架(Flink/Spark)的微批处理优化吞吐量

配置建议

场景 优化方向 典型值
高吞吐场景 增大linger.ms+batch.size linger.ms=50-100ms
低延迟场景 减小fetch.max.wait.ms fetch.max.wait.ms=10ms
稳定性优先 降低max.in.flight.requests 设为 1(确保顺序性)

【困难】Kafka 在高吞吐量场景下如何保持低延迟?有哪些性能调优的策略?

通过 并行化、批处理、硬件加速 实现高吞吐,同时控制分区/副本数量及网络参数以降低延迟。

分区与副本优化

  • 分区数:增加分区提升并行度,但避免过多(管理开销)。
  • 副本数:通常设 2-3,平衡可靠性与性能。

生产端调优

  • acks=1:确保至少 1 个副本写入,兼顾性能与可靠性。
  • batch.size ↑ + linger.ms ↓:减少网络请求,降低延迟。
  • 压缩:选用 lz4(高效压缩/解压),节省带宽。

消费端调优

  • fetch.min.bytes + fetch.max.wait.ms:平衡吞吐与延迟。

硬件优化

  • 磁盘:SSD(显著提升 I/O 性能)。
  • 内存/CPU:增大内存缓存数据,多核处理并行任务。
  • 网络:确保高带宽,减少传输延迟。

Broker 配置

  • log.retention ↑:减少日志频繁清理开销。
  • socket 缓冲区 ↑:提升网络传输效率。

【困难】Kafka 如何处理数据倾斜问题?有哪些优化手段可以均衡负载?

通过 分区策略优化 + 动态资源分配 + 流量控制,实现数据均匀分布与稳定吞吐。

均衡数据分布

  • 合理设计分区键:选择高基数字段(如 user_idorder_id),避免热点。
  • 增加分区数:分散数据压力,但避免过多分区导致管理负担。
  • 自定义分区器:按业务逻辑重写分配策略(如轮询、哈希优化)。

动态调整与冗余

  • 调整副本因子:适当增加副本(如 replication-factor=3)分散读压力,平衡资源开销。
  • 动态监控调整:实时监控分区负载,必要时触发 rebalance 或迁移数据。

流控与限流

  • 生产者限流:控制 producer 速率(如 max.in.flight.requests)。
  • 消费者限流:调整 fetch.max.bytes 或使用背压机制,匹配消费能力。

实现 高吞吐、低延迟、强一致性 的流式数据处理管道。

基础集成步骤

  • 添加依赖:引入 flink-connector-kafka(匹配 Kafka 版本)。
  • 配置 Source:通过 FlinkKafkaConsumer 订阅 Kafka Topic。
  • 配置 Sink:通过 FlinkKafkaProducer 写入结果到 Kafka。
  • 设计作业:在 Flink 中实现数据处理逻辑(过滤/转换/聚合)。

性能优化方向

优化项 关键措施
参数调优 - 调整 batch.size/linger.ms(生产者)
- 设置合理并行度(Flink 任务)
资源分配 - 平衡 Flink TaskManager 的 CPU/内存
- 确保 Kafka Broker 带宽充足
容错机制 - 启用 Flink Checkpointing(精确一次语义)
- 配置 Kafka 幂等性/事务
数据压缩 选用高效压缩算法(如 lz4/snappy),减少网络传输压力

关键代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Kafka Source
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");
props.setProperty("group.id", "flink-group");

FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(
"input-topic",
new SimpleStringSchema(),
props
);

// Kafka Sink
FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
"output-topic",
new SimpleStringSchema(),
props
);

// 作业流程
env.addSource(source)
.map(...) // 数据处理
.addSink(sink);

高级特性

  • 动态发现分区setStartFromLatest()/setStartFromEarliest()
  • 水位线生成:结合 assignTimestampsAndWatermarks 处理事件时间。
  • Exactly-Once 保障:启用 Kafka 事务(需配置 transaction.timeout.ms)。

【困难】在 Kafka 中,如何优化磁盘 I/O 性能?有哪些策略可以减少 I/O 开销?

存储架构优化

方法 作用
增加分区数 分散写入负载,利用多磁盘并行 I/O(但避免过多分区导致管理开销)。
多副本配置 提升读取吞吐量(副本数通常 2-3),同时增强容错能力。
高性能磁盘 优先选择 NVMe SSD > SATA SSD > HDD,显著降低读写延迟。
RAID 配置 - RAID 0:纯性能提升(无冗余)
- RAID 10:性能+冗余(推荐生产环境)。

参数调优

类型 关键参数 优化建议
Kafka 配置 num.io.threads 增加 I/O 线程数(默认=8,建议=CPU 核数)。
log.flush.interval.messages 调高刷盘间隔(减少频繁刷盘开销)。
生产者配置 batch.size + linger.ms 增大批次大小(如 64KB)和等待时间(如 50ms)。
系统级优化 Linux vm.dirty_ratio 调大文件系统缓存比例(如 20%-30%)。

高级特性

  • 分层存储(Tiered Storage)
    将冷数据迁移至对象存储(如 S3),热数据保留在本地 SSD,降低高速磁盘压力。
  • 压缩优化
    启用消息压缩(如 lz4),减少磁盘写入量和网络传输负载。

实践示例

1
2
3
4
5
6
7
8
# Kafka Broker 配置示例
num.io.threads=16
log.flush.interval.messages=10000

# 生产者配置示例
batch.size=65536
linger.ms=50
compression.type=lz4

目标:通过 硬件选型 + 并行化设计 + 批量处理,实现高吞吐、低延迟的磁盘 I/O 性能。

【困难】Kafka 的多租户支持是如何实现的?如何通过配额控制各租户的资源使用?

通过 资源隔离 + 精准配额 + 动态管控,实现安全、公平的多租户架构。

租户隔离机制

方法 实现方式
主题隔离 每个租户分配独立 Topic(如 tenantA_ordertenantB_log),物理隔离数据。
ACL 权限控制 通过 ACL 限制租户仅能访问自有 Topic(配置 CREATE/DESCRIBE/READ/WRITE 权限)。

配额管理配置

配额类型 控制对象 配置示例(CLI)
生产速率限制 生产者消息吞吐量 --add-config 'producer_byte_rate=1048576'(限制 1MB/s)
消费速率限制 消费者消息拉取速度 --add-config 'consumer_byte_rate=524288'(限制 512KB/s)
存储空间限制 Topic 磁盘占用 配置 log.retention.bytes=1073741824(限制 1GB) + log.retention.ms(时间策略)

动态管理工具

  • 命令工具:通过 kafka-configs.sh 动态调整配额(无需重启集群):
    1
    2
    3
    4
    # 设置租户 A 的生产配额
    kafka-configs.sh --bootstrap-server localhost:9092 --alter \
    --entity-type users --entity-name tenantA \
    --add-config 'producer_byte_rate=1048576,consumer_byte_rate=524288'

实施建议

  • 命名规范:Topic 名称包含租户标识(如 {tenant}_{data_type})。
  • 监控:结合 Kafka Metrics 或 Prometheus 监控配额使用情况,避免资源争抢。
  • 安全:启用 SSL/SASL 认证,防止租户越权访问。

【困难】Kafka 的 Stream 和 Table 是如何相互转换的?它们在 Kafka Streams 中的应用场景是什么?

通过 流表转换 + 状态管理,实现实时计算与状态维护的统一处理。

核心概念对比

抽象类型 特点 适用场景
Stream 无界、有序的键值记录流(事件日志) 实时分析、事件监控(如点击流、交易记录)
Table 有状态的键值快照(当前数据视图) 状态维护(如用户配置、库存数量)

相互转换操作

(1) Stream → Table

通过 聚合操作 将动态流转换为状态表:

1
2
3
4
5
6
7
8
9
10
KStream<String, Long> stream = builder.stream("input-topic");

// 按 Key 分组并累加值
KTable<String, Long> table = stream
.groupByKey()
.aggregate(
() -> 0L, // 初始值
(key, newValue, agg) -> agg + newValue, // 累加逻辑
Materialized.as("count-store") // 状态存储
);

(2) Table → Stream

通过 toStream() 将表变更作为流输出:

1
2
KTable<String, Long> table = builder.table("input-topic");
KStream<String, Long> stream = table.toStream(); // 输出表的更新事件

典型应用场景

  • 电商实时统计

    • Stream:处理用户订单事件(如 order-created)。
    • Table:维护用户总订单数(user_id → total_orders)。
  • 视频播放分析

    • Stream:接收视频点击事件(video_id, timestamp)。
    • Table:存储当前视频播放量(video_id → play_count)。

关键设计思想

  • 流表二元性
    • Stream 是 Table 的变更日志(Changelog)。
    • Table 是 Stream 的物化视图(Materialized View)。
  • 状态管理:Table 依赖 RocksDB 状态存储,支持容错与高效查询。

【困难】Kafka 的内部状态是如何管理的?如何通过状态管理优化性能?

通过 合理分区设计 + 资源分配 + 参数调优,实现高吞吐、低延迟的稳定集群。

核心状态管理机制

组件 功能
Zookeeper 集群协调(Broker 注册、Leader 选举、Consumer Offset 存储)
Broker 存储 消息持久化(内存 Page Cache + 磁盘日志)
Kafka Streams 流处理状态管理(RocksDB 状态存储、窗口化操作)

关键性能优化策略

(1) 集群设计优化

配置项 优化建议
分区数量 - 分区数 ≈ 目标吞吐量 / 单分区吞吐能力
- 避免过多分区(建议单 Broker ≤ 2000)
副本因子 生产环境建议 2-3(平衡可靠性与存储开销)
Topic 规划 按业务拆分 Topic(如 logs-{service}),避免热点

(2) Producer/Consumer 调优

1
2
3
4
5
6
7
8
9
# Producer 优化
batch.size=16384 # 增大批次(默认 16KB)
linger.ms=20 # 适当增加等待时间
compression.type=lz4 # 启用压缩
acks=1 # 平衡可靠性与延迟

# Consumer 优化
fetch.min.bytes=1024 # 减少拉取频次
max.poll.records=500 # 单次拉取最大消息数

(3) Broker 资源配置

资源 优化方向
内存 - JVM 堆内存 ≤ 6GB(避免 GC 停顿)
- 预留 50% 内存给 Page Cache
磁盘 - 使用 SSD/NVMe
- 配置 RAID 10(高性能+冗余)
网络 万兆网络 + 多网卡绑定(避免带宽瓶颈)

(4) Kafka Streams 状态优化

1
2
3
4
5
// 启用 RocksDB 状态存储
Stores.persistentKeyValueStore("my-store");

// 窗口化操作(如 5 分钟滚动窗口)
TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofSeconds(30));

监控与调优工具

  • 指标监控:关注 UnderReplicatedPartitionsRequestQueueSizeNetworkProcessorAvgIdlePercent
  • 命令行工具kafka-configs.sh(动态调整配额)、kafka-topics.sh(分区扩容)
  • JVM 调优:G1 GC + 禁用偏向锁(-XX:-UseBiasedLocking

参考资料

RPC 面试

RPC 简介

【基础】什么是 RPC?RPC 有什么用?

:::details 要点

RPC 的全称是 Remote Procedure Call,即远程过程调用

RPC 的主要作用是:

  • 屏蔽远程调用跟本地调用的差异,让用户像调用本地一样去调用远程方法。
  • 隐藏底层网络通信的复杂性,让用户更聚焦于业务逻辑。

RPC 是微服务架构的基石,它提供了一种应用间通信的方式。

:::

【中级】RPC 是怎样工作的?

:::details 要点

RPC 是一种应用间通信的方式,它的通信流程中需要注意以下环节:

  • 传输方式:RPC 是一个远程调用,因此必然需要通过网络传输数据,且 RPC 常用于业务系统之间的数据交互,需要保证其可靠性,所以 RPC 一般默认采用 TCP 来传输。
  • 序列化:在网络中传输的数据只能是二进制数据,而 RPC 请求时,发送的都是对象。因此,请求方需要将请求参数转为二进制数据,即序列化。
  • 反序列化:RPC 响应方接受到请求,要将二进制数据转换为请求参数,需要反序列化
  • 协议:请求方和响应方要互相识别彼此的信息,需要约定好彼此数据的格式,即协议。大多数的协议至少分成两部分,分别是数据头和消息体。数据头一般用于身份识别,包括协议标识、数据大小、请求类型、序列化类型等信息;消息体主要是请求的业务参数信息和扩展属性等。
  • 动态代理:为了屏蔽底层通信细节,使用户聚焦自身业务,因此 RPC 框架一般引入了动态代理,通过依赖注入等技术,拦截方法调用,完成远程调用的通信逻辑。

img

  1. 服务消费方(client)调用以本地调用方式调用服务;
  2. client stub 接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
  3. client stub 找到服务地址,并将消息发送到服务端;
  4. server stub 收到消息后进行解码;
  5. server stub 根据解码结果调用本地的服务;
  6. 本地服务执行并将结果返回给 server stub;
  7. server stub 将返回结果打包成消息并发送至消费方;
  8. client stub 接收到消息,并进行解码;
  9. 服务消费方得到最终结果。

:::

协议

【中级】为何需要 RPC 协议?

:::details 要点

只有二进制才能在网络中传输,所以 RPC 请求需要把方法调用的请求参数先转成二进制,然后再通过网络传输。

传输的数据可能很大,RPC 请求需要将数据分解为多个数据包;传输的数据也可能较小,需要和其他请求的数据包进行合并。当接收方收到请求时,需要从二进制数据中识别出不同的请求。问题是,如何从二进制数据中识别出其所属的请求呢?

这就需要发送方、接收方在通信过程中达成共识,严格按照协议处理二进制数据。这就好比让你读一篇没有标点符号的文章,你要怎么识别出每一句话到哪里结束呢?很简单啊,我们加上标点,完成断句就好了。这里有个潜在的含义,写文章和读文章的人,都遵循标点符号的用法。

再进一步探讨,既然已经有很多成熟的网络协议,为何还要设计 RPC 协议?

有必要。因为 HTTP 这些通信标准协议,数据包中的实际请求数据相对于数据包本身要小很多,有很多无用的内容;并且 HTTP 属于无状态协议,无法将请求和响应关联,每次请求要重新建立连接。这对于高性能的 RPC 来说,HTTP 协议难以满足需求,所以有必要设计一个紧凑的私有协议

:::

【中级】设计一个 RPC 协议的要点?

:::details 要点

首先,必须先明确消息的边界,即确定消息的长度。因此,至少要分为:消息长度+消息内容两部分。

接下来,我们会发现,在使用过程中,仅消息长度,不足以明确通信中的很多细节:如序列化方式是怎样的?是否消息压缩?压缩格式是怎样的?如果协议发生变化,需要明确协议版本等等。

大多数的协议会分成两部分,分别是数据头和消息体。数据头一般用于身份识别,包括协议标识、数据大小、请求类型、序列化类型等信息;消息体主要是请求的业务参数信息和扩展属性等。

综上,一个 RPC 协议大概会由下图中的这些参数组成:

前面所述的协议属于定长协议头,那也就是说往后就不能再往协议头里加新参数了,如果加参数就会导致线上兼容问题。

为了保证能平滑地升级改造前后的协议,我们有必要设计一种支持可扩展的协议。其关键在于让协议头支持可扩展,扩展后协议头的长度就不能定长了。那要实现读取不定长的协议头里面的内容,在这之前肯定需要一个固定的地方读取长度,所以我们需要一个固定的写入协议头的长度。整体协议就变成了三部分内容:固定部分、协议头内容、协议体内容。

:::

序列化

【基础】什么是序列化?有哪些常见的序列化方式?

:::details 要点

由于,网络传输的数据必须是二进制数据,而调用方请求的出参、入参都是对象。因此,必须将对象转换可传输的二进制,并且要求转换算法是可逆的。

  • 序列化(serialize):序列化是将对象转换为二进制数据。
  • 反序列化(deserialize):反序列化是将二进制数据转换为对象。

Java 领域,常见的序列化技术如下

市面上有如此多的序列化技术,那么我们在应用时如何选择呢?

一般而言,序列化技术选型需要考量的维度,根据重要性从高到低,依次有:

  • 安全性:是否存在漏洞。如果存在漏洞,就有被攻击的可能性。
  • 兼容性:版本升级后的兼容性是否很好,是否支持更多的对象类型,是否是跨平台、跨语言的。服务调用的稳定性与可靠性,要比服务的性能更加重要。
  • 性能
    • 时间开销:序列化、反序列化的耗时性能自然越小越好。
    • 空间开销:序列化后的数据越小越好,这样网络传输效率就高。
  • 易用性:类库是否轻量化,API 是否简单易懂。

鉴于以上的考量,序列化技术的选型建议如下:

  • JDK 序列化:性能较差,且有很多使用限制,不建议使用。
  • ThriftProtobuf:适用于对性能敏感,对开发体验要求不高
  • Hessian:适用于对开发体验敏感,性能有要求
  • JacksonGsonFastjson:适用于对序列化后的数据要求有良好的可读性(转为 json 、xml 形式)。

扩展阅读:深入理解 Java 序列化

:::

【基础】序列化的使用中需要注意哪些问题?

:::details 要点

由于 RPC 每次通信,都要经过序列化、反序列化的过程,所以序列化方式,会直接影响 RPC 通信的性能。除了选择合适的序列化技术,如何合理使用序列化也非常重要。

RPC 序列化常见的使用不当的情况如下:

  • 对象过于复杂、庞大 - 对象过于复杂、庞大,会降低序列化、反序列化的效率,并增加传输开销,从而导致响应时延增大。

    • 过于复杂:存在多层的嵌套,比如 A 对象关联 B 对象,B 对象又聚合 C 对象,C 对象又关联聚合很多其他对象
    • 过于庞大:比如一个大 List 或者大 Map
  • 对象有复杂的继承关系 - 对象关系越复杂,就越浪费性能,同时又很容易出现序列化上的问题。大多数序列化框架在进行序列化时,如果发现类有继承关系,会不停地寻找父类,遍历属性。

  • 使用序列化框架不支持的类作为入参类 - 比如 Hessian 框架,他天然是不支持 LinkHashMap、LinkedHashSet 等,而且大多数情况下最好不要使用第三方集合类,如 Guava 中的集合类,很多开源的序列化框架都是优先支持编程语言原生的对象。因此如果入参是集合类,应尽量选用原生的、最为常用的集合类,如 HashMap、ArrayList。

前面已经列举了常见的序列化问题,既然明确了问题,就要针对性预防。RPC 序列化时要注意以下几点:

  1. 对象要尽量简单,没有太多的依赖关系,属性不要太多,尽量高内聚;
  2. 入参对象与返回值对象体积不要太大,更不要传太大的集合;
  3. 尽量使用简单的、常用的、开发语言原生的对象,尤其是集合类;
  4. 对象不要有复杂的继承关系,最好不要有父子类的情况。

:::

通信

【中级】RPC 在网络通信上倾向选择哪种网络 IO 模型?

:::details 要点

一次 RPC 调用,本质就是服务消费者与服务提供者间的一次网络信息交换的过程。可见,通信是 RPC 实现的核心。

常见的网络 IO 模型分为四种:同步阻塞 IO(BIO)、同步非阻塞 IO(NIO)、IO 多路复用和异步非阻塞 IO(AIO)。在这四种 IO 模型中,只有 AIO 为异步 IO,其他都是同步 IO。

什么是 IO 多路复用?字面上的理解,多路就是指多个通道,也就是多个网络连接的 IO,而复用就是指多个通道复用在一个复用器上。IO 多路复用(Reactor 模式)在高并发场景下使用最为广泛,很多知名软件都应用了这一技术,如:Netty、Redis、Nginx 等。

RPC 调用在大多数的情况下,是一个高并发调用的场景,考虑到系统内核的支持、编程语言的支持以及 IO 模型本身的特点,在 RPC 框架的实现中,在网络通信的处理上,通常会选择 IO 多路复用的方式。开发语言的网络通信框架的选型上,最优的选择是基于 Reactor 模式实现的框架,如 Java 语言,首选的框架便是 Netty 框架(Java 还有很多其他 NIO 框架,但目前 Netty 应用得最为广泛),并且在 Linux 环境下,也要开启 epoll 来提升系统性能(Windows 环境下是无法开启 epoll 的,因为系统内核不支持)。

:::

【高级】什么是零拷贝?

:::details 要点

系统内核处理 IO 操作分为两个阶段——等待数据和拷贝数据。等待数据,就是系统内核在等待网卡接收到数据后,把数据写到内核中;而拷贝数据,就是系统内核在获取到数据后,将数据拷贝到用户进程的空间中。

img

应用进程的每一次写操作,都会把数据写到用户空间的缓冲区中,再由 CPU 将数据拷贝到系统内核的缓冲区中,之后再由 DMA 将这份数据拷贝到网卡中,最后由网卡发送出去。这里我们可以看到,一次写操作数据要拷贝两次才能通过网卡发送出去,而用户进程的读操作则是将整个流程反过来,数据同样会拷贝两次才能让应用程序读取到数据。

应用进程的一次完整的读写操作,都需要在用户空间与内核空间中来回拷贝,并且每一次拷贝,都需要 CPU 进行一次上下文切换(由用户进程切换到系统内核,或由系统内核切换到用户进程),这样很浪费 CPU 和性能。

所谓的零拷贝,就是取消用户空间与内核空间之间的数据拷贝操作,应用进程每一次的读写操作,可以通过一种方式,直接将数据写入内核或从内核中读取数据,再通过 DMA 将内核中的数据拷贝到网卡,或将网卡中的数据 copy 到内核。

img

Netty 的零拷贝偏向于用户空间中对数据操作的优化,这对处理 TCP 传输中的拆包粘包问题有着重要的意义,对应用程序处理请求数据与返回数据也有重要的意义。

Netty 框架中很多内部的 ChannelHandler 实现类,都是通过 CompositeByteBuf、slice、wrap 操作来处理 TCP 传输中的拆包与粘包问题的。

Netty 的 ByteBuffer 可以采用 Direct Buffers,使用堆外直接内存进行 Socketd 的读写操作,最终的效果与我刚才讲解的虚拟内存所实现的效果是一样的。

Netty 还提供 FileRegion 中包装 NIO 的 FileChannel.transferTo() 方法实现了零拷贝,这与 Linux 中的 sendfile 方式在原理上也是一样的。

扩展阅读:深入剖析Linux IO原理和几种零拷贝机制的实现

:::

动态代理

【中级】RPC 如何将远程调用转为本地调用的?

:::details 要点

RPC 的远程过程调用是通过动态代理实现的

RPC 框架会自动为要调用的接口生成一个代理类。当在项目中注入接口的时候,运行过程中实际绑定的就是这个接口生成的代理类。在接口方法被调用时,会被代理类拦截,这样,就可以在生成的代理类中,加入远程调用逻辑。

img

除了 JDK 默认的 InvocationHandler 能完成代理功能,还有很多其他的第三方框架也可以,比如像 Javassist、Byte Buddy 这样的框架。

单纯从代理功能上来看,JDK 默认的代理功能是有一定的局限性的,它要求被代理的类只能是接口。原因是因为生成的代理类会继承 Proxy 类,但 Java 是不支持多重继承的。此外,由于它生成后的代理类是使用反射来完成方法调用的,而这种方式相对直接用编码调用来说,性能会降低。

反射+动态代理更多详情可以参考:深入理解 Java 反射和动态代理

:::

服务发现

【中级】如何实现服务发现?

:::details 要点

RPC 框架必须要有服务注册和发现机制,这样,集群中的节点才能知道通信方的请求地址。

img

  • 服务注册:在服务提供方启动的时候,将对外暴露的接口注册到注册中心之中,注册中心将这个服务节点的 IP 和接口保存下来。
  • 服务订阅:在服务调用方启动的时候,去注册中心查找并订阅服务提供方的 IP,然后缓存到本地,并用于后续的远程调用。

基于 ZooKeeper 的服务发现

使用 ZooKeeper 作为服务注册中心,是 Java 分布式系统的经典方案。

搭建一个 ZooKeeper 集群作为注册中心集群,服务注册的时候只需要服务节点向 ZooKeeper 节点写入注册信息即可,利用 ZooKeeper 的 Watcher 机制完成服务订阅与服务下发功能。

img

通常我们可以使用 ZooKeeper、etcd 或者分布式缓存(如 Hazelcast)来解决事件通知问题,但当集群达到一定规模之后,依赖的 ZooKeeper 集群、etcd 集群可能就不稳定了,无法满足我们的需求。

在超大规模的服务集群下,注册中心所面临的挑战就是超大批量服务节点同时上下线,注册中心集群接受到大量服务变更请求,集群间各节点间需要同步大量服务节点数据,最终导致如下问题:

  • 注册中心负载过高;
  • 各节点数据不一致;
  • 服务下发不及时或下发错误的服务节点列表。

RPC 框架依赖的注册中心的服务数据的一致性其实并不需要满足 CP,只要满足 AP 即可。

:::

负载均衡

【中级】负载均衡有哪些策略?

负载均衡详情可以参考:负载均衡基本原理

【中级】如何设计自适应的负载均衡?

:::details 要点

可以采用一种打分的策略,服务调用者收集与之建立长连接的每个服务节点的指标数据,如服务节点的负载指标、CPU 核数、内存大小、请求处理的耗时指标(如请求平均耗时、TP99、TP999)、服务节点的状态指标(如正常、亚健康)。通过这些指标,计算出一个分数,比如总分 10 分,如果 CPU 负载达到 70%,就减它 3 分,当然了,减 3 分只是个类比,需要减多少分是需要一个计算策略的。可以为每个指标都设置一个指标权重占比,然后再根据这些指标数据,计算分数。

可以配合随机权重的负载均衡策略去控制,通过最终的指标分数修改服务节点最终的权重。例如给一个服务节点综合打分是 8 分(满分 10 分),服务节点的权重是 100,那么计算后最终权重就是 80(100*80%)。服务调用者发送请求时,会通过随机权重的策略来选择服务节点,那么这个节点接收到的流量就是其他正常节点的 80%(这里假设其他节点默认权重都是 100,且指标正常,打分为 10 分的情况)。

到这儿,一个自适应的负载均衡我们就完成了,整体的设计方案如下图所示:

img

关键步骤我来解释下:

  1. 添加服务指标收集器,并将其作为插件,默认有运行时状态指标收集器、请求耗时指标收集器。
  2. 运行时状态指标收集器收集服务节点 CPU 核数、CPU 负载以及内存等指标,在服务调用者与服务提供者的心跳数据中获取。
  3. 请求耗时指标收集器收集请求耗时数据,如平均耗时、TP99、TP999 等。
  4. 可以配置开启哪些指标收集器,并设置这些参考指标的指标权重,再根据指标数据和指标权重来综合打分。
  5. 通过服务节点的综合打分与节点的权重,最终计算出节点的最终权重,之后服务调用者会根据随机权重的策略,来选择服务节点。

:::

路由

【中级】什么是服务路由?有哪些常见的路由规则?

:::details 要点

服务路由是指通过一定的规则从集群中选择合适的节点。

负载均衡的作用和服务路由的功能看上去很近似,二者有什么区别呢?

负载均衡的目标是提供服务分发而不是解决路由问题,常见的静态、动态负载均衡算法也无法实现精细化的路由管理,但是负载均衡也可以简单看做是路由方案的一种。

服务路由通常用于以下场景,目的在于实现流量隔离

  • 分组调用:一般来讲,为了保证服务的高可用性,实现异地多活的需求,一个服务往往不止部署在一个数据中心,而且出于节省成本等考虑,有些业务可能不仅在私有机房部署,还会采用公有云部署,甚至采用多家公有云部署。服务节点也会按照不同的数据中心分成不同的分组,这时对于服务消费者来说,选择哪一个分组调用,就必须有相应的路由规则。
  • 蓝绿发布:蓝绿发布场景中,一共有两套服务群组:一套是提供旧版功能的服务群组,标记为绿色;另一套是提供新版功能的服务群组,标记为蓝色。两套服务群组都是功能完善的,并且正在运行的系统,只是服务版本和访问流量不同。新版群组(蓝色)通常是为了做内部测试、验收,不对外部用户暴露。
    • 如果新版群组(蓝色)运行稳定,并测试、验收通过后,则通过服务路由、负载均衡等手段逐步将外部用户流量导向新版群组(蓝色)。
    • 如果新版群组(蓝色)运行不稳定,或测试、验收不通过,则排查、解决问题后,再继续测试、验收。
  • 灰度发布:灰度发布(又名金丝雀发布)是指在黑与白之间,能够平滑过渡的一种发布方式。在其上可以进行 A/B 测试,即让一部分用户使用特性 A,一部分用户使用特性 B:如果用户对 B 没有什么反对意见,那么逐步扩大发布范围,直到把所有用户都迁移到 B 上面来。灰度发布可以保证整体系统的稳定,在初始灰度的时候就可以发现、调整问题,以保证其影响度。要支持灰度发布,就要求服务能够根据一定的规则,将流量隔离。
  • 流量切换:在业务线上运行过程中,经常会遇到一些不可抗力因素导致业务故障,比如某个机房的光缆被挖断,或者发生着火等事故导致整个机房的服务都不可用。这个时候就需要按照某个指令,能够把原来调用这个机房服务的流量切换到其他正常的机房。
  • 线下测试联调:线下测试时,可能会缺少相应环境。可以将测试应用注册到线上,然后开启路由规则,在本地进行测试。
  • 读写分离。对于大多数互联网业务来说都是读多写少,所以在进行服务部署的时候,可以把读写分开部署,所有写接口可以部署在一起,而读接口部署在另外的节点上。

常见的路由规则有:

  • 条件路由规则 - 条件路由是基于条件表达式的路由规则。各个 RPC 框架的条件路由表达式各不相同。
  • 标签路由规则 - 标签路由通过将某一个或多个服务的提供者划分到同一个分组,约束流量只在指定分组中流转,从而实现流量隔离的目的,可以作为蓝绿发布、灰度发布等场景的能力基础。标签主要是指对服务提供者的分组,目前有两种方式可以完成实例分组,分别是动态规则打标静态规则打标。一般,动态规则优先级比静态规则更高,当两种规则同时存在且出现冲突时,将以动态规则为准。
  • 脚本路由规则 - 脚本路由是基于脚本语言的路由规则,具有最高的灵活性,常用的脚本语言比如 JavaScript、Groovy、JRuby 等。

:::

监控

【中级】如何实现 RPC 的健康检查?

:::details 要点

使用频率适中的心跳去检测目标机器的健康状态

  • 健康状态:建立连接成功,并且心跳探活也一直成功;
  • 亚健康状态:建立连接成功,但是心跳请求连续失败;
  • 死亡状态:建立连接失败。

可以使用可用率来作为健康状态的量化标准

1
可用率 = 一个时间窗口内接口调用成功次数 / 总调用次数

当可用率低于某个比例,就认为这个节点存在问题,把它挪到亚健康列表,这样既考虑了高低频的调用接口,也兼顾了接口响应时间不同的问题。

:::

链路跟踪

分布式链路跟踪就是将一次分布式请求还原为一个完整的调用链路,我们可以在整个调用链路中跟踪到这一次分布式请求的每一个环节的调用情况,比如调用是否成功,返回什么异常,调用的哪个服务节点以及请求耗时等等。

Trace 就是代表整个链路,每次分布式都会产生一个 Trace,每个 Trace 都有它的唯一标识即 TraceId,在分布式链路跟踪系统中,就是通过 TraceId 来区分每个 Trace 的。
Span 就是代表了整个链路中的一段链路,也就是说 Trace 是由多个 Span 组成的。在一个 Trace 下,每个 Span 也都有它的唯一标识 SpanId,而 Span 是存在父子关系的。还是以讲过的例子为例子,在 A->B->C->D 的情况下,在整个调用链中,正常情况下会产生 3 个 Span,分别是 Span1(A->B)、Span2(B->C)、Span3(C->D),这时 Span3 的父 Span 就是 Span2,而 Span2 的父 Span 就是 Span1。

RPC 在整合分布式链路跟踪需要做的最核心的两件事就是“埋点”和“传递”。

我们前面说是因为各子应用、子服务间复杂的依赖关系,所以通过日志难定位问题。那我们就想办法通过日志定位到是哪个子应用的子服务出现问题就行了。

其实,在 RPC 框架打印的异常信息中,是包括定位异常所需要的异常信息的,比如是哪类异常引起的问题(如序列化问题或网络超时问题),是调用端还是服务端出现的异常,调用端与服务端的 IP 是什么,以及服务接口与服务分组都是什么等等。具体如下图所示:

img

优雅启停

【中级】如何实现 RPC 优雅关闭?

:::details 要点

当服务提供方要上线的时候,一般是通过部署系统完成实例重启。在这个过程中,服务提供方的团队并不会事先告诉调用方他们需要操作哪些机器,从而让调用方去事先切走流量。而对调用方来说,它也无法预测到服务提供方要对哪些机器重启上线,因此负载均衡就有可能把要正在重启的机器选出来,这样就会导致把请求发送到正在重启中的机器里面,从而导致调用方不能拿到正确的响应结果。

img

在服务重启的时候,对于调用方来说,这时候可能会存在以下几种情况:

  • 调用方发请求前,目标服务已经下线。对于调用方来说,跟目标节点的连接会断开,这时候调用方可以立马感知到,并且在其健康列表里面会把这个节点挪掉,自然也就不会被负载均衡选中。
  • 调用方发请求的时候,目标服务正在关闭,但调用方并不知道它正在关闭,而且两者之间的连接也没断开,所以这个节点还会存在健康列表里面,因此该节点就有一定概率会被负载均衡选中。

当出现第二种情况的时候,调用方业务会受损,如何避免这种问题呢。当服务提供方关闭前,是不是可以先通知注册中心进行下线,然后通过注册中心告诉调用方进行节点摘除?

img

如上图所示,整个关闭过程中依赖了两次 RPC 调用,一次是服务提供方通知注册中心下线操作,一次是注册中心通知服务调用方下线节点操作。注册中心通知服务调用方都是异步的。服务发现只保证最终一致性,并不保证实时性,所以注册中心在收到服务提供方下线的时候,并不能成功保证把这次要下线的节点推送到所有的调用方。所以这么来看,通过服务发现并不能做到应用无损关闭。

可以这么处理:当服务提供方正在关闭,如果这之后还收到了新的业务请求,服务提供方直接返回一个特定的异常给调用方(比如 ShutdownException)。这个异常就是告诉调用方“我已经收到这个请求了,但是我正在关闭,并没有处理这个请求”,然后调用方收到这个异常响应后,RPC 框架把这个节点从健康列表挪出,并把请求自动重试到其他节点,因为这个请求是没有被服务提供方处理过,所以可以安全地重试到其他节点,这样就可以实现对业务无损。

如何捕获到关闭事件呢?可以通过捕获操作系统的进程信号来获取,在 Java 语言里面,对应的是 Runtime.addShutdownHook 方法,可以注册关闭的钩子。在 RPC 启动的时候,我们提前注册关闭钩子,并在里面添加了两个处理程序,一个负责开启关闭标识,一个负责安全关闭服务对象,服务对象在关闭的时候会通知调用方下线节点。同时需要在我们调用链里面加上挡板处理器,当新的请求来的时候,会判断关闭标识,如果正在关闭,则抛出特定异常。

关闭过程中已经在处理的请求会不会受到影响呢?如果进程结束过快会造成这些请求还没有来得及应答,同时调用方会也会抛出异常。为了尽可能地完成正在处理的请求,首先我们要把这些请求识别出来。可以在服务对象加上引用计数器,每开始处理请求之前加一,完成请求处理减一,通过该计数器我们就可以快速判断是否有正在处理的请求。服务对象在关闭过程中,会拒绝新的请求,同时根据引用计数器等待正在处理的请求全部结束之后才会真正关闭。但考虑到有些业务请求可能处理时间长,或者存在被挂住的情况,为了避免一直等待造成应用无法正常退出,我们可以在整个 ShutdownHook 里面,加上超时时间控制,当超过了指定时间没有结束,则强制退出应用。超时时间我建议可以设定成 10s,基本可以确保请求都处理完了。

img

:::

【中级】如何实现 RPC 优雅启动?

运行了一段时间后的应用,执行速度会比刚启动的应用更快。这是因为在Java里面,在运行过程中,JVM虚拟机会把高频的代码编译成机器码,被加载过的类也会被缓存到JVM缓存中,再次使用的时候不会触发临时加载,这样就使得“热点”代码的执行不用每次都通过解释,从而提升执行速度。

但是这些“临时数据”,都在应用重启后就消失了。重启后的这些“红利”没有了之后,如果让刚启动的应用就承担像停机前一样的流量,这会使应用在启动之初就处于高负载状态,从而导致调用方过来的请求可能出现大面积超时,进而对线上业务产生损害行为。

启动预热

启动预热,就是让刚启动的服务提供方应用不承担全部的流量,而是让它被调用的次数随着时间的移动慢慢增加,最终让流量缓和地增加到跟已经运行一段时间后的水平一样。如何做到这点呢?

首先,对于调用方来说,我们要知道服务提供方启动的时间。一种是服务提供方在启动的时候,把自己启动的时间告诉注册中心;另外一种就是注册中心收到的服务提供方的请求注册时间。因为整个预热过程的时间是一个粗略值,即使机器之间的日期时间存在1分钟的误差也不影响,并且在真实环境中机器都会默认开启NTP时间同步功能,来保证所有机器时间的一致性。

不管你是选择哪个时间,最终的结果就是,调用方通过服务发现,除了可以拿到IP列表,还可以拿到对应的启动时间。接着,可以利用加权负载均衡算法来分发流量。现在,需要让这个权重变为动态的,并且是随着时间的推移慢慢增加到服务提供方设定的固定值。

img

通过这个小逻辑的改动,我们就可以保证当服务提供方运行时长小于预热时间时,对服务提供方进行降权,减少被负载均衡选择的概率,避免让应用在启动之初就处于高负载状态,从而实现服务提供方在启动后有一个预热的过程。

延迟暴露

服务提供方应用在没有启动完成的时候,调用方的请求就过来了,而调用方请求过来的原因是,服务提供方应用在启动过程中把解析到的 RPC 服务注册到了注册中心,这就导致在后续加载没有完成的情况下服务提供方的地址就被服务调用方感知到了。

为了解决这个问题,需要在应用启动加载、解析 Bean 的时候,如果遇到了 RPC 服务的 Bean,只先把这个 Bean 注册到 Spring-BeanFactory 里面去,而并不把这个 Bean 对应的接口注册到注册中心,只有等应用启动完成后,才把接口注册到注册中心用于服务发现,从而实现让服务调用方延迟获取到服务提供方地址。

具体如何实现呢?

我们可以在服务提供方应用启动后,接口注册到注册中心前,预留一个 Hook 过程,让用户可以实现可扩展的 Hook 逻辑。用户可以在 Hook 里面模拟调用逻辑,从而使 JVM 指令能够预热起来,并且用户也可以在 Hook 里面事先预加载一些资源,只有等所有的资源都加载完成后,最后才把接口注册到注册中心。

img

:::

流量回放

架构

【高级】如何设计一个 RPC 框架?

:::details 要点

设计一个 RPC 框架,可以自下而上梳理一下所需要的能力:

  • 通信传输模块:RPC 本质上就是一个远程调用,那肯定就需要通过网络来传输数据。
  • 协议模块:传输的数据如何定义,就需要通过协议和序列化方式来确定。此外,为了减少传输数据的大小,可以加入压缩功能。
  • 代理模块:为了屏蔽用户的感知,让用户更聚焦于自身业务,需要引入动态代理来托管远程调用。

以上,是一个 RPC 框架的基础能力,使用于 P2P 场景。

但是,如果面对集群模式,以上能力就不够了。同一个服务可能有多个提供者。消费者选择调用哪个提供者?消费者怎么找到提供者的访问地址?请求提供者失败了如何处理?这些都依赖于服务治理的能力。

服务治理,需要很多个模块的能力:服务发现、负载均衡、路由、容错、配置挂历等。

img

具备了这些能力就万事大吉了吗?RPC 框架很难一开始就面面俱到,但作为基础能力,在实际应用中,难免会有定制化的要求。这就要求 RPC 框架具备良好的扩展性。

通常来说,框架软件可以通过 SPI 技术来实现微内核+插件架构。根据依赖倒置原则,框架应该先将每个功能点都抽象成接口,并提供默认实现。然后,利用 SPI 机制,可以动态地为某个接口寻找服务实现。

加上了插件功能之后,我们的RPC框架就包含了两大核心体系——核心功能体系与插件体系,如下图所示:

img

:::

【高级】如何实现 RPC 异步调用?

:::details 要点

一次 RPC 调用的本质就是调用端向服务端发送一条请求消息,服务端收到消息后进行处理,处理之后响应给调用端一条响应消息,调用端收到响应消息之后再进行处理,最后将最终的返回值返回给动态代理。

对于 RPC 框架,无论是同步调用还是异步调用,调用端的内部实现都是异步的

调用端发送的每条消息都一个唯一的消息标识,实际上调用端向服务端发送请求消息之前会先创建一个 Future,并会存储这个消息标识与这个 Future 的映射,动态代理所获得的返回值最终就是从这个 Future 中获取的;当收到服务端响应的消息时,调用端会根据响应消息的唯一标识,通过之前存储的映射找到对应的 Future,将结果注入给那个 Future,再进行一系列的处理逻辑,最后动态代理从 Future 中获得到正确的返回值。

所谓的同步调用,不过是 RPC 框架在调用端的处理逻辑中主动执行了这个 Future 的 get 方法,让动态代理等待返回值;而异步调用则是 RPC 框架没有主动执行这个 Future 的 get 方法,用户可以从请求上下文中得到这个 Future,自己决定什么时候执行这个 Future 的 get 方法。

img

如何做到 RPC 调用全异步?

实现 RPC 调用全异步的方法是让 RPC 框架支持 CompletableFutureCompletableFuture 是 Java8 原生支持的。如果 RPC 框架能够支持 CompletableFuture,现在发布一个 RPC 服务,服务接口定义的返回值是 CompletableFuture 对象,整个调用过程会分为这样几步:

  • 服务调用方发起 RPC 调用,直接拿到返回值 CompletableFuture 对象,之后就不需要任何额外的与 RPC 框架相关的操作了,直接就可以进行异步处理;
  • 在服务端的业务逻辑中创建一个返回值 CompletableFuture 对象,之后服务端真正的业务逻辑完全可以在一个线程池中异步处理,业务逻辑完成之后再调用这个 CompletableFuture 对象的 complete 方法,完成异步通知;
  • 调用端在收到服务端发送过来的响应之后,RPC 框架再自动地调用调用端拿到的那个返回值 CompletableFuture 对象的 complete 方法,这样一次异步调用就完成了。

通过对 CompletableFuture 的支持,RPC 框架可以真正地做到在调用端与服务端之间完全异步,同时提升了调用端与服务端的两端的单机吞吐量,并且 CompletableFuture 是 Java8 原生支持,业务逻辑中没有任何代码入侵性。

:::

【高级】Dubbo 中的时间轮机制是如何设计的?

:::details 要点

JDK 中定时任务的实现

在很多开源框架中,都需要定时任务的管理功能,例如 ZooKeeper、Netty、Quartz、Kafka 以及 Linux 操作系统。

定时器的本质是设计一种数据结构,能够存储和调度任务集合,而且 deadline 越近的任务拥有更高的优先级。那么定时器如何知道一个任务是否到期了呢?定时器需要通过轮询的方式来实现,每隔一个时间片去检查任务是否到期。

所以定时器的内部结构一般需要一个任务队列和一个异步轮询线程,并且能够提供三种基本操作:

  • Schedule 新增任务至任务集合;
  • Cancel 取消某个任务;
  • Run 执行到期的任务。

JDK 原生提供了三种常用的定时器实现方式,分别为 TimerDelayedQueueScheduledThreadPoolExecutor

JDK 内置的三种实现定时器的方式,实现思路都非常相似,都离不开任务任务管理任务调度三个角色。三种定时器新增和取消任务的时间复杂度都是 O(logn),面对海量任务插入和删除的场景,这三种定时器都会遇到比较严重的性能瓶颈。

对于性能要求较高的场景,一般都会采用时间轮算法来实现定时器。时间轮(Timing Wheel)是 George Varghese 和 Tony Lauck 在 1996 年的论文 Hashed and Hierarchical Timing Wheels: data structures to efficiently implement a timer facility 实现的,它在 Linux 内核中使用广泛,是 Linux 内核定时器的实现方法和基础之一。

时间轮的基本原理

时间轮是一种高效的、批量管理定时任务的调度模型。时间轮可以理解为一种环形结构,像钟表一样被分为多个 slot 槽位。每个 slot 代表一个时间段,每个 slot 中可以存放多个任务,使用的是链表结构保存该时间段到期的所有任务。时间轮通过一个时针随着时间一个个 slot 转动,并执行 slot 中的所有到期任务。

图片 22.png

任务是如何添加到时间轮当中的呢?可以根据任务的到期时间进行取模,然后将任务分布到不同的 slot 中。如上图所示,时间轮被划分为 8 个 slot,每个 slot 代表 1s,当前时针指向 2。假如现在需要调度一个 3s 后执行的任务,应该加入 2+3=5 的 slot 中;如果需要调度一个 12s 以后的任务,需要等待时针完整走完一圈 round 零 4 个 slot,需要放入第 (2+12)%8=6 个 slot。

那么当时针走到第 6 个 slot 时,怎么区分每个任务是否需要立即执行,还是需要等待下一圈,甚至更久时间之后执行呢?所以我们需要把 round 信息保存在任务中。例如图中第 6 个 slot 的链表中包含 3 个任务,第一个任务 round=0,需要立即执行;第二个任务 round=1,需要等待 1*8=8s 后执行;第三个任务 round=2,需要等待 2*8=8s 后执行。所以当时针转动到对应 slot 时,只执行 round=0 的任务,slot 中其余任务的 round 应当减 1,等待下一个 round 之后执行。

上面介绍了时间轮算法的基本理论,可以看出时间轮有点类似 HashMap,如果多个任务如果对应同一个 slot,处理冲突的方法采用的是拉链法。在任务数量比较多的场景下,适当增加时间轮的 slot 数量,可以减少时针转动时遍历的任务个数。

时间轮定时器最大的优势就是,任务的新增和取消都是 O(1) 时间复杂度,而且只需要一个线程就可以驱动时间轮进行工作。

:::

【中级】RPC 如何实现那泛化调用?

:::details 要点

在一些特定场景下,需要在没有接口的情况下进行 RPC 调用。例如:

场景一:搭建一个统一的测试平台,可以让各个业务方在测试平台中通过输入接口、分组名、方法名以及参数值,在线测试自己发布的 RPC 服务。

img

场景二:搭建一个轻量级的服务网关,可以让各个业务方用 HTTP 的方式,通过服务网关调用其它服务。

img

为了解决这些场景的问题,可以使用泛化调用。

就是 RPC 框架提供统一的泛化调用接口(GenericService),调用端在创建 GenericService 代理时指定真正需要调用的接口的接口名以及分组名,通过调用 GenericService 代理的 $invoke 方法将服务端所需要的所有信息,包括接口名、业务分组名、方法名以及参数信息等封装成请求消息,发送给服务端,实现在没有接口的情况下进行 RPC 调用的功能。

1
2
3
4
class GenericService {
Object $invoke(String methodName, String[] paramTypes, Object[] params);
CompletableFuture<Object> $asyncInvoke(String methodName, String[] paramTypes
}

而通过泛化调用的方式发起调用,由于调用端没有服务端提供方提供的接口 API,不能正常地进行序列化与反序列化,我们可以为泛化调用提供专属的序列化插件,来解决实际问题。

:::

参考资料

分布式存储面试

缓存

扩展:

【基础】什么是缓存?为什么需要缓存?

缓存就是数据交换的缓冲区,用于将频繁访问的数据暂存在访问速度快的存储介质

缓存的本质是一种利用空间换时间的设计:牺牲一定的数据实时性,使得访问更快更近

  • 将数据存储到读取速度更快的存储(设备);
  • 将数据存储到离应用最近的位置;
  • 将数据存储到离用户最近的位置。

缓存是用于存储数据的硬件或软件的组成部分,以使得后续更快访问相应的数据。缓存中的数据可能是提前计算好的结果、数据的副本等。典型的应用场景:有 cpu cache, 磁盘 cache 等。本文中提及到缓存主要是指互联网应用中所使用的缓存组件。

缓存命中率是缓存的重要度量指标,命中率越高越好。

1
缓存命中率 = 从缓存中读取次数 / 总读取次数

【基础】何时需要缓存?

引入缓存,会增加系统的复杂度,并牺牲一定的数据实时性。所以,引入缓存前,需要先权衡是否值得,考量点如下:

  • CPU 开销 - 如果应用某个计算需要消耗大量 CPU,可以考虑缓存其计算结果。典型场景:复杂的、频繁调用的正则计算;分布式计算中间状态等。
  • IO 开销 - 如果数据库连接池比较繁忙,可以考虑缓存其查询结果。

在数据层引入缓存,有以下几个好处:

  • 提升数据读取速度。
  • 提升系统扩展能力,通过扩展缓存,提升系统承载能力。
  • 降低存储成本,Cache+DB 的方式可以承担原有需要多台 DB 才能承担的请求量,节省机器成本。

【中级】缓存有哪些分类?

缓存从部署角度,可以分为客户端缓存和服务端缓存。

客户端缓存

  • Http 缓存:HTTP/1.1 中的 Cache-Control、HTTP/1 中的 Expires
  • 浏览器缓存:HTML5 提供的 SessionStorage 和 LocalStorage、Cookie
  • APP 缓存
    • Android
    • IOS

服务端缓存

  • CDN 缓存 - CDN 将数据缓存到离用户物理距离最近的服务器,使得用户可以就近获取请求内容。CDN 一般缓存静态资源文件(页面,脚本,图片,视频,文件等)。
  • 反向代理缓存 - 反向代理(Reverse Proxy)方式是指以代理服务器来接受网络连接请求,然后将请求转发给内部网络上的服务器,并将从服务器上得到的结果返回给客户端,此时代理服务器对外就表现为一个反向代理服务器。反向代理缓存一般针对的是静态资源,而将动态资源请求转发到应用服务器处理。
  • 数据库缓存 - 数据库(如 Mysql)自身一般也有缓存,但因为命中率和更新频率问题,不推荐使用。
  • 进程内缓存 - 缓存应用字典等常用数据。
  • 分布式缓存 - 缓存数据库中的热点数据。

其中,CDN 缓存、反向代理缓存、数据库缓存一般由专职人员维护(运维、DBA)。

后端开发一般聚焦于进程内缓存、分布式缓存。

【中级】CDN 缓存是如何工作的?

CDN 将数据缓存到离用户物理距离最近的服务器,使得用户可以就近获取请求内容。CDN 一般缓存静态资源文件(页面,脚本,图片,视频,文件等)

国内网络异常复杂,跨运营商的网络访问会很慢。为了解决跨运营商或各地用户访问问题,可以在重要的城市,部署 CDN 应用。使用户就近获取所需内容,降低网络拥塞,提高用户访问响应速度和命中率。

img

CDN 缓存原理

CDN 的基本原理是广泛采用各种缓存服务器,将这些缓存服务器分布到用户访问相对集中的地区或网络中,在用户访问网站时,利用全局负载技术将用户的访问指向距离最近的工作正常的缓存服务器上,由缓存服务器直接响应用户请求。

(1)未部署 CDN 应用前的网络路径:

  • 请求:本机网络(局域网)=> 运营商网络 => 应用服务器机房
  • 响应:应用服务器机房 => 运营商网络 => 本机网络(局域网)

在不考虑复杂网络的情况下,从请求到响应需要经过 3 个节点,6 个步骤完成一次用户访问操作。

(2)部署 CDN 应用后网络路径:

  • 请求:本机网络(局域网) => 运营商网络
  • 响应:运营商网络 => 本机网络(局域网)

在不考虑复杂网络的情况下,从请求到响应需要经过 2 个节点,2 个步骤完成一次用户访问操作。

与不部署 CDN 服务相比,减少了 1 个节点,4 个步骤的访问。极大的提高了系统的响应速度。

CDN 特点

优点

  • 本地 Cache 加速 - 提升访问速度,尤其含有大量图片和静态页面站点;
  • 实现跨运营商的网络加速 - 消除了不同运营商之间互联的瓶颈造成的影响,实现了跨运营商的网络加速,保证不同网络中的用户都能得到良好的访问质量;
  • 远程加速 - 远程访问用户根据 DNS 负载均衡技术智能自动选择 Cache 服务器,选择最快的 Cache 服务器,加快远程访问的速度;
  • 带宽优化 - 自动生成服务器的远程 Mirror(镜像)cache 服务器,远程用户访问时从 cache 服务器上读取数据,减少远程访问的带宽、分担网络流量、减轻原站点 WEB 服务器负载等功能。
  • 集群抗攻击 - 广泛分布的 CDN 节点加上节点之间的智能冗余机制,可以有效地预防黑客入侵以及降低各种 D.D.o.S 攻击对网站的影响,同时保证较好的服务质量。

缺点

  • 不适宜缓存动态资源
    • 解决方案:主要缓存静态资源,动态资源建立多级缓存或准实时同步;
  • 存在数据的一致性问题
    • 解决方案(主要是在性能和数据一致性二者间寻找一个平衡)
    • 设置缓存失效时间(1 个小时,过期后同步数据)。
    • 针对资源设置版本号。

【中级】反向代理缓存是如何工作的?

反向代理(Reverse Proxy)方式是指以代理服务器来接受网络连接请求,然后将请求转发给内部网络上的服务器,并将从服务器上得到的结果返回给客户端,此时代理服务器对外就表现为一个反向代理服务器。

img

反向代理位于应用服务器同一网络,处理所有对 WEB 服务器的请求。

反向代理缓存的原理:

  • 如果用户请求的页面在代理服务器上有缓存的话,代理服务器直接将缓存内容发送给用户。
  • 如果没有缓存则先向 WEB 服务器发出请求,取回数据,本地缓存后再发送给用户。

这种方式通过降低向 WEB 服务器的请求数,从而降低了 WEB 服务器的负载。

反向代理缓存一般针对的是静态资源,而将动态资源请求转发到应用服务器处理。常用的缓存应用服务器有 Varnish,Ngnix,Squid。

【中级】缓存有哪些淘汰算法?

扩展:

Cache Replacement Policies - RR, FIFO, LIFO, & Optimal

Cache Replacement Policies - MRU, LRU, Pseudo-LRU, & LFU

缓存一般存于访问速度较快的存储介质,快也就意味着资源昂贵并且有限。正所谓,好钢要用在刀刃上。因此,缓存要合理利用,需要设定一些机制,将一些访问频率偏低或过期的数据淘汰。

淘汰缓存首先要做的是,确定什么时候触发淘汰缓存,一般有以下几个思路:

  • 基于空间 - 设置缓存空间大小。
  • 基于容量 - 设置缓存存储记录数。
  • 基于时间
    • TTL(Time To Live,即存活期) - 缓存数据从创建到过期的时间。
    • TTI(Time To Idle,即空闲期) - 缓存数据多久没被访问的时间。

接下来,就要确定如何淘汰缓存,常见的缓存淘汰算法有以下几个:

  • FIFO(First In First Out,先进先出) - 淘汰最先进入的缓存数据。缓存的行为就像一个队列。
    • 优点:这种方案非常简单
    • 缺点:可能会导致缓存命中率低。因为,进入缓存的先后顺序和访问频率无关,这种算法可能会将访问频率高的数据给淘汰。
  • LIFO(Last In First Out,后进先出) - 淘汰最后进入的缓存数据。缓存的行为就像一个栈。
    • 优点:这种方案非常简单
    • 缺点:和 FIFO 一样,也可能会导致缓存命中率低。因为,进入缓存的先后顺序和访问频率无关,这种算法可能会将访问频率高的数据给淘汰。
  • MRU(Most Recently Used,最近最多使用) - 淘汰最近最多使用缓存。
    • 优点:适用于一些特殊场景,例如数据访问具有较强的局部性。举个例子,用户访问一个信息流页面,已经看过的内容,他肯定不想再看到,此时就可以使用 MRU。
    • 缺点:某些情况下,可能会导致频繁的淘汰缓存,从而降低缓存命中率
  • LRU(Least Recently Used,最近最少使用) - 淘汰最近最少使用缓存。
    • 优点:避免了 FIFO 缓存命中率低的问题。
    • 缺点:存在临界区问题。假设,缓存只保留 1 分钟以内的热点数据。如果有个数据在 1 个小时的前 59 分钟访问了 1 万次(可见这是个热点数据),最后一分钟没有任何访问;而其他数据有被访问,就会导致这个热点数据被淘汰。
  • LFU(Less Frequently Used,最近最少频率使用) - 该算法对 LRU 做了进一步优化:利用额外的空间记录每个数据的使用频率,然后淘汰使用频率最低的数据,如果所有数据使用频率相同,可以用 FIFO 淘汰最早的缓存数据。
    • 优点:解决了 LRU 的临界区问题。
    • 缺点:记录使用频率,会产生额外的空间开销

【高级】缓存更新有哪些策略?

top 5 caching strategies for System design interviews

一般来说,系统如果不是严格要求缓存和数据库保持一致性的话,尽量不要将读请求和写请求串行化。串行化可以保证一定不会出现数据不一致的情况,但是它会导致系统的吞吐量大幅度下降。缓存更新的常见策略有以下几种:

  • Cache Aside
  • Wirte Through
  • Read Though
  • Wirte Behind

需要注意的是:以上几种缓存更新策略,都无法保证数据强一致。如果一定要保证强一致性,可以通过两阶段提交(2PC)或 Paxos 协议来实现。但是 2PC 太慢,而 Paxos 太复杂,所以如果不是非常重要的数据,不建议使用强一致性方案。

Cache Aside

Wirte Through

Read Though

Wirte Behind

【高级】多级缓存架构如何设计?

【中级】什么是缓存穿透?如何应对?

【中级】什么是缓存击穿?如何应对?

【中级】什么是缓存雪崩?如何应对?

【中级】什么是缓存预热?如何预热?

读写分离

【基础】什么是读写分离?为什么需要读写分离?

【中级】如何实现读写分离?

分库分表

【基础】什么是分库分表?为什么需要分库分表?

【高级】如何实现分库分表?

【高级】分库分表后,如何应对扩容和迁移?

《极客时间教程 - 秒杀系统》笔记

开篇词丨秒杀系统架构设计都有哪些关键点?

秒杀的整体架构可以概括为“稳、准、快”几个关键字

  • 稳-高可用 - 服务需要考虑各种容错场景,保证服务可用
  • 准-一致性 - 高并发下的库存数量增减不能出错,避免超卖
  • 快-高性能 - 支持高并发的读写

设计秒杀系统时应该注意的 5 个架构原则

秒杀系统本质上就是一个满足大并发、高性能和高可用的分布式系统。

架构原则:“4 要 1 不要”

  • 数据尽量少
    • 请求及响应的数据量越小,则传输数据量越小,可以显著减少 CPU 和带宽;
    • 依赖数据库的数据越少,数据库压力越小,I/O 耗时越少
  • 请求数尽量少 - 合并 css+js,减少静态资源的请求数
  • 路径尽量短
    • 路径,是指用户发出请求、收到响应的整个过程中,数据经过的节点数。
    • 路径越短,则 I/O 传输耗时越少,也更加可靠。
  • 依赖尽量少 - 依赖,是指要完成一次用户请求必须依赖的系统或者服务。
  • 避免单点
  • 对于应用服务,应设计为无状态,然后以集群模式提供整体服务,以此提高可用性
  • 对于数据库,应通过副本机制+故障转移,来保证可用性。

不同场景下的不同架构案例

(1)请求量级 10w QPS 的架构

架构要点:

  1. 把秒杀系统独立出来单独打造一个系统,这样可以有针对性地做优化
  2. 在系统部署上也独立做一个机器集群,这样秒杀的大流量就不会影响到正常的商品购买集群的机器负载;
  3. 将热点数据(如库存数据)单独放到一个缓存系统中,以提高“读性能”;
  4. 增加秒杀答题,防止有秒杀器抢单。

(1)请求量级 100w QPS 的架构

  1. 对页面进行彻底的动静分离,使得用户秒杀时不需要刷新整个页面,而只需要点击抢宝按钮,借此把页面刷新的数据降到最少;
  2. 在服务端对秒杀商品进行本地缓存,不需要再调用依赖系统的后台服务获取数据,甚至不需要去公共的缓存集群中查询数据,这样不仅可以减少系统调用,而且能够避免压垮公共缓存集群。
  3. 增加系统限流保护,防止最坏情况发生。

小结:架构之道,在于权衡取舍。要取得极致的性能,往往要在通用性、易用性、成本等方面有所牺牲,反之亦然。

如何才能做好动静分离?有哪些方案可选?

何为动静数据

“动态数据”和“静态数据”的主要区别就是看页面中输出的数据是否和 URL、浏览者、时间、地域相关,以及是否含有 Cookie 等私密数据

所谓“动态”还是“静态”,并不是说数据本身是否动静,而是数据中是否含有和访问者相关的个性化数据。更通俗的来说,是不是每个人看到的页面是相同的。

怎样对静态数据做缓存呢?

  • 第一,你应该把静态数据缓存到离用户最近的地方。常见技术:CDN、Cookie、服务器缓存
  • 第二,静态化改造就是要直接缓存 HTTP 连接。例如:Nginx 静态缓存
  • 第三,让谁来缓存静态数据也很重要。Web 服务器(如 Nginx、Apache、Varnish)更擅长处理大并发的静态文件请求。

如何做动静分离的改造

  1. URL 唯一化。商品详情系统天然地就可以做到 URL 唯一化,比如每个商品都由 ID 来标识,那么 http://item.xxx.com/item.htm?id=xxxx 就可以作为唯一的 URL 标识。为啥要 URL 唯一呢?前面说了我们是要缓存整个 HTTP 连接,那么以什么作为 Key 呢?就以 URL 作为缓存的 Key,例如以 id=xxx 这个格式进行区分。
  2. 分离浏览者相关的因素。浏览者相关的因素包括身份、认证信息等。这部分少量数据可以通过动态请求来获取。
  3. 分离时间因素。服务端输出的时间也通过动态请求获取。
  4. 异步化地域因素。详情页面上与地域相关的因素做成异步方式获取,当然你也可以通过动态请求方式获取,只是这里通过异步获取更合适。
  5. 去掉 Cookie。服务端输出的页面包含的 Cookie 可以通过代码软件来删除,如 Web 服务器 Varnish 可以通过 unset req.http.cookie 命令去掉 Cookie。注意,这里说的去掉 Cookie 并不是用户端收到的页面就不含 Cookie 了,而是说,在缓存的静态数据中不含有 Cookie。

分离出动态内容之后,如何组织这些内容页就变得非常关键了。动态内容的处理通常有两种方案:

  1. ESI 方案(或者 SSI):即在 Web 代理服务器上做动态内容请求,并将请求插入到静态页面中,当用户拿到页面时已经是一个完整的页面了。这种方式对服务端性能有些影响,但是用户体验较好。
  2. CSI 方案。即单独发起一个异步 JavaScript 请求,以向服务端获取动态内容。这种方式服务端性能更佳,但是用户端页面可能会延时,体验稍差。

动静分离的几种架构方案

方案 1:实体机单机部署

这种方案是将虚拟机改为实体机,以增大 Cache 的容量,并且采用了一致性 Hash 分组的方式来提升命中率。这里将 Cache 分成若干组,是希望能达到命中率和访问热点的平衡。Hash 分组越少,缓存的命中率肯定就会越高,但短板是也会使单个商品集中在一个分组中,容易导致 Cache 被击穿,所以我们应该适当增加多个相同的分组,来平衡访问热点和命中率的问题。

实体机单机部署有以下几个优点:

  1. 没有网络瓶颈,而且能使用大内存;
  2. 既能提升命中率,又能减少 Gzip 压缩;
  3. 减少 Cache 失效压力,因为采用定时失效方式,例如只缓存 3 秒钟,过期即自动失效。

缺点:

  • 一定程度上也造成了 CPU 的浪费,因为单个的 Java 进程很难用完整个实体机的 CPU。
  • 一个实体机上部署了 Java 应用又作为 Cache 来使用,这造成了运维上的高复杂度。

方案 2:统一 Cache 层

所谓统一 Cache 层,就是将单机的 Cache 统一分离出来,形成一个单独的 Cache 集群。

优点:

  1. 应用无需单独维护 Cache
  2. 运维简单
  3. 可以共享内存,最大化利用内存

缺点:

  1. Cache 层内部交换网络成为瓶颈;
  2. 缓存服务器的网卡也会是瓶颈;
  3. 机器少风险较大,挂掉一台就会影响很大一部分缓存数据。

方案 3:CDN

动静分离后,缓存如果前置到 CDN,由于离用户更近,因此访问更快。

CDN 方案有以下问题:

  1. 失效问题。需要考虑如果让 CDN 分布在全国各地的 Cache 在秒级时间内失效。
  2. 命中率问题。如果将数据全部放到全国的 CDN 上,必然导致 Cache 分散,而 Cache 分散又会导致访问请求命中同一个 Cache 的可能性降低,那么命中率就成为一个问题。
  3. 发布更新问题。若业务迭代快速,则发布系统必须足够简洁高效

将商品详情系统放到全国的所有 CDN 节点上是不太现实的,因为存在失效问题、命中率问题以及系统的发布更新问题。那么是否可以选择若干个节点来尝试实施呢?答案是“可以”,但是这样的节点需要满足几个条件:

  1. 靠近访问量比较集中的地区;
  2. 离主站相对较远;
  3. 节点到主站间的网络比较好,而且稳定;
  4. 节点容量比较大,不会占用其他 CDN 太多的资源。

最后,还有一点也很重要,那就是:节点不要太多。

基于上面几个因素,选择 CDN 的二级 Cache 比较合适,因为二级 Cache 数量偏少,容量也更大,让用户的请求先回源的 CDN 的二级 Cache 中,如果没命中再回源站获取数据,部署方式如下图所示:

二八原则:有针对性地处理好系统的“热点数据”

所谓“静态热点数据”,就是能够提前预测的热点数据。例如,我们可以通过卖家报名的方式提前筛选出来,通过报名系统对这些热点商品进行打标。另外,我们还可以通过大数据分析来提前发现热点商品,比如我们分析历史成交记录、用户的购物车记录,来发现哪些商品可能更热门、更好卖,这些都是可以提前分析出来的热点。

所谓“动态热点数据”,就是不能被提前预测到的,系统在运行过程中临时产生的热点。例如,卖家在抖音上做了广告,然后商品一下就火了,导致它在短时间内被大量购买。

发现热点数据

动态热点发现系统的具体实现。

  1. 构建一个异步的系统,它可以收集交易链路上各个环节中的中间件产品的热点 Key,如 Nginx、缓存、RPC 服务框架等这些中间件(一些中间件产品本身已经有热点统计模块)。
  2. 建立一个热点上报和可以按照需求订阅的热点服务的下发规范,主要目的是通过交易链路上各个系统(包括详情、购物车、交易、优惠、库存、物流等)访问的时间差,把上游已经发现的热点透传给下游系统,提前做好保护。比如,对于大促高峰期,详情系统是最早知道的,在统一接入层上 Nginx 模块统计的热点 URL。
  3. 将上游系统收集的热点数据发送到热点服务台,然后下游系统(如交易系统)就会知道哪些商品会被频繁调用,然后做热点保护。

这里我给出了一个图,其中用户访问商品时经过的路径有很多,我们主要是依赖前面的导购页面(包括首页、搜索页面、商品详情、购物车等)提前识别哪些商品的访问量高,通过这些系统中的中间件来收集热点数据,并记录到日志中。

处理热点数据

处理热点数据通常有几种思路:一是优化,二是限制,三是隔离

具体到“秒杀”业务,我们可以在以下几个层次实现隔离。

  1. 业务隔离。把秒杀做成一种营销活动,卖家要参加秒杀这种营销活动需要单独报名,从技术上来说,卖家报名后对我们来说就有了已知热点,因此可以提前做好预热。
  2. 系统隔离。系统隔离更多的是运行时的隔离,可以通过分组部署的方式和另外 99%分开。秒杀可以申请单独的域名,目的也是让请求落到不同的集群中。
  3. 数据隔离。秒杀所调用的数据大部分都是热点数据,比如会启用单独的 Cache 集群或者 MySQL 数据库来放热点数据,目的也是不想 0.01%的数据有机会影响 99.99%数据。

流量削峰这事应该怎么做?

流量削峰的思路:排队、答题、分层过滤

排队 - 使用 MQ 削峰、解耦

适用于内部上下游系统之间调用请求不平缓的场景,由于内部系统的服务质量要求不能随意丢弃请求,所以使用消息队列能起到很好的削峰和缓冲作用。

答题 - 延缓请求、限制秒杀器

适用于秒杀或者营销活动等应用场景,在请求发起端就控制发起请求的速度,因为越到后面无效请求也会越多,所以配合后面介绍的分层拦截的方式,可以更进一步减少无效请求对系统资源的消耗。

分层过滤 - 请求分别经过 CDN、前台读系统(如商品详情系统)、后台系统(如交易系统)和数据库这几层分层过滤。

分层过滤非常适合交易性的写请求,比如减库存或者拼车这种场景,在读的时候需要知道还有没有库存或者是否还有剩余空座位。但是由于库存和座位又是不停变化的,所以读的数据是否一定要非常准确呢?其实不一定,你可以放一些请求过去,然后在真正减的时候再做强一致性保证,这样既过滤一些请求又解决了强一致性读的瓶颈。

分层校验的基本原则是:

  1. 将动态请求的读数据缓存(Cache)在 Web 端,过滤掉无效的数据读;
  2. 对读数据不做强一致性校验,减少因为一致性校验产生瓶颈的问题;
  3. 对写数据进行基于时间的合理分片,过滤掉过期的失效请求;
  4. 对写请求做限流保护,将超出系统承载能力的请求过滤掉;
  5. 对写数据进行强一致性校验,只保留最后有效的数据。

影响性能的因素有哪些?又该如何提高系统的性能?

  • 影响性能的因素:响应时间、线程数
  • 如何发现瓶颈
    • 瓶颈点:CPU、内存、磁盘、带宽
    • 针对 CPU 而言,可以使用 CPU 相关工具:JProfile、Yourkit、jstack,此外,还可以使用链路追踪进行链路分析
  • 如何优化系统:编码、序列化、压缩、传输方式(NIO)、并发

秒杀系统“减库存”设计的核心逻辑

减库存的一般方式:

  • 下单减库存:不会出现超卖;不能应对下单不付款的情况
  • 付款减库存:高并发下,可能出现超卖——下单后无法付款的情况(库存已经清空)
  • 预扣库存:买家下单后,库存为其保留一定的时间(如 10 分钟),超过这个时间,库存将会自动释放,释放后其他买家就可以继续购买。在买家付款前,系统会校验该订单的库存是否还有保留:如果没有保留,则再次尝试预扣;如果库存不足(也就是预扣失败)则不允许继续付款;如果预扣成功,则完成付款并实际地减去库存。

针对秒杀场景,一般“抢到就是赚到”,所以成功下单后却不付款的情况比较少,再加上卖家对秒杀商品的库存有严格限制,所以秒杀商品采用“下单减库存”更加合理。另外,理论上,“下单减库存”比“预扣库存”以及涉及第三方支付的“付款减库存”在逻辑上更为简单,所以性能上更占优势。

“下单减库存”在数据一致性上,主要就是保证大并发请求时库存数据不能为负数,也就是要保证数据库中的库存字段值不能为负数,一般我们有多种解决方案:一种是在应用程序中通过事务来判断,即保证减后库存不能为负数,否则就回滚;另一种办法是直接设置数据库的字段数据为无符号整数,这样减后库存字段值小于零时会直接执行 SQL 语句来报错;再有一种就是使用 CASE WHEN 判断语句,例如这样的 SQL 语句:

1
UPDATE item SET inventory = CASE WHEN inventory >= xxx THEN inventory-xxx ELSE inventory END

准备 Plan B:如何设计兜底方案

高可用系统建设:

  1. 设计阶段:考虑系统的可扩展性和容错性。避免单点问题,采用多活方案(多机房部署)。
  2. 编码阶段:保证代码的健壮性。识别边界,捕获、处理异常;设置合适的超时机制。
  3. 测试阶段:测试用例覆盖度尽量全面。
  4. 发布阶段:自动化发布,支持灰度发布、回滚。
  5. 运行阶段:健全监控机制:日志、指标、链路监控
  6. 故障发生:容错处理、故障恢复、故障演练

降级

“降级”,就是当系统的容量达到一定程度时,限制或者关闭系统的某些非核心功能,从而把有限的资源保留给更核心的业务。

限流

限流就是当系统容量达到瓶颈时,我们需要通过限制一部分流量来保护系统,并做到既可以人工执行开关,也支持自动化保护的措施。

拒绝服务

过载保护 - 当系统负载达到一定阈值时,例如 CPU 使用率达到 90%或者系统 load 值达到 2*CPU 核数时,系统直接拒绝所有请求,这种方式是最暴力但也最有效的系统保护方式。

拒绝服务可以说是一种不得已的兜底方案,用以防止最坏情况发生,防止因把服务器压跨而长时间彻底无法提供服务。像这种系统过载保护虽然在过载时无法提供服务,但是系统仍然可以运作,当负载下降时又很容易恢复,所以每个系统和每个环节都应该设置这个兜底方案,对系统做最坏情况下的保护。

高可用建设需要长期规划并进行体系化建设,要在预防(建立常态的压力体系,例如上线前的单机压测到上线后的全链路压测)、管控(做好线上运行时的降级、限流和兜底保护)、监控(建立性能基线来记录性能的变化趋势以及线上机器的负载报警体系,发现问题及时预警)和恢复体系(遇到故障要及时止损,并提供快速的数据订正工具等)等这些地方加强建设。