Dunwu Blog

大道至简,知易行难

SpringBoot 之安全快速入门

QuickStart

(1)添加依赖

1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>

(2)添加配置

1
2
3
spring.security.user.name = root
spring.security.user.password = root
spring.security.user.roles = USER

(3)启动应用后,访问任意路径,都会出现以下页面,提示你先执行登录操作。输入配置的用户名、密码(root/root)即可访问应用页面。

img

设计一个低代码平台

本文目标是设计一个用于提高开发人员开发效率的低代码平台,这里会采用系统解决方案设计的一般思路来逐步探寻设计方案。

业务分析

低代码平台是什么

广义上的低代码平台包括低代码平台和零代码平台,它们都属于 APaaS(Application Platform as a Service 应用平台即服务),两者的主要区别在于对代码的依赖程度:

  • 低代码平台:通过自动代码生成和可视化编程,只需要少量代码,即可快速搭建各种应用。
  • 零代码平台:零开发经验的业务人员通过拖拽等方式,无需编写代码,即可快速搭建各种应用。

技术路线

基于 IDE 框架的快速开发平台

该方案将传统的集成开发环境(IDE)充分可视化,开发者对前端界面组件、数据源绑定方式、数据模型、业务逻辑和工作流等都可以自由定义,平台将自动生成代码,开发者也可以添加自己的代码,对程序具有较强的控制能力,因此该方案具备更高的灵活性,可以设计出定制化程度高、逻辑复杂的软件。

由于该方案仍涉及代码开发、部署等技术工作,所以它仍然是一个技术开发平台,需要较高的学习成本,主要价值是提高开发效率,减少重复劳动。

Outsystems 就是采用该方案的典型产品,如下为产品截图:

img

基于模型驱动的应用平台

用户通过可视化方式构建数据模型、视图、权限、工作流等,即可在平台提供的环境中运行,无需编译部署,更像一种傻瓜式的应用搭建平台。平台对各类组件、业务逻辑做了较高层级的封装,因此用户无法随心所欲修改界面风格、交互方式、处理逻辑等。

该方案可以实现完全零代码,对使用者技术要求不高,但需要具备业务抽象、建模能力。主要价值是降低开发门槛、快速适应变化。

明道云、伙伴云等都是此类方案的典型产品,如下为明道云的产品截图:

img

核心要素

绝大部分的企业软件由以下四个部分组成:

  • 业务实体:即操作对象,如客户、订单
  • 业务活动:即进行何种操作,如采购申请、合同审批
  • 业务权限:即何种人拥有何种权限,包括数据查看权限和数据操作权限,如部门经理可以管理所有下属的客户信息,而员工只能管理自己的客户信息
  • 统计报表:即从哪些方面量化企业活动情况,如客户增长率、各月销售额趋势

低代码平台将以上进行抽象,支持数据模型、业务流程、用户权限、统计图表,因此可以作为更通用的企业软件解决方案,这四类能力也是任何一个低代码平台都必须具备的核心要素。

数据模型

建立数据模型就是提取业务实体的数据特征,抽象为数据表,建立表间关系。制作 ER 图的过程就是数据建模。市面上常见的低代码平台均提供了丰富的控件,可以拖拽完成数据模型搭建。此外,数据模型搭建与表单展示合二为一,每完成一个数据表的创建,就自动生成了该表的增删改查功能及相关页面,进而隐藏了数据库设计、前端开发这些专业技术。其实,这也就是我们常说的表单引擎。

这里顺便提一下,虽然很多低代码平台将数据建模与表单展示合二为一,但通过这种方式自动生成的表单只能实现最基础的增删改查页面,用户对界面展示内容及形式的控制程度很低,无法满足大部分企业软件的需求,所以低代码平台一般还会提供自定义页面功能,用户可根据需要在页面上配置按钮、图表等元素,满足个性化需求。

业务流程

业务流程指为了实现某项目标,由多人合作,按照一定的规则、顺序进行的一系列活动,在软件中,业务流程的参与者可以是人,也可以是程序。低代码平台实现了可视化流程配置,用户对触发条件、处理节点、节点参与者进行配置,实现自定义业务流程。

用户权限

大部分的低代码平台都采用了非常经典的 RBAC(Role-Based Access Control )模型管理用户权限,简单来说就是将拥有相同权限的用户添加为相同角色,通过为角色分配权限,实现了“用户——角色——权限”的授权模式。由于企业是一个组织,一般都会有部门的概念,所以也可以将部门添加到某个角色,实现“用户——部门——角色——权限”的授权模式。

统计图表

统计图表可以类比 Excel 中的透视图,统计图表由数据源、统计规则、展示形式定义,低代码平台也正是遵循这种方式,实现统计图表的可视化配置。

流行产品

OutSystems

OutSystems 是快速应用开发的头号低代码平台,并且是 2018 年 Gartner 高生产力平台的领导者。OutSystems 号称将低代码功能与高级移动功能相结合的唯一解决方案,它支持整个应用程序组合的可视化开发,可轻松与现有系统集成。

Mendix

Mendix 帮助企业改善创新方式。通过使用可视化模型,在 Mendix 上构建应用程序非常简单,快速且直观,可使开发人员和业务分析人员等众多人员构建强大的应用程序,而无需编写代码。借助模型驱动开发,业务领导者和 IT 部门可以共享语言来快速构建应用程序。

顶层设计

由于系统的用户群体是有一定技术基础的开发人员。所以,系统定位是低代码平台,而非零代码平台。

其次,由于系统主要是用于简化基本的页面开发,所以技术路线应该选择:基于模型驱动的应用平台。

最后,由于生成的代码是应用于 Java Web 框架。生成的后端代码是 java 代码;前端代码是基于 vue + element-ui 生态的前端代码。

img

代码生成规则对应的数据建模:

img

自动生成前后端代码。

  • 后端代码
    • Controller
    • Service
    • ServiceImpl
    • Daomybatis
    • DaoImpl
    • Mapper
    • Query
    • Dto
    • Entity
    • xml
  • 前端代码
    • List
    • Form
    • Api

组件设计

列表页

搜索栏

操作栏

表格

分页

表单页

表单组件

校验器

扩展设计

参考资料

Kafka 集群

Kafka 是一个分布式的、可水平扩展的、基于发布/订阅模式的、支持容错的消息系统。

Kafka 和 ZooKeeper

Kafka 使用 Zookeeper 来维护集群成员的信息。每个 Broker 都有一个唯一标识符,这个标识符可以在配置文件里指定,也可以自动生成。在 Broker 启动的时候,它通过创建临时节点把自己的 ID 注册到 Zookeeper。Kafka 组件订阅 Zookeeper 的 /broker/ids 路径,当有 Broker 加入集群或退出集群时,这些组件就可以获得通知。

如果要启动另一个具有相同 ID 的 Broker,会得到一个错误——新 Broker 会试着进行注册,但不会成功,因为 ZooKeeper 中已经有一个具有相同 ID 的 Broker。

在 Broker 停机、出现网络分区或长时间垃圾回收停顿时,Broker 会与 ZooKeeper 断开连接,此时 Broker 在启动时创建的临时节点会自动被 ZooKeeper 移除。监听 Broker 列表的 Kafka 组件会被告知 Broker 已移除。

img

Kafka 在 ZooKeeper 的关键存储信息:

  • admin:存储管理信息。主要为删除主题事件,分区迁移事件,优先副本选举,信息 (一般为临时节点)
  • brokers:存储 Broker 相关信息。broker 节点以及节点上的主题相关信息
  • cluster:存储 kafka 集群信息
  • config:存储 broker,client,topic,user 以及 changer 相关的配置信息
  • consumers:存储消费者相关信息
  • controller:存储控制器节点信息
  • controller_epoch:存储控制器节点当前的年龄(说明控制器节点变更次数)

ZooKeeper 两个重要特性:

  • 客户端会话结束时,ZooKeeper 就会删除临时节点。
  • 客户端注册监听它关心的节点,当节点状态发生变化(数据变化、子节点增减变化)时,ZooKeeper 服务会通知客户端。

详细内容可以参考:ZooKeeper 原理

控制器

控制器(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 ZooKeeper 的帮助下管理和协调整个 Kafka 集群。控制器其实就是一个 Broker,只不过它除了具有一般 Broker 的功能以外,还负责 Leader 的选举。

img

如何选举控制器

集群中任意一台 Broker 都能充当控制器的角色,但是,在运行过程中,只能有一个 Broker 成为控制器,行使其管理和协调的职责。实际上,Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。Kafka 当前选举控制器的规则是:第一个在 ZooKeeper 成功创建 /controller 临时节点的 Broker 会被指定为控制器

选举控制器的详细流程:

img

  1. 第一个在 ZooKeeper 中成功创建 /controller 临时节点的 Broker 会被指定为控制器。

  2. 其他 Broker 在控制器节点上创建 Zookeeper watch 对象。

  3. 如果控制器被关闭或者与 Zookeeper 断开连接,Zookeeper 临时节点就会消失。集群中的其他 Broker 通过 watch 对象得到状态变化的通知,它们会尝试让自己成为新的控制器。

  4. 第一个在 Zookeeper 里创建一个临时节点 /controller 的 Broker 成为新控制器。其他 Broker 在新控制器节点上创建 Zookeeper watch 对象。

  5. 每个新选出的控制器通过 Zookeeper 的条件递增操作获得一个全新的、数值更大的 controller epoch。其他节点会忽略旧的 epoch 的消息。

  6. 当控制器发现一个 Broker 已离开集群,并且这个 Broker 是某些 Partition 的 Leader。此时,控制器会遍历这些 Partition,并用轮询方式确定谁应该成为新 Leader,随后,新 Leader 开始处理生产者和消费者的请求,而 Follower 开始从 Leader 那里复制消息。

简而言之,Kafka 使用 Zookeeper 的临时节点来选举控制器,并在节点加入集群或退出集群时通知控制器。控制器负责在节点加入或离开集群时进行 Partition Leader 选举。控制器使用 epoch 来避免“脑裂”,“脑裂”是指两个节点同时被认为自己是当前的控制器

控制器的作用

Topic 管理(创建、删除、增加分区)

这里的 Topic 管理,就是指控制器帮助我们完成对 Kafka Topic 的创建、删除以及分区增加的操作。换句话说,当我们执行 kafka-topics 脚本时,大部分的后台工作都是控制器来完成的。

分区重分配

分区重分配主要是指,kafka-reassign-partitions 脚本(关于这个脚本,后面我也会介绍)提供的对已有 Topic 分区进行细粒度的分配功能。这部分功能也是控制器实现的。

选举 Leader

Preferred 领导者选举主要是 Kafka 为了避免部分 Broker 负载过重而提供的一种换 Leader 的方案。在专栏后面说到工具的时候,我们再详谈 Preferred 领导者选举,这里你只需要了解这也是控制器的职责范围就可以了。

集群成员管理

集群成员管理,包括自动检测新增 Broker、Broker 主动关闭及被动宕机。这种自动检测是依赖于前面提到的 Watch 功能和 ZooKeeper 临时节点组合实现的。

比如,控制器组件会利用Watch 机制检查 ZooKeeper 的 /brokers/ids 节点下的子节点数量变更。目前,当有新 Broker 启动后,它会在 /brokers 下创建专属的 znode 节点。一旦创建完毕,ZooKeeper 会通过 Watch 机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化,进而开启后续的新增 Broker 作业。

侦测 Broker 存活性则是依赖于刚刚提到的另一个机制:临时节点。每个 Broker 启动后,会在 /brokers/ids 下创建一个临时 znode。当 Broker 宕机或主动关闭后,该 Broker 与 ZooKeeper 的会话结束,这个 znode 会被自动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能知道有 Broker 关闭或宕机了,从而进行“善后”。

数据服务

控制器的最后一大类工作,就是向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。

控制器中保存了多种数据,比较重要的的数据有:

  • 所有 Topic 信息。包括具体的分区信息,比如领导者副本是谁,ISR 集合中有哪些副本等。
  • 所有 Broker 信息。包括当前都有哪些运行中的 Broker,哪些正在关闭中的 Broker 等。
  • 所有涉及运维任务的分区。包括当前正在进行 Preferred 领导者选举以及分区重分配的分区列表。

值得注意的是,这些数据其实在 ZooKeeper 中也保存了一份。每当控制器初始化时,它都会从 ZooKeeper 上读取对应的元数据并填充到自己的缓存中。有了这些数据,控制器就能对外提供数据服务了。这里的对外主要是指对其他 Broker 而言,控制器通过向这些 Broker 发送请求的方式将这些数据同步到其他 Broker 上。

副本机制

副本机制是分布式系统实现高可用的不二法门,Kafka 也不例外。

副本机制有哪些好处?

  1. 提供可用性:有句俗语叫:鸡蛋不要放在一个篮子里。副本机制也是一个道理——当部分节点宕机时,系统仍然可以依靠其他正常运转的节点,从整体上对外继续提供服务。
  2. 提供伸缩性:通过增加、减少机器可以控制系统整体的吞吐量。
  3. 改善数据局部性:允许将数据放入与用户地理位置相近的地方,从而降低系统延时。

但是,Kafka 只实现了第一个好处,原因后面会阐述。

Kafka 副本角色

Kafka 使用 Topic 来组织数据,每个 Topic 被分为若干个 Partition,每个 Partition 有多个副本。每个 Broker 可以保存成百上千个属于不同 Topic 和 Partition 的副本。Kafka 副本的本质是一个只能追加写入的提交日志

img

Kafka 副本有两种角色:

  • Leader 副本(主):每个 Partition 都有且仅有一个 Leader 副本。为了保证数据一致性,Leader 处理一切对 Partition (分区)的读写请求
  • Follower 副本(从):Leader 副本以外的副本都是 Follower 副本。Follower 唯一的任务就是从 Leader 那里复制消息,保持与 Leader 一致的状态
  • 如果 Leader 宕机,其中一个 Follower 会被选举为新的 Leader。

img

为了与 Leader 保持同步,Follower 向 Leader 发起获取数据的请求,这种请求与消费者为了读取消息而发送的请求是一样的。请求消息里包含了 Follower 想要获取消息的偏移量,而这些偏移量总是有序的。

Leader 另一个任务是搞清楚哪个 Follower 的状态与自己是一致的。通过查看每个 Follower 请求的最新偏移量,Leader 就会知道每个 Follower 复制的进度。如果跟随者在 10s 内没有请求任何消息,或者虽然在请求消息,但是在 10s 内没有请求最新的数据,那么它就会被认为是不同步的。如果一个副本是不同步的,在 Leader 失效时,就不可能成为新的 Leader——毕竟它没有包含全部的消息。

除了当前首领之外,每个分区都有一个首选首领——创建 Topic 时选定的首领就是分区的首选首领。之所以叫首选 Leader,是因为在创建分区时,需要在 Broker 之间均衡 Leader。

ISR

ISR 即 In-sync Replicas,表示同步副本。Follower 副本不提供服务,只是定期地异步拉取领导者副本中的数据而已。既然是异步的,说明和 Leader 并非数据强一致性的。

判断 Follower 是否与 Leader 同步的标准

Kafka Broker 端参数 replica.lag.time.max.ms 参数,指定了 Follower 副本能够落后 Leader 副本的最长时间间隔,默认为 10s。这意味着:只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。

ISR 是一个动态调整的集合,会不断将同步副本加入集合,将不同步副本移除集合。Leader 副本天然就在 ISR 中。

选举 Leader

Unclean 领导者选举

因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader。

Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举

开启 Unclean 领导者选举可能会造成数据丢失,但好处是:它使得 Partition Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。

处理请求

Broker 的大部分工作是处理客户端、Partition 副本和控制器发送给 Partition Leader 的请求。Kafka 提供了一个二进制协议(基于 TCP),指定了请求消息的格式以及 Broker 如何对请求作出响应。

broker 会在它所监听的每一个端口上运行一个 Acceptor 线程,这个线程会创建一个连接,并把它交给 Processor 线程去处理。Processor 线程的数量是可配置的。Processor 线程负责从客户端获取请求消息,把它们放进请求队列,然后从响应队列获取响应消息,把它们发送给客户端。

当请求放进请求队列后,IO 线程负责进行处理。

img

生产请求和获取请求都需要发送给 Partition 的 Leader 副本处理。如果 Broker 收到一个针对特定分区的请求,而该分区的 Leader 在另一个 Broker 上,那么发送请求的客户端会收到一个“非分区 Leader”的错误响应。Kafka 客户端要自己负责把生成请求和获取请求发送到正确的 Broker 上。

元数据请求

客户端怎么知道哪个是 Leader 呢?客户端通过使用另一种类型的请求来实现,那就是元数据请求(metadata request)。这种请求包含了客户端感兴趣的 Topic 列表。broker 的响应消息指明了这些 Topic 所包含的 Partition、Partition 有哪些副本,以及哪个副本是 Leader。元数据请求可以发给任意一个 broker,因为所有 Broker 都缓存了这些信息。

客户端会把这些信息缓存起来,并直接往目标 Broker 上发送生产请求和获取请求。它们需要时不时地通过发送元数据请求来刷新这些信息(刷新的时间间隔通过 metadata.max.age.ms 来配置),从而知道元数据是否发生了变化。

img

生产请求

acks 参数控制多少个副本确认写入成功后生产者才认为消息生产成功。这个参数的取值可以为:

  • acks=0 - 消息发送完毕,生产者认为消息写入成功;
  • acks=1 - Leader 写入成功,生产者认为消息写入成功;
  • acks=all - 所有同步副本写入成功,生产者才认为消息写入成功。

如果 Leader 收到生产消息,它会执行一些检查逻辑,包含:

  • 发送的用户是否有权限写入 Topic?
  • 请求的 acks 参数取值是否合法(只允许 01all)?
  • 如果 acks 设置为 all,是否有足够的同步副本已经安全写入消息?(我们可以配置如果同步副本数量不足,Leader 拒绝处理新消息)

之后,消息被写入到本地磁盘。一旦消息本地持久化后,如果 acks 被设为 01,那么会返回结果给客户端;如果 acks 被设为 all 那么会将请求放置在一个称为 purgatory 的缓冲区中等待其他的副本写入完成。

消费请求

Leader 处理拉取请求和处理生产请求的方式很相似:

  1. 请求需要先到达指定的 Partition Leader 上,然后客户端通过查询元数据来确保请求的路由是正确的。
  2. Leader 在收到请求时,会先检查请求是否有效。
  3. 如果请求的偏移量存在,Broker 将按照客户端指定的数量上限从 Partition 里读取消息,再把消息返回给客户端。Kafka 使用零拷贝技术向客户端发送消息——也就是说,Kafka 直接把消息从文件(更准确的说,是文件系统缓存)里发送到网络通道,而不需要经过任何中间缓冲区。这避免了内存的字节拷贝和缓冲区维护,极大地提高了性能。

客户端可以指定 Broker 返回数据量的上限和下限,防止数据量过大造成客户端内存溢出。同时,客户端也可以指定返回的最小数据量,当消息数据量没有达到最小数据量时,请求会一直阻塞直到有足够的数据返回。指定最小的数据量在负载不高的情况下非常有用,通过这种方式可以减轻网络往返的额外开销。当然请求也不能永远的阻塞,客户端可以指定最大的阻塞时间,如果到达指定的阻塞时间,即便没有足够的数据也会返回。

img

不是所有 Leader 的数据都能够被读取。消费者只能读取已提交的消息只有当消息被写入分区的若干同步副本时,才被认为是已提交的。为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要 Leader 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。

因为还没有被足够的副本持久化的消息,被认为是不安全的——如果 Leader 发生故障,另一个副本成为新的 Leader,这些消息就丢失了。如果允许读取这些消息,就可能会破坏数据一致性。

这也意味着,如果 Broker 间的消息复制因为某些原因变慢了,那么消息到达消费者的时间也会随之变长。延迟时间可以通过 replica.lag.time.max.ms 来配置,它指定了副本在复制消息时可被允许的最大延迟时间。

img

其他请求

我们讨论了 Kafka 中最常见的三种请求类型:元信息请求,生产请求和拉取请求。这些请求都是使用的是 Kafka 的自定义二进制协议。集群中 Broker 间的通信请求也是使用同样的协议,这些请求是内部使用的,客户端不能发送。比如在选举 Partition Leader 过程中,控制器会发送 LeaderAndIsr 请求给新的 Leader 和其他跟随副本。

这个协议目前已经支持 20 种请求类型,并且仍然在演进以支持更多的类型。

总结

副本机制

  • 每个 Partition 都有一个 Leader,零个或多个 Follower。
  • Leader 处理一切对 Partition (分区)的读写请求;而 Follower 只需被动的同步 Leader 上的数据。
  • 同一个 Topic 的不同 Partition 会分布在多个 Broker 上,而且一个 Partition 还会在其他的 Broker 上面进行备份。

选举机制

Follower 宕机,啥事儿没有;Leader 宕机了,会从 Follower 中重新选举一个新的 Leader。
生产者/消费者如何知道谁是 Leader

  • Kafka 将这种元数据存储在 Zookeeper 服务中。
  • 生产者和消费者都和 Zookeeper 连接并通信。

参考资料

Kafka 存储

Kafka 是 Apache 的开源项目。Kafka 既可以作为一个消息队列中间件,也可以作为一个分布式流处理平台

Kafka 用于构建实时数据管道和流应用。它具有水平可伸缩性,容错性,快速快速性

逻辑存储

img

持久化

持久化是 Kafka 的一个重要特性。

Kafka 集群持久化保存(使用可配置的保留期限)所有发布记录——无论它们是否被消费。但是,Kafka 不会一直保留数据,也不会等待所有的消费者读取了消息才删除消息。只要数据量达到上限(比如 1G)或者数据达到过期时间(比如 7 天),Kafka 就会删除旧消息。Kafka 的性能和数据大小无关,所以长时间存储数据没有什么问题。

Kafka 对消息的存储和缓存严重依赖于文件系统

  • 顺序磁盘访问在某些情况下比随机内存访问还要快!在 Kafka 中,所有数据一开始就被写入到文件系统的持久化日志中,而不用在 cache 空间不足的时候 flush 到磁盘。实际上,这表明数据被转移到了内核的 pagecache 中。所以,虽然 Kafka 数据存储在磁盘中,但其访问性能也不低

  • Kafka 的协议是建立在一个 “消息块” 的抽象基础上,合理将消息分组。 这使得网络请求将多个消息打包成一组,而不是每次发送一条消息,从而使整组消息分担网络中往返的开销。Consumer 每次获取多个大型有序的消息块,并由服务端依次将消息块一次加载到它的日志中。这可以有效减少大量的小型 I/O 操作

  • 由于 Kafka 在 Producer、Broker 和 Consumer 都共享标准化的二进制消息格式,这样数据块不用修改就能在他们之间传递。这可以避免字节拷贝带来的开销

  • Kafka 以高效的批处理格式支持一批消息可以压缩在一起发送到服务器。这批消息将以压缩格式写入,并且在日志中保持压缩,只会在 Consumer 消费时解压缩。压缩传输数据,可以有效减少网络带宽开销

    • Kafka 支持 GZIP,Snappy 和 LZ4 压缩协议。

所有这些优化都允许 Kafka 以接近网络速度传递消息。

物理存储

Log

Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。

在 Kafka 中,任意一个 Topic 维护了一组 Partition 日志,如下所示:

请注意:这里的主题只是一个逻辑上的抽象概念,实际上,Kafka 的基本存储单元是 Partition。Partition 无法在多个 Broker 间进行再细分,也无法在同一个 Broker 的多个磁盘上进行再细分。所以,分区的大小受到单个挂载点可用空间的限制。

Partiton 命名规则为 Topic 名称 + 有序序号,第一个 Partiton 序号从 0 开始,序号最大值为 Partition 数量减 1。

Log 是 Kafka 用于表示日志文件的组件。每个 Partiton 对应一个 Log 对象,在物理磁盘上则对应一个目录。如:创建一个双分区的主题 test,那么,Kafka 会在磁盘上创建两个子目录:test-0test-1;而在服务器端,这就对应两个 Log 对象。

Log Segment

img

因为在一个大文件中查找和删除消息是非常耗时且容易出错的。所以,Kafka 将每个 Partition 切割成若干个片段,即日志段(Log Segment)。默认每个 Segment 大小不超过 1G,且只包含 7 天的数据。如果 Segment 的消息量达到 1G,那么该 Segment 会关闭,同时打开一个新的 Segment 进行写入。

Broker 会为 Partition 里的每个 Segment 打开一个文件句柄(包括不活跃的 Segment),因此打开的文件句柄数通常会比较多,这个需要适度调整系统的进程文件句柄参数。正在写入的分片称为活跃片段(active segment),活跃片段永远不会被删除

Segment 文件命名规则:Partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。

Segment 文件可以分为两类:

  • 索引文件
    • 偏移量索引文件( .index
    • 时间戳索引文件( .timeindex
    • 已终止事务的索引文件(.txnindex):如果没有使用 Kafka 事务,则不会创建该文件
  • 日志数据文件(.log

文件格式

Kafka 的消息和偏移量保存在文件里。保存在磁盘上的数据格式和从生产者发送过来或消费者读取的数据格式是一样的。因为使用了相同的数据格式,使得 Kafka 可以进行零拷贝技术给消费者发送消息,同时避免了压缩和解压。

除了键、值和偏移量外,消息里还包含了消息大小、校验和(检测数据损坏)、魔数(标识消息格式版本)、压缩算法(Snappy、GZip 或者 LZ4)和时间戳(0.10.0 新增)。时间戳可以是生产者发送消息的时间,也可以是消息到达 Broker 的时间,这个是可配的。

如果生产者发送的是压缩的消息,那么批量发送的消息会压缩在一起,以“包装消息”(wrapper message)来发送,如下所示:

img

如果生产者使用了压缩功能,发送的批次越大,就意味着能获得更好的网络传输效率,并且节省磁盘存储空间。

Kafka 附带了一个叫 DumpLogSegment 的工具,可以用它查看片段的内容。它可以显示每个消息的偏移量、校验和、魔术数字节、消息大小和压缩算法。

索引

Kafka 允许消费者从任意有效的偏移量位置开始读取消息。Kafka 为每个 Partition 都维护了一个索引(即 .index 文件),该索引将偏移量映射到片段文件以及偏移量在文件里的位置。

索引也被分成片段,所以在删除消息时,也可以删除相应的索引。Kafka 不维护索引的校验和。如果索引出现损坏,Kafka 会通过重读消息并录制偏移量和位置来重新生成索引。如果有必要,管理员可以删除索引,这样做是绝对安全的,Kafka 会自动重新生成这些索引。

索引文件用于将偏移量映射成为消息在日志数据文件中的实际物理位置,每个索引条目由 offset 和 position 组成,每个索引条目可以唯一确定在各个分区数据文件的一条消息。其中,Kafka 采用稀疏索引存储的方式,每隔一定的字节数建立了一条索引,可以通过“index.interval.bytes”设置索引的跨度;

有了偏移量索引文件,通过它,Kafka 就能够根据指定的偏移量快速定位到消息的实际物理位置。具体的做法是,根据指定的偏移量,使用二分法查询定位出该偏移量对应的消息所在的分段索引文件和日志数据文件。然后通过二分查找法,继续查找出小于等于指定偏移量的最大偏移量,同时也得出了对应的 position(实际物理位置),根据该物理位置在分段的日志数据文件中顺序扫描查找偏移量与指定偏移量相等的消息。下面是 Kafka 中分段的日志数据文件和偏移量索引文件的对应映射关系图(其中也说明了如何按照起始偏移量来定位到日志数据文件中的具体消息)。

img

清理

每个日志片段可以分为以下两个部分:

  • 干净的部分:这部分消息之前已经被清理过,每个键只存在一个值。
  • 污浊的部分:在上一次清理后写入的新消息。

img

如果在 Kafka 启动时启用了清理功能(通过 log.cleaner.enabled 配置),每个 Broker 会启动一个清理管理器线程和若干个清理线程,每个线程负责一个 Partition。

清理线程会读取污浊的部分,并在内存里创建一个 map。map 的 key 是消息键的哈希吗,value 是消息的偏移量。对于相同的键,只保留最新的位移。其中 key 的哈希大小为 16 字节,位移大小为 8 个字节。也就是说,一个映射只有 24 字节,假设消息大小为 1KB,那么 1GB 的段有 1 百万条消息,建立这个段的映射只需要 24MB 的内存,映射的内存效率是非常高效的。

在配置 Kafka 时,管理员需要设置这些清理线程可以使用的总内存。如果设置 1GB 的总内存同时有 5 个清理线程,那么每个线程只有 200MB 的内存可用。在清理线程工作时,它不需要把所有脏的段文件都一起在内存中建立上述映射,但需要保证至少能够建立一个段的映射。如果不能同时处理所有脏的段,Kafka 会一次清理最老的几个脏段,然后在下一次再处理其他的脏段。

一旦建立完脏段的键与位移的映射后,清理线程会从最老的干净的段开始处理。如果发现段中的消息的键没有在映射中出现,那么可以知道这个消息是最新的,然后简单的复制到一个新的干净的段中;否则如果消息的键在映射中出现,这条消息需要抛弃,因为对于这个键,已经有新的消息写入。处理完会将产生的新段替代原始段,并处理下一个段。

对于一个段,清理前后的效果如下:

img

删除事件

对于只保留最新消息的清理策略来说,Kafka 还支持删除相应键的消息操作(而不仅仅是保留最新的消息内容)。这是通过生产者发送一条特殊的消息来实现的,该消息包含一个键以及一个 null 的消息内容。当清理线程发现这条消息时,它首先仍然进行一个正常的清理并且保留这个包含 null 的特殊消息一段时间,在这段时间内消费者消费者可以获取到这条消息并且知道消息内容已经被删除。过了这段时间,清理线程会删除这条消息,这个键会从 Partition 中消失。这段时间是必须的,因为它可以使得消费者有一定的时间余地来收到这条消息。

参考资料

Git 帮助手册

国外网友制作了一张 Git Cheat Sheet,总结很精炼,各位不妨收藏一下。

本节选择性介绍 git 中比较常用的命令行场景。

img

安装

(1)Debian/Ubuntu 环境安装

如果你使用的系统是 Debian/Ubuntu , 安装命令为:

1
2
3
4
5
$ apt-get install libcurl4-gnutls-dev libexpat1-dev gettext \
> libz-dev libssl-dev
$ apt-get install git-core
$ git --version
git version 1.8.1.2

(2)Centos/RedHat 环境安装

如果你使用的系统是 Centos/RedHat ,安装命令为:

1
2
3
4
5
$ yum install curl-devel expat-devel gettext-devel \
> openssl-devel zlib-devel
$ yum -y install git-core
$ git --version
git version 1.7.1

(3)Windows 环境安装

Git 官方下载地址下载 exe 安装包。按照安装向导安装即可。

建议安装 Git Bash 这个 git 的命令行工具。

(4)Mac 环境安装

Git 官方下载地址下载 mac 安装包。按照安装向导安装即可。

配置

Git 自带一个 git config 的工具来帮助设置控制 Git 外观和行为的配置变量。 这些变量存储在三个不同的位置:

  • /etc/gitconfig 文件: 包含系统上每一个用户及他们仓库的通用配置。 如果使用带有 --system 选项的 git config 时,它会从此文件读写配置变量。
  • ~/.gitconfig~/.config/git/config 文件:只针对当前用户。 可以传递 --global 选项让 Git 读写此文件。
  • 当前使用仓库的 Git 目录中的 config 文件(就是 .git/config):针对该仓库。

每一个级别覆盖上一级别的配置,所以 .git/config 的配置变量会覆盖 /etc/gitconfig 中的配置变量。

在 Windows 系统中,Git 会查找 $HOME 目录下(一般情况下是 C:\Users\$USER)的 .gitconfig 文件。 Git 同样也会寻找 /etc/gitconfig 文件,但只限于 MSys 的根目录下,即安装 Git 时所选的目标位置。

配置用户信息

当安装完 Git 应该做的第一件事就是设置你的用户名称与邮件地址。 这样做很重要,因为每一个 Git 的提交都会使用这些信息,并且它会写入到你的每一次提交中,不可更改:

1
2
git config --global user.name "John Doe"
git config --global user.email johndoe@example.com

再次强调,如果使用了 --global 选项,那么该命令只需要运行一次,因为之后无论你在该系统上做任何事情, Git 都会使用那些信息。 当你想针对特定项目使用不同的用户名称与邮件地址时,可以在那个项目目录下运行没有 --global 选项的命令来配置。

很多 GUI 工具都会在第一次运行时帮助你配置这些信息。

给 Git 命令添加别名

在 OS X 和 Linux 下, 你的 Git 的配置文件储存在 ~/.gitconfig。我在[alias] 部分添加了一些快捷别名(和一些我容易拼写错误的),如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[alias]
a = add
amend = commit --amend
c = commit
ca = commit --amend
ci = commit -a
co = checkout
d = diff
dc = diff --changed
ds = diff --staged
f = fetch
loll = log --graph --decorate --pretty=oneline --abbrev-commit
m = merge
one = log --pretty=oneline
outstanding = rebase -i @{u}
s = status
unpushed = log @{u}
wc = whatchanged
wip = rebase -i @{u}
zap = fetch -p

缓存一个仓库的用户名和密码

你可能有一个仓库需要授权,这时你可以缓存用户名和密码,而不用每次推/拉(push/pull)的时候都输入,Credential helper 能帮你。

1
2
git config --global credential.helper cache
## Set git to use the credential memory cache
1
2
git config --global credential.helper 'cache --timeout=3600'
## Set the cache to timeout after 1 hour (setting is in seconds)

仓库

初始化仓库

1
$ git init

克隆仓库

1
2
3
4
# 通过 SSH
$ git clone ssh://user@domain.com/repo.git
# 通过 HTTP
$ git clone http://domain.com/user/repo.git

储藏

有时,我们需要在同一个项目的不同分支上工作。当需要切换分支时,偏偏本地的工作还没有完成,此时,提交修改显得不严谨,但是不提交代码又无法切换分支。这时,你可以使用 git stash 将本地的修改内容作为草稿储藏起来。

官方称之为储藏,但我个人更喜欢称之为存草稿。

1
2
3
4
5
6
7
8
9
10
11
12
# 1. 将修改作为当前分支的草稿保存
$ git stash

# 2. 查看草稿列表
$ git stash list
stash@{0}: WIP on master: 6fae349 :memo: Writing docs.

# 3.1 删除草稿
$ git stash drop stash@{0}

# 3.2 读取草稿
$ git stash apply stash@{0}

暂存

git add 命令用于将修改添加到暂存区。

暂存指定文件

1
git add xxx

暂存当前目录下所有修改

1
git add .

暂存所有修改

1
git add -A

暂存文件部分内容

暂存文件部分内容

1
git add --patch filename.x

-p 简写。这会打开交互模式, 你将能够用 s 选项来分隔提交(commit);

然而, 如果这个文件是新的, 会没有这个选择, 添加一个新文件时,这样做:

1
git add -N filename.x

然后, 你需要用 e 选项来手动选择需要添加的行,执行 git diff --cached 将会显示哪些行暂存了哪些行只是保存在本地了。

把暂存的内容变成未暂存,把未暂存的内容暂存起来

这个有点困难, 我能想到的最好的方法是先 stash 未暂存的内容, 然后重置(reset),再 pop 第一步 stashed 的内容, 最后再 add 它们。

1
2
3
4
git stash -k
git reset --hard
git stash pop
git add -A

提交

git commit 命令用于将修改保存到到本地仓库。

查看最近一次提交

1
git show

或者

1
git log -n1 -p

提交本地的所有修改

1
git commit -a

提交暂存的修改

1
git commit

把暂存的内容添加到上一次的提交

1
git commit --amend

附加消息提交

1
git commit -m 'commit message'

修改提交信息

如果你的提交信息写错了且这次提交(commit)还没有推送(push),可以使用以下命令修改:

1
git commit --amend

或者

1
git commit --amend -m 'xxxxxxx'

修改提交信息中的用户名和邮箱

1
git commit --amend --author "New Authorname <authoremail@mydomain.com>"

从提交中移除一个文件

1
2
3
git checkout HEAD^ myfile
git add -A
git commit --amend

删除最后一次提交

如果你需要删除推了的提交(pushed commits),你可以使用下面的方法。可是,这会不可逆的改变你的历史,也会搞乱那些已经从该仓库拉取(pulled)了的人的历史。简而言之,如果你不是很确定,千万不要这么做。

1
2
git reset HEAD^ --hard
git push -f [remote] [branch]

如果你还没有推到远程, 把 Git 重置(reset)到你最后一次提交前的状态就可以了(同时保存暂存的变化):

1
2
(my-branch*)$ git reset --soft HEAD@{1}

这只能在没有推送之前有用. 如果你已经推了, 唯一安全能做的是 git revert SHAofBadCommit, 那会创建一个新的提交(commit)用于撤消前一个提交的所有变化(changes); 或者, 如果你推的这个分支是 rebase-safe 的 (例如: 其它开发者不会从这个分支拉), 只需要使用 git push -f; 更多, 请参考 the above section

删除任意提交

同样的警告:不到万不得已的时候不要这么做.

1
2
git rebase --onto SHA1_OF_BAD_COMMIT^ SHA1_OF_BAD_COMMIT
git push -f [remote] [branch]

或者做一个 交互式 rebase 删除那些你想要删除的提交(commit)里所对应的行。

我尝试推一个修正后的提交(amended commit)到远程,但是报错

1
2
3
4
5
6
7
To https://github.com/yourusername/repo.git
! [rejected] mybranch -> mybranch (non-fast-forward)
error: failed to push some refs to 'https://github.com/tanay1337/webmaker.org.git'
hint: Updates were rejected because the tip of your current branch is behind
hint: its remote counterpart. Integrate the remote changes (e.g.
hint: 'git pull ...') before pushing again.
hint: See the 'Note about fast-forwards' in 'git push --help' for details.

注意, rebasing(见下面)和修正(amending)会用一个新的提交(commit)代替旧的, 所以如果之前你已经往远程仓库上推过一次修正前的提交(commit),那你现在就必须强推(force push) (-f)。 注意 – 总是 确保你指明一个分支!

1
(my-branch)$ git push origin mybranch -f

一般来说, 要避免强推. 最好是创建和推(push)一个新的提交(commit),而不是强推一个修正后的提交。后者会使那些与该分支或该分支的子分支工作的开发者,在源历史中产生冲突。

不小心强制重置,想找回内容

如果你意外的做了 git reset --hard, 你通常能找回你的提交(commit), 因为 Git 对每件事都会有日志,且都会保存几天。

1
(master)$ git reflog

你将会看到一个你过去提交(commit)的列表, 和一个重置的提交。 选择你想要回到的提交(commit)的 SHA,再重置一次:

1
(master)$ git reset --hard SHA1234

这样就完成了。

重置

撤销本地修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 移除缓存区的所有文件(i.e. 撤销上次git add)
$ git reset HEAD

# 将HEAD重置到上一次提交的版本,并将之后的修改标记为未添加到缓存区的修改
$ git reset <commit>

# 将HEAD重置到上一次提交的版本,并保留未提交的本地修改
$ git reset --keep <commit>

# 放弃工作目录下的所有修改
$ git reset --hard HEAD

# 将HEAD重置到指定的版本,并抛弃该版本之后的所有修改
$ git reset --hard <commit-hash>

# 用远端分支强制覆盖本地分支
$ git reset --hard <remote/branch> e.g., upstream/master, origin/my-feature

# 放弃某个文件的所有本地修改
$ git checkout HEAD <file>

删除添加.gitignore文件前错误提交的文件:

1
2
3
4
# 提交一条 git 记录,提交信息为 remove xyz file
$ git rm -r --cached .
$ git add .
$ git commit -m "remove xyz file"

撤销远程修改(创建一个新的提交,并回滚到指定版本):

1
2
# revert 哈希号为 commit-hash 的记录
$ git revert <commit-hash>

彻底删除指定版本:

1
2
3
# 执行下面命令后,commit-hash 提交后的记录都会被彻底删除,使用需谨慎
$ git reset --hard <commit-hash>
$ git push -f

更新

1
2
3
4
5
6
7
8
# 下载远程端版本,但不合并到HEAD中
$ git fetch <remote>

# 将远程端版本合并到本地版本中
$ git pull origin master

# 以rebase方式将远端分支与本地合并
$ git pull --rebase <remote> <branch>

推送

推送提交到远程仓库

1
git push remote <remote> <branch>

发布标签

1
git push --tags

未暂存

未暂存(Unstaged)的内容

把未暂存的内容移动到一个新分支

  • git checkout -b my-branch

我想把未暂存的内容移动到另一个已存在的分支

1
2
3
git stash
git checkout my-branch
git stash pop

丢弃本地未提交的变化

如果你只是想重置源(origin)和你本地(local)之间的一些提交(commit),你可以:

1
2
3
4
5
6
7
8
## one commit
$ git reset --hard HEAD^
## two commits
$ git reset --hard HEAD^^
## four commits
$ git reset --hard HEAD~4
## or
$ git checkout -f

重置某个特殊的文件, 你可以用文件名做为参数:

1
git reset filename

我想丢弃某些未暂存的内容

如果你想丢弃工作拷贝中的一部分内容,而不是全部。

签出(checkout)不需要的内容,保留需要的。

1
2
$ git checkout -p
## Answer y to all of the snippets you want to drop

另外一个方法是使用 stash, Stash 所有要保留下的内容, 重置工作拷贝, 重新应用保留的部分。

1
2
3
4
$ git stash -p
## Select all of the snippets you want to save
$ git reset --hard
$ git stash pop

或者, stash 你不需要的部分, 然后 stash drop。

1
2
3
$ git stash -p
## Select all of the snippets you don't want to save
$ git stash drop

分支

分支(Branches)

列出所有的分支

1
git branch

列出所有的远端分支

1
git branch -r

基于当前分支创建新分支

1
git branch <new-branch>

基于远程分支创建新分支

1
git branch --track <new-branch> <remote-branch>

删除本地分支

1
git branch -d <branch>

强制删除本地分支

注意:强制删除本地分支,将会丢失未合并的修改

1
git branch -D <branch>

删除远程分支

1
2
git push <remote> :<branch> (since Git v1.5.0)
git push <remote> --delete <branch> (since Git v1.7.0)

切换分支

1
git checkout <branch>

创建并切换到新分支

1
git checkout -b <branch>

我从错误的分支拉取了内容,或把内容拉取到了错误的分支

这是另外一种使用 git reflog 情况,找到在这次错误拉(pull) 之前 HEAD 的指向。

1
2
3
(master)$ git reflog
ab7555f HEAD@{0}: pull origin wrong-branch: Fast-forward
c5bc55a HEAD@{1}: checkout: checkout message goes here

重置分支到你所需的提交(desired commit):

1
git reset --hard c5bc55a

完成。

我想扔掉本地的提交(commit),以便我的分支与远程的保持一致

先确认你没有推(push)你的内容到远程。

git status 会显示你领先(ahead)源(origin)多少个提交:

1
2
3
4
5
(my-branch)$ git status
## On branch my-branch
## Your branch is ahead of 'origin/my-branch' by 2 commits.
## (use "git push" to publish your local commits)
#

一种方法是:

1
(master)$ git reset --hard origin/my-branch

我需要提交到一个新分支,但错误的提交到了 master

在 master 下创建一个新分支,不切换到新分支,仍在 master 下:

1
(master)$ git branch my-branch

把 master 分支重置到前一个提交:

1
(master)$ git reset --hard HEAD^

HEAD^HEAD^1 的简写,你可以通过指定要设置的HEAD来进一步重置。

或者, 如果你不想使用 HEAD^, 找到你想重置到的提交(commit)的 hash(git log 能够完成), 然后重置到这个 hash。 使用git push 同步内容到远程。

例如, master 分支想重置到的提交的 hash 为a13b85e:

1
2
(master)$ git reset --hard a13b85e
HEAD is now at a13b85e

签出(checkout)刚才新建的分支继续工作:

1
(master)$ git checkout my-branch

我想保留来自另外一个 ref-ish 的整个文件

假设你正在做一个原型方案(原文为 working spike (see note)), 有成百的内容,每个都工作得很好。现在, 你提交到了一个分支,保存工作内容:

1
(solution)$ git add -A && git commit -m "Adding all changes from this spike into one big commit."

当你想要把它放到一个分支里 (可能是feature, 或者 develop), 你关心是保持整个文件的完整,你想要一个大的提交分隔成比较小。

假设你有:

  • 分支 solution, 拥有原型方案, 领先 develop 分支。
  • 分支 develop, 在这里你应用原型方案的一些内容。

我去可以通过把内容拿到你的分支里,来解决这个问题:

1
(develop)$ git checkout solution -- file1.txt

这会把这个文件内容从分支 solution 拿到分支 develop 里来:

1
2
3
4
5
6
## On branch develop
## Your branch is up-to-date with 'origin/develop'.
## Changes to be committed:
## (use "git reset HEAD <file>..." to unstage)
#
## modified: file1.txt

然后, 正常提交。

Note: Spike solutions are made to analyze or solve the problem. These solutions are used for estimation and discarded once everyone gets clear visualization of the problem. ~ Wikipedia.

我把几个提交(commit)提交到了同一个分支,而这些提交应该分布在不同的分支里

假设你有一个master分支, 执行git log, 你看到你做过两次提交:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
(master)$ git log

commit e3851e817c451cc36f2e6f3049db528415e3c114
Author: Alex Lee <alexlee@example.com>
Date: Tue Jul 22 15:39:27 2014 -0400

Bug #21 - Added CSRF protection

commit 5ea51731d150f7ddc4a365437931cd8be3bf3131
Author: Alex Lee <alexlee@example.com>
Date: Tue Jul 22 15:39:12 2014 -0400

Bug #14 - Fixed spacing on title

commit a13b85e984171c6e2a1729bb061994525f626d14
Author: Aki Rose <akirose@example.com>
Date: Tue Jul 21 01:12:48 2014 -0400

First commit

让我们用提交 hash(commit hash)标记 bug (e3851e8 for #21, 5ea5173 for #14).

首先, 我们把master分支重置到正确的提交(a13b85e):

1
2
(master)$ git reset --hard a13b85e
HEAD is now at a13b85e

现在, 我们对 bug #21 创建一个新的分支:

1
2
(master)$ git checkout -b 21
(21)$

接着, 我们用 cherry-pick 把对 bug #21 的提交放入当前分支。 这意味着我们将应用(apply)这个提交(commit),仅仅这一个提交(commit),直接在 HEAD 上面。

1
(21)$ git cherry-pick e3851e8

这时候, 这里可能会产生冲突, 参见交互式 rebasing 章 冲突节 解决冲突.

再者, 我们为 bug #14 创建一个新的分支, 也基于master分支

1
2
3
(21)$ git checkout master
(master)$ git checkout -b 14
(14)$

最后, 为 bug #14 执行 cherry-pick:

1
(14)$ git cherry-pick 5ea5173

我想删除上游(upstream)分支被删除了的本地分支

一旦你在 github 上面合并(merge)了一个 pull request, 你就可以删除你 fork 里被合并的分支。 如果你不准备继续在这个分支里工作, 删除这个分支的本地拷贝会更干净,使你不会陷入工作分支和一堆陈旧分支的混乱之中。

1
git fetch -p

我不小心删除了我的分支

如果你定期推送到远程, 多数情况下应该是安全的,但有些时候还是可能删除了还没有推到远程的分支。 让我们先创建一个分支和一个新的文件:

1
2
3
4
5
(master)$ git checkout -b my-branch
(my-branch)$ git branch
(my-branch)$ touch foo.txt
(my-branch)$ ls
README.md foo.txt

添加文件并做一次提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
(my-branch)$ git add .
(my-branch)$ git commit -m 'foo.txt added'
(my-branch)$ foo.txt added
1 files changed, 1 insertions(+)
create mode 100644 foo.txt
(my-branch)$ git log

commit 4e3cd85a670ced7cc17a2b5d8d3d809ac88d5012
Author: siemiatj <siemiatj@example.com>
Date: Wed Jul 30 00:34:10 2014 +0200

foo.txt added

commit 69204cdf0acbab201619d95ad8295928e7f411d5
Author: Kate Hudson <katehudson@example.com>
Date: Tue Jul 29 13:14:46 2014 -0400

Fixes #6: Force pushing after amending commits

现在我们切回到主(master)分支,‘不小心的’删除my-branch分支

1
2
3
4
5
6
7
(my-branch)$ git checkout master
Switched to branch 'master'
Your branch is up-to-date with 'origin/master'.
(master)$ git branch -D my-branch
Deleted branch my-branch (was 4e3cd85).
(master)$ echo oh noes, deleted my branch!
oh noes, deleted my branch!

在这时候你应该想起了reflog, 一个升级版的日志,它存储了仓库(repo)里面所有动作的历史。

1
2
3
4
(master)$ git reflog
69204cd HEAD@{0}: checkout: moving from my-branch to master
4e3cd85 HEAD@{1}: commit: foo.txt added
69204cd HEAD@{2}: checkout: moving from master to my-branch

正如你所见,我们有一个来自删除分支的提交 hash(commit hash),接下来看看是否能恢复删除了的分支。

1
2
3
4
5
6
(master)$ git checkout -b my-branch-help
Switched to a new branch 'my-branch-help'
(my-branch-help)$ git reset --hard 4e3cd85
HEAD is now at 4e3cd85 foo.txt added
(my-branch-help)$ ls
README.md foo.txt

看! 我们把删除的文件找回来了。 Git 的 reflog 在 rebasing 出错的时候也是同样有用的。

我想删除一个分支

删除一个远程分支:

1
(master)$ git push origin --delete my-branch

你也可以:

1
(master)$ git push origin :my-branch

删除一个本地分支:

1
(master)$ git branch -D my-branch

我想从别人正在工作的远程分支签出(checkout)一个分支

首先, 从远程拉取(fetch) 所有分支:

1
(master)$ git fetch --all

假设你想要从远程的daves分支签出到本地的daves

1
2
3
(master)$ git checkout --track origin/daves
Branch daves set up to track remote branch daves from origin.
Switched to a new branch 'daves'

(--trackgit checkout -b [branch] [remotename]/[branch] 的简写)

这样就得到了一个daves分支的本地拷贝, 任何推过(pushed)的更新,远程都能看到.

标签

添加标签

1
$ git tag <tag-name>

添加标签并附加消息

1
$ git tag -a <tag-name>

删除标签

1
2
git tag -d <tag_name>
git push <remote> :refs/tags/<tag_name>

恢复已删除标签

如果你想恢复一个已删除标签(tag), 可以按照下面的步骤: 首先, 需要找到无法访问的标签(unreachable tag):

1
git fsck --unreachable | grep tag

记下这个标签(tag)的 hash,然后用 Git 的 update-ref:

1
git update-ref refs/tags/<tag_name> <hash>

这时你的标签(tag)应该已经恢复了。

Rebase 和 Merge

merge 与 rebase 虽然是 git 常用功能,但是强烈建议不要使用 git 命令来完成这项工作。

因为如果出现代码冲突,在没有代码比对工具的情况下,实在太艰难了。

你可以考虑使用各种 Git GUI 工具。

将分支合并到当前 HEAD 中

1
git merge <branch>

将当前 HEAD 版本重置到分支中

1
git rebase <branch>

撤销 rebase/merge

你可以合并(merge)或 rebase 了一个错误的分支, 或者完成不了一个进行中的 rebase/merge。 Git 在进行危险操作的时候会把原始的 HEAD 保存在一个叫 ORIG_HEAD 的变量里, 所以要把分支恢复到 rebase/merge 前的状态是很容易的。

1
(my-branch)$ git reset --hard ORIG_HEAD

我已经 rebase 过, 但是我不想强推(force push)

不幸的是,如果你想把这些变化(changes)反应到远程分支上,你就必须得强推(force push)。 是因你快进(Fast forward)了提交,改变了 Git 历史, 远程分支不会接受变化(changes),除非强推(force push)。这就是许多人使用 merge 工作流, 而不是 rebasing 工作流的主要原因之一, 开发者的强推(force push)会使大的团队陷入麻烦。使用时需要注意,一种安全使用 rebase 的方法是,不要把你的变化(changes)反映到远程分支上, 而是按下面的做:

1
2
3
4
(master)$ git checkout my-branch
(my-branch)$ git rebase -i master
(my-branch)$ git checkout master
(master)$ git merge --ff-only my-branch

更多, 参见 this SO thread.

我需要组合(combine)几个提交(commit)

假设你的工作分支将会做对于 master 的 pull-request。 一般情况下你不关心提交(commit)的时间戳,只想组合 所有 提交(commit) 到一个单独的里面, 然后重置(reset)重提交(recommit)。 确保主(master)分支是最新的和你的变化都已经提交了, 然后:

1
2
(my-branch)$ git reset --soft master
(my-branch)$ git commit -am "New awesome feature"

如果你想要更多的控制, 想要保留时间戳, 你需要做交互式 rebase (interactive rebase):

1
(my-branch)$ git rebase -i master

如果没有相对的其它分支, 你将不得不相对自己的HEAD 进行 rebase。 例如:你想组合最近的两次提交(commit), 你将相对于HEAD\~2 进行 rebase, 组合最近 3 次提交(commit), 相对于HEAD\~3, 等等。

1
(master)$ git rebase -i HEAD~2

在你执行了交互式 rebase 的命令(interactive rebase command)后, 你将在你的编辑器里看到类似下面的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
pick a9c8a1d Some refactoring
pick 01b2fd8 New awesome feature
pick b729ad5 fixup
pick e3851e8 another fix

## Rebase 8074d12..b729ad5 onto 8074d12
#
## Commands:
## p, pick = use commit
## r, reword = use commit, but edit the commit message
## e, edit = use commit, but stop for amending
## s, squash = use commit, but meld into previous commit
## f, fixup = like "squash", but discard this commit's log message
## x, exec = run command (the rest of the line) using shell
#
## These lines can be re-ordered; they are executed from top to bottom.
#
## If you remove a line here THAT COMMIT WILL BE LOST.
#
## However, if you remove everything, the rebase will be aborted.
#
## Note that empty commits are commented out

所有以 # 开头的行都是注释, 不会影响 rebase.

然后,你可以用任何上面命令列表的命令替换 pick, 你也可以通过删除对应的行来删除一个提交(commit)。

例如, 如果你想 单独保留最旧(first)的提交(commit),组合所有剩下的到第二个里面, 你就应该编辑第二个提交(commit)后面的每个提交(commit) 前的单词为 f:

1
2
3
4
pick a9c8a1d Some refactoring
pick 01b2fd8 New awesome feature
f b729ad5 fixup
f e3851e8 another fix

如果你想组合这些提交(commit) 并重命名这个提交(commit), 你应该在第二个提交(commit)旁边添加一个r,或者更简单的用s 替代 f:

1
2
3
4
pick a9c8a1d Some refactoring
pick 01b2fd8 New awesome feature
s b729ad5 fixup
s e3851e8 another fix

你可以在接下来弹出的文本提示框里重命名提交(commit)。

1
2
3
4
5
6
7
8
9
10
11
Newer, awesomer features

## Please enter the commit message for your changes. Lines starting
## with '#' will be ignored, and an empty message aborts the commit.
## rebase in progress; onto 8074d12
## You are currently editing a commit while rebasing branch 'master' on '8074d12'.
#
## Changes to be committed:
# modified: README.md
#

如果成功了, 你应该看到类似下面的内容:

1
(master)$ Successfully rebased and updated refs/heads/master.

安全合并(merging)策略

--no-commit 执行合并(merge)但不自动提交, 给用户在做提交前检查和修改的机会。 no-ff 会为特性分支(feature branch)的存在过留下证据, 保持项目历史一致。

1
(master)$ git merge --no-ff --no-commit my-branch

我需要将一个分支合并成一个提交(commit)

1
(master)$ git merge --squash my-branch

我只想组合(combine)未推的提交(unpushed commit)

有时候,在将数据推向上游之前,你有几个正在进行的工作提交(commit)。这时候不希望把已经推(push)过的组合进来,因为其他人可能已经有提交(commit)引用它们了。

1
(master)$ git rebase -i @{u}

这会产生一次交互式的 rebase(interactive rebase), 只会列出没有推(push)的提交(commit), 在这个列表时进行 reorder/fix/squash 都是安全的。

检查是否分支上的所有提交(commit)都合并(merge)过了

检查一个分支上的所有提交(commit)是否都已经合并(merge)到了其它分支, 你应该在这些分支的 head(或任何 commits)之间做一次 diff:

1
(master)$ git log --graph --left-right --cherry-pick --oneline HEAD...feature/120-on-scroll

这会告诉你在一个分支里有而另一个分支没有的所有提交(commit), 和分支之间不共享的提交(commit)的列表。 另一个做法可以是:

1
(master)$ git log master ^feature/120-on-scroll --no-merges

交互式 rebase(interactive rebase)可能出现的问题

这个 rebase 编辑屏幕出现’noop’

如果你看到的是这样:

1
noop

这意味着你 rebase 的分支和当前分支在同一个提交(commit)上, 或者 领先(ahead) 当前分支。 你可以尝试:

  • 检查确保主(master)分支没有问题
  • rebase HEAD\~2 或者更早

有冲突的情况

如果你不能成功的完成 rebase, 你可能必须要解决冲突。

首先执行 git status 找出哪些文件有冲突:

1
2
3
4
5
6
7
(my-branch)$ git status
On branch my-branch
Changes not staged for commit:
(use "git add <file>..." to update what will be committed)
(use "git checkout -- <file>..." to discard changes in working directory)

modified: README.md

在这个例子里面, README.md 有冲突。 打开这个文件找到类似下面的内容:

1
2
3
4
5
<<<<<<< HEAD
some code
=========
some code
>>>>>>> new-commit

你需要解决新提交的代码(示例里, 从中间==线到new-commit的地方)与HEAD 之间不一样的地方.

有时候这些合并非常复杂,你应该使用可视化的差异编辑器(visual diff editor):

1
(master*)$ git mergetool -t opendiff

在你解决完所有冲突和测试过后, git add 变化了的(changed)文件, 然后用git rebase --continue 继续 rebase。

1
2
(my-branch)$ git add README.md
(my-branch)$ git rebase --continue

如果在解决完所有的冲突过后,得到了与提交前一样的结果, 可以执行git rebase --skip

任何时候你想结束整个 rebase 过程,回来 rebase 前的分支状态, 你可以做:

1
(my-branch)$ git rebase --abort

查看信息

显示工作路径下已修改的文件:git status

显示与上次提交版本文件的不同:git diff

显示提交历史:

1
2
3
4
5
6
7
8
# 从最新提交开始,显示所有的提交记录(显示hash, 作者信息,提交的标题和时间)
$ git log

# 显示某个用户的所有提交
$ git log --author="username"

# 显示某个文件的所有修改
$ git log -p <file>

显示搜索内容:

1
2
3
4
5
# 从当前目录的所有文件中查找文本内容
$ git grep "Hello"

# 在某一版本中搜索文本
$ git grep "Hello" v2.5

其他

克隆所有子模块

1
git clone --recursive git://github.com/foo/bar.git

如果已经克隆了:

1
git submodule update --init --recursive

已删除补丁(patch)

如果某人在 GitHub 上给你发了一个 pull request, 但是然后他删除了他自己的原始 fork, 你将没法克隆他们的提交(commit)或使用 git am。在这种情况下, 最好手动的查看他们的提交(commit),并把它们拷贝到一个本地新分支,然后做提交。

做完提交后, 再修改作者,参见变更作者。 然后, 应用变化, 再发起一个新的 pull request。

跟踪文件(Tracking Files)

我只想改变一个文件名字的大小写,而不修改内容

1
(master)$ git mv --force myfile MyFile

我想从 Git 删除一个文件,但保留该文件

1
(master)$ git rm --cached log.txt

Fork 项目

GitHub 中 Fork 是 服务端的代码仓库克隆(即 新克隆出来的代码仓库在远程服务端),包含了原来的仓库(即 upstream repository,上游仓库)所有内容,如分支、Tag、提交。代码托管服务(如 Github、BitBucket)提供了方便的完成 Fork 操作的功能(在仓库页面点一下 Fork 按钮)。这样有了一个你自己的可以自由提交的远程仓库,然后可以通过的 Pull Request 把你的提交贡献回 原仓库。而对于原仓库 Owner 来说,鼓励别人 Fork 自己的仓库,通过 Pull Request 给自己的仓库做贡献,也能提高了自己仓库的知名度。

参考:Fork a repo

(1)执行 git remote -v,您将看到当前为 fork 配置的远程存储库。

(2)添加上游项目的仓库地址

1
git remote add upstream <github仓库地址>

(3)确认是否添加成功,再次键入 git remote -v

(4)获取上游项目更新,可以执行 git fetch upstream

(5)同步上游项目的代码到新仓库

1
2
3
4
# merge
git merge upstream/master
# rebase
git rebase upstream/master origin/master

我不知道我做错了些什么

你把事情搞砸了:你 重置(reset) 了一些东西, 或者你合并了错误的分支, 亦或你强推了后找不到你自己的提交(commit)了。有些时候, 你一直都做得很好, 但你想回到以前的某个状态。

这就是 git reflog 的目的, reflog 记录对分支顶端(the tip of a branch)的任何改变, 即使那个顶端没有被任何分支或标签引用。基本上, 每次 HEAD 的改变, 一条新的记录就会增加到reflog。遗憾的是,这只对本地分支起作用,且它只跟踪动作 (例如,不会跟踪一个没有被记录的文件的任何改变)。

1
2
3
4
(master)$ git reflog
0a2e358 HEAD@{0}: reset: moving to HEAD\~2
0254ea7 HEAD@{1}: checkout: moving from 2.2 to master
c10f740 HEAD@{2}: checkout: moving from master to 2.2

上面的 reflog 展示了从 master 分支签出(checkout)到 2.2 分支,然后再签回。 那里,还有一个硬重置(hard reset)到一个较旧的提交。最新的动作出现在最上面以 HEAD@{0}标识.

如果事实证明你不小心回移(move back)了提交(commit), reflog 会包含你不小心回移前 master 上指向的提交(0254ea7)。

1
git reset --hard 0254ea7

然后使用 git reset 就可以把 master 改回到之前的 commit,这提供了一个在历史被意外更改情况下的安全网。

📚 资料

一篇文章让你彻底掌握 Scala

Scala 是大数据领域的热门语言,如:Akka、Kafka,所以,想要学习大数据顶级开源项目的源码,有必要具备一定的 Scala 基础。

基本语法

Scala 基本语法需要注意以下几点:

  • 区分大小写 - Scala 是大小写敏感的。
  • 类名 - 对于所有的类名的第一个字母要大写。示例:class MyFirstScalaClass
  • 方法名称 - 所有的方法名称的第一个字母用小写。示例:def myMethodName()
  • 程序文件名 - 程序文件的名称应该与对象名称完全匹配(新版本不需要了,但建议保留这种习惯)。示例: 假设”HelloWorld”是对象的名称。那么该文件应保存为’HelloWorld.scala”
  • def main(args: Array[String]) - Scala 程序从 main() 方法开始处理,这是每一个 Scala 程序的强制程序入口部分。
  • 一行中只有空格或者带有注释,Scala 会认为其是空行,会忽略它。标记可以被空格或者注释来分割。
  • Scala 是面向行的语言,语句可以用分号(;)结束或换行符。

注释

Scala 类似 Java 支持单行和多行注释。

【示例】单行和多行注释

1
2
3
4
5
6
7
8
9
object HelloWorld {
/*
* 这是我的第一个 Scala 程序
* 以下程序将输出'Hello World!'
*/
def main(args: Array[String]) {
println("Hello, world!") // 输出 Hello World
}
}

变量

在 Scala 中,使用关键词 var 声明变量,使用关键词 val 声明常量。

【示例】声明变量

1
2
var myVar : String = "Foo"
var myVar : String = "Too"

【示例】声明常量

1
val myVal : String = "Foo"

变量类型声明

变量的类型在变量名之后等号之前声明。定义变量的类型的语法格式如下:

1
2
3
4
// 声明变量类型
var VariableName : DataType [= Initial Value]
// 声明常量类型
val VariableName : DataType [= Initial Value]

在 Scala 中声明变量和常量不一定要指明数据类型,在没有指明数据类型的情况下,其数据类型是通过变量或常量的初始值推断出来的。所以,如果在没有指明数据类型的情况下声明变量或常量必须要给出其初始值,否则将会报错。

1
2
3
var myVar = 10;
val myVal = "Hello, Scala!";
val xmax, ymax = 100

数据类型

Scala 与 Java 有着相同的数据类型:

数据类型 描述
Byte 8 位有符号补码整数。数值区间为 -128 到 127
Short 16 位有符号补码整数。数值区间为 -32768 到 32767
Int 32 位有符号补码整数。数值区间为 -2147483648 到 2147483647
Long 64 位有符号补码整数。数值区间为 -9223372036854775808 到 9223372036854775807
Float 32 位, IEEE 754 标准的单精度浮点数
Double 64 位 IEEE 754 标准的双精度浮点数
Char 16 位无符号 Unicode 字符, 区间值为 U+0000 到 U+FFFF
String 字符序列
Boolean true 或 false
Unit 表示无值,和其他语言中 void 等同。用作不返回任何结果的方法的结果类型。Unit 只有一个实例值,写成()。
Null null 或空引用
Nothing Nothing 类型在 Scala 的类层级的最底端;它是任何其他类型的子类型。
Any Any 是所有其他类的超类
AnyRef AnyRef 类是 Scala 里所有引用类(reference class)的基类

上表中列出的数据类型都是对象,也就是说 scala 没有 java 中的原生类型。在 scala 是可以对数字等基础类型调用方法的。

数组

Scala 数组声明的语法格式:

1
2
3
4
// 方式一
var z:Array[String] = new Array[String](3)
// 方式二
var z = new Array[String](3)

逻辑控制语句

条件语句

【示例】条件语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
object IfDemo {
def main(args: Array[String]) {
var x = 30;

if (x == 10) {
println("X 的值为 10");
} else if (x == 20) {
println("X 的值为 20");
} else if (x == 30) {
println("X 的值为 30");
} else {
println("无法判断 X 的值");
}
}
}

循环语句

和 Java 一样,Scala 支持 whiledo ... whilefor 三种循环语句。

1
2
3
4
5
6
7
8
9
10
11
12
object WhileDemo {
def main(args: Array[String]) {
// 局部变量
var a = 10;

// while 循环执行
while (a < 20) {
println("Value of a: " + a);
a = a + 1;
}
}
}

**scala 不支持 breakcontinue**。但是,可以通过 Breaks 对象来进行循环控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import scala.util.control._

object BreakDemo {
def main(args: Array[String]) {
var a = 0;
var b = 0;
val numList1 = List(1, 2, 3, 4, 5);
val numList2 = List(11, 12, 13);

val outer = new Breaks;
val inner = new Breaks;

outer.breakable {
for (a <- numList1) {
println("Value of a: " + a);
inner.breakable {
for (b <- numList2) {
println("Value of b: " + b);
if (b == 12) {
inner.break;
}
}
} // 内嵌循环中断
}
} // 外部循环中断
}
}

模式匹配

scala 的 match 对应 Java 里的 switch,但是写在选择器表达式之后。即: 选择器 match {备选项}。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* @author peng.zhang
*/
object MatchDemo {
def main(args: Array[String]) {
println(matchTest("two"))
println(matchTest("test"))
println(matchTest(1))
println(matchTest(6))

}

def matchTest(x: Any): Any = x match {
case 1 => "one"
case "two" => 2
case y: Int => "scala.Int"
case _ => "many"
}
}

运算符

Scala 含有丰富的内置运算符,包括以下几种类型:

  • 算术运算符:+-*/%
  • 关系运算符:==!=><>=<=
  • 逻辑运算符:&&||!
  • 位运算符:~&|^<<>>>>> (无符号右移)
  • 赋值运算符:=

方法与函数

Scala 有方法与函数,二者在语义上的区别很小。

Scala 中的方法跟 Java 的类似,方法是组成类的一部分。

Scala 中的函数则是一个完整的对象,Scala 中的函数其实就是继承了 Trait 的类的对象。

Scala 中使用 val 语句可以定义函数,def 语句定义方法。

【示例】

1
2
3
4
class Test {
def m(x: Int) = x + 3
val f = (x: Int) => x + 3
}

闭包

闭包是一个函数,返回值依赖于声明在函数外部的一个或多个变量。

闭包通常来讲可以简单的认为是可以访问一个函数里面局部变量的另外一个函数。

如下面这段匿名的函数:

1
val multiplier = (i:Int) => i * 10

函数体内有一个变量 i,它作为函数的一个参数。如下面的另一段代码:

1
val multiplier = (i:Int) => i * factor

在 multiplier 中有两个变量:i 和 factor。其中的一个 i 是函数的形式参数,在 multiplier 函数被调用时,i 被赋予一个新的值。然而,factor 不是形式参数,而是自由变量,考虑下面代码:

1
2
var factor = 3
val multiplier = (i:Int) => i * factor

这里我们引入一个自由变量 factor,这个变量定义在函数外面。

这样定义的函数变量 multiplier 成为一个”闭包”,因为它引用到函数外面定义的变量,定义这个函数的过程是将这个自由变量捕获而构成一个封闭的函数。

1
2
3
4
5
6
7
8
9
object ClosureDemo {
def main(args: Array[String]) {
println("muliplier(1) value = " + multiplier(1))
println("muliplier(2) value = " + multiplier(2))
}

var factor = 3
val multiplier = (i: Int) => i * factor
}

集合

Scala 集合支持 List、Set、Map、元祖、Option。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 定义整型 List
val x = List(1,2,3,4)

// 定义 Set
val x = Set(1,3,5,7)

// 定义 Map
val x = Map("one" -> 1, "two" -> 2, "three" -> 3)

// 创建两个不同类型元素的元组
val x = (10, "Runoob")

// 定义 Option
val x:Option[Int] = Some(5)

迭代器

迭代器 it 的两个基本操作是 nexthasNext

调用 it.next() 会返回迭代器的下一个元素,并且更新迭代器的状态。

调用 it.hasNext() 用于检测集合中是否还有元素。

1
2
3
4
5
6
7
8
9
object Test {
def main(args: Array[String]) {
val it = Iterator("Baidu", "Google", "Runoob", "Taobao")

while (it.hasNext){
println(it.next())
}
}
}

类和对象

类是对象的抽象,而对象是类的具体实例。类是抽象的,不占用内存,而对象是具体的,占用存储空间。类是用于创建对象的蓝图,它是一个定义包括在特定类型的对象中的方法和变量的软件模板。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Point(val xc: Int, val yc: Int) {
var x: Int = xc
var y: Int = yc

def move(dx: Int, dy: Int) {
x = x + dx
y = y + dy
println("x 的坐标点 : " + x);
println("y 的坐标点 : " + y);
}
}

class Location(override val xc: Int, override val yc: Int, val zc: Int)
extends Point(xc, yc) {
var z: Int = zc

def move(dx: Int, dy: Int, dz: Int) {
x = x + dx
y = y + dy
z = z + dz
println("x 的坐标点 : " + x);
println("y 的坐标点 : " + y);
println("z 的坐标点 : " + z);
}
}

object Test {
def main(args: Array[String]) {
val loc = new Location(10, 20, 15);

// 移到一个新的位置
loc.move(10, 10, 5);
}
}

Trait

Scala Trait(特征) 相当于 Java 的接口,实际上它比接口还功能强大。

与接口不同的是,它还可以定义属性和方法的实现。

一般情况下 Scala 的类只能够继承单一父类,但是如果是 Trait(特征) 的话就可以继承多个,从结果来看就是实现了多重继承。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
trait Equal {
def isEqual(x: Any): Boolean

def isNotEqual(x: Any): Boolean = !isEqual(x)
}

class Point(xc: Int, yc: Int) extends Equal {
var x: Int = xc
var y: Int = yc

def isEqual(obj: Any) =
obj.isInstanceOf[Point] &&
obj.asInstanceOf[Point].x == x
}

object Test {
def main(args: Array[String]) {
val p1 = new Point(2, 3)
val p2 = new Point(2, 4)
val p3 = new Point(3, 3)

println(p1.isNotEqual(p2))
println(p1.isNotEqual(p3))
println(p1.isNotEqual(2))
}
}

异常

Scala 抛出异常的方法和 Java 一样,使用 throw 关键词。

【示例】抛出异常

1
throw new IllegalArgumentException

【示例】捕获异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import java.io.{FileNotFoundException, FileReader, IOException}

object Test {
def main(args: Array[String]) {
try {
val f = new FileReader("input.txt")
} catch {
case ex: FileNotFoundException => {
println("Missing file exception")
}
case ex: IOException => {
println("IO Exception")
}
} finally {
println("Exiting finally...")
}
}
}

输入输出

读取用户输入

使用 scala.io.StdIn.readLine() 方法读取用户输入

1
2
3
4
5
6
7
8
object StdInDemo {
def main(args: Array[String]) {
print("请输入内容: ")
val line = StdIn.readLine()

println("你输入的是: " + line)
}
}

读取文件内容

1
2
3
4
5
6
7
8
9
object SourceDemo {
def main(args: Array[String]) {
println("文件内容为:")

Source.fromFile("test.txt").foreach {
print
}
}
}

定义包

Scala 使用 package 关键字定义包,在 Scala 将代码定义到某个包中有两种方式:

第一种方法和 Java 一样,在文件的头定义包名,这种方法就后续所有代码都放在该包中。 比如:

1
2
package com.runoob
class HelloWorld

第二种方法有些类似 C#,如:

1
2
3
package com.runoob {
class HelloWorld
}

引用

Scala 使用 import 关键字引用包。

1
2
3
4
5
6
7
import java.awt.Color  // 引入Color

import java.awt._ // 引入包内所有成员

def handler(evt: event.ActionEvent) { // java.awt.event.ActionEvent
... // 因为引入了java.awt,所以可以省去前面的部分
}

import 语句可以出现在任何地方,而不是只能在文件顶部。import 的效果从开始延伸到语句块的结束。这可以大幅减少名称冲突的可能性。

如果想要引入包中的几个成员,可以使用 selector(选取器):

1
2
3
4
5
6
7
import java.awt.{Color, Font}

// 重命名成员
import java.util.{HashMap => JavaHashMap}

// 隐藏成员
import java.util.{HashMap => _, _} // 引入了util包的所有成员,但是HashMap被隐藏了

注意:默认情况下,Scala 总会引入 java.lang._ 、 scala._ 和 Predef._,这里也能解释,为什么以 scala 开头的包,在使用时都是省去 scala.的。

访问修饰符

Scala 访问修饰符基本和 Java 的一样,分别有:private,protected,public。

如果没有指定访问修饰符,默认情况下,Scala 对象的访问级别都是 public。

Scala 中的 private 限定符,比 Java 更严格,在嵌套类情况下,外层类甚至不能访问被嵌套类的私有成员。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Outer {

class Inner {
private def f() {
println("f")
}

class InnerMost {
f() // 正确
}

}

(new Inner).f() //错误
}

参考资料

Kafka 生产者

生产者简介

不管是把 Kafka 作为消息队列系统、还是数据存储平台,总是需要一个可以向 Kafka 写入数据的生产者和一个可以从 Kafka 读取数据的消费者,或者是一个兼具两种角色的应用程序。

使用 Kafka 的场景很多,诉求也各有不同,主要有:是否允许丢失消息?是否接受重复消息?是否有严格的延迟和吞吐量要求?

不同的场景对于 Kafka 生产者 API 的使用和配置会有直接的影响。

生产者传输实体

Kafka Producer 发送的数据对象叫做 ProducerRecord ,它有 4 个关键参数:

  • Topic - 主题
  • Partition - 分区(非必填)
  • Key - 键(非必填)
  • Value - 值

生产者发送流程

Kafka 生产者发送消息流程:

(1)序列化 - 发送前,生产者要先把键和值序列化。

(2)分区 - 数据被传给分区器。如果在 ProducerRecord 中已经指定了分区,那么分区器什么也不会做;否则,分区器会根据 ProducerRecord 的键来选择一个分区。选定分区后,生产者就知道该把消息发送给哪个主题的哪个分区。

(3)批次传输 - 接着,这条记录会被添加到一个记录批次中。这个批次中的所有消息都会被发送到相同的主题和分区上。有一个独立的线程负责将这些记录批次发送到相应 Broker 上。

  • 批次,就是一组消息,这些消息属于同一个主题和分区
  • 发送时,会把消息分成批次传输,如果每次只发送一个消息,会占用大量的网路开销。

(4)响应 - 服务器收到消息会返回一个响应。

  • 如果成功,则返回一个 RecordMetaData 对象,它包含了主题、分区、偏移量;
  • 如果失败,则返回一个错误。生产者在收到错误后,可以进行重试,重试次数可以在配置中指定。失败一定次数后,就返回错误消息。

img

生产者向 Broker 发送消息时是怎么确定向哪一个 Broker 发送消息?

  • 生产者会向任意 broker 发送一个元数据请求(MetadataRequest),获取到每一个分区对应的 Leader 信息,并缓存到本地。
  • 生产者在发送消息时,会指定 Partition 或者通过 key 得到到一个 Partition,然后根据 Partition 从缓存中获取相应的 Leader 信息。

img

生产者 API

Kafka 的 Java 生产者 API 主要的对象就是 KafkaProducer。通常我们开发一个生产者的步骤有 4 步。

  1. 构造生产者对象所需的参数对象。
  2. 利用第 1 步的参数对象,创建 KafkaProducer 对象实例。
  3. 使用 KafkaProducersend 方法发送消息。
  4. 调用 KafkaProducerclose 方法关闭生产者并释放各种系统资源。

创建生产者

Kafka 生产者核心配置:

  • bootstrap.servers - 指定了 Producer 启动时要连接的 Broker 地址。注:如果你指定了 1000 个 Broker 连接信息,那么,Producer 启动时就会首先创建与这 1000 个 Broker 的 TCP 连接。在实际使用过程中,并不建议把集群中所有的 Broker 信息都配置到 bootstrap.servers 中,通常你指定 3 ~ 4 台就足以了。因为 Producer 一旦连接到集群中的任一台 Broker,就能拿到整个集群的 Broker 信息,故没必要为 bootstrap.servers 指定所有的 Broker。
  • key.serializer - 键的序列化器。
  • value.serializer - 值的序列化器。
1
2
3
4
5
6
7
8
9
10
// 指定生产者的配置
final Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
// 设置 key 的序列化器
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 设置 value 的序列化器
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 使用配置初始化 Kafka 生产者
Producer<String, String> producer = new KafkaProducer<>(properties);

异步发送

直接发送消息,不关心消息是否到达。

这种方式吞吐量最高,但有小概率会丢失消息。

【示例】异步发送

1
2
3
4
5
6
7
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}

同步发送

返回一个 Future 对象,调用 get() 方法,会一直阻塞等待 Broker 返回结果。

这是一种可靠传输方式,但吞吐量最差。

【示例】同步发送

1
2
3
4
5
6
7
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}

异步响应发送

代码如下,异步方式相对于“发送并忽略返回”的方式的不同在于:在异步返回时可以执行一些操作,如:抛出异常、记录错误日志。

这是一个折中的方案,即兼顾吞吐量,也保证消息不丢失。

【示例】异步响应发送

首先,定义一个 callback:

1
2
3
4
5
6
7
8
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}

然后,使用这个 callback:

1
2
3
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());

关闭连接

调用 producer.close() 方法可以关闭 Kafka 生产者连接。

1
2
3
4
5
6
7
8
9
Producer<String, String> producer = new KafkaProducer<>(properties);
try {
producer.send(new ProducerRecord<>(topic, msg));
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭连接
producer.close();
}

生产者的连接

Apache Kafka 的所有通信都是基于 TCP 的。无论是生产者、消费者,还是 Broker 之间的通信都是如此。

选用 TCP 连接是由于 TCP 本身提供的一些高级功能,如多路复用请求以及同时轮询多个连接的能力。

何时创建 TCP 连接

Kafka 生产者创建连接有三个时机:

(1)在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时,首先会创建与 bootstrap.servers 中所有 Broker 的 TCP 连接。

(2)当 Producer 更新集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。

  • 场景一:当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。
  • 场景二:Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。该参数的默认值是 300000,即 5 分钟,也就是说不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。

(3)当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,会创建一个 TCP 连接。

何时关闭 TCP 连接

Producer 端关闭 TCP 连接的方式有两种:一种是用户主动关闭;一种是 Kafka 自动关闭

主动关闭是指调用 producer.close() 方法来关闭生产者连接;甚至包括用户调用 kill -9 主动“杀掉”Producer 应用。

如果设置 Producer 端 connections.max.idle.ms 参数大于 0(默认为 9 分钟),意味着,在 connections.max.idle.ms 指定时间内,如果没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。如果设置该参数为 -1,TCP 连接将成为永久长连接。

值得注意的是,在第二种方式中,TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被中断。

序列化

Kafka 内置了常用 Java 基础类型的序列化器,如:StringSerializerIntegerSerializerDoubleSerializer 等。

但如果要传输较为复杂的对象,推荐使用序列化性能更高的工具,如:Avro、Thrift、Protobuf 等。

使用方式是通过实现 org.apache.kafka.common.serialization.Serializer 接口来引入自定义的序列化器。

分区

什么是分区

Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。

在 Kafka 中,任意一个 Topic 维护了一组 Partition 日志,如下所示:

img

每个 Partition 都是一个单调递增的、不可变的日志记录,以不断追加的方式写入数据。Partition 中的每条记录会被分配一个单调递增的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的每条记录。

为什么要分区

为什么 Kafka 的数据结构采用三级结构?

分区的作用就是提供负载均衡的能力,以实现系统的高伸缩性(Scalability)。

不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的机器节点来增加整体系统的吞吐量。

分区策略

所谓分区策略是决定生产者将消息发送到哪个分区的算法,也就是负载均衡算法。

前文中已经提到,Kafka 生产者发送消息使用的对象 ProducerRecord ,可以选填 Partition 和 Key。不过,大多数应用会用到 key。key 有两个作用:作为消息的附加信息;也可以用来决定消息该被写到 Topic 的哪个 Partition,拥有相同 key 的消息将被写入同一个 Partition。

如果 ProducerRecord 指定了 Partition,则分区器什么也不做,否则分区器会根据 key 选择一个 Partition 。

  • 没有 key 时的分发逻辑:每隔 topic.metadata.refresh.interval.ms 的时间,随机选择一个 partition。这个时间窗口内的所有记录发送到这个 partition。发送数据出错后会重新选择一个 partition。
  • 根据 key 分发:Kafka 的选择分区策略是:根据 key 求 hash 值,然后将 hash 值对 partition 数量求模。这里的关键点在于,同一个 key 总是被映射到同一个 Partition 上。所以,在选择分区时,Kafka 会使用 Topic 的所有 Partition ,而不仅仅是可用的 Partition。这意味着,如果写入数据的 Partition 是不可用的,那么就会出错

自定义分区策略

如果 Kafka 的默认分区策略无法满足实际需要,可以自定义分区策略。需要显式地配置生产者端的参数 partitioner.class。这个参数该怎么设定呢?

首先,要实现 org.apache.kafka.clients.producer.Partitioner 接口。这个接口定义了两个方法:partitionclose,通常只需要实现最重要的 partition 方法。我们来看看这个方法的方法签名:

1
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

这里的 topickeykeyBytesvaluevalueBytes 都属于消息数据,cluster 则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。

接着,设置 partitioner.class 参数为自定义类的全限定名,那么生产者程序就会按照你的代码逻辑对消息进行分区。

负载均衡算法常见的有:

  • 随机算法
  • 轮询算法
  • 最小活跃数算法
  • 源地址哈希算法

可以根据实际需要去实现。

压缩

Kafka 的消息格式

目前,Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本。V2 版本是 Kafka 0.11.0.0 中正式引入的。

不论是哪个版本,Kafka 的消息层次都分为两层:消息集合(message set)以及消息(message)。一个消息集合中包含若干条日志项(record item),而日志项才是真正封装消息的地方。Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。

那么社区引入 V2 版本的目的是什么呢?V2 版本主要是针对 V1 版本的一些弊端做了修正。

在 V1 版本中,每条消息都需要执行 CRC 校验。但有些情况下消息的 CRC 值是会发生变化的。比如在 Broker 端可能会对消息时间戳字段进行更新,那么重新计算之后的 CRC 值也会相应更新;再比如 Broker 端在执行消息格式转换时(主要是为了兼容老版本客户端程序),也会带来 CRC 值的变化。鉴于这些情况,再对每条消息都执行 CRC 校验就有点没必要了,不仅浪费空间还耽误 CPU 时间。

因此,在 V2 版本中,只对消息集合执行 CRC 校验。V2 版本还有一个和压缩息息相关的改进,就是保存压缩消息的方法发生了变化。之前 V1 版本中保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息体字段中;而 V2 版本的做法是对整个消息集合进行压缩。显然后者应该比前者有更好的压缩效果。

Kafka 的压缩流程

Kafka 的压缩流程,一言以概之——Producer 端压缩、Broker 端保持、Consumer 端解压缩。

压缩过程

在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。

生产者程序中配置 compression.type 参数即表示启用指定类型的压缩算法。

【示例】开启 GZIP 的 Producer 对象

1
2
3
4
5
6
7
8
9
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启 GZIP 压缩
props.put("compression.type", "gzip");

Producer<String, String> producer = new KafkaProducer<>(props);

通常,Broker 从 Producer 端接收到消息后,不做任何处理。以下两种情况除外:

  • 情况一:Broker 端指定了和 Producer 端不同的压缩算法。显然,应该尽量避免这种情况。

  • 情况二:Broker 端发生了消息格式转换。所谓的消息格式转换,主要是为了兼容老版本的消费者程序。在一个生产环境中,Kafka 集群中同时保存多种版本的消息格式非常常见。为了兼容老版本的格式,Broker 端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,它还让 Kafka 丧失了引以为豪的 Zero Copy 特性。

所谓零拷贝,说的是当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝,从而实现快速的数据传输。因此如果 Kafka 享受不到这个特性的话,性能必然有所损失,所以尽量保证消息格式的统一吧,这样不仅可以避免不必要的解压缩 / 重新压缩,对提升其他方面的性能也大有裨益。

解压缩的过程

通常来说解压缩发生在消费者程序中,也就是说 Producer 发送压缩消息到 Broker 后,Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时,Broker 依然原样发送出去,当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息。

那么现在问题来了,Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。Kafka 会将启用了哪种压缩算法封装进消息集合中,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。

压缩算法

在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4。从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd)。

在实际使用中,GZIP、Snappy、LZ4 甚至是 zstd 的表现各有千秋。但对于 Kafka 而言,它们的性能测试结果却出奇得一致,即在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy。

如果客户端机器 CPU 资源有很多富余,强烈建议开启 zstd 压缩,这样能极大地节省网络资源消耗

何时启用压缩

何时启用压缩是比较合适的时机呢?

压缩是在 Producer 端完成的工作,那么启用压缩的一个条件就是 Producer 程序运行机器上的 CPU 资源要很充足。如果 Producer 运行机器本身 CPU 已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会适得其反。

如果环境中带宽资源有限,那么也建议开启压缩。

幂等性

什么是幂等性

幂等(idempotent、idempotence)是一个数学与计算机学概念,指的是:一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。

Kafka Producer 的幂等性

在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功能。在此之前,Kafka 向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的情况。在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)

enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。当然,实际的实现原理并没有这么简单,但你大致可以这么理解。

我们必须要了解幂等性 Producer 的作用范围:

  • 首先,**enable.idempotence 只能保证单分区上的幂等性**,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
  • 其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。

如果想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!

PID 和 Sequence Number

为了实现 Producer 的幂等性,Kafka 引入了 Producer ID(即 PID)和 Sequence Number。

  • PID。每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个 PID 对用户是不可见的。
  • Sequence Numbler。对于每个 PID,该 Producer 发送数据的每个 <Topic, Partition> 都对应一个从 0 开始单调递增的 Sequence Number。

Broker 端在缓存中保存了这 seq number,对于接收的每条消息,如果其序号比 Broker 缓存中序号大于 1 则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。但是,只能保证单个 Producer 对于同一个 <Topic, Partition> 的 Exactly Once 语义。不能保证同一个 Producer 一个 topic 不同的 partion 幂等。

img
实现幂等之后:

img

生成 PID 的流程

在执行创建事务时,如下:

1
Producer<String, String> producer = new KafkaProducer<String, String>(props);

会创建一个 Sender,并启动线程,执行如下 run 方法,在 maybeWaitForProducerId()中生成一个 producerId,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
====================================
类名:Sender
====================================

void run(long now) {
if (transactionManager != null) {
try {
........
if (!transactionManager.isTransactional()) {
// 为idempotent producer生成一个producer id
maybeWaitForProducerId();
} else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
........

幂等性的应用实例

(1)配置属性

需要设置:

  • enable.idempotence,需要设置为 ture,此时就会默认把 acks 设置为 all,所以不需要再设置 acks 属性了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 指定生产者的配置
final Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
// 设置 key 的序列化器
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 设置 value 的序列化器
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 开启幂等性
properties.put("enable.idempotence", true);
// 设置重试次数
properties.put("retries", 3);
//Reduce the no of requests less than 0
properties.put("linger.ms", 1);
// buffer.memory 控制生产者可用于缓冲的内存总量
properties.put("buffer.memory", 33554432);

// 使用配置初始化 Kafka 生产者
producer = new KafkaProducer<>(properties);

(2)发送消息

跟一般生产者一样,如下

1
2
3
4
5
6
7
public void produceIdempotMessage(String topic, String message) {
// 创建Producer
Producer producer = buildIdempotProducer();
// 发送消息
producer.send(new ProducerRecord<String, String>(topic, message));
producer.flush();
}

此时,因为我们并没有配置 transaction.id 属性,所以不能使用事务相关 API,如下

1
producer.initTransactions();

否则会出现如下错误:

1
2
3
4
Exception in thread “main” java.lang.IllegalStateException: Transactional method invoked on a non-transactional producer.
at org.apache.kafka.clients.producer.internals.TransactionManager.ensureTransactional(TransactionManager.java:777)
at org.apache.kafka.clients.producer.internals.TransactionManager.initializeTransactions(TransactionManager.java:202)
at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:544)

Kafka 事务

Kafka 的事务概念是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务,或者说是是一个原子操作),同时成功或者失败

消息可靠性保障,由低到高为:

  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

Kafka 支持事务功能主要是为了实现精确一次处理语义的,而精确一次处理是实现流处理的基石。

事务

Kafka 自 0.11 版本开始提供了对事务的支持,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息

事务型 Producer

事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。

事务属性实现前提是幂等性,即在配置事务属性 transaction.id 时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。

在事务属性之前先引入了生产者幂等性,它的作用为:

  • 生产者多次发送消息可以封装成一个原子操作,要么都成功,要么失败。
  • consumer-transform-producer 模式下,因为消费者提交偏移量出现问题,导致重复消费。需要将这个模式下消费者提交偏移量操作和生产者一系列生成消息的操作封装成一个原子操作。

消费者提交偏移量导致重复消费消息的场景:消费者在消费消息完成提交便宜量 o2 之前挂掉了(假设它最近提交的偏移量是 o1),此时执行再均衡时,其它消费者会重复消费消息(o1 到 o2 之间的消息)。

事务操作的 API

Producer 提供了 initTransactions, beginTransaction, sendOffsets, commitTransaction, abortTransaction 五个事务方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 初始化事务。需要注意的有:
* 1、前提
* 需要保证transation.id属性被配置。
* 2、这个方法执行逻辑是:
* (1)Ensures any transactions initiated by previous instances of the producer with the same
* transactional.id are completed. If the previous instance had failed with a transaction in
* progress, it will be aborted. If the last transaction had begun completion,
* but not yet finished, this method awaits its completion.
* (2)Gets the internal producer id and epoch, used in all future transactional
* messages issued by the producer.
*
*/
public void initTransactions();

/**
* 开启事务
*/
public void beginTransaction() throws ProducerFencedException ;

/**
* 为消费者提供的在事务内提交偏移量的操作
*/
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException ;

/**
* 提交事务
*/
public void commitTransaction() throws ProducerFencedException;

/**
* 放弃事务,类似回滚事务的操作
*/
public void abortTransaction() throws ProducerFencedException ;

Kafka 事务相关配置

使用 kafka 的事务 api 时的一些注意事项:

  • 需要消费者的自动模式设置为 false,并且不能子再手动的进行执行 consumer#commitSync 或者 consumer#commitAsyc
  • 设置 Producer 端参数 transctional.id。最好为其设置一个有意义的名字。
  • 和幂等性 Producer 一样,开启 enable.idempotence = true。如果配置了 transaction.id,则此时 enable.idempotence 会被设置为 true
  • 消费者需要配置事务隔离级别 isolation.level。在 consume-trnasform-produce 模式下使用事务时,必须设置为 READ_COMMITTED
    • read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
    • read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

Kafka 事务应用示例

只有生成操作

创建一个事务,在这个事务操作中,只有生成消息操作。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 在一个事务只有生产消息操作
*/
public void onlyProduceInTransaction() {
Producer producer = buildProducer();

// 1.初始化事务
producer.initTransactions();

// 2.开启事务
producer.beginTransaction();

try {
// 3.kafka写操作集合
// 3.1 do业务逻辑

// 3.2 发送消息
producer.send(new ProducerRecord<String, String>("test", "transaction-data-1"));

producer.send(new ProducerRecord<String, String>("test", "transaction-data-2"));
// 3.3 do其他业务逻辑,还可以发送其他topic的消息。

// 4.事务提交
producer.commitTransaction();


} catch (Exception e) {
// 5.放弃事务
producer.abortTransaction();
}

}

创建生产者,代码如下,需要:

  • 配置 transactional.id 属性
  • 配置 enable.idempotence 属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 需要:
* 1、设置transactional.id
* 2、设置enable.idempotence
* @return
*/
private Producer buildProducer() {

// create instance for properties to access producer configs
Properties props = new Properties();

// bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
props.put("bootstrap.servers", "localhost:9092");

// 设置事务id
props.put("transactional.id", "first-transactional");

// 设置幂等性
props.put("enable.idempotence",true);

//Set acknowledgements for producer requests.
props.put("acks", "all");

//If the request fails, the producer can automatically retry,
props.put("retries", 1);

//Specify buffer size in config,这里不进行设置这个属性,如果设置了,还需要执行producer.flush()来把缓存中消息发送出去
//props.put("batch.size", 16384);

//Reduce the no of requests less than 0
props.put("linger.ms", 1);

//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);

// Kafka消息是以键值对的形式发送,需要设置key和value类型序列化器
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");


Producer<String, String> producer = new KafkaProducer<String, String>(props);

return producer;
}

消费-生产并存(consume-transform-produce)

在一个事务中,既有生产消息操作又有消费消息操作,即常说的 Consume-tansform-produce 模式。如下实例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 在一个事务内,即有生产消息又有消费消息
*/
public void consumeTransferProduce() {
// 1.构建上产者
Producer producer = buildProducer();
// 2.初始化事务(生成productId),对于一个生产者,只能执行一次初始化事务操作
producer.initTransactions();

// 3.构建消费者和订阅主题
Consumer consumer = buildConsumer();
consumer.subscribe(Arrays.asList("test"));
while (true) {
// 4.开启事务
producer.beginTransaction();

// 5.1 接受消息
ConsumerRecords<String, String> records = consumer.poll(500);

try {
// 5.2 do业务逻辑;
System.out.println("customer Message---");
Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();
for (ConsumerRecord<String, String> record : records) {
// 5.2.1 读取消息,并处理消息。print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());

// 5.2.2 记录提交的偏移量
commits.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()));


// 6.生产新的消息。比如外卖订单状态的消息,如果订单成功,则需要发送跟商家结转消息或者派送员的提成消息
producer.send(new ProducerRecord<String, String>("test", "data2"));
}

// 7.提交偏移量
producer.sendOffsetsToTransaction(commits, "group0323");

// 8.事务提交
producer.commitTransaction();

} catch (Exception e) {
// 7.放弃事务
producer.abortTransaction();
}
}
}

创建消费者代码,需要:

  • 将配置中的自动提交属性(auto.commit)进行关闭
  • 而且在代码里面也不能使用手动提交 commitSync( )或者 commitAsync( )
  • 设置 isolation.level
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 需要:
* 1、关闭自动提交 enable.auto.commit
* 2、isolation.level为
* @return
*/
public Consumer buildConsumer() {
Properties props = new Properties();
// bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
props.put("bootstrap.servers", "localhost:9092");
// 消费者群组
props.put("group.id", "group0323");
// 设置隔离级别
props.put("isolation.level","read_committed");
// 关闭自动提交
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);

return consumer;
}

只有消费操作

创建一个事务,在这个事务操作中,只有生成消息操作,如下代码。这种操作其实没有什么意义,跟使用手动提交效果一样,无法保证消费消息操作和提交偏移量操作在一个事务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 在一个事务只有消息操作
*/
public void onlyConsumeInTransaction() {
Producer producer = buildProducer();

// 1.初始化事务
producer.initTransactions();

// 2.开启事务
producer.beginTransaction();

// 3.kafka读消息的操作集合
Consumer consumer = buildConsumer();
while (true) {
// 3.1 接受消息
ConsumerRecords<String, String> records = consumer.poll(500);

try {
// 3.2 do业务逻辑;
System.out.println("customer Message---");
Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();
for (ConsumerRecord<String, String> record : records) {
// 3.2.1 处理消息 print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());

// 3.2.2 记录提交偏移量
commits.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()));
}

// 4.提交偏移量
producer.sendOffsetsToTransaction(commits, "group0323");

// 5.事务提交
producer.commitTransaction();

} catch (Exception e) {
// 6.放弃事务
producer.abortTransaction();
}
}

}

生产者的配置

更详尽的生产者配置可以参考:Kafka 生产者官方配置说明

以下为生产者主要配置参数清单:

  • acks:指定了必须有多少个分区副本收到消息,生产者才会认为消息写入是成功的。默认为 acks=1
    • acks=0 如果设置为 0,则 Producer 不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成。在这种情况下,服务器是否收到请求是没法保证的,并且参数retries也不会生效(因为客户端无法获得失败信息)。每个记录返回的 offset 总是被设置为-1。
    • acks=1 如果设置为 1,leader 节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功。在这种情况下,如果 leader 节点在接收记录之后,并且在 follower 节点复制数据完成之前产生错误,则这条记录会丢失。
    • acks=all 如果设置为 all,这就意味着 leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。只要至少有一个同步副本存在,记录就不会丢失。这种方式是对请求传递的最有效保证。acks=-1 与 acks=all 是等效的。
  • buffer.memory:用来设置 Producer 缓冲区大小。
  • compression.type:Producer 生成数据时可使用的压缩类型。默认值是 none(即不压缩)。可配置的压缩类型包括:nonegzipsnappylz4zstd。压缩是针对批处理的所有数据,所以批处理的效果也会影响压缩比(更多的批处理意味着更好的压缩)。
  • retries:用来设置发送失败的重试次数。
  • batch.size:用来设置一个批次可占用的内存大小。
  • linger.ms:用来设置 Producer 在发送批次前的等待时间。
  • client.id:Kafka 服务器用它来识别消息源,可以是任意字符串。
  • max.in.flight.requests.per.connection:用来设置 Producer 在收到服务器响应前可以发送多少个消息。
  • timeout.ms:用来设置 Broker 等待同步副本返回消息确认的时间,与 acks 的配置相匹配。
  • request.timeout.ms:Producer 在发送数据时等待服务器返回响应的时间。
  • metadata.fetch.timeout.ms:Producer 在获取元数据时(如:分区的 Leader 是谁)等待服务器返回响应的时间。
  • max.block.ms:该配置控制 KafkaProducer.send()KafkaProducer.partitionsFor() 允许被阻塞的时长。这些方法可能因为缓冲区满了或者元数据不可用而被阻塞。用户提供的序列化程序或分区程序的阻塞将不会被计算到这个超时。
  • max.request.size:请求的最大字节数。
  • receieve.buffer.bytes:TCP 接收缓冲区的大小。
  • send.buffer.bytes:TCP 发送缓冲区的大小。

参考资料

Kafka 消费者

消费者简介

pull 模式

消息引擎获取消息有两种模式:

  • push 模式:MQ 推送数据给消费者
  • pull 模式:消费者主动向 MQ 请求数据

img

Kafka 消费者(Consumer)以 pull 方式从 Broker 拉取消息。相比于 push 方式,pull 方式灵活度和扩展性更好,因为消费的主动性由消费者自身控制。

push 模式的优缺点:

  • 缺点:由 broker 决定消息推送的速率,对于不同消费速率的 consumer 就不太好处理了。push 模式下,当 broker 推送的速率远大于 consumer 消费的速率时,consumer 恐怕就要崩溃了。

push 模式的优缺点:

  • 优点:consumer 可以根据自己的消费能力自主的决定消费策略
  • 缺点:如果 broker 没有可供消费的消息,将导致 consumer 不断在循环中轮询,直到新消息到达。为了避免这点,Kafka 有个参数可以让 consumer 阻塞直到新消息到达

消费者

每个 Consumer 的唯一元数据是该 Consumer 在日志中消费的位置。这个偏移量是由 Consumer 控制的:Consumer 通常会在读取记录时线性的增加其偏移量。但实际上,由于位置由 Consumer 控制,所以 Consumer 可以采用任何顺序来消费记录。

一条消息只有被提交,才会被消费者获取到。如下图,只能消费 Message0、Message1、Message2:

img

消费者群组

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制

Kafka 的写入数据量很庞大,如果只有一个消费者,消费消息速度很慢,时间长了,就会造成数据积压。为了减少数据积压,Kafka 支持消费者群组,可以让多个消费者并发消费消息,对数据进行分流。

Kafka 消费者从属于消费者群组,一个群组里的 Consumer 订阅同一个 Topic,一个主题有多个 Partition,每一个 Partition 只能隶属于消费者群组中的一个 Consumer

如果超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息。

同一时刻,一条消息只能被同一消费者组中的一个消费者实例消费

img

不同消费者群组之间互不影响

img

消费流程

Kafka 消费者通过 poll 来获取消息,但是获取消息时并不是立刻返回结果,需要考虑两个因素:

  • 消费者通过 customer.poll(time) 中设置等待时间
  • Broker 会等待累计一定量数据,然后发送给消费者。这样可以减少网络开销。

img

poll 除了获取消息外,还有其他作用:

  • 发送心跳信息。消费者通过向被指派为群组协调器的 Broker 发送心跳来维护他和群组的从属关系,当机器宕掉后,群组协调器触发再均衡。

消费者 API

创建消费者

1
2
3
4
5
6
7
8
9
10
11
12
Properties props = new Properties();
// 服务器地址
props.put("bootstrap.servers", "localhost:9092");
// 消费者群组
props.put("group.id", "test");
// 关闭自动提交偏移量
props.put("enable.auto.commit", "false");
// 设置 key 反序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 设置 value 反序列化器
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

订阅主题

1
2
3
4
// 订阅主题列表
consumer.subscribe(Arrays.asList("t1", "t2"));
// 订阅所有与 test 相关的主题
consumer.subscribe("test.*");

subscribe 方法允许传入一个正则表达式,这样就可以匹配多个主题。如果有人创建了新的主题,并且主题名恰好匹配正则表达式,那么会立即触发一次分区再均衡,消费者就可以读取新添加的主题。

轮询获取消息

消息轮询是消费者 API 的核心。一旦消费者订阅了主题,轮询就会处理所有细节,包括:群组协调、分区再均衡、发送心跳和获取数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
try {
// 3. 轮询
while (true) {
// 4. 消费消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
log.debug("topic = {}, partition = {}, offset = {}, key = {}, value = {}",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
}
} finally {
// 5. 退出程序前,关闭消费者
consumer.close();
}

手动提交偏移量

(1)同步提交

使用 commitSync() 提交偏移量最简单也最可靠。这个 API 会提交由 poll() 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("commit failed", e)
}
}

同步提交的缺点:同步提交方式会一直阻塞,直到接收到 Broker 的响应请求,这会大大限制吞吐量

(2)异步提交

在成功提交或碰到无法恢复的错误之前,commitSync() 会一直重试,但是 commitAsync() 不会,这也是 commitAsync() 不好的一个地方。它之所以不进行重试,是因为在它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用于提交偏移量 2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。如果 commitAsync() 重新尝试提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。这个时候如果发生再均衡,就会出现重复消息

1
2
3
4
5
6
7
8
9
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = % d, customer = %s, country = %s\n ",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync();
}

commitAsync() 也支持回调,在 Broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标,不过如果要用它来进行重试,则一定要注意提交的顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = % d, customer = %s, country = %s\n ",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null) { log.error("Commit failed for offsets {}", offsets, e); }
}
});
}

重试异步提交

可以使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏移量之后或在回调里提交偏移量时递增序列号。在进行重试前,先检查回调的序列号和即将提交的偏移量是否相等,如果相等,说明没有新的提交,那么可以安全地进行重试;如果序列号比较大,说明有一个新的提交已经发送出去了,应该停止重试。

(3)同步和异步组合提交

一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功

因此,在消费者关闭前一般会组合使用 commitSync()commitAsync()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = % d, customer = %s, country = %s\n ",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}

(4)提交特定的偏移量

提交偏移量的频率和处理消息批次的频率是一样的。如果想要更频繁地提交该怎么办?如果 poll() 方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,想要在批次中间提交偏移量该怎么办?这种情况无法通过调用 commitSync()commitAsync() 来实现,因为它们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。

解决办法是:消费者 API 允许在调用 commitSync()commitAsync() 方法时传进去希望提交的分区和偏移量的 map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private int count = 0;
private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();


while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = % d, customer = %s, country = %s\n ",
record.topic(), record.partition(), record.offset(), record.key(), record.value());

currentOffsets.put(new TopicPartition(record.topic(),
record.partition()), new
OffsetAndMetadata(record.offset() + 1, "no metadata"));
if (count % 1000 == 0) { consumer.commitAsync(currentOffsets, null); }
count++;
}
}

(5)从特定偏移量处开始处理

使用 poll() 方法可以从各个分区的最新偏移量处开始处理消息。

不过,有时候,我们可能需要从特定偏移量处开始处理消息。

  • 从分区的起始位置开始读消息:seekToBeginning(Collection<TopicPartition> partitions) 方法
  • 从分区的末尾位置开始读消息:seekToEnd(Collection<TopicPartition> partitions) 方法
  • 查找偏移量:seek(TopicPartition partition, long offset) 方法

通过 seek(TopicPartition partition, long offset) 可以实现处理消息和提交偏移量在一个事务中完成。思路就是需要在客户端建立一张数据表,保证处理消息和和消息偏移量位置写入到这张数据表。在一个事务中,此时就可以保证处理消息和记录偏移量要么同时成功,要么同时失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
consumer.subscribe(topic);
// 1.第一次调用pool,加入消费者群组
consumer.poll(0);
// 2.获取负责的分区,并从本地数据库读取改分区最新偏移量,并通过seek方法修改poll获取消息的位置
for (TopicPartition partition: consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition));

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
processRecord(record);
storeRecordInDB(record);
storeOffsetInDB(record.topic(), record.partition(),
record.offset());
}
commitDBTransaction();
}

关闭连接

如果想让消费者从轮询消费消息的无限循环中退出,可以通过另一个线程调用 consumer.wakeup() 方法consumer.wakeup() 是消费者唯一一个可以从其他线程里安全调用的方法。调用 consumer.wakeup() 可以退出 poll() ,并抛出 WakeupException 异常,或者如果调用 consumer.wakeup() 时线程没有等待轮询,那么异常将在下一轮调用 poll() 时抛出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

...

try {
// looping until ctrl-c, the shutdown hook will cleanup on exit
while (true) {
ConsumerRecords<String, String> records =
movingAvg.consumer.poll(1000);
System.out.println(System.currentTimeMillis() +
"-- waiting for data...");
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
for (TopicPartition tp: consumer.assignment())
System.out.println("Committing offset at position:" +
consumer.position(tp));
movingAvg.consumer.commitSync();
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}

分区再均衡

什么是分区再均衡

分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为分区再均衡(Rebalance)Rebalance 实现了消费者群组的高可用性和伸缩性

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。

当在群组里面 新增/移除消费者 或者 新增/移除 kafka 集群 broker 节点 时,群组协调器 Broker 会触发再均衡,重新为每一个 Partition 分配消费者。Rebalance 期间,消费者无法读取消息,造成整个消费者群组一小段时间的不可用。

何时生分区再均衡

分区再均衡的触发时机有三种:

  • 消费者群组成员数发生变更。比如有新的 Consumer 加入群组或者离开群组,或者是有 Consumer 实例崩溃被“踢出”群组。
    • 新增消费者。consumer 订阅主题之后,第一次执行 poll 方法
    • 移除消费者。执行 consumer.close() 操作或者消费客户端宕机,就不再通过 poll 向群组协调器发送心跳了,当群组协调器检测次消费者没有心跳,就会触发再均衡。
  • 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
  • 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
    • 新增 broker。如重启 broker 节点
    • 移除 broker。如 kill 掉 broker 节点。

分区再均衡的过程

Rebalance 是通过消费者群组中的称为“群主”消费者客户端进行的

(1)选择群主

当消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获取群组的活跃成员列表,并负责给每一个消费者分配分区

所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。具体来讲,Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。

(2)消费者通过向被指派为群组协调器(Coordinator)的 Broker 定期发送心跳来维持它们和群组的从属关系以及它们对分区的所有权。

img

(3)群主从群组协调器获取群组成员列表,然后给每一个消费者进行分配分区 Partition。有两种分配策略:Range 和 RoundRobin。

  • Range 策略,就是把若干个连续的分区分配给消费者,如存在分区 1-5,假设有 3 个消费者,则消费者 1 负责分区 1-2,消费者 2 负责分区 3-4,消费者 3 负责分区 5。
  • RoundRoin 策略,就是把所有分区逐个分给消费者,如存在分区 1-5,假设有 3 个消费者,则分区 1->消费 1,分区 2->消费者 2,分区 3>消费者 3,分区 4>消费者 1,分区 5->消费者 2。

(4)群主分配完成之后,把分配情况发送给群组协调器。

(5)群组协调器再把这些信息发送给消费者。每个消费者只能看到自己的分配信息,只有群主知道所有消费者的分配信息

如何判定消费者已经死亡

消费者通过向被指定为群组协调器的 Broker 发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者超时未发送心跳,会话就会过期,群组协调器认定它已经死亡,就会触发一次再均衡。

当一个消费者要离开群组时,会通知协调器,协调器会立即触发一次再均衡,尽量降低处理停顿。

查找协调者

所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。那么,Consumer Group 如何确定为它服务的 Coordinator 在哪台 Broker 上呢?答案就在我们之前说过的 Kafka 内部位移主题 __consumer_offsets 身上。

目前,Kafka 为某个 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 个步骤。

  1. 第 1 步:确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)

  2. 第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。

分区再均衡的问题

  • 首先,Rebalance 过程对 Consumer Group 消费过程有极大的影响。在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。
  • 其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。
  • 最后,Rebalance 实在是太慢了。

避免分区再均衡

通过前文,我们已经知道了:分区再均衡的代价很高,应该尽量避免不必要的分区再均衡,以整体提高 Consumer 的吞吐量。

分区再均衡发生的时机有三个:

  • 组成员数量发生变化
  • 订阅主题数量发生变化
  • 订阅主题的分区数发生变化

后面两个通常都是运维的主动操作,所以它们引发的 Rebalance 大都是不可避免的。实际上,大部分情况下,导致分区再均衡的原因是:组成员数量发生变化。

有两种情况,消费者并没有宕机,但也被视为消亡:

  • 未及时发送心跳
  • Consumer 消费时间过长

未及时发送心跳

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。因此,你需要仔细地设置session.timeout.ms 和 heartbeat.interval.ms的值。我在这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。

  • 设置 session.timeout.ms = 6s。
  • 设置 heartbeat.interval.ms = 2s。
  • 要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms

session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer。毕竟,我们还是希望能尽快揪出那些“尸位素餐”的 Consumer,早日把它们踢出 Group。希望这份配置能够较好地帮助你规避第一类“不必要”的 Rebalance。

Consumer 消费时间过长

第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。我之前有一个客户,在他们的场景中,Consumer 消费数据时需要将消息处理之后写入到 MongoDB。显然,这是一个很重的消费逻辑。MongoDB 的一丁点不稳定都会导致 Consumer 程序消费时长的增加。此时,**max.poll.interval.ms** 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。就拿 MongoDB 这个例子来说,如果写 MongoDB 的最长时间是 7 分钟,那么你可以将该参数设置为 8 分钟左右。

GC 参数

如果你按照上面的推荐数值恰当地设置了这几个参数,却发现还是出现了 Rebalance,那么我建议你去排查一下Consumer 端的 GC 表现,比如是否出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance。为什么特意说 GC?那是因为在实际场景中,我见过太多因为 GC 设置不合理导致程序频发 Full GC 而引发的非预期 Rebalance 了。

提交偏移量

每次调用 poll() 方法,它总是会返回由生产者写入 Kafka 但还没有被消费者读取过的记录,Kafka 因此可以追踪哪些记录是被哪个群组的哪个消费者读取的。

更新分区当前位置的操作叫作提交

偏移量的用处

如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡,完成再均衡后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

(1)如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理

img

(2)如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失

img

由此可知,处理偏移量,会对客户端处理数据产生影响。

提交偏移量的旧方案

老版本的 Consumer Group 把偏移量保存在 ZooKeeper 中。ZooKeeper 是一个分布式的协调服务框架,Kafka 重度依赖它实现各种各样的协调管理。将偏移量保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销,有利于实现伸缩性。

这种方案的问题在于:ZooKeeper 其实并不适合进行高频的写操作,而 Consumer Group 的偏移量更新却是一个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能,因此 Kafka 社区渐渐有了这样的共识:将 Consumer 偏移量保存在 ZooKeeper 中是不合适的做法。

提交偏移量的新方案

新版本 Consumer 的偏移量管理机制其实也很简单。

消费者向一个叫做 _consumer_offsets 的特殊主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡,完成再均衡后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

**_consumer_offsets 主题的 Key 中应该保存 3 部分内容:<Group ID,主题名,分区号 >**。

通常来说,当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建偏移量主题。偏移量主题就是普通的 Kafka 主题,那么它自然也有对应的分区数。如果偏移量主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3。分区数可以通过 offsets.topic.num.partitions 设置;副本数可以通过 offsets.topic.replication.factor 设置。

自动提交

自动提交是 Kafka 处理偏移量最简单的方式。

enable.auto.commit 属性被设为 true,那么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s

与消费者里的其他东西一样,自动提交也是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。

假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s(因为没有达到 5s 的时限,并没有提交偏移量),所以在这 3s 的数据将会被重复处理。虽然可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗的时间跨度,不过这种情况是无法完全避免的。

在使用自动提交时,每次调用轮询方法都会把上一次调用返回的偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(在调用 close() 方法之前也会进行自动提交)。一般情况下不会有什么问题,不过在处理异常或提前退出轮询时要格外小心。

自动提交虽然方便,不过无法避免丢失消息和分区再均衡时重复消息的问题

手动提交

自动提交虽然方便,不过无法避免丢失消息和分区再均衡时重复消息的问题。因此,可以通过手动提交偏移量,由开发者自行控制。

首先,enable.auto.commit 设为 false,关闭自动提交

如果 Kafka 触发了再均衡,我们需要在消费者失去对一个分区的所有权之前提交最后一个已处理记录的偏移量。如果消费者准备了一个缓冲区用于处理偶发的事件,那么在失去分区所有权之前,需要处理在缓冲区累积下来的记录。可能还需要关闭文件句柄、数据库连接等。

在为消费者分配新分区或移除旧分区时,可以通过消费者 API 执行一些应用程序代码,在调用 subscribe() 方法时传进去一个 ConsumerRebalanceListener 实例就可以了。 ConsumerRebalanceListener 有两个需要实现的方法。

  • public void onPartitionsRevoked(Collection partitions) 方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。
  • public void onPartitionsAssigned(Collection partitions) 方法会在重新分配分区之后和消费者开始读取消息之前被调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private Map<TopicPartition, OffsetAndMetadata> currentOffsets=
new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
}

public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
System.out.println("Lost partitions in rebalance.
Committing current
offsets:" + currentOffsets);
consumer.commitSync(currentOffsets);
}
}

try {
consumer.subscribe(topics, new HandleRebalance());

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.println("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(),
record.partition()), new
OffsetAndMetadata(record.offset()+1, "no metadata"));
}
consumer.commitAsync(currentOffsets, null);
}
} catch (WakeupException e) {
// 忽略异常,正在关闭消费者
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync(currentOffsets);
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}
}

反序列化器

生产者需要用序列化器将 Java 对象转换成字节数组再发送给 Kafka;同理,消费者需要用反序列化器将从 Kafka 接收到的字节数组转换成 Java 对象。

独立消费者

通常,会有多个 Kafka 消费者组成群组,关注一个主题。

但可能存在这样的场景:只需要一个消费者从一个主题的所有分区或某个特定的分区读取数据。这时,就不需要消费者群组和再均衡了,只需要把主题或分区分配给消费者,然后开始读取消息并提交偏移量。

如果是这样,就不需要订阅主题,取而代之的是为自己分配分区。一个消费者可以订阅主题(并加入消费者群组),或为自己分配分区,但不能同时做这两件事。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
List<PartitionInfo> partitionInfos = null;
partitionInfos = consumer.partitionsFor("topic");

if (partitionInfos != null) {
for (PartitionInfo partition : partitionInfos)
partitions.add(new TopicPartition(partition.topic(),
partition.partition()));
consumer.assign(partitions);

while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);

for (ConsumerRecord<String, String> record: records) {
System.out.printf("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitSync();
}
}

消费者的配置

  • bootstrap.servers - Broker 集群地址,格式:ip1:port,ip2:port…,不需要设定全部的集群地址,设置两个或者两个以上即可。
  • group.id - 消费者隶属的消费者组名称,如果为空会报异常,一般而言,这个参数要有一定的业务意义。
  • fetch.min.bytes - 消费者获取记录的最小字节数。Kafka 会等到有足够的数据时才返回消息给消费者,以降低负载。
  • fetch.max.wait.ms - Kafka 需要等待足够的数据才返回给消费者,如果一直没有足够的数据,消费者就会迟迟收不到消息。所以需要指定 Broker 的等待延迟,一旦超时,直接返回数据给消费者。
  • max.partition.fetch.bytes - 指定了服务器从每个分区返回给消费者的最大字节数。默认为 1 MB。
  • session.timeout.ms - 指定了消费者的心跳超时时间。如果消费者没有在有效时间内发送心跳给群组协调器,协调器会视消费者已经消亡,从而触发分区再均衡。默认为 3 秒。
  • auto.offset.reset - 指定了消费者在读取一个没有偏移量的分区或偏移量无效的情况下,该如何处理。
    • latest - 表示在偏移量无效时,消费者将从最新的记录开始读取分区记录。
    • earliest - 表示在偏移量无效时,消费者将从起始位置读取分区记录。
  • enable.auto.commit - 指定了是否自动提交消息偏移量,默认开启。
  • partition.assignment.strategy - 消费者的分区分配策略。
    • Range - 表示会将主题的若干个连续的分区分配给消费者。
    • RoundRobin - 表示会将主题的所有分区按照轮询方式分配给消费者。
  • client.id - 客户端标识。
  • max.poll.records - 用于控制单次能获取到的记录数量。
  • receive.buffer.bytes - 用于设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为 64KB。如果设置为-1,则使用操作系统的默认值。
  • send.buffer.bytes - 用于设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为 128KB。与 receive.buffer.bytes 参数一样,如果设置为-1,则使用操作系统的默认值。

参考资料

Kafka 可靠传输

消息不丢失

如何保证消息的可靠性传输,或者说,如何保证消息不丢失?这对于任何 MQ 都是核心问题。

一条消息从生产到消费,可以划分三个阶段:

img

  • 生产阶段:Producer 创建消息,并通过网络发送给 Broker。
  • 存储阶段:Broker 收到消息并存储,如果是集群,还要同步副本给其他 Broker。
  • 消费阶段:Consumer 向 Broker 请求消息,Broker 通过网络传输给 Consumer。

这三个阶段都可能丢失数据,所以要保证消息丢失,就需要任意一环都保证可靠。

存储阶段

存储阶段指的是 Kafka Server,也就是 Broker 如何保证消息不丢失。

一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证

上面的话可以解读为:

  • 已提交只有当消息被写入分区的若干同步副本时,才被认为是已提交的。为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要 Leader 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。
  • 持久化:Kafka 的数据存储在磁盘上,所以只要写入成功,天然就是持久化的。
  • 只要还有一个副本是存活的,那么已提交的消息就不会丢失
  • 消费者只能读取已提交的消息

副本机制

Kafka 的副本机制是 kafka 可靠性保证的核心

Kafka 的主题被分为多个分区,分区是基本的数据块。每个分区可以有多个副本,有一个是 Leader(主副本),其他是 Follower(从副本)。所有数据都直接发送给 Leader,或者直接从 Leader 读取事件。Follower 只需要与 Leader 保持同步,并及时复制最新的数据。当 Leader 宕机时,从 Follower 中选举一个成为新的 Leader。

Broker 有 3 个配置参数会影响 Kafka 消息存储的可靠性。

副本数

replication.factor 的作用是设置每个分区的副本数replication.factor 是主题级别配置; default.replication.factor 是 broker 级别配置。

副本数越多,数据可靠性越高;但由于副本数增多,也会增加同步副本的开销,可能会降低集群的可用性。一般,建议设为 3,这也是 Kafka 的默认值。

不完全的选主

unclean.leader.election.enable 用于控制是否支持不同步的副本参与选举 Leader。unclean.leader.election.enable 是 broker 级别(实际上是集群范围内)配置,默认值为 true。

  • 如果设为 true,代表着允许不同步的副本成为主副本(即不完全的选举),那么将面临丢失消息的风险
  • 如果设为 false,就要等待原先的主副本重新上线,从而降低了可用性。

最少同步副本

min.insync.replicas 控制的是消息至少要被写入到多少个副本才算是“已提交”min.insync.replicas 是主题级别和 broker 级别配置。

尽管可以为一个主题配置 3 个副本,但还是可能会出现只有一个同步副本的情况。如果这个同步副本变为不可用,则必须在可用性和数据一致性之间做出选择。Kafka 中,消息只有被写入到所有的同步副本之后才被认为是已提交的。但如果只有一个同步副本,那么在这个副本不可用时,则数据就会丢失。

如果要确保已经提交的数据被已写入不止一个副本,就需要把最小同步副本的设置为大一点的值。

注意:要确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1

生产阶段

在生产消息阶段,消息队列一般通过请求确认机制,来保证消息的可靠传递,Kafka 也不例外。

Kafka 生产者 中提到了,Kafka 有三种发送方式:同步、异步、异步回调。

同步方式能保证消息不丢失,但性能太差;异步方式发送消息,通常会立即返回,但消息可能丢失。

解决生产者丢失消息的方案:

生产者使用异步回调方式 producer.send(msg, callback) 发送消息。callback(回调)能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。

  • 如果是因为那些瞬时错误,那么仅仅让 Producer 重试就可以了;
  • 如果是消息不合格造成的,那么可以调整消息格式后再次发送。

然后,需要基于以下几点来保证 Kafka 生产者的可靠性:

ACK

生产者可选的确认模式有三种:acks=0acks=1acks=all

  • acks=0acks=1 都有丢失数据的风险。

  • acks=all 意味着会等待所有同步副本都收到消息。再结合 min.insync.replicas ,就可以决定在得到确认响应前,至少有多少副本能够收到消息。

这是最保险的做法,但也会降低吞吐量。

重试

如果 broker 返回的错误可以通过重试来解决,生产者会自动处理这些错误。

  • 可重试错误,如:LEADER_NOT_AVAILABLE,主副本不可用,可能过一段时间,集群就会选举出新的主副本,重试可以解决问题。
  • 不可重试错误,如:INVALID_CONFIG,即使重试,也无法改变配置选项,重试没有意义。

需要注意的是:有时可能因为网络问题导致没有收到确认,但实际上消息已经写入成功。生产者会认为出现临时故障,重试发送消息,这样就会出现重复记录。所以,尽可能在业务上保证幂等性。

设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

错误处理

开发者需要自行处理的错误:

  • 不可重试的 broker 错误,如消息大小错误、认证错误等;
  • 消息发送前发生的错误,如序列化错误;
  • 生产者达到重试次数上限或消息占用的内存达到上限时发生的错误。

消费阶段

前文已经提到,消费者只能读取已提交的消息。这就保证了消费者接收到消息时已经具备了数据一致性。

消费者唯一要做的是确保哪些消息是已经读取过的,哪些是没有读取过的(通过提交偏移量给 Broker 来确认)。如果消费者提交了偏移量却未能处理完消息,那么就有可能造成消息丢失,这也是消费者丢失消息的主要原因。

img

消费者的可靠性配置

  • group.id - 如果希望消费者可以看到主题的所有消息,那么需要为它们设置唯一的 group.id
  • auto.offset.reset - 有两个选项:
    • earliest - 消费者会从分区的开始位置读取数据
    • latest - 消费者会从分区末尾位置读取数据
  • enable.auto.commit - 消费者自动提交偏移量。如果设为 true,处理流程更简单,但无法保证重复处理消息。
  • auto.commit.interval.ms - 自动提交的频率,默认为每 5 秒提交一次。

显示提交偏移量

如果 enable.auto.commit 设为 true,即自动提交,就无需考虑提交偏移量的问题。

如果选择显示提交偏移量,需要考虑以下问题:

  • 必须在处理完消息后再发送确认(提交偏移量),不要收到消息立即确认。
  • 提交频率是性能和重复消息数之间的权衡
  • 分区再均衡
  • 消费可能需要重试机制
  • 超时处理
  • 消费者可能需要维护消费状态,如:处理完消息后,记录在数据库中。
  • 幂等性设计
    • 写数据库:根据主键判断记录是否存在
    • 写 Redis:set 操作天然具有幂等性
    • 复杂的逻辑处理,则可以在消息中加入全局 ID

重复消息

在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

  • At most once:至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
  • At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
  • Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。

一般解决重复消息的办法是,在消费端,保证消费消息的操作具备幂等性

常用的实现幂等操作的方法:

利用数据库的唯一约束实现幂等

关系型数据库可以使用 INSERT IF NOT EXIST 语句防止重复;Redis 可以使用 SETNX 命令来防止重复;其他数据库只要支持类似语义,也是一个道理。

为更新的数据设置前置条件

如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。

但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的方法是,给数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。

记录并检查操作

还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一 ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。

具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。

需要注意的是,“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。这一组操作可以通过分布式事务或分布式锁来保证其原子性。

消息的有序性

某些场景下,可能会要求按序发送消息。

方案一、单 Partition

Kafka 每一个 Partition 只能隶属于消费者群组中的一个 Consumer,换句话说,每个 Partition 只能被一个 Consumer 消费。所以,如果 Topic 是单 Partition,自然是有序的。

方案分析

优点:简单粗暴。开发者什么也不用做。

缺点:Kafka 基于 Partition 实现其高并发能力,如果使用单 Partition,会严重限制 Kafka 的吞吐量。

结论:作为分布式消息引擎,限制并发能力,显然等同于自废武功,所以,这个方案几乎是不可接受的。

方案二、同一个 key 的消息发送给指定 Partition

(1)生产者端显示指定 key 发往一个指定的 Partition,就可以保证同一个 key 在这个 Partition 中是有序的。

(2)接下来,消费者端为每个 key 设定一个缓存队列,然后让一个独立线程负责消费指定 key 的队列,这就保证了消费消息也是有序的。

消息积压

先修复消费者,然后停掉当前所有消费者。

新建 Topic,扩大分区,以提高并发处理能力。

创建临时消费者程序,并部署在多节点上,扩大消费处理能力。

最后处理完积压消息后,恢复原先部署架构。

验证系统可靠性

建议从 3 个层面验证系统的可靠性:

  • 配置验证
  • 应用验证
    • 客户端和服务器断开连接
    • 选举
    • 依次重启 broker
    • 依次重启生产者
    • 依次重启消费者
  • 监控可靠性
    • 对于生产者来说,最重要的两个指标是消息的 error-rateretry-rate。如果这两个指标上升,说明系统出了问题。
    • 对于消费者来说,最重要的指标是 consumer-lag,该指标表明了消费者的处理速度与最近提交到分区里的偏移量之间还有多少差距。

最佳实践

生产者

  1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
  2. 设置 acks = allacks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
  3. 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

服务器(Kafka Broker)

  1. 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
  2. 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  3. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
  4. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1

消费者

  1. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

参考资料

ShardingSphere Jdbc

简介

shardingsphere-jdbc 定位为轻量级 Java 框架,在 Java 的 JDBC 层提供的额外服务。 它使用客户端直连数据库,以 jar 包形式提供服务,无需额外部署和依赖,可理解为增强版的 JDBC 驱动,完全兼容 JDBC 和各种 ORM 框架。

  • 适用于任何基于 JDBC 的 ORM 框架,如:JPA, Hibernate, Mybatis, Spring JDBC Template 或直接使用 JDBC。
  • 支持任何第三方的数据库连接池,如:DBCP, C3P0, BoneCP, Druid, HikariCP 等。
  • 支持任意实现 JDBC 规范的数据库,目前支持 MySQL,Oracle,SQLServer,PostgreSQL 以及任何遵循 SQL92 标准的数据库。

img

快速入门

引入 maven 依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core</artifactId>
<version>${latest.release.version}</version>
</dependency>

注意:请将 ${latest.release.version} 更改为实际的版本号。

规则配置

ShardingSphere-JDBC 可以通过 JavaYAMLSpring 命名空间Spring Boot Starter 这 4 种方式进行配置,开发者可根据场景选择适合的配置方式。 详情请参见配置手册

创建数据源

通过 ShardingSphereDataSourceFactory 工厂和规则配置对象获取 ShardingSphereDataSource。 该对象实现自 JDBC 的标准 DataSource 接口,可用于原生 JDBC 开发,或使用 JPA, MyBatis 等 ORM 类库。

1
DataSource dataSource = ShardingSphereDataSourceFactory.createDataSource(dataSourceMap, configurations, properties);

概念和功能

单一数据节点难于满足互联网的海量数据场景。

从性能方面来说,由于关系型数据库大多采用 B+ 树类型的索引,在数据量超过阈值的情况下,索引深度的增加也将使得磁盘访问的 IO 次数增加,进而导致查询性能的下降;同时,高并发访问请求也使得集中式数据库成为系统的最大瓶颈。

在传统的关系型数据库无法满足互联网场景需要的情况下,将数据存储至原生支持分布式的 NoSQL 的尝试越来越多。 但 NoSQL 对 SQL 的不兼容性以及生态圈的不完善,使得它们在与关系型数据库的博弈中始终无法完成致命一击,而关系型数据库的地位却依然不可撼动。

数据分片按照某个维度将存放在单一数据库中的数据分散地存放至多个数据库或表中以达到提升性能瓶颈以及可用性的效果。数据分片的有效手段是对关系型数据库进行分库和分表。分库和分表均可以有效的避免由数据量超过可承受阈值而产生的查询瓶颈。 除此之外,分库还能够用于有效的分散对数据库单点的访问量;分表虽然无法缓解数据库压力,但却能够提供尽量将分布式事务转化为本地事务的可能,一旦涉及到跨库的更新操作,分布式事务往往会使问题变得复杂。 使用多主多从的分片方式,可以有效的避免数据单点,从而提升数据架构的可用性。

通过分库和分表进行数据的拆分来使得各个表的数据量保持在阈值以下,以及对流量进行疏导应对高访问量,是应对高并发和海量数据系统的有效手段。 数据分片的拆分方式又分为垂直分片和水平分片。

垂直分片

按照业务拆分的方式称为垂直分片,又称为纵向拆分,它的核心理念是专库专用。 在拆分之前,一个数据库由多个数据表构成,每个表对应着不同的业务。而拆分之后,则是按照业务将表进行归类,分布到不同的数据库中,从而将压力分散至不同的数据库。 下图展示了根据业务需要,将用户表和订单表垂直分片到不同的数据库的方案。

垂直分片

垂直分片往往需要对架构和设计进行调整。通常来讲,是来不及应对互联网业务需求快速变化的;而且,它也并无法真正的解决单点瓶颈。 垂直拆分可以缓解数据量和访问量带来的问题,但无法根治。如果垂直拆分之后,表中的数据量依然超过单节点所能承载的阈值,则需要水平分片来进一步处理。

水平分片

水平分片又称为横向拆分。 相对于垂直分片,它不再将数据根据业务逻辑分类,而是通过某个字段(或某几个字段),根据某种规则将数据分散至多个库或表中,每个分片仅包含数据的一部分。 例如:根据主键分片,偶数主键的记录放入 0 库(或表),奇数主键的记录放入 1 库(或表),如下图所示。

水平分片

水平分片从理论上突破了单机数据量处理的瓶颈,并且扩展相对自由,是分库分表的标准解决方案。

数据分片带来的问题

  • 数据路由:需要知道数据需要从哪个具体的数据库的分表中获取。
  • SQL 不兼容:分表导致表名称的修改,或者分页、排序、聚合、分组等操作的不正确处理。
  • 跨库事务:合理采用分表,可以在降低单表数据量的情况下,尽量使用本地事务,善于使用同库不同表可有效避免分布式事务带来的麻烦。 在不能避免跨库事务的场景,有些业务仍然需要保持事务的一致性。 而基于 XA 的分布式事务由于在并发度高的场景中性能无法满足需要,并未被互联网巨头大规模使用,他们大多采用最终一致性的柔性事务代替强一致事务。

ShardingSphere 内核剖析

ShardingSphere 的 3 个产品的数据分片主要流程是完全一致的。 核心由 SQL 解析 => 执行器优化 => SQL 路由 => SQL 改写 => SQL 执行 => 结果归并的流程组成。

img

  • QL 解析:分为词法解析和语法解析。 先通过词法解析器将 SQL 拆分为一个个不可再分的单词。再使用语法解析器对 SQL 进行理解,并最终提炼出解析上下文。 解析上下文包括表、选择项、排序项、分组项、聚合函数、分页信息、查询条件以及可能需要修改的占位符的标记。
  • 执行器优化:合并和优化分片条件,如 OR 等。
  • SQL 路由:根据解析上下文匹配用户配置的分片策略,并生成路由路径。目前支持分片路由和广播路由。
  • SQL 改写:将 SQL 改写为在真实数据库中可以正确执行的语句。SQL 改写分为正确性改写和优化改写。
  • SQL 执行:通过多线程执行器异步执行。
  • 结果归并:将多个执行结果集归并以便于通过统一的 JDBC 接口输出。结果归并包括流式归并、内存归并和使用装饰模式的追加归并这几种方式。

解析引擎

抽象语法树

解析过程分为词法解析语法解析。 词法解析器用于将 SQL 拆解为不可再分的原子符号,称为 Token。并根据不同数据库方言所提供的字典,将其归类为关键字,表达式,字面量和操作符。 再使用语法解析器将 SQL 转换为抽象语法树。

例如,以下 SQL:

1
SELECT id, name FROM t_user WHERE status = 'ACTIVE' AND age > 18

解析之后的为抽象语法树见下图。

SQL抽象语法树

为了便于理解,抽象语法树中的关键字的 Token 用绿色表示,变量的 Token 用红色表示,灰色表示需要进一步拆分。

最后,通过对抽象语法树的遍历去提炼分片所需的上下文,并标记有可能需要改写的位置。 供分片使用的解析上下文包含查询选择项(Select Items)、表信息(Table)、分片条件(Sharding Condition)、自增主键信息(Auto increment Primary Key)、排序信息(Order By)、分组信息(Group By)以及分页信息(Limit、Rownum、Top)。 SQL 的一次解析过程是不可逆的,一个个 Token 按 SQL 原本的顺序依次进行解析,性能很高。 考虑到各种数据库 SQL 方言的异同,在解析模块提供了各类数据库的 SQL 方言字典。

SQL 解析引擎

SQL 解析作为分库分表类产品的核心,其性能和兼容性是最重要的衡量指标。 ShardingSphere 的 SQL 解析器经历了 3 代产品的更新迭代。

第一代 SQL 解析器为了追求性能与快速实现,在 1.4.x 之前的版本使用 Druid 作为 SQL 解析器。经实际测试,它的性能远超其它解析器。

第二代 SQL 解析器从 1.5.x 版本开始,ShardingSphere 采用完全自研的 SQL 解析引擎。 由于目的不同,ShardingSphere 并不需要将 SQL 转为一颗完全的抽象语法树,也无需通过访问器模式进行二次遍历。它采用对 SQL 半理解的方式,仅提炼数据分片需要关注的上下文,因此 SQL 解析的性能和兼容性得到了进一步的提高。

第三代 SQL 解析器则从 3.0.x 版本开始,ShardingSphere 尝试使用 ANTLR 作为 SQL 解析的引擎,并计划根据 DDL -> TCL -> DAL –> DCL -> DML –>DQL 这个顺序,依次替换原有的解析引擎,目前仍处于替换迭代中。 使用 ANTLR 的原因是希望 ShardingSphere 的解析引擎能够更好的对 SQL 进行兼容。对于复杂的表达式、递归、子查询等语句,虽然 ShardingSphere 的分片核心并不关注,但是会影响对于 SQL 理解的友好度。 经过实例测试,ANTLR 解析 SQL 的性能比自研的 SQL 解析引擎慢 3-10 倍左右。为了弥补这一差距,ShardingSphere 将使用 PreparedStatement 的 SQL 解析的语法树放入缓存。 因此建议采用 PreparedStatement 这种 SQL 预编译的方式提升性能。

第三代 SQL 解析引擎的整体结构划分如下图所示。

解析引擎结构

路由引擎

改写引擎

执行引擎

归并引擎