Kafka 面试

Kafka 面试

Kafka 简介

【基础】什么是 Kafka?

:::details 要点

Apache Kafka 是一款开源的消息引擎系统,也是一个分布式流计算平台,此外,还可以作为数据存储

img

Kafka 的核心功能如下:

  • 消息引擎 - Kafka 可以作为一个消息引擎系统。
  • 流处理 - Kafka 可以作为一个分布式流处理平台。
  • 存储 - Kafka 可以作为一个安全的分布式存储。

Kafka 的设计目标:

  • 高性能
    • 分区、分段、索引:基于分区机制提供并发处理能力。分段、索引提升了数据读写的查询效率。
    • 顺序读写:使用顺序读写提升磁盘 IO 性能。
    • 零拷贝:利用零拷贝技术,提升网络 I/O 效率。
    • 页缓存:利用操作系统的 PageCache 来缓存数据(典型的利用空间换时间)
    • 批量读写:批量读写可以有效提升网络 I/O 效率。
    • 数据压缩:Kafka 支持数据压缩,可以有效提升网络 I/O 效率。
    • pull 模式:Kafka 架构基于 pull 模式,可以自主控制消费策略,提升传输效率。
  • 高可用
    • 持久化:Kafka 所有的消息都存储在磁盘,天然支持持久化。
    • 副本机制:Kafka 的 Broker 集群支持副本机制,可以通过冗余,来保证其整体的可用性。
    • 选举 Leader:Kafka 基于 ZooKeeper 支持选举 Leader,实现了故障转移能力。
  • 伸缩性
    • 分区:Kafka 的分区机制使得其具有良好的伸缩性。

:::

【基础】Kafka 有哪些核心术语?

:::details 要点

Kafka 的核心术语如下:

  • 消息 - Record。Kafka 是消息引擎嘛,这里的消息就是指 Kafka 处理的主要对象。
  • 主题 - Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
  • 分区 - Partition。一个有序不变的消息序列。每个主题下可以有多个分区。
  • 消息位移 - Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
  • 副本 - Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
  • 生产者 - Producer。向主题发布新消息的应用程序。
  • 消费者 - Consumer。从主题订阅新消息的应用程序。
  • 消费者位移 - Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。
  • 消费者组 - Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
  • 分区再均衡 - Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

Kafka 的三层消息架构:

  • 第一层是主题层,每个主题可以配置 M 个分区,而每个分区又可以配置 N 个副本。
  • 第二层是分区层,每个分区的 N 个副本中只能有一个充当领导者角色,对外提供服务;其他 N-1 个副本是追随者副本,只是提供数据冗余之用。
  • 第三层是消息层,分区中包含若干条消息,每条消息的位移从 0 开始,依次递增。
  • 最后,客户端程序只能与分区的领导者副本进行交互。

:::

Kafka 存储

【中级】Kafka 是如何存储数据的?

:::details 要点

Kafka 逻辑存储

Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。

在 Kafka 中,任意一个 Topic 维护了一组 Partition 日志,如下所示:

请注意:这里的主题只是一个逻辑上的抽象概念,实际上,Kafka 的基本存储单元是 Partition。Partition 无法在多个 Broker 间进行再细分,也无法在同一个 Broker 的多个磁盘上进行再细分。所以,分区的大小受到单个挂载点可用空间的限制。

Partiton 命名规则为 Topic 名称 + 有序序号,第一个 Partiton 序号从 0 开始,序号最大值为 Partition 数量减 1。

Kafka 物理存储

Log 是 Kafka 用于表示日志文件的组件。每个 Partiton 对应一个 Log 对象,在物理磁盘上则对应一个目录。如:创建一个双分区的主题 test,那么,Kafka 会在磁盘上创建两个子目录:test-0test-1;而在服务器端,这就对应两个 Log 对象。

因为在一个大文件中查找和删除消息是非常耗时且容易出错的。所以,Kafka 将每个 Partition 切割成若干个片段,即日志段(Log Segment)。默认每个 Segment 大小不超过 1G,且只包含 7 天的数据。如果 Segment 的消息量达到 1G,那么该 Segment 会关闭,同时打开一个新的 Segment 进行写入。

Broker 会为 Partition 里的每个 Segment 打开一个文件句柄(包括不活跃的 Segment),因此打开的文件句柄数通常会比较多,这个需要适度调整系统的进程文件句柄参数。正在写入的分片称为活跃片段(active segment),活跃片段永远不会被删除

Segment 文件命名规则:Partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。

Segment 文件可以分为两类:

  • 索引文件
    • 偏移量索引文件( .index
    • 时间戳索引文件( .timeindex
    • 已终止事务的索引文件(.txnindex):如果没有使用 Kafka 事务,则不会创建该文件
  • 日志数据文件(.log

:::

【高级】Kafka 文件格式是怎样的?

:::details 要点

Kafka 的消息和偏移量保存在文件里。保存在磁盘上的数据格式和从生产者发送过来或消费者读取的数据格式是一样的。因为使用了相同的数据格式,使得 Kafka 可以进行零拷贝技术给消费者发送消息,同时避免了压缩和解压。

除了键、值和偏移量外,消息里还包含了消息大小、校验和(检测数据损坏)、魔数(标识消息格式版本)、压缩算法(Snappy、GZip 或者 LZ4)和时间戳(0.10.0 新增)。时间戳可以是生产者发送消息的时间,也可以是消息到达 Broker 的时间,这个是可配的。

如果生产者发送的是压缩的消息,那么批量发送的消息会压缩在一起,以“包装消息”(wrapper message)来发送,如下所示:

img

如果生产者使用了压缩功能,发送的批次越大,就意味着能获得更好的网络传输效率,并且节省磁盘存储空间。

Kafka 附带了一个叫 DumpLogSegment 的工具,可以用它查看片段的内容。它可以显示每个消息的偏移量、校验和、魔术数字节、消息大小和压缩算法。

:::

【高级】Kafka 如何检索数据?

:::details 要点

Kafka 允许消费者从任意有效的偏移量位置开始读取消息。Kafka 为每个 Partition 都维护了一个索引(即 .index 文件),该索引将偏移量映射到片段文件以及偏移量在文件里的位置。

索引也被分成片段,所以在删除消息时,也可以删除相应的索引。Kafka 不维护索引的校验和。如果索引出现损坏,Kafka 会通过重读消息并录制偏移量和位置来重新生成索引。如果有必要,管理员可以删除索引,这样做是绝对安全的,Kafka 会自动重新生成这些索引。

索引文件用于将偏移量映射成为消息在日志数据文件中的实际物理位置,每个索引条目由 offset 和 position 组成,每个索引条目可以唯一确定在各个分区数据文件的一条消息。其中,Kafka 采用稀疏索引存储的方式,每隔一定的字节数建立了一条索引,可以通过“index.interval.bytes”设置索引的跨度;

有了偏移量索引文件,通过它,Kafka 就能够根据指定的偏移量快速定位到消息的实际物理位置。具体的做法是,根据指定的偏移量,使用二分法查询定位出该偏移量对应的消息所在的分段索引文件和日志数据文件。然后通过二分查找法,继续查找出小于等于指定偏移量的最大偏移量,同时也得出了对应的 position(实际物理位置),根据该物理位置在分段的日志数据文件中顺序扫描查找偏移量与指定偏移量相等的消息。下面是 Kafka 中分段的日志数据文件和偏移量索引文件的对应映射关系图(其中也说明了如何按照起始偏移量来定位到日志数据文件中的具体消息)。

:::

【高级】Kafka 如何清理数据?

:::details 要点

每个日志片段可以分为以下两个部分:

  • 干净的部分:这部分消息之前已经被清理过,每个键只存在一个值。
  • 污浊的部分:在上一次清理后写入的新消息。

img

如果在 Kafka 启动时启用了清理功能(通过 log.cleaner.enabled 配置),每个 Broker 会启动一个清理管理器线程和若干个清理线程,每个线程负责一个 Partition。

清理线程会读取污浊的部分,并在内存里创建一个 map。map 的 key 是消息键的哈希值,value 是消息的偏移量。对于相同的键,只保留最新的位移。其中 key 的哈希大小为 16 字节,位移大小为 8 个字节。也就是说,一个映射只有 24 字节,假设消息大小为 1KB,那么 1GB 的段有 1 百万条消息,建立这个段的映射只需要 24MB 的内存,映射的内存效率是非常高效的。

在配置 Kafka 时,管理员需要设置这些清理线程可以使用的总内存。如果设置 1GB 的总内存同时有 5 个清理线程,那么每个线程只有 200MB 的内存可用。在清理线程工作时,它不需要把所有脏的段文件都一起在内存中建立上述映射,但需要保证至少能够建立一个段的映射。如果不能同时处理所有脏的段,Kafka 会一次清理最老的几个脏段,然后在下一次再处理其他的脏段。

一旦建立完脏段的键与位移的映射后,清理线程会从最老的干净的段开始处理。如果发现段中的消息的键没有在映射中出现,那么可以知道这个消息是最新的,然后简单的复制到一个新的干净的段中;否则如果消息的键在映射中出现,这条消息需要抛弃,因为对于这个键,已经有新的消息写入。处理完会将产生的新段替代原始段,并处理下一个段。

对于一个段,清理前后的效果如下:

img

对于只保留最新消息的清理策略来说,Kafka 还支持删除相应键的消息操作(而不仅仅是保留最新的消息内容)。这是通过生产者发送一条特殊的消息来实现的,该消息包含一个键以及一个 null 的消息内容。当清理线程发现这条消息时,它首先仍然进行一个正常的清理并且保留这个包含 null 的特殊消息一段时间,在这段时间内消费者消费者可以获取到这条消息并且知道消息内容已经被删除。过了这段时间,清理线程会删除这条消息,这个键会从 Partition 中消失。这段时间是必须的,因为它可以使得消费者有一定的时间余地来收到这条消息。

:::

生产者

【中级】Kafka 发送消息的工作流程是怎样的?

:::details 要点

Kafka 生产者用一个 ProducerRecord 对象来抽象一条要发送的消息, ProducerRecord 对象需要包含目标主题和要发送的内容,还可以指定键或分区。其发送消息流程如下:

(1)序列化 - 生产者要先把键和值序列化成字节数组,这样它们才能够在网络中传输。

(2)分区 - 数据被传给分区器。如果在 ProducerRecord 中已经指定了分区,那么分区器什么也不会做;否则,分区器会根据 ProducerRecord 的键来选择一个分区。选定分区后,生产者就知道该把消息发送给哪个主题的哪个分区。

(3)批次传输 - 接着,这条记录会被添加到一个记录批次中。这个批次中的所有消息都会被发送到相同的主题和分区上。有一个独立的线程负责将这些记录批次发送到相应 Broker 上。

  • 批次,就是一组消息,这些消息属于同一个主题和分区
  • 发送时,会把消息分成批次传输,如果每次只发送一个消息,会占用大量的网路开销。

(4)响应 - 服务器收到消息会返回一个响应。

  • 如果成功,则返回一个 RecordMetaData 对象,它包含了主题、分区、偏移量;
  • 如果失败,则返回一个错误。生产者在收到错误后,可以进行重试,重试次数可以在配置中指定。失败一定次数后,就返回错误消息。

img

生产者向 Broker 发送消息时是怎么确定向哪一个 Broker 发送消息?

  • 生产者会向任意 broker 发送一个元数据请求(MetadataRequest),获取到每一个分区对应的 Leader 信息,并缓存到本地。
  • 生产者在发送消息时,会指定 Partition 或者通过 key 得到到一个 Partition,然后根据 Partition 从缓存中获取相应的 Leader 信息。

img

:::

消费者

【基础】Kafka 为什么要支持消费者群组?

:::details 要点

消费者

每个 Consumer 的唯一元数据是该 Consumer 在日志中消费的位置。这个偏移量是由 Consumer 控制的:Consumer 通常会在读取记录时线性的增加其偏移量。但实际上,由于位置由 Consumer 控制,所以 Consumer 可以采用任何顺序来消费记录。

一条消息只有被提交,才会被消费者获取到。如下图,只能消费 Message0、Message1、Message2:

img

消费者群组

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制

Kafka 的写入数据量很庞大,如果只有一个消费者,消费消息速度很慢,时间长了,就会造成数据积压。为了减少数据积压,Kafka 支持消费者群组,可以让多个消费者并发消费消息,对数据进行分流。

Kafka 消费者从属于消费者群组,一个群组里的 Consumer 订阅同一个 Topic,一个主题有多个 Partition,每一个 Partition 只能隶属于消费者群组中的一个 Consumer

如果超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息。

同一时刻,一条消息只能被同一消费者组中的一个消费者实例消费

不同消费者群组之间互不影响

:::

【中级】如何消费 Kafka 消息?

:::details 要点

Kafka 消费者通过 pull 模式来获取消息,但是获取消息时并不是立刻返回结果,需要考虑两个因素:

  • 消费者通过 customer.poll(time) 中设置等待时间
  • Broker 会等待累计一定量数据,然后发送给消费者。这样可以减少网络开销。

pull 除了获取消息外,还有其他作用:

  • 发送心跳信息。消费者通过向被指派为群组协调器的 Broker 发送心跳来维护他和群组的从属关系,当机器宕掉后,群组协调器触发再均衡。

:::

分区

【中级】什么是分区?为什么要分区?

:::details 要点

Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。

在 Kafka 中,任意一个 Topic 维护了一组 Partition 日志,如下所示:

img

每个 Partition 都是一个单调递增的、不可变的日志记录,以不断追加的方式写入数据。Partition 中的每条记录会被分配一个单调递增的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的每条记录。

为什么 Kafka 的数据结构采用三级结构?

分区的作用就是提供负载均衡的能力,以实现系统的高伸缩性(Scalability)。

不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的机器节点来增加整体系统的吞吐量。

:::

【中级】Kafka 的分区策略是怎样的?

:::details 要点

所谓分区策略是决定生产者将消息发送到哪个分区的算法,也就是负载均衡算法。

Kafka 生产者发送消息使用的对象 ProducerRecord ,可以选填 Partition 和 Key。不过,大多数应用会用到 key。key 有两个作用:作为消息的附加信息;也可以用来决定消息该被写到 Topic 的哪个 Partition,拥有相同 key 的消息将被写入同一个 Partition。

如果 ProducerRecord 指定了 Partition,则分区器什么也不做,否则分区器会根据 key 选择一个 Partition 。

  • 没有 key 时的分发逻辑:每隔 topic.metadata.refresh.interval.ms 的时间,随机选择一个 partition。这个时间窗口内的所有记录发送到这个 partition。发送数据出错后会重新选择一个 partition。
  • 根据 key 分发:Kafka 的选择分区策略是:根据 key 求 hash 值,然后将 hash 值对 partition 数量求模。这里的关键点在于,同一个 key 总是被映射到同一个 Partition 上。所以,在选择分区时,Kafka 会使用 Topic 的所有 Partition ,而不仅仅是可用的 Partition。这意味着,如果写入数据的 Partition 是不可用的,那么就会出错

:::

【中级】如何自定义分区策略?

:::details 要点

如果 Kafka 的默认分区策略无法满足实际需要,可以自定义分区策略。需要显式地配置生产者端的参数 partitioner.class。这个参数该怎么设定呢?

首先,要实现 org.apache.kafka.clients.producer.Partitioner 接口。这个接口定义了两个方法:partitionclose,通常只需要实现最重要的 partition 方法。我们来看看这个方法的方法签名:

1
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

这里的 topickeykeyBytesvaluevalueBytes 都属于消息数据,cluster 则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。

接着,设置 partitioner.class 参数为自定义类的全限定名,那么生产者程序就会按照你的代码逻辑对消息进行分区。

负载均衡算法常见的有:

  • 随机算法
  • 轮询算法
  • 最小活跃数算法
  • 源地址哈希算法

可以根据实际需要去实现。

:::

【高级】Kafka 如何实现分区再均衡?

:::details 要点

什么是分区再均衡

分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为分区再均衡(Rebalance)Rebalance 实现了消费者群组的高可用性和伸缩性

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。

当在群组里面新增/移除消费者或者新增/移除 kafka 集群 broker 节点时,群组协调器 Broker 会触发再均衡,重新为每一个 Partition 分配消费者。Rebalance 期间,消费者无法读取消息,造成整个消费者群组一小段时间的不可用。

何时生分区再均衡

分区再均衡的触发时机有三种:

  • 消费者群组成员数发生变更。比如有新的 Consumer 加入群组或者离开群组,或者是有 Consumer 实例崩溃被“踢出”群组。
    • 新增消费者。consumer 订阅主题之后,第一次执行 poll 方法
    • 移除消费者。执行 consumer.close() 操作或者消费客户端宕机,就不再通过 poll 向群组协调器发送心跳了,当群组协调器检测次消费者没有心跳,就会触发再均衡。
  • 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
  • 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
    • 新增 broker。如重启 broker 节点
    • 移除 broker。如 kill 掉 broker 节点。

分区再均衡的过程

Rebalance 是通过消费者群组中的称为“群主”消费者客户端进行的

(1)选择群主

当消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获取群组的活跃成员列表,并负责给每一个消费者分配分区

所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。具体来讲,Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。

(2)消费者通过向被指派为群组协调器(Coordinator)的 Broker 定期发送心跳来维持它们和群组的从属关系以及它们对分区的所有权。

(3)群主从群组协调器获取群组成员列表,然后给每一个消费者进行分配分区 Partition。有两种分配策略:Range 和 RoundRobin。

  • Range 策略,就是把若干个连续的分区分配给消费者,如存在分区 1-5,假设有 3 个消费者,则消费者 1 负责分区 1-2,消费者 2 负责分区 3-4,消费者 3 负责分区 5。
  • RoundRoin 策略,就是把所有分区逐个分给消费者,如存在分区 1-5,假设有 3 个消费者,则分区 1->消费 1,分区 2->消费者 2,分区 3>消费者 3,分区 4>消费者 1,分区 5->消费者 2。

(4)群主分配完成之后,把分配情况发送给群组协调器。

(5)群组协调器再把这些信息发送给消费者。每个消费者只能看到自己的分配信息,只有群主知道所有消费者的分配信息

如何判定消费者已经死亡

消费者通过向被指定为群组协调器的 Broker 发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者超时未发送心跳,会话就会过期,群组协调器认定它已经死亡,就会触发一次再均衡。

当一个消费者要离开群组时,会通知协调器,协调器会立即触发一次再均衡,尽量降低处理停顿。

查找协调者

所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。那么,Consumer Group 如何确定为它服务的 Coordinator 在哪台 Broker 上呢?答案就在我们之前说过的 Kafka 内部位移主题 __consumer_offsets 身上。

目前,Kafka 为某个 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 个步骤。

  1. 第 1 步:确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)

  2. 第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。

:::

【高级】分区再均衡存在什么问题?如何避免分区再均衡?

:::details 要点

分区再均衡的问题

  • 首先,Rebalance 过程对 Consumer Group 消费过程有极大的影响。在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。
  • 其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。
  • 最后,Rebalance 实在是太慢了。

避免分区再均衡

通过前文,我们已经知道了:分区再均衡的代价很高,应该尽量避免不必要的分区再均衡,以整体提高 Consumer 的吞吐量。

分区再均衡发生的时机有三个:

  • 消费群组成员数量发生变化
  • 订阅主题数量发生变化
  • 订阅主题的分区数发生变化

后面两个通常都是运维的主动操作,所以它们引发的 Rebalance 大都是不可避免的。实际上,大部分情况下,导致分区再均衡的原因是:消费群组成员数量发生变化。

有两种情况,消费者并没有宕机,但也被视为消亡:

  • 未及时发送心跳
  • Consumer 消费时间过长
未及时发送心跳

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。因此,需要合理设置会话超时时间。这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。

  • 设置 session.timeout.ms = 6s。
  • 设置 session.timeout.ms = 6s。
  • 设置 heartbeat.interval.ms = 2s。
  • 要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms

session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer。毕竟,我们还是希望能尽快揪出那些“尸位素餐”的 Consumer,早日把它们踢出 Group。希望这份配置能够较好地帮助你规避第一类“不必要”的 Rebalance。

Consumer 消费时间过长

第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。此时,**max.poll.interval.ms** 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。

GC 参数

如果你按照上面的推荐数值恰当地设置了这几个参数,却发现还是出现了 Rebalance,那么我建议你去排查一下 Consumer 端的 GC 表现,比如是否出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance。为什么特意说 GC?那是因为在实际场景中,我见过太多因为 GC 设置不合理导致程序频发 Full GC 而引发的非预期 Rebalance 了。

:::

复制

【中级】Kafka 如何管理副本?

:::details 要点

副本机制是分布式系统实现高可用的不二法门,Kafka 也不例外。

副本机制有哪些好处?

  1. 提供可用性:有句俗语叫:鸡蛋不要放在一个篮子里。副本机制也是一个道理——当部分节点宕机时,系统仍然可以依靠其他正常运转的节点,从整体上对外继续提供服务。
  2. 提供伸缩性:通过增加、减少机器可以控制系统整体的吞吐量。
  3. 改善数据局部性:允许将数据放入与用户地理位置相近的地方,从而降低系统延时。

但是,Kafka 只实现了第一个好处,原因后面会阐述。

  • 每个 Partition 都有一个 Leader,零个或多个 Follower。
  • Leader 处理一切对 Partition (分区)的读写请求;而 Follower 只需被动的同步 Leader 上的数据。
  • 同一个 Topic 的不同 Partition 会分布在多个 Broker 上,而且一个 Partition 还会在其他的 Broker 上面进行备份。

Kafka 副本角色

Kafka 使用 Topic 来组织数据,每个 Topic 被分为若干个 Partition,每个 Partition 有多个副本。每个 Broker 可以保存成百上千个属于不同 Topic 和 Partition 的副本。Kafka 副本的本质是一个只能追加写入的提交日志

Kafka 副本有两种角色:

  • Leader 副本(主):每个 Partition 都有且仅有一个 Leader 副本。为了保证数据一致性,Leader 处理一切对 Partition (分区)的读写请求
  • Follower 副本(从):Leader 副本以外的副本都是 Follower 副本。Follower 唯一的任务就是从 Leader 那里复制消息,保持与 Leader 一致的状态
  • 如果 Leader 宕机,其中一个 Follower 会被选举为新的 Leader。

为了与 Leader 保持同步,Follower 向 Leader 发起获取数据的请求,这种请求与消费者为了读取消息而发送的请求是一样的。请求消息里包含了 Follower 想要获取消息的偏移量,而这些偏移量总是有序的。

Leader 另一个任务是搞清楚哪个 Follower 的状态与自己是一致的。通过查看每个 Follower 请求的最新偏移量,Leader 就会知道每个 Follower 复制的进度。如果跟随者在 10s 内没有请求任何消息,或者虽然在请求消息,但是在 10s 内没有请求最新的数据,那么它就会被认为是不同步的。如果一个副本是不同步的,在 Leader 失效时,就不可能成为新的 Leader——毕竟它没有包含全部的消息。

除了当前首领之外,每个分区都有一个首选首领——创建 Topic 时选定的首领就是分区的首选首领。之所以叫首选 Leader,是因为在创建分区时,需要在 Broker 之间均衡 Leader。

ISR

ISR 即 In-sync Replicas,表示同步副本。Follower 副本不提供服务,只是定期地异步拉取领导者副本中的数据而已。既然是异步的,说明和 Leader 并非数据强一致性的。

判断 Follower 是否与 Leader 同步的标准

Kafka Broker 端参数 replica.lag.time.max.ms 参数,指定了 Follower 副本能够落后 Leader 副本的最长时间间隔,默认为 10s。这意味着:只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。

ISR 是一个动态调整的集合,会不断将同步副本加入集合,将不同步副本移除集合。Leader 副本天然就在 ISR 中。

Unclean 领导者选举

因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader。

Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举

开启 Unclean 领导者选举可能会造成数据丢失,但好处是:它使得 Partition Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。

:::

可靠传输

【高级】如何保证 Kafka 消息不丢失?

:::details 要点

如何保证消息的可靠性传输,或者说,如何保证消息不丢失?这对于任何 MQ 都是核心问题。

一条消息从生产到消费,可以划分三个阶段:

  • 生产阶段:Producer 创建消息,并通过网络发送给 Broker。
  • 存储阶段:Broker 收到消息并存储,如果是集群,还要同步副本给其他 Broker。
  • 消费阶段:Consumer 向 Broker 请求消息,Broker 通过网络传输给 Consumer。

这三个阶段都可能丢失数据,所以要保证消息丢失,就需要任意一环都保证可靠。

存储阶段不丢消息

存储阶段指的是 Kafka Server,也就是 Broker 如何保证消息不丢失。

一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证

上面的话可以解读为:

  • 已提交只有当消息被写入分区的若干同步副本时,才被认为是已提交的。为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要 Leader 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。
  • 持久化:Kafka 的数据存储在磁盘上,所以只要写入成功,天然就是持久化的。
  • 只要还有一个副本是存活的,那么已提交的消息就不会丢失
  • 消费者只能读取已提交的消息

Kafka 的副本机制是 kafka 可靠性保证的核心

Kafka 的主题被分为多个分区,分区是基本的数据块。每个分区可以有多个副本,有一个是 Leader(主副本),其他是 Follower(从副本)。所有数据都直接发送给 Leader,或者直接从 Leader 读取事件。Follower 只需要与 Leader 保持同步,并及时复制最新的数据。当 Leader 宕机时,从 Follower 中选举一个成为新的 Leader。

Broker 有 3 个配置参数会影响 Kafka 消息存储的可靠性。

  • 副本数 - replication.factor 的作用是设置每个分区的副本数replication.factor 是主题级别配置; default.replication.factor 是 broker 级别配置。副本数越多,数据可靠性越高;但由于副本数增多,也会增加同步副本的开销,可能会降低集群的可用性。一般,建议设为 3,这也是 Kafka 的默认值。
  • 不完全的选主 - unclean.leader.election.enable 用于控制是否支持不同步的副本参与选举 Leader。unclean.leader.election.enable 是 broker 级别(实际上是集群范围内)配置,默认值为 true。
    • 如果设为 true,代表着允许不同步的副本成为主副本(即不完全的选举),那么将面临丢失消息的风险
    • 如果设为 false,就要等待原先的主副本重新上线,从而降低了可用性。
  • 最少同步副本 - min.insync.replicas 控制的是消息至少要被写入到多少个副本才算是“已提交”min.insync.replicas 是主题级别和 broker 级别配置。尽管可以为一个主题配置 3 个副本,但还是可能会出现只有一个同步副本的情况。如果这个同步副本变为不可用,则必须在可用性和数据一致性之间做出选择。Kafka 中,消息只有被写入到所有的同步副本之后才被认为是已提交的。但如果只有一个同步副本,那么在这个副本不可用时,则数据就会丢失。
    • 如果要确保已经提交的数据被已写入不止一个副本,就需要把最小同步副本的设置为大一点的值。
    • 注意:要确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1

生产阶段不丢消息

在生产消息阶段,消息队列一般通过请求确认机制,来保证消息的可靠传递,Kafka 也不例外。

Kafka 有三种发送方式:同步、异步、异步回调。同步方式能保证消息不丢失,但性能太差;异步方式发送消息,通常会立即返回,但消息可能丢失。

解决生产者丢失消息的方案:

生产者使用异步回调方式 producer.send(msg, callback) 发送消息。callback(回调)能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。

  • 如果是因为那些瞬时错误,那么仅仅让 Producer 重试就可以了;
  • 如果是消息不合格造成的,那么可以调整消息格式后再次发送。

然后,需要基于以下几点来保证 Kafka 生产者的可靠性:

  • ACK - 生产者可选的确认模式有三种:acks=0acks=1acks=all
    • acks=0acks=1 都有丢失数据的风险。
    • acks=all 意味着会等待所有同步副本都收到消息。再结合 min.insync.replicas ,就可以决定在得到确认响应前,至少有多少副本能够收到消息。这是最保险的做法,但也会降低吞吐量。
  • 重试 - 如果 broker 返回的错误可以通过重试来解决,生产者会自动处理这些错误。需要注意的是:有时可能因为网络问题导致没有收到确认,但实际上消息已经写入成功。生产者会认为出现临时故障,重试发送消息,这样就会出现重复记录。所以,尽可能在业务上保证幂等性。设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
    • 可重试错误,如:LEADER_NOT_AVAILABLE,主副本不可用,可能过一段时间,集群就会选举出新的主副本,重试可以解决问题。
    • 不可重试错误,如:INVALID_CONFIG,即使重试,也无法改变配置选项,重试没有意义。
  • 错误处理 - 开发者需要自行处理的错误:
    • 不可重试的 broker 错误,如消息大小错误、认证错误等;
    • 消息发送前发生的错误,如序列化错误;
    • 生产者达到重试次数上限或消息占用的内存达到上限时发生的错误。

消费阶段不丢消息

前文已经提到,消费者只能读取已提交的消息。这就保证了消费者接收到消息时已经具备了数据一致性。

消费者唯一要做的是确保哪些消息是已经读取过的,哪些是没有读取过的(通过提交偏移量给 Broker 来确认)。如果消费者提交了偏移量却未能处理完消息,那么就有可能造成消息丢失,这也是消费者丢失消息的主要原因。

img

消费者的可靠性配置:

  • group.id - 如果希望消费者可以看到主题的所有消息,那么需要为它们设置唯一的 group.id
  • auto.offset.reset - 有两个选项:
    • earliest - 消费者会从分区的开始位置读取数据
    • latest - 消费者会从分区末尾位置读取数据
  • enable.auto.commit - 消费者自动提交偏移量。如果设为 true,处理流程更简单,但无法保证重复处理消息。
  • auto.commit.interval.ms - 自动提交的频率,默认为每 5 秒提交一次。

如果 enable.auto.commit 设为 true,即自动提交,就无需考虑提交偏移量的问题。

如果选择显示提交偏移量,需要考虑以下问题:

  • 必须在处理完消息后再发送确认(提交偏移量),不要收到消息立即确认。
  • 提交频率是性能和重复消息数之间的权衡
  • 分区再均衡
  • 消费可能需要重试机制
  • 超时处理
  • 消费者可能需要维护消费状态,如:处理完消息后,记录在数据库中。
  • 幂等性设计
    • 写数据库:根据主键判断记录是否存在
    • 写 Redis:set 操作天然具有幂等性
    • 复杂的逻辑处理,则可以在消息中加入全局 ID

:::

【高级】如何保证 Kafka 消息不重复?

:::details 要点

在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

  • At most once:至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
  • At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
  • Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。

一般解决重复消息的办法是,在消费端,保证消费消息的操作具备幂等性

幂等(idempotent、idempotence)是一个数学与计算机学概念,指的是:一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。

常用的实现幂等操作的方法:

  • 利用数据库的唯一约束实现幂等 - 关系型数据库可以使用 INSERT IF NOT EXIST 语句防止重复;Redis 可以使用 SETNX 命令来防止重复;其他数据库只要支持类似语义,也是一个道理。
  • 为更新的数据设置前置条件 - 如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的方法是,给数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。
  • 记录并检查操作- 也称为“Token 机制或者 GUID(全局唯一 ID)机制”,通用性最强,适用范围最广。实现的思路特别简单,在执行数据更新操作之前,先检查一下是否执行过这个更新操作。
    • 具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
    • 需要注意的是,“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。这一组操作可以通过分布式事务或分布式锁来保证其原子性。

:::

【高级】如何保证 Kafka 消息有序?

:::details 要点

某些场景下,可能会要求按序发送消息。

方案一、单 Partition

Kafka 每一个 Partition 只能隶属于消费者群组中的一个 Consumer,换句话说,每个 Partition 只能被一个 Consumer 消费。所以,如果 Topic 是单 Partition,自然是有序的。

方案分析

优点:简单粗暴。开发者什么也不用做。

缺点:Kafka 基于 Partition 实现其高并发能力,如果使用单 Partition,会严重限制 Kafka 的吞吐量。

结论:作为分布式消息引擎,限制并发能力,显然等同于自废武功,所以,这个方案几乎是不可接受的。

方案二、同一个 key 的消息发送给指定 Partition

(1)生产者端显示指定 key 发往一个指定的 Partition,就可以保证同一个 key 在这个 Partition 中是有序的。

(2)接下来,消费者端为每个 key 设定一个缓存队列,然后让一个独立线程负责消费指定 key 的队列,这就保证了消费消息也是有序的。

:::

【高级】如何应对 Kafka 消息积压?

:::details 要点

先修复消费者,然后停掉当前所有消费者。

新建 Topic,扩大分区,以提高并发处理能力。

创建临时消费者程序,并部署在多节点上,扩大消费处理能力。

最后处理完积压消息后,恢复原先部署架构。

:::

事务

【中级】Kafka 是否支持事务?如何支持事务?

:::details 要点

Kafka 的事务概念是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务,或者说是是一个原子操作),同时成功或者失败

消息可靠性保障,由低到高为:

  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

Kafka 支持事务功能主要是为了实现精确一次处理语义的,而精确一次处理是实现流处理的基石。

Kafka 自 0.11 版本开始提供了对事务的支持,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息

事务型 Producer

事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。

事务属性实现前提是幂等性,即在配置事务属性 transaction.id 时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。

在事务属性之前先引入了生产者幂等性,它的作用为:

  • 生产者多次发送消息可以封装成一个原子操作,要么都成功,要么失败。
  • consumer-transform-producer 模式下,因为消费者提交偏移量出现问题,导致重复消费。需要将这个模式下消费者提交偏移量操作和生产者一系列生成消息的操作封装成一个原子操作。

消费者提交偏移量导致重复消费消息的场景:消费者在消费消息完成提交便宜量 o2 之前挂掉了(假设它最近提交的偏移量是 o1),此时执行再均衡时,其它消费者会重复消费消息(o1 到 o2 之间的消息)。

Kafka 事务相关配置

使用 kafka 的事务 api 时的一些注意事项:

  • 需要消费者的自动模式设置为 false,并且不能子再手动的进行执行 consumer#commitSync 或者 consumer#commitAsyc
  • 设置 Producer 端参数 transctional.id。最好为其设置一个有意义的名字。
  • 和幂等性 Producer 一样,开启 enable.idempotence = true。如果配置了 transaction.id,则此时 enable.idempotence 会被设置为 true
  • 消费者需要配置事务隔离级别 isolation.level。在 consume-trnasform-produce 模式下使用事务时,必须设置为 READ_COMMITTED
    • read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
    • read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

:::

架构

【高级】Kafka 的数据存储在磁盘上,为什么还能这么快?

:::details 要点

说 Kafka 很快时,他们通常指的是 Kafka 高效移动大量数据的能力。

Kafka 为了提高传输效率,做了很多精妙的设计。

核心设计:

  • 顺序 I/O - 磁盘读写有两种方式:顺序读写或者随机读写。在顺序读写的情况下,磁盘的顺序读写速度和内存接近。因为磁盘是机械结构,每次读写都会寻址写入,其中寻址是一个“机械动作”。Kafka 利用了一种分段式的、只追加 (Append-Only) 的日志,基本上把自身的读写操作限制为顺序 I/O,也就使得它在各种存储介质上能有很快的速度。
  • 零拷贝 - Kafka 数据传输是一个从网络到磁盘,再由磁盘到网络的过程。在网络和磁盘之间传输数据时,消除多余的复制是提高效率的关键。Kafka 利用零拷贝技术来消除传输过程中的多余复制
    • 如果不采用零拷贝,Kafka 将数据同步给消费者的大致流程是:
      1. 从磁盘加载数据到 os buffer
      2. 拷贝数据到 app buffer
      3. 再拷贝数据到 socket buffer
      4. 接下来,将数据拷贝到网卡 buffer
      5. 最后,通过网络传输,将数据发送到消费者
    • 采用零拷贝技术,Kafka 使用 sendfile() 系统方法,将数据从 os buffer 直接复制到网卡 buffer。这个过程中,唯一一次复制数据是从 os buffer 到网卡 buffer。这个复制过程是通过 DMA(Direct Memory Access,直接内存访问) 完成的。使用 DMA 时,CPU 不参与,这使得它非常高效。

其他设计:

  • 页缓存 - Kafka 的数据并不是实时的写入磁盘,它充分利用了现代操作系统分页存储来利用内存提高 I/O 效率。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。Kafka 接收来自 socket buffer 的网络数据,应用进程不需要中间处理、直接进行持久化时。可以使用 mmap 内存文件映射。
  • 压缩 - Kafka 内置了几种压缩算法,并允许定制化压缩算法。通过压缩算法,可以有效减少传输数据的大小,从而提升传输效率。
  • 批处理 - Kafka 的 Clients 和 Brokers 会把多条读写的日志记录合并成一个批次,然后才通过网络发送出去。日志记录的批处理通过使用更大的包以及提高带宽效率来摊薄网络往返的开销。
  • 分区 - Kafka 将 Topic 分区,每个分区对应一个名为的 Log 的磁盘目录,而 Log 又根据大小,可以分为多个 Log Segment 文件。这种分而治之的策略,使得 Kafka 可以并发读,以支撑非常高的吞吐量。此外,Kafka 支持负载均衡机制,将数据分区近似均匀地分配给消费者群组的各个消费者。

:::

参考资料