SpringBoot 之安全快速入门
SpringBoot 之安全快速入门
QuickStart
(1)添加依赖
1 | <dependency> |
(2)添加配置
1 | spring.security.user.name = root |
(3)启动应用后,访问任意路径,都会出现以下页面,提示你先执行登录操作。输入配置的用户名、密码(root/root)即可访问应用页面。
(1)添加依赖
1 | <dependency> |
(2)添加配置
1 | spring.security.user.name = root |
(3)启动应用后,访问任意路径,都会出现以下页面,提示你先执行登录操作。输入配置的用户名、密码(root/root)即可访问应用页面。
本文目标是设计一个用于提高开发人员开发效率的低代码平台,这里会采用系统解决方案设计的一般思路来逐步探寻设计方案。
广义上的低代码平台包括低代码平台和零代码平台,它们都属于 APaaS(Application Platform as a Service 应用平台即服务),两者的主要区别在于对代码的依赖程度:
该方案将传统的集成开发环境(IDE)充分可视化,开发者对前端界面组件、数据源绑定方式、数据模型、业务逻辑和工作流等都可以自由定义,平台将自动生成代码,开发者也可以添加自己的代码,对程序具有较强的控制能力,因此该方案具备更高的灵活性,可以设计出定制化程度高、逻辑复杂的软件。
由于该方案仍涉及代码开发、部署等技术工作,所以它仍然是一个技术开发平台,需要较高的学习成本,主要价值是提高开发效率,减少重复劳动。
Outsystems 就是采用该方案的典型产品,如下为产品截图:
用户通过可视化方式构建数据模型、视图、权限、工作流等,即可在平台提供的环境中运行,无需编译部署,更像一种傻瓜式的应用搭建平台。平台对各类组件、业务逻辑做了较高层级的封装,因此用户无法随心所欲修改界面风格、交互方式、处理逻辑等。
该方案可以实现完全零代码,对使用者技术要求不高,但需要具备业务抽象、建模能力。主要价值是降低开发门槛、快速适应变化。
明道云、伙伴云等都是此类方案的典型产品,如下为明道云的产品截图:
绝大部分的企业软件由以下四个部分组成:
低代码平台将以上进行抽象,支持数据模型、业务流程、用户权限、统计图表,因此可以作为更通用的企业软件解决方案,这四类能力也是任何一个低代码平台都必须具备的核心要素。
建立数据模型就是提取业务实体的数据特征,抽象为数据表,建立表间关系。制作 ER 图的过程就是数据建模。市面上常见的低代码平台均提供了丰富的控件,可以拖拽完成数据模型搭建。此外,数据模型搭建与表单展示合二为一,每完成一个数据表的创建,就自动生成了该表的增删改查功能及相关页面,进而隐藏了数据库设计、前端开发这些专业技术。其实,这也就是我们常说的表单引擎。
这里顺便提一下,虽然很多低代码平台将数据建模与表单展示合二为一,但通过这种方式自动生成的表单只能实现最基础的增删改查页面,用户对界面展示内容及形式的控制程度很低,无法满足大部分企业软件的需求,所以低代码平台一般还会提供自定义页面功能,用户可根据需要在页面上配置按钮、图表等元素,满足个性化需求。
业务流程指为了实现某项目标,由多人合作,按照一定的规则、顺序进行的一系列活动,在软件中,业务流程的参与者可以是人,也可以是程序。低代码平台实现了可视化流程配置,用户对触发条件、处理节点、节点参与者进行配置,实现自定义业务流程。
大部分的低代码平台都采用了非常经典的 RBAC(Role-Based Access Control )模型管理用户权限,简单来说就是将拥有相同权限的用户添加为相同角色,通过为角色分配权限,实现了“用户——角色——权限”的授权模式。由于企业是一个组织,一般都会有部门的概念,所以也可以将部门添加到某个角色,实现“用户——部门——角色——权限”的授权模式。
统计图表可以类比 Excel 中的透视图,统计图表由数据源、统计规则、展示形式定义,低代码平台也正是遵循这种方式,实现统计图表的可视化配置。
OutSystems 是快速应用开发的头号低代码平台,并且是 2018 年 Gartner 高生产力平台的领导者。OutSystems 号称将低代码功能与高级移动功能相结合的唯一解决方案,它支持整个应用程序组合的可视化开发,可轻松与现有系统集成。
Mendix 帮助企业改善创新方式。通过使用可视化模型,在 Mendix 上构建应用程序非常简单,快速且直观,可使开发人员和业务分析人员等众多人员构建强大的应用程序,而无需编写代码。借助模型驱动开发,业务领导者和 IT 部门可以共享语言来快速构建应用程序。
由于系统的用户群体是有一定技术基础的开发人员。所以,系统定位是低代码平台,而非零代码平台。
其次,由于系统主要是用于简化基本的页面开发,所以技术路线应该选择:基于模型驱动的应用平台。
最后,由于生成的代码是应用于 Java Web 框架。生成的后端代码是 java 代码;前端代码是基于 vue + element-ui 生态的前端代码。
代码生成规则对应的数据建模:
自动生成前后端代码。
Kafka 是一个分布式的、可水平扩展的、基于发布/订阅模式的、支持容错的消息系统。
Kafka 使用 Zookeeper 来维护集群成员的信息。每个 Broker 都有一个唯一标识符,这个标识符可以在配置文件里指定,也可以自动生成。在 Broker 启动的时候,它通过创建临时节点把自己的 ID 注册到 Zookeeper。Kafka 组件订阅 Zookeeper 的 /broker/ids
路径,当有 Broker 加入集群或退出集群时,这些组件就可以获得通知。
如果要启动另一个具有相同 ID 的 Broker,会得到一个错误——新 Broker 会试着进行注册,但不会成功,因为 ZooKeeper 中已经有一个具有相同 ID 的 Broker。
在 Broker 停机、出现网络分区或长时间垃圾回收停顿时,Broker 会与 ZooKeeper 断开连接,此时 Broker 在启动时创建的临时节点会自动被 ZooKeeper 移除。监听 Broker 列表的 Kafka 组件会被告知 Broker 已移除。
Kafka 在 ZooKeeper 的关键存储信息:
admin
:存储管理信息。主要为删除主题事件,分区迁移事件,优先副本选举,信息 (一般为临时节点)brokers
:存储 Broker 相关信息。broker 节点以及节点上的主题相关信息cluster
:存储 kafka 集群信息config
:存储 broker,client,topic,user 以及 changer 相关的配置信息consumers
:存储消费者相关信息controller
:存储控制器节点信息controller_epoch
:存储控制器节点当前的年龄(说明控制器节点变更次数)ZooKeeper 两个重要特性:
- 客户端会话结束时,ZooKeeper 就会删除临时节点。
- 客户端注册监听它关心的节点,当节点状态发生变化(数据变化、子节点增减变化)时,ZooKeeper 服务会通知客户端。
详细内容可以参考:ZooKeeper 原理
控制器(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 ZooKeeper 的帮助下管理和协调整个 Kafka 集群。控制器其实就是一个 Broker,只不过它除了具有一般 Broker 的功能以外,还负责 Leader 的选举。
集群中任意一台 Broker 都能充当控制器的角色,但是,在运行过程中,只能有一个 Broker 成为控制器,行使其管理和协调的职责。实际上,Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller
节点。Kafka 当前选举控制器的规则是:第一个在 ZooKeeper 成功创建 /controller
临时节点的 Broker 会被指定为控制器。
选举控制器的详细流程:
第一个在 ZooKeeper 中成功创建 /controller
临时节点的 Broker 会被指定为控制器。
其他 Broker 在控制器节点上创建 Zookeeper watch 对象。
如果控制器被关闭或者与 Zookeeper 断开连接,Zookeeper 临时节点就会消失。集群中的其他 Broker 通过 watch 对象得到状态变化的通知,它们会尝试让自己成为新的控制器。
第一个在 Zookeeper 里创建一个临时节点 /controller
的 Broker 成为新控制器。其他 Broker 在新控制器节点上创建 Zookeeper watch 对象。
每个新选出的控制器通过 Zookeeper 的条件递增操作获得一个全新的、数值更大的 controller epoch。其他节点会忽略旧的 epoch 的消息。
当控制器发现一个 Broker 已离开集群,并且这个 Broker 是某些 Partition 的 Leader。此时,控制器会遍历这些 Partition,并用轮询方式确定谁应该成为新 Leader,随后,新 Leader 开始处理生产者和消费者的请求,而 Follower 开始从 Leader 那里复制消息。
简而言之,Kafka 使用 Zookeeper 的临时节点来选举控制器,并在节点加入集群或退出集群时通知控制器。控制器负责在节点加入或离开集群时进行 Partition Leader 选举。控制器使用 epoch 来避免“脑裂”,“脑裂”是指两个节点同时被认为自己是当前的控制器。
这里的 Topic 管理,就是指控制器帮助我们完成对 Kafka Topic 的创建、删除以及分区增加的操作。换句话说,当我们执行 kafka-topics 脚本时,大部分的后台工作都是控制器来完成的。
分区重分配主要是指,kafka-reassign-partitions 脚本(关于这个脚本,后面我也会介绍)提供的对已有 Topic 分区进行细粒度的分配功能。这部分功能也是控制器实现的。
Preferred 领导者选举主要是 Kafka 为了避免部分 Broker 负载过重而提供的一种换 Leader 的方案。在专栏后面说到工具的时候,我们再详谈 Preferred 领导者选举,这里你只需要了解这也是控制器的职责范围就可以了。
集群成员管理,包括自动检测新增 Broker、Broker 主动关闭及被动宕机。这种自动检测是依赖于前面提到的 Watch 功能和 ZooKeeper 临时节点组合实现的。
比如,控制器组件会利用Watch 机制检查 ZooKeeper 的 /brokers/ids 节点下的子节点数量变更。目前,当有新 Broker 启动后,它会在 /brokers 下创建专属的 znode 节点。一旦创建完毕,ZooKeeper 会通过 Watch 机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化,进而开启后续的新增 Broker 作业。
侦测 Broker 存活性则是依赖于刚刚提到的另一个机制:临时节点。每个 Broker 启动后,会在 /brokers/ids 下创建一个临时 znode。当 Broker 宕机或主动关闭后,该 Broker 与 ZooKeeper 的会话结束,这个 znode 会被自动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能知道有 Broker 关闭或宕机了,从而进行“善后”。
控制器的最后一大类工作,就是向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
控制器中保存了多种数据,比较重要的的数据有:
值得注意的是,这些数据其实在 ZooKeeper 中也保存了一份。每当控制器初始化时,它都会从 ZooKeeper 上读取对应的元数据并填充到自己的缓存中。有了这些数据,控制器就能对外提供数据服务了。这里的对外主要是指对其他 Broker 而言,控制器通过向这些 Broker 发送请求的方式将这些数据同步到其他 Broker 上。
副本机制是分布式系统实现高可用的不二法门,Kafka 也不例外。
副本机制有哪些好处?
但是,Kafka 只实现了第一个好处,原因后面会阐述。
Kafka 使用 Topic 来组织数据,每个 Topic 被分为若干个 Partition,每个 Partition 有多个副本。每个 Broker 可以保存成百上千个属于不同 Topic 和 Partition 的副本。Kafka 副本的本质是一个只能追加写入的提交日志。
Kafka 副本有两种角色:
为了与 Leader 保持同步,Follower 向 Leader 发起获取数据的请求,这种请求与消费者为了读取消息而发送的请求是一样的。请求消息里包含了 Follower 想要获取消息的偏移量,而这些偏移量总是有序的。
Leader 另一个任务是搞清楚哪个 Follower 的状态与自己是一致的。通过查看每个 Follower 请求的最新偏移量,Leader 就会知道每个 Follower 复制的进度。如果跟随者在 10s 内没有请求任何消息,或者虽然在请求消息,但是在 10s 内没有请求最新的数据,那么它就会被认为是不同步的。如果一个副本是不同步的,在 Leader 失效时,就不可能成为新的 Leader——毕竟它没有包含全部的消息。
除了当前首领之外,每个分区都有一个首选首领——创建 Topic 时选定的首领就是分区的首选首领。之所以叫首选 Leader,是因为在创建分区时,需要在 Broker 之间均衡 Leader。
ISR 即 In-sync Replicas,表示同步副本。Follower 副本不提供服务,只是定期地异步拉取领导者副本中的数据而已。既然是异步的,说明和 Leader 并非数据强一致性的。
判断 Follower 是否与 Leader 同步的标准:
Kafka Broker 端参数 replica.lag.time.max.ms
参数,指定了 Follower 副本能够落后 Leader 副本的最长时间间隔,默认为 10s。这意味着:只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
ISR 是一个动态调整的集合,会不断将同步副本加入集合,将不同步副本移除集合。Leader 副本天然就在 ISR 中。
因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader。
Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数 unclean.leader.election.enable
控制是否允许 Unclean 领导者选举。
开启 Unclean 领导者选举可能会造成数据丢失,但好处是:它使得 Partition Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。
Broker 的大部分工作是处理客户端、Partition 副本和控制器发送给 Partition Leader 的请求。Kafka 提供了一个二进制协议(基于 TCP),指定了请求消息的格式以及 Broker 如何对请求作出响应。
broker 会在它所监听的每一个端口上运行一个 Acceptor 线程,这个线程会创建一个连接,并把它交给 Processor 线程去处理。Processor 线程的数量是可配置的。Processor 线程负责从客户端获取请求消息,把它们放进请求队列,然后从响应队列获取响应消息,把它们发送给客户端。
当请求放进请求队列后,IO 线程负责进行处理。
生产请求和获取请求都需要发送给 Partition 的 Leader 副本处理。如果 Broker 收到一个针对特定分区的请求,而该分区的 Leader 在另一个 Broker 上,那么发送请求的客户端会收到一个“非分区 Leader”的错误响应。Kafka 客户端要自己负责把生成请求和获取请求发送到正确的 Broker 上。
客户端怎么知道哪个是 Leader 呢?客户端通过使用另一种类型的请求来实现,那就是元数据请求(metadata request)。这种请求包含了客户端感兴趣的 Topic 列表。broker 的响应消息指明了这些 Topic 所包含的 Partition、Partition 有哪些副本,以及哪个副本是 Leader。元数据请求可以发给任意一个 broker,因为所有 Broker 都缓存了这些信息。
客户端会把这些信息缓存起来,并直接往目标 Broker 上发送生产请求和获取请求。它们需要时不时地通过发送元数据请求来刷新这些信息(刷新的时间间隔通过 metadata.max.age.ms
来配置),从而知道元数据是否发生了变化。
acks 参数控制多少个副本确认写入成功后生产者才认为消息生产成功。这个参数的取值可以为:
acks=0
- 消息发送完毕,生产者认为消息写入成功;acks=1
- Leader 写入成功,生产者认为消息写入成功;acks=all
- 所有同步副本写入成功,生产者才认为消息写入成功。如果 Leader 收到生产消息,它会执行一些检查逻辑,包含:
acks
参数取值是否合法(只允许 0
,1
,all
)?acks
设置为 all
,是否有足够的同步副本已经安全写入消息?(我们可以配置如果同步副本数量不足,Leader 拒绝处理新消息)之后,消息被写入到本地磁盘。一旦消息本地持久化后,如果 acks
被设为 0
或 1
,那么会返回结果给客户端;如果 acks
被设为 all
那么会将请求放置在一个称为 purgatory
的缓冲区中等待其他的副本写入完成。
Leader 处理拉取请求和处理生产请求的方式很相似:
客户端可以指定 Broker 返回数据量的上限和下限,防止数据量过大造成客户端内存溢出。同时,客户端也可以指定返回的最小数据量,当消息数据量没有达到最小数据量时,请求会一直阻塞直到有足够的数据返回。指定最小的数据量在负载不高的情况下非常有用,通过这种方式可以减轻网络往返的额外开销。当然请求也不能永远的阻塞,客户端可以指定最大的阻塞时间,如果到达指定的阻塞时间,即便没有足够的数据也会返回。
不是所有 Leader 的数据都能够被读取。消费者只能读取已提交的消息。只有当消息被写入分区的若干同步副本时,才被认为是已提交的。为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要 Leader 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。
因为还没有被足够的副本持久化的消息,被认为是不安全的——如果 Leader 发生故障,另一个副本成为新的 Leader,这些消息就丢失了。如果允许读取这些消息,就可能会破坏数据一致性。
这也意味着,如果 Broker 间的消息复制因为某些原因变慢了,那么消息到达消费者的时间也会随之变长。延迟时间可以通过 replica.lag.time.max.ms
来配置,它指定了副本在复制消息时可被允许的最大延迟时间。
我们讨论了 Kafka 中最常见的三种请求类型:元信息请求,生产请求和拉取请求。这些请求都是使用的是 Kafka 的自定义二进制协议。集群中 Broker 间的通信请求也是使用同样的协议,这些请求是内部使用的,客户端不能发送。比如在选举 Partition Leader 过程中,控制器会发送 LeaderAndIsr 请求给新的 Leader 和其他跟随副本。
这个协议目前已经支持 20 种请求类型,并且仍然在演进以支持更多的类型。
Follower 宕机,啥事儿没有;Leader 宕机了,会从 Follower 中重新选举一个新的 Leader。
生产者/消费者如何知道谁是 Leader
Kafka 是 Apache 的开源项目。Kafka 既可以作为一个消息队列中间件,也可以作为一个分布式流处理平台。
Kafka 用于构建实时数据管道和流应用。它具有水平可伸缩性,容错性,快速快速性。
持久化是 Kafka 的一个重要特性。
Kafka 集群持久化保存(使用可配置的保留期限)所有发布记录——无论它们是否被消费。但是,Kafka 不会一直保留数据,也不会等待所有的消费者读取了消息才删除消息。只要数据量达到上限(比如 1G)或者数据达到过期时间(比如 7 天),Kafka 就会删除旧消息。Kafka 的性能和数据大小无关,所以长时间存储数据没有什么问题。
Kafka 对消息的存储和缓存严重依赖于文件系统。
顺序磁盘访问在某些情况下比随机内存访问还要快!在 Kafka 中,所有数据一开始就被写入到文件系统的持久化日志中,而不用在 cache 空间不足的时候 flush 到磁盘。实际上,这表明数据被转移到了内核的 pagecache 中。所以,虽然 Kafka 数据存储在磁盘中,但其访问性能也不低。
Kafka 的协议是建立在一个 “消息块” 的抽象基础上,合理将消息分组。 这使得网络请求将多个消息打包成一组,而不是每次发送一条消息,从而使整组消息分担网络中往返的开销。Consumer 每次获取多个大型有序的消息块,并由服务端依次将消息块一次加载到它的日志中。这可以有效减少大量的小型 I/O 操作。
由于 Kafka 在 Producer、Broker 和 Consumer 都共享标准化的二进制消息格式,这样数据块不用修改就能在他们之间传递。这可以避免字节拷贝带来的开销。
Kafka 以高效的批处理格式支持一批消息可以压缩在一起发送到服务器。这批消息将以压缩格式写入,并且在日志中保持压缩,只会在 Consumer 消费时解压缩。压缩传输数据,可以有效减少网络带宽开销。
所有这些优化都允许 Kafka 以接近网络速度传递消息。
Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。
在 Kafka 中,任意一个 Topic 维护了一组 Partition 日志,如下所示:
请注意:这里的主题只是一个逻辑上的抽象概念,实际上,Kafka 的基本存储单元是 Partition。Partition 无法在多个 Broker 间进行再细分,也无法在同一个 Broker 的多个磁盘上进行再细分。所以,分区的大小受到单个挂载点可用空间的限制。
Partiton 命名规则为 Topic 名称 + 有序序号,第一个 Partiton 序号从 0 开始,序号最大值为 Partition 数量减 1。
Log
是 Kafka 用于表示日志文件的组件。每个 Partiton 对应一个 Log
对象,在物理磁盘上则对应一个目录。如:创建一个双分区的主题 test
,那么,Kafka 会在磁盘上创建两个子目录:test-0
和 test-1
;而在服务器端,这就对应两个 Log
对象。
因为在一个大文件中查找和删除消息是非常耗时且容易出错的。所以,Kafka 将每个 Partition 切割成若干个片段,即日志段(Log Segment)。默认每个 Segment 大小不超过 1G,且只包含 7 天的数据。如果 Segment 的消息量达到 1G,那么该 Segment 会关闭,同时打开一个新的 Segment 进行写入。
Broker 会为 Partition 里的每个 Segment 打开一个文件句柄(包括不活跃的 Segment),因此打开的文件句柄数通常会比较多,这个需要适度调整系统的进程文件句柄参数。正在写入的分片称为活跃片段(active segment),活跃片段永远不会被删除。
Segment 文件命名规则:Partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。
Segment 文件可以分为两类:
.index
).timeindex
).txnindex
):如果没有使用 Kafka 事务,则不会创建该文件.log
)Kafka 的消息和偏移量保存在文件里。保存在磁盘上的数据格式和从生产者发送过来或消费者读取的数据格式是一样的。因为使用了相同的数据格式,使得 Kafka 可以进行零拷贝技术给消费者发送消息,同时避免了压缩和解压。
除了键、值和偏移量外,消息里还包含了消息大小、校验和(检测数据损坏)、魔数(标识消息格式版本)、压缩算法(Snappy、GZip 或者 LZ4)和时间戳(0.10.0 新增)。时间戳可以是生产者发送消息的时间,也可以是消息到达 Broker 的时间,这个是可配的。
如果生产者发送的是压缩的消息,那么批量发送的消息会压缩在一起,以“包装消息”(wrapper message)来发送,如下所示:
如果生产者使用了压缩功能,发送的批次越大,就意味着能获得更好的网络传输效率,并且节省磁盘存储空间。
Kafka 附带了一个叫 DumpLogSegment 的工具,可以用它查看片段的内容。它可以显示每个消息的偏移量、校验和、魔术数字节、消息大小和压缩算法。
Kafka 允许消费者从任意有效的偏移量位置开始读取消息。Kafka 为每个 Partition 都维护了一个索引(即 .index
文件),该索引将偏移量映射到片段文件以及偏移量在文件里的位置。
索引也被分成片段,所以在删除消息时,也可以删除相应的索引。Kafka 不维护索引的校验和。如果索引出现损坏,Kafka 会通过重读消息并录制偏移量和位置来重新生成索引。如果有必要,管理员可以删除索引,这样做是绝对安全的,Kafka 会自动重新生成这些索引。
索引文件用于将偏移量映射成为消息在日志数据文件中的实际物理位置,每个索引条目由 offset 和 position 组成,每个索引条目可以唯一确定在各个分区数据文件的一条消息。其中,Kafka 采用稀疏索引存储的方式,每隔一定的字节数建立了一条索引,可以通过“index.interval.bytes”设置索引的跨度;
有了偏移量索引文件,通过它,Kafka 就能够根据指定的偏移量快速定位到消息的实际物理位置。具体的做法是,根据指定的偏移量,使用二分法查询定位出该偏移量对应的消息所在的分段索引文件和日志数据文件。然后通过二分查找法,继续查找出小于等于指定偏移量的最大偏移量,同时也得出了对应的 position(实际物理位置),根据该物理位置在分段的日志数据文件中顺序扫描查找偏移量与指定偏移量相等的消息。下面是 Kafka 中分段的日志数据文件和偏移量索引文件的对应映射关系图(其中也说明了如何按照起始偏移量来定位到日志数据文件中的具体消息)。
每个日志片段可以分为以下两个部分:
如果在 Kafka 启动时启用了清理功能(通过 log.cleaner.enabled
配置),每个 Broker 会启动一个清理管理器线程和若干个清理线程,每个线程负责一个 Partition。
清理线程会读取污浊的部分,并在内存里创建一个 map。map 的 key 是消息键的哈希吗,value 是消息的偏移量。对于相同的键,只保留最新的位移。其中 key 的哈希大小为 16 字节,位移大小为 8 个字节。也就是说,一个映射只有 24 字节,假设消息大小为 1KB,那么 1GB 的段有 1 百万条消息,建立这个段的映射只需要 24MB 的内存,映射的内存效率是非常高效的。
在配置 Kafka 时,管理员需要设置这些清理线程可以使用的总内存。如果设置 1GB 的总内存同时有 5 个清理线程,那么每个线程只有 200MB 的内存可用。在清理线程工作时,它不需要把所有脏的段文件都一起在内存中建立上述映射,但需要保证至少能够建立一个段的映射。如果不能同时处理所有脏的段,Kafka 会一次清理最老的几个脏段,然后在下一次再处理其他的脏段。
一旦建立完脏段的键与位移的映射后,清理线程会从最老的干净的段开始处理。如果发现段中的消息的键没有在映射中出现,那么可以知道这个消息是最新的,然后简单的复制到一个新的干净的段中;否则如果消息的键在映射中出现,这条消息需要抛弃,因为对于这个键,已经有新的消息写入。处理完会将产生的新段替代原始段,并处理下一个段。
对于一个段,清理前后的效果如下:
对于只保留最新消息的清理策略来说,Kafka 还支持删除相应键的消息操作(而不仅仅是保留最新的消息内容)。这是通过生产者发送一条特殊的消息来实现的,该消息包含一个键以及一个 null 的消息内容。当清理线程发现这条消息时,它首先仍然进行一个正常的清理并且保留这个包含 null 的特殊消息一段时间,在这段时间内消费者消费者可以获取到这条消息并且知道消息内容已经被删除。过了这段时间,清理线程会删除这条消息,这个键会从 Partition 中消失。这段时间是必须的,因为它可以使得消费者有一定的时间余地来收到这条消息。
国外网友制作了一张 Git Cheat Sheet,总结很精炼,各位不妨收藏一下。
本节选择性介绍 git 中比较常用的命令行场景。
(1)Debian/Ubuntu 环境安装
如果你使用的系统是 Debian/Ubuntu , 安装命令为:
1 | $ apt-get install libcurl4-gnutls-dev libexpat1-dev gettext \ |
(2)Centos/RedHat 环境安装
如果你使用的系统是 Centos/RedHat ,安装命令为:
1 | $ yum install curl-devel expat-devel gettext-devel \ |
(3)Windows 环境安装
在Git 官方下载地址下载 exe 安装包。按照安装向导安装即可。
建议安装 Git Bash 这个 git 的命令行工具。
(4)Mac 环境安装
在Git 官方下载地址下载 mac 安装包。按照安装向导安装即可。
Git 自带一个 git config
的工具来帮助设置控制 Git 外观和行为的配置变量。 这些变量存储在三个不同的位置:
/etc/gitconfig
文件: 包含系统上每一个用户及他们仓库的通用配置。 如果使用带有 --system
选项的 git config
时,它会从此文件读写配置变量。~/.gitconfig
或 ~/.config/git/config
文件:只针对当前用户。 可以传递 --global
选项让 Git 读写此文件。config
文件(就是 .git/config
):针对该仓库。每一个级别覆盖上一级别的配置,所以 .git/config
的配置变量会覆盖 /etc/gitconfig
中的配置变量。
在 Windows 系统中,Git 会查找 $HOME
目录下(一般情况下是 C:\Users\$USER
)的 .gitconfig
文件。 Git 同样也会寻找 /etc/gitconfig
文件,但只限于 MSys 的根目录下,即安装 Git 时所选的目标位置。
当安装完 Git 应该做的第一件事就是设置你的用户名称与邮件地址。 这样做很重要,因为每一个 Git 的提交都会使用这些信息,并且它会写入到你的每一次提交中,不可更改:
1 | git config --global user.name "John Doe" |
再次强调,如果使用了 --global
选项,那么该命令只需要运行一次,因为之后无论你在该系统上做任何事情, Git 都会使用那些信息。 当你想针对特定项目使用不同的用户名称与邮件地址时,可以在那个项目目录下运行没有 --global
选项的命令来配置。
很多 GUI 工具都会在第一次运行时帮助你配置这些信息。
在 OS X 和 Linux 下, 你的 Git 的配置文件储存在 ~/.gitconfig
。我在[alias]
部分添加了一些快捷别名(和一些我容易拼写错误的),如下:
1 | [alias] |
你可能有一个仓库需要授权,这时你可以缓存用户名和密码,而不用每次推/拉(push/pull)的时候都输入,Credential helper 能帮你。
1 | git config --global credential.helper cache |
1 | git config --global credential.helper 'cache --timeout=3600' |
1 | $ git init |
1 | # 通过 SSH |
有时,我们需要在同一个项目的不同分支上工作。当需要切换分支时,偏偏本地的工作还没有完成,此时,提交修改显得不严谨,但是不提交代码又无法切换分支。这时,你可以使用 git stash
将本地的修改内容作为草稿储藏起来。
官方称之为储藏,但我个人更喜欢称之为存草稿。
1 | # 1. 将修改作为当前分支的草稿保存 |
git add
命令用于将修改添加到暂存区。
1 | git add xxx |
1 | git add . |
1 | git add -A |
暂存文件部分内容
1 | git add --patch filename.x |
-p
简写。这会打开交互模式, 你将能够用 s
选项来分隔提交(commit);
然而, 如果这个文件是新的, 会没有这个选择, 添加一个新文件时,这样做:
1 | git add -N filename.x |
然后, 你需要用 e
选项来手动选择需要添加的行,执行 git diff --cached
将会显示哪些行暂存了哪些行只是保存在本地了。
这个有点困难, 我能想到的最好的方法是先 stash 未暂存的内容, 然后重置(reset),再 pop 第一步 stashed 的内容, 最后再 add 它们。
1 | git stash -k |
git commit
命令用于将修改保存到到本地仓库。
1 | git show |
或者
1 | git log -n1 -p |
1 | git commit -a |
1 | git commit |
1 | git commit --amend |
1 | git commit -m 'commit message' |
如果你的提交信息写错了且这次提交(commit)还没有推送(push),可以使用以下命令修改:
1 | git commit --amend |
或者
1 | git commit --amend -m 'xxxxxxx' |
1 | git commit --amend --author "New Authorname <authoremail@mydomain.com>" |
1 | git checkout HEAD^ myfile |
如果你需要删除推了的提交(pushed commits),你可以使用下面的方法。可是,这会不可逆的改变你的历史,也会搞乱那些已经从该仓库拉取(pulled)了的人的历史。简而言之,如果你不是很确定,千万不要这么做。
1 | git reset HEAD^ --hard |
如果你还没有推到远程, 把 Git 重置(reset)到你最后一次提交前的状态就可以了(同时保存暂存的变化):
1 | (my-branch*)$ git reset --soft HEAD@{1} |
这只能在没有推送之前有用. 如果你已经推了, 唯一安全能做的是 git revert SHAofBadCommit
, 那会创建一个新的提交(commit)用于撤消前一个提交的所有变化(changes); 或者, 如果你推的这个分支是 rebase-safe 的 (例如: 其它开发者不会从这个分支拉), 只需要使用 git push -f
; 更多, 请参考 the above section。
同样的警告:不到万不得已的时候不要这么做.
1 | git rebase --onto SHA1_OF_BAD_COMMIT^ SHA1_OF_BAD_COMMIT |
或者做一个 交互式 rebase 删除那些你想要删除的提交(commit)里所对应的行。
1 | To https://github.com/yourusername/repo.git |
注意, rebasing(见下面)和修正(amending)会用一个新的提交(commit)代替旧的, 所以如果之前你已经往远程仓库上推过一次修正前的提交(commit),那你现在就必须强推(force push) (-f
)。 注意 – 总是 确保你指明一个分支!
1 | (my-branch)$ git push origin mybranch -f |
一般来说, 要避免强推. 最好是创建和推(push)一个新的提交(commit),而不是强推一个修正后的提交。后者会使那些与该分支或该分支的子分支工作的开发者,在源历史中产生冲突。
如果你意外的做了 git reset --hard
, 你通常能找回你的提交(commit), 因为 Git 对每件事都会有日志,且都会保存几天。
1 | (master)$ git reflog |
你将会看到一个你过去提交(commit)的列表, 和一个重置的提交。 选择你想要回到的提交(commit)的 SHA,再重置一次:
1 | (master)$ git reset --hard SHA1234 |
这样就完成了。
撤销本地修改:
1 | # 移除缓存区的所有文件(i.e. 撤销上次git add) |
删除添加.gitignore
文件前错误提交的文件:
1 | # 提交一条 git 记录,提交信息为 remove xyz file |
撤销远程修改(创建一个新的提交,并回滚到指定版本):
1 | # revert 哈希号为 commit-hash 的记录 |
彻底删除指定版本:
1 | # 执行下面命令后,commit-hash 提交后的记录都会被彻底删除,使用需谨慎 |
1 | # 下载远程端版本,但不合并到HEAD中 |
1 | git push remote <remote> <branch> |
1 | git push --tags |
未暂存(Unstaged)的内容
git checkout -b my-branch
1 | git stash |
如果你只是想重置源(origin)和你本地(local)之间的一些提交(commit),你可以:
1 | ## one commit |
重置某个特殊的文件, 你可以用文件名做为参数:
1 | git reset filename |
如果你想丢弃工作拷贝中的一部分内容,而不是全部。
签出(checkout)不需要的内容,保留需要的。
1 | $ git checkout -p |
另外一个方法是使用 stash
, Stash 所有要保留下的内容, 重置工作拷贝, 重新应用保留的部分。
1 | $ git stash -p |
或者, stash 你不需要的部分, 然后 stash drop。
1 | $ git stash -p |
分支(Branches)
1 | git branch |
1 | git branch -r |
1 | git branch <new-branch> |
1 | git branch --track <new-branch> <remote-branch> |
1 | git branch -d <branch> |
注意:强制删除本地分支,将会丢失未合并的修改
1 | git branch -D <branch> |
1 | git push <remote> :<branch> (since Git v1.5.0) |
1 | git checkout <branch> |
1 | git checkout -b <branch> |
这是另外一种使用 git reflog
情况,找到在这次错误拉(pull) 之前 HEAD 的指向。
1 | (master)$ git reflog |
重置分支到你所需的提交(desired commit):
1 | git reset --hard c5bc55a |
完成。
先确认你没有推(push)你的内容到远程。
git status
会显示你领先(ahead)源(origin)多少个提交:
1 | (my-branch)$ git status |
一种方法是:
1 | (master)$ git reset --hard origin/my-branch |
在 master 下创建一个新分支,不切换到新分支,仍在 master 下:
1 | (master)$ git branch my-branch |
把 master 分支重置到前一个提交:
1 | (master)$ git reset --hard HEAD^ |
HEAD^
是 HEAD^1
的简写,你可以通过指定要设置的HEAD
来进一步重置。
或者, 如果你不想使用 HEAD^
, 找到你想重置到的提交(commit)的 hash(git log
能够完成), 然后重置到这个 hash。 使用git push
同步内容到远程。
例如, master 分支想重置到的提交的 hash 为a13b85e
:
1 | (master)$ git reset --hard a13b85e |
签出(checkout)刚才新建的分支继续工作:
1 | (master)$ git checkout my-branch |
假设你正在做一个原型方案(原文为 working spike (see note)), 有成百的内容,每个都工作得很好。现在, 你提交到了一个分支,保存工作内容:
1 | (solution)$ git add -A && git commit -m "Adding all changes from this spike into one big commit." |
当你想要把它放到一个分支里 (可能是feature
, 或者 develop
), 你关心是保持整个文件的完整,你想要一个大的提交分隔成比较小。
假设你有:
solution
, 拥有原型方案, 领先 develop
分支。develop
, 在这里你应用原型方案的一些内容。我去可以通过把内容拿到你的分支里,来解决这个问题:
1 | (develop)$ git checkout solution -- file1.txt |
这会把这个文件内容从分支 solution
拿到分支 develop
里来:
1 | ## On branch develop |
然后, 正常提交。
Note: Spike solutions are made to analyze or solve the problem. These solutions are used for estimation and discarded once everyone gets clear visualization of the problem. ~ Wikipedia.
假设你有一个master
分支, 执行git log
, 你看到你做过两次提交:
1 | (master)$ git log |
让我们用提交 hash(commit hash)标记 bug (e3851e8
for #21, 5ea5173
for #14).
首先, 我们把master
分支重置到正确的提交(a13b85e
):
1 | (master)$ git reset --hard a13b85e |
现在, 我们对 bug #21 创建一个新的分支:
1 | (master)$ git checkout -b 21 |
接着, 我们用 cherry-pick 把对 bug #21 的提交放入当前分支。 这意味着我们将应用(apply)这个提交(commit),仅仅这一个提交(commit),直接在 HEAD 上面。
1 | (21)$ git cherry-pick e3851e8 |
这时候, 这里可能会产生冲突, 参见交互式 rebasing 章 冲突节 解决冲突.
再者, 我们为 bug #14 创建一个新的分支, 也基于master
分支
1 | (21)$ git checkout master |
最后, 为 bug #14 执行 cherry-pick
:
1 | (14)$ git cherry-pick 5ea5173 |
一旦你在 github 上面合并(merge)了一个 pull request, 你就可以删除你 fork 里被合并的分支。 如果你不准备继续在这个分支里工作, 删除这个分支的本地拷贝会更干净,使你不会陷入工作分支和一堆陈旧分支的混乱之中。
1 | git fetch -p |
如果你定期推送到远程, 多数情况下应该是安全的,但有些时候还是可能删除了还没有推到远程的分支。 让我们先创建一个分支和一个新的文件:
1 | (master)$ git checkout -b my-branch |
添加文件并做一次提交
1 | (my-branch)$ git add . |
现在我们切回到主(master)分支,‘不小心的’删除my-branch
分支
1 | (my-branch)$ git checkout master |
在这时候你应该想起了reflog
, 一个升级版的日志,它存储了仓库(repo)里面所有动作的历史。
1 | (master)$ git reflog |
正如你所见,我们有一个来自删除分支的提交 hash(commit hash),接下来看看是否能恢复删除了的分支。
1 | (master)$ git checkout -b my-branch-help |
看! 我们把删除的文件找回来了。 Git 的 reflog
在 rebasing 出错的时候也是同样有用的。
删除一个远程分支:
1 | (master)$ git push origin --delete my-branch |
你也可以:
1 | (master)$ git push origin :my-branch |
删除一个本地分支:
1 | (master)$ git branch -D my-branch |
首先, 从远程拉取(fetch) 所有分支:
1 | (master)$ git fetch --all |
假设你想要从远程的daves
分支签出到本地的daves
1 | (master)$ git checkout --track origin/daves |
(--track
是 git checkout -b [branch] [remotename]/[branch]
的简写)
这样就得到了一个daves
分支的本地拷贝, 任何推过(pushed)的更新,远程都能看到.
1 | $ git tag <tag-name> |
1 | $ git tag -a <tag-name> |
1 | git tag -d <tag_name> |
如果你想恢复一个已删除标签(tag), 可以按照下面的步骤: 首先, 需要找到无法访问的标签(unreachable tag):
1 | git fsck --unreachable | grep tag |
记下这个标签(tag)的 hash,然后用 Git 的 update-ref:
1 | git update-ref refs/tags/<tag_name> <hash> |
这时你的标签(tag)应该已经恢复了。
merge 与 rebase 虽然是 git 常用功能,但是强烈建议不要使用 git 命令来完成这项工作。
因为如果出现代码冲突,在没有代码比对工具的情况下,实在太艰难了。
你可以考虑使用各种 Git GUI 工具。
1 | git merge <branch> |
1 | git rebase <branch> |
你可以合并(merge)或 rebase 了一个错误的分支, 或者完成不了一个进行中的 rebase/merge。 Git 在进行危险操作的时候会把原始的 HEAD 保存在一个叫 ORIG_HEAD 的变量里, 所以要把分支恢复到 rebase/merge 前的状态是很容易的。
1 | (my-branch)$ git reset --hard ORIG_HEAD |
不幸的是,如果你想把这些变化(changes)反应到远程分支上,你就必须得强推(force push)。 是因你快进(Fast forward)了提交,改变了 Git 历史, 远程分支不会接受变化(changes),除非强推(force push)。这就是许多人使用 merge 工作流, 而不是 rebasing 工作流的主要原因之一, 开发者的强推(force push)会使大的团队陷入麻烦。使用时需要注意,一种安全使用 rebase 的方法是,不要把你的变化(changes)反映到远程分支上, 而是按下面的做:
1 | (master)$ git checkout my-branch |
更多, 参见 this SO thread.
假设你的工作分支将会做对于 master
的 pull-request。 一般情况下你不关心提交(commit)的时间戳,只想组合 所有 提交(commit) 到一个单独的里面, 然后重置(reset)重提交(recommit)。 确保主(master)分支是最新的和你的变化都已经提交了, 然后:
1 | (my-branch)$ git reset --soft master |
如果你想要更多的控制, 想要保留时间戳, 你需要做交互式 rebase (interactive rebase):
1 | (my-branch)$ git rebase -i master |
如果没有相对的其它分支, 你将不得不相对自己的HEAD
进行 rebase。 例如:你想组合最近的两次提交(commit), 你将相对于HEAD\~2
进行 rebase, 组合最近 3 次提交(commit), 相对于HEAD\~3
, 等等。
1 | (master)$ git rebase -i HEAD~2 |
在你执行了交互式 rebase 的命令(interactive rebase command)后, 你将在你的编辑器里看到类似下面的内容:
1 | pick a9c8a1d Some refactoring |
所有以 #
开头的行都是注释, 不会影响 rebase.
然后,你可以用任何上面命令列表的命令替换 pick
, 你也可以通过删除对应的行来删除一个提交(commit)。
例如, 如果你想 单独保留最旧(first)的提交(commit),组合所有剩下的到第二个里面, 你就应该编辑第二个提交(commit)后面的每个提交(commit) 前的单词为 f
:
1 | pick a9c8a1d Some refactoring |
如果你想组合这些提交(commit) 并重命名这个提交(commit), 你应该在第二个提交(commit)旁边添加一个r
,或者更简单的用s
替代 f
:
1 | pick a9c8a1d Some refactoring |
你可以在接下来弹出的文本提示框里重命名提交(commit)。
1 | Newer, awesomer features |
如果成功了, 你应该看到类似下面的内容:
1 | (master)$ Successfully rebased and updated refs/heads/master. |
--no-commit
执行合并(merge)但不自动提交, 给用户在做提交前检查和修改的机会。 no-ff
会为特性分支(feature branch)的存在过留下证据, 保持项目历史一致。
1 | (master)$ git merge --no-ff --no-commit my-branch |
1 | (master)$ git merge --squash my-branch |
有时候,在将数据推向上游之前,你有几个正在进行的工作提交(commit)。这时候不希望把已经推(push)过的组合进来,因为其他人可能已经有提交(commit)引用它们了。
1 | (master)$ git rebase -i @{u} |
这会产生一次交互式的 rebase(interactive rebase), 只会列出没有推(push)的提交(commit), 在这个列表时进行 reorder/fix/squash 都是安全的。
检查一个分支上的所有提交(commit)是否都已经合并(merge)到了其它分支, 你应该在这些分支的 head(或任何 commits)之间做一次 diff:
1 | (master)$ git log --graph --left-right --cherry-pick --oneline HEAD...feature/120-on-scroll |
这会告诉你在一个分支里有而另一个分支没有的所有提交(commit), 和分支之间不共享的提交(commit)的列表。 另一个做法可以是:
1 | (master)$ git log master ^feature/120-on-scroll --no-merges |
如果你看到的是这样:
1 | noop |
这意味着你 rebase 的分支和当前分支在同一个提交(commit)上, 或者 领先(ahead) 当前分支。 你可以尝试:
HEAD\~2
或者更早如果你不能成功的完成 rebase, 你可能必须要解决冲突。
首先执行 git status
找出哪些文件有冲突:
1 | (my-branch)$ git status |
在这个例子里面, README.md
有冲突。 打开这个文件找到类似下面的内容:
1 | <<<<<<< HEAD |
你需要解决新提交的代码(示例里, 从中间==
线到new-commit
的地方)与HEAD
之间不一样的地方.
有时候这些合并非常复杂,你应该使用可视化的差异编辑器(visual diff editor):
1 | (master*)$ git mergetool -t opendiff |
在你解决完所有冲突和测试过后, git add
变化了的(changed)文件, 然后用git rebase --continue
继续 rebase。
1 | (my-branch)$ git add README.md |
如果在解决完所有的冲突过后,得到了与提交前一样的结果, 可以执行git rebase --skip
。
任何时候你想结束整个 rebase 过程,回来 rebase 前的分支状态, 你可以做:
1 | (my-branch)$ git rebase --abort |
显示工作路径下已修改的文件:git status
显示与上次提交版本文件的不同:git diff
显示提交历史:
1 | # 从最新提交开始,显示所有的提交记录(显示hash, 作者信息,提交的标题和时间) |
显示搜索内容:
1 | # 从当前目录的所有文件中查找文本内容 |
1 | git clone --recursive git://github.com/foo/bar.git |
如果已经克隆了:
1 | git submodule update --init --recursive |
如果某人在 GitHub 上给你发了一个 pull request, 但是然后他删除了他自己的原始 fork, 你将没法克隆他们的提交(commit)或使用 git am
。在这种情况下, 最好手动的查看他们的提交(commit),并把它们拷贝到一个本地新分支,然后做提交。
做完提交后, 再修改作者,参见变更作者。 然后, 应用变化, 再发起一个新的 pull request。
1 | (master)$ git mv --force myfile MyFile |
1 | (master)$ git rm --cached log.txt |
GitHub 中 Fork 是 服务端的代码仓库克隆(即 新克隆出来的代码仓库在远程服务端),包含了原来的仓库(即 upstream repository,上游仓库)所有内容,如分支、Tag、提交。代码托管服务(如 Github、BitBucket)提供了方便的完成 Fork 操作的功能(在仓库页面点一下 Fork 按钮)。这样有了一个你自己的可以自由提交的远程仓库,然后可以通过的 Pull Request 把你的提交贡献回 原仓库。而对于原仓库 Owner 来说,鼓励别人 Fork 自己的仓库,通过 Pull Request 给自己的仓库做贡献,也能提高了自己仓库的知名度。
参考:Fork a repo
(1)执行 git remote -v
,您将看到当前为 fork 配置的远程存储库。
(2)添加上游项目的仓库地址
1 | git remote add upstream <github仓库地址> |
(3)确认是否添加成功,再次键入 git remote -v
。
(4)获取上游项目更新,可以执行 git fetch upstream
(5)同步上游项目的代码到新仓库
1 | # merge |
你把事情搞砸了:你 重置(reset)
了一些东西, 或者你合并了错误的分支, 亦或你强推了后找不到你自己的提交(commit)了。有些时候, 你一直都做得很好, 但你想回到以前的某个状态。
这就是 git reflog
的目的, reflog
记录对分支顶端(the tip of a branch)的任何改变, 即使那个顶端没有被任何分支或标签引用。基本上, 每次 HEAD 的改变, 一条新的记录就会增加到reflog
。遗憾的是,这只对本地分支起作用,且它只跟踪动作 (例如,不会跟踪一个没有被记录的文件的任何改变)。
1 | (master)$ git reflog |
上面的 reflog 展示了从 master 分支签出(checkout)到 2.2 分支,然后再签回。 那里,还有一个硬重置(hard reset)到一个较旧的提交。最新的动作出现在最上面以 HEAD@{0}
标识.
如果事实证明你不小心回移(move back)了提交(commit), reflog 会包含你不小心回移前 master 上指向的提交(0254ea7)。
1 | git reset --hard 0254ea7 |
然后使用 git reset 就可以把 master 改回到之前的 commit,这提供了一个在历史被意外更改情况下的安全网。
Scala 是大数据领域的热门语言,如:Akka、Kafka,所以,想要学习大数据顶级开源项目的源码,有必要具备一定的 Scala 基础。
Scala 基本语法需要注意以下几点:
class MyFirstScalaClass
def myMethodName()
def main(args: Array[String])
- Scala 程序从 main()
方法开始处理,这是每一个 Scala 程序的强制程序入口部分。Scala 类似 Java 支持单行和多行注释。
【示例】单行和多行注释
1 | object HelloWorld { |
在 Scala 中,使用关键词 var
声明变量,使用关键词 val
声明常量。
【示例】声明变量
1 | var myVar : String = "Foo" |
【示例】声明常量
1 | val myVal : String = "Foo" |
变量的类型在变量名之后等号之前声明。定义变量的类型的语法格式如下:
1 | // 声明变量类型 |
在 Scala 中声明变量和常量不一定要指明数据类型,在没有指明数据类型的情况下,其数据类型是通过变量或常量的初始值推断出来的。所以,如果在没有指明数据类型的情况下声明变量或常量必须要给出其初始值,否则将会报错。
1 | var myVar = 10; |
Scala 与 Java 有着相同的数据类型:
数据类型 | 描述 |
---|---|
Byte | 8 位有符号补码整数。数值区间为 -128 到 127 |
Short | 16 位有符号补码整数。数值区间为 -32768 到 32767 |
Int | 32 位有符号补码整数。数值区间为 -2147483648 到 2147483647 |
Long | 64 位有符号补码整数。数值区间为 -9223372036854775808 到 9223372036854775807 |
Float | 32 位, IEEE 754 标准的单精度浮点数 |
Double | 64 位 IEEE 754 标准的双精度浮点数 |
Char | 16 位无符号 Unicode 字符, 区间值为 U+0000 到 U+FFFF |
String | 字符序列 |
Boolean | true 或 false |
Unit | 表示无值,和其他语言中 void 等同。用作不返回任何结果的方法的结果类型。Unit 只有一个实例值,写成()。 |
Null | null 或空引用 |
Nothing | Nothing 类型在 Scala 的类层级的最底端;它是任何其他类型的子类型。 |
Any | Any 是所有其他类的超类 |
AnyRef | AnyRef 类是 Scala 里所有引用类(reference class)的基类 |
上表中列出的数据类型都是对象,也就是说 scala 没有 java 中的原生类型。在 scala 是可以对数字等基础类型调用方法的。
Scala 数组声明的语法格式:
1 | // 方式一 |
【示例】条件语句
1 | object IfDemo { |
和 Java 一样,Scala 支持 while
、do ... while
、for
三种循环语句。
1 | object WhileDemo { |
**scala 不支持 break
和 continue
**。但是,可以通过 Breaks
对象来进行循环控制。
1 | import scala.util.control._ |
scala 的 match
对应 Java 里的 switch
,但是写在选择器表达式之后。即: 选择器 match {备选项}。
1 | /** |
Scala 含有丰富的内置运算符,包括以下几种类型:
+
、-
、*
、/
、%
==
、!=
、>
、<
、>=
、<=
&&
、||
、!
~
、&
、|
、^
、<<
、>>
、>>>
(无符号右移)=
Scala 有方法与函数,二者在语义上的区别很小。
Scala 中的方法跟 Java 的类似,方法是组成类的一部分。
Scala 中的函数则是一个完整的对象,Scala 中的函数其实就是继承了 Trait 的类的对象。
Scala 中使用 val
语句可以定义函数,def
语句定义方法。
【示例】
1 | class Test { |
闭包是一个函数,返回值依赖于声明在函数外部的一个或多个变量。
闭包通常来讲可以简单的认为是可以访问一个函数里面局部变量的另外一个函数。
如下面这段匿名的函数:
1 | val multiplier = (i:Int) => i * 10 |
函数体内有一个变量 i,它作为函数的一个参数。如下面的另一段代码:
1 | val multiplier = (i:Int) => i * factor |
在 multiplier 中有两个变量:i 和 factor。其中的一个 i 是函数的形式参数,在 multiplier 函数被调用时,i 被赋予一个新的值。然而,factor 不是形式参数,而是自由变量,考虑下面代码:
1 | var factor = 3 |
这里我们引入一个自由变量 factor,这个变量定义在函数外面。
这样定义的函数变量 multiplier 成为一个”闭包”,因为它引用到函数外面定义的变量,定义这个函数的过程是将这个自由变量捕获而构成一个封闭的函数。
1 | object ClosureDemo { |
Scala 集合支持 List、Set、Map、元祖、Option。
1 | // 定义整型 List |
迭代器 it 的两个基本操作是 next 和 hasNext。
调用 it.next() 会返回迭代器的下一个元素,并且更新迭代器的状态。
调用 it.hasNext() 用于检测集合中是否还有元素。
1 | object Test { |
类是对象的抽象,而对象是类的具体实例。类是抽象的,不占用内存,而对象是具体的,占用存储空间。类是用于创建对象的蓝图,它是一个定义包括在特定类型的对象中的方法和变量的软件模板。
1 | class Point(val xc: Int, val yc: Int) { |
Scala Trait(特征) 相当于 Java 的接口,实际上它比接口还功能强大。
与接口不同的是,它还可以定义属性和方法的实现。
一般情况下 Scala 的类只能够继承单一父类,但是如果是 Trait(特征) 的话就可以继承多个,从结果来看就是实现了多重继承。
1 | trait Equal { |
Scala 抛出异常的方法和 Java 一样,使用 throw
关键词。
【示例】抛出异常
1 | throw new IllegalArgumentException |
【示例】捕获异常
1 | import java.io.{FileNotFoundException, FileReader, IOException} |
使用 scala.io.StdIn.readLine()
方法读取用户输入
1 | object StdInDemo { |
1 | object SourceDemo { |
Scala 使用 package
关键字定义包,在 Scala 将代码定义到某个包中有两种方式:
第一种方法和 Java 一样,在文件的头定义包名,这种方法就后续所有代码都放在该包中。 比如:
1 | package com.runoob |
第二种方法有些类似 C#,如:
1 | package com.runoob { |
Scala 使用 import
关键字引用包。
1 | import java.awt.Color // 引入Color |
import 语句可以出现在任何地方,而不是只能在文件顶部。import 的效果从开始延伸到语句块的结束。这可以大幅减少名称冲突的可能性。
如果想要引入包中的几个成员,可以使用 selector(选取器):
1 | import java.awt.{Color, Font} |
注意:默认情况下,Scala 总会引入 java.lang._ 、 scala._ 和 Predef._,这里也能解释,为什么以 scala 开头的包,在使用时都是省去 scala.的。
Scala 访问修饰符基本和 Java 的一样,分别有:private,protected,public。
如果没有指定访问修饰符,默认情况下,Scala 对象的访问级别都是 public。
Scala 中的 private 限定符,比 Java 更严格,在嵌套类情况下,外层类甚至不能访问被嵌套类的私有成员。
1 | class Outer { |
不管是把 Kafka 作为消息队列系统、还是数据存储平台,总是需要一个可以向 Kafka 写入数据的生产者和一个可以从 Kafka 读取数据的消费者,或者是一个兼具两种角色的应用程序。
使用 Kafka 的场景很多,诉求也各有不同,主要有:是否允许丢失消息?是否接受重复消息?是否有严格的延迟和吞吐量要求?
不同的场景对于 Kafka 生产者 API 的使用和配置会有直接的影响。
Kafka Producer 发送的数据对象叫做 ProducerRecord
,它有 4 个关键参数:
Topic
- 主题Partition
- 分区(非必填)Key
- 键(非必填)Value
- 值Kafka 生产者发送消息流程:
(1)序列化 - 发送前,生产者要先把键和值序列化。
(2)分区 - 数据被传给分区器。如果在 ProducerRecord
中已经指定了分区,那么分区器什么也不会做;否则,分区器会根据 ProducerRecord
的键来选择一个分区。选定分区后,生产者就知道该把消息发送给哪个主题的哪个分区。
(3)批次传输 - 接着,这条记录会被添加到一个记录批次中。这个批次中的所有消息都会被发送到相同的主题和分区上。有一个独立的线程负责将这些记录批次发送到相应 Broker 上。
(4)响应 - 服务器收到消息会返回一个响应。
RecordMetaData
对象,它包含了主题、分区、偏移量;生产者向 Broker 发送消息时是怎么确定向哪一个 Broker 发送消息?
MetadataRequest
),获取到每一个分区对应的 Leader 信息,并缓存到本地。Kafka 的 Java 生产者 API 主要的对象就是 KafkaProducer
。通常我们开发一个生产者的步骤有 4 步。
KafkaProducer
对象实例。KafkaProducer
的 send
方法发送消息。KafkaProducer
的 close
方法关闭生产者并释放各种系统资源。Kafka 生产者核心配置:
bootstrap.servers
- 指定了 Producer 启动时要连接的 Broker 地址。注:如果你指定了 1000 个 Broker 连接信息,那么,Producer 启动时就会首先创建与这 1000 个 Broker 的 TCP 连接。在实际使用过程中,并不建议把集群中所有的 Broker 信息都配置到 bootstrap.servers
中,通常你指定 3 ~ 4 台就足以了。因为 Producer 一旦连接到集群中的任一台 Broker,就能拿到整个集群的 Broker 信息,故没必要为 bootstrap.servers
指定所有的 Broker。key.serializer
- 键的序列化器。value.serializer
- 值的序列化器。1 | // 指定生产者的配置 |
直接发送消息,不关心消息是否到达。
这种方式吞吐量最高,但有小概率会丢失消息。
【示例】异步发送
1 | ProducerRecord<String, String> record = |
返回一个 Future
对象,调用 get()
方法,会一直阻塞等待 Broker
返回结果。
这是一种可靠传输方式,但吞吐量最差。
【示例】同步发送
1 | ProducerRecord<String, String> record = |
代码如下,异步方式相对于“发送并忽略返回”的方式的不同在于:在异步返回时可以执行一些操作,如:抛出异常、记录错误日志。
这是一个折中的方案,即兼顾吞吐量,也保证消息不丢失。
【示例】异步响应发送
首先,定义一个 callback:
1 | private class DemoProducerCallback implements Callback { |
然后,使用这个 callback:
1 | ProducerRecord<String, String> record = |
调用 producer.close()
方法可以关闭 Kafka 生产者连接。
1 | Producer<String, String> producer = new KafkaProducer<>(properties); |
Apache Kafka 的所有通信都是基于 TCP 的。无论是生产者、消费者,还是 Broker 之间的通信都是如此。
选用 TCP 连接是由于 TCP 本身提供的一些高级功能,如多路复用请求以及同时轮询多个连接的能力。
Kafka 生产者创建连接有三个时机:
(1)在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时,首先会创建与 bootstrap.servers
中所有 Broker 的 TCP 连接。
(2)当 Producer 更新集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。
metadata.max.age.ms
参数定期地去更新元数据信息。该参数的默认值是 300000,即 5 分钟,也就是说不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。(3)当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,会创建一个 TCP 连接。
Producer 端关闭 TCP 连接的方式有两种:一种是用户主动关闭;一种是 Kafka 自动关闭。
主动关闭是指调用 producer.close()
方法来关闭生产者连接;甚至包括用户调用 kill -9
主动“杀掉”Producer 应用。
如果设置 Producer 端 connections.max.idle.ms
参数大于 0(默认为 9 分钟),意味着,在 connections.max.idle.ms
指定时间内,如果没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。如果设置该参数为 -1
,TCP 连接将成为永久长连接。
值得注意的是,在第二种方式中,TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被中断。
Kafka 内置了常用 Java 基础类型的序列化器,如:StringSerializer
、IntegerSerializer
、DoubleSerializer
等。
但如果要传输较为复杂的对象,推荐使用序列化性能更高的工具,如:Avro、Thrift、Protobuf 等。
使用方式是通过实现 org.apache.kafka.common.serialization.Serializer
接口来引入自定义的序列化器。
Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。
在 Kafka 中,任意一个 Topic 维护了一组 Partition 日志,如下所示:
每个 Partition 都是一个单调递增的、不可变的日志记录,以不断追加的方式写入数据。Partition 中的每条记录会被分配一个单调递增的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的每条记录。
为什么 Kafka 的数据结构采用三级结构?
分区的作用就是提供负载均衡的能力,以实现系统的高伸缩性(Scalability)。
不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的机器节点来增加整体系统的吞吐量。
所谓分区策略是决定生产者将消息发送到哪个分区的算法,也就是负载均衡算法。
前文中已经提到,Kafka 生产者发送消息使用的对象 ProducerRecord
,可以选填 Partition 和 Key。不过,大多数应用会用到 key。key 有两个作用:作为消息的附加信息;也可以用来决定消息该被写到 Topic 的哪个 Partition,拥有相同 key 的消息将被写入同一个 Partition。
如果 ProducerRecord
指定了 Partition,则分区器什么也不做,否则分区器会根据 key 选择一个 Partition 。
topic.metadata.refresh.interval.ms
的时间,随机选择一个 partition。这个时间窗口内的所有记录发送到这个 partition。发送数据出错后会重新选择一个 partition。如果 Kafka 的默认分区策略无法满足实际需要,可以自定义分区策略。需要显式地配置生产者端的参数 partitioner.class
。这个参数该怎么设定呢?
首先,要实现 org.apache.kafka.clients.producer.Partitioner
接口。这个接口定义了两个方法:partition
和 close
,通常只需要实现最重要的 partition
方法。我们来看看这个方法的方法签名:
1 | int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); |
这里的 topic
、key
、keyBytes
、value
和 valueBytes
都属于消息数据,cluster
则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。
接着,设置 partitioner.class
参数为自定义类的全限定名,那么生产者程序就会按照你的代码逻辑对消息进行分区。
负载均衡算法常见的有:
可以根据实际需要去实现。
目前,Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本。V2 版本是 Kafka 0.11.0.0 中正式引入的。
不论是哪个版本,Kafka 的消息层次都分为两层:消息集合(message set)以及消息(message)。一个消息集合中包含若干条日志项(record item),而日志项才是真正封装消息的地方。Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。
那么社区引入 V2 版本的目的是什么呢?V2 版本主要是针对 V1 版本的一些弊端做了修正。
在 V1 版本中,每条消息都需要执行 CRC 校验。但有些情况下消息的 CRC 值是会发生变化的。比如在 Broker 端可能会对消息时间戳字段进行更新,那么重新计算之后的 CRC 值也会相应更新;再比如 Broker 端在执行消息格式转换时(主要是为了兼容老版本客户端程序),也会带来 CRC 值的变化。鉴于这些情况,再对每条消息都执行 CRC 校验就有点没必要了,不仅浪费空间还耽误 CPU 时间。
因此,在 V2 版本中,只对消息集合执行 CRC 校验。V2 版本还有一个和压缩息息相关的改进,就是保存压缩消息的方法发生了变化。之前 V1 版本中保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息体字段中;而 V2 版本的做法是对整个消息集合进行压缩。显然后者应该比前者有更好的压缩效果。
Kafka 的压缩流程,一言以概之——Producer 端压缩、Broker 端保持、Consumer 端解压缩。
在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。
生产者程序中配置 compression.type
参数即表示启用指定类型的压缩算法。
【示例】开启 GZIP 的 Producer 对象
1 | Properties props = new Properties(); |
通常,Broker 从 Producer 端接收到消息后,不做任何处理。以下两种情况除外:
情况一:Broker 端指定了和 Producer 端不同的压缩算法。显然,应该尽量避免这种情况。
情况二:Broker 端发生了消息格式转换。所谓的消息格式转换,主要是为了兼容老版本的消费者程序。在一个生产环境中,Kafka 集群中同时保存多种版本的消息格式非常常见。为了兼容老版本的格式,Broker 端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,它还让 Kafka 丧失了引以为豪的 Zero Copy 特性。
所谓零拷贝,说的是当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝,从而实现快速的数据传输。因此如果 Kafka 享受不到这个特性的话,性能必然有所损失,所以尽量保证消息格式的统一吧,这样不仅可以避免不必要的解压缩 / 重新压缩,对提升其他方面的性能也大有裨益。
通常来说解压缩发生在消费者程序中,也就是说 Producer 发送压缩消息到 Broker 后,Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时,Broker 依然原样发送出去,当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息。
那么现在问题来了,Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。Kafka 会将启用了哪种压缩算法封装进消息集合中,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。
在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4。从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd)。
在实际使用中,GZIP、Snappy、LZ4 甚至是 zstd 的表现各有千秋。但对于 Kafka 而言,它们的性能测试结果却出奇得一致,即在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy。
如果客户端机器 CPU 资源有很多富余,强烈建议开启 zstd 压缩,这样能极大地节省网络资源消耗。
何时启用压缩是比较合适的时机呢?
压缩是在 Producer 端完成的工作,那么启用压缩的一个条件就是 Producer 程序运行机器上的 CPU 资源要很充足。如果 Producer 运行机器本身 CPU 已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会适得其反。
如果环境中带宽资源有限,那么也建议开启压缩。
幂等(idempotent、idempotence)是一个数学与计算机学概念,指的是:一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。
在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功能。在此之前,Kafka 向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的情况。在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture)
,或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
。
enable.idempotence
被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。当然,实际的实现原理并没有这么简单,但你大致可以这么理解。
我们必须要了解幂等性 Producer 的作用范围:
enable.idempotence
只能保证单分区上的幂等性**,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。如果想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!
为了实现 Producer 的幂等性,Kafka 引入了 Producer ID(即 PID)和 Sequence Number。
<Topic, Partition>
都对应一个从 0 开始单调递增的 Sequence Number。Broker 端在缓存中保存了这 seq number,对于接收的每条消息,如果其序号比 Broker 缓存中序号大于 1 则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。但是,只能保证单个 Producer 对于同一个 <Topic, Partition>
的 Exactly Once 语义。不能保证同一个 Producer 一个 topic 不同的 partion 幂等。
实现幂等之后:
在执行创建事务时,如下:
1 | Producer<String, String> producer = new KafkaProducer<String, String>(props); |
会创建一个 Sender,并启动线程,执行如下 run 方法,在 maybeWaitForProducerId()中生成一个 producerId,如下:
1 | ==================================== |
(1)配置属性
需要设置:
enable.idempotence
,需要设置为 ture,此时就会默认把 acks 设置为 all,所以不需要再设置 acks 属性了。1 | // 指定生产者的配置 |
(2)发送消息
跟一般生产者一样,如下
1 | public void produceIdempotMessage(String topic, String message) { |
此时,因为我们并没有配置 transaction.id
属性,所以不能使用事务相关 API,如下
1 | producer.initTransactions(); |
否则会出现如下错误:
1 | Exception in thread “main” java.lang.IllegalStateException: Transactional method invoked on a non-transactional producer. |
Kafka 的事务概念是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务,或者说是是一个原子操作),同时成功或者失败。
消息可靠性保障,由低到高为:
Kafka 支持事务功能主要是为了实现精确一次处理语义的,而精确一次处理是实现流处理的基石。
Kafka 自 0.11 版本开始提供了对事务的支持,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。
事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
事务属性实现前提是幂等性,即在配置事务属性 transaction.id
时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。
在事务属性之前先引入了生产者幂等性,它的作用为:
消费者提交偏移量导致重复消费消息的场景:消费者在消费消息完成提交便宜量 o2 之前挂掉了(假设它最近提交的偏移量是 o1),此时执行再均衡时,其它消费者会重复消费消息(o1 到 o2 之间的消息)。
Producer
提供了 initTransactions
, beginTransaction
, sendOffsets
, commitTransaction
, abortTransaction
五个事务方法。
1 | /** |
使用 kafka 的事务 api 时的一些注意事项:
consumer#commitSync
或者 consumer#commitAsyc
transctional.id
。最好为其设置一个有意义的名字。enable.idempotence = true
。如果配置了 transaction.id
,则此时 enable.idempotence
会被设置为 trueisolation.level
。在 consume-trnasform-produce
模式下使用事务时,必须设置为 READ_COMMITTED
。read_uncommitted
:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。read_committed
:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。创建一个事务,在这个事务操作中,只有生成消息操作。代码如下:
1 | /** |
创建生产者,代码如下,需要:
transactional.id
属性enable.idempotence
属性1 | /** |
在一个事务中,既有生产消息操作又有消费消息操作,即常说的 Consume-tansform-produce 模式。如下实例代码
1 | /** |
创建消费者代码,需要:
1 | /** |
创建一个事务,在这个事务操作中,只有生成消息操作,如下代码。这种操作其实没有什么意义,跟使用手动提交效果一样,无法保证消费消息操作和提交偏移量操作在一个事务。
1 | /** |
更详尽的生产者配置可以参考:Kafka 生产者官方配置说明
以下为生产者主要配置参数清单:
acks
:指定了必须有多少个分区副本收到消息,生产者才会认为消息写入是成功的。默认为 acks=1
acks=0
如果设置为 0,则 Producer 不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成。在这种情况下,服务器是否收到请求是没法保证的,并且参数retries
也不会生效(因为客户端无法获得失败信息)。每个记录返回的 offset 总是被设置为-1。acks=1
如果设置为 1,leader 节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功。在这种情况下,如果 leader 节点在接收记录之后,并且在 follower 节点复制数据完成之前产生错误,则这条记录会丢失。acks=all
如果设置为 all,这就意味着 leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。只要至少有一个同步副本存在,记录就不会丢失。这种方式是对请求传递的最有效保证。acks=-1 与 acks=all 是等效的。buffer.memory
:用来设置 Producer 缓冲区大小。compression.type
:Producer 生成数据时可使用的压缩类型。默认值是 none(即不压缩)。可配置的压缩类型包括:none
、gzip
、snappy
、lz4
或 zstd
。压缩是针对批处理的所有数据,所以批处理的效果也会影响压缩比(更多的批处理意味着更好的压缩)。retries
:用来设置发送失败的重试次数。batch.size
:用来设置一个批次可占用的内存大小。linger.ms
:用来设置 Producer 在发送批次前的等待时间。client.id
:Kafka 服务器用它来识别消息源,可以是任意字符串。max.in.flight.requests.per.connection
:用来设置 Producer 在收到服务器响应前可以发送多少个消息。timeout.ms
:用来设置 Broker 等待同步副本返回消息确认的时间,与 acks
的配置相匹配。request.timeout.ms
:Producer 在发送数据时等待服务器返回响应的时间。metadata.fetch.timeout.ms
:Producer 在获取元数据时(如:分区的 Leader 是谁)等待服务器返回响应的时间。max.block.ms
:该配置控制 KafkaProducer.send()
和KafkaProducer.partitionsFor()
允许被阻塞的时长。这些方法可能因为缓冲区满了或者元数据不可用而被阻塞。用户提供的序列化程序或分区程序的阻塞将不会被计算到这个超时。max.request.size
:请求的最大字节数。receieve.buffer.bytes
:TCP 接收缓冲区的大小。send.buffer.bytes
:TCP 发送缓冲区的大小。消息引擎获取消息有两种模式:
Kafka 消费者(Consumer)以 pull 方式从 Broker 拉取消息。相比于 push 方式,pull 方式灵活度和扩展性更好,因为消费的主动性由消费者自身控制。
push 模式的优缺点:
push 模式的优缺点:
每个 Consumer 的唯一元数据是该 Consumer 在日志中消费的位置。这个偏移量是由 Consumer 控制的:Consumer 通常会在读取记录时线性的增加其偏移量。但实际上,由于位置由 Consumer 控制,所以 Consumer 可以采用任何顺序来消费记录。
一条消息只有被提交,才会被消费者获取到。如下图,只能消费 Message0、Message1、Message2:
Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。
Kafka 的写入数据量很庞大,如果只有一个消费者,消费消息速度很慢,时间长了,就会造成数据积压。为了减少数据积压,Kafka 支持消费者群组,可以让多个消费者并发消费消息,对数据进行分流。
Kafka 消费者从属于消费者群组,一个群组里的 Consumer 订阅同一个 Topic,一个主题有多个 Partition,每一个 Partition 只能隶属于消费者群组中的一个 Consumer。
如果超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息。
同一时刻,一条消息只能被同一消费者组中的一个消费者实例消费。
不同消费者群组之间互不影响。
Kafka 消费者通过 poll
来获取消息,但是获取消息时并不是立刻返回结果,需要考虑两个因素:
customer.poll(time)
中设置等待时间poll 除了获取消息外,还有其他作用:
1 | Properties props = new Properties(); |
1 | // 订阅主题列表 |
subscribe
方法允许传入一个正则表达式,这样就可以匹配多个主题。如果有人创建了新的主题,并且主题名恰好匹配正则表达式,那么会立即触发一次分区再均衡,消费者就可以读取新添加的主题。
消息轮询是消费者 API 的核心。一旦消费者订阅了主题,轮询就会处理所有细节,包括:群组协调、分区再均衡、发送心跳和获取数据。
1 | try { |
使用 commitSync()
提交偏移量最简单也最可靠。这个 API 会提交由 poll()
方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。
1 | while (true) { |
同步提交的缺点:同步提交方式会一直阻塞,直到接收到 Broker 的响应请求,这会大大限制吞吐量。
在成功提交或碰到无法恢复的错误之前,commitSync()
会一直重试,但是 commitAsync()
不会,这也是 commitAsync()
不好的一个地方。它之所以不进行重试,是因为在它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用于提交偏移量 2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。如果 commitAsync()
重新尝试提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。这个时候如果发生再均衡,就会出现重复消息。
1 | while (true) { |
commitAsync()
也支持回调,在 Broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标,不过如果要用它来进行重试,则一定要注意提交的顺序。
1 | while (true) { |
重试异步提交
可以使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏移量之后或在回调里提交偏移量时递增序列号。在进行重试前,先检查回调的序列号和即将提交的偏移量是否相等,如果相等,说明没有新的提交,那么可以安全地进行重试;如果序列号比较大,说明有一个新的提交已经发送出去了,应该停止重试。
一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。
因此,在消费者关闭前一般会组合使用 commitSync()
和 commitAsync()
。
1 | try { |
提交偏移量的频率和处理消息批次的频率是一样的。如果想要更频繁地提交该怎么办?如果 poll()
方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,想要在批次中间提交偏移量该怎么办?这种情况无法通过调用 commitSync()
或 commitAsync()
来实现,因为它们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。
解决办法是:消费者 API 允许在调用 commitSync()
和 commitAsync()
方法时传进去希望提交的分区和偏移量的 map。
1 | private int count = 0; |
使用 poll()
方法可以从各个分区的最新偏移量处开始处理消息。
不过,有时候,我们可能需要从特定偏移量处开始处理消息。
seekToBeginning(Collection<TopicPartition> partitions)
方法seekToEnd(Collection<TopicPartition> partitions)
方法seek(TopicPartition partition, long offset)
方法通过 seek(TopicPartition partition, long offset)
可以实现处理消息和提交偏移量在一个事务中完成。思路就是需要在客户端建立一张数据表,保证处理消息和和消息偏移量位置写入到这张数据表。在一个事务中,此时就可以保证处理消息和记录偏移量要么同时成功,要么同时失败。
1 | consumer.subscribe(topic); |
如果想让消费者从轮询消费消息的无限循环中退出,可以通过另一个线程调用 consumer.wakeup()
方法。 consumer.wakeup()
是消费者唯一一个可以从其他线程里安全调用的方法。调用 consumer.wakeup()
可以退出 poll()
,并抛出 WakeupException
异常,或者如果调用 consumer.wakeup()
时线程没有等待轮询,那么异常将在下一轮调用 poll()
时抛出。
1 | Runtime.getRuntime().addShutdownHook(new Thread() { |
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为分区再均衡(Rebalance)。Rebalance 实现了消费者群组的高可用性和伸缩性。
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。
当在群组里面 新增/移除消费者 或者 新增/移除 kafka 集群 broker 节点 时,群组协调器 Broker 会触发再均衡,重新为每一个 Partition 分配消费者。Rebalance 期间,消费者无法读取消息,造成整个消费者群组一小段时间的不可用。
分区再均衡的触发时机有三种:
consumer.close()
操作或者消费客户端宕机,就不再通过 poll 向群组协调器发送心跳了,当群组协调器检测次消费者没有心跳,就会触发再均衡。consumer.subscribe(Pattern.compile(“t.*c”))
就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。Rebalance 是通过消费者群组中的称为“群主”消费者客户端进行的。
(1)选择群主
当消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获取群组的活跃成员列表,并负责给每一个消费者分配分区。
所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。具体来讲,Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。
(2)消费者通过向被指派为群组协调器(Coordinator)的 Broker 定期发送心跳来维持它们和群组的从属关系以及它们对分区的所有权。
(3)群主从群组协调器获取群组成员列表,然后给每一个消费者进行分配分区 Partition。有两种分配策略:Range 和 RoundRobin。
(4)群主分配完成之后,把分配情况发送给群组协调器。
(5)群组协调器再把这些信息发送给消费者。每个消费者只能看到自己的分配信息,只有群主知道所有消费者的分配信息。
消费者通过向被指定为群组协调器的 Broker 发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者超时未发送心跳,会话就会过期,群组协调器认定它已经死亡,就会触发一次再均衡。
当一个消费者要离开群组时,会通知协调器,协调器会立即触发一次再均衡,尽量降低处理停顿。
所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。那么,Consumer Group 如何确定为它服务的 Coordinator 在哪台 Broker 上呢?答案就在我们之前说过的 Kafka 内部位移主题 __consumer_offsets
身上。
目前,Kafka 为某个 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 个步骤。
第 1 步:确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)
。
第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。
通过前文,我们已经知道了:分区再均衡的代价很高,应该尽量避免不必要的分区再均衡,以整体提高 Consumer 的吞吐量。
分区再均衡发生的时机有三个:
后面两个通常都是运维的主动操作,所以它们引发的 Rebalance 大都是不可避免的。实际上,大部分情况下,导致分区再均衡的原因是:组成员数量发生变化。
有两种情况,消费者并没有宕机,但也被视为消亡:
第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。因此,你需要仔细地设置session.timeout.ms 和 heartbeat.interval.ms的值。我在这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。
session.timeout.ms
= 6s。heartbeat.interval.ms
= 2s。session.timeout.ms
>= 3 * heartbeat.interval.ms
。将 session.timeout.ms
设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer。毕竟,我们还是希望能尽快揪出那些“尸位素餐”的 Consumer,早日把它们踢出 Group。希望这份配置能够较好地帮助你规避第一类“不必要”的 Rebalance。
第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。我之前有一个客户,在他们的场景中,Consumer 消费数据时需要将消息处理之后写入到 MongoDB。显然,这是一个很重的消费逻辑。MongoDB 的一丁点不稳定都会导致 Consumer 程序消费时长的增加。此时,**max.poll.interval.ms
** 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。就拿 MongoDB 这个例子来说,如果写 MongoDB 的最长时间是 7 分钟,那么你可以将该参数设置为 8 分钟左右。
如果你按照上面的推荐数值恰当地设置了这几个参数,却发现还是出现了 Rebalance,那么我建议你去排查一下Consumer 端的 GC 表现,比如是否出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance。为什么特意说 GC?那是因为在实际场景中,我见过太多因为 GC 设置不合理导致程序频发 Full GC 而引发的非预期 Rebalance 了。
每次调用 poll()
方法,它总是会返回由生产者写入 Kafka 但还没有被消费者读取过的记录,Kafka 因此可以追踪哪些记录是被哪个群组的哪个消费者读取的。
更新分区当前位置的操作叫作提交。
如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡,完成再均衡后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
(1)如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
(2)如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
由此可知,处理偏移量,会对客户端处理数据产生影响。
老版本的 Consumer Group 把偏移量保存在 ZooKeeper 中。ZooKeeper 是一个分布式的协调服务框架,Kafka 重度依赖它实现各种各样的协调管理。将偏移量保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销,有利于实现伸缩性。
这种方案的问题在于:ZooKeeper 其实并不适合进行高频的写操作,而 Consumer Group 的偏移量更新却是一个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能,因此 Kafka 社区渐渐有了这样的共识:将 Consumer 偏移量保存在 ZooKeeper 中是不合适的做法。
新版本 Consumer 的偏移量管理机制其实也很简单。
消费者向一个叫做 _consumer_offsets
的特殊主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡,完成再均衡后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
**_consumer_offsets
主题的 Key 中应该保存 3 部分内容:<Group ID,主题名,分区号 >
**。
通常来说,当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建偏移量主题。偏移量主题就是普通的 Kafka 主题,那么它自然也有对应的分区数。如果偏移量主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3。分区数可以通过 offsets.topic.num.partitions
设置;副本数可以通过 offsets.topic.replication.factor
设置。
自动提交是 Kafka 处理偏移量最简单的方式。
当 enable.auto.commit
属性被设为 true,那么每过 5s
,消费者会自动把从 poll()
方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms
控制,默认值是 5s
。
与消费者里的其他东西一样,自动提交也是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。
假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s(因为没有达到 5s 的时限,并没有提交偏移量),所以在这 3s 的数据将会被重复处理。虽然可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗的时间跨度,不过这种情况是无法完全避免的。
在使用自动提交时,每次调用轮询方法都会把上一次调用返回的偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(在调用 close() 方法之前也会进行自动提交)。一般情况下不会有什么问题,不过在处理异常或提前退出轮询时要格外小心。
自动提交虽然方便,不过无法避免丢失消息和分区再均衡时重复消息的问题。
自动提交虽然方便,不过无法避免丢失消息和分区再均衡时重复消息的问题。因此,可以通过手动提交偏移量,由开发者自行控制。
首先,把 enable.auto.commit
设为 false,关闭自动提交。
如果 Kafka 触发了再均衡,我们需要在消费者失去对一个分区的所有权之前提交最后一个已处理记录的偏移量。如果消费者准备了一个缓冲区用于处理偶发的事件,那么在失去分区所有权之前,需要处理在缓冲区累积下来的记录。可能还需要关闭文件句柄、数据库连接等。
在为消费者分配新分区或移除旧分区时,可以通过消费者 API 执行一些应用程序代码,在调用 subscribe()
方法时传进去一个 ConsumerRebalanceListener
实例就可以了。 ConsumerRebalanceListener
有两个需要实现的方法。
public void onPartitionsRevoked(Collection partitions)
方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。public void onPartitionsAssigned(Collection partitions)
方法会在重新分配分区之后和消费者开始读取消息之前被调用。1 | private Map<TopicPartition, OffsetAndMetadata> currentOffsets= |
生产者需要用序列化器将 Java 对象转换成字节数组再发送给 Kafka;同理,消费者需要用反序列化器将从 Kafka 接收到的字节数组转换成 Java 对象。
通常,会有多个 Kafka 消费者组成群组,关注一个主题。
但可能存在这样的场景:只需要一个消费者从一个主题的所有分区或某个特定的分区读取数据。这时,就不需要消费者群组和再均衡了,只需要把主题或分区分配给消费者,然后开始读取消息并提交偏移量。
如果是这样,就不需要订阅主题,取而代之的是为自己分配分区。一个消费者可以订阅主题(并加入消费者群组),或为自己分配分区,但不能同时做这两件事。
1 | List<PartitionInfo> partitionInfos = null; |
bootstrap.servers
- Broker 集群地址,格式:ip1:port,ip2:port…,不需要设定全部的集群地址,设置两个或者两个以上即可。group.id
- 消费者隶属的消费者组名称,如果为空会报异常,一般而言,这个参数要有一定的业务意义。fetch.min.bytes
- 消费者获取记录的最小字节数。Kafka 会等到有足够的数据时才返回消息给消费者,以降低负载。fetch.max.wait.ms
- Kafka 需要等待足够的数据才返回给消费者,如果一直没有足够的数据,消费者就会迟迟收不到消息。所以需要指定 Broker 的等待延迟,一旦超时,直接返回数据给消费者。max.partition.fetch.bytes
- 指定了服务器从每个分区返回给消费者的最大字节数。默认为 1 MB。session.timeout.ms
- 指定了消费者的心跳超时时间。如果消费者没有在有效时间内发送心跳给群组协调器,协调器会视消费者已经消亡,从而触发分区再均衡。默认为 3 秒。auto.offset.reset
- 指定了消费者在读取一个没有偏移量的分区或偏移量无效的情况下,该如何处理。latest
- 表示在偏移量无效时,消费者将从最新的记录开始读取分区记录。earliest
- 表示在偏移量无效时,消费者将从起始位置读取分区记录。enable.auto.commit
- 指定了是否自动提交消息偏移量,默认开启。partition.assignment.strategy
- 消费者的分区分配策略。Range
- 表示会将主题的若干个连续的分区分配给消费者。RoundRobin
- 表示会将主题的所有分区按照轮询方式分配给消费者。client.id
- 客户端标识。max.poll.records
- 用于控制单次能获取到的记录数量。receive.buffer.bytes
- 用于设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为 64KB。如果设置为-1,则使用操作系统的默认值。send.buffer.bytes
- 用于设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为 128KB。与 receive.buffer.bytes 参数一样,如果设置为-1,则使用操作系统的默认值。如何保证消息的可靠性传输,或者说,如何保证消息不丢失?这对于任何 MQ 都是核心问题。
一条消息从生产到消费,可以划分三个阶段:
这三个阶段都可能丢失数据,所以要保证消息丢失,就需要任意一环都保证可靠。
存储阶段指的是 Kafka Server,也就是 Broker 如何保证消息不丢失。
一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。
上面的话可以解读为:
Kafka 的副本机制是 kafka 可靠性保证的核心。
Kafka 的主题被分为多个分区,分区是基本的数据块。每个分区可以有多个副本,有一个是 Leader(主副本),其他是 Follower(从副本)。所有数据都直接发送给 Leader,或者直接从 Leader 读取事件。Follower 只需要与 Leader 保持同步,并及时复制最新的数据。当 Leader 宕机时,从 Follower 中选举一个成为新的 Leader。
Broker 有 3 个配置参数会影响 Kafka 消息存储的可靠性。
replication.factor
的作用是设置每个分区的副本数。replication.factor
是主题级别配置; default.replication.factor
是 broker 级别配置。
副本数越多,数据可靠性越高;但由于副本数增多,也会增加同步副本的开销,可能会降低集群的可用性。一般,建议设为 3,这也是 Kafka 的默认值。
unclean.leader.election.enable
用于控制是否支持不同步的副本参与选举 Leader。unclean.leader.election.enable
是 broker 级别(实际上是集群范围内)配置,默认值为 true。
min.insync.replicas
控制的是消息至少要被写入到多少个副本才算是“已提交”。min.insync.replicas
是主题级别和 broker 级别配置。
尽管可以为一个主题配置 3 个副本,但还是可能会出现只有一个同步副本的情况。如果这个同步副本变为不可用,则必须在可用性和数据一致性之间做出选择。Kafka 中,消息只有被写入到所有的同步副本之后才被认为是已提交的。但如果只有一个同步副本,那么在这个副本不可用时,则数据就会丢失。
如果要确保已经提交的数据被已写入不止一个副本,就需要把最小同步副本的设置为大一点的值。
注意:要确保
replication.factor
>min.insync.replicas
。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成replication.factor = min.insync.replicas + 1
。
在生产消息阶段,消息队列一般通过请求确认机制,来保证消息的可靠传递,Kafka 也不例外。
Kafka 生产者 中提到了,Kafka 有三种发送方式:同步、异步、异步回调。
同步方式能保证消息不丢失,但性能太差;异步方式发送消息,通常会立即返回,但消息可能丢失。
解决生产者丢失消息的方案:
生产者使用异步回调方式 producer.send(msg, callback)
发送消息。callback(回调)能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。
然后,需要基于以下几点来保证 Kafka 生产者的可靠性:
生产者可选的确认模式有三种:acks=0
、acks=1
、acks=all
。
acks=0
、acks=1
都有丢失数据的风险。
acks=all
意味着会等待所有同步副本都收到消息。再结合 min.insync.replicas
,就可以决定在得到确认响应前,至少有多少副本能够收到消息。
这是最保险的做法,但也会降低吞吐量。
如果 broker 返回的错误可以通过重试来解决,生产者会自动处理这些错误。
LEADER_NOT_AVAILABLE
,主副本不可用,可能过一段时间,集群就会选举出新的主副本,重试可以解决问题。INVALID_CONFIG
,即使重试,也无法改变配置选项,重试没有意义。需要注意的是:有时可能因为网络问题导致没有收到确认,但实际上消息已经写入成功。生产者会认为出现临时故障,重试发送消息,这样就会出现重复记录。所以,尽可能在业务上保证幂等性。
设置 retries
为一个较大的值。这里的 retries
同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
开发者需要自行处理的错误:
前文已经提到,消费者只能读取已提交的消息。这就保证了消费者接收到消息时已经具备了数据一致性。
消费者唯一要做的是确保哪些消息是已经读取过的,哪些是没有读取过的(通过提交偏移量给 Broker 来确认)。如果消费者提交了偏移量却未能处理完消息,那么就有可能造成消息丢失,这也是消费者丢失消息的主要原因。
group.id
- 如果希望消费者可以看到主题的所有消息,那么需要为它们设置唯一的 group.id
。auto.offset.reset
- 有两个选项:earliest
- 消费者会从分区的开始位置读取数据latest
- 消费者会从分区末尾位置读取数据enable.auto.commit
- 消费者自动提交偏移量。如果设为 true,处理流程更简单,但无法保证重复处理消息。auto.commit.interval.ms
- 自动提交的频率,默认为每 5 秒提交一次。如果
enable.auto.commit
设为 true,即自动提交,就无需考虑提交偏移量的问题。
如果选择显示提交偏移量,需要考虑以下问题:
在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:
绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。
一般解决重复消息的办法是,在消费端,保证消费消息的操作具备幂等性。
常用的实现幂等操作的方法:
关系型数据库可以使用 INSERT IF NOT EXIST
语句防止重复;Redis 可以使用 SETNX
命令来防止重复;其他数据库只要支持类似语义,也是一个道理。
如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。
但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的方法是,给数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。
还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一 ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。
具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
需要注意的是,“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。这一组操作可以通过分布式事务或分布式锁来保证其原子性。
某些场景下,可能会要求按序发送消息。
Kafka 每一个 Partition 只能隶属于消费者群组中的一个 Consumer,换句话说,每个 Partition 只能被一个 Consumer 消费。所以,如果 Topic 是单 Partition,自然是有序的。
方案分析
优点:简单粗暴。开发者什么也不用做。
缺点:Kafka 基于 Partition 实现其高并发能力,如果使用单 Partition,会严重限制 Kafka 的吞吐量。
结论:作为分布式消息引擎,限制并发能力,显然等同于自废武功,所以,这个方案几乎是不可接受的。
(1)生产者端显示指定 key 发往一个指定的 Partition,就可以保证同一个 key 在这个 Partition 中是有序的。
(2)接下来,消费者端为每个 key 设定一个缓存队列,然后让一个独立线程负责消费指定 key 的队列,这就保证了消费消息也是有序的。
先修复消费者,然后停掉当前所有消费者。
新建 Topic,扩大分区,以提高并发处理能力。
创建临时消费者程序,并部署在多节点上,扩大消费处理能力。
最后处理完积压消息后,恢复原先部署架构。
建议从 3 个层面验证系统的可靠性:
error-rate
和 retry-rate
。如果这两个指标上升,说明系统出了问题。consumer-lag
,该指标表明了消费者的处理速度与最近提交到分区里的偏移量之间还有多少差距。生产者
producer.send(msg)
,而要使用 producer.send(msg, callback)
。记住,一定要使用带有回调通知的 send
方法。acks = all
。acks
是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。retries
为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0
的 Producer 能够自动重试消息发送,避免消息丢失。服务器(Kafka Broker)
unclean.leader.election.enable = false。
这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。replication.factor
>= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。min.insync.replicas
> 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。replication.factor
> min.insync.replicas
。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1
。消费者
enable.auto.commit
,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。shardingsphere-jdbc 定位为轻量级 Java 框架,在 Java 的 JDBC 层提供的额外服务。 它使用客户端直连数据库,以 jar 包形式提供服务,无需额外部署和依赖,可理解为增强版的 JDBC 驱动,完全兼容 JDBC 和各种 ORM 框架。
1 | <dependency> |
注意:请将 ${latest.release.version}
更改为实际的版本号。
ShardingSphere-JDBC 可以通过 Java
,YAML
,Spring 命名空间
和 Spring Boot Starter
这 4 种方式进行配置,开发者可根据场景选择适合的配置方式。 详情请参见配置手册。
通过 ShardingSphereDataSourceFactory
工厂和规则配置对象获取 ShardingSphereDataSource
。 该对象实现自 JDBC 的标准 DataSource 接口,可用于原生 JDBC 开发,或使用 JPA, MyBatis 等 ORM 类库。
1 | DataSource dataSource = ShardingSphereDataSourceFactory.createDataSource(dataSourceMap, configurations, properties); |
单一数据节点难于满足互联网的海量数据场景。
从性能方面来说,由于关系型数据库大多采用 B+ 树类型的索引,在数据量超过阈值的情况下,索引深度的增加也将使得磁盘访问的 IO 次数增加,进而导致查询性能的下降;同时,高并发访问请求也使得集中式数据库成为系统的最大瓶颈。
在传统的关系型数据库无法满足互联网场景需要的情况下,将数据存储至原生支持分布式的 NoSQL 的尝试越来越多。 但 NoSQL 对 SQL 的不兼容性以及生态圈的不完善,使得它们在与关系型数据库的博弈中始终无法完成致命一击,而关系型数据库的地位却依然不可撼动。
数据分片指按照某个维度将存放在单一数据库中的数据分散地存放至多个数据库或表中以达到提升性能瓶颈以及可用性的效果。数据分片的有效手段是对关系型数据库进行分库和分表。分库和分表均可以有效的避免由数据量超过可承受阈值而产生的查询瓶颈。 除此之外,分库还能够用于有效的分散对数据库单点的访问量;分表虽然无法缓解数据库压力,但却能够提供尽量将分布式事务转化为本地事务的可能,一旦涉及到跨库的更新操作,分布式事务往往会使问题变得复杂。 使用多主多从的分片方式,可以有效的避免数据单点,从而提升数据架构的可用性。
通过分库和分表进行数据的拆分来使得各个表的数据量保持在阈值以下,以及对流量进行疏导应对高访问量,是应对高并发和海量数据系统的有效手段。 数据分片的拆分方式又分为垂直分片和水平分片。
按照业务拆分的方式称为垂直分片,又称为纵向拆分,它的核心理念是专库专用。 在拆分之前,一个数据库由多个数据表构成,每个表对应着不同的业务。而拆分之后,则是按照业务将表进行归类,分布到不同的数据库中,从而将压力分散至不同的数据库。 下图展示了根据业务需要,将用户表和订单表垂直分片到不同的数据库的方案。
垂直分片往往需要对架构和设计进行调整。通常来讲,是来不及应对互联网业务需求快速变化的;而且,它也并无法真正的解决单点瓶颈。 垂直拆分可以缓解数据量和访问量带来的问题,但无法根治。如果垂直拆分之后,表中的数据量依然超过单节点所能承载的阈值,则需要水平分片来进一步处理。
水平分片又称为横向拆分。 相对于垂直分片,它不再将数据根据业务逻辑分类,而是通过某个字段(或某几个字段),根据某种规则将数据分散至多个库或表中,每个分片仅包含数据的一部分。 例如:根据主键分片,偶数主键的记录放入 0 库(或表),奇数主键的记录放入 1 库(或表),如下图所示。
水平分片从理论上突破了单机数据量处理的瓶颈,并且扩展相对自由,是分库分表的标准解决方案。
ShardingSphere 的 3 个产品的数据分片主要流程是完全一致的。 核心由 SQL 解析 => 执行器优化 => SQL 路由 => SQL 改写 => SQL 执行 => 结果归并
的流程组成。
解析过程分为词法解析和语法解析。 词法解析器用于将 SQL 拆解为不可再分的原子符号,称为 Token。并根据不同数据库方言所提供的字典,将其归类为关键字,表达式,字面量和操作符。 再使用语法解析器将 SQL 转换为抽象语法树。
例如,以下 SQL:
1 | SELECT id, name FROM t_user WHERE status = 'ACTIVE' AND age > 18 |
解析之后的为抽象语法树见下图。
为了便于理解,抽象语法树中的关键字的 Token 用绿色表示,变量的 Token 用红色表示,灰色表示需要进一步拆分。
最后,通过对抽象语法树的遍历去提炼分片所需的上下文,并标记有可能需要改写的位置。 供分片使用的解析上下文包含查询选择项(Select Items)、表信息(Table)、分片条件(Sharding Condition)、自增主键信息(Auto increment Primary Key)、排序信息(Order By)、分组信息(Group By)以及分页信息(Limit、Rownum、Top)。 SQL 的一次解析过程是不可逆的,一个个 Token 按 SQL 原本的顺序依次进行解析,性能很高。 考虑到各种数据库 SQL 方言的异同,在解析模块提供了各类数据库的 SQL 方言字典。
SQL 解析作为分库分表类产品的核心,其性能和兼容性是最重要的衡量指标。 ShardingSphere 的 SQL 解析器经历了 3 代产品的更新迭代。
第一代 SQL 解析器为了追求性能与快速实现,在 1.4.x 之前的版本使用 Druid 作为 SQL 解析器。经实际测试,它的性能远超其它解析器。
第二代 SQL 解析器从 1.5.x 版本开始,ShardingSphere 采用完全自研的 SQL 解析引擎。 由于目的不同,ShardingSphere 并不需要将 SQL 转为一颗完全的抽象语法树,也无需通过访问器模式进行二次遍历。它采用对 SQL 半理解
的方式,仅提炼数据分片需要关注的上下文,因此 SQL 解析的性能和兼容性得到了进一步的提高。
第三代 SQL 解析器则从 3.0.x 版本开始,ShardingSphere 尝试使用 ANTLR 作为 SQL 解析的引擎,并计划根据 DDL -> TCL -> DAL –> DCL -> DML –>DQL
这个顺序,依次替换原有的解析引擎,目前仍处于替换迭代中。 使用 ANTLR 的原因是希望 ShardingSphere 的解析引擎能够更好的对 SQL 进行兼容。对于复杂的表达式、递归、子查询等语句,虽然 ShardingSphere 的分片核心并不关注,但是会影响对于 SQL 理解的友好度。 经过实例测试,ANTLR 解析 SQL 的性能比自研的 SQL 解析引擎慢 3-10 倍左右。为了弥补这一差距,ShardingSphere 将使用 PreparedStatement
的 SQL 解析的语法树放入缓存。 因此建议采用 PreparedStatement
这种 SQL 预编译的方式提升性能。
第三代 SQL 解析引擎的整体结构划分如下图所示。