Dunwu Blog

大道至简,知易行难

Redis 脚本

Redis 脚本使用 Lua 解释器来执行脚本。 Redis 2.6 版本通过内嵌支持 Lua 环境。

关键词:Lua

为什么使用 Lua

Lua 是一种轻量小巧的脚本语言,用标准 C 语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

在 Redis 中,执行单一命令是原子性操作,所以不会出现并发问题。但有的业务场景下,需要执行多个命令,同时确保不出现并发问题,这就需要用到 Lua 脚本了。

Redis 执行 Lua 是原子操作。因为 Redis 使用串行化的方式来执行 Redis 命令, 所以在任何特定时间里, 最多都只会有一个脚本能够被放进 Lua 环境里面运行, 因此, 整个 Redis 服务器只需要创建一个 Lua 环境即可。

由于,Redis 执行 Lua 具有原子性,所以常被用于需要原子性执行多命令的场景。

Redis 脚本命令

命令 说明
EVAL EVAL 命令为客户端输入的脚本在 Lua 环境中定义一个函数, 并通过调用这个函数来执行脚本。
EVALSHA EVALSHA 命令通过直接调用 Lua 环境中已定义的函数来执行脚本。
SCRIPT_FLUSH SCRIPT_FLUSH 命令会清空服务器 lua_scripts 字典中保存的脚本, 并重置 Lua 环境。
SCRIPT_EXISTS SCRIPT_EXISTS 命令接受一个或多个 SHA1 校验和为参数, 并通过检查 lua_scripts 字典来确认校验和对应的脚本是否存在。
SCRIPT_LOAD SCRIPT_LOAD 命令接受一个 Lua 脚本为参数, 为该脚本在 Lua 环境中创建函数, 并将脚本保存到 lua_scripts 字典中。
SCRIPT_KILL SCRIPT_KILL 命令用于停止正在执行的脚本。

Redis 执行 Lua 的工作流程

为了在 Redis 服务器中执行 Lua 脚本, Redis 在服务器内嵌了一个 Lua 环境(environment), 并对这个 Lua 环境进行了一系列修改, 从而确保这个 Lua 环境可以满足 Redis 服务器的需要。

Redis 服务器创建并修改 Lua 环境的整个过程由以下步骤组成:

  1. 创建一个基础的 Lua 环境, 之后的所有修改都是针对这个环境进行的。
  2. 载入多个函数库到 Lua 环境里面, 让 Lua 脚本可以使用这些函数库来进行数据操作。
  3. 创建全局表格 redis , 这个表格包含了对 Redis 进行操作的函数, 比如用于在 Lua 脚本中执行 Redis 命令的 redis.call 函数。
  4. 使用 Redis 自制的随机函数来替换 Lua 原有的带有副作用的随机函数, 从而避免在脚本中引入副作用。
  5. 创建排序辅助函数, Lua 环境使用这个辅佐函数来对一部分 Redis 命令的结果进行排序, 从而消除这些命令的不确定性。
  6. 创建 redis.pcall 函数的错误报告辅助函数, 这个函数可以提供更详细的出错信息。
  7. 对 Lua 环境里面的全局环境进行保护, 防止用户在执行 Lua 脚本的过程中, 将额外的全局变量添加到了 Lua 环境里面。
  8. 将完成修改的 Lua 环境保存到服务器状态的 lua 属性里面, 等待执行服务器传来的 Lua 脚本。

Redis 执行 Lua 的要点

  • Redis 服务器专门使用一个伪客户端来执行 Lua 脚本中包含的 Redis 命令。
  • Redis 使用脚本字典来保存所有被 EVAL 命令执行过, 或者被 SCRIPT_LOAD 命令载入过的 Lua 脚本, 这些脚本可以用于实现 SCRIPT_EXISTS 命令, 以及实现脚本复制功能。
  • 服务器在执行脚本之前, 会为 Lua 环境设置一个超时处理钩子, 当脚本出现超时运行情况时, 客户端可以通过向服务器发送 SCRIPT_KILL 命令来让钩子停止正在执行的脚本, 或者发送 SHUTDOWN nosave 命令来让钩子关闭整个服务器。
  • 主服务器复制 EVALSCRIPT_FLUSHSCRIPT_LOAD 三个命令的方法和复制普通 Redis 命令一样 —— 只要将相同的命令传播给从服务器就可以了。
  • 主服务器在复制 EVALSHA 命令时, 必须确保所有从服务器都已经载入了 EVALSHA 命令指定的 SHA1 校验和所对应的 Lua 脚本, 如果不能确保这一点的话, 主服务器会将 EVALSHA 命令转换成等效的 EVAL 命令, 并通过传播 EVAL 命令来获得相同的脚本执行效果。

参考资料

Markdown 极简教程

目录

标题

Markdown 支持六个级别的标题。

1
2
3
4
5
6
7
语法:
# 一级标题
## 二级标题
### 三级标题
#### 四级标题
##### 五级标题
###### 六级标题

文本样式

:bulb: 粗体、斜体、删除线可以混合使用。

在 Markdown 中,粗体文本、斜体文本可以使用 *_ 符号标记。建议统一风格,始终只用一种符号。

语法 效果
普通文本 普通文本
*斜体文本* _斜体文本_ 斜体文本 斜体文本
**粗体文本** __粗体文本__ 粗体文本 粗体文本
~~删除文本~~ 删除文本
***粗斜体文本*** ___粗斜体文本___ 粗斜体文本 粗斜体文本

列表

无序列表

  • RED
  • YELLOW
  • BLUE

有序列表

  1. 第一步
  2. 第二步
  3. 第三步

任务列表

  • 完成任务
  • 计划任务

多级列表

  • 数据结构
    • 线性表
      • 顺序表
      • 链表
        • 单链表
        • 双链表
      • 二叉树
        • 二叉平衡树

分割线

***---___ 都可以作为分割线。




链接

普通链接

语法:

1
[钝悟的博客](https://dunwu.github.io/waterdrop/)
  • [] 中标记链接名。类似 HTML 中 <a> 元素的 title 属性。
  • () 中标记链接的 url,也支持相对路径(前提是资源可以访问)。类似 HTML 中 <a> 元素的 href 属性。

效果:

图片

Markdown 引用图片的语法:

1
![alt](url title)

alt 和 title 即对应 HTML 中 img 元素的 alt 和 title 属性(都可省略):

  • alt - 表示图片显示失败时的替换文本。

  • title - 表示鼠标悬停在图片时的显示文本(注意这里要加引号)

  • url - 即图片的 url 地址

logo

图片链接

可以将图片和链接混合使用。

logo

锚点

其实呢,每一个标题都是一个锚点,和 HTML 的锚点(#)类似,比如:回到顶部

引用

普通引用:

:question: 什么是 Markdown

Markdown是一种轻量级标记语言,创始人为约翰·格鲁伯(英语:John Gruber)。它允许人们“使用易读易写的纯文本格式编写文档,然后转换成有效的XHTML(或者HTML)文档”。[4]这种语言吸收了很多在电子邮件中已有的纯文本标记的特性。 —— 摘自 Wiki

嵌套引用:

数据结构

二叉树

平衡二叉树

满二叉树

代码高亮

标签

语法:

1
`Markdown` `Doc`

效果:

Markdown, Doc

代码块

语法一:在文本前后都使用三个反引号进行标记。【✔️️️️ 推荐】

1
2
3
这是一个文本块。
这是一个文本块。
这是一个文本块。

语法二:在连续几行的文本开头加入 1 个 Tab 或者 4 个空格。【❌ 不推荐】

这是一个文本块。
这是一个文本块。
这是一个文本块。

语法

在三个反引号后面加上编程语言的名字,另起一行开始写代码,最后一行再加上三个反引号。

1
public static void main(String[]args){} //Java
1
int main(int argc, char *argv[]) //C
1
echo "hello GitHub" #Bash
1
document.getElementById('myH1').innerHTML = 'Welcome to my Homepage' //javascipt
1
string &operator+(const string& A,const string& B) //cpp

表格

一般表格:

表头 1 表头 2
表格单元 表格单元
表格单元 表格单元

表格可以指定对齐方式:

序号 商品 价格
1 电脑 6000.0
2 鼠标 100.0
3 键盘 200.0

Emoji 表情

:bulb: 注意:部分 Markdown 引擎支持 Emoji。

合理使用 Emoji 表情,往往可以使得文章内容更加丰富生动。例如::heavy_check_mark: :x: :bulb: :bell: :heavy_exclamation_mark: :question:

更多 Emoji 表情请参考:

注脚

:bulb: 注意:部分 Markdown 引擎支持注脚。

一个具有注脚的文本。^1

数学公式

:bulb: 注意:部分 Markdown 引擎支持 Latex。

很多文档中,需要引入一些数学符号、特殊符号,其排版问题比较头疼。这种问题,可以用 Latex 来解决,大部分 Markdown 引擎都支持 Latex。

Latex 可以使用 $ 符号来标记 Latex 表达式,下面是一个数学公式示例:

$$
\Gamma(z) = \int_0^\infty t^{z-1}e^{-t}dt,.
$$

列举一些常用数学符号:

符号 语法 描述
$\leq$ $\leq$ 小于等于
$\geq$ $\geq$ 大于等于
$\neq$ $\neq$ 不等于
$\approx$ $\approx$ 约等于
$\infty$ $\infty$ 无穷
$\prod_{x}^{y}$ $\prod_{x}^{y}$ 累乘
$\sum_{i=0}^n$ $\sum_{i=0}^n$ 求和
$\int$ $\int$ 积分
$\iint$ $\iint$ 双重积分
$\log_x{y}$ $\log_x{y}$ 对数
$x^{y+1}$ $x^{y+1}$ 上标
$x_{y+1}$ $x_{y+1}$ 下标
$\frac{x}{y}$ $\frac{x}{y}$ 分数
$\sqrt[y]{x}$ $\sqrt[y]{x}$ 开方
$\sin$ $\sin$ 正弦
$\cos$ $\cos$ 余弦
$\tan$ $\tan$ 正切

更多数学符号支持请参考:

Diff

:bulb: 注意:部分 Markdown 引擎支持 Diff。

版本控制的系统中都少不了 diff 的功能,即展示一个文件内容的增加与删除。
GFM 中可以显示的展示 diff 效果。可以用 + 开头表示新增,- 开头表示删除。

1
2
+ 新增内容
- 删除内容

UML 图

💡 注意:部分 Markdown 引擎支持 mermaid

mermaid 提供了多种 UML 图。详情请参考:mermaid 文档

流程图

1
2
3
4
5
graph LR
A[Hard edge] -->|Link text| B(Round edge)
B --> C{Decision}
C -->|One| D[Result one]
C -->|Two| E[Result two]

时序图

1
2
3
4
5
6
7
8
9
10
sequenceDiagram
Alice->>Bob: Hello Bob, how are you?
alt is sick
Bob->>Alice: Not so good :(
else is well
Bob->>Alice: Feeling fresh like a daisy
end
opt Extra response
Bob->>Alice: Thanks for asking
end

甘特图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
gantt
dateFormat YYYY-MM-DD
title Adding GANTT diagram functionality to mermaid

section A section
Completed task :done, des1, 2014-01-06,2014-01-08
Active task :active, des2, 2014-01-09, 3d
Future task : des3, after des2, 5d
Future task2 : des4, after des3, 5d

section Critical tasks
Completed task in the critical line :crit, done, 2014-01-06,24h
Implement parser and jison :crit, done, after des1, 2d
Create tests for parser :crit, active, 3d
Future task in critical line :crit, 5d
Create tests for renderer :2d
Add to mermaid :1d

section Documentation
Describe gantt syntax :active, a1, after des1, 3d
Add gantt diagram to demo page :after a1 , 20h
Add another diagram to demo page :doc1, after a1 , 48h

section Last section
Describe gantt syntax :after doc1, 3d
Add gantt diagram to demo page :20h
Add another diagram to demo page :48h

HTML

有些 Markdown 引擎支持在文档中嵌入的 html 元素。

有些 Markdown 语法所不支持的特性,可以使用 html 元素来支持。

折叠

折叠内容一

展开才能看到的内容

折叠内容二

展开才能看到的内容

居中

居中显示的文本

图片尺寸

编辑器

推荐 Markdown 编辑器

  • Typora - 个人认为是功能最强的 Markdown 编辑器。
  • Visual Studio Code - 可以通过安装插件,量身打造 Markdown 编辑器。
  • marktext - 一款简单优雅的 Markdown 编辑器。
  • StackEdit - 在线 Markdown 编辑器。
  • Editor.md - 在线 Markdown 编辑器。
  • Marxico - 一款专为印象笔记(Evernote)打造的 Markdown 编辑器。

想了解更多 Markdown 编辑器可以参考:主流 Markdown 编辑器推荐

参考资料

流量控制

在高并发场景下,为了应对瞬时海量请求的压力,保障系统的平稳运行,必须预估系统的流量阈值,通过限流规则阻断处理不过来的请求。

流量控制简介

什么是流量控制

流量控制(Flow Control),根据流量、并发线程数、响应时间等指标,把随机到来的流量调整成合适的形状,即流量塑形。避免应用被瞬时的流量高峰冲垮,从而保障应用的高可用性。

为什么需要流量控制

复杂的分布式系统架构中的应用程序往往具有数十个依赖项,每个依赖项都会不可避免地在某个时刻失败。 如果主机应用程序未与这些外部故障隔离开来,则可能会被波及。

例如,对于依赖于 30 个服务的应用程序,假设每个服务的正常运行时间为 99.99%,则可以期望:

99.9930 = 99.7% 的正常运行时间

10 亿个请求中的 0.3%= 3,000,000 个失败

即使所有依赖项都具有出色的正常运行时间,每月也会有 2 个小时以上的停机时间。

然而,现实情况一般比这种估量情况更糟糕。


当一切正常时,整体系统如下所示:

img

图片来自 Hystrix Wiki

在分布式系统架构下,这些强依赖的子服务稳定与否对系统的影响非常大。但是,依赖的子服务可能有很多不可控问题:如网络连接、资源繁忙、服务宕机等。例如:下图中有一个 QPS 为 50 的依赖服务 I 出现不可用,但是其他依赖服务是可用的。

img

图片来自 Hystrix Wiki

当流量很大的情况下,某个依赖的阻塞,会导致上游服务请求被阻塞。当这种级联故障愈演愈烈,就可能造成整个线上服务不可用的雪崩效应,如下图。这种情况若持续恶化,如果上游服务本身还被其他服务所依赖,就可能出现多米洛骨牌效应,导致多个服务都无法正常工作。

img

图片来自 Hystrix Wiki

流量控制有哪些保护机制

流量控制常见的手段就是限流、熔断、降级。

什么是降级?

降级是保障服务能够稳定运行的一种保护方式:面对突增的流量,牺牲一些吞吐量以换取系统的稳定。常见的降级实现方式有:开关降级、限流降级、熔断降级。

什么是限流?

限流一般针对下游服务,当上游流量较大时,避免被上游服务的请求撑爆。

限流就是限制系统的输入和输出流量,以达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。

限流规则包含三个部分:时间粒度,接口粒度,最大限流值。限流规则设置是否合理直接影响到限流是否合理有效。

什么是熔断?

熔断一般针对上游服务,当下游服务超时/异常较多时,避免被下游服务拖垮。

当调用链路中某个资源出现不稳定,例如,超时异常比例升高的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终产生雪崩的效果。

熔断尽最大的可能去完成所有的请求,容忍一些失败,熔断也能自动恢复。熔断的常见策略有:

  • 在每秒请求异常数超过多少时触发熔断降级
  • 在每秒请求异常错误率超过多少时触发熔断降级
  • 在每秒请求平均耗时超过多少时触发熔断降级

流量控制有哪些衡量指标

流量控制有以下几个角度:

  • 流量指标,例如 QPS、并发线程数等。
  • 资源的调用关系,例如资源的调用链路,资源和资源之间的关系,调用来源等。
  • 控制效果,例如排队等待、直接拒绝、Warm Up(预热)等。

限流算法

限流的本质是:在一定的时间范围内,限制某一个资源被访问的频率。如何去限制流量,就需要采用一定的策略,即限流算法。常见的限流算法有:固定窗口限流算法、滑动窗口限流算法、漏桶限流算法、令牌桶限流算法。

下面,将对这几种算法进行一一介绍。

固定窗口限流算法

固定窗口限流算法的原理

固定窗口限流算法的基本策略是:

  1. 设置一个固定时间窗口,以及这个固定时间窗口内的最大请求数;
  2. 为每个固定时间窗口设置一个计数器,用于统计请求数;
  3. 一旦请求数超过最大请求数,则请求会被拦截。

img

固定窗口限流算法的利弊

固定窗口限流算法的优点是:实现简单。

固定窗口限流算法的缺点是:存在临界问题。所谓临界问题,是指:流量分别集中在一个固定时间窗口的尾部和一个固定时间窗口的头部。举例来说,假设限流规则为每分钟不超过 100 次请求。在第一个时间窗口中,起初没有任何请求,在最后 1 s,收到 100 次请求,由于没有达到阈值,所有请求都通过;在第二个时间窗口中,第 1 秒就收到 100 次请求,而后续没有任何请求。虽然,这两个时间窗口内的流量都符合限流要求,但是在两个时间窗口临界的这 2s 内,实际上有 200 次请求,显然是超过预期吞吐量的,存在压垮系统的可能。

img

固定窗口限流算法的实现

:::details Java 版本的固定窗口限流算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SlidingWindowRateLimiter implements RateLimiter {

/**
* 允许的最大请求数
*/
private final long maxPermits;

/**
* 窗口期时长
*/
private long periodMillis;

/**
* 分片窗口期时长
*/
private final long shardPeriodMillis;

/**
* 窗口期截止时间
*/
private long lastPeriodMillis;

/**
* 分片窗口数
*/
private final int shardNum;

/**
* 请求总计数
*/
private final AtomicLong totalCount = new AtomicLong(0);

/**
* 分片窗口计数列表
*/
private final List<AtomicLong> countList = new LinkedList<>();

public SlidingWindowRateLimiter(long qps, int shardNum) {
this(qps, 1000, TimeUnit.MILLISECONDS, shardNum);
}

public SlidingWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, int shardNum) {
this.maxPermits = maxPermits;
this.periodMillis = timeUnit.toMillis(period);
this.lastPeriodMillis = System.currentTimeMillis();
this.shardPeriodMillis = timeUnit.toMillis(period) / shardNum;
this.shardNum = shardNum;
for (int i = 0; i < shardNum; i++) {
countList.add(new AtomicLong(0));
}
}

@Override
public synchronized boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
if (now > lastPeriodMillis) {
for (int shardId = 0; shardId < shardNum; shardId++) {
long shardCount = countList.get(shardId).get();
totalCount.addAndGet(-shardCount);
countList.set(shardId, new AtomicLong(0));
lastPeriodMillis += shardPeriodMillis;
}
}
int shardId = (int) (now % periodMillis / shardPeriodMillis);
if (totalCount.get() + permits <= maxPermits) {
countList.get(shardId).addAndGet(permits);
totalCount.addAndGet(permits);
return true;
} else {
return false;
}
}

}

:::

滑动窗口限流算法

滑动窗口限流算法的原理

滑动窗口限流算法是对固定窗口限流算法的改进,解决了临界问题。

滑动窗口限流算法的基本策略是:

  • 将固定时间窗口分片为多个子窗口,每个子窗口的访问次数独立统计;
  • 当请求时间大于当前子窗口的最大时间时,则将当前子窗口废弃,并将计时窗口向前滑动,并将下一个子窗口置为当前窗口。
  • 要保证所有子窗口的统计数之和不能超过阈值。

滑动窗口限流算法就是针对固定窗口限流算法的更细粒度的控制,分片越多,则限流越精准。

img

滑动窗口限流算法的利弊

滑动窗口限流算法的优点是:在滑动窗口限流算法中,临界位置的突发请求都会被算到时间窗口内,因此可以解决计数器算法的临界问题。

滑动窗口限流算法的缺点是:

  • 额外的内存开销 - 滑动时间窗口限流算法的时间窗口是持续滑动的,并且除了需要一个计数器来记录时间窗口内接口请求次数之外,还需要记录在时间窗口内每个接口请求到达的时间点,所以存在额外的内存开销。
  • 限流的控制粒度受限于窗口分片粒度 - 滑动窗口限流算法,只能在选定的时间粒度上限流,对选定时间粒度内的更加细粒度的访问频率不做限制。但是,由于每个分片窗口都有额外的内存开销,所以也并不是分片数越多越好的。

滑动窗口限流算法的实现

:::details Java 版本的滑动窗口限流算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SlidingWindowRateLimiter implements RateLimiter {

/**
* 允许的最大请求数
*/
private final long maxPermits;

/**
* 窗口期时长
*/
private final long periodMillis;

/**
* 分片窗口期时长
*/
private final long shardPeriodMillis;

/**
* 窗口期截止时间
*/
private long lastPeriodMillis;

/**
* 分片窗口数
*/
private final int shardNum;

/**
* 请求总计数
*/
private final AtomicLong totalCount = new AtomicLong(0);

/**
* 分片窗口计数列表
*/
private final List<AtomicLong> countList = new LinkedList<>();

public SlidingWindowRateLimiter(long qps, int shardNum) {
this(qps, 1000, TimeUnit.MILLISECONDS, shardNum);
}

public SlidingWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, int shardNum) {
this.maxPermits = maxPermits;
this.periodMillis = timeUnit.toMillis(period);
this.lastPeriodMillis = System.currentTimeMillis();
this.shardPeriodMillis = timeUnit.toMillis(period) / shardNum;
this.shardNum = shardNum;
for (int i = 0; i < shardNum; i++) {
countList.add(new AtomicLong(0));
}
}

@Override
public synchronized boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
if (now > lastPeriodMillis) {
for (int shardId = 0; shardId < shardNum; shardId++) {
long shardCount = countList.get(shardId).get();
totalCount.addAndGet(-shardCount);
countList.set(shardId, new AtomicLong(0));
lastPeriodMillis += shardPeriodMillis;
}
}
int shardId = (int) (now % periodMillis / shardPeriodMillis);
if (totalCount.get() + permits <= maxPermits) {
countList.get(shardId).addAndGet(permits);
totalCount.addAndGet(permits);
return true;
} else {
return false;
}
}

}

:::

漏桶限流算法

漏桶限流算法的原理

漏桶限流算法的基本策略是:

  • 水(请求)以任意速率由入口进入到漏桶中;
  • 水以固定的速率由出口出水(请求通过);
  • 漏桶的容量是固定的,如果水的流入速率大于流出速率,最终会导致漏桶中的水溢出(这意味着请求拒绝)。

img

漏桶限流算法的利弊

漏桶限流算法的优点是:流量速率固定——即无论流量多大,即便是突发的大流量,处理请求的速度始终是固定的。

漏桶限流算法的缺点是:不能灵活的调整流量。例如:一个集群通过增减节点的方式,弹性伸缩了其吞吐能力,漏桶限流算法无法随之调整。

漏桶策略适用于间隔性突发流量且流量不用即时处理的场景

漏桶限流算法的实现

:::details Java 版本的漏桶限流算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import java.util.concurrent.atomic.AtomicLong;

public class LeakyBucketRateLimiter implements RateLimiter {

/**
* QPS
*/
private final int qps;

/**
* 桶的容量
*/
private final long capacity;

/**
* 计算的起始时间
*/
private long beginTimeMillis;

/**
* 桶中当前的水量
*/
private final AtomicLong waterNum = new AtomicLong(0);

public LeakyBucketRateLimiter(int qps, int capacity) {
this.qps = qps;
this.capacity = capacity;
}

@Override
public synchronized boolean tryAcquire(int permits) {

// 如果桶中没有水,直接通过
if (waterNum.get() == 0) {
beginTimeMillis = System.currentTimeMillis();
waterNum.addAndGet(permits);
return true;
}

// 计算水量
long leakedWaterNum = ((System.currentTimeMillis() - beginTimeMillis) / 1000) * qps;
long currentWaterNum = waterNum.get() - leakedWaterNum;
waterNum.set(Math.max(0, currentWaterNum));

// 重置时间
beginTimeMillis = System.currentTimeMillis();

if (waterNum.get() + permits < capacity) {
waterNum.addAndGet(permits);
return true;
} else {
return false;
}
}

}

:::

令牌桶限流算法

令牌桶限流算法的原理

img

令牌桶算法的原理

  1. 接口限制 T 秒内最大访问次数为 N,则每隔 T/N 秒会放一个 token 到桶中
  2. 桶内最多存放 M 个 token,如果 token 到达时令牌桶已经满了,那么这个 token 就会被丢弃
  3. 接口请求会先从令牌桶中取 token,拿到 token 则处理接口请求,拿不到 token 则进行限流处理

令牌桶限流算法的利弊

因为令牌桶存放了很多令牌,那么大量的突发请求会被执行,但是它不会出现临界问题,在令牌用完之后,令牌是以一个恒定的速率添加到令牌桶中的,因此不能再次发送大量突发请求。

规定固定容量的桶,token 以固定速度往桶内填充,当桶满时 token 不会被继续放入,每过来一个请求把 token 从桶中移除,如果桶中没有 token 不能请求。

令牌桶算法适用于有突发特性的流量,且流量需要即时处理的场景

令牌桶限流算法的实现

:::details Java 实现令牌桶算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.concurrent.atomic.AtomicLong;

/**
* 令牌桶限流算法
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2024-01-18
*/
public class TokenBucketRateLimiter implements RateLimiter {

/**
* QPS
*/
private final long qps;

/**
* 桶的容量
*/
private final long capacity;

/**
* 上一次令牌发放时间
*/
private long endTimeMillis;

/**
* 桶中当前的令牌数量
*/
private final AtomicLong tokenNum = new AtomicLong(0);

public TokenBucketRateLimiter(long qps, long capacity) {
this.qps = qps;
this.capacity = capacity;
this.endTimeMillis = System.currentTimeMillis();
}

@Override
public synchronized boolean tryAcquire(int permits) {

long now = System.currentTimeMillis();
long gap = now - endTimeMillis;

// 计算令牌数
long newTokenNum = (gap * qps / 1000);
long currentTokenNum = tokenNum.get() + newTokenNum;
tokenNum.set(Math.min(capacity, currentTokenNum));

if (tokenNum.get() < permits) {
return false;
} else {
tokenNum.addAndGet(-permits);
endTimeMillis = now;
return true;
}
}

}

:::

扩展

Guava 的 RateLimiter 工具类就是基于令牌桶算法实现,其源码分析可以参考:RateLimiter 基于漏桶算法,但它参考了令牌桶算法

限流算法测试

:::details 限流算法测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class RateLimiterDemo {

public static void main(String[] args) {

// ============================================================================

int qps = 20;

System.out.println("======================= 固定时间窗口限流算法 =======================");
FixedWindowRateLimiter fixedWindowRateLimiter = new FixedWindowRateLimiter(qps);
testRateLimit(fixedWindowRateLimiter, qps);

System.out.println("======================= 滑动时间窗口限流算法 =======================");
SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(qps, 10);
testRateLimit(slidingWindowRateLimiter, qps);

System.out.println("======================= 漏桶限流算法 =======================");
LeakyBucketRateLimiter leakyBucketRateLimiter = new LeakyBucketRateLimiter(qps, 100);
testRateLimit(leakyBucketRateLimiter, qps);

System.out.println("======================= 令牌桶限流算法 =======================");
TokenBucketRateLimiter tokenBucketRateLimiter = new TokenBucketRateLimiter(qps, 100);
testRateLimit(tokenBucketRateLimiter, qps);
}

private static void testRateLimit(RateLimiter rateLimiter, int qps) {

AtomicInteger okNum = new AtomicInteger(0);
AtomicInteger limitNum = new AtomicInteger(0);
ExecutorService executorService = ThreadUtil.newFixedExecutor(10, "限流测试", true);
long beginTime = System.currentTimeMillis();

int threadNum = 4;
final CountDownLatch latch = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
executorService.submit(() -> {
try {
batchRequest(rateLimiter, okNum, limitNum, 1000);
} catch (Exception e) {
log.error("发生异常!", e);
} finally {
latch.countDown();
}
});
}

try {
latch.await(10, TimeUnit.SECONDS);
long endTime = System.currentTimeMillis();
long gap = endTime - beginTime;
log.info("限流 QPS: {} -> 实际结果:耗时 {} ms,{} 次请求成功,{} 次请求被限流,实际 QPS: {}",
qps, gap, okNum.get(), limitNum.get(), okNum.get() * 1000 / gap);
if (okNum.get() == qps) {
log.info("限流符合预期");
}
} catch (Exception e) {
log.error("发生异常!", e);
} finally {
executorService.shutdown();
}
}

private static void batchRequest(RateLimiter rateLimiter, AtomicInteger okNum, AtomicInteger limitNum, int num)
throws InterruptedException {
for (int j = 0; j < num; j++) {
if (rateLimiter.tryAcquire(1)) {
log.info("请求成功");
okNum.getAndIncrement();
} else {
log.info("请求限流");
limitNum.getAndIncrement();
}
TimeUnit.MILLISECONDS.sleep(RandomUtil.randomInt(0, 10));
}
}

}

:::

限流框架 - Hystrix

Hystrix 是由 Netflix 开源,用于处理分布式系统的延迟和容错的一个开源组件。在分布式系统里,许多依赖不可避免的会调用失败,比如超时、异常等。Hystrix 采用断路器模式来实现服务间的彼此隔离,从而避免级联故障,以提高分布式系统整体的弹性。

“断路器”本身是一种开关装置,当某个服务单元发生故障之后,通过断路器的故障监控(类似熔断保险丝),向调用方返回一个符合预期的、可处理的备选响应(FallBack),而不是长时间的等待或者抛出调用方无法处理的异常,这样就保证了服务调用方的线程不会被长时间、不必要地占用,从而避免了故障在分布式系统中的蔓延,乃至雪崩。

Hystrix 官方已宣布不再发布新版本。但是,Hystrix 的断路器设计理念,有非常高的学习价值。

如果使用 Hystrix 对每个基础依赖服务进行过载保护,则整个系统架构将会类似下图所示,每个依赖项彼此隔离,受到延迟时发生饱和的资源的被限制访问,并包含 fallback 逻辑(用于降级处理),该逻辑决定了在依赖项中发生任何类型的故障时做出对应的处理。

img

Hystrix 原理

如下图所示,Hystrix 的工作流程大致可以分为 9 个步骤。

img

(一)构建一个 HystrixCommand 或 HystrixObservableCommand 对象

Hystrix 进行资源隔离,其实是提供了一个抽象,叫做命令模式。这也是 Hystrix 最基本的资源隔离技术

在使用 Hystrix 的过程中,会对依赖服务的调用请求封装成命令对象,Hystrix 对 命令对象抽象了两个抽象类:HystrixCommandHystrixObservableCommand

  • HystrixCommand 表示的命令对象会返回一个唯一返回值。
  • HystrixObservableCommand 表示的命令对象会返回多个返回值。
1
2
HystrixCommand command = new HystrixCommand(arg1, arg2);
HystrixObservableCommand command = new HystrixObservableCommand(arg1, arg2);

(二)执行命令

Hystrix 中共有 4 种方式执行命令,如下所示:

执行方式 说明 可用对象
execute() 阻塞式同步执行,返回依赖服务的单一返回结果(或者抛出异常) HystrixCommand
queue() 异步执行,通过 Future 返回依赖服务的单一返回结果(或者抛出异常) HystrixCommand
observe() 基于 Rxjava 的 Observable 方式,返回通过 Observable 表示的依赖服务返回结果。代调用代码先执行 (Hot Obserable) HystrixObservableCommand
toObservable() 基于 Rxjava 的 Observable 方式,返回通过 Observable 表示的依赖服务返回结果。执行代码等到真正订阅的时候才会执行 (cold observable) HystrixObservableCommand

这四种命令中,exeucte()queue()observe() 的表示其实是通过 toObservable() 实现的,其转换关系如下图所示:

img

HystrixCommand 执行方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
K value   = command.execute();
// 等价语句:
K value = command.execute().queue().get();

Future<K> fValue = command.queue();
//等价语句:
Future<K> fValue = command.toObservable().toBlocking().toFuture();

Observable<K> ohValue = command.observe(); //hot observable,立刻订阅,命令立刻执行
//等价语句:
Observable<K> ohValue = command.toObservable().subscribe(subject);

// 上述执行最终实现还是基于 toObservable()
Observable<K> ocValue = command.toObservable(); //cold observable,延后订阅,订阅发生后,执行才真正执行

(三)是否缓存

如果当前命令对象启用了请求缓存,并且请求的响应存在于缓存中,则缓存的响应会立刻以 Observable 的形式返回。

(四)是否开启断路器

如果第三步没有缓存没有命中,则判断一下当前断路器的断路状态是否打开。如果断路器状态为打开状态,则 Hystrix 将不会执行此 Command 命令,直接执行步骤 8 调用 Fallback;

如果断路器状态是关闭,则执行步骤 5 检查是否有足够的资源运行 Command 命令

(五)信号量、线程池是否拒绝

当您执行该命令时,Hystrix 会检查断路器以查看电路是否打开。

如果电路开路(或“跳闸”),则 Hystrix 将不会执行该命令,而是将流程路由到 (8) 获取回退。

如果电路闭合,则流程前进至 (5) 以检查是否有可用容量来运行命令。

如果当前要执行的 Command 命令 先关连的线程池 和队列(或者信号量)资源已经满了,Hystrix 将不会运行 Command 命令,直接执行 步骤 8 的 Fallback 降级处理;如果未满,表示有剩余的资源执行 Command 命令,则执行步骤 6

(六)construct()run()

当经过步骤 5 判断,有足够的资源执行 Command 命令时,本步骤将调用 Command 命令运行方法,基于不同类型的 Command,有如下两种两种运行方式:

运行方式 说明
HystrixCommand.run() 返回一个处理结果或者抛出一个异常
HystrixObservableCommand.construct() 返回一个 Observable 表示的结果(可能多个),或者 基于onError的错误通知

如果run() 或者construct()方法 的真实执行时间超过了 Command 设置的超时时间阈值, 则当前则执行线程(或者是独立的定时器线程)将会抛出TimeoutException。抛出超时异常 TimeoutException,后,将执行**步骤 8 **的 Fallback 降级处理。即使run()或者construct()执行没有被取消或中断,最终能够处理返回结果,但在降级处理逻辑中,将会抛弃run()construct()方法的返回结果,而返回 Fallback 降级处理结果。

注意事项
需要注意的是,Hystrix 无法强制 将正在运行的线程停止掉–Hystrix 能够做的最好的方式就是在 JVM 中抛出一个InterruptedException。如果 Hystrix 包装的工作不抛出中断异常InterruptedException, 则在 Hystrix 线程池中的线程将会继续执行,尽管调用的客户端已经接收到了TimeoutException。这种方式会使 Hystrix 的线程池处于饱和状态。大部分的 Java Http Client 开源库并不会解析 InterruptedException。所以确认 HTTP client 相关的连接和读/写相关的超时时间设置。
如果 Command 命令没有抛出任何异常,并且有返回结果,则 Hystrix 将会在做完日志记录和统计之后会将结果返回。 如果是通过run()方式运行,则返回一个Obserable对象,包含一个唯一值,并且发送一个onCompleted通知;如果是通过consturct()方式运行 ,则返回一个Observable 对象

(七)健康检查

Hystrix 会统计 Command 命令执行执行过程中的成功数失败数拒绝数超时数, 将这些信息记录到断路器 (Circuit Breaker) 中。断路器将上述统计按照时间窗的形式记录到一个定长数组中。断路器根据时间窗内的统计数据去判定请求什么时候可以被熔断,熔断后,在接下来一段恢复周期内,相同的请求过来后会直接被熔断。当再次校验,如果健康监测通过后,熔断开关将会被关闭。

(八)获取 Fallback

当以下场景出现后,Hystrix 将会尝试触发 Fallback:

  • 步骤 6 Command 执行时抛出了任何异常;
  • 步骤 4 断路器已经被打开
  • 步骤 5 执行命令的线程池、队列或者信号量资源已满
  • 命令执行的时间超过阈值

(九)返回结果

如果 Hystrix 命令对象执行成功,将会返回结果,或者以Observable形式包装的结果。根据**步骤 2 **的 command 调用方式,返回的Observable 会按照如下图说是的转换关系进行返回:

img

  • execute() — 用和 .queue() 相同的方式获取 Future,然后调用 Futureget() 以获取 Observable 的单个值。
  • queue() —将 Observable 转换为 BlockingObservable,以便可以将其转换为 Future 并返回。
  • watch() —订阅 Observable 并开始执行命令的流程; 返回一个 Observable,当订阅该 Observable 时,它会重新通知。
  • toObservable() —返回不变的 Observable; 必须订阅它才能真正开始执行命令的流程。

断路器工作原理

img

  1. 断路器时间窗内的请求数是否超过了请求数断路器生效阈值circuitBreaker.requestVolumeThreshold,如果超过了阈值,则将会触发断路,断路状态为开启
    例如,如果当前阈值设置的是20,则当时间窗内统计的请求数共计 19 个,即使 19 个全部失败了,都不会触发断路器。
  2. 并且请求错误率超过了请求错误率阈值errorThresholdPercentage
  3. 如果两个都满足,则将断路器由关闭迁移到开启
  4. 如果断路器开启,则后续的所有相同请求将会被断路掉;
  5. 直到过了沉睡时间窗sleepWindowInMilliseconds后,再发起请求时,允许其通过(此时的状态为半开起状态)。如果请求失败了,则保持断路器状态为开启状态,并更新沉睡时间窗。如果请求成功了,则将断路器状态改为关闭状态;

核心的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public void onNext(HealthCounts hc) {
// check if we are past the statisticalWindowVolumeThreshold
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
//we are not past the minimum error threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
// our failure rate is too high, we need to set the state to OPEN
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}

限流框架 - Sentinel

其他限流解决方案

Guava RateLimiter

Guava 是 Google 开源的 Java 类库,提供了一个工具类 RateLimiter,它基于令牌桶算法实现了本地限流器。

:::details RateLimiter 限流示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 限流器流速:2 个请求/秒
RateLimiter limiter = RateLimiter.create(2.0);
// 执行任务的线程池
ExecutorService es = Executors.newFixedThreadPool(1);
// 记录上一次执行时间
prev = System.nanoTime();
// 测试执行 20 次
for (int i = 0; i < 20; i++) {
// 限流器限流
limiter.acquire();
// 提交任务异步执行
es.execute(() -> {
long cur = System.nanoTime();
// 打印时间间隔:毫秒
System.out.println((cur - prev) / 1000000);
prev = cur;
});
}

// 输出结果:
// ...
// 500
// 499
// 500
// 499

:::

Redis + Lua

如果想要针对分布式系统资源进行限流,则必须具备两个要素:

  1. 对于资源的访问统计,必须是所有分布式节点都可以共享访问的数据存储;并且,由于在高并发场景下,读写访问统计数据会很频繁,该数据存储必须有很高的读写性能。
  2. 访问统计、限流计算都以原子操作方式进行。

满足以上要素的一种简单解决方案是,采用 Redis + Lua 来实现,原因在于:

  • Redis 数据库的读写性能极高;
  • Redis 支持以原子操作的方式执行 Lua 脚本。

Redis + Lua 实现的固定窗口限流算法

Redis + Lua 实现的固定窗口限流算法实现思路:

  • 根据实际需要,将当前时间格式化为天(yyyyMMdd)、时(yyyyMMddHH)、分(yyyyMMddHHmm)、秒(yyyyMMddHHmmss),并作为 Redis 的 String 类型 Key。该 Key 可以视为一个固定时间窗口,其中的 value 用于统计访问量;
  • 用于代表不同粒度的时间窗口按需设置过期时间;
  • 一旦达到窗口的限流阈值时,请求被限流;否则请求通过。

:::details Redis + Lua 实现的固定窗口限流算法

下面的代码片段模拟通过一个大小为 1 分钟的固定时间窗口进行限流,阈值为 100,过期时间 60s。

限流脚本 fixed_window_rate_limit.lua 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-- 缓存 Key
local key = KEYS[1]
-- 访问请求数
local permits = tonumber(ARGV[1])
-- 过期时间
local seconds = tonumber(ARGV[2])
-- 限流阈值
local limit = tonumber(ARGV[3])

-- 获取统计值
local count = tonumber(redis.call('GET', key) or "0")

if count + permits > limit then
-- 触发限流
return 0
else
redis.call('INCRBY', key, permits)
redis.call('EXPIRE', key, seconds)
return count + permits
end

调用 lua 的实际限流代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.resource.ResourceUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class RedisFixedWindowRateLimiter implements RateLimiter {

private static final String REDIS_HOST = "localhost";

private static final int REDIS_PORT = 6379;

private static final Jedis JEDIS;

public static final String SCRIPT;

static {
// Jedis 有多种构造方法,这里选用最简单的一种情况
JEDIS = new Jedis(REDIS_HOST, REDIS_PORT);

// 触发 ping 命令
try {
JEDIS.ping();
System.out.println("jedis 连接成功");
} catch (JedisConnectionException e) {
e.printStackTrace();
}

SCRIPT = FileUtil.readString(ResourceUtil.getResource("scripts/fixed_window_rate_limit.lua"),
StandardCharsets.UTF_8);
}

private final long maxPermits;
private final long periodSeconds;
private final String key;

public RedisFixedWindowRateLimiter(long qps, String key) {
this(qps * 60, 60, TimeUnit.SECONDS, key);
}

public RedisFixedWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, String key) {
this.maxPermits = maxPermits;
this.periodSeconds = timeUnit.toSeconds(period);
this.key = key;
}

@Override
public boolean tryAcquire(int permits) {
List<String> keys = Collections.singletonList(key);
List<String> args = CollectionUtil.newLinkedList(String.valueOf(permits), String.valueOf(periodSeconds),
String.valueOf(maxPermits));
Object eval = JEDIS.eval(SCRIPT, keys, args);
long value = (long) eval;
return value != -1;
}

public static void main(String[] args) throws InterruptedException {

int qps = 20;
RateLimiter jedisFixedWindowRateLimiter = new RedisFixedWindowRateLimiter(qps, "rate:limit:20240122210000");

// 模拟在一分钟内,不断收到请求,限流是否有效
int seconds = 60;
long okNum = 0L;
long total = 0L;
long beginTime = System.currentTimeMillis();
int num = RandomUtil.randomInt(qps, 100);
for (int second = 0; second < seconds; second++) {
for (int i = 0; i < num; i++) {
total++;
if (jedisFixedWindowRateLimiter.tryAcquire(1)) {
okNum++;
System.out.println("请求成功");
} else {
System.out.println("请求限流");
}
}
TimeUnit.SECONDS.sleep(1);
}
long endTime = System.currentTimeMillis();
long time = (endTime - beginTime) / 1000;
System.out.println(StrUtil.format("请求通过数:{},总请求数:{},实际 QPS:{}", okNum, total, okNum / time));
}

}

:::

Redis + Lua 实现的令牌桶限流算法

:::details Redis + Lua 实现的令牌桶限流算法

限流脚本 token_bucket_rate_limit.lua 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
local tokenKey = KEYS[1]
local timeKey = KEYS[2]

-- 申请令牌数
local permits = tonumber(ARGV[1])
-- QPS
local qps = tonumber(ARGV[2])
-- 桶的容量
local capacity = tonumber(ARGV[3])
-- 当前时间(单位:毫秒)
local nowMillis = tonumber(ARGV[4])
-- 填满令牌桶所需要的时间
local fillTime = capacity / qps
local ttl = math.min(capacity, math.floor(fillTime * 2))

local currentTokenNum = tonumber(redis.call("GET", tokenKey))
if currentTokenNum == nil then
currentTokenNum = capacity
end

local endTimeMillis = tonumber(redis.call("GET", timeKey))
if endTimeMillis == nil then
endTimeMillis = 0
end

local gap = nowMillis - endTimeMillis
local newTokenNum = math.max(0, gap * qps / 1000)
local currentTokenNum = math.min(capacity, currentTokenNum + newTokenNum)

if currentTokenNum < permits then
-- 请求拒绝
return -1
else
-- 请求通过
local finalTokenNum = currentTokenNum - permits
redis.call("SETEX", tokenKey, ttl, finalTokenNum)
redis.call("SETEX", timeKey, ttl, nowMillis)
return finalTokenNum
end

调用 lua 的实际限流代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package io.github.dunwu.distributed.ratelimit;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.resource.ResourceUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* 基于 Redis + Lua 实现的令牌桶限流算法
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2024-01-23
*/
public class RedisTokenBucketRateLimiter implements RateLimiter {

private static final String REDIS_HOST = "localhost";

private static final int REDIS_PORT = 6379;

private static final Jedis JEDIS;

public static final String SCRIPT;

static {
// Jedis 有多种构造方法,这里选用最简单的一种情况
JEDIS = new Jedis(REDIS_HOST, REDIS_PORT);

// 触发 ping 命令
try {
JEDIS.ping();
System.out.println("jedis 连接成功");
} catch (JedisConnectionException e) {
e.printStackTrace();
}

SCRIPT = FileUtil.readString(ResourceUtil.getResource("scripts/token_bucket_rate_limit.lua"),
StandardCharsets.UTF_8);
}

private final long qps;
private final long capacity;
private final String tokenKey;
private final String timeKey;

public RedisTokenBucketRateLimiter(long qps, long capacity, String tokenKey, String timeKey) {
this.qps = qps;
this.capacity = capacity;
this.tokenKey = tokenKey;
this.timeKey = timeKey;
}

@Override
public boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
List<String> keys = CollectionUtil.newLinkedList(tokenKey, timeKey);
List<String> args = CollectionUtil.newLinkedList(String.valueOf(permits), String.valueOf(qps),
String.valueOf(capacity), String.valueOf(now));
Object eval = JEDIS.eval(SCRIPT, keys, args);
long value = (long) eval;
return value != -1;
}

public static void main(String[] args) throws InterruptedException {

int qps = 20;
int bucket = 100;
RedisTokenBucketRateLimiter redisTokenBucketRateLimiter =
new RedisTokenBucketRateLimiter(qps, bucket, "token:rate:limit", "token:rate:limit:time");

// 先将令牌桶预热令牌申请完,后续才能真实反映限流 QPS
redisTokenBucketRateLimiter.tryAcquire(bucket);
TimeUnit.SECONDS.sleep(1);

// 模拟在一分钟内,不断收到请求,限流是否有效
int seconds = 60;
long okNum = 0L;
long total = 0L;
long beginTime = System.currentTimeMillis();
for (int second = 0; second < seconds; second++) {
int num = RandomUtil.randomInt(qps, 100);
for (int i = 0; i < num; i++) {
total++;
if (redisTokenBucketRateLimiter.tryAcquire(1)) {
okNum++;
System.out.println("请求成功");
} else {
System.out.println("请求限流");
}
}
TimeUnit.SECONDS.sleep(1);
}
long endTime = System.currentTimeMillis();
long time = (endTime - beginTime) / 1000;
System.out.println(StrUtil.format("请求通过数:{},总请求数:{},实际 QPS:{}", okNum, total, okNum / time));
}

}

:::

参考资料

Java 容器之 Map

Map 简介

Map 架构

Map 家族主要成员功能如下:

  • Map 是 Map 容器家族的祖先,Map 是一个用于保存键值对(key-value)的接口。Map 中不能包含重复的键;每个键最多只能映射到一个值。
  • AbstractMap 继承了 Map 的抽象类,它实现了 Map 中的核心 API。其它 Map 的实现类可以通过继承 AbstractMap 来减少重复编码。
  • SortedMap 继承了 Map 的接口。SortedMap 中的内容是排序的键值对,排序的方法是通过实现比较器(Comparator)完成的。
  • NavigableMap 继承了 SortedMap 的接口。相比于 SortedMapNavigableMap 有一系列的“导航”方法;如”获取大于/等于某对象的键值对”、“获取小于/等于某对象的键值对”等等。
  • HashMap 继承了 AbstractMap,但没实现 NavigableMap 接口。HashMap 的主要作用是储存无序的键值对,而 Hash 也体现了它的查找效率很高。HashMap 是使用最广泛的 Map
  • Hashtable 虽然没有继承 AbstractMap,但它继承了 DictionaryDictionary 也是键值对的接口),而且也实现 Map 接口。因此,Hashtable 的主要作用是储存无序的键值对。和 HashMap 相比,Hashtable 在它的主要方法中使用 synchronized 关键字修饰,来保证线程安全。但是,由于它的锁粒度太大,非常影响读写速度,所以,现代 Java 程序几乎不会使用 Hashtable ,如果需要保证线程安全,一般会用 ConcurrentHashMap 来替代。
  • TreeMap 继承了 AbstractMap,且实现了 NavigableMap 接口。TreeMap 的主要作用是储存有序的键值对,排序依据根据元素类型的 Comparator 而定。
  • WeakHashMap 继承了 AbstractMapWeakHashMap 的键是弱引用 (即 WeakReference),它的主要作用是当 GC 内存不足时,会自动将 WeakHashMap 中的 key 回收,这避免了 WeakHashMap 的内存空间无限膨胀。很明显,WeakHashMap 适用于作为缓存。

Map 接口

Map 的定义如下:

1
public interface Map<K,V> { }

Map 是一个用于保存键值对(key-value)的接口。Map 中不能包含重复的键;每个键最多只能映射到一个值。

Map 接口提供三种 Collection 视图,允许以键集值集键-值映射关系集的形式访问数据。

Map 有些实现类,可以有序的保存元素,如 TreeMap;另一些实现类则不保证顺序,如 HashMap 类。

Map 的实现类应该提供 2 个“标准的”构造方法:

  • void(无参数)构造方法,用于创建空 Map;
  • 带有单个 Map 类型参数的构造方法,用于创建一个与其参数具有相同键-值映射关系的新 Map。

实际上,后一个构造方法允许用户复制任意 Map,生成所需类的一个等价 Map。尽管无法强制执行此建议(因为接口不能包含构造方法),但是 JDK 中所有通用的 Map 实现都遵从它。

Map.Entry 接口

Map.Entry 一般用于通过迭代器(Iterator)访问问 Map

Map.Entry 是 Map 中内部的一个接口,Map.Entry 代表了 键值对 实体,Map 通过 entrySet() 获取 Map.Entry 集合,从而通过该集合实现对键值对的操作。

AbstractMap 抽象类

AbstractMap 的定义如下:

1
public abstract class AbstractMap<K,V> implements Map<K,V> {}

AbstractMap 抽象类提供了 Map 接口的核心实现,以最大限度地减少实现 Map 接口所需的工作。

要实现不可修改的 Map,编程人员只需扩展此类并提供 entrySet() 方法的实现即可,该方法将返回 Map 的映射关系 Set 视图。通常,返回的 set 将依次在 AbstractSet 上实现。此 Set 不支持 add()remove() 方法,其迭代器也不支持 remove() 方法。

要实现可修改的 Map,编程人员必须另外重写此类的 put 方法(否则将抛出 UnsupportedOperationException),entrySet().iterator() 返回的迭代器也必须另外实现其 remove() 方法。

SortedMap 接口

SortedMap 的定义如下:

1
public interface SortedMap<K,V> extends Map<K,V> { }

SortedMap 继承了 Map ,它是一个有序的 Map

SortedMap 的排序方式有两种:自然排序或者用户指定比较器插入有序 SortedMap 的所有元素都必须实现 Comparable 接口(或者被指定的比较器所接受)

另外,所有 SortedMap 实现类都应该提供 4 个“标准”构造方法:

  1. void(无参数)构造方法,它创建一个空的有序 Map,按照键的自然顺序进行排序。
  2. 带有一个 Comparator 类型参数的构造方法,它创建一个空的有序 Map,根据指定的比较器进行排序。
  3. 带有一个 Map 类型参数的构造方法,它创建一个新的有序 Map,其键-值映射关系与参数相同,按照键的自然顺序进行排序。
  4. 带有一个 SortedMap 类型参数的构造方法,它创建一个新的有序 Map,其键-值映射关系和排序方法与输入的有序 Map 相同。无法保证强制实施此建议,因为接口不能包含构造方法。

NavigableMap 的定义如下:

1
public interface NavigableMap<K,V> extends SortedMap<K,V> { }

NavigableMap 继承了 SortedMap ,它提供了丰富的查找方法。

NavigableMap 分别提供了获取“键”、“键-值对”、“键集”、“键-值对集”的相关方法。

NavigableMap 提供的功能可以分为 4 类:

  • 获取键-值对
    • lowerEntryfloorEntryceilingEntryhigherEntry 方法,它们分别返回与小于、小于等于、大于等于、大于给定键的键关联的 Map.Entry 对象。
    • firstEntrypollFirstEntrylastEntrypollLastEntry 方法,它们返回和/或移除最小和最大的映射关系(如果存在),否则返回 null。
  • 获取键。这个和第 1 类比较类似。
    • lowerKeyfloorKeyceilingKeyhigherKey 方法,它们分别返回与小于、小于等于、大于等于、大于给定键的键。
  • 获取键的集合
    • navigableKeySetdescendingKeySet 分别获取正序/反序的键集。
  • 获取键-值对的子集

Dictionary 抽象类

Dictionary 的定义如下:

1
public abstract class Dictionary<K,V> {}

Dictionary 是 JDK 1.0 定义的操作键值对的抽象类,它包括了操作键值对的基本方法。

HashMap 类

HashMap 的命名,也可以看出:**HashMap 以散列方式存储键值对**。HashMap 是非线程安全的。

HashMap 允许使用空值和空键,但 null 作为键只能有一个,null 作为值可以有多个。

HashMap 类大致等同于 Hashtable,除了它是不同步的并且允许为空值。)这个类不保序;特别是,它的元素顺序可能会随着时间的推移变化。

JDK1.8 之前 HashMap 由 数组+链表 组成的,数组是 HashMap 的主体,链表则是主要为了解决哈希冲突而存在的(“拉链法”解决冲突)。 JDK1.8 以后的 HashMap 在解决哈希冲突时有了较大的变化,当链表长度大于等于阈值(默认为 8)(将链表转换成红黑树前会判断,如果当前数组的长度小于 64,那么会选择先进行数组扩容,而不是转换为红黑树)时,将链表转化为红黑树,以减少搜索时间。

HashMap 默认的初始化大小为 16。之后每次扩充,容量变为原来的 2 倍。并且, HashMap 总是使用 2 的幂作为哈希表的大小。

JDK8 之前 HashMap 数据结构

之前 HashMap 底层是 数组和链表 结合在一起使用也就是 链表散列

HashMap 通过 key 的 hashCode 经过扰动函数处理过后得到 hash 值,然后通过 (n - 1) & hash 判断当前元素存放的位置(这里的 n 指的是数组的长度),如果当前位置存在元素的话,就判断该元素与要存入的元素的 hash 值以及 key 是否相同,如果相同的话,直接覆盖,不相同就通过拉链法解决冲突。

所谓扰动函数指的就是 HashMap 的 hash 方法。使用 hash 方法也就是扰动函数是为了防止一些实现比较差的 hashCode() 方法 换句话说使用扰动函数之后可以减少碰撞。

JDK7 的 HashMap 的 hash 方法:

1
2
3
4
static int hash(int h) {
h ^= (h >>> 20) ^ (h >>> 12);
return h ^ (h >>> 7) ^ (h >>> 4);
}

JDK8 之后 HashMap 数据结构

HashMap 的主要字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class HashMap<K,V> extends AbstractMap<K,V>
implements Map<K,V>, Cloneable, Serializable {

// 该表在初次使用时初始化,并根据需要调整大小。分配时,长度总是2的幂。
transient Node<K,V>[] table;
// 保存缓存的 entrySet()。请注意,AbstractMap 字段用于 keySet() 和 values()。
transient Set<Map.Entry<K,V>> entrySet;
// map 中的键值对数
transient int size;
// 这个HashMap被结构修改的次数结构修改是那些改变HashMap中的映射数量或者修改其内部结构(例如,重新散列)的修改。
transient int modCount;
// 下一个调整大小的值(容量*负载因子)。
int threshold;
// 散列表的负载因子
final float loadFactor;
}

HashMap 有两个影响其性能的参数:初始容量和负载因子

  • size - 初始容量。默认为 16,每次容量不够自动扩容。容量是哈希表中桶的数量,初始容量就是哈希表创建时的容量。
  • loadFactor - 负载因子。自动扩容之前被允许的最大饱和量,默认 0.75。负载因子是散列表在其容量自动扩容之前被允许的最大饱和量。当哈希表中的 entry 数量超过负载因子和当前容量的乘积时,散列表就会被重新映射(即重建内部数据结构),一般散列表大约是存储桶数量的两倍。

通常,默认负载因子(0.75)在时间和空间成本之间提供了良好的平衡。较高的值会减少空间开销,但会增加查找成本(反映在大部分 HashMap 类的操作中,包括 getput)。在设置初始容量时,应考虑映射中的条目数量及其负载因子,以尽量减少重新运行操作的次数。如果初始容量大于最大入口数除以负载因子,则不会发生重新刷新操作。

如果许多映射要存储在 HashMap 实例中,使用足够大的容量创建映射将允许映射存储的效率高于根据需要执行自动重新散列以增长表。请注意,使用多个具有相同 hashCode() 的密钥是降低任何散列表性能的一个可靠方法。为了改善影响,当键是 Comparable 时,该类可以使用键之间的比较顺序来帮助断开关系。

JDK8 的 HashMap 的 hash 方法:

1
2
3
4
5
6
7
static final int hash(Object key) {
int h;
// key.hashCode():返回散列值也就是hashcode
// ^:按位异或
// >>>:无符号右移,忽略符号位,空位都以0补齐
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

getput 的过程中,计算下标时,先对 hashCode 进行 hash 操作,然后再通过 hash 值进一步计算下标,如下图所示:

在对 hashCode() 计算 hash 时具体实现是这样的:

1
2
3
4
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

可以看到这个方法大概的作用就是:高 16bit 不变,低 16bit 和高 16bit 做了一个异或。

在设计 hash 方法时,因为目前的 table 长度 n 为 2 的幂,而计算下标的时候,是这样实现的(使用 & 位操作,而非 % 求余):

1
(n - 1) & hash

设计者认为这方法很容易发生碰撞。为什么这么说呢?不妨思考一下,在 n - 1 为 15(0x1111) 时,其实散列真正生效的只是低 4bit 的有效位,当然容易碰撞了。

因此,设计者想了一个顾全大局的方法(综合考虑了速度、作用、质量),就是把高 16bit 和低 16bit 异或了一下。设计者还解释到因为现在大多数的 hashCode 的分布已经很不错了,就算是发生了碰撞也用 O(logn)的 tree 去做了。仅仅异或一下,既减少了系统的开销,也不会造成的因为高位没有参与下标的计算(table 长度比较小时),从而引起的碰撞。

如果还是产生了频繁的碰撞,会发生什么问题呢?作者注释说,他们使用树来处理频繁的碰撞(we use trees to handle large sets of collisions in bins),在 JEP-180 中,描述了这个问题:

Improve the performance of java.util.HashMap under high hash-collision conditions by using balanced trees rather than linked lists to store map entries. Implement the same improvement in the LinkedHashMap class.

之前已经提过,在获取 HashMap 的元素时,基本分两步:

  1. 首先根据 hashCode() 做 hash,然后确定 bucket 的 index;

  2. 如果 bucket 的节点的 key 不是我们需要的,则通过 keys.equals()在链中找。

在 JDK8 之前的实现中是用链表解决冲突的,在产生碰撞的情况下,进行 get 时,两步的时间复杂度是 O(1)+O(n)。因此,当碰撞很厉害的时候 n 很大,O(n)的速度显然是影响速度的。

因此在 JDK8 中,利用红黑树替换链表,这样复杂度就变成了 O(1)+O(logn)了,这样在 n 很大的时候,能够比较理想的解决这个问题,在 JDK8:HashMap 的性能提升一文中有性能测试的结果。

HashMap 构造方法

1
2
3
4
public HashMap(); // 默认负载因子0.75
public HashMap(int initialCapacity); // 默认负载因子0.75;以 initialCapacity 初始化容量
public HashMap(int initialCapacity, float loadFactor); // 以 initialCapacity 初始化容量;以 loadFactor 初始化负载因子
public HashMap(Map<? extends K, ? extends V> m) // 默认负载因子0.75

put 方法的实现

put 方法大致的思路为:

  • 对 key 的 hashCode() 做 hash 计算,然后根据 hash 值再计算 Node 的存储位置;
  • 如果没有哈希碰撞,直接放到桶里;如果有哈希碰撞,以链表的形式存在桶后。
  • 如果哈希碰撞导致链表过长(大于等于 TREEIFY_THRESHOLD,数值为 8),就把链表转换成红黑树;
  • 如果节点已经存在就替换旧值
  • 桶数量超过容量*负载因子(即 load factor * current capacity),HashMap 调用 resize 自动扩容一倍

具体代码的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}

// hashcode 无符号位移 16 位
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
// tab 为空则创建
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
// 计算 index,并对 null 做处理
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
// 节点存在
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
// 该链为树
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
// 该链为链表
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
// 写入
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}

为什么计算 hash 使用 hashcode 无符号位移 16 位。

假设要添加两个对象 a 和 b,如果数组长度是 16,这时对象 a 和 b 通过公式 (n - 1) & hash 运算,也就是 (16-1)&a.hashCode 和 (16-1)&b.hashCode,15 的二进制为 0000000000000000000000000001111,假设对象 A 的 hashCode 为 1000010001110001000001111000000,对象 B 的 hashCode 为 0111011100111000101000010100000,你会发现上述与运算结果都是 0。这样的哈希结果就太让人失望了,很明显不是一个好的哈希算法。

但如果我们将 hashCode 值右移 16 位(h >>> 16 代表无符号右移 16 位),也就是取 int 类型的一半,刚好可以将该二进制数对半切开,并且使用位异或运算(如果两个数对应的位置相反,则结果为 1,反之为 0),这样的话,就能避免上面的情况发生。这就是 hash() 方法的具体实现方式。简而言之,就是尽量打乱 hashCode 真正参与运算的低 16 位。

get 方法的实现

在理解了 put 之后,get 就很简单了。大致思路如下:

  • 对 key 的 hashCode() 做 hash 计算,然后根据 hash 值再计算桶的 index

  • 如果桶中的第一个节点命中,直接返回;

  • 如果有冲突,则通过 key.equals(k) 去查找对应的 entry

    • 若为树,则在红黑树中通过 key.equals(k) 查找,O(logn);

    • 若为链表,则在链表中通过 key.equals(k) 查找,O(n)。

具体代码的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public V get(Object key) {
Node<K,V> e;
return (e = getNode(hash(key), key)) == null ? null : e.value;
}

final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
// 直接命中
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
// 未命中
if ((e = first.next) != null) {
// 在树中 get
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
// 在链表中 get
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}

resize 的实现

put 时,如果发现目前的 bucket 占用程度已经超过了 Load Factor 所希望的比例,那么就会发生 resize。在 resize 的过程,简单的说就是把 bucket 扩充为 2 倍,之后重新计算 index,把节点再放到新的 bucket 中。

当超过限制的时候会 resize,然而又因为我们使用的是 2 次幂的扩展(指长度扩为原来 2 倍),所以,元素的位置要么是在原位置,要么是在原位置再移动 2 次幂的位置。

怎么理解呢?例如我们从 16 扩展为 32 时,具体的变化如下所示:

因此元素在重新计算 hash 之后,因为 n 变为 2 倍,那么 n-1 的 mask 范围在高位多 1bit(红色),因此新的 index 就会发生这样的变化:

因此,我们在扩充 HashMap 的时候,不需要重新计算 hash,只需要看看原来的 hash 值新增的那个 bit 是 1 还是 0 就好了,是 0 的话索引没变,是 1 的话索引变成“原索引+oldCap”。可以看看下图为 16 扩充为 32 的 resize 示意图:

这个设计确实非常的巧妙,既省去了重新计算 hash 值的时间,而且同时,由于新增的 1bit 是 0 还是 1 可以认为是随机的,因此 resize 的过程,均匀的把之前的冲突的节点分散到新的 bucket 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
final Node<K,V>[] resize() {
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
// 超过最大值就不再扩充了,就只好随你碰撞去吧
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
// 没超过最大值,就扩充为原来的 2 倍
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else { // zero initial threshold signifies using defaults
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}

// 计算新的 resize 上限
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
if (oldTab != null) {
// 把每个 bucket 都移动到新的 buckets 中
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
// 原索引
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
// 原索引+oldCap
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
// 原索引放到bucket里
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
// 原索引+oldCap放到bucket里
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}

LinkedHashMap 类

LinkedHashMap 要点

LinkedHashMap 通过维护一个保存所有条目(Entry)的双向链表,保证了元素迭代的顺序(即插入顺序)

关注点 结论
是否允许键值对为 null Key 和 Value 都允许 null
是否允许重复数据 Key 重复会覆盖、Value 允许重复
是否有序 按照元素插入顺序存储
是否线程安全 非线程安全

LinkedHashMap 数据结构

LinkedHashMap 通过维护一对 LinkedHashMap.Entry<K,V> 类型的头尾指针,以双链表形式,保存所有数据

学习过数据结构的双链表,就能理解其元素存储以及访问必然是有序的。

1
2
3
4
5
6
7
8
9
10
11
public class LinkedHashMap<K,V>
extends HashMap<K,V>
implements Map<K,V> {

// 双链表的头指针
transient LinkedHashMap.Entry<K,V> head;
// 双链表的尾指针
transient LinkedHashMap.Entry<K,V> tail;
// 迭代排序方法:true 表示访问顺序;false 表示插入顺序
final boolean accessOrder;
}

LinkedHashMap 继承了 HashMapput 方法,本身没有实现 put 方法。

TreeMap 类

TreeMap 要点

TreeMap 基于红黑树实现。

TreeMap 是有序的。它的排序规则是:根据 map 中的 key 的自然语义顺序或提供的比较器(Comparator)的自定义比较顺序。

TreeMap 不是线程安全的。

TreeMap 原理

put 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public V put(K key, V value) {
Entry<K,V> t = root;
// 如果根节点为 null,插入第一个节点
if (t == null) {
compare(key, key); // type (and possibly null) check

root = new Entry<>(key, value, null);
size = 1;
modCount++;
return null;
}
int cmp;
Entry<K,V> parent;
// split comparator and comparable paths
Comparator<? super K> cpr = comparator;
// 每个节点的左孩子节点的值小于它;右孩子节点的值大于它
// 如果有比较器,使用比较器进行比较
if (cpr != null) {
do {
parent = t;
cmp = cpr.compare(key, t.key);
if (cmp < 0)
t = t.left;
else if (cmp > 0)
t = t.right;
else
return t.setValue(value);
} while (t != null);
}
// 没有比较器,使用 key 的自然顺序进行比较
else {
if (key == null)
throw new NullPointerException();
@SuppressWarnings("unchecked")
Comparable<? super K> k = (Comparable<? super K>) key;
do {
parent = t;
cmp = k.compareTo(t.key);
if (cmp < 0)
t = t.left;
else if (cmp > 0)
t = t.right;
else
return t.setValue(value);
} while (t != null);
}
// 通过上面的遍历未找到 key 值,则新插入节点
Entry<K,V> e = new Entry<>(key, value, parent);
if (cmp < 0)
parent.left = e;
else
parent.right = e;
// 插入后,为了维持红黑树的平衡需要调整
fixAfterInsertion(e);
size++;
modCount++;
return null;
}

get 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public V get(Object key) {
Entry<K,V> p = getEntry(key);
return (p==null ? null : p.value);
}

final Entry<K,V> getEntry(Object key) {
// Offload comparator-based version for sake of performance
if (comparator != null)
return getEntryUsingComparator(key);
if (key == null)
throw new NullPointerException();
@SuppressWarnings("unchecked")
Comparable<? super K> k = (Comparable<? super K>) key;
Entry<K,V> p = root;
// 按照二叉树搜索的方式进行搜索,搜到返回
while (p != null) {
int cmp = k.compareTo(p.key);
if (cmp < 0)
p = p.left;
else if (cmp > 0)
p = p.right;
else
return p;
}
return null;
}

remove 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public V remove(Object key) {
Entry<K,V> p = getEntry(key);
if (p == null)
return null;

V oldValue = p.value;
deleteEntry(p);
return oldValue;
}
private void deleteEntry(Entry<K,V> p) {
modCount++;
size--;

// 如果当前节点有左右孩子节点,使用后继节点替换要删除的节点
// If strictly internal, copy successor's element to p and then make p
// point to successor.
if (p.left != null && p.right != null) {
Entry<K,V> s = successor(p);
p.key = s.key;
p.value = s.value;
p = s;
} // p has 2 children

// Start fixup at replacement node, if it exists.
Entry<K,V> replacement = (p.left != null ? p.left : p.right);

if (replacement != null) { // 要删除的节点有一个孩子节点
// Link replacement to parent
replacement.parent = p.parent;
if (p.parent == null)
root = replacement;
else if (p == p.parent.left)
p.parent.left = replacement;
else
p.parent.right = replacement;

// Null out links so they are OK to use by fixAfterDeletion.
p.left = p.right = p.parent = null;

// Fix replacement
if (p.color == BLACK)
fixAfterDeletion(replacement);
} else if (p.parent == null) { // return if we are the only node.
root = null;
} else { // No children. Use self as phantom replacement and unlink.
if (p.color == BLACK)
fixAfterDeletion(p);

if (p.parent != null) {
if (p == p.parent.left)
p.parent.left = null;
else if (p == p.parent.right)
p.parent.right = null;
p.parent = null;
}
}
}

TreeMap 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class TreeMapDemo {

private static final String[] chars = "A B C D E F G H I J K L M N O P Q R S T U V W X Y Z".split(" ");

public static void main(String[] args) {
TreeMap<Integer, String> treeMap = new TreeMap<>();
for (int i = 0; i < chars.length; i++) {
treeMap.put(i, chars[i]);
}
System.out.println(treeMap);
Integer low = treeMap.firstKey();
Integer high = treeMap.lastKey();
System.out.println(low);
System.out.println(high);
Iterator<Integer> it = treeMap.keySet().iterator();
for (int i = 0; i <= 6; i++) {
if (i == 3) { low = it.next(); }
if (i == 6) { high = it.next(); } else { it.next(); }
}
System.out.println(low);
System.out.println(high);
System.out.println(treeMap.subMap(low, high));
System.out.println(treeMap.headMap(high));
System.out.println(treeMap.tailMap(low));
}
}

WeakHashMap

WeakHashMap 的定义如下:

1
2
3
public class WeakHashMap<K,V>
extends AbstractMap<K,V>
implements Map<K,V> {}

WeakHashMap 继承了 AbstractMap,实现了 Map 接口。

和 HashMap 一样,WeakHashMap 也是一个散列表,它存储的内容也是键值对(key-value)映射,而且键和值都可以是 null。

不过 WeakHashMap 的键是弱键。在 WeakHashMap 中,当某个键不再被其它对象引用,会被从 WeakHashMap 中被自动移除。更精确地说,对于一个给定的键,其映射的存在并不阻止垃圾回收器对该键的丢弃,这就使该键成为可终止的,被终止,然后被回收。某个键被终止时,它对应的键值对也就从映射中有效地移除了。

这个弱键的原理呢?大致上就是,通过 WeakReference 和 ReferenceQueue 实现的。

WeakHashMap 的 key 是弱键,即是 WeakReference 类型的;ReferenceQueue 是一个队列,它会保存被 GC 回收的弱键。实现步骤是:

  1. 新建 WeakHashMap,将键值对添加到 WeakHashMap 中。实际上,WeakHashMap 是通过数组 table 保存 Entry(键值对);每一个 Entry 实际上是一个单向链表,即 Entry 是键值对链表。
  2. 当某弱键不再被其它对象引用,并被 GC 回收时。在 GC 回收该弱键时,这个弱键也同时会被添加到 ReferenceQueue(queue)队列中。
  3. 当下一次我们需要操作 WeakHashMap 时,会先同步 table 和 queue。table 中保存了全部的键值对,而 queue 中保存被 GC 回收的键值对;同步它们,就是删除 table 中被 GC 回收的键值对。

这就是弱键如何被自动从 WeakHashMap 中删除的步骤了。

和 HashMap 一样,WeakHashMap 是不同步的。可以使用 Collections.synchronizedMap 方法来构造同步的 WeakHashMap。

总结

Map 简介

img

HashMap

img

其他 Map

img

参考资料

Java 容器之 Set

Set 简介

Set 家族成员简介:

  • Set 继承了 Collection 的接口。实际上 Set 就是 Collection,只是行为略有不同:Set 集合不允许有重复元素。
  • SortedSet 继承了 Set 的接口。SortedSet 中的内容是排序的唯一值,排序的方法是通过比较器(Comparator)。
  • NavigableSet 继承了 SortedSet 的接口。它提供了丰富的查找方法:如”获取大于/等于某值的元素”、“获取小于/等于某值的元素”等等。
  • AbstractSet 是一个抽象类,它继承于 AbstractCollectionAbstractCollection 实现了 Set 中的绝大部分方法,为实现 Set 的实例类提供了便利。
  • HashSet 类依赖于 HashMap,它实际上是通过 HashMap 实现的。HashSet 中的元素是无序的、散列的。
  • TreeSet 类依赖于 TreeMap,它实际上是通过 TreeMap 实现的。TreeSet 中的元素是有序的,它是按自然排序或者用户指定比较器排序的 Set。
  • LinkedHashSet 是按插入顺序排序的 Set。
  • EnumSet 是只能存放 Emum 枚举类型的 Set。

Set 接口

Set 继承了 Collection 的接口。实际上,Set 就是 Collection,二者提供的方法完全相同。

Set 接口定义如下:

1
public interface Set<E> extends Collection<E> {}

SortedSet 接口

继承了 Set 的接口。SortedSet 中的内容是排序的唯一值,排序的方法是通过比较器(Comparator)。

SortedSet 接口定义如下:

1
public interface SortedSet<E> extends Set<E> {}

SortedSet 接口新扩展的方法:

  • comparator - 返回 Comparator
  • subSet - 返回指定区间的子集
  • headSet - 返回小于指定元素的子集
  • tailSet - 返回大于指定元素的子集
  • first - 返回第一个元素
  • last - 返回最后一个元素
  • spliterator

NavigableSet 继承了 SortedSet。它提供了丰富的查找方法。

NavigableSet 接口定义如下:

1
public interface NavigableSet<E> extends SortedSet<E> {}

NavigableSet 接口新扩展的方法:

  • lower - 返回小于指定值的元素中最接近的元素
  • higher - 返回大于指定值的元素中最接近的元素
  • floor - 返回小于或等于指定值的元素中最接近的元素
  • ceiling - 返回大于或等于指定值的元素中最接近的元素
  • pollFirst - 检索并移除第一个(最小的)元素
  • pollLast - 检索并移除最后一个(最大的)元素
  • descendingSet - 返回反序排列的 Set
  • descendingIterator - 返回反序排列的 Set 的迭代器
  • subSet - 返回指定区间的子集
  • headSet - 返回小于指定元素的子集
  • tailSet - 返回大于指定元素的子集

AbstractSet 抽象类

AbstractSet 类提供 Set 接口的核心实现,以最大限度地减少实现 Set 接口所需的工作。

AbstractSet 抽象类定义如下:

1
public abstract class AbstractSet<E> extends AbstractCollection<E> implements Set<E> {}

事实上,主要的实现已经在 AbstractCollection 中完成。

HashSet 类

HashSet 类依赖于 HashMap,它实际上是通过 HashMap 实现的。HashSet 中的元素是无序的、散列的。

HashSet 类定义如下:

1
2
3
public class HashSet<E>
extends AbstractSet<E>
implements Set<E>, Cloneable, java.io.Serializable {}

HashSet 要点

  • HashSet 通过继承 AbstractSet 实现了 Set 接口中的骨干方法。
  • HashSet 实现了 Cloneable,所以支持克隆。
  • HashSet 实现了 Serializable,所以支持序列化。
  • HashSet 中存储的元素是无序的。
  • HashSet 允许 null 值的元素。
  • HashSet 不是线程安全的。

HashSet 原理

HashSet 是基于 HashMap 实现的。

1
2
3
4
5
6
// HashSet 的核心,通过维护一个 HashMap 实体来实现 HashSet 方法
private transient HashMap<E,Object> map;

// PRESENT 是用于关联 map 中当前操作元素的一个虚拟值
private static final Object PRESENT = new Object();
}
  • HashSet 中维护了一个 HashMap 对象 map,HashSet 的重要方法,如 addremoveiteratorclearsize 等都是围绕 map 实现的。
    • HashSet 类中通过定义 writeObject()readObject() 方法确定了其序列化和反序列化的机制。
  • PRESENT 是用于关联 map 中当前操作元素的一个虚拟值。

TreeSet 类

TreeSet 类依赖于 TreeMap,它实际上是通过 TreeMap 实现的。TreeSet 中的元素是有序的,它是按自然排序或者用户指定比较器排序的 Set。

TreeSet 类定义如下:

1
2
public class TreeSet<E> extends AbstractSet<E>
implements NavigableSet<E>, Cloneable, java.io.Serializable {}

TreeSet 要点

  • TreeSet 通过继承 AbstractSet 实现了 NavigableSet 接口中的骨干方法。
  • TreeSet 实现了 Cloneable,所以支持克隆。
  • TreeSet 实现了 Serializable,所以支持序列化。
  • TreeSet 中存储的元素是有序的。排序规则是自然顺序或比较器(Comparator)中提供的顺序规则。
  • TreeSet 不是线程安全的。

TreeSet 源码

TreeSet 是基于 TreeMap 实现的。

1
2
3
4
5
// TreeSet 的核心,通过维护一个 NavigableMap 实体来实现 TreeSet 方法
private transient NavigableMap<E,Object> m;

// PRESENT 是用于关联 map 中当前操作元素的一个虚拟值
private static final Object PRESENT = new Object();
  • TreeSet 中维护了一个 NavigableMap 对象 map(实际上是一个 TreeMap 实例),TreeSet 的重要方法,如 addremoveiteratorclearsize 等都是围绕 map 实现的。
  • PRESENT 是用于关联 map 中当前操作元素的一个虚拟值。TreeSet 中的元素都被当成 TreeMap 的 key 存储,而 value 都填的是 PRESENT

LinkedHashSet 类

LinkedHashSet 是按插入顺序排序的 Set。

LinkedHashSet 类定义如下:

1
2
3
public class LinkedHashSet<E>
extends HashSet<E>
implements Set<E>, Cloneable, java.io.Serializable {}

LinkedHashSet 要点

  • LinkedHashSet 通过继承 HashSet 实现了 Set 接口中的骨干方法。
  • LinkedHashSet 实现了 Cloneable,所以支持克隆。
  • LinkedHashSet 实现了 Serializable,所以支持序列化。
  • LinkedHashSet 中存储的元素是按照插入顺序保存的。
  • LinkedHashSet 不是线程安全的。

LinkedHashSet 原理

LinkedHashSet 有三个构造方法,无一例外,都是调用父类 HashSet 的构造方法。

1
2
3
4
5
6
7
8
9
public LinkedHashSet(int initialCapacity, float loadFactor) {
super(initialCapacity, loadFactor, true);
}
public LinkedHashSet(int initialCapacity) {
super(initialCapacity, .75f, true);
}
public LinkedHashSet() {
super(16, .75f, true);
}

需要强调的是:LinkedHashSet 构造方法实际上调用的是父类 HashSet 的非 public 构造方法。

1
2
3
HashSet(int initialCapacity, float loadFactor, boolean dummy) {
map = new LinkedHashMap<>(initialCapacity, loadFactor);
}

不同于 HashSet public 构造方法中初始化的 HashMap 实例,这个构造方法中,初始化了 LinkedHashMap 实例。

也就是说,实际上,LinkedHashSet 维护了一个双链表。由双链表的特性可以知道,它是按照元素的插入顺序保存的。所以,这就是 LinkedHashSet 中存储的元素是按照插入顺序保存的原理。

EnumSet 类

EnumSet 类定义如下:

1
2
public abstract class EnumSet<E extends Enum<E>> extends AbstractSet<E>
implements Cloneable, java.io.Serializable {}

EnumSet 要点

  • EnumSet 继承了 AbstractSet,所以有 Set 接口中的骨干方法。
  • EnumSet 实现了 Cloneable,所以支持克隆。
  • EnumSet 实现了 Serializable,所以支持序列化。
  • EnumSet 通过 <E extends Enum<E>> 限定了存储元素必须是枚举值。
  • EnumSet 没有构造方法,只能通过类中的 static 方法来创建 EnumSet 对象。
  • EnumSet 是有序的。以枚举值在 EnumSet 类中的定义顺序来决定集合元素的顺序。
  • EnumSet 不是线程安全的。

HashSet vs. LinkedHashSet vs. TreeSet

HashSetLinkedHashSetTreeSet 都是 Set 接口的实现类,都能保证元素唯一,并且都不是线程安全的。

HashSetLinkedHashSetTreeSet 的主要区别在于底层数据结构不同。HashSet 的底层数据结构是哈希表(基于 HashMap 实现)。LinkedHashSet 的底层数据结构是链表和哈希表,元素的插入和取出顺序满足 FIFO。TreeSet 底层数据结构是红黑树,元素是有序的,排序的方式有自然排序和定制排序。

底层数据结构不同又导致这三者的应用场景不同。HashSet 用于不需要保证元素插入和取出顺序的场景,LinkedHashSet 用于保证元素的插入和取出顺序满足 FIFO 的场景,TreeSet 用于支持对元素自定义排序规则的场景。

参考资料

Java 容器简介

容器简介

数组与容器

Java 中常用的存储容器就是数组和容器,二者有以下区别:

  • 存储大小是否固定
    • 数组的长度固定
    • 容器的长度可变
  • 数据类型
    • 数组可以存储基本数据类型,也可以存储引用数据类型
    • 容器只能存储引用数据类型,基本数据类型的变量要转换成对应的包装类才能放入容器类中。

:bulb: 不了解什么是基本数据类型、引用数据类型、包装类这些概念,可以参考:Java 基本数据类型

容器框架

img

Java 容器框架主要分为 CollectionMap 两种。其中,Collection 又分为 ListSet 以及 Queue

  • Collection - 一个独立元素的序列,这些元素都服从一条或者多条规则。
    • List - 必须按照插入的顺序保存元素。常见 List 容器有:
      • ArrayList - Object[] 数组。
      • LinkedList - 双链表 (JDK1.6 之前为循环链表,JDK1.7 取消了循环)。
      • Vector - 通过 synchronized 修饰读写方法来保证并发安全。
      • Vector - Object[] 数组,通过 synchronized 修饰读写方法来保证并发安全。
    • Set - 不能有重复的元素。常见 Set 容器有:
      • HashSet - 无序,内部基于 HashMap 来实现的。
      • LinkedHashSet - 保证插入顺序,内部基于 LinkedHashMap 来实现的。
      • TreeSet - 保证自然序或用户指定的比较器顺序,内部基于红黑树实现。
    • Queue - 按照排队规则来确定对象产生的顺序。
      • PriorityQueue - 基于 Object[] 数组来实现小顶堆
      • DelayQueue - 延迟队列。
      • ArrayQueue - ArrayDequeDeque 的顺序表实现。基于动态数组实现了栈和队列所需的所有操作。
      • LinkedList - LinkedListDeque 的链表实现。
  • Map - 一组成对的“键值对”对象,允许你使用键来查找值。常见的 Map 容器有:
    • HashMap:JDK1.8 之前 HashMap 由数组+链表组成的,数组是 HashMap 的主体,链表则是主要为了解决哈希冲突而存在的(“拉链法”解决冲突)。JDK1.8 以后在解决哈希冲突时有了较大的变化,当链表长度大于阈值(默认为 8)(将链表转换成红黑树前会判断,如果当前数组的长度小于 64,那么会选择先进行数组扩容,而不是转换为红黑树)时,将链表转化为红黑树,以减少搜索时间。
    • LinkedHashMapLinkedHashMap 继承自 HashMap,所以它的底层仍然是基于拉链式散列结构即由数组和链表或红黑树组成。另外,LinkedHashMap 在上面结构的基础上,增加了一条双向链表,使得上面的结构可以保持键值对的插入顺序。同时通过对链表进行相应的操作,实现了访问顺序相关逻辑。
    • Hashtable:数组+链表组成的,数组是 Hashtable 的主体,链表则是主要为了解决哈希冲突而存在的。
    • TreeMap:红黑树(自平衡的排序二叉树)。

容器的基本机制

Java 的容器具有一定的共性,它们或全部或部分依赖以下技术。所以,学习以下技术点,对于理解 Java 容器的特性和原理有很大的帮助。

泛型

Java 1.5 引入了泛型技术。

Java 容器通过泛型技术来保证其数据的类型安全。什么是类型安全呢?

举例来说:如果有一个 List<Object> 容器,Java 编译器在编译时不会对原始类型进行类型安全检查,却会对带参数的类型进行检查,通过使用 Object 作为类型,可以告知编译器该方法可以接受任何类型的对象,比如 String 或 Integer。

1
2
3
List<Object> list = new ArrayList<Object>();
list.add("123");
list.add(123);

如果没有泛型技术,如示例中的代码那样,容器中就可能存储任意数据类型,这是很危险的行为。

1
2
3
List<String> list = new ArrayList<String>();
list.add("123");
list.add(123);

:bulb: 想深入了解 Java 泛型技术的用法和原理可以参考:深入理解 Java 泛型

Iterable 和 Iterator

Iterable 和 Iterator 目的在于遍历访问容器中的元素。

Iterator 接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface Iterator<E> {

boolean hasNext();

E next();

default void remove() {
throw new UnsupportedOperationException("remove");
}

default void forEachRemaining(Consumer<? super E> action) {
Objects.requireNonNull(action);
while (hasNext())
action.accept(next());
}
}

Iterable 接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface Iterable<T> {

Iterator<T> iterator();

default void forEach(Consumer<? super T> action) {
Objects.requireNonNull(action);
for (T t : this) {
action.accept(t);
}
}

default Spliterator<T> spliterator() {
return Spliterators.spliteratorUnknownSize(iterator(), 0);
}
}

Collection 接口扩展了 Iterable 接口。

迭代其实我们可以简单地理解为遍历,是一个标准化遍历各类容器里面的所有对象的接口。它是一个经典的设计模式——迭代器模式(Iterator)。

迭代器模式 - 提供一种方法顺序访问一个聚合对象中各个元素,而又无须暴露该对象的内部表示

示例:迭代器遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class IteratorDemo {

public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
Iterator it = list.iterator();
while (it.hasNext()) {
System.out.println(it.next());
}
}

}

《阿里巴巴 Java 开发手册》的描述如下:

不要在 foreach 循环里进行元素的 remove/add 操作。remove 元素请使用 Iterator 方式,如果并发操作,需要对 Iterator 对象加锁。

通过反编译你会发现 foreach 语法底层其实还是依赖 Iterator 。不过, remove/add 操作直接调用的是集合自己的方法,而不是 Iteratorremove/add方法

这就导致 Iterator 莫名其妙地发现自己有元素被 remove/add ,然后,它就会抛出一个 ConcurrentModificationException 来提示用户发生了并发修改异常。这就是单线程状态下产生的 fail-fast 机制

fail-fast 机制:多个线程对 fail-fast 集合进行修改的时候,可能会抛出ConcurrentModificationException。 即使是单线程下也有可能会出现这种情况,上面已经提到过。

相关阅读:什么是 fail-fastopen in new window

Java8 开始,可以使用 Collection#removeIf()方法删除满足特定条件的元素,如:

1
2
3
4
5
6
List<Integer> list = new ArrayList<>();
for (int i = 1; i <= 10; ++i) {
list.add(i);
}
list.removeIf(filter -> filter % 2 == 0); /* 删除list中的所有偶数 */
System.out.println(list); /* [1, 3, 5, 7, 9] */

除了上面介绍的直接使用 Iterator 进行遍历操作之外,你还可以:

  • 使用普通的 for 循环
  • 使用 fail-safe 的集合类。java.util包下面的所有的集合类都是 fail-fast 的,而java.util.concurrent包下面的所有的类都是 fail-safe 的。
  • … …

Comparable 和 Comparator

Comparable 接口和 Comparator 接口一般用于实现容器中元素的比较及排序:

  • Comparable 接口实际上是出自 java.lang 包 它有一个 compareTo(Object obj) 方法用来排序
  • Comparator 接口实际上是出自 java.util 包它有一个 compare(Object obj1, Object obj2) 方法用来排序

::: tabs#Comparable和Comparator接口定义

@tab Comparable 接口定义

Comparable 接口定义

1
2
3
public interface Comparable<T> {
public int compareTo(T o);
}

@tab Comparator 接口定义

Comparator 接口定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@FunctionalInterface
public interface Comparator<T> {

int compare(T o1, T o2);

boolean equals(Object obj);

// 反转
default Comparator<T> reversed() {
return Collections.reverseOrder(this);
}

default Comparator<T> thenComparing(Comparator<? super T> other) {
Objects.requireNonNull(other);
return (Comparator<T> & Serializable) (c1, c2) -> {
int res = compare(c1, c2);
return (res != 0) ? res : other.compare(c1, c2);
};
}

// thenComparingXXX 方法略

// 静态方法略
}

:::

假设,有一个 List 容器,存储的是 User 类型对象。现在要根据 User 中的 age 属性进行排序。

User 定义如下:

1
2
3
4
5
6
7
8
9
10
11
public class User {

private String name;
private int age;

public User(String name, int age) {
this.age = age;
this.name = name;
}
// getter、setter 略
}

我们分别通过 ComparableComparator 来实现比较、排序,体会一下有何差异。

::: tabs#Comparable和Comparator使用示例

@tab Comparable 接口使用示例

Comparable 接口使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ComparableDemo {

public static void main(String[] args) {
User a = new User("A", 18);
User b = new User("B", 17);
User c = new User("C", 20);
List<User> list = new ArrayList<>(Arrays.asList(a, b, c));
Collections.sort(list);
list.forEach(System.out::println);
}
// 输出:
// User{age=17, name='B'}
// User{age=18, name='A'}
// User{age=20, name='C'}

// 需要对被比较、排序的类进行改造,实现 Comparable 接口
static class User implements Comparable<User> {

private String name;
private int age;

public User(String name, int age) {
this.age = age;
this.name = name;
}

// getter、setter 略

@Override
public int compareTo(User o) {
return this.age - o.age;
}

@Override
public String toString() {
return "User{" + "age=" + age + ", name='" + name + '\'' + '}';
}
}
}

从上例可以看出,使用 Comparable 接口,被排序对象类必须实现 Comparable 接口;并在类中定义 compareTo 方法的实现,即排序逻辑必须置于被排序对象类中。

@tab Comparator 接口使用示例

Comparator 接口使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ComparatorDemo {

public static void main(String[] args) {
User a = new User("A", 18);
User b = new User("B", 17);
User c = new User("C", 20);
List<User> list = new ArrayList<>(Arrays.asList(a, b, c));
Collections.sort(list, new Comparator<User>() {
@Override
public int compare(User o1, User o2) {
return o1.age - o2.age;
}
});
list.forEach(System.out::println);
}
// 输出:
// User{age=17, name='B'}
// User{age=18, name='A'}
// User{age=20, name='C'}

static class User {

private String name;
private int age;

public User(String name, int age) {
this.age = age;
this.name = name;
}

// getter、setter 略

@Override
public String toString() {
return "User{" + "age=" + age + ", name='" + name + '\'' + '}';
}
}
}

从上例可以看出,使用 Comparator 接口和 Comparable 接口的不同点在于:被排序的对象类无需实现 Comparator 接口,排序逻辑置于被排序对象类的外部。

:::

Cloneable

Java 中 一个类要实现 clone 功能 必须实现 Cloneable 接口,否则在调用 clone() 时会报 CloneNotSupportedException 异常。

Java 中所有类都默认继承 java.lang.Object 类,在 java.lang.Object 类中有一个方法 clone(),这个方法将返回 Object 对象的一个拷贝。Object 类里的 clone() 方法仅仅用于浅拷贝(拷贝基本成员属性,对于引用类型仅返回指向改地址的引用)。

如果 Java 类需要深拷贝,需要覆写 clone() 方法。

fail-fast

fail-fast 的要点

Java 容器(如:ArrayList、HashMap、TreeSet 等待)的 javadoc 中常常提到类似的描述:

注意,迭代器的快速失败行为无法得到保证,因为一般来说,不可能对是否出现不同步并发修改做出任何硬性保证。快速失败(fail-fast)迭代器会尽最大努力抛出 ConcurrentModificationException。因此,为提高这类迭代器的正确性而编写一个依赖于此异常的程序是错误的做法:迭代器的快速失败行为应该仅用于检测 bug。

那么,我们不禁要问,什么是 fail-fast,为什么要有 fail-fast 机制?

fail-fast 是 Java 容器的一种错误检测机制。当多个线程对容器进行结构上的改变的操作时,就可能触发 fail-fast 机制。记住是有可能,而不是一定。

例如:假设存在两个线程(线程 1、线程 2),线程 1 通过 Iterator 在遍历容器 A 中的元素,在某个时候线程 2 修改了容器 A 的结构(是结构上面的修改,而不是简单的修改容器元素的内容),那么这个时候程序就会抛出 ConcurrentModificationException 异常,从而产生 fail-fast 机制。

容器在迭代操作中改变元素个数(添加、删除元素)都可能会导致 fail-fast

示例:fail-fast 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class FailFastDemo {

private static int MAX = 100;

private static List<Integer> list = new ArrayList<>();

public static void main(String[] args) {
for (int i = 0; i < MAX; i++) {
list.add(i);
}
new Thread(new MyThreadA()).start();
new Thread(new MyThreadB()).start();
}

/** 迭代遍历容器所有元素 */
static class MyThreadA implements Runnable {

@Override
public void run() {
Iterator<Integer> iterator = list.iterator();
while (iterator.hasNext()) {
int i = iterator.next();
System.out.println("MyThreadA 访问元素:" + i);
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}

/** 遍历删除指定范围内的所有偶数 */
static class MyThreadB implements Runnable {

@Override
public void run() {
int i = 0;
while (i < MAX) {
if (i % 2 == 0) {
System.out.println("MyThreadB 删除元素" + i);
list.remove(i);
}
i++;
}
}

}

}

执行后,会抛出 java.util.ConcurrentModificationException 异常。

解决 fail-fast

fail-fast 有两种解决方案:

  • 在遍历过程中所有涉及到改变容器个数的地方全部加上 synchronized 或者直接使用 Collections.synchronizedXXX 容器,这样就可以解决。但是不推荐,因为增删造成的同步锁可能会阻塞遍历操作,影响吞吐。
  • 使用并发容器,如:CopyOnWriterArrayList

容器和线程安全

为了在并发环境下安全地使用容器,Java 提供了同步容器和并发容器。

同步容器和并发容器详情请参考:Java 并发之容器

参考资料

Java 并发之 AQS

AQS 简介

AQSAbstractQueuedSynchronizer 的缩写,即 队列同步器,顾名思义,其主要作用是处理同步。它是并发锁和很多同步工具类的实现基石(如 ReentrantLockReentrantReadWriteLockCountDownLatchSemaphoreFutureTask 等)。

**AQS 提供了对锁和同步器的通用能力支持 **。在 java.util.concurrent.locks 包中的相关锁(常用的有 ReentrantLockThreadPoolExecutor)都是基于 AQS 来实现。这些锁都没有直接继承 AQS,而是定义了一个 Sync 类去继承 AQS。为什么要这样呢?因为锁面向的是使用用户,而同步器面向的则是线程控制,那么在锁的实现中聚合同步器而不是直接继承 AQS 就可以很好的隔离二者所关注的事情。

AQS 的应用

AQS 定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如 ReentrantLock)和 Share(共享,多个线程可同时执行,如 Semaphore / CountDownLatch)。

独占锁 API

获取、释放独占锁的主要 API 如下:

1
2
3
4
public final void acquire(int arg)
public final void acquireInterruptibly(int arg)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
public final boolean release(int arg)
  • acquire - 获取独占锁。
  • acquireInterruptibly - 获取可中断的独占锁。
  • tryAcquireNanos - 尝试在指定时间内获取可中断的独占锁。在以下三种情况下回返回:
    • 在超时时间内,当前线程成功获取了锁;
    • 当前线程在超时时间内被中断;
    • 超时时间结束,仍未获得锁返回 false。
  • release - 释放独占锁。

共享锁 API

获取、释放共享锁的主要 API 如下:

1
2
3
4
public final void acquireShared(int arg)
public final void acquireSharedInterruptibly(int arg)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
public final boolean releaseShared(int arg)
  • acquireShared - 获取共享锁。
  • acquireSharedInterruptibly - 获取可中断的共享锁。
  • tryAcquireSharedNanos - 尝试在指定时间内获取可中断的共享锁。
  • release - 释放共享锁。

AQS 的原理

AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态;如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制。这个机制是基于 CLH 锁 (Craig, Landin, and Hagersten locks) 的变体实现的,将暂时获取不到锁的线程加入到队列中。

CLH 本是一个单向队列,AQS 中的队列采用了 CLH 的变体,是一个虚拟的 FIFO 双向队列(虚拟的双向队列,是指不存在结点实例,仅存在结点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中。AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个结点(Node)来实现锁的分配。在 CLH 队列锁中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。

AQS 的核心原理图:

AQS 的数据结构

先看一下 AbstractQueuedSynchronizer 的定义:

1
2
3
4
5
6
7
8
9
10
11
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

/** 等待队列的队头,懒加载。只能通过 setHead 方法修改。 */
private transient volatile Node head;
/** 等待队列的队尾,懒加载。只能通过 enq 方法添加新的等待节点。*/
private transient volatile Node tail;
/** 同步状态 */
private volatile int state;
}

阅读 AQS 的源码,可以发现:AQS 继承自 AbstractOwnableSynchronize,它有以下核心属性:

  • state - AQS 使用一个整型的 volatile 变量来 维护同步状态。这个整数状态的意义由子类来赋予,如 ReentrantLock 中该状态值表示所有者线程已经重复获取该锁的次数;Semaphore 中该状态值表示剩余的许可数量。
  • headtail - AQS **维护了一个 Node 类型(AQS 的内部类)的双向队列来完成同步状态的管理 **。这个双向队列是一个双向的 FIFO 队列,通过 headtail 指针进行访问。当 **有线程获取锁失败后,就被添加到队列末尾 **。

再来看一下 Node 的源码,很显然,Node 是一个双向队列结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static final class Node {
/** 该等待同步的节点处于共享模式 */
static final Node SHARED = new Node();
/** 该等待同步的节点处于独占模式 */
static final Node EXCLUSIVE = null;

/** 线程等待状态,状态值有:0、1、-1、-2、-3 */
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

/** 前驱节点 */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 等待锁的线程 */
volatile Thread thread;

/** 和节点是否共享有关 */
Node nextWaiter;
}

属性说明:

方法和属性值 含义
waitStatus 当前节点在队列中的状态
thread 表示处于该节点的线程
prev 前驱指针
next 后继指针

waitStatus 是一个整型的 volatile 变量,用来维护 AQS 同步队列中线程节点的状态。waitStatus 有五个状态值:

  • 0 - 一个 Node 被初始化的时候的默认值
  • CANCELLED(1) - 表示线程获取锁的请求已经取消了
  • SIGNAL(-1) - 表示线程已经准备好了,就等资源释放了
  • CONDITION(-2) - 表示节点在等待队列中,节点线程等待唤醒
  • PROPAGATE(-3) - 当前线程处在 SHARED 情况下,该字段才会使用

独占锁的获取和释放

获取独占锁

AQS 中使用 acquire(int arg) 方法获取独占锁的相关源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// 利用 CAS 操作将当前线程加入等待队列队尾
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

// 自旋尝试为等待队列中的线程节点获取独占锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 获取锁成功,退出
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 线程中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

其大致流程如下:

  1. 先通过 tryAcquire 尝试获取同步状态,如果获取同步状态成功,则结束方法,直接返回。
  2. 若不成功,调用 addWaiter 方法,利用 CAS 操作将当前线程加入等待队列队尾。
  3. 接着,自旋尝试为等待队列中的线程节点获取独占锁,直到获取成功或线程中断。

释放独占锁

AQS 中使用 acquire(int arg) 方法获取独占锁的相关源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
Node h = head;
// 如果队列不为空,唤醒下一个节点中的线程
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
  1. 先尝试获取解锁线程的同步状态,如果获取同步状态不成功,则结束方法,直接返回。
  2. 如果获取同步状态成功且队列不为空,AQS 会尝试唤醒下一个节点中的线程。

获取可中断的独占锁

AQS 中使用 acquireInterruptibly(int arg) 方法获取可中断的独占锁。

acquireInterruptibly(int arg) 实现方式 相较于获取独占锁方法( acquire)非常相似,区别仅在于它会 通过 Thread.interrupted 检测当前线程是否被中断,如果是,则立即抛出中断异常(InterruptedException)。

限时获取独占锁

AQS 中使用 tryAcquireNanos(int arg) 方法获取超时等待的独占锁。

doAcquireNanos 的实现方式 相较于获取独占锁方法( acquire)非常相似,区别在于它会根据超时时间和当前时间计算出截止时间。在获取锁的流程中,会不断判断是否超时,如果超时,直接返回 false;如果没超时,则用 LockSupport.parkNanos 来阻塞当前线程。

共享锁的获取和释放

获取共享锁

AQS 中使用 acquireShared(int arg) 方法获取共享锁。

acquireShared 方法和 acquire 方法的逻辑很相似,区别仅在于自旋的条件以及节点出队的操作有所不同。

成功获得共享锁的条件如下:

  • tryAcquireShared(arg) 返回值大于等于 0 (这意味着共享锁的 permit 还没有用完)。
  • 当前节点的前驱节点是头结点。

释放共享锁

AQS 中使用 releaseShared(int arg) 方法释放共享锁。

releaseShared 首先会尝试释放同步状态,如果成功,则解锁一个或多个后继线程节点。释放共享锁和释放独占锁流程大体相似,区别在于:

对于独占模式,如果需要 SIGNAL,释放仅相当于调用头节点的 unparkSuccessor

获取可中断的共享锁

AQS 中使用 acquireSharedInterruptibly(int arg) 方法获取可中断的共享锁。

acquireSharedInterruptibly 方法与 acquireInterruptibly 几乎一致,不再赘述。

限时获取共享锁

AQS 中使用 tryAcquireSharedNanos(int arg) 方法获取超时等待式的共享锁。

tryAcquireSharedNanos 方法与 tryAcquireNanos 几乎一致,不再赘述。

自定义同步器

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

  1. 使用者继承 AbstractQueuedSynchronizer 并重写指定的方法。
  2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。

AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的钩子方法:

1
2
3
4
5
6
7
8
9
10
// 独占方式。尝试获取资源,成功则返回 true,失败则返回 false。
protected boolean tryAcquire(int)
// 独占方式。尝试释放资源,成功则返回 true,失败则返回 false。
protected boolean tryRelease(int)
// 共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int)
// 共享方式。尝试释放资源,成功则返回 true,失败则返回 false。
protected boolean tryReleaseShared(int)
// 该线程是否正在独占资源。只有用到 condition 才需要去实现它。
protected boolean isHeldExclusively()

什么是钩子方法呢? 钩子方法是一种被声明在抽象类中的方法,一般使用 protected 关键字修饰,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。

参考资料

Java 并发之锁

本文先阐述 Java 中各种锁的概念。

然后,重点介绍 Lock 和 Condition 两个接口及其实现。并发编程有两个核心问题:同步和互斥。

互斥,即同一时刻只允许一个线程访问共享资源;

同步,即线程之间如何通信、协作。

这两大问题,管程(sychronized)都是能够解决的。J.U.C 包还提供了 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题

并发锁简介

确保线程安全最常见的做法是利用锁机制(Locksychronized)来对共享数据做互斥同步,这样在同一个时刻,只有一个线程可以执行某个方法或者某个代码块,那么操作必然是原子性的,线程安全的。

在工作、面试中,经常会听到各种五花八门的锁,听的人云里雾里。锁的概念术语很多,它们是针对不同的问题所提出的,通过简单的梳理,也不难理解。

可重入锁

可重入锁,顾名思义,指的是线程可以重复获取同一把锁。即同一个线程在外层方法获取了锁,在进入内层方法会自动获取锁。

可重入锁可以在一定程度上避免死锁

  • ReentrantLockReentrantReadWriteLock 是可重入锁。这点,从其命名也不难看出。
  • synchronized 也是一个可重入锁

【示例】synchronized 的可重入示例

1
2
3
4
5
6
7
8
synchronized void setA() throws Exception{
Thread.sleep(1000);
setB();
}

synchronized void setB() throws Exception{
Thread.sleep(1000);
}

上面的代码就是一个典型场景:如果使用的锁不是可重入锁的话,setB 可能不会被当前线程执行,从而造成死锁。

【示例】ReentrantLock 的可重入示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Task {

private int value;
private final Lock lock = new ReentrantLock();

public Task() {
this.value = 0;
}

public int get() {
// 获取锁
lock.lock();
try {
return value;
} finally {
// 保证锁能释放
lock.unlock();
}
}

public void addOne() {
// 获取锁
lock.lock();
try {
// 注意:此处已经成功获取锁,进入 get 方法后,又尝试获取锁,
// 如果锁不是可重入的,会导致死锁
value = 1 + get();
} finally {
// 保证锁能释放
lock.unlock();
}
}

}

公平锁与非公平锁

  • 公平锁 - 公平锁是指 多线程按照申请锁的顺序来获取锁
  • 非公平锁 - 非公平锁是指 多线程不按照申请锁的顺序来获取锁 。这就可能会出现优先级反转(后来者居上)或者饥饿现象(某线程总是抢不过别的线程,导致始终无法执行)。

公平锁为了保证线程申请顺序,势必要付出一定的性能代价,因此其吞吐量一般低于非公平锁。

公平锁与非公平锁 在 Java 中的典型实现:

  • synchronized 只支持非公平锁
  • ReentrantLockReentrantReadWriteLock,默认是非公平锁,但支持公平锁

独占锁与共享锁

独占锁与共享锁是一种广义上的说法,从实际用途上来看,也常被称为互斥锁与读写锁。

  • 独占锁 - 独占锁是指 锁一次只能被一个线程所持有
  • 共享锁 - 共享锁是指 锁可被多个线程所持有

独占锁与共享锁在 Java 中的典型实现:

  • synchronizedReentrantLock 只支持独占锁
  • ReentrantReadWriteLock 其写锁是独占锁,其读锁是共享锁。读锁是共享锁使得并发读是非常高效的,读写,写读 ,写写的过程是互斥的。

悲观锁与乐观锁

乐观锁与悲观锁不是指具体的什么类型的锁,而是处理并发同步的策略

悲观锁(Pessimistic Lock)

  • 总是假设最坏的情况,认为:不加锁的并发操作一定会出问题
  • 悲观锁在 Java 中的应用就是通过使用 synchronizedLock 显示加锁来进行互斥同步,这是一种阻塞同步。
  • 悲观锁适合写操作频繁的场景。高并发的场景下,激烈的锁竞争会造成线程阻塞,大量阻塞线程会导致系统的上下文切换,增加系统的性能开销。并且,悲观锁还可能会存在死锁问题,影响代码的正常运行。

【示例】悲观锁示例

1
2
3
4
5
6
7
8
9
10
11
12
13
public void syncTask() {
synchronized (this) {
// 需要同步的操作
}
}

private Lock lock = new ReentrantLock();
lock.lock();
try {
// 需要同步的操作
} finally {
lock.unlock();
}

乐观锁(OptimisticLock)

  • 乐观锁总是假设最好的情况,认为:不加锁的并发操作也没什么问题。每次访问数据时,都假设数据不会被其他线程修改,不必加锁。虽然不加锁,但不意味着什么都不做,而是在更新的时候,判断一下在此期间是否有其他线程更新该数据。
  • 乐观锁最常见的实现方式,是使用版本号机制或 CAS 算法(Compare And Swap)去实现。Java 中的原子类就是基于 CAS 实现的。
  • 乐观锁的优点是:减少锁竞争,提高并发度。
  • 乐观锁的缺点是:
    • 存在 ABA 问题。所谓的 ABA 问题是指在并发编程中,如果一个变量初次读取的时候是 A 值,它的值被改成了 B,然后又其他线程把 B 值改成了 A,而另一个早期线程在对比值时会误以为此值没有发生改变,但其实已经发生变化了
    • 如果乐观锁所检查的数据存在大量锁竞争,会由于不断循环重试,产生大量的 CPU 开销
  • 乐观锁适合读多写少的场景。高并发的场景下,乐观锁相比悲观锁来说,不存在锁竞争造成线程阻塞,也不会有死锁的问题,在性能上往往会更胜一筹。但是,如果冲突频繁发生(写占比非常多的情况),会频繁失败和重试,这样同样会非常影响性能,导致 CPU 飙升。

【示例】乐观锁示例

1
2
3
4
5
6
7
8
9
10
// AtomicInteger 的 getAndAccumulate 方法采用了自旋 + CAS 的乐观锁模式
public final int getAndAccumulate(int x,
IntBinaryOperator accumulatorFunction) {
int prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}

乐观锁也是一种通用的锁机制,不仅在 Java 中,在其他很多软件领域,也存在乐观锁机制。比如下面的示例是 MySQL 中的乐观锁示例。

假设,order 表中有一个字段 status,表示订单状态:status 为 1 代表订单未支付;status 为 2 代表订单已支付。现在,要将 id 为 1 的订单状态置为已支付,则操作如下:

1
2
3
4
5
select status, version from order where id=#{id}

update order
set status=2, version=version+1
where id=#{id} and version=#{version};

偏向锁、轻量级锁、重量级锁

所谓轻量级锁与重量级锁,指的是锁控制粒度的粗细。显然,控制粒度越细,阻塞开销越小,并发性也就越高。

Java 1.6 以前,重量级锁一般指的是 synchronized ,而轻量级锁指的是 volatile

Java 1.6 以后,针对 synchronized 做了大量优化,引入 4 种锁状态: 无锁状态、偏向锁、轻量级锁和重量级锁。锁可以单向的从偏向锁升级到轻量级锁,再从轻量级锁升级到重量级锁 。

  • 偏向锁 - 偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。
  • 轻量级锁 - 是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。
  • 重量级锁 - 是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让其他申请的线程进入阻塞,性能降低。

分段锁

分段锁其实是一种锁的设计,并不是具体的一种锁。所谓分段锁,就是把锁的对象分成多段,每段独立控制,使得锁粒度更细,减少阻塞开销,从而提高并发性。这其实很好理解,就像高速公路上的收费站,如果只有一个收费口,那所有的车只能排成一条队缴费;如果有多个收费口,就可以分流了。

Hashtable 使用 synchronized 修饰方法来保证线程安全性,那么面对线程的访问,Hashtable 就会锁住整个对象,所有的其它线程只能等待,这种阻塞方式的吞吐量显然很低。

Java 1.7 以前的 ConcurrentHashMap 就是分段锁的典型案例。ConcurrentHashMap 维护了一个 Segment 数组,一般称为分段桶。

1
final Segment<K,V>[] segments;

当有线程访问 ConcurrentHashMap 的数据时,ConcurrentHashMap 会先根据 hashCode 计算出数据在哪个桶(即哪个 Segment),然后锁住这个 Segment

内置锁和显示锁

Java 1.5 之前,协调对共享对象的访问时可以使用的机制只有 synchronizedvolatile。这两个都属于内置锁,即锁的申请和释放都是由 JVM 所控制。

Java 1.5 之后,增加了新的机制:ReentrantLockReentrantReadWriteLock ,这类锁的申请和释放都可以由程序所控制,所以常被称为显示锁。

💡 synchronized 的用法和原理可以参考:Java 并发之内存模型

:bell: 注意:如果不需要 ReentrantLockReentrantReadWriteLock 所提供的高级同步特性,**应该优先考虑使用 synchronized**。理由如下:

  • Java 1.6 以后,synchronized 做了大量的优化,其性能已经与 ReentrantLockReentrantReadWriteLock 基本上持平。
  • 从趋势来看,Java 未来更可能会优化 synchronized ,而不是 ReentrantLockReentrantReadWriteLock ,因为 synchronized 是 JVM 内置属性,它能执行一些优化。
  • ReentrantLockReentrantReadWriteLock 申请和释放锁都是由程序控制,如果使用不当,可能造成死锁,这是很危险的。

以下对比一下显示锁和内置锁的差异:

  • 主动获取锁和释放锁
    • synchronized 不能主动获取锁和释放锁。获取锁和释放锁都是 JVM 控制的。
    • ReentrantLock 可以主动获取锁和释放锁。(如果忘记释放锁,就可能产生死锁)。
  • 响应中断
    • synchronized 不能响应中断。
    • ReentrantLock 可以响应中断。
  • 超时机制
    • synchronized 没有超时机制。
    • ReentrantLock 有超时机制。ReentrantLock 可以设置超时时间,超时后自动释放锁,避免一直等待。
  • 支持公平锁
    • synchronized 只支持非公平锁。
    • ReentrantLock 支持非公平锁和公平锁。
  • 是否支持共享
    • synchronized 修饰的方法或代码块,只能被一个线程访问(独享)。如果这个线程被阻塞,其他线程也只能等待
    • ReentrantLock 可以基于 Condition 灵活的控制同步条件。
  • 是否支持读写分离
    • synchronized 不支持读写锁分离;
    • ReentrantReadWriteLock 支持读写锁,从而使阻塞读写的操作分开,有效提高并发性。

Lock 和 Condition

为何引入 Lock 和 Condition

并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。这两大问题,管程都是能够解决的。Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题

synchronized 是管程的一种实现,既然如此,何必再提供 Lock 和 Condition。

JDK 1.6 以前,synchronized 还没有做优化,性能远低于 Lock。但是,性能不是引入 Lock 的最重要因素。真正关键在于:synchronized 使用不当,可能会出现死锁。synchronized 无法通过破坏不可抢占条件来避免死锁。原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。

与内置锁 synchronized 不同的是,**Lock 提供了一组无条件的、可轮询的、定时的以及可中断的锁操作**,所有获取锁、释放锁的操作都是显式的操作。

  • 能够响应中断。synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
  • 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
  • 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。

Lock 接口

Lock 的接口定义如下:

1
2
3
4
5
6
7
8
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
  • lock() - 获取锁。
  • unlock() - 释放锁。
  • tryLock() - 尝试获取锁,仅在调用时锁未被另一个线程持有的情况下,才获取该锁。
  • tryLock(long time, TimeUnit unit) - 和 tryLock() 类似,区别仅在于限定时间,如果限定时间内未获取到锁,视为失败。
  • lockInterruptibly() - 锁未被另一个线程持有,且线程没有被中断的情况下,才能获取锁。
  • newCondition() - 返回一个绑定到 Lock 对象上的 Condition 实例。

Condition

Condition 实现了管程模型里面的条件变量

前文中提过 Lock 接口中 有一个 newCondition() 方法用于返回一个绑定到 Lock 对象上的 Condition 实例。Condition 是什么?有什么作用?本节将一一讲解。

在单线程中,一段代码的执行可能依赖于某个状态,如果不满足状态条件,代码就不会被执行(典型的场景,如:if ... else ...)。在并发环境中,当一个线程判断某个状态条件时,其状态可能是由于其他线程的操作而改变,这时就需要有一定的协调机制来确保在同一时刻,数据只能被一个线程锁修改,且修改的数据状态被所有线程所感知。

Java 1.5 之前,主要是利用 Object 类中的 waitnotifynotifyAll 配合 synchronized 来进行线程间通信。waitnotifynotifyAll 需要配合 synchronized 使用,不适用于 Lock。而使用 Lock 的线程,彼此间通信应该使用 Condition 。这可以理解为,什么样的锁配什么样的钥匙。内置锁(synchronized)配合内置条件队列(waitnotifynotifyAll ),显式锁(Lock)配合显式条件队列(Condition

Condition 的特性

Condition 接口定义如下:

1
2
3
4
5
6
7
8
9
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}

其中,awaitsignalsignalAllwaitnotifynotifyAll 相对应,功能也相似。除此以外,Condition 相比内置条件队列( waitnotifynotifyAll ),提供了更为丰富的功能:

  • 每个锁(Lock)上可以存在多个 Condition,这意味着锁的状态条件可以有多个。
  • 支持公平的或非公平的队列操作。
  • 支持可中断的条件等待,相关方法:awaitUninterruptibly()
  • 支持可定时的等待,相关方法:awaitNanos(long)await(long, TimeUnit)awaitUntil(Date)

Condition 的用法

这里以 Condition 来实现一个消费者、生产者模式。

:bell: 注意:事实上,解决此类问题使用 CountDownLatchSemaphore 等工具更为便捷、安全。想了解详情,可以参考 Java 并发工具类

产品类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class Message {

private final Lock lock = new ReentrantLock();

private final Condition producedMsg = lock.newCondition();

private final Condition consumedMsg = lock.newCondition();

private String message;

private boolean state;

private boolean end;

public void consume() {
//lock
lock.lock();
try {
// no new message wait for new message
while (!state) { producedMsg.await(); }

System.out.println("consume message : " + message);
state = false;
// message consumed, notify waiting thread
consumedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - viewMessage");
} finally {
lock.unlock();
}
}

public void produce(String message) {
lock.lock();
try {
// last message not consumed, wait for it be consumed
while (state) { consumedMsg.await(); }

System.out.println("produce msg: " + message);
this.message = message;
state = true;
// new message added, notify waiting thread
producedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - publishMessage");
} finally {
lock.unlock();
}
}

public boolean isEnd() {
return end;
}

public void setEnd(boolean end) {
this.end = end;
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class MessageConsumer implements Runnable {

private Message message;

public MessageConsumer(Message msg) {
message = msg;
}

@Override
public void run() {
while (!message.isEnd()) { message.consume(); }
}

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class MessageProducer implements Runnable {

private Message message;

public MessageProducer(Message msg) {
message = msg;
}

@Override
public void run() {
produce();
}

public void produce() {
List<String> msgs = new ArrayList<>();
msgs.add("Begin");
msgs.add("Msg1");
msgs.add("Msg2");

for (String msg : msgs) {
message.produce(msg);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

message.produce("End");
message.setEnd(true);
}

}

测试

1
2
3
4
5
6
7
8
9
10
public class LockConditionDemo {

public static void main(String[] args) {
Message msg = new Message();
Thread producer = new Thread(new MessageProducer(msg));
Thread consumer = new Thread(new MessageConsumer(msg));
producer.start();
consumer.start();
}
}

ReentrantLock

ReentrantLock 类是 Lock 接口的具体实现,与内置锁 synchronized 相同的是,它是一个可重入锁

ReentrantLock 的特性如下:

  • ReentrantLock 提供了与 synchronized 相同的互斥性、内存可见性和可重入性
  • ReentrantLock 支持公平锁和非公平锁(默认)两种模式。
  • ReentrantLock 实现了 Lock 接口,支持了 synchronized 所不具备的灵活性,增加了轮询、超时、中断等功能。
    • synchronized 无法中断一个正在等待获取锁的线程
    • synchronized 无法在请求获取一个锁时无休止地等待

ReentrantLock 的用法

前文了解了 ReentrantLock 的特性,接下来,我们要讲述其具体用法。

ReentrantLock 的构造方法

ReentrantLock 有两个构造方法:

1
2
public ReentrantLock() {}
public ReentrantLock(boolean fair) {}
  • ReentrantLock() - 默认构造方法会初始化一个非公平锁(NonfairSync)
  • ReentrantLock(boolean) - new ReentrantLock(true) 会初始化一个公平锁(FairSync)

lock 和 unlock 方法

  • lock() - 无条件获取锁。如果当前线程无法获取锁,则当前线程进入休眠状态不可用,直至当前线程获取到锁。如果该锁没有被另一个线程持有,则获取该锁并立即返回,将锁的持有计数设置为 1。
  • unlock() - 用于释放锁

:bell: 注意:请务必牢记,获取锁操作 lock() 必须在 try catch 块中进行,并且将释放锁操作 unlock() 放在 finally 块中进行,以保证锁一定被被释放,防止死锁的发生

示例:ReentrantLock 的基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class ReentrantLockDemo {

public static void main(String[] args) {
Task task = new Task();
MyThread tA = new MyThread("Thread-A", task);
MyThread tB = new MyThread("Thread-B", task);
MyThread tC = new MyThread("Thread-C", task);
tA.start();
tB.start();
tC.start();
}

static class MyThread extends Thread {

private Task task;

public MyThread(String name, Task task) {
super(name);
this.task = task;
}

@Override
public void run() {
task.execute();
}

}

static class Task {

private ReentrantLock lock = new ReentrantLock();

public void execute() {
lock.lock();
try {
for (int i = 0; i < 3; i++) {
System.out.println(lock.toString());

// 查询当前线程 hold 住此锁的次数
System.out.println("\t holdCount: " + lock.getHoldCount());

// 查询正等待获取此锁的线程数
System.out.println("\t queuedLength: " + lock.getQueueLength());

// 是否为公平锁
System.out.println("\t isFair: " + lock.isFair());

// 是否被锁住
System.out.println("\t isLocked: " + lock.isLocked());

// 是否被当前线程持有锁
System.out.println("\t isHeldByCurrentThread: " + lock.isHeldByCurrentThread());

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}

}

}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-A]
holdCount: 1
queuedLength: 2
isFair: false
isLocked: true
isHeldByCurrentThread: true
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-C]
holdCount: 1
queuedLength: 1
isFair: false
isLocked: true
isHeldByCurrentThread: true
// ...

tryLock 方法

与无条件获取锁相比,tryLock 有更完善的容错机制。

  • tryLock() - 可轮询获取锁。如果成功,则返回 true;如果失败,则返回 false。也就是说,这个方法无论成败都会立即返回,获取不到锁(锁已被其他线程获取)时不会一直等待。
  • tryLock(long, TimeUnit) - 可定时获取锁。和 tryLock() 类似,区别仅在于这个方法在获取不到锁时会等待一定的时间,在时间期限之内如果还获取不到锁,就返回 false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回 true。

示例:ReentrantLocktryLock() 操作

修改上个示例中的 execute() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public void execute() {
if (lock.tryLock()) {
try {
for (int i = 0; i < 3; i++) {
// 略。..
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 获取锁失败");
}
}

示例:ReentrantLocktryLock(long, TimeUnit) 操作

修改上个示例中的 execute() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void execute() {
try {
if (lock.tryLock(2, TimeUnit.SECONDS)) {
try {
for (int i = 0; i < 3; i++) {
// 略。..
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 获取锁失败");
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 获取锁超时");
e.printStackTrace();
}
}

lockInterruptibly 方法

  • lockInterruptibly() - 可中断获取锁。可中断获取锁可以在获得锁的同时保持对中断的响应。可中断获取锁比其它获取锁的方式稍微复杂一些,需要两个 try-catch 块(如果在获取锁的操作中抛出了 InterruptedException ,那么可以使用标准的 try-finally 加锁模式)。
    • 举例来说:假设有两个线程同时通过 lock.lockInterruptibly() 获取某个锁时,若线程 A 获取到了锁,则线程 B 只能等待。若此时对线程 B 调用 threadB.interrupt() 方法能够中断线程 B 的等待过程。由于 lockInterruptibly() 的声明中抛出了异常,所以 lock.lockInterruptibly() 必须放在 try 块中或者在调用 lockInterruptibly() 的方法外声明抛出 InterruptedException

:bell: 注意:当一个线程获取了锁之后,是不会被 interrupt() 方法中断的。单独调用 interrupt() 方法不能中断正在运行状态中的线程,只能中断阻塞状态中的线程。因此当通过 lockInterruptibly() 方法获取某个锁时,如果未获取到锁,只有在等待的状态下,才可以响应中断。

示例:ReentrantLocklockInterruptibly() 操作

修改上个示例中的 execute() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void execute() {
try {
lock.lockInterruptibly();

for (int i = 0; i < 3; i++) {
// 略。..
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断");
e.printStackTrace();
} finally {
lock.unlock();
}
}

newCondition 方法

newCondition() - 返回一个绑定到 Lock 对象上的 Condition 实例。Condition 的特性和具体方法请阅读下文 [Condition](#五 condition)。

ReentrantLock 的原理

ReentrantLock 的可见性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class X {
private final Lock rtl =
new ReentrantLock();
int value;
public void addOne() {
// 获取锁
rtl.lock();
try {
value+=1;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}

ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写 state 的值;解锁的时候,也会读写 state 的值(简化后的代码如下面所示)。也就是说,在执行 value+=1 之前,程序先读写了一次 volatile 变量 state,在执行 value+=1 之后,又读写了一次 volatile 变量 state。根据相关的 Happens-Before 规则:

  1. 顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock();
  2. volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作 Happens-Before 线程 T2 的 lock() 操作;
  3. 传递性规则:线程 T1 的 value+=1 Happens-Before 线程 T2 的 lock() 操作。

ReentrantLock 的数据结构

阅读 ReentrantLock 的源码,可以发现它有一个核心字段:

1
private final Sync sync;
  • sync - 内部抽象类 ReentrantLock.Sync 对象,Sync 继承自 AQS。它有两个子类:
  • ReentrantLock.FairSync - 公平锁。
  • ReentrantLock.NonfairSync - 非公平锁。

查看源码可以发现,ReentrantLock 实现 Lock 接口其实是调用 ReentrantLock.FairSyncReentrantLock.NonfairSync 中各自的实现,这里不一一列举。

ReentrantLock 的获取锁和释放锁

ReentrantLock 获取锁和释放锁的接口,从表象看,是调用 ReentrantLock.FairSyncReentrantLock.NonfairSync 中各自的实现;从本质上看,是基于 AQS 的实现。

仔细阅读源码很容易发现:

  • void lock() 调用 Sync 的 lock() 方法。

  • void lockInterruptibly() 直接调用 AQS 的 [获取可中断的独占锁](#获取可中断的独占锁) 方法 lockInterruptibly()

  • boolean tryLock() 调用 Sync 的 nonfairTryAcquire()

  • boolean tryLock(long time, TimeUnit unit) 直接调用 AQS 的 [获取超时等待式的独占锁](#获取超时等待式的独占锁) 方法 tryAcquireNanos(int arg, long nanosTimeout)

  • void unlock() 直接调用 AQS 的 [释放独占锁](#释放独占锁) 方法 release(int arg)

直接调用 AQS 接口的方法就不再赘述了,其原理在 [AQS 的原理](#AQS 的原理) 中已经用很大篇幅进行过讲解。

nonfairTryAcquire 方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 公平锁和非公平锁都会用这个方法区尝试获取锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
// 如果同步状态为 0,将其设为 acquires,并设置当前线程为排它线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

处理流程很简单:

  • 如果同步状态为 0,设置同步状态设为 acquires,并设置当前线程为排它线程,然后返回 true,获取锁成功。
  • 如果同步状态不为 0 且当前线程为排它线程,设置同步状态为当前状态值+acquires 值,然后返回 true,获取锁成功。
  • 否则,返回 false,获取锁失败。

公平锁和非公平锁

ReentrantLock 这个类有两个构造函数,一个是无参构造函数,一个是传入 fair 参数的构造函数。fair 参数代表的是锁的公平策略,如果传入 true 就表示需要构造一个公平锁,反之则表示要构造一个非公平锁。

锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。

lock 方法在公平锁和非公平锁中的实现:

二者的区别仅在于申请非公平锁时,如果同步状态为 0,尝试将其设为 1,如果成功,直接将当前线程置为排它线程;否则和公平锁一样,调用 AQS 获取独占锁方法 acquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 非公平锁实现
final void lock() {
if (compareAndSetState(0, 1))
// 如果同步状态为 0,将其设为 1,并设置当前线程为排它线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 调用 AQS 获取独占锁方法 acquire
acquire(1);
}

// 公平锁实现
final void lock() {
// 调用 AQS 获取独占锁方法 acquire
acquire(1);
}

ReentrantReadWriteLock

ReadWriteLock 适用于读多写少的场景

ReentrantReadWriteLock 类是 ReadWriteLock 接口的具体实现,它是一个可重入的读写锁。ReentrantReadWriteLock 维护了一对读写锁,将读写锁分开,有利于提高并发效率。

读写锁,并不是 Java 语言特有的,而是一个广为使用的通用技术,所有的读写锁都遵守以下三条基本原则:

  • 允许多个线程同时读共享变量;
  • 只允许一个线程写共享变量;
  • 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。

读写锁与互斥锁的一个重要区别就是读写锁允许多个线程同时读共享变量,而互斥锁是不允许的,这是读写锁在读多写少场景下性能优于互斥锁的关键。但读写锁的写操作是互斥的,当一个线程在写共享变量的时候,是不允许其他线程执行写操作和读操作。

ReentrantReadWriteLock 的特性

ReentrantReadWriteLock 的特性如下:

  • ReentrantReadWriteLock 适用于读多写少的场景。如果是写多读少的场景,由于 ReentrantReadWriteLock 其内部实现比 ReentrantLock 复杂,性能可能反而要差一些。如果存在这样的问题,需要具体问题具体分析。由于 ReentrantReadWriteLock 的读写锁(ReadLockWriteLock)都实现了 Lock 接口,所以要替换为 ReentrantLock 也较为容易。
  • ReentrantReadWriteLock 实现了 ReadWriteLock 接口,支持了 ReentrantLock 所不具备的读写锁分离。ReentrantReadWriteLock 维护了一对读写锁(ReadLockWriteLock)。将读写锁分开,有利于提高并发效率。ReentrantReadWriteLock 的加锁策略是:允许多个读操作并发执行,但每次只允许一个写操作
  • ReentrantReadWriteLock 为读写锁都提供了可重入的加锁语义。
  • ReentrantReadWriteLock 支持公平锁和非公平锁(默认)两种模式。

ReadWriteLock 接口定义如下:

1
2
3
4
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
  • readLock - 返回用于读操作的锁(ReadLock)。
  • writeLock - 返回用于写操作的锁(WriteLock)。

在读写锁和写入锁之间的交互可以采用多种实现方式,ReadWriteLock 的一些可选实现包括:

  • 释放优先 - 当一个写入操作释放写锁,并且队列中同时存在读线程和写线程,那么应该优先选择读线程、写线程,还是最先发出请求的线程?
  • 读线程插队 - 如果锁是由读线程持有,但有写线程正在等待,那么新到达的读线程能否立即获得访问权,还是应该在写线程后面等待?如果允许读线程插队到写线程之前,那么将提高并发性,但可能造成线程饥饿问题。
  • 重入性 - 读锁和写锁是否是可重入的?
  • 降级 - 如果一个线程持有写入锁,那么它能否在不释放该锁的情况下获得读锁?这可能会使得写锁被降级为读锁,同时不允许其他写线程修改被保护的资源。
  • 升级 - 读锁能否优先于其他正在等待的读线程和写线程而升级为一个写锁?在大多数的读写锁实现中并不支持升级,因为如果没有显式的升级操作,那么很容易造成死锁。

ReentrantReadWriteLock 的用法

前文了解了 ReentrantReadWriteLock 的特性,接下来,我们要讲述其具体用法。

ReentrantReadWriteLock 的构造方法

ReentrantReadWriteLockReentrantLock 一样,也有两个构造方法,且用法相似。

1
2
public ReentrantReadWriteLock() {}
public ReentrantReadWriteLock(boolean fair) {}
  • ReentrantReadWriteLock() - 默认构造方法会初始化一个非公平锁(NonfairSync)。在非公平的锁中,线程获得锁的顺序是不确定的。写线程降级为读线程是可以的,但读线程升级为写线程是不可以的(这样会导致死锁)。
  • ReentrantReadWriteLock(boolean) - new ReentrantLock(true) 会初始化一个公平锁(FairSync)。对于公平锁,等待时间最长的线程将优先获得锁。如果这个锁是读线程持有,则另一个线程请求写锁,那么其他读线程都不能获得读锁,直到写线程释放写锁。

ReentrantReadWriteLock 的使用实例

在 [ReentrantReadWriteLock 的特性](#reentrantreadwritelock-的特性) 中已经介绍过,ReentrantReadWriteLock 的读写锁(ReadLockWriteLock) 都实现了 Lock 接口,所以其各自独立的使用方式与 ReentrantLock 一样,这里不再赘述。

ReentrantReadWriteLockReentrantLock 用法上的差异,主要在于读写锁的配合使用。本文以一个典型使用场景来进行讲解。

【示例】基于 ReadWriteLock 实现一个简单的泛型无界缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* 简单的无界缓存实现
* <p>
* 使用 WeakHashMap 存储键值对。WeakHashMap 中存储的对象是弱引用,JVM GC 时会自动清除没有被引用的弱引用对象。
*/
static class UnboundedCache<K, V> {

private final Map<K, V> cacheMap = new WeakHashMap<>();

private final ReadWriteLock cacheLock = new ReentrantReadWriteLock();

public V get(K key) {
cacheLock.readLock().lock();
V value;
try {
value = cacheMap.get(key);
String log = String.format("%s 读数据 %s:%s", Thread.currentThread().getName(), key, value);
System.out.println(log);
} finally {
cacheLock.readLock().unlock();
}
return value;
}

public V put(K key, V value) {
cacheLock.writeLock().lock();
try {
cacheMap.put(key, value);
String log = String.format("%s 写入数据 %s:%s", Thread.currentThread().getName(), key, value);
System.out.println(log);
} finally {
cacheLock.writeLock().unlock();
}
return value;
}

public V remove(K key) {
cacheLock.writeLock().lock();
try {
return cacheMap.remove(key);
} finally {
cacheLock.writeLock().unlock();
}
}

public void clear() {
cacheLock.writeLock().lock();
try {
this.cacheMap.clear();
} finally {
cacheLock.writeLock().unlock();
}
}

}

说明:

  • 使用 WeakHashMap 而不是 HashMap 来存储键值对。WeakHashMap 中存储的对象是弱引用,JVM GC 时会自动清除没有被引用的弱引用对象。
  • Map 写数据前加写锁,写完后,释放写锁。
  • Map 读数据前加读锁,读完后,释放读锁。

测试其线程安全性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @since 2020-01-01
*/
public class ReentrantReadWriteLockDemo {

static UnboundedCache<Integer, Integer> cache = new UnboundedCache<>();

public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
executorService.execute(new MyThread());
cache.get(0);
}
executorService.shutdown();
}

/** 线程任务每次向缓存中写入 3 个随机值,key 固定 */
static class MyThread implements Runnable {

@Override
public void run() {
Random random = new Random();
for (int i = 0; i < 3; i++) {
cache.put(i, random.nextInt(100));
}
}

}

}

说明:示例中,通过线程池启动 20 个并发任务。任务每次向缓存中写入 3 个随机值,key 固定;然后主线程每次固定读取缓存中第一个 key 的值。

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
main 读数据 0:null
pool-1-thread-1 写入数据 0:16
pool-1-thread-1 写入数据 1:58
pool-1-thread-1 写入数据 2:50
main 读数据 0:16
pool-1-thread-1 写入数据 0:85
pool-1-thread-1 写入数据 1:76
pool-1-thread-1 写入数据 2:46
pool-1-thread-2 写入数据 0:21
pool-1-thread-2 写入数据 1:41
pool-1-thread-2 写入数据 2:63
main 读数据 0:21
// ...

ReentrantReadWriteLock 的原理

前面了解了 ReentrantLock 的原理,理解 ReentrantReadWriteLock 就容易多了。

ReentrantReadWriteLock 的数据结构

阅读 ReentrantReadWriteLock 的源码,可以发现它有三个核心字段:

1
2
3
4
5
6
7
8
9
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
  • sync - 内部类 ReentrantReadWriteLock.Sync 对象。与 ReentrantLock 类似,它有两个子类:ReentrantReadWriteLock.FairSyncReentrantReadWriteLock.NonfairSync ,分别表示公平锁和非公平锁的实现。
  • readerLock - 内部类 ReentrantReadWriteLock.ReadLock 对象,这是一把读锁。
  • writerLock - 内部类 ReentrantReadWriteLock.WriteLock 对象,这是一把写锁。

ReentrantReadWriteLock 的获取锁和释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static class ReadLock implements Lock, java.io.Serializable {

// 调用 AQS 获取共享锁方法
public void lock() {
sync.acquireShared(1);
}

// 调用 AQS 释放共享锁方法
public void unlock() {
sync.releaseShared(1);
}
}

public static class WriteLock implements Lock, java.io.Serializable {

// 调用 AQS 获取独占锁方法
public void lock() {
sync.acquire(1);
}

// 调用 AQS 释放独占锁方法
public void unlock() {
sync.release(1);
}
}

StampedLock

ReadWriteLock 支持两种模式:一种是读锁,一种是写锁。而 StampedLock 支持三种模式,分别是:写锁悲观读锁乐观读。其中,写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp。

注意这里,用的是“乐观读”这个词,而不是“乐观读锁”,是要提醒你,乐观读这个操作是无锁的,所以相比较 ReadWriteLock 的读锁,乐观读的性能更好一些。

StampedLock 的性能之所以比 ReadWriteLock 还要好,其关键是 StampedLock 支持乐观读的方式。

  • ReadWriteLock 支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;
  • 而 StampedLock 提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞。

对于读多写少的场景 StampedLock 性能很好,简单的应用场景基本上可以替代 ReadWriteLock,但是,StampedLock 的功能仅仅是 ReadWriteLock 的子集,在使用的时候,还是有几个地方需要注意一下。

  • StampedLock 不支持重入
  • StampedLock 的悲观读锁、写锁都不支持条件变量。
  • 如果线程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上时,此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升。**使用 StampedLock 一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly()**。

【示例】StampedLock 阻塞时,调用 interrupt() 导致 CPU 飙升

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final StampedLock lock = new StampedLock();
Thread T1 = new Thread(() -> {
// 获取写锁
lock.writeLock();
// 永远阻塞在此处,不释放写锁
LockSupport.park();
});
T1.start();
// 保证 T1 获取写锁
Thread.sleep(100);
Thread T2 = new Thread(() ->
// 阻塞在悲观读锁
lock.readLock()
);
T2.start();
// 保证 T2 阻塞在读锁
Thread.sleep(100);
// 中断线程 T2
// 会导致线程 T2 所在 CPU 飙升
T2.interrupt();
T2.join();

【示例】StampedLock 读模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final StampedLock sl = new StampedLock();

// 乐观读
long stamp = sl.tryOptimisticRead();
// 读入方法局部变量
// ......
// 校验 stamp
if (!sl.validate(stamp)) {
// 升级为悲观读锁
stamp = sl.readLock();
try {
// 读入方法局部变量
// .....
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
// 使用方法局部变量执行业务操作
// ......

【示例】StampedLock 写模板:

1
2
3
4
5
6
7
long stamp = sl.writeLock();
try {
// 写共享变量
......
} finally {
sl.unlockWrite(stamp);
}

参考资料

Java 并发之无锁

并发安全需要保证几个基本特性:

  • 可见性 - 是一个线程修改了某个共享变量,其状态能够立即被其他线程知晓,通常被解释为将线程本地状态反映到主内存上,volatile 就是负责保证可见性的。
  • 有序性 - 是保证线程内串行语义,避免指令重排等。
  • 原子性 - 简单说就是相关操作不会中途被其他线程干扰,一般通过互斥机制(加锁:sychronizedLock)实现。

互斥同步是最常见的原子性保障手段。互斥同步最主要的问题是线程阻塞和唤醒所带来的性能问题。因此,互斥同步也被称为阻塞同步。互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题。无论共享数据是否真的会出现竞争,它都要进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分不必要的加锁)、用户态核心态转换、维护锁计数器和检查是否有被阻塞的线程需要唤醒等操作。

解决并发安全问题,还可以采用无锁方案。无锁方案相对互斥锁方案,最大的好处就是性能。互斥锁方案为了保证互斥性,需要执行加锁、解锁操作,而加锁、解锁操作本身就消耗性能;同时拿不到锁的线程还会进入阻塞状态,进而触发线程切换,线程切换对性能的消耗也很大。 相比之下,无锁方案则完全没有加锁、解锁的性能消耗,同时还能保证互斥性。

Java 中的无锁技术有:

  • CAS
  • 原子类
  • ThreadLocal
  • Copy-on-Write
  • 不变模式

CAS

CAS 的要点

CAS(Compare and Swap),字面意思为比较并交换。

CAS 涉及三个操作数:

  • V:需要读写的内存位置
  • A:进行比较的值
  • B:拟写入的新值

当且仅当 V 的值等于 A 时,才会通过原子方式用新值 B 来更新 A 的值,否则什么都不做

CAS 实际是乐观锁的一种实现方式,因此,CAS 只适用于线程冲突较少的情况

CAS 的应用

CAS 的典型应用场景是:

  • 原子类
  • 自旋锁

原子类

原子类是 CAS 在 Java 中最典型的应用。

我们先来看一个常见的代码片段。

1
2
3
if(a==b) {
a++;
}

如果 a++ 执行前, a 的值被修改了怎么办?还能得到预期值吗?出现该问题的原因是在并发环境下,以上代码片段不是原子操作,随时可能被其他线程所篡改。

解决这种问题的最经典方式是应用原子类的 incrementAndGet 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class AtomicIntegerDemo {

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
final AtomicInteger count = new AtomicInteger(0);
for (int i = 0; i < 10; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
count.incrementAndGet();
}
});
}

executorService.shutdown();
executorService.awaitTermination(3, TimeUnit.SECONDS);
System.out.println("Final Count is : " + count.get());
}

}

J.U.C 包中提供了 AtomicBooleanAtomicIntegerAtomicLong 分别针对 BooleanIntegerLong 执行原子操作,操作和上面的示例大体相似,不做赘述。

自旋锁

利用原子类(本质上是 CAS),可以实现自旋锁。

所谓自旋锁,是指线程反复检查锁变量是否可用,直到成功为止。由于线程在这一过程中保持执行,因此是一种忙等待。一旦获取了自旋锁,线程会一直保持该锁,直至显式释放自旋锁。

示例:非线程安全示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class AtomicReferenceDemo {

private static int ticket = 10;

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread());
}
executorService.shutdown();
}

static class MyThread implements Runnable {

@Override
public void run() {
while (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
}

}

}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
pool-1-thread-2 卖出了第 10 张票
pool-1-thread-1 卖出了第 10 张票
pool-1-thread-3 卖出了第 10 张票
pool-1-thread-1 卖出了第 8 张票
pool-1-thread-2 卖出了第 9 张票
pool-1-thread-1 卖出了第 6 张票
pool-1-thread-3 卖出了第 7 张票
pool-1-thread-1 卖出了第 4 张票
pool-1-thread-2 卖出了第 5 张票
pool-1-thread-1 卖出了第 2 张票
pool-1-thread-3 卖出了第 3 张票
pool-1-thread-2 卖出了第 1 张票

很明显,出现了重复售票的情况。

【示例】使用自旋锁来保证线程安全

可以通过自旋锁这种非阻塞同步来保证线程安全,下面使用 AtomicReference 来实现一个自旋锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class AtomicReferenceDemo2 {

private static int ticket = 10;

public static void main(String[] args) {
threadSafeDemo();
}

private static void threadSafeDemo() {
SpinLock lock = new SpinLock();
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread(lock));
}
executorService.shutdown();
}

static class SpinLock {

private AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void lock() {
Thread current = Thread.currentThread();
while (!atomicReference.compareAndSet(null, current)) {}
}

public void unlock() {
Thread current = Thread.currentThread();
atomicReference.compareAndSet(current, null);
}

}

static class MyThread implements Runnable {

private SpinLock lock;

public MyThread(SpinLock lock) {
this.lock = lock;
}

@Override
public void run() {
while (ticket > 0) {
lock.lock();
if (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
lock.unlock();
}
}

}

}

输出结果:

1
2
3
4
5
6
7
8
9
10
pool-1-thread-2 卖出了第 10 张票
pool-1-thread-1 卖出了第 9 张票
pool-1-thread-3 卖出了第 8 张票
pool-1-thread-2 卖出了第 7 张票
pool-1-thread-3 卖出了第 6 张票
pool-1-thread-1 卖出了第 5 张票
pool-1-thread-2 卖出了第 4 张票
pool-1-thread-1 卖出了第 3 张票
pool-1-thread-3 卖出了第 2 张票
pool-1-thread-1 卖出了第 1 张票

CAS 的原理

在 Java 中,主要利用 Unsafe 这个类实现 CAS

Unsafe 类位于 sun.misc 包下,是一个提供低级别、不安全操作的类。由于其强大的功能和潜在的危险性,它通常用于 JVM 内部或一些需要极高性能和底层访问的库中,而不推荐普通开发者在应用程序中使用。

Unsafe 类提供了 compareAndSwapObjectcompareAndSwapIntcompareAndSwapLong方法来实现的对 Objectintlong 类型的 CAS 操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 以原子方式更新对象字段的值。
*/
boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);

/**
* 以原子方式更新 int 类型的对象字段的值。
*/
boolean compareAndSwapInt(Object o, long offset, int expected, int x);

/**
* 以原子方式更新 long 类型的对象字段的值。
*/
boolean compareAndSwapLong(Object o, long offset, long expected, long x);

Unsafe 类中的 CAS 方法是 native 方法。native 关键字表明这些方法是用本地代码(通常是 C 或 C++)实现的,而不是用 Java 实现的。这些方法直接调用底层的、具有原子性的 CPU 指令来实现。

由于 CAS 操作可能会因为并发冲突而失败,因此通常会伴随着自旋,而所谓自旋,其实就是循环尝试。

Unsafe#getAndAddInt 源码:

1
2
3
4
5
6
7
8
9
10
// 原子地获取并增加整数值
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
// 以 volatile 方式获取对象 o 在内存偏移量 offset 处的整数值
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
// 返回旧值
return v;
}

CAS 的问题

一般情况下,CAS 比锁性能更高。因为 CAS 是一种非阻塞算法,所以其避免了线程阻塞和唤醒的等待时间。但是,事物总会有利有弊,CAS 也存在三大问题:

  • ABA 问题
  • 循环时间长开销大
  • 只能保证一个共享变量的原子性

ABA 问题

如果一个变量初次读取的时候是 A 值,它的值被改成了 B,后来又被改回为 A,那 CAS 操作就会误认为它从来没有被改变过,这就是 ABA 问题

ABA 问题的解决思路是在变量前面追加上版本号或者时间戳。J.U.C 包提供了一个带有标记的原子引用类 AtomicStampedReference 来解决这个问题,它可以通过控制变量值的版本来保证 CAS 的正确性。大部分情况下 ABA 问题不会影响程序并发的正确性,如果需要解决 ABA 问题,改用传统的互斥同步可能会比原子类更高效

循环时间长开销大

自旋 CAS (不断尝试,直到成功为止)如果长时间不成功,会给 CPU 带来非常大的执行开销

如果 JVM 能支持处理器提供的 pause 指令那么效率会有一定的提升,pause 指令有两个作用:

  • 它可以延迟流水线执行指令(de-pipeline), 使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。
  • 它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起 CPU 流水线被清空(CPU pipeline flush),从而提高 CPU 的执行效率。

比较花费 CPU 资源,即使没有任何用也会做一些无用功。

只能保证一个共享变量的原子性

当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁。

或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量 i = 2, j = a,合并一下 ij=2a,然后用 CAS 来操作 ij。从 Java 1.5 开始 JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作。

原子类

原子类简介

原子性是确保并发安全三大特性之一。为了兼顾原子性以及锁带来的性能问题,Java 引入了 CAS (主要体现在 Unsafe 类)来实现非阻塞同步(也叫乐观锁),CAS 底层基于 CPU 指令(硬件支持)支持,具有原子性。并基于 CAS ,提供了一套原子工具类。

原子类比锁的粒度更细,更轻量级,并且对于在多处理器系统上实现高性能的并发代码来说是非常关键的。原子变量将发生竞争的范围缩小到单个变量上。

原子类相当于一种泛化的 volatile 变量,能够支持原子的、有条件的读/改/写操作。

原子类可以分为 5 个类别,这 5 个类别提供的方法基本上是相似的:

  • 基本数据类型
    • AtomicBoolean - 布尔类型原子类
    • AtomicInteger - 整型原子类
    • AtomicLong - 长整型原子类
  • 引用数据类型
    • AtomicReference - 引用类型原子类
    • AtomicMarkableReference - 带有标记位的引用类型原子类
    • AtomicStampedReference - 带有版本号的引用类型原子类
  • 数组数据类型
    • AtomicIntegerArray - 整形数组原子类
    • AtomicLongArray - 长整型数组原子类
    • AtomicReferenceArray - 引用类型数组原子类
  • 属性更新器类型
    • AtomicIntegerFieldUpdater - 整型字段的原子更新器
    • AtomicLongFieldUpdater - 长整型字段的原子更新器
    • AtomicReferenceFieldUpdater - 原子更新引用类型里的字段
  • 累加器
    • DoubleAdder - 浮点型原子累加器
    • LongAdder - 长整型原子累加器
    • DoubleAccumulator - 更复杂的浮点型原子累加器
    • LongAccumulator - 更复杂的长整型原子累加器

原子类之基本数据类型

基本数据类型原子类针对 Java 基本类型提供原子操作

  • AtomicBoolean - 布尔类型原子类
  • AtomicInteger - 整型原子类
  • AtomicLong - 长整型原子类

以上类都支持 CAS(compare-and-swap)技术,此外,AtomicIntegerAtomicLong 还支持算术运算。

:bulb: 提示:

虽然 Java 只提供了 AtomicBooleanAtomicIntegerAtomicLong,但是可以模拟其他基本类型的原子变量。要想模拟其他基本类型的原子变量,可以将 shortbyte 等类型与 int 类型进行转换,以及使用 Float.floatToIntBitsDouble.doubleToLongBits 来转换浮点数。

由于 AtomicBooleanAtomicIntegerAtomicLong 实现方式、使用方式都相近,所以本文仅针对 AtomicInteger 进行介绍。

AtomicInteger 用法

1
2
3
4
5
6
7
public final int get() // 获取当前值
public final int getAndSet(int newValue) // 获取当前值,并设置新值
public final int getAndIncrement()// 获取当前值,并自增
public final int getAndDecrement() // 获取当前值,并自减
public final int getAndAdd(int delta) // 获取当前值,并加上预期值
boolean compareAndSet(int expect, int update) // 如果输入值(update)等于预期值,将该值设置为输入值
public final void lazySet(int newValue) // 最终设置为 newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

AtomicInteger 使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class AtomicIntegerDemo {

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
AtomicInteger count = new AtomicInteger(0);
for (int i = 0; i < 1000; i++) {
executorService.submit((Runnable) () -> {
System.out.println(Thread.currentThread().getName() + " count=" + count.get());
count.incrementAndGet();
});
}

executorService.shutdown();
executorService.awaitTermination(30, TimeUnit.SECONDS);
System.out.println("Final Count is : " + count.get());
}
}

AtomicInteger 实现

阅读 AtomicInteger 源码,可以看到如下定义:

1
2
3
4
5
6
7
8
9
10
11
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;

说明:

  • value - value 属性使用 volatile 修饰,使得对 value 的修改在并发环境下对所有线程可见。
  • valueOffset - value 属性的偏移量,通过这个偏移量可以快速定位到 value 字段,这个是实现 AtomicInteger 的关键。
  • unsafe - Unsafe 类型的属性,它为 AtomicInteger 提供了 CAS 操作。

原子类之引用数据类型

Java 数据类型分为 基本数据类型引用数据类型 两大类(不了解 Java 数据类型划分可以参考: Java 基本数据类型 )。

上一节中提到了针对基本数据类型的原子类,那么如果想针对引用类型做原子操作怎么办?Java 也提供了相关的原子类:

  • AtomicReference - 引用类型原子类
  • AtomicMarkableReference - 带有标记位的引用类型原子类
  • AtomicStampedReference - 带有版本号的引用类型原子类

AtomicStampedReference 类在引用类型原子类中,彻底地解决了 ABA 问题,其它的 CAS 能力与另外两个类相近,所以最具代表性。因此,本节只针对 AtomicStampedReference 进行说明。

::: tabs#原子类之引用类型示例

@tab AtomicReference 使用示例

【示例】基于 AtomicReference 实现一个简单的自旋锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class AtomicReferenceDemo2 {

private static int ticket = 10;

public static void main(String[] args) {
threadSafeDemo();
}

private static void threadSafeDemo() {
SpinLock lock = new SpinLock();
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread(lock));
}
executorService.shutdown();
}

/**
* 基于 {@link AtomicReference} 实现的简单自旋锁
*/
static class SpinLock {

private AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void lock() {
Thread current = Thread.currentThread();
while (!atomicReference.compareAndSet(null, current)) {}
}

public void unlock() {
Thread current = Thread.currentThread();
atomicReference.compareAndSet(current, null);
}

}

/**
* 利用自旋锁 {@link SpinLock} 并发处理数据
*/
static class MyThread implements Runnable {

private SpinLock lock;

public MyThread(SpinLock lock) {
this.lock = lock;
}

@Override
public void run() {
while (ticket > 0) {
lock.lock();
if (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
lock.unlock();
}
}

}

}

@tab AtomicMarkableReference 使用示例

【示例】AtomicMarkableReference 使用示例(解决 ABA 问题)

原子类的实现基于 CAS 机制,而 CAS 存在 ABA 问题(不了解 ABA 问题,可以参考:Java 并发之内存模型)。正是为了解决 ABA 问题,才有了 AtomicMarkableReferenceAtomicStampedReference

AtomicMarkableReference 使用一个布尔值作为标记,修改时在 true / false 之间切换。这种策略不能根本上解决 ABA 问题,但是可以降低 ABA 发生的几率。常用于缓存或者状态描述这样的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class AtomicMarkableReferenceDemo {

private final static String INIT_TEXT = "abc";

public static void main(String[] args) throws InterruptedException {

final AtomicMarkableReference<String> amr = new AtomicMarkableReference<>(INIT_TEXT, false);

ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(Math.abs((int) (Math.random() * 100)));
} catch (InterruptedException e) {
e.printStackTrace();
}

String name = Thread.currentThread().getName();
if (amr.compareAndSet(INIT_TEXT, name, amr.isMarked(), !amr.isMarked())) {
System.out.println(Thread.currentThread().getName() + " 修改了对象!");
System.out.println("新的对象为:" + amr.getReference());
}
}
});
}

executorService.shutdown();
executorService.awaitTermination(3, TimeUnit.SECONDS);
}

}

@tab AtomicStampedReference 使用示例

【示例】AtomicStampedReference 使用示例

AtomicStampedReference 使用一个整型值做为版本号,每次更新前先比较版本号,如果一致,才进行修改。通过这种策略,可以根本上解决 ABA 问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class AtomicStampedReferenceDemo {

private final static String INIT_REF = "pool-1-thread-3";

private final static AtomicStampedReference<String> asr = new AtomicStampedReference<>(INIT_REF, 0);

public static void main(String[] args) throws InterruptedException {

System.out.println("初始对象为:" + asr.getReference());

ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
executorService.execute(new MyThread());
}

executorService.shutdown();
executorService.awaitTermination(3, TimeUnit.SECONDS);
}

static class MyThread implements Runnable {

@Override
public void run() {
try {
Thread.sleep(Math.abs((int) (Math.random() * 100)));
} catch (InterruptedException e) {
e.printStackTrace();
}

final int stamp = asr.getStamp();
if (asr.compareAndSet(INIT_REF, Thread.currentThread().getName(), stamp, stamp + 1)) {
System.out.println(Thread.currentThread().getName() + " 修改了对象!");
System.out.println("新的对象为:" + asr.getReference());
}
}

}

}

:::

原子类之数组数据类型

Java 提供了以下针对数组的原子类:

  • AtomicIntegerArray - 整形数组原子类
  • AtomicLongArray - 长整型数组原子类
  • AtomicReferenceArray - 引用类型数组原子类

已经有了针对基本类型和引用类型的原子类,为什么还要提供针对数组的原子类呢?

数组类型的原子类为数组元素提供了 volatile 类型的访问语义,这是普通数组所不具备的特性——**volatile 类型的数组仅在数组引用上具有 volatile 语义**。

【示例】AtomicIntegerArray 使用示例(AtomicLongArrayAtomicReferenceArray 使用方式也类似)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class AtomicIntegerArrayDemo {

private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);

public static void main(final String[] arguments) throws InterruptedException {

System.out.println("Init Values: ");
for (int i = 0; i < atomicIntegerArray.length(); i++) {
atomicIntegerArray.set(i, i);
System.out.print(atomicIntegerArray.get(i) + " ");
}
System.out.println();

Thread t1 = new Thread(new Increment());
Thread t2 = new Thread(new Compare());
t1.start();
t2.start();

t1.join();
t2.join();

System.out.println("Final Values: ");
for (int i = 0; i < atomicIntegerArray.length(); i++) {
System.out.print(atomicIntegerArray.get(i) + " ");
}
System.out.println();
}

static class Increment implements Runnable {

@Override
public void run() {

for (int i = 0; i < atomicIntegerArray.length(); i++) {
int value = atomicIntegerArray.incrementAndGet(i);
System.out.println(Thread.currentThread().getName() + ", index = " + i + ", value = " + value);
}
}

}

static class Compare implements Runnable {

@Override
public void run() {
for (int i = 0; i < atomicIntegerArray.length(); i++) {
boolean swapped = atomicIntegerArray.compareAndSet(i, 2, 3);
if (swapped) {
System.out.println(Thread.currentThread().getName() + " swapped, index = " + i + ", value = 3");
}
}
}

}

}

原子类之属性更新器

属性更新器支持基于反射机制的更新字段值的原子操作

  • AtomicIntegerFieldUpdater - 整型字段的原子更新器。
  • AtomicLongFieldUpdater - 长整型字段的原子更新器。
  • AtomicReferenceFieldUpdater - 原子更新引用类型里的字段。

这些类的使用有一定限制:

  • 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater() 创建一个更新器,并且需要设置想要更新的类和属性。
  • 字段必须是 volatile 类型的;
  • 不能作用于静态变量(static);
  • 不能作用于常量(final);

【示例】AtomicReferenceFieldUpdater 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class AtomicReferenceFieldUpdaterDemo {

static User user = new User("begin");

static AtomicReferenceFieldUpdater<User, String> updater =
AtomicReferenceFieldUpdater.newUpdater(User.class, String.class, "name");

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread());
}
executorService.shutdown();
}

static class MyThread implements Runnable {

@Override
public void run() {
if (updater.compareAndSet(user, "begin", "end")) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 已修改 name = " + user.getName());
} else {
System.out.println(Thread.currentThread().getName() + " 已被其他线程修改");
}
}

}

static class User {

volatile String name;

public User(String name) {
this.name = name;
}

public String getName() {
return name;
}

public User setName(String name) {
this.name = name;
return this;
}

}

}

原子类之累加器

DoubleAccumulatorDoubleAdderLongAccumulatorLongAdder,这四个类仅仅用来执行累加操作,相比原子化的基本数据类型,速度更快,但是不支持 compareAndSet() 方法。如果你仅仅需要累加操作,使用原子化的累加器性能会更好,代价就是会消耗更多的内存空间。

LongAdder 内部由一个 base 变量和一个 cell[] 数组组成。

  • 当只有一个写线程,没有竞争的情况下,LongAdder 会直接使用 base 变量作为原子操作变量,通过 CAS 操作修改变量;
  • 当有多个写线程竞争的情况下,除了占用 base 变量的一个写线程之外,其它各个线程会将修改的变量写入到自己的槽 cell[] 数组中。

我们可以发现,LongAdder 在操作后的返回值只是一个近似准确的数值,但是 LongAdder 最终返回的是一个准确的数值, 所以在一些对实时性要求比较高的场景下,LongAdder 并不能取代 AtomicIntegerAtomicLong

ThreadLocal

在多线程环境下,共享变量存在并发安全问题。换个思路,如果变量非共享,而是各个线程独享,就不会有并发安全问题。这种思想有个术语叫线程封闭,其本质上就是避免共享。没有共享,自然也就没有并发安全问题。在 Java 中,ThreadLocal 正是根据这个思路而设计的。

ThreadLocal 为每个线程都创建了一个本地副本,这个副本只能被当前线程访问,其他线程无法访问,那么自然是线程安全的。

ThreadLocal 的应用

ThreadLocal 的方法:

1
2
3
4
5
6
public class ThreadLocal<T> {
public T get() {}
public void set(T value) {}
public void remove() {}
public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {}
}

说明:

  • get - 用于获取 ThreadLocal 在当前线程中保存的变量副本。
  • set - 用于设置当前线程中变量的副本。
  • remove - 用于删除当前线程中变量的副本。如果此线程局部变量随后被当前线程读取,则其值将通过调用其 initialValue 方法重新初始化,除非其值由中间线程中的当前线程设置。 这可能会导致当前线程中多次调用 initialValue 方法。
  • initialValue - 为 ThreadLocal 设置默认的 get 初始值,需要重写 initialValue 方法 。

ThreadLocal 常用于防止对可变的单例(Singleton)变量或全局变量进行共享。典型应用场景有:管理数据库连接、Session 管理等。

::: tabs#ThreadLocal 应用示例

@tab 数据库连接

【示例】数据库连接

1
2
3
4
5
6
7
8
9
10
private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>() {
@Override
public Connection initialValue() {
return DriverManager.getConnection(DB_URL);
}
};

public static Connection getConnection() {
return connectionHolder.get();
}

@tab Session 管理

【示例】Session 管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static final ThreadLocal<Session> sessionHolder = new ThreadLocal<>();

public static Session getSession() {
Session session = (Session) sessionHolder.get();
try {
if (session == null) {
session = createSession();
sessionHolder.set(session);
}
} catch (Exception e) {
e.printStackTrace();
}
return session;
}

@tab 线程安全的 SimpleDateFormat

【示例】线程安全的 SimpleDateFormat

SimpleDateFormat 不是线程安全的,如果要保证并发安全,可以使用 ThreadLocal 来解决。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class SafeDateFormat {

//定义 ThreadLocal 变量
static final ThreadLocal<DateFormat>
tl = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

static DateFormat get() {
return tl.get();
}

public static void main(String[] args) {
//不同线程执行下面代码
//返回的 df 是不同的
DateFormat df = SafeDateFormat.get();
}

}

@tab 完整使用 ThreadLocal 示例

【示例】完整使用 ThreadLocal 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ThreadLocalDemo {

private static ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.execute(new MyThread());
}
executorService.shutdown();
}

static class MyThread implements Runnable {

@Override
public void run() {
int count = threadLocal.get();
for (int i = 0; i < 10; i++) {
try {
count++;
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
threadLocal.set(count);
threadLocal.remove();
System.out.println(Thread.currentThread().getName() + " : " + count);
}

}

}

:::

ThreadLocal 的原理

存储结构

Thread 类中维护着 2 个 ThreadLocal.ThreadLocalMap 类型的成员 threadLocals 和 inheritableThreadLocals 。这 2 个成员就是用来存储当前线程独占的变量副本。

ThreadLocalMapThreadLocal 的内部类,它维护着一个 Entry 数组,**Entry 继承了 WeakReference** ,所以是弱引用。 Entry 用于保存键值对,其中:

  • keyThreadLocal 对象
  • value 是传递进来的对象(变量副本)

ThreadLocal 关键源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Thread implements Runnable {
// ...
ThreadLocal.ThreadLocalMap threadLocals = null;
// ...
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}

static class ThreadLocalMap {
// ...
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
// ...
}

如何解决 Hash 冲突

ThreadLocalMap 虽然是类似 Map 结构的数据结构,但它并没有实现 Map 接口。它不支持 Map 接口中的 next 方法,这意味着 ThreadLocalMap 中解决 Hash 冲突的方式并非 拉链表 方式。

实际上,**ThreadLocalMap 采用线性探测的方式来解决 Hash 冲突**。所谓线性探测,就是根据初始 key 的 hashcode 值确定元素在 table 数组中的位置,如果发现这个位置上已经被其他的 key 值占用,则利用固定的算法寻找一定步长的下个位置,依次判断,直至找到能够存放的位置。

内存泄漏问题

ThreadLocal 仅仅是一个代理工具类,内部并不持有任何与线程相关的数据,所有和线程相关的数据都存储在 Thread 里面,这样的设计容易理解。

当然还有一个更加深层次的原因,那就是不容易产生内存泄露。如果 ThreadLocal 和实际实现反其道而行之:将 Thread 的引用维护在一个 Map 中,就会出现这种情况——只要 ThreadLocal 对象存在,那么 Map 中的 Thread 对象就永远不会被回收。而 ThreadLocal 的生命周期往往都比线程要长,所以这种设计方案很容易导致内存泄露。而 Java 的实现中 Thread 持有 ThreadLocalMap,而且 ThreadLocalMap 里对 ThreadLocal 的引用还是弱引用(WeakReference),所以只要 Thread 对象可以被回收,那么 ThreadLocalMap 就能被回收。Java 的这种实现方案虽然看上去复杂一些,但是更加安全。

ThreadLocalMapEntry 继承了 WeakReference,所以它的 key (ThreadLocal 对象)是弱引用,而 value (变量副本)是强引用。如果 ThreadLocal 对象没有外部强引用来引用它,那么 ThreadLocal 对象会在下次 GC 时被回收。此时,Entry 中的 key 已经被回收,但是 value 由于是强引用不会被垃圾收集器回收。如果创建 ThreadLocal 的线程一直持续运行,那么 value 就会一直得不到回收,从而导致内存泄露

那么如何避免内存泄漏呢?方法就是:使用 ThreadLocalset 方法后,在 try {} finally {} 中显示的调用 remove 方法

1
2
3
4
5
6
7
8
9
10
11
12
ExecutorService es;
ThreadLocal tl;
es.execute(() -> {
//ThreadLocal 增加变量
tl.set(obj);
try {
// 省略业务逻辑代码
} finally {
//手动清理 ThreadLocal
tl.remove();
}
});

ThreadLocal 的误区

示例摘自:极客时间教程 - Java 业务开发常见错误 100 例

ThreadLocal 适用于变量在线程间隔离,而在方法或类间共享的场景。

前文提到,ThreadLocal 是线程隔离的,那么是不是使用 ThreadLocal 就一定高枕无忧呢?

ThreadLocal 错误案例

使用 Spring Boot 创建一个 Web 应用程序,使用 ThreadLocal 存放一个 Integer 的值,来暂且代表需要在线程中保存的用户信息,这个值初始是 null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);

@GetMapping("wrong")
public Map<String, String> wrong(@RequestParam("id") Integer userId) {
//设置用户信息之前先查询一次 ThreadLocal 中的用户信息
String before = Thread.currentThread().getName() + ":" + currentUser.get();
//设置用户信息到 ThreadLocal
currentUser.set(userId);
//设置用户信息之后再查询一次 ThreadLocal 中的用户信息
String after = Thread.currentThread().getName() + ":" + currentUser.get();
//汇总输出两次查询结果
Map<String, String> result = new HashMap<>();
result.put("before", before);
result.put("after", after);
return result;
}

【预期】从代码逻辑来看,我们预期第一次获取的值始终应该是 null。

【实际】

为了方便复现,将 Tomcat 工作线程设为 1:

1
server.tomcat.max-threads=1

当访问 id = 1 时,符合预期

img

当访问 id = 2 时,before 的应答不是 null,而是 1,不符合预期。

【分析】实际情况和预期存在偏差。Spring Boot 程序运行在 Tomcat 中,执行程序的线程是 Tomcat 的工作线程,而 Tomcat 的工作线程是基于线程池的。线程池会重用固定的几个线程,一旦线程重用,那么很可能首次从
ThreadLocal 获取的值是之前其他用户的请求遗留的值。这时,ThreadLocal 中的用户信息就是其他用户的信息

并不能认为没有显式开启多线程就不会有线程安全问题。使用类似 ThreadLocal 工具来存放一些数据时,需要特别注意在代码运行完后,显式地去清空设置的数据。

ThreadLocal 错误案例修正

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@GetMapping("right")
public Map<String, String> right(@RequestParam("id") Integer userId) {
String before = Thread.currentThread().getName() + ":" + currentUser.get();
currentUser.set(userId);
try {
String after = Thread.currentThread().getName() + ":" + currentUser.get();
Map<String, String> result = new HashMap<>();
result.put("before", before);
result.put("after", after);
return result;
} finally {
//在 finally 代码块中删除 ThreadLocal 中的数据,确保数据不串
currentUser.remove();
}
}

InheritableThreadLocal

通过 ThreadLocal 创建的线程变量,其子线程是无法继承的。也就是说你在线程中通过 ThreadLocal 创建了线程变量 V,而后该线程创建了子线程,你在子线程中是无法通过 ThreadLocal 来访问父线程的线程变量 V 的。

如果你需要子线程继承父线程的线程变量,那该怎么办呢?其实很简单,Java 提供了 InheritableThreadLocal 来支持这种特性,InheritableThreadLocalThreadLocal 子类,所以用法和 ThreadLocal 相同。与 ThreadLocal 不同的是,InheritableThreadLocal 允许一个线程以及该线程创建的所有子线程都可以访问它保存的数据。

不过,完全不建议你在线程池中使用 InheritableThreadLocal,不仅仅是因为它具有 ThreadLocal 相同的缺点——可能导致内存泄露,更重要的原因是:线程池中线程的创建是动态的,很容易导致继承关系错乱,如果你的业务逻辑依赖 InheritableThreadLocal,那么很可能导致业务逻辑计算错误,而这个错误往往比内存泄露更要命。

原理参考:Java 多线程:InheritableThreadLocal 实现原理

Immutability 模式

解决并发问题,其实最简单的办法就是让共享变量只有读操作,而没有写操作。这个办法如此重要,以至于被上升到了一种解决并发问题的设计模式:不变性(Immutability)模式。所谓不变性,是指:一旦创建,状态不再变化。换句话说,就是变量一旦被赋值,就不允许修改了(没有写操作);没有修改操作,也就是保持了不变性。

快速实现具备不可变性的类

将一个类所有的属性都设置成 final 的,并且只允许存在只读方法,那么这个类基本上就具备不可变性了。更严格的做法是这个类本身也是 final 的,也就是不允许继承。因为子类可以覆盖父类的方法,有可能改变不可变性,所以推荐你在实际工作中,使用这种更严格的做法。

在 Java 中,经常用到的 StringLongIntegerDouble 等基础类型的包装类都具备不可变性,这些对象的线程安全性都是靠不可变性来保证的。如果你仔细翻看这些类的声明、属性和方法,你会发现它们都严格遵守不可变类的三点要求:类和属性都是 final 的,所有方法均是只读的

String 这个类虽然有替换操作,但实际仍是只读的。阅读 String 源码可以发现:String 这个类以及它的属性 value[] 都是 final 的;而 replace() 方法的实现,就的确没有修改 value[],而是将替换后的字符串作为返回值返回了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public final class String {
private final char value[];
// 字符替换
String replace(char oldChar,
char newChar) {
//无需替换,直接返回 this
if (oldChar == newChar){
return this;
}

int len = value.length;
int i = -1;
/* avoid getfield opcode */
char[] val = value;
//定位到需要替换的字符位置
while (++i < len) {
if (val[i] == oldChar) {
break;
}
}
//未找到 oldChar,无需替换
if (i >= len) {
return this;
}
//创建一个 buf[],这是关键
//用来保存替换后的字符串
char buf[] = new char[len];
for (int j = 0; j < i; j++) {
buf[j] = val[j];
}
while (i < len) {
char c = val[i];
buf[i] = (c == oldChar) ?
newChar : c;
i++;
}
//创建一个新的字符串返回
//原字符串不会发生任何变化
return new String(buf, true);
}
}

通过分析 String 的实现,你可能已经发现了,如果具备不可变性的类,需要提供类似修改的功能,具体该怎么操作呢?做法很简单,那就是创建一个新的不可变对象,这是与可变对象的一个重要区别,可变对象往往是修改自己的属性。

使用 Immutability 模式的注意事项

在使用 Immutability 模式的时候,需要注意以下两点:

  1. 对象的所有属性都是 final 的,并不能保证不可变性;
  2. 不可变对象也需要正确发布。

在 Java 语言中,final 修饰的属性一旦被赋值,就不可以再修改,但是如果属性的类型是普通对象,那么这个普通对象的属性是可以被修改的。例如下面的代码中,Bar 的属性 foo 虽然是 final 的,依然可以通过 setAge() 方法来设置 foo 的属性 age。所以,在使用 Immutability 模式的时候一定要确认保持不变性的边界在哪里,是否要求属性对象也具备不可变性

1
2
3
4
5
6
7
8
9
10
class Foo{
int age=0;
int name="abc";
}
final class Bar {
final Foo foo;
void setAge(int a){
foo.age=a;
}
}

不可变对象虽然是线程安全的,但是并不意味着引用这些不可变对象的对象就是线程安全的。例如在下面的代码中,Foo 具备不可变性,线程安全,但是类 Bar 并不是线程安全的,类 Bar 中持有对 Foo 的引用 foo,对 foo 这个引用的修改在多线程中并不能保证可见性和原子性。

1
2
3
4
5
6
7
8
9
10
11
12
//Foo 线程安全
final class Foo{
final int age=0;
final int name="abc";
}
//Bar 线程不安全
class Bar {
Foo foo;
void setFoo(Foo f){
this.foo=f;
}
}

如果你的程序仅仅需要 foo 保持可见性,无需保证原子性,那么可以将 foo 声明为 volatile 变量,这样就能保证可见性。如果你的程序需要保证原子性,那么可以通过原子类来实现。下面的示例代码是合理库存的原子化实现,你应该很熟悉了,其中就是用原子类解决了不可变对象引用的原子性问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class SafeWM {

class WMRange {
final int upper;
final int lower;
WMRange(int upper, int lower) {
//省略构造函数实现
}
}

final AtomicReference<WMRange> rf = new AtomicReference<>(new WMRange(0, 0));

// 设置库存上限
void setUpper(int v) {
while (true) {
WMRange or = rf.get();
// 检查参数合法性
if (v < or.lower) {
throw new IllegalArgumentException();
}
WMRange nr = new WMRange(v, or.lower);
if (rf.compareAndSet(or, nr)) {
return;
}
}
}
}

Copy-on-Write 模式

所谓 Copy-on-Write,经常被缩写为 CoW,顾名思义就是写时复制

Java 支持 CopyOnWriteArrayListCopyOnWriteArraySet 两种并发容器,其设计思想就是 CoW;通过 Copy-on-Write 这两个容器实现的读操作是无锁的,由于无锁,所以将读操作的性能发挥到了极致。

CoW 是一项非常通用的技术方案,在很多领域都有着广泛的应用。不过,它也有缺点的,那就是消耗内存,每次修改都需要复制一个新的副本出来。

参考资料

Java 并发之同步工具

Semaphore

Semaphore 译为信号量,是一种同步机制,用于控制多线程对共享资源的访问。信号量是由计算机科学家 Edsger Dijkstra 于 1965 年提出的,用于解决所谓的“临界区”问题,即多个进程或线程试图同时访问共享资源(如打印机、内存缓冲区等)时可能出现的问题。

信号量模型

信号量模型还是很简单的,可以简单概括为:一个计数器,一个等待队列,三个方法。在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down() 和 up()。

  • 这三个方法详细的语义具体如下所示。

    • init():设置计数器的初始值。
    • down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。
    • up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。

    这里提到的 init()、down() 和 up() 三个方法都是原子性的,并且这个原子性是由信号量模型的实现方保证的。在 Java 中,信号量模型是由 java.util.concurrent.Semaphore 实现的,Semaphore 这个类能够保证这三个方法都是原子操作。

    信号量模型里面,down()、up() 这两个操作历史上最早称为 P 操作和 V 操作,所以信号量模型也被称为 PV 原语。

Semaphore 使用

Semaphore 提供了 2 个构造方法:

1
2
3
4
// 参数 permits 表示许可数目,即同时可以允许多少线程进行访问
public Semaphore(int permits) {}
// 参数 fair 表示是否是公平的,即等待时间越久的越先获取许可
public Semaphore(int permits, boolean fair) {}

说明:

  • permits - 初始化固定数量的 permit。
  • fair - 设置是否为公平模式。所谓公平,是指等待久的优先获取 permit。

Semaphore 的重要方法:

1
2
3
4
5
6
7
8
// 获取 1 个许可
public void acquire() throws InterruptedException {}
//获取 permits 个许可
public void acquire(int permits) throws InterruptedException {}
// 释放 1 个许可
public void release() {}
//释放 permits 个许可
public void release(int permits) {}

说明:

  • acquire() - 获取 1 个 permit。
  • acquire(int permits) - 获取 permits 数量的 permit。
  • release() - 释放 1 个 permit。
  • release(int permits) - 释放 permits 数量的 permit。

img

【示例】Semaphore 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class SemaphoreDemo {

private static final int THREAD_COUNT = 30;

private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

private static Semaphore semaphore = new Semaphore(10);

public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("save data");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}

threadPool.shutdown();
}

}

Semaphore 原理

Semaphore 是共享锁的一种实现,它默认构造 AQS 的 state 值为 permits,你可以将 permits 的值理解为许可证的数量,只有拿到许可证的线程才能执行。

调用semaphore.acquire() ,线程尝试获取许可证,如果 state >= 0 的话,则表示可以获取成功。如果获取成功的话,使用 CAS 操作去修改 state 的值 state=state-1。如果 state<0 的话,则表示许可证数量不足。此时会创建一个 Node 节点加入阻塞队列,挂起当前线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 获取 1 个许可证
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 共享模式下获取许可证,获取成功则返回,失败则加入阻塞队列,挂起线程
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取许可证,arg 为获取许可证个数,当可用许可证数减当前获取的许可证数结果小于 0, 则创建一个节点加入阻塞队列,挂起当前线程。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

调用semaphore.release(); ,线程尝试释放许可证,并使用 CAS 操作去修改 state 的值 state=state+1。释放许可证成功之后,同时会唤醒同步队列中的一个线程。被唤醒的线程会重新尝试去修改 state 的值 state=state-1 ,如果 state>=0 则获取令牌成功,否则重新进入阻塞队列,挂起线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 释放一个许可证
public void release() {
sync.releaseShared(1);
}

// 释放共享锁,同时会唤醒同步队列中的一个线程。
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
//唤醒同步队列中的一个线程
doReleaseShared();
return true;
}
return false;
}

实现一个限流器

Semaphore 最重要的特性是:Semaphore 可以允许多个线程访问一个临界区

Semaphore 在现实中有很多应用场景:

  • 各种池化资源,例如连接池、对象池、线程池等;
  • 信号量限流(例如 Hystrix 就支持信号量限流模式);

【示例】一个基于信号量实现的简单对象限流器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class SemaphoreRateLimit {

public static void main(String[] args) {
// 创建对象池,大小为 10
ObjectPool<Long, String> pool = new ObjectPool<>(10, 2L);
// 通过对象池获取 t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});
}

static class ObjectPool<T, R> {

final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;

// 构造函数
ObjectPool(int size, T t) {
pool = new Vector<T>() { };
for (int i = 0; i < size; i++) {
pool.add(t);
}
sem = new Semaphore(size);
}

// 利用对象池的对象,调用 func
R exec(Function<T, R> func) {
T t = null;
try {
sem.acquire();
t = pool.remove(0);
return func.apply(t);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
pool.add(t);
sem.release();
return null;
}
}

}

}

在这个方法里面,我们首先调用 acquire() 方法(与之匹配的是在 finally 里面调用 release() 方法),假设对象池的大小是 10,信号量的计数器初始化为 10,那么前 10 个线程调用 acquire() 方法,都能继续执行,相当于通过了信号量,而其他线程则会阻塞在 acquire() 方法上。对于通过信号量的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过 pool.remove(0) 实现的),分配完之后会执行一个回调函数 func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过 pool.add(t) 实现的),同时调用 release() 方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。

CountDownLatch

CountDownLatch 字面意思为递减计数锁。用于控制一个线程等待多个线程

CountDownLatch 内部维护了一个计数器,表示需要等待的事件数量。countDown 方法递减计数器,表示有一个事件已经发生。调用 await 方法的线程会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。

img

CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown() 方法时,其实使用了tryReleaseShared方法以 CAS 的操作来减少 state,直至 state 为 0 。当调用 await() 方法的时候,如果 state 不为 0,那就证明任务还没有执行完毕,await() 方法就会一直阻塞,也就是说 await() 方法之后的语句不会被执行。直到count 个线程调用了countDown()使 state 值被减为 0,或者调用await()的线程被中断,该线程才会从阻塞中被唤醒,await() 方法之后的语句得到执行。

CountDownLatch 唯一的构造方法:

1
2
// 初始化计数器
public CountDownLatch(int count) {};

CountDownLatch 的重要方法:

1
2
3
public void await() throws InterruptedException { };
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
public void countDown() { };

说明:

  • await() - 调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行。
  • await(long timeout, TimeUnit unit) - 和 await() 类似,只不过等待一定的时间后 count 值还没变为 0 的话就会继续执行
  • countDown() - 将统计值 count 减 1

【示例】CountDownLatch 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class CountDownLatchDemo {

public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2);

new Thread(new MyThread(latch)).start();

try {
System.out.println("等待 2 个子线程执行完毕。..");
latch.await();
System.out.println("2 个子线程已经执行完毕");
System.out.println("继续执行主线程");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

static class MyThread implements Runnable {

private CountDownLatch latch;

public MyThread(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void run() {
System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
latch.countDown();
}

}

}

CyclicBarrier

CyclicBarrier 字面意思是循环栅栏。**CyclicBarrier 可以让一组线程等待至某个状态(遵循字面意思,不妨称这个状态为栅栏)之后再全部同时执行。之所以叫循环栅栏是因为:当所有等待线程都被释放以后,CyclicBarrier 可以被重用**。

CyclicBarrier 是基于 ReentrantLockReentrantLock 底层也是基于 AQS 实现的)和 Condition 实现的。CyclicBarrier 内部维护一个计数器,每次执行 await 方法之后,计数器加 1,直到计数器的值和设置的值相等,等待的所有线程才会继续执行。

CyclicBarrier 在并行迭代算法中非常有用。

img

CyclicBarrier 提供了 2 个构造方法

1
2
public CyclicBarrier(int parties) {}
public CyclicBarrier(int parties, Runnable barrierAction) {}

说明:

  • parties - parties 数相当于一个阈值,当有 parties 数量的线程在等待时, CyclicBarrier 处于栅栏状态。
  • barrierAction - 当 CyclicBarrier 处于栅栏状态时执行的动作。

CyclicBarrier 的重要方法:

1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {}
// 将屏障重置为初始状态
public void reset() {}

说明:

  • await() - 等待调用 await() 的线程数达到屏障数。如果当前线程是最后一个到达的线程,并且在构造函数中提供了非空屏障操作,则当前线程在允许其他线程继续之前运行该操作。如果在屏障动作期间发生异常,那么该异常将在当前线程中传播并且屏障被置于断开状态。
  • await(long timeout, TimeUnit unit) - 相比于 await() 方法,这个方法让这些线程等待至一定的时间,如果还有线程没有到达栅栏状态就直接让到达栅栏状态的线程执行后续任务。
  • reset() - 将屏障重置为初始状态。

【示例】CyclicBarrier 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class CyclicBarrierDemo {

final static int N = 4;

public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(N,
new Runnable() {
@Override
public void run() {
System.out.println("当前线程" + Thread.currentThread().getName());
}
});

for (int i = 0; i < N; i++) {
MyThread myThread = new MyThread(barrier);
new Thread(myThread).start();
}
}

static class MyThread implements Runnable {

private CyclicBarrier cyclicBarrier;

MyThread(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据。..");
try {
Thread.sleep(3000); // 以睡眠来模拟写入数据操作
System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}

}

}

小结

  • CountDownLatchCyclicBarrier 都能够实现线程之间的等待,只不过它们侧重点不同:
    • CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后,它才执行;
    • CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
    • 另外,CountDownLatch 是不可以重用的,而 CyclicBarrier 是可以重用的。
  • Semaphore 其实和锁有点类似,它一般用于控制对某组资源的访问权限。

参考资料