MySQL 简介
MySQL 简介
::: info 概述
MySQL 是一个关系型数据库管理系统,由瑞典 MySQL AB 公司开发,目前属于 Oracle 公司。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL 是最好的 RDBMS 应用软件之一。
本文简单介绍了 MySQL 的功能、特性、发行版本、简史、概念,可以让读者在短时间内对于 MySQL 有一个初步的认识。
:::
:::details 要点
HBase 是一个构建在 HDFS(Hadoop 文件系统)之上的列式数据库。
HBase 是一种类似于 Google’s Big Table
的数据模型,它是 Hadoop 生态系统的一部分,它将数据存储在 HDFS 上,客户端可以通过 HBase 实现对 HDFS 上数据的随机访问。
HBase 的核心特性如下:
HBase 的其他特性
:::
:::details 要点
在 HBase 诞生之前,Hadoop 可以通过 HDFS 来存储结构化、半结构甚至非结构化的数据,它是传统数据库的补充,是海量数据存储的最佳方法,它针对大文件的存储,批量访问和流式访问都做了优化,同时也通过多副本解决了容灾问题。
Hadoop 的缺陷在于:它只能执行批处理,并且只能以顺序方式访问数据。这意味着即使是最简单的工作,也必须搜索整个数据集,即:Hadoop 无法实现对数据的随机访问。实现数据的随机访问是传统的关系型数据库所擅长的,但它们却不能用于海量数据的存储。在这种情况下,必须有一种新的方案来同时解决海量数据存储和随机访问的问题,HBase 就是其中之一 (HBase,Cassandra,CouchDB,Dynamo 和 MongoDB 都能存储海量数据并支持随机访问)。
注:数据结构分类:
- 结构化数据:即以关系型数据库表形式管理的数据;
- 半结构化数据:非关系模型的,有基本固定结构模式的数据,例如日志文件、XML 文档、JSON 文档、Email 等;
- 非结构化数据:没有固定模式的数据,如 WORD、PDF、PPT、EXL,各种格式的图片、视频等。
:::
:::details 要点
根据上一节对于 HBase 特性的介绍,我们可以梳理出 HBase 适用、不适用的场景:
HBase 不适用场景:
HBase 适用场景:
一言以蔽之——HBase 适用的场景是:实时地随机访问超大数据集。
HBase 的典型应用场景
:::
:::details 要点
HBase 和 RDBMS 的不同之处如下:
RDBMS | HBase |
---|---|
RDBMS 有它的模式,描述表的整体结构的约束 | HBase 无模式,它不具有固定列模式的概念;仅定义列族 |
支持的文件系统有 FAT、NTFS 和 EXT | 支持的文件系统只有 HDFS |
使用提交日志来存储日志 | 使用预写日志 (WAL) 来存储日志 |
使用特定的协调系统来协调集群 | 使用 ZooKeeper 来协调集群 |
存储的都是中小规模的数据表 | 存储的是超大规模的数据表,并且适合存储宽表 |
通常支持复杂的事务 | 仅支持行级事务 |
适用于结构化数据 | 适用于半结构化、结构化数据 |
使用主键 | 使用 row key |
:::
:::details 要点
HBase 和 HDFS 的不同之处如下:
HDFS | HBase |
---|---|
HDFS 提供了一个用于分布式存储的文件系统。 | HBase 提供面向表格列的数据存储。 |
HDFS 为大文件提供优化存储。 | HBase 为表格数据提供了优化。 |
HDFS 使用块文件。 | HBase 使用键值对数据。 |
HDFS 数据模型不灵活。 | HBase 提供了一个灵活的数据模型。 |
HDFS 使用文件系统和处理框架。 | HBase 使用带有内置 Hadoop MapReduce 支持的表格存储。 |
HDFS 主要针对一次写入多次读取进行了优化。 | HBase 针对读/写许多进行了优化。 |
:::
:::details 要点
行式数据库和列式数据库的不同之处如下:
行式数据库 | 列式数据库 |
---|---|
对于添加/修改操作更高效 | 对于读取操作更高效 |
读取整行数据 | 仅读取必要的列数据 |
最适合在线事务处理系统(OLTP) | 不适合在线事务处理系统(OLTP) |
将行数据存储在连续的页内存中 | 将列数据存储在非连续的页内存中 |
列式数据库的优点:
列式数据库的缺点:
:::
:::details 要点
Hbase 的表具有以下特点:
:::
:::details 要点
HBase 是一个面向 列
的数据库管理系统,这里更为确切的而说,HBase 是一个面向 列族
的数据库管理系统。表 schema 仅定义列族,表具有多个列族,每个列族可以包含任意数量的列,列由多个单元格(cell)组成,单元格可以存储多个版本的数据,多个版本数据以时间戳进行区分。
HBase 数据模型和关系型数据库有所不同。其数据模型的关键术语如下:
Table
**:Table 由 Row 和 Column 组成。Row
**:Row 是列族(Column Family)的集合。Row Key
:Row Key
是用来检索记录的主键。Row Key
是未解释的字节数组,所以理论上,任何数据都可以通过序列化表示成字符串或二进制,从而存为 HBase 的键值。Row Key
的字典序进行排序。这里需要注意以下两点:Row Key
进行访问;Row Key
的 range 进行访问,即访问指定范围内的行;Column Family
**:即列族。HBase 表中的每个列,都归属于某个列族。列族是表的 Schema 的一部分,所以列族需要在创建表时进行定义。info:format
,info:geo
都属于 info
这个列族。Column Qualifier
**:列限定符。可以理解为是具体的列名,例如 info:format
,info:geo
都属于 info
这个列族,它们的列限定符分别是 format
和 geo
。列族和列限定符之间始终以冒号分隔。需要注意的是列限定符不是表 Schema 的一部分,你可以在插入数据的过程中动态创建列。Column
**:HBase 中的列由列族和列限定符组成,由 :
(冒号) 进行分隔,即一个完整的列名应该表述为 列族名 :列限定符
。Cell
**:Cell
是行,列族和列限定符的组合,并包含值和时间戳。HBase 中通过 row key
和 column
确定的为一个存储单元称为 Cell
,你可以等价理解为关系型数据库中由指定行和指定列确定的一个单元格,但不同的是 HBase 中的一个单元格是由多个版本的数据组成的,每个版本的数据用时间戳进行区分。Cell
由行和列的坐标交叉决定,是有版本的。默认情况下,版本号是自动分配的,为 HBase 插入 Cell
时的时间戳。Cell
的内容是未解释的字节数组。Timestamp
**:Cell
的版本通过时间戳来索引,时间戳的类型是 64 位整型,时间戳可以由 HBase 在数据写入时自动赋值,也可以由客户显式指定。每个 Cell
中,不同版本的数据按照时间戳倒序排列,即最新的数据排在最前面。下图为 HBase 中一张表的:
图片引用自 : HBase 是列式存储数据库吗 https://www.iteblog.com/archives/2498.html
:::
:::details 要点
HBase Table 中的所有行按照 Row Key
的字典序排列。HBase Tables 通过行键的范围 (row key range) 被水平切分成多个 Region
, 一个 Region
包含了在 start key 和 end key 之间的所有行。
每个表一开始只有一个 Region
,随着数据不断增加,Region
会不断增大,当增大到一个阀值的时候,Region
就会等分为两个新的 Region
。当 Table 中的行不断增多,就会有越来越多的 Region
。
Region
是 HBase 中分布式存储和负载均衡的最小单元。这意味着不同的 Region
可以分布在不同的 Region Server
上。但一个 Region
是不会拆分到多个 Server 上的。
:::
:::details 要点
以下是客户端首次读写 HBase 上数据的流程:
META
表所在的 Region Server;META
表所在的 Region Server,从 META
表中查询到访问行键所在的 Region Server,之后客户端将缓存这些信息以及 META
表的位置;如果再次读取,客户端将从缓存中获取行键所在的 Region Server。这样客户端就不需要再次查询 META
表,除非 Region 移动导致缓存失效,这样的话,则将会重新查询并更新缓存。
注:META
表是 HBase 中一张特殊的表,它保存了所有 Region 的位置信息,META 表自己的位置信息则存储在 ZooKeeper 上。
更为详细读取数据流程参考:
:::
:::details 要点
更为详细写入流程可以参考:HBase - 数据写入流程解析
:::
:::details 要点
HBase 系统遵循 Master/Salve 架构,由三种不同类型的组件组成:
HBase 使用 ZooKeeper 作为分布式协调服务来维护集群中的服务器状态。 Zookeeper 负责维护可用服务列表,并提供服务故障通知等服务:
:::
:::details 要点
Apache Hive 是一种分布式、容错数据仓库,支持大规模分析。Hive Metastore (HMS) 提供了一个元数据的中央存储库,可以轻松分析以做出明智的数据驱动决策,因此它是许多数据湖架构的关键组件。Hive 构建在 Apache Hadoop 之上,并通过 hdfs 支持在 S3、adls、gs 等上进行存储。Hive 允许用户使用 SQL 读取、写入和管理 PB 级数据。
Hive 可以将结构化的数据文件映射成表,并提供类 SQL 查询功能。用于查询的 SQL 语句会被转化为 MapReduce 作业,然后提交到 Hadoop 上运行。
特点:
:::
:::details 要点
Hive Metastore (HMS) 是关系数据库中 Hive 表和分区元数据的中央存储库,它使用元存储服务 API 为客户端(包括 Hive、Impala 和 Spark)提供对此信息的访问。它已成为利用各种开源软件(如 Apache Spark 和 Presto)的数据湖的构建块。事实上,整个工具生态系统,无论是开源的还是其他的,都是围绕 Hive Metastore 构建的,下图说明了其中一些。
:::
:::details 要点
Hive 表中的列支持以下基本数据类型:
大类 | 类型 |
---|---|
Integers(整型) | TINYINT—1 字节的有符号整数 SMALLINT—2 字节的有符号整数 INT—4 字节的有符号整数 BIGINT—8 字节的有符号整数 |
Boolean(布尔型) | BOOLEAN—TRUE/FALSE |
Floating point numbers(浮点型) | FLOAT— 单精度浮点型 DOUBLE—双精度浮点型 |
Fixed point numbers(定点数) | DECIMAL—用户自定义精度定点数,比如 DECIMAL(7,2) |
String types(字符串) | STRING—指定字符集的字符序列 VARCHAR—具有最大长度限制的字符序列 CHAR—固定长度的字符序列 |
Date and time types(日期时间类型) | TIMESTAMP — 时间戳 TIMESTAMP WITH LOCAL TIME ZONE — 时间戳,纳秒精度 DATE—日期类型 |
Binary types(二进制类型) | BINARY—字节序列 |
TIMESTAMP 和 TIMESTAMP WITH LOCAL TIME ZONE 的区别如下:
- TIMESTAMP WITH LOCAL TIME ZONE:用户提交时间给数据库时,会被转换成数据库所在的时区来保存。查询时则按照查询客户端的不同,转换为查询客户端所在时区的时间。
- TIMESTAMP :提交什么时间就保存什么时间,查询时也不做任何转换。
此外,Hive 还支持以下复杂类型:
类型 | 描述 | 示例 |
---|---|---|
STRUCT | 类似于对象,是字段的集合,字段的类型可以不同,可以使用 名称。字段名 方式进行访问 |
STRUCT (‘xiaoming’, 12 , ‘2018-12-12’) |
MAP | 键值对的集合,可以使用 名称 [key] 的方式访问对应的值 |
map(‘a’, 1, ‘b’, 2) |
ARRAY | 数组是一组具有相同类型和名称的变量的集合,可以使用 名称 [index] 访问对应的值 |
ARRAY(‘a’, ‘b’, ‘c’, ‘d’) |
:::
:::details 要点
Hive 会在 HDFS 为每个数据库上创建一个目录,数据库中的表是该目录的子目录,表中的数据会以文件的形式存储在对应的表目录下。Hive 支持以下几种文件存储格式:
格式 | 说明 |
---|---|
TextFile | 存储为纯文本文件。 这是 Hive 默认的文件存储格式。这种存储方式数据不做压缩,磁盘开销大,数据解析开销大。 |
SequenceFile | SequenceFile 是 Hadoop API 提供的一种二进制文件,它将数据以<key,value>的形式序列化到文件中。这种二进制文件内部使用 Hadoop 的标准的 Writable 接口实现序列化和反序列化。它与 Hadoop API 中的 MapFile 是互相兼容的。Hive 中的 SequenceFile 继承自 Hadoop API 的 SequenceFile,不过它的 key 为空,使用 value 存放实际的值,这样是为了避免 MR 在运行 map 阶段进行额外的排序操作。 |
RCFile | RCFile 文件格式是 FaceBook 开源的一种 Hive 的文件存储格式,首先将表分为几个行组,对每个行组内的数据按列存储,每一列的数据都是分开存储。 |
ORC Files | ORC 是在一定程度上扩展了 RCFile,是对 RCFile 的优化。 |
Avro Files | Avro 是一个数据序列化系统,设计用于支持大批量数据交换的应用。它的主要特点有:支持二进制序列化方式,可以便捷,快速地处理大量数据;动态语言友好,Avro 提供的机制使动态语言可以方便地处理 Avro 数据。 |
Parquet | Parquet 是基于 Dremel 的数据模型和算法实现的,面向分析型业务的列式存储格式。它通过按列进行高效压缩和特殊的编码技术,从而在降低存储空间的同时提高了 IO 效率。 |
以上压缩格式中 ORC 和 Parquet 的综合性能突出,使用较为广泛,推荐使用这两种格式。
通常在创建表的时候使用 STORED AS
参数指定:
1 | CREATE TABLE page_view(viewTime INT, userid BIGINT) |
各个存储文件类型指定方式如下:
:::
:::details 要点
内部表又叫做管理表 (Managed/Internal Table),创建表时不做任何指定,默认创建的就是内部表。想要创建外部表 (External Table),则需要使用 External 进行修饰。 内部表和外部表主要区别如下:
内部表 | 外部表 | |
---|---|---|
数据存储位置 | 内部表数据存储的位置由 hive.metastore.warehouse.dir 参数指定,默认情况下表的数据存储在 HDFS 的 /user/hive/warehouse/数据库名。db/表名/ 目录下 |
外部表数据的存储位置创建表时由 Location 参数指定; |
导入数据 | 在导入数据到内部表,内部表将数据移动到自己的数据仓库目录下,数据的生命周期由 Hive 来进行管理 | 外部表不会将数据移动到自己的数据仓库目录下,只是在元数据中存储了数据的位置 |
删除表 | 删除元数据(metadata)和文件 | 只删除元数据(metadata) |
:::
:::details 要点
Hive 中的表对应为 HDFS 上的指定目录,在查询数据时候,默认会对全表进行扫描,这样时间和性能的消耗都非常大。
分区为 HDFS 上表目录的子目录,数据按照分区存储在子目录中。如果查询的 where
子句中包含分区条件,则直接从该分区去查找,而不是扫描整个表目录,合理的分区设计可以极大提高查询速度和性能。
分区表并非 Hive 独有的概念,实际上这个概念非常常见。通常,在管理大规模数据集的时候都需要进行分区,比如将日志文件按天进行分区,从而保证数据细粒度的划分,使得查询性能得到提升。比如,在我们常用的 Oracle 数据库中,当表中的数据量不断增大,查询数据的速度就会下降,这时也可以对表进行分区。表进行分区后,逻辑上表仍然是一张完整的表,只是将表中的数据存放到多个表空间(物理文件上),这样查询数据时,就不必要每次都扫描整张表,从而提升查询性能。
在 Hive 中可以使用 PARTITIONED BY
子句创建分区表。表可以包含一个或多个分区列,程序会为分区列中的每个不同值组合创建单独的数据目录。下面的我们创建一张雇员表作为测试:
1 | CREATE EXTERNAL TABLE emp_partition( |
加载数据到分区表时候必须要指定数据所处的分区:
1 | # 加载部门编号为 20 的数据到表中 |
这时候我们直接查看表目录,可以看到表目录下存在两个子目录,分别是 deptno=20
和 deptno=30
, 这就是分区目录,分区目录下才是我们加载的数据文件。
1 | # hadoop fs -ls hdfs://hadoop001:8020/hive/emp_partition/ |
这时候当你的查询语句的 where
包含 deptno=20
,则就去对应的分区目录下进行查找,而不用扫描全表。
:::
:::details 要点
分区提供了一个隔离数据和优化查询的可行方案,但是并非所有的数据集都可以形成合理的分区,分区的数量也不是越多越好,过多的分区条件可能会导致很多分区上没有数据。同时 Hive 会限制动态分区可以创建的最大分区数,用来避免过多分区文件对文件系统产生负担。鉴于以上原因,Hive 还提供了一种更加细粒度的数据拆分方案:分桶表 (bucket Table)。
分桶表会将指定列的值进行哈希散列,并对 bucket(桶数量)取余,然后存储到对应的 bucket(桶)中。
单从概念上理解分桶表可能会比较晦涩,其实和分区一样,分桶这个概念同样不是 Hive 独有的,对于 Java 开发人员而言,这可能是一个每天都会用到的概念,因为 Hive 中的分桶概念和 Java 数据结构中的 HashMap 的分桶概念是一致的。
当调用 HashMap 的 put() 方法存储数据时,程序会先对 key 值调用 hashCode() 方法计算出 hashcode,然后对数组长度取模计算出 index,最后将数据存储在数组 index 位置的链表上,链表达到一定阈值后会转换为红黑树 (JDK1.8+)。下图为 HashMap 的数据结构图:
图片引用自:HashMap vs. Hashtable
在 Hive 中,我们可以通过 CLUSTERED BY
指定分桶列,并通过 SORTED BY
指定桶中数据的排序参考列。下面为分桶表建表语句示例:
1 | CREATE EXTERNAL TABLE emp_bucket( |
这里直接使用 Load
语句向分桶表加载数据,数据时可以加载成功的,但是数据并不会分桶。
这是由于分桶的实质是对指定字段做了 hash 散列然后存放到对应文件中,这意味着向分桶表中插入数据是必然要通过 MapReduce,且 Reducer 的数量必须等于分桶的数量。由于以上原因,分桶表的数据通常只能使用 CTAS(CREATE TABLE AS SELECT) 方式插入,因为 CTAS 操作会触发 MapReduce。加载数据步骤如下:
(1)设置强制分桶
1 | set hive.enforce.bucketing = true; --Hive 2.x 不需要这一步 |
在 Hive 0.x and 1.x 版本,必须使用设置 hive.enforce.bucketing = true
,表示强制分桶,允许程序根据表结构自动选择正确数量的 Reducer 和 cluster by column 来进行分桶。
(2)CTAS 导入数据
1 | INSERT INTO TABLE emp_bucket SELECT * FROM emp; --这里的 emp 表就是一张普通的雇员表 |
可以从执行日志看到 CTAS 触发 MapReduce 操作,且 Reducer 数量和建表时候指定 bucket 数量一致:
查看分桶文件
bucket(桶) 本质上就是表目录下的具体文件:
:::
:::details 要点
分区表和分桶表的本质都是将数据按照不同粒度进行拆分,从而使得在查询时候不必扫描全表,只需要扫描对应的分区或分桶,从而提升查询效率。两者可以结合起来使用,从而保证表数据在不同粒度上都能得到合理的拆分。下面是 Hive 官方给出的示例:
1 | CREATE TABLE page_view_bucketed( |
此时导入数据时需要指定分区:
1 | INSERT OVERWRITE page_view_bucketed |
:::
:::details 要点
Hive 在 0.7.0 引入了索引的功能,索引的设计目标是提高表某些列的查询速度。如果没有索引,带有谓词的查询(如’WHERE table1.column = 10’)会加载整个表或分区并处理所有行。但是如果 column 存在索引,则只需要加载和处理文件的一部分。
在指定列上建立索引,会产生一张索引表(表结构如下),里面的字段包括:索引列的值、该值对应的 HDFS 文件路径、该值在文件中的偏移量。在查询涉及到索引字段时,首先到索引表查找索引列值对应的 HDFS 文件路径及偏移量,这样就避免了全表扫描。
1 | +--------------+----------------+----------+--+ |
创建索引:
1 | CREATE INDEX index_name --索引名称 |
查看索引:
1 | --显示表上所有列的索引 |
删除索引:
删除索引会删除对应的索引表。
1 | DROP INDEX [IF EXISTS] index_name ON table_name; |
如果存在索引的表被删除了,其对应的索引和索引表都会被删除。如果被索引表的某个分区被删除了,那么分区对应的分区索引也会被删除。
重建索引:
1 | ALTER INDEX index_name ON table_name [PARTITION partition_spec] REBUILD; |
重建索引。如果指定了 PARTITION
,则仅重建该分区的索引。
:::
:::details 要点
索引表最主要的一个缺陷在于:索引表无法自动 rebuild,这也就意味着如果表中有数据新增或删除,则必须手动 rebuild,重新执行 MapReduce 作业,生成索引表数据。
同时按照 官方文档 的说明,Hive 会从 3.0 开始移除索引功能,主要基于以下两个原因:
ORC 内置的索引功能可以参阅这篇文章:Hive 性能优化之 ORC 索引–Row Group Index vs Bloom Filter Index
:::
:::details 要点
Hive 在执行一条 HQL 的时候,会经过以下步骤:
关于 Hive SQL 的详细执行流程可以参考美团技术团队的文章:Hive SQL 的编译过程
:::
题目 | 难度 | 状态 |
---|---|---|
1. 两数之和 | 简单 | 通过 |
167. 两数之和 II - 输入有序数组 | 中等 | 通过 |
剑指 Offer II 006. 排序数组中两个数字之和 | 简单 | 通过 |
剑指 Offer 57. 和为 s 的两个数字 | 简单 | 通过 |
136. 只出现一次的数字 | 简单 | 通过 |
217. 存在重复元素 | 简单 | 通过 |
2073. 买票需要的时间 | 简单 | 通过 |
26. 删除有序数组中的重复项 | 简单 | 未通过 |
27. 移除元素 | ||
283. 移动零 | ||
344. 反转字符串 | ||
5. 最长回文子串 | ||
263. 丑数 | 简单 | 未通过 |
264. 丑数 II | 中等 | 未通过 |
1201. 丑数 III | 中等 | 未通过 |
313. 超级丑数 | 中等 | 未通过 |
373. 查找和最小的 K 对数字 | ||
题目 | 难度 | 通关 |
---|---|---|
19. 删除链表的倒数第 N 个结点 | 中等 | 通过 |
21. 合并两个有序链表 | 简单 | 通过 |
23. 合并 K 个升序链表 | 困难 | 通过 |
83. 删除排序链表中的重复元素 | 简单 | 通过 |
82. 删除排序链表中的重复元素 II | 中等 | 半通过 |
86. 分隔链表 | 简单 | 通过 |
876. 链表的中间结点 | 简单 | 通过 |
剑指 Offer 22. 链表中倒数第 k 个节点 | 简单 | 通过 |
141. 环形链表 | 简单 | 通过 |
142. 环形链表 II | 中等 | 未通过 |
160. 相交链表 | 简单 | 半通过 |
1836. 从未排序的链表中移除重复元素 | 中等 | 半通过 |
题目 | 难度 | |
---|---|---|
20. 有效的括号 | 简单 | |
225. 用队列实现栈 | 简单 | 通过 |
232. 用栈实现队列 | 简单 | 通过 |
LeetCode | 力扣 | 难度 |
---|---|---|
234. Palindrome Linked List | 234. 回文链表 | 🟢 |
206. Reverse Linked List | 206. 反转链表 | 🟢 |
92. Reverse Linked List II | 92. 反转链表 II | 🟠 |
25. Reverse Nodes in k-Group | 25. K 个一组翻转链表 | 🔴 |
LeetCode | 力扣 | 难度 |
---|---|---|
55. Jump Game | 55. 跳跃游戏 | 🟠 |
45. Jump Game II | 45. 跳跃游戏 II | 🟠 |
134. Gas Station | 134. 加油站 | 🟠 |
LeetCode | 力扣 | 难度 |
---|---|---|
23. Merge k Sorted Lists | 23. 合并K个升序链表 | 🔴 |
241. Different Ways to Add Parentheses | 241. 为运算表达式设计优先级 | 🟠 |
消息引擎系统的作用:
设计消息引擎系统的关键点:
消息引擎的作用:
Kafka 术语:
Kafka 的三层消息架构:
Kafka 在设计之初就旨在提供三个方面的特性:
作为流处理平台,Kafka 与其他主流大数据流式计算框架相比,优势在哪里呢?
Kafka 有以下重大版本:
系统
在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的快速数据传输特性。
磁盘
使用机械磁盘完全能够胜任 Kafka 线上环境。
磁盘容量
假设你所在公司有个业务每天需要向 Kafka 集群发送 1 亿条消息,每条消息保存两份以防止数据丢失,另外消息默认保存两周时间。现在假设消息的平均大小是 1KB,那么你能说出你的 Kafka 集群需要为这个业务预留多少磁盘空间吗?
我们来计算一下:每天 1 亿条 1KB 大小的消息,保存两份且留存两周的时间,那么总的空间大小就等于1 亿 * 1KB * 2 / 1000 / 1000 = 200GB
。一般情况下 Kafka 集群除了消息数据还有其他类型的数据,比如索引数据等,故我们再为这些数据预留出 10%的磁盘空间,因此总的存储容量就是 220GB。既然要保存两周,那么整体容量即为 220GB * 14
,大约 3TB 左右。Kafka 支持数据的压缩,假设压缩比是 0.75,那么最后你需要规划的存储空间就是 0.75 * 3 = 2.25TB
。
总之在规划磁盘容量时你需要考虑下面这几个元素:
带宽
通常使用的都是普通的以太网络,带宽也主要有两种:1Gbps 的千兆网络和 10Gbps 的万兆网络。
假设你公司的机房环境是千兆网络,即 1Gbps,现在你有个业务,其业务目标或 SLA 是在 1 小时内处理 1TB 的业务数据。那么问题来了,你到底需要多少台 Kafka 服务器来完成这个业务呢?
让我们来计算一下,由于带宽是 1Gbps,即每秒处理 1Gb 的数据,假设每台 Kafka 服务器都是安装在专属的机器上,也就是说每台 Kafka 机器上没有混部其他服务,毕竟真实环境中不建议这么做。通常情况下你只能假设 Kafka 会用到 70%的带宽资源,因为总要为其他应用或进程留一些资源。
根据实际使用经验,超过 70%的阈值就有网络丢包的可能性了,故 70%的设定是一个比较合理的值,也就是说单台 Kafka 服务器最多也就能使用大约 700Mb 的带宽资源。
稍等,这只是它能使用的最大带宽资源,你不能让 Kafka 服务器常规性使用这么多资源,故通常要再额外预留出 2/3 的资源,即单台服务器使用带宽 700Mb / 3 ≈ 240Mbps。需要提示的是,这里的 2/3 其实是相当保守的,你可以结合你自己机器的使用情况酌情减少此值。
好了,有了 240Mbps,我们就可以计算 1 小时内处理 1TB 数据所需的服务器数量了。根据这个目标,我们每秒需要处理 2336Mb 的数据,除以 240,约等于 10 台服务器。如果消息还需要额外复制两份,那么总的服务器台数还要乘以 3,即 30 台。
与存储信息相关的参数
log.dirs
:这是非常重要的参数,指定了 Broker 需要使用的若干个文件目录路径。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定。log.dir
:注意这是 dir,结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的。只要设置log.dirs
,即第一个参数就好了,不要设置log.dir
。而且更重要的是,在线上生产环境中一定要为log.dirs
配置多个路径,具体格式是一个 CSV 格式,也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3
这样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。这样做有两个好处:
与 ZooKeeper 相关的参数
zookeeper.connect
。这也是一个 CSV 格式的参数,比如我可以指定它的值为zk1:2181,zk2:2181,zk3:2181
。2181 是 ZooKeeper 的默认端口。
如果我让多个 Kafka 集群使用同一套 ZooKeeper 集群,那么这个参数应该怎么设置呢?这时候 chroot 就派上用场了。这个 chroot 是 ZooKeeper 的概念,类似于别名。
如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2,那么两套集群的zookeeper.connect
参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1
和zk1:2181,zk2:2181,zk3:2181/kafka2
。切记 chroot 只需要写一次,而且是加到最后的。我经常碰到有人这样指定:zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3
,这样的格式是不对的。
与 Broker 连接相关的参数
listeners
:学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。advertised.listeners
:和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。host.name/port
:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。关于 Topic 管理的参数
auto.create.topics.enable
:是否允许自动创建 Topic。unclean.leader.election.enable
:是否允许 Unclean Leader 选举。auto.leader.rebalance.enable
:是否允许定期进行 Leader 选举。关于数据留存的参数
log.retention.{hours|minutes|ms}
:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hours 最低。log.retention.bytes
:这是指定 Broker 为消息保存的总磁盘容量大小。message.max.bytes
:控制 Broker 能够接收的最大消息大小。Topic 级别参数
retention.ms
:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。retention.bytes
:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是-1,表示可以无限使用磁盘空间。JVM 参数
KAFKA_HEAP_OPTS
:指定堆大小。KAFKA_JVM_PERFORMANCE_OPTS
:指定 GC 参数。操作系统参数
ulimit -n 1000000
。Kafka 的消息组织方式实际上是三级结构:主题-分区-消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。
分区是实现负载均衡以及高吞吐量的关键。
所谓分区策略,就是决定生产者将消息发送到哪个分区的算法。Kafka 提供了默认的分区策略,同时也支持自定义分区策略。
压缩秉承了用时间去换空间的思想。具体来说,就是用 CPU 时间去换磁盘空间或网络 I/O 传输量,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。
Kafka 压缩、解压流程:Producer 端压缩、Broker 端保持、Consumer 端解压缩。
每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证。
让 Broker 重新压缩消息的 2 种例外:Broker 端指定了和 Producer 端不同的压缩算法;Broker 发生了消息格式转换。
在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4。从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd)。
对于 Kafka 而言,它们的性能测试结果却出奇得一致:
LZ4 > Snappy > zstd 和 GZIP
;zstd > LZ4 > GZIP > Snappy
。Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。
producer.send(msg)
,而要使用 producer.send(msg, callback)
。记住,一定要使用带有回调通知的 send
方法。retries
为一个较大的值。这里的 retries
同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0
的 Producer 能够自动重试消息发送,避免消息丢失。acks = all
。acks
是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。unclean.leader.election.enable = false
。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。replication.factor >= 3
。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。min.insync.replicas > 1
。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。replication.factor > min.insync.replicas
。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1
。enable.auto.commit
,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。拦截器基本思想就是允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑。
Kafka 拦截器分为生产者拦截器和消费者拦截器。生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。指定拦截器类时要指定它们的全限定名。
Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。
开发客户端时,能够利用 TCP 本身提供的一些高级功能,比如多路复用请求以及同时轮询多个连接的能力。
对最新版本的 Kafka(2.1.0)而言,Java Producer 端管理 TCP 连接的方式是:
消息可靠性保证有以下几种:
大部分 MQ 都支持 at least once,要实现 exactly once,需要消费方保证,通常是通过幂等性设计来实现。
Kafka 也提供了一些相关的功能:
幂等性 Producer 只能保证单分区上的幂等性,同时也只能实现单会话上的幂等性。
事务型 Producer 能够保证将消息原子性地写入到多个分区中,而且不惧进程的重启。
Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。
Consumer Group 特性:
Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引擎系统的两大模型:如果所有实例都属于同一个 Group,那么它实现的就是消息队列模型;如果所有实例分别属于不同的 Group,那么它实现的就是发布/订阅模型。
理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。
分区再均衡规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。
Rebalance 的触发条件:
Rebalance 的问题:
最好的解决方案就是避免 Rebalance 的发生吧。
consumer_offsets 在 Kafka 源码中有个更为正式的名字,叫位移主题,即 Offsets Topic。
老版本 Consumer 的位移管理是依托于 Apache ZooKeeper 的,它会自动或手动地将位移数据提交到 ZooKeeper 中保存。当 Consumer 重启后,它能自动从 ZooKeeper 中读取位移数据,从而在上次消费截止的地方继续消费。这种设计使得 Kafka Broker 不需要保存位移数据,减少了 Broker 端需要持有的状态空间,因而有利于实现高伸缩性。但是,ZooKeeper 其实并不适用于这种高频的写操作。
新版本 Consumer 的位移管理机制其实也很简单,就是将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 consumer_offsets 中。可以这么说,consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。它要求这个提交过程不仅要实现高持久性,还要支持高频的写操作。显然,Kafka 的主题设计天然就满足这两个条件,因此,使用 Kafka 主题来保存位移这件事情,实际上就是一个水到渠成的想法了。
虽说位移主题是一个普通的 Kafka 主题,但它的消息格式却是 Kafka 自己定义的,不能随意地向这个主题写消息。
当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。
Kafka 使用** Compact 策略**来删除位移主题中的过期消息,避免该主题无限期膨胀。
Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程。在 Rebalance 过程中,所有 Consumer 实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配。
Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。
第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。
第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。
max.poll.interval.ms 参数值要大于下游最大处理时间。
Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即** Consumer 需要为分配给它的每个分区提交各自的位移数据**。
位移提交分为自动提交和手动提交,而手动提交又分为同步提交和异步提交。
CommitFailedException,就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。
CommitFailedException 最常见的场景:当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常。
消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。如下图所示:
消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:
方案对比:
和生产者不同的是,构建 KafkaConsumer 实例时是不会创建任何 TCP 连接的。
TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的。再细粒度地说,在 poll 方法内部有 3 个时机可以创建 TCP 连接。
消费者程序会创建 3 类 TCP 连接:
对于 Kafka 消费者来说,最重要的事情就是监控它们的消费进度了,或者说是监控它们消费的滞后程度。所谓滞后程度,就是指消费者当前落后于生产者的程度。
监控消费者组以及独立消费者程序消费进度的 3 种方法:
副本机制好处:
所谓副本(Replica),本质就是一个只能追加写消息的提交日志。
基于领导者的副本机制
在 Kafka 中,副本分成两类:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
追随者副本不提供服务,只是定期地异步拉取领导者副本中的数据而已。
Kafka 引入了 In-sync Replicas(ISR)机制来明确追随者副本到底在什么条件下才算与 Leader 同步。
ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本。
Broker 端参数 replica.lag.time.max.ms 用于配置 Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。这就是说,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举。
开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。
Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。
Kafka 所有的请求都是通过 TCP 网络以 Socket 的方式进行通讯的。
Reactor 模式是事件驱动架构的一种实现方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景。
Kafka 采用了类 Reactor 架构
Acceptor 线程采用轮询的方式将入站请求公平地发到所有网络线程中,因此,在实际使用过程中,这些线程通常都有相同的几率被分配到待处理请求。
当网络线程拿到请求后,将请求放入到一个共享请求队列中。Broker 端还有个 IO 线程池,负责从该队列中取出请求,执行真正的处理。如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。
Purgatory 是用来缓存延时请求(Delayed Request)的。所谓延时请求,就是那些一时未满足条件不能立刻处理的请求。比如设置了 acks=all 的 PRODUCE 请求,一旦设置了 acks=all,那么该请求就必须等待 ISR 中所有副本都接收了消息后才能返回,此时处理该请求的 IO 线程就必须等待其他 Broker 的写入结果。当请求不能立刻处理时,它就会暂存在 Purgatory 中。稍后一旦满足了完成条件,IO 线程会继续处理该请求,并将 Response 放入对应网络线程的响应队列中。
重平衡的 3 个触发条件:
消费者端重平衡流程:
Rebalance 是通过消费者群组中的称为“群主”消费者客户端进行的**。
(1)选择群主
当消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获取群组的活跃成员列表,并负责给每一个消费者分配分区。
所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。具体来讲,Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。
(2)消费者通过向被指派为群组协调器(Coordinator)的 Broker 定期发送心跳来维持它们和群组的从属关系以及它们对分区的所有权。
(3)群主从群组协调器获取群组成员列表,然后给每一个消费者进行分配分区 Partition。有两种分配策略:Range 和 RoundRobin。
(4)群主分配完成之后,把分配情况发送给群组协调器。
(5)群组协调器再把这些信息发送给消费者。每个消费者只能看到自己的分配信息,只有群主知道所有消费者的分配信息。
控制器组件(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群。每台 Broker 都能充当控制器,第一个成功创建 /controller
节点的 Broker 会被指定为控制器。
ZooKeeper 是一个提供高可靠性的分布式协调服务框架。ZooKeeper 常被用来实现集群成员管理、分布式锁、领导者选举等功能。Kafka 控制器大量使用 Watch 功能实现对集群的协调管理。
下图展示了 Kafka 在 ZooKeeper 中创建的 znode 分布:
控制器的职责:
控制器保存的数据:
控制器故障转移
故障转移指的是,当运行中的控制器突然宕机或意外终止时,Kafka 能够快速地感知到,并立即启用备用控制器来代替之前失败的控制器。这个过程就被称为 Failover,该过程是自动完成的,无需你手动干预。
水位一词多用于流式处理领域,比如,Spark Streaming 或 Flink 框架中都有水位的概念。
水位是一个单调增加且表征最早未完成工作(oldest work not yet completed)的时间戳。
在 Kafka 中,高水位的作用主要有 2 个。
Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上仅仅保存了该分区的某个 Follower 副本。Kafka 把 Broker 0 上保存的这些 Follower 副本又称为远程副本(Remote Replica)。Kafka 副本机制在运行过程中,会更新 Broker 1 上 Follower 副本的高水位和 LEO 值,同时也会更新 Broker 0 上 Leader 副本的高水位和 LEO 以及所有远程副本的 LEO,但它不会更新远程副本的高水位值,也就是我在图中标记为灰色的部分。
为什么要在 Broker 0 上保存这些远程副本呢?其实,它们的主要作用是,帮助 Leader 副本确定其高水位,也就是分区高水位。
首先是初始状态。下面这张图中的 remote LEO 就是刚才的远程副本的 LEO 值。在初始状态时,所有值都是 0。
当生产者给主题分区发送一条消息后,状态变更为:
此时,Leader 副本成功将消息写入了本地磁盘,故 LEO 值被更新为 1。
Follower 再次尝试从 Leader 拉取消息。和之前不同的是,这次有消息可以拉取了,因此状态进一步变更为:
这时,Follower 副本也成功地更新 LEO 为 1。此时,Leader 和 Follower 副本的 LEO 都是 1,但各自的高水位依然是 0,还没有被更新。它们需要在下一轮的拉取中被更新,如下图所示:
在新一轮的拉取请求中,由于位移值是 0 的消息已经拉取成功,因此 Follower 副本这次请求拉取的是位移值=1 的消息。Leader 副本接收到此请求后,更新远程副本 LEO 为 1,然后更新 Leader 高水位为 1。做完这些之后,它会将当前已更新过的高水位值 1 发送给 Follower 副本。Follower 副本接收到以后,也将自己的高水位值更新成 1。至此,一次完整的消息同步周期就结束了。事实上,Kafka 就是利用这样的机制,实现了 Leader 和 Follower 副本之间的同步。
所谓 Leader Epoch,我们大致可以认为是 Leader 版本。它由两部分数据组成。
Kafka 提供了自带的 kafka-topics 脚本,用于帮助用户创建主题。
特殊主题:
略
略
略
略
略
略
略
略
略
略
略
略
略
略
Apache Kafka 是一款开源的消息引擎系统,也是一个分布式流计算平台,此外,还可以作为数据存储。
Kafka 的核心术语如下:
::: tip 关键点
<offset>.log
、.<offset>.index
、<offset>.timeindex
、<offset>.txnindex
):::
Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。
Kafka 的三层消息架构:
在 Kafka 中,任意一个 Topic 维护了一组 Partition 日志,如下所示:
请注意:这里的主题只是一个逻辑上的抽象概念,实际上,Kafka 的基本存储单元是 Partition。Partition 无法在多个 Broker 间进行再细分,也无法在同一个 Broker 的多个磁盘上进行再细分。所以,分区的大小受到单个挂载点可用空间的限制。
Partiton 命名规则为 Topic 名称 + 有序序号,第一个 Partiton 序号从 0 开始,序号最大值为 Partition 数量减 1。
Log
是 Kafka 用于表示日志文件的组件。每个 Partiton 对应一个 Log
对象,在物理磁盘上则对应一个目录。如:创建一个双分区的主题 test
,那么,Kafka 会在磁盘上创建两个子目录:test-0
和 test-1
;而在服务器端,这就对应两个 Log
对象。
因为在一个大文件中查找和删除消息是非常耗时且容易出错的。所以,Kafka 将每个 Partition 切割成若干个片段,即日志段(Log Segment)。默认每个 Segment 大小不超过 1G,且只包含 7 天的数据。如果 Segment 的消息量达到 1G,那么该 Segment 会关闭,同时打开一个新的 Segment 进行写入。
Broker 会为 Partition 里的每个 Segment 打开一个文件句柄(包括不活跃的 Segment),因此打开的文件句柄数通常会比较多,这个需要适度调整系统的进程文件句柄参数。正在写入的分片称为活跃片段(active segment),活跃片段永远不会被删除。
Segment 文件命名规则:Partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。
Segment 文件可以分为两类:
.index
).timeindex
).txnindex
):如果没有使用 Kafka 事务,则不会创建该文件.log
).index
)存储 offset→position 映射index.interval.bytes
).log
).index
)00000000000000368769.index
)下面是 Kafka 中分段的日志数据文件和偏移量索引文件的对应映射关系图(其中也说明了如何按照起始偏移量来定位到日志数据文件中的具体消息)。
日志分段结构
如果 Kafka 启用了清理功能(通过 log.cleaner.enabled
配置),每个 Broker 启动清理管理线程 + N 个清理线程(按分区分配)
对于一个段,清理前后的效果如下:
Apache Kafka 清理数据主要通过 日志保留策略(Log Retention) 和 压缩策略(Compaction) 实现,以下是核心要点概括:
基于时间的清理
log.retention.hours
(默认 168 小时/7 天)、log.retention.minutes
、log.retention.ms
。log.retention.check.interval.ms
调整)。基于大小的清理
log.retention.bytes
(整个分区的最大字节数)、log.segment.bytes
(单个日志段大小,默认 1GB)。日志压缩
cleanup.policy=compact
(启用压缩)。min.cleanable.dirty.ratio
(控制压缩触发时机,默认 0.5)。log.cleaner
线程执行)。手动清理
kafka-topics.sh --delete --topic <topic_name>
(需配置delete.topic.enable=true
)。log.dirs
)中的分区文件(需谨慎,可能导致数据不一致)。关键注意事项
cleanup.policy=compact,delete
,压缩优先于时间/大小删除。offsets.retention.minutes
)。::: tip 关键点
:::
Kafka 生产者用一个 ProducerRecord
对象来抽象一条要发送的消息, ProducerRecord
对象需要包含目标主题和要发送的内容,还可以指定键或分区。其发送消息流程如下:
(1)序列化 - 生产者要先把键和值序列化成字节数组,这样它们才能够在网络中传输。
(2)分区 - 数据被传给分区器。如果在 ProducerRecord
中已经指定了分区,那么分区器什么也不会做;否则,分区器会根据 ProducerRecord
的键来选择一个分区。选定分区后,生产者就知道该把消息发送给哪个主题的哪个分区。
(3)批次传输 - 接着,这条记录会被添加到一个记录批次中。这个批次中的所有消息都会被发送到相同的主题和分区上。有一个独立的线程负责将这些记录批次发送到相应 Broker 上。
(4)响应 - 服务器收到消息会返回一个响应。
RecordMetaData
对象,它包含了主题、分区、偏移量;生产者向 Broker 发送消息时是怎么确定向哪一个 Broker 发送消息?
MetadataRequest
),获取到每一个分区对应的 Leader 信息,并缓存到本地。::: tip 关键点
:::
每个 Consumer 的唯一元数据是该 Consumer 在日志中消费的位置。这个偏移量是由 Consumer 控制的:Consumer 通常会在读取记录时线性的增加其偏移量。但实际上,由于位置由 Consumer 控制,所以 Consumer 可以采用任何顺序来消费记录。
一条消息只有被提交,才会被消费者获取到。如下图,只能消费 Message0、Message1、Message2:
Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。
Kafka 的写入数据量很庞大,如果只有一个消费者,消费消息速度很慢,时间长了,就会造成数据积压。为了减少数据积压,Kafka 支持消费者群组,可以让多个消费者并发消费消息,对数据进行分流。
Kafka 消费者从属于消费者群组,一个群组里的 Consumer 订阅同一个 Topic,一个主题有多个 Partition,每一个 Partition 只能隶属于消费者群组中的一个 Consumer。
如果超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息。
同一时刻,一条消息只能被同一消费者组中的一个消费者实例消费。
不同消费者群组之间互不影响。
::: tip 关键点
:::
Kafka 消费者通过 pull
模式来获取消息,但是获取消息时并不是立刻返回结果,需要考虑两个因素:
customer.poll(time)
中设置等待时间pull
除了获取消息外,还有其他作用:
::: tip 关键点
分区的作用就是提供负载均衡的能力,以实现系统的高伸缩性(Scalability)。
:::
Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。
在 Kafka 中,任意一个 Topic 维护了一组 Partition 日志,如下所示:
每个 Partition 都是一个单调递增的、不可变的日志记录,以不断追加的方式写入数据。Partition 中的每条记录会被分配一个单调递增的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的每条记录。
为什么 Kafka 的数据结构采用三级结构?
分区的作用就是提供负载均衡的能力,以实现系统的高伸缩性(Scalability)。
不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的机器节点来增加整体系统的吞吐量。
::: tip 关键点
发送消息,未指定 key 时,选择分区采用轮询方式;指定 key 时,选择分区采用哈希方式,固定发往同一分区
:::
所谓分区策略是决定生产者将消息发送到哪个分区的算法,也就是负载均衡算法。
Kafka 生产者发送消息使用的对象 ProducerRecord
,可以选填 Partition 和 Key。不过,大多数应用会用到 key。key 有两个作用:作为消息的附加信息;也可以用来决定消息该被写到 Topic 的哪个 Partition,拥有相同 key 的消息将被写入同一个 Partition。
如果 ProducerRecord
指定了 Partition,则分区器什么也不做,否则分区器会根据 key 选择一个 Partition 。
topic.metadata.refresh.interval.ms
的时间,轮询选择一个 partition。这个时间窗口内的所有记录发送到这个 partition。发送数据出错后会重新选择一个 partition。如果 Kafka 的默认分区策略无法满足实际需要,可以自定义分区策略。需要显式地配置生产者端的参数 partitioner.class
。这个参数该怎么设定呢?
首先,要实现 org.apache.kafka.clients.producer.Partitioner
接口。这个接口定义了两个方法:partition
和 close
,通常只需要实现最重要的 partition
方法。我们来看看这个方法的方法签名:
1 | int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); |
这里的 topic
、key
、keyBytes
、value
和 valueBytes
都属于消息数据,cluster
则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。
接着,设置 partitioner.class
参数为自定义类的全限定名,那么生产者程序就会按照你的代码逻辑对消息进行分区。
负载均衡算法常见的有:
可以根据实际需要去实现。
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为分区再均衡(Rebalance)。Rebalance 实现了消费者群组的高可用性和伸缩性。
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。
当在群组里面新增/移除消费者或者新增/移除 kafka 集群 broker 节点时,群组协调器 Broker 会触发再均衡,重新为每一个 Partition 分配消费者。Rebalance 期间,消费者无法读取消息,造成整个消费者群组一小段时间的不可用。
分区再均衡的触发时机有三种:
consumer.close()
操作或者消费客户端宕机,就不再通过 poll 向群组协调器发送心跳了,当群组协调器检测次消费者没有心跳,就会触发再均衡。consumer.subscribe(Pattern.compile(“t.*c”))
就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。Rebalance 是通过消费者群组中的称为“群主”消费者客户端进行的。
(1)选择群主
当消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获取群组的活跃成员列表,并负责给每一个消费者分配分区。
所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。具体来讲,Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。
(2)消费者通过向被指派为群组协调器(Coordinator)的 Broker 定期发送心跳来维持它们和群组的从属关系以及它们对分区的所有权。
(3)群主从群组协调器获取群组成员列表,然后给每一个消费者进行分配分区 Partition。有两种分配策略:Range 和 RoundRobin。
(4)群主分配完成之后,把分配情况发送给群组协调器。
(5)群组协调器再把这些信息发送给消费者。每个消费者只能看到自己的分配信息,只有群主知道所有消费者的分配信息。
消费者通过向被指定为群组协调器的 Broker 发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者超时未发送心跳,会话就会过期,群组协调器认定它已经死亡,就会触发一次再均衡。
当一个消费者要离开群组时,会通知协调器,协调器会立即触发一次再均衡,尽量降低处理停顿。
所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。那么,Consumer Group 如何确定为它服务的 Coordinator 在哪台 Broker 上呢?答案就在我们之前说过的 Kafka 内部位移主题 __consumer_offsets
身上。
目前,Kafka 为某个 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 个步骤。
第 1 步:确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)
。
第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。
通过前文,我们已经知道了:分区再均衡的代价很高,应该尽量避免不必要的分区再均衡,以整体提高 Consumer 的吞吐量。
分区再均衡发生的时机有三个:
后面两个通常都是运维的主动操作,所以它们引发的 Rebalance 大都是不可避免的。实际上,大部分情况下,导致分区再均衡的原因是:消费群组成员数量发生变化。
有两种情况,消费者并没有宕机,但也被视为消亡:
第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。因此,需要合理设置会话超时时间。这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。
session.timeout.ms
= 6s。heartbeat.interval.ms
= 2s。session.timeout.ms
>= 3 * heartbeat.interval.ms
。将 session.timeout.ms
设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer。毕竟,我们还是希望能尽快揪出那些“尸位素餐”的 Consumer,早日把它们踢出 Group。希望这份配置能够较好地帮助你规避第一类“不必要”的 Rebalance。
第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。此时,**max.poll.interval.ms
** 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。
如果你按照上面的推荐数值恰当地设置了这几个参数,却发现还是出现了 Rebalance,那么我建议你去排查一下 Consumer 端的 GC 表现,比如是否出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance。为什么特意说 GC?那是因为在实际场景中,我见过太多因为 GC 设置不合理导致程序频发 Full GC 而引发的非预期 Rebalance 了。
Kafka 通过分区策略平衡吞吐量、延迟与稳定性。
Kafka 分区分配策略
Range(范围)
RoundRobin(轮询)
Sticky(粘性)
策略选择关键考量因素
在 Kafka 中,优化分区的读写性能主要可以通过以下几种常见的调优策略实现:
log.retention.hours
、log.segment.bytes
、log.flush.interval.messages
等参数,可以显著提升读写性能。batch.size
)、压缩类型(compression.type
)、消费者的最大拉取记录数(max.poll.records
)等。副本机制是分布式系统实现高可用的不二法门,Kafka 也不例外。
副本机制有哪些好处?
但是,Kafka 只实现了第一个好处,原因后面会阐述。
Kafka 使用 Topic 来组织数据,每个 Topic 被分为若干个 Partition,每个 Partition 有多个副本。每个 Broker 可以保存成百上千个属于不同 Topic 和 Partition 的副本。Kafka 副本的本质是一个只能追加写入的提交日志。
Kafka 副本有两种角色:
为了与 Leader 保持同步,Follower 向 Leader 发起获取数据的请求,这种请求与消费者为了读取消息而发送的请求是一样的。请求消息里包含了 Follower 想要获取消息的偏移量,而这些偏移量总是有序的。
Leader 另一个任务是搞清楚哪个 Follower 的状态与自己是一致的。通过查看每个 Follower 请求的最新偏移量,Leader 就会知道每个 Follower 复制的进度。如果跟随者在 10s 内没有请求任何消息,或者虽然在请求消息,但是在 10s 内没有请求最新的数据,那么它就会被认为是不同步的。如果一个副本是不同步的,在 Leader 失效时,就不可能成为新的 Leader——毕竟它没有包含全部的消息。
除了当前首领之外,每个分区都有一个首选首领——创建 Topic 时选定的首领就是分区的首选首领。之所以叫首选 Leader,是因为在创建分区时,需要在 Broker 之间均衡 Leader。
ISR 即 In-sync Replicas,表示同步副本。Follower 副本不提供服务,只是定期地异步拉取领导者副本中的数据而已。既然是异步的,说明和 Leader 并非数据强一致性的。
判断 Follower 是否与 Leader 同步的标准:
Kafka Broker 端参数 replica.lag.time.max.ms
参数,指定了 Follower 副本能够落后 Leader 副本的最长时间间隔,默认为 10s。这意味着:只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
ISR 是一个动态调整的集合,会不断将同步副本加入集合,将不同步副本移除集合。Leader 副本天然就在 ISR 中。
因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader。
Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数 unclean.leader.election.enable
控制是否允许 Unclean 领导者选举。
开启 Unclean 领导者选举可能会造成数据丢失,但好处是:它使得 Partition Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。
核心机制
多副本机制
主从架构
ZooKeeper 协调
故障恢复流程
支撑技术
0
、1
、all
),平衡吞吐量与数据可靠性。优先副本选举是 Kafka 维持集群健康的核心机制,通过自动/手动结合的方式,确保 Leader 分布合理,兼顾负载均衡与数据可靠性。
优先副本(Preferred Replica) 是分区初始分配时的第一个副本(如分区P0
的副本分配为[Broker1, Broker2, Broker3]
,则Broker1
是优先副本)。它的作用是通过选举优先副本为 Leader,实现负载均衡,避免部分 Broker 长期承担过多 Leader 角色。
自动配置方法
在 server.properties
中配置以下参数:
1 | # 启用自动优先副本选举 |
生效条件:需重启 Kafka 集群。
手动触发命令
通过脚本强制触发选举:
1 | bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181 |
适用场景:紧急负载均衡或维护后恢复预期状态。
核心价值
优势 | 说明 |
---|---|
负载均衡 | 分散 Leader 压力,避免单节点过热。 |
高可用性 | 优先副本通常数据最新,故障切换时恢复更快。 |
性能优化 | 均衡的 Leader 分布可提升整体吞吐量。 |
MirrorMaker 核心功能
快速部署步骤
consumer.config
**:指定源集群地址(bootstrap.servers
)、消费者组(group.id
)。producer.config
**:指定目标集群地址(bootstrap.servers
)。whitelist
**:定义需复制的 Topic(支持正则,如 .*
表示全部)。1 | bin/kafka-mirror-maker.sh \ |
关键特性与注意事项
特性 | 说明 |
---|---|
数据一致性 | 保证消息顺序,但存在延迟(依赖网络带宽和集群负载)。 |
容错性 | 消费者组自动提交偏移量,故障恢复后可继续复制。 |
性能瓶颈 | 单线程设计(1.0 版本),高吞吐场景需横向扩展 MirrorMaker 实例。 |
监控指标 | 关注 consumer-lag 、producer-throughput 等指标。 |
高级配置与优化
num.streams
参数提升并发度(1.0 版本)。--abort.on.send.failure true
确保生产失败时快速终止。替代工具对比
工具 | 适用场景 | 特点 |
---|---|---|
Confluent Replicator | 企业级需求(如 Schema 同步、监控集成)。 | 商业工具,功能全面,支持复杂拓扑。 |
uReplicator | 高可用、低延迟场景(如 LinkedIn 生产)。 | 开源,支持 Controller 层优化,减少延迟。 |
常见问题解决
fetch.min.bytes
(消费者)和 linger.ms
(生产者)。总结
ZooKeeper 在 Kafka 中扮演着核心的协调者角色,主要负责集群的元数据管理、Broker 协调和状态维护。
Zookeeper 仍是 Kafka 2.8 之前版本的”大脑”,承担关键协调职能。KRaft 模式将成为标准架构,2023 年后新版本将默认启用。
Zookeeper 的核心作用
功能 | 说明 |
---|---|
管理 Broker 元数据 | 维护 Broker 注册信息(在线/离线状态);Broker 的 ID、主机名、端口等元数据;Topic/Partition 元数据 |
Controller 选举 | 通过临时节点(Ephemeral ZNode)选举集群唯一 Controller,负责分区 Leader 选举 |
故障恢复 | 监测节点故障并触发分区 Leader 重选举 |
消费者组 Offset | 旧版本(≤0.8)将消费者 Offset 存储在 Zookeeper,新版本改用内部 主题 _consumer_offsets 。 |
配置中心 | 存储 Kafka 配置和拓扑信息 |
Zookeeper 的局限性
去 Zookeeper 化
__cluster_metadata
Topic 中,利用副本机制保证高可用。运维建议
znode
数量、延迟(avg_latency
)、活跃连接数。总结
Kafka 的 Controller 是集群中负责管理各种元数据(如主题创建、分区分配、副本分配等)以及协调领导者选举的关键组件。Controller Failover 是 Kafka 保证高可用性的重要机制。具体来讲,当 Controller 宕机时,Kafka 会通过 Zookeeper 选举出一个新的 Controller,以确保集群可以继续正常运行。
以下是 Kafka Controller Failover 的主要设计和流程:
/controller
)。因为这个节点使用的是 Ephemeral(临时)节点类型,当创建该节点的 Broker 宕机时,这个节点会自动删除。/controller
节点。第一个成功创建这个节点的 Broker 会成为新的 Controller,剩下的则会收到失败通知。Kafka 中的 Controller 是整个集群的协调者,它是专门负责监控和管理 Kafka 集群中分区(partition)和副本(replica)状态的节点。在整个 Kafka 集群中,Controller 的角色是至关重要的,它帮助集群维持稳定,确保分区和副本的可用性和一致性。
Controller 在集群中的主要作用包括:
为进一步了解 Kafka 中 Controller 的重要性,可阅读以下扩展点:
消息持久性:Kafka 使用磁盘进行消息存储,确保即使在系统故障的情况下,消息也不会丢失。具体措施包括:
高可用性:Kafka 通过复制机制和分布式架构来实现高可用性,具体包括:
支撑技术
ISR 定义
核心价值:通过动态 ISR 管理实现可靠性与性能的平衡
消息同步流程
关键机制
配置参数
参数 | 作用 | 典型值 |
---|---|---|
acks |
确认副本数 | all |
min.insync.replicas |
最小同步副本数 | 2 |
replica.lag.time.max.ms |
最大滞后时间 | 10000 |
设计权衡
acks=all
确保数据安全选择原则:根据业务对数据丢失的容忍度进行权衡配置。
参数选项
配置值 | 可靠性 | 性能 | 适用场景 |
---|---|---|---|
0 |
最低 | 最高 | 实时监控/日志收集 |
1 |
中等 | 中等 | 普通业务场景 |
all/-1 |
最高 | 最低 | 金融交易/关键数据 |
优化建议
可靠性优先:
acks=all
min.insync.replicas=2
unclean.leader.election.enable=false
性能优先:
acks=0
或1
replication.factor
(如 2)注意事项
replication.factor
建议≥3acks
值会增加网络和存储压力如何保证消息的可靠性传输,或者说,如何保证消息不丢失?这对于任何 MQ 都是核心问题。
一条消息从生产到消费,可以划分三个阶段:
这三个阶段都可能丢失数据,所以要保证消息丢失,就需要任意一环都保证可靠。通过 ACK+副本+幂等+手动提交 Offset 的组合策略,可系统性解决消息丢失问题。根据业务对可靠性和性能的需求调整配置。
acks=all
,确保所有副本持久化后才确认发送成功。enable.idempotence=true
,避免网络重试导致消息重复或丢失。producer.initTransactions()
)。replication.factor≥3
,保证高可用。min.insync.replicas≥2
,防止单点故障导致数据丢失。enable.auto.commit=false
,处理完消息后手动提交偏移量。关键配置:
1 | # 生产者 |
存储阶段指的是 Kafka Server,也就是 Broker 如何保证消息不丢失。
一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。
上面的话可以解读为:
Kafka 的副本机制是 kafka 可靠性保证的核心。
Kafka 的主题被分为多个分区,分区是基本的数据块。每个分区可以有多个副本,有一个是 Leader(主副本),其他是 Follower(从副本)。所有数据都直接发送给 Leader,或者直接从 Leader 读取事件。Follower 只需要与 Leader 保持同步,并及时复制最新的数据。当 Leader 宕机时,从 Follower 中选举一个成为新的 Leader。
Broker 有 3 个配置参数会影响 Kafka 消息存储的可靠性。
replication.factor
的作用是设置每个分区的副本数。replication.factor
是主题级别配置; default.replication.factor
是 broker 级别配置。副本数越多,数据可靠性越高;但由于副本数增多,也会增加同步副本的开销,可能会降低集群的可用性。一般,建议设为 3,这也是 Kafka 的默认值。unclean.leader.election.enable
用于控制是否支持不同步的副本参与选举 Leader。unclean.leader.election.enable
是 broker 级别(实际上是集群范围内)配置,默认值为 true。min.insync.replicas
控制的是消息至少要被写入到多少个副本才算是“已提交”。min.insync.replicas
是主题级别和 broker 级别配置。尽管可以为一个主题配置 3 个副本,但还是可能会出现只有一个同步副本的情况。如果这个同步副本变为不可用,则必须在可用性和数据一致性之间做出选择。Kafka 中,消息只有被写入到所有的同步副本之后才被认为是已提交的。但如果只有一个同步副本,那么在这个副本不可用时,则数据就会丢失。replication.factor
> min.insync.replicas
。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1
。在生产消息阶段,消息队列一般通过请求确认机制,来保证消息的可靠传递,Kafka 也不例外。
Kafka 有三种发送方式:同步、异步、异步回调。同步方式能保证消息不丢失,但性能太差;异步方式发送消息,通常会立即返回,但消息可能丢失。
解决生产者丢失消息的方案:
生产者使用异步回调方式 producer.send(msg, callback)
发送消息。callback(回调)能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。
然后,需要基于以下几点来保证 Kafka 生产者的可靠性:
acks=0
、acks=1
、acks=all
。acks=0
、acks=1
都有丢失数据的风险。acks=all
意味着会等待所有同步副本都收到消息。再结合 min.insync.replicas
,就可以决定在得到确认响应前,至少有多少副本能够收到消息。这是最保险的做法,但也会降低吞吐量。retries
为一个较大的值。这里的 retries
同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。LEADER_NOT_AVAILABLE
,主副本不可用,可能过一段时间,集群就会选举出新的主副本,重试可以解决问题。INVALID_CONFIG
,即使重试,也无法改变配置选项,重试没有意义。前文已经提到,消费者只能读取已提交的消息。这就保证了消费者接收到消息时已经具备了数据一致性。
消费者唯一要做的是确保哪些消息是已经读取过的,哪些是没有读取过的(通过提交偏移量给 Broker 来确认)。如果消费者提交了偏移量却未能处理完消息,那么就有可能造成消息丢失,这也是消费者丢失消息的主要原因。
消费者的可靠性配置:
group.id
- 如果希望消费者可以看到主题的所有消息,那么需要为它们设置唯一的 group.id
。auto.offset.reset
- 有两个选项:earliest
- 消费者会从分区的开始位置读取数据latest
- 消费者会从分区末尾位置读取数据enable.auto.commit
- 消费者自动提交偏移量。如果设为 true,处理流程更简单,但无法保证重复处理消息。auto.commit.interval.ms
- 自动提交的频率,默认为每 5 秒提交一次。如果 enable.auto.commit
设为 true,即自动提交,就无需考虑提交偏移量的问题。
如果选择显示提交偏移量,需要考虑以下问题:
在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:
绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。
一般解决重复消息的办法是,在消费端,保证消费消息的操作具备幂等性。
幂等(idempotent、idempotence)是一个数学与计算机学概念,指的是:一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。
常用的实现幂等操作的方法:
enable.idempotence=true
,避免网络重试导致重复。对消息有序有要求的场景
场景 | 顺序性要求示例 |
---|---|
金融交易 | 转账指令必须按 开户→存款→转账 顺序执行 |
日志聚合 | 错误日志需按时间顺序排列:启动→运行→异常→终止 |
库存管理 | 操作顺序必须为 入库→出库→盘点 ,否则库存数据不一致 |
流媒体 | 视频帧需按 I 帧→P 帧→B 帧 顺序传输,否则解码失败 |
Kafka 提供了有限度的顺序性保证,具体来说:
如何保证消息的严格顺序性
高并发场景下如何优化顺序消费
关键机制
hash(key) mod partitionNum
。通过这种策略,可以确保相同键的消息被发送到同一个分区,从而保证它们的顺序性。最佳实践
方案对比
方法 | 见效速度 | 影响 | 适用场景 |
---|---|---|---|
增加消费者 | 立即 | 无 | 分区有余量时 |
调整参数 | 立即 | 可能内存压力 | 资源充足时 |
重置 offset | 立即 | 数据丢失 | 非关键消息 |
处理原则
最佳实践:幂等性+事务+合理重试配置,构建高可靠消息系统
核心配置
1 | Properties props = new Properties(); |
关键特性
特性 | 说明 | 优势 |
---|---|---|
消息去重 | 自动过滤重复消息 | 避免数据重复 |
顺序保证 | 单分区内消息有序 | 维护数据一致性 |
自动重试 | 内置安全重试机制 | 提升可靠性 |
高级应用
1 | props.put("transactional.id", "txn-1"); |
使用建议
Kafka 的事务概念是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务,或者说是是一个原子操作),同时成功或者失败。
消息可靠性保障,由低到高为:
Kafka 支持事务功能主要是为了实现精确一次处理语义的,而精确一次处理是实现流处理的基石。
Kafka 自 0.11 版本开始提供了对事务的支持,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。
Kafka 事务机制核心要点:
总结:Kafka 事务通过协调器、2PC 和日志追踪实现原子消息组,适用于需严格一致的分布式场景。
事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
事务属性实现前提是幂等性,即在配置事务属性 transaction.id
时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。
在事务属性之前先引入了生产者幂等性,它的作用为:
消费者提交偏移量导致重复消费消息的场景:消费者在消费消息完成提交便宜量 o2 之前挂掉了(假设它最近提交的偏移量是 o1),此时执行再均衡时,其它消费者会重复消费消息 (o1 到 o2 之间的消息)。
使用 kafka 的事务 api 时的一些注意事项:
consumer#commitSync
或者 consumer#commitAsyc
transctional.id
。最好为其设置一个有意义的名字。enable.idempotence = true
。如果配置了 transaction.id
,则此时 enable.idempotence
会被设置为 trueisolation.level
。在 consume-trnasform-produce
模式下使用事务时,必须设置为 READ_COMMITTED
。read_uncommitted
:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。read_committed
:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。Kafka 的事务机制与幂等性机制结合实现端到端的 Exactly Once,适用于强一致性要求的分布式系统。
核心功能
关键实现
事务流程(生产者端):
1 | producer.initTransactions(); // 初始化事务 |
幂等性实现:
协同作用
典型应用场景
故障容错:事务机制 + 幂等性 = 故障重试时仍保证数据一致,避免部分成功或重复消费。
Kafka 通过幂等生产
+事务
+精准 offset 控制
,在分布式环境下实现端到端 Exactly Once,适用于金融、计费等强一致性场景。
核心机制
幂等生产者
Producer ID
和消息序列号
实现去重事务生产者
commitTransaction
/abortTransaction
)消费端去重
offset
管理 + 消费者组机制异常处理
方法 | 作用 | 场景示例 |
---|---|---|
事务回滚 | 撤销未完成的操作,保持原子性 | 生产者写入部分分区失败时 |
自动重试 | 应对临时性故障(如网络抖动) | Broker 短暂不可用 |
幂等消费 | 通过业务 ID 或状态记录避免重复处理 | 消费者重启后重复拉取消息 |
关键扩展
enable.auto.commit=false
时需手动提交 offset 以精准控制消费Kafka 通过组件分工+副本机制保障高可用,结合批量/压缩/手动提交等优化手段实现高性能。新版本推荐使用 KRaft 模式简化架构。
核心组件
组件 | 核心功能 |
---|---|
Producer | 发布数据到 Topic,支持轮询/Key 哈希/自定义分区策略 |
Consumer | 通过消费组实现负载均衡,同一分区仅限单个消费者消费 |
Broker | 存储管理消息,通过分区副本实现高可用,支持故障自动转移 |
Zookeeper | 管理集群元数据、Leader 选举(注:新版本 Kafka 逐步用 KRaft 协议替代 Zookeeper) |
工作机制优化
linger.ms
+batch.size
)acks=1/all
平衡性能与可靠性)range/round-robin
策略)enable.auto.commit=false
避免重复/丢失)replication.factor≥2
保障容错)关键配置建议
场景 | 推荐配置 | 说明 |
---|---|---|
高吞吐场景 | compression.type=snappy |
压缩率与 CPU 开销平衡 |
数据持久化要求 | log.retention.hours=168 (7 天) |
根据存储容量调整 |
低延迟场景 | num.io.threads=8 (默认值翻倍) |
提升磁盘 IO 并行度 |
版本演进注意
Kafka 的数据存储在磁盘上,为什么还能这么快?
说 Kafka 很快时,他们通常指的是 Kafka 高效移动大量数据的能力。Kafka 为了提高传输效率,做了很多精妙的设计。
sendfile
系统调用,数据直接从磁盘→网络,减少 CPU 拷贝开销。::: info 零拷贝
:::
Kafka 数据传输是一个从网络到磁盘,再由磁盘到网络的过程。在网络和磁盘之间传输数据时,消除多余的复制是提高效率的关键。Kafka 利用零拷贝技术来消除传输过程中的多余复制。
如果不采用零拷贝,Kafka 将数据同步给消费者的大致流程是:
采用零拷贝技术,Kafka 使用 sendfile()
系统方法,将数据从 os buffer 直接复制到网卡 buffer。这个过程中,唯一一次复制数据是从 os buffer 到网卡 buffer。这个复制过程是通过 DMA(Direct Memory Access,直接内存访问) 完成的。使用 DMA 时,CPU 不参与,这使得它非常高效。
Kafka 通过分区+副本机制实现横向扩展与负载均衡,配合动态重平衡与 ISR 选举保障高可用性。合理配置分区/副本数和 ACK 策略是关键。
分区机制
负载均衡实现
角色 | 策略 | 作用 |
---|---|---|
Producer | 轮询(Round-robin) | 均匀分布消息到各分区 |
按键哈希(Key Hashing) | 相同 Key 的消息固定到同一分区 | |
Consumer | 消费组(Consumer Group)机制 | 组内消费者并行消费不同分区 |
分区分配策略(Range/Round-robin) | 控制消费者与分区的映射关系 |
动态扩展与容错
kafka-reassign-partitions
工具)关键优化点
group.instance.id
)避免消费者短暂离线触发重平衡replication.factor=3
)数据一致性保障
生产者 ACK 配置:
acks=0
:不等待确认(高性能,可能丢失数据)acks=1
:Leader 写入即确认(平衡选择)acks=all
:所有 ISR 副本确认(高可靠,延迟高)日志压缩通过 Key-Level 去重优化存储效率,适用于状态跟踪类场景,需权衡实时性与资源开销。配置时建议结合业务数据更新频率调整log.cleaner
相关参数。
基本概念
log.cleanup.policy=compact
工作机制
环节 | 说明 |
---|---|
写入阶段 | 所有消息(含重复 Key)正常写入日志 |
压缩阶段 | Cleaner 线程扫描日志,对同一 Key 只保留 offset 最大的记录 |
清理阶段 | 被标记删除的消息最终被物理清除 |
典型应用场景
与其他机制的对比
特性 | 日志压缩 | 日志删除(按时间/大小) |
---|---|---|
保留策略 | 按 Key 保留最新值 | 按时间/文件大小删除旧数据 |
适用场景 | 需要 Key 级状态追溯 | 只需保留近期数据 |
可共存性 | 可与删除策略同时配置 | - |
注意事项
null
键消息不会被压缩保留kafka.log:type=LogCleanerManager
相关指标Kafka 通过参数化限速和自适应背压实现多层级流量控制,需根据业务特点(吞吐/延迟/可靠性需求)组合配置。生产环境建议配合监控系统实现动态调节。
限速控制(Rate Limiting)
组件 | 关键参数 | 控制效果 |
---|---|---|
生产者 | max.in.flight.requests.per.connection |
限制单连接未确认请求数(默认 5) |
linger.ms |
批量发送等待时间(0-5000ms) | |
消费者 | fetch.min.bytes |
单次拉取最小数据量(默认 1B) |
fetch.max.wait.ms |
拉取请求最长等待时间(默认 500ms) |
背压机制(Backpressure)
enable.auto.commit=false
)buffer.memory
,默认 32MB)queued.max.messages
,默认 500)高级控制策略
配置建议
场景 | 优化方向 | 典型值 |
---|---|---|
高吞吐场景 | 增大linger.ms +batch.size |
linger.ms=50-100ms |
低延迟场景 | 减小fetch.max.wait.ms |
fetch.max.wait.ms=10ms |
稳定性优先 | 降低max.in.flight.requests |
设为 1(确保顺序性) |
通过 并行化、批处理、硬件加速 实现高吞吐,同时控制分区/副本数量及网络参数以降低延迟。
分区与副本优化
生产端调优
消费端调优
硬件优化
Broker 配置
通过 分区策略优化 + 动态资源分配 + 流量控制,实现数据均匀分布与稳定吞吐。
均衡数据分布
user_id
、order_id
),避免热点。动态调整与冗余
replication-factor=3
)分散读压力,平衡资源开销。rebalance
或迁移数据。流控与限流
producer
速率(如 max.in.flight.requests
)。fetch.max.bytes
或使用背压机制,匹配消费能力。实现 高吞吐、低延迟、强一致性 的流式数据处理管道。
基础集成步骤
flink-connector-kafka
(匹配 Kafka 版本)。FlinkKafkaConsumer
订阅 Kafka Topic。FlinkKafkaProducer
写入结果到 Kafka。性能优化方向
优化项 | 关键措施 |
---|---|
参数调优 | - 调整 batch.size /linger.ms (生产者)- 设置合理并行度(Flink 任务) |
资源分配 | - 平衡 Flink TaskManager 的 CPU/内存 - 确保 Kafka Broker 带宽充足 |
容错机制 | - 启用 Flink Checkpointing(精确一次语义) - 配置 Kafka 幂等性/事务 |
数据压缩 | 选用高效压缩算法(如 lz4 /snappy ),减少网络传输压力 |
关键代码示例
1 | // Kafka Source |
高级特性
setStartFromLatest()
/setStartFromEarliest()
。assignTimestampsAndWatermarks
处理事件时间。transaction.timeout.ms
)。存储架构优化
方法 | 作用 |
---|---|
增加分区数 | 分散写入负载,利用多磁盘并行 I/O(但避免过多分区导致管理开销)。 |
多副本配置 | 提升读取吞吐量(副本数通常 2-3),同时增强容错能力。 |
高性能磁盘 | 优先选择 NVMe SSD > SATA SSD > HDD,显著降低读写延迟。 |
RAID 配置 | - RAID 0:纯性能提升(无冗余) - RAID 10:性能+冗余(推荐生产环境)。 |
参数调优
类型 | 关键参数 | 优化建议 |
---|---|---|
Kafka 配置 | num.io.threads |
增加 I/O 线程数(默认=8,建议=CPU 核数)。 |
log.flush.interval.messages |
调高刷盘间隔(减少频繁刷盘开销)。 | |
生产者配置 | batch.size + linger.ms |
增大批次大小(如 64KB)和等待时间(如 50ms)。 |
系统级优化 | Linux vm.dirty_ratio |
调大文件系统缓存比例(如 20%-30%)。 |
高级特性
lz4
),减少磁盘写入量和网络传输负载。实践示例
1 | # Kafka Broker 配置示例 |
目标:通过 硬件选型 + 并行化设计 + 批量处理,实现高吞吐、低延迟的磁盘 I/O 性能。
通过 资源隔离 + 精准配额 + 动态管控,实现安全、公平的多租户架构。
租户隔离机制
方法 | 实现方式 |
---|---|
主题隔离 | 每个租户分配独立 Topic(如 tenantA_order 、tenantB_log ),物理隔离数据。 |
ACL 权限控制 | 通过 ACL 限制租户仅能访问自有 Topic(配置 CREATE/DESCRIBE/READ/WRITE 权限)。 |
配额管理配置
配额类型 | 控制对象 | 配置示例(CLI) |
---|---|---|
生产速率限制 | 生产者消息吞吐量 | --add-config 'producer_byte_rate=1048576' (限制 1MB/s) |
消费速率限制 | 消费者消息拉取速度 | --add-config 'consumer_byte_rate=524288' (限制 512KB/s) |
存储空间限制 | Topic 磁盘占用 | 配置 log.retention.bytes=1073741824 (限制 1GB) + log.retention.ms (时间策略) |
动态管理工具
kafka-configs.sh
动态调整配额(无需重启集群):1 | # 设置租户 A 的生产配额 |
实施建议
{tenant}_{data_type}
)。通过 流表转换 + 状态管理,实现实时计算与状态维护的统一处理。
核心概念对比
抽象类型 | 特点 | 适用场景 |
---|---|---|
Stream | 无界、有序的键值记录流(事件日志) | 实时分析、事件监控(如点击流、交易记录) |
Table | 有状态的键值快照(当前数据视图) | 状态维护(如用户配置、库存数量) |
相互转换操作
(1) Stream → Table
通过 聚合操作 将动态流转换为状态表:
1 | KStream<String, Long> stream = builder.stream("input-topic"); |
(2) Table → Stream
通过 toStream() 将表变更作为流输出:
1 | KTable<String, Long> table = builder.table("input-topic"); |
典型应用场景
电商实时统计
order-created
)。user_id → total_orders
)。视频播放分析
video_id, timestamp
)。video_id → play_count
)。关键设计思想
通过 合理分区设计 + 资源分配 + 参数调优,实现高吞吐、低延迟的稳定集群。
核心状态管理机制
组件 | 功能 |
---|---|
Zookeeper | 集群协调(Broker 注册、Leader 选举、Consumer Offset 存储) |
Broker 存储 | 消息持久化(内存 Page Cache + 磁盘日志) |
Kafka Streams | 流处理状态管理(RocksDB 状态存储、窗口化操作) |
关键性能优化策略
(1) 集群设计优化
配置项 | 优化建议 |
---|---|
分区数量 | - 分区数 ≈ 目标吞吐量 / 单分区吞吐能力 - 避免过多分区(建议单 Broker ≤ 2000) |
副本因子 | 生产环境建议 2-3(平衡可靠性与存储开销) |
Topic 规划 | 按业务拆分 Topic(如 logs-{service} ),避免热点 |
(2) Producer/Consumer 调优
1 | # Producer 优化 |
(3) Broker 资源配置
资源 | 优化方向 |
---|---|
内存 | - JVM 堆内存 ≤ 6GB(避免 GC 停顿) - 预留 50% 内存给 Page Cache |
磁盘 | - 使用 SSD/NVMe - 配置 RAID 10(高性能+冗余) |
网络 | 万兆网络 + 多网卡绑定(避免带宽瓶颈) |
(4) Kafka Streams 状态优化
1 | // 启用 RocksDB 状态存储 |
监控与调优工具
UnderReplicatedPartitions
、RequestQueueSize
、NetworkProcessorAvgIdlePercent
kafka-configs.sh
(动态调整配额)、kafka-topics.sh
(分区扩容)-XX:-UseBiasedLocking
):::details 要点
RPC 的全称是 Remote Procedure Call,即远程过程调用。
RPC 的主要作用是:
RPC 是微服务架构的基石,它提供了一种应用间通信的方式。
:::
:::details 要点
RPC 是一种应用间通信的方式,它的通信流程中需要注意以下环节:
:::
:::details 要点
只有二进制才能在网络中传输,所以 RPC 请求需要把方法调用的请求参数先转成二进制,然后再通过网络传输。
传输的数据可能很大,RPC 请求需要将数据分解为多个数据包;传输的数据也可能较小,需要和其他请求的数据包进行合并。当接收方收到请求时,需要从二进制数据中识别出不同的请求。问题是,如何从二进制数据中识别出其所属的请求呢?
这就需要发送方、接收方在通信过程中达成共识,严格按照协议处理二进制数据。这就好比让你读一篇没有标点符号的文章,你要怎么识别出每一句话到哪里结束呢?很简单啊,我们加上标点,完成断句就好了。这里有个潜在的含义,写文章和读文章的人,都遵循标点符号的用法。
再进一步探讨,既然已经有很多成熟的网络协议,为何还要设计 RPC 协议?
有必要。因为 HTTP 这些通信标准协议,数据包中的实际请求数据相对于数据包本身要小很多,有很多无用的内容;并且 HTTP 属于无状态协议,无法将请求和响应关联,每次请求要重新建立连接。这对于高性能的 RPC 来说,HTTP 协议难以满足需求,所以有必要设计一个紧凑的私有协议。
:::
:::details 要点
首先,必须先明确消息的边界,即确定消息的长度。因此,至少要分为:消息长度+消息内容两部分。
接下来,我们会发现,在使用过程中,仅消息长度,不足以明确通信中的很多细节:如序列化方式是怎样的?是否消息压缩?压缩格式是怎样的?如果协议发生变化,需要明确协议版本等等。
大多数的协议会分成两部分,分别是数据头和消息体。数据头一般用于身份识别,包括协议标识、数据大小、请求类型、序列化类型等信息;消息体主要是请求的业务参数信息和扩展属性等。
综上,一个 RPC 协议大概会由下图中的这些参数组成:
前面所述的协议属于定长协议头,那也就是说往后就不能再往协议头里加新参数了,如果加参数就会导致线上兼容问题。
为了保证能平滑地升级改造前后的协议,我们有必要设计一种支持可扩展的协议。其关键在于让协议头支持可扩展,扩展后协议头的长度就不能定长了。那要实现读取不定长的协议头里面的内容,在这之前肯定需要一个固定的地方读取长度,所以我们需要一个固定的写入协议头的长度。整体协议就变成了三部分内容:固定部分、协议头内容、协议体内容。
:::
:::details 要点
由于,网络传输的数据必须是二进制数据,而调用方请求的出参、入参都是对象。因此,必须将对象转换可传输的二进制,并且要求转换算法是可逆的。
Java 领域,常见的序列化技术如下
市面上有如此多的序列化技术,那么我们在应用时如何选择呢?
一般而言,序列化技术选型需要考量的维度,根据重要性从高到低,依次有:
鉴于以上的考量,序列化技术的选型建议如下:
扩展阅读:深入理解 Java 序列化
:::
:::details 要点
由于 RPC 每次通信,都要经过序列化、反序列化的过程,所以序列化方式,会直接影响 RPC 通信的性能。除了选择合适的序列化技术,如何合理使用序列化也非常重要。
RPC 序列化常见的使用不当的情况如下:
对象过于复杂、庞大 - 对象过于复杂、庞大,会降低序列化、反序列化的效率,并增加传输开销,从而导致响应时延增大。
对象有复杂的继承关系 - 对象关系越复杂,就越浪费性能,同时又很容易出现序列化上的问题。大多数序列化框架在进行序列化时,如果发现类有继承关系,会不停地寻找父类,遍历属性。
使用序列化框架不支持的类作为入参类 - 比如 Hessian 框架,他天然是不支持 LinkHashMap、LinkedHashSet 等,而且大多数情况下最好不要使用第三方集合类,如 Guava 中的集合类,很多开源的序列化框架都是优先支持编程语言原生的对象。因此如果入参是集合类,应尽量选用原生的、最为常用的集合类,如 HashMap、ArrayList。
前面已经列举了常见的序列化问题,既然明确了问题,就要针对性预防。RPC 序列化时要注意以下几点:
:::
:::details 要点
一次 RPC 调用,本质就是服务消费者与服务提供者间的一次网络信息交换的过程。可见,通信是 RPC 实现的核心。
常见的网络 IO 模型分为四种:同步阻塞 IO(BIO)、同步非阻塞 IO(NIO)、IO 多路复用和异步非阻塞 IO(AIO)。在这四种 IO 模型中,只有 AIO 为异步 IO,其他都是同步 IO。
什么是 IO 多路复用?字面上的理解,多路就是指多个通道,也就是多个网络连接的 IO,而复用就是指多个通道复用在一个复用器上。IO 多路复用(Reactor 模式)在高并发场景下使用最为广泛,很多知名软件都应用了这一技术,如:Netty、Redis、Nginx 等。
RPC 调用在大多数的情况下,是一个高并发调用的场景,考虑到系统内核的支持、编程语言的支持以及 IO 模型本身的特点,在 RPC 框架的实现中,在网络通信的处理上,通常会选择 IO 多路复用的方式。开发语言的网络通信框架的选型上,最优的选择是基于 Reactor 模式实现的框架,如 Java 语言,首选的框架便是 Netty 框架(Java 还有很多其他 NIO 框架,但目前 Netty 应用得最为广泛),并且在 Linux 环境下,也要开启 epoll 来提升系统性能(Windows 环境下是无法开启 epoll 的,因为系统内核不支持)。
:::
:::details 要点
系统内核处理 IO 操作分为两个阶段——等待数据和拷贝数据。等待数据,就是系统内核在等待网卡接收到数据后,把数据写到内核中;而拷贝数据,就是系统内核在获取到数据后,将数据拷贝到用户进程的空间中。
应用进程的每一次写操作,都会把数据写到用户空间的缓冲区中,再由 CPU 将数据拷贝到系统内核的缓冲区中,之后再由 DMA 将这份数据拷贝到网卡中,最后由网卡发送出去。这里我们可以看到,一次写操作数据要拷贝两次才能通过网卡发送出去,而用户进程的读操作则是将整个流程反过来,数据同样会拷贝两次才能让应用程序读取到数据。
应用进程的一次完整的读写操作,都需要在用户空间与内核空间中来回拷贝,并且每一次拷贝,都需要 CPU 进行一次上下文切换(由用户进程切换到系统内核,或由系统内核切换到用户进程),这样很浪费 CPU 和性能。
所谓的零拷贝,就是取消用户空间与内核空间之间的数据拷贝操作,应用进程每一次的读写操作,可以通过一种方式,直接将数据写入内核或从内核中读取数据,再通过 DMA 将内核中的数据拷贝到网卡,或将网卡中的数据 copy 到内核。
Netty 的零拷贝偏向于用户空间中对数据操作的优化,这对处理 TCP 传输中的拆包粘包问题有着重要的意义,对应用程序处理请求数据与返回数据也有重要的意义。
Netty 框架中很多内部的 ChannelHandler 实现类,都是通过 CompositeByteBuf、slice、wrap 操作来处理 TCP 传输中的拆包与粘包问题的。
Netty 的 ByteBuffer 可以采用 Direct Buffers,使用堆外直接内存进行 Socketd 的读写操作,最终的效果与我刚才讲解的虚拟内存所实现的效果是一样的。
Netty 还提供 FileRegion 中包装 NIO 的 FileChannel.transferTo() 方法实现了零拷贝,这与 Linux 中的 sendfile 方式在原理上也是一样的。
:::
:::details 要点
RPC 的远程过程调用是通过动态代理实现的。
RPC 框架会自动为要调用的接口生成一个代理类。当在项目中注入接口的时候,运行过程中实际绑定的就是这个接口生成的代理类。在接口方法被调用时,会被代理类拦截,这样,就可以在生成的代理类中,加入远程调用逻辑。
除了 JDK 默认的 InvocationHandler
能完成代理功能,还有很多其他的第三方框架也可以,比如像 Javassist、Byte Buddy 这样的框架。
单纯从代理功能上来看,JDK 默认的代理功能是有一定的局限性的,它要求被代理的类只能是接口。原因是因为生成的代理类会继承 Proxy 类,但 Java 是不支持多重继承的。此外,由于它生成后的代理类是使用反射来完成方法调用的,而这种方式相对直接用编码调用来说,性能会降低。
反射+动态代理更多详情可以参考:深入理解 Java 反射和动态代理
:::
:::details 要点
RPC 框架必须要有服务注册和发现机制,这样,集群中的节点才能知道通信方的请求地址。
使用 ZooKeeper 作为服务注册中心,是 Java 分布式系统的经典方案。
搭建一个 ZooKeeper 集群作为注册中心集群,服务注册的时候只需要服务节点向 ZooKeeper 节点写入注册信息即可,利用 ZooKeeper 的 Watcher 机制完成服务订阅与服务下发功能。
通常我们可以使用 ZooKeeper、etcd 或者分布式缓存(如 Hazelcast)来解决事件通知问题,但当集群达到一定规模之后,依赖的 ZooKeeper 集群、etcd 集群可能就不稳定了,无法满足我们的需求。
在超大规模的服务集群下,注册中心所面临的挑战就是超大批量服务节点同时上下线,注册中心集群接受到大量服务变更请求,集群间各节点间需要同步大量服务节点数据,最终导致如下问题:
RPC 框架依赖的注册中心的服务数据的一致性其实并不需要满足 CP,只要满足 AP 即可。
:::
负载均衡详情可以参考:负载均衡基本原理
:::details 要点
可以采用一种打分的策略,服务调用者收集与之建立长连接的每个服务节点的指标数据,如服务节点的负载指标、CPU 核数、内存大小、请求处理的耗时指标(如请求平均耗时、TP99、TP999)、服务节点的状态指标(如正常、亚健康)。通过这些指标,计算出一个分数,比如总分 10 分,如果 CPU 负载达到 70%,就减它 3 分,当然了,减 3 分只是个类比,需要减多少分是需要一个计算策略的。可以为每个指标都设置一个指标权重占比,然后再根据这些指标数据,计算分数。
可以配合随机权重的负载均衡策略去控制,通过最终的指标分数修改服务节点最终的权重。例如给一个服务节点综合打分是 8 分(满分 10 分),服务节点的权重是 100,那么计算后最终权重就是 80(100*80%)。服务调用者发送请求时,会通过随机权重的策略来选择服务节点,那么这个节点接收到的流量就是其他正常节点的 80%(这里假设其他节点默认权重都是 100,且指标正常,打分为 10 分的情况)。
到这儿,一个自适应的负载均衡我们就完成了,整体的设计方案如下图所示:
关键步骤我来解释下:
:::
:::details 要点
服务路由是指通过一定的规则从集群中选择合适的节点。
负载均衡的作用和服务路由的功能看上去很近似,二者有什么区别呢?
负载均衡的目标是提供服务分发而不是解决路由问题,常见的静态、动态负载均衡算法也无法实现精细化的路由管理,但是负载均衡也可以简单看做是路由方案的一种。
服务路由通常用于以下场景,目的在于实现流量隔离:
常见的路由规则有:
:::
:::details 要点
使用频率适中的心跳去检测目标机器的健康状态。
可以使用可用率来作为健康状态的量化标准:
1 | 可用率 = 一个时间窗口内接口调用成功次数 / 总调用次数 |
当可用率低于某个比例,就认为这个节点存在问题,把它挪到亚健康列表,这样既考虑了高低频的调用接口,也兼顾了接口响应时间不同的问题。
:::
分布式链路跟踪就是将一次分布式请求还原为一个完整的调用链路,我们可以在整个调用链路中跟踪到这一次分布式请求的每一个环节的调用情况,比如调用是否成功,返回什么异常,调用的哪个服务节点以及请求耗时等等。
Trace 就是代表整个链路,每次分布式都会产生一个 Trace,每个 Trace 都有它的唯一标识即 TraceId,在分布式链路跟踪系统中,就是通过 TraceId 来区分每个 Trace 的。
Span 就是代表了整个链路中的一段链路,也就是说 Trace 是由多个 Span 组成的。在一个 Trace 下,每个 Span 也都有它的唯一标识 SpanId,而 Span 是存在父子关系的。还是以讲过的例子为例子,在 A->B->C->D 的情况下,在整个调用链中,正常情况下会产生 3 个 Span,分别是 Span1(A->B)、Span2(B->C)、Span3(C->D),这时 Span3 的父 Span 就是 Span2,而 Span2 的父 Span 就是 Span1。
RPC 在整合分布式链路跟踪需要做的最核心的两件事就是“埋点”和“传递”。
我们前面说是因为各子应用、子服务间复杂的依赖关系,所以通过日志难定位问题。那我们就想办法通过日志定位到是哪个子应用的子服务出现问题就行了。
其实,在 RPC 框架打印的异常信息中,是包括定位异常所需要的异常信息的,比如是哪类异常引起的问题(如序列化问题或网络超时问题),是调用端还是服务端出现的异常,调用端与服务端的 IP 是什么,以及服务接口与服务分组都是什么等等。具体如下图所示:
:::details 要点
当服务提供方要上线的时候,一般是通过部署系统完成实例重启。在这个过程中,服务提供方的团队并不会事先告诉调用方他们需要操作哪些机器,从而让调用方去事先切走流量。而对调用方来说,它也无法预测到服务提供方要对哪些机器重启上线,因此负载均衡就有可能把要正在重启的机器选出来,这样就会导致把请求发送到正在重启中的机器里面,从而导致调用方不能拿到正确的响应结果。
在服务重启的时候,对于调用方来说,这时候可能会存在以下几种情况:
当出现第二种情况的时候,调用方业务会受损,如何避免这种问题呢。当服务提供方关闭前,是不是可以先通知注册中心进行下线,然后通过注册中心告诉调用方进行节点摘除?
如上图所示,整个关闭过程中依赖了两次 RPC 调用,一次是服务提供方通知注册中心下线操作,一次是注册中心通知服务调用方下线节点操作。注册中心通知服务调用方都是异步的。服务发现只保证最终一致性,并不保证实时性,所以注册中心在收到服务提供方下线的时候,并不能成功保证把这次要下线的节点推送到所有的调用方。所以这么来看,通过服务发现并不能做到应用无损关闭。
可以这么处理:当服务提供方正在关闭,如果这之后还收到了新的业务请求,服务提供方直接返回一个特定的异常给调用方(比如 ShutdownException)。这个异常就是告诉调用方“我已经收到这个请求了,但是我正在关闭,并没有处理这个请求”,然后调用方收到这个异常响应后,RPC 框架把这个节点从健康列表挪出,并把请求自动重试到其他节点,因为这个请求是没有被服务提供方处理过,所以可以安全地重试到其他节点,这样就可以实现对业务无损。
如何捕获到关闭事件呢?可以通过捕获操作系统的进程信号来获取,在 Java 语言里面,对应的是 Runtime.addShutdownHook 方法,可以注册关闭的钩子。在 RPC 启动的时候,我们提前注册关闭钩子,并在里面添加了两个处理程序,一个负责开启关闭标识,一个负责安全关闭服务对象,服务对象在关闭的时候会通知调用方下线节点。同时需要在我们调用链里面加上挡板处理器,当新的请求来的时候,会判断关闭标识,如果正在关闭,则抛出特定异常。
关闭过程中已经在处理的请求会不会受到影响呢?如果进程结束过快会造成这些请求还没有来得及应答,同时调用方会也会抛出异常。为了尽可能地完成正在处理的请求,首先我们要把这些请求识别出来。可以在服务对象加上引用计数器,每开始处理请求之前加一,完成请求处理减一,通过该计数器我们就可以快速判断是否有正在处理的请求。服务对象在关闭过程中,会拒绝新的请求,同时根据引用计数器等待正在处理的请求全部结束之后才会真正关闭。但考虑到有些业务请求可能处理时间长,或者存在被挂住的情况,为了避免一直等待造成应用无法正常退出,我们可以在整个 ShutdownHook 里面,加上超时时间控制,当超过了指定时间没有结束,则强制退出应用。超时时间我建议可以设定成 10s,基本可以确保请求都处理完了。
:::
运行了一段时间后的应用,执行速度会比刚启动的应用更快。这是因为在Java里面,在运行过程中,JVM虚拟机会把高频的代码编译成机器码,被加载过的类也会被缓存到JVM缓存中,再次使用的时候不会触发临时加载,这样就使得“热点”代码的执行不用每次都通过解释,从而提升执行速度。
但是这些“临时数据”,都在应用重启后就消失了。重启后的这些“红利”没有了之后,如果让刚启动的应用就承担像停机前一样的流量,这会使应用在启动之初就处于高负载状态,从而导致调用方过来的请求可能出现大面积超时,进而对线上业务产生损害行为。
启动预热,就是让刚启动的服务提供方应用不承担全部的流量,而是让它被调用的次数随着时间的移动慢慢增加,最终让流量缓和地增加到跟已经运行一段时间后的水平一样。如何做到这点呢?
首先,对于调用方来说,我们要知道服务提供方启动的时间。一种是服务提供方在启动的时候,把自己启动的时间告诉注册中心;另外一种就是注册中心收到的服务提供方的请求注册时间。因为整个预热过程的时间是一个粗略值,即使机器之间的日期时间存在1分钟的误差也不影响,并且在真实环境中机器都会默认开启NTP时间同步功能,来保证所有机器时间的一致性。
不管你是选择哪个时间,最终的结果就是,调用方通过服务发现,除了可以拿到IP列表,还可以拿到对应的启动时间。接着,可以利用加权负载均衡算法来分发流量。现在,需要让这个权重变为动态的,并且是随着时间的推移慢慢增加到服务提供方设定的固定值。
通过这个小逻辑的改动,我们就可以保证当服务提供方运行时长小于预热时间时,对服务提供方进行降权,减少被负载均衡选择的概率,避免让应用在启动之初就处于高负载状态,从而实现服务提供方在启动后有一个预热的过程。
服务提供方应用在没有启动完成的时候,调用方的请求就过来了,而调用方请求过来的原因是,服务提供方应用在启动过程中把解析到的 RPC 服务注册到了注册中心,这就导致在后续加载没有完成的情况下服务提供方的地址就被服务调用方感知到了。
为了解决这个问题,需要在应用启动加载、解析 Bean 的时候,如果遇到了 RPC 服务的 Bean,只先把这个 Bean 注册到 Spring-BeanFactory 里面去,而并不把这个 Bean 对应的接口注册到注册中心,只有等应用启动完成后,才把接口注册到注册中心用于服务发现,从而实现让服务调用方延迟获取到服务提供方地址。
具体如何实现呢?
我们可以在服务提供方应用启动后,接口注册到注册中心前,预留一个 Hook 过程,让用户可以实现可扩展的 Hook 逻辑。用户可以在 Hook 里面模拟调用逻辑,从而使 JVM 指令能够预热起来,并且用户也可以在 Hook 里面事先预加载一些资源,只有等所有的资源都加载完成后,最后才把接口注册到注册中心。
:::
:::details 要点
设计一个 RPC 框架,可以自下而上梳理一下所需要的能力:
以上,是一个 RPC 框架的基础能力,使用于 P2P 场景。
但是,如果面对集群模式,以上能力就不够了。同一个服务可能有多个提供者。消费者选择调用哪个提供者?消费者怎么找到提供者的访问地址?请求提供者失败了如何处理?这些都依赖于服务治理的能力。
服务治理,需要很多个模块的能力:服务发现、负载均衡、路由、容错、配置挂历等。
具备了这些能力就万事大吉了吗?RPC 框架很难一开始就面面俱到,但作为基础能力,在实际应用中,难免会有定制化的要求。这就要求 RPC 框架具备良好的扩展性。
通常来说,框架软件可以通过 SPI 技术来实现微内核+插件架构。根据依赖倒置原则,框架应该先将每个功能点都抽象成接口,并提供默认实现。然后,利用 SPI 机制,可以动态地为某个接口寻找服务实现。
加上了插件功能之后,我们的RPC框架就包含了两大核心体系——核心功能体系与插件体系,如下图所示:
:::
:::details 要点
一次 RPC 调用的本质就是调用端向服务端发送一条请求消息,服务端收到消息后进行处理,处理之后响应给调用端一条响应消息,调用端收到响应消息之后再进行处理,最后将最终的返回值返回给动态代理。
对于 RPC 框架,无论是同步调用还是异步调用,调用端的内部实现都是异步的。
调用端发送的每条消息都一个唯一的消息标识,实际上调用端向服务端发送请求消息之前会先创建一个 Future,并会存储这个消息标识与这个 Future 的映射,动态代理所获得的返回值最终就是从这个 Future 中获取的;当收到服务端响应的消息时,调用端会根据响应消息的唯一标识,通过之前存储的映射找到对应的 Future,将结果注入给那个 Future,再进行一系列的处理逻辑,最后动态代理从 Future 中获得到正确的返回值。
所谓的同步调用,不过是 RPC 框架在调用端的处理逻辑中主动执行了这个 Future 的 get 方法,让动态代理等待返回值;而异步调用则是 RPC 框架没有主动执行这个 Future 的 get 方法,用户可以从请求上下文中得到这个 Future,自己决定什么时候执行这个 Future 的 get 方法。
如何做到 RPC 调用全异步?
实现 RPC 调用全异步的方法是让 RPC 框架支持 CompletableFuture
。CompletableFuture
是 Java8 原生支持的。如果 RPC 框架能够支持 CompletableFuture
,现在发布一个 RPC 服务,服务接口定义的返回值是 CompletableFuture
对象,整个调用过程会分为这样几步:
CompletableFuture
对象,之后就不需要任何额外的与 RPC 框架相关的操作了,直接就可以进行异步处理;CompletableFuture
对象,之后服务端真正的业务逻辑完全可以在一个线程池中异步处理,业务逻辑完成之后再调用这个 CompletableFuture
对象的 complete
方法,完成异步通知;CompletableFuture
对象的 complete
方法,这样一次异步调用就完成了。通过对 CompletableFuture
的支持,RPC 框架可以真正地做到在调用端与服务端之间完全异步,同时提升了调用端与服务端的两端的单机吞吐量,并且 CompletableFuture
是 Java8 原生支持,业务逻辑中没有任何代码入侵性。
:::
:::details 要点
在很多开源框架中,都需要定时任务的管理功能,例如 ZooKeeper、Netty、Quartz、Kafka 以及 Linux 操作系统。
定时器的本质是设计一种数据结构,能够存储和调度任务集合,而且 deadline 越近的任务拥有更高的优先级。那么定时器如何知道一个任务是否到期了呢?定时器需要通过轮询的方式来实现,每隔一个时间片去检查任务是否到期。
所以定时器的内部结构一般需要一个任务队列和一个异步轮询线程,并且能够提供三种基本操作:
JDK 原生提供了三种常用的定时器实现方式,分别为 Timer
、DelayedQueue
和 ScheduledThreadPoolExecutor
。
JDK 内置的三种实现定时器的方式,实现思路都非常相似,都离不开任务、任务管理、任务调度三个角色。三种定时器新增和取消任务的时间复杂度都是 O(logn)
,面对海量任务插入和删除的场景,这三种定时器都会遇到比较严重的性能瓶颈。
对于性能要求较高的场景,一般都会采用时间轮算法来实现定时器。时间轮(Timing Wheel)是 George Varghese 和 Tony Lauck 在 1996 年的论文 Hashed and Hierarchical Timing Wheels: data structures to efficiently implement a timer facility 实现的,它在 Linux 内核中使用广泛,是 Linux 内核定时器的实现方法和基础之一。
时间轮是一种高效的、批量管理定时任务的调度模型。时间轮可以理解为一种环形结构,像钟表一样被分为多个 slot 槽位。每个 slot 代表一个时间段,每个 slot 中可以存放多个任务,使用的是链表结构保存该时间段到期的所有任务。时间轮通过一个时针随着时间一个个 slot 转动,并执行 slot 中的所有到期任务。
任务是如何添加到时间轮当中的呢?可以根据任务的到期时间进行取模,然后将任务分布到不同的 slot 中。如上图所示,时间轮被划分为 8 个 slot,每个 slot 代表 1s,当前时针指向 2。假如现在需要调度一个 3s 后执行的任务,应该加入 2+3=5
的 slot 中;如果需要调度一个 12s 以后的任务,需要等待时针完整走完一圈 round 零 4 个 slot,需要放入第 (2+12)%8=6
个 slot。
那么当时针走到第 6 个 slot 时,怎么区分每个任务是否需要立即执行,还是需要等待下一圈,甚至更久时间之后执行呢?所以我们需要把 round 信息保存在任务中。例如图中第 6 个 slot 的链表中包含 3 个任务,第一个任务 round=0,需要立即执行;第二个任务 round=1,需要等待 1*8=8s
后执行;第三个任务 round=2,需要等待 2*8=8s
后执行。所以当时针转动到对应 slot 时,只执行 round=0 的任务,slot 中其余任务的 round 应当减 1,等待下一个 round 之后执行。
上面介绍了时间轮算法的基本理论,可以看出时间轮有点类似 HashMap,如果多个任务如果对应同一个 slot,处理冲突的方法采用的是拉链法。在任务数量比较多的场景下,适当增加时间轮的 slot 数量,可以减少时针转动时遍历的任务个数。
时间轮定时器最大的优势就是,任务的新增和取消都是 O(1)
时间复杂度,而且只需要一个线程就可以驱动时间轮进行工作。
:::
:::details 要点
在一些特定场景下,需要在没有接口的情况下进行 RPC 调用。例如:
场景一:搭建一个统一的测试平台,可以让各个业务方在测试平台中通过输入接口、分组名、方法名以及参数值,在线测试自己发布的 RPC 服务。
场景二:搭建一个轻量级的服务网关,可以让各个业务方用 HTTP 的方式,通过服务网关调用其它服务。
为了解决这些场景的问题,可以使用泛化调用。
就是 RPC 框架提供统一的泛化调用接口(GenericService),调用端在创建 GenericService 代理时指定真正需要调用的接口的接口名以及分组名,通过调用 GenericService 代理的 $invoke 方法将服务端所需要的所有信息,包括接口名、业务分组名、方法名以及参数信息等封装成请求消息,发送给服务端,实现在没有接口的情况下进行 RPC 调用的功能。
1 | class GenericService { |
而通过泛化调用的方式发起调用,由于调用端没有服务端提供方提供的接口 API,不能正常地进行序列化与反序列化,我们可以为泛化调用提供专属的序列化插件,来解决实际问题。
:::
扩展:
缓存就是数据交换的缓冲区,用于将频繁访问的数据暂存在访问速度快的存储介质。
缓存的本质是一种利用空间换时间的设计:牺牲一定的数据实时性,使得访问更快、更近:
缓存是用于存储数据的硬件或软件的组成部分,以使得后续更快访问相应的数据。缓存中的数据可能是提前计算好的结果、数据的副本等。典型的应用场景:有 cpu cache, 磁盘 cache 等。本文中提及到缓存主要是指互联网应用中所使用的缓存组件。
缓存命中率是缓存的重要度量指标,命中率越高越好。
1 | 缓存命中率 = 从缓存中读取次数 / 总读取次数 |
引入缓存,会增加系统的复杂度,并牺牲一定的数据实时性。所以,引入缓存前,需要先权衡是否值得,考量点如下:
在数据层引入缓存,有以下几个好处:
缓存从部署角度,可以分为客户端缓存和服务端缓存。
客户端缓存
Cache-Control
、HTTP/1 中的 Expires
服务端缓存
其中,CDN 缓存、反向代理缓存、数据库缓存一般由专职人员维护(运维、DBA)。
后端开发一般聚焦于进程内缓存、分布式缓存。
CDN 将数据缓存到离用户物理距离最近的服务器,使得用户可以就近获取请求内容。CDN 一般缓存静态资源文件(页面,脚本,图片,视频,文件等)。
国内网络异常复杂,跨运营商的网络访问会很慢。为了解决跨运营商或各地用户访问问题,可以在重要的城市,部署 CDN 应用。使用户就近获取所需内容,降低网络拥塞,提高用户访问响应速度和命中率。
CDN 的基本原理是广泛采用各种缓存服务器,将这些缓存服务器分布到用户访问相对集中的地区或网络中,在用户访问网站时,利用全局负载技术将用户的访问指向距离最近的工作正常的缓存服务器上,由缓存服务器直接响应用户请求。
(1)未部署 CDN 应用前的网络路径:
在不考虑复杂网络的情况下,从请求到响应需要经过 3 个节点,6 个步骤完成一次用户访问操作。
(2)部署 CDN 应用后网络路径:
在不考虑复杂网络的情况下,从请求到响应需要经过 2 个节点,2 个步骤完成一次用户访问操作。
与不部署 CDN 服务相比,减少了 1 个节点,4 个步骤的访问。极大的提高了系统的响应速度。
优点
缺点
反向代理(Reverse Proxy)方式是指以代理服务器来接受网络连接请求,然后将请求转发给内部网络上的服务器,并将从服务器上得到的结果返回给客户端,此时代理服务器对外就表现为一个反向代理服务器。
反向代理位于应用服务器同一网络,处理所有对 WEB 服务器的请求。
反向代理缓存的原理:
这种方式通过降低向 WEB 服务器的请求数,从而降低了 WEB 服务器的负载。
反向代理缓存一般针对的是静态资源,而将动态资源请求转发到应用服务器处理。常用的缓存应用服务器有 Varnish,Ngnix,Squid。
扩展:
缓存一般存于访问速度较快的存储介质,快也就意味着资源昂贵并且有限。正所谓,好钢要用在刀刃上。因此,缓存要合理利用,需要设定一些机制,将一些访问频率偏低或过期的数据淘汰。
淘汰缓存首先要做的是,确定什么时候触发淘汰缓存,一般有以下几个思路:
接下来,就要确定如何淘汰缓存,常见的缓存淘汰算法有以下几个:
一般来说,系统如果不是严格要求缓存和数据库保持一致性的话,尽量不要将读请求和写请求串行化。串行化可以保证一定不会出现数据不一致的情况,但是它会导致系统的吞吐量大幅度下降。缓存更新的常见策略有以下几种:
需要注意的是:以上几种缓存更新策略,都无法保证数据强一致。如果一定要保证强一致性,可以通过两阶段提交(2PC)或 Paxos 协议来实现。但是 2PC 太慢,而 Paxos 太复杂,所以如果不是非常重要的数据,不建议使用强一致性方案。
秒杀的整体架构可以概括为“稳、准、快”几个关键字
秒杀系统本质上就是一个满足大并发、高性能和高可用的分布式系统。
(1)请求量级 10w QPS 的架构
架构要点:
(1)请求量级 100w QPS 的架构
小结:架构之道,在于权衡取舍。要取得极致的性能,往往要在通用性、易用性、成本等方面有所牺牲,反之亦然。
“动态数据”和“静态数据”的主要区别就是看页面中输出的数据是否和 URL、浏览者、时间、地域相关,以及是否含有 Cookie 等私密数据。
所谓“动态”还是“静态”,并不是说数据本身是否动静,而是数据中是否含有和访问者相关的个性化数据。更通俗的来说,是不是每个人看到的页面是相同的。
怎样对静态数据做缓存呢?
http://item.xxx.com/item.htm?id=xxxx
就可以作为唯一的 URL 标识。为啥要 URL 唯一呢?前面说了我们是要缓存整个 HTTP 连接,那么以什么作为 Key 呢?就以 URL 作为缓存的 Key,例如以 id=xxx 这个格式进行区分。分离出动态内容之后,如何组织这些内容页就变得非常关键了。动态内容的处理通常有两种方案:
这种方案是将虚拟机改为实体机,以增大 Cache 的容量,并且采用了一致性 Hash 分组的方式来提升命中率。这里将 Cache 分成若干组,是希望能达到命中率和访问热点的平衡。Hash 分组越少,缓存的命中率肯定就会越高,但短板是也会使单个商品集中在一个分组中,容易导致 Cache 被击穿,所以我们应该适当增加多个相同的分组,来平衡访问热点和命中率的问题。
实体机单机部署有以下几个优点:
缺点:
所谓统一 Cache 层,就是将单机的 Cache 统一分离出来,形成一个单独的 Cache 集群。
优点:
缺点:
动静分离后,缓存如果前置到 CDN,由于离用户更近,因此访问更快。
CDN 方案有以下问题:
将商品详情系统放到全国的所有 CDN 节点上是不太现实的,因为存在失效问题、命中率问题以及系统的发布更新问题。那么是否可以选择若干个节点来尝试实施呢?答案是“可以”,但是这样的节点需要满足几个条件:
最后,还有一点也很重要,那就是:节点不要太多。
基于上面几个因素,选择 CDN 的二级 Cache 比较合适,因为二级 Cache 数量偏少,容量也更大,让用户的请求先回源的 CDN 的二级 Cache 中,如果没命中再回源站获取数据,部署方式如下图所示:
所谓“静态热点数据”,就是能够提前预测的热点数据。例如,我们可以通过卖家报名的方式提前筛选出来,通过报名系统对这些热点商品进行打标。另外,我们还可以通过大数据分析来提前发现热点商品,比如我们分析历史成交记录、用户的购物车记录,来发现哪些商品可能更热门、更好卖,这些都是可以提前分析出来的热点。
所谓“动态热点数据”,就是不能被提前预测到的,系统在运行过程中临时产生的热点。例如,卖家在抖音上做了广告,然后商品一下就火了,导致它在短时间内被大量购买。
动态热点发现系统的具体实现。
这里我给出了一个图,其中用户访问商品时经过的路径有很多,我们主要是依赖前面的导购页面(包括首页、搜索页面、商品详情、购物车等)提前识别哪些商品的访问量高,通过这些系统中的中间件来收集热点数据,并记录到日志中。
处理热点数据通常有几种思路:一是优化,二是限制,三是隔离。
具体到“秒杀”业务,我们可以在以下几个层次实现隔离。
流量削峰的思路:排队、答题、分层过滤
排队 - 使用 MQ 削峰、解耦
适用于内部上下游系统之间调用请求不平缓的场景,由于内部系统的服务质量要求不能随意丢弃请求,所以使用消息队列能起到很好的削峰和缓冲作用。
答题 - 延缓请求、限制秒杀器
适用于秒杀或者营销活动等应用场景,在请求发起端就控制发起请求的速度,因为越到后面无效请求也会越多,所以配合后面介绍的分层拦截的方式,可以更进一步减少无效请求对系统资源的消耗。
分层过滤 - 请求分别经过 CDN、前台读系统(如商品详情系统)、后台系统(如交易系统)和数据库这几层分层过滤。
分层过滤非常适合交易性的写请求,比如减库存或者拼车这种场景,在读的时候需要知道还有没有库存或者是否还有剩余空座位。但是由于库存和座位又是不停变化的,所以读的数据是否一定要非常准确呢?其实不一定,你可以放一些请求过去,然后在真正减的时候再做强一致性保证,这样既过滤一些请求又解决了强一致性读的瓶颈。
分层校验的基本原则是:
减库存的一般方式:
针对秒杀场景,一般“抢到就是赚到”,所以成功下单后却不付款的情况比较少,再加上卖家对秒杀商品的库存有严格限制,所以秒杀商品采用“下单减库存”更加合理。另外,理论上,“下单减库存”比“预扣库存”以及涉及第三方支付的“付款减库存”在逻辑上更为简单,所以性能上更占优势。
“下单减库存”在数据一致性上,主要就是保证大并发请求时库存数据不能为负数,也就是要保证数据库中的库存字段值不能为负数,一般我们有多种解决方案:一种是在应用程序中通过事务来判断,即保证减后库存不能为负数,否则就回滚;另一种办法是直接设置数据库的字段数据为无符号整数,这样减后库存字段值小于零时会直接执行 SQL 语句来报错;再有一种就是使用 CASE WHEN 判断语句,例如这样的 SQL 语句:
1 | UPDATE item SET inventory = CASE WHEN inventory >= xxx THEN inventory-xxx ELSE inventory END |
高可用系统建设:
“降级”,就是当系统的容量达到一定程度时,限制或者关闭系统的某些非核心功能,从而把有限的资源保留给更核心的业务。
限流就是当系统容量达到瓶颈时,我们需要通过限制一部分流量来保护系统,并做到既可以人工执行开关,也支持自动化保护的措施。
过载保护 - 当系统负载达到一定阈值时,例如 CPU 使用率达到 90%或者系统 load 值达到 2*CPU 核数时,系统直接拒绝所有请求,这种方式是最暴力但也最有效的系统保护方式。
拒绝服务可以说是一种不得已的兜底方案,用以防止最坏情况发生,防止因把服务器压跨而长时间彻底无法提供服务。像这种系统过载保护虽然在过载时无法提供服务,但是系统仍然可以运作,当负载下降时又很容易恢复,所以每个系统和每个环节都应该设置这个兜底方案,对系统做最坏情况下的保护。
高可用建设需要长期规划并进行体系化建设,要在预防(建立常态的压力体系,例如上线前的单机压测到上线后的全链路压测)、管控(做好线上运行时的降级、限流和兜底保护)、监控(建立性能基线来记录性能的变化趋势以及线上机器的负载报警体系,发现问题及时预警)和恢复体系(遇到故障要及时止损,并提供快速的数据订正工具等)等这些地方加强建设。