Dunwu Blog

大道至简,知易行难

Java 进程内缓存

关键词:ConcurrentHashMap、LRUHashMap、Guava Cache、Caffeine、Ehcache

一、ConcurrentHashMap

最简单的进程内缓存可以通过 JDK 自带的 HashMapConcurrentHashMap 实现。

适用场景:不需要淘汰的缓存数据

缺点:无法进行缓存淘汰,内存会无限制的增长。

二、LRUHashMap

可以通过**继承 LinkedHashMap 来实现一个简单的 LRUHashMap**,即可完成一个简单的 LRU (最近最少使用)算法。

缺点:

  • 锁竞争严重,性能比较低。
  • 不支持过期时间
  • 不支持自动刷新

【示例】LRUHashMap 的简单实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class LRUCache extends LinkedHashMap {

private final int max;
private Object lock;

public LRUCache(int max) {
//无需扩容
super((int) (max * 1.4f), 0.75f, true);
this.max = max;
this.lock = new Object();
}

/**
* 重写LinkedHashMap的removeEldestEntry方法即可 在Put的时候判断,如果为true,就会删除最老的
*
* @param eldest
* @return
*/
@Override
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > max;
}

public Object getValue(Object key) {
synchronized (lock) {
return get(key);
}
}

public void putValue(Object key, Object value) {
synchronized (lock) {
put(key, value);
}
}

public boolean removeValue(Object key) {
synchronized (lock) {
return remove(key) != null;
}
}

public boolean removeAll() {
clear();
return true;
}

}

三、Guava Cache

Guava Cache 解决了 LRUHashMap 中的几个缺点。

Guava Cache 提供了基于容量,时间和引用的缓存回收方式。基于容量的方式内部实现采用 LRU 算法,基于引用回收很好的利用了 Java 虚拟机的垃圾回收机制。

其中的缓存构造器 CacheBuilder 采用构建者模式提供了设置好各种参数的缓存对象。缓存核心类 LocalCache 里面的内部类 Segment 与 jdk1.7 及以前的 ConcurrentHashMap 非常相似,分段加锁,减少锁竞争,并且都继承于 ReetrantLock,还有六个队列,以实现丰富的本地缓存方案。Guava Cache 对于过期的 Entry 并没有马上过期(也就是并没有后台线程一直在扫),而是通过进行读写操作的时候进行过期处理,这样做的好处是避免后台线程扫描的时候进行全局加锁。

直接通过查询,判断其是否满足刷新条件,进行刷新。

Guava Cache 缓存回收

Guava Cache 提供了三种基本的缓存回收方式。

基于容量回收

maximumSize(long):当缓存中的元素数量超过指定值时触发回收。

基于定时回收

  • expireAfterAccess(long, TimeUnit):缓存项在给定时间内没有被读/写访问,则回收。请注意这种缓存的回收顺序和基于大小回收一样。
  • expireAfterWrite(long, TimeUnit):缓存项在给定时间内没有被写访问(创建或覆盖),则回收。如果认为缓存数据总是在固定时候后变得陈旧不可用,这种回收方式是可取的。

如下文所讨论,定时回收周期性地在写操作中执行,偶尔在读操作中执行。

基于引用回收

  • CacheBuilder.weakKeys():使用弱引用存储键。当键没有其它(强或软)引用时,缓存项可以被垃圾回收。
  • CacheBuilder.weakValues():使用弱引用存储值。当值没有其它(强或软)引用时,缓存项可以被垃圾回收。
  • CacheBuilder.softValues():使用软引用存储值。软引用只有在响应内存需要时,才按照全局最近最少使用的顺序回收。

Guava Cache 核心 API

CacheBuilder

缓存构建器。构建缓存的入口,指定缓存配置参数并初始化本地缓存。
主要采用 builder 的模式,CacheBuilder 的每一个方法都返回这个 CacheBuilder 知道 build 方法的调用。
注意 build 方法有重载,带有参数的为构建一个具有数据加载功能的缓存,不带参数的构建一个没有数据加载功能的缓存。

LocalManualCache

作为 LocalCache 的一个内部类,在构造方法里面会把 LocalCache 类型的变量传入,并且调用方法时都直接或者间接调用 LocalCache 里面的方法。

LocalLoadingCache

可以看到该类继承了 LocalManualCache 并实现接口 LoadingCache。
覆盖了 get,getUnchecked 等方法。

LocalCache

Guava Cache 中的核心类,重点了解。

LocalCache 的数据结构与 ConcurrentHashMap 很相似,都由多个 segment 组成,且各 segment 相对独立,互不影响,所以能支持并行操作。每个 segment 由一个 table 和若干队列组成。缓存数据存储在 table 中,其类型为 AtomicReferenceArray。

四、Caffeine

caffeine 是一个使用 JDK8 改进 Guava 缓存的高性能缓存库。

Caffeine 实现了 W-TinyLFU(LFU + LRU 算法的变种),其命中率和读写吞吐量大大优于 Guava Cache

其实现原理较复杂,可以参考你应该知道的缓存进化史

五、Ehcache

参考:Ehcache

六、进程内缓存对比

常用进程内缓存技术对比:

比较项 ConcurrentHashMap LRUMap Ehcache Guava Cache Caffeine
读写性能 很好,分段锁 一般,全局加锁 好,需要做淘汰操作 很好
淘汰算法 LRU,一般 支持多种淘汰算法,LRU,LFU,FIFO LRU,一般 W-TinyLFU, 很好
功能丰富程度 功能比较简单 功能比较单一 功能很丰富 功能很丰富,支持刷新和虚引用等 功能和 Guava Cache 类似
工具大小 jdk 自带类,很小 基于 LinkedHashMap,较小 很大,最新版本 1.4MB 是 Guava 工具类中的一个小部分,较小 一般,最新版本 644KB
是否持久化
是否支持集群
  • ConcurrentHashMap - 比较适合缓存比较固定不变的元素,且缓存的数量较小的。虽然从上面表格中比起来有点逊色,但是其由于是 JDK 自带的类,在各种框架中依然有大量的使用,比如我们可以用来缓存我们反射的 Method,Field 等等;也可以缓存一些链接,防止其重复建立。在 Caffeine 中也是使用的 ConcurrentHashMap 来存储元素。
  • LRUMap - 如果不想引入第三方包,又想使用淘汰算法淘汰数据,可以使用这个。
  • Ehcache - 由于其 jar 包很大,较重量级。对于需要持久化和集群的一些功能的,可以选择 Ehcache。需要注意的是,虽然 Ehcache 也支持分布式缓存,但是由于其节点间通信方式为 rmi,表现不如 Redis,所以一般不建议用它来作为分布式缓存。
  • Guava Cache - Guava 这个 jar 包在很多 Java 应用程序中都有大量的引入,所以很多时候其实是直接用就好了,并且其本身是轻量级的而且功能较为丰富,在不了解 Caffeine 的情况下可以选择 Guava Cache。
  • Caffeine - 其在命中率,读写性能上都比 Guava Cache 好很多,并且其 API 和 Guava cache 基本一致,甚至会多一点。在真实环境中使用 Caffeine,取得过不错的效果。

总结一下:**如果不需要淘汰算法则选择 ConcurrentHashMap,如果需要淘汰算法和一些丰富的 API,推荐选择 Caffeine**。

参考资料

Http 缓存

HTTP 缓存分为 2 种,一种是强缓存,另一种是协商缓存。主要作用是可以加快资源获取速度,提升用户体验,减少网络传输,缓解服务端的压力。

Http 强缓存

不需要发送请求到服务端,直接读取浏览器本地缓存,在 Chrome 的 Network 中显示的 HTTP 状态码是 200 ,在 Chrome 中,强缓存又分为 Disk Cache (存放在硬盘中)和 Memory Cache (存放在内存中),存放的位置是由浏览器控制的。是否强缓存由 ExpiresCache-ControlPragma 3 个 Header 属性共同来控制。

Expires

Expires 的值是一个 HTTP 日期,在浏览器发起请求时,会根据系统时间和 Expires 的值进行比较,如果系统时间超过了 Expires 的值,缓存失效。由于和系统时间进行比较,所以当系统时间和服务器时间不一致的时候,会有缓存有效期不准的问题。Expires 的优先级在三个 Header 属性中是最低的。

Cache-Control

Cache-Control 是 HTTP/1.1 中新增的属性,在请求头和响应头中都可以使用,常用的属性值如有:

  • max-age:单位是秒,缓存时间计算的方式是距离发起的时间的秒数,超过间隔的秒数缓存失效
  • no-cache:不使用强缓存,需要与服务器验证缓存是否新鲜
  • no-store:禁止使用缓存(包括协商缓存),每次都向服务器请求最新的资源
  • private:专用于个人的缓存,中间代理、CDN 等不能缓存此响应
  • public:响应可以被中间代理、CDN 等缓存
  • must-revalidate:在缓存过期前可以使用,过期后必须向服务器验证

Pragma

Pragma 只有一个属性值,就是 no-cache ,效果和 Cache-Control 中的 no-cache 一致,不使用强缓存,需要与服务器验证缓存是否新鲜,在 3 个头部属性中的优先级最高。

协商缓存

当浏览器的强缓存失效的时候或者请求头中设置了不走强缓存,并且在请求头中设置了 If-Modified-Since 或者 If-None-Match 的时候,会将这两个属性值到服务端去验证是否命中协商缓存,如果命中了协商缓存,会返回 304 状态,加载浏览器缓存,并且响应头会设置 Last-Modified 或者 ETag 属性。

ETag/If-None-Match

Etag: 服务器响应请求时,通过此字段告诉浏览器当前资源在服务器生成的唯一标识(生成规则由服务器决定)

If-None-Match: 再次请求服务器时,浏览器的请求报文头部会包含此字段,后面的值为在缓存中获取的标识。服务器接收到次报文后发现 If-None-Match 则与被请求资源的唯一标识进行对比。

  1. 不同,说明资源被改动过,则响应整个资源内容,返回状态码 200。
  2. 相同,说明资源无心修改,则响应 header,浏览器直接从缓存中获取数据信息。返回状态码 304.

但是实际应用中由于 Etag 的计算是使用算法来得出的,而算法会占用服务端计算的资源,所有服务端的资源都是宝贵的,所以就很少使用 Etag 了。

Last-Modified/If-Modified-Since

Last-Modified: 服务器在响应请求时,会告诉浏览器资源的最后修改时间。

if-Modified-Since: 浏览器再次请求服务器的时候,请求头会包含此字段,后面跟着在缓存中获得的最后修改时间。服务端收到此请求头发现有 if-Modified-Since,则与被请求资源的最后修改时间进行对比,如果一致则返回 304 和响应报文头,浏览器只需要从缓存中获取信息即可。 从字面上看,就是说:从某个时间节点算起,是否文件被修改了

  1. 如果真的被修改:那么开始传输响应一个整体,服务器返回:200 OK
  2. 如果没有被修改:那么只需传输响应 header,服务器返回:304 Not Modified

if-Unmodified-Since: 从字面上看, 就是说: 从某个时间点算起, 是否文件没有被修改

  1. 如果没有被修改:则开始`继续’传送文件: 服务器返回: 200 OK
  2. 如果文件被修改:则不传输,服务器返回: 412 Precondition failed (预处理错误)

这两个的区别是一个是修改了才下载一个是没修改才下载。 Last-Modified 说好却也不是特别好,因为如果在服务器上,一个资源被修改了,但其实际内容根本没发生改变,会因为 Last-Modified 时间匹配不上而返回了整个实体给客户端(即使客户端缓存里有个一模一样的资源)。为了解决这个问题,HTTP1.1 推出了 Etag。

参考资料

Hystrix 快速入门

Hystrix 简介

Hystrix 是什么

Hystrix 是由 Netflix 开源,用于处理分布式系统的延迟和容错的一个开源组件。在分布式系统里,许多依赖不可避免的会调用失败,比如超时、异常等。Hystrix 采用断路器模式来实现服务间的彼此隔离,从而避免级联故障,以提高分布式系统整体的弹性。

“断路器”本身是一种开关装置,当某个服务单元发生故障之后,通过断路器的故障监控(类似熔断保险丝),向调用方返回一个符合预期的、可处理的备选响应(FallBack),而不是长时间的等待或者抛出调用方无法处理的异常,这样就保证了服务调用方的线程不会被长时间、不必要地占用,从而避免了故障在分布式系统中的蔓延,乃至雪崩。

Hystrix 官方已宣布不再发布新版本。但是,Hystrix 的断路器设计理念,有非常高的学习价值。

为什么需要 Hystrix

复杂的分布式系统架构中的应用程序往往具有数十个依赖项,每个依赖项都会不可避免地在某个时刻失败。 如果主机应用程序未与这些外部故障隔离开来,则可能会被波及。

例如,对于依赖于 30 个服务的应用程序,假设每个服务的正常运行时间为 99.99%,则可以期望:

99.9930 = 99.7% 的正常运行时间

10 亿个请求中的 0.3%= 3,000,000 个失败

即使所有依赖项都具有出色的正常运行时间,每月也会有 2 个小时以上的停机时间。

然而,现实情况一般比这种估量情况更糟糕。


当一切正常时,整体系统如下所示:

img

在高并发场景,这些依赖的稳定性与否对系统的影响非常大,但是依赖有很多不可控问题:如网络连接、资源繁忙、服务宕机等。例如:下图中有一个 QPS 为 50 的依赖 I 出现不可用,但是其他依赖服务是可用的。

img

但是,在高并发场景下,当依赖 I 阻塞时,大多数服务器的线程池就出现阻塞(BLOCK)。当这种级联故障愈演愈烈,就可能造成整个线上服务不可用的雪崩效应,如下图:

img

Hystrix 就是为了解决这类问题而应运而生。

Hystrix 的功能

Hystrix 具有以下功能:

  • 避免资源耗尽:阻止任何一个依赖服务耗尽所有的资源,比如 tomcat 中的所有线程资源。
  • 避免请求排队和积压:采用限流和 fail fast 来控制故障。
  • 支持降级:提供 fallback 降级机制来应对故障。
  • 资源隔离:比如 bulkhead(舱壁隔离技术)、swimlane(泳道技术)、circuit breaker(断路技术)来限制任何一个依赖服务的故障的影响。
  • 统计/监控/报警:通过近实时的统计/监控/报警功能,来提高故障发现的速度。
  • 通过近实时的属性和配置热修改功能,来提高故障处理和恢复的速度。
  • 保护依赖服务调用的所有故障情况,而不仅仅只是网络故障情况。

如果使用 Hystrix 对每个基础依赖服务进行过载保护,则整个系统架构将会类似下图所示,每个依赖项彼此隔离,受到延迟时发生饱和的资源的被限制访问,并包含 fallback 逻辑(用于降级处理),该逻辑决定了在依赖项中发生任何类型的故障时做出对应的处理。

img

Hystrix 原理

如下图所示,Hystrix 的工作流程大致可以分为 9 个步骤。

img

(一)构建一个 HystrixCommand 或 HystrixObservableCommand 对象

Hystrix 进行资源隔离,其实是提供了一个抽象,叫做命令模式。这也是 Hystrix 最基本的资源隔离技术。

在使用 Hystrix 的过程中,会对依赖服务的调用请求封装成命令对象,Hystrix 对 命令对象抽象了两个抽象类:HystrixCommandHystrixObservableCommand

  • HystrixCommand 表示的命令对象会返回一个唯一返回值。
  • HystrixObservableCommand 表示的命令对象 会返回多个返回值。
1
2
HystrixCommand command = new HystrixCommand(arg1, arg2);
HystrixObservableCommand command = new HystrixObservableCommand(arg1, arg2);

(二)执行命令

Hystrix 中共有 4 种方式执行命令,如下所示:

执行方式 说明 可用对象
execute() 阻塞式同步执行,返回依赖服务的单一返回结果(或者抛出异常) HystrixCommand
queue() 异步执行,通过 Future 返回依赖服务的单一返回结果(或者抛出异常) HystrixCommand
observe() 基于 Rxjava 的 Observable 方式,返回通过 Observable 表示的依赖服务返回结果。代调用代码先执行(Hot Obserable) HystrixObservableCommand
toObservable() 基于 Rxjava 的 Observable 方式,返回通过 Observable 表示的依赖服务返回结果。执行代码等到真正订阅的时候才会执行(cold observable) HystrixObservableCommand

这四种命令中,exeucte()queue()observe() 的表示其实是通过 toObservable() 实现的,其转换关系如下图所示:

img

HystrixCommand 执行方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
K value   = command.execute();
// 等价语句:
K value = command.execute().queue().get();


Future<K> fValue = command.queue();
//等价语句:
Future<K> fValue = command.toObservable().toBlocking().toFuture();


Observable<K> ohValue = command.observe(); //hot observable,立刻订阅,命令立刻执行
//等价语句:
Observable<K> ohValue = command.toObservable().subscribe(subject);

// 上述执行最终实现还是基于 toObservable()
Observable<K> ocValue = command.toObservable(); //cold observable,延后订阅,订阅发生后,执行才真正执行

(三)是否缓存

如果当前命令对象启用了请求缓存,并且请求的响应存在于缓存中,则缓存的响应会立刻以 Observable 的形式返回。

(四)是否开启断路器

如果第三步没有缓存没有命中,则判断一下当前断路器的断路状态是否打开。如果断路器状态为打开状态,则 Hystrix 将不会执行此 Command 命令,直接执行步骤 8 调用 Fallback;

如果断路器状态是关闭,则执行步骤 5 检查是否有足够的资源运行 Command 命令

(五)信号量、线程池是否拒绝

当您执行该命令时,Hystrix 会检查断路器以查看电路是否打开。

如果电路开路(或“跳闸”),则 Hystrix 将不会执行该命令,而是将流程路由到 (8) 获取回退。

如果电路闭合,则流程前进至 (5) 以检查是否有可用容量来运行命令。

如果当前要执行的 Command 命令 先关连的线程池 和队列(或者信号量)资源已经满了,Hystrix 将不会运行 Command 命令,直接执行 步骤 8的 Fallback 降级处理;如果未满,表示有剩余的资源执行 Command 命令,则执行步骤 6

(六)construct() 或 run()

当经过步骤 5 判断,有足够的资源执行 Command 命令时,本步骤将调用 Command 命令运行方法,基于不同类型的 Command,有如下两种两种运行方式:

运行方式 说明
HystrixCommand.run() 返回一个处理结果或者抛出一个异常
HystrixObservableCommand.construct() 返回一个 Observable 表示的结果(可能多个),或者 基于onError的错误通知

如果run() 或者construct()方法 的真实执行时间超过了 Command 设置的超时时间阈值, 则当前则执行线程(或者是独立的定时器线程)将会抛出TimeoutException。抛出超时异常 TimeoutException,后,将执行步骤 8的 Fallback 降级处理。即使run()或者construct()执行没有被取消或中断,最终能够处理返回结果,但在降级处理逻辑中,将会抛弃run()construct()方法的返回结果,而返回 Fallback 降级处理结果。

注意事项
需要注意的是,Hystrix 无法强制 将正在运行的线程停止掉–Hystrix 能够做的最好的方式就是在 JVM 中抛出一个InterruptedException。如果 Hystrix 包装的工作不抛出中断异常InterruptedException, 则在 Hystrix 线程池中的线程将会继续执行,尽管调用的客户端已经接收到了TimeoutException。这种方式会使 Hystrix 的线程池处于饱和状态。大部分的 Java Http Client 开源库并不会解析 InterruptedException。所以确认 HTTP client 相关的连接和读/写相关的超时时间设置。
如果 Command 命令没有抛出任何异常,并且有返回结果,则 Hystrix 将会在做完日志记录和统计之后会将结果返回。 如果是通过run()方式运行,则返回一个Obserable对象,包含一个唯一值,并且发送一个onCompleted通知;如果是通过consturct()方式运行 ,则返回一个Observable对象

(七)健康检查

Hystrix 会统计 Command 命令执行执行过程中的成功数失败数拒绝数超时数,将这些信息记录到断路器(Circuit Breaker)中。断路器将上述统计按照时间窗的形式记录到一个定长数组中。断路器根据时间窗内的统计数据去判定请求什么时候可以被熔断,熔断后,在接下来一段恢复周期内,相同的请求过来后会直接被熔断。当再次校验,如果健康监测通过后,熔断开关将会被关闭。

(八)获取 Fallback

当以下场景出现后,Hystrix 将会尝试触发 Fallback:

  • 步骤 6 Command 执行时抛出了任何异常;
  • 步骤 4 断路器已经被打开
  • 步骤 5 执行命令的线程池、队列或者信号量资源已满
  • 命令执行的时间超过阈值

(九)返回结果

如果 Hystrix 命令对象执行成功,将会返回结果,或者以Observable形式包装的结果。根据步骤 2的 command 调用方式,返回的Observable 会按照如下图说是的转换关系进行返回:

img

  • execute() — 用和 .queue() 相同的方式获取 Future,然后调用 Futureget() 以获取 Observable 的单个值。
  • queue() —将 Observable 转换为 BlockingObservable,以便可以将其转换为 Future 并返回。
  • watch() —订阅 Observable 并开始执行命令的流程; 返回一个 Observable,当订阅该 Observable 时,它会重新通知。
  • toObservable() —返回不变的 Observable; 必须订阅它才能真正开始执行命令的流程。

断路器工作原理

img

  1. 断路器时间窗内的请求数 是否超过了请求数断路器生效阈值circuitBreaker.requestVolumeThreshold,如果超过了阈值,则将会触发断路,断路状态为开启
    例如,如果当前阈值设置的是20,则当时间窗内统计的请求数共计 19 个,即使 19 个全部失败了,都不会触发断路器。
  2. 并且请求错误率超过了请求错误率阈值errorThresholdPercentage
  3. 如果两个都满足,则将断路器由关闭迁移到开启
  4. 如果断路器开启,则后续的所有相同请求将会被断路掉;
  5. 直到过了沉睡时间窗sleepWindowInMilliseconds后,再发起请求时,允许其通过(此时的状态为半开起状态)。如果请求失败了,则保持断路器状态为开启状态,并更新沉睡时间窗。如果请求成功了,则将断路器状态改为关闭状态;

核心的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public void onNext(HealthCounts hc) {
// check if we are past the statisticalWindowVolumeThreshold
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
//we are not past the minimum error threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
// our failure rate is too high, we need to set the state to OPEN
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}

系统指标

Hystrix 对系统指标的统计是基于时间窗模式的:

时间窗:最近的一个时间区间内,比如前一小时到现在,那么时间窗的长度就是1小时
:桶是在特定的时间窗内,等分的指标收集的统计集合;比如时间窗的长度为1小时,而桶的数量为10,那么每个桶在时间轴上依次排开,时间由远及近,每个桶统计的时间分片为 1h / 10 = 6 min 6 分钟。一个桶中,包含了成功数失败数超时数拒绝数 四个指标。

在系统内,时间窗会随着系统的运行逐渐向前移动,而时间窗的长度和桶的数量是固定不变的,那么随着时间的移动,会出现较久的过期的桶被移除出去,新的桶被添加进来,如下图所示:

img

资源隔离技术

线程池隔离

如下图所示,由于计算机系统的基本执行单位就是线程,线程具备独立的执行能力,所以,为了做到资源保护,需要对系统的线程池进行划分,对于外部调用方

1
User Request

的请求,调用各个线程池的服务,各个线程池独立完成调用,然后将结果返回

1
调用方

。在调用服务的过程中,如果

1
服务提供方

执行时间过长,则

1
调用方

可以直接以超时的方式直接返回,快速失败。

img

线程池隔离的几点好处

  1. 使用超时返回的机制,避免同步调用服务时,调用时间过长,无法释放,导致资源耗尽的情况
  2. 服务方可以控制请求数量,请求过多,可以直接拒绝,达到快速失败的目的;
  3. 请求排队,线程池可以维护执行队列,将请求压到队列中处理

举个例子,如下代码段,模拟了同步调用服务的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//服务提供方,执行服务的时候模拟2分钟的耗时
Callable<String> callableService = ()->{
long start = System.currentTimeMillis();
while(System.currentTimeMillis()-start> 1000 * 60 *2){
//模拟服务执行时间过长的情况
}
return "OK";
};

//模拟10个客户端调用服务
ExecutorService clients = Executors.newFixedThreadPool(10);
//模拟给10个客户端提交处理请求
for (int i = 0; i < 20; i++) {
clients.execute(()->{
//同步调用
try {
String result = callableService.call();
System.out.println("当前客户端:"+Thread.currentThread().getName()+"调用服务完成,得到结果:"+result);
} catch (Exception e) {
e.printStackTrace();
}
});
}

在此环节中,客户端 clients必须等待服务方返回结果之后,才能接收新的请求。如果用吞吐量来衡量系统的话,会发现系统的处理能力比较低。为了提高相应时间,可以借助线程池的方式,设置超时时间,这样的话,客户端就不需要必须等待服务方返回,如果时间过长,可以提前返回,改造后的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//服务提供方,执行服务的时候模拟2分钟的耗时
Callable<String> callableService = ()->{
long start = System.currentTimeMillis();
while(System.currentTimeMillis()-start> 1000 * 60 *2){
//模拟服务执行时间过长的情况
}
return "OK";
};

//创建线程池作为服务方
ExecutorService executorService = Executors.newFixedThreadPool(30);


//模拟10个客户端调用服务
ExecutorService clients = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
clients.execute(()->{
//同步调用
//将请求提交给线程池执行,Callable 和 Runnable在某种意义上,也是Command对象
Future<String> future = executorService.submit(callableService::call);
//在指定的时间内获取结果,如果超时,调用方可以直接返回
try {
String result = future.get(1000, TimeUnit.SECONDS);
//客户端等待时间之后,快速返回
System.out.println("当前客户端:"+Thread.currentThread().getName()+"调用服务完成,得到结果:"+result);
}catch (TimeoutException timeoutException){
System.out.println("服务调用超时,返回处理");
} catch (InterruptedException e) {

} catch (ExecutionException e) {
}
});
}

如果我们将服务方的线程池设置为:

1
2
3
4
ThreadPoolExecutor executorService = new ThreadPoolExecutor(10,1000,TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.DiscardPolicy() // 提交请求过多时,可以丢弃请求,避免死等阻塞的情况。
)

线程池隔离模式的弊端

线程池隔离模式,会根据服务划分出独立的线程池,系统资源的线程并发数是有限的,当线程数过多,系统话费大量的 CPU 时间来做线程上下文切换的无用操作,反而降低系统性能;如果线程池隔离的过多,会导致真正用于接收用户请求的线程就相应地减少,系统吞吐量反而下降;
在实践上,应当对像远程方法调用,网络资源请求这种服务时间不太可控的场景下使用线程池隔离模式处理
如下图所示,是线程池隔离模式的三种场景:

img

信号量隔离

由于基于线程池隔离的模式占用系统线程池资源,Hystrix 还提供了另外一个隔离技术:基于信号量的隔离。

基于信号量的隔离方式非常地简单,其核心就是使用共用变量

1
semaphore

进行原子操作,控制线程的并发量,当并发量达到一定量级时,服务禁止调用。如下图所示:信号量本身不会消耗多余的线程资源,所以就非常轻量。

img

基于信号量隔离的利弊

利:基于信号量的隔离,利用 JVM 的原子性 CAS 操作,避免了资源锁的竞争,省去了线程池开销,效率非常高;
弊:本质上基于信号量的隔离是同步行为,所以无法做到超时熔断,所以服务方自身要控制住执行时间,避免超时。
应用场景:业务服务上,有并发上限限制时,可以考虑此方式 > Alibaba Sentinel开源框架,就是基于信号量的熔断和断路器框架。

Hystrix 应用

  • Hystrix 配置无法动态调节生效。Hystrix 框架本身是使用的Archaius框架完成的配置加载和刷新,但是集成自 Spring Cloud 下,无法有效地根据实时监控结果,动态调整熔断和系统参数
  • 线程池和 Command 之间的配置比较复杂,在 Spring Cloud 在做 feigin-hystrix 集成的时候,还有些 BUG,对 command 的默认配置没有处理好,导致所有 command 占用公共的 command 线程池,没有细粒度控制,还需要做框架适配调整
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public interface SetterFactory {

/**
* Returns a hystrix setter appropriate for the given target and method
*/
HystrixCommand.Setter create(Target<?> target, Method method);

/**
* Default behavior is to derive the group key from {@link Target#name()} and the command key from
* {@link Feign#configKey(Class, Method)}.
*/
final class Default implements SetterFactory {

@Override
public HystrixCommand.Setter create(Target<?> target, Method method) {
String groupKey = target.name();
String commandKey = Feign.configKey(target.type(), method);
return HystrixCommand.Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
.andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
//没有处理好default配置项的加载
}
}
}

Hystrix 配置

详细配置可以参考 Hystrix 官方配置手册,这里仅介绍比较核心的配置

执行配置

以下配置用于控制 HystrixCommand.run() 如何执行。

配置项 说明 默认值
execution.isolation.strategy 线程隔离(THREAD)或信号量隔离(SEMAPHORE) THREAD
execution.isolation.thread.timeoutInMilliseconds 方法执行超时时间 1000(ms)
execution.isolation.semaphore.maxConcurrentRequests 信号量隔离最大并发数 10

断路配置

以下配置用于控制 HystrixCircuitBreaker 的断路处理。

配置项 说明 默认值
circuitBreaker.enabled 是否开启断路器 true
circuitBreaker.requestVolumeThreshold 断路器启用请求数阈值 20
circuitBreaker.sleepWindowInMilliseconds 断路器启用后的休眠时间 5000(ms)
circuitBreaker.errorThresholdPercentage 断路器启用失败率阈值 50(%)
circuitBreaker.forceOpen 是否强制将断路器设置成开启状态 false
circuitBreaker.forceClosed 是否强制将断路器设置成关闭状态 false

指标配置

以下配置用于从 HystrixCommand 和 HystrixObservableCommand 执行中捕获相关指标。

配置项 说明 默认值
metrics.rollingStats.timeInMilliseconds 时间窗的长度 10000(ms)
metrics.rollingStats.numBuckets 桶的数量,需要保证timeInMilliseconds % numBuckets =0 10
metrics.rollingPercentile.enabled 是否统计运行延迟的占比 true
metrics.rollingPercentile.timeInMilliseconds 运行延迟占比统计的时间窗 60000(ms)
metrics.rollingPercentile.numBuckets 运行延迟占比统计的桶数 6
metrics.rollingPercentile.bucketSize 百分比统计桶的容量,桶内最多保存的运行时间统计 100
metrics.healthSnapshot.intervalInMilliseconds 统计快照刷新间隔 500 (ms)

线程池配置

以下配置用于控制 Hystrix Command 执行所使用的线程池。

配置项 说明 默认值
coreSize 线程池核心线程数 10
maximumSize 线程池最大线程数 10
maxQueueSize 最大 LinkedBlockingQueue 的大小,-1 表示用 SynchronousQueue -1
queueSizeRejectionThreshold 队列大小阈值,超过则拒绝 5
allowMaximumSizeToDivergeFromCoreSize 此属性允许 maximumSize 的配置生效。该值可以等于或大于 coreSize。设置 coreSize <maximumSize 使得线程池可以维持 maximumSize 并发性,但是会在相对空闲时将线程回收。(取决于 keepAliveTimeInMinutes) false

其他限流技术

  • resilience4j
    Hystrix 虽然官方宣布不再维护,其推荐另外一个框架:resilience4j, 这个框架是是为 Java 8 和 函数式编程设计的一个轻量级的容错框架,该框架充分利用函数式编程的概念,为函数式接口lamda表达式方法引用高阶函数进行包装,(本质上是装饰者模式的概念),通过包装实现断路限流重试舱壁功能。
    这个框架整体而言比较轻量,没有控制台,不太好做系统级监控;

  • Alibaba Sentinel

    1
    Sentinel

    是 阿里巴巴开源的轻量级的流量控制、熔断降级 Java 库,该库的核心是使用的是信号量隔离的方式做流量控制和熔断,其优点是其集成性和易用性,几乎能和当前主流的 Spring Cloud, dubbo ,grpc ,nacos, zookeeper 做集成,如下图所示:

    img

    sentinel-features-overview-en.png

    1
    Sentinel

    的目标生态圈:

    img

    1
    sentinel

    一个强大的功能,就是它有一个流控管理控制台,你可以实时地监控每个服务的流控情况,并且可以实时编辑各种流控、熔断规则,有效地保证了服务保护的及时性。下图是内部试用的 sentinel 控制台:

    img另外,

    1
    sentinel

    还可以和

    1
    ctrip apollo

    分布式配置系统进行集成,将流控规降级等各种规则先配置在 apollo 中,然后服务启动自动加载流控规则。

参考资料

Tomcat 快速入门

🎁 版本说明

当前最新版本:Tomcat 8.5.24

环境要求:JDK7+

1. Tomcat 简介

1.1. Tomcat 是什么

Tomcat 是由 Apache 开发的一个 Servlet 容器,实现了对 Servlet 和 JSP 的支持,并提供了作为 Web 服务器的一些特有功能,如 Tomcat 管理和控制平台、安全域管理和 Tomcat 阀等。

由于 Tomcat 本身也内含了一个 HTTP 服务器,它也可以被视作一个单独的 Web 服务器。但是,不能将 Tomcat 和 Apache HTTP 服务器混淆,Apache HTTP 服务器是一个用 C 语言实现的 HTTP Web 服务器;这两个 HTTP web server 不是捆绑在一起的。Tomcat 包含了一个配置管理工具,也可以通过编辑 XML 格式的配置文件来进行配置。

1.2. Tomcat 重要目录

  • /bin - Tomcat 脚本存放目录(如启动、关闭脚本)。 *.sh 文件用于 Unix 系统; *.bat 文件用于 Windows 系统。
  • /conf - Tomcat 配置文件目录。
  • /logs - Tomcat 默认日志目录。
  • /webapps - webapp 运行的目录。

1.3. web 工程发布目录结构

一般 web 项目路径结构

1
2
3
4
5
6
7
8
9
10
11
12
|-- webapp                         # 站点根目录
|-- META-INF # META-INF 目录
| `-- MANIFEST.MF # 配置清单文件
|-- WEB-INF # WEB-INF 目录
| |-- classes # class文件目录
| | |-- *.class # 程序需要的 class 文件
| | `-- *.xml # 程序需要的 xml 文件
| |-- lib # 库文件夹
| | `-- *.jar # 程序需要的 jar 包
| `-- web.xml # Web应用程序的部署描述文件
|-- <userdir> # 自定义的目录
|-- <userfiles> # 自定义的资源文件
  • webapp:工程发布文件夹。其实每个 war 包都可以视为 webapp 的压缩包。

  • META-INF:META-INF 目录用于存放工程自身相关的一些信息,元文件信息,通常由开发工具,环境自动生成。

  • WEB-INF:Java web 应用的安全目录。所谓安全就是客户端无法访问,只有服务端可以访问的目录。

  • /WEB-INF/classes:存放程序所需要的所有 Java class 文件。

  • /WEB-INF/lib:存放程序所需要的所有 jar 文件。

  • /WEB-INF/web.xml:web 应用的部署配置文件。它是工程中最重要的配置文件,它描述了 servlet 和组成应用的其它组件,以及应用初始化参数、安全管理约束等。

1.4. Tomcat 功能

Tomcat 支持的 I/O 模型有:

  • NIO:非阻塞 I/O,采用 Java NIO 类库实现。
  • NIO2:异步 I/O,采用 JDK 7 最新的 NIO2 类库实现。
  • APR:采用 Apache 可移植运行库实现,是 C/C++ 编写的本地库。

Tomcat 支持的应用层协议有:

  • HTTP/1.1:这是大部分 Web 应用采用的访问协议。
  • AJP:用于和 Web 服务器集成(如 Apache)。
  • HTTP/2:HTTP 2.0 大幅度的提升了 Web 性能。

2. Tomcat 入门

2.1. 安装

前提条件

Tomcat 8.5 要求 JDK 版本为 1.7 以上。

进入 Tomcat 官方下载地址 选择合适版本下载,并解压到本地。

Windows

添加环境变量 CATALINA_HOME ,值为 Tomcat 的安装路径。

进入安装目录下的 bin 目录,运行 startup.bat 文件,启动 Tomcat

Linux / Unix

下面的示例以 8.5.24 版本为例,包含了下载、解压、启动操作。

1
2
3
4
5
# 下载解压到本地
wget http://mirrors.hust.edu.cn/apache/tomcat/tomcat-8/v8.5.24/bin/apache-tomcat-8.5.24.tar.gz
tar -zxf apache-tomcat-8.5.24.tar.gz
# 启动 Tomcat
./apache-tomcat-8.5.24/bin/startup.sh

启动后,访问 http://localhost:8080 ,可以看到 Tomcat 安装成功的测试页面。

img

2.2. 配置

本节将列举一些重要、常见的配置项。详细的 Tomcat8 配置可以参考 Tomcat 8 配置官方参考文档

2.2.1. Server

Server 元素表示整个 Catalina servlet 容器。

因此,它必须是 conf/server.xml 配置文件中的根元素。它的属性代表了整个 servlet 容器的特性。

属性表

属性 描述 备注
className 这个类必须实现 org.apache.catalina.Server 接口。 默认 org.apache.catalina.core.StandardServer
address 服务器等待关机命令的 TCP / IP 地址。如果没有指定地址,则使用 localhost。
port 服务器等待关机命令的 TCP / IP 端口号。设置为-1 以禁用关闭端口。
shutdown 必须通过 TCP / IP 连接接收到指定端口号的命令字符串,以关闭 Tomcat。

2.2.2. Service

Service 元素表示一个或多个连接器组件的组合,这些组件共享一个用于处理传入请求的引擎组件。Server 中可以有多个 Service。

属性表

属性 描述 备注
className 这个类必须实现org.apache.catalina.Service接口。 默认 org.apache.catalina.core.StandardService
name 此服务的显示名称,如果您使用标准 Catalina 组件,将包含在日志消息中。与特定服务器关联的每个服务的名称必须是唯一的。

实例 - conf/server.xml 配置文件示例

1
2
3
4
5
6
<?xml version="1.0" encoding="UTF-8"?>
<Server port="8080" shutdown="SHUTDOWN">
<Service name="xxx">
...
</Service>
</Server>

2.2.3. Executor

Executor 表示可以在 Tomcat 中的组件之间共享的线程池。

属性表

属性 描述 备注
className 这个类必须实现org.apache.catalina.Executor接口。 默认 org.apache.catalina.core.StandardThreadExecutor
name 线程池名称。 要求唯一, 供 Connector 元素的 executor 属性使用
namePrefix 线程名称前缀。
maxThreads 最大活跃线程数。 默认 200
minSpareThreads 最小活跃线程数。 默认 25
maxIdleTime 当前活跃线程大于 minSpareThreads 时,空闲线程关闭的等待最大时间。 默认 60000ms
maxQueueSize 线程池满情况下的请求排队大小。 默认 Integer.MAX_VALUE
1
2
3
<Service name="xxx">
<Executor name="tomcatThreadPool" namePrefix="catalina-exec-" maxThreads="300" minSpareThreads="25"/>
</Service>

2.2.4. Connector

Connector 代表连接组件。Tomcat 支持三种协议:HTTP/1.1、HTTP/2.0、AJP。

属性表

属性 说明 备注
asyncTimeout Servlet3.0 规范中的异步请求超时 默认 30s
port 请求连接的 TCP Port 设置为 0,则会随机选取一个未占用的端口号
protocol 协议. 一般情况下设置为 HTTP/1.1,这种情况下连接模型会在 NIO 和 APR/native 中自动根据配置选择
URIEncoding 对 URI 的编码方式. 如果设置系统变量 org.apache.catalina.STRICT_SERVLET_COMPLIANCE 为 true,使用 ISO-8859-1 编码;如果未设置此系统变量且未设置此属性, 使用 UTF-8 编码
useBodyEncodingForURI 是否采用指定的 contentType 而不是 URIEncoding 来编码 URI 中的请求参数

以下属性在标准的 Connector(NIO, NIO2 和 APR/native)中有效:

属性 说明 备注
acceptCount 当最大请求连接 maxConnections 满时的最大排队大小 默认 100,注意此属性和 Executor 中属性 maxQueueSize 的区别.这个指的是请求连接满时的堆栈大小,Executor 的 maxQueueSize 指的是处理线程满时的堆栈大小
connectionTimeout 请求连接超时 默认 60000ms
executor 指定配置的线程池名称
keepAliveTimeout keeAlive 超时时间 默认值为 connectionTimeout 配置值.-1 表示不超时
maxConnections 最大连接数 连接满时后续连接放入最大为 acceptCount 的队列中. 对 NIO 和 NIO2 连接,默认值为 10000;对 APR/native,默认值为 8192
maxThreads 如果指定了 Executor, 此属性忽略;否则为 Connector 创建的内部线程池最大值 默认 200
minSpareThreads 如果指定了 Executor, 此属性忽略;否则为 Connector 创建线程池的最小活跃线程数 默认 10
processorCache 协议处理器缓存 Processor 对象的大小 -1 表示不限制.当不使用 servlet3.0 的异步处理情况下: 如果配置 Executor,配置为 Executor 的 maxThreads;否则配置为 Connnector 的 maxThreads. 如果使用 Serlvet3.0 异步处理, 取 maxThreads 和 maxConnections 的最大值

2.2.5. Context

Context 元素表示一个 Web 应用程序,它在特定的虚拟主机中运行。每个 Web 应用程序都基于 Web 应用程序存档(WAR)文件,或者包含相应的解包内容的相应目录,如 Servlet 规范中所述。

属性表

属性 说明 备注
altDDName web.xml 部署描述符路径 默认 /WEB-INF/web.xml
docBase Context 的 Root 路径 和 Host 的 appBase 相结合, 可确定 web 应用的实际目录
failCtxIfServletStartFails 同 Host 中的 failCtxIfServletStartFails, 只对当前 Context 有效 默认为 false
logEffectiveWebXml 是否日志打印 web.xml 内容(web.xml 由默认的 web.xml 和应用中的 web.xml 组成) 默认为 false
path web 应用的 context path 如果为根路径,则配置为空字符串(“”), 不能不配置
privileged 是否使用 Tomcat 提供的 manager servlet
reloadable /WEB-INF/classes/ 和/WEB-INF/lib/ 目录中 class 文件发生变化是否自动重新加载 默认为 false
swallowOutput true 情况下, System.out 和 System.err 输出将被定向到 web 应用日志中 默认为 false

2.2.6. Engine

Engine 元素表示与特定的 Catalina 服务相关联的整个请求处理机器。它接收并处理来自一个或多个连接器的所有请求,并将完成的响应返回给连接器,以便最终传输回客户端。

属性表

属性 描述 备注
defaultHost 默认主机名,用于标识将处理指向此服务器上主机名称但未在此配置文件中配置的请求的主机。 这个名字必须匹配其中一个嵌套的主机元素的名字属性。
name 此引擎的逻辑名称,用于日志和错误消息。 在同一服务器中使用多个服务元素时,每个引擎必须分配一个唯一的名称。

2.2.7. Host

Host 元素表示一个虚拟主机,它是一个服务器的网络名称(如“www.mycompany.com”)与运行 Tomcat 的特定服务器的关联。

属性表

属性 说明 备注
name 名称 用于日志输出
appBase 虚拟主机对应的应用基础路径 可以是个绝对路径, 或${CATALINA_BASE}相对路径
xmlBase 虚拟主机 XML 基础路径,里面应该有 Context xml 配置文件 可以是个绝对路径, 或${CATALINA_BASE}相对路径
createDirs 当 appBase 和 xmlBase 不存在时,是否创建目录 默认为 true
autoDeploy 是否周期性的检查 appBase 和 xmlBase 并 deploy web 应用和 context 描述符 默认为 true
deployIgnore 忽略 deploy 的正则
deployOnStartup Tomcat 启动时是否自动 deploy 默认为 true
failCtxIfServletStartFails 配置为 true 情况下,任何 load-on-startup >=0 的 servlet 启动失败,则其对应的 Contxt 也启动失败 默认为 false

2.2.8. Cluster

由于在实际开发中,我从未用过 Tomcat 集群配置,所以没研究。

2.3. 启动

2.3.1. 部署方式

这种方式要求本地必须安装 Tomcat 。

将打包好的 war 包放在 Tomcat 安装目录下的 webapps 目录下,然后在 bin 目录下执行 startup.batstartup.sh ,Tomcat 会自动解压 webapps 目录下的 war 包。

成功后,可以访问 http://localhost:8080/xxx (xxx 是 war 包文件名)。

注意

以上步骤是最简单的示例。步骤中的 war 包解压路径、启动端口以及一些更多的功能都可以修改配置文件来定制 (主要是 server.xmlcontext.xml 文件)。

2.3.2. 嵌入式

2.3.2.1. API 方式

在 pom.xml 中添加依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>8.5.24</version>
</dependency>

添加 SimpleEmbedTomcatServer.java 文件,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.Optional;
import org.apache.catalina.startup.Tomcat;

public class SimpleTomcatServer {
private static final int PORT = 8080;
private static final String CONTEXT_PATH = "/javatool-server";

public static void main(String[] args) throws Exception {
// 设定 profile
Optional<String> profile = Optional.ofNullable(System.getProperty("spring.profiles.active"));
System.setProperty("spring.profiles.active", profile.orElse("develop"));

Tomcat tomcat = new Tomcat();
tomcat.setPort(PORT);
tomcat.getHost().setAppBase(".");
tomcat.addWebapp(CONTEXT_PATH, getAbsolutePath() + "src/main/webapp");
tomcat.start();
tomcat.getServer().await();
}

private static String getAbsolutePath() {
String path = null;
String folderPath = SimpleEmbedTomcatServer.class.getProtectionDomain().getCodeSource().getLocation().getPath()
.substring(1);
if (folderPath.indexOf("target") > 0) {
path = folderPath.substring(0, folderPath.indexOf("target"));
}
return path;
}
}

成功后,可以访问 http://localhost:8080/javatool-server

说明

本示例是使用 org.apache.tomcat.embed 启动嵌入式 Tomcat 的最简示例。

这个示例中使用的是 Tomcat 默认的配置,但通常,我们需要对 Tomcat 配置进行一些定制和调优。为了加载配置文件,启动类就要稍微再复杂一些。这里不想再贴代码,有兴趣的同学可以参考:

示例项目

2.3.2.2. 使用 maven 插件启动(不推荐)

不推荐理由:这种方式启动 maven 虽然最简单,但是有一个很大的问题是,真的很久很久没发布新版本了(最新版本发布时间:2013-11-11)。且貌似只能找到 Tomcat6 、Tomcat7 插件。

使用方法

在 pom.xml 中引入插件

1
2
3
4
5
6
7
8
9
10
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<version>2.2</version>
<configuration>
<port>8080</port>
<path>/${project.artifactId}</path>
<uriEncoding>UTF-8</uriEncoding>
</configuration>
</plugin>

运行 mvn tomcat7:run 命令,启动 Tomcat。

成功后,可以访问 http://localhost:8080/xxx (xxx 是 ${project.artifactId} 指定的项目名)。

2.3.3. IDE 插件

常见 Java IDE 一般都有对 Tomcat 的支持。

以 Intellij IDEA 为例,提供了 Tomcat and TomEE Integration 插件(一般默认会安装)。

使用步骤

  • 点击 Run/Debug Configurations > New Tomcat Server > local ,打开 Tomcat 配置页面。
  • 点击 Confiure… 按钮,设置 Tomcat 安装路径。
  • 点击 Deployment 标签页,设置要启动的应用。
  • 设置启动应用的端口、JVM 参数、启动浏览器等。
  • 成功后,可以访问 http://localhost:8080/(当然,你也可以在 url 中设置上下文名称)。

img

说明

个人认为这个插件不如 Eclipse 的 Tomcat 插件好用,Eclipse 的 Tomcat 插件支持对 Tomcat xml 配置文件进行配置。而这里,你只能自己去 Tomcat 安装路径下修改配置文件。

文中的嵌入式启动示例可以参考我的示例项目

3. Tomcat 架构

img

Tomcat 要实现 2 个核心功能:

  • 处理 Socket 连接,负责网络字节流与 Request 和 Response 对象的转化。
  • 加载和管理 Servlet,以及处理具体的 Request 请求

为此,Tomcat 设计了两个核心组件:

  • 连接器(Connector):负责和外部通信
  • 容器(Container):负责内部处理

3.1. Service

Tomcat 支持的 I/O 模型有:

  • NIO:非阻塞 I/O,采用 Java NIO 类库实现。
  • NIO2:异步 I/O,采用 JDK 7 最新的 NIO2 类库实现。
  • APR:采用 Apache 可移植运行库实现,是 C/C++ 编写的本地库。

Tomcat 支持的应用层协议有:

  • HTTP/1.1:这是大部分 Web 应用采用的访问协议。
  • AJP:用于和 Web 服务器集成(如 Apache)。
  • HTTP/2:HTTP 2.0 大幅度的提升了 Web 性能。

Tomcat 支持多种 I/O 模型和应用层协议。为了实现这点,一个容器可能对接多个连接器。但是,单独的连接器或容器都不能对外提供服务,需要把它们组装起来才能工作,组装后这个整体叫作 Service 组件。Tomcat 内可能有多个 Service,通过在 Tomcat 中配置多个 Service,可以实现通过不同的端口号来访问同一台机器上部署的不同应用。

img

一个 Tomcat 实例有一个或多个 Service;一个 Service 有多个 Connector 和 Container。Connector 和 Container 之间通过标准的 ServletRequest 和 ServletResponse 通信。

3.2. 连接器

连接器对 Servlet 容器屏蔽了协议及 I/O 模型等的区别,无论是 HTTP 还是 AJP,在容器中获取到的都是一个标准的 ServletRequest 对象。

连接器的主要功能是:

  • 网络通信
  • 应用层协议解析
  • Tomcat Request/Response 与 ServletRequest/ServletResponse 的转化

Tomcat 设计了 3 个组件来实现这 3 个功能,分别是 EndPointProcessor 和 **Adapter**。

img

组件间通过抽象接口交互。这样做还有一个好处是封装变化。这是面向对象设计的精髓,将系统中经常变化的部分和稳定的部分隔离,有助于增加复用性,并降低系统耦合度。网络通信的 I/O 模型是变化的,可能是非阻塞 I/O、异步 I/O 或者 APR。应用层协议也是变化的,可能是 HTTP、HTTPS、AJP。浏览器端发送的请求信息也是变化的。但是整体的处理逻辑是不变的,EndPoint 负责提供字节流给 Processor,Processor 负责提供 Tomcat Request 对象给 Adapter,Adapter 负责提供 ServletRequest 对象给容器。

如果要支持新的 I/O 方案、新的应用层协议,只需要实现相关的具体子类,上层通用的处理逻辑是不变的。由于 I/O 模型和应用层协议可以自由组合,比如 NIO + HTTP 或者 NIO2 + AJP。Tomcat 的设计者将网络通信和应用层协议解析放在一起考虑,设计了一个叫 ProtocolHandler 的接口来封装这两种变化点。各种协议和通信模型的组合有相应的具体实现类。比如:Http11NioProtocol 和 AjpNioProtocol。

img

3.2.1. ProtocolHandler 组件

连接器用 ProtocolHandler 接口来封装通信协议和 I/O 模型的差异。ProtocolHandler 内部又分为 EndPoint 和 Processor 模块,EndPoint 负责底层 Socket 通信,Proccesor 负责应用层协议解析。

3.2.1.1. EndPoint

EndPoint 是通信端点,即通信监听的接口,是具体的 Socket 接收和发送处理器,是对传输层的抽象,因此 EndPoint 是用来实现 TCP/IP 协议的。

EndPoint 是一个接口,对应的抽象实现类是 AbstractEndpoint,而 AbstractEndpoint 的具体子类,比如在 NioEndpoint 和 Nio2Endpoint 中,有两个重要的子组件:Acceptor 和 SocketProcessor。

其中 Acceptor 用于监听 Socket 连接请求。SocketProcessor 用于处理接收到的 Socket 请求,它实现 Runnable 接口,在 Run 方法里调用协议处理组件 Processor 进行处理。为了提高处理能力,SocketProcessor 被提交到线程池来执行。而这个线程池叫作执行器(Executor)。

3.2.1.2. Processor

如果说 EndPoint 是用来实现 TCP/IP 协议的,那么 Processor 用来实现 HTTP 协议,Processor 接收来自 EndPoint 的 Socket,读取字节流解析成 Tomcat Request 和 Response 对象,并通过 Adapter 将其提交到容器处理,Processor 是对应用层协议的抽象。

Processor 是一个接口,定义了请求的处理等方法。它的抽象实现类 AbstractProcessor 对一些协议共有的属性进行封装,没有对方法进行实现。具体的实现有 AJPProcessor、HTTP11Processor 等,这些具体实现类实现了特定协议的解析方法和请求处理方式。

img

从图中我们看到,EndPoint 接收到 Socket 连接后,生成一个 SocketProcessor 任务提交到线程池去处理,SocketProcessor 的 Run 方法会调用 Processor 组件去解析应用层协议,Processor 通过解析生成 Request 对象后,会调用 Adapter 的 Service 方法。

3.2.2. Adapter

连接器通过适配器 Adapter 调用容器

由于协议不同,客户端发过来的请求信息也不尽相同,Tomcat 定义了自己的 Request 类来适配这些请求信息。

ProtocolHandler 接口负责解析请求并生成 Tomcat Request 类。但是这个 Request 对象不是标准的 ServletRequest,也就意味着,不能用 Tomcat Request 作为参数来调用容器。Tomcat 的解决方案是引入 CoyoteAdapter,这是适配器模式的经典运用,连接器调用 CoyoteAdapter 的 Sevice 方法,传入的是 Tomcat Request 对象,CoyoteAdapter 负责将 Tomcat Request 转成 ServletRequest,再调用容器的 Service 方法。

3.3. 容器

Tomcat 设计了 4 种容器,分别是 Engine、Host、Context 和 Wrapper。

  • Engine - Servlet 的顶层容器,包含一 个或多个 Host 子容器;
  • Host - 虚拟主机,负责 web 应用的部署和 Context 的创建;
  • Context - Web 应用上下文,包含多个 Wrapper,负责 web 配置的解析、管理所有的 Web 资源;
  • Wrapper - 最底层的容器,是对 Servlet 的封装,负责 Servlet 实例的创 建、执行和销毁。

3.3.1. 请求分发 Servlet 过程

Tomcat 是怎么确定请求是由哪个 Wrapper 容器里的 Servlet 来处理的呢?答案是,Tomcat 是用 Mapper 组件来完成这个任务的。

举例来说,假如有一个网购系统,有面向网站管理人员的后台管理系统,还有面向终端客户的在线购物系统。这两个系统跑在同一个 Tomcat 上,为了隔离它们的访问域名,配置了两个虚拟域名:manage.shopping.comuser.shopping.com,网站管理人员通过manage.shopping.com域名访问 Tomcat 去管理用户和商品,而用户管理和商品管理是两个单独的 Web 应用。终端客户通过user.shopping.com域名去搜索商品和下订单,搜索功能和订单管理也是两个独立的 Web 应用。如下所示,演示了 url 应声 Servlet 的处理流程。

img

假如有用户访问一个 URL,比如图中的http://user.shopping.com:8080/order/buy,Tomcat 如何将这个 URL 定位到一个 Servlet 呢?

  1. 首先,根据协议和端口号选定 Service 和 Engine。
  2. 然后,根据域名选定 Host。
  3. 之后,根据 URL 路径找到 Context 组件。
  4. 最后,根据 URL 路径找到 Wrapper(Servlet)。

这个路由分发过程具体是怎么实现的呢?答案是使用 Pipeline-Valve 管道。

3.3.2. Pipeline-Value

Pipeline 可以理解为现实中的管道,Valve 为管道中的阀门,Request 和 Response 对象在管道中经过各个阀门的处理和控制。

Pipeline-Valve 是责任链模式,责任链模式是指在一个请求处理的过程中有很多处理者依次对请求进行处理,每个处理者负责做自己相应的处理,处理完之后将再调用下一个处理者继续处理。Valve 表示一个处理点,比如权限认证和记录日志。

先来了解一下 Valve 和 Pipeline 接口的设计:

img

  • 每一个容器都有一个 Pipeline 对象,只要触发这个 Pipeline 的第一个 Valve,这个容器里 Pipeline 中的 Valve 就都会被调用到。但是,不同容器的 Pipeline 是怎么链式触发的呢,比如 Engine 中 Pipeline 需要调用下层容器 Host 中的 Pipeline。
  • 这是因为 Pipeline 中还有个 getBasic 方法。这个 BasicValve 处于 Valve 链表的末端,它是 Pipeline 中必不可少的一个 Valve,负责调用下层容器的 Pipeline 里的第一个 Valve。
  • Pipeline 中有 addValve 方法。Pipeline 中维护了 Valve 链表,Valve 可以插入到 Pipeline 中,对请求做某些处理。我们还发现 Pipeline 中没有 invoke 方法,因为整个调用链的触发是 Valve 来完成的,Valve 完成自己的处理后,调用 getNext.invoke() 来触发下一个 Valve 调用。
  • Valve 中主要的三个方法:setNextgetNextinvoke。Valve 之间的关系是单向链式结构,本身 invoke 方法中会调用下一个 Valve 的 invoke 方法。
  • 各层容器对应的 basic valve 分别是 StandardEngineValveStandardHostValveStandardContextValveStandardWrapperValve
  • 由于 Valve 是一个处理点,因此 invoke 方法就是来处理请求的。注意到 Valve 中有 getNext 和 setNext 方法,因此我们大概可以猜到有一个链表将 Valve 链起来了。

img

整个调用过程由连接器中的 Adapter 触发的,它会调用 Engine 的第一个 Valve:

1
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);

4. Tomcat 生命周期

4.1. Tomcat 的启动过程

img

  1. Tomcat 是一个 Java 程序,它的运行从执行 startup.sh 脚本开始。startup.sh 会启动一个 JVM 来运行 Tomcat 的启动类 Bootstrap
  2. Bootstrap 会初始化 Tomcat 的类加载器并实例化 Catalina
  3. Catalina 会通过 Digester 解析 server.xml,根据其中的配置信息来创建相应组件,并调用 Serverstart 方法。
  4. Server 负责管理 Service 组件,它会调用 Servicestart 方法。
  5. Service 负责管理 Connector 和顶层容器 Engine,它会调用 ConnectorEnginestart 方法。

4.1.1. Catalina 组件

Catalina 的职责就是解析 server.xml 配置,并据此实例化 Server。接下来,调用 Server 组件的 init 方法和 start 方法,将 Tomcat 启动起来。

Catalina 还需要处理各种“异常”情况,比如当我们通过“Ctrl + C”关闭 Tomcat 时,Tomcat 将如何优雅的停止并且清理资源呢?因此 Catalina 在 JVM 中注册一个“关闭钩子”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void start() {
//1. 如果持有的 Server 实例为空,就解析 server.xml 创建出来
if (getServer() == null) {
load();
}

//2. 如果创建失败,报错退出
if (getServer() == null) {
log.fatal(sm.getString("catalina.noServer"));
return;
}

//3. 启动 Server
try {
getServer().start();
} catch (LifecycleException e) {
return;
}

// 创建并注册关闭钩子
if (useShutdownHook) {
if (shutdownHook == null) {
shutdownHook = new CatalinaShutdownHook();
}
Runtime.getRuntime().addShutdownHook(shutdownHook);
}

// 用 await 方法监听停止请求
if (await) {
await();
stop();
}
}

为什么需要关闭钩子?

如果我们需要在 JVM 关闭时做一些清理工作,比如将缓存数据刷到磁盘上,或者清理一些临时文件,可以向 JVM 注册一个“关闭钩子”。“关闭钩子”其实就是一个线程,JVM 在停止之前会尝试执行这个线程的 run 方法。

Tomcat 的“关闭钩子”—— CatalinaShutdownHook 做了些什么呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
protected class CatalinaShutdownHook extends Thread {

@Override
public void run() {
try {
if (getServer() != null) {
Catalina.this.stop();
}
} catch (Throwable ex) {
...
}
}
}

Tomcat 的“关闭钩子”实际上就执行了 Serverstop 方法,Serverstop 方法会释放和清理所有的资源。

4.1.2. Server 组件

Server 组件的具体实现类是 StandardServer,Server 继承了 LifeCycleBase,它的生命周期被统一管理,并且它的子组件是 Service,因此它还需要管理 Service 的生命周期,也就是说在启动时调用 Service 组件的启动方法,在停止时调用它们的停止方法。Server 在内部维护了若干 Service 组件,它是以数组来保存的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public void addService(Service service) {

service.setServer(this);

synchronized (servicesLock) {
// 创建一个长度 +1 的新数组
Service results[] = new Service[services.length + 1];

// 将老的数据复制过去
System.arraycopy(services, 0, results, 0, services.length);
results[services.length] = service;
services = results;

// 启动 Service 组件
if (getState().isAvailable()) {
try {
service.start();
} catch (LifecycleException e) {
// Ignore
}
}

// 触发监听事件
support.firePropertyChange("service", null, service);
}

}

Server 并没有一开始就分配一个很长的数组,而是在添加的过程中动态地扩展数组长度,当添加一个新的 Service 实例时,会创建一个新数组并把原来数组内容复制到新数组,这样做的目的其实是为了节省内存空间。

除此之外,Server 组件还有一个重要的任务是启动一个 Socket 来监听停止端口,这就是为什么你能通过 shutdown 命令来关闭 Tomcat。不知道你留意到没有,上面 Caralina 的启动方法的最后一行代码就是调用了 Server 的 await 方法。

在 await 方法里会创建一个 Socket 监听 8005 端口,并在一个死循环里接收 Socket 上的连接请求,如果有新的连接到来就建立连接,然后从 Socket 中读取数据;如果读到的数据是停止命令“SHUTDOWN”,就退出循环,进入 stop 流程。

4.1.3. Service 组件

Service 组件的具体实现类是 StandardService。

【源码】StandardService 源码定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class StandardService extends LifecycleBase implements Service {
// 名字
private String name = null;

//Server 实例
private Server server = null;

// 连接器数组
protected Connector connectors[] = new Connector[0];
private final Object connectorsLock = new Object();

// 对应的 Engine 容器
private Engine engine = null;

// 映射器及其监听器
protected final Mapper mapper = new Mapper();
protected final MapperListener mapperListener = new MapperListener(this);

// ...
}

StandardService 继承了 LifecycleBase 抽象类。

StandardService 维护了一个 MapperListener 用于支持 Tomcat 热部署。当 Web 应用的部署发生变化时,Mapper 中的映射信息也要跟着变化,MapperListener 就是一个监听器,它监听容器的变化,并把信息更新到 Mapper 中,这是典型的观察者模式。

作为“管理”角色的组件,最重要的是维护其他组件的生命周期。此外在启动各种组件时,要注意它们的依赖关系,也就是说,要注意启动的顺序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected void startInternal() throws LifecycleException {

//1. 触发启动监听器
setState(LifecycleState.STARTING);

//2. 先启动 Engine,Engine 会启动它子容器
if (engine != null) {
synchronized (engine) {
engine.start();
}
}

//3. 再启动 Mapper 监听器
mapperListener.start();

//4. 最后启动连接器,连接器会启动它子组件,比如 Endpoint
synchronized (connectorsLock) {
for (Connector connector: connectors) {
if (connector.getState() != LifecycleState.FAILED) {
connector.start();
}
}
}
}

从启动方法可以看到,Service 先启动了 Engine 组件,再启动 Mapper 监听器,最后才是启动连接器。这很好理解,因为内层组件启动好了才能对外提供服务,才能启动外层的连接器组件。而 Mapper 也依赖容器组件,容器组件启动好了才能监听它们的变化,因此 Mapper 和 MapperListener 在容器组件之后启动。组件停止的顺序跟启动顺序正好相反的,也是基于它们的依赖关系。

4.1.4. Engine 组件

Engine 本质是一个容器,因此它继承了 ContainerBase 基类,并且实现了 Engine 接口。

4.2. Web 应用的部署方式

注:catalina.home:安装目录;catalina.base:工作目录;默认值 user.dir

  • Server.xml 配置 Host 元素,指定 appBase 属性,默认$catalina.base/webapps/
  • Server.xml 配置 Context 元素,指定 docBase,元素,指定 web 应用的路径
  • 自定义配置:在$catalina.base/EngineName/HostName/XXX.xml 配置 Context 元素

HostConfig 监听了 StandardHost 容器的事件,在 start 方法中解析上述配置文件:

  • 扫描 appbase 路径下的所有文件夹和 war 包,解析各个应用的 META-INF/context.xml,并 创建 StandardContext,并将 Context 加入到 Host 的子容器中。
  • 解析$catalina.base/EngineName/HostName/下的所有 Context 配置,找到相应 web 应 用的位置,解析各个应用的 META-INF/context.xml,并创建 StandardContext,并将 Context 加入到 Host 的子容器中。

注:

  • HostConfig 并没有实际解析 Context.xml,而是在 ContextConfig 中进行的。
  • HostConfig 中会定期检查 watched 资源文件(context.xml 配置文件)

ContextConfig 解析 context.xml 顺序:

  • 先解析全局的配置 config/context.xml
  • 然后解析 Host 的默认配置 EngineName/HostName/context.xml.default
  • 最后解析应用的 META-INF/context.xml

ContextConfig 解析 web.xml 顺序:

  • 先解析全局的配置 config/web.xml
  • 然后解析 Host 的默认配置 EngineName/HostName/web.xml.default 接着解析应用的 MEB-INF/web.xml
  • 扫描应用 WEB-INF/lib/下的 jar 文件,解析其中的 META-INF/web-fragment.xml 最后合并 xml 封装成 WebXml,并设置 Context

注:

  • 扫描 web 应用和 jar 中的注解(Filter、Listener、Servlet)就是上述步骤中进行的。
  • 容器的定期执行:backgroundProcess,由 ContainerBase 来实现的,并且只有在顶层容器 中才会开启线程。(backgroundProcessorDelay=10 标志位来控制)

4.3. LifeCycle

img

4.3.1. 请求处理过程

  1. 根据 server.xml 配置的指定的 connector 以及端口监听 http、或者 ajp 请求
  2. 请求到来时建立连接,解析请求参数,创建 Request 和 Response 对象,调用顶层容器 pipeline 的 invoke 方法
  3. 容器之间层层调用,最终调用业务 servlet 的 service 方法
  4. Connector 将 response 流中的数据写到 socket 中

4.4. Connector 流程

4.4.1. 阻塞 IO

4.4.2. 非阻塞 IO

4.4.3. IO 多路复用

阻塞与非阻塞的区别在于进行读操作和写操作的系统调用时,如果此时内核态没有数据可读或者没有缓冲空间可写时,是否阻塞。

IO 多路复用的好处在于可同时监听多个 socket 的可读和可写事件,这样就能使得应用可以同时监听多个 socket,释放了应用线程资源。

4.4.4. Tomcat 各类 Connector 对比

  • JIO:用 java.io 编写的 TCP 模块,阻塞 IO
  • NIO:用 java.nio 编写的 TCP 模块,非阻塞 IO,(IO 多路复用)
  • APR:全称 Apache Portable Runtime,使用 JNI 的方式来进行读取文件以及进行网络传输

Apache Portable Runtime 是一个高度可移植的库,它是 Apache HTTP Server 2.x 的核心。 APR 具有许多用途,包括访问高级 IO 功能(如 sendfile,epoll 和 OpenSSL),操作系统级功能(随机数生成,系统状态等)和本地进程处理(共享内存,NT 管道和 Unix 套接字)。

表格中字段含义说明:

  • Support Polling - 是否支持基于 IO 多路复用的 socket 事件轮询
  • Polling Size - 轮询的最大连接数
  • Wait for next Request - 在等待下一个请求时,处理线程是否释放,BIO 是没有释放的,所以在 keep-alive=true 的情况下处理的并发连接数有限
  • Read Request Headers - 由于 request header 数据较少,可以由容器提前解析完毕,不需要阻塞
  • Read Request Body - 读取 request body 的数据是应用业务逻辑的事情,同时 Servlet 的限制,是需要阻塞读取的
  • Write Response - 跟读取 request body 的逻辑类似,同样需要阻塞写

NIO 处理相关类

Poller 线程从 EventQueue 获取 PollerEvent,并执行 PollerEvent 的 run 方法,调用 Selector 的 select 方法,如果有可读的 Socket 则创建 Http11NioProcessor,放入到线程池中执行;

CoyoteAdapter 是 Connector 到 Container 的适配器,Http11NioProcessor 调用其提供的 service 方法,内部创建 Request 和 Response 对象,并调用最顶层容器的 Pipeline 中的第一个 Valve 的 invoke 方法

Mapper 主要处理 http url 到 servlet 的映射规则的解析,对外提供 map 方法

4.5. Comet

Comet 是一种用于 web 的推送技术,能使服务器实时地将更新的信息传送到客户端,而无须客户端发出请求
在 WebSocket 出来之前,如果不适用 comet,只能通过浏览器端轮询 Server 来模拟实现服务器端推送。
Comet 支持 servlet 异步处理 IO,当连接上数据可读时触发事件,并异步写数据(阻塞)

Tomcat 要实现 Comet,只需继承 HttpServlet 同时,实现 CometProcessor 接口

  • Begin:新的请求连接接入调用,可进行与 Request 和 Response 相关的对象初始化操作,并保存 response 对象,用于后续写入数据
  • Read:请求连接有数据可读时调用
  • End:当数据可用时,如果读取到文件结束或者 response 被关闭时则被调用
  • Error:在连接上发生异常时调用,数据读取异常、连接断开、处理异常、socket 超时

Note:

  • Read:在 post 请求有数据,但在 begin 事件中没有处理,则会调用 read,如果 read 没有读取数据,在会触发 Error 回调,关闭 socket
  • End:当 socket 超时,并且 response 被关闭时也会调用;server 被关闭时调用
  • Error:除了 socket 超时不会关闭 socket,其他都会关闭 socket
  • End 和 Error 时间触发时应关闭当前 comet 会话,即调用 CometEvent 的 close 方法
    Note:在事件触发时要做好线程安全的操作

4.6. 异步 Servlet

传统流程:

  • 首先,Servlet 接收到请求之后,request 数据解析;
  • 接着,调用业务接口的某些方法,以完成业务处理;
  • 最后,根据处理的结果提交响应,Servlet 线程结束

异步处理流程:

  • 客户端发送一个请求
  • Servlet 容器分配一个线程来处理容器中的一个 servlet
  • servlet 调用 request.startAsync(),保存 AsyncContext, 然后返回
  • 任何方式存在的容器线程都将退出,但是 response 仍然保持开放
  • 业务线程使用保存的 AsyncContext 来完成响应(线程池)
  • 客户端收到响应

Servlet 线程将请求转交给一个异步线程来执行业务处理,线程本身返回至容器,此时 Servlet 还没有生成响应数据,异步线程处理完业务以后,可以直接生成响应数据(异步线程拥有 ServletRequest 和 ServletResponse 对象的引用)

为什么 web 应用中支持异步?

推出异步,主要是针对那些比较耗时的请求:比如一次缓慢的数据库查询,一次外部 REST API 调用, 或者是其他一些 I/O 密集型操作。这种耗时的请求会很快的耗光 Servlet 容器的线程池,继而影响可扩展性。

Note:从客户端的角度来看,request 仍然像任何其他的 HTTP 的 request-response 交互一样,只是耗费了更长的时间而已

异步事件监听

  • onStartAsync:Request 调用 startAsync 方法时触发
  • onComplete:syncContext 调用 complete 方法时触发
  • onError:处理请求的过程出现异常时触发
  • onTimeout:socket 超时触发

Note :
onError/ onTimeout 触发后,会紧接着回调 onComplete
onComplete 执行后,就不可再操作 request 和 response

5. 参考资料

Tomcat 连接器

1. NioEndpoint 组件

Tomcat 的 NioEndPoint 组件利用 Java NIO 实现了 I/O 多路复用模型。

img

NioEndPoint 子组件功能简介:

  • LimitLatch 是连接控制器,负责控制最大连接数。NIO 模式下默认是 10000,达到这个阈值后,连接请求被拒绝。
  • Acceptor 负责监听连接请求。Acceptor 运行在一个单独的线程里,它在一个死循环里调用 accept 方法来接收新连接,一旦有新的连接请求到来,accept 方法返回一个 Channel 对象,接着把 Channel 对象交给 Poller 去处理。
  • Poller 的本质是一个 Selector,也运行在单独线程里。Poller 内部维护一个 Channel 数组,它在一个死循环里不断检测 Channel 的数据就绪状态,一旦有 Channel 可读,就生成一个 SocketProcessor 任务对象扔给 Executor 去处理。
  • Executor 就是线程池,负责运行 SocketProcessor 任务类,SocketProcessor 的 run 方法会调用 Http11Processor 来读取和解析请求数据。我们知道,Http11Processor 是应用层协议的封装,它会调用容器获得响应,再把响应通过 Channel 写出。

NioEndpoint 如何实现高并发的呢?

要实现高并发需要合理设计线程模型充分利用 CPU 资源,尽量不要让线程阻塞;另外,就是有多少任务,就用相应规模的线程数去处理。

NioEndpoint 要完成三件事情:接收连接、检测 I/O 事件以及处理请求,那么最核心的就是把这三件事情分开,用不同规模的线程去处理,比如用专门的线程组去跑 Acceptor,并且 Acceptor 的个数可以配置;用专门的线程组去跑 Poller,Poller 的个数也可以配置;最后具体任务的执行也由专门的线程池来处理,也可以配置线程池的大小。

1.1. LimitLatch

LimitLatch 用来控制连接个数,当连接数到达最大时阻塞线程,直到后续组件处理完一个连接后将连接数减 1。请你注意到达最大连接数后操作系统底层还是会接收客户端连接,但用户层已经不再接收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class LimitLatch {
private class Sync extends AbstractQueuedSynchronizer {

@Override
protected int tryAcquireShared() {
long newCount = count.incrementAndGet();
if (newCount > limit) {
count.decrementAndGet();
return -1;
} else {
return 1;
}
}

@Override
protected boolean tryReleaseShared(int arg) {
count.decrementAndGet();
return true;
}
}

private final Sync sync;
private final AtomicLong count;
private volatile long limit;

// 线程调用这个方法来获得接收新连接的许可,线程可能被阻塞
public void countUpOrAwait() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

// 调用这个方法来释放一个连接许可,那么前面阻塞的线程可能被唤醒
public long countDown() {
sync.releaseShared(0);
long result = getCount();
return result;
}
}

LimitLatch 内步定义了内部类 Sync,而 Sync 扩展了 AQS,AQS 是 Java 并发包中的一个核心类,它在内部维护一个状态和一个线程队列,可以用来控制线程什么时候挂起,什么时候唤醒。我们可以扩展它来实现自己的同步器,实际上 Java 并发包里的锁和条件变量等等都是通过 AQS 来实现的,而这里的 LimitLatch 也不例外。

理解源码要点:

  • 用户线程通过调用 LimitLatch 的 countUpOrAwait 方法来拿到锁,如果暂时无法获取,这个线程会被阻塞到 AQS 的队列中。那 AQS 怎么知道是阻塞还是不阻塞用户线程呢?其实这是由 AQS 的使用者来决定的,也就是内部类 Sync 来决定的,因为 Sync 类重写了 AQS 的tryAcquireShared() 方法。它的实现逻辑是如果当前连接数 count 小于 limit,线程能获取锁,返回 1,否则返回 -1。
  • 如何用户线程被阻塞到了 AQS 的队列,那什么时候唤醒呢?同样是由 Sync 内部类决定,Sync 重写了 AQS 的releaseShared() 方法,其实就是当一个连接请求处理完了,这时又可以接收一个新连接了,这样前面阻塞的线程将会被唤醒。

1.2. Acceptor

Acceptor 实现了 Runnable 接口,因此可以跑在单独线程里。一个端口号只能对应一个 ServerSocketChannel,因此这个 ServerSocketChannel 是在多个 Acceptor 线程之间共享的,它是 Endpoint 的属性,由 Endpoint 完成初始化和端口绑定。

1
2
3
serverSock = ServerSocketChannel.open();
serverSock.socket().bind(addr,getAcceptCount());
serverSock.configureBlocking(true);
  • bind 方法的第二个参数表示操作系统的等待队列长度,我在上面提到,当应用层面的连接数到达最大值时,操作系统可以继续接收连接,那么操作系统能继续接收的最大连接数就是这个队列长度,可以通过 acceptCount 参数配置,默认是 100。
  • ServerSocketChannel 被设置成阻塞模式,也就是说它是以阻塞的方式接收连接的。ServerSocketChannel 通过 accept() 接受新的连接,accept() 方法返回获得 SocketChannel 对象,然后将 SocketChannel 对象封装在一个 PollerEvent 对象中,并将 PollerEvent 对象压入 Poller 的 Queue 里,这是个典型的生产者 - 消费者模式,Acceptor 与 Poller 线程之间通过 Queue 通信。

1.3. Poller

Poller 本质是一个 Selector,它内部维护一个 Queue

1
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

SynchronizedQueue 的核心方法都使用了 Synchronized 关键字进行修饰,用来保证同一时刻只有一个线程进行读写。

使用 SynchronizedQueue,意味着同一时刻只有一个 Acceptor 线程对队列进行读写;同时有多个 Poller 线程在运行,每个 Poller 线程都有自己的队列。每个 Poller 线程可能同时被多个 Acceptor 线程调用来注册 PollerEvent。同样 Poller 的个数可以通过 pollers 参数配置。

Poller 不断的通过内部的 Selector 对象向内核查询 Channel 的状态,一旦可读就生成任务类 SocketProcessor 交给 Executor 去处理。Poller 的另一个重要任务是循环遍历检查自己所管理的 SocketChannel 是否已经超时,如果有超时就关闭这个 SocketChannel

1.4. SocketProcessor

我们知道,Poller 会创建 SocketProcessor 任务类交给线程池处理,而 SocketProcessor 实现了 Runnable 接口,用来定义 Executor 中线程所执行的任务,主要就是调用 Http11Processor 组件来处理请求。Http11Processor 读取 Channel 的数据来生成 ServletRequest 对象,这里请你注意:

Http11Processor 并不是直接读取 Channel 的。这是因为 Tomcat 支持同步非阻塞 I/O 模型和异步 I/O 模型,在 Java API 中,相应的 Channel 类也是不一样的,比如有 AsynchronousSocketChannelSocketChannel,为了对 Http11Processor 屏蔽这些差异,Tomcat 设计了一个包装类叫作 SocketWrapperHttp11Processor 只调用 SocketWrapper 的方法去读写数据。

2. Nio2Endpoint 组件

Nio2Endpoint 工作流程跟 NioEndpoint 较为相似。

img

Nio2Endpoint 子组件功能说明:

  • LimitLatch 是连接控制器,它负责控制最大连接数。
  • Nio2Acceptor 扩展了 Acceptor,用异步 I/O 的方式来接收连接,跑在一个单独的线程里,也是一个线程组。Nio2Acceptor 接收新的连接后,得到一个 AsynchronousSocketChannelNio2AcceptorAsynchronousSocketChannel 封装成一个 Nio2SocketWrapper,并创建一个 SocketProcessor 任务类交给线程池处理,并且 SocketProcessor 持有 Nio2SocketWrapper 对象。
  • Executor 在执行 SocketProcessor 时,SocketProcessor 的 run 方法会调用 Http11Processor 来处理请求,Http11Processor 会通过 Nio2SocketWrapper 读取和解析请求数据,请求经过容器处理后,再把响应通过 Nio2SocketWrapper 写出。

Nio2Endpoint 跟 NioEndpoint 的一个明显不同点是,Nio2Endpoint 中没有 Poller 组件,也就是没有 Selector。这是为什么呢?因为在异步 I/O 模式下,Selector 的工作交给内核来做了。

2.1. Nio2Acceptor

NioEndpint 一样,Nio2Endpoint 的基本思路是用 LimitLatch 组件来控制连接数。

但是 Nio2Acceptor 的监听连接的过程不是在一个死循环里不断的调 accept 方法,而是通过回调函数来完成的。我们来看看它的连接监听方法:

1
serverSock.accept(null, this);

其实就是调用了 accept 方法,注意它的第二个参数是 this,表明 Nio2Acceptor 自己就是处理连接的回调类,因此 Nio2Acceptor 实现了 CompletionHandler 接口。那么它是如何实现 CompletionHandler 接口的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel>
implements CompletionHandler<AsynchronousSocketChannel, Void> {

@Override
public void completed(AsynchronousSocketChannel socket,
Void attachment) {

if (isRunning() && !isPaused()) {
if (getMaxConnections() == -1) {
// 如果没有连接限制,继续接收新的连接
serverSock.accept(null, this);
} else {
// 如果有连接限制,就在线程池里跑 Run 方法,Run 方法会检查连接数
getExecutor().execute(this);
}
// 处理请求
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
}
}
}

可以看到 CompletionHandler 的两个模板参数分别是 AsynchronousServerSocketChannel 和 Void,我在前面说过第一个参数就是 accept 方法的返回值,第二个参数是附件类,由用户自己决定,这里为 Void。completed 方法的处理逻辑比较简单:

  • 如果没有连接限制,继续在本线程中调用 accept 方法接收新的连接。
  • 如果有连接限制,就在线程池里跑 run 方法去接收新的连接。那为什么要跑 run 方法呢,因为在 run 方法里会检查连接数,当连接达到最大数时,线程可能会被 LimitLatch 阻塞。为什么要放在线程池里跑呢?这是因为如果放在当前线程里执行,completed 方法可能被阻塞,会导致这个回调方法一直不返回。

接着 completed 方法会调用 setSocketOptions 方法,在这个方法里,会创建 Nio2SocketWrapperSocketProcessor,并交给线程池处理。

2.2. Nio2SocketWrapper

Nio2SocketWrapper 的主要作用是封装 Channel,并提供接口给 Http11Processor 读写数据。讲到这里你是不是有个疑问:Http11Processor 是不能阻塞等待数据的,按照异步 I/O 的套路,Http11Processor 在调用 Nio2SocketWrapper 的 read 方法时需要注册回调类,read 调用会立即返回,问题是立即返回后 Http11Processor 还没有读到数据, 怎么办呢?这个请求的处理不就失败了吗?

为了解决这个问题,Http11Processor 是通过 2 次 read 调用来完成数据读取操作的。

  • 第一次 read 调用:连接刚刚建立好后,Acceptor 创建 SocketProcessor 任务类交给线程池去处理,Http11Processor 在处理请求的过程中,会调用 Nio2SocketWrapper 的 read 方法发出第一次读请求,同时注册了回调类 readCompletionHandler,因为数据没读到,Http11Processor 把当前的 Nio2SocketWrapper 标记为数据不完整。接着 SocketProcessor 线程被回收,Http11Processor 并没有阻塞等待数据。这里请注意,Http11Processor 维护了一个 Nio2SocketWrapper 列表,也就是维护了连接的状态。
  • 第二次 read 调用:当数据到达后,内核已经把数据拷贝到 Http11Processor 指定的 Buffer 里,同时回调类 readCompletionHandler 被调用,在这个回调处理方法里会重新创建一个新的 SocketProcessor 任务来继续处理这个连接,而这个新的 SocketProcessor 任务类持有原来那个 Nio2SocketWrapper,这一次 Http11Processor 可以通过 Nio2SocketWrapper 读取数据了,因为数据已经到了应用层的 Buffer。

这个回调类 readCompletionHandler 的源码如下,最关键的一点是,**Nio2SocketWrapper 是作为附件类来传递的**,这样在回调函数里能拿到所有的上下文。

1
2
3
4
5
6
7
8
9
10
11
this.readCompletionHandler = new CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>() {
public void completed(Integer nBytes, SocketWrapperBase<Nio2Channel> attachment) {
...
// 通过附件类 SocketWrapper 拿到所有的上下文
Nio2SocketWrapper.this.getEndpoint().processSocket(attachment, SocketEvent.OPEN_READ, false);
}

public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> attachment) {
...
}
}

3. AprEndpoint 组件

我们在使用 Tomcat 时,可能会在启动日志里看到这样的提示信息:

The APR based Apache Tomcat Native library which allows optimal performance in production environments was not found on the java.library.path: ***

这句话的意思就是推荐你去安装 APR 库,可以提高系统性能。

APR(Apache Portable Runtime Libraries)是 Apache 可移植运行时库,它是用 C 语言实现的,其目的是向上层应用程序提供一个跨平台的操作系统接口库。Tomcat 可以用它来处理包括文件和网络 I/O,从而提升性能。Tomcat 支持的连接器有 NIO、NIO.2 和 APR。跟 NioEndpoint 一样,AprEndpoint 也实现了非阻塞 I/O,它们的区别是:NioEndpoint 通过调用 Java 的 NIO API 来实现非阻塞 I/O,而 AprEndpoint 是通过 JNI 调用 APR 本地库而实现非阻塞 I/O 的。

同样是非阻塞 I/O,为什么 Tomcat 会提示使用 APR 本地库的性能会更好呢?这是因为在某些场景下,比如需要频繁与操作系统进行交互,Socket 网络通信就是这样一个场景,特别是如果你的 Web 应用使用了 TLS 来加密传输,我们知道 TLS 协议在握手过程中有多次网络交互,在这种情况下 Java 跟 C 语言程序相比还是有一定的差距,而这正是 APR 的强项。

Tomcat 本身是 Java 编写的,为了调用 C 语言编写的 APR,需要通过 JNI 方式来调用。JNI(Java Native Interface) 是 JDK 提供的一个编程接口,它允许 Java 程序调用其他语言编写的程序或者代码库,其实 JDK 本身的实现也大量用到 JNI 技术来调用本地 C 程序库。

3.1. AprEndpoint 工作流程

img

3.1.1. Acceptor

Accpetor 的功能就是监听连接,接收并建立连接。它的本质就是调用了四个操作系统 API:socket、bind、listen 和 accept。那 Java 语言如何直接调用 C 语言 API 呢?答案就是通过 JNI。具体来说就是两步:先封装一个 Java 类,在里面定义一堆用native 关键字修饰的方法,像下面这样。

1
2
3
4
5
6
7
8
9
10
11
12
public class Socket {
...
// 用 native 修饰这个方法,表明这个函数是 C 语言实现
public static native long create(int family, int type,
int protocol, long cont)

public static native int bind(long sock, long sa);

public static native int listen(long sock, int backlog);

public static native long accept(long sock)
}

接着用 C 代码实现这些方法,比如 bind 函数就是这样实现的:

1
2
3
4
5
6
7
8
9
10
11
12
// 注意函数的名字要符合 JNI 规范的要求
JNIEXPORT jint JNICALL
Java_org_apache_tomcat_jni_Socket_bind(JNIEnv *e, jlong sock,jlong sa)
{
jint rv = APR_SUCCESS;
tcn_socket_t *s = (tcn_socket_t *)sock;
apr_sockaddr_t *a = (apr_sockaddr_t *) sa;

// 调用 APR 库自己实现的 bind 函数
rv = (jint)apr_socket_bind(s->sock, a);
return rv;
}

专栏里我就不展开 JNI 的细节了,你可以扩展阅读获得更多信息和例子。我们要注意的是函数名字要符合 JNI 的规范,以及 Java 和 C 语言如何互相传递参数,比如在 C 语言有指针,Java 没有指针的概念,所以在 Java 中用 long 类型来表示指针。AprEndpoint 的 Acceptor 组件就是调用了 APR 实现的四个 API。

3.1.2. Poller

Acceptor 接收到一个新的 Socket 连接后,按照 NioEndpoint 的实现,它会把这个 Socket 交给 Poller 去查询 I/O 事件。AprEndpoint 也是这样做的,不过 AprEndpoint 的 Poller 并不是调用 Java NIO 里的 Selector 来查询 Socket 的状态,而是通过 JNI 调用 APR 中的 poll 方法,而 APR 又是调用了操作系统的 epoll API 来实现的。

这里有个特别的地方是在 AprEndpoint 中,我们可以配置一个叫deferAccept的参数,它对应的是 TCP 协议中的TCP_DEFER_ACCEPT,设置这个参数后,当 TCP 客户端有新的连接请求到达时,TCP 服务端先不建立连接,而是再等等,直到客户端有请求数据发过来时再建立连接。这样的好处是服务端不需要用 Selector 去反复查询请求数据是否就绪。

这是一种 TCP 协议层的优化,不是每个操作系统内核都支持,因为 Java 作为一种跨平台语言,需要屏蔽各种操作系统的差异,因此并没有把这个参数提供给用户;但是对于 APR 来说,它的目的就是尽可能提升性能,因此它向用户暴露了这个参数。

3.2. APR 提升性能的秘密

APR 连接器之所以能提高 Tomcat 的性能,除了 APR 本身是 C 程序库之外,还有哪些提速的秘密呢?

JVM 堆 VS 本地内存

我们知道 Java 的类实例一般在 JVM 堆上分配,而 Java 是通过 JNI 调用 C 代码来实现 Socket 通信的,那么 C 代码在运行过程中需要的内存又是从哪里分配的呢?C 代码能否直接操作 Java 堆?

为了回答这些问题,我先来说说 JVM 和用户进程的关系。如果你想运行一个 Java 类文件,可以用下面的 Java 命令来执行。

1
java my.class

这个命令行中的java其实是一个可执行程序,这个程序会创建 JVM 来加载和运行你的 Java 类。操作系统会创建一个进程来执行这个java可执行程序,而每个进程都有自己的虚拟地址空间,JVM 用到的内存(包括堆、栈和方法区)就是从进程的虚拟地址空间上分配的。请你注意的是,JVM 内存只是进程空间的一部分,除此之外进程空间内还有代码段、数据段、内存映射区、内核空间等。从 JVM 的角度看,JVM 内存之外的部分叫作本地内存,C 程序代码在运行过程中用到的内存就是本地内存中分配的。下面我们通过一张图来理解一下。

img

Tomcat 的 Endpoint 组件在接收网络数据时需要预先分配好一块 Buffer,所谓的 Buffer 就是字节数组byte[],Java 通过 JNI 调用把这块 Buffer 的地址传给 C 代码,C 代码通过操作系统 API 读取 Socket 并把数据填充到这块 Buffer。Java NIO API 提供了两种 Buffer 来接收数据:HeapByteBuffer 和 DirectByteBuffer,下面的代码演示了如何创建两种 Buffer。

1
2
3
4
5
// 分配 HeapByteBuffer
ByteBuffer buf = ByteBuffer.allocate(1024);

// 分配 DirectByteBuffer
ByteBuffer buf = ByteBuffer.allocateDirect(1024);

创建好 Buffer 后直接传给 Channel 的 read 或者 write 函数,最终这块 Buffer 会通过 JNI 调用传递给 C 程序。

1
2
// 将 buf 作为 read 函数的参数
int bytesRead = socketChannel.read(buf);

那 HeapByteBuffer 和 DirectByteBuffer 有什么区别呢?HeapByteBuffer 对象本身在 JVM 堆上分配,并且它持有的字节数组byte[]也是在 JVM 堆上分配。但是如果用HeapByteBuffer来接收网络数据,需要把数据从内核先拷贝到一个临时的本地内存,再从临时本地内存拷贝到 JVM 堆,而不是直接从内核拷贝到 JVM 堆上。这是为什么呢?这是因为数据从内核拷贝到 JVM 堆的过程中,JVM 可能会发生 GC,GC 过程中对象可能会被移动,也就是说 JVM 堆上的字节数组可能会被移动,这样的话 Buffer 地址就失效了。如果这中间经过本地内存中转,从本地内存到 JVM 堆的拷贝过程中 JVM 可以保证不做 GC。

如果使用 HeapByteBuffer,你会发现 JVM 堆和内核之间多了一层中转,而 DirectByteBuffer 用来解决这个问题,DirectByteBuffer 对象本身在 JVM 堆上,但是它持有的字节数组不是从 JVM 堆上分配的,而是从本地内存分配的。DirectByteBuffer 对象中有个 long 类型字段 address,记录着本地内存的地址,这样在接收数据的时候,直接把这个本地内存地址传递给 C 程序,C 程序会将网络数据从内核拷贝到这个本地内存,JVM 可以直接读取这个本地内存,这种方式比 HeapByteBuffer 少了一次拷贝,因此一般来说它的速度会比 HeapByteBuffer 快好几倍。你可以通过上面的图加深理解。

Tomcat 中的 AprEndpoint 就是通过 DirectByteBuffer 来接收数据的,而 NioEndpoint 和 Nio2Endpoint 是通过 HeapByteBuffer 来接收数据的。你可能会问,NioEndpoint 和 Nio2Endpoint 为什么不用 DirectByteBuffer 呢?这是因为本地内存不好管理,发生内存泄漏难以定位,从稳定性考虑,NioEndpoint 和 Nio2Endpoint 没有去冒这个险。

3.2.1. sendfile

我们再来考虑另一个网络通信的场景,也就是静态文件的处理。浏览器通过 Tomcat 来获取一个 HTML 文件,而 Tomcat 的处理逻辑无非是两步:

  1. 从磁盘读取 HTML 到内存。
  2. 将这段内存的内容通过 Socket 发送出去。

但是在传统方式下,有很多次的内存拷贝:

  • 读取文件时,首先是内核把文件内容读取到内核缓冲区。
  • 如果使用 HeapByteBuffer,文件数据从内核到 JVM 堆内存需要经过本地内存中转。
  • 同样在将文件内容推入网络时,从 JVM 堆到内核缓冲区需要经过本地内存中转。
  • 最后还需要把文件从内核缓冲区拷贝到网卡缓冲区。

从下面的图你会发现这个过程有 6 次内存拷贝,并且 read 和 write 等系统调用将导致进程从用户态到内核态的切换,会耗费大量的 CPU 和内存资源。

img

而 Tomcat 的 AprEndpoint 通过操作系统层面的 sendfile 特性解决了这个问题,sendfile 系统调用方式非常简洁。

1
sendfile(socket, file, len);

它带有两个关键参数:Socket 和文件句柄。将文件从磁盘写入 Socket 的过程只有两步:

第一步:将文件内容读取到内核缓冲区。

第二步:数据并没有从内核缓冲区复制到 Socket 关联的缓冲区,只有记录数据位置和长度的描述符被添加到 Socket 缓冲区中;接着把数据直接从内核缓冲区传递给网卡。这个过程你可以看下面的图。

img

4. Executor 组件

为了提高处理能力和并发度,Web 容器一般会把处理请求的工作放到线程池里来执行,Tomcat 扩展了原生的 Java 线程池,来满足 Web 容器高并发的需求。

4.1. Tomcat 定制线程池

Tomcat 的线程池也是一个定制版的 ThreadPoolExecutor。Tomcat 传入的参数是这样的:

1
2
3
4
5
6
7
8
// 定制版的任务队列
taskqueue = new TaskQueue(maxQueueSize);

// 定制版的线程工厂
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());

// 定制版的线程池
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);

其中的两个关键点:

  • Tomcat 有自己的定制版任务队列和线程工厂,并且可以限制任务队列的长度,它的最大长度是 maxQueueSize。
  • Tomcat 对线程数也有限制,设置了核心线程数(minSpareThreads)和最大线程池数(maxThreads)。

除了资源限制以外,Tomcat 线程池还定制自己的任务处理流程。我们知道 Java 原生线程池的任务处理逻辑比较简单:

  1. 前 corePoolSize 个任务时,来一个任务就创建一个新线程。
  2. 后面再来任务,就把任务添加到任务队列里让所有的线程去抢,如果队列满了就创建临时线程。
  3. 如果总线程数达到 maximumPoolSize,执行拒绝策略。

Tomcat 线程池扩展了原生的 ThreadPoolExecutor,通过重写 execute 方法实现了自己的任务处理逻辑:

  1. 前 corePoolSize 个任务时,来一个任务就创建一个新线程。
  2. 再来任务的话,就把任务添加到任务队列里让所有的线程去抢,如果队列满了就创建临时线程。
  3. 如果总线程数达到 maximumPoolSize,则继续尝试把任务添加到任务队列中去。
  4. 如果缓冲队列也满了,插入失败,执行拒绝策略。

观察 Tomcat 线程池和 Java 原生线程池的区别,其实就是在第 3 步,Tomcat 在线程总数达到最大数时,不是立即执行拒绝策略,而是再尝试向任务队列添加任务,添加失败后再执行拒绝策略。那具体如何实现呢,其实很简单,我们来看一下 Tomcat 线程池的 execute 方法的核心代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {

...

public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
// 调用 Java 原生线程池的 execute 去执行任务
super.execute(command);
} catch (RejectedExecutionException rx) {
// 如果总线程数达到 maximumPoolSize,Java 原生线程池执行拒绝策略
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
// 继续尝试把任务放到任务队列中去
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
// 如果缓冲队列也满了,插入失败,执行拒绝策略。
throw new RejectedExecutionException("...");
}
}
}
}
}

从这个方法你可以看到,Tomcat 线程池的 execute 方法会调用 Java 原生线程池的 execute 去执行任务,如果总线程数达到 maximumPoolSize,Java 原生线程池的 execute 方法会抛出 RejectedExecutionException 异常,但是这个异常会被 Tomcat 线程池的 execute 方法捕获到,并继续尝试把这个任务放到任务队列中去;如果任务队列也满了,再执行拒绝策略。

4.2. Tomcat 定制任务队列

细心的你有没有发现,在 Tomcat 线程池的 execute 方法最开始有这么一行:

1
submittedCount.incrementAndGet();

这行代码的意思把 submittedCount 这个原子变量加一,并且在任务执行失败,抛出拒绝异常时,将这个原子变量减一:

1
submittedCount.decrementAndGet();

其实 Tomcat 线程池是用这个变量 submittedCount 来维护已经提交到了线程池,但是还没有执行完的任务个数。Tomcat 为什么要维护这个变量呢?这跟 Tomcat 的定制版的任务队列有关。Tomcat 的任务队列 TaskQueue 扩展了 Java 中的 LinkedBlockingQueue,我们知道 LinkedBlockingQueue 默认情况下长度是没有限制的,除非给它一个 capacity。因此 Tomcat 给了它一个 capacity,TaskQueue 的构造函数中有个整型的参数 capacity,TaskQueue 将 capacity 传给父类 LinkedBlockingQueue 的构造函数。

1
2
3
4
5
6
7
public class TaskQueue extends LinkedBlockingQueue<Runnable> {

public TaskQueue(int capacity) {
super(capacity);
}
...
}

这个 capacity 参数是通过 Tomcat 的 maxQueueSize 参数来设置的,但问题是默认情况下 maxQueueSize 的值是Integer.MAX_VALUE,等于没有限制,这样就带来一个问题:当前线程数达到核心线程数之后,再来任务的话线程池会把任务添加到任务队列,并且总是会成功,这样永远不会有机会创建新线程了。

为了解决这个问题,TaskQueue 重写了 LinkedBlockingQueue 的 offer 方法,在合适的时机返回 false,返回 false 表示任务添加失败,这时线程池会创建新的线程。那什么是合适的时机呢?请看下面 offer 方法的核心源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TaskQueue extends LinkedBlockingQueue<Runnable> {

...
@Override
// 线程池调用任务队列的方法时,当前线程数肯定已经大于核心线程数了
public boolean offer(Runnable o) {

// 如果线程数已经到了最大值,不能创建新线程了,只能把任务添加到任务队列。
if (parent.getPoolSize() == parent.getMaximumPoolSize())
return super.offer(o);

// 执行到这里,表明当前线程数大于核心线程数,并且小于最大线程数。
// 表明是可以创建新线程的,那到底要不要创建呢?分两种情况:

//1. 如果已提交的任务数小于当前线程数,表示还有空闲线程,无需创建新线程
if (parent.getSubmittedCount()<=(parent.getPoolSize()))
return super.offer(o);

//2. 如果已提交的任务数大于当前线程数,线程不够用了,返回 false 去创建新线程
if (parent.getPoolSize()<parent.getMaximumPoolSize())
return false;

// 默认情况下总是把任务添加到任务队列
return super.offer(o);
}

}

从上面的代码我们看到,只有当前线程数大于核心线程数、小于最大线程数,并且已提交的任务个数大于当前线程数时,也就是说线程不够用了,但是线程数又没达到极限,才会去创建新的线程。这就是为什么 Tomcat 需要维护已提交任务数这个变量,它的目的就是在任务队列的长度无限制的情况下,让线程池有机会创建新的线程

当然默认情况下 Tomcat 的任务队列是没有限制的,你可以通过设置 maxQueueSize 参数来限制任务队列的长度。

5. WebSocket 组件

HTTP 协议是“请求 - 响应”模式,浏览器必须先发请求给服务器,服务器才会响应这个请求。也就是说,服务器不会主动发送数据给浏览器。

对于实时性要求比较的高的应用,比如在线游戏、股票基金实时报价和在线协同编辑等,浏览器需要实时显示服务器上最新的数据,因此出现了 Ajax 和 Comet 技术。Ajax 本质上还是轮询,而 Comet 是在 HTTP 长连接的基础上做了一些 hack,但是它们的实时性不高,另外频繁的请求会给服务器带来压力,也会浪费网络流量和带宽。于是 HTML5 推出了 WebSocket 标准,使得浏览器和服务器之间任何一方都可以主动发消息给对方,这样服务器有新数据时可以主动推送给浏览器。

Tomcat 如何支持 WebSocket?简单来说,Tomcat 做了两件事:

  • Endpoint 加载
  • WebSocket 请求处理

5.1. WebSocket 加载

Tomcat 的 WebSocket 加载是通过 SCI 机制完成的。SCI 全称 ServletContainerInitializer,是 Servlet 3.0 规范中定义的用来接收 Web 应用启动事件的接口。那为什么要监听 Servlet 容器的启动事件呢?因为这样我们有机会在 Web 应用启动时做一些初始化工作,比如 WebSocket 需要扫描和加载 Endpoint 类。SCI 的使用也比较简单,将实现 ServletContainerInitializer 接口的类增加 HandlesTypes 注解,并且在注解内指定的一系列类和接口集合。比如 Tomcat 为了扫描和加载 Endpoint 而定义的 SCI 类如下:

1
2
3
4
5
6
7
@HandlesTypes({ServerEndpoint.class, ServerApplicationConfig.class, Endpoint.class})
public class WsSci implements ServletContainerInitializer {

public void onStartup(Set<Class<?>> clazzes, ServletContext ctx) throws ServletException {
...
}
}

一旦定义好了 SCI,Tomcat 在启动阶段扫描类时,会将 HandlesTypes 注解中指定的类都扫描出来,作为 SCI 的 onStartup 方法的参数,并调用 SCI 的 onStartup 方法。注意到 WsSci 的 HandlesTypes 注解中定义了ServerEndpoint.classServerApplicationConfig.classEndpoint.class,因此在 Tomcat 的启动阶段会将这些类的类实例(注意不是对象实例)传递给 WsSci 的 onStartup 方法。那么 WsSci 的 onStartup 方法又做了什么事呢?

它会构造一个 WebSocketContainer 实例,你可以把 WebSocketContainer 理解成一个专门处理 WebSocket 请求的Endpoint 容器。也就是说 Tomcat 会把扫描到的 Endpoint 子类和添加了注解@ServerEndpoint的类注册到这个容器中,并且这个容器还维护了 URL 到 Endpoint 的映射关系,这样通过请求 URL 就能找到具体的 Endpoint 来处理 WebSocket 请求。

5.2. WebSocket 请求处理

Tomcat 用 ProtocolHandler 组件屏蔽应用层协议的差异,其中 ProtocolHandler 中有两个关键组件:Endpoint 和 Processor。需要注意,这里的 Endpoint 跟上文提到的 WebSocket 中的 Endpoint 完全是两回事,连接器中的 Endpoint 组件用来处理 I/O 通信。WebSocket 本质就是一个应用层协议,因此不能用 HttpProcessor 来处理 WebSocket 请求,而要用专门 Processor 来处理,而在 Tomcat 中这样的 Processor 叫作 UpgradeProcessor。

为什么叫 Upgrade Processor 呢?这是因为 Tomcat 是将 HTTP 协议升级成 WebSocket 协议的。

WebSocket 是通过 HTTP 协议来进行握手的,因此当 WebSocket 的握手请求到来时,HttpProtocolHandler 首先接收到这个请求,在处理这个 HTTP 请求时,Tomcat 通过一个特殊的 Filter 判断该当前 HTTP 请求是否是一个 WebSocket Upgrade 请求(即包含Upgrade: websocket的 HTTP 头信息),如果是,则在 HTTP 响应里添加 WebSocket 相关的响应头信息,并进行协议升级。具体来说就是用 UpgradeProtocolHandler 替换当前的 HttpProtocolHandler,相应的,把当前 Socket 的 Processor 替换成 UpgradeProcessor,同时 Tomcat 会创建 WebSocket Session 实例和 Endpoint 实例,并跟当前的 WebSocket 连接一一对应起来。这个 WebSocket 连接不会立即关闭,并且在请求处理中,不再使用原有的 HttpProcessor,而是用专门的 UpgradeProcessor,UpgradeProcessor 最终会调用相应的 Endpoint 实例来处理请求。

img

你可以看到,Tomcat 对 WebSocket 请求的处理没有经过 Servlet 容器,而是通过 UpgradeProcessor 组件直接把请求发到 ServerEndpoint 实例,并且 Tomcat 的 WebSocket 实现不需要关注具体 I/O 模型的细节,从而实现了与具体 I/O 方式的解耦。

6. 参考资料

Tomcat 容器

Tomcat 实现热部署和热加载

  • 热加载的实现方式是 Web 容器启动一个后台线程,定期检测类文件的变化,如果有变化,就重新加载类,在这个过程中不会清空 Session ,一般用在开发环境。
  • 热部署原理类似,也是由后台线程定时检测 Web 应用的变化,但它会重新加载整个 Web 应用。这种方式会清空 Session,比热加载更加干净、彻底,一般用在生产环境。

Tomcat 通过开启后台线程,使得各个层次的容器组件都有机会完成一些周期性任务。Tomcat 是基于 ScheduledThreadPoolExecutor 实现周期性任务的:

1
2
3
4
5
bgFuture = exec.scheduleWithFixedDelay(
new ContainerBackgroundProcessor(),// 要执行的 Runnable
backgroundProcessorDelay, // 第一次执行延迟多久
backgroundProcessorDelay, // 之后每次执行间隔多久
TimeUnit.SECONDS); // 时间单位

第一个参数就是要周期性执行的任务类 ContainerBackgroundProcessor,它是一个 Runnable,同时也是 ContainerBase 的内部类,ContainerBase 是所有容器组件的基类,我们来回忆一下容器组件有哪些,有 Engine、Host、Context 和 Wrapper 等,它们具有父子关系。

ContainerBackgroundProcessor 实现

我们接来看 ContainerBackgroundProcessor 具体是如何实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected class ContainerBackgroundProcessor implements Runnable {

@Override
public void run() {
// 请注意这里传入的参数是 " 宿主类 " 的实例
processChildren(ContainerBase.this);
}

protected void processChildren(Container container) {
try {
//1. 调用当前容器的 backgroundProcess 方法。
container.backgroundProcess();

//2. 遍历所有的子容器,递归调用 processChildren,
// 这样当前容器的子孙都会被处理
Container[] children = container.findChildren();
for (int i = 0; i < children.length; i++) {
// 这里请你注意,容器基类有个变量叫做 backgroundProcessorDelay,如果大于 0,表明子容器有自己的后台线程,无需父容器来调用它的 processChildren 方法。
if (children[i].getBackgroundProcessorDelay() <= 0) {
processChildren(children[i]);
}
}
} catch (Throwable t) { ... }

上面的代码逻辑也是比较清晰的,首先 ContainerBackgroundProcessor 是一个 Runnable,它需要实现 run 方法,它的 run 很简单,就是调用了 processChildren 方法。这里有个小技巧,它把“宿主类”,也就是ContainerBase 的类实例当成参数传给了 run 方法

而在 processChildren 方法里,就做了两步:调用当前容器的 backgroundProcess 方法,以及递归调用子孙的 backgroundProcess 方法。请你注意 backgroundProcess 是 Container 接口中的方法,也就是说所有类型的容器都可以实现这个方法,在这个方法里完成需要周期性执行的任务。

这样的设计意味着什么呢?我们只需要在顶层容器,也就是 Engine 容器中启动一个后台线程,那么这个线程不但会执行 Engine 容器的周期性任务,它还会执行所有子容器的周期性任务

backgroundProcess 方法

上述代码都是在基类 ContainerBase 中实现的,那具体容器类需要做什么呢?其实很简单,如果有周期性任务要执行,就实现 backgroundProcess 方法;如果没有,就重用基类 ContainerBase 的方法。ContainerBase 的 backgroundProcess 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void backgroundProcess() {

//1. 执行容器中 Cluster 组件的周期性任务
Cluster cluster = getClusterInternal();
if (cluster != null) {
cluster.backgroundProcess();
}

//2. 执行容器中 Realm 组件的周期性任务
Realm realm = getRealmInternal();
if (realm != null) {
realm.backgroundProcess();
}

//3. 执行容器中 Valve 组件的周期性任务
Valve current = pipeline.getFirst();
while (current != null) {
current.backgroundProcess();
current = current.getNext();
}

//4. 触发容器的 " 周期事件 ",Host 容器的监听器 HostConfig 就靠它来调用
fireLifecycleEvent(Lifecycle.PERIODIC_EVENT, null);
}

从上面的代码可以看到,不仅每个容器可以有周期性任务,每个容器中的其他通用组件,比如跟集群管理有关的 Cluster 组件、跟安全管理有关的 Realm 组件都可以有自己的周期性任务。

我在前面的专栏里提到过,容器之间的链式调用是通过 Pipeline-Valve 机制来实现的,从上面的代码你可以看到容器中的 Valve 也可以有周期性任务,并且被 ContainerBase 统一处理。

请你特别注意的是,在 backgroundProcess 方法的最后,还触发了容器的“周期事件”。我们知道容器的生命周期事件有初始化、启动和停止等,那“周期事件”又是什么呢?它跟生命周期事件一样,是一种扩展机制,你可以这样理解:

又一段时间过去了,容器还活着,你想做点什么吗?如果你想做点什么,就创建一个监听器来监听这个“周期事件”,事件到了我负责调用你的方法。

总之,有了 ContainerBase 中的后台线程和 backgroundProcess 方法,各种子容器和通用组件不需要各自弄一个后台线程来处理周期性任务,这样的设计显得优雅和整洁。

Tomcat 热加载

有了 ContainerBase 的周期性任务处理“框架”,作为具体容器子类,只需要实现自己的周期性任务就行。而 Tomcat 的热加载,就是在 Context 容器中实现的。Context 容器的 backgroundProcess 方法是这样实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void backgroundProcess() {

//WebappLoader 周期性的检查 WEB-INF/classes 和 WEB-INF/lib 目录下的类文件
Loader loader = getLoader();
if (loader != null) {
loader.backgroundProcess();
}

//Session 管理器周期性的检查是否有过期的 Session
Manager manager = getManager();
if (manager != null) {
manager.backgroundProcess();
}

// 周期性的检查静态资源是否有变化
WebResourceRoot resources = getResources();
if (resources != null) {
resources.backgroundProcess();
}

// 调用父类 ContainerBase 的 backgroundProcess 方法
super.backgroundProcess();
}

从上面的代码我们看到 Context 容器通过 WebappLoader 来检查类文件是否有更新,通过 Session 管理器来检查是否有 Session 过期,并且通过资源管理器来检查静态资源是否有更新,最后还调用了父类 ContainerBase 的 backgroundProcess 方法。

这里我们要重点关注,WebappLoader 是如何实现热加载的,它主要是调用了 Context 容器的 reload 方法,而 Context 的 reload 方法比较复杂,总结起来,主要完成了下面这些任务:

  1. 停止和销毁 Context 容器及其所有子容器,子容器其实就是 Wrapper,也就是说 Wrapper 里面 Servlet 实例也被销毁了。
  2. 停止和销毁 Context 容器关联的 Listener 和 Filter。
  3. 停止和销毁 Context 下的 Pipeline 和各种 Valve。
  4. 停止和销毁 Context 的类加载器,以及类加载器加载的类文件资源。
  5. 启动 Context 容器,在这个过程中会重新创建前面四步被销毁的资源。

在这个过程中,类加载器发挥着关键作用。一个 Context 容器对应一个类加载器,类加载器在销毁的过程中会把它加载的所有类也全部销毁。Context 容器在启动过程中,会创建一个新的类加载器来加载新的类文件。

在 Context 的 reload 方法里,并没有调用 Session 管理器的 distroy 方法,也就是说这个 Context 关联的 Session 是没有销毁的。你还需要注意的是,Tomcat 的热加载默认是关闭的,你需要在 conf 目录下的 Context.xml 文件中设置 reloadable 参数来开启这个功能,像下面这样:

1
<Context reloadable="true"/>

Tomcat 热部署

我们再来看看热部署,热部署跟热加载的本质区别是,热部署会重新部署 Web 应用,原来的 Context 对象会整个被销毁掉,因此这个 Context 所关联的一切资源都会被销毁,包括 Session。

那么 Tomcat 热部署又是由哪个容器来实现的呢?应该不是由 Context,因为热部署过程中 Context 容器被销毁了,那么这个重担就落在 Host 身上了,因为它是 Context 的父容器。

跟 Context 不一样,Host 容器并没有在 backgroundProcess 方法中实现周期性检测的任务,而是通过监听器 HostConfig 来实现的,HostConfig 就是前面提到的“周期事件”的监听器,那“周期事件”达到时,HostConfig 会做什么事呢?

1
2
3
4
5
6
public void lifecycleEvent(LifecycleEvent event) {
// 执行 check 方法。
if (event.getType().equals(Lifecycle.PERIODIC_EVENT)) {
check();
}
}

它执行了 check 方法,我们接着来看 check 方法里做了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected void check() {

if (host.getAutoDeploy()) {
// 检查这个 Host 下所有已经部署的 Web 应用
DeployedApplication[] apps =
deployed.values().toArray(new DeployedApplication[0]);

for (int i = 0; i < apps.length; i++) {
// 检查 Web 应用目录是否有变化
checkResources(apps[i], false);
}

// 执行部署
deployApps();
}
}

其实 HostConfig 会检查 webapps 目录下的所有 Web 应用:

  • 如果原来 Web 应用目录被删掉了,就把相应 Context 容器整个销毁掉。
  • 是否有新的 Web 应用目录放进来了,或者有新的 WAR 包放进来了,就部署相应的 Web 应用。

因此 HostConfig 做的事情都是比较“宏观”的,它不会去检查具体类文件或者资源文件是否有变化,而是检查 Web 应用目录级别的变化。

Tomcat 的类加载机制

Tomcat 的自定义类加载器 WebAppClassLoader 打破了双亲委派机制,它首先自己尝试去加载某个类,如果找不到再代理给父类加载器,其目的是优先加载 Web 应用自己定义的类。具体实现就是重写 ClassLoader 的两个方法:findClass 和 loadClass。

findClass 方法

我们先来看看 findClass 方法的实现,为了方便理解和阅读,我去掉了一些细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public Class<?> findClass(String name) throws ClassNotFoundException {
...

Class<?> clazz = null;
try {
//1. 先在 Web 应用目录下查找类
clazz = findClassInternal(name);
} catch (RuntimeException e) {
throw e;
}

if (clazz == null) {
try {
//2. 如果在本地目录没有找到,交给父加载器去查找
clazz = super.findClass(name);
} catch (RuntimeException e) {
throw e;
}

//3. 如果父类也没找到,抛出 ClassNotFoundException
if (clazz == null) {
throw new ClassNotFoundException(name);
}

return clazz;
}

在 findClass 方法里,主要有三个步骤:

  1. 先在 Web 应用本地目录下查找要加载的类。
  2. 如果没有找到,交给父加载器去查找,它的父加载器就是上面提到的系统类加载器 AppClassLoader。
  3. 如何父加载器也没找到这个类,抛出 ClassNotFound 异常。

loadClass 方法

接着我们再来看 Tomcat 类加载器的 loadClass 方法的实现,同样我也去掉了一些细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {

synchronized (getClassLoadingLock(name)) {

Class<?> clazz = null;

//1. 先在本地 cache 查找该类是否已经加载过
clazz = findLoadedClass0(name);
if (clazz != null) {
if (resolve)
resolveClass(clazz);
return clazz;
}

//2. 从系统类加载器的 cache 中查找是否加载过
clazz = findLoadedClass(name);
if (clazz != null) {
if (resolve)
resolveClass(clazz);
return clazz;
}

// 3. 尝试用 ExtClassLoader 类加载器类加载,为什么?
ClassLoader javaseLoader = getJavaseClassLoader();
try {
clazz = javaseLoader.loadClass(name);
if (clazz != null) {
if (resolve)
resolveClass(clazz);
return clazz;
}
} catch (ClassNotFoundException e) {
// Ignore
}

// 4. 尝试在本地目录搜索 class 并加载
try {
clazz = findClass(name);
if (clazz != null) {
if (resolve)
resolveClass(clazz);
return clazz;
}
} catch (ClassNotFoundException e) {
// Ignore
}

// 5. 尝试用系统类加载器 (也就是 AppClassLoader) 来加载
try {
clazz = Class.forName(name, false, parent);
if (clazz != null) {
if (resolve)
resolveClass(clazz);
return clazz;
}
} catch (ClassNotFoundException e) {
// Ignore
}
}

//6. 上述过程都加载失败,抛出异常
throw new ClassNotFoundException(name);
}

loadClass 方法稍微复杂一点,主要有六个步骤:

  1. 先在本地 Cache 查找该类是否已经加载过,也就是说 Tomcat 的类加载器是否已经加载过这个类。
  2. 如果 Tomcat 类加载器没有加载过这个类,再看看系统类加载器是否加载过。
  3. 如果都没有,就让ExtClassLoader去加载,这一步比较关键,目的防止 Web 应用自己的类覆盖 JRE 的核心类。因为 Tomcat 需要打破双亲委派机制,假如 Web 应用里自定义了一个叫 Object 的类,如果先加载这个 Object 类,就会覆盖 JRE 里面的那个 Object 类,这就是为什么 Tomcat 的类加载器会优先尝试用 ExtClassLoader 去加载,因为 ExtClassLoader 会委托给 BootstrapClassLoader 去加载,BootstrapClassLoader 发现自己已经加载了 Object 类,直接返回给 Tomcat 的类加载器,这样 Tomcat 的类加载器就不会去加载 Web 应用下的 Object 类了,也就避免了覆盖 JRE 核心类的问题。
  4. 如果 ExtClassLoader 加载器加载失败,也就是说 JRE 核心类中没有这类,那么就在本地 Web 应用目录下查找并加载。
  5. 如果本地目录下没有这个类,说明不是 Web 应用自己定义的类,那么由系统类加载器去加载。这里请你注意,Web 应用是通过Class.forName调用交给系统类加载器的,因为Class.forName的默认加载器就是系统类加载器。
  6. 如果上述加载过程全部失败,抛出 ClassNotFound 异常。

从上面的过程我们可以看到,Tomcat 的类加载器打破了双亲委派机制,没有一上来就直接委托给父加载器,而是先在本地目录下加载,为了避免本地目录下的类覆盖 JRE 的核心类,先尝试用 JVM 扩展类加载器 ExtClassLoader 去加载。那为什么不先用系统类加载器 AppClassLoader 去加载?很显然,如果是这样的话,那就变成双亲委派机制了,这就是 Tomcat 类加载器的巧妙之处。

Tomcat 实现应用隔离

Tomcat 作为 Web 容器,需要解决以下问题:

  1. 如果在 Tomcat 中运行了两个 Web 应用程序,两个 Web 应用中有同名的 Servlet,但是功能不同,Tomcat 需要同时加载和管理这两个同名的 Servlet 类,保证它们不会冲突,因此 Web 应用之间的类需要隔离。
  2. 两个 Web 应用都依赖同一个第三方的 JAR 包,比如 Spring,那 Spring 的 JAR 包被加载到内存后,Tomcat 要保证这两个 Web 应用能够共享,也就是说 Spring 的 JAR 包只被加载一次,否则随着依赖的第三方 JAR 包增多,JVM 的内存会膨胀。
  3. 需要隔离 Tomcat 本身的类和 Web 应用的类。

img

WebAppClassLoader

针对第一个问题:

如果使用 JVM 默认 AppClassLoader 来加载 Web 应用,AppClassLoader 只能加载一个 Servlet 类,在加载第二个同名 Servlet 类时,AppClassLoader 会返回第一个 Servlet 类的 Class 实例,这是因为在 AppClassLoader 看来,同名的 Servlet 类只被加载一次。

Tomcat 的解决方案是自定义一个类加载器 WebAppClassLoader, 并且给每个 Web 应用创建一个类加载器实例。我们知道,Context 容器组件对应一个 Web 应用,因此,每个 Context 容器负责创建和维护一个 WebAppClassLoader 加载器实例。这背后的原理是,不同的加载器实例加载的类被认为是不同的类,即使它们的类名相同。这就相当于在 Java 虚拟机内部创建了一个个相互隔离的 Java 类空间,每一个 Web 应用都有自己的类空间,Web 应用之间通过各自的类加载器互相隔离。

SharedClassLoader

针对第二个问题:

本质需求是两个 Web 应用之间怎么共享库类,并且不能重复加载相同的类。我们知道,在双亲委派机制里,各个子加载器都可以通过父加载器去加载类,那么把需要共享的类放到父加载器的加载路径下不就行了吗,应用程序也正是通过这种方式共享 JRE 的核心类。因此 Tomcat 的设计者又加了一个类加载器 SharedClassLoader,作为 WebAppClassLoader 的父加载器,专门来加载 Web 应用之间共享的类。如果 WebAppClassLoader 自己没有加载到某个类,就会委托父加载器 SharedClassLoader 去加载这个类,SharedClassLoader 会在指定目录下加载共享类,之后返回给 WebAppClassLoader,这样共享的问题就解决了。

CatalinaClassloader

如何隔离 Tomcat 本身的类和 Web 应用的类?

要共享可以通过父子关系,要隔离那就需要兄弟关系了。兄弟关系就是指两个类加载器是平行的,它们可能拥有同一个父加载器,但是两个兄弟类加载器加载的类是隔离的。基于此 Tomcat 又设计一个类加载器 CatalinaClassloader,专门来加载 Tomcat 自身的类。这样设计有个问题,那 Tomcat 和各 Web 应用之间需要共享一些类时该怎么办呢?

CommonClassLoader

老办法,还是再增加一个 CommonClassLoader,作为 CatalinaClassloader 和 SharedClassLoader 的父加载器。CommonClassLoader 能加载的类都可以被 CatalinaClassLoader 和 SharedClassLoader 使用,而 CatalinaClassLoader 和 SharedClassLoader 能加载的类则与对方相互隔离。WebAppClassLoader 可以使用 SharedClassLoader 加载到的类,但各个 WebAppClassLoader 实例之间相互隔离。

Tomcat 实现 Servlet 规范

Servlet 容器最重要的任务就是创建 Servlet 的实例并且调用 Servlet。

一个 Web 应用里往往有多个 Servlet,而在 Tomcat 中一个 Web 应用对应一个 Context 容器,也就是说一个 Context 容器需要管理多个 Servlet 实例。但 Context 容器并不直接持有 Servlet 实例,而是通过子容器 Wrapper 来管理 Servlet,你可以把 Wrapper 容器看作是 Servlet 的包装。

为什么需要 Wrapper 呢?Context 容器直接维护一个 Servlet 数组不就行了吗?这是因为 Servlet 不仅仅是一个类实例,它还有相关的配置信息,比如它的 URL 映射、它的初始化参数,因此设计出了一个包装器,把 Servlet 本身和它相关的数据包起来,没错,这就是面向对象的思想。

除此以外,Servlet 规范中还有两个重要特性:Listener 和 Filter,Tomcat 也需要创建它们的实例,并在合适的时机去调用它们的方法。

Servlet 管理

Tomcat 是用 Wrapper 容器来管理 Servlet 的,那 Wrapper 容器具体长什么样子呢?我们先来看看它里面有哪些关键的成员变量:

1
protected volatile Servlet instance = null;

它拥有一个 Servlet 实例,并且 Wrapper 通过 loadServlet 方法来实例化 Servlet。为了方便你阅读,我简化了代码:

1
2
3
4
5
6
7
8
9
10
11
public synchronized Servlet loadServlet() throws ServletException {
Servlet servlet;

//1. 创建一个 Servlet 实例
servlet = (Servlet) instanceManager.newInstance(servletClass);

//2. 调用了 Servlet 的 init 方法,这是 Servlet 规范要求的
initServlet(servlet);

return servlet;
}

其实 loadServlet 主要做了两件事:创建 Servlet 的实例,并且调用 Servlet 的 init 方法,因为这是 Servlet 规范要求的。

那接下来的问题是,什么时候会调到这个 loadServlet 方法呢?为了加快系统的启动速度,我们往往会采取资源延迟加载的策略,Tomcat 也不例外,默认情况下 Tomcat 在启动时不会加载你的 Servlet,除非你把 Servlet 的loadOnStartup参数设置为true

这里还需要你注意的是,虽然 Tomcat 在启动时不会创建 Servlet 实例,但是会创建 Wrapper 容器,就好比尽管枪里面还没有子弹,先把枪造出来。那子弹什么时候造呢?是真正需要开枪的时候,也就是说有请求来访问某个 Servlet 时,这个 Servlet 的实例才会被创建。

那 Servlet 是被谁调用的呢?我们回忆一下专栏前面提到过 Tomcat 的 Pipeline-Valve 机制,每个容器组件都有自己的 Pipeline,每个 Pipeline 中有一个 Valve 链,并且每个容器组件有一个 BasicValve(基础阀)。Wrapper 作为一个容器组件,它也有自己的 Pipeline 和 BasicValve,Wrapper 的 BasicValve 叫 StandardWrapperValve

你可以想到,当请求到来时,Context 容器的 BasicValve 会调用 Wrapper 容器中 Pipeline 中的第一个 Valve,然后会调用到 StandardWrapperValve。我们先来看看它的 invoke 方法是如何实现的,同样为了方便你阅读,我简化了代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public final void invoke(Request request, Response response) {

//1. 实例化 Servlet
servlet = wrapper.allocate();

//2. 给当前请求创建一个 Filter 链
ApplicationFilterChain filterChain =
ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);

//3. 调用这个 Filter 链,Filter 链中的最后一个 Filter 会调用 Servlet
filterChain.doFilter(request.getRequest(), response.getResponse());

}

StandardWrapperValve 的 invoke 方法比较复杂,去掉其他异常处理的一些细节,本质上就是三步:

  • 第一步,创建 Servlet 实例;
  • 第二步,给当前请求创建一个 Filter 链;
  • 第三步,调用这个 Filter 链。

你可能会问,为什么需要给每个请求创建一个 Filter 链?这是因为每个请求的请求路径都不一样,而 Filter 都有相应的路径映射,因此不是所有的 Filter 都需要来处理当前的请求,我们需要根据请求的路径来选择特定的一些 Filter 来处理。

第二个问题是,为什么没有看到调到 Servlet 的 service 方法?这是因为 Filter 链的 doFilter 方法会负责调用 Servlet,具体来说就是 Filter 链中的最后一个 Filter 会负责调用 Servlet。

接下来我们来看 Filter 的实现原理。

Filter 管理

我们知道,跟 Servlet 一样,Filter 也可以在web.xml文件里进行配置,不同的是,Filter 的作用域是整个 Web 应用,因此 Filter 的实例是在 Context 容器中进行管理的,Context 容器用 Map 集合来保存 Filter。

1
private Map<String, FilterDef> filterDefs = new HashMap<>();

那上面提到的 Filter 链又是什么呢?Filter 链的存活期很短,它是跟每个请求对应的。一个新的请求来了,就动态创建一个 FIlter 链,请求处理完了,Filter 链也就被回收了。理解它的原理也非常关键,我们还是来看看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public final class ApplicationFilterChain implements FilterChain {

//Filter 链中有 Filter 数组,这个好理解
private ApplicationFilterConfig[] filters = new ApplicationFilterConfig[0];

//Filter 链中的当前的调用位置
private int pos = 0;

// 总共有多少了 Filter
private int n = 0;

// 每个 Filter 链对应一个 Servlet,也就是它要调用的 Servlet
private Servlet servlet = null;

public void doFilter(ServletRequest req, ServletResponse res) {
internalDoFilter(request,response);
}

private void internalDoFilter(ServletRequest req,
ServletResponse res){

// 每个 Filter 链在内部维护了一个 Filter 数组
if (pos < n) {
ApplicationFilterConfig filterConfig = filters[pos++];
Filter filter = filterConfig.getFilter();

filter.doFilter(request, response, this);
return;
}

servlet.service(request, response);

}

从 ApplicationFilterChain 的源码我们可以看到几个关键信息:

  • Filter 链中除了有 Filter 对象的数组,还有一个整数变量 pos,这个变量用来记录当前被调用的 Filter 在数组中的位置。
  • Filter 链中有个 Servlet 实例,这个好理解,因为上面提到了,每个 Filter 链最后都会调到一个 Servlet。
  • Filter 链本身也实现了 doFilter 方法,直接调用了一个内部方法 internalDoFilter。
  • internalDoFilter 方法的实现比较有意思,它做了一个判断,如果当前 Filter 的位置小于 Filter 数组的长度,也就是说 Filter 还没调完,就从 Filter 数组拿下一个 Filter,调用它的 doFilter 方法。否则,意味着所有 Filter 都调到了,就调用 Servlet 的 service 方法。

但问题是,方法体里没看到循环,谁在不停地调用 Filter 链的 doFIlter 方法呢?Filter 是怎么依次调到的呢?

答案是Filter 本身的 doFilter 方法会调用 Filter 链的 doFilter 方法,我们还是来看看代码就明白了:

1
2
3
4
5
6
7
8
9
public void doFilter(ServletRequest request, ServletResponse response,
FilterChain chain){

...

// 调用 Filter 的方法
chain.doFilter(request, response);

}

注意 Filter 的 doFilter 方法有个关键参数 FilterChain,就是 Filter 链。并且每个 Filter 在实现 doFilter 时,必须要调用 Filter 链的 doFilter 方法,而 Filter 链中保存当前 FIlter 的位置,会调用下一个 FIlter 的 doFilter 方法,这样链式调用就完成了。

Filter 链跟 Tomcat 的 Pipeline-Valve 本质都是责任链模式,但是在具体实现上稍有不同,你可以细细体会一下。

Listener 管理

我们接着聊 Servlet 规范里 Listener。跟 Filter 一样,Listener 也是一种扩展机制,你可以监听容器内部发生的事件,主要有两类事件:

  • 第一类是生命状态的变化,比如 Context 容器启动和停止、Session 的创建和销毁。
  • 第二类是属性的变化,比如 Context 容器某个属性值变了、Session 的某个属性值变了以及新的请求来了等。

我们可以在web.xml配置或者通过注解的方式来添加监听器,在监听器里实现我们的业务逻辑。对于 Tomcat 来说,它需要读取配置文件,拿到监听器类的名字,实例化这些类,并且在合适的时机调用这些监听器的方法。

Tomcat 是通过 Context 容器来管理这些监听器的。Context 容器将两类事件分开来管理,分别用不同的集合来存放不同类型事件的监听器:

1
2
3
4
5
// 监听属性值变化的监听器
private List<Object> applicationEventListenersList = new CopyOnWriteArrayList<>();

// 监听生命事件的监听器
private Object applicationLifecycleListenersObjects[] = new Object[0];

剩下的事情就是触发监听器了,比如在 Context 容器的启动方法里,就触发了所有的 ServletContextListener:

1
2
3
4
5
6
7
8
9
10
11
12
//1. 拿到所有的生命周期监听器
Object instances[] = getApplicationLifecycleListeners();

for (int i = 0; i < instances.length; i++) {
//2. 判断 Listener 的类型是不是 ServletContextListener
if (!(instances[i] instanceof ServletContextListener))
continue;

//3. 触发 Listener 的方法
ServletContextListener lr = (ServletContextListener) instances[i];
lr.contextInitialized(event);
}

需要注意的是,这里的 ServletContextListener 接口是一种留给用户的扩展机制,用户可以实现这个接口来定义自己的监听器,监听 Context 容器的启停事件。Spring 就是这么做的。ServletContextListener 跟 Tomcat 自己的生命周期事件 LifecycleListener 是不同的。LifecycleListener 定义在生命周期管理组件中,由基类 LifeCycleBase 统一管理。

Tomcat 支持异步 Servlet

异步示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@WebServlet(urlPatterns = {"/async"}, asyncSupported = true)
public class AsyncServlet extends HttpServlet {

//Web 应用线程池,用来处理异步 Servlet
ExecutorService executor = Executors.newSingleThreadExecutor();

public void service(HttpServletRequest req, HttpServletResponse resp) {
//1. 调用 startAsync 或者异步上下文
final AsyncContext ctx = req.startAsync();

// 用线程池来执行耗时操作
executor.execute(new Runnable() {

@Override
public void run() {

// 在这里做耗时的操作
try {
ctx.getResponse().getWriter().println("Handling Async Servlet");
} catch (IOException e) {}

//3. 异步 Servlet 处理完了调用异步上下文的 complete 方法
ctx.complete();
}

});
}
}

有三个要点:

  1. 通过注解的方式来注册 Servlet,除了 @WebServlet 注解,还需要加上 asyncSupported=true 的属性,表明当前的 Servlet 是一个异步 Servlet。
  2. Web 应用程序需要调用 Request 对象的 startAsync 方法来拿到一个异步上下文 AsyncContext。这个上下文保存了请求和响应对象。
  3. Web 应用需要开启一个新线程来处理耗时的操作,处理完成后需要调用 AsyncContext 的 complete 方法。目的是告诉 Tomcat,请求已经处理完成。

这里请你注意,虽然异步 Servlet 允许用更长的时间来处理请求,但是也有超时限制的,默认是 30 秒,如果 30 秒内请求还没处理完,Tomcat 会触发超时机制,向浏览器返回超时错误,如果这个时候你的 Web 应用再调用ctx.complete方法,会得到一个 IllegalStateException 异常。

异步 Servlet 原理

通过上面的例子,相信你对 Servlet 的异步实现有了基本的理解。要理解 Tomcat 在这个过程都做了什么事情,关键就是要弄清楚req.startAsync方法和ctx.complete方法都做了什么。

startAsync 方法

startAsync 方法其实就是创建了一个异步上下文 AsyncContext 对象,AsyncContext 对象的作用是保存请求的中间信息,比如 Request 和 Response 对象等上下文信息。你来思考一下为什么需要保存这些信息呢?

这是因为 Tomcat 的工作线程在Request.startAsync调用之后,就直接结束回到线程池中了,线程本身不会保存任何信息。也就是说一个请求到服务端,执行到一半,你的 Web 应用正在处理,这个时候 Tomcat 的工作线程没了,这就需要有个缓存能够保存原始的 Request 和 Response 对象,而这个缓存就是 AsyncContext。

有了 AsyncContext,你的 Web 应用通过它拿到 request 和 response 对象,拿到 Request 对象后就可以读取请求信息,请求处理完了还需要通过 Response 对象将 HTTP 响应发送给浏览器。

除了创建 AsyncContext 对象,startAsync 还需要完成一个关键任务,那就是告诉 Tomcat 当前的 Servlet 处理方法返回时,不要把响应发到浏览器,因为这个时候,响应还没生成呢;并且不能把 Request 对象和 Response 对象销毁,因为后面 Web 应用还要用呢。

在 Tomcat 中,负责 flush 响应数据的是 CoyoteAdaptor,它还会销毁 Request 对象和 Response 对象,因此需要通过某种机制通知 CoyoteAdaptor,具体来说是通过下面这行代码:

1
this.request.getCoyoteRequest().action(ActionCode.ASYNC_START, this);

你可以把它理解为一个 Callback,在这个 action 方法里设置了 Request 对象的状态,设置它为一个异步 Servlet 请求。

我们知道连接器是调用 CoyoteAdapter 的 service 方法来处理请求的,而 CoyoteAdapter 会调用容器的 service 方法,当容器的 service 方法返回时,CoyoteAdapter 判断当前的请求是不是异步 Servlet 请求,如果是,就不会销毁 Request 和 Response 对象,也不会把响应信息发到浏览器。你可以通过下面的代码理解一下,这是 CoyoteAdapter 的 service 方法,我对它进行了简化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res) {

// 调用容器的 service 方法处理请求
connector.getService().getContainer().getPipeline().
getFirst().invoke(request, response);

// 如果是异步 Servlet 请求,仅仅设置一个标志,
// 否则说明是同步 Servlet 请求,就将响应数据刷到浏览器
if (request.isAsync()) {
async = true;
} else {
request.finishRequest();
response.finishResponse();
}

// 如果不是异步 Servlet 请求,就销毁 Request 对象和 Response 对象
if (!async) {
request.recycle();
response.recycle();
}
}

接下来,当 CoyoteAdaptor 的 service 方法返回到 ProtocolHandler 组件时,ProtocolHandler 判断返回值,如果当前请求是一个异步 Servlet 请求,它会把当前 Socket 的协议处理者 Processor 缓存起来,将 SocketWrapper 对象和相应的 Processor 存到一个 Map 数据结构里。

1
private final Map<S,Processor> connections = new ConcurrentHashMap<>();

之所以要缓存是因为这个请求接下来还要接着处理,还是由原来的 Processor 来处理,通过 SocketWrapper 就能从 Map 里找到相应的 Processor。

complete 方法

接着我们再来看关键的ctx.complete方法,当请求处理完成时,Web 应用调用这个方法。那么这个方法做了些什么事情呢?最重要的就是把响应数据发送到浏览器。

这件事情不能由 Web 应用线程来做,也就是说ctx.complete方法不能直接把响应数据发送到浏览器,因为这件事情应该由 Tomcat 线程来做,但具体怎么做呢?

我们知道,连接器中的 Endpoint 组件检测到有请求数据达到时,会创建一个 SocketProcessor 对象交给线程池去处理,因此 Endpoint 的通信处理和具体请求处理在两个线程里运行。

在异步 Servlet 的场景里,Web 应用通过调用ctx.complete方法时,也可以生成一个新的 SocketProcessor 任务类,交给线程池处理。对于异步 Servlet 请求来说,相应的 Socket 和协议处理组件 Processor 都被缓存起来了,并且这些对象都可以通过 Request 对象拿到。

讲到这里,你可能已经猜到ctx.complete是如何实现的了:

1
2
3
4
5
6
7
8
public void complete() {
// 检查状态合法性,我们先忽略这句
check();

// 调用 Request 对象的 action 方法,其实就是通知连接器,这个异步请求处理完了
request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null);

}

我们可以看到 complete 方法调用了 Request 对象的 action 方法。而在 action 方法里,则是调用了 Processor 的 processSocketEvent 方法,并且传入了操作码 OPEN_READ。

1
2
3
4
5
6
7
case ASYNC_COMPLETE: {
clearDispatches();
if (asyncStateMachine.asyncComplete()) {
processSocketEvent(SocketEvent.OPEN_READ, true);
}
break;
}

我们接着看 processSocketEvent 方法,它调用 SocketWrapper 的 processSocket 方法:

1
2
3
4
5
6
protected void processSocketEvent(SocketEvent event, boolean dispatch) {
SocketWrapperBase<?> socketWrapper = getSocketWrapper();
if (socketWrapper != null) {
socketWrapper.processSocket(event, dispatch);
}
}

而 SocketWrapper 的 processSocket 方法会创建 SocketProcessor 任务类,并通过 Tomcat 线程池来处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {

if (socketWrapper == null) {
return false;
}

SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
// 线程池运行
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
}

请你注意 createSocketProcessor 函数的第二个参数是 SocketEvent,这里我们传入的是 OPEN_READ。通过这个参数,我们就能控制 SocketProcessor 的行为,因为我们不需要再把请求发送到容器进行处理,只需要向浏览器端发送数据,并且重新在这个 Socket 上监听新的请求就行了。

参考资料

Tomcat 优化

Tomcat 启动优化

如果 Tomcat 启动比较慢,可以考虑一些优化点

清理 Tomcat

  • 清理不必要的 Web 应用:首先我们要做的是删除掉 webapps 文件夹下不需要的工程,一般是 host-manager、example、doc 等这些默认的工程,可能还有以前添加的但现在用不着的工程,最好把这些全都删除掉。
  • 清理 XML 配置文件:Tomcat 在启动时会解析所有的 XML 配置文件,解析 XML 较为耗时,所以应该尽量保持配置文件的简洁。
  • 清理 JAR 文件:JVM 的类加载器在加载类时,需要查找每一个 JAR 文件,去找到所需要的类。如果删除了不需要的 JAR 文件,查找的速度就会快一些。这里请注意:Web 应用中的 lib 目录下不应该出现 Servlet API 或者 Tomcat 自身的 JAR,这些 JAR 由 Tomcat 负责提供。
  • 清理其他文件:及时清理日志,删除 logs 文件夹下不需要的日志文件。同样还有 work 文件夹下的 catalina 文件夹,它其实是 Tomcat 把 JSP 转换为 Class 文件的工作目录。有时候我们也许会遇到修改了代码,重启了 Tomcat,但是仍没效果,这时候便可以删除掉这个文件夹,Tomcat 下次启动的时候会重新生成。

禁止 Tomcat TLD 扫描

Tomcat 为了支持 JSP,在应用启动的时候会扫描 JAR 包里面的 TLD 文件,加载里面定义的标签库。所以在 Tomcat 的启动日志里,你可能会碰到这种提示:

At least one JAR was scanned for TLDs yet contained no TLDs. Enable debug logging for this logger for a complete list of JARs that were scanned but no TLDs were found in them. Skipping unneeded JARs during scanning can improve startup time and JSP compilation time.

Tomcat 的意思是,我扫描了你 Web 应用下的 JAR 包,发现 JAR 包里没有 TLD 文件。我建议配置一下 Tomcat 不要去扫描这些 JAR 包,这样可以提高 Tomcat 的启动速度,并节省 JSP 编译时间。

如何配置不去扫描这些 JAR 包呢,这里分两种情况:

  • 如果你的项目没有使用 JSP 作为 Web 页面模板,而是使用 Velocity 之类的模板引擎,你完全可以把 TLD 扫描禁止掉。方法是,找到 Tomcat 的conf/目录下的context.xml文件,在这个文件里 Context 标签下,加上JarScannerJarScanFilter子标签,像下面这样。

    1
    2
    3
    4
    5
    <Context>
    <JarScanner >
    <JarScanFilter defaultTldScan="true" defaultpluggabilityScan="true" />
    </JarScanner>
    </Context>
  • 如果你的项目使用了 JSP 作为 Web 页面模块,意味着 TLD 扫描无法避免,但是我们可以通过配置来告诉 Tomcat,只扫描那些包含 TLD 文件的 JAR 包。方法是,找到 Tomcat 的conf/目录下的catalina.properties文件,在这个文件里的 jarsToSkip 配置项中,加上你的 JAR 包。

    1
    tomcat.util.scan.StandardJarScanFilter.jarsToSkip=xxx.jar

关闭 WebSocket 支持

Tomcat 会扫描 WebSocket 注解的 API 实现,比如 @ServerEndpoint 注解的类。如果不需要使用 WebSockets 就可以关闭它。具体方法是,找到 Tomcat 的 conf/ 目录下的 context.xml 文件,给 Context 标签加一个 containerSciFilter 的属性:

1
2
3
<Context containerSciFilter="org.apache.tomcat.websocket.server.WsSci">
...
</Context>

更进一步,如果你不需要 WebSockets 这个功能,你可以把 Tomcat lib 目录下的 websocket-api.jartomcat-websocket.jar 这两个 JAR 文件删除掉,进一步提高性能。

关闭 JSP 支持

如果不需要使用 JSP,可以关闭 JSP 功能:

1
2
3
<Context containerSciFilter="org.apache.jasper.servlet.JasperInitializer">
...
</Context>

如果要同时关闭 WebSocket 和 Jsp,可以这样配置:

1
2
3
<Context containerSciFilter="org.apache.tomcat.websocket.server.WsSci | org.apache.jasper.servlet.JasperInitializer">
...
</Context>

禁止扫描 Servlet 注解

Servlet 3.0 引入了注解 Servlet,Tomcat 为了支持这个特性,会在 Web 应用启动时扫描你的类文件,因此如果你没有使用 Servlet 注解这个功能,可以告诉 Tomcat 不要去扫描 Servlet 注解。具体配置方法是,在你的 Web 应用的web.xml文件中,设置<web-app>元素的属性metadata-complete="true",像下面这样。

1
2
<web-app metadata-complete="true">
</web-app>

metadata-complete 的意思是,web.xml 里配置的 Servlet 是完整的,不需要再去库类中找 Servlet 的定义。

配置 Web-Fragment 扫描

Servlet 3.0 还引入了“Web 模块部署描述符片段”的 web-fragment.xml,这是一个部署描述文件,可以完成 web.xml 的配置功能。而这个 web-fragment.xml 文件必须存放在 JAR 文件的 META-INF 目录下,而 JAR 包通常放在 WEB-INF/lib 目录下,因此 Tomcat 需要对 JAR 文件进行扫描才能支持这个功能。

可以通过配置 web.xml 里面的 <absolute-ordering> 元素直接指定了哪些 JAR 包需要扫描 web fragment,如果 <absolute-ordering/> 元素是空的, 则表示不需要扫描,像下面这样。

1
2
3
4
5
<web-app metadata-complete="true">
...
<absolute-ordering />
...
</web-app>

随机数熵源优化

Tomcat 7 以上的版本依赖 Java 的 SecureRandom 类来生成随机数,比如 Session ID。而 JVM 默认使用阻塞式熵源(/dev/random), 在某些情况下就会导致 Tomcat 启动变慢。当阻塞时间较长时, 你会看到这样一条警告日志:

1
2
<DATE> org.apache.catalina.util.SessionIdGenerator createSecureRandom
INFO: Creation of SecureRandom instance for session ID generation using [SHA1PRNG] took [8152] milliseconds.

解决方案是通过设置,让 JVM 使用非阻塞式的熵源。

我们可以设置 JVM 的参数:

1
-Djava.security.egd=file:/dev/./urandom

或者是设置 java.security 文件,位于 $JAVA_HOME/jre/lib/security 目录之下: securerandom.source=file:/dev/./urandom

这里请你注意,/dev/./urandom 中间有个 ./ 的原因是 Oracle JRE 中的 Bug,Java 8 里面的 SecureRandom 类已经修正这个 Bug。 阻塞式的熵源(/dev/random)安全性较高, 非阻塞式的熵源(/dev/./urandom)安全性会低一些,因为如果你对随机数的要求比较高, 可以考虑使用硬件方式生成熵源。

并行启动多个 Web 应用

Tomcat 启动的时候,默认情况下 Web 应用都是一个一个启动的,等所有 Web 应用全部启动完成,Tomcat 才算启动完毕。如果在一个 Tomcat 下有多个 Web 应用,为了优化启动速度,你可以配置多个应用程序并行启动,可以通过修改 server.xml 中 Host 元素的 startStopThreads 属性来完成。startStopThreads 的值表示你想用多少个线程来启动你的 Web 应用,如果设成 0 表示你要并行启动 Web 应用,像下面这样的配置。

1
2
3
4
5
6
7
<Engine startStopThreads="0">
...
<Host startStopThreads="0">
...
</Host>
...
</Engine>

需要注意的是,Engine 元素里也配置了这个参数,这意味着如果你的 Tomcat 配置了多个 Host(虚拟主机),Tomcat 会以并行的方式启动多个 Host。

参考资料

Tomcat 和 Jetty

Web 容器 Tomcat 或 Jetty,作为重要的系统中间件,连接着浏览器和你的 Web 应用,并且支撑着 Web 程序的运行,可以说,弄懂了 Tomcat 和 Jetty 的原理,Java Web 开发对你来说就毫无秘密可言

Web 容器

早期的 Web 应用主要用于浏览新闻等静态页面,HTTP 服务器(比如 Apache、Nginx)向浏览器返回静态 HTML,浏览器负责解析 HTML,将结果呈现给用户。

随着互联网的发展,我们已经不满足于仅仅浏览静态页面,还希望通过一些交互操作,来获取动态结果,因此也就需要一些扩展机制能够让 HTTP 服务器调用服务端程序。

于是 Sun 公司推出了 Servlet 技术。你可以把 Servlet 简单理解为运行在服务端的 Java 小程序,但是 Servlet 没有 main 方法,不能独立运行,因此必须把它部署到 Servlet 容器中,由容器来实例化并调用 Servlet。

而 Tomcat 和 Jetty 就是一个 Servlet 容器。为了方便使用,它们也具有 HTTP 服务器的功能,因此Tomcat 或者 Jetty 就是一个“HTTP 服务器 + Servlet 容器”,我们也叫它们 Web 容器。

其他应用服务器比如 JBoss 和 WebLogic,它们不仅仅有 Servlet 容器的功能,也包含 EJB 容器,是完整的 Java EE 应用服务器。从这个角度看,Tomcat 和 Jetty 算是一个轻量级的应用服务器。

在微服务架构日渐流行的今天,开发人员更喜欢稳定的、轻量级的应用服务器,并且应用程序用内嵌的方式来运行 Servlet 容器也逐渐流行起来。之所以选择轻量级,是因为在微服务架构下,我们把一个大而全的单体应用,拆分成一个个功能单一的微服务,在这个过程中,服务的数量必然要增加,但为了减少资源的消耗,并且降低部署的成本,我们希望运行服务的 Web 容器也是轻量级的,Web 容器本身应该消耗较少的内存和 CPU 资源,并且由应用本身来启动一个嵌入式的 Web 容器,而不是通过 Web 容器来部署和启动应用,这样可以降低应用部署的复杂度。

MQ 面试

消息队列(Message Queue,简称 MQ)技术是应用间交换信息的一种技术。

消息队列主要解决异步处理、应用间耦合,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

目前主流的 MQ 有:Kafka、RabbitMQ、RocketMQ、ActiveMQ,而部分数据库如 Redis、MySQL 以及 phxsql 也可实现消息队列的功能。

注意:_为了简便,下文中除了文章标题,一律使用 MQ 简称_。

MQ 简介

【基础】什么是 MQ?

:::details 要点

消息队列(Message Queue,简称 MQ)技术是应用间交换信息的一种技术。

消息队列主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

MQ 是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

MQ 的数据可驻留在内存或磁盘上,直到它们被应用程序读取。通过 MQ,应用程序可独立地执行,它们不需要知道彼此的位置,不需要等待接收程序接收此消息。在分布式计算环境中,为了集成分布式应用,开发者需要对异构网络环境下的分布式应用提供有效的通信手段。为了管理需要共享的信息,对应用提供公共的信息交换机制是重要的。

目前主流的 MQ 有:Kafka、RabbitMQ、RocketMQ、ActiveMQ。

:::

【基础】为什么需要 MQ?

:::details 要点

MQ 的常见应用场景有:

  • 异步处理
  • 系统解耦
  • 流量削峰
  • 系统间通信
  • 传输缓冲
  • 最终一致性

异步处理

MQ 可以将系统间的处理流程异步化,减少等待响应的时间,从而提高整体并发吞吐量。一般,MQ 异步处理应用于非核心流程,例如:短信/邮件通知、数据推送、上报数据到监控中心、日志中心等。

假设这样一个场景,用户向系统 A 发起请求,系统 A 处理计算只需要 10ms,然后通知系统 BCD 写库,系统 BCD 写库耗时分别为:100ms200ms300ms。最终总耗时为: 10ms+100ms+200ms+300ms=610ms。此外,加上请求和响应的网络传输时间,从用户角度看,可能要等待将近 1s 才能得到结果。

如果使用 MQ,系统 A 接到请求后,耗时 10ms 处理计算,然后向系统 BCD 连续发送消息,假设耗时 5ms。那么 这一过程的总耗时为 3ms + 5ms = 8ms,这相比于 610 ms,大大缩短了响应时间。至于系统 BCD 的写库操作,只要自行消费 MQ 后处理即可,用户无需关注。

系统解耦

通过 MQ,可以消除系统间的强耦合。它的好处在于:

  • 消息的消费者系统可以随意增加,无需修改生产者系统的代码。
  • 生产者系统、消费者系统彼此不会影响对方的流程。
    • 如果生产者系统宕机,消费者系统收不到消息,就不会有下一步的动作。
    • 如果消费者系统宕机,生产者系统让然可以正常发送消息,不影响流程。

不同系统如果要建立通信,传统的做法是:调用接口。

如果需要和新的系统建立通信或删除已建立的通信,都需要修改代码,这种方案显然耦合度很高。

如果使用 MQ,系统间的通信只需要通过发布/订阅(Pub/Sub)模型即可,彼此没有直接联系,也就不需要相互感知,从而达到 解耦

流量削峰

上下游系统 处理能力存在差距的时候,利用 MQ 做一个 “漏斗” 模型,进行 流控。把 MQ 当成可靠的 消息缓冲池,进行一定程度的 消息堆积;在下游有能力处理的时候,再发送消息。

MQ 的流量削峰常用于高并发场景(例如:秒杀、团抢等业务场景),它是缓解瞬时暴增流量的核心手段之一。

如果没有 MQ,两个系统之间通过 协商滑动窗口限流/降级/熔断 等复杂的方案也能实现 流控。但 系统复杂性 指数级增长,势必在上游或者下游做存储,并且要处理 定时拥塞 等一系列问题。而且每当有 处理能力有差距 的时候,都需要 单独 开发一套逻辑来维护这套逻辑。

假设某个系统读写数据库的稳定性能为每秒处理 1000 条数据。平常情况下,远远达不到这么大的处理量。假设,因为因为做活动,系统的瞬时请求量剧增,达到每秒 10000 个并发请求,数据库根本承受不了,可能直接就把数据库给整崩溃了,这样系统服务就不可用了。

如果使用 MQ,每秒写入 10000 条请求,但是系统 A 每秒只从 MQ 中消费 1000 条请求,然后写入数据库。这样,就不会超过数据库的承受能力,而是把请求积压在 MQ 中。只要高峰期一过,系统 A 就会很快把积压的消息给处理掉。

系统间通信

消息队列一般都内置了 高效的通信机制,因此也可以用于单纯的 消息通讯,比如实现 点对点消息队列 或者 聊天室 等。

生产者/消费者 模式,只需要关心消息是否 送达队列,至于谁希望订阅和需要消费,是 下游 的事情,无疑极大地减少了开发和联调的工作量。

传输缓冲

(1)MQ 常被用于做海量数据的传输缓冲。

例如,Kafka 常被用于做为各种日志数据、采集数据的数据中转。然后,Kafka 将数据转发给 Logstash、Elasticsearch 中,然后基于 Elasticsearch 来做日志中心,提供检索、聚合、分析日志的能力。开发者可以通过 Kibana 集成 Elasticsearch 数据进行可视化展示,或自行进行定制化开发。

img

(2)MQ 也可以被用于流式处理。

例如,Kafka 几乎已经是流计算的数据采集端的标准组件。而流计算通过实时数据处理能力,提供了更为快捷的聚合计算能力,被大量应用于链路监控、实时监控、实时数仓、实时大屏、风控、推荐等应用领域。

最终一致性

最终一致性 不是 消息队列 的必备特性,但确实可以依靠 消息队列 来做 最终一致性 的事情。

  • 先写消息再操作,确保操作完成后再修改消息状态。定时任务补偿机制 实现消息 可靠发送接收、业务操作的可靠执行,要注意 消息重复幂等设计
  • 所有不保证 100% 不丢消息 的消息队列,理论上无法实现 最终一致性

Kafka 一类的设计,在设计层面上就有 丢消息 的可能(比如 定时刷盘,如果掉电就会丢消息)。哪怕只丢千分之一的消息,业务也必须用其他的手段来保证结果正确。

:::

【基础】MQ 有哪些通信模型?

:::details 要点

MQ 通信模型大致有以下类型:

  • 点对点 - 点对点方式是最为传统和常见的通讯方式,它支持一对一、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构。
  • 多点广播 - MQ 适用于不同类型的应用。其中重要的,也是正在发展中的是”多点广播”应用,即能够将消息发送到多个目标站点 (Destination List)。可以使用一条 MQ 指令将单一消息发送到多个目标站点,并确保为每一站点可靠地提供信息。MQ 不仅提供了多点广播的功能,而且还拥有智能消息分发功能,在将一条消息发送到同一系统上的多个用户时,MQ 将消息的一个复制版本和该系统上接收者的名单发送到目标 MQ 系统。目标 MQ 系统在本地复制这些消息,并将它们发送到名单上的队列,从而尽可能减少网络的传输量。
  • 发布/订阅 (Publish/Subscribe) - 发布/订阅模式使消息的分发可以突破目的队列地理位置的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息。发布/订阅模式使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发。
  • 集群 (Cluster) - 为了简化点对点通讯模式中的系统配置,MQ 提供 Cluster(集群) 的解决方案。集群类似于一个域 (Domain),集群内部的队列管理器之间通讯时,不需要两两之间建立消息通道,而是采用集群 (Cluster) 通道与其它成员通讯,从而大大简化了系统配置。此外,集群中的队列管理器之间能够自动进行负载均衡,当某一队列管理器出现故障时,其它队列管理器可以接管它的工作,从而大大提高系统的高可靠性。

:::

【基础】获取 MQ 消息有哪些模式?

:::details 要点

消息引擎获取消息有两种模式:

  • push 模式 - MQ 推送数据给消费者
  • pull 模式 - 消费者主动向 MQ 请求数据

Kafka 消费者(Consumer)以 pull 方式从 Broker 拉取消息。相比于 push 方式,pull 方式灵活度和扩展性更好,因为消费的主动性由消费者自身控制。

push 模式的优缺点:

  • 缺点:由 broker 决定消息推送的速率,对于不同消费速率的 consumer 就不太好处理了。push 模式下,当 broker 推送的速率远大于 consumer 消费的速率时,consumer 恐怕就要崩溃了。

push 模式的优缺点:

  • 优点:consumer 可以根据自己的消费能力自主的决定消费策略
  • 缺点:如果 broker 没有可供消费的消息,将导致 consumer 不断在循环中轮询,直到新消息到达。为了避免这点,Kafka 有个参数可以让 consumer 阻塞直到新消息到达

:::

【中级】引入 MQ 带来哪些问题?

:::details 要点

任何技术都会有利有弊,MQ 给整体系统架构带来很多好处,但也会付出一定的代价。

MQ 主要引入了以下问题:

  • 系统可用性降低:引入了 MQ 后,通信需要基于 MQ 完成,如果 MQ 宕机,则服务不可用。因此,MQ 要保证是高可用的。
  • 系统复杂度提高:使用 MQ,需要关注一些新的问题:
    • 如何保证消息没有 重复消费
    • 如何处理 消息丢失 的问题?
    • 如何保证传递 消息的顺序性
    • 如何处理大量 消息积压 的问题?
  • 一致性问题:假设系统 A 处理完直接返回成功的结果给用户,用户认为请求成功。但如果此时,系统 BCD 中只要有任意一个写库失败,那么数据就不一致了。这种情况如何处理?

:::

重复消费

【中级】MQ 为什么会存在重复消费问题?

:::details 要点

重复消费问题通常不是 MQ 来处理,而是由开发来处理的。

以 Kafka 举例,Kafka 每个 Partition 都是一个有序的、不可变的记录序列,不断追加到结构化的提交日志中。Partition 中为每条记录分配一个连续的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的记录。

Kafka 的客户端和 Broker 都会保存 Offset。客户端消费消息后,每隔一段时间,就把已消费的 Offset 提交给 Kafka Broker,表示已消费。

在这个过程中,如果客户端应用消费消息后,因为宕机、重启等情况而没有提交已消费的 Offset 。当系统恢复后,会继续消费消息,由于 Offset 未提交,就会出现重复消费的问题。

:::

【中级】如何保证消息不被重复消费?

:::details 要点

应对重复消费问题,需要在业务层面,通过 幂等性设计 来解决。

幂等(idempotent、idempotence)是一个数学与计算机学概念,指的是:一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。

MQ 重复消费不可怕,可怕的是没有应对机制,可以借鉴的思路有:

  • 如果是写关系型数据库,可以先根据主键查询,判断数据是否已存在,存在则更新,不存在则插入;
  • 如果是写 Redis,由于 set 操作天然具有幂等性,所以什么都不用做;
  • 如果是根据消息做较复杂的逻辑处理,可以在消息中加入全局唯一 ID,例如:订单 ID 等。在客户端存储中(Mysql、Redis 等)保存已消费消息的 ID。一旦接受到新消息,先判断消息中的 ID 是否在已消费消息 ID 表中存在,存在则不再处理,不存在则处理。

在实际开发中,可以参考上面的例子,结合现实场景,设计合理的幂等性方案。

:::

消息丢失

【高级】如何保证消息不丢失?

:::details 要点

要保证消息不丢失,首先要弄清楚 MQ 消息在哪些环节可能出现丢失的情况,才能对症下药。

实际上,MQ 消息在以下场景都可能会出现丢失:

  • 生产方丢失数据
  • MQ Server 丢失数据
  • 消费方丢失数据

下面以 Kafka 为例,讲解在传输的不同场景下如何保证消息不丢失。

消费方丢失数据

唯一可能导致消费方丢失数据的情况是:消费方设置了自动提交 Offset。一旦设置了自动提交 Offset,接受到消息后就会自动提交 Offset 给 Kafka ,Kafka 就认为消息已被消费。如果此时,消费方尚未来得及处理消息就挂了,那么消息就丢了。

解决方法就是:消费方关闭自动提交 Offset,处理完消息后手动提交 Offset。但这种情况下可能会出现重复消费的情形,需要自行保证幂等性。

MQ Server 丢失数据

当 Kafka 某个 Broker 宕机,需要重新选举 Partition 的 Leader。若此时其他的 Follower 尚未同步 Leader 的数据,那么新选某个 Follower 为 Leader 后,就丢失了部分数据。

为此,一般要求至少设置 4 个参数:

  • 冗余 - 通过副本机制保证冗余。
    • 给 Topic 设置 replication.factor 参数 - 这个值必须大于 1,要求每个 Partition 必须有至少 2 个副本。
    • 在 Kafka 服务端设置 min.insync.replicas 参数 - 这个值必须大于 1,这是要求一个 Leader 需要和至少一个 Follower 保持通信,这样才能确保 Leader 挂了还有替补。
  • 强一致性 - 在 Producer 端设置 acks=all,这意味着:要求每条数据,必须是写入所有 replica 之后,才能认为写入成功了。保证强一致性需要付出一定的代价,通常只有业务场景真的需要保证万无一失才会这么设置。
  • 失败重试 - 在 Producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思),这意味着要求一旦写入失败,就无限重试,卡在这里了。

生产方丢失数据

如果按照上述的思路设置了 acks=all,生产方一定不会丢数据。

要求是,你的 Leader 接收到消息,所有的 Follower 都同步到了消息之后,才认为本生产消息成功了。如果未满足这个条件,生产者会自动不断的重试,重试无限次。

:::

消息的顺序性

【高级】如何保证消息的顺序性?

:::details 要点

要保证 MQ 的顺序性,势必要付出一定的代价,所以实施方案前,要先明确业务场景是不是有必要保证消息的顺序性。只有那些明确对消息处理顺序有要求的业务场景才值得去保证消息顺序性。

方案一

一个 Topic,一个 Partition,一个 Consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。

方案二

  • 写入数据到 Partition 时指定一个全局唯一的 ID,例如订单 ID。发送方保证相同 ID 的消息有序的发送到同一个 Partition。
  • 基于上一点,消费方从 Kafka Partition 中消费消息时,此刻一定是顺序的。但如果消费方式以并发方式消费消息,顺序就可能会被打乱。为此,还有做到以下几点:
    • 消费方维护 N 个缓存队列,具有相同 ID 的数据都写入同一个队列中;
    • 创建 N 个线程,每个线程只负责从指定的一个队列中取数据。

:::

消息积压

【高级】如何解决消息积压?

:::details 要点

假设一个 MQ 消费者可以一秒处理 1000 条消息,三个 MQ 消费者可以一秒处理 3000 条消息,那么一分钟的处理量是 18 万条。如果 MQ 中积压了几百万到上千万的数据,即使消费者恢复了,也需要大概很长的时间才能恢复过来。

对于产线环境来说,漫长的等待是不可接受的,所以面临这种窘境时,只能临时紧急扩容以应对了,具体操作步骤和思路如下:

  • 先修复 Consumer 的问题,确保其恢复消费速度,然后将现有 Consumer 都停掉。
  • 新建一个 Topic,Partition 是原来的 10 倍,临时建立好原先 10 倍的 Queue 数量。
  • 然后写一个临时的分发数据的 Consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 Queue。
  • 接着临时征用 10 倍的机器来部署 Consumer ,每一批 Consumer 消费一个临时 Queue 的数据。这种做法相当于是临时将 Queue 资源和 Consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
  • 等快速消费完积压数据之后,得恢复原先部署的架构重新用原先的 consumer 机器来消费消息。

:::

MQ 高可用

【高级】如何保证 MQ 的高可用?

:::details 要点

不同 MQ 实现高可用的原理各不相同。因为 Kafka 比较具有代表性,所以这里以 Kafka 为例。

Kafka 的核心概念

了解 Kafka,必须先了解 Kafka 的核心概念:

  • Broker - Kafka 集群包含一个或多个节点,这种节点被称为 Broker。

  • Topic - 每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(不同 Topic 的消息是物理隔离的;同一个 Topic 的消息保存在一个或多个 Broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)。对于每一个 Topic, Kafka 集群都会维持一个分区日志。

  • Partition - 了提高 Kafka 的吞吐率,每个 Topic 包含一个或多个 Partition,每个 Partition 在物理上对应一个文件夹,该文件夹下存储这个 Partition 的所有消息和索引文件。

    • Kafka 日志的分区(Partition)分布在 Kafka 集群的节点上。每个节点在处理数据和请求时,共享这些分区。每一个分区都会在已配置的节点上进行备份,确保容错性。

img

Kafka 的副本机制

Kafka 是如何实现高可用的呢?

Kafka 在 0.8 以前的版本中,如果一个 Broker 宕机了,其上面的 Partition 都不能用了,这自然不是高可用的。

为了实现高可用,Kafka 引入了复制功能,简单来说,就是副本机制( Replicate ):

每个 Partition 都有一个 Leader,零个或多个 Follower。Leader 和 Follower 都是 Broker,每个 Broker 都会成为某些分区的 Leader 和某些分区的 Follower,因此集群的负载是平衡的。

  • Leader 处理一切对 Partition (分区)的读写请求
  • 而 Follower 只需被动的同步 Leader 上的数据

同一个 Topic 的不同 Partition 会分布在多个 Broker 上,而且一个 Partition 还会在其他的 Broker 上面进行备份,Producer 在发布消息到某个 Partition 时,先找到该 Partition 的 Leader,然后向这个 Leader 推送消息;每个 Follower 都从 Leader 拉取消息,拉取消息成功之后,向 Leader 发送一个 ACK 确认。

img

FAQ

问:为什么让 Leader 处理一切对对 Partition (分区)的读写请求?

答:因为如果允许所有 Broker 都可以处理读写请求,就可能产生数据一致性问题。

Kafka 选举 Leader

由上文可知,Partition 在多个 Broker 上存在副本。

如果某个 Follower 宕机,啥事儿没有,正常工作。

如果 Leader 宕机了,会从 Follower 中重新选举一个新的 Leader。

:::

MQ 架构

【高级】Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?

:::details 要点

ActiveMQ

ActiveMQ 是由 Apache 出品,ActiveMQ 是一个完全支持JMS1.1J2EE 1.4 规范的 JMS Provider 实现。它非常快速,支持 多种语言的客户端协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。

(a) 主要特性

  1. 服从 JMS 规范JMS 规范提供了良好的标准和保证,包括:同步异步 的消息分发,一次和仅一次的消息分发,消息接收订阅 等等。遵从 JMS 规范的好处在于,不论使用什么 JMS 实现提供者,这些基础特性都是可用的;
  2. 连接灵活性ActiveMQ 提供了广泛的 连接协议,支持的协议有:HTTP/SIP 多播SSLTCPUDP 等等。对众多协议的支持让 ActiveMQ 拥有了很好的灵活性;
  3. 支持的协议种类多OpenWireSTOMPRESTXMPPAMQP
  4. 持久化插件和安全插件ActiveMQ 提供了 多种持久化 选择。而且,ActiveMQ 的安全性也可以完全依据用户需求进行 自定义鉴权授权
  5. 支持的客户端语言种类多:除了 Java 之外,还有:C/C++.NETPerlPHPPythonRuby
  6. 代理集群:多个 ActiveMQ 代理 可以组成一个 集群 来提供服务;
  7. 异常简单的管理ActiveMQ 是以开发者思维被设计的。所以,它并不需要专门的管理员,因为它提供了简单又使用的管理特性。有很多中方法可以 监控 ActiveMQ 不同层面的数据,包括使用在 JConsole 或者在 ActiveMQWeb Console 中使用 JMX。通过处理 JMX 的告警消息,通过使用 命令行脚本,甚至可以通过监控各种类型的 日志

(b) 部署环境

ActiveMQ 可以运行在 Java 语言所支持的平台之上。使用 ActiveMQ 需要:

  • Java JDK
  • ActiveMQ 安装包

(c) 优点

  1. 跨平台 (JAVA 编写与平台无关,ActiveMQ 几乎可以运行在任何的 JVM 上);
  2. 可以用 JDBC:可以将 数据持久化 到数据库。虽然使用 JDBC 会降低 ActiveMQ 的性能,但是数据库一直都是开发人员最熟悉的存储介质;
  3. 支持 JMS 规范:支持 JMS 规范提供的 统一接口;
  4. 支持 自动重连错误重试机制
  5. 有安全机制:支持基于 shirojaas 等多种 安全配置机制,可以对 Queue/Topic 进行 认证和授权
  6. 监控完善:拥有完善的 监控,包括 Web ConsoleJMXShell 命令行,JolokiaRESTful API
  7. 界面友善:提供的 Web Console 可以满足大部分情况,还有很多 第三方的组件 可以使用,比如 hawtio

(d) 缺点

  1. 社区活跃度不及 RabbitMQ 高;
  2. 根据其他用户反馈,会出莫名其妙的问题,会 丢失消息
  3. 目前重心放到 activemq 6.0 产品 Apollo,对 5.x 的维护较少;
  4. 不适合用于 上千个队列 的应用场景;

RabbitMQ

RabbitMQ2007 年发布,是一个在 AMQP (高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

(a) 主要特性

  1. 可靠性:提供了多种技术可以让你在 性能可靠性 之间进行 权衡。这些技术包括 持久性机制投递确认发布者证实高可用性机制
  2. 灵活的路由:消息在到达队列前是通过 交换机 进行 路由 的。RabbitMQ 为典型的路由逻辑提供了 多种内置交换机 类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做 RabbitMQ插件 来使用;
  3. 消息集群:在相同局域网中的多个 RabbitMQ 服务器可以 聚合 在一起,作为一个独立的逻辑代理来使用;
  4. 队列高可用:队列可以在集群中的机器上 进行镜像,以确保在硬件问题下还保证 消息安全
  5. 支持多种协议:支持 多种消息队列协议
  6. 支持多种语言:用 Erlang 语言编写,支持只要是你能想到的 所有编程语言
  7. 管理界面RabbitMQ 有一个易用的 用户界面,使得用户可以 监控管理 消息 Broker 的许多方面;
  8. 跟踪机制:如果 消息异常RabbitMQ 提供消息跟踪机制,使用者可以找出发生了什么;
  9. 插件机制:提供了许多 插件,来从多方面进行扩展,也可以编写自己的插件。

(b) 部署环境

RabbitMQ 可以运行在 Erlang 语言所支持的平台之上,包括 SolarisBSDLinuxMacOSXTRU64Windows 等。使用 RabbitMQ 需要:

  • ErLang 语言包
  • RabbitMQ 安装包

(c) 优点

  1. 由于 Erlang 语言的特性,消息队列性能较好,支持 高并发
  2. 健壮、稳定、易用、跨平台、支持 多种语言、文档齐全;
  3. 有消息 确认机制持久化机制,可靠性高;
  4. 高度可定制的 路由
  5. 管理界面 较丰富,在互联网公司也有较大规模的应用,社区活跃度高。

(d) 缺点

  1. 尽管结合 Erlang 语言本身的并发优势,性能较好,但是不利于做 二次开发和维护
  2. 实现了 代理架构,意味着消息在发送到客户端之前可以在 中央节点 上排队。此特性使得 RabbitMQ 易于使用和部署,但是使得其 运行速度较慢,因为中央节点 增加了延迟消息封装后 也比较大;
  3. 需要学习 比较复杂接口和协议,学习和维护成本较高。

RocketMQ

RocketMQ 出自 阿里 的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,消息可靠性上Kafka 更好。RocketMQ 在阿里内部  被广泛应用在 订单交易充值流计算消息推送日志流式处理binglog 分发 等场景。

(a) 主要特性

  1. 基于 队列模型:具有 高性能高可靠高实时分布式 等特点;
  2. ProducerConsumer队列 都支持 分布式
  3. Producer 向一些队列轮流发送消息,队列集合 称为 TopicConsumer 如果做 广播消费,则一个 Consumer 实例消费这个 Topic 对应的 所有队列;如果做 集群消费,则 多个 Consumer 实例 平均消费 这个 Topic 对应的队列集合;
  4. 能够保证 严格的消息顺序
  5. 提供丰富的 消息拉取模式
  6. 高效的订阅者 水平扩展能力;
  7. 实时消息订阅机制
  8. 亿级 消息堆积 能力;
  9. 较少的外部依赖。

(b) 部署环境

RocketMQ 可以运行在 Java 语言所支持的平台之上。使用 RocketMQ 需要:

  • Java JDK
  • 安装 gitMaven
  • RocketMQ 安装包

(c) 优点

  1. 单机 支持 1 万以上 持久化队列
  2. RocketMQ 的所有消息都是 持久化的,先写入系统 PAGECACHE,然后 刷盘,可以保证 内存磁盘 都有一份数据,而 访问 时,直接 从内存读取
  3. 模型简单,接口易用(JMS 的接口很多场合并不太实用);
  4. 性能非常好,可以允许 大量堆积消息Broker 中;
  5. 支持 多种消费模式,包括 集群消费广播消费等;
  6. 各个环节 分布式扩展设计,支持 主从高可用
  7. 开发度较活跃,版本更新很快。

(d) 缺点

  1. 支持的 客户端语言 不多,目前是 JavaC++,其中 C++ 还不成熟;
  2. RocketMQ 社区关注度及成熟度也不及前两者;
  3. 没有 Web 管理界面,提供了一个 CLI (命令行界面) 管理工具带来 查询管理诊断各种问题
  4. 没有在 MQ 核心里实现 JMS 等接口;

Kafka

Apache Kafka 是一个 分布式消息发布订阅 系统。它最初由 LinkedIn 公司基于独特的设计实现为一个 分布式的日志提交系统 (a distributed commit log),之后成为 Apache 项目的一部分。Kafka 性能高效可扩展良好 并且 可持久化。它的 分区特性可复制可容错 都是其不错的特性。

(a) 主要特性

  1. 快速持久化:可以在 O(1) 的系统开销下进行 消息持久化
  2. 高吞吐:在一台普通的服务器上既可以达到 10W/s吞吐速率
  3. 完全的分布式系统BrokerProducerConsumer 都原生自动支持 分布式,自动实现 负载均衡
  4. 支持 同步异步 复制两种 高可用机制
  5. 支持 数据批量发送拉取
  6. **零拷贝技术(zero-copy)**:减少 IO 操作步骤,提高 系统吞吐量
  7. 数据迁移扩容 对用户透明;
  8. 无需停机 即可扩展机器;
  9. 其他特性:丰富的 消息拉取模型、高效 订阅者水平扩展、实时的 消息订阅、亿级的 消息堆积能力、定期删除机制;

(b) 部署环境

使用 Kafka 需要:

  • Java JDK
  • Kafka 安装包

(c) 优点

  1. 客户端语言丰富:支持 Java.NetPHPRubyPythonGo 等多种语言;
  2. 高性能:单机写入 TPS 约在 100 万条/秒,消息大小 10 个字节;
  3. 提供 完全分布式架构,并有 replica 机制,拥有较高的 可用性可靠性,理论上支持 消息无限堆积
  4. 支持批量操作;
  5. 消费者 采用 Pull 方式获取消息。消息有序通过控制 能够保证所有消息被消费且仅被消费 一次
  6. 有优秀的第三方 Kafka Web 管理界面 Kafka-Manager
  7. 日志领域 比较成熟,被多家公司和多个开源项目使用。

(d) 缺点

  1. Kafka 单机超过 64队列/分区 时,Load 时会发生明显的飙高现象。队列 越多,负载 越高,发送消息 响应时间变长
  2. 使用 短轮询方式实时性 取决于 轮询间隔时间
  3. 消费失败 不支持重试
  4. 支持 消息顺序,但是 一台代理宕机 后,就会产生 消息乱序
  5. 社区更新较慢。

技术选型

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比 RocketMQ、Kafka 低一个数量级 同 ActiveMQ 10 万级,支撑高吞吐 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响 topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性 ms 级 微秒级,这是 RabbitMQ 的一大特点,延迟最低 ms 级 延迟在 ms 级以内
可用性 高,基于主从架构实现高可用 同 ActiveMQ 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到 0 丢失 同 RocketMQ
功能支持 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 MQ 功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

综上,各种对比之后,有如下建议:

  • 一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;
  • 后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;
  • 不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。
  • 所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。
  • 如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

:::

【高级】什么是 JMS?

:::details 要点

提到 MQ,就顺便提一下 JMS 。

JMS(JAVA Message Service,java 消息服务)API 是一个消息服务的标准/规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

在 EJB 架构中,有消息 bean 可以无缝的与 JMS 消息服务集成。在 J2EE 架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。

JMS 消息模型

在 JMS 标准中,有两种消息模型:

  • P2P(Point to Point)
  • Pub/Sub(Publish/Subscribe)
P2P 模式

P2P 模式包含三个角色:MQ(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

P2P 的特点

  • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在 MQ 中)
  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
  • 接收者在成功接收消息之后需向队列应答成功

如果希望发送的每个消息都会被成功处理的话,那么需要 P2P 模式。

Pub/sub 模式

包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到 Topic,系统将这些消息传递给多个订阅者。

Pub/Sub 的特点

  • 每个消息可以有多个消费者
  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  • 为了消费消息,订阅者必须保持运行的状态。

为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用 Pub/Sub 模型。

JMS 消息消费

在 JMS 中,消息的产生和消费都是异步的。对于消费来说,JMS 的消息者可以通过两种方式来消费消息。

  • 同步 - 订阅者或接收者通过 receive 方法来接收消息,receive 方法在接收到消息之前(或超时之前)将一直阻塞;
  • 异步 - 订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的 onMessage 方法。

JNDI - Java 命名和目录接口,是一种标准的 Java 命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或命名服务中的一个记录,同时返回资源连接建立所必须的信息。

JNDI 在 JMS 中起到查找和访问发送目标或消息来源的作用。

:::

参考资料

RocketMQ 快速入门

Apache RocketMQ 是一个分布式 MQ 和流处理平台,具有低延迟、高性能和可靠性、万亿级容量和灵活的可扩展性。

RocketMQ 由阿里巴巴孵化,被捐赠给 Apache,成为 Apache 的顶级项目。

RocketMQ 概念

img

消息模型(Message Model)

RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 Topic 的消息,每个 Topic 的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个 Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。

消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ 中每个消息拥有唯一的 Message ID,且可以携带具有业务标识的 Key。系统提供了通过 Message ID 和 Key 查询消息的功能。

消息还可以具有可选 Tag 和额外的键值对。例如,您可以为消息设置业务密钥,并在代理服务器上查找消息以诊断开发期间的问题。

标签(Tag)

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。

Tag 相当于子主题,为用户提供了额外的灵活性。对于 Tag,来自同一业务模块的具有不同目的的消息可以具有相同的主题和不同的 Tag。

主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。

代理服务器(Broker Server)

消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

名称服务(Name Server)

名称服务充当路由消息的提供者。生产者或消费者能够通过名称服务查找各主题相应的 Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。

消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。

生产者组(Producer Group)

同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

警告:考虑到提供的 Producer 在发送消息方面足够强大,每个 Producer 组只允许一个实例,以避免不必要的生成器实例初始化

消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

消费者组(Consumer Group)

同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

拉取式消费(Pull Consumer)

Consumer 消费的一种类型,应用通常主动调用 Consumer 的拉消息方法从 Broker 服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

推动式消费(Push Consumer)

Consumer 消费的一种类型,该模式下 Broker 收到数据后会主动推送给消费端,该消费模式一般实时性较高。

集群消费(Clustering)

集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。

广播消费(Broadcasting)

广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

普通顺序消息(Normal Ordered Message)

普通顺序消费模式下,消费者通过同一个消息队列( Topic 分区,称作 Message Queue) 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

严格顺序消息(Strictly Ordered Message)

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

RocketMQ 特性

订阅与发布

消息的发布是指某个生产者向某个 topic 发送消息;消息的订阅是指某个消费者关注了某个 topic 中带有某些 tag 的消息,进而从该 topic 消费数据。

消息顺序

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ 可以严格的保证消息有序。

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个 Topic 下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
  • 分区顺序 对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

消息过滤

RocketMQ 的消费者可以根据 Tag 进行消息过滤,也支持自定义属性过滤。消息过滤目前是在 Broker 端实现的,优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂。

消息可靠性

RocketMQ 支持消息的高可靠,影响消息可靠性的几种情况:

  1. Broker 非正常关闭
  2. Broker 异常 Crash
  3. OS Crash
  4. 机器掉电,但是能立即恢复供电情况
  5. 机器无法开机(可能是 cpu、主板、内存等关键设备损坏)
  6. 磁盘设备损坏

1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ 在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。

5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ 在这两种情况下,通过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与 Money 相关的应用。注:RocketMQ 从 3.0 版本开始支持同步双写。

至少一次

至少一次(At least Once)指每个消息必须投递一次。Consumer 先 Pull 消息到本地,消费完成后,才向服务器返回 ack,如果没有消费一定不会 ack 消息,所以 RocketMQ 可以很好的支持此特性。

回溯消费

回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。

事务消息

RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

定时消息

定时消息(延迟队列)是指消息发送到 broker 后,不会立即被消费,等待特定时间投递给真正的 topic。 broker 有配置项 messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18 个 level。可以配置自定义 messageDelayLevel。注意,messageDelayLevel 是 broker 的属性,不属于某个 topic。发消息时,设置 delayLevel 等级即可:msg.setDelayLevel(level)。level 有以下三种情况:

  • level == 0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如 level==1,延迟 1s
  • level > maxLevel,则 level== maxLevel,例如 level==20,延迟 2h

定时消息会暂存在名为 SCHEDULE_TOPIC_XXXX 的 topic 中,并根据 delayTimeLevel 存入特定的 queue,queueId = delayTimeLevel – 1,即一个 queue 只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker 会调度地消费 SCHEDULE_TOPIC_XXXX,将消息写入真实的 topic。

需要注意的是,定时消息会在第一次写入和调度写入真实 topic 时都会计数,因此发送数量、tps 都会变高。

消息重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以认为有以下几种情况:

  • 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过 10 秒后再重试。
  • 由于依赖的下游应用服务不可用,例如 db 连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 sleep 30s,再消费下一条消息,这样可以减轻 Broker 重试消息的压力。

RocketMQ 会为每个消费组都设置一个 Topic 名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个 Topic 的重试队列是针对消费组,而不是针对每个 Topic 设置的),用于暂时保存因为各种异常而导致 Consumer 端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 Topic 名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行 Delay 后重新保存至“%RETRY%+consumerGroup”的重试队列中。

消息重投

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway 没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在 RocketMQ 中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer 负载变化也会导致重复消息。如下方法可以设置消息重试策略:

  • retryTimesWhenSendFailed:同步发送失败重投次数,默认为 2,因此生产者会最多尝试发送 retryTimesWhenSendFailed + 1 次。不会选择上次失败的 broker,尝试向其他 broker 发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投。
  • retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他 broker,仅在同一个 broker 上做重试,不保证消息不丢。
  • retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或 slave 不可用(返回状态非 SEND_OK),是否尝试发送到其他 broker,默认 false。十分重要消息可以开启。

量控制

生产者流控,因为 broker 处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。

生产者流控:

  • commitLog 文件被锁时间超过 osPageCacheBusyTimeOutMills 时,参数默认为 1000ms,返回流控。
  • 如果开启 transientStorePoolEnable == true,且 broker 为异步刷盘的主机,且 transientStorePool 中资源不足,拒绝当前 send 请求,返回流控。
  • broker 每隔 10ms 检查 send 请求队列头部请求的等待时间,如果超过 waitTimeMillsInSendQueue,默认 200ms,拒绝当前 send 请求,返回流控。
  • broker 通过拒绝 send 请求方式实现流量控制。

注意,生产者流控,不会尝试消息重投。

消费者流控:

  • 消费者本地缓存消息数超过 pullThresholdForQueue 时,默认 1000。
  • 消费者本地缓存消息大小超过 pullThresholdSizeForQueue 时,默认 100MB。
  • 消费者本地缓存消息跨度超过 consumeConcurrentlyMaxSpan 时,默认 2000。

消费者流控的结果是降低拉取频率。

死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

RocketMQ 组件

img

RocketMQ 由四部分组成:NameServer、Broker、Producer、Consumer。其中任意一个组成都可以水平扩展为集群模式,以避免单点故障问题。

NameServer(命名服务器)

NameServer 是一个 Topic 路由注册中心,其角色类似 Kafka 中的 zookeeper,支持 Broker 的动态注册与发现。每个 NameServer 记录完整的路由信息,提供相应的读写服务,支持快速存储扩展。

NameServer 主要包括两个功能:

  • Broker 管理,NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活;
  • 路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。然后 Producer 和 Conumser 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。

NameServer 通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker 是向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,Broker 仍然可以向其它 NameServer 同步其路由信息,Producer、Consumer 仍然可以动态感知 Broker 的路由的信息。

NameServer 是一个功能齐全的服务器,主要包括两个功能:

  1. Broker 管理 - NameServer 接受来自 Broker 集群的注册,并提供心跳机制来检查 Broker 节点是否存活。
  2. 路由管理 - 每个 NameServer 将保存有关 Broker 集群的完整路由信息和客户端查询的查询队列。

RocketMQ 客户端(Producer/Consumer)将从 NameServer 查询队列路由信息。

将 NameServer 地址列表提供给客户端有四种方法:

  1. 编程方式 - 类似:producer.setNamesrvAddr("ip:port")
  2. Java 选项 - 使用 rocketmq.namesrv.addr 参数
  3. 环境变量 - 设置环境变量 NAMESRV_ADDR
  4. HTTP 端点

更详细信息可以参考官方文档:here

Broker(代理)

Broker 主要负责消息的存储、投递和查询以及服务高可用保证。

Broker 同时支持推拉模型,包含容错机制(2 副本或 3 副本),并提供强大的峰值填充和按原始时间顺序累积数千亿消息的能力。此外,Broker 提供了灾难恢复、丰富的指标统计和警报机制,这些都是传统 MQ 所缺乏的。

为了实现这些功能,Broker 包含了以下几个重要子模块:

  • Remoting Module:整个 Broker 的实体,负责处理来自 clients 端的请求。
  • Client Manager:负责管理客户端(Producer/Consumer)和维护 Consumer 的 Topic 订阅信息。
  • Store Service:提供方便简单的 API 接口处理消息存储到物理硬盘和查询功能。
  • HA Service:高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。
  • Index Service:根据特定的 Message key 对投递到 Broker 的消息进行索引服务,以提供消息的快速查询。

img

Producer(生产者)

Producers 支持分布式集群方式部署。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

Consumer(消费者)

Consumer 支持分布式集群方式部署。支持以 push 推,pull 拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

RocketMQ 安装

环境要求

  • 推荐 64 位操作系统:Linux/Unix/Mac
  • 64bit JDK 1.8+
  • Maven 3.2.x
  • Git

下载解压

进入官方下载地址:https://rocketmq.apache.org/dowloading/releases/,选择合适版本

建议选择 binary 版本。

解压到本地:

1
2
> unzip rocketmq-all-4.2.0-source-release.zip
> cd rocketmq-all-4.2.0/

启动 Name Server

1
2
3
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

启动 Broker

1
2
3
> nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...

收发消息

执行收发消息操作之前,不许告诉客户端命名服务器的位置。在 RocketMQ 中有多种方法来实现这个目的。这里,我们使用最简单的方法——设置环境变量 NAMESRV_ADDR

1
2
3
4
5
6
> export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

关闭服务器

1
2
3
4
5
6
7
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

RocketMQ 入门级示例

首先在项目中引入 maven 依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>

Producer

Producer 在 RocketMQ 中负责发送消息。

RocketMQ 有三种消息发送方式:

  • 可靠的同步发送
  • 可靠的异步发送
  • 单项发送

可靠的同步发送

可靠的同步传输用于广泛的场景,如重要的通知消息,短信通知,短信营销系统等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

可靠的异步发送

异步传输通常用于响应时间敏感的业务场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
//Launch the instance.
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

单向传输

单向传输用于需要中等可靠性的情况,例如日志收集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);

}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

Consumer

Consumer 在 RocketMQ 中负责接收消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setNamesrvAddr(RocketConfig.HOST);

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

consumer.registerMessageListener(new MessageListenerOrderly() {

AtomicLong consumeTimes = new AtomicLong(0);

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;

}
});

consumer.start();

System.out.printf("Consumer Started.%n");
}
}

RocketMQ 官方示例

参考资料