Dunwu Blog

大道至简,知易行难

RocketMQ FAQ

API 问题

connect to <172.17.0.1:10909> failed

启动后,Producer 客户端连接 RocketMQ 时报错:

1
2
3
4
5
6
7
8
9
10
11
12
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.0.1:10909> failed
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:357)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:343)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:327)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:290)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:688)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:901)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:878)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:873)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:369)
at com.emrubik.uc.mdm.sync.utils.MdmInit.sendMessage(MdmInit.java:62)
at com.emrubik.uc.mdm.sync.utils.MdmInit.main(MdmInit.java:2149)

原因:RocketMQ 部署在虚拟机上,内网 ip 为 10.10.30.63,该虚拟机一个 docker0 网卡,ip 为 172.17.0.1。RocketMQ broker 启动时默认使用了 docker0 网卡,Producer 客户端无法连接 172.17.0.1,造成以上问题。

解决方案

(1)干掉 docker0 网卡或修改网卡名称

(2)停掉 broker,修改 broker 配置文件,重启 broker。

修改 conf/broker.conf,增加两行来指定启动 broker 的 IP:

1
2
namesrvAddr = 10.10.30.63:9876
brokerIP1 = 10.10.30.63

启动时需要指定配置文件

1
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &

话术

职场“黑话”

二字动词

复盘,赋能,加持,沉淀,倒逼,落地,串联,协同,反哺,兼容,包装,重组,履约,响应,量化,布局,联动,细分,梳理,输出,加速,共建,支撑,融合,聚合,集成,对标,聚焦,抓手,拆解,抽象,摸索,提炼,打通,打透,吃透,迁移,分发,分装,辐射,围绕,复用,渗透,扩展,开拓,皮实,共创,共建,解耦,集成,对齐,拉齐,对焦,给到,拿到,死磕

三字名词

感知度,方法论,组合拳,引爆点,点线面,精细化,差异化,平台化,结构化,影响力,耦合性,便捷性,一致性,端到端,短平快,护城河,体验感,颗粒度

四字名词

生命周期,价值转化,强化认知,资源倾斜,完善逻辑,抽离透传,复用打法,商业模式,快速响应,定性定量,关键路径,去中心化,结果导向,垂直领域,归因分析,体验度量,信息屏障,资源整合

术语应用

比如原来你提问:这个问题你准备怎么解决?

现在的你可以说:你这个问题的底层逻辑是什么?顶层设计在哪?最终交付价值是什么?过程的抓手在哪?如何保证回答闭环?你比别人的亮点在哪?优势在哪?你的思考和沉淀是什么?这个问题换成我来问是否会不一样?在这之前,有自己的思考和沉淀吗?这些问题的颗粒度是怎样拆分的,能作为爆点,引发回答者对问题关键路径的探索吗?别人回答了,你能反哺赋能他们,共建团队意识生态吗?只会问而不会解决,你有自己独有的价值吗?

比如你之前只会一脸懵逼的看着他,愣着不敢说话,现在你可以这么回复他:

我们这款产品底层逻辑是打通信息屏障,创建行业新生态。顶层设计是聚焦用户感知赛道,通过差异化和颗粒度达到引爆点。交付价值是在垂直领域采用复用打法达成持久收益。抽离透传归因分析作为抓手为产品赋能,体验度量作为闭环的评判标准。亮点是载体,优势是链路。思考整个生命周期,完善逻辑考虑资源倾斜。方法论是组合拳达到平台化标准。

模板

yes and 法则

当别人提出一个观点,自己不认同时,我们往往会说,yes,but,这样别人会觉得“那就这样吧,你说什么就是什么吧”的想法,无法形成有效的沟通。

yes-and 的原则首先是要接纳,而不是全身都是刺,即当别人提出观点的时候还没仔细思考就先给予否定,尤其当对方不认可攻击你时,很多人立马想到的就是反击。

然后关键是这里的 and,这里的 and 并不是转折或反驳,而是并且或附加的内容,可以巧妙地避免意见不同甚至冲突,所以 yes-and 不仅仅是做事方式,也是一种特别好的沟通方式,让对方感受很舒服。

【示例】

男朋友特别喜欢爬山,有一天男朋友邀请女生去爬山,女生说了下面的一段话 “我不爱爬上,但是我特别好奇你到山顶后看到的风景,你能爬上去之后能给我拍几样张照片吗?专门为我拍的,然后你下来以后,我在哪个咖啡馆等你,你跟我讲讲此次的经历!”

可以听出来,这位女生并不爱爬山,也没有勉强自己去迎合新的男朋友,而是肯定了男朋友的爱好,并且在此基础上创造了新的情景来继续他们的交流!

PREP 模型

PREP 模型用于表达观点

PREP 四个英文字母分别代表:Point,观点;Reason,理由;Example,案例;Point,再次讲观点。这是最经典的表达结构。

整个 PREP 结构的关键是,开始就要讲出你的观点,点明主题;后面再举出理由来论证观点;案例部分,最好讲自己的经历或故事来解释,这样听众比较容易听懂;最后再重复和强调一下你的观点。

SCQ-A 模型

SCQ-A 模型 用于提出问题,请求帮助

  • situation:阐述背景
  • conflict:阐述冲突
  • question:为了解决冲突,你提出要解决的问题
  • answer:你的看法

【示例】

老板,最近竞争对手上任了新的 CEO,做了一系列措施,比如下调了产品的价格,增大推广和营销,导致现在我们的很多市场被对方蚕食了。

我们现在该如何调整应应对当前的情况,保持市场上的领导位置。

目前我的看法就是优化目前的营销渠道,全面包围对手。

FFC 赞美法则

所谓 FFC 赞美法,就是指在赞美人的时候,先用自己的语言来表达感受(Feel),然后再进一步通过陈述事实(Fact)来论证自己的感受,最后再通过比较(Compare),来加深对对方的认可,这样对方会感觉特别好。

让对方服从你行为的经典话术

这个话术一般来讲模板是这样的: 是..还是…/是否/要不要,xxx 好处是这样。

这里的关键是尽量不让用户思考,提供非 A 即 B 的选项,说某个选项的好处,从而让对方服从你。

比如麦当劳、肯德基有 3 个经典话术,进行快推式产品营销:

  • 您是否要加一包薯条,这样可以凑成一个套餐,节省 2 元?(实际上多消费 5 元)
  • 您要不要加 3 元把可乐换成大杯,可以多一半哦?
  • 您要不要加 10 元买个玩具给小朋友呢?

沟通中的万能表达模型- 观察+感受+需求+请求

  • 观察:即你观察的客观事实是什么?注意这里要是事实才行
  • 感受:通过观察之后,你心情感受是怎样的?
  • 需求:你内心希望要解决的问题是什么?
  • 请求:向对方请求你的需求需要得到满足

FFA 法则

  • Fact——事实
  • Feeling——感受
  • Action——行动

最近一段团队不少新人加入,整体运营效率下降了 30%。(事实)我感觉主要是业务知识传递有些跟不上。(感受)接下来的一周,我会安排老员工一对一辅导每位新人。(行动)

实战

别人求你办事,如果你说:“这事儿不太好办”,那么资源置换就来了。

不好办说明能办,但需要附加条件,懂的人自然知道接下来应该怎么办。

拒绝借钱:“你知道的,我最近 XX,也没钱。”

遇到借钱,只要你平时不太露富,就好用。

朋友管你借钱,用“你知道的”直接把皮球踢回去,再给个具体理由,比如买了什么东西、投入基金股票里了,随便什么都行。

只要把对方放在知道你没钱的位置上,他就不好再开下一句口,一般会主动结束对话。

表示体贴,但不想真的去接客户,先打电话沟通:“约的地方有点远,需要我去接您吗?”

绝大部分人遇到这样的问句,本能反应是不用,你的目的就达到了。

不过如果真遇到要你接的,这人多半有点矫情,以后相处要注意多恭维一下。

经典汇报话术 1:“老板,我们团队做了 A、B 两版方案,各有优势,您给提提意见,看选哪个好。”

该话术利用了沉锚效应,抛出了二者选其一的锚,避开全部拒绝的选项,引导领导“选一个”、“提意见”,减小被全部驳回的风险。

经典汇报话术 2:“领导,我是这么想的,XXX。第一,X;第二,X;第三,X。”

我们的大脑被训练得听到“第一、第二”就默认为其中有条理、有逻辑,不管其中是不是真的严丝合缝地支撑你的观点。善用一二三做汇报,领导会觉得你准备得很充分,考虑周全。

遇到问题请求领导帮忙:“经过了解,现在碰到了一些情况,我的解决办法是 XX,您看还有没有什么更好的办法。”

不要直接说自己解决不了,让领导想办法。不管自己提出的办法多平庸,都一定要提。

请求他人帮忙:“能请你帮我打印一下文件吗,因为我一会儿真有事。”、“不好意思,能插一下队吗,因为我真的着急。”

善用“因为所以”,“因为所以”是十几年语文教学留给我们的条件反射,不管多离奇的理由,听到的时候都会默认有道理。

聊天想聊下去:揪住对方句子里的关键词+延伸过往彼此交流过的信息。

例 1

朋友:“今天又加班,烦死了。”

你:“怎么又加班啊,又是上回让你加班的那个领导吗?”

例 2

闺蜜:“我爱豆塌房子了!”

你:“哪个爱豆?上回你说的 XXX?我去!”

聊天不想继续聊下去:重复关键词+感叹词。重点!不要扩展任何有效信息。

例 1

朋友:“今天又加班,烦死了”

你:“怎么又加班啊,唉,这叫啥事啊,我无语。”

例 2

闺蜜:“我爱豆又塌房子了!”

你:“又塌房子!我去,也太那个了,绝了!”

领导说:“辛苦了。”

你:“从中学到很多,很有收获。”

敏而好学、不居功,领导更喜欢这样的下属。

给领导的节日问候:尊称+感谢+互动+祝福

尊称放在前面,引起注意。互动要具体、细节,才有记忆点。

XX,过年好!
感谢您一直的关照,从您身上学到很多。

上次 XX,您说 XXX 我一直记得,受益匪浅。

又到新的一年,祝您和家人新年快乐!

改变一个人的想法:认同立场,替换观点。

无论任何人,观点不是不可改变的,但立场很难动摇。

比如你的预算交上去,被砍了很大一块,你能做的不是抱怨老板抠门,而是认同老板砍预算是为了控制成本,开公司是为了赚钱的,这就是老板该有的立场。

所以,如果你不想自己的预算被砍掉,只有你能向老板展示,你做的方案能为他赚更多钱,老板就不会不同意。

工作汇报

实情 话术
新项目玩砸了 进行了积极的试错,吸收了宝贵的经验。
数据不好看 有较大的增长空间
啥也没干 稳定发展
接下来,我依旧打算啥也不干 保持现有成绩,稳定成果
数据稍微好看一点 取得了较大增长

沟通

当你觉得对方特别啰嗦,又不好意思打断对方谈话

直接给对方说的话下定义,作评判。例如:“好的”、“那确实不错”、“的确是这样”、“嗯,你说的对” 等等。

对方会瞬间失去表达欲望。

当你接到你不想做的任务时

  • 我仔细看了一下这个需求,我这里可能存在 XXX XXX XXX 方面的短板
  • “领导,我仔细看了一下这个需求,我这里可能存在 XXX XXX XXX 方面的短板”
  • “想要推进这件事情的话,我可能需要 XXX XXX XXX 方面的支持”

好的可能:领导感觉你是认真经过调研,分析了可行性,确实你不太适合,他表示会再考虑考虑

坏的可能:你还是得做,但是这句话的意思已经很明确了:我可是给你说清楚了啊,这件事办砸了,你可不能赖我

当我想刺探什么秘密的时候,我会先说一个结论,然后看对方的反应

  • “我们公司下个月要发奖金你知道对吧,想好怎么花了么?”
    • “卧槽你这消息灵通啊”——真的要发
    • “你听谁说的”——不知道真假,但是可以继续测
  • 这个时候我会再补一句:“我能得到这个消息,说明肯定有人放风给我”
    • “胡扯,没有这个规划”——下个月不发奖金
    • “你每天花花肠子怎么这么多”——没有明确的回应这个问题,我的猜测 80%是真实的

临时要交什么汇报,或者做工作总结的时候,不要紧张

【你日常都在做什么】+【这件事的目的是什么】+【你在做这件事的时候有什么困难】+【怎么把这件事做的更好】

比如:

我最近在更新我的知乎账号,坚持回答问题,这是为了能够积累更多的粉丝数,获得更多的认同,也培养自己输出的习惯

但是我发现我的阅读量上去了,但是点赞量还没有起来

所以我决定在写到这一段的时候给读者老爷们磕个头求个赞,哐哐哐

想要说服别人的时候

注意两点:

  • 先肯定对方的想法

  • 尽量不要出现比较主观的用语,如:“我觉得”、“照我看”、“我认为”

【示例】我仔细听了你的诉求,很有道理,这个需求是可以理解的。但是,我们换个角度想一想:……。而且,我还听别人说:…..。所以,不妨折中一下:…..。你觉得这样如何呢?

当别人给你布置杂活的时候

我们要死扣细节,不停问细节:

  • “你说的这件事大概什么时候需要?”

  • “这个时间点具体到几点?”

  • “我是微信给你还是邮箱给你?”

  • “那需要我先给一个计划,你帮我看看合适不合适么?”

  • ……

一个杂活而已,你不停地追问细节,会极大地增加你们之间的沟通成本,让对方崩溃,从此再也不想给你安排杂活。

参考资料

《极客时间教程 - 职场求生攻略》笔记

学会如何工作,和学习技术同等重要

以利益为视角,以换位思考为手段。

优先级:工作中那么多事情,我要如何安排优先级?

基于工作性质安排优先级

工作可以划分为:

业务拓展:需求分析、设计、开发都属于这范畴。

安全问题:安全无小事。要高度重视安全问题。

线上问题:直接影响用户体验和权益。要第一优先级去处理。

基于合作安排优先级

事情如果没有明显的轻重缓急,优先做那些会阻塞别人工作的事情。

做事情本身的优先级

我们做事情的时候,如果能把其中的每一步都想清楚,理清依赖关系,安排得井井有条,这就已经事半功倍了。

沟通:邮件那么重要,你还在轻视邮件吗?

邮件的特性

  1. 异步交流:邮件是一种异步交流的方式,双方有足够的时间准备邮件内容。
  2. 无法修改:邮件内容无法修改,这是邮件可靠的基石。
  3. 方便扩散:邮件有邮件组,可以很方便地把相关人员加进来,并且保留邮件历史记录。

邮件是公司内部的合同

场景 1:设计确认(邮件的“确认”功能)

场景 2:优先级(邮件的“证据链”功能)

场景 3:大促(邮件的“沟通协调”功能)

场景 4:新业务接入(邮件的“防遗忘”功能)

场景 5:技术升级和 Bug 修复(邮件的“广而告之”功能)

邮件的魅力

我们都是普通人,普通人没有“背锅”的压力,就没有持久的把事情做好的动力。

沟通:程序员为什么应该爱上交流?

主观能动性:为什么程序员,需要发挥主观能动性?

责任的边界:程序员的职责范围仅仅只是被安排的任务吗?

RocketMQ 基本原理

原理

分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点。而谈到消息系统的设计,就回避不了两个问题:

  1. 消息的顺序问题
  2. 消息的重复问题

顺序消息

第一种模型

假如生产者产生了 2 条消息:M1、M2,要保证这两条消息的顺序,应该怎样做?你脑中想到的可能是这样:

假定 M1 发送到 S1,M2 发送到 S2,如果要保证 M1 先于 M2 被消费,那么需要 M1 到达消费端被消费后,通知 S2,然后 S2 再将 M2 发送到消费端。

这个模型存在的问题是,如果 M1 和 M2 分别发送到两台 Server 上,就不能保证 M1 先达到 MQ 集群,也不能保证 M1 被先消费。换个角度看,如果 M2 先于 M1 达到 MQ 集群,甚至 M2 被消费后,M1 才达到消费端,这时消息也就乱序了,说明以上模型是不能保证消息的顺序的。

第二种模型

如何才能在 MQ 集群保证消息的顺序?一种简单的方式就是将 M1、M2 发送到同一个 Server 上:

这样可以保证 M1 先于 M2 到达 MQServer(生产者等待 M1 发送成功后再发送 M2),根据先达到先被消费的原则,M1 会先于 M2 被消费,这样就保证了消息的顺序。

这个模型也仅仅是理论上可以保证消息的顺序,在实际场景中可能会遇到下面的问题:

只要将消息从一台服务器发往另一台服务器,就会存在网络延迟问题。如上图所示,如果发送 M1 耗时大于发送 M2 的耗时,那么 M2 就仍将被先消费,仍然不能保证消息的顺序。即使 M1 和 M2 同时到达消费端,由于不清楚消费端 1 和消费端 2 的负载情况,仍然有可能出现 M2 先于 M1 被消费的情况。

如何解决这个问题?将 M1 和 M2 发往同一个消费者,且发送 M1 后,需要消费端响应成功后才能发送 M2。

这可能产生另外的问题:如果 M1 被发送到消费端后,消费端 1 没有响应,那是继续发送 M2 呢,还是重新发送 M1?一般为了保证消息一定被消费,肯定会选择重发 M1 到另外一个消费端 2,就如下图所示。

这样的模型就严格保证消息的顺序,细心的你仍然会发现问题,消费端 1 没有响应 Server 时有两种情况,一种是 M1 确实没有到达(数据在网络传送中丢失),另外一种消费端已经消费 M1 且已经发送响应消息,只是 MQ Server 端没有收到。如果是第二种情况,重发 M1,就会造成 M1 被重复消费。也就引入了我们要说的第二个问题,消息重复问题,这个后文会详细讲解。

回过头来看消息顺序问题,严格的顺序消息非常容易理解,也可以通过文中所描述的方式来简单处理。总结起来,要实现严格的顺序消息,简单且可行的办法就是:

保证生产者 - MQServer - 消费者是一对一对一的关系。

这样的设计虽然简单易行,但也会存在一些很严重的问题,比如:

  1. 并行度就会成为消息系统的瓶颈(吞吐量不够)
  2. 更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。

RocketMQ 的解决方案:通过合理的设计或者将问题分解来规避。如果硬要把时间花在解决问题本身,实际上不仅效率低下,而且也是一种浪费。从这个角度来看消息的顺序问题,我们可以得出两个结论:

  1. 不关注乱序的应用实际大量存在
  2. 队列无序并不意味着消息无序

最后我们从源码角度分析 RocketMQ 怎么实现发送顺序消息。

RocketMQ 通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)。比如下面的示例中,订单号相同的消息会被先后发送到同一个队列中:

1
2
3
4
5
6
7
8
9
10
11
// RocketMQ 通过 MessageQueueSelector 中实现的算法来确定消息发送到哪一个队列上
// RocketMQ 默认提供了两种 MessageQueueSelector 实现:随机/Hash
// 当然你可以根据业务实现自己的 MessageQueueSelector 来决定消息按照何种策略发送到消息队列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);

在获取到路由信息以后,会根据 MessageQueueSelector 实现的算法来选择一个队列,同一个 OrderId 获取到的肯定是同一个队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
private SendResult send()  {
// 获取topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
// 根据我们的算法,选择一个发送队列
// 这里的arg = orderId
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
}
}

消息重复

造成消息重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

  1. 消费端处理消息的业务逻辑保持幂等性。
  2. 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。

第 1 条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。

第 2 条原理就是利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。

第 1 条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。

第 2 条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是 RocketMQ 不解决消息重复的问题的原因。

RocketMQ 不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。

事务消息

RocketMQ 除了支持普通消息,顺序消息,另外还支持事务消息。

假设这样的场景:

img

图中执行本地事务(Bob 账户扣款)和发送异步消息应该保证同时成功或者同时失败,也就是扣款成功了,发送消息一定要成功,如果扣款失败了,就不能再发送消息。那问题是:我们是先扣款还是先发送消息呢?

img

RocketMQ 分布式事务步骤:

发送 Prepared 消息 2222222222222222222,并拿到接受消息的地址。
执行本地事务
通过第 1 步骤拿到的地址去访问消息,并修改消息状态。

参考资料

《Kafka 核心源码解读》笔记

开篇词

从功能上讲,Kafka 源码分为四大模块。

  • 服务器端源码:实现 Kafka 架构和各类优秀特性的基础。
  • Java 客户端源码:定义了与 Broker 端的交互机制,以及通用的 Broker 端组件支撑代码。
  • Connect 源码:用于实现 Kafka 与外部系统的高性能数据传输。
  • Streams 源码:用于实现实时的流处理功能。

导读

构建 Kafka 工程和源码阅读环境、Scala 语言热身

kafka 项目主要目录结构

  • bin 目录:保存 Kafka 工具行脚本,我们熟知的 kafka-server-start 和 kafka-consoleproducer 等脚本都存放在这里。
  • clients 目录:保存 Kafka 客户端代码,比如生产者和消费者的代码都在该目录下。
  • config 目录:保存 Kafka 的配置文件,其中比较重要的配置文件是 server.properties。
  • connect 目录:保存 Connect 组件的源代码。我在开篇词里提到过,Kafka Connect 组件是用来实现 Kafka 与外部系统之间的实时数据传输的。
  • core 目录:保存 Broker 端代码。Kafka 服务器端代码全部保存在该目录下。
  • streams 目录:保存 Streams 组件的源代码。Kafka Streams 是实现 Kafka 实时流处理的组件。

日志段

保存消息文件的对象是怎么实现的?

Kafka 日志结构

Kafka 日志对象由多个日志段对象组成,而每个日志段对象会在磁盘上创建一组文件,包括消息日志文件(.log)位移索引文件(.index)时间戳索引文件(.timeindex)以及已中止(Aborted)事务的索引文件(.txnindex)。当然,如果你没有使用 Kafka 事务,已中止事务的索引文件是不会被创建出来的。

一个 Kafka 主题有很多分区,每个分区就对应一个 Log 对象,在物理磁盘上则对应于一个子目录。比如你创建了一个双分区的主题 test-topic,那么,Kafka 在磁盘上会创建两个子目录:test-topic-0 和 test-topic-1。而在服务器端,这就是两个 Log 对象。每个子目录下存在多组日志段,也就是多组 .log.index.timeindex 文件组合,只不过文件名不同,因为每个日志段的起始位移不同。

日志段源码解析

日志段源码位于 Kafka 的 core 工程的 LogSegment.scala 中。该文件下定义了三个 Scala 对象:

  • LogSegment class:日志段类
  • LogSegment object:保存静态变量或静态方法。相当于 LogSegment class 的工具类。
  • LogFlushStats object:尾部有个 stats,用于统计,负责为日志落盘进行计时。

LogSegment class 声明

1
2
3
4
5
6
7
8
class LogSegment private[log] (val log: FileRecords,
val lazyOffsetIndex: LazyIndex[OffsetIndex],
val lazyTimeIndex: LazyIndex[TimeIndex],
val txnIndex: TransactionIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
val time: Time) extends Logging { ... }

参数说明:

  • log包含日志条目的文件记录FileRecords 就是实际保存 Kafka 消息的对象。
  • lazyOffsetIndex偏移量索引
  • lazyTimeIndex时间戳索引
  • txnIndex事务索引
  • baseOffset此段中偏移量的下限。事实上,在磁盘上看到的 Kafka 文件名就是 baseOffset 的值。每个 LogSegment 对象实例一旦被创建,它的起始位移就是固定的了,不能再被更改。
  • indexIntervalBytes索引中条目之间的近似字节数。indexIntervalBytes 值其实就是 Broker 端参数 log.index.interval.bytes 值,它控制了日志段对象新增索引项的频率。默认情况下,日志段至少新写入 4KB 的消息数据才会新增一条索引项。
  • rollJitterMs日志段对象新增倒计时的“扰动值”。因为目前 Broker 端日志段新增倒计时是全局设置,这就是说,在未来的某个时刻可能同时创建多个日志段对象,这将极大地增加物理磁盘 I/O 压力。有了 rollJitterMs 值的干扰,每个新增日志段在创建时会彼此岔开一小段时间,这样可以缓解物理磁盘的 I/O 负载瓶颈。
  • time:**Timer 实例**。

append 方法

append 方法接收 4 个参数:分别表示

  • largestOffset:待写入消息批次中消息的最大位移值
  • largestTimestamp:最大时间戳
  • shallowOffsetOfMaxTimestamp:最大时间戳对应消息的位移
  • records:真正要写入的消息集合

  • 第一步:在源码中,首先调用 log.sizeInBytes 方法判断该日志段是否为空,如果是空的话, Kafka 需要记录要写入消息集合的最大时间戳,并将其作为后面新增日志段倒计时的依据。
  • 第二步:代码调用 ensureOffsetInRange 方法确保输入参数最大位移值是合法的。那怎么判断是不是合法呢?标准就是看它与日志段起始位移的差值是否在整数范围内,即 largestOffset - baseOffset 的值是不是 介于 [0,Int.MAXVALUE] 之间。在极个别的情况下,这个差值可能会越界,这时, append 方法就会抛出异常,阻止后续的消息写入。一旦你碰到这个问题,你需要做的是升级你的 Kafka 版本,因为这是由已知的 Bug 导致的。
  • 第三步:待这些做完之后,append 方法调用 FileRecordsappend 方法执行真正的写入。它的工作是将内存中的消息对象写入到操作系统的页缓存就可以了。
  • 第四步:再下一步,就是更新日志段的最大时间戳以及最大时间戳所属消息的位移值属性。每个日志段都要保存当前最大时间戳信息和所属消息的位移信息。还记得 Broker 端提供定期删除日志的功能吗?比如我只想保留最近 7 天的日志,没错,当前最大时间戳这个值就是判断的依据;而最大时间戳对应的消息的位移值则用于时间戳索引项。虽然后面我会详细介绍,这里我还是稍微提一下:时间戳索引项保存时间戳与消息位移的对应关系。在这步操作中,Kafka 会更新并保存这组对应关系。
  • 第五步:append 方法的最后一步就是更新索引项和写入的字节数了。我在前面说过,日志段每写入 4KB 数据就要写入一个索引项。当已写入字节数超过了 4KB 之后,append 方法会调用索引对象的 append 方法新增索引项,同时清空已写入字节数,以备下次重新累积计算。

read 方法

read 方法作用:从第一个偏移量 >= startOffset 的 Segment 开始读取消息集。如果指定了 maxOffset,则消息集将包含不超过 maxSize 字节,并将在 maxOffset 之前结束。

read 方法入参

  • startOffset:要读取的第一条消息的位移;
  • maxSize:能读取的最大字节数;
  • maxPosition :能读到的最大文件位置;
  • minOneMessage:是否允许在消息体过大时至少返回第一条消息。

read 方法代码逻辑:

  1. 调用 translateOffset 方法定位要读取的起始文件位置 (startPosition)。输入参数 startOffset 仅仅是位移值,Kafka 需要根据索引信息找到对应的物理文件位置才能开始读取消息。
  2. 待确定了读取起始位置,日志段代码需要根据这部分信息以及 maxSizemaxPosition 参数共同计算要读取的总字节数。举个例子,假设 maxSize=100,maxPosition=300,startPosition=250,那么 read 方法只能读取 50 字节,因为 maxPosition - startPosition = 50。我们把它和 maxSize 参数相比较,其中的最小值就是最终能够读取的总字节数。
  3. 调用 FileRecordsslice 方法,从指定位置读取指定大小的消息集合。

recover 方法

recover 开始时,代码依次调用索引对象的 reset 方法清空所有的索引文件,之后会开始遍历日志段中的所有消息集合或消息批次(RecordBatch)。对于读取到的每个消息集合,日志段必须要确保它们是合法的,这主要体现在两个方面:

  • 该集合中的消息必须要符合 Kafka 定义的二进制格式;
  • 该集合中最后一条消息的位移值不能越界,即它与日志段起始位移的差值必须是一个正整数值。

校验完消息集合之后,代码会更新遍历过程中观测到的最大时间戳以及所属消息的位移值。同样,这两个数据用于后续构建索引项。再之后就是不断累加当前已读取的消息字节数,并根据该值有条件地写入索引项。最后是更新事务型 Producer 的状态以及 Leader Epoch 缓存。不过,这两个并不是理解 Kafka 日志结构所必需的组件,因此,我们可以忽略它们。

遍历执行完成后,Kafka 会将日志段当前总字节数和刚刚累加的已读取字节数进行比较,如果发现前者比后者大,说明日志段写入了一些非法消息,需要执行截断操作,将日志段大小调整回合法的数值。同时, Kafka 还必须相应地调整索引文件的大小。把这些都做完之后,日志段恢复的操作也就宣告结束了。

日志

日志是日志段的容器,里面定义了很多管理日志段的操作。

Log 源码结构

  • LogAppendInfo(C):保存消息元数据信息
  • LogAppendInfo(O):LogAppendInfo(C)工厂方法类
  • UnifiedLog(C):UnifiedLog.scala 中最核心的代码
  • UnifiedLog(O):UnifiedLog(C)工厂方法类
  • RollParams(C):用于控制日志段是否切分(Roll)的数据结构。
  • RollParams(O):RollParams 伴生类的工厂方法。
  • LogMetricNames(O):定义了 Log 对象的监控指标。
  • LogOffsetSnapshot(C):封装分区所有位移元数据的容器类。
  • LogReadInfo(C):封装读取日志返回的数据及其元数据。
  • CompletedTxn(C):记录已完成事务的元数据,主要用于构建事务索引。

Log Class & Object

Log Object 作用:

  • 定义了 Kafka 支持的文件类型
    • .log:Kafka 日志文件
    • .index:Kafka 偏移量索引文件
    • .timeindex:Kafka 时间戳索引文件
    • .txnindex:Kafka 事务索引文件
    • .snapshot:Kafka 为幂等型或事务型 Producer 所做的快照文件
    • .deleted:被标记为待删除的文件
    • .cleaned:用于日志清理的临时文件
    • .swap:将文件交换到日志中时使用的临时文件
    • -delete:被标记为待删除的目录
    • -future:用于变更主题分区文件夹地址的目录
  • 定义了多种工具类方法

UnifiedLog Class 定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// UnifiedLog 定义
class UnifiedLog(@volatile var logStartOffset: Long,
private val localLog: LocalLog,
brokerTopicStats: BrokerTopicStats,
val producerIdExpirationCheckIntervalMs: Int,
@volatile var leaderEpochCache: Option[LeaderEpochFileCache],
val producerStateManager: ProducerStateManager,
@volatile private var _topicId: Option[Uuid],
val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup { ... }

// LocalLog 定义
class LocalLog(@volatile private var _dir: File,
@volatile private[log] var config: LogConfig,
private[log] val segments: LogSegments,
@volatile private[log] var recoveryPoint: Long,
@volatile private var nextOffsetMetadata: LogOffsetMetadata,
private[log] val scheduler: Scheduler,
private[log] val time: Time,
private[log] val topicPartition: TopicPartition,
private[log] val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup { ... }

上面属性中最重要的两个属性是:_dirlogStartOffset_dir 就是这个日志所在的文件夹路径,也就是主题分区的路径。logStartOffset,表示日志的当前最早位移。_dirlogStartOffset 都是 volatile var 类型,表示它们的值是变动的,而且可能被多个线程更新。

Log End Offset(LEO),是表示日志下一条待插入消息的位移值,而这个 Log Start Offset 是跟它相反的,它表示日志当前对外可见的最早一条消息的位移值。

图中绿色的位移值 3 是日志的 Log Start Offset,而位移值 15 表示 LEO。另外,位移值 8 是高水位值,它是区分已提交消息和未提交消息的分水岭。

有意思的是,Log End Offset 可以简称为 LEO,但 Log Start Offset 却不能简称为 LSO。因为在 Kafka 中,LSO 特指 Log Stable Offset,属于 Kafka 事务的概念。

Log 类的其他属性你暂时不用理会,因为它们要么是很明显的工具类属性,比如 timer 和 scheduler,要么是高阶用法才会用到的高级属性,比如 producerStateManager 和 logDirFailureChannel。工具类的代码大多是做辅助用的,跳过它们也不妨碍我们理解 Kafka 的核心功能;而高阶功能代码设计复杂,学习成本高,性价比不高。

其他一些重要属性:

  • nextOffsetMetadata:它封装了下一条待插入消息的位移值,你基本上可以把这个属性和 LEO 等同起来。
  • highWatermarkMetadata:是分区日志高水位值。
  • segments:我认为这是 Log 类中最重要的属性。它保存了分区日志下所有的日志段信息,只不过是用 Map 的数据结构来保存的。Map 的 Key 值是日志段的起始位移值,Value 则是日志段对象本身。Kafka 源码使用 ConcurrentNavigableMap 数据结构来保存日志段对象,就可以很轻松地利用该类提供的线程安全和各种支持排序的方法,来管理所有日志段对象。
  • Leader Epoch Cache 对象。Leader Epoch 是社区于 0.11.0.0 版本引入源码中的,主要是用来判断出现 Failure 时是否执行日志截断操作(Truncation)。之前靠高水位来判断的机制,可能会造成副本间数据不一致的情形。这里的 Leader Epoch Cache 是一个缓存类数据,里面保存了分区 Leader 的 Epoch 值与对应位移值的映射关系,我建议你查看下 LeaderEpochFileCache 类,深入地了解下它的实现原理。

LOG 类初始化逻辑

Log 的常见操作

Log 的常见操作可以分为 4 类:

  • 高水位管理操作:高水位的概念在 Kafka 中举足轻重,对它的管理,是 Log 最重要的功能之一。
  • 日志段管理:Log 是日志段的容器。高效组织与管理其下辖的所有日志段对象,是源码要解决的核心问题。
  • 关键位移值管理:日志定义了很多重要的位移值,比如 Log Start Offset 和 LEO 等。确保这些位移值的正确性,是构建消息引擎一致性的基础。
  • 读写操作:所谓的操作日志,大体上就是指读写日志。读写操作的作用之大,不言而喻。

高水位管理操作

高水位定义:

1
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)

高水位值是 volatile(易变型)的。因为多个线程可能同时读取它,因此需要设置成 volatile,保证内存可见性。另外,由于高水位值可能被多个线程同时修改,因此源码使用 Java Monitor 锁来确保并发修改的线程安全。

高水位值的初始值是 Log Start Offset 值。上节课我们提到,每个 Log 对象都会维护一个 Log Start Offset 值。当首次构建高水位时,它会被赋值成 Log Start Offset 值。

LogOffsetMetadata 定义

1
2
3
case class LogOffsetMetadata(messageOffset: Long,
segmentBaseOffset: Long = Log.UnknownOffset,
relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {

三个参数:

  • messageOffset:消息位移值,这是最重要的信息。我们总说高水位值,其实指的就是这个变量的值。
  • segmentBaseOffset:保存该位移值所在日志段的起始位移。日志段起始位移值辅助计算两条消息在物理磁盘文件中位置的差值,即两条消息彼此隔了多少字节。这个计算有个前提条件,即两条消息必须处在同一个日志段对象上,不能跨日志段对象。否则它们就位于不同的物理文件上,计算这个值就没有意义了。这里的 segmentBaseOffset,就是用来判断两条消息是否处于同一个日志段的。
  • relativePositionSegment:保存该位移值所在日志段的物理磁盘位置。这个字段在计算两个位移值之间的物理磁盘位置差值时非常有用。你可以想一想,Kafka 什么时候需要计算位置之间的字节数呢?答案就是在读取日志的时候。假设每次读取时只能读 1MB 的数据,那么,源码肯定需要关心两个位移之间所有消息的总字节数是否超过了 1MB。
获取和设置高水位值
1
2
3
4
5
6
7
8
9
10
11
private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
if (newHighWatermark.messageOffset < 0) // 高水位值不能是负数
throw new IllegalArgumentException("High watermark offset should be non-negative")

lock synchronized { // 保护Log对象修改的Monitor锁
highWatermarkMetadata = newHighWatermark // 赋值新的高水位值
producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
maybeIncrementFirstUnstableOffset() // First Unstable Offset是Kafka事务机制
}
trace(s"Setting high watermark $newHighWatermark")
}
更新高水位值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def updateHighWatermark(hw: Long): Long = {
// 新高水位值一定介于[Log Start Offset,Log End Offset]之间
val newHighWatermark = if (hw < logStartOffset)
logStartOffset
else if (hw > logEndOffset)
logEndOffset
else
hw
// 设置高水位值
updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
// 最后返回新高水位值
newHighWatermark
}

def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
// 新高水位值不能越过Log End Offset
if (newHighWatermark.messageOffset > logEndOffset)
throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
s"log end offset $logEndOffsetMetadata")

lock.synchronized {
val oldHighWatermark = fetchHighWatermarkMetadata // 获取老的高水位值

// 保证高水位单调递增。当新的偏移元数据位于较新的段上时,我们还会更新高水位线,每当日志滚动到新段时就会发生这种情况。
if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
(oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
updateHighWatermarkMetadata(newHighWatermark)
Some(oldHighWatermark) // 返回老的高水位值
} else {
None
}
}
}

这两个方法有着不同的用途。updateHighWatermark 方法,主要用在 Follower 副本从 Leader 副本获取到消息后更新高水位值。一旦拿到新的消息,就必须要更新高水位值;而 maybeIncrementHighWatermark 方法,主要是用来更新 Leader 副本的高水位值。需要注意的是,Leader 副本高水位值的更新是有条件的——某些情况下会更新高水位值,某些情况下可能不会。

读取高水位值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
checkIfMemoryMappedBufferClosed() // 读取时确保日志不能被关闭

val offsetMetadata = highWatermarkMetadata // 保存当前高水位值到本地变量
if (offsetMetadata.messageOffsetOnly) { // 没有获得到完整的高水位元数据
lock.synchronized {
// 给定消息偏移量,在日志中找到其对应的偏移量元数据。如果消息偏移量超出范围,则抛出异常
val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
updateHighWatermarkMetadata(fullOffset) // 然后再更新一下高水位对象
fullOffset
}
} else {
offsetMetadata
}
}

日志段管理

添加

1
2
@threadsafe
def addSegment(segment: LogSegment): LogSegment = this.segments.put(segment.baseOffset, segment)

删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def deleteOldSegments(): Int = {
if (config.delete) {
deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
} else {
deleteLogStartOffsetBreachedSegments()
}
}

private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
lock synchronized {
val deletable = deletableSegments(predicate)
if (deletable.nonEmpty)
info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
deleteSegments(deletable)
}
}

修改

源码里面不涉及修改日志段对象,所谓的修改或更新也就是替换而已,用新的日志段对象替换老的日志段对象。举个简单的例子。segments.put(1L, newSegment) 语句在没有 Key=1 时是添加日志段,否则就是替换已有日志段。

查询

主要都是利用了 ConcurrentSkipListMap 的现成方法。

  • segments.firstEntry:获取第一个日志段对象;
  • segments.lastEntry:获取最后一个日志段对象,即 Active Segment;
  • segments.higherEntry:获取第一个起始位移值 ≥ 给定 Key 值的日志段对象;
  • segments.floorEntry:获取最后一个起始位移值 ≤ 给定 Key 值的日志段对象。

关键位移值管理

Log 对象维护了一些关键位移值数据,比如 Log Start Offset、LEO 等。

Log 对象中的 LEO 永远指向下一条待插入消息,也就是说,LEO 值上面是没有消息的

1
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _

Log End Offset 对象被更新的时机:

  • 对象初始化时:当 Log 对象初始化时,我们必须要创建一个 LEO 对象,并对其进行初始化。
  • 写入新消息时:这个最容易理解。以上面的图为例,当不断向 Log 对象插入新消息时,LEO 值就像一个指针一样,需要不停地向右移动,也就是不断地增加。
  • Log 对象发生日志切分(Log Roll)时:日志切分是啥呢?其实就是创建一个全新的日志段对象,并且关闭当前写入的日志段对象。这通常发生在当前日志段对象已满的时候。一旦发生日志切分,说明 Log 对象切换了 Active Segment,那么,LEO 中的起始位移值和段大小数据都要被更新,因此,在进行这一步操作时,我们必须要更新 LEO 对象。
  • 日志截断(Log Truncation)时:这个也是显而易见的。日志中的部分消息被删除了,自然可能导致 LEO 值发生变化,从而要更新 LEO 对象。

Log Start Offset 被更新的时机:

  • Log 对象初始化时:和 LEO 类似,Log 对象初始化时要给 Log Start Offset 赋值,一般是将第一个日志段的起始位移值赋值给它。
  • 日志截断时:同理,一旦日志中的部分消息被删除,可能会导致 Log Start Offset 发生变化,因此有必要更新该值。
  • Follower 副本同步时:一旦 Leader 副本的 Log 对象的 Log Start Offset 值发生变化。为了维持和 Leader 副本的一致性,Follower 副本也需要尝试去更新该值。
  • 删除日志段时:这个和日志截断是类似的。凡是涉及消息删除的操作都有可能导致 LogStart Offset 值的变化。
  • 删除消息时:严格来说,这个更新时机有点本末倒置了。在 Kafka 中,删除消息就是通过抬高 Log Start Offset 值来实现的,因此,删除消息时必须要更新该值。

读写操作

写操作流程:

读操作

read 方法中有 4 个参数:

  • startOffset,即从 Log 对象的哪个位移值开始读消息。
  • maxLength,即最多能读取多少字节。
  • isolation,设置读取隔离级别,主要控制能够读取的最大位移值,多用于 Kafka 事务。
  • minOneMessage,即是否允许至少读一条消息。设想如果消息很大,超过了 maxLength,正常情况下 read 方法永远不会返回任何消息。但如果设置了该参数为 true,read 方法就保证至少能够返回一条消息。

索引

索引类图及源文件组织架构

  • AbstractIndex.scala:它定义了最顶层的抽象类,这个类封装了所有索引类型的公共操作。
  • LazyIndex.scala:它定义了 AbstractIndex 上的一个包装类,实现索引项延迟加载。这个类主要是为了提高性能。
  • OffsetIndex.scala:定义位移索引,保存“< 位移值,文件磁盘物理位置 >”对。
  • TimeIndex.scala:定义时间戳索引,保存“< 时间戳,位移值 >”对。
  • TransactionIndex.scala:定义事务索引,为已中止事务(Aborted Transcation)保存重要的元数据信息。只有启用 Kafka 事务后,这个索引才有可能出现。

AbstractIndex 代码结构

  • 索引文件(file)。每个索引对象在磁盘上都对应了一个索引文件。你可能注意到了,这个字段是 var 型,说明它是可以被修改的。难道索引对象还能动态更换底层的索引文件吗?是的。自 1.1.0 版本之后,Kafka 允许迁移底层的日志路径,所以,索引文件自然要是可以更换的。
  • 起始位移值(baseOffset)。索引对象对应日志段对象的起始位移值。举个例子,如果你查看 Kafka 日志路径的话,就会发现,日志文件和索引文件都是成组出现的。比如说,如果日志文件是 00000000000000000123.log,正常情况下,一定还有一组索引文件 00000000000000000123.index、00000000000000000123.timeindex 等。这里的“123”就是这组文件的起始位移值,也就是 baseOffset 值。
  • 索引文件最大字节数(maxIndexSize)。它控制索引文件的最大长度。Kafka 源码传入该参数的值是 Broker 端参数 segment.index.bytes 的值,即 10MB。这就是在默认情况下,所有 Kafka 索引文件大小都是 10MB 的原因。
  • 索引文件打开方式(writable)。“True”表示以“读写”方式打开,“False”表示以“只读”方式打开。

位移索引

位移索引也就是所谓的 OffsetIndex。Key 就是消息的相对位移,Value 是保存该消息的日志段文件中该消息第一个字节的物理文件位置。

参考资料

《极客时间教程 - 分布式协议与算法实战》笔记

拜占庭将军问题

拜占庭将军问题是由莱斯利·兰波特在其同名论文中提出的分布式对等网络通信容错问题。其实是借拜占庭将军的例子,抛出了分布式共识性问题,并探讨和论证了解决的方法。

分布式计算中,不同的节点通过通讯交换信息达成共识而按照同一套协作策略行动。但有时候,系统中的节点可能出错而发送错误的信息,用于传递信息的通讯网络也可能导致信息损坏,使得网络中不同的成员关于全体协作的策略得出不同结论,从而破坏系统一致性。拜占庭将军问题被认为是容错性问题中最难的问题类型之一。

问题描述

一群拜占庭将军各领一支军队共同围困一座城市。

为了简化问题,军队的行动策略只有两种:进攻(Attack,后面简称 A)或 撤退(Retreat,后面简称 R)。如果这些军队不是统一进攻或撤退,就可能因兵力不足导致失败。因此,将军们通过投票来达成一致策略:同进或同退

因为将军们分别在城市的不同方位,所以他们只能通过信使互相联系。在投票过程中,每位将军都将自己的投票信息(A 或 R)通知其他所有将军,这样一来每位将军根据自己的投票和其他所有将军送来的信息就可以分析出共同的投票结果而决定行动策略。

这个抽象模型的问题在于:将军中可能存在叛徒,他们不仅会发出误导性投票,还可能选择性地发送投票信息

由于将军之间需要通过信使通讯,叛变将军可能通过伪造信件来以其他将军的身份发送假投票。而即使在保证所有将军忠诚的情况下,也不能排除信使被敌人截杀,甚至被敌人间谍替换等情况。因此很难通过保证人员可靠性及通讯可靠性来解决问题。

假使那些忠诚(或是没有出错)的将军仍然能通过多数决定来决定他们的战略,便称达到了拜占庭容错。在此,票都会有一个默认值,若消息(票)没有被收到,则使用此默认值来投票。

上述的故事可以映射到分布式系统中,_将军代表分布式系统中的节点;信使代表通信系统;叛徒代表故障或异常_。

img

问题分析

兰伯特针对拜占庭将军问题,给出了两个解决方案:口头协议和书面协议。

本文介绍一下口头协议。

在口头协议中,拜占庭将军问题被简化为将军 - 副官模型,其核心规则如下:

  • 忠诚的副官遵守同一命令。
  • 若将军是忠诚的,所有忠诚的副官都执行他的命令。
  • 如果叛徒人数为 m,将军人数不能少于 3m + 1 ,那么拜占庭将军问题就能解决了。——关于这个公式,可以不必深究,如果对推导过程感兴趣,可以参考论文。

示例一、叛徒人数为 1,将军人数为 3

img

这个示例中,将军人数不满足 3m + 1,无法保证忠诚的副官都执行将军的命令。

示例二、叛徒人数为 1,将军人数为 4

img

这个示例中,将军人数满足 3m + 1,无论是副官中有叛徒,还是将军是叛徒,都能保证忠诚的副官执行将军的命令。

CAP 理论

CAP 是指:在一个分布式系统中, 一致性、可用性和分区容忍性,最多只能同时满足其中两项。

  • 一致性(C:Consistency):多个数据副本是否能保持一致
  • 可用性(A:Availability):分布式系统在面对各种异常时可以提供正常服务的能力
  • 分区容忍性(P:Partition Tolerance):分布式系统在遇到任何网络分区故障的时候,仍然需要能对外提供一致性和可用性的服务,除非是整个网络环境都发生了故障

CAP 权衡

在分布式系统中,分区容忍性必不可少,因为需要总是假设网络是不可靠的;CAP 理论实际在是要在可用性和一致性之间做权衡。

  • CP:需要让所有节点下线成为不可用的状态,等待同步完成。
  • AP:在同步过程中允许读取所有节点的数据,但是数据可能不一致。

ACID 理论

ACID 特性:

  • 原子性(Atomicity)
    • 事务被视为不可分割的最小单元,事务中的所有操作要么全部提交成功,要么全部失败回滚。
    • 回滚可以用日志来实现,日志记录着事务所执行的修改操作,在回滚时反向执行这些修改操作即可。
  • 一致性(Consistency)
    • 数据库在事务执行前后都保持一致性状态。
    • 在一致性状态下,所有事务对一个数据的读取结果都是相同的。
  • 隔离性(Isolation)
    • 一个事务所做的修改在最终提交以前,对其它事务是不可见的。
  • 持久性(Durability)
    • 一旦事务提交,则其所做的修改将会永远保存到数据库中。即使系统发生崩溃,事务执行的结果也不能丢失。
    • 可以通过数据库备份和恢复来实现,在系统发生奔溃时,使用备份的数据库进行数据恢复。

img

在分布式系统中实现 ACID 比单机复杂的多。

在分布式系统中实现 ACID,即实现分布式事务,具体的方案有如下几种:

  • 两阶段提交(2PC)
  • 三阶段提交(3PC)
  • 补偿事务(TCC)
  • 本地消息表(异步确保)
  • MQ 事务消息
  • Sagas 事务模型

BASE 理论

BASE 理论是对 CAP 中一致性和可用性权衡的结果。

BASE 是指:即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。

BASE 特性

  • 基本可用(Basically Available):指分布式系统在出现故障的时候,保证核心可用,允许损失部分可用性。
  • 软状态(Soft State):指允许系统中的数据存在中间状态,并认为该中间状态不会影响系统整体可用性,即允许系统不同节点的数据副本之间进行同步的过程存在延时。
  • 最终一致性(Eventually Consistent):最终一致性强调的是系统中所有的数据副本,在经过一段时间的同步后,最终能达到一致的状态。

img

Paxos 算法

Paxos 是 Leslie Lamport 于 1990 年提出的一种基于消息传递且具有高度容错特性的共识(consensus)算法。

Paxos 算法包含 2 个部分:

  • Basic Paxos 算法:描述的多节点之间如何就某个值达成共识。
  • Multi Paxos 思想:描述的是执行多个 Basic Paxos 实例,就一系列值达成共识。

Paxos 算法解决的问题正是分布式共识性问题,即一个分布式系统中的各个进程如何就某个值(决议)达成一致。

Paxos 算法运行在允许宕机故障的异步系统中,不要求可靠的消息传递,可容忍消息丢失、延迟、乱序以及重复。它利用大多数 (Majority) 机制保证了 2N+1 的容错能力,即 2N+1 个节点的系统最多允许 N 个节点同时出现故障。

Basic Paxos 算法

角色

img

  • 提议者(Proposer):发出提案(Proposal),用于投票表决。Proposal 信息包括提案编号 (Proposal ID) 和提议的值 (Value)。在绝大多数场景中,集群中收到客户端请求的节点,才是提议者。这样做的好处是,对业务代码没有入侵性,也就是说,我们不需要在业务代码中实现算法逻辑。
  • 决策者(Acceptor):对每个 Proposal 进行投票,若 Proposal 获得多数 Acceptor 的接受,则称该 Proposal 被批准。一般来说,集群中的所有节点都在扮演决策者的角色,参与共识协商,并接受和存储数据。
  • 学习者(Learner):不参与决策,从 Proposers/Acceptors 学习、记录最新达成共识的提案(Value)。一般来说,学习者是数据备份节点,比如主从架构中的从节点,被动地接受数据,容灾备份。

在多副本状态机中,每个副本都同时具有 Proposer、Acceptor、Learner 三种角色。

这三种角色,在本质上代表的是三种功能:

  • 提议者代表的是接入和协调功能,收到客户端请求后,发起二阶段提交,进行共识协商;
  • 接受者代表投票协商和存储数据,对提议的值进行投票,并接受达成共识的值,存储保存;
  • 学习者代表存储数据,不参与共识协商,只接受达成共识的值,存储保存。

算法

Paxos 算法通过一个决议分为两个阶段(Learn 阶段之前决议已经形成):

  1. Prepare 阶段:Proposer 向 Acceptors 发出 Prepare 请求,Acceptors 针对收到的 Prepare 请求进行 Promise 承诺。
  2. Accept 阶段:Proposer 收到多数 Acceptors 承诺的 Promise 后,向 Acceptors 发出 Propose 请求,Acceptors 针对收到的 Propose 请求进行 Accept 处理。
  3. Learn 阶段:Proposer 在收到多数 Acceptors 的 Accept 之后,标志着本次 Accept 成功,决议形成,将形成的决议发送给所有 Learners。

Paxos 算法流程中的每条消息描述如下:

  • Prepare: Proposer 生成全局唯一且递增的 Proposal ID (可使用时间戳加 Server ID),向所有 Acceptors 发送 Prepare 请求,这里无需携带提案内容,只携带 Proposal ID 即可。

  • Promise: Acceptors 收到 Prepare 请求后,做出“两个承诺,一个应答”。

    • 两个承诺:

      • 不再接受 Proposal ID 小于等于当前请求的 Prepare 请求。
      • 不再接受 Proposal ID 小于当前请求的 Propose 请求。
    • 一个应答:

      • 不违背以前作出的承诺下,回复已经 Accept 过的提案中 Proposal ID 最大的那个提案的 Value 和 Proposal ID,没有则返回空值。
  • Propose: Proposer 收到多数 Acceptors 的 Promise 应答后,从应答中选择 Proposal ID 最大的提案的 Value,作为本次要发起的提案。如果所有应答的提案 Value 均为空值,则可以自己随意决定提案 Value。然后携带当前 Proposal ID,向所有 Acceptors 发送 Propose 请求。

  • Accept: Acceptor 收到 Propose 请求后,在不违背自己之前作出的承诺下,接受并持久化当前 Proposal ID 和提案 Value。

  • Learn: Proposer 收到多数 Acceptors 的 Accept 后,决议形成,将形成的决议发送给所有 Learners。

Multi Paxos 思想

Basic Paxos 的问题

Basic Paxos 有以下问题,导致它不能应用于实际:

  • Basic Paxos 算法只能对一个值形成决议
  • Basic Paxos 算法会消耗大量网络带宽。Basic Paxos 中,决议的形成至少需要两次网络通信,在高并发情况下可能需要更多的网络通信,极端情况下甚至可能形成活锁。如果想连续确定多个值,Basic Paxos 搞不定了。

Multi Paxos 的改进

Multi Paxos 正是为解决以上问题而提出。Multi Paxos 基于 Basic Paxos 做了两点改进:

  • 针对每一个要确定的值,运行一次 Paxos 算法实例(Instance),形成决议。每一个 Paxos 实例使用唯一的 Instance ID 标识。
  • 在所有 Proposer 中选举一个 Leader,由 Leader 唯一地提交 Proposal 给 Acceptor 进行表决。这样没有 Proposer 竞争,解决了活锁问题。在系统中仅有一个 Leader 进行 Value 提交的情况下,Prepare 阶段就可以跳过,从而将两阶段变为一阶段,提高效率。

Multi Paxos 首先需要选举 Leader,Leader 的确定也是一次决议的形成,所以可执行一次 Basic Paxos 实例来选举出一个 Leader。选出 Leader 之后只能由 Leader 提交 Proposal,在 Leader 宕机之后服务临时不可用,需要重新选举 Leader 继续服务。在系统中仅有一个 Leader 进行 Proposal 提交的情况下,Prepare 阶段可以跳过。

Multi Paxos 通过改变 Prepare 阶段的作用范围至后面 Leader 提交的所有实例,从而使得 Leader 的连续提交只需要执行一次 Prepare 阶段,后续只需要执行 Accept 阶段,将两阶段变为一阶段,提高了效率。为了区分连续提交的多个实例,每个实例使用一个 Instance ID 标识,Instance ID 由 Leader 本地递增生成即可。

Multi Paxos 允许有多个自认为是 Leader 的节点并发提交 Proposal 而不影响其安全性,这样的场景即退化为 Basic Paxos。

Chubby 和 Boxwood 均使用 Multi Paxos。ZooKeeper 使用的 Zab 也是 Multi Paxos 的变形。

Raft 算法

Raft 基础

Raft 将一致性问题分解成了三个子问题:

  • 选举 Leader
  • 日志复制
  • 安全性

服务器角色

在 Raft 中,任何时刻,每个服务器都处于这三个角色之一 :

  • Leader - 领导者,通常一个系统中是一主(Leader)多从(Follower)。Leader 负责处理所有的客户端请求
  • Follower - 跟随者,不会发送任何请求,只是简单的 响应来自 Leader 或者 Candidate 的请求
  • Candidate - 参选者,选举新 Leader 时的临时角色。

img

:bulb: 图示说明:

  • Follower 只响应来自其他服务器的请求。在一定时限内,如果 Follower 接收不到消息,就会转变成 Candidate,并发起选举。
  • Candidate 向 Follower 发起投票请求,如果获得集群中半数以上的选票,就会转变为 Leader。
  • 在一个 Term 内,Leader 始终保持不变,直到下线了。Leader 需要周期性向所有 Follower 发送心跳消息,以阻止 Follower 转变为 Candidate。

任期

img

Raft 把时间分割成任意长度的 任期(Term),任期用连续的整数标记。每一段任期从一次选举开始。Raft 保证了在一个给定的任期内,最多只有一个领导者

  • 如果选举成功,Leader 会管理整个集群直到任期结束。
  • 如果选举失败,那么这个任期就会因为没有 Leader 而结束。

不同服务器节点观察到的任期转换状态可能不一样

  • 服务器节点可能观察到多次的任期转换。
  • 服务器节点也可能观察不到任何一次任期转换。

任期在 Raft 算法中充当逻辑时钟的作用,使得服务器节点可以查明一些过期的信息(比如过期的 Leader)。每个服务器节点都会存储一个当前任期号,这一编号在整个时期内单调的增长。当服务器之间通信的时候会交换当前任期号。

  • 如果一个服务器的当前任期号比其他人小,那么他会更新自己的编号到较大的编号值。
  • 如果一个 Candidate 或者 Leader 发现自己的任期号过期了,那么他会立即恢复成跟随者状态。
  • 如果一个节点接收到一个包含过期的任期号的请求,那么他会直接拒绝这个请求。

RPC

Raft 算法中服务器节点之间的通信使用 **_远程过程调用(RPC)_**。

基本的一致性算法只需要两种 RPC:

  • RequestVote RPC - 请求投票 RPC,由 Candidate 在选举期间发起。
  • AppendEntries RPC - 附加条目 RPC,由 Leader 发起,用来复制日志和提供一种心跳机制。

选举 Leader

选举规则

Raft 使用一种心跳机制来触发 Leader 选举

Leader 需要周期性的向所有 Follower 发送心跳消息,以此维持自己的权威并阻止新 Leader 的产生。

每个 Follower 都设置了一个随机的竞选超时时间,一般为 150ms ~ 300ms,如果在竞选超时时间内没有收到 Leader 的心跳消息,就会认为当前 Term 没有可用的 Leader,并发起选举来选出新的 Leader。开始一次选举过程,Follower 先要增加自己的当前 Term 号,并转换为 Candidate

Candidate 会并行的向集群中的所有服务器节点发送投票请求(RequestVote RPC,它会保持当前状态直到以下三件事情之一发生:

  • 自己成为 Leader
  • 其他的服务器成为 Leader
  • 没有任何服务器成为 Leader
自己成为 Leader
  • 当一个 Candidate 从整个集群半数以上的服务器节点获得了针对同一个 Term 的选票,那么它就赢得了这次选举并成为 Leader。每个服务器最多会对一个 Term 投出一张选票,按照先来先服务(FIFO)的原则。_要求半数以上选票的规则确保了最多只会有一个 Candidate 赢得此次选举_。
  • 一旦 Candidate 赢得选举,就立即成为 Leader。然后它会向其他的服务器发送心跳消息来建立自己的权威并且阻止新的领导人的产生。
其他的服务器成为 Leader

等待投票期间,Candidate 可能会从其他的服务器接收到声明它是 Leader 的 AppendEntries RPC

  • 如果这个 Leader 的 Term 号(包含在此次的 RPC 中)不小于 Candidate 当前的 Term,那么 Candidate 会承认 Leader 合法并回到 Follower 状态。
  • 如果此次 RPC 中的 Term 号比自己小,那么 Candidate 就会拒绝这个消息并继续保持 Candidate 状态。
没有任何服务器成为 Leader

如果有多个 Follower 同时成为 Candidate,那么选票可能会被瓜分以至于没有 Candidate 可以赢得半数以上的投票。当这种情况发生的时候,每一个 Candidate 都会竞选超时,然后通过增加当前 Term 号来开始一轮新的选举。然而,没有其他机制的话,选票可能会被无限的重复瓜分。

Raft 算法使用随机选举超时时间的方法来确保很少会发生选票瓜分的情况,就算发生也能很快的解决。为了阻止选票起初就被瓜分,竞选超时时间是一个随机的时间,在一个固定的区间(例如 150-300 毫秒)随机选择,这样可以把选举都分散开。

  • 以至于在大多数情况下,只有一个服务器会超时,然后它赢得选举,成为 Leader,并在其他服务器超时之前发送心跳包。
  • 同样的机制也被用在选票瓜分的情况下:每一个 Candidate 在开始一次选举的时候会重置一个随机的选举超时时间,然后在超时时间内等待投票的结果;这样减少了在新的选举中另外的选票瓜分的可能性。

理解了上面的选举规则后,我们通过动图来加深认识。

日志复制

日志格式

日志由含日志索引(log index)的日志条目(log entry)组成。每个日志条目包含它被创建时的 Term 号(下图中方框中的数字),和一个复制状态机需要执行的指令。如果一个日志条目被复制到半数以上的服务器上,就被认为可以提交(Commit)了。

  • 日志条目中的 Term 号被用来检查是否出现不一致的情况。
  • 日志条目中的日志索引(一个整数值)用来表明它在日志中的位置。

img

Raft 日志同步保证如下两点:

  • 如果不同日志中的两个日志条目有着相同的日志索引和 Term,则它们所存储的命令是相同的
    • 这个特性基于这条原则:Leader 最多在一个 Term 内、在指定的一个日志索引上创建一条日志条目,同时日志条目在日志中的位置也从来不会改变。
  • 如果不同日志中的两个日志条目有着相同的日志索引和 Term,则它们之前的所有条目都是完全一样的
    • 这个特性由 AppendEntries RPC 的一个简单的一致性检查所保证。在发送 AppendEntries RPC 时,Leader 会把新日志条目之前的日志条目的日志索引和 Term 号一起发送。如果 Follower 在它的日志中找不到包含相同日志索引和 Term 号的日志条目,它就会拒绝接收新的日志条目。

日志复制流程

img

  1. Leader 负责处理所有客户端的请求。
  2. Leader 把请求作为日志条目加入到它的日志中,然后并行的向其他服务器发送 AppendEntries RPC 请求,要求 Follower 复制日志条目。
  3. Follower 复制成功后,返回确认消息。
  4. 当这个日志条目被半数以上的服务器复制后,Leader 提交这个日志条目到它的复制状态机,并向客户端返回执行结果。

注意:如果 Follower 崩溃或者运行缓慢,再或者网络丢包,Leader 会不断的重复尝试发送 AppendEntries RPC 请求 (尽管已经回复了客户端),直到所有的跟随者都最终复制了所有的日志条目。

日志一致性

一般情况下,Leader 和 Followers 的日志保持一致,因此日志条目一致性检查通常不会失败。然而,Leader 崩溃可能会导致日志不一致:旧的 Leader 可能没有完全复制完日志中的所有条目。

Leader 和 Follower 日志不一致的可能

Leader 和 Follower 可能存在多种日志不一致的可能。

img

:bulb: 图示说明:

上图阐述了 Leader 和 Follower 可能存在多种日志不一致的可能,每一个方框表示一个日志条目,里面的数字表示任期号 。

当一个 Leader 成功当选时,Follower 可能出现以下情况(a-f):

  • 存在未更新日志条目,如(a、b)。
  • 存在未提交日志条目,如(c、d)。
  • 两种情况都存在,如(e、f)。

_例如,场景 f 可能会这样发生,某服务器在 Term2 的时候是 Leader,已附加了一些日志条目到自己的日志中,但在提交之前就崩溃了;很快这个机器就被重启了,在 Term3 重新被选为 Leader,并且又增加了一些日志条目到自己的日志中;在 Term 2 和 Term 3 的日志被提交之前,这个服务器又宕机了,并且在接下来的几个任期里一直处于宕机状态_。

Leader 和 Follower 日志一致的保证

Leader 通过强制 Followers 复制它的日志来处理日志的不一致,Followers 上的不一致的日志会被 Leader 的日志覆盖

  • Leader 为了使 Followers 的日志同自己的一致,Leader 需要找到 Followers 同它的日志一致的地方,然后覆盖 Followers 在该位置之后的条目。
  • Leader 会从后往前试,每次日志条目失败后尝试前一个日志条目,直到成功找到每个 Follower 的日志一致位点,然后向后逐条覆盖 Followers 在该位置之后的条目。

安全性

前面描述了 Raft 算法是如何选举 Leader 和复制日志的。

Raft 还增加了一些限制来完善 Raft 算法,以保证安全性:保证了任意 Leader 对于给定的 Term,都拥有了之前 Term 的所有被提交的日志条目。

选举限制

拥有最新的已提交的日志条目的 Follower 才有资格成为 Leader。

Raft 使用投票的方式来阻止一个 Candidate 赢得选举除非这个 Candidate 包含了所有已经提交的日志条目。 Candidate 为了赢得选举必须联系集群中的大部分节点,这意味着每一个已经提交的日志条目在这些服务器节点中肯定存在于至少一个节点上。如果 Candidate 的日志至少和大多数的服务器节点一样新(这个新的定义会在下面讨论),那么他一定持有了所有已经提交的日志条目。

RequestVote RPC 实现了这样的限制:RequestVote RPC 中包含了 Candidate 的日志信息, Follower 会拒绝掉那些日志没有自己新的投票请求

如何判断哪个日志条目比较新?

Raft 通过比较两份日志中最后一条日志条目的日志索引和 Term 来判断哪个日志比较新。

  • 先判断 Term,哪个数值大即代表哪个日志比较新。
  • 如果 Term 相同,再比较 日志索引,哪个数值大即代表哪个日志比较新。

提交旧任期的日志条目

一个当前 Term 的日志条目被复制到了半数以上的服务器上,Leader 就认为它是可以被提交的。如果这个 Leader 在提交日志条目前就下线了,后续的 Leader 可能会覆盖掉这个日志条目。

img

💡 图示说明:

上图解释了为什么 Leader 无法对旧 Term 的日志条目进行提交。

  • 阶段 (a) ,S1 是 Leader,且 S1 写入日志条目为 (Term 2,日志索引 2),只有 S2 复制了这个日志条目。
  • 阶段 (b),S1 下线,S5 被选举为 Term3 的 Leader。S5 写入日志条目为 (Term 3,日志索引 2)。
  • 阶段 (c),S5 下线,S1 重新上线,并被选举为 Term4 的 Leader。此时,Term 2 的那条日志条目已经被复制到了集群中的大多数节点上,但是还没有被提交。
  • 阶段 (d),S1 再次下线,S5 重新上线,并被重新选举为 Term3 的 Leader。然后 S5 覆盖了日志索引 2 处的日志。
  • 阶段 (e),如果阶段 (d) 还未发生,即 S1 再次下线之前,S1 把自己主导的日志条目复制到了大多数节点上,那么在后续 Term 里面这些新日志条目就会被提交。这样在同一时刻就同时保证了,之前的所有旧日志条目就会被提交。

Raft 永远不会通过计算副本数目的方式去提交一个之前 Term 内的日志条目。只有 Leader 当前 Term 里的日志条目通过计算副本数目可以被提交;一旦当前 Term 的日志条目以这种方式被提交,那么由于日志匹配特性,之前的日志条目也都会被间接的提交。

当 Leader 复制之前任期里的日志时,Raft 会为所有日志保留原始的 Term,这在提交规则上产生了额外的复杂性。在其他的一致性算法中,如果一个新的领导人要重新复制之前的任期里的日志时,它必须使用当前新的任期号。Raft 使用的方法更加容易辨别出日志,因为它可以随着时间和日志的变化对日志维护着同一个任期编号。另外,和其他的算法相比,Raft 中的新领导人只需要发送更少日志条目(其他算法中必须在他们被提交之前发送更多的冗余日志条目来为他们重新编号)。

日志压缩

在实际的系统中,不能让日志无限膨胀,否则系统重启时需要花很长的时间进行恢复,从而影响可用性。Raft 采用对整个系统进行快照来解决,快照之前的日志都可以丢弃。

每个副本独立的对自己的系统状态生成快照,并且只能对已经提交的日志条目生成快照。

快照包含以下内容:

  • 日志元数据。最后一条已提交的日志条目的日志索引和 Term。这两个值在快照之后的第一条日志条目的 AppendEntries RPC 的完整性检查的时候会被用上。
  • 系统当前状态。

当 Leader 要发送某个日志条目,落后太多的 Follower 的日志条目会被丢弃,Leader 会将快照发给 Follower。或者新上线一台机器时,也会发送快照给它。

img

生成快照的频率要适中,频率过高会消耗大量 I/O 带宽;频率过低,一旦需要执行恢复操作,会丢失大量数据,影响可用性。推荐当日志达到某个固定的大小时生成快照。

生成一次快照可能耗时过长,影响正常日志同步。可以通过使用 copy-on-write 技术避免快照过程影响正常日志同步。

说明:本文仅阐述 Raft 算法的核心内容,不包括算法论证、评估等

一致性哈希算法

一致性哈希(Consistent Hash)算法的目标是:相同的请求尽可能落到同一个服务器上

一致性哈希 可以很好的解决 稳定性问题,可以将所有的 存储节点 排列在 首尾相接Hash 环上,每个 key 在计算 Hash 后会 顺时针 找到 临接存储节点 存放。而当有节点 加入退出 时,仅影响该节点在 Hash 环上 顺时针相邻后续节点

img

  • 相同的请求是指:一般在使用一致性哈希时,需要指定一个 key 用于 hash 计算,可能是:
    • 用户 ID
    • 请求方 IP
    • 请求服务名称,参数列表构成的串
  • 尽可能是指:服务器可能发生上下线,少数服务器的变化不应该影响大多数的请求。

当某台候选服务器宕机时,原本发往该服务器的请求,会基于虚拟节点,平摊到其它候选服务器,不会引起剧烈变动。

  • 优点

加入删除 节点只影响 哈希环顺时针方向相邻的节点,对其他节点无影响。

  • 缺点

加减节点 会造成 哈希环 中部分数据 无法命中。当使用 少量节点 时,节点变化 将大范围影响 哈希环数据映射,不适合 少量数据节点 的分布式方案。普通一致性哈希分区 在增减节点时需要 增加一倍减去一半 节点才能保证 数据负载的均衡

注意:因为 一致性哈希分区 的这些缺点,一些分布式系统采用 虚拟槽一致性哈希 进行改进,比如 Dynamo 系统。

Gossip 协议

Gossip 协议是集群中节点相互通信的内部通信技术。 Gossip 是一种高效、轻量级、可靠的节点间广播协议,用于传播数据。它是去中心化的、“流行病”的、容错的和点对点通信协议。

Goosip 协议的信息传播和扩散通常需要由种子节点发起。整个传播过程可能需要一定的时间,由于不能保证某个时刻所有节点都收到消息,但是理论上最终所有节点都会收到消息,因此它是一个最终一致性协议。

Gossip 的执行过程

Gossip 协议的执行过程:Gossip 过程是由种子节点发起,当一个种子节点有状态需要更新到网络中的其他节点时,它会随机的选择周围几个节点散播消息,收到消息的节点也会重复该过程,直至最终网络中所有的节点都收到了消息。这个过程可能需要一定的时间,由于不能保证某个时刻所有节点都收到消息,但是理论上最终所有节点都会收到消息,因此它是一个最终一致性协议。

Gossip 类型

Gossip 有两种类型:

  • **Anti-Entropy(反熵)**:以固定的概率传播所有的数据。Anti-Entropy 是 SI model,节点只有两种状态,Suspective 和 Infective,叫做 simple epidemics。
  • **Rumor-Mongering(谣言传播)**:仅传播新到达的数据。Rumor-Mongering 是 SIR model,节点有三种状态,Suspective,Infective 和 Removed,叫做 complex epidemics。

熵是物理学上的一个概念,代表杂乱无章,而反熵就是在杂乱无章中寻求一致。本质上,反熵是一种通过异步修复实现最终一致性的方法。反熵指的是集群中的节点,每隔段时间就随机选择某个其他节点,然后通过互相交换自己的所有数据来消除两者之间的差异,实现数据的最终一致性。由于消息会不断反复的交换,因此消息数量是非常庞大的,无限制的(unbounded),这对一个系统来说是一个巨大的开销。所以,反熵不适合动态变化或节点数比较多的分布式环境

谣言传播模型指的是当一个节点有了新数据后,这个节点变成活跃状态,并周期性地联系其他节点向其发送新数据,直到所有的节点都存储了该新数据。在谣言传播模型下,消息可以发送得更频繁,因为消息只包含最新 update,体积更小。而且,一个谣言消息在某个时间点之后会被标记为 removed,并且不再被传播,因此,谣言传播模型下,系统有一定的概率会不一致。而由于,谣言传播模型下某个时间点之后消息不再传播,因此消息是有限的,系统开销小。

一般来说,为了在通信代价和可靠性之间取得折中,需要将这两种方法结合使用。

Gossip 中的通信模式

在 Gossip 协议下,网络中两个节点之间有三种通信方式:

  • Push: 节点 A 将数据 (key,value,version) 及对应的版本号推送给 B 节点,B 节点更新 A 中比自己新的数据
  • Pull:A 仅将数据 key, version 推送给 B,B 将本地比 A 新的数据(Key, value, version)推送给 A,A 更新本地
  • Push/Pull:与 Pull 类似,只是多了一步,A 再将本地比 B 新的数据推送给 B,B 则更新本地

如果把两个节点数据同步一次定义为一个周期,则在一个周期内,Push 需通信 1 次,Pull 需 2 次,Push/Pull 则需 3 次。虽然消息数增加了,但从效果上来讲,Push/Pull 最好,理论上一个周期内可以使两个节点完全一致。直观上,Push/Pull 的收敛速度也是最快的。

Gossip 的优点

  • 扩展性:网络可以允许节点的任意增加和减少,新增加的节点的状态最终会与其他节点一致。
  • 容错:网络中任何节点的宕机和重启都不会影响 Gossip 消息的传播,Gossip 协议具有天然的分布式系统容错特性。
  • 去中心化:Gossip 协议不要求任何中心节点,所有节点都可以是对等的,任何一个节点无需知道整个网络状况,只要网络是连通的,任意一个节点就可以把消息散播到全网。
  • 一致性收敛:Gossip 协议中的消息会以一传十、十传百一样的指数级速度在网络中快速传播,因此系统状态的不一致可以在很快的时间内收敛到一致。消息传播速度达到了 logN。
  • 简单:Gossip 协议的过程极其简单,实现起来几乎没有太多复杂性。

Gossip 的缺陷

分布式网络中,没有一种完美的解决方案,Gossip 协议跟其他协议一样,也有一些不可避免的缺陷,主要是两个:

  • 消息的延迟:由于 Gossip 协议中,节点只会随机向少数几个节点发送消息,消息最终是通过多个轮次的散播而到达全网的,因此使用 Gossip 协议会造成不可避免的消息延迟。不适合用在对实时性要求较高的场景下。
  • 消息冗余:Gossip 协议规定,节点会定期随机选择周围节点发送消息,而收到消息的节点也会重复该步骤,因此就不可避免的存在消息重复发送给同一节点的情况,造成了消息的冗余,同时也增加了收到消息的节点的处理压力。而且,由于是定期发送,因此,即使收到了消息的节点还会反复收到重复消息,加重了消息的冗余。

QuorumNWR 算法

通过 Quorum NWR,你可以自定义一致性级别,通过临时调整写入或者查询的方式,当 W + R > N 时,就可以实现强一致性了。

Quorum NWR 的三要素

  • **N**:表示副本数,又叫做复制因子(Replication Factor)。也就是说,N 表示集群中同一份数据有多少个副本。在实现 Quorum NWR 的时候,你需要实现自定义副本的功能。也就是说,用户可以自定义指定数据的副本数。
  • **W**:又称写一致性级别(Write Consistency Level),表示成功完成 W 个副本更新,才完成写操作
  • **R**:又称读一致性级别(Read Consistency Level),表示读取一个数据对象时需要读 R 个副本。你可以这么理解,读取指定数据时,要读 R 副本,然后返回 R 个副本中最新的那份数据。

N、W、R 值的不同组合,会产生不同的一致性效果:

  • W + R > N 的时候,对于客户端来讲,整个系统能保证强一致性,一定能返回更新后的那份数据。
  • W + R < N 的时候,对于客户端来讲,整个系统只能保证最终一致性,可能会返回旧数据。

需要注意的是,副本数不能超过节点数:多副本的意义在于冗余备份,如果副本数超过节点数,就意味着在一个节点上会存在多个副本,那么冗余备份的意义就不大了。

PBFT 算法

PoW 算法

ZAB 协议

ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议。**ZAB 协议不是 Paxos 算法**,只是比较类似,二者在操作上并不相同。

ZAB 协议是 Zookeeper 专门设计的一种支持崩溃恢复的原子广播协议

ZAB 协议是 ZooKeeper 的数据一致性和高可用解决方案。

ZAB 协议定义了两个可以无限循环的流程:

  • 选举 Leader - 用于故障恢复,从而保证高可用。
  • 原子广播 - 用于主从同步,从而保证数据一致性。

选举 Leader

ZooKeeper 的故障恢复

ZooKeeper 集群采用一主(称为 Leader)多从(称为 Follower)模式,主从节点通过副本机制保证数据一致。

  • 如果 Follower 节点挂了 - ZooKeeper 集群中的每个节点都会单独在内存中维护自身的状态,并且各节点之间都保持着通讯,只要集群中有半数机器能够正常工作,那么整个集群就可以正常提供服务
  • 如果 Leader 节点挂了 - 如果 Leader 节点挂了,系统就不能正常工作了。此时,需要通过 ZAB 协议的选举 Leader 机制来进行故障恢复。

ZAB 协议的选举 Leader 机制简单来说,就是:基于过半选举机制产生新的 Leader,之后其他机器将从新的 Leader 上同步状态,当有过半机器完成状态同步后,就退出选举 Leader 模式,进入原子广播模式。

术语

  • myid - 每个 Zookeeper 服务器,都需要在数据文件夹下创建一个名为 myid 的文件,该文件包含整个 Zookeeper 集群唯一的 ID(整数)。
  • zxid - 类似于 RDBMS 中的事务 ID,用于标识一次更新操作的 Proposal ID。为了保证顺序性,该 zkid 必须单调递增。因此 Zookeeper 使用一个 64 位的数来表示,高 32 位是 Leader 的 epoch,从 1 开始,每次选出新的 Leader,epoch 加一。低 32 位为该 epoch 内的序号,每次 epoch 变化,都将低 32 位的序号重置。这样保证了 zkid 的全局递增性。

服务器状态

  • LOOKING - 不确定 Leader 状态。该状态下的服务器认为当前集群中没有 Leader,会发起 Leader 选举
  • FOLLOWING - 跟随者状态。表明当前服务器角色是 Follower,并且它知道 Leader 是谁
  • LEADING - 领导者状态。表明当前服务器角色是 Leader,它会维护与 Follower 间的心跳
  • OBSERVING - 观察者状态。表明当前服务器角色是 Observer,与 Folower 唯一的不同在于不参与选举,也不参与集群写操作时的投票

选票数据结构

每个服务器在进行领导选举时,会发送如下关键信息

  • logicClock - 每个服务器会维护一个自增的整数,名为 logicClock,它表示这是该服务器发起的第多少轮投票
  • state - 当前服务器的状态
  • self_id - 当前服务器的 myid
  • self_zxid - 当前服务器上所保存的数据的最大 zxid
  • vote_id - 被推举的服务器的 myid
  • vote_zxid - 被推举的服务器上所保存的数据的最大 zxid

投票流程

(1)自增选举轮次 - Zookeeper 规定所有有效的投票都必须在同一轮次中。每个服务器在开始新一轮投票时,会先对自己维护的 logicClock 进行自增操作。

(2)初始化选票 - 每个服务器在广播自己的选票前,会将自己的投票箱清空。该投票箱记录了所收到的选票。例:服务器 2 投票给服务器 3,服务器 3 投票给服务器 1,则服务器 1 的投票箱为(2, 3), (3, 1), (1, 1)。票箱中只会记录每一投票者的最后一票,如投票者更新自己的选票,则其它服务器收到该新选票后会在自己票箱中更新该服务器的选票。

(3)发送初始化选票 - 每个服务器最开始都是通过广播把票投给自己。

(4)接收外部投票 - 服务器会尝试从其它服务器获取投票,并记入自己的投票箱内。如果无法获取任何外部投票,则会确认自己是否与集群中其它服务器保持着有效连接。如果是,则再次发送自己的投票;如果否,则马上与之建立连接。

(5)判断选举轮次 - 收到外部投票后,首先会根据投票信息中所包含的 logicClock 来进行不同处理

  • 外部投票的 logicClock 大于自己的 logicClock。说明该服务器的选举轮次落后于其它服务器的选举轮次,立即清空自己的投票箱并将自己的 logicClock 更新为收到的 logicClock,然后再对比自己之前的投票与收到的投票以确定是否需要变更自己的投票,最终再次将自己的投票广播出去。
  • 外部投票的 logicClock 小于自己的 logicClock。当前服务器直接忽略该投票,继续处理下一个投票。
  • 外部投票的 logickClock 与自己的相等。当时进行选票 PK。

(6)选票 PK - 选票 PK 是基于(self_id, self_zxid)(vote_id, vote_zxid) 的对比

  • 外部投票的 logicClock 大于自己的 logicClock,则将自己的 logicClock 及自己的选票的 logicClock 变更为收到的 logicClock
  • 若 logicClock 一致,则对比二者的 vote_zxid,若外部投票的 vote_zxid 比较大,则将自己的票中的 vote_zxid 与 vote_myid 更新为收到的票中的 vote_zxid 与 vote_myid 并广播出去,另外将收到的票及自己更新后的票放入自己的票箱。如果票箱内已存在(self_myid, self_zxid)相同的选票,则直接覆盖
  • 若二者 vote_zxid 一致,则比较二者的 vote_myid,若外部投票的 vote_myid 比较大,则将自己的票中的 vote_myid 更新为收到的票中的 vote_myid 并广播出去,另外将收到的票及自己更新后的票放入自己的票箱

(7)统计选票 - 如果已经确定有过半服务器认可了自己的投票(可能是更新后的投票),则终止投票。否则继续接收其它服务器的投票。

(8)更新服务器状态 - 投票终止后,服务器开始更新自身状态。若过半的票投给了自己,则将自己的服务器状态更新为 LEADING,否则将自己的状态更新为 FOLLOWING

通过以上流程分析,我们不难看出:要使 Leader 获得多数 Server 的支持,则 ZooKeeper 集群节点数必须是奇数。且存活的节点数目不得少于 N + 1

每个 Server 启动后都会重复以上流程。在恢复模式下,如果是刚从崩溃状态恢复的或者刚启动的 server 还会从磁盘快照中恢复数据和会话信息,zk 会记录事务日志并定期进行快照,方便在恢复时进行状态恢复。

原子广播(Atomic Broadcast)

ZooKeeper 通过副本机制来实现高可用

那么,ZooKeeper 是如何实现副本机制的呢?答案是:ZAB 协议的原子广播。

img

ZAB 协议的原子广播要求:

**所有的写请求都会被转发给 Leader,Leader 会以原子广播的方式通知 Follow。当半数以上的 Follow 已经更新状态持久化后,Leader 才会提交这个更新,然后客户端才会收到一个更新成功的响应**。这有些类似数据库中的两阶段提交协议。

在整个消息的广播过程中,Leader 服务器会每个事务请求生成对应的 Proposal,并为其分配一个全局唯一的递增的事务 ID(ZXID),之后再对其进行广播。

InfluxDB 企业版一致性实现剖析

Hashicorp Raft

基于 Raft 的分布式 KV 系统开发实战

参考资料

《极客时间教程 - 微服务架构核心 20 讲》笔记

什么是微服务架构

微服务是一种架构模式。

微服务的六个特点:

  • 一组小的服务
  • 独立的进程
  • 独立部署
  • 轻量级通信
  • 基于业务能力
  • 无集中式管理——这里指的是可以用不同的技术栈,不同的存储

微服务定义:基于有界上下文的、松散耦合的、面向服务的架构。

架构师如何权衡微服务的利弊

架构之道在于权衡利弊。

微服务架构的优点

  • 强模块化边界
  • 可独立部署
  • 技术多样性

微服务架构的缺点

  • 分布式系统复杂性
  • 最终一致性
  • 运维复杂性
  • 测试复杂性

分布式系统带来的一个挑战就是取终一致性。

康威法则和微服务给架构师怎样的启示

康威法则:设计系统的架构受制于产生这些设计的组织的沟通结构。

img

康威的原文中提出的各定律

  • 第一定律 组织沟通方式会通过系统设计表达出来
  • 第二定律 时间再多一件事情也不可能做的完美,但总有时间做完一件事情
  • 第三定律 线型系统和线型组织架构间有潜在的异质同态特性
  • 第四定律 大的系统组织总是比小系统更倾向于分解

其中心思想实际就是分而治之

企业应该在什么时候开始考虑引入微服务

微服务的适用性:

img

微服务重在服务治理,其对于平台基础设施有较高要求,所以企业刚开始应用微服务并不一定能提高生产力。简单来说:单体服务适用于小团队;微服务适用于大团队。

何时选择微服务,在于度的把控。当研发团队人员增长到一定程度,沟通成本不断增长时,就可以考虑微服务架构了。一个经验数据是,当团队达到 100 人规模时,就可以考虑使用微服务架构了。

罗马不是一天建成的:架构是一个演进的过程,不应该一开始就将系统设计的过于复杂。

什么样组织架构更适合微服务

img

  • 左边是比较传统的组织架构。产品从左到右流程走,可能出现的问题,反馈比较慢,对业务支持比较慢。沟通成本比较大。

  • 右边是比较合适微服务的组织架构, 每一个团队(基于微服务的跨职能的团队),有开发,有产品,有测试,团队都支持自己的微服务。交付的产口是平台,对外提供 API 接口支持多样的业务。

img

DevOps 理念:谁开发的,谁构建,谁支持。

如何理解阿里巴巴提出的微服务

中台战略和微服务的关系

img

业务中台和技术中台统称为大中台,支撑业务前台。正所谓,万丈高楼平地起,中台基础越扎实,前台发展就越快。

PaaS 和 核心业务层是和微服务相关的。这一些基本都可以用微服务来实现。

  • IaaS:Infrastructure-as-a-Service(基础设施即服务)

  • PaaS:Platform-as-a-Service(平台即服务)

如何给出一个清晰简洁的服务分层方式

大致的服务分层图:

img

SOA(Service-Oriented Architecture)或微服务大致可分为

  • 基础服务:也被称为:核心领域服务、中间层服务、公共服务
  • 聚合服务:对基础服务的聚合,以满足业务需求,提供给外部调用。

微服务总体技术架构体系是怎么设计的

img

  • 接入层:接入外部流量,内部做负载均衡
  • 网关层:反向路由,限流,安全,跨横切面的功能。
  • 业务服务层:可分为:聚合服务,基础服务
  • 支撑服务:各种公共性的后台服务
  • 平台服务:可以是一些管理系统
  • 基础设施:由运维团队运维

其中,与微服务相关的主要有:网关层、业务服务层、支撑服务、平台服务

微服务最经典的三种服务发现机制

消费者(客户端)如何发现生产者(服务端),有三种模式:

(1)通过 DNS 访问 LB(负载均衡),LB 分发

img

(2)消费者内置 LB, 生产者将自身信息注册到注册中心上,并通过发送定时心跳来确认自身服务可用。消费者定期从注册中心拉取生产者信息

img

(3)结全前面两种方式, 在 Consumer 的主机上也布置一个 LB。 LB 会定期同步注册中心的信息。 运维成本比较高一点。

img

微服务 API 服务网关(一)原理

网关用于屏蔽服务内部的逻辑,希望外部访问看到是统一的接口。

img

网关主要的功能:

  • 反向代理:将外部的请求换成内部调用。
  • 安全认证:防刷、防爬虫。
  • 限流熔断:处理可能会突发流量。
  • 日志监控:进行访问访问审计,监控流量。

一般不要把过多的业务逻辑写在网关当中。

img

服务 API 服务网关(二)开源网关 Zuul

Servlet 和 Filter Runner 过滤器:前置路由过滤器, 路由过滤器,后置路由过滤器

过滤器开发,可以通过脚本开发。开发完后上传到过滤器目录中, 被扫描后加到 Filter Runner 中。

各个 Filter 共享数据通过 Request Context 来实现。

img

过滤链的流程:

img

跟 Netflix 学习微服务路由发现体系

netflix 有两个比较重要的支撑服务

  • 服务注册中心 Eureka
  • 网关 zuul

img

集中式配置中心的作用和原理是什么

为什么要引入配置中心呢?

配置文件中的属性不方便管理,无法动态更新,无法审计。配置中心可以解决这些问题。

什么可做配置呢?

  • 业务开关
  • 调用/响应超时
  • 限流
  • 连接字符串
  • 动态参数

Svr 更新配置有两种方式:推和拉。

img

携程的 Apollo 配置中心:

img

github : https://github.com/ctripcorp/apollo

微服务通讯方式 RPC vs REST

RPC:远程过程调用

REST:Restful

img

微服务框架需要考虑哪些治理环节

一个公司的微服务多了,就要需要考虑服务治理:

  • 软负载:蓝绿发布,灰度发布

  • 指标(Metrics):服务的调用量,耗时监控

  • 调用链埋点:方便快速定位问题

契约生成代码: 定义结构体可自动生成 json 格式, vscode 有插件。

img

阿里巴巴微服务治理生态:Dubbo http://dubbo.apache.org/en-us/

微服务监控系统分层和监控架构

五个层次的监控:

  • 基础层施监控
  • 系统层监控
  • 应用层监控
    • url
    • sevice
    • mysql
    • cache 可用率
    • 性能
    • qps
  • 业务层监控
    • 核心指标监控
    • 登录注册
  • 端用户体验监控

img

  • 日志监控:Elasticsearch
  • metrics 监控
  • 健康检查
  • 调用链监控
  • 告警系统

比较典型的监控架构,大部分公司的流程

img

数据量比较大一般用 Kafka 作为缓冲队列。

Nagios 健康检测工具。

ELK:ELK 是 Elasticsearch、Logstash、Kibana 三大开源框架首字母大写简称。

微服务的调用链监控该如何选型

调用链的监控 谷歌 2010 年提出来的。

通过 Span 来跟踪, RootSpan ChildSpan 跨进程时 会有 Trace di + parant span id

img

三个主流调用链监控系统的比较:

img

微服务的容错限流是如何工作的

Netfiix Hystrix 具有熔断、隔离、限流、降级的功能 。

img

说明:

  • 3 Cirult OPen 判断是否可以熔断, 是则执行 getFAllBack() 降级处理函数
  • 5 run() 超时 也执行降级处理函数。
  • 6 不成功也 执行处理函数 。
  • Calculate Cirult Health 就是在正常执行成功后计算是否需要熔断。

Docker 容器部署技术 & 持续交付流水线

docker 容器治理就是解决:环境不一致的问题。把依赖的所有包都打在镜像中。

统一、标准化的交付流水线。

UAT 环境: User Acceptance Test (用户验收测试)

img

发布模式: 蓝绿布置,灰度发布(金丝雀发布)。

img

容器集群调度和基于容器的发布体系

资源调度框架 Mesos 架构

img

基于容器的云发布体系

img

《RPC 实战与核心原理》笔记

别老想着怎么用好 RPC 框架,你得多花时间琢磨原理

为什么要学习 RPC

RPC 不仅是微服务的架构基础,实际上,只要涉及网络通信,就可能用到 RPC。

  • 例 1:大型分布式应用系统可能会依赖消息队列、分布式缓存、分布式数据库以及统一配置中心等,应用程序与依赖的这些中间件之间都可以通过 RPC 进行通信。比如 etcd,它作为一个统一的配置服务,客户端就是通过 gRPC 框架与服务端进行通信的。

  • 例 2:我们经常会谈到的容器编排引擎 Kubernetes,它本身就是分布式的,Kubernetes 的 kube-apiserver 与整个分布式集群中的每个组件间的通讯,都是通过 gRPC 框架进行的。

RPC 是解决分布式系统通信问题的一大利器

核心原理:能否画张图解释下 RPC 的通信流程?

什么是 RPC?

RPC 的全称是 Remote Procedure Call,即远程过程调用

RPC 的作用体现在两个方面:

  • 屏蔽远程调用跟本地调用的差异,让用户像调用本地一样去调用远程方法。
  • 隐藏底层网络通信的复杂性,让用户更专注于业务逻辑。

RPC 通信流程

RPC 是一个远程调用,因此必然需要通过网络传输数据,且 RPC 常用于业务系统之间的数据交互,需要保证其可靠性,所以 RPC 一般默认采用 TCP 协议来传输。

网络传输数据是二进制数据,因此请求方需要将请求参数转为二进制数据,即序列化

响应方接受到请求,要将二进制数据转换为请求参数,需要反序列化

请求方和响应方识别彼此的信息,需要约定好彼此数据的格式,即协议大多数的协议会分成两部分,分别是数据头和消息体。数据头一般用于身份识别,包括协议标识、数据大小、请求类型、序列化类型等信息;消息体主要是请求的业务参数信息和扩展属性等。

为了屏蔽底层通信细节,使用户聚焦自身业务,因此 RPC 框架一般引入了动态代理,通过依赖注入等技术,拦截方法调用,完成远程调用的通信逻辑。

RPC 在架构中的位置

RPC 框架能够帮助我们解决系统拆分后的通信问题,并且能让我们像调用本地一样去调用
远程方法。

协议:怎么设计可扩展且向后兼容的协议?

协议的作用

在传输过程中,RPC 并不会把请求参数的所有二进制数据整体一下子发送到对端机器上,中间可能会拆分成好几个数据包,也可能会合并其他请求的数据包(合并的前提是同一个 TCP 连接上的数据),至于怎么拆分合并,这其中的细节会涉及到系统参数配置和 TCP 窗口大小。对于服务提供方应用来说,他会从 TCP 通道里面收到很多的二进制数据,那这时候怎么识别出哪些二进制是第一个请求的呢?

个人理解:为了避免语义不一致的事情发生,需要为数据报文设定边界,请求方和接收方都按照设定的边界去读写数据。这类似于文章使用标点符号去断句。

为何需要设计 RPC 协议

RPC 协议对性能要求高,而公有网络协议往往数据报文较大,内容不够紧凑。

如何设计 RPC 协议?

首先,必须先明确消息的边界,即确定消息的长度。因此,至少要分为:消息长度+消息内容两部分。

接下来,我们会发现,在使用过程中,仅消息长度,不足以明确通信中的很多细节:如序列化方式是怎样的?是否消息压缩?压缩格式是怎样的?如果协议发生变化,需要明确协议版本等等。

综上,一个 RPC 协议大概会由下图中的这些参数组成:

可扩展的协议

前面所述的协议属于定长协议头,那也就是说往后就不能再往协议头里加新参数了,如果加参
数就会导致线上兼容问题。

为了保证能平滑地升级改造前后的协议,我们有必要设计一种支持可扩展的协议。其关键在于让协议头支持可扩展,扩展后协议头的长度就不能定长了。那要实现读取不定长的协议头里面的内容,在这之前肯定需要一个固定的地方读取长度,所以我们需要一个固定的写入协议头的长度。整体协议就变成了三部分内容:固定部分、协议头内容、协议体内容。

序列化:对象怎么在网络中传输?

为什么需要序列化

调用方和被调用方的数据原本是对象,无法在网络中传输,必须转换为二进制数据。因此,需要一种方式来实现此过程:将对象转为二进制数据,即序列化;同时,需要根据二进制数据逆向转化为对象,即反序列化

从 RPC 的实现角度来看,序列化的作用如下图所示:

常用序列化方式

  • JDK 序列化:ObjectInputStreamObjectOutputStream
  • JSON
  • 二进制
    • Hessian
    • Protobuf
    • Thirft

RPC 协议选型

优先级依次从高到低:安全性、通用性、兼容性、性能、效率、空间开销

在序列化的选择上,与序列化协议的效率、性能、序列化协议后的体积相比,其通用性和兼容性的优先级会更高,因为他是会直接关系到服务调用的稳定性和可用率的,对于服务的性能来说,服务的可靠性显然更加重要。我们更加看重这种序列化协议在版本升级后的兼容性是否很好,是否支持更多的对象类型,是否是跨平台、跨语言的,是否有很多人已经用过并且踩过了很多的坑,其次我们才会去考虑性能、效率和空间开销。

使用 RPC 需要注意哪些问题

  • 对象构造得过于复杂 - 对象要尽量简单,没有太多的依赖关系,属性不要太多,尽量高内聚;
  • 对象过于复杂、庞大 - 入参对象与返回值对象体积不要太大,更不要传太大的集合;
  • 使用序列化框架不支持的类作为入参类 - 尽量使用简单的、常用的、开发语言原生的对象,尤其是集合类;
  • 对象有复杂的继承关系 - 对象不要有复杂的继承关系,最好不要有父子类的情况。

网络通信:RPC 框架在网络通信上更倾向于哪种网络 IO 模型?

常见的网络 IO 模型

常见的网络 IO 模型分为四种:同步阻塞 IO(BIO)、同步非阻塞 IO(NIO)、IO 多路复用和异步非阻塞 IO(AIO)。在这四种 IO 模型中,只有 AIO 为异步 IO,其他都是同步 IO。

IO 多路复用(Reactor 模式)在高并发场景下使用最为广泛,很多知名软件都应用了这一技术,如:Netty、Redis、Nginx 等。

IO 多路复用分为 select,poll 和 epoll。

什么是 IO 多路复用?字面上的理解,多路就是指多个通道,也就是多个网络连接的 IO,而复用就是指多个通道复用在一个复用器上。

零拷贝

系统内核处理 IO 操作分为两个阶段——等待数据和拷贝数据。等待数据,就是系统内核在等待网卡接收到数据后,把数据写到内核中;而拷贝数据,就是系统内核在获取到数据后,将数据拷贝到用户进程的空间中。

网络 IO 读写流程

应用进程的每一次写操作,都会把数据写到用户空间的缓冲区中,再由 CPU 将数据拷贝到系统内核的缓冲区中,之后再由 DMA 将这份数据拷贝到网卡中,最后由网卡发送出去。这里我们可以看到,一次写操作数据要拷贝两次才能通过网卡发送出去,而用户进程的读操作则是将整个流程反过来,数据同样会拷贝两次才能让应用程序读取到数据。

应用进程的一次完整的读写操作,都需要在用户空间与内核空间中来回拷贝,并且每一次拷贝,都需要 CPU 进行一次上下文切换(由用户进程切换到系统内核,或由系统内核切换到用户进程),这样很浪费 CPU 和性能。

所谓的零拷贝,就是取消用户空间与内核空间之间的数据拷贝操作,应用进程每一次的读写操作,可以通过一种方式,直接将数据写入内核或从内核中读取数据,再通过 DMA 将内核中的数据拷贝到网卡,或将网卡中的数据 copy 到内核。

Netty 的零拷贝偏向于用户空间中对数据操作的优化,这对处理 TCP 传输中的拆包粘包问题有着重要的意义,对应用程序处理请求数据与返回数据也有重要的意义。

Netty 框架中很多内部的 ChannelHandler 实现类,都是通过 CompositeByteBuf、slice、wrap 操作来处理 TCP 传输中的拆包与粘包问题的。

Netty 的 ByteBuffer 可以采用 Direct Buffers,使用堆外直接内存进行 Socketd 的读写
操作,最终的效果与我刚才讲解的虚拟内存所实现的效果是一样的。

Netty 还提供 FileRegion 中包装 NIO 的 FileChannel.transferTo() 方法实现了零拷
贝,这与 Linux 中的 sendfile 方式在原理上也是一样的。

动态代理:面向接口编程,屏蔽 RPC 处理流程

动态代理可以帮用户屏蔽远程调用的细节,实现像调用本地一样地调用远程的体验。

JDK 支持的动态代理方式是通过实现 InvocationHandler 接口。这种方式有一定的局限性——它要求被代理的类只能是接口。原因是因为生成的代理类会继承 Proxy 类,但 Java 是不支持多重继承的。此外,它还有性能问题。它生成后的代理类是使用反射来完成方法调用的,而这种方式相对直接用编码调用来说,性能会降低。

除 JDK 以外,还有其他第三方框架可以实现动态代理,如像 Javassist、Byte Buddy。

Javassist 的是通过控制底层字节码来实现动态代理,不需要反射完成调用,所以性能肯定比 JDK 的动态代理方式性能要好。

Byte Buddy 则属于后起之秀,在很多优秀的项目中,像 Spring、Jackson 都用到了 Byte Buddy 来完成底层代理。相比 Javassist,Byte Buddy 提供了更容易操作的 API,编写的代码可读性更高。更重要的是,生成的代理类执行速度比 Javassist 更快。

RPC 实战:剖析 gRPC 源码,动手实现一个完整的 RPC

架构设计:设计一个灵活的 RPC 框架

RPC 架构

其实 RPC 就是把拦截到的方法参数,转成可以在网络中传输的二进制,并保证在服务提供方能正确地还原出语义,最终实现像调用本地一样地调用远程的目的

RPC 本质上就是一个远程调用,必然需要通过网络来传输数据,为了屏蔽网络传输的复杂性,需要封装一个单独的数据传输模块用来收发二进制数据。

用户请求的时候是基于方法调用,方法出入参数都是对象数据,对象是肯定没法直接在网络中传输的,我们需要提前把它转成可传输的二进制,这就是我们说的序列化过程。但只是把方法调用参数的二进制数据传输到服务提供方是不够的,我们需要在方法调用参数的二进制数据后面增加“断句”符号来分隔出不同的请求,在两个“断句”符号中间放的内容就是我们请求的二进制数据,这个过程我们叫做协议封装。可以把这两个处理过程放在同一个模块,统称为协议模块。除此之外,我们还可以在协议模块中加入压缩功能,这是因为压缩过程也是对传输的二进制数据进行操作。

RPC 还需要为调用方找到所有的服务提供方,并需要在 RPC 里面维护好接口跟服务提供者地址的关系,这样调用方在发起请求的时候才能快速地找到对应的接收地址,这个过程即为“服务发现”。

但服务发现只是解决了接口和服务提供方地址映射关系的查找问题。但是,对于 RPC 来说,每次发送请求的时候都是需要用 TCP 连接的,相对服务提供方 IP 地址,TCP 连接状态是瞬息万变的,所以我们的 RPC 框架里面要有连接管理器去维护 TCP 连接的状态。

有了集群之后,提供方可能就需要管理好这些服务了,那我们的 RPC 就需要内置一些服务治理的功能,比如服务提供方权重的设置、调用授权等一些常规治理手段。而服务调用方需要额外做哪些事情呢?每次调用前,我们都需要根据服务提供方设置的规则,从集群中选择可用的连接用于发送请求。

RPC 可扩展架构

在 RPC 框架中,如何支持插件化架构呢?

可以使用 SPI 技术来实现。注意:由于 JDK SPI 性能不高,并且不支持自动注入,所以,一般会选择其他的 SPI 实现。

有了 SPI 支持插件式加载后,RPC 框架就变成了一个微内核架构。

服务发现:到底是要 CP 还是 AP?

RPC 框架必须要有服务注册和发现机制,这样,集群中的节点才能知道通信方的请求地址。

  • 服务注册:在服务提供方启动的时候,将对外暴露的接口注册到注册中心之中,注册中心将这个服务节点的 IP 和接口保存下来。
  • 服务订阅:在服务调用方启动的时候,去注册中心查找并订阅服务提供方的 IP,然后缓存到本地,并用于后续的远程调用。

基于 ZooKeeper 的服务发现

使用 ZooKeeper 作为服务注册中心,是 Java 分布式系统的经典方案。

搭建一个 ZooKeeper 集群作为注册中心集群,服务注册的时候只需要服务节点向 ZooKeeper 节点写入注册信息即可,利用 ZooKeeper 的 Watcher 机制完成服务订阅与服务下发功能

img

通常我们可以使用 ZooKeeper、etcd 或者分布式缓存(如 Hazelcast)来解决事件通知问题,但当集群达到一定规模之后,依赖的 ZooKeeper 集群、etcd 集群可能就不稳定了,无法满足我们的需求。

在超大规模的服务集群下,注册中心所面临的挑战就是超大批量服务节点同时上下线,注册中心集群接受到大量服务变更请求,集群间各节点间需要同步大量服务节点数据,最终导致如下问题:

  • 注册中心负载过高;
  • 各节点数据不一致;
  • 服务下发不及时或下发错误的服务节点列表。

RPC 框架依赖的注册中心的服务数据的一致性其实并不需要满足 CP,只要满足 AP 即可。

基于消息总线的最终一致性的注册中心

ZooKeeper 的一大特点就是强一致性,ZooKeeper 集群的每个节点的数据每次发生更新操作,都会通知其它 ZooKeeper 节点同时执行更新。它要求保证每个节点的数据能够实时的完全一致,这也就直接导致了 ZooKeeper 集群性能上的下降。

而 RPC 框架的服务发现,在服务节点刚上线时,服务调用方是可以容忍在一段时间之后(比如几秒钟之后)发现这个新上线的节点的。毕竟服务节点刚上线之后的几秒内,甚至更长的一段时间内没有接收到请求流量,对整个服务集群是没有什么影响的,所以我们可以牺牲掉 CP(强制一致性),而选择 AP(最终一致),来换取整个注册中心集群的性能和稳定性。

img

健康检测:这个节点都挂了,为啥还要疯狂发请求?

健康检测,它能帮助我们从连接列表里面过滤掉一些存在问题的节点,避免在发请求的时候选择出有问题的节点而影响业务。

服务方状态一般有三种情况:

  1. 健康状态 - 建立连接成功,并且心跳探活也一直成功;
  2. 亚健康状态 - 建立连接成功,但是心跳请求连续失败;
  3. 死亡状态 - 建立连接失败。

设计健康检测方案的时候,不能简单地从 TCP 连接是否健康、心跳是否正常等简单维度考虑,因为健康检测的目的就是要保证“业务无损”,因此,可以加入业务请求可用率因素,这样能最大化地提升 RPC 接口可用率。

正常情况下,我们大概 30S 会发一次心跳请求,这个间隔一般不会太短,如果太短会给服务节点造成很大的压力。但是如果太长的话,又不能及时摘除有问题的节点。

路由策略:怎么让请求按照设定的规则发到不同的节点上?

服务路由是指通过一定的规则从集群中选择合适的节点。

为什么需要路由策略

服务路由通常用于以下场景,目的在于实现流量隔离:

  • 分组调用

  • 蓝绿发布

  • 灰度发布

  • 流量切换

  • 线下测试联调

  • 读写分离

路由规则

  • 条件路由:基于条件表达式的路由规则
  • 脚本路由:基于脚本语言的路由规则
  • 标签路由:将服务分组的路由规则

负载均衡:节点负载差距这么大,为什么收到的流量还一样?

负载均衡算法

  • 随机算法
    • 加权随机算法
  • 轮询算法
    • 加权轮询算法
  • 最小活跃数算法
    • 加权最小活跃数算法
  • 哈希算法
  • 一致性哈希算法

RPC 框架中的负载均衡

RPC 负载均衡所采用的策略与传统的 Web 服务负载均衡所采用策略的不同之处:

  1. 搭建负载均衡设备或 TCP/IP 四层代理,需要额外成本;
  2. 请求流量都经过负载均衡设备,多经过一次网络传输,会额外浪费一些性能;
  3. 负载均衡添加节点和摘除节点,一般都要手动添加,当大批量扩容和下线时,会有大量的人工操作,“服务发现”在操作上是个问题;
  4. 我们在服务治理的时候,针对不同接口服务、服务的不同分组,我们的负载均衡策略是需要可配的,如果大家都经过这一个负载均衡设备,就不容易根据不同的场景来配置不同的负载均衡策略了。

RPC 的负载均衡完全由 RPC 框架自身实现,RPC 的服务调用者会与“注册中心”下发的所有服务节点建立长连接,在每次发起 RPC 调用时,服务调用者都会通过配置的负载均衡插件,自主选择一个服务节点,发起 RPC 调用请求。

如何设计自适应的负载均衡

那服务调用者节点又该如何判定一个服务节点的处理能力呢?

可以采用一种打分制的策略,服务调用者收集与之建立长连接的每个服务节点的指标数据,如服务节点的负载指标、CPU 核数、内存大小、请求处理的耗时指标(如请求平均耗时、TP99、TP999)、服务节点的状态指标(如正常、亚健康)。通过这些指标,计算出一个分数,比如总分 10 分,如果 CPU 负载达到 70%,就减它 3 分,当然了,减 3 分只是个类比,需要减多少分是需要一个计算策略的。

然后,根据不同指标的重要程度设置权重,然后累加,计算公式:

1
健康值 = 指标值1 * 权重1 + 指标值2 * 权重2 + ...

服务调用者给每个服务节点都打完分之后,会发送请求,那这时候我们又该如何根据分数去控制给每个服务节点发送多少流量呢?

关键步骤

  1. 添加服务指标收集器,并将其作为插件,默认有运行时状态指标收集器、请求耗时指标收集器。
  2. 运行时状态指标收集器收集服务节点 CPU 核数、CPU 负载以及内存等指标,在服务调用者与服务提供者的心跳数据中获取。
  3. 请求耗时指标收集器收集请求耗时数据,如平均耗时、TP99、TP999 等。
  4. 可以配置开启哪些指标收集器,并设置这些参考指标的指标权重,再根据指标数据和指标权重来综合打分。
  5. 通过服务节点的综合打分与节点的权重,最终计算出节点的最终权重,之后服务调用者会根据随机权重的策略,来选择服务节点。

异常重试:在约定时间内安全可靠地重试

异常重试

就是当调用端发起的请求失败时,RPC 框架自身可以进行重试,再重新发送请求,用户可以自行设置是否开启重试以及重试的次数。

当然,不是所有的异常都要触发重试,只有符合重试条件的异常才能触发重试,比如网络超时异常、网络连接异常等等(这个需要 RPC 去判定)。

注意:有时网络可能发生抖动,导致请求超时,这时如果 RPC 触发超时重试,会触发业务逻辑重复执行,如果接口没有幂等性设计,就可能引发问题。如:重发写表。

重试超时时间

连续的异常重试可能会出现一种不可靠的情况,那就是连续的异常重试并且每次处理的请求时间比较长,最终会导致请求处理的时间过长,超出用户设置的超时时间。

解决这个问题最直接的方式就是,在每次重试后都重置一下请求的超时时间。

当调用端发起 RPC 请求时,如果发送请求发生异常并触发了异常重试,我们可以先判定下这个请求是否已经超时,如果已经超时了就直接返回超时异常,否则就先重置下这个请求的超时时间,之后再发起重试。

在所有发起重试、负载均衡选择节点的时候,去掉重试之前出现过问题的那个节点,以保证重试的成功率。

业务异常

RPC 框架是不会知道哪些业务异常能够去进行异常重试的,我们可以加个重试异常的白名单,用户可以将允许重试的异常加入到这个白名单中。当调用端发起调用,并且配置了异常重试策略,捕获到异常之后,我们就可以采用这样的异常处理策略。如果这个异常是 RPC 框架允许重试的异常,或者这个异常类型存在于可重试异常的白名单中,我们就允许对这个请求进行重试。


综上,一个可靠的 RPC 容错处理机制如下:

img

优雅关闭:如何避免服务停机带来的业务损失?

优雅关闭:如何避免服务停机带来的业务损失?

在服务重启的时候,对于调用方来说,可能会存在以下几种情况:

  • 调用方发请求前,目标服务已经下线。对于调用方来说,跟目标节点的连接会断开,这时候调用方可以立马感知到,并且在其健康列表里面会把这个节点挪掉,自然也就不会被负载均衡选中。
  • 调用方发请求的时候,目标服务正在关闭。但调用方并不知道它正在关闭,而且两者之间的连接也没断开,所以这个节点还会存在健康列表里面,因此该节点就有一定概率会被负载均衡选中。

当服务提供方正在关闭,如果这之后还收到了新的业务请求,服务提供方直接返回一个特定的异常给调用方(比如 ShutdownException)。这个异常就是告诉调用方“我已经收到这个请求了,但是我正在关闭,并没有处理这个请求”,然后调用方收到这个异常响应后,RPC 框架把这个节点从健康列表挪出,并把请求自动重试到其他节点,因为这个请求是没有被服务提供方处理过,所以可以安全地重试到其他节点,这样就可以实现对业务无损。

但如果只是靠等待被动调用,就会让这个关闭过程整体有点漫长。因为有的调用方那个时刻没有业务请求,就不能及时地通知调用方了,所以我们可以加上主动通知流程,这样既可以保证实时性,也可以避免通知失败的情况。

如何捕获到关闭事件呢?在 Java 语言里面,对应的是 Runtime.addShutdownHook 方法,可以注册关闭的钩子。在 RPC 启动的时候,我们提前注册关闭钩子,并在里面添加了两个处理程序,一个负责开启关闭标识,一个负责安全关闭服务对象,服务对象在关闭的时候会通知调用方下线节点。同时需要在我们调用链里面加上挡板处理器,当新的请求来的时候,会判断关闭标识,如果正在关闭,则抛出特定异常。

优雅启动:如何避免流量打到没有启动完成的节点?

优雅启动:如何避免流量打到没有启动完成的节点?

运行了一段时间后的应用,执行速度会比刚启动的应用更快。这是因为在 Java 里面,在运行过程中,JVM 虚拟机会把高频的代码编译成机器码,被加载过的类也会被缓存到 JVM 缓存中,再次使用的时候不会触发临时加载,这样就使得“热点”代码的执行不用每次都通过解释,从而提升执行速度。

但是这些“临时数据”,都在应用重启后就消失了。如果让刚启动的应用就承担像停机前一样的流量,这会使应用在启动之初就处于高负载状态,从而导致调用方过来的请求可能出现大面积超时,进而对线上业务产生损害行为。

启动预热

启动预热,就是让刚启动的服务提供方应用不承担全部的流量,而是让它被调用的次数随着时间的移动慢慢增加,最终让流量缓和地增加到跟已经运行一段时间后的水平一样。

首先,对于调用方来说,我们要知道服务提供方启动的时间。有两种方法:

  • 一种是服务提供方在启动的时候,把自己启动的时间告诉注册中心;
  • 另外一种就是注册中心收到的服务提供方的请求注册时间。

怎么确保所有机器的日期时间是一样的?在真实环境中机器都会默认开启 NTP 时间同步功能,来保证所有机器时间的一致性。

最终的结果就是,调用方通过服务发现,除了可以拿到 IP 列表,还可以拿到对应的启动时间。我们需要把这个时间作用在负载均衡上。

通过这个小逻辑的改动,我们就可以保证当服务提供方运行时长小于预热时间时,对服务提供方进行降权,减少被负载均衡选择的概率,避免让应用在启动之初就处于高负载状态,从而实现服务提供方在启动后有一个预热的过程。

延迟暴露

服务提供方应用在没有启动完成的时候,调用方的请求就过来了,而调用方请求过来的原因是,服务提供方应用在启动过程中把解析到的 RPC 服务注册到了注册中心,这就导致在后续加载没有完成的情况下服务提供方的地址就被服务调用方感知到了。

为了解决这个问题,需要在应用启动加载、解析 Bean 的时候,如果遇到了 RPC 服务的 Bean,只先把这个
Bean 注册到 Spring-BeanFactory 里面去,而并不把这个 Bean 对应的接口注册到注册中心,只有等应用启动完成后,才把接口注册到注册中心用于服务发现,从而实现让服务调用方延迟获取到服务提供方地址。

具体如何实现呢?

我们可以在服务提供方应用启动后,接口注册到注册中心前,预留一个 Hook 过程,让用户可以实现可扩展的
Hook 逻辑。用户可以在 Hook 里面模拟调用逻辑,从而使 JVM 指令能够预热起来,并且用户也可以在 Hook 里面事先预加载一些资源,只有等所有的资源都加载完成后,最后才把接口注册到注册中心。

熔断限流:业务如何实现自我保护

限流

限流算法

  • 计数器
  • 滑动窗口
  • 漏桶
  • 令牌桶

限流要点

服务端主要是通过限流来进行自我保护,我们在实现限流时要考虑到应用和 IP 级别,方便我们在服务治理的时候,对部分访问量特别大的应用进行合理的限流。

  • 服务端的限流阈值配置都是作用于单机的,而在有些场景下,例如对整个服务设置限流阈值,服务进行扩容时,
    限流的配置并不方便。
  • 我们可以在注册中心或配置中心下发限流阈值配置的时候,将总服务节点数也下发给服务节点,让 RPC 框架自己去计算限流阈值;
  • 我们还可以让 RPC 框架的限流模块依赖一个专门的限流服务,对服务设置限流阈值进行精准地控制,但是这种方式依赖了限流服务,相比单机的限流方式,在性能和耗时上有劣势。

服务提供方主要通过限流来进行自我保护,我们在实现限流时要考虑到应用和 IP 级别,方便我们在服务治理的时,对部分访问量特别大的应用进行合理的限流。

服务端的限流阈值配置都是作用于单机的,而在有些场景下,例如对整个服务设置限流阈值,服务进行扩容时,
限流的配置并不方便。我们可以在注册中心或配置中心下发限流阈值配置的时候,将总服务节点数也下发给服务节点,让 RPC 框架自己去计算限流阈值。

我们还可以让 RPC 框架的限流模块依赖一个专门的限流服务,对服务设置限流阈值进行精准地控制,但是这种方式依赖了限流服务,相比单机的限流方式,在性能和耗时上有劣势。

熔断

调用端可以通过熔断机制进行自我保护,防止调用下游服务出现异常,或者耗时过长影响调用端的业务逻辑,RPC 框架可以在动态代理的逻辑中去整合熔断器,实现 RPC 框架的熔断功能。

熔断器的工作机制主要是关闭、打开和半打开这三个状态之间的切换。在正常情况下,熔断器是关闭的;当调用端调用下游服务出现异常时,熔断器会收集异常指标信息进行计算,当达到熔断条件时熔断器打开,这时调用端再发起请求是会直接被熔断器拦截,并快速地执行失败逻辑;当熔断器打开一段时间后,会转为半打开状态,这时熔断器允许调用端发送一个请求给服务端,如果这次请求能够正常地得到服务端的响应,则将状态置为关闭状态,否则
设置为打开。

业务分组:如何隔离流量?

img

在 RPC 里面我们可以通过分组的方式人为地给不同的调用方划分出不同的小集群,从而实现调用方流量隔离的效果,保障我们的核心业务不受非核心业务的干扰。但我们在考虑问题的时候,不能顾此失彼,不能因为新加一个的功能而影响到原有系统的稳定性。

其实我们不仅可以通过分组把服务提供方划分成不同规模的小集群,我们还可以利用分组完成一个接口多种实现的功能。正常情况下,为了方便我们自己管理服务,我一般都会建议每个接口完成的功能尽量保证唯一。但在有些特殊场景下,两个接口也会完全一样,只是具体实现上有那么一点不同,那么我们就可以在服务提供方应用里面同时暴露两个相同接口,但只是接口分组不一样罢了。

动态分组

分组可以帮助服务提供方实现调用方的隔离。但是因为调用方流量并不是一成不变的,而且还可能会因为突发事件导致某个分组的流量溢出,而在整个大集群还有富余能力的时候,又因为分组隔离不能为出问题的集群提供帮助。

为了解决这种突发流量的问题,我们提供了一种更高效的方案,可以实现分组的快速伸缩。事实上我们还可以利用动态分组解决分组后给每个分组预留机器冗余的问题,我们没有必要把所有冗余的机器都分配到分组里面,我们可以把这些预留的机器做成一个共享的池子,从而减少整体预留的实例数量。

异步 RPC:压榨单机吞吐量

异步 RPC:压榨单机吞吐量

影响到 RPC 调用的吞吐量的主要原因就是服务端的业务逻辑比较耗时,并且 CPU 大部分时间都在等待而没有去计算,导致 CPU 利用率不够,而提升单机吞吐量的最好办法就是使用异步 RPC。

RPC 框架的异步策略主要是调用端异步与服务端异步。调用端的异步就是通过 Future 方式实现异步,调用端发起一次异步请求并且从请求上下文中拿到一个 Future,之后通过 Future 的 get 方法获取结果,如果业务逻辑中同时调用多个其它的服务,则可以通过 Future 的方式减少业务逻辑的耗时,提升吞吐量。服务端异步则需要一种回调方式,让业务逻辑可以异步处理,之后调用 RPC 框架提供的回调接口,将最终结果异步通知给调用端。

另外,我们可以通过对 CompletableFuture 的支持,实现 RPC 调用在调用端与服务端之间的完全异步,同时提升两端的单机吞吐量。

此外,RPC 框架也可以有其它的异步策略,比如集成 RxJava,再比如 gRPC 的 StreamObserver 入参对象,但 CompletableFuture 是 Java8 原生提供的,无代码入侵性,并且在使用上更加方便。

安全体系:如何建立可靠的安全体系?

RPC 是解决应用间互相通信的框架,而应用之间的远程调用过程一般不会暴露在公网,换句话讲就是说 RPC 一般用于解决内部应用之间的通信,而这个“内部”是指应用都部署在同一个大局域网内。相对于公网环境,局域网的隔离性更好,也就相对更安全,所以在 RPC 里面我们很少考虑像数据包篡改、请求伪造等恶意行为。

对于 RPC 来说,需要关心的安全问题不会有公网应用那么复杂,我们只要保证让服务调用方能拿到真实的服务提供方 IP 地址集合,且服务提供方可以管控调用自己的应用就够了(比如颁发数字签名)。

分布式环境下如何快速定位问题?

问题定位:链路追踪

链路追踪要点

  • traceId:用于表示一次完整的请求
  • spanId:用于标识一次 RPC 调用在分布式请求中的位置
  • annonation:业务自定义埋点数据

链路追踪理论

链路追踪代表产品

  • Zipkin:Zipkin 是 Twitter 开源的调用链分析工具,目前基于 spring-cloud-sleuth 得到了广泛的使用,特点是轻量,使用、部署简单。
  • Pinpoint:是韩国人开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI 功能强大,接入端无代码侵入。
  • SkyWalking:是本土开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI 功能较强,接入端无代码侵入。目前已加入 Apache 孵化器。
  • CAT:CAT 是美团点评开源的基于编码和配置的调用链分析,应用监控分析,日志采集,监控报警等一系列的监控平台工具。

详解时钟轮在 RPC 中的应用

无论是同步调用还是异步调用,调用端内部实行的都是异步,而调用端在向服务端发送消息之前会创建一个 Future,并存储这个消息标识与这个 Future 的映射,当服务端收到消息并且处理完毕后向调用端发送响应消息,调用端在接收到消息后会根据消息的唯一标识找到这个 Future,并将结果注入给这个 Future。

一般定时任务方案的缺点

  1. 方案一:每创建一个 Future 都启动一个线程,之后 sleep,到达超时时间就触发请求超时的处理逻辑。
  • 缺点:需要创建大量线程。例如:高并发场景下,单机可能每秒要发送数万次请求,请求超时时间设置的是 5 秒,那我们要创建多少个线程用来执行超时任务呢?超过 10 万个线程!
  1. 方案二:用一个线程来处理所有的定时任务,不断轮询定时任务。假设一个线程每隔 100 毫秒会扫描一遍所有的处理 Future 超时的任务,当发现一个 Future 超时了,我们就执行这个任务,对这个 Future 执行超时逻辑。
  • 缺点:很浪费 CPU。高并发场景下,如果调用端刚好在 1 秒内发送了 1 万次请求,这 1 万次请求要在 5 秒后才会超时,那么那个扫描的线程在这个 5 秒内就会不停地对这 1 万个任务进行扫描遍历,要额外扫描 40 多次(每 100 毫秒扫描一次,5 秒内要扫描近 50 次),很浪费 CPU。

时钟轮方案

在时钟轮机制中,有时间槽和时钟轮的概念,时间槽就相当于时钟的刻度,而时钟轮就相当于秒针与分针等跳动的一个周期,我们会将每个任务放到对应的时间槽位上。

时钟轮的运行机制和生活中的时钟也是一样的,每隔固定的单位时间,就会从一个时间槽位跳到下一个时间槽位,这就相当于我们的秒针跳动了一次;时钟轮可以分为多层,下一层时钟轮中每个槽位的单位时间是当前时间轮整个周期的时间,这就相当于 1 分钟等于 60 秒钟;当时钟轮将一个周期的所有槽位都跳动完之后,就会从下一层时钟轮中取出一个槽位的任务,重新分布到当前的时钟轮中,当前时钟轮则从第 0 槽位从新开始跳动,这就相当于
下一分钟的第 1 秒。

时钟轮在 RPC 中的应用

  • 调用端请求超时处理:每发一次请求,都创建一个处理请求超时的定时任务放到时钟轮里,在高并发、高访问量的情况下,时钟轮每次只轮询一个时间槽位中的任务,这样会节省大量的 CPU。
  • 调用端与服务端启动超时也可以应用到时钟轮:以调用端为例,假设我们想要让应用可以快速地部署,例如 1 分钟内启动,如果超过 1 分钟则启动失败。我们可以在调用端启动时创建一个处理启动超时的定时任务,放到时钟轮里。
  • 定时心跳:RPC 框架调用端定时向服务端发送心跳,来维护连接状态,我们可以将心跳的逻辑封装为一个心跳任务,放到时钟轮里。

流量回放:保障业务技术升级的神器

实际情况就是我们不仅要保障已有业务的稳定,还需要快速去完成各种新业务的需求,这期间我们的应用代码就会经常发生变化,而发生变化后就可能会引入新的不稳定因素,而且这个过程会一直持续不断发生。

为了保障应用升级后,我们的业务行为还能保持和升级前一样,我们在大多数情况下都是依靠已有的 TestCase 去验证,但这种方式在一定程度上并不是完全可靠的。最可靠的方式就是引入线上 Case 去验证改造后的应用,把线上的真实流量在改造后的应用里面进行回放,这样不仅节省整个上线时间,还能弥补手动维护 Case 存在的缺陷。

应用引入了 RPC 后,所有的请求流量都会被 RPC 接管,所以我们可以很自然地在 RPC 里面支持流量回放功能。虽然这个功能本身并不是 RPC 的核心功能,但对于使用 RPC 的人来说,他们有了这个功能之后,就可以更放心地升级自己的应用了。

动态分组:超高效实现秒级扩缩容

分组后带来的收益,它可以帮助服务提供方实现调用方的隔离。但是因为调用方流量并不是一成不变的,而且还可能会因为突发事件导致某个分组的流量溢出,而在整个大集群还有富余能力的时候,又因为分组隔离不能为出问题的集群提供帮助。

为了解决这种突发流量的问题,我们提供了一种更高效的方案,可以实现分组的快速扩缩容。事实上我们还可以利用动态分组解决分组后给每个分组预留机器冗余的问题,我们没有必要把所有冗余的机器都分配到分组里面,我们可以把这些预留的机器做成一个共享的池子,从而减少整体预留的实例数量。

如何在没有接口的情况下进行 RPC 调用?

应用场景

(1)测试平台:各个业务方在测试平台中通过输入接口、分组名、方法名以及参数值,在线测试自己发布的 RPC 服务。

(2)轻量级的服务网关:可以让各个业务方用 HTTP 的方式,通过服务网关调用其它服务。服务网关要作为所有 RPC 服务的调用端,是不能依赖所有服务提供方的接口 API 的,也需要调用端在没有服务提供方提供接口的情况下,仍然可以正常地发起 RPC 调用。

如何泛化调用

所谓的 RPC 调用,本质上就是调用端向服务端发送一条请求消息,服务端接收并处理,之后向调用端发送一条响应消息,调用端处理完响应消息之后,一次 RPC 调用就完成了。只要调用端将服务端需要知道的信息,如接口名、业务分组名、方法名以及参数信息等封装成请求消息发送给服务端,服务端就能够解析并处理这条请求消息,这样问题就解决了。

泛化调用接口示例:

1
2
3
4
class GenericService {
Object $invoke(String methodName, String[] paramTypes, Object[] params);
CompletableFuture<Object> $asyncInvoke(String methodName, String[] paramTypes
}

如何在线上环境里兼容多种 RPC 协议?

业界有很多 RPC 框架,如:Dubbo、Hessian、gRPC 等,它们随着技术发展逐渐涌现出来。不同时期、不同项目为了解决自身的通信问题,可能会选择不同的 RPC 框架。

对于一个公司来说,不同的 RPC 框架,会使得维护成本变高。所以,如果想缩减维护成本,自然会想到统一 RPC 框架。

但这面临的重要问题是:如果直接切换 RPC 框架,会导致新旧 RPC 框架的服务无法通信,从而造成业务损失。为此,一个折中的方案就是:先不移除原有的 RPC 框架,但同时接入新的 RPC 框架,让两种 RPC 同时提供服务,然后等所有的应用都接入完新的 RPC 以后,再让所有的应用逐步接入到新的 RPC 上。

在保持原有 RPC 使用方式不变的情况下,同时引入新的 RPC 框架的思路,是可以让所有的应用最终都能升级到我们想要升级的 RPC 上,但对于开发人员来说,这样切换成本还是有点儿高,整个过程最少需要两次上线才能彻底地把应用里面的旧 RPC 都切换成新 RPC。还有一种方案:要让新的 RPC 能同时支持多种 RPC 调用,当一个调用方切换到新的 RPC 之后,调用方和服务提供方之间就可以用新的协议完成调用;当调用方还是用老的 RPC 进行调用的话,调用方和服务提供方之间就继续沿用老的协议完成调用。

如何优雅处理多协议

每种协议约定的数据包格式是不一样的,而且每种协议开头都有一个协议编码,我们一般叫做 magic number。

当 RPC 收到了数据包后,我们可以先解析出 magic number 来。获取到 magic number 后,我们就很容易地找到对应协议的数据格式,然后用对应协议的数据格式去解析收到的二进制数据包。

协议解析过程就是把一连串的二进制数据变成一个 RPC 内部对象,但这个对象一般是跟协议相关的,所以为了能让 RPC 内部处理起来更加方便,我们一般都会把这个协议相关的对象转成一个跟协议无关的 RPC 对象。这是因为在 RPC 流程中,当服务提供方收到反序列化后的请求的时候,我们需要根据当前请求的参数找到对应接口的实现类去完成真正的方法调用。如果这个请求参数是跟协议相关的话,那后续 RPC 的整个处理逻辑就会变得很复杂。

当完成了真正的方法调用以后,RPC 返回的也是一个跟协议无关的通用对象,所以在真正往调用方写回数据的时候,我们同样需要完成一个对象转换的逻辑,只不过这时候是把通用对象转成协议相关的对象。

参考资料

SpringBoot Actuator 快速入门

spring-boot-actuator 模块提供了 Spring Boot 的所有生产就绪功能。启用这些功能的推荐方法是添加 spring-boot-starter-actuator 依赖。

如果是 Maven 项目,添加以下依赖:

1
2
3
4
5
6
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>

如果是 Gradle 项目,添加以下声明:

1
2
3
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-actuator'
}

端点(Endpoint)

Actuator Endpoint 使 Spring Boot 用户可以监控应用,并和应用进行交互。Spring Boot 内置了许多 端点,并允许用户自定义端点。例如,health 端点提供基本的应用健康信息。

用户可以启用或禁用每个单独的端点并通过 HTTP 或 JMX 暴露它们(使它们可远程访问)。当端点被启用和公开时,它被认为是可用的。内置端点仅在可用时才会自动配置。大多数应用程序选择通过 HTTP 公开。例如,默认情况下,health 端点映射到 /actuator/health

启用端点

默认情况下,除了 shutdown 之外的所有端点都已启用。要配置端点的启用,请使用 management.endpoint.<id>.enabled 属性。以下示例启用 shutdown 端点:

1
management.endpoint.shutdown.enabled=true

如果您希望端点是明确指定才启用,请将 management.endpoints.enabled-by-default 属性设置为 false 并根据需要明确指定启用的端点,以下为示例:

1
2
management.endpoints.enabled-by-default=false
management.endpoint.info.enabled=true

暴露端点

由于端点可能包含敏感信息,您应该仔细考虑何时暴露它们。下表显示了内置端点的默认曝光:

ID JMX Web
auditevents Yes No
beans Yes No
caches Yes No
conditions Yes No
configprops Yes No
env Yes No
flyway Yes No
health Yes Yes
heapdump N/A No
httptrace Yes No
info Yes No
integrationgraph Yes No
jolokia N/A No
logfile N/A No
loggers Yes No
liquibase Yes No
metrics Yes No
mappings Yes No
prometheus N/A No
quartz Yes No
scheduledtasks Yes No
sessions Yes No
shutdown Yes No
startup Yes No
threaddump Yes No

要更改暴露的端点,请使用以下特定于技术的包含和排除属性:

Property Default
management.endpoints.jmx.exposure.exclude
management.endpoints.jmx.exposure.include *
management.endpoints.web.exposure.exclude
management.endpoints.web.exposure.include health

include 属性列出了暴露的端点的 ID。 exclude 属性列出了不应暴露的端点的 ID。 exclude 属性优先于 include 属性。您可以使用端点 ID 列表配置包含和排除属性。

例如,仅暴露 health 和 info 端点,其他端点都不通过 JMX 暴露,可以按如下配置:

1
management.endpoints.jmx.exposure.include=health,info

注意:* 可用于选择所有端点。

安全

出于安全考虑,只有 /health 端点会通过 HTTP 方式暴露。用户可以通过 management.endpoints.web.exposure.include 决定哪些端点可以通过 HTTP 方式暴露。

如果 Spring Security 在类路径上并且不存在其他 WebSecurityConfigurerAdapterSecurityFilterChain bean,则除 /health 之外的所有 actuator 都由 Spring Boot 自动启用安全控制。如果用户自定义了 WebSecurityConfigurerAdapterSecurityFilterChain bean,Spring Boot 不再启用安全控制,由用户自行控制访问规则。

如果您希望为 HTTP 端点定义安全控制(例如,只允许具有特定角色的用户访问它们),Spring Boot 提供了一些方便的 RequestMatcher 对象,您可以将它们与 Spring Security 结合使用。

下面是一个典型的 Spring Security 配置示例:

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration(proxyBeanMethods = false)
public class MySecurityConfiguration {

@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
http.requestMatcher(EndpointRequest.toAnyEndpoint())
.authorizeRequests((requests) -> requests.anyRequest().hasRole("ENDPOINT_ADMIN"));
http.httpBasic();
return http.build();
}

}

前面的示例使用 EndpointRequest.toAnyEndpoint() 将请求匹配到任何端点,然后确保所有端点都具有 ENDPOINT_ADMIN 角色。 EndpointRequest 上还提供了其他几种匹配器方法。

如果希望无需身份验证即可访问所有执行器端点。可以通过更改 management.endpoints.web.exposure.include 属性来做到这一点,如下所示:

1
management.endpoints.web.exposure.include=*

此外,如果存在 Spring Security,您将需要添加自定义安全配置,以允许未经身份验证的访问端点,如以下示例所示:

1
2
3
4
5
6
7
8
9
10
11
@Configuration(proxyBeanMethods = false)
public class MySecurityConfiguration {

@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
http.requestMatcher(EndpointRequest.toAnyEndpoint())
.authorizeRequests((requests) -> requests.anyRequest().permitAll());
return http.build();
}

}

由于 Spring Boot 依赖于 Spring Security 的默认设置,因此 CSRF 保护默认开启。这意味着在使用默认安全配置时,需要 POST(关闭和记录器端点)、PUT 或 DELETE 的执行器端点会收到 403(禁止)错误。

建议仅在创建非浏览器客户端使用的服务时完全禁用 CSRF 保护。

配置端点

端点会自动缓存对不带任何参数的读操作的响应数据。要配置端点缓存响应的时间量,请使用其 cache.time-to-live 属性。以下示例将 bean 端点缓存的生存时间设置为 10 秒:

1
management.endpoint.beans.cache.time-to-live=10s

Actuator Web 端点的超媒体

Spring Boot Actuator 中内置了一个“发现页面”端点,其中包含了所有端点的链接。默认情况下,“发现页面”在 /actuator 上可用。

要禁用“发现页面”,请将以下属性添加到您的应用程序属性中:

1
management.endpoints.web.discovery.enabled=false

配置自定义管理上下文路径后,“发现页面”会自动从 /actuator 移动到应用管理上下文的根目录。例如,如果管理上下文路径是 /management,则发现页面可从 /management 获得。当管理上下文路径设置为 / 时,发现页面被禁用以防止与其他映射发生冲突的可能性。

跨域支持

CORS 是一种 W3C 规范,可让用户以灵活的方式指定授权哪种跨域请求。如果使用 Spring MVC 或 Spring WebFlux,则可以配置 Actuator 的 Web 端点以支持此类场景。

CORS 支持默认是禁用的,只有在设置 management.endpoints.web.cors.allowed-origins 属性后才会启用。以下配置允许来自 example.com 域的 GET 和 POST 调用:

1
2
management.endpoints.web.cors.allowed-origins=https://example.com
management.endpoints.web.cors.allowed-methods=GET,POST

自定义端点

如果添加带有 @Endpoint 注释的 @Bean,则任何带有 @ReadOperation@WriteOperation@DeleteOperation 注释的方法都会自动通过 JMX 公开,并且在 Web 应用程序中,也可以通过 HTTP 公开。可以使用 Jersey、Spring MVC 或 Spring WebFlux 通过 HTTP 公开端点。如果 Jersey 和 Spring MVC 都可用,则使用 Spring MVC。

以下示例公开了一个返回自定义对象的读取操作:

1
2
3
4
@ReadOperation
public CustomData getData() {
return new CustomData("test", 5);
}

您还可以使用 @JmxEndpoint@WebEndpoint 编写特定技术的端点。这些端点仅限于各自的技术。例如,@WebEndpoint 仅通过 HTTP 而不是通过 JMX 公开。

您可以使用 @EndpointWebExtension@EndpointJmxExtension 编写特定技术的扩展。这些注释让您可以提供特定技术的操作来扩充现有端点。

最后,如果您需要访问 Web 框架的功能,您可以实现 servlet 或 Spring @Controller@RestController 端点,但代价是它们无法通过 JMX 或使用不同的 Web 框架获得。

通过 HTTP 进行监控和管理

自定义管理端点路径

如果是 Web 应用,Spring Boot Actuator 会自动将所有启用的端点通过 HTTP 方式暴露。默认约定是使用前缀为 /actuator 的端点的 id 作为 URL 路径。例如,健康被暴露为 /actuator/health

有时,自定义管理端点的前缀很有用。例如,您的应用程序可能已经将 /actuator 用于其他目的。您可以使用 management.endpoints.web.base-path 属性更改管理端点的前缀,如以下示例所示:

1
management.endpoints.web.base-path=/manage

该示例将端点从 /actuator/{id} 更改为 /manage/{id}(例如,/manage/info)。

自定义管理服务器端口

1
management.server.port=8081

配置 SSL

当配置为使用自定义端口时,还可以使用各种 management.server.ssl.* 属性为管理服务器配置自己的 SSL。例如,这样做可以让管理服务器在主应用程序使用 HTTPS 时通过 HTTP 可用,如以下属性设置所示:

1
2
3
4
5
6
server.port=8443
server.ssl.enabled=true
server.ssl.key-store=classpath:store.jks
server.ssl.key-password=secret
management.server.port=8080
management.server.ssl.enabled=false

或者,主服务器和管理服务器都可以使用 SSL,但使用不同的密钥存储,如下所示:

1
2
3
4
5
6
7
8
server.port=8443
server.ssl.enabled=true
server.ssl.key-store=classpath:main.jks
server.ssl.key-password=secret
management.server.port=8080
management.server.ssl.enabled=true
management.server.ssl.key-store=classpath:management.jks
management.server.ssl.key-password=secret

自定义管理服务器地址

1
2
management.server.port=8081
management.server.address=127.0.0.1

禁用 HTTP 端点

如果您不想通过 HTTP 方式暴露端点,可以将管理端口设置为 -1,如以下示例所示:

1
management.server.port=-1

也可以通过使用 management.endpoints.web.exposure.exclude 属性来实现这一点,如以下示例所示:

1
management.endpoints.web.exposure.exclude=*

通过 JMX 进行监控和管理

Java 管理扩展 (JMX) 提供了一种标准机制来监视和管理应用程序。默认情况下,此功能未启用。您可以通过将 spring.jmx.enabled 配置属性设置为 true 来打开它。 Spring Boot 将最合适的 MBeanServer 暴露为 ID 为 mbeanServer 的 bean。使用 Spring JMX 注释(@ManagedResource@ManagedAttribute@ManagedOperation)注释的任何 bean 都会暴露给它。

如果您的平台提供标准 MBeanServer,则 Spring Boot 会使用该标准并在必要时默认使用 VM MBeanServer。如果一切都失败了,则创建一个新的 MBeanServer

有关更多详细信息,请参阅 JmxAutoConfiguration 类。

默认情况下,Spring Boot 还将管理端点公开为 org.springframework.boot 域下的 JMX MBean。要完全控制 JMX 域中的端点注册,请考虑注册您自己的 EndpointObjectNameFactory 实现。

定制化 MBean Names

MBean 的名称通常由端点的 id 生成。例如,健康端点公开为 org.springframework.boot:type=Endpoint,name=Health

如果您的应用程序包含多个 Spring ApplicationContext,您可能会发现名称冲突。要解决此问题,您可以将 spring.jmx.unique-names 属性设置为 true,以便 MBean 名称始终是唯一的。

如果需要定制,跨域按如下配置:

1
2
spring.jmx.unique-names=true
management.endpoints.jmx.domain=com.example.myapp

禁用 JMX 端点

想禁用 JMX 端点,可以按如下配置:

1
management.endpoints.jmx.exposure.exclude=*

将 Jolokia 用于基于 HTTP 的 JMX

Jolokia 是一个 JMX-HTTP 的桥接工具,它提供了另一种访问 JMX bean 的方法。要使用 Jolokia,需要先添加依赖:

1
2
3
4
<dependency>
<groupId>org.jolokia</groupId>
<artifactId>jolokia-core</artifactId>
</dependency

然后,您可以通过将 jolokia* 添加到 Management.Endpoints.web.exposure.include 属性来暴露 Jolokia 端点。然后,您可以在管理 HTTP 服务器上使用 /actuator/jolokia 访问它。

日志

Spring Boot Actuator 支持查看和配置应用日志级别。

日志级别的可选值如下:

  • TRACE
  • DEBUG
  • INFO
  • WARN
  • ERROR
  • FATAL
  • OFF
  • null

null 表示没有显式配置。

指标

审计

Spring Boot Actuator 支持简单的审计功能。如果应用中启用了 Spring Security,Spring Boot Actuator 就会发布安全事件(如:“身份验证成功”、“失败”和“访问被拒绝”异常)。

可以通过在应用的配置中提供 AuditEventRepository 类型的 bean 来启用审计。为方便起见,Spring Boot 提供了一个 InMemoryAuditEventRepositoryInMemoryAuditEventRepository 的功能有限,建议仅将其用于开发环境。

如果要自定义安全事件,可以提供 AbstractAuthenticationAuditListenerAbstractAuthorizationAuditListener 实现。

此外,还可以将审计服务用于业务活动。为此,要么将 AuditEventRepository bean 注入组件并直接使用它,要么使用 Spring ApplicationEventPublisher 发布 AuditApplicationEvent(通过实现 ApplicationEventPublisherAware)。

HTTP 追踪

用户可以通过在应用中提供 HttpTraceRepository 类型的 bean 来启用 HTTP 跟踪。Spring Boot 提供了内置的 InMemoryHttpTraceRepository,它可以存储最近 100 次(默认)请求-响应的追踪数据。与其他 HTTP 追踪解决方案相比,InMemoryHttpTraceRepository 比较受限,建议仅用于开发环境。对于生产环境,建议使用 Zipkin 或 Spring Cloud Sleuth。

或者,可以自定义 HttpTraceRepository

处理监控

在 spring-boot 模块中,您可以找到两个类来创建对进程监控有用的文件:

  • ApplicationPidFileWriter 创建一个包含应用程序 PID 的文件(默认情况下,在应用程序目录中,文件名为 application.pid)。
  • WebServerPortFileWriter 创建一个文件(或多个文件),其中包含正在运行的 Web 服务器的端口(默认情况下,在应用程序目录中,文件名为 application.port)。

参考资料

分布式分区

什么是分区

分区通常是这样定义的,即每一条数据(或者每条记录,每行或每个文档)只属于某个特定分区。实际上,每个分区都可以视为一个完整的小型数据库,虽然数据库可能存在一些跨分区的操作。

在不同系统中,分区有着不同的称呼,例如它对应于 MongoDB, Elasticsearch 和 SolrCloud 中的 shard, HBase 的 region, Bigtable 中的 tablet, Cassandra 和 Riak 中的 vnode ,以及 Couch base 中的 vBucket。总体而言,分区是最普遍的术语。

为什么需要分区

数据量如果太大,单台机器进行存储和处理就会成为瓶颈,因此需要引入数据分区机制。

分区的目地是通过多台机器均匀分布数据和查询负载,避免出现热点。这需要选择合适的数据分区方案,在节点添加或删除时重新动态平衡分区。

数据分区与数据复制

分区通常与复制结合使用,即每个分区在多个节点都存有副本。这意味着某条记录属于特定的分区,而同样的内容会保存在不同的节点上以提高系统的容错性。

一个节点上可能存储了多个分区。每个分区都有自己的主副本,例如被分配给某节点,而从副本则分配在其他一些节点。一个节点可能既是某些分区的主副本,同时又是其他分区的从副本。

键-值数据的分区

分区的主要目标是将数据和查询负载均匀分布在所有节点上。如果节点平均分担负载,那么理论上 10 个节点应该能够处理 10 倍的数据量和 10 倍于单个节点的读写吞吐量(忽略复制) 。

而如果分区不均匀,则会出现某些分区节点比其他分区承担更多的数据量或查询负载,称之为倾斜。倾斜会导致分区效率严重下降,在极端情况下,所有的负载可能会集中在一个分区节点上,这就意味着 10 个节点 9 个空闲,系统的瓶颈在最繁忙的那个节点上。这种负载严重不成比例的分区即成为系统热点

避免热点最简单的方法是将记录随机分配给所有节点。这种方法可以比较均匀地分布数据,但是有一个很大的缺点:当视图读取特定的数据时,没有办法知道数据保存在哪个节点上,所以不得不并行查询所有节点。

可以改进上述方法。现在我们假设数据是简单的键-值数据模型,这意味着总是可以通过关键字来访问记录。

基于关键字区间分区

一种分区方式是为每个分区分配一段连续的关键字或者关键宇区间范围(以最小值和最大值来指示)。

关键字的区间段不一定非要均匀分布,这主要是因为数据本身可能就不均匀。

分区边界可以由管理员手动确定,或由数据库自动选择。采用这种分区策略的系统包括 Bigtable、HBase、RethinkDB、2.4 版本前的 MongoDB。

每个分区内可以按照关键字排序保存(参阅第 3 章的“ SSTables 和 LSM Trees ”)。这样可以轻松支持区间查询,即将关键字作为一个拼接起来的索引项从而一次查询得到多个相关记录。

然而,基于关键字的区间分区的缺点是某些访问模式会导致热点。如果关键字是时间戳,则分区对应于一个时间范围,所有的写入操作都集中在同一个分区(即当天的分区),这会导致该分区在写入时负载过高,而其他分区始终处于空闲状态。为了避免上述问题,需要使用时间戳以外的其他内容作为关键字的第一项。

基于关键字晗希值分区

对于上述数据倾斜和热点问题,许多分布式系统采用了基于关键字哈希函数的方式来分区。

一个好的哈希函数可以处理数据倾斜并使其均匀分布。用于数据分区目的的哈希函数不需要再加密方面很强:例如 :Cassandra 和 MongoDB 使用 MD5,Voldemort 使用 Fowler-Noll-Vo。许多编程语言也有内置的简单哈希函数,但是要注意这些内置的哈希函数可能并不适合分区,例如,Java 的 Object.hashCode 和 Object#hash,同一个键在不同的进程中可能返回不同的哈希值。

一且找到合适的关键宇哈希函数,就可以为每个分区分配一个哈希范围(而不是直接作用于关键宇范围),关键字根据其哈希值的范围划分到不同的分区中。

img

这种方法可以很好地将关键字均匀地分配到多个分区中。分区边界可以是均匀间隔,也可以是伪随机选择( 在这种情况下,该技术有时被称为一致性哈希) 。

然而,通过关键宇哈希进行分区,我们丧失了良好的区间查询特性。即使关键字相邻,但经过哈希之后会分散在不同的分区中,区间查询就失去了原有的有序相邻的特性。在 MongoDB 中,如果启用了基于哈希的分片模式,则区间查询会发送到所有的分区上,而 Riak、Couchbase 和 Voldemort 干脆就不支持关键字上的区间查询。

Cassandra 则在两种分区策略之间做了一个折中。Cassandra 中的表可以声明为由多个列组成的复合主键。复合主键只有第一部分可用于哈希分区,而其他列则用作组合索引来对 Cassandra SSTable 中的数据进行排序。因此,它不支持在第一列上进行区间查询,但如果为第一列指定好了固定值,可以对其他列执行高效的区间查询。

组合索引为一对多的关系提供了一个优雅的数据模型。

负载倾斜与热点

基于哈希的分区方法可以减轻热点,但无住做到完全避免。一个极端情况是,所有的读/写操作都是针对同一个关键字,则最终所有请求都将被路由到同一个分区。

一个简单的技术就是在关键字的开头或结尾处添加一个随机数。只需一个两位数的十进制随机数就可以将关键字的写操作分布到 100 个不同的关键字上,从而分配到不同的分区上。但是,随之而来的问题是,之后的任何读取都需要些额外的工作,必须从所有 100 个关键字中读取数据然后进行合井。因此通常只对少量的热点关键字附加随机数才有意义;而对于写入吞吐量低的绝大多数关键字,这些都意味着不必要的开销。此外,还需要额外的元数据来标记哪些关键字进行了特殊处理。

分区与二级索引

二级索引通常不能唯一标识一条记录,而是用来加速特定值的查询。

二级索引是关系数据库的必要特性,在文档数据库中应用也非常普遍。但考虑到其复杂性,许多键-值存储(如 HBase 和 Voldemort)并不支持二级索引;但其他一些如 Riak 则开始增加对二级索引的支持。此外,二级索引技术也是 Solr 和 Elasticsearch 等全文索引服务器存在之根本。

二级索引带来的主要挑战是它们不能规整的地映射到分区中。有两种主要的方法来支持对二级索引进行分区:基于文档的分区和基于词条的分区。

基于文档分区的二级索引

img

在这种索引方法中,每个分区完全独立,各自维护自己的二级索引,且只负责自己分区内的文档而不关心其他分区中数据。每当需要写数据库时,包括添加,删除或更新文档等,只需要处理包含目标文档 ID 的那一个分区。因此文档分区索引也被称为本地索引,而不是全局索引。

这种查询分区数据库的方法有时也称为分散/聚集,显然这种二级索引的查询代价高昂。即使采用了并行查询,也容易导致读延迟显著放大。尽管如此,它还是广泛用于实践: MongoDB 、Riak、Cassandra、Elasticsearch 、SolrCloud 和 VoltDB 都支持基于文档分区二级索引。大多数数据库供应商都建议用户自己来构建合适的分区方案,尽量由单个分区满足二级索引查询,但现实往往难以如愿,尤其是当查询中可能引用多个二级索引时。

基于词条的二级索引分区

另一种方法,可以对所有的数据构建全局索引,而不是每个分区维护自己的本地索引。而且,为避免成为瓶颈,不能将全局索引存储在一个节点上,否则就破坏了设计分区均衡的目标。所以,全局索引也必须进行分区,且可以与数据关键字采用不同的分区策略。

img

词条分区以待查找的关键字本身作为索引。名字词条源于全文索引,term 指的是文档中出现的所有单词的集合。

可以直接通过关键词来全局划分索引,或者对其取哈希值。直接分区的好处是可以支持高效的区间查询;而采用哈希的方式则可以更均句的划分分区。

这种全局的词条分区相比于文档分区索引的主要优点是,它的读取更为高效,即它不需要采用 scatter/gather 对所有的分区都执行一遍查询,客户端只需要想包含词条的那一个分区发出读请求。然而全局索引的不利之处在于, 写入速度较慢且非常复杂,主要因为单个文档的更新时,里面可能会涉及多个二级索引,而二级索引的分区又可能完全不同甚至在不同的节点上,由此势必引人显著的写放大。

理想情况下,索引应该时刻保持最新,即写入的数据要立即反映在最新的索引上。但是,对于词条分区来讲,这需要一个跨多个相关分区的分布式事务支持,写入速度会受到极大的影响,所以现有的数据库都不支持同步更新二级索引。

分区再均衡

集群节点数变化,数据规模增长等情况,都会导致分区的分布不均。要保持分区的均衡,势必要将数据和请求进行迁移,这样一个迁移负载的过程称为分区再均衡

无论对于哪种分区方案, 分区再平衡通常至少要满足:

  • 平衡之后,负载、数据存储、读写请求等应该在集群范围更均匀地分布。
  • 再平衡执行过程中,数据库应该可以继续正常提供读写服务。
  • 避免不必要的负载迁移,以加快动态再平衡,井尽量减少网络和磁盘 I/O 影响。

动态再平衡的策略

为什么不用取模?

最好将哈希值划分为不同的区间范围,然后将每个区间分配给一个分区。

为什么不直接使用 mod?对节点数取模方法的问题是,如果节点数 N 发生了变化,会导致很多关键字需要从现有的节点迁移到另一个节点。

固定数量的分区

创建远超实际节点数的分区数,然后为每个节点分配多个分区。

接下来, 如果集群中添加了一个新节点,该新节点可以从每个现有的节点上匀走几个分区,直到分区再次达到全局平衡。

选中的整个分区会在节点之间迁移,但分区的总数量仍维持不变,也不会改变关键字到分区的映射关系。这里唯一要调整的是分区与节点的对应关系。考虑到节点间通过网络传输数据总是需要些时间,这样调整可以逐步完成,在此期间,旧分区仍然可以接收读写请求。

原则上,也可以将集群中的不同的硬件配置因素考虑进来,即性能更强大的节点将分配更多的分区,从而分担更多的负载。

目前,Riak、Elasticsearch、Couchbase 和 Voldemort 都支持这种动态平衡方法。

使用该策略时,分区的数量往往在数据库创建时就确定好,之后不会改变。原则上也可以拆分和合并分区(稍后介绍),但固定数量的分区使得相关操作非常简单,因此许多采用固定分区策略的数据库决定不支持分区拆分功能。所以,在初始化时,已经充分考虑将来扩容增长的需求(未来可能拥有的最大节点数),设置一个足够大的分区数。而每个分区也有些额外的管理开销,选择过高的数字可能会有副作用。

动态分区

对于采用关键宇区间分区的数据库,如果边界设置有问题,最终可能会出现所有数据都挤在一个分区而其他分区基本为空,那么设定固定边界、固定数量的分区将非常不便:而手动去重新配置分区边界又非常繁琐。

因此, 一些数据库如 HBase 和 RethinkDB 等采用了动态创建分区。当分区的数据增长超过一个可配的参数阔值(HBase 上默认值是 10GB),它就拆分为两个分区,每个承担一半的数据量。相反,如果大量数据被删除,并且分区缩小到某个阈值以下,则将其与相邻分区进行合井。该过程类似于 B 树的分裂操作。

每个分区总是分配给一个节点,而每个节点可以承载多个分区,这点与固定数量的分区一样。当一个大的分区发生分裂之后,可以将其中的一半转移到其他某节点以平衡负载。对于 HBase,分区文件的传输需要借助 HDFS。

动态分区的一个优点是分区数量可以自动适配数据总量。如果只有少量的数据,少量的分区就足够了,这样系统开销很小;如果有大量的数据,每个分区的大小则被限制在一个可配的最大值。

但是,需要注意的是,对于一个空的数据库, 因为没有任何先验知识可以帮助确定分区的边界,所以会从一个分区开始。可能数据集很小,但直到达到第一个分裂点之前,所有的写入操作都必须由单个节点来处理, 而其他节点则处于空闲状态。为了缓解这个问题,HBase 和 MongoDB 允许在一个空的数据库上配置一组初始分区(这被称为预分裂)。对于关键字区间分区,预分裂要求已经知道一些关键字的分布情况。

动态分区不仅适用于关键字区间分区,也适用于基于哈希的分区策略。MongoDB 从版本 2.4 开始,同时支持二者,井且都可以动态分裂分区。

按节点比例分区

采用动态分区策略,拆分和合并操作使每个分区的大小维持在设定的最小值和最大值之间,因此分区的数量与数据集的大小成正比关系。另一方面,对于固定数量的分区方式,其每个分区的大小也与数据集的大小成正比。两种情况,分区的数量都与节点数无关。

Cassandra 和 Ketama 则采用了第三种方式,使分区数与集群节点数成正比关系。换句话说,每个节点具有固定数量的分区。此时, 当节点数不变时,每个分区的大小与数据集大小保持正比的增长关系; 当节点数增加时,分区则会调整变得更小。较大的数据量通常需要大量的节点来存储,因此这种方法也使每个分区大小保持稳定。

当一个新节点加入集群时,它随机选择固定数量的现有分区进行分裂,然后拿走这些分区的一半数据量,将另一半数据留在原节点。随机选择可能会带来不太公平的分区分裂,但是当平均分区数量较大时(Cassandra 默认情况下,每个节点有 256 个分区),新节点最终会从现有节点中拿走相当数量的负载。Cassandra 在 3.0 时推出了改进算洁,可以避免上述不公平的分裂。

随机选择分区边界的前提要求采用基于哈希分区(可以从哈希函数产生的数字范围里设置边界)。这种方法也最符合本章开头所定义一致性哈希。一些新设计的哈希函数也可以以较低的元数据开销达到类似的效果。

自动与手动再平衡操作

动态平衡另一个重要问题我们还没有考虑:它是自动执行还是手动方式执行?

全自动式再平衡会更加方便,它在正常维护之外所增加的操作很少。但是,也有可能出现结果难以预测的情况。再平衡总体讲是个比较昂贵的操作,它需要重新路由请求井将大量数据从一个节点迁移到另一个节点。万一执行过程中间出现异常,会使网络或节点的负载过重,井影响其他请求的性能。

将自动平衡与自动故障检测相结合也可能存在一些风险。例如,假设某个节点负载过重,对请求的响应暂时受到影响,而其他节点可能会得到结论:该节点已经失效;接下来激活自动平衡来转移其负载。客观上这会加重该节点、其他节点以及网络的负荷,可能会使总体情况变得更槽,甚至导致级联式的失效扩散。

请求路由

当数据集分布到多个节点上,需要解决一个问题:当客户端发起请求时,如何知道应该连接哪个节点?如果发生了分区再平衡,分区与节点的对应关系随之还会变化。为了回答该问题,我们需要一段处理逻辑来感知这些变化,并负责处理客户端的连接。

这其实属于一类典型的服务发现问题,服务发现并不限于数据库,任何通过网络访问的系统都有这样的需求,尤其是当服务目标支持高可用时(在多台机器上有冗余配置)。

服务发现有以下处理策略:

  1. 允许客户端链接任意的节点(例如,采用循环式的负载均衡器)。如果某节点恰好拥有所请求的分区,则直接处理该请求:否则,将请求转发到下一个合适的节点,接收答复,并将答复返回给客户端。
  2. 将所有客户端的请求都发送到一个路由层,由后者负责将请求转发到对应的分区节点上。路由层本身不处理任何请求,它仅充一个分区感知的负载均衡器。
  3. 客户端感知分区和节点分配关系。此时,客户端可以直接连接到目标节点,而不需要任何中介。

img

许多分布式数据系统依靠独立的协调服务(如 ZooKeeper )跟踪集群范围内的元数据。每个节点都向 ZooKeeper 中注册自己, ZooKeeper 维护了分区到节点的最终映射关系。其他参与者(如路由层或分区感知的客户端)可以向 ZooKeeper 订阅此信息。一旦分区发生了改变,或者添加、删除节点, ZooKeeper 就会主动通知路由层,这样使路由信息保持最新状态。

例如,HBase、SolrCloud 和 Kafka 也使用 ZooKeeper 来跟踪分区分配情况。MongoDB 有类似的设计,但它依赖于自己的配置服务器和 mongos 守护进程来充当路由层。

Cassandra 和 Riak 则采用了不同的方法,它们在节点之间使用 gossip 协议来同步群集状态的变化。请求可以发送到任何节点,由该节点负责将其转发到目标分区节点。这种方式增加了数据库节点的复杂性,但是避免了对 ZooKeeper 之类的外部协调服务的依赖。

img

并行查询执行

到目前为止,我们只关注了读取或写入单个关键字这样简单的查询(对于文档分区的二级索引,里面要求分散/聚集查询)。这基本上也是大多数 NoSQL 分布式数据存储所支持的访问类型。

然而对于大规模并行处理(massively parallel processing, MPP)这一类主要用于数据分析的关系数据库,在查询类型方面要复杂得多。典型的数据仓库查询包含多个联合、过滤、分组和聚合操作。MPP 查询优化器会将复杂的查询分解成许多执行阶段和分区,以便在集群的不同节点上井行执行。尤其是涉及全表扫描这样的查询操作,可以通过并行执行获益颇多。

小结

数据量如果太大,单台机器进行存储和处理就会成为瓶颈,因此需要引入数据分区机制。分区的目地是通过多台机器均匀分布数据和查询负载,避免出现热点。这需要选择合适的数据分区方案,在节点添加或删除时重新动态平衡分区。

两种主要的分区方法:

  • 基于关键字区间的分区。先对关键字进行排序,每个分区只负责一段包含最小到最大关键字范围的一段关键字。对关键字排序的优点是可以支持高效的区间查询,但是如果应用程序经常访问与排序一致的某段关键字,就会存在热点的风险。采用这种方怯,当分区太大时,通常将其分裂为两个子区间,从而动态地再平衡分区。
  • 哈希分区。将哈希函数作用于每个关键字,每个分区负责一定范围的哈希值。这种方法打破了原关键字的顺序关系,它的区间查询效率比较低,但可以更均匀地分配负载。采用哈希分区时,通常事先创建好足够多(但固定数量)的分区,让每个节点承担多个分区,当添加或删除节点时将某些分区从一个节点迁移到另一个节点,也可以支持动态分区。

混合上述两种基本方住也是可行的,例如使用复合键:键的一部分来标识分区,而另一部分来记录排序后的顺序。

二级索引也需要进行分区,有两种方法:

  • 基于文档来分区二级索引(本地索引)。二级索引存储在与关键字相同的分区中,这意味着写入时我们只需要更新一个分区,但缺点是读取二级索引时需要在所有分区上执行 scatter/gather。
  • 基于词条来分区二级索引(全局索引)。它是基于索引的值而进行的独立分区。二级索引中的条目可能包含来自关键字的多个分区里的记录。在写入时,不得不更新二级索引的多个分区;但读取时,则可以从单个分区直接快速提取数据。

最后,讨论了如何将查询请求路由到正确的分区,包括简单的分区感知负载均衡器,以及复杂的并行查询执行引擎。

参考资料