Dunwu Blog

大道至简,知易行难

RPC 进阶篇

RPC 架构模型

了解前面的知识点(序列化、动态代理、通信),其实已经可以实现一个点对点的 RPC 架构了。

采用微内核架构的 RPC 架构模型:

img

在 RPC 框架里面,怎么支持插件化架构的呢?我们可以将每个功能点抽象成一个接
口,将这个接口作为插件的契约,然后把这个功能的接口与功能的实现分离,并提供接口的默认实现。在 Java 里面,JDK 有自带的 SPI(Service Provider Interface)服务发现机
制,它可以动态地为某个接口寻找服务实现。使用 SPI 机制需要在 Classpath 下的 META-INF/services 目录里创建一个以服务接口命名的文件,这个文件里的内容就是这个接口的具体实现类。

但在实际项目中,我们其实很少使用到 JDK 自带的 SPI 机制,首先它不能按需加载,
ServiceLoader 加载某个接口实现类的时候,会遍历全部获取,也就是接口的实现类得全部载入并实例化一遍,会造成不必要的浪费。另外就是扩展如果依赖其它的扩展,那就做不到自动注入和装配,这就很难和其他框架集成,比如扩展里面依赖了一个 Spring Bean,原生的 Java SPI 就不支持。

服务注册和发现

RPC 框架必须要有服务注册和发现机制,这样,集群中的节点才能知道通信方的请求地址。

  • 服务注册:在服务提供方启动的时候,将对外暴露的接口注册到注册中心之中,注册中心将这个服务节点的 IP 和接口保存下来。
  • 服务订阅:在服务调用方启动的时候,去注册中心查找并订阅服务提供方的 IP,然后缓存到本地,并用于后续的远程调用。

基于 ZooKeeper 的服务发现

使用 ZooKeeper 作为服务注册中心,是 Java 分布式系统的经典方案。

搭建一个 ZooKeeper 集群作为注册中心集群,服务注册的时候只需要服务节点向 ZooKeeper 节点写入注册信息即可,利用 ZooKeeper 的 Watcher 机制完成服务订阅与服务下发功能

img

通常我们可以使用 ZooKeeper、etcd 或者分布式缓存(如 Hazelcast)来解决事件通知问题,但当集群达到一定规模之后,依赖的 ZooKeeper 集群、etcd 集群可能就不稳定了,无法满足我们的需求。

在超大规模的服务集群下,注册中心所面临的挑战就是超大批量服务节点同时上下线,注册中心集群接受到大量服务变更请求,集群间各节点间需要同步大量服务节点数据,最终导致如下问题:

  • 注册中心负载过高;
  • 各节点数据不一致;
  • 服务下发不及时或下发错误的服务节点列表。

RPC 框架依赖的注册中心的服务数据的一致性其实并不需要满足 CP,只要满足 AP 即可。

基于消息总线的最终一致性的注册中心

ZooKeeper 的一大特点就是强一致性,ZooKeeper 集群的每个节点的数据每次发生更新操作,都会通知其它 ZooKeeper 节点同时执行更新。它要求保证每个节点的数据能够实时的完全一致,这也就直接导致了 ZooKeeper 集群性能上的下降。

而 RPC 框架的服务发现,在服务节点刚上线时,服务调用方是可以容忍在一段时间之后
(比如几秒钟之后)发现这个新上线的节点的。毕竟服务节点刚上线之后的几秒内,甚至更长的一段时间内没有接收到请求流量,对整个服务集群是没有什么影响的,所以我们可以牺牲掉 CP(强制一致性),而选择 AP(最终一致),来换取整个注册中心集群的性能和稳定性。

img

健康检查

使用频率适中的心跳去检测目标机器的健康状态

  • 健康状态:建立连接成功,并且心跳探活也一直成功;
  • 亚健康状态:建立连接成功,但是心跳请求连续失败;
  • 死亡状态:建立连接失败。

可以使用可用率来作为健康状态的量化标准

1
可用率 = 一个时间窗口内接口调用成功次数 / 总调用次数

当可用率低于某个比例,就认为这个节点存在问题,把它挪到亚健康列表,这样既考虑了高低频的调用接口,也兼顾了接口响应时间不同的问题。

路由和负载均衡

对于服务调用方来说,一个接口会有多个服务提供方同时提供服务,所以我们的 RPC 在每次发起请求的时候,都需要从多个服务提供方节点里面选择一个用于发请求的节点。这被称为路由策略。

  • IP 路由:最简单的当然是 IP 路由,因为服务上线后,会暴露服务到注册中心,将自身 IP、端口等元信息告知注册中心。这样消费方就可以在向注册中心请求服务地址时,感知其存在。
  • 参数路由:但有时,会有一些复杂的场景,如:灰度发布、定点调用,我们并不希望上线的服务被所有消费者感知,为了更加细粒度的控制,可以使用参数路由。通过参数控制通信的路由策略。

除了特殊场景的路由策略以外,对于机器中多个服务方,如何选择调用哪个服务节点,可以应用负载均衡策略。RPC 负载均衡策略一般包括随机、轮询、一致性 Hash、最近最少连接等。

img

负载均衡详情可以参考:负载均衡基本原理

超时重试

超时重试机制是指:当调用端发起的请求失败或超时未收到响应时,RPC 框架自身可以进行重试,再重新发送请求,用户可以自行设置是否开启重试以及重试的次数。

img

限流、降级、熔断

限流方案:Redis + lua、Sentinel

熔断方案:Hystrix

优雅启动关闭

如何避免服务停机带来的业务损失:

img

如何避免流量打到没有启动完成的节点:

img

容错处理

异常重试

就是当调用端发起的请求失败时,RPC 框架自身可以进行重试,再重新发送请求,用户可以自行设置是否开启重试以及重试的次数。

当然,不是所有的异常都要触发重试,只有符合重试条件的异常才能触发重试,比如网络超时异常、网络连接异常等等(这个需要 RPC 去判定)。

注意:有时网络可能发生抖动,导致请求超时,这时如果 RPC 触发超时重试,会触发业务逻辑重复执行,如果接口没有幂等性设计,就可能引发问题。如:重发写表。

重试超时时间

连续的异常重试可能会出现一种不可靠的情况,那就是连续的异常重试并且每次处理的请求时间比较长,最终会导致请求处理的时间过长,超出用户设置的超时时间。

解决这个问题最直接的方式就是,在每次重试后都重置一下请求的超时时间。

当调用端发起 RPC 请求时,如果发送请求发生异常并触发了异常重试,我们可以先判定下这个请求是否已经超时,如果已经超时了就直接返回超时异常,否则就先重置下这个请求的超时时间,之后再发起重试。

在所有发起重试、负载均衡选择节点的时候,去掉重试之前出现过问题的那个节点,以保证重试的成功率。

业务异常

RPC 框架是不会知道哪些业务异常能够去进行异常重试的,我们可以加个重试异常的白名
单,用户可以将允许重试的异常加入到这个白名单中。当调用端发起调用,并且配置了异常重试策略,捕获到异常之后,我们就可以采用这样的异常处理策略。如果这个异常是 RPC 框架允许重试的异常,或者这个异常类型存在于可重试异常的白名单中,我们就允许对这个请求进行重试。


综上,一个可靠的 RPC 容错处理机制如下:

img

优雅上线下线

如何避免服务停机带来的业务损失?

优雅下线

当服务提供方正在关闭,如果这之后还收到了新的业务请求,服务提供方直接返回一个特定的异常给调用方(比如 ShutdownException)。这个异常就是告诉调用方“我已经收到这个请求了,但是我正在关闭,并没有处理这个请求”,然后调用方收到这个异常响应后,RPC 框架把这个节点从健康列表挪出,并把请求自动重试到其他节点,因为这个请求是没有被服务提供方处理过,所以可以安全地重试到其他节点,这样就可以实现对业务无损。

在 Java 语言里面,对应的是 Runtime.addShutdownHook 方法,可以注册关闭的钩子。在 RPC 启动的时候,我们提前注册关闭钩子,并在里面添加了两个处理程序,一个负责开启关闭标识,一个负责安全关闭服务对象,服务对象在关闭的时候会通知调用方下线节点。同时需要在我们调用链里面加上挡板处理器,当新的请求来的时候,会判断关闭标识,如果正在关闭,则抛出特定异常。

img

优雅上线

启动预热

启动预热,就是让刚启动的服务提供方应用不承担全部的流量,而是让它被调用的次数随着时间的移动慢慢增加,最终让流量缓和地增加到跟已经运行一段时间后的水平一样。

首先,在真实环境中机器都会默认开启 NTP 时间同步功能,来保证所有机器时间的一致性。

调用方通过服务发现,除了可以拿到 IP 列表,还可以拿到对应的启动时间。我们需要把这个时间作用在负载均衡上。

img

通过这个小逻辑的改动,我们就可以保证当服务提供方运行时长小于预热时间时,对服务提供方进行降权,减少被负载均衡选择的概率,避免让应用在启动之初就处于高负载状态,从而实现服务提供方在启动后有一个预热的过程。

延迟暴露

服务提供方应用在没有启动完成的时候,调用方的请求就过来了,而调用方请求过来的原因是,服务提供方应用在启动过程中把解析到的 RPC 服务注册到了注册中心,这就导致在后续加载没有完成的情况下服务提供方的地址就被服务调用方感知到了。

为了解决这个问题,需要在应用启动加载、解析 Bean 的时候,如果遇到了 RPC 服务的 Bean,只先把这个
Bean 注册到 Spring-BeanFactory 里面去,而并不把这个 Bean 对应的接口注册到注册中心,只有等应用启动完成后,才把接口注册到注册中心用于服务发现,从而实现让服务调用方延迟获取到服务提供方地址。

具体如何实现呢?

我们可以在服务提供方应用启动后,接口注册到注册中心前,预留一个 Hook 过程,让用户可以实现可扩展的
Hook 逻辑。用户可以在 Hook 里面模拟调用逻辑,从而使 JVM 指令能够预热起来,并且用户也可以在 Hook 里面事先预加载一些资源,只有等所有的资源都加载完成后,最后才把接口注册到注册中心。

img

限流熔断

限流算法有很多,比如最简单的计数器,还有可以做到平滑限流的滑动窗口、漏斗算法以及令牌桶算法等等。其中令牌桶算法最为常用。

服务端主要是通过限流来进行自我保护,我们在实现限流时要考虑到应用和 IP 级别,方便我们在服务治理的时候,对部分访问量特别大的应用进行合理的限流。

服务端的限流阈值配置都是作用于单机的,而在有些场景下,例如对整个服务设置限流阈值,服务进行扩容时,
限流的配置并不方便,我们可以在注册中心或配置中心下发限流阈值配置的时候,将总服务节点数也下发给服务节点,让 RPC 框架自己去计算限流阈值。

我们还可以让 RPC 框架的限流模块依赖一个专门的限流服务,对服务设置限流阈值进行精准地控制,但是这种方式依赖了限流服务,相比单机的限流方式,在性能和耗时上有劣势。

调用端可以通过熔断机制进行自我保护,防止调用下游服务出现异常,或者耗时过长影响调
用端的业务逻辑,RPC 框架可以在动态代理的逻辑中去整合熔断器,实现 RPC 框架的熔断
功能。

业务分组

img

在 RPC 里面我们可以通过分组的方式人为地给不同的调用方划分出不同的小集群,从而实现调用方流量隔离的效果,保障我们的核心业务不受非核心业务的干扰。但我们在考虑问题的时候,不能顾此失彼,不能因为新加一个的功能而影响到原有系统的稳定性。

其实我们不仅可以通过分组把服务提供方划分成不同规模的小集群,我们还可以利用分组完成一个接口多种实现的功能。正常情况下,为了方便我们自己管理服务,我一般都会建议每个接口完成的功能尽量保证唯一。但在有些特殊场景下,两个接口也会完全一样,只是具体实现上有那么一点不同,那么我们就可以在服务提供方应用里面同时暴露两个相同接口,但只是接口分组不一样罢了。

动态分组

分组可以帮助服务提供方实现调用方的隔离。但是因为调用方流量并不是一成不变的,而且还可能会因为突发事件导致某个分组的流量溢出,而在整个大集群还有富余能力的时候,又因为分组隔离不能为出问题的集群提供帮助。

为了解决这种突发流量的问题,我们提供了一种更高效的方案,可以实现分组的快速伸缩。事实上我们还可以利用动态分组解决分组后给每个分组预留机器冗余的问题,我们没有必要把所有冗余的机器都分配到分组里面,我们可以把这些预留的机器做成一个共享的池子,从而减少整体预留的实例数量。

参考资料

《RPC 实战与核心原理》笔记

别老想着怎么用好 RPC 框架,你得多花时间琢磨原理

为什么要学习 RPC

RPC 不仅是微服务的架构基础,实际上,只要涉及网络通信,就可能用到 RPC。

  • 例 1:大型分布式应用系统可能会依赖消息队列、分布式缓存、分布式数据库以及统一配置中心等,应用程序与依赖的这些中间件之间都可以通过 RPC 进行通信。比如 etcd,它作为一个统一的配置服务,客户端就是通过 gRPC 框架与服务端进行通信的。

  • 例 2:我们经常会谈到的容器编排引擎 Kubernetes,它本身就是分布式的,Kubernetes 的 kube-apiserver 与整个分布式集群中的每个组件间的通讯,都是通过 gRPC 框架进行的。

RPC 是解决分布式系统通信问题的一大利器

核心原理:能否画张图解释下 RPC 的通信流程?

什么是 RPC?

RPC 的全称是 Remote Procedure Call,即远程过程调用

RPC 的作用体现在两个方面:

  • 屏蔽远程调用跟本地调用的差异,让用户像调用本地一样去调用远程方法。
  • 隐藏底层网络通信的复杂性,让用户更专注于业务逻辑。

RPC 通信流程

RPC 是一个远程调用,因此必然需要通过网络传输数据,且 RPC 常用于业务系统之间的数据交互,需要保证其可靠性,所以 RPC 一般默认采用 TCP 协议来传输。

网络传输数据是二进制数据,因此请求方需要将请求参数转为二进制数据,即序列化

响应方接受到请求,要将二进制数据转换为请求参数,需要反序列化

请求方和响应方识别彼此的信息,需要约定好彼此数据的格式,即协议大多数的协议会分成两部分,分别是数据头和消息体。数据头一般用于身份识别,包括协议标识、数据大小、请求类型、序列化类型等信息;消息体主要是请求的业务参数信息和扩展属性等。

为了屏蔽底层通信细节,使用户聚焦自身业务,因此 RPC 框架一般引入了动态代理,通过依赖注入等技术,拦截方法调用,完成远程调用的通信逻辑。

RPC 在架构中的位置

RPC 框架能够帮助我们解决系统拆分后的通信问题,并且能让我们像调用本地一样去调用
远程方法。

协议:怎么设计可扩展且向后兼容的协议?

协议的作用

在传输过程中,RPC 并不会把请求参数的所有二进制数据整体一下子发送到对端机器上,中间可能会拆分成好几个数据包,也可能会合并其他请求的数据包(合并的前提是同一个 TCP 连接上的数据),至于怎么拆分合并,这其中的细节会涉及到系统参数配置和 TCP 窗口大小。对于服务提供方应用来说,他会从 TCP 通道里面收到很多的二进制数据,那这时候怎么识别出哪些二进制是第一个请求的呢?

个人理解:为了避免语义不一致的事情发生,需要为数据报文设定边界,请求方和接收方都按照设定的边界去读写数据。这类似于文章使用标点符号去断句。

为何需要设计 RPC 协议

RPC 协议对性能要求高,而公有网络协议往往数据报文较大,内容不够紧凑。

如何设计 RPC 协议?

首先,必须先明确消息的边界,即确定消息的长度。因此,至少要分为:消息长度+消息内容两部分。

接下来,我们会发现,在使用过程中,仅消息长度,不足以明确通信中的很多细节:如序列化方式是怎样的?是否消息压缩?压缩格式是怎样的?如果协议发生变化,需要明确协议版本等等。

综上,一个 RPC 协议大概会由下图中的这些参数组成:

可扩展的协议

前面所述的协议属于定长协议头,那也就是说往后就不能再往协议头里加新参数了,如果加参
数就会导致线上兼容问题。

为了保证能平滑地升级改造前后的协议,我们有必要设计一种支持可扩展的协议。其关键在于让协议头支持可扩展,扩展后协议头的长度就不能定长了。那要实现读取不定长的协议头里面的内容,在这之前肯定需要一个固定的地方读取长度,所以我们需要一个固定的写入协议头的长度。整体协议就变成了三部分内容:固定部分、协议头内容、协议体内容。

序列化:对象怎么在网络中传输?

为什么需要序列化

调用方和被调用方的数据原本是对象,无法在网络中传输,必须转换为二进制数据。因此,需要一种方式来实现此过程:将对象转为二进制数据,即序列化;同时,需要根据二进制数据逆向转化为对象,即反序列化

从 RPC 的实现角度来看,序列化的作用如下图所示:

常用序列化方式

  • JDK 序列化:ObjectInputStreamObjectOutputStream
  • JSON
  • 二进制
    • Hessian
    • Protobuf
    • Thirft

RPC 协议选型

优先级依次从高到低:安全性、通用性、兼容性、性能、效率、空间开销

在序列化的选择上,与序列化协议的效率、性能、序列化协议后的体积相比,其通用性和兼容性的优先级会更高,因为他是会直接关系到服务调用的稳定性和可用率的,对于服务的性能来说,服务的可靠性显然更加重要。我们更加看重这种序列化协议在版本升级后的兼容性是否很好,是否支持更多的对象类型,是否是跨平台、跨语言的,是否有很多人已经用过并且踩过了很多的坑,其次我们才会去考虑性能、效率和空间开销。

使用 RPC 需要注意哪些问题

  • 对象构造得过于复杂 - 对象要尽量简单,没有太多的依赖关系,属性不要太多,尽量高内聚;
  • 对象过于复杂、庞大 - 入参对象与返回值对象体积不要太大,更不要传太大的集合;
  • 使用序列化框架不支持的类作为入参类 - 尽量使用简单的、常用的、开发语言原生的对象,尤其是集合类;
  • 对象有复杂的继承关系 - 对象不要有复杂的继承关系,最好不要有父子类的情况。

网络通信:RPC 框架在网络通信上更倾向于哪种网络 IO 模型?

常见的网络 IO 模型

常见的网络 IO 模型分为四种:同步阻塞 IO(BIO)、同步非阻塞 IO(NIO)、IO 多路复用和异步非阻塞 IO(AIO)。在这四种 IO 模型中,只有 AIO 为异步 IO,其他都是同步 IO。

IO 多路复用(Reactor 模式)在高并发场景下使用最为广泛,很多知名软件都应用了这一技术,如:Netty、Redis、Nginx 等。

IO 多路复用分为 select,poll 和 epoll。

什么是 IO 多路复用?字面上的理解,多路就是指多个通道,也就是多个网络连接的 IO,而复用就是指多个通道复用在一个复用器上。

零拷贝

系统内核处理 IO 操作分为两个阶段——等待数据和拷贝数据。等待数据,就是系统内核在等待网卡接收到数据后,把数据写到内核中;而拷贝数据,就是系统内核在获取到数据后,将数据拷贝到用户进程的空间中。

网络 IO 读写流程

应用进程的每一次写操作,都会把数据写到用户空间的缓冲区中,再由 CPU 将数据拷贝到系统内核的缓冲区中,之后再由 DMA 将这份数据拷贝到网卡中,最后由网卡发送出去。这里我们可以看到,一次写操作数据要拷贝两次才能通过网卡发送出去,而用户进程的读操作则是将整个流程反过来,数据同样会拷贝两次才能让应用程序读取到数据。

应用进程的一次完整的读写操作,都需要在用户空间与内核空间中来回拷贝,并且每一次拷贝,都需要 CPU 进行一次上下文切换(由用户进程切换到系统内核,或由系统内核切换到用户进程),这样很浪费 CPU 和性能。

所谓的零拷贝,就是取消用户空间与内核空间之间的数据拷贝操作,应用进程每一次的读写操作,可以通过一种方式,直接将数据写入内核或从内核中读取数据,再通过 DMA 将内核中的数据拷贝到网卡,或将网卡中的数据 copy 到内核。

Netty 的零拷贝偏向于用户空间中对数据操作的优化,这对处理 TCP 传输中的拆包粘包问题有着重要的意义,对应用程序处理请求数据与返回数据也有重要的意义。

Netty 框架中很多内部的 ChannelHandler 实现类,都是通过 CompositeByteBuf、slice、wrap 操作来处理 TCP 传输中的拆包与粘包问题的。

Netty 的 ByteBuffer 可以采用 Direct Buffers,使用堆外直接内存进行 Socketd 的读写
操作,最终的效果与我刚才讲解的虚拟内存所实现的效果是一样的。

Netty 还提供 FileRegion 中包装 NIO 的 FileChannel.transferTo() 方法实现了零拷
贝,这与 Linux 中的 sendfile 方式在原理上也是一样的。

动态代理:面向接口编程,屏蔽 RPC 处理流程

动态代理可以帮用户屏蔽远程调用的细节,实现像调用本地一样地调用远程的体验。

JDK 支持的动态代理方式是通过实现 InvocationHandler 接口。这种方式有一定的局限性——它要求被代理的类只能是接口。原因是因为生成的代理类会继承 Proxy 类,但 Java 是不支持多重继承的。此外,它还有性能问题。它生成后的代理类是使用反射来完成方法调用的,而这种方式相对直接用编码调用来说,性能会降低。

除 JDK 以外,还有其他第三方框架可以实现动态代理,如像 Javassist、Byte Buddy。

Javassist 的是通过控制底层字节码来实现动态代理,不需要反射完成调用,所以性能肯定比 JDK 的动态代理方式性能要好。

Byte Buddy 则属于后起之秀,在很多优秀的项目中,像 Spring、Jackson 都用到了 Byte Buddy 来完成底层代理。相比 Javassist,Byte Buddy 提供了更容易操作的 API,编写的代码可读性更高。更重要的是,生成的代理类执行速度比 Javassist 更快。

RPC 实战:剖析 gRPC 源码,动手实现一个完整的 RPC

架构设计:设计一个灵活的 RPC 框架

RPC 架构

其实 RPC 就是把拦截到的方法参数,转成可以在网络中传输的二进制,并保证在服务提供方能正确地还原出语义,最终实现像调用本地一样地调用远程的目的

RPC 本质上就是一个远程调用,必然需要通过网络来传输数据,为了屏蔽网络传输的复杂性,需要封装一个单独的数据传输模块用来收发二进制数据。

用户请求的时候是基于方法调用,方法出入参数都是对象数据,对象是肯定没法直接在网络中传输的,我们需要提前把它转成可传输的二进制,这就是我们说的序列化过程。但只是把方法调用参数的二进制数据传输到服务提供方是不够的,我们需要在方法调用参数的二进制数据后面增加“断句”符号来分隔出不同的请求,在两个“断句”符号中间放的内容就是我们请求的二进制数据,这个过程我们叫做协议封装。可以把这两个处理过程放在同一个模块,统称为协议模块。除此之外,我们还可以在协议模块中加入压缩功能,这是因为压缩过程也是对传输的二进制数据进行操作。

RPC 还需要为调用方找到所有的服务提供方,并需要在 RPC 里面维护好接口跟服务提供者地址的关系,这样调用方在发起请求的时候才能快速地找到对应的接收地址,这个过程即为“服务发现”。

但服务发现只是解决了接口和服务提供方地址映射关系的查找问题。但是,对于 RPC 来说,每次发送请求的时候都是需要用 TCP 连接的,相对服务提供方 IP 地址,TCP 连接状态是瞬息万变的,所以我们的 RPC 框架里面要有连接管理器去维护 TCP 连接的状态。

有了集群之后,提供方可能就需要管理好这些服务了,那我们的 RPC 就需要内置一些服务治理的功能,比如服务提供方权重的设置、调用授权等一些常规治理手段。而服务调用方需要额外做哪些事情呢?每次调用前,我们都需要根据服务提供方设置的规则,从集群中选择可用的连接用于发送请求。

RPC 可扩展架构

在 RPC 框架中,如何支持插件化架构呢?

可以使用 SPI 技术来实现。注意:由于 JDK SPI 性能不高,并且不支持自动注入,所以,一般会选择其他的 SPI 实现。

有了 SPI 支持插件式加载后,RPC 框架就变成了一个微内核架构。

服务发现:到底是要 CP 还是 AP?

RPC 框架必须要有服务注册和发现机制,这样,集群中的节点才能知道通信方的请求地址。

  • 服务注册:在服务提供方启动的时候,将对外暴露的接口注册到注册中心之中,注册中心将这个服务节点的 IP 和接口保存下来。
  • 服务订阅:在服务调用方启动的时候,去注册中心查找并订阅服务提供方的 IP,然后缓存到本地,并用于后续的远程调用。

基于 ZooKeeper 的服务发现

使用 ZooKeeper 作为服务注册中心,是 Java 分布式系统的经典方案。

搭建一个 ZooKeeper 集群作为注册中心集群,服务注册的时候只需要服务节点向 ZooKeeper 节点写入注册信息即可,利用 ZooKeeper 的 Watcher 机制完成服务订阅与服务下发功能

img

通常我们可以使用 ZooKeeper、etcd 或者分布式缓存(如 Hazelcast)来解决事件通知问题,但当集群达到一定规模之后,依赖的 ZooKeeper 集群、etcd 集群可能就不稳定了,无法满足我们的需求。

在超大规模的服务集群下,注册中心所面临的挑战就是超大批量服务节点同时上下线,注册中心集群接受到大量服务变更请求,集群间各节点间需要同步大量服务节点数据,最终导致如下问题:

  • 注册中心负载过高;
  • 各节点数据不一致;
  • 服务下发不及时或下发错误的服务节点列表。

RPC 框架依赖的注册中心的服务数据的一致性其实并不需要满足 CP,只要满足 AP 即可。

基于消息总线的最终一致性的注册中心

ZooKeeper 的一大特点就是强一致性,ZooKeeper 集群的每个节点的数据每次发生更新操作,都会通知其它 ZooKeeper 节点同时执行更新。它要求保证每个节点的数据能够实时的完全一致,这也就直接导致了 ZooKeeper 集群性能上的下降。

而 RPC 框架的服务发现,在服务节点刚上线时,服务调用方是可以容忍在一段时间之后(比如几秒钟之后)发现这个新上线的节点的。毕竟服务节点刚上线之后的几秒内,甚至更长的一段时间内没有接收到请求流量,对整个服务集群是没有什么影响的,所以我们可以牺牲掉 CP(强制一致性),而选择 AP(最终一致),来换取整个注册中心集群的性能和稳定性。

img

健康检测:这个节点都挂了,为啥还要疯狂发请求?

健康检测,它能帮助我们从连接列表里面过滤掉一些存在问题的节点,避免在发请求的时候选择出有问题的节点而影响业务。

服务方状态一般有三种情况:

  1. 健康状态 - 建立连接成功,并且心跳探活也一直成功;
  2. 亚健康状态 - 建立连接成功,但是心跳请求连续失败;
  3. 死亡状态 - 建立连接失败。

设计健康检测方案的时候,不能简单地从 TCP 连接是否健康、心跳是否正常等简单维度考虑,因为健康检测的目的就是要保证“业务无损”,因此,可以加入业务请求可用率因素,这样能最大化地提升 RPC 接口可用率。

正常情况下,我们大概 30S 会发一次心跳请求,这个间隔一般不会太短,如果太短会给服务节点造成很大的压力。但是如果太长的话,又不能及时摘除有问题的节点。

路由策略:怎么让请求按照设定的规则发到不同的节点上?

服务路由是指通过一定的规则从集群中选择合适的节点。

为什么需要路由策略

服务路由通常用于以下场景,目的在于实现流量隔离:

  • 分组调用

  • 蓝绿发布

  • 灰度发布

  • 流量切换

  • 线下测试联调

  • 读写分离

路由规则

  • 条件路由:基于条件表达式的路由规则
  • 脚本路由:基于脚本语言的路由规则
  • 标签路由:将服务分组的路由规则

负载均衡:节点负载差距这么大,为什么收到的流量还一样?

负载均衡算法

  • 随机算法
    • 加权随机算法
  • 轮询算法
    • 加权轮询算法
  • 最小活跃数算法
    • 加权最小活跃数算法
  • 哈希算法
  • 一致性哈希算法

RPC 框架中的负载均衡

RPC 负载均衡所采用的策略与传统的 Web 服务负载均衡所采用策略的不同之处:

  1. 搭建负载均衡设备或 TCP/IP 四层代理,需要额外成本;
  2. 请求流量都经过负载均衡设备,多经过一次网络传输,会额外浪费一些性能;
  3. 负载均衡添加节点和摘除节点,一般都要手动添加,当大批量扩容和下线时,会有大量的人工操作,“服务发现”在操作上是个问题;
  4. 我们在服务治理的时候,针对不同接口服务、服务的不同分组,我们的负载均衡策略是需要可配的,如果大家都经过这一个负载均衡设备,就不容易根据不同的场景来配置不同的负载均衡策略了。

RPC 的负载均衡完全由 RPC 框架自身实现,RPC 的服务调用者会与“注册中心”下发的所有服务节点建立长连接,在每次发起 RPC 调用时,服务调用者都会通过配置的负载均衡插件,自主选择一个服务节点,发起 RPC 调用请求。

如何设计自适应的负载均衡

那服务调用者节点又该如何判定一个服务节点的处理能力呢?

可以采用一种打分制的策略,服务调用者收集与之建立长连接的每个服务节点的指标数据,如服务节点的负载指标、CPU 核数、内存大小、请求处理的耗时指标(如请求平均耗时、TP99、TP999)、服务节点的状态指标(如正常、亚健康)。通过这些指标,计算出一个分数,比如总分 10 分,如果 CPU 负载达到 70%,就减它 3 分,当然了,减 3 分只是个类比,需要减多少分是需要一个计算策略的。

然后,根据不同指标的重要程度设置权重,然后累加,计算公式:

1
健康值 = 指标值1 * 权重1 + 指标值2 * 权重2 + ...

服务调用者给每个服务节点都打完分之后,会发送请求,那这时候我们又该如何根据分数去控制给每个服务节点发送多少流量呢?

关键步骤

  1. 添加服务指标收集器,并将其作为插件,默认有运行时状态指标收集器、请求耗时指标收集器。
  2. 运行时状态指标收集器收集服务节点 CPU 核数、CPU 负载以及内存等指标,在服务调用者与服务提供者的心跳数据中获取。
  3. 请求耗时指标收集器收集请求耗时数据,如平均耗时、TP99、TP999 等。
  4. 可以配置开启哪些指标收集器,并设置这些参考指标的指标权重,再根据指标数据和指标权重来综合打分。
  5. 通过服务节点的综合打分与节点的权重,最终计算出节点的最终权重,之后服务调用者会根据随机权重的策略,来选择服务节点。

异常重试:在约定时间内安全可靠地重试

异常重试

就是当调用端发起的请求失败时,RPC 框架自身可以进行重试,再重新发送请求,用户可以自行设置是否开启重试以及重试的次数。

当然,不是所有的异常都要触发重试,只有符合重试条件的异常才能触发重试,比如网络超时异常、网络连接异常等等(这个需要 RPC 去判定)。

注意:有时网络可能发生抖动,导致请求超时,这时如果 RPC 触发超时重试,会触发业务逻辑重复执行,如果接口没有幂等性设计,就可能引发问题。如:重发写表。

重试超时时间

连续的异常重试可能会出现一种不可靠的情况,那就是连续的异常重试并且每次处理的请求时间比较长,最终会导致请求处理的时间过长,超出用户设置的超时时间。

解决这个问题最直接的方式就是,在每次重试后都重置一下请求的超时时间。

当调用端发起 RPC 请求时,如果发送请求发生异常并触发了异常重试,我们可以先判定下这个请求是否已经超时,如果已经超时了就直接返回超时异常,否则就先重置下这个请求的超时时间,之后再发起重试。

在所有发起重试、负载均衡选择节点的时候,去掉重试之前出现过问题的那个节点,以保证重试的成功率。

业务异常

RPC 框架是不会知道哪些业务异常能够去进行异常重试的,我们可以加个重试异常的白名单,用户可以将允许重试的异常加入到这个白名单中。当调用端发起调用,并且配置了异常重试策略,捕获到异常之后,我们就可以采用这样的异常处理策略。如果这个异常是 RPC 框架允许重试的异常,或者这个异常类型存在于可重试异常的白名单中,我们就允许对这个请求进行重试。


综上,一个可靠的 RPC 容错处理机制如下:

img

优雅关闭:如何避免服务停机带来的业务损失?

优雅关闭:如何避免服务停机带来的业务损失?

在服务重启的时候,对于调用方来说,可能会存在以下几种情况:

  • 调用方发请求前,目标服务已经下线。对于调用方来说,跟目标节点的连接会断开,这时候调用方可以立马感知到,并且在其健康列表里面会把这个节点挪掉,自然也就不会被负载均衡选中。
  • 调用方发请求的时候,目标服务正在关闭。但调用方并不知道它正在关闭,而且两者之间的连接也没断开,所以这个节点还会存在健康列表里面,因此该节点就有一定概率会被负载均衡选中。

当服务提供方正在关闭,如果这之后还收到了新的业务请求,服务提供方直接返回一个特定的异常给调用方(比如 ShutdownException)。这个异常就是告诉调用方“我已经收到这个请求了,但是我正在关闭,并没有处理这个请求”,然后调用方收到这个异常响应后,RPC 框架把这个节点从健康列表挪出,并把请求自动重试到其他节点,因为这个请求是没有被服务提供方处理过,所以可以安全地重试到其他节点,这样就可以实现对业务无损。

但如果只是靠等待被动调用,就会让这个关闭过程整体有点漫长。因为有的调用方那个时刻没有业务请求,就不能及时地通知调用方了,所以我们可以加上主动通知流程,这样既可以保证实时性,也可以避免通知失败的情况。

如何捕获到关闭事件呢?在 Java 语言里面,对应的是 Runtime.addShutdownHook 方法,可以注册关闭的钩子。在 RPC 启动的时候,我们提前注册关闭钩子,并在里面添加了两个处理程序,一个负责开启关闭标识,一个负责安全关闭服务对象,服务对象在关闭的时候会通知调用方下线节点。同时需要在我们调用链里面加上挡板处理器,当新的请求来的时候,会判断关闭标识,如果正在关闭,则抛出特定异常。

优雅启动:如何避免流量打到没有启动完成的节点?

优雅启动:如何避免流量打到没有启动完成的节点?

运行了一段时间后的应用,执行速度会比刚启动的应用更快。这是因为在 Java 里面,在运行过程中,JVM 虚拟机会把高频的代码编译成机器码,被加载过的类也会被缓存到 JVM 缓存中,再次使用的时候不会触发临时加载,这样就使得“热点”代码的执行不用每次都通过解释,从而提升执行速度。

但是这些“临时数据”,都在应用重启后就消失了。如果让刚启动的应用就承担像停机前一样的流量,这会使应用在启动之初就处于高负载状态,从而导致调用方过来的请求可能出现大面积超时,进而对线上业务产生损害行为。

启动预热

启动预热,就是让刚启动的服务提供方应用不承担全部的流量,而是让它被调用的次数随着时间的移动慢慢增加,最终让流量缓和地增加到跟已经运行一段时间后的水平一样。

首先,对于调用方来说,我们要知道服务提供方启动的时间。有两种方法:

  • 一种是服务提供方在启动的时候,把自己启动的时间告诉注册中心;
  • 另外一种就是注册中心收到的服务提供方的请求注册时间。

怎么确保所有机器的日期时间是一样的?在真实环境中机器都会默认开启 NTP 时间同步功能,来保证所有机器时间的一致性。

最终的结果就是,调用方通过服务发现,除了可以拿到 IP 列表,还可以拿到对应的启动时间。我们需要把这个时间作用在负载均衡上。

通过这个小逻辑的改动,我们就可以保证当服务提供方运行时长小于预热时间时,对服务提供方进行降权,减少被负载均衡选择的概率,避免让应用在启动之初就处于高负载状态,从而实现服务提供方在启动后有一个预热的过程。

延迟暴露

服务提供方应用在没有启动完成的时候,调用方的请求就过来了,而调用方请求过来的原因是,服务提供方应用在启动过程中把解析到的 RPC 服务注册到了注册中心,这就导致在后续加载没有完成的情况下服务提供方的地址就被服务调用方感知到了。

为了解决这个问题,需要在应用启动加载、解析 Bean 的时候,如果遇到了 RPC 服务的 Bean,只先把这个
Bean 注册到 Spring-BeanFactory 里面去,而并不把这个 Bean 对应的接口注册到注册中心,只有等应用启动完成后,才把接口注册到注册中心用于服务发现,从而实现让服务调用方延迟获取到服务提供方地址。

具体如何实现呢?

我们可以在服务提供方应用启动后,接口注册到注册中心前,预留一个 Hook 过程,让用户可以实现可扩展的
Hook 逻辑。用户可以在 Hook 里面模拟调用逻辑,从而使 JVM 指令能够预热起来,并且用户也可以在 Hook 里面事先预加载一些资源,只有等所有的资源都加载完成后,最后才把接口注册到注册中心。

熔断限流:业务如何实现自我保护

限流

限流算法

  • 计数器
  • 滑动窗口
  • 漏桶
  • 令牌桶

限流要点

服务端主要是通过限流来进行自我保护,我们在实现限流时要考虑到应用和 IP 级别,方便我们在服务治理的时候,对部分访问量特别大的应用进行合理的限流。

  • 服务端的限流阈值配置都是作用于单机的,而在有些场景下,例如对整个服务设置限流阈值,服务进行扩容时,
    限流的配置并不方便。
  • 我们可以在注册中心或配置中心下发限流阈值配置的时候,将总服务节点数也下发给服务节点,让 RPC 框架自己去计算限流阈值;
  • 我们还可以让 RPC 框架的限流模块依赖一个专门的限流服务,对服务设置限流阈值进行精准地控制,但是这种方式依赖了限流服务,相比单机的限流方式,在性能和耗时上有劣势。

服务提供方主要通过限流来进行自我保护,我们在实现限流时要考虑到应用和 IP 级别,方便我们在服务治理的时,对部分访问量特别大的应用进行合理的限流。

服务端的限流阈值配置都是作用于单机的,而在有些场景下,例如对整个服务设置限流阈值,服务进行扩容时,
限流的配置并不方便。我们可以在注册中心或配置中心下发限流阈值配置的时候,将总服务节点数也下发给服务节点,让 RPC 框架自己去计算限流阈值。

我们还可以让 RPC 框架的限流模块依赖一个专门的限流服务,对服务设置限流阈值进行精准地控制,但是这种方式依赖了限流服务,相比单机的限流方式,在性能和耗时上有劣势。

熔断

调用端可以通过熔断机制进行自我保护,防止调用下游服务出现异常,或者耗时过长影响调用端的业务逻辑,RPC 框架可以在动态代理的逻辑中去整合熔断器,实现 RPC 框架的熔断功能。

熔断器的工作机制主要是关闭、打开和半打开这三个状态之间的切换。在正常情况下,熔断器是关闭的;当调用端调用下游服务出现异常时,熔断器会收集异常指标信息进行计算,当达到熔断条件时熔断器打开,这时调用端再发起请求是会直接被熔断器拦截,并快速地执行失败逻辑;当熔断器打开一段时间后,会转为半打开状态,这时熔断器允许调用端发送一个请求给服务端,如果这次请求能够正常地得到服务端的响应,则将状态置为关闭状态,否则
设置为打开。

业务分组:如何隔离流量?

img

在 RPC 里面我们可以通过分组的方式人为地给不同的调用方划分出不同的小集群,从而实现调用方流量隔离的效果,保障我们的核心业务不受非核心业务的干扰。但我们在考虑问题的时候,不能顾此失彼,不能因为新加一个的功能而影响到原有系统的稳定性。

其实我们不仅可以通过分组把服务提供方划分成不同规模的小集群,我们还可以利用分组完成一个接口多种实现的功能。正常情况下,为了方便我们自己管理服务,我一般都会建议每个接口完成的功能尽量保证唯一。但在有些特殊场景下,两个接口也会完全一样,只是具体实现上有那么一点不同,那么我们就可以在服务提供方应用里面同时暴露两个相同接口,但只是接口分组不一样罢了。

动态分组

分组可以帮助服务提供方实现调用方的隔离。但是因为调用方流量并不是一成不变的,而且还可能会因为突发事件导致某个分组的流量溢出,而在整个大集群还有富余能力的时候,又因为分组隔离不能为出问题的集群提供帮助。

为了解决这种突发流量的问题,我们提供了一种更高效的方案,可以实现分组的快速伸缩。事实上我们还可以利用动态分组解决分组后给每个分组预留机器冗余的问题,我们没有必要把所有冗余的机器都分配到分组里面,我们可以把这些预留的机器做成一个共享的池子,从而减少整体预留的实例数量。

异步 RPC:压榨单机吞吐量

异步 RPC:压榨单机吞吐量

影响到 RPC 调用的吞吐量的主要原因就是服务端的业务逻辑比较耗时,并且 CPU 大部分时间都在等待而没有去计算,导致 CPU 利用率不够,而提升单机吞吐量的最好办法就是使用异步 RPC。

RPC 框架的异步策略主要是调用端异步与服务端异步。调用端的异步就是通过 Future 方式实现异步,调用端发起一次异步请求并且从请求上下文中拿到一个 Future,之后通过 Future 的 get 方法获取结果,如果业务逻辑中同时调用多个其它的服务,则可以通过 Future 的方式减少业务逻辑的耗时,提升吞吐量。服务端异步则需要一种回调方式,让业务逻辑可以异步处理,之后调用 RPC 框架提供的回调接口,将最终结果异步通知给调用端。

另外,我们可以通过对 CompletableFuture 的支持,实现 RPC 调用在调用端与服务端之间的完全异步,同时提升两端的单机吞吐量。

此外,RPC 框架也可以有其它的异步策略,比如集成 RxJava,再比如 gRPC 的 StreamObserver 入参对象,但 CompletableFuture 是 Java8 原生提供的,无代码入侵性,并且在使用上更加方便。

安全体系:如何建立可靠的安全体系?

RPC 是解决应用间互相通信的框架,而应用之间的远程调用过程一般不会暴露在公网,换句话讲就是说 RPC 一般用于解决内部应用之间的通信,而这个“内部”是指应用都部署在同一个大局域网内。相对于公网环境,局域网的隔离性更好,也就相对更安全,所以在 RPC 里面我们很少考虑像数据包篡改、请求伪造等恶意行为。

对于 RPC 来说,需要关心的安全问题不会有公网应用那么复杂,我们只要保证让服务调用方能拿到真实的服务提供方 IP 地址集合,且服务提供方可以管控调用自己的应用就够了(比如颁发数字签名)。

分布式环境下如何快速定位问题?

问题定位:链路追踪

链路追踪要点

  • traceId:用于表示一次完整的请求
  • spanId:用于标识一次 RPC 调用在分布式请求中的位置
  • annonation:业务自定义埋点数据

链路追踪理论

链路追踪代表产品

  • Zipkin:Zipkin 是 Twitter 开源的调用链分析工具,目前基于 spring-cloud-sleuth 得到了广泛的使用,特点是轻量,使用、部署简单。
  • Pinpoint:是韩国人开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI 功能强大,接入端无代码侵入。
  • SkyWalking:是本土开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI 功能较强,接入端无代码侵入。目前已加入 Apache 孵化器。
  • CAT:CAT 是美团点评开源的基于编码和配置的调用链分析,应用监控分析,日志采集,监控报警等一系列的监控平台工具。

详解时钟轮在 RPC 中的应用

无论是同步调用还是异步调用,调用端内部实行的都是异步,而调用端在向服务端发送消息之前会创建一个 Future,并存储这个消息标识与这个 Future 的映射,当服务端收到消息并且处理完毕后向调用端发送响应消息,调用端在接收到消息后会根据消息的唯一标识找到这个 Future,并将结果注入给这个 Future。

一般定时任务方案的缺点

  1. 方案一:每创建一个 Future 都启动一个线程,之后 sleep,到达超时时间就触发请求超时的处理逻辑。
  • 缺点:需要创建大量线程。例如:高并发场景下,单机可能每秒要发送数万次请求,请求超时时间设置的是 5 秒,那我们要创建多少个线程用来执行超时任务呢?超过 10 万个线程!
  1. 方案二:用一个线程来处理所有的定时任务,不断轮询定时任务。假设一个线程每隔 100 毫秒会扫描一遍所有的处理 Future 超时的任务,当发现一个 Future 超时了,我们就执行这个任务,对这个 Future 执行超时逻辑。
  • 缺点:很浪费 CPU。高并发场景下,如果调用端刚好在 1 秒内发送了 1 万次请求,这 1 万次请求要在 5 秒后才会超时,那么那个扫描的线程在这个 5 秒内就会不停地对这 1 万个任务进行扫描遍历,要额外扫描 40 多次(每 100 毫秒扫描一次,5 秒内要扫描近 50 次),很浪费 CPU。

时钟轮方案

在时钟轮机制中,有时间槽和时钟轮的概念,时间槽就相当于时钟的刻度,而时钟轮就相当于秒针与分针等跳动的一个周期,我们会将每个任务放到对应的时间槽位上。

时钟轮的运行机制和生活中的时钟也是一样的,每隔固定的单位时间,就会从一个时间槽位跳到下一个时间槽位,这就相当于我们的秒针跳动了一次;时钟轮可以分为多层,下一层时钟轮中每个槽位的单位时间是当前时间轮整个周期的时间,这就相当于 1 分钟等于 60 秒钟;当时钟轮将一个周期的所有槽位都跳动完之后,就会从下一层时钟轮中取出一个槽位的任务,重新分布到当前的时钟轮中,当前时钟轮则从第 0 槽位从新开始跳动,这就相当于
下一分钟的第 1 秒。

时钟轮在 RPC 中的应用

  • 调用端请求超时处理:每发一次请求,都创建一个处理请求超时的定时任务放到时钟轮里,在高并发、高访问量的情况下,时钟轮每次只轮询一个时间槽位中的任务,这样会节省大量的 CPU。
  • 调用端与服务端启动超时也可以应用到时钟轮:以调用端为例,假设我们想要让应用可以快速地部署,例如 1 分钟内启动,如果超过 1 分钟则启动失败。我们可以在调用端启动时创建一个处理启动超时的定时任务,放到时钟轮里。
  • 定时心跳:RPC 框架调用端定时向服务端发送心跳,来维护连接状态,我们可以将心跳的逻辑封装为一个心跳任务,放到时钟轮里。

流量回放:保障业务技术升级的神器

实际情况就是我们不仅要保障已有业务的稳定,还需要快速去完成各种新业务的需求,这期间我们的应用代码就会经常发生变化,而发生变化后就可能会引入新的不稳定因素,而且这个过程会一直持续不断发生。

为了保障应用升级后,我们的业务行为还能保持和升级前一样,我们在大多数情况下都是依靠已有的 TestCase 去验证,但这种方式在一定程度上并不是完全可靠的。最可靠的方式就是引入线上 Case 去验证改造后的应用,把线上的真实流量在改造后的应用里面进行回放,这样不仅节省整个上线时间,还能弥补手动维护 Case 存在的缺陷。

应用引入了 RPC 后,所有的请求流量都会被 RPC 接管,所以我们可以很自然地在 RPC 里面支持流量回放功能。虽然这个功能本身并不是 RPC 的核心功能,但对于使用 RPC 的人来说,他们有了这个功能之后,就可以更放心地升级自己的应用了。

动态分组:超高效实现秒级扩缩容

分组后带来的收益,它可以帮助服务提供方实现调用方的隔离。但是因为调用方流量并不是一成不变的,而且还可能会因为突发事件导致某个分组的流量溢出,而在整个大集群还有富余能力的时候,又因为分组隔离不能为出问题的集群提供帮助。

为了解决这种突发流量的问题,我们提供了一种更高效的方案,可以实现分组的快速扩缩容。事实上我们还可以利用动态分组解决分组后给每个分组预留机器冗余的问题,我们没有必要把所有冗余的机器都分配到分组里面,我们可以把这些预留的机器做成一个共享的池子,从而减少整体预留的实例数量。

如何在没有接口的情况下进行 RPC 调用?

应用场景

(1)测试平台:各个业务方在测试平台中通过输入接口、分组名、方法名以及参数值,在线测试自己发布的 RPC 服务。

(2)轻量级的服务网关:可以让各个业务方用 HTTP 的方式,通过服务网关调用其它服务。服务网关要作为所有 RPC 服务的调用端,是不能依赖所有服务提供方的接口 API 的,也需要调用端在没有服务提供方提供接口的情况下,仍然可以正常地发起 RPC 调用。

如何泛化调用

所谓的 RPC 调用,本质上就是调用端向服务端发送一条请求消息,服务端接收并处理,之后向调用端发送一条响应消息,调用端处理完响应消息之后,一次 RPC 调用就完成了。只要调用端将服务端需要知道的信息,如接口名、业务分组名、方法名以及参数信息等封装成请求消息发送给服务端,服务端就能够解析并处理这条请求消息,这样问题就解决了。

泛化调用接口示例:

1
2
3
4
class GenericService {
Object $invoke(String methodName, String[] paramTypes, Object[] params);
CompletableFuture<Object> $asyncInvoke(String methodName, String[] paramTypes
}

如何在线上环境里兼容多种 RPC 协议?

业界有很多 RPC 框架,如:Dubbo、Hessian、gRPC 等,它们随着技术发展逐渐涌现出来。不同时期、不同项目为了解决自身的通信问题,可能会选择不同的 RPC 框架。

对于一个公司来说,不同的 RPC 框架,会使得维护成本变高。所以,如果想缩减维护成本,自然会想到统一 RPC 框架。

但这面临的重要问题是:如果直接切换 RPC 框架,会导致新旧 RPC 框架的服务无法通信,从而造成业务损失。为此,一个折中的方案就是:先不移除原有的 RPC 框架,但同时接入新的 RPC 框架,让两种 RPC 同时提供服务,然后等所有的应用都接入完新的 RPC 以后,再让所有的应用逐步接入到新的 RPC 上。

在保持原有 RPC 使用方式不变的情况下,同时引入新的 RPC 框架的思路,是可以让所有的应用最终都能升级到我们想要升级的 RPC 上,但对于开发人员来说,这样切换成本还是有点儿高,整个过程最少需要两次上线才能彻底地把应用里面的旧 RPC 都切换成新 RPC。还有一种方案:要让新的 RPC 能同时支持多种 RPC 调用,当一个调用方切换到新的 RPC 之后,调用方和服务提供方之间就可以用新的协议完成调用;当调用方还是用老的 RPC 进行调用的话,调用方和服务提供方之间就继续沿用老的协议完成调用。

如何优雅处理多协议

每种协议约定的数据包格式是不一样的,而且每种协议开头都有一个协议编码,我们一般叫做 magic number。

当 RPC 收到了数据包后,我们可以先解析出 magic number 来。获取到 magic number 后,我们就很容易地找到对应协议的数据格式,然后用对应协议的数据格式去解析收到的二进制数据包。

协议解析过程就是把一连串的二进制数据变成一个 RPC 内部对象,但这个对象一般是跟协议相关的,所以为了能让 RPC 内部处理起来更加方便,我们一般都会把这个协议相关的对象转成一个跟协议无关的 RPC 对象。这是因为在 RPC 流程中,当服务提供方收到反序列化后的请求的时候,我们需要根据当前请求的参数找到对应接口的实现类去完成真正的方法调用。如果这个请求参数是跟协议相关的话,那后续 RPC 的整个处理逻辑就会变得很复杂。

当完成了真正的方法调用以后,RPC 返回的也是一个跟协议无关的通用对象,所以在真正往调用方写回数据的时候,我们同样需要完成一个对象转换的逻辑,只不过这时候是把通用对象转成协议相关的对象。

参考资料

SpringBoot Actuator 快速入门

spring-boot-actuator 模块提供了 Spring Boot 的所有生产就绪功能。启用这些功能的推荐方法是添加 spring-boot-starter-actuator 依赖。

如果是 Maven 项目,添加以下依赖:

1
2
3
4
5
6
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>

如果是 Gradle 项目,添加以下声明:

1
2
3
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-actuator'
}

端点(Endpoint)

Actuator Endpoint 使 Spring Boot 用户可以监控应用,并和应用进行交互。Spring Boot 内置了许多 端点,并允许用户自定义端点。例如,health 端点提供基本的应用健康信息。

用户可以启用或禁用每个单独的端点并通过 HTTP 或 JMX 暴露它们(使它们可远程访问)。当端点被启用和公开时,它被认为是可用的。内置端点仅在可用时才会自动配置。大多数应用程序选择通过 HTTP 公开。例如,默认情况下,health 端点映射到 /actuator/health

启用端点

默认情况下,除了 shutdown 之外的所有端点都已启用。要配置端点的启用,请使用 management.endpoint.<id>.enabled 属性。以下示例启用 shutdown 端点:

1
management.endpoint.shutdown.enabled=true

如果您希望端点是明确指定才启用,请将 management.endpoints.enabled-by-default 属性设置为 false 并根据需要明确指定启用的端点,以下为示例:

1
2
management.endpoints.enabled-by-default=false
management.endpoint.info.enabled=true

暴露端点

由于端点可能包含敏感信息,您应该仔细考虑何时暴露它们。下表显示了内置端点的默认曝光:

ID JMX Web
auditevents Yes No
beans Yes No
caches Yes No
conditions Yes No
configprops Yes No
env Yes No
flyway Yes No
health Yes Yes
heapdump N/A No
httptrace Yes No
info Yes No
integrationgraph Yes No
jolokia N/A No
logfile N/A No
loggers Yes No
liquibase Yes No
metrics Yes No
mappings Yes No
prometheus N/A No
quartz Yes No
scheduledtasks Yes No
sessions Yes No
shutdown Yes No
startup Yes No
threaddump Yes No

要更改暴露的端点,请使用以下特定于技术的包含和排除属性:

Property Default
management.endpoints.jmx.exposure.exclude
management.endpoints.jmx.exposure.include *
management.endpoints.web.exposure.exclude
management.endpoints.web.exposure.include health

include 属性列出了暴露的端点的 ID。 exclude 属性列出了不应暴露的端点的 ID。 exclude 属性优先于 include 属性。您可以使用端点 ID 列表配置包含和排除属性。

例如,仅暴露 health 和 info 端点,其他端点都不通过 JMX 暴露,可以按如下配置:

1
management.endpoints.jmx.exposure.include=health,info

注意:* 可用于选择所有端点。

安全

出于安全考虑,只有 /health 端点会通过 HTTP 方式暴露。用户可以通过 management.endpoints.web.exposure.include 决定哪些端点可以通过 HTTP 方式暴露。

如果 Spring Security 在类路径上并且不存在其他 WebSecurityConfigurerAdapterSecurityFilterChain bean,则除 /health 之外的所有 actuator 都由 Spring Boot 自动启用安全控制。如果用户自定义了 WebSecurityConfigurerAdapterSecurityFilterChain bean,Spring Boot 不再启用安全控制,由用户自行控制访问规则。

如果您希望为 HTTP 端点定义安全控制(例如,只允许具有特定角色的用户访问它们),Spring Boot 提供了一些方便的 RequestMatcher 对象,您可以将它们与 Spring Security 结合使用。

下面是一个典型的 Spring Security 配置示例:

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration(proxyBeanMethods = false)
public class MySecurityConfiguration {

@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
http.requestMatcher(EndpointRequest.toAnyEndpoint())
.authorizeRequests((requests) -> requests.anyRequest().hasRole("ENDPOINT_ADMIN"));
http.httpBasic();
return http.build();
}

}

前面的示例使用 EndpointRequest.toAnyEndpoint() 将请求匹配到任何端点,然后确保所有端点都具有 ENDPOINT_ADMIN 角色。 EndpointRequest 上还提供了其他几种匹配器方法。

如果希望无需身份验证即可访问所有执行器端点。可以通过更改 management.endpoints.web.exposure.include 属性来做到这一点,如下所示:

1
management.endpoints.web.exposure.include=*

此外,如果存在 Spring Security,您将需要添加自定义安全配置,以允许未经身份验证的访问端点,如以下示例所示:

1
2
3
4
5
6
7
8
9
10
11
@Configuration(proxyBeanMethods = false)
public class MySecurityConfiguration {

@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
http.requestMatcher(EndpointRequest.toAnyEndpoint())
.authorizeRequests((requests) -> requests.anyRequest().permitAll());
return http.build();
}

}

由于 Spring Boot 依赖于 Spring Security 的默认设置,因此 CSRF 保护默认开启。这意味着在使用默认安全配置时,需要 POST(关闭和记录器端点)、PUT 或 DELETE 的执行器端点会收到 403(禁止)错误。

建议仅在创建非浏览器客户端使用的服务时完全禁用 CSRF 保护。

配置端点

端点会自动缓存对不带任何参数的读操作的响应数据。要配置端点缓存响应的时间量,请使用其 cache.time-to-live 属性。以下示例将 bean 端点缓存的生存时间设置为 10 秒:

1
management.endpoint.beans.cache.time-to-live=10s

Actuator Web 端点的超媒体

Spring Boot Actuator 中内置了一个“发现页面”端点,其中包含了所有端点的链接。默认情况下,“发现页面”在 /actuator 上可用。

要禁用“发现页面”,请将以下属性添加到您的应用程序属性中:

1
management.endpoints.web.discovery.enabled=false

配置自定义管理上下文路径后,“发现页面”会自动从 /actuator 移动到应用管理上下文的根目录。例如,如果管理上下文路径是 /management,则发现页面可从 /management 获得。当管理上下文路径设置为 / 时,发现页面被禁用以防止与其他映射发生冲突的可能性。

跨域支持

CORS 是一种 W3C 规范,可让用户以灵活的方式指定授权哪种跨域请求。如果使用 Spring MVC 或 Spring WebFlux,则可以配置 Actuator 的 Web 端点以支持此类场景。

CORS 支持默认是禁用的,只有在设置 management.endpoints.web.cors.allowed-origins 属性后才会启用。以下配置允许来自 example.com 域的 GET 和 POST 调用:

1
2
management.endpoints.web.cors.allowed-origins=https://example.com
management.endpoints.web.cors.allowed-methods=GET,POST

自定义端点

如果添加带有 @Endpoint 注释的 @Bean,则任何带有 @ReadOperation@WriteOperation@DeleteOperation 注释的方法都会自动通过 JMX 公开,并且在 Web 应用程序中,也可以通过 HTTP 公开。可以使用 Jersey、Spring MVC 或 Spring WebFlux 通过 HTTP 公开端点。如果 Jersey 和 Spring MVC 都可用,则使用 Spring MVC。

以下示例公开了一个返回自定义对象的读取操作:

1
2
3
4
@ReadOperation
public CustomData getData() {
return new CustomData("test", 5);
}

您还可以使用 @JmxEndpoint@WebEndpoint 编写特定技术的端点。这些端点仅限于各自的技术。例如,@WebEndpoint 仅通过 HTTP 而不是通过 JMX 公开。

您可以使用 @EndpointWebExtension@EndpointJmxExtension 编写特定技术的扩展。这些注释让您可以提供特定技术的操作来扩充现有端点。

最后,如果您需要访问 Web 框架的功能,您可以实现 servlet 或 Spring @Controller@RestController 端点,但代价是它们无法通过 JMX 或使用不同的 Web 框架获得。

通过 HTTP 进行监控和管理

自定义管理端点路径

如果是 Web 应用,Spring Boot Actuator 会自动将所有启用的端点通过 HTTP 方式暴露。默认约定是使用前缀为 /actuator 的端点的 id 作为 URL 路径。例如,健康被暴露为 /actuator/health

有时,自定义管理端点的前缀很有用。例如,您的应用程序可能已经将 /actuator 用于其他目的。您可以使用 management.endpoints.web.base-path 属性更改管理端点的前缀,如以下示例所示:

1
management.endpoints.web.base-path=/manage

该示例将端点从 /actuator/{id} 更改为 /manage/{id}(例如,/manage/info)。

自定义管理服务器端口

1
management.server.port=8081

配置 SSL

当配置为使用自定义端口时,还可以使用各种 management.server.ssl.* 属性为管理服务器配置自己的 SSL。例如,这样做可以让管理服务器在主应用程序使用 HTTPS 时通过 HTTP 可用,如以下属性设置所示:

1
2
3
4
5
6
server.port=8443
server.ssl.enabled=true
server.ssl.key-store=classpath:store.jks
server.ssl.key-password=secret
management.server.port=8080
management.server.ssl.enabled=false

或者,主服务器和管理服务器都可以使用 SSL,但使用不同的密钥存储,如下所示:

1
2
3
4
5
6
7
8
server.port=8443
server.ssl.enabled=true
server.ssl.key-store=classpath:main.jks
server.ssl.key-password=secret
management.server.port=8080
management.server.ssl.enabled=true
management.server.ssl.key-store=classpath:management.jks
management.server.ssl.key-password=secret

自定义管理服务器地址

1
2
management.server.port=8081
management.server.address=127.0.0.1

禁用 HTTP 端点

如果您不想通过 HTTP 方式暴露端点,可以将管理端口设置为 -1,如以下示例所示:

1
management.server.port=-1

也可以通过使用 management.endpoints.web.exposure.exclude 属性来实现这一点,如以下示例所示:

1
management.endpoints.web.exposure.exclude=*

通过 JMX 进行监控和管理

Java 管理扩展 (JMX) 提供了一种标准机制来监视和管理应用程序。默认情况下,此功能未启用。您可以通过将 spring.jmx.enabled 配置属性设置为 true 来打开它。 Spring Boot 将最合适的 MBeanServer 暴露为 ID 为 mbeanServer 的 bean。使用 Spring JMX 注释(@ManagedResource@ManagedAttribute@ManagedOperation)注释的任何 bean 都会暴露给它。

如果您的平台提供标准 MBeanServer,则 Spring Boot 会使用该标准并在必要时默认使用 VM MBeanServer。如果一切都失败了,则创建一个新的 MBeanServer

有关更多详细信息,请参阅 JmxAutoConfiguration 类。

默认情况下,Spring Boot 还将管理端点公开为 org.springframework.boot 域下的 JMX MBean。要完全控制 JMX 域中的端点注册,请考虑注册您自己的 EndpointObjectNameFactory 实现。

定制化 MBean Names

MBean 的名称通常由端点的 id 生成。例如,健康端点公开为 org.springframework.boot:type=Endpoint,name=Health

如果您的应用程序包含多个 Spring ApplicationContext,您可能会发现名称冲突。要解决此问题,您可以将 spring.jmx.unique-names 属性设置为 true,以便 MBean 名称始终是唯一的。

如果需要定制,跨域按如下配置:

1
2
spring.jmx.unique-names=true
management.endpoints.jmx.domain=com.example.myapp

禁用 JMX 端点

想禁用 JMX 端点,可以按如下配置:

1
management.endpoints.jmx.exposure.exclude=*

将 Jolokia 用于基于 HTTP 的 JMX

Jolokia 是一个 JMX-HTTP 的桥接工具,它提供了另一种访问 JMX bean 的方法。要使用 Jolokia,需要先添加依赖:

1
2
3
4
<dependency>
<groupId>org.jolokia</groupId>
<artifactId>jolokia-core</artifactId>
</dependency

然后,您可以通过将 jolokia* 添加到 Management.Endpoints.web.exposure.include 属性来暴露 Jolokia 端点。然后,您可以在管理 HTTP 服务器上使用 /actuator/jolokia 访问它。

日志

Spring Boot Actuator 支持查看和配置应用日志级别。

日志级别的可选值如下:

  • TRACE
  • DEBUG
  • INFO
  • WARN
  • ERROR
  • FATAL
  • OFF
  • null

null 表示没有显式配置。

指标

审计

Spring Boot Actuator 支持简单的审计功能。如果应用中启用了 Spring Security,Spring Boot Actuator 就会发布安全事件(如:“身份验证成功”、“失败”和“访问被拒绝”异常)。

可以通过在应用的配置中提供 AuditEventRepository 类型的 bean 来启用审计。为方便起见,Spring Boot 提供了一个 InMemoryAuditEventRepositoryInMemoryAuditEventRepository 的功能有限,建议仅将其用于开发环境。

如果要自定义安全事件,可以提供 AbstractAuthenticationAuditListenerAbstractAuthorizationAuditListener 实现。

此外,还可以将审计服务用于业务活动。为此,要么将 AuditEventRepository bean 注入组件并直接使用它,要么使用 Spring ApplicationEventPublisher 发布 AuditApplicationEvent(通过实现 ApplicationEventPublisherAware)。

HTTP 追踪

用户可以通过在应用中提供 HttpTraceRepository 类型的 bean 来启用 HTTP 跟踪。Spring Boot 提供了内置的 InMemoryHttpTraceRepository,它可以存储最近 100 次(默认)请求-响应的追踪数据。与其他 HTTP 追踪解决方案相比,InMemoryHttpTraceRepository 比较受限,建议仅用于开发环境。对于生产环境,建议使用 Zipkin 或 Spring Cloud Sleuth。

或者,可以自定义 HttpTraceRepository

处理监控

在 spring-boot 模块中,您可以找到两个类来创建对进程监控有用的文件:

  • ApplicationPidFileWriter 创建一个包含应用程序 PID 的文件(默认情况下,在应用程序目录中,文件名为 application.pid)。
  • WebServerPortFileWriter 创建一个文件(或多个文件),其中包含正在运行的 Web 服务器的端口(默认情况下,在应用程序目录中,文件名为 application.port)。

参考资料

分布式分区

在不同系统中,分区有着不同的称呼,例如它对应于 MongoDB, Elasticsearch 和 SolrCloud 中的 shard, HBase 的 region, Bigtable 中的
tablet, Cassandra 和 Riak 中的 vnode ,以及 Couch base 中的 vBucket。总体而言,分区是最普遍的术语。

分区通常是这样定义的,即每一条数据(或者每条记录,每行或每个文档)只属于某个特定分区。实际上,每个分区都可以视为一个完整的小型数据库,虽然数据库可能存在一些跨分区的操作。

采用数据分区的主要目的是提高可扩展性。不同的分区可以放在一个无共享集群的不同节点上,这样一个大数据集可以分散在更多的磁盘上,查询负载也随之分布到更多的处理器上。

对单个分区进行查询时,每个节点对自己所在分区可以独立执行查询操作,因此添加更多的节点可以提高查询吞吐量。超大而复杂的查询尽管比较困难,但也可能做到跨节点的并行处理。

数据分区与数据复制

分区通常与复制结合使用,即每个分区在多个节点都存有副本。这意味着某条记录属于特定的分区,而同样的内容会保存在不同的节点上以提高系统的容错性。

一个节点上可能存储了多个分区。每个分区都有自己的主副本,例如被分配给某节点,而从副本则分配在其他一些节点。一个节点可能既是某些分区的主副本,同时又是其他分区的从副本。

键-值数据的分区

分区的主要目标是将数据和查询负载均匀分布在所有节点上。如果节点平均分担负载,那么理论上 10 个节点应该能够处理 10 倍的数据量和 10 倍于单个节点的读写吞吐量(忽略复制) 。

而如果分区不均匀,则会出现某些分区节点比其他分区承担更多的数据量或查询负载,称之为倾斜。倾斜会导致分区效率严重下降,在极端情况下,所有的负载可能会集中在一个分区节点上,这就意味着 10 个节点 9 个空闲,系统的瓶颈在最繁忙的那个节点上。这种负载严重不成比例的分区即成为系统热点。

基于关键字区间分区

一种分区方式是为每个分区分配一段连续的关键字或者关键宇区间范围(以最小值和最大值来指示)。

关键字的区间段不一定非要均匀分布,这主要是因为数据本身可能就不均匀。

每个分区内可以按照关键字排序保存(参阅第 3 章的“ SSTables 和 LSM Trees ”)。这样可以轻松支持区间查询,即将关键字作为一个拼接起来的索引项从而一次查询得到多个相关记录。

然而,基于关键字的区间分区的缺点是某些访问模式会导致热点。如果关键字是时间戳,则分区对应于一个时间范围,所有的写入操作都集中在同一个分区(即当天的分区),这会导致该分区在写入时负载过高,而其他分区始终处于空闲状态。为了避免上述问题,需要使用时间戳以外的其他内容作为关键字的第一项。

基于关键字晗希值分区

一且找到合适的关键宇 H 合希函数,就可以为每个分区分配一个哈希范围(而不是直接作用于关键宇范围),关键字根据其哈希值的范围划分到不同的分区中。

img

这种方总可以很好地将关键字均匀地分配到多个分区中。分区边界可以是均匀间隔,也可以是伪随机选择( 在这种情况下,该技术有时被称为一致性哈希) 。

然而,通过关键宇 II 合希进行分区,我们丧失了良好的区间查询特性。即使关键字相邻,但经过哈希之后会分散在不同的分区中,区间查询就失去了原有的有序相邻的特性。

负载倾斜与热点

基于哈希的分区方能可以减轻热点,但无住做到完全避免。一个极端情况是,所有的读/写操作都是针对同一个关键字,则最终所有请求都将被路由到同一个分区。

一个简单的技术就是在关键字的开头或结尾处添加一个随机数。只需一个两位数的十进制随机数就可以将关键字的写操作分布到 100 个不同的关键字上,从而分配到不同的分区上。但是,随之而来的问题是,之后的任何读取都需要些额外的工作,必须从所有 100 个关键字中读取数据然后进行合井。因此通常只对少量的热点关键字附加随机数才有意义。

分区与二级索引

二级索引通常不能唯一标识一条记录,而是用来加速特定值的查询。

二级索引带来的主要挑战是它们不能规整的地映射到分区中。有两种主要的方法来支持对二级索引进行分区:基于文档的分区和基于词条的分区。

基于文档分区的二级索引

img

在这种索引方法中,每个分区完全独立,各自维护自己的二级索引,且只负责自己分区内的文档而不关心其他分区中数据。每当需要写数据库时,包括添加,删除或更新文档等,只需要处理包含目标文档 ID 的那一个分区。因此文档分区索引也被称为本地索引,而不是全局索引。

这种查询分区数据库的方陆有时也称为分散/聚集,显然这种二级索引的查询代价高昂。即使采用了并行查询,也容易导致读延迟显著放大。尽管如此,它还是广泛用于实践: MongoDB 、Riak、Cassandra、Elasticsearch 、SolrCloud 和 VoltDB 都支持基于文档分区二级索引。

基于词条的二级索引分区

可以对所有的数据构建全局索引,而不是每个分区维护自己的本地索引。

为避免成为瓶颈,不能将全局索引存储在一个节点上,否则就破坏了设计分区均衡的目标。所以,全局索引也必须进行分区,且可以与数据关键字采用不同的分区策略。

img

可以直接通过关键词来全局划分索引,或者对其取哈希值。直接分区的好处是可以支持高效的区间查询;而采用哈希的方式则可以更均句的划分分区。

这种全局的词条分区相比于文档分区索引的主要优点是,它的读取更为高效,即它不需要采用 scatter/gather 对所有的分区都执行一遍查询。

然而全局索引的不利之处在于, 写入速度较慢且非常复杂,主要因为单个文档的更新时,里面可能会涉及多个二级索引,而二级索引的分区又可能完全不同甚至在不同的节点上,由此势必引人显著的写放大。

理想情况下,索引应该时刻保持最新,即写入的数据要立即反映在最新的索引上。但是,对于词条分区来讲,这需要一个跨多个相关分区的分布式事务支持,写入速度会受到极大的影响,所以现有的数据库都不支持同步更新二级索引。

分区再均衡

随着时间的推移,数据库可能总会出现某些变化:

  • 查询压力增加,因此需要更多的 C PU 来处理负载。
  • 数据规模增加,因此需要更多的磁盘和内存来存储数据。
  • 节点可能出现故障,因此需要其他机器来接管失效的节点。

所有这些变化都要求数据和请求可以从一个节点转移到另一个节点。这样一个迁移负载的过程称为再平衡(或者动态平衡)。无论对于哪种分区方案, 分区再平衡通常至少要满足:

  • 平衡之后,负载、数据存储、读写请求等应该在集群范围更均匀地分布。
  • 再平衡执行过程中,数据库应该可以继续正常提供读写服务。
  • 避免不必要的负载迁移,以加快动态再平衡,井尽量减少网络和磁盘 I/O 影响。

动态再平衡的策略

为什么不用取模?

最好将哈希值划分为不同的区间范围,然后将每个区间分配给一个分区。

为什么不直接使用 mod,对节点数取模方怯的问题是,如果节点数 N 发生了变化,会导致很多关键字需要从现有的节点迁移到另一个节点。

固定数量的分区

创建远超实际节点数的分区数,然后为每个节点分配多个分区。

接下来, 如果集群中添加了一个新节点,该新节点可以从每个现有的节点上匀走几个分区,直到分区再次达到全局平衡。

动态分区

对于采用关键宇区间分区的数据库,如果边界设置有问题,最终可能会出现所有数据都挤在一个分区而其他分区基本为空,那么设定固定边界、固定数量的分区将非常不便:而手动去重新配置分区边界又非常繁琐。

因此, 一些数据库如 HBas e 和 RethinkDB 等采用了动态创建分区。当分区的数据增长超过一个可配的参数阔值( HBase 上默认值是 lOGB ),它就拆分为两个分区,每个承担一半的数据量[26]。相反,如果大量数据被删除,并且分区缩小到某个阑值以下,则将其与相邻分区进行合井。

每个分区总是分配给一个节点,而每个节点可以承载多个分区,这点与固定数量的分区一样。当一个大的分区发生分裂之后,可以将其中的一半转移到其他某节点以平衡负载。

但是,需要注意的是,对于一个空的数据库, 因为没有任何先验知识可以帮助确定分区的边界,所以会从一个分区开始。可能数据集很小,但直到达到第一个分裂点之前,所有的写入操作都必须由单个节点来处理, 而其他节点则处于空闲状态。

按节点比例分区

采用动态分区策略,拆分和合并操作使每个分区的大小维持在设定的最小值和最大值之间,因此分区的数量与数据集的大小成正比关系。另一方面,对于固定数量的分区方式,其每个分区的大小也与数据集的大小成正比。两种情况,分区的数量都与节点数无关。

Cassandra 和 Ketama 则采用了第三种方式,使分区数与集群节点数成正比关系。换句话说,每个节点具有固定数量的分区。此时, 当节点数不变时,每个分区的大小与数据集大小保持正比的增长关系; 当节点数增加时,分区则会调整变得更小。较大的数据量通常需要大量的节点来存储,因此这种方陆也使每个分区大小保持稳定。当一个

新节点加入集群时,它随机选择固定数量的现有分区进行分裂,然后拿走这些分区的一半数据量,将另一半数据留在原节点。随机选择分区边界的前提要求采用基于哈希分区(可以从哈希函数产生的数字范围里设置边界)

自动与手动再平衡操作

全自动式再平衡会更加方便,它在正常维护之外所增加的操作很少。但是,也有可能出现结果难以预测的情况。再平衡总体讲是个比较昂贵的操作,它需要重新路由请求井将大量数据从一个节点迁移到另一个节点。万一执行过程中间出现异常,会使网络或节点的负载过重,井影响其他请求的性能。

将自动平衡与自动故障检测相结合也可能存在一些风险。例如,假设某个节点负载过重,对请求的响应暂时受到影响,而其他节点可能会得到结论:该节点已经失效;接下来激活自动平衡来转移其负载。客观上这会加重该节点、其他节点以及网络的负荷,可能会使总体情况变得更槽,甚至导致级联式的失效扩散。

请求路由

处理策略

  1. 允许客户端链接任意的节点(例如,采用循环式的负载均衡器)。如果某节点恰好拥有所请求的分区,贝 lj 直接处理该请求:否则,将请求转发到下一个合适的节点,接收答复,并将答复返回给客户端。
  2. 将所有客户端的请求都发送到一个路由层,由后者负责将请求转发到对应的分区节点上。路由层本身不处理任何请求,它仅充一个分区感知的负载均衡器。
  3. 客户端感知分区和节点分配关系。此时,客户端可以直接连接到目标节点,而不需要任何中介。

img

许多分布式数据系统依靠独立的协调服务(如 ZooKeeper )跟踪集群范围内的元数据。每个节点都向 ZooKeeper 中注册自己, ZooKeeper 维护了分区到节点的最终映射关系。其他参与者(如路由层或分区感知的客户端)可以向 ZooKeeper 订阅此信息。一旦分区发生了改变,或者添加、删除节点, ZooKeeper 就会主动通知路由层,这样使路由信息保持最新状态。

img

参考资料

分布式复制

分布式复制是指:在多个节点上保存相同数据的副本,每个副本具体的存储位置可能不尽相同。复制方住可以提供冗余:如果某些节点发生不可用,则可以通过其他节点继续提供数据访问服务。复制也可以帮助提高系统性能。

分布式复制简介

数据复制的作用

  • 使数据在地理位置上更接近用户,从而降低访问延迟
  • 当部分组件出现位障,系统依然可以继续工作,从而提高可用性
  • 扩展至多台机器以同时提供数据访问服务,从而提高读吞吐量

分布式系统的复制方式有以下几种:

  • 主从复制
  • 多主节点复制
  • 无主节点复制

分布式系统的复制需要考虑以下问题:

  • 同步还是异步
  • 如何处理失败的副本
  • 如何保证数据一致

主节点与从节点

每个保存数据库完整数据集的节点称之为副本。有了多副本,必然会面临一个问题:如何确保所有副本之间的数据是一致的?

主从复制的工作原理如下:

  1. 指定某一个副本为主副本(或称为主节点) 。当客户写数据库时,必须将写请求首先发送给主副本,主副本首先将新数据写入本地存储。
  2. 其他副本则全部称为从副本(或称为从节点)。主副本把新数据写入本地存储后,然后将数据更改作为复制的日志或更改流发送给所有从副本。每个从副本获得更改日志之后将其应用到本地,且严格保持与主副本相同的写入顺序。
  3. 客户端从数据库中读数据时,可以在主副本或者从副本上执行查询。再次强调,只有主副本才可以接受写请求:从客户端的角度来看,从副本都是只读的。

img

典型应用:

  • 数据库:MySql、MongoDB 等
  • 消息队列:Kafka、RabbitMQ 等

同步复制与异步复制

对于关系数据库系统,同步或异步通常是一个可配置的选项:而其他系统则可能是硬性指定或者只能二选一。同步复制与异步复制基本流程是,客户将更新请求发送给主节点,主节点接收到请求,接下来将数据更新转发给从节点。最后,由主节点来通知客户更新完成。

img

通常情况下, 复制速度会非常快,例如多数数据库系统可以在一秒之内完成所有从节点的更新。但是,系统其实并没有保证一定会在多段时间内完成复制。有些情况下,从节点可能落后主节点几分钟甚至更长时间,例如,由于从节点刚从故障中恢复,或者系统已经接近最大设计上限,或者节点之间的网络出现问题。

  • 同步复制的优点: 一旦向用户确认,从节点可以明确保证完成了与主节点的更新同步,数据已经处于最新版本。万一主节点发生故障,总是可以在从节点继续访问最新数据。
  • 同步复制的缺点:如果同步的从节点无法完成确认(例如由于从节点发生崩愤,或者网络故障,或任何其他原因), 写入就不能视为成功。主节点会阻塞其后所有的写操作,直到同步副本确认完成。

因此,把所有从节点都配置为同步复制有些不切实际。因为这样的话,任何一个同步节点的中断都会导致整个系统更新停滞不前。实际应用中,很多数据库推荐的模式是:只要有一个从节点或半数以上的从节点同步成功,就视为同步,直接返回结果;剩下的节点都通过异步方式同步。万一同步的从节点变得不可用或性能下降, 则将另一个异步的从节点提升为同步模式。这样可以保证至少有两个节点(即主节点和一个同步从节点)拥有最新的数据副本。这种配置有时也称为半同步 。

主从复制还经常会被配置为全异步模式。此时如果主节点发生失败且不可恢复,则所有尚未复制到从节点的写请求都会丢失。这意味着即使向客户端确认了写操作, 却无法保证数据的持久化。但全异步配置的优点则是,不管从节点上数据多么滞后, 主节点总是可以继续响应写请求,系统的吞吐性能更好。

  • 异步复制的优点:不管从节点上数据多么滞后,主节点总是可以继续响应写请求,系统的吞吐性能更好。
  • 异步复制的缺点:如果主节点发生失败且不可恢复,则所有尚未复制到从节点的写请求都会丢失。

配置新的从节点

当如果出现以下情况时,如需要增加副本数以提高容错能力,或者替换失败的副本,就需要考虑增加新的从节点。但如何确保新的从节点和主节点保持数据一致呢?

简单地将数据文件从一个节点复制到另一个节点通常是不够的。主要是因为客户端仍在不断向数据库写入新数据,数据始终处于不断变化之中,因此常规的文件拷贝方式将会导致不同节点上呈现出不同时间点的数据。

另一种思路是:考虑锁定数据库(使其不可写)来使磁盘上的文件保持一致,但这会违反高可用的设计目标。

在不停机、数据服务不中断的前提下,也有一种可行性复制方案,其主要操作步骤如下:

  1. 在某个时间点对主节点的数据副本产生一个一致性快照,这样避免长时间锁定整个数据库。目前大多数数据库都支持此功能,快照也是系统备份所必需的。而在某些情况下,可能需要第三方工具, 如 MySQL 的 innobackupex。
  2. 将此快照拷贝到新的从节点。
  3. 从节点连接到主节点并请求快照点之后所发生的数据更改日志。因为在第一步创建快照时,快照与系统复制日志的某个确定位置相关联,这个位置信息在不同的系统有不同的称呼,如 PostgreSQL 将其称为“ log sequence number” (日志序列号),而 MySQL 将其称为“ binlog coordinates ” 。
  4. 获得日志之后,从节点来应用这些快照点之后所有数据变更,这个过程称之为追赶。接下来,它可以继续处理主节点上新的数据变化。井重复步骤 1 ~步骤 4 。

建立新的从副本具体操作步骤可能因数据库系统而异。

处理节点失效

系统中的任何节点都可能因故障或者计划内的维护(例如重启节点以安装内核安全补丁)而导致中断甚至停机。如果能够在不停机的情况下重启某个节点,这会对运维带来巨大的便利。我们的目标是,尽管个别节点会出现中断,但要保持系统总体的持续运行,并尽可能减小节点中断带来的影响。

如何通过主从复制技术来实现系统高可用呢?

从节点失效: 追赶式恢复

从节点的本地磁盘上都保存了副本收到的数据变更日志。如果从节点发生崩溃,然后顺利重启,或者主从节点之间的网络发生暂时中断(闪断),则恢复比较容易,根据副本的复制日志,从节点可以知道在发生故障之前所处理的最后一笔事务,然后连接到主节点,并请求自那笔事务之后中断期间内所有的数据变更。在收到这些数据变更日志之后,将其应用到本地来追赶主节点。之后就和正常情况一样持续接收来自主节点数据流的变化。

主节点失效:节点切换

选择某个从节点将其提升为主节点;客户端也需要更新,这样之后的写请求会发送给新的主节点,然后其他从节点要接受来自新的主节点上的变更数据,这一过程称之为切换。

故障切换可以手动进行,例如通知管理员主节点发生失效,采取必要的步骤来创建新的主节点;或者以自动方式进行。自动切换的步骤通常如下:

  1. 确认主节点失效。有很多种出错可能性,很难准确检测出问题的原因,所以大多数系统都采用了基于超时的机制:节点间频繁地互相发生发送心跳悄息,如果发现某一个节点在一段比较长时间内(例如 30s )没有响应,即认为该节点发生失效。
  2. 选举新的主节点。可以通过选举的方式(超过多数的节点达成共识)来选举新的主节点,或者由之前选定的某控制节点来指定新的主节点。候选节点最好与原主节点的数据差异最小,这样可以最小化数据丢失的风险。让所有节点同意新的主节点是个典型的共识问题。
  3. 重新配置系统使新主节点生效。客户端现在需要将写请求发送给新的主节点。如果原主节点之后重新上线,可能仍然自认为是主节点,而没有意识到其他节点已经达成共识迫使其下台。这时系统要确保原主节点降级为从节点,并认可新的主节点。

上述切换过程依然充满了很多变数:

  • 如果使用了异步复制,且失效之前,新的主节点并未收到原主节点的所有数据;在选举之后,原主节点很快又重新上线并加入到集群,接下来的写操作会发生什么?新的主节点很可能会收到冲突的写请求,这是因为原主节点未意识的角色变化,还会尝试同步其他从节点,但其中的一个现在已经接管成为现任主节点。常见的解决方案是,原主节点上未完成复制的写请求就此丢弃,但这可能会违背数据更新持久化的承诺。
  • 如果在数据库之外有其他系统依赖于数据库的内容并在一起协同使用,丢弃数据的方案就特别危险。例如,在 GitHub 的一个事故中,某个数据并非完全同步的 MySQL 从节点被提升为主副本,数据库使用了自增计数器将主键分配给新创建的行,但是因为新的主节点计数器落后于原主节点( 即二者并非完全同步),它重新使用了已被原主节点分配出去的某些主键,而恰好这些主键已被外部 Redis 所引用,结果出现 MySQL 和 Redis 之间的不一致,最后导致了某些私有数据被错误地泄露给了其他用户。
  • 在某些故障情况下,可能会发生两个节点同时-都自认为是主节点。这种情况被称为脑裂,它非常危险:两个主节点都可能接受写请求,并且没有很好解决冲突的办法,最后数据可能会丢失或者破坏。作为一种安全应急方案,有些系统会采取措施来强制关闭其中一个节点。然而,如果设计或者实现考虑不周,可能会出现两个节点都被关闭的情况。
  • 如何设置合适的超时来检测主节点失效呢? 主节点失效后,超时时间设置得越长也意味着总体恢复时间就越长。但如果超时设置太短,可能会导致很多不必要的切换。例如,突发的负载峰值会导致节点的响应时间变长甚至超肘,或者由于网络故障导致延迟增加。如果系统此时已经处于高负载压力或网络已经出现严重拥塞,不必要的切换操作只会使总体情况变得更糟。

复制日志的实现

基于语句的复制

最简单的情况,主节点记录所执行的每个写请求(操作语句)井将该操作语句作为日志发送给从节点。对于关系数据库,这意味着每个 INSERT 、UPDATE 或 DELETE 语句都会转发给从节点,并且每个从节点都会分析井执行这些 SQU 吾句,如同它们是来自客户端那样。

听起来很合理也不复杂,但这种复制方式有一些不适用的场景:

  • 任何调用非确定性函数的语句,如 NOW() 获取当前时间,或 RAND() 获取一个随机数等,可能会在不同的副本上产生不同的值。
  • 如果语句中使用了自增列,或者依赖于数据库的现有数据(例如, UPDATE ... WHERE <某些条件>),则所有副本必须按照完全相同的顺序执行,否则可能会带来不同的结果。进而,如果有多个同时并发执行的事务时, 会有很大的限制。
  • 有副作用的语句(例如,触发器、存储过程、用户定义的函数等),可能会在每个副本上产生不同的副作用。

有可能采取一些特殊措施来解决这些问题,例如,主节点可以在记录操作语句时将非确定性函数替换为执行之后的确定的结果,这样所有节点直接使用相同的结果值。但是,这里面存在太多边界条件需要考虑,因此目前通常首选的是其他复制实现方案。

MySQL 5.1 版本之前采用基于操作语句的复制。现在由于逻辑紧凑,依然在用,但是默认情况下,如果语句中存在一些不确定性操作,则 MySQL 会切换到基于行的复制(稍后讨论)。VoltDB 使用基于语句的复制,它通过事务级别的确定性来保证复制的安全。

基于预写日志(WAL)传输

通常每个写操作都是以追加写的方式写入到日志中:

  • 对于日志结构存储引擎,日志是主要的存储方式。日志段在后台压缩井支持垃圾回收。
  • 对于采用覆写磁盘的 BTree 结构,每次修改会预先写入日志,如系统发生崩溃,通过索引更新的方式迅速恢复到此前一致状态。

不管哪种情况,所有对数据库写入的字节序列都被记入日志。因此可以使用完全相同的日志在另一个节点上构建副本:除了将日志写入磁盘之外, 主节点还可以通过网络将其发送给从节点。

PostgreSQL 、Oracle 以及其他系统等支持这种复制方式。其主要缺点是日志描述的数据结果非常底层: 一个 WAL 包含了哪些磁盘块的哪些字节发生改变,诸如此类的细节。这使得复制方案和存储引擎紧密耦合。如果数据库的存储格式从一个版本改为另一个版本,那么系统通常无能支持主从节点上运行不同版本的软件。

看起来这似乎只是个有关实现方面的小细节,但可能对运营产生巨大的影响。如果复制协议允许从节点的软件版本比主节点更新,则可以实现数据库软件的不停机升级:首先升级从节点,然后执行主节点切换,使升级后的从节点成为新的主节点。相反,复制协议如果要求版本必须严格一致(例如 WALf 专输),那么就势必以停机为代价。

基于行的逻辑日志复制

另一种方能是复制和存储引擎采用不同的日志格式,这样复制与存储逻辑剥离。这种复制日志称为逻辑日志,以区分物理存储引擎的数据表示。

关系数据库的逻辑日志通常是指一系列记录来描述数据表行级别的写请求:

  • 对于行插入,日志包含所有相关列的新值。
  • 对于行删除,日志里有足够的信息来唯一标识已删除的行,通常是靠主键,但如果表上没有定义主键,就需要记录所有列的旧值。
  • 对于行更新,日志包含足够的信息来唯一标识更新的行,以及所有列的新值(或至少包含所有已更新列的新值)。

如果一条事务涉及多行的修改,则会产生多个这样的日志记录,并在后面跟着一条记录,指出该事务已经提交。MySQL 的二进制日志 binlog (当配置为基于行的复制时)使用该方式。

由于逻辑日志与存储引擎逻辑解耦,因此可以更容易地保持向后兼容,从而使主从节点能够运行不同版本的软件甚至是不同的存储引擎。

对于外部应用程序来说,逻辑日志格式也更容易解析。

基于触发器的复制

在某些情况下,我们可能需要更高的灵活性。例如,只想复制数据的一部分,或者想从一种数据库复制到另一种数据库,或者需要订制、管理冲突解决逻辑( 参阅本章后面的“处理写冲突”),则需要将复制控制交给应用程序层。

有一些工具,可以通过读取数据库日志让应用程序获取数据变更。另一种方法则是借助许多关系数据库都支持的功能:触发器和存储过程。

触发器支持注册自己的应用层代码,使得当数据库系统发生数据更改(写事务)时自动执行上述自定义代码。通过触发器技术,可以将数据更改记录到一个单独的表中,然后外部处理逻辑访问该表,实施必要的自定义应用层逻辑,例如将数据更改复制到另一个系统。Oracle 的 Databus 和 Postgres 的 Bucardo 就是这种技术的典型代表。基于触发器的复制通常比其他复制方式开销更高, 也比数据库内置复制更容易出错,或者暴露一些限制。然而,其高度灵活性仍有用武之地。

复制滞后问题

主从复制要求所有写请求都经由主节点,而任何副本只能接受只读查询。对于读操作密集的负载(如 Web ),这是一个不错的选择:创建多个从副本,将读请求分发给这些从副本,从而减轻主节点负载井允许读取请求就近满足。

在这种扩展体系下,只需添加更多的从副本,就可以提高读请求的服务吞吐量。但是,这种方法实际上只能用于异步复制,如果试图同步复制所有的从副本,则单个节点故障或网络中断将使整个系统无法写入。而且节点越多,发生故障的概率越高,所以完全同步的配置现实中反而非常不可靠。

不幸的是,如果一个应用正好从一个异步的从节点读取数据,而该副本落后于主节点,则应用可能会读到过期的信息。这会导致数据库中出现明显的不一致:由于并非所有的写入都反映在从副本上,如果同时对主节点和从节点发起相同的查询,可能会得到不同的结果。经过一段时间之后,从节点最终会赶上并与主节点数据保持一致。这种效应也被称为最终一致性

读自己的写

许多应用让用户提交一些数据,接下来查看他们自己所提交的内容。例如客户数据库中的记录,亦或者是讨论主题的评论等。提交新数据须发送到主节点,但是当用户读取数据时,数据可能来自从节点。这对于读密集和偶尔写入的负载是个非常合适的方案。

然而对于异步复制存在这样一个问题,如图所示,用户在写入不久即查看数据,则新数据可能尚未到达从节点。对用户来讲, 看起来似乎是刚刚提交的数据丢失了,显然用户不会高兴。

img

对于这种情况,我们需要读写一致性。该机制保证如果用户重新加载页面, 他们总能看到自己最近提交的更新。但对其他用户则没有任何保证,这些用户的更新可能会在稍后才能刷新看到。如何实现呢?有以下几种可行性方案:

  • 如果用户访问可能会被修改的内容,从主节点读取; 否则,在从节点读取。这背后就要求有一些方法在实际执行查询之前,就已经知道内容是否可能会被修改。例如,社交网络上的用户首页信息通常只能由所有者编辑,而其他人无法编辑。因此,这就形成一个简单的规则: 总是从主节点读取用户自己的首页配置文件,而在从节点读取其他用户的配置文件。
  • 如果应用的大部分内容都可能被所有用户修改,那么上述方法将不太有效,它会导致大部分内容都必须经由主节点,这就丧失了读操作的扩展性。此时需要其他方案来判断是否从主节点读取。例如,跟踪最近更新的时间,如果更新后一分钟之内,则总是在主节点读取;井监控从节点的复制滞后程度,避免从那些滞后时间超过一分钟的从节点读取。
  • 客户端还可以记住最近更新时的时间戳,井附带在读请求中,据此信息,系统可以确保对该用户提供读服务时都应该至少包含了该时间戳的更新。如果不够新,要么交由另一个副本来处理,要么等待直到副本接收到了最近的更新。时间戳可以是逻辑时间戳(例如用来指示写入顺序的日志序列号)或实际系统时钟(在这种情况下,时钟同步又称为一个关键点)。
  • 如果副本分布在多数据中心(例如考虑与用户的地理接近,以及高可用性),情况会更复杂些。必须先把请求路由到主节点所在的数据中心(该数据中心可能离用户很远)。

如果同一用户可能会从多个设备访问数据,情况会更加复杂。

  • 记住用户上次更新时间戳的方法实现起来会比较困难,因为在一台设备上运行的代码完全无法知道在其他设备上发生了什么。此时,元数据必须做到全局共享。
  • 如果副本分布在多数据中心,无法保证来自不同设备的连接经过路由之后都到达同一个数据中心。例如,用户的台式计算机使用了家庭宽带连接,而移动设备则使用蜂窝数据网络,不同设备的网络连接线路可能完全不同。如果方案要求必须从主节点读取,则首先需要想办毡确保将来自不同设备的请求路由到同一个数据中心。

单调读

img

用户看到了最新内容之后又读到了过期的内容,好像时间被回拨, 此时需要单调读一致性。

单调读一致性可以确保不会发生这种异常。这是一个比强一致性弱,但比最终一致性强的保证。当读取数据时,单调读保证,如果某个用户依次进行多次读取,则他绝不会看到回攘现象,即在读取较新值之后又发生读旧值的情况。

实现单调读的一种方式是,确保每个用户总是从固定的同一副本执行读取(而不同的用户可以从不同的副本读取)。例如,基于用户 ID 的哈希的方怯而不是随机选择副本。但如果该副本发生失效,则用户的查询必须重新路由到另一个副本。

前缀一致读

前缀一致读:对于一系列按照某个顺序发生的写请求,那么读取这些内容时也会按照当时写入的顺序。

如果数据库总是以相同的顺序写入,则读取总是看到一致的序列,不会发生这种反常。然而,在许多分布式数据库中,不同的分区独立运行,因此不存在全局写入顺序。这就导致当用户从数据库中读数据时,可能会看到数据库的某部分旧值和另一部分新值。

一个解决方案是确保任何具有因果顺序关系的写入都交给一个分区来完成,但该方案真实实现效率会大打折扣。现在有一些新的算法来显式地追踪事件因果关系。

复制滞后的解决方案

使用最终一致性系统时,最好事先就思考这样的问题:如果复制延迟增加到几分钟甚至几小时,那么应用层的行为会是什么样子?如果这种情况不可接受,那么在设计系统肘,就要考虑提供一个更强的一致性保证,比如写后读; 如果系统设计时假定是同步复制,但最终它事实上成为了异步复制,就可能会导致灾难性后果。

在应用层可以提供比底层数据库更强有力的保证。例如只在主节点上进行特定类型的读取,而代价则是,应用层代码中处理这些问题通常会非常复杂,且容易出错。

如果应用程序开发人员不必担心这么多底层的复制问题,而是假定数据库在“做正确的事情”,情况就变得很简单。而这也是事务存在的原因,事务是数据库提供更强保证的一种方式。

单节点上支持事务已经非常成熟。然而,在转向分布式数据库(即支持复制和分区)的过程中,有许多系统却选择放弃支持事务,并声称事务在性能与可用性方面代价过高,所以选择了最终一致性。

多主节点复制

主从复制方法较为常见,但存在一个明显的缺点:系统只有一个主节点,而所有写入都必须经由主节点。如果由于某种原因,例如与主节点之间的网络中断而导致主节点无法连接,主从复制方案就会影响所有的写入操作。

对主从复制模型进行自然的扩展,则可以配置多个主节点,每个主节点都可以接受写 s 操作,后面复制的流程类似: 处理写的每个主节点都必须将该数据更改转发到所有其他节点。这就是多主节点( 也称为主-主,或主动/主动)复制。此时,每个主节点还同时扮演其他主节点的从节点。

适用场景

在一个数据中心内部使用多主节点基本没有太大意义,其复杂性已经超过所能带来的好处。

但是,以下场景这种配置则是合理的:

  • 多数据中心
  • 离线客户端操作
  • 协作编辑

多数据中心

为了容忍整个数据中心级别故障或者更接近用户,可以把数据库的副本横跨多个数据中心。而如果使用常规的基于主从的复制模型,主节点势必只能放在其中的某一个数据中心,而所有写请求都必须经过该数据中心。

有了多主节点复制模型,则可以在每个数据中心都配置主节点。在每个数据中心内,采用常规的主从复制方案;而在数据中心之间,由各个数据中心的主节点来负责同其他数据中心的主节点进行数据的交换、更新。

部署单主节点的主从复制方案与多主复制方案之间的差异

  • 性能:对于主从复制,每个写请求都必须经由广域网传送至主节点所在的数据中心。这会大大增加写入延迟,井基本偏离了采用多数据中心的初衷(即就近访问)。而在多主节点模型中,每个写操作都可以在本地数据中心快速响应,然后采用异步复制方式将变化同步到其他数据中心。因此,对上层应用有效屏蔽了数据中心之间的网络延迟,使得终端用户所体验到的性能更好。
  • 容忍数据中心失效:对于主从复制,如果主节点所在的数据中心发生故障,必须切换至另一个数据中心,将其中的一个从节点被提升为主节点。在多主节点模型中,每个数据中心则可以独立于其他数据中心继续运行,发生故障的数据中心在恢复之后更新到最新状态。
  • 容忍网络问题:数据中心之间的通信通常经由广域网,它往往不如数据中心内的本地网络可靠。对于主从复制模型,由于写请求是同步操作,对数据中心之间的网络性能和稳定性等更加依赖。多主节点模型则通常采用异步复制,可以更好地容忍此类问题,例如临时网络闪断不会妨碍写请求最终成功。

离线客户端操作

另一种多主复制比较适合的场景是,应用在与网络断开后还需要继续工作。

这种情况下,每个设备都有一个充当主节点的本地数据库(用来接受写请求),然后在所有设备之间采用异步方式同步这些多主节点上的副本,同步滞后可能是几小时或者数天,具体时间取决于设备何时可以再次联网。

从架构层面来看,上述设置基本上等同于数据中心之间的多主复制,只不过是个极端情况,即一个设备就是数据中心,而且它们之间的网络连接非常不可靠。多个设备同步日历的例子表明,多主节点可以得到想要的结果,但中间过程依然有很多的未知数。

有一些工具可以使多主配置更为容易,如 CouchDB 就是为这种操作模式而设计的。

协作编辑

实时协作编辑应用程序允许多个用户同时编辑文档。

我们通常不会将协作编辑完全等价于数据库复制问题,但二者确实有很多相似之处。当一个用户编辑文档时· ,所做的更改会立即应用到本地副本( Web 浏览器或客户端应用程序),然后异步复制到服务器以及编辑同一文档的其他用户。如果要确保不会发生编辑冲突,则应用程序必须先将文档锁定,然后才能对其进行编辑。如果另一个用户想要编辑同一个文档, 首先必须等到第一个用户提交修改并释放锁。这种协作模式相当于主从复制模型下在主节点上执行事务操作。

为了加快协作编辑的效率, 可编辑的粒度需要非常小。例如,单个按键甚至是全程无锁。然而另一方面, 也会面临所有多主复制都存在的挑战, 即如何解决冲突。

处理写冲突

多主复制的最大问题是可能发生写冲突。

同步与异步冲突检测

如果是主从复制数据库,第二个写请求要么会被阻塞直到第一个写完成, 要么被中止(用户必须重试) 。然而在多主节点的复制模型下,这两个写请求都是成功的,井且只能在稍后的时间点上才能异步检测到冲突,那时再要求用户层来解决冲突为时已晚。

理论上, 也可以做到同步冲突检测,即等待写请求完成对所有副本的同步,然后再通知用户写入成功。但是,这样做将会失去多主节点的主要优势:允许每个主节点独立接受写请求。如果确实想要同步方式冲突检测,或许应该考虑采用单主节点的主从复制模型。

避免冲突

处理冲突最理想的策略是避免发生冲突,即如果应用层可以保证对特定记录的写请求总是通过同一个主节点,这样就不会发生写冲突。现实中,由于不少多主节点复制模型所实现的冲突解决方案存在瑕疵,因此,避免冲突反而成为大家普遍推荐的首选方案。

但是,有时可能需要改变事先指定的主节点,例如由于该数据中心发生故障,不得不将流量重新路由到其他数据中心,或者是因为用户已经漫游到另一个位置,因而更靠近新数据中心。此时,冲突避免方式不再有效,必须有措施来处理同时写入冲突的可能性。

收敛于一致状态

对于主从复制模型,数据更新符合顺序性原则,即如果同一个字段有多个更新,则最后一个写操作将决定该字段的最终值。

对于多主节点复制模型,由于不存在这样的写入顺序,所以最终值也会变得不确定。

实现收敛的冲突解决有以下可能的方式:

  • 给每个写入分配唯一的 ID ,例如, 一个时间戳, 一个足够长的随机数, 一个 UUID 或者一个基于键-值的哈希,挑选最高 ID 的写入作为胜利者,并将其他写入丢弃。如果基于时间戳,这种技术被称为最后写入者获胜。虽然这种方法很流行,但是很容易造成数据丢失。
  • 为每个副本分配一个唯一的 ID ,井制定规则,例如序号高的副本写入始终优先于序号低的副本。这种方法也可能会导致数据丢失。
  • 以某种方式将这些值合并在一起。例如,按字母顺序排序,然后拼接在一起。
  • 利用预定义好的格式来记录和保留冲突相关的所有信息,然后依靠应用层的逻辑,事后解决冲突(可能会提示用户) 。

自定义冲突解决逻辑

解决冲突最合适的方式可能还是依靠应用层,所以大多数多主节点复制模型都有工具来让用户编写应用代码来解决冲突。可以在写入时或在读取时执行这些代码逻辑:

  • 在写入时执行:只要数据库系统在复制变更日志时检测到冲突,就会调用应用层的冲突处理程序。
  • 在读取时执行:当检测到冲突时,所有冲突写入值都会暂时保存下来。下一次读取数据时,会将数据的多个版本读返回给应用层。应用层可能会提示用户或自动解决冲突, 井将最后的结果返回到数据库。

注意, 冲突解决通常用于单个行或文档, 而不是整个事务。因此,如果有一个原子事务包含多个不同写请求,每个写请求仍然是分开考虑来解决冲突。

无主节点复制

单主节点和多主节点复制,都是基于这样一种核心思路,即客户端先向某个节点(主节点)发送写请求,然后数据库系统负责将写请求复制到其他副本。由主节点决定写操作的顺序, 从节点按照相同的顺序来应用主节点所发送的写日志。

一些数据存储系统则采用了不同的设计思路:选择放弃主节点,允许任何副本直接接受来自客户端的写请求。对于某些无主节点系统实现,客户端直接将其写请求发送到多副本,而在其他一些实现中,由一个协调者节点代表客户端进行写人,但与主节点的数据库不同,协调者井不负责写入顺序的维护。

参考资料

DevOps 简介

什么是 DevOps

什么是 DevOps?DevOps 集文化理念、实践和工具于一身,它强调团队授权、跨团队沟通和协作以及技术自动化,其最终目标是优化质量和交付

DevOps 理念,旨在打破开发工程师和运维工程师的壁垒,强调两个团队合而为一,在产品的整个生命周期(从开发、测试、部署再到运维、运营)内相互协作,工程师不再限于单一职能。

DevOps 始于 2007 年左右,当时的开发和运维对传统的软件开发模式提出了担忧:在这种模式下,编写代码的开发人员与负责部署的运维人员分开工作。 DevOps 一词是开发(development)和运维(operations)这两个词的组合,反映了将二者合而为一的过程。

DevOps 如何工作

DevOps 团队包括在整个产品生命周期中协同工作的开发人员和运维人员,以提高软件部署的速度和质量。这是一种新的工作方式,一种文化转变,对团队及其工作的组织具有重要意义。

在 DevOps 模型下,开发和运维团队不再“孤立”。有时,这两个团队甚至会合并为一个团队,工程师在整个应用程序生命周期中工作,需要具备从开发、测试到部署和运维的复合型能力。

DevOps 团队使用工具来自动化和优化流程,这有助于提高可靠性。 DevOps 工具链可帮助团队处理重要的 DevOps 基础知识,包括持续集成、持续交付、自动化和协作。

DevOps 价值观也适用于开发以外的团队。如果 QA、安全团队也和开发、运维团队紧密地结合在一起,贯穿产品的整个生命周期。此时,安全成为了所有 DevOps 团队成员的工作重点,此时可以称为为 “DevSecOps”。

DevOps 的生命周期

由于 DevOps 的连续性,可以使用无限循环来展示 DevOps 生命周期的各个阶段是如何相互关联的。尽管看起来是按顺序流动的,但循环象征着在整个生命周期中始终保持持续迭代。

DevOps 生命周期由六个阶段组成,分别代表开发和运维所需的流程、功能和工具。在每个阶段,团队协作和沟通以保持一致性、速度和质量。

img

图片来自 https://www.tasksgrid.com/devops-guide/

DevOps 的优势

  • 速度:应用 DevOps 可以更频繁地发布可交付成果,并且质量和稳定性也更高。高效的迭代,可以根据客户和市场反馈进行快速响应,以适应市场变化,有效推动业务发展。
  • 促进协作:DevOps 的基础是开发和运维之间的协作文化,两个团队紧密协作,共同承担诸多责任,并将各自的工作流程相互融合。这有助于减少效率低下的工作,同时节约大家的时间。
  • 快速发布:提高发布的频率和速度,以便能够更快速地进行创新并完善产品。您发布新功能和修复错误的速度越快,就越能快速地响应客户需求并建立竞争优势。持续集成和持续交付是自动执行软件发布流程(从构建到部署)的两项实践经验。
  • 可靠性:持续集成和持续部署等实践可检验程序变更后,功能是否正常,是否安全,从而提高软件产品的交付质量。监控和日志记录可以帮助团队实时了解服务当前的运行状态。
  • 规模:大规模运行和管理您的基础设施及开发流程。自动化和一致性可在降低风险的同时,帮助您有效管理复杂或不断变化的系统。例如,基础设施即代码能够帮助您以一种可重复且更有效的方式来管理部署、测试和生产环境。
  • 安全性:通过将自动实施的合规性策略、精细控制和配置管理技术集成到敏捷开发和 DevOps 工作流程中,使得产品内置了安全性。

DevOps 工具

DevOps 各生命周期阶段都有合适的工具可以作为解决方案。它们通过提高协作效率、减少上下文切换、引入自动化以及实现可监控来全方位增强 DevOps 实践。

DevOps 工具链通常遵循两种模式:完整解决方案或开放式工具链。

  • 完整解决方案实现了端到端的交付,流程很完备,但是一般难以兼容、集成第三方工具。
  • 开放式工具链允许使用不同的工具进行定制。

这两种方法各有利弊。

这里列举一些常见的 DevOps 工具:

  • 项目管理Jira
  • 文档管理Confluence
  • 代码管理GitlabGithub
  • CI/CDGitlabJenkins
  • 容器
    • Docker 将应用程序与该程序的依赖,打包在一个文件里面。运行这个文件,就会生成一个虚拟容器。程序在这个虚拟容器里运行,就好像在真实的物理机上运行一样。有了 Docker,就不用担心环境问题。
    • Kubernetes 是谷歌开源的容器集群管理系统 是用于自动部署,扩展和管理 Docker 应用程序的开源系统,简称 K8S。
  • 日志
    • ELK 技术栈,通过数据采集工具 Logstack、Beats 套件、日志存储、解析服务 ElasticSearch、日志可视化工具 Kibnana,形成了一套完整的端到端日志解决方案,深受业界好评。
  • 监控
    • ELK 的技术栈比较成熟,应用范围也比较广,除了可用作监控系统外,还可以用作日志查询和分析。
    • Prometheus 的独特之处在于它采用了拉数据的方式,对业务影响较小,同时也采用了时间序列数据库存储,而且支持独有的 PromQL 查询语言,功能强大而且简洁。
    • Grafana 是流行的监控数据分析和可视化套件。
    • Graphite 是基于时间序列数据库存储的监控系统,并且提供了功能强大的各种聚合函数比如 sum、average、top5 等可用于监控分析,而且对外提供了 API 也可以接入其他图形化监控系统如 Grafana。
  • 链路追踪
    • Zipkin:Zipkin 是 Twitter 开源的调用链分析工具,目前基于 spring-cloud-sleuth 得到了广泛的使用,特点是轻量,使用、部署简单。
    • Pinpoint:是韩国人开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI 功能强大,接入端无代码侵入。
    • SkyWalking:是本土开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI 功能较强,接入端无代码侵入。目前已加入 Apache 孵化器。
    • CAT:CAT 是美团点评开源的基于编码和配置的调用链分析,应用监控分析,日志采集,监控报警等一系列的监控平台工具。
  • 负载均衡
    • Nginx 可以作为四层或七层负载均衡器。
    • LVS 可以作为四层负载均衡器。其负载均衡的性能要优于 Nginx。
    • HAProxy 可以作为 HTTP 和 TCP 负载均衡器。
    • F5 作为硬件负载均衡
    • A10 作为硬件负载均衡
  • 网关
    • Kong 是一个云原生、快速、可扩展和分布式的微服务抽象层(也称为 API 网关,API 中间件)。
    • Zuul 是 Netflix 开源的一个 API 网关,Zuul 在云平台上提供动态路由,监控,弹性,安全等边缘服务的框架。
  • 告警:短信、邮件、企业聊天软件、OA

参考资料

Elasticsearch 映射

在 Elasticsearch 中,**Mapping**(映射),用来定义一个文档以及其所包含的字段如何被存储和索引,可以在映射中事先定义字段的数据类型、字段的权重、分词器等属性,就如同在关系型数据库中创建数据表时会设置字段的类型。

Mapping 会把 JSON 文档映射成 Lucene 所需要的扁平格式

一个 Mapping 属于一个索引的 Type

  • 每个文档都属于一个 Type
  • 一个 Type 有一个 Mapping 定义
  • 7.0 开始,不需要在 Mapping 定义中指定 type 信息

每个 document 都是 field 的集合,每个 field 都有自己的数据类型。映射数据时,可以创建一个 mapping,其中包含与 document 相关的 field 列表。映射定义还包括元数据 field,例如 _source ,它自定义如何处理 document 的关联元数据。

映射方式

在 Elasticsearch 中,映射可分为静态映射和动态映射。在关系型数据库中写入数据之前首先要建表,在建表语句中声明字段的属性,在 Elasticsearch 中,则不必如此,Elasticsearch 最重要的功能之一就是让你尽可能快地开始探索数据,文档写入 Elasticsearch 中,它会根据字段的类型自动识别,这种机制称为动态映射,而静态映射则是写入数据之前对字段的属性进行手工设置。

静态映射

ES 官方将静态映射称为显式映射(Explicit mapping静态映射是在创建索引时显示的指定索引映射。静态映射和 SQL 中在建表语句中指定字段属性类似。相比动态映射,通过静态映射可以添加更详细、更精准的配置信息。例如:

  • 哪些字符串字段应被视为全文字段。
  • 哪些字段包含数字、日期或地理位置。
  • 日期值的格式。
  • 用于控制动态添加字段的自定义规则。

【示例】创建索引时,显示指定 mapping

1
2
3
4
5
6
7
8
9
10
PUT /my-index-000001
{
"mappings": {
"properties": {
"age": { "type": "integer" },
"email": { "type": "keyword" },
"name": { "type": "text" }
}
}
}

【示例】在已存在的索引中,指定一个 field 的属性

1
2
3
4
5
6
7
8
9
PUT /my-index-000001/_mapping
{
"properties": {
"employee-id": {
"type": "keyword",
"index": false
}
}
}

【示例】查看 mapping

1
GET /my-index-000001/_mapping

【示例】查看指定 field 的 mapping

1
GET /my-index-000001/_mapping/field/employee-id

动态映射

动态映射机制,允许用户不手动定义映射,Elasticsearch 会自动识别字段类型。在实际项目中,如果遇到的业务在导入数据之前不确定有哪些字段,也不清楚字段的类型是什么,使用动态映射非常合适。当 Elasticsearch 在文档中碰到一个以前没见过的字段时,它会利用动态映射来决定该字段的类型,并自动把该字段添加到映射中。

示例:创建一个名为 data 的索引、其 mapping 类型为 _doc,并且有一个类型为 long 的字段 count

1
2
PUT data/_doc/1
{ "count": 5 }

动态字段映射

动态字段映射(Dynamic field mappings)是用于管理动态字段检测的规则。当 Elasticsearch 在文档中检测到新字段时,默认情况下会动态将该字段添加到类型映射中。

在 mapping 中可以通过将 dynamic 参数设置为 trueruntime 来开启动态映射。

dynamic 不同设置的作用:

可选值 说明
true 新字段被添加到 mapping 中。mapping 的默认设置。
runtime 新字段被添加到 mapping 中并作为运行时字段——这些字段不会被索引,但是可以在查询时出现在 _source 中。
false 新字段不会被索引或搜索,但仍会出现在返回匹配的 _source 字段中。这些字段不会添加到映射中,并且必须显式添加新字段。
strict 如果检测到新字段,则会抛出异常并拒绝文档。必须将新字段显式添加到映射中。

需要注意的是:对已有字段,一旦已经有数据写入,就不再支持修改字段定义。如果希望改变字段类型,必须重建索引。这是由于 Lucene 实现的倒排索引,一旦生成后,就不允许修改。如果修改了字段的数据类型,会导致已被索引的字段无法被搜索。

启用动态字段映射后,Elasticsearch 使用内置规则来确定如何映射每个字段的数据类型。规则如下:

JSON 数据类型 "dynamic":"true" "dynamic":"runtime"
null 没有字段被添加 没有字段被添加
true or false boolean 类型 boolean 类型
浮点型数字 float 类型 double 类型
数字 数字型 long 类型
JSON 对象 object 类型 没有字段被添加
数组 由数组中第一个非空值决定 由数组中第一个非空值决定
开启日期检测的字符串 date 类型 date 类型
开启数字检测的字符串 float 类型或 long类型 double 类型或 long 类型
什么也没开启的字符串 带有 .keyword 子 field 的 text 类型 keyword 类型

下面举一个例子认识动态 mapping,在 Elasticsearch 中创建一个新的索引并查看它的 mapping,命令如下:

1
2
PUT books
GET books/_mapping

此时 books 索引的 mapping 是空的,返回结果如下:

1
2
3
4
5
{
"books": {
"mappings": {}
}
}

再往 books 索引中写入一条文档,命令如下:

1
2
3
4
5
6
PUT books/it/1
{
"id": 1,
"publish_date": "2019-11-10",
"name": "master Elasticsearch"
}

文档写入完成之后,再次查看 mapping,返回结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
"books": {
"mappings": {
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"publish_date": {
"type": "date"
}
}
}
}
}

动态映射有时可能会错误的识别字段类型,这种情况下,可能会导致一些功能无法正常使用,如 Range 查询。所以,使用动态 mapping 要结合实际业务需求来综合考虑,如果将 Elasticsearch 当作主要的数据存储使用,并且希望出现未知字段时抛出异常来提醒你注意这一问题,那么开启动态 mapping 并不适用。

动态模板

动态模板(dynamic templates是用于给 mapping 动态添加字段的自定义规则。

动态模板可以设置匹配条件,只有匹配的情况下才使用动态模板:

  • match_mapping_type 对 Elasticsearch 检测到的数据类型进行操作
  • matchunmatch 使用模式匹配字段名称
  • path_matchpath_unmatch 对字段的完整虚线路径进行操作
  • 如果动态模板没有定义 match_mapping_typematchpath_match,则不会匹配任何字段。您仍然可以在批量请求的 dynamic_templates 部分按名称引用模板。

【示例】当设置 'dynamic':'true' 时,Elasticsearch 会将字符串字段映射为带有关键字子字段的文本字段。如果只是索引结构化内容并且对全文搜索不感兴趣,可以让 Elasticsearch 仅将字段映射为关键字字段。这种情况下,只有完全匹配才能搜索到这些字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
PUT my-index-000001
{
"mappings": {
"dynamic_templates": [
{
"strings_as_keywords": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
}
]
}
}

运行时字段

运行时字段是在查询时评估的字段。运行时字段有以下作用:

  • 在不重新索引数据的情况下,向现有文档添加字段
  • 在不了解数据结构的情况下,也可以处理数据
  • 在查询时覆盖从索引字段返回的值
  • 为特定用途定义字段而不修改底层架构

检索 Elasticsearch 时,运行时字段和其他字段并没有什么不同。

需要注意的是:使用 _search API 上的 fields 参数来检索运行时字段的值。运行时字段不会显示在 _source 中,但 fields API 适用于所有字段,即使是那些未作为原始 _source 的一部分发送的字段。

运行时字段在处理日志数据时很有用,尤其是当日志是不确定的数据结构时:这种情况下,会降低搜索速度,但您的索引大小要小得多,您可以更快地处理日志,而无需为它们设置索引。

运行时字段的优点

因为运行时字段没有被索引,所以添加运行时字段不会增加索引大小。用户可以直接在 mapping 中定义运行时字段,从而节省存储成本并提高采集数据的速度。定义了运行时字段后,可以立即在搜索请求、聚合、过滤和排序中使用它。

如果将运行时字段设为索引字段,则无需修改任何引用运行时字段的查询。更好的是,您可以引用字段是运行时字段的一些索引,以及字段是索引字段的其他索引。您可以灵活地选择要索引哪些字段以及保留哪些字段作为运行时字段。

就其核心而言,运行时字段最重要的好处是能够在您提取字段后将字段添加到文档中。此功能简化了映射决策,因为您不必预先决定如何解析数据,并且可以使用运行时字段随时修改映射。使用运行时字段允许更小的索引和更快的摄取时间,这结合使用更少的资源并降低您的运营成本。

字段数据类型

在 Elasticsearch 中,每个字段都有一个字段数据类型或字段类型,用于指示字段包含的数据类型(例如字符串或布尔值)及其预期用途。字段类型按系列分组。同一族中的类型具有完全相同的搜索行为,但可能具有不同的空间使用或性能特征。

Elasticsearch 提供了非常丰富的数据类型,官方将其分为以下几类:

  • 普通类型

    • binary:编码为 Base64 字符串的二进制值。
    • boolean:布尔类型,值为 true 或 false。
    • Keywords:keyword 族类型,包括 keywordconstant_keywordwildcard
    • Numbers:数字类型,如 longdouble
    • Dates:日期类型,包括 datedate_nanos
    • alias:用于定义存在字段的别名。
  • 对象类型

    • object:JSON 对象
    • flattened:整个 JSON 对象作为单个字段值。
    • nested:保留其子字段之间关系的 JSON 对象。
    • join:为同一索引中的文档定义父/子关系。
  • 结构化数据类型

    • Range:范围类型,例如:long_rangedouble_rangedate_rangeip_range
    • ip:IPv4 和 IPv6 地址。
    • version:版本号。支持 Semantic Versioning 优先规则。
    • murmur3:计算并存储 hash 值。
  • 聚合数据类型

  • 文本搜索类型

  • 文档排名类型

    • dense_vector:记录浮点数的密集向量。
    • rank_feature:记录一个数字特征,为了在查询时提高命中率。
    • rank_features:记录多个数字特征,为了在查询时提高命中率。
  • 空间数据类型

  • 其他类型

元数据字段

一个文档中,不仅仅包含数据 ,也包含元数据。元数据是用于描述文档的信息。

  • 标识元数据字段
    • _index:文档所属的索引。
    • _id:文档的 ID。
  • 文档 source 元数据字段
    • _source:文档正文的原始 JSON。
    • _size_source 字段的大小(以字节为单位),由 mapper-size 插件提供。
  • 文档计数元数据字段
    • _doc_count:当文档表示预聚合数据时,用于存储文档计数的自定义字段。
  • 索引元数据字段
  • 路由元数据字段
    • _routing:将文档路由到特定分片的自定义路由值。
  • 其他元数据字段
    • _meta:应用程序特定的元数据。
    • _tier:文档所属索引的当前数据层首选项。

映射参数

Elasticsearch 提供了以下映射参数:

  • analyzer:指定在索引或搜索文本字段时用于文本分析的分析器。
  • coerce:如果开启,Elasticsearch 将尝试清理脏数据以适应字段的数据类型。
  • copy_to:允许将多个字段的值复制到一个组字段中,然后可以将其作为单个字段进行查询。
  • doc_values:默认情况下,所有字段都是被
  • dynamic:是否开启动态映射。
  • eager_global_ordinals:当在 global ordinals 的时候,refresh 以后下一次查询字典就需要重新构建,在追求查询的场景下很影响查询性能。可以使用 eager_global_ordinals,即在每次 refresh 以后即可更新字典,字典常驻内存,减少了查询的时候构建字典的耗时。
  • enabled:只能应用于顶级 mapping 定义和 object 字段。设置为 false 后,Elasticsearch 解析时,会完全跳过该字段。
  • fielddata:默认情况下, text 字段是可搜索的,但不可用于聚合、排序或脚本。如果为字段设置 fielddata=true,就会通过反转倒排索引将 fielddata 加载到内存中。请注意,这可能会占用大量内存。如果想对 text 字段进行聚合、排序或脚本操作,fielddata 是唯一方法。
  • fields:有时候,同一个字段需要以不同目的进行索引,此时可以通过 fields 进行配置。
  • format:用于格式化日期类型。
  • ignore_above:字符串长度大于 ignore_above 所设,则不会被索引或存储。
  • ignore_malformed:有时候,同一个字段,可能会存储不同的数据类型。默认情况下,Elasticsearch 解析字段数据类型失败时,会引发异常,并拒绝整个文档。 如果设置 ignore_malformedtrue,则允许忽略异常。这种情况下,格式错误的字段不会被索引,但文档中的其他字段可以正常处理。
  • index_options 用于控制将哪些信息添加到倒排索引以进行搜索和突出显示。只有 textkeyword 等基于术语(term)的字段类型支持此配置。
  • index_phrases:如果启用,两个词的组合(shingles)将被索引到一个单独的字段中。这允许以更大的索引为代价,更有效地运行精确的短语查询(无 slop)。请注意,当停用词未被删除时,此方法效果最佳,因为包含停用词的短语将不使用辅助字段,并将回退到标准短语查询。接受真或假(默认)。
  • index_prefixes:index_prefixes 参数启用 term 前缀索引以加快前缀搜索。
  • indexindex 选项控制字段值是否被索引。默认为 true。
  • meta:附加到字段的元数据。此元数据对 Elasticsearch 是不透明的,它仅适用于多个应用共享相同索引的元数据信息,例如:单位。
  • normalizerkeyword 字段的 normalizer 属性类似于 analyzer ,只是它保证分析链只产生单个标记。 normalizer 在索引 keyword 之前应用,以及在搜索时通过查询解析器(例如匹配查询)或通过术语级别查询(例如术语查询)搜索关键字字段时应用。
  • normsnorms 存储在查询时使用的各种规范化因子,以便计算文档的相关性评分。
  • null_value:null 值无法被索引和搜索。当一个字段被设为 null,则被视为没有值。null_value 允许将空值替换为指定值,以便对其进行索引和搜索。
  • position_increment_gap:分析的文本字段会考虑术语位置,以便能够支持邻近或短语查询。当索引具有多个值的文本字段时,值之间会添加一个“假”间隙,以防止大多数短语查询在值之间匹配。此间隙的大小使用 position_increment_gap 配置,默认为 100。
  • properties:类型映射、对象字段和嵌套字段包含的子字段,都称为属性。这些属性可以是任何数据类型,包括对象和嵌套。
  • search_analyzer:通常,在索引时和搜索时应使用相同的分析器,以确保查询中的术语与倒排索引中的术语格式相同。但是,有时在搜索时使用不同的分析器可能是有意义的,例如使用 edge_ngram 标记器实现自动补全或使用同义词搜索时。
  • similarity:Elasticsearch 允许为每个字段配置文本评分算法或相似度。相似度设置提供了一种选择文本相似度算法的简单方法,而不是默认的 BM25,例如布尔值。只有 textkeyword 等基于文本的字段类型支持此配置。
  • store:默认情况下,对字段值进行索引以使其可搜索,但不会存储它们。这意味着可以查询该字段,但无法检索原始字段值。通常这不重要,字段值已经是默认存储的 _source 字段的一部分。如果您只想检索单个字段或几个字段的值,而不是整个 _source,则可以通过 source filtering 来实现。
  • term_vector:term_vector 包含有关分析过程产生的术语的信息,包括:
    • 术语列表
    • 每个 term 的位置(或顺序)
    • 起始和结束字符偏移量,用于将 term 和原始字符串进行映射
    • 有效负载(如果可用) - 用户定义的,与 term 位置相关的二进制数据

映射配置

  • index.mapping.total_fields.limit:索引中的最大字段数。字段和对象映射以及字段别名计入此限制。默认值为 1000
  • index.mapping.depth.limit:字段的最大深度,以内部对象的数量来衡量。例如,如果所有字段都在根对象级别定义,则深度为 1。如果有一个对象映射,则深度为 2,以此类推。默认值为 20
  • index.mapping.nested_fields.limit:索引中不同 nested 映射的最大数量。 nested 类型只应在特殊情况下使用,即需要相互独立地查询对象数组。为了防止设计不佳的映射,此设置限制了每个索引的唯一 nested 类型的数量。默认值为 50
  • index.mapping.nested_objects.limit:单个文档中,所有 nested 类型中包含的最大嵌套 JSON 对象数。当文档包含太多 nested 对象时,此限制有助于防止出现内存溢出。默认值为 10000
  • index.mapping.field_name_length.limit:设置字段名称的最大长度。默认为 Long.MAX_VALUE(无限制)。

参考资料

《消息队列高手课》笔记

为什么需要消息队列?

消息队列的应用

  • 异步处理
    • 快速响应
    • 减少等待,提升性能
  • 流量控制
  • 服务解耦

该如何选择消息队列?

  • 是否开源:这决定了能否商用,所以最为重要。
  • 社区活跃度越高越好:高社区活跃度,一般保证了低 Bug 率,因为大部分 Bug,已经有人遇到并解决了。
  • 技术生态适配性:客户端对各种编程语言的支持。比如:如果使用 MQ 的都是 Java 应用,那么 ActiveMQ、RabbitMQ、RocketMQ、Kafka 都可以。如果需要支持其他语言,那么 RMQ 比较合适,因为它支持的编程语言比较丰富。如果 MQ 是应用于大数据或流式计算,那么 Kafka 几乎是标配。如果是应用于在线业务系统,那么 Kafka 就不合适了,可以考虑 RabbitMQ、 RocketMQ 很合适。
  • 高可用:应用于线上的准入标准。
  • 高性能:具备足够好的性能,能满足绝大多数场景的性能要求。
  • 业务场景的适应性:不同业务场景,会有不同的诉求,此时要根据不同 MQ 的特性针对性选择。

主流 MQ

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级 万级 十万级 十万级,略高于 RocketMQ
topic 数量对吞吐量的影响 topic 可以达到几百、几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性 毫秒级 微秒级 毫秒级 毫秒级以内
可用性 高:基于主从架构实现高可用 同 ActiveMQ 非常高:分布式架构 非常高:分布式架构。每个数据都有多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到不丢失 同 RocketMQ
应用场景 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 适合在线业务 大数据、实时计算以及日志采集领域,应用最为广泛
流行度 不活跃 社区非常活跃 阿里出品,有非常活跃的中文社区 社区非常活跃
支持编程语言 非常多 Java Scala、Java
学习成本 采用 ErLang 开发,比较小众,不利于扩展和二次开发 采用 Java 开发,且贡献者多为中国人,容易读懂源码 使用 Scala 和 Java 开发,容易读懂源码

RabbitMQ

突出亮点

  1. 支持的编程语言最多
  2. 支持非常灵活的路由配置

明显短板

  1. 对消息堆积的支持并不好
  2. 性能差强人意

消息模型:主题和队列有什么区别?

队列模型

最初的消息队列,就是一个严格意义上的队列。在计算机领域,“队列(Queue)”是一种数据结构,有完整而严格的定义。

早期的消息队列,就是按照“队列”的数据结构来设计的。生产者(Producer)发消息就是入队操作,消费者(Consumer)收消息就是出队也就是删除操作,服务端存放消息的容器自然就称为“队列”。

如果有多个生产者往同一个队列里面发送消息,这个队列中可以消费到的消息,就是这些生产者生产的所有消息的合集。消息的顺序就是这些生产者发送消息的自然顺序。如果有多个消费者接收同一个队列的消息,这些消费者之间实际上是竞争的关系,每个消费者只能收到队列中的一部分消息,也就是说任何一条消息只能被其中的一个消费者收到。

如果需要将一份消息数据分发给多个消费者,要求每个消费者都能收到全量的消息。此时,单个队列就满足不了需求,一个可行的解决方式是,为每个消费者创建一个单独的队列,让生产者发送多份。显然这是个比较蠢的做法,同样的一份消息数据被复制到多个队列中会浪费资源,更重要的是,生产者必须知道有多少个消费者。为每个消费者单独发送一份消息,这实际上违背了消息队列“解耦”这个设计初衷。

发布 - 订阅模型(Publish-Subscribe Pattern)

在发布 - 订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。

队列模型和发布 - 订阅模型最大的区别就是:一份消息数据能不能被消费多次的问题

RabbitMQ 的消息模型

RabbitMQ,它是少数依然坚持使用队列模型的产品之一。那它是怎么解决多个消费者的问题呢?

在 RabbitMQ 中,Exchange 位于生产者和队列之间,生产者并不关心将消息发送给哪个队列,而是将消息发送给 Exchange,由 Exchange 上配置的策略来决定将消息投递到哪些队列中。

同一份消息如果需要被多个消费者来消费,需要配置 Exchange 将消息发送到多个队列,每个队列中都存放一份完整的消息数据,可以为一个消费者提供消费服务。这也可以变相地实现新发布 - 订阅模型中,“一份消息数据可以被多个订阅者来多次消费”这样的功能。

RocketMQ 的消息模型

RocketMQ 使用的消息模型是标准的发布 - 订阅模型

但是,在 RocketMQ 也有队列(Queue)这个概念,并且队列在 RocketMQ 中是一个非常重要的概念

几乎所有的消息队列产品都使用一种非常朴素的“请求 - 确认”机制,确保消息不会在传递过程中由于网络或服务器故障丢失。具体的做法也非常简单。在生产端,生产者先将消息发送给服务端,也就是 Broker,服务端在收到消息并将消息写入主题或者队列中后,会给生产者发送确认的响应。

如果生产者没有收到服务端的确认或者收到失败的响应,则会重新发送消息;在消费端,消费者在收到消息并完成自己的消费业务逻辑(比如,将数据保存到数据库中)后,也会给服务端发送消费成功的确认,服务端只有收到消费确认后,才认为一条消息被成功消费,否则它会给消费者重新发送这条消息,直到收到对应的消费成功确认。

这个确认机制很好地保证了消息传递过程中的可靠性,但是,引入这个机制在消费端带来了一个不小的问题。什么问题呢?为了确保消息的有序性,在某一条消息被成功消费之前,下一条消息是不能被消费的,否则就会出现消息空洞,违背了有序性这个原则。

也就是说,每个主题在任意时刻,至多只能有一个消费者实例在进行消费,那就没法通过水平扩展消费者的数量来提升消费端总体的消费性能。为了解决这个问题,RocketMQ 在主题下面增加了队列的概念。

每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。需要注意的是,RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的。

RocketMQ 中,订阅者的概念是通过消费组(Consumer Group)来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。

消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息。

在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要 RocketMQ 为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。这个消费位置是非常重要的概念,我们在使用消息队列的时候,丢消息的原因大多是由于消费位置处理不当导致的。

如何利用事务消息实现分布式事务?

事务消息需要消息队列提供相应的功能才能实现,Kafka 和 RocketMQ 都提供了事务相关功能。

  • Kafka 的解决方案是:直接抛出异常,让用户自行处理。用户可以在业务代码中反复重试提交,直到提交成功,或者删除之前修改的数据记录进行事务补偿。
  • RocketMQ 的解决方案是:通过事务反查机制来解决事务消息提交失败的问题。如果 Producer 在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。为了支撑这个事务反查机制,业务代码需要实现一个反查本地事务状态的接口,告知 RocketMQ 本地事务是成功还是失败。

RocketMQ 事务消息流程

基于 MQ 的分布式事务方案其实是对本地消息表的封装,将本地消息表基于 MQ 内部,其他方面的协议基本与本地消息表一致。下面主要基于 RocketMQ 4.3 之后的版本介绍 MQ 的分布式事务方案。

在本地消息表方案中,保证事务主动方发写业务表数据和写消息表数据的一致性是基于数据库事务,RocketMQ 的事务消息相对于普通 MQ,相对于提供了 2PC 的提交接口,方案如下:

正常情况——事务主动方发消息 这种情况下,事务主动方服务正常,没有发生故障,发消息流程如下:

img

  1. 发送方向 MQ 服务端(MQ Server)发送 half 消息。
  2. MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向 MQ Server 提交二次确认(commit 或是 rollback)。
  5. MQ Server 收到 commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 rollback 状态则删除半消息,订阅方将不会接受该消息。

异常情况——事务主动方消息恢复 在断网或者应用重启等异常情况下,图中 4 提交的二次确认超时未到达 MQ Server,此时处理逻辑如下:

img

  1. MQ Server 对该消息发起消息回查。
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认
  4. MQ Server 基于 commit / rollback 对消息进行投递或者删除

思考:为什么不等待写业务表成功后再向消息队列发送提交消息呢?

因为可能存在这样情况:写业务表成功了,但是还没来得及发消息,节点就宕机了。

MQ 事务方案总结

相比本地消息表方案,MQ 事务方案优点是:

  • 消息数据独立存储 ,降低业务系统与消息系统之间的耦合。
  • 吞吐量优于使用本地消息表方案。

缺点是:

  • 一次消息发送需要两次网络请求(half 消息 + commit/rollback 消息)
  • 业务处理服务需要实现消息状态回查接口

如何确保消息不会丢失?

  • 在生产阶段,你需要捕获消息发送的错误,并重发消息。
  • 在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个 Broker 宕机或者磁盘损坏而丢失。
  • 在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。

如何处理消费过程中的重复消息?

在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

  • At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
  • At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
  • Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

现在常用的绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。

一般解决重复消息的办法是,在消费端,让我们消费消息的操作具备幂等性。一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。

如果我们系统消费消息的业务逻辑具备幂等性,那就不用担心消息重复的问题了,因为同一条消息,消费一次和消费多次对系统的影响是完全一样的。也就可以认为,消费多次等于消费一次。

从对系统的影响结果来说:At least once + 幂等消费 = Exactly once。

常用的设计幂等操作的方法:

  1. 利用数据库的唯一约束实现幂等
  2. 为更新的数据设置前置条件:设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。
  3. 记录并检查操作:在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。——此处涉及分布式 ID 知识点,可以使用类似 GUID、雪花算法 等方式来实现

消息积压了该如何处理?

在使用消息队列的系统中,对于性能的优化,主要体现在生产者和消费者这一收一发两部分的业务逻辑中。对于消息队列本身的性能,不需要太关注。

主要原因是,对于绝大多数使用消息队列的业务来说,消息队列本身的处理能力要远大于业务系统的处理能力。主流消息队列的单个节点,消息收发的性能可以达到每秒钟处理几万至几十万条消息的水平,还可以通过水平扩展 Broker 的实例数成倍地提升处理能力。

而一般的业务系统需要处理的业务逻辑远比消息队列要复杂,单个节点每秒钟可以处理几百到几千次请求,已经可以算是性能非常好的了。所以,对于消息队列的性能优化,我们更关注的是,在消息的收发两端,我们的业务代码怎么和消息队列配合,达到一个最佳的性能。

发送端性能优化

发送消息的性能上不去,你需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的。对于发送消息的业务逻辑,只需要注意设置合适的并发和批量大小,就可以达到很好的发送性能。

消费端性能优化

如果消费的速度跟不上生产消息的速度,就会造成消息积压。即供大于求。

一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。

消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。特别需要注意的一点是,在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。

消息积压的处理

需要先分析消息积压的原因:是发送变快了,还是消费变慢了。大部分消息队列都内置了监控的功能,只要通过监控数据,很容易确定是哪种原因。

如果是因为促销或抢购等原因,导致消息陡增,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例数来提升总体的消费能力。

如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。

如果监控到消费变慢了,你需要检查你的消费实例,分析一下是什么原因导致消费变慢。优先检查一下日志是否有大量的消费错误,如果没有错误的话,可以通过打印堆栈信息,看一下你的消费线程是不是卡在什么地方不动了,比如触发了死锁或者卡在等待某些资源上了。

学习开源代码该如何入手?

(1)先看官方文档,了解关键点:

  • 这个项目是什么
  • 这个项目有什么用
  • 这个项目如何使用
  • 这个项目适用于什么场景
  • 这个项目有哪些优点、缺点
  • 。。。

(2)由点及面的阅读源码

不要泛泛而读,容易迷失。

最好带着目的性,带着问题去阅读源码,最好是带着问题的答案去读源码

如何使用异步设计提升系统性能?

异步编程,可以减少或者避免线程等待,从而提高处理速度。但是,其增加了程序复杂度,应酌情使用。

如何实现高性能的异步网络传输?

系统一般可以分为 IO 密集型应用和计算密集型应用。

大多数业务系统都属于 IO 密集型应用。最常用的 IO 资源有磁盘 IO 和带宽 IO。由于 IO 相较于内存计算,耗时较高,所以往往成为性能优化的关键。

提升 IO 效率的关键在于减少 IO 等待时间,在大量连接请求的时候,如果单线程,显然阻塞时间较长,所以,一般应采用并发 IO 模型。但是,线程数过多时,线程本身造成的 CPU 上下文切换,竞态造成的冲突都会造成额外的开销,导致 CPU 负载升高,从而降低系统整体性能。所以,理想的 IO 模型应该是一个能够复用少量线程的并发 IO 模型。这个模型的当前答案就是 NIO,其最具代表性的框架就是 Netty。其核心原理就是通过多路复用,来提升 IO 效率。

序列化与反序列化:如何通过网络传输结构化的数据?

传输协议:应用程序之间对话的语言

传输协议的目的,在于定义一种信息规则,使得收发双方能够互相交流。传输协议并没有什么必须遵循的规范,能满足需要即可。复杂的协议可以如网络协议报文一样,定义为 TLV 结构。

内存管理:如何避免内存溢出和频繁的垃圾回收?

Kafka 如何实现高性能 IO?

使用批量消息提升服务端处理能力

使用顺序读写提升磁盘 IO 性能

利用 PageCache 加速消息读写

  • PageCache 就是操作系统在内存中给磁盘上的文件建立的缓存。调用系统的 API 读写文件的时候,不会直接去读写磁盘上的文件,应用程序实际操作的都是 PageCache,也就是文件在内存中缓存的副本。
  • 应用程序在写入文件的时候,操作系统会先把数据写入到内存中的 PageCache,然后再一批一批地写到磁盘上。读取文件的时候,也是从 PageCache 中来读取数据,这时候会出现两种可能情况。一种是 PageCache 中有数据,那就直接读取,这样就节省了从磁盘上读取数据的时间;另一种情况是,PageCache 中没有数据,这时候操作系统会引发一个缺页中断,应用程序的读取线程会被阻塞,操作系统把数据从文件中复制到 PageCache 中,然后应用程序再从 PageCache 中继续把数据读出来,这时会真正读一次磁盘上的文件,这个读的过程就会比较慢。
  • 用户的应用程序在使用完某块 PageCache 后,操作系统并不会立刻就清除这个 PageCache,而是尽可能地利用空闲的物理内存保存这些 PageCache,除非系统内存不够用,操作系统才会清理掉一部分 PageCache。清理的策略一般是 LRU 或它的变种算法,这个算法我们不展开讲,它保留 PageCache 的逻辑是:优先保留最近一段时间最常使用的那些 PageCache。
  • Kafka 在读写消息文件的时候,充分利用了 PageCache 的特性。一般来说,消息刚刚写入到服务端就会被消费,按照 LRU 的“优先清除最近最少使用的页”这种策略,读取的时候,对于这种刚刚写入的 PageCache,命中的几率会非常高。也就是说,大部分情况下,消费读消息都会命中 PageCache,带来的好处有两个:一个是读取的速度会非常快,另外一个是,给写入消息让出磁盘的 IO 资源,间接也提升了写入的性能。

零拷贝技术

在服务端,处理消费的大致逻辑是这样的:

  • 首先,从文件中找到消息数据,读到内存中;
  • 然后,把消息通过网络发给客户端。

这个过程中,数据实际上做了 2 次或者 3 次复制:

  1. 从文件复制数据到 PageCache 中,如果命中 PageCache,这一步可以省掉;
  2. 从 PageCache 复制到应用程序的内存空间中,也就是我们可以操作的对象所在的内存;
  3. 从应用程序的内存空间复制到 Socket 的缓冲区,这个过程就是我们调用网络应用框架的 API 发送数据的过程。

Kafka 使用零拷贝技术可以把这个复制次数减少一次,上面的 2、3 步骤两次复制合并成一次复制。直接从 PageCache 中把数据复制到 Socket 缓冲区中,这样不仅减少一次数据复制,更重要的是,由于不用把数据复制到用户内存空间,DMA 控制器可以直接完成数据复制,不需要 CPU 参与,速度更快。

缓存策略:如何使用缓存来减少磁盘 IO?

如何正确使用锁保护共享数据,协调异步线程?

如何用硬件同步原语(CAS)替代锁?

数据压缩:时间换空间的游戏

数据压缩不仅能节省存储空间,还可以用于提升网络传输性能。

压缩和解压的操作都是计算密集型的操作,非常耗费 CPU 资源。如果你的应用处理业务逻辑就需要耗费大量的 CPU 资源,就不太适合再进行压缩和解压。数据压缩,它本质上是用 CPU 资源换取存储资源,或者说是用压缩解压的时间来换取存储的空间,这个买卖是不是划算,需要根据实际情况先衡量一下。

目前常用的压缩算法包括:ZIP,GZIP,SNAPPY,LZ4 等等。在选择压缩算法的时候,需要综合考虑压缩时间和压缩率两个因素,被压缩数据的内容也是影响压缩时间和压缩率的重要因素,必要的时候可以先用业务数据做一个压缩测试,这样有助于选择最合适的压缩算法。一般来说,压缩率越高的算法,压缩耗时也越高。如果是对性能要求高的系统,可以选择压缩速度快的算法,比如 LZ4;如果需要更高的压缩比,可以考虑 GZIP 或者压缩率更高的 XZ 等算法。

另外一个影响压缩率的重要因素是压缩分段的大小,你需要根据业务情况选择一个合适的分段策略,在保证不错的压缩率的前提下,尽量减少解压浪费。

Kafka 在生产者上,对每批消息进行压缩,批消息在服务端不解压,消费者在收到消息之后再进行解压。简单地说,Kafka 的压缩和解压都是在客户端完成的。

RocketMQ Producer 源码分析:消息生产的实现过程

Producer 中包含的几个核心的服务都是有状态的,在 Producer 启动时,在 MQClientInstance 这个类中来统一来启动。在发送消息的流程中,RocketMQ 分了三种发送方式:单向、同步和异步,这三种发送方式对应的发送流程基本是相同的,同步和异步发送是由已经封装好的 MQClientAPIImpl 类来分别实现的。

对于我们在分析代码中提到的几个重要的业务逻辑实现类,你最好能记住这几个类和它的功能,包括 :DefaultMQProducerImpl 封装了大部分 Producer 的业务逻辑,MQClientInstance 封装了客户端一些通用的业务逻辑,MQClientAPIImpl 封装了客户端与服务端的 RPC,NettyRemotingClient 实现了底层网络通信。

Kafka Consumer 源码分析:消息消费的实现过程

Kafka 消费模型的几个要点:

  • Kafka 的每个 Consumer(消费者)实例属于一个 ConsumerGroup(消费组);
  • 在消费时,ConsumerGroup 中的每个 Consumer 独占一个或多个 Partition(分区);
  • 对于每个 ConsumerGroup,在任意时刻,每个 Partition 至多有 1 个 Consumer 在消费;
  • 每个 ConsumerGroup 都有一个 Coordinator(协调者)负责分配 Consumer 和 Partition 的对应关系,当 Partition 或是 Consumer 发生变更是,会触发 reblance(重新分配)过程,重新分配 Consumer 与 Partition 的对应关系;
  • Consumer 维护与 Coordinator 之间的心跳,这样 Coordinator 就能感知到 Consumer 的状态,在 Consumer 故障的时候及时触发 rebalance。

发送请求时,构建 Request 对象,暂存入发送队列,但不立即发送,而是等待合适的时机批量发送。并且,用回调或者 RequestFeuture 方式,预先定义好如何处理响应的逻辑。在收到 Broker 返回的响应之后,也不会立即处理,而是暂存在队列中,择机处理。那这个择机策略就比较复杂了,有可能是需要读取响应的时候,也有可能是缓冲区满了或是时间到了,都有可能触发一次真正的网络请求,也就是在 poll() 方法中发送所有待发送 Request 并处理所有 Response。

Kafka 和 RocketMQ 的消息复制实现的差异点在哪?

如果要确保数据一致性,必须采用“主 - 从”的复制方式。

在“主 - 从”模式下,数据先写入到主节点上,从节点只从主节点上复制数据,如果出现主从数据不一致的情况,必须以主节点上的数据为准。

RocketMQ 如何实现复制

RocketMQ 提供新、老两种复制方式:传统的主从模式和新的基于 Dledger 的复制方式。传统的主从模式性能更好,但灵活性和可用性稍差,而基于 Dledger 的复制方式,在 Broker 故障的时候可以自动选举出新节点,可用性更好,性能稍差,并且资源利用率更低一些。

RocketMQ 引入 Dledger,通过 Dledger 来完成复制。Dledger 在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客户端返回写入成功,并且它是支持通过选举来动态切换主节点的。

Kafka 是如何实现复制的

Kafka 中,复制的基本单位是分区。每个分区的几个副本之间,构成一个小的复制集群,Broker 只是这些分区副本的容器,所以 Kafka 的 Broker 是不分主从的。

分区的多个副本中也是采用一主多从的方式。Kafka 在写入消息的时候,采用的也是异步复制的方式。消息在写入到主节点之后,并不会马上返回写入成功,而是等待足够多的节点都复制成功后再返回。Kafka 为这个“足够多”创造了一个专有名词:ISR(In Sync Replicas),翻译过来就是“保持数据同步的副本”。ISR 的数量是可配的,但需要注意的是,这个 ISR 中是包含主节点的。

Kafka 使用 ZooKeeper 来监控每个分区的多个节点,如果发现某个分区的主节点宕机了,Kafka 会利用 ZooKeeper 来选出一个新的主节点,这样解决了可用性的问题。选举的时候,会从所有 ISR 节点中来选新的主节点,这样可以保证数据一致性。

RocketMQ 客户端如何在集群中找到正确的节点?

NameServer 在集群中起到的一个核心作用就是,为客户端提供路由信息,帮助客户端找到对应的 Broker。每个 NameServer 节点上都保存了集群所有 Broker 的路由信息,可以独立提供服务。Broker 会与所有 NameServer 节点建立长连接,定期上报 Broker 的路由信息。客户端会选择连接某一个 NameServer 节点,定期获取订阅主题的路由信息,用于 Broker 寻址。

不仅仅是 RocketMQ,任何一个弹性分布式集群,都需要一个类似于 NameServer 服务,来帮助访问集群的客户端寻找集群中的节点,这个服务一般称为 NamingService。

在 RocketMQ 中,NameServer 是一个独立的进程,为 Broker、生产者和消费者提供服务。NameServer 最主要的功能就是,为客户端提供寻址服务,协助客户端找到主题对应的 Broker 地址。此外,NameServer 还负责监控每个 Broker 的存活状态。

NameServer 支持只部署一个节点,也支持部署多个节点组成一个集群,这样可以避免单点故障。在集群模式下,NameServer 各节点之间是不需要任何通信的,也不会通过任何方式互相感知,每个节点都可以独立提供全部服务。

NameServer 的总体结构

  • NamesrvStartup:程序入口。
  • NamesrvController:NameServer 的总控制器,负责所有服务的生命周期管理。
  • RouteInfoManager:NameServer 最核心的实现类,负责保存和管理集群路由信息。
  • BrokerHousekeepingService:监控 Broker 连接状态的代理类。
  • DefaultRequestProcessor:负责处理客户端和 Broker 发送过来的 RPC 请求的处理器。
  • ClusterTestRequestProcessor:用于测试的请求处理器。

NameServer 的所有核心功能都是在 RouteInfoManager 这个类中实现的。RouteInfoManager 这个类中保存了所有的路由信息,这些路由信息都是保存在内存中,并且没有持久化的。

1
2
3
4
5
6
7
8
9
public class BrokerData implements Comparable<BrokerData> {
// ...
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
// ...
}

Kafka 的协调服务 ZooKeeper:实现分布式系统的“瑞士军刀”

ZooKeeper 是一个分布式的协调服务,它的核心服务是一个高可用、高可靠的一致性存储,在此基础上,提供了包括读写元数据、节点监控、选举、节点间通信和分布式锁等很多功能,这些功能可以极大方便我们快速开发一个分布式的集群系统。

ZooKeeper 的使用注意点:

  1. 不要往 ZooKeeper 里面写入大量数据,它不是一个真正意义上的存储系统,只适合存放少量的数据。依据服务器配置的不同,ZooKeeper 在写入超过几百 MB 数据之后,性能和稳定性都会严重下降。
  2. 不要让业务集群的可用性依赖于 ZooKeeper 的可用性,什么意思呢?你的系统可以使用 Zookeeper,但你要留一手,要考虑如果 Zookeeper 集群宕机了,你的业务集群最好还能提供服务。因为 ZooKeeper 的选举过程是比较慢的,而它对网络的抖动又比较敏感,一旦触发选举,这段时间内的 ZooKeeper 是不能提供任何服务的。

Kafka 主要使用 ZooKeeper 来保存它的元数据、监控 Broker 和分区的存活状态,并利用 ZooKeeper 来进行选举。

Kafka 在 ZooKeeper 中保存的元数据,主要就是 Broker 的列表和主题分区信息两棵树。这份元数据同时也被缓存到每一个 Broker 中。客户端并不直接和 ZooKeeper 来通信,而是在需要的时候,通过 RPC 请求去 Broker 上拉取它关心的主题的元数据,然后保存到客户端的元数据缓存中,以便支撑客户端生产和消费

RocketMQ 与 Kafka 中如何实现事务?

Kafka 和 RocketMQ 都是基于两阶段提交来实现的事务,都利用了特殊的主题中的队列和分区来记录事务日志。

不同之处在于对处于事务中的消息的处理方式,RocketMQ 是把这些消息暂存在一个特殊的队列中,待事务提交后再移动到业务队列中;而 Kafka 直接把消息放到对应的业务分区中,配合客户端过滤来暂时屏蔽进行中的事务消息。

RocketMQ 和 Kafka 的事务,它们的适用场景是不一样的,RocketMQ 的事务适用于解决本地事务和发消息的数据一致性问题,而 Kafka 的事务则是用于实现它的 Exactly Once 机制,应用于实时计算的场景中。

MQTT 协议:如何支持海量的在线 IoT 设备?

MQTT 是专门为物联网设备设计的一套标准的通信协议。这套协议在消息模型和功能上与普通的消息队列协议是差不多的,最大的区别在于应用场景不同。在物联网应用场景中,IoT 设备性能差,网络连接不稳定。服务端面临的挑战主要是,需要支撑海量的客户端和主题。

已有的开源的 MQTT 产品,对于协议的支持都不错,在客户端数量小于十万级别的情况下,可以选择。对于海量客户端的场景,服务端必须使用集群来支撑,可以选择收费的云服务和企业版产品。也可以选择自行来构建 MQTT 集群。

自行构建集群,最关键的技术点就是,通过前置的 Proxy 集群来解决海量连接、会话管理和海量主题这三个问题。前置 Proxy 负责在 Broker 和客户端之间转发消息,通过这种方式,将海量客户端连接收敛为少量的 Proxy 与 Broker 之间的连接,解决了海量客户端连接数的问题。维护会话的实现原理,和 Tomcat 维护 HTTP 会话是一样的。对于海量主题,可以在后端部署多组 Broker 小集群,每个小集群分担一部分主题这样的方式来解决。

Pulsar 的存储计算分离设计:全新的消息队列设计思路

Pulsar 和其他消息队列最大的区别是,它采用了存储计算分离的设计。存储消息的职责从 Broker 中分离出来,交给专门的 BookKeeper 存储集群。这样 Broker 就变成了无状态的节点,在集群调度和故障恢复方面更加简单灵活。

存储计算分离是一种设计思想,它将系统的存储职责和计算职责分离开,存储节点只负责数据存储,而计算节点只负责计算,计算节点是无状态的。无状态的计算节点,具有易于开发、调度灵活的优点,故障转移和恢复也更加简单快速。这种设计的缺点是,系统总体的复杂度更高,性能也更差。不过对于大部分分布式的业务系统来说,由于它不需要自己开发存储系统,采用存储计算分离的设计,既可以充分利用这种设计的优点,整个系统也不会因此变得过于复杂,综合评估优缺点,利大于弊,更加划算。

Flink 分析计算任务之后生成 JobGraph,JobGraph 是一个有向无环图,数据流过这个图中的节点,在每个节点进行计算和变换,最终流出有向无环图就完成了计算。JobGraph 中的每个节点是一个 Task,Task 是可以并行执行的,每个线程就是一个 SubTask。SubTask 被 JobManager 分配给某个 TaskManager,在 TaskManager 进程中的一个线程中执行。

流计算与消息(二):在流计算中使用 Kafka 链接计算任务

端到端 Exactly Once 语义,可以保证在分布式系统中,每条数据不多不少只被处理一次。在流计算中,因为数据重复会导致计算结果错误,所以 Exactly Once 在流计算场景中尤其重要。Kafka 和 Flink 都提供了保证 Exactly Once 的特性,配合使用可以实现端到端的 Exactly Once 语义。

在 Flink 中,如果节点出现故障,可以自动重启计算任务,重新分配计算节点来保证系统的可用性。配合 CheckPoint 机制,可以保证重启后任务的状态恢复到最后一次 CheckPoint,然后从 CheckPoint 中记录的恢复位置继续读取数据进行计算。Flink 通过一个巧妙的 Barrier 使 CheckPoint 中恢复位置和各节点状态完全对应。

Kafka 的 Exactly Once 语义是通过它的事务和生产幂等两个特性来共同实现的。在配合 Flink 的时候,每个 Flink 的 CheckPoint 对应一个 Kafka 事务,只要保证 CheckPoint 和 Kafka 事务同步提交就可以实现端到端的 Exactly Once,Flink 通过“二阶段提交”这个分布式事务的经典算法来保证 CheckPoint 和 Kafka 事务状态的一致性。

主流消息队列都是如何存储消息的?

在所有的存储系统中,消息队列的存储可能是最简单的。每个主题包含若干个分区,每个分区其实就是一个 WAL(Write Ahead Log),写入的时候只能尾部追加,不允许修改。读取的时候,根据一个索引序号进行查询,然后连续顺序往下读。

Kafka 存储消息结构

Kafka 的存储以 Partition 为单位,每个 Partition 包含一组消息文件(Segment file)和一组索引文件(Index),并且消息文件和索引文件一一对应,具有相同的文件名(但文件扩展名不一样),文件名就是这个文件中第一条消息的索引序号。

每个索引中保存索引序号(也就是这条消息是这个分区中的第几条消息)和对应的消息在消息文件中的绝对位置。在索引的设计上,Kafka 采用的是稀疏索引,为了节省存储空间,它不会为每一条消息都创建索引,而是每隔几条消息创建一条索引。

写入消息的时候非常简单,就是在消息文件尾部连续追加写入,一个文件写满了再写下一个文件。查找消息时,首先根据文件名找到所在的索引文件,然后用二分法遍历索引文件内的索引,在里面找到离目标消息最近的索引,再去消息文件中,找到这条最近的索引指向的消息位置,从这个位置开始顺序遍历消息文件,找到目标消息。

可以看到,寻址过程还是需要一定时间的。一旦找到消息位置后,就可以批量顺序读取,不必每条消息都要进行一次寻址。

RocketMQ 存储消息结构

RocketMQ 的存储以 Broker 为单位。它的存储也是分为消息文件和索引文件,但是在 RocketMQ 中,每个 Broker 只有一组消息文件,它把在这个 Broker 上的所有主题的消息都存在这一组消息文件中。索引文件和 Kafka 一样,是按照主题和队列分别建立的,每个队列对应一组索引文件,这组索引文件在 RocketMQ 中称为 ConsumerQueue。RocketMQ 中的索引是定长稠密索引,它为每一条消息都建立索引,每个索引的长度(注意不是消息长度)是固定的 20 个字节。

写入消息的时候,Broker 上所有主题、所有队列的消息按照自然顺序追加写入到同一个消息文件中,一个文件写满了再写下一个文件。查找消息的时候,可以直接根据队列的消息序号,计算出索引的全局位置(索引序号 x 索引固定长度 20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息。可以看到,这里两次寻址都是绝对位置寻址,比 Kafka 的查找是要快的。

Kafka 和 RocketMQ 的存储结构比较

对比这两种存储结构,你可以看到它们有很多共通的地方,都是采用消息文件 + 索引文件的存储方式,索引文件的名字都是第一条消息的索引序号,索引中记录了消息的位置等等。

在消息文件的存储粒度上,Kafka 以分区为单位,粒度更细,优点是更加灵活,很容易进行数据迁移和扩容。RocketMQ 以 Broker 为单位,较粗的粒度牺牲了灵活性,带来的好处是,在写入的时候,同时写入的文件更少,有更好的批量(不同主题和分区的数据可以组成一批一起写入),更多的顺序写入,尤其是在 Broker 上有很多主题和分区的情况下,有更好的写入性能。

大多数场景下,这两种存储设计的差异其实并不明显,都可以满足需求。但是在某些极限场景下,依然会体现出它们设计的差异。比如,在一个 Broker 上有上千个活动主题的情况下,RocketMQ 的写入性能就会体现出优势。再比如,如果我们的消息都是几个、十几个字节的小消息,但是消息的数量很多,这时候 Kafka 的稀疏索引设计就能节省非常多的存储空间。

参考资料

XXX

简介

什么是 XXX

XXX 有什么用

XXX 原理

参考资料

源码级深度理解 Java SPI

SPI 简介

SPI 全称 Service Provider Interface,是 Java 提供的,旨在由第三方实现或扩展的 API,它是一种用于动态加载服务的机制。Java 中 SPI 机制主要思想是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要,其核心思想就是 解耦

Java SPI 有四个要素:

  • SPI 接口:为服务提供者实现类约定的的接口或抽象类。
  • SPI 实现类:实际提供服务的实现类。
  • SPI 配置:Java SPI 机制约定的配置文件,提供查找服务实现类的逻辑。配置文件必须置于 META-INF/services 目录中,并且,文件名应与服务提供者接口的完全限定名保持一致。文件中的每一行都有一个实现服务类的详细信息,同样是服务提供者类的完全限定名称。
  • **ServiceLoader**:Java SPI 的核心类,用于加载 SPI 实现类。 ServiceLoader 中有各种实用方法来获取特定实现、迭代它们或重新加载服务。

SPI 示例

正所谓,实践出真知,我们不妨通过一个具体的示例来看一下,如何使用 Java SPI。

SPI 接口

首先,需要定义一个 SPI 接口,和普通接口并没有什么差别。

1
2
3
4
5
package io.github.dunwu.javacore.spi;

public interface DataStorage {
String search(String key);
}

SPI 实现类

假设,我们需要在程序中使用两种不同的数据存储——Mysql 和 Redis。因此,我们需要两个不同的实现类去分别完成相应工作。

Mysql 查询 MOCK 类

1
2
3
4
5
6
7
8
package io.github.dunwu.javacore.spi;

public class MysqlStorage implements DataStorage {
@Override
public String search(String key) {
return "【Mysql】搜索" + key + ",结果:No";
}
}

Redis 查询 MOCK 类

1
2
3
4
5
6
7
8
package io.github.dunwu.javacore.spi;

public class RedisStorage implements DataStorage {
@Override
public String search(String key) {
return "【Redis】搜索" + key + ",结果:Yes";
}
}

到目前为止,定义接口,并实现接口和普通的 Java 接口实现没有任何不同。

SPI 配置

如果想通过 Java SPI 机制来发现服务,就需要在 SPI 配置中约定好发现服务的逻辑。配置文件必须置于 META-INF/services 目录中,并且,文件名应与服务提供者接口的完全限定名保持一致。文件中的每一行都有一个实现服务类的详细信息,同样是服务提供者类的完全限定名称。以本示例代码为例,其文件名应该为 io.github.dunwu.javacore.spi.DataStorage,文件中的内容如下:

1
2
io.github.dunwu.javacore.spi.MysqlStorage
io.github.dunwu.javacore.spi.RedisStorage

ServiceLoader

完成了上面的步骤,就可以通过 ServiceLoader 来加载服务。示例如下:

1
2
3
4
5
6
7
8
9
10
11
import java.util.ServiceLoader;

public class SpiDemo {

public static void main(String[] args) {
ServiceLoader<DataStorage> serviceLoader = ServiceLoader.load(DataStorage.class);
System.out.println("============ Java SPI 测试============");
serviceLoader.forEach(loader -> System.out.println(loader.search("Yes Or No")));
}

}

输出:

1
2
3
============ Java SPI 测试============
【Mysql】搜索Yes Or No,结果:No
【Redis】搜索Yes Or No,结果:Yes

SPI 原理

上文中,我们已经了解 Java SPI 的要素以及使用 Java SPI 的方法。你有没有想过,Java SPI 和普通 Java 接口有何不同,Java SPI 是如何工作的。实际上,Java SPI 机制依赖于 ServiceLoader 类去解析、加载服务。因此,掌握了 ServiceLoader 的工作流程,就掌握了 SPI 的原理。ServiceLoader 的代码本身很精练,接下来,让我们通过走读源码的方式,逐一理解 ServiceLoader 的工作流程。

ServiceLoader 的成员变量

先看一下 ServiceLoader 类的成员变量,大致有个印象,后面的源码中都会使用到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final class ServiceLoader<S> implements Iterable<S> {

// SPI 配置文件目录
private static final String PREFIX = "META-INF/services/";

// 将要被加载的 SPI 服务
private final Class<S> service;

// 用于加载 SPI 服务的类加载器
private final ClassLoader loader;

// ServiceLoader 创建时的访问控制上下文
private final AccessControlContext acc;

// SPI 服务缓存,按实例化的顺序排列
private LinkedHashMap<String,S> providers = new LinkedHashMap<>();

// 懒查询迭代器
private LazyIterator lookupIterator;

// ...
}

ServiceLoader 的工作流程

(1)ServiceLoader.load 静态方法

应用程序加载 Java SPI 服务,都是先调用 ServiceLoader.load 静态方法。ServiceLoader.load 静态方法的作用是:

  1. 指定类加载 ClassLoader 和访问控制上下文;
  2. 然后,重新加载 SPI 服务
    1. 清空缓存中所有已实例化的 SPI 服务
    2. 根据 ClassLoader 和 SPI 类型,创建懒加载迭代器

这里,摘录 ServiceLoader.load 相关源码,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// service 传入的是期望加载的 SPI 接口类型
// loader 是用于加载 SPI 服务的类加载器
public static <S> ServiceLoader<S> load(Class<S> service,
ClassLoader loader)
{
return new ServiceLoader<>(service, loader);
}

public void reload() {
// 清空缓存中所有已实例化的 SPI 服务
providers.clear();
// 根据 ClassLoader 和 SPI 类型,创建懒加载迭代器
lookupIterator = new LazyIterator(service, loader);
}

// 私有构造方法
// 重新加载 SPI 服务
private ServiceLoader(Class<S> svc, ClassLoader cl) {
service = Objects.requireNonNull(svc, "Service interface cannot be null");
// 指定类加载 ClassLoader 和访问控制上下文
loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
// 然后,重新加载 SPI 服务
reload();
}

(2)应用程序通过 ServiceLoaderiterator 方法遍历 SPI 实例

ServiceLoader 的类定义,明确了 ServiceLoader 类实现了 Iterable<T> 接口,所以,它是可以迭代遍历的。实际上,ServiceLoader 类维护了一个缓存 providers( LinkedHashMap 对象),缓存 providers 中保存了已经被成功加载的 SPI 实例,这个 Map 的 key 是 SPI 接口实现类的全限定名,value 是该实现类的一个实例对象。

当应用程序调用 ServiceLoaderiterator 方法时,ServiceLoader 会先判断缓存 providers 中是否有数据:如果有,则直接返回缓存 providers 的迭代器;如果没有,则返回懒加载迭代器的迭代器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public Iterator<S> iterator() {
return new Iterator<S>() {

// 缓存 SPI providers
Iterator<Map.Entry<String,S>> knownProviders
= providers.entrySet().iterator();

// lookupIterator 是 LazyIterator 实例,用于懒加载 SPI 实例
public boolean hasNext() {
if (knownProviders.hasNext())
return true;
return lookupIterator.hasNext();
}

public S next() {
if (knownProviders.hasNext())
return knownProviders.next().getValue();
return lookupIterator.next();
}

public void remove() {
throw new UnsupportedOperationException();
}

};
}

(3)懒加载迭代器的工作流程

上面的源码中提到了,lookupIterator 是 LazyIterator 实例,而 LazyIterator 用于懒加载 SPI 实例。那么, LazyIterator 是如何工作的呢?

这里,摘取 LazyIterator 关键代码

  • hasNextService 方法:
    1. 拼接 META-INF/services/ + SPI 接口全限定名
    2. 通过类加载器,尝试加载资源文件
    3. 解析资源文件中的内容,获取 SPI 接口的实现类的全限定名 nextName
  • nextService 方法:
    1. hasNextService() 方法解析出了 SPI 实现类的的全限定名 nextName,通过反射,获取 SPI 实现类的类定义 Class<?>
    2. 然后,尝试通过 Class<?>newInstance 方法实例化一个 SPI 服务对象。如果成功,则将这个对象加入到缓存 providers 中并返回该对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private boolean hasNextService() {
if (nextName != null) {
return true;
}
if (configs == null) {
try {
// 1.拼接 META-INF/services/ + SPI 接口全限定名
// 2.通过类加载器,尝试加载资源文件
// 3.解析资源文件中的内容
String fullName = PREFIX + service.getName();
if (loader == null)
configs = ClassLoader.getSystemResources(fullName);
else
configs = loader.getResources(fullName);
} catch (IOException x) {
fail(service, "Error locating configuration files", x);
}
}
while ((pending == null) || !pending.hasNext()) {
if (!configs.hasMoreElements()) {
return false;
}
pending = parse(service, configs.nextElement());
}
nextName = pending.next();
return true;
}

private S nextService() {
if (!hasNextService())
throw new NoSuchElementException();
String cn = nextName;
nextName = null;
Class<?> c = null;
try {
c = Class.forName(cn, false, loader);
} catch (ClassNotFoundException x) {
fail(service,
"Provider " + cn + " not found");
}
if (!service.isAssignableFrom(c)) {
fail(service,
"Provider " + cn + " not a s");
}
try {
S p = service.cast(c.newInstance());
providers.put(cn, p);
return p;
} catch (Throwable x) {
fail(service,
"Provider " + cn + " could not be instantiated",
x);
}
throw new Error(); // This cannot happen
}

SPI 和类加载器

通过上面两个章节中,走读 ServiceLoader 代码,我们已经大致了解 Java SPI 的工作原理,即通过 ClassLoader 加载 SPI 配置文件,解析 SPI 服务,然后通过反射,实例化 SPI 服务实例。我们不妨思考一下,为什么加载 SPI 服务时,需要指定类加载器 ClassLoader 呢?

学习过 JVM 的读者,想必都了解过类加载器的双亲委派模型(Parents Delegation Model)。双亲委派模型要求除了顶层的 BootstrapClassLoader 外,其余的类加载器都应有自己的父类加载器。这里类加载器之间的父子关系一般通过组合(Composition)关系来实现,而不是通过继承(Inheritance)的关系实现。双亲委派继承体系图如下:

img

双亲委派机制约定了:一个类加载器首先将类加载请求传送到父类加载器,只有当父类加载器无法完成类加载请求时才尝试加载

双亲委派的好处:使得 Java 类伴随着它的类加载器,天然具备一种带有优先级的层次关系,从而使得类加载得到统一,不会出现重复加载的问题:

  • 系统类防止内存中出现多份同样的字节码
  • 保证 Java 程序安全稳定运行

例如: java.lang.Object 存放在 rt.jar 中,如果编写另外一个 java.lang.Object 的类并放到 classpath 中,程序可以编译通过。因为双亲委派模型的存在,所以在 rt.jar 中的 Object 比在 classpath 中的 Object 优先级更高,因为 rt.jar 中的 Object 使用的是启动类加载器,而 classpath 中的 Object 使用的是应用程序类加载器。正因为 rt.jar 中的 Object 优先级更高,因为程序中所有的 Object 都是这个 Object

双亲委派的限制:子类加载器可以使用父类加载器已经加载的类,而父类加载器无法使用子类加载器已经加载的。——这就导致了双亲委派模型并不能解决所有的类加载器问题。Java SPI 就面临着这样的问题:

  • SPI 的接口是 Java 核心库的一部分,是由 BootstrapClassLoader 加载的;
  • 而 SPI 实现的 Java 类一般是由 AppClassLoader 来加载的。BootstrapClassLoader 是无法找到 SPI 的实现类的,因为它只加载 Java 的核心库。它也不能代理给 AppClassLoader,因为它是最顶层的类加载器。这也解释了本节开始的问题——为什么加载 SPI 服务时,需要指定类加载器 ClassLoader 呢?因为如果不指定 ClassLoader,则无法获取 SPI 服务。

如果不做任何的设置,Java 应用的线程的上下文类加载器默认就是 AppClassLoader。在核心类库使用 SPI 接口时,传递的类加载器使用线程上下文类加载器,就可以成功的加载到 SPI 实现的类。线程上下文类加载器在很多 SPI 的实现中都会用到。

通常可以通过 Thread.currentThread().getClassLoader()Thread.currentThread().getContextClassLoader() 获取线程上下文类加载器。

Java SPI 的不足

Java SPI 存在一些不足:

  • 不能按需加载,需要遍历所有的实现,并实例化,然后在循环中才能找到我们需要的实现。如果不想用某些实现类,或者某些类实例化很耗时,它也被载入并实例化了,这就造成了浪费。

  • 获取某个实现类的方式不够灵活,只能通过 Iterator 形式获取,不能根据某个参数来获取对应的实现类。

  • 多个并发多线程使用 ServiceLoader 类的实例是不安全的。

SPI 应用场景

SPI 在 Java 开发中应用十分广泛。首先,在 Java 的 java.util.spi package 中就约定了很多 SPI 接口。下面,列举一些 SPI 接口:

除此以外,SPI 还有很多应用,下面列举几个经典案例。

SPI 应用案例之 JDBC DriverManager

作为 Java 工程师,尤其是 CRUD 工程师,相必都非常熟悉 JDBC。众所周知,关系型数据库有很多种,如:Mysql、Oracle、PostgreSQL 等等。JDBC 如何识别各种数据库的驱动呢?

创建数据库连接

我们先回顾一下,JDBC 如何创建数据库连接的呢?

JDBC4.0 之前,连接数据库的时候,通常会用 Class.forName(XXX) 方法来加载数据库相应的驱动,然后再获取数据库连接,继而进行 CRUD 等操作。

1
Class.forName("com.mysql.jdbc.Driver")

而 JDBC4.0 之后,不再需要用 Class.forName(XXX) 方法来加载数据库驱动,直接获取连接就可以了。显然,这种方式很方便,但是如何做到的呢?

  • JDBC 接口:首先,Java 中内置了接口 java.sql.Driver

  • JDBC 接口实现:各个数据库的驱动自行实现 java.sql.Driver 接口,用于管理数据库连接。

    • Mysql:在 mysql 的 Java 驱动包 mysql-connector-java-XXX.jar 中,可以找到 META-INF/services 目录,该目录下会有一个名字为java.sql.Driver 的文件,文件内容是 com.mysql.cj.jdbc.Drivercom.mysql.cj.jdbc.Driver 正是 Mysql 版的 java.sql.Driver 实现。如下图所示:

    • PostgreSQL 实现:在 PostgreSQL 的 Java 驱动包 postgresql-42.0.0.jar 中,也可以找到同样的配置文件,文件内容是 org.postgresql.Driverorg.postgresql.Driver 正是 PostgreSQL 版的 java.sql.Driver 实现。
  • 创建数据库连接

    以 Mysql 为例,创建数据库连接代码如下:

    1
    2
    final String DB_URL = String.format("jdbc:mysql://%s:%s/%s", DB_HOST, DB_PORT, DB_SCHEMA);
    connection = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);

DriverManager

从前文,我们已经知道 DriverManager 是创建数据库连接的关键。它究竟是如何工作的呢?

可以看到是加载实例化驱动的,接着看 loadInitialDrivers 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
private static void loadInitialDrivers() {
String drivers;
try {
drivers = AccessController.doPrivileged(new PrivilegedAction<String>() {
public String run() {
return System.getProperty("jdbc.drivers");
}
});
} catch (Exception ex) {
drivers = null;
}
// 通过 classloader 获取所有实现 java.sql.Driver 的驱动类
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
// 利用 SPI,记载所有 Driver 服务
ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class);
// 获取迭代器
Iterator<Driver> driversIterator = loadedDrivers.iterator();
try{
// 遍历迭代器
while(driversIterator.hasNext()) {
driversIterator.next();
}
} catch(Throwable t) {
// Do nothing
}
return null;
}
});

// 打印数据库驱动信息
println("DriverManager.initialize: jdbc.drivers = " + drivers);

if (drivers == null || drivers.equals("")) {
return;
}
String[] driversList = drivers.split(":");
println("number of Drivers:" + driversList.length);
for (String aDriver : driversList) {
try {
println("DriverManager.Initialize: loading " + aDriver);
// 尝试实例化驱动
Class.forName(aDriver, true,
ClassLoader.getSystemClassLoader());
} catch (Exception ex) {
println("DriverManager.Initialize: load failed: " + ex);
}
}
}

上面的代码主要步骤是:

  1. 从系统变量中获取驱动的实现类。
  2. 利用 SPI 来获取所有驱动的实现类。
  3. 遍历所有驱动,尝试实例化各个实现类。
  4. 根据第 1 步获取到的驱动列表来实例化具体的实现类。

需要关注的是下面这行代码:

1
ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class);

这里实际获取的是 java.util.ServiceLoader.LazyIterator 迭代器。调用其 hasNext 方法时,会搜索 classpath 下以及 jar 包中的 META-INF/services 目录,查找 java.sql.Driver 文件,并找到文件中的驱动实现类的全限定名。调用其 next 方法时,会根据驱动类的全限定名去尝试实例化一个驱动类的对象。

SPI 应用案例之 Common-Logging

common-logging(也称 Jakarta Commons Logging,缩写 JCL)是常用的日志门面工具包。

common-logging 的核心类是入口是 LogFactoryLogFatory 是一个抽象类,它负责加载具体的日志实现。

其入口方法是 LogFactory.getLog 方法,源码如下:

1
2
3
4
5
6
7
public static Log getLog(Class clazz) throws LogConfigurationException {
return getFactory().getInstance(clazz);
}

public static Log getLog(String name) throws LogConfigurationException {
return getFactory().getInstance(name);
}

从以上源码可知,getLog 采用了工厂设计模式,是先调用 getFactory 方法获取具体日志库的工厂类,然后根据类名称或类型创建日志实例。

LogFatory.getFactory 方法负责选出匹配的日志工厂,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
public static LogFactory getFactory() throws LogConfigurationException {
// 省略...

// 加载 commons-logging.properties 配置文件
Properties props = getConfigurationFile(contextClassLoader, FACTORY_PROPERTIES);

// 省略...

// 决定创建哪个 LogFactory 实例
// (1)尝试读取全局属性 org.apache.commons.logging.LogFactory
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] Looking for system property [" + FACTORY_PROPERTY +
"] to define the LogFactory subclass to use...");
}

try {
// 如果指定了 org.apache.commons.logging.LogFactory 属性,尝试实例化具体实现类
String factoryClass = getSystemProperty(FACTORY_PROPERTY, null);
if (factoryClass != null) {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] Creating an instance of LogFactory class '" + factoryClass +
"' as specified by system property " + FACTORY_PROPERTY);
}
factory = newFactory(factoryClass, baseClassLoader, contextClassLoader);
} else {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] No system property [" + FACTORY_PROPERTY + "] defined.");
}
}
} catch (SecurityException e) {
// 异常处理
} catch (RuntimeException e) {
// 异常处理
}

// (2)利用 Java SPI 机制,尝试在 classpatch 的 META-INF/services 目录下寻找 org.apache.commons.logging.LogFactory 实现类
if (factory == null) {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] Looking for a resource file of name [" + SERVICE_ID +
"] to define the LogFactory subclass to use...");
}
try {
final InputStream is = getResourceAsStream(contextClassLoader, SERVICE_ID);

if( is != null ) {
// This code is needed by EBCDIC and other strange systems.
// It's a fix for bugs reported in xerces
BufferedReader rd;
try {
rd = new BufferedReader(new InputStreamReader(is, "UTF-8"));
} catch (java.io.UnsupportedEncodingException e) {
rd = new BufferedReader(new InputStreamReader(is));
}

String factoryClassName = rd.readLine();
rd.close();

if (factoryClassName != null && ! "".equals(factoryClassName)) {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] Creating an instance of LogFactory class " +
factoryClassName +
" as specified by file '" + SERVICE_ID +
"' which was present in the path of the context classloader.");
}
factory = newFactory(factoryClassName, baseClassLoader, contextClassLoader );
}
} else {
// is == null
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] No resource file with name '" + SERVICE_ID + "' found.");
}
}
} catch (Exception ex) {
// note: if the specified LogFactory class wasn't compatible with LogFactory
// for some reason, a ClassCastException will be caught here, and attempts will
// continue to find a compatible class.
if (isDiagnosticsEnabled()) {
logDiagnostic(
"[LOOKUP] A security exception occurred while trying to create an" +
" instance of the custom factory class" +
": [" + trim(ex.getMessage()) +
"]. Trying alternative implementations...");
}
// ignore
}
}

// (3)尝试从 classpath 目录下的 commons-logging.properties 文件中查找 org.apache.commons.logging.LogFactory 属性

if (factory == null) {
if (props != null) {
if (isDiagnosticsEnabled()) {
logDiagnostic(
"[LOOKUP] Looking in properties file for entry with key '" + FACTORY_PROPERTY +
"' to define the LogFactory subclass to use...");
}
String factoryClass = props.getProperty(FACTORY_PROPERTY);
if (factoryClass != null) {
if (isDiagnosticsEnabled()) {
logDiagnostic(
"[LOOKUP] Properties file specifies LogFactory subclass '" + factoryClass + "'");
}
factory = newFactory(factoryClass, baseClassLoader, contextClassLoader);

// TODO: think about whether we need to handle exceptions from newFactory
} else {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] Properties file has no entry specifying LogFactory subclass.");
}
}
} else {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] No properties file available to determine" + " LogFactory subclass from..");
}
}
}

// (4)以上情况都不满足,实例化默认实现类 org.apache.commons.logging.impl.LogFactoryImpl

if (factory == null) {
if (isDiagnosticsEnabled()) {
logDiagnostic(
"[LOOKUP] Loading the default LogFactory implementation '" + FACTORY_DEFAULT +
"' via the same classloader that loaded this LogFactory" +
" class (ie not looking in the context classloader).");
}

factory = newFactory(FACTORY_DEFAULT, thisClassLoader, contextClassLoader);
}

if (factory != null) {
/**
* Always cache using context class loader.
*/
cacheFactory(contextClassLoader, factory);

if (props != null) {
Enumeration names = props.propertyNames();
while (names.hasMoreElements()) {
String name = (String) names.nextElement();
String value = props.getProperty(name);
factory.setAttribute(name, value);
}
}
}

return factory;
}

从 getFactory 方法的源码可以看出,其核心逻辑分为 4 步:

  1. 首先,尝试查找全局属性 org.apache.commons.logging.LogFactory,如果指定了具体类,尝试创建实例。
  2. 利用 Java SPI 机制,尝试在 classpatch 的 META-INF/services 目录下寻找 org.apache.commons.logging.LogFactory 的实现类。
  3. 尝试从 classpath 目录下的 commons-logging.properties 文件中查找 org.apache.commons.logging.LogFactory 属性,如果指定了具体类,尝试创建实例。
  4. 以上情况如果都不满足,则实例化默认实现类,即 org.apache.commons.logging.impl.LogFactoryImpl

SPI 应用案例之 Spring Boot

Spring Boot 是基于 Spring 构建的框架,其设计目的在于简化 Spring 应用的配置、运行。在 Spring Boot 中,大量运用了自动装配来尽可能减少配置。

下面是一个 Spring Boot 入口示例,可以看到,代码非常简洁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class DemoApplication {

public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}

@GetMapping("/hello")
public String hello(@RequestParam(value = "name", defaultValue = "World") String name) {
return String.format("Hello %s!", name);
}
}

那么,Spring Boot 是如何做到寥寥几行代码,就可以运行一个 Spring Boot 应用的呢。我们不妨带着疑问,从源码入手,一步步探究其原理。

@SpringBootApplication 注解

首先,Spring Boot 应用的启动类上都会标记一个 @SpringBootApplication 注解。@SpringBootApplication 注解定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootConfiguration
@EnableAutoConfiguration
@ComponentScan(
excludeFilters = {@Filter(
type = FilterType.CUSTOM,
classes = {TypeExcludeFilter.class}
), @Filter(
type = FilterType.CUSTOM,
classes = {AutoConfigurationExcludeFilter.class}
)}
)
public @interface SpringBootApplication {
// 略
}

除了 @Target@Retention@Documented@Inherited 这几个元注解, @SpringBootApplication 注解的定义中还标记了 @SpringBootConfiguration@EnableAutoConfiguration@ComponentScan 三个注解。

@SpringBootConfiguration 注解

@SpringBootConfiguration 注解的定义来看,@SpringBootConfiguration 注解本质上就是一个 @Configuration 注解,这意味着被@SpringBootConfiguration 注解修饰的类会被 Spring Boot 识别为一个配置类。

1
2
3
4
5
6
7
8
9
10
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Configuration
public @interface SpringBootConfiguration {
@AliasFor(
annotation = Configuration.class
)
boolean proxyBeanMethods() default true;
}

@EnableAutoConfiguration 注解

@EnableAutoConfiguration 注解定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@AutoConfigurationPackage
@Import({AutoConfigurationImportSelector.class})
public @interface EnableAutoConfiguration {
String ENABLED_OVERRIDE_PROPERTY = "spring.boot.enableautoconfiguration";

Class<?>[] exclude() default {};

String[] excludeName() default {};
}

@EnableAutoConfiguration 注解包含了 @AutoConfigurationPackage@Import({AutoConfigurationImportSelector.class}) 两个注解。

@AutoConfigurationPackage 注解

@AutoConfigurationPackage 会将被修饰的类作为主配置类,该类所在的 package 会被视为根路径,Spring Boot 默认会自动扫描根路径下的所有 Spring Bean(被 @Component 以及继承 @Component 的各个注解所修饰的类)。——这就是为什么 Spring Boot 的启动类一般要置于根路径的原因。这个功能等同于在 Spring xml 配置中通过 context:component-scan 来指定扫描路径。@Import 注解的作用是向 Spring 容器中直接注入指定组件。@AutoConfigurationPackage 注解中注明了 @Import({Registrar.class})Registrar 类用于保存 Spring Boot 的入口类、根路径等信息。

SpringFactoriesLoader.loadFactoryNames 方法

@Import(AutoConfigurationImportSelector.class) 表示直接注入 AutoConfigurationImportSelectorAutoConfigurationImportSelector 有一个核心方法 getCandidateConfigurations 用于获取候选配置。该方法调用了 SpringFactoriesLoader.loadFactoryNames 方法,这个方法即为 Spring Boot SPI 的关键,它负责加载所有 META-INF/spring.factories 文件,加载的过程由 SpringFactoriesLoader 负责。

Spring Boot 的 META-INF/spring.factories 文件本质上就是一个 properties 文件,数据内容就是一个个键值对。

SpringFactoriesLoader.loadFactoryNames 方法的关键源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// spring.factories 文件的格式为:key=value1,value2,value3
// 遍历所有 META-INF/spring.factories 文件
// 解析文件,获得 key=factoryClass 的类名称
public static List<String> loadFactoryNames(Class<?> factoryType, @Nullable ClassLoader classLoader) {
String factoryTypeName = factoryType.getName();
return loadSpringFactories(classLoader).getOrDefault(factoryTypeName, Collections.emptyList());
}

private static Map<String, List<String>> loadSpringFactories(@Nullable ClassLoader classLoader) {
// 尝试获取缓存,如果缓存中有数据,直接返回
MultiValueMap<String, String> result = cache.get(classLoader);
if (result != null) {
return result;
}

try {
// 获取资源文件路径
Enumeration<URL> urls = (classLoader != null ?
classLoader.getResources(FACTORIES_RESOURCE_LOCATION) :
ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION));
result = new LinkedMultiValueMap<>();
// 遍历所有路径
while (urls.hasMoreElements()) {
URL url = urls.nextElement();
UrlResource resource = new UrlResource(url);
// 解析文件,得到对应的一组 Properties
Properties properties = PropertiesLoaderUtils.loadProperties(resource);
// 遍历解析出的 properties,组装数据
for (Map.Entry<?, ?> entry : properties.entrySet()) {
String factoryTypeName = ((String) entry.getKey()).trim();
for (String factoryImplementationName : StringUtils.commaDelimitedListToStringArray((String) entry.getValue())) {
result.add(factoryTypeName, factoryImplementationName.trim());
}
}
}
cache.put(classLoader, result);
return result;
}
catch (IOException ex) {
throw new IllegalArgumentException("Unable to load factories from location [" +
FACTORIES_RESOURCE_LOCATION + "]", ex);
}
}

归纳上面的方法,主要作了这些事:

加载所有 META-INF/spring.factories 文件,加载过程有 SpringFactoriesLoader 负责。

  • 在 CLASSPATH 中搜寻所有 META-INF/spring.factories 配置文件
  • 然后,解析 spring.factories 文件,获取指定自动装配类的全限定名

Spring Boot 的 AutoConfiguration

Spring Boot 有各种 starter 包,可以根据实际项目需要,按需取材。在项目开发中,只要将 starter 包引入,我们就可以用很少的配置,甚至什么都不配置,即可获取相关的能力。通过前面的 Spring Boot SPI 流程,只完成了自动装配工作的一半,剩下的工作如何处理呢 ?

以 spring-boot-starter-web 的 jar 包为例,查看其 maven pom,可以看到,它依赖于 spring-boot-starter,所有 Spring Boot 官方 starter 包都会依赖于这个 jar 包。而 spring-boot-starter 又依赖于 spring-boot-autoconfigure,Spring Boot 的自动装配秘密,就在于这个 jar 包。

从 spring-boot-autoconfigure 包的结构来看,它有一个 META-INF/spring.factories ,显然利用了 Spring Boot SPI,来自动装配其中的配置类。

下图是 spring-boot-autoconfigure 的 META-INF/spring.factories 文件的部分内容,可以看到其中注册了一长串会被自动加载的 AutoConfiguration 类。

RedisAutoConfiguration 为例,这个配置类中,会根据 @ConditionalXXX 中的条件去决定是否实例化对应的 Bean,实例化 Bean 所依赖的重要参数则通过 RedisProperties 传入。

RedisProperties 中维护了 Redis 连接所需要的关键属性,只要在 yml 或 properties 配置文件中,指定 spring.redis 开头的属性,都会被自动装载到 RedisProperties 实例中。

通过以上分析,已经一步步解读出 Spring Boot 自动装载的原理。

SPI 应用案例之 Dubbo

Dubbo 并未使用 Java SPI,而是自己封装了一套新的 SPI 机制。Dubbo SPI 所需的配置文件需放置在 META-INF/dubbo 路径下,配置内容形式如下:

1
2
optimusPrime = org.apache.spi.OptimusPrime
bumblebee = org.apache.spi.Bumblebee

与 Java SPI 实现类配置不同,Dubbo SPI 是通过键值对的方式进行配置,这样可以按需加载指定的实现类。Dubbo SPI 除了支持按需加载接口实现类,还增加了 IOC 和 AOP 等特性。

ExtensionLoader 入口

Dubbo SPI 的相关逻辑被封装在了 ExtensionLoader 类中,通过 ExtensionLoader,可以加载指定的实现类。

ExtensionLoadergetExtension 方法是其入口方法,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public T getExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
if ("true".equals(name)) {
// 获取默认的拓展实现类
return getDefaultExtension();
}
// Holder,顾名思义,用于持有目标对象
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
// 双重检查
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
// 创建拓展实例
instance = createExtension(name);
// 设置实例到 holder 中
holder.set(instance);
}
}
}
return (T) instance;
}

可以看出,这个方法的作用就是:首先检查缓存,缓存未命中则调用 createExtension 方法创建拓展对象。那么,createExtension 是如何创建拓展对象的呢,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private T createExtension(String name) {
// 从配置文件中加载所有的拓展类,可得到“配置项名称”到“配置类”的映射关系表
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// 通过反射创建实例
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 向实例中注入依赖
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
// 循环创建 Wrapper 实例
for (Class<?> wrapperClass : wrapperClasses) {
// 将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建 Wrapper 实例。
// 然后向 Wrapper 实例中注入依赖,最后将 Wrapper 实例再次赋值给 instance 变量
instance = injectExtension(
(T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("...");
}
}

createExtension 方法的的工作步骤可以归纳为:

  1. 通过 getExtensionClasses 获取所有的拓展类
  2. 通过反射创建拓展对象
  3. 向拓展对象中注入依赖
  4. 将拓展对象包裹在相应的 Wrapper 对象中

以上步骤中,第一个步骤是加载拓展类的关键,第三和第四个步骤是 Dubbo IOC 与 AOP 的具体实现。

获取所有的拓展类

Dubbo 在通过名称获取拓展类之前,首先需要根据配置文件解析出拓展项名称到拓展类的映射关系表(Map<名称, 拓展类>),之后再根据拓展项名称从映射关系表中取出相应的拓展类即可。相关过程的代码分析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Map<String, Class<?>> getExtensionClasses() {
// 从缓存中获取已加载的拓展类
Map<String, Class<?>> classes = cachedClasses.get();
// 双重检查
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
// 加载拓展类
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}

这里也是先检查缓存,若缓存未命中,则通过 synchronized 加锁。加锁后再次检查缓存,并判空。此时如果 classes 仍为 null,则通过 loadExtensionClasses 加载拓展类。下面分析 loadExtensionClasses 方法的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private Map<String, Class<?>> loadExtensionClasses() {
// 获取 SPI 注解,这里的 type 变量是在调用 getExtensionLoader 方法时传入的
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation != null) {
String value = defaultAnnotation.value();
if ((value = value.trim()).length() > 0) {
// 对 SPI 注解内容进行切分
String[] names = NAME_SEPARATOR.split(value);
// 检测 SPI 注解内容是否合法,不合法则抛出异常
if (names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension...");
}

// 设置默认名称,参考 getDefaultExtension 方法
if (names.length == 1) {
cachedDefaultName = names[0];
}
}
}

Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
// 加载指定文件夹下的配置文件
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadDirectory(extensionClasses, DUBBO_DIRECTORY);
loadDirectory(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}

loadExtensionClasses 方法总共做了两件事情,一是对 SPI 注解进行解析,二是调用 loadDirectory 方法加载指定文件夹配置文件。SPI 注解解析过程比较简单,无需多说。下面我们来看一下 loadDirectory 做了哪些事情。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir) {
// fileName = 文件夹路径 + type 全限定名
String fileName = dir + type.getName();
try {
Enumeration<java.net.URL> urls;
ClassLoader classLoader = findClassLoader();
// 根据文件名加载所有的同名文件
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL resourceURL = urls.nextElement();
// 加载资源
loadResource(extensionClasses, classLoader, resourceURL);
}
}
} catch (Throwable t) {
logger.error("...");
}
}

loadDirectory 方法先通过 classLoader 获取所有资源链接,然后再通过 loadResource 方法加载资源。我们继续跟下去,看一下 loadResource 方法的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void loadResource(Map<String, Class<?>> extensionClasses,
ClassLoader classLoader, java.net.URL resourceURL) {
try {
BufferedReader reader = new BufferedReader(
new InputStreamReader(resourceURL.openStream(), "utf-8"));
try {
String line;
// 按行读取配置内容
while ((line = reader.readLine()) != null) {
// 定位 # 字符
final int ci = line.indexOf('#');
if (ci >= 0) {
// 截取 # 之前的字符串,# 之后的内容为注释,需要忽略
line = line.substring(0, ci);
}
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
// 以等于号 = 为界,截取键与值
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
if (line.length() > 0) {
// 加载类,并通过 loadClass 方法对类进行缓存
loadClass(extensionClasses, resourceURL,
Class.forName(line, true, classLoader), name);
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class...");
}
}
}
} finally {
reader.close();
}
} catch (Throwable t) {
logger.error("Exception when load extension class...");
}
}

loadResource 方法用于读取和解析配置文件,并通过反射加载类,最后调用 loadClass 方法进行其他操作。loadClass 方法用于主要用于操作缓存,该方法的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL,
Class<?> clazz, String name) throws NoSuchMethodException {

if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("...");
}

// 检测目标类上是否有 Adaptive 注解
if (clazz.isAnnotationPresent(Adaptive.class)) {
if (cachedAdaptiveClass == null) {
// 设置 cachedAdaptiveClass缓存
cachedAdaptiveClass = clazz;
} else if (!cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("...");
}

// 检测 clazz 是否是 Wrapper 类型
} else if (isWrapperClass(clazz)) {
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
// 存储 clazz 到 cachedWrapperClasses 缓存中
wrappers.add(clazz);

// 程序进入此分支,表明 clazz 是一个普通的拓展类
} else {
// 检测 clazz 是否有默认的构造方法,如果没有,则抛出异常
clazz.getConstructor();
if (name == null || name.length() == 0) {
// 如果 name 为空,则尝试从 Extension 注解中获取 name,或使用小写的类名作为 name
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("...");
}
}
// 切分 name
String[] names = NAME_SEPARATOR.split(name);
if (names != null && names.length > 0) {
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
// 如果类上有 Activate 注解,则使用 names 数组的第一个元素作为键,
// 存储 name 到 Activate 注解对象的映射关系
cachedActivates.put(names[0], activate);
}
for (String n : names) {
if (!cachedNames.containsKey(clazz)) {
// 存储 Class 到名称的映射关系
cachedNames.put(clazz, n);
}
Class<?> c = extensionClasses.get(n);
if (c == null) {
// 存储名称到 Class 的映射关系
extensionClasses.put(n, clazz);
} else if (c != clazz) {
throw new IllegalStateException("...");
}
}
}
}
}

如上,loadClass 方法操作了不同的缓存,比如 cachedAdaptiveClasscachedWrapperClassescachedNames 等等。除此之外,该方法没有其他什么逻辑了。

参考资料