Dunwu Blog

大道至简,知易行难

分布式复制

复制主要指通过网络在多台机器上保存相同数据的副本

复制数据,可能出于各种各样的原因:

  • 提高可用性 - 当部分组件出现位障,系统依然可以继续工作,系统依然可以继续工作。
  • 降低访问延迟 - 使数据在地理位置上更接近用户。
  • 提高读吞吐量 - 扩展至多台机器以同时提供数据访问服务。

复制的模式有以下几种:

  • 主从复制 - 所有的写入操作都发送到主节点,由主节点负责将数据更改事件发送到从节点。每个从节点都可以接收读请求,但内容可能是过期值。
  • 多主复制 - 系统存在多个主节点,每个都可以接收写请求,客户端将写请求发送到其中的一个主节点上,由该主节点负责将数据更改事件同步到其他主节点和自己的从节点。
  • 无主复制 - 系统中不存在主节点,每一个节点都能接受客户端的写请求。接受写请求的副本不会将数据变更同步到其他的副本。此外,读取时从多个节点上并行读取,以此检测和纠正某些过期数据

此外,复制还需要考虑以下问题:

  • 同步还是异步
  • 如何处理失败的副本
  • 如何保证数据一致

主从复制

如何确保所有副本之间的数据是一致的?

对于每一次数据写入,所有副本都需要随之更新;否则,某些副本将出现数据不一致。

最常见的解决方案就是主从复制,其原理如下:

主从复制模式中只有一个主副本(或称为主节点) ,其余称为从副本(或称为从节点)。

  1. 所有的写请求只能发送给主副本,主副本首先将新数据写入本地存储。

  2. 然后,主副本将数据更改作为复制的日志或更新流发送给所有从副本。每个从副本获得更新数据之后将其应用到本地,且严格保持与主副本相同的写入顺序。

  3. 读请求既可以在主副本上,也可以在从副本上执行。

再次强调,只有主副本才可以接受写请求:从客户端的角度来看,从副本都是只读的。如果由于某种原因,例如与主节点之间的网络中断而导致主节点无法连接,主从复制方案就会影响所有的写入操作。

主从复制系统

支持主从复制的系统:

  • 数据库:MySql、PostgreSQL(9.0 版本后)、MongoDB 等
  • 消息队列:Kafka、RabbitMQ 等

同步复制与异步复制

主从复制——同步和异步

通常情况下,复制速度会非常快。但是,系统其实并没有保证一定会在多长时间内完成复制。有些情况下,从节点可能落后主节点几分钟甚至更长时间,例如,由于从节点刚从故障中恢复,或者系统已经接近最大设计上限,或者节点之间的网络出现问题。

  • 同步复制的优点:一旦向用户确认,从节点可以明确保证完成了与主节点的更新同步,数据已经处于最新版本。万一主节点发生故障,总是可以在从节点继续访问最新数据。
  • 同步复制的缺点:如果同步的从节点无法完成确认(例如由于从节点发生崩溃,或者网络故障,或任何其他原因),写入就不能视为成功。主节点会阻塞其后所有的写操作,直到同步副本确认完成。

因此,把所有从节点都配置为同步复制有些不切实际。因为这样的话,任何一个同步节点的中断都会导致整个系统更新停滞不前。实际应用中,推荐的同步模式(也是很多数据库的选择)是:只要有一个从节点或半数以上的从节点同步成功,就视为同步,直接返回结果;剩下的节点都通过异步方式同步。万一同步的从节点变得不可用或性能下降,则将另一个异步的从节点提升为同步模式。这样可以保证至少有两个节点(即主节点和一个同步从节点)拥有最新的数据副本。这种配置有时也称为半同步

主从复制还经常会被配置为全异步模式。

  • 异步复制的优点:不管从节点上数据多么滞后,主节点总是可以继续响应写请求,系统的吞吐性能更好。
  • 异步复制的缺点:如果主节点发生故障且不可恢复,则所有尚未复制到从节点的写请求都会丢失。这意味着即使向客户端确认了写操作,却无法保证数据的持久化。

配置新的从节点

当如果出现以下情况时,如需要增加副本数以提高容错能力,或者替换失败的副本,就需要考虑增加新的从节点。但如何确保新的从节点和主节点保持数据一致呢?

简单地将数据文件从一个节点复制到另一个节点通常是不够的。主要是因为客户端仍在不断向数据库写入新数据,数据始终处于不断变化之中,因此常规的文件拷贝方式将会导致不同节点上呈现出不同时间点的数据。

另一种思路是:考虑锁定数据库(使其不可写)来使磁盘上的文件保持一致,但这会违反高可用的设计目标。在不停机、数据服务不中断的前提下,也有一种可行性复制方案,其主要操作步骤如下:

  1. 在某个时间点对主节点的数据副本产生一个一致性快照,这样避免长时间锁定整个数据库。目前大多数数据库都支持此功能,快照也是系统备份所必需的。而在某些情况下,可能需要第三方工具,如 MySQL 的 innobackupex。
  2. 将此快照拷贝到新的从节点。
  3. 从节点连接到主节点并请求快照点之后所发生的数据更改日志。因为在第一步创建快照时,快照与系统复制日志的某个确定位置相关联,这个位置信息在不同的系统有不同的称呼,如 PostgreSQL 将其称为“ log sequence number” (日志序列号),而 MySQL 将其称为“ binlog coordinates ” 。
  4. 获得日志之后,从节点来应用这些快照点之后所有数据变更,这个过程称之为追赶。接下来,它可以继续处理主节点上新的数据变化。井重复步骤 1 ~步骤 4 。

在不同系统中,建立新的从副本具体操作步骤可能有所不同。

处理节点失效

系统中的任何节点都可能因故障或者计划内的维护(例如重启节点以安装内核安全补丁)而导致中断甚至停机。如果能够在不停机的情况下重启某个节点,这会对运维带来巨大的便利。我们的目标是,尽管个别节点会出现中断,但要保持系统总体的持续运行,并尽可能减小节点中断带来的影响。

如何通过主从复制技术来实现系统高可用呢?

从节点失效:追赶式恢复

从节点的本地磁盘上都保存了副本收到的数据变更日志。如果从节点发生崩溃,然后顺利重启,或者主从节点之间的网络发生暂时中断(闪断),则恢复比较容易,根据副本的复制日志,从节点可以知道在发生故障之前所处理的最后一笔事务,然后连接到主节点,并请求自那笔事务之后中断期间内所有的数据变更。在收到这些数据变更日志之后,将其应用到本地来追赶主节点。之后就和正常情况一样持续接收来自主节点数据流的变化。

主节点失效:节点切换

选择某个从节点将其提升为主节点;客户端也需要更新,这样之后的写请求会发送给新的主节点,然后其他从节点要接受来自新的主节点上的变更数据,这一过程称之为切换。

故障切换可以手动进行,例如通知管理员主节点发生失效,采取必要的步骤来创建新的主节点;或者以自动方式进行。自动切换的步骤通常如下:

  1. 确认主节点失效。有很多种出错可能性,很难准确检测出问题的原因,所以大多数系统都采用了基于超时的机制:节点间频繁地互相发生发送心跳悄息,如果发现某一个节点在一段比较长时间内(例如 30s )没有响应,即认为该节点发生失效。
  2. 选举新的主节点。可以通过选举的方式(超过多数的节点达成共识)来选举新的主节点,或者由之前选定的某控制节点来指定新的主节点。候选节点最好与原主节点的数据差异最小,这样可以最小化数据丢失的风险。让所有节点同意新的主节点是个典型的共识问题。
  3. 重新配置系统使新主节点生效。客户端现在需要将写请求发送给新的主节点。如果原主节点之后重新上线,可能仍然自认为是主节点,而没有意识到其他节点已经达成共识迫使其下台。这时系统要确保原主节点降级为从节点,并认可新的主节点。

上述切换过程依然充满了很多变数:

  • 如果使用了异步复制,且失效之前,新的主节点并未收到原主节点的所有数据;在选举之后,原主节点很快又重新上线并加入到集群,接下来的写操作会发生什么?新的主节点很可能会收到冲突的写请求,这是因为原主节点未意识的角色变化,还会尝试同步其他从节点,但其中的一个现在已经接管成为现任主节点。常见的解决方案是,原主节点上未完成复制的写请求就此丢弃,但这可能会违背数据更新持久化的承诺。
  • 如果在数据库之外有其他系统依赖于数据库的内容并在一起协同使用,丢弃数据的方案就特别危险。例如,在 GitHub 的一个事故中,某个数据并非完全同步的 MySQL 从节点被提升为主副本,数据库使用了自增计数器将主键分配给新创建的行,但是因为新的主节点计数器落后于原主节点( 即二者并非完全同步),它重新使用了已被原主节点分配出去的某些主键,而恰好这些主键已被外部 Redis 所引用,结果出现 MySQL 和 Redis 之间的不一致,最后导致了某些私有数据被错误地泄露给了其他用户。
  • 在某些故障情况下,可能会发生两个节点同时-都自认为是主节点。这种情况被称为脑裂,它非常危险:两个主节点都可能接受写请求,并且没有很好解决冲突的办法,最后数据可能会丢失或者破坏。作为一种安全应急方案,有些系统会采取措施来强制关闭其中一个节点。然而,如果设计或者实现考虑不周,可能会出现两个节点都被关闭的情况。
  • 如何设置合适的超时来检测主节点失效呢? 主节点失效后,超时时间设置得越长也意味着总体恢复时间就越长。但如果超时设置太短,可能会导致很多不必要的切换。例如,突发的负载峰值会导致节点的响应时间变长甚至超肘,或者由于网络故障导致延迟增加。如果系统此时已经处于高负载压力或网络已经出现严重拥塞,不必要的切换操作只会使总体情况变得更糟。

复制日志的实现

基于语句的复制

最简单的情况,主节点记录所执行的每个写请求(操作语句)井将该操作语句作为日志发送给从节点。对于关系数据库,这意味着每个 INSERT 、UPDATE 或 DELETE 语句都会转发给从节点,并且每个从节点都会分析井执行这些 SQU 吾句,如同它们是来自客户端那样。

听起来很合理也不复杂,但这种复制方式有一些不适用的场景:

  • 任何调用非确定性函数的语句,如 NOW() 获取当前时间,或 RAND() 获取一个随机数等,可能会在不同的副本上产生不同的值。
  • 如果语句中使用了自增列,或者依赖于数据库的现有数据(例如,UPDATE ... WHERE <某些条件>),则所有副本必须按照完全相同的顺序执行,否则可能会带来不同的结果。进而,如果有多个同时并发执行的事务时,会有很大的限制。
  • 有副作用的语句(例如,触发器、存储过程、用户定义的函数等),可能会在每个副本上产生不同的副作用。

有可能采取一些特殊措施来解决这些问题,例如,主节点可以在记录操作语句时将非确定性函数替换为执行之后的确定的结果,这样所有节点直接使用相同的结果值。但是,这里面存在太多边界条件需要考虑,因此目前通常首选的是其他复制实现方案。

MySQL 5.1 版本之前采用基于操作语句的复制。现在由于逻辑紧凑,依然在用,但是默认情况下,如果语句中存在一些不确定性操作,则 MySQL 会切换到基于行的复制(稍后讨论)。VoltDB 使用基于语句的复制,它通过事务级别的确定性来保证复制的安全。

基于预写日志(WAL)传输

通常每个写操作都是以追加写的方式写入到日志中:

  • 对于日志结构存储引擎,日志是主要的存储方式。日志段在后台压缩井支持垃圾回收。
  • 对于采用覆写磁盘的 BTree 结构,每次修改会预先写入日志,如系统发生崩溃,通过索引更新的方式迅速恢复到此前一致状态。

不管哪种情况,所有对数据库写入的字节序列都被记入日志。因此可以使用完全相同的日志在另一个节点上构建副本:除了将日志写入磁盘之外,主节点还可以通过网络将其发送给从节点。

PostgreSQL 、Oracle 以及其他系统等支持这种复制方式。其主要缺点是日志描述的数据结果非常底层:一个 WAL 包含了哪些磁盘块的哪些字节发生改变,诸如此类的细节。这使得复制方案和存储引擎紧密耦合。如果数据库的存储格式从一个版本改为另一个版本,那么系统通常无能支持主从节点上运行不同版本的软件。

看起来这似乎只是个有关实现方面的小细节,但可能对运营产生巨大的影响。如果复制协议允许从节点的软件版本比主节点更新,则可以实现数据库软件的不停机升级:首先升级从节点,然后执行主节点切换,使升级后的从节点成为新的主节点。相反,复制协议如果要求版本必须严格一致(例如 WALf 专输),那么就势必以停机为代价。

基于行的逻辑日志复制

如果复制和存储引擎采用不同的日志格式,这样复制与存储的逻辑就可以剥离。这种复制日志称为逻辑日志,以区分物理存储引擎的数据表示。

关系数据库的逻辑日志通常是指一系列记录来描述数据表行级别的写请求:

  • 对于行插入,日志包含所有相关列的新值。
  • 对于行删除,日志里有足够的信息来唯一标识已删除的行,通常是靠主键,但如果表上没有定义主键,就需要记录所有列的旧值。
  • 对于行更新,日志包含足够的信息来唯一标识更新的行,以及所有列的新值(或至少包含所有已更新列的新值)。

如果一条事务涉及多行的修改,则会产生多个这样的日志记录,并在后面跟着一条记录,指出该事务已经提交。MySQL 的二进制日志 binlog (当配置为基于行的复制时)使用该方式。

由于逻辑日志与存储引擎逻辑解耦,因此可以更容易地保持向后兼容,从而使主从节点能够运行不同版本的软件甚至是不同的存储引擎。

对于外部应用程序来说,逻辑日志格式也更容易解析。

基于触发器的复制

在某些情况下,我们可能需要更高的灵活性。例如,只想复制数据的一部分,或者想从一种数据库复制到另一种数据库,或者需要订制、管理冲突解决逻辑,则需要将复制控制交给应用程序层。

有一些工具,可以通过读取数据库日志让应用程序获取数据变更。另一种方法则是借助许多关系数据库都支持的功能:触发器和存储过程。

触发器支持注册自己的应用层代码,使得当数据库系统发生数据更改(写事务)时自动执行上述自定义代码。通过触发器技术,可以将数据更改记录到一个单独的表中,然后外部处理逻辑访问该表,实施必要的自定义应用层逻辑,例如将数据更改复制到另一个系统。Oracle 的 Databus 和 Postgres 的 Bucardo 就是这种技术的典型代表。基于触发器的复制通常比其他复制方式开销更高,也比数据库内置复制更容易出错,或者暴露一些限制。然而,其高度灵活性仍有用武之地。

复制滞后问题

主从复制要求所有写请求都经由主节点,而任何副本只能接受只读查询。对于读操作密集的负载(如 Web ),这是一个不错的选择:创建多个从副本,将读请求分发给这些从副本,从而减轻主节点负载井允许读取请求就近满足。

在这种扩展体系下,只需添加更多的从副本,就可以提高读请求的服务吞吐量。但是,这种方法实际上只能用于异步复制,如果试图同步复制所有的从副本,则单个节点故障或网络中断将使整个系统无法写入。而且节点越多,发生故障的概率越高,所以完全同步的配置现实中反而非常不可靠。

不幸的是,如果一个应用正好从一个异步的从节点读取数据,而该副本落后于主节点,则应用可能会读到过期的信息。这会导致数据库中出现明显的不一致:由于并非所有的写入都反映在从副本上,如果同时对主节点和从节点发起相同的查询,可能会得到不同的结果。经过一段时间之后,从节点最终会赶上并与主节点数据保持一致。这种效应也被称为最终一致性

写后读一致性

许多应用让用户提交一些数据,接下来查看他们自己所提交的内容。例如客户数据库中的记录,亦或者是讨论主题的评论等。提交新数据须发送到主节点,但是当用户读取数据时,数据可能来自从节点。这对于读多写少的场景是个非常合适的方案。

然而对于异步复制存在这样一个问题,如图所示,用户在写入不久即查看数据,则新数据可能尚未到达从节点。对用户来讲,看起来似乎是刚刚提交的数据丢失了,显然用户不会高兴。

img

对于这种情况,我们需要读写一致性。该机制保证如果用户重新加载页面,他们总能看到自己最近提交的更新。但对其他用户则没有任何保证,这些用户的更新可能会在稍后才能刷新看到。如何实现呢?有以下几种可行性方案:

  • 如果用户访问可能会被修改的内容,从主节点读取; 否则,在从节点读取。这背后就要求有一些方法在实际执行查询之前,就已经知道内容是否可能会被修改。例如,社交网络上的用户首页信息通常只能由所有者编辑,而其他人无法编辑。因此,这就形成一个简单的规则:总是从主节点读取用户自己的首页配置文件,而在从节点读取其他用户的配置文件。
  • 如果应用的大部分内容都可能被所有用户修改,那么上述方法将不太有效,它会导致大部分内容都必须经由主节点,这就丧失了读操作的扩展性。此时需要其他方案来判断是否从主节点读取。例如,跟踪最近更新的时间,如果更新后一分钟之内,则总是在主节点读取;井监控从节点的复制滞后程度,避免从那些滞后时间超过一分钟的从节点读取。
  • 客户端还可以记住最近更新时的时间戳,井附带在读请求中,据此信息,系统可以确保对该用户提供读服务时都应该至少包含了该时间戳的更新。如果不够新,要么交由另一个副本来处理,要么等待直到副本接收到了最近的更新。时间戳可以是逻辑时间戳(例如用来指示写入顺序的日志序列号)或实际系统时钟(在这种情况下,时钟同步又称为一个关键点)。
  • 如果副本分布在多数据中心(例如考虑与用户的地理接近,以及高可用性),情况会更复杂些。必须先把请求路由到主节点所在的数据中心(该数据中心可能离用户很远)。

如果同一用户可能会从多个设备访问数据,情况会更加复杂。此时,要提供跨设备的写后读一致性,即如果用户在某设备上输入了一些信息然后在另一台设备商查看,也应该看到刚刚所输入的内容。在这种情况下,还有一些需要考虑的问题:

  • 记住用户上次更新时间戳的方法实现起来会比较困难,因为在一台设备上运行的代码完全无法知道在其他设备上发生了什么。此时,元数据必须做到全局共享。
  • 如果副本分布在多数据中心,无法保证来自不同设备的连接经过路由之后都到达同一个数据中心。例如,用户的台式计算机使用了家庭宽带连接,而移动设备则使用蜂窝数据网络,不同设备的网络连接线路可能完全不同。如果方案要求必须从主节点读取,则首先需要想办毡确保将来自不同设备的请求路由到同一个数据中心。

单调读

假定用户从不同副本进行了多次读取,如图所示,用户刷新一个网页,读请求可能被随机路由到某个从节点。用户 2345 先后在两个从节点上执行了两次完全相同的查询(先是少量滞后的节点,然后是滞后很大的从节点),则很有可能出现以下情况。第一个查询返回了最近用户 1234 所添加的评论,但第二个查询因为滞后的原因,还没有收到更新因而返回结果是空。用户看到了最新内容之后又读到了过期的内容,好像时间被回拨,此时需要单调读一致性。

img

单调读一致性可以确保不会发生这种异常。这是一个比强一致性弱,但比最终一致性强的保证。当读取数据时,单调读保证,如果某个用户依次进行多次读取,则他绝不会看到回攘现象,即在读取较新值之后又发生读旧值的情况。

实现单调读的一种方式是,确保每个用户总是从固定的同一副本执行读取(而不同的用户可以从不同的副本读取)。例如,基于用户 ID 的哈希的方怯而不是随机选择副本。但如果该副本发生失效,则用户的查询必须重新路由到另一个副本。

前缀一致读

前缀一致读:对于一系列按照某个顺序发生的写请求,那么读取这些内容时也会按照当时写入的顺序。

如果数据库总是以相同的顺序写入,则读取总是看到一致的序列,不会发生这种反常。然而,在许多分布式数据库中,不同的分区独立运行,因此不存在全局写入顺序。这就导致当用户从数据库中读数据时,可能会看到数据库的某部分旧值和另一部分新值。

img

一个解决方案是确保任何具有因果顺序关系的写入都交给一个分区来完成,但该方案真实实现效率会大打折扣。现在有一些新的算法来显式地追踪事件因果关系。

复制滞后的解决方案

使用最终一致性系统时,最好事先就思考这样的问题:如果复制延迟增加到几分钟甚至几小时,那么应用层的行为会是什么样子?如果这种情况不可接受,那么在设计系统肘,就要考虑提供一个更强的一致性保证,比如写后读; 如果系统设计时假定是同步复制,但最终它事实上成为了异步复制,就可能会导致灾难性后果。

在应用层可以提供比底层数据库更强有力的保证。例如只在主节点上进行特定类型的读取,而代价则是,应用层代码中处理这些问题通常会非常复杂,且容易出错。

如果应用程序开发人员不必担心这么多底层的复制问题,而是假定数据库在“做正确的事情”,情况就变得很简单。而这也是事务存在的原因,事务是数据库提供更强保证的一种方式。

单节点上支持事务已经非常成熟。然而,在转向分布式数据库(即支持复制和分区)的过程中,有许多系统却选择放弃支持事务,并声称事务在性能与可用性方面代价过高,所以选择了最终一致性。

多主复制

主从复制方法较为常见,但存在一个明显的缺点:系统只有一个主节点,而所有写入都必须经由主节点。如果由于某种原因,例如与主节点之间的网络中断而导致主节点无法连接,主从复制方案就会影响所有的写入操作。

对主从复制模型进行自然的扩展,则可以配置多个主节点,每个主节点都可以接受写操作,后面复制的流程类似:处理写的每个主节点都必须将该数据更改转发到所有其他节点。这就是多主节点( 也称为主-主,或主动/主动)复制。此时,每个主节点还同时扮演其他主节点的从节点。

适用场景

在一个数据中心内部使用多主节点基本没有太大意义,其复杂性已经超过所能带来的好处。

但是,以下场景这种配置则是合理的:

  • 多数据中心
  • 离线客户端操作
  • 协作编辑

多数据中心

为了容忍整个数据中心级别故障或者更接近用户,可以把数据库的副本横跨多个数据中心。而如果使用常规的基于主从的复制模型,主节点势必只能放在其中的某一个数据中心,而所有写请求都必须经过该数据中心。

有了多主节点复制模型,则可以在每个数据中心都配置主节点。在每个数据中心内,采用常规的主从复制方案;而在数据中心之间,由各个数据中心的主节点来负责同其他数据中心的主节点进行数据的交换、更新。

img

部署单主节点的主从复制方案与多主复制方案之间的差异

  • 性能:对于主从复制,每个写请求都必须经由广域网传送至主节点所在的数据中心。这会大大增加写入延迟,井基本偏离了采用多数据中心的初衷(即就近访问)。而在多主节点模型中,每个写操作都可以在本地数据中心快速响应,然后采用异步复制方式将变化同步到其他数据中心。因此,对上层应用有效屏蔽了数据中心之间的网络延迟,使得终端用户所体验到的性能更好。
  • 容忍数据中心失效:对于主从复制,如果主节点所在的数据中心发生故障,必须切换至另一个数据中心,将其中的一个从节点被提升为主节点。在多主节点模型中,每个数据中心则可以独立于其他数据中心继续运行,发生故障的数据中心在恢复之后更新到最新状态。
  • 容忍网络问题:数据中心之间的通信通常经由广域网,它往往不如数据中心内的本地网络可靠。对于主从复制模型,由于写请求是同步操作,对数据中心之间的网络性能和稳定性等更加依赖。多主节点模型则通常采用异步复制,可以更好地容忍此类问题,例如临时网络闪断不会妨碍写请求最终成功。

有些数据库己内嵌支持了多主复制,但有些则借助外部工具来实现,例如 MySQL 的 Tungsten Replicator,PostgreSQL BDR 以及 Oracle GoldenGate。

多主复制的缺点:不同的数据中心可能会同时修改相同的数据,因而必须解决潜在的写冲突。

离线客户端操作

另一种多主复制比较适合的场景是,应用在与网络断开后还需要继续工作。在离线状态下进行的任何更改,会在设备下次上线时,与服务器一级其他设备同步。

这种情况下,每个设备都有一个充当主节点的本地数据库(用来接受写请求),然后在所有设备之间采用异步方式同步这些多主节点上的副本,同步滞后可能是几小时或者数天,具体时间取决于设备何时可以再次联网。

从架构层面来看,上述设置基本上等同于数据中心之间的多主复制,只不过是个极端情况,即一个设备就是数据中心,而且它们之间的网络连接非常不可靠。多个设备同步日历的例子表明,多主节点可以得到想要的结果,但中间过程依然有很多的未知数。

有一些工具可以使多主配置更为容易,如 CouchDB 就是为这种操作模式而设计的。

协作编辑

实时协作编辑应用程序允许多个用户同时编辑文档。例如,Etherpad 和 Google Docs 允许多人同时编辑文本文档或电子表格。

我们通常不会将协作编辑完全等价于数据库复制问题,但二者确实有很多相似之处。当一个用户编辑文档时· ,所做的更改会立即应用到本地副本( Web 浏览器或客户端应用程序),然后异步复制到服务器以及编辑同一文档的其他用户。

如果要确保不会发生编辑冲突,则应用程序必须先将文档锁定,然后才能对其进行编辑。如果另一个用户想要编辑同一个文档,首先必须等到第一个用户提交修改并释放锁。这种协作模式相当于主从复制模型下在主节点上执行事务操作。

为了加快协作编辑的效率,可编辑的粒度需要非常小。例如,单个按键甚至是全程无锁。然而另一方面,也会面临所有多主复制都存在的挑战,即如何解决冲突。

处理写冲突

多主复制的最大问题是可能发生写冲突。

例如,两个用户同时编辑 Wiki 页面,如图所示。用户 1 将页面的标题从 A 更改为 B,与此同时用户 2 将标题从 A 更改为 C。每个用户的更改都成功地提交到本地主节点。但是,当更改被异步复制到对方时,会发现存在冲突。注意:正常情况下的主从复制不会出现这种情况。

img

同步与异步冲突检测

如果是主从复制数据库,第二个写请求要么会被阻塞直到第一个写完成,要么被中止(用户必须重试) 。然而在多主节点的复制模型下,这两个写请求都是成功的,井且只能在稍后的时间点上才能异步检测到冲突,那时再要求用户层来解决冲突为时已晚。

理论上,也可以做到同步冲突检测,即等待写请求完成对所有副本的同步,然后再通知用户写入成功。但是,这样做将会失去多主节点的主要优势:允许每个主节点独立接受写请求。如果确实想要同步方式冲突检测,或许应该考虑采用单主节点的主从复制模型。

避免冲突

处理冲突最理想的策略是避免发生冲突,即如果应用层可以保证对特定记录的写请求总是通过同一个主节点,这样就不会发生写冲突。现实中,由于不少多主节点复制模型所实现的冲突解决方案存在瑕疵,因此,避免冲突反而成为大家普遍推荐的首选方案。

但是,有时可能需要改变事先指定的主节点,例如由于该数据中心发生故障,不得不将流量重新路由到其他数据中心,或者是因为用户已经漫游到另一个位置,因而更靠近新数据中心。此时,冲突避免方式不再有效,必须有措施来处理同时写入冲突的可能性。

收敛于一致状态

对于主从复制模型,数据更新符合顺序性原则,即如果同一个字段有多个更新,则最后一个写操作将决定该字段的最终值。

对于多主节点复制模型,由于不存在这样的写入顺序,所以最终值也会变得不确定。

实现收敛的冲突解决有以下可能的方式:

  • 给每个写入分配唯一的 ID ,例如,一个时间戳,一个足够长的随机数,一个 UUID 或者一个基于键-值的哈希,挑选最高 ID 的写入作为胜利者,并将其他写入丢弃。如果基于时间戳,这种技术被称为最后写入者获胜。虽然这种方法很流行,但是很容易造成数据丢失。
  • 为每个副本分配一个唯一的 ID ,井制定规则,例如序号高的副本写入始终优先于序号低的副本。这种方法也可能会导致数据丢失。
  • 以某种方式将这些值合并在一起。例如,按字母顺序排序,然后拼接在一起。
  • 利用预定义好的格式来记录和保留冲突相关的所有信息,然后依靠应用层的逻辑,事后解决冲突(可能会提示用户) 。

自定义冲突解决逻辑

解决冲突最合适的方式可能还是依靠应用层,所以大多数多主节点复制模型都有工具来让用户编写应用代码来解决冲突。可以在写入时或在读取时执行这些代码逻辑:

  • 在写入时执行:只要数据库系统在复制变更日志时检测到冲突,就会调用应用层的冲突处理程序。
  • 在读取时执行:当检测到冲突时,所有冲突写入值都会暂时保存下来。下一次读取数据时,会将数据的多个版本读返回给应用层。应用层可能会提示用户或自动解决冲突,井将最后的结果返回到数据库。

注意,冲突解决通常用于单个行或文档,而不是整个事务。因此,如果有一个原子事务包含多个不同写请求,每个写请求仍然是分开考虑来解决冲突。

拓扑结构

如果存在两个以上的主节点,则存在多种可能的复制拓扑结构。

img

最常见的拓扑结构是全部-至-全部,每个主节点将其写入同步到其他所有主节点。而其他一些拓扑结构也有普遍使用,例如,默认情况下 MySQL 只支持环形拓扑结构,其中的每个节点接收来自前序节点的写入,并将这些写入(加上自己的写入)转发给后序节点。另一种流行的拓扑是星形结构:一个指定的根节点将写入转发给所有其他节点。星形拓扑还可以推广到树状结构。

在环形和星形拓扑中,写请求需要通过多个节点才能到达所有的副本,即中间节点需要转发从其他节点收到的数据变更。为防止无限循环,每个节点需要赋予一个唯一的标识符,在复制日志中的每个写请求都标记了已通过的节点标识符。如果某个节点收到了包含自身标识符的数据更改,表明该请求已经被处理过,因此会忽略此变更请求,避免重复转发

环形和星形拓扑的问题是,如果某一个节点发生了故障,在修复之前,会影响其他节点之间复制日志的转发。可以采用重新配置拓扑结构的方法暂时排除掉故障节点。在大多数部署中,这种重新配置必须手动完成。而对于链接更密集的拓扑(如全部到全部),消息可以沿着不同的路径传播,避免了单点故障,因而有更好的容错性。

但另一方面,全链接拓扑也存在一些自身的问题。主要是存在某些网络链路比其他链路更快的情况(例如由于不同网络拥塞),从而导致复制日志之间的覆盖。

如下图所示,客户端 A 向主节点 1 的表中首先插入一行,然后客户端 B 在主节点 3 上对行记录进行更新。而在主节点 2 上,由于网络原因可能出现意外的写日志复制顺序,例如它先接收到了主节点 3 的更新日志,之后才接收到主节点 1 的插入日志。

img

这里涉及到一个因果关系问题,类似于在前面“前缀一致读”所看到的:更新操作一定是依赖于先前完成的插入,因此我们要确保所有节点上一定先接收插入日志,然后再处理更新。在每笔写日志里简单地添加时间戳还不够,主要因为无法确保时钟完全同步,因而无法在主节点 2 上正确地排序所收到日志。

为了使得日志消息正确有序,可以使用一种称为版本向量的技术,稍后将讨论这种技术(参见“检测并发写入”)。需要指出,冲突检测技术在许多多主节点复制系统中的实现还不够完善。

无主复制

单主节点和多主节点复制,都是基于这样一种核心思路,即客户端先向某个节点(主节点)发送写请求,然后数据库系统负责将写请求复制到其他副本。由主节点决定写操作的顺序,从节点按照相同的顺序来应用主节点所发送的写日志。

一些数据存储系统则采用了不同的设计思路:选择放弃主节点,允许任何副本直接接受来自客户端的写请求。对于某些无主节点系统实现,客户端直接将其写请求发送到多副本,而在其他一些实现中,由一个协调者节点代表客户端进行写人,但与主节点的数据库不同,协调者井不负责写入顺序的维护。

节点失效时写入数据库

假设一个三副本数据库,其中一个副本当前不可用。在基于主节点复制模型下,如果要继续处理写操作,则需要执行切换操作。

对于无主节点配置,则不存在这样的切换操作。用户将写请求并行发送到三个副本,有两个可用副本接受写请求,而不可用的副本无法处理该写请求。如果假定三个副本中有两个成功确认写操作,用户收到两个确认的回复之后,即可认为写入成功。客户完全可以忽略其中一个副本无法写入的情况。

img

失效的节点之后重新上线,而客户端又开始从中读取内容。由于节点失效期间发生的任何写入在该节点上都尚未同步,因此读取可能会得到过期的数据。

为了解决这个问题,当一个客户端从数据库中读取数据时,它不是向一个副本发送请求,而是并行地发送到多个副本。客户端可能会得到不同节点的不同响应,包括某些节点的新值和某些节点的旧值。可以采用版本号技术确定哪个值更新(参见后面的“检测并发写入”)。

读修复与反熵

复制模型应确保所有数据最终复制到所有的副本。当一个失效的节点重新上线之后,如何赶上中间错过的那些写请求呢?

有以下两种机制:

  • 读修复 - 当客户端井行读取多个副本时,可以检测到过期的返回值。然后将新值写入到过期的副本中。这种方法主要适合那些被频繁读取的场景。
  • 反熵 - 利用后台进程不断查找副本间的数据差异,将任何缺少的数据从一个副本复制到另一个副本。与基于主节点复制的复制日志不同,反熵过程并不保证以特定的顺序复制写入,并且会引入明显的同步滞后。

读写 quorum

我们知道,成功的写操作要求三个副本中至少两个完成,这意味着至多有一个副本可能包含旧值。因此,在读取时需要至少向两个副本发起读请求,通过版本号可以确定一定至少有一个包含新值。如果第三个副本出现停机或响应缓慢,则读取仍可以继续并返回最新值。

把上述道理推广到一般情况,如果有 n 个副本,写入需要 w 个节点确认,读取必须至少查询 r 个节点,则只要 w + r > n,读取的节点中一定会包含最新值。例如在前面的例子中,n = 3,w = 2,r = 2。满足上述这些 r、w 值的读/写操作称之为法定票数读或法定票数写。也可以认为 r 和 w 是用于判定读、写是否有效的最低票数。

参数 n、w 和 r 通常是可配置的,一个常见的选择是设置 n 为某奇数,w = r = (n + 1) / 2(向上舍入)。也可以根据自己的需求灵活调整这些配置。例如,对于读多写少的负载,设置 w = n 和 r = 1 比较合适,这样读取速度更快,但是一个失效的节点就会使得数据库所有写入因无法完成 quorum 而失败。

quorum 一致性的局限性

通常,设定 r 和 w 为简单多数(多于 n / 2)节点,即可确保 w + r > n,且同时容忍多达 n / 2 个节点故障。但是,quorum 不一定非得是多数,读和写的节点集中有一个重叠的节点才是最关键的

也可以将 w 和 r 设置为较小的数字,从而让 w + r <= n。此时,读取和写入操作仍会被发送到 n 个节点,但只需等待更少的节点回应即可返回。

由于 w 和 r 配置的节点数较小,读取请求当中可能恰好没有包含新值的节点,因此最终可能会返回一个过期的旧值。好的一方面是,这种配置可以获得更低的延迟和更高的可用性,例如网络中断,许多副本变得无法访问,相比而言有更高的概率继续处理读取和写入。只有当可用的副本数已经低于 w 或 r 时,数据库才会变得无法读/写,即处于不可用状态。

即使在 w + r > n 的情况下,也可能存在返回旧值的边界条件。这主要取决于具体实现,可能的情况包括:

  • 如果采用了 sloppy quorum(参阅后面的“宽松的 quorum 与数据回传”),写操作的 w 节点和读取的 r 节点可能完全不同,因此无法保证读写请求一定存在重叠的节点。
  • 如果两个写操作同时发生,则无法明确先后顺序。这种情况下,唯一安全的解决方案是合并并发写入(参见前面的“处理写冲突”)。如果根据时间戳挑选胜者,则由于时钟偏差问题,某些写入可能会被错误地抛弃。
  • 如果写操作与读操作同时发生,写操作可能仅在一部分副本上完成。此时,读取时返回旧值还是新值存在不确定性。
  • 如果某些副本上已经写入成功,而其他一些副本发生写入失败(例如磁盘已满),且总的成功副本数少于 w,那些已成功的副本上不会做回滚。这意味着尽管这样的写操作被视为失败,后续的读操作仍可能返回新值。
  • 如果具有新值的节点后来发生失效,但恢复数据来自某个旧值,则总的新值副本数会低于 w,这就打破了之前的判定条件。
  • 即使一切工作正常,也会出现一些边界情况,如一致性与共识中所介绍的“可线性化与 quorum”。

建议最好不要把参数 w 和 r 视为绝对的保证,而是一种灵活可调的读取新值的概率。

这里通常无法得到前面的“复制滞后问题”中所罗列的一致性保证,包括写后读、单调读、前缀一致读等,因此前面讨论种种异常同样会发生在这里。如果确实需要更强的保证,需要考虑事务与共识问题。

宽松的 quorum 与数据回传

quorum 并不总如期待的那样提供高容错能力。一个网络中断可以很容易切断一个客户端到多数数据库节点的连接。尽管这些集群节点是活着的,而且其他客户端也确实可以正常连接,但是对于断掉连接的客户端来讲,情况无疑等价于集群整体失效。这种情况下,很可能无法满足最低的 w 和 r 所要求的节点数,因此导致客户端无法满足 quorum 要求。

在一个大规模集群中(节点数远大于 n 个),客户可能在网络中断期间还能连接到某些数据库节点,但这些节点又不是能够满足数据仲裁的那些节点。此时,我们是否应该接受该写请求,只是将它们暂时写入一些可访问的节点中?(这些节点并不在 n 个节点集合中)。

这种方案称之为宽松的仲裁:写入和读取仍然需要 w 和 r 个成功的响应,但包含了那些并不在先前指定的 n 个节点。一旦网络问题得到解决,临时节点需要把接收到的写入全部发送到原始主节点上。这就是所谓的数据回传。

可以看出,sloppy quorum 对于提高写入可用性特别有用:要有任何 w 个节点可用,数据库就可以接受新的写入。然而这意味着,即使满足 w + r > n,也不能保证在读取某个键时,一定能读到最新值,因为新值可能被临时写入 n 之外的某些节点且尚未回传过来。

检测并发写

无主复制数据库允许多个客户端对相同的主键同时发起写操作,即使采用严格的 quorum 机制也可能会发生写冲突。这与多主复制类似,此外,由于读时修复或者数据回传也会导致并发写冲突。

一个核心问题是,由于网络延迟不稳定或者局部失效,请求在不同的节点上可能会呈现不同的顺序。如图所示,对于包含三个节点的数据系统,客户端 A 和 B 同时向主键 X 发起写请求:

img

  • 节点 1 收到来自客户端 A 的写请求,但由于节点失效,没有收到客户端 B 的写请求。
  • 节点 2 首先收到 A 的写请求,然后是 B 的写请求。
  • 节点 3 首先收到 B 的写请求,然后是 A 的写请求。

如果节点每当收到新的写请求时就简单地覆盖原有的主键,那么这些节点将永久无法达成一致。我们知道副本应该收敛于相同的内容,这样才能达成最终一致。但如何才能做到呢?如果不想丢失数据,必须了解很多关于数据库内部冲突处理的机制。

我们已经在前面的“处理写冲突”简要介绍了一些解决冲突的技巧,现在我们来更详细地探讨这个问题。

最后写入者获胜(丢弃并发写入)

一种实现最终收敛的方法是,每个副本总是保存最新值,允许覆盖并丢弃旧值。那么,假定每个写请求都最终同步到所有副本,只要我们有一个明确的方法来确定哪一个写入是最新的,则副本可以最终收敛到相同的值。

这个想法其实有些争议,关键点在于前面所提到关于如何定义“最新”。不过即使无法确定写请求的“自然顺序”,我们可以强制对其排序。例如,为每个写请求附加一个时间戳,然后选择最新即最大的时间戳,丢弃较早时间戳的写入。这个冲突解决算法被称为最后写入者获胜(last write wins,LWW)。

LWW 可以实现最终收敛的目标,但是以牺牲数据持久性为代价。如果同一个主键有多个并发写,即使这些并发写都向客户端报告成功,但最后只有一个写入值会存活下来,其他的将被系统默默丢弃。在一些场景如缓存系统,覆盖写是可以接受的。如果覆盖、丢失数据不可接受,则 LWW 并不是解决冲突很好的选择。

要确保 LWW 安全无副作用的唯一方法是,只写入一次然后写入值视为不可变,这样就避免了对同一个主键的并发写。例如,Cassandra 的一个推荐使用方法就是采用 UUID 作为主键,这样每个写操作都针对的不同的、系统唯一的主键。

Happens-before 关系和并发

如果 B 知道 A,或者依赖于 A,或者以某种方式在 A 基础上构建,则称操作 A 在操作 B 之前发生。这是定义何为并发的关键。事实上,我们也可以简单地说,如果两个操作都不在另一个之前发生,那么操作是并发的。

因此,对于两个操作 A 和 B,一共存在三种可能性,我们需要的是一个算法来判定两个操作是否并发。如果一个操作发生在另一个操作之前,则后面的操作可以覆盖较早的操作。如果属于并发,就需要解决潜在的冲突问题。

确定前后关系

我们来看一个确定操作并发性的算法,即两个操作究竟属于并发还是一个发生在另一个之前。简单起见,我们先从只有一个副本的数据库开始。

下图的例子是两个客户端同时向购物车添加商品。初始时购物车为空。然后两个客户端向数据库共发出五次写入操作:

img

  1. 客户端 1 首先将牛奶加入购物车。这是第一次写入该主键的值,服务器保存成功然后分配版本 1,服务器将值与版本号一起返回给该客户端 1。
  2. 客户端 2 将鸡蛋加入购物车,此时它并不知道客户端 1 已添加了牛奶,而是认为鸡蛋是购物车中的唯一物品。服务器为此写入并分配版本 2,然后将鸡蛋和牛奶存储为两个单独的值,最后将这两个值与版本号 2 返回给客户端 2。
  3. 客户端 1 也并不意识上述步骤 2,想要将面粉加入购物车,且以为购物车的内容应该是[牛奶,面粉],将此值与版本号 1 一起发送到服务器。服务器可以从版本号中知道[牛奶,面粉]的新值要取代先前值[牛奶],但值[鸡蛋]则是新的并发操作。因此,服务器将版本 3 分配给[牛奶,面粉]并覆盖版本 1 的[牛奶],同时保留版本 2 的值[鸡蛋],将二者同时返回给客户端 1。
  4. 客户端 2 想要加入火腿,也不知道客户端 1 刚刚加了面粉。其在最后一个响应中从服务器收到的两个值是[牛奶]和[蛋],现在合并这些值,并添加火腿形成一个新的值[鸡蛋,牛奶,火腿]。它将该值与前一个版本号 2 一起发送到服务器。服务器检测到版本 2 会覆盖[鸡蛋],但与[牛奶,面粉]是同时发生,所以设置为版本 4 并将所有这些值发送给客户端 2。
  5. 最后,客户端 1 想要加培根。它以前在版本 3 中从服务器接收[牛奶,面粉]和[鸡蛋],所以合并这些值,添加培根,并将最终值[牛奶,面粉,鸡蛋,培根]连同版本号 3 来覆盖[牛奶,面粉],但与[鸡蛋,牛奶,火腿]并发,所以服务器会保留这些并发值。

上面操作之间的数据流可以通过下图展示。箭头表示某个操作发生在另一个操作之前,即后面的操作“知道”或是“依赖”于前面的操作。在这个例子中,因为总有另一个操作同时进行,所以每个客户端都没有时时刻刻和服务器上的数据保持同步。但是,新版本值最终会覆盖旧值,且不会发生已写入值的丢失。

img

服务器判断操作是否并发的依据主要依靠对比版本号,而并不需要解释新旧值本身。算法的工作流程如下:

  • 服务器为每个主键维护一个版本号,每当主键新值写入时递增版本号,并将新版本号与写入的值一起保存。
  • 当客户端读取主键时,服务器将返回所有(未被覆盖的)当前值以及最新的版本号。且要求写之前,客户必须先发送读请求。
  • 客户端写主键,写请求必须包含之前读到的版本号、读到的值和新值合并后的集合。写请求的响应可以像读操作一样,会返回所有当前值,这样就可以像购物车例子那样一步步链接起多个写入的值。
  • 当服务器收到带有特定版本号的写入时,覆盖该版本号或更低版本的所有值(因为知道这些值已经被合并到新传入的值集合中),但必须保存更高版本号的所有值(因为这些值与当前的写操作属于并发)。

合并同时写入的值

一个简单的合并方法是基于版本号或时间戳来选择最后一个值,但这意味着会丢失部分数据。所以,需要在程序中额外做一些工作。在应用代码中合并非常复杂且容易出错,因此可以设计一些专门的数据结构来自动执行合并。例如,Riak 支持成为 CRDT 一系列数据结构,以合理的方式高效自动合并,包括支持删除标记。

版本矢量

使用单个版本号来捕获操作间的依赖关系,当多个副本同时接受写入时,这是不够的。因此我们需要为每个副本和每个主键均定义一个版本号。每个副本在处理写入时增加自己的版本号,并且跟踪从其他副本看到的版本号。通过这些信息来指示要覆盖哪些值,该保留哪些并发值。

所有副本的版本号集合成为版本矢量。

参考资料

DevOps 简介

什么是 DevOps

什么是 DevOps?DevOps 集文化理念、实践和工具于一身,它强调团队授权、跨团队沟通和协作以及技术自动化,其最终目标是优化质量和交付

DevOps 理念,旨在打破开发工程师和运维工程师的壁垒,强调两个团队合而为一,在产品的整个生命周期(从开发、测试、部署再到运维、运营)内相互协作,工程师不再限于单一职能。

DevOps 始于 2007 年左右,当时的开发和运维对传统的软件开发模式提出了担忧:在这种模式下,编写代码的开发人员与负责部署的运维人员分开工作。 DevOps 一词是开发(development)和运维(operations)这两个词的组合,反映了将二者合而为一的过程。

DevOps 如何工作

DevOps 团队包括在整个产品生命周期中协同工作的开发人员和运维人员,以提高软件部署的速度和质量。这是一种新的工作方式,一种文化转变,对团队及其工作的组织具有重要意义。

在 DevOps 模型下,开发和运维团队不再“孤立”。有时,这两个团队甚至会合并为一个团队,工程师在整个应用程序生命周期中工作,需要具备从开发、测试到部署和运维的复合型能力。

DevOps 团队使用工具来自动化和优化流程,这有助于提高可靠性。 DevOps 工具链可帮助团队处理重要的 DevOps 基础知识,包括持续集成、持续交付、自动化和协作。

DevOps 价值观也适用于开发以外的团队。如果 QA、安全团队也和开发、运维团队紧密地结合在一起,贯穿产品的整个生命周期。此时,安全成为了所有 DevOps 团队成员的工作重点,此时可以称为为 “DevSecOps”。

DevOps 的生命周期

由于 DevOps 的连续性,可以使用无限循环来展示 DevOps 生命周期的各个阶段是如何相互关联的。尽管看起来是按顺序流动的,但循环象征着在整个生命周期中始终保持持续迭代。

DevOps 生命周期由六个阶段组成,分别代表开发和运维所需的流程、功能和工具。在每个阶段,团队协作和沟通以保持一致性、速度和质量。

img

图片来自 https://www.tasksgrid.com/devops-guide/

DevOps 的优势

  • 速度:应用 DevOps 可以更频繁地发布可交付成果,并且质量和稳定性也更高。高效的迭代,可以根据客户和市场反馈进行快速响应,以适应市场变化,有效推动业务发展。
  • 促进协作:DevOps 的基础是开发和运维之间的协作文化,两个团队紧密协作,共同承担诸多责任,并将各自的工作流程相互融合。这有助于减少效率低下的工作,同时节约大家的时间。
  • 快速发布:提高发布的频率和速度,以便能够更快速地进行创新并完善产品。您发布新功能和修复错误的速度越快,就越能快速地响应客户需求并建立竞争优势。持续集成和持续交付是自动执行软件发布流程(从构建到部署)的两项实践经验。
  • 可靠性:持续集成和持续部署等实践可检验程序变更后,功能是否正常,是否安全,从而提高软件产品的交付质量。监控和日志记录可以帮助团队实时了解服务当前的运行状态。
  • 规模:大规模运行和管理您的基础设施及开发流程。自动化和一致性可在降低风险的同时,帮助您有效管理复杂或不断变化的系统。例如,基础设施即代码能够帮助您以一种可重复且更有效的方式来管理部署、测试和生产环境。
  • 安全性:通过将自动实施的合规性策略、精细控制和配置管理技术集成到敏捷开发和 DevOps 工作流程中,使得产品内置了安全性。

DevOps 工具

DevOps 各生命周期阶段都有合适的工具可以作为解决方案。它们通过提高协作效率、减少上下文切换、引入自动化以及实现可监控来全方位增强 DevOps 实践。

DevOps 工具链通常遵循两种模式:完整解决方案或开放式工具链。

  • 完整解决方案实现了端到端的交付,流程很完备,但是一般难以兼容、集成第三方工具。
  • 开放式工具链允许使用不同的工具进行定制。

这两种方法各有利弊。

这里列举一些常见的 DevOps 工具:

  • 项目管理Jira
  • 文档管理Confluence
  • 代码管理GitlabGithub
  • CI/CDGitlabJenkins
  • 容器
    • Docker 将应用程序与该程序的依赖,打包在一个文件里面。运行这个文件,就会生成一个虚拟容器。程序在这个虚拟容器里运行,就好像在真实的物理机上运行一样。有了 Docker,就不用担心环境问题。
    • Kubernetes 是谷歌开源的容器集群管理系统 是用于自动部署,扩展和管理 Docker 应用程序的开源系统,简称 K8S。
  • 日志
    • ELK 技术栈,通过数据采集工具 Logstack、Beats 套件、日志存储、解析服务 ElasticSearch、日志可视化工具 Kibnana,形成了一套完整的端到端日志解决方案,深受业界好评。
  • 监控
    • ELK 的技术栈比较成熟,应用范围也比较广,除了可用作监控系统外,还可以用作日志查询和分析。
    • Prometheus 的独特之处在于它采用了拉数据的方式,对业务影响较小,同时也采用了时间序列数据库存储,而且支持独有的 PromQL 查询语言,功能强大而且简洁。
    • Grafana 是流行的监控数据分析和可视化套件。
    • Graphite 是基于时间序列数据库存储的监控系统,并且提供了功能强大的各种聚合函数比如 sum、average、top5 等可用于监控分析,而且对外提供了 API 也可以接入其他图形化监控系统如 Grafana。
  • 链路追踪
    • Zipkin:Zipkin 是 Twitter 开源的调用链分析工具,目前基于 spring-cloud-sleuth 得到了广泛的使用,特点是轻量,使用、部署简单。
    • Pinpoint:是韩国人开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI 功能强大,接入端无代码侵入。
    • SkyWalking:是本土开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI 功能较强,接入端无代码侵入。目前已加入 Apache 孵化器。
    • CAT:CAT 是美团点评开源的基于编码和配置的调用链分析,应用监控分析,日志采集,监控报警等一系列的监控平台工具。
  • 负载均衡
    • Nginx 可以作为四层或七层负载均衡器。
    • LVS 可以作为四层负载均衡器。其负载均衡的性能要优于 Nginx。
    • HAProxy 可以作为 HTTP 和 TCP 负载均衡器。
    • F5 作为硬件负载均衡
    • A10 作为硬件负载均衡
  • 网关
    • Kong 是一个云原生、快速、可扩展和分布式的微服务抽象层(也称为 API 网关,API 中间件)。
    • Zuul 是 Netflix 开源的一个 API 网关,Zuul 在云平台上提供动态路由,监控,弹性,安全等边缘服务的框架。
  • 告警:短信、邮件、企业聊天软件、OA

参考资料

《消息队列高手课》笔记

为什么需要消息队列?

消息队列的应用

  • 异步处理
  • 系统解耦
  • 流量削峰
  • 系统间通信
  • 数据缓冲
  • 数据一致性

该如何选择消息队列?

  • 是否开源:这决定了能否商用,所以最为重要。
  • 社区活跃度越高越好:高社区活跃度,一般保证了低 Bug 率,因为大部分 Bug,已经有人遇到并解决了。
  • 技术生态适配性:客户端对各种编程语言的支持。比如:如果使用 MQ 的都是 Java 应用,那么 ActiveMQ、RabbitMQ、RocketMQ、Kafka 都可以。如果需要支持其他语言,那么 RMQ 比较合适,因为它支持的编程语言比较丰富。如果 MQ 是应用于大数据或流式计算,那么 Kafka 几乎是标配。如果是应用于在线业务系统,那么 Kafka 就不合适了,可以考虑 RabbitMQ、 RocketMQ 很合适。
  • 高可用:应用于线上的准入标准。
  • 高性能:具备足够好的性能,能满足绝大多数场景的性能要求。
  • 可靠传输

主流 MQ

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级 万级 十万级 十万级,略高于 RocketMQ
topic 数量对吞吐量的影响 topic 可以达到几百、几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性 毫秒级 微秒级 毫秒级 毫秒级以内
可用性 高:基于主从架构实现高可用 同 ActiveMQ 非常高:分布式架构 非常高:分布式架构。每个数据都有多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到不丢失 同 RocketMQ
应用场景 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 适合在线业务 大数据、实时计算以及日志采集领域,应用最为广泛
流行度 不活跃 社区非常活跃 阿里出品,有非常活跃的中文社区 社区非常活跃
支持编程语言 非常多 Java Scala、Java
学习成本 采用 ErLang 开发,比较小众,不利于扩展和二次开发 采用 Java 开发,且贡献者多为中国人,容易读懂源码 使用 Scala 和 Java 开发,容易读懂源码
  • RabbitMQ
    • 优点
      • 支持的编程语言最多
      • 支持非常灵活的路由配置
    • 缺点
      • 对消息堆积的支持并不好
      • 性能差强人意
  • RocketMQ
    • 优点
      • 有着不错的性能,稳定性和可靠性
      • 支持事务
    • 缺点
      • 国外认同弱于其他流行 MQ
  • Kafka
    • 优点
      • 可靠、稳定、性能高
      • 技术生态最健全,尤其是在大数据和流计算领域
    • 缺点
      • 同步收发响应延时比较高,不太适合在线业务

消息模型:主题和队列有什么区别?

队列模型

队列是先进先出(FIFO, First-In-First-Out)的线性表(Linear List)。在具体应用中通常用链表或者数组来实现。队列只允许在后端(称为 rear)进行插入操作,在前端(称为 front)进行删除操作。

早期的消息队列,就是按照“队列”的数据结构来设计的。生产者(Producer)发消息就是入队操作,消费者(Consumer)收消息就是出队也就是删除操作,服务端存放消息的容器自然就称为“队列”。

如果有多个生产者往同一个队列里面发送消息,这个队列中可以消费到的消息,就是这些生产者生产的所有消息的合集。消息的顺序就是这些生产者发送消息的自然顺序。如果有多个消费者接收同一个队列的消息,这些消费者之间实际上是竞争的关系,每个消费者只能收到队列中的一部分消息,也就是说任何一条消息只能被其中的一个消费者收到。

如果需要将一份消息数据分发给多个消费者,要求每个消费者都能收到全量的消息。此时,单个队列就满足不了需求,一个可行的解决方式是,为每个消费者创建一个单独的队列,让生产者发送多份。显然这是个比较蠢的做法,同样的一份消息数据被复制到多个队列中会浪费资源,更重要的是,生产者必须知道有多少个消费者。为每个消费者单独发送一份消息,这实际上违背了消息队列“解耦”这个设计初衷。

发布 订阅模型(Publish-Subscribe Pattern)

在发布 - 订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。

队列模型和发布订阅模型最大的区别就是:一份消息数据能不能被消费多次的问题

RabbitMQ 的消息模型

在 RabbitMQ 中,Exchange 位于生产者和队列之间,生产者并不关心将消息发送给哪个队列,而是将消息发送给 Exchange,由 Exchange 上配置的策略来决定将消息投递到哪些队列中。

img

同一份消息如果需要被多个消费者来消费,需要配置 Exchange 将消息发送到多个队列,每个队列中都存放一份完整的消息数据,可以为一个消费者提供消费服务。这也可以变相地实现新发布 - 订阅模型中,“一份消息数据可以被多个订阅者来多次消费”这样的功能。

RocketMQ 的消息模型

RocketMQ 使用的消息模型是标准的发布 - 订阅模型。但是,在 RocketMQ 也有队列(Queue)这个概念。每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。需要注意的是,RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的。

在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要 RocketMQ 为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。这个消费位置是非常重要的概念,我们在使用消息队列的时候,丢消息的原因大多是由于消费位置处理不当导致的。

Kafka 的消息模型和 RocketMQ 是完全一样的。只是在 Kafka 中,将 Queue 这个概念称为分区(Partition)

如何利用事务消息实现分布式事务?

事务消息需要消息队列提供相应的功能才能实现,Kafka 和 RocketMQ 都提供了事务相关功能。

Kafka 的解决方案是:直接抛出异常,让用户自行处理。用户可以在业务代码中反复重试提交,直到提交成功,或者删除之前修改的数据记录进行事务补偿。

img

RocketMQ 的解决方案是:通过事务反查机制来解决事务消息提交失败的问题。如果 Producer 在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。为了支撑这个事务反查机制,业务代码需要实现一个反查本地事务状态的接口,告知 RocketMQ 本地事务是成功还是失败。

img

MQ 事务方案总结

相比本地消息表方案,MQ 事务方案优点是:

  • 消息数据独立存储 ,降低业务系统与消息系统之间的耦合。
  • 吞吐量优于使用本地消息表方案。

缺点是:

  • 一次消息发送需要两次网络请求 (half 消息 + commit/rollback 消息)
  • 业务处理服务需要实现消息状态回查接口

如何确保消息不会丢失?

检测消息丢失方法:

利用消息队列的有序性来验证是否有消息丢失:在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。

确保消息不丢失:

  • 生产阶段:捕获消息发送的错误,并针对性进行容错处理。
  • 存储阶段:数据必须设置副本,并且写数据需要保证所有副本都写入成功才视为提交成功。这样可以保证,即使主副本不可用,使用从副本替代,也包含最新数据。
  • 消费阶段:所有数据处理完毕,再手动提交消费偏移量。

如何处理消费过程中的重复消息?

在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

  • At most once:- 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
  • At least once:- 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
  • Exactly once - 恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

现在常用的绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。

一般解决重复消息的办法是,在消费端,让我们消费消息的操作具备幂等性。一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。如果我们系统消费消息的业务逻辑具备幂等性,那就不用担心消息重复的问题了,因为同一条消息,消费一次和消费多次对系统的影响是完全一样的。也就可以认为,消费多次等于消费一次。

从对系统的影响结果来说:At least once + 幂等消费 = Exactly once。

常用的设计幂等操作的方法:

  1. 利用数据库的唯一约束实现幂等:INSERT IF NOT EXIST
  2. 为更新的数据设置前置条件:设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。例如:采用乐观锁方式,为数据增加版本号,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。
  3. 记录并检查操作:在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。——此处涉及分布式 ID 知识点,可以使用类似 GUID、雪花算法 等方式来实现

消息积压了该如何处理?

发送端性能优化

发送消息的性能上不去,你需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的。对于发送消息的业务逻辑,只需要注意设置合适的并发和批量大小,就可以达到很好的发送性能。

消费端性能优化

如果消费的速度跟不上生产消息的速度,就会造成消息积压。即供大于求。

一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。

消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。特别需要注意的一点是,在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。

消息积压的处理

需要先分析消息积压的原因:是发送变快了,还是消费变慢了。大部分消息队列都内置了监控的功能,只要通过监控数据,很容易确定是哪种原因。

  • 如果是因为促销或抢购等原因,导致消息陡增,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例数来提升总体的消费能力。

  • 如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。

  • 如果监控到消费变慢了,你需要检查你的消费实例,分析一下是什么原因导致消费变慢。优先检查一下日志是否有大量的消费错误,如果没有错误的话,可以通过打印堆栈信息,看一下你的消费线程是不是卡在什么地方不动了,比如触发了死锁或者卡在等待某些资源上了。

学习开源代码该如何入手?

(1)先看官方文档,了解关键点:

  • 这个项目是什么
  • 这个项目有什么用
  • 这个项目如何使用
  • 这个项目适用于什么场景
  • 这个项目有哪些优点、缺点

(2)由点及面的阅读源码

不要泛泛而读,容易迷失。最好带着目的性,带着问题去阅读源码,最好是带着问题的答案去读源码。

如何使用异步设计提升系统性能?

异步编程,可以减少或者避免线程等待,从而提高处理速度。但是,其增加了程序复杂度,应酌情使用。

Java 中比较常用的异步框架有 Java8 内置的 CompletableFuture 和 ReactiveX 的 RxJava

如何实现高性能的异步网络传输?

系统一般可以分为 IO 密集型应用和计算密集型应用。大多数业务系统都属于 IO 密集型应用。最常用的 IO 资源有磁盘 IO 和带宽 IO。由于 IO 相较于内存计算,耗时较高,所以往往成为性能优化的关键。

提升 IO 效率的关键在于减少 IO 等待时间,在大量连接请求的时候,如果单线程,显然阻塞时间较长,所以,一般应采用并发 IO 模型。但是,线程数过多时,线程本身造成的 CPU 上下文切换,竞态造成的冲突都会造成额外的开销,导致 CPU 负载升高,从而降低系统整体性能。所以,理想的 IO 模型应该是一个能够复用少量线程的并发 IO 模型。这个模型的当前答案就是 NIO,其最具代表性的框架就是 Netty。其核心原理就是通过多路复用,来提升 IO 效率。

序列化与反序列化:如何通过网络传输结构化的数据?

传输协议:应用程序之间对话的语言

传输协议的目的,在于定义一种信息规则,使得收发双方能够互相交流。传输协议并没有什么必须遵循的规范,能满足需要即可。复杂的协议可以如网络协议报文一样,定义为 TLV 结构。

内存管理:如何避免内存溢出和频繁的垃圾回收?

Kafka 如何实现高性能 IO?

使用批量消息提升服务端处理能力

使用顺序读写提升磁盘 IO 性能

操作系统每次从磁盘读写数据的时候,需要先寻址,也就是先要找到数据在磁盘上的物理位置,然后再进行数据读写。如果是机械硬盘,这个寻址需要比较长的时间,因为它要移动磁头,这是个机械运动,机械硬盘工作的时候会发出咔咔的声音,就是移动磁头发出的声音。

顺序读写相比随机读写省去了大部分的寻址时间,它只要寻址一次,就可以连续地读写下去,所以说,性能要比随机读写要好很多。

利用 PageCache 加速消息读写

在 Kafka 中,它会利用 PageCache 加速消息读写。

  • PageCache 就是操作系统在内存中给磁盘上的文件建立的缓存。调用系统的 API 读写文件的时候,不会直接去读写磁盘上的文件,应用程序实际操作的都是 PageCache,也就是文件在内存中缓存的副本。
  • 应用程序在写入文件的时候,操作系统会先把数据写入到内存中的 PageCache,然后再一批一批地写到磁盘上。读取文件的时候,也是从 PageCache 中来读取数据,这时候会出现两种可能情况。一种是 PageCache 中有数据,那就直接读取,这样就节省了从磁盘上读取数据的时间;另一种情况是,PageCache 中没有数据,这时候操作系统会引发一个缺页中断,应用程序的读取线程会被阻塞,操作系统把数据从文件中复制到 PageCache 中,然后应用程序再从 PageCache 中继续把数据读出来,这时会真正读一次磁盘上的文件,这个读的过程就会比较慢。
  • 用户的应用程序在使用完某块 PageCache 后,操作系统并不会立刻就清除这个 PageCache,而是尽可能地利用空闲的物理内存保存这些 PageCache,除非系统内存不够用,操作系统才会清理掉一部分 PageCache。清理的策略一般是 LRU 或它的变种算法,这个算法我们不展开讲,它保留 PageCache 的逻辑是:优先保留最近一段时间最常使用的那些 PageCache。
  • Kafka 在读写消息文件的时候,充分利用了 PageCache 的特性。一般来说,消息刚刚写入到服务端就会被消费,按照 LRU 的“优先清除最近最少使用的页”这种策略,读取的时候,对于这种刚刚写入的 PageCache,命中的几率会非常高。也就是说,大部分情况下,消费读消息都会命中 PageCache,带来的好处有两个:一个是读取的速度会非常快,另外一个是,给写入消息让出磁盘的 IO 资源,间接也提升了写入的性能。

零拷贝技术

在服务端,处理消费的大致逻辑是这样的:

  • 首先,从文件中找到消息数据,读到内存中;
  • 然后,把消息通过网络发给客户端。

这个过程中,数据实际上做了 2 次或者 3 次复制:

  1. 从文件复制数据到 PageCache 中,如果命中 PageCache,这一步可以省掉;
  2. 从 PageCache 复制到应用程序的内存空间中,也就是我们可以操作的对象所在的内存;
  3. 从应用程序的内存空间复制到 Socket 的缓冲区,这个过程就是我们调用网络应用框架的 API 发送数据的过程。

Kafka 使用零拷贝技术可以把这个复制次数减少一次,上面的 2、3 步骤两次复制合并成一次复制。直接从 PageCache 中把数据复制到 Socket 缓冲区中,这样不仅减少一次数据复制,更重要的是,由于不用把数据复制到用户内存空间,DMA 控制器可以直接完成数据复制,不需要 CPU 参与,速度更快。

零拷贝操作,实际上是调用系统 API sendfile 实现的。

缓存策略:如何使用缓存来减少磁盘 IO?

如何正确使用锁保护共享数据,协调异步线程?

如何用硬件同步原语(CAS)替代锁?

数据压缩:时间换空间的游戏

数据压缩不仅能节省存储空间,还可以用于提升网络传输性能。

压缩和解压的操作都是计算密集型的操作,非常耗费 CPU 资源。如果你的应用处理业务逻辑就需要耗费大量的 CPU 资源,就不太适合再进行压缩和解压。数据压缩本质上是用时间换空间。这个买卖是不是划算,需要根据实际情况先衡量一下。

目前常用的压缩算法包括:ZIP,GZIP,SNAPPY,LZ4 等等。在选择压缩算法的时候,需要综合考虑压缩时间和压缩率两个因素,被压缩数据的内容也是影响压缩时间和压缩率的重要因素,必要的时候可以先用业务数据做一个压缩测试,这样有助于选择最合适的压缩算法。一般来说,压缩率越高的算法,压缩耗时也越高。如果是对性能要求高的系统,可以选择压缩速度快的算法,比如 LZ4;如果需要更高的压缩比,可以考虑 GZIP 或者压缩率更高的 XZ 等算法。

另外一个影响压缩率的重要因素是压缩分段的大小,你需要根据业务情况选择一个合适的分段策略,在保证不错的压缩率的前提下,尽量减少解压浪费。

Kafka 在生产者上,对每批消息进行压缩,批消息在服务端不解压,消费者在收到消息之后再进行解压。简单地说,Kafka 的压缩和解压都是在客户端完成的

RocketMQ Producer 源码分析:消息生产的实现过程

Producer 中包含的几个核心的服务都是有状态的,在 Producer 启动时,在 MQClientInstance 这个类中来统一来启动。在发送消息的流程中,RocketMQ 分了三种发送方式:单向、同步和异步,这三种发送方式对应的发送流程基本是相同的,同步和异步发送是由已经封装好的 MQClientAPIImpl 类来分别实现的。

对于我们在分析代码中提到的几个重要的业务逻辑实现类,你最好能记住这几个类和它的功能,包括 :DefaultMQProducerImpl 封装了大部分 Producer 的业务逻辑,MQClientInstance 封装了客户端一些通用的业务逻辑,MQClientAPIImpl 封装了客户端与服务端的 RPC,NettyRemotingClient 实现了底层网络通信。

Kafka Consumer 源码分析:消息消费的实现过程

Kafka 消费模型的几个要点:

  • Kafka 的每个 Consumer(消费者)实例属于一个 ConsumerGroup(消费组);
  • 在消费时,ConsumerGroup 中的每个 Consumer 独占一个或多个 Partition(分区);
  • 对于每个 ConsumerGroup,在任意时刻,每个 Partition 至多有 1 个 Consumer 在消费;
  • 每个 ConsumerGroup 都有一个 Coordinator(协调者)负责分配 Consumer 和 Partition 的对应关系,当 Partition 或是 Consumer 发生变更是,会触发 reblance(重新分配)过程,重新分配 Consumer 与 Partition 的对应关系;
  • Consumer 维护与 Coordinator 之间的心跳,这样 Coordinator 就能感知到 Consumer 的状态,在 Consumer 故障的时候及时触发 rebalance。

发送请求时,构建 Request 对象,暂存入发送队列,但不立即发送,而是等待合适的时机批量发送。并且,用回调或者 RequestFeuture 方式,预先定义好如何处理响应的逻辑。在收到 Broker 返回的响应之后,也不会立即处理,而是暂存在队列中,择机处理。那这个择机策略就比较复杂了,有可能是需要读取响应的时候,也有可能是缓冲区满了或是时间到了,都有可能触发一次真正的网络请求,也就是在 poll() 方法中发送所有待发送 Request 并处理所有 Response。

Kafka 和 RocketMQ 的消息复制实现的差异点在哪?

如果要确保数据一致性,必须采用“主 - 从”的复制方式。

在“主 - 从”模式下,数据先写入到主节点上,从节点只从主节点上复制数据,如果出现主从数据不一致的情况,必须以主节点上的数据为准。

RocketMQ 如何实现复制

在 RocketMQ 中,复制的基本单位是 Broker,也就是服务端的进程。复制采用的也是主从方式,通常情况下配置成一主一从,也可以支持一主多从。

RocketMQ 提供新、老两种复制方式:传统的主从模式和新的基于 Dledger 的复制方式。传统的主从模式性能更好,但灵活性和可用性稍差,而基于 Dledger 的复制方式,在 Broker 故障的时候可以自动选举出新节点,可用性更好,性能稍差,并且资源利用率更低一些。

RocketMQ 引入 Dledger,通过 Dledger 来完成复制。Dledger 在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客户端返回写入成功,并且它是支持通过选举来动态切换主节点的。

Kafka 是如何实现复制的

Kafka 中,复制的基本单位是分区。每个分区的几个副本之间,构成一个小的复制集群,Broker 只是这些分区副本的容器,所以 Kafka 的 Broker 是不分主从的。

分区的多个副本中也是采用一主多从的方式。Kafka 在写入消息的时候,采用的也是异步复制的方式。消息在写入到主节点之后,并不会马上返回写入成功,而是等待足够多的节点都复制成功后再返回。Kafka 为这个“足够多”创造了一个专有名词:ISR(In Sync Replicas),翻译过来就是“保持数据同步的副本”。ISR 的数量是可配的,但需要注意的是,这个 ISR 中是包含主节点的。

Kafka 使用 ZooKeeper 来监控每个分区的多个节点,如果发现某个分区的主节点宕机了,Kafka 会利用 ZooKeeper 来选出一个新的主节点,这样解决了可用性的问题。选举的时候,会从所有 ISR 节点中来选新的主节点,这样可以保证数据一致性。

RocketMQ 客户端如何在集群中找到正确的节点?

任何一个弹性分布式集群,都需要一个类似于 NameServer 服务,来帮助访问集群的客户端寻找集群中的节点。

img

在 RocketMQ 中,NameServer 是一个独立的进程,为 Broker、生产者和消费者提供服务。NameServer 最主要的功能就是,为客户端提供寻址服务,协助客户端找到主题对应的 Broker 地址。此外,NameServer 还负责监控每个 Broker 的存活状态。

NameServer 支持只部署一个节点,也支持部署多个节点组成一个集群,这样可以避免单点故障。在集群模式下,NameServer 各节点之间是不需要任何通信的,也不会通过任何方式互相感知,每个节点都可以独立提供全部服务。

每个 Broker 都需要和所有的 NameServer 节点进行通信。当 Broker 保存的 Topic 信息发生变化的时候,它会主动通知所有的 NameServer 更新路由信息,为了保证数据一致性,Broker 还会定时给所有的 NameServer 节点上报路由信息。这个上报路由信息的 RPC 请求,也同时起到 Broker 与 NameServer 之间的心跳作用,NameServer 依靠这个心跳来确定 Broker 的健康状态。

因为每个 NameServer 节点都可以独立提供完整的服务,所以,对于客户端来说,包括生产者和消费者,只需要选择任意一个 NameServer 节点来查询路由信息就可以了。客户端在生产或消费某个主题的消息之前,会先从 NameServer 上查询这个主题的路由信息,然后根据路由信息获取到当前主题和队列对应的 Broker 物理地址,再连接到 Broker 节点上进行生产或消费。

如果 NameServer 检测到与 Broker 的连接中断了,NameServer 会认为这个 Broker 不再能提供服务。NameServer 会立即把这个 Broker 从路由信息中移除掉,避免客户端连接到一个不可用的 Broker 上去。而客户端在与 Broker 通信失败之后,会重新去 NameServer 上拉取路由信息,然后连接到其他 Broker 上继续生产或消费消息,这样就实现了自动切换失效 Broker 的功能。

NameServer 的总体结构

  • NamesrvStartup:程序入口。
  • NamesrvController:NameServer 的总控制器,负责所有服务的生命周期管理。
  • RouteInfoManager:NameServer 最核心的实现类,负责保存和管理集群路由信息。
  • BrokerHousekeepingService:监控 Broker 连接状态的代理类。
  • DefaultRequestProcessor:负责处理客户端和 Broker 发送过来的 RPC 请求的处理器。
  • ClusterTestRequestProcessor:用于测试的请求处理器。

NameServer 的所有核心功能都是在 RouteInfoManager 这个类中实现的。RouteInfoManager 这个类中保存了所有的路由信息,这些路由信息都是保存在内存中,并且没有持久化的。

1
2
3
4
5
6
7
8
9
public class BrokerData implements Comparable<BrokerData> {
// ...
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
// ...
}

Kafka 的协调服务 ZooKeeper:实现分布式系统的“瑞士军刀”

ZooKeeper 是一个分布式的协调服务,它的核心服务是一个高可用、高可靠的一致性存储,在此基础上,提供了包括读写元数据、节点监控、选举、节点间通信和分布式锁等很多功能,这些功能可以极大方便我们快速开发一个分布式的集群系统。

ZooKeeper 的使用注意点:

  1. 不要往 ZooKeeper 里面写入大量数据,它不是一个真正意义上的存储系统,只适合存放少量的数据。依据服务器配置的不同,ZooKeeper 在写入超过几百 MB 数据之后,性能和稳定性都会严重下降。
  2. 不要让业务集群的可用性依赖于 ZooKeeper 的可用性,什么意思呢?你的系统可以使用 Zookeeper,但你要留一手,要考虑如果 Zookeeper 集群宕机了,你的业务集群最好还能提供服务。因为 ZooKeeper 的选举过程是比较慢的,而它对网络的抖动又比较敏感,一旦触发选举,这段时间内的 ZooKeeper 是不能提供任何服务的。

Kafka 主要使用 ZooKeeper 来保存它的元数据、监控 Broker 和分区的存活状态,并利用 ZooKeeper 来进行选举。

Kafka 在 ZooKeeper 中保存的元数据,主要就是 Broker 的列表和主题分区信息两棵树。这份元数据同时也被缓存到每一个 Broker 中。客户端并不直接和 ZooKeeper 来通信,而是在需要的时候,通过 RPC 请求去 Broker 上拉取它关心的主题的元数据,然后保存到客户端的元数据缓存中,以便支撑客户端生产和消费

RocketMQ 与 Kafka 中如何实现事务?

Kafka 和 RocketMQ 都是基于两阶段提交来实现的事务,都利用了特殊的主题中的队列和分区来记录事务日志。

不同之处在于对处于事务中的消息的处理方式,RocketMQ 是把这些消息暂存在一个特殊的队列中,待事务提交后再移动到业务队列中;而 Kafka 直接把消息放到对应的业务分区中,配合客户端过滤来暂时屏蔽进行中的事务消息。

RocketMQ 和 Kafka 的事务,它们的适用场景是不一样的,RocketMQ 的事务适用于解决本地事务和发消息的数据一致性问题,而 Kafka 的事务则是用于实现它的 Exactly Once 机制,应用于实时计算的场景中。

MQTT 协议:如何支持海量的在线 IoT 设备?

MQTT 是专门为物联网设备设计的一套标准的通信协议。这套协议在消息模型和功能上与普通的消息队列协议是差不多的,最大的区别在于应用场景不同。在物联网应用场景中,IoT 设备性能差,网络连接不稳定。服务端面临的挑战主要是,需要支撑海量的客户端和主题。

已有的开源的 MQTT 产品,对于协议的支持都不错,在客户端数量小于十万级别的情况下,可以选择。对于海量客户端的场景,服务端必须使用集群来支撑,可以选择收费的云服务和企业版产品。也可以选择自行来构建 MQTT 集群。

自行构建集群,最关键的技术点就是,通过前置的 Proxy 集群来解决海量连接、会话管理和海量主题这三个问题。前置 Proxy 负责在 Broker 和客户端之间转发消息,通过这种方式,将海量客户端连接收敛为少量的 Proxy 与 Broker 之间的连接,解决了海量客户端连接数的问题。维护会话的实现原理,和 Tomcat 维护 HTTP 会话是一样的。对于海量主题,可以在后端部署多组 Broker 小集群,每个小集群分担一部分主题这样的方式来解决。

img

img

Pulsar 的存储计算分离设计:全新的消息队列设计思路

img

Pulsar 和其他消息队列最大的区别是,它采用了存储计算分离的设计。存储消息的职责从 Broker 中分离出来,交给专门的 BookKeeper 存储集群。这样 Broker 就变成了无状态的节点,在集群调度和故障恢复方面更加简单灵活。

无论是 RocketMQ、RabbitMQ 还是 Kafka,消息都是存储在 Broker 的磁盘或者内存中。客户端在访问某个主题分区之前,必须先找到这个分区所在 Broker,然后连接到这个 Broker 上进行生产和消费。在集群模式下,为了避免单点故障导致丢消息,Broker 在保存消息的时候,必须也把消息复制到其他的 Broker 上。当某个 Broker 节点故障的时候,并不是集群中任意一个节点都能替代这个故障的节点,只有那些“和这个故障节点拥有相同数据的节点”才能替代这个故障的节点。原因就是,每一个 Broker 存储的消息数据是不一样的,或者说,每个节点上都存储了状态(数据)。这种节点称为“有状态的节点(Stateful Node)”。

存储计算分离是一种设计思想,它将系统的存储职责和计算职责分离开,存储节点只负责数据存储,而计算节点只负责计算,计算节点是无状态的。无状态的计算节点,具有易于开发、调度灵活的优点,故障转移和恢复也更加简单快速。这种设计的缺点是,系统总体的复杂度更高,性能也更差。不过对于大部分分布式的业务系统来说,由于它不需要自己开发存储系统,采用存储计算分离的设计,既可以充分利用这种设计的优点,整个系统也不会因此变得过于复杂,综合评估优缺点,利大于弊,更加划算。

Flink 分析计算任务之后生成 JobGraph,JobGraph 是一个有向无环图,数据流过这个图中的节点,在每个节点进行计算和变换,最终流出有向无环图就完成了计算。JobGraph 中的每个节点是一个 Task,Task 是可以并行执行的,每个线程就是一个 SubTask。SubTask 被 JobManager 分配给某个 TaskManager,在 TaskManager 进程中的一个线程中执行。

流计算与消息(二):在流计算中使用 Kafka 链接计算任务

端到端 Exactly Once 语义,可以保证在分布式系统中,每条数据不多不少只被处理一次。在流计算中,因为数据重复会导致计算结果错误,所以 Exactly Once 在流计算场景中尤其重要。Kafka 和 Flink 都提供了保证 Exactly Once 的特性,配合使用可以实现端到端的 Exactly Once 语义。

在 Flink 中,如果节点出现故障,可以自动重启计算任务,重新分配计算节点来保证系统的可用性。配合 CheckPoint 机制,可以保证重启后任务的状态恢复到最后一次 CheckPoint,然后从 CheckPoint 中记录的恢复位置继续读取数据进行计算。Flink 通过一个巧妙的 Barrier 使 CheckPoint 中恢复位置和各节点状态完全对应。

Kafka 的 Exactly Once 语义是通过它的事务和生产幂等两个特性来共同实现的。在配合 Flink 的时候,每个 Flink 的 CheckPoint 对应一个 Kafka 事务,只要保证 CheckPoint 和 Kafka 事务同步提交就可以实现端到端的 Exactly Once,Flink 通过“二阶段提交”这个分布式事务的经典算法来保证 CheckPoint 和 Kafka 事务状态的一致性。

主流消息队列都是如何存储消息的?

在所有的存储系统中,消息队列的存储可能是最简单的。每个主题包含若干个分区,每个分区其实就是一个 WAL(Write Ahead Log),写入的时候只能尾部追加,不允许修改。读取的时候,根据一个索引序号进行查询,然后连续顺序往下读。

Kafka 存储消息结构

Kafka 的存储以 Partition 为单位,每个 Partition 包含一组消息文件(Segment file)和一组索引文件(Index),并且消息文件和索引文件一一对应,具有相同的文件名(但文件扩展名不一样),文件名就是这个文件中第一条消息的索引序号。

每个索引中保存索引序号(也就是这条消息是这个分区中的第几条消息)和对应的消息在消息文件中的绝对位置。在索引的设计上,Kafka 采用的是稀疏索引,为了节省存储空间,它不会为每一条消息都创建索引,而是每隔几条消息创建一条索引。

写入消息的时候非常简单,就是在消息文件尾部连续追加写入,一个文件写满了再写下一个文件。查找消息时,首先根据文件名找到所在的索引文件,然后用二分法遍历索引文件内的索引,在里面找到离目标消息最近的索引,再去消息文件中,找到这条最近的索引指向的消息位置,从这个位置开始顺序遍历消息文件,找到目标消息。

可以看到,寻址过程还是需要一定时间的。一旦找到消息位置后,就可以批量顺序读取,不必每条消息都要进行一次寻址。

RocketMQ 存储消息结构

RocketMQ 的存储以 Broker 为单位。它的存储也是分为消息文件和索引文件,但是在 RocketMQ 中,每个 Broker 只有一组消息文件,它把在这个 Broker 上的所有主题的消息都存在这一组消息文件中。索引文件和 Kafka 一样,是按照主题和队列分别建立的,每个队列对应一组索引文件,这组索引文件在 RocketMQ 中称为 ConsumerQueue。RocketMQ 中的索引是定长稠密索引,它为每一条消息都建立索引,每个索引的长度(注意不是消息长度)是固定的 20 个字节。

写入消息的时候,Broker 上所有主题、所有队列的消息按照自然顺序追加写入到同一个消息文件中,一个文件写满了再写下一个文件。查找消息的时候,可以直接根据队列的消息序号,计算出索引的全局位置(索引序号 x 索引固定长度 20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息。可以看到,这里两次寻址都是绝对位置寻址,比 Kafka 的查找是要快的。

Kafka 和 RocketMQ 的存储结构比较

img

对比这两种存储结构,你可以看到它们有很多共通的地方,都是采用消息文件 + 索引文件的存储方式,索引文件的名字都是第一条消息的索引序号,索引中记录了消息的位置等等。

在消息文件的存储粒度上,Kafka 以分区为单位,粒度更细,优点是更加灵活,很容易进行数据迁移和扩容。RocketMQ 以 Broker 为单位,较粗的粒度牺牲了灵活性,带来的好处是,在写入的时候,同时写入的文件更少,有更好的批量(不同主题和分区的数据可以组成一批一起写入),更多的顺序写入,尤其是在 Broker 上有很多主题和分区的情况下,有更好的写入性能。

大多数场景下,这两种存储设计的差异其实并不明显,都可以满足需求。但是在某些极限场景下,依然会体现出它们设计的差异。比如,在一个 Broker 上有上千个活动主题的情况下,RocketMQ 的写入性能就会体现出优势。再比如,如果我们的消息都是几个、十几个字节的小消息,但是消息的数量很多,这时候 Kafka 的稀疏索引设计就能节省非常多的存储空间。

参考资料

XXX

简介

什么是 XXX

XXX 有什么用

XXX 原理

参考资料

源码级深度理解 Java SPI

SPI 简介

SPI 全称 Service Provider Interface,是 Java 提供的,旨在由第三方实现或扩展的 API,它是一种用于动态加载服务的机制。Java 中 SPI 机制主要思想是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要,其核心思想就是 解耦

Java SPI 有四个要素:

  • SPI 接口:为服务提供者实现类约定的的接口或抽象类。
  • SPI 实现类:实际提供服务的实现类。
  • SPI 配置:Java SPI 机制约定的配置文件,提供查找服务实现类的逻辑。配置文件必须置于 META-INF/services 目录中,并且,文件名应与服务提供者接口的完全限定名保持一致。文件中的每一行都有一个实现服务类的详细信息,同样是服务提供者类的完全限定名称。
  • **ServiceLoader**:Java SPI 的核心类,用于加载 SPI 实现类。 ServiceLoader 中有各种实用方法来获取特定实现、迭代它们或重新加载服务。

SPI 示例

正所谓,实践出真知,我们不妨通过一个具体的示例来看一下,如何使用 Java SPI。

SPI 接口

首先,需要定义一个 SPI 接口,和普通接口并没有什么差别。

1
2
3
4
5
package io.github.dunwu.javacore.spi;

public interface DataStorage {
String search(String key);
}

SPI 实现类

假设,我们需要在程序中使用两种不同的数据存储——Mysql 和 Redis。因此,我们需要两个不同的实现类去分别完成相应工作。

Mysql 查询 MOCK 类

1
2
3
4
5
6
7
8
package io.github.dunwu.javacore.spi;

public class MysqlStorage implements DataStorage {
@Override
public String search(String key) {
return "【Mysql】搜索" + key + ",结果:No";
}
}

Redis 查询 MOCK 类

1
2
3
4
5
6
7
8
package io.github.dunwu.javacore.spi;

public class RedisStorage implements DataStorage {
@Override
public String search(String key) {
return "【Redis】搜索" + key + ",结果:Yes";
}
}

到目前为止,定义接口,并实现接口和普通的 Java 接口实现没有任何不同。

SPI 配置

如果想通过 Java SPI 机制来发现服务,就需要在 SPI 配置中约定好发现服务的逻辑。配置文件必须置于 META-INF/services 目录中,并且,文件名应与服务提供者接口的完全限定名保持一致。文件中的每一行都有一个实现服务类的详细信息,同样是服务提供者类的完全限定名称。以本示例代码为例,其文件名应该为 io.github.dunwu.javacore.spi.DataStorage,文件中的内容如下:

1
2
io.github.dunwu.javacore.spi.MysqlStorage
io.github.dunwu.javacore.spi.RedisStorage

ServiceLoader

完成了上面的步骤,就可以通过 ServiceLoader 来加载服务。示例如下:

1
2
3
4
5
6
7
8
9
10
11
import java.util.ServiceLoader;

public class SpiDemo {

public static void main(String[] args) {
ServiceLoader<DataStorage> serviceLoader = ServiceLoader.load(DataStorage.class);
System.out.println("============ Java SPI 测试============");
serviceLoader.forEach(loader -> System.out.println(loader.search("Yes Or No")));
}

}

输出:

1
2
3
============ Java SPI 测试============
【Mysql】搜索Yes Or No,结果:No
【Redis】搜索Yes Or No,结果:Yes

SPI 原理

上文中,我们已经了解 Java SPI 的要素以及使用 Java SPI 的方法。你有没有想过,Java SPI 和普通 Java 接口有何不同,Java SPI 是如何工作的。实际上,Java SPI 机制依赖于 ServiceLoader 类去解析、加载服务。因此,掌握了 ServiceLoader 的工作流程,就掌握了 SPI 的原理。ServiceLoader 的代码本身很精练,接下来,让我们通过走读源码的方式,逐一理解 ServiceLoader 的工作流程。

ServiceLoader 的成员变量

先看一下 ServiceLoader 类的成员变量,大致有个印象,后面的源码中都会使用到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final class ServiceLoader<S> implements Iterable<S> {

// SPI 配置文件目录
private static final String PREFIX = "META-INF/services/";

// 将要被加载的 SPI 服务
private final Class<S> service;

// 用于加载 SPI 服务的类加载器
private final ClassLoader loader;

// ServiceLoader 创建时的访问控制上下文
private final AccessControlContext acc;

// SPI 服务缓存,按实例化的顺序排列
private LinkedHashMap<String,S> providers = new LinkedHashMap<>();

// 懒查询迭代器
private LazyIterator lookupIterator;

// ...
}

ServiceLoader 的工作流程

(1)ServiceLoader.load 静态方法

应用程序加载 Java SPI 服务,都是先调用 ServiceLoader.load 静态方法。ServiceLoader.load 静态方法的作用是:

  1. 指定类加载 ClassLoader 和访问控制上下文;
  2. 然后,重新加载 SPI 服务
    1. 清空缓存中所有已实例化的 SPI 服务
    2. 根据 ClassLoader 和 SPI 类型,创建懒加载迭代器

这里,摘录 ServiceLoader.load 相关源码,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// service 传入的是期望加载的 SPI 接口类型
// loader 是用于加载 SPI 服务的类加载器
public static <S> ServiceLoader<S> load(Class<S> service,
ClassLoader loader)
{
return new ServiceLoader<>(service, loader);
}

public void reload() {
// 清空缓存中所有已实例化的 SPI 服务
providers.clear();
// 根据 ClassLoader 和 SPI 类型,创建懒加载迭代器
lookupIterator = new LazyIterator(service, loader);
}

// 私有构造方法
// 重新加载 SPI 服务
private ServiceLoader(Class<S> svc, ClassLoader cl) {
service = Objects.requireNonNull(svc, "Service interface cannot be null");
// 指定类加载 ClassLoader 和访问控制上下文
loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
// 然后,重新加载 SPI 服务
reload();
}

(2)应用程序通过 ServiceLoaderiterator 方法遍历 SPI 实例

ServiceLoader 的类定义,明确了 ServiceLoader 类实现了 Iterable<T> 接口,所以,它是可以迭代遍历的。实际上,ServiceLoader 类维护了一个缓存 providers( LinkedHashMap 对象),缓存 providers 中保存了已经被成功加载的 SPI 实例,这个 Map 的 key 是 SPI 接口实现类的全限定名,value 是该实现类的一个实例对象。

当应用程序调用 ServiceLoaderiterator 方法时,ServiceLoader 会先判断缓存 providers 中是否有数据:如果有,则直接返回缓存 providers 的迭代器;如果没有,则返回懒加载迭代器的迭代器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public Iterator<S> iterator() {
return new Iterator<S>() {

// 缓存 SPI providers
Iterator<Map.Entry<String,S>> knownProviders
= providers.entrySet().iterator();

// lookupIterator 是 LazyIterator 实例,用于懒加载 SPI 实例
public boolean hasNext() {
if (knownProviders.hasNext())
return true;
return lookupIterator.hasNext();
}

public S next() {
if (knownProviders.hasNext())
return knownProviders.next().getValue();
return lookupIterator.next();
}

public void remove() {
throw new UnsupportedOperationException();
}

};
}

(3)懒加载迭代器的工作流程

上面的源码中提到了,lookupIterator 是 LazyIterator 实例,而 LazyIterator 用于懒加载 SPI 实例。那么, LazyIterator 是如何工作的呢?

这里,摘取 LazyIterator 关键代码

  • hasNextService 方法:
    1. 拼接 META-INF/services/ + SPI 接口全限定名
    2. 通过类加载器,尝试加载资源文件
    3. 解析资源文件中的内容,获取 SPI 接口的实现类的全限定名 nextName
  • nextService 方法:
    1. hasNextService() 方法解析出了 SPI 实现类的的全限定名 nextName,通过反射,获取 SPI 实现类的类定义 Class<?>
    2. 然后,尝试通过 Class<?>newInstance 方法实例化一个 SPI 服务对象。如果成功,则将这个对象加入到缓存 providers 中并返回该对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private boolean hasNextService() {
if (nextName != null) {
return true;
}
if (configs == null) {
try {
// 1.拼接 META-INF/services/ + SPI 接口全限定名
// 2.通过类加载器,尝试加载资源文件
// 3.解析资源文件中的内容
String fullName = PREFIX + service.getName();
if (loader == null)
configs = ClassLoader.getSystemResources(fullName);
else
configs = loader.getResources(fullName);
} catch (IOException x) {
fail(service, "Error locating configuration files", x);
}
}
while ((pending == null) || !pending.hasNext()) {
if (!configs.hasMoreElements()) {
return false;
}
pending = parse(service, configs.nextElement());
}
nextName = pending.next();
return true;
}

private S nextService() {
if (!hasNextService())
throw new NoSuchElementException();
String cn = nextName;
nextName = null;
Class<?> c = null;
try {
c = Class.forName(cn, false, loader);
} catch (ClassNotFoundException x) {
fail(service,
"Provider " + cn + " not found");
}
if (!service.isAssignableFrom(c)) {
fail(service,
"Provider " + cn + " not a s");
}
try {
S p = service.cast(c.newInstance());
providers.put(cn, p);
return p;
} catch (Throwable x) {
fail(service,
"Provider " + cn + " could not be instantiated",
x);
}
throw new Error(); // This cannot happen
}

SPI 和类加载器

通过上面两个章节中,走读 ServiceLoader 代码,我们已经大致了解 Java SPI 的工作原理,即通过 ClassLoader 加载 SPI 配置文件,解析 SPI 服务,然后通过反射,实例化 SPI 服务实例。我们不妨思考一下,为什么加载 SPI 服务时,需要指定类加载器 ClassLoader 呢?

学习过 JVM 的读者,想必都了解过类加载器的双亲委派模型(Parents Delegation Model)。双亲委派模型要求除了顶层的 BootstrapClassLoader 外,其余的类加载器都应有自己的父类加载器。这里类加载器之间的父子关系一般通过组合(Composition)关系来实现,而不是通过继承(Inheritance)的关系实现。双亲委派继承体系图如下:

img

双亲委派机制约定了:一个类加载器首先将类加载请求传送到父类加载器,只有当父类加载器无法完成类加载请求时才尝试加载

双亲委派的好处:使得 Java 类伴随着它的类加载器,天然具备一种带有优先级的层次关系,从而使得类加载得到统一,不会出现重复加载的问题:

  • 系统类防止内存中出现多份同样的字节码
  • 保证 Java 程序安全稳定运行

例如: java.lang.Object 存放在 rt.jar 中,如果编写另外一个 java.lang.Object 的类并放到 classpath 中,程序可以编译通过。因为双亲委派模型的存在,所以在 rt.jar 中的 Object 比在 classpath 中的 Object 优先级更高,因为 rt.jar 中的 Object 使用的是启动类加载器,而 classpath 中的 Object 使用的是应用程序类加载器。正因为 rt.jar 中的 Object 优先级更高,因为程序中所有的 Object 都是这个 Object

双亲委派的限制:子类加载器可以使用父类加载器已经加载的类,而父类加载器无法使用子类加载器已经加载的。——这就导致了双亲委派模型并不能解决所有的类加载器问题。Java SPI 就面临着这样的问题:

  • SPI 的接口是 Java 核心库的一部分,是由 BootstrapClassLoader 加载的;
  • 而 SPI 实现的 Java 类一般是由 AppClassLoader 来加载的。BootstrapClassLoader 是无法找到 SPI 的实现类的,因为它只加载 Java 的核心库。它也不能代理给 AppClassLoader,因为它是最顶层的类加载器。这也解释了本节开始的问题——为什么加载 SPI 服务时,需要指定类加载器 ClassLoader 呢?因为如果不指定 ClassLoader,则无法获取 SPI 服务。

如果不做任何的设置,Java 应用的线程的上下文类加载器默认就是 AppClassLoader。在核心类库使用 SPI 接口时,传递的类加载器使用线程上下文类加载器,就可以成功的加载到 SPI 实现的类。线程上下文类加载器在很多 SPI 的实现中都会用到。

通常可以通过 Thread.currentThread().getClassLoader()Thread.currentThread().getContextClassLoader() 获取线程上下文类加载器。

Java SPI 的不足

Java SPI 存在一些不足:

  • 不能按需加载,需要遍历所有的实现,并实例化,然后在循环中才能找到我们需要的实现。如果不想用某些实现类,或者某些类实例化很耗时,它也被载入并实例化了,这就造成了浪费。

  • 获取某个实现类的方式不够灵活,只能通过 Iterator 形式获取,不能根据某个参数来获取对应的实现类。

  • 多个并发多线程使用 ServiceLoader 类的实例是不安全的。

SPI 应用场景

SPI 在 Java 开发中应用十分广泛。首先,在 Java 的 java.util.spi package 中就约定了很多 SPI 接口。下面,列举一些 SPI 接口:

除此以外,SPI 还有很多应用,下面列举几个经典案例。

SPI 应用案例之 JDBC DriverManager

作为 Java 工程师,尤其是 CRUD 工程师,相必都非常熟悉 JDBC。众所周知,关系型数据库有很多种,如:Mysql、Oracle、PostgreSQL 等等。JDBC 如何识别各种数据库的驱动呢?

创建数据库连接

我们先回顾一下,JDBC 如何创建数据库连接的呢?

JDBC4.0 之前,连接数据库的时候,通常会用 Class.forName(XXX) 方法来加载数据库相应的驱动,然后再获取数据库连接,继而进行 CRUD 等操作。

1
Class.forName("com.mysql.jdbc.Driver")

而 JDBC4.0 之后,不再需要用 Class.forName(XXX) 方法来加载数据库驱动,直接获取连接就可以了。显然,这种方式很方便,但是如何做到的呢?

  • JDBC 接口:首先,Java 中内置了接口 java.sql.Driver

  • JDBC 接口实现:各个数据库的驱动自行实现 java.sql.Driver 接口,用于管理数据库连接。

    • Mysql:在 mysql 的 Java 驱动包 mysql-connector-java-XXX.jar 中,可以找到 META-INF/services 目录,该目录下会有一个名字为java.sql.Driver 的文件,文件内容是 com.mysql.cj.jdbc.Drivercom.mysql.cj.jdbc.Driver 正是 Mysql 版的 java.sql.Driver 实现。如下图所示:

    • PostgreSQL 实现:在 PostgreSQL 的 Java 驱动包 postgresql-42.0.0.jar 中,也可以找到同样的配置文件,文件内容是 org.postgresql.Driverorg.postgresql.Driver 正是 PostgreSQL 版的 java.sql.Driver 实现。
  • 创建数据库连接

    以 Mysql 为例,创建数据库连接代码如下:

    1
    2
    final String DB_URL = String.format("jdbc:mysql://%s:%s/%s", DB_HOST, DB_PORT, DB_SCHEMA);
    connection = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);

DriverManager

从前文,我们已经知道 DriverManager 是创建数据库连接的关键。它究竟是如何工作的呢?

可以看到是加载实例化驱动的,接着看 loadInitialDrivers 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
private static void loadInitialDrivers() {
String drivers;
try {
drivers = AccessController.doPrivileged(new PrivilegedAction<String>() {
public String run() {
return System.getProperty("jdbc.drivers");
}
});
} catch (Exception ex) {
drivers = null;
}
// 通过 classloader 获取所有实现 java.sql.Driver 的驱动类
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
// 利用 SPI,记载所有 Driver 服务
ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class);
// 获取迭代器
Iterator<Driver> driversIterator = loadedDrivers.iterator();
try{
// 遍历迭代器
while(driversIterator.hasNext()) {
driversIterator.next();
}
} catch(Throwable t) {
// Do nothing
}
return null;
}
});

// 打印数据库驱动信息
println("DriverManager.initialize: jdbc.drivers = " + drivers);

if (drivers == null || drivers.equals("")) {
return;
}
String[] driversList = drivers.split(":");
println("number of Drivers:" + driversList.length);
for (String aDriver : driversList) {
try {
println("DriverManager.Initialize: loading " + aDriver);
// 尝试实例化驱动
Class.forName(aDriver, true,
ClassLoader.getSystemClassLoader());
} catch (Exception ex) {
println("DriverManager.Initialize: load failed: " + ex);
}
}
}

上面的代码主要步骤是:

  1. 从系统变量中获取驱动的实现类。
  2. 利用 SPI 来获取所有驱动的实现类。
  3. 遍历所有驱动,尝试实例化各个实现类。
  4. 根据第 1 步获取到的驱动列表来实例化具体的实现类。

需要关注的是下面这行代码:

1
ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class);

这里实际获取的是 java.util.ServiceLoader.LazyIterator 迭代器。调用其 hasNext 方法时,会搜索 classpath 下以及 jar 包中的 META-INF/services 目录,查找 java.sql.Driver 文件,并找到文件中的驱动实现类的全限定名。调用其 next 方法时,会根据驱动类的全限定名去尝试实例化一个驱动类的对象。

SPI 应用案例之 Common-Logging

common-logging(也称 Jakarta Commons Logging,缩写 JCL)是常用的日志门面工具包。

common-logging 的核心类是入口是 LogFactoryLogFatory 是一个抽象类,它负责加载具体的日志实现。

其入口方法是 LogFactory.getLog 方法,源码如下:

1
2
3
4
5
6
7
public static Log getLog(Class clazz) throws LogConfigurationException {
return getFactory().getInstance(clazz);
}

public static Log getLog(String name) throws LogConfigurationException {
return getFactory().getInstance(name);
}

从以上源码可知,getLog 采用了工厂设计模式,是先调用 getFactory 方法获取具体日志库的工厂类,然后根据类名称或类型创建日志实例。

LogFatory.getFactory 方法负责选出匹配的日志工厂,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
public static LogFactory getFactory() throws LogConfigurationException {
// 省略...

// 加载 commons-logging.properties 配置文件
Properties props = getConfigurationFile(contextClassLoader, FACTORY_PROPERTIES);

// 省略...

// 决定创建哪个 LogFactory 实例
// (1)尝试读取全局属性 org.apache.commons.logging.LogFactory
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] Looking for system property [" + FACTORY_PROPERTY +
"] to define the LogFactory subclass to use...");
}

try {
// 如果指定了 org.apache.commons.logging.LogFactory 属性,尝试实例化具体实现类
String factoryClass = getSystemProperty(FACTORY_PROPERTY, null);
if (factoryClass != null) {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] Creating an instance of LogFactory class '" + factoryClass +
"' as specified by system property " + FACTORY_PROPERTY);
}
factory = newFactory(factoryClass, baseClassLoader, contextClassLoader);
} else {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] No system property [" + FACTORY_PROPERTY + "] defined.");
}
}
} catch (SecurityException e) {
// 异常处理
} catch (RuntimeException e) {
// 异常处理
}

// (2)利用 Java SPI 机制,尝试在 classpatch 的 META-INF/services 目录下寻找 org.apache.commons.logging.LogFactory 实现类
if (factory == null) {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] Looking for a resource file of name [" + SERVICE_ID +
"] to define the LogFactory subclass to use...");
}
try {
final InputStream is = getResourceAsStream(contextClassLoader, SERVICE_ID);

if( is != null ) {
// This code is needed by EBCDIC and other strange systems.
// It's a fix for bugs reported in xerces
BufferedReader rd;
try {
rd = new BufferedReader(new InputStreamReader(is, "UTF-8"));
} catch (java.io.UnsupportedEncodingException e) {
rd = new BufferedReader(new InputStreamReader(is));
}

String factoryClassName = rd.readLine();
rd.close();

if (factoryClassName != null && ! "".equals(factoryClassName)) {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] Creating an instance of LogFactory class " +
factoryClassName +
" as specified by file '" + SERVICE_ID +
"' which was present in the path of the context classloader.");
}
factory = newFactory(factoryClassName, baseClassLoader, contextClassLoader );
}
} else {
// is == null
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] No resource file with name '" + SERVICE_ID + "' found.");
}
}
} catch (Exception ex) {
// note: if the specified LogFactory class wasn't compatible with LogFactory
// for some reason, a ClassCastException will be caught here, and attempts will
// continue to find a compatible class.
if (isDiagnosticsEnabled()) {
logDiagnostic(
"[LOOKUP] A security exception occurred while trying to create an" +
" instance of the custom factory class" +
": [" + trim(ex.getMessage()) +
"]. Trying alternative implementations...");
}
// ignore
}
}

// (3)尝试从 classpath 目录下的 commons-logging.properties 文件中查找 org.apache.commons.logging.LogFactory 属性

if (factory == null) {
if (props != null) {
if (isDiagnosticsEnabled()) {
logDiagnostic(
"[LOOKUP] Looking in properties file for entry with key '" + FACTORY_PROPERTY +
"' to define the LogFactory subclass to use...");
}
String factoryClass = props.getProperty(FACTORY_PROPERTY);
if (factoryClass != null) {
if (isDiagnosticsEnabled()) {
logDiagnostic(
"[LOOKUP] Properties file specifies LogFactory subclass '" + factoryClass + "'");
}
factory = newFactory(factoryClass, baseClassLoader, contextClassLoader);

// TODO: think about whether we need to handle exceptions from newFactory
} else {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] Properties file has no entry specifying LogFactory subclass.");
}
}
} else {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] No properties file available to determine" + " LogFactory subclass from..");
}
}
}

// (4)以上情况都不满足,实例化默认实现类 org.apache.commons.logging.impl.LogFactoryImpl

if (factory == null) {
if (isDiagnosticsEnabled()) {
logDiagnostic(
"[LOOKUP] Loading the default LogFactory implementation '" + FACTORY_DEFAULT +
"' via the same classloader that loaded this LogFactory" +
" class (ie not looking in the context classloader).");
}

factory = newFactory(FACTORY_DEFAULT, thisClassLoader, contextClassLoader);
}

if (factory != null) {
/**
* Always cache using context class loader.
*/
cacheFactory(contextClassLoader, factory);

if (props != null) {
Enumeration names = props.propertyNames();
while (names.hasMoreElements()) {
String name = (String) names.nextElement();
String value = props.getProperty(name);
factory.setAttribute(name, value);
}
}
}

return factory;
}

从 getFactory 方法的源码可以看出,其核心逻辑分为 4 步:

  1. 首先,尝试查找全局属性 org.apache.commons.logging.LogFactory,如果指定了具体类,尝试创建实例。
  2. 利用 Java SPI 机制,尝试在 classpatch 的 META-INF/services 目录下寻找 org.apache.commons.logging.LogFactory 的实现类。
  3. 尝试从 classpath 目录下的 commons-logging.properties 文件中查找 org.apache.commons.logging.LogFactory 属性,如果指定了具体类,尝试创建实例。
  4. 以上情况如果都不满足,则实例化默认实现类,即 org.apache.commons.logging.impl.LogFactoryImpl

SPI 应用案例之 Spring Boot

Spring Boot 是基于 Spring 构建的框架,其设计目的在于简化 Spring 应用的配置、运行。在 Spring Boot 中,大量运用了自动装配来尽可能减少配置。

下面是一个 Spring Boot 入口示例,可以看到,代码非常简洁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class DemoApplication {

public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}

@GetMapping("/hello")
public String hello(@RequestParam(value = "name", defaultValue = "World") String name) {
return String.format("Hello %s!", name);
}
}

那么,Spring Boot 是如何做到寥寥几行代码,就可以运行一个 Spring Boot 应用的呢。我们不妨带着疑问,从源码入手,一步步探究其原理。

@SpringBootApplication 注解

首先,Spring Boot 应用的启动类上都会标记一个 @SpringBootApplication 注解。@SpringBootApplication 注解定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootConfiguration
@EnableAutoConfiguration
@ComponentScan(
excludeFilters = {@Filter(
type = FilterType.CUSTOM,
classes = {TypeExcludeFilter.class}
), @Filter(
type = FilterType.CUSTOM,
classes = {AutoConfigurationExcludeFilter.class}
)}
)
public @interface SpringBootApplication {
// 略
}

除了 @Target@Retention@Documented@Inherited 这几个元注解, @SpringBootApplication 注解的定义中还标记了 @SpringBootConfiguration@EnableAutoConfiguration@ComponentScan 三个注解。

@SpringBootConfiguration 注解

@SpringBootConfiguration 注解的定义来看,@SpringBootConfiguration 注解本质上就是一个 @Configuration 注解,这意味着被@SpringBootConfiguration 注解修饰的类会被 Spring Boot 识别为一个配置类。

1
2
3
4
5
6
7
8
9
10
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Configuration
public @interface SpringBootConfiguration {
@AliasFor(
annotation = Configuration.class
)
boolean proxyBeanMethods() default true;
}

@EnableAutoConfiguration 注解

@EnableAutoConfiguration 注解定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@AutoConfigurationPackage
@Import({AutoConfigurationImportSelector.class})
public @interface EnableAutoConfiguration {
String ENABLED_OVERRIDE_PROPERTY = "spring.boot.enableautoconfiguration";

Class<?>[] exclude() default {};

String[] excludeName() default {};
}

@EnableAutoConfiguration 注解包含了 @AutoConfigurationPackage@Import({AutoConfigurationImportSelector.class}) 两个注解。

@AutoConfigurationPackage 注解

@AutoConfigurationPackage 会将被修饰的类作为主配置类,该类所在的 package 会被视为根路径,Spring Boot 默认会自动扫描根路径下的所有 Spring Bean(被 @Component 以及继承 @Component 的各个注解所修饰的类)。——这就是为什么 Spring Boot 的启动类一般要置于根路径的原因。这个功能等同于在 Spring xml 配置中通过 context:component-scan 来指定扫描路径。@Import 注解的作用是向 Spring 容器中直接注入指定组件。@AutoConfigurationPackage 注解中注明了 @Import({Registrar.class})Registrar 类用于保存 Spring Boot 的入口类、根路径等信息。

SpringFactoriesLoader.loadFactoryNames 方法

@Import(AutoConfigurationImportSelector.class) 表示直接注入 AutoConfigurationImportSelectorAutoConfigurationImportSelector 有一个核心方法 getCandidateConfigurations 用于获取候选配置。该方法调用了 SpringFactoriesLoader.loadFactoryNames 方法,这个方法即为 Spring Boot SPI 的关键,它负责加载所有 META-INF/spring.factories 文件,加载的过程由 SpringFactoriesLoader 负责。

Spring Boot 的 META-INF/spring.factories 文件本质上就是一个 properties 文件,数据内容就是一个个键值对。

SpringFactoriesLoader.loadFactoryNames 方法的关键源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// spring.factories 文件的格式为:key=value1,value2,value3
// 遍历所有 META-INF/spring.factories 文件
// 解析文件,获得 key=factoryClass 的类名称
public static List<String> loadFactoryNames(Class<?> factoryType, @Nullable ClassLoader classLoader) {
String factoryTypeName = factoryType.getName();
return loadSpringFactories(classLoader).getOrDefault(factoryTypeName, Collections.emptyList());
}

private static Map<String, List<String>> loadSpringFactories(@Nullable ClassLoader classLoader) {
// 尝试获取缓存,如果缓存中有数据,直接返回
MultiValueMap<String, String> result = cache.get(classLoader);
if (result != null) {
return result;
}

try {
// 获取资源文件路径
Enumeration<URL> urls = (classLoader != null ?
classLoader.getResources(FACTORIES_RESOURCE_LOCATION) :
ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION));
result = new LinkedMultiValueMap<>();
// 遍历所有路径
while (urls.hasMoreElements()) {
URL url = urls.nextElement();
UrlResource resource = new UrlResource(url);
// 解析文件,得到对应的一组 Properties
Properties properties = PropertiesLoaderUtils.loadProperties(resource);
// 遍历解析出的 properties,组装数据
for (Map.Entry<?, ?> entry : properties.entrySet()) {
String factoryTypeName = ((String) entry.getKey()).trim();
for (String factoryImplementationName : StringUtils.commaDelimitedListToStringArray((String) entry.getValue())) {
result.add(factoryTypeName, factoryImplementationName.trim());
}
}
}
cache.put(classLoader, result);
return result;
}
catch (IOException ex) {
throw new IllegalArgumentException("Unable to load factories from location [" +
FACTORIES_RESOURCE_LOCATION + "]", ex);
}
}

归纳上面的方法,主要作了这些事:

加载所有 META-INF/spring.factories 文件,加载过程有 SpringFactoriesLoader 负责。

  • 在 CLASSPATH 中搜寻所有 META-INF/spring.factories 配置文件
  • 然后,解析 spring.factories 文件,获取指定自动装配类的全限定名

Spring Boot 的 AutoConfiguration

Spring Boot 有各种 starter 包,可以根据实际项目需要,按需取材。在项目开发中,只要将 starter 包引入,我们就可以用很少的配置,甚至什么都不配置,即可获取相关的能力。通过前面的 Spring Boot SPI 流程,只完成了自动装配工作的一半,剩下的工作如何处理呢 ?

以 spring-boot-starter-web 的 jar 包为例,查看其 maven pom,可以看到,它依赖于 spring-boot-starter,所有 Spring Boot 官方 starter 包都会依赖于这个 jar 包。而 spring-boot-starter 又依赖于 spring-boot-autoconfigure,Spring Boot 的自动装配秘密,就在于这个 jar 包。

从 spring-boot-autoconfigure 包的结构来看,它有一个 META-INF/spring.factories ,显然利用了 Spring Boot SPI,来自动装配其中的配置类。

下图是 spring-boot-autoconfigure 的 META-INF/spring.factories 文件的部分内容,可以看到其中注册了一长串会被自动加载的 AutoConfiguration 类。

RedisAutoConfiguration 为例,这个配置类中,会根据 @ConditionalXXX 中的条件去决定是否实例化对应的 Bean,实例化 Bean 所依赖的重要参数则通过 RedisProperties 传入。

RedisProperties 中维护了 Redis 连接所需要的关键属性,只要在 yml 或 properties 配置文件中,指定 spring.redis 开头的属性,都会被自动装载到 RedisProperties 实例中。

通过以上分析,已经一步步解读出 Spring Boot 自动装载的原理。

SPI 应用案例之 Dubbo

Dubbo 并未使用 Java SPI,而是自己封装了一套新的 SPI 机制。Dubbo SPI 所需的配置文件需放置在 META-INF/dubbo 路径下,配置内容形式如下:

1
2
optimusPrime = org.apache.spi.OptimusPrime
bumblebee = org.apache.spi.Bumblebee

与 Java SPI 实现类配置不同,Dubbo SPI 是通过键值对的方式进行配置,这样可以按需加载指定的实现类。Dubbo SPI 除了支持按需加载接口实现类,还增加了 IOC 和 AOP 等特性。

ExtensionLoader 入口

Dubbo SPI 的相关逻辑被封装在了 ExtensionLoader 类中,通过 ExtensionLoader,可以加载指定的实现类。

ExtensionLoadergetExtension 方法是其入口方法,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public T getExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
if ("true".equals(name)) {
// 获取默认的拓展实现类
return getDefaultExtension();
}
// Holder,顾名思义,用于持有目标对象
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
// 双重检查
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
// 创建拓展实例
instance = createExtension(name);
// 设置实例到 holder 中
holder.set(instance);
}
}
}
return (T) instance;
}

可以看出,这个方法的作用就是:首先检查缓存,缓存未命中则调用 createExtension 方法创建拓展对象。那么,createExtension 是如何创建拓展对象的呢,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private T createExtension(String name) {
// 从配置文件中加载所有的拓展类,可得到“配置项名称”到“配置类”的映射关系表
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// 通过反射创建实例
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 向实例中注入依赖
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
// 循环创建 Wrapper 实例
for (Class<?> wrapperClass : wrapperClasses) {
// 将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建 Wrapper 实例。
// 然后向 Wrapper 实例中注入依赖,最后将 Wrapper 实例再次赋值给 instance 变量
instance = injectExtension(
(T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("...");
}
}

createExtension 方法的的工作步骤可以归纳为:

  1. 通过 getExtensionClasses 获取所有的拓展类
  2. 通过反射创建拓展对象
  3. 向拓展对象中注入依赖
  4. 将拓展对象包裹在相应的 Wrapper 对象中

以上步骤中,第一个步骤是加载拓展类的关键,第三和第四个步骤是 Dubbo IOC 与 AOP 的具体实现。

获取所有的拓展类

Dubbo 在通过名称获取拓展类之前,首先需要根据配置文件解析出拓展项名称到拓展类的映射关系表(Map<名称, 拓展类>),之后再根据拓展项名称从映射关系表中取出相应的拓展类即可。相关过程的代码分析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Map<String, Class<?>> getExtensionClasses() {
// 从缓存中获取已加载的拓展类
Map<String, Class<?>> classes = cachedClasses.get();
// 双重检查
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
// 加载拓展类
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}

这里也是先检查缓存,若缓存未命中,则通过 synchronized 加锁。加锁后再次检查缓存,并判空。此时如果 classes 仍为 null,则通过 loadExtensionClasses 加载拓展类。下面分析 loadExtensionClasses 方法的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private Map<String, Class<?>> loadExtensionClasses() {
// 获取 SPI 注解,这里的 type 变量是在调用 getExtensionLoader 方法时传入的
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation != null) {
String value = defaultAnnotation.value();
if ((value = value.trim()).length() > 0) {
// 对 SPI 注解内容进行切分
String[] names = NAME_SEPARATOR.split(value);
// 检测 SPI 注解内容是否合法,不合法则抛出异常
if (names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension...");
}

// 设置默认名称,参考 getDefaultExtension 方法
if (names.length == 1) {
cachedDefaultName = names[0];
}
}
}

Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
// 加载指定文件夹下的配置文件
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadDirectory(extensionClasses, DUBBO_DIRECTORY);
loadDirectory(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}

loadExtensionClasses 方法总共做了两件事情,一是对 SPI 注解进行解析,二是调用 loadDirectory 方法加载指定文件夹配置文件。SPI 注解解析过程比较简单,无需多说。下面我们来看一下 loadDirectory 做了哪些事情。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir) {
// fileName = 文件夹路径 + type 全限定名
String fileName = dir + type.getName();
try {
Enumeration<java.net.URL> urls;
ClassLoader classLoader = findClassLoader();
// 根据文件名加载所有的同名文件
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL resourceURL = urls.nextElement();
// 加载资源
loadResource(extensionClasses, classLoader, resourceURL);
}
}
} catch (Throwable t) {
logger.error("...");
}
}

loadDirectory 方法先通过 classLoader 获取所有资源链接,然后再通过 loadResource 方法加载资源。我们继续跟下去,看一下 loadResource 方法的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void loadResource(Map<String, Class<?>> extensionClasses,
ClassLoader classLoader, java.net.URL resourceURL) {
try {
BufferedReader reader = new BufferedReader(
new InputStreamReader(resourceURL.openStream(), "utf-8"));
try {
String line;
// 按行读取配置内容
while ((line = reader.readLine()) != null) {
// 定位 # 字符
final int ci = line.indexOf('#');
if (ci >= 0) {
// 截取 # 之前的字符串,# 之后的内容为注释,需要忽略
line = line.substring(0, ci);
}
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
// 以等于号 = 为界,截取键与值
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
if (line.length() > 0) {
// 加载类,并通过 loadClass 方法对类进行缓存
loadClass(extensionClasses, resourceURL,
Class.forName(line, true, classLoader), name);
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class...");
}
}
}
} finally {
reader.close();
}
} catch (Throwable t) {
logger.error("Exception when load extension class...");
}
}

loadResource 方法用于读取和解析配置文件,并通过反射加载类,最后调用 loadClass 方法进行其他操作。loadClass 方法用于主要用于操作缓存,该方法的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL,
Class<?> clazz, String name) throws NoSuchMethodException {

if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("...");
}

// 检测目标类上是否有 Adaptive 注解
if (clazz.isAnnotationPresent(Adaptive.class)) {
if (cachedAdaptiveClass == null) {
// 设置 cachedAdaptiveClass缓存
cachedAdaptiveClass = clazz;
} else if (!cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("...");
}

// 检测 clazz 是否是 Wrapper 类型
} else if (isWrapperClass(clazz)) {
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
// 存储 clazz 到 cachedWrapperClasses 缓存中
wrappers.add(clazz);

// 程序进入此分支,表明 clazz 是一个普通的拓展类
} else {
// 检测 clazz 是否有默认的构造方法,如果没有,则抛出异常
clazz.getConstructor();
if (name == null || name.length() == 0) {
// 如果 name 为空,则尝试从 Extension 注解中获取 name,或使用小写的类名作为 name
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("...");
}
}
// 切分 name
String[] names = NAME_SEPARATOR.split(name);
if (names != null && names.length > 0) {
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
// 如果类上有 Activate 注解,则使用 names 数组的第一个元素作为键,
// 存储 name 到 Activate 注解对象的映射关系
cachedActivates.put(names[0], activate);
}
for (String n : names) {
if (!cachedNames.containsKey(clazz)) {
// 存储 Class 到名称的映射关系
cachedNames.put(clazz, n);
}
Class<?> c = extensionClasses.get(n);
if (c == null) {
// 存储名称到 Class 的映射关系
extensionClasses.put(n, clazz);
} else if (c != clazz) {
throw new IllegalStateException("...");
}
}
}
}
}

如上,loadClass 方法操作了不同的缓存,比如 cachedAdaptiveClasscachedWrapperClassescachedNames 等等。除此之外,该方法没有其他什么逻辑了。

参考资料

服务容错

故障分类

从故障影响范围维度来看,分布式系统的故障可以分为三类:

  • 集群故障:根据业务量大小而定,集群规模从几台到甚至上万台都有可能。一旦某些代码出现 bug,可能整个集群都会发生故障,不能提供对外提供服务。
  • 机房故障:现在大多数互联网公司为了保证业务的高可用性,往往业务部署在不止一个机房。然而现实中,某机房的光缆因为道路施工被挖断,导致整个机房脱网的事情,也是时有发生的。并且这种事情往往容易上热搜。
  • 单机故障:集群中的个别机器出现故障,这种情况往往对全局没有太大影响,但会导致调用到故障机器上的请求都失败,影响整个系统的成功率。

集群故障应对处理

一般而言,集群故障的产生原因不外乎有两种:

  • 一种是代码 bug 所导致,比如说某一段 Java 代码不断地分配大对象,但没有及时回收导致 JVM OOM 退出;
  • 另一种是流量突刺,短时间突然而至的大量请求超出了系统的承载能力。

应付集群故障的思路,主要是采用流量控制,主要手段有:限流降级熔断

机房故障应对处理

单机房脱网的事情,多半是因为一些不可抗因素,如:机房失火、光缆被挖断等等。有句老话叫:不要把鸡蛋都放在一个篮子里。同理,不要把业务都部署在一个机房中,一旦机房出事,那就彻底完蛋了。所以,很多互联网公司的业务都采用多机房部署。如果要追求更高的可靠性,可以采用同城多活部署,甚至异地多活部署。

多机房部署的好处显而易见,即提高了系统的可用性,但是这种架构引入了其他的问题:如何保证不同机房数据的一致性,如何切换多机房的流量,等等。

针对流量切换问题,一般有两种手段:

  • 基于 DNS 解析的流量切换,一般是通过把请求访问域名解析的 VIP 从一个 IDC 切换到另外一个 IDC。
  • 基于 RPC 分组的流量切换,对于一个服务来说,如果是部署在多个 IDC 的话,一般每个 IDC 就是一个分组。假如一个 IDC 出现故障,那么原先路由到这个分组的流量,就可以通过向配置中心下发命令,把原先路由到这个分组的流量全部切换到别的分组,这样的话就可以切换故障 IDC 的流量了。

单机故障应对处理

对于大规模集群来说,出现单机故障的概率是很高的。当出现单机故障时,需要有一定的自动化处理手段。

处理单机故障一个有效的办法就是自动重启。具体来讲,你可以设置一个阈值,比如以某个接口的平均耗时为准,当监控单机上某个接口的平均耗时超过一定阈值时,就认为这台机器有问题,这个时候就需要把有问题的机器从线上集群中摘除掉,然后在重启服务后,重新加入到集群中。

容错策略

服务调用并不总是一定成功的,前面我讲过,可能因为服务提供者节点自身宕机、进程异常退出或者服务消费者与提供者之间的网络出现故障等原因。对于服务调用失败的情况,需要有手段自动恢复,来保证调用成功。

常用的手段主要有以下几种:

  • 故障转移(FailOver):当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟。这种策略要求服务调用的操作必须是幂等的,也就是说无论调用多少次,只要是同一个调用,返回的结果都是相同的,一般适合服务调用是读请求的场景。
  • 快速失败(FailFast):只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。实际在业务执行时,一般非核心业务的调用,会采用快速失败策略,调用失败后一般就记录下失败日志就返回了。
  • 安全失败(Failsafe):出现异常时,直接忽略。通常用于写入审计日志等操作。
  • 静默失败(Failsilent):如果大量的请求需要等到超时(或者长时间处理后)才宣告失败,很容易由于某个远程服务的请求堆积而消耗大量的线程、内存、网络等资源,进而影响到整个系统的稳定。面对这种情况,一种合理的失败策略是当请求失败后,就默认服务提供者一定时间内无法再对外提供服务,不再向它分配请求流量,将错误隔离开来,避免对系统其他部分产生影响,此即为沉默失败策略。
  • 故障恢复(FailBack):就是服务消费者调用失败或者超时后,不再重试,而是根据失败的详细信息,来决定后续的执行策略。比如对于非幂等的调用场景,如果调用失败后,不能简单地重试,而是应该查询服务端的状态,看调用到底是否实际生效,如果已经生效了就不能再重试了;如果没有生效可以再发起一次调用。通常用于消息通知操作。
  • 并行调用(Forking):并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。
  • 广播调用(Broadcast):广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息。

容错设计模式

断路器模式

断路器的基本思路是很简单的,就是通过代理(断路器对象)来一对一地(一个远程服务对应一个断路器对象)接管服务调用者的远程请求。断路器会持续监控并统计服务返回的成功、失败、超时、拒绝等各种结果,当出现故障(失败、超时、拒绝)的次数达到断路器的阈值时,它状态就自动变为“OPEN”,后续此断路器代理的远程访问都将直接返回调用失败,而不会发出真正的远程服务请求。通过断路器对远程服务的熔断,避免因持续的失败或拒绝而消耗资源,因持续的超时而堆积请求,最终的目的就是避免雪崩效应的出现。由此可见,断路器本质是一种快速失败策略的实现方式。

舱壁隔离模式

舱壁隔离模式是常用的实现服务隔离的设计模式,舱壁这个词是来自造船业的舶来品,它原本的意思是设计舰船时,要在每个区域设计独立的水密舱室,一旦某个舱室进水,也只是影响这个舱室中的货物,而不至于让整艘舰艇沉没。这种思想就很符合容错策略中失败静默策略。

Hystrix 就采用舱壁隔离模式来实现线程隔离。

重试模式

故障转移和故障恢复策略都需要对服务进行重复调用,差别是这些重复调用有可能是同步的,也可能是后台异步进行;有可能会重复调用同一个服务,也可能会调用到服务的其他副本。无论具体是通过怎样的方式调用、调用的服务实例是否相同,都可以归结为重试设计模式的应用范畴。重试模式适合解决系统中的瞬时故障,简单的说就是有可能自己恢复(Resilient,称为自愈,也叫做回弹性)的临时性失灵,网络抖动、服务的临时过载(典型的如返回了 503 Bad Gateway 错误)这些都属于瞬时故障。

参考资料

链路追踪

链路追踪简介

什么是链路追踪

链路追踪系统广义的概念是:由数据采集数据处理数据展示三个相对独立的模块所构成的分布式追踪系统;链路追踪系统狭义的概念是:特指链路追踪的数据采集。譬如 Spring Cloud Sleuth 就属于狭义的链路追踪系统,通常会搭配 Zipkin 作为数据展示,搭配 Elasticsearch 作为数据存储来组合使用;而 ZipkinPinpointSkyWalkingCAT 都属于广义的链路追踪系统。

个人理解,链路追踪的本质就是,通过全局唯一的 ID,将分布在各个服务节点上的同一次请求产生的数据串联起来,从而梳理出调用关系,进而辅助分析系统问题、分析调用数据并统计各种系统指标。

为什么需要链路追踪

链路追踪主要有以下作用

  • 分析系统瓶颈:通过记录调用经过的每一条链路上的耗时,我们能快速定位整个系统的瓶颈点在哪里。比如你访问微博首页发现很慢,肯定是由于某种原因造成的,有可能是运营商网络延迟,有可能是网关系统异常,有可能是某个服务异常,还有可能是缓存或者数据库异常。通过链路追踪,可以从全局视角上去观察,找出整个系统的瓶颈点所在,然后做出针对性的优化。
  • 分析链路调用:通过链路追踪可以分析调用所经过的路径,然后评估是否合理。比如一个服务调用下游依赖了多个服务,通过调用链分析,可以评估是否每个依赖都是必要的,是否可以通过业务优化来减少服务依赖。还有就是,一般业务都会在多个数据中心都部署服务,以实现异地容灾,这个时候经常会出现一种状况就是服务 A 调用了另外一个数据中心的服务 B,而没有调用同处于一个数据中心的服务 B。根据我的经验,跨数据中心的调用视距离远近都会有一定的网络延迟,像北京和广州这种几千公里距离的网络延迟可能达到 30ms 以上,这对于有些业务几乎是不可接受的。通过对调用链路进行分析,可以找出跨数据中心的服务调用,从而进行优化,尽量规避这种情况出现。
  • 生成网络拓扑:通过链路追踪中记录的链路信息,可以生成一张系统的网络调用拓扑图,它可以反映系统都依赖了哪些服务,以及服务之间的调用关系是什么样的,可以一目了然。除此之外,在网络拓扑图上还可以把服务调用的详细信息也标出来,也能起到服务监控的作用。
  • 透明传输数据:除了链路追踪,业务上经常有一种需求,期望能把一些用户数据,从调用的开始一直往下传递,以便系统中的各个服务都能获取到这个信息。比如业务想做一些 A/B 测试,这时候就想通过链路追踪,把 A/B 测试的开关逻辑一直往下传递,经过的每一层服务都能获取到这个开关值,就能够统一进行 A/B 测试。

链路追踪原理

Google 发布的一篇的论文 Dapper, a Large-Scale Distributed Systems Tracing Infrastructure,里面详细讲解了链路追踪的实现原理。Dapper 论文几乎成了现代链路追踪的理论基石,很多主流的链路追踪系统都是基于 Dapper 衍生出来的,比较有名的有 Twitter 的Zipkin、阿里的鹰眼、美团的MTrace等。

链路追踪核心概念

Dapper 提出了一些很重要的核心概念:Trace、Span、Annonation 等,这是理解链路追踪原理的前提。

Trace 和 Spans(图片来源于Dapper 论文

  • Trace (追踪) - 代表一次完整的请求。一次完整的请求是指,从客户端发起请求,记录请求流转的每一个服务,直到客户端收到响应为止。整个过程中,当请求分发到第一层级的服务时,就会生成一个全局唯一的 Trace ID,并且会随着请求分发到每一层级。因此,通过 Trace ID 就可以把一次用户请求在系统中调用的链路串联起来。
  • Span (跨度) - 代表一次调用,也是链路追踪的基本单元。由于每次 Trace 都可能会调用数量不定、坐标不定的多个服务,为了能够记录具体调用了哪些服务,以及调用的顺序、开始时点、执行时长等信息,每次开始调用服务前都要先埋入一个调用记录,这个记录称为一个 Span。
    • Span 的数据结构应该足够简单,以便于能放在日志或者网络协议的报文头里;也应该足够完备,起码应含有时间戳、起止时间、Trace 的 ID、当前 Span 的 ID、父 Span 的 ID 等能够满足追踪需要的信息。
    • Trace 实际上都是由若干个有顺序、有层级关系的 Span 所组成一颗 Trace Tree (追踪树)。
  • Annotation:用于业务自定义埋点数据,例如:一次请求的用户 ID,某一个支付订单的订单 ID 等。

数据埋点阶段

数据采集的作用就是在系统的各个不同模块中进行埋点,采集数据并上报给数据处理层进行处理。而一次请求可以分为四个阶段:

  • CS(Client Send)阶段 - 客户端发起请求时埋点,需要传递一些参数,比如服务的方法名等。
  • SR(Server Recieve)阶段 - 服务端接收请求时埋点,需要回填一些参数,比如 traceId,spanId。
  • SS(Server Send)阶段 - 服务端返回请求时埋点,这时会将上下文数据传递到异步上传队列中。
  • CR(Client Recieve)阶段 - 客户端接收返回结果时埋点,这时会将上下文数据传递到异步上传队列中。

下图显示了 Span 和 Trace 在系统中的样子。

img

(图片来源于 spring-cloud-sleuth 文档

图片说明:

每种颜色表示一个跨度(有七个跨度 - 从 A 到 G)

1
2
3
Trace Id = X
Span Id = D
Client Sent

类似上面的注释,表示当前跨度的跟踪 ID 设置为 X,跨度 ID 设置为 D。此外,从 RPC 的角度来看,发生了客户端发送事件。

下图显示了 span 的父子关系:

(图片来源于 spring-cloud-sleuth 文档

链路追踪实现

一个完整的数据链路系统大致可以分为三个相对独立的模块:

  • 数据采集 - 负责数据埋点并上报。
  • 数据处理 - 负责数据的存储与计算。
  • 数据展示 - 负责数据的可视化展示。

数据采集

数据采集负责数据埋点并上报。数据采集有三种主流的实现方式,分别是基于日志的追踪(Log-Based Tracing),基于服务的追踪(Service-Based Tracing)和基于边车代理的追踪(Sidecar-Based Tracing)。

基于日志的追踪

基于日志的追踪的思路是:将 Trace、Span 等信息直接输出到应用日志中,然后随着所有节点的日志采集汇聚到一起,再从全局日志信息中反推出完整的调用链拓扑关系。

基于日志的追踪有以下特点:

  • 侵入性小、性能影响低 - 对网络消息完全没有侵入性,对应用程序只有很少量的侵入性,对性能影响也非常低。
  • 依赖于日志采集过程,导致不够实时、精准 - 直接依赖于日志采集过程,日志本身不追求绝对的连续与一致,这也使得基于日志的追踪往往不如其他两种追踪实现来的精准。另外,业务服务的调用与日志的归集并不是同时完成的,也通常不由同一个进程完成,有可能发生业务调用已经顺利结束了,但由于日志归集不及时或者精度丢失,导致日志出现延迟或缺失记录,进而产生追踪失真。

日志追踪的代表产品是 Spring Cloud Sleuth,下面是一段由 Sleuth 在调用时自动生成的日志记录,可以从中观察到 TraceID、SpanID、父 SpanID 等追踪信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
# 以下为调用端的日志输出:
Created new Feign span [Trace: cbe97e67ce162943, Span: bb1798f7a7c9c142, Parent: cbe97e67ce162943, exportable:false]
2019-06-30 09:43:24.022 [http-nio-9010-exec-8] DEBUG o.s.c.s.i.web.client.feign.TraceFeignClient - The modified request equals GET http://localhost:9001/product/findAll HTTP/1.1

X-B3-ParentSpanId: cbe97e67ce162943
X-B3-Sampled: 0
X-B3-TraceId: cbe97e67ce162943
X-Span-Name: http:/product/findAll
X-B3-SpanId: bb1798f7a7c9c142

# 以下为服务端的日志输出:
[findAll] to a span [Trace: cbe97e67ce162943, Span: bb1798f7a7c9c142, Parent: cbe97e67ce162943, exportable:false]
Adding a class tag with value [ProductController] to a span [Trace: cbe97e67ce162943, Span: bb1798f7a7c9c142, Parent: cbe97e67ce162943, exportable:false]

基于服务的追踪

基于服务的追踪是目前最为常见的实现方式,被 ZipkinPinpointSkyWalking 等主流链路追踪系统广泛采用。其实现思路是:通过某些手段给目标应用注入追踪探针(Probe),针对 Java 应用一般就是通过 Java Agent 注入的。探针在结构上可视为一个寄生在目标服务身上的小型微服务系统,它一般会有自己专用的服务注册、心跳检测等功能,有专门的数据收集协议,把从目标系统中监控得到的服务调用信息,通过另一次独立的 HTTP 或者 RPC 请求发送给追踪系统。

基于服务的追踪有以下特点:

  • 侵入性强,会有性能损耗
  • 追踪更加精准、稳定

因此,基于服务的追踪会比基于日志的追踪消耗更多的资源,也有更强的侵入性,换来的收益是追踪的精确性与稳定性都有所保证,不必再依靠日志归集来传输追踪数据。

基于边车代理的追踪

基于边车代理的追踪是服务网格的专属方案,也是最理想的分布式追踪模型,它对应用完全透明,无论是日志还是服务本身都不会有任何变化;它与编程语言无关,无论应用采用什么编程语言实现,只要它还是通过网络(HTTP 或者 gRPC)来访问服务就可以被追踪到;它有自己独立的数据通道,追踪数据通过控制平面进行上报,避免了追踪对程序通信或者日志归集的依赖和干扰,保证了最佳的精确性。如果要说这种追踪实现方式还有什么缺点的话,那就是服务网格现在还不够普及,未来随着云原生的发展,相信它会成为追踪系统的主流实现方式之一。还有就是边车代理本身的对应用透明的工作原理决定了它只能实现服务调用层面的追踪,本地方法调用级别的追踪诊断是做不到的。

现在市场占有率最高的代理 Envoy 就提供了相对完善的追踪功能,但没有提供自己的界面端和存储端,所以 Envoy 和 Sleuth 一样都属于狭义的追踪系统,需要配合专门的 UI 与存储来使用,现在 ZipkinSkyWalkingJaegerLightStep Tracing 等系统都可以接受来自于 Envoy 的追踪数据,充当它的界面端。

数据处理

数据处理负责数据的存储与计算,就是将数据采集的数据按需计算,然后落地存储供查询使用。

数据处理的需求一般分为两类,一类是实时计算需求,一类是离线计算需求。

实时计算需求对计算效率要求比较高,一般要求对收集的链路数据能够在秒级别完成聚合计算,以供实时查询。而离线计算需求对计算效率要求就没那么高了,一般能在小时级别完成链路数据的聚合计算即可,一般用作数据汇总统计。针对这两类不同的数据处理需求,采用的计算方法和存储也不相同。

  • 实时数据处理:针对实时数据处理,一般采用 Flink、Storm、Spark Streaming 来对链路数据进行实时聚合加工,存储一般使用 OLTP 数据仓库,比如 HBase,使用 traceId 作为 RowKey,能天然地把一整条调用链聚合在一起,提高查询效率。
  • 离线数据处理:针对离线数据处理,一般通过运行 MapReduce 或者 Spark 批处理程序来对链路数据进行离线计算,存储一般使用 Hive。

数据展示

数据展示层的作用就是将处理后的链路信息以图形化的方式展示给用户。

实际项目中主要用到两种图形展示,一种是调用链路图,一种是调用拓扑图。

调用链路图

调用链路图一般展示服务总耗时、服务调用的网络深度、每一层经过的系统,以及多少次调用。调用链路图在实际项目中,主要是被用来做故障定位,比如某一次用户调用失败了,可以通过调用链路图查询这次用户调用经过了哪些环节,到底是哪一层的调用失败所导致。

下面是 Zipkin 的调用链路图:

img

调用拓扑图

调用拓扑图一般展示系统内都包含哪些应用,它们之间是什么关系,以及依赖调用的 QPS、平均耗时情况。调用拓扑图是一种全局视野图,在实际项目中,主要用作全局监控,用于发现系统中异常的点,从而快速做出决策。比如,某一个服务突然出现异常,那么在调用链路拓扑图中可以看出对这个服务的调用耗时都变高了,可以用红色的图样标出来,用作监控报警。

下面是 Pinpoint 的调用链路图:

img

链路追踪主流技术

链路追踪的主流开源产品比较丰富,主要有:

  • Zipkin - Zipkin 是 Twitter 开源的调用链分析工具,目前基于 spring-cloud-sleuth 得到了广泛的使用,特点是轻量,使用、部署简单。
  • Pinpoint - 是韩国人开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI 功能强大,接入端无代码侵入。
  • SkyWalking - 是本土开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI 功能较强,接入端无代码侵入。目前已加入 Apache 孵化器。
  • CAT - CAT 是美团点评开源的基于编码和配置的调用链分析,应用监控分析,日志采集,监控报警等一系列的监控平台工具。
  • OpenTelemetry - OpenCensus 和 OpenTracing 两个项目的合并。OpenTelemetry 是工具、API 和 SDK 的集合。用于检测、生成、收集和导出遥测数据(指标、日志和和追踪),以辅助分析软件的性能和行为。
  • OpenTracing - 是一套与平台无关、与厂商无关、与语言无关的追踪协议规范。官方提供多种语言的链路追踪库实现。目前官方已经不再维护。

参考资料

如何建设监控体系

当服务消费者与服务提供者之间建立了通信,作为管理者需要通过监控手段来观察服务是否正常,调用是否成功。服务监控是很复杂的,在微服务架构下,一次用户调用会因为服务化拆分后,变成多个不同服务之间的相互调用,这也就需要对拆分后的每个服务都监控起来。

监控的意义

  • 发现问题:当系统出现问题或故障,监控系统应根据监控对象的数据异常,及时发现问题,触发告警。
  • 定位问题:监控系统的告警提示,通常应该指明问题的影响范围(如某机器 IP、某机房),触发故障的内容(数据库、MQ 或某服务的某监控数据异常),触发时间等等。有了这些必要的信息,有利于工程师分析问题时缩小排查范围,更快找到问题原因。
  • 解决问题:一旦分析清楚故障的原因后,就需要根据故障的重要度、紧急程度、影响范围等要素,去决定应该如何应对故障。
  • 总结问题:如果发生了重大故障后,需要对故障进行复盘,总结故障的原因和应对故障时的措施,思考在事前有没有更好的防范手段;在事后的应对故障的处理有没有改进的空间。

监控目标

  • 对系统不间断实时监控:实际上是对系统不间断的实时监控(这就是监控)
  • 实时反馈系统当前状态:我们监控某个硬件、或者某个系统,都是需要能实时看到当前系统的状态,是正常、异常、或者故障
  • 保证服务的可靠性、安全性:我们监控的目的就是要保证系统、服务、业务正常运行
  • 保证业务持续稳定运行:如果我们的监控做得很完善,即使出现故障,能第一时间接收到故障告警,在第一时间处理解决,从而保证业务持续性的稳定运行。

监控方法

  • 明确监控对象:根据业务和系统的实际需要,明确需要监控的对象。
  • 确定性能基准指标:确定了监控对象,接下来,要确定该监控对象的性能基准。如:CPU 使用率、吞吐量等。
  • 定义告警阈值:监控对象什么情况是正常的,什么情况是异常的,什么情况是有故障的?
  • 故障处理流程:当监控对象达到告警阈值时,应如何应对?触发怎样的告警?有没有自动化处理机制,如弹性扩容等?有没有熔断、降级等?

监控流程

一旦明确了要监控的对象,接下就是考虑如何监控。

完整的监控流程主要包括以下环节:采集、传输、存储、分析、展示、告警、处理。

数据采集

通常有两种数据收集方式:

  • 服务主动上报:这种处理方式通过在业务代码或者服务框架里加入数据收集代码逻辑,在每一次服务调用完成后,主动上报服务的调用信息。这种方式在链路跟踪中较为常见,主流的技术方案有:Zipkin。
  • 代理收集:这种处理方式通过服务调用后把调用的详细信息记录到本地日志文件中,然后再通过代理去解析本地日志文件,然后再上报服务的调用信息。主流的技术方案有:ELK、Flume。

数据传输

数据传输最常用的方式有两种:

  • UDP 传输:这种处理方式是数据处理单元提供服务器的请求地址,数据采集后通过 UDP 协议与服务器建立连接,然后把数据发送过去。
  • Kafka 传输:这种处理方式是数据采集后发送到指定的 Topic,然后数据处理单元再订阅对应的 Topic,就可以从 Kafka 消息队列中读取到对应的数据。由于 Kafka 有非常高的吞吐能力,所以很适合作为大数据量的缓冲池。

数据存储

上报的监控数据需要存储,不同监控系统选择的存储非常多样化。比较常见的有:

  • 时序数据库:InfluxDB(如:Prometheus)
  • 列式数据库:OpenTSDB 用 Hbase 存储所有时序(无须采样)的数据,来构建一个分布式、可伸缩的时间序列数据库。它支持秒级数据采集,支持永久存储,可以做容量规划,并很容易地接入到现有的告警系统里。
  • SQL 数据库:Zabbix 使用关系型数据库 Mysql 存储数据。
  • 搜索引擎数据库:ELK 使用 Elasticsearch 存储数据,以倒排索引的数据结构存储,需要查询的时候,根据索引来查询。

数据处理

数据处理是对收集来的原始数据进行聚合计算并存储。数据聚合通常有两个维度:

  • 接口维度聚合:这个维度是把实时收到的数据按照接口名维度实时聚合在一起,这样就可以得到每个接口的每秒请求量、平均耗时、成功率等信息。
  • 机器维度聚合:这个维度是把实时收到的数据按照调用的节点维度聚合在一起,这样就可以从单机维度去查看每个接口的实时请求量、平均耗时等信息。

数据展示

数据展示是把处理后的数据以 Dashboard 的方式展示给用户。数据展示有多种方式,比如曲线图、饼状图、格子图展示等。

监控告警

监控告警的形式很多,如:电话告警、邮件告警、短信告警、IM 告警等。

此外,告警需要根据甄别故障的影响范围,以确定故障级别,如:重要度、紧急度等。根据故障的级别,通知需要介入的人员,快速响应处理。

监控对象

服务监控一定是通过观察数据来量化分析,所以首先要明确需要监控什么。

一般来说,服务监控数据有以下分类:

  • 基础层监控
    • CPU:CPU 利用率、用户态利用率、内核态利用率、单核平均负载
    • 内存:内存使用量、内存剩余量
    • 磁盘:磁盘使用量、磁盘使用率
    • 网络:网络流量、丢包数、错包数、连接数等。
    • 温度
    • 电压
    • 等等
  • 中间层监控
    • 数据库
      • Mysql:集群健康状况、磁盘使用率、连接数、慢日志等
      • Redis:集群健康状况、内存使用量、CPU 使用率、内存使用率、连接数、对象数、慢日志等
      • Elasticsearch:集群健康状况、CPU 使用率、内存使用率
      • MongoDB:集群健康状况、
      • 等等
    • 中间件
      • MQ:QPS、消息成功数、消息失败数、传输耗时、消息堆积量
      • 任务调度
      • 等等
  • 应用层监控:接口监控、访问服务、SQL、内存使用率、响应时间、TPS、QPS 等。
  • 业务监控:核心指标、登录、登出、下单、支付等。
  • 客户端监控:性能、返回码、地域、运营商、版本、系统等。

监控维度

一般来说,要从多个维度来对业务进行监控,具体来讲可以包括下面几个维度:

  • 全局维度。从整体角度监控对象的的请求量、平均耗时以及错误率,全局维度的监控一般是为了让你对监控对象的调用情况有个整体了解。
  • 机房维度。一般为了业务的高可用性,服务通常部署在不止一个机房,因为不同机房地域的不同,同一个监控对象的各种指标可能会相差很大,所以需要深入到机房内部去了解。
  • 单机维度。即便是在同一个机房内部,可能由于采购年份和批次的不同,位于不同机器上的同一个监控对象的各种指标也会有很大差异。一般来说,新采购的机器通常由于成本更低,配置也更高,在同等请求量的情况下,可能表现出较大的性能差异,因此也需要从单机维度去监控同一个对象。
  • 时间维度。同一个监控对象,在每天的同一时刻各种指标通常也不会一样,这种差异要么是由业务变更导致,要么是运营活动导致。为了了解监控对象各种指标的变化,通常需要与一天前、一周前、一个月前,甚至三个月前做比较。
  • 核心维度。业务上一般会依据重要性程度对监控对象进行分级,最简单的是分成核心业务和非核心业务。核心业务和非核心业务在部署上必须隔离,分开监控,这样才能对核心业务做重点保障。

监控技术

  • ELK 的技术栈比较成熟,应用范围也比较广,除了可用作监控系统外,还可以用作日志查询和分析。
  • Graphite 是基于时间序列数据库存储的监控系统,并且提供了功能强大的各种聚合函数比如 sum、average、top5 等可用于监控分析,而且对外提供了 API 也可以接入其他图形化监控系统如 Grafana。
  • TICK 的核心在于其时间序列数据库 InfluxDB 的存储功能强大,且支持类似 SQL 语言的复杂数据处理操作。
  • Prometheus 的独特之处在于它采用了拉数据的方式,对业务影响较小,同时也采用了时间序列数据库存储,而且支持独有的 PromQL 查询语言,功能强大而且简洁。
  • OpenTSDB 用 Hbase 存储所有时序(无须采样)的数据,来构建一个分布式、可伸缩的时间序列数据库。它支持秒级数据采集,支持永久存储,可以做容量规划,并很容易地接入到现有的告警系统里。OpenTSDB 可以从大规模的集群(包括集群中的网络设备、操作系统、应用程序)中获取相应的采集指标,并进行存储、索引和服务,从而使这些数据更容易让人理解,如 Web 化、图形化等。
  • Zabbix 是一个分布式监控系统,支持多种采集方式和采集客户端,有专用的 Agent 代理,也支持 SNMP、IPMI、JMX、Telnet、SSH 等多种协议,它将采集到的数据存放到数据库,然后对其进行分析整理,达到条件触发告警。其灵活的扩展性和丰富的功能是其他监控系统所不能比的。相对来说,它的总体功能做的非常优秀。

参考资料

网关路由

什么是网关

网关的首要职责就是:作为统一的出口,对外提供服务;将外部访问网关地址的流量,根据适当的规则路由到内部集群中正确的服务节点之上。因此,微服务中的网关,也常被称为“服务网关”或“API 网关”。

网关首先应该是个路由器,在满足此前提的基础上,网关还可以根据需要作为流量过滤器来使用,提供某些额外的可选的功能。网关常见的能力如下:

  • 动态路由:根据请求路由到对应的服务上去,如果服务不可用还会有重试机制
  • 负载均衡:多服务器提供同一种服务,网关会从配置中心拉取各服务注册信息,然后将请求负载均衡风阀到这些服务器进行处理
  • 流量控制:限制并发请求的流量,避免内部系统受到冲击
  • 安全认证:网关对相关权限验证、脱敏和流量清洗、签名和黑名单功能
  • 熔断降级:当服务不可用或者访问量过大,网关可以将请求做降级,将流量打到其他服务器或者做其他处理,提示用户暂时不可用
  • 灰度发布:先进行小部分服务器升级,通过网关将少量的服务路由到已升级的服务器用来测试服务是否正常,大部分请求依旧在老版本服务器上处理
  • 日志服务:服务访问情况监控和统计报表,请求的吞吐量、并发数、流量监控、性能监控和日常告警等

简单来说:

网关 = 路由器(基础职能) + 过滤器(可选职能)

什么是服务路由

服务路由是指通过一定的规则从集群中选择合适的节点。

负载均衡的作用和服务路由的功能看上去很近似,二者有什么区别呢?

负载均衡的目标是提供服务分发而不是解决路由问题,常见的静态、动态负载均衡算法也无法实现精细化的路由管理,但是负载均衡也可以简单看做是路由方案的一种。

服务路由通常用于以下场景,目的在于实现流量隔离:

  • 分组调用:一般来讲,为了保证服务的高可用性,实现异地多活的需求,一个服务往往不止部署在一个数据中心,而且出于节省成本等考虑,有些业务可能不仅在私有机房部署,还会采用公有云部署,甚至采用多家公有云部署。服务节点也会按照不同的数据中心分成不同的分组,这时对于服务消费者来说,选择哪一个分组调用,就必须有相应的路由规则。
  • 蓝绿发布:蓝绿发布场景中,一共有两套服务群组:一套是提供旧版功能的服务群组,标记为绿色;另一套是提供新版功能的服务群组,标记为蓝色。两套服务群组都是功能完善的,并且正在运行的系统,只是服务版本和访问流量不同。新版群组(蓝色)通常是为了做内部测试、验收,不对外部用户暴露。
    • 如果新版群组(蓝色)运行稳定,并测试、验收通过后,则通过服务路由、负载均衡等手段逐步将外部用户流量导向新版群组(蓝色)。
    • 如果新版群组(蓝色)运行不稳定,或测试、验收不通过,则排查、解决问题后,再继续测试、验收。
  • 灰度发布:灰度发布(又名金丝雀发布)是指在黑与白之间,能够平滑过渡的一种发布方式。在其上可以进行 A/B 测试,即让一部分用户使用特性 A,一部分用户使用特性 B:如果用户对 B 没有什么反对意见,那么逐步扩大发布范围,直到把所有用户都迁移到 B 上面来。灰度发布可以保证整体系统的稳定,在初始灰度的时候就可以发现、调整问题,以保证其影响度。要支持灰度发布,就要求服务能够根据一定的规则,将流量隔离。
  • 流量切换:在业务线上运行过程中,经常会遇到一些不可抗力因素导致业务故障,比如某个机房的光缆被挖断,或者发生着火等事故导致整个机房的服务都不可用。这个时候就需要按照某个指令,能够把原来调用这个机房服务的流量切换到其他正常的机房。
  • 线下测试联调:线下测试时,可能会缺少相应环境。可以将测试应用注册到线上,然后开启路由规则,在本地进行测试。
  • 读写分离。对于大多数互联网业务来说都是读多写少,所以在进行服务部署的时候,可以把读写分开部署,所有写接口可以部署在一起,而读接口部署在另外的节点上。

服务路由的规则

条件路由

条件路由是基于条件表达式的路由规则。各个 RPC 框架的条件路由表达式各不相同。

我们不妨参考一下 Dubbo 的条件路由。Dubbo 的条件路由有两种配置粒度,如下:

  • 应用粒度

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # app1的消费者只能消费所有端口为20880的服务实例
    # app2的消费者只能消费所有端口为20881的服务实例
    ---
    scope: application
    force: true
    runtime: true
    enabled: true
    key: governance-conditionrouter-consumer
    conditions:
    - application=app1 => address=*:20880
    - application=app2 => address=*:20881
  • 服务粒度

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # DemoService的sayHello方法只能消费所有端口为20880的服务实例
    # DemoService的sayHi方法只能消费所有端口为20881的服务实例
    ---
    scope: service
    force: true
    runtime: true
    enabled: true
    key: org.apache.dubbo.samples.governance.api.DemoService
    conditions:
    - method=sayHello => address=*:20880
    - method=sayHi => address=*:20881

其中,conditions 定义具体的路由规则内容。conditions 部分是规则的主体,由 1 到任意多条规则组成。详见:Dubbo 路由规则

Dubbo 的条件路由规则由两个条件组成,分别用于对服务消费者和提供者进行匹配。条件路由规则的格式如下:

1
[服务消费者匹配条件] => [服务提供者匹配条件]
  • 服务消费者匹配条件:所有参数和消费者的 URL 进行对比,当消费者满足匹配条件时,对该消费者执行后面的过滤规则。
  • 服务提供者匹配条件:所有参数和提供者的 URL 进行对比,消费者最终只拿到过滤后的地址列表。

condition:// 代表了这是一段用条件表达式编写的路由规则,下面是一个条件路由规则示例:

1
host = 10.20.153.10 => host = 10.20.153.11

该条规则表示 IP 为 10.20.153.10 的服务消费者只可调用 IP 为 10.20.153.11 机器上的服务,不可调用其他机器上的服务。

下面列举一些 Dubbo 条件路由的典型应用场景:

  • 如果服务消费者的匹配条件为空,就表示所有的服务消费者都可以访问,就像下面的表达式一样。
1
=> host != 10.20.153.11
  • 如果服务提供者的过滤条件为空,就表示禁止所有的服务消费者访问,就像下面的表达式一样。
1
host = 10.20.153.10 =>
  • 排除某个服务节点
1
=> host != 172.22.3.91
  • 白名单
1
register.ip != 10.20.153.10,10.20.153.11 =>
  • 黑名单
1
register.ip = 10.20.153.10,10.20.153.11 =>
  • 只暴露部分机器节点
1
=> host = 172.22.3.1*,172.22.3.2*
  • 为重要应用提供额外的机器节点
1
application != kylin => host != 172.22.3.95,172.22.3.96
  • 读写分离
1
2
method = find*,list*,get*,is* => host = 172.22.3.94,172.22.3.95,172.22.3.96
method != find*,list*,get*,is* => host = 172.22.3.97,172.22.3.98
  • 前后台分离
1
2
application = bops => host = 172.22.3.91,172.22.3.92,172.22.3.93
application != bops => host = 172.22.3.94,172.22.3.95,172.22.3.96
  • 隔离不同机房网段
1
host != 172.22.3.* => host != 172.22.3.*
  • 提供者与消费者部署在同集群内,本机只访问本机的服务
1
=> host = $host

脚本路由

脚本路由是基于脚本语言的路由规则,常用的脚本语言比如 JavaScript、Groovy、JRuby 等。

1
"script://0.0.0.0/com.foo.BarService?category=routers&dynamic=false&rule=" + URL.encode("(function route(invokers) { ... } (invokers))")

这里面 script:// 就代表了这是一段脚本语言编写的路由规则,具体规则定义在脚本语言的 route 方法实现里,比如下面这段用 JavaScript 编写的 route() 方法表达的意思是,只有 IP 为 10.20.153.10 的服务消费者可以发起服务调用。

1
2
3
4
5
6
7
8
9
function route(invokers){
var result = new java.util.ArrayList(invokers.size());
for(i =0; i < invokers.size(); i ++){
if("10.20.153.10".equals(invokers.get(i).getUrl().getHost())){
result.add(invokers.get(i));
}
}
return result;
} (invokers));

标签路由

标签路由通过将某一个或多个服务的提供者划分到同一个分组,约束流量只在指定分组中流转,从而实现流量隔离的目的,可以作为蓝绿发布、灰度发布等场景的能力基础。

标签主要是指对服务提供者的分组,目前有两种方式可以完成实例分组,分别是动态规则打标静态规则打标。一般,动态规则优先级比静态规则更高,当两种规则同时存在且出现冲突时,将以动态规则为准。

以 Dubbo 的标签路由用法为例

(1)动态规则打标,可随时在服务治理控制台下发标签归组规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# governance-tagrouter-provider应用增加了两个标签分组tag1和tag2
# tag1包含一个实例 127.0.0.1:20880
# tag2包含一个实例 127.0.0.1:20881
---
force: false
runtime: true
enabled: true
key: governance-tagrouter-provider
tags:
- name: tag1
addresses: ["127.0.0.1:20880"]
- name: tag2
addresses: ["127.0.0.1:20881"]
...

(2)静态规则打标

1
<dubbo:provider tag="tag1"/>

or

1
<dubbo:service tag="tag1"/>

or

1
java -jar xxx-provider.jar -Ddubbo.provider.tag={the tag you want, may come from OS ENV}

(3)服务消费者指定标签路由

1
RpcContext.getContext().setAttachment(Constants.REQUEST_TAG_KEY,"tag1");

请求标签的作用域为每一次 invocation,使用 attachment 来传递请求标签,注意保存在 attachment 中的值将会在一次完整的远程调用中持续传递,得益于这样的特性,我们只需要在起始调用时,通过一行代码的设置,达到标签的持续传递。

路由规则获取方式

路由规则的获取方式主要有三种:

  • 本地静态配置:顾名思义就是路由规则存储在服务消费者本地上。服务消费者发起调用时,从本地固定位置读取路由规则,然后按照路由规则选取一个服务节点发起调用。
  • 配置中心管理:这种方式下,所有的服务消费者都从配置中心获取路由规则,由配置中心来统一管理。
  • 注册中心动态下发:这种方式下,一般是运维人员或者开发人员,通过服务治理平台修改路由规则,服务治理平台调用配置中心接口,把修改后的路由规则持久化到配置中心。因为服务消费者订阅了路由规则的变更,于是就会从配置中心获取最新的路由规则,按照最新的路由规则来执行。

一般来讲,服务路由最好是存储在配置中心,由配置中心来统一管理。这样的话,所有的服务消费者就不需要在本地管理服务路由,因为大部分的服务消费者并不关心服务路由的问题,或者说也不需要去了解其中的细节。通过配置中心,统一给各个服务消费者下发统一的服务路由,节省了沟通和管理成本。

但也不排除某些服务消费者有特定的需求,需要定制自己的路由规则,这个时候就适合通过本地配置来定制。

而动态下发可以理解为一种高级功能,它能够动态地修改路由规则,在某些业务场景下十分有用。比如某个数据中心存在问题,需要把调用这个数据中心的服务消费者都切换到其他数据中心,这时就可以通过动态下发的方式,向配置中心下发一条路由规则,将所有调用这个数据中心的请求都迁移到别的地方。

参考资料

微服务简介

本文关键词:定义演进利弊如何拆分容量规划核心组件

什么是微服务

微服务定义

微服务是由单一应用程序构成的小服务,拥有自己的进程与轻量化处理,服务依业务功能设计,以全自动的方式部署,与其他服务使用 HTTP API 通讯。同时,服务会使用最小规模的集中管理 (例如 Docker)技术,服务可以用不同的编程语言与数据库等。

——Martin Fowler 和 James Lewis 对应微服务( Microservices)的定义

个人理解,微服务是一种架构模式,它提倡将一个单一应用拆分为一些可独立运行可协同工作小的服务。在微服务架构中,每个小服务都拥有独立的进程和轻量级通信。这些服务是围绕业务能力构建的,并且可以通过全自动化部署机制独立部署。这些服务会使用最小规模的集中式管理能力(例如 Docker) ,可以用不同的编程语言编写并使用不同的数据存储技术。

从以上定义,我们可以提炼出微服务的关键特性:

  • 可独立运行 - 微服务是一个个可以独立开发、独立部署、独立运行的系统或进程。
  • 可协同工作 - 微服务之间不是完全隔离的,彼此需要协同工作,通常是采用 RPC 方式。
  • 分而治之 - 微服务本质上是一种分治思想,即把一个复杂业务拆分为多个子业务。这使得每个子业务更加高内聚、低耦合,从而能聚焦自身的功能。

微服务的演进

互联网应用架构大致的演进方向为:单体架构 -> 服务化架构 -> 微服务架构。在演化过程中,架构越来越复杂,一个应用被拆分的服务也越来越细。

互联网早期的技术栈通常为 LAMP(Linux + Apache + MySQL + PHP)或 MVC(Spring + iBatis/Hibernate + Tomcat)。这两种架构都是典型的单体应用架构。其优点是技术栈简单,因此学习上手快,部署也容易。

随着业务越来越复杂,开发团队规模不断扩张,单体应用架构就难以适应开发迭代节奏,主要有以下问题:

  • 构建、部署效率低:代码越多,依赖资源越多,则构建、部署的耗费时间自然会越长。即使每次修改一个很小的功能点,也不得不全量构建、全部部署,耗时耗力。
  • 团队协作成本高:单体应用的代码往往在一个工程中,而一个工程中的开发人员越多,显然沟通成本越高。
  • 可用性差:因为所有的功能开发最后都部署到同一个 WAR 包里,运行在同一个 Tomcat 进程之中,一旦某一功能涉及的代码或者资源有问题,那就会影响整个 WAR 包中部署的功能。

服务化:本地方法调用 转为 远程方法调用(RPC)

微服务和服务化的差异:

  • 服务拆分粒度更细
  • 服务独立部署、维护
  • 服务治理要求高

微服务架构有以下 4 个特点:

  • 服务拆分粒度更细:根据业务拆分。
  • 独立部署:每个服务在物理上相互隔离。独立部署的好处在于:如果没有拆分服务,那么任何修改都必须重新部署才能更新;而拆分为多个服务后,只有被修改的服务需要重部署。
  • 独立维护:根据组织架构拆分,分团队维护。
  • 服务治理:服务数量变多,需要有统一的服务治理平台。

简单来说,微服务就是将庞杂臃肿的单体应用拆分成细粒度的服务,独立部署,并交给各个中小团队来负责开发、测试、上线和运维整个生命周期。

  • 通过服务组件化
  • 独立的进程
  • 独立部署
  • 轻量级通信
  • 基于业务能力
  • 无集中式管理

微服务的利弊

《人月神话》中有一个软件工程界的著名理论——“没有银弹”,即世间没有能包治百病的良药,也没有能解决所有问题的架构。微服务架构的利和弊都非常突出,在实际业务场景中是否采用,如何采用,需要具体去分析、权衡。

微服务的利弊如下:

  • 优点
    • 易于扩展
    • 部署简单
    • 技术异构性
  • 缺点
    • 分布式复杂度
    • 最终一致性
    • 测试复杂度
    • 运维复杂度

康威定律

  • 第一定律:组织沟通方式会通过系统设计表达出来
  • 第二定律:时间再多一件事情也不可能做的完美,但总有时间做完一件事情
  • 第三定律:线型系统和线型组织架构间有潜在的异质同态特性
  • 第四定律:大的系统组织总是比小系统更倾向于分解

如何拆分微服务

应用微服务化架构前,要思考几个问题:什么时候进行服务化拆分?如何拆分服务?

一般来说,当应用复杂度、开发团队膨胀到难以维护时,就该考虑服务化拆分了。从经验上来看,一般开发人员超 10 人,就可以考虑服务化拆分了。

拆分微服务的思考维度

拆分服务的思考维度:

  • 业务维度:业务和数据关系密切的应该拆分为一个微服务,而功能相对比较独立的业务适合单独拆分为一个微服务。
  • 功能维度:公共功能聚合为一个服务。标准是是否被多个其他服务调用,且依赖的资源独立不与其他业务耦合。
  • 组织架构:根据实际组织架构,天然分为不同的团队,每个团队独立维护若干微服务。

但并不是说功能拆分的越细越好,过度的拆分反而会让服务数量膨胀变得难以管理,因此找到符合自己业务现状和团队人员技术水平的拆分粒度才是可取的。

拆分微服务的原则

  • 单一职责 - 高内聚,低耦合
  • 先粗后细,逐渐细化
  • 渐进式迭代
  • 考虑扩展性

拆分微服务的前置条件

微服务主要依赖几个基本组件:

  • 服务如何定义
    • 对于单体应用来说,不同功能模块之前相互交互时,通常是以类库的方式来提供各个模块的功能。
    • 对于微服务来说,每个服务都运行在各自的进程之中,无论采用哪种通讯协议,是 HTTP 还是 RPC,服务之间的调用都通过接口来约定如何交互。约定内容包括接口名、接口参数以及接口返回值。
  • 服务如何发布和订阅
    • 单体应用由于部署在同一个 WAR 包里,接口之间的调用属于进程内的调用。
    • 对于微服务来说,服务提供者需要向注册中心发布自己提供的服务(暴露接口信息以及接口地址);服务消费者向注册中心订阅哪些服务可用。
  • 服务如何监控?通常对于一个服务,我们最关心的是 QPS(调用量)、AvgTime(平均耗时)以及 P999(99.9% 的请求性能在多少毫秒以内)这些指标。这时候你就需要一种通用的监控方案,能够覆盖业务埋点、数据收集、数据处理,最后到数据展示的全链路功能。
  • 服务如何治理?可以想象,拆分为微服务架构后,服务的数量变多了,依赖关系也变复杂了。比如一个服务的性能有问题时,依赖的服务都势必会受到影响。可以设定一个调用性能阈值,如果一段时间内一直超过这个值,那么依赖服务的调用可以直接返回,这就是熔断,也是服务治理最常用的手段之一。
  • 故障如何定位?在单体应用拆分为微服务之后,一次用户调用可能依赖多个服务,每个服务又部署在不同的节点上,如果用户调用出现问题,你需要有一种解决方案能够将一次用户请求进行标记,并在多个依赖的服务系统中继续传递,以便串联所有路径,从而进行故障定位。

应用微服务架构,必须要先解决以上问题。

服务容量规划

容量规划系统的作用是根据各个微服务部署集群的最大容量和线上实际运行的负荷,来决定各个微服务是否需要弹性扩缩容,以及需要扩缩容多少台机器

微服务容量规划的挑战

微服务容量规划的复杂度主要来自以下方面:

  • 服务数量众多
  • 服务的接口表现差异巨大
  • 服务部署的集群规模大小不同
  • 服务之间还存在依赖关系

如何评估容量

容量评估需要关注的维度:

  • 选择合适的压测指标 - 主要有两类
    • 系统类指标 - CPU 使用率、内存占有量、I/O 使用率、网卡带宽等
    • 服务类指标 - 接口响应的平均耗时、P999 耗时、错误率等
  • 压测获取单机的最大容量
    • 单机压测 - 可以采用以下流量回放手段来模拟线上流量:
      • 日志回放
      • TCP-Copy
    • 集群压测 - 一般做法是通过不断把线上集群的节点摘除,以减少机器数的方式,来增加线上节点单机的流量,从而达到压测的目的。
  • 实时获取集群的运行负荷 - 集群的运行负荷也需要通过采用区间加权的方式来计算,但是因为集群的规模可能很大,超过上千台机器,显然通过计算每台单机运行的负荷再加在一起的方式效率不高。一种参考方式是:统计每台单机在不同耗时区间内的请求数,推送到集中处理的地方进行聚合,将同一个集群内的单机位于不同耗时区间内的请求进行汇总,就得到整个集群的请求在不同耗时区间内的分布了,再利用区间加权的方式就可以计算整个集群的运行负荷。

如何伸缩容量

伸缩容量的一种参考方式是使用水位线来决策扩容或是缩容水位线就是集群的最大容量除以集群的实际运行负荷,可以实时监控集群的水位线。

通常,可以为集群监控设置两条水位线:一条是安全线,一条是致命线。当集群的水位线位于致命线以下时,就需要立即扩容;在扩容一定数量的机器后,水位线回到安全线以上并保持一段时间后,就可以进行缩容了。

  • 扩容 - 扩容有两种方式:按数量、按比例(更常见做法)。
  • 缩容 - 为了避免抖动,缩容不应该一次性完成,而应该按比例逐步完成。过程中,应该多次采集水位线,满足一定比例才继续缩容。

微服务的核心组件

微服务架构下,服务调用主要依赖下面几个核心组件:

  • 服务定义
  • 注册中心
  • 服务调用
  • 服务监控
  • 服务治理

服务定义

服务调用首先要解决的问题就是服务如何对外描述。比如,你对外提供了一个服务,那么这个服务的服务名叫什么?调用这个服务需要提供哪些信息?调用这个服务返回的结果是什么格式的?该如何解析?这些就是服务定义要解决的问题。

常用的服务定义方式包括 REST API、XML 配置以及 IDL 文件三种。

  • REST API - REST API 方式通常用于 HTTP 协议的服务定义,并且常用 Wiki 或者Swagger来进行管理。
  • XML - XML 配置方式多用作 RPC 协议的服务定义,通过 *.xml 配置文件来定义接口名、参数以及返回值类型等。
  • IDL - IDL 文件方式通常用作 Thrift 和 gRPC 这类跨语言服务调用框架中,比如 gRPC 就是通过 Protobuf 文件来定义服务的接口名、参数以及返回值的数据结构。

注册中心

有了服务的接口描述,下一步要解决的问题就是服务的发布和订阅,就是说你提供了一个服务,如何让外部想调用你的服务的人知道。这个时候就需要一个类似注册中心的角色,服务提供者将自己提供的服务以及地址登记到注册中心,服务消费者则从注册中心查询所需要调用的服务的地址,然后发起请求。

一般来讲,注册中心的工作流程是:

  • 服务提供者在启动时,根据服务发布文件中配置的发布信息向注册中心注册自己的服务。
  • 服务消费者在启动时,根据消费者配置文件中配置的服务信息向注册中心订阅自己所需要的服务。
  • 注册中心返回服务提供者地址列表给服务消费者。
  • 当服务提供者发生变化,比如有节点新增或者销毁,注册中心将变更通知给服务消费者。

服务调用

服务消费者发起调用需解决以下问题:

  • 服务通信采用什么协议?就是说服务提供者和服务消费者之间以什么样的协议进行网络通信,是采用四层 TCP、UDP 协议,还是采用七层 HTTP 协议,还是采用其他协议?
  • 数据传输采用什么方式?就是说服务提供者和服务消费者之间的数据传输采用哪种方式,是同步还是异步,是在单连接上传输,还是多路复用。
  • 序列化采用什么方式?通常数据传输都会对数据进行序列化、压缩,来减少网络传输的数据量,从而减少带宽消耗和网络传输时间,比如常见的 JSON 序列化、Java 对象序列化以及 Protobuf 序列化等。

服务监控

一旦服务消费者与服务提供者之间能够正常发起服务调用,你就需要对调用情况进行监控,以了解服务是否正常。通常来讲,服务监控主要包括三个流程。

  • 数据收集。就是要把每一次服务调用的请求耗时以及成功与否收集起来,并上传到集中的数据处理中心。
  • 数据处理。有了每次调用的请求耗时以及成功与否等信息,就可以计算每秒服务请求量、平均耗时以及成功率等指标。
  • 数据展示。数据收集起来,经过处理之后,还需要以友好的方式对外展示,才能发挥价值。通常都是将数据展示在 Dashboard 面板上,并且每隔 10s 等间隔自动刷新,用作业务监控和报警等。

除了需要对服务调用情况进行监控之外,你还需要记录服务调用经过的每一层链路,以便进行问题追踪和故障定位。

服务链路追踪的工作原理大致如下:

  • 服务消费者发起调用前,会在本地按照一定的规则生成一个 requestid,发起调用时,将 requestid 当作请求参数的一部分,传递给服务提供者。
  • 服务提供者接收到请求后,记录下这次请求的 requestid,然后处理请求。如果服务提供者继续请求其他服务,会在本地再生成一个自己的 requestid,然后把这两个 requestid 都当作请求参数继续往下传递。

以此类推,通过这种层层往下传递的方式,一次请求,无论最后依赖多少次服务调用、经过多少服务节点,都可以通过最开始生成的 requestid 串联所有节点,从而达到服务追踪的目的。

服务治理

服务监控能够发现问题,服务追踪能够定位问题所在,而解决问题就得靠服务治理了。服务治理就是通过一系列的手段来保证在各种意外情况下,服务调用仍然能够正常进行。

在生产环境中,你应该经常会遇到下面几种状况。

  • 单机故障。通常遇到单机故障,都是靠运维发现并重启服务或者从线上摘除故障节点。然而集群的规模越大,越是容易遇到单机故障,在机器规模超过一百台以上时,靠传统的人肉运维显然难以应对。而服务治理可以通过一定的策略,自动摘除故障节点,不需要人为干预,就能保证单机故障不会影响业务。
  • 单 IDC 故障。你应该经常听说某某 App,因为施工挖断光缆导致大批量用户无法使用的严重故障。而服务治理可以通过自动切换故障 IDC 的流量到其他正常 IDC,可以避免因为单 IDC 故障引起的大批量业务受影响。
  • 依赖服务不可用。比如你的服务依赖依赖了另一个服务,当另一个服务出现问题时,会拖慢甚至拖垮你的服务。而服务治理可以通过熔断,在依赖服务异常的情况下,一段时期内停止发起调用而直接返回。这样一方面保证了服务消费者能够不被拖垮,另一方面也给服务提供者减少压力,使其能够尽快恢复。

上面是三种最常见的需要引入服务治理的场景,当然还有一些其他服务治理的手段比如自动扩缩容,可以用来解决服务的容量问题。

参考资料