Dunwu Blog

大道至简,知易行难

《极客时间教程 - 微服务架构核心 20 讲》笔记

什么是微服务架构

微服务是一种架构模式。

微服务的六个特点:

  • 一组小的服务
  • 独立的进程
  • 独立部署
  • 轻量级通信
  • 基于业务能力
  • 无集中式管理——这里指的是可以用不同的技术栈,不同的存储

微服务定义:基于有界上下文的、松散耦合的、面向服务的架构。

架构师如何权衡微服务的利弊

架构之道在于权衡利弊。

微服务架构的优点

  • 强模块化边界
  • 可独立部署
  • 技术多样性

微服务架构的缺点

  • 分布式系统复杂性
  • 最终一致性
  • 运维复杂性
  • 测试复杂性

分布式系统带来的一个挑战就是取终一致性。

康威法则和微服务给架构师怎样的启示

康威法则:设计系统的架构受制于产生这些设计的组织的沟通结构。

img

康威的原文中提出的各定律

  • 第一定律 组织沟通方式会通过系统设计表达出来
  • 第二定律 时间再多一件事情也不可能做的完美,但总有时间做完一件事情
  • 第三定律 线型系统和线型组织架构间有潜在的异质同态特性
  • 第四定律 大的系统组织总是比小系统更倾向于分解

其中心思想实际就是分而治之

企业应该在什么时候开始考虑引入微服务

微服务的适用性:

img

微服务重在服务治理,其对于平台基础设施有较高要求,所以企业刚开始应用微服务并不一定能提高生产力。简单来说:单体服务适用于小团队;微服务适用于大团队。

何时选择微服务,在于度的把控。当研发团队人员增长到一定程度,沟通成本不断增长时,就可以考虑微服务架构了。一个经验数据是,当团队达到 100 人规模时,就可以考虑使用微服务架构了。

罗马不是一天建成的:架构是一个演进的过程,不应该一开始就将系统设计的过于复杂。

什么样组织架构更适合微服务

img

  • 左边是比较传统的组织架构。产品从左到右流程走,可能出现的问题,反馈比较慢,对业务支持比较慢。沟通成本比较大。

  • 右边是比较合适微服务的组织架构, 每一个团队(基于微服务的跨职能的团队),有开发,有产品,有测试,团队都支持自己的微服务。交付的产口是平台,对外提供 API 接口支持多样的业务。

img

DevOps 理念:谁开发的,谁构建,谁支持。

如何理解阿里巴巴提出的微服务

中台战略和微服务的关系

img

业务中台和技术中台统称为大中台,支撑业务前台。正所谓,万丈高楼平地起,中台基础越扎实,前台发展就越快。

PaaS 和 核心业务层是和微服务相关的。这一些基本都可以用微服务来实现。

  • IaaS:Infrastructure-as-a-Service(基础设施即服务)

  • PaaS:Platform-as-a-Service(平台即服务)

如何给出一个清晰简洁的服务分层方式

大致的服务分层图:

img

SOA(Service-Oriented Architecture)或微服务大致可分为

  • 基础服务:也被称为:核心领域服务、中间层服务、公共服务
  • 聚合服务:对基础服务的聚合,以满足业务需求,提供给外部调用。

微服务总体技术架构体系是怎么设计的

img

  • 接入层:接入外部流量,内部做负载均衡
  • 网关层:反向路由,限流,安全,跨横切面的功能。
  • 业务服务层:可分为:聚合服务,基础服务
  • 支撑服务:各种公共性的后台服务
  • 平台服务:可以是一些管理系统
  • 基础设施:由运维团队运维

其中,与微服务相关的主要有:网关层、业务服务层、支撑服务、平台服务

微服务最经典的三种服务发现机制

消费者(客户端)如何发现生产者(服务端),有三种模式:

(1)通过 DNS 访问 LB(负载均衡),LB 分发

img

(2)消费者内置 LB, 生产者将自身信息注册到注册中心上,并通过发送定时心跳来确认自身服务可用。消费者定期从注册中心拉取生产者信息

img

(3)结全前面两种方式, 在 Consumer 的主机上也布置一个 LB。 LB 会定期同步注册中心的信息。 运维成本比较高一点。

img

微服务 API 服务网关(一)原理

网关用于屏蔽服务内部的逻辑,希望外部访问看到是统一的接口。

img

网关主要的功能:

  • 反向代理:将外部的请求换成内部调用。
  • 安全认证:防刷、防爬虫。
  • 限流熔断:处理可能会突发流量。
  • 日志监控:进行访问访问审计,监控流量。

一般不要把过多的业务逻辑写在网关当中。

img

服务 API 服务网关(二)开源网关 Zuul

Servlet 和 Filter Runner 过滤器:前置路由过滤器, 路由过滤器,后置路由过滤器

过滤器开发,可以通过脚本开发。开发完后上传到过滤器目录中, 被扫描后加到 Filter Runner 中。

各个 Filter 共享数据通过 Request Context 来实现。

img

过滤链的流程:

img

跟 Netflix 学习微服务路由发现体系

netflix 有两个比较重要的支撑服务

  • 服务注册中心 Eureka
  • 网关 zuul

img

集中式配置中心的作用和原理是什么

为什么要引入配置中心呢?

配置文件中的属性不方便管理,无法动态更新,无法审计。配置中心可以解决这些问题。

什么可做配置呢?

  • 业务开关
  • 调用/响应超时
  • 限流
  • 连接字符串
  • 动态参数

Svr 更新配置有两种方式:推和拉。

img

携程的 Apollo 配置中心:

img

github : https://github.com/ctripcorp/apollo

微服务通讯方式 RPC vs REST

RPC:远程过程调用

REST:Restful

img

微服务框架需要考虑哪些治理环节

一个公司的微服务多了,就要需要考虑服务治理:

  • 软负载:蓝绿发布,灰度发布

  • 指标(Metrics):服务的调用量,耗时监控

  • 调用链埋点:方便快速定位问题

契约生成代码: 定义结构体可自动生成 json 格式, vscode 有插件。

img

阿里巴巴微服务治理生态:Dubbo http://dubbo.apache.org/en-us/

微服务监控系统分层和监控架构

五个层次的监控:

  • 基础层施监控
  • 系统层监控
  • 应用层监控
    • url
    • sevice
    • mysql
    • cache 可用率
    • 性能
    • qps
  • 业务层监控
    • 核心指标监控
    • 登录注册
  • 端用户体验监控

img

  • 日志监控:Elasticsearch
  • metrics 监控
  • 健康检查
  • 调用链监控
  • 告警系统

比较典型的监控架构,大部分公司的流程

img

数据量比较大一般用 Kafka 作为缓冲队列。

Nagios 健康检测工具。

ELK:ELK 是 Elasticsearch、Logstash、Kibana 三大开源框架首字母大写简称。

微服务的调用链监控该如何选型

调用链的监控 谷歌 2010 年提出来的。

通过 Span 来跟踪, RootSpan ChildSpan 跨进程时 会有 Trace di + parant span id

img

三个主流调用链监控系统的比较:

img

微服务的容错限流是如何工作的

Netfiix Hystrix 具有熔断、隔离、限流、降级的功能 。

img

说明:

  • 3 Cirult OPen 判断是否可以熔断, 是则执行 getFAllBack() 降级处理函数
  • 5 run() 超时 也执行降级处理函数。
  • 6 不成功也 执行处理函数 。
  • Calculate Cirult Health 就是在正常执行成功后计算是否需要熔断。

Docker 容器部署技术 & 持续交付流水线

docker 容器治理就是解决:环境不一致的问题。把依赖的所有包都打在镜像中。

统一、标准化的交付流水线。

UAT 环境: User Acceptance Test (用户验收测试)

img

发布模式: 蓝绿布置,灰度发布(金丝雀发布)。

img

容器集群调度和基于容器的发布体系

资源调度框架 Mesos 架构

img

基于容器的云发布体系

img

《RPC 实战与核心原理》笔记

别老想着怎么用好 RPC 框架,你得多花时间琢磨原理

为什么要学习 RPC

RPC 不仅是微服务的架构基础,实际上,只要涉及网络通信,就可能用到 RPC。

  • 例 1:大型分布式应用系统可能会依赖消息队列、分布式缓存、分布式数据库以及统一配置中心等,应用程序与依赖的这些中间件之间都可以通过 RPC 进行通信。比如 etcd,它作为一个统一的配置服务,客户端就是通过 gRPC 框架与服务端进行通信的。

  • 例 2:我们经常会谈到的容器编排引擎 Kubernetes,它本身就是分布式的,Kubernetes 的 kube-apiserver 与整个分布式集群中的每个组件间的通讯,都是通过 gRPC 框架进行的。

RPC 是解决分布式系统通信问题的一大利器

核心原理:能否画张图解释下 RPC 的通信流程?

什么是 RPC?

RPC 的全称是 Remote Procedure Call,即远程过程调用

RPC 的作用体现在两个方面:

  • 屏蔽远程调用跟本地调用的差异,让用户像调用本地一样去调用远程方法。
  • 隐藏底层网络通信的复杂性,让用户更专注于业务逻辑。

RPC 通信流程

RPC 是一个远程调用,因此必然需要通过网络传输数据,且 RPC 常用于业务系统之间的数据交互,需要保证其可靠性,所以 RPC 一般默认采用 TCP 协议来传输。

网络传输数据是二进制数据,因此请求方需要将请求参数转为二进制数据,即序列化

响应方接受到请求,要将二进制数据转换为请求参数,需要反序列化

请求方和响应方识别彼此的信息,需要约定好彼此数据的格式,即协议大多数的协议会分成两部分,分别是数据头和消息体。数据头一般用于身份识别,包括协议标识、数据大小、请求类型、序列化类型等信息;消息体主要是请求的业务参数信息和扩展属性等。

为了屏蔽底层通信细节,使用户聚焦自身业务,因此 RPC 框架一般引入了动态代理,通过依赖注入等技术,拦截方法调用,完成远程调用的通信逻辑。

RPC 在架构中的位置

RPC 框架能够帮助我们解决系统拆分后的通信问题,并且能让我们像调用本地一样去调用
远程方法。

协议:怎么设计可扩展且向后兼容的协议?

协议的作用

在传输过程中,RPC 并不会把请求参数的所有二进制数据整体一下子发送到对端机器上,中间可能会拆分成好几个数据包,也可能会合并其他请求的数据包(合并的前提是同一个 TCP 连接上的数据),至于怎么拆分合并,这其中的细节会涉及到系统参数配置和 TCP 窗口大小。对于服务提供方应用来说,他会从 TCP 通道里面收到很多的二进制数据,那这时候怎么识别出哪些二进制是第一个请求的呢?

个人理解:为了避免语义不一致的事情发生,需要为数据报文设定边界,请求方和接收方都按照设定的边界去读写数据。这类似于文章使用标点符号去断句。

为何需要设计 RPC 协议

RPC 协议对性能要求高,而公有网络协议往往数据报文较大,内容不够紧凑。

如何设计 RPC 协议?

首先,必须先明确消息的边界,即确定消息的长度。因此,至少要分为:消息长度+消息内容两部分。

接下来,我们会发现,在使用过程中,仅消息长度,不足以明确通信中的很多细节:如序列化方式是怎样的?是否消息压缩?压缩格式是怎样的?如果协议发生变化,需要明确协议版本等等。

综上,一个 RPC 协议大概会由下图中的这些参数组成:

可扩展的协议

前面所述的协议属于定长协议头,那也就是说往后就不能再往协议头里加新参数了,如果加参
数就会导致线上兼容问题。

为了保证能平滑地升级改造前后的协议,我们有必要设计一种支持可扩展的协议。其关键在于让协议头支持可扩展,扩展后协议头的长度就不能定长了。那要实现读取不定长的协议头里面的内容,在这之前肯定需要一个固定的地方读取长度,所以我们需要一个固定的写入协议头的长度。整体协议就变成了三部分内容:固定部分、协议头内容、协议体内容。

序列化:对象怎么在网络中传输?

为什么需要序列化

调用方和被调用方的数据原本是对象,无法在网络中传输,必须转换为二进制数据。因此,需要一种方式来实现此过程:将对象转为二进制数据,即序列化;同时,需要根据二进制数据逆向转化为对象,即反序列化

从 RPC 的实现角度来看,序列化的作用如下图所示:

常用序列化方式

  • JDK 序列化:ObjectInputStreamObjectOutputStream
  • JSON
  • 二进制
    • Hessian
    • Protobuf
    • Thirft

RPC 协议选型

优先级依次从高到低:安全性、通用性、兼容性、性能、效率、空间开销

在序列化的选择上,与序列化协议的效率、性能、序列化协议后的体积相比,其通用性和兼容性的优先级会更高,因为他是会直接关系到服务调用的稳定性和可用率的,对于服务的性能来说,服务的可靠性显然更加重要。我们更加看重这种序列化协议在版本升级后的兼容性是否很好,是否支持更多的对象类型,是否是跨平台、跨语言的,是否有很多人已经用过并且踩过了很多的坑,其次我们才会去考虑性能、效率和空间开销。

使用 RPC 需要注意哪些问题

  • 对象构造得过于复杂 - 对象要尽量简单,没有太多的依赖关系,属性不要太多,尽量高内聚;
  • 对象过于复杂、庞大 - 入参对象与返回值对象体积不要太大,更不要传太大的集合;
  • 使用序列化框架不支持的类作为入参类 - 尽量使用简单的、常用的、开发语言原生的对象,尤其是集合类;
  • 对象有复杂的继承关系 - 对象不要有复杂的继承关系,最好不要有父子类的情况。

网络通信:RPC 框架在网络通信上更倾向于哪种网络 IO 模型?

常见的网络 IO 模型

常见的网络 IO 模型分为四种:同步阻塞 IO(BIO)、同步非阻塞 IO(NIO)、IO 多路复用和异步非阻塞 IO(AIO)。在这四种 IO 模型中,只有 AIO 为异步 IO,其他都是同步 IO。

IO 多路复用(Reactor 模式)在高并发场景下使用最为广泛,很多知名软件都应用了这一技术,如:Netty、Redis、Nginx 等。

IO 多路复用分为 select,poll 和 epoll。

什么是 IO 多路复用?字面上的理解,多路就是指多个通道,也就是多个网络连接的 IO,而复用就是指多个通道复用在一个复用器上。

零拷贝

系统内核处理 IO 操作分为两个阶段——等待数据和拷贝数据。等待数据,就是系统内核在等待网卡接收到数据后,把数据写到内核中;而拷贝数据,就是系统内核在获取到数据后,将数据拷贝到用户进程的空间中。

网络 IO 读写流程

应用进程的每一次写操作,都会把数据写到用户空间的缓冲区中,再由 CPU 将数据拷贝到系统内核的缓冲区中,之后再由 DMA 将这份数据拷贝到网卡中,最后由网卡发送出去。这里我们可以看到,一次写操作数据要拷贝两次才能通过网卡发送出去,而用户进程的读操作则是将整个流程反过来,数据同样会拷贝两次才能让应用程序读取到数据。

应用进程的一次完整的读写操作,都需要在用户空间与内核空间中来回拷贝,并且每一次拷贝,都需要 CPU 进行一次上下文切换(由用户进程切换到系统内核,或由系统内核切换到用户进程),这样很浪费 CPU 和性能。

所谓的零拷贝,就是取消用户空间与内核空间之间的数据拷贝操作,应用进程每一次的读写操作,可以通过一种方式,直接将数据写入内核或从内核中读取数据,再通过 DMA 将内核中的数据拷贝到网卡,或将网卡中的数据 copy 到内核。

Netty 的零拷贝偏向于用户空间中对数据操作的优化,这对处理 TCP 传输中的拆包粘包问题有着重要的意义,对应用程序处理请求数据与返回数据也有重要的意义。

Netty 框架中很多内部的 ChannelHandler 实现类,都是通过 CompositeByteBuf、slice、wrap 操作来处理 TCP 传输中的拆包与粘包问题的。

Netty 的 ByteBuffer 可以采用 Direct Buffers,使用堆外直接内存进行 Socketd 的读写
操作,最终的效果与我刚才讲解的虚拟内存所实现的效果是一样的。

Netty 还提供 FileRegion 中包装 NIO 的 FileChannel.transferTo() 方法实现了零拷
贝,这与 Linux 中的 sendfile 方式在原理上也是一样的。

动态代理:面向接口编程,屏蔽 RPC 处理流程

动态代理可以帮用户屏蔽远程调用的细节,实现像调用本地一样地调用远程的体验。

JDK 支持的动态代理方式是通过实现 InvocationHandler 接口。这种方式有一定的局限性——它要求被代理的类只能是接口。原因是因为生成的代理类会继承 Proxy 类,但 Java 是不支持多重继承的。此外,它还有性能问题。它生成后的代理类是使用反射来完成方法调用的,而这种方式相对直接用编码调用来说,性能会降低。

除 JDK 以外,还有其他第三方框架可以实现动态代理,如像 Javassist、Byte Buddy。

Javassist 的是通过控制底层字节码来实现动态代理,不需要反射完成调用,所以性能肯定比 JDK 的动态代理方式性能要好。

Byte Buddy 则属于后起之秀,在很多优秀的项目中,像 Spring、Jackson 都用到了 Byte Buddy 来完成底层代理。相比 Javassist,Byte Buddy 提供了更容易操作的 API,编写的代码可读性更高。更重要的是,生成的代理类执行速度比 Javassist 更快。

RPC 实战:剖析 gRPC 源码,动手实现一个完整的 RPC

架构设计:设计一个灵活的 RPC 框架

RPC 架构

其实 RPC 就是把拦截到的方法参数,转成可以在网络中传输的二进制,并保证在服务提供方能正确地还原出语义,最终实现像调用本地一样地调用远程的目的

RPC 本质上就是一个远程调用,必然需要通过网络来传输数据,为了屏蔽网络传输的复杂性,需要封装一个单独的数据传输模块用来收发二进制数据。

用户请求的时候是基于方法调用,方法出入参数都是对象数据,对象是肯定没法直接在网络中传输的,我们需要提前把它转成可传输的二进制,这就是我们说的序列化过程。但只是把方法调用参数的二进制数据传输到服务提供方是不够的,我们需要在方法调用参数的二进制数据后面增加“断句”符号来分隔出不同的请求,在两个“断句”符号中间放的内容就是我们请求的二进制数据,这个过程我们叫做协议封装。可以把这两个处理过程放在同一个模块,统称为协议模块。除此之外,我们还可以在协议模块中加入压缩功能,这是因为压缩过程也是对传输的二进制数据进行操作。

RPC 还需要为调用方找到所有的服务提供方,并需要在 RPC 里面维护好接口跟服务提供者地址的关系,这样调用方在发起请求的时候才能快速地找到对应的接收地址,这个过程即为“服务发现”。

但服务发现只是解决了接口和服务提供方地址映射关系的查找问题。但是,对于 RPC 来说,每次发送请求的时候都是需要用 TCP 连接的,相对服务提供方 IP 地址,TCP 连接状态是瞬息万变的,所以我们的 RPC 框架里面要有连接管理器去维护 TCP 连接的状态。

有了集群之后,提供方可能就需要管理好这些服务了,那我们的 RPC 就需要内置一些服务治理的功能,比如服务提供方权重的设置、调用授权等一些常规治理手段。而服务调用方需要额外做哪些事情呢?每次调用前,我们都需要根据服务提供方设置的规则,从集群中选择可用的连接用于发送请求。

RPC 可扩展架构

在 RPC 框架中,如何支持插件化架构呢?

可以使用 SPI 技术来实现。注意:由于 JDK SPI 性能不高,并且不支持自动注入,所以,一般会选择其他的 SPI 实现。

有了 SPI 支持插件式加载后,RPC 框架就变成了一个微内核架构。

服务发现:到底是要 CP 还是 AP?

RPC 框架必须要有服务注册和发现机制,这样,集群中的节点才能知道通信方的请求地址。

  • 服务注册:在服务提供方启动的时候,将对外暴露的接口注册到注册中心之中,注册中心将这个服务节点的 IP 和接口保存下来。
  • 服务订阅:在服务调用方启动的时候,去注册中心查找并订阅服务提供方的 IP,然后缓存到本地,并用于后续的远程调用。

基于 ZooKeeper 的服务发现

使用 ZooKeeper 作为服务注册中心,是 Java 分布式系统的经典方案。

搭建一个 ZooKeeper 集群作为注册中心集群,服务注册的时候只需要服务节点向 ZooKeeper 节点写入注册信息即可,利用 ZooKeeper 的 Watcher 机制完成服务订阅与服务下发功能

img

通常我们可以使用 ZooKeeper、etcd 或者分布式缓存(如 Hazelcast)来解决事件通知问题,但当集群达到一定规模之后,依赖的 ZooKeeper 集群、etcd 集群可能就不稳定了,无法满足我们的需求。

在超大规模的服务集群下,注册中心所面临的挑战就是超大批量服务节点同时上下线,注册中心集群接受到大量服务变更请求,集群间各节点间需要同步大量服务节点数据,最终导致如下问题:

  • 注册中心负载过高;
  • 各节点数据不一致;
  • 服务下发不及时或下发错误的服务节点列表。

RPC 框架依赖的注册中心的服务数据的一致性其实并不需要满足 CP,只要满足 AP 即可。

基于消息总线的最终一致性的注册中心

ZooKeeper 的一大特点就是强一致性,ZooKeeper 集群的每个节点的数据每次发生更新操作,都会通知其它 ZooKeeper 节点同时执行更新。它要求保证每个节点的数据能够实时的完全一致,这也就直接导致了 ZooKeeper 集群性能上的下降。

而 RPC 框架的服务发现,在服务节点刚上线时,服务调用方是可以容忍在一段时间之后(比如几秒钟之后)发现这个新上线的节点的。毕竟服务节点刚上线之后的几秒内,甚至更长的一段时间内没有接收到请求流量,对整个服务集群是没有什么影响的,所以我们可以牺牲掉 CP(强制一致性),而选择 AP(最终一致),来换取整个注册中心集群的性能和稳定性。

img

健康检测:这个节点都挂了,为啥还要疯狂发请求?

健康检测,它能帮助我们从连接列表里面过滤掉一些存在问题的节点,避免在发请求的时候选择出有问题的节点而影响业务。

服务方状态一般有三种情况:

  1. 健康状态 - 建立连接成功,并且心跳探活也一直成功;
  2. 亚健康状态 - 建立连接成功,但是心跳请求连续失败;
  3. 死亡状态 - 建立连接失败。

设计健康检测方案的时候,不能简单地从 TCP 连接是否健康、心跳是否正常等简单维度考虑,因为健康检测的目的就是要保证“业务无损”,因此,可以加入业务请求可用率因素,这样能最大化地提升 RPC 接口可用率。

正常情况下,我们大概 30S 会发一次心跳请求,这个间隔一般不会太短,如果太短会给服务节点造成很大的压力。但是如果太长的话,又不能及时摘除有问题的节点。

路由策略:怎么让请求按照设定的规则发到不同的节点上?

服务路由是指通过一定的规则从集群中选择合适的节点。

为什么需要路由策略

服务路由通常用于以下场景,目的在于实现流量隔离:

  • 分组调用

  • 蓝绿发布

  • 灰度发布

  • 流量切换

  • 线下测试联调

  • 读写分离

路由规则

  • 条件路由:基于条件表达式的路由规则
  • 脚本路由:基于脚本语言的路由规则
  • 标签路由:将服务分组的路由规则

负载均衡:节点负载差距这么大,为什么收到的流量还一样?

负载均衡算法

  • 随机算法
    • 加权随机算法
  • 轮询算法
    • 加权轮询算法
  • 最小活跃数算法
    • 加权最小活跃数算法
  • 哈希算法
  • 一致性哈希算法

RPC 框架中的负载均衡

RPC 负载均衡所采用的策略与传统的 Web 服务负载均衡所采用策略的不同之处:

  1. 搭建负载均衡设备或 TCP/IP 四层代理,需要额外成本;
  2. 请求流量都经过负载均衡设备,多经过一次网络传输,会额外浪费一些性能;
  3. 负载均衡添加节点和摘除节点,一般都要手动添加,当大批量扩容和下线时,会有大量的人工操作,“服务发现”在操作上是个问题;
  4. 我们在服务治理的时候,针对不同接口服务、服务的不同分组,我们的负载均衡策略是需要可配的,如果大家都经过这一个负载均衡设备,就不容易根据不同的场景来配置不同的负载均衡策略了。

RPC 的负载均衡完全由 RPC 框架自身实现,RPC 的服务调用者会与“注册中心”下发的所有服务节点建立长连接,在每次发起 RPC 调用时,服务调用者都会通过配置的负载均衡插件,自主选择一个服务节点,发起 RPC 调用请求。

如何设计自适应的负载均衡

那服务调用者节点又该如何判定一个服务节点的处理能力呢?

可以采用一种打分制的策略,服务调用者收集与之建立长连接的每个服务节点的指标数据,如服务节点的负载指标、CPU 核数、内存大小、请求处理的耗时指标(如请求平均耗时、TP99、TP999)、服务节点的状态指标(如正常、亚健康)。通过这些指标,计算出一个分数,比如总分 10 分,如果 CPU 负载达到 70%,就减它 3 分,当然了,减 3 分只是个类比,需要减多少分是需要一个计算策略的。

然后,根据不同指标的重要程度设置权重,然后累加,计算公式:

1
健康值 = 指标值1 * 权重1 + 指标值2 * 权重2 + ...

服务调用者给每个服务节点都打完分之后,会发送请求,那这时候我们又该如何根据分数去控制给每个服务节点发送多少流量呢?

关键步骤

  1. 添加服务指标收集器,并将其作为插件,默认有运行时状态指标收集器、请求耗时指标收集器。
  2. 运行时状态指标收集器收集服务节点 CPU 核数、CPU 负载以及内存等指标,在服务调用者与服务提供者的心跳数据中获取。
  3. 请求耗时指标收集器收集请求耗时数据,如平均耗时、TP99、TP999 等。
  4. 可以配置开启哪些指标收集器,并设置这些参考指标的指标权重,再根据指标数据和指标权重来综合打分。
  5. 通过服务节点的综合打分与节点的权重,最终计算出节点的最终权重,之后服务调用者会根据随机权重的策略,来选择服务节点。

异常重试:在约定时间内安全可靠地重试

异常重试

就是当调用端发起的请求失败时,RPC 框架自身可以进行重试,再重新发送请求,用户可以自行设置是否开启重试以及重试的次数。

当然,不是所有的异常都要触发重试,只有符合重试条件的异常才能触发重试,比如网络超时异常、网络连接异常等等(这个需要 RPC 去判定)。

注意:有时网络可能发生抖动,导致请求超时,这时如果 RPC 触发超时重试,会触发业务逻辑重复执行,如果接口没有幂等性设计,就可能引发问题。如:重发写表。

重试超时时间

连续的异常重试可能会出现一种不可靠的情况,那就是连续的异常重试并且每次处理的请求时间比较长,最终会导致请求处理的时间过长,超出用户设置的超时时间。

解决这个问题最直接的方式就是,在每次重试后都重置一下请求的超时时间。

当调用端发起 RPC 请求时,如果发送请求发生异常并触发了异常重试,我们可以先判定下这个请求是否已经超时,如果已经超时了就直接返回超时异常,否则就先重置下这个请求的超时时间,之后再发起重试。

在所有发起重试、负载均衡选择节点的时候,去掉重试之前出现过问题的那个节点,以保证重试的成功率。

业务异常

RPC 框架是不会知道哪些业务异常能够去进行异常重试的,我们可以加个重试异常的白名单,用户可以将允许重试的异常加入到这个白名单中。当调用端发起调用,并且配置了异常重试策略,捕获到异常之后,我们就可以采用这样的异常处理策略。如果这个异常是 RPC 框架允许重试的异常,或者这个异常类型存在于可重试异常的白名单中,我们就允许对这个请求进行重试。


综上,一个可靠的 RPC 容错处理机制如下:

img

优雅关闭:如何避免服务停机带来的业务损失?

优雅关闭:如何避免服务停机带来的业务损失?

在服务重启的时候,对于调用方来说,可能会存在以下几种情况:

  • 调用方发请求前,目标服务已经下线。对于调用方来说,跟目标节点的连接会断开,这时候调用方可以立马感知到,并且在其健康列表里面会把这个节点挪掉,自然也就不会被负载均衡选中。
  • 调用方发请求的时候,目标服务正在关闭。但调用方并不知道它正在关闭,而且两者之间的连接也没断开,所以这个节点还会存在健康列表里面,因此该节点就有一定概率会被负载均衡选中。

当服务提供方正在关闭,如果这之后还收到了新的业务请求,服务提供方直接返回一个特定的异常给调用方(比如 ShutdownException)。这个异常就是告诉调用方“我已经收到这个请求了,但是我正在关闭,并没有处理这个请求”,然后调用方收到这个异常响应后,RPC 框架把这个节点从健康列表挪出,并把请求自动重试到其他节点,因为这个请求是没有被服务提供方处理过,所以可以安全地重试到其他节点,这样就可以实现对业务无损。

但如果只是靠等待被动调用,就会让这个关闭过程整体有点漫长。因为有的调用方那个时刻没有业务请求,就不能及时地通知调用方了,所以我们可以加上主动通知流程,这样既可以保证实时性,也可以避免通知失败的情况。

如何捕获到关闭事件呢?在 Java 语言里面,对应的是 Runtime.addShutdownHook 方法,可以注册关闭的钩子。在 RPC 启动的时候,我们提前注册关闭钩子,并在里面添加了两个处理程序,一个负责开启关闭标识,一个负责安全关闭服务对象,服务对象在关闭的时候会通知调用方下线节点。同时需要在我们调用链里面加上挡板处理器,当新的请求来的时候,会判断关闭标识,如果正在关闭,则抛出特定异常。

优雅启动:如何避免流量打到没有启动完成的节点?

优雅启动:如何避免流量打到没有启动完成的节点?

运行了一段时间后的应用,执行速度会比刚启动的应用更快。这是因为在 Java 里面,在运行过程中,JVM 虚拟机会把高频的代码编译成机器码,被加载过的类也会被缓存到 JVM 缓存中,再次使用的时候不会触发临时加载,这样就使得“热点”代码的执行不用每次都通过解释,从而提升执行速度。

但是这些“临时数据”,都在应用重启后就消失了。如果让刚启动的应用就承担像停机前一样的流量,这会使应用在启动之初就处于高负载状态,从而导致调用方过来的请求可能出现大面积超时,进而对线上业务产生损害行为。

启动预热

启动预热,就是让刚启动的服务提供方应用不承担全部的流量,而是让它被调用的次数随着时间的移动慢慢增加,最终让流量缓和地增加到跟已经运行一段时间后的水平一样。

首先,对于调用方来说,我们要知道服务提供方启动的时间。有两种方法:

  • 一种是服务提供方在启动的时候,把自己启动的时间告诉注册中心;
  • 另外一种就是注册中心收到的服务提供方的请求注册时间。

怎么确保所有机器的日期时间是一样的?在真实环境中机器都会默认开启 NTP 时间同步功能,来保证所有机器时间的一致性。

最终的结果就是,调用方通过服务发现,除了可以拿到 IP 列表,还可以拿到对应的启动时间。我们需要把这个时间作用在负载均衡上。

通过这个小逻辑的改动,我们就可以保证当服务提供方运行时长小于预热时间时,对服务提供方进行降权,减少被负载均衡选择的概率,避免让应用在启动之初就处于高负载状态,从而实现服务提供方在启动后有一个预热的过程。

延迟暴露

服务提供方应用在没有启动完成的时候,调用方的请求就过来了,而调用方请求过来的原因是,服务提供方应用在启动过程中把解析到的 RPC 服务注册到了注册中心,这就导致在后续加载没有完成的情况下服务提供方的地址就被服务调用方感知到了。

为了解决这个问题,需要在应用启动加载、解析 Bean 的时候,如果遇到了 RPC 服务的 Bean,只先把这个
Bean 注册到 Spring-BeanFactory 里面去,而并不把这个 Bean 对应的接口注册到注册中心,只有等应用启动完成后,才把接口注册到注册中心用于服务发现,从而实现让服务调用方延迟获取到服务提供方地址。

具体如何实现呢?

我们可以在服务提供方应用启动后,接口注册到注册中心前,预留一个 Hook 过程,让用户可以实现可扩展的
Hook 逻辑。用户可以在 Hook 里面模拟调用逻辑,从而使 JVM 指令能够预热起来,并且用户也可以在 Hook 里面事先预加载一些资源,只有等所有的资源都加载完成后,最后才把接口注册到注册中心。

熔断限流:业务如何实现自我保护

限流

限流算法

  • 计数器
  • 滑动窗口
  • 漏桶
  • 令牌桶

限流要点

服务端主要是通过限流来进行自我保护,我们在实现限流时要考虑到应用和 IP 级别,方便我们在服务治理的时候,对部分访问量特别大的应用进行合理的限流。

  • 服务端的限流阈值配置都是作用于单机的,而在有些场景下,例如对整个服务设置限流阈值,服务进行扩容时,
    限流的配置并不方便。
  • 我们可以在注册中心或配置中心下发限流阈值配置的时候,将总服务节点数也下发给服务节点,让 RPC 框架自己去计算限流阈值;
  • 我们还可以让 RPC 框架的限流模块依赖一个专门的限流服务,对服务设置限流阈值进行精准地控制,但是这种方式依赖了限流服务,相比单机的限流方式,在性能和耗时上有劣势。

服务提供方主要通过限流来进行自我保护,我们在实现限流时要考虑到应用和 IP 级别,方便我们在服务治理的时,对部分访问量特别大的应用进行合理的限流。

服务端的限流阈值配置都是作用于单机的,而在有些场景下,例如对整个服务设置限流阈值,服务进行扩容时,
限流的配置并不方便。我们可以在注册中心或配置中心下发限流阈值配置的时候,将总服务节点数也下发给服务节点,让 RPC 框架自己去计算限流阈值。

我们还可以让 RPC 框架的限流模块依赖一个专门的限流服务,对服务设置限流阈值进行精准地控制,但是这种方式依赖了限流服务,相比单机的限流方式,在性能和耗时上有劣势。

熔断

调用端可以通过熔断机制进行自我保护,防止调用下游服务出现异常,或者耗时过长影响调用端的业务逻辑,RPC 框架可以在动态代理的逻辑中去整合熔断器,实现 RPC 框架的熔断功能。

熔断器的工作机制主要是关闭、打开和半打开这三个状态之间的切换。在正常情况下,熔断器是关闭的;当调用端调用下游服务出现异常时,熔断器会收集异常指标信息进行计算,当达到熔断条件时熔断器打开,这时调用端再发起请求是会直接被熔断器拦截,并快速地执行失败逻辑;当熔断器打开一段时间后,会转为半打开状态,这时熔断器允许调用端发送一个请求给服务端,如果这次请求能够正常地得到服务端的响应,则将状态置为关闭状态,否则
设置为打开。

业务分组:如何隔离流量?

img

在 RPC 里面我们可以通过分组的方式人为地给不同的调用方划分出不同的小集群,从而实现调用方流量隔离的效果,保障我们的核心业务不受非核心业务的干扰。但我们在考虑问题的时候,不能顾此失彼,不能因为新加一个的功能而影响到原有系统的稳定性。

其实我们不仅可以通过分组把服务提供方划分成不同规模的小集群,我们还可以利用分组完成一个接口多种实现的功能。正常情况下,为了方便我们自己管理服务,我一般都会建议每个接口完成的功能尽量保证唯一。但在有些特殊场景下,两个接口也会完全一样,只是具体实现上有那么一点不同,那么我们就可以在服务提供方应用里面同时暴露两个相同接口,但只是接口分组不一样罢了。

动态分组

分组可以帮助服务提供方实现调用方的隔离。但是因为调用方流量并不是一成不变的,而且还可能会因为突发事件导致某个分组的流量溢出,而在整个大集群还有富余能力的时候,又因为分组隔离不能为出问题的集群提供帮助。

为了解决这种突发流量的问题,我们提供了一种更高效的方案,可以实现分组的快速伸缩。事实上我们还可以利用动态分组解决分组后给每个分组预留机器冗余的问题,我们没有必要把所有冗余的机器都分配到分组里面,我们可以把这些预留的机器做成一个共享的池子,从而减少整体预留的实例数量。

异步 RPC:压榨单机吞吐量

异步 RPC:压榨单机吞吐量

影响到 RPC 调用的吞吐量的主要原因就是服务端的业务逻辑比较耗时,并且 CPU 大部分时间都在等待而没有去计算,导致 CPU 利用率不够,而提升单机吞吐量的最好办法就是使用异步 RPC。

RPC 框架的异步策略主要是调用端异步与服务端异步。调用端的异步就是通过 Future 方式实现异步,调用端发起一次异步请求并且从请求上下文中拿到一个 Future,之后通过 Future 的 get 方法获取结果,如果业务逻辑中同时调用多个其它的服务,则可以通过 Future 的方式减少业务逻辑的耗时,提升吞吐量。服务端异步则需要一种回调方式,让业务逻辑可以异步处理,之后调用 RPC 框架提供的回调接口,将最终结果异步通知给调用端。

另外,我们可以通过对 CompletableFuture 的支持,实现 RPC 调用在调用端与服务端之间的完全异步,同时提升两端的单机吞吐量。

此外,RPC 框架也可以有其它的异步策略,比如集成 RxJava,再比如 gRPC 的 StreamObserver 入参对象,但 CompletableFuture 是 Java8 原生提供的,无代码入侵性,并且在使用上更加方便。

安全体系:如何建立可靠的安全体系?

RPC 是解决应用间互相通信的框架,而应用之间的远程调用过程一般不会暴露在公网,换句话讲就是说 RPC 一般用于解决内部应用之间的通信,而这个“内部”是指应用都部署在同一个大局域网内。相对于公网环境,局域网的隔离性更好,也就相对更安全,所以在 RPC 里面我们很少考虑像数据包篡改、请求伪造等恶意行为。

对于 RPC 来说,需要关心的安全问题不会有公网应用那么复杂,我们只要保证让服务调用方能拿到真实的服务提供方 IP 地址集合,且服务提供方可以管控调用自己的应用就够了(比如颁发数字签名)。

分布式环境下如何快速定位问题?

问题定位:链路追踪

链路追踪要点

  • traceId:用于表示一次完整的请求
  • spanId:用于标识一次 RPC 调用在分布式请求中的位置
  • annonation:业务自定义埋点数据

链路追踪理论

链路追踪代表产品

  • Zipkin:Zipkin 是 Twitter 开源的调用链分析工具,目前基于 spring-cloud-sleuth 得到了广泛的使用,特点是轻量,使用、部署简单。
  • Pinpoint:是韩国人开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI 功能强大,接入端无代码侵入。
  • SkyWalking:是本土开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI 功能较强,接入端无代码侵入。目前已加入 Apache 孵化器。
  • CAT:CAT 是美团点评开源的基于编码和配置的调用链分析,应用监控分析,日志采集,监控报警等一系列的监控平台工具。

详解时钟轮在 RPC 中的应用

无论是同步调用还是异步调用,调用端内部实行的都是异步,而调用端在向服务端发送消息之前会创建一个 Future,并存储这个消息标识与这个 Future 的映射,当服务端收到消息并且处理完毕后向调用端发送响应消息,调用端在接收到消息后会根据消息的唯一标识找到这个 Future,并将结果注入给这个 Future。

一般定时任务方案的缺点

  1. 方案一:每创建一个 Future 都启动一个线程,之后 sleep,到达超时时间就触发请求超时的处理逻辑。
  • 缺点:需要创建大量线程。例如:高并发场景下,单机可能每秒要发送数万次请求,请求超时时间设置的是 5 秒,那我们要创建多少个线程用来执行超时任务呢?超过 10 万个线程!
  1. 方案二:用一个线程来处理所有的定时任务,不断轮询定时任务。假设一个线程每隔 100 毫秒会扫描一遍所有的处理 Future 超时的任务,当发现一个 Future 超时了,我们就执行这个任务,对这个 Future 执行超时逻辑。
  • 缺点:很浪费 CPU。高并发场景下,如果调用端刚好在 1 秒内发送了 1 万次请求,这 1 万次请求要在 5 秒后才会超时,那么那个扫描的线程在这个 5 秒内就会不停地对这 1 万个任务进行扫描遍历,要额外扫描 40 多次(每 100 毫秒扫描一次,5 秒内要扫描近 50 次),很浪费 CPU。

时钟轮方案

在时钟轮机制中,有时间槽和时钟轮的概念,时间槽就相当于时钟的刻度,而时钟轮就相当于秒针与分针等跳动的一个周期,我们会将每个任务放到对应的时间槽位上。

时钟轮的运行机制和生活中的时钟也是一样的,每隔固定的单位时间,就会从一个时间槽位跳到下一个时间槽位,这就相当于我们的秒针跳动了一次;时钟轮可以分为多层,下一层时钟轮中每个槽位的单位时间是当前时间轮整个周期的时间,这就相当于 1 分钟等于 60 秒钟;当时钟轮将一个周期的所有槽位都跳动完之后,就会从下一层时钟轮中取出一个槽位的任务,重新分布到当前的时钟轮中,当前时钟轮则从第 0 槽位从新开始跳动,这就相当于
下一分钟的第 1 秒。

时钟轮在 RPC 中的应用

  • 调用端请求超时处理:每发一次请求,都创建一个处理请求超时的定时任务放到时钟轮里,在高并发、高访问量的情况下,时钟轮每次只轮询一个时间槽位中的任务,这样会节省大量的 CPU。
  • 调用端与服务端启动超时也可以应用到时钟轮:以调用端为例,假设我们想要让应用可以快速地部署,例如 1 分钟内启动,如果超过 1 分钟则启动失败。我们可以在调用端启动时创建一个处理启动超时的定时任务,放到时钟轮里。
  • 定时心跳:RPC 框架调用端定时向服务端发送心跳,来维护连接状态,我们可以将心跳的逻辑封装为一个心跳任务,放到时钟轮里。

流量回放:保障业务技术升级的神器

实际情况就是我们不仅要保障已有业务的稳定,还需要快速去完成各种新业务的需求,这期间我们的应用代码就会经常发生变化,而发生变化后就可能会引入新的不稳定因素,而且这个过程会一直持续不断发生。

为了保障应用升级后,我们的业务行为还能保持和升级前一样,我们在大多数情况下都是依靠已有的 TestCase 去验证,但这种方式在一定程度上并不是完全可靠的。最可靠的方式就是引入线上 Case 去验证改造后的应用,把线上的真实流量在改造后的应用里面进行回放,这样不仅节省整个上线时间,还能弥补手动维护 Case 存在的缺陷。

应用引入了 RPC 后,所有的请求流量都会被 RPC 接管,所以我们可以很自然地在 RPC 里面支持流量回放功能。虽然这个功能本身并不是 RPC 的核心功能,但对于使用 RPC 的人来说,他们有了这个功能之后,就可以更放心地升级自己的应用了。

动态分组:超高效实现秒级扩缩容

分组后带来的收益,它可以帮助服务提供方实现调用方的隔离。但是因为调用方流量并不是一成不变的,而且还可能会因为突发事件导致某个分组的流量溢出,而在整个大集群还有富余能力的时候,又因为分组隔离不能为出问题的集群提供帮助。

为了解决这种突发流量的问题,我们提供了一种更高效的方案,可以实现分组的快速扩缩容。事实上我们还可以利用动态分组解决分组后给每个分组预留机器冗余的问题,我们没有必要把所有冗余的机器都分配到分组里面,我们可以把这些预留的机器做成一个共享的池子,从而减少整体预留的实例数量。

如何在没有接口的情况下进行 RPC 调用?

应用场景

(1)测试平台:各个业务方在测试平台中通过输入接口、分组名、方法名以及参数值,在线测试自己发布的 RPC 服务。

(2)轻量级的服务网关:可以让各个业务方用 HTTP 的方式,通过服务网关调用其它服务。服务网关要作为所有 RPC 服务的调用端,是不能依赖所有服务提供方的接口 API 的,也需要调用端在没有服务提供方提供接口的情况下,仍然可以正常地发起 RPC 调用。

如何泛化调用

所谓的 RPC 调用,本质上就是调用端向服务端发送一条请求消息,服务端接收并处理,之后向调用端发送一条响应消息,调用端处理完响应消息之后,一次 RPC 调用就完成了。只要调用端将服务端需要知道的信息,如接口名、业务分组名、方法名以及参数信息等封装成请求消息发送给服务端,服务端就能够解析并处理这条请求消息,这样问题就解决了。

泛化调用接口示例:

1
2
3
4
class GenericService {
Object $invoke(String methodName, String[] paramTypes, Object[] params);
CompletableFuture<Object> $asyncInvoke(String methodName, String[] paramTypes
}

如何在线上环境里兼容多种 RPC 协议?

业界有很多 RPC 框架,如:Dubbo、Hessian、gRPC 等,它们随着技术发展逐渐涌现出来。不同时期、不同项目为了解决自身的通信问题,可能会选择不同的 RPC 框架。

对于一个公司来说,不同的 RPC 框架,会使得维护成本变高。所以,如果想缩减维护成本,自然会想到统一 RPC 框架。

但这面临的重要问题是:如果直接切换 RPC 框架,会导致新旧 RPC 框架的服务无法通信,从而造成业务损失。为此,一个折中的方案就是:先不移除原有的 RPC 框架,但同时接入新的 RPC 框架,让两种 RPC 同时提供服务,然后等所有的应用都接入完新的 RPC 以后,再让所有的应用逐步接入到新的 RPC 上。

在保持原有 RPC 使用方式不变的情况下,同时引入新的 RPC 框架的思路,是可以让所有的应用最终都能升级到我们想要升级的 RPC 上,但对于开发人员来说,这样切换成本还是有点儿高,整个过程最少需要两次上线才能彻底地把应用里面的旧 RPC 都切换成新 RPC。还有一种方案:要让新的 RPC 能同时支持多种 RPC 调用,当一个调用方切换到新的 RPC 之后,调用方和服务提供方之间就可以用新的协议完成调用;当调用方还是用老的 RPC 进行调用的话,调用方和服务提供方之间就继续沿用老的协议完成调用。

如何优雅处理多协议

每种协议约定的数据包格式是不一样的,而且每种协议开头都有一个协议编码,我们一般叫做 magic number。

当 RPC 收到了数据包后,我们可以先解析出 magic number 来。获取到 magic number 后,我们就很容易地找到对应协议的数据格式,然后用对应协议的数据格式去解析收到的二进制数据包。

协议解析过程就是把一连串的二进制数据变成一个 RPC 内部对象,但这个对象一般是跟协议相关的,所以为了能让 RPC 内部处理起来更加方便,我们一般都会把这个协议相关的对象转成一个跟协议无关的 RPC 对象。这是因为在 RPC 流程中,当服务提供方收到反序列化后的请求的时候,我们需要根据当前请求的参数找到对应接口的实现类去完成真正的方法调用。如果这个请求参数是跟协议相关的话,那后续 RPC 的整个处理逻辑就会变得很复杂。

当完成了真正的方法调用以后,RPC 返回的也是一个跟协议无关的通用对象,所以在真正往调用方写回数据的时候,我们同样需要完成一个对象转换的逻辑,只不过这时候是把通用对象转成协议相关的对象。

参考资料

SpringBoot Actuator 快速入门

spring-boot-actuator 模块提供了 Spring Boot 的所有生产就绪功能。启用这些功能的推荐方法是添加 spring-boot-starter-actuator 依赖。

如果是 Maven 项目,添加以下依赖:

1
2
3
4
5
6
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>

如果是 Gradle 项目,添加以下声明:

1
2
3
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-actuator'
}

端点(Endpoint)

Actuator Endpoint 使 Spring Boot 用户可以监控应用,并和应用进行交互。Spring Boot 内置了许多 端点,并允许用户自定义端点。例如,health 端点提供基本的应用健康信息。

用户可以启用或禁用每个单独的端点并通过 HTTP 或 JMX 暴露它们(使它们可远程访问)。当端点被启用和公开时,它被认为是可用的。内置端点仅在可用时才会自动配置。大多数应用程序选择通过 HTTP 公开。例如,默认情况下,health 端点映射到 /actuator/health

启用端点

默认情况下,除了 shutdown 之外的所有端点都已启用。要配置端点的启用,请使用 management.endpoint.<id>.enabled 属性。以下示例启用 shutdown 端点:

1
management.endpoint.shutdown.enabled=true

如果您希望端点是明确指定才启用,请将 management.endpoints.enabled-by-default 属性设置为 false 并根据需要明确指定启用的端点,以下为示例:

1
2
management.endpoints.enabled-by-default=false
management.endpoint.info.enabled=true

暴露端点

由于端点可能包含敏感信息,您应该仔细考虑何时暴露它们。下表显示了内置端点的默认曝光:

ID JMX Web
auditevents Yes No
beans Yes No
caches Yes No
conditions Yes No
configprops Yes No
env Yes No
flyway Yes No
health Yes Yes
heapdump N/A No
httptrace Yes No
info Yes No
integrationgraph Yes No
jolokia N/A No
logfile N/A No
loggers Yes No
liquibase Yes No
metrics Yes No
mappings Yes No
prometheus N/A No
quartz Yes No
scheduledtasks Yes No
sessions Yes No
shutdown Yes No
startup Yes No
threaddump Yes No

要更改暴露的端点,请使用以下特定于技术的包含和排除属性:

Property Default
management.endpoints.jmx.exposure.exclude
management.endpoints.jmx.exposure.include *
management.endpoints.web.exposure.exclude
management.endpoints.web.exposure.include health

include 属性列出了暴露的端点的 ID。 exclude 属性列出了不应暴露的端点的 ID。 exclude 属性优先于 include 属性。您可以使用端点 ID 列表配置包含和排除属性。

例如,仅暴露 health 和 info 端点,其他端点都不通过 JMX 暴露,可以按如下配置:

1
management.endpoints.jmx.exposure.include=health,info

注意:* 可用于选择所有端点。

安全

出于安全考虑,只有 /health 端点会通过 HTTP 方式暴露。用户可以通过 management.endpoints.web.exposure.include 决定哪些端点可以通过 HTTP 方式暴露。

如果 Spring Security 在类路径上并且不存在其他 WebSecurityConfigurerAdapterSecurityFilterChain bean,则除 /health 之外的所有 actuator 都由 Spring Boot 自动启用安全控制。如果用户自定义了 WebSecurityConfigurerAdapterSecurityFilterChain bean,Spring Boot 不再启用安全控制,由用户自行控制访问规则。

如果您希望为 HTTP 端点定义安全控制(例如,只允许具有特定角色的用户访问它们),Spring Boot 提供了一些方便的 RequestMatcher 对象,您可以将它们与 Spring Security 结合使用。

下面是一个典型的 Spring Security 配置示例:

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration(proxyBeanMethods = false)
public class MySecurityConfiguration {

@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
http.requestMatcher(EndpointRequest.toAnyEndpoint())
.authorizeRequests((requests) -> requests.anyRequest().hasRole("ENDPOINT_ADMIN"));
http.httpBasic();
return http.build();
}

}

前面的示例使用 EndpointRequest.toAnyEndpoint() 将请求匹配到任何端点,然后确保所有端点都具有 ENDPOINT_ADMIN 角色。 EndpointRequest 上还提供了其他几种匹配器方法。

如果希望无需身份验证即可访问所有执行器端点。可以通过更改 management.endpoints.web.exposure.include 属性来做到这一点,如下所示:

1
management.endpoints.web.exposure.include=*

此外,如果存在 Spring Security,您将需要添加自定义安全配置,以允许未经身份验证的访问端点,如以下示例所示:

1
2
3
4
5
6
7
8
9
10
11
@Configuration(proxyBeanMethods = false)
public class MySecurityConfiguration {

@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
http.requestMatcher(EndpointRequest.toAnyEndpoint())
.authorizeRequests((requests) -> requests.anyRequest().permitAll());
return http.build();
}

}

由于 Spring Boot 依赖于 Spring Security 的默认设置,因此 CSRF 保护默认开启。这意味着在使用默认安全配置时,需要 POST(关闭和记录器端点)、PUT 或 DELETE 的执行器端点会收到 403(禁止)错误。

建议仅在创建非浏览器客户端使用的服务时完全禁用 CSRF 保护。

配置端点

端点会自动缓存对不带任何参数的读操作的响应数据。要配置端点缓存响应的时间量,请使用其 cache.time-to-live 属性。以下示例将 bean 端点缓存的生存时间设置为 10 秒:

1
management.endpoint.beans.cache.time-to-live=10s

Actuator Web 端点的超媒体

Spring Boot Actuator 中内置了一个“发现页面”端点,其中包含了所有端点的链接。默认情况下,“发现页面”在 /actuator 上可用。

要禁用“发现页面”,请将以下属性添加到您的应用程序属性中:

1
management.endpoints.web.discovery.enabled=false

配置自定义管理上下文路径后,“发现页面”会自动从 /actuator 移动到应用管理上下文的根目录。例如,如果管理上下文路径是 /management,则发现页面可从 /management 获得。当管理上下文路径设置为 / 时,发现页面被禁用以防止与其他映射发生冲突的可能性。

跨域支持

CORS 是一种 W3C 规范,可让用户以灵活的方式指定授权哪种跨域请求。如果使用 Spring MVC 或 Spring WebFlux,则可以配置 Actuator 的 Web 端点以支持此类场景。

CORS 支持默认是禁用的,只有在设置 management.endpoints.web.cors.allowed-origins 属性后才会启用。以下配置允许来自 example.com 域的 GET 和 POST 调用:

1
2
management.endpoints.web.cors.allowed-origins=https://example.com
management.endpoints.web.cors.allowed-methods=GET,POST

自定义端点

如果添加带有 @Endpoint 注释的 @Bean,则任何带有 @ReadOperation@WriteOperation@DeleteOperation 注释的方法都会自动通过 JMX 公开,并且在 Web 应用程序中,也可以通过 HTTP 公开。可以使用 Jersey、Spring MVC 或 Spring WebFlux 通过 HTTP 公开端点。如果 Jersey 和 Spring MVC 都可用,则使用 Spring MVC。

以下示例公开了一个返回自定义对象的读取操作:

1
2
3
4
@ReadOperation
public CustomData getData() {
return new CustomData("test", 5);
}

您还可以使用 @JmxEndpoint@WebEndpoint 编写特定技术的端点。这些端点仅限于各自的技术。例如,@WebEndpoint 仅通过 HTTP 而不是通过 JMX 公开。

您可以使用 @EndpointWebExtension@EndpointJmxExtension 编写特定技术的扩展。这些注释让您可以提供特定技术的操作来扩充现有端点。

最后,如果您需要访问 Web 框架的功能,您可以实现 servlet 或 Spring @Controller@RestController 端点,但代价是它们无法通过 JMX 或使用不同的 Web 框架获得。

通过 HTTP 进行监控和管理

自定义管理端点路径

如果是 Web 应用,Spring Boot Actuator 会自动将所有启用的端点通过 HTTP 方式暴露。默认约定是使用前缀为 /actuator 的端点的 id 作为 URL 路径。例如,健康被暴露为 /actuator/health

有时,自定义管理端点的前缀很有用。例如,您的应用程序可能已经将 /actuator 用于其他目的。您可以使用 management.endpoints.web.base-path 属性更改管理端点的前缀,如以下示例所示:

1
management.endpoints.web.base-path=/manage

该示例将端点从 /actuator/{id} 更改为 /manage/{id}(例如,/manage/info)。

自定义管理服务器端口

1
management.server.port=8081

配置 SSL

当配置为使用自定义端口时,还可以使用各种 management.server.ssl.* 属性为管理服务器配置自己的 SSL。例如,这样做可以让管理服务器在主应用程序使用 HTTPS 时通过 HTTP 可用,如以下属性设置所示:

1
2
3
4
5
6
server.port=8443
server.ssl.enabled=true
server.ssl.key-store=classpath:store.jks
server.ssl.key-password=secret
management.server.port=8080
management.server.ssl.enabled=false

或者,主服务器和管理服务器都可以使用 SSL,但使用不同的密钥存储,如下所示:

1
2
3
4
5
6
7
8
server.port=8443
server.ssl.enabled=true
server.ssl.key-store=classpath:main.jks
server.ssl.key-password=secret
management.server.port=8080
management.server.ssl.enabled=true
management.server.ssl.key-store=classpath:management.jks
management.server.ssl.key-password=secret

自定义管理服务器地址

1
2
management.server.port=8081
management.server.address=127.0.0.1

禁用 HTTP 端点

如果您不想通过 HTTP 方式暴露端点,可以将管理端口设置为 -1,如以下示例所示:

1
management.server.port=-1

也可以通过使用 management.endpoints.web.exposure.exclude 属性来实现这一点,如以下示例所示:

1
management.endpoints.web.exposure.exclude=*

通过 JMX 进行监控和管理

Java 管理扩展 (JMX) 提供了一种标准机制来监视和管理应用程序。默认情况下,此功能未启用。您可以通过将 spring.jmx.enabled 配置属性设置为 true 来打开它。 Spring Boot 将最合适的 MBeanServer 暴露为 ID 为 mbeanServer 的 bean。使用 Spring JMX 注释(@ManagedResource@ManagedAttribute@ManagedOperation)注释的任何 bean 都会暴露给它。

如果您的平台提供标准 MBeanServer,则 Spring Boot 会使用该标准并在必要时默认使用 VM MBeanServer。如果一切都失败了,则创建一个新的 MBeanServer

有关更多详细信息,请参阅 JmxAutoConfiguration 类。

默认情况下,Spring Boot 还将管理端点公开为 org.springframework.boot 域下的 JMX MBean。要完全控制 JMX 域中的端点注册,请考虑注册您自己的 EndpointObjectNameFactory 实现。

定制化 MBean Names

MBean 的名称通常由端点的 id 生成。例如,健康端点公开为 org.springframework.boot:type=Endpoint,name=Health

如果您的应用程序包含多个 Spring ApplicationContext,您可能会发现名称冲突。要解决此问题,您可以将 spring.jmx.unique-names 属性设置为 true,以便 MBean 名称始终是唯一的。

如果需要定制,跨域按如下配置:

1
2
spring.jmx.unique-names=true
management.endpoints.jmx.domain=com.example.myapp

禁用 JMX 端点

想禁用 JMX 端点,可以按如下配置:

1
management.endpoints.jmx.exposure.exclude=*

将 Jolokia 用于基于 HTTP 的 JMX

Jolokia 是一个 JMX-HTTP 的桥接工具,它提供了另一种访问 JMX bean 的方法。要使用 Jolokia,需要先添加依赖:

1
2
3
4
<dependency>
<groupId>org.jolokia</groupId>
<artifactId>jolokia-core</artifactId>
</dependency

然后,您可以通过将 jolokia* 添加到 Management.Endpoints.web.exposure.include 属性来暴露 Jolokia 端点。然后,您可以在管理 HTTP 服务器上使用 /actuator/jolokia 访问它。

日志

Spring Boot Actuator 支持查看和配置应用日志级别。

日志级别的可选值如下:

  • TRACE
  • DEBUG
  • INFO
  • WARN
  • ERROR
  • FATAL
  • OFF
  • null

null 表示没有显式配置。

指标

审计

Spring Boot Actuator 支持简单的审计功能。如果应用中启用了 Spring Security,Spring Boot Actuator 就会发布安全事件(如:“身份验证成功”、“失败”和“访问被拒绝”异常)。

可以通过在应用的配置中提供 AuditEventRepository 类型的 bean 来启用审计。为方便起见,Spring Boot 提供了一个 InMemoryAuditEventRepositoryInMemoryAuditEventRepository 的功能有限,建议仅将其用于开发环境。

如果要自定义安全事件,可以提供 AbstractAuthenticationAuditListenerAbstractAuthorizationAuditListener 实现。

此外,还可以将审计服务用于业务活动。为此,要么将 AuditEventRepository bean 注入组件并直接使用它,要么使用 Spring ApplicationEventPublisher 发布 AuditApplicationEvent(通过实现 ApplicationEventPublisherAware)。

HTTP 追踪

用户可以通过在应用中提供 HttpTraceRepository 类型的 bean 来启用 HTTP 跟踪。Spring Boot 提供了内置的 InMemoryHttpTraceRepository,它可以存储最近 100 次(默认)请求-响应的追踪数据。与其他 HTTP 追踪解决方案相比,InMemoryHttpTraceRepository 比较受限,建议仅用于开发环境。对于生产环境,建议使用 Zipkin 或 Spring Cloud Sleuth。

或者,可以自定义 HttpTraceRepository

处理监控

在 spring-boot 模块中,您可以找到两个类来创建对进程监控有用的文件:

  • ApplicationPidFileWriter 创建一个包含应用程序 PID 的文件(默认情况下,在应用程序目录中,文件名为 application.pid)。
  • WebServerPortFileWriter 创建一个文件(或多个文件),其中包含正在运行的 Web 服务器的端口(默认情况下,在应用程序目录中,文件名为 application.port)。

参考资料

分布式分区

什么是分区

分区通常是这样定义的,即每一条数据(或者每条记录,每行或每个文档)只属于某个特定分区。实际上,每个分区都可以视为一个完整的小型数据库,虽然数据库可能存在一些跨分区的操作。

在不同系统中,分区有着不同的称呼,例如它对应于 MongoDB, Elasticsearch 和 SolrCloud 中的 shard, HBase 的 region, Bigtable 中的 tablet, Cassandra 和 Riak 中的 vnode ,以及 Couch base 中的 vBucket。总体而言,分区是最普遍的术语。

为什么需要分区

数据量如果太大,单台机器进行存储和处理就会成为瓶颈,因此需要引入数据分区机制。

分区的目地是通过多台机器均匀分布数据和查询负载,避免出现热点。这需要选择合适的数据分区方案,在节点添加或删除时重新动态平衡分区。

数据分区与数据复制

分区通常与复制结合使用,即每个分区在多个节点都存有副本。这意味着某条记录属于特定的分区,而同样的内容会保存在不同的节点上以提高系统的容错性。

一个节点上可能存储了多个分区。每个分区都有自己的主副本,例如被分配给某节点,而从副本则分配在其他一些节点。一个节点可能既是某些分区的主副本,同时又是其他分区的从副本。

键-值数据的分区

分区的主要目标是将数据和查询负载均匀分布在所有节点上。如果节点平均分担负载,那么理论上 10 个节点应该能够处理 10 倍的数据量和 10 倍于单个节点的读写吞吐量(忽略复制) 。

而如果分区不均匀,则会出现某些分区节点比其他分区承担更多的数据量或查询负载,称之为倾斜。倾斜会导致分区效率严重下降,在极端情况下,所有的负载可能会集中在一个分区节点上,这就意味着 10 个节点 9 个空闲,系统的瓶颈在最繁忙的那个节点上。这种负载严重不成比例的分区即成为系统热点

避免热点最简单的方法是将记录随机分配给所有节点。这种方法可以比较均匀地分布数据,但是有一个很大的缺点:当视图读取特定的数据时,没有办法知道数据保存在哪个节点上,所以不得不并行查询所有节点。

可以改进上述方法。现在我们假设数据是简单的键-值数据模型,这意味着总是可以通过关键字来访问记录。

基于关键字区间分区

一种分区方式是为每个分区分配一段连续的关键字或者关键宇区间范围(以最小值和最大值来指示)。

关键字的区间段不一定非要均匀分布,这主要是因为数据本身可能就不均匀。

分区边界可以由管理员手动确定,或由数据库自动选择。采用这种分区策略的系统包括 Bigtable、HBase、RethinkDB、2.4 版本前的 MongoDB。

每个分区内可以按照关键字排序保存(参阅第 3 章的“ SSTables 和 LSM Trees ”)。这样可以轻松支持区间查询,即将关键字作为一个拼接起来的索引项从而一次查询得到多个相关记录。

然而,基于关键字的区间分区的缺点是某些访问模式会导致热点。如果关键字是时间戳,则分区对应于一个时间范围,所有的写入操作都集中在同一个分区(即当天的分区),这会导致该分区在写入时负载过高,而其他分区始终处于空闲状态。为了避免上述问题,需要使用时间戳以外的其他内容作为关键字的第一项。

基于关键字晗希值分区

对于上述数据倾斜和热点问题,许多分布式系统采用了基于关键字哈希函数的方式来分区。

一个好的哈希函数可以处理数据倾斜并使其均匀分布。用于数据分区目的的哈希函数不需要再加密方面很强:例如 :Cassandra 和 MongoDB 使用 MD5,Voldemort 使用 Fowler-Noll-Vo。许多编程语言也有内置的简单哈希函数,但是要注意这些内置的哈希函数可能并不适合分区,例如,Java 的 Object.hashCode 和 Object#hash,同一个键在不同的进程中可能返回不同的哈希值。

一且找到合适的关键宇哈希函数,就可以为每个分区分配一个哈希范围(而不是直接作用于关键宇范围),关键字根据其哈希值的范围划分到不同的分区中。

img

这种方法可以很好地将关键字均匀地分配到多个分区中。分区边界可以是均匀间隔,也可以是伪随机选择( 在这种情况下,该技术有时被称为一致性哈希) 。

然而,通过关键宇哈希进行分区,我们丧失了良好的区间查询特性。即使关键字相邻,但经过哈希之后会分散在不同的分区中,区间查询就失去了原有的有序相邻的特性。在 MongoDB 中,如果启用了基于哈希的分片模式,则区间查询会发送到所有的分区上,而 Riak、Couchbase 和 Voldemort 干脆就不支持关键字上的区间查询。

Cassandra 则在两种分区策略之间做了一个折中。Cassandra 中的表可以声明为由多个列组成的复合主键。复合主键只有第一部分可用于哈希分区,而其他列则用作组合索引来对 Cassandra SSTable 中的数据进行排序。因此,它不支持在第一列上进行区间查询,但如果为第一列指定好了固定值,可以对其他列执行高效的区间查询。

组合索引为一对多的关系提供了一个优雅的数据模型。

负载倾斜与热点

基于哈希的分区方法可以减轻热点,但无住做到完全避免。一个极端情况是,所有的读/写操作都是针对同一个关键字,则最终所有请求都将被路由到同一个分区。

一个简单的技术就是在关键字的开头或结尾处添加一个随机数。只需一个两位数的十进制随机数就可以将关键字的写操作分布到 100 个不同的关键字上,从而分配到不同的分区上。但是,随之而来的问题是,之后的任何读取都需要些额外的工作,必须从所有 100 个关键字中读取数据然后进行合井。因此通常只对少量的热点关键字附加随机数才有意义;而对于写入吞吐量低的绝大多数关键字,这些都意味着不必要的开销。此外,还需要额外的元数据来标记哪些关键字进行了特殊处理。

分区与二级索引

二级索引通常不能唯一标识一条记录,而是用来加速特定值的查询。

二级索引是关系数据库的必要特性,在文档数据库中应用也非常普遍。但考虑到其复杂性,许多键-值存储(如 HBase 和 Voldemort)并不支持二级索引;但其他一些如 Riak 则开始增加对二级索引的支持。此外,二级索引技术也是 Solr 和 Elasticsearch 等全文索引服务器存在之根本。

二级索引带来的主要挑战是它们不能规整的地映射到分区中。有两种主要的方法来支持对二级索引进行分区:基于文档的分区和基于词条的分区。

基于文档分区的二级索引

img

在这种索引方法中,每个分区完全独立,各自维护自己的二级索引,且只负责自己分区内的文档而不关心其他分区中数据。每当需要写数据库时,包括添加,删除或更新文档等,只需要处理包含目标文档 ID 的那一个分区。因此文档分区索引也被称为本地索引,而不是全局索引。

这种查询分区数据库的方法有时也称为分散/聚集,显然这种二级索引的查询代价高昂。即使采用了并行查询,也容易导致读延迟显著放大。尽管如此,它还是广泛用于实践: MongoDB 、Riak、Cassandra、Elasticsearch 、SolrCloud 和 VoltDB 都支持基于文档分区二级索引。大多数数据库供应商都建议用户自己来构建合适的分区方案,尽量由单个分区满足二级索引查询,但现实往往难以如愿,尤其是当查询中可能引用多个二级索引时。

基于词条的二级索引分区

另一种方法,可以对所有的数据构建全局索引,而不是每个分区维护自己的本地索引。而且,为避免成为瓶颈,不能将全局索引存储在一个节点上,否则就破坏了设计分区均衡的目标。所以,全局索引也必须进行分区,且可以与数据关键字采用不同的分区策略。

img

词条分区以待查找的关键字本身作为索引。名字词条源于全文索引,term 指的是文档中出现的所有单词的集合。

可以直接通过关键词来全局划分索引,或者对其取哈希值。直接分区的好处是可以支持高效的区间查询;而采用哈希的方式则可以更均句的划分分区。

这种全局的词条分区相比于文档分区索引的主要优点是,它的读取更为高效,即它不需要采用 scatter/gather 对所有的分区都执行一遍查询,客户端只需要想包含词条的那一个分区发出读请求。然而全局索引的不利之处在于, 写入速度较慢且非常复杂,主要因为单个文档的更新时,里面可能会涉及多个二级索引,而二级索引的分区又可能完全不同甚至在不同的节点上,由此势必引人显著的写放大。

理想情况下,索引应该时刻保持最新,即写入的数据要立即反映在最新的索引上。但是,对于词条分区来讲,这需要一个跨多个相关分区的分布式事务支持,写入速度会受到极大的影响,所以现有的数据库都不支持同步更新二级索引。

分区再均衡

集群节点数变化,数据规模增长等情况,都会导致分区的分布不均。要保持分区的均衡,势必要将数据和请求进行迁移,这样一个迁移负载的过程称为分区再均衡

无论对于哪种分区方案, 分区再平衡通常至少要满足:

  • 平衡之后,负载、数据存储、读写请求等应该在集群范围更均匀地分布。
  • 再平衡执行过程中,数据库应该可以继续正常提供读写服务。
  • 避免不必要的负载迁移,以加快动态再平衡,井尽量减少网络和磁盘 I/O 影响。

动态再平衡的策略

为什么不用取模?

最好将哈希值划分为不同的区间范围,然后将每个区间分配给一个分区。

为什么不直接使用 mod?对节点数取模方法的问题是,如果节点数 N 发生了变化,会导致很多关键字需要从现有的节点迁移到另一个节点。

固定数量的分区

创建远超实际节点数的分区数,然后为每个节点分配多个分区。

接下来, 如果集群中添加了一个新节点,该新节点可以从每个现有的节点上匀走几个分区,直到分区再次达到全局平衡。

选中的整个分区会在节点之间迁移,但分区的总数量仍维持不变,也不会改变关键字到分区的映射关系。这里唯一要调整的是分区与节点的对应关系。考虑到节点间通过网络传输数据总是需要些时间,这样调整可以逐步完成,在此期间,旧分区仍然可以接收读写请求。

原则上,也可以将集群中的不同的硬件配置因素考虑进来,即性能更强大的节点将分配更多的分区,从而分担更多的负载。

目前,Riak、Elasticsearch、Couchbase 和 Voldemort 都支持这种动态平衡方法。

使用该策略时,分区的数量往往在数据库创建时就确定好,之后不会改变。原则上也可以拆分和合并分区(稍后介绍),但固定数量的分区使得相关操作非常简单,因此许多采用固定分区策略的数据库决定不支持分区拆分功能。所以,在初始化时,已经充分考虑将来扩容增长的需求(未来可能拥有的最大节点数),设置一个足够大的分区数。而每个分区也有些额外的管理开销,选择过高的数字可能会有副作用。

动态分区

对于采用关键宇区间分区的数据库,如果边界设置有问题,最终可能会出现所有数据都挤在一个分区而其他分区基本为空,那么设定固定边界、固定数量的分区将非常不便:而手动去重新配置分区边界又非常繁琐。

因此, 一些数据库如 HBase 和 RethinkDB 等采用了动态创建分区。当分区的数据增长超过一个可配的参数阔值(HBase 上默认值是 10GB),它就拆分为两个分区,每个承担一半的数据量。相反,如果大量数据被删除,并且分区缩小到某个阈值以下,则将其与相邻分区进行合井。该过程类似于 B 树的分裂操作。

每个分区总是分配给一个节点,而每个节点可以承载多个分区,这点与固定数量的分区一样。当一个大的分区发生分裂之后,可以将其中的一半转移到其他某节点以平衡负载。对于 HBase,分区文件的传输需要借助 HDFS。

动态分区的一个优点是分区数量可以自动适配数据总量。如果只有少量的数据,少量的分区就足够了,这样系统开销很小;如果有大量的数据,每个分区的大小则被限制在一个可配的最大值。

但是,需要注意的是,对于一个空的数据库, 因为没有任何先验知识可以帮助确定分区的边界,所以会从一个分区开始。可能数据集很小,但直到达到第一个分裂点之前,所有的写入操作都必须由单个节点来处理, 而其他节点则处于空闲状态。为了缓解这个问题,HBase 和 MongoDB 允许在一个空的数据库上配置一组初始分区(这被称为预分裂)。对于关键字区间分区,预分裂要求已经知道一些关键字的分布情况。

动态分区不仅适用于关键字区间分区,也适用于基于哈希的分区策略。MongoDB 从版本 2.4 开始,同时支持二者,井且都可以动态分裂分区。

按节点比例分区

采用动态分区策略,拆分和合并操作使每个分区的大小维持在设定的最小值和最大值之间,因此分区的数量与数据集的大小成正比关系。另一方面,对于固定数量的分区方式,其每个分区的大小也与数据集的大小成正比。两种情况,分区的数量都与节点数无关。

Cassandra 和 Ketama 则采用了第三种方式,使分区数与集群节点数成正比关系。换句话说,每个节点具有固定数量的分区。此时, 当节点数不变时,每个分区的大小与数据集大小保持正比的增长关系; 当节点数增加时,分区则会调整变得更小。较大的数据量通常需要大量的节点来存储,因此这种方法也使每个分区大小保持稳定。

当一个新节点加入集群时,它随机选择固定数量的现有分区进行分裂,然后拿走这些分区的一半数据量,将另一半数据留在原节点。随机选择可能会带来不太公平的分区分裂,但是当平均分区数量较大时(Cassandra 默认情况下,每个节点有 256 个分区),新节点最终会从现有节点中拿走相当数量的负载。Cassandra 在 3.0 时推出了改进算洁,可以避免上述不公平的分裂。

随机选择分区边界的前提要求采用基于哈希分区(可以从哈希函数产生的数字范围里设置边界)。这种方法也最符合本章开头所定义一致性哈希。一些新设计的哈希函数也可以以较低的元数据开销达到类似的效果。

自动与手动再平衡操作

动态平衡另一个重要问题我们还没有考虑:它是自动执行还是手动方式执行?

全自动式再平衡会更加方便,它在正常维护之外所增加的操作很少。但是,也有可能出现结果难以预测的情况。再平衡总体讲是个比较昂贵的操作,它需要重新路由请求井将大量数据从一个节点迁移到另一个节点。万一执行过程中间出现异常,会使网络或节点的负载过重,井影响其他请求的性能。

将自动平衡与自动故障检测相结合也可能存在一些风险。例如,假设某个节点负载过重,对请求的响应暂时受到影响,而其他节点可能会得到结论:该节点已经失效;接下来激活自动平衡来转移其负载。客观上这会加重该节点、其他节点以及网络的负荷,可能会使总体情况变得更槽,甚至导致级联式的失效扩散。

请求路由

当数据集分布到多个节点上,需要解决一个问题:当客户端发起请求时,如何知道应该连接哪个节点?如果发生了分区再平衡,分区与节点的对应关系随之还会变化。为了回答该问题,我们需要一段处理逻辑来感知这些变化,并负责处理客户端的连接。

这其实属于一类典型的服务发现问题,服务发现并不限于数据库,任何通过网络访问的系统都有这样的需求,尤其是当服务目标支持高可用时(在多台机器上有冗余配置)。

服务发现有以下处理策略:

  1. 允许客户端链接任意的节点(例如,采用循环式的负载均衡器)。如果某节点恰好拥有所请求的分区,则直接处理该请求:否则,将请求转发到下一个合适的节点,接收答复,并将答复返回给客户端。
  2. 将所有客户端的请求都发送到一个路由层,由后者负责将请求转发到对应的分区节点上。路由层本身不处理任何请求,它仅充一个分区感知的负载均衡器。
  3. 客户端感知分区和节点分配关系。此时,客户端可以直接连接到目标节点,而不需要任何中介。

img

许多分布式数据系统依靠独立的协调服务(如 ZooKeeper )跟踪集群范围内的元数据。每个节点都向 ZooKeeper 中注册自己, ZooKeeper 维护了分区到节点的最终映射关系。其他参与者(如路由层或分区感知的客户端)可以向 ZooKeeper 订阅此信息。一旦分区发生了改变,或者添加、删除节点, ZooKeeper 就会主动通知路由层,这样使路由信息保持最新状态。

例如,HBase、SolrCloud 和 Kafka 也使用 ZooKeeper 来跟踪分区分配情况。MongoDB 有类似的设计,但它依赖于自己的配置服务器和 mongos 守护进程来充当路由层。

Cassandra 和 Riak 则采用了不同的方法,它们在节点之间使用 gossip 协议来同步群集状态的变化。请求可以发送到任何节点,由该节点负责将其转发到目标分区节点。这种方式增加了数据库节点的复杂性,但是避免了对 ZooKeeper 之类的外部协调服务的依赖。

img

并行查询执行

到目前为止,我们只关注了读取或写入单个关键字这样简单的查询(对于文档分区的二级索引,里面要求分散/聚集查询)。这基本上也是大多数 NoSQL 分布式数据存储所支持的访问类型。

然而对于大规模并行处理(massively parallel processing, MPP)这一类主要用于数据分析的关系数据库,在查询类型方面要复杂得多。典型的数据仓库查询包含多个联合、过滤、分组和聚合操作。MPP 查询优化器会将复杂的查询分解成许多执行阶段和分区,以便在集群的不同节点上井行执行。尤其是涉及全表扫描这样的查询操作,可以通过并行执行获益颇多。

小结

数据量如果太大,单台机器进行存储和处理就会成为瓶颈,因此需要引入数据分区机制。分区的目地是通过多台机器均匀分布数据和查询负载,避免出现热点。这需要选择合适的数据分区方案,在节点添加或删除时重新动态平衡分区。

两种主要的分区方法:

  • 基于关键字区间的分区。先对关键字进行排序,每个分区只负责一段包含最小到最大关键字范围的一段关键字。对关键字排序的优点是可以支持高效的区间查询,但是如果应用程序经常访问与排序一致的某段关键字,就会存在热点的风险。采用这种方怯,当分区太大时,通常将其分裂为两个子区间,从而动态地再平衡分区。
  • 哈希分区。将哈希函数作用于每个关键字,每个分区负责一定范围的哈希值。这种方法打破了原关键字的顺序关系,它的区间查询效率比较低,但可以更均匀地分配负载。采用哈希分区时,通常事先创建好足够多(但固定数量)的分区,让每个节点承担多个分区,当添加或删除节点时将某些分区从一个节点迁移到另一个节点,也可以支持动态分区。

混合上述两种基本方住也是可行的,例如使用复合键:键的一部分来标识分区,而另一部分来记录排序后的顺序。

二级索引也需要进行分区,有两种方法:

  • 基于文档来分区二级索引(本地索引)。二级索引存储在与关键字相同的分区中,这意味着写入时我们只需要更新一个分区,但缺点是读取二级索引时需要在所有分区上执行 scatter/gather。
  • 基于词条来分区二级索引(全局索引)。它是基于索引的值而进行的独立分区。二级索引中的条目可能包含来自关键字的多个分区里的记录。在写入时,不得不更新二级索引的多个分区;但读取时,则可以从单个分区直接快速提取数据。

最后,讨论了如何将查询请求路由到正确的分区,包括简单的分区感知负载均衡器,以及复杂的并行查询执行引擎。

参考资料

分布式复制

复制主要指通过网络在多台机器上保存相同数据的副本

复制数据,可能出于各种各样的原因:

  • 提高可用性 - 当部分组件出现位障,系统依然可以继续工作,系统依然可以继续工作。
  • 降低访问延迟 - 使数据在地理位置上更接近用户。
  • 提高读吞吐量 - 扩展至多台机器以同时提供数据访问服务。

复制的模式有以下几种:

  • 主从复制 - 所有的写入操作都发送到主节点,由主节点负责将数据更改事件发送到从节点。每个从节点都可以接收读请求,但内容可能是过期值。
  • 多主复制 - 系统存在多个主节点,每个都可以接收写请求,客户端将写请求发送到其中的一个主节点上,由该主节点负责将数据更改事件同步到其他主节点和自己的从节点。
  • 无主复制 - 系统中不存在主节点,每一个节点都能接受客户端的写请求。接受写请求的副本不会将数据变更同步到其他的副本。此外,读取时从多个节点上并行读取,以此检测和纠正某些过期数据

此外,复制还需要考虑以下问题:

  • 同步还是异步
  • 如何处理失败的副本
  • 如何保证数据一致

主从复制

如何确保所有副本之间的数据是一致的?

对于每一次数据写入,所有副本都需要随之更新;否则,某些副本将出现数据不一致。

最常见的解决方案就是主从复制,其原理如下:

主从复制模式中只有一个主副本(或称为主节点) ,其余称为从副本(或称为从节点)。

  1. 所有的写请求只能发送给主副本,主副本首先将新数据写入本地存储。

  2. 然后,主副本将数据更改作为复制的日志或更新流发送给所有从副本。每个从副本获得更新数据之后将其应用到本地,且严格保持与主副本相同的写入顺序。

  3. 读请求既可以在主副本上,也可以在从副本上执行。

再次强调,只有主副本才可以接受写请求:从客户端的角度来看,从副本都是只读的。如果由于某种原因,例如与主节点之间的网络中断而导致主节点无法连接,主从复制方案就会影响所有的写入操作。

主从复制系统

支持主从复制的系统:

  • 数据库:MySql、PostgreSQL(9.0 版本后)、MongoDB 等
  • 消息队列:Kafka、RabbitMQ 等

同步复制与异步复制

主从复制——同步和异步

通常情况下,复制速度会非常快。但是,系统其实并没有保证一定会在多长时间内完成复制。有些情况下,从节点可能落后主节点几分钟甚至更长时间,例如,由于从节点刚从故障中恢复,或者系统已经接近最大设计上限,或者节点之间的网络出现问题。

  • 同步复制的优点:一旦向用户确认,从节点可以明确保证完成了与主节点的更新同步,数据已经处于最新版本。万一主节点发生故障,总是可以在从节点继续访问最新数据。
  • 同步复制的缺点:如果同步的从节点无法完成确认(例如由于从节点发生崩溃,或者网络故障,或任何其他原因),写入就不能视为成功。主节点会阻塞其后所有的写操作,直到同步副本确认完成。

因此,把所有从节点都配置为同步复制有些不切实际。因为这样的话,任何一个同步节点的中断都会导致整个系统更新停滞不前。实际应用中,推荐的同步模式(也是很多数据库的选择)是:只要有一个从节点或半数以上的从节点同步成功,就视为同步,直接返回结果;剩下的节点都通过异步方式同步。万一同步的从节点变得不可用或性能下降,则将另一个异步的从节点提升为同步模式。这样可以保证至少有两个节点(即主节点和一个同步从节点)拥有最新的数据副本。这种配置有时也称为半同步

主从复制还经常会被配置为全异步模式。

  • 异步复制的优点:不管从节点上数据多么滞后,主节点总是可以继续响应写请求,系统的吞吐性能更好。
  • 异步复制的缺点:如果主节点发生故障且不可恢复,则所有尚未复制到从节点的写请求都会丢失。这意味着即使向客户端确认了写操作,却无法保证数据的持久化。

配置新的从节点

当如果出现以下情况时,如需要增加副本数以提高容错能力,或者替换失败的副本,就需要考虑增加新的从节点。但如何确保新的从节点和主节点保持数据一致呢?

简单地将数据文件从一个节点复制到另一个节点通常是不够的。主要是因为客户端仍在不断向数据库写入新数据,数据始终处于不断变化之中,因此常规的文件拷贝方式将会导致不同节点上呈现出不同时间点的数据。

另一种思路是:考虑锁定数据库(使其不可写)来使磁盘上的文件保持一致,但这会违反高可用的设计目标。在不停机、数据服务不中断的前提下,也有一种可行性复制方案,其主要操作步骤如下:

  1. 在某个时间点对主节点的数据副本产生一个一致性快照,这样避免长时间锁定整个数据库。目前大多数数据库都支持此功能,快照也是系统备份所必需的。而在某些情况下,可能需要第三方工具,如 MySQL 的 innobackupex。
  2. 将此快照拷贝到新的从节点。
  3. 从节点连接到主节点并请求快照点之后所发生的数据更改日志。因为在第一步创建快照时,快照与系统复制日志的某个确定位置相关联,这个位置信息在不同的系统有不同的称呼,如 PostgreSQL 将其称为“ log sequence number” (日志序列号),而 MySQL 将其称为“ binlog coordinates ” 。
  4. 获得日志之后,从节点来应用这些快照点之后所有数据变更,这个过程称之为追赶。接下来,它可以继续处理主节点上新的数据变化。井重复步骤 1 ~步骤 4 。

在不同系统中,建立新的从副本具体操作步骤可能有所不同。

处理节点失效

系统中的任何节点都可能因故障或者计划内的维护(例如重启节点以安装内核安全补丁)而导致中断甚至停机。如果能够在不停机的情况下重启某个节点,这会对运维带来巨大的便利。我们的目标是,尽管个别节点会出现中断,但要保持系统总体的持续运行,并尽可能减小节点中断带来的影响。

如何通过主从复制技术来实现系统高可用呢?

从节点失效:追赶式恢复

从节点的本地磁盘上都保存了副本收到的数据变更日志。如果从节点发生崩溃,然后顺利重启,或者主从节点之间的网络发生暂时中断(闪断),则恢复比较容易,根据副本的复制日志,从节点可以知道在发生故障之前所处理的最后一笔事务,然后连接到主节点,并请求自那笔事务之后中断期间内所有的数据变更。在收到这些数据变更日志之后,将其应用到本地来追赶主节点。之后就和正常情况一样持续接收来自主节点数据流的变化。

主节点失效:节点切换

选择某个从节点将其提升为主节点;客户端也需要更新,这样之后的写请求会发送给新的主节点,然后其他从节点要接受来自新的主节点上的变更数据,这一过程称之为切换。

故障切换可以手动进行,例如通知管理员主节点发生失效,采取必要的步骤来创建新的主节点;或者以自动方式进行。自动切换的步骤通常如下:

  1. 确认主节点失效。有很多种出错可能性,很难准确检测出问题的原因,所以大多数系统都采用了基于超时的机制:节点间频繁地互相发生发送心跳悄息,如果发现某一个节点在一段比较长时间内(例如 30s )没有响应,即认为该节点发生失效。
  2. 选举新的主节点。可以通过选举的方式(超过多数的节点达成共识)来选举新的主节点,或者由之前选定的某控制节点来指定新的主节点。候选节点最好与原主节点的数据差异最小,这样可以最小化数据丢失的风险。让所有节点同意新的主节点是个典型的共识问题。
  3. 重新配置系统使新主节点生效。客户端现在需要将写请求发送给新的主节点。如果原主节点之后重新上线,可能仍然自认为是主节点,而没有意识到其他节点已经达成共识迫使其下台。这时系统要确保原主节点降级为从节点,并认可新的主节点。

上述切换过程依然充满了很多变数:

  • 如果使用了异步复制,且失效之前,新的主节点并未收到原主节点的所有数据;在选举之后,原主节点很快又重新上线并加入到集群,接下来的写操作会发生什么?新的主节点很可能会收到冲突的写请求,这是因为原主节点未意识的角色变化,还会尝试同步其他从节点,但其中的一个现在已经接管成为现任主节点。常见的解决方案是,原主节点上未完成复制的写请求就此丢弃,但这可能会违背数据更新持久化的承诺。
  • 如果在数据库之外有其他系统依赖于数据库的内容并在一起协同使用,丢弃数据的方案就特别危险。例如,在 GitHub 的一个事故中,某个数据并非完全同步的 MySQL 从节点被提升为主副本,数据库使用了自增计数器将主键分配给新创建的行,但是因为新的主节点计数器落后于原主节点( 即二者并非完全同步),它重新使用了已被原主节点分配出去的某些主键,而恰好这些主键已被外部 Redis 所引用,结果出现 MySQL 和 Redis 之间的不一致,最后导致了某些私有数据被错误地泄露给了其他用户。
  • 在某些故障情况下,可能会发生两个节点同时-都自认为是主节点。这种情况被称为脑裂,它非常危险:两个主节点都可能接受写请求,并且没有很好解决冲突的办法,最后数据可能会丢失或者破坏。作为一种安全应急方案,有些系统会采取措施来强制关闭其中一个节点。然而,如果设计或者实现考虑不周,可能会出现两个节点都被关闭的情况。
  • 如何设置合适的超时来检测主节点失效呢? 主节点失效后,超时时间设置得越长也意味着总体恢复时间就越长。但如果超时设置太短,可能会导致很多不必要的切换。例如,突发的负载峰值会导致节点的响应时间变长甚至超肘,或者由于网络故障导致延迟增加。如果系统此时已经处于高负载压力或网络已经出现严重拥塞,不必要的切换操作只会使总体情况变得更糟。

复制日志的实现

基于语句的复制

最简单的情况,主节点记录所执行的每个写请求(操作语句)井将该操作语句作为日志发送给从节点。对于关系数据库,这意味着每个 INSERT 、UPDATE 或 DELETE 语句都会转发给从节点,并且每个从节点都会分析井执行这些 SQU 吾句,如同它们是来自客户端那样。

听起来很合理也不复杂,但这种复制方式有一些不适用的场景:

  • 任何调用非确定性函数的语句,如 NOW() 获取当前时间,或 RAND() 获取一个随机数等,可能会在不同的副本上产生不同的值。
  • 如果语句中使用了自增列,或者依赖于数据库的现有数据(例如,UPDATE ... WHERE <某些条件>),则所有副本必须按照完全相同的顺序执行,否则可能会带来不同的结果。进而,如果有多个同时并发执行的事务时,会有很大的限制。
  • 有副作用的语句(例如,触发器、存储过程、用户定义的函数等),可能会在每个副本上产生不同的副作用。

有可能采取一些特殊措施来解决这些问题,例如,主节点可以在记录操作语句时将非确定性函数替换为执行之后的确定的结果,这样所有节点直接使用相同的结果值。但是,这里面存在太多边界条件需要考虑,因此目前通常首选的是其他复制实现方案。

MySQL 5.1 版本之前采用基于操作语句的复制。现在由于逻辑紧凑,依然在用,但是默认情况下,如果语句中存在一些不确定性操作,则 MySQL 会切换到基于行的复制(稍后讨论)。VoltDB 使用基于语句的复制,它通过事务级别的确定性来保证复制的安全。

基于预写日志(WAL)传输

通常每个写操作都是以追加写的方式写入到日志中:

  • 对于日志结构存储引擎,日志是主要的存储方式。日志段在后台压缩井支持垃圾回收。
  • 对于采用覆写磁盘的 BTree 结构,每次修改会预先写入日志,如系统发生崩溃,通过索引更新的方式迅速恢复到此前一致状态。

不管哪种情况,所有对数据库写入的字节序列都被记入日志。因此可以使用完全相同的日志在另一个节点上构建副本:除了将日志写入磁盘之外,主节点还可以通过网络将其发送给从节点。

PostgreSQL 、Oracle 以及其他系统等支持这种复制方式。其主要缺点是日志描述的数据结果非常底层:一个 WAL 包含了哪些磁盘块的哪些字节发生改变,诸如此类的细节。这使得复制方案和存储引擎紧密耦合。如果数据库的存储格式从一个版本改为另一个版本,那么系统通常无能支持主从节点上运行不同版本的软件。

看起来这似乎只是个有关实现方面的小细节,但可能对运营产生巨大的影响。如果复制协议允许从节点的软件版本比主节点更新,则可以实现数据库软件的不停机升级:首先升级从节点,然后执行主节点切换,使升级后的从节点成为新的主节点。相反,复制协议如果要求版本必须严格一致(例如 WALf 专输),那么就势必以停机为代价。

基于行的逻辑日志复制

如果复制和存储引擎采用不同的日志格式,这样复制与存储的逻辑就可以剥离。这种复制日志称为逻辑日志,以区分物理存储引擎的数据表示。

关系数据库的逻辑日志通常是指一系列记录来描述数据表行级别的写请求:

  • 对于行插入,日志包含所有相关列的新值。
  • 对于行删除,日志里有足够的信息来唯一标识已删除的行,通常是靠主键,但如果表上没有定义主键,就需要记录所有列的旧值。
  • 对于行更新,日志包含足够的信息来唯一标识更新的行,以及所有列的新值(或至少包含所有已更新列的新值)。

如果一条事务涉及多行的修改,则会产生多个这样的日志记录,并在后面跟着一条记录,指出该事务已经提交。MySQL 的二进制日志 binlog (当配置为基于行的复制时)使用该方式。

由于逻辑日志与存储引擎逻辑解耦,因此可以更容易地保持向后兼容,从而使主从节点能够运行不同版本的软件甚至是不同的存储引擎。

对于外部应用程序来说,逻辑日志格式也更容易解析。

基于触发器的复制

在某些情况下,我们可能需要更高的灵活性。例如,只想复制数据的一部分,或者想从一种数据库复制到另一种数据库,或者需要订制、管理冲突解决逻辑,则需要将复制控制交给应用程序层。

有一些工具,可以通过读取数据库日志让应用程序获取数据变更。另一种方法则是借助许多关系数据库都支持的功能:触发器和存储过程。

触发器支持注册自己的应用层代码,使得当数据库系统发生数据更改(写事务)时自动执行上述自定义代码。通过触发器技术,可以将数据更改记录到一个单独的表中,然后外部处理逻辑访问该表,实施必要的自定义应用层逻辑,例如将数据更改复制到另一个系统。Oracle 的 Databus 和 Postgres 的 Bucardo 就是这种技术的典型代表。基于触发器的复制通常比其他复制方式开销更高,也比数据库内置复制更容易出错,或者暴露一些限制。然而,其高度灵活性仍有用武之地。

复制滞后问题

主从复制要求所有写请求都经由主节点,而任何副本只能接受只读查询。对于读操作密集的负载(如 Web ),这是一个不错的选择:创建多个从副本,将读请求分发给这些从副本,从而减轻主节点负载井允许读取请求就近满足。

在这种扩展体系下,只需添加更多的从副本,就可以提高读请求的服务吞吐量。但是,这种方法实际上只能用于异步复制,如果试图同步复制所有的从副本,则单个节点故障或网络中断将使整个系统无法写入。而且节点越多,发生故障的概率越高,所以完全同步的配置现实中反而非常不可靠。

不幸的是,如果一个应用正好从一个异步的从节点读取数据,而该副本落后于主节点,则应用可能会读到过期的信息。这会导致数据库中出现明显的不一致:由于并非所有的写入都反映在从副本上,如果同时对主节点和从节点发起相同的查询,可能会得到不同的结果。经过一段时间之后,从节点最终会赶上并与主节点数据保持一致。这种效应也被称为最终一致性

写后读一致性

许多应用让用户提交一些数据,接下来查看他们自己所提交的内容。例如客户数据库中的记录,亦或者是讨论主题的评论等。提交新数据须发送到主节点,但是当用户读取数据时,数据可能来自从节点。这对于读多写少的场景是个非常合适的方案。

然而对于异步复制存在这样一个问题,如图所示,用户在写入不久即查看数据,则新数据可能尚未到达从节点。对用户来讲,看起来似乎是刚刚提交的数据丢失了,显然用户不会高兴。

img

对于这种情况,我们需要读写一致性。该机制保证如果用户重新加载页面,他们总能看到自己最近提交的更新。但对其他用户则没有任何保证,这些用户的更新可能会在稍后才能刷新看到。如何实现呢?有以下几种可行性方案:

  • 如果用户访问可能会被修改的内容,从主节点读取; 否则,在从节点读取。这背后就要求有一些方法在实际执行查询之前,就已经知道内容是否可能会被修改。例如,社交网络上的用户首页信息通常只能由所有者编辑,而其他人无法编辑。因此,这就形成一个简单的规则:总是从主节点读取用户自己的首页配置文件,而在从节点读取其他用户的配置文件。
  • 如果应用的大部分内容都可能被所有用户修改,那么上述方法将不太有效,它会导致大部分内容都必须经由主节点,这就丧失了读操作的扩展性。此时需要其他方案来判断是否从主节点读取。例如,跟踪最近更新的时间,如果更新后一分钟之内,则总是在主节点读取;井监控从节点的复制滞后程度,避免从那些滞后时间超过一分钟的从节点读取。
  • 客户端还可以记住最近更新时的时间戳,井附带在读请求中,据此信息,系统可以确保对该用户提供读服务时都应该至少包含了该时间戳的更新。如果不够新,要么交由另一个副本来处理,要么等待直到副本接收到了最近的更新。时间戳可以是逻辑时间戳(例如用来指示写入顺序的日志序列号)或实际系统时钟(在这种情况下,时钟同步又称为一个关键点)。
  • 如果副本分布在多数据中心(例如考虑与用户的地理接近,以及高可用性),情况会更复杂些。必须先把请求路由到主节点所在的数据中心(该数据中心可能离用户很远)。

如果同一用户可能会从多个设备访问数据,情况会更加复杂。此时,要提供跨设备的写后读一致性,即如果用户在某设备上输入了一些信息然后在另一台设备商查看,也应该看到刚刚所输入的内容。在这种情况下,还有一些需要考虑的问题:

  • 记住用户上次更新时间戳的方法实现起来会比较困难,因为在一台设备上运行的代码完全无法知道在其他设备上发生了什么。此时,元数据必须做到全局共享。
  • 如果副本分布在多数据中心,无法保证来自不同设备的连接经过路由之后都到达同一个数据中心。例如,用户的台式计算机使用了家庭宽带连接,而移动设备则使用蜂窝数据网络,不同设备的网络连接线路可能完全不同。如果方案要求必须从主节点读取,则首先需要想办毡确保将来自不同设备的请求路由到同一个数据中心。

单调读

假定用户从不同副本进行了多次读取,如图所示,用户刷新一个网页,读请求可能被随机路由到某个从节点。用户 2345 先后在两个从节点上执行了两次完全相同的查询(先是少量滞后的节点,然后是滞后很大的从节点),则很有可能出现以下情况。第一个查询返回了最近用户 1234 所添加的评论,但第二个查询因为滞后的原因,还没有收到更新因而返回结果是空。用户看到了最新内容之后又读到了过期的内容,好像时间被回拨,此时需要单调读一致性。

img

单调读一致性可以确保不会发生这种异常。这是一个比强一致性弱,但比最终一致性强的保证。当读取数据时,单调读保证,如果某个用户依次进行多次读取,则他绝不会看到回攘现象,即在读取较新值之后又发生读旧值的情况。

实现单调读的一种方式是,确保每个用户总是从固定的同一副本执行读取(而不同的用户可以从不同的副本读取)。例如,基于用户 ID 的哈希的方怯而不是随机选择副本。但如果该副本发生失效,则用户的查询必须重新路由到另一个副本。

前缀一致读

前缀一致读:对于一系列按照某个顺序发生的写请求,那么读取这些内容时也会按照当时写入的顺序。

如果数据库总是以相同的顺序写入,则读取总是看到一致的序列,不会发生这种反常。然而,在许多分布式数据库中,不同的分区独立运行,因此不存在全局写入顺序。这就导致当用户从数据库中读数据时,可能会看到数据库的某部分旧值和另一部分新值。

img

一个解决方案是确保任何具有因果顺序关系的写入都交给一个分区来完成,但该方案真实实现效率会大打折扣。现在有一些新的算法来显式地追踪事件因果关系。

复制滞后的解决方案

使用最终一致性系统时,最好事先就思考这样的问题:如果复制延迟增加到几分钟甚至几小时,那么应用层的行为会是什么样子?如果这种情况不可接受,那么在设计系统肘,就要考虑提供一个更强的一致性保证,比如写后读; 如果系统设计时假定是同步复制,但最终它事实上成为了异步复制,就可能会导致灾难性后果。

在应用层可以提供比底层数据库更强有力的保证。例如只在主节点上进行特定类型的读取,而代价则是,应用层代码中处理这些问题通常会非常复杂,且容易出错。

如果应用程序开发人员不必担心这么多底层的复制问题,而是假定数据库在“做正确的事情”,情况就变得很简单。而这也是事务存在的原因,事务是数据库提供更强保证的一种方式。

单节点上支持事务已经非常成熟。然而,在转向分布式数据库(即支持复制和分区)的过程中,有许多系统却选择放弃支持事务,并声称事务在性能与可用性方面代价过高,所以选择了最终一致性。

多主复制

主从复制方法较为常见,但存在一个明显的缺点:系统只有一个主节点,而所有写入都必须经由主节点。如果由于某种原因,例如与主节点之间的网络中断而导致主节点无法连接,主从复制方案就会影响所有的写入操作。

对主从复制模型进行自然的扩展,则可以配置多个主节点,每个主节点都可以接受写操作,后面复制的流程类似:处理写的每个主节点都必须将该数据更改转发到所有其他节点。这就是多主节点( 也称为主-主,或主动/主动)复制。此时,每个主节点还同时扮演其他主节点的从节点。

适用场景

在一个数据中心内部使用多主节点基本没有太大意义,其复杂性已经超过所能带来的好处。

但是,以下场景这种配置则是合理的:

  • 多数据中心
  • 离线客户端操作
  • 协作编辑

多数据中心

为了容忍整个数据中心级别故障或者更接近用户,可以把数据库的副本横跨多个数据中心。而如果使用常规的基于主从的复制模型,主节点势必只能放在其中的某一个数据中心,而所有写请求都必须经过该数据中心。

有了多主节点复制模型,则可以在每个数据中心都配置主节点。在每个数据中心内,采用常规的主从复制方案;而在数据中心之间,由各个数据中心的主节点来负责同其他数据中心的主节点进行数据的交换、更新。

img

部署单主节点的主从复制方案与多主复制方案之间的差异

  • 性能:对于主从复制,每个写请求都必须经由广域网传送至主节点所在的数据中心。这会大大增加写入延迟,井基本偏离了采用多数据中心的初衷(即就近访问)。而在多主节点模型中,每个写操作都可以在本地数据中心快速响应,然后采用异步复制方式将变化同步到其他数据中心。因此,对上层应用有效屏蔽了数据中心之间的网络延迟,使得终端用户所体验到的性能更好。
  • 容忍数据中心失效:对于主从复制,如果主节点所在的数据中心发生故障,必须切换至另一个数据中心,将其中的一个从节点被提升为主节点。在多主节点模型中,每个数据中心则可以独立于其他数据中心继续运行,发生故障的数据中心在恢复之后更新到最新状态。
  • 容忍网络问题:数据中心之间的通信通常经由广域网,它往往不如数据中心内的本地网络可靠。对于主从复制模型,由于写请求是同步操作,对数据中心之间的网络性能和稳定性等更加依赖。多主节点模型则通常采用异步复制,可以更好地容忍此类问题,例如临时网络闪断不会妨碍写请求最终成功。

有些数据库己内嵌支持了多主复制,但有些则借助外部工具来实现,例如 MySQL 的 Tungsten Replicator,PostgreSQL BDR 以及 Oracle GoldenGate。

多主复制的缺点:不同的数据中心可能会同时修改相同的数据,因而必须解决潜在的写冲突。

离线客户端操作

另一种多主复制比较适合的场景是,应用在与网络断开后还需要继续工作。在离线状态下进行的任何更改,会在设备下次上线时,与服务器一级其他设备同步。

这种情况下,每个设备都有一个充当主节点的本地数据库(用来接受写请求),然后在所有设备之间采用异步方式同步这些多主节点上的副本,同步滞后可能是几小时或者数天,具体时间取决于设备何时可以再次联网。

从架构层面来看,上述设置基本上等同于数据中心之间的多主复制,只不过是个极端情况,即一个设备就是数据中心,而且它们之间的网络连接非常不可靠。多个设备同步日历的例子表明,多主节点可以得到想要的结果,但中间过程依然有很多的未知数。

有一些工具可以使多主配置更为容易,如 CouchDB 就是为这种操作模式而设计的。

协作编辑

实时协作编辑应用程序允许多个用户同时编辑文档。例如,Etherpad 和 Google Docs 允许多人同时编辑文本文档或电子表格。

我们通常不会将协作编辑完全等价于数据库复制问题,但二者确实有很多相似之处。当一个用户编辑文档时· ,所做的更改会立即应用到本地副本( Web 浏览器或客户端应用程序),然后异步复制到服务器以及编辑同一文档的其他用户。

如果要确保不会发生编辑冲突,则应用程序必须先将文档锁定,然后才能对其进行编辑。如果另一个用户想要编辑同一个文档,首先必须等到第一个用户提交修改并释放锁。这种协作模式相当于主从复制模型下在主节点上执行事务操作。

为了加快协作编辑的效率,可编辑的粒度需要非常小。例如,单个按键甚至是全程无锁。然而另一方面,也会面临所有多主复制都存在的挑战,即如何解决冲突。

处理写冲突

多主复制的最大问题是可能发生写冲突。

例如,两个用户同时编辑 Wiki 页面,如图所示。用户 1 将页面的标题从 A 更改为 B,与此同时用户 2 将标题从 A 更改为 C。每个用户的更改都成功地提交到本地主节点。但是,当更改被异步复制到对方时,会发现存在冲突。注意:正常情况下的主从复制不会出现这种情况。

img

同步与异步冲突检测

如果是主从复制数据库,第二个写请求要么会被阻塞直到第一个写完成,要么被中止(用户必须重试) 。然而在多主节点的复制模型下,这两个写请求都是成功的,井且只能在稍后的时间点上才能异步检测到冲突,那时再要求用户层来解决冲突为时已晚。

理论上,也可以做到同步冲突检测,即等待写请求完成对所有副本的同步,然后再通知用户写入成功。但是,这样做将会失去多主节点的主要优势:允许每个主节点独立接受写请求。如果确实想要同步方式冲突检测,或许应该考虑采用单主节点的主从复制模型。

避免冲突

处理冲突最理想的策略是避免发生冲突,即如果应用层可以保证对特定记录的写请求总是通过同一个主节点,这样就不会发生写冲突。现实中,由于不少多主节点复制模型所实现的冲突解决方案存在瑕疵,因此,避免冲突反而成为大家普遍推荐的首选方案。

但是,有时可能需要改变事先指定的主节点,例如由于该数据中心发生故障,不得不将流量重新路由到其他数据中心,或者是因为用户已经漫游到另一个位置,因而更靠近新数据中心。此时,冲突避免方式不再有效,必须有措施来处理同时写入冲突的可能性。

收敛于一致状态

对于主从复制模型,数据更新符合顺序性原则,即如果同一个字段有多个更新,则最后一个写操作将决定该字段的最终值。

对于多主节点复制模型,由于不存在这样的写入顺序,所以最终值也会变得不确定。

实现收敛的冲突解决有以下可能的方式:

  • 给每个写入分配唯一的 ID ,例如,一个时间戳,一个足够长的随机数,一个 UUID 或者一个基于键-值的哈希,挑选最高 ID 的写入作为胜利者,并将其他写入丢弃。如果基于时间戳,这种技术被称为最后写入者获胜。虽然这种方法很流行,但是很容易造成数据丢失。
  • 为每个副本分配一个唯一的 ID ,井制定规则,例如序号高的副本写入始终优先于序号低的副本。这种方法也可能会导致数据丢失。
  • 以某种方式将这些值合并在一起。例如,按字母顺序排序,然后拼接在一起。
  • 利用预定义好的格式来记录和保留冲突相关的所有信息,然后依靠应用层的逻辑,事后解决冲突(可能会提示用户) 。

自定义冲突解决逻辑

解决冲突最合适的方式可能还是依靠应用层,所以大多数多主节点复制模型都有工具来让用户编写应用代码来解决冲突。可以在写入时或在读取时执行这些代码逻辑:

  • 在写入时执行:只要数据库系统在复制变更日志时检测到冲突,就会调用应用层的冲突处理程序。
  • 在读取时执行:当检测到冲突时,所有冲突写入值都会暂时保存下来。下一次读取数据时,会将数据的多个版本读返回给应用层。应用层可能会提示用户或自动解决冲突,井将最后的结果返回到数据库。

注意,冲突解决通常用于单个行或文档,而不是整个事务。因此,如果有一个原子事务包含多个不同写请求,每个写请求仍然是分开考虑来解决冲突。

拓扑结构

如果存在两个以上的主节点,则存在多种可能的复制拓扑结构。

img

最常见的拓扑结构是全部-至-全部,每个主节点将其写入同步到其他所有主节点。而其他一些拓扑结构也有普遍使用,例如,默认情况下 MySQL 只支持环形拓扑结构,其中的每个节点接收来自前序节点的写入,并将这些写入(加上自己的写入)转发给后序节点。另一种流行的拓扑是星形结构:一个指定的根节点将写入转发给所有其他节点。星形拓扑还可以推广到树状结构。

在环形和星形拓扑中,写请求需要通过多个节点才能到达所有的副本,即中间节点需要转发从其他节点收到的数据变更。为防止无限循环,每个节点需要赋予一个唯一的标识符,在复制日志中的每个写请求都标记了已通过的节点标识符。如果某个节点收到了包含自身标识符的数据更改,表明该请求已经被处理过,因此会忽略此变更请求,避免重复转发

环形和星形拓扑的问题是,如果某一个节点发生了故障,在修复之前,会影响其他节点之间复制日志的转发。可以采用重新配置拓扑结构的方法暂时排除掉故障节点。在大多数部署中,这种重新配置必须手动完成。而对于链接更密集的拓扑(如全部到全部),消息可以沿着不同的路径传播,避免了单点故障,因而有更好的容错性。

但另一方面,全链接拓扑也存在一些自身的问题。主要是存在某些网络链路比其他链路更快的情况(例如由于不同网络拥塞),从而导致复制日志之间的覆盖。

如下图所示,客户端 A 向主节点 1 的表中首先插入一行,然后客户端 B 在主节点 3 上对行记录进行更新。而在主节点 2 上,由于网络原因可能出现意外的写日志复制顺序,例如它先接收到了主节点 3 的更新日志,之后才接收到主节点 1 的插入日志。

img

这里涉及到一个因果关系问题,类似于在前面“前缀一致读”所看到的:更新操作一定是依赖于先前完成的插入,因此我们要确保所有节点上一定先接收插入日志,然后再处理更新。在每笔写日志里简单地添加时间戳还不够,主要因为无法确保时钟完全同步,因而无法在主节点 2 上正确地排序所收到日志。

为了使得日志消息正确有序,可以使用一种称为版本向量的技术,稍后将讨论这种技术(参见“检测并发写入”)。需要指出,冲突检测技术在许多多主节点复制系统中的实现还不够完善。

无主复制

单主节点和多主节点复制,都是基于这样一种核心思路,即客户端先向某个节点(主节点)发送写请求,然后数据库系统负责将写请求复制到其他副本。由主节点决定写操作的顺序,从节点按照相同的顺序来应用主节点所发送的写日志。

一些数据存储系统则采用了不同的设计思路:选择放弃主节点,允许任何副本直接接受来自客户端的写请求。对于某些无主节点系统实现,客户端直接将其写请求发送到多副本,而在其他一些实现中,由一个协调者节点代表客户端进行写人,但与主节点的数据库不同,协调者井不负责写入顺序的维护。

节点失效时写入数据库

假设一个三副本数据库,其中一个副本当前不可用。在基于主节点复制模型下,如果要继续处理写操作,则需要执行切换操作。

对于无主节点配置,则不存在这样的切换操作。用户将写请求并行发送到三个副本,有两个可用副本接受写请求,而不可用的副本无法处理该写请求。如果假定三个副本中有两个成功确认写操作,用户收到两个确认的回复之后,即可认为写入成功。客户完全可以忽略其中一个副本无法写入的情况。

img

失效的节点之后重新上线,而客户端又开始从中读取内容。由于节点失效期间发生的任何写入在该节点上都尚未同步,因此读取可能会得到过期的数据。

为了解决这个问题,当一个客户端从数据库中读取数据时,它不是向一个副本发送请求,而是并行地发送到多个副本。客户端可能会得到不同节点的不同响应,包括某些节点的新值和某些节点的旧值。可以采用版本号技术确定哪个值更新(参见后面的“检测并发写入”)。

读修复与反熵

复制模型应确保所有数据最终复制到所有的副本。当一个失效的节点重新上线之后,如何赶上中间错过的那些写请求呢?

有以下两种机制:

  • 读修复 - 当客户端井行读取多个副本时,可以检测到过期的返回值。然后将新值写入到过期的副本中。这种方法主要适合那些被频繁读取的场景。
  • 反熵 - 利用后台进程不断查找副本间的数据差异,将任何缺少的数据从一个副本复制到另一个副本。与基于主节点复制的复制日志不同,反熵过程并不保证以特定的顺序复制写入,并且会引入明显的同步滞后。

读写 quorum

我们知道,成功的写操作要求三个副本中至少两个完成,这意味着至多有一个副本可能包含旧值。因此,在读取时需要至少向两个副本发起读请求,通过版本号可以确定一定至少有一个包含新值。如果第三个副本出现停机或响应缓慢,则读取仍可以继续并返回最新值。

把上述道理推广到一般情况,如果有 n 个副本,写入需要 w 个节点确认,读取必须至少查询 r 个节点,则只要 w + r > n,读取的节点中一定会包含最新值。例如在前面的例子中,n = 3,w = 2,r = 2。满足上述这些 r、w 值的读/写操作称之为法定票数读或法定票数写。也可以认为 r 和 w 是用于判定读、写是否有效的最低票数。

参数 n、w 和 r 通常是可配置的,一个常见的选择是设置 n 为某奇数,w = r = (n + 1) / 2(向上舍入)。也可以根据自己的需求灵活调整这些配置。例如,对于读多写少的负载,设置 w = n 和 r = 1 比较合适,这样读取速度更快,但是一个失效的节点就会使得数据库所有写入因无法完成 quorum 而失败。

quorum 一致性的局限性

通常,设定 r 和 w 为简单多数(多于 n / 2)节点,即可确保 w + r > n,且同时容忍多达 n / 2 个节点故障。但是,quorum 不一定非得是多数,读和写的节点集中有一个重叠的节点才是最关键的

也可以将 w 和 r 设置为较小的数字,从而让 w + r <= n。此时,读取和写入操作仍会被发送到 n 个节点,但只需等待更少的节点回应即可返回。

由于 w 和 r 配置的节点数较小,读取请求当中可能恰好没有包含新值的节点,因此最终可能会返回一个过期的旧值。好的一方面是,这种配置可以获得更低的延迟和更高的可用性,例如网络中断,许多副本变得无法访问,相比而言有更高的概率继续处理读取和写入。只有当可用的副本数已经低于 w 或 r 时,数据库才会变得无法读/写,即处于不可用状态。

即使在 w + r > n 的情况下,也可能存在返回旧值的边界条件。这主要取决于具体实现,可能的情况包括:

  • 如果采用了 sloppy quorum(参阅后面的“宽松的 quorum 与数据回传”),写操作的 w 节点和读取的 r 节点可能完全不同,因此无法保证读写请求一定存在重叠的节点。
  • 如果两个写操作同时发生,则无法明确先后顺序。这种情况下,唯一安全的解决方案是合并并发写入(参见前面的“处理写冲突”)。如果根据时间戳挑选胜者,则由于时钟偏差问题,某些写入可能会被错误地抛弃。
  • 如果写操作与读操作同时发生,写操作可能仅在一部分副本上完成。此时,读取时返回旧值还是新值存在不确定性。
  • 如果某些副本上已经写入成功,而其他一些副本发生写入失败(例如磁盘已满),且总的成功副本数少于 w,那些已成功的副本上不会做回滚。这意味着尽管这样的写操作被视为失败,后续的读操作仍可能返回新值。
  • 如果具有新值的节点后来发生失效,但恢复数据来自某个旧值,则总的新值副本数会低于 w,这就打破了之前的判定条件。
  • 即使一切工作正常,也会出现一些边界情况,如一致性与共识中所介绍的“可线性化与 quorum”。

建议最好不要把参数 w 和 r 视为绝对的保证,而是一种灵活可调的读取新值的概率。

这里通常无法得到前面的“复制滞后问题”中所罗列的一致性保证,包括写后读、单调读、前缀一致读等,因此前面讨论种种异常同样会发生在这里。如果确实需要更强的保证,需要考虑事务与共识问题。

宽松的 quorum 与数据回传

quorum 并不总如期待的那样提供高容错能力。一个网络中断可以很容易切断一个客户端到多数数据库节点的连接。尽管这些集群节点是活着的,而且其他客户端也确实可以正常连接,但是对于断掉连接的客户端来讲,情况无疑等价于集群整体失效。这种情况下,很可能无法满足最低的 w 和 r 所要求的节点数,因此导致客户端无法满足 quorum 要求。

在一个大规模集群中(节点数远大于 n 个),客户可能在网络中断期间还能连接到某些数据库节点,但这些节点又不是能够满足数据仲裁的那些节点。此时,我们是否应该接受该写请求,只是将它们暂时写入一些可访问的节点中?(这些节点并不在 n 个节点集合中)。

这种方案称之为宽松的仲裁:写入和读取仍然需要 w 和 r 个成功的响应,但包含了那些并不在先前指定的 n 个节点。一旦网络问题得到解决,临时节点需要把接收到的写入全部发送到原始主节点上。这就是所谓的数据回传。

可以看出,sloppy quorum 对于提高写入可用性特别有用:要有任何 w 个节点可用,数据库就可以接受新的写入。然而这意味着,即使满足 w + r > n,也不能保证在读取某个键时,一定能读到最新值,因为新值可能被临时写入 n 之外的某些节点且尚未回传过来。

检测并发写

无主复制数据库允许多个客户端对相同的主键同时发起写操作,即使采用严格的 quorum 机制也可能会发生写冲突。这与多主复制类似,此外,由于读时修复或者数据回传也会导致并发写冲突。

一个核心问题是,由于网络延迟不稳定或者局部失效,请求在不同的节点上可能会呈现不同的顺序。如图所示,对于包含三个节点的数据系统,客户端 A 和 B 同时向主键 X 发起写请求:

img

  • 节点 1 收到来自客户端 A 的写请求,但由于节点失效,没有收到客户端 B 的写请求。
  • 节点 2 首先收到 A 的写请求,然后是 B 的写请求。
  • 节点 3 首先收到 B 的写请求,然后是 A 的写请求。

如果节点每当收到新的写请求时就简单地覆盖原有的主键,那么这些节点将永久无法达成一致。我们知道副本应该收敛于相同的内容,这样才能达成最终一致。但如何才能做到呢?如果不想丢失数据,必须了解很多关于数据库内部冲突处理的机制。

我们已经在前面的“处理写冲突”简要介绍了一些解决冲突的技巧,现在我们来更详细地探讨这个问题。

最后写入者获胜(丢弃并发写入)

一种实现最终收敛的方法是,每个副本总是保存最新值,允许覆盖并丢弃旧值。那么,假定每个写请求都最终同步到所有副本,只要我们有一个明确的方法来确定哪一个写入是最新的,则副本可以最终收敛到相同的值。

这个想法其实有些争议,关键点在于前面所提到关于如何定义“最新”。不过即使无法确定写请求的“自然顺序”,我们可以强制对其排序。例如,为每个写请求附加一个时间戳,然后选择最新即最大的时间戳,丢弃较早时间戳的写入。这个冲突解决算法被称为最后写入者获胜(last write wins,LWW)。

LWW 可以实现最终收敛的目标,但是以牺牲数据持久性为代价。如果同一个主键有多个并发写,即使这些并发写都向客户端报告成功,但最后只有一个写入值会存活下来,其他的将被系统默默丢弃。在一些场景如缓存系统,覆盖写是可以接受的。如果覆盖、丢失数据不可接受,则 LWW 并不是解决冲突很好的选择。

要确保 LWW 安全无副作用的唯一方法是,只写入一次然后写入值视为不可变,这样就避免了对同一个主键的并发写。例如,Cassandra 的一个推荐使用方法就是采用 UUID 作为主键,这样每个写操作都针对的不同的、系统唯一的主键。

Happens-before 关系和并发

如果 B 知道 A,或者依赖于 A,或者以某种方式在 A 基础上构建,则称操作 A 在操作 B 之前发生。这是定义何为并发的关键。事实上,我们也可以简单地说,如果两个操作都不在另一个之前发生,那么操作是并发的。

因此,对于两个操作 A 和 B,一共存在三种可能性,我们需要的是一个算法来判定两个操作是否并发。如果一个操作发生在另一个操作之前,则后面的操作可以覆盖较早的操作。如果属于并发,就需要解决潜在的冲突问题。

确定前后关系

我们来看一个确定操作并发性的算法,即两个操作究竟属于并发还是一个发生在另一个之前。简单起见,我们先从只有一个副本的数据库开始。

下图的例子是两个客户端同时向购物车添加商品。初始时购物车为空。然后两个客户端向数据库共发出五次写入操作:

img

  1. 客户端 1 首先将牛奶加入购物车。这是第一次写入该主键的值,服务器保存成功然后分配版本 1,服务器将值与版本号一起返回给该客户端 1。
  2. 客户端 2 将鸡蛋加入购物车,此时它并不知道客户端 1 已添加了牛奶,而是认为鸡蛋是购物车中的唯一物品。服务器为此写入并分配版本 2,然后将鸡蛋和牛奶存储为两个单独的值,最后将这两个值与版本号 2 返回给客户端 2。
  3. 客户端 1 也并不意识上述步骤 2,想要将面粉加入购物车,且以为购物车的内容应该是[牛奶,面粉],将此值与版本号 1 一起发送到服务器。服务器可以从版本号中知道[牛奶,面粉]的新值要取代先前值[牛奶],但值[鸡蛋]则是新的并发操作。因此,服务器将版本 3 分配给[牛奶,面粉]并覆盖版本 1 的[牛奶],同时保留版本 2 的值[鸡蛋],将二者同时返回给客户端 1。
  4. 客户端 2 想要加入火腿,也不知道客户端 1 刚刚加了面粉。其在最后一个响应中从服务器收到的两个值是[牛奶]和[蛋],现在合并这些值,并添加火腿形成一个新的值[鸡蛋,牛奶,火腿]。它将该值与前一个版本号 2 一起发送到服务器。服务器检测到版本 2 会覆盖[鸡蛋],但与[牛奶,面粉]是同时发生,所以设置为版本 4 并将所有这些值发送给客户端 2。
  5. 最后,客户端 1 想要加培根。它以前在版本 3 中从服务器接收[牛奶,面粉]和[鸡蛋],所以合并这些值,添加培根,并将最终值[牛奶,面粉,鸡蛋,培根]连同版本号 3 来覆盖[牛奶,面粉],但与[鸡蛋,牛奶,火腿]并发,所以服务器会保留这些并发值。

上面操作之间的数据流可以通过下图展示。箭头表示某个操作发生在另一个操作之前,即后面的操作“知道”或是“依赖”于前面的操作。在这个例子中,因为总有另一个操作同时进行,所以每个客户端都没有时时刻刻和服务器上的数据保持同步。但是,新版本值最终会覆盖旧值,且不会发生已写入值的丢失。

img

服务器判断操作是否并发的依据主要依靠对比版本号,而并不需要解释新旧值本身。算法的工作流程如下:

  • 服务器为每个主键维护一个版本号,每当主键新值写入时递增版本号,并将新版本号与写入的值一起保存。
  • 当客户端读取主键时,服务器将返回所有(未被覆盖的)当前值以及最新的版本号。且要求写之前,客户必须先发送读请求。
  • 客户端写主键,写请求必须包含之前读到的版本号、读到的值和新值合并后的集合。写请求的响应可以像读操作一样,会返回所有当前值,这样就可以像购物车例子那样一步步链接起多个写入的值。
  • 当服务器收到带有特定版本号的写入时,覆盖该版本号或更低版本的所有值(因为知道这些值已经被合并到新传入的值集合中),但必须保存更高版本号的所有值(因为这些值与当前的写操作属于并发)。

合并同时写入的值

一个简单的合并方法是基于版本号或时间戳来选择最后一个值,但这意味着会丢失部分数据。所以,需要在程序中额外做一些工作。在应用代码中合并非常复杂且容易出错,因此可以设计一些专门的数据结构来自动执行合并。例如,Riak 支持成为 CRDT 一系列数据结构,以合理的方式高效自动合并,包括支持删除标记。

版本矢量

使用单个版本号来捕获操作间的依赖关系,当多个副本同时接受写入时,这是不够的。因此我们需要为每个副本和每个主键均定义一个版本号。每个副本在处理写入时增加自己的版本号,并且跟踪从其他副本看到的版本号。通过这些信息来指示要覆盖哪些值,该保留哪些并发值。

所有副本的版本号集合成为版本矢量。

参考资料

DevOps 简介

什么是 DevOps

什么是 DevOps?DevOps 集文化理念、实践和工具于一身,它强调团队授权、跨团队沟通和协作以及技术自动化,其最终目标是优化质量和交付

DevOps 理念,旨在打破开发工程师和运维工程师的壁垒,强调两个团队合而为一,在产品的整个生命周期(从开发、测试、部署再到运维、运营)内相互协作,工程师不再限于单一职能。

DevOps 始于 2007 年左右,当时的开发和运维对传统的软件开发模式提出了担忧:在这种模式下,编写代码的开发人员与负责部署的运维人员分开工作。 DevOps 一词是开发(development)和运维(operations)这两个词的组合,反映了将二者合而为一的过程。

DevOps 如何工作

DevOps 团队包括在整个产品生命周期中协同工作的开发人员和运维人员,以提高软件部署的速度和质量。这是一种新的工作方式,一种文化转变,对团队及其工作的组织具有重要意义。

在 DevOps 模型下,开发和运维团队不再“孤立”。有时,这两个团队甚至会合并为一个团队,工程师在整个应用程序生命周期中工作,需要具备从开发、测试到部署和运维的复合型能力。

DevOps 团队使用工具来自动化和优化流程,这有助于提高可靠性。 DevOps 工具链可帮助团队处理重要的 DevOps 基础知识,包括持续集成、持续交付、自动化和协作。

DevOps 价值观也适用于开发以外的团队。如果 QA、安全团队也和开发、运维团队紧密地结合在一起,贯穿产品的整个生命周期。此时,安全成为了所有 DevOps 团队成员的工作重点,此时可以称为为 “DevSecOps”。

DevOps 的生命周期

由于 DevOps 的连续性,可以使用无限循环来展示 DevOps 生命周期的各个阶段是如何相互关联的。尽管看起来是按顺序流动的,但循环象征着在整个生命周期中始终保持持续迭代。

DevOps 生命周期由六个阶段组成,分别代表开发和运维所需的流程、功能和工具。在每个阶段,团队协作和沟通以保持一致性、速度和质量。

img

图片来自 https://www.tasksgrid.com/devops-guide/

DevOps 的优势

  • 速度:应用 DevOps 可以更频繁地发布可交付成果,并且质量和稳定性也更高。高效的迭代,可以根据客户和市场反馈进行快速响应,以适应市场变化,有效推动业务发展。
  • 促进协作:DevOps 的基础是开发和运维之间的协作文化,两个团队紧密协作,共同承担诸多责任,并将各自的工作流程相互融合。这有助于减少效率低下的工作,同时节约大家的时间。
  • 快速发布:提高发布的频率和速度,以便能够更快速地进行创新并完善产品。您发布新功能和修复错误的速度越快,就越能快速地响应客户需求并建立竞争优势。持续集成和持续交付是自动执行软件发布流程(从构建到部署)的两项实践经验。
  • 可靠性:持续集成和持续部署等实践可检验程序变更后,功能是否正常,是否安全,从而提高软件产品的交付质量。监控和日志记录可以帮助团队实时了解服务当前的运行状态。
  • 规模:大规模运行和管理您的基础设施及开发流程。自动化和一致性可在降低风险的同时,帮助您有效管理复杂或不断变化的系统。例如,基础设施即代码能够帮助您以一种可重复且更有效的方式来管理部署、测试和生产环境。
  • 安全性:通过将自动实施的合规性策略、精细控制和配置管理技术集成到敏捷开发和 DevOps 工作流程中,使得产品内置了安全性。

DevOps 工具

DevOps 各生命周期阶段都有合适的工具可以作为解决方案。它们通过提高协作效率、减少上下文切换、引入自动化以及实现可监控来全方位增强 DevOps 实践。

DevOps 工具链通常遵循两种模式:完整解决方案或开放式工具链。

  • 完整解决方案实现了端到端的交付,流程很完备,但是一般难以兼容、集成第三方工具。
  • 开放式工具链允许使用不同的工具进行定制。

这两种方法各有利弊。

这里列举一些常见的 DevOps 工具:

  • 项目管理Jira
  • 文档管理Confluence
  • 代码管理GitlabGithub
  • CI/CDGitlabJenkins
  • 容器
    • Docker 将应用程序与该程序的依赖,打包在一个文件里面。运行这个文件,就会生成一个虚拟容器。程序在这个虚拟容器里运行,就好像在真实的物理机上运行一样。有了 Docker,就不用担心环境问题。
    • Kubernetes 是谷歌开源的容器集群管理系统 是用于自动部署,扩展和管理 Docker 应用程序的开源系统,简称 K8S。
  • 日志
    • ELK 技术栈,通过数据采集工具 Logstack、Beats 套件、日志存储、解析服务 ElasticSearch、日志可视化工具 Kibnana,形成了一套完整的端到端日志解决方案,深受业界好评。
  • 监控
    • ELK 的技术栈比较成熟,应用范围也比较广,除了可用作监控系统外,还可以用作日志查询和分析。
    • Prometheus 的独特之处在于它采用了拉数据的方式,对业务影响较小,同时也采用了时间序列数据库存储,而且支持独有的 PromQL 查询语言,功能强大而且简洁。
    • Grafana 是流行的监控数据分析和可视化套件。
    • Graphite 是基于时间序列数据库存储的监控系统,并且提供了功能强大的各种聚合函数比如 sum、average、top5 等可用于监控分析,而且对外提供了 API 也可以接入其他图形化监控系统如 Grafana。
  • 链路追踪
    • Zipkin:Zipkin 是 Twitter 开源的调用链分析工具,目前基于 spring-cloud-sleuth 得到了广泛的使用,特点是轻量,使用、部署简单。
    • Pinpoint:是韩国人开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI 功能强大,接入端无代码侵入。
    • SkyWalking:是本土开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI 功能较强,接入端无代码侵入。目前已加入 Apache 孵化器。
    • CAT:CAT 是美团点评开源的基于编码和配置的调用链分析,应用监控分析,日志采集,监控报警等一系列的监控平台工具。
  • 负载均衡
    • Nginx 可以作为四层或七层负载均衡器。
    • LVS 可以作为四层负载均衡器。其负载均衡的性能要优于 Nginx。
    • HAProxy 可以作为 HTTP 和 TCP 负载均衡器。
    • F5 作为硬件负载均衡
    • A10 作为硬件负载均衡
  • 网关
    • Kong 是一个云原生、快速、可扩展和分布式的微服务抽象层(也称为 API 网关,API 中间件)。
    • Zuul 是 Netflix 开源的一个 API 网关,Zuul 在云平台上提供动态路由,监控,弹性,安全等边缘服务的框架。
  • 告警:短信、邮件、企业聊天软件、OA

参考资料

《消息队列高手课》笔记

为什么需要消息队列?

消息队列的应用

  • 异步处理
  • 系统解耦
  • 流量削峰
  • 系统间通信
  • 数据缓冲
  • 数据一致性

该如何选择消息队列?

  • 是否开源:这决定了能否商用,所以最为重要。
  • 社区活跃度越高越好:高社区活跃度,一般保证了低 Bug 率,因为大部分 Bug,已经有人遇到并解决了。
  • 技术生态适配性:客户端对各种编程语言的支持。比如:如果使用 MQ 的都是 Java 应用,那么 ActiveMQ、RabbitMQ、RocketMQ、Kafka 都可以。如果需要支持其他语言,那么 RMQ 比较合适,因为它支持的编程语言比较丰富。如果 MQ 是应用于大数据或流式计算,那么 Kafka 几乎是标配。如果是应用于在线业务系统,那么 Kafka 就不合适了,可以考虑 RabbitMQ、 RocketMQ 很合适。
  • 高可用:应用于线上的准入标准。
  • 高性能:具备足够好的性能,能满足绝大多数场景的性能要求。
  • 可靠传输

主流 MQ

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级 万级 十万级 十万级,略高于 RocketMQ
topic 数量对吞吐量的影响 topic 可以达到几百、几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性 毫秒级 微秒级 毫秒级 毫秒级以内
可用性 高:基于主从架构实现高可用 同 ActiveMQ 非常高:分布式架构 非常高:分布式架构。每个数据都有多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到不丢失 同 RocketMQ
应用场景 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 适合在线业务 大数据、实时计算以及日志采集领域,应用最为广泛
流行度 不活跃 社区非常活跃 阿里出品,有非常活跃的中文社区 社区非常活跃
支持编程语言 非常多 Java Scala、Java
学习成本 采用 ErLang 开发,比较小众,不利于扩展和二次开发 采用 Java 开发,且贡献者多为中国人,容易读懂源码 使用 Scala 和 Java 开发,容易读懂源码
  • RabbitMQ
    • 优点
      • 支持的编程语言最多
      • 支持非常灵活的路由配置
    • 缺点
      • 对消息堆积的支持并不好
      • 性能差强人意
  • RocketMQ
    • 优点
      • 有着不错的性能,稳定性和可靠性
      • 支持事务
    • 缺点
      • 国外认同弱于其他流行 MQ
  • Kafka
    • 优点
      • 可靠、稳定、性能高
      • 技术生态最健全,尤其是在大数据和流计算领域
    • 缺点
      • 同步收发响应延时比较高,不太适合在线业务

消息模型:主题和队列有什么区别?

队列模型

队列是先进先出(FIFO, First-In-First-Out)的线性表(Linear List)。在具体应用中通常用链表或者数组来实现。队列只允许在后端(称为 rear)进行插入操作,在前端(称为 front)进行删除操作。

早期的消息队列,就是按照“队列”的数据结构来设计的。生产者(Producer)发消息就是入队操作,消费者(Consumer)收消息就是出队也就是删除操作,服务端存放消息的容器自然就称为“队列”。

如果有多个生产者往同一个队列里面发送消息,这个队列中可以消费到的消息,就是这些生产者生产的所有消息的合集。消息的顺序就是这些生产者发送消息的自然顺序。如果有多个消费者接收同一个队列的消息,这些消费者之间实际上是竞争的关系,每个消费者只能收到队列中的一部分消息,也就是说任何一条消息只能被其中的一个消费者收到。

如果需要将一份消息数据分发给多个消费者,要求每个消费者都能收到全量的消息。此时,单个队列就满足不了需求,一个可行的解决方式是,为每个消费者创建一个单独的队列,让生产者发送多份。显然这是个比较蠢的做法,同样的一份消息数据被复制到多个队列中会浪费资源,更重要的是,生产者必须知道有多少个消费者。为每个消费者单独发送一份消息,这实际上违背了消息队列“解耦”这个设计初衷。

发布 订阅模型(Publish-Subscribe Pattern)

在发布 - 订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。

队列模型和发布订阅模型最大的区别就是:一份消息数据能不能被消费多次的问题

RabbitMQ 的消息模型

在 RabbitMQ 中,Exchange 位于生产者和队列之间,生产者并不关心将消息发送给哪个队列,而是将消息发送给 Exchange,由 Exchange 上配置的策略来决定将消息投递到哪些队列中。

img

同一份消息如果需要被多个消费者来消费,需要配置 Exchange 将消息发送到多个队列,每个队列中都存放一份完整的消息数据,可以为一个消费者提供消费服务。这也可以变相地实现新发布 - 订阅模型中,“一份消息数据可以被多个订阅者来多次消费”这样的功能。

RocketMQ 的消息模型

RocketMQ 使用的消息模型是标准的发布 - 订阅模型。但是,在 RocketMQ 也有队列(Queue)这个概念。每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。需要注意的是,RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的。

在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要 RocketMQ 为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。这个消费位置是非常重要的概念,我们在使用消息队列的时候,丢消息的原因大多是由于消费位置处理不当导致的。

Kafka 的消息模型和 RocketMQ 是完全一样的。只是在 Kafka 中,将 Queue 这个概念称为分区(Partition)

如何利用事务消息实现分布式事务?

事务消息需要消息队列提供相应的功能才能实现,Kafka 和 RocketMQ 都提供了事务相关功能。

Kafka 的解决方案是:直接抛出异常,让用户自行处理。用户可以在业务代码中反复重试提交,直到提交成功,或者删除之前修改的数据记录进行事务补偿。

img

RocketMQ 的解决方案是:通过事务反查机制来解决事务消息提交失败的问题。如果 Producer 在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。为了支撑这个事务反查机制,业务代码需要实现一个反查本地事务状态的接口,告知 RocketMQ 本地事务是成功还是失败。

img

MQ 事务方案总结

相比本地消息表方案,MQ 事务方案优点是:

  • 消息数据独立存储 ,降低业务系统与消息系统之间的耦合。
  • 吞吐量优于使用本地消息表方案。

缺点是:

  • 一次消息发送需要两次网络请求 (half 消息 + commit/rollback 消息)
  • 业务处理服务需要实现消息状态回查接口

如何确保消息不会丢失?

检测消息丢失方法:

利用消息队列的有序性来验证是否有消息丢失:在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。

确保消息不丢失:

  • 生产阶段:捕获消息发送的错误,并针对性进行容错处理。
  • 存储阶段:数据必须设置副本,并且写数据需要保证所有副本都写入成功才视为提交成功。这样可以保证,即使主副本不可用,使用从副本替代,也包含最新数据。
  • 消费阶段:所有数据处理完毕,再手动提交消费偏移量。

如何处理消费过程中的重复消息?

在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

  • At most once:- 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
  • At least once:- 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
  • Exactly once - 恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

现在常用的绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。

一般解决重复消息的办法是,在消费端,让我们消费消息的操作具备幂等性。一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。如果我们系统消费消息的业务逻辑具备幂等性,那就不用担心消息重复的问题了,因为同一条消息,消费一次和消费多次对系统的影响是完全一样的。也就可以认为,消费多次等于消费一次。

从对系统的影响结果来说:At least once + 幂等消费 = Exactly once。

常用的设计幂等操作的方法:

  1. 利用数据库的唯一约束实现幂等:INSERT IF NOT EXIST
  2. 为更新的数据设置前置条件:设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。例如:采用乐观锁方式,为数据增加版本号,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。
  3. 记录并检查操作:在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。——此处涉及分布式 ID 知识点,可以使用类似 GUID、雪花算法 等方式来实现

消息积压了该如何处理?

发送端性能优化

发送消息的性能上不去,你需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的。对于发送消息的业务逻辑,只需要注意设置合适的并发和批量大小,就可以达到很好的发送性能。

消费端性能优化

如果消费的速度跟不上生产消息的速度,就会造成消息积压。即供大于求。

一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。

消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。特别需要注意的一点是,在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。

消息积压的处理

需要先分析消息积压的原因:是发送变快了,还是消费变慢了。大部分消息队列都内置了监控的功能,只要通过监控数据,很容易确定是哪种原因。

  • 如果是因为促销或抢购等原因,导致消息陡增,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例数来提升总体的消费能力。

  • 如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。

  • 如果监控到消费变慢了,你需要检查你的消费实例,分析一下是什么原因导致消费变慢。优先检查一下日志是否有大量的消费错误,如果没有错误的话,可以通过打印堆栈信息,看一下你的消费线程是不是卡在什么地方不动了,比如触发了死锁或者卡在等待某些资源上了。

学习开源代码该如何入手?

(1)先看官方文档,了解关键点:

  • 这个项目是什么
  • 这个项目有什么用
  • 这个项目如何使用
  • 这个项目适用于什么场景
  • 这个项目有哪些优点、缺点

(2)由点及面的阅读源码

不要泛泛而读,容易迷失。最好带着目的性,带着问题去阅读源码,最好是带着问题的答案去读源码。

如何使用异步设计提升系统性能?

异步编程,可以减少或者避免线程等待,从而提高处理速度。但是,其增加了程序复杂度,应酌情使用。

Java 中比较常用的异步框架有 Java8 内置的 CompletableFuture 和 ReactiveX 的 RxJava

如何实现高性能的异步网络传输?

系统一般可以分为 IO 密集型应用和计算密集型应用。大多数业务系统都属于 IO 密集型应用。最常用的 IO 资源有磁盘 IO 和带宽 IO。由于 IO 相较于内存计算,耗时较高,所以往往成为性能优化的关键。

提升 IO 效率的关键在于减少 IO 等待时间,在大量连接请求的时候,如果单线程,显然阻塞时间较长,所以,一般应采用并发 IO 模型。但是,线程数过多时,线程本身造成的 CPU 上下文切换,竞态造成的冲突都会造成额外的开销,导致 CPU 负载升高,从而降低系统整体性能。所以,理想的 IO 模型应该是一个能够复用少量线程的并发 IO 模型。这个模型的当前答案就是 NIO,其最具代表性的框架就是 Netty。其核心原理就是通过多路复用,来提升 IO 效率。

序列化与反序列化:如何通过网络传输结构化的数据?

传输协议:应用程序之间对话的语言

传输协议的目的,在于定义一种信息规则,使得收发双方能够互相交流。传输协议并没有什么必须遵循的规范,能满足需要即可。复杂的协议可以如网络协议报文一样,定义为 TLV 结构。

内存管理:如何避免内存溢出和频繁的垃圾回收?

Kafka 如何实现高性能 IO?

使用批量消息提升服务端处理能力

使用顺序读写提升磁盘 IO 性能

操作系统每次从磁盘读写数据的时候,需要先寻址,也就是先要找到数据在磁盘上的物理位置,然后再进行数据读写。如果是机械硬盘,这个寻址需要比较长的时间,因为它要移动磁头,这是个机械运动,机械硬盘工作的时候会发出咔咔的声音,就是移动磁头发出的声音。

顺序读写相比随机读写省去了大部分的寻址时间,它只要寻址一次,就可以连续地读写下去,所以说,性能要比随机读写要好很多。

利用 PageCache 加速消息读写

在 Kafka 中,它会利用 PageCache 加速消息读写。

  • PageCache 就是操作系统在内存中给磁盘上的文件建立的缓存。调用系统的 API 读写文件的时候,不会直接去读写磁盘上的文件,应用程序实际操作的都是 PageCache,也就是文件在内存中缓存的副本。
  • 应用程序在写入文件的时候,操作系统会先把数据写入到内存中的 PageCache,然后再一批一批地写到磁盘上。读取文件的时候,也是从 PageCache 中来读取数据,这时候会出现两种可能情况。一种是 PageCache 中有数据,那就直接读取,这样就节省了从磁盘上读取数据的时间;另一种情况是,PageCache 中没有数据,这时候操作系统会引发一个缺页中断,应用程序的读取线程会被阻塞,操作系统把数据从文件中复制到 PageCache 中,然后应用程序再从 PageCache 中继续把数据读出来,这时会真正读一次磁盘上的文件,这个读的过程就会比较慢。
  • 用户的应用程序在使用完某块 PageCache 后,操作系统并不会立刻就清除这个 PageCache,而是尽可能地利用空闲的物理内存保存这些 PageCache,除非系统内存不够用,操作系统才会清理掉一部分 PageCache。清理的策略一般是 LRU 或它的变种算法,这个算法我们不展开讲,它保留 PageCache 的逻辑是:优先保留最近一段时间最常使用的那些 PageCache。
  • Kafka 在读写消息文件的时候,充分利用了 PageCache 的特性。一般来说,消息刚刚写入到服务端就会被消费,按照 LRU 的“优先清除最近最少使用的页”这种策略,读取的时候,对于这种刚刚写入的 PageCache,命中的几率会非常高。也就是说,大部分情况下,消费读消息都会命中 PageCache,带来的好处有两个:一个是读取的速度会非常快,另外一个是,给写入消息让出磁盘的 IO 资源,间接也提升了写入的性能。

零拷贝技术

在服务端,处理消费的大致逻辑是这样的:

  • 首先,从文件中找到消息数据,读到内存中;
  • 然后,把消息通过网络发给客户端。

这个过程中,数据实际上做了 2 次或者 3 次复制:

  1. 从文件复制数据到 PageCache 中,如果命中 PageCache,这一步可以省掉;
  2. 从 PageCache 复制到应用程序的内存空间中,也就是我们可以操作的对象所在的内存;
  3. 从应用程序的内存空间复制到 Socket 的缓冲区,这个过程就是我们调用网络应用框架的 API 发送数据的过程。

Kafka 使用零拷贝技术可以把这个复制次数减少一次,上面的 2、3 步骤两次复制合并成一次复制。直接从 PageCache 中把数据复制到 Socket 缓冲区中,这样不仅减少一次数据复制,更重要的是,由于不用把数据复制到用户内存空间,DMA 控制器可以直接完成数据复制,不需要 CPU 参与,速度更快。

零拷贝操作,实际上是调用系统 API sendfile 实现的。

缓存策略:如何使用缓存来减少磁盘 IO?

如何正确使用锁保护共享数据,协调异步线程?

如何用硬件同步原语(CAS)替代锁?

数据压缩:时间换空间的游戏

数据压缩不仅能节省存储空间,还可以用于提升网络传输性能。

压缩和解压的操作都是计算密集型的操作,非常耗费 CPU 资源。如果你的应用处理业务逻辑就需要耗费大量的 CPU 资源,就不太适合再进行压缩和解压。数据压缩本质上是用时间换空间。这个买卖是不是划算,需要根据实际情况先衡量一下。

目前常用的压缩算法包括:ZIP,GZIP,SNAPPY,LZ4 等等。在选择压缩算法的时候,需要综合考虑压缩时间和压缩率两个因素,被压缩数据的内容也是影响压缩时间和压缩率的重要因素,必要的时候可以先用业务数据做一个压缩测试,这样有助于选择最合适的压缩算法。一般来说,压缩率越高的算法,压缩耗时也越高。如果是对性能要求高的系统,可以选择压缩速度快的算法,比如 LZ4;如果需要更高的压缩比,可以考虑 GZIP 或者压缩率更高的 XZ 等算法。

另外一个影响压缩率的重要因素是压缩分段的大小,你需要根据业务情况选择一个合适的分段策略,在保证不错的压缩率的前提下,尽量减少解压浪费。

Kafka 在生产者上,对每批消息进行压缩,批消息在服务端不解压,消费者在收到消息之后再进行解压。简单地说,Kafka 的压缩和解压都是在客户端完成的

RocketMQ Producer 源码分析:消息生产的实现过程

Producer 中包含的几个核心的服务都是有状态的,在 Producer 启动时,在 MQClientInstance 这个类中来统一来启动。在发送消息的流程中,RocketMQ 分了三种发送方式:单向、同步和异步,这三种发送方式对应的发送流程基本是相同的,同步和异步发送是由已经封装好的 MQClientAPIImpl 类来分别实现的。

对于我们在分析代码中提到的几个重要的业务逻辑实现类,你最好能记住这几个类和它的功能,包括 :DefaultMQProducerImpl 封装了大部分 Producer 的业务逻辑,MQClientInstance 封装了客户端一些通用的业务逻辑,MQClientAPIImpl 封装了客户端与服务端的 RPC,NettyRemotingClient 实现了底层网络通信。

Kafka Consumer 源码分析:消息消费的实现过程

Kafka 消费模型的几个要点:

  • Kafka 的每个 Consumer(消费者)实例属于一个 ConsumerGroup(消费组);
  • 在消费时,ConsumerGroup 中的每个 Consumer 独占一个或多个 Partition(分区);
  • 对于每个 ConsumerGroup,在任意时刻,每个 Partition 至多有 1 个 Consumer 在消费;
  • 每个 ConsumerGroup 都有一个 Coordinator(协调者)负责分配 Consumer 和 Partition 的对应关系,当 Partition 或是 Consumer 发生变更是,会触发 reblance(重新分配)过程,重新分配 Consumer 与 Partition 的对应关系;
  • Consumer 维护与 Coordinator 之间的心跳,这样 Coordinator 就能感知到 Consumer 的状态,在 Consumer 故障的时候及时触发 rebalance。

发送请求时,构建 Request 对象,暂存入发送队列,但不立即发送,而是等待合适的时机批量发送。并且,用回调或者 RequestFeuture 方式,预先定义好如何处理响应的逻辑。在收到 Broker 返回的响应之后,也不会立即处理,而是暂存在队列中,择机处理。那这个择机策略就比较复杂了,有可能是需要读取响应的时候,也有可能是缓冲区满了或是时间到了,都有可能触发一次真正的网络请求,也就是在 poll() 方法中发送所有待发送 Request 并处理所有 Response。

Kafka 和 RocketMQ 的消息复制实现的差异点在哪?

如果要确保数据一致性,必须采用“主 - 从”的复制方式。

在“主 - 从”模式下,数据先写入到主节点上,从节点只从主节点上复制数据,如果出现主从数据不一致的情况,必须以主节点上的数据为准。

RocketMQ 如何实现复制

在 RocketMQ 中,复制的基本单位是 Broker,也就是服务端的进程。复制采用的也是主从方式,通常情况下配置成一主一从,也可以支持一主多从。

RocketMQ 提供新、老两种复制方式:传统的主从模式和新的基于 Dledger 的复制方式。传统的主从模式性能更好,但灵活性和可用性稍差,而基于 Dledger 的复制方式,在 Broker 故障的时候可以自动选举出新节点,可用性更好,性能稍差,并且资源利用率更低一些。

RocketMQ 引入 Dledger,通过 Dledger 来完成复制。Dledger 在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客户端返回写入成功,并且它是支持通过选举来动态切换主节点的。

Kafka 是如何实现复制的

Kafka 中,复制的基本单位是分区。每个分区的几个副本之间,构成一个小的复制集群,Broker 只是这些分区副本的容器,所以 Kafka 的 Broker 是不分主从的。

分区的多个副本中也是采用一主多从的方式。Kafka 在写入消息的时候,采用的也是异步复制的方式。消息在写入到主节点之后,并不会马上返回写入成功,而是等待足够多的节点都复制成功后再返回。Kafka 为这个“足够多”创造了一个专有名词:ISR(In Sync Replicas),翻译过来就是“保持数据同步的副本”。ISR 的数量是可配的,但需要注意的是,这个 ISR 中是包含主节点的。

Kafka 使用 ZooKeeper 来监控每个分区的多个节点,如果发现某个分区的主节点宕机了,Kafka 会利用 ZooKeeper 来选出一个新的主节点,这样解决了可用性的问题。选举的时候,会从所有 ISR 节点中来选新的主节点,这样可以保证数据一致性。

RocketMQ 客户端如何在集群中找到正确的节点?

任何一个弹性分布式集群,都需要一个类似于 NameServer 服务,来帮助访问集群的客户端寻找集群中的节点。

img

在 RocketMQ 中,NameServer 是一个独立的进程,为 Broker、生产者和消费者提供服务。NameServer 最主要的功能就是,为客户端提供寻址服务,协助客户端找到主题对应的 Broker 地址。此外,NameServer 还负责监控每个 Broker 的存活状态。

NameServer 支持只部署一个节点,也支持部署多个节点组成一个集群,这样可以避免单点故障。在集群模式下,NameServer 各节点之间是不需要任何通信的,也不会通过任何方式互相感知,每个节点都可以独立提供全部服务。

每个 Broker 都需要和所有的 NameServer 节点进行通信。当 Broker 保存的 Topic 信息发生变化的时候,它会主动通知所有的 NameServer 更新路由信息,为了保证数据一致性,Broker 还会定时给所有的 NameServer 节点上报路由信息。这个上报路由信息的 RPC 请求,也同时起到 Broker 与 NameServer 之间的心跳作用,NameServer 依靠这个心跳来确定 Broker 的健康状态。

因为每个 NameServer 节点都可以独立提供完整的服务,所以,对于客户端来说,包括生产者和消费者,只需要选择任意一个 NameServer 节点来查询路由信息就可以了。客户端在生产或消费某个主题的消息之前,会先从 NameServer 上查询这个主题的路由信息,然后根据路由信息获取到当前主题和队列对应的 Broker 物理地址,再连接到 Broker 节点上进行生产或消费。

如果 NameServer 检测到与 Broker 的连接中断了,NameServer 会认为这个 Broker 不再能提供服务。NameServer 会立即把这个 Broker 从路由信息中移除掉,避免客户端连接到一个不可用的 Broker 上去。而客户端在与 Broker 通信失败之后,会重新去 NameServer 上拉取路由信息,然后连接到其他 Broker 上继续生产或消费消息,这样就实现了自动切换失效 Broker 的功能。

NameServer 的总体结构

  • NamesrvStartup:程序入口。
  • NamesrvController:NameServer 的总控制器,负责所有服务的生命周期管理。
  • RouteInfoManager:NameServer 最核心的实现类,负责保存和管理集群路由信息。
  • BrokerHousekeepingService:监控 Broker 连接状态的代理类。
  • DefaultRequestProcessor:负责处理客户端和 Broker 发送过来的 RPC 请求的处理器。
  • ClusterTestRequestProcessor:用于测试的请求处理器。

NameServer 的所有核心功能都是在 RouteInfoManager 这个类中实现的。RouteInfoManager 这个类中保存了所有的路由信息,这些路由信息都是保存在内存中,并且没有持久化的。

1
2
3
4
5
6
7
8
9
public class BrokerData implements Comparable<BrokerData> {
// ...
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
// ...
}

Kafka 的协调服务 ZooKeeper:实现分布式系统的“瑞士军刀”

ZooKeeper 是一个分布式的协调服务,它的核心服务是一个高可用、高可靠的一致性存储,在此基础上,提供了包括读写元数据、节点监控、选举、节点间通信和分布式锁等很多功能,这些功能可以极大方便我们快速开发一个分布式的集群系统。

ZooKeeper 的使用注意点:

  1. 不要往 ZooKeeper 里面写入大量数据,它不是一个真正意义上的存储系统,只适合存放少量的数据。依据服务器配置的不同,ZooKeeper 在写入超过几百 MB 数据之后,性能和稳定性都会严重下降。
  2. 不要让业务集群的可用性依赖于 ZooKeeper 的可用性,什么意思呢?你的系统可以使用 Zookeeper,但你要留一手,要考虑如果 Zookeeper 集群宕机了,你的业务集群最好还能提供服务。因为 ZooKeeper 的选举过程是比较慢的,而它对网络的抖动又比较敏感,一旦触发选举,这段时间内的 ZooKeeper 是不能提供任何服务的。

Kafka 主要使用 ZooKeeper 来保存它的元数据、监控 Broker 和分区的存活状态,并利用 ZooKeeper 来进行选举。

Kafka 在 ZooKeeper 中保存的元数据,主要就是 Broker 的列表和主题分区信息两棵树。这份元数据同时也被缓存到每一个 Broker 中。客户端并不直接和 ZooKeeper 来通信,而是在需要的时候,通过 RPC 请求去 Broker 上拉取它关心的主题的元数据,然后保存到客户端的元数据缓存中,以便支撑客户端生产和消费

RocketMQ 与 Kafka 中如何实现事务?

Kafka 和 RocketMQ 都是基于两阶段提交来实现的事务,都利用了特殊的主题中的队列和分区来记录事务日志。

不同之处在于对处于事务中的消息的处理方式,RocketMQ 是把这些消息暂存在一个特殊的队列中,待事务提交后再移动到业务队列中;而 Kafka 直接把消息放到对应的业务分区中,配合客户端过滤来暂时屏蔽进行中的事务消息。

RocketMQ 和 Kafka 的事务,它们的适用场景是不一样的,RocketMQ 的事务适用于解决本地事务和发消息的数据一致性问题,而 Kafka 的事务则是用于实现它的 Exactly Once 机制,应用于实时计算的场景中。

MQTT 协议:如何支持海量的在线 IoT 设备?

MQTT 是专门为物联网设备设计的一套标准的通信协议。这套协议在消息模型和功能上与普通的消息队列协议是差不多的,最大的区别在于应用场景不同。在物联网应用场景中,IoT 设备性能差,网络连接不稳定。服务端面临的挑战主要是,需要支撑海量的客户端和主题。

已有的开源的 MQTT 产品,对于协议的支持都不错,在客户端数量小于十万级别的情况下,可以选择。对于海量客户端的场景,服务端必须使用集群来支撑,可以选择收费的云服务和企业版产品。也可以选择自行来构建 MQTT 集群。

自行构建集群,最关键的技术点就是,通过前置的 Proxy 集群来解决海量连接、会话管理和海量主题这三个问题。前置 Proxy 负责在 Broker 和客户端之间转发消息,通过这种方式,将海量客户端连接收敛为少量的 Proxy 与 Broker 之间的连接,解决了海量客户端连接数的问题。维护会话的实现原理,和 Tomcat 维护 HTTP 会话是一样的。对于海量主题,可以在后端部署多组 Broker 小集群,每个小集群分担一部分主题这样的方式来解决。

img

img

Pulsar 的存储计算分离设计:全新的消息队列设计思路

img

Pulsar 和其他消息队列最大的区别是,它采用了存储计算分离的设计。存储消息的职责从 Broker 中分离出来,交给专门的 BookKeeper 存储集群。这样 Broker 就变成了无状态的节点,在集群调度和故障恢复方面更加简单灵活。

无论是 RocketMQ、RabbitMQ 还是 Kafka,消息都是存储在 Broker 的磁盘或者内存中。客户端在访问某个主题分区之前,必须先找到这个分区所在 Broker,然后连接到这个 Broker 上进行生产和消费。在集群模式下,为了避免单点故障导致丢消息,Broker 在保存消息的时候,必须也把消息复制到其他的 Broker 上。当某个 Broker 节点故障的时候,并不是集群中任意一个节点都能替代这个故障的节点,只有那些“和这个故障节点拥有相同数据的节点”才能替代这个故障的节点。原因就是,每一个 Broker 存储的消息数据是不一样的,或者说,每个节点上都存储了状态(数据)。这种节点称为“有状态的节点(Stateful Node)”。

存储计算分离是一种设计思想,它将系统的存储职责和计算职责分离开,存储节点只负责数据存储,而计算节点只负责计算,计算节点是无状态的。无状态的计算节点,具有易于开发、调度灵活的优点,故障转移和恢复也更加简单快速。这种设计的缺点是,系统总体的复杂度更高,性能也更差。不过对于大部分分布式的业务系统来说,由于它不需要自己开发存储系统,采用存储计算分离的设计,既可以充分利用这种设计的优点,整个系统也不会因此变得过于复杂,综合评估优缺点,利大于弊,更加划算。

Flink 分析计算任务之后生成 JobGraph,JobGraph 是一个有向无环图,数据流过这个图中的节点,在每个节点进行计算和变换,最终流出有向无环图就完成了计算。JobGraph 中的每个节点是一个 Task,Task 是可以并行执行的,每个线程就是一个 SubTask。SubTask 被 JobManager 分配给某个 TaskManager,在 TaskManager 进程中的一个线程中执行。

流计算与消息(二):在流计算中使用 Kafka 链接计算任务

端到端 Exactly Once 语义,可以保证在分布式系统中,每条数据不多不少只被处理一次。在流计算中,因为数据重复会导致计算结果错误,所以 Exactly Once 在流计算场景中尤其重要。Kafka 和 Flink 都提供了保证 Exactly Once 的特性,配合使用可以实现端到端的 Exactly Once 语义。

在 Flink 中,如果节点出现故障,可以自动重启计算任务,重新分配计算节点来保证系统的可用性。配合 CheckPoint 机制,可以保证重启后任务的状态恢复到最后一次 CheckPoint,然后从 CheckPoint 中记录的恢复位置继续读取数据进行计算。Flink 通过一个巧妙的 Barrier 使 CheckPoint 中恢复位置和各节点状态完全对应。

Kafka 的 Exactly Once 语义是通过它的事务和生产幂等两个特性来共同实现的。在配合 Flink 的时候,每个 Flink 的 CheckPoint 对应一个 Kafka 事务,只要保证 CheckPoint 和 Kafka 事务同步提交就可以实现端到端的 Exactly Once,Flink 通过“二阶段提交”这个分布式事务的经典算法来保证 CheckPoint 和 Kafka 事务状态的一致性。

主流消息队列都是如何存储消息的?

在所有的存储系统中,消息队列的存储可能是最简单的。每个主题包含若干个分区,每个分区其实就是一个 WAL(Write Ahead Log),写入的时候只能尾部追加,不允许修改。读取的时候,根据一个索引序号进行查询,然后连续顺序往下读。

Kafka 存储消息结构

Kafka 的存储以 Partition 为单位,每个 Partition 包含一组消息文件(Segment file)和一组索引文件(Index),并且消息文件和索引文件一一对应,具有相同的文件名(但文件扩展名不一样),文件名就是这个文件中第一条消息的索引序号。

每个索引中保存索引序号(也就是这条消息是这个分区中的第几条消息)和对应的消息在消息文件中的绝对位置。在索引的设计上,Kafka 采用的是稀疏索引,为了节省存储空间,它不会为每一条消息都创建索引,而是每隔几条消息创建一条索引。

写入消息的时候非常简单,就是在消息文件尾部连续追加写入,一个文件写满了再写下一个文件。查找消息时,首先根据文件名找到所在的索引文件,然后用二分法遍历索引文件内的索引,在里面找到离目标消息最近的索引,再去消息文件中,找到这条最近的索引指向的消息位置,从这个位置开始顺序遍历消息文件,找到目标消息。

可以看到,寻址过程还是需要一定时间的。一旦找到消息位置后,就可以批量顺序读取,不必每条消息都要进行一次寻址。

RocketMQ 存储消息结构

RocketMQ 的存储以 Broker 为单位。它的存储也是分为消息文件和索引文件,但是在 RocketMQ 中,每个 Broker 只有一组消息文件,它把在这个 Broker 上的所有主题的消息都存在这一组消息文件中。索引文件和 Kafka 一样,是按照主题和队列分别建立的,每个队列对应一组索引文件,这组索引文件在 RocketMQ 中称为 ConsumerQueue。RocketMQ 中的索引是定长稠密索引,它为每一条消息都建立索引,每个索引的长度(注意不是消息长度)是固定的 20 个字节。

写入消息的时候,Broker 上所有主题、所有队列的消息按照自然顺序追加写入到同一个消息文件中,一个文件写满了再写下一个文件。查找消息的时候,可以直接根据队列的消息序号,计算出索引的全局位置(索引序号 x 索引固定长度 20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息。可以看到,这里两次寻址都是绝对位置寻址,比 Kafka 的查找是要快的。

Kafka 和 RocketMQ 的存储结构比较

img

对比这两种存储结构,你可以看到它们有很多共通的地方,都是采用消息文件 + 索引文件的存储方式,索引文件的名字都是第一条消息的索引序号,索引中记录了消息的位置等等。

在消息文件的存储粒度上,Kafka 以分区为单位,粒度更细,优点是更加灵活,很容易进行数据迁移和扩容。RocketMQ 以 Broker 为单位,较粗的粒度牺牲了灵活性,带来的好处是,在写入的时候,同时写入的文件更少,有更好的批量(不同主题和分区的数据可以组成一批一起写入),更多的顺序写入,尤其是在 Broker 上有很多主题和分区的情况下,有更好的写入性能。

大多数场景下,这两种存储设计的差异其实并不明显,都可以满足需求。但是在某些极限场景下,依然会体现出它们设计的差异。比如,在一个 Broker 上有上千个活动主题的情况下,RocketMQ 的写入性能就会体现出优势。再比如,如果我们的消息都是几个、十几个字节的小消息,但是消息的数量很多,这时候 Kafka 的稀疏索引设计就能节省非常多的存储空间。

参考资料

XXX

简介

什么是 XXX

XXX 有什么用

XXX 原理

参考资料

源码级深度理解 Java SPI

SPI 简介

SPI 全称 Service Provider Interface,是 Java 提供的,旨在由第三方实现或扩展的 API,它是一种用于动态加载服务的机制。Java 中 SPI 机制主要思想是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要,其核心思想就是 解耦

Java SPI 有四个要素:

  • SPI 接口:为服务提供者实现类约定的的接口或抽象类。
  • SPI 实现类:实际提供服务的实现类。
  • SPI 配置:Java SPI 机制约定的配置文件,提供查找服务实现类的逻辑。配置文件必须置于 META-INF/services 目录中,并且,文件名应与服务提供者接口的完全限定名保持一致。文件中的每一行都有一个实现服务类的详细信息,同样是服务提供者类的完全限定名称。
  • **ServiceLoader**:Java SPI 的核心类,用于加载 SPI 实现类。 ServiceLoader 中有各种实用方法来获取特定实现、迭代它们或重新加载服务。

SPI 示例

正所谓,实践出真知,我们不妨通过一个具体的示例来看一下,如何使用 Java SPI。

SPI 接口

首先,需要定义一个 SPI 接口,和普通接口并没有什么差别。

1
2
3
4
5
package io.github.dunwu.javacore.spi;

public interface DataStorage {
String search(String key);
}

SPI 实现类

假设,我们需要在程序中使用两种不同的数据存储——Mysql 和 Redis。因此,我们需要两个不同的实现类去分别完成相应工作。

Mysql 查询 MOCK 类

1
2
3
4
5
6
7
8
package io.github.dunwu.javacore.spi;

public class MysqlStorage implements DataStorage {
@Override
public String search(String key) {
return "【Mysql】搜索" + key + ",结果:No";
}
}

Redis 查询 MOCK 类

1
2
3
4
5
6
7
8
package io.github.dunwu.javacore.spi;

public class RedisStorage implements DataStorage {
@Override
public String search(String key) {
return "【Redis】搜索" + key + ",结果:Yes";
}
}

到目前为止,定义接口,并实现接口和普通的 Java 接口实现没有任何不同。

SPI 配置

如果想通过 Java SPI 机制来发现服务,就需要在 SPI 配置中约定好发现服务的逻辑。配置文件必须置于 META-INF/services 目录中,并且,文件名应与服务提供者接口的完全限定名保持一致。文件中的每一行都有一个实现服务类的详细信息,同样是服务提供者类的完全限定名称。以本示例代码为例,其文件名应该为 io.github.dunwu.javacore.spi.DataStorage,文件中的内容如下:

1
2
io.github.dunwu.javacore.spi.MysqlStorage
io.github.dunwu.javacore.spi.RedisStorage

ServiceLoader

完成了上面的步骤,就可以通过 ServiceLoader 来加载服务。示例如下:

1
2
3
4
5
6
7
8
9
10
11
import java.util.ServiceLoader;

public class SpiDemo {

public static void main(String[] args) {
ServiceLoader<DataStorage> serviceLoader = ServiceLoader.load(DataStorage.class);
System.out.println("============ Java SPI 测试============");
serviceLoader.forEach(loader -> System.out.println(loader.search("Yes Or No")));
}

}

输出:

1
2
3
============ Java SPI 测试============
【Mysql】搜索Yes Or No,结果:No
【Redis】搜索Yes Or No,结果:Yes

SPI 原理

上文中,我们已经了解 Java SPI 的要素以及使用 Java SPI 的方法。你有没有想过,Java SPI 和普通 Java 接口有何不同,Java SPI 是如何工作的。实际上,Java SPI 机制依赖于 ServiceLoader 类去解析、加载服务。因此,掌握了 ServiceLoader 的工作流程,就掌握了 SPI 的原理。ServiceLoader 的代码本身很精练,接下来,让我们通过走读源码的方式,逐一理解 ServiceLoader 的工作流程。

ServiceLoader 的成员变量

先看一下 ServiceLoader 类的成员变量,大致有个印象,后面的源码中都会使用到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final class ServiceLoader<S> implements Iterable<S> {

// SPI 配置文件目录
private static final String PREFIX = "META-INF/services/";

// 将要被加载的 SPI 服务
private final Class<S> service;

// 用于加载 SPI 服务的类加载器
private final ClassLoader loader;

// ServiceLoader 创建时的访问控制上下文
private final AccessControlContext acc;

// SPI 服务缓存,按实例化的顺序排列
private LinkedHashMap<String,S> providers = new LinkedHashMap<>();

// 懒查询迭代器
private LazyIterator lookupIterator;

// ...
}

ServiceLoader 的工作流程

(1)ServiceLoader.load 静态方法

应用程序加载 Java SPI 服务,都是先调用 ServiceLoader.load 静态方法。ServiceLoader.load 静态方法的作用是:

  1. 指定类加载 ClassLoader 和访问控制上下文;
  2. 然后,重新加载 SPI 服务
    1. 清空缓存中所有已实例化的 SPI 服务
    2. 根据 ClassLoader 和 SPI 类型,创建懒加载迭代器

这里,摘录 ServiceLoader.load 相关源码,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// service 传入的是期望加载的 SPI 接口类型
// loader 是用于加载 SPI 服务的类加载器
public static <S> ServiceLoader<S> load(Class<S> service,
ClassLoader loader)
{
return new ServiceLoader<>(service, loader);
}

public void reload() {
// 清空缓存中所有已实例化的 SPI 服务
providers.clear();
// 根据 ClassLoader 和 SPI 类型,创建懒加载迭代器
lookupIterator = new LazyIterator(service, loader);
}

// 私有构造方法
// 重新加载 SPI 服务
private ServiceLoader(Class<S> svc, ClassLoader cl) {
service = Objects.requireNonNull(svc, "Service interface cannot be null");
// 指定类加载 ClassLoader 和访问控制上下文
loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
// 然后,重新加载 SPI 服务
reload();
}

(2)应用程序通过 ServiceLoaderiterator 方法遍历 SPI 实例

ServiceLoader 的类定义,明确了 ServiceLoader 类实现了 Iterable<T> 接口,所以,它是可以迭代遍历的。实际上,ServiceLoader 类维护了一个缓存 providers( LinkedHashMap 对象),缓存 providers 中保存了已经被成功加载的 SPI 实例,这个 Map 的 key 是 SPI 接口实现类的全限定名,value 是该实现类的一个实例对象。

当应用程序调用 ServiceLoaderiterator 方法时,ServiceLoader 会先判断缓存 providers 中是否有数据:如果有,则直接返回缓存 providers 的迭代器;如果没有,则返回懒加载迭代器的迭代器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public Iterator<S> iterator() {
return new Iterator<S>() {

// 缓存 SPI providers
Iterator<Map.Entry<String,S>> knownProviders
= providers.entrySet().iterator();

// lookupIterator 是 LazyIterator 实例,用于懒加载 SPI 实例
public boolean hasNext() {
if (knownProviders.hasNext())
return true;
return lookupIterator.hasNext();
}

public S next() {
if (knownProviders.hasNext())
return knownProviders.next().getValue();
return lookupIterator.next();
}

public void remove() {
throw new UnsupportedOperationException();
}

};
}

(3)懒加载迭代器的工作流程

上面的源码中提到了,lookupIterator 是 LazyIterator 实例,而 LazyIterator 用于懒加载 SPI 实例。那么, LazyIterator 是如何工作的呢?

这里,摘取 LazyIterator 关键代码

  • hasNextService 方法:
    1. 拼接 META-INF/services/ + SPI 接口全限定名
    2. 通过类加载器,尝试加载资源文件
    3. 解析资源文件中的内容,获取 SPI 接口的实现类的全限定名 nextName
  • nextService 方法:
    1. hasNextService() 方法解析出了 SPI 实现类的的全限定名 nextName,通过反射,获取 SPI 实现类的类定义 Class<?>
    2. 然后,尝试通过 Class<?>newInstance 方法实例化一个 SPI 服务对象。如果成功,则将这个对象加入到缓存 providers 中并返回该对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private boolean hasNextService() {
if (nextName != null) {
return true;
}
if (configs == null) {
try {
// 1.拼接 META-INF/services/ + SPI 接口全限定名
// 2.通过类加载器,尝试加载资源文件
// 3.解析资源文件中的内容
String fullName = PREFIX + service.getName();
if (loader == null)
configs = ClassLoader.getSystemResources(fullName);
else
configs = loader.getResources(fullName);
} catch (IOException x) {
fail(service, "Error locating configuration files", x);
}
}
while ((pending == null) || !pending.hasNext()) {
if (!configs.hasMoreElements()) {
return false;
}
pending = parse(service, configs.nextElement());
}
nextName = pending.next();
return true;
}

private S nextService() {
if (!hasNextService())
throw new NoSuchElementException();
String cn = nextName;
nextName = null;
Class<?> c = null;
try {
c = Class.forName(cn, false, loader);
} catch (ClassNotFoundException x) {
fail(service,
"Provider " + cn + " not found");
}
if (!service.isAssignableFrom(c)) {
fail(service,
"Provider " + cn + " not a s");
}
try {
S p = service.cast(c.newInstance());
providers.put(cn, p);
return p;
} catch (Throwable x) {
fail(service,
"Provider " + cn + " could not be instantiated",
x);
}
throw new Error(); // This cannot happen
}

SPI 和类加载器

通过上面两个章节中,走读 ServiceLoader 代码,我们已经大致了解 Java SPI 的工作原理,即通过 ClassLoader 加载 SPI 配置文件,解析 SPI 服务,然后通过反射,实例化 SPI 服务实例。我们不妨思考一下,为什么加载 SPI 服务时,需要指定类加载器 ClassLoader 呢?

学习过 JVM 的读者,想必都了解过类加载器的双亲委派模型(Parents Delegation Model)。双亲委派模型要求除了顶层的 BootstrapClassLoader 外,其余的类加载器都应有自己的父类加载器。这里类加载器之间的父子关系一般通过组合(Composition)关系来实现,而不是通过继承(Inheritance)的关系实现。双亲委派继承体系图如下:

img

双亲委派机制约定了:一个类加载器首先将类加载请求传送到父类加载器,只有当父类加载器无法完成类加载请求时才尝试加载

双亲委派的好处:使得 Java 类伴随着它的类加载器,天然具备一种带有优先级的层次关系,从而使得类加载得到统一,不会出现重复加载的问题:

  • 系统类防止内存中出现多份同样的字节码
  • 保证 Java 程序安全稳定运行

例如: java.lang.Object 存放在 rt.jar 中,如果编写另外一个 java.lang.Object 的类并放到 classpath 中,程序可以编译通过。因为双亲委派模型的存在,所以在 rt.jar 中的 Object 比在 classpath 中的 Object 优先级更高,因为 rt.jar 中的 Object 使用的是启动类加载器,而 classpath 中的 Object 使用的是应用程序类加载器。正因为 rt.jar 中的 Object 优先级更高,因为程序中所有的 Object 都是这个 Object

双亲委派的限制:子类加载器可以使用父类加载器已经加载的类,而父类加载器无法使用子类加载器已经加载的。——这就导致了双亲委派模型并不能解决所有的类加载器问题。Java SPI 就面临着这样的问题:

  • SPI 的接口是 Java 核心库的一部分,是由 BootstrapClassLoader 加载的;
  • 而 SPI 实现的 Java 类一般是由 AppClassLoader 来加载的。BootstrapClassLoader 是无法找到 SPI 的实现类的,因为它只加载 Java 的核心库。它也不能代理给 AppClassLoader,因为它是最顶层的类加载器。这也解释了本节开始的问题——为什么加载 SPI 服务时,需要指定类加载器 ClassLoader 呢?因为如果不指定 ClassLoader,则无法获取 SPI 服务。

如果不做任何的设置,Java 应用的线程的上下文类加载器默认就是 AppClassLoader。在核心类库使用 SPI 接口时,传递的类加载器使用线程上下文类加载器,就可以成功的加载到 SPI 实现的类。线程上下文类加载器在很多 SPI 的实现中都会用到。

通常可以通过 Thread.currentThread().getClassLoader()Thread.currentThread().getContextClassLoader() 获取线程上下文类加载器。

Java SPI 的不足

Java SPI 存在一些不足:

  • 不能按需加载,需要遍历所有的实现,并实例化,然后在循环中才能找到我们需要的实现。如果不想用某些实现类,或者某些类实例化很耗时,它也被载入并实例化了,这就造成了浪费。

  • 获取某个实现类的方式不够灵活,只能通过 Iterator 形式获取,不能根据某个参数来获取对应的实现类。

  • 多个并发多线程使用 ServiceLoader 类的实例是不安全的。

SPI 应用场景

SPI 在 Java 开发中应用十分广泛。首先,在 Java 的 java.util.spi package 中就约定了很多 SPI 接口。下面,列举一些 SPI 接口:

除此以外,SPI 还有很多应用,下面列举几个经典案例。

SPI 应用案例之 JDBC DriverManager

作为 Java 工程师,尤其是 CRUD 工程师,相必都非常熟悉 JDBC。众所周知,关系型数据库有很多种,如:Mysql、Oracle、PostgreSQL 等等。JDBC 如何识别各种数据库的驱动呢?

创建数据库连接

我们先回顾一下,JDBC 如何创建数据库连接的呢?

JDBC4.0 之前,连接数据库的时候,通常会用 Class.forName(XXX) 方法来加载数据库相应的驱动,然后再获取数据库连接,继而进行 CRUD 等操作。

1
Class.forName("com.mysql.jdbc.Driver")

而 JDBC4.0 之后,不再需要用 Class.forName(XXX) 方法来加载数据库驱动,直接获取连接就可以了。显然,这种方式很方便,但是如何做到的呢?

  • JDBC 接口:首先,Java 中内置了接口 java.sql.Driver

  • JDBC 接口实现:各个数据库的驱动自行实现 java.sql.Driver 接口,用于管理数据库连接。

    • Mysql:在 mysql 的 Java 驱动包 mysql-connector-java-XXX.jar 中,可以找到 META-INF/services 目录,该目录下会有一个名字为java.sql.Driver 的文件,文件内容是 com.mysql.cj.jdbc.Drivercom.mysql.cj.jdbc.Driver 正是 Mysql 版的 java.sql.Driver 实现。如下图所示:

    • PostgreSQL 实现:在 PostgreSQL 的 Java 驱动包 postgresql-42.0.0.jar 中,也可以找到同样的配置文件,文件内容是 org.postgresql.Driverorg.postgresql.Driver 正是 PostgreSQL 版的 java.sql.Driver 实现。
  • 创建数据库连接

    以 Mysql 为例,创建数据库连接代码如下:

    1
    2
    final String DB_URL = String.format("jdbc:mysql://%s:%s/%s", DB_HOST, DB_PORT, DB_SCHEMA);
    connection = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);

DriverManager

从前文,我们已经知道 DriverManager 是创建数据库连接的关键。它究竟是如何工作的呢?

可以看到是加载实例化驱动的,接着看 loadInitialDrivers 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
private static void loadInitialDrivers() {
String drivers;
try {
drivers = AccessController.doPrivileged(new PrivilegedAction<String>() {
public String run() {
return System.getProperty("jdbc.drivers");
}
});
} catch (Exception ex) {
drivers = null;
}
// 通过 classloader 获取所有实现 java.sql.Driver 的驱动类
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
// 利用 SPI,记载所有 Driver 服务
ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class);
// 获取迭代器
Iterator<Driver> driversIterator = loadedDrivers.iterator();
try{
// 遍历迭代器
while(driversIterator.hasNext()) {
driversIterator.next();
}
} catch(Throwable t) {
// Do nothing
}
return null;
}
});

// 打印数据库驱动信息
println("DriverManager.initialize: jdbc.drivers = " + drivers);

if (drivers == null || drivers.equals("")) {
return;
}
String[] driversList = drivers.split(":");
println("number of Drivers:" + driversList.length);
for (String aDriver : driversList) {
try {
println("DriverManager.Initialize: loading " + aDriver);
// 尝试实例化驱动
Class.forName(aDriver, true,
ClassLoader.getSystemClassLoader());
} catch (Exception ex) {
println("DriverManager.Initialize: load failed: " + ex);
}
}
}

上面的代码主要步骤是:

  1. 从系统变量中获取驱动的实现类。
  2. 利用 SPI 来获取所有驱动的实现类。
  3. 遍历所有驱动,尝试实例化各个实现类。
  4. 根据第 1 步获取到的驱动列表来实例化具体的实现类。

需要关注的是下面这行代码:

1
ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class);

这里实际获取的是 java.util.ServiceLoader.LazyIterator 迭代器。调用其 hasNext 方法时,会搜索 classpath 下以及 jar 包中的 META-INF/services 目录,查找 java.sql.Driver 文件,并找到文件中的驱动实现类的全限定名。调用其 next 方法时,会根据驱动类的全限定名去尝试实例化一个驱动类的对象。

SPI 应用案例之 Common-Logging

common-logging(也称 Jakarta Commons Logging,缩写 JCL)是常用的日志门面工具包。

common-logging 的核心类是入口是 LogFactoryLogFatory 是一个抽象类,它负责加载具体的日志实现。

其入口方法是 LogFactory.getLog 方法,源码如下:

1
2
3
4
5
6
7
public static Log getLog(Class clazz) throws LogConfigurationException {
return getFactory().getInstance(clazz);
}

public static Log getLog(String name) throws LogConfigurationException {
return getFactory().getInstance(name);
}

从以上源码可知,getLog 采用了工厂设计模式,是先调用 getFactory 方法获取具体日志库的工厂类,然后根据类名称或类型创建日志实例。

LogFatory.getFactory 方法负责选出匹配的日志工厂,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
public static LogFactory getFactory() throws LogConfigurationException {
// 省略...

// 加载 commons-logging.properties 配置文件
Properties props = getConfigurationFile(contextClassLoader, FACTORY_PROPERTIES);

// 省略...

// 决定创建哪个 LogFactory 实例
// (1)尝试读取全局属性 org.apache.commons.logging.LogFactory
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] Looking for system property [" + FACTORY_PROPERTY +
"] to define the LogFactory subclass to use...");
}

try {
// 如果指定了 org.apache.commons.logging.LogFactory 属性,尝试实例化具体实现类
String factoryClass = getSystemProperty(FACTORY_PROPERTY, null);
if (factoryClass != null) {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] Creating an instance of LogFactory class '" + factoryClass +
"' as specified by system property " + FACTORY_PROPERTY);
}
factory = newFactory(factoryClass, baseClassLoader, contextClassLoader);
} else {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] No system property [" + FACTORY_PROPERTY + "] defined.");
}
}
} catch (SecurityException e) {
// 异常处理
} catch (RuntimeException e) {
// 异常处理
}

// (2)利用 Java SPI 机制,尝试在 classpatch 的 META-INF/services 目录下寻找 org.apache.commons.logging.LogFactory 实现类
if (factory == null) {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] Looking for a resource file of name [" + SERVICE_ID +
"] to define the LogFactory subclass to use...");
}
try {
final InputStream is = getResourceAsStream(contextClassLoader, SERVICE_ID);

if( is != null ) {
// This code is needed by EBCDIC and other strange systems.
// It's a fix for bugs reported in xerces
BufferedReader rd;
try {
rd = new BufferedReader(new InputStreamReader(is, "UTF-8"));
} catch (java.io.UnsupportedEncodingException e) {
rd = new BufferedReader(new InputStreamReader(is));
}

String factoryClassName = rd.readLine();
rd.close();

if (factoryClassName != null && ! "".equals(factoryClassName)) {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] Creating an instance of LogFactory class " +
factoryClassName +
" as specified by file '" + SERVICE_ID +
"' which was present in the path of the context classloader.");
}
factory = newFactory(factoryClassName, baseClassLoader, contextClassLoader );
}
} else {
// is == null
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] No resource file with name '" + SERVICE_ID + "' found.");
}
}
} catch (Exception ex) {
// note: if the specified LogFactory class wasn't compatible with LogFactory
// for some reason, a ClassCastException will be caught here, and attempts will
// continue to find a compatible class.
if (isDiagnosticsEnabled()) {
logDiagnostic(
"[LOOKUP] A security exception occurred while trying to create an" +
" instance of the custom factory class" +
": [" + trim(ex.getMessage()) +
"]. Trying alternative implementations...");
}
// ignore
}
}

// (3)尝试从 classpath 目录下的 commons-logging.properties 文件中查找 org.apache.commons.logging.LogFactory 属性

if (factory == null) {
if (props != null) {
if (isDiagnosticsEnabled()) {
logDiagnostic(
"[LOOKUP] Looking in properties file for entry with key '" + FACTORY_PROPERTY +
"' to define the LogFactory subclass to use...");
}
String factoryClass = props.getProperty(FACTORY_PROPERTY);
if (factoryClass != null) {
if (isDiagnosticsEnabled()) {
logDiagnostic(
"[LOOKUP] Properties file specifies LogFactory subclass '" + factoryClass + "'");
}
factory = newFactory(factoryClass, baseClassLoader, contextClassLoader);

// TODO: think about whether we need to handle exceptions from newFactory
} else {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] Properties file has no entry specifying LogFactory subclass.");
}
}
} else {
if (isDiagnosticsEnabled()) {
logDiagnostic("[LOOKUP] No properties file available to determine" + " LogFactory subclass from..");
}
}
}

// (4)以上情况都不满足,实例化默认实现类 org.apache.commons.logging.impl.LogFactoryImpl

if (factory == null) {
if (isDiagnosticsEnabled()) {
logDiagnostic(
"[LOOKUP] Loading the default LogFactory implementation '" + FACTORY_DEFAULT +
"' via the same classloader that loaded this LogFactory" +
" class (ie not looking in the context classloader).");
}

factory = newFactory(FACTORY_DEFAULT, thisClassLoader, contextClassLoader);
}

if (factory != null) {
/**
* Always cache using context class loader.
*/
cacheFactory(contextClassLoader, factory);

if (props != null) {
Enumeration names = props.propertyNames();
while (names.hasMoreElements()) {
String name = (String) names.nextElement();
String value = props.getProperty(name);
factory.setAttribute(name, value);
}
}
}

return factory;
}

从 getFactory 方法的源码可以看出,其核心逻辑分为 4 步:

  1. 首先,尝试查找全局属性 org.apache.commons.logging.LogFactory,如果指定了具体类,尝试创建实例。
  2. 利用 Java SPI 机制,尝试在 classpatch 的 META-INF/services 目录下寻找 org.apache.commons.logging.LogFactory 的实现类。
  3. 尝试从 classpath 目录下的 commons-logging.properties 文件中查找 org.apache.commons.logging.LogFactory 属性,如果指定了具体类,尝试创建实例。
  4. 以上情况如果都不满足,则实例化默认实现类,即 org.apache.commons.logging.impl.LogFactoryImpl

SPI 应用案例之 Spring Boot

Spring Boot 是基于 Spring 构建的框架,其设计目的在于简化 Spring 应用的配置、运行。在 Spring Boot 中,大量运用了自动装配来尽可能减少配置。

下面是一个 Spring Boot 入口示例,可以看到,代码非常简洁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class DemoApplication {

public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}

@GetMapping("/hello")
public String hello(@RequestParam(value = "name", defaultValue = "World") String name) {
return String.format("Hello %s!", name);
}
}

那么,Spring Boot 是如何做到寥寥几行代码,就可以运行一个 Spring Boot 应用的呢。我们不妨带着疑问,从源码入手,一步步探究其原理。

@SpringBootApplication 注解

首先,Spring Boot 应用的启动类上都会标记一个 @SpringBootApplication 注解。@SpringBootApplication 注解定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootConfiguration
@EnableAutoConfiguration
@ComponentScan(
excludeFilters = {@Filter(
type = FilterType.CUSTOM,
classes = {TypeExcludeFilter.class}
), @Filter(
type = FilterType.CUSTOM,
classes = {AutoConfigurationExcludeFilter.class}
)}
)
public @interface SpringBootApplication {
// 略
}

除了 @Target@Retention@Documented@Inherited 这几个元注解, @SpringBootApplication 注解的定义中还标记了 @SpringBootConfiguration@EnableAutoConfiguration@ComponentScan 三个注解。

@SpringBootConfiguration 注解

@SpringBootConfiguration 注解的定义来看,@SpringBootConfiguration 注解本质上就是一个 @Configuration 注解,这意味着被@SpringBootConfiguration 注解修饰的类会被 Spring Boot 识别为一个配置类。

1
2
3
4
5
6
7
8
9
10
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Configuration
public @interface SpringBootConfiguration {
@AliasFor(
annotation = Configuration.class
)
boolean proxyBeanMethods() default true;
}

@EnableAutoConfiguration 注解

@EnableAutoConfiguration 注解定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@AutoConfigurationPackage
@Import({AutoConfigurationImportSelector.class})
public @interface EnableAutoConfiguration {
String ENABLED_OVERRIDE_PROPERTY = "spring.boot.enableautoconfiguration";

Class<?>[] exclude() default {};

String[] excludeName() default {};
}

@EnableAutoConfiguration 注解包含了 @AutoConfigurationPackage@Import({AutoConfigurationImportSelector.class}) 两个注解。

@AutoConfigurationPackage 注解

@AutoConfigurationPackage 会将被修饰的类作为主配置类,该类所在的 package 会被视为根路径,Spring Boot 默认会自动扫描根路径下的所有 Spring Bean(被 @Component 以及继承 @Component 的各个注解所修饰的类)。——这就是为什么 Spring Boot 的启动类一般要置于根路径的原因。这个功能等同于在 Spring xml 配置中通过 context:component-scan 来指定扫描路径。@Import 注解的作用是向 Spring 容器中直接注入指定组件。@AutoConfigurationPackage 注解中注明了 @Import({Registrar.class})Registrar 类用于保存 Spring Boot 的入口类、根路径等信息。

SpringFactoriesLoader.loadFactoryNames 方法

@Import(AutoConfigurationImportSelector.class) 表示直接注入 AutoConfigurationImportSelectorAutoConfigurationImportSelector 有一个核心方法 getCandidateConfigurations 用于获取候选配置。该方法调用了 SpringFactoriesLoader.loadFactoryNames 方法,这个方法即为 Spring Boot SPI 的关键,它负责加载所有 META-INF/spring.factories 文件,加载的过程由 SpringFactoriesLoader 负责。

Spring Boot 的 META-INF/spring.factories 文件本质上就是一个 properties 文件,数据内容就是一个个键值对。

SpringFactoriesLoader.loadFactoryNames 方法的关键源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// spring.factories 文件的格式为:key=value1,value2,value3
// 遍历所有 META-INF/spring.factories 文件
// 解析文件,获得 key=factoryClass 的类名称
public static List<String> loadFactoryNames(Class<?> factoryType, @Nullable ClassLoader classLoader) {
String factoryTypeName = factoryType.getName();
return loadSpringFactories(classLoader).getOrDefault(factoryTypeName, Collections.emptyList());
}

private static Map<String, List<String>> loadSpringFactories(@Nullable ClassLoader classLoader) {
// 尝试获取缓存,如果缓存中有数据,直接返回
MultiValueMap<String, String> result = cache.get(classLoader);
if (result != null) {
return result;
}

try {
// 获取资源文件路径
Enumeration<URL> urls = (classLoader != null ?
classLoader.getResources(FACTORIES_RESOURCE_LOCATION) :
ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION));
result = new LinkedMultiValueMap<>();
// 遍历所有路径
while (urls.hasMoreElements()) {
URL url = urls.nextElement();
UrlResource resource = new UrlResource(url);
// 解析文件,得到对应的一组 Properties
Properties properties = PropertiesLoaderUtils.loadProperties(resource);
// 遍历解析出的 properties,组装数据
for (Map.Entry<?, ?> entry : properties.entrySet()) {
String factoryTypeName = ((String) entry.getKey()).trim();
for (String factoryImplementationName : StringUtils.commaDelimitedListToStringArray((String) entry.getValue())) {
result.add(factoryTypeName, factoryImplementationName.trim());
}
}
}
cache.put(classLoader, result);
return result;
}
catch (IOException ex) {
throw new IllegalArgumentException("Unable to load factories from location [" +
FACTORIES_RESOURCE_LOCATION + "]", ex);
}
}

归纳上面的方法,主要作了这些事:

加载所有 META-INF/spring.factories 文件,加载过程有 SpringFactoriesLoader 负责。

  • 在 CLASSPATH 中搜寻所有 META-INF/spring.factories 配置文件
  • 然后,解析 spring.factories 文件,获取指定自动装配类的全限定名

Spring Boot 的 AutoConfiguration

Spring Boot 有各种 starter 包,可以根据实际项目需要,按需取材。在项目开发中,只要将 starter 包引入,我们就可以用很少的配置,甚至什么都不配置,即可获取相关的能力。通过前面的 Spring Boot SPI 流程,只完成了自动装配工作的一半,剩下的工作如何处理呢 ?

以 spring-boot-starter-web 的 jar 包为例,查看其 maven pom,可以看到,它依赖于 spring-boot-starter,所有 Spring Boot 官方 starter 包都会依赖于这个 jar 包。而 spring-boot-starter 又依赖于 spring-boot-autoconfigure,Spring Boot 的自动装配秘密,就在于这个 jar 包。

从 spring-boot-autoconfigure 包的结构来看,它有一个 META-INF/spring.factories ,显然利用了 Spring Boot SPI,来自动装配其中的配置类。

下图是 spring-boot-autoconfigure 的 META-INF/spring.factories 文件的部分内容,可以看到其中注册了一长串会被自动加载的 AutoConfiguration 类。

RedisAutoConfiguration 为例,这个配置类中,会根据 @ConditionalXXX 中的条件去决定是否实例化对应的 Bean,实例化 Bean 所依赖的重要参数则通过 RedisProperties 传入。

RedisProperties 中维护了 Redis 连接所需要的关键属性,只要在 yml 或 properties 配置文件中,指定 spring.redis 开头的属性,都会被自动装载到 RedisProperties 实例中。

通过以上分析,已经一步步解读出 Spring Boot 自动装载的原理。

SPI 应用案例之 Dubbo

Dubbo 并未使用 Java SPI,而是自己封装了一套新的 SPI 机制。Dubbo SPI 所需的配置文件需放置在 META-INF/dubbo 路径下,配置内容形式如下:

1
2
optimusPrime = org.apache.spi.OptimusPrime
bumblebee = org.apache.spi.Bumblebee

与 Java SPI 实现类配置不同,Dubbo SPI 是通过键值对的方式进行配置,这样可以按需加载指定的实现类。Dubbo SPI 除了支持按需加载接口实现类,还增加了 IOC 和 AOP 等特性。

ExtensionLoader 入口

Dubbo SPI 的相关逻辑被封装在了 ExtensionLoader 类中,通过 ExtensionLoader,可以加载指定的实现类。

ExtensionLoadergetExtension 方法是其入口方法,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public T getExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
if ("true".equals(name)) {
// 获取默认的拓展实现类
return getDefaultExtension();
}
// Holder,顾名思义,用于持有目标对象
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
// 双重检查
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
// 创建拓展实例
instance = createExtension(name);
// 设置实例到 holder 中
holder.set(instance);
}
}
}
return (T) instance;
}

可以看出,这个方法的作用就是:首先检查缓存,缓存未命中则调用 createExtension 方法创建拓展对象。那么,createExtension 是如何创建拓展对象的呢,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private T createExtension(String name) {
// 从配置文件中加载所有的拓展类,可得到“配置项名称”到“配置类”的映射关系表
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// 通过反射创建实例
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 向实例中注入依赖
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
// 循环创建 Wrapper 实例
for (Class<?> wrapperClass : wrapperClasses) {
// 将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建 Wrapper 实例。
// 然后向 Wrapper 实例中注入依赖,最后将 Wrapper 实例再次赋值给 instance 变量
instance = injectExtension(
(T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("...");
}
}

createExtension 方法的的工作步骤可以归纳为:

  1. 通过 getExtensionClasses 获取所有的拓展类
  2. 通过反射创建拓展对象
  3. 向拓展对象中注入依赖
  4. 将拓展对象包裹在相应的 Wrapper 对象中

以上步骤中,第一个步骤是加载拓展类的关键,第三和第四个步骤是 Dubbo IOC 与 AOP 的具体实现。

获取所有的拓展类

Dubbo 在通过名称获取拓展类之前,首先需要根据配置文件解析出拓展项名称到拓展类的映射关系表(Map<名称, 拓展类>),之后再根据拓展项名称从映射关系表中取出相应的拓展类即可。相关过程的代码分析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Map<String, Class<?>> getExtensionClasses() {
// 从缓存中获取已加载的拓展类
Map<String, Class<?>> classes = cachedClasses.get();
// 双重检查
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
// 加载拓展类
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}

这里也是先检查缓存,若缓存未命中,则通过 synchronized 加锁。加锁后再次检查缓存,并判空。此时如果 classes 仍为 null,则通过 loadExtensionClasses 加载拓展类。下面分析 loadExtensionClasses 方法的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private Map<String, Class<?>> loadExtensionClasses() {
// 获取 SPI 注解,这里的 type 变量是在调用 getExtensionLoader 方法时传入的
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation != null) {
String value = defaultAnnotation.value();
if ((value = value.trim()).length() > 0) {
// 对 SPI 注解内容进行切分
String[] names = NAME_SEPARATOR.split(value);
// 检测 SPI 注解内容是否合法,不合法则抛出异常
if (names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension...");
}

// 设置默认名称,参考 getDefaultExtension 方法
if (names.length == 1) {
cachedDefaultName = names[0];
}
}
}

Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
// 加载指定文件夹下的配置文件
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadDirectory(extensionClasses, DUBBO_DIRECTORY);
loadDirectory(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}

loadExtensionClasses 方法总共做了两件事情,一是对 SPI 注解进行解析,二是调用 loadDirectory 方法加载指定文件夹配置文件。SPI 注解解析过程比较简单,无需多说。下面我们来看一下 loadDirectory 做了哪些事情。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir) {
// fileName = 文件夹路径 + type 全限定名
String fileName = dir + type.getName();
try {
Enumeration<java.net.URL> urls;
ClassLoader classLoader = findClassLoader();
// 根据文件名加载所有的同名文件
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL resourceURL = urls.nextElement();
// 加载资源
loadResource(extensionClasses, classLoader, resourceURL);
}
}
} catch (Throwable t) {
logger.error("...");
}
}

loadDirectory 方法先通过 classLoader 获取所有资源链接,然后再通过 loadResource 方法加载资源。我们继续跟下去,看一下 loadResource 方法的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void loadResource(Map<String, Class<?>> extensionClasses,
ClassLoader classLoader, java.net.URL resourceURL) {
try {
BufferedReader reader = new BufferedReader(
new InputStreamReader(resourceURL.openStream(), "utf-8"));
try {
String line;
// 按行读取配置内容
while ((line = reader.readLine()) != null) {
// 定位 # 字符
final int ci = line.indexOf('#');
if (ci >= 0) {
// 截取 # 之前的字符串,# 之后的内容为注释,需要忽略
line = line.substring(0, ci);
}
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
// 以等于号 = 为界,截取键与值
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
if (line.length() > 0) {
// 加载类,并通过 loadClass 方法对类进行缓存
loadClass(extensionClasses, resourceURL,
Class.forName(line, true, classLoader), name);
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class...");
}
}
}
} finally {
reader.close();
}
} catch (Throwable t) {
logger.error("Exception when load extension class...");
}
}

loadResource 方法用于读取和解析配置文件,并通过反射加载类,最后调用 loadClass 方法进行其他操作。loadClass 方法用于主要用于操作缓存,该方法的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL,
Class<?> clazz, String name) throws NoSuchMethodException {

if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("...");
}

// 检测目标类上是否有 Adaptive 注解
if (clazz.isAnnotationPresent(Adaptive.class)) {
if (cachedAdaptiveClass == null) {
// 设置 cachedAdaptiveClass缓存
cachedAdaptiveClass = clazz;
} else if (!cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("...");
}

// 检测 clazz 是否是 Wrapper 类型
} else if (isWrapperClass(clazz)) {
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
// 存储 clazz 到 cachedWrapperClasses 缓存中
wrappers.add(clazz);

// 程序进入此分支,表明 clazz 是一个普通的拓展类
} else {
// 检测 clazz 是否有默认的构造方法,如果没有,则抛出异常
clazz.getConstructor();
if (name == null || name.length() == 0) {
// 如果 name 为空,则尝试从 Extension 注解中获取 name,或使用小写的类名作为 name
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("...");
}
}
// 切分 name
String[] names = NAME_SEPARATOR.split(name);
if (names != null && names.length > 0) {
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
// 如果类上有 Activate 注解,则使用 names 数组的第一个元素作为键,
// 存储 name 到 Activate 注解对象的映射关系
cachedActivates.put(names[0], activate);
}
for (String n : names) {
if (!cachedNames.containsKey(clazz)) {
// 存储 Class 到名称的映射关系
cachedNames.put(clazz, n);
}
Class<?> c = extensionClasses.get(n);
if (c == null) {
// 存储名称到 Class 的映射关系
extensionClasses.put(n, clazz);
} else if (c != clazz) {
throw new IllegalStateException("...");
}
}
}
}
}

如上,loadClass 方法操作了不同的缓存,比如 cachedAdaptiveClasscachedWrapperClassescachedNames 等等。除此之外,该方法没有其他什么逻辑了。

参考资料

服务容错

故障分类

从故障影响范围维度来看,分布式系统的故障可以分为三类:

  • 集群故障:根据业务量大小而定,集群规模从几台到甚至上万台都有可能。一旦某些代码出现 bug,可能整个集群都会发生故障,不能提供对外提供服务。
  • 机房故障:现在大多数互联网公司为了保证业务的高可用性,往往业务部署在不止一个机房。然而现实中,某机房的光缆因为道路施工被挖断,导致整个机房脱网的事情,也是时有发生的。并且这种事情往往容易上热搜。
  • 单机故障:集群中的个别机器出现故障,这种情况往往对全局没有太大影响,但会导致调用到故障机器上的请求都失败,影响整个系统的成功率。

集群故障应对处理

一般而言,集群故障的产生原因不外乎有两种:

  • 一种是代码 bug 所导致,比如说某一段 Java 代码不断地分配大对象,但没有及时回收导致 JVM OOM 退出;
  • 另一种是流量突刺,短时间突然而至的大量请求超出了系统的承载能力。

应付集群故障的思路,主要是采用流量控制,主要手段有:限流降级熔断

机房故障应对处理

单机房脱网的事情,多半是因为一些不可抗因素,如:机房失火、光缆被挖断等等。有句老话叫:不要把鸡蛋都放在一个篮子里。同理,不要把业务都部署在一个机房中,一旦机房出事,那就彻底完蛋了。所以,很多互联网公司的业务都采用多机房部署。如果要追求更高的可靠性,可以采用同城多活部署,甚至异地多活部署。

多机房部署的好处显而易见,即提高了系统的可用性,但是这种架构引入了其他的问题:如何保证不同机房数据的一致性,如何切换多机房的流量,等等。

针对流量切换问题,一般有两种手段:

  • 基于 DNS 解析的流量切换,一般是通过把请求访问域名解析的 VIP 从一个 IDC 切换到另外一个 IDC。
  • 基于 RPC 分组的流量切换,对于一个服务来说,如果是部署在多个 IDC 的话,一般每个 IDC 就是一个分组。假如一个 IDC 出现故障,那么原先路由到这个分组的流量,就可以通过向配置中心下发命令,把原先路由到这个分组的流量全部切换到别的分组,这样的话就可以切换故障 IDC 的流量了。

单机故障应对处理

对于大规模集群来说,出现单机故障的概率是很高的。当出现单机故障时,需要有一定的自动化处理手段。

处理单机故障一个有效的办法就是自动重启。具体来讲,你可以设置一个阈值,比如以某个接口的平均耗时为准,当监控单机上某个接口的平均耗时超过一定阈值时,就认为这台机器有问题,这个时候就需要把有问题的机器从线上集群中摘除掉,然后在重启服务后,重新加入到集群中。

容错策略

服务调用并不总是一定成功的,前面我讲过,可能因为服务提供者节点自身宕机、进程异常退出或者服务消费者与提供者之间的网络出现故障等原因。对于服务调用失败的情况,需要有手段自动恢复,来保证调用成功。

常用的手段主要有以下几种:

  • 故障转移(FailOver):当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟。这种策略要求服务调用的操作必须是幂等的,也就是说无论调用多少次,只要是同一个调用,返回的结果都是相同的,一般适合服务调用是读请求的场景。
  • 快速失败(FailFast):只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。实际在业务执行时,一般非核心业务的调用,会采用快速失败策略,调用失败后一般就记录下失败日志就返回了。
  • 安全失败(Failsafe):出现异常时,直接忽略。通常用于写入审计日志等操作。
  • 静默失败(Failsilent):如果大量的请求需要等到超时(或者长时间处理后)才宣告失败,很容易由于某个远程服务的请求堆积而消耗大量的线程、内存、网络等资源,进而影响到整个系统的稳定。面对这种情况,一种合理的失败策略是当请求失败后,就默认服务提供者一定时间内无法再对外提供服务,不再向它分配请求流量,将错误隔离开来,避免对系统其他部分产生影响,此即为沉默失败策略。
  • 故障恢复(FailBack):就是服务消费者调用失败或者超时后,不再重试,而是根据失败的详细信息,来决定后续的执行策略。比如对于非幂等的调用场景,如果调用失败后,不能简单地重试,而是应该查询服务端的状态,看调用到底是否实际生效,如果已经生效了就不能再重试了;如果没有生效可以再发起一次调用。通常用于消息通知操作。
  • 并行调用(Forking):并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。
  • 广播调用(Broadcast):广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息。

容错设计模式

断路器模式

断路器的基本思路是很简单的,就是通过代理(断路器对象)来一对一地(一个远程服务对应一个断路器对象)接管服务调用者的远程请求。断路器会持续监控并统计服务返回的成功、失败、超时、拒绝等各种结果,当出现故障(失败、超时、拒绝)的次数达到断路器的阈值时,它状态就自动变为“OPEN”,后续此断路器代理的远程访问都将直接返回调用失败,而不会发出真正的远程服务请求。通过断路器对远程服务的熔断,避免因持续的失败或拒绝而消耗资源,因持续的超时而堆积请求,最终的目的就是避免雪崩效应的出现。由此可见,断路器本质是一种快速失败策略的实现方式。

舱壁隔离模式

舱壁隔离模式是常用的实现服务隔离的设计模式,舱壁这个词是来自造船业的舶来品,它原本的意思是设计舰船时,要在每个区域设计独立的水密舱室,一旦某个舱室进水,也只是影响这个舱室中的货物,而不至于让整艘舰艇沉没。这种思想就很符合容错策略中失败静默策略。

Hystrix 就采用舱壁隔离模式来实现线程隔离。

重试模式

故障转移和故障恢复策略都需要对服务进行重复调用,差别是这些重复调用有可能是同步的,也可能是后台异步进行;有可能会重复调用同一个服务,也可能会调用到服务的其他副本。无论具体是通过怎样的方式调用、调用的服务实例是否相同,都可以归结为重试设计模式的应用范畴。重试模式适合解决系统中的瞬时故障,简单的说就是有可能自己恢复(Resilient,称为自愈,也叫做回弹性)的临时性失灵,网络抖动、服务的临时过载(典型的如返回了 503 Bad Gateway 错误)这些都属于瞬时故障。

参考资料