Dunwu Blog

大道至简,知易行难

Tomcat 快速入门

🎁 版本说明

当前最新版本:Tomcat 8.5.24

环境要求:JDK7+

1. Tomcat 简介

1.1. Tomcat 是什么

Tomcat 是由 Apache 开发的一个 Servlet 容器,实现了对 Servlet 和 JSP 的支持,并提供了作为 Web 服务器的一些特有功能,如 Tomcat 管理和控制平台、安全域管理和 Tomcat 阀等。

由于 Tomcat 本身也内含了一个 HTTP 服务器,它也可以被视作一个单独的 Web 服务器。但是,不能将 Tomcat 和 Apache HTTP 服务器混淆,Apache HTTP 服务器是一个用 C 语言实现的 HTTP Web 服务器;这两个 HTTP web server 不是捆绑在一起的。Tomcat 包含了一个配置管理工具,也可以通过编辑 XML 格式的配置文件来进行配置。

1.2. Tomcat 重要目录

  • /bin - Tomcat 脚本存放目录(如启动、关闭脚本)。 *.sh 文件用于 Unix 系统; *.bat 文件用于 Windows 系统。
  • /conf - Tomcat 配置文件目录。
  • /logs - Tomcat 默认日志目录。
  • /webapps - webapp 运行的目录。

1.3. web 工程发布目录结构

一般 web 项目路径结构

1
2
3
4
5
6
7
8
9
10
11
12
|-- webapp                         # 站点根目录
|-- META-INF # META-INF 目录
| `-- MANIFEST.MF # 配置清单文件
|-- WEB-INF # WEB-INF 目录
| |-- classes # class文件目录
| | |-- *.class # 程序需要的 class 文件
| | `-- *.xml # 程序需要的 xml 文件
| |-- lib # 库文件夹
| | `-- *.jar # 程序需要的 jar 包
| `-- web.xml # Web应用程序的部署描述文件
|-- <userdir> # 自定义的目录
|-- <userfiles> # 自定义的资源文件
  • webapp:工程发布文件夹。其实每个 war 包都可以视为 webapp 的压缩包。

  • META-INF:META-INF 目录用于存放工程自身相关的一些信息,元文件信息,通常由开发工具,环境自动生成。

  • WEB-INF:Java web 应用的安全目录。所谓安全就是客户端无法访问,只有服务端可以访问的目录。

  • /WEB-INF/classes:存放程序所需要的所有 Java class 文件。

  • /WEB-INF/lib:存放程序所需要的所有 jar 文件。

  • /WEB-INF/web.xml:web 应用的部署配置文件。它是工程中最重要的配置文件,它描述了 servlet 和组成应用的其它组件,以及应用初始化参数、安全管理约束等。

1.4. Tomcat 功能

Tomcat 支持的 I/O 模型有:

  • NIO:非阻塞 I/O,采用 Java NIO 类库实现。
  • NIO2:异步 I/O,采用 JDK 7 最新的 NIO2 类库实现。
  • APR:采用 Apache 可移植运行库实现,是 C/C++ 编写的本地库。

Tomcat 支持的应用层协议有:

  • HTTP/1.1:这是大部分 Web 应用采用的访问协议。
  • AJP:用于和 Web 服务器集成(如 Apache)。
  • HTTP/2:HTTP 2.0 大幅度的提升了 Web 性能。

2. Tomcat 入门

2.1. 安装

前提条件

Tomcat 8.5 要求 JDK 版本为 1.7 以上。

进入 Tomcat 官方下载地址 选择合适版本下载,并解压到本地。

Windows

添加环境变量 CATALINA_HOME ,值为 Tomcat 的安装路径。

进入安装目录下的 bin 目录,运行 startup.bat 文件,启动 Tomcat

Linux / Unix

下面的示例以 8.5.24 版本为例,包含了下载、解压、启动操作。

1
2
3
4
5
# 下载解压到本地
wget http://mirrors.hust.edu.cn/apache/tomcat/tomcat-8/v8.5.24/bin/apache-tomcat-8.5.24.tar.gz
tar -zxf apache-tomcat-8.5.24.tar.gz
# 启动 Tomcat
./apache-tomcat-8.5.24/bin/startup.sh

启动后,访问 http://localhost:8080 ,可以看到 Tomcat 安装成功的测试页面。

img

2.2. 配置

本节将列举一些重要、常见的配置项。详细的 Tomcat8 配置可以参考 Tomcat 8 配置官方参考文档

2.2.1. Server

Server 元素表示整个 Catalina servlet 容器。

因此,它必须是 conf/server.xml 配置文件中的根元素。它的属性代表了整个 servlet 容器的特性。

属性表

属性 描述 备注
className 这个类必须实现 org.apache.catalina.Server 接口。 默认 org.apache.catalina.core.StandardServer
address 服务器等待关机命令的 TCP / IP 地址。如果没有指定地址,则使用 localhost。
port 服务器等待关机命令的 TCP / IP 端口号。设置为-1 以禁用关闭端口。
shutdown 必须通过 TCP / IP 连接接收到指定端口号的命令字符串,以关闭 Tomcat。

2.2.2. Service

Service 元素表示一个或多个连接器组件的组合,这些组件共享一个用于处理传入请求的引擎组件。Server 中可以有多个 Service。

属性表

属性 描述 备注
className 这个类必须实现org.apache.catalina.Service接口。 默认 org.apache.catalina.core.StandardService
name 此服务的显示名称,如果您使用标准 Catalina 组件,将包含在日志消息中。与特定服务器关联的每个服务的名称必须是唯一的。

实例 - conf/server.xml 配置文件示例

1
2
3
4
5
6
<?xml version="1.0" encoding="UTF-8"?>
<Server port="8080" shutdown="SHUTDOWN">
<Service name="xxx">
...
</Service>
</Server>

2.2.3. Executor

Executor 表示可以在 Tomcat 中的组件之间共享的线程池。

属性表

属性 描述 备注
className 这个类必须实现org.apache.catalina.Executor接口。 默认 org.apache.catalina.core.StandardThreadExecutor
name 线程池名称。 要求唯一, 供 Connector 元素的 executor 属性使用
namePrefix 线程名称前缀。
maxThreads 最大活跃线程数。 默认 200
minSpareThreads 最小活跃线程数。 默认 25
maxIdleTime 当前活跃线程大于 minSpareThreads 时,空闲线程关闭的等待最大时间。 默认 60000ms
maxQueueSize 线程池满情况下的请求排队大小。 默认 Integer.MAX_VALUE
1
2
3
<Service name="xxx">
<Executor name="tomcatThreadPool" namePrefix="catalina-exec-" maxThreads="300" minSpareThreads="25"/>
</Service>

2.2.4. Connector

Connector 代表连接组件。Tomcat 支持三种协议:HTTP/1.1、HTTP/2.0、AJP。

属性表

属性 说明 备注
asyncTimeout Servlet3.0 规范中的异步请求超时 默认 30s
port 请求连接的 TCP Port 设置为 0,则会随机选取一个未占用的端口号
protocol 协议. 一般情况下设置为 HTTP/1.1,这种情况下连接模型会在 NIO 和 APR/native 中自动根据配置选择
URIEncoding 对 URI 的编码方式. 如果设置系统变量 org.apache.catalina.STRICT_SERVLET_COMPLIANCE 为 true,使用 ISO-8859-1 编码;如果未设置此系统变量且未设置此属性, 使用 UTF-8 编码
useBodyEncodingForURI 是否采用指定的 contentType 而不是 URIEncoding 来编码 URI 中的请求参数

以下属性在标准的 Connector(NIO, NIO2 和 APR/native)中有效:

属性 说明 备注
acceptCount 当最大请求连接 maxConnections 满时的最大排队大小 默认 100,注意此属性和 Executor 中属性 maxQueueSize 的区别.这个指的是请求连接满时的堆栈大小,Executor 的 maxQueueSize 指的是处理线程满时的堆栈大小
connectionTimeout 请求连接超时 默认 60000ms
executor 指定配置的线程池名称
keepAliveTimeout keeAlive 超时时间 默认值为 connectionTimeout 配置值.-1 表示不超时
maxConnections 最大连接数 连接满时后续连接放入最大为 acceptCount 的队列中. 对 NIO 和 NIO2 连接,默认值为 10000;对 APR/native,默认值为 8192
maxThreads 如果指定了 Executor, 此属性忽略;否则为 Connector 创建的内部线程池最大值 默认 200
minSpareThreads 如果指定了 Executor, 此属性忽略;否则为 Connector 创建线程池的最小活跃线程数 默认 10
processorCache 协议处理器缓存 Processor 对象的大小 -1 表示不限制.当不使用 servlet3.0 的异步处理情况下: 如果配置 Executor,配置为 Executor 的 maxThreads;否则配置为 Connnector 的 maxThreads. 如果使用 Serlvet3.0 异步处理, 取 maxThreads 和 maxConnections 的最大值

2.2.5. Context

Context 元素表示一个 Web 应用程序,它在特定的虚拟主机中运行。每个 Web 应用程序都基于 Web 应用程序存档(WAR)文件,或者包含相应的解包内容的相应目录,如 Servlet 规范中所述。

属性表

属性 说明 备注
altDDName web.xml 部署描述符路径 默认 /WEB-INF/web.xml
docBase Context 的 Root 路径 和 Host 的 appBase 相结合, 可确定 web 应用的实际目录
failCtxIfServletStartFails 同 Host 中的 failCtxIfServletStartFails, 只对当前 Context 有效 默认为 false
logEffectiveWebXml 是否日志打印 web.xml 内容(web.xml 由默认的 web.xml 和应用中的 web.xml 组成) 默认为 false
path web 应用的 context path 如果为根路径,则配置为空字符串(“”), 不能不配置
privileged 是否使用 Tomcat 提供的 manager servlet
reloadable /WEB-INF/classes/ 和/WEB-INF/lib/ 目录中 class 文件发生变化是否自动重新加载 默认为 false
swallowOutput true 情况下, System.out 和 System.err 输出将被定向到 web 应用日志中 默认为 false

2.2.6. Engine

Engine 元素表示与特定的 Catalina 服务相关联的整个请求处理机器。它接收并处理来自一个或多个连接器的所有请求,并将完成的响应返回给连接器,以便最终传输回客户端。

属性表

属性 描述 备注
defaultHost 默认主机名,用于标识将处理指向此服务器上主机名称但未在此配置文件中配置的请求的主机。 这个名字必须匹配其中一个嵌套的主机元素的名字属性。
name 此引擎的逻辑名称,用于日志和错误消息。 在同一服务器中使用多个服务元素时,每个引擎必须分配一个唯一的名称。

2.2.7. Host

Host 元素表示一个虚拟主机,它是一个服务器的网络名称(如“www.mycompany.com”)与运行 Tomcat 的特定服务器的关联。

属性表

属性 说明 备注
name 名称 用于日志输出
appBase 虚拟主机对应的应用基础路径 可以是个绝对路径, 或${CATALINA_BASE}相对路径
xmlBase 虚拟主机 XML 基础路径,里面应该有 Context xml 配置文件 可以是个绝对路径, 或${CATALINA_BASE}相对路径
createDirs 当 appBase 和 xmlBase 不存在时,是否创建目录 默认为 true
autoDeploy 是否周期性的检查 appBase 和 xmlBase 并 deploy web 应用和 context 描述符 默认为 true
deployIgnore 忽略 deploy 的正则
deployOnStartup Tomcat 启动时是否自动 deploy 默认为 true
failCtxIfServletStartFails 配置为 true 情况下,任何 load-on-startup >=0 的 servlet 启动失败,则其对应的 Contxt 也启动失败 默认为 false

2.2.8. Cluster

由于在实际开发中,我从未用过 Tomcat 集群配置,所以没研究。

2.3. 启动

2.3.1. 部署方式

这种方式要求本地必须安装 Tomcat 。

将打包好的 war 包放在 Tomcat 安装目录下的 webapps 目录下,然后在 bin 目录下执行 startup.batstartup.sh ,Tomcat 会自动解压 webapps 目录下的 war 包。

成功后,可以访问 http://localhost:8080/xxx (xxx 是 war 包文件名)。

注意

以上步骤是最简单的示例。步骤中的 war 包解压路径、启动端口以及一些更多的功能都可以修改配置文件来定制 (主要是 server.xmlcontext.xml 文件)。

2.3.2. 嵌入式

2.3.2.1. API 方式

在 pom.xml 中添加依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>8.5.24</version>
</dependency>

添加 SimpleEmbedTomcatServer.java 文件,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.Optional;
import org.apache.catalina.startup.Tomcat;

public class SimpleTomcatServer {
private static final int PORT = 8080;
private static final String CONTEXT_PATH = "/javatool-server";

public static void main(String[] args) throws Exception {
// 设定 profile
Optional<String> profile = Optional.ofNullable(System.getProperty("spring.profiles.active"));
System.setProperty("spring.profiles.active", profile.orElse("develop"));

Tomcat tomcat = new Tomcat();
tomcat.setPort(PORT);
tomcat.getHost().setAppBase(".");
tomcat.addWebapp(CONTEXT_PATH, getAbsolutePath() + "src/main/webapp");
tomcat.start();
tomcat.getServer().await();
}

private static String getAbsolutePath() {
String path = null;
String folderPath = SimpleEmbedTomcatServer.class.getProtectionDomain().getCodeSource().getLocation().getPath()
.substring(1);
if (folderPath.indexOf("target") > 0) {
path = folderPath.substring(0, folderPath.indexOf("target"));
}
return path;
}
}

成功后,可以访问 http://localhost:8080/javatool-server

说明

本示例是使用 org.apache.tomcat.embed 启动嵌入式 Tomcat 的最简示例。

这个示例中使用的是 Tomcat 默认的配置,但通常,我们需要对 Tomcat 配置进行一些定制和调优。为了加载配置文件,启动类就要稍微再复杂一些。这里不想再贴代码,有兴趣的同学可以参考:

示例项目

2.3.2.2. 使用 maven 插件启动(不推荐)

不推荐理由:这种方式启动 maven 虽然最简单,但是有一个很大的问题是,真的很久很久没发布新版本了(最新版本发布时间:2013-11-11)。且貌似只能找到 Tomcat6 、Tomcat7 插件。

使用方法

在 pom.xml 中引入插件

1
2
3
4
5
6
7
8
9
10
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<version>2.2</version>
<configuration>
<port>8080</port>
<path>/${project.artifactId}</path>
<uriEncoding>UTF-8</uriEncoding>
</configuration>
</plugin>

运行 mvn tomcat7:run 命令,启动 Tomcat。

成功后,可以访问 http://localhost:8080/xxx (xxx 是 ${project.artifactId} 指定的项目名)。

2.3.3. IDE 插件

常见 Java IDE 一般都有对 Tomcat 的支持。

以 Intellij IDEA 为例,提供了 Tomcat and TomEE Integration 插件(一般默认会安装)。

使用步骤

  • 点击 Run/Debug Configurations > New Tomcat Server > local ,打开 Tomcat 配置页面。
  • 点击 Confiure… 按钮,设置 Tomcat 安装路径。
  • 点击 Deployment 标签页,设置要启动的应用。
  • 设置启动应用的端口、JVM 参数、启动浏览器等。
  • 成功后,可以访问 http://localhost:8080/(当然,你也可以在 url 中设置上下文名称)。

img

说明

个人认为这个插件不如 Eclipse 的 Tomcat 插件好用,Eclipse 的 Tomcat 插件支持对 Tomcat xml 配置文件进行配置。而这里,你只能自己去 Tomcat 安装路径下修改配置文件。

文中的嵌入式启动示例可以参考我的示例项目

3. Tomcat 架构

img

Tomcat 要实现 2 个核心功能:

  • 处理 Socket 连接,负责网络字节流与 Request 和 Response 对象的转化。
  • 加载和管理 Servlet,以及处理具体的 Request 请求

为此,Tomcat 设计了两个核心组件:

  • 连接器(Connector):负责和外部通信
  • 容器(Container):负责内部处理

3.1. Service

Tomcat 支持的 I/O 模型有:

  • NIO:非阻塞 I/O,采用 Java NIO 类库实现。
  • NIO2:异步 I/O,采用 JDK 7 最新的 NIO2 类库实现。
  • APR:采用 Apache 可移植运行库实现,是 C/C++ 编写的本地库。

Tomcat 支持的应用层协议有:

  • HTTP/1.1:这是大部分 Web 应用采用的访问协议。
  • AJP:用于和 Web 服务器集成(如 Apache)。
  • HTTP/2:HTTP 2.0 大幅度的提升了 Web 性能。

Tomcat 支持多种 I/O 模型和应用层协议。为了实现这点,一个容器可能对接多个连接器。但是,单独的连接器或容器都不能对外提供服务,需要把它们组装起来才能工作,组装后这个整体叫作 Service 组件。Tomcat 内可能有多个 Service,通过在 Tomcat 中配置多个 Service,可以实现通过不同的端口号来访问同一台机器上部署的不同应用。

img

一个 Tomcat 实例有一个或多个 Service;一个 Service 有多个 Connector 和 Container。Connector 和 Container 之间通过标准的 ServletRequest 和 ServletResponse 通信。

3.2. 连接器

连接器对 Servlet 容器屏蔽了协议及 I/O 模型等的区别,无论是 HTTP 还是 AJP,在容器中获取到的都是一个标准的 ServletRequest 对象。

连接器的主要功能是:

  • 网络通信
  • 应用层协议解析
  • Tomcat Request/Response 与 ServletRequest/ServletResponse 的转化

Tomcat 设计了 3 个组件来实现这 3 个功能,分别是 EndPointProcessor 和 **Adapter**。

img

组件间通过抽象接口交互。这样做还有一个好处是封装变化。这是面向对象设计的精髓,将系统中经常变化的部分和稳定的部分隔离,有助于增加复用性,并降低系统耦合度。网络通信的 I/O 模型是变化的,可能是非阻塞 I/O、异步 I/O 或者 APR。应用层协议也是变化的,可能是 HTTP、HTTPS、AJP。浏览器端发送的请求信息也是变化的。但是整体的处理逻辑是不变的,EndPoint 负责提供字节流给 Processor,Processor 负责提供 Tomcat Request 对象给 Adapter,Adapter 负责提供 ServletRequest 对象给容器。

如果要支持新的 I/O 方案、新的应用层协议,只需要实现相关的具体子类,上层通用的处理逻辑是不变的。由于 I/O 模型和应用层协议可以自由组合,比如 NIO + HTTP 或者 NIO2 + AJP。Tomcat 的设计者将网络通信和应用层协议解析放在一起考虑,设计了一个叫 ProtocolHandler 的接口来封装这两种变化点。各种协议和通信模型的组合有相应的具体实现类。比如:Http11NioProtocol 和 AjpNioProtocol。

img

3.2.1. ProtocolHandler 组件

连接器用 ProtocolHandler 接口来封装通信协议和 I/O 模型的差异。ProtocolHandler 内部又分为 EndPoint 和 Processor 模块,EndPoint 负责底层 Socket 通信,Proccesor 负责应用层协议解析。

3.2.1.1. EndPoint

EndPoint 是通信端点,即通信监听的接口,是具体的 Socket 接收和发送处理器,是对传输层的抽象,因此 EndPoint 是用来实现 TCP/IP 协议的。

EndPoint 是一个接口,对应的抽象实现类是 AbstractEndpoint,而 AbstractEndpoint 的具体子类,比如在 NioEndpoint 和 Nio2Endpoint 中,有两个重要的子组件:Acceptor 和 SocketProcessor。

其中 Acceptor 用于监听 Socket 连接请求。SocketProcessor 用于处理接收到的 Socket 请求,它实现 Runnable 接口,在 Run 方法里调用协议处理组件 Processor 进行处理。为了提高处理能力,SocketProcessor 被提交到线程池来执行。而这个线程池叫作执行器(Executor)。

3.2.1.2. Processor

如果说 EndPoint 是用来实现 TCP/IP 协议的,那么 Processor 用来实现 HTTP 协议,Processor 接收来自 EndPoint 的 Socket,读取字节流解析成 Tomcat Request 和 Response 对象,并通过 Adapter 将其提交到容器处理,Processor 是对应用层协议的抽象。

Processor 是一个接口,定义了请求的处理等方法。它的抽象实现类 AbstractProcessor 对一些协议共有的属性进行封装,没有对方法进行实现。具体的实现有 AJPProcessor、HTTP11Processor 等,这些具体实现类实现了特定协议的解析方法和请求处理方式。

img

从图中我们看到,EndPoint 接收到 Socket 连接后,生成一个 SocketProcessor 任务提交到线程池去处理,SocketProcessor 的 Run 方法会调用 Processor 组件去解析应用层协议,Processor 通过解析生成 Request 对象后,会调用 Adapter 的 Service 方法。

3.2.2. Adapter

连接器通过适配器 Adapter 调用容器

由于协议不同,客户端发过来的请求信息也不尽相同,Tomcat 定义了自己的 Request 类来适配这些请求信息。

ProtocolHandler 接口负责解析请求并生成 Tomcat Request 类。但是这个 Request 对象不是标准的 ServletRequest,也就意味着,不能用 Tomcat Request 作为参数来调用容器。Tomcat 的解决方案是引入 CoyoteAdapter,这是适配器模式的经典运用,连接器调用 CoyoteAdapter 的 Sevice 方法,传入的是 Tomcat Request 对象,CoyoteAdapter 负责将 Tomcat Request 转成 ServletRequest,再调用容器的 Service 方法。

3.3. 容器

Tomcat 设计了 4 种容器,分别是 Engine、Host、Context 和 Wrapper。

  • Engine - Servlet 的顶层容器,包含一 个或多个 Host 子容器;
  • Host - 虚拟主机,负责 web 应用的部署和 Context 的创建;
  • Context - Web 应用上下文,包含多个 Wrapper,负责 web 配置的解析、管理所有的 Web 资源;
  • Wrapper - 最底层的容器,是对 Servlet 的封装,负责 Servlet 实例的创 建、执行和销毁。

3.3.1. 请求分发 Servlet 过程

Tomcat 是怎么确定请求是由哪个 Wrapper 容器里的 Servlet 来处理的呢?答案是,Tomcat 是用 Mapper 组件来完成这个任务的。

举例来说,假如有一个网购系统,有面向网站管理人员的后台管理系统,还有面向终端客户的在线购物系统。这两个系统跑在同一个 Tomcat 上,为了隔离它们的访问域名,配置了两个虚拟域名:manage.shopping.comuser.shopping.com,网站管理人员通过manage.shopping.com域名访问 Tomcat 去管理用户和商品,而用户管理和商品管理是两个单独的 Web 应用。终端客户通过user.shopping.com域名去搜索商品和下订单,搜索功能和订单管理也是两个独立的 Web 应用。如下所示,演示了 url 应声 Servlet 的处理流程。

img

假如有用户访问一个 URL,比如图中的http://user.shopping.com:8080/order/buy,Tomcat 如何将这个 URL 定位到一个 Servlet 呢?

  1. 首先,根据协议和端口号选定 Service 和 Engine。
  2. 然后,根据域名选定 Host。
  3. 之后,根据 URL 路径找到 Context 组件。
  4. 最后,根据 URL 路径找到 Wrapper(Servlet)。

这个路由分发过程具体是怎么实现的呢?答案是使用 Pipeline-Valve 管道。

3.3.2. Pipeline-Value

Pipeline 可以理解为现实中的管道,Valve 为管道中的阀门,Request 和 Response 对象在管道中经过各个阀门的处理和控制。

Pipeline-Valve 是责任链模式,责任链模式是指在一个请求处理的过程中有很多处理者依次对请求进行处理,每个处理者负责做自己相应的处理,处理完之后将再调用下一个处理者继续处理。Valve 表示一个处理点,比如权限认证和记录日志。

先来了解一下 Valve 和 Pipeline 接口的设计:

img

  • 每一个容器都有一个 Pipeline 对象,只要触发这个 Pipeline 的第一个 Valve,这个容器里 Pipeline 中的 Valve 就都会被调用到。但是,不同容器的 Pipeline 是怎么链式触发的呢,比如 Engine 中 Pipeline 需要调用下层容器 Host 中的 Pipeline。
  • 这是因为 Pipeline 中还有个 getBasic 方法。这个 BasicValve 处于 Valve 链表的末端,它是 Pipeline 中必不可少的一个 Valve,负责调用下层容器的 Pipeline 里的第一个 Valve。
  • Pipeline 中有 addValve 方法。Pipeline 中维护了 Valve 链表,Valve 可以插入到 Pipeline 中,对请求做某些处理。我们还发现 Pipeline 中没有 invoke 方法,因为整个调用链的触发是 Valve 来完成的,Valve 完成自己的处理后,调用 getNext.invoke() 来触发下一个 Valve 调用。
  • Valve 中主要的三个方法:setNextgetNextinvoke。Valve 之间的关系是单向链式结构,本身 invoke 方法中会调用下一个 Valve 的 invoke 方法。
  • 各层容器对应的 basic valve 分别是 StandardEngineValveStandardHostValveStandardContextValveStandardWrapperValve
  • 由于 Valve 是一个处理点,因此 invoke 方法就是来处理请求的。注意到 Valve 中有 getNext 和 setNext 方法,因此我们大概可以猜到有一个链表将 Valve 链起来了。

img

整个调用过程由连接器中的 Adapter 触发的,它会调用 Engine 的第一个 Valve:

1
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);

4. Tomcat 生命周期

4.1. Tomcat 的启动过程

img

  1. Tomcat 是一个 Java 程序,它的运行从执行 startup.sh 脚本开始。startup.sh 会启动一个 JVM 来运行 Tomcat 的启动类 Bootstrap
  2. Bootstrap 会初始化 Tomcat 的类加载器并实例化 Catalina
  3. Catalina 会通过 Digester 解析 server.xml,根据其中的配置信息来创建相应组件,并调用 Serverstart 方法。
  4. Server 负责管理 Service 组件,它会调用 Servicestart 方法。
  5. Service 负责管理 Connector 和顶层容器 Engine,它会调用 ConnectorEnginestart 方法。

4.1.1. Catalina 组件

Catalina 的职责就是解析 server.xml 配置,并据此实例化 Server。接下来,调用 Server 组件的 init 方法和 start 方法,将 Tomcat 启动起来。

Catalina 还需要处理各种“异常”情况,比如当我们通过“Ctrl + C”关闭 Tomcat 时,Tomcat 将如何优雅的停止并且清理资源呢?因此 Catalina 在 JVM 中注册一个“关闭钩子”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void start() {
//1. 如果持有的 Server 实例为空,就解析 server.xml 创建出来
if (getServer() == null) {
load();
}

//2. 如果创建失败,报错退出
if (getServer() == null) {
log.fatal(sm.getString("catalina.noServer"));
return;
}

//3. 启动 Server
try {
getServer().start();
} catch (LifecycleException e) {
return;
}

// 创建并注册关闭钩子
if (useShutdownHook) {
if (shutdownHook == null) {
shutdownHook = new CatalinaShutdownHook();
}
Runtime.getRuntime().addShutdownHook(shutdownHook);
}

// 用 await 方法监听停止请求
if (await) {
await();
stop();
}
}

为什么需要关闭钩子?

如果我们需要在 JVM 关闭时做一些清理工作,比如将缓存数据刷到磁盘上,或者清理一些临时文件,可以向 JVM 注册一个“关闭钩子”。“关闭钩子”其实就是一个线程,JVM 在停止之前会尝试执行这个线程的 run 方法。

Tomcat 的“关闭钩子”—— CatalinaShutdownHook 做了些什么呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
protected class CatalinaShutdownHook extends Thread {

@Override
public void run() {
try {
if (getServer() != null) {
Catalina.this.stop();
}
} catch (Throwable ex) {
...
}
}
}

Tomcat 的“关闭钩子”实际上就执行了 Serverstop 方法,Serverstop 方法会释放和清理所有的资源。

4.1.2. Server 组件

Server 组件的具体实现类是 StandardServer,Server 继承了 LifeCycleBase,它的生命周期被统一管理,并且它的子组件是 Service,因此它还需要管理 Service 的生命周期,也就是说在启动时调用 Service 组件的启动方法,在停止时调用它们的停止方法。Server 在内部维护了若干 Service 组件,它是以数组来保存的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public void addService(Service service) {

service.setServer(this);

synchronized (servicesLock) {
// 创建一个长度 +1 的新数组
Service results[] = new Service[services.length + 1];

// 将老的数据复制过去
System.arraycopy(services, 0, results, 0, services.length);
results[services.length] = service;
services = results;

// 启动 Service 组件
if (getState().isAvailable()) {
try {
service.start();
} catch (LifecycleException e) {
// Ignore
}
}

// 触发监听事件
support.firePropertyChange("service", null, service);
}

}

Server 并没有一开始就分配一个很长的数组,而是在添加的过程中动态地扩展数组长度,当添加一个新的 Service 实例时,会创建一个新数组并把原来数组内容复制到新数组,这样做的目的其实是为了节省内存空间。

除此之外,Server 组件还有一个重要的任务是启动一个 Socket 来监听停止端口,这就是为什么你能通过 shutdown 命令来关闭 Tomcat。不知道你留意到没有,上面 Caralina 的启动方法的最后一行代码就是调用了 Server 的 await 方法。

在 await 方法里会创建一个 Socket 监听 8005 端口,并在一个死循环里接收 Socket 上的连接请求,如果有新的连接到来就建立连接,然后从 Socket 中读取数据;如果读到的数据是停止命令“SHUTDOWN”,就退出循环,进入 stop 流程。

4.1.3. Service 组件

Service 组件的具体实现类是 StandardService。

【源码】StandardService 源码定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class StandardService extends LifecycleBase implements Service {
// 名字
private String name = null;

//Server 实例
private Server server = null;

// 连接器数组
protected Connector connectors[] = new Connector[0];
private final Object connectorsLock = new Object();

// 对应的 Engine 容器
private Engine engine = null;

// 映射器及其监听器
protected final Mapper mapper = new Mapper();
protected final MapperListener mapperListener = new MapperListener(this);

// ...
}

StandardService 继承了 LifecycleBase 抽象类。

StandardService 维护了一个 MapperListener 用于支持 Tomcat 热部署。当 Web 应用的部署发生变化时,Mapper 中的映射信息也要跟着变化,MapperListener 就是一个监听器,它监听容器的变化,并把信息更新到 Mapper 中,这是典型的观察者模式。

作为“管理”角色的组件,最重要的是维护其他组件的生命周期。此外在启动各种组件时,要注意它们的依赖关系,也就是说,要注意启动的顺序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected void startInternal() throws LifecycleException {

//1. 触发启动监听器
setState(LifecycleState.STARTING);

//2. 先启动 Engine,Engine 会启动它子容器
if (engine != null) {
synchronized (engine) {
engine.start();
}
}

//3. 再启动 Mapper 监听器
mapperListener.start();

//4. 最后启动连接器,连接器会启动它子组件,比如 Endpoint
synchronized (connectorsLock) {
for (Connector connector: connectors) {
if (connector.getState() != LifecycleState.FAILED) {
connector.start();
}
}
}
}

从启动方法可以看到,Service 先启动了 Engine 组件,再启动 Mapper 监听器,最后才是启动连接器。这很好理解,因为内层组件启动好了才能对外提供服务,才能启动外层的连接器组件。而 Mapper 也依赖容器组件,容器组件启动好了才能监听它们的变化,因此 Mapper 和 MapperListener 在容器组件之后启动。组件停止的顺序跟启动顺序正好相反的,也是基于它们的依赖关系。

4.1.4. Engine 组件

Engine 本质是一个容器,因此它继承了 ContainerBase 基类,并且实现了 Engine 接口。

4.2. Web 应用的部署方式

注:catalina.home:安装目录;catalina.base:工作目录;默认值 user.dir

  • Server.xml 配置 Host 元素,指定 appBase 属性,默认$catalina.base/webapps/
  • Server.xml 配置 Context 元素,指定 docBase,元素,指定 web 应用的路径
  • 自定义配置:在$catalina.base/EngineName/HostName/XXX.xml 配置 Context 元素

HostConfig 监听了 StandardHost 容器的事件,在 start 方法中解析上述配置文件:

  • 扫描 appbase 路径下的所有文件夹和 war 包,解析各个应用的 META-INF/context.xml,并 创建 StandardContext,并将 Context 加入到 Host 的子容器中。
  • 解析$catalina.base/EngineName/HostName/下的所有 Context 配置,找到相应 web 应 用的位置,解析各个应用的 META-INF/context.xml,并创建 StandardContext,并将 Context 加入到 Host 的子容器中。

注:

  • HostConfig 并没有实际解析 Context.xml,而是在 ContextConfig 中进行的。
  • HostConfig 中会定期检查 watched 资源文件(context.xml 配置文件)

ContextConfig 解析 context.xml 顺序:

  • 先解析全局的配置 config/context.xml
  • 然后解析 Host 的默认配置 EngineName/HostName/context.xml.default
  • 最后解析应用的 META-INF/context.xml

ContextConfig 解析 web.xml 顺序:

  • 先解析全局的配置 config/web.xml
  • 然后解析 Host 的默认配置 EngineName/HostName/web.xml.default 接着解析应用的 MEB-INF/web.xml
  • 扫描应用 WEB-INF/lib/下的 jar 文件,解析其中的 META-INF/web-fragment.xml 最后合并 xml 封装成 WebXml,并设置 Context

注:

  • 扫描 web 应用和 jar 中的注解(Filter、Listener、Servlet)就是上述步骤中进行的。
  • 容器的定期执行:backgroundProcess,由 ContainerBase 来实现的,并且只有在顶层容器 中才会开启线程。(backgroundProcessorDelay=10 标志位来控制)

4.3. LifeCycle

img

4.3.1. 请求处理过程

  1. 根据 server.xml 配置的指定的 connector 以及端口监听 http、或者 ajp 请求
  2. 请求到来时建立连接,解析请求参数,创建 Request 和 Response 对象,调用顶层容器 pipeline 的 invoke 方法
  3. 容器之间层层调用,最终调用业务 servlet 的 service 方法
  4. Connector 将 response 流中的数据写到 socket 中

4.4. Connector 流程

4.4.1. 阻塞 IO

4.4.2. 非阻塞 IO

4.4.3. IO 多路复用

阻塞与非阻塞的区别在于进行读操作和写操作的系统调用时,如果此时内核态没有数据可读或者没有缓冲空间可写时,是否阻塞。

IO 多路复用的好处在于可同时监听多个 socket 的可读和可写事件,这样就能使得应用可以同时监听多个 socket,释放了应用线程资源。

4.4.4. Tomcat 各类 Connector 对比

  • JIO:用 java.io 编写的 TCP 模块,阻塞 IO
  • NIO:用 java.nio 编写的 TCP 模块,非阻塞 IO,(IO 多路复用)
  • APR:全称 Apache Portable Runtime,使用 JNI 的方式来进行读取文件以及进行网络传输

Apache Portable Runtime 是一个高度可移植的库,它是 Apache HTTP Server 2.x 的核心。 APR 具有许多用途,包括访问高级 IO 功能(如 sendfile,epoll 和 OpenSSL),操作系统级功能(随机数生成,系统状态等)和本地进程处理(共享内存,NT 管道和 Unix 套接字)。

表格中字段含义说明:

  • Support Polling - 是否支持基于 IO 多路复用的 socket 事件轮询
  • Polling Size - 轮询的最大连接数
  • Wait for next Request - 在等待下一个请求时,处理线程是否释放,BIO 是没有释放的,所以在 keep-alive=true 的情况下处理的并发连接数有限
  • Read Request Headers - 由于 request header 数据较少,可以由容器提前解析完毕,不需要阻塞
  • Read Request Body - 读取 request body 的数据是应用业务逻辑的事情,同时 Servlet 的限制,是需要阻塞读取的
  • Write Response - 跟读取 request body 的逻辑类似,同样需要阻塞写

NIO 处理相关类

Poller 线程从 EventQueue 获取 PollerEvent,并执行 PollerEvent 的 run 方法,调用 Selector 的 select 方法,如果有可读的 Socket 则创建 Http11NioProcessor,放入到线程池中执行;

CoyoteAdapter 是 Connector 到 Container 的适配器,Http11NioProcessor 调用其提供的 service 方法,内部创建 Request 和 Response 对象,并调用最顶层容器的 Pipeline 中的第一个 Valve 的 invoke 方法

Mapper 主要处理 http url 到 servlet 的映射规则的解析,对外提供 map 方法

4.5. Comet

Comet 是一种用于 web 的推送技术,能使服务器实时地将更新的信息传送到客户端,而无须客户端发出请求
在 WebSocket 出来之前,如果不适用 comet,只能通过浏览器端轮询 Server 来模拟实现服务器端推送。
Comet 支持 servlet 异步处理 IO,当连接上数据可读时触发事件,并异步写数据(阻塞)

Tomcat 要实现 Comet,只需继承 HttpServlet 同时,实现 CometProcessor 接口

  • Begin:新的请求连接接入调用,可进行与 Request 和 Response 相关的对象初始化操作,并保存 response 对象,用于后续写入数据
  • Read:请求连接有数据可读时调用
  • End:当数据可用时,如果读取到文件结束或者 response 被关闭时则被调用
  • Error:在连接上发生异常时调用,数据读取异常、连接断开、处理异常、socket 超时

Note:

  • Read:在 post 请求有数据,但在 begin 事件中没有处理,则会调用 read,如果 read 没有读取数据,在会触发 Error 回调,关闭 socket
  • End:当 socket 超时,并且 response 被关闭时也会调用;server 被关闭时调用
  • Error:除了 socket 超时不会关闭 socket,其他都会关闭 socket
  • End 和 Error 时间触发时应关闭当前 comet 会话,即调用 CometEvent 的 close 方法
    Note:在事件触发时要做好线程安全的操作

4.6. 异步 Servlet

传统流程:

  • 首先,Servlet 接收到请求之后,request 数据解析;
  • 接着,调用业务接口的某些方法,以完成业务处理;
  • 最后,根据处理的结果提交响应,Servlet 线程结束

异步处理流程:

  • 客户端发送一个请求
  • Servlet 容器分配一个线程来处理容器中的一个 servlet
  • servlet 调用 request.startAsync(),保存 AsyncContext, 然后返回
  • 任何方式存在的容器线程都将退出,但是 response 仍然保持开放
  • 业务线程使用保存的 AsyncContext 来完成响应(线程池)
  • 客户端收到响应

Servlet 线程将请求转交给一个异步线程来执行业务处理,线程本身返回至容器,此时 Servlet 还没有生成响应数据,异步线程处理完业务以后,可以直接生成响应数据(异步线程拥有 ServletRequest 和 ServletResponse 对象的引用)

为什么 web 应用中支持异步?

推出异步,主要是针对那些比较耗时的请求:比如一次缓慢的数据库查询,一次外部 REST API 调用, 或者是其他一些 I/O 密集型操作。这种耗时的请求会很快的耗光 Servlet 容器的线程池,继而影响可扩展性。

Note:从客户端的角度来看,request 仍然像任何其他的 HTTP 的 request-response 交互一样,只是耗费了更长的时间而已

异步事件监听

  • onStartAsync:Request 调用 startAsync 方法时触发
  • onComplete:syncContext 调用 complete 方法时触发
  • onError:处理请求的过程出现异常时触发
  • onTimeout:socket 超时触发

Note :
onError/ onTimeout 触发后,会紧接着回调 onComplete
onComplete 执行后,就不可再操作 request 和 response

5. 参考资料

Tomcat 连接器

1. NioEndpoint 组件

Tomcat 的 NioEndPoint 组件利用 Java NIO 实现了 I/O 多路复用模型。

img

NioEndPoint 子组件功能简介:

  • LimitLatch 是连接控制器,负责控制最大连接数。NIO 模式下默认是 10000,达到这个阈值后,连接请求被拒绝。
  • Acceptor 负责监听连接请求。Acceptor 运行在一个单独的线程里,它在一个死循环里调用 accept 方法来接收新连接,一旦有新的连接请求到来,accept 方法返回一个 Channel 对象,接着把 Channel 对象交给 Poller 去处理。
  • Poller 的本质是一个 Selector,也运行在单独线程里。Poller 内部维护一个 Channel 数组,它在一个死循环里不断检测 Channel 的数据就绪状态,一旦有 Channel 可读,就生成一个 SocketProcessor 任务对象扔给 Executor 去处理。
  • Executor 就是线程池,负责运行 SocketProcessor 任务类,SocketProcessor 的 run 方法会调用 Http11Processor 来读取和解析请求数据。我们知道,Http11Processor 是应用层协议的封装,它会调用容器获得响应,再把响应通过 Channel 写出。

NioEndpoint 如何实现高并发的呢?

要实现高并发需要合理设计线程模型充分利用 CPU 资源,尽量不要让线程阻塞;另外,就是有多少任务,就用相应规模的线程数去处理。

NioEndpoint 要完成三件事情:接收连接、检测 I/O 事件以及处理请求,那么最核心的就是把这三件事情分开,用不同规模的线程去处理,比如用专门的线程组去跑 Acceptor,并且 Acceptor 的个数可以配置;用专门的线程组去跑 Poller,Poller 的个数也可以配置;最后具体任务的执行也由专门的线程池来处理,也可以配置线程池的大小。

1.1. LimitLatch

LimitLatch 用来控制连接个数,当连接数到达最大时阻塞线程,直到后续组件处理完一个连接后将连接数减 1。请你注意到达最大连接数后操作系统底层还是会接收客户端连接,但用户层已经不再接收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class LimitLatch {
private class Sync extends AbstractQueuedSynchronizer {

@Override
protected int tryAcquireShared() {
long newCount = count.incrementAndGet();
if (newCount > limit) {
count.decrementAndGet();
return -1;
} else {
return 1;
}
}

@Override
protected boolean tryReleaseShared(int arg) {
count.decrementAndGet();
return true;
}
}

private final Sync sync;
private final AtomicLong count;
private volatile long limit;

// 线程调用这个方法来获得接收新连接的许可,线程可能被阻塞
public void countUpOrAwait() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

// 调用这个方法来释放一个连接许可,那么前面阻塞的线程可能被唤醒
public long countDown() {
sync.releaseShared(0);
long result = getCount();
return result;
}
}

LimitLatch 内步定义了内部类 Sync,而 Sync 扩展了 AQS,AQS 是 Java 并发包中的一个核心类,它在内部维护一个状态和一个线程队列,可以用来控制线程什么时候挂起,什么时候唤醒。我们可以扩展它来实现自己的同步器,实际上 Java 并发包里的锁和条件变量等等都是通过 AQS 来实现的,而这里的 LimitLatch 也不例外。

理解源码要点:

  • 用户线程通过调用 LimitLatch 的 countUpOrAwait 方法来拿到锁,如果暂时无法获取,这个线程会被阻塞到 AQS 的队列中。那 AQS 怎么知道是阻塞还是不阻塞用户线程呢?其实这是由 AQS 的使用者来决定的,也就是内部类 Sync 来决定的,因为 Sync 类重写了 AQS 的tryAcquireShared() 方法。它的实现逻辑是如果当前连接数 count 小于 limit,线程能获取锁,返回 1,否则返回 -1。
  • 如何用户线程被阻塞到了 AQS 的队列,那什么时候唤醒呢?同样是由 Sync 内部类决定,Sync 重写了 AQS 的releaseShared() 方法,其实就是当一个连接请求处理完了,这时又可以接收一个新连接了,这样前面阻塞的线程将会被唤醒。

1.2. Acceptor

Acceptor 实现了 Runnable 接口,因此可以跑在单独线程里。一个端口号只能对应一个 ServerSocketChannel,因此这个 ServerSocketChannel 是在多个 Acceptor 线程之间共享的,它是 Endpoint 的属性,由 Endpoint 完成初始化和端口绑定。

1
2
3
serverSock = ServerSocketChannel.open();
serverSock.socket().bind(addr,getAcceptCount());
serverSock.configureBlocking(true);
  • bind 方法的第二个参数表示操作系统的等待队列长度,我在上面提到,当应用层面的连接数到达最大值时,操作系统可以继续接收连接,那么操作系统能继续接收的最大连接数就是这个队列长度,可以通过 acceptCount 参数配置,默认是 100。
  • ServerSocketChannel 被设置成阻塞模式,也就是说它是以阻塞的方式接收连接的。ServerSocketChannel 通过 accept() 接受新的连接,accept() 方法返回获得 SocketChannel 对象,然后将 SocketChannel 对象封装在一个 PollerEvent 对象中,并将 PollerEvent 对象压入 Poller 的 Queue 里,这是个典型的生产者 - 消费者模式,Acceptor 与 Poller 线程之间通过 Queue 通信。

1.3. Poller

Poller 本质是一个 Selector,它内部维护一个 Queue

1
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

SynchronizedQueue 的核心方法都使用了 Synchronized 关键字进行修饰,用来保证同一时刻只有一个线程进行读写。

使用 SynchronizedQueue,意味着同一时刻只有一个 Acceptor 线程对队列进行读写;同时有多个 Poller 线程在运行,每个 Poller 线程都有自己的队列。每个 Poller 线程可能同时被多个 Acceptor 线程调用来注册 PollerEvent。同样 Poller 的个数可以通过 pollers 参数配置。

Poller 不断的通过内部的 Selector 对象向内核查询 Channel 的状态,一旦可读就生成任务类 SocketProcessor 交给 Executor 去处理。Poller 的另一个重要任务是循环遍历检查自己所管理的 SocketChannel 是否已经超时,如果有超时就关闭这个 SocketChannel

1.4. SocketProcessor

我们知道,Poller 会创建 SocketProcessor 任务类交给线程池处理,而 SocketProcessor 实现了 Runnable 接口,用来定义 Executor 中线程所执行的任务,主要就是调用 Http11Processor 组件来处理请求。Http11Processor 读取 Channel 的数据来生成 ServletRequest 对象,这里请你注意:

Http11Processor 并不是直接读取 Channel 的。这是因为 Tomcat 支持同步非阻塞 I/O 模型和异步 I/O 模型,在 Java API 中,相应的 Channel 类也是不一样的,比如有 AsynchronousSocketChannelSocketChannel,为了对 Http11Processor 屏蔽这些差异,Tomcat 设计了一个包装类叫作 SocketWrapperHttp11Processor 只调用 SocketWrapper 的方法去读写数据。

2. Nio2Endpoint 组件

Nio2Endpoint 工作流程跟 NioEndpoint 较为相似。

img

Nio2Endpoint 子组件功能说明:

  • LimitLatch 是连接控制器,它负责控制最大连接数。
  • Nio2Acceptor 扩展了 Acceptor,用异步 I/O 的方式来接收连接,跑在一个单独的线程里,也是一个线程组。Nio2Acceptor 接收新的连接后,得到一个 AsynchronousSocketChannelNio2AcceptorAsynchronousSocketChannel 封装成一个 Nio2SocketWrapper,并创建一个 SocketProcessor 任务类交给线程池处理,并且 SocketProcessor 持有 Nio2SocketWrapper 对象。
  • Executor 在执行 SocketProcessor 时,SocketProcessor 的 run 方法会调用 Http11Processor 来处理请求,Http11Processor 会通过 Nio2SocketWrapper 读取和解析请求数据,请求经过容器处理后,再把响应通过 Nio2SocketWrapper 写出。

Nio2Endpoint 跟 NioEndpoint 的一个明显不同点是,Nio2Endpoint 中没有 Poller 组件,也就是没有 Selector。这是为什么呢?因为在异步 I/O 模式下,Selector 的工作交给内核来做了。

2.1. Nio2Acceptor

NioEndpint 一样,Nio2Endpoint 的基本思路是用 LimitLatch 组件来控制连接数。

但是 Nio2Acceptor 的监听连接的过程不是在一个死循环里不断的调 accept 方法,而是通过回调函数来完成的。我们来看看它的连接监听方法:

1
serverSock.accept(null, this);

其实就是调用了 accept 方法,注意它的第二个参数是 this,表明 Nio2Acceptor 自己就是处理连接的回调类,因此 Nio2Acceptor 实现了 CompletionHandler 接口。那么它是如何实现 CompletionHandler 接口的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel>
implements CompletionHandler<AsynchronousSocketChannel, Void> {

@Override
public void completed(AsynchronousSocketChannel socket,
Void attachment) {

if (isRunning() && !isPaused()) {
if (getMaxConnections() == -1) {
// 如果没有连接限制,继续接收新的连接
serverSock.accept(null, this);
} else {
// 如果有连接限制,就在线程池里跑 Run 方法,Run 方法会检查连接数
getExecutor().execute(this);
}
// 处理请求
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
}
}
}

可以看到 CompletionHandler 的两个模板参数分别是 AsynchronousServerSocketChannel 和 Void,我在前面说过第一个参数就是 accept 方法的返回值,第二个参数是附件类,由用户自己决定,这里为 Void。completed 方法的处理逻辑比较简单:

  • 如果没有连接限制,继续在本线程中调用 accept 方法接收新的连接。
  • 如果有连接限制,就在线程池里跑 run 方法去接收新的连接。那为什么要跑 run 方法呢,因为在 run 方法里会检查连接数,当连接达到最大数时,线程可能会被 LimitLatch 阻塞。为什么要放在线程池里跑呢?这是因为如果放在当前线程里执行,completed 方法可能被阻塞,会导致这个回调方法一直不返回。

接着 completed 方法会调用 setSocketOptions 方法,在这个方法里,会创建 Nio2SocketWrapperSocketProcessor,并交给线程池处理。

2.2. Nio2SocketWrapper

Nio2SocketWrapper 的主要作用是封装 Channel,并提供接口给 Http11Processor 读写数据。讲到这里你是不是有个疑问:Http11Processor 是不能阻塞等待数据的,按照异步 I/O 的套路,Http11Processor 在调用 Nio2SocketWrapper 的 read 方法时需要注册回调类,read 调用会立即返回,问题是立即返回后 Http11Processor 还没有读到数据, 怎么办呢?这个请求的处理不就失败了吗?

为了解决这个问题,Http11Processor 是通过 2 次 read 调用来完成数据读取操作的。

  • 第一次 read 调用:连接刚刚建立好后,Acceptor 创建 SocketProcessor 任务类交给线程池去处理,Http11Processor 在处理请求的过程中,会调用 Nio2SocketWrapper 的 read 方法发出第一次读请求,同时注册了回调类 readCompletionHandler,因为数据没读到,Http11Processor 把当前的 Nio2SocketWrapper 标记为数据不完整。接着 SocketProcessor 线程被回收,Http11Processor 并没有阻塞等待数据。这里请注意,Http11Processor 维护了一个 Nio2SocketWrapper 列表,也就是维护了连接的状态。
  • 第二次 read 调用:当数据到达后,内核已经把数据拷贝到 Http11Processor 指定的 Buffer 里,同时回调类 readCompletionHandler 被调用,在这个回调处理方法里会重新创建一个新的 SocketProcessor 任务来继续处理这个连接,而这个新的 SocketProcessor 任务类持有原来那个 Nio2SocketWrapper,这一次 Http11Processor 可以通过 Nio2SocketWrapper 读取数据了,因为数据已经到了应用层的 Buffer。

这个回调类 readCompletionHandler 的源码如下,最关键的一点是,**Nio2SocketWrapper 是作为附件类来传递的**,这样在回调函数里能拿到所有的上下文。

1
2
3
4
5
6
7
8
9
10
11
this.readCompletionHandler = new CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>() {
public void completed(Integer nBytes, SocketWrapperBase<Nio2Channel> attachment) {
...
// 通过附件类 SocketWrapper 拿到所有的上下文
Nio2SocketWrapper.this.getEndpoint().processSocket(attachment, SocketEvent.OPEN_READ, false);
}

public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> attachment) {
...
}
}

3. AprEndpoint 组件

我们在使用 Tomcat 时,可能会在启动日志里看到这样的提示信息:

The APR based Apache Tomcat Native library which allows optimal performance in production environments was not found on the java.library.path: ***

这句话的意思就是推荐你去安装 APR 库,可以提高系统性能。

APR(Apache Portable Runtime Libraries)是 Apache 可移植运行时库,它是用 C 语言实现的,其目的是向上层应用程序提供一个跨平台的操作系统接口库。Tomcat 可以用它来处理包括文件和网络 I/O,从而提升性能。Tomcat 支持的连接器有 NIO、NIO.2 和 APR。跟 NioEndpoint 一样,AprEndpoint 也实现了非阻塞 I/O,它们的区别是:NioEndpoint 通过调用 Java 的 NIO API 来实现非阻塞 I/O,而 AprEndpoint 是通过 JNI 调用 APR 本地库而实现非阻塞 I/O 的。

同样是非阻塞 I/O,为什么 Tomcat 会提示使用 APR 本地库的性能会更好呢?这是因为在某些场景下,比如需要频繁与操作系统进行交互,Socket 网络通信就是这样一个场景,特别是如果你的 Web 应用使用了 TLS 来加密传输,我们知道 TLS 协议在握手过程中有多次网络交互,在这种情况下 Java 跟 C 语言程序相比还是有一定的差距,而这正是 APR 的强项。

Tomcat 本身是 Java 编写的,为了调用 C 语言编写的 APR,需要通过 JNI 方式来调用。JNI(Java Native Interface) 是 JDK 提供的一个编程接口,它允许 Java 程序调用其他语言编写的程序或者代码库,其实 JDK 本身的实现也大量用到 JNI 技术来调用本地 C 程序库。

3.1. AprEndpoint 工作流程

img

3.1.1. Acceptor

Accpetor 的功能就是监听连接,接收并建立连接。它的本质就是调用了四个操作系统 API:socket、bind、listen 和 accept。那 Java 语言如何直接调用 C 语言 API 呢?答案就是通过 JNI。具体来说就是两步:先封装一个 Java 类,在里面定义一堆用native 关键字修饰的方法,像下面这样。

1
2
3
4
5
6
7
8
9
10
11
12
public class Socket {
...
// 用 native 修饰这个方法,表明这个函数是 C 语言实现
public static native long create(int family, int type,
int protocol, long cont)

public static native int bind(long sock, long sa);

public static native int listen(long sock, int backlog);

public static native long accept(long sock)
}

接着用 C 代码实现这些方法,比如 bind 函数就是这样实现的:

1
2
3
4
5
6
7
8
9
10
11
12
// 注意函数的名字要符合 JNI 规范的要求
JNIEXPORT jint JNICALL
Java_org_apache_tomcat_jni_Socket_bind(JNIEnv *e, jlong sock,jlong sa)
{
jint rv = APR_SUCCESS;
tcn_socket_t *s = (tcn_socket_t *)sock;
apr_sockaddr_t *a = (apr_sockaddr_t *) sa;

// 调用 APR 库自己实现的 bind 函数
rv = (jint)apr_socket_bind(s->sock, a);
return rv;
}

专栏里我就不展开 JNI 的细节了,你可以扩展阅读获得更多信息和例子。我们要注意的是函数名字要符合 JNI 的规范,以及 Java 和 C 语言如何互相传递参数,比如在 C 语言有指针,Java 没有指针的概念,所以在 Java 中用 long 类型来表示指针。AprEndpoint 的 Acceptor 组件就是调用了 APR 实现的四个 API。

3.1.2. Poller

Acceptor 接收到一个新的 Socket 连接后,按照 NioEndpoint 的实现,它会把这个 Socket 交给 Poller 去查询 I/O 事件。AprEndpoint 也是这样做的,不过 AprEndpoint 的 Poller 并不是调用 Java NIO 里的 Selector 来查询 Socket 的状态,而是通过 JNI 调用 APR 中的 poll 方法,而 APR 又是调用了操作系统的 epoll API 来实现的。

这里有个特别的地方是在 AprEndpoint 中,我们可以配置一个叫deferAccept的参数,它对应的是 TCP 协议中的TCP_DEFER_ACCEPT,设置这个参数后,当 TCP 客户端有新的连接请求到达时,TCP 服务端先不建立连接,而是再等等,直到客户端有请求数据发过来时再建立连接。这样的好处是服务端不需要用 Selector 去反复查询请求数据是否就绪。

这是一种 TCP 协议层的优化,不是每个操作系统内核都支持,因为 Java 作为一种跨平台语言,需要屏蔽各种操作系统的差异,因此并没有把这个参数提供给用户;但是对于 APR 来说,它的目的就是尽可能提升性能,因此它向用户暴露了这个参数。

3.2. APR 提升性能的秘密

APR 连接器之所以能提高 Tomcat 的性能,除了 APR 本身是 C 程序库之外,还有哪些提速的秘密呢?

JVM 堆 VS 本地内存

我们知道 Java 的类实例一般在 JVM 堆上分配,而 Java 是通过 JNI 调用 C 代码来实现 Socket 通信的,那么 C 代码在运行过程中需要的内存又是从哪里分配的呢?C 代码能否直接操作 Java 堆?

为了回答这些问题,我先来说说 JVM 和用户进程的关系。如果你想运行一个 Java 类文件,可以用下面的 Java 命令来执行。

1
java my.class

这个命令行中的java其实是一个可执行程序,这个程序会创建 JVM 来加载和运行你的 Java 类。操作系统会创建一个进程来执行这个java可执行程序,而每个进程都有自己的虚拟地址空间,JVM 用到的内存(包括堆、栈和方法区)就是从进程的虚拟地址空间上分配的。请你注意的是,JVM 内存只是进程空间的一部分,除此之外进程空间内还有代码段、数据段、内存映射区、内核空间等。从 JVM 的角度看,JVM 内存之外的部分叫作本地内存,C 程序代码在运行过程中用到的内存就是本地内存中分配的。下面我们通过一张图来理解一下。

img

Tomcat 的 Endpoint 组件在接收网络数据时需要预先分配好一块 Buffer,所谓的 Buffer 就是字节数组byte[],Java 通过 JNI 调用把这块 Buffer 的地址传给 C 代码,C 代码通过操作系统 API 读取 Socket 并把数据填充到这块 Buffer。Java NIO API 提供了两种 Buffer 来接收数据:HeapByteBuffer 和 DirectByteBuffer,下面的代码演示了如何创建两种 Buffer。

1
2
3
4
5
// 分配 HeapByteBuffer
ByteBuffer buf = ByteBuffer.allocate(1024);

// 分配 DirectByteBuffer
ByteBuffer buf = ByteBuffer.allocateDirect(1024);

创建好 Buffer 后直接传给 Channel 的 read 或者 write 函数,最终这块 Buffer 会通过 JNI 调用传递给 C 程序。

1
2
// 将 buf 作为 read 函数的参数
int bytesRead = socketChannel.read(buf);

那 HeapByteBuffer 和 DirectByteBuffer 有什么区别呢?HeapByteBuffer 对象本身在 JVM 堆上分配,并且它持有的字节数组byte[]也是在 JVM 堆上分配。但是如果用HeapByteBuffer来接收网络数据,需要把数据从内核先拷贝到一个临时的本地内存,再从临时本地内存拷贝到 JVM 堆,而不是直接从内核拷贝到 JVM 堆上。这是为什么呢?这是因为数据从内核拷贝到 JVM 堆的过程中,JVM 可能会发生 GC,GC 过程中对象可能会被移动,也就是说 JVM 堆上的字节数组可能会被移动,这样的话 Buffer 地址就失效了。如果这中间经过本地内存中转,从本地内存到 JVM 堆的拷贝过程中 JVM 可以保证不做 GC。

如果使用 HeapByteBuffer,你会发现 JVM 堆和内核之间多了一层中转,而 DirectByteBuffer 用来解决这个问题,DirectByteBuffer 对象本身在 JVM 堆上,但是它持有的字节数组不是从 JVM 堆上分配的,而是从本地内存分配的。DirectByteBuffer 对象中有个 long 类型字段 address,记录着本地内存的地址,这样在接收数据的时候,直接把这个本地内存地址传递给 C 程序,C 程序会将网络数据从内核拷贝到这个本地内存,JVM 可以直接读取这个本地内存,这种方式比 HeapByteBuffer 少了一次拷贝,因此一般来说它的速度会比 HeapByteBuffer 快好几倍。你可以通过上面的图加深理解。

Tomcat 中的 AprEndpoint 就是通过 DirectByteBuffer 来接收数据的,而 NioEndpoint 和 Nio2Endpoint 是通过 HeapByteBuffer 来接收数据的。你可能会问,NioEndpoint 和 Nio2Endpoint 为什么不用 DirectByteBuffer 呢?这是因为本地内存不好管理,发生内存泄漏难以定位,从稳定性考虑,NioEndpoint 和 Nio2Endpoint 没有去冒这个险。

3.2.1. sendfile

我们再来考虑另一个网络通信的场景,也就是静态文件的处理。浏览器通过 Tomcat 来获取一个 HTML 文件,而 Tomcat 的处理逻辑无非是两步:

  1. 从磁盘读取 HTML 到内存。
  2. 将这段内存的内容通过 Socket 发送出去。

但是在传统方式下,有很多次的内存拷贝:

  • 读取文件时,首先是内核把文件内容读取到内核缓冲区。
  • 如果使用 HeapByteBuffer,文件数据从内核到 JVM 堆内存需要经过本地内存中转。
  • 同样在将文件内容推入网络时,从 JVM 堆到内核缓冲区需要经过本地内存中转。
  • 最后还需要把文件从内核缓冲区拷贝到网卡缓冲区。

从下面的图你会发现这个过程有 6 次内存拷贝,并且 read 和 write 等系统调用将导致进程从用户态到内核态的切换,会耗费大量的 CPU 和内存资源。

img

而 Tomcat 的 AprEndpoint 通过操作系统层面的 sendfile 特性解决了这个问题,sendfile 系统调用方式非常简洁。

1
sendfile(socket, file, len);

它带有两个关键参数:Socket 和文件句柄。将文件从磁盘写入 Socket 的过程只有两步:

第一步:将文件内容读取到内核缓冲区。

第二步:数据并没有从内核缓冲区复制到 Socket 关联的缓冲区,只有记录数据位置和长度的描述符被添加到 Socket 缓冲区中;接着把数据直接从内核缓冲区传递给网卡。这个过程你可以看下面的图。

img

4. Executor 组件

为了提高处理能力和并发度,Web 容器一般会把处理请求的工作放到线程池里来执行,Tomcat 扩展了原生的 Java 线程池,来满足 Web 容器高并发的需求。

4.1. Tomcat 定制线程池

Tomcat 的线程池也是一个定制版的 ThreadPoolExecutor。Tomcat 传入的参数是这样的:

1
2
3
4
5
6
7
8
// 定制版的任务队列
taskqueue = new TaskQueue(maxQueueSize);

// 定制版的线程工厂
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());

// 定制版的线程池
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);

其中的两个关键点:

  • Tomcat 有自己的定制版任务队列和线程工厂,并且可以限制任务队列的长度,它的最大长度是 maxQueueSize。
  • Tomcat 对线程数也有限制,设置了核心线程数(minSpareThreads)和最大线程池数(maxThreads)。

除了资源限制以外,Tomcat 线程池还定制自己的任务处理流程。我们知道 Java 原生线程池的任务处理逻辑比较简单:

  1. 前 corePoolSize 个任务时,来一个任务就创建一个新线程。
  2. 后面再来任务,就把任务添加到任务队列里让所有的线程去抢,如果队列满了就创建临时线程。
  3. 如果总线程数达到 maximumPoolSize,执行拒绝策略。

Tomcat 线程池扩展了原生的 ThreadPoolExecutor,通过重写 execute 方法实现了自己的任务处理逻辑:

  1. 前 corePoolSize 个任务时,来一个任务就创建一个新线程。
  2. 再来任务的话,就把任务添加到任务队列里让所有的线程去抢,如果队列满了就创建临时线程。
  3. 如果总线程数达到 maximumPoolSize,则继续尝试把任务添加到任务队列中去。
  4. 如果缓冲队列也满了,插入失败,执行拒绝策略。

观察 Tomcat 线程池和 Java 原生线程池的区别,其实就是在第 3 步,Tomcat 在线程总数达到最大数时,不是立即执行拒绝策略,而是再尝试向任务队列添加任务,添加失败后再执行拒绝策略。那具体如何实现呢,其实很简单,我们来看一下 Tomcat 线程池的 execute 方法的核心代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {

...

public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
// 调用 Java 原生线程池的 execute 去执行任务
super.execute(command);
} catch (RejectedExecutionException rx) {
// 如果总线程数达到 maximumPoolSize,Java 原生线程池执行拒绝策略
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
// 继续尝试把任务放到任务队列中去
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
// 如果缓冲队列也满了,插入失败,执行拒绝策略。
throw new RejectedExecutionException("...");
}
}
}
}
}

从这个方法你可以看到,Tomcat 线程池的 execute 方法会调用 Java 原生线程池的 execute 去执行任务,如果总线程数达到 maximumPoolSize,Java 原生线程池的 execute 方法会抛出 RejectedExecutionException 异常,但是这个异常会被 Tomcat 线程池的 execute 方法捕获到,并继续尝试把这个任务放到任务队列中去;如果任务队列也满了,再执行拒绝策略。

4.2. Tomcat 定制任务队列

细心的你有没有发现,在 Tomcat 线程池的 execute 方法最开始有这么一行:

1
submittedCount.incrementAndGet();

这行代码的意思把 submittedCount 这个原子变量加一,并且在任务执行失败,抛出拒绝异常时,将这个原子变量减一:

1
submittedCount.decrementAndGet();

其实 Tomcat 线程池是用这个变量 submittedCount 来维护已经提交到了线程池,但是还没有执行完的任务个数。Tomcat 为什么要维护这个变量呢?这跟 Tomcat 的定制版的任务队列有关。Tomcat 的任务队列 TaskQueue 扩展了 Java 中的 LinkedBlockingQueue,我们知道 LinkedBlockingQueue 默认情况下长度是没有限制的,除非给它一个 capacity。因此 Tomcat 给了它一个 capacity,TaskQueue 的构造函数中有个整型的参数 capacity,TaskQueue 将 capacity 传给父类 LinkedBlockingQueue 的构造函数。

1
2
3
4
5
6
7
public class TaskQueue extends LinkedBlockingQueue<Runnable> {

public TaskQueue(int capacity) {
super(capacity);
}
...
}

这个 capacity 参数是通过 Tomcat 的 maxQueueSize 参数来设置的,但问题是默认情况下 maxQueueSize 的值是Integer.MAX_VALUE,等于没有限制,这样就带来一个问题:当前线程数达到核心线程数之后,再来任务的话线程池会把任务添加到任务队列,并且总是会成功,这样永远不会有机会创建新线程了。

为了解决这个问题,TaskQueue 重写了 LinkedBlockingQueue 的 offer 方法,在合适的时机返回 false,返回 false 表示任务添加失败,这时线程池会创建新的线程。那什么是合适的时机呢?请看下面 offer 方法的核心源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TaskQueue extends LinkedBlockingQueue<Runnable> {

...
@Override
// 线程池调用任务队列的方法时,当前线程数肯定已经大于核心线程数了
public boolean offer(Runnable o) {

// 如果线程数已经到了最大值,不能创建新线程了,只能把任务添加到任务队列。
if (parent.getPoolSize() == parent.getMaximumPoolSize())
return super.offer(o);

// 执行到这里,表明当前线程数大于核心线程数,并且小于最大线程数。
// 表明是可以创建新线程的,那到底要不要创建呢?分两种情况:

//1. 如果已提交的任务数小于当前线程数,表示还有空闲线程,无需创建新线程
if (parent.getSubmittedCount()<=(parent.getPoolSize()))
return super.offer(o);

//2. 如果已提交的任务数大于当前线程数,线程不够用了,返回 false 去创建新线程
if (parent.getPoolSize()<parent.getMaximumPoolSize())
return false;

// 默认情况下总是把任务添加到任务队列
return super.offer(o);
}

}

从上面的代码我们看到,只有当前线程数大于核心线程数、小于最大线程数,并且已提交的任务个数大于当前线程数时,也就是说线程不够用了,但是线程数又没达到极限,才会去创建新的线程。这就是为什么 Tomcat 需要维护已提交任务数这个变量,它的目的就是在任务队列的长度无限制的情况下,让线程池有机会创建新的线程

当然默认情况下 Tomcat 的任务队列是没有限制的,你可以通过设置 maxQueueSize 参数来限制任务队列的长度。

5. WebSocket 组件

HTTP 协议是“请求 - 响应”模式,浏览器必须先发请求给服务器,服务器才会响应这个请求。也就是说,服务器不会主动发送数据给浏览器。

对于实时性要求比较的高的应用,比如在线游戏、股票基金实时报价和在线协同编辑等,浏览器需要实时显示服务器上最新的数据,因此出现了 Ajax 和 Comet 技术。Ajax 本质上还是轮询,而 Comet 是在 HTTP 长连接的基础上做了一些 hack,但是它们的实时性不高,另外频繁的请求会给服务器带来压力,也会浪费网络流量和带宽。于是 HTML5 推出了 WebSocket 标准,使得浏览器和服务器之间任何一方都可以主动发消息给对方,这样服务器有新数据时可以主动推送给浏览器。

Tomcat 如何支持 WebSocket?简单来说,Tomcat 做了两件事:

  • Endpoint 加载
  • WebSocket 请求处理

5.1. WebSocket 加载

Tomcat 的 WebSocket 加载是通过 SCI 机制完成的。SCI 全称 ServletContainerInitializer,是 Servlet 3.0 规范中定义的用来接收 Web 应用启动事件的接口。那为什么要监听 Servlet 容器的启动事件呢?因为这样我们有机会在 Web 应用启动时做一些初始化工作,比如 WebSocket 需要扫描和加载 Endpoint 类。SCI 的使用也比较简单,将实现 ServletContainerInitializer 接口的类增加 HandlesTypes 注解,并且在注解内指定的一系列类和接口集合。比如 Tomcat 为了扫描和加载 Endpoint 而定义的 SCI 类如下:

1
2
3
4
5
6
7
@HandlesTypes({ServerEndpoint.class, ServerApplicationConfig.class, Endpoint.class})
public class WsSci implements ServletContainerInitializer {

public void onStartup(Set<Class<?>> clazzes, ServletContext ctx) throws ServletException {
...
}
}

一旦定义好了 SCI,Tomcat 在启动阶段扫描类时,会将 HandlesTypes 注解中指定的类都扫描出来,作为 SCI 的 onStartup 方法的参数,并调用 SCI 的 onStartup 方法。注意到 WsSci 的 HandlesTypes 注解中定义了ServerEndpoint.classServerApplicationConfig.classEndpoint.class,因此在 Tomcat 的启动阶段会将这些类的类实例(注意不是对象实例)传递给 WsSci 的 onStartup 方法。那么 WsSci 的 onStartup 方法又做了什么事呢?

它会构造一个 WebSocketContainer 实例,你可以把 WebSocketContainer 理解成一个专门处理 WebSocket 请求的Endpoint 容器。也就是说 Tomcat 会把扫描到的 Endpoint 子类和添加了注解@ServerEndpoint的类注册到这个容器中,并且这个容器还维护了 URL 到 Endpoint 的映射关系,这样通过请求 URL 就能找到具体的 Endpoint 来处理 WebSocket 请求。

5.2. WebSocket 请求处理

Tomcat 用 ProtocolHandler 组件屏蔽应用层协议的差异,其中 ProtocolHandler 中有两个关键组件:Endpoint 和 Processor。需要注意,这里的 Endpoint 跟上文提到的 WebSocket 中的 Endpoint 完全是两回事,连接器中的 Endpoint 组件用来处理 I/O 通信。WebSocket 本质就是一个应用层协议,因此不能用 HttpProcessor 来处理 WebSocket 请求,而要用专门 Processor 来处理,而在 Tomcat 中这样的 Processor 叫作 UpgradeProcessor。

为什么叫 Upgrade Processor 呢?这是因为 Tomcat 是将 HTTP 协议升级成 WebSocket 协议的。

WebSocket 是通过 HTTP 协议来进行握手的,因此当 WebSocket 的握手请求到来时,HttpProtocolHandler 首先接收到这个请求,在处理这个 HTTP 请求时,Tomcat 通过一个特殊的 Filter 判断该当前 HTTP 请求是否是一个 WebSocket Upgrade 请求(即包含Upgrade: websocket的 HTTP 头信息),如果是,则在 HTTP 响应里添加 WebSocket 相关的响应头信息,并进行协议升级。具体来说就是用 UpgradeProtocolHandler 替换当前的 HttpProtocolHandler,相应的,把当前 Socket 的 Processor 替换成 UpgradeProcessor,同时 Tomcat 会创建 WebSocket Session 实例和 Endpoint 实例,并跟当前的 WebSocket 连接一一对应起来。这个 WebSocket 连接不会立即关闭,并且在请求处理中,不再使用原有的 HttpProcessor,而是用专门的 UpgradeProcessor,UpgradeProcessor 最终会调用相应的 Endpoint 实例来处理请求。

img

你可以看到,Tomcat 对 WebSocket 请求的处理没有经过 Servlet 容器,而是通过 UpgradeProcessor 组件直接把请求发到 ServerEndpoint 实例,并且 Tomcat 的 WebSocket 实现不需要关注具体 I/O 模型的细节,从而实现了与具体 I/O 方式的解耦。

6. 参考资料

Tomcat 优化

Tomcat 启动优化

如果 Tomcat 启动比较慢,可以考虑一些优化点

清理 Tomcat

  • 清理不必要的 Web 应用:首先我们要做的是删除掉 webapps 文件夹下不需要的工程,一般是 host-manager、example、doc 等这些默认的工程,可能还有以前添加的但现在用不着的工程,最好把这些全都删除掉。
  • 清理 XML 配置文件:Tomcat 在启动时会解析所有的 XML 配置文件,解析 XML 较为耗时,所以应该尽量保持配置文件的简洁。
  • 清理 JAR 文件:JVM 的类加载器在加载类时,需要查找每一个 JAR 文件,去找到所需要的类。如果删除了不需要的 JAR 文件,查找的速度就会快一些。这里请注意:Web 应用中的 lib 目录下不应该出现 Servlet API 或者 Tomcat 自身的 JAR,这些 JAR 由 Tomcat 负责提供。
  • 清理其他文件:及时清理日志,删除 logs 文件夹下不需要的日志文件。同样还有 work 文件夹下的 catalina 文件夹,它其实是 Tomcat 把 JSP 转换为 Class 文件的工作目录。有时候我们也许会遇到修改了代码,重启了 Tomcat,但是仍没效果,这时候便可以删除掉这个文件夹,Tomcat 下次启动的时候会重新生成。

禁止 Tomcat TLD 扫描

Tomcat 为了支持 JSP,在应用启动的时候会扫描 JAR 包里面的 TLD 文件,加载里面定义的标签库。所以在 Tomcat 的启动日志里,你可能会碰到这种提示:

At least one JAR was scanned for TLDs yet contained no TLDs. Enable debug logging for this logger for a complete list of JARs that were scanned but no TLDs were found in them. Skipping unneeded JARs during scanning can improve startup time and JSP compilation time.

Tomcat 的意思是,我扫描了你 Web 应用下的 JAR 包,发现 JAR 包里没有 TLD 文件。我建议配置一下 Tomcat 不要去扫描这些 JAR 包,这样可以提高 Tomcat 的启动速度,并节省 JSP 编译时间。

如何配置不去扫描这些 JAR 包呢,这里分两种情况:

  • 如果你的项目没有使用 JSP 作为 Web 页面模板,而是使用 Velocity 之类的模板引擎,你完全可以把 TLD 扫描禁止掉。方法是,找到 Tomcat 的conf/目录下的context.xml文件,在这个文件里 Context 标签下,加上JarScannerJarScanFilter子标签,像下面这样。

    1
    2
    3
    4
    5
    <Context>
    <JarScanner >
    <JarScanFilter defaultTldScan="true" defaultpluggabilityScan="true" />
    </JarScanner>
    </Context>
  • 如果你的项目使用了 JSP 作为 Web 页面模块,意味着 TLD 扫描无法避免,但是我们可以通过配置来告诉 Tomcat,只扫描那些包含 TLD 文件的 JAR 包。方法是,找到 Tomcat 的conf/目录下的catalina.properties文件,在这个文件里的 jarsToSkip 配置项中,加上你的 JAR 包。

    1
    tomcat.util.scan.StandardJarScanFilter.jarsToSkip=xxx.jar

关闭 WebSocket 支持

Tomcat 会扫描 WebSocket 注解的 API 实现,比如 @ServerEndpoint 注解的类。如果不需要使用 WebSockets 就可以关闭它。具体方法是,找到 Tomcat 的 conf/ 目录下的 context.xml 文件,给 Context 标签加一个 containerSciFilter 的属性:

1
2
3
<Context containerSciFilter="org.apache.tomcat.websocket.server.WsSci">
...
</Context>

更进一步,如果你不需要 WebSockets 这个功能,你可以把 Tomcat lib 目录下的 websocket-api.jartomcat-websocket.jar 这两个 JAR 文件删除掉,进一步提高性能。

关闭 JSP 支持

如果不需要使用 JSP,可以关闭 JSP 功能:

1
2
3
<Context containerSciFilter="org.apache.jasper.servlet.JasperInitializer">
...
</Context>

如果要同时关闭 WebSocket 和 Jsp,可以这样配置:

1
2
3
<Context containerSciFilter="org.apache.tomcat.websocket.server.WsSci | org.apache.jasper.servlet.JasperInitializer">
...
</Context>

禁止扫描 Servlet 注解

Servlet 3.0 引入了注解 Servlet,Tomcat 为了支持这个特性,会在 Web 应用启动时扫描你的类文件,因此如果你没有使用 Servlet 注解这个功能,可以告诉 Tomcat 不要去扫描 Servlet 注解。具体配置方法是,在你的 Web 应用的web.xml文件中,设置<web-app>元素的属性metadata-complete="true",像下面这样。

1
2
<web-app metadata-complete="true">
</web-app>

metadata-complete 的意思是,web.xml 里配置的 Servlet 是完整的,不需要再去库类中找 Servlet 的定义。

配置 Web-Fragment 扫描

Servlet 3.0 还引入了“Web 模块部署描述符片段”的 web-fragment.xml,这是一个部署描述文件,可以完成 web.xml 的配置功能。而这个 web-fragment.xml 文件必须存放在 JAR 文件的 META-INF 目录下,而 JAR 包通常放在 WEB-INF/lib 目录下,因此 Tomcat 需要对 JAR 文件进行扫描才能支持这个功能。

可以通过配置 web.xml 里面的 <absolute-ordering> 元素直接指定了哪些 JAR 包需要扫描 web fragment,如果 <absolute-ordering/> 元素是空的, 则表示不需要扫描,像下面这样。

1
2
3
4
5
<web-app metadata-complete="true">
...
<absolute-ordering />
...
</web-app>

随机数熵源优化

Tomcat 7 以上的版本依赖 Java 的 SecureRandom 类来生成随机数,比如 Session ID。而 JVM 默认使用阻塞式熵源(/dev/random), 在某些情况下就会导致 Tomcat 启动变慢。当阻塞时间较长时, 你会看到这样一条警告日志:

1
2
<DATE> org.apache.catalina.util.SessionIdGenerator createSecureRandom
INFO: Creation of SecureRandom instance for session ID generation using [SHA1PRNG] took [8152] milliseconds.

解决方案是通过设置,让 JVM 使用非阻塞式的熵源。

我们可以设置 JVM 的参数:

1
-Djava.security.egd=file:/dev/./urandom

或者是设置 java.security 文件,位于 $JAVA_HOME/jre/lib/security 目录之下: securerandom.source=file:/dev/./urandom

这里请你注意,/dev/./urandom 中间有个 ./ 的原因是 Oracle JRE 中的 Bug,Java 8 里面的 SecureRandom 类已经修正这个 Bug。 阻塞式的熵源(/dev/random)安全性较高, 非阻塞式的熵源(/dev/./urandom)安全性会低一些,因为如果你对随机数的要求比较高, 可以考虑使用硬件方式生成熵源。

并行启动多个 Web 应用

Tomcat 启动的时候,默认情况下 Web 应用都是一个一个启动的,等所有 Web 应用全部启动完成,Tomcat 才算启动完毕。如果在一个 Tomcat 下有多个 Web 应用,为了优化启动速度,你可以配置多个应用程序并行启动,可以通过修改 server.xml 中 Host 元素的 startStopThreads 属性来完成。startStopThreads 的值表示你想用多少个线程来启动你的 Web 应用,如果设成 0 表示你要并行启动 Web 应用,像下面这样的配置。

1
2
3
4
5
6
7
<Engine startStopThreads="0">
...
<Host startStopThreads="0">
...
</Host>
...
</Engine>

需要注意的是,Engine 元素里也配置了这个参数,这意味着如果你的 Tomcat 配置了多个 Host(虚拟主机),Tomcat 会以并行的方式启动多个 Host。

参考资料

Tomcat 容器

Tomcat 实现热部署和热加载

  • 热加载的实现方式是 Web 容器启动一个后台线程,定期检测类文件的变化,如果有变化,就重新加载类,在这个过程中不会清空 Session ,一般用在开发环境。
  • 热部署原理类似,也是由后台线程定时检测 Web 应用的变化,但它会重新加载整个 Web 应用。这种方式会清空 Session,比热加载更加干净、彻底,一般用在生产环境。

Tomcat 通过开启后台线程,使得各个层次的容器组件都有机会完成一些周期性任务。Tomcat 是基于 ScheduledThreadPoolExecutor 实现周期性任务的:

1
2
3
4
5
bgFuture = exec.scheduleWithFixedDelay(
new ContainerBackgroundProcessor(),// 要执行的 Runnable
backgroundProcessorDelay, // 第一次执行延迟多久
backgroundProcessorDelay, // 之后每次执行间隔多久
TimeUnit.SECONDS); // 时间单位

第一个参数就是要周期性执行的任务类 ContainerBackgroundProcessor,它是一个 Runnable,同时也是 ContainerBase 的内部类,ContainerBase 是所有容器组件的基类,我们来回忆一下容器组件有哪些,有 Engine、Host、Context 和 Wrapper 等,它们具有父子关系。

ContainerBackgroundProcessor 实现

我们接来看 ContainerBackgroundProcessor 具体是如何实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected class ContainerBackgroundProcessor implements Runnable {

@Override
public void run() {
// 请注意这里传入的参数是 " 宿主类 " 的实例
processChildren(ContainerBase.this);
}

protected void processChildren(Container container) {
try {
//1. 调用当前容器的 backgroundProcess 方法。
container.backgroundProcess();

//2. 遍历所有的子容器,递归调用 processChildren,
// 这样当前容器的子孙都会被处理
Container[] children = container.findChildren();
for (int i = 0; i < children.length; i++) {
// 这里请你注意,容器基类有个变量叫做 backgroundProcessorDelay,如果大于 0,表明子容器有自己的后台线程,无需父容器来调用它的 processChildren 方法。
if (children[i].getBackgroundProcessorDelay() <= 0) {
processChildren(children[i]);
}
}
} catch (Throwable t) { ... }

上面的代码逻辑也是比较清晰的,首先 ContainerBackgroundProcessor 是一个 Runnable,它需要实现 run 方法,它的 run 很简单,就是调用了 processChildren 方法。这里有个小技巧,它把“宿主类”,也就是ContainerBase 的类实例当成参数传给了 run 方法

而在 processChildren 方法里,就做了两步:调用当前容器的 backgroundProcess 方法,以及递归调用子孙的 backgroundProcess 方法。请你注意 backgroundProcess 是 Container 接口中的方法,也就是说所有类型的容器都可以实现这个方法,在这个方法里完成需要周期性执行的任务。

这样的设计意味着什么呢?我们只需要在顶层容器,也就是 Engine 容器中启动一个后台线程,那么这个线程不但会执行 Engine 容器的周期性任务,它还会执行所有子容器的周期性任务

backgroundProcess 方法

上述代码都是在基类 ContainerBase 中实现的,那具体容器类需要做什么呢?其实很简单,如果有周期性任务要执行,就实现 backgroundProcess 方法;如果没有,就重用基类 ContainerBase 的方法。ContainerBase 的 backgroundProcess 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void backgroundProcess() {

//1. 执行容器中 Cluster 组件的周期性任务
Cluster cluster = getClusterInternal();
if (cluster != null) {
cluster.backgroundProcess();
}

//2. 执行容器中 Realm 组件的周期性任务
Realm realm = getRealmInternal();
if (realm != null) {
realm.backgroundProcess();
}

//3. 执行容器中 Valve 组件的周期性任务
Valve current = pipeline.getFirst();
while (current != null) {
current.backgroundProcess();
current = current.getNext();
}

//4. 触发容器的 " 周期事件 ",Host 容器的监听器 HostConfig 就靠它来调用
fireLifecycleEvent(Lifecycle.PERIODIC_EVENT, null);
}

从上面的代码可以看到,不仅每个容器可以有周期性任务,每个容器中的其他通用组件,比如跟集群管理有关的 Cluster 组件、跟安全管理有关的 Realm 组件都可以有自己的周期性任务。

我在前面的专栏里提到过,容器之间的链式调用是通过 Pipeline-Valve 机制来实现的,从上面的代码你可以看到容器中的 Valve 也可以有周期性任务,并且被 ContainerBase 统一处理。

请你特别注意的是,在 backgroundProcess 方法的最后,还触发了容器的“周期事件”。我们知道容器的生命周期事件有初始化、启动和停止等,那“周期事件”又是什么呢?它跟生命周期事件一样,是一种扩展机制,你可以这样理解:

又一段时间过去了,容器还活着,你想做点什么吗?如果你想做点什么,就创建一个监听器来监听这个“周期事件”,事件到了我负责调用你的方法。

总之,有了 ContainerBase 中的后台线程和 backgroundProcess 方法,各种子容器和通用组件不需要各自弄一个后台线程来处理周期性任务,这样的设计显得优雅和整洁。

Tomcat 热加载

有了 ContainerBase 的周期性任务处理“框架”,作为具体容器子类,只需要实现自己的周期性任务就行。而 Tomcat 的热加载,就是在 Context 容器中实现的。Context 容器的 backgroundProcess 方法是这样实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void backgroundProcess() {

//WebappLoader 周期性的检查 WEB-INF/classes 和 WEB-INF/lib 目录下的类文件
Loader loader = getLoader();
if (loader != null) {
loader.backgroundProcess();
}

//Session 管理器周期性的检查是否有过期的 Session
Manager manager = getManager();
if (manager != null) {
manager.backgroundProcess();
}

// 周期性的检查静态资源是否有变化
WebResourceRoot resources = getResources();
if (resources != null) {
resources.backgroundProcess();
}

// 调用父类 ContainerBase 的 backgroundProcess 方法
super.backgroundProcess();
}

从上面的代码我们看到 Context 容器通过 WebappLoader 来检查类文件是否有更新,通过 Session 管理器来检查是否有 Session 过期,并且通过资源管理器来检查静态资源是否有更新,最后还调用了父类 ContainerBase 的 backgroundProcess 方法。

这里我们要重点关注,WebappLoader 是如何实现热加载的,它主要是调用了 Context 容器的 reload 方法,而 Context 的 reload 方法比较复杂,总结起来,主要完成了下面这些任务:

  1. 停止和销毁 Context 容器及其所有子容器,子容器其实就是 Wrapper,也就是说 Wrapper 里面 Servlet 实例也被销毁了。
  2. 停止和销毁 Context 容器关联的 Listener 和 Filter。
  3. 停止和销毁 Context 下的 Pipeline 和各种 Valve。
  4. 停止和销毁 Context 的类加载器,以及类加载器加载的类文件资源。
  5. 启动 Context 容器,在这个过程中会重新创建前面四步被销毁的资源。

在这个过程中,类加载器发挥着关键作用。一个 Context 容器对应一个类加载器,类加载器在销毁的过程中会把它加载的所有类也全部销毁。Context 容器在启动过程中,会创建一个新的类加载器来加载新的类文件。

在 Context 的 reload 方法里,并没有调用 Session 管理器的 distroy 方法,也就是说这个 Context 关联的 Session 是没有销毁的。你还需要注意的是,Tomcat 的热加载默认是关闭的,你需要在 conf 目录下的 Context.xml 文件中设置 reloadable 参数来开启这个功能,像下面这样:

1
<Context reloadable="true"/>

Tomcat 热部署

我们再来看看热部署,热部署跟热加载的本质区别是,热部署会重新部署 Web 应用,原来的 Context 对象会整个被销毁掉,因此这个 Context 所关联的一切资源都会被销毁,包括 Session。

那么 Tomcat 热部署又是由哪个容器来实现的呢?应该不是由 Context,因为热部署过程中 Context 容器被销毁了,那么这个重担就落在 Host 身上了,因为它是 Context 的父容器。

跟 Context 不一样,Host 容器并没有在 backgroundProcess 方法中实现周期性检测的任务,而是通过监听器 HostConfig 来实现的,HostConfig 就是前面提到的“周期事件”的监听器,那“周期事件”达到时,HostConfig 会做什么事呢?

1
2
3
4
5
6
public void lifecycleEvent(LifecycleEvent event) {
// 执行 check 方法。
if (event.getType().equals(Lifecycle.PERIODIC_EVENT)) {
check();
}
}

它执行了 check 方法,我们接着来看 check 方法里做了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected void check() {

if (host.getAutoDeploy()) {
// 检查这个 Host 下所有已经部署的 Web 应用
DeployedApplication[] apps =
deployed.values().toArray(new DeployedApplication[0]);

for (int i = 0; i < apps.length; i++) {
// 检查 Web 应用目录是否有变化
checkResources(apps[i], false);
}

// 执行部署
deployApps();
}
}

其实 HostConfig 会检查 webapps 目录下的所有 Web 应用:

  • 如果原来 Web 应用目录被删掉了,就把相应 Context 容器整个销毁掉。
  • 是否有新的 Web 应用目录放进来了,或者有新的 WAR 包放进来了,就部署相应的 Web 应用。

因此 HostConfig 做的事情都是比较“宏观”的,它不会去检查具体类文件或者资源文件是否有变化,而是检查 Web 应用目录级别的变化。

Tomcat 的类加载机制

Tomcat 的自定义类加载器 WebAppClassLoader 打破了双亲委派机制,它首先自己尝试去加载某个类,如果找不到再代理给父类加载器,其目的是优先加载 Web 应用自己定义的类。具体实现就是重写 ClassLoader 的两个方法:findClass 和 loadClass。

findClass 方法

我们先来看看 findClass 方法的实现,为了方便理解和阅读,我去掉了一些细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public Class<?> findClass(String name) throws ClassNotFoundException {
...

Class<?> clazz = null;
try {
//1. 先在 Web 应用目录下查找类
clazz = findClassInternal(name);
} catch (RuntimeException e) {
throw e;
}

if (clazz == null) {
try {
//2. 如果在本地目录没有找到,交给父加载器去查找
clazz = super.findClass(name);
} catch (RuntimeException e) {
throw e;
}

//3. 如果父类也没找到,抛出 ClassNotFoundException
if (clazz == null) {
throw new ClassNotFoundException(name);
}

return clazz;
}

在 findClass 方法里,主要有三个步骤:

  1. 先在 Web 应用本地目录下查找要加载的类。
  2. 如果没有找到,交给父加载器去查找,它的父加载器就是上面提到的系统类加载器 AppClassLoader。
  3. 如何父加载器也没找到这个类,抛出 ClassNotFound 异常。

loadClass 方法

接着我们再来看 Tomcat 类加载器的 loadClass 方法的实现,同样我也去掉了一些细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {

synchronized (getClassLoadingLock(name)) {

Class<?> clazz = null;

//1. 先在本地 cache 查找该类是否已经加载过
clazz = findLoadedClass0(name);
if (clazz != null) {
if (resolve)
resolveClass(clazz);
return clazz;
}

//2. 从系统类加载器的 cache 中查找是否加载过
clazz = findLoadedClass(name);
if (clazz != null) {
if (resolve)
resolveClass(clazz);
return clazz;
}

// 3. 尝试用 ExtClassLoader 类加载器类加载,为什么?
ClassLoader javaseLoader = getJavaseClassLoader();
try {
clazz = javaseLoader.loadClass(name);
if (clazz != null) {
if (resolve)
resolveClass(clazz);
return clazz;
}
} catch (ClassNotFoundException e) {
// Ignore
}

// 4. 尝试在本地目录搜索 class 并加载
try {
clazz = findClass(name);
if (clazz != null) {
if (resolve)
resolveClass(clazz);
return clazz;
}
} catch (ClassNotFoundException e) {
// Ignore
}

// 5. 尝试用系统类加载器 (也就是 AppClassLoader) 来加载
try {
clazz = Class.forName(name, false, parent);
if (clazz != null) {
if (resolve)
resolveClass(clazz);
return clazz;
}
} catch (ClassNotFoundException e) {
// Ignore
}
}

//6. 上述过程都加载失败,抛出异常
throw new ClassNotFoundException(name);
}

loadClass 方法稍微复杂一点,主要有六个步骤:

  1. 先在本地 Cache 查找该类是否已经加载过,也就是说 Tomcat 的类加载器是否已经加载过这个类。
  2. 如果 Tomcat 类加载器没有加载过这个类,再看看系统类加载器是否加载过。
  3. 如果都没有,就让ExtClassLoader去加载,这一步比较关键,目的防止 Web 应用自己的类覆盖 JRE 的核心类。因为 Tomcat 需要打破双亲委派机制,假如 Web 应用里自定义了一个叫 Object 的类,如果先加载这个 Object 类,就会覆盖 JRE 里面的那个 Object 类,这就是为什么 Tomcat 的类加载器会优先尝试用 ExtClassLoader 去加载,因为 ExtClassLoader 会委托给 BootstrapClassLoader 去加载,BootstrapClassLoader 发现自己已经加载了 Object 类,直接返回给 Tomcat 的类加载器,这样 Tomcat 的类加载器就不会去加载 Web 应用下的 Object 类了,也就避免了覆盖 JRE 核心类的问题。
  4. 如果 ExtClassLoader 加载器加载失败,也就是说 JRE 核心类中没有这类,那么就在本地 Web 应用目录下查找并加载。
  5. 如果本地目录下没有这个类,说明不是 Web 应用自己定义的类,那么由系统类加载器去加载。这里请你注意,Web 应用是通过Class.forName调用交给系统类加载器的,因为Class.forName的默认加载器就是系统类加载器。
  6. 如果上述加载过程全部失败,抛出 ClassNotFound 异常。

从上面的过程我们可以看到,Tomcat 的类加载器打破了双亲委派机制,没有一上来就直接委托给父加载器,而是先在本地目录下加载,为了避免本地目录下的类覆盖 JRE 的核心类,先尝试用 JVM 扩展类加载器 ExtClassLoader 去加载。那为什么不先用系统类加载器 AppClassLoader 去加载?很显然,如果是这样的话,那就变成双亲委派机制了,这就是 Tomcat 类加载器的巧妙之处。

Tomcat 实现应用隔离

Tomcat 作为 Web 容器,需要解决以下问题:

  1. 如果在 Tomcat 中运行了两个 Web 应用程序,两个 Web 应用中有同名的 Servlet,但是功能不同,Tomcat 需要同时加载和管理这两个同名的 Servlet 类,保证它们不会冲突,因此 Web 应用之间的类需要隔离。
  2. 两个 Web 应用都依赖同一个第三方的 JAR 包,比如 Spring,那 Spring 的 JAR 包被加载到内存后,Tomcat 要保证这两个 Web 应用能够共享,也就是说 Spring 的 JAR 包只被加载一次,否则随着依赖的第三方 JAR 包增多,JVM 的内存会膨胀。
  3. 需要隔离 Tomcat 本身的类和 Web 应用的类。

img

WebAppClassLoader

针对第一个问题:

如果使用 JVM 默认 AppClassLoader 来加载 Web 应用,AppClassLoader 只能加载一个 Servlet 类,在加载第二个同名 Servlet 类时,AppClassLoader 会返回第一个 Servlet 类的 Class 实例,这是因为在 AppClassLoader 看来,同名的 Servlet 类只被加载一次。

Tomcat 的解决方案是自定义一个类加载器 WebAppClassLoader, 并且给每个 Web 应用创建一个类加载器实例。我们知道,Context 容器组件对应一个 Web 应用,因此,每个 Context 容器负责创建和维护一个 WebAppClassLoader 加载器实例。这背后的原理是,不同的加载器实例加载的类被认为是不同的类,即使它们的类名相同。这就相当于在 Java 虚拟机内部创建了一个个相互隔离的 Java 类空间,每一个 Web 应用都有自己的类空间,Web 应用之间通过各自的类加载器互相隔离。

SharedClassLoader

针对第二个问题:

本质需求是两个 Web 应用之间怎么共享库类,并且不能重复加载相同的类。我们知道,在双亲委派机制里,各个子加载器都可以通过父加载器去加载类,那么把需要共享的类放到父加载器的加载路径下不就行了吗,应用程序也正是通过这种方式共享 JRE 的核心类。因此 Tomcat 的设计者又加了一个类加载器 SharedClassLoader,作为 WebAppClassLoader 的父加载器,专门来加载 Web 应用之间共享的类。如果 WebAppClassLoader 自己没有加载到某个类,就会委托父加载器 SharedClassLoader 去加载这个类,SharedClassLoader 会在指定目录下加载共享类,之后返回给 WebAppClassLoader,这样共享的问题就解决了。

CatalinaClassloader

如何隔离 Tomcat 本身的类和 Web 应用的类?

要共享可以通过父子关系,要隔离那就需要兄弟关系了。兄弟关系就是指两个类加载器是平行的,它们可能拥有同一个父加载器,但是两个兄弟类加载器加载的类是隔离的。基于此 Tomcat 又设计一个类加载器 CatalinaClassloader,专门来加载 Tomcat 自身的类。这样设计有个问题,那 Tomcat 和各 Web 应用之间需要共享一些类时该怎么办呢?

CommonClassLoader

老办法,还是再增加一个 CommonClassLoader,作为 CatalinaClassloader 和 SharedClassLoader 的父加载器。CommonClassLoader 能加载的类都可以被 CatalinaClassLoader 和 SharedClassLoader 使用,而 CatalinaClassLoader 和 SharedClassLoader 能加载的类则与对方相互隔离。WebAppClassLoader 可以使用 SharedClassLoader 加载到的类,但各个 WebAppClassLoader 实例之间相互隔离。

Tomcat 实现 Servlet 规范

Servlet 容器最重要的任务就是创建 Servlet 的实例并且调用 Servlet。

一个 Web 应用里往往有多个 Servlet,而在 Tomcat 中一个 Web 应用对应一个 Context 容器,也就是说一个 Context 容器需要管理多个 Servlet 实例。但 Context 容器并不直接持有 Servlet 实例,而是通过子容器 Wrapper 来管理 Servlet,你可以把 Wrapper 容器看作是 Servlet 的包装。

为什么需要 Wrapper 呢?Context 容器直接维护一个 Servlet 数组不就行了吗?这是因为 Servlet 不仅仅是一个类实例,它还有相关的配置信息,比如它的 URL 映射、它的初始化参数,因此设计出了一个包装器,把 Servlet 本身和它相关的数据包起来,没错,这就是面向对象的思想。

除此以外,Servlet 规范中还有两个重要特性:Listener 和 Filter,Tomcat 也需要创建它们的实例,并在合适的时机去调用它们的方法。

Servlet 管理

Tomcat 是用 Wrapper 容器来管理 Servlet 的,那 Wrapper 容器具体长什么样子呢?我们先来看看它里面有哪些关键的成员变量:

1
protected volatile Servlet instance = null;

它拥有一个 Servlet 实例,并且 Wrapper 通过 loadServlet 方法来实例化 Servlet。为了方便你阅读,我简化了代码:

1
2
3
4
5
6
7
8
9
10
11
public synchronized Servlet loadServlet() throws ServletException {
Servlet servlet;

//1. 创建一个 Servlet 实例
servlet = (Servlet) instanceManager.newInstance(servletClass);

//2. 调用了 Servlet 的 init 方法,这是 Servlet 规范要求的
initServlet(servlet);

return servlet;
}

其实 loadServlet 主要做了两件事:创建 Servlet 的实例,并且调用 Servlet 的 init 方法,因为这是 Servlet 规范要求的。

那接下来的问题是,什么时候会调到这个 loadServlet 方法呢?为了加快系统的启动速度,我们往往会采取资源延迟加载的策略,Tomcat 也不例外,默认情况下 Tomcat 在启动时不会加载你的 Servlet,除非你把 Servlet 的loadOnStartup参数设置为true

这里还需要你注意的是,虽然 Tomcat 在启动时不会创建 Servlet 实例,但是会创建 Wrapper 容器,就好比尽管枪里面还没有子弹,先把枪造出来。那子弹什么时候造呢?是真正需要开枪的时候,也就是说有请求来访问某个 Servlet 时,这个 Servlet 的实例才会被创建。

那 Servlet 是被谁调用的呢?我们回忆一下专栏前面提到过 Tomcat 的 Pipeline-Valve 机制,每个容器组件都有自己的 Pipeline,每个 Pipeline 中有一个 Valve 链,并且每个容器组件有一个 BasicValve(基础阀)。Wrapper 作为一个容器组件,它也有自己的 Pipeline 和 BasicValve,Wrapper 的 BasicValve 叫 StandardWrapperValve

你可以想到,当请求到来时,Context 容器的 BasicValve 会调用 Wrapper 容器中 Pipeline 中的第一个 Valve,然后会调用到 StandardWrapperValve。我们先来看看它的 invoke 方法是如何实现的,同样为了方便你阅读,我简化了代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public final void invoke(Request request, Response response) {

//1. 实例化 Servlet
servlet = wrapper.allocate();

//2. 给当前请求创建一个 Filter 链
ApplicationFilterChain filterChain =
ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);

//3. 调用这个 Filter 链,Filter 链中的最后一个 Filter 会调用 Servlet
filterChain.doFilter(request.getRequest(), response.getResponse());

}

StandardWrapperValve 的 invoke 方法比较复杂,去掉其他异常处理的一些细节,本质上就是三步:

  • 第一步,创建 Servlet 实例;
  • 第二步,给当前请求创建一个 Filter 链;
  • 第三步,调用这个 Filter 链。

你可能会问,为什么需要给每个请求创建一个 Filter 链?这是因为每个请求的请求路径都不一样,而 Filter 都有相应的路径映射,因此不是所有的 Filter 都需要来处理当前的请求,我们需要根据请求的路径来选择特定的一些 Filter 来处理。

第二个问题是,为什么没有看到调到 Servlet 的 service 方法?这是因为 Filter 链的 doFilter 方法会负责调用 Servlet,具体来说就是 Filter 链中的最后一个 Filter 会负责调用 Servlet。

接下来我们来看 Filter 的实现原理。

Filter 管理

我们知道,跟 Servlet 一样,Filter 也可以在web.xml文件里进行配置,不同的是,Filter 的作用域是整个 Web 应用,因此 Filter 的实例是在 Context 容器中进行管理的,Context 容器用 Map 集合来保存 Filter。

1
private Map<String, FilterDef> filterDefs = new HashMap<>();

那上面提到的 Filter 链又是什么呢?Filter 链的存活期很短,它是跟每个请求对应的。一个新的请求来了,就动态创建一个 FIlter 链,请求处理完了,Filter 链也就被回收了。理解它的原理也非常关键,我们还是来看看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public final class ApplicationFilterChain implements FilterChain {

//Filter 链中有 Filter 数组,这个好理解
private ApplicationFilterConfig[] filters = new ApplicationFilterConfig[0];

//Filter 链中的当前的调用位置
private int pos = 0;

// 总共有多少了 Filter
private int n = 0;

// 每个 Filter 链对应一个 Servlet,也就是它要调用的 Servlet
private Servlet servlet = null;

public void doFilter(ServletRequest req, ServletResponse res) {
internalDoFilter(request,response);
}

private void internalDoFilter(ServletRequest req,
ServletResponse res){

// 每个 Filter 链在内部维护了一个 Filter 数组
if (pos < n) {
ApplicationFilterConfig filterConfig = filters[pos++];
Filter filter = filterConfig.getFilter();

filter.doFilter(request, response, this);
return;
}

servlet.service(request, response);

}

从 ApplicationFilterChain 的源码我们可以看到几个关键信息:

  • Filter 链中除了有 Filter 对象的数组,还有一个整数变量 pos,这个变量用来记录当前被调用的 Filter 在数组中的位置。
  • Filter 链中有个 Servlet 实例,这个好理解,因为上面提到了,每个 Filter 链最后都会调到一个 Servlet。
  • Filter 链本身也实现了 doFilter 方法,直接调用了一个内部方法 internalDoFilter。
  • internalDoFilter 方法的实现比较有意思,它做了一个判断,如果当前 Filter 的位置小于 Filter 数组的长度,也就是说 Filter 还没调完,就从 Filter 数组拿下一个 Filter,调用它的 doFilter 方法。否则,意味着所有 Filter 都调到了,就调用 Servlet 的 service 方法。

但问题是,方法体里没看到循环,谁在不停地调用 Filter 链的 doFIlter 方法呢?Filter 是怎么依次调到的呢?

答案是Filter 本身的 doFilter 方法会调用 Filter 链的 doFilter 方法,我们还是来看看代码就明白了:

1
2
3
4
5
6
7
8
9
public void doFilter(ServletRequest request, ServletResponse response,
FilterChain chain){

...

// 调用 Filter 的方法
chain.doFilter(request, response);

}

注意 Filter 的 doFilter 方法有个关键参数 FilterChain,就是 Filter 链。并且每个 Filter 在实现 doFilter 时,必须要调用 Filter 链的 doFilter 方法,而 Filter 链中保存当前 FIlter 的位置,会调用下一个 FIlter 的 doFilter 方法,这样链式调用就完成了。

Filter 链跟 Tomcat 的 Pipeline-Valve 本质都是责任链模式,但是在具体实现上稍有不同,你可以细细体会一下。

Listener 管理

我们接着聊 Servlet 规范里 Listener。跟 Filter 一样,Listener 也是一种扩展机制,你可以监听容器内部发生的事件,主要有两类事件:

  • 第一类是生命状态的变化,比如 Context 容器启动和停止、Session 的创建和销毁。
  • 第二类是属性的变化,比如 Context 容器某个属性值变了、Session 的某个属性值变了以及新的请求来了等。

我们可以在web.xml配置或者通过注解的方式来添加监听器,在监听器里实现我们的业务逻辑。对于 Tomcat 来说,它需要读取配置文件,拿到监听器类的名字,实例化这些类,并且在合适的时机调用这些监听器的方法。

Tomcat 是通过 Context 容器来管理这些监听器的。Context 容器将两类事件分开来管理,分别用不同的集合来存放不同类型事件的监听器:

1
2
3
4
5
// 监听属性值变化的监听器
private List<Object> applicationEventListenersList = new CopyOnWriteArrayList<>();

// 监听生命事件的监听器
private Object applicationLifecycleListenersObjects[] = new Object[0];

剩下的事情就是触发监听器了,比如在 Context 容器的启动方法里,就触发了所有的 ServletContextListener:

1
2
3
4
5
6
7
8
9
10
11
12
//1. 拿到所有的生命周期监听器
Object instances[] = getApplicationLifecycleListeners();

for (int i = 0; i < instances.length; i++) {
//2. 判断 Listener 的类型是不是 ServletContextListener
if (!(instances[i] instanceof ServletContextListener))
continue;

//3. 触发 Listener 的方法
ServletContextListener lr = (ServletContextListener) instances[i];
lr.contextInitialized(event);
}

需要注意的是,这里的 ServletContextListener 接口是一种留给用户的扩展机制,用户可以实现这个接口来定义自己的监听器,监听 Context 容器的启停事件。Spring 就是这么做的。ServletContextListener 跟 Tomcat 自己的生命周期事件 LifecycleListener 是不同的。LifecycleListener 定义在生命周期管理组件中,由基类 LifeCycleBase 统一管理。

Tomcat 支持异步 Servlet

异步示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@WebServlet(urlPatterns = {"/async"}, asyncSupported = true)
public class AsyncServlet extends HttpServlet {

//Web 应用线程池,用来处理异步 Servlet
ExecutorService executor = Executors.newSingleThreadExecutor();

public void service(HttpServletRequest req, HttpServletResponse resp) {
//1. 调用 startAsync 或者异步上下文
final AsyncContext ctx = req.startAsync();

// 用线程池来执行耗时操作
executor.execute(new Runnable() {

@Override
public void run() {

// 在这里做耗时的操作
try {
ctx.getResponse().getWriter().println("Handling Async Servlet");
} catch (IOException e) {}

//3. 异步 Servlet 处理完了调用异步上下文的 complete 方法
ctx.complete();
}

});
}
}

有三个要点:

  1. 通过注解的方式来注册 Servlet,除了 @WebServlet 注解,还需要加上 asyncSupported=true 的属性,表明当前的 Servlet 是一个异步 Servlet。
  2. Web 应用程序需要调用 Request 对象的 startAsync 方法来拿到一个异步上下文 AsyncContext。这个上下文保存了请求和响应对象。
  3. Web 应用需要开启一个新线程来处理耗时的操作,处理完成后需要调用 AsyncContext 的 complete 方法。目的是告诉 Tomcat,请求已经处理完成。

这里请你注意,虽然异步 Servlet 允许用更长的时间来处理请求,但是也有超时限制的,默认是 30 秒,如果 30 秒内请求还没处理完,Tomcat 会触发超时机制,向浏览器返回超时错误,如果这个时候你的 Web 应用再调用ctx.complete方法,会得到一个 IllegalStateException 异常。

异步 Servlet 原理

通过上面的例子,相信你对 Servlet 的异步实现有了基本的理解。要理解 Tomcat 在这个过程都做了什么事情,关键就是要弄清楚req.startAsync方法和ctx.complete方法都做了什么。

startAsync 方法

startAsync 方法其实就是创建了一个异步上下文 AsyncContext 对象,AsyncContext 对象的作用是保存请求的中间信息,比如 Request 和 Response 对象等上下文信息。你来思考一下为什么需要保存这些信息呢?

这是因为 Tomcat 的工作线程在Request.startAsync调用之后,就直接结束回到线程池中了,线程本身不会保存任何信息。也就是说一个请求到服务端,执行到一半,你的 Web 应用正在处理,这个时候 Tomcat 的工作线程没了,这就需要有个缓存能够保存原始的 Request 和 Response 对象,而这个缓存就是 AsyncContext。

有了 AsyncContext,你的 Web 应用通过它拿到 request 和 response 对象,拿到 Request 对象后就可以读取请求信息,请求处理完了还需要通过 Response 对象将 HTTP 响应发送给浏览器。

除了创建 AsyncContext 对象,startAsync 还需要完成一个关键任务,那就是告诉 Tomcat 当前的 Servlet 处理方法返回时,不要把响应发到浏览器,因为这个时候,响应还没生成呢;并且不能把 Request 对象和 Response 对象销毁,因为后面 Web 应用还要用呢。

在 Tomcat 中,负责 flush 响应数据的是 CoyoteAdaptor,它还会销毁 Request 对象和 Response 对象,因此需要通过某种机制通知 CoyoteAdaptor,具体来说是通过下面这行代码:

1
this.request.getCoyoteRequest().action(ActionCode.ASYNC_START, this);

你可以把它理解为一个 Callback,在这个 action 方法里设置了 Request 对象的状态,设置它为一个异步 Servlet 请求。

我们知道连接器是调用 CoyoteAdapter 的 service 方法来处理请求的,而 CoyoteAdapter 会调用容器的 service 方法,当容器的 service 方法返回时,CoyoteAdapter 判断当前的请求是不是异步 Servlet 请求,如果是,就不会销毁 Request 和 Response 对象,也不会把响应信息发到浏览器。你可以通过下面的代码理解一下,这是 CoyoteAdapter 的 service 方法,我对它进行了简化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res) {

// 调用容器的 service 方法处理请求
connector.getService().getContainer().getPipeline().
getFirst().invoke(request, response);

// 如果是异步 Servlet 请求,仅仅设置一个标志,
// 否则说明是同步 Servlet 请求,就将响应数据刷到浏览器
if (request.isAsync()) {
async = true;
} else {
request.finishRequest();
response.finishResponse();
}

// 如果不是异步 Servlet 请求,就销毁 Request 对象和 Response 对象
if (!async) {
request.recycle();
response.recycle();
}
}

接下来,当 CoyoteAdaptor 的 service 方法返回到 ProtocolHandler 组件时,ProtocolHandler 判断返回值,如果当前请求是一个异步 Servlet 请求,它会把当前 Socket 的协议处理者 Processor 缓存起来,将 SocketWrapper 对象和相应的 Processor 存到一个 Map 数据结构里。

1
private final Map<S,Processor> connections = new ConcurrentHashMap<>();

之所以要缓存是因为这个请求接下来还要接着处理,还是由原来的 Processor 来处理,通过 SocketWrapper 就能从 Map 里找到相应的 Processor。

complete 方法

接着我们再来看关键的ctx.complete方法,当请求处理完成时,Web 应用调用这个方法。那么这个方法做了些什么事情呢?最重要的就是把响应数据发送到浏览器。

这件事情不能由 Web 应用线程来做,也就是说ctx.complete方法不能直接把响应数据发送到浏览器,因为这件事情应该由 Tomcat 线程来做,但具体怎么做呢?

我们知道,连接器中的 Endpoint 组件检测到有请求数据达到时,会创建一个 SocketProcessor 对象交给线程池去处理,因此 Endpoint 的通信处理和具体请求处理在两个线程里运行。

在异步 Servlet 的场景里,Web 应用通过调用ctx.complete方法时,也可以生成一个新的 SocketProcessor 任务类,交给线程池处理。对于异步 Servlet 请求来说,相应的 Socket 和协议处理组件 Processor 都被缓存起来了,并且这些对象都可以通过 Request 对象拿到。

讲到这里,你可能已经猜到ctx.complete是如何实现的了:

1
2
3
4
5
6
7
8
public void complete() {
// 检查状态合法性,我们先忽略这句
check();

// 调用 Request 对象的 action 方法,其实就是通知连接器,这个异步请求处理完了
request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null);

}

我们可以看到 complete 方法调用了 Request 对象的 action 方法。而在 action 方法里,则是调用了 Processor 的 processSocketEvent 方法,并且传入了操作码 OPEN_READ。

1
2
3
4
5
6
7
case ASYNC_COMPLETE: {
clearDispatches();
if (asyncStateMachine.asyncComplete()) {
processSocketEvent(SocketEvent.OPEN_READ, true);
}
break;
}

我们接着看 processSocketEvent 方法,它调用 SocketWrapper 的 processSocket 方法:

1
2
3
4
5
6
protected void processSocketEvent(SocketEvent event, boolean dispatch) {
SocketWrapperBase<?> socketWrapper = getSocketWrapper();
if (socketWrapper != null) {
socketWrapper.processSocket(event, dispatch);
}
}

而 SocketWrapper 的 processSocket 方法会创建 SocketProcessor 任务类,并通过 Tomcat 线程池来处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {

if (socketWrapper == null) {
return false;
}

SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
// 线程池运行
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
}

请你注意 createSocketProcessor 函数的第二个参数是 SocketEvent,这里我们传入的是 OPEN_READ。通过这个参数,我们就能控制 SocketProcessor 的行为,因为我们不需要再把请求发送到容器进行处理,只需要向浏览器端发送数据,并且重新在这个 Socket 上监听新的请求就行了。

参考资料

Tomcat 和 Jetty

Web 容器 Tomcat 或 Jetty,作为重要的系统中间件,连接着浏览器和你的 Web 应用,并且支撑着 Web 程序的运行,可以说,弄懂了 Tomcat 和 Jetty 的原理,Java Web 开发对你来说就毫无秘密可言

Web 容器

早期的 Web 应用主要用于浏览新闻等静态页面,HTTP 服务器(比如 Apache、Nginx)向浏览器返回静态 HTML,浏览器负责解析 HTML,将结果呈现给用户。

随着互联网的发展,我们已经不满足于仅仅浏览静态页面,还希望通过一些交互操作,来获取动态结果,因此也就需要一些扩展机制能够让 HTTP 服务器调用服务端程序。

于是 Sun 公司推出了 Servlet 技术。你可以把 Servlet 简单理解为运行在服务端的 Java 小程序,但是 Servlet 没有 main 方法,不能独立运行,因此必须把它部署到 Servlet 容器中,由容器来实例化并调用 Servlet。

而 Tomcat 和 Jetty 就是一个 Servlet 容器。为了方便使用,它们也具有 HTTP 服务器的功能,因此Tomcat 或者 Jetty 就是一个“HTTP 服务器 + Servlet 容器”,我们也叫它们 Web 容器。

其他应用服务器比如 JBoss 和 WebLogic,它们不仅仅有 Servlet 容器的功能,也包含 EJB 容器,是完整的 Java EE 应用服务器。从这个角度看,Tomcat 和 Jetty 算是一个轻量级的应用服务器。

在微服务架构日渐流行的今天,开发人员更喜欢稳定的、轻量级的应用服务器,并且应用程序用内嵌的方式来运行 Servlet 容器也逐渐流行起来。之所以选择轻量级,是因为在微服务架构下,我们把一个大而全的单体应用,拆分成一个个功能单一的微服务,在这个过程中,服务的数量必然要增加,但为了减少资源的消耗,并且降低部署的成本,我们希望运行服务的 Web 容器也是轻量级的,Web 容器本身应该消耗较少的内存和 CPU 资源,并且由应用本身来启动一个嵌入式的 Web 容器,而不是通过 Web 容器来部署和启动应用,这样可以降低应用部署的复杂度。

MQ 面试

消息队列(Message Queue,简称 MQ)技术是应用间交换信息的一种技术。

消息队列主要解决异步处理、应用间耦合,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

目前主流的 MQ 有:Kafka、RabbitMQ、RocketMQ、ActiveMQ,而部分数据库如 Redis、MySQL 以及 phxsql 也可实现消息队列的功能。

注意:_为了简便,下文中除了文章标题,一律使用 MQ 简称_。

MQ 简介

【简单】什么是 MQ?

MQ(Message Queue,消息队列) 是一种异步通信机制,用于在不同服务、应用或系统组件之间可靠地传递消息。它的核心思想是解耦生产者和消费者,通过缓冲消息来提高系统的可靠性、扩展性和可维护性。

MQ 的核心概念

  • 生产者(Producer):发送消息的应用或服务。
  • 消费者(Consumer):接收并处理消息的应用或服务。
  • 消息(Message):传输的数据单位,可以是文本、JSON、二进制等格式。
  • 队列(Queue):存储消息的缓冲区,遵循 FIFO(先进先出) 或优先级策略。
  • Broker(消息代理):负责接收、存储和转发消息的中间件(如 RabbitMQ、Kafka)。

消息队列主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

MQ 是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

MQ 的数据可驻留在内存或磁盘上,直到它们被应用程序读取。通过 MQ,应用程序可独立地执行,它们不需要知道彼此的位置,不需要等待接收程序接收此消息。在分布式计算环境中,为了集成分布式应用,开发者需要对异构网络环境下的分布式应用提供有效的通信手段。为了管理需要共享的信息,对应用提供公共的信息交换机制是重要的。

目前主流的 MQ 有:Kafka、RabbitMQ、RocketMQ、ActiveMQ。

【简单】为什么需要 MQ?

MQ 的典型应用场景

  • 异步处理
  • 系统解耦
  • 流量削峰
  • 系统间通信
  • 传输缓冲
  • 最终一致性

::: info 异步处理

:::

MQ 可以将系统间的处理流程异步化,减少等待响应的时间,从而提高整体并发吞吐量。一般,MQ 异步处理应用于非核心流程,例如:短信/邮件通知、数据推送、上报数据到监控中心、日志中心等。

假设这样一个场景,用户向系统 A 发起请求,系统 A 处理计算只需要 10ms,然后通知系统 BCD 写库,系统 BCD 写库耗时分别为:100ms200ms300ms。最终总耗时为: 10ms+100ms+200ms+300ms=610ms。此外,加上请求和响应的网络传输时间,从用户角度看,可能要等待将近 1s 才能得到结果。

如果使用 MQ,系统 A 接到请求后,耗时 10ms 处理计算,然后向系统 BCD 连续发送消息,假设耗时 5ms。那么 这一过程的总耗时为 3ms + 5ms = 8ms,这相比于 610 ms,大大缩短了响应时间。至于系统 BCD 的写库操作,只要自行消费 MQ 后处理即可,用户无需关注。

::: info 系统解耦

:::

通过 MQ,可以消除系统间的强耦合。它的好处在于:

  • 消息的消费者系统可以随意增加,无需修改生产者系统的代码。
  • 生产者系统、消费者系统彼此不会影响对方的流程。
    • 如果生产者系统宕机,消费者系统收不到消息,就不会有下一步的动作。
    • 如果消费者系统宕机,生产者系统让然可以正常发送消息,不影响流程。

不同系统如果要建立通信,传统的做法是:调用接口。

如果需要和新的系统建立通信或删除已建立的通信,都需要修改代码,这种方案显然耦合度很高。

如果使用 MQ,系统间的通信只需要通过发布/订阅(Pub/Sub)模型即可,彼此没有直接联系,也就不需要相互感知,从而达到 解耦

::: info 流量削峰

:::

上下游系统 处理能力存在差距的时候,利用 MQ 做一个 “漏斗” 模型,进行 流控。把 MQ 当成可靠的 消息缓冲池,进行一定程度的 消息堆积;在下游有能力处理的时候,再发送消息。

MQ 的流量削峰常用于高并发场景(例如:秒杀、团抢等业务场景),它是缓解瞬时暴增流量的核心手段之一。

如果没有 MQ,两个系统之间通过 协商滑动窗口限流/降级/熔断 等复杂的方案也能实现 流控。但 系统复杂性 指数级增长,势必在上游或者下游做存储,并且要处理 定时拥塞 等一系列问题。而且每当有 处理能力有差距 的时候,都需要 单独 开发一套逻辑来维护这套逻辑。

假设某个系统读写数据库的稳定性能为每秒处理 1000 条数据。平常情况下,远远达不到这么大的处理量。假设,因为因为做活动,系统的瞬时请求量剧增,达到每秒 10000 个并发请求,数据库根本承受不了,可能直接就把数据库给整崩溃了,这样系统服务就不可用了。

如果使用 MQ,每秒写入 10000 条请求,但是系统 A 每秒只从 MQ 中消费 1000 条请求,然后写入数据库。这样,就不会超过数据库的承受能力,而是把请求积压在 MQ 中。只要高峰期一过,系统 A 就会很快把积压的消息给处理掉。

::: info 系统间通信

:::

消息队列一般都内置了 高效的通信机制,因此也可以用于单纯的 消息通讯,比如实现 点对点消息队列 或者 聊天室 等。

生产者/消费者 模式,只需要关心消息是否 送达队列,至于谁希望订阅和需要消费,是 下游 的事情,无疑极大地减少了开发和联调的工作量。

::: info 传输缓冲

:::

(1)MQ 常被用于做海量数据的传输缓冲。

例如,Kafka 常被用于做为各种日志数据、采集数据的数据中转。然后,Kafka 将数据转发给 Logstash、Elasticsearch 中,然后基于 Elasticsearch 来做日志中心,提供检索、聚合、分析日志的能力。开发者可以通过 Kibana 集成 Elasticsearch 数据进行可视化展示,或自行进行定制化开发。

img

(2)MQ 也可以被用于流式处理。

例如,Kafka 几乎已经是流计算的数据采集端的标准组件。而流计算通过实时数据处理能力,提供了更为快捷的聚合计算能力,被大量应用于链路监控、实时监控、实时数仓、实时大屏、风控、推荐等应用领域。

::: info 最终一致性

:::

最终一致性 不是 消息队列 的必备特性,但确实可以依靠 消息队列 来做 最终一致性 的事情。

  • 先写消息再操作,确保操作完成后再修改消息状态。定时任务补偿机制 实现消息 可靠发送接收、业务操作的可靠执行,要注意 消息重复幂等设计
  • 所有不保证 100% 不丢消息 的消息队列,理论上无法实现 最终一致性

Kafka 一类的设计,在设计层面上就有 丢消息 的可能(比如 定时刷盘,如果掉电就会丢消息)。哪怕只丢千分之一的消息,业务也必须用其他的手段来保证结果正确。

【中等】引入 MQ 带来哪些问题?

任何技术都会有利有弊,MQ 给整体系统架构带来很多好处,但也会付出一定的代价。

MQ 主要引入了以下问题:

  • 系统可用性降低:引入了 MQ 后,通信需要基于 MQ 完成,如果 MQ 宕机,则服务不可用。因此,MQ 要保证是高可用的。
  • 系统复杂度提高:使用 MQ,需要关注一些新的问题:
    • 如何保证消息没有 重复消费
    • 如何处理 消息丢失 的问题?
    • 如何保证传递 消息的顺序性
    • 如何处理大量 消息积压 的问题?
  • 一致性问题:假设系统 A 处理完直接返回成功的结果给用户,用户认为请求成功。但如果此时,系统 BCD 中只要有任意一个写库失败,那么数据就不一致了。这种情况如何处理?

【中等】MQ 有哪些通信模型?

MQ(消息队列)常见的通信模型主要有以下几种,适用于不同的业务场景:

点对点(Point-to-Point / Queue)

  • 特点:消息由 一个生产者 发送到 一个队列只有一个消费者 能消费该消息(竞争消费),消息被消费后即从队列删除。
  • 适用场景:任务分发(如订单处理、异步任务)。
  • 示例:RabbitMQ 的普通队列、ActiveMQ 的 Queue。

发布/订阅(Publish/Subscribe / Topic)

  • 特点:消息由 一个生产者 发送到 Topic(主题)多个消费者(订阅者)可同时接收同一消息,消息会广播给所有订阅者。
  • 适用场景:事件通知(如系统日志广播、实时数据同步)。
  • 示例:RabbitMQ 的 Exchange + Fanout 模式、Kafka 的 Topic。

请求/响应(Request/Reply)

  • 特点:生产者发送消息后,消费者处理并返回响应(类似 RPC),通常需要 临时队列 存储响应。
  • 适用场景:需要同步结果的异步调用(如支付状态查询)。
  • 示例:RabbitMQ 的 RPC 模式、Kafka 的 Request-Reply 扩展。

扇出(Fanout)

  • 特点:消息发送到 Exchange 后,无条件广播 给所有绑定的队列(无路由规则),类似发布/订阅,但更简单。
  • 适用场景:实时通知多个系统(如价格变动、库存更新)。
  • 示例:RabbitMQ 的 Fanout Exchange。

路由(Routing / Direct)

  • 特点:消息根据 Routing Key 被精准投递到匹配的队列,消费者只接收符合规则的消息。
  • 适用场景:条件过滤(如错误日志分级处理)。
  • 示例:RabbitMQ 的 Direct Exchange。

主题路由(Topic)

  • 特点:基于 通配符(如 user.*)匹配 Routing Key,实现灵活订阅,比 Direct 更灵活,比 Fanout 更精准。
  • 适用场景:复杂事件分发(如物联网设备消息分类)。
  • 示例:RabbitMQ 的 Topic Exchange、MQTT 的 Topic。

总结

模型 生产者-消费者关系 典型应用场景
点对点 1:1(竞争消费) 任务队列、订单处理
发布/订阅 1:N(广播) 日志广播、实时通知
请求/响应 1:1(带响应) 异步 RPC、结果回调
扇出 1:N(无条件广播) 多系统数据同步
路由 1:1(精准匹配) 条件过滤、优先级队列
主题路由 1:N(通配符匹配) 复杂事件分类(如 IoT)

【中等】MQ 推拉模式各有什么利弊,如何选择?

消息引擎(MQ)获取消息的模式主要分为 Push(推)Pull(拉) 两种,不同消息队列中间件采用不同的策略,部分系统还支持 混合模式

Push 模式(服务端推送)

  • 特点
    • 消息由 Broker 主动推送给消费者,消费者被动接收。
    • 实时性高,减少消费者轮询开销。
  • 优点:低延迟,适合实时性要求高的场景(如即时通讯)。
  • 缺点:可能造成消费者过载(需背压机制控制流速)。
  • 典型实现:RabbitMQ、ActiveMQ、RocketMQ(默认长轮询模拟 Push)。

Pull 模式(客户端拉取)

  • 特点
    • 消费者主动从 Broker 拉取消息,按需获取。
    • 消费者控制消费速率。
  • 优点:避免消息堆积冲击消费者,适合高吞吐场景(如日志处理)。
  • 缺点:存在空轮询开销(可通过长轮询优化)。
  • 典型实现:Kafka、Pulsar(原生 Pull)、RocketMQ(支持显式 Pull)。

长轮询(Long Polling)

  • 特点Push 和 Pull 的折中方案。消费者发起请求后,Broker 若无消息则保持连接,直到有消息或超时才返回。
  • 优点:减少无效轮询,平衡实时性与服务端压力。
  • 典型实现:RocketMQ(默认模式)、HTTP 长轮询(如 WebSocket)。

混合模式(Push + Pull)

  • 特点
    • 关键消息用 Push 保证实时性,批量数据用 Pull 提高吞吐。
    • 消费者可动态切换模式。
  • 典型实现:Pulsar(支持多模式)、部分自研 MQ 系统。

对比总结

模式 实时性 服务端压力 消费者控制力 适用场景
Push 即时通讯、事件通知
Pull 大数据处理、日志收集
长轮询 平衡实时性与性能(如电商)

选择建议

  • 需要低延迟 → Push(如 RabbitMQ)。
  • 需要高吞吐 → Pull(如 Kafka)。
  • 平衡场景 → 长轮询(如 RocketMQ)。

MQ 可靠传输

【困难】如何保证 MQ 消息不丢失?

要保证 MQ 中的消息不丢失,需从 生产端、MQ 服务端、消费端 三个环节进行可靠性设计。一言以蔽之,生产端确认+服务端持久化+消费端手动 ACK+监控补偿 是保证消息不丢失的核心逻辑,需根据业务场景权衡性能与可靠性。

::: info 生产端防丢失

:::

Kafka RocketMQ
发送方式 支持同步、异步、异步回调发送方式
同步性能低;异步无视成功与否;异步回调比较合适
支持同步、异步回调发送方式
同步性能低;异步回调比较合适
生产 ACK acks 参数确保消息被多少副本写入成功后才返回确认
min.insync.replicas 配合 acks 使用,设定最少写入副本数
(配合刷盘机制)若配置为同步刷盘,消息必须先成功写入磁盘,发送方才能收到写成功的确认
失败重试 retries 参数控制失败重试次数 retryTimesWhenSendFailed 参数控制失败重试次数
事务 通过事务+生产幂等,实现 exactly once 语义,非真正的分布式事务 可实现分布式事务

::: info MQ 服务端防丢失

:::

Kafka RocketMQ
持久化 消息写入页缓存(PageCache),即可返回生产 ACK,由 OS 负责刷盘(fsync
可设置自动刷盘的间隔时间或消息数阈值
可以通过AdminClient手动刷盘
默认采用异步刷盘,类似 Kafka,先写入页缓存(PageCache),由 OS 负责刷盘(fsync
也支持同步刷盘,消息必须先成功写入磁盘,发送方才能收到写成功的确认
副本机制 通过副本机制保证冗余,避免单点故障
副本的粒度是针对分区
replication.factor 设置分区副本数
min.insync.replicas 控制写入到多少个副本才算是“已提交”
通过副本机制保证冗余,避免单点故障
副本的粒度是针对节点(Broker)
故障检测 所有Broker会向zk的/borker路径写临时节点,而Controller会监听该目录
一旦有Broker故障或失联,zk会话中断,zk自动删除临时节点,并通知Controller
Controller将Broker视为下线,并将该Broker上的分区Leader置为无效
主节点定期向从节点发送心跳以续活
超时未收到消息,从节点视主节点为下线
故障恢复 Controller 负责选新的分区 Leader,优先从ISR中选
通过 unclean.leader.election.enable 可设置是否允许从非ISR中选Leader,但丢失数据风险增大
采用 Raft 来选主

::: info 消费端防丢失

:::

Kafka RocketMQ
消费 ACK 支持自动、手动提交Offset(enable.auto.commit=false配置)
关闭自动提交,处理完整事务后,再手动提交Offset
支持自动、手动提交Offset(consumer.setAutoCommit配置)
关闭自动提交,处理完整事务后,再手动提交Offset
重置 Offset auto.offset.reset 可以重置消费的 Offset
失败重试 不完善:自动提交不支持重试;手动提交续自行处理重试逻辑 默认重试16次
多次消费失败后会被投递到死信队列,供后续特殊处理

::: info 监控与补偿

:::

  • 消息堆积告警:监控队列长度,及时发现异常。
  • 定期对账:对比生产与消费的记录,修复差异(如定时扫描数据库补发)。

【困难】如何保证 MQ 消息不重复?

::: info MQ 为什么会出现重复消息?

:::

  • 生产端重试(网络抖动时自动重发)
  • 消费端超时后 MQ 重新投递(如 RabbitMQ 未及时 ACK)
  • 消息队列集群脑裂(如 Kafka 副本切换)

以 Kafka 举例,Kafka 每个 Partition 都是一个有序的、不可变的记录序列,不断追加到结构化的提交日志中。Partition 中为每条记录分配一个连续的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的记录。

Kafka 的客户端和 Broker 都会保存 Offset。客户端消费消息后,每隔一段时间,就把已消费的 Offset 提交给 Kafka Broker,表示已消费。

在这个过程中,如果客户端应用消费消息后,因为宕机、重启等情况而没有提交已消费的 Offset 。当系统恢复后,会继续消费消息,由于 Offset 未提交,就会出现重复消费的问题。

::: info 重复消息通用解决方案

:::

处理重复消息 = “业务幂等为基础,缓存/DB 去重为辅助,监控兜底保万一”

处理 MQ 重复消息的核心思路是 幂等性设计 + 去重机制,确保即使消息被多次消费,业务结果也不会出错。

(1)业务层幂等设计

  • 唯一标识:每条消息携带唯一业务 ID(如订单号、支付流水号),处理前先查库判断是否已执行。

    1
    2
    SELECT status FROM orders WHERE order_id = '123';
    -- 若已处理则直接跳过
  • 状态机控制:业务状态严格流转(如「已支付」订单不允许重复扣款)。

(2)去重表/缓存

  • 数据库去重表:消费前先 INSERT 唯一键(消息 ID),利用主键冲突避免重复处理。

    1
    INSERT INTO message_processed(id) VALUES ('msg_123') ON DUPLICATE KEY IGNORE;
  • Redis 去重:用 SETNX 设置消息 ID 过期时间(适合高频场景)。

    1
    SETNX msg_123 1 EX 3600  # 1 小时内不重复处理

::: info 主流 MQ 处理

:::

消息队列 重复触发场景 推荐方案
Kafka 消费者重启导致 offset 回滚 业务幂等 + 本地 offset 持久化
RabbitMQ 未 ACK 导致重新入队 手动 ACK + 死信队列监控
RocketMQ 消息重试机制(16 次后进死信) 消费日志 + 人工干预

极端情况兜底:

  • 对账系统:定时扫描业务数据与消息记录,修复不一致(如定时补发短信)。
  • 人工告警:监控重复消息频率(如 1 分钟同消息 ID 出现 3 次以上则报警)。

::: info 方案选型

:::

  • 低频业务:数据库唯一索引(简单可靠)。
  • 高频业务:Redis + 过期时间(高性能)。
  • 金融级场景:幂等 + 对账 + 人工审核(强一致)。

【困难】如何保证 MQ 消息的顺序性?

要保证 MQ 消息的顺序性,需从 生产、存储、消费 三个环节控制。

核心思路是:“同一业务 ID 锁定同一队列 + 单线程消费”,需结合业务需求选择局部顺序或全局顺序方案。

::: info 生产端保序

:::

  • 单生产者+单线程发送:同一业务 ID(如订单 ID)的消息由 同一生产者线程 顺序发送,避免多线程并发乱序。

    1
    2
    // 示例:相同 orderId 的消息由同一线程发送
    mqProducer.send(msg, orderId); // 通过 hash 选择分区
  • 禁用异步发送重试:异步发送失败时可能乱序,需同步发送或关闭重试(如 Kafka 配置 max.in.flight.requests.per.connection=1)。

::: info MQ 服务端保序

:::

  • 单分区/队列有序

    • Kafka/RocketMQ:同一业务 ID 的消息发送到 同一分区(Partition)。

      1
      2
      // 根据 orderId 哈希选择分区
      int partition = orderId.hashCode() % partitionNum;
    • RabbitMQ:使用单队列(或一致性哈希交换器绑定唯一队列)。

  • 关闭分区/队列并行:避免服务端多分区/多副本间的顺序混乱(如 Kafka 的 unclean.leader.election.enable=false)。

::: info 消费端保序

:::

  • 单消费者串行消费
    • 同一队列/分区由 单消费者线程 处理(如 Kafka 单线程消费或 max.poll.records=1)。
    • 多消费者时,相同业务 ID 的消息路由到同一消费者(如 RocketMQ 的 MessageQueueSelector)。
  • 内存队列排序(复杂场景):消费者拉取消息后,按业务 ID 分组存入内存队列,由不同线程分别串行处理。

::: info 特殊场景处理

:::

  • 全局严格顺序:牺牲性能,全链路单线程(生产→MQ→消费),仅适合低吞吐场景(如 Binlog 同步)。
  • 局部顺序:仅保证同一业务 ID 的顺序(如订单的创建→支付→退款),允许不同订单并发。

::: info 主流 MQ 处理

:::

Kafka RocketMQ
单分区/队列有序 单分区追加写入,天然有序 单队列有序
哈希路由 采用哈希路由,相同key固定发往同一分区
如果不指定key,则采用轮询方式选择分区
使用 MessageQueueSelector 指定队列
单线程生产 max.in.flight.requests.per.connection=1 控制
单线程消费 确保一个分区仅由一个消费者线程处理(Kafka 默认规则) 使用 MessageListenerOrderly
业务保证 避免多线程并发消费同一队列
分布式锁(如 Redis)
关键业务操作加锁,防止并发执行乱序
避免多线程并发消费同一队列
分布式锁(如 Redis)
关键业务操作加锁,防止并发执行乱序

::: info 注意事项

:::

  • 性能权衡:顺序性越高,并发性能越低(需根据业务容忍度平衡)。
  • 错误处理:消费失败时需暂停当前分区消费(如 Kafka 的 pause()),避免跳过消息导致乱序。
  • 监控:定期检查消息积压和顺序偏移(如 Kafka 的 consumer.position())。

【困难】如何处理 MQ 消息积压?

处理 MQ 消息积压的核心思路是 “快速消费存量+优化生产速率”,需结合监控、扩容、降级等手段综合治理。

大致可以归纳为:

  • 短期:扩容+降级,优先恢复服务。
  • 长期:优化消费逻辑+自动化运维,预防再次积压。
  • 口诀监控早发现,扩容扛流量,消费改批量,生产限流速

::: info 快速消费积压消息

:::

  • 增加消费者实例:横向扩展消费者服务(如 Kubernetes 动态扩容 Pod),注意分区数限制(Kafka 需提前规划足够分区)。
  • 提升消费并行度
    • 调整消费者并发参数(如 Kafka 的 max.poll.records、RabbitMQ 的 prefetch_count)。
    • 多线程消费(需保证无顺序要求的场景)。
  • 临时降级:非核心业务暂停消费(如日志处理),集中资源处理核心业务消息。

::: info 优化消费能力

:::

  • 批量处理:合并多条消息一次处理(如数据库批量插入)。
  • 异步化+削峰:消费者将消息存入内存队列,后台线程异步处理,避免同步阻塞。
  • 跳过非关键逻辑:临时关闭日志记录、数据校验等非必要操作。

::: info 控制生产端流量

:::

  • 限流:生产端启用速率限制(如 Kafka 的 quota、Redis 令牌桶)。
  • 削峰填谷:消息先写入缓存层(如 Redis List),再匀速写入 MQ。
  • 业务降级:高峰期关闭非核心功能的消息生产(如暂停推荐系统更新)。

::: info 监控与告警

:::

  • 实时监控指标
    • 队列堆积量(如 Kafka 的 lag)、消费速率(TPS)、消费者状态。
    • 设置阈值告警(如积压超过 10W 条触发短信通知)。
  • 根因分析工具
    • 日志分析(消费者卡顿、GC 问题)。
    • 链路追踪(如 SkyWalking 定位慢消费)。

::: info 长期预防措施

:::

  • 容量规划:根据业务峰值预先扩容分区/队列(如 Kafka 分区数 = 消费者数 × 1.5)。
  • 死信队列+重试机制:处理失败的消息转入死信队列,避免阻塞正常消费。
  • 自动化扩缩容:基于积压指标动态调整消费者数量(如 K8s HPA)。

::: info 主流 MQ 处理

:::

消息队列 关键操作
Kafka 增加分区+消费者,调整 fetch.max.bytes
RabbitMQ 镜像队列扩容,提高 prefetch_count
RocketMQ 消费组扩容,启用定时消息延迟消费

MQ 高可用

【困难】如何保证 MQ 的高可用?

不同 MQ 实现高可用的原理各不相同。因为 Kafka 比较具有代表性,所以这里以 Kafka 为例。

::: info Kafka 的核心概念

:::

了解 Kafka,必须先了解 Kafka 的核心概念:

  • Broker - Kafka 集群包含一个或多个节点,这种节点被称为 Broker。

  • Topic - 每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(不同 Topic 的消息是物理隔离的;同一个 Topic 的消息保存在一个或多个 Broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)。对于每一个 Topic, Kafka 集群都会维持一个分区日志。

  • Partition - 了提高 Kafka 的吞吐率,每个 Topic 包含一个或多个 Partition,每个 Partition 在物理上对应一个文件夹,该文件夹下存储这个 Partition 的所有消息和索引文件。

    • Kafka 日志的分区(Partition)分布在 Kafka 集群的节点上。每个节点在处理数据和请求时,共享这些分区。每一个分区都会在已配置的节点上进行备份,确保容错性。

img

::: info Kafka 的副本机制

:::

Kafka 是如何实现高可用的呢?

Kafka 在 0.8 以前的版本中,如果一个 Broker 宕机了,其上面的 Partition 都不能用了,这自然不是高可用的。

为了实现高可用,Kafka 引入了复制功能,简单来说,就是副本机制( Replicate ):

每个 Partition 都有一个 Leader,零个或多个 Follower。Leader 和 Follower 都是 Broker,每个 Broker 都会成为某些分区的 Leader 和某些分区的 Follower,因此集群的负载是平衡的。

  • Leader 处理一切对 Partition (分区)的读写请求
  • 而 Follower 只需被动的同步 Leader 上的数据

同一个 Topic 的不同 Partition 会分布在多个 Broker 上,而且一个 Partition 还会在其他的 Broker 上面进行备份,Producer 在发布消息到某个 Partition 时,先找到该 Partition 的 Leader,然后向这个 Leader 推送消息;每个 Follower 都从 Leader 拉取消息,拉取消息成功之后,向 Leader 发送一个 ACK 确认。

img

FAQ

问:为什么让 Leader 处理一切对对 Partition (分区)的读写请求?

答:因为如果允许所有 Broker 都可以处理读写请求,就可能产生数据一致性问题。

::: info Kafka 选举 Leader

:::

由上文可知,Partition 在多个 Broker 上存在副本。

如果某个 Follower 宕机,啥事儿没有,正常工作。

如果 Leader 宕机了,会从 Follower 中重新选举一个新的 Leader。

MQ 架构

【困难】Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?

::: info ActiveMQ

:::

ActiveMQ 是由 Apache 出品,ActiveMQ 是一个完全支持JMS1.1J2EE 1.4 规范的 JMS Provider 实现。它非常快速,支持 多种语言的客户端协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。

(a) 主要特性

  1. 服从 JMS 规范JMS 规范提供了良好的标准和保证,包括:同步异步 的消息分发,一次和仅一次的消息分发,消息接收订阅 等等。遵从 JMS 规范的好处在于,不论使用什么 JMS 实现提供者,这些基础特性都是可用的;
  2. 连接灵活性ActiveMQ 提供了广泛的 连接协议,支持的协议有:HTTP/SIP 多播SSLTCPUDP 等等。对众多协议的支持让 ActiveMQ 拥有了很好的灵活性;
  3. 支持的协议种类多OpenWireSTOMPRESTXMPPAMQP
  4. 持久化插件和安全插件ActiveMQ 提供了 多种持久化 选择。而且,ActiveMQ 的安全性也可以完全依据用户需求进行 自定义鉴权授权
  5. 支持的客户端语言种类多:除了 Java 之外,还有:C/C++.NETPerlPHPPythonRuby
  6. 代理集群:多个 ActiveMQ 代理 可以组成一个 集群 来提供服务;
  7. 异常简单的管理ActiveMQ 是以开发者思维被设计的。所以,它并不需要专门的管理员,因为它提供了简单又使用的管理特性。有很多中方法可以 监控 ActiveMQ 不同层面的数据,包括使用在 JConsole 或者在 ActiveMQWeb Console 中使用 JMX。通过处理 JMX 的告警消息,通过使用 命令行脚本,甚至可以通过监控各种类型的 日志

(b) 部署环境

ActiveMQ 可以运行在 Java 语言所支持的平台之上。使用 ActiveMQ 需要:

  • Java JDK
  • ActiveMQ 安装包

(c) 优点

  1. 跨平台 (JAVA 编写与平台无关,ActiveMQ 几乎可以运行在任何的 JVM 上);
  2. 可以用 JDBC:可以将 数据持久化 到数据库。虽然使用 JDBC 会降低 ActiveMQ 的性能,但是数据库一直都是开发人员最熟悉的存储介质;
  3. 支持 JMS 规范:支持 JMS 规范提供的 统一接口;
  4. 支持 自动重连错误重试机制
  5. 有安全机制:支持基于 shirojaas 等多种 安全配置机制,可以对 Queue/Topic 进行 认证和授权
  6. 监控完善:拥有完善的 监控,包括 Web ConsoleJMXShell 命令行,JolokiaRESTful API
  7. 界面友善:提供的 Web Console 可以满足大部分情况,还有很多 第三方的组件 可以使用,比如 hawtio

(d) 缺点

  1. 社区活跃度不及 RabbitMQ 高;
  2. 根据其他用户反馈,会出莫名其妙的问题,会 丢失消息
  3. 目前重心放到 activemq 6.0 产品 Apollo,对 5.x 的维护较少;
  4. 不适合用于 上千个队列 的应用场景;

::: info RabbitMQ

:::

RabbitMQ2007 年发布,是一个在 AMQP (高级消息队列协议) 基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

(a) 主要特性

  1. 可靠性:提供了多种技术可以让你在 性能可靠性 之间进行 权衡。这些技术包括 持久性机制投递确认发布者证实高可用性机制
  2. 灵活的路由:消息在到达队列前是通过 交换机 进行 路由 的。RabbitMQ 为典型的路由逻辑提供了 多种内置交换机 类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做 RabbitMQ插件 来使用;
  3. 消息集群:在相同局域网中的多个 RabbitMQ 服务器可以 聚合 在一起,作为一个独立的逻辑代理来使用;
  4. 队列高可用:队列可以在集群中的机器上 进行镜像,以确保在硬件问题下还保证 消息安全
  5. 支持多种协议:支持 多种消息队列协议
  6. 支持多种语言:用 Erlang 语言编写,支持只要是你能想到的 所有编程语言
  7. 管理界面RabbitMQ 有一个易用的 用户界面,使得用户可以 监控管理 消息 Broker 的许多方面;
  8. 跟踪机制:如果 消息异常RabbitMQ 提供消息跟踪机制,使用者可以找出发生了什么;
  9. 插件机制:提供了许多 插件,来从多方面进行扩展,也可以编写自己的插件。

(b) 部署环境

RabbitMQ 可以运行在 Erlang 语言所支持的平台之上,包括 SolarisBSDLinuxMacOSXTRU64Windows 等。使用 RabbitMQ 需要:

  • ErLang 语言包
  • RabbitMQ 安装包

(c) 优点

  1. 由于 Erlang 语言的特性,消息队列性能较好,支持 高并发
  2. 健壮、稳定、易用、跨平台、支持 多种语言、文档齐全;
  3. 有消息 确认机制持久化机制,可靠性高;
  4. 高度可定制的 路由
  5. 管理界面 较丰富,在互联网公司也有较大规模的应用,社区活跃度高。

(d) 缺点

  1. 尽管结合 Erlang 语言本身的并发优势,性能较好,但是不利于做 二次开发和维护
  2. 实现了 代理架构,意味着消息在发送到客户端之前可以在 中央节点 上排队。此特性使得 RabbitMQ 易于使用和部署,但是使得其 运行速度较慢,因为中央节点 增加了延迟消息封装后 也比较大;
  3. 需要学习 比较复杂接口和协议,学习和维护成本较高。

::: info RocketMQ

:::

RocketMQ 出自 阿里 的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,消息可靠性上Kafka 更好。RocketMQ 在阿里内部  被广泛应用在 订单交易充值流计算消息推送日志流式处理binglog 分发 等场景。

(a) 主要特性

  1. 基于 队列模型:具有 高性能高可靠高实时分布式 等特点;
  2. ProducerConsumer队列 都支持 分布式
  3. Producer 向一些队列轮流发送消息,队列集合 称为 TopicConsumer 如果做 广播消费,则一个 Consumer 实例消费这个 Topic 对应的 所有队列;如果做 集群消费,则 多个 Consumer 实例 平均消费 这个 Topic 对应的队列集合;
  4. 能够保证 严格的消息顺序
  5. 提供丰富的 消息拉取模式
  6. 高效的订阅者 水平扩展能力;
  7. 实时消息订阅机制
  8. 亿级 消息堆积 能力;
  9. 较少的外部依赖。

(b) 部署环境

RocketMQ 可以运行在 Java 语言所支持的平台之上。使用 RocketMQ 需要:

  • Java JDK
  • 安装 gitMaven
  • RocketMQ 安装包

(c) 优点

  1. 单机 支持 1 万以上 持久化队列
  2. RocketMQ 的所有消息都是 持久化的,先写入系统 PAGECACHE,然后 刷盘,可以保证 内存磁盘 都有一份数据,而 访问 时,直接 从内存读取
  3. 模型简单,接口易用(JMS 的接口很多场合并不太实用);
  4. 性能非常好,可以允许 大量堆积消息Broker 中;
  5. 支持 多种消费模式,包括 集群消费广播消费等;
  6. 各个环节 分布式扩展设计,支持 主从高可用
  7. 开发度较活跃,版本更新很快。

(d) 缺点

  1. 支持的 客户端语言 不多,目前是 JavaC++,其中 C++ 还不成熟;
  2. RocketMQ 社区关注度及成熟度也不及前两者;
  3. 没有 Web 管理界面,提供了一个 CLI (命令行界面) 管理工具带来 查询管理诊断各种问题
  4. 没有在 MQ 核心里实现 JMS 等接口;

::: info Kafka

:::

Apache Kafka 是一个 分布式消息发布订阅 系统。它最初由 LinkedIn 公司基于独特的设计实现为一个 分布式的日志提交系统 (a distributed commit log),之后成为 Apache 项目的一部分。Kafka 性能高效可扩展良好 并且 可持久化。它的 分区特性可复制可容错 都是其不错的特性。

(a) 主要特性

  1. 快速持久化:可以在 O(1) 的系统开销下进行 消息持久化
  2. 高吞吐:在一台普通的服务器上既可以达到 10W/s吞吐速率
  3. 完全的分布式系统BrokerProducerConsumer 都原生自动支持 分布式,自动实现 负载均衡
  4. 支持 同步异步 复制两种 高可用机制
  5. 支持 数据批量发送拉取
  6. **零拷贝技术 (zero-copy)**:减少 IO 操作步骤,提高 系统吞吐量
  7. 数据迁移扩容 对用户透明;
  8. 无需停机 即可扩展机器;
  9. 其他特性:丰富的 消息拉取模型、高效 订阅者水平扩展、实时的 消息订阅、亿级的 消息堆积能力、定期删除机制;

(b) 部署环境

使用 Kafka 需要:

  • Java JDK
  • Kafka 安装包

(c) 优点

  1. 客户端语言丰富:支持 Java.NetPHPRubyPythonGo 等多种语言;
  2. 高性能:单机写入 TPS 约在 100 万条/秒,消息大小 10 个字节;
  3. 提供 完全分布式架构,并有 replica 机制,拥有较高的 可用性可靠性,理论上支持 消息无限堆积
  4. 支持批量操作;
  5. 消费者 采用 Pull 方式获取消息。消息有序通过控制 能够保证所有消息被消费且仅被消费 一次
  6. 有优秀的第三方 Kafka Web 管理界面 Kafka-Manager
  7. 日志领域 比较成熟,被多家公司和多个开源项目使用。

(d) 缺点

  1. Kafka 单机超过 64队列/分区 时,Load 时会发生明显的飙高现象。队列 越多,负载 越高,发送消息 响应时间变长
  2. 使用 短轮询方式实时性 取决于 轮询间隔时间
  3. 消费失败 不支持重试
  4. 支持 消息顺序,但是 一台代理宕机 后,就会产生 消息乱序
  5. 社区更新较慢。

::: info 技术选型

:::

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比 RocketMQ、Kafka 低一个数量级 同 ActiveMQ 10 万级,支撑高吞吐 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响 topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性 ms 级 微秒级,这是 RabbitMQ 的一大特点,延迟最低 ms 级 延迟在 ms 级以内
可用性 高,基于主从架构实现高可用 同 ActiveMQ 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到 0 丢失 同 RocketMQ
功能支持 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 MQ 功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

综上,各种对比之后,有如下建议:

  • 一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;
  • 后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;
  • 不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。
  • 所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。
  • 如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

【困难】什么是 JMS?

提到 MQ,就顺便提一下 JMS 。

JMS(JAVA Message Service,java 消息服务)API 是一个消息服务的标准/规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

在 EJB 架构中,有消息 bean 可以无缝的与 JMS 消息服务集成。在 J2EE 架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。

::: info JMS 消息模型

:::

在 JMS 标准中,有两种消息模型:

  • P2P(Point to Point)
  • Pub/Sub(Publish/Subscribe)

P2P 模式

P2P 模式包含三个角色:MQ(Queue),发送者 (Sender),接收者 (Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

P2P 的特点

  • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在 MQ 中)
  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
  • 接收者在成功接收消息之后需向队列应答成功

如果希望发送的每个消息都会被成功处理的话,那么需要 P2P 模式。

Pub/sub 模式

包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到 Topic, 系统将这些消息传递给多个订阅者。

Pub/Sub 的特点

  • 每个消息可以有多个消费者
  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  • 为了消费消息,订阅者必须保持运行的状态。

为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用 Pub/Sub 模型。

::: info JMS 消息消费

:::

在 JMS 中,消息的产生和消费都是异步的。对于消费来说,JMS 的消息者可以通过两种方式来消费消息。

  • 同步 - 订阅者或接收者通过 receive 方法来接收消息,receive 方法在接收到消息之前(或超时之前)将一直阻塞;
  • 异步 - 订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的 onMessage 方法。

JNDI - Java 命名和目录接口,是一种标准的 Java 命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或命名服务中的一个记录,同时返回资源连接建立所必须的信息。

JNDI 在 JMS 中起到查找和访问发送目标或消息来源的作用。

【中等】什么是 AMQP?

AMQP(Advanced Message Queuing Protocol)是一种应用层协议,用于在消息队列系统中定义消息的格式、传输方式和处理机制

AMQP 是一个面向消息的异步传输的协议,具有高可靠性、可拓展性、跨平台的特性,适合在分布式系统中传输重要数据。它是 RabbitMQ、ActiveMQ 等消息中间件的底层协议。

核心组件

组件 作用
Connection 客户端与消息代理(如 RabbitMQ)的物理连接
Channel 逻辑通信链路(多路复用连接,减少开销)
Exchange 消息路由枢纽(支持多种路由策略)
Queue 存储消息的容器,消费者从中拉取数据
Binding 定义 Exchange 与 Queue 的映射关系(含路由规则)

路由模型

  • Direct:精确匹配路由键(如 order.create
  • Fanout:广播到所有绑定队列(无视路由键)
  • Topic:通配符匹配(如 order.*
  • Headers:基于消息头键值匹配(非路由键)

可靠性保障

  • 消息确认:消费者手动确认,失败则重入队列
  • 持久化:队列/消息持久化防丢失
  • 事务:支持原子性提交/回滚(批量操作)

协议对比

协议 优势场景 局限性
AMQP 企业级应用(强事务、高可靠) 略重(不适合 IoT 轻量场景)
MQTT 物联网(低功耗、低带宽) 功能简单(无复杂路由)
JMS Java 生态集成 仅限 Java,跨平台性弱

Kafka

【困难】Kafka 为什么性能高?

Kafka 的数据存储在磁盘上,为什么还能这么快?

说 Kafka 很快时,他们通常指的是 Kafka 高效移动大量数据的能力。Kafka 为了提高传输效率,做了很多精妙的设计。

::: info 顺序 I/O(追加写入)

:::

磁盘读写有两种方式:顺序读写或者随机读写。在顺序读写的情况下,磁盘的顺序读写速度和内存接近。因为磁盘是机械结构,每次读写都会寻址写入,其中寻址是一个“机械动作”。Kafka 利用了一种分段式的、只追加 (Append-Only) 的日志,基本上把自身的读写操作限制为顺序 I/O,也就使得它在各种存储介质上能有很快的速度。

::: info 零拷贝

:::

Kafka 数据传输是一个从网络到磁盘,再由磁盘到网络的过程。在网络和磁盘之间传输数据时,消除多余的复制是提高效率的关键。Kafka 利用零拷贝技术来消除传输过程中的多余复制

如果不采用零拷贝,Kafka 将数据同步给消费者的大致流程是:

  1. 从磁盘加载数据到 os buffer
  2. 拷贝数据到 app buffer
  3. 再拷贝数据到 socket buffer
  4. 接下来,将数据拷贝到网卡 buffer
  5. 最后,通过网络传输,将数据发送到消费者

采用零拷贝技术,Kafka 使用 sendfile() 系统方法,将数据从 os buffer 直接复制到网卡 buffer。这个过程中,唯一一次复制数据是从 os buffer 到网卡 buffer。这个复制过程是通过 DMA(Direct Memory Access,直接内存访问) 完成的。使用 DMA 时,CPU 不参与,这使得它非常高效。

::: info 其他性能设计

:::

  • 页缓存 - Kafka 的数据并不是实时的写入磁盘,它充分利用了现代操作系统分页存储来利用内存提高 I/O 效率。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。Kafka 接收来自 socket buffer 的网络数据,应用进程不需要中间处理、直接进行持久化时。可以使用 mmap 内存文件映射。
  • 压缩 - Kafka 内置了几种压缩算法,并允许定制化压缩算法。通过压缩算法,可以有效减少传输数据的大小,从而提升传输效率。
  • 批处理 - Kafka 的 Clients 和 Brokers 会把多条读写的日志记录合并成一个批次,然后才通过网络发送出去。日志记录的批处理通过使用更大的包以及提高带宽效率来摊薄网络往返的开销。
  • 分区 - Kafka 将 Topic 分区,每个分区对应一个名为的 Log 的磁盘目录,而 Log 又根据大小,可以分为多个 Log Segment 文件。这种分而治之的策略,使得 Kafka 可以并发读,以支撑非常高的吞吐量。此外,Kafka 支持负载均衡机制,将数据分区近似均匀地分配给消费者群组的各个消费者。

【困难】ZooKeeper 在 Kafka 中的作用是什么?

ZooKeeper 在 Kafka 中扮演着核心的协调者角色,主要负责集群的元数据管理、Broker 协调和状态维护。这些管理机制可以确保集群一致性、高可用性和协调能力。

Broker 管理

  • 节点注册与存活监控:每个 Kafka Broker 启动时会在 ZooKeeper 中注册临时节点(/brokers/ids)。ZooKeeper 通过心跳机制监控 Broker 存活状态,若 Broker 宕机,临时节点消失,集群会触发重新选举或分区重新分配。
  • Leader 选举:Kafka 分区的 Leader 副本选举由 ZooKeeper 协调完成(旧版本依赖 ZooKeeper,新版本已逐步迁移至 Kafka 自身协议)。

Topic 与分区元数据

  • 存储拓扑信息:Topic 的分区数量、副本分布(/brokers/topics/[topic])等元数据由 ZooKeeper 维护,供所有 Broker 和客户端查询。
  • 分区状态同步:分区 Leader 变更、ISR(In-Sync Replicas)列表更新等操作通过 ZooKeeper 通知其他 Broker。

控制器(Controller)选举

Kafka 集群中的某个 Broker 会被选举为 Controller(通过 ZooKeeper 的临时节点竞争),负责分区 Leader 选举、副本分配等关键任务。Controller 故障时,ZooKeeper 会触发重新选举。

客户端服务发现

生产者和消费者通过 ZooKeeper(旧版本)或直接通过 Broker(新版本)获取集群的 Broker 列表和 Topic 元数据,以确定数据的读写位置。

ACL 与配额管理(可选)

ZooKeeper 存储访问控制列表(ACL)和客户端配额配置,用于权限控制和流量限制。

KRaft 将替代 ZooKeeper

Kafka 2.8+ 版本开始支持** KRaft 模式**(基于 Raft 协议),逐步弃用 ZooKeeper,将元数据管理和选举逻辑内置于 Kafka 自身,以简化架构并提升性能。但在大多数生产环境中,ZooKeeper 仍广泛使用。

【中等】Kafka 为什么要弃用 Zookeeper?

Kafka 弃用 ZooKeeper 主要是为了简化架构、提升性能、降低运维复杂度

减少外部依赖

  • 架构简化:ZooKeeper 是独立的外部系统,需额外部署和维护。移除后,Kafka 成为完全自包含的系统,降低部署和运维成本。
  • 避免单点风险:ZooKeeper 本身需要集群化,若出现故障会影响 Kafka 的元数据管理,内嵌治理逻辑可减少此类风险。

提升扩展性与性能

  • 元数据效率:ZooKeeper 的写操作(如 Leader 选举)是串行的,可能成为瓶颈。Kafka 内置的 KRaft 协议(基于 Raft)支持并行日志写入,显著提升元数据处理速度(如分区扩容、Leader 切换)。
  • 降低延迟:省去与 ZooKeeper 的网络通信,元数据操作(如 Broker 注册、Topic 变更)延迟更低。

统一元数据管理

  • 一致性模型统一:ZooKeeper 使用 ZAB 协议,而 Kafka 使用自身的日志复制机制,两者不一致可能导致协调问题。KRaft 模式通过单一协议(Raft)管理所有元数据,逻辑更清晰。
  • 简化客户端访问:旧版客户端需同时连接 Kafka 和 ZooKeeper,新版只需直连 Kafka Broker。

支持更大规模集群

ZooKeeper 的局限性:ZooKeeper 对节点数量(通常≤7)和 Watcher 数量有限制,影响 Kafka 集群的扩展性。KRaft 模式通过分片和流式元数据传递,支持超大规模集群(如数十万分区)。

补充说明

  • Kafka 2.8+ 开始实验性支持 KRaft 模式,3.0+ 逐步稳定,但仍兼容 ZooKeeper 模式。
  • 完全移除 ZooKeeper 需确保 KRaft 在生产环境中的成熟度(如故障恢复、监控工具链完善)。

【中等】Kafka 中如何实现时间轮?

时间轮是用于高效管理和调度大量定时任务的环形数据结构,通过时间分片(槽)管理任务,优化调度效率。

数据结构

  • 环形数组:每个槽代表一个时间片(如 1 秒),存储双向链表管理的任务。
  • 指针移动:以固定时间步长推进,触发当前槽的任务执行。

工作原理

  • 任务插入:根据延迟时间计算槽位(如 延迟 % 槽数),插入链表尾部(O(1) 复杂度)。
  • 任务触发:指针每移动一格,执行对应槽中所有任务。

处理长延迟任务

  • 方案 1:轮次(Netty):计算轮数(如 (延迟-1)/槽数),轮数归零时触发。
  • 方案 2:多层次时间轮(Kafka)
    • 层级递进:高层槽覆盖更大时间范围(如秒→分→时)。
    • 降级机制:任务随时间推移从高层移至底层,保证精度。

优点

  • 高效性:插入/删除任务 O(1) 复杂度。
  • 低内存开销:固定槽数,内存占用稳定。

应用场景

  • 高并发定时任务(如 Netty 的超时检测)。
  • 网络服务器(连接/请求超时管理)。
  • 分布式系统(节点间任务协调)。

实际应用

  • NettyHashedWheelTimer(单层+轮次)。
  • Kafka:多层次时间轮+降级。
  • Caffeine Cache:本地缓存的任务调度。

【中等】Kafka 的索引设计有什么亮点?

Kafka 的索引设计通过稀疏索引 + 二分查找 + 分段存储,在查询效率、存储成本、扩展性之间取得平衡,适合高吞吐、低延迟的消息系统需求。

Kafka 的索引设计(主要涉及偏移量索引(.index)和时间戳索引(.timeindex))具有以下核心优势:

高效查询(O(1) ~ O(logN) 复杂度)

  • 稀疏索引:不存储每条消息的索引,而是按一定间隔(如每 4KB 数据)建立索引项,大幅减少索引文件大小。
  • 二分查找:通过索引快速定位消息所在的物理位置(磁盘文件+偏移量),减少全量扫描。

低存储开销

  • 紧凑结构:索引文件仅存储偏移量 + 物理位置(固定 8 字节/项),占用空间极小。
  • 分段存储:每个日志段(Segment)独立维护索引,避免单一大文件索引的性能瓶颈。

快速故障恢复

  • 内存映射(MMAP):索引文件通过内存映射加速读取,重启时无需全量加载。
  • 懒加载:仅加载活跃分片的索引,减少启动时间。

支持时间范围查询时间戳索引(.timeindex)允许按时间戳快速定位消息,适用于日志回溯、监控等场景。

索引自动更新:日志压缩(Compaction)或删除(Retention)时,索引同步清理,避免无效查询。

【困难】Kafka 处理请求的全流程?

Kafka 采用 多线程池 + 事件驱动 模型,核心线程组分工如下:

网络通信层(1 个 Acceptor + N 个 Processor)

  • Acceptor 线程(1 个):监听 ServerSocket,接收客户端连接,轮询分发给 Processor 线程。
  • Processor 线程(默认 3 个,可配置):每个 Processor 维护一个 Selector(NIO),负责:
    • 读请求:解析请求数据,放入共享请求队列RequestChannel)。
    • 写响应:从 ResponseQueue 获取结果,通过 Socket 返回客户端。

请求处理层(KafkaRequestHandlerPool)

IO 线程池(默认 8 个,可配置)从 RequestChannel 拉取请求,根据类型调用对应 API 层 处理(如 handleProduceRequest)。

关键操作:

  • 生产请求:写入 Leader 副本的 LogSegment(内存→PageCache→磁盘)。
  • 消费请求:从 PageCache 或磁盘读取数据(零拷贝优化)。

后台线程

  • Log Cleaner:日志压缩(Compaction)和删除(Retention)。
  • Replica Manager:副本同步(ISR)、Leader 选举。
  • Delayed Operation:处理延迟操作(如 Produce 的 ACK 等待)。

kafka broker internals

核心设计优势

  • 解耦网络 I/O 与业务处理Processor 仅负责通信,Handler 专注逻辑。
  • 无锁队列RequestChannel 使用 ConcurrentLinkedQueue,减少竞争。
  • 动态扩展:可调整 ProcessorHandler 线程数适配负载。

【困难】Kafka 中如何实现事务机制?

::: info Kafka 事务实现

:::

Kafka 事务是为了实现消息的 Exactly-Once 语义,确保消息在生产、传输、消费过程中仅处理一次

关键组件

  • 事务协调器:管理事务生命周期,状态持久化到__transaction_state主题。
  • 幂等生产者:通过Producer ID + Sequence Number避免重复消息。
  • 事务性消费:支持read_committed隔离级别,过滤未提交消息。

工作流程

  • 启事务:生产者向协调器注册事务
  • 写消息:带事务 ID 的消息写入分区(标记为未提交)
  • 定结局:协调器最终提交(写 Commit 标记)或回滚
  • 控消费:消费者仅读取已提交消息

设计特点

  • 轻量级事务(非完整 2PC)
  • 与高吞吐架构兼容
  • 故障自动恢复(通过事务日志)

::: info Kafka 事务 和 RocketMQ 事务的区别

:::

  • Kafka 事务主要解决 Exactly-Once 语义(生产、消费的精确一次处理),适用于 流处理场景(如 Flink、Kafka Streams 的状态一致性)。
  • RocketMQ 事务主要解决 分布式事务(如订单+库存的跨系统事务),适用于 业务事务场景(如电商、金融的最终一致性)。

实现机制差异

特性 Kafka RocketMQ
事务协调者 内置 TransactionCoordinator 依赖外部 事务消息存储(如 MySQL)
事务状态存储 内部 Topic __transaction_state 独立存储(如数据库、ZK)
事务消息可见性 支持 read_committed 隔离 需消费者主动回查(半消息机制)
事务提交方式 两阶段提交(2PC) 本地事务+定时任务(事务回查)

应用场景

  • Kafka
    • 流处理任务的 端到端精确一次(如 Flink+Kafka)。
    • 跨分区的原子写入(如多个 Topic 的关联操作)。
  • RocketMQ
    • 跨系统事务(如 订单创建+扣库存)。
    • 需要 业务回查 的最终一致性场景。

【困难】Kafka 控制器如何处理事件?

控制器是 Kafka 集群的管理中枢,基于 ZooKeeper/KRaft 选举,其本质是一个特殊的 Broker,额外承担集群管理职责。

控制器选举

  • 每个 Broker 启动时尝试在 ZooKeeper 创建/controller临时节点
  • 成功创建的 Broker 成为控制器(Leader)
  • 其他 Broker 监听该节点,实现故障转移

事件处理流程

  • 事件捕获:通过 ZooKeeper Watch 机制监听关键路径:
    • /brokers/ids(Broker 上下线)
    • /admin/delete_topics(主题删除)
    • /isr_change_notification(ISR 变更)
  • 事件入队
    • 控制器将事件放入异步事件队列(ControllerEventManager)
    • 保证事件顺序处理(单线程模型)
  • 事件处理
    • 分区再均衡:触发 Leader 选举,更新 ISR 列表
    • Broker 故障
      • 将该 Broker 上的 Leader 分区迁移到其他 Broker
      • 更新元数据并同步到所有 Broker
    • 主题变更:更新分区分配方案并持久化到 ZooKeeper
  • 元数据同步
    • 通过 UpdateMetadataRequest 将最新元数据广播给所有 Broker
    • Broker 更新本地缓存(MetadataCache)

关键设计

  • 单线程模型:避免并发问题(所有事件串行处理)
  • 批处理优化:合并相似事件(如多个分区 Leader 选举)
  • ZooKeeper 解耦:新版逐渐用 KRaft 替代 ZK 依赖

容错机制

  • 控制器故障时通过 ZK 重新选举
  • 事件处理失败会重试或触发新一轮选举
  • 通过 epoch 机制防止脑裂(控制器切换时递增 epoch)

流程图解

1
2
3
4
5
6
7
8
9
10
11
12
ZooKeeper 事件触发

控制器事件队列(ControllerEventManager)

单线程事件处理器
├─ 分区 Leader 选举
├─ ISR 列表更新
├─ 副本重分配

UpdateMetadataRequest 广播

集群元数据最终一致

特点:高可靠性但存在单点瓶颈(新版 KRaft 改进为分布式控制器)

RocketMQ

【中等】RocketMQ 的事务消息有什么缺点?你还了解过别的事务消息实现吗?

【中等】为什么 RocketMQ 不使用 Zookeeper 作为注册中心?

RocketMQ 自研 NameServer 是为了:

  • 规避 ZooKeeper 的 性能瓶颈
  • 适应 大规模分布式队列场景
  • 实现 简单高效的服务发现

注:阿里内部早期版本曾用 ZK,后因性能问题重构

设计目标差异

  • ZooKeeper:强一致性(CP),适合小规模元数据管理(如 Kafka 的 Controller 选举)
  • RocketMQ:高可用性(AP),需要支持 高频读写(如 Broker 注册、心跳检测)

性能瓶颈

场景 ZooKeeper RocketMQ 选择
注册中心读写频率 低频(秒级) 高频(毫秒级 Broker 心跳)
节点规模 适合中小集群(<100 节点) 支持大规模集群(数千节点)
吞吐量 写性能受限(需全局有序) 自研 NameServer(无强一致性要求)

架构简化

  • NameServer 轻量化
    • 无选举机制(所有节点平等)
    • 最终一致性(通过定时心跳维护状态)
    • 无持久化存储(重启后依赖 Broker 重新注册)
  • 避免 ZooKeeper 的依赖
    • 减少运维复杂度(无需单独维护 ZK 集群)
    • 降低网络开销(ZK 的 Watcher 机制在大规模下压力大)

容灾能力

  • ZooKeeper
    • 集群半数以上节点存活才可用
    • 脑裂问题需人工干预
  • NameServer
    • 单节点故障不影响整体服务(无状态)
    • 任意存活节点均可提供服务

业务场景适配

RocketMQ 的核心需求

  • 快速发现 Broker(路由信息)
  • 容忍短暂数据不一致(消息队列场景可接受)
  • 低延迟响应(Producer/Consumer 频繁拉取路由表)

【中等】RocketMQ 的延迟消息是怎么实现的?

RocketMQ 的延迟消息采用 多级时间轮 + 定时任务扫描 实现,非实时投递而是延迟触发。

实现步骤

  1. 消息标记

    • Producer 发送消息时指定 delayTimeLevel(如 3 表示延迟 10 秒)
    • 支持 18 个固定延迟级别(1s/5s/10s/30s/1m…2h)
  2. 延迟存储

    • Broker 将延迟消息存入 专用 TopicSCHEDULE_TOPIC_XXXX
    • 按延迟级别分队列(如 delayLevel=3 的消息存入 SCHEDULE_TOPIC_XXXX 的 Queue3)
  3. 定时扫描

    • 时间轮算法 管理延迟队列
    • 每秒扫描对应队列,将到期消息 重新投递 到目标 Topic
  4. 消息投递

    • 到期后,Broker 将消息从延迟 Topic 转移到原始 Topic
    • Consumer 正常消费

关键设计

组件 作用
ScheduleTopic 存储所有延迟消息(内部 Topic,对用户透明)
TimerWheel 高效触发延迟任务(O(1) 时间复杂度)
定时线程 每秒扫描时间轮,将到期消息移出延迟队列

特点

  • 固定延迟级别:不支持任意时间精度(如 23 秒)
  • 投递误差:±1 秒(依赖扫描间隔)
  • 高吞吐:时间轮算法避免遍历所有消息

示例代码

1
2
3
4
Message msg = new Message("TestTopic", "Hello Delay".getBytes());
// 设置延迟级别 3(对应 10 秒)
msg.setDelayTimeLevel(3);
producer.send(msg);

对比 Kafka

特性 RocketMQ Kafka
延迟精度 固定级别(18 种) 需外部实现(如 Streams 延迟处理)
实现复杂度 内置支持 需自定义逻辑
适用场景 订单超时、定时通知 流处理中的窗口操作

:RocketMQ 5.0+ 支持 定时消息(精确到毫秒),底层改用时间戳+哈希分片。

【中等】RocketMQ 和 Kafka 在架构和功能上有什么区别?

::: info RocketMQ 和 Kafka 架构对比

:::

维度 RocketMQ Kafka
注册中心 自研轻量级 NameServer(AP 模型) 依赖 ZooKeeper(CP 模型),Kafka 新版本计划用 KRaft 取代 ZooKeeper
Broker 角色 主从架构(Master-Slave) 无主从区分(所有节点平等)
存储模型 单个 CommitLog 文件(含所有 Topic 数据) + 消费队列索引 主题-分区-段的层级结构
消费模式 支持 Push/Pull 两种模式 仅 Pull 模式

::: info RocketMQ 和 Kafka 功能对比

:::

设计差异:

  • RocketMQ
    • 业务友好:强事务、延迟消息、过滤等开箱即用
    • 轻量可控:NameServer 简化集群管理
  • Kafka
    • 流式优先:高吞吐、分区弹性伸缩
    • 生态整合:与 Flink/Spark 等深度兼容

主要功能差异如下:

功能 RocketMQ Kafka
消息类型 普通消息、顺序消息、事务消息、延迟消息 主要支持普通消息(需扩展实现其他特性)
事务支持 二阶段提交+事务回查(业务级事务) 精确一次语义(内部消息事务)
延迟消息 内置 18 个固定延迟级别 需外部实现(如流处理或自定义调度)
消息回溯 支持按时间/偏移量回溯 仅支持按偏移量回溯
消息过滤 支持给 Topic 打标签,进一步分类 仅支持通过 Topic 给消息分类
死信队列 消息消费重试失败多次后,进入死信队列 Kafka 原生不支持

::: info RocketMQ 和 Kafka 性能与扩展对比

:::

维度 RocketMQ Kafka
吞吐量 单机约 10 万 TPS 单机可达百万 TPS(更高吞吐)
延迟 毫秒级(优化后更低) 毫秒级(依赖 PageCache)
水平扩展 需手动调整队列数 分区自动均衡(更易扩展)
数据保留 基于时间/大小策略 基于时间/大小策略(支持日志压缩)

::: info RocketMQ 和 Kafka 应用场景与选型

:::

场景 RocketMQ Kafka
电商/金融业务 订单、支付等业务级事务 日志收集、流处理(如 Flink)
延迟/定时任务 内置支持(如订单超时) 需外部系统配合
大规模日志流 适合中小规模 超大规模日志场景(如大数据管道)
多协议支持 兼容 JMS、MQTT 等 仅原生协议

总结选择建议

  • RocketMQ:需要 业务级事务、延迟消息、多协议支持 的场景
  • Kafka:需要 超高吞吐、流处理集成、大规模日志 的场景

【困难】RocketMQ 如何实现事务消息?

RocketMQ 事务消息采用 “半消息(Half Message) + 事务状态回查” 的二阶段方案,确保分布式事务的最终一致性。

关键流程

  1. 第一阶段:发送半消息

    • Producer 发送半消息(对 Consumer 不可见)
    • Broker 持久化消息并返回确认响应
    • Producer 执行本地事务(如数据库操作)
  2. 第二阶段:事务状态确认

    • 成功:Producer 提交事务确认 → Broker 将消息改为可消费状态
    • 失败:Producer 回滚 → Broker 丢弃消息
    • 超时未决:Broker 主动回查 Producer 的事务状态(默认每分钟 1 次)
  3. 消息投递

    • 仅提交后的消息会被 Consumer 消费
    • 支持消息重试和死信队列机制

核心组件

  • 事务监听器(TransactionListener):实现本地事务执行和回查逻辑
  • 事务日志存储:Broker 存储半消息和事务状态(基于文件或 DB)
  • 定时回查线程:处理超时未确认的事务

特点

  • 最终一致性:依赖定期回查机制
  • 业务耦合:需实现本地事务和回查接口
  • 高可用:Broker 集群保证消息不丢失

应用场景

  • 订单支付(扣库存+生成订单)
  • 跨系统数据同步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 示例:事务生产者
TransactionMQProducer producer = new TransactionMQProducer();
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
return LocalTransactionState.COMMIT_MESSAGE;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 事务状态回查
return LocalTransactionState.UNKNOW;
}
});

RabbitMQ

【简单】RabbitMQ 的 routing key 和 binding key 的最大长度是多少字节?

长度限制

  • 最大 255 字节(超限会抛出异常)。
  • 适用于 Routing Key(生产者指定)和 Binding Key(队列绑定交换机时指定)。

匹配规则(不同交换机类型)

交换机类型 匹配方式 示例
Direct 完全匹配 routing_key == binding_key
Topic 通配符匹配(* 匹配一个词,# 匹配多个词) *.order.# 匹配 user.order.create
Headers 不依赖 Routing Key,基于消息头键值对匹配 x-match: all/any

最佳实践

  • 保持简短:避免接近 255 字节,提升性能。
  • 命名规范:如 {服务}.{模块}.{事件}(例:user.order.paid)。
  • Topic 通配符:合理使用 *#,避免过度复杂。

⚠️ 注意:Headers 交换机忽略 Routing Key,仅依赖消息头(Headers)匹配。

【中等】RabbitMQ 中无法路由的消息会去到哪里?

在 RabbitMQ 中,无法路由的消息(即无法被投递到任何队列的消息)的处理方式取决于消息的 mandatoryimmediate 属性(RabbitMQ 3.0+ 已弃用 immediate),具体规则如下:

默认情况(未设置 mandatory

  • 消息被直接丢弃(即 “静默丢失”)。
  • 生产者无感知:Broker 不会返回任何通知。

设置了 mandatory=true

  • 若消息无法路由到任何队列,Broker 会通过 basic.return 方法将消息返回给生产者。

  • 生产者需监听返回消息

    1
    2
    3
    4
    5
    6
    7
    8
    channel.basicPublish("exchange", "routingKey",
    new AMQP.BasicProperties.Builder().mandatory(true).build(),
    message.getBytes());

    // 添加 ReturnListener 监听返回消息
    channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
    System.out.println("消息未被路由:" + new String(body));
    });
  • 适用场景:需严格确保消息路由成功的业务(如关键订单通知)。

备用交换机(Alternate Exchange)

  • 预先声明一个备用交换机,绑定一个队列(如 unrouted_queue)接收无法路由的消息。

  • 配置方式

    1
    2
    3
    4
    5
    6
    7
    8
    Map<String, Object> args = new HashMap<>();
    args.put("alternate-exchange", "my_ae"); // 指定备用交换机
    channel.exchangeDeclare("main_exchange", "direct", false, false, args);

    // 声明备用交换机和队列
    channel.exchangeDeclare("my_ae", "fanout");
    channel.queueDeclare("unrouted_queue", false, false, false, null);
    channel.queueBind("unrouted_queue", "my_ae", "");
  • 逻辑:若消息无法通过 main_exchange 路由,则自动转发到 my_ae,最终进入 unrouted_queue

关键区别

处理方式 条件 结果 适用场景
直接丢弃 默认情况 消息丢失,无通知 允许消息丢失的非关键业务
返回生产者 mandatory=true 通过 basic.return 回退消息 需严格监控路由失败的场景
转发到备用交换机 配置了 Alternate Exchange 消息存入备用队列 需审计或补偿无法路由的消息

最佳实践

  • 关键消息:始终设置 mandatory=true 并监听 basic.return
  • 日志与监控:使用备用交换机收集无法路由的消息,便于排查问题。
  • 避免消息丢失:确保交换机和队列的绑定关系正确,或使用 死信队列(DLX) 处理异常消息。

📌 注意:RabbitMQ 3.0+ 已移除 immediate 参数,旧版本中设置 immediate=true 会导致无法路由的消息被丢弃(除非同时设置 mandatory)。

【中等】RabbitMQ 中消息什么时候会进入死信交换机?

通过合理配置 DLX,可以实现消息的优雅降级和故障隔离。

::: info RabbitMQ 中消息进入死信交换机的触发情况

:::

在 RabbitMQ 中,消息进入 死信交换机(Dead Letter Exchange, DLX) 通常由以下 5 种情况触发:

(1)消息被消费者拒绝:消费者显式拒绝消息且不重新入队。

1
channel.basicReject(deliveryTag, false); // 或 basicNack 且 requeue=false
  • 典型场景:消息处理失败且无需重试(如业务校验不通过)。

(2)消息过期(TTL 超时)

  • 消息设置了 TTL(Time-To-Live),且未在过期前被消费。
  • 队列设置了 x-message-ttl,消息在队列中停留超时。
1
2
3
4
5
// 设置消息 TTL
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.expiration("60000") // 60 秒过期
.build();
channel.basicPublish("", "normal_queue", props, message.getBytes());

(3)队列达到最大长度:队列设置了 x-max-lengthx-max-length-bytes,且新消息到达时队列已满。

1
2
3
4
// 声明队列时设置最大长度
Map<String, Object> args = new HashMap<>();
args.put("x-max-length", 1000); // 最多 1000 条消息
channel.queueDeclare("normal_queue", false, false, false, args);

(4)队列被删除:消息所在的队列被删除(queueDelete),且消息未被消费。

(5)主节点崩溃镜像队列(Mirrored Queue) 中主节点崩溃,且消息未同步到从节点。

::: info 关键配置步骤

:::

  1. 声明死信交换机(DLX)和死信队列

    1
    2
    3
    4
    5
    // 声明死信交换机(类型通常为 direct/fanout)
    channel.exchangeDeclare("dlx_exchange", "direct");
    // 声明死信队列
    channel.queueDeclare("dlx_queue", false, false, false, null);
    channel.queueBind("dlx_queue", "dlx_exchange", "dlx_routing_key");
1
2
3
4
5
6
7
8

2. **为普通队列绑定死信交换机**:

```java
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange"); // 指定 DLX
args.put("x-dead-letter-routing-key", "dlx_routing_key"); // 可选
channel.queueDeclare("normal_queue", false, false, false, args);

注意事项

  • 死信消息的 原始属性(如 headers)会被保留,但 exchangeroutingKey 会被替换为 DLX 的配置。
  • 若未指定 x-dead-letter-routing-key,则使用消息原来的 routing key。

典型应用场景

  • 延迟队列:通过 TTL+DLX 实现消息延迟投递。
  • 失败处理:将处理失败的消息自动路由到死信队列,供人工或异步处理。
  • 流量控制:队列满时转移旧消息,避免阻塞新消息。

【中等】RabbitMQ 如何实现事务机制?

RabbitMQ 的事务机制(Transaction)通过 信道(Channel) 提供了一种保证消息可靠投递的机制,但其设计简单且对性能影响较大。

RabbitMQ 的事务通过同步机制确保消息投递的原子性,但性能代价高。在绝大多数生产环境中,推荐使用 Publisher Confirms 替代事务,以兼顾可靠性和吞吐量。

事务的核心操作

  • 开启事务

    1
    channel.txSelect(); // 开启事务模式
  • 提交事务

    1
    channel.txCommit(); // 提交事务,消息真正投递到队列
  • 回滚事务

    1
    channel.txRollback(); // 回滚事务,丢弃未提交的消息

事务的工作流程

  1. 生产者发送消息到 RabbitMQ(消息暂存于信道缓冲区,未写入队列)。
  2. 执行 txCommit():消息持久化到队列;若失败或调用 txRollback(),消息丢弃。
  3. 同步阻塞:事务提交/回滚需等待 Broker 确认,性能较低。

事务的局限性

  • 性能差:每次提交需等待 Broker 确认,吞吐量显著下降(通常降低 100~200 倍)。
  • 无分布式事务:仅保证生产者到 Broker 的可靠性,不涉及消费者或下游系统。
  • 不推荐高频使用:适合低频关键业务,高并发场景建议用 确认机制(Publisher Confirms)

事务 vs. 确认机制(Publisher Confirms)

特性 事务(Transaction) 确认机制(Publisher Confirms)
可靠性 强一致(同步阻塞) 最终一致(异步)
性能 极低(同步等待) 高(异步回调)
适用场景 低频关键消息(如支付) 高频业务(如日志、订单)
复杂度 简单 需处理确认/未确认逻辑

代码示例

1
2
3
4
5
6
7
8
9
try {
channel.txSelect(); // 开启事务
channel.basicPublish("", "queue1", null, msg1.getBytes());
channel.basicPublish("", "queue2", null, msg2.getBytes());
channel.txCommit(); // 提交事务
} catch (Exception e) {
channel.txRollback(); // 回滚事务
// 处理异常
}

使用建议

  • 优先选择 Confirm 模式

    1
    2
    channel.confirmSelect(); // 开启确认模式
    channel.addConfirmListener(...); // 异步回调
  • 事务适用场景

    • 严格保证单批次消息的原子性(如同时投递订单和库存消息)。
    • 兼容旧版 RabbitMQ(Confirm 模式需 v3.3+)。

【中等】RabbitMQ 中有哪些核心概念?

RabbitMQ 通过 生产者→交换机→队列→消费者 的链路传递消息,核心设计围绕 解耦灵活路由。理解这些角色和概念是掌握其工作原理的基础。

核心角色

角色 作用
生产者(Producer) 发送消息到交换机的客户端
消费者(Consumer) 从队列订阅并处理消息的客户端
消息代理(Broker) RabbitMQ 服务实例,负责接收、路由和存储消息(即 RabbitMQ 服务器本身)

核心组件

组件 功能
连接(Connection) 生产者/消费者与 Broker 之间的 TCP 长连接(复用减少开销)
信道(Channel) 连接中的虚拟通道(轻量级,避免频繁创建 TCP 连接)
交换机(Exchange) 接收生产者消息,按规则路由到队列(类型决定路由逻辑)
队列(Queue) 存储消息的容器,消费者从中拉取数据
绑定(Binding) 定义交换机与队列的映射关系(含路由键规则)

辅助概念

概念 说明
路由键(Routing Key) 生产者发送时指定的关键字,用于交换机匹配队列(类似“邮件地址”)
虚拟主机(VHost) 逻辑隔离单元(类似命名空间),不同 VHost 的队列/交换机互不可见
死信队列(DLX) 处理过期/被拒绝消息的特殊队列(用于延迟队列或异常处理)
消息确认(ACK) 消费者手动确认消息处理完成,确保可靠性

关键协议与机制

  • AMQP 协议:RabbitMQ 的核心通信协议,定义消息格式与交互规则
  • 持久化(Durable):队列/消息可持久化到磁盘,防止服务重启丢失
  • 事务(Transaction):确保批量操作的原子性(但性能较差,通常用 ACK 替代)

【中等】RabbitMQ 有哪些工作模式?

RabbitMQ 有以下几种主要的工作模式:

  • 简单模式(Simple)
  • 工作队列模式(Work Queue)
  • 发布/订阅模式(Publish/Subscribe)
  • 路由模式(Routing)
  • 主题模式(Topic)
  • RPC 模式(远程调用)

以下,对几种工作模式逐一进行说明:

简单模式(Simple)

  • 角色:1 生产者 → 1 队列 → 1 消费者
  • 特点:单向通信,无路由逻辑,即点对点模式
  • 场景:单任务处理(如日志记录)

工作队列模式(Work Queue)

  • 角色:1 生产者 → 1 队列 → 多个消费者竞争消费
  • 特点
    • 消息轮询分发(默认)或公平分发(需设置prefetch=1
    • 消费者并行处理
  • 场景:任务分发(如订单处理)

发布/订阅模式(Publish/Subscribe)

  • 角色:1 生产者 → Fanout 交换机 → 绑定多个队列 → 多个消费者
  • 特点
    • 消息广播到所有队列
    • 消费者各自独立接收全量消息
  • 场景:事件通知(如系统公告)

路由模式(Routing)

  • 角色:1 生产者 → Direct 交换机 → 根据routing_key路由到特定队列
  • 特点
    • 精确匹配路由键
    • 支持多队列绑定相同路由键
  • 场景:条件过滤(如错误日志分级处理)

主题模式(Topic)

  • 角色:1 生产者 → Topic 交换机 → 基于通配符(*/#)匹配路由键
  • 特点
    • 模糊匹配(如order.*匹配order.create
    • 灵活性高
  • 场景:复杂路由(如多维度消息分类)

RPC 模式(远程调用)

  • 角色:客户端 → 请求队列 → 服务端 → 响应队列 → 客户端
  • 特点
    • 通过reply_tocorrelation_id关联请求/响应
    • 同步阻塞式通信
  • 场景:服务间调用(需即时响应)

模式对比

模式 交换机类型 路由规则 典型应用
简单模式 单任务处理
工作队列 轮询/公平分发 并行任务
发布/订阅 Fanout 广播 多系统通知
路由模式 Direct 精确匹配routing_key 条件过滤
主题模式 Topic 通配符匹配 复杂路由
RPC 模式 请求-响应关联 同步服务调用

选择建议

  • 广播需求 → Fanout
  • 条件过滤 → Direct/Topic
  • 任务并行 → Work Queue
  • 服务调用 → RPC

【中等】RabbitMQ 有哪些集群模式?

RabbitMQ 有以下集群模式:

  • 普通集群
  • 镜像队列集群(高可用模式)
  • 联邦集群
  • 分片集群

所有集群模式均依赖 Erlang Cookie 实现节点间认证,需确保一致。

::: info 普通集群

:::

  • 核心特点
    • 元数据(队列、交换机等)全节点同步
    • 消息实体仅存于创建队列的节点(其他节点通过指针访问)
  • 优点
    • 节省存储(消息不冗余)
    • 横向扩展方便
  • 缺点
    • 单点故障风险:若某节点宕机,其上的队列消息不可用
    • 跨节点访问消息需网络传输

::: info 镜像队列集群(高可用模式)

:::

  • 核心特点
    • 队列跨节点镜像复制(消息实体全节点冗余)
    • 通过策略(Policy)定义镜像规则(如 ha-mode=all 表示全节点复制)
  • 优点
    • 高可用:任一节点宕机,其他节点可继续服务
    • 自动故障转移(消费者无感知)
  • 缺点
    • 存储开销大(消息全量复制)
    • 写入性能略低(需同步所有副本)

::: info 联邦集群(Federation)

:::

  • 核心特点
    • 跨机房/地域部署,消息按需异步转发
    • 基于插件(rabbitmq_federation)实现
  • 适用场景
    • 异地容灾
    • 多区域消息同步

::: info 分片集群(Sharding)

:::

  • 核心特点
    • 通过插件(rabbitmq_sharding)将队列水平拆分到不同节点
    • 生产者自动路由到对应分片
  • 适用场景
    • 超大规模队列(减轻单节点压力)

::: info 方案对比

:::

模式 数据冗余 高可用 跨地域 适用场景
普通集群 开发测试、低重要性数据
镜像队列 全量复制 生产环境(如订单、支付)
联邦集群 按需同步 异地多活
分片集群 超大规模队列

选择建议

  • 生产环境:优先使用 镜像队列集群(需权衡性能与冗余)
  • 异地容灾:结合 联邦集群 + 镜像队列
  • 海量数据:考虑 分片集群(但需业务适配)

【中等】RabbitMQ 如何实现延迟队列?

::: info 原生方案:TTL+死信队列(DLX)

:::

  • 核心原理:通过消息 TTL(存活时间)和死信交换机(DLX)实现延迟投递。

  • 实现步骤

    1. 创建延迟队列:设置 x-message-ttl(消息过期时间)和 x-dead-letter-exchange(死信交换机)
    2. 消息投递:发送到延迟队列,等待 TTL 到期
    3. 自动转发:过期后由 DLX 将消息路由到目标队列
    4. 消费者:从目标队列获取延迟消息
  • 特点

    • 固定延迟时间(每条消息 TTL 需单独设置)
    • 无需插件,但灵活性较差

::: info 插件方案:rabbitmq_delayed_message_exchange

:::

  • 核心原理:官方插件提供 x-delayed-message 交换机类型,支持动态延迟时间。

  • 实现步骤

    1. 启用插件:安装 rabbitmq_delayed_message_exchange
    2. 声明交换机:类型设为 x-delayed-message,并指定路由规则(如 direct/topic)
    3. 发送消息:通过 headers 设置 x-delay 参数(毫秒级延迟)
    4. 自动投递:插件内部调度,到期后投递到目标队列
  • 特点

    • 支持动态延迟时间(每条消息可独立设置)
    • 高精度(毫秒级)
    • 需额外安装插件

::: info 方案对比

:::

维度 TTL+DLX 插件方案
灵活性 固定延迟(队列级别) 动态延迟(消息级别)
精度 秒级 毫秒级
复杂度 无需插件,需配置 DLX 需安装插件
适用场景 简单延迟需求(如统一 30 秒延迟) 复杂延迟需求(如不同订单超时时间)
缺点 队列中消息若阻塞,会延迟后续消息投递(需确保 FIFO 消费) 大量延迟消息可能占用较高内存

示例代码(插件方案)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 声明延迟交换机
channel.exchange_declare(
exchange='delayed_exchange',
exchange_type='x-delayed-message', # 关键参数
arguments={'x-delayed-type': 'direct'}
)

# 发送延迟消息(延迟 5 秒)
channel.basic_publish(
exchange='delayed_exchange',
routing_key='order_queue',
body=message,
properties=pika.BasicProperties(
headers={'x-delay': 5000} # 延迟毫秒数
)
)

参考资料

RocketMQ 快速入门

Apache RocketMQ 是一个分布式 MQ 和流处理平台,具有低延迟、高性能和可靠性、万亿级容量和灵活的可扩展性。

RocketMQ 由阿里巴巴孵化,被捐赠给 Apache,成为 Apache 的顶级项目。

RocketMQ 概念

img

消息模型(Message Model)

RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 Topic 的消息,每个 Topic 的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个 Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。

消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ 中每个消息拥有唯一的 Message ID,且可以携带具有业务标识的 Key。系统提供了通过 Message ID 和 Key 查询消息的功能。

消息还可以具有可选 Tag 和额外的键值对。例如,您可以为消息设置业务密钥,并在代理服务器上查找消息以诊断开发期间的问题。

标签(Tag)

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。

Tag 相当于子主题,为用户提供了额外的灵活性。对于 Tag,来自同一业务模块的具有不同目的的消息可以具有相同的主题和不同的 Tag。

主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。

代理服务器(Broker Server)

消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

名称服务(Name Server)

名称服务充当路由消息的提供者。生产者或消费者能够通过名称服务查找各主题相应的 Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。

消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。

生产者组(Producer Group)

同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

警告:考虑到提供的 Producer 在发送消息方面足够强大,每个 Producer 组只允许一个实例,以避免不必要的生成器实例初始化

消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

消费者组(Consumer Group)

同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

拉取式消费(Pull Consumer)

Consumer 消费的一种类型,应用通常主动调用 Consumer 的拉消息方法从 Broker 服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

推动式消费(Push Consumer)

Consumer 消费的一种类型,该模式下 Broker 收到数据后会主动推送给消费端,该消费模式一般实时性较高。

集群消费(Clustering)

集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。

广播消费(Broadcasting)

广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

普通顺序消息(Normal Ordered Message)

普通顺序消费模式下,消费者通过同一个消息队列( Topic 分区,称作 Message Queue) 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

严格顺序消息(Strictly Ordered Message)

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

RocketMQ 特性

订阅与发布

消息的发布是指某个生产者向某个 topic 发送消息;消息的订阅是指某个消费者关注了某个 topic 中带有某些 tag 的消息,进而从该 topic 消费数据。

消息顺序

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ 可以严格的保证消息有序。

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个 Topic 下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
  • 分区顺序 对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

消息过滤

RocketMQ 的消费者可以根据 Tag 进行消息过滤,也支持自定义属性过滤。消息过滤目前是在 Broker 端实现的,优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂。

消息可靠性

RocketMQ 支持消息的高可靠,影响消息可靠性的几种情况:

  1. Broker 非正常关闭
  2. Broker 异常 Crash
  3. OS Crash
  4. 机器掉电,但是能立即恢复供电情况
  5. 机器无法开机(可能是 cpu、主板、内存等关键设备损坏)
  6. 磁盘设备损坏

1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ 在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。

5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ 在这两种情况下,通过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与 Money 相关的应用。注:RocketMQ 从 3.0 版本开始支持同步双写。

至少一次

至少一次(At least Once)指每个消息必须投递一次。Consumer 先 Pull 消息到本地,消费完成后,才向服务器返回 ack,如果没有消费一定不会 ack 消息,所以 RocketMQ 可以很好的支持此特性。

回溯消费

回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。

事务消息

RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

定时消息

定时消息(延迟队列)是指消息发送到 broker 后,不会立即被消费,等待特定时间投递给真正的 topic。 broker 有配置项 messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18 个 level。可以配置自定义 messageDelayLevel。注意,messageDelayLevel 是 broker 的属性,不属于某个 topic。发消息时,设置 delayLevel 等级即可:msg.setDelayLevel(level)。level 有以下三种情况:

  • level == 0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如 level==1,延迟 1s
  • level > maxLevel,则 level== maxLevel,例如 level==20,延迟 2h

定时消息会暂存在名为 SCHEDULE_TOPIC_XXXX 的 topic 中,并根据 delayTimeLevel 存入特定的 queue,queueId = delayTimeLevel – 1,即一个 queue 只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker 会调度地消费 SCHEDULE_TOPIC_XXXX,将消息写入真实的 topic。

需要注意的是,定时消息会在第一次写入和调度写入真实 topic 时都会计数,因此发送数量、tps 都会变高。

消息重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以认为有以下几种情况:

  • 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过 10 秒后再重试。
  • 由于依赖的下游应用服务不可用,例如 db 连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 sleep 30s,再消费下一条消息,这样可以减轻 Broker 重试消息的压力。

RocketMQ 会为每个消费组都设置一个 Topic 名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个 Topic 的重试队列是针对消费组,而不是针对每个 Topic 设置的),用于暂时保存因为各种异常而导致 Consumer 端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 Topic 名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行 Delay 后重新保存至“%RETRY%+consumerGroup”的重试队列中。

消息重投

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway 没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在 RocketMQ 中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer 负载变化也会导致重复消息。如下方法可以设置消息重试策略:

  • retryTimesWhenSendFailed:同步发送失败重投次数,默认为 2,因此生产者会最多尝试发送 retryTimesWhenSendFailed + 1 次。不会选择上次失败的 broker,尝试向其他 broker 发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投。
  • retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他 broker,仅在同一个 broker 上做重试,不保证消息不丢。
  • retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或 slave 不可用(返回状态非 SEND_OK),是否尝试发送到其他 broker,默认 false。十分重要消息可以开启。

量控制

生产者流控,因为 broker 处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。

生产者流控:

  • commitLog 文件被锁时间超过 osPageCacheBusyTimeOutMills 时,参数默认为 1000ms,返回流控。
  • 如果开启 transientStorePoolEnable == true,且 broker 为异步刷盘的主机,且 transientStorePool 中资源不足,拒绝当前 send 请求,返回流控。
  • broker 每隔 10ms 检查 send 请求队列头部请求的等待时间,如果超过 waitTimeMillsInSendQueue,默认 200ms,拒绝当前 send 请求,返回流控。
  • broker 通过拒绝 send 请求方式实现流量控制。

注意,生产者流控,不会尝试消息重投。

消费者流控:

  • 消费者本地缓存消息数超过 pullThresholdForQueue 时,默认 1000。
  • 消费者本地缓存消息大小超过 pullThresholdSizeForQueue 时,默认 100MB。
  • 消费者本地缓存消息跨度超过 consumeConcurrentlyMaxSpan 时,默认 2000。

消费者流控的结果是降低拉取频率。

死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

RocketMQ 组件

img

RocketMQ 由四部分组成:NameServer、Broker、Producer、Consumer。其中任意一个组成都可以水平扩展为集群模式,以避免单点故障问题。

NameServer(命名服务器)

NameServer 是一个 Topic 路由注册中心,其角色类似 Kafka 中的 zookeeper,支持 Broker 的动态注册与发现。每个 NameServer 记录完整的路由信息,提供相应的读写服务,支持快速存储扩展。

NameServer 主要包括两个功能:

  • Broker 管理,NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活;
  • 路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。然后 Producer 和 Conumser 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。

NameServer 通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker 是向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,Broker 仍然可以向其它 NameServer 同步其路由信息,Producer、Consumer 仍然可以动态感知 Broker 的路由的信息。

NameServer 是一个功能齐全的服务器,主要包括两个功能:

  1. Broker 管理 - NameServer 接受来自 Broker 集群的注册,并提供心跳机制来检查 Broker 节点是否存活。
  2. 路由管理 - 每个 NameServer 将保存有关 Broker 集群的完整路由信息和客户端查询的查询队列。

RocketMQ 客户端(Producer/Consumer)将从 NameServer 查询队列路由信息。

将 NameServer 地址列表提供给客户端有四种方法:

  1. 编程方式 - 类似:producer.setNamesrvAddr("ip:port")
  2. Java 选项 - 使用 rocketmq.namesrv.addr 参数
  3. 环境变量 - 设置环境变量 NAMESRV_ADDR
  4. HTTP 端点

更详细信息可以参考官方文档:here

Broker(代理)

Broker 主要负责消息的存储、投递和查询以及服务高可用保证。

Broker 同时支持推拉模型,包含容错机制(2 副本或 3 副本),并提供强大的峰值填充和按原始时间顺序累积数千亿消息的能力。此外,Broker 提供了灾难恢复、丰富的指标统计和警报机制,这些都是传统 MQ 所缺乏的。

为了实现这些功能,Broker 包含了以下几个重要子模块:

  • Remoting Module:整个 Broker 的实体,负责处理来自 clients 端的请求。
  • Client Manager:负责管理客户端(Producer/Consumer)和维护 Consumer 的 Topic 订阅信息。
  • Store Service:提供方便简单的 API 接口处理消息存储到物理硬盘和查询功能。
  • HA Service:高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。
  • Index Service:根据特定的 Message key 对投递到 Broker 的消息进行索引服务,以提供消息的快速查询。

img

Producer(生产者)

Producers 支持分布式集群方式部署。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

Consumer(消费者)

Consumer 支持分布式集群方式部署。支持以 push 推,pull 拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

RocketMQ 安装

环境要求

  • 推荐 64 位操作系统:Linux/Unix/Mac
  • 64bit JDK 1.8+
  • Maven 3.2.x
  • Git

下载解压

进入官方下载地址:https://rocketmq.apache.org/dowloading/releases/,选择合适版本

建议选择 binary 版本。

解压到本地:

1
2
> unzip rocketmq-all-4.2.0-source-release.zip
> cd rocketmq-all-4.2.0/

启动 Name Server

1
2
3
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

启动 Broker

1
2
3
> nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...

收发消息

执行收发消息操作之前,不许告诉客户端命名服务器的位置。在 RocketMQ 中有多种方法来实现这个目的。这里,我们使用最简单的方法——设置环境变量 NAMESRV_ADDR

1
2
3
4
5
6
> export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

关闭服务器

1
2
3
4
5
6
7
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

RocketMQ 入门级示例

首先在项目中引入 maven 依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>

Producer

Producer 在 RocketMQ 中负责发送消息。

RocketMQ 有三种消息发送方式:

  • 可靠的同步发送
  • 可靠的异步发送
  • 单项发送

可靠的同步发送

可靠的同步传输用于广泛的场景,如重要的通知消息,短信通知,短信营销系统等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

可靠的异步发送

异步传输通常用于响应时间敏感的业务场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
//Launch the instance.
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

单向传输

单向传输用于需要中等可靠性的情况,例如日志收集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);

}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

Consumer

Consumer 在 RocketMQ 中负责接收消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setNamesrvAddr(RocketConfig.HOST);

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

consumer.registerMessageListener(new MessageListenerOrderly() {

AtomicLong consumeTimes = new AtomicLong(0);

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;

}
});

consumer.start();

System.out.printf("Consumer Started.%n");
}
}

RocketMQ 官方示例

参考资料

ActiveMQ 快速入门

JMS 基本概念

JMSJava 消息服务(Java Message Service)API,是一个 Java 平台中关于面向消息中间件的 API。它用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。

消息模型

JMS 有两种消息模型:

  • Point-to-Point(P2P)
  • Publish/Subscribe(Pub/Sub)

P2P 的特点

img

在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列 javax.jms.Queue 相关联。

每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。

发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。

接收者在成功接收消息之后需向队列应答成功。

如果你希望发送的每个消息都应该被成功处理的话,那么你需要 P2P 模式。

Pub/Sub 的特点

img

发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题 javax.jms.Topic 关联。

每个消息可以有多个消费者。

发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。

为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用 Pub/Sub 模型。

JMS 编程模型

img

ConnectionFactory

创建 Connection 对象的工厂,针对两种不同的 jms 消息模型,分别有 QueueConnectionFactoryTopicConnectionFactory 两种。可以通过 JNDI 来查找 ConnectionFactory 对象。

Connection

Connection 表示在客户端和 JMS 系统之间建立的链接(对 TCP/IP socket 的包装)。Connection 可以产生一个或多个Session。跟 ConnectionFactory 一样,Connection 也有两种类型:QueueConnectionTopicConnection

Destination

Destination 是一个包装了消息目标标识符的被管对象。消息目标是指消息发布和接收的地点,或者是队列 Queue ,或者是主题 Topic 。JMS 管理员创建这些对象,然后用户通过 JNDI 发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的 Queue,以及发布者/订阅者模型的 Topic

Session

Session 表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息,生产者来发送消息,消费者来接收消息。同样,Session 也分 QueueSessionTopicSession

MessageConsumer

MessageConsumerSession 创建,并用于将消息发送到 Destination。消费者可以同步地(阻塞模式),或(非阻塞)接收 QueueTopic 类型的消息。同样,消息生产者分两种类型:QueueSenderTopicPublisher

MessageProducer

MessageProducerSession 创建,用于接收被发送到 Destination 的消息。MessageProducer 有两种类型:QueueReceiverTopicSubscriber。可分别通过 sessioncreateReceiver(Queue)createSubscriber(Topic) 来创建。当然,也可以 sessioncreatDurableSubscriber 方法来创建持久化的订阅者。

Message

是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分:

  • 消息头(必须):包含用于识别和为消息寻找路由的操作设置。
  • 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
  • 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。

消息接口非常灵活,并提供了许多方式来定制消息的内容。

Common Point-to-Point Publish-Subscribe
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopicSession
MessageProducer QueueSender TopicPublisher
MessageSender QueueReceiver, QueueBrowser TopicSubscriber

安装

安装条件

JDK1.7 及以上版本

本地配置了 JAVA_HOME 环境变量。

下载

支持 Windows/Unix/Linux/Cygwin

ActiveMQ 官方下载地址

Windows 下运行

(1)解压压缩包到本地;

(2)打开控制台,进入安装目录的 bin 目录下;

1
cd [activemq_install_dir]

(3)执行 activemq start 来启动 ActiveMQ

1
bin\activemq start

测试安装是否成功

(1)ActiveMQ 默认监听端口为 61616

1
netstat -an|find “61616”

(2)访问 http://127.0.0.1:8161/admin/

(3)输入用户名、密码

1
2
Login: admin
Passwort: admin

(4)点击导航栏的 Queues 菜单

(5)添加一个队列(queue)

项目中的应用

引入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.1</version>
</dependency>

Sender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class Sender {
private static final int SEND_NUMBER = 4;

public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// MessageProducer:消息发送者
MessageProducer producer;
// TextMessage message;
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
// 得到消息生成者【发送者】
producer = session.createProducer(destination);
// 设置不持久化,此处学习,实际根据项目决定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 构造消息,此处写死,项目就是参数,或者方法获取
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}

public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("ActiveMq 发送的消息" + i);
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
}
}
}

Receiver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class Receiver {
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// 消费者,消息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
consumer = session.createConsumer(destination);
while (true) {
//设置接收者接收消息的时间,为了便于测试,这里谁定为100s
TextMessage message = (TextMessage) consumer.receive(100000);
if (null != message) {
System.out.println("收到消息" + message.getText());
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}

运行

先运行 Receiver.java 进行消息监听,再运行 Send.java 发送消息。

输出

Send 的输出内容

1
2
3
4
发送消息:Activemq 发送消息0
发送消息:Activemq 发送消息1
发送消息:Activemq 发送消息2
发送消息:Activemq 发送消息3

Receiver 的输出内容

1
2
3
4
收到消息ActiveMQ 发送消息0
收到消息ActiveMQ 发送消息1
收到消息ActiveMQ 发送消息2
收到消息ActiveMQ 发送消息3

资源

Flink API

Flink 为流式/批式处理应用程序的开发提供了不同级别的抽象。

Programming levels of abstraction

  • Flink API 最底层的抽象为有状态实时流处理。其抽象实现是 Process Function,并且 Process Function 被 Flink 框架集成到了 DataStream API 中来为我们使用。它允许用户在应用程序中自由地处理来自单流或多流的事件(数据),并提供具有全局一致性和容错保障的状态。此外,用户可以在此层抽象中注册事件时间(event time)和处理时间(processing time)回调方法,从而允许程序可以实现复杂计算。

  • Flink API 第二层抽象是 Core APIs。实际上,许多应用程序不需要使用到上述最底层抽象的 API,而是可以使用 Core APIs 进行编程:其中包含 DataStream API(应用于有界/无界数据流场景)和 DataSet API(应用于有界数据集场景)两部分。Core APIs 提供的流式 API(Fluent API)为数据处理提供了通用的模块组件,例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等。此层 API 中处理的数据类型在每种编程语言中都有其对应的类。

    Process Function 这类底层抽象和 DataStream API 的相互集成使得用户可以选择使用更底层的抽象 API 来实现自己的需求。DataSet API 还额外提供了一些原语,比如循环/迭代(loop/iteration)操作。

  • Flink API 第三层抽象是 Table APITable API 是以表(Table)为中心的声明式编程(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。Table API 遵循(扩展)关系模型:即表拥有 schema(类似于关系型数据库中的 schema),并且 Table API 也提供了类似于关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。Table API 程序是以声明的方式定义应执行的逻辑操作,而不是确切地指定程序应该执行的代码。尽管 Table API 使用起来很简洁并且可以由各种类型的用户自定义函数扩展功能,但还是比 Core API 的表达能力差。此外,Table API 程序在执行之前还会使用优化器中的优化规则对用户编写的表达式进行优化。

    表和 DataStream/DataSet 可以进行无缝切换,Flink 允许用户在编写应用程序时将 Table APIDataStream/DataSet API 混合使用。

  • Flink API 最顶层抽象是 SQL。这层抽象在语义和程序表达式上都类似于 _Table API_,但是其程序实现都是 SQL 查询表达式。SQL 抽象与 Table API 抽象之间的关联是非常紧密的,并且 SQL 查询语句可以在 Table API 中定义的表上执行。

ProcessFunction

ProcessFunction 是 Flink 所提供的最具表达力的接口。ProcessFunction 可以处理一或两条输入数据流中的单个事件或者归入一个特定窗口内的多个事件。它提供了对于时间和状态的细粒度控制。开发者可以在其中任意地修改状态,也能够注册定时器用以在未来的某一时刻触发回调函数。因此,你可以利用 ProcessFunction 实现许多有状态的事件驱动应用所需要的基于单个事件的复杂业务逻辑。

下面的代码示例展示了如何在 KeyedStream 上利用 KeyedProcessFunction 对标记为 STARTEND 的事件进行处理。当收到 START 事件时,处理函数会记录其时间戳,并且注册一个时长 4 小时的计时器。如果在计时器结束之前收到 END 事件,处理函数会计算其与上一个 START 事件的时间间隔,清空状态并将计算结果返回。否则,计时器结束,并清空状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**

* 将相邻的 keyed START 和 END 事件相匹配并计算两者的时间间隔
* 输入数据为 Tuple2<String, String> 类型,第一个字段为 key 值,
* 第二个字段标记 START 和 END 事件。
*/
public static class StartEndDuration
extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {

private ValueState<Long> startTime;

@Override
public void open(Configuration conf) {
// obtain state handle
startTime = getRuntimeContext()
.getState(new ValueStateDescriptor<Long>("startTime", Long.class));
}

/** Called for each processed event. */
@Override
public void processElement(
Tuple2<String, String> in,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {

switch (in.f1) {
case "START":
// set the start time if we receive a start event.
startTime.update(ctx.timestamp());
// register a timer in four hours from the start event.
ctx.timerService()
.registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000);
break;
case "END":
// emit the duration between start and end event
Long sTime = startTime.value();
if (sTime != null) {
out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime));
// clear the state
startTime.clear();
}
default:
// do nothing
}
}

/** Called when a timer fires. */
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) {

// Timeout interval exceeded. Cleaning up the state.
startTime.clear();
}
}

这个例子充分展现了 KeyedProcessFunction 强大的表达力,也因此是一个实现相当复杂的接口。

DataStream API

DataStream API 为许多通用的流处理操作提供了处理原语。这些操作包括窗口、逐条记录的转换操作,在处理事件时进行外部数据库查询等。DataStream API 支持 Java 和 Scala 语言,预先定义了例如map()reduce()aggregate() 等函数。你可以通过扩展实现预定义接口或使用 Java、Scala 的 lambda 表达式实现自定义的函数。

下面的代码示例展示了如何捕获会话时间范围内所有的点击流事件,并对每一次会话的点击量进行计数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 网站点击 Click 的数据流
DataStream<Click> clicks = ...

DataStream<Tuple2<String, Long>> result = clicks
// 将网站点击映射为 (userId, 1) 以便计数
.map(
// 实现 MapFunction 接口定义函数
new MapFunction<Click, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Click click) {
return Tuple2.of(click.userId, 1L);
}
})
// 以 userId (field 0) 作为 key
.keyBy(0)
// 定义 30 分钟超时的会话窗口
.window(EventTimeSessionWindows.withGap(Time.minutes(30L)))
// 对每个会话窗口的点击进行计数,使用 lambda 表达式定义 reduce 函数
.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

SQL & Table API

Flink 支持两种关系型的 API,Table API 和 SQL。这两个 API 都是批处理和流处理统一的 API,这意味着在无边界的实时数据流和有边界的历史记录数据流上,关系型 API 会以相同的语义执行查询,并产生相同的结果。Table API 和 SQL 借助了 Apache Calcite 来进行查询的解析,校验以及优化。它们可以与 DataStream 和 DataSet API 无缝集成,并支持用户自定义的标量函数,聚合函数以及表值函数。

Flink 的关系型 API 旨在简化数据分析数据流水线和 ETL 应用的定义。

下面的代码示例展示了如何使用 SQL 语句查询捕获会话时间范围内所有的点击流事件,并对每一次会话的点击量进行计数。此示例与上述 DataStream API 中的示例有着相同的逻辑。

1
2
3
SELECT userId, COUNT(*)
FROM clicks
GROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId

Flink 具有数个适用于常见数据处理应用场景的扩展库。这些库通常嵌入在 API 中,且并不完全独立于其它 API。它们也因此可以受益于 API 的所有特性,并与其他库集成。

  • **复杂事件处理(CEP)**:模式检测是事件流处理中的一个非常常见的用例。Flink 的 CEP 库提供了 API,使用户能够以例如正则表达式或状态机的方式指定事件模式。CEP 库与 Flink 的 DataStream API 集成,以便在 DataStream 上评估模式。CEP 库的应用包括网络入侵检测,业务流程监控和欺诈检测。
  • **DataSet API*:DataSet API 是 Flink 用于批处理应用程序的核心 API。DataSet API 所提供的基础算子包括mapreduce_、(outer) join_、_co-group_、iterate*等。所有算子都有相应的算法和数据结构支持,对内存中的序列化数据进行操作。如果数据大小超过预留内存,则过量数据将存储到磁盘。Flink 的 DataSet API 的数据处理算法借鉴了传统数据库算法的实现,例如混合散列连接(hybrid hash-join)和外部归并排序(external merge-sort)。
  • Gelly: Gelly 是一个可扩展的图形处理和分析库。Gelly 是在 DataSet API 之上实现的,并与 DataSet API 集成。因此,它能够受益于其可扩展且健壮的操作符。Gelly 提供了内置算法,如 label propagation、triangle enumeration 和 page rank 算法,也提供了一个简化自定义图算法实现的 Graph API

参考资料

Flink ETL

Apache Flink 的一种常见应用场景是 ETL(抽取、转换、加载)管道任务。从一个或多个数据源获取数据,进行一些转换操作和信息补充,将结果存储起来。在这个教程中,我们将介绍如何使用 Flink 的 DataStream API 实现这类应用。

这里注意,Flink 的 Table 和 SQL API 完全可以满足很多 ETL 使用场景。但无论你最终是否直接使用 DataStream API,对这里介绍的基本知识有扎实的理解都是有价值的。

无状态的转换

本节涵盖了 map()flatmap(),这两种算子可以用来实现无状态转换的基本操作。

map()

在第一个练习中,你将过滤出租车行程数据中的事件。在同一代码仓库中,有一个 GeoUtils 类,提供了一个静态方法 GeoUtils.mapToGridCell(float lon, float lat),它可以将位置坐标(经度,维度)映射到 100x100 米的对应不同区域的网格单元。

现在让我们为每个出租车行程时间的数据对象增加 startCellendCell 字段。你可以创建一个继承 TaxiRideEnrichedRide 类,添加这些字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static class EnrichedRide extends TaxiRide {
public int startCell;
public int endCell;

public EnrichedRide() {}

public EnrichedRide(TaxiRide ride) {
this.rideId = ride.rideId;
this.isStart = ride.isStart;
...
this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
}

public String toString() {
return super.toString() + "," +
Integer.toString(this.startCell) + "," +
Integer.toString(this.endCell);
}
}

然后你可以创建一个应用来转换这个流

1
2
3
4
5
6
7
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));

DataStream<EnrichedRide> enrichedNYCRides = rides
.filter(new RideCleansingSolution.NYCFilter())
.map(new Enrichment());

enrichedNYCRides.print();

使用这个 MapFunction:

1
2
3
4
5
6
7
public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {

@Override
public EnrichedRide map(TaxiRide taxiRide) throws Exception {
return new EnrichedRide(taxiRide);
}
}

flatmap()

MapFunction 只适用于一对一的转换:对每个进入算子的流元素,map() 将仅输出一个转换后的元素。对于除此以外的场景,你将要使用 flatmap()

1
2
3
4
5
6
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));

DataStream<EnrichedRide> enrichedNYCRides = rides
.flatMap(new NYCEnrichment());

enrichedNYCRides.print();

其中用到的 FlatMapFunction :

1
2
3
4
5
6
7
8
9
10
public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {

@Override
public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
if (valid.filter(taxiRide)) {
out.collect(new EnrichedRide(taxiRide));
}
}
}

使用接口中提供的 Collectorflatmap() 可以输出你想要的任意数量的元素,也可以一个都不发。

Keyed Streams

keyBy()

将一个流根据其中的一些属性来进行分区是十分有用的,这样我们可以使所有具有相同属性的事件分到相同的组里。例如,如果你想找到从每个网格单元出发的最远的出租车行程。按 SQL 查询的方式来考虑,这意味着要对 startCell 进行 GROUP BY 再排序,在 Flink 中这部分可以用 keyBy(KeySelector) 实现。

1
2
3
rides
.flatMap(new NYCEnrichment())
.keyBy(enrichedRide -> enrichedRide.startCell)

每个 keyBy 会通过 shuffle 来为数据流进行重新分区。总体来说这个开销是很大的,它涉及网络通信、序列化和反序列化。

keyBy and network shuffle

通过计算得到键

KeySelector 不仅限于从事件中抽取键。你也可以按想要的方式计算得到键值,只要最终结果是确定的,并且实现了 hashCode()equals()。这些限制条件不包括产生随机数或者返回 Arrays 或 Enums 的 KeySelector,但你可以用元组和 POJO 来组成键,只要他们的元素遵循上述条件。

键必须按确定的方式产生,因为它们会在需要的时候被重新计算,而不是一直被带在流记录中。

例如,比起创建一个新的带有 startCell 字段的 EnrichedRide 类,用这个字段作为 key:

1
keyBy(enrichedRide -> enrichedRide.startCell)

我们更倾向于这样做:

1
keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))

Keyed Stream 的聚合

以下代码为每个行程结束事件创建了一个新的包含 startCell 和时长(分钟)的元组流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.joda.time.Interval;

DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
.flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {

@Override
public void flatMap(EnrichedRide ride,
Collector<Tuple2<Integer, Minutes>> out) throws Exception {
if (!ride.isStart) {
Interval rideInterval = new Interval(ride.startTime, ride.endTime);
Minutes duration = rideInterval.toDuration().toStandardMinutes();
out.collect(new Tuple2<>(ride.startCell, duration));
}
}
});

现在就可以产生一个流,对每个 startCell 仅包含那些最长行程的数据。

有很多种方法表示使用哪个字段作为键。前面使用 EnrichedRide POJO 的例子,用字段名来指定键。而这个使用 Tuple2 对象的例子中,用字段在元组中的序号(从 0 开始)来指定键。

1
2
3
4
minutesByStartCell
.keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
.maxBy(1) // duration
.print();

现在每次行程时长达到新的最大值,都会输出一条新记录,例如下面这个对应 50797 网格单元的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
...
4> (64549,5M)
4> (46298,18M)
1> (51549,14M)
1> (53043,13M)
1> (56031,22M)
1> (50797,6M)
...
1> (50797,8M)
...
1> (50797,11M)
...
1> (50797,12M)

(隐式的)状态

这是培训中第一个涉及到有状态流的例子。尽管状态的处理是透明的,Flink 必须跟踪每个不同的键的最大时长。

只要应用中有状态,你就应该考虑状态的大小。如果键值的数量是无限的,那 Flink 的状态需要的空间也同样是无限的。

在流处理场景中,考虑有限窗口的聚合往往比整个流聚合更有意义。

reduce() 和其他聚合算子

上面用到的 maxBy() 只是 Flink 中 KeyedStream 上众多聚合函数中的一个。还有一个更通用的 reduce() 函数可以用来实现你的自定义聚合。

有状态的转换

在 Flink 不参与管理状态的情况下,你的应用也可以使用状态,但 Flink 为其管理状态提供了一些引人注目的特性:

  • 本地性: Flink 状态是存储在使用它的机器本地的,并且可以以内存访问速度来获取
  • 持久性: Flink 状态是容错的,例如,它可以自动按一定的时间间隔产生 checkpoint,并且在任务失败后进行恢复
  • 纵向可扩展性: Flink 状态可以存储在集成的 RocksDB 实例中,这种方式下可以通过增加本地磁盘来扩展空间
  • 横向可扩展性: Flink 状态可以随着集群的扩缩容重新分布
  • 可查询性: Flink 状态可以通过使用 状态查询 API 从外部进行查询。

在本节中你将学习如何使用 Flink 的 API 来管理 keyed state。

Rich Functions

至此,你已经看到了 Flink 的几种函数接口,包括 FilterFunctionMapFunction,和 FlatMapFunction。这些都是单一抽象方法模式。

对其中的每一个接口,Flink 同样提供了一个所谓 “rich” 的变体,如 RichFlatMapFunction,其中增加了以下方法,包括:

  • open(Configuration c)
  • close()
  • getRuntimeContext()

open() 仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等。

getRuntimeContext() 为整套潜在有趣的东西提供了一个访问途径,最明显的,它是你创建和访问 Flink 状态的途径。

一个使用 Keyed State 的例子

在这个例子里,想象你有一个要去重的事件数据流,对每个键只保留第一个事件。下面是完成这个功能的应用,使用一个名为 DeduplicatorRichFlatMapFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static class Event {
public final String key;
public final long timestamp;
...
}

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new EventSource())
.keyBy(e -> e.key)
.flatMap(new Deduplicator())
.print();

env.execute();
}

为了实现这个功能,Deduplicator 需要记录每个键是否已经有了相应的记录。它将通过使用 Flink 的 keyed state 接口来做这件事。

当你使用像这样的 keyed stream 的时候,Flink 会为每个状态中管理的条目维护一个键值存储。

Flink 支持几种不同方式的 keyed state,这个例子使用的是最简单的一个,叫做 ValueState。意思是对于 每个键 ,Flink 将存储一个单一的对象 —— 在这个例子中,存储的是一个 Boolean 类型的对象。

我们的 Deduplicator 类有两个方法:open()flatMap()open() 方法通过定义 ValueStateDescriptor<Boolean> 建立了管理状态的使用。构造器的参数定义了这个状态的名字(“keyHasBeenSeen”),并且为如何序列化这些对象提供了信息(在这个例子中的 Types.BOOLEAN)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> keyHasBeenSeen;

@Override
public void open(Configuration conf) {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
keyHasBeenSeen = getRuntimeContext().getState(desc);
}

@Override
public void flatMap(Event event, Collector<Event> out) throws Exception {
if (keyHasBeenSeen.value() == null) {
out.collect(event);
keyHasBeenSeen.update(true);
}
}
}

当 flatMap 方法调用 keyHasBeenSeen.value() 时,Flink 会在 当前键的上下文 中检索状态值,只有当状态为 null 时,才会输出当前事件。这种情况下,它同时也将更新 keyHasBeenSeentrue

这种访问和更新按键分区的状态的机制也许看上去很神奇,因为在 Deduplicator 的实现中,键不是明确可见的。当 Flink 运行时调用 RichFlatMapFunctionopen 方法时, 是没有事件的,所以这个时候上下文中不含有任何键。但当它调用 flatMap 方法,被处理的事件的键在运行时中就是可用的了,并且被用来确定操作哪个 Flink 状态后端的入口。

部署在分布式集群时,将会有很多 Deduplicator 的实例,每一个实例将负责整个键空间的互斥子集中的一个。所以,当你看到一个单独的 ValueState,比如

1
ValueState<Boolean> keyHasBeenSeen;

要理解这个代表的不仅仅是一个单独的布尔类型变量,而是一个分布式的共享键值存储。

清理状态

上面例子有一个潜在的问题:当键空间是无界的时候将发生什么?Flink 会对每个使用过的键都存储一个 Boolean 类型的实例。如果是键是有限的集合还好,但在键无限增长的应用中,清除再也不会使用的状态是很必要的。这通过在状态对象上调用 clear() 来实现,如下:

1
keyHasBeenSeen.clear()

对一个给定的键值,你也许想在它一段时间不使用后来做这件事。当学习 ProcessFunction 的相关章节时,你将看到在事件驱动的应用中怎么用定时器来做这个。

也可以选择使用 状态的过期时间(TTL),为状态描述符配置你想要旧状态自动被清除的时间。

Non-keyed State

在没有键的上下文中我们也可以使用 Flink 管理的状态。这也被称作 算子的状态。它包含的接口是很不一样的,由于对用户定义的函数来说使用 non-keyed state 是不太常见的,所以这里就不多介绍了。这个特性最常用于 source 和 sink 的实现。

Connected Streams

相比于下面这种预先定义的转换:

simple transformation

有时你想要更灵活地调整转换的某些功能,比如数据流的阈值、规则或者其他参数。Flink 支持这种需求的模式称为 connected streams ,一个单独的算子有两个输入流。

connected streams

connected stream 也可以被用来实现流的关联。

示例

在这个例子中,一个控制流是用来指定哪些词需要从 streamOfWords 里过滤掉的。 一个称为 ControlFunctionRichCoFlatMapFunction 作用于连接的流来实现这个功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> control = env
.fromElements("DROP", "IGNORE")
.keyBy(x -> x);

DataStream<String> streamOfWords = env
.fromElements("Apache", "DROP", "Flink", "IGNORE")
.keyBy(x -> x);

control
.connect(streamOfWords)
.flatMap(new ControlFunction())
.print();

env.execute();
}

这里注意两个流只有键一致的时候才能连接。 keyBy 的作用是将流数据分区,当 keyed stream 被连接时,他们必须按相同的方式分区。这样保证了两个流中所有键相同的事件发到同一个实例上。这样也使按键关联两个流成为可能。

在这个例子中,两个流都是 DataStream<String> 类型的,并且都将字符串作为键。正如你将在下面看到的,RichCoFlatMapFunction 在状态中存了一个布尔类型的变量,这个变量被两个流共享。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
private ValueState<Boolean> blocked;

@Override
public void open(Configuration config) {
blocked = getRuntimeContext()
.getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}

@Override
public void flatMap1(String control_value, Collector<String> out) throws Exception {
blocked.update(Boolean.TRUE);
}

@Override
public void flatMap2(String data_value, Collector<String> out) throws Exception {
if (blocked.value() == null) {
out.collect(data_value);
}
}
}

RichCoFlatMapFunction 是一种可以被用于一对连接流的 FlatMapFunction,并且它可以调用 rich function 的接口。这意味着它可以是有状态的。

布尔变量 blocked 被用于记录在数据流 control 中出现过的键(在这个例子中是单词),并且这些单词从 streamOfWords 过滤掉。这是 keyed state,并且它是被两个流共享的,这也是为什么两个流必须有相同的键值空间。

在 Flink 运行时中,flatMap1flatMap2 在连接流有新元素到来时被调用 —— 在我们的例子中,control 流中的元素会进入 flatMap1streamOfWords 中的元素会进入 flatMap2。这是由两个流连接的顺序决定的,本例中为 control.connect(streamOfWords)

认识到你没法控制 flatMap1flatMap2 的调用顺序是很重要的。这两个输入流是相互竞争的关系,Flink 运行时将根据从一个流或另一个流中消费的事件做它要做的。对于需要保证时间和/或顺序的场景,你会发现在 Flink 的管理状态中缓存事件一直到它们能够被处理是必须的。(注意:如果你真的感到绝望,可以使用自定义的算子实现 InputSelectable 接口,在两输入算子消费它的输入流时增加一些顺序上的限制。)

参考资料